...

Source file src/k8s.io/kubernetes/pkg/util/goroutinemap/goroutinemap.go

Documentation: k8s.io/kubernetes/pkg/util/goroutinemap

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  /*
    18  Package goroutinemap implements a data structure for managing go routines
    19  by name. It prevents the creation of new go routines if an existing go routine
    20  with the same name exists.
    21  */
    22  package goroutinemap
    23  
import (
	"errors"
	"fmt"
	"sync"

	k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/klog/v2"
	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)
    32  
// GoRoutineMap defines a type that can run named goroutines and track their
// state. It prevents the creation of multiple goroutines with the same name
// and may prevent recreation of a goroutine until after a backoff time
// has elapsed after the last goroutine with that name finished.
type GoRoutineMap interface {
	// Run adds operation name to the list of running operations and spawns a
	// new goroutine to execute the operation.
	// If an operation with the same operation name already exists, an
	// AlreadyExists or ExponentialBackoff error is returned.
	// Once the operation completes without error (or when exponential backoff
	// is disabled), the goroutine is terminated and the operation name is
	// removed from the list of executing operations, allowing a new operation
	// to be started with the same operation name without error. When the
	// operation fails and exponential backoff is enabled, the name is kept
	// (marked as not pending) so the backoff can be checked on the next Run.
	Run(operationName string, operationFunc func() error) error

	// Wait blocks until the operations map is empty. This is typically
	// necessary during tests - the test should wait until all operations finish
	// and evaluate results after that.
	Wait()

	// WaitForCompletion blocks until either all operations have successfully completed
	// or have failed but are not pending. The test should wait until operations are either
	// complete or have failed.
	WaitForCompletion()

	// IsOperationPending returns true if the operation is pending (currently
	// running), otherwise returns false.
	IsOperationPending(operationName string) bool
}
    61  
    62  // NewGoRoutineMap returns a new instance of GoRoutineMap.
    63  func NewGoRoutineMap(exponentialBackOffOnError bool) GoRoutineMap {
    64  	g := &goRoutineMap{
    65  		operations:                make(map[string]operation),
    66  		exponentialBackOffOnError: exponentialBackOffOnError,
    67  	}
    68  
    69  	g.cond = sync.NewCond(&g.lock)
    70  	return g
    71  }
    72  
// goRoutineMap is the GoRoutineMap implementation returned by
// NewGoRoutineMap.
type goRoutineMap struct {
	// operations holds the tracked operations keyed by operation name.
	operations map[string]operation
	// exponentialBackOffOnError selects whether a failed operation is kept in
	// the map with updated backoff state instead of being deleted.
	exponentialBackOffOnError bool
	// cond is built on lock and is used by Wait and WaitForCompletion to
	// block until an operation completes.
	cond *sync.Cond
	// lock guards operations.
	lock sync.RWMutex
}
    79  
