...

Source file src/k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations/nestedpendingoperations.go

Documentation: k8s.io/kubernetes/pkg/volume/util/nestedpendingoperations

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  /*
    18  Package nestedpendingoperations is a modified implementation of
    19  pkg/util/goroutinemap. It implements a data structure for managing go routines
    20  by volume/pod name. It prevents the creation of new go routines if an existing
    21  go routine for the volume already exists. It also allows multiple operations to
    22  execute in parallel for the same volume as long as they are operating on
    23  different pods.
    24  */
    25  package nestedpendingoperations
    26  
    27  import (
    28  	"fmt"
    29  	"sync"
    30  
    31  	v1 "k8s.io/api/core/v1"
    32  	"k8s.io/apimachinery/pkg/types"
    33  	k8sRuntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	"k8s.io/klog/v2"
    35  	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
    36  	volumetypes "k8s.io/kubernetes/pkg/volume/util/types"
    37  )
    38  
    39  const (
    40  	// EmptyUniquePodName is a UniquePodName for empty string.
    41  	EmptyUniquePodName volumetypes.UniquePodName = volumetypes.UniquePodName("")
    42  
    43  	// EmptyUniqueVolumeName is a UniqueVolumeName for empty string
    44  	EmptyUniqueVolumeName v1.UniqueVolumeName = v1.UniqueVolumeName("")
    45  
    46  	// EmptyNodeName is a NodeName for empty string
    47  	EmptyNodeName types.NodeName = types.NodeName("")
    48  )
    49  
// NestedPendingOperations defines the supported set of operations for
// tracking and serializing operations identified by a
// (volumeName, podName, nodeName) key.
type NestedPendingOperations interface {

	// Run adds the concatenation of volumeName, podName, and nodeName to the list
	// of running operations and spawns a new goroutine to run
	// generatedOperations.
	//
	// volumeName, podName, and nodeName collectively form the operation key.
	// The following forms of operation keys are supported (two keys are said
	// to "match" if their operations must be serialized):
	// - volumeName empty, podName and nodeName could be anything
	//   This key does not match with any keys.
	// - volumeName exists, podName empty, nodeName empty
	//   This key matches all other keys with the same volumeName.
	// - volumeName exists, podName exists, nodeName empty
	//   This key matches with:
	//   - the same volumeName and podName
	//   - the same volumeName, but empty podName
	// - volumeName exists, podName empty, nodeName exists
	//   This key matches with:
	//   - the same volumeName and nodeName
	//   - the same volumeName but empty nodeName
	//
	// If there is no operation with a matching key, the operation is allowed to
	// proceed.
	// If an operation with a matching key exists and the previous operation is
	// running, an AlreadyExists error is returned.
	// If an operation with a matching key exists and the previous operation
	// failed:
	// - If the previous operation has the same
	//   generatedOperations.operationName:
	//   - If the full exponential backoff period is satisfied, the operation is
	//     allowed to proceed.
	//   - Otherwise, an ExponentialBackoff error is returned.
	// - Otherwise, exponential backoff is reset and operation is allowed to
	//   proceed.
	//
	// Once the operation is complete, the goroutine is terminated. If the
	// operation succeeded, its corresponding key is removed from the list of
	// executing operations, allowing a new operation to be started with the key
	// without error. If it failed and exponential backoff on error is enabled,
	// the key remains and the exponential backoff status is updated; if
	// backoff on error is disabled, the key is removed after a failure as
	// well.
	Run(
		volumeName v1.UniqueVolumeName,
		podName volumetypes.UniquePodName,
		nodeName types.NodeName,
		generatedOperations volumetypes.GeneratedOperations) error

	// Wait blocks until all operations are completed. This is typically
	// necessary during tests - the test should wait until all operations finish
	// and evaluate results after that.
	Wait()

	// IsOperationPending returns true if an operation for the given volumeName
	// and one of podName or nodeName is pending, otherwise it returns false.
	IsOperationPending(
		volumeName v1.UniqueVolumeName,
		podName volumetypes.UniquePodName,
		nodeName types.NodeName) bool

	// IsOperationSafeToRetry returns false if an operation for the given
	// volumeName and one of podName or nodeName is pending, or if the matching
	// operation has the same operationName and is still in exponential
	// backoff; otherwise it returns true.
	IsOperationSafeToRetry(
		volumeName v1.UniqueVolumeName,
		podName volumetypes.UniquePodName,
		nodeName types.NodeName, operationName string) bool
}
   117  
   118  // NewNestedPendingOperations returns a new instance of NestedPendingOperations.
   119  func NewNestedPendingOperations(exponentialBackOffOnError bool) NestedPendingOperations {
   120  	g := &nestedPendingOperations{
   121  		operations:                []operation{},
   122  		exponentialBackOffOnError: exponentialBackOffOnError,
   123  	}
   124  	g.cond = sync.NewCond(&g.lock)
   125  	return g
   126  }
   127  
