1
16
17 package apimachinery
18
19 import (
20 "context"
21 "fmt"
22 "os"
23 "sort"
24 "time"
25
26 "github.com/google/go-cmp/cmp"
27 "github.com/onsi/ginkgo/v2"
28
29 v1 "k8s.io/api/core/v1"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apimachinery/pkg/watch"
34 "k8s.io/client-go/tools/cache"
35 "k8s.io/kubernetes/test/e2e/feature"
36 "k8s.io/kubernetes/test/e2e/framework"
37 )
38
39 var _ = SIGDescribe("API Streaming (aka. WatchList)", framework.WithSerial(), feature.WatchList, func() {
40 f := framework.NewDefaultFramework("watchlist")
41 ginkgo.It("should be requested when ENABLE_CLIENT_GO_WATCH_LIST_ALPHA is set", func(ctx context.Context) {
42 prevWatchListEnvValue, wasWatchListEnvSet := os.LookupEnv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA")
43 os.Setenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA", "true")
44 defer func() {
45 if !wasWatchListEnvSet {
46 os.Unsetenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA")
47 return
48 }
49 os.Setenv("ENABLE_CLIENT_GO_WATCH_LIST_ALPHA", prevWatchListEnvValue)
50 }()
51 stopCh := make(chan struct{})
52 defer close(stopCh)
53 secretInformer := cache.NewSharedIndexInformer(
54 &cache.ListWatch{
55 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
56 return nil, fmt.Errorf("unexpected list call")
57 },
58 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
59 return f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Watch(context.TODO(), options)
60 },
61 },
62 &v1.Secret{},
63 time.Duration(0),
64 nil,
65 )
66
67 ginkgo.By(fmt.Sprintf("Adding 5 secrets to %s namespace", f.Namespace.Name))
68 for i := 1; i <= 5; i++ {
69 _, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
70 framework.ExpectNoError(err)
71 }
72
73 ginkgo.By("Starting the secret informer")
74 go secretInformer.Run(stopCh)
75
76 ginkgo.By("Waiting until the secret informer is fully synchronised")
77 err := wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, false, func(context.Context) (done bool, err error) {
78 return secretInformer.HasSynced(), nil
79 })
80 framework.ExpectNoError(err, "Failed waiting for the secret informer in %s namespace to be synced", f.Namespace.Namespace)
81
82 ginkgo.By("Verifying if the secret informer was properly synchronised")
83 verifyStore(ctx, f, secretInformer.GetStore())
84
85 ginkgo.By("Modifying a secret and checking if the update was picked up by the secret informer")
86 secret, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Get(ctx, "secret-1", metav1.GetOptions{})
87 framework.ExpectNoError(err)
88 secret.StringData = map[string]string{"foo": "bar"}
89 _, err = f.ClientSet.CoreV1().Secrets(f.Namespace.Name).Update(ctx, secret, metav1.UpdateOptions{})
90 framework.ExpectNoError(err)
91 verifyStore(ctx, f, secretInformer.GetStore())
92 })
93 })
94
95 func verifyStore(ctx context.Context, f *framework.Framework, store cache.Store) {
96 ginkgo.By(fmt.Sprintf("Listing secrets directly from the server from %s namespace", f.Namespace.Name))
97 expectedSecretsList, err := f.ClientSet.CoreV1().Secrets(f.Namespace.Name).List(ctx, metav1.ListOptions{})
98 framework.ExpectNoError(err)
99 expectedSecrets := expectedSecretsList.Items
100
101 err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, 30*time.Second, true, func(ctx context.Context) (done bool, err error) {
102 ginkgo.By("Comparing secrets retrieved directly from the server with the ones that have been streamed to the secret informer")
103 rawStreamedSecrets := store.List()
104 streamedSecrets := make([]v1.Secret, 0, len(rawStreamedSecrets))
105 for _, rawSecret := range rawStreamedSecrets {
106 streamedSecrets = append(streamedSecrets, *rawSecret.(*v1.Secret))
107 }
108 sort.Sort(byName(expectedSecrets))
109 sort.Sort(byName(streamedSecrets))
110 return cmp.Equal(expectedSecrets, streamedSecrets), nil
111 })
112 framework.ExpectNoError(err)
113 }
114
115 type byName []v1.Secret
116
117 func (a byName) Len() int { return len(a) }
118 func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
119 func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
120
121 func newSecret(name string) *v1.Secret {
122 return &v1.Secret{
123 ObjectMeta: metav1.ObjectMeta{Name: name},
124 }
125 }
126
View as plain text