...
1
16
17 package cache
18
19 import (
20 "fmt"
21 "os"
22 "reflect"
23 "strconv"
24 "sync"
25 "time"
26
27 "k8s.io/klog/v2"
28
29 "k8s.io/apimachinery/pkg/runtime"
30 "k8s.io/apimachinery/pkg/util/diff"
31 )
32
33 var mutationDetectionEnabled = false
34
35 func init() {
36 mutationDetectionEnabled, _ = strconv.ParseBool(os.Getenv("KUBE_CACHE_MUTATION_DETECTOR"))
37 }
38
39
40 type MutationDetector interface {
41
42 AddObject(obj interface{})
43
44
45 Run(stopCh <-chan struct{})
46 }
47
48
49 func NewCacheMutationDetector(name string) MutationDetector {
50 if !mutationDetectionEnabled {
51 return dummyMutationDetector{}
52 }
53 klog.Warningln("Mutation detector is enabled, this will result in memory leakage.")
54 return &defaultCacheMutationDetector{name: name, period: 1 * time.Second, retainDuration: 2 * time.Minute}
55 }
56
57 type dummyMutationDetector struct{}
58
59 func (dummyMutationDetector) Run(stopCh <-chan struct{}) {
60 }
61 func (dummyMutationDetector) AddObject(obj interface{}) {
62 }
63
64
65
66
67 type defaultCacheMutationDetector struct {
68 name string
69 period time.Duration
70
71
72 compareObjectsLock sync.Mutex
73
74
75 addedObjsLock sync.Mutex
76 addedObjs []cacheObj
77
78 cachedObjs []cacheObj
79
80 retainDuration time.Duration
81 lastRotated time.Time
82 retainedCachedObjs []cacheObj
83
84
85
86
87
88 failureFunc func(message string)
89 }
90
91
92 type cacheObj struct {
93 cached interface{}
94 copied interface{}
95 }
96
97 func (d *defaultCacheMutationDetector) Run(stopCh <-chan struct{}) {
98
99 for {
100 if d.lastRotated.IsZero() {
101 d.lastRotated = time.Now()
102 } else if time.Since(d.lastRotated) > d.retainDuration {
103 d.retainedCachedObjs = d.cachedObjs
104 d.cachedObjs = nil
105 d.lastRotated = time.Now()
106 }
107
108 d.CompareObjects()
109
110 select {
111 case <-stopCh:
112 return
113 case <-time.After(d.period):
114 }
115 }
116 }
117
118
119
120 func (d *defaultCacheMutationDetector) AddObject(obj interface{}) {
121 if _, ok := obj.(DeletedFinalStateUnknown); ok {
122 return
123 }
124 if obj, ok := obj.(runtime.Object); ok {
125 copiedObj := obj.DeepCopyObject()
126
127 d.addedObjsLock.Lock()
128 defer d.addedObjsLock.Unlock()
129 d.addedObjs = append(d.addedObjs, cacheObj{cached: obj, copied: copiedObj})
130 }
131 }
132
133 func (d *defaultCacheMutationDetector) CompareObjects() {
134 d.compareObjectsLock.Lock()
135 defer d.compareObjectsLock.Unlock()
136
137
138
139 d.addedObjsLock.Lock()
140 d.cachedObjs = append(d.cachedObjs, d.addedObjs...)
141 d.addedObjs = nil
142 d.addedObjsLock.Unlock()
143
144 altered := false
145 for i, obj := range d.cachedObjs {
146 if !reflect.DeepEqual(obj.cached, obj.copied) {
147 fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
148 altered = true
149 }
150 }
151 for i, obj := range d.retainedCachedObjs {
152 if !reflect.DeepEqual(obj.cached, obj.copied) {
153 fmt.Printf("CACHE %s[%d] ALTERED!\n%v\n", d.name, i, diff.ObjectGoPrintSideBySide(obj.cached, obj.copied))
154 altered = true
155 }
156 }
157
158 if altered {
159 msg := fmt.Sprintf("cache %s modified", d.name)
160 if d.failureFunc != nil {
161 d.failureFunc(msg)
162 return
163 }
164 panic(msg)
165 }
166 }
167
View as plain text