1
16
17 package benchmark
18
19 import (
20 "context"
21 "encoding/json"
22 "flag"
23 "fmt"
24 "io"
25 "math"
26 "os"
27 "path"
28 "strings"
29 "sync"
30 "testing"
31 "time"
32
33 "github.com/google/go-cmp/cmp"
34
35 v1 "k8s.io/api/core/v1"
36 apierrors "k8s.io/apimachinery/pkg/api/errors"
37 "k8s.io/apimachinery/pkg/api/meta"
38 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
39 "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
40 "k8s.io/apimachinery/pkg/runtime/schema"
41 "k8s.io/apimachinery/pkg/util/wait"
42 utilfeature "k8s.io/apiserver/pkg/util/feature"
43 cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
44 "k8s.io/client-go/dynamic"
45 "k8s.io/client-go/informers"
46 coreinformers "k8s.io/client-go/informers/core/v1"
47 clientset "k8s.io/client-go/kubernetes"
48 "k8s.io/client-go/restmapper"
49 "k8s.io/component-base/featuregate"
50 featuregatetesting "k8s.io/component-base/featuregate/testing"
51 "k8s.io/component-base/metrics/legacyregistry"
52 "k8s.io/klog/v2"
53 "k8s.io/kubernetes/pkg/scheduler/apis/config"
54 "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
55 "k8s.io/kubernetes/pkg/scheduler/apis/config/validation"
56 frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
57 "k8s.io/kubernetes/pkg/scheduler/metrics"
58 "k8s.io/kubernetes/test/integration/framework"
59 testutils "k8s.io/kubernetes/test/utils"
60 "k8s.io/kubernetes/test/utils/ktesting"
61 "k8s.io/kubernetes/test/utils/ktesting/initoption"
62 "sigs.k8s.io/yaml"
63 )
64
// operationCode names the kind of an operation in a workload template. Each
// op type compares its Opcode field against one of the constants below in
// isValid.
type operationCode string

// Opcodes of the operations which may appear in a workload template.
const (
	createAnyOpcode            operationCode = "createAny"
	createNodesOpcode          operationCode = "createNodes"
	createNamespacesOpcode     operationCode = "createNamespaces"
	createPodsOpcode           operationCode = "createPods"
	createPodSetsOpcode        operationCode = "createPodSets"
	createResourceClaimsOpcode operationCode = "createResourceClaims"
	createResourceDriverOpcode operationCode = "createResourceDriver"
	churnOpcode                operationCode = "churn"
	barrierOpcode              operationCode = "barrier"
	sleepOpcode                operationCode = "sleep"
)
79
// Modes accepted by churnOp.Mode.
const (
	// Create makes the churn op create the templated objects on every
	// interval tick; the churn op itself never deletes them.
	Create = "create"
	// Recreate makes the churn op cycle through a fixed number of slots:
	// an empty slot gets a newly created object, an occupied slot gets its
	// object deleted.
	Recreate = "recreate"
)
88
const (
	// configFile is the path of the test case definitions read by
	// RunBenchmarkPerfScheduling.
	configFile = "config/performance-config.yaml"
	// extensionPointsLabelName is the metric label used to select
	// extension points in the default metrics collector configuration.
	extensionPointsLabelName = "extension_point"
	// resultLabelName is the metric label used to select scheduling
	// results in the default metrics collector configuration.
	resultLabelName = "result"
)
94
var (
	// defaultMetricsCollectorConfig is used by getTestDataCollectors when
	// a test case does not provide its own metricsCollectorConfig. A nil
	// entry means that no label selection is configured for that metric.
	defaultMetricsCollectorConfig = metricsCollectorConfig{
		Metrics: map[string]*labelValues{
			"scheduler_framework_extension_point_duration_seconds": {
				label:  extensionPointsLabelName,
				values: []string{"Filter", "Score"},
			},
			"scheduler_scheduling_attempt_duration_seconds": {
				label:  resultLabelName,
				values: []string{metrics.ScheduledResult, metrics.UnschedulableResult, metrics.ErrorResult},
			},
			"scheduler_pod_scheduling_duration_seconds":     nil,
			"scheduler_pod_scheduling_sli_duration_seconds": nil,
		},
	}
)
111
112
113
114
// testCase describes one scheduler performance scenario: a template of
// operations which is executed once for each of the listed workloads.
type testCase struct {
	// Name is used as the name of the sub-benchmark for this test case.
	Name string
	// FeatureGates are set for the duration of each workload run and are
	// also passed on to the cluster setup.
	FeatureGates map[featuregate.Feature]bool
	// MetricsCollectorConfig selects the metrics to collect. When nil,
	// defaultMetricsCollectorConfig is used.
	MetricsCollectorConfig *metricsCollectorConfig
	// WorkloadTemplate is the list of ops which gets executed for each
	// workload after parameters have been substituted.
	WorkloadTemplate []op
	// Workloads lists the parameter sets to run the template with.
	Workloads []*workload
	// SchedulerConfigPath is the path of a scheduler configuration file.
	// When empty, no configuration file is loaded.
	SchedulerConfigPath string
	// DefaultPodTemplatePath is used by createPods ops which do not set
	// their own PodTemplatePath.
	DefaultPodTemplatePath *string
	// Labels are combined with the labels of each workload when applying
	// the label filter.
	Labels []string
}
141
142 func (tc *testCase) collectsMetrics() bool {
143 for _, op := range tc.WorkloadTemplate {
144 if op.realOp.collectsMetrics() {
145 return true
146 }
147 }
148 return false
149 }
150
151 func (tc *testCase) workloadNamesUnique() error {
152 workloadUniqueNames := map[string]bool{}
153 for _, w := range tc.Workloads {
154 if workloadUniqueNames[w.Name] {
155 return fmt.Errorf("%s: workload name %s is not unique", tc.Name, w.Name)
156 }
157 workloadUniqueNames[w.Name] = true
158 }
159 return nil
160 }
161
162
163
164
// workload is one named set of parameters with which the workload template
// of a test case is executed.
type workload struct {
	// Name is used as the name of the sub-benchmark for this workload.
	Name string
	// Params holds the values which ops reference through "$name"
	// parameters.
	Params params
	// Labels are combined with the labels of the test case when applying
	// the label filter.
	Labels []string
}
173
// params stores the integer parameters of a workload together with a record
// of which of them have been looked up.
type params struct {
	// params maps a parameter name to its value.
	params map[string]int
	// isUsed records each key that was passed to get. It is a map, so the
	// record is shared by all copies of the struct.
	isUsed map[string]bool
}

// UnmarshalJSON decodes a JSON object with integer values into p and resets
// the usage record.
func (p *params) UnmarshalJSON(b []byte) error {
	aux := map[string]int{}

	if err := json.Unmarshal(b, &aux); err != nil {
		return err
	}

	p.params = aux
	p.isUsed = map[string]bool{}
	return nil
}

