...

Source file src/k8s.io/client-go/util/workqueue/queue.go

Documentation: k8s.io/client-go/util/workqueue

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package workqueue
    18  
    19  import (
    20  	"sync"
    21  	"time"
    22  
    23  	"k8s.io/utils/clock"
    24  )
    25  
// Interface is the set of operations a work queue exposes. Type, defined
// in this file, provides all of these methods.
type Interface interface {
	// Add marks item as needing processing.
	Add(item interface{})
	// Len returns the current queue length, for informational purposes only.
	Len() int
	// Get blocks until it can return an item to be processed. When shutdown
	// is true the caller should end its goroutine. Done must be called with
	// the returned item once processing has finished.
	Get() (item interface{}, shutdown bool)
	// Done marks item as done processing.
	Done(item interface{})
	// ShutDown makes the queue ignore new items and tells workers to exit.
	ShutDown()
	// ShutDownWithDrain makes the queue ignore new items and returns once
	// the items being processed have been marked Done.
	ShutDownWithDrain()
	// ShuttingDown reports whether a shut down has been initiated.
	ShuttingDown() bool
}
    35  
// QueueConfig specifies optional configurations to customize an Interface.
// Every field may be left at its zero value.
type QueueConfig struct {
	// Name for the queue. If unnamed, the metrics will not be registered.
	Name string

	// MetricsProvider optionally allows specifying a metrics provider to use for the queue
	// instead of the global provider. When nil, the global metrics factory is used.
	MetricsProvider MetricsProvider

	// Clock allows a real or fake clock to be injected for testing purposes.
	// When nil, clock.RealClock{} is used.
	Clock clock.WithTicker
}
    48  
    49  // New constructs a new work queue (see the package comment).
    50  func New() *Type {
    51  	return NewWithConfig(QueueConfig{
    52  		Name: "",
    53  	})
    54  }
    55  
    56  // NewWithConfig constructs a new workqueue with ability to
    57  // customize different properties.
    58  func NewWithConfig(config QueueConfig) *Type {
    59  	return newQueueWithConfig(config, defaultUnfinishedWorkUpdatePeriod)
    60  }
    61  
    62  // NewNamed creates a new named queue.
    63  // Deprecated: Use NewWithConfig instead.
    64  func NewNamed(name string) *Type {
    65  	return NewWithConfig(QueueConfig{
    66  		Name: name,
    67  	})
    68  }
    69  
    70  // newQueueWithConfig constructs a new named workqueue
    71  // with the ability to customize different properties for testing purposes
    72  func newQueueWithConfig(config QueueConfig, updatePeriod time.Duration) *Type {
    73  	var metricsFactory *queueMetricsFactory
    74  	if config.MetricsProvider != nil {
    75  		metricsFactory = &queueMetricsFactory{
    76  			metricsProvider: config.MetricsProvider,
    77  		}
    78  	} else {
    79  		metricsFactory = &globalMetricsFactory
    80  	}
    81  
    82  	if config.Clock == nil {
    83  		config.Clock = clock.RealClock{}
    84  	}
    85  
    86  	return newQueue(
    87  		config.Clock,
    88  		metricsFactory.newQueueMetrics(config.Name, config.Clock),
    89  		updatePeriod,
    90  	)
    91  }
    92  
    93  func newQueue(c clock.WithTicker, metrics queueMetrics, updatePeriod time.Duration) *Type {
    94  	t := &Type{
    95  		clock:                      c,
    96  		dirty:                      set{},
    97  		processing:                 set{},
    98  		cond:                       sync.NewCond(&sync.Mutex{}),
    99  		metrics:                    metrics,
   100  		unfinishedWorkUpdatePeriod: updatePeriod,
   101  	}
   102  
   103  	// Don't start the goroutine for a type of noMetrics so we don't consume
   104  	// resources unnecessarily
   105  	if _, ok := metrics.(noMetrics); !ok {
   106  		go t.updateUnfinishedWorkLoop()
   107  	}
   108  
   109  	return t
   110  }
   111  
// defaultUnfinishedWorkUpdatePeriod is the update period NewWithConfig
// passes to newQueueWithConfig; it ends up as the ticker interval used by
// updateUnfinishedWorkLoop.
const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
   113  
