1
16
17 package monitoring
18
19 import (
20 "context"
21 "fmt"
22 "math"
23 "os"
24 "time"
25
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/kubernetes/test/e2e/feature"
28 "k8s.io/kubernetes/test/e2e/framework"
29 e2eautoscaling "k8s.io/kubernetes/test/e2e/framework/autoscaling"
30 e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
31 instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
32 admissionapi "k8s.io/pod-security-admission/api"
33
34 "github.com/onsi/ginkgo/v2"
35 "golang.org/x/oauth2/google"
36 gcm "google.golang.org/api/monitoring/v3"
37 "google.golang.org/api/option"
38 )
39
// stackdriverMetrics lists the container metric names the test expects to
// find. createMetricFilter prefixes each one with
// "container.googleapis.com/container/" when querying.
var stackdriverMetrics = []string{
	"uptime",
	"memory/bytes_total",
	"memory/bytes_used",
	"cpu/reserved_cores",
	"cpu/usage_time",
	"memory/page_fault_count",
	"disk/bytes_used",
	"disk/bytes_total",
	"cpu/utilization",
}

// Polling parameters used while waiting for the metrics to show up.
var (
	pollFrequency = 5 * time.Second
	pollTimeout   = 7 * time.Minute
)

// Settings for the resource consumer workload whose metrics are checked.
var (
	// rcName names the resource consumer; it is also the container name the
	// metric filter matches on.
	rcName = "resource-consumer"
	// memoryUsed and memoryLimit are handed to the resource consumer
	// constructor (presumably megabytes; confirm against its documentation).
	memoryUsed        = 64
	memoryLimit int64 = 200
	// tolerance is the maximum relative deviation between the observed and the
	// requested CPU usage that is still accepted.
	tolerance = 0.25
)
63
64 var _ = instrumentation.SIGDescribe("Stackdriver Monitoring", func() {
65 ginkgo.BeforeEach(func() {
66 e2eskipper.SkipUnlessProviderIs("gce", "gke")
67 })
68
69 f := framework.NewDefaultFramework("stackdriver-monitoring")
70 f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
71
72 f.It("should have cluster metrics", feature.StackdriverMonitoring, func(ctx context.Context) {
73 testStackdriverMonitoring(ctx, f, 1, 100, 200)
74 })
75
76 })
77
78 func testStackdriverMonitoring(ctx context.Context, f *framework.Framework, pods, allPodsCPU int, perPodCPU int64) {
79 projectID := framework.TestContext.CloudConfig.ProjectID
80
81 client, err := google.DefaultClient(ctx, gcm.CloudPlatformScope)
82 framework.ExpectNoError(err)
83
84
85
86
87
88
96
97 gcmService, err := gcm.NewService(ctx, option.WithHTTPClient(client))
98
99
100
101 basePathOverride := os.Getenv("STACKDRIVER_API_ENDPOINT_OVERRIDE")
102 if basePathOverride != "" {
103 gcmService.BasePath = basePathOverride
104 }
105
106 framework.ExpectNoError(err)
107
108 rc := e2eautoscaling.NewDynamicResourceConsumer(ctx, rcName, f.Namespace.Name, e2eautoscaling.KindDeployment, pods, allPodsCPU, memoryUsed, 0, perPodCPU, memoryLimit, f.ClientSet, f.ScalesGetter, e2eautoscaling.Disable, e2eautoscaling.Idle)
109 ginkgo.DeferCleanup(rc.CleanUp)
110
111 rc.WaitForReplicas(ctx, pods, 15*time.Minute)
112
113 metricsMap := map[string]bool{}
114 pollingFunction := checkForMetrics(projectID, gcmService, time.Now(), metricsMap, allPodsCPU, perPodCPU)
115 err = wait.Poll(pollFrequency, pollTimeout, pollingFunction)
116 if err != nil {
117 framework.Logf("Missing metrics: %+v\n", metricsMap)
118 }
119 framework.ExpectNoError(err)
120 }
121
122 func checkForMetrics(projectID string, gcmService *gcm.Service, start time.Time, metricsMap map[string]bool, cpuUsed int, cpuLimit int64) func() (bool, error) {
123 return func() (bool, error) {
124 counter := 0
125 correctUtilization := false
126 for _, metric := range stackdriverMetrics {
127 metricsMap[metric] = false
128 }
129 for _, metric := range stackdriverMetrics {
130
131 ts, err := fetchTimeSeries(projectID, gcmService, metric, start, time.Now())
132 framework.ExpectNoError(err)
133 if len(ts) > 0 {
134 counter = counter + 1
135 metricsMap[metric] = true
136 framework.Logf("Received %v timeseries for metric %v\n", len(ts), metric)
137 } else {
138 framework.Logf("No timeseries for metric %v\n", metric)
139 }
140
141 var sum float64
142 switch metric {
143 case "cpu/utilization":
144 for _, t := range ts {
145 max := t.Points[0]
146 maxEnd, _ := time.Parse(time.RFC3339, max.Interval.EndTime)
147 for _, p := range t.Points {
148 pEnd, _ := time.Parse(time.RFC3339, p.Interval.EndTime)
149 if pEnd.After(maxEnd) {
150 max = p
151 maxEnd, _ = time.Parse(time.RFC3339, max.Interval.EndTime)
152 }
153 }
154 sum = sum + *max.Value.DoubleValue
155 framework.Logf("Received %v points for metric %v\n",
156 len(t.Points), metric)
157 }
158 framework.Logf("Most recent cpu/utilization sum*cpu/limit: %v\n", sum*float64(cpuLimit))
159 if math.Abs(sum*float64(cpuLimit)-float64(cpuUsed)) > tolerance*float64(cpuUsed) {
160 return false, nil
161 }
162 correctUtilization = true
163 }
164 }
165 if counter < 9 || !correctUtilization {
166 return false, nil
167 }
168 return true, nil
169 }
170 }
171
// createMetricFilter builds the time series filter that selects the given
// container metric (relative to "container.googleapis.com/container/") for the
// container named containerName.
//
// The filter is produced on a single line so that its content does not depend
// on how this source file happens to be indented.
func createMetricFilter(metric string, containerName string) string {
	return fmt.Sprintf(`metric.type="container.googleapis.com/container/%s" AND resource.label.container_name="%s"`,
		metric, containerName)
}
176
177 func fetchTimeSeries(projectID string, gcmService *gcm.Service, metric string, start time.Time, end time.Time) ([]*gcm.TimeSeries, error) {
178 response, err := gcmService.Projects.TimeSeries.
179 List(fullProjectName(projectID)).
180 Filter(createMetricFilter(metric, rcName)).
181 IntervalStartTime(start.Format(time.RFC3339)).
182 IntervalEndTime(end.Format(time.RFC3339)).
183 Do()
184 if err != nil {
185 return nil, err
186 }
187 return response.TimeSeries, nil
188 }
189
// fullProjectName turns a bare project identifier into the "projects/<name>"
// form.
func fullProjectName(name string) string {
	const collection = "projects"
	return fmt.Sprintf("%s/%s", collection, name)
}
193
View as plain text