1
16
17 package reconciler
18
19 import (
20 "fmt"
21 "os"
22 "path/filepath"
23 "runtime"
24 "testing"
25 "time"
26
27 "github.com/stretchr/testify/require"
28 "k8s.io/apimachinery/pkg/util/wait"
29 "k8s.io/client-go/tools/record"
30 registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
31 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
32 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
33 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
34 )
35
// reconcilerLoopSleepDuration is the loop sleep duration handed to
// NewReconciler by every test in this file. It is one nanosecond, presumably
// so the reconciler loop does not slow the tests down.
const reconcilerLoopSleepDuration = time.Nanosecond
41
// socketDir is the temporary directory in which the tests create their plugin
// sockets. It is assigned by init and emptied by cleanup.
var socketDir string

// supportedVersions lists the versions passed to the example plugins that the
// tests construct.
var supportedVersions = []string{"v1beta1", "v1beta2"}
46
47 func init() {
48 d, err := os.MkdirTemp("", "reconciler_test")
49 if err != nil {
50 panic(fmt.Sprintf("Could not create a temp directory: %s", d))
51 }
52 socketDir = d
53 }
54
55 func cleanup(t *testing.T) {
56 require.NoError(t, os.RemoveAll(socketDir))
57 os.MkdirAll(socketDir, 0755)
58 }
59
60 func runReconciler(reconciler Reconciler) {
61 go reconciler.Run(wait.NeverStop)
62 }
63
64 func waitForRegistration(
65 t *testing.T,
66 socketPath string,
67 previousTimestamp time.Time,
68 asw cache.ActualStateOfWorld) {
69 err := retryWithExponentialBackOff(
70 time.Duration(500*time.Millisecond),
71 func() (bool, error) {
72 registeredPlugins := asw.GetRegisteredPlugins()
73 for _, plugin := range registeredPlugins {
74 if plugin.SocketPath == socketPath && plugin.Timestamp.After(previousTimestamp) {
75 return true, nil
76 }
77 }
78 return false, nil
79 },
80 )
81 if err != nil {
82 t.Fatalf("Timed out waiting for plugin to be registered:\n%s.", socketPath)
83 }
84 }
85
86 func waitForUnregistration(
87 t *testing.T,
88 socketPath string,
89 asw cache.ActualStateOfWorld) {
90 err := retryWithExponentialBackOff(
91 time.Duration(500*time.Millisecond),
92 func() (bool, error) {
93 registeredPlugins := asw.GetRegisteredPlugins()
94 for _, plugin := range registeredPlugins {
95 if plugin.SocketPath == socketPath {
96 return false, nil
97 }
98 }
99 return true, nil
100 },
101 )
102
103 if err != nil {
104 t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
105 }
106 }
107
108 func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
109 backoff := wait.Backoff{
110 Duration: initialDuration,
111 Factor: 3,
112 Jitter: 0,
113 Steps: 6,
114 }
115 return wait.ExponentialBackoff(backoff, fn)
116 }
117
// DummyImpl is a stateless stub whose methods all succeed without doing any
// work. The tests in this file register it with the reconciler as the handler
// for device plugins.
type DummyImpl struct{}

// NewDummyImpl returns a pointer to a new DummyImpl.
func NewDummyImpl() *DummyImpl {
	return new(DummyImpl)
}

// ValidatePlugin ignores its arguments and always returns nil.
func (*DummyImpl) ValidatePlugin(pluginName, endpoint string, versions []string) error {
	return nil
}

// RegisterPlugin ignores its arguments and always returns nil.
func (*DummyImpl) RegisterPlugin(pluginName, endpoint string, versions []string, pluginClientTimeout *time.Duration) error {
	return nil
}

