Source file src/k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler/rate_limited_queue.go

Documentation: k8s.io/kubernetes/pkg/controller/nodelifecycle/scheduler

/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package scheduler

import (
	"container/heap"
	"sync"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/client-go/util/flowcontrol"
	"k8s.io/klog/v2"
)

const (
	// NodeHealthUpdateRetry controls the number of retries of writing
	// node health update.
	NodeHealthUpdateRetry = 5
	// NodeEvictionPeriod controls how often NodeController will try to
	// evict Pods from non-responsive Nodes.
	NodeEvictionPeriod = 100 * time.Millisecond
	// EvictionRateLimiterBurst is the burst value for all eviction rate
	// limiters
	EvictionRateLimiterBurst = 1
)

// TimedValue is a value that should be processed at a designated time.
type TimedValue struct {
	Value string
	// UID could be anything that helps identify the value
	UID       interface{}
	AddedAt   time.Time
	ProcessAt time.Time
}

// now is used to test time
var now = time.Now

// TimedQueue is a priority heap where the lowest ProcessAt is at the front of the queue
type TimedQueue []*TimedValue

// Len is the length of the queue.
func (h TimedQueue) Len() int { return len(h) }

// Less returns true if queue[i] < queue[j].
func (h TimedQueue) Less(i, j int) bool { return h[i].ProcessAt.Before(h[j].ProcessAt) }

// Swap swaps index i and j.
func (h TimedQueue) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

// Push a new TimedValue on to the queue.
func (h *TimedQueue) Push(x interface{}) {
	*h = append(*h, x.(*TimedValue))
}

// Pop the lowest ProcessAt item.
func (h *TimedQueue) Pop() interface{} {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}
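
// exampleTimedQueueOrdering is an illustrative sketch, not part of the
// original file: it shows how TimedQueue is meant to be driven through
// container/heap, which keeps the element with the earliest ProcessAt at
// the front. The function name and node names are hypothetical.
func exampleTimedQueueOrdering() *TimedValue {
	q := TimedQueue{}
	base := now()
	// Push out of order; heap.Push restores the heap invariant each time.
	heap.Push(&q, &TimedValue{Value: "node-b", ProcessAt: base.Add(2 * time.Second)})
	heap.Push(&q, &TimedValue{Value: "node-a", ProcessAt: base.Add(time.Second)})
	// heap.Pop returns the item with the earliest ProcessAt ("node-a").
	return heap.Pop(&q).(*TimedValue)
}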

// UniqueQueue is a FIFO queue which additionally guarantees that any
// element can be added only once until it is removed.
type UniqueQueue struct {
	lock  sync.Mutex
	queue TimedQueue
	set   sets.String
}

// Add a new value to the queue if it wasn't added before, or was
// explicitly removed by the Remove call. Returns true if a new value
// was added.
func (q *UniqueQueue) Add(value TimedValue) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	if q.set.Has(value.Value) {
		return false
	}
	heap.Push(&q.queue, &value)
	q.set.Insert(value.Value)
	return true
}

// Replace replaces an existing value in the queue if it already
// exists, otherwise it does nothing. Returns true if the item was
// found.
func (q *UniqueQueue) Replace(value TimedValue) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	for i := range q.queue {
		if q.queue[i].Value != value.Value {
			continue
		}
		heap.Remove(&q.queue, i)
		heap.Push(&q.queue, &value)
		return true
	}
	return false
}

// RemoveFromQueue removes the value from the queue, but keeps it in the
// set, so it won't be added a second time. Returns true if something was
// removed.
func (q *UniqueQueue) RemoveFromQueue(value string) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	if !q.set.Has(value) {
		return false
	}
	for i, val := range q.queue {
		if val.Value == value {
			heap.Remove(&q.queue, i)
			return true
		}
	}
	return false
}

// Remove removes the value from the queue, so a Get() call won't return
// it, and allows subsequent addition of the given value. If the value is
// not present it does nothing and returns false.
func (q *UniqueQueue) Remove(value string) bool {
	q.lock.Lock()
	defer q.lock.Unlock()

	if !q.set.Has(value) {
		return false
	}
	q.set.Delete(value)
	for i, val := range q.queue {
		if val.Value == value {
			heap.Remove(&q.queue, i)
			return true
		}
	}
	return true
}
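
// exampleUniqueQueueSemantics is an illustrative sketch, not part of the
// original file, of the Add/RemoveFromQueue/Remove contract: after
// RemoveFromQueue the value is still tracked in the set and cannot be
// re-added, while Remove clears it completely. The function name and the
// "node-1" value are hypothetical.
func exampleUniqueQueueSemantics() (afterRemoveFromQueue, afterRemove bool) {
	q := UniqueQueue{set: sets.NewString()}
	q.Add(TimedValue{Value: "node-1", AddedAt: now(), ProcessAt: now()})

	q.RemoveFromQueue("node-1")
	afterRemoveFromQueue = q.Add(TimedValue{Value: "node-1"}) // false: still in the set

	q.Remove("node-1")
	afterRemove = q.Add(TimedValue{Value: "node-1"}) // true: fully removed, may be added again
	return afterRemoveFromQueue, afterRemove
}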

// Get returns the value with the earliest ProcessAt that hasn't been
// returned yet, removing it from the queue and from the
// duplication-preventing set.
func (q *UniqueQueue) Get() (TimedValue, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if len(q.queue) == 0 {
		return TimedValue{}, false
	}
	result := heap.Pop(&q.queue).(*TimedValue)
	q.set.Delete(result.Value)
	return *result, true
}

// Head returns the value with the earliest ProcessAt that hasn't been
// returned yet, without removing it.
func (q *UniqueQueue) Head() (TimedValue, bool) {
	q.lock.Lock()
	defer q.lock.Unlock()
	if len(q.queue) == 0 {
		return TimedValue{}, false
	}
	result := q.queue[0]
	return *result, true
}

// Clear removes all items from the queue and the duplication-preventing
// set.
func (q *UniqueQueue) Clear() {
	q.lock.Lock()
	defer q.lock.Unlock()
	if q.queue.Len() > 0 {
		q.queue = make(TimedQueue, 0)
	}
	if len(q.set) > 0 {
		q.set = sets.NewString()
	}
}

// RateLimitedTimedQueue is a unique item priority queue ordered by
// the expected next time of execution. It is also rate limited.
type RateLimitedTimedQueue struct {
	queue       UniqueQueue
	limiterLock sync.Mutex
	limiter     flowcontrol.RateLimiter
}

// NewRateLimitedTimedQueue creates a new queue which will use the given
// RateLimiter to oversee execution.
func NewRateLimitedTimedQueue(limiter flowcontrol.RateLimiter) *RateLimitedTimedQueue {
	return &RateLimitedTimedQueue{
		queue: UniqueQueue{
			queue: TimedQueue{},
			set:   sets.NewString(),
		},
		limiter: limiter,
	}
}
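
// newExampleEvictor is an illustrative sketch, not part of the original
// file: the node lifecycle controller builds its eviction queues roughly
// this way, pairing a token bucket limiter with the package-level burst of
// 1. The function name and the 0.1 QPS value are hypothetical.
func newExampleEvictor() *RateLimitedTimedQueue {
	// 0.1 QPS with a burst of 1 admits roughly one eviction every ten seconds.
	return NewRateLimitedTimedQueue(
		flowcontrol.NewTokenBucketRateLimiter(0.1, EvictionRateLimiterBurst))
}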

// ActionFunc takes a timed value and returns false if the item must
// be retried, with an optional time.Duration if some minimum wait
// interval should be used.
type ActionFunc func(TimedValue) (bool, time.Duration)

// Try processes the queue. It ends prematurely if the RateLimiter forbids
// an action; otherwise each value whose ProcessAt has been reached is
// passed to fn. If fn returns true the value is dropped from the queue; if
// it returns false the value is requeued, using the returned time.Duration
// as the minimum wait before the next attempt. The same value is processed
// only once unless Remove is explicitly called on it (this is done by the
// cancelPodEviction function in NodeController when a Node becomes Ready
// again). TODO: figure out a good way to do garbage collection for all
// Nodes that were removed from the cluster.
func (q *RateLimitedTimedQueue) Try(logger klog.Logger, fn ActionFunc) {
	val, ok := q.queue.Head()
	q.limiterLock.Lock()
	defer q.limiterLock.Unlock()
	for ok {
		// rate limit the queue checking
		if !q.limiter.TryAccept() {
			logger.V(10).Info("Try rate limited", "value", val)
			// Try again later
			break
		}

		now := now()
		if now.Before(val.ProcessAt) {
			break
		}

		if ok, wait := fn(val); !ok {
			// Requeue the value; it won't be processed again until
			// strictly after now + wait.
			val.ProcessAt = now.Add(wait + 1)
			q.queue.Replace(val)
		} else {
			q.queue.RemoveFromQueue(val.Value)
		}
		val, ok = q.queue.Head()
	}
}
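
// exampleDrain is an illustrative sketch, not part of the original file,
// of how Try is driven: the caller supplies an ActionFunc that reports
// whether the item is done, plus an optional minimum wait before the next
// attempt. The real controller invokes Try on a NodeEvictionPeriod cadence.
// The function and the evictNode callback it assumes are hypothetical.
func exampleDrain(logger klog.Logger, q *RateLimitedTimedQueue, evictNode func(name string) bool) {
	q.Try(logger, func(v TimedValue) (bool, time.Duration) {
		if evictNode(v.Value) {
			// Done: Try drops the value from the queue. It stays in the
			// duplication-preventing set until Remove is called, so it
			// won't be re-added in the meantime.
			return true, 0
		}
		// Not done yet: retry this value no sooner than 5 seconds from now.
		return false, 5 * time.Second
	})
}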

// Add value to the queue to be processed. Won't add the same
// value (comparison by value) a second time if it was already added
// and not removed.
func (q *RateLimitedTimedQueue) Add(value string, uid interface{}) bool {
	now := now()
	return q.queue.Add(TimedValue{
		Value:     value,
		UID:       uid,
		AddedAt:   now,
		ProcessAt: now,
	})
}

// Remove Node from the Evictor. The Node won't be processed until
// added again.
func (q *RateLimitedTimedQueue) Remove(value string) bool {
	return q.queue.Remove(value)
}

// Clear removes all items from the queue.
func (q *RateLimitedTimedQueue) Clear() {
	q.queue.Clear()
}

// SwapLimiter safely swaps the current limiter for this queue with the
// passed one if their QPS differs.
func (q *RateLimitedTimedQueue) SwapLimiter(newQPS float32) {
	q.limiterLock.Lock()
	defer q.limiterLock.Unlock()
	if q.limiter.QPS() == newQPS {
		return
	}
	var newLimiter flowcontrol.RateLimiter
	if newQPS <= 0 {
		newLimiter = flowcontrol.NewFakeNeverRateLimiter()
	} else {
		newLimiter = flowcontrol.NewTokenBucketRateLimiter(newQPS, EvictionRateLimiterBurst)

		// If we're currently waiting on limiter, we drain the new one - this is a good approach when Burst value is 1
		// TODO: figure out if we need to support higher Burst values and decide on the drain logic, should we keep:
		// - saturation (percentage of used tokens)
		// - number of used tokens
		// - number of available tokens
		// - something else
		if !q.limiter.TryAccept() {
			newLimiter.TryAccept()
		}
	}
	q.limiter.Stop()
	q.limiter = newLimiter
}
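
// exampleEnterDisruption is an illustrative sketch, not part of the
// original file: the node lifecycle controller reacts to zone disruption by
// swapping in a slower limiter, and a QPS of zero or less installs a
// limiter that never admits requests, effectively pausing evictions. The
// function name and the QPS values are hypothetical.
func exampleEnterDisruption(q *RateLimitedTimedQueue) {
	// Partial disruption: slow evictions down to one per 100 seconds.
	q.SwapLimiter(0.01)
	// Full disruption: stop evictions entirely.
	q.SwapLimiter(0)
}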