// operation holds the state of a single goroutine.
type operation struct {
	// operationPending is true while the operation's goroutine is running.
	operationPending bool
	// expBackoff is the backoff state carried over between runs of the same
	// operation name; it is consulted before a retry and updated on failure.
	expBackoff exponentialbackoff.ExponentialBackoff
}
    85  
    86  func (grm *goRoutineMap) Run(
    87  	operationName string,
    88  	operationFunc func() error) error {
    89  	grm.lock.Lock()
    90  	defer grm.lock.Unlock()
    91  
    92  	existingOp, exists := grm.operations[operationName]
    93  	if exists {
    94  		// Operation with name exists
    95  		if existingOp.operationPending {
    96  			return NewAlreadyExistsError(operationName)
    97  		}
    98  
    99  		if err := existingOp.expBackoff.SafeToRetry(operationName); err != nil {
   100  			return err
   101  		}
   102  	}
   103  
   104  	grm.operations[operationName] = operation{
   105  		operationPending: true,
   106  		expBackoff:       existingOp.expBackoff,
   107  	}
   108  	go func() (err error) {
   109  		// Handle unhandled panics (very unlikely)
   110  		defer k8sRuntime.HandleCrash()
   111  		// Handle completion of and error, if any, from operationFunc()
   112  		defer grm.operationComplete(operationName, &err)
   113  		// Handle panic, if any, from operationFunc()
   114  		defer k8sRuntime.RecoverFromPanic(&err)
   115  		return operationFunc()
   116  	}()
   117  
   118  	return nil
   119  }
   120  
   121  // operationComplete handles the completion of a goroutine run in the
   122  // goRoutineMap.
   123  func (grm *goRoutineMap) operationComplete(
   124  	operationName string, err *error) {
   125  	// Defer operations are executed in Last-In is First-Out order. In this case
   126  	// the lock is acquired first when operationCompletes begins, and is
   127  	// released when the method finishes, after the lock is released cond is
   128  	// signaled to wake waiting goroutine.
   129  	defer grm.cond.Signal()
   130  	grm.lock.Lock()
   131  	defer grm.lock.Unlock()
   132  
   133  	if *err == nil || !grm.exponentialBackOffOnError {
   134  		// Operation completed without error, or exponentialBackOffOnError disabled
   135  		delete(grm.operations, operationName)
   136  		if *err != nil {
   137  			// Log error
   138  			klog.Errorf("operation for %q failed with: %v",
   139  				operationName,
   140  				*err)
   141  		}
   142  	} else {
   143  		// Operation completed with error and exponentialBackOffOnError Enabled
   144  		existingOp := grm.operations[operationName]
   145  		existingOp.expBackoff.Update(err)
   146  		existingOp.operationPending = false
   147  		grm.operations[operationName] = existingOp
   148  
   149  		// Log error
   150  		klog.Errorf("%v",
   151  			existingOp.expBackoff.GenerateNoRetriesPermittedMsg(operationName))
   152  	}
   153  }
   154  
   155  func (grm *goRoutineMap) IsOperationPending(operationName string) bool {
   156  	grm.lock.RLock()
   157  	defer grm.lock.RUnlock()
   158  	existingOp, exists := grm.operations[operationName]
   159  	if exists && existingOp.operationPending {
   160  		return true
   161  	}
   162  	return false
   163  }
   164  
   165  func (grm *goRoutineMap) Wait() {
   166  	grm.lock.Lock()
   167  	defer grm.lock.Unlock()
   168  
   169  	for len(grm.operations) > 0 {
   170  		grm.cond.Wait()
   171  	}
   172  }
   173  
   174  func (grm *goRoutineMap) WaitForCompletion() {
   175  	grm.lock.Lock()
   176  	defer grm.lock.Unlock()
   177  
   178  	for {
   179  		if len(grm.operations) == 0 || grm.nothingPending() {
   180  			break
   181  		} else {
   182  			grm.cond.Wait()
   183  		}
   184  	}
   185  }
   186  
   187  // Check if any operation is pending. Already assumes caller has the
   188  // necessary locks
   189  func (grm *goRoutineMap) nothingPending() bool {
   190  	nothingIsPending := true
   191  	for _, operation := range grm.operations {
   192  		if operation.operationPending {
   193  			nothingIsPending = false
   194  			break
   195  		}
   196  	}
   197  	return nothingIsPending
   198  }
   199  
   200  // NewAlreadyExistsError returns a new instance of AlreadyExists error.
   201  func NewAlreadyExistsError(operationName string) error {
   202  	return alreadyExistsError{operationName}
   203  }
   204  
   205  // IsAlreadyExists returns true if an error returned from GoRoutineMap indicates
   206  // a new operation can not be started because an operation with the same
   207  // operation name is already executing.
   208  func IsAlreadyExists(err error) bool {
   209  	switch err.(type) {
   210  	case alreadyExistsError:
   211  		return true
   212  	default:
   213  		return false
   214  	}
   215  }
   216  
   217  // alreadyExistsError is the error returned by GoRoutineMap when a new operation
   218  // can not be started because an operation with the same operation name is
   219  // already executing.
   220  type alreadyExistsError struct {
   221  	operationName string
   222  }
   223  
   224  var _ error = alreadyExistsError{}
   225  
   226  func (err alreadyExistsError) Error() string {
   227  	return fmt.Sprintf(
   228  		"Failed to create operation with name %q. An operation with that name is already executing.",
   229  		err.operationName)
   230  }
   231  

View as plain text