package logging

import (
	"context"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/kubernetes/test/e2e/framework"
	e2econfig "k8s.io/kubernetes/test/e2e/framework/config"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2eoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
	"github.com/onsi/gomega"
)
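
// loggingSoak holds the tuning options for this soak test: how many waves of log-producing
// pods to start and how long to wait between waves. The options are registered with the e2e
// test configuration under the "instrumentation.logging.soak" prefix.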
var loggingSoak struct {
	Scale            int           `default:"1" usage:"number of waves of pods"`
	TimeBetweenWaves time.Duration `default:"5000ms" usage:"time to wait before dumping the next wave of pods"`
}
var _ = e2econfig.AddOptions(&loggingSoak, "instrumentation.logging.soak")

var _ = instrumentation.SIGDescribe("Logging soak [Performance]", framework.WithSlow(), framework.WithDisruptive(), func() {

	f := framework.NewDefaultFramework("logging-soak")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
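
	// The per-pod log rate and total duration are fixed for this test; to produce more log
	// volume overall, raise the Scale option rather than changing these values.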
	kbRateInSeconds := 1 * time.Second
	totalLogTime := 2 * time.Minute
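
	// The test checks that logs are produced at scale and can still be read back through the
	// kubelet. Each wave creates one log-producing pod per node; waves run concurrently,
	// staggered by TimeBetweenWaves, to simulate many chatty containers.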
	ginkgo.It(fmt.Sprintf("should survive logging 1KB every %v seconds, for a duration of %v", kbRateInSeconds, totalLogTime), func(ctx context.Context) {
		ginkgo.By(fmt.Sprintf("scaling up to %v pods per node", loggingSoak.Scale))
		defer ginkgo.GinkgoRecover()
		var wg sync.WaitGroup
		wg.Add(loggingSoak.Scale)
		for i := 0; i < loggingSoak.Scale; i++ {
			go func(i int) {
				defer wg.Done()
				defer ginkgo.GinkgoRecover()
				wave := fmt.Sprintf("wave%v", strconv.Itoa(i))
				framework.Logf("Starting logging soak, wave = %v", wave)
				RunLogPodsWithSleepOf(ctx, f, kbRateInSeconds, wave, totalLogTime)
				framework.Logf("Completed logging soak, wave %v", i)
			}(i)
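			// Stagger the waves instead of starting them all at once.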
			time.Sleep(loggingSoak.TimeBetweenWaves)
		}
		framework.Logf("Waiting on all %v logging soak waves to complete", loggingSoak.Scale)
		wg.Wait()
	})
})
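
// RunLogPodsWithSleepOf creates a log-producing pod on every ready, schedulable node. Each pod
// repeatedly echoes 1KB of text and then sleeps for the given duration. The function then waits,
// up to the timeout plus a per-node allowance for large clusters, for the expected log string to
// appear in every pod's output.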
func RunLogPodsWithSleepOf(ctx context.Context, f *framework.Framework, sleep time.Duration, podname string, timeout time.Duration) {

	nodes, err := e2enode.GetReadySchedulableNodes(ctx, f.ClientSet)
	framework.ExpectNoError(err)
	totalPods := len(nodes.Items)
	gomega.Expect(nodes.Items).ToNot(gomega.BeEmpty())
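
	// "logs-123" is 8 bytes, so repeating it 128 times yields 1KB of text per echo.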
	kilobyte := strings.Repeat("logs-123", 128)

	appName := "logging-soak" + podname
	podLabels := e2enode.CreatePodsPerNodeForSimpleApp(
		ctx,
		f.ClientSet,
		f.Namespace.Name,
		appName,
		func(n v1.Node) v1.PodSpec {
			return v1.PodSpec{
				Containers: []v1.Container{{
					Name:  "logging-soak",
					Image: imageutils.GetE2EImage(imageutils.BusyBox),
					Args: []string{
						"/bin/sh",
						"-c",
						fmt.Sprintf("while true ; do echo %v ; sleep %v; done", kilobyte, sleep.Seconds()),
					},
				}},
				NodeName:      n.Name,
				RestartPolicy: v1.RestartPolicyAlways,
			}
		},
		totalPods,
	)

	logSoakVerification := f.NewClusterVerification(
		f.Namespace,
		framework.PodStateVerification{
			Selectors:   podLabels,
			ValidPhases: []v1.PodPhase{v1.PodRunning, v1.PodSucceeded},
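			// Total log volume is not checked, since logs are not guaranteed to be retained
			// indefinitely; it is enough that the marker string appears in each pod's stdout.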
			Verify: func(p v1.Pod) (bool, error) {
				s, err := e2eoutput.LookForStringInLog(f.Namespace.Name, p.Name, "logging-soak", "logs-123", 1*time.Second)
				return s != "", err
			},
		},
	)
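
	// Give bigger clusters more time to converge: one extra second for every five nodes.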
	largeClusterForgiveness := time.Duration(len(nodes.Items)/5) * time.Second
	pods, err := logSoakVerification.WaitFor(ctx, totalPods, timeout+largeClusterForgiveness)

	if err != nil {
		framework.Failf("Error in wait... %v", err)
	} else if len(pods) < totalPods {
		framework.Failf("Only got %v out of %v", len(pods), totalPods)
	}
}