package kubelet

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"strings"
	"sync"
	"text/tabwriter"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	utilerrors "k8s.io/apimachinery/pkg/util/errors"
	"k8s.io/apimachinery/pkg/util/wait"

	clientset "k8s.io/client-go/kubernetes"
	restclient "k8s.io/client-go/rest"
	kubeletstatsv1alpha1 "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
	"k8s.io/kubernetes/test/e2e/framework"
	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
)

const (
	// proxyTimeout is the timeout for proxy requests to the node.
	proxyTimeout = 2 * time.Minute

	// dockerOperationsKey is the key of the docker operations metric.
	dockerOperationsKey = "docker_operations_total"

	// dockerOperationsErrorsKey is the key of the docker operation errors metric.
	dockerOperationsErrorsKey = "docker_operations_errors_total"

	// dockerOperationsTimeoutKey is the key of the docker operation timeouts metric.
	dockerOperationsTimeoutKey = "docker_operations_timeout_total"
)

// ContainerResourceUsage is a structure for gathering container resource usage.
type ContainerResourceUsage struct {
	Name                    string
	Timestamp               time.Time
	CPUUsageInCores         float64
	MemoryUsageInBytes      uint64
	MemoryWorkingSetInBytes uint64
	MemoryRSSInBytes        uint64
	// The interval used to calculate CPUUsageInCores.
	CPUInterval time.Duration
}

// ResourceUsagePerContainer is a map of ContainerResourceUsage, keyed by container name.
type ResourceUsagePerContainer map[string]*ContainerResourceUsage

// ResourceUsagePerNode is a map of ResourceUsagePerContainer, keyed by node name.
type ResourceUsagePerNode map[string]ResourceUsagePerContainer

// ContainersCPUSummary is indexed by the container name with each entry a
// (percentile, value) map.
type ContainersCPUSummary map[string]map[float64]float64

// NodesCPUSummary is indexed by the node name with each entry a
// ContainersCPUSummary map.
type NodesCPUSummary map[string]ContainersCPUSummary

// RuntimeOperationMonitor is the tool for getting and parsing docker operation
// metrics from the kubelets in the cluster.
type RuntimeOperationMonitor struct {
	client          clientset.Interface
	nodesRuntimeOps map[string]NodeRuntimeOperationErrorRate
}

// NodeRuntimeOperationErrorRate is the runtime operation error rate on one node,
// keyed by operation type.
type NodeRuntimeOperationErrorRate map[string]*RuntimeOperationErrorRate

// RuntimeOperationErrorRate is the error rate of a specified runtime operation.
type RuntimeOperationErrorRate struct {
	TotalNumber float64
	ErrorRate   float64
	TimeoutRate float64
}

// ProxyRequest performs a GET on a node proxy endpoint, given the node name,
// the endpoint suffix, and the port.
func ProxyRequest(ctx context.Context, c clientset.Interface, node, endpoint string, port int) (restclient.Result, error) {
	// The proxy tends to hang when a node is not ready, so bound the call with
	// an artificial timeout.
	var result restclient.Result
	finished := make(chan struct{}, 1)
	go func() {
		result = c.CoreV1().RESTClient().Get().
			Resource("nodes").
			SubResource("proxy").
			Name(fmt.Sprintf("%v:%v", node, port)).
			Suffix(endpoint).
			Do(ctx)

		finished <- struct{}{}
	}()
	select {
	case <-finished:
		return result, nil
	case <-time.After(proxyTimeout):
		return restclient.Result{}, fmt.Errorf("proxy request to node %q endpoint %q timed out after %v", node, endpoint, proxyTimeout)
	}
}
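
// Example use (illustrative sketch; assumes a configured clientset c and a node
// named "node-1" in the cluster):
//
//	result, err := ProxyRequest(ctx, c, "node-1", "stats/summary", framework.KubeletPort)
//	if err != nil {
//		framework.Logf("proxy request failed: %v", err)
//	} else if raw, rawErr := result.Raw(); rawErr == nil {
//		framework.Logf("stats/summary payload: %d bytes", len(raw))
//	}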

// NewRuntimeOperationMonitor returns a new RuntimeOperationMonitor. It lists the
// nodes in the cluster and records an initial snapshot of their runtime
// operation error rates.
func NewRuntimeOperationMonitor(ctx context.Context, c clientset.Interface) *RuntimeOperationMonitor {
	m := &RuntimeOperationMonitor{
		client:          c,
		nodesRuntimeOps: make(map[string]NodeRuntimeOperationErrorRate),
	}
	nodes, err := m.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		framework.Failf("RuntimeOperationMonitor: unable to get list of nodes: %v", err)
	}
	for _, node := range nodes.Items {
		m.nodesRuntimeOps[node.Name] = make(NodeRuntimeOperationErrorRate)
	}
	// Initialize the runtime operation error rates.
	m.GetRuntimeOperationErrorRate(ctx)
	return m
}

// GetRuntimeOperationErrorRate gets runtime operation records from kubelet metrics
// and calculates the error rates of all runtime operations since the kubelet started.
func (m *RuntimeOperationMonitor) GetRuntimeOperationErrorRate(ctx context.Context) map[string]NodeRuntimeOperationErrorRate {
	for node := range m.nodesRuntimeOps {
		nodeResult, err := getNodeRuntimeOperationErrorRate(ctx, m.client, node)
		if err != nil {
			framework.Logf("GetRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
			continue
		}
		m.nodesRuntimeOps[node] = nodeResult
	}
	return m.nodesRuntimeOps
}

// GetLatestRuntimeOperationErrorRate gets the error and timeout rates for the
// interval since the last observation, and updates the stored snapshot.
func (m *RuntimeOperationMonitor) GetLatestRuntimeOperationErrorRate(ctx context.Context) map[string]NodeRuntimeOperationErrorRate {
	result := make(map[string]NodeRuntimeOperationErrorRate)
	for node := range m.nodesRuntimeOps {
		result[node] = make(NodeRuntimeOperationErrorRate)
		oldNodeResult := m.nodesRuntimeOps[node]
		curNodeResult, err := getNodeRuntimeOperationErrorRate(ctx, m.client, node)
		if err != nil {
			framework.Logf("GetLatestRuntimeOperationErrorRate: unable to get kubelet metrics from node %q: %v", node, err)
			continue
		}
		for op, cur := range curNodeResult {
			t := *cur
			if old, found := oldNodeResult[op]; found {
				// Convert the cumulative rates into rates for the interval by
				// subtracting the previously observed counts.
				t.ErrorRate = (t.ErrorRate*t.TotalNumber - old.ErrorRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
				t.TimeoutRate = (t.TimeoutRate*t.TotalNumber - old.TimeoutRate*old.TotalNumber) / (t.TotalNumber - old.TotalNumber)
				t.TotalNumber -= old.TotalNumber
			}
			result[node][op] = &t
		}
		m.nodesRuntimeOps[node] = curNodeResult
	}
	return result
}
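
// For example (illustrative numbers): if the previous snapshot for an operation
// recorded TotalNumber=100 with ErrorRate=0.10 (10 errors) and the current
// snapshot records TotalNumber=150 with ErrorRate=0.12 (18 errors), the rate
// reported for the interval is (0.12*150 - 0.10*100) / (150 - 100) = 8/50 = 0.16,
// over TotalNumber = 50 new operations.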

// FormatRuntimeOperationErrorRate formats the runtime operation error rates of
// all nodes into a human-readable string.
func FormatRuntimeOperationErrorRate(nodesResult map[string]NodeRuntimeOperationErrorRate) string {
	lines := []string{}
	for node, nodeResult := range nodesResult {
		lines = append(lines, fmt.Sprintf("node %q runtime operation error rate:", node))
		for op, result := range nodeResult {
			line := fmt.Sprintf("operation %q: total - %.0f; error rate - %f; timeout rate - %f", op,
				result.TotalNumber, result.ErrorRate, result.TimeoutRate)
			lines = append(lines, line)
		}
		lines = append(lines, fmt.Sprintln())
	}
	return strings.Join(lines, "\n")
}

// getNodeRuntimeOperationErrorRate gets runtime operation error rates from the
// specified node's kubelet metrics.
func getNodeRuntimeOperationErrorRate(ctx context.Context, c clientset.Interface, node string) (NodeRuntimeOperationErrorRate, error) {
	result := make(NodeRuntimeOperationErrorRate)
	ms, err := e2emetrics.GetKubeletMetrics(ctx, c, node)
	if err != nil {
		return result, err
	}

	// If no corresponding metrics are found, the returned map will be empty.
	allOps := ms[dockerOperationsKey]
	errOps := ms[dockerOperationsErrorsKey]
	timeoutOps := ms[dockerOperationsTimeoutKey]
	for _, sample := range allOps {
		operation := string(sample.Metric["operation_type"])
		result[operation] = &RuntimeOperationErrorRate{TotalNumber: float64(sample.Value)}
	}
	for _, sample := range errOps {
		operation := string(sample.Metric["operation_type"])
		// Only compute rates for operations that have a total count.
		if _, found := result[operation]; found {
			result[operation].ErrorRate = float64(sample.Value) / result[operation].TotalNumber
		}
	}
	for _, sample := range timeoutOps {
		operation := string(sample.Metric["operation_type"])
		if _, found := result[operation]; found {
			result[operation].TimeoutRate = float64(sample.Value) / result[operation].TotalNumber
		}
	}
	return result, nil
}

// GetStatsSummary contacts the kubelet of the given node for its stats summary.
func GetStatsSummary(ctx context.Context, c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
	ctx, cancel := context.WithTimeout(ctx, framework.SingleCallTimeout)
	defer cancel()

	data, err := c.CoreV1().RESTClient().Get().
		Resource("nodes").
		SubResource("proxy").
		Name(fmt.Sprintf("%v:%v", nodeName, framework.KubeletPort)).
		Suffix("stats/summary").
		Do(ctx).Raw()

	if err != nil {
		return nil, err
	}

	summary := kubeletstatsv1alpha1.Summary{}
	err = json.Unmarshal(data, &summary)
	if err != nil {
		return nil, err
	}
	return &summary, nil
}
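
// Example use (illustrative sketch; assumes a configured clientset c and a node
// named "node-1"):
//
//	summary, err := GetStatsSummary(ctx, c, "node-1")
//	if err == nil && summary.Node.Memory != nil && summary.Node.Memory.WorkingSetBytes != nil {
//		framework.Logf("node working set: %d bytes", *summary.Node.Memory.WorkingSetBytes)
//	}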

// getNodeStatsSummary contacts the kubelet of the given node for its stats
// summary, requesting a JSON response.
func getNodeStatsSummary(ctx context.Context, c clientset.Interface, nodeName string) (*kubeletstatsv1alpha1.Summary, error) {
	data, err := c.CoreV1().RESTClient().Get().
		Resource("nodes").
		SubResource("proxy").
		Name(fmt.Sprintf("%v:%v", nodeName, framework.KubeletPort)).
		Suffix("stats/summary").
		SetHeader("Content-Type", "application/json").
		Do(ctx).Raw()

	if err != nil {
		return nil, err
	}

	var summary *kubeletstatsv1alpha1.Summary
	err = json.Unmarshal(data, &summary)
	if err != nil {
		return nil, err
	}
	return summary, nil
}

// getSystemContainerStats returns the stats of the system containers reported in
// the summary, keyed by container name.
func getSystemContainerStats(summary *kubeletstatsv1alpha1.Summary) map[string]*kubeletstatsv1alpha1.ContainerStats {
	statsList := summary.Node.SystemContainers
	statsMap := make(map[string]*kubeletstatsv1alpha1.ContainerStats)
	for i := range statsList {
		statsMap[statsList[i].Name] = &statsList[i]
	}

	// Create a root "container" entry from the node-level stats, since the node
	// root is reported separately from the system containers.
	statsMap[rootContainerName] = &kubeletstatsv1alpha1.ContainerStats{
		CPU:    summary.Node.CPU,
		Memory: summary.Node.Memory,
	}
	return statsMap
}

const (
	rootContainerName = "/"
)

// TargetContainers returns a list of containers for which we want to collect
// resource usage.
func TargetContainers() []string {
	return []string{
		rootContainerName,
		kubeletstatsv1alpha1.SystemContainerRuntime,
		kubeletstatsv1alpha1.SystemContainerKubelet,
	}
}

// formatResourceUsageStats formats the resource usage of the given containers on
// one node as an aligned table with the columns container, cpu(cores),
// memory_working_set(MB) and memory_rss(MB).
func formatResourceUsageStats(nodeName string, containerStats ResourceUsagePerContainer) string {
	buf := &bytes.Buffer{}
	w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
	fmt.Fprintf(w, "container\tcpu(cores)\tmemory_working_set(MB)\tmemory_rss(MB)\n")
	for name, s := range containerStats {
		fmt.Fprintf(w, "%q\t%.3f\t%.2f\t%.2f\n", name, s.CPUUsageInCores, float64(s.MemoryWorkingSetInBytes)/(1024*1024), float64(s.MemoryRSSInBytes)/(1024*1024))
	}
	w.Flush()
	return fmt.Sprintf("Resource usage on node %q:\n%s", nodeName, buf.String())
}

// GetKubeletHeapStats returns the tail of the kubelet's heap profile summary
// from its debug/pprof/heap endpoint.
func GetKubeletHeapStats(ctx context.Context, c clientset.Interface, nodeName string) (string, error) {
	client, err := ProxyRequest(ctx, c, nodeName, "debug/pprof/heap", framework.KubeletPort)
	if err != nil {
		return "", err
	}
	raw, errRaw := client.Raw()
	if errRaw != nil {
		return "", errRaw
	}
	stats := string(raw)
	// Only dump the last few lines (the runtime.MemStats numbers) to avoid
	// polluting the log.
	numLines := 23
	lines := strings.Split(stats, "\n")
	if len(lines) < numLines {
		return stats, nil
	}
	return strings.Join(lines[len(lines)-numLines:], "\n"), nil
}

// computeContainerResourceUsage computes the resource usage between two stats
// samples taken from the same container.
func computeContainerResourceUsage(name string, oldStats, newStats *kubeletstatsv1alpha1.ContainerStats) *ContainerResourceUsage {
	return &ContainerResourceUsage{
		Name:                    name,
		Timestamp:               newStats.CPU.Time.Time,
		CPUUsageInCores:         float64(*newStats.CPU.UsageCoreNanoSeconds-*oldStats.CPU.UsageCoreNanoSeconds) / float64(newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time).Nanoseconds()),
		MemoryUsageInBytes:      *newStats.Memory.UsageBytes,
		MemoryWorkingSetInBytes: *newStats.Memory.WorkingSetBytes,
		MemoryRSSInBytes:        *newStats.Memory.RSSBytes,
		CPUInterval:             newStats.CPU.Time.Time.Sub(oldStats.CPU.Time.Time),
	}
}
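
// For example (illustrative numbers): if UsageCoreNanoSeconds grows from
// 2,000,000,000 to 3,500,000,000 between two samples taken 10s apart
// (10,000,000,000ns), then CPUUsageInCores = 1,500,000,000 / 10,000,000,000 =
// 0.15 cores over that interval.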

// resourceCollector periodically polls one node via the kubelet stats summary
// API, computes resource usage for a given list of containers, and buffers the
// results.
type resourceCollector struct {
	lock            sync.RWMutex
	node            string
	containers      []string
	client          clientset.Interface
	buffers         map[string][]*ContainerResourceUsage
	pollingInterval time.Duration
	stop            func()
}

func newResourceCollector(c clientset.Interface, nodeName string, containerNames []string, pollingInterval time.Duration) *resourceCollector {
	buffers := make(map[string][]*ContainerResourceUsage)
	return &resourceCollector{
		node:            nodeName,
		containers:      containerNames,
		client:          c,
		buffers:         buffers,
		pollingInterval: pollingInterval,
	}
}

// Start starts a goroutine that polls the node and collects stats at the
// configured interval until Stop is called or the context is cancelled.
func (r *resourceCollector) Start(ctx context.Context) {
	ctx, cancel := context.WithCancel(ctx)
	r.stop = cancel
	// oldStats is shared across polls so that each sample can be diffed against
	// the previous one.
	oldStats := make(map[string]*kubeletstatsv1alpha1.ContainerStats)
	go wait.UntilWithContext(ctx, func(ctx context.Context) { r.collectStats(ctx, oldStats) }, r.pollingInterval)
}

// Stop stops the polling goroutine.
func (r *resourceCollector) Stop() {
	r.stop()
}

// collectStats gets the latest stats from the kubelet stats summary API, computes
// the resource usage since the previous sample, and pushes it to the buffer.
func (r *resourceCollector) collectStats(ctx context.Context, oldStatsMap map[string]*kubeletstatsv1alpha1.ContainerStats) {
	summary, err := getNodeStatsSummary(ctx, r.client, r.node)
	if err != nil {
		framework.Logf("Error getting node stats summary on %q, err: %v", r.node, err)
		return
	}
	cStatsMap := getSystemContainerStats(summary)
	r.lock.Lock()
	defer r.lock.Unlock()
	for _, name := range r.containers {
		cStats, ok := cStatsMap[name]
		if !ok {
			framework.Logf("Missing info/stats for container %q on node %q", name, r.node)
			return
		}

		if oldStats, ok := oldStatsMap[name]; ok {
			if oldStats.CPU == nil || cStats.CPU == nil || oldStats.Memory == nil || cStats.Memory == nil {
				continue
			}
			if oldStats.CPU.Time.Equal(&cStats.CPU.Time) {
				// No new sample since the last poll; skip to avoid computing
				// usage over a zero-length interval.
				continue
			}
			r.buffers[name] = append(r.buffers[name], computeContainerResourceUsage(name, oldStats, cStats))
		}

		oldStatsMap[name] = cStats
	}
}

// GetLatest returns the latest resource usage of each monitored container, or an
// error if any container has no samples yet.
func (r *resourceCollector) GetLatest() (ResourceUsagePerContainer, error) {
	r.lock.RLock()
	defer r.lock.RUnlock()
	stats := make(ResourceUsagePerContainer)
	for _, name := range r.containers {
		contStats, ok := r.buffers[name]
		if !ok || len(contStats) == 0 {
			return nil, fmt.Errorf("resource usage on node %q is not ready yet", r.node)
		}
		stats[name] = contStats[len(contStats)-1]
	}
	return stats, nil
}

// Reset clears the buffered samples of all monitored containers.
func (r *resourceCollector) Reset() {
	r.lock.Lock()
	defer r.lock.Unlock()
	for _, name := range r.containers {
		r.buffers[name] = []*ContainerResourceUsage{}
	}
}

type resourceUsageByCPU []*ContainerResourceUsage

func (r resourceUsageByCPU) Len() int           { return len(r) }
func (r resourceUsageByCPU) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }
func (r resourceUsageByCPU) Less(i, j int) bool { return r[i].CPUUsageInCores < r[j].CPUUsageInCores }

// percentiles specifies the percentiles to report for CPU usage.
var percentiles = [...]float64{0.05, 0.20, 0.50, 0.70, 0.90, 0.95, 0.99}

// GetBasicCPUStats returns the percentiles of the CPU usage in cores for
// containerName, computed over all samples currently in the buffer.
func (r *resourceCollector) GetBasicCPUStats(containerName string) map[float64]float64 {
	r.lock.RLock()
	defer r.lock.RUnlock()
	result := make(map[float64]float64, len(percentiles))
	// Sort a copy so that the time-ordered buffer is left untouched.
	usages := make([]*ContainerResourceUsage, len(r.buffers[containerName]))
	copy(usages, r.buffers[containerName])
	sort.Sort(resourceUsageByCPU(usages))
	for _, q := range percentiles {
		index := int(float64(len(usages))*q) - 1
		if index < 0 {
			// Not enough data to compute this percentile.
			result[q] = 0
			continue
		}
		result[q] = usages[index].CPUUsageInCores
	}
	return result
}
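
// For example (illustrative numbers): with 10 buffered samples, the 0.50
// percentile reads index int(10*0.50)-1 = 4 (the 5th smallest sample) and the
// 0.95 percentile reads index int(10*0.95)-1 = 8; when there are too few samples
// for a percentile (index < 0), the value is reported as 0.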

// ResourceMonitor manages a resourceCollector per node and aggregates their
// results.
type ResourceMonitor struct {
	client          clientset.Interface
	containers      []string
	pollingInterval time.Duration
	collectors      map[string]*resourceCollector
}

// NewResourceMonitor returns a new ResourceMonitor that monitors the given
// containers on every node at the given polling interval.
func NewResourceMonitor(c clientset.Interface, containerNames []string, pollingInterval time.Duration) *ResourceMonitor {
	return &ResourceMonitor{
		containers:      containerNames,
		client:          c,
		pollingInterval: pollingInterval,
	}
}
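
// Example use (illustrative sketch; assumes a configured clientset c and a test
// workload run between Start and LogLatest):
//
//	rm := NewResourceMonitor(c, TargetContainers(), 10*time.Second)
//	rm.Start(ctx)
//	defer rm.Stop()
//	// ... run the workload under test ...
//	rm.LogLatest()
//	rm.LogCPUSummary()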

// Start starts a resourceCollector for every node in the cluster.
func (r *ResourceMonitor) Start(ctx context.Context) {
	// Monitor every node in the cluster, including unschedulable ones.
	nodes, err := r.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		framework.Failf("ResourceMonitor: unable to get list of nodes: %v", err)
	}
	r.collectors = make(map[string]*resourceCollector)
	for _, node := range nodes.Items {
		collector := newResourceCollector(r.client, node.Name, r.containers, r.pollingInterval)
		r.collectors[node.Name] = collector
		collector.Start(ctx)
	}
}

// Stop stops all collectors.
func (r *ResourceMonitor) Stop() {
	for _, collector := range r.collectors {
		collector.Stop()
	}
}

// Reset resets the buffered samples of all collectors.
func (r *ResourceMonitor) Reset() {
	for _, collector := range r.collectors {
		collector.Reset()
	}
}

// LogLatest logs the latest resource usage of all nodes.
func (r *ResourceMonitor) LogLatest() {
	summary, err := r.GetLatest()
	if err != nil {
		framework.Logf("%v", err)
	}
	framework.Logf("%s", r.FormatResourceUsage(summary))
}

// FormatResourceUsage formats the resource usage of all nodes into a
// human-readable string.
func (r *ResourceMonitor) FormatResourceUsage(s ResourceUsagePerNode) string {
	summary := []string{}
	for node, usage := range s {
		summary = append(summary, formatResourceUsageStats(node, usage))
	}
	return strings.Join(summary, "\n")
}

// GetLatest returns the latest resource usage per node, aggregating errors from
// collectors that are not ready yet.
func (r *ResourceMonitor) GetLatest() (ResourceUsagePerNode, error) {
	result := make(ResourceUsagePerNode)
	errs := []error{}
	for key, collector := range r.collectors {
		s, err := collector.GetLatest()
		if err != nil {
			errs = append(errs, err)
			continue
		}
		result[key] = s
	}
	return result, utilerrors.NewAggregate(errs)
}

// GetMasterNodeLatest returns the latest resource usage of the master node and
// the average resource usage of the remaining nodes.
func (r *ResourceMonitor) GetMasterNodeLatest(usagePerNode ResourceUsagePerNode) ResourceUsagePerNode {
	result := make(ResourceUsagePerNode)
	var masterUsage ResourceUsagePerContainer
	var nodesUsage []ResourceUsagePerContainer
	for node, usage := range usagePerNode {
		if strings.HasSuffix(node, "master") {
			masterUsage = usage
		} else {
			nodesUsage = append(nodesUsage, usage)
		}
	}
	nodeAvgUsage := make(ResourceUsagePerContainer)
	for _, nodeUsage := range nodesUsage {
		for c, usage := range nodeUsage {
			if _, found := nodeAvgUsage[c]; !found {
				nodeAvgUsage[c] = &ContainerResourceUsage{Name: usage.Name}
			}
			nodeAvgUsage[c].CPUUsageInCores += usage.CPUUsageInCores
			nodeAvgUsage[c].MemoryUsageInBytes += usage.MemoryUsageInBytes
			nodeAvgUsage[c].MemoryWorkingSetInBytes += usage.MemoryWorkingSetInBytes
			nodeAvgUsage[c].MemoryRSSInBytes += usage.MemoryRSSInBytes
		}
	}
	for c := range nodeAvgUsage {
		nodeAvgUsage[c].CPUUsageInCores /= float64(len(nodesUsage))
		nodeAvgUsage[c].MemoryUsageInBytes /= uint64(len(nodesUsage))
		nodeAvgUsage[c].MemoryWorkingSetInBytes /= uint64(len(nodesUsage))
		nodeAvgUsage[c].MemoryRSSInBytes /= uint64(len(nodesUsage))
	}
	result["master"] = masterUsage
	result["node"] = nodeAvgUsage
	return result
}
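
// For example (illustrative numbers): with two non-master nodes whose kubelet
// containers report 0.10 and 0.30 CPU cores, the returned "node" entry reports
// the average 0.20 cores, while the "master" entry carries the master node's
// usage unchanged.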

// FormatCPUSummary formats the CPU usage summary of all nodes as tables, one per
// node, with a column per percentile and a row per target container.
func (r *ResourceMonitor) FormatCPUSummary(summary NodesCPUSummary) string {
	var summaryStrings []string
	var header []string
	header = append(header, "container")
	for _, p := range percentiles {
		header = append(header, fmt.Sprintf("%.0fth%%", p*100))
	}
	for nodeName, containers := range summary {
		buf := &bytes.Buffer{}
		w := tabwriter.NewWriter(buf, 1, 0, 1, ' ', 0)
		fmt.Fprintf(w, "%s\n", strings.Join(header, "\t"))
		for _, containerName := range TargetContainers() {
			var s []string
			s = append(s, fmt.Sprintf("%q", containerName))
			data, ok := containers[containerName]
			for _, p := range percentiles {
				value := "N/A"
				if ok {
					value = fmt.Sprintf("%.3f", data[p])
				}
				s = append(s, value)
			}
			fmt.Fprintf(w, "%s\n", strings.Join(s, "\t"))
		}
		w.Flush()
		summaryStrings = append(summaryStrings, fmt.Sprintf("CPU usage of containers on node %q:\n%s", nodeName, buf.String()))
	}
	return strings.Join(summaryStrings, "\n")
}

// LogCPUSummary logs the CPU usage summary of all nodes.
func (r *ResourceMonitor) LogCPUSummary() {
	summary := r.GetCPUSummary()
	framework.Logf("%s", r.FormatCPUSummary(summary))
}

// GetCPUSummary returns the CPU usage percentiles of the target containers on
// every node.
func (r *ResourceMonitor) GetCPUSummary() NodesCPUSummary {
	result := make(NodesCPUSummary)
	for nodeName, collector := range r.collectors {
		result[nodeName] = make(ContainersCPUSummary)
		for _, containerName := range TargetContainers() {
			data := collector.GetBasicCPUStats(containerName)
			result[nodeName][containerName] = data
		}
	}
	return result
}

// GetMasterNodeCPUSummary returns the CPU summary of the master node and the
// per-percentile average of the remaining nodes.
func (r *ResourceMonitor) GetMasterNodeCPUSummary(summaryPerNode NodesCPUSummary) NodesCPUSummary {
	result := make(NodesCPUSummary)
	var masterSummary ContainersCPUSummary
	var nodesSummaries []ContainersCPUSummary
	for node, summary := range summaryPerNode {
		if strings.HasSuffix(node, "master") {
			masterSummary = summary
		} else {
			nodesSummaries = append(nodesSummaries, summary)
		}
	}

	nodeAvgSummary := make(ContainersCPUSummary)
	for _, nodeSummary := range nodesSummaries {
		for c, summary := range nodeSummary {
			if _, found := nodeAvgSummary[c]; !found {
				nodeAvgSummary[c] = map[float64]float64{}
			}
			for perc, value := range summary {
				nodeAvgSummary[c][perc] += value
			}
		}
	}
	for c := range nodeAvgSummary {
		for perc := range nodeAvgSummary[c] {
			nodeAvgSummary[c][perc] /= float64(len(nodesSummaries))
		}
	}
	result["master"] = masterSummary
	result["node"] = nodeAvgSummary
	return result
}