package network

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"math/rand"
	"net"
	"net/http"
	"sort"
	"strconv"
	"strings"
	"time"

	appsv1 "k8s.io/api/apps/v1"
	v1 "k8s.io/api/core/v1"
	discoveryv1 "k8s.io/api/discovery/v1"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	utilnet "k8s.io/apimachinery/pkg/util/net"
	utilrand "k8s.io/apimachinery/pkg/util/rand"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	watch "k8s.io/apimachinery/pkg/watch"
	admissionapi "k8s.io/pod-security-admission/api"

	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	watchtools "k8s.io/client-go/tools/watch"
	"k8s.io/client-go/util/retry"

	cloudprovider "k8s.io/cloud-provider"
	netutils "k8s.io/utils/net"
	utilpointer "k8s.io/utils/pointer"

	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
	"k8s.io/kubernetes/test/e2e/framework"
	e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
	e2eendpoints "k8s.io/kubernetes/test/e2e/framework/endpoints"
	e2eendpointslice "k8s.io/kubernetes/test/e2e/framework/endpointslice"
	e2enetwork "k8s.io/kubernetes/test/e2e/framework/network"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
	e2eproviders "k8s.io/kubernetes/test/e2e/framework/providers"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	"k8s.io/kubernetes/test/e2e/network/common"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)

const (
	defaultServeHostnameServicePort = 80
	defaultServeHostnameServiceName = "svc-hostname"

	// AffinityTimeout is the maximum time that an affinity check is allowed
	// to take before the poll gives up.
	AffinityTimeout = 2 * time.Minute

	// AffinityConfirmCount is the number of consecutive requests that must
	// land on the same backend to confirm that session affinity holds.
	AffinityConfirmCount = 15

	// SessionAffinityTimeout is the number of seconds to wait between requests
	// for session affinity to time out before trying a request again.
	SessionAffinityTimeout = 125

	// Labels and keys used to find the kube-proxy and kube-apiserver pods.
	kubeProxyLabelName     = "kube-proxy"
	clusterAddonLabelKey   = "k8s-app"
	kubeAPIServerLabelName = "kube-apiserver"
	clusterComponentKey    = "component"

	svcReadyTimeout = 1 * time.Minute
)

var (
	defaultServeHostnameService = v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name: defaultServeHostnameServiceName,
		},
		Spec: v1.ServiceSpec{
			Ports: []v1.ServicePort{{
				Port:       int32(defaultServeHostnameServicePort),
				TargetPort: intstr.FromInt32(9376),
				Protocol:   v1.ProtocolTCP,
			}},
			Selector: map[string]string{
				"name": defaultServeHostnameServiceName,
			},
		},
	}
)

// portsByPodName maps pod name to a list of its exposed ports.
type portsByPodName map[string][]int

// portsByPodUID maps pod UID to a list of its exposed ports.
type portsByPodUID map[types.UID][]int

// fullPortsByPodName maps pod name to a list of its container ports.
type fullPortsByPodName map[string][]v1.ContainerPort

// fullPortsByPodUID maps pod UID to a list of its container ports.
type fullPortsByPodUID map[types.UID][]v1.ContainerPort

// affinityCheckFromPod returns interval, timeout and a function that collects
// backend hostnames by curling the service from the given exec pod.
func affinityCheckFromPod(execPod *v1.Pod, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
	timeout := AffinityTimeout
	// interval considering a maximum of 2 seconds per connection
	interval := 2 * AffinityConfirmCount * time.Second

	serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
	curl := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, serviceIPPort)
	cmd := fmt.Sprintf("for i in $(seq 0 %d); do echo; %s ; done", AffinityConfirmCount, curl)
	getHosts := func() []string {
		stdout, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
		if err != nil {
			framework.Logf("Failed to get response from %s. Retry until timeout", serviceIPPort)
			return nil
		}
		return strings.Split(stdout, "\n")
	}

	return interval, timeout, getHosts
}

// affinityCheckFromTest returns interval, timeout and a function that collects
// backend hostnames by poking the service from the test itself.
func affinityCheckFromTest(ctx context.Context, cs clientset.Interface, serviceIP string, servicePort int) (time.Duration, time.Duration, func() []string) {
	interval := 2 * time.Second
	timeout := e2eservice.GetServiceLoadBalancerPropagationTimeout(ctx, cs)

	params := &e2enetwork.HTTPPokeParams{Timeout: 2 * time.Second}
	getHosts := func() []string {
		var hosts []string
		for i := 0; i < AffinityConfirmCount; i++ {
			result := e2enetwork.PokeHTTP(serviceIP, servicePort, "", params)
			if result.Status == e2enetwork.HTTPSuccess {
				hosts = append(hosts, string(result.Body))
			}
		}
		return hosts
	}

	return interval, timeout, getHosts
}

// checkAffinity issues requests to the service from the given source (an exec
// pod, or the test itself when execPod is nil) and returns true if the observed
// behavior matches shouldHold: with shouldHold, the most recent
// AffinityConfirmCount responses must all come from the same backend; without
// it, they must not.
func checkAffinity(ctx context.Context, cs clientset.Interface, execPod *v1.Pod, serviceIP string, servicePort int, shouldHold bool) bool {
	var interval, timeout time.Duration
	var getHosts func() []string
	if execPod != nil {
		interval, timeout, getHosts = affinityCheckFromPod(execPod, serviceIP, servicePort)
	} else {
		interval, timeout, getHosts = affinityCheckFromTest(ctx, cs, serviceIP, servicePort)
	}

	var tracker affinityTracker
	if pollErr := wait.PollImmediate(interval, timeout, func() (bool, error) {
		hosts := getHosts()
		for _, host := range hosts {
			if len(host) > 0 {
				tracker.recordHost(strings.TrimSpace(host))
			}
		}

		trackerFulfilled, affinityHolds := tracker.checkHostTrace(AffinityConfirmCount)
		if !trackerFulfilled {
			return false, nil
		}

		if !shouldHold && !affinityHolds {
			return true, nil
		}
		if shouldHold && affinityHolds {
			return true, nil
		}
		return false, nil
	}); pollErr != nil {
		trackerFulfilled, _ := tracker.checkHostTrace(AffinityConfirmCount)
		if !wait.Interrupted(pollErr) {
			checkAffinityFailed(tracker, pollErr.Error())
			return false
		}
		if !trackerFulfilled {
			checkAffinityFailed(tracker, "Connection timed out or not enough responses.")
		}
		if shouldHold {
			checkAffinityFailed(tracker, "Affinity should hold but didn't.")
		} else {
			checkAffinityFailed(tracker, "Affinity shouldn't hold but did.")
		}
		return true
	}
	return true
}
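
// Illustrative usage of checkAffinity (a sketch, not called by any test here;
// "execPod", "ns" and "serviceName" are assumed to come from a surrounding test):
//
//	svc, err := cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
//	framework.ExpectNoError(err)
//	if !checkAffinity(ctx, cs, execPod, svc.Spec.ClusterIP, int(svc.Spec.Ports[0].Port), true) {
//		framework.Failf("affinity was expected to hold for service %s", serviceName)
//	}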

// affinityTracker tracks the series of responses received from different hosts.
type affinityTracker struct {
	hostTrace []string
}

// recordHost records the response received from a host.
func (at *affinityTracker) recordHost(host string) {
	at.hostTrace = append(at.hostTrace, host)
	framework.Logf("Received response from host: %s", host)
}

// checkHostTrace checks whether the most recent count responses all came from
// the same host. fulfilled is false when fewer than count responses have been
// recorded so far.
func (at *affinityTracker) checkHostTrace(count int) (fulfilled, affinityHolds bool) {
	fulfilled = (len(at.hostTrace) >= count)
	if len(at.hostTrace) == 0 {
		return fulfilled, true
	}
	last := at.hostTrace[0:]
	if len(at.hostTrace)-count >= 0 {
		last = at.hostTrace[len(at.hostTrace)-count:]
	}
	host := at.hostTrace[len(at.hostTrace)-1]
	for _, h := range last {
		if h != host {
			return fulfilled, false
		}
	}
	return fulfilled, true
}
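
// For example, with hostTrace = ["a", "a", "b"] and count = 3, checkHostTrace
// returns (true, false): enough responses were recorded (fulfilled), but the
// last three did not all come from the same host, so affinity does not hold.
// With hostTrace = ["a", "b", "b"] and count = 2 it returns (true, true).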

func checkAffinityFailed(tracker affinityTracker, err string) {
	framework.Logf("%v", tracker.hostTrace)
	framework.Fail(err)
}

// StartServeHostnameService creates a replication controller that serves its
// hostname and a service on top of it.
func StartServeHostnameService(ctx context.Context, c clientset.Interface, svc *v1.Service, ns string, replicas int) ([]string, string, error) {
	podNames := make([]string, replicas)
	name := svc.ObjectMeta.Name
	ginkgo.By("creating service " + name + " in namespace " + ns)
	_, err := c.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
	if err != nil {
		return podNames, "", err
	}

	var createdPods []*v1.Pod
	maxContainerFailures := 0
	config := testutils.RCConfig{
		Client:               c,
		Image:                framework.ServeHostnameImage,
		Command:              []string{"/agnhost", "serve-hostname"},
		Name:                 name,
		Namespace:            ns,
		PollInterval:         3 * time.Second,
		Timeout:              framework.PodReadyBeforeTimeout,
		Replicas:             replicas,
		CreatedPods:          &createdPods,
		MaxContainerFailures: &maxContainerFailures,
	}
	err = e2erc.RunRC(ctx, config)
	if err != nil {
		return podNames, "", err
	}

	if len(createdPods) != replicas {
		return podNames, "", fmt.Errorf("incorrect number of running pods: %v", len(createdPods))
	}

	for i := range createdPods {
		podNames[i] = createdPods[i].ObjectMeta.Name
	}
	sort.StringSlice(podNames).Sort()

	service, err := c.CoreV1().Services(ns).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return podNames, "", err
	}
	if service.Spec.ClusterIP == "" {
		return podNames, "", fmt.Errorf("service IP is blank for %v", name)
	}
	serviceIP := service.Spec.ClusterIP
	return podNames, serviceIP, nil
}

// StopServeHostnameService stops the given service and waits for its backing
// replication controller to be garbage collected.
func StopServeHostnameService(ctx context.Context, clientset clientset.Interface, ns, name string) error {
	if err := e2erc.DeleteRCAndWaitForGC(ctx, clientset, ns, name); err != nil {
		return err
	}
	if err := clientset.CoreV1().Services(ns).Delete(ctx, name, metav1.DeleteOptions{}); err != nil {
		return err
	}
	return nil
}
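
// Illustrative pairing of the helpers above (a sketch; "cs" and "ns" are assumed
// to come from a test body, and "my-svc" is a placeholder name):
//
//	podNames, svcIP, err := StartServeHostnameService(ctx, cs, getServeHostnameService("my-svc"), ns, 3)
//	framework.ExpectNoError(err)
//	framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames, svcIP, defaultServeHostnameServicePort))
//	framework.ExpectNoError(StopServeHostnameService(ctx, cs, ns, "my-svc"))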

// verifyServeHostnameServiceUp wgets the given serviceIP:servicePort repeatedly,
// first from a pod on the host network and then from a regular pod, and verifies
// that a response is seen from every pod named in expectedPods.
func verifyServeHostnameServiceUp(ctx context.Context, c clientset.Interface, ns string, expectedPods []string, serviceIP string, servicePort int) error {
	// to verify from host network
	hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-up-host-exec-pod")

	// to verify from container's network
	execPod := e2epod.CreateExecPodOrFail(ctx, c, ns, "verify-service-up-exec-pod-", nil)
	defer func() {
		e2epod.DeletePodOrFail(ctx, c, ns, hostExecPod.Name)
		e2epod.DeletePodOrFail(ctx, c, ns, execPod.Name)
	}()

	// The sequence of hostnames returned by repeated requests from the given pod.
	cmdFunc := func(podName string) string {
		wgetCmd := "wget -q -O -"
		// The Windows image's wget may not support the -T (timeout) option.
		if !framework.NodeOSDistroIs("windows") {
			wgetCmd += " -T 1"
		}
		serviceIPPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
		cmd := fmt.Sprintf("for i in $(seq 1 %d); do %s http://%s 2>&1 || true; echo; done",
			50*len(expectedPods), wgetCmd, serviceIPPort)
		framework.Logf("Executing cmd %q in pod %v/%v", cmd, ns, podName)

		output, err := e2eoutput.RunHostCmd(ns, podName, cmd)
		if err != nil {
			framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", cmd, ns, podName, err, output)
		}
		return output
	}

	expectedEndpoints := sets.NewString(expectedPods...)
	ginkgo.By(fmt.Sprintf("verifying service has %d reachable backends", len(expectedPods)))
	for _, podName := range []string{hostExecPod.Name, execPod.Name} {
		passed := false
		gotEndpoints := sets.NewString()

		// Retry cmdFunc for a while.
		for start := time.Now(); time.Since(start) < e2eservice.KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
			for _, endpoint := range strings.Split(cmdFunc(podName), "\n") {
				trimmedEp := strings.TrimSpace(endpoint)
				if trimmedEp != "" {
					gotEndpoints.Insert(trimmedEp)
				}
			}
			// Checking that the retrieved endpoints are a superset of the
			// expected ones lets us ignore transient wget errors in the output
			// while still requiring every expected backend to respond.
			if gotEndpoints.IsSuperset(expectedEndpoints) {
				if !gotEndpoints.Equal(expectedEndpoints) {
					framework.Logf("Ignoring unexpected output wgetting endpoints of service %s: %v", serviceIP, gotEndpoints.Difference(expectedEndpoints))
				}
				passed = true
				break
			}
			framework.Logf("Unable to reach the following endpoints of service %s: %v", serviceIP, expectedEndpoints.Difference(gotEndpoints))
		}
		if !passed {
			// Sort the lists so they're easier to visually diff.
			exp := expectedEndpoints.List()
			got := gotEndpoints.List()
			sort.StringSlice(exp).Sort()
			sort.StringSlice(got).Sort()
			return fmt.Errorf("service verification failed for: %s\nexpected %v\nreceived %v", serviceIP, exp, got)
		}
	}
	return nil
}

// verifyServeHostnameServiceDown verifies that the given serviceIP:servicePort
// is no longer served.
func verifyServeHostnameServiceDown(ctx context.Context, c clientset.Interface, ns string, serviceIP string, servicePort int) error {
	// verify from host network
	hostExecPod := launchHostExecPod(ctx, c, ns, "verify-service-down-host-exec-pod")
	defer func() {
		e2epod.DeletePodOrFail(ctx, c, ns, hostExecPod.Name)
	}()

	ipPort := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))
	// curl's -g argument disables globbing, so that square brackets around an
	// IPv6 address are not misinterpreted.
	command := fmt.Sprintf(
		"curl -g -s --connect-timeout 2 http://%s && echo service-down-failed", ipPort)

	for start := time.Now(); time.Since(start) < e2eservice.KubeProxyLagTimeout; time.Sleep(5 * time.Second) {
		output, err := e2eoutput.RunHostCmd(ns, hostExecPod.Name, command)
		if err != nil {
			framework.Logf("error while kubectl execing %q in pod %v/%v: %v\nOutput: %v", command, ns, hostExecPod.Name, err, output)
		}
		if !strings.Contains(output, "service-down-failed") {
			return nil
		}
		framework.Logf("service still alive - still waiting")
	}

	return fmt.Errorf("waiting for service to be down timed out")
}

// testNotReachableHTTP tests that an HTTP request doesn't connect to the given host and port.
func testNotReachableHTTP(host string, port int, timeout time.Duration) {
	pollfn := func() (bool, error) {
		result := e2enetwork.PokeHTTP(host, port, "/", nil)
		if result.Code == 0 {
			return true, nil
		}
		return false, nil
	}

	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
		framework.Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
	}
}

// testRejectedHTTP tests that the given host rejects an HTTP request on the given port.
func testRejectedHTTP(host string, port int, timeout time.Duration) {
	pollfn := func() (bool, error) {
		result := e2enetwork.PokeHTTP(host, port, "/", nil)
		if result.Status == e2enetwork.HTTPRefused {
			return true, nil
		}
		return false, nil
	}

	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
		framework.Failf("HTTP service %v:%v not rejected: %v", host, port, err)
	}
}

// UDPPokeParams is a struct for UDP poke parameters.
type UDPPokeParams struct {
	Timeout  time.Duration
	Response string
}

// UDPPokeResult is a struct for UDP poke result.
type UDPPokeResult struct {
	Status   UDPPokeStatus
	Error    error  // if there was any error
	Response []byte // populated if the transaction completed
}

// UDPPokeStatus is a string for representing UDP poke status.
type UDPPokeStatus string

const (
	// UDPSuccess means the poke succeeded.
	UDPSuccess UDPPokeStatus = "Success"
	// UDPError means an unknown error occurred.
	UDPError UDPPokeStatus = "UnknownError"
	// UDPTimeout means the poke timed out.
	UDPTimeout UDPPokeStatus = "TimedOut"
	// UDPRefused means the connection was refused.
	UDPRefused UDPPokeStatus = "ConnectionRefused"
	// UDPBadResponse means the response did not match the expected string.
	UDPBadResponse UDPPokeStatus = "BadResponse"
)

// pokeUDP tries to connect to a host on a port and send the given request.
// Callers can specify additional success parameters, if desired.
//
// The result status is characterized as precisely as possible. The result
// error is populated for any status other than Success, and the result
// response is populated whenever the UDP transaction completed, even if other
// params make this a failure.
func pokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
	hostPort := net.JoinHostPort(host, strconv.Itoa(port))
	url := fmt.Sprintf("udp://%s", hostPort)

	ret := UDPPokeResult{}

	// Sanity check inputs; these are the only things that would cause a
	// nil result with no error.
	if host == "" {
		framework.Failf("Got empty host for UDP poke (%s)", url)
		return ret
	}
	if port == 0 {
		framework.Failf("Got port==0 for UDP poke (%s)", url)
		return ret
	}

	// Set default params.
	if params == nil {
		params = &UDPPokeParams{}
	}

	framework.Logf("Poking %v", url)

	con, err := net.Dial("udp", hostPort)
	if err != nil {
		ret.Status = UDPError
		ret.Error = err
		framework.Logf("Poke(%q): %v", url, err)
		return ret
	}

	_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
	if err != nil {
		ret.Error = err
		neterr, ok := err.(net.Error)
		if ok && neterr.Timeout() {
			ret.Status = UDPTimeout
		} else if strings.Contains(err.Error(), "connection refused") {
			ret.Status = UDPRefused
		} else {
			ret.Status = UDPError
		}
		framework.Logf("Poke(%q): %v", url, err)
		return ret
	}

	if params.Timeout != 0 {
		err = con.SetDeadline(time.Now().Add(params.Timeout))
		if err != nil {
			ret.Status = UDPError
			ret.Error = err
			framework.Logf("Poke(%q): %v", url, err)
			return ret
		}
	}

	bufsize := len(params.Response) + 1
	if params.Response == "" {
		// No expected response was given; read with a reasonably large buffer
		// instead of the 1-byte one the formula above would produce.
		bufsize = 4096
	}
	var buf = make([]byte, bufsize)
	n, err := con.Read(buf)
	if err != nil {
		ret.Error = err
		neterr, ok := err.(net.Error)
		if ok && neterr.Timeout() {
			ret.Status = UDPTimeout
		} else if strings.Contains(err.Error(), "connection refused") {
			ret.Status = UDPRefused
		} else {
			ret.Status = UDPError
		}
		framework.Logf("Poke(%q): %v", url, err)
		return ret
	}
	ret.Response = buf[0:n]

	if params.Response != "" && string(ret.Response) != params.Response {
		ret.Status = UDPBadResponse
		ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
		framework.Logf("Poke(%q): %v", url, ret.Error)
		return ret
	}

	ret.Status = UDPSuccess
	framework.Logf("Poke(%q): success", url)
	return ret
}
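
// Illustrative use of pokeUDP (a sketch; the host and port are placeholder values):
//
//	result := pokeUDP("10.0.0.1", 90, "echo hello", &UDPPokeParams{
//		Timeout:  3 * time.Second,
//		Response: "hello",
//	})
//	if result.Status != UDPSuccess {
//		framework.Logf("UDP poke failed: %v", result.Error)
//	}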

// testReachableUDP tests that the given host serves UDP on the given port.
func testReachableUDP(host string, port int, timeout time.Duration) {
	pollfn := func() (bool, error) {
		result := pokeUDP(host, port, "echo hello", &UDPPokeParams{
			Timeout:  3 * time.Second,
			Response: "hello",
		})
		if result.Status == UDPSuccess {
			return true, nil
		}
		return false, nil
	}

	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
		framework.Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
	}
}

// testNotReachableUDP tests that the given host doesn't serve UDP on the given port.
func testNotReachableUDP(host string, port int, timeout time.Duration) {
	pollfn := func() (bool, error) {
		result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
		if result.Status != UDPSuccess && result.Status != UDPError {
			return true, nil
		}
		return false, nil
	}
	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
		framework.Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
	}
}

// testRejectedUDP tests that the given host rejects a UDP request on the given port.
func testRejectedUDP(host string, port int, timeout time.Duration) {
	pollfn := func() (bool, error) {
		result := pokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
		if result.Status == UDPRefused {
			return true, nil
		}
		return false, nil
	}
	if err := wait.PollImmediate(framework.Poll, timeout, pollfn); err != nil {
		framework.Failf("UDP service %v:%v not rejected: %v", host, port, err)
	}
}

// TestHTTPHealthCheckNodePort tests an HTTP connection to the given host and
// port, counting results until at least threshold of them match expectSucceed.
func TestHTTPHealthCheckNodePort(host string, port int, request string, timeout time.Duration, expectSucceed bool, threshold int) error {
	count := 0
	condition := func() (bool, error) {
		success, _ := testHTTPHealthCheckNodePort(host, port, request)
		if success && expectSucceed ||
			!success && !expectSucceed {
			count++
		}
		if count >= threshold {
			return true, nil
		}
		return false, nil
	}

	if err := wait.PollImmediate(time.Second, timeout, condition); err != nil {
		return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v, got %d", threshold, expectSucceed, host, port, count)
	}
	return nil
}
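
// Illustrative call (a sketch; nodeIP and healthCheckNodePort are placeholders;
// in practice the port comes from svc.Spec.HealthCheckNodePort):
//
//	err := TestHTTPHealthCheckNodePort(nodeIP, healthCheckNodePort, "/healthz",
//		e2eservice.KubeProxyLagTimeout, true, 5)
//	framework.ExpectNoError(err)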

func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, error) {
	ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
	url := fmt.Sprintf("http://%s%s", ipPort, request)
	if ip == "" || port == 0 {
		framework.Failf("Got empty IP for reachability check (%s)", url)
		return false, fmt.Errorf("invalid input ip or port")
	}
	framework.Logf("Testing HTTP health check on %v", url)
	resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
	if err != nil {
		framework.Logf("Got error testing for reachability of %s: %v", url, err)
		return false, err
	}
	defer resp.Body.Close()

	// The health check responder returns 503 for no local endpoints.
	if resp.StatusCode == 503 {
		return false, nil
	}
	// The health check responder returns 200 for one or more local endpoints.
	if resp.StatusCode == 200 {
		return true, nil
	}
	return false, fmt.Errorf("unexpected HTTP response code %s from health check responder at %s", resp.Status, url)
}

func testHTTPHealthCheckNodePortFromTestContainer(ctx context.Context, config *e2enetwork.NetworkingTestConfig, host string, port int, timeout time.Duration, expectSucceed bool, threshold int) error {
	count := 0
	pollFn := func() (bool, error) {
		statusCode, err := config.GetHTTPCodeFromTestContainer(ctx,
			"/healthz",
			host,
			port)
		if err != nil {
			framework.Logf("Got error reading status code from http://%s:%d/healthz via test container: %v", host, port, err)
			return false, nil
		}
		framework.Logf("Got status code from http://%s:%d/healthz via test container: %d", host, port, statusCode)
		success := statusCode == 200
		if (success && expectSucceed) ||
			(!success && !expectSucceed) {
			count++
		}
		return count >= threshold, nil
	}
	err := wait.PollImmediate(time.Second, timeout, pollFn)
	if err != nil {
		return fmt.Errorf("error waiting for healthCheckNodePort: expected at least %d succeed=%v on %v:%v/healthz, got %d", threshold, expectSucceed, host, port, count)
	}
	return nil
}

// httpGetNoConnectionPoolTimeout does an HTTP GET, but does not reuse TCP
// connections. This masks problems where an iptables rule has changed but we
// don't see it because an existing connection keeps working.
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
	tr := utilnet.SetTransportDefaults(&http.Transport{
		DisableKeepAlives: true,
	})
	client := &http.Client{
		Transport: tr,
		Timeout:   timeout,
	}
	return client.Get(url)
}

func getServeHostnameService(name string) *v1.Service {
	svc := defaultServeHostnameService.DeepCopy()
	svc.ObjectMeta.Name = name
	svc.Spec.Selector["name"] = name
	return svc
}

// waitForAPIServerUp polls /healthz on the apiserver until it reports "ok" or times out.
func waitForAPIServerUp(ctx context.Context, c clientset.Interface) error {
	for start := time.Now(); time.Since(start) < time.Minute; time.Sleep(5 * time.Second) {
		body, err := c.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(ctx).Raw()
		if err == nil && string(body) == "ok" {
			return nil
		}
	}
	return fmt.Errorf("waiting for apiserver timed out")
}

// getEndpointNodesWithInternalIP returns a map of nodename:internal-ip for the
// nodes on which the endpoints of the Service are running.
func getEndpointNodesWithInternalIP(ctx context.Context, jig *e2eservice.TestJig) (map[string]string, error) {
	nodesWithIPs, err := jig.GetEndpointNodesWithIP(ctx, v1.NodeInternalIP)
	if err != nil {
		return nil, err
	}
	endpointsNodeMap := make(map[string]string, len(nodesWithIPs))
	for nodeName, internalIPs := range nodesWithIPs {
		if len(internalIPs) < 1 {
			return nil, fmt.Errorf("no internal ip found for node %s", nodeName)
		}
		endpointsNodeMap[nodeName] = internalIPs[0]
	}
	return endpointsNodeMap, nil
}
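
// Illustrative use (a sketch; "jig" is an *e2eservice.TestJig from a test body):
//
//	endpointsNodeMap, err := getEndpointNodesWithInternalIP(ctx, jig)
//	framework.ExpectNoError(err)
//	for nodeName, ip := range endpointsNodeMap {
//		framework.Logf("endpoint node %s has internal IP %s", nodeName, ip)
//	}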

var _ = common.SIGDescribe("Services", func() {
	f := framework.NewDefaultFramework("services")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged

	var cs clientset.Interface

	ginkgo.BeforeEach(func() {
		cs = f.ClientSet
	})

	framework.ConformanceIt("should provide secure master service", func(ctx context.Context) {
		_, err := cs.CoreV1().Services(metav1.NamespaceDefault).Get(ctx, "kubernetes", metav1.GetOptions{})
		framework.ExpectNoError(err, "failed to fetch the service object for the service named kubernetes")
	})

	framework.ConformanceIt("should serve a basic endpoint from pods", func(ctx context.Context) {
		serviceName := "endpoint-test2"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
		})
		svc, err := jig.CreateTCPServiceWithPort(ctx, nil, 80)
		framework.ExpectNoError(err)

		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})

		names := map[string]bool{}
		ginkgo.DeferCleanup(func(ctx context.Context) {
			for name := range names {
				err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
				framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
			}
		})

		name1 := "pod1"
		name2 := "pod2"

		createPodOrFail(ctx, f, ns, name1, jig.Labels, []v1.ContainerPort{{ContainerPort: 80}}, "netexec", "--http-port", "80")
		names[name1] = true
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name1: {80}})

		ginkgo.By("Checking if the Service forwards traffic to pod1")
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)

		createPodOrFail(ctx, f, ns, name2, jig.Labels, []v1.ContainerPort{{ContainerPort: 80}}, "netexec", "--http-port", "80")
		names[name2] = true
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name1: {80}, name2: {80}})

		ginkgo.By("Checking if the Service forwards traffic to pod1 and pod2")
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)

		e2epod.DeletePodOrFail(ctx, cs, ns, name1)
		delete(names, name1)
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name2: {80}})

		ginkgo.By("Checking if the Service forwards traffic to pod2")
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)

		e2epod.DeletePodOrFail(ctx, cs, ns, name2)
		delete(names, name2)
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
	})

	framework.ConformanceIt("should serve multiport endpoints from pods", func(ctx context.Context) {
		serviceName := "multi-endpoint-test"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.DeferCleanup(func(ctx context.Context) {
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
		})

		svc1port := "svc1"
		svc2port := "svc2"

		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
		svc, err := jig.CreateTCPService(ctx, func(service *v1.Service) {
			service.Spec.Ports = []v1.ServicePort{
				{
					Name:       "portname1",
					Port:       80,
					TargetPort: intstr.FromString(svc1port),
				},
				{
					Name:       "portname2",
					Port:       81,
					TargetPort: intstr.FromString(svc2port),
				},
			}
		})
		framework.ExpectNoError(err)

		port1 := 100
		port2 := 101
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})

		names := map[string]bool{}
		ginkgo.DeferCleanup(func(ctx context.Context) {
			for name := range names {
				err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
				framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
			}
		})

		containerPorts1 := []v1.ContainerPort{
			{
				Name:          svc1port,
				ContainerPort: int32(port1),
			},
		}
		containerPorts2 := []v1.ContainerPort{
			{
				Name:          svc2port,
				ContainerPort: int32(port2),
			},
		}

		podname1 := "pod1"
		podname2 := "pod2"

		createPodOrFail(ctx, f, ns, podname1, jig.Labels, containerPorts1, "netexec", "--http-port", strconv.Itoa(port1))
		names[podname1] = true
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}})

		createPodOrFail(ctx, f, ns, podname2, jig.Labels, containerPorts2, "netexec", "--http-port", strconv.Itoa(port2))
		names[podname2] = true
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}, podname2: {port2}})

		ginkgo.By("Checking if the Service forwards traffic to pods")
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)

		e2epod.DeletePodOrFail(ctx, cs, ns, podname1)
		delete(names, podname1)
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname2: {port2}})

		e2epod.DeletePodOrFail(ctx, cs, ns, podname2)
		delete(names, podname2)
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
	})

	ginkgo.It("should be updated after adding or deleting ports ", func(ctx context.Context) {
		serviceName := "edit-port-test"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		svc1port := "svc1"
		ginkgo.By("creating service " + serviceName + " in namespace " + ns)
		svc, err := jig.CreateTCPService(ctx, func(service *v1.Service) {
			service.Spec.Ports = []v1.ServicePort{
				{
					Name:       "portname1",
					Port:       80,
					TargetPort: intstr.FromString(svc1port),
				},
			}
		})
		framework.ExpectNoError(err)
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})

		podname1 := "pod1"
		port1 := 100
		containerPorts1 := []v1.ContainerPort{
			{
				Name:          svc1port,
				ContainerPort: int32(port1),
			},
		}
		createPodOrFail(ctx, f, ns, podname1, jig.Labels, containerPorts1, "netexec", "--http-port", strconv.Itoa(port1))
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}})

		ginkgo.By("Checking if the Service " + serviceName + " forwards traffic to " + podname1)
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)

		ginkgo.By("Adding a new port to service " + serviceName)
		svc2port := "svc2"
		svc, err = jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Ports = []v1.ServicePort{
				{
					Name:       "portname1",
					Port:       80,
					TargetPort: intstr.FromString(svc1port),
				},
				{
					Name:       "portname2",
					Port:       81,
					TargetPort: intstr.FromString(svc2port),
				},
			}
		})
		framework.ExpectNoError(err)

		ginkgo.By("Adding a new endpoint to the new port")
		podname2 := "pod2"
		port2 := 101
		containerPorts2 := []v1.ContainerPort{
			{
				Name:          svc2port,
				ContainerPort: int32(port2),
			},
		}
		createPodOrFail(ctx, f, ns, podname2, jig.Labels, containerPorts2, "netexec", "--http-port", strconv.Itoa(port2))
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}, podname2: {port2}})

		ginkgo.By("Checking if the Service forwards traffic to " + podname1 + " and " + podname2)
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)

		ginkgo.By("Deleting a port from service " + serviceName)
		svc, err = jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Ports = []v1.ServicePort{
				{
					Name:       "portname1",
					Port:       80,
					TargetPort: intstr.FromString(svc1port),
				},
			}
		})
		framework.ExpectNoError(err)

		ginkgo.By("Checking if the Service forwards traffic to " + podname1 + " and does not forward to " + podname2)
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{podname1: {port1}})
		err = jig.CheckServiceReachability(ctx, svc, execPod)
		framework.ExpectNoError(err)
	})

	ginkgo.It("should preserve source pod IP for traffic thru service cluster IP [LinuxOnly]", func(ctx context.Context) {
		e2eskipper.SkipIfNodeOSDistroIs("windows")

		serviceName := "sourceip-test"
		ns := f.Namespace.Name

		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		jig.ExternalIPs = false
		servicePort := 8080
		tcpService, err := jig.CreateTCPServiceWithPort(ctx, nil, int32(servicePort))
		framework.ExpectNoError(err)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the sourceip test service")
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
		})
		serviceIP := tcpService.Spec.ClusterIP
		framework.Logf("sourceip-test cluster ip: %s", serviceIP)

		ginkgo.By("Picking 2 Nodes to test whether source IP is preserved or not")
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}

		ginkgo.By("Creating a webserver pod to be part of the TCP service which echoes back source ip")
		serverPodName := "echo-sourceip"
		pod := e2epod.NewAgnhostPod(ns, serverPodName, nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
		pod.Labels = jig.Labels
		_, err = cs.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the echo server pod")
			err := cs.CoreV1().Pods(ns).Delete(ctx, serverPodName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete pod: %s on node", serverPodName)
		})

		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{serverPodName: {servicePort}})

		ginkgo.By("Creating pause pod deployment")
		deployment := createPausePodDeployment(ctx, cs, "pause-pod", ns, nodeCounts)

		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Deleting deployment")
			err = cs.AppsV1().Deployments(ns).Delete(ctx, deployment.Name, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "Failed to delete deployment %s", deployment.Name)
		})

		framework.ExpectNoError(e2edeployment.WaitForDeploymentComplete(cs, deployment), "Failed to complete pause pod deployment")

		deployment, err = cs.AppsV1().Deployments(ns).Get(ctx, deployment.Name, metav1.GetOptions{})
		framework.ExpectNoError(err, "Error in retrieving pause pod deployment")
		labelSelector, err := metav1.LabelSelectorAsSelector(deployment.Spec.Selector)
		framework.ExpectNoError(err, "Error in setting LabelSelector as selector from deployment")

		pausePods, err := cs.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{LabelSelector: labelSelector.String()})
		framework.ExpectNoError(err, "Error in listing pods associated with pause pod deployments")

		gomega.Expect(pausePods.Items[0].Spec.NodeName).NotTo(gomega.Equal(pausePods.Items[1].Spec.NodeName))

		serviceAddress := net.JoinHostPort(serviceIP, strconv.Itoa(servicePort))

		for _, pausePod := range pausePods.Items {
			sourceIP, execPodIP := execSourceIPTest(pausePod, serviceAddress)
			ginkgo.By("Verifying the preserved source ip")
			gomega.Expect(sourceIP).To(gomega.Equal(execPodIP))
		}
	})

	ginkgo.It("should allow pods to hairpin back to themselves through services", func(ctx context.Context) {
		serviceName := "hairpin-test"
		ns := f.Namespace.Name

		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		jig.ExternalIPs = false
		servicePort := 8080
		svc, err := jig.CreateTCPServiceWithPort(ctx, nil, int32(servicePort))
		framework.ExpectNoError(err)
		serviceIP := svc.Spec.ClusterIP
		framework.Logf("hairpin-test cluster ip: %s", serviceIP)

		ginkgo.By("creating a client/server pod")
		serverPodName := "hairpin"
		podTemplate := e2epod.NewAgnhostPod(ns, serverPodName, nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
		podTemplate.Labels = jig.Labels
		pod, err := cs.CoreV1().Pods(ns).Create(ctx, podTemplate, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pod.Name, f.Namespace.Name, framework.PodStartTimeout))

		ginkgo.By("waiting for the service to expose an endpoint")
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{serverPodName: {servicePort}})

		ginkgo.By("Checking if the pod can reach itself")
		err = jig.CheckServiceReachability(ctx, svc, pod)
		framework.ExpectNoError(err)
	})

	ginkgo.It("should be able to up and down services", func(ctx context.Context) {
		ns := f.Namespace.Name
		numPods, servicePort := 3, defaultServeHostnameServicePort

		svc1 := "up-down-1"
		svc2 := "up-down-2"
		svc3 := "up-down-3"

		ginkgo.By("creating " + svc1 + " in namespace " + ns)
		podNames1, svc1IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc1), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)
		ginkgo.By("creating " + svc2 + " in namespace " + ns)
		podNames2, svc2IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc2), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)

		ginkgo.By("verifying service " + svc1 + " is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))

		ginkgo.By("verifying service " + svc2 + " is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))

		// Stop service 1 and make sure it is gone.
		ginkgo.By("stopping service " + svc1)
		framework.ExpectNoError(StopServeHostnameService(ctx, f.ClientSet, ns, svc1))

		ginkgo.By("verifying service " + svc1 + " is not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svc1IP, servicePort))
		ginkgo.By("verifying service " + svc2 + " is still up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))

		// Start another service and verify both are up.
		ginkgo.By("creating service " + svc3 + " in namespace " + ns)
		podNames3, svc3IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc3), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc3, ns)

		if svc2IP == svc3IP {
			framework.Failf("service IPs conflict: %v", svc2IP)
		}

		ginkgo.By("verifying service " + svc2 + " is still up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))

		ginkgo.By("verifying service " + svc3 + " is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames3, svc3IP, servicePort))
	})

	ginkgo.It("should work after the service has been recreated", func(ctx context.Context) {
		serviceName := "service-deletion"
		ns := f.Namespace.Name
		numPods, servicePort := 1, defaultServeHostnameServicePort

		ginkgo.By("creating the service " + serviceName + " in namespace " + ns)
		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, serviceName)
		podNames, svcIP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(serviceName), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", serviceName, ns)
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames, svcIP, servicePort))

		ginkgo.By("deleting the service " + serviceName + " in namespace " + ns)
		err = cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
		framework.ExpectNoError(err)

		ginkgo.By("Waiting for the service " + serviceName + " in namespace " + ns + " to disappear")
		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.RespondingTimeout, func() (bool, error) {
			_, err := cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
			if err != nil {
				if apierrors.IsNotFound(err) {
					framework.Logf("Service %s/%s is gone.", ns, serviceName)
					return true, nil
				}
				return false, err
			}
			framework.Logf("Service %s/%s still exists", ns, serviceName)
			return false, nil
		}); pollErr != nil {
			framework.Failf("Failed to wait for service to disappear: %v", pollErr)
		}

		ginkgo.By("recreating the service " + serviceName + " in namespace " + ns)
		svc, err := cs.CoreV1().Services(ns).Create(ctx, getServeHostnameService(serviceName), metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames, svc.Spec.ClusterIP, servicePort))
	})

	f.It("should work after restarting kube-proxy", f.WithDisruptive(), func(ctx context.Context) {
		kubeProxyLabelSet := map[string]string{clusterAddonLabelKey: kubeProxyLabelName}
		e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(ctx, kubeProxyLabelName, cs, metav1.NamespaceSystem, kubeProxyLabelSet)

		ns := f.Namespace.Name
		numPods, servicePort := 3, defaultServeHostnameServicePort

		svc1 := "restart-proxy-1"
		svc2 := "restart-proxy-2"

		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc1)
		podNames1, svc1IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc1), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)

		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc2)
		podNames2, svc2IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc2), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)

		if svc1IP == svc2IP {
			framework.Failf("VIPs conflict: %v", svc1IP)
		}

		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))

		if err := restartComponent(ctx, cs, kubeProxyLabelName, metav1.NamespaceSystem, kubeProxyLabelSet); err != nil {
			framework.Failf("error restarting kube-proxy: %v", err)
		}
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
	})

	f.It("should work after restarting apiserver", f.WithDisruptive(), func(ctx context.Context) {
		if !framework.ProviderIs("gke") {
			e2eskipper.SkipUnlessComponentRunsAsPodsAndClientCanDeleteThem(ctx, kubeAPIServerLabelName, cs, metav1.NamespaceSystem, map[string]string{clusterComponentKey: kubeAPIServerLabelName})
		}

		ns := f.Namespace.Name
		numPods, servicePort := 3, defaultServeHostnameServicePort

		svc1 := "restart-apiserver-1"
		svc2 := "restart-apiserver-2"

		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc1)
		podNames1, svc1IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc1), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc1, ns)

		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))

		ginkgo.By("Restarting apiserver")
		if err := restartApiserver(ctx, ns, cs); err != nil {
			framework.Failf("error restarting apiserver: %v", err)
		}
		ginkgo.By("Waiting for apiserver to come up by polling /healthz")
		if err := waitForAPIServerUp(ctx, cs); err != nil {
			framework.Failf("error while waiting for apiserver up: %v", err)
		}
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))

		// Create a second service and check that it gets a different IP.
		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, svc2)
		podNames2, svc2IP, err := StartServeHostnameService(ctx, cs, getServeHostnameService(svc2), ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svc2, ns)

		if svc1IP == svc2IP {
			framework.Failf("VIPs conflict: %v", svc1IP)
		}
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames1, svc1IP, servicePort))
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podNames2, svc2IP, servicePort))
	})

	framework.ConformanceIt("should be able to create a functioning NodePort service", func(ctx context.Context) {
		serviceName := "nodeport-test"
		ns := f.Namespace.Name

		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.By("creating service " + serviceName + " with type=NodePort in namespace " + ns)
		nodePortService, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Type = v1.ServiceTypeNodePort
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
			}
		})
		framework.ExpectNoError(err)
		err = jig.CreateServicePods(ctx, 2)
		framework.ExpectNoError(err)
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
		framework.ExpectNoError(err)
	})

	ginkgo.It("should be possible to connect to a service via ExternalIP when the external IP is not assigned to a node", func(ctx context.Context) {
		serviceName := "externalip-test"
		ns := f.Namespace.Name
		externalIP := "203.0.113.250"
		if framework.TestContext.ClusterIsIPv6() {
			externalIP = "2001:DB8::cb00:71fa"
		}

		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		jig.ExternalIPs = false

		ginkgo.By("creating service " + serviceName + " with type=clusterIP in namespace " + ns)
		clusterIPService, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Type = v1.ServiceTypeClusterIP
			svc.Spec.ExternalIPs = []string{externalIP}
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
			}
		})
		if err != nil && strings.Contains(err.Error(), "Use of external IPs is denied by admission control") {
			e2eskipper.Skipf("Admission controller to deny services with external IPs is enabled - skip.")
		}
		framework.ExpectNoError(err)
		err = jig.CreateServicePods(ctx, 2)
		framework.ExpectNoError(err)
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, clusterIPService, execPod)
		framework.ExpectNoError(err)
	})

	ginkgo.It("should be able to update service type to NodePort listening on same port number but different protocols", func(ctx context.Context) {
		serviceName := "nodeport-update-service"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		jig.ExternalIPs = false

		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP in namespace " + ns)
		tcpService, err := jig.CreateTCPService(ctx, nil)
		framework.ExpectNoError(err)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the updating NodePorts test service")
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
		})
		framework.Logf("Service Port TCP: %v", tcpService.Spec.Ports[0].Port)

		ginkgo.By("changing the TCP service to type=NodePort")
		nodePortService, err := jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Type = v1.ServiceTypeNodePort
			s.Spec.Ports = []v1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       80,
					Protocol:   v1.ProtocolTCP,
					TargetPort: intstr.FromInt32(9376),
				},
			}
		})
		framework.ExpectNoError(err)

		err = jig.CreateServicePods(ctx, 2)
		framework.ExpectNoError(err)
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
		framework.ExpectNoError(err)

		ginkgo.By("Updating NodePort service to listen TCP and UDP based requests over same Port")
		nodePortService, err = jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Type = v1.ServiceTypeNodePort
			s.Spec.Ports = []v1.ServicePort{
				{
					Name:       "tcp-port",
					Port:       80,
					Protocol:   v1.ProtocolTCP,
					TargetPort: intstr.FromInt32(9376),
				},
				{
					Name:       "udp-port",
					Port:       80,
					Protocol:   v1.ProtocolUDP,
					TargetPort: intstr.FromInt32(9376),
				},
			}
		})
		framework.ExpectNoError(err)
		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
		framework.ExpectNoError(err)
		nodePortCounts := len(nodePortService.Spec.Ports)
		gomega.Expect(nodePortCounts).To(gomega.Equal(2), "updated service should have two Ports but found %d Ports", nodePortCounts)

		for _, port := range nodePortService.Spec.Ports {
			gomega.Expect(port.NodePort).ToNot(gomega.BeZero(), "NodePort service failed to allocate NodePort for Port %s", port.Name)
			framework.Logf("NodePort service allocates NodePort: %d for Port: %s over Protocol: %s", port.NodePort, port.Name, port.Protocol)
		}
	})

	framework.ConformanceIt("should be able to change the type from ExternalName to ClusterIP", func(ctx context.Context) {
		serviceName := "externalname-service"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
		_, err := jig.CreateExternalNameService(ctx, nil)
		framework.ExpectNoError(err)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the ExternalName to ClusterIP test service")
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
		})

		ginkgo.By("changing the ExternalName service to type=ClusterIP")
		clusterIPService, err := jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Type = v1.ServiceTypeClusterIP
			s.Spec.ExternalName = ""
			s.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
			}
		})
		framework.ExpectNoError(err)

		err = jig.CreateServicePods(ctx, 2)
		framework.ExpectNoError(err)
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, clusterIPService, execPod)
		framework.ExpectNoError(err)
	})

	framework.ConformanceIt("should be able to change the type from ExternalName to NodePort", func(ctx context.Context) {
		serviceName := "externalname-service"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.By("creating a service " + serviceName + " with the type=ExternalName in namespace " + ns)
		_, err := jig.CreateExternalNameService(ctx, nil)
		framework.ExpectNoError(err)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the ExternalName to NodePort test service")
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
		})

		ginkgo.By("changing the ExternalName service to type=NodePort")
		nodePortService, err := jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Type = v1.ServiceTypeNodePort
			s.Spec.ExternalName = ""
			s.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(9376)},
			}
		})
		framework.ExpectNoError(err)
		err = jig.CreateServicePods(ctx, 2)
		framework.ExpectNoError(err)

		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, nodePortService, execPod)
		framework.ExpectNoError(err)
	})

	framework.ConformanceIt("should be able to change the type from ClusterIP to ExternalName", func(ctx context.Context) {
		serviceName := "clusterip-service"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.By("creating a service " + serviceName + " with the type=ClusterIP in namespace " + ns)
		_, err := jig.CreateTCPService(ctx, nil)
		framework.ExpectNoError(err)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the ClusterIP to ExternalName test service")
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
		})

		ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service")
		externalServiceName := "externalsvc"
		externalServiceFQDN := createAndGetExternalServiceFQDN(ctx, cs, ns, externalServiceName)
		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, externalServiceName)

		ginkgo.By("changing the ClusterIP service to type=ExternalName")
		externalNameService, err := jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Type = v1.ServiceTypeExternalName
			s.Spec.ExternalName = externalServiceFQDN
		})
		framework.ExpectNoError(err)
		if externalNameService.Spec.ClusterIP != "" {
			framework.Failf("Spec.ClusterIP was not cleared")
		}
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, externalNameService, execPod)
		framework.ExpectNoError(err)
	})

	framework.ConformanceIt("should be able to change the type from NodePort to ExternalName", func(ctx context.Context) {
		serviceName := "nodeport-service"
		ns := f.Namespace.Name
		jig := e2eservice.NewTestJig(cs, ns, serviceName)

		ginkgo.By("creating a service " + serviceName + " with the type=NodePort in namespace " + ns)
		_, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Type = v1.ServiceTypeNodePort
		})
		framework.ExpectNoError(err)
		ginkgo.DeferCleanup(func(ctx context.Context) {
			framework.Logf("Cleaning up the NodePort to ExternalName test service")
			err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
			framework.ExpectNoError(err, "failed to delete service %s in namespace %s", serviceName, ns)
		})

		ginkgo.By("Creating active service to test reachability when its FQDN is referred as externalName for another service")
		externalServiceName := "externalsvc"
		externalServiceFQDN := createAndGetExternalServiceFQDN(ctx, cs, ns, externalServiceName)
		ginkgo.DeferCleanup(StopServeHostnameService, f.ClientSet, ns, externalServiceName)

		ginkgo.By("changing the NodePort service to type=ExternalName")
		externalNameService, err := jig.UpdateService(ctx, func(s *v1.Service) {
			s.Spec.Type = v1.ServiceTypeExternalName
			s.Spec.ExternalName = externalServiceFQDN
		})
		framework.ExpectNoError(err)
		if externalNameService.Spec.ClusterIP != "" {
			framework.Failf("Spec.ClusterIP was not cleared")
		}
		execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
		err = jig.CheckServiceReachability(ctx, externalNameService, execPod)
		framework.ExpectNoError(err)
	})

	ginkgo.It("should prevent NodePort collisions", func(ctx context.Context) {
		baseName := "nodeport-collision-"
		serviceName1 := baseName + "1"
		serviceName2 := baseName + "2"
		ns := f.Namespace.Name

		t := NewServerTest(cs, ns, serviceName1)
		defer func() {
			defer ginkgo.GinkgoRecover()
			errs := t.Cleanup()
			if len(errs) != 0 {
				framework.Failf("errors in cleanup: %v", errs)
			}
		}()

		ginkgo.By("creating service " + serviceName1 + " with type NodePort in namespace " + ns)
		service := t.BuildServiceSpec()
		service.Spec.Type = v1.ServiceTypeNodePort
		result, err := t.CreateService(service)
		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName1, ns)

		if result.Spec.Type != v1.ServiceTypeNodePort {
			framework.Failf("got unexpected Spec.Type for new service: %v", result)
		}
		if len(result.Spec.Ports) != 1 {
			framework.Failf("got unexpected len(Spec.Ports) for new service: %v", result)
		}
		port := result.Spec.Ports[0]
		if port.NodePort == 0 {
			framework.Failf("got unexpected Spec.Ports[0].NodePort for new service: %v", result)
		}

		ginkgo.By("creating service " + serviceName2 + " with conflicting NodePort")
		service2 := t.BuildServiceSpec()
		service2.Name = serviceName2
		service2.Spec.Type = v1.ServiceTypeNodePort
		service2.Spec.Ports[0].NodePort = port.NodePort
		result2, err := t.CreateService(service2)
		if err == nil {
			framework.Failf("Created service with conflicting NodePort: %v", result2)
		}
		expectedErr := fmt.Sprintf("%d.*port is already allocated", port.NodePort)
		gomega.Expect(fmt.Sprintf("%v", err)).To(gomega.MatchRegexp(expectedErr))

		ginkgo.By("deleting service " + serviceName1 + " to release NodePort")
		err = t.DeleteService(serviceName1)
		framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName1, ns)

		ginkgo.By("creating service " + serviceName2 + " with no-longer-conflicting NodePort")
		_, err = t.CreateService(service2)
		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName2, ns)
	})
1625
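	// Verifies that NodePorts outside the configured service node port range
	// are rejected on both create and update.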
	ginkgo.It("should check NodePort out-of-range", func(ctx context.Context) {
		serviceName := "nodeport-range-test"
		ns := f.Namespace.Name

		t := NewServerTest(cs, ns, serviceName)
		defer func() {
			defer ginkgo.GinkgoRecover()
			errs := t.Cleanup()
			if len(errs) != 0 {
				framework.Failf("errors in cleanup: %v", errs)
			}
		}()

		service := t.BuildServiceSpec()
		service.Spec.Type = v1.ServiceTypeNodePort

		ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns)
		service, err := t.CreateService(service)
		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)

		if service.Spec.Type != v1.ServiceTypeNodePort {
			framework.Failf("got unexpected Spec.Type for new service: %v", service)
		}
		if len(service.Spec.Ports) != 1 {
			framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
		}
		port := service.Spec.Ports[0]
		if port.NodePort == 0 {
			framework.Failf("got unexpected Spec.Ports[0].NodePort for new service: %v", service)
		}
		if !e2eservice.NodePortRange.Contains(int(port.NodePort)) {
			framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
		}

		outOfRangeNodePort := 0
		for {
			outOfRangeNodePort = 1 + rand.Intn(65535)
			if !e2eservice.NodePortRange.Contains(outOfRangeNodePort) {
				break
			}
		}
		ginkgo.By(fmt.Sprintf("changing service "+serviceName+" to out-of-range NodePort %d", outOfRangeNodePort))
		result, err := e2eservice.UpdateService(ctx, cs, ns, serviceName, func(s *v1.Service) {
			s.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
		})
		if err == nil {
			framework.Failf("failed to prevent update of service with out-of-range NodePort: %v", result)
		}
		expectedErr := fmt.Sprintf("%d.*port is not in the valid range", outOfRangeNodePort)
		gomega.Expect(fmt.Sprintf("%v", err)).To(gomega.MatchRegexp(expectedErr))

		ginkgo.By("deleting original service " + serviceName)
		err = t.DeleteService(serviceName)
		framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)

		ginkgo.By(fmt.Sprintf("creating service "+serviceName+" with out-of-range NodePort %d", outOfRangeNodePort))
		service = t.BuildServiceSpec()
		service.Spec.Type = v1.ServiceTypeNodePort
		service.Spec.Ports[0].NodePort = int32(outOfRangeNodePort)
		service, err = t.CreateService(service)
		if err == nil {
			framework.Failf("failed to prevent create of service with out-of-range NodePort (%d): %v", outOfRangeNodePort, service)
		}
		gomega.Expect(fmt.Sprintf("%v", err)).To(gomega.MatchRegexp(expectedErr))
	})

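	// Verifies that deleting a NodePort service releases its port on the nodes
	// so a new service can immediately reuse the same NodePort.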
	ginkgo.It("should release NodePorts on delete", func(ctx context.Context) {
		serviceName := "nodeport-reuse"
		ns := f.Namespace.Name

		t := NewServerTest(cs, ns, serviceName)
		defer func() {
			defer ginkgo.GinkgoRecover()
			errs := t.Cleanup()
			if len(errs) != 0 {
				framework.Failf("errors in cleanup: %v", errs)
			}
		}()

		service := t.BuildServiceSpec()
		service.Spec.Type = v1.ServiceTypeNodePort

		ginkgo.By("creating service " + serviceName + " with type NodePort in namespace " + ns)
		service, err := t.CreateService(service)
		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)

		if service.Spec.Type != v1.ServiceTypeNodePort {
			framework.Failf("got unexpected Spec.Type for new service: %v", service)
		}
		if len(service.Spec.Ports) != 1 {
			framework.Failf("got unexpected len(Spec.Ports) for new service: %v", service)
		}
		port := service.Spec.Ports[0]
		if port.NodePort == 0 {
			framework.Failf("got unexpected Spec.Ports[0].NodePort for new service: %v", service)
		}
		if !e2eservice.NodePortRange.Contains(int(port.NodePort)) {
			framework.Failf("got unexpected (out-of-range) port for new service: %v", service)
		}
		nodePort := port.NodePort

		ginkgo.By("deleting original service " + serviceName)
		err = t.DeleteService(serviceName)
		framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)

		hostExec := launchHostExecPod(ctx, f.ClientSet, f.Namespace.Name, "hostexec")
		cmd := fmt.Sprintf(`! ss -ant46 'sport = :%d' | tail -n +2 | grep LISTEN`, nodePort)
		var stdout string
		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
			var err error
			stdout, err = e2eoutput.RunHostCmd(hostExec.Namespace, hostExec.Name, cmd)
			if err != nil {
				framework.Logf("expected node port (%d) to not be in use, stdout: %v", nodePort, stdout)
				return false, nil
			}
			return true, nil
		}); pollErr != nil {
			framework.Failf("expected node port (%d) to not be in use in %v, stdout: %v", nodePort, e2eservice.KubeProxyLagTimeout, stdout)
		}

		ginkgo.By(fmt.Sprintf("creating service "+serviceName+" with same NodePort %d", nodePort))
		service = t.BuildServiceSpec()
		service.Spec.Type = v1.ServiceTypeNodePort
		service.Spec.Ports[0].NodePort = nodePort
		_, err = t.CreateService(service)
		framework.ExpectNoError(err, "failed to create service: %s in namespace: %s", serviceName, ns)
	})

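	// Verifies that a service with PublishNotReadyAddresses=true publishes
	// endpoints for pods that are unready or terminating, and that toggling
	// the field changes reachability accordingly.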
	ginkgo.It("should create endpoints for unready pods", func(ctx context.Context) {
		serviceName := "tolerate-unready"
		ns := f.Namespace.Name

		t := NewServerTest(cs, ns, serviceName)
		defer func() {
			defer ginkgo.GinkgoRecover()
			errs := t.Cleanup()
			if len(errs) != 0 {
				framework.Failf("errors in cleanup: %v", errs)
			}
		}()

		t.Name = "slow-terminating-unready-pod"
		t.Image = imageutils.GetE2EImage(imageutils.Agnhost)
		port := int32(80)
		terminateSeconds := int64(100)

		service := &v1.Service{
			ObjectMeta: metav1.ObjectMeta{
				Name:      t.ServiceName,
				Namespace: t.Namespace,
			},
			Spec: v1.ServiceSpec{
				Selector: t.Labels,
				Ports: []v1.ServicePort{{
					Name:       "http",
					Port:       port,
					TargetPort: intstr.FromInt32(port),
				}},
				PublishNotReadyAddresses: true,
			},
		}
		rcSpec := e2erc.ByNameContainer(t.Name, 1, t.Labels, v1.Container{
			Args:  []string{"netexec", fmt.Sprintf("--http-port=%d", port)},
			Name:  t.Name,
			Image: t.Image,
			Ports: []v1.ContainerPort{{ContainerPort: port, Protocol: v1.ProtocolTCP}},
			ReadinessProbe: &v1.Probe{
				ProbeHandler: v1.ProbeHandler{
					Exec: &v1.ExecAction{
						Command: []string{"/bin/false"},
					},
				},
			},
			Lifecycle: &v1.Lifecycle{
				PreStop: &v1.LifecycleHandler{
					Exec: &v1.ExecAction{
						Command: []string{"/bin/sleep", fmt.Sprintf("%d", terminateSeconds)},
					},
				},
			},
		}, nil)
		rcSpec.Spec.Template.Spec.TerminationGracePeriodSeconds = &terminateSeconds

		ginkgo.By(fmt.Sprintf("creating RC %v with selectors %v", rcSpec.Name, rcSpec.Spec.Selector))
		_, err := t.CreateRC(rcSpec)
		framework.ExpectNoError(err)

		ginkgo.By(fmt.Sprintf("creating Service %v with selectors %v", service.Name, service.Spec.Selector))
		_, err = t.CreateService(service)
		framework.ExpectNoError(err)

		ginkgo.By("Verifying pods for RC " + t.Name)
		framework.ExpectNoError(e2epod.VerifyPods(ctx, t.Client, t.Namespace, t.Name, false, 1))

		svcName := fmt.Sprintf("%v.%v.svc.%v", serviceName, f.Namespace.Name, framework.TestContext.ClusterDNSDomain)
		ginkgo.By("Waiting for endpoints of Service with DNS name " + svcName)

		execPod := e2epod.CreateExecPodOrFail(ctx, f.ClientSet, f.Namespace.Name, "execpod-", nil)
		execPodName := execPod.Name
		cmd := fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port)
		var stdout string
		if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
			var err error
			stdout, err = e2eoutput.RunHostCmd(f.Namespace.Name, execPodName, cmd)
			if err != nil {
				framework.Logf("expected un-ready endpoint for Service %v, stdout: %v, err %v", t.Name, stdout, err)
				return false, nil
			}
			return true, nil
		}); pollErr != nil {
			framework.Failf("expected un-ready endpoint for Service %v within %v, stdout: %v", t.Name, e2eservice.KubeProxyLagTimeout, stdout)
		}

		ginkgo.By("Scaling down replication controller to zero")
		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, t.Namespace, rcSpec.Name, 0, false))

		ginkgo.By("Update service to not tolerate unready services")
		_, err = e2eservice.UpdateService(ctx, f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
			s.Spec.PublishNotReadyAddresses = false
		})
		framework.ExpectNoError(err)

		ginkgo.By("Check if pod is unreachable")
		cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/; test \"$?\" -ne \"0\"", svcName, port)
		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
			var err error
			stdout, err = e2eoutput.RunHostCmd(f.Namespace.Name, execPodName, cmd)
			if err != nil {
				framework.Logf("expected Service %v to be unreachable, stdout: %v, err %v", t.Name, stdout, err)
				return false, nil
			}
			return true, nil
		}); pollErr != nil {
			framework.Failf("expected Service %v to be unreachable within %v, stdout: %v", t.Name, e2eservice.KubeProxyLagTimeout, stdout)
		}

		ginkgo.By("Update service to tolerate unready services again")
		_, err = e2eservice.UpdateService(ctx, f.ClientSet, t.Namespace, t.ServiceName, func(s *v1.Service) {
			s.Spec.PublishNotReadyAddresses = true
		})
		framework.ExpectNoError(err)

		ginkgo.By("Check if terminating pod is available through service")
		cmd = fmt.Sprintf("curl -q -s --connect-timeout 2 http://%s:%d/", svcName, port)
		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyLagTimeout, func() (bool, error) {
			var err error
			stdout, err = e2eoutput.RunHostCmd(f.Namespace.Name, execPodName, cmd)
			if err != nil {
				framework.Logf("expected terminating endpoint for Service %v to be reachable, stdout: %v, err %v", t.Name, stdout, err)
				return false, nil
			}
			return true, nil
		}); pollErr != nil {
			framework.Failf("expected terminating endpoint for Service %v to be reachable within %v, stdout: %v", t.Name, e2eservice.KubeProxyLagTimeout, stdout)
		}

		ginkgo.By("Remove pods immediately")
		label := labels.SelectorFromSet(labels.Set(t.Labels))
		options := metav1.ListOptions{LabelSelector: label.String()}
		podClient := t.Client.CoreV1().Pods(f.Namespace.Name)
		pods, err := podClient.List(ctx, options)
		if err != nil {
			framework.Logf("warning: error retrieving pods: %s", err)
		} else {
			for _, pod := range pods.Items {
				var gracePeriodSeconds int64 = 0
				err := podClient.Delete(ctx, pod.Name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
				if err != nil {
					framework.Logf("warning: error force deleting pod '%s': %s", pod.Name, err)
				}
			}
		}
	})

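	// Verifies that ClusterIP and NodePort traffic still reaches a
	// terminating, unready pod while PublishNotReadyAddresses is true.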
	ginkgo.It("should be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is true", func(ctx context.Context) {
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]
		node1 := nodes.Items[1]

		serviceName := "svc-tolerate-unready"
		ns := f.Namespace.Name
		servicePort := 80

		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses enabled, in namespace " + ns)
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
			}
			svc.Spec.Type = v1.ServiceTypeNodePort
			svc.Spec.PublishNotReadyAddresses = true
		})
		framework.ExpectNoError(err, "failed to create Service")

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		gracePeriod := int64(300)
		webserverPod0 := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: "webserver-pod",
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{
					{
						Name:  "agnhost",
						Image: imageutils.GetE2EImage(imageutils.Agnhost),
						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
						Ports: []v1.ContainerPort{
							{
								ContainerPort: 80,
							},
						},
						ReadinessProbe: &v1.Probe{
							ProbeHandler: v1.ProbeHandler{
								HTTPGet: &v1.HTTPGetAction{
									Path: "/readyz",
									Port: intstr.IntOrString{
										IntVal: int32(80),
									},
									Scheme: v1.URISchemeHTTP,
								},
							},
						},
						LivenessProbe: &v1.Probe{
							ProbeHandler: v1.ProbeHandler{
								HTTPGet: &v1.HTTPGetAction{
									Path: "/healthz",
									Port: intstr.IntOrString{
										IntVal: int32(80),
									},
									Scheme: v1.URISchemeHTTP,
								},
							},
						},
					},
				},
			},
		}
		webserverPod0.Labels = jig.Labels
		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err, "failed to create pod")
		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
		if err != nil {
			framework.Failf("error waiting for pod %s to be ready: %v", webserverPod0.Name, err)
		}
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
		framework.ExpectNoError(err, "failed to create pod")
		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
		if err != nil {
			framework.Failf("error waiting for pod %s to be ready: %v", pausePod1.Name, err)
		}

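		// Delete the webserver pod. With a long termination grace period and
		// PublishNotReadyAddresses=true, the terminating (now unready) pod is
		// expected to keep serving traffic through the Service.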
		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)

		err = e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
			return !podutil.IsPodReady(pod), nil
		})
		if err != nil {
			framework.Failf("error waiting for pod %s to be unready: %v", webserverPod0.Name, err)
		}

		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))

		for i := 0; i < 5; i++ {
			execHostnameTest(*pausePod1, clusterIPAddress, webserverPod0.Name)
			execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
			execHostnameTest(*pausePod1, nodePortAddress1, webserverPod0.Name)
			time.Sleep(5 * time.Second)
		}
	})

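	// Counterpart of the test above: with PublishNotReadyAddresses=false, a
	// terminating, unready pod must be removed from the endpoints and become
	// unreachable through the Service.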
	ginkgo.It("should not be able to connect to terminating and unready endpoints if PublishNotReadyAddresses is false", func(ctx context.Context) {
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]
		node1 := nodes.Items[1]

		serviceName := "svc-not-tolerate-unready"
		ns := f.Namespace.Name
		servicePort := 80

		ginkgo.By("creating a NodePort TCP service " + serviceName + " with PublishNotReadyAddresses disabled, in namespace " + ns)
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
			}
			svc.Spec.Type = v1.ServiceTypeNodePort
			svc.Spec.PublishNotReadyAddresses = false
		})
		framework.ExpectNoError(err, "failed to create Service")

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		gracePeriod := int64(300)
		webserverPod0 := &v1.Pod{
			ObjectMeta: metav1.ObjectMeta{
				Name: "webserver-pod",
			},
			Spec: v1.PodSpec{
				Containers: []v1.Container{
					{
						Name:  "agnhost",
						Image: imageutils.GetE2EImage(imageutils.Agnhost),
						Args:  []string{"netexec", "--http-port=80", fmt.Sprintf("--delay-shutdown=%d", gracePeriod)},
						Ports: []v1.ContainerPort{
							{
								ContainerPort: 80,
							},
						},
						ReadinessProbe: &v1.Probe{
							ProbeHandler: v1.ProbeHandler{
								HTTPGet: &v1.HTTPGetAction{
									Path: "/readyz",
									Port: intstr.IntOrString{
										IntVal: int32(80),
									},
									Scheme: v1.URISchemeHTTP,
								},
							},
						},
						LivenessProbe: &v1.Probe{
							ProbeHandler: v1.ProbeHandler{
								HTTPGet: &v1.HTTPGetAction{
									Path: "/healthz",
									Port: intstr.IntOrString{
										IntVal: int32(80),
									},
									Scheme: v1.URISchemeHTTP,
								},
							},
						},
					},
				},
			},
		}
		webserverPod0.Labels = jig.Labels
		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(gracePeriod)
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err, "failed to create pod")
		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout)
		if err != nil {
			framework.Failf("error waiting for pod %s to be ready: %v", webserverPod0.Name, err)
		}
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

		ginkgo.By("Creating 1 pause pod that will try to connect to the webserver")
		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
		framework.ExpectNoError(err, "failed to create pod")
		err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout)
		if err != nil {
			framework.Failf("error waiting for pod %s to be ready: %v", pausePod1.Name, err)
		}

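		// Delete the webserver pod. Since PublishNotReadyAddresses is false,
		// the terminating, unready pod is expected to be dropped from the
		// endpoints.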
		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)

		err = e2epod.WaitForPodCondition(ctx, f.ClientSet, f.Namespace.Name, webserverPod0.Name, "pod not ready", framework.PodStartTimeout, func(pod *v1.Pod) (bool, error) {
			return !podutil.IsPodReady(pod), nil
		})
		if err != nil {
			framework.Failf("error waiting for pod %s to be unready: %v", webserverPod0.Name, err)
		}

		nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
		nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
		nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
		nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))

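		// Wait until kube-proxy on both nodes has dropped the terminating
		// endpoint and the NodePort stops answering.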
		cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
		if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, true, func(_ context.Context) (bool, error) {
			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			if err != nil {
				return true, nil
			}
			return false, nil
		}); pollErr != nil {
			framework.ExpectNoError(pollErr, "pod on node0 still serves traffic")
		}
		cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
		if pollErr := wait.PollUntilContextTimeout(ctx, framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, true, func(_ context.Context) (bool, error) {
			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			if err != nil {
				return true, nil
			}
			return false, nil
		}); pollErr != nil {
			framework.ExpectNoError(pollErr, "pod on node1 still serves traffic")
		}

		clusterIPAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))

		for i := 0; i < 5; i++ {
			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, clusterIPAddress)
			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")

			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress0)
			_, err = e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to NodePort address")

			cmd = fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
			_, err = e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to NodePort address")

			time.Sleep(5 * time.Second)
		}
	})

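	// Session affinity tests: with SessionAffinity=ClientIP, repeated requests
	// from the same client must land on the same backend pod.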
	framework.ConformanceIt("should have session affinity work for service with type clusterIP [LinuxOnly]", func(ctx context.Context) {
		svc := getServeHostnameService("affinity-clusterip")
		svc.Spec.Type = v1.ServiceTypeClusterIP
		execAffinityTestForNonLBService(ctx, f, cs, svc)
	})

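	// As above, but also verifies that affinity expires after the configured
	// timeoutSeconds.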
	ginkgo.It("should have session affinity timeout work for service with type clusterIP [LinuxOnly]", func(ctx context.Context) {
		svc := getServeHostnameService("affinity-clusterip-timeout")
		svc.Spec.Type = v1.ServiceTypeClusterIP
		execAffinityTestForSessionAffinityTimeout(ctx, f, cs, svc)
	})

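	// Verifies that session affinity can be toggled on and off on an existing
	// ClusterIP service.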
	framework.ConformanceIt("should be able to switch session affinity for service with type clusterIP [LinuxOnly]", func(ctx context.Context) {
		svc := getServeHostnameService("affinity-clusterip-transition")
		svc.Spec.Type = v1.ServiceTypeClusterIP
		execAffinityTestForNonLBServiceWithTransition(ctx, f, cs, svc)
	})

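	// The same affinity checks, exercised through a NodePort service.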
	framework.ConformanceIt("should have session affinity work for NodePort service [LinuxOnly]", func(ctx context.Context) {
		svc := getServeHostnameService("affinity-nodeport")
		svc.Spec.Type = v1.ServiceTypeNodePort
		execAffinityTestForNonLBService(ctx, f, cs, svc)
	})

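	// Affinity timeout, exercised through a NodePort service.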
	ginkgo.It("should have session affinity timeout work for NodePort service [LinuxOnly]", func(ctx context.Context) {
		svc := getServeHostnameService("affinity-nodeport-timeout")
		svc.Spec.Type = v1.ServiceTypeNodePort
		execAffinityTestForSessionAffinityTimeout(ctx, f, cs, svc)
	})

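	// Toggling affinity on and off, exercised through a NodePort service.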
	framework.ConformanceIt("should be able to switch session affinity for NodePort service [LinuxOnly]", func(ctx context.Context) {
		svc := getServeHostnameService("affinity-nodeport-transition")
		svc.Spec.Type = v1.ServiceTypeNodePort
		execAffinityTestForNonLBServiceWithTransition(ctx, f, cs, svc)
	})

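	// kube-proxy ignores services carrying the
	// service.kubernetes.io/service-proxy-name label, so such services must
	// not be reachable while the label is present.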
	ginkgo.It("should implement service.kubernetes.io/service-proxy-name", func(ctx context.Context) {
		ns := f.Namespace.Name
		numPods, servicePort := 3, defaultServeHostnameServicePort
		serviceProxyNameLabels := map[string]string{"service.kubernetes.io/service-proxy-name": "foo-bar"}

		ginkgo.By("creating service-disabled in namespace " + ns)
		svcDisabled := getServeHostnameService("service-proxy-disabled")
		svcDisabled.ObjectMeta.Labels = serviceProxyNameLabels
		_, svcDisabledIP, err := StartServeHostnameService(ctx, cs, svcDisabled, ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcDisabledIP, ns)

		ginkgo.By("creating service in namespace " + ns)
		svcToggled := getServeHostnameService("service-proxy-toggled")
		podToggledNames, svcToggledIP, err := StartServeHostnameService(ctx, cs, svcToggled, ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcToggledIP, ns)

		jig := e2eservice.NewTestJig(cs, ns, svcToggled.ObjectMeta.Name)

		ginkgo.By("verifying service is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podToggledNames, svcToggledIP, servicePort))

		ginkgo.By("verifying service-disabled is not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcDisabledIP, servicePort))

		ginkgo.By("adding service-proxy-name label")
		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
			svc.ObjectMeta.Labels = serviceProxyNameLabels
		})
		framework.ExpectNoError(err)

		ginkgo.By("verifying service is not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcToggledIP, servicePort))

		ginkgo.By("removing service-proxy-name label")
		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
			svc.ObjectMeta.Labels = nil
		})
		framework.ExpectNoError(err)

		ginkgo.By("verifying service is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podToggledNames, svcToggledIP, servicePort))

		ginkgo.By("verifying service-disabled is still not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcDisabledIP, servicePort))
	})

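	// Services labeled with service.kubernetes.io/headless are treated as
	// headless and ignored by kube-proxy, so they must not be reachable while
	// the label is present.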
	ginkgo.It("should implement service.kubernetes.io/headless", func(ctx context.Context) {
		ns := f.Namespace.Name
		numPods, servicePort := 3, defaultServeHostnameServicePort
		serviceHeadlessLabels := map[string]string{v1.IsHeadlessService: ""}

		ginkgo.By("creating service-headless in namespace " + ns)
		svcHeadless := getServeHostnameService("service-headless")
		svcHeadless.ObjectMeta.Labels = serviceHeadlessLabels

		_, svcHeadlessIP, err := StartServeHostnameService(ctx, cs, svcHeadless, ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with headless service: %s in the namespace: %s", svcHeadlessIP, ns)

		ginkgo.By("creating service in namespace " + ns)
		svcHeadlessToggled := getServeHostnameService("service-headless-toggled")
		podHeadlessToggledNames, svcHeadlessToggledIP, err := StartServeHostnameService(ctx, cs, svcHeadlessToggled, ns, numPods)
		framework.ExpectNoError(err, "failed to create replication controller with service: %s in the namespace: %s", svcHeadlessToggledIP, ns)

		jig := e2eservice.NewTestJig(cs, ns, svcHeadlessToggled.ObjectMeta.Name)

		ginkgo.By("verifying service is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))

		ginkgo.By("verifying service-headless is not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcHeadlessIP, servicePort))

		ginkgo.By("adding service.kubernetes.io/headless label")
		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
			svc.ObjectMeta.Labels = serviceHeadlessLabels
		})
		framework.ExpectNoError(err)

		ginkgo.By("verifying service is not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcHeadlessToggledIP, servicePort))

		ginkgo.By("removing service.kubernetes.io/headless label")
		_, err = jig.UpdateService(ctx, func(svc *v1.Service) {
			svc.ObjectMeta.Labels = nil
		})
		framework.ExpectNoError(err)

		ginkgo.By("verifying service is up")
		framework.ExpectNoError(verifyServeHostnameServiceUp(ctx, cs, ns, podHeadlessToggledNames, svcHeadlessToggledIP, servicePort))

		ginkgo.By("verifying service-headless is still not up")
		framework.ExpectNoError(verifyServeHostnameServiceDown(ctx, cs, ns, svcHeadlessIP, servicePort))
	})

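	// A service with no endpoints must actively refuse connections rather
	// than let them time out.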
	ginkgo.It("should be rejected when no endpoints exist", func(ctx context.Context) {
		namespace := f.Namespace.Name
		serviceName := "no-pods"
		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
		framework.ExpectNoError(err)
		port := 80

		ginkgo.By("creating a service with no endpoints")
		_, err = jig.CreateTCPServiceWithPort(ctx, nil, int32(port))
		framework.ExpectNoError(err)

		nodeName := nodes.Items[0].Name
		podName := "execpod-noendpoints"

		ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName))
		execPod := e2epod.CreateExecPodOrFail(ctx, f.ClientSet, namespace, podName, func(pod *v1.Pod) {
			nodeSelection := e2epod.NodeSelection{Name: nodeName}
			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
		})

		serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port))
		framework.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress)
		cmd := fmt.Sprintf("/agnhost connect --timeout=3s %s", serviceAddress)

		ginkgo.By(fmt.Sprintf("hitting service %v from pod %v on node %v", serviceAddress, podName, nodeName))
		expectedErr := "REFUSED"
		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
			_, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
			if err != nil {
				if strings.Contains(err.Error(), expectedErr) {
					framework.Logf("error contained '%s', as expected: %s", expectedErr, err.Error())
					return true, nil
				}
				framework.Logf("error didn't contain '%s', keep trying: %s", expectedErr, err.Error())
				return false, nil
			}
			return true, errors.New("expected connect call to fail")
		}); pollErr != nil {
			framework.ExpectNoError(pollErr)
		}
	})

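	// Evicted pods must be removed from Endpoints and EndpointSlices, after
	// which connections to the service must be refused.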
	ginkgo.It("should be rejected for evicted pods (no endpoints exist)", func(ctx context.Context) {
		namespace := f.Namespace.Name
		serviceName := "evicted-pods"
		jig := e2eservice.NewTestJig(cs, namespace, serviceName)
		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, e2eservice.MaxNodesForEndpointsTests)
		framework.ExpectNoError(err)
		nodeName := nodes.Items[0].Name

		port := 80

		ginkgo.By("creating a service with no endpoints")
		_, err = jig.CreateTCPServiceWithPort(ctx, func(s *v1.Service) {
			s.Spec.PublishNotReadyAddresses = true
		}, int32(port))
		framework.ExpectNoError(err)

		ginkgo.By("creating a client pod that is going to be evicted for the service " + serviceName)
		evictedPod := e2epod.NewAgnhostPod(namespace, "evicted-pod", nil, nil, nil)
		evictedPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", "sleep 10; dd if=/dev/zero of=file bs=1M count=10; sleep 10000"}
		evictedPod.Spec.Containers[0].Name = "evicted-pod"
		evictedPod.Spec.Containers[0].Resources = v1.ResourceRequirements{
			Limits: v1.ResourceList{"ephemeral-storage": resource.MustParse("5Mi")},
		}
		e2epod.NewPodClient(f).Create(ctx, evictedPod)
		err = e2epod.WaitForPodTerminatedInNamespace(ctx, f.ClientSet, evictedPod.Name, "Evicted", f.Namespace.Name)
		if err != nil {
			framework.Failf("error waiting for pod to be evicted: %v", err)
		}

		podName := "execpod-evictedpods"
		ginkgo.By(fmt.Sprintf("creating %v on node %v", podName, nodeName))
		execPod := e2epod.CreateExecPodOrFail(ctx, f.ClientSet, namespace, podName, func(pod *v1.Pod) {
			nodeSelection := e2epod.NodeSelection{Name: nodeName}
			e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
		})

		if epErr := wait.PollImmediate(framework.Poll, e2eservice.ServiceEndpointsTimeout, func() (bool, error) {
			endpoints, err := cs.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
			if err != nil {
				framework.Logf("error fetching '%s/%s' Endpoints: %s", namespace, serviceName, err.Error())
				return false, err
			}
			if len(endpoints.Subsets) > 0 {
				framework.Logf("expected '%s/%s' Endpoints to be empty, got: %v", namespace, serviceName, endpoints.Subsets)
				return false, nil
			}
			epsList, err := cs.DiscoveryV1().EndpointSlices(namespace).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
			if err != nil {
				framework.Logf("error fetching '%s/%s' EndpointSlices: %s", namespace, serviceName, err.Error())
				return false, err
			}
			if len(epsList.Items) != 1 {
				framework.Logf("expected exactly 1 EndpointSlice, got: %d", len(epsList.Items))
				return false, nil
			}
			endpointSlice := epsList.Items[0]
			if len(endpointSlice.Endpoints) > 0 {
				framework.Logf("expected EndpointSlice to be empty, got %d endpoints", len(endpointSlice.Endpoints))
				return false, nil
			}
			return true, nil
		}); epErr != nil {
			framework.ExpectNoError(epErr)
		}

		serviceAddress := net.JoinHostPort(serviceName, strconv.Itoa(port))
		framework.Logf("waiting up to %v to connect to %v", e2eservice.KubeProxyEndpointLagTimeout, serviceAddress)
		cmd := fmt.Sprintf("/agnhost connect --timeout=3s %s", serviceAddress)

		ginkgo.By(fmt.Sprintf("hitting service %v from pod %v on node %v expected to be refused", serviceAddress, podName, nodeName))
		expectedErr := "REFUSED"
		if pollErr := wait.PollImmediate(framework.Poll, e2eservice.KubeProxyEndpointLagTimeout, func() (bool, error) {
			_, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
			if err != nil {
				if strings.Contains(err.Error(), expectedErr) {
					framework.Logf("error contained '%s', as expected: %s", expectedErr, err.Error())
					return true, nil
				}
				framework.Logf("error didn't contain '%s', keep trying: %s", expectedErr, err.Error())
				return false, nil
			}
			return true, errors.New("expected connect call to fail")
		}); pollErr != nil {
			framework.ExpectNoError(pollErr)
		}
	})

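	// With internalTrafficPolicy=Local, only clients on the same node as a
	// backend pod may reach the ClusterIP; clients on other nodes must fail.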
	ginkgo.It("should respect internalTrafficPolicy=Local Pod to Pod", func(ctx context.Context) {
		e2eskipper.SkipIfNodeOSDistroIs("windows")

		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]
		node1 := nodes.Items[1]

		serviceName := "svc-itp"
		ns := f.Namespace.Name
		servicePort := 80

		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP and internalTrafficPolicy=Local in namespace " + ns)
		local := v1.ServiceInternalTrafficPolicyLocal
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
			}
			svc.Spec.InternalTrafficPolicy = &local
		})
		framework.ExpectNoError(err)

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
		webserverPod0.Labels = jig.Labels
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

		ginkgo.By("Creating 2 pause pods that will try to connect to the webserver")
		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))

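		// pausePod0 runs on the same node as the webserver and should always
		// reach it; pausePod1 runs on the other node and should never connect.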
		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
		for i := 0; i < 5; i++ {
			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)

			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
		}
	})

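	// The same internalTrafficPolicy=Local check, but with hostNetwork
	// clients.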
	ginkgo.It("should respect internalTrafficPolicy=Local Pod (hostNetwork: true) to Pod", func(ctx context.Context) {
		e2eskipper.SkipIfNodeOSDistroIs("windows")

		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]
		node1 := nodes.Items[1]

		serviceName := "svc-itp"
		ns := f.Namespace.Name
		servicePort := 8000

		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP and internalTrafficPolicy=Local in namespace " + ns)
		local := v1.ServiceInternalTrafficPolicyLocal
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 8000, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(8000)},
			}
			svc.Spec.InternalTrafficPolicy = &local
		})
		framework.ExpectNoError(err)

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort))
		webserverPod0.Labels = jig.Labels
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

		ginkgo.By("Creating 2 pause pods that will try to connect to the webserver")
		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
		pausePod0.Spec.HostNetwork = true
		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
		pausePod1.Spec.HostNetwork = true
		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))

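		// Only the hostNetwork client on the webserver's node should connect;
		// the one on the other node should fail.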
		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
		for i := 0; i < 5; i++ {
			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)

			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
		}
	})

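	// internalTrafficPolicy=Local with a hostNetwork backend: the reported
	// hostname is the node name, and only clients on that node (pod-network
	// or hostNetwork) may connect.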
	ginkgo.It("should respect internalTrafficPolicy=Local Pod and Node, to Pod (hostNetwork: true)", func(ctx context.Context) {
		e2eskipper.SkipIfNodeOSDistroIs("windows")

		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]
		node1 := nodes.Items[1]

		serviceName := "svc-itp"
		ns := f.Namespace.Name
		servicePort := 80

		endpointPort := 10180

		ginkgo.By("creating a TCP service " + serviceName + " with type=ClusterIP and internalTrafficPolicy=Local in namespace " + ns)
		local := v1.ServiceInternalTrafficPolicyLocal
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt(endpointPort)},
			}
			svc.Spec.InternalTrafficPolicy = &local
		})
		framework.ExpectNoError(err)

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(endpointPort), "--udp-port", strconv.Itoa(endpointPort))
		webserverPod0.Labels = jig.Labels
		webserverPod0.Spec.HostNetwork = true
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {endpointPort}})

		ginkgo.By("Creating 2 pause pods that will try to connect to the webserver")
		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))

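		// The webserver runs with hostNetwork, so the hostname it reports is
		// the node name. Only the client on node0 should connect.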
		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
		for i := 0; i < 5; i++ {
			execHostnameTest(*pausePod0, serviceAddress, node0.Name)

			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
			_, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
		}

		ginkgo.By("Creating 2 pause hostNetwork pods that will try to connect to the webserver")
		pausePod2 := e2epod.NewAgnhostPod(ns, "pause-pod-2", nil, nil, nil)
		pausePod2.Spec.HostNetwork = true
		e2epod.SetNodeSelection(&pausePod2.Spec, e2epod.NodeSelection{Name: node0.Name})

		pausePod2, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod2, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod2.Name, f.Namespace.Name, framework.PodStartTimeout))

		pausePod3 := e2epod.NewAgnhostPod(ns, "pause-pod-3", nil, nil, nil)
		pausePod3.Spec.HostNetwork = true
		e2epod.SetNodeSelection(&pausePod3.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod3, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod3, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod3.Name, f.Namespace.Name, framework.PodStartTimeout))

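		// Repeat the check with the hostNetwork clients.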
		for i := 0; i < 5; i++ {
			execHostnameTest(*pausePod2, serviceAddress, node0.Name)

			cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
			_, err := e2eoutput.RunHostCmd(pausePod3.Namespace, pausePod3.Name, cmd)
			gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
		}
	})

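	// With externalTrafficPolicy=Local, the health check node port must
	// return 200 while a ready local endpoint exists and 503 once the only
	// local endpoint is terminating.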
	ginkgo.It("should fail health check node port if there are only terminating endpoints", func(ctx context.Context) {
		e2eskipper.SkipIfNodeOSDistroIs("windows")

		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]

		serviceName := "svc-proxy-terminating"
		ns := f.Namespace.Name
		servicePort := 80

		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating, in namespace " + ns)
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
			}
			svc.Spec.Type = v1.ServiceTypeLoadBalancer
			svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
		})
		framework.ExpectNoError(err)

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
		webserverPod0.Labels = jig.Labels
		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
		healthCheckNodePortAddr := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.HealthCheckNodePort)))

		err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
			cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --max-time 5 http://%s/healthz`, healthCheckNodePortAddr)
			out, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
			if err != nil {
				framework.Logf("unexpected error trying to connect to nodeport %s : %v", healthCheckNodePortAddr, err)
				return false, nil
			}

			expectedOut := "200"
			if out != expectedOut {
				framework.Logf("expected output: %s , got %s", expectedOut, out)
				return false, nil
			}
			return true, nil
		})
		framework.ExpectNoError(err)

		ginkgo.By("Terminating the webserver pod")
		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)

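		// The only endpoint is now terminating, so the health check node port
		// should start returning 503.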
		err = wait.PollImmediate(time.Second, time.Minute, func() (bool, error) {
			cmd := fmt.Sprintf(`curl -s -o /dev/null -w "%%{http_code}" --max-time 5 http://%s/healthz`, healthCheckNodePortAddr)
			out, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
			if err != nil {
				framework.Logf("unexpected error trying to connect to nodeport %s : %v", healthCheckNodePortAddr, err)
				return false, nil
			}

			expectedOut := "503"
			if out != expectedOut {
				framework.Logf("expected output: %s , got %s", expectedOut, out)
				return false, nil
			}
			return true, nil
		})
		framework.ExpectNoError(err)

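		// Even though the health check now fails, the terminating endpoint
		// should still serve NodePort traffic until it actually exits.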
		nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
		execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name)
	})

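	// With internalTrafficPolicy=Cluster (the default), traffic should fall
	// back to terminating endpoints when no ready endpoints remain.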
	ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Cluster", func(ctx context.Context) {
		e2eskipper.SkipIfNodeOSDistroIs("windows")

		nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
		framework.ExpectNoError(err)
		nodeCounts := len(nodes.Items)
		if nodeCounts < 2 {
			e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
		}
		node0 := nodes.Items[0]
		node1 := nodes.Items[1]

		serviceName := "svc-proxy-terminating"
		ns := f.Namespace.Name
		servicePort := 80

		ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating, in namespace " + ns)
		jig := e2eservice.NewTestJig(cs, ns, serviceName)
		svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
			svc.Spec.Ports = []v1.ServicePort{
				{Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
			}
		})
		framework.ExpectNoError(err)

		ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
		webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
		webserverPod0.Labels = jig.Labels
		webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
		e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		_, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
		validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})

		ginkgo.By("Creating 2 pause pods that will try to connect to the webserver")
		pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})

		pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))

		pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
		e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})

		pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
		framework.ExpectNoError(err)
		framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))

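		// Delete the webserver pod. Its long grace period keeps it in the
		// terminating state, and with no ready endpoints left both clients
		// are expected to fall back to it.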
		err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
		framework.ExpectNoError(err)

		serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
		for i := 0; i < 5; i++ {
			execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
			execHostnameTest(*pausePod1, serviceAddress, webserverPod0.Name)

			time.Sleep(5 * time.Second)
		}
	})

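	// The same fallback, but with internalTrafficPolicy=Local: only the
	// client on the terminating pod's node should be able to connect.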
2890 ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with internalTrafficPolicy=Local", func(ctx context.Context) {
2891
2892 e2eskipper.SkipIfNodeOSDistroIs("windows")
2893
2894 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
2895 framework.ExpectNoError(err)
2896 nodeCounts := len(nodes.Items)
2897 if nodeCounts < 2 {
2898 e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
2899 }
2900 node0 := nodes.Items[0]
2901 node1 := nodes.Items[1]
2902
2903 serviceName := "svc-proxy-terminating"
2904 ns := f.Namespace.Name
2905 servicePort := 80
2906
2907 ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating, in namespace " + ns)
2908 jig := e2eservice.NewTestJig(cs, ns, serviceName)
2909 local := v1.ServiceInternalTrafficPolicyLocal
2910 svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
2911 svc.Spec.Ports = []v1.ServicePort{
2912 {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
2913 }
2914 svc.Spec.InternalTrafficPolicy = &local
2915 })
2916 framework.ExpectNoError(err)
2917
2918 ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
2919 webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
2920 webserverPod0.Labels = jig.Labels
2921 webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
2922 e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
2923
2924 _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
2925 framework.ExpectNoError(err)
2926 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
2927 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
2928
2929 ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
2930 pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
2931 e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
2932
2933 pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
2934 framework.ExpectNoError(err)
2935 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
2936
2937 pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
2938 e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
2939
2940 pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
2941 framework.ExpectNoError(err)
2942 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
2943
2944
2945
2946
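// Delete the only backend pod so the Service is left with a single terminating endpoint.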
2947 err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
2948 framework.ExpectNoError(err)
2949
2950
2951 serviceAddress := net.JoinHostPort(svc.Spec.ClusterIP, strconv.Itoa(servicePort))
2952 for i := 0; i < 5; i++ {
2953
2954
2955
2956
2957
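// With internalTrafficPolicy=Local, only pausePod0 (on the same node as the
// terminating endpoint) should connect; pausePod1 on node1 has no local
// endpoints and its request must fail, as asserted below.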
2958 execHostnameTest(*pausePod0, serviceAddress, webserverPod0.Name)
2959
2960 cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, serviceAddress)
2961 _, err := e2eoutput.RunHostCmd(pausePod1.Namespace, pausePod1.Name, cmd)
2962 gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to cluster IP")
2963
2964 time.Sleep(5 * time.Second)
2965 }
2966 })
2967
2968 ginkgo.It("should fallback to terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Cluster", func(ctx context.Context) {
2969
2970 e2eskipper.SkipIfNodeOSDistroIs("windows")
2971
2972 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
2973 framework.ExpectNoError(err)
2974 nodeCounts := len(nodes.Items)
2975 if nodeCounts < 2 {
2976 e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
2977 }
2978 node0 := nodes.Items[0]
2979 node1 := nodes.Items[1]
2980
2981 serviceName := "svc-proxy-terminating"
2982 ns := f.Namespace.Name
2983 servicePort := 80
2984
2985 ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating, in namespace " + ns)
2986 jig := e2eservice.NewTestJig(cs, ns, serviceName)
2987 svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
2988 svc.Spec.Ports = []v1.ServicePort{
2989 {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
2990 }
2991 svc.Spec.Type = v1.ServiceTypeNodePort
2992 })
2993 framework.ExpectNoError(err)
2994
2995 ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
2996 webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
2997 webserverPod0.Labels = jig.Labels
2998 webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
2999 e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
3000
3001 _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
3002 framework.ExpectNoError(err)
3003 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
3004 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
3005
3006 ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
3007 pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
3008 e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
3009
3010 pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
3011 framework.ExpectNoError(err)
3012 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
3013
3014 pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
3015 e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
3016
3017 pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
3018 framework.ExpectNoError(err)
3019 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
3020
3021
3022
3023
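// Delete the only backend pod so the Service is left with a single terminating endpoint.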
3024 err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
3025 framework.ExpectNoError(err)
3026
3027
3028 nodeIPs := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
gomega.Expect(nodeIPs).NotTo(gomega.BeEmpty(), "expected an internal IP on node %s", node0.Name)
3029 nodePortAddress := net.JoinHostPort(nodeIPs[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
3030 for i := 0; i < 5; i++ {
3031
3032
3033
3034
3035
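// With externalTrafficPolicy=Cluster, the node port should fall back to the
// terminating endpoint, so both pause pods must still connect via node0's address.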
3036 execHostnameTest(*pausePod0, nodePortAddress, webserverPod0.Name)
3037 execHostnameTest(*pausePod1, nodePortAddress, webserverPod0.Name)
3038
3039 time.Sleep(5 * time.Second)
3040 }
3041 })
3042
3043 ginkgo.It("should fallback to local terminating endpoints when there are no ready endpoints with externalTrafficPolicy=Local", func(ctx context.Context) {
3044
3045 e2eskipper.SkipIfNodeOSDistroIs("windows")
3046
3047 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
3048 framework.ExpectNoError(err)
3049 nodeCounts := len(nodes.Items)
3050 if nodeCounts < 2 {
3051 e2eskipper.Skipf("The test requires at least two ready nodes on %s, but found %v", framework.TestContext.Provider, nodeCounts)
3052 }
3053 node0 := nodes.Items[0]
3054 node1 := nodes.Items[1]
3055
3056 serviceName := "svc-proxy-terminating"
3057 ns := f.Namespace.Name
3058 servicePort := 80
3059
3060 ginkgo.By("creating a TCP service " + serviceName + " where all pods are terminating, in namespace " + ns)
3061 jig := e2eservice.NewTestJig(cs, ns, serviceName)
3062 svc, err := jig.CreateTCPService(ctx, func(svc *v1.Service) {
3063 svc.Spec.Ports = []v1.ServicePort{
3064 {Port: 80, Name: "http", Protocol: v1.ProtocolTCP, TargetPort: intstr.FromInt32(80)},
3065 }
3066 svc.Spec.Type = v1.ServiceTypeNodePort
3067 svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
3068 })
3069 framework.ExpectNoError(err)
3070
3071 ginkgo.By("Creating 1 webserver pod to be part of the TCP service")
3072 webserverPod0 := e2epod.NewAgnhostPod(ns, "echo-hostname-0", nil, nil, nil, "netexec", "--http-port", strconv.Itoa(servicePort), "--delay-shutdown", "100")
3073 webserverPod0.Labels = jig.Labels
3074 webserverPod0.Spec.TerminationGracePeriodSeconds = utilpointer.Int64(100)
3075 e2epod.SetNodeSelection(&webserverPod0.Spec, e2epod.NodeSelection{Name: node0.Name})
3076
3077 _, err = cs.CoreV1().Pods(ns).Create(ctx, webserverPod0, metav1.CreateOptions{})
3078 framework.ExpectNoError(err)
3079 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, webserverPod0.Name, f.Namespace.Name, framework.PodStartTimeout))
3080 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{webserverPod0.Name: {servicePort}})
3081
3082 ginkgo.By("Creating 2 pause pods that will try to connect to the webservers")
3083 pausePod0 := e2epod.NewAgnhostPod(ns, "pause-pod-0", nil, nil, nil)
3084 e2epod.SetNodeSelection(&pausePod0.Spec, e2epod.NodeSelection{Name: node0.Name})
3085
3086 pausePod0, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod0, metav1.CreateOptions{})
3087 framework.ExpectNoError(err)
3088 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod0.Name, f.Namespace.Name, framework.PodStartTimeout))
3089
3090 pausePod1 := e2epod.NewAgnhostPod(ns, "pause-pod-1", nil, nil, nil)
3091 e2epod.SetNodeSelection(&pausePod1.Spec, e2epod.NodeSelection{Name: node1.Name})
3092
3093 pausePod1, err = cs.CoreV1().Pods(ns).Create(ctx, pausePod1, metav1.CreateOptions{})
3094 framework.ExpectNoError(err)
3095 framework.ExpectNoError(e2epod.WaitTimeoutForPodReadyInNamespace(ctx, f.ClientSet, pausePod1.Name, f.Namespace.Name, framework.PodStartTimeout))
3096
3097
3098
3099
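// Delete the only backend pod so the Service is left with a single terminating endpoint.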
3100 err = cs.CoreV1().Pods(ns).Delete(ctx, webserverPod0.Name, metav1.DeleteOptions{})
3101 framework.ExpectNoError(err)
3102
3103
3104 nodeIPs0 := e2enode.GetAddresses(&node0, v1.NodeInternalIP)
3105 nodeIPs1 := e2enode.GetAddresses(&node1, v1.NodeInternalIP)
gomega.Expect(nodeIPs0).NotTo(gomega.BeEmpty(), "expected an internal IP on node %s", node0.Name)
gomega.Expect(nodeIPs1).NotTo(gomega.BeEmpty(), "expected an internal IP on node %s", node1.Name)
3106 nodePortAddress0 := net.JoinHostPort(nodeIPs0[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
3107 nodePortAddress1 := net.JoinHostPort(nodeIPs1[0], strconv.Itoa(int(svc.Spec.Ports[0].NodePort)))
3108 for i := 0; i < 5; i++ {
3109
3110
3111
3112
3113
3114
3115
3116
3117
3118
3119
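// With externalTrafficPolicy=Local, only node0 hosts the (terminating)
// endpoint: requests through node1's node port must fail, while requests
// through node0's node port must succeed from either pause pod.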
3120 cmd := fmt.Sprintf(`curl -q -s --connect-timeout 5 %s/hostname`, nodePortAddress1)
3121 _, err := e2eoutput.RunHostCmd(pausePod0.Namespace, pausePod0.Name, cmd)
3122 gomega.Expect(err).To(gomega.HaveOccurred(), "expected error when trying to connect to node port for pausePod0")
3123
3124 execHostnameTest(*pausePod0, nodePortAddress0, webserverPod0.Name)
3125 execHostnameTest(*pausePod1, nodePortAddress0, webserverPod0.Name)
3126
3127 time.Sleep(5 * time.Second)
3128 }
3129 })
3130
3131
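// Verifies that listing Services across all namespaces returns the built-in
// "kubernetes" Service from the "default" namespace.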
3136 framework.ConformanceIt("should find a service from listing all namespaces", func(ctx context.Context) {
3137 ginkgo.By("fetching services")
3138 svcs, err := f.ClientSet.CoreV1().Services("").List(ctx, metav1.ListOptions{})
framework.ExpectNoError(err, "failed to list Services across all namespaces")
3139
3140 foundSvc := false
3141 for _, svc := range svcs.Items {
3142 if svc.ObjectMeta.Name == "kubernetes" && svc.ObjectMeta.Namespace == "default" {
3143 foundSvc = true
3144 break
3145 }
3146 }
3147
3148 if !foundSvc {
3149 framework.Fail("could not find service 'kubernetes' in service list in all namespaces")
3150 }
3151 })
3152
3153
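// Exercises the full lifecycle of an Endpoints object: create, watch, list,
// update, patch, and delete-by-collection, verifying each step with a watch.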
3161 framework.ConformanceIt("should test the lifecycle of an Endpoint", func(ctx context.Context) {
3162 testNamespaceName := f.Namespace.Name
3163 testEndpointName := "testservice"
3164 testEndpoints := v1.Endpoints{
3165 ObjectMeta: metav1.ObjectMeta{
3166 Name: testEndpointName,
3167 Labels: map[string]string{
3168 "test-endpoint-static": "true",
3169 },
3170 },
3171 Subsets: []v1.EndpointSubset{{
3172 Addresses: []v1.EndpointAddress{{
3173 IP: "10.0.0.24",
3174 }},
3175 Ports: []v1.EndpointPort{{
3176 Name: "http",
3177 Port: 80,
3178 Protocol: v1.ProtocolTCP,
3179 }},
3180 }},
3181 }
3182 w := &cache.ListWatch{
3183 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
3184 options.LabelSelector = "test-endpoint-static=true"
3185 return f.ClientSet.CoreV1().Endpoints(testNamespaceName).Watch(ctx, options)
3186 },
3187 }
3188 endpointsList, err := f.ClientSet.CoreV1().Endpoints("").List(ctx, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"})
3189 framework.ExpectNoError(err, "failed to list Endpoints")
3190
3191 ginkgo.By("creating an Endpoint")
3192 _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Create(ctx, &testEndpoints, metav1.CreateOptions{})
3193 framework.ExpectNoError(err, "failed to create Endpoint")
3194 ginkgo.By("waiting for available Endpoint")
3195 ctxUntil, cancel := context.WithTimeout(ctx, 30*time.Second)
3196 defer cancel()
3197 _, err = watchtools.Until(ctxUntil, endpointsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3198 switch event.Type {
3199 case watch.Added:
3200 if endpoints, ok := event.Object.(*v1.Endpoints); ok {
3201 found := endpoints.ObjectMeta.Name == testEndpointName &&
3202 endpoints.Labels["test-endpoint-static"] == "true"
3203 return found, nil
3204 }
3205 default:
3206 framework.Logf("observed event type %v", event.Type)
3207 }
3208 return false, nil
3209 })
3210 framework.ExpectNoError(err, "failed to see %v event", watch.Added)
3211
3212 ginkgo.By("listing all Endpoints")
3213 endpointsList, err = f.ClientSet.CoreV1().Endpoints("").List(ctx, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"})
3214 framework.ExpectNoError(err, "failed to list Endpoints")
3215 eventFound := false
3216 var foundEndpoint v1.Endpoints
3217 for _, endpoint := range endpointsList.Items {
3218 if endpoint.ObjectMeta.Name == testEndpointName && endpoint.ObjectMeta.Namespace == testNamespaceName {
3219 eventFound = true
3220 foundEndpoint = endpoint
3221 break
3222 }
3223 }
3224 if !eventFound {
3225 framework.Fail("unable to find Endpoint Service in list of Endpoints")
3226 }
3227
3228 ginkgo.By("updating the Endpoint")
3229 foundEndpoint.ObjectMeta.Labels["test-service"] = "updated"
3230 _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Update(ctx, &foundEndpoint, metav1.UpdateOptions{})
3231 framework.ExpectNoError(err, "failed to update Endpoint with new label")
3232
3233 ctxUntil, cancel = context.WithTimeout(ctx, 30*time.Second)
3234 defer cancel()
3235 _, err = watchtools.Until(ctxUntil, endpointsList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3236 switch event.Type {
3237 case watch.Modified:
3238 if endpoints, ok := event.Object.(*v1.Endpoints); ok {
3239 found := endpoints.ObjectMeta.Name == testEndpointName &&
3240 endpoints.Labels["test-endpoint-static"] == "true"
3241 return found, nil
3242 }
3243 default:
3244 framework.Logf("observed event type %v", event.Type)
3245 }
3246 return false, nil
3247 })
3248 framework.ExpectNoError(err, "failed to see %v event", watch.Modified)
3249
3250 ginkgo.By("fetching the Endpoint")
3251 endpoints, err := f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(ctx, testEndpointName, metav1.GetOptions{})
3252 framework.ExpectNoError(err, "failed to fetch Endpoint")
3253 gomega.Expect(endpoints.ObjectMeta.Labels).To(gomega.HaveKeyWithValue("test-service", "updated"), "label on Endpoint %v in namespace %v was not updated", testEndpointName, testNamespaceName)
3254
3255 endpointPatch, err := json.Marshal(map[string]interface{}{
3256 "metadata": map[string]interface{}{
3257 "labels": map[string]string{
3258 "test-service": "patched",
3259 },
3260 },
3261 "subsets": []map[string]interface{}{
3262 {
3263 "addresses": []map[string]string{
3264 {
3265 "ip": "10.0.0.25",
3266 },
3267 },
3268 "ports": []map[string]interface{}{
3269 {
3270 "name": "http-test",
3271 "port": int32(8080),
3272 },
3273 },
3274 },
3275 },
3276 })
3277 framework.ExpectNoError(err, "failed to marshal JSON for WatchEvent patch")
3278 ginkgo.By("patching the Endpoint")
3279 _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Patch(ctx, testEndpointName, types.StrategicMergePatchType, endpointPatch, metav1.PatchOptions{})
3280 framework.ExpectNoError(err, "failed to patch Endpoint")
3281 ctxUntil, cancel = context.WithTimeout(ctx, 30*time.Second)
3282 defer cancel()
3283 _, err = watchtools.Until(ctxUntil, endpoints.ResourceVersion, w, func(event watch.Event) (bool, error) {
3284 switch event.Type {
3285 case watch.Modified:
3286 if endpoints, ok := event.Object.(*v1.Endpoints); ok {
3287 found := endpoints.ObjectMeta.Name == testEndpointName &&
3288 endpoints.Labels["test-endpoint-static"] == "true"
3289 return found, nil
3290 }
3291 default:
3292 framework.Logf("observed event type %v", event.Type)
3293 }
3294 return false, nil
3295 })
3296 framework.ExpectNoError(err, "failed to see %v event", watch.Modified)
3297
3298 ginkgo.By("fetching the Endpoint")
3299 endpoints, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(ctx, testEndpointName, metav1.GetOptions{})
3300 framework.ExpectNoError(err, "failed to fetch Endpoint")
3301 gomega.Expect(endpoints.ObjectMeta.Labels).To(gomega.HaveKeyWithValue("test-service", "patched"), "failed to patch Endpoint with Label")
3302 endpointSubsetOne := endpoints.Subsets[0]
3303 endpointSubsetOneAddresses := endpointSubsetOne.Addresses[0]
3304 endpointSubsetOnePorts := endpointSubsetOne.Ports[0]
3305 gomega.Expect(endpointSubsetOneAddresses.IP).To(gomega.Equal("10.0.0.25"), "failed to patch Endpoint")
3306 gomega.Expect(endpointSubsetOnePorts.Name).To(gomega.Equal("http-test"), "failed to patch Endpoint")
3307 gomega.Expect(endpointSubsetOnePorts.Port).To(gomega.Equal(int32(8080)), "failed to patch Endpoint")
3308
3309 ginkgo.By("deleting the Endpoint by Collection")
3310 err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: "test-endpoint-static=true"})
3311 framework.ExpectNoError(err, "failed to delete Endpoint by Collection")
3312
3313 ginkgo.By("waiting for Endpoint deletion")
3314 ctxUntil, cancel = context.WithTimeout(ctx, 30*time.Second)
3315 defer cancel()
3316 _, err = watchtools.Until(ctxUntil, endpoints.ResourceVersion, w, func(event watch.Event) (bool, error) {
3317 switch event.Type {
3318 case watch.Deleted:
3319 if endpoints, ok := event.Object.(*v1.Endpoints); ok {
3320 found := endpoints.ObjectMeta.Name == testEndpointName &&
3321 endpoints.Labels["test-endpoint-static"] == "true"
3322 return found, nil
3323 }
3324 default:
3325 framework.Logf("observed event type %v", event.Type)
3326 }
3327 return false, nil
3328 })
3329 framework.ExpectNoError(err, "failed to see %v event", watch.Deleted)
3330
3331 ginkgo.By("fetching the Endpoint")
3332 _, err = f.ClientSet.CoreV1().Endpoints(testNamespaceName).Get(ctx, testEndpointName, metav1.GetOptions{})
3333 gomega.Expect(err).To(gomega.HaveOccurred(), "should not be able to fetch Endpoint")
3334 })
3335
3336
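// Exercises the Service /status subresource: get, patch, and update the
// status, then patch and delete the Service itself, verifying each step with a watch.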
3345 framework.ConformanceIt("should complete a service status lifecycle", func(ctx context.Context) {
3346
3347 ns := f.Namespace.Name
3348 svcResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
3349 svcClient := f.ClientSet.CoreV1().Services(ns)
3350
3351 testSvcName := "test-service-" + utilrand.String(5)
3352 testSvcLabels := map[string]string{"test-service-static": "true"}
3353 testSvcLabelsFlat := "test-service-static=true"
3354
3355 w := &cache.ListWatch{
3356 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
3357 options.LabelSelector = testSvcLabelsFlat
3358 return cs.CoreV1().Services(ns).Watch(ctx, options)
3359 },
3360 }
3361
3362 svcList, err := cs.CoreV1().Services("").List(ctx, metav1.ListOptions{LabelSelector: testSvcLabelsFlat})
3363 framework.ExpectNoError(err, "failed to list Services")
3364
3365 ginkgo.By("creating a Service")
3366 testService := v1.Service{
3367 ObjectMeta: metav1.ObjectMeta{
3368 Name: testSvcName,
3369 Labels: testSvcLabels,
3370 },
3371 Spec: v1.ServiceSpec{
3372 Type: "LoadBalancer",
3373 Ports: []v1.ServicePort{{
3374 Name: "http",
3375 Protocol: v1.ProtocolTCP,
3376 Port: int32(80),
3377 TargetPort: intstr.FromInt32(80),
3378 }},
3379 LoadBalancerClass: utilpointer.String("example.com/internal-vip"),
3380 },
3381 }
3382 _, err = cs.CoreV1().Services(ns).Create(ctx, &testService, metav1.CreateOptions{})
3383 framework.ExpectNoError(err, "failed to create Service")
3384
3385 ginkgo.By("watching for the Service to be added")
3386 ctxUntil, cancel := context.WithTimeout(ctx, svcReadyTimeout)
3387 defer cancel()
3388 _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3389 if svc, ok := event.Object.(*v1.Service); ok {
3390 found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
3391 svc.ObjectMeta.Namespace == ns &&
3392 svc.Labels["test-service-static"] == "true"
3393 if !found {
3394 framework.Logf("observed Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports)
3395 return false, nil
3396 }
3397 framework.Logf("Found Service %v in namespace %v with labels: %v & ports %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Spec.Ports)
3398 return found, nil
3399 }
3400 framework.Logf("Observed event: %+v", event.Object)
3401 return false, nil
3402 })
3403 framework.ExpectNoError(err, "Failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
3404 framework.Logf("Service %s created", testSvcName)
3405
3406 ginkgo.By("Getting /status")
3407 svcStatusUnstructured, err := f.DynamicClient.Resource(svcResource).Namespace(ns).Get(ctx, testSvcName, metav1.GetOptions{}, "status")
3408 framework.ExpectNoError(err, "Failed to fetch ServiceStatus of Service %s in namespace %s", testSvcName, ns)
3409 svcStatusBytes, err := json.Marshal(svcStatusUnstructured)
3410 framework.ExpectNoError(err, "Failed to marshal unstructured response. %v", err)
3411
3412 var svcStatus v1.Service
3413 err = json.Unmarshal(svcStatusBytes, &svcStatus)
3414 framework.ExpectNoError(err, "Failed to unmarshal JSON bytes to a Service object type")
3415 framework.Logf("Service %s has LoadBalancer: %v", testSvcName, svcStatus.Status.LoadBalancer)
3416
3417 ginkgo.By("patching the ServiceStatus")
3418 lbStatus := v1.LoadBalancerStatus{
3419 Ingress: []v1.LoadBalancerIngress{{IP: "203.0.113.1"}},
3420 }
3421 lbStatusJSON, err := json.Marshal(lbStatus)
3422 framework.ExpectNoError(err, "Failed to marshal JSON. %v", err)
3423 _, err = svcClient.Patch(ctx, testSvcName, types.MergePatchType,
3424 []byte(`{"metadata":{"annotations":{"patchedstatus":"true"}},"status":{"loadBalancer":`+string(lbStatusJSON)+`}}`),
3425 metav1.PatchOptions{}, "status")
3426 framework.ExpectNoError(err, "Could not patch service status: %v", err)
3427
3428 ginkgo.By("watching for the Service to be patched")
3429 ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
3430 defer cancel()
3431
3432 _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3433 if svc, ok := event.Object.(*v1.Service); ok {
3434 found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
3435 svc.ObjectMeta.Namespace == ns &&
3436 svc.Annotations["patchedstatus"] == "true"
3437 if !found {
3438 framework.Logf("observed Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
3439 return false, nil
3440 }
3441 framework.Logf("Found Service %v in namespace %v with annotations: %v & LoadBalancer: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.LoadBalancer)
3442 return found, nil
3443 }
3444 framework.Logf("Observed event: %+v", event.Object)
3445 return false, nil
3446 })
3447 framework.ExpectNoError(err, "failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
3448 framework.Logf("Service %s has service status patched", testSvcName)
3449
3450 ginkgo.By("updating the ServiceStatus")
3451
3452 var statusToUpdate, updatedStatus *v1.Service
3453 err = retry.RetryOnConflict(retry.DefaultRetry, func() error {
3454 statusToUpdate, err = svcClient.Get(ctx, testSvcName, metav1.GetOptions{})
3455 framework.ExpectNoError(err, "Unable to retrieve service %s", testSvcName)
3456
3457 statusToUpdate.Status.Conditions = append(statusToUpdate.Status.Conditions, metav1.Condition{
3458 Type: "StatusUpdate",
3459 Status: metav1.ConditionTrue,
3460 Reason: "E2E",
3461 Message: "Set from e2e test",
3462 })
3463
3464 updatedStatus, err = svcClient.UpdateStatus(ctx, statusToUpdate, metav1.UpdateOptions{})
3465 return err
3466 })
3467 framework.ExpectNoError(err, "failed to UpdateStatus: %v", err)
3468 framework.Logf("updatedStatus.Conditions: %#v", updatedStatus.Status.Conditions)
3469
3470 ginkgo.By("watching for the Service to be updated")
3471 ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
3472 defer cancel()
3473 _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3474 if svc, ok := event.Object.(*v1.Service); ok {
3475 found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
3476 svc.ObjectMeta.Namespace == ns &&
3477 svc.Annotations["patchedstatus"] == "true"
3478 if !found {
3479 framework.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions)
3480 return false, nil
3481 }
3482 for _, cond := range svc.Status.Conditions {
3483 if cond.Type == "StatusUpdate" &&
3484 cond.Reason == "E2E" &&
3485 cond.Message == "Set from e2e test" {
3486 framework.Logf("Found Service %v in namespace %v with annotations: %v & Conditions: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions)
3487 return true, nil
3488 }
3489 }
3490 framework.Logf("Observed Service %v in namespace %v with annotations: %v & Conditions: %v; expected condition not found yet", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Annotations, svc.Status.Conditions)
3491 return false, nil
3493 }
3494 framework.Logf("Observed event: %+v", event.Object)
3495 return false, nil
3496 })
3497 framework.ExpectNoError(err, "failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
3498 framework.Logf("Service %s has service status updated", testSvcName)
3499
3500 ginkgo.By("patching the service")
3501 servicePatchPayload, err := json.Marshal(v1.Service{
3502 ObjectMeta: metav1.ObjectMeta{
3503 Labels: map[string]string{
3504 "test-service": "patched",
3505 },
3506 },
3507 })
3508 framework.ExpectNoError(err, "failed to marshal JSON for service patch")
3509 _, err = svcClient.Patch(ctx, testSvcName, types.StrategicMergePatchType, []byte(servicePatchPayload), metav1.PatchOptions{})
3510 framework.ExpectNoError(err, "failed to patch service. %v", err)
3511
3512 ginkgo.By("watching for the Service to be patched")
3513 ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
3514 defer cancel()
3515 _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3516 if svc, ok := event.Object.(*v1.Service); ok {
3517 found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
3518 svc.ObjectMeta.Namespace == ns &&
3519 svc.Labels["test-service"] == "patched"
3520 if !found {
3521 framework.Logf("observed Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels)
3522 return false, nil
3523 }
3524 framework.Logf("Found Service %v in namespace %v with labels: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels)
3525 return found, nil
3526 }
3527 framework.Logf("Observed event: %+v", event.Object)
3528 return false, nil
3529 })
3530 framework.ExpectNoError(err, "failed to locate Service %v in namespace %v", testService.ObjectMeta.Name, ns)
3531 framework.Logf("Service %s patched", testSvcName)
3532
3533 ginkgo.By("deleting the service")
3534 err = cs.CoreV1().Services(ns).Delete(ctx, testSvcName, metav1.DeleteOptions{})
3535 framework.ExpectNoError(err, "failed to delete the Service. %v", err)
3536
3537 ginkgo.By("watching for the Service to be deleted")
3538 ctxUntil, cancel = context.WithTimeout(ctx, svcReadyTimeout)
3539 defer cancel()
3540 _, err = watchtools.Until(ctxUntil, svcList.ResourceVersion, w, func(event watch.Event) (bool, error) {
3541 switch event.Type {
3542 case watch.Deleted:
3543 if svc, ok := event.Object.(*v1.Service); ok {
3544 found := svc.ObjectMeta.Name == testService.ObjectMeta.Name &&
3545 svc.ObjectMeta.Namespace == ns &&
3546 svc.Labels["test-service-static"] == "true"
3547 if !found {
3548 framework.Logf("observed Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations)
3549 return false, nil
3550 }
3551 framework.Logf("Found Service %v in namespace %v with labels: %v & annotations: %v", svc.ObjectMeta.Name, svc.ObjectMeta.Namespace, svc.Labels, svc.Annotations)
3552 return found, nil
3553 }
3554 default:
3555 framework.Logf("Observed event: %+v", event.Type)
3556 }
3557 return false, nil
3558 })
3559 framework.ExpectNoError(err, "failed to delete Service %v in namespace %v", testService.ObjectMeta.Name, ns)
3560 framework.Logf("Service %s deleted", testSvcName)
3561 })
3562
3563
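// Creates three labeled Services and verifies that DeleteCollection removes
// exactly the two matching the label selector, leaving the third intact.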
3572 framework.ConformanceIt("should delete a collection of services", func(ctx context.Context) {
3573
3574 ns := f.Namespace.Name
3575 svcClient := f.ClientSet.CoreV1().Services(ns)
3576 svcResource := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}
3577 svcDynamicClient := f.DynamicClient.Resource(svcResource).Namespace(ns)
3578
3579 svcLabel := map[string]string{"e2e-test-service": "delete"}
3580 deleteLabel := labels.SelectorFromSet(svcLabel).String()
3581
3582 ginkgo.By("creating a collection of services")
3583
3584 testServices := []struct {
3585 name string
3586 label map[string]string
3587 port int
3588 }{
3589 {
3590 name: "e2e-svc-a-" + utilrand.String(5),
3591 label: map[string]string{"e2e-test-service": "delete"},
3592 port: 8001,
3593 },
3594 {
3595 name: "e2e-svc-b-" + utilrand.String(5),
3596 label: map[string]string{"e2e-test-service": "delete"},
3597 port: 8002,
3598 },
3599 {
3600 name: "e2e-svc-c-" + utilrand.String(5),
3601 label: map[string]string{"e2e-test-service": "keep"},
3602 port: 8003,
3603 },
3604 }
3605
3606 for _, testService := range testServices {
3608 framework.Logf("Creating %s", testService.name)
3609
3610 svc := v1.Service{
3611 ObjectMeta: metav1.ObjectMeta{
3612 Name: testService.name,
3613 Labels: testService.label,
3614 },
3615 Spec: v1.ServiceSpec{
3616 Type: "ClusterIP",
3617 Ports: []v1.ServicePort{{
3618 Name: "http",
3619 Protocol: v1.ProtocolTCP,
3620 Port: int32(testService.port),
3621 TargetPort: intstr.FromInt32(int32(testService.port)),
3622 }},
3623 },
3624 }
3625 _, err := svcClient.Create(ctx, &svc, metav1.CreateOptions{})
3626 framework.ExpectNoError(err, "failed to create Service")
3627
3629 }
3630
3631 svcList, err := cs.CoreV1().Services(ns).List(ctx, metav1.ListOptions{})
3632 framework.ExpectNoError(err, "failed to list Services")
3633 gomega.Expect(svcList.Items).To(gomega.HaveLen(3), "Required count of services out of sync")
3634
3635 ginkgo.By("deleting service collection")
3636 err = svcDynamicClient.DeleteCollection(ctx, metav1.DeleteOptions{}, metav1.ListOptions{LabelSelector: deleteLabel})
3637 framework.ExpectNoError(err, "failed to delete service collection. %v", err)
3638
3639 svcList, err = cs.CoreV1().Services(ns).List(ctx, metav1.ListOptions{})
3640 framework.ExpectNoError(err, "failed to list Services")
3641 gomega.Expect(svcList.Items).To(gomega.HaveLen(1), "Required count of services out of sync")
3642
3643 framework.Logf("Collection of services has been deleted")
3644 })
3645
3646
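// Verifies that a Service can expose the same port number over TCP and UDP
// simultaneously, and that removing one protocol stops only that protocol.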
3656 framework.ConformanceIt("should serve endpoints on same port and different protocols", func(ctx context.Context) {
3657 serviceName := "multiprotocol-test"
3658 testLabels := map[string]string{"app": "multiport"}
3659 ns := f.Namespace.Name
3660 containerPort := 80
3661
3662 svcTCPport := v1.ServicePort{
3663 Name: "tcp-port",
3664 Port: 80,
3665 TargetPort: intstr.FromInt32(int32(containerPort)),
3666 Protocol: v1.ProtocolTCP,
3667 }
3668 svcUDPport := v1.ServicePort{
3669 Name: "udp-port",
3670 Port: 80,
3671 TargetPort: intstr.FromInt32(int32(containerPort)),
3672 Protocol: v1.ProtocolUDP,
3673 }
3674
3675 ginkgo.By("creating service " + serviceName + " in namespace " + ns)
3676
3677 testService := v1.Service{
3678 ObjectMeta: metav1.ObjectMeta{
3679 Name: serviceName,
3680 Labels: testLabels,
3681 },
3682 Spec: v1.ServiceSpec{
3683 Type: v1.ServiceTypeClusterIP,
3684 Selector: testLabels,
3685 Ports: []v1.ServicePort{svcTCPport, svcUDPport},
3686 },
3687 }
3688 service, err := cs.CoreV1().Services(ns).Create(ctx, &testService, metav1.CreateOptions{})
3689 framework.ExpectNoError(err, "failed to create Service")
3690
3691 containerPorts := []v1.ContainerPort{{
3692 Name: svcTCPport.Name,
3693 ContainerPort: int32(containerPort),
3694 Protocol: v1.ProtocolTCP,
3695 }, {
3696 Name: svcUDPport.Name,
3697 ContainerPort: int32(containerPort),
3698 Protocol: v1.ProtocolUDP,
3699 }}
3700 podname1 := "pod1"
3701 ginkgo.By("creating pod " + podname1 + " in namespace " + ns)
3702 createPodOrFail(ctx, f, ns, podname1, testLabels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort))
3703 validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts})
3704
3705 ginkgo.By("Checking if the Service forwards traffic to the TCP and UDP port")
3706 execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
3707 err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolTCP, execPod, 30*time.Second)
3708 if err != nil {
3709 framework.Failf("Failed to connect to Service TCP port: %v", err)
3710 }
3711 err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolUDP, execPod, 30*time.Second)
3712 if err != nil {
3713 framework.Failf("Failed to connect to Service UDP port: %v", err)
3714 }
3715
3716 ginkgo.By("Checking if the Service forwards traffic to TCP only")
3717 service, err = cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
3718 if err != nil {
3719 framework.Failf("failed to get Service %q: %v", serviceName, err)
3720 }
3721 service.Spec.Ports = []v1.ServicePort{svcTCPport}
3722 _, err = cs.CoreV1().Services(ns).Update(ctx, service, metav1.UpdateOptions{})
3723 if err != nil {
3724 framework.Failf("failed to update Service %q: %v", serviceName, err)
3725 }
3726
3727
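// The TCP port must remain reachable after the UDP port was removed from the Service.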
3728 err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolTCP, execPod, 30*time.Second)
3729 if err != nil {
3730 framework.Failf("Failed to connect to Service TCP port: %v", err)
3731 }
3732
3733
3734
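// The removed UDP port should eventually become unreachable once the proxy
// has reconciled the Service update.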
3735 gomega.Eventually(ctx, func() error {
3736 return testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolUDP, execPod, 6*time.Second)
3737 }).WithTimeout(30 * time.Second).WithPolling(5 * time.Second).ShouldNot(gomega.BeNil())
3738
3739 ginkgo.By("Checking if the Service forwards traffic to UDP only")
3740 service, err = cs.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
3741 if err != nil {
3742 framework.Failf("failed to get Service %q: %v", serviceName, err)
3743 }
3744 service.Spec.Ports = []v1.ServicePort{svcUDPport}
3745 _, err = cs.CoreV1().Services(ns).Update(ctx, service, metav1.UpdateOptions{})
3746 if err != nil {
3747 framework.Failf("failed to update Service %q: %v", serviceName, err)
3748 }
3749
3750
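// The UDP port must remain reachable after the TCP port was removed from the Service.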
3751 err = testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolUDP, execPod, 30*time.Second)
3752 if err != nil {
3753 framework.Failf("Failed to connect to Service UDP port: %v", err)
3754 }
3755
3756
3757
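// The removed TCP port should eventually become unreachable once the proxy
// has reconciled the Service update.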
3758 gomega.Eventually(ctx, func() error {
3759 return testEndpointReachability(ctx, service.Spec.ClusterIP, 80, v1.ProtocolTCP, execPod, 6*time.Second)
3760 }).WithTimeout(30 * time.Second).WithPolling(5 * time.Second).ShouldNot(gomega.BeNil())
3761 })
3762
3763
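// Same-port TCP/UDP test as above, but exercised against the cluster IP of a
// Type=LoadBalancer Service (internal traffic only).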
3768 ginkgo.It("should serve endpoints on same port and different protocol for internal traffic on Type LoadBalancer ", func(ctx context.Context) {
3769 serviceName := "multiprotocol-lb-test"
3770 ns := f.Namespace.Name
3771 jig := e2eservice.NewTestJig(cs, ns, serviceName)
3772
3773 ginkgo.DeferCleanup(func(ctx context.Context) {
3774 err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
3775 framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
3776 })
3777
3778 svc1port := "svc1"
3779 svc2port := "svc2"
3780
3781 ginkgo.By("creating service " + serviceName + " in namespace " + ns)
3782 svc, err := jig.CreateLoadBalancerServiceWaitForClusterIPOnly(func(service *v1.Service) {
3783 service.Spec.Ports = []v1.ServicePort{
3784 {
3785 Name: "portname1",
3786 Port: 80,
3787 TargetPort: intstr.FromString(svc1port),
3788 Protocol: v1.ProtocolTCP,
3789 },
3790 {
3791 Name: "portname2",
3792 Port: 80,
3793 TargetPort: intstr.FromString(svc2port),
3794 Protocol: v1.ProtocolUDP,
3795 },
3796 }
3797 })
3798 framework.ExpectNoError(err)
3799
3800 containerPort := 100
3801
3802 names := map[string]bool{}
3803 ginkgo.DeferCleanup(func(ctx context.Context) {
3804 for name := range names {
3805 err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
3806 framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
3807 }
3808 })
3809
3810 containerPorts := []v1.ContainerPort{
3811 {
3812 Name: svc1port,
3813 ContainerPort: int32(containerPort),
3814 Protocol: v1.ProtocolTCP,
3815 },
3816 {
3817 Name: svc2port,
3818 ContainerPort: int32(containerPort),
3819 Protocol: v1.ProtocolUDP,
3820 },
3821 }
3822
3823 podname1 := "pod1"
3824
3825 createPodOrFail(ctx, f, ns, podname1, jig.Labels, containerPorts, "netexec", "--http-port", strconv.Itoa(containerPort), "--udp-port", strconv.Itoa(containerPort))
3826 validateEndpointsPortsWithProtocolsOrFail(cs, ns, serviceName, fullPortsByPodName{podname1: containerPorts})
3827
3828 ginkgo.By("Checking if the Service forwards traffic to pods")
3829 execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod", nil)
3830 err = jig.CheckServiceReachability(ctx, svc, execPod)
3831 framework.ExpectNoError(err)
3832 e2epod.DeletePodOrFail(ctx, cs, ns, podname1)
3833 })
3834
3835
3836
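// Verifies basic SCTP Service plumbing (endpoints appear and disappear with
// the backing pod) and that the test itself does not cause the sctp kernel
// module to be loaded on the nodes.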
3837 f.It("should allow creating a basic SCTP service with pod and endpoints [LinuxOnly]", f.WithSerial(), func(ctx context.Context) {
3838 serviceName := "sctp-endpoint-test"
3839 ns := f.Namespace.Name
3840 jig := e2eservice.NewTestJig(cs, ns, serviceName)
3841
3842 ginkgo.By("getting the state of the sctp module on nodes")
3843 nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, cs, 2)
3844 framework.ExpectNoError(err)
3845 sctpLoadedAtStart := CheckSCTPModuleLoadedOnNodes(ctx, f, nodes)
3846
3847 ginkgo.By("creating service " + serviceName + " in namespace " + ns)
3848 _, err = jig.CreateSCTPServiceWithPort(ctx, nil, 5060)
3849 framework.ExpectNoError(err)
3850 ginkgo.DeferCleanup(func(ctx context.Context) {
3851 err := cs.CoreV1().Services(ns).Delete(ctx, serviceName, metav1.DeleteOptions{})
3852 framework.ExpectNoError(err, "failed to delete service: %s in namespace: %s", serviceName, ns)
3853 })
3854
3855 err = e2enetwork.WaitForService(ctx, f.ClientSet, ns, serviceName, true, 5*time.Second, e2eservice.TestTimeout)
3856 framework.ExpectNoError(err, "error while waiting for service %s: %v", serviceName, err)
3857
3858 ginkgo.By("validating endpoints do not exist yet")
3859 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
3860
3861 ginkgo.By("creating a pod for the service")
3862 names := map[string]bool{}
3863
3864 name1 := "pod1"
3865
3866 createPodOrFail(ctx, f, ns, name1, jig.Labels, []v1.ContainerPort{{ContainerPort: 5060, Protocol: v1.ProtocolSCTP}})
3867 names[name1] = true
3868 ginkgo.DeferCleanup(func(ctx context.Context) {
3869 for name := range names {
3870 err := cs.CoreV1().Pods(ns).Delete(ctx, name, metav1.DeleteOptions{})
3871 framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", name, ns)
3872 }
3873 })
3874
3875 ginkgo.By("validating endpoints exists")
3876 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{name1: {5060}})
3877
3878 ginkgo.By("deleting the pod")
3879 e2epod.DeletePodOrFail(ctx, cs, ns, name1)
3880 delete(names, name1)
3881 ginkgo.By("validating endpoints do not exist anymore")
3882 validateEndpointsPortsOrFail(ctx, cs, ns, serviceName, portsByPodName{})
3883
3884 ginkgo.By("validating sctp module is still not loaded")
3885 sctpLoadedAtEnd := CheckSCTPModuleLoadedOnNodes(ctx, f, nodes)
3886 if !sctpLoadedAtStart && sctpLoadedAtEnd {
3887 framework.Failf("The state of the sctp module has changed due to the test case")
3888 }
3889 })
3890 })
3891
3892
3893
3894
3895
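// execAffinityTestForSessionAffinityTimeout verifies that ClientIP session
// affinity is honored until the configured timeout expires, and that requests
// can reach a different endpoint afterwards.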
3896 func execAffinityTestForSessionAffinityTimeout(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
3897 ns := f.Namespace.Name
3898 numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name
3899 ginkgo.By("creating service in namespace " + ns)
3900 serviceType := svc.Spec.Type
3901
3902 svcSessionAffinityTimeout := int32(SessionAffinityTimeout)
3903 svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
3904 svc.Spec.SessionAffinityConfig = &v1.SessionAffinityConfig{
3905 ClientIP: &v1.ClientIPConfig{TimeoutSeconds: &svcSessionAffinityTimeout},
3906 }
3907 _, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
3908 framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
3909 ginkgo.DeferCleanup(StopServeHostnameService, cs, ns, serviceName)
3910 jig := e2eservice.NewTestJig(cs, ns, serviceName)
3911 svc, err = jig.Client.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
3912 framework.ExpectNoError(err, "failed to fetch service: %s in namespace: %s", serviceName, ns)
3913 var svcIP string
3914 if serviceType == v1.ServiceTypeNodePort {
3915 nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
3916 framework.ExpectNoError(err)
3917
3918 family := v1.IPv4Protocol
3919 if netutils.IsIPv6String(svc.Spec.ClusterIP) {
3920 family = v1.IPv6Protocol
3921 }
3922 svcIP = e2enode.FirstAddressByTypeAndFamily(nodes, v1.NodeInternalIP, family)
3923 gomega.Expect(svcIP).NotTo(gomega.BeEmpty(), "failed to get Node internal IP for family: %s", family)
3924 servicePort = int(svc.Spec.Ports[0].NodePort)
3925 } else {
3926 svcIP = svc.Spec.ClusterIP
3927 }
3928
3929 execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod-affinity", nil)
3930 ginkgo.DeferCleanup(func(ctx context.Context) {
3931 framework.Logf("Cleaning up the exec pod")
3932 err := cs.CoreV1().Pods(ns).Delete(ctx, execPod.Name, metav1.DeleteOptions{})
3933 framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns)
3934 })
3935 err = jig.CheckServiceReachability(ctx, svc, execPod)
3936 framework.ExpectNoError(err)
3937
3938
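// the service should be sticky until the configured timeout expires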
3939 if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, true) {
3940 framework.Failf("the service %s (%s:%d) should be sticky until the timeout expires", svc.Name, svcIP, servicePort)
3941 }
3942
3943
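// After the timeout, requests should eventually land on more than one
// endpoint; retry several times since a request may legitimately hit the
// same pod again.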
3944 hosts := sets.NewString()
3945 cmd := fmt.Sprintf(`curl -q -s --connect-timeout 2 http://%s/`, net.JoinHostPort(svcIP, strconv.Itoa(servicePort)))
3946 for i := 0; i < 10; i++ {
3947 hostname, err := e2eoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
3948 if err == nil {
3949 hosts.Insert(hostname)
3950 if hosts.Len() > 1 {
3951 return
3952 }
3953
3954
3955
3956
3957
3958
3959
3960
3961
3962
3963
3964
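// Sleep slightly past the affinity timeout so the next request is eligible to
// be balanced to a different endpoint; the extra 5s is a buffer for the proxy
// to expire the affinity entry.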
3965 time.Sleep(time.Duration(svcSessionAffinityTimeout+5) * time.Second)
3966 }
3967 }
3968 framework.Fail("Session is sticky after reaching the timeout")
3969 }
3970
3971 func execAffinityTestForNonLBServiceWithTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
3972 execAffinityTestForNonLBServiceWithOptionalTransition(ctx, f, cs, svc, true)
3973 }
3974
3975 func execAffinityTestForNonLBService(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
3976 execAffinityTestForNonLBServiceWithOptionalTransition(ctx, f, cs, svc, false)
3977 }
3978
3979
3980
3981
3982
3983
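// execAffinityTestForNonLBServiceWithOptionalTransition verifies ClientIP
// session affinity for ClusterIP/NodePort services; when isTransitionTest is
// true it also flips affinity off and back on via UpdateService and rechecks.
// A minimal usage sketch (getServeHostnameService is defined in this package):
//
//	svc := getServeHostnameService("affinity-clusterip")
//	execAffinityTestForNonLBService(ctx, f, cs, svc)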
3984 func execAffinityTestForNonLBServiceWithOptionalTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
3985 ns := f.Namespace.Name
3986 numPods, servicePort, serviceName := 3, defaultServeHostnameServicePort, svc.ObjectMeta.Name
3987 ginkgo.By("creating service in namespace " + ns)
3988 serviceType := svc.Spec.Type
3989 svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
3990 _, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
3991 framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
3992 ginkgo.DeferCleanup(StopServeHostnameService, cs, ns, serviceName)
3993 jig := e2eservice.NewTestJig(cs, ns, serviceName)
3994 svc, err = jig.Client.CoreV1().Services(ns).Get(ctx, serviceName, metav1.GetOptions{})
3995 framework.ExpectNoError(err, "failed to fetch service: %s in namespace: %s", serviceName, ns)
3996 var svcIP string
3997 if serviceType == v1.ServiceTypeNodePort {
3998 nodes, err := e2enode.GetReadySchedulableNodes(ctx, cs)
3999 framework.ExpectNoError(err)
4000
4001 family := v1.IPv4Protocol
4002 if netutils.IsIPv6String(svc.Spec.ClusterIP) {
4003 family = v1.IPv6Protocol
4004 }
4005 svcIP = e2enode.FirstAddressByTypeAndFamily(nodes, v1.NodeInternalIP, family)
4006 gomega.Expect(svcIP).NotTo(gomega.BeEmpty(), "failed to get Node internal IP for family: %s", family)
4007 servicePort = int(svc.Spec.Ports[0].NodePort)
4008 } else {
4009 svcIP = svc.Spec.ClusterIP
4010 }
4011
4012 execPod := e2epod.CreateExecPodOrFail(ctx, cs, ns, "execpod-affinity", nil)
4013 ginkgo.DeferCleanup(func(ctx context.Context) {
4014 framework.Logf("Cleaning up the exec pod")
4015 err := cs.CoreV1().Pods(ns).Delete(ctx, execPod.Name, metav1.DeleteOptions{})
4016 framework.ExpectNoError(err, "failed to delete pod: %s in namespace: %s", execPod.Name, ns)
4017 })
4018 err = jig.CheckServiceReachability(ctx, svc, execPod)
4019 framework.ExpectNoError(err)
4020
4021 if !isTransitionTest {
4022 if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, true) {
4023 framework.Failf("Failed to check affinity for service %s/%s", ns, svc.Name)
4024 }
4025 }
4026 if isTransitionTest {
4027 _, err = jig.UpdateService(ctx, func(svc *v1.Service) {
4028 svc.Spec.SessionAffinity = v1.ServiceAffinityNone
4029 })
4030 framework.ExpectNoError(err)
4031 if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, false) {
4032 framework.Failf("Failed to check affinity for service %s/%s without session affinity", ns, svc.Name)
4033 }
4034 _, err = jig.UpdateService(ctx, func(svc *v1.Service) {
4035 svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
4036 })
4037 framework.ExpectNoError(err)
4038 if !checkAffinity(ctx, cs, execPod, svcIP, servicePort, true) {
4039 framework.Failf("Failed to check affinity for service %s/%s with session affinity", ns, svc.Name)
4040 }
4041 }
4042 }
4043
4044 func execAffinityTestForLBServiceWithTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
4045 execAffinityTestForLBServiceWithOptionalTransition(ctx, f, cs, svc, true)
4046 }
4047
4048 func execAffinityTestForLBService(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service) {
4049 execAffinityTestForLBServiceWithOptionalTransition(ctx, f, cs, svc, false)
4050 }
4051
4052
4053
4054
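// execAffinityTestForLBServiceWithOptionalTransition is the LoadBalancer
// variant: it waits for an ingress IP, checks ClientIP affinity against it,
// and optionally toggles affinity off and on again.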
4055 func execAffinityTestForLBServiceWithOptionalTransition(ctx context.Context, f *framework.Framework, cs clientset.Interface, svc *v1.Service, isTransitionTest bool) {
4056 numPods, ns, serviceName := 3, f.Namespace.Name, svc.ObjectMeta.Name
4057
4058 ginkgo.By("creating service in namespace " + ns)
4059 svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
4060 _, _, err := StartServeHostnameService(ctx, cs, svc, ns, numPods)
4061 framework.ExpectNoError(err, "failed to create replication controller with service in the namespace: %s", ns)
4062 jig := e2eservice.NewTestJig(cs, ns, serviceName)
4063 ginkgo.By("waiting for loadbalancer for service " + ns + "/" + serviceName)
4064 svc, err = jig.WaitForLoadBalancer(ctx, e2eservice.GetServiceLoadBalancerCreationTimeout(ctx, cs))
4065 framework.ExpectNoError(err)
4066 ginkgo.DeferCleanup(func(ctx context.Context) {
4067 podNodePairs, err := e2enode.PodNodePairs(ctx, cs, ns)
4068 framework.Logf("[pod,node] pairs: %+v; err: %v", podNodePairs, err)
4069 _ = StopServeHostnameService(ctx, cs, ns, serviceName)
4070 lb := cloudprovider.DefaultLoadBalancerName(svc)
4071 framework.Logf("cleaning load balancer resource for %s", lb)
4072 e2eservice.CleanupServiceResources(ctx, cs, lb, framework.TestContext.CloudConfig.Region, framework.TestContext.CloudConfig.Zone)
4073 })
4074 ingressIP := e2eservice.GetIngressPoint(&svc.Status.LoadBalancer.Ingress[0])
4075 port := int(svc.Spec.Ports[0].Port)
4076
4077 if !isTransitionTest {
4078 if !checkAffinity(ctx, cs, nil, ingressIP, port, true) {
4079 framework.Failf("Failed to verify affinity for loadbalance service %s/%s", ns, serviceName)
4080 }
4081 }
4082 if isTransitionTest {
4083 svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
4084 svc.Spec.SessionAffinity = v1.ServiceAffinityNone
4085 })
4086 framework.ExpectNoError(err)
4087 if !checkAffinity(ctx, cs, nil, ingressIP, port, false) {
4088 framework.Failf("Failed to verify affinity for loadbalance service %s/%s without session affinity ", ns, serviceName)
4089 }
4090 svc, err = jig.UpdateService(ctx, func(svc *v1.Service) {
4091 svc.Spec.SessionAffinity = v1.ServiceAffinityClientIP
4092 })
4093 framework.ExpectNoError(err)
4094 if !checkAffinity(ctx, cs, nil, ingressIP, port, true) {
4095 framework.Failf("Failed to verify affinity for loadbalance service %s/%s with session affinity ", ns, serviceName)
4096 }
4097 }
4098 }
4099
4100 func createAndGetExternalServiceFQDN(ctx context.Context, cs clientset.Interface, ns, serviceName string) string {
4101 _, _, err := StartServeHostnameService(ctx, cs, getServeHostnameService(serviceName), ns, 2)
4102 framework.ExpectNoError(err, "Expected Service %s to be running", serviceName)
4103 return fmt.Sprintf("%s.%s.svc.%s", serviceName, ns, framework.TestContext.ClusterDNSDomain)
4104 }
4105
4106 func createPausePodDeployment(ctx context.Context, cs clientset.Interface, name, ns string, replicas int) *appsv1.Deployment {
4107 labels := map[string]string{"deployment": "agnhost-pause"}
4108 pauseDeployment := e2edeployment.NewDeployment(name, int32(replicas), labels, "", "", appsv1.RollingUpdateDeploymentStrategyType)
4109
4110 pauseDeployment.Spec.Template.Spec.Containers[0] = e2epod.NewAgnhostContainer("agnhost-pause", nil, nil, "pause")
4111 pauseDeployment.Spec.Template.Spec.Affinity = &v1.Affinity{
4112 PodAntiAffinity: &v1.PodAntiAffinity{
4113 RequiredDuringSchedulingIgnoredDuringExecution: []v1.PodAffinityTerm{
4114 {
4115 LabelSelector: &metav1.LabelSelector{MatchLabels: labels},
4116 TopologyKey: "kubernetes.io/hostname",
4117 Namespaces: []string{ns},
4118 },
4119 },
4120 },
4121 }
4122
4123 deployment, err := cs.AppsV1().Deployments(ns).Create(ctx, pauseDeployment, metav1.CreateOptions{})
4124 framework.ExpectNoError(err, "Error in creating deployment for pause pod")
4125 return deployment
4126 }
4127
4128
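// createPodOrFail creates an agnhost pod with the given labels and container
// ports in the namespace and waits for it to be running.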
4129 func createPodOrFail(ctx context.Context, f *framework.Framework, ns, name string, labels map[string]string, containerPorts []v1.ContainerPort, args ...string) {
4130 ginkgo.By(fmt.Sprintf("Creating pod %s in namespace %s", name, ns))
4131 pod := e2epod.NewAgnhostPod(ns, name, nil, nil, containerPorts, args...)
4132 pod.ObjectMeta.Labels = labels
4133
4134
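// Set a placeholder environment variable; historically this worked around a
// Docker issue with empty env lists.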
4135 pod.Spec.Containers[0].Env = []v1.EnvVar{{Name: "FOO", Value: " "}}
4136 e2epod.NewPodClient(f).CreateSync(ctx, pod)
4137 }
4138
4139
4140
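// launchHostExecPod launches a host-network exec pod in the given namespace
// and waits until it is ready.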
4141 func launchHostExecPod(ctx context.Context, client clientset.Interface, ns, name string) *v1.Pod {
4142 framework.Logf("Creating new host exec pod")
4143 hostExecPod := e2epod.NewExecPodSpec(ns, name, true)
4144 pod, err := client.CoreV1().Pods(ns).Create(ctx, hostExecPod, metav1.CreateOptions{})
4145 framework.ExpectNoError(err)
4146 err = e2epod.WaitTimeoutForPodReadyInNamespace(ctx, client, name, ns, framework.PodStartTimeout)
4147 framework.ExpectNoError(err)
4148 return pod
4149 }
4150
4151
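// checkReachabilityFromPod polls reachability from the given pod to the
// target URL until the expectation holds or the timeout expires.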
4152 func checkReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, namespace, pod, target string) {
4153 cmd := fmt.Sprintf("wget -T 5 -qO- %q", target)
4154 err := wait.PollImmediate(framework.Poll, timeout, func() (bool, error) {
4155 _, err := e2eoutput.RunHostCmd(namespace, pod, cmd)
4156 if expectToBeReachable && err != nil {
4157 framework.Logf("Expect target to be reachable. But got err: %v. Retry until timeout", err)
4158 return false, nil
4159 }
4160
4161 if !expectToBeReachable && err == nil {
4162 framework.Logf("Expect target NOT to be reachable. But it is reachable. Retry until timeout")
4163 return false, nil
4164 }
4165 return true, nil
4166 })
4167 framework.ExpectNoError(err)
4168 }
4169
4170
4171
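// enableAndDisableInternalLB returns provider-specific mutators that mark a
// Service as an internal load balancer, and revert that change.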
4172 func enableAndDisableInternalLB() (enable func(svc *v1.Service), disable func(svc *v1.Service)) {
4173 return framework.TestContext.CloudConfig.Provider.EnableAndDisableInternalLB()
4174 }
4175
4176 func validatePorts(ep, expectedEndpoints portsByPodUID) error {
4177 if len(ep) != len(expectedEndpoints) {
4178
4179 return fmt.Errorf("invalid number of endpoints got %v, expected %v", ep, expectedEndpoints)
4180 }
4181 for podUID := range expectedEndpoints {
4182 if _, ok := ep[podUID]; !ok {
4183 return fmt.Errorf("endpoint %v not found", podUID)
4184 }
4185 if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
4186 return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
4187 }
4188 sort.Ints(ep[podUID])
4189 sort.Ints(expectedEndpoints[podUID])
4190 for index := range ep[podUID] {
4191 if ep[podUID][index] != expectedEndpoints[podUID][index] {
4192 return fmt.Errorf("invalid list of ports for uid %v. Got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
4193 }
4194 }
4195 }
4196 return nil
4197 }
4198
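// translatePodNameToUID resolves each pod name to its UID so endpoints can be
// matched by UID rather than by name.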
4199 func translatePodNameToUID(ctx context.Context, c clientset.Interface, ns string, expectedEndpoints portsByPodName) (portsByPodUID, error) {
4200 portsByUID := make(portsByPodUID)
4201 for name, portList := range expectedEndpoints {
4202 pod, err := c.CoreV1().Pods(ns).Get(ctx, name, metav1.GetOptions{})
4203 if err != nil {
4204 return nil, fmt.Errorf("failed to get pod %s; endpoint validation failed: %w", name, err)
4205 }
4206 portsByUID[pod.ObjectMeta.UID] = portList
4207 }
4208 return portsByUID, nil
4209 }
4210
4211
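// validateEndpointsPortsOrFail waits until the Service exposes exactly the
// expected pod/port pairs via the Endpoints object and, when that API is
// served, via EndpointSlices as well. Typical call (sketch):
//
//	validateEndpointsPortsOrFail(ctx, cs, ns, "my-svc", portsByPodName{"pod1": {8080}})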
4212 func validateEndpointsPortsOrFail(ctx context.Context, c clientset.Interface, namespace, serviceName string, expectedEndpoints portsByPodName) {
4213 ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
4214 expectedPortsByPodUID, err := translatePodNameToUID(ctx, c, namespace, expectedEndpoints)
4215 framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints)
4216
4217 var (
4218 pollErr error
4219 i = 0
4220 )
4221 if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
4222 i++
4223
4224 ep, err := c.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
4225 if err != nil {
4226 framework.Logf("Failed to get Endpoints object: %v", err)
4227
4228 return false, nil
4229 }
4230 portsByUID := portsByPodUID(e2eendpoints.GetContainerPortsByPodUID(ep))
4231 if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
4232 if i%5 == 0 {
4233 framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
4234 }
4235 return false, nil
4236 }
4237
4238
4239
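// If the EndpointSlice API is served, the slices must agree with the legacy
// Endpoints object before the validation succeeds.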
4240 if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil {
4241 opts := metav1.ListOptions{
4242 LabelSelector: "kubernetes.io/service-name=" + serviceName,
4243 }
4244 es, err := c.DiscoveryV1().EndpointSlices(namespace).List(ctx, opts)
4245 if err != nil {
4246 framework.Logf("Failed to list EndpointSlice objects: %v", err)
4247
4248 return false, nil
4249 }
4250 portsByUID = portsByPodUID(e2eendpointslice.GetContainerPortsByPodUID(es.Items))
4251 if err := validatePorts(portsByUID, expectedPortsByPodUID); err != nil {
4252 if i%5 == 0 {
4253 framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
4254 }
4255 return false, nil
4256 }
4257 }
4258 framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
4259 serviceName, namespace, expectedEndpoints)
4260 return true, nil
4261 }); pollErr != nil {
4262 if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{}); err == nil {
4263 for _, pod := range pods.Items {
4264 framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
4265 }
4266 } else {
4267 framework.Logf("Can't list pod debug info: %v", err)
4268 }
4269 }
4270 framework.ExpectNoError(pollErr, "error waithing for service %s in namespace %s to expose endpoints %v: %v", serviceName, namespace, expectedEndpoints)
4271 }
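
// Typical call site, sketched with hypothetical pod names: after creating a
// service and one backing pod, assert the endpoints it should expose:
//
//	validateEndpointsPortsOrFail(ctx, cs, ns, defaultServeHostnameServiceName,
//		portsByPodName{"server-pod-1": {9376}})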

// restartApiserver restarts the kube-apiserver.
func restartApiserver(ctx context.Context, namespace string, cs clientset.Interface) error {
	if framework.ProviderIs("gke") {
		// On GKE, a same-version master upgrade restarts the control plane.
		v, err := cs.Discovery().ServerVersion()
		if err != nil {
			return err
		}
		return e2eproviders.MasterUpgradeGKE(ctx, namespace, v.GitVersion[1:]) // strip the leading "v"
	}

	return restartComponent(ctx, cs, kubeAPIServerLabelName, metav1.NamespaceSystem, map[string]string{clusterComponentKey: kubeAPIServerLabelName})
}

// restartComponent deletes the component's pods with a zero grace period and
// waits for the same number of replacement pods to be created.
func restartComponent(ctx context.Context, cs clientset.Interface, cName, ns string, matchLabels map[string]string) error {
	pods, err := e2epod.GetPods(ctx, cs, ns, matchLabels)
	if err != nil {
		return fmt.Errorf("failed to get %s's pods, err: %w", cName, err)
	}
	if len(pods) == 0 {
		return fmt.Errorf("%s pod count is 0", cName)
	}

	if err := e2epod.DeletePodsWithGracePeriod(ctx, cs, pods, 0); err != nil {
		return fmt.Errorf("failed to restart component: %s, err: %w", cName, err)
	}

	_, err = e2epod.PodsCreatedByLabel(ctx, cs, ns, cName, int32(len(pods)), labels.SelectorFromSet(matchLabels))
	return err
}
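
// restartApiserver above shows one use. As another sketch, the same helper
// could bounce kube-proxy pods via their addon label (this assumes a cluster
// where kube-proxy runs as pods labeled k8s-app=kube-proxy):
//
//	err := restartComponent(ctx, cs, kubeProxyLabelName, metav1.NamespaceSystem,
//		map[string]string{clusterAddonLabelKey: kubeProxyLabelName})
//	framework.ExpectNoError(err)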

// validateEndpointsPortsWithProtocolsOrFail waits for the service to expose
// the expected container ports and protocols (and, when available, the
// matching EndpointSlices), failing the test otherwise.
func validateEndpointsPortsWithProtocolsOrFail(c clientset.Interface, namespace, serviceName string, expectedEndpoints fullPortsByPodName) {
	ginkgo.By(fmt.Sprintf("waiting up to %v for service %s in namespace %s to expose endpoints %v", framework.ServiceStartTimeout, serviceName, namespace, expectedEndpoints))
	expectedPortsByPodUID, err := translatePortsByPodNameToPortsByPodUID(c, namespace, expectedEndpoints)
	framework.ExpectNoError(err, "failed to translate pod name to UID, ns:%s, expectedEndpoints:%v", namespace, expectedEndpoints)

	var (
		pollErr error
		i       = 0
	)
	if pollErr = wait.PollImmediate(time.Second, framework.ServiceStartTimeout, func() (bool, error) {
		i++

		ep, err := c.CoreV1().Endpoints(namespace).Get(context.TODO(), serviceName, metav1.GetOptions{})
		if err != nil {
			framework.Logf("Failed to get Endpoints object: %v", err)
			// Retry the error.
			return false, nil
		}
		portsByUID := fullPortsByPodUID(e2eendpoints.GetFullContainerPortsByPodUID(ep))
		if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
			if i%5 == 0 {
				framework.Logf("Unexpected endpoints: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
			}
			return false, nil
		}

		// If the EndpointSlice API is available, validate that the matching
		// EndpointSlice objects were also created or updated.
		if _, err := c.Discovery().ServerResourcesForGroupVersion(discoveryv1.SchemeGroupVersion.String()); err == nil {
			opts := metav1.ListOptions{
				LabelSelector: "kubernetes.io/service-name=" + serviceName,
			}
			es, err := c.DiscoveryV1().EndpointSlices(namespace).List(context.TODO(), opts)
			if err != nil {
				framework.Logf("Failed to list EndpointSlice objects: %v", err)
				// Retry the error.
				return false, nil
			}
			portsByUID = fullPortsByPodUID(e2eendpointslice.GetFullContainerPortsByPodUID(es.Items))
			if err := validatePortsAndProtocols(portsByUID, expectedPortsByPodUID); err != nil {
				if i%5 == 0 {
					framework.Logf("Unexpected endpoint slices: found %v, expected %v, will retry", portsByUID, expectedEndpoints)
				}
				return false, nil
			}
		}
		framework.Logf("successfully validated that service %s in namespace %s exposes endpoints %v",
			serviceName, namespace, expectedEndpoints)
		return true, nil
	}); pollErr != nil {
		// Dump pod state across all namespaces to aid debugging.
		if pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{}); err == nil {
			for _, pod := range pods.Items {
				framework.Logf("Pod %s\t%s\t%s\t%s", pod.Namespace, pod.Name, pod.Spec.NodeName, pod.DeletionTimestamp)
			}
		} else {
			framework.Logf("Can't list pod debug info: %v", err)
		}
	}
	framework.ExpectNoError(pollErr, "error waiting for service %s in namespace %s to expose endpoints %v", serviceName, namespace, expectedEndpoints)
}
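
// Sketched usage with a hypothetical multi-protocol backend: unlike the
// ports-only variant above, the expected set here carries protocols too:
//
//	validateEndpointsPortsWithProtocolsOrFail(cs, ns, "svc-dns",
//		fullPortsByPodName{"dns-pod": {
//			{ContainerPort: 53, Protocol: v1.ProtocolTCP},
//			{ContainerPort: 53, Protocol: v1.ProtocolUDP},
//		}})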

// translatePortsByPodNameToPortsByPodUID is the ContainerPort-aware
// counterpart of translatePodNameToUID.
func translatePortsByPodNameToPortsByPodUID(c clientset.Interface, ns string, expectedEndpoints fullPortsByPodName) (fullPortsByPodUID, error) {
	portsByUID := make(fullPortsByPodUID)
	for name, portList := range expectedEndpoints {
		pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), name, metav1.GetOptions{})
		if err != nil {
			return nil, fmt.Errorf("failed to get pod %s, validation failed: %w", name, err)
		}
		portsByUID[pod.ObjectMeta.UID] = portList
	}
	return portsByUID, nil
}

// validatePortsAndProtocols checks that every (port, protocol) pair reported
// for each pod UID has a match in the expected set.
func validatePortsAndProtocols(ep, expectedEndpoints fullPortsByPodUID) error {
	if len(ep) != len(expectedEndpoints) {
		return fmt.Errorf("invalid number of endpoints: got %v, expected %v", ep, expectedEndpoints)
	}
	for podUID := range expectedEndpoints {
		if _, ok := ep[podUID]; !ok {
			return fmt.Errorf("endpoint %v not found", podUID)
		}
		if len(ep[podUID]) != len(expectedEndpoints[podUID]) {
			return fmt.Errorf("invalid list of ports for uid %v: got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
		}
		var match bool
		for _, epPort := range ep[podUID] {
			match = false
			for _, expectedPort := range expectedEndpoints[podUID] {
				if epPort.ContainerPort == expectedPort.ContainerPort && epPort.Protocol == expectedPort.Protocol {
					match = true
				}
			}
			if !match {
				return fmt.Errorf("invalid list of ports for uid %v: got %v, expected %v", podUID, ep[podUID], expectedEndpoints[podUID])
			}
		}
	}
	return nil
}
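
// Illustrative behavior with hypothetical values: a matching port number with
// a mismatched protocol is rejected:
//
//	got := fullPortsByPodUID{"uid-a": {{ContainerPort: 53, Protocol: v1.ProtocolUDP}}}
//	want := fullPortsByPodUID{"uid-a": {{ContainerPort: 53, Protocol: v1.ProtocolTCP}}}
//	err := validatePortsAndProtocols(got, want) // returns a non-nil mismatch error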