...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor/operation_executor.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package operationexecutor implements interfaces that enable execution of
    18  // register and unregister operations with a
    19  // goroutinemap so that more than one operation is never triggered
    20  // on the same plugin.
    21  package operationexecutor
    22  
    23  import (
    24  	"time"
    25  
    26  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    27  	"k8s.io/kubernetes/pkg/util/goroutinemap"
    28  )
    29  
    30  // OperationExecutor defines a set of operations for registering and unregistering
    31  // a plugin that are executed with a NewGoRoutineMap which
    32  // prevents more than one operation from being triggered on the same socket path.
    33  //
    34  // These operations should be idempotent (for example, RegisterPlugin should
    35  // still succeed if the plugin is already registered, etc.). However,
    36  // they depend on the plugin handlers (for each plugin type) to implement this
    37  // behavior.
    38  //
    39  // Once an operation completes successfully, the actualStateOfWorld is updated
    40  // to indicate the plugin is registered/unregistered.
    41  //
    42  // Once the operation is started, since it is executed asynchronously,
    43  // errors are simply logged and the goroutine is terminated without updating
    44  // actualStateOfWorld.
    45  type OperationExecutor interface {
    46  	// RegisterPlugin registers the given plugin using a handler in the plugin handler map.
    47  	// It then updates the actual state of the world to reflect that.
    48  	RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
    49  
    50  	// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
    51  	// It then updates the actual state of the world to reflect that.
    52  	UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error
    53  }
    54  
    55  // NewOperationExecutor returns a new instance of OperationExecutor.
    56  func NewOperationExecutor(
    57  	operationGenerator OperationGenerator) OperationExecutor {
    58  
    59  	return &operationExecutor{
    60  		pendingOperations:  goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
    61  		operationGenerator: operationGenerator,
    62  	}
    63  }
    64  
    65  // ActualStateOfWorldUpdater defines a set of operations updating the actual
    66  // state of the world cache after successful registration/deregistration.
    67  type ActualStateOfWorldUpdater interface {
    68  	// AddPlugin add the given plugin in the cache if no existing plugin
    69  	// in the cache has the same socket path.
    70  	// An error will be returned if socketPath is empty.
    71  	AddPlugin(pluginInfo cache.PluginInfo) error
    72  
    73  	// RemovePlugin deletes the plugin with the given socket path from the actual
    74  	// state of world.
    75  	// If a plugin does not exist with the given socket path, this is a no-op.
    76  	RemovePlugin(socketPath string)
    77  }
    78  
    79  type operationExecutor struct {
    80  	// pendingOperations keeps track of pending attach and detach operations so
    81  	// multiple operations are not started on the same volume
    82  	pendingOperations goroutinemap.GoRoutineMap
    83  
    84  	// operationGenerator is an interface that provides implementations for
    85  	// generating volume function
    86  	operationGenerator OperationGenerator
    87  }
    88  
    89  var _ OperationExecutor = &operationExecutor{}
    90  
    91  func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
    92  	return oe.pendingOperations.IsOperationPending(socketPath)
    93  }
    94  
    95  func (oe *operationExecutor) RegisterPlugin(
    96  	socketPath string,
    97  	timestamp time.Time,
    98  	pluginHandlers map[string]cache.PluginHandler,
    99  	actualStateOfWorld ActualStateOfWorldUpdater) error {
   100  	generatedOperation :=
   101  		oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld)
   102  
   103  	return oe.pendingOperations.Run(
   104  		socketPath, generatedOperation)
   105  }
   106  
   107  func (oe *operationExecutor) UnregisterPlugin(
   108  	pluginInfo cache.PluginInfo,
   109  	actualStateOfWorld ActualStateOfWorldUpdater) error {
   110  	generatedOperation :=
   111  		oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld)
   112  
   113  	return oe.pendingOperations.Run(
   114  		pluginInfo.SocketPath, generatedOperation)
   115  }
   116  

View as plain text