...
1
16
17 package stats
18
19 import (
20 "sync"
21 "sync/atomic"
22 "time"
23
24 "k8s.io/apimachinery/pkg/types"
25 "k8s.io/apimachinery/pkg/util/wait"
26 "k8s.io/client-go/tools/record"
27
28 "k8s.io/klog/v2"
29 )
30
31 type statCache map[types.UID]*volumeStatCalculator
32
33
34 type fsResourceAnalyzerInterface interface {
35 GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool)
36 }
37
38
39 type fsResourceAnalyzer struct {
40 statsProvider Provider
41 calcPeriod time.Duration
42 cachedVolumeStats atomic.Value
43 startOnce sync.Once
44 eventRecorder record.EventRecorder
45 }
46
47 var _ fsResourceAnalyzerInterface = &fsResourceAnalyzer{}
48
49
50 func newFsResourceAnalyzer(statsProvider Provider, calcVolumePeriod time.Duration, eventRecorder record.EventRecorder) *fsResourceAnalyzer {
51 r := &fsResourceAnalyzer{
52 statsProvider: statsProvider,
53 calcPeriod: calcVolumePeriod,
54 eventRecorder: eventRecorder,
55 }
56 r.cachedVolumeStats.Store(make(statCache))
57 return r
58 }
59
60
61 func (s *fsResourceAnalyzer) Start() {
62 s.startOnce.Do(func() {
63 if s.calcPeriod <= 0 {
64 klog.InfoS("Volume stats collection disabled")
65 return
66 }
67 klog.InfoS("Starting FS ResourceAnalyzer")
68 go wait.Forever(func() { s.updateCachedPodVolumeStats() }, s.calcPeriod)
69 })
70 }
71
72
73 func (s *fsResourceAnalyzer) updateCachedPodVolumeStats() {
74 oldCache := s.cachedVolumeStats.Load().(statCache)
75 newCache := make(statCache)
76
77
78 for _, pod := range s.statsProvider.GetPods() {
79 if value, found := oldCache[pod.GetUID()]; !found {
80 newCache[pod.GetUID()] = newVolumeStatCalculator(s.statsProvider, s.calcPeriod, pod, s.eventRecorder).StartOnce()
81 } else {
82 newCache[pod.GetUID()] = value
83 }
84 }
85
86
87 for uid, entry := range oldCache {
88 if _, found := newCache[uid]; !found {
89 entry.StopOnce()
90 }
91 }
92
93
94 s.cachedVolumeStats.Store(newCache)
95 }
96
97
98
99 func (s *fsResourceAnalyzer) GetPodVolumeStats(uid types.UID) (PodVolumeStats, bool) {
100 cache := s.cachedVolumeStats.Load().(statCache)
101 statCalc, found := cache[uid]
102 if !found {
103
104
105 return PodVolumeStats{}, false
106 }
107 return statCalc.GetLatest()
108 }
109
View as plain text