// nestedPendingOperations is the implementation of NestedPendingOperations
// returned by NewNestedPendingOperations.
//
// Fields:
//   - operations: the tracked operations, both those currently running and
//     those that failed and are retained for backoff bookkeeping.
//   - exponentialBackOffOnError: when true, a failed operation stays in
//     operations with updated backoff state instead of being deleted.
//   - cond: condition variable built on lock; woken each time an operation
//     completes so that Wait can re-check the list.
//   - lock: guards operations. Read-only queries take the read lock; Run,
//     operationComplete and Wait take the write lock.
type nestedPendingOperations struct {
	operations                []operation
	exponentialBackOffOnError bool
	cond                      *sync.Cond
	lock                      sync.RWMutex
}
   134  
// operation is one entry in the list of tracked operations.
//
// Fields:
//   - key: the (volumeName, podName, nodeName) key the operation was last
//     started with.
//   - operationName: name of the generated operation most recently recorded
//     for this entry; compared against new requests when deciding whether
//     backoff applies.
//   - operationPending: true while the operation's goroutine is running,
//     false once it has failed and is only retained for backoff.
//   - expBackoff: exponential backoff state updated on each failure.
type operation struct {
	key              operationKey
	operationName    string
	operationPending bool
	expBackoff       exponentialbackoff.ExponentialBackoff
}
   141  
   142  func (grm *nestedPendingOperations) Run(
   143  	volumeName v1.UniqueVolumeName,
   144  	podName volumetypes.UniquePodName,
   145  	nodeName types.NodeName,
   146  	generatedOperations volumetypes.GeneratedOperations) error {
   147  	grm.lock.Lock()
   148  	defer grm.lock.Unlock()
   149  
   150  	opKey := operationKey{volumeName, podName, nodeName}
   151  
   152  	opExists, previousOpIndex := grm.isOperationExists(opKey)
   153  	if opExists {
   154  		previousOp := grm.operations[previousOpIndex]
   155  		// Operation already exists
   156  		if previousOp.operationPending {
   157  			// Operation is pending
   158  			return NewAlreadyExistsError(opKey)
   159  		}
   160  
   161  		backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
   162  		if backOffErr != nil {
   163  			if previousOp.operationName == generatedOperations.OperationName {
   164  				return backOffErr
   165  			}
   166  			// previous operation and new operation are different. reset op. name and exp. backoff
   167  			grm.operations[previousOpIndex].operationName = generatedOperations.OperationName
   168  			grm.operations[previousOpIndex].expBackoff = exponentialbackoff.ExponentialBackoff{}
   169  		}
   170  
   171  		// Update existing operation to mark as pending.
   172  		grm.operations[previousOpIndex].operationPending = true
   173  		grm.operations[previousOpIndex].key = opKey
   174  	} else {
   175  		// Create a new operation
   176  		grm.operations = append(grm.operations,
   177  			operation{
   178  				key:              opKey,
   179  				operationPending: true,
   180  				operationName:    generatedOperations.OperationName,
   181  				expBackoff:       exponentialbackoff.ExponentialBackoff{},
   182  			})
   183  	}
   184  
   185  	go func() (eventErr, detailedErr error) {
   186  		// Handle unhandled panics (very unlikely)
   187  		defer k8sRuntime.HandleCrash()
   188  		// Handle completion of and error, if any, from operationFunc()
   189  		defer grm.operationComplete(opKey, &detailedErr)
   190  		return generatedOperations.Run()
   191  	}()
   192  
   193  	return nil
   194  }
   195  func (grm *nestedPendingOperations) IsOperationSafeToRetry(
   196  	volumeName v1.UniqueVolumeName,
   197  	podName volumetypes.UniquePodName,
   198  	nodeName types.NodeName,
   199  	operationName string) bool {
   200  
   201  	grm.lock.RLock()
   202  	defer grm.lock.RUnlock()
   203  
   204  	opKey := operationKey{volumeName, podName, nodeName}
   205  	exist, previousOpIndex := grm.isOperationExists(opKey)
   206  	if !exist {
   207  		return true
   208  	}
   209  	previousOp := grm.operations[previousOpIndex]
   210  	if previousOp.operationPending {
   211  		return false
   212  	}
   213  	backOffErr := previousOp.expBackoff.SafeToRetry(fmt.Sprintf("%+v", opKey))
   214  	if backOffErr != nil {
   215  		if previousOp.operationName == operationName {
   216  			return false
   217  		}
   218  	}
   219  
   220  	return true
   221  }
   222  
   223  func (grm *nestedPendingOperations) IsOperationPending(
   224  	volumeName v1.UniqueVolumeName,
   225  	podName volumetypes.UniquePodName,
   226  	nodeName types.NodeName) bool {
   227  
   228  	grm.lock.RLock()
   229  	defer grm.lock.RUnlock()
   230  
   231  	opKey := operationKey{volumeName, podName, nodeName}
   232  	exist, previousOpIndex := grm.isOperationExists(opKey)
   233  	if exist && grm.operations[previousOpIndex].operationPending {
   234  		return true
   235  	}
   236  	return false
   237  }
   238  
   239  // This is an internal function and caller should acquire and release the lock
   240  func (grm *nestedPendingOperations) isOperationExists(key operationKey) (bool, int) {
   241  
   242  	// If volumeName is empty, operation can be executed concurrently
   243  	if key.volumeName == EmptyUniqueVolumeName {
   244  		return false, -1
   245  	}
   246  
   247  	opIndex := -1
   248  	for previousOpIndex, previousOp := range grm.operations {
   249  		volumeNameMatch := previousOp.key.volumeName == key.volumeName
   250  
   251  		podNameMatch := previousOp.key.podName == EmptyUniquePodName ||
   252  			key.podName == EmptyUniquePodName ||
   253  			previousOp.key.podName == key.podName
   254  
   255  		podNameExactMatch := previousOp.key.podName == key.podName
   256  
   257  		nodeNameMatch := previousOp.key.nodeName == EmptyNodeName ||
   258  			key.nodeName == EmptyNodeName ||
   259  			previousOp.key.nodeName == key.nodeName
   260  
   261  		nodeNameExactMatch := previousOp.key.nodeName == key.nodeName
   262  
   263  		if volumeNameMatch && podNameMatch && nodeNameMatch {
   264  			// nonExactMatch pending first
   265  			if previousOp.operationPending {
   266  				return true, previousOpIndex
   267  			}
   268  			// nonExactMatch with no pending, set opIndex to the first nonExactMatch
   269  			// exactMatch can override opIndex to expected
   270  			if opIndex == -1 || (podNameExactMatch && nodeNameExactMatch) {
   271  				opIndex = previousOpIndex
   272  			}
   273  		}
   274  	}
   275  	return opIndex != -1, opIndex
   276  
   277  }
   278  
   279  func (grm *nestedPendingOperations) getOperation(key operationKey) (uint, error) {
   280  	// Assumes lock has been acquired by caller.
   281  
   282  	for i, op := range grm.operations {
   283  		if op.key.volumeName == key.volumeName &&
   284  			op.key.podName == key.podName &&
   285  			op.key.nodeName == key.nodeName {
   286  			return uint(i), nil
   287  		}
   288  	}
   289  
   290  	return 0, fmt.Errorf("operation %+v not found", key)
   291  }
   292  
   293  func (grm *nestedPendingOperations) deleteOperation(key operationKey) {
   294  	// Assumes lock has been acquired by caller.
   295  
   296  	opIndex := -1
   297  	for i, op := range grm.operations {
   298  		if op.key.volumeName == key.volumeName &&
   299  			op.key.podName == key.podName &&
   300  			op.key.nodeName == key.nodeName {
   301  			opIndex = i
   302  			break
   303  		}
   304  	}
   305  
   306  	if opIndex < 0 {
   307  		return
   308  	}
   309  
   310  	// Delete index without preserving order
   311  	grm.operations[opIndex] = grm.operations[len(grm.operations)-1]
   312  	grm.operations = grm.operations[:len(grm.operations)-1]
   313  }
   314  
   315  func (grm *nestedPendingOperations) operationComplete(key operationKey, err *error) {
   316  	// Defer operations are executed in Last-In is First-Out order. In this case
   317  	// the lock is acquired first when operationCompletes begins, and is
   318  	// released when the method finishes, after the lock is released cond is
   319  	// signaled to wake waiting goroutine.
   320  	defer grm.cond.Signal()
   321  	grm.lock.Lock()
   322  	defer grm.lock.Unlock()
   323  
   324  	if *err == nil || !grm.exponentialBackOffOnError {
   325  		// Operation completed without error, or exponentialBackOffOnError disabled
   326  		grm.deleteOperation(key)
   327  		if *err != nil {
   328  			// Log error
   329  			klog.Errorf("operation %+v failed with: %v", key, *err)
   330  		}
   331  		return
   332  	}
   333  
   334  	// Operation completed with error and exponentialBackOffOnError Enabled
   335  	existingOpIndex, getOpErr := grm.getOperation(key)
   336  	if getOpErr != nil {
   337  		// Failed to find existing operation
   338  		klog.Errorf("Operation %+v completed. error: %v. exponentialBackOffOnError is enabled, but failed to get operation to update.",
   339  			key,
   340  			*err)
   341  		return
   342  	}
   343  
   344  	grm.operations[existingOpIndex].expBackoff.Update(err)
   345  	grm.operations[existingOpIndex].operationPending = false
   346  
   347  	// Log error
   348  	klog.Errorf("%v", grm.operations[existingOpIndex].expBackoff.
   349  		GenerateNoRetriesPermittedMsg(fmt.Sprintf("%+v", key)))
   350  }
   351  
   352  func (grm *nestedPendingOperations) Wait() {
   353  	grm.lock.Lock()
   354  	defer grm.lock.Unlock()
   355  
   356  	for len(grm.operations) > 0 {
   357  		grm.cond.Wait()
   358  	}
   359  }
   360  
