1
2
3
4
19
20 package scale
21
22 import (
23 "context"
24 "fmt"
25 "os"
26 "sync"
27 "time"
28
29 appsv1 "k8s.io/api/apps/v1"
30 v1 "k8s.io/api/core/v1"
31 networkingv1 "k8s.io/api/networking/v1"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/util/intstr"
34 clientset "k8s.io/client-go/kubernetes"
35
36 "k8s.io/kubernetes/test/e2e/framework"
37 e2edeployment "k8s.io/kubernetes/test/e2e/framework/deployment"
38 e2eingress "k8s.io/kubernetes/test/e2e/framework/ingress"
39 "k8s.io/kubernetes/test/e2e/framework/providers/gce"
40 imageutils "k8s.io/kubernetes/test/utils/image"
41 )
42
// Ingress counts the default scale test steps through, in ascending order
// (see NewIngressScaleFramework).
const (
	numIngressesSmall      = 5
	numIngressesMedium     = 20
	numIngressesLarge      = 50
	numIngressesExtraLarge = 99
)

// Names and sizing shared by the objects the scale test creates.
const (
	scaleTestIngressNamePrefix = "ing-scale"
	scaleTestBackendName       = "echoheaders-scale"
	scaleTestSecretName        = "tls-secret-scale"
	scaleTestHostname          = "scale.ingress.com"
	scaleTestNumBackends       = 10
	// scaleTestPollInterval is the poll interval configured on the test jig.
	scaleTestPollInterval = 15 * time.Second
)

