1
16
17 package reconcilers
18
19
23
24 import (
25 "reflect"
26 "sort"
27 "testing"
28 "time"
29
30 "github.com/google/uuid"
31 corev1 "k8s.io/api/core/v1"
32 "k8s.io/apimachinery/pkg/api/apitesting"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/runtime"
35 "k8s.io/apimachinery/pkg/runtime/schema"
36 "k8s.io/apimachinery/pkg/runtime/serializer"
37 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
38 "k8s.io/apiserver/pkg/storage"
39 etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
40 "k8s.io/apiserver/pkg/storage/storagebackend/factory"
41 "k8s.io/client-go/kubernetes/fake"
42 "k8s.io/kubernetes/pkg/apis/core"
43 netutils "k8s.io/utils/net"
44 )
45
46 func init() {
47 var scheme = runtime.NewScheme()
48
49 metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
50 utilruntime.Must(core.AddToScheme(scheme))
51 utilruntime.Must(corev1.AddToScheme(scheme))
52 utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))
53
54 codecs = serializer.NewCodecFactory(scheme)
55 }
56
57 var codecs serializer.CodecFactory
58
59 type fakeLeases struct {
60 storageLeases
61 }
62
63 var _ Leases = &fakeLeases{}
64
65 func newFakeLeases(t *testing.T, s storage.Interface) *fakeLeases {
66
67
68
69
70
71 base := "/" + uuid.New().String() + "/masterleases/"
72 return &fakeLeases{
73 storageLeases{
74 storage: s,
75 destroyFn: func() {},
76 baseKey: base,
77 leaseTime: 1 * time.Minute,
78 },
79 }
80 }
81
82 func (f *fakeLeases) SetKeys(keys []string) error {
83 for _, ip := range keys {
84 if err := f.UpdateLease(ip); err != nil {
85 return err
86 }
87 }
88 return nil
89 }
90
91 func TestLeaseEndpointReconciler(t *testing.T) {
92 server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
93 t.Cleanup(func() { server.Terminate(t) })
94
95 newFunc := func() runtime.Object { return &corev1.Endpoints{} }
96 newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
97 sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
98
99 s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
100 if err != nil {
101 t.Fatalf("Error creating storage: %v", err)
102 }
103 t.Cleanup(dFunc)
104
105 reconcileTests := []struct {
106 testName string
107 serviceName string
108 ip string
109 endpointPorts []corev1.EndpointPort
110 endpointKeys []string
111 initialState []runtime.Object
112 expectUpdate []runtime.Object
113 expectCreate []runtime.Object
114 expectLeases []string
115 }{
116 {
117 testName: "no existing endpoints",
118 serviceName: "foo",
119 ip: "1.2.3.4",
120 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
121 initialState: nil,
122 expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
123 expectLeases: []string{"1.2.3.4"},
124 },
125 {
126 testName: "existing endpoints satisfy",
127 serviceName: "foo",
128 ip: "1.2.3.4",
129 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
130 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
131 expectLeases: []string{"1.2.3.4"},
132 },
133 {
134 testName: "existing endpoints satisfy, no endpointslice",
135 serviceName: "foo",
136 ip: "1.2.3.4",
137 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
138 initialState: []runtime.Object{
139 makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
140 },
141 expectCreate: []runtime.Object{
142 makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
143 },
144 expectLeases: []string{"1.2.3.4"},
145 },
146 {
147 testName: "existing endpointslice satisfies, no endpoints",
148 serviceName: "foo",
149 ip: "1.2.3.4",
150 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
151 initialState: []runtime.Object{
152 makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
153 },
154 expectCreate: []runtime.Object{
155 makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
156 },
157 expectLeases: []string{"1.2.3.4"},
158 },
159 {
160 testName: "existing endpoints satisfy, endpointslice is wrong",
161 serviceName: "foo",
162 ip: "1.2.3.4",
163 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
164 initialState: []runtime.Object{
165 makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
166 makeEndpointSlice("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
167 },
168 expectUpdate: []runtime.Object{
169 makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
170 },
171 expectLeases: []string{"1.2.3.4"},
172 },
173 {
174 testName: "existing endpointslice satisfies, endpoints is wrong",
175 serviceName: "foo",
176 ip: "1.2.3.4",
177 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
178 initialState: []runtime.Object{
179 makeEndpoints("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
180 makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
181 },
182 expectUpdate: []runtime.Object{
183 makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
184 },
185 expectLeases: []string{"1.2.3.4"},
186 },
187 {
188 testName: "existing endpoints satisfy + refresh existing key",
189 serviceName: "foo",
190 ip: "1.2.3.4",
191 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
192 endpointKeys: []string{"1.2.3.4"},
193 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
194 expectLeases: []string{"1.2.3.4"},
195 },
196 {
197 testName: "existing endpoints satisfy but too many",
198 serviceName: "foo",
199 ip: "1.2.3.4",
200 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
201 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
202 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
203 expectLeases: []string{"1.2.3.4"},
204 },
205 {
206 testName: "existing endpoints satisfy but too many + extra masters",
207 serviceName: "foo",
208 ip: "1.2.3.4",
209 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
210 endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
211 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
212 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
213 expectLeases: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
214 },
215 {
216 testName: "existing endpoints satisfy but too many + extra masters + delete first",
217 serviceName: "foo",
218 ip: "4.3.2.4",
219 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
220 endpointKeys: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
221 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
222 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
223 expectLeases: []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
224 },
225 {
226 testName: "existing endpoints current IP missing",
227 serviceName: "foo",
228 ip: "4.3.2.2",
229 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
230 endpointKeys: []string{"4.3.2.1"},
231 initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
232 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
233 expectLeases: []string{"4.3.2.1", "4.3.2.2"},
234 },
235 {
236 testName: "existing endpoints wrong name",
237 serviceName: "foo",
238 ip: "1.2.3.4",
239 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
240 initialState: makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
241 expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
242 expectLeases: []string{"1.2.3.4"},
243 },
244 {
245 testName: "existing endpoints wrong IP",
246 serviceName: "foo",
247 ip: "1.2.3.4",
248 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
249 initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
250 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
251 expectLeases: []string{"1.2.3.4"},
252 },
253 {
254 testName: "existing endpoints wrong port",
255 serviceName: "foo",
256 ip: "1.2.3.4",
257 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
258 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}),
259 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
260 expectLeases: []string{"1.2.3.4"},
261 },
262 {
263 testName: "existing endpoints wrong protocol",
264 serviceName: "foo",
265 ip: "1.2.3.4",
266 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
267 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}),
268 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
269 expectLeases: []string{"1.2.3.4"},
270 },
271 {
272 testName: "existing endpoints wrong port name",
273 serviceName: "foo",
274 ip: "1.2.3.4",
275 endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
276 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
277 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}),
278 expectLeases: []string{"1.2.3.4"},
279 },
280 {
281 testName: "existing endpoints without skip mirror label",
282 serviceName: "foo",
283 ip: "1.2.3.4",
284 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
285 initialState: []runtime.Object{
286
287
288 &corev1.Endpoints{
289 ObjectMeta: metav1.ObjectMeta{
290 Namespace: metav1.NamespaceDefault,
291 Name: "foo",
292 },
293 Subsets: []corev1.EndpointSubset{{
294 Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
295 Ports: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
296 }},
297 },
298 makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
299 },
300 expectUpdate: []runtime.Object{
301 makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
302
303 },
304 expectLeases: []string{"1.2.3.4"},
305 },
306 {
307 testName: "existing endpoints extra service ports satisfy",
308 serviceName: "foo",
309 ip: "1.2.3.4",
310 endpointPorts: []corev1.EndpointPort{
311 {Name: "foo", Port: 8080, Protocol: "TCP"},
312 {Name: "bar", Port: 1000, Protocol: "TCP"},
313 {Name: "baz", Port: 1010, Protocol: "TCP"},
314 },
315 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"},
316 []corev1.EndpointPort{
317 {Name: "foo", Port: 8080, Protocol: "TCP"},
318 {Name: "bar", Port: 1000, Protocol: "TCP"},
319 {Name: "baz", Port: 1010, Protocol: "TCP"},
320 },
321 ),
322 expectLeases: []string{"1.2.3.4"},
323 },
324 {
325 testName: "existing endpoints extra service ports missing port",
326 serviceName: "foo",
327 ip: "1.2.3.4",
328 endpointPorts: []corev1.EndpointPort{
329 {Name: "foo", Port: 8080, Protocol: "TCP"},
330 {Name: "bar", Port: 1000, Protocol: "TCP"},
331 },
332 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
333 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"},
334 []corev1.EndpointPort{
335 {Name: "foo", Port: 8080, Protocol: "TCP"},
336 {Name: "bar", Port: 1000, Protocol: "TCP"},
337 },
338 ),
339 expectLeases: []string{"1.2.3.4"},
340 },
341 }
342 for _, test := range reconcileTests {
343 t.Run(test.testName, func(t *testing.T) {
344 fakeLeases := newFakeLeases(t, s)
345 err := fakeLeases.SetKeys(test.endpointKeys)
346 if err != nil {
347 t.Errorf("unexpected error creating keys: %v", err)
348 }
349 clientset := fake.NewSimpleClientset(test.initialState...)
350
351 epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
352 r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
353 err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true)
354 if err != nil {
355 t.Errorf("unexpected error reconciling: %v", err)
356 }
357
358 err = verifyCreatesAndUpdates(clientset, test.expectCreate, test.expectUpdate)
359 if err != nil {
360 t.Errorf("unexpected error in side effects: %v", err)
361 }
362
363 leases, err := fakeLeases.ListLeases()
364 if err != nil {
365 t.Errorf("unexpected error: %v", err)
366 }
367
368 sort.Strings(leases)
369 sort.Strings(test.expectLeases)
370 if !reflect.DeepEqual(leases, test.expectLeases) {
371 t.Errorf("expected %v got: %v", test.expectLeases, leases)
372 }
373 })
374 }
375
376 nonReconcileTests := []struct {
377 testName string
378 serviceName string
379 ip string
380 endpointPorts []corev1.EndpointPort
381 endpointKeys []string
382 initialState []runtime.Object
383 expectUpdate []runtime.Object
384 expectCreate []runtime.Object
385 expectLeases []string
386 }{
387 {
388 testName: "existing endpoints extra service ports missing port no update",
389 serviceName: "foo",
390 ip: "1.2.3.4",
391 endpointPorts: []corev1.EndpointPort{
392 {Name: "foo", Port: 8080, Protocol: "TCP"},
393 {Name: "bar", Port: 1000, Protocol: "TCP"},
394 },
395 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
396 expectUpdate: nil,
397 expectLeases: []string{"1.2.3.4"},
398 },
399 {
400 testName: "existing endpoints extra service ports, wrong ports, wrong IP",
401 serviceName: "foo",
402 ip: "1.2.3.4",
403 endpointPorts: []corev1.EndpointPort{
404 {Name: "foo", Port: 8080, Protocol: "TCP"},
405 {Name: "bar", Port: 1000, Protocol: "TCP"},
406 },
407 initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
408 expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
409 expectLeases: []string{"1.2.3.4"},
410 },
411 {
412 testName: "no existing endpoints",
413 serviceName: "foo",
414 ip: "1.2.3.4",
415 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
416 initialState: nil,
417 expectCreate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
418 expectLeases: []string{"1.2.3.4"},
419 },
420 }
421 for _, test := range nonReconcileTests {
422 t.Run(test.testName, func(t *testing.T) {
423 fakeLeases := newFakeLeases(t, s)
424 err := fakeLeases.SetKeys(test.endpointKeys)
425 if err != nil {
426 t.Errorf("unexpected error creating keys: %v", err)
427 }
428 clientset := fake.NewSimpleClientset(test.initialState...)
429 epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
430 r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
431 err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
432 if err != nil {
433 t.Errorf("unexpected error reconciling: %v", err)
434 }
435
436 err = verifyCreatesAndUpdates(clientset, test.expectCreate, test.expectUpdate)
437 if err != nil {
438 t.Errorf("unexpected error in side effects: %v", err)
439 }
440
441 leases, err := fakeLeases.ListLeases()
442 if err != nil {
443 t.Errorf("unexpected error: %v", err)
444 }
445
446 sort.Strings(leases)
447 sort.Strings(test.expectLeases)
448 if !reflect.DeepEqual(leases, test.expectLeases) {
449 t.Errorf("expected %v got: %v", test.expectLeases, leases)
450 }
451 })
452 }
453 }
454
455 func TestLeaseRemoveEndpoints(t *testing.T) {
456 server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
457 t.Cleanup(func() { server.Terminate(t) })
458
459 newFunc := func() runtime.Object { return &corev1.Endpoints{} }
460 newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
461 sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
462
463 s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "")
464 if err != nil {
465 t.Fatalf("Error creating storage: %v", err)
466 }
467 t.Cleanup(dFunc)
468
469 stopTests := []struct {
470 testName string
471 serviceName string
472 ip string
473 endpointPorts []corev1.EndpointPort
474 endpointKeys []string
475 initialState []runtime.Object
476 expectUpdate []runtime.Object
477 expectLeases []string
478 apiServerStartup bool
479 }{
480 {
481 testName: "successful remove previous endpoints before apiserver starts",
482 serviceName: "foo",
483 ip: "1.2.3.4",
484 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
485 endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
486 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
487 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
488 expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
489 apiServerStartup: true,
490 },
491 {
492 testName: "successful stop reconciling",
493 serviceName: "foo",
494 ip: "1.2.3.4",
495 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
496 endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
497 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
498 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
499 expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
500 },
501 {
502 testName: "stop reconciling with ip not in endpoint ip list",
503 serviceName: "foo",
504 ip: "5.6.7.8",
505 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
506 endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
507 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
508 expectLeases: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
509 },
510 {
511 testName: "endpoint with no subset",
512 serviceName: "foo",
513 ip: "1.2.3.4",
514 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
515 endpointKeys: []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
516 initialState: makeEndpointsArray("foo", nil, nil),
517 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
518 expectLeases: []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
519 },
520 {
521 testName: "the last API server was shut down cleanly",
522 serviceName: "foo",
523 ip: "1.2.3.4",
524 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
525 endpointKeys: []string{"1.2.3.4"},
526 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
527 expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
528 expectLeases: []string{},
529 },
530 }
531 for _, test := range stopTests {
532 t.Run(test.testName, func(t *testing.T) {
533 fakeLeases := newFakeLeases(t, s)
534 err := fakeLeases.SetKeys(test.endpointKeys)
535 if err != nil {
536 t.Errorf("unexpected error creating keys: %v", err)
537 }
538 clientset := fake.NewSimpleClientset(test.initialState...)
539 epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
540 r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
541 if !test.apiServerStartup {
542 r.StopReconciling()
543 }
544 err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
545
546 if !contains(test.endpointKeys, test.ip) {
547 if !storage.IsNotFound(err) {
548 t.Errorf("expected error StorageError: key not found, Code: 1, Key: /registry/base/key/%s got: %v", test.ip, err)
549 }
550 } else if err != nil {
551 t.Errorf("unexpected error reconciling: %v", err)
552 }
553
554 err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate)
555 if err != nil {
556 t.Errorf("unexpected error in side effects: %v", err)
557 }
558
559 leases, err := fakeLeases.ListLeases()
560 if err != nil {
561 t.Errorf("unexpected error: %v", err)
562 }
563
564 sort.Strings(leases)
565 sort.Strings(test.expectLeases)
566 if !reflect.DeepEqual(leases, test.expectLeases) {
567 t.Errorf("expected %v got: %v", test.expectLeases, leases)
568 }
569 })
570 }
571 }
572
573 func contains(s []string, str string) bool {
574 for _, v := range s {
575 if v == str {
576 return true
577 }
578 }
579 return false
580 }
581
582 func TestApiserverShutdown(t *testing.T) {
583 server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
584 t.Cleanup(func() { server.Terminate(t) })
585
586 newFunc := func() runtime.Object { return &corev1.Endpoints{} }
587 newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
588 sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
589
590 s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
591 if err != nil {
592 t.Fatalf("Error creating storage: %v", err)
593 }
594 t.Cleanup(dFunc)
595
596 reconcileTests := []struct {
597 testName string
598 serviceName string
599 ip string
600 endpointPorts []corev1.EndpointPort
601 endpointKeys []string
602 initialState []runtime.Object
603 expectUpdate []runtime.Object
604 expectLeases []string
605 shutDownBeforeReconcile bool
606 }{
607 {
608 testName: "last apiserver shutdown after endpoint reconcile",
609 serviceName: "foo",
610 ip: "1.2.3.4",
611 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
612 endpointKeys: []string{"1.2.3.4"},
613 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
614 expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
615 expectLeases: []string{},
616 shutDownBeforeReconcile: false,
617 },
618 {
619 testName: "last apiserver shutdown before endpoint reconcile",
620 serviceName: "foo",
621 ip: "1.2.3.4",
622 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
623 endpointKeys: []string{"1.2.3.4"},
624 initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
625 expectUpdate: makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
626 expectLeases: []string{},
627 shutDownBeforeReconcile: true,
628 },
629 {
630 testName: "not the last apiserver which was shutdown before endpoint reconcile",
631 serviceName: "foo",
632 ip: "1.2.3.4",
633 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
634 endpointKeys: []string{"1.2.3.4", "4.3.2.1"},
635 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
636 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
637 expectLeases: []string{"4.3.2.1"},
638 shutDownBeforeReconcile: true,
639 },
640 {
641 testName: "not the last apiserver which was shutdown after endpoint reconcile",
642 serviceName: "foo",
643 ip: "1.2.3.4",
644 endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
645 endpointKeys: []string{"1.2.3.4", "4.3.2.1"},
646 initialState: makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
647 expectUpdate: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
648 expectLeases: []string{"4.3.2.1"},
649 shutDownBeforeReconcile: false,
650 },
651 }
652 for _, test := range reconcileTests {
653 t.Run(test.testName, func(t *testing.T) {
654 fakeLeases := newFakeLeases(t, s)
655 err := fakeLeases.SetKeys(test.endpointKeys)
656 if err != nil {
657 t.Errorf("unexpected error creating keys: %v", err)
658 }
659 clientset := fake.NewSimpleClientset(test.initialState...)
660
661 epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
662 r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
663
664 if test.shutDownBeforeReconcile {
665
666 r.StopReconciling()
667 err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
668 if err != nil {
669 t.Errorf("unexpected error remove endpoints: %v", err)
670 }
671
672
673 err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
674 if err != nil {
675 t.Errorf("unexpected error reconciling: %v", err)
676 }
677 } else {
678
679 err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
680 if err != nil {
681 t.Errorf("unexpected error reconciling: %v", err)
682 }
683
684 r.StopReconciling()
685 err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
686 if err != nil {
687 t.Errorf("unexpected error remove endpoints: %v", err)
688 }
689 }
690
691 err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate)
692 if err != nil {
693 t.Errorf("unexpected error in side effects: %v", err)
694 }
695
696 leases, err := fakeLeases.ListLeases()
697 if err != nil {
698 t.Errorf("unexpected error: %v", err)
699 }
700
701 sort.Strings(leases)
702 sort.Strings(test.expectLeases)
703 if !reflect.DeepEqual(leases, test.expectLeases) {
704 t.Errorf("expected %v got: %v", test.expectLeases, leases)
705 }
706 })
707 }
708 }
709
View as plain text