1
16
17 package config
18
19 import (
20 "reflect"
21 "sort"
22 "sync"
23 "testing"
24 "time"
25
26 "k8s.io/api/core/v1"
27 discoveryv1 "k8s.io/api/discovery/v1"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/types"
30 "k8s.io/apimachinery/pkg/util/wait"
31 "k8s.io/apimachinery/pkg/watch"
32 informers "k8s.io/client-go/informers"
33 "k8s.io/client-go/kubernetes/fake"
34 ktesting "k8s.io/client-go/testing"
35 "k8s.io/utils/ptr"
36 )
37
38 type sortedServices []*v1.Service
39
40 func (s sortedServices) Len() int {
41 return len(s)
42 }
43 func (s sortedServices) Swap(i, j int) {
44 s[i], s[j] = s[j], s[i]
45 }
46 func (s sortedServices) Less(i, j int) bool {
47 return s[i].Name < s[j].Name
48 }
49
50 type ServiceHandlerMock struct {
51 lock sync.Mutex
52
53 state map[types.NamespacedName]*v1.Service
54 synced bool
55 updated chan []*v1.Service
56 process func([]*v1.Service)
57 }
58
59 func NewServiceHandlerMock() *ServiceHandlerMock {
60 shm := &ServiceHandlerMock{
61 state: make(map[types.NamespacedName]*v1.Service),
62 updated: make(chan []*v1.Service, 5),
63 }
64 shm.process = func(services []*v1.Service) {
65 shm.updated <- services
66 }
67 return shm
68 }
69
70 func (h *ServiceHandlerMock) OnServiceAdd(service *v1.Service) {
71 h.lock.Lock()
72 defer h.lock.Unlock()
73 namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
74 h.state[namespacedName] = service
75 h.sendServices()
76 }
77
78 func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *v1.Service) {
79 h.lock.Lock()
80 defer h.lock.Unlock()
81 namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
82 h.state[namespacedName] = service
83 h.sendServices()
84 }
85
86 func (h *ServiceHandlerMock) OnServiceDelete(service *v1.Service) {
87 h.lock.Lock()
88 defer h.lock.Unlock()
89 namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
90 delete(h.state, namespacedName)
91 h.sendServices()
92 }
93
94 func (h *ServiceHandlerMock) OnServiceSynced() {
95 h.lock.Lock()
96 defer h.lock.Unlock()
97 h.synced = true
98 h.sendServices()
99 }
100
101 func (h *ServiceHandlerMock) sendServices() {
102 if !h.synced {
103 return
104 }
105 services := make([]*v1.Service, 0, len(h.state))
106 for _, svc := range h.state {
107 services = append(services, svc)
108 }
109 sort.Sort(sortedServices(services))
110 h.process(services)
111 }
112
113 func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*v1.Service) {
114
115
116
117 var services []*v1.Service
118 for {
119 select {
120 case services = <-h.updated:
121 if reflect.DeepEqual(services, expectedServices) {
122 return
123 }
124
125
126 case <-time.After(wait.ForeverTestTimeout):
127 t.Errorf("Timed out. Expected %#v, Got %#v", expectedServices, services)
128 return
129 }
130 }
131 }
132
133 type sortedEndpointSlices []*discoveryv1.EndpointSlice
134
135 func (s sortedEndpointSlices) Len() int {
136 return len(s)
137 }
138 func (s sortedEndpointSlices) Swap(i, j int) {
139 s[i], s[j] = s[j], s[i]
140 }
141 func (s sortedEndpointSlices) Less(i, j int) bool {
142 return s[i].Name < s[j].Name
143 }
144
145 type EndpointSliceHandlerMock struct {
146 lock sync.Mutex
147
148 state map[types.NamespacedName]*discoveryv1.EndpointSlice
149 synced bool
150 updated chan []*discoveryv1.EndpointSlice
151 process func([]*discoveryv1.EndpointSlice)
152 }
153
154 func NewEndpointSliceHandlerMock() *EndpointSliceHandlerMock {
155 ehm := &EndpointSliceHandlerMock{
156 state: make(map[types.NamespacedName]*discoveryv1.EndpointSlice),
157 updated: make(chan []*discoveryv1.EndpointSlice, 5),
158 }
159 ehm.process = func(endpoints []*discoveryv1.EndpointSlice) {
160 ehm.updated <- endpoints
161 }
162 return ehm
163 }
164
165 func (h *EndpointSliceHandlerMock) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) {
166 h.lock.Lock()
167 defer h.lock.Unlock()
168 namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
169 h.state[namespacedName] = slice
170 h.sendEndpointSlices()
171 }
172
173 func (h *EndpointSliceHandlerMock) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {
174 h.lock.Lock()
175 defer h.lock.Unlock()
176 namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
177 h.state[namespacedName] = slice
178 h.sendEndpointSlices()
179 }
180
181 func (h *EndpointSliceHandlerMock) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {
182 h.lock.Lock()
183 defer h.lock.Unlock()
184 namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
185 delete(h.state, namespacedName)
186 h.sendEndpointSlices()
187 }
188
189 func (h *EndpointSliceHandlerMock) OnEndpointSlicesSynced() {
190 h.lock.Lock()
191 defer h.lock.Unlock()
192 h.synced = true
193 h.sendEndpointSlices()
194 }
195
196 func (h *EndpointSliceHandlerMock) sendEndpointSlices() {
197 if !h.synced {
198 return
199 }
200 slices := make([]*discoveryv1.EndpointSlice, 0, len(h.state))
201 for _, eps := range h.state {
202 slices = append(slices, eps)
203 }
204 sort.Sort(sortedEndpointSlices(slices))
205 h.process(slices)
206 }
207
208 func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expectedSlices []*discoveryv1.EndpointSlice) {
209
210
211
212 var slices []*discoveryv1.EndpointSlice
213 for {
214 select {
215 case slices = <-h.updated:
216 if reflect.DeepEqual(slices, expectedSlices) {
217 return
218 }
219
220
221 case <-time.After(wait.ForeverTestTimeout):
222 t.Errorf("Timed out. Expected %#v, Got %#v", expectedSlices, slices)
223 return
224 }
225 }
226 }
227
228 func TestNewServiceAddedAndNotified(t *testing.T) {
229 client := fake.NewSimpleClientset()
230 fakeWatch := watch.NewFake()
231 client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
232
233 stopCh := make(chan struct{})
234 defer close(stopCh)
235
236 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
237
238 config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
239 handler := NewServiceHandlerMock()
240 config.RegisterEventHandler(handler)
241 go sharedInformers.Start(stopCh)
242 go config.Run(stopCh)
243
244 service := &v1.Service{
245 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
246 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
247 }
248 fakeWatch.Add(service)
249 handler.ValidateServices(t, []*v1.Service{service})
250 }
251
252 func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
253 client := fake.NewSimpleClientset()
254 fakeWatch := watch.NewFake()
255 client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
256
257 stopCh := make(chan struct{})
258 defer close(stopCh)
259
260 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
261
262 config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
263 handler := NewServiceHandlerMock()
264 config.RegisterEventHandler(handler)
265 go sharedInformers.Start(stopCh)
266 go config.Run(stopCh)
267
268 service1 := &v1.Service{
269 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
270 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
271 }
272 fakeWatch.Add(service1)
273 handler.ValidateServices(t, []*v1.Service{service1})
274
275 service2 := &v1.Service{
276 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
277 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
278 }
279 fakeWatch.Add(service2)
280 services := []*v1.Service{service2, service1}
281 handler.ValidateServices(t, services)
282
283 fakeWatch.Delete(service1)
284 services = []*v1.Service{service2}
285 handler.ValidateServices(t, services)
286 }
287
288 func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
289 client := fake.NewSimpleClientset()
290 fakeWatch := watch.NewFake()
291 client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
292
293 stopCh := make(chan struct{})
294 defer close(stopCh)
295
296 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
297
298 config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
299 handler := NewServiceHandlerMock()
300 handler2 := NewServiceHandlerMock()
301 config.RegisterEventHandler(handler)
302 config.RegisterEventHandler(handler2)
303 go sharedInformers.Start(stopCh)
304 go config.Run(stopCh)
305
306 service1 := &v1.Service{
307 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
308 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
309 }
310 service2 := &v1.Service{
311 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
312 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
313 }
314 fakeWatch.Add(service1)
315 fakeWatch.Add(service2)
316
317 services := []*v1.Service{service2, service1}
318 handler.ValidateServices(t, services)
319 handler2.ValidateServices(t, services)
320 }
321
322 func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
323 client := fake.NewSimpleClientset()
324 fakeWatch := watch.NewFake()
325 client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
326
327 stopCh := make(chan struct{})
328 defer close(stopCh)
329
330 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
331
332 config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
333 handler := NewEndpointSliceHandlerMock()
334 handler2 := NewEndpointSliceHandlerMock()
335 config.RegisterEventHandler(handler)
336 config.RegisterEventHandler(handler2)
337 go sharedInformers.Start(stopCh)
338 go config.Run(stopCh)
339
340 endpoints1 := &discoveryv1.EndpointSlice{
341 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
342 AddressType: discoveryv1.AddressTypeIPv4,
343 Endpoints: []discoveryv1.Endpoint{{
344 Addresses: []string{"1.1.1.1"},
345 }, {
346 Addresses: []string{"2.2.2.2"},
347 }},
348 Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
349 }
350 endpoints2 := &discoveryv1.EndpointSlice{
351 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
352 AddressType: discoveryv1.AddressTypeIPv4,
353 Endpoints: []discoveryv1.Endpoint{{
354 Addresses: []string{"3.3.3.3"},
355 }, {
356 Addresses: []string{"4.4.4.4"},
357 }},
358 Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
359 }
360 fakeWatch.Add(endpoints1)
361 fakeWatch.Add(endpoints2)
362
363 endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
364 handler.ValidateEndpointSlices(t, endpoints)
365 handler2.ValidateEndpointSlices(t, endpoints)
366 }
367
368 func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
369 client := fake.NewSimpleClientset()
370 fakeWatch := watch.NewFake()
371 client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
372
373 stopCh := make(chan struct{})
374 defer close(stopCh)
375
376 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
377
378 config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
379 handler := NewEndpointSliceHandlerMock()
380 handler2 := NewEndpointSliceHandlerMock()
381 config.RegisterEventHandler(handler)
382 config.RegisterEventHandler(handler2)
383 go sharedInformers.Start(stopCh)
384 go config.Run(stopCh)
385
386 endpoints1 := &discoveryv1.EndpointSlice{
387 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
388 AddressType: discoveryv1.AddressTypeIPv4,
389 Endpoints: []discoveryv1.Endpoint{{
390 Addresses: []string{"1.1.1.1"},
391 }, {
392 Addresses: []string{"2.2.2.2"},
393 }},
394 Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
395 }
396 endpoints2 := &discoveryv1.EndpointSlice{
397 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
398 AddressType: discoveryv1.AddressTypeIPv4,
399 Endpoints: []discoveryv1.Endpoint{{
400 Addresses: []string{"3.3.3.3"},
401 }, {
402 Addresses: []string{"4.4.4.4"},
403 }},
404 Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
405 }
406 fakeWatch.Add(endpoints1)
407 fakeWatch.Add(endpoints2)
408
409 endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
410 handler.ValidateEndpointSlices(t, endpoints)
411 handler2.ValidateEndpointSlices(t, endpoints)
412
413
414 endpoints3 := &discoveryv1.EndpointSlice{
415 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
416 AddressType: discoveryv1.AddressTypeIPv4,
417 Endpoints: []discoveryv1.Endpoint{{
418 Addresses: []string{"5.5.5.5"},
419 }, {
420 Addresses: []string{"6.6.6.6"},
421 }},
422 Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
423 }
424 fakeWatch.Add(endpoints3)
425 endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1, endpoints3}
426 handler.ValidateEndpointSlices(t, endpoints)
427 handler2.ValidateEndpointSlices(t, endpoints)
428
429
430 endpoints1v2 := &discoveryv1.EndpointSlice{
431 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
432 AddressType: discoveryv1.AddressTypeIPv4,
433 Endpoints: []discoveryv1.Endpoint{{
434 Addresses: []string{"7.7.7.7"},
435 }},
436 Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
437 }
438 fakeWatch.Modify(endpoints1v2)
439 endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1v2, endpoints3}
440 handler.ValidateEndpointSlices(t, endpoints)
441 handler2.ValidateEndpointSlices(t, endpoints)
442
443
444 fakeWatch.Delete(endpoints2)
445 endpoints = []*discoveryv1.EndpointSlice{endpoints1v2, endpoints3}
446 handler.ValidateEndpointSlices(t, endpoints)
447 handler2.ValidateEndpointSlices(t, endpoints)
448 }
449
450
451
452
453
454
View as plain text