// Timeouts handed to the ingress wait and cloud-resource cleanup helpers.
const (
	// waitForIngressMaxTimeout bounds each wait for a single ingress to come up.
	waitForIngressMaxTimeout = 80 * time.Minute
	// ingressesCleanupTimeout bounds the ingress controller cleanup.
	ingressesCleanupTimeout = 80 * time.Minute
)
61
62 var (
63 scaleTestLabels = map[string]string{
64 "app": scaleTestBackendName,
65 }
66 )
67
68
69 type IngressScaleFramework struct {
70 Clientset clientset.Interface
71 Jig *e2eingress.TestJig
72 GCEController *gce.IngressController
73 CloudConfig framework.CloudConfig
74 Logger e2eingress.TestLogger
75
76 Namespace string
77 EnableTLS bool
78 NumIngressesTest []int
79 OutputFile string
80
81 ScaleTestDeploy *appsv1.Deployment
82 ScaleTestSvcs []*v1.Service
83 ScaleTestIngs []*networkingv1.Ingress
84
85
86
87 BatchCreateLatencies [][]time.Duration
88
89 BatchDurations []time.Duration
90
91
92 StepCreateLatencies []time.Duration
93
94
95 StepUpdateLatencies []time.Duration
96 }
97
98
99 func NewIngressScaleFramework(cs clientset.Interface, ns string, cloudConfig framework.CloudConfig) *IngressScaleFramework {
100 return &IngressScaleFramework{
101 Namespace: ns,
102 Clientset: cs,
103 CloudConfig: cloudConfig,
104 Logger: &e2eingress.E2ELogger{},
105 EnableTLS: true,
106 NumIngressesTest: []int{
107 numIngressesSmall,
108 numIngressesMedium,
109 numIngressesLarge,
110 numIngressesExtraLarge,
111 },
112 }
113 }
114
115
116 func (f *IngressScaleFramework) PrepareScaleTest(ctx context.Context) error {
117 f.Logger.Infof("Initializing ingress test suite and gce controller...")
118 f.Jig = e2eingress.NewIngressTestJig(f.Clientset)
119 f.Jig.Logger = f.Logger
120 f.Jig.PollInterval = scaleTestPollInterval
121 f.GCEController = &gce.IngressController{
122 Client: f.Clientset,
123 Cloud: f.CloudConfig,
124 }
125 if err := f.GCEController.Init(ctx); err != nil {
126 return fmt.Errorf("failed to initialize GCE controller: %w", err)
127 }
128
129 f.ScaleTestSvcs = []*v1.Service{}
130 f.ScaleTestIngs = []*networkingv1.Ingress{}
131
132 return nil
133 }
134
135
136 func (f *IngressScaleFramework) CleanupScaleTest(ctx context.Context) []error {
137 var errs []error
138
139 f.Logger.Infof("Cleaning up ingresses...")
140 for _, ing := range f.ScaleTestIngs {
141 if ing != nil {
142 if err := f.Clientset.NetworkingV1().Ingresses(ing.Namespace).Delete(ctx, ing.Name, metav1.DeleteOptions{}); err != nil {
143 errs = append(errs, fmt.Errorf("error while deleting ingress %s/%s: %w", ing.Namespace, ing.Name, err))
144 }
145 }
146 }
147 f.Logger.Infof("Cleaning up services...")
148 for _, svc := range f.ScaleTestSvcs {
149 if svc != nil {
150 if err := f.Clientset.CoreV1().Services(svc.Namespace).Delete(ctx, svc.Name, metav1.DeleteOptions{}); err != nil {
151 errs = append(errs, fmt.Errorf("error while deleting service %s/%s: %w", svc.Namespace, svc.Name, err))
152 }
153 }
154 }
155 if f.ScaleTestDeploy != nil {
156 f.Logger.Infof("Cleaning up deployment %s...", f.ScaleTestDeploy.Name)
157 if err := f.Clientset.AppsV1().Deployments(f.ScaleTestDeploy.Namespace).Delete(ctx, f.ScaleTestDeploy.Name, metav1.DeleteOptions{}); err != nil {
158 errs = append(errs, fmt.Errorf("error while deleting deployment %s/%s: %w", f.ScaleTestDeploy.Namespace, f.ScaleTestDeploy.Name, err))
159 }
160 }
161
162 f.Logger.Infof("Cleaning up cloud resources...")
163 if err := f.GCEController.CleanupIngressControllerWithTimeout(ctx, ingressesCleanupTimeout); err != nil {
164 errs = append(errs, err)
165 }
166
167 return errs
168 }
169
170
171 func (f *IngressScaleFramework) RunScaleTest(ctx context.Context) []error {
172 var errs []error
173
174 testDeploy := generateScaleTestBackendDeploymentSpec(scaleTestNumBackends)
175 f.Logger.Infof("Creating deployment %s...", testDeploy.Name)
176 testDeploy, err := f.Jig.Client.AppsV1().Deployments(f.Namespace).Create(ctx, testDeploy, metav1.CreateOptions{})
177 if err != nil {
178 errs = append(errs, fmt.Errorf("failed to create deployment %s: %w", testDeploy.Name, err))
179 return errs
180 }
181 f.ScaleTestDeploy = testDeploy
182
183 if f.EnableTLS {
184 f.Logger.Infof("Ensuring TLS secret %s...", scaleTestSecretName)
185 if err := f.Jig.PrepareTLSSecret(ctx, f.Namespace, scaleTestSecretName, scaleTestHostname); err != nil {
186 errs = append(errs, fmt.Errorf("failed to prepare TLS secret %s: %w", scaleTestSecretName, err))
187 return errs
188 }
189 }
190
191
192 numIngsCreated := 0
193
194 prepareIngsFunc := func(ctx context.Context, numIngsNeeded int) {
195 var ingWg sync.WaitGroup
196 numIngsToCreate := numIngsNeeded - numIngsCreated
197 ingWg.Add(numIngsToCreate)
198 svcQueue := make(chan *v1.Service, numIngsToCreate)
199 ingQueue := make(chan *networkingv1.Ingress, numIngsToCreate)
200 errQueue := make(chan error, numIngsToCreate)
201 latencyQueue := make(chan time.Duration, numIngsToCreate)
202 start := time.Now()
203 for ; numIngsCreated < numIngsNeeded; numIngsCreated++ {
204 suffix := fmt.Sprintf("%d", numIngsCreated)
205 go func() {
206 defer ingWg.Done()
207
208 start := time.Now()
209 svcCreated, ingCreated, err := f.createScaleTestServiceIngress(ctx, suffix, f.EnableTLS)
210 svcQueue <- svcCreated
211 ingQueue <- ingCreated
212 if err != nil {
213 errQueue <- err
214 return
215 }
216 f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
217 if err := f.Jig.WaitForGivenIngressWithTimeout(ctx, ingCreated, false, waitForIngressMaxTimeout); err != nil {
218 errQueue <- err
219 return
220 }
221 elapsed := time.Since(start)
222 f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
223 latencyQueue <- elapsed
224 }()
225 }
226
227
228 f.Logger.Infof("Waiting for %d ingresses to come up...", numIngsToCreate)
229 ingWg.Wait()
230 close(svcQueue)
231 close(ingQueue)
232 close(errQueue)
233 close(latencyQueue)
234 elapsed := time.Since(start)
235 for svc := range svcQueue {
236 f.ScaleTestSvcs = append(f.ScaleTestSvcs, svc)
237 }
238 for ing := range ingQueue {
239 f.ScaleTestIngs = append(f.ScaleTestIngs, ing)
240 }
241 var createLatencies []time.Duration
242 for latency := range latencyQueue {
243 createLatencies = append(createLatencies, latency)
244 }
245 f.BatchCreateLatencies = append(f.BatchCreateLatencies, createLatencies)
246 if len(errQueue) != 0 {
247 f.Logger.Errorf("Failed while creating services and ingresses, spent %v", elapsed)
248 for err := range errQueue {
249 errs = append(errs, err)
250 }
251 return
252 }
253 f.Logger.Infof("Spent %s for %d ingresses to come up", elapsed, numIngsToCreate)
254 f.BatchDurations = append(f.BatchDurations, elapsed)
255 }
256
257 measureCreateUpdateFunc := func(ctx context.Context) {
258 f.Logger.Infof("Create one more ingress and wait for it to come up")
259 start := time.Now()
260 svcCreated, ingCreated, err := f.createScaleTestServiceIngress(ctx, fmt.Sprintf("%d", numIngsCreated), f.EnableTLS)
261 numIngsCreated = numIngsCreated + 1
262 f.ScaleTestSvcs = append(f.ScaleTestSvcs, svcCreated)
263 f.ScaleTestIngs = append(f.ScaleTestIngs, ingCreated)
264 if err != nil {
265 errs = append(errs, err)
266 return
267 }
268
269 f.Logger.Infof("Waiting for ingress %s to come up...", ingCreated.Name)
270 if err := f.Jig.WaitForGivenIngressWithTimeout(ctx, ingCreated, false, waitForIngressMaxTimeout); err != nil {
271 errs = append(errs, err)
272 return
273 }
274 elapsed := time.Since(start)
275 f.Logger.Infof("Spent %s for ingress %s to come up", elapsed, ingCreated.Name)
276 f.StepCreateLatencies = append(f.StepCreateLatencies, elapsed)
277
278 f.Logger.Infof("Updating ingress and wait for change to take effect")
279 ingToUpdate, err := f.Clientset.NetworkingV1().Ingresses(f.Namespace).Get(ctx, ingCreated.Name, metav1.GetOptions{})
280 if err != nil {
281 errs = append(errs, err)
282 return
283 }
284 addTestPathToIngress(ingToUpdate)
285 start = time.Now()
286 ingToUpdate, err = f.Clientset.NetworkingV1().Ingresses(f.Namespace).Update(ctx, ingToUpdate, metav1.UpdateOptions{})
287 if err != nil {
288 errs = append(errs, err)
289 return
290 }
291
292 if err := f.Jig.WaitForGivenIngressWithTimeout(ctx, ingToUpdate, false, waitForIngressMaxTimeout); err != nil {
293 errs = append(errs, err)
294 return
295 }
296 elapsed = time.Since(start)
297 f.Logger.Infof("Spent %s for updating ingress %s", elapsed, ingToUpdate.Name)
298 f.StepUpdateLatencies = append(f.StepUpdateLatencies, elapsed)
299 }
300
301 defer f.dumpLatencies()
302
303 for _, num := range f.NumIngressesTest {
304 f.Logger.Infof("Create more ingresses until we reach %d ingresses", num)
305 prepareIngsFunc(ctx, num)
306 f.Logger.Infof("Measure create and update latency with %d ingresses", num)
307 measureCreateUpdateFunc(ctx)
308
309 if len(errs) != 0 {
310 return errs
311 }
312 }
313
314 return errs
315 }
316
317 func (f *IngressScaleFramework) dumpLatencies() {
318 f.Logger.Infof("Dumping scale test latencies...")
319 formattedData := f.GetFormattedLatencies()
320 if f.OutputFile != "" {
321 f.Logger.Infof("Dumping scale test latencies to file %s...", f.OutputFile)
322 os.WriteFile(f.OutputFile, []byte(formattedData), 0644)
323 return
324 }
325 f.Logger.Infof("\n%v", formattedData)
326 }
327
328
329
330 func (f *IngressScaleFramework) GetFormattedLatencies() string {
331 if len(f.NumIngressesTest) == 0 ||
332 len(f.NumIngressesTest) != len(f.BatchCreateLatencies) ||
333 len(f.NumIngressesTest) != len(f.BatchDurations) ||
334 len(f.NumIngressesTest) != len(f.StepCreateLatencies) ||
335 len(f.NumIngressesTest) != len(f.StepUpdateLatencies) {
336 return "Failed to construct latencies output."
337 }
338
339 res := "--- Procedure logs ---\n"
340 for i, latencies := range f.BatchCreateLatencies {
341 res += fmt.Sprintf("Create %d ingresses parallelly, each of them takes below amount of time before starts serving traffic:\n", len(latencies))
342 for _, latency := range latencies {
343 res = res + fmt.Sprintf("- %v\n", latency)
344 }
345 res += fmt.Sprintf("Total duration for completing %d ingress creations: %v\n", len(latencies), f.BatchDurations[i])
346 res += fmt.Sprintf("Duration to create one more ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
347 res += fmt.Sprintf("Duration to update one ingress with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
348 }
349 res = res + "--- Summary ---\n"
350 var batchTotalStr, batchAvgStr, singleCreateStr, singleUpdateStr string
351 for i, latencies := range f.BatchCreateLatencies {
352 batchTotalStr += fmt.Sprintf("Batch creation total latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), f.BatchDurations[i])
353 var avgLatency time.Duration
354 for _, latency := range latencies {
355 avgLatency = avgLatency + latency
356 }
357 avgLatency /= time.Duration(len(latencies))
358 batchAvgStr += fmt.Sprintf("Batch creation average latency for %d ingresses with %d ingresses existing: %v\n", len(latencies), f.NumIngressesTest[i]-len(latencies), avgLatency)
359 singleCreateStr += fmt.Sprintf("Single ingress creation latency with %d ingresses existing: %v\n", f.NumIngressesTest[i], f.StepCreateLatencies[i])
360 singleUpdateStr += fmt.Sprintf("Single ingress update latency with %d ingresses existing: %v\n", f.NumIngressesTest[i]+1, f.StepUpdateLatencies[i])
361 }
362 res += batchTotalStr + batchAvgStr + singleCreateStr + singleUpdateStr
363 return res
364 }
365
366 func addTestPathToIngress(ing *networkingv1.Ingress) {
367 prefixPathType := networkingv1.PathTypeImplementationSpecific
368 ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths = append(
369 ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths,
370 networkingv1.HTTPIngressPath{
371 Path: "/test",
372 PathType: &prefixPathType,
373 Backend: ing.Spec.Rules[0].IngressRuleValue.HTTP.Paths[0].Backend,
374 })
375 }
376
377 func (f *IngressScaleFramework) createScaleTestServiceIngress(ctx context.Context, suffix string, enableTLS bool) (*v1.Service, *networkingv1.Ingress, error) {
378 svcCreated, err := f.Clientset.CoreV1().Services(f.Namespace).Create(ctx, generateScaleTestServiceSpec(suffix), metav1.CreateOptions{})
379 if err != nil {
380 return nil, nil, err
381 }
382 ingCreated, err := f.Clientset.NetworkingV1().Ingresses(f.Namespace).Create(ctx, generateScaleTestIngressSpec(suffix, enableTLS), metav1.CreateOptions{})
383 if err != nil {
384 return nil, nil, err
385 }
386 return svcCreated, ingCreated, nil
387 }
388
389 func generateScaleTestIngressSpec(suffix string, enableTLS bool) *networkingv1.Ingress {
390 prefixPathType := networkingv1.PathTypeImplementationSpecific
391 ing := &networkingv1.Ingress{
392 ObjectMeta: metav1.ObjectMeta{
393 Name: fmt.Sprintf("%s-%s", scaleTestIngressNamePrefix, suffix),
394 },
395 Spec: networkingv1.IngressSpec{
396 TLS: []networkingv1.IngressTLS{
397 {SecretName: scaleTestSecretName},
398 },
399 Rules: []networkingv1.IngressRule{
400 {
401 Host: scaleTestHostname,
402 IngressRuleValue: networkingv1.IngressRuleValue{
403 HTTP: &networkingv1.HTTPIngressRuleValue{
404 Paths: []networkingv1.HTTPIngressPath{
405 {
406 Path: "/scale",
407 PathType: &prefixPathType,
408 Backend: networkingv1.IngressBackend{
409 Service: &networkingv1.IngressServiceBackend{
410 Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
411 Port: networkingv1.ServiceBackendPort{
412 Number: 80,
413 },
414 },
415 },
416 },
417 },
418 },
419 },
420 },
421 },
422 },
423 }
424 if enableTLS {
425 ing.Spec.TLS = []networkingv1.IngressTLS{
426 {SecretName: scaleTestSecretName},
427 }
428 }
429 return ing
430 }
431
432 func generateScaleTestServiceSpec(suffix string) *v1.Service {
433 return &v1.Service{
434 ObjectMeta: metav1.ObjectMeta{
435 Name: fmt.Sprintf("%s-%s", scaleTestBackendName, suffix),
436 Labels: scaleTestLabels,
437 },
438 Spec: v1.ServiceSpec{
439 Ports: []v1.ServicePort{{
440 Name: "http",
441 Protocol: v1.ProtocolTCP,
442 Port: 80,
443 TargetPort: intstr.FromInt32(8080),
444 }},
445 Selector: scaleTestLabels,
446 Type: v1.ServiceTypeNodePort,
447 },
448 }
449 }
450
451 func generateScaleTestBackendDeploymentSpec(numReplicas int32) *appsv1.Deployment {
452 d := e2edeployment.NewDeployment(
453 scaleTestBackendName, numReplicas, scaleTestLabels, scaleTestBackendName,
454 imageutils.GetE2EImage(imageutils.Agnhost), appsv1.RollingUpdateDeploymentStrategyType)
455 d.Spec.Template.Spec.Containers[0].Command = []string{
456 "/agnhost",
457 "netexec",
458 "--http-port=8080",
459 }
460 d.Spec.Template.Spec.Containers[0].Ports = []v1.ContainerPort{{ContainerPort: 8080}}
461 d.Spec.Template.Spec.Containers[0].ReadinessProbe = &v1.Probe{
462 ProbeHandler: v1.ProbeHandler{
463 HTTPGet: &v1.HTTPGetAction{
464 Port: intstr.FromInt32(8080),
465 Path: "/healthz",
466 },
467 },
468 FailureThreshold: 10,
469 PeriodSeconds: 1,
470 SuccessThreshold: 1,
471 TimeoutSeconds: 1,
472 }
473 return d
474 }
475
View as plain text