...

Source file src/k8s.io/kubernetes/pkg/controller/tainteviction/timed_workers.go

Documentation: k8s.io/kubernetes/pkg/controller/tainteviction

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package tainteviction
    18  
    19  import (
    20  	"context"
    21  	"sync"
    22  	"time"
    23  
    24  	"k8s.io/apimachinery/pkg/types"
    25  	"k8s.io/klog/v2"
    26  	"k8s.io/utils/clock"
    27  )
    28  
    29  // WorkArgs keeps arguments that will be passed to the function executed by the worker.
    30  type WorkArgs struct {
    31  	NamespacedName types.NamespacedName
    32  }
    33  
    34  // KeyFromWorkArgs creates a key for the given `WorkArgs`
    35  func (w *WorkArgs) KeyFromWorkArgs() string {
    36  	return w.NamespacedName.String()
    37  }
    38  
    39  // NewWorkArgs is a helper function to create new `WorkArgs`
    40  func NewWorkArgs(name, namespace string) *WorkArgs {
    41  	return &WorkArgs{
    42  		NamespacedName: types.NamespacedName{Namespace: namespace, Name: name},
    43  	}
    44  }
    45  
    46  // TimedWorker is a responsible for executing a function no earlier than at FireAt time.
    47  type TimedWorker struct {
    48  	WorkItem  *WorkArgs
    49  	CreatedAt time.Time
    50  	FireAt    time.Time
    51  	Timer     clock.Timer
    52  }
    53  
    54  // createWorker creates a TimedWorker that will execute `f` not earlier than `fireAt`.
    55  func createWorker(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time, f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error, clock clock.WithDelayedExecution) *TimedWorker {
    56  	delay := fireAt.Sub(createdAt)
    57  	logger := klog.FromContext(ctx)
    58  	fWithErrorLogging := func() {
    59  		err := f(ctx, fireAt, args)
    60  		if err != nil {
    61  			logger.Error(err, "TaintEvictionController: timed worker failed")
    62  		}
    63  	}
    64  	if delay <= 0 {
    65  		go fWithErrorLogging()
    66  		return nil
    67  	}
    68  	timer := clock.AfterFunc(delay, fWithErrorLogging)
    69  	return &TimedWorker{
    70  		WorkItem:  args,
    71  		CreatedAt: createdAt,
    72  		FireAt:    fireAt,
    73  		Timer:     timer,
    74  	}
    75  }
    76  
    77  // Cancel cancels the execution of function by the `TimedWorker`
    78  func (w *TimedWorker) Cancel() {
    79  	if w != nil {
    80  		w.Timer.Stop()
    81  	}
    82  }
    83  
    84  // TimedWorkerQueue keeps a set of TimedWorkers that are still wait for execution.
    85  type TimedWorkerQueue struct {
    86  	sync.Mutex
    87  	// map of workers keyed by string returned by 'KeyFromWorkArgs' from the given worker.
    88  	workers  map[string]*TimedWorker
    89  	workFunc func(ctx context.Context, fireAt time.Time, args *WorkArgs) error
    90  	clock    clock.WithDelayedExecution
    91  }
    92  
    93  // CreateWorkerQueue creates a new TimedWorkerQueue for workers that will execute
    94  // given function `f`.
    95  func CreateWorkerQueue(f func(ctx context.Context, fireAt time.Time, args *WorkArgs) error) *TimedWorkerQueue {
    96  	return &TimedWorkerQueue{
    97  		workers:  make(map[string]*TimedWorker),
    98  		workFunc: f,
    99  		clock:    clock.RealClock{},
   100  	}
   101  }
   102  
   103  func (q *TimedWorkerQueue) getWrappedWorkerFunc(key string) func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
   104  	return func(ctx context.Context, fireAt time.Time, args *WorkArgs) error {
   105  		err := q.workFunc(ctx, fireAt, args)
   106  		q.Lock()
   107  		defer q.Unlock()
   108  		if err == nil {
   109  			// To avoid duplicated calls we keep the key in the queue, to prevent
   110  			// subsequent additions.
   111  			q.workers[key] = nil
   112  		} else {
   113  			delete(q.workers, key)
   114  		}
   115  		return err
   116  	}
   117  }
   118  
   119  // AddWork adds a work to the WorkerQueue which will be executed not earlier than `fireAt`.
   120  func (q *TimedWorkerQueue) AddWork(ctx context.Context, args *WorkArgs, createdAt time.Time, fireAt time.Time) {
   121  	key := args.KeyFromWorkArgs()
   122  	logger := klog.FromContext(ctx)
   123  	logger.V(4).Info("Adding TimedWorkerQueue item and to be fired at firedTime", "item", key, "createTime", createdAt, "firedTime", fireAt)
   124  
   125  	q.Lock()
   126  	defer q.Unlock()
   127  	if _, exists := q.workers[key]; exists {
   128  		logger.Info("Trying to add already existing work, skipping", "args", args)
   129  		return
   130  	}
   131  	worker := createWorker(ctx, args, createdAt, fireAt, q.getWrappedWorkerFunc(key), q.clock)
   132  	q.workers[key] = worker
   133  }
   134  
   135  // CancelWork removes scheduled function execution from the queue. Returns true if work was cancelled.
   136  func (q *TimedWorkerQueue) CancelWork(logger klog.Logger, key string) bool {
   137  	q.Lock()
   138  	defer q.Unlock()
   139  	worker, found := q.workers[key]
   140  	result := false
   141  	if found {
   142  		logger.V(4).Info("Cancelling TimedWorkerQueue item", "item", key, "time", time.Now())
   143  		if worker != nil {
   144  			result = true
   145  			worker.Cancel()
   146  		}
   147  		delete(q.workers, key)
   148  	}
   149  	return result
   150  }
   151  
   152  // GetWorkerUnsafe returns a TimedWorker corresponding to the given key.
   153  // Unsafe method - workers have attached goroutines which can fire after this function is called.
   154  func (q *TimedWorkerQueue) GetWorkerUnsafe(key string) *TimedWorker {
   155  	q.Lock()
   156  	defer q.Unlock()
   157  	return q.workers[key]
   158  }
   159  

View as plain text