...

Source file src/k8s.io/kubernetes/test/e2e/network/traffic_distribution.go

Documentation: k8s.io/kubernetes/test/e2e/network

     1  /*
     2  Copyright 2024 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package network
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"slices"
    23  	"strings"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	discoveryv1 "k8s.io/api/discovery/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/util/intstr"
    30  	clientset "k8s.io/client-go/kubernetes"
    31  	"k8s.io/kubernetes/test/e2e/feature"
    32  	"k8s.io/kubernetes/test/e2e/framework"
    33  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    34  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    35  	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
    36  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    37  	"k8s.io/kubernetes/test/e2e/network/common"
    38  	"k8s.io/kubernetes/test/utils/format"
    39  	admissionapi "k8s.io/pod-security-admission/api"
    40  
    41  	"github.com/onsi/ginkgo/v2"
    42  	"github.com/onsi/gomega"
    43  )
    44  
    45  var _ = common.SIGDescribe(feature.TrafficDistribution, func() {
    46  	f := framework.NewDefaultFramework("traffic-distribution")
    47  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    48  
    49  	var c clientset.Interface
    50  
    51  	ginkgo.BeforeEach(func(ctx context.Context) {
    52  		c = f.ClientSet
    53  		e2eskipper.SkipUnlessMultizone(ctx, c)
    54  	})
    55  
    56  	////////////////////////////////////////////////////////////////////////////
    57  	// Helper functions
    58  	////////////////////////////////////////////////////////////////////////////
    59  
    60  	// endpointSlicesForService returns a helper function to be used with
    61  	// gomega.Eventually(...). It fetches the EndpointSlices for the given
    62  	// serviceName.
    63  	endpointSlicesForService := func(serviceName string) any {
    64  		return func(ctx context.Context) ([]discoveryv1.EndpointSlice, error) {
    65  			slices, err := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, serviceName)})
    66  			if err != nil {
    67  				return nil, err
    68  			}
    69  			return slices.Items, nil
    70  		}
    71  	}
    72  
    73  	// gomegaCustomError constructs a function that can be returned from a gomega
    74  	// matcher to report an error.
    75  	gomegaCustomError := func(format string, a ...any) func() string {
    76  		return func() string {
    77  			return fmt.Sprintf(format, a...)
    78  		}
    79  	}
    80  
    81  	// endpointSlicesHaveSameZoneHints returns a matcher function to be used with
    82  	// gomega.Eventually().Should(...). It checks that the passed EndpointSlices
    83  	// have zone-hints which match the endpoint's zone.
    84  	endpointSlicesHaveSameZoneHints := framework.MakeMatcher(func(slices []discoveryv1.EndpointSlice) (func() string, error) {
    85  		if len(slices) == 0 {
    86  			return nil, fmt.Errorf("no endpointslices found")
    87  		}
    88  		for _, slice := range slices {
    89  			for _, endpoint := range slice.Endpoints {
    90  				var ip string
    91  				if len(endpoint.Addresses) > 0 {
    92  					ip = endpoint.Addresses[0]
    93  				}
    94  				var zone string
    95  				if endpoint.Zone != nil {
    96  					zone = *endpoint.Zone
    97  				}
    98  				if endpoint.Hints == nil || len(endpoint.Hints.ForZones) != 1 || endpoint.Hints.ForZones[0].Name != zone {
    99  					return gomegaCustomError("endpoint with ip %v does not have the correct hint, want hint for zone %q\nEndpointSlices=\n%v", ip, zone, format.Object(slices, 1 /* indent one level */)), nil
   100  				}
   101  			}
   102  		}
   103  		return nil, nil
   104  	})
   105  
   106  	// requestsFromClient returns a helper function to be used with
   107  	// gomega.Eventually(...). It fetches the logs from the clientPod and returns
   108  	// them in reverse-chronological order.
   109  	requestsFromClient := func(clientPod *v1.Pod) any {
   110  		return func(ctx context.Context) (reverseChronologicalLogLines []string, err error) {
   111  			logs, err := e2epod.GetPodLogs(ctx, c, f.Namespace.Name, clientPod.Name, clientPod.Spec.Containers[0].Name)
   112  			if err != nil {
   113  				return nil, err
   114  			}
   115  			logLines := strings.Split(logs, "\n")
   116  			slices.Reverse(logLines)
   117  			return logLines, nil
   118  		}
   119  	}
   120  
   121  	////////////////////////////////////////////////////////////////////////////
   122  	// Main test specifications.
   123  	////////////////////////////////////////////////////////////////////////////
   124  
   125  	ginkgo.When("Service has trafficDistribution=PreferClose", func() {
   126  		ginkgo.It("should route traffic to an endpoint that is close to the client", func(ctx context.Context) {
   127  
   128  			ginkgo.By("finding 3 zones with schedulable nodes")
   129  			allZonesSet, err := e2enode.GetSchedulableClusterZones(ctx, c)
   130  			framework.ExpectNoError(err)
   131  			if len(allZonesSet) < 3 {
   132  				framework.Failf("got %d zones with schedulable nodes, want atleast 3 zones with schedulable nodes", len(allZonesSet))
   133  			}
   134  			zones := allZonesSet.UnsortedList()[:3]
   135  
   136  			ginkgo.By(fmt.Sprintf("finding a node in each of the chosen 3 zones %v", zones))
   137  			nodeList, err := e2enode.GetReadySchedulableNodes(ctx, c)
   138  			framework.ExpectNoError(err)
   139  			nodeForZone := make(map[string]string)
   140  			for _, zone := range zones {
   141  				found := false
   142  				for _, node := range nodeList.Items {
   143  					if zone == node.Labels[v1.LabelTopologyZone] {
   144  						found = true
   145  						nodeForZone[zone] = node.GetName()
   146  					}
   147  				}
   148  				if !found {
   149  					framework.Failf("could not find a node in zone %q; nodes=\n%v", zone, format.Object(nodeList, 1 /* indent one level */))
   150  				}
   151  			}
   152  
   153  			ginkgo.By(fmt.Sprintf("creating 1 pod each in 2 zones %v (out of the total 3 zones)", zones[:2]))
   154  			zoneForServingPod := make(map[string]string)
   155  			var servingPods []*v1.Pod
   156  			servingPodLabels := map[string]string{"app": f.UniqueName}
   157  			for _, zone := range zones[:2] {
   158  				pod := e2epod.NewAgnhostPod(f.Namespace.Name, "serving-pod-in-"+zone, nil, nil, nil, "serve-hostname")
   159  				nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
   160  				e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
   161  				pod.Labels = servingPodLabels
   162  
   163  				servingPods = append(servingPods, pod)
   164  				zoneForServingPod[pod.Name] = zone
   165  				ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
   166  			}
   167  			e2epod.NewPodClient(f).CreateBatch(ctx, servingPods)
   168  
   169  			trafficDist := v1.ServiceTrafficDistributionPreferClose
   170  			svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
   171  				ObjectMeta: metav1.ObjectMeta{
   172  					Name: "traffic-dist-test-service",
   173  				},
   174  				Spec: v1.ServiceSpec{
   175  					Selector:            servingPodLabels,
   176  					TrafficDistribution: &trafficDist,
   177  					Ports: []v1.ServicePort{{
   178  						Port:       80,
   179  						TargetPort: intstr.FromInt32(9376),
   180  						Protocol:   v1.ProtocolTCP,
   181  					}},
   182  				},
   183  			})
   184  			ginkgo.By(fmt.Sprintf("creating a service=%q with trafficDistribution=%v", svc.GetName(), *svc.Spec.TrafficDistribution))
   185  			ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Services(f.Namespace.Name).Delete), svc.GetName(), metav1.DeleteOptions{})
   186  
   187  			ginkgo.By("ensuring EndpointSlice for service have correct same-zone hints")
   188  			gomega.Eventually(ctx, endpointSlicesForService(svc.GetName())).WithPolling(5 * time.Second).WithTimeout(e2eservice.ServiceEndpointsTimeout).Should(endpointSlicesHaveSameZoneHints)
   189  
   190  			ginkgo.By("keeping traffic within the same zone as the client, when serving pods exist in the same zone")
   191  
   192  			createClientPod := func(ctx context.Context, zone string) *v1.Pod {
   193  				pod := e2epod.NewAgnhostPod(f.Namespace.Name, "client-pod-in-"+zone, nil, nil, nil)
   194  				pod.Spec.NodeName = nodeForZone[zone]
   195  				nodeSelection := e2epod.NodeSelection{Name: nodeForZone[zone]}
   196  				e2epod.SetNodeSelection(&pod.Spec, nodeSelection)
   197  				cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
   198  				pod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
   199  				pod.Spec.Containers[0].Name = pod.Name
   200  
   201  				ginkgo.DeferCleanup(framework.IgnoreNotFound(c.CoreV1().Pods(f.Namespace.Name).Delete), pod.GetName(), metav1.DeleteOptions{})
   202  				return e2epod.NewPodClient(f).CreateSync(ctx, pod)
   203  			}
   204  
   205  			for _, clientZone := range zones[:2] {
   206  				framework.Logf("creating a client pod for probing the service from zone=%q which also has a serving pod", clientZone)
   207  				clientPod := createClientPod(ctx, clientZone)
   208  
   209  				framework.Logf("ensuring that requests from clientPod=%q on zone=%q stay in the same zone", clientPod.Name, clientZone)
   210  
   211  				requestsSucceedAndStayInSameZone := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
   212  					logLines := reverseChronologicalLogLines
   213  					if len(logLines) < 20 {
   214  						return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
   215  					}
   216  					consecutiveSameZone := 0
   217  
   218  					for _, logLine := range logLines {
   219  						if logLine == "" || strings.HasPrefix(logLine, "Date:") {
   220  							continue
   221  						}
   222  						destZone, ok := zoneForServingPod[logLine]
   223  						if !ok {
   224  							return gomegaCustomError("could not determine dest zone from log line: %s\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
   225  						}
   226  						if clientZone != destZone {
   227  							return gomegaCustomError("expected request from clientPod=%q to stay in it's zone=%q, delivered to zone=%q\nreverseChronologicalLogLines=\n%v", clientPod.Name, clientZone, destZone, strings.Join(reverseChronologicalLogLines, "\n")), nil
   228  						}
   229  						consecutiveSameZone++
   230  						if consecutiveSameZone >= 10 {
   231  							return nil, nil // Pass condition.
   232  						}
   233  					}
   234  					// Ideally, the matcher would never reach this condition
   235  					return gomegaCustomError("requests didn't meet the required criteria to stay in same zone\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
   236  				})
   237  
   238  				gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedAndStayInSameZone)
   239  			}
   240  
   241  			ginkgo.By("routing traffic cluster-wide, when there are no serving pods in the same zone as the client")
   242  
   243  			clientZone := zones[2]
   244  			framework.Logf("creating a client pod for probing the service from zone=%q which DOES NOT has a serving pod", clientZone)
   245  			clientPod := createClientPod(ctx, clientZone)
   246  
   247  			framework.Logf("ensuring that requests from clientPod=%q on zone=%q (without a serving pod) are not dropped, and get routed to one of the serving pods anywhere in the cluster", clientPod.Name, clientZone)
   248  
   249  			requestsSucceedByReachingAnyServingPod := framework.MakeMatcher(func(reverseChronologicalLogLines []string) (func() string, error) {
   250  				logLines := reverseChronologicalLogLines
   251  				if len(logLines) < 20 {
   252  					return gomegaCustomError("got %d log lines, waiting for at least 20\nreverseChronologicalLogLines=\n%v", len(logLines), strings.Join(reverseChronologicalLogLines, "\n")), nil
   253  				}
   254  
   255  				// Requests are counted as successful when the response read from the log
   256  				// lines is the name of a recognizable serving pod.
   257  				consecutiveSuccessfulRequests := 0
   258  
   259  				for _, logLine := range logLines {
   260  					if logLine == "" || strings.HasPrefix(logLine, "Date:") {
   261  						continue
   262  					}
   263  					_, servingPodExists := zoneForServingPod[logLine]
   264  					if !servingPodExists {
   265  						return gomegaCustomError("request from client pod likely failed because we got an unrecognizable response = %v; want response to be one of the serving pod names\nreverseChronologicalLogLines=\n%v", logLine, strings.Join(reverseChronologicalLogLines, "\n")), nil
   266  					}
   267  					consecutiveSuccessfulRequests++
   268  					if consecutiveSuccessfulRequests >= 10 {
   269  						return nil, nil // Pass condition
   270  					}
   271  				}
   272  				// Ideally, the matcher would never reach this condition
   273  				return gomegaCustomError("requests didn't meet the required criteria to reach a serving pod\nreverseChronologicalLogLines=\n%v", strings.Join(reverseChronologicalLogLines, "\n")), nil
   274  			})
   275  
   276  			gomega.Eventually(ctx, requestsFromClient(clientPod)).WithPolling(5 * time.Second).WithTimeout(e2eservice.KubeProxyLagTimeout).Should(requestsSucceedByReachingAnyServingPod)
   277  
   278  		})
   279  
   280  	})
   281  })
   282  

View as plain text