1
16
17 package apiserver
18
19 import (
20 "context"
21 "fmt"
22 "sync"
23 "testing"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/wait"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/kubernetes/cmd/kube-apiserver/app/options"
31 "k8s.io/kubernetes/pkg/controlplane"
32 "k8s.io/kubernetes/pkg/controlplane/reconcilers"
33 "k8s.io/kubernetes/test/integration/framework"
34 "k8s.io/kubernetes/test/utils/ktesting"
35 )
36
37
38
39 func multiEtcdSetup(ctx context.Context, t *testing.T) (clientset.Interface, framework.TearDownFunc) {
40 etcdArgs := []string{"--experimental-watch-progress-notify-interval", "1s"}
41 etcd0URL, stopEtcd0, err := framework.RunCustomEtcd("etcd_watchcache0", etcdArgs, nil)
42 if err != nil {
43 t.Fatalf("Couldn't start etcd: %v", err)
44 }
45
46 etcd1URL, stopEtcd1, err := framework.RunCustomEtcd("etcd_watchcache1", etcdArgs, nil)
47 if err != nil {
48 t.Fatalf("Couldn't start etcd: %v", err)
49 }
50
51 etcdOptions := framework.DefaultEtcdOptions()
52
53 etcdOptions.StorageConfig.Transport.ServerList = []string{etcd0URL}
54 etcdOptions.EtcdServersOverrides = []string{fmt.Sprintf("/events#%s", etcd1URL)}
55 etcdOptions.EnableWatchCache = true
56
57 clientSet, _, tearDownFn := framework.StartTestServer(ctx, t, framework.TestServerSetup{
58 ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
59
60 opts.Etcd = etcdOptions
61 },
62 ModifyServerConfig: func(config *controlplane.Config) {
63
64 config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
65 },
66 })
67
68 closeFn := func() {
69 tearDownFn()
70 stopEtcd1()
71 stopEtcd0()
72 }
73
74
75
76
77
78 if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
79 _, err := clientSet.CoreV1().Services("default").Get(ctx, "kubernetes", metav1.GetOptions{})
80 return err == nil, nil
81 }); err != nil {
82 t.Fatalf("Failed to wait for kubernetes service: %v:", err)
83 }
84 return clientSet, closeFn
85 }
86
87 func TestWatchCacheUpdatedByEtcd(t *testing.T) {
88 tCtx := ktesting.Init(t)
89 c, closeFn := multiEtcdSetup(tCtx, t)
90 defer closeFn()
91
92 makeConfigMap := func(name string) *v1.ConfigMap {
93 return &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: name}}
94 }
95 makeSecret := func(name string) *v1.Secret {
96 return &v1.Secret{ObjectMeta: metav1.ObjectMeta{Name: name}}
97 }
98 makeEvent := func(name string) *v1.Event {
99 return &v1.Event{ObjectMeta: metav1.ObjectMeta{Name: name}}
100 }
101
102 cm, err := c.CoreV1().ConfigMaps("default").Create(tCtx, makeConfigMap("name"), metav1.CreateOptions{})
103 if err != nil {
104 t.Errorf("Couldn't create configmap: %v", err)
105 }
106 ev, err := c.CoreV1().Events("default").Create(tCtx, makeEvent("name"), metav1.CreateOptions{})
107 if err != nil {
108 t.Errorf("Couldn't create event: %v", err)
109 }
110
111 listOptions := metav1.ListOptions{
112 ResourceVersion: "0",
113 ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
114 }
115
116
117
118 t.Logf("Waiting for configmaps watchcache synced to %s", cm.ResourceVersion)
119 if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
120 res, err := c.CoreV1().ConfigMaps("default").List(tCtx, listOptions)
121 if err != nil {
122 return false, nil
123 }
124 return res.ResourceVersion == cm.ResourceVersion, nil
125 }); err != nil {
126 t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
127 }
128 t.Logf("Waiting for events watchcache synced to %s", ev.ResourceVersion)
129 if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
130 res, err := c.CoreV1().Events("default").List(tCtx, listOptions)
131 if err != nil {
132 return false, nil
133 }
134 return res.ResourceVersion == ev.ResourceVersion, nil
135 }); err != nil {
136 t.Errorf("Failed to wait for events watchcache synced: %v", err)
137 }
138
139
140
141 se, err := c.CoreV1().Secrets("default").Create(tCtx, makeSecret("name"), metav1.CreateOptions{})
142 if err != nil {
143 t.Errorf("Couldn't create secret: %v", err)
144 }
145
146 t.Logf("Waiting for configmaps watchcache synced to %s", se.ResourceVersion)
147 if err := wait.Poll(100*time.Millisecond, wait.ForeverTestTimeout, func() (bool, error) {
148 res, err := c.CoreV1().ConfigMaps("default").List(tCtx, listOptions)
149 if err != nil {
150 return false, nil
151 }
152 return res.ResourceVersion == se.ResourceVersion, nil
153 }); err != nil {
154 t.Errorf("Failed to wait for configmaps watchcache synced: %v", err)
155 }
156 t.Logf("Waiting for events watchcache NOT synced to %s", se.ResourceVersion)
157 if err := wait.Poll(100*time.Millisecond, 5*time.Second, func() (bool, error) {
158 res, err := c.CoreV1().Events("default").List(tCtx, listOptions)
159 if err != nil {
160 return false, nil
161 }
162 return res.ResourceVersion == se.ResourceVersion, nil
163 }); err == nil || err != wait.ErrWaitTimeout {
164 t.Errorf("Events watchcache unexpected synced: %v", err)
165 }
166 }
167
168 func BenchmarkListFromWatchCache(b *testing.B) {
169 tCtx := ktesting.Init(b)
170 c, _, tearDownFn := framework.StartTestServer(tCtx, b, framework.TestServerSetup{
171 ModifyServerConfig: func(config *controlplane.Config) {
172
173 config.ExtraConfig.EndpointReconcilerType = reconcilers.NoneEndpointReconcilerType
174 },
175 })
176 defer tearDownFn()
177
178 namespaces, secretsPerNamespace := 100, 1000
179 wg := sync.WaitGroup{}
180
181 errCh := make(chan error, namespaces)
182 for i := 0; i < namespaces; i++ {
183 wg.Add(1)
184 index := i
185 go func() {
186 defer wg.Done()
187
188 ns := &v1.Namespace{
189 ObjectMeta: metav1.ObjectMeta{Name: fmt.Sprintf("namespace-%d", index)},
190 }
191 ns, err := c.CoreV1().Namespaces().Create(tCtx, ns, metav1.CreateOptions{})
192 if err != nil {
193 errCh <- err
194 return
195 }
196
197 for j := 0; j < secretsPerNamespace; j++ {
198 secret := &v1.Secret{
199 ObjectMeta: metav1.ObjectMeta{
200 Name: fmt.Sprintf("secret-%d", j),
201 },
202 }
203 _, err := c.CoreV1().Secrets(ns.Name).Create(tCtx, secret, metav1.CreateOptions{})
204 if err != nil {
205 errCh <- err
206 return
207 }
208 }
209 }()
210 }
211
212 wg.Wait()
213 close(errCh)
214 for err := range errCh {
215 b.Error(err)
216 }
217
218 b.ResetTimer()
219
220 opts := metav1.ListOptions{
221 ResourceVersion: "0",
222 }
223 for i := 0; i < b.N; i++ {
224 secrets, err := c.CoreV1().Secrets("").List(tCtx, opts)
225 if err != nil {
226 b.Errorf("failed to list secrets: %v", err)
227 }
228 b.Logf("Number of secrets: %d", len(secrets.Items))
229 }
230 }
231
View as plain text