...

Source file src/k8s.io/client-go/tools/cache/heap.go

Documentation: k8s.io/client-go/tools/cache

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // This file implements a heap data structure.
    18  
    19  package cache
    20  
    21  import (
    22  	"container/heap"
    23  	"fmt"
    24  	"sync"
    25  )
    26  
    27  const (
    28  	closedMsg = "heap is closed"
    29  )
    30  
    31  // LessFunc is used to compare two objects in the heap.
    32  type LessFunc func(interface{}, interface{}) bool
    33  
    34  type heapItem struct {
    35  	obj   interface{} // The object which is stored in the heap.
    36  	index int         // The index of the object's key in the Heap.queue.
    37  }
    38  
    39  type itemKeyValue struct {
    40  	key string
    41  	obj interface{}
    42  }
    43  
    44  // heapData is an internal struct that implements the standard heap interface
    45  // and keeps the data stored in the heap.
    46  type heapData struct {
    47  	// items is a map from key of the objects to the objects and their index.
    48  	// We depend on the property that items in the map are in the queue and vice versa.
    49  	items map[string]*heapItem
    50  	// queue implements a heap data structure and keeps the order of elements
    51  	// according to the heap invariant. The queue keeps the keys of objects stored
    52  	// in "items".
    53  	queue []string
    54  
    55  	// keyFunc is used to make the key used for queued item insertion and retrieval, and
    56  	// should be deterministic.
    57  	keyFunc KeyFunc
    58  	// lessFunc is used to compare two objects in the heap.
    59  	lessFunc LessFunc
    60  }
    61  
    62  var (
    63  	_ = heap.Interface(&heapData{}) // heapData is a standard heap
    64  )
    65  
    66  // Less compares two objects and returns true if the first one should go
    67  // in front of the second one in the heap.
    68  func (h *heapData) Less(i, j int) bool {
    69  	if i > len(h.queue) || j > len(h.queue) {
    70  		return false
    71  	}
    72  	itemi, ok := h.items[h.queue[i]]
    73  	if !ok {
    74  		return false
    75  	}
    76  	itemj, ok := h.items[h.queue[j]]
    77  	if !ok {
    78  		return false
    79  	}
    80  	return h.lessFunc(itemi.obj, itemj.obj)
    81  }
    82  
    83  // Len returns the number of items in the Heap.
    84  func (h *heapData) Len() int { return len(h.queue) }
    85  
    86  // Swap implements swapping of two elements in the heap. This is a part of standard
    87  // heap interface and should never be called directly.
    88  func (h *heapData) Swap(i, j int) {
    89  	h.queue[i], h.queue[j] = h.queue[j], h.queue[i]
    90  	item := h.items[h.queue[i]]
    91  	item.index = i
    92  	item = h.items[h.queue[j]]
    93  	item.index = j
    94  }
    95  
    96  // Push is supposed to be called by heap.Push only.
    97  func (h *heapData) Push(kv interface{}) {
    98  	keyValue := kv.(*itemKeyValue)
    99  	n := len(h.queue)
   100  	h.items[keyValue.key] = &heapItem{keyValue.obj, n}
   101  	h.queue = append(h.queue, keyValue.key)
   102  }
   103  
   104  // Pop is supposed to be called by heap.Pop only.
   105  func (h *heapData) Pop() interface{} {
   106  	key := h.queue[len(h.queue)-1]
   107  	h.queue = h.queue[0 : len(h.queue)-1]
   108  	item, ok := h.items[key]
   109  	if !ok {
   110  		// This is an error
   111  		return nil
   112  	}
   113  	delete(h.items, key)
   114  	return item.obj
   115  }
   116  
   117  // Heap is a thread-safe producer/consumer queue that implements a heap data structure.
   118  // It can be used to implement priority queues and similar data structures.
   119  type Heap struct {
   120  	lock sync.RWMutex
   121  	cond sync.Cond
   122  
   123  	// data stores objects and has a queue that keeps their ordering according
   124  	// to the heap invariant.
   125  	data *heapData
   126  
   127  	// closed indicates that the queue is closed.
   128  	// It is mainly used to let Pop() exit its control loop while waiting for an item.
   129  	closed bool
   130  }
   131  
   132  // Close the Heap and signals condition variables that may be waiting to pop
   133  // items from the heap.
   134  func (h *Heap) Close() {
   135  	h.lock.Lock()
   136  	defer h.lock.Unlock()
   137  	h.closed = true
   138  	h.cond.Broadcast()
   139  }
   140  
   141  // Add inserts an item, and puts it in the queue. The item is updated if it
   142  // already exists.
   143  func (h *Heap) Add(obj interface{}) error {
   144  	key, err := h.data.keyFunc(obj)
   145  	if err != nil {
   146  		return KeyError{obj, err}
   147  	}
   148  	h.lock.Lock()
   149  	defer h.lock.Unlock()
   150  	if h.closed {
   151  		return fmt.Errorf(closedMsg)
   152  	}
   153  	if _, exists := h.data.items[key]; exists {
   154  		h.data.items[key].obj = obj
   155  		heap.Fix(h.data, h.data.items[key].index)
   156  	} else {
   157  		h.addIfNotPresentLocked(key, obj)
   158  	}
   159  	h.cond.Broadcast()
   160  	return nil
   161  }
   162  
   163  // BulkAdd adds all the items in the list to the queue and then signals the condition
   164  // variable. It is useful when the caller would like to add all of the items
   165  // to the queue before consumer starts processing them.
   166  func (h *Heap) BulkAdd(list []interface{}) error {
   167  	h.lock.Lock()
   168  	defer h.lock.Unlock()
   169  	if h.closed {
   170  		return fmt.Errorf(closedMsg)
   171  	}
   172  	for _, obj := range list {
   173  		key, err := h.data.keyFunc(obj)
   174  		if err != nil {
   175  			return KeyError{obj, err}
   176  		}
   177  		if _, exists := h.data.items[key]; exists {
   178  			h.data.items[key].obj = obj
   179  			heap.Fix(h.data, h.data.items[key].index)
   180  		} else {
   181  			h.addIfNotPresentLocked(key, obj)
   182  		}
   183  	}
   184  	h.cond.Broadcast()
   185  	return nil
   186  }
   187  
   188  // AddIfNotPresent inserts an item, and puts it in the queue. If an item with
   189  // the key is present in the map, no changes is made to the item.
   190  //
   191  // This is useful in a single producer/consumer scenario so that the consumer can
   192  // safely retry items without contending with the producer and potentially enqueueing
   193  // stale items.
   194  func (h *Heap) AddIfNotPresent(obj interface{}) error {
   195  	id, err := h.data.keyFunc(obj)
   196  	if err != nil {
   197  		return KeyError{obj, err}
   198  	}
   199  	h.lock.Lock()
   200  	defer h.lock.Unlock()
   201  	if h.closed {
   202  		return fmt.Errorf(closedMsg)
   203  	}
   204  	h.addIfNotPresentLocked(id, obj)
   205  	h.cond.Broadcast()
   206  	return nil
   207  }
   208  
   209  // addIfNotPresentLocked assumes the lock is already held and adds the provided
   210  // item to the queue if it does not already exist.
   211  func (h *Heap) addIfNotPresentLocked(key string, obj interface{}) {
   212  	if _, exists := h.data.items[key]; exists {
   213  		return
   214  	}
   215  	heap.Push(h.data, &itemKeyValue{key, obj})
   216  }
   217  
   218  // Update is the same as Add in this implementation. When the item does not
   219  // exist, it is added.
   220  func (h *Heap) Update(obj interface{}) error {
   221  	return h.Add(obj)
   222  }
   223  
   224  // Delete removes an item.
   225  func (h *Heap) Delete(obj interface{}) error {
   226  	key, err := h.data.keyFunc(obj)
   227  	if err != nil {
   228  		return KeyError{obj, err}
   229  	}
   230  	h.lock.Lock()
   231  	defer h.lock.Unlock()
   232  	if item, ok := h.data.items[key]; ok {
   233  		heap.Remove(h.data, item.index)
   234  		return nil
   235  	}
   236  	return fmt.Errorf("object not found")
   237  }
   238  
   239  // Pop waits until an item is ready. If multiple items are
   240  // ready, they are returned in the order given by Heap.data.lessFunc.
   241  func (h *Heap) Pop() (interface{}, error) {
   242  	h.lock.Lock()
   243  	defer h.lock.Unlock()
   244  	for len(h.data.queue) == 0 {
   245  		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
   246  		// When Close() is called, the h.closed is set and the condition is broadcast,
   247  		// which causes this loop to continue and return from the Pop().
   248  		if h.closed {
   249  			return nil, fmt.Errorf("heap is closed")
   250  		}
   251  		h.cond.Wait()
   252  	}
   253  	obj := heap.Pop(h.data)
   254  	if obj == nil {
   255  		return nil, fmt.Errorf("object was removed from heap data")
   256  	}
   257  
   258  	return obj, nil
   259  }
   260  
   261  // List returns a list of all the items.
   262  func (h *Heap) List() []interface{} {
   263  	h.lock.RLock()
   264  	defer h.lock.RUnlock()
   265  	list := make([]interface{}, 0, len(h.data.items))
   266  	for _, item := range h.data.items {
   267  		list = append(list, item.obj)
   268  	}
   269  	return list
   270  }
   271  
   272  // ListKeys returns a list of all the keys of the objects currently in the Heap.
   273  func (h *Heap) ListKeys() []string {
   274  	h.lock.RLock()
   275  	defer h.lock.RUnlock()
   276  	list := make([]string, 0, len(h.data.items))
   277  	for key := range h.data.items {
   278  		list = append(list, key)
   279  	}
   280  	return list
   281  }
   282  
   283  // Get returns the requested item, or sets exists=false.
   284  func (h *Heap) Get(obj interface{}) (interface{}, bool, error) {
   285  	key, err := h.data.keyFunc(obj)
   286  	if err != nil {
   287  		return nil, false, KeyError{obj, err}
   288  	}
   289  	return h.GetByKey(key)
   290  }
   291  
   292  // GetByKey returns the requested item, or sets exists=false.
   293  func (h *Heap) GetByKey(key string) (interface{}, bool, error) {
   294  	h.lock.RLock()
   295  	defer h.lock.RUnlock()
   296  	item, exists := h.data.items[key]
   297  	if !exists {
   298  		return nil, false, nil
   299  	}
   300  	return item.obj, true, nil
   301  }
   302  
   303  // IsClosed returns true if the queue is closed.
   304  func (h *Heap) IsClosed() bool {
   305  	h.lock.RLock()
   306  	defer h.lock.RUnlock()
   307  	return h.closed
   308  }
   309  
   310  // NewHeap returns a Heap which can be used to queue up items to process.
   311  func NewHeap(keyFn KeyFunc, lessFn LessFunc) *Heap {
   312  	h := &Heap{
   313  		data: &heapData{
   314  			items:    map[string]*heapItem{},
   315  			queue:    []string{},
   316  			keyFunc:  keyFn,
   317  			lessFunc: lessFn,
   318  		},
   319  	}
   320  	h.cond.L = &h.lock
   321  	return h
   322  }
   323  

View as plain text