1
16
17 package endpoints
18
19 import (
20 "errors"
21 "fmt"
22 "testing"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 apierrors "k8s.io/apimachinery/pkg/api/errors"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/util/intstr"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/client-go/informers"
31 clientset "k8s.io/client-go/kubernetes"
32 kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
33 "k8s.io/kubernetes/pkg/controller/endpoint"
34 "k8s.io/kubernetes/test/integration/framework"
35 "k8s.io/kubernetes/test/utils/ktesting"
36 )
37
38 func TestEndpointUpdates(t *testing.T) {
39
40 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
41 defer server.TearDownFn()
42
43 client, err := clientset.NewForConfig(server.ClientConfig)
44 if err != nil {
45 t.Fatalf("Error creating clientset: %v", err)
46 }
47
48 informers := informers.NewSharedInformerFactory(client, 0)
49
50 tCtx := ktesting.Init(t)
51 epController := endpoint.NewEndpointController(
52 tCtx,
53 informers.Core().V1().Pods(),
54 informers.Core().V1().Services(),
55 informers.Core().V1().Endpoints(),
56 client,
57 0)
58
59
60 informers.Start(tCtx.Done())
61 go epController.Run(tCtx, 1)
62
63
64 ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t)
65 defer framework.DeleteNamespaceOrDie(client, ns, t)
66
67
68 pod := &v1.Pod{
69 ObjectMeta: metav1.ObjectMeta{
70 Name: "test-pod",
71 Namespace: ns.Name,
72 Labels: labelMap(),
73 },
74 Spec: v1.PodSpec{
75 NodeName: "fakenode",
76 Containers: []v1.Container{
77 {
78 Name: "fake-name",
79 Image: "fakeimage",
80 },
81 },
82 },
83 }
84
85 createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
86 if err != nil {
87 t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
88 }
89
90
91 createdPod.Status = v1.PodStatus{
92 Phase: v1.PodRunning,
93 PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
94 }
95 _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
96 if err != nil {
97 t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
98 }
99
100
101 svc := newService(ns.Name, "foo1")
102 svc1, err := client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
103 if err != nil {
104 t.Fatalf("Failed to create service %s: %v", svc.Name, err)
105 }
106
107
108 var resVersion string
109 if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
110 endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
111 if err != nil {
112 t.Logf("error fetching endpoints: %v", err)
113 return false, nil
114 }
115 resVersion = endpoints.ObjectMeta.ResourceVersion
116 return true, nil
117 }); err != nil {
118 t.Fatalf("endpoints not found: %v", err)
119 }
120
121
122 svc1.SetAnnotations(map[string]string{"foo": "bar"})
123 _, err = client.CoreV1().Services(ns.Name).Update(tCtx, svc1, metav1.UpdateOptions{})
124 if err != nil {
125 t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
126 }
127
128
129
130
131
132 svc2 := newService(ns.Name, "foo2")
133 _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc2, metav1.CreateOptions{})
134 if err != nil {
135 t.Fatalf("Failed to create service %s: %v", svc.Name, err)
136 }
137
138 if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
139 _, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc2.Name, metav1.GetOptions{})
140 if err != nil {
141 t.Logf("error fetching endpoints: %v", err)
142 return false, nil
143 }
144 return true, nil
145 }); err != nil {
146 t.Fatalf("endpoints not found: %v", err)
147 }
148
149
150
151 endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
152 if err != nil {
153 t.Fatalf("error fetching endpoints: %v", err)
154 }
155 if resVersion != endpoints.ObjectMeta.ResourceVersion {
156 t.Fatalf("endpoints resource version does not match, expected %s received %s", resVersion, endpoints.ObjectMeta.ResourceVersion)
157 }
158
159 }
160
161
162
163
164 func TestExternalNameToClusterIPTransition(t *testing.T) {
165
166 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
167 defer server.TearDownFn()
168
169 client, err := clientset.NewForConfig(server.ClientConfig)
170 if err != nil {
171 t.Fatalf("Error creating clientset: %v", err)
172 }
173
174 informers := informers.NewSharedInformerFactory(client, 0)
175
176 tCtx := ktesting.Init(t)
177 epController := endpoint.NewEndpointController(
178 tCtx,
179 informers.Core().V1().Pods(),
180 informers.Core().V1().Services(),
181 informers.Core().V1().Endpoints(),
182 client,
183 0)
184
185
186 informers.Start(tCtx.Done())
187 go epController.Run(tCtx, 1)
188
189
190 ns := framework.CreateNamespaceOrDie(client, "test-endpoints-updates", t)
191 defer framework.DeleteNamespaceOrDie(client, ns, t)
192
193
194 pod := &v1.Pod{
195 ObjectMeta: metav1.ObjectMeta{
196 Name: "test-pod",
197 Namespace: ns.Name,
198 Labels: labelMap(),
199 },
200 Spec: v1.PodSpec{
201 NodeName: "fakenode",
202 Containers: []v1.Container{
203 {
204 Name: "fake-name",
205 Image: "fakeimage",
206 },
207 },
208 },
209 }
210
211 createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
212 if err != nil {
213 t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
214 }
215
216
217 createdPod.Status = v1.PodStatus{
218 Phase: v1.PodRunning,
219 PodIPs: []v1.PodIP{{IP: "1.1.1.1"}, {IP: "2001:db8::"}},
220 }
221 _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
222 if err != nil {
223 t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
224 }
225
226
227 svc := newExternalNameService(ns.Name, "foo1")
228 svc1, err := client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
229 if err != nil {
230 t.Fatalf("Failed to create service %s: %v", svc.Name, err)
231 }
232
233 err = wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
234 endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
235 if err == nil {
236 t.Errorf("expected no endpoints for externalName service, got: %v", endpoints)
237 return true, nil
238 }
239 return false, nil
240 })
241 if err == nil {
242 t.Errorf("expected error waiting for endpoints")
243 }
244
245
246 svc1.Spec.Type = v1.ServiceTypeClusterIP
247 _, err = client.CoreV1().Services(ns.Name).Update(tCtx, svc1, metav1.UpdateOptions{})
248 if err != nil {
249 t.Fatalf("Failed to update service %s: %v", svc1.Name, err)
250 }
251
252 if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
253 ep, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc1.Name, metav1.GetOptions{})
254 if err != nil {
255 t.Logf("no endpoints found, error: %v", err)
256 return false, nil
257 }
258 t.Logf("endpoint %s was successfully created", svc1.Name)
259 if _, ok := ep.Labels[v1.IsHeadlessService]; ok {
260 t.Errorf("ClusterIP endpoint should not have headless label, got: %v", ep)
261 }
262 return true, nil
263 }); err != nil {
264 t.Fatalf("endpoints not found: %v", err)
265 }
266 }
267
268
269
270
271
272
273 func TestEndpointWithTerminatingPod(t *testing.T) {
274
275 server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
276 defer server.TearDownFn()
277
278 client, err := clientset.NewForConfig(server.ClientConfig)
279 if err != nil {
280 t.Fatalf("Error creating clientset: %v", err)
281 }
282
283 informers := informers.NewSharedInformerFactory(client, 0)
284
285 tCtx := ktesting.Init(t)
286 epController := endpoint.NewEndpointController(
287 tCtx,
288 informers.Core().V1().Pods(),
289 informers.Core().V1().Services(),
290 informers.Core().V1().Endpoints(),
291 client,
292 0)
293
294
295 informers.Start(tCtx.Done())
296 go epController.Run(tCtx, 1)
297
298
299 ns := framework.CreateNamespaceOrDie(client, "test-endpoints-terminating", t)
300 defer framework.DeleteNamespaceOrDie(client, ns, t)
301
302
303 pod := &v1.Pod{
304 ObjectMeta: metav1.ObjectMeta{
305 Name: "test-pod",
306 Labels: labelMap(),
307 },
308 Spec: v1.PodSpec{
309 NodeName: "fake-node",
310 Containers: []v1.Container{
311 {
312 Name: "fakename",
313 Image: "fakeimage",
314 Ports: []v1.ContainerPort{
315 {
316 Name: "port-443",
317 ContainerPort: 443,
318 },
319 },
320 },
321 },
322 },
323 Status: v1.PodStatus{
324 Phase: v1.PodRunning,
325 Conditions: []v1.PodCondition{
326 {
327 Type: v1.PodReady,
328 Status: v1.ConditionTrue,
329 },
330 },
331 PodIP: "10.0.0.1",
332 PodIPs: []v1.PodIP{
333 {
334 IP: "10.0.0.1",
335 },
336 },
337 },
338 }
339
340 createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
341 if err != nil {
342 t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
343 }
344
345 createdPod.Status = pod.Status
346 _, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
347 if err != nil {
348 t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
349 }
350
351
352 svc := &v1.Service{
353 ObjectMeta: metav1.ObjectMeta{
354 Name: "test-service",
355 Namespace: ns.Name,
356 Labels: map[string]string{
357 "foo": "bar",
358 },
359 },
360 Spec: v1.ServiceSpec{
361 Selector: map[string]string{
362 "foo": "bar",
363 },
364 Ports: []v1.ServicePort{
365 {Name: "port-443", Port: 443, Protocol: "TCP", TargetPort: intstr.FromInt32(443)},
366 },
367 },
368 }
369 _, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
370 if err != nil {
371 t.Fatalf("Failed to create service %s: %v", svc.Name, err)
372 }
373
374
375 if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
376 endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
377 if err != nil {
378 return false, nil
379 }
380
381 numEndpoints := 0
382 for _, subset := range endpoints.Subsets {
383 numEndpoints += len(subset.Addresses)
384 }
385
386 if numEndpoints == 0 {
387 return false, nil
388 }
389
390 return true, nil
391 }); err != nil {
392 t.Fatalf("endpoints not found: %v", err)
393 }
394
395 err = client.CoreV1().Pods(ns.Name).Delete(tCtx, pod.Name, metav1.DeleteOptions{})
396 if err != nil {
397 t.Fatalf("error deleting test pod: %v", err)
398 }
399
400
401 if err := wait.PollImmediate(1*time.Second, 10*time.Second, func() (bool, error) {
402
403
404 pod, err := client.CoreV1().Pods(ns.Name).Get(tCtx, pod.Name, metav1.GetOptions{})
405 if apierrors.IsNotFound(err) {
406 return false, fmt.Errorf("expected Pod %q to exist with deletion timestamp but was not found: %v", pod.Name, err)
407 }
408 if err != nil {
409 return false, nil
410 }
411
412 if pod.DeletionTimestamp == nil {
413 return false, errors.New("pod did not have deletion timestamp set")
414 }
415
416 endpoints, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
417 if err != nil {
418 return false, nil
419 }
420
421 numEndpoints := 0
422 for _, subset := range endpoints.Subsets {
423 numEndpoints += len(subset.Addresses)
424 }
425
426 if numEndpoints > 0 {
427 return false, nil
428 }
429
430 return true, nil
431 }); err != nil {
432 t.Fatalf("error checking for no endpoints with terminating pods: %v", err)
433 }
434 }
435
// labelMap returns a newly allocated map holding the single label
// "foo"="bar"; each call yields an independent map.
func labelMap() map[string]string {
	labels := make(map[string]string, 1)
	labels["foo"] = "bar"
	return labels
}
439
440
441 func newService(namespace, name string) *v1.Service {
442 return &v1.Service{
443 ObjectMeta: metav1.ObjectMeta{
444 Name: name,
445 Namespace: namespace,
446 Labels: labelMap(),
447 },
448 Spec: v1.ServiceSpec{
449 Selector: labelMap(),
450 Ports: []v1.ServicePort{
451 {Name: "port-1338", Port: 1338, Protocol: "TCP", TargetPort: intstr.FromInt32(1338)},
452 {Name: "port-1337", Port: 1337, Protocol: "TCP", TargetPort: intstr.FromInt32(1337)},
453 },
454 },
455 }
456 }
457
458
459 func newExternalNameService(namespace, name string) *v1.Service {
460 svc := newService(namespace, name)
461 svc.Spec.Type = v1.ServiceTypeExternalName
462 svc.Spec.ExternalName = "google.com"
463 return svc
464 }
465
View as plain text