
Source file src/k8s.io/kubernetes/pkg/kubelet/stats/cri_stats_provider.go

Documentation: k8s.io/kubernetes/pkg/kubelet/stats

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package stats
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"path/filepath"
    24  	"sort"
    25  	"strings"
    26  	"sync"
    27  	"time"
    29  	cadvisormemory "github.com/google/cadvisor/cache/memory"
    30  	cadvisorfs "github.com/google/cadvisor/fs"
    31  	cadvisorapiv2 "github.com/google/cadvisor/info/v2"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/status"
    34  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	internalapi "k8s.io/cri-api/pkg/apis"
    37  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    38  	"k8s.io/klog/v2"
    39  	statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
    40  	kubetypes "k8s.io/kubelet/pkg/types"
    41  	"k8s.io/kubernetes/pkg/kubelet/cadvisor"
    42  	"k8s.io/kubernetes/pkg/kubelet/server/stats"
    43  	"k8s.io/utils/clock"
    44  )
    46  var (
    47  	// defaultCachePeriod is the default cache period for each cpuUsage.
    48  	defaultCachePeriod = 10 * time.Minute
    49  )
// cpuUsageRecord holds the cpu usage stats and the calculated usageNanoCores.
//
// stats is a CRI CpuUsage value and usageNanoCores is the rate derived for it.
// usageNanoCores may be nil; readers of the cache check for nil before using
// it.
type cpuUsageRecord struct {
	stats          *runtimeapi.CpuUsage
	usageNanoCores *uint64
}
// criStatsProvider implements the containerStatsProvider interface by getting
// the container stats from CRI.
type criStatsProvider struct {
	// cadvisor is used to get the node root filesystem's stats (such as the
	// capacity/available bytes/inodes) that will be populated in per container
	// filesystem stats.
	cadvisor cadvisor.Interface
	// resourceAnalyzer is used to get the volume stats of the pods.
	resourceAnalyzer stats.ResourceAnalyzer
	// runtimeService is used to get the status and stats of the pods and their
	// managed containers.
	runtimeService internalapi.RuntimeService
	// imageService is used to get the stats of the image filesystem.
	imageService internalapi.ImageManagerService
	// hostStatsProvider is used to get the status of the host filesystem consumed by pods.
	hostStatsProvider HostStatsProvider
	// windowsNetworkStatsProvider is used by kubelet to gather networking stats on Windows.
	windowsNetworkStatsProvider interface{} //nolint:unused // U1000 We can't import hcsshim due to Build constraints in hcsshim
	// clock is used to report the current time.
	clock clock.Clock

	// cpuUsageCache caches the cpu usage for containers, keyed by container ID.
	// mutex is held (read-locked) while cpuUsageCache is read.
	// podAndContainerStatsFromCRI, when true, makes the provider try the CRI
	// ListPodSandboxStats call first and fall back to the cAdvisor-assisted
	// path only if the runtime reports the call as unimplemented.
	cpuUsageCache               map[string]*cpuUsageRecord
	mutex                       sync.RWMutex
	podAndContainerStatsFromCRI bool
}
    84  // newCRIStatsProvider returns a containerStatsProvider implementation that
    85  // provides container stats using CRI.
    86  func newCRIStatsProvider(
    87  	cadvisor cadvisor.Interface,
    88  	resourceAnalyzer stats.ResourceAnalyzer,
    89  	runtimeService internalapi.RuntimeService,
    90  	imageService internalapi.ImageManagerService,
    91  	hostStatsProvider HostStatsProvider,
    92  	podAndContainerStatsFromCRI bool,
    93  ) containerStatsProvider {
    94  	return &criStatsProvider{
    95  		cadvisor:                    cadvisor,
    96  		resourceAnalyzer:            resourceAnalyzer,
    97  		runtimeService:              runtimeService,
    98  		imageService:                imageService,
    99  		hostStatsProvider:           hostStatsProvider,
   100  		cpuUsageCache:               make(map[string]*cpuUsageRecord),
   101  		podAndContainerStatsFromCRI: podAndContainerStatsFromCRI,
   102  		clock:                       clock.RealClock{},
   103  	}
   104  }
   106  // ListPodStats returns the stats of all the pod-managed containers.
   107  func (p *criStatsProvider) ListPodStats(ctx context.Context) ([]statsapi.PodStats, error) {
   108  	// Don't update CPU nano core usage.
   109  	return p.listPodStats(ctx, false)
   110  }
   112  // ListPodStatsAndUpdateCPUNanoCoreUsage updates the cpu nano core usage for
   113  // the containers and returns the stats for all the pod-managed containers.
   114  // This is a workaround because CRI runtimes do not supply nano core usages,
   115  // so this function calculate the difference between the current and the last
   116  // (cached) cpu stats to calculate this metrics. The implementation assumes a
   117  // single caller to periodically invoke this function to update the metrics. If
   118  // there exist multiple callers, the period used to compute the cpu usage may
   119  // vary and the usage could be incoherent (e.g., spiky). If no caller calls
   120  // this function, the cpu usage will stay nil. Right now, eviction manager is
   121  // the only caller, and it calls this function every 10s.
   122  func (p *criStatsProvider) ListPodStatsAndUpdateCPUNanoCoreUsage(ctx context.Context) ([]statsapi.PodStats, error) {
   123  	// Update CPU nano core usage.
   124  	return p.listPodStats(ctx, true)
   125  }
   127  func (p *criStatsProvider) listPodStats(ctx context.Context, updateCPUNanoCoreUsage bool) ([]statsapi.PodStats, error) {
   128  	// Gets node root filesystem information, which will be used to populate
   129  	// the available and capacity bytes/inodes in container stats.
   130  	rootFsInfo, err := p.cadvisor.RootFsInfo()
   131  	if err != nil {
   132  		return nil, fmt.Errorf("failed to get rootFs info: %v", err)
   133  	}
   135  	containerMap, podSandboxMap, err := p.getPodAndContainerMaps(ctx)
   136  	if err != nil {
   137  		return nil, fmt.Errorf("failed to get pod or container map: %v", err)
   138  	}
   140  	if p.podAndContainerStatsFromCRI {
   141  		result, err := p.listPodStatsStrictlyFromCRI(ctx, updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
   142  		if err == nil {
   143  			// Call succeeded
   144  			return result, nil
   145  		}
   146  		s, ok := status.FromError(err)
   147  		// Legitimate failure, rather than the CRI implementation does not support ListPodSandboxStats.
   148  		if !ok || s.Code() != codes.Unimplemented {
   149  			return nil, err
   150  		}
   151  		// CRI implementation doesn't support ListPodSandboxStats, warn and fallback.
   152  		klog.V(5).ErrorS(err,
   153  			"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
   154  		)
   155  	}
   156  	return p.listPodStatsPartiallyFromCRI(ctx, updateCPUNanoCoreUsage, containerMap, podSandboxMap, &rootFsInfo)
   157  }
   159  func (p *criStatsProvider) listPodStatsPartiallyFromCRI(ctx context.Context, updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
   160  	// fsIDtoInfo is a map from filesystem id to its stats. This will be used
   161  	// as a cache to avoid querying cAdvisor for the filesystem stats with the
   162  	// same filesystem id many times.
   163  	fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
   165  	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
   166  	sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
   168  	resp, err := p.runtimeService.ListContainerStats(ctx, &runtimeapi.ContainerStatsFilter{})
   169  	if err != nil {
   170  		return nil, fmt.Errorf("failed to list all container stats: %v", err)
   171  	}
   172  	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
   173  	if err != nil {
   174  		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
   175  	}
   176  	caInfos, allInfos := getCRICadvisorStats(allInfos)
   178  	// get network stats for containers.
   179  	// This is only used on Windows. For other platforms, (nil, nil) should be returned.
   180  	containerNetworkStats, err := p.listContainerNetworkStats()
   181  	if err != nil {
   182  		return nil, fmt.Errorf("failed to list container network stats: %v", err)
   183  	}
   185  	for _, stats := range resp {
   186  		containerID := stats.Attributes.Id
   187  		container, found := containerMap[containerID]
   188  		if !found {
   189  			continue
   190  		}
   192  		podSandboxID := container.PodSandboxId
   193  		podSandbox, found := podSandboxMap[podSandboxID]
   194  		if !found {
   195  			continue
   196  		}
   198  		// Creates the stats of the pod (if not created yet) which the
   199  		// container belongs to.
   200  		ps, found := sandboxIDToPodStats[podSandboxID]
   201  		if !found {
   202  			ps = buildPodStats(podSandbox)
   203  			sandboxIDToPodStats[podSandboxID] = ps
   204  		}
   206  		// Fill available stats for full set of required pod stats
   207  		cs, err := p.makeContainerStats(stats, container, rootFsInfo, fsIDtoInfo, podSandbox.GetMetadata(), updateCPUNanoCoreUsage)
   208  		if err != nil {
   209  			return nil, fmt.Errorf("make container stats: %w", err)
   210  		}
   211  		p.addPodNetworkStats(ps, podSandboxID, caInfos, cs, containerNetworkStats[podSandboxID])
   212  		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
   213  		p.addSwapStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
   214  		p.addProcessStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
   216  		// If cadvisor stats is available for the container, use it to populate
   217  		// container stats
   218  		caStats, caFound := caInfos[containerID]
   219  		if !caFound {
   220  			klog.V(5).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
   221  		} else {
   222  			p.addCadvisorContainerStats(cs, &caStats)
   223  		}
   224  		ps.Containers = append(ps.Containers, *cs)
   225  	}
   226  	// cleanup outdated caches.
   227  	p.cleanupOutdatedCaches()
   229  	result := make([]statsapi.PodStats, 0, len(sandboxIDToPodStats))
   230  	for _, s := range sandboxIDToPodStats {
   231  		makePodStorageStats(s, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
   232  		result = append(result, *s)
   233  	}
   234  	return result, nil
   235  }
   237  func (p *criStatsProvider) listPodStatsStrictlyFromCRI(ctx context.Context, updateCPUNanoCoreUsage bool, containerMap map[string]*runtimeapi.Container, podSandboxMap map[string]*runtimeapi.PodSandbox, rootFsInfo *cadvisorapiv2.FsInfo) ([]statsapi.PodStats, error) {
   238  	criSandboxStats, err := p.runtimeService.ListPodSandboxStats(ctx, &runtimeapi.PodSandboxStatsFilter{})
   239  	if err != nil {
   240  		return nil, err
   241  	}
   243  	fsIDtoInfo := make(map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo)
   244  	summarySandboxStats := make([]statsapi.PodStats, 0, len(podSandboxMap))
   245  	for _, criSandboxStat := range criSandboxStats {
   246  		if criSandboxStat == nil || criSandboxStat.Attributes == nil {
   247  			klog.V(5).InfoS("Unable to find CRI stats for sandbox")
   248  			continue
   249  		}
   250  		podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
   251  		if !found {
   252  			continue
   253  		}
   254  		ps := buildPodStats(podSandbox)
   255  		if err := p.addCRIPodContainerStats(criSandboxStat, ps, fsIDtoInfo, containerMap, podSandbox, rootFsInfo, updateCPUNanoCoreUsage); err != nil {
   256  			return nil, fmt.Errorf("add CRI pod container stats: %w", err)
   257  		}
   258  		addCRIPodNetworkStats(ps, criSandboxStat)
   259  		addCRIPodCPUStats(ps, criSandboxStat)
   260  		addCRIPodMemoryStats(ps, criSandboxStat)
   261  		addCRIPodProcessStats(ps, criSandboxStat)
   262  		makePodStorageStats(ps, rootFsInfo, p.resourceAnalyzer, p.hostStatsProvider, true)
   263  		summarySandboxStats = append(summarySandboxStats, *ps)
   264  	}
   265  	return summarySandboxStats, nil
   266  }
   268  // ListPodCPUAndMemoryStats returns the CPU and Memory stats of all the pod-managed containers.
   269  func (p *criStatsProvider) ListPodCPUAndMemoryStats(ctx context.Context) ([]statsapi.PodStats, error) {
   270  	// sandboxIDToPodStats is a temporary map from sandbox ID to its pod stats.
   271  	sandboxIDToPodStats := make(map[string]*statsapi.PodStats)
   272  	containerMap, podSandboxMap, err := p.getPodAndContainerMaps(ctx)
   273  	if err != nil {
   274  		return nil, fmt.Errorf("failed to get pod or container map: %v", err)
   275  	}
   277  	result := make([]statsapi.PodStats, 0, len(podSandboxMap))
   278  	if p.podAndContainerStatsFromCRI {
   279  		criSandboxStats, err := p.runtimeService.ListPodSandboxStats(ctx, &runtimeapi.PodSandboxStatsFilter{})
   280  		// Call succeeded
   281  		if err == nil {
   282  			for _, criSandboxStat := range criSandboxStats {
   283  				podSandbox, found := podSandboxMap[criSandboxStat.Attributes.Id]
   284  				if !found {
   285  					continue
   286  				}
   287  				ps := buildPodStats(podSandbox)
   288  				addCRIPodCPUStats(ps, criSandboxStat)
   289  				addCRIPodMemoryStats(ps, criSandboxStat)
   290  				result = append(result, *ps)
   291  			}
   292  			return result, err
   293  		}
   294  		// Call failed, why?
   295  		s, ok := status.FromError(err)
   296  		// Legitimate failure, rather than the CRI implementation does not support ListPodSandboxStats.
   297  		if !ok || s.Code() != codes.Unimplemented {
   298  			return nil, err
   299  		}
   300  		// CRI implementation doesn't support ListPodSandboxStats, warn and fallback.
   301  		klog.ErrorS(err,
   302  			"CRI implementation must be updated to support ListPodSandboxStats if PodAndContainerStatsFromCRI feature gate is enabled. Falling back to populating with cAdvisor; this call will fail in the future.",
   303  		)
   304  	}
   306  	resp, err := p.runtimeService.ListContainerStats(ctx, &runtimeapi.ContainerStatsFilter{})
   307  	if err != nil {
   308  		return nil, fmt.Errorf("failed to list all container stats: %v", err)
   309  	}
   311  	allInfos, err := getCadvisorContainerInfo(p.cadvisor)
   312  	if err != nil {
   313  		return nil, fmt.Errorf("failed to fetch cadvisor stats: %v", err)
   314  	}
   315  	caInfos, allInfos := getCRICadvisorStats(allInfos)
   317  	for _, stats := range resp {
   318  		containerID := stats.Attributes.Id
   319  		container, found := containerMap[containerID]
   320  		if !found {
   321  			continue
   322  		}
   324  		podSandboxID := container.PodSandboxId
   325  		podSandbox, found := podSandboxMap[podSandboxID]
   326  		if !found {
   327  			continue
   328  		}
   330  		// Creates the stats of the pod (if not created yet) which the
   331  		// container belongs to.
   332  		ps, found := sandboxIDToPodStats[podSandboxID]
   333  		if !found {
   334  			ps = buildPodStats(podSandbox)
   335  			sandboxIDToPodStats[podSandboxID] = ps
   336  		}
   338  		// Fill available CPU and memory stats for full set of required pod stats
   339  		cs := p.makeContainerCPUAndMemoryStats(stats, container)
   340  		p.addPodCPUMemoryStats(ps, types.UID(podSandbox.Metadata.Uid), allInfos, cs)
   342  		// If cadvisor stats is available for the container, use it to populate
   343  		// container stats
   344  		caStats, caFound := caInfos[containerID]
   345  		if !caFound {
   346  			klog.V(4).InfoS("Unable to find cadvisor stats for container", "containerID", containerID)
   347  		} else {
   348  			p.addCadvisorContainerCPUAndMemoryStats(cs, &caStats)
   349  		}
   350  		ps.Containers = append(ps.Containers, *cs)
   351  	}
   352  	// cleanup outdated caches.
   353  	p.cleanupOutdatedCaches()
   355  	for _, s := range sandboxIDToPodStats {
   356  		result = append(result, *s)
   357  	}
   358  	return result, nil
   359  }
   361  func (p *criStatsProvider) getPodAndContainerMaps(ctx context.Context) (map[string]*runtimeapi.Container, map[string]*runtimeapi.PodSandbox, error) {
   362  	containers, err := p.runtimeService.ListContainers(ctx, &runtimeapi.ContainerFilter{})
   363  	if err != nil {
   364  		return nil, nil, fmt.Errorf("failed to list all containers: %v", err)
   365  	}
   367  	// Creates pod sandbox map between the pod sandbox ID and the PodSandbox object.
   368  	podSandboxMap := make(map[string]*runtimeapi.PodSandbox)
   369  	podSandboxes, err := p.runtimeService.ListPodSandbox(ctx, &runtimeapi.PodSandboxFilter{})
   370  	if err != nil {
   371  		return nil, nil, fmt.Errorf("failed to list all pod sandboxes: %v", err)
   372  	}
   373  	podSandboxes = removeTerminatedPods(podSandboxes)
   374  	for _, s := range podSandboxes {
   375  		podSandboxMap[s.Id] = s
   376  	}
   378  	containers = removeTerminatedContainers(containers)
   379  	// Creates container map between the container ID and the Container object.
   380  	containerMap := make(map[string]*runtimeapi.Container)
   381  	for _, c := range containers {
   382  		containerMap[c.Id] = c
   383  	}
   384  	return containerMap, podSandboxMap, nil
   385  }
   387  // ImageFsStats returns the stats of the image filesystem.
   388  func (p *criStatsProvider) ImageFsStats(ctx context.Context) (imageFsRet *statsapi.FsStats, containerFsRet *statsapi.FsStats, errRet error) {
   389  	resp, err := p.imageService.ImageFsInfo(ctx)
   390  	if err != nil {
   391  		return nil, nil, err
   392  	}
   394  	// CRI may return the stats of multiple image filesystems but we only
   395  	// return the first one.
   396  	//
   397  	// TODO(yguo0905): Support returning stats of multiple image filesystems.
   398  	if len(resp.GetImageFilesystems()) == 0 {
   399  		return nil, nil, fmt.Errorf("imageFs information is unavailable")
   400  	}
   401  	fs := resp.GetImageFilesystems()[0]
   402  	imageFsRet = &statsapi.FsStats{
   403  		Time:      metav1.NewTime(time.Unix(0, fs.Timestamp)),
   404  		UsedBytes: &fs.UsedBytes.Value,
   405  	}
   406  	if fs.InodesUsed != nil {
   407  		imageFsRet.InodesUsed = &fs.InodesUsed.Value
   408  	}
   409  	imageFsInfo, err := p.getFsInfo(fs.GetFsId())
   410  	if err != nil {
   411  		return nil, nil, fmt.Errorf("get filesystem info: %w", err)
   412  	}
   413  	if imageFsInfo != nil {
   414  		// The image filesystem id is unknown to the local node or there's
   415  		// an error on retrieving the stats. In these cases, we omit those
   416  		// stats and return the best-effort partial result. See
   417  		// https://github.com/kubernetes/heapster/issues/1793.
   418  		imageFsRet.AvailableBytes = &imageFsInfo.Available
   419  		imageFsRet.CapacityBytes = &imageFsInfo.Capacity
   420  		imageFsRet.InodesFree = imageFsInfo.InodesFree
   421  		imageFsRet.Inodes = imageFsInfo.Inodes
   422  	}
   423  	// TODO: For CRI Stats Provider we don't support separate disks yet.
   424  	return imageFsRet, imageFsRet, nil
   425  }
   427  // ImageFsDevice returns name of the device where the image filesystem locates,
   428  // e.g. /dev/sda1.
   429  func (p *criStatsProvider) ImageFsDevice(ctx context.Context) (string, error) {
   430  	resp, err := p.imageService.ImageFsInfo(ctx)
   431  	if err != nil {
   432  		return "", err
   433  	}
   434  	for _, fs := range resp.GetImageFilesystems() {
   435  		fsInfo, err := p.getFsInfo(fs.GetFsId())
   436  		if err != nil {
   437  			return "", fmt.Errorf("get filesystem info: %w", err)
   438  		}
   439  		if fsInfo != nil {
   440  			return fsInfo.Device, nil
   441  		}
   442  	}
   443  	return "", errors.New("imagefs device is not found")
   444  }
   446  // getFsInfo returns the information of the filesystem with the specified
   447  // fsID. If any error occurs, this function logs the error and returns
   448  // nil.
   449  func (p *criStatsProvider) getFsInfo(fsID *runtimeapi.FilesystemIdentifier) (*cadvisorapiv2.FsInfo, error) {
   450  	if fsID == nil {
   451  		klog.V(2).InfoS("Failed to get filesystem info: fsID is nil")
   452  		return nil, nil
   453  	}
   454  	mountpoint := fsID.GetMountpoint()
   455  	fsInfo, err := p.cadvisor.GetDirFsInfo(mountpoint)
   456  	if err != nil {
   457  		msg := "Failed to get the info of the filesystem with mountpoint"
   458  		if errors.Is(err, cadvisorfs.ErrNoSuchDevice) ||
   459  			errors.Is(err, cadvisorfs.ErrDeviceNotInPartitionsMap) ||
   460  			errors.Is(err, cadvisormemory.ErrDataNotFound) {
   461  			klog.V(2).InfoS(msg, "mountpoint", mountpoint, "err", err)
   462  		} else {
   463  			klog.ErrorS(err, msg, "mountpoint", mountpoint)
   464  			return nil, fmt.Errorf("%s: %w", msg, err)
   465  		}
   466  		return nil, nil
   467  	}
   468  	return &fsInfo, nil
   469  }
   471  // buildPodStats returns a PodStats that identifies the Pod managing cinfo
   472  func buildPodStats(podSandbox *runtimeapi.PodSandbox) *statsapi.PodStats {
   473  	return &statsapi.PodStats{
   474  		PodRef: statsapi.PodReference{
   475  			Name:      podSandbox.Metadata.Name,
   476  			UID:       podSandbox.Metadata.Uid,
   477  			Namespace: podSandbox.Metadata.Namespace,
   478  		},
   479  		// The StartTime in the summary API is the pod creation time.
   480  		StartTime: metav1.NewTime(time.Unix(0, podSandbox.CreatedAt)),
   481  	}
   482  }
   484  func (p *criStatsProvider) addPodNetworkStats(
   485  	ps *statsapi.PodStats,
   486  	podSandboxID string,
   487  	caInfos map[string]cadvisorapiv2.ContainerInfo,
   488  	cs *statsapi.ContainerStats,
   489  	netStats *statsapi.NetworkStats,
   490  ) {
   491  	caPodSandbox, found := caInfos[podSandboxID]
   492  	// try get network stats from cadvisor first.
   493  	if found {
   494  		networkStats := cadvisorInfoToNetworkStats(&caPodSandbox)
   495  		if networkStats != nil {
   496  			ps.Network = networkStats
   497  			return
   498  		}
   499  	}
   501  	// Not found from cadvisor, get from netStats.
   502  	if netStats != nil {
   503  		ps.Network = netStats
   504  		return
   505  	}
   507  	// TODO: sum Pod network stats from container stats.
   508  	klog.V(4).InfoS("Unable to find network stats for sandbox", "sandboxID", podSandboxID)
   509  }
   511  func (p *criStatsProvider) addPodCPUMemoryStats(
   512  	ps *statsapi.PodStats,
   513  	podUID types.UID,
   514  	allInfos map[string]cadvisorapiv2.ContainerInfo,
   515  	cs *statsapi.ContainerStats,
   516  ) {
   517  	// try get cpu and memory stats from cadvisor first.
   518  	podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
   519  	if podCgroupInfo != nil {
   520  		cpu, memory := cadvisorInfoToCPUandMemoryStats(podCgroupInfo)
   521  		ps.CPU = cpu
   522  		ps.Memory = memory
   523  		return
   524  	}
   526  	// Sum Pod cpu and memory stats from containers stats.
   527  	if cs.CPU != nil {
   528  		if ps.CPU == nil {
   529  			ps.CPU = &statsapi.CPUStats{}
   530  		}
   532  		ps.CPU.Time = cs.CPU.Time
   533  		usageCoreNanoSeconds := getUint64Value(cs.CPU.UsageCoreNanoSeconds) + getUint64Value(ps.CPU.UsageCoreNanoSeconds)
   534  		usageNanoCores := getUint64Value(cs.CPU.UsageNanoCores) + getUint64Value(ps.CPU.UsageNanoCores)
   535  		ps.CPU.UsageCoreNanoSeconds = &usageCoreNanoSeconds
   536  		ps.CPU.UsageNanoCores = &usageNanoCores
   537  	}
   539  	if cs.Memory != nil {
   540  		if ps.Memory == nil {
   541  			ps.Memory = &statsapi.MemoryStats{}
   542  		}
   544  		ps.Memory.Time = cs.Memory.Time
   545  		availableBytes := getUint64Value(cs.Memory.AvailableBytes) + getUint64Value(ps.Memory.AvailableBytes)
   546  		usageBytes := getUint64Value(cs.Memory.UsageBytes) + getUint64Value(ps.Memory.UsageBytes)
   547  		workingSetBytes := getUint64Value(cs.Memory.WorkingSetBytes) + getUint64Value(ps.Memory.WorkingSetBytes)
   548  		rSSBytes := getUint64Value(cs.Memory.RSSBytes) + getUint64Value(ps.Memory.RSSBytes)
   549  		pageFaults := getUint64Value(cs.Memory.PageFaults) + getUint64Value(ps.Memory.PageFaults)
   550  		majorPageFaults := getUint64Value(cs.Memory.MajorPageFaults) + getUint64Value(ps.Memory.MajorPageFaults)
   551  		ps.Memory.AvailableBytes = &availableBytes
   552  		ps.Memory.UsageBytes = &usageBytes
   553  		ps.Memory.WorkingSetBytes = &workingSetBytes
   554  		ps.Memory.RSSBytes = &rSSBytes
   555  		ps.Memory.PageFaults = &pageFaults
   556  		ps.Memory.MajorPageFaults = &majorPageFaults
   557  	}
   558  }
   560  func (p *criStatsProvider) addSwapStats(
   561  	ps *statsapi.PodStats,
   562  	podUID types.UID,
   563  	allInfos map[string]cadvisorapiv2.ContainerInfo,
   564  	cs *statsapi.ContainerStats,
   565  ) {
   566  	// try get cpu and memory stats from cadvisor first.
   567  	podCgroupInfo := getCadvisorPodInfoFromPodUID(podUID, allInfos)
   568  	if podCgroupInfo != nil {
   569  		ps.Swap = cadvisorInfoToSwapStats(podCgroupInfo)
   570  		return
   571  	}
   573  	// Sum Pod cpu and memory stats from containers stats.
   574  	if cs.Swap != nil {
   575  		if ps.Swap == nil {
   576  			ps.Swap = &statsapi.SwapStats{Time: cs.Swap.Time}
   577  		}
   578  		swapAvailableBytes := getUint64Value(cs.Swap.SwapAvailableBytes) + getUint64Value(ps.Swap.SwapAvailableBytes)
   579  		swapUsageBytes := getUint64Value(cs.Swap.SwapUsageBytes) + getUint64Value(ps.Swap.SwapUsageBytes)
   580  		ps.Swap.SwapAvailableBytes = &swapAvailableBytes
   581  		ps.Swap.SwapUsageBytes = &swapUsageBytes
   582  	}
   583  }
   585  func (p *criStatsProvider) addProcessStats(
   586  	ps *statsapi.PodStats,
   587  	podUID types.UID,
   588  	allInfos map[string]cadvisorapiv2.ContainerInfo,
   589  	cs *statsapi.ContainerStats,
   590  ) {
   591  	// try get process stats from cadvisor only.
   592  	info := getCadvisorPodInfoFromPodUID(podUID, allInfos)
   593  	if info != nil {
   594  		ps.ProcessStats = cadvisorInfoToProcessStats(info)
   595  		return
   596  	}
   597  }
   599  func (p *criStatsProvider) makeContainerStats(
   600  	stats *runtimeapi.ContainerStats,
   601  	container *runtimeapi.Container,
   602  	rootFsInfo *cadvisorapiv2.FsInfo,
   603  	fsIDtoInfo map[runtimeapi.FilesystemIdentifier]*cadvisorapiv2.FsInfo,
   604  	meta *runtimeapi.PodSandboxMetadata,
   605  	updateCPUNanoCoreUsage bool,
   606  ) (*statsapi.ContainerStats, error) {
   607  	result := &statsapi.ContainerStats{
   608  		Name: stats.Attributes.Metadata.Name,
   609  		// The StartTime in the summary API is the container creation time.
   610  		StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
   611  		CPU:       &statsapi.CPUStats{},
   612  		Memory:    &statsapi.MemoryStats{},
   613  		Rootfs:    &statsapi.FsStats{},
   614  		Swap:      &statsapi.SwapStats{},
   615  		// UserDefinedMetrics is not supported by CRI.
   616  	}
   617  	if stats.Cpu != nil {
   618  		result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
   619  		if stats.Cpu.UsageCoreNanoSeconds != nil {
   620  			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
   621  		}
   622  		var usageNanoCores *uint64
   623  		if updateCPUNanoCoreUsage {
   624  			usageNanoCores = p.getAndUpdateContainerUsageNanoCores(stats)
   625  		} else {
   626  			usageNanoCores = p.getContainerUsageNanoCores(stats)
   627  		}
   628  		if usageNanoCores != nil {
   629  			result.CPU.UsageNanoCores = usageNanoCores
   630  		}
   631  	} else {
   632  		result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
   633  		result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
   634  		result.CPU.UsageNanoCores = uint64Ptr(0)
   635  	}
   636  	if stats.Memory != nil {
   637  		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
   638  		if stats.Memory.WorkingSetBytes != nil {
   639  			result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
   640  		}
   641  	} else {
   642  		result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
   643  		result.Memory.WorkingSetBytes = uint64Ptr(0)
   644  	}
   645  	if stats.Swap != nil {
   646  		result.Swap.Time = metav1.NewTime(time.Unix(0, stats.Swap.Timestamp))
   647  		if stats.Swap.SwapUsageBytes != nil {
   648  			result.Swap.SwapUsageBytes = &stats.Swap.SwapUsageBytes.Value
   649  		}
   650  		if stats.Swap.SwapAvailableBytes != nil {
   651  			result.Swap.SwapAvailableBytes = &stats.Swap.SwapAvailableBytes.Value
   652  		}
   653  	} else {
   654  		result.Swap.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
   655  		result.Swap.SwapUsageBytes = uint64Ptr(0)
   656  		result.Swap.SwapAvailableBytes = uint64Ptr(0)
   657  	}
   658  	if stats.WritableLayer != nil {
   659  		result.Rootfs.Time = metav1.NewTime(time.Unix(0, stats.WritableLayer.Timestamp))
   660  		if stats.WritableLayer.UsedBytes != nil {
   661  			result.Rootfs.UsedBytes = &stats.WritableLayer.UsedBytes.Value
   662  		}
   663  		if stats.WritableLayer.InodesUsed != nil {
   664  			result.Rootfs.InodesUsed = &stats.WritableLayer.InodesUsed.Value
   665  		}
   666  	}
   667  	fsID := stats.GetWritableLayer().GetFsId()
   668  	var err error
   669  	if fsID != nil {
   670  		imageFsInfo, found := fsIDtoInfo[*fsID]
   671  		if !found {
   672  			imageFsInfo, err = p.getFsInfo(fsID)
   673  			if err != nil {
   674  				return nil, fmt.Errorf("get filesystem info: %w", err)
   675  			}
   676  			fsIDtoInfo[*fsID] = imageFsInfo
   677  		}
   678  		if imageFsInfo != nil {
   679  			// The image filesystem id is unknown to the local node or there's
   680  			// an error on retrieving the stats. In these cases, we omit those stats
   681  			// and return the best-effort partial result. See
   682  			// https://github.com/kubernetes/heapster/issues/1793.
   683  			result.Rootfs.AvailableBytes = &imageFsInfo.Available
   684  			result.Rootfs.CapacityBytes = &imageFsInfo.Capacity
   685  			result.Rootfs.InodesFree = imageFsInfo.InodesFree
   686  			result.Rootfs.Inodes = imageFsInfo.Inodes
   687  		}
   688  	}
   689  	// NOTE: This doesn't support the old pod log path, `/var/log/pods/UID`. For containers
   690  	// using old log path, empty log stats are returned. This is fine, because we don't
   691  	// officially support in-place upgrade anyway.
   692  	result.Logs, err = p.hostStatsProvider.getPodContainerLogStats(meta.GetNamespace(), meta.GetName(), types.UID(meta.GetUid()), container.GetMetadata().GetName(), rootFsInfo)
   693  	if err != nil {
   694  		klog.ErrorS(err, "Unable to fetch container log stats", "containerName", container.GetMetadata().GetName())
   695  	}
   696  	return result, nil
   697  }
   699  func (p *criStatsProvider) makeContainerCPUAndMemoryStats(
   700  	stats *runtimeapi.ContainerStats,
   701  	container *runtimeapi.Container,
   702  ) *statsapi.ContainerStats {
   703  	result := &statsapi.ContainerStats{
   704  		Name: stats.Attributes.Metadata.Name,
   705  		// The StartTime in the summary API is the container creation time.
   706  		StartTime: metav1.NewTime(time.Unix(0, container.CreatedAt)),
   707  		CPU:       &statsapi.CPUStats{},
   708  		Memory:    &statsapi.MemoryStats{},
   709  		// UserDefinedMetrics is not supported by CRI.
   710  	}
   711  	if stats.Cpu != nil {
   712  		result.CPU.Time = metav1.NewTime(time.Unix(0, stats.Cpu.Timestamp))
   713  		if stats.Cpu.UsageCoreNanoSeconds != nil {
   714  			result.CPU.UsageCoreNanoSeconds = &stats.Cpu.UsageCoreNanoSeconds.Value
   715  		}
   717  		usageNanoCores := p.getContainerUsageNanoCores(stats)
   718  		if usageNanoCores != nil {
   719  			result.CPU.UsageNanoCores = usageNanoCores
   720  		}
   721  	} else {
   722  		result.CPU.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
   723  		result.CPU.UsageCoreNanoSeconds = uint64Ptr(0)
   724  		result.CPU.UsageNanoCores = uint64Ptr(0)
   725  	}
   726  	if stats.Memory != nil {
   727  		result.Memory.Time = metav1.NewTime(time.Unix(0, stats.Memory.Timestamp))
   728  		if stats.Memory.WorkingSetBytes != nil {
   729  			result.Memory.WorkingSetBytes = &stats.Memory.WorkingSetBytes.Value
   730  		}
   731  	} else {
   732  		result.Memory.Time = metav1.NewTime(time.Unix(0, time.Now().UnixNano()))
   733  		result.Memory.WorkingSetBytes = uint64Ptr(0)
   734  	}
   736  	return result
   737  }
   739  // getContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
   740  // by the CRI. If it is unable to, it gets the information from the cache instead.
   741  func (p *criStatsProvider) getContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
   742  	if stats == nil || stats.Attributes == nil {
   743  		return nil
   744  	}
   746  	// Bypass the cache if the CRI implementation specified the UsageNanoCores.
   747  	if stats.Cpu != nil && stats.Cpu.UsageNanoCores != nil {
   748  		return &stats.Cpu.UsageNanoCores.Value
   749  	}
   751  	p.mutex.RLock()
   752  	defer p.mutex.RUnlock()
   754  	cached, ok := p.cpuUsageCache[stats.Attributes.Id]
   755  	if !ok || cached.usageNanoCores == nil {
   756  		return nil
   757  	}
   758  	// return a copy of the usage
   759  	latestUsage := *cached.usageNanoCores
   760  	return &latestUsage
   761  }
   763  // getAndUpdateContainerUsageNanoCores first attempts to get the usage nano cores from the stats reported
   764  // by the CRI. If it is unable to, it computes usageNanoCores based on the given and the cached usageCoreNanoSeconds,
   765  // updates the cache with the computed usageNanoCores, and returns the usageNanoCores.
   766  func (p *criStatsProvider) getAndUpdateContainerUsageNanoCores(stats *runtimeapi.ContainerStats) *uint64 {
   767  	if stats == nil || stats.Attributes == nil || stats.Cpu == nil {
   768  		return nil
   769  	}
   770  	// Bypass the cache if the CRI implementation specified the UsageNanoCores.
   771  	if stats.Cpu.UsageNanoCores != nil {
   772  		return &stats.Cpu.UsageNanoCores.Value
   773  	}
   774  	// If there is no UsageNanoCores, nor UsageCoreNanoSeconds, there is no information to use
   775  	if stats.Cpu.UsageCoreNanoSeconds == nil {
   776  		return nil
   777  	}
   778  	id := stats.Attributes.Id
   779  	usage, err := func() (*uint64, error) {
   780  		p.mutex.Lock()
   781  		defer p.mutex.Unlock()
   783  		cached, ok := p.cpuUsageCache[id]
   784  		if !ok || cached.stats.UsageCoreNanoSeconds == nil || stats.Cpu.UsageCoreNanoSeconds.Value < cached.stats.UsageCoreNanoSeconds.Value {
   785  			// Cannot compute the usage now, but update the cached stats anyway
   786  			p.cpuUsageCache[id] = &cpuUsageRecord{stats: stats.Cpu, usageNanoCores: nil}
   787  			return nil, nil
   788  		}
   790  		newStats := stats.Cpu
   791  		cachedStats := cached.stats
   792  		nanoSeconds := newStats.Timestamp - cachedStats.Timestamp
   793  		if nanoSeconds <= 0 {
   794  			return nil, fmt.Errorf("zero or negative interval (%v - %v)", newStats.Timestamp, cachedStats.Timestamp)
   795  		}
   796  		usageNanoCores := uint64(float64(newStats.UsageCoreNanoSeconds.Value-cachedStats.UsageCoreNanoSeconds.Value) /
   797  			float64(nanoSeconds) * float64(time.Second/time.Nanosecond))
   799  		// Update cache with new value.
   800  		usageToUpdate := usageNanoCores
   801  		p.cpuUsageCache[id] = &cpuUsageRecord{stats: newStats, usageNanoCores: &usageToUpdate}
   803  		return &usageNanoCores, nil
   804  	}()
   806  	if err != nil {
   807  		// This should not happen. Log now to raise visibility
   808  		klog.ErrorS(err, "Failed updating cpu usage nano core")
   809  	}
   810  	return usage
   811  }
   813  func (p *criStatsProvider) cleanupOutdatedCaches() {
   814  	p.mutex.Lock()
   815  	defer p.mutex.Unlock()
   817  	for k, v := range p.cpuUsageCache {
   818  		if v == nil {
   819  			delete(p.cpuUsageCache, k)
   820  			continue
   821  		}
   823  		if time.Since(time.Unix(0, v.stats.Timestamp)) > defaultCachePeriod {
   824  			delete(p.cpuUsageCache, k)
   825  		}
   826  	}
   827  }
   829  // removeTerminatedPods returns pods with terminated ones removed.
   830  // It only removes a terminated pod when there is a running instance
   831  // of the pod with the same name and namespace.
   832  // This is needed because:
   833  // 1) PodSandbox may be recreated;
   834  // 2) Pod may be recreated with the same name and namespace.
   835  func removeTerminatedPods(pods []*runtimeapi.PodSandbox) []*runtimeapi.PodSandbox {
   836  	podMap := make(map[statsapi.PodReference][]*runtimeapi.PodSandbox)
   837  	// Sort order by create time
   838  	sort.Slice(pods, func(i, j int) bool {
   839  		return pods[i].CreatedAt < pods[j].CreatedAt
   840  	})
   841  	for _, pod := range pods {
   842  		refID := statsapi.PodReference{
   843  			Name:      pod.GetMetadata().GetName(),
   844  			Namespace: pod.GetMetadata().GetNamespace(),
   845  			// UID is intentionally left empty.
   846  		}
   847  		podMap[refID] = append(podMap[refID], pod)
   848  	}
   850  	result := make([]*runtimeapi.PodSandbox, 0)
   851  	for _, refs := range podMap {
   852  		if len(refs) == 1 {
   853  			result = append(result, refs[0])
   854  			continue
   855  		}
   856  		found := false
   857  		for i := 0; i < len(refs); i++ {
   858  			if refs[i].State == runtimeapi.PodSandboxState_SANDBOX_READY {
   859  				found = true
   860  				result = append(result, refs[i])
   861  			}
   862  		}
   863  		if !found {
   864  			result = append(result, refs[len(refs)-1])
   865  		}
   866  	}
   867  	return result
   868  }
   870  // removeTerminatedContainers removes all terminated containers since they should
   871  // not be used for usage calculations.
   872  func removeTerminatedContainers(containers []*runtimeapi.Container) []*runtimeapi.Container {
   873  	containerMap := make(map[containerID][]*runtimeapi.Container)
   874  	// Sort order by create time
   875  	sort.Slice(containers, func(i, j int) bool {
   876  		return containers[i].CreatedAt < containers[j].CreatedAt
   877  	})
   878  	for _, container := range containers {
   879  		refID := containerID{
   880  			podRef:        buildPodRef(container.Labels),
   881  			containerName: kubetypes.GetContainerName(container.Labels),
   882  		}
   883  		containerMap[refID] = append(containerMap[refID], container)
   884  	}
   886  	result := make([]*runtimeapi.Container, 0)
   887  	for _, refs := range containerMap {
   888  		for i := 0; i < len(refs); i++ {
   889  			if refs[i].State == runtimeapi.ContainerState_CONTAINER_RUNNING {
   890  				result = append(result, refs[i])
   891  			}
   892  		}
   893  	}
   894  	return result
   895  }
   897  func (p *criStatsProvider) addCadvisorContainerStats(
   898  	cs *statsapi.ContainerStats,
   899  	caPodStats *cadvisorapiv2.ContainerInfo,
   900  ) {
   901  	if caPodStats.Spec.HasCustomMetrics {
   902  		cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
   903  	}
   905  	cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
   906  	if cpu != nil {
   907  		cs.CPU = cpu
   908  	}
   909  	if memory != nil {
   910  		cs.Memory = memory
   911  	}
   912  }
   914  func (p *criStatsProvider) addCadvisorContainerCPUAndMemoryStats(
   915  	cs *statsapi.ContainerStats,
   916  	caPodStats *cadvisorapiv2.ContainerInfo,
   917  ) {
   918  	if caPodStats.Spec.HasCustomMetrics {
   919  		cs.UserDefinedMetrics = cadvisorInfoToUserDefinedMetrics(caPodStats)
   920  	}
   922  	cpu, memory := cadvisorInfoToCPUandMemoryStats(caPodStats)
   923  	if cpu != nil {
   924  		cs.CPU = cpu
   925  	}
   926  	if memory != nil {
   927  		cs.Memory = memory
   928  	}
   929  }
   931  func getCRICadvisorStats(infos map[string]cadvisorapiv2.ContainerInfo) (map[string]cadvisorapiv2.ContainerInfo, map[string]cadvisorapiv2.ContainerInfo) {
   932  	stats := make(map[string]cadvisorapiv2.ContainerInfo)
   933  	filteredInfos, cinfosByPodCgroupKey := filterTerminatedContainerInfoAndAssembleByPodCgroupKey(infos)
   934  	for key, info := range filteredInfos {
   935  		// On systemd using devicemapper each mount into the container has an
   936  		// associated cgroup. We ignore them to ensure we do not get duplicate
   937  		// entries in our summary. For details on .mount units:
   938  		// http://man7.org/linux/man-pages/man5/systemd.mount.5.html
   939  		if strings.HasSuffix(key, ".mount") {
   940  			continue
   941  		}
   942  		// Build the Pod key if this container is managed by a Pod
   943  		if !isPodManagedContainer(&info) {
   944  			continue
   945  		}
   946  		stats[extractIDFromCgroupPath(key)] = info
   947  	}
   948  	return stats, cinfosByPodCgroupKey
   949  }
   951  func extractIDFromCgroupPath(cgroupPath string) string {
   952  	// case0 == cgroupfs: "/kubepods/burstable/pod2fc932ce-fdcc-454b-97bd-aadfdeb4c340/9be25294016e2dc0340dd605ce1f57b492039b267a6a618a7ad2a7a58a740f32"
   953  	id := filepath.Base(cgroupPath)
   955  	// case1 == systemd: "/kubepods.slice/kubepods-burstable.slice/kubepods-burstable-pod2fc932ce_fdcc_454b_97bd_aadfdeb4c340.slice/cri-containerd-aaefb9d8feed2d453b543f6d928cede7a4dbefa6a0ae7c9b990dd234c56e93b9.scope"
   956  	// trim anything before the final '-' and suffix .scope
   957  	systemdSuffix := ".scope"
   958  	if strings.HasSuffix(id, systemdSuffix) {
   959  		id = strings.TrimSuffix(id, systemdSuffix)
   960  		components := strings.Split(id, "-")
   961  		if len(components) > 1 {
   962  			id = components[len(components)-1]
   963  		}
   964  	}
   965  	return id
   966  }
   968  func criInterfaceToSummary(criIface *runtimeapi.NetworkInterfaceUsage) statsapi.InterfaceStats {
   969  	return statsapi.InterfaceStats{
   970  		Name:     criIface.Name,
   971  		RxBytes:  valueOfUInt64Value(criIface.RxBytes),
   972  		RxErrors: valueOfUInt64Value(criIface.RxErrors),
   973  		TxBytes:  valueOfUInt64Value(criIface.TxBytes),
   974  		TxErrors: valueOfUInt64Value(criIface.TxErrors),
   975  	}
   976  }
   978  func valueOfUInt64Value(value *runtimeapi.UInt64Value) *uint64 {
   979  	if value == nil {
   980  		return nil
   981  	}
   982  	return &value.Value
   983  }

View as plain text