// DeRegisterPlugin ignores its argument and does nothing.
func (*DummyImpl) DeRegisterPlugin(pluginName string) {}
137
138
139
140 func Test_Run_Positive_DoNothing(t *testing.T) {
141 defer cleanup(t)
142
143 dsw := cache.NewDesiredStateOfWorld()
144 asw := cache.NewActualStateOfWorld()
145 fakeRecorder := &record.FakeRecorder{}
146 oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
147 fakeRecorder,
148 ))
149 reconciler := NewReconciler(
150 oex,
151 reconcilerLoopSleepDuration,
152 dsw,
153 asw,
154 )
155
156 runReconciler(reconciler)
157
158
159 if len(asw.GetRegisteredPlugins()) != 0 {
160 t.Fatalf("Test_Run_Positive_DoNothing: actual state of world should be empty but it's not")
161 }
162 if len(dsw.GetPluginsToRegister()) != 0 {
163 t.Fatalf("Test_Run_Positive_DoNothing: desired state of world should be empty but it's not")
164 }
165 }
166
167
168
169
170 func Test_Run_Positive_Register(t *testing.T) {
171
172 if runtime.GOOS == "windows" {
173 t.Skip("Skipping test that fails on Windows")
174 }
175
176 defer cleanup(t)
177
178 dsw := cache.NewDesiredStateOfWorld()
179 asw := cache.NewActualStateOfWorld()
180 di := NewDummyImpl()
181 fakeRecorder := &record.FakeRecorder{}
182 oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
183 fakeRecorder,
184 ))
185 reconciler := NewReconciler(
186 oex,
187 reconcilerLoopSleepDuration,
188 dsw,
189 asw,
190 )
191 reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
192
193
194 stopChan := make(chan struct{})
195 defer close(stopChan)
196 go reconciler.Run(stopChan)
197 socketPath := filepath.Join(socketDir, "plugin.sock")
198 pluginName := fmt.Sprintf("example-plugin")
199 p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
200 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
201 defer func() {
202 require.NoError(t, p.Stop())
203 }()
204 timestampBeforeRegistration := time.Now()
205 dsw.AddOrUpdatePlugin(socketPath)
206 waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
207
208
209 aswPlugins := asw.GetRegisteredPlugins()
210 if len(aswPlugins) != 1 {
211 t.Fatalf("Test_Run_Positive_Register: actual state of world length should be one but it's %d", len(aswPlugins))
212 }
213 if aswPlugins[0].SocketPath != socketPath {
214 t.Fatalf("Test_Run_Positive_Register: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
215 }
216 }
217
218
219
220
221
222
223 func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
224
225 if runtime.GOOS == "windows" {
226 t.Skip("Skipping test that fails on Windows")
227 }
228
229 defer cleanup(t)
230
231 dsw := cache.NewDesiredStateOfWorld()
232 asw := cache.NewActualStateOfWorld()
233 di := NewDummyImpl()
234 fakeRecorder := &record.FakeRecorder{}
235 oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
236 fakeRecorder,
237 ))
238 reconciler := NewReconciler(
239 oex,
240 reconcilerLoopSleepDuration,
241 dsw,
242 asw,
243 )
244 reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
245
246
247 stopChan := make(chan struct{})
248 defer close(stopChan)
249 go reconciler.Run(stopChan)
250
251 socketPath := filepath.Join(socketDir, "plugin.sock")
252 pluginName := fmt.Sprintf("example-plugin")
253 p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
254 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
255 timestampBeforeRegistration := time.Now()
256 dsw.AddOrUpdatePlugin(socketPath)
257 waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
258
259
260 aswPlugins := asw.GetRegisteredPlugins()
261 if len(aswPlugins) != 1 {
262 t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
263 }
264 if aswPlugins[0].SocketPath != socketPath {
265 t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
266 }
267
268 dsw.RemovePlugin(socketPath)
269 os.Remove(socketPath)
270 waitForUnregistration(t, socketPath, asw)
271
272
273 aswPlugins = asw.GetRegisteredPlugins()
274 if len(aswPlugins) != 0 {
275 t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be zero but it's %d", len(aswPlugins))
276 }
277 }
278
279
280
281
282
283
284 func Test_Run_Positive_ReRegister(t *testing.T) {
285
286 if runtime.GOOS == "windows" {
287 t.Skip("Skipping test that fails on Windows")
288 }
289
290 defer cleanup(t)
291
292 dsw := cache.NewDesiredStateOfWorld()
293 asw := cache.NewActualStateOfWorld()
294 di := NewDummyImpl()
295 fakeRecorder := &record.FakeRecorder{}
296 oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
297 fakeRecorder,
298 ))
299 reconciler := NewReconciler(
300 oex,
301 reconcilerLoopSleepDuration,
302 dsw,
303 asw,
304 )
305 reconciler.AddHandler(registerapi.DevicePlugin, cache.PluginHandler(di))
306
307
308 stopChan := make(chan struct{})
309 defer close(stopChan)
310 go reconciler.Run(stopChan)
311
312 socketPath := filepath.Join(socketDir, "plugin2.sock")
313 pluginName := fmt.Sprintf("example-plugin2")
314 p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
315 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
316 timestampBeforeRegistration := time.Now()
317 dsw.AddOrUpdatePlugin(socketPath)
318 waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
319
320 timeStampBeforeReRegistration := time.Now()
321
322 dsw.AddOrUpdatePlugin(socketPath)
323
324
325
326
327 waitForRegistration(t, socketPath, timeStampBeforeReRegistration, asw)
328
329
330 aswPlugins := asw.GetRegisteredPlugins()
331 if len(aswPlugins) != 1 {
332 t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
333 }
334 if aswPlugins[0].SocketPath != socketPath {
335 t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
336 }
337 }
338
View as plain text