    17  package scheduling
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"math"
    23  	"time"
    25  	"github.com/onsi/ginkgo/v2"
    26  	"github.com/onsi/gomega"
    27  	v1 "k8s.io/api/core/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/labels"
    30  	"k8s.io/apimachinery/pkg/util/intstr"
    31  	"k8s.io/apimachinery/pkg/util/sets"
    32  	"k8s.io/apimachinery/pkg/util/uuid"
    33  	clientset "k8s.io/client-go/kubernetes"
    34  	"k8s.io/kubernetes/test/e2e/framework"
    35  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    36  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    37  	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
    38  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    39  	testutils "k8s.io/kubernetes/test/utils"
    40  	imageutils "k8s.io/kubernetes/test/utils/image"
    41  	admissionapi "k8s.io/pod-security-admission/api"
    42  )
    44  var _ = SIGDescribe("Multi-AZ Clusters", func() {
    45  	f := framework.NewDefaultFramework("multi-az")
    46  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
    47  	var zoneCount int
    48  	var err error
    49  	var zoneNames sets.Set[string]
    50  	ginkgo.BeforeEach(func(ctx context.Context) {
    51  		cs := f.ClientSet
    53  		if zoneCount <= 0 {
    54  			zoneNames, err = e2enode.GetSchedulableClusterZones(ctx, cs)
    55  			framework.ExpectNoError(err)
    56  			zoneCount = len(zoneNames)
    57  		}
    58  		ginkgo.By(fmt.Sprintf("Checking for multi-zone cluster. Schedulable zone count = %d", zoneCount))
    59  		msg := fmt.Sprintf("Schedulable zone count is %d, only run for multi-zone clusters, skipping test", zoneCount)
    60  		e2eskipper.SkipUnlessAtLeast(zoneCount, 2, msg)
    61  		// TODO: SkipUnlessDefaultScheduler() // Non-default schedulers might not spread
    63  		e2enode.WaitForTotalHealthy(ctx, cs, time.Minute)
    64  		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, cs)
    65  		framework.ExpectNoError(err)
    67  		// make the nodes have balanced cpu,mem usage
    68  		err = createBalancedPodForNodes(ctx, f, cs, f.Namespace.Name, nodeList.Items, podRequestedResource, 0.0)
    69  		framework.ExpectNoError(err)
    70  	})
    71  	f.It("should spread the pods of a service across zones", f.WithSerial(), func(ctx context.Context) {
    72  		SpreadServiceOrFail(ctx, f, 5*zoneCount, zoneNames, imageutils.GetPauseImageName())
    73  	})
    75  	f.It("should spread the pods of a replication controller across zones", f.WithSerial(), func(ctx context.Context) {
    76  		SpreadRCOrFail(ctx, f, int32(5*zoneCount), zoneNames, framework.ServeHostnameImage, []string{"serve-hostname"})
    77  	})
    78  })
    80  // SpreadServiceOrFail check that the pods comprising a service
    81  // get spread evenly across available zones
    82  func SpreadServiceOrFail(ctx context.Context, f *framework.Framework, replicaCount int, zoneNames sets.Set[string], image string) {
    83  	// First create the service
    84  	serviceName := "test-service"
    85  	serviceSpec := &v1.Service{
    86  		ObjectMeta: metav1.ObjectMeta{
    87  			Name:      serviceName,
    88  			Namespace: f.Namespace.Name,
    89  		},
    90  		Spec: v1.ServiceSpec{
    91  			Selector: map[string]string{
    92  				"service": serviceName,
    93  			},
    94  			Ports: []v1.ServicePort{{
    95  				Port:       80,
    96  				TargetPort: intstr.FromInt32(80),
    97  			}},
    98  		},
    99  	}
   100  	_, err := f.ClientSet.CoreV1().Services(f.Namespace.Name).Create(ctx, serviceSpec, metav1.CreateOptions{})
   101  	framework.ExpectNoError(err)
   103  	// Now create some pods behind the service
   104  	podSpec := &v1.Pod{
   105  		ObjectMeta: metav1.ObjectMeta{
   106  			Name:   serviceName,
   107  			Labels: map[string]string{"service": serviceName},
   108  		},
   109  		Spec: v1.PodSpec{
   110  			Containers: []v1.Container{
   111  				{
   112  					Name:  "test",
   113  					Image: image,
   114  				},
   115  			},
   116  		},
   117  	}
   119  	// Caution: StartPods requires at least one pod to replicate.
   120  	// Based on the callers, replicas is always positive number: zoneCount >= 0 implies (2*zoneCount)+1 > 0.
   121  	// Thus, no need to test for it. Once the precondition changes to zero number of replicas,
   122  	// test for replicaCount > 0. Otherwise, StartPods panics.
   123  	framework.ExpectNoError(testutils.StartPods(f.ClientSet, replicaCount, f.Namespace.Name, serviceName, *podSpec, false, framework.Logf))
   125  	// Wait for all of them to be scheduled
   126  	selector := labels.SelectorFromSet(labels.Set(map[string]string{"service": serviceName}))
   127  	pods, err := e2epod.WaitForPodsWithLabelScheduled(ctx, f.ClientSet, f.Namespace.Name, selector)
   128  	framework.ExpectNoError(err)
   130  	// Now make sure they're spread across zones
   131  	checkZoneSpreading(ctx, f.ClientSet, pods, sets.List(zoneNames))
   132  }
   134  // Find the name of the zone in which a Node is running
   135  func getZoneNameForNode(node v1.Node) (string, error) {
   136  	if z, ok := node.Labels[v1.LabelFailureDomainBetaZone]; ok {
   137  		return z, nil
   138  	} else if z, ok := node.Labels[v1.LabelTopologyZone]; ok {
   139  		return z, nil
   140  	}
   141  	return "", fmt.Errorf("node %s doesn't have zone label %s or %s",
   142  		node.Name, v1.LabelFailureDomainBetaZone, v1.LabelTopologyZone)
   143  }
   145  // Find the name of the zone in which the pod is scheduled
   146  func getZoneNameForPod(ctx context.Context, c clientset.Interface, pod v1.Pod) (string, error) {
   147  	ginkgo.By(fmt.Sprintf("Getting zone name for pod %s, on node %s", pod.Name, pod.Spec.NodeName))
   148  	node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
   149  	framework.ExpectNoError(err)
   150  	return getZoneNameForNode(*node)
   151  }
   153  // Determine whether a set of pods are approximately evenly spread
   154  // across a given set of zones
   155  func checkZoneSpreading(ctx context.Context, c clientset.Interface, pods *v1.PodList, zoneNames []string) {
   156  	podsPerZone := make(map[string]int)
   157  	for _, zoneName := range zoneNames {
   158  		podsPerZone[zoneName] = 0
   159  	}
   160  	for _, pod := range pods.Items {
   161  		if pod.DeletionTimestamp != nil {
   162  			continue
   163  		}
   164  		zoneName, err := getZoneNameForPod(ctx, c, pod)
   165  		framework.ExpectNoError(err)
   166  		podsPerZone[zoneName] = podsPerZone[zoneName] + 1
   167  	}
   168  	minPodsPerZone := math.MaxInt32
   169  	maxPodsPerZone := 0
   170  	for _, podCount := range podsPerZone {
   171  		if podCount < minPodsPerZone {
   172  			minPodsPerZone = podCount
   173  		}
   174  		if podCount > maxPodsPerZone {
   175  			maxPodsPerZone = podCount
   176  		}
   177  	}
   178  	gomega.Expect(maxPodsPerZone-minPodsPerZone).To(gomega.BeNumerically("~", 0, 2),
   179  		"Pods were not evenly spread across zones.  %d in one zone and %d in another zone",
   180  		minPodsPerZone, maxPodsPerZone)
   181  }
   183  // SpreadRCOrFail Check that the pods comprising a replication
   184  // controller get spread evenly across available zones
   185  func SpreadRCOrFail(ctx context.Context, f *framework.Framework, replicaCount int32, zoneNames sets.Set[string], image string, args []string) {
   186  	name := "ubelite-spread-rc-" + string(uuid.NewUUID())
   187  	ginkgo.By(fmt.Sprintf("Creating replication controller %s", name))
   188  	controller, err := f.ClientSet.CoreV1().ReplicationControllers(f.Namespace.Name).Create(ctx, &v1.ReplicationController{
   189  		ObjectMeta: metav1.ObjectMeta{
   190  			Namespace: f.Namespace.Name,
   191  			Name:      name,
   192  		},
   193  		Spec: v1.ReplicationControllerSpec{
   194  			Replicas: &replicaCount,
   195  			Selector: map[string]string{
   196  				"name": name,
   197  			},
   198  			Template: &v1.PodTemplateSpec{
   199  				ObjectMeta: metav1.ObjectMeta{
   200  					Labels: map[string]string{"name": name},
   201  				},
   202  				Spec: v1.PodSpec{
   203  					Containers: []v1.Container{
   204  						{
   205  							Name:  name,
   206  							Image: image,
   207  							Args:  args,
   208  							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
   209  						},
   210  					},
   211  				},
   212  			},
   213  		},
   214  	}, metav1.CreateOptions{})
   215  	framework.ExpectNoError(err)
   216  	// Cleanup the replication controller when we are done.
   217  	defer func() {
   218  		// Resize the replication controller to zero to get rid of pods.
   219  		if err := e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, controller.Name); err != nil {
   220  			framework.Logf("Failed to cleanup replication controller %v: %v.", controller.Name, err)
   221  		}
   222  	}()
   223  	// List the pods, making sure we observe all the replicas.
   224  	selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
   225  	_, err = e2epod.PodsCreated(ctx, f.ClientSet, f.Namespace.Name, name, replicaCount)
   226  	framework.ExpectNoError(err)
   228  	// Wait for all of them to be scheduled
   229  	ginkgo.By(fmt.Sprintf("Waiting for %d replicas of %s to be scheduled.  Selector: %v", replicaCount, name, selector))
   230  	pods, err := e2epod.WaitForPodsWithLabelScheduled(ctx, f.ClientSet, f.Namespace.Name, selector)
   231  	framework.ExpectNoError(err)
   233  	// Now make sure they're spread across zones
   234  	checkZoneSpreading(ctx, f.ClientSet, pods, sets.List(zoneNames))
   235  }