// get returns the value of the parameter key and marks key as used. An error
// is returned when the parameter is not defined.
func (p params) get(key string) (int, error) {
	// A workload without a params section never went through UnmarshalJSON,
	// so isUsed is nil. Writing to a nil map panics; skip the bookkeeping in
	// that case, there is nothing which could be reported as unused anyway.
	if p.isUsed != nil {
		p.isUsed[key] = true
	}
	param, ok := p.params[key]
	if ok {
		return param, nil
	}
	return 0, fmt.Errorf("parameter %s is undefined", key)
}
219
220
221 func (w workload) unusedParams() []string {
222 var ret []string
223 for name := range w.Params.params {
224 if !w.Params.isUsed[name] {
225 ret = append(ret, name)
226 }
227 }
228 return ret
229 }
230
231
232 type op struct {
233 realOp realOp
234 }
235
236
237
238 func (op *op) UnmarshalJSON(b []byte) error {
239 possibleOps := []realOp{
240 &createAny{},
241 &createNodesOp{},
242 &createNamespacesOp{},
243 &createPodsOp{},
244 &createPodSetsOp{},
245 &createResourceClaimsOp{},
246 &createResourceDriverOp{},
247 &churnOp{},
248 &barrierOp{},
249 &sleepOp{},
250
251 }
252 var firstError error
253 for _, possibleOp := range possibleOps {
254 if err := json.Unmarshal(b, possibleOp); err == nil {
255 if err2 := possibleOp.isValid(true); err2 == nil {
256 op.realOp = possibleOp
257 return nil
258 } else if firstError == nil {
259
260
261 firstError = err2
262 }
263 }
264 }
265 return fmt.Errorf("cannot unmarshal %s into any known op type: %w", string(b), firstError)
266 }
267
268
269
// realOp is implemented by every operation which can appear in a workload
// template.
type realOp interface {
	// isValid checks the fields of the op. allowParameterization states
	// whether "$name" parameters are still acceptable; the patchParams
	// implementations in this file mostly re-validate with false after
	// substituting values.
	isValid(allowParameterization bool) error
	// collectsMetrics reports whether the op collects metrics.
	collectsMetrics() bool
	// patchParams returns an op in which parameters have been replaced by
	// the values defined in the workload. The implementations in this
	// file use value receivers and return a pointer to the patched copy,
	// leaving the receiver unchanged.
	patchParams(w *workload) (realOp, error)
}
284
285
286
// runnableOp is a realOp which executes itself. runWorkload uses it for all
// op types it does not handle explicitly.
type runnableOp interface {
	realOp

	// requiredNamespaces lists the namespaces which runWorkload creates
	// (if not already known) before calling run.
	requiredNamespaces() []string
	// run executes the op.
	run(ktesting.TContext)
}
296
// isValidParameterizable reports whether val has the form of a parameter
// reference, i.e. starts with "$".
func isValidParameterizable(val string) bool {
	return strings.HasPrefix(val, "$")
}

