1
16
17 package framework
18
19 import (
20 "bytes"
21 "context"
22 "encoding/json"
23 "fmt"
24 "io"
25 "math/rand"
26 "net/url"
27 "os"
28 "os/exec"
29 "path"
30 "strconv"
31 "strings"
32 "sync"
33 "time"
34
35 "github.com/onsi/ginkgo/v2"
36 "github.com/onsi/gomega"
37
38 v1 "k8s.io/api/core/v1"
39 discoveryv1 "k8s.io/api/discovery/v1"
40 apierrors "k8s.io/apimachinery/pkg/api/errors"
41 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
42 "k8s.io/apimachinery/pkg/fields"
43 "k8s.io/apimachinery/pkg/runtime"
44 "k8s.io/apimachinery/pkg/runtime/schema"
45 "k8s.io/apimachinery/pkg/util/sets"
46 "k8s.io/apimachinery/pkg/util/uuid"
47 "k8s.io/apimachinery/pkg/util/wait"
48 "k8s.io/apimachinery/pkg/watch"
49 "k8s.io/client-go/dynamic"
50 clientset "k8s.io/client-go/kubernetes"
51 restclient "k8s.io/client-go/rest"
52 "k8s.io/client-go/tools/cache"
53 "k8s.io/client-go/tools/clientcmd"
54 clientcmdapi "k8s.io/client-go/tools/clientcmd/api"
55 watchtools "k8s.io/client-go/tools/watch"
56 imageutils "k8s.io/kubernetes/test/utils/image"
57 netutils "k8s.io/utils/net"
58 )
59
const (
	// awsMasterIP is the fixed address that GetControlPlaneAddresses reports
	// as the control-plane IP when TestContext.Provider is "aws".
	awsMasterIP = "172.20.0.9"
)
64
65
// Shared timeouts and polling intervals for e2e helpers.
//
// Only Poll and ServiceAccountProvisionTimeout are referenced in the visible
// part of this file. The purpose given for every other constant is read off
// its name and should be confirmed against its call sites.
const (
	// PodListTimeout is one minute; presumably the bound for listing pods.
	PodListTimeout = time.Minute

	// PodStartTimeout is five minutes; presumably the bound for a pod to start.
	PodStartTimeout = 5 * time.Minute

	// PodStartShortTimeout is two minutes; presumably a tighter variant of
	// PodStartTimeout for pods expected to start quickly.
	PodStartShortTimeout = 2 * time.Minute

	// PodDeleteTimeout is five minutes; presumably the bound for pod deletion.
	PodDeleteTimeout = 5 * time.Minute

	// PodGetTimeout is two minutes; presumably the bound for fetching a pod.
	PodGetTimeout = 2 * time.Minute

	// PodEventTimeout is two minutes; presumably the bound for observing a pod event.
	PodEventTimeout = 2 * time.Minute

	// ServiceStartTimeout is three minutes; presumably the bound for a service to start.
	ServiceStartTimeout = 3 * time.Minute

	// Poll is the two-second polling interval; CreateTestingNS uses it between
	// namespace creation attempts.
	Poll = 2 * time.Second

	// PollShortTimeout is one minute; presumably a short overall bound for polling loops.
	PollShortTimeout = 1 * time.Minute

	// ServiceAccountProvisionTimeout is two minutes. It bounds the waits in
	// WaitForDefaultServiceAccountInNamespace and WaitForKubeRootCAInNamespace.
	ServiceAccountProvisionTimeout = 2 * time.Minute

	// SingleCallTimeout is five minutes; presumably the bound for a single API call.
	SingleCallTimeout = 5 * time.Minute

	// NodeReadyInitialTimeout is twenty seconds; presumably the initial wait for node readiness.
	NodeReadyInitialTimeout = 20 * time.Second

	// PodReadyBeforeTimeout is five minutes; presumably the bound for pods to become ready.
	PodReadyBeforeTimeout = 5 * time.Minute

	// ClaimProvisionShortTimeout is one minute; presumably a tighter variant of ClaimProvisionTimeout.
	ClaimProvisionShortTimeout = 1 * time.Minute

	// ClaimProvisionTimeout is five minutes; presumably the bound for provisioning a volume claim.
	ClaimProvisionTimeout = 5 * time.Minute

	// RestartNodeReadyAgainTimeout is five minutes; presumably the bound for a
	// node to become ready again after a restart.
	RestartNodeReadyAgainTimeout = 5 * time.Minute

	// RestartPodReadyAgainTimeout is five minutes; presumably the bound for a
	// pod to become ready again after a restart.
	RestartPodReadyAgainTimeout = 5 * time.Minute

	// SnapshotCreateTimeout is five minutes; presumably the bound for snapshot creation.
	SnapshotCreateTimeout = 5 * time.Minute

	// SnapshotDeleteTimeout is five minutes; presumably the bound for snapshot deletion.
	SnapshotDeleteTimeout = 5 * time.Minute
)
133
var (
	// BusyBoxImage is the image reference that imageutils.GetE2EImage returns
	// for imageutils.BusyBox.
	BusyBoxImage = imageutils.GetE2EImage(imageutils.BusyBox)

	// ProvidersWithSSH is a list of provider names; by its name, the providers
	// on which SSH access is expected to work (confirm against callers).
	ProvidersWithSSH = []string{"gce", "gke", "aws", "local", "azure"}

	// ServeHostnameImage is the image reference that imageutils.GetE2EImage
	// returns for imageutils.Agnhost.
	ServeHostnameImage = imageutils.GetE2EImage(imageutils.Agnhost)
)
144
145
146
// RunID is a UUID generated once when the package is initialized.
// CreateTestingNS stamps it on every namespace it creates as the "e2e-run" label.
var RunID = uuid.NewUUID()
148
149
// CreateTestingNSFn is the signature of a function that creates a namespace
// for a test from a base name, a client and a set of labels.
// CreateTestingNS in this file has this signature.
type CreateTestingNSFn func(ctx context.Context, baseName string, c clientset.Interface, labels map[string]string) (*v1.Namespace, error)
151
152
153 func APIAddress() string {
154 instanceURL, err := url.Parse(TestContext.Host)
155 ExpectNoError(err)
156 return instanceURL.Hostname()
157 }
158
159
160 func ProviderIs(providers ...string) bool {
161 for _, provider := range providers {
162 if strings.EqualFold(provider, TestContext.Provider) {
163 return true
164 }
165 }
166 return false
167 }
168
169
170 func MasterOSDistroIs(supportedMasterOsDistros ...string) bool {
171 for _, distro := range supportedMasterOsDistros {
172 if strings.EqualFold(distro, TestContext.MasterOSDistro) {
173 return true
174 }
175 }
176 return false
177 }
178
179
180 func NodeOSDistroIs(supportedNodeOsDistros ...string) bool {
181 for _, distro := range supportedNodeOsDistros {
182 if strings.EqualFold(distro, TestContext.NodeOSDistro) {
183 return true
184 }
185 }
186 return false
187 }
188
189
190 func NodeOSArchIs(supportedNodeOsArchs ...string) bool {
191 for _, arch := range supportedNodeOsArchs {
192 if strings.EqualFold(arch, TestContext.NodeOSArch) {
193 return true
194 }
195 }
196 return false
197 }
198
199
200
201
202 func DeleteNamespaces(ctx context.Context, c clientset.Interface, deleteFilter, skipFilter []string) ([]string, error) {
203 ginkgo.By("Deleting namespaces")
204 nsList, err := c.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
205 ExpectNoError(err, "Failed to get namespace list")
206 var deleted []string
207 var wg sync.WaitGroup
208 OUTER:
209 for _, item := range nsList.Items {
210 for _, pattern := range skipFilter {
211 if strings.Contains(item.Name, pattern) {
212 continue OUTER
213 }
214 }
215 if deleteFilter != nil {
216 var shouldDelete bool
217 for _, pattern := range deleteFilter {
218 if strings.Contains(item.Name, pattern) {
219 shouldDelete = true
220 break
221 }
222 }
223 if !shouldDelete {
224 continue OUTER
225 }
226 }
227 wg.Add(1)
228 deleted = append(deleted, item.Name)
229 go func(nsName string) {
230 defer wg.Done()
231 defer ginkgo.GinkgoRecover()
232 gomega.Expect(c.CoreV1().Namespaces().Delete(ctx, nsName, metav1.DeleteOptions{})).To(gomega.Succeed())
233 Logf("namespace : %v api call to delete is complete ", nsName)
234 }(item.Name)
235 }
236 wg.Wait()
237 return deleted, nil
238 }
239
240
241 func WaitForNamespacesDeleted(ctx context.Context, c clientset.Interface, namespaces []string, timeout time.Duration) error {
242 ginkgo.By(fmt.Sprintf("Waiting for namespaces %+v to vanish", namespaces))
243 nsMap := map[string]bool{}
244 for _, ns := range namespaces {
245 nsMap[ns] = true
246 }
247
248 return wait.PollWithContext(ctx, 2*time.Second, timeout,
249 func(ctx context.Context) (bool, error) {
250 nsList, err := c.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
251 if err != nil {
252 return false, err
253 }
254 for _, item := range nsList.Items {
255 if _, ok := nsMap[item.Name]; ok {
256 return false, nil
257 }
258 }
259 return true, nil
260 })
261 }
262
263 func waitForConfigMapInNamespace(ctx context.Context, c clientset.Interface, ns, name string, timeout time.Duration) error {
264 fieldSelector := fields.OneTermEqualSelector("metadata.name", name).String()
265 ctx, cancel := watchtools.ContextWithOptionalTimeout(ctx, timeout)
266 defer cancel()
267 lw := &cache.ListWatch{
268 ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
269 options.FieldSelector = fieldSelector
270 return c.CoreV1().ConfigMaps(ns).List(ctx, options)
271 },
272 WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
273 options.FieldSelector = fieldSelector
274 return c.CoreV1().ConfigMaps(ns).Watch(ctx, options)
275 },
276 }
277 _, err := watchtools.UntilWithSync(ctx, lw, &v1.ConfigMap{}, nil, func(event watch.Event) (bool, error) {
278 switch event.Type {
279 case watch.Deleted:
280 return false, apierrors.NewNotFound(schema.GroupResource{Resource: "configmaps"}, name)
281 case watch.Added, watch.Modified:
282 return true, nil
283 }
284 return false, nil
285 })
286 return err
287 }
288
289 func waitForServiceAccountInNamespace(ctx context.Context, c clientset.Interface, ns, serviceAccountName string, timeout time.Duration) error {
290 fieldSelector := fields.OneTermEqualSelector("metadata.name", serviceAccountName).String()
291 ctx, cancel := watchtools.ContextWithOptionalTimeout(ctx, timeout)
292 defer cancel()
293 lw := &cache.ListWatch{
294 ListFunc: func(options metav1.ListOptions) (object runtime.Object, e error) {
295 options.FieldSelector = fieldSelector
296 return c.CoreV1().ServiceAccounts(ns).List(ctx, options)
297 },
298 WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
299 options.FieldSelector = fieldSelector
300 return c.CoreV1().ServiceAccounts(ns).Watch(ctx, options)
301 },
302 }
303 _, err := watchtools.UntilWithSync(ctx, lw, &v1.ServiceAccount{}, nil, func(event watch.Event) (bool, error) {
304 switch event.Type {
305 case watch.Deleted:
306 return false, apierrors.NewNotFound(schema.GroupResource{Resource: "serviceaccounts"}, serviceAccountName)
307 case watch.Added, watch.Modified:
308 return true, nil
309 }
310 return false, nil
311 })
312 if err != nil {
313 return fmt.Errorf("wait for service account %q in namespace %q: %w", serviceAccountName, ns, err)
314 }
315 return nil
316 }
317
318
319
320
321 func WaitForDefaultServiceAccountInNamespace(ctx context.Context, c clientset.Interface, namespace string) error {
322 return waitForServiceAccountInNamespace(ctx, c, namespace, defaultServiceAccountName, ServiceAccountProvisionTimeout)
323 }
324
325
326
327
328 func WaitForKubeRootCAInNamespace(ctx context.Context, c clientset.Interface, namespace string) error {
329 return waitForConfigMapInNamespace(ctx, c, namespace, "kube-root-ca.crt", ServiceAccountProvisionTimeout)
330 }
331
332
333
334 func CreateTestingNS(ctx context.Context, baseName string, c clientset.Interface, labels map[string]string) (*v1.Namespace, error) {
335 if labels == nil {
336 labels = map[string]string{}
337 }
338 labels["e2e-run"] = string(RunID)
339
340
341
342
343 name := fmt.Sprintf("%v-%v", baseName, RandomSuffix())
344
345 namespaceObj := &v1.Namespace{
346 ObjectMeta: metav1.ObjectMeta{
347 Name: name,
348 Namespace: "",
349 Labels: labels,
350 },
351 Status: v1.NamespaceStatus{},
352 }
353
354 var got *v1.Namespace
355 if err := wait.PollUntilContextTimeout(ctx, Poll, 30*time.Second, true, func(ctx context.Context) (bool, error) {
356 var err error
357 got, err = c.CoreV1().Namespaces().Create(ctx, namespaceObj, metav1.CreateOptions{})
358 if err != nil {
359 if apierrors.IsAlreadyExists(err) {
360
361 Logf("Namespace name %q was already taken, generate a new name and retry", namespaceObj.Name)
362 namespaceObj.Name = fmt.Sprintf("%v-%v", baseName, RandomSuffix())
363 } else {
364 Logf("Unexpected error while creating namespace: %v", err)
365 }
366 return false, nil
367 }
368 return true, nil
369 }); err != nil {
370 return nil, err
371 }
372
373 if TestContext.VerifyServiceAccount {
374 if err := WaitForDefaultServiceAccountInNamespace(ctx, c, got.Name); err != nil {
375
376
377
378 return got, err
379 }
380 }
381 return got, nil
382 }
383
384
385
386 func CheckTestingNSDeletedExcept(ctx context.Context, c clientset.Interface, skip string) error {
387
388
389
390
391
392
393
394
395
396
397
398 timeout := 60 * time.Minute
399
400 Logf("Waiting for terminating namespaces to be deleted...")
401 for start := time.Now(); time.Since(start) < timeout; time.Sleep(15 * time.Second) {
402 namespaces, err := c.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
403 if err != nil {
404 Logf("Listing namespaces failed: %v", err)
405 continue
406 }
407 terminating := 0
408 for _, ns := range namespaces.Items {
409 if strings.HasPrefix(ns.ObjectMeta.Name, "e2e-tests-") && ns.ObjectMeta.Name != skip {
410 if ns.Status.Phase == v1.NamespaceActive {
411 return fmt.Errorf("Namespace %s is active", ns.ObjectMeta.Name)
412 }
413 terminating++
414 }
415 }
416 if terminating == 0 {
417 return nil
418 }
419 }
420 return fmt.Errorf("Waiting for terminating namespaces to be deleted timed out")
421 }
422
423
424
425 func WaitForServiceEndpointsNum(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectNum int, interval, timeout time.Duration) error {
426 return wait.PollWithContext(ctx, interval, timeout, func(ctx context.Context) (bool, error) {
427 Logf("Waiting for amount of service:%s endpoints to be %d", serviceName, expectNum)
428 endpoint, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
429 if err != nil {
430 Logf("Unexpected error trying to get Endpoints for %s : %v", serviceName, err)
431 return false, nil
432 }
433
434 if countEndpointsNum(endpoint) != expectNum {
435 Logf("Unexpected number of Endpoints, got %d, expected %d", countEndpointsNum(endpoint), expectNum)
436 return false, nil
437 }
438
439
440
441 addressType := discoveryv1.AddressTypeIPv4
442 if isIPv6Endpoint(endpoint) {
443 addressType = discoveryv1.AddressTypeIPv6
444 }
445
446 esList, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
447 if err != nil {
448 Logf("Unexpected error trying to get EndpointSlices for %s : %v", serviceName, err)
449 return false, nil
450 }
451
452 if len(esList.Items) == 0 {
453 Logf("Waiting for at least 1 EndpointSlice to exist")
454 return false, nil
455 }
456
457 if countEndpointsSlicesNum(esList, addressType) != expectNum {
458 Logf("Unexpected number of Endpoints on Slices, got %d, expected %d", countEndpointsSlicesNum(esList, addressType), expectNum)
459 return false, nil
460 }
461 return true, nil
462 })
463 }
464
465 func countEndpointsNum(e *v1.Endpoints) int {
466 num := 0
467 for _, sub := range e.Subsets {
468 num += len(sub.Addresses)
469 }
470 return num
471 }
472
473
474 func isIPv6Endpoint(e *v1.Endpoints) bool {
475 for _, sub := range e.Subsets {
476 for _, addr := range sub.Addresses {
477 if len(addr.IP) == 0 {
478 continue
479 }
480
481 return netutils.IsIPv6String(addr.IP)
482 }
483 }
484
485 return false
486 }
487
488 func countEndpointsSlicesNum(epList *discoveryv1.EndpointSliceList, addressType discoveryv1.AddressType) int {
489
490 addresses := sets.Set[string]{}
491 for _, epSlice := range epList.Items {
492 if epSlice.AddressType != addressType {
493 continue
494 }
495 for _, ep := range epSlice.Endpoints {
496 if len(ep.Addresses) > 0 {
497 addresses.Insert(ep.Addresses[0])
498 }
499 }
500 }
501 return addresses.Len()
502 }
503
504
505 func restclientConfig(kubeContext string) (*clientcmdapi.Config, error) {
506 Logf(">>> kubeConfig: %s", TestContext.KubeConfig)
507 if TestContext.KubeConfig == "" {
508 return nil, fmt.Errorf("KubeConfig must be specified to load client config")
509 }
510 c, err := clientcmd.LoadFromFile(TestContext.KubeConfig)
511 if err != nil {
512 return nil, fmt.Errorf("error loading KubeConfig: %v", err.Error())
513 }
514 if kubeContext != "" {
515 Logf(">>> kubeContext: %s", kubeContext)
516 c.CurrentContext = kubeContext
517 }
518 return c, nil
519 }
520
521
// ClientConfigGetter is the signature of a function that produces a REST
// client configuration. LoadConfig in this file has this signature.
type ClientConfigGetter func() (*restclient.Config, error)
523
524
525 func LoadConfig() (config *restclient.Config, err error) {
526 defer func() {
527 if err == nil && config != nil {
528 testDesc := ginkgo.CurrentSpecReport()
529 if len(testDesc.ContainerHierarchyTexts) > 0 {
530 testName := strings.Join(testDesc.ContainerHierarchyTexts, " ")
531 if len(testDesc.LeafNodeText) > 0 {
532 testName = testName + " " + testDesc.LeafNodeText
533 }
534 config.UserAgent = fmt.Sprintf("%s -- %s", restclient.DefaultKubernetesUserAgent(), testName)
535 }
536 }
537 }()
538
539 if TestContext.NodeE2E {
540
541 return &restclient.Config{
542 Host: TestContext.Host,
543 BearerToken: TestContext.BearerToken,
544 TLSClientConfig: restclient.TLSClientConfig{
545 Insecure: true,
546 },
547 }, nil
548 }
549 c, err := restclientConfig(TestContext.KubeContext)
550 if err != nil {
551 if TestContext.KubeConfig == "" {
552 return restclient.InClusterConfig()
553 }
554 return nil, err
555 }
556
557
558 if TestContext.Host == "" && c.Clusters != nil {
559 currentContext, ok := c.Clusters[c.CurrentContext]
560 if ok {
561 TestContext.Host = currentContext.Server
562 }
563 }
564
565 return clientcmd.NewDefaultClientConfig(*c, &clientcmd.ConfigOverrides{ClusterInfo: clientcmdapi.Cluster{Server: TestContext.Host}}).ClientConfig()
566 }
567
568
569 func LoadClientset() (*clientset.Clientset, error) {
570 config, err := LoadConfig()
571 if err != nil {
572 return nil, fmt.Errorf("error creating client: %v", err.Error())
573 }
574 return clientset.NewForConfig(config)
575 }
576
577
// RandomSuffix returns a pseudo-random integer in [0, 10000) formatted as a
// decimal string, without zero padding.
func RandomSuffix() string {
	n := rand.Intn(10000)
	return strconv.Itoa(n)
}
581
582
583 func StartCmdAndStreamOutput(cmd *exec.Cmd) (stdout, stderr io.ReadCloser, err error) {
584 stdout, err = cmd.StdoutPipe()
585 if err != nil {
586 return
587 }
588 stderr, err = cmd.StderrPipe()
589 if err != nil {
590 return
591 }
592 Logf("Asynchronously running '%s %s'", cmd.Path, strings.Join(cmd.Args, " "))
593 err = cmd.Start()
594 return
595 }
596
597
598 func TryKill(cmd *exec.Cmd) {
599 if err := cmd.Process.Kill(); err != nil {
600 Logf("ERROR failed to kill command %v! The process may leak", cmd)
601 }
602 }
603
604
605
606 func EnsureLoadBalancerResourcesDeleted(ctx context.Context, ip, portRange string) error {
607 return TestContext.CloudConfig.Provider.EnsureLoadBalancerResourcesDeleted(ctx, ip, portRange)
608 }
609
610
611
612 func CoreDump(dir string) {
613 if TestContext.DisableLogDump {
614 Logf("Skipping dumping logs from cluster")
615 return
616 }
617 var cmd *exec.Cmd
618 if TestContext.LogexporterGCSPath != "" {
619 Logf("Dumping logs from nodes to GCS directly at path: %s", TestContext.LogexporterGCSPath)
620 cmd = exec.Command(path.Join(TestContext.RepoRoot, "cluster", "log-dump", "log-dump.sh"), dir, TestContext.LogexporterGCSPath)
621 } else {
622 Logf("Dumping logs locally to: %s", dir)
623 cmd = exec.Command(path.Join(TestContext.RepoRoot, "cluster", "log-dump", "log-dump.sh"), dir)
624 }
625 cmd.Env = append(os.Environ(), fmt.Sprintf("LOG_DUMP_SYSTEMD_SERVICES=%s", parseSystemdServices(TestContext.SystemdServices)))
626 cmd.Env = append(os.Environ(), fmt.Sprintf("LOG_DUMP_SYSTEMD_JOURNAL=%v", TestContext.DumpSystemdJournal))
627
628 cmd.Stdout = os.Stdout
629 cmd.Stderr = os.Stderr
630 if err := cmd.Run(); err != nil {
631 Logf("Error running cluster/log-dump/log-dump.sh: %v", err)
632 }
633 }
634
635
// parseSystemdServices turns a comma-separated list into a space-separated
// one: every comma becomes a single space, then leading and trailing
// whitespace is trimmed.
func parseSystemdServices(services string) string {
	spaced := strings.ReplaceAll(services, ",", " ")
	return strings.TrimSpace(spaced)
}
639
640
641
642 func RunCmd(command string, args ...string) (string, string, error) {
643 return RunCmdEnv(nil, command, args...)
644 }
645
646
647
648
649 func RunCmdEnv(env []string, command string, args ...string) (string, string, error) {
650 Logf("Running %s %v", command, args)
651 var bout, berr bytes.Buffer
652 cmd := exec.Command(command, args...)
653
654
655
656
657
658 cmd.Stdout = io.MultiWriter(os.Stdout, &bout)
659 cmd.Stderr = io.MultiWriter(os.Stderr, &berr)
660 cmd.Env = env
661 err := cmd.Run()
662 stdout, stderr := bout.String(), berr.String()
663 if err != nil {
664 return "", "", fmt.Errorf("error running %s %v; got error %v, stdout %q, stderr %q",
665 command, args, err, stdout, stderr)
666 }
667 return stdout, stderr, nil
668 }
669
670
671
672 func getControlPlaneAddresses(ctx context.Context, c clientset.Interface) ([]string, []string, []string) {
673 var externalIPs, internalIPs, hostnames []string
674
675
676 eps, err := c.CoreV1().Endpoints(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{})
677 if err != nil {
678 Failf("Failed to get kubernetes endpoints: %v", err)
679 }
680 for _, subset := range eps.Subsets {
681 for _, address := range subset.Addresses {
682 if address.IP != "" {
683 internalIPs = append(internalIPs, address.IP)
684 }
685 }
686 }
687
688
689 hostURL, err := url.Parse(TestContext.Host)
690 if err != nil {
691 Failf("Failed to parse hostname: %v", err)
692 }
693 if netutils.ParseIPSloppy(hostURL.Host) != nil {
694 externalIPs = append(externalIPs, hostURL.Host)
695 } else {
696 hostnames = append(hostnames, hostURL.Host)
697 }
698
699 return externalIPs, internalIPs, hostnames
700 }
701
702
703
704
705
706 func GetControlPlaneAddresses(ctx context.Context, c clientset.Interface) []string {
707 externalIPs, internalIPs, _ := getControlPlaneAddresses(ctx, c)
708
709 ips := sets.NewString()
710 switch TestContext.Provider {
711 case "gce", "gke":
712 for _, ip := range externalIPs {
713 ips.Insert(ip)
714 }
715 for _, ip := range internalIPs {
716 ips.Insert(ip)
717 }
718 case "aws":
719 ips.Insert(awsMasterIP)
720 default:
721 Failf("This test is not supported for provider %s and should be disabled", TestContext.Provider)
722 }
723 return ips.List()
724 }
725
726
727 func PrettyPrintJSON(metrics interface{}) string {
728 output := &bytes.Buffer{}
729 if err := json.NewEncoder(output).Encode(metrics); err != nil {
730 Logf("Error building encoder: %v", err)
731 return ""
732 }
733 formatted := &bytes.Buffer{}
734 if err := json.Indent(formatted, output.Bytes(), "", " "); err != nil {
735 Logf("Error indenting: %v", err)
736 return ""
737 }
738 return formatted.String()
739 }
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
// WatchEventSequenceVerifier runs scenario against a fresh watch and checks
// that the events it returns contain every expected event type.
//
// Each attempt (at most three):
//  1. lists the resource with listOptions to obtain a starting resource version;
//  2. creates a RetryWatcher from that version and hands it to scenario, which
//     returns the events it observed;
//  3. asserts that at least len(expectedWatchEvents) events were observed;
//  4. pairs every expected event with the first not-yet-claimed observed event
//     of the same Type (only Type is compared), recording each miss;
//  5. calls retryCleanup and asserts that it succeeded.
//
// If any expected event was missed, the attempt is retried; after the last
// attempt the test is failed through Failf.
//
// NOTE(review): resourceName is not used in this function body.
// NOTE(review): the watcher created in each attempt is not stopped here;
// confirm whether scenario is expected to stop it.
func WatchEventSequenceVerifier(ctx context.Context, dc dynamic.Interface, resourceType schema.GroupVersionResource, namespace string, resourceName string, listOptions metav1.ListOptions, expectedWatchEvents []watch.Event, scenario func(*watchtools.RetryWatcher) []watch.Event, retryCleanup func() error) {
	// Only WatchFunc is set on this ListWatch; the initial list is done
	// explicitly at the top of every attempt below.
	listWatcher := &cache.ListWatch{
		WatchFunc: func(listOptions metav1.ListOptions) (watch.Interface, error) {
			return dc.Resource(resourceType).Namespace(namespace).Watch(ctx, listOptions)
		},
	}

	retries := 3
retriesLoop:
	for try := 1; try <= retries; try++ {
		initResource, err := dc.Resource(resourceType).Namespace(namespace).List(ctx, listOptions)
		ExpectNoError(err, "Failed to fetch initial resource")

		// Start watching from the resource version of the list just taken.
		resourceWatch, err := watchtools.NewRetryWatcher(initResource.GetResourceVersion(), listWatcher)
		ExpectNoError(err, "Failed to create a resource watch of %v in namespace %v", resourceType.Resource, namespace)

		// scenario drives the test and reports the events it saw on the watch.
		actualWatchEvents := scenario(resourceWatch)
		errs := sets.NewString()
		gomega.Expect(len(expectedWatchEvents)).To(gomega.BeNumerically("<=", len(actualWatchEvents)), "Did not get enough watch events")

		totalValidWatchEvents := 0
		// Keyed by index into actualWatchEvents; a non-nil entry marks that
		// observed event as already claimed by an expected event.
		foundEventIndexes := map[int]*int{}

		for watchEventIndex, expectedWatchEvent := range expectedWatchEvents {
			foundExpectedWatchEvent := false
		actualWatchEventsLoop:
			for actualWatchEventIndex, actualWatchEvent := range actualWatchEvents {
				if foundEventIndexes[actualWatchEventIndex] != nil {
					continue actualWatchEventsLoop
				}
				if actualWatchEvent.Type == expectedWatchEvent.Type {
					foundExpectedWatchEvent = true
					foundEventIndexes[actualWatchEventIndex] = &watchEventIndex
					break actualWatchEventsLoop
				}
			}
			if !foundExpectedWatchEvent {
				errs.Insert(fmt.Sprintf("Watch event %v not found", expectedWatchEvent.Type))
			}
			// NOTE(review): incremented for every expected event, found or
			// not, so it always ends equal to len(expectedWatchEvents).
			totalValidWatchEvents++
		}
		// Cleanup runs on every attempt, before deciding to retry or fail.
		err = retryCleanup()
		ExpectNoError(err, "Error occurred when cleaning up resources")
		if errs.Len() > 0 && try < retries {
			fmt.Println("invariants violated:\n", strings.Join(errs.List(), "\n - "))
			continue retriesLoop
		}
		if errs.Len() > 0 {
			Failf("Unexpected error(s): %v", strings.Join(errs.List(), "\n - "))
		}
		gomega.Expect(expectedWatchEvents).To(gomega.HaveLen(totalValidWatchEvents), "Error: there must be an equal amount of total valid watch events (%d) and expected watch events (%d)", totalValidWatchEvents, len(expectedWatchEvents))
		break retriesLoop
	}
}
815
View as plain text