...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler/reconciler.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package reconciler implements interfaces that attempt to reconcile the
    18  // desired state of the world with the actual state of the world by triggering
    19  // relevant actions (register/deregister plugins).
    20  package reconciler
    21  
    22  import (
    23  	"sync"
    24  	"time"
    25  
    26  	"k8s.io/apimachinery/pkg/util/wait"
    27  	"k8s.io/klog/v2"
    28  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    29  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
    30  	"k8s.io/kubernetes/pkg/util/goroutinemap"
    31  	"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
    32  )
    33  
    34  // Reconciler runs a periodic loop to reconcile the desired state of the world
    35  // with the actual state of the world by triggering register and unregister
    36  // operations. Also provides a means to add a handler for a plugin type.
    37  type Reconciler interface {
    38  	// Run starts running the reconciliation loop which executes periodically,
    39  	// checks if plugins are correctly registered or unregistered.
    40  	// If not, it will trigger register/unregister operations to rectify.
    41  	Run(stopCh <-chan struct{})
    42  
    43  	// AddHandler adds the given plugin handler for a specific plugin type,
    44  	// which will be added to the actual state of world cache.
    45  	AddHandler(pluginType string, pluginHandler cache.PluginHandler)
    46  }
    47  
    48  // NewReconciler returns a new instance of Reconciler.
    49  //
    50  // operationExecutor - used to trigger register/unregister operations safely
    51  // (prevents more than one operation from being triggered on the same
    52  // socket path)
    53  //
    54  // loopSleepDuration - the amount of time the reconciler loop sleeps between
    55  // successive executions
    56  //
    57  // desiredStateOfWorld - cache containing the desired state of the world
    58  //
    59  // actualStateOfWorld - cache containing the actual state of the world
    60  func NewReconciler(
    61  	operationExecutor operationexecutor.OperationExecutor,
    62  	loopSleepDuration time.Duration,
    63  	desiredStateOfWorld cache.DesiredStateOfWorld,
    64  	actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
    65  	return &reconciler{
    66  		operationExecutor:   operationExecutor,
    67  		loopSleepDuration:   loopSleepDuration,
    68  		desiredStateOfWorld: desiredStateOfWorld,
    69  		actualStateOfWorld:  actualStateOfWorld,
    70  		handlers:            make(map[string]cache.PluginHandler),
    71  	}
    72  }
    73  
    74  type reconciler struct {
    75  	operationExecutor   operationexecutor.OperationExecutor
    76  	loopSleepDuration   time.Duration
    77  	desiredStateOfWorld cache.DesiredStateOfWorld
    78  	actualStateOfWorld  cache.ActualStateOfWorld
    79  	handlers            map[string]cache.PluginHandler
    80  	sync.RWMutex
    81  }
    82  
    83  var _ Reconciler = &reconciler{}
    84  
    85  func (rc *reconciler) Run(stopCh <-chan struct{}) {
    86  	wait.Until(func() {
    87  		rc.reconcile()
    88  	},
    89  		rc.loopSleepDuration,
    90  		stopCh)
    91  }
    92  
    93  func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
    94  	rc.Lock()
    95  	defer rc.Unlock()
    96  
    97  	rc.handlers[pluginType] = pluginHandler
    98  }
    99  
   100  func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
   101  	rc.RLock()
   102  	defer rc.RUnlock()
   103  
   104  	var copyHandlers = make(map[string]cache.PluginHandler)
   105  	for pluginType, handler := range rc.handlers {
   106  		copyHandlers[pluginType] = handler
   107  	}
   108  	return copyHandlers
   109  }
   110  
   111  func (rc *reconciler) reconcile() {
   112  	// Unregisterations are triggered before registrations
   113  
   114  	// Ensure plugins that should be unregistered are unregistered.
   115  	for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
   116  		unregisterPlugin := false
   117  		if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
   118  			unregisterPlugin = true
   119  		} else {
   120  			// We also need to unregister the plugins that exist in both actual state of world
   121  			// and desired state of world cache, but the timestamps don't match.
   122  			// Iterate through desired state of world plugins and see if there's any plugin
   123  			// with the same socket path but different timestamp.
   124  			for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
   125  				if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
   126  					klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
   127  					unregisterPlugin = true
   128  					break
   129  				}
   130  			}
   131  		}
   132  
   133  		if unregisterPlugin {
   134  			klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)
   135  			err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
   136  			if err != nil &&
   137  				!goroutinemap.IsAlreadyExists(err) &&
   138  				!exponentialbackoff.IsExponentialBackoff(err) {
   139  				// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
   140  				// Log all other errors.
   141  				klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)
   142  			}
   143  			if err == nil {
   144  				klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)
   145  			}
   146  		}
   147  	}
   148  
   149  	// Ensure plugins that should be registered are registered
   150  	for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
   151  		if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
   152  			klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
   153  			err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
   154  			if err != nil &&
   155  				!goroutinemap.IsAlreadyExists(err) &&
   156  				!exponentialbackoff.IsExponentialBackoff(err) {
   157  				// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
   158  				klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)
   159  			}
   160  			if err == nil {
   161  				klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)
   162  			}
   163  		}
   164  	}
   165  }
   166  

View as plain text