package benchmark

import (
	"bytes"
	"encoding/json"
	"flag"
	"fmt"
	"math"
	"os"
	"path"
	"sort"
	"strings"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/informers"
	coreinformers "k8s.io/client-go/informers/core/v1"
	restclient "k8s.io/client-go/rest"
	"k8s.io/component-base/featuregate"
	"k8s.io/component-base/metrics/legacyregistry"
	"k8s.io/component-base/metrics/testutil"
	"k8s.io/klog/v2"
	kubeschedulerconfigv1 "k8s.io/kube-scheduler/config/v1"
	apiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
	"k8s.io/kubernetes/pkg/features"
	"k8s.io/kubernetes/pkg/scheduler/apis/config"
	kubeschedulerscheme "k8s.io/kubernetes/pkg/scheduler/apis/config/scheme"
	frameworkruntime "k8s.io/kubernetes/pkg/scheduler/framework/runtime"
	"k8s.io/kubernetes/test/integration/framework"
	"k8s.io/kubernetes/test/integration/util"
	testutils "k8s.io/kubernetes/test/utils"
	"k8s.io/kubernetes/test/utils/ktesting"
)

const (
	dateFormat               = "2006-01-02T15:04:05Z"
	testNamespace            = "sched-test"
	setupNamespace           = "sched-setup"
	throughputSampleInterval = time.Second
)

var dataItemsDir = flag.String("data-items-dir", "", "destination directory for storing generated data items for perf dashboard")

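// newDefaultComponentConfig decodes empty input through the versioned
// scheduler scheme so that all configuration defaults get applied.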
func newDefaultComponentConfig() (*config.KubeSchedulerConfiguration, error) {
	gvk := kubeschedulerconfigv1.SchemeGroupVersion.WithKind("KubeSchedulerConfiguration")
	cfg := config.KubeSchedulerConfiguration{}
	_, _, err := kubeschedulerscheme.Codecs.UniversalDecoder().Decode(nil, &gvk, &cfg)
	if err != nil {
		return nil, err
	}
	return &cfg, nil
}
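// mustSetupCluster starts an apiserver, the scheduler, and the controllers
// that the benchmarks depend on (fake PV controller, garbage collector,
// namespace controller and, when DynamicResourceAllocation is enabled, the
// resource claim controller). It returns the shared informer factory and a
// test context whose client talks to that apiserver; cleanup is registered
// through tCtx.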
func mustSetupCluster(tCtx ktesting.TContext, config *config.KubeSchedulerConfiguration, enabledFeatures map[featuregate.Feature]bool, outOfTreePluginRegistry frameworkruntime.Registry) (informers.SharedInformerFactory, ktesting.TContext) {
	framework.MinVerbosity = 0

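	// Disable alpha APIs by default; individually re-enable the groups
	// that opted-in alpha features need.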
	runtimeConfig := []string{"api/alpha=false"}
	if enabledFeatures[features.DynamicResourceAllocation] {
		runtimeConfig = append(runtimeConfig, "resource.k8s.io/v1alpha2=true")
	}
	customFlags := []string{
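		// ServiceAccount admission would reject pods because no service
		// account controller runs here to create the default accounts.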
94 "--disable-admission-plugins=ServiceAccount,TaintNodesByCondition,Priority",
95 "--runtime-config=" + strings.Join(runtimeConfig, ","),
96 }
97 server, err := apiservertesting.StartTestServer(tCtx, apiservertesting.NewDefaultTestServerOptions(), customFlags, framework.SharedEtcd())
98 if err != nil {
99 tCtx.Fatalf("start apiserver: %v", err)
100 }
101 tCtx.Cleanup(server.TearDownFn)
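	// Use a high QPS/Burst limit so that client-side rate limiting does
	// not become the bottleneck of the benchmark.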
	cfg := restclient.CopyConfig(server.ClientConfig)
	cfg.QPS = 5000.0
	cfg.Burst = 5000

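	// Fall back to the default component config if the test didn't supply one.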
	if config == nil {
		var err error
		config, err = newDefaultComponentConfig()
		if err != nil {
			tCtx.Fatalf("Error creating default component config: %v", err)
		}
	}

	tCtx = ktesting.WithRESTConfig(tCtx, cfg)

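	// Start the scheduler together with the controllers that the test
	// scenarios rely on.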
	_, informerFactory := util.StartScheduler(tCtx, tCtx.Client(), cfg, config, outOfTreePluginRegistry)
	util.StartFakePVController(tCtx, tCtx.Client(), informerFactory)
	runGC := util.CreateGCController(tCtx, tCtx, *cfg, informerFactory)
	runNS := util.CreateNamespaceController(tCtx, tCtx, *cfg, informerFactory)

	runResourceClaimController := func() {}
	if enabledFeatures[features.DynamicResourceAllocation] {
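		// Tests with DynamicResourceAllocation rely on this controller
		// to create and remove ResourceClaims.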
		runResourceClaimController = util.CreateResourceClaimController(tCtx, tCtx, tCtx.Client(), informerFactory)
	}

	informerFactory.Start(tCtx.Done())
	informerFactory.WaitForCacheSync(tCtx.Done())
	go runGC()
	go runNS()
	go runResourceClaimController()

	return informerFactory, tCtx
}
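// getScheduledPods returns the pods that have been assigned to a node and
// those that have not. If namespaces are given, only pods from those
// namespaces are considered.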
func getScheduledPods(podInformer coreinformers.PodInformer, namespaces ...string) ([]*v1.Pod, []*v1.Pod, error) {
	pods, err := podInformer.Lister().List(labels.Everything())
	if err != nil {
		return nil, nil, err
	}

	s := sets.New(namespaces...)
	scheduled := make([]*v1.Pod, 0, len(pods))
	unscheduled := make([]*v1.Pod, 0, len(pods))
	for i := range pods {
		pod := pods[i]
		if len(s) == 0 || s.Has(pod.Namespace) {
			if len(pod.Spec.NodeName) > 0 {
				scheduled = append(scheduled, pod)
			} else {
				unscheduled = append(unscheduled, pod)
			}
		}
	}
	return scheduled, unscheduled, nil
}
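// DataItem is a single data point for the perf dashboard: a set of named
// values in Data (such as "Perc90"), the Unit shared by those values, and
// Labels identifying where they came from.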
type DataItem struct {
	Data   map[string]float64 `json:"data"`
	Unit   string             `json:"unit"`
	Labels map[string]string  `json:"labels,omitempty"`
}
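// DataItems is the set of data points written to a benchmark result file.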
type DataItems struct {
	Version   string     `json:"version"`
	DataItems []DataItem `json:"dataItems"`
}
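// makeBasePod returns a pod template with a generated name.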
func makeBasePod() *v1.Pod {
	basePod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			GenerateName: "pod-",
		},
		Spec: testutils.MakePodSpec(),
	}
	return basePod
}

func dataItems2JSONFile(dataItems DataItems, namePrefix string) error {
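	// The perf dashboard expects all data items to carry the same set of
	// labels, so fill in "not applicable" for any label that some other
	// item has but this one lacks.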
	labels := sets.New[string]()
	for _, item := range dataItems.DataItems {
		for label := range item.Labels {
			labels.Insert(label)
		}
	}
	for _, item := range dataItems.DataItems {
		for label := range labels {
			if _, ok := item.Labels[label]; !ok {
				item.Labels[label] = "not applicable"
			}
		}
	}

	b, err := json.Marshal(dataItems)
	if err != nil {
		return err
	}

	destFile := fmt.Sprintf("%v_%v.json", namePrefix, time.Now().Format(dateFormat))
	if *dataItemsDir != "" {
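		// Create the destination directory if it doesn't exist yet.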
		if err := os.MkdirAll(*dataItemsDir, 0750); err != nil {
			return fmt.Errorf("dataItemsDir path %v does not exist and cannot be created: %v", *dataItemsDir, err)
		}
		destFile = path.Join(*dataItemsDir, destFile)
	}
	formatted := &bytes.Buffer{}
	if err := json.Indent(formatted, b, "", " "); err != nil {
		return fmt.Errorf("indenting error: %v", err)
	}
	return os.WriteFile(destFile, formatted.Bytes(), 0644)
}

type labelValues struct {
	label  string
	values []string
}
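// metricsCollectorConfig declares which histogram metrics to collect,
// optionally split by the values of one metric label.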
type metricsCollectorConfig struct {
	Metrics map[string]*labelValues
}

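// metricsCollector gathers the configured histograms from the global
// legacy registry and turns them into DataItems.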
type metricsCollector struct {
	*metricsCollectorConfig
	labels map[string]string
}

func newMetricsCollector(config *metricsCollectorConfig, labels map[string]string) *metricsCollector {
	return &metricsCollector{
		metricsCollectorConfig: config,
		labels:                 labels,
	}
}
func (*metricsCollector) run(tCtx ktesting.TContext) {
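	// Nothing to do here: metrics are gathered on demand in collect().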
}

func (pc *metricsCollector) collect() []DataItem {
	var dataItems []DataItem
	for metric, labelVals := range pc.Metrics {
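		// No label filter: aggregate all samples of the metric into one item.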
		if labelVals == nil {
			dataItem := collectHistogramVec(metric, pc.labels, nil)
			if dataItem != nil {
				dataItems = append(dataItems, *dataItem)
			}
		} else {
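			// One data item per value of the configured label.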
			for _, value := range labelVals.values {
				lvMap := map[string]string{labelVals.label: value}
				dataItem := collectHistogramVec(metric, pc.labels, lvMap)
				if dataItem != nil {
					dataItems = append(dataItems, *dataItem)
				}
			}
		}
	}
	return dataItems
}

func collectHistogramVec(metric string, labels map[string]string, lvMap map[string]string) *DataItem {
	vec, err := testutil.GetHistogramVecFromGatherer(legacyregistry.DefaultGatherer, metric, lvMap)
	if err != nil {
		klog.Error(err)
		return nil
	}

	if err := vec.Validate(); err != nil {
		klog.ErrorS(err, "Validation failed for HistogramVec; the data for this metric won't be stored in a benchmark result file", "metric", metric, "labels", labels)
		return nil
	}

	if vec.GetAggregatedSampleCount() == 0 {
		klog.InfoS("The metric was not recorded, which may be expected; the data for this metric won't be stored in a benchmark result file", "metric", metric, "labels", labels)
		return nil
	}

	q50 := vec.Quantile(0.50)
	q90 := vec.Quantile(0.90)
	q95 := vec.Quantile(0.95)
	q99 := vec.Quantile(0.99)
	avg := vec.Average()

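	// The histograms are recorded in seconds, but the dashboard data is
	// reported in milliseconds.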
	msFactor := float64(time.Second) / float64(time.Millisecond)

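	// Copy the collector labels and add the metric name plus any label
	// filter values.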
	labelMap := map[string]string{"Metric": metric}
	for k, v := range labels {
		labelMap[k] = v
	}
	for k, v := range lvMap {
		labelMap[k] = v
	}
	return &DataItem{
		Labels: labelMap,
		Data: map[string]float64{
			"Perc50":  q50 * msFactor,
			"Perc90":  q90 * msFactor,
			"Perc95":  q95 * msFactor,
			"Perc99":  q99 * msFactor,
			"Average": avg * msFactor,
		},
		Unit: "ms",
	}
}
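// throughputCollector periodically samples the number of scheduled pods
// and records the scheduling throughput (pods/s) of each sample interval.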
type throughputCollector struct {
	podInformer           coreinformers.PodInformer
	schedulingThroughputs []float64
	labels                map[string]string
	namespaces            []string
	errorMargin           float64
}

func newThroughputCollector(tb ktesting.TB, podInformer coreinformers.PodInformer, labels map[string]string, namespaces []string, errorMargin float64) *throughputCollector {
	return &throughputCollector{
		podInformer: podInformer,
		labels:      labels,
		namespaces:  namespaces,
		errorMargin: errorMargin,
	}
}

func (tc *throughputCollector) run(tCtx ktesting.TContext) {
	podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
	if err != nil {
		klog.Fatalf("%v", err)
	}
	lastScheduledCount := len(podsScheduled)
	ticker := time.NewTicker(throughputSampleInterval)
	defer ticker.Stop()
	lastSampleTime := time.Now()
	started := false
	skipped := 0

	for {
		select {
		case <-tCtx.Done():
			return
		case <-ticker.C:
			now := time.Now()
			podsScheduled, _, err := getScheduledPods(tc.podInformer, tc.namespaces...)
			if err != nil {
				klog.Fatalf("%v", err)
			}

			scheduled := len(podsScheduled)

			if scheduled == 0 {
				continue
			}
			if !started {
				started = true
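				// Take the first sample with scheduled pods as the
				// baseline and start measuring from here.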
				lastScheduledCount = scheduled
				lastSampleTime = now
				continue
			}

			newScheduled := scheduled - lastScheduledCount
			if newScheduled == 0 {
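				// Nothing happened in this interval. Don't record a
				// zero; merge the interval into the next sample that
				// makes progress.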
				skipped++
				continue
			}
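			// Ticker intervals aren't perfectly regular under load, so
			// compute the throughput from the actually elapsed time and
			// check how far it deviates from the expected interval.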
			duration := now.Sub(lastSampleTime)
			durationInSeconds := duration.Seconds()
			throughput := float64(newScheduled) / durationInSeconds
			expectedDuration := throughputSampleInterval * time.Duration(skipped+1)
			errorMargin := (duration - expectedDuration).Seconds() / expectedDuration.Seconds() * 100
			if tc.errorMargin > 0 && math.Abs(errorMargin) > tc.errorMargin {
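				// Such deviations can distort the measured throughput,
				// so surface them.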
				tCtx.Errorf("ERROR: Expected throughput collector to sample at regular time intervals. The %d most recent intervals took %s instead of %s, a difference of %0.1f%%.", skipped+1, duration, expectedDuration, errorMargin)
			}
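			// Attribute the same average throughput to every interval
			// that was merged into this sample.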
			for i := 0; i <= skipped; i++ {
				tc.schedulingThroughputs = append(tc.schedulingThroughputs, throughput)
			}
			lastScheduledCount = scheduled
			klog.Infof("%d pods scheduled", lastScheduledCount)
			skipped = 0
			lastSampleTime = now
		}
	}
}
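// collect summarizes the sampled throughputs as average and percentiles,
// in pods/s.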
func (tc *throughputCollector) collect() []DataItem {
	throughputSummary := DataItem{Labels: tc.labels}
	if length := len(tc.schedulingThroughputs); length > 0 {
		sort.Float64s(tc.schedulingThroughputs)
		sum := 0.0
		for i := range tc.schedulingThroughputs {
			sum += tc.schedulingThroughputs[i]
		}

		throughputSummary.Labels["Metric"] = "SchedulingThroughput"
		throughputSummary.Data = map[string]float64{
			"Average": sum / float64(length),
			"Perc50":  tc.schedulingThroughputs[int(math.Ceil(float64(length*50)/100))-1],
			"Perc90":  tc.schedulingThroughputs[int(math.Ceil(float64(length*90)/100))-1],
			"Perc95":  tc.schedulingThroughputs[int(math.Ceil(float64(length*95)/100))-1],
			"Perc99":  tc.schedulingThroughputs[int(math.Ceil(float64(length*99)/100))-1],
		}
		throughputSummary.Unit = "pods/s"
	}

	return []DataItem{throughputSummary}
}