...

Source file src/k8s.io/kubernetes/pkg/kubelet/pluginmanager/plugin_manager.go

Documentation: k8s.io/kubernetes/pkg/kubelet/pluginmanager

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package pluginmanager
    18  
    19  import (
    20  	"time"
    21  
    22  	"k8s.io/apimachinery/pkg/util/runtime"
    23  	"k8s.io/client-go/tools/record"
    24  	"k8s.io/klog/v2"
    25  	"k8s.io/kubernetes/pkg/kubelet/config"
    26  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
    27  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/metrics"
    28  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
    29  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
    30  	"k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler"
    31  )
    32  
    33  // PluginManager runs a set of asynchronous loops that figure out which plugins
    34  // need to be registered/deregistered and makes it so.
    35  type PluginManager interface {
    36  	// Starts the plugin manager and all the asynchronous loops that it controls
    37  	Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
    38  
    39  	// AddHandler adds the given plugin handler for a specific plugin type, which
    40  	// will be added to the actual state of world cache so that it can be passed to
    41  	// the desired state of world cache in order to be used during plugin
    42  	// registration/deregistration
    43  	AddHandler(pluginType string, pluginHandler cache.PluginHandler)
    44  }
    45  
    46  const (
    47  	// loopSleepDuration is the amount of time the reconciler loop waits
    48  	// between successive executions
    49  	loopSleepDuration = 1 * time.Second
    50  )
    51  
    52  // NewPluginManager returns a new concrete instance implementing the
    53  // PluginManager interface.
    54  func NewPluginManager(
    55  	sockDir string,
    56  	recorder record.EventRecorder) PluginManager {
    57  	asw := cache.NewActualStateOfWorld()
    58  	dsw := cache.NewDesiredStateOfWorld()
    59  	reconciler := reconciler.NewReconciler(
    60  		operationexecutor.NewOperationExecutor(
    61  			operationexecutor.NewOperationGenerator(
    62  				recorder,
    63  			),
    64  		),
    65  		loopSleepDuration,
    66  		dsw,
    67  		asw,
    68  	)
    69  
    70  	pm := &pluginManager{
    71  		desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
    72  			sockDir,
    73  			dsw,
    74  		),
    75  		reconciler:          reconciler,
    76  		desiredStateOfWorld: dsw,
    77  		actualStateOfWorld:  asw,
    78  	}
    79  	return pm
    80  }
    81  
    82  // pluginManager implements the PluginManager interface
    83  type pluginManager struct {
    84  	// desiredStateOfWorldPopulator (the plugin watcher) runs an asynchronous
    85  	// periodic loop to populate the desiredStateOfWorld.
    86  	desiredStateOfWorldPopulator *pluginwatcher.Watcher
    87  
    88  	// reconciler runs an asynchronous periodic loop to reconcile the
    89  	// desiredStateOfWorld with the actualStateOfWorld by triggering register
    90  	// and unregister operations using the operationExecutor.
    91  	reconciler reconciler.Reconciler
    92  
    93  	// actualStateOfWorld is a data structure containing the actual state of
    94  	// the world according to the manager: i.e. which plugins are registered.
    95  	// The data structure is populated upon successful completion of register
    96  	// and unregister actions triggered by the reconciler.
    97  	actualStateOfWorld cache.ActualStateOfWorld
    98  
    99  	// desiredStateOfWorld is a data structure containing the desired state of
   100  	// the world according to the plugin manager: i.e. what plugins are registered.
   101  	// The data structure is populated by the desired state of the world
   102  	// populator (plugin watcher).
   103  	desiredStateOfWorld cache.DesiredStateOfWorld
   104  }
   105  
   106  var _ PluginManager = &pluginManager{}
   107  
   108  func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
   109  	defer runtime.HandleCrash()
   110  
   111  	if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
   112  		klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
   113  		return
   114  	}
   115  
   116  	klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")
   117  
   118  	klog.InfoS("Starting Kubelet Plugin Manager")
   119  	go pm.reconciler.Run(stopCh)
   120  
   121  	metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
   122  	<-stopCh
   123  	klog.InfoS("Shutting down Kubelet Plugin Manager")
   124  }
   125  
   126  func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {
   127  	pm.reconciler.AddHandler(pluginType, handler)
   128  }
   129  

View as plain text