1
16
17 package stats
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "path/filepath"
24 "sort"
25 "strings"
26 "sync"
27 "time"
28
29 cadvisormemory "github.com/google/cadvisor/cache/memory"
30 cadvisorfs "github.com/google/cadvisor/fs"
31 cadvisorapiv2 "github.com/google/cadvisor/info/v2"
32 "google.golang.org/grpc/codes"
33 "google.golang.org/grpc/status"
34 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
35 "k8s.io/apimachinery/pkg/types"
36 internalapi "k8s.io/cri-api/pkg/apis"
37 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
38 "k8s.io/klog/v2"
39 statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
40 kubetypes "k8s.io/kubelet/pkg/types"
41 "k8s.io/kubernetes/pkg/kubelet/cadvisor"
42 "k8s.io/kubernetes/pkg/kubelet/server/stats"
43 "k8s.io/utils/clock"
44 )
45
var (
	// defaultCachePeriod is the maximum age of a cached CPU usage sample;
	// cleanupOutdatedCaches evicts cpuUsageCache entries whose sample
	// timestamp is older than this.
	defaultCachePeriod = 10 * time.Minute
)
50
51
// cpuUsageRecord is one entry of criStatsProvider.cpuUsageCache: the CPU
// sample last stored for a container plus the usage rate derived from it.
type cpuUsageRecord struct {
	// stats is the CRI CPU usage sample that was stored in the cache.
	stats *runtimeapi.CpuUsage
	// usageNanoCores is the rate computed when stats was stored. It is nil
	// when no rate could be computed (no earlier sample, or the cumulative
	// counter went backwards).
	usageNanoCores *uint64
}
56
57
58
// criStatsProvider is the containerStatsProvider returned by
// newCRIStatsProvider. It builds pod and container stats from the CRI runtime
// and image services, complemented by cAdvisor data.
type criStatsProvider struct {
	// cadvisor supplies the root filesystem info, per-directory filesystem
	// info and the cAdvisor container infos that are merged into the CRI
	// stats.
	cadvisor cadvisor.Interface
	// resourceAnalyzer is handed to makePodStorageStats when pod storage
	// stats are assembled.
	resourceAnalyzer stats.ResourceAnalyzer
	// runtimeService lists containers, pod sandboxes and their stats through
	// CRI.
	runtimeService internalapi.RuntimeService
	// imageService reports the image filesystem usage (ImageFsInfo).
	imageService internalapi.ImageManagerService
	// hostStatsProvider supplies container log stats and is handed to
	// makePodStorageStats.
	hostStatsProvider HostStatsProvider
	// windowsNetworkStatsProvider is not referenced in this part of the file.
	// NOTE(review): presumably used by the Windows-specific network stats
	// code — confirm.
	windowsNetworkStatsProvider interface{}
	// clock is initialised to clock.RealClock{} by newCRIStatsProvider.
	clock clock.Clock

	// cpuUsageCache holds the latest CPU usage record per container, keyed by
	// container ID.
	cpuUsageCache map[string]*cpuUsageRecord
	// mutex guards cpuUsageCache.
	mutex sync.RWMutex
	// podAndContainerStatsFromCRI makes the provider try ListPodSandboxStats
	// first and only fall back to the cAdvisor-assisted path when the runtime
	// answers Unimplemented.
	podAndContainerStatsFromCRI bool
}
83
84
85
86 func newCRIStatsProvider(
87 cadvisor cadvisor.Interface,
88 resourceAnalyzer stats.ResourceAnalyzer,
89 runtimeService internalapi.RuntimeService,
90 imageService internalapi.ImageManagerService,
91 hostStatsProvider HostStatsProvider,
92 podAndContainerStatsFromCRI bool,
93 ) containerStatsProvider {
94 return &criStatsProvider{
95 cadvisor: cadvisor,
96 resourceAnalyzer: resourceAnalyzer,
97 runtimeService: runtimeService,
98 imageService: imageService,
99 hostStatsProvider: hostStatsProvider,
100 cpuUsageCache: make(map[string]*cpuUsageRecord),
101 podAndContainerStatsFromCRI: podAndContainerStatsFromCRI,
102 clock: clock.RealClock{},
103 }
104 }
105
106
107 func (p *criStatsProvider) ListPodStats(ctx context.Context) ([]statsapi.PodStats, error) {
108
109 return p.listPodStats(ctx, false)
110 }
111
112
113
114
115
116
117
118
119
120
121
122 func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage(ctx context.Context) ([]statsapi.PodStats, error) {
123
124 return p.listPodStats(ctx, true)
125 }
126
127 func (p *criStatsProvider) listPodStats(ctx context.Context, updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
128
129
130 rootFsInfo, err := p.cadvisor.RootFsInfo()
131 if err != nil {
132 return nil, fmt.Errorf("failed to get rootFs info: %v", err)
133 }
134
135 containerMap, podSandboxMap, err := p.getPodAndContainerMaps(ctx)
136 if err != nil {
137 return nil, fmt.Errorf("failed to get pod or container map: %v", err)
138 }
139
140 if p.podAndContainerStatsFromCRI {
141 result, err := p.listPodStatsStrictlyFromCRI(ctx, updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
142 if err == nil {
143
144 return result, nil
145 }
146 s, ok := status.FromError(err)
147
148 if !ok || s.Code() != codes.Unimplemented {
149 return nil, err
150 }
151
152 klog.V(5).ErrorS(err,
153 "CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
154 )
155 }
156 return p.listPodStatsPartiallyFromCRI(ctx, updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
157 }
158
159 func (p *criStatsProvider) listPodStatsPartiallyFromCRI(ctx context.Context, updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
160
161
162
163 fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
164
165
166 sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
167
168 resp, err := p.runtimeService.ListContainerStats(ctx, &runtimeapi.ContainerStatsFilter{})
169 if err != nil {
170 return nil, fmt.Errorf("failed to list all container stats: %v", err)
171 }
172 allInfos, err := getCadvisorContainerInfo(p.cadvisor)
173 if err != nil {
174 return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
175 }
176 caInfos, allInfos := getCRICadvisorStats(allInfos)
177
178
179
180 containerNetworkStats, err := p.listContainerNetworkStats()
181 if err != nil {
182 return nil, fmt.Errorf("failed to list container network stats: %v", err)
183 }
184
185 for _, stats := range resp {
186 containerID := stats.Attributes.Id
187 container, found := containerMap[containerID]
188 if !found {
189 continue
190 }
191
192 podSandboxID := container.PodSandboxId
193 podSandbox, found := podSandboxMap[podSandboxID]
194 if !found {
195 continue
196 }
197
198
199
200 ps, found := sandboxIDToPodStats[podSandboxID]
201 if !found {
202 ps = buildPodStats(podSandbox)
203 sandboxIDToPodStats[podSandboxID] = ps
204 }
205
206
207 cs, err := p.makeContainerStats(stats, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
208 if err != nil {
209 return nil, fmt.Errorf("make container stats: %w", err)
210 }
211 p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
212 p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
213 p.addSwapStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
214 p.addProcessStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
215
216
217
218 caStats, caFound := caInfos[containerID]
219 if !caFound {
220 klog.V(5).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
221 } else {
222 p.addCadvisorContainerStats(cs, &caStats)
223 }
224 ps.Containers = append(ps.Containers, *cs)
225 }
226
227 p.cleanupOutdatedCaches()
228
229 result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
230 for _, s := range sandboxIDToPodStats {
231 makePodStorageStats(s, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
232 result = append(result, *s)
233 }
234 return result, nil
235 }
236
237 func (p *criStatsProvider) listPodStatsStrictlyFromCRI(ctx context.Context, updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
238 criSandboxStats, err := p.runtimeService.ListPodSandboxStats(ctx, &runtimeapi.PodSandboxStatsFilter{})
239 if err != nil {
240 return nil, err
241 }
242
243 fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
244 summarySandboxStats := make([]statsapi.PodStats, 0, len(podSandboxMap))
245 for _, criSandboxStat := range criSandboxStats {
246 if criSandboxStat == nil || criSandboxStat.Attributes == nil {
247 klog.V(5).InfoS("Unable to find CRI stats for sandbox")
248 continue
249 }
250 podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
251 if !found {
252 continue
253 }
254 ps := buildPodStats(podSandbox)
255 if err := p.addCRIPodContainerStats(criSandboxStat, ps, fsIDtoInfo, containerMap, podSandbox, rootFsInfo, updateCPUNanoCoreUsage); err != nil {
256 return nil, fmt.Errorf("add CRI pod container stats: %w", err)
257 }
258 addCRIPodNetworkStats(ps, criSandboxStat)
259 addCRIPodCPUStats(ps, criSandboxStat)
260 addCRIPodMemoryStats(ps, criSandboxStat)
261 addCRIPodProcessStats(ps, criSandboxStat)
262 makePodStorageStats(ps, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
263 summarySandboxStats = append(summarySandboxStats, *ps)
264 }
265 return summarySandboxStats, nil
266 }
267
268
269 func (p *criStatsProvider) ListPodCPUAndMemoryStats(ctx context.Context) ([]statsapi.PodStats, error) {
270
271 sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
272 containerMap, podSandboxMap, err := p.getPodAndContainerMaps(ctx)
273 if err != nil {
274 return nil, fmt.Errorf("failed to get pod or container map: %v", err)
275 }
276
277 result := make([]statsapi.PodStats, 0, len(podSandboxMap))
278 if p.podAndContainerStatsFromCRI {
279 criSandboxStats, err := p.runtimeService.ListPodSandboxStats(ctx, &runtimeapi.PodSandboxStatsFilter{})
280
281 if err == nil {
282 for _, criSandboxStat := range criSandboxStats {
283 podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
284 if !found {
285 continue
286 }
287 ps := buildPodStats(podSandbox)
288 addCRIPodCPUStats(ps, criSandboxStat)
289 addCRIPodMemoryStats(ps, criSandboxStat)
290 result = append(result, *ps)
291 }
292 return result, err
293 }
294
295 s, ok := status.FromError(err)
296
297 if !ok || s.Code() != codes.Unimplemented {
298 return nil, err
299 }
300
301 klog.ErrorS(err,
302 "CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
303 )
304 }
305
306 resp, err := p.runtimeService.ListContainerStats(ctx, &runtimeapi.ContainerStatsFilter{})
307 if err != nil {
308 return nil, fmt.Errorf("failed to list all container stats: %v", err)
309 }
310
311 allInfos, err := getCadvisorContainerInfo(p.cadvisor)
312 if err != nil {
313 return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
314 }
315 caInfos, allInfos := getCRICadvisorStats(allInfos)
316
317 for _, stats := range resp {
318 containerID := stats.Attributes.Id
319 container, found := containerMap[containerID]
320 if !found {
321 continue
322 }
323
324 podSandboxID := container.PodSandboxId
325 podSandbox, found := podSandboxMap[podSandboxID]
326 if !found {
327 continue
328 }
329
330
331
332 ps, found := sandboxIDToPodStats[podSandboxID]
333 if !found {
334 ps = buildPodStats(podSandbox)
335 sandboxIDToPodStats[podSandboxID] = ps
336 }
337
338
339 cs := p.makeContainerCPUAndMemoryStats(stats, container)
340 p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
341
342
343
344 caStats, caFound := caInfos[containerID]
345 if !caFound {
346 klog.V(4).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
347 } else {
348 p.addCadvisorContainerCPUAndMemoryStats(cs, &caStats)
349 }
350 ps.Containers = append(ps.Containers, *cs)
351 }
352
353 p.cleanupOutdatedCaches()
354
355 for _, s := range sandboxIDToPodStats {
356 result = append(result, *s)
357 }
358 return result, nil
359 }
360
361 func (p *criStatsProvider) getPodAndContainerMaps(ctx context.Context) (map[string]*runtimeapi.Container, map[string]*runtimeapi.PodSandbox, error) {
362 containers, err := p.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
363 if err != nil {
364 return nil, nil, fmt.Errorf("failed to list all containers: %v", err)
365 }
366
367
368 podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
369 podSandboxes, err := p.runtimeService.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{})
370 if err != nil {
371 return nil, nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
372 }
373 podSandboxes = removeTerminatedPods(podSandboxes)
374 for _, s := range podSandboxes {
375 podSandboxMap[s.Id] = s
376 }
377
378 containers = removeTerminatedContainers(containers)
379
380 containerMap := make(map[string]*runtimeapi.Container)
381 for _, c := range containers {
382 containerMap[c.Id] = c
383 }
384 return containerMap, podSandboxMap, nil
385 }
386
387
388 func (p *criStatsProvider) ImageFsStats(ctx context.Context) (imageFsRet *statsapi.FsStats, containerFsRet *statsapi.FsStats, errRet error) {
389 resp, err := p.imageService.ImageFsInfo(ctx)
390 if err != nil {
391 return nil, nil, err
392 }
393
394
395
396
397
398 if len(resp.GetImageFilesystems()) == 0 {
399 return nil, nil, fmt.Errorf("imageFs information is unavailable")
400 }
401 fs := resp.GetImageFilesystems()[0]
402 imageFsRet = &statsapi.FsStats{
403 Time: metav1.NewTime(time.Unix(0, fs.Timestamp)),
404 UsedBytes: &fs.UsedBytes.Value,
405 }
406 if fs.InodesUsed != nil {
407 imageFsRet.InodesUsed = &fs.InodesUsed.Value
408 }
409 imageFsInfo, err := p.getFsInfo(fs.GetFsId())
410 if err != nil {
411 return nil, nil, fmt.Errorf("get filesystem info: %w", err)
412 }
413 if imageFsInfo != nil {
414
415
416
417
418 imageFsRet.AvailableBytes = &imageFsInfo.Available
419 imageFsRet.CapacityBytes = &imageFsInfo.Capacity
420 imageFsRet.InodesFree = imageFsInfo.InodesFree
421 imageFsRet.Inodes = imageFsInfo.Inodes
422 }
423
424 return imageFsRet, imageFsRet, nil
425 }
426
427
428
429 func (p *criStatsProvider) ImageFsDevice(ctx context.Context) (string, error) {
430 resp, err := p.imageService.ImageFsInfo(ctx)
431 if err != nil {
432 return "", err
433 }
434 for _, fs := range resp.GetImageFilesystems() {
435 fsInfo, err := p.getFsInfo(fs.GetFsId())
436 if err != nil {
437 return "", fmt.Errorf("get filesystem info: %w", err)
438 }
439 if fsInfo != nil {
440 return fsInfo.Device, nil
441 }
442 }
443 return "", errors.New("imagefs device is not found")
444 }
445
446
447
448
449 func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) (*cadvisorapiv2.FsInfo, error) {
450 if fsID == nil {
451 klog.V(2).InfoS("Failed to get filesystem info: fsID is nil")
452 return nil, nil
453 }
454 mountpoint := fsID.GetMountpoint()
455 fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint)
456 if err != nil {
457 msg := "Failed to get the info of the filesystem with mountpoint"
458 if errors.Is(err, cadvisorfs.ErrNoSuchDevice) ||
459 errors.Is(err, cadvisorfs.ErrDeviceNotInPartitionsMap) ||
460 errors.Is(err, cadvisormemory.ErrDataNotFound) {
461 klog.V(2).InfoS(msg, "mountpoint", mountpoint, "err", err)
462 } else {
463 klog.ErrorS(err, msg, "mountpoint", mountpoint)
464 return nil, fmt.Errorf("%s: %w", msg, err)
465 }
466 return nil, nil
467 }
468 return &fsInfo, nil
469 }
470
471
472 func buildPodStats(podSandbox *runtimeapi.PodSandbox) *statsapi.PodStats {
473 return &statsapi.PodStats{
474 PodRef: statsapi.PodReference{
475 Name: podSandbox.Metadata.Name,
476 UID: podSandbox.Metadata.Uid,
477 Namespace: podSandbox.Metadata.Namespace,
478 },
479
480 StartTime: metav1.NewTime(time.Unix(0, podSandbox.CreatedAt)),
481 }
482 }
483
484 func (p *criStatsProvider) addPodNetworkStats(
485 ps *statsapi.PodStats,
486 podSandboxID string,
487 caInfos map[string]cadvisorapiv2.ContainerInfo,
488 cs *statsapi.ContainerStats,
489 netStats *statsapi.NetworkStats,
490 ) {
491 caPodSandbox, found := caInfos[podSandboxID]
492
493 if found {
494 networkStats := cadvisorInfoToNetworkStats(&caPodSandbox)
495 if networkStats != nil {
496 ps.Network = networkStats
497 return
498 }
499 }
500
501
502 if netStats != nil {
503 ps.Network = netStats
504 return
505 }
506
507
508 klog.V(4).InfoS("Unable to find network stats for sandbox", "sandboxID", podSandboxID)
509 }
510
511 func (p *criStatsProvider) addPodCPUMemoryStats(
512 ps *statsapi.PodStats,
513 podUID types.UID,
514 allInfos map[string]cadvisorapiv2.ContainerInfo,
515 cs *statsapi.ContainerStats,
516 ) {
517
518 podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
519 if podCgroupInfo != nil {
520 cpu, memory := cadvisorInfoToCPUandMemoryStats(podCgroupInfo)
521 ps.CPU = cpu
522 ps.Memory = memory
523 return
524 }
525
526
527 if cs.CPU != nil {
528 if ps.CPU == nil {
529 ps.CPU = &statsapi.CPUStats{}
530 }
531
532 ps.CPU.Time = cs.CPU.Time
533 usageCoreNanoSeconds := getUint64Value(cs.CPU.UsageCoreNanoSeconds) + getUint64Value(ps.CPU.UsageCoreNanoSeconds)
534 usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores)
535 ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds
536 ps.CPU.UsageNanoCores = &usageNanoCores
537 }
538
539 if cs.Memory != nil {
540 if ps.Memory == nil {
541 ps.Memory = &statsapi.MemoryStats{}
542 }
543
544 ps.Memory.Time = cs.Memory.Time
545 availableBytes := getUint64Value(cs.Memory.AvailableBytes) + getUint64Value(ps.Memory.AvailableBytes)
546 usageBytes := getUint64Value(cs.Memory.UsageBytes) + getUint64Value(ps.Memory.UsageBytes)
547 workingSetBytes := getUint64Value(cs.Memory.WorkingSetBytes) + getUint64Value(ps.Memory.WorkingSetBytes)
548 rSSBytes := getUint64Value(cs.Memory.RSSBytes) + getUint64Value(ps.Memory.RSSBytes)
549 pageFaults := getUint64Value(cs.Memory.PageFaults) + getUint64Value(ps.Memory.PageFaults)
550 majorPageFaults := getUint64Value(cs.Memory.MajorPageFaults) + getUint64Value(ps.Memory.MajorPageFaults)
551 ps.Memory.AvailableBytes = &availableBytes
552 ps.Memory.UsageBytes = &usageBytes
553 ps.Memory.WorkingSetBytes = &workingSetBytes
554 ps.Memory.RSSBytes = &rSSBytes
555 ps.Memory.PageFaults = &pageFaults
556 ps.Memory.MajorPageFaults = &majorPageFaults
557 }
558 }
559
560 func (p *criStatsProvider) addSwapStats(
561 ps *statsapi.PodStats,
562 podUID types.UID,
563 allInfos map[string]cadvisorapiv2.ContainerInfo,
564 cs *statsapi.ContainerStats,
565 ) {
566
567 podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
568 if podCgroupInfo != nil {
569 ps.Swap = cadvisorInfoToSwapStats(podCgroupInfo)
570 return
571 }
572
573
574 if cs.Swap != nil {
575 if ps.Swap == nil {
576 ps.Swap = &statsapi.SwapStats{Time: cs.Swap.Time}
577 }
578 swapAvailableBytes := getUint64Value(cs.Swap.SwapAvailableBytes) + getUint64Value(ps.Swap.SwapAvailableBytes)
579 swapUsageBytes := getUint64Value(cs.Swap.SwapUsageBytes) + getUint64Value(ps.Swap.SwapUsageBytes)
580 ps.Swap.SwapAvailableBytes = &swapAvailableBytes
581 ps.Swap.SwapUsageBytes = &swapUsageBytes
582 }
583 }
584
585 func (p *criStatsProvider) addProcessStats(
586 ps *statsapi.PodStats,
587 podUID types.UID,
588 allInfos map[string]cadvisorapiv2.ContainerInfo,
589 cs *statsapi.ContainerStats,
590 ) {
591
592 info := getCadvisorPodInfoFromPodUID(podUID, allInfos)
593 if info != nil {
594 ps.ProcessStats = cadvisorInfoToProcessStats(info)
595 return
596 }
597 }
598
599 func (p *criStatsProvider) makeContainerStats(
600 stats *runtimeapi.ContainerStats,
601 container *runtimeapi.Container,
602 rootFsInfo *cadvisorapiv2.FsInfo,
603 fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
604 meta *runtimeapi.PodSandboxMetadata,
605 updateCPUNanoCoreUsage bool,
606 ) (*statsapi.ContainerStats, error) {
607 result := &statsapi.ContainerStats{
608 Name: stats.Attributes.Metadata.Name,
609
610 StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
611 CPU: &statsapi.CPUStats{},
612 Memory: &statsapi.MemoryStats{},
613 Rootfs: &statsapi.FsStats{},
614 Swap: &statsapi.SwapStats{},
615
616 }
617 if stats.Cpu != nil {
618 result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
619 if stats.Cpu.UsageCoreNanoSeconds != nil {
620 result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
621 }
622 var usageNanoCores *uint64
623 if updateCPUNanoCoreUsage {
624 usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
625 } else {
626 usageNanoCores = p.getContainerUsageNanoCores(stats)
627 }
628 if usageNanoCores != nil {
629 result.CPU.UsageNanoCores = usageNanoCores
630 }
631 } else {
632 result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
633 result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
634 result.CPU.UsageNanoCores = uint64Ptr(0)
635 }
636 if stats.Memory != nil {
637 result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
638 if stats.Memory.WorkingSetBytes != nil {
639 result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
640 }
641 } else {
642 result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
643 result.Memory.WorkingSetBytes = uint64Ptr(0)
644 }
645 if stats.Swap != nil {
646 result.Swap.Time = metav1.NewTime(time.Unix(0, stats.Swap.Timestamp))
647 if stats.Swap.SwapUsageBytes != nil {
648 result.Swap.SwapUsageBytes = &stats.Swap.SwapUsageBytes.Value
649 }
650 if stats.Swap.SwapAvailableBytes != nil {
651 result.Swap.SwapAvailableBytes = &stats.Swap.SwapAvailableBytes.Value
652 }
653 } else {
654 result.Swap.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
655 result.Swap.SwapUsageBytes = uint64Ptr(0)
656 result.Swap.SwapAvailableBytes = uint64Ptr(0)
657 }
658 if stats.WritableLayer != nil {
659 result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
660 if stats.WritableLayer.UsedBytes != nil {
661 result.Rootfs.UsedBytes = &stats.WritableLayer.UsedBytes.Value
662 }
663 if stats.WritableLayer.InodesUsed != nil {
664 result.Rootfs.InodesUsed = &stats.WritableLayer.InodesUsed.Value
665 }
666 }
667 fsID := stats.GetWritableLayer().GetFsId()
668 var err error
669 if fsID != nil {
670 imageFsInfo, found := fsIDtoInfo[*fsID]
671 if !found {
672 imageFsInfo, err = p.getFsInfo(fsID)
673 if err != nil {
674 return nil, fmt.Errorf("get filesystem info: %w", err)
675 }
676 fsIDtoInfo[*fsID] = imageFsInfo
677 }
678 if imageFsInfo != nil {
679
680
681
682
683 result.Rootfs.AvailableBytes = &imageFsInfo.Available
684 result.Rootfs.CapacityBytes = &imageFsInfo.Capacity
685 result.Rootfs.InodesFree = imageFsInfo.InodesFree
686 result.Rootfs.Inodes = imageFsInfo.Inodes
687 }
688 }
689
690
691
692 result.Logs, err = p.hostStatsProvider.getPodContainerLogStats(meta.GetNamespace(), meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName(), rootFsInfo)
693 if err != nil {
694 klog.ErrorS(err, "Unable to fetch container log stats", "containerName", container.GetMetadata().GetName())
695 }
696 return result, nil
697 }
698
699 func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
700 stats *runtimeapi.ContainerStats,
701 container *runtimeapi.Container,
702 ) *statsapi.ContainerStats {
703 result := &statsapi.ContainerStats{
704 Name: stats.Attributes.Metadata.Name,
705
706 StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
707 CPU: &statsapi.CPUStats{},
708 Memory: &statsapi.MemoryStats{},
709
710 }
711 if stats.Cpu != nil {
712 result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
713 if stats.Cpu.UsageCoreNanoSeconds != nil {
714 result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
715 }
716
717 usageNanoCores := p.getContainerUsageNanoCores(stats)
718 if usageNanoCores != nil {
719 result.CPU.UsageNanoCores = usageNanoCores
720 }
721 } else {
722 result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
723 result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
724 result.CPU.UsageNanoCores = uint64Ptr(0)
725 }
726 if stats.Memory != nil {
727 result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
728 if stats.Memory.WorkingSetBytes != nil {
729 result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
730 }
731 } else {
732 result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
733 result.Memory.WorkingSetBytes = uint64Ptr(0)
734 }
735
736 return result
737 }
738
739
740
741 func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
742 if stats == nil || stats.Attributes == nil {
743 return nil
744 }
745
746
747 if stats.Cpu != nil && stats.Cpu.UsageNanoCores != nil {
748 return &stats.Cpu.UsageNanoCores.Value
749 }
750
751 p.mutex.RLock()
752 defer p.mutex.RUnlock()
753
754 cached, ok := p.cpuUsageCache[stats.Attributes.Id]
755 if !ok || cached.usageNanoCores == nil {
756 return nil
757 }
758
759 latestUsage := *cached.usageNanoCores
760 return &latestUsage
761 }
762
763
764
765
766 func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
767 if stats == nil || stats.Attributes == nil || stats.Cpu == nil {
768 return nil
769 }
770
771 if stats.Cpu.UsageNanoCores != nil {
772 return &stats.Cpu.UsageNanoCores.Value
773 }
774
775 if stats.Cpu.UsageCoreNanoSeconds == nil {
776 return nil
777 }
778 id := stats.Attributes.Id
779 usage, err := func() (*uint64, error) {
780 p.mutex.Lock()
781 defer p.mutex.Unlock()
782
783 cached, ok := p.cpuUsageCache[id]
784 if !ok || cached.stats.UsageCoreNanoSeconds == nil || stats.Cpu.UsageCoreNanoSeconds.Value < cached.stats.UsageCoreNanoSeconds.Value {
785
786 p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
787 return nil, nil
788 }
789
790 newStats := stats.Cpu
791 cachedStats := cached.stats
792 nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
793 if nanoSeconds <= 0 {
794 return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
795 }
796 usageNanoCores := uint64(float64(newStats.UsageCoreNanoSeconds.Value-cachedStats.UsageCoreNanoSeconds.Value) /
797 float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
798
799
800 usageToUpdate := usageNanoCores
801 p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
802
803 return &usageNanoCores, nil
804 }()
805
806 if err != nil {
807
808 klog.ErrorS(err, "Failed updating cpu usage nano core")
809 }
810 return usage
811 }
812
813 func (p *criStatsProvider) cleanupOutdatedCaches() {
814 p.mutex.Lock()
815 defer p.mutex.Unlock()
816
817 for k, v := range p.cpuUsageCache {
818 if v == nil {
819 delete(p.cpuUsageCache, k)
820 continue
821 }
822
823 if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
824 delete(p.cpuUsageCache, k)
825 }
826 }
827 }
828
829
830
831
832
833
834
835 func removeTerminatedPods(pods []*runtimeapi.PodSandbox) []*runtimeapi.PodSandbox {
836 podMap := make(map[statsapi.PodReference][]*runtimeapi.PodSandbox)
837
838 sort.Slice(pods, func(i, j int) bool {
839 return pods[i].CreatedAt < pods[j].CreatedAt
840 })
841 for _, pod := range pods {
842 refID := statsapi.PodReference{
843 Name: pod.GetMetadata().GetName(),
844 Namespace: pod.GetMetadata().GetNamespace(),
845
846 }
847 podMap[refID] = append(podMap[refID], pod)
848 }
849
850 result := make([]*runtimeapi.PodSandbox, 0)
851 for _, refs := range podMap {
852 if len(refs) == 1 {
853 result = append(result, refs[0])
854 continue
855 }
856 found := false
857 for i := 0; i < len(refs); i++ {
858 if refs[i].State == runtimeapi.PodSandboxState_SANDBOX_READY {
859 found = true
860 result = append(result, refs[i])
861 }
862 }
863 if !found {
864 result = append(result, refs[len(refs)-1])
865 }
866 }
867 return result
868 }
869
870
871
872 func removeTerminatedContainers(containers []*runtimeapi.Container) []*runtimeapi.Container {
873 containerMap := make(map[containerID][]*runtimeapi.Container)
874
875 sort.Slice(containers, func(i, j int) bool {
876 return containers[i].CreatedAt < containers[j].CreatedAt
877 })
878 for _, container := range containers {
879 refID := containerID{
880 podRef: buildPodRef(container.Labels),
881 containerName: kubetypes.GetContainerName(container.Labels),
882 }
883 containerMap[refID] = append(containerMap[refID], container)
884 }
885
886 result := make([]*runtimeapi.Container, 0)
887 for _, refs := range containerMap {
888 for i := 0; i < len(refs); i++ {
889 if refs[i].State == runtimeapi.ContainerState_CONTAINER_RUNNING {
890 result = append(result, refs[i])
891 }
892 }
893 }
894 return result
895 }
896
897 func (p *criStatsProvider) addCadvisorContainerStats(
898 cs *statsapi.ContainerStats,
899 caPodStats *cadvisorapiv2.ContainerInfo,
900 ) {
901 if caPodStats.Spec.HasCustomMetrics {
902 cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
903 }
904
905 cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
906 if cpu != nil {
907 cs.CPU = cpu
908 }
909 if memory != nil {
910 cs.Memory = memory
911 }
912 }
913
914 func (p *criStatsProvider) addCadvisorContainerCPUAndMemoryStats(
915 cs *statsapi.ContainerStats,
916 caPodStats *cadvisorapiv2.ContainerInfo,
917 ) {
918 if caPodStats.Spec.HasCustomMetrics {
919 cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
920 }
921
922 cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
923 if cpu != nil {
924 cs.CPU = cpu
925 }
926 if memory != nil {
927 cs.Memory = memory
928 }
929 }
930
931 func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) (map[string]cadvisorapiv2.ContainerInfo, map[string]cadvisorapiv2.ContainerInfo) {
932 stats := make(map[string]cadvisorapiv2.ContainerInfo)
933 filteredInfos, cinfosByPodCgroupKey := filterTerminatedContainerInfoAndAssembleByPodCgroupKey(infos)
934 for key, info := range filteredInfos {
935
936
937
938
939 if strings.HasSuffix(key, ".mount") {
940 continue
941 }
942
943 if !isPodManagedContainer(&info) {
944 continue
945 }
946 stats[extractIDFromCgroupPath(key)] = info
947 }
948 return stats, cinfosByPodCgroupKey
949 }
950
// extractIDFromCgroupPath returns the ID encoded in the last element of a
// cgroup path.
//
// For a plain path such as "/kubepods/burstable/pod<uid>/<id>" the last
// element itself is returned. For a systemd-style element such as
// "cri-containerd-<id>.scope" the ".scope" suffix is removed and the text
// after the last "-" is returned.
func extractIDFromCgroupPath(cgroupPath string) string {
	const systemdSuffix = ".scope"

	base := filepath.Base(cgroupPath)
	if !strings.HasSuffix(base, systemdSuffix) {
		return base
	}

	unit := strings.TrimSuffix(base, systemdSuffix)
	// Keep only what follows the last dash, if there is one.
	if dash := strings.LastIndex(unit, "-"); dash >= 0 {
		return unit[dash+1:]
	}
	return unit
}
967
968 func criInterfaceToSummary(criIface *runtimeapi.NetworkInterfaceUsage) statsapi.InterfaceStats {
969 return statsapi.InterfaceStats{
970 Name: criIface.Name,
971 RxBytes: valueOfUInt64Value(criIface.RxBytes),
972 RxErrors: valueOfUInt64Value(criIface.RxErrors),
973 TxBytes: valueOfUInt64Value(criIface.TxBytes),
974 TxErrors: valueOfUInt64Value(criIface.TxErrors),
975 }
976 }
977
978 func valueOfUInt64Value(value *runtimeapi.UInt64Value) *uint64 {
979 if value == nil {
980 return nil
981 }
982 return &value.Value
983 }
984
View as plain text