...

Source file src/k8s.io/client-go/tools/cache/fifo.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package cache
    18  
    19  import (
    20  	"errors"
    21  	"sync"
    22  
    23  	"k8s.io/apimachinery/pkg/util/sets"
    24  )
    25  
    26  // PopProcessFunc is passed to Pop() method of Queue interface.
    27  // It is supposed to process the accumulator popped from the queue.
    28  type PopProcessFunc func(obj interface{}, isInInitialList bool) error
    29  
    30  // ErrRequeue may be returned by a PopProcessFunc to safely requeue
    31  // the current item. The value of Err will be returned from Pop.
    32  type ErrRequeue struct {
    33  	// Err is returned by the Pop function
    34  	Err error
    35  }
    36  
    37  // ErrFIFOClosed used when FIFO is closed
    38  var ErrFIFOClosed = errors.New("DeltaFIFO: manipulating with closed queue")
    39  
    40  func (e ErrRequeue) Error() string {
    41  	if e.Err == nil {
    42  		return "the popped item should be requeued without returning an error"
    43  	}
    44  	return e.Err.Error()
    45  }
    46  
    47  // Queue extends Store with a collection of Store keys to "process".
    48  // Every Add, Update, or Delete may put the object's key in that collection.
    49  // A Queue has a way to derive the corresponding key given an accumulator.
    50  // A Queue can be accessed concurrently from multiple goroutines.
    51  // A Queue can be "closed", after which Pop operations return an error.
    52  type Queue interface {
    53  	Store
    54  
    55  	// Pop blocks until there is at least one key to process or the
    56  	// Queue is closed.  In the latter case Pop returns with an error.
    57  	// In the former case Pop atomically picks one key to process,
    58  	// removes that (key, accumulator) association from the Store, and
    59  	// processes the accumulator.  Pop returns the accumulator that
    60  	// was processed and the result of processing.  The PopProcessFunc
    61  	// may return an ErrRequeue{inner} and in this case Pop will (a)
    62  	// return that (key, accumulator) association to the Queue as part
    63  	// of the atomic processing and (b) return the inner error from
    64  	// Pop.
    65  	Pop(PopProcessFunc) (interface{}, error)
    66  
    67  	// AddIfNotPresent puts the given accumulator into the Queue (in
    68  	// association with the accumulator's key) if and only if that key
    69  	// is not already associated with a non-empty accumulator.
    70  	AddIfNotPresent(interface{}) error
    71  
    72  	// HasSynced returns true if the first batch of keys have all been
    73  	// popped.  The first batch of keys are those of the first Replace
    74  	// operation if that happened before any Add, AddIfNotPresent,
    75  	// Update, or Delete; otherwise the first batch is empty.
    76  	HasSynced() bool
    77  
    78  	// Close the queue
    79  	Close()
    80  }
    81  
    82  // Pop is helper function for popping from Queue.
    83  // WARNING: Do NOT use this function in non-test code to avoid races
    84  // unless you really really really really know what you are doing.
    85  //
    86  // NOTE: This function is deprecated and may be removed in the future without
    87  // additional warning.
    88  func Pop(queue Queue) interface{} {
    89  	var result interface{}
    90  	queue.Pop(func(obj interface{}, isInInitialList bool) error {
    91  		result = obj
    92  		return nil
    93  	})
    94  	return result
    95  }
    96  
    97  // FIFO is a Queue in which (a) each accumulator is simply the most
    98  // recently provided object and (b) the collection of keys to process
    99  // is a FIFO.  The accumulators all start out empty, and deleting an
   100  // object from its accumulator empties the accumulator.  The Resync
   101  // operation is a no-op.
   102  //
   103  // Thus: if multiple adds/updates of a single object happen while that
   104  // object's key is in the queue before it has been processed then it
   105  // will only be processed once, and when it is processed the most
   106  // recent version will be processed. This can't be done with a channel
   107  //
   108  // FIFO solves this use case:
   109  //   - You want to process every object (exactly) once.
   110  //   - You want to process the most recent version of the object when you process it.
   111  //   - You do not want to process deleted objects, they should be removed from the queue.
   112  //   - You do not want to periodically reprocess objects.
   113  //
   114  // Compare with DeltaFIFO for other use cases.
   115  type FIFO struct {
   116  	lock sync.RWMutex
   117  	cond sync.Cond
   118  	// We depend on the property that every key in `items` is also in `queue`
   119  	items map[string]interface{}
   120  	queue []string
   121  
   122  	// populated is true if the first batch of items inserted by Replace() has been populated
   123  	// or Delete/Add/Update was called first.
   124  	populated bool
   125  	// initialPopulationCount is the number of items inserted by the first call of Replace()
   126  	initialPopulationCount int
   127  
   128  	// keyFunc is used to make the key used for queued item insertion and retrieval, and
   129  	// should be deterministic.
   130  	keyFunc KeyFunc
   131  
   132  	// Indication the queue is closed.
   133  	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
   134  	// Currently, not used to gate any of CRUD operations.
   135  	closed bool
   136  }
   137  
   138  var (
   139  	_ = Queue(&FIFO{}) // FIFO is a Queue
   140  )
   141  
   142  // Close the queue.
   143  func (f *FIFO) Close() {
   144  	f.lock.Lock()
   145  	defer f.lock.Unlock()
   146  	f.closed = true
   147  	f.cond.Broadcast()
   148  }
   149  
   150  // HasSynced returns true if an Add/Update/Delete/AddIfNotPresent are called first,
   151  // or the first batch of items inserted by Replace() has been popped.
   152  func (f *FIFO) HasSynced() bool {
   153  	f.lock.Lock()
   154  	defer f.lock.Unlock()
   155  	return f.hasSynced_locked()
   156  }
   157  
   158  func (f *FIFO) hasSynced_locked() bool {
   159  	return f.populated && f.initialPopulationCount == 0
   160  }
   161  
   162  // Add inserts an item, and puts it in the queue. The item is only enqueued
   163  // if it doesn't already exist in the set.
   164  func (f *FIFO) Add(obj interface{}) error {
   165  	id, err := f.keyFunc(obj)
   166  	if err != nil {
   167  		return KeyError{obj, err}
   168  	}
   169  	f.lock.Lock()
   170  	defer f.lock.Unlock()
   171  	f.populated = true
   172  	if _, exists := f.items[id]; !exists {
   173  		f.queue = append(f.queue, id)
   174  	}
   175  	f.items[id] = obj
   176  	f.cond.Broadcast()
   177  	return nil
   178  }
   179  
   180  // AddIfNotPresent inserts an item, and puts it in the queue. If the item is already
   181  // present in the set, it is neither enqueued nor added to the set.
   182  //
   183  // This is useful in a single producer/consumer scenario so that the consumer can
   184  // safely retry items without contending with the producer and potentially enqueueing
   185  // stale items.
   186  func (f *FIFO) AddIfNotPresent(obj interface{}) error {
   187  	id, err := f.keyFunc(obj)
   188  	if err != nil {
   189  		return KeyError{obj, err}
   190  	}
   191  	f.lock.Lock()
   192  	defer f.lock.Unlock()
   193  	f.addIfNotPresent(id, obj)
   194  	return nil
   195  }
   196  
   197  // addIfNotPresent assumes the fifo lock is already held and adds the provided
   198  // item to the queue under id if it does not already exist.
   199  func (f *FIFO) addIfNotPresent(id string, obj interface{}) {
   200  	f.populated = true
   201  	if _, exists := f.items[id]; exists {
   202  		return
   203  	}
   204  
   205  	f.queue = append(f.queue, id)
   206  	f.items[id] = obj
   207  	f.cond.Broadcast()
   208  }
   209  
   210  // Update is the same as Add in this implementation.
   211  func (f *FIFO) Update(obj interface{}) error {
   212  	return f.Add(obj)
   213  }
   214  
   215  // Delete removes an item. It doesn't add it to the queue, because
   216  // this implementation assumes the consumer only cares about the objects,
   217  // not the order in which they were created/added.
   218  func (f *FIFO) Delete(obj interface{}) error {
   219  	id, err := f.keyFunc(obj)
   220  	if err != nil {
   221  		return KeyError{obj, err}
   222  	}
   223  	f.lock.Lock()
   224  	defer f.lock.Unlock()
   225  	f.populated = true
   226  	delete(f.items, id)
   227  	return err
   228  }
   229  
   230  // List returns a list of all the items.
   231  func (f *FIFO) List() []interface{} {
   232  	f.lock.RLock()
   233  	defer f.lock.RUnlock()
   234  	list := make([]interface{}, 0, len(f.items))
   235  	for _, item := range f.items {
   236  		list = append(list, item)
   237  	}
   238  	return list
   239  }
   240  
   241  // ListKeys returns a list of all the keys of the objects currently
   242  // in the FIFO.
   243  func (f *FIFO) ListKeys() []string {
   244  	f.lock.RLock()
   245  	defer f.lock.RUnlock()
   246  	list := make([]string, 0, len(f.items))
   247  	for key := range f.items {
   248  		list = append(list, key)
   249  	}
   250  	return list
   251  }
   252  
   253  // Get returns the requested item, or sets exists=false.
   254  func (f *FIFO) Get(obj interface{}) (item interface{}, exists bool, err error) {
   255  	key, err := f.keyFunc(obj)
   256  	if err != nil {
   257  		return nil, false, KeyError{obj, err}
   258  	}
   259  	return f.GetByKey(key)
   260  }
   261  
   262  // GetByKey returns the requested item, or sets exists=false.
   263  func (f *FIFO) GetByKey(key string) (item interface{}, exists bool, err error) {
   264  	f.lock.RLock()
   265  	defer f.lock.RUnlock()
   266  	item, exists = f.items[key]
   267  	return item, exists, nil
   268  }
   269  
   270  // IsClosed checks if the queue is closed
   271  func (f *FIFO) IsClosed() bool {
   272  	f.lock.Lock()
   273  	defer f.lock.Unlock()
   274  	return f.closed
   275  }
   276  
   277  // Pop waits until an item is ready and processes it. If multiple items are
   278  // ready, they are returned in the order in which they were added/updated.
   279  // The item is removed from the queue (and the store) before it is processed,
   280  // so if you don't successfully process it, it should be added back with
   281  // AddIfNotPresent(). process function is called under lock, so it is safe
   282  // update data structures in it that need to be in sync with the queue.
   283  func (f *FIFO) Pop(process PopProcessFunc) (interface{}, error) {
   284  	f.lock.Lock()
   285  	defer f.lock.Unlock()
   286  	for {
   287  		for len(f.queue) == 0 {
   288  			// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
   289  			// When Close() is called, the f.closed is set and the condition is broadcasted.
   290  			// Which causes this loop to continue and return from the Pop().
   291  			if f.closed {
   292  				return nil, ErrFIFOClosed
   293  			}
   294  
   295  			f.cond.Wait()
   296  		}
   297  		isInInitialList := !f.hasSynced_locked()
   298  		id := f.queue[0]
   299  		f.queue = f.queue[1:]
   300  		if f.initialPopulationCount > 0 {
   301  			f.initialPopulationCount--
   302  		}
   303  		item, ok := f.items[id]
   304  		if !ok {
   305  			// Item may have been deleted subsequently.
   306  			continue
   307  		}
   308  		delete(f.items, id)
   309  		err := process(item, isInInitialList)
   310  		if e, ok := err.(ErrRequeue); ok {
   311  			f.addIfNotPresent(id, item)
   312  			err = e.Err
   313  		}
   314  		return item, err
   315  	}
   316  }
   317  
   318  // Replace will delete the contents of 'f', using instead the given map.
   319  // 'f' takes ownership of the map, you should not reference the map again
   320  // after calling this function. f's queue is reset, too; upon return, it
   321  // will contain the items in the map, in no particular order.
   322  func (f *FIFO) Replace(list []interface{}, resourceVersion string) error {
   323  	items := make(map[string]interface{}, len(list))
   324  	for _, item := range list {
   325  		key, err := f.keyFunc(item)
   326  		if err != nil {
   327  			return KeyError{item, err}
   328  		}
   329  		items[key] = item
   330  	}
   331  
   332  	f.lock.Lock()
   333  	defer f.lock.Unlock()
   334  
   335  	if !f.populated {
   336  		f.populated = true
   337  		f.initialPopulationCount = len(items)
   338  	}
   339  
   340  	f.items = items
   341  	f.queue = f.queue[:0]
   342  	for id := range items {
   343  		f.queue = append(f.queue, id)
   344  	}
   345  	if len(f.queue) > 0 {
   346  		f.cond.Broadcast()
   347  	}
   348  	return nil
   349  }
   350  
   351  // Resync will ensure that every object in the Store has its key in the queue.
   352  // This should be a no-op, because that property is maintained by all operations.
   353  func (f *FIFO) Resync() error {
   354  	f.lock.Lock()
   355  	defer f.lock.Unlock()
   356  
   357  	inQueue := sets.NewString()
   358  	for _, id := range f.queue {
   359  		inQueue.Insert(id)
   360  	}
   361  	for id := range f.items {
   362  		if !inQueue.Has(id) {
   363  			f.queue = append(f.queue, id)
   364  		}
   365  	}
   366  	if len(f.queue) > 0 {
   367  		f.cond.Broadcast()
   368  	}
   369  	return nil
   370  }
   371  
   372  // NewFIFO returns a Store which can be used to queue up items to
   373  // process.
   374  func NewFIFO(keyFunc KeyFunc) *FIFO {
   375  	f := &FIFO{
   376  		items:   map[string]interface{}{},
   377  		queue:   []string{},
   378  		keyFunc: keyFunc,
   379  	}
   380  	f.cond.L = &f.lock
   381  	return f
   382  }
   383  

View as plain text