...

Source file src/k8s.io/client-go/util/workqueue/delaying_queue.go

Documentation: k8s.io/client-go/util/workqueue

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package workqueue
    18  
    19  import (
    20  	"container/heap"
    21  	"sync"
    22  	"time"
    23  
    24  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    25  	"k8s.io/utils/clock"
    26  )
    27  
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed.
	// The delayingType implementation in this file adds the item immediately when
	// duration is zero or negative, and drops the request once the queue is shutting down.
	AddAfter(item interface{}, duration time.Duration)
}
    35  
// DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.
// Every field may be left at its zero value; NewDelayingQueueWithConfig fills in
// defaults for a nil Clock and a nil Queue.
type DelayingQueueConfig struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	Name string

	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider.
	MetricsProvider MetricsProvider

	// Clock optionally allows injecting a real or fake clock for testing purposes.
	// When nil, NewDelayingQueueWithConfig uses clock.RealClock{}.
	Clock clock.WithTicker

	// Queue optionally allows injecting custom queue Interface instead of the default one.
	// When nil, NewDelayingQueueWithConfig builds one with NewWithConfig from Name,
	// MetricsProvider and Clock.
	Queue Interface
}
    51  
    52  // NewDelayingQueue constructs a new workqueue with delayed queuing ability.
    53  // NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use
    54  // NewDelayingQueueWithConfig instead and specify a name.
    55  func NewDelayingQueue() DelayingInterface {
    56  	return NewDelayingQueueWithConfig(DelayingQueueConfig{})
    57  }
    58  
    59  // NewDelayingQueueWithConfig constructs a new workqueue with options to
    60  // customize different properties.
    61  func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface {
    62  	if config.Clock == nil {
    63  		config.Clock = clock.RealClock{}
    64  	}
    65  
    66  	if config.Queue == nil {
    67  		config.Queue = NewWithConfig(QueueConfig{
    68  			Name:            config.Name,
    69  			MetricsProvider: config.MetricsProvider,
    70  			Clock:           config.Clock,
    71  		})
    72  	}
    73  
    74  	return newDelayingQueue(config.Clock, config.Queue, config.Name, config.MetricsProvider)
    75  }
    76  
    77  // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
    78  // inject custom queue Interface instead of the default one
    79  // Deprecated: Use NewDelayingQueueWithConfig instead.
    80  func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
    81  	return NewDelayingQueueWithConfig(DelayingQueueConfig{
    82  		Name:  name,
    83  		Queue: q,
    84  	})
    85  }
    86  
    87  // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability.
    88  // Deprecated: Use NewDelayingQueueWithConfig instead.
    89  func NewNamedDelayingQueue(name string) DelayingInterface {
    90  	return NewDelayingQueueWithConfig(DelayingQueueConfig{Name: name})
    91  }
    92  
    93  // NewDelayingQueueWithCustomClock constructs a new named workqueue
    94  // with ability to inject real or fake clock for testing purposes.
    95  // Deprecated: Use NewDelayingQueueWithConfig instead.
    96  func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface {
    97  	return NewDelayingQueueWithConfig(DelayingQueueConfig{
    98  		Name:  name,
    99  		Clock: clock,
   100  	})
   101  }
   102  
   103  func newDelayingQueue(clock clock.WithTicker, q Interface, name string, provider MetricsProvider) *delayingType {
   104  	ret := &delayingType{
   105  		Interface:       q,
   106  		clock:           clock,
   107  		heartbeat:       clock.NewTicker(maxWait),
   108  		stopCh:          make(chan struct{}),
   109  		waitingForAddCh: make(chan *waitFor, 1000),
   110  		metrics:         newRetryMetrics(name, provider),
   111  	}
   112  
   113  	go ret.waitingLoop()
   114  	return ret
   115  }
   116  
// delayingType wraps an Interface and provides delayed re-enqueuing.
// Delayed items are handed to the waitingLoop goroutine through
// waitingForAddCh; that goroutine adds them to the wrapped Interface once
// their readyAt time has passed.
type delayingType struct {
	Interface

	// clock tracks time for delayed firing
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel that feeds waitingForAdd
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries
	metrics retryMetrics
}
   138  