// isValidCount validates a count/countParam pair. When parameterization is
// allowed and a parameter reference is given, only the reference is checked.
// Otherwise the literal count must not be negative.
func isValidCount(allowParameterization bool, count int, countParam string) bool {
	if allowParameterization && countParam != "" {
		return isValidParameterizable(countParam)
	}
	return count >= 0
}
308
309
310 type createNodesOp struct {
311
312 Opcode operationCode
313
314 Count int
315
316 CountParam string
317
318
319 NodeTemplatePath *string
320
321
322
323 NodeAllocatableStrategy *testutils.NodeAllocatableStrategy
324 LabelNodePrepareStrategy *testutils.LabelNodePrepareStrategy
325 UniqueNodeLabelStrategy *testutils.UniqueNodeLabelStrategy
326 }
327
328 func (cno *createNodesOp) isValid(allowParameterization bool) error {
329 if cno.Opcode != createNodesOpcode {
330 return fmt.Errorf("invalid opcode %q", cno.Opcode)
331 }
332 if !isValidCount(allowParameterization, cno.Count, cno.CountParam) {
333 return fmt.Errorf("invalid Count=%d / CountParam=%q", cno.Count, cno.CountParam)
334 }
335 return nil
336 }
337
338 func (*createNodesOp) collectsMetrics() bool {
339 return false
340 }
341
342 func (cno createNodesOp) patchParams(w *workload) (realOp, error) {
343 if cno.CountParam != "" {
344 var err error
345 cno.Count, err = w.Params.get(cno.CountParam[1:])
346 if err != nil {
347 return nil, err
348 }
349 }
350 return &cno, (&cno).isValid(false)
351 }
352
353
354 type createNamespacesOp struct {
355
356 Opcode operationCode
357
358
359 Prefix string
360
361 Count int
362
363 CountParam string
364
365
366 NamespaceTemplatePath *string
367 }
368
369 func (cmo *createNamespacesOp) isValid(allowParameterization bool) error {
370 if cmo.Opcode != createNamespacesOpcode {
371 return fmt.Errorf("invalid opcode %q", cmo.Opcode)
372 }
373 if !isValidCount(allowParameterization, cmo.Count, cmo.CountParam) {
374 return fmt.Errorf("invalid Count=%d / CountParam=%q", cmo.Count, cmo.CountParam)
375 }
376 return nil
377 }
378
379 func (*createNamespacesOp) collectsMetrics() bool {
380 return false
381 }
382
383 func (cmo createNamespacesOp) patchParams(w *workload) (realOp, error) {
384 if cmo.CountParam != "" {
385 var err error
386 cmo.Count, err = w.Params.get(cmo.CountParam[1:])
387 if err != nil {
388 return nil, err
389 }
390 }
391 return &cmo, (&cmo).isValid(false)
392 }
393
394
395
396
397 type createPodsOp struct {
398
399 Opcode operationCode
400
401 Count int
402
403 CountParam string
404
405
406
407 CollectMetrics bool
408
409
410
411 Namespace *string
412
413
414
415 PodTemplatePath *string
416
417
418
419 SkipWaitToCompletion bool
420
421
422 PersistentVolumeTemplatePath *string
423 PersistentVolumeClaimTemplatePath *string
424 }
425
426 func (cpo *createPodsOp) isValid(allowParameterization bool) error {
427 if cpo.Opcode != createPodsOpcode {
428 return fmt.Errorf("invalid opcode %q; expected %q", cpo.Opcode, createPodsOpcode)
429 }
430 if !isValidCount(allowParameterization, cpo.Count, cpo.CountParam) {
431 return fmt.Errorf("invalid Count=%d / CountParam=%q", cpo.Count, cpo.CountParam)
432 }
433 if cpo.CollectMetrics && cpo.SkipWaitToCompletion {
434
435
436
437 return fmt.Errorf("collectMetrics and skipWaitToCompletion cannot be true at the same time")
438 }
439 return nil
440 }
441
442 func (cpo *createPodsOp) collectsMetrics() bool {
443 return cpo.CollectMetrics
444 }
445
446 func (cpo createPodsOp) patchParams(w *workload) (realOp, error) {
447 if cpo.CountParam != "" {
448 var err error
449 cpo.Count, err = w.Params.get(cpo.CountParam[1:])
450 if err != nil {
451 return nil, err
452 }
453 }
454 return &cpo, (&cpo).isValid(false)
455 }
456
457
458 type createPodSetsOp struct {
459
460 Opcode operationCode
461
462 Count int
463
464 CountParam string
465
466
467 NamespacePrefix string
468
469 CreatePodsOp createPodsOp
470 }
471
472 func (cpso *createPodSetsOp) isValid(allowParameterization bool) error {
473 if cpso.Opcode != createPodSetsOpcode {
474 return fmt.Errorf("invalid opcode %q; expected %q", cpso.Opcode, createPodSetsOpcode)
475 }
476 if !isValidCount(allowParameterization, cpso.Count, cpso.CountParam) {
477 return fmt.Errorf("invalid Count=%d / CountParam=%q", cpso.Count, cpso.CountParam)
478 }
479 return cpso.CreatePodsOp.isValid(allowParameterization)
480 }
481
482 func (cpso *createPodSetsOp) collectsMetrics() bool {
483 return cpso.CreatePodsOp.CollectMetrics
484 }
485
486 func (cpso createPodSetsOp) patchParams(w *workload) (realOp, error) {
487 if cpso.CountParam != "" {
488 var err error
489 cpso.Count, err = w.Params.get(cpso.CountParam[1:])
490 if err != nil {
491 return nil, err
492 }
493 }
494 return &cpso, (&cpso).isValid(true)
495 }
496
497
498 type churnOp struct {
499
500 Opcode operationCode
501
502
503
504
505
506 Mode string
507
508
509 Number int
510
511 IntervalMilliseconds int64
512
513
514
515 Namespace *string
516
517 TemplatePaths []string
518 }
519
520 func (co *churnOp) isValid(_ bool) error {
521 if co.Opcode != churnOpcode {
522 return fmt.Errorf("invalid opcode %q", co.Opcode)
523 }
524 if co.Mode != Recreate && co.Mode != Create {
525 return fmt.Errorf("invalid mode: %v. must be one of %v", co.Mode, []string{Recreate, Create})
526 }
527 if co.Number < 0 {
528 return fmt.Errorf("number (%v) cannot be negative", co.Number)
529 }
530 if co.Mode == Recreate && co.Number == 0 {
531 return fmt.Errorf("number cannot be 0 when mode is %v", Recreate)
532 }
533 if len(co.TemplatePaths) == 0 {
534 return fmt.Errorf("at least one template spec file needs to be specified")
535 }
536 return nil
537 }
538
539 func (*churnOp) collectsMetrics() bool {
540 return false
541 }
542
543 func (co churnOp) patchParams(w *workload) (realOp, error) {
544 return &co, nil
545 }
546
547
548
549
550 type barrierOp struct {
551
552 Opcode operationCode
553
554
555 Namespaces []string
556 }
557
558 func (bo *barrierOp) isValid(allowParameterization bool) error {
559 if bo.Opcode != barrierOpcode {
560 return fmt.Errorf("invalid opcode %q", bo.Opcode)
561 }
562 return nil
563 }
564
565 func (*barrierOp) collectsMetrics() bool {
566 return false
567 }
568
569 func (bo barrierOp) patchParams(w *workload) (realOp, error) {
570 return &bo, nil
571 }
572
573
574
575 type sleepOp struct {
576
577 Opcode operationCode
578
579 Duration time.Duration
580 }
581
582 func (so *sleepOp) UnmarshalJSON(data []byte) (err error) {
583 var tmp struct {
584 Opcode operationCode
585 Duration string
586 }
587 if err = json.Unmarshal(data, &tmp); err != nil {
588 return err
589 }
590
591 so.Opcode = tmp.Opcode
592 so.Duration, err = time.ParseDuration(tmp.Duration)
593 return err
594 }
595
596 func (so *sleepOp) isValid(_ bool) error {
597 if so.Opcode != sleepOpcode {
598 return fmt.Errorf("invalid opcode %q; expected %q", so.Opcode, sleepOpcode)
599 }
600 return nil
601 }
602
603 func (so *sleepOp) collectsMetrics() bool {
604 return false
605 }
606
607 func (so sleepOp) patchParams(_ *workload) (realOp, error) {
608 return &so, nil
609 }
610
611 var useTestingLog = flag.Bool("use-testing-log", false, "Write log entries with testing.TB.Log. This is more suitable for unit testing and debugging, but less realistic in real benchmarks.")
612
613 func initTestOutput(tb testing.TB) io.Writer {
614 var output io.Writer
615 if *useTestingLog {
616 output = framework.NewTBWriter(tb)
617 } else {
618 tmpDir := tb.TempDir()
619 logfileName := path.Join(tmpDir, "output.log")
620 fileOutput, err := os.Create(logfileName)
621 if err != nil {
622 tb.Fatalf("create log file: %v", err)
623 }
624 output = fileOutput
625
626 tb.Cleanup(func() {
627
628
629
630
631 if err := fileOutput.Close(); err != nil {
632 tb.Fatalf("close log file: %v", err)
633 }
634 log, err := os.ReadFile(logfileName)
635 if err != nil {
636 tb.Fatalf("read log file: %v", err)
637 }
638 tb.Logf("full log output:\n%s", string(log))
639 })
640 }
641 return output
642 }
643
// cleanupKeyType is the type of the context key under which the cleanup
// setting is stored.
type cleanupKeyType struct{}

// cleanupKey is the context key for the cleanup setting.
var cleanupKey = cleanupKeyType{}

