...

Package workqueue

import "k8s.io/client-go/util/workqueue"

Overview

Package workqueue provides a simple queue that supports the following features:

  • Fair: items are processed in the order in which they are added.
  • Stingy: a single item will not be processed multiple times concurrently, and if an item is added multiple times before it can be processed, it will only be processed once.
  • Multiple consumers and producers. In particular, an item may be re-enqueued while it is being processed.
  • Shutdown notifications.
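
The "fair" and "stingy" guarantees above can be illustrated with a small single-goroutine model. This is only a sketch under stated assumptions, not the package's implementation: the modelQueue type, its fields, and the use of string items are hypothetical, and locking, a blocking Get, and shutdown handling are left out.

```go
package main

import "fmt"

// modelQueue is a hypothetical, single-goroutine model of the semantics
// described above. It is NOT the workqueue implementation: there is no
// locking, no blocking Get, and no shutdown handling.
type modelQueue struct {
	order      []string            // items waiting, in arrival order (fair)
	dirty      map[string]struct{} // items that still need processing
	processing map[string]struct{} // items currently being processed
}

func newModelQueue() *modelQueue {
	return &modelQueue{
		dirty:      map[string]struct{}{},
		processing: map[string]struct{}{},
	}
}

// Add marks item as needing processing. Duplicate adds collapse into one.
func (q *modelQueue) Add(item string) {
	if _, ok := q.dirty[item]; ok {
		return // already pending: it will be processed only once (stingy)
	}
	q.dirty[item] = struct{}{}
	if _, ok := q.processing[item]; ok {
		return // in flight: re-queueing is deferred until Done
	}
	q.order = append(q.order, item)
}

// Get returns the oldest waiting item, or ok=false when nothing is waiting.
func (q *modelQueue) Get() (item string, ok bool) {
	if len(q.order) == 0 {
		return "", false
	}
	item, q.order = q.order[0], q.order[1:]
	q.processing[item] = struct{}{}
	delete(q.dirty, item)
	return item, true
}

// Done finishes processing; an item re-added in the meantime is queued again.
func (q *modelQueue) Done(item string) {
	delete(q.processing, item)
	if _, ok := q.dirty[item]; ok {
		q.order = append(q.order, item)
	}
}

func main() {
	q := newModelQueue()
	q.Add("a")
	q.Add("b")
	q.Add("a") // collapsed with the first "a"
	first, _ := q.Get()
	q.Add(first)                     // re-enqueued while it is being processed
	fmt.Println(first, len(q.order)) // only "b" is waiting for now
	q.Done(first)
	fmt.Println(q.order) // "a" re-appears behind "b"
}
```

In this model an item that is re-added while in flight never runs concurrently with itself; it simply goes back to the end of the line once Done is called.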

Index

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options)
func SetProvider(metricsProvider MetricsProvider)
func WithChunkSize(c int) func(*options)
type BucketRateLimiter
    func (r *BucketRateLimiter) Forget(item interface{})
    func (r *BucketRateLimiter) NumRequeues(item interface{}) int
    func (r *BucketRateLimiter) When(item interface{}) time.Duration
type CounterMetric
type DelayingInterface
    func NewDelayingQueue() DelayingInterface
    func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface
    func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface
    func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface
    func NewNamedDelayingQueue(name string) DelayingInterface
type DelayingQueueConfig
type DoWorkPieceFunc
type GaugeMetric
type HistogramMetric
type Interface
type ItemExponentialFailureRateLimiter
    func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})
    func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int
    func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration
type ItemFastSlowRateLimiter
    func (r *ItemFastSlowRateLimiter) Forget(item interface{})
    func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int
    func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration
type MaxOfRateLimiter
    func (r *MaxOfRateLimiter) Forget(item interface{})
    func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int
    func (r *MaxOfRateLimiter) When(item interface{}) time.Duration
type MetricsProvider
type Options
type QueueConfig
type RateLimiter
    func DefaultControllerRateLimiter() RateLimiter
    func DefaultItemBasedRateLimiter() RateLimiter
    func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter
    func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter
    func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter
    func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter
type RateLimitingInterface
    func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface
    func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface
    func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface
    func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface
type RateLimitingQueueConfig
type SettableGaugeMetric
type SummaryMetric
type Type
    func New() *Type
    func NewNamed(name string) *Type
    func NewWithConfig(config QueueConfig) *Type
    func (q *Type) Add(item interface{})
    func (q *Type) Done(item interface{})
    func (q *Type) Get() (item interface{}, shutdown bool)
    func (q *Type) Len() int
    func (q *Type) ShutDown()
    func (q *Type) ShutDownWithDrain()
    func (q *Type) ShuttingDown() bool
type WithMaxWaitRateLimiter
    func (w WithMaxWaitRateLimiter) Forget(item interface{})
    func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int
    func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration

Package files

default_rate_limiters.go delaying_queue.go doc.go metrics.go parallelizer.go queue.go rate_limiting_queue.go

func ParallelizeUntil

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc, opts ...Options)

ParallelizeUntil is a framework that allows for parallelizing N independent pieces of work until done or the context is canceled.

func SetProvider

func SetProvider(metricsProvider MetricsProvider)

SetProvider sets the metrics provider for all subsequently created work queues. Only the first call has an effect.

func WithChunkSize

func WithChunkSize(c int) func(*options)

WithChunkSize hands work items to the workers in chunks, rather than one at a time. It is recommended to use this option if the number of pieces is significantly higher than the number of workers and the work done for each item is small.
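
As a rough illustration of chunking, the sketch below groups piece indexes into contiguous ranges so that a worker would pick up one range at a time. The chunkBounds helper is hypothetical and only shows the arithmetic; how the package distributes chunks internally is not stated here.

```go
package main

import "fmt"

// chunkBounds is a hypothetical helper: it splits the indexes [0, pieces)
// into half-open ranges [start, end) of at most chunkSize items each.
func chunkBounds(pieces, chunkSize int) [][2]int {
	if chunkSize < 1 {
		chunkSize = 1 // assumption: treat non-positive sizes as "one by one"
	}
	var bounds [][2]int
	for start := 0; start < pieces; start += chunkSize {
		end := start + chunkSize
		if end > pieces {
			end = pieces // the last chunk may be shorter
		}
		bounds = append(bounds, [2]int{start, end})
	}
	return bounds
}

func main() {
	fmt.Println(chunkBounds(10, 4)) // [[0 4] [4 8] [8 10]]
}
```

With 10 pieces and a chunk size of 4, workers would be handed 3 units of work instead of 10, which reduces coordination overhead when each piece is cheap.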

type BucketRateLimiter

BucketRateLimiter adapts a standard token bucket (rate.Limiter) to the workqueue RateLimiter API.

type BucketRateLimiter struct {
    *rate.Limiter
}

func (*BucketRateLimiter) Forget

func (r *BucketRateLimiter) Forget(item interface{})

func (*BucketRateLimiter) NumRequeues

func (r *BucketRateLimiter) NumRequeues(item interface{}) int

func (*BucketRateLimiter) When

func (r *BucketRateLimiter) When(item interface{}) time.Duration

type CounterMetric

CounterMetric represents a single numerical value that only ever goes up.

type CounterMetric interface {
    Inc()
}

type DelayingInterface

DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to requeue items after failures without ending up in a hot-loop.

type DelayingInterface interface {
    Interface
    // AddAfter adds an item to the workqueue after the indicated duration has passed
    AddAfter(item interface{}, duration time.Duration)
}

func NewDelayingQueue

func NewDelayingQueue() DelayingInterface

NewDelayingQueue constructs a new workqueue with delayed queuing ability. NewDelayingQueue does not emit metrics. For use with a MetricsProvider, please use NewDelayingQueueWithConfig instead and specify a name.

func NewDelayingQueueWithConfig

func NewDelayingQueueWithConfig(config DelayingQueueConfig) DelayingInterface

NewDelayingQueueWithConfig constructs a new workqueue with options to customize different properties.

func NewDelayingQueueWithCustomClock

func NewDelayingQueueWithCustomClock(clock clock.WithTicker, name string) DelayingInterface

NewDelayingQueueWithCustomClock constructs a new named workqueue with the ability to inject a real or fake clock for testing purposes. Deprecated: Use NewDelayingQueueWithConfig instead.

func NewDelayingQueueWithCustomQueue

func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface

NewDelayingQueueWithCustomQueue constructs a new workqueue with the ability to inject a custom queue Interface instead of the default one. Deprecated: Use NewDelayingQueueWithConfig instead.

func NewNamedDelayingQueue

func NewNamedDelayingQueue(name string) DelayingInterface

NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability. Deprecated: Use NewDelayingQueueWithConfig instead.

type DelayingQueueConfig

DelayingQueueConfig specifies optional configurations to customize a DelayingInterface.

type DelayingQueueConfig struct {
    // Name for the queue. If unnamed, the metrics will not be registered.
    Name string

    // MetricsProvider optionally allows specifying a metrics provider to use for the queue
    // instead of the global provider.
    MetricsProvider MetricsProvider

    // Clock optionally allows injecting a real or fake clock for testing purposes.
    Clock clock.WithTicker

    // Queue optionally allows injecting custom queue Interface instead of the default one.
    Queue Interface
}

type DoWorkPieceFunc

type DoWorkPieceFunc func(piece int)

type GaugeMetric

GaugeMetric represents a single numerical value that can arbitrarily go up and down.

type GaugeMetric interface {
    Inc()
    Dec()
}

type HistogramMetric

HistogramMetric counts individual observations.

type HistogramMetric interface {
    Observe(float64)
}

type Interface

type Interface interface {
    Add(item interface{})
    Len() int
    Get() (item interface{}, shutdown bool)
    Done(item interface{})
    ShutDown()
    ShutDownWithDrain()
    ShuttingDown() bool
}

type ItemExponentialFailureRateLimiter

ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit. Dealing with max failures and expiration is up to the caller.

type ItemExponentialFailureRateLimiter struct {
    // contains filtered or unexported fields
}

func (*ItemExponentialFailureRateLimiter) Forget

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{})

func (*ItemExponentialFailureRateLimiter) NumRequeues

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int

func (*ItemExponentialFailureRateLimiter) When

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration

type ItemFastSlowRateLimiter

ItemFastSlowRateLimiter does a quick retry for a certain number of attempts, then a slow retry after that.

type ItemFastSlowRateLimiter struct {
    // contains filtered or unexported fields
}

func (*ItemFastSlowRateLimiter) Forget

func (r *ItemFastSlowRateLimiter) Forget(item interface{})

func (*ItemFastSlowRateLimiter) NumRequeues

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int

func (*ItemFastSlowRateLimiter) When

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration

type MaxOfRateLimiter

MaxOfRateLimiter calls every RateLimiter and returns the worst-case response. When used with a token bucket limiter, the burst could apparently be exceeded in cases where particular items were separately delayed for a longer time.

type MaxOfRateLimiter struct {
    // contains filtered or unexported fields
}

func (*MaxOfRateLimiter) Forget

func (r *MaxOfRateLimiter) Forget(item interface{})

func (*MaxOfRateLimiter) NumRequeues

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int

func (*MaxOfRateLimiter) When

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration

type MetricsProvider

MetricsProvider generates various metrics used by the queue.

type MetricsProvider interface {
    NewDepthMetric(name string) GaugeMetric
    NewAddsMetric(name string) CounterMetric
    NewLatencyMetric(name string) HistogramMetric
    NewWorkDurationMetric(name string) HistogramMetric
    NewUnfinishedWorkSecondsMetric(name string) SettableGaugeMetric
    NewLongestRunningProcessorSecondsMetric(name string) SettableGaugeMetric
    NewRetriesMetric(name string) CounterMetric
}

type Options

type Options func(*options)

type QueueConfig

QueueConfig specifies optional configurations to customize an Interface.

type QueueConfig struct {
    // Name for the queue. If unnamed, the metrics will not be registered.
    Name string

    // MetricsProvider optionally allows specifying a metrics provider to use for the queue
    // instead of the global provider.
    MetricsProvider MetricsProvider

    // Clock ability to inject real or fake clock for testing purposes.
    Clock clock.WithTicker
}

type RateLimiter

type RateLimiter interface {
    // When gets an item and gets to decide how long that item should wait
    When(item interface{}) time.Duration
    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for failing
    // or for success, we'll stop tracking it
    Forget(item interface{})
    // NumRequeues returns back how many failures the item has had
    NumRequeues(item interface{}) int
}

func DefaultControllerRateLimiter

func DefaultControllerRateLimiter() RateLimiter

DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has both overall and per-item rate limiting. The overall limiter is a token bucket and the per-item limiter is exponential.

func DefaultItemBasedRateLimiter

func DefaultItemBasedRateLimiter() RateLimiter

func NewItemExponentialFailureRateLimiter

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter

func NewItemFastSlowRateLimiter

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter

func NewMaxOfRateLimiter

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter

func NewWithMaxWaitRateLimiter

func NewWithMaxWaitRateLimiter(limiter RateLimiter, maxDelay time.Duration) RateLimiter

type RateLimitingInterface

RateLimitingInterface is an interface that rate limits items being added to the queue.

type RateLimitingInterface interface {
    DelayingInterface

    // AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
    AddRateLimited(item interface{})

    // Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
    // or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
    // still have to call `Done` on the queue.
    Forget(item interface{})

    // NumRequeues returns back how many times the item was requeued
    NumRequeues(item interface{}) int
}

func NewNamedRateLimitingQueue

func NewNamedRateLimitingQueue(rateLimiter RateLimiter, name string) RateLimitingInterface

NewNamedRateLimitingQueue constructs a new named workqueue with rateLimited queuing ability. Deprecated: Use NewRateLimitingQueueWithConfig instead.

func NewRateLimitingQueue

func NewRateLimitingQueue(rateLimiter RateLimiter) RateLimitingInterface

NewRateLimitingQueue constructs a new workqueue with rateLimited queuing ability. Remember to call Forget! If you don't, you may end up tracking failures forever. NewRateLimitingQueue does not emit metrics. For use with a MetricsProvider, please use NewRateLimitingQueueWithConfig instead and specify a name.

func NewRateLimitingQueueWithConfig

func NewRateLimitingQueueWithConfig(rateLimiter RateLimiter, config RateLimitingQueueConfig) RateLimitingInterface

NewRateLimitingQueueWithConfig constructs a new workqueue with rateLimited queuing ability with options to customize different properties. Remember to call Forget! If you don't, you may end up tracking failures forever.

func NewRateLimitingQueueWithDelayingInterface

func NewRateLimitingQueueWithDelayingInterface(di DelayingInterface, rateLimiter RateLimiter) RateLimitingInterface

NewRateLimitingQueueWithDelayingInterface constructs a new named workqueue with rateLimited queuing ability with the option to inject a custom delaying queue instead of the default one. Deprecated: Use NewRateLimitingQueueWithConfig instead.

type RateLimitingQueueConfig

type RateLimitingQueueConfig struct {
    // Name for the queue. If unnamed, the metrics will not be registered.
    Name string

    // MetricsProvider optionally allows specifying a metrics provider to use for the queue
    // instead of the global provider.
    MetricsProvider MetricsProvider

    // Clock optionally allows injecting a real or fake clock for testing purposes.
    Clock clock.WithTicker

    // DelayingQueue optionally allows injecting custom delaying queue DelayingInterface instead of the default one.
    DelayingQueue DelayingInterface
}

type SettableGaugeMetric

SettableGaugeMetric represents a single numerical value that can arbitrarily go up and down. (Separate from GaugeMetric to preserve backwards compatibility.)

type SettableGaugeMetric interface {
    Set(float64)
}

type SummaryMetric

SummaryMetric captures individual observations.

type SummaryMetric interface {
    Observe(float64)
}

type Type

Type is a work queue (see the package comment).

type Type struct {
    // contains filtered or unexported fields
}

func New

func New() *Type

New constructs a new work queue (see the package comment).

func NewNamed

func NewNamed(name string) *Type

NewNamed creates a new named queue. Deprecated: Use NewWithConfig instead.

func NewWithConfig

func NewWithConfig(config QueueConfig) *Type

NewWithConfig constructs a new workqueue with the ability to customize different properties.

func (*Type) Add

func (q *Type) Add(item interface{})

Add marks item as needing processing.

func (*Type) Done

func (q *Type) Done(item interface{})

Done marks item as done processing, and if it has been marked as dirty again while it was being processed, it will be re-added to the queue for re-processing.

func (*Type) Get

func (q *Type) Get() (item interface{}, shutdown bool)

Get blocks until it can return an item to be processed. If shutdown = true, the caller should end their goroutine. You must call Done with item when you have finished processing it.

func (*Type) Len

func (q *Type) Len() int

Len returns the current queue length, for informational purposes only. For example, you shouldn't gate a call to Add() or Get() on Len() being a particular value; that can't be synchronized properly.

func (*Type) ShutDown

func (q *Type) ShutDown()

ShutDown will cause q to ignore all new items added to it and immediately instruct the worker goroutines to exit.

func (*Type) ShutDownWithDrain

func (q *Type) ShutDownWithDrain()

ShutDownWithDrain will cause q to ignore all new items added to it. As soon as the worker goroutines have "drained", i.e. finished processing and called Done on all existing items in the queue, they will be instructed to exit and ShutDownWithDrain will return. Hence, a strict requirement for using this is that your workers must ensure that Done is called on all items in the queue once the shutdown has been initiated; if that is not the case, this will block indefinitely. It is, however, safe to call ShutDown after having called ShutDownWithDrain, so as to force the queue shutdown to terminate immediately without waiting for the drainage.

func (*Type) ShuttingDown

func (q *Type) ShuttingDown() bool

type WithMaxWaitRateLimiter

WithMaxWaitRateLimiter has a maxDelay, which avoids waiting too long.

type WithMaxWaitRateLimiter struct {
    // contains filtered or unexported fields
}

func (WithMaxWaitRateLimiter) Forget

func (w WithMaxWaitRateLimiter) Forget(item interface{})

func (WithMaxWaitRateLimiter) NumRequeues

func (w WithMaxWaitRateLimiter) NumRequeues(item interface{}) int

func (WithMaxWaitRateLimiter) When

func (w WithMaxWaitRateLimiter) When(item interface{}) time.Duration