...
1
16
17
18
19
20 package reconciler
21
22 import (
23 "sync"
24 "time"
25
26 "k8s.io/apimachinery/pkg/util/wait"
27 "k8s.io/klog/v2"
28 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
29 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
30 "k8s.io/kubernetes/pkg/util/goroutinemap"
31 "k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
32 )
33
34
35
36
37 type Reconciler interface {
38
39
40
41 Run(stopCh <-chan struct{})
42
43
44
45 AddHandler(pluginType string, pluginHandler cache.PluginHandler)
46 }
47
48
49
50
51
52
53
54
55
56
57
58
59
60 func NewReconciler(
61 operationExecutor operationexecutor.OperationExecutor,
62 loopSleepDuration time.Duration,
63 desiredStateOfWorld cache.DesiredStateOfWorld,
64 actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
65 return &reconciler{
66 operationExecutor: operationExecutor,
67 loopSleepDuration: loopSleepDuration,
68 desiredStateOfWorld: desiredStateOfWorld,
69 actualStateOfWorld: actualStateOfWorld,
70 handlers: make(map[string]cache.PluginHandler),
71 }
72 }
73
74 type reconciler struct {
75 operationExecutor operationexecutor.OperationExecutor
76 loopSleepDuration time.Duration
77 desiredStateOfWorld cache.DesiredStateOfWorld
78 actualStateOfWorld cache.ActualStateOfWorld
79 handlers map[string]cache.PluginHandler
80 sync.RWMutex
81 }
82
83 var _ Reconciler = &reconciler{}
84
85 func (rc *reconciler) Run(stopCh <-chan struct{}) {
86 wait.Until(func() {
87 rc.reconcile()
88 },
89 rc.loopSleepDuration,
90 stopCh)
91 }
92
93 func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
94 rc.Lock()
95 defer rc.Unlock()
96
97 rc.handlers[pluginType] = pluginHandler
98 }
99
100 func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
101 rc.RLock()
102 defer rc.RUnlock()
103
104 var copyHandlers = make(map[string]cache.PluginHandler)
105 for pluginType, handler := range rc.handlers {
106 copyHandlers[pluginType] = handler
107 }
108 return copyHandlers
109 }
110
111 func (rc *reconciler) reconcile() {
112
113
114
115 for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
116 unregisterPlugin := false
117 if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
118 unregisterPlugin = true
119 } else {
120
121
122
123
124 for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
125 if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
126 klog.V(5).InfoS("An updated version of plugin has been found, unregistering the plugin first before reregistering", "plugin", registeredPlugin)
127 unregisterPlugin = true
128 break
129 }
130 }
131 }
132
133 if unregisterPlugin {
134 klog.V(5).InfoS("Starting operationExecutor.UnregisterPlugin", "plugin", registeredPlugin)
135 err := rc.operationExecutor.UnregisterPlugin(registeredPlugin, rc.actualStateOfWorld)
136 if err != nil &&
137 !goroutinemap.IsAlreadyExists(err) &&
138 !exponentialbackoff.IsExponentialBackoff(err) {
139
140
141 klog.ErrorS(err, "OperationExecutor.UnregisterPlugin failed", "plugin", registeredPlugin)
142 }
143 if err == nil {
144 klog.V(1).InfoS("OperationExecutor.UnregisterPlugin started", "plugin", registeredPlugin)
145 }
146 }
147 }
148
149
150 for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
151 if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
152 klog.V(5).InfoS("Starting operationExecutor.RegisterPlugin", "plugin", pluginToRegister)
153 err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
154 if err != nil &&
155 !goroutinemap.IsAlreadyExists(err) &&
156 !exponentialbackoff.IsExponentialBackoff(err) {
157
158 klog.ErrorS(err, "OperationExecutor.RegisterPlugin failed", "plugin", pluginToRegister)
159 }
160 if err == nil {
161 klog.V(1).InfoS("OperationExecutor.RegisterPlugin started", "plugin", pluginToRegister)
162 }
163 }
164 }
165 }
166
View as plain text