1
16
17 package stats
18
19 import (
20 "fmt"
21 "sync"
22 "sync/atomic"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 "k8s.io/apimachinery/pkg/util/wait"
27 utilfeature "k8s.io/apiserver/pkg/util/feature"
28 "k8s.io/client-go/tools/record"
29 "k8s.io/component-helpers/storage/ephemeral"
30 "k8s.io/klog/v2"
31 stats "k8s.io/kubelet/pkg/apis/stats/v1alpha1"
32 "k8s.io/kubernetes/pkg/features"
33 "k8s.io/kubernetes/pkg/volume"
34 "k8s.io/kubernetes/pkg/volume/util"
35 utiltrace "k8s.io/utils/trace"
36 )
37
38
39 type volumeStatCalculator struct {
40 statsProvider Provider
41 jitterPeriod time.Duration
42 pod *v1.Pod
43 stopChannel chan struct{}
44 startO sync.Once
45 stopO sync.Once
46 latest atomic.Value
47 eventRecorder record.EventRecorder
48 }
49
50
51
52 type PodVolumeStats struct {
53 EphemeralVolumes []stats.VolumeStats
54 PersistentVolumes []stats.VolumeStats
55 }
56
57
58 func newVolumeStatCalculator(statsProvider Provider, jitterPeriod time.Duration, pod *v1.Pod, eventRecorder record.EventRecorder) *volumeStatCalculator {
59 return &volumeStatCalculator{
60 statsProvider: statsProvider,
61 jitterPeriod: jitterPeriod,
62 pod: pod,
63 stopChannel: make(chan struct{}),
64 eventRecorder: eventRecorder,
65 }
66 }
67
68
69 func (s *volumeStatCalculator) StartOnce() *volumeStatCalculator {
70 s.startO.Do(func() {
71 go wait.JitterUntil(func() {
72 s.calcAndStoreStats()
73 }, s.jitterPeriod, 1.0, true, s.stopChannel)
74 })
75 return s
76 }
77
78
79
80 func (s *volumeStatCalculator) StopOnce() *volumeStatCalculator {
81 s.stopO.Do(func() {
82 close(s.stopChannel)
83 })
84 return s
85 }
86
87
88 func (s *volumeStatCalculator) GetLatest() (PodVolumeStats, bool) {
89 result := s.latest.Load()
90 if result == nil {
91 return PodVolumeStats{}, false
92 }
93 return result.(PodVolumeStats), true
94 }
95
96
97
98 func (s *volumeStatCalculator) calcAndStoreStats() {
99
100 volumes, found := s.statsProvider.ListVolumesForPod(s.pod.UID)
101 blockVolumes, bvFound := s.statsProvider.ListBlockVolumesForPod(s.pod.UID)
102 if !found && !bvFound {
103 return
104 }
105
106 metricVolumes := make(map[string]volume.MetricsProvider)
107
108 if found {
109 for name, v := range volumes {
110 metricVolumes[name] = v
111 }
112 }
113 if bvFound {
114 for name, v := range blockVolumes {
115
116 if _, ok := v.(volume.MetricsProvider); ok {
117
118
119
120 if v.SupportsMetrics() {
121 metricVolumes[name] = v
122 }
123 }
124 }
125 }
126
127
128 volumesSpec := make(map[string]v1.Volume)
129 for _, v := range s.pod.Spec.Volumes {
130 volumesSpec[v.Name] = v
131 }
132
133
134 var ephemeralStats []stats.VolumeStats
135 var persistentStats []stats.VolumeStats
136 for name, v := range metricVolumes {
137 metric, err := func() (*volume.Metrics, error) {
138 trace := utiltrace.New(fmt.Sprintf("Calculate volume metrics of %v for pod %v/%v", name, s.pod.Namespace, s.pod.Name))
139 defer trace.LogIfLong(1 * time.Second)
140 return v.GetMetrics()
141 }()
142 if err != nil {
143
144 if !volume.IsNotSupported(err) {
145 klog.V(4).InfoS("Failed to calculate volume metrics", "pod", klog.KObj(s.pod), "podUID", s.pod.UID, "volumeName", name, "err", err)
146 }
147 continue
148 }
149
150 volSpec := volumesSpec[name]
151 var pvcRef *stats.PVCReference
152 if pvcSource := volSpec.PersistentVolumeClaim; pvcSource != nil {
153 pvcRef = &stats.PVCReference{
154 Name: pvcSource.ClaimName,
155 Namespace: s.pod.GetNamespace(),
156 }
157 } else if volSpec.Ephemeral != nil {
158 pvcRef = &stats.PVCReference{
159 Name: ephemeral.VolumeClaimName(s.pod, &volSpec),
160 Namespace: s.pod.GetNamespace(),
161 }
162 }
163 volumeStats := s.parsePodVolumeStats(name, pvcRef, metric, volSpec)
164 if util.IsLocalEphemeralVolume(volSpec) {
165 ephemeralStats = append(ephemeralStats, volumeStats)
166 } else {
167 persistentStats = append(persistentStats, volumeStats)
168 }
169
170 if utilfeature.DefaultFeatureGate.Enabled(features.CSIVolumeHealth) {
171 if metric.Abnormal != nil && metric.Message != nil && (*metric.Abnormal) {
172 s.eventRecorder.Event(s.pod, v1.EventTypeWarning, "VolumeConditionAbnormal", fmt.Sprintf("Volume %s: %s", name, *metric.Message))
173 }
174 }
175 }
176
177
178 s.latest.Store(PodVolumeStats{EphemeralVolumes: ephemeralStats,
179 PersistentVolumes: persistentStats})
180 }
181
182
183 func (s *volumeStatCalculator) parsePodVolumeStats(podName string, pvcRef *stats.PVCReference, metric *volume.Metrics, volSpec v1.Volume) stats.VolumeStats {
184
185 var (
186 available, capacity, used, inodes, inodesFree, inodesUsed uint64
187 )
188
189 if metric.Available != nil {
190 available = uint64(metric.Available.Value())
191 }
192 if metric.Capacity != nil {
193 capacity = uint64(metric.Capacity.Value())
194 }
195 if metric.Used != nil {
196 used = uint64(metric.Used.Value())
197 }
198 if metric.Inodes != nil {
199 inodes = uint64(metric.Inodes.Value())
200 }
201 if metric.InodesFree != nil {
202 inodesFree = uint64(metric.InodesFree.Value())
203 }
204 if metric.InodesUsed != nil {
205 inodesUsed = uint64(metric.InodesUsed.Value())
206 }
207
208 volumeStats := stats.VolumeStats{
209 Name: podName,
210 PVCRef: pvcRef,
211 FsStats: stats.FsStats{Time: metric.Time, AvailableBytes: &available, CapacityBytes: &capacity,
212 UsedBytes: &used, Inodes: &inodes, InodesFree: &inodesFree, InodesUsed: &inodesUsed},
213 }
214
215 if metric.Abnormal != nil {
216 volumeStats.VolumeHealthStats = &stats.VolumeHealthStats{
217 Abnormal: *metric.Abnormal,
218 }
219 }
220
221 return volumeStats
222 }
223
View as plain text