1
16
17 package config
18
19 import (
20 "reflect"
21 "testing"
22 "time"
23
24 v1 "k8s.io/api/core/v1"
25 discoveryv1 "k8s.io/api/discovery/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/types"
28 "k8s.io/apimachinery/pkg/util/wait"
29 "k8s.io/apimachinery/pkg/watch"
30 "k8s.io/client-go/informers"
31 "k8s.io/client-go/kubernetes/fake"
32 ktesting "k8s.io/client-go/testing"
33 "k8s.io/utils/ptr"
34 )
35
36 func TestNewServicesSourceApi_UpdatesAndMultipleServices(t *testing.T) {
37 service1v1 := &v1.Service{
38 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
39 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}}}
40 service1v2 := &v1.Service{
41 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s1"},
42 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}}}
43 service2 := &v1.Service{
44 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "s2"},
45 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 30}}}}
46
47
48 client := fake.NewSimpleClientset()
49 fakeWatch := watch.NewFake()
50 client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
51
52 stopCh := make(chan struct{})
53 defer close(stopCh)
54
55 handler := NewServiceHandlerMock()
56
57 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
58
59 serviceConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
60 serviceConfig.RegisterEventHandler(handler)
61 go sharedInformers.Start(stopCh)
62 go serviceConfig.Run(stopCh)
63
64
65 fakeWatch.Add(service1v1)
66 handler.ValidateServices(t, []*v1.Service{service1v1})
67
68
69 fakeWatch.Add(service2)
70 handler.ValidateServices(t, []*v1.Service{service1v1, service2})
71
72
73 fakeWatch.Modify(service1v2)
74 handler.ValidateServices(t, []*v1.Service{service1v2, service2})
75
76
77 fakeWatch.Delete(service1v2)
78 handler.ValidateServices(t, []*v1.Service{service2})
79
80
81 fakeWatch.Delete(service2)
82 handler.ValidateServices(t, []*v1.Service{})
83 }
84
85 func TestNewEndpointsSourceApi_UpdatesAndMultipleEndpoints(t *testing.T) {
86 endpoints1v1 := &discoveryv1.EndpointSlice{
87 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
88 AddressType: discoveryv1.AddressTypeIPv4,
89 Endpoints: []discoveryv1.Endpoint{{
90 Addresses: []string{
91 "1.2.3.4",
92 },
93 }},
94 Ports: []discoveryv1.EndpointPort{{
95 Port: ptr.To[int32](8080),
96 Protocol: ptr.To(v1.ProtocolTCP),
97 }},
98 }
99 endpoints1v2 := &discoveryv1.EndpointSlice{
100 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e1"},
101 AddressType: discoveryv1.AddressTypeIPv4,
102 Endpoints: []discoveryv1.Endpoint{{
103 Addresses: []string{
104 "1.2.3.4",
105 "4.3.2.1",
106 },
107 }},
108 Ports: []discoveryv1.EndpointPort{{
109 Port: ptr.To[int32](8080),
110 Protocol: ptr.To(v1.ProtocolTCP),
111 }},
112 }
113 endpoints2 := &discoveryv1.EndpointSlice{
114 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "e2"},
115 AddressType: discoveryv1.AddressTypeIPv4,
116 Endpoints: []discoveryv1.Endpoint{{
117 Addresses: []string{
118 "5.6.7.8",
119 },
120 }},
121 Ports: []discoveryv1.EndpointPort{{
122 Port: ptr.To[int32](8080),
123 Protocol: ptr.To(v1.ProtocolTCP),
124 }},
125 }
126
127
128 client := fake.NewSimpleClientset()
129 fakeWatch := watch.NewFake()
130 client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
131
132 stopCh := make(chan struct{})
133 defer close(stopCh)
134
135 handler := NewEndpointSliceHandlerMock()
136
137 sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
138
139 endpointsliceConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
140 endpointsliceConfig.RegisterEventHandler(handler)
141 go sharedInformers.Start(stopCh)
142 go endpointsliceConfig.Run(stopCh)
143
144
145 fakeWatch.Add(endpoints1v1)
146 handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1})
147
148
149 fakeWatch.Add(endpoints2)
150 handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v1, endpoints2})
151
152
153 fakeWatch.Modify(endpoints1v2)
154 handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints1v2, endpoints2})
155
156
157 fakeWatch.Delete(endpoints1v2)
158 handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{endpoints2})
159
160
161 fakeWatch.Delete(endpoints2)
162 handler.ValidateEndpointSlices(t, []*discoveryv1.EndpointSlice{})
163 }
164
165 func TestInitialSync(t *testing.T) {
166 svc1 := &v1.Service{
167 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
168 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
169 }
170 svc2 := &v1.Service{
171 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
172 Spec: v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
173 }
174 eps1 := &discoveryv1.EndpointSlice{
175 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
176 }
177 eps2 := &discoveryv1.EndpointSlice{
178 ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
179 }
180
181 expectedSvcState := map[types.NamespacedName]*v1.Service{
182 {Name: svc1.Name, Namespace: svc1.Namespace}: svc1,
183 {Name: svc2.Name, Namespace: svc2.Namespace}: svc2,
184 }
185 expectedEpsState := map[types.NamespacedName]*discoveryv1.EndpointSlice{
186 {Name: eps1.Name, Namespace: eps1.Namespace}: eps1,
187 {Name: eps2.Name, Namespace: eps2.Namespace}: eps2,
188 }
189
190
191 client := fake.NewSimpleClientset(svc1, svc2, eps2, eps1)
192 sharedInformers := informers.NewSharedInformerFactory(client, 0)
193
194 svcConfig := NewServiceConfig(sharedInformers.Core().V1().Services(), 0)
195 svcHandler := NewServiceHandlerMock()
196 svcConfig.RegisterEventHandler(svcHandler)
197
198 epsConfig := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), 0)
199 epsHandler := NewEndpointSliceHandlerMock()
200 epsConfig.RegisterEventHandler(epsHandler)
201
202 stopCh := make(chan struct{})
203 defer close(stopCh)
204 sharedInformers.Start(stopCh)
205
206 err := wait.PollImmediate(time.Millisecond*10, wait.ForeverTestTimeout, func() (bool, error) {
207 svcHandler.lock.Lock()
208 defer svcHandler.lock.Unlock()
209 if reflect.DeepEqual(svcHandler.state, expectedSvcState) {
210 return true, nil
211 }
212 return false, nil
213 })
214 if err != nil {
215 t.Fatal("Timed out waiting for the completion of handler `OnServiceAdd`")
216 }
217
218 err = wait.PollImmediate(time.Millisecond*10, wait.ForeverTestTimeout, func() (bool, error) {
219 epsHandler.lock.Lock()
220 defer epsHandler.lock.Unlock()
221 if reflect.DeepEqual(epsHandler.state, expectedEpsState) {
222 return true, nil
223 }
224 return false, nil
225 })
226 if err != nil {
227 t.Fatal("Timed out waiting for the completion of handler `OnEndpointsAdd`")
228 }
229
230 svcConfig.Run(stopCh)
231 epsConfig.Run(stopCh)
232
233 gotSvc := <-svcHandler.updated
234 gotSvcState := make(map[types.NamespacedName]*v1.Service, len(gotSvc))
235 for _, svc := range gotSvc {
236 gotSvcState[types.NamespacedName{Namespace: svc.Namespace, Name: svc.Name}] = svc
237 }
238 if !reflect.DeepEqual(gotSvcState, expectedSvcState) {
239 t.Fatalf("Expected service state: %v\nGot: %v\n", expectedSvcState, gotSvcState)
240 }
241
242 gotEps := <-epsHandler.updated
243 gotEpsState := make(map[types.NamespacedName]*discoveryv1.EndpointSlice, len(gotEps))
244 for _, eps := range gotEps {
245 gotEpsState[types.NamespacedName{Namespace: eps.Namespace, Name: eps.Name}] = eps
246 }
247 if !reflect.DeepEqual(gotEpsState, expectedEpsState) {
248 t.Fatalf("Expected endpoints state: %v\nGot: %v\n", expectedEpsState, gotEpsState)
249 }
250 }
251
View as plain text