1 /* 2 Copyright 2019 The Kubernetes Authors. 3 4 Licensed under the Apache License, Version 2.0 (the "License"); 5 you may not use this file except in compliance with the License. 6 You may obtain a copy of the License at 7 8 http://www.apache.org/licenses/LICENSE-2.0 9 10 Unless required by applicable law or agreed to in writing, software 11 distributed under the License is distributed on an "AS IS" BASIS, 12 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 13 See the License for the specific language governing permissions and 14 limitations under the License. 15 */ 16 17 // Package operationexecutor implements interfaces that enable execution of 18 // register and unregister operations with a 19 // goroutinemap so that more than one operation is never triggered 20 // on the same plugin. 21 package operationexecutor 22 23 import ( 24 "time" 25 26 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache" 27 "k8s.io/kubernetes/pkg/util/goroutinemap" 28 ) 29 30 // OperationExecutor defines a set of operations for registering and unregistering 31 // a plugin that are executed with a NewGoRoutineMap which 32 // prevents more than one operation from being triggered on the same socket path. 33 // 34 // These operations should be idempotent (for example, RegisterPlugin should 35 // still succeed if the plugin is already registered, etc.). However, 36 // they depend on the plugin handlers (for each plugin type) to implement this 37 // behavior. 38 // 39 // Once an operation completes successfully, the actualStateOfWorld is updated 40 // to indicate the plugin is registered/unregistered. 41 // 42 // Once the operation is started, since it is executed asynchronously, 43 // errors are simply logged and the goroutine is terminated without updating 44 // actualStateOfWorld. 45 type OperationExecutor interface { 46 // RegisterPlugin registers the given plugin using a handler in the plugin handler map. 47 // It then updates the actual state of the world to reflect that. 48 RegisterPlugin(socketPath string, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error 49 50 // UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map. 51 // It then updates the actual state of the world to reflect that. 52 UnregisterPlugin(pluginInfo cache.PluginInfo, actualStateOfWorld ActualStateOfWorldUpdater) error 53 } 54 55 // NewOperationExecutor returns a new instance of OperationExecutor. 56 func NewOperationExecutor( 57 operationGenerator OperationGenerator) OperationExecutor { 58 59 return &operationExecutor{ 60 pendingOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */), 61 operationGenerator: operationGenerator, 62 } 63 } 64 65 // ActualStateOfWorldUpdater defines a set of operations updating the actual 66 // state of the world cache after successful registration/deregistration. 67 type ActualStateOfWorldUpdater interface { 68 // AddPlugin add the given plugin in the cache if no existing plugin 69 // in the cache has the same socket path. 70 // An error will be returned if socketPath is empty. 71 AddPlugin(pluginInfo cache.PluginInfo) error 72 73 // RemovePlugin deletes the plugin with the given socket path from the actual 74 // state of world. 75 // If a plugin does not exist with the given socket path, this is a no-op. 76 RemovePlugin(socketPath string) 77 } 78 79 type operationExecutor struct { 80 // pendingOperations keeps track of pending attach and detach operations so 81 // multiple operations are not started on the same volume 82 pendingOperations goroutinemap.GoRoutineMap 83 84 // operationGenerator is an interface that provides implementations for 85 // generating volume function 86 operationGenerator OperationGenerator 87 } 88 89 var _ OperationExecutor = &operationExecutor{} 90 91 func (oe *operationExecutor) IsOperationPending(socketPath string) bool { 92 return oe.pendingOperations.IsOperationPending(socketPath) 93 } 94 95 func (oe *operationExecutor) RegisterPlugin( 96 socketPath string, 97 timestamp time.Time, 98 pluginHandlers map[string]cache.PluginHandler, 99 actualStateOfWorld ActualStateOfWorldUpdater) error { 100 generatedOperation := 101 oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, timestamp, pluginHandlers, actualStateOfWorld) 102 103 return oe.pendingOperations.Run( 104 socketPath, generatedOperation) 105 } 106 107 func (oe *operationExecutor) UnregisterPlugin( 108 pluginInfo cache.PluginInfo, 109 actualStateOfWorld ActualStateOfWorldUpdater) error { 110 generatedOperation := 111 oe.operationGenerator.GenerateUnregisterPluginFunc(pluginInfo, actualStateOfWorld) 112 113 return oe.pendingOperations.Run( 114 pluginInfo.SocketPath, generatedOperation) 115 } 116