// shouldCleanup returns the cleanup setting stored in ctx. Cleanup is
// enabled by default, i.e. when the context carries no boolean under
// cleanupKey.
func shouldCleanup(ctx context.Context) bool {
	enabled, isSet := ctx.Value(cleanupKey).(bool)
	return !isSet || enabled
}
661
662
663
// withCleanup returns a context derived from tCtx which carries the given
// cleanup setting. shouldCleanup reads it back.
func withCleanup(tCtx ktesting.TContext, enabled bool) ktesting.TContext {
	return ktesting.WithValue(tCtx, cleanupKey, enabled)
}
667
// perfSchedulingLabelFilter selects the test cases and workloads which get
// executed by RunBenchmarkPerfScheduling.
var perfSchedulingLabelFilter = flag.String("perf-scheduling-label-filter", "performance", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by BenchmarkPerfScheduling")

// RunBenchmarkPerfScheduling loads and validates the test cases from
// configFile and runs every workload of every test case as a nested
// sub-benchmark. The collected data items are reported as benchmark metrics
// and finally written to a JSON file whose name is derived from b.Name().
//
// outOfTreePluginRegistry is passed on to the cluster setup.
func RunBenchmarkPerfScheduling(b *testing.B, outOfTreePluginRegistry frameworkruntime.Registry) {
	testCases, err := getTestCases(configFile)
	if err != nil {
		b.Fatal(err)
	}
	if err = validateTestCases(testCases); err != nil {
		b.Fatal(err)
	}

	output := initTestOutput(b)

	// The returned value is intentionally ignored here.
	// NOTE(review): presumably a restore function; confirm against
	// framework.RedirectKlog why it is not called.
	_ = framework.RedirectKlog(b, output)

	dataItems := DataItems{Version: "v1"}
	for _, tc := range testCases {
		b.Run(tc.Name, func(b *testing.B) {
			for _, w := range tc.Workloads {
				b.Run(w.Name, func(b *testing.B) {
					// A workload runs only if the combined labels of test
					// case and workload pass the filter.
					if !enabled(*perfSchedulingLabelFilter, append(tc.Labels, w.Labels...)...) {
						b.Skipf("disabled by label filter %q", *perfSchedulingLabelFilter)
					}
					tCtx := ktesting.Init(b, initoption.PerTestOutput(*useTestingLog))

					// Registered before etcd and the cluster get started.
					// NOTE(review): the order of these setup calls
					// determines the order of their cleanups; keep it.
					framework.GoleakCheck(b)

					framework.StartEtcd(b, output)

					// Each workload gets its own timeout.
					timeout := 30 * time.Minute
					tCtx = ktesting.WithTimeout(tCtx, timeout, fmt.Sprintf("timed out after the %s per-test timeout", timeout))

					// The functions returned by SetFeatureGateDuringTest
					// are deferred, so they run when this sub-benchmark
					// function returns.
					for feature, flag := range tc.FeatureGates {
						defer featuregatetesting.SetFeatureGateDuringTest(b, utilfeature.DefaultFeatureGate, feature, flag)()
					}
					informerFactory, tCtx := setupClusterForWorkload(tCtx, tc.SchedulerConfigPath, tc.FeatureGates, outOfTreePluginRegistry)

					// Disable the explicit cleanup in runWorkload; etcd
					// and the cluster were started for this workload only.
					tCtx = withCleanup(tCtx, false)

					results := runWorkload(tCtx, tc, w, informerFactory)
					dataItems.DataItems = append(dataItems.DataItems, results...)

					if len(results) > 0 {
						// Report 0 for the "ns/op" unit in place of a
						// measured value.
						b.ReportMetric(0, "ns/op")

						for _, result := range results {
							// Replace the "_seconds" part of the metric
							// name with the unit of the data item.
							metric := strings.ReplaceAll(result.Labels["Metric"], "_seconds", "_"+result.Unit)
							for key, value := range result.Data {
								b.ReportMetric(value, metric+"/"+key)
							}
						}
					}

					// Reset the metrics registry so that the next
					// workload starts from a clean state.
					legacyregistry.Reset()
				})
			}
		})
	}
	if err := dataItems2JSONFile(dataItems, b.Name()+"_benchmark"); err != nil {
		b.Fatalf("unable to write measured data %+v: %v", dataItems, err)
	}
}
764
// testSchedulingLabelFilter is the label filter flag which, according to
// its help text, is used by TestScheduling.
var testSchedulingLabelFilter = flag.String("test-scheduling-label-filter", "integration-test", "comma-separated list of labels which a testcase must have (no prefix or +) or must not have (-), used by TestScheduling")
766
767 type schedulerConfig struct {
768 schedulerConfigPath string
769 featureGates map[featuregate.Feature]bool
770 }
771
772 func (c schedulerConfig) equals(tc *testCase) bool {
773 return c.schedulerConfigPath == tc.SchedulerConfigPath &&
774 cmp.Equal(c.featureGates, tc.FeatureGates)
775 }
776
777 func loadSchedulerConfig(file string) (*config.KubeSchedulerConfiguration, error) {
778 data, err := os.ReadFile(file)
779 if err != nil {
780 return nil, err
781 }
782
783 obj, gvk, err := scheme.Codecs.UniversalDecoder().Decode(data, nil, nil)
784 if err != nil {
785 return nil, err
786 }
787 if cfgObj, ok := obj.(*config.KubeSchedulerConfiguration); ok {
788 return cfgObj, nil
789 }
790 return nil, fmt.Errorf("couldn't decode as KubeSchedulerConfiguration, got %s: ", gvk)
791 }
792
793 func unrollWorkloadTemplate(tb ktesting.TB, wt []op, w *workload) []op {
794 var unrolled []op
795 for opIndex, o := range wt {
796 realOp, err := o.realOp.patchParams(w)
797 if err != nil {
798 tb.Fatalf("op %d: %v", opIndex, err)
799 }
800 switch concreteOp := realOp.(type) {
801 case *createPodSetsOp:
802 tb.Logf("Creating %d pod sets %s", concreteOp.Count, concreteOp.CountParam)
803 for i := 0; i < concreteOp.Count; i++ {
804 copy := concreteOp.CreatePodsOp
805 ns := fmt.Sprintf("%s-%d", concreteOp.NamespacePrefix, i)
806 copy.Namespace = &ns
807 unrolled = append(unrolled, op{realOp: ©})
808 }
809 default:
810 unrolled = append(unrolled, o)
811 }
812 }
813 return unrolled
814 }
815
816 func setupClusterForWorkload(tCtx ktesting.TContext, configPath string, featureGates map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
817 var cfg *config.KubeSchedulerConfiguration
818 var err error
819 if configPath != "" {
820 cfg, err = loadSchedulerConfig(configPath)
821 if err != nil {
822 tCtx.Fatalf("error loading scheduler config file: %v", err)
823 }
824 if err = validation.ValidateKubeSchedulerConfiguration(cfg); err != nil {
825 tCtx.Fatalf("validate scheduler config file failed: %v", err)
826 }
827 }
828 return mustSetupCluster(tCtx, cfg, featureGates, outOfTreePluginRegistry)
829 }
830
// runWorkload executes the ops of the test case's workload template for
// workload w, in order, and returns the data items gathered by all createPods
// ops which collect metrics.
//
// Any failure aborts the test through tCtx.Fatalf. Background goroutines
// started by churn ops are stopped and awaited before the function returns.
// At the end, the test fails if the workload defines parameters which no op
// used.
func runWorkload(tCtx ktesting.TContext, tc *testCase, w *workload, informerFactory informers.SharedInformerFactory) []DataItem {
	b, benchmarking := tCtx.TB().(*testing.B)
	if benchmarking {
		start := time.Now()
		b.Cleanup(func() {
			duration := time.Since(start)
			// Report the wall-clock time of the workload, measured until
			// cleanup of the benchmark.
			b.ReportMetric(duration.Seconds(), "runtime_seconds")
		})
	}
	cleanup := shouldCleanup(tCtx)

	// The error margin stays at zero unless running as a benchmark.
	// NOTE(review): unit and meaning of 30 are defined by the throughput
	// collector, which is not visible here.
	var throughputErrorMargin float64
	if benchmarking {
		throughputErrorMargin = 30
	}

	podInformer := informerFactory.Core().V1().Pods()

	// Everything below uses a cancelable context. Deferred calls run in
	// reverse order: first cancel, then wait for the goroutines.
	tCtx = ktesting.WithCancel(tCtx)
	var wg sync.WaitGroup
	defer wg.Wait()
	defer tCtx.Cancel("workload is done")

	var mu sync.Mutex
	var dataItems []DataItem
	nextNodeIndex := 0

	// numPodsScheduledPerNamespace has an entry for each known namespace.
	// The value counts pods created with SkipWaitToCompletion, i.e. pods a
	// later barrier has to wait for.
	numPodsScheduledPerNamespace := make(map[string]int)

	if cleanup {
		// NOTE(review): the map argument is evaluated here. A barrier op
		// without namespaces later replaces the variable with a new map,
		// which this deferred call will not see.
		defer cleanupWorkload(tCtx, tc, numPodsScheduledPerNamespace)
	}

	for opIndex, op := range unrollWorkloadTemplate(tCtx, tc.WorkloadTemplate, w) {
		realOp, err := op.realOp.patchParams(w)
		if err != nil {
			tCtx.Fatalf("op %d: %v", opIndex, err)
		}
		// Stop early if the context was canceled, e.g. by a timeout.
		select {
		case <-tCtx.Done():
			tCtx.Fatalf("op %d: %v", opIndex, context.Cause(tCtx))
		default:
		}
		switch concreteOp := realOp.(type) {
		case *createNodesOp:
			nodePreparer, err := getNodePreparer(fmt.Sprintf("node-%d-", opIndex), concreteOp, tCtx.Client())
			if err != nil {
				tCtx.Fatalf("op %d: %v", opIndex, err)
			}
			if err := nodePreparer.PrepareNodes(tCtx, nextNodeIndex); err != nil {
				tCtx.Fatalf("op %d: %v", opIndex, err)
			}
			if cleanup {
				defer func() {
					if err := nodePreparer.CleanupNodes(tCtx); err != nil {
						tCtx.Fatalf("failed to clean up nodes, error: %v", err)
					}
				}()
			}
			// The next createNodes op continues after the nodes created here.
			nextNodeIndex += concreteOp.Count

		case *createNamespacesOp:
			nsPreparer, err := newNamespacePreparer(tCtx, concreteOp)
			if err != nil {
				tCtx.Fatalf("op %d: %v", opIndex, err)
			}
			if err := nsPreparer.prepare(tCtx); err != nil {
				// Try to clean up what was created and report both errors.
				err2 := nsPreparer.cleanup(tCtx)
				if err2 != nil {
					err = fmt.Errorf("prepare: %v; cleanup: %v", err, err2)
				}
				tCtx.Fatalf("op %d: %v", opIndex, err)
			}
			for _, n := range nsPreparer.namespaces() {
				if _, ok := numPodsScheduledPerNamespace[n]; ok {
					// This namespace has already been created.
					continue
				}
				numPodsScheduledPerNamespace[n] = 0
			}

		case *createPodsOp:
			var namespace string
			// The op's own namespace wins over the generated default.
			namespace = fmt.Sprintf("namespace-%d", opIndex)
			if concreteOp.Namespace != nil {
				namespace = *concreteOp.Namespace
			}
			createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
			if concreteOp.PodTemplatePath == nil {
				concreteOp.PodTemplatePath = tc.DefaultPodTemplatePath
			}
			var collectors []testDataCollector
			// The collectors run in goroutines with their own context so
			// that they can be stopped before their data gets read.
			var collectorCtx ktesting.TContext
			var collectorWG sync.WaitGroup
			defer collectorWG.Wait()

			if concreteOp.CollectMetrics {
				collectorCtx = ktesting.WithCancel(tCtx)
				defer collectorCtx.Cancel("cleaning up")
				name := tCtx.Name()
				// Strip everything up to and including the first slash of
				// the test name.
				name = name[strings.Index(name, "/")+1:]
				collectors = getTestDataCollectors(collectorCtx, podInformer, fmt.Sprintf("%s/%s", name, namespace), namespace, tc.MetricsCollectorConfig, throughputErrorMargin)
				for _, collector := range collectors {
					// Per-iteration copy for the goroutine below.
					collector := collector
					collectorWG.Add(1)
					go func() {
						defer collectorWG.Done()
						collector.run(collectorCtx)
					}()
				}
			}
			if err := createPods(tCtx, namespace, concreteOp); err != nil {
				tCtx.Fatalf("op %d: %v", opIndex, err)
			}
			if concreteOp.SkipWaitToCompletion {
				// Only record the pods; a barrier op may wait for them
				// later.
				numPodsScheduledPerNamespace[namespace] += concreteOp.Count
			} else {
				if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, concreteOp.Count); err != nil {
					tCtx.Fatalf("op %d: error in waiting for pods to get scheduled: %v", opIndex, err)
				}
			}
			if concreteOp.CollectMetrics {
				// Stop the collectors and wait for them before reading
				// the data which they gathered.
				collectorCtx.Cancel("collecting metrix, collector must stop first")
				collectorWG.Wait()
				mu.Lock()
				for _, collector := range collectors {
					dataItems = append(dataItems, collector.collect()...)
				}
				mu.Unlock()
			}

			if !concreteOp.SkipWaitToCompletion {
				// All pods of this op are scheduled, so the metrics
				// registry is reset before the next op.
				legacyregistry.Reset()
			}

		case *churnOp:
			var namespace string
			if concreteOp.Namespace != nil {
				namespace = *concreteOp.Namespace
			} else {
				namespace = fmt.Sprintf("namespace-%d", opIndex)
			}
			restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cacheddiscovery.NewMemCacheClient(tCtx.Client().Discovery()))
			// Ensure the namespace exists; an already existing one is fine.
			nsObj := &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}
			if _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, nsObj, metav1.CreateOptions{}); err != nil && !apierrors.IsAlreadyExists(err) {
				tCtx.Fatalf("op %d: unable to create namespace %v: %v", opIndex, namespace, err)
			}

			// One function per template. Called with an empty name it
			// creates an object and returns its name ("" on failure);
			// called with a name it deletes that object and returns "".
			var churnFns []func(name string) string

			for i, path := range concreteOp.TemplatePaths {
				unstructuredObj, gvk, err := getUnstructuredFromFile(path)
				if err != nil {
					tCtx.Fatalf("op %d: unable to parse the %v-th template path: %v", opIndex, i, err)
				}
				// Obtain the GVR of the object.
				mapping, err := restMapper.RESTMapping(gvk.GroupKind(), gvk.Version)
				if err != nil {
					tCtx.Fatalf("op %d: unable to find GVR for %v: %v", opIndex, gvk, err)
				}
				gvr := mapping.Resource
				// Namespaced resources live in the op's namespace,
				// cluster-scoped ones need none.
				var dynRes dynamic.ResourceInterface
				if mapping.Scope.Name() == meta.RESTScopeNameNamespace {
					dynRes = tCtx.Dynamic().Resource(gvr).Namespace(namespace)
				} else {
					dynRes = tCtx.Dynamic().Resource(gvr)
				}

				churnFns = append(churnFns, func(name string) string {
					if name != "" {
						if err := dynRes.Delete(tCtx, name, metav1.DeleteOptions{}); err != nil {
							tCtx.Errorf("op %d: unable to delete %v: %v", opIndex, name, err)
						}
						return ""
					}

					// Creation errors are not reported; the empty result
					// makes Recreate mode try again on a later tick.
					live, err := dynRes.Create(tCtx, unstructuredObj, metav1.CreateOptions{})
					if err != nil {
						return ""
					}
					return live.GetName()
				})
			}

			// Default interval: 500 milliseconds.
			var interval int64 = 500
			if concreteOp.IntervalMilliseconds != 0 {
				interval = concreteOp.IntervalMilliseconds
			}
			ticker := time.NewTicker(time.Duration(interval) * time.Millisecond)
			// Deferred to function return: the goroutines below keep
			// using the ticker until the workload is done.
			defer ticker.Stop()

			switch concreteOp.Mode {
			case Create:
				wg.Add(1)
				go func() {
					defer wg.Done()
					count, threshold := 0, concreteOp.Number
					if threshold == 0 {
						// No limit configured: practically unlimited.
						threshold = math.MaxInt32
					}
					for count < threshold {
						select {
						case <-ticker.C:
							for i := range churnFns {
								churnFns[i]("")
							}
							count++
						case <-tCtx.Done():
							return
						}
					}
				}()
			case Recreate:
				wg.Add(1)
				go func() {
					defer wg.Done()
					retVals := make([][]string, len(churnFns))
					// For each churn function, one slot per object which
					// holds the name of the live object or "".
					for i := range retVals {
						retVals[i] = make([]string, concreteOp.Number)
					}

					count := 0
					for {
						select {
						case <-ticker.C:
							// Slots are visited round-robin: an empty slot
							// leads to a create, a filled one to a delete.
							for i := range churnFns {
								retVals[i][count%concreteOp.Number] = churnFns[i](retVals[i][count%concreteOp.Number])
							}
							count++
						case <-tCtx.Done():
							return
						}
					}
				}()
			}

		case *barrierOp:
			for _, namespace := range concreteOp.Namespaces {
				if _, ok := numPodsScheduledPerNamespace[namespace]; !ok {
					tCtx.Fatalf("op %d: unknown namespace %s", opIndex, namespace)
				}
			}
			if err := waitUntilPodsScheduled(tCtx, podInformer, concreteOp.Namespaces, numPodsScheduledPerNamespace); err != nil {
				tCtx.Fatalf("op %d: %v", opIndex, err)
			}
			// The awaited namespaces are removed from the bookkeeping:
			// all of them if none were listed, else only the listed ones.
			if len(concreteOp.Namespaces) == 0 {
				numPodsScheduledPerNamespace = make(map[string]int)
			} else {
				for _, namespace := range concreteOp.Namespaces {
					delete(numPodsScheduledPerNamespace, namespace)
				}
			}

		case *sleepOp:
			// Sleep, but return early when the context gets canceled.
			select {
			case <-tCtx.Done():
			case <-time.After(concreteOp.Duration):
			}
		default:
			// Any other op must know how to run itself.
			runable, ok := concreteOp.(runnableOp)
			if !ok {
				tCtx.Fatalf("op %d: invalid op %v", opIndex, concreteOp)
			}
			for _, namespace := range runable.requiredNamespaces() {
				createNamespaceIfNotPresent(tCtx, namespace, &numPodsScheduledPerNamespace)
			}
			runable.run(tCtx)
		}
	}

	// Check for parameters which were defined but never used by any op.
	unusedParams := w.unusedParams()
	if len(unusedParams) != 0 {
		tCtx.Fatalf("the parameters %v are defined on workload %s, but unused.\nPlease make sure there are no typos.", unusedParams, w.Name)
	}

	// The deferred calls above cancel the context and wait for all
	// goroutines before the result reaches the caller.
	return dataItems
}
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155 func cleanupWorkload(tCtx ktesting.TContext, tc *testCase, numPodsScheduledPerNamespace map[string]int) {
1156 deleteNow := *metav1.NewDeleteOptions(0)
1157 for namespace := range numPodsScheduledPerNamespace {
1158
1159
1160
1161 if err := tCtx.Client().CoreV1().Pods(namespace).DeleteCollection(tCtx, deleteNow, metav1.ListOptions{}); err != nil {
1162 tCtx.Fatalf("failed to delete pods in namespace %q: %v", namespace, err)
1163 }
1164 if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, namespace, deleteNow); err != nil {
1165 tCtx.Fatalf("Deleting Namespace %q in numPodsScheduledPerNamespace: %v", namespace, err)
1166 }
1167 }
1168
1169
1170
1171
1172
1173 if err := wait.PollUntilContextTimeout(tCtx, time.Second, 5*time.Minute, false, func(ctx context.Context) (bool, error) {
1174 namespaces, err := tCtx.Client().CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
1175 if err != nil {
1176 return false, err
1177 }
1178 for _, namespace := range namespaces.Items {
1179 if _, ok := numPodsScheduledPerNamespace[namespace.Name]; ok {
1180
1181 return false, nil
1182 }
1183 }
1184
1185 return true, nil
1186 }); err != nil {
1187 tCtx.Fatalf("failed while waiting for namespace removal: %v", err)
1188 }
1189 }
1190
1191 func createNamespaceIfNotPresent(tCtx ktesting.TContext, namespace string, podsPerNamespace *map[string]int) {
1192 if _, ok := (*podsPerNamespace)[namespace]; !ok {
1193
1194
1195 _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: namespace}}, metav1.CreateOptions{})
1196 if err != nil {
1197 tCtx.Fatalf("failed to create namespace for Pod: %v", namespace)
1198 }
1199 (*podsPerNamespace)[namespace] = 0
1200 }
1201 }
1202
// testDataCollector gathers data while a createPods op is running.
type testDataCollector interface {
	// run is started in its own goroutine by runWorkload and is expected
	// to return once tCtx gets canceled, because runWorkload waits for it.
	run(tCtx ktesting.TContext)
	// collect returns the gathered data. runWorkload calls it only after
	// run has returned.
	collect() []DataItem
}
1207
1208 func getTestDataCollectors(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, name, namespace string, mcc *metricsCollectorConfig, throughputErrorMargin float64) []testDataCollector {
1209 if mcc == nil {
1210 mcc = &defaultMetricsCollectorConfig
1211 }
1212 return []testDataCollector{
1213 newThroughputCollector(tCtx, podInformer, map[string]string{"Name": name}, []string{namespace}, throughputErrorMargin),
1214 newMetricsCollector(mcc, map[string]string{"Name": name}),
1215 }
1216 }
1217
1218 func getNodePreparer(prefix string, cno *createNodesOp, clientset clientset.Interface) (testutils.TestNodePreparer, error) {
1219 var nodeStrategy testutils.PrepareNodeStrategy = &testutils.TrivialNodePrepareStrategy{}
1220 if cno.NodeAllocatableStrategy != nil {
1221 nodeStrategy = cno.NodeAllocatableStrategy
1222 } else if cno.LabelNodePrepareStrategy != nil {
1223 nodeStrategy = cno.LabelNodePrepareStrategy
1224 } else if cno.UniqueNodeLabelStrategy != nil {
1225 nodeStrategy = cno.UniqueNodeLabelStrategy
1226 }
1227
1228 if cno.NodeTemplatePath != nil {
1229 node, err := getNodeSpecFromFile(cno.NodeTemplatePath)
1230 if err != nil {
1231 return nil, err
1232 }
1233 return framework.NewIntegrationTestNodePreparerWithNodeSpec(
1234 clientset,
1235 []testutils.CountToStrategy{{Count: cno.Count, Strategy: nodeStrategy}},
1236 node,
1237 ), nil
1238 }
1239 return framework.NewIntegrationTestNodePreparer(
1240 clientset,
1241 []testutils.CountToStrategy{{Count: cno.Count, Strategy: nodeStrategy}},
1242 prefix,
1243 ), nil
1244 }
1245
1246 func createPods(tCtx ktesting.TContext, namespace string, cpo *createPodsOp) error {
1247 strategy, err := getPodStrategy(cpo)
1248 if err != nil {
1249 return err
1250 }
1251 tCtx.Logf("creating %d pods in namespace %q", cpo.Count, namespace)
1252 config := testutils.NewTestPodCreatorConfig()
1253 config.AddStrategy(namespace, cpo.Count, strategy)
1254 podCreator := testutils.NewTestPodCreator(tCtx.Client(), config)
1255 return podCreator.CreatePods(tCtx)
1256 }
1257
1258
1259
1260
1261 func waitUntilPodsScheduledInNamespace(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespace string, wantCount int) error {
1262 var pendingPod *v1.Pod
1263
1264 err := wait.PollUntilContextTimeout(tCtx, 1*time.Second, 10*time.Minute, true, func(ctx context.Context) (bool, error) {
1265 select {
1266 case <-ctx.Done():
1267 return true, ctx.Err()
1268 default:
1269 }
1270 scheduled, unscheduled, err := getScheduledPods(podInformer, namespace)
1271 if err != nil {
1272 return false, err
1273 }
1274 if len(scheduled) >= wantCount {
1275 tCtx.Logf("scheduling succeed")
1276 return true, nil
1277 }
1278 tCtx.Logf("namespace: %s, pods: want %d, got %d", namespace, wantCount, len(scheduled))
1279 if len(unscheduled) > 0 {
1280 pendingPod = unscheduled[0]
1281 } else {
1282 pendingPod = nil
1283 }
1284 return false, nil
1285 })
1286
1287 if err != nil && pendingPod != nil {
1288 err = fmt.Errorf("at least pod %s is not scheduled: %v", klog.KObj(pendingPod), err)
1289 }
1290 return err
1291 }
1292
1293
1294
1295 func waitUntilPodsScheduled(tCtx ktesting.TContext, podInformer coreinformers.PodInformer, namespaces []string, numPodsScheduledPerNamespace map[string]int) error {
1296
1297 if len(namespaces) == 0 {
1298 for namespace := range numPodsScheduledPerNamespace {
1299 namespaces = append(namespaces, namespace)
1300 }
1301 }
1302 for _, namespace := range namespaces {
1303 select {
1304 case <-tCtx.Done():
1305 return context.Cause(tCtx)
1306 default:
1307 }
1308 wantCount, ok := numPodsScheduledPerNamespace[namespace]
1309 if !ok {
1310 return fmt.Errorf("unknown namespace %s", namespace)
1311 }
1312 if err := waitUntilPodsScheduledInNamespace(tCtx, podInformer, namespace, wantCount); err != nil {
1313 return fmt.Errorf("error waiting for pods in namespace %q: %w", namespace, err)
1314 }
1315 }
1316 return nil
1317 }
1318
1319 func getSpecFromFile(path *string, spec interface{}) error {
1320 bytes, err := os.ReadFile(*path)
1321 if err != nil {
1322 return err
1323 }
1324 return yaml.UnmarshalStrict(bytes, spec)
1325 }
1326
1327 func getUnstructuredFromFile(path string) (*unstructured.Unstructured, *schema.GroupVersionKind, error) {
1328 bytes, err := os.ReadFile(path)
1329 if err != nil {
1330 return nil, nil, err
1331 }
1332
1333 bytes, err = yaml.YAMLToJSONStrict(bytes)
1334 if err != nil {
1335 return nil, nil, fmt.Errorf("cannot covert YAML to JSON: %v", err)
1336 }
1337
1338 obj, gvk, err := unstructured.UnstructuredJSONScheme.Decode(bytes, nil, nil)
1339 if err != nil {
1340 return nil, nil, err
1341 }
1342 unstructuredObj, ok := obj.(*unstructured.Unstructured)
1343 if !ok {
1344 return nil, nil, fmt.Errorf("cannot convert spec file in %v to an unstructured obj", path)
1345 }
1346 return unstructuredObj, gvk, nil
1347 }
1348
1349 func getTestCases(path string) ([]*testCase, error) {
1350 testCases := make([]*testCase, 0)
1351 if err := getSpecFromFile(&path, &testCases); err != nil {
1352 return nil, fmt.Errorf("parsing test cases error: %w", err)
1353 }
1354 return testCases, nil
1355 }
1356
1357 func validateTestCases(testCases []*testCase) error {
1358 if len(testCases) == 0 {
1359 return fmt.Errorf("no test cases defined")
1360 }
1361 testCaseUniqueNames := map[string]bool{}
1362 for _, tc := range testCases {
1363 if testCaseUniqueNames[tc.Name] {
1364 return fmt.Errorf("%s: name is not unique", tc.Name)
1365 }
1366 testCaseUniqueNames[tc.Name] = true
1367 if len(tc.Workloads) == 0 {
1368 return fmt.Errorf("%s: no workloads defined", tc.Name)
1369 }
1370 if err := tc.workloadNamesUnique(); err != nil {
1371 return err
1372 }
1373 if len(tc.WorkloadTemplate) == 0 {
1374 return fmt.Errorf("%s: no ops defined", tc.Name)
1375 }
1376
1377
1378
1379 if !tc.collectsMetrics() {
1380 return fmt.Errorf("%s: no op in the workload template collects metrics", tc.Name)
1381 }
1382
1383
1384
1385 }
1386 return nil
1387 }
1388
1389 func getPodStrategy(cpo *createPodsOp) (testutils.TestPodCreateStrategy, error) {
1390 basePod := makeBasePod()
1391 if cpo.PodTemplatePath != nil {
1392 var err error
1393 basePod, err = getPodSpecFromFile(cpo.PodTemplatePath)
1394 if err != nil {
1395 return nil, err
1396 }
1397 }
1398 if cpo.PersistentVolumeClaimTemplatePath == nil {
1399 return testutils.NewCustomCreatePodStrategy(basePod), nil
1400 }
1401
1402 pvTemplate, err := getPersistentVolumeSpecFromFile(cpo.PersistentVolumeTemplatePath)
1403 if err != nil {
1404 return nil, err
1405 }
1406 pvcTemplate, err := getPersistentVolumeClaimSpecFromFile(cpo.PersistentVolumeClaimTemplatePath)
1407 if err != nil {
1408 return nil, err
1409 }
1410 return testutils.NewCreatePodWithPersistentVolumeStrategy(pvcTemplate, getCustomVolumeFactory(pvTemplate), basePod), nil
1411 }
1412
1413 func getNodeSpecFromFile(path *string) (*v1.Node, error) {
1414 nodeSpec := &v1.Node{}
1415 if err := getSpecFromFile(path, nodeSpec); err != nil {
1416 return nil, fmt.Errorf("parsing Node: %w", err)
1417 }
1418 return nodeSpec, nil
1419 }
1420
1421 func getPodSpecFromFile(path *string) (*v1.Pod, error) {
1422 podSpec := &v1.Pod{}
1423 if err := getSpecFromFile(path, podSpec); err != nil {
1424 return nil, fmt.Errorf("parsing Pod: %w", err)
1425 }
1426 return podSpec, nil
1427 }
1428
1429 func getPersistentVolumeSpecFromFile(path *string) (*v1.PersistentVolume, error) {
1430 persistentVolumeSpec := &v1.PersistentVolume{}
1431 if err := getSpecFromFile(path, persistentVolumeSpec); err != nil {
1432 return nil, fmt.Errorf("parsing PersistentVolume: %w", err)
1433 }
1434 return persistentVolumeSpec, nil
1435 }
1436
1437 func getPersistentVolumeClaimSpecFromFile(path *string) (*v1.PersistentVolumeClaim, error) {
1438 persistentVolumeClaimSpec := &v1.PersistentVolumeClaim{}
1439 if err := getSpecFromFile(path, persistentVolumeClaimSpec); err != nil {
1440 return nil, fmt.Errorf("parsing PersistentVolumeClaim: %w", err)
1441 }
1442 return persistentVolumeClaimSpec, nil
1443 }
1444
1445 func getCustomVolumeFactory(pvTemplate *v1.PersistentVolume) func(id int) *v1.PersistentVolume {
1446 return func(id int) *v1.PersistentVolume {
1447 pv := pvTemplate.DeepCopy()
1448 volumeID := fmt.Sprintf("vol-%d", id)
1449 pv.ObjectMeta.Name = volumeID
1450 pvs := pv.Spec.PersistentVolumeSource
1451 if pvs.CSI != nil {
1452 pvs.CSI.VolumeHandle = volumeID
1453 } else if pvs.AWSElasticBlockStore != nil {
1454 pvs.AWSElasticBlockStore.VolumeID = volumeID
1455 }
1456 return pv
1457 }
1458 }
1459
1460
1461 type namespacePreparer struct {
1462 count int
1463 prefix string
1464 spec *v1.Namespace
1465 }
1466
1467 func newNamespacePreparer(tCtx ktesting.TContext, cno *createNamespacesOp) (*namespacePreparer, error) {
1468 ns := &v1.Namespace{}
1469 if cno.NamespaceTemplatePath != nil {
1470 if err := getSpecFromFile(cno.NamespaceTemplatePath, ns); err != nil {
1471 return nil, fmt.Errorf("parsing NamespaceTemplate: %w", err)
1472 }
1473 }
1474
1475 return &namespacePreparer{
1476 count: cno.Count,
1477 prefix: cno.Prefix,
1478 spec: ns,
1479 }, nil
1480 }
1481
1482
1483 func (p *namespacePreparer) namespaces() []string {
1484 namespaces := make([]string, p.count)
1485 for i := 0; i < p.count; i++ {
1486 namespaces[i] = fmt.Sprintf("%s-%d", p.prefix, i)
1487 }
1488 return namespaces
1489 }
1490
1491
1492 func (p *namespacePreparer) prepare(tCtx ktesting.TContext) error {
1493 base := &v1.Namespace{}
1494 if p.spec != nil {
1495 base = p.spec
1496 }
1497 tCtx.Logf("Making %d namespaces with prefix %q and template %v", p.count, p.prefix, *base)
1498 for i := 0; i < p.count; i++ {
1499 n := base.DeepCopy()
1500 n.Name = fmt.Sprintf("%s-%d", p.prefix, i)
1501 if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
1502 _, err := tCtx.Client().CoreV1().Namespaces().Create(tCtx, n, metav1.CreateOptions{})
1503 return err == nil || apierrors.IsAlreadyExists(err), nil
1504 }); err != nil {
1505 return err
1506 }
1507 }
1508 return nil
1509 }
1510
1511
1512 func (p *namespacePreparer) cleanup(tCtx ktesting.TContext) error {
1513 var errRet error
1514 for i := 0; i < p.count; i++ {
1515 n := fmt.Sprintf("%s-%d", p.prefix, i)
1516 if err := tCtx.Client().CoreV1().Namespaces().Delete(tCtx, n, metav1.DeleteOptions{}); err != nil {
1517 tCtx.Errorf("Deleting Namespace: %v", err)
1518 errRet = err
1519 }
1520 }
1521 return errRet
1522 }
1523
View as plain text