1
16
17 package service
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "strconv"
25 "strings"
26 "time"
27
28 "github.com/onsi/ginkgo/v2"
29 v1 "k8s.io/api/core/v1"
30 discoveryv1 "k8s.io/api/discovery/v1"
31 policyv1 "k8s.io/api/policy/v1"
32 apierrors "k8s.io/apimachinery/pkg/api/errors"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/fields"
35 "k8s.io/apimachinery/pkg/labels"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/util/intstr"
38 utilnet "k8s.io/apimachinery/pkg/util/net"
39 "k8s.io/apimachinery/pkg/util/sets"
40 "k8s.io/apimachinery/pkg/util/uuid"
41 "k8s.io/apimachinery/pkg/util/wait"
42 "k8s.io/apimachinery/pkg/watch"
43 clientset "k8s.io/client-go/kubernetes"
44 "k8s.io/client-go/tools/cache"
45 "k8s.io/kubernetes/test/e2e/framework"
46 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
47 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
48 e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
49 e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
50 testutils "k8s.io/kubernetes/test/utils"
51 imageutils "k8s.io/kubernetes/test/utils/image"
52 netutils "k8s.io/utils/net"
53 )
54
55
56 var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
57
58
59 var errAllocated = errors.New("provided port is already allocated")
60
61
62 type TestJig struct {
63 Client clientset.Interface
64 Namespace string
65 Name string
66 ID string
67 Labels map[string]string
68
69
70 ExternalIPs bool
71 }
72
73
74 func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
75 j := &TestJig{}
76 j.Client = client
77 j.Namespace = namespace
78 j.Name = name
79 j.ID = j.Name + "-" + string(uuid.NewUUID())
80 j.Labels = map[string]string{"testid": j.ID}
81
82 return j
83 }
84
85
86
87
88 func (j *TestJig) newServiceTemplate(proto v1.Protocol, port int32) *v1.Service {
89 service := &v1.Service{
90 ObjectMeta: metav1.ObjectMeta{
91 Namespace: j.Namespace,
92 Name: j.Name,
93 Labels: j.Labels,
94 },
95 Spec: v1.ServiceSpec{
96 Selector: j.Labels,
97 Ports: []v1.ServicePort{
98 {
99 Protocol: proto,
100 Port: port,
101 },
102 },
103 },
104 }
105 return service
106 }
107
108
109
110
111 func (j *TestJig) CreateTCPServiceWithPort(ctx context.Context, tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
112 svc := j.newServiceTemplate(v1.ProtocolTCP, port)
113 if tweak != nil {
114 tweak(svc)
115 }
116 result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
117 if err != nil {
118 return nil, fmt.Errorf("failed to create TCP Service %q: %w", svc.Name, err)
119 }
120 return j.sanityCheckService(result, svc.Spec.Type)
121 }
122
123
124
125
126 func (j *TestJig) CreateTCPService(ctx context.Context, tweak func(svc *v1.Service)) (*v1.Service, error) {
127 return j.CreateTCPServiceWithPort(ctx, tweak, 80)
128 }
129
130
131
132
133 func (j *TestJig) CreateUDPService(ctx context.Context, tweak func(svc *v1.Service)) (*v1.Service, error) {
134 svc := j.newServiceTemplate(v1.ProtocolUDP, 80)
135 if tweak != nil {
136 tweak(svc)
137 }
138 result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
139 if err != nil {
140 return nil, fmt.Errorf("failed to create UDP Service %q: %w", svc.Name, err)
141 }
142 return j.sanityCheckService(result, svc.Spec.Type)
143 }
144
145
146
147 func (j *TestJig) CreateExternalNameService(ctx context.Context, tweak func(svc *v1.Service)) (*v1.Service, error) {
148 svc := &v1.Service{
149 ObjectMeta: metav1.ObjectMeta{
150 Namespace: j.Namespace,
151 Name: j.Name,
152 Labels: j.Labels,
153 },
154 Spec: v1.ServiceSpec{
155 Selector: j.Labels,
156 ExternalName: "foo.example.com",
157 Type: v1.ServiceTypeExternalName,
158 },
159 }
160 if tweak != nil {
161 tweak(svc)
162 }
163 result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
164 if err != nil {
165 return nil, fmt.Errorf("failed to create ExternalName Service %q: %w", svc.Name, err)
166 }
167 return j.sanityCheckService(result, svc.Spec.Type)
168 }
169
170
171 func (j *TestJig) ChangeServiceType(ctx context.Context, newType v1.ServiceType, timeout time.Duration) error {
172 ingressIP := ""
173 svc, err := j.UpdateService(ctx, func(s *v1.Service) {
174 for _, ing := range s.Status.LoadBalancer.Ingress {
175 if ing.IP != "" {
176 ingressIP = ing.IP
177 }
178 }
179 s.Spec.Type = newType
180 s.Spec.Ports[0].NodePort = 0
181 })
182 if err != nil {
183 return err
184 }
185 if ingressIP != "" {
186 _, err = j.WaitForLoadBalancerDestroy(ctx, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
187 }
188 return err
189 }
190
191
192
193
194
195 func (j *TestJig) CreateOnlyLocalNodePortService(ctx context.Context, createPod bool) (*v1.Service, error) {
196 ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=NodePort and ExternalTrafficPolicy=Local")
197 svc, err := j.CreateTCPService(ctx, func(svc *v1.Service) {
198 svc.Spec.Type = v1.ServiceTypeNodePort
199 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
200 svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
201 })
202 if err != nil {
203 return nil, err
204 }
205
206 if createPod {
207 ginkgo.By("creating a pod to be part of the service " + j.Name)
208 _, err = j.Run(ctx, nil)
209 if err != nil {
210 return nil, err
211 }
212 }
213 return svc, nil
214 }
215
216
217
218
219
220 func (j *TestJig) CreateOnlyLocalLoadBalancerService(ctx context.Context, timeout time.Duration, createPod bool,
221 tweak func(svc *v1.Service)) (*v1.Service, error) {
222 _, err := j.CreateLoadBalancerService(ctx, timeout, func(svc *v1.Service) {
223 ginkgo.By("setting ExternalTrafficPolicy=Local")
224 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
225 if tweak != nil {
226 tweak(svc)
227 }
228 })
229 if err != nil {
230 return nil, err
231 }
232
233 if createPod {
234 ginkgo.By("creating a pod to be part of the service " + j.Name)
235 _, err = j.Run(ctx, nil)
236 if err != nil {
237 return nil, err
238 }
239 }
240 ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
241 return j.WaitForLoadBalancer(ctx, timeout)
242 }
243
244
245
246 func (j *TestJig) CreateLoadBalancerService(ctx context.Context, timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) {
247 ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
248 svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
249 svc.Spec.Type = v1.ServiceTypeLoadBalancer
250
251 svc.Spec.SessionAffinity = v1.ServiceAffinityNone
252 if tweak != nil {
253 tweak(svc)
254 }
255 _, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
256 if err != nil {
257 return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %w", svc.Name, err)
258 }
259
260 ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
261 return j.WaitForLoadBalancer(ctx, timeout)
262 }
263
264
265
266 func (j *TestJig) GetEndpointNodes(ctx context.Context) (map[string][]string, error) {
267 return j.GetEndpointNodesWithIP(ctx, v1.NodeExternalIP)
268 }
269
270
271
272 func (j *TestJig) GetEndpointNodesWithIP(ctx context.Context, addressType v1.NodeAddressType) (map[string][]string, error) {
273 nodes, err := j.ListNodesWithEndpoint(ctx)
274 if err != nil {
275 return nil, err
276 }
277 nodeMap := map[string][]string{}
278 for _, node := range nodes {
279 nodeMap[node.Name] = e2enode.GetAddresses(&node, addressType)
280 }
281 return nodeMap, nil
282 }
283
284
285
286 func (j *TestJig) ListNodesWithEndpoint(ctx context.Context) ([]v1.Node, error) {
287 nodeNames, err := j.GetEndpointNodeNames(ctx)
288 if err != nil {
289 return nil, err
290 }
291 allNodes, err := j.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
292 if err != nil {
293 return nil, err
294 }
295 epNodes := make([]v1.Node, 0, nodeNames.Len())
296 for _, node := range allNodes.Items {
297 if nodeNames.Has(node.Name) {
298 epNodes = append(epNodes, node)
299 }
300 }
301 return epNodes, nil
302 }
303
304
305
306 func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error) {
307 err := j.waitForAvailableEndpoint(ctx, ServiceEndpointsTimeout)
308 if err != nil {
309 return nil, err
310 }
311 endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
312 if err != nil {
313 return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
314 }
315 if len(endpoints.Subsets) == 0 {
316 return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
317 }
318 epNodes := sets.NewString()
319 for _, ss := range endpoints.Subsets {
320 for _, e := range ss.Addresses {
321 if e.NodeName != nil {
322 epNodes.Insert(*e.NodeName)
323 }
324 }
325 }
326 return epNodes, nil
327 }
328
329
330 func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error {
331 return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
332 endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
333 if err != nil {
334 framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
335 return false, nil
336 }
337 if len(endpoints.Subsets) == 0 {
338 framework.Logf("Expect endpoints with subsets, got none.")
339 return false, nil
340 }
341
342 if len(endpoints.Subsets[0].Addresses) == 0 {
343 framework.Logf("Expected Ready endpoints - found none")
344 return false, nil
345 }
346 epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
347 framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
348 if epHostName != nodeName {
349 framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
350 return false, nil
351 }
352 return true, nil
353 })
354 }
355
356
357 func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error {
358
359 endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
360 stopCh := make(chan struct{})
361 endpointAvailable := false
362 endpointSliceAvailable := false
363
364 var controller cache.Controller
365 _, controller = cache.NewInformer(
366 &cache.ListWatch{
367 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
368 options.FieldSelector = endpointSelector.String()
369 obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options)
370 return runtime.Object(obj), err
371 },
372 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
373 options.FieldSelector = endpointSelector.String()
374 return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options)
375 },
376 },
377 &v1.Endpoints{},
378 0,
379 cache.ResourceEventHandlerFuncs{
380 AddFunc: func(obj interface{}) {
381 if e, ok := obj.(*v1.Endpoints); ok {
382 if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
383 endpointAvailable = true
384 }
385 }
386 },
387 UpdateFunc: func(old, cur interface{}) {
388 if e, ok := cur.(*v1.Endpoints); ok {
389 if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
390 endpointAvailable = true
391 }
392 }
393 },
394 },
395 )
396 defer func() {
397 close(stopCh)
398 }()
399
400 go controller.Run(stopCh)
401
402 var esController cache.Controller
403 _, esController = cache.NewInformer(
404 &cache.ListWatch{
405 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
406 options.LabelSelector = "kubernetes.io/service-name=" + j.Name
407 obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options)
408 return runtime.Object(obj), err
409 },
410 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
411 options.LabelSelector = "kubernetes.io/service-name=" + j.Name
412 return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options)
413 },
414 },
415 &discoveryv1.EndpointSlice{},
416 0,
417 cache.ResourceEventHandlerFuncs{
418 AddFunc: func(obj interface{}) {
419 if es, ok := obj.(*discoveryv1.EndpointSlice); ok {
420
421
422
423 if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
424 endpointSliceAvailable = true
425 }
426 }
427 },
428 UpdateFunc: func(old, cur interface{}) {
429 if es, ok := cur.(*discoveryv1.EndpointSlice); ok {
430
431
432
433 if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
434 endpointSliceAvailable = true
435 }
436 }
437 },
438 },
439 )
440
441 go esController.Run(stopCh)
442
443 err := wait.PollWithContext(ctx, 1*time.Second, timeout, func(ctx context.Context) (bool, error) {
444 return endpointAvailable && endpointSliceAvailable, nil
445 })
446 if err != nil {
447 return fmt.Errorf("no subset of available IP address found for the endpoint %s within timeout %v", j.Name, timeout)
448 }
449 return nil
450 }
451
452
453
454
455 func (j *TestJig) sanityCheckService(svc *v1.Service, svcType v1.ServiceType) (*v1.Service, error) {
456 if svcType == "" {
457 svcType = v1.ServiceTypeClusterIP
458 }
459 if svc.Spec.Type != svcType {
460 return nil, fmt.Errorf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
461 }
462
463 if svcType != v1.ServiceTypeExternalName {
464 if svc.Spec.ExternalName != "" {
465 return nil, fmt.Errorf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
466 }
467 if svc.Spec.ClusterIP == "" {
468 return nil, fmt.Errorf("didn't get ClusterIP for non-ExternalName service")
469 }
470 } else {
471 if svc.Spec.ClusterIP != "" {
472 return nil, fmt.Errorf("unexpected Spec.ClusterIP (%s) for ExternalName service, expected empty", svc.Spec.ClusterIP)
473 }
474 }
475
476 expectNodePorts := needsNodePorts(svc)
477 for i, port := range svc.Spec.Ports {
478 hasNodePort := (port.NodePort != 0)
479 if hasNodePort != expectNodePorts {
480 return nil, fmt.Errorf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
481 }
482 if hasNodePort {
483 if !NodePortRange.Contains(int(port.NodePort)) {
484 return nil, fmt.Errorf("out-of-range nodePort (%d) for service", port.NodePort)
485 }
486 }
487 }
488
489
490
491
492
493
494
495
496 return svc, nil
497 }
498
499 func needsNodePorts(svc *v1.Service) bool {
500 if svc == nil {
501 return false
502 }
503
504 if svc.Spec.Type == v1.ServiceTypeNodePort {
505 return true
506 }
507 if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
508 return false
509 }
510
511 if svc.Spec.AllocateLoadBalancerNodePorts == nil {
512 return true
513 }
514 return *svc.Spec.AllocateLoadBalancerNodePorts
515 }
516
517
518
519
520 func (j *TestJig) UpdateService(ctx context.Context, update func(*v1.Service)) (*v1.Service, error) {
521 for i := 0; i < 3; i++ {
522 service, err := j.Client.CoreV1().Services(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
523 if err != nil {
524 return nil, fmt.Errorf("failed to get Service %q: %w", j.Name, err)
525 }
526 update(service)
527 result, err := j.Client.CoreV1().Services(j.Namespace).Update(ctx, service, metav1.UpdateOptions{})
528 if err == nil {
529 return j.sanityCheckService(result, service.Spec.Type)
530 }
531 if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
532 return nil, fmt.Errorf("failed to update Service %q: %w", j.Name, err)
533 }
534 }
535 return nil, fmt.Errorf("too many retries updating Service %q", j.Name)
536 }
537
538
539 func (j *TestJig) WaitForNewIngressIP(ctx context.Context, existingIP string, timeout time.Duration) (*v1.Service, error) {
540 framework.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, j.Name)
541 service, err := j.waitForCondition(ctx, timeout, "have a new ingress IP", func(svc *v1.Service) bool {
542 if len(svc.Status.LoadBalancer.Ingress) == 0 {
543 return false
544 }
545 ip := svc.Status.LoadBalancer.Ingress[0].IP
546 if ip == "" || ip == existingIP {
547 return false
548 }
549 return true
550 })
551 if err != nil {
552 return nil, err
553 }
554 return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
555 }
556
557
558 func (j *TestJig) ChangeServiceNodePort(ctx context.Context, initial int) (*v1.Service, error) {
559 var err error
560 var service *v1.Service
561 for i := 1; i < NodePortRange.Size; i++ {
562 offs1 := initial - NodePortRange.Base
563 offs2 := (offs1 + i) % NodePortRange.Size
564 newPort := NodePortRange.Base + offs2
565 service, err = j.UpdateService(ctx, func(s *v1.Service) {
566 s.Spec.Ports[0].NodePort = int32(newPort)
567 })
568 if err != nil && strings.Contains(err.Error(), errAllocated.Error()) {
569 framework.Logf("tried nodePort %d, but it is in use, will try another", newPort)
570 continue
571 }
572
573 break
574 }
575 return service, err
576 }
577
578
579 func (j *TestJig) WaitForLoadBalancer(ctx context.Context, timeout time.Duration) (*v1.Service, error) {
580 framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, j.Name)
581 service, err := j.waitForCondition(ctx, timeout, "have a load balancer", func(svc *v1.Service) bool {
582 return len(svc.Status.LoadBalancer.Ingress) > 0
583 })
584 if err != nil {
585 return nil, err
586 }
587
588 for i, ing := range service.Status.LoadBalancer.Ingress {
589 if ing.IP == "" && ing.Hostname == "" {
590 return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
591 }
592 }
593
594 return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
595 }
596
597
598 func (j *TestJig) WaitForLoadBalancerDestroy(ctx context.Context, ip string, port int, timeout time.Duration) (*v1.Service, error) {
599
600 defer func() {
601 if err := framework.EnsureLoadBalancerResourcesDeleted(ctx, ip, strconv.Itoa(port)); err != nil {
602 framework.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
603 }
604 }()
605
606 framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, j.Name)
607 service, err := j.waitForCondition(ctx, timeout, "have no load balancer", func(svc *v1.Service) bool {
608 return len(svc.Status.LoadBalancer.Ingress) == 0
609 })
610 if err != nil {
611 return nil, err
612 }
613 return j.sanityCheckService(service, service.Spec.Type)
614 }
615
616 func (j *TestJig) waitForCondition(ctx context.Context, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) (*v1.Service, error) {
617 var service *v1.Service
618 pollFunc := func(ctx context.Context) (bool, error) {
619 svc, err := j.Client.CoreV1().Services(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
620 if err != nil {
621 framework.Logf("Retrying .... error trying to get Service %s: %v", j.Name, err)
622 return false, nil
623 }
624 if conditionFn(svc) {
625 service = svc
626 return true, nil
627 }
628 return false, nil
629 }
630 if err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, pollFunc); err != nil {
631 return nil, fmt.Errorf("timed out waiting for service %q to %s: %w", j.Name, message, err)
632 }
633 return service, nil
634 }
635
636
637
638
639 func (j *TestJig) newRCTemplate() *v1.ReplicationController {
640 var replicas int32 = 1
641 var grace int64 = 3
642
643 rc := &v1.ReplicationController{
644 ObjectMeta: metav1.ObjectMeta{
645 Namespace: j.Namespace,
646 Name: j.Name,
647 Labels: j.Labels,
648 },
649 Spec: v1.ReplicationControllerSpec{
650 Replicas: &replicas,
651 Selector: j.Labels,
652 Template: &v1.PodTemplateSpec{
653 ObjectMeta: metav1.ObjectMeta{
654 Labels: j.Labels,
655 },
656 Spec: v1.PodSpec{
657 Containers: []v1.Container{
658 {
659 Name: "netexec",
660 Image: imageutils.GetE2EImage(imageutils.Agnhost),
661 Args: []string{"netexec", "--http-port=80", "--udp-port=80"},
662 ReadinessProbe: &v1.Probe{
663 PeriodSeconds: 3,
664 ProbeHandler: v1.ProbeHandler{
665 HTTPGet: &v1.HTTPGetAction{
666 Port: intstr.FromInt32(80),
667 Path: "/hostName",
668 },
669 },
670 },
671 },
672 },
673 TerminationGracePeriodSeconds: &grace,
674 },
675 },
676 },
677 }
678 return rc
679 }
680
681
682 func (j *TestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
683 var replicas int32 = 2
684
685 rc.Spec.Replicas = &replicas
686 if rc.Spec.Template.Spec.Affinity == nil {
687 rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
688 }
689 if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
690 rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
691 }
692 rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
693 rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
694 v1.PodAffinityTerm{
695 LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
696 Namespaces: nil,
697 TopologyKey: "kubernetes.io/hostname",
698 })
699 }
700
701
702 func (j *TestJig) CreatePDB(ctx context.Context, rc *v1.ReplicationController) (*policyv1.PodDisruptionBudget, error) {
703 pdb := j.newPDBTemplate(rc)
704 newPdb, err := j.Client.PolicyV1().PodDisruptionBudgets(j.Namespace).Create(ctx, pdb, metav1.CreateOptions{})
705 if err != nil {
706 return nil, fmt.Errorf("failed to create PDB %q %v", pdb.Name, err)
707 }
708 if err := j.waitForPdbReady(ctx); err != nil {
709 return nil, fmt.Errorf("failed waiting for PDB to be ready: %w", err)
710 }
711
712 return newPdb, nil
713 }
714
715
716
717
718 func (j *TestJig) newPDBTemplate(rc *v1.ReplicationController) *policyv1.PodDisruptionBudget {
719 minAvailable := intstr.FromInt32(*rc.Spec.Replicas - 1)
720
721 pdb := &policyv1.PodDisruptionBudget{
722 ObjectMeta: metav1.ObjectMeta{
723 Namespace: j.Namespace,
724 Name: j.Name,
725 Labels: j.Labels,
726 },
727 Spec: policyv1.PodDisruptionBudgetSpec{
728 MinAvailable: &minAvailable,
729 Selector: &metav1.LabelSelector{MatchLabels: j.Labels},
730 },
731 }
732
733 return pdb
734 }
735
736
737
738
739 func (j *TestJig) Run(ctx context.Context, tweak func(rc *v1.ReplicationController)) (*v1.ReplicationController, error) {
740 rc := j.newRCTemplate()
741 if tweak != nil {
742 tweak(rc)
743 }
744 result, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).Create(ctx, rc, metav1.CreateOptions{})
745 if err != nil {
746 return nil, fmt.Errorf("failed to create RC %q: %w", rc.Name, err)
747 }
748 pods, err := j.waitForPodsCreated(ctx, int(*(rc.Spec.Replicas)))
749 if err != nil {
750 return nil, fmt.Errorf("failed to create pods: %w", err)
751 }
752 if err := j.waitForPodsReady(ctx, pods); err != nil {
753 return nil, fmt.Errorf("failed waiting for pods to be running: %w", err)
754 }
755 return result, nil
756 }
757
758
759 func (j *TestJig) Scale(ctx context.Context, replicas int) error {
760 rc := j.Name
761 scale, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).GetScale(ctx, rc, metav1.GetOptions{})
762 if err != nil {
763 return fmt.Errorf("failed to get scale for RC %q: %w", rc, err)
764 }
765
766 scale.ResourceVersion = ""
767 scale.Spec.Replicas = int32(replicas)
768 _, err = j.Client.CoreV1().ReplicationControllers(j.Namespace).UpdateScale(ctx, rc, scale, metav1.UpdateOptions{})
769 if err != nil {
770 return fmt.Errorf("failed to scale RC %q: %w", rc, err)
771 }
772 pods, err := j.waitForPodsCreated(ctx, replicas)
773 if err != nil {
774 return fmt.Errorf("failed waiting for pods: %w", err)
775 }
776 if err := j.waitForPodsReady(ctx, pods); err != nil {
777 return fmt.Errorf("failed waiting for pods to be running: %w", err)
778 }
779 return nil
780 }
781
782 func (j *TestJig) waitForPdbReady(ctx context.Context) error {
783 timeout := 2 * time.Minute
784 for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
785 pdb, err := j.Client.PolicyV1().PodDisruptionBudgets(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
786 if err != nil {
787 return err
788 }
789 if pdb.Status.DisruptionsAllowed > 0 {
790 return nil
791 }
792 }
793
794 return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
795 }
796
797 func (j *TestJig) waitForPodsCreated(ctx context.Context, replicas int) ([]string, error) {
798
799 timeout := 2 * time.Minute
800
801 label := labels.SelectorFromSet(labels.Set(j.Labels))
802 framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
803 for start := time.Now(); time.Since(start) < timeout && ctx.Err() == nil; time.Sleep(2 * time.Second) {
804 options := metav1.ListOptions{LabelSelector: label.String()}
805 pods, err := j.Client.CoreV1().Pods(j.Namespace).List(ctx, options)
806 if err != nil {
807 return nil, err
808 }
809
810 found := []string{}
811 for _, pod := range pods.Items {
812 if pod.DeletionTimestamp != nil {
813 continue
814 }
815 found = append(found, pod.Name)
816 }
817 if len(found) == replicas {
818 framework.Logf("Found all %d pods", replicas)
819 return found, nil
820 }
821 framework.Logf("Found %d/%d pods - will retry", len(found), replicas)
822 }
823 return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
824 }
825
826 func (j *TestJig) waitForPodsReady(ctx context.Context, pods []string) error {
827 timeout := 2 * time.Minute
828 if !e2epod.CheckPodsRunningReady(ctx, j.Client, j.Namespace, pods, timeout) {
829 return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
830 }
831 return nil
832 }
833
834 func testReachabilityOverServiceName(ctx context.Context, serviceName string, sp v1.ServicePort, execPod *v1.Pod) error {
835 return testEndpointReachability(ctx, serviceName, sp.Port, sp.Protocol, execPod)
836 }
837
838 func testReachabilityOverClusterIP(ctx context.Context, clusterIP string, sp v1.ServicePort, execPod *v1.Pod) error {
839
840 if netutils.ParseIPSloppy(clusterIP) == nil {
841 return fmt.Errorf("unable to parse ClusterIP: %s", clusterIP)
842 }
843 return testEndpointReachability(ctx, clusterIP, sp.Port, sp.Protocol, execPod)
844 }
845
846 func testReachabilityOverExternalIP(ctx context.Context, externalIP string, sp v1.ServicePort, execPod *v1.Pod) error {
847 return testEndpointReachability(ctx, externalIP, sp.Port, sp.Protocol, execPod)
848 }
849
850 func testReachabilityOverNodePorts(ctx context.Context, nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod, clusterIP string, externalIPs bool) error {
851 internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
852 isClusterIPV4 := netutils.IsIPv4String(clusterIP)
853
854 for _, internalAddr := range internalAddrs {
855
856
857 if isInvalidOrLocalhostAddress(internalAddr) {
858 framework.Logf("skipping testEndpointReachability() for internal address %s", internalAddr)
859 continue
860 }
861
862 if isClusterIPV4 != netutils.IsIPv4String(internalAddr) {
863 framework.Logf("skipping testEndpointReachability() for internal address %s as it does not match clusterIP (%s) family", internalAddr, clusterIP)
864 continue
865 }
866
867 err := testEndpointReachability(ctx, internalAddr, sp.NodePort, sp.Protocol, pod)
868 if err != nil {
869 return err
870 }
871 }
872 if externalIPs {
873 externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
874 for _, externalAddr := range externalAddrs {
875 if isClusterIPV4 != netutils.IsIPv4String(externalAddr) {
876 framework.Logf("skipping testEndpointReachability() for external address %s as it does not match clusterIP (%s) family", externalAddr, clusterIP)
877 continue
878 }
879 err := testEndpointReachability(ctx, externalAddr, sp.NodePort, sp.Protocol, pod)
880 if err != nil {
881 return err
882 }
883 }
884 }
885 return nil
886 }
887
888
889
890 func isInvalidOrLocalhostAddress(ip string) bool {
891 parsedIP := netutils.ParseIPSloppy(ip)
892 if parsedIP == nil || parsedIP.IsLoopback() {
893 return true
894 }
895 return false
896 }
897
898
899
900
901 func testEndpointReachability(ctx context.Context, endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) error {
902 ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port)))
903 cmd := ""
904 switch protocol {
905 case v1.ProtocolTCP:
906 cmd = fmt.Sprintf("echo hostName | nc -v -t -w 2 %s %v", endpoint, port)
907 case v1.ProtocolUDP:
908 cmd = fmt.Sprintf("echo hostName | nc -v -u -w 2 %s %v", endpoint, port)
909 default:
910 return fmt.Errorf("service reachability check is not supported for %v", protocol)
911 }
912
913 err := wait.PollUntilContextTimeout(ctx, 1*time.Second, ServiceReachabilityShortPollTimeout, true, func(ctx context.Context) (bool, error) {
914 stdout, err := e2epodoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
915 if err != nil {
916 framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
917 return false, nil
918 }
919 trimmed := strings.TrimSpace(stdout)
920 if trimmed != "" {
921 return true, nil
922 }
923 return false, nil
924 })
925 if err != nil {
926 return fmt.Errorf("service is not reachable within %v timeout on endpoint %s over %s protocol", ServiceReachabilityShortPollTimeout, ep, protocol)
927 }
928 return nil
929 }
930
931
932
933 func (j *TestJig) checkClusterIPServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
934 clusterIP := svc.Spec.ClusterIP
935 servicePorts := svc.Spec.Ports
936 externalIPs := svc.Spec.ExternalIPs
937
938 err := j.waitForAvailableEndpoint(ctx, ServiceEndpointsTimeout)
939 if err != nil {
940 return err
941 }
942
943 for _, servicePort := range servicePorts {
944 err = testReachabilityOverServiceName(ctx, svc.Name, servicePort, pod)
945 if err != nil {
946 return err
947 }
948 err = testReachabilityOverClusterIP(ctx, clusterIP, servicePort, pod)
949 if err != nil {
950 return err
951 }
952 if len(externalIPs) > 0 {
953 for _, externalIP := range externalIPs {
954 err = testReachabilityOverExternalIP(ctx, externalIP, servicePort, pod)
955 if err != nil {
956 return err
957 }
958 }
959 }
960 }
961 return nil
962 }
963
964
965
966
967
968
969 func (j *TestJig) checkNodePortServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
970 clusterIP := svc.Spec.ClusterIP
971 servicePorts := svc.Spec.Ports
972
973
974 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, j.Client, 2)
975 if err != nil {
976 return err
977 }
978
979 err = j.waitForAvailableEndpoint(ctx, ServiceEndpointsTimeout)
980 if err != nil {
981 return err
982 }
983
984 for _, servicePort := range servicePorts {
985 err = testReachabilityOverServiceName(ctx, svc.Name, servicePort, pod)
986 if err != nil {
987 return err
988 }
989 err = testReachabilityOverClusterIP(ctx, clusterIP, servicePort, pod)
990 if err != nil {
991 return err
992 }
993 err = testReachabilityOverNodePorts(ctx, nodes, servicePort, pod, clusterIP, j.ExternalIPs)
994 if err != nil {
995 return err
996 }
997 }
998
999 return nil
1000 }
1001
1002
1003
1004 func (j *TestJig) checkExternalServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
1005
1006 svcName := fmt.Sprintf("%s.%s.svc.%s", svc.Name, svc.Namespace, framework.TestContext.ClusterDNSDomain)
1007
1008 cmd := fmt.Sprintf("nslookup %s", svcName)
1009 return wait.PollUntilContextTimeout(ctx, framework.Poll, ServiceReachabilityShortPollTimeout, true, func(ctx context.Context) (done bool, err error) {
1010 _, stderr, err := e2epodoutput.RunHostCmdWithFullOutput(pod.Namespace, pod.Name, cmd)
1011
1012
1013 if err != nil || (framework.NodeOSDistroIs("windows") && strings.Contains(stderr, fmt.Sprintf("can't find %s", svcName))) {
1014 framework.Logf("ExternalName service %q failed to resolve to IP", pod.Namespace+"/"+pod.Name)
1015 return false, nil
1016 }
1017 return true, nil
1018 })
1019 }
1020
1021
1022 func (j *TestJig) CheckServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
1023 svcType := svc.Spec.Type
1024
1025 _, err := j.sanityCheckService(svc, svcType)
1026 if err != nil {
1027 return err
1028 }
1029
1030 switch svcType {
1031 case v1.ServiceTypeClusterIP:
1032 return j.checkClusterIPServiceReachability(ctx, svc, pod)
1033 case v1.ServiceTypeNodePort:
1034 return j.checkNodePortServiceReachability(ctx, svc, pod)
1035 case v1.ServiceTypeExternalName:
1036 return j.checkExternalServiceReachability(ctx, svc, pod)
1037 case v1.ServiceTypeLoadBalancer:
1038 return j.checkClusterIPServiceReachability(ctx, svc, pod)
1039 default:
1040 return fmt.Errorf("unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may due to diverse implementation of the service type", svcType, svc.Name)
1041 }
1042 }
1043
1044
1045 func (j *TestJig) CreateServicePods(ctx context.Context, replica int) error {
1046 config := testutils.RCConfig{
1047 Client: j.Client,
1048 Name: j.Name,
1049 Image: framework.ServeHostnameImage,
1050 Command: []string{"/agnhost", "serve-hostname", "--http=false", "--tcp", "--udp"},
1051 Namespace: j.Namespace,
1052 Labels: j.Labels,
1053 PollInterval: 3 * time.Second,
1054 Timeout: framework.PodReadyBeforeTimeout,
1055 Replicas: replica,
1056 }
1057 return e2erc.RunRC(ctx, config)
1058 }
1059
1060
1061
1062
1063 func (j *TestJig) CreateSCTPServiceWithPort(ctx context.Context, tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
1064 svc := j.newServiceTemplate(v1.ProtocolSCTP, port)
1065 if tweak != nil {
1066 tweak(svc)
1067 }
1068 result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
1069 if err != nil {
1070 return nil, fmt.Errorf("failed to create SCTP Service %q: %w", svc.Name, err)
1071 }
1072 return j.sanityCheckService(result, svc.Spec.Type)
1073 }
1074
1075
1076
1077 func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(tweak func(svc *v1.Service)) (*v1.Service, error) {
1078 ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
1079 svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
1080 svc.Spec.Type = v1.ServiceTypeLoadBalancer
1081
1082 svc.Spec.SessionAffinity = v1.ServiceAffinityNone
1083 if tweak != nil {
1084 tweak(svc)
1085 }
1086 result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
1087 if err != nil {
1088 return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %w", svc.Name, err)
1089 }
1090
1091 return j.sanityCheckService(result, v1.ServiceTypeLoadBalancer)
1092 }
1093
View as plain text