     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package network
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  	"time"
    25  	"github.com/onsi/ginkgo/v2"
    26  	v1 "k8s.io/api/core/v1"
    27  	discoveryv1 "k8s.io/api/discovery/v1"
    28  	"k8s.io/apimachinery/pkg/api/resource"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/util/intstr"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/kubernetes/test/e2e/feature"
    34  	"k8s.io/kubernetes/test/e2e/framework"
    35  	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
    36  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    37  	e2eservice "k8s.io/kubernetes/test/e2e/framework/service"
    38  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    39  	"k8s.io/kubernetes/test/e2e/network/common"
    40  	imageutils "k8s.io/kubernetes/test/utils/image"
    41  	admissionapi "k8s.io/pod-security-admission/api"
    42  )
    44  var _ = common.SIGDescribe(feature.TopologyHints, func() {
    45  	f := framework.NewDefaultFramework("topology-hints")
    46  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    48  	// filled in BeforeEach
    49  	var c clientset.Interface
    51  	ginkgo.BeforeEach(func(ctx context.Context) {
    52  		c = f.ClientSet
    53  		e2eskipper.SkipUnlessMultizone(ctx, c)
    54  	})
    56  	ginkgo.It("should distribute endpoints evenly", func(ctx context.Context) {
    57  		portNum := int32(9376)
    58  		thLabels := map[string]string{labelKey: clientLabelValue}
    59  		img := imageutils.GetE2EImage(imageutils.Agnhost)
    60  		ports := []v1.ContainerPort{{ContainerPort: int32(portNum)}}
    61  		dsConf := e2edaemonset.NewDaemonSet("topology-serve-hostname", img, thLabels, nil, nil, ports, "serve-hostname")
    62  		ds, err := c.AppsV1().DaemonSets(f.Namespace.Name).Create(ctx, dsConf, metav1.CreateOptions{})
    63  		framework.ExpectNoError(err, "error creating DaemonSet")
    65  		svc := createServiceReportErr(ctx, c, f.Namespace.Name, &v1.Service{
    66  			ObjectMeta: metav1.ObjectMeta{
    67  				Name: "topology-hints",
    68  				Annotations: map[string]string{
    69  					v1.AnnotationTopologyMode: "Auto",
    70  				},
    71  			},
    72  			Spec: v1.ServiceSpec{
    73  				Selector:                 thLabels,
    74  				PublishNotReadyAddresses: true,
    75  				Ports: []v1.ServicePort{{
    76  					Name:       "example",
    77  					Port:       80,
    78  					TargetPort: intstr.FromInt32(portNum),
    79  					Protocol:   v1.ProtocolTCP,
    80  				}},
    81  			},
    82  		})
    84  		err = wait.PollWithContext(ctx, 5*time.Second, framework.PodStartTimeout, func(ctx context.Context) (bool, error) {
    85  			return e2edaemonset.CheckRunningOnAllNodes(ctx, f, ds)
    86  		})
    87  		framework.ExpectNoError(err, "timed out waiting for DaemonSets to be ready")
    89  		// All Nodes should have same allocatable CPUs. If not, then skip the test.
    90  		schedulableNodes := map[string]*v1.Node{}
    91  		for _, nodeName := range e2edaemonset.SchedulableNodes(ctx, c, ds) {
    92  			schedulableNodes[nodeName] = nil
    93  		}
    95  		nodeList, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
    96  		framework.ExpectNoError(err, "Error when listing all Nodes")
    97  		var lastNodeCPU resource.Quantity
    98  		firstNode := true
    99  		for i := range nodeList.Items {
   100  			node := nodeList.Items[i]
   101  			if _, ok := schedulableNodes[node.Name]; !ok {
   102  				continue
   103  			}
   104  			schedulableNodes[node.Name] = &node
   106  			nodeCPU, found := node.Status.Allocatable[v1.ResourceCPU]
   107  			if !found {
   108  				framework.Failf("Error when getting allocatable CPU of Node '%s'", node.Name)
   109  			}
   110  			if firstNode {
   111  				lastNodeCPU = nodeCPU
   112  				firstNode = false
   113  			} else if !nodeCPU.Equal(lastNodeCPU) {
   114  				e2eskipper.Skipf("Expected Nodes to have equivalent allocatable CPUs, but Node '%s' is different from the previous one. %d not equals %d",
   115  					node.Name, nodeCPU.Value(), lastNodeCPU.Value())
   116  			}
   117  		}
   119  		framework.Logf("Waiting for %d endpoints to be tracked in EndpointSlices", len(schedulableNodes))
   121  		var finalSlices []discoveryv1.EndpointSlice
   122  		err = wait.PollWithContext(ctx, 5*time.Second, 3*time.Minute, func(ctx context.Context) (bool, error) {
   123  			slices, listErr := c.DiscoveryV1().EndpointSlices(f.Namespace.Name).List(ctx, metav1.ListOptions{LabelSelector: fmt.Sprintf("%s=%s", discoveryv1.LabelServiceName, svc.Name)})
   124  			if listErr != nil {
   125  				return false, listErr
   126  			}
   128  			numEndpoints := 0
   129  			for _, slice := range slices.Items {
   130  				numEndpoints += len(slice.Endpoints)
   131  			}
   132  			if len(schedulableNodes) > numEndpoints {
   133  				framework.Logf("Expected %d endpoints, got %d", len(schedulableNodes), numEndpoints)
   134  				return false, nil
   135  			}
   137  			finalSlices = slices.Items
   138  			return true, nil
   139  		})
   140  		framework.ExpectNoError(err, "timed out waiting for EndpointSlices to be ready")
   142  		ginkgo.By("having hints set for each endpoint")
   143  		for _, slice := range finalSlices {
   144  			for _, ep := range slice.Endpoints {
   145  				if ep.Zone == nil {
   146  					framework.Failf("Expected endpoint in %s to have zone: %v", slice.Name, ep)
   147  				}
   148  				if ep.Hints == nil || len(ep.Hints.ForZones) == 0 {
   149  					framework.Failf("Expected endpoint in %s to have hints: %v", slice.Name, ep)
   150  				}
   151  				if len(ep.Hints.ForZones) > 1 {
   152  					framework.Failf("Expected endpoint in %s to have exactly 1 zone hint, got %d: %v", slice.Name, len(ep.Hints.ForZones), ep)
   153  				}
   154  				if *ep.Zone != ep.Hints.ForZones[0].Name {
   155  					framework.Failf("Expected endpoint in %s to have same zone hint, got %s: %v", slice.Name, *ep.Zone, ep)
   156  				}
   157  			}
   158  		}
   160  		nodesByZone := map[string]string{}
   161  		zonesWithNode := map[string]string{}
   162  		for _, node := range schedulableNodes {
   163  			if zone, ok := node.Labels[v1.LabelTopologyZone]; ok {
   164  				nodesByZone[node.Name] = zone
   165  				zonesWithNode[zone] = node.Name
   166  			}
   167  		}
   169  		podList, err := c.CoreV1().Pods(f.Namespace.Name).List(ctx, metav1.ListOptions{})
   170  		framework.ExpectNoError(err)
   171  		podsByZone := map[string]string{}
   172  		for _, pod := range podList.Items {
   173  			if zone, ok := nodesByZone[pod.Spec.NodeName]; ok {
   174  				podsByZone[pod.Name] = zone
   175  			}
   176  		}
   178  		ginkgo.By("keeping requests in the same zone")
   179  		for fromZone, nodeName := range zonesWithNode {
   180  			ginkgo.By("creating a client pod for probing the service from " + fromZone)
   181  			podName := "curl-from-" + fromZone
   182  			clientPod := e2epod.NewAgnhostPod(f.Namespace.Name, podName, nil, nil, nil, "serve-hostname")
   183  			nodeSelection := e2epod.NodeSelection{Name: nodeName}
   184  			e2epod.SetNodeSelection(&clientPod.Spec, nodeSelection)
   185  			cmd := fmt.Sprintf(`date; for i in $(seq 1 3000); do sleep 1; echo "Date: $(date) Try: ${i}"; curl -q -s --connect-timeout 2 http://%s:80/ ; echo; done`, svc.Name)
   186  			clientPod.Spec.Containers[0].Command = []string{"/bin/sh", "-c", cmd}
   187  			clientPod.Spec.Containers[0].Name = clientPod.Name
   188  			e2epod.NewPodClient(f).CreateSync(ctx, clientPod)
   190  			framework.Logf("Ensuring that requests from %s pod on %s node stay in %s zone", clientPod.Name, nodeName, fromZone)
   192  			var logs string
   193  			if pollErr := wait.PollWithContext(ctx, 5*time.Second, e2eservice.KubeProxyLagTimeout, func(ctx context.Context) (bool, error) {
   194  				var err error
   195  				logs, err = e2epod.GetPodLogs(ctx, c, f.Namespace.Name, clientPod.Name, clientPod.Name)
   196  				framework.ExpectNoError(err)
   197  				framework.Logf("Pod client logs: %s", logs)
   199  				logLines := strings.Split(logs, "\n")
   200  				if len(logLines) < 6 {
   201  					framework.Logf("only %d log lines, waiting for at least 6", len(logLines))
   202  					return false, nil
   203  				}
   205  				consecutiveSameZone := 0
   207  				for i := len(logLines) - 1; i > 0; i-- {
   208  					if logLines[i] == "" || strings.HasPrefix(logLines[i], "Date:") {
   209  						continue
   210  					}
   211  					destZone, ok := podsByZone[logLines[i]]
   212  					if !ok {
   213  						framework.Logf("could not determine dest zone from log line: %s", logLines[i])
   214  						return false, nil
   215  					}
   216  					if fromZone != destZone {
   217  						framework.Logf("expected request from %s to stay in %s zone, delivered to %s zone", clientPod.Name, fromZone, destZone)
   218  						return false, nil
   219  					}
   220  					consecutiveSameZone++
   221  					if consecutiveSameZone >= 5 {
   222  						return true, nil
   223  					}
   224  				}
   226  				return false, nil
   227  			}); pollErr != nil {
   228  				framework.Failf("expected 5 consecutive requests from %s to stay in zone %s within %v, stdout: %v", clientPod.Name, fromZone, e2eservice.KubeProxyLagTimeout, logs)
   229  			}
   230  		}
   231  	})
   232  })

