package scheduling

import (
	"context"
	"fmt"
	"math"
	"time"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/intstr"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/kubernetes/test/e2e/framework"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"
)

var _ = SIGDescribe("Multi-AZ Clusters", func() {
	f := framework.NewDefaultFramework("multi-az")
	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
	var zoneCount int
	var err error
	var zoneNames sets.Set[string]
	ginkgo.BeforeEach(func(ctx context.Context) {
		cs := f.ClientSet
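
		// Discover the set of schedulable zones once and cache it in the
		// closure variables, so later specs can reuse it without re-querying.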
		if zoneCount <= 0 {
			zoneNames, err = e2enode.GetSchedulableClusterZones(ctx, cs)
			framework.ExpectNoError(err)
			zoneCount = len(zoneNames)
		}
		ginkgo.By(fmt.Sprintf("Checking for multi-zone cluster. Schedulable zone count = %d", zoneCount))
		msg := fmt.Sprintf("Schedulable zone count is %d, only run for multi-zone clusters, skipping test", zoneCount)
		e2eskipper.SkipUnlessAtLeast(zoneCount, 2, msg)
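
		// Wait until every node in the cluster is healthy before measuring spreading.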
		e2enode.WaitForTotalHealthy(ctx, cs, time.Minute)
		nodeList, err := e2enode.GetReadySchedulableNodes(ctx, cs)
		framework.ExpectNoError(err)
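
		// Make the nodes have balanced cpu and memory usage, so existing load
		// does not skew the scheduler's spreading decisions.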
		err = createBalancedPodForNodes(ctx, f, cs, f.Namespace.Name, nodeList.Items, podRequestedResource, 0.0)
		framework.ExpectNoError(err)
	})
	f.It("should spread the pods of a service across zones", f.WithSerial(), func(ctx context.Context) {
		SpreadServiceOrFail(ctx, f, 5*zoneCount, zoneNames, imageutils.GetPauseImageName())
	})

	f.It("should spread the pods of a replication controller across zones", f.WithSerial(), func(ctx context.Context) {
		SpreadRCOrFail(ctx, f, int32(5*zoneCount), zoneNames, framework.ServeHostnameImage, []string{"serve-hostname"})
	})
})
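
// SpreadServiceOrFail checks that the pods comprising a service
// get spread evenly across available zones.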
func SpreadServiceOrFail(ctx context.Context, f *framework.Framework, replicaCount int, zoneNames sets.Set[string], image string) {
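	// First create the service.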
	serviceName := "test-service"
	serviceSpec := &v1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      serviceName,
			Namespace: f.Namespace.Name,
		},
		Spec: v1.ServiceSpec{
			Selector: map[string]string{
				"service": serviceName,
			},
			Ports: []v1.ServicePort{{
				Port:       80,
				TargetPort: intstr.FromInt32(80),
			}},
		},
	}
	_, err := f.ClientSet.CoreV1().Services(f.Namespace.Name).Create(ctx, serviceSpec, metav1.CreateOptions{})
	framework.ExpectNoError(err)
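
	// Now create some pods behind the service.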
	podSpec := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:   serviceName,
			Labels: map[string]string{"service": serviceName},
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name:  "test",
					Image: image,
				},
			},
		},
	}
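
	// Caution: StartPods requires at least one pod to replicate.
	// The callers always pass a positive count here: the suite skips
	// unless zoneCount >= 2, so replicaCount = 5*zoneCount > 0.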
	framework.ExpectNoError(testutils.StartPods(f.ClientSet, replicaCount, f.Namespace.Name, serviceName, *podSpec, false, framework.Logf))
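
	// Wait for all of them to be scheduled.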
	selector := labels.SelectorFromSet(labels.Set(map[string]string{"service": serviceName}))
	pods, err := e2epod.WaitForPodsWithLabelScheduled(ctx, f.ClientSet, f.Namespace.Name, selector)
	framework.ExpectNoError(err)
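
	// Now make sure they're spread across zones.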
	checkZoneSpreading(ctx, f.ClientSet, pods, sets.List(zoneNames))
}
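
// getZoneNameForNode finds the name of the zone in which a Node is running.
// It checks the deprecated failure-domain.beta.kubernetes.io/zone label first,
// then falls back to the GA topology.kubernetes.io/zone label.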
func getZoneNameForNode(node v1.Node) (string, error) {
	if z, ok := node.Labels[v1.LabelFailureDomainBetaZone]; ok {
		return z, nil
	} else if z, ok := node.Labels[v1.LabelTopologyZone]; ok {
		return z, nil
	}
	return "", fmt.Errorf("node %s doesn't have zone label %s or %s",
		node.Name, v1.LabelFailureDomainBetaZone, v1.LabelTopologyZone)
}
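
// getZoneNameForPod finds the name of the zone in which the pod is scheduled.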
func getZoneNameForPod(ctx context.Context, c clientset.Interface, pod v1.Pod) (string, error) {
	ginkgo.By(fmt.Sprintf("Getting zone name for pod %s, on node %s", pod.Name, pod.Spec.NodeName))
	node, err := c.CoreV1().Nodes().Get(ctx, pod.Spec.NodeName, metav1.GetOptions{})
	framework.ExpectNoError(err)
	return getZoneNameForNode(*node)
}
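
// checkZoneSpreading determines whether the given pods are approximately
// evenly spread across the given set of zones.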
func checkZoneSpreading(ctx context.Context, c clientset.Interface, pods *v1.PodList, zoneNames []string) {
	podsPerZone := make(map[string]int)
	for _, zoneName := range zoneNames {
		podsPerZone[zoneName] = 0
	}
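	// Count scheduled pods per zone, ignoring pods that are being deleted.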
	for _, pod := range pods.Items {
		if pod.DeletionTimestamp != nil {
			continue
		}
		zoneName, err := getZoneNameForPod(ctx, c, pod)
		framework.ExpectNoError(err)
		podsPerZone[zoneName]++
	}
	minPodsPerZone := math.MaxInt32
	maxPodsPerZone := 0
	for _, podCount := range podsPerZone {
		if podCount < minPodsPerZone {
			minPodsPerZone = podCount
		}
		if podCount > maxPodsPerZone {
			maxPodsPerZone = podCount
		}
	}
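	// Tolerate a skew of up to two pods between the most and least populated zones.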
	gomega.Expect(maxPodsPerZone-minPodsPerZone).To(gomega.BeNumerically("~", 0, 2),
		"Pods were not evenly spread across zones. %d in one zone and %d in another zone",
		minPodsPerZone, maxPodsPerZone)
}
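
// SpreadRCOrFail checks that the pods comprising a replication
// controller get spread evenly across available zones.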
func SpreadRCOrFail(ctx context.Context, f *framework.Framework, replicaCount int32, zoneNames sets.Set[string], image string, args []string) {
	name := "ubelite-spread-rc-" + string(uuid.NewUUID())
	ginkgo.By(fmt.Sprintf("Creating replication controller %s", name))
	controller, err := f.ClientSet.CoreV1().ReplicationControllers(f.Namespace.Name).Create(ctx, &v1.ReplicationController{
		ObjectMeta: metav1.ObjectMeta{
			Namespace: f.Namespace.Name,
			Name:      name,
		},
		Spec: v1.ReplicationControllerSpec{
			Replicas: &replicaCount,
			Selector: map[string]string{
				"name": name,
			},
			Template: &v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{"name": name},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{
							Name:  name,
							Image: image,
							Args:  args,
							Ports: []v1.ContainerPort{{ContainerPort: 9376}},
						},
					},
				},
			},
		},
	}, metav1.CreateOptions{})
	framework.ExpectNoError(err)
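
	// Clean up the replication controller when we are done.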
	defer func() {
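		// Delete the replication controller and wait for the garbage
		// collector to remove its pods.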
		if err := e2erc.DeleteRCAndWaitForGC(ctx, f.ClientSet, f.Namespace.Name, controller.Name); err != nil {
			framework.Logf("Failed to cleanup replication controller %v: %v.", controller.Name, err)
		}
	}()
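	// List the pods, making sure we observe all the replicas.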
	selector := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
	_, err = e2epod.PodsCreated(ctx, f.ClientSet, f.Namespace.Name, name, replicaCount)
	framework.ExpectNoError(err)
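
	// Wait for all of them to be scheduled.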
	ginkgo.By(fmt.Sprintf("Waiting for %d replicas of %s to be scheduled. Selector: %v", replicaCount, name, selector))
	pods, err := e2epod.WaitForPodsWithLabelScheduled(ctx, f.ClientSet, f.Namespace.Name, selector)
	framework.ExpectNoError(err)
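
	// Now make sure they're spread across zones.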
	checkZoneSpreading(ctx, f.ClientSet, pods, sets.List(zoneNames))
}