1
16
17 package cache
18
19 import (
20 "fmt"
21 "strconv"
22 "sync"
23 "time"
24
25 "k8s.io/klog/v2"
26
27 "k8s.io/apimachinery/pkg/api/meta"
28 "k8s.io/apimachinery/pkg/runtime"
29 utilcache "k8s.io/apimachinery/pkg/util/cache"
30 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
31 "k8s.io/apimachinery/pkg/util/sets"
32 )
33
34
35
36
37
38
39 type MutationCache interface {
40 GetByKey(key string) (interface{}, bool, error)
41 ByIndex(indexName, indexKey string) ([]interface{}, error)
42 Mutation(interface{})
43 }
44
45
46 type ResourceVersionComparator interface {
47 CompareResourceVersion(lhs, rhs runtime.Object) int
48 }
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63 func NewIntegerResourceVersionMutationCache(backingCache Store, indexer Indexer, ttl time.Duration, includeAdds bool) MutationCache {
64 return &mutationCache{
65 backingCache: backingCache,
66 indexer: indexer,
67 mutationCache: utilcache.NewLRUExpireCache(100),
68 comparator: etcdObjectVersioner{},
69 ttl: ttl,
70 includeAdds: includeAdds,
71 }
72 }
73
74
75
76
77 type mutationCache struct {
78 lock sync.Mutex
79 backingCache Store
80 indexer Indexer
81 mutationCache *utilcache.LRUExpireCache
82 includeAdds bool
83 ttl time.Duration
84
85 comparator ResourceVersionComparator
86 }
87
88
89
90
91 func (c *mutationCache) GetByKey(key string) (interface{}, bool, error) {
92 c.lock.Lock()
93 defer c.lock.Unlock()
94
95 obj, exists, err := c.backingCache.GetByKey(key)
96 if err != nil {
97 return nil, false, err
98 }
99 if !exists {
100 if !c.includeAdds {
101
102
103 return nil, false, nil
104 }
105 obj, exists = c.mutationCache.Get(key)
106 if !exists {
107 return nil, false, nil
108 }
109 }
110 objRuntime, ok := obj.(runtime.Object)
111 if !ok {
112 return obj, true, nil
113 }
114 return c.newerObject(key, objRuntime), true, nil
115 }
116
117
118
119 func (c *mutationCache) ByIndex(name string, indexKey string) ([]interface{}, error) {
120 c.lock.Lock()
121 defer c.lock.Unlock()
122 if c.indexer == nil {
123 return nil, fmt.Errorf("no indexer has been provided to the mutation cache")
124 }
125 keys, err := c.indexer.IndexKeys(name, indexKey)
126 if err != nil {
127 return nil, err
128 }
129 var items []interface{}
130 keySet := sets.NewString()
131 for _, key := range keys {
132 keySet.Insert(key)
133 obj, exists, err := c.indexer.GetByKey(key)
134 if err != nil {
135 return nil, err
136 }
137 if !exists {
138 continue
139 }
140 if objRuntime, ok := obj.(runtime.Object); ok {
141 items = append(items, c.newerObject(key, objRuntime))
142 } else {
143 items = append(items, obj)
144 }
145 }
146
147 if c.includeAdds {
148 fn := c.indexer.GetIndexers()[name]
149
150 for _, key := range c.mutationCache.Keys() {
151 updated, ok := c.mutationCache.Get(key)
152 if !ok {
153 continue
154 }
155 if keySet.Has(key.(string)) {
156 continue
157 }
158 elements, err := fn(updated)
159 if err != nil {
160 klog.V(4).Infof("Unable to calculate an index entry for mutation cache entry %s: %v", key, err)
161 continue
162 }
163 for _, inIndex := range elements {
164 if inIndex != indexKey {
165 continue
166 }
167 items = append(items, updated)
168 break
169 }
170 }
171 }
172
173 return items, nil
174 }
175
176
177
178
179 func (c *mutationCache) newerObject(key string, backing runtime.Object) runtime.Object {
180 mutatedObj, exists := c.mutationCache.Get(key)
181 if !exists {
182 return backing
183 }
184 mutatedObjRuntime, ok := mutatedObj.(runtime.Object)
185 if !ok {
186 return backing
187 }
188 if c.comparator.CompareResourceVersion(backing, mutatedObjRuntime) >= 0 {
189 c.mutationCache.Remove(key)
190 return backing
191 }
192 return mutatedObjRuntime
193 }
194
195
196
197
198
199
200 func (c *mutationCache) Mutation(obj interface{}) {
201 c.lock.Lock()
202 defer c.lock.Unlock()
203
204 key, err := DeletionHandlingMetaNamespaceKeyFunc(obj)
205 if err != nil {
206
207 utilruntime.HandleError(err)
208 return
209 }
210
211 if objRuntime, ok := obj.(runtime.Object); ok {
212 if mutatedObj, exists := c.mutationCache.Get(key); exists {
213 if mutatedObjRuntime, ok := mutatedObj.(runtime.Object); ok {
214 if c.comparator.CompareResourceVersion(objRuntime, mutatedObjRuntime) < 0 {
215 return
216 }
217 }
218 }
219 }
220 c.mutationCache.Add(key, obj, c.ttl)
221 }
222
223
224
225 type etcdObjectVersioner struct{}
226
227
228 func (a etcdObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) {
229 accessor, err := meta.Accessor(obj)
230 if err != nil {
231 return 0, err
232 }
233 version := accessor.GetResourceVersion()
234 if len(version) == 0 {
235 return 0, nil
236 }
237 return strconv.ParseUint(version, 10, 64)
238 }
239
240
241
242 func (a etcdObjectVersioner) CompareResourceVersion(lhs, rhs runtime.Object) int {
243 lhsVersion, err := a.ObjectResourceVersion(lhs)
244 if err != nil {
245
246 panic(err)
247 }
248 rhsVersion, err := a.ObjectResourceVersion(rhs)
249 if err != nil {
250
251 panic(err)
252 }
253
254 if lhsVersion == rhsVersion {
255 return 0
256 }
257 if lhsVersion < rhsVersion {
258 return -1
259 }
260
261 return 1
262 }
263
View as plain text