...

Source file src/k8s.io/kubernetes/pkg/kubelet/server/stats/volume_stat_calculator.go

Documentation: k8s.io/kubernetes/pkg/kubelet/server/stats

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package stats
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"sync/atomic"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    28  	"k8s.io/client-go/tools/record"
    29  	"k8s.io/component-helpers/storage/ephemeral"
    30  	"k8s.io/klog/v2"
    31  	stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
    32  	"k8s.io/kubernetes/pkg/features"
    33  	"k8s.io/kubernetes/pkg/volume"
    34  	"k8s.io/kubernetes/pkg/volume/util"
    35  	utiltrace "k8s.io/utils/trace"
    36  )
    37  
    38  // volumeStatCalculator calculates volume metrics for a given pod periodically in the background and caches the result
    39  type volumeStatCalculator struct {
    40  	statsProvider Provider
    41  	jitterPeriod  time.Duration
    42  	pod           *v1.Pod
    43  	stopChannel   chan struct{}
    44  	startO        sync.Once
    45  	stopO         sync.Once
    46  	latest        atomic.Value
    47  	eventRecorder record.EventRecorder
    48  }
    49  
    50  // PodVolumeStats encapsulates the VolumeStats for a pod.
    51  // It consists of two lists, for local ephemeral volumes, and for persistent volumes respectively.
    52  type PodVolumeStats struct {
    53  	EphemeralVolumes  []stats.VolumeStats
    54  	PersistentVolumes []stats.VolumeStats
    55  }
    56  
    57  // newVolumeStatCalculator creates a new VolumeStatCalculator
    58  func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod, eventRecorder record.EventRecorder) *volumeStatCalculator {
    59  	return &volumeStatCalculator{
    60  		statsProvider: statsProvider,
    61  		jitterPeriod:  jitterPeriod,
    62  		pod:           pod,
    63  		stopChannel:   make(chan struct{}),
    64  		eventRecorder: eventRecorder,
    65  	}
    66  }
    67  
    68  // StartOnce starts pod volume calc that will occur periodically in the background until s.StopOnce is called
    69  func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
    70  	s.startO.Do(func() {
    71  		go wait.JitterUntil(func() {
    72  			s.calcAndStoreStats()
    73  		}, s.jitterPeriod, 1.0, true, s.stopChannel)
    74  	})
    75  	return s
    76  }
    77  
    78  // StopOnce stops background pod volume calculation.  Will not stop a currently executing calculations until
    79  // they complete their current iteration.
    80  func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator {
    81  	s.stopO.Do(func() {
    82  		close(s.stopChannel)
    83  	})
    84  	return s
    85  }
    86  
    87  // getLatest returns the most recent PodVolumeStats from the cache
    88  func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) {
    89  	result := s.latest.Load()
    90  	if result == nil {
    91  		return PodVolumeStats{}, false
    92  	}
    93  	return result.(PodVolumeStats), true
    94  }
    95  
    96  // calcAndStoreStats calculates PodVolumeStats for a given pod and writes the result to the s.latest cache.
    97  // If the pod references PVCs, the prometheus metrics for those are updated with the result.
    98  func (s *volumeStatCalculator) calcAndStoreStats() {
    99  	// Find all Volumes for the Pod
   100  	volumes, found := s.statsProvider.ListVolumesForPod(s.pod.UID)
   101  	blockVolumes, bvFound := s.statsProvider.ListBlockVolumesForPod(s.pod.UID)
   102  	if !found && !bvFound {
   103  		return
   104  	}
   105  
   106  	metricVolumes := make(map[string]volume.MetricsProvider)
   107  
   108  	if found {
   109  		for name, v := range volumes {
   110  			metricVolumes[name] = v
   111  		}
   112  	}
   113  	if bvFound {
   114  		for name, v := range blockVolumes {
   115  			// Only add the blockVolume if it implements the MetricsProvider interface
   116  			if _, ok := v.(volume.MetricsProvider); ok {
   117  				// Some drivers inherit the MetricsProvider interface from Filesystem
   118  				// mode volumes, but do not implement it for Block mode. Checking
   119  				// SupportsMetrics() will prevent panics in that case.
   120  				if v.SupportsMetrics() {
   121  					metricVolumes[name] = v
   122  				}
   123  			}
   124  		}
   125  	}
   126  
   127  	// Get volume specs for the pod - key'd by volume name
   128  	volumesSpec := make(map[string]v1.Volume)
   129  	for _, v := range s.pod.Spec.Volumes {
   130  		volumesSpec[v.Name] = v
   131  	}
   132  
   133  	// Call GetMetrics on each Volume and copy the result to a new VolumeStats.FsStats
   134  	var ephemeralStats []stats.VolumeStats
   135  	var persistentStats []stats.VolumeStats
   136  	for name, v := range metricVolumes {
   137  		metric, err := func() (*volume.Metrics, error) {
   138  			trace := utiltrace.New(fmt.Sprintf("Calculate volume metrics of %v for pod %v/%v", name, s.pod.Namespace, s.pod.Name))
   139  			defer trace.LogIfLong(1 * time.Second)
   140  			return v.GetMetrics()
   141  		}()
   142  		if err != nil {
   143  			// Expected for Volumes that don't support Metrics
   144  			if !volume.IsNotSupported(err) {
   145  				klog.V(4).InfoS("Failed to calculate volume metrics", "pod", klog.KObj(s.pod), "podUID", s.pod.UID, "volumeName", name, "err", err)
   146  			}
   147  			continue
   148  		}
   149  		// Lookup the volume spec and add a 'PVCReference' for volumes that reference a PVC
   150  		volSpec := volumesSpec[name]
   151  		var pvcRef *stats.PVCReference
   152  		if pvcSource := volSpec.PersistentVolumeClaim; pvcSource != nil {
   153  			pvcRef = &stats.PVCReference{
   154  				Name:      pvcSource.ClaimName,
   155  				Namespace: s.pod.GetNamespace(),
   156  			}
   157  		} else if volSpec.Ephemeral != nil {
   158  			pvcRef = &stats.PVCReference{
   159  				Name:      ephemeral.VolumeClaimName(s.pod, &volSpec),
   160  				Namespace: s.pod.GetNamespace(),
   161  			}
   162  		}
   163  		volumeStats := s.parsePodVolumeStats(name, pvcRef, metric, volSpec)
   164  		if util.IsLocalEphemeralVolume(volSpec) {
   165  			ephemeralStats = append(ephemeralStats, volumeStats)
   166  		} else {
   167  			persistentStats = append(persistentStats, volumeStats)
   168  		}
   169  
   170  		if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
   171  			if metric.Abnormal != nil && metric.Message != nil && (*metric.Abnormal) {
   172  				s.eventRecorder.Event(s.pod, v1.EventTypeWarning, "VolumeConditionAbnormal", fmt.Sprintf("Volume %s: %s", name, *metric.Message))
   173  			}
   174  		}
   175  	}
   176  
   177  	// Store the new stats
   178  	s.latest.Store(PodVolumeStats{EphemeralVolumes: ephemeralStats,
   179  		PersistentVolumes: persistentStats})
   180  }
   181  
   182  // parsePodVolumeStats converts (internal) volume.Metrics to (external) stats.VolumeStats structures
   183  func (s *volumeStatCalculator) parsePodVolumeStats(podName string, pvcRef *stats.PVCReference, metric *volume.Metrics, volSpec v1.Volume) stats.VolumeStats {
   184  
   185  	var (
   186  		available, capacity, used, inodes, inodesFree, inodesUsed uint64
   187  	)
   188  
   189  	if metric.Available != nil {
   190  		available = uint64(metric.Available.Value())
   191  	}
   192  	if metric.Capacity != nil {
   193  		capacity = uint64(metric.Capacity.Value())
   194  	}
   195  	if metric.Used != nil {
   196  		used = uint64(metric.Used.Value())
   197  	}
   198  	if metric.Inodes != nil {
   199  		inodes = uint64(metric.Inodes.Value())
   200  	}
   201  	if metric.InodesFree != nil {
   202  		inodesFree = uint64(metric.InodesFree.Value())
   203  	}
   204  	if metric.InodesUsed != nil {
   205  		inodesUsed = uint64(metric.InodesUsed.Value())
   206  	}
   207  
   208  	volumeStats := stats.VolumeStats{
   209  		Name:   podName,
   210  		PVCRef: pvcRef,
   211  		FsStats: stats.FsStats{Time: metric.Time, AvailableBytes: &available, CapacityBytes: &capacity,
   212  			UsedBytes: &used, Inodes: &inodes, InodesFree: &inodesFree, InodesUsed: &inodesUsed},
   213  	}
   214  
   215  	if metric.Abnormal != nil {
   216  		volumeStats.VolumeHealthStats = &stats.VolumeHealthStats{
   217  			Abnormal: *metric.Abnormal,
   218  		}
   219  	}
   220  
   221  	return volumeStats
   222  }
   223  

View as plain text