1
16
17 package apimachinery
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23 "testing"
24 "time"
25
26 "github.com/stretchr/testify/require"
27
28 "github.com/google/go-cmp/cmp"
29 v1 "k8s.io/api/core/v1"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/runtime"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/apimachinery/pkg/watch"
34 "k8s.io/client-go/kubernetes"
35 "k8s.io/client-go/tools/cache"
36 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
37 "k8s.io/kubernetes/test/integration/framework"
38 "k8s.io/utils/ptr"
39 )
40
41 func TestReflectorWatchListFallback(t *testing.T) {
42 ctx := context.TODO()
43
44 t.Log("Starting etcd that will be used by two different instances of kube-apiserver")
45 etcdURL, etcdTearDownFn, err := framework.RunCustomEtcd("etcd_watchlist", []string{"--experimental-watch-progress-notify-interval", "1s"}, nil)
46 require.NoError(t, err)
47 defer etcdTearDownFn()
48 etcdOptions := framework.DefaultEtcdOptions()
49 etcdOptions.StorageConfig.Transport.ServerList = []string{etcdURL}
50
51 t.Log("Starting the first server with the WatchList feature enabled")
52 server1 := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--feature-gates=WatchList=true"}, &etcdOptions.StorageConfig)
53 defer server1.TearDownFn()
54 clientSet, err := kubernetes.NewForConfig(server1.ClientConfig)
55 require.NoError(t, err)
56
57 ns := framework.CreateNamespaceOrDie(clientSet, "reflector-fallback-watchlist", t)
58 defer framework.DeleteNamespaceOrDie(clientSet, ns, t)
59
60 t.Logf("Adding 5 secrets to %s namespace", ns.Name)
61 for i := 1; i <= 5; i++ {
62 _, err := clientSet.CoreV1().Secrets(ns.Name).Create(ctx, newSecret(fmt.Sprintf("secret-%d", i)), metav1.CreateOptions{})
63 require.NoError(t, err)
64 }
65
66 t.Log("Creating a secret reflector that will use the WatchList feature to synchronise the store")
67 store := &wrappedStore{Store: cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc)}
68 lw := &wrappedListWatch{&cache.ListWatch{}}
69 lw.SetClient(ctx, clientSet, ns)
70 target := cache.NewReflector(lw, &v1.Secret{}, store, time.Duration(0))
71 target.UseWatchList = ptr.To(true)
72
73 t.Log("Waiting until the secret reflector synchronises to the store (call to the Replace method)")
74 reflectorCtx, reflectorCtxCancel := context.WithCancel(context.Background())
75 defer reflectorCtxCancel()
76 store.setCancelOnReplace(reflectorCtxCancel)
77 err = target.ListAndWatch(reflectorCtx.Done())
78 require.NoError(t, err)
79
80 t.Log("Verifying if the secret reflector was properly synchronised")
81 verifyStore(t, ctx, clientSet, store, ns)
82
83 t.Log("Starting the second server with the WatchList feature disabled")
84 server2 := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--feature-gates=WatchList=false"}, &etcdOptions.StorageConfig)
85 defer server2.TearDownFn()
86 clientSet2, err := kubernetes.NewForConfig(server2.ClientConfig)
87 require.NoError(t, err)
88
89 t.Log("Pointing the ListWatcher used by the secret reflector to the second server (with the WatchList feature disabled)")
90 lw.SetClient(ctx, clientSet2, ns)
91 reflectorCtx, reflectorCtxCancel = context.WithCancel(context.Background())
92 defer reflectorCtxCancel()
93 store.setCancelOnReplace(reflectorCtxCancel)
94 err = target.ListAndWatch(reflectorCtx.Done())
95 require.NoError(t, err)
96
97 t.Log("Verifying if the secret reflector was properly synchronised")
98 verifyStore(t, ctx, clientSet, store, ns)
99 }
100
101
102 func verifyStore(t *testing.T, ctx context.Context, clientSet kubernetes.Interface, store cache.Store, namespace *v1.Namespace) {
103 t.Logf("Listing secrets directly from the server from %s namespace", namespace.Name)
104 expectedSecretsList, err := clientSet.CoreV1().Secrets(namespace.Name).List(ctx, metav1.ListOptions{})
105 require.NoError(t, err)
106 expectedSecrets := expectedSecretsList.Items
107
108 err = wait.PollUntilContextTimeout(ctx, 100*time.Millisecond, wait.ForeverTestTimeout, true, func(ctx context.Context) (done bool, err error) {
109 t.Log("Comparing secrets retrieved directly from the server with the ones that have been streamed to the secret reflector")
110 rawStreamedSecrets := store.List()
111 streamedSecrets := make([]v1.Secret, 0, len(rawStreamedSecrets))
112 for _, rawSecret := range rawStreamedSecrets {
113 streamedSecrets = append(streamedSecrets, *rawSecret.(*v1.Secret))
114 }
115 sort.Sort(byName(expectedSecrets))
116 sort.Sort(byName(streamedSecrets))
117 return cmp.Equal(expectedSecrets, streamedSecrets), nil
118 })
119 require.NoError(t, err)
120 }
121
122 type byName []v1.Secret
123
124 func (a byName) Len() int { return len(a) }
125 func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name }
126 func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
127
128 func newSecret(name string) *v1.Secret {
129 return &v1.Secret{
130 ObjectMeta: metav1.ObjectMeta{Name: name},
131 }
132 }
133
134 type wrappedStore struct {
135 cache.Store
136 ctxCancel context.CancelFunc
137 }
138
139 func (s *wrappedStore) Replace(items []interface{}, rv string) error {
140 s.ctxCancel()
141 return s.Store.Replace(items, rv)
142 }
143
144 func (s *wrappedStore) setCancelOnReplace(ctxCancel context.CancelFunc) {
145 s.ctxCancel = ctxCancel
146 }
147
148 type wrappedListWatch struct {
149 *cache.ListWatch
150 }
151
152 func (lw *wrappedListWatch) SetClient(ctx context.Context, clientSet kubernetes.Interface, ns *v1.Namespace) {
153 lw.ListFunc = func(options metav1.ListOptions) (runtime.Object, error) {
154 return clientSet.CoreV1().Secrets(ns.Name).List(ctx, options)
155 }
156 lw.WatchFunc = func(options metav1.ListOptions) (watch.Interface, error) {
157 return clientSet.CoreV1().Secrets(ns.Name).Watch(ctx, options)
158 }
159 }
160
View as plain text