// waitFor holds the data to add and the time it should be added
type waitFor struct {
	// data is the item to hand to the wrapped queue; it is also used as the
	// map key that de-duplicates waiting entries.
	data t
	// readyAt is the earliest time at which data should be added.
	readyAt time.Time
	// index in the priority queue (heap); maintained by Swap and Push, and
	// set to -1 by Pop once the entry has left the queue.
	index int
}
   146  
   147  // waitForPriorityQueue implements a priority queue for waitFor items.
   148  //
   149  // waitForPriorityQueue implements heap.Interface. The item occurring next in
   150  // time (i.e., the item with the smallest readyAt) is at the root (index 0).
   151  // Peek returns this minimum item at index 0. Pop returns the minimum item after
   152  // it has been removed from the queue and placed at index Len()-1 by
   153  // container/heap. Push adds an item at index Len(), and container/heap
   154  // percolates it into the correct location.
   155  type waitForPriorityQueue []*waitFor
   156  
   157  func (pq waitForPriorityQueue) Len() int {
   158  	return len(pq)
   159  }
   160  func (pq waitForPriorityQueue) Less(i, j int) bool {
   161  	return pq[i].readyAt.Before(pq[j].readyAt)
   162  }
   163  func (pq waitForPriorityQueue) Swap(i, j int) {
   164  	pq[i], pq[j] = pq[j], pq[i]
   165  	pq[i].index = i
   166  	pq[j].index = j
   167  }
   168  
   169  // Push adds an item to the queue. Push should not be called directly; instead,
   170  // use `heap.Push`.
   171  func (pq *waitForPriorityQueue) Push(x interface{}) {
   172  	n := len(*pq)
   173  	item := x.(*waitFor)
   174  	item.index = n
   175  	*pq = append(*pq, item)
   176  }
   177  
   178  // Pop removes an item from the queue. Pop should not be called directly;
   179  // instead, use `heap.Pop`.
   180  func (pq *waitForPriorityQueue) Pop() interface{} {
   181  	n := len(*pq)
   182  	item := (*pq)[n-1]
   183  	item.index = -1
   184  	*pq = (*pq)[0:(n - 1)]
   185  	return item
   186  }
   187  
   188  // Peek returns the item at the beginning of the queue, without removing the
   189  // item or otherwise mutating the queue. It is safe to call directly.
   190  func (pq waitForPriorityQueue) Peek() interface{} {
   191  	return pq[0]
   192  }
   193  
   194  // ShutDown stops the queue. After the queue drains, the returned shutdown bool
   195  // on Get() will be true. This method may be invoked more than once.
   196  func (q *delayingType) ShutDown() {
   197  	q.stopOnce.Do(func() {
   198  		q.Interface.ShutDown()
   199  		close(q.stopCh)
   200  		q.heartbeat.Stop()
   201  	})
   202  }
   203  
   204  // AddAfter adds the given item to the work queue after the given delay
   205  func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
   206  	// don't add if we're already shutting down
   207  	if q.ShuttingDown() {
   208  		return
   209  	}
   210  
   211  	q.metrics.retry()
   212  
   213  	// immediately add things with no delay
   214  	if duration <= 0 {
   215  		q.Add(item)
   216  		return
   217  	}
   218  
   219  	select {
   220  	case <-q.stopCh:
   221  		// unblock if ShutDown() is called
   222  	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
   223  	}
   224  }
   225  
// maxWait keeps a max bound on the wait time. It's just insurance against weird things happening.
// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
// expired item sitting for more than 10 seconds.
// newDelayingQueue uses it as the period of the heartbeat ticker.
const maxWait = 10 * time.Second
   230  
   231  // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
   232  func (q *delayingType) waitingLoop() {
   233  	defer utilruntime.HandleCrash()
   234  
   235  	// Make a placeholder channel to use when there are no items in our list
   236  	never := make(<-chan time.Time)
   237  
   238  	// Make a timer that expires when the item at the head of the waiting queue is ready
   239  	var nextReadyAtTimer clock.Timer
   240  
   241  	waitingForQueue := &waitForPriorityQueue{}
   242  	heap.Init(waitingForQueue)
   243  
   244  	waitingEntryByData := map[t]*waitFor{}
   245  
   246  	for {
   247  		if q.Interface.ShuttingDown() {
   248  			return
   249  		}
   250  
   251  		now := q.clock.Now()
   252  
   253  		// Add ready entries
   254  		for waitingForQueue.Len() > 0 {
   255  			entry := waitingForQueue.Peek().(*waitFor)
   256  			if entry.readyAt.After(now) {
   257  				break
   258  			}
   259  
   260  			entry = heap.Pop(waitingForQueue).(*waitFor)
   261  			q.Add(entry.data)
   262  			delete(waitingEntryByData, entry.data)
   263  		}
   264  
   265  		// Set up a wait for the first item's readyAt (if one exists)
   266  		nextReadyAt := never
   267  		if waitingForQueue.Len() > 0 {
   268  			if nextReadyAtTimer != nil {
   269  				nextReadyAtTimer.Stop()
   270  			}
   271  			entry := waitingForQueue.Peek().(*waitFor)
   272  			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
   273  			nextReadyAt = nextReadyAtTimer.C()
   274  		}
   275  
   276  		select {
   277  		case <-q.stopCh:
   278  			return
   279  
   280  		case <-q.heartbeat.C():
   281  			// continue the loop, which will add ready items
   282  
   283  		case <-nextReadyAt:
   284  			// continue the loop, which will add ready items
   285  
   286  		case waitEntry := <-q.waitingForAddCh:
   287  			if waitEntry.readyAt.After(q.clock.Now()) {
   288  				insert(waitingForQueue, waitingEntryByData, waitEntry)
   289  			} else {
   290  				q.Add(waitEntry.data)
   291  			}
   292  
   293  			drained := false
   294  			for !drained {
   295  				select {
   296  				case waitEntry := <-q.waitingForAddCh:
   297  					if waitEntry.readyAt.After(q.clock.Now()) {
   298  						insert(waitingForQueue, waitingEntryByData, waitEntry)
   299  					} else {
   300  						q.Add(waitEntry.data)
   301  					}
   302  				default:
   303  					drained = true
   304  				}
   305  			}
   306  		}
   307  	}
   308  }
   309  
   310  // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
   311  func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
   312  	// if the entry already exists, update the time only if it would cause the item to be queued sooner
   313  	existing, exists := knownEntries[entry.data]
   314  	if exists {
   315  		if existing.readyAt.After(entry.readyAt) {
   316  			existing.readyAt = entry.readyAt
   317  			heap.Fix(q, existing.index)
   318  		}
   319  
   320  		return
   321  	}
   322  
   323  	heap.Push(q, entry)
   324  	knownEntries[entry.data] = entry
   325  }
   326  

View as plain text