...
1
16
17 package pluginmanager
18
19 import (
20 "time"
21
22 "k8s.io/apimachinery/pkg/util/runtime"
23 "k8s.io/client-go/tools/record"
24 "k8s.io/klog/v2"
25 "k8s.io/kubernetes/pkg/kubelet/config"
26 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
27 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/metrics"
28 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
29 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
30 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler"
31 )
32
33
34
35 type PluginManager interface {
36
37 Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
38
39
40
41
42
43 AddHandler(pluginType string, pluginHandler cache.PluginHandler)
44 }
45
// loopSleepDuration is the period handed to the reconciler when it is
// constructed in NewPluginManager.
// NOTE(review): presumably the pause between reconciler iterations — confirm
// against the reconciler package.
const loopSleepDuration = time.Second
51
52
53
54 func NewPluginManager(
55 sockDir string,
56 recorder record.EventRecorder) PluginManager {
57 asw := cache.NewActualStateOfWorld()
58 dsw := cache.NewDesiredStateOfWorld()
59 reconciler := reconciler.NewReconciler(
60 operationexecutor.NewOperationExecutor(
61 operationexecutor.NewOperationGenerator(
62 recorder,
63 ),
64 ),
65 loopSleepDuration,
66 dsw,
67 asw,
68 )
69
70 pm := &pluginManager{
71 desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
72 sockDir,
73 dsw,
74 ),
75 reconciler: reconciler,
76 desiredStateOfWorld: dsw,
77 actualStateOfWorld: asw,
78 }
79 return pm
80 }
81
82
83 type pluginManager struct {
84
85
86 desiredStateOfWorldPopulator *pluginwatcher.Watcher
87
88
89
90
91 reconciler reconciler.Reconciler
92
93
94
95
96
97 actualStateOfWorld cache.ActualStateOfWorld
98
99
100
101
102
103 desiredStateOfWorld cache.DesiredStateOfWorld
104 }
105
106 var _ PluginManager = &pluginManager{}
107
108 func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
109 defer runtime.HandleCrash()
110
111 if err := pm.desiredStateOfWorldPopulator.Start(stopCh); err != nil {
112 klog.ErrorS(err, "The desired_state_of_world populator (plugin watcher) starts failed!")
113 return
114 }
115
116 klog.V(2).InfoS("The desired_state_of_world populator (plugin watcher) starts")
117
118 klog.InfoS("Starting Kubelet Plugin Manager")
119 go pm.reconciler.Run(stopCh)
120
121 metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
122 <-stopCh
123 klog.InfoS("Shutting down Kubelet Plugin Manager")
124 }
125
126 func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {
127 pm.reconciler.AddHandler(pluginType, handler)
128 }
129
View as plain text