1 /* 2 Copyright 2014 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 package cache 18 19 import ( 20 "errors" 21 "sync" 22 23 "k8s.io/apimachinery/pkg/util/sets" 24 ) 25 26 // PopProcessFunc is passed to Pop() method of Queue interface. 27 // It is supposed to process the accumulator popped from the queue. 28 type PopProcessFunc func(obj interface{}, isInInitialList bool) error 29 30 // ErrRequeue may be returned by a PopProcessFunc to safely requeue 31 // the current item. The value of Err will be returned from Pop. 32 type ErrRequeue struct { 33 // Err is returned by the Pop function 34 Err error 35 } 36 37 // ErrFIFOClosed used when FIFO is closed 38 var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue") 39 40 func (e ErrRequeue) Error() string { 41 if e.Err == nil { 42 return "the popped item should be requeued without returning an error" 43 } 44 return e.Err.Error() 45 } 46 47 // Queue extends Store with a collection of Store keys to "process". 48 // Every Add, Update, or Delete may put the object's key in that collection. 49 // A Queue has a way to derive the corresponding key given an accumulator. 50 // A Queue can be accessed concurrently from multiple goroutines. 51 // A Queue can be "closed", after which Pop operations return an error. 52 type Queue interface { 53 Store 54 55 // Pop blocks until there is at least one key to process or the 56 // Queue is closed. In the latter case Pop returns with an error. 57 // In the former case Pop atomically picks one key to process, 58 // removes that (key, accumulator) association from the Store, and 59 // processes the accumulator. Pop returns the accumulator that 60 // was processed and the result of processing. The PopProcessFunc 61 // may return an ErrRequeue{inner} and in this case Pop will (a) 62 // return that (key, accumulator) association to the Queue as part 63 // of the atomic processing and (b) return the inner error from 64 // Pop. 65 Pop(PopProcessFunc) (interface{}, error) 66 67 // AddIfNotPresent puts the given accumulator into the Queue (in 68 // association with the accumulator's key) if and only if that key 69 // is not already associated with a non-empty accumulator. 70 AddIfNotPresent(interface{}) error 71 72 // HasSynced returns true if the first batch of keys have all been 73 // popped. The first batch of keys are those of the first Replace 74 // operation if that happened before any Add, AddIfNotPresent, 75 // Update, or Delete; otherwise the first batch is empty. 76 HasSynced() bool 77 78 // Close the queue 79 Close() 80 } 81 82 // Pop is helper function for popping from Queue. 83 // WARNING: Do NOT use this function in non-test code to avoid races 84 // unless you really really really really know what you are doing. 85 // 86 // NOTE: This function is deprecated and may be removed in the future without 87 // additional warning. 88 func Pop(queue Queue) interface{} { 89 var result interface{} 90 queue.Pop(func(obj interface{}, isInInitialList bool) error { 91 result = obj 92 return nil 93 }) 94 return result 95 } 96 97 // FIFO is a Queue in which (a) each accumulator is simply the most 98 // recently provided object and (b) the collection of keys to process 99 // is a FIFO. The accumulators all start out empty, and deleting an 100 // object from its accumulator empties the accumulator. The Resync 101 // operation is a no-op. 102 // 103 // Thus: if multiple adds/updates of a single object happen while that 104 // object's key is in the queue before it has been processed then it 105 // will only be processed once, and when it is processed the most 106 // recent version will be processed. This can't be done with a channel 107 // 108 // FIFO solves this use case: 109 // - You want to process every object (exactly) once. 110 // - You want to process the most recent version of the object when you process it. 111 // - You do not want to process deleted objects, they should be removed from the queue. 112 // - You do not want to periodically reprocess objects. 113 // 114 // Compare with DeltaFIFO for other use cases. 115 type FIFO struct { 116 lock sync.RWMutex 117 cond sync.Cond 118 // We depend on the property that every key in `items` is also in `queue` 119 items map[string]interface{} 120 queue []string 121 122 // populated is true if the first batch of items inserted by Replace() has been populated 123 // or Delete/Add/Update was called first. 124 populated bool 125 // initialPopulationCount is the number of items inserted by the first call of Replace() 126 initialPopulationCount int 127 128 // keyFunc is used to make the key used for queued item insertion and retrieval, and 129 // should be deterministic. 130 keyFunc KeyFunc 131 132 // Indication the queue is closed. 133 // Used to indicate a queue is closed so a control loop can exit when a queue is empty. 134 // Currently, not used to gate any of CRUD operations. 135 closed bool 136 } 137 138 var ( 139 _ = Queue(&FIFO{}) // FIFO is a Queue 140 ) 141 142 // Close the queue. 143 func (f *FIFO) Close() { 144 f.lock.Lock() 145 defer f.lock.Unlock() 146 f.closed = true 147 f.cond.Broadcast() 148 } 149 150 // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first, 151 // or the first batch of items inserted by Replace() has been popped. 152 func (f *FIFO) HasSynced() bool { 153 f.lock.Lock() 154 defer f.lock.Unlock() 155 return f.hasSynced_locked() 156 } 157 158 func (f *FIFO) hasSynced_locked() bool { 159 return f.populated && f.initialPopulationCount == 0 160 } 161 162 // Add inserts an item, and puts it in the queue. The item is only enqueued 163 // if it doesn't already exist in the set. 164 func (f *FIFO) Add(obj interface{}) error { 165 id, err := f.keyFunc(obj) 166 if err != nil { 167 return KeyError{obj, err} 168 } 169 f.lock.Lock() 170 defer f.lock.Unlock() 171 f.populated = true 172 if _, exists := f.items[id]; !exists { 173 f.queue = append(f.queue, id) 174 } 175 f.items[id] = obj 176 f.cond.Broadcast() 177 return nil 178 } 179 180 // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already 181 // present in the set, it is neither enqueued nor added to the set. 182 // 183 // This is useful in a single producer/consumer scenario so that the consumer can 184 // safely retry items without contending with the producer and potentially enqueueing 185 // stale items. 186 func (f *FIFO) AddIfNotPresent(obj interface{}) error { 187 id, err := f.keyFunc(obj) 188 if err != nil { 189 return KeyError{obj, err} 190 } 191 f.lock.Lock() 192 defer f.lock.Unlock() 193 f.addIfNotPresent(id, obj) 194 return nil 195 } 196 197 // addIfNotPresent assumes the fifo lock is already held and adds the provided 198 // item to the queue under id if it does not already exist. 199 func (f *FIFO) addIfNotPresent(id string, obj interface{}) { 200 f.populated = true 201 if _, exists := f.items[id]; exists { 202 return 203 } 204 205 f.queue = append(f.queue, id) 206 f.items[id] = obj 207 f.cond.Broadcast() 208 } 209 210 // Update is the same as Add in this implementation. 211 func (f *FIFO) Update(obj interface{}) error { 212 return f.Add(obj) 213 } 214 215 // Delete removes an item. It doesn't add it to the queue, because 216 // this implementation assumes the consumer only cares about the objects, 217 // not the order in which they were created/added. 218 func (f *FIFO) Delete(obj interface{}) error { 219 id, err := f.keyFunc(obj) 220 if err != nil { 221 return KeyError{obj, err} 222 } 223 f.lock.Lock() 224 defer f.lock.Unlock() 225 f.populated = true 226 delete(f.items, id) 227 return err 228 } 229 230 // List returns a list of all the items. 231 func (f *FIFO) List() []interface{} { 232 f.lock.RLock() 233 defer f.lock.RUnlock() 234 list := make([]interface{}, 0, len(f.items)) 235 for _, item := range f.items { 236 list = append(list, item) 237 } 238 return list 239 } 240 241 // ListKeys returns a list of all the keys of the objects currently 242 // in the FIFO. 243 func (f *FIFO) ListKeys() []string { 244 f.lock.RLock() 245 defer f.lock.RUnlock() 246 list := make([]string, 0, len(f.items)) 247 for key := range f.items { 248 list = append(list, key) 249 } 250 return list 251 } 252 253 // Get returns the requested item, or sets exists=false. 254 func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) { 255 key, err := f.keyFunc(obj) 256 if err != nil { 257 return nil, false, KeyError{obj, err} 258 } 259 return f.GetByKey(key) 260 } 261 262 // GetByKey returns the requested item, or sets exists=false. 263 func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) { 264 f.lock.RLock() 265 defer f.lock.RUnlock() 266 item, exists = f.items[key] 267 return item, exists, nil 268 } 269 270 // IsClosed checks if the queue is closed 271 func (f *FIFO) IsClosed() bool { 272 f.lock.Lock() 273 defer f.lock.Unlock() 274 return f.closed 275 } 276 277 // Pop waits until an item is ready and processes it. If multiple items are 278 // ready, they are returned in the order in which they were added/updated. 279 // The item is removed from the queue (and the store) before it is processed, 280 // so if you don't successfully process it, it should be added back with 281 // AddIfNotPresent(). process function is called under lock, so it is safe 282 // update data structures in it that need to be in sync with the queue. 283 func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) { 284 f.lock.Lock() 285 defer f.lock.Unlock() 286 for { 287 for len(f.queue) == 0 { 288 // When the queue is empty, invocation of Pop() is blocked until new item is enqueued. 289 // When Close() is called, the f.closed is set and the condition is broadcasted. 290 // Which causes this loop to continue and return from the Pop(). 291 if f.closed { 292 return nil, ErrFIFOClosed 293 } 294 295 f.cond.Wait() 296 } 297 isInInitialList := !f.hasSynced_locked() 298 id := f.queue[0] 299 f.queue = f.queue[1:] 300 if f.initialPopulationCount > 0 { 301 f.initialPopulationCount-- 302 } 303 item, ok := f.items[id] 304 if !ok { 305 // Item may have been deleted subsequently. 306 continue 307 } 308 delete(f.items, id) 309 err := process(item, isInInitialList) 310 if e, ok := err.(ErrRequeue); ok { 311 f.addIfNotPresent(id, item) 312 err = e.Err 313 } 314 return item, err 315 } 316 } 317 318 // Replace will delete the contents of 'f', using instead the given map. 319 // 'f' takes ownership of the map, you should not reference the map again 320 // after calling this function. f's queue is reset, too; upon return, it 321 // will contain the items in the map, in no particular order. 322 func (f *FIFO) Replace(list []interface{}, resourceVersion string) error { 323 items := make(map[string]interface{}, len(list)) 324 for _, item := range list { 325 key, err := f.keyFunc(item) 326 if err != nil { 327 return KeyError{item, err} 328 } 329 items[key] = item 330 } 331 332 f.lock.Lock() 333 defer f.lock.Unlock() 334 335 if !f.populated { 336 f.populated = true 337 f.initialPopulationCount = len(items) 338 } 339 340 f.items = items 341 f.queue = f.queue[:0] 342 for id := range items { 343 f.queue = append(f.queue, id) 344 } 345 if len(f.queue) > 0 { 346 f.cond.Broadcast() 347 } 348 return nil 349 } 350 351 // Resync will ensure that every object in the Store has its key in the queue. 352 // This should be a no-op, because that property is maintained by all operations. 353 func (f *FIFO) Resync() error { 354 f.lock.Lock() 355 defer f.lock.Unlock() 356 357 inQueue := sets.NewString() 358 for _, id := range f.queue { 359 inQueue.Insert(id) 360 } 361 for id := range f.items { 362 if !inQueue.Has(id) { 363 f.queue = append(f.queue, id) 364 } 365 } 366 if len(f.queue) > 0 { 367 f.cond.Broadcast() 368 } 369 return nil 370 } 371 372 // NewFIFO returns a Store which can be used to queue up items to 373 // process. 374 func NewFIFO(keyFunc KeyFunc) *FIFO { 375 f := &FIFO{ 376 items: map[string]interface{}{}, 377 queue: []string{}, 378 keyFunc: keyFunc, 379 } 380 f.cond.L = &f.lock 381 return f 382 } 383