// operationKey identifies a tracked operation by volume, pod and node name.
// An empty volumeName means the operation is never treated as conflicting
// with another one; an empty podName or nodeName matches any value of that
// component when keys are compared in isOperationExists.
type operationKey struct {
	volumeName v1.UniqueVolumeName
	podName    volumetypes.UniquePodName
	nodeName   types.NodeName
}
   366  
   367  // NewAlreadyExistsError returns a new instance of AlreadyExists error.
   368  func NewAlreadyExistsError(key operationKey) error {
   369  	return alreadyExistsError{key}
   370  }
   371  
   372  // IsAlreadyExists returns true if an error returned from
   373  // NestedPendingOperations indicates a new operation can not be started because
   374  // an operation with the same operation name is already executing.
   375  func IsAlreadyExists(err error) bool {
   376  	switch err.(type) {
   377  	case alreadyExistsError:
   378  		return true
   379  	default:
   380  		return false
   381  	}
   382  }
   383  
// alreadyExistsError is the error returned by NestedPendingOperations when a
// new operation can not be started because an operation with a matching
// operation key is already executing. operationKey holds the key of the
// rejected request.
type alreadyExistsError struct {
	operationKey operationKey
}

// Compile-time check that alreadyExistsError implements the error interface.
var _ error = alreadyExistsError{}
   392  
   393  func (err alreadyExistsError) Error() string {
   394  	return fmt.Sprintf(
   395  		"Failed to create operation with name %+v. An operation with that name is already executing.",
   396  		err.operationKey)
   397  }
   398  

View as plain text