// Type is a work queue (see the package comment).
type Type struct {
	// queue defines the order in which we will work on items. Every
	// element of queue should be in the dirty set and not in the
	// processing set.
	queue []t

	// dirty defines all of the items that need to be processed.
	dirty set

	// Things that are currently being processed are in the processing set.
	// These things may be simultaneously in the dirty set. When we finish
	// processing something and remove it from this set, we'll check if
	// it's in the dirty set, and if so, add it to the queue.
	processing set

	// cond is the condition variable Get and ShutDownWithDrain wait on.
	// Its lock (a sync.Mutex created in newQueue) is held by every method
	// of Type while it reads or writes the fields of this struct.
	cond *sync.Cond

	// shuttingDown is set by ShutDown and ShutDownWithDrain; once set, Add
	// ignores new items. drain is set to true by ShutDownWithDrain, which
	// keeps waiting while it stays true, and reset to false by ShutDown.
	shuttingDown bool
	drain        bool

	// metrics is notified from Add, Get, Done and updateUnfinishedWorkLoop.
	metrics queueMetrics

	// unfinishedWorkUpdatePeriod is the interval of the ticker that
	// updateUnfinishedWorkLoop obtains from clock.
	unfinishedWorkUpdatePeriod time.Duration
	clock                      clock.WithTicker
}
   140  
   141  type empty struct{}
   142  type t interface{}
   143  type set map[t]empty
   144  
   145  func (s set) has(item t) bool {
   146  	_, exists := s[item]
   147  	return exists
   148  }
   149  
   150  func (s set) insert(item t) {
   151  	s[item] = empty{}
   152  }
   153  
   154  func (s set) delete(item t) {
   155  	delete(s, item)
   156  }
   157  
   158  func (s set) len() int {
   159  	return len(s)
   160  }
   161  
   162  // Add marks item as needing processing.
   163  func (q *Type) Add(item interface{}) {
   164  	q.cond.L.Lock()
   165  	defer q.cond.L.Unlock()
   166  	if q.shuttingDown {
   167  		return
   168  	}
   169  	if q.dirty.has(item) {
   170  		return
   171  	}
   172  
   173  	q.metrics.add(item)
   174  
   175  	q.dirty.insert(item)
   176  	if q.processing.has(item) {
   177  		return
   178  	}
   179  
   180  	q.queue = append(q.queue, item)
   181  	q.cond.Signal()
   182  }
   183  
   184  // Len returns the current queue length, for informational purposes only. You
   185  // shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
   186  // value, that can't be synchronized properly.
   187  func (q *Type) Len() int {
   188  	q.cond.L.Lock()
   189  	defer q.cond.L.Unlock()
   190  	return len(q.queue)
   191  }
   192  
   193  // Get blocks until it can return an item to be processed. If shutdown = true,
   194  // the caller should end their goroutine. You must call Done with item when you
   195  // have finished processing it.
   196  func (q *Type) Get() (item interface{}, shutdown bool) {
   197  	q.cond.L.Lock()
   198  	defer q.cond.L.Unlock()
   199  	for len(q.queue) == 0 && !q.shuttingDown {
   200  		q.cond.Wait()
   201  	}
   202  	if len(q.queue) == 0 {
   203  		// We must be shutting down.
   204  		return nil, true
   205  	}
   206  
   207  	item = q.queue[0]
   208  	// The underlying array still exists and reference this object, so the object will not be garbage collected.
   209  	q.queue[0] = nil
   210  	q.queue = q.queue[1:]
   211  
   212  	q.metrics.get(item)
   213  
   214  	q.processing.insert(item)
   215  	q.dirty.delete(item)
   216  
   217  	return item, false
   218  }
   219  
   220  // Done marks item as done processing, and if it has been marked as dirty again
   221  // while it was being processed, it will be re-added to the queue for
   222  // re-processing.
   223  func (q *Type) Done(item interface{}) {
   224  	q.cond.L.Lock()
   225  	defer q.cond.L.Unlock()
   226  
   227  	q.metrics.done(item)
   228  
   229  	q.processing.delete(item)
   230  	if q.dirty.has(item) {
   231  		q.queue = append(q.queue, item)
   232  		q.cond.Signal()
   233  	} else if q.processing.len() == 0 {
   234  		q.cond.Signal()
   235  	}
   236  }
   237  
   238  // ShutDown will cause q to ignore all new items added to it and
   239  // immediately instruct the worker goroutines to exit.
   240  func (q *Type) ShutDown() {
   241  	q.cond.L.Lock()
   242  	defer q.cond.L.Unlock()
   243  
   244  	q.drain = false
   245  	q.shuttingDown = true
   246  	q.cond.Broadcast()
   247  }
   248  
   249  // ShutDownWithDrain will cause q to ignore all new items added to it. As soon
   250  // as the worker goroutines have "drained", i.e: finished processing and called
   251  // Done on all existing items in the queue; they will be instructed to exit and
   252  // ShutDownWithDrain will return. Hence: a strict requirement for using this is;
   253  // your workers must ensure that Done is called on all items in the queue once
   254  // the shut down has been initiated, if that is not the case: this will block
   255  // indefinitely. It is, however, safe to call ShutDown after having called
   256  // ShutDownWithDrain, as to force the queue shut down to terminate immediately
   257  // without waiting for the drainage.
   258  func (q *Type) ShutDownWithDrain() {
   259  	q.cond.L.Lock()
   260  	defer q.cond.L.Unlock()
   261  
   262  	q.drain = true
   263  	q.shuttingDown = true
   264  	q.cond.Broadcast()
   265  
   266  	for q.processing.len() != 0 && q.drain {
   267  		q.cond.Wait()
   268  	}
   269  }
   270  
   271  func (q *Type) ShuttingDown() bool {
   272  	q.cond.L.Lock()
   273  	defer q.cond.L.Unlock()
   274  
   275  	return q.shuttingDown
   276  }
   277  
   278  func (q *Type) updateUnfinishedWorkLoop() {
   279  	t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
   280  	defer t.Stop()
   281  	for range t.C() {
   282  		if !func() bool {
   283  			q.cond.L.Lock()
   284  			defer q.cond.L.Unlock()
   285  			if !q.shuttingDown {
   286  				q.metrics.updateUnfinishedWork()
   287  				return true
   288  			}
   289  			return false
   290  
   291  		}() {
   292  			return
   293  		}
   294  	}
   295  }
   296  

View as plain text