1
16
17 package pluginwatcher
18
19 import (
20 "flag"
21 "fmt"
22 "os"
23 "path/filepath"
24 "sync"
25 "testing"
26 "time"
27
28 "github.com/stretchr/testify/require"
29 "k8s.io/apimachinery/pkg/util/wait"
30 "k8s.io/klog/v2"
31 registerapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1"
32 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
33 )
34
// supportedVersions lists the registration API versions handed to
// NewTestExamplePlugin by every test in this file.
var supportedVersions = []string{"v1beta1", "v1beta2"}
38
39 func init() {
40 var logLevel string
41
42 klog.InitFlags(flag.CommandLine)
43 flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
44 flag.StringVar(&logLevel, "logLevel", "6", "test")
45 flag.Lookup("v").Value.Set(logLevel)
46 }
47
// initTempDir creates a fresh temporary directory with the "plugin_test"
// prefix in the default temp location and returns its path. The test is
// aborted if the directory cannot be created. The caller is responsible for
// removing the directory.
func initTempDir(t *testing.T) string {
	// Attribute a failure to the calling test rather than to this helper.
	t.Helper()

	d, err := os.MkdirTemp("", "plugin_test")
	if err != nil {
		// os.MkdirTemp returns an empty path on failure, so only the error
		// carries useful information.
		t.Fatalf("Could not create a temp directory: %v", err)
	}

	return d
}
58
59 func waitForRegistration(
60 t *testing.T,
61 socketPath string,
62 dsw cache.DesiredStateOfWorld) {
63 err := retryWithExponentialBackOff(
64 time.Duration(500*time.Millisecond),
65 func() (bool, error) {
66 if dsw.PluginExists(socketPath) {
67 return true, nil
68 }
69 return false, nil
70 },
71 )
72 if err != nil {
73 t.Fatalf("Timed out waiting for plugin to be added to desired state of world cache:\n%s.", socketPath)
74 }
75 }
76
77 func waitForUnregistration(
78 t *testing.T,
79 socketPath string,
80 dsw cache.DesiredStateOfWorld) {
81 err := retryWithExponentialBackOff(
82 time.Duration(500*time.Millisecond),
83 func() (bool, error) {
84 if !dsw.PluginExists(socketPath) {
85 return true, nil
86 }
87 return false, nil
88 },
89 )
90
91 if err != nil {
92 t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
93 }
94 }
95
96 func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
97 backoff := wait.Backoff{
98 Duration: initialDuration,
99 Factor: 3,
100 Jitter: 0,
101 Steps: 6,
102 }
103 return wait.ExponentialBackoff(backoff, fn)
104 }
105
106 func TestPluginRegistration(t *testing.T) {
107 socketDir := initTempDir(t)
108 defer os.RemoveAll(socketDir)
109
110 dsw := cache.NewDesiredStateOfWorld()
111 newWatcher(t, socketDir, dsw, wait.NeverStop)
112
113 for i := 0; i < 10; i++ {
114 socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
115 pluginName := fmt.Sprintf("example-plugin-%d", i)
116
117 p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
118 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
119
120 pluginInfo := GetPluginInfo(p)
121 waitForRegistration(t, pluginInfo.SocketPath, dsw)
122
123
124 dswPlugins := dsw.GetPluginsToRegister()
125 if len(dswPlugins) != 1 {
126 t.Fatalf("TestPluginRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
127 }
128
129
130 require.NoError(t, p.Stop())
131
132 waitForUnregistration(t, pluginInfo.SocketPath, dsw)
133 dswPlugins = dsw.GetPluginsToRegister()
134 if len(dswPlugins) != 0 {
135 t.Fatalf("TestPluginRegistration: desired state of world length should be 0 but it's %d", len(dswPlugins))
136 }
137 }
138 }
139
140 func TestPluginRegistrationSameName(t *testing.T) {
141 socketDir := initTempDir(t)
142 defer os.RemoveAll(socketDir)
143
144 dsw := cache.NewDesiredStateOfWorld()
145 newWatcher(t, socketDir, dsw, wait.NeverStop)
146
147
148
149 pluginName := "dep-example-plugin"
150 for i := 0; i < 10; i++ {
151 socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
152 p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
153 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
154
155 pluginInfo := GetPluginInfo(p)
156 waitForRegistration(t, pluginInfo.SocketPath, dsw)
157
158
159 dswPlugins := dsw.GetPluginsToRegister()
160 if len(dswPlugins) != i+1 {
161 t.Fatalf("TestPluginRegistrationSameName: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
162 }
163 }
164 }
165
166 func TestPluginReRegistration(t *testing.T) {
167 socketDir := initTempDir(t)
168 defer os.RemoveAll(socketDir)
169
170 dsw := cache.NewDesiredStateOfWorld()
171 newWatcher(t, socketDir, dsw, wait.NeverStop)
172
173
174
175 socketPath := filepath.Join(socketDir, "plugin-reregistration.sock")
176 pluginName := "reregister-plugin"
177 p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
178 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
179 pluginInfo := GetPluginInfo(p)
180 lastTimestamp := time.Now()
181 waitForRegistration(t, pluginInfo.SocketPath, dsw)
182
183
184
185 for i := 0; i < 10; i++ {
186
187
188 require.NoError(t, p.Stop())
189 waitForUnregistration(t, pluginInfo.SocketPath, dsw)
190
191
192 pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
193 p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
194 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
195 waitForRegistration(t, pluginInfo.SocketPath, dsw)
196
197
198 dswPlugins := dsw.GetPluginsToRegister()
199 if len(dswPlugins) != 1 {
200 t.Fatalf("TestPluginReRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
201 }
202 if !dswPlugins[0].Timestamp.After(lastTimestamp) {
203 t.Fatalf("TestPluginReRegistration: for plugin %s timestamp of plugin is not updated", pluginName)
204 }
205 lastTimestamp = dswPlugins[0].Timestamp
206 }
207 }
208
209 func TestPluginRegistrationAtKubeletStart(t *testing.T) {
210 socketDir := initTempDir(t)
211 defer os.RemoveAll(socketDir)
212
213 plugins := make([]*examplePlugin, 10)
214
215 for i := 0; i < len(plugins); i++ {
216 socketPath := filepath.Join(socketDir, fmt.Sprintf("plugin-%d.sock", i))
217 pluginName := fmt.Sprintf("example-plugin-%d", i)
218
219 p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
220 require.NoError(t, p.Serve("v1beta1", "v1beta2"))
221 defer func(p *examplePlugin) {
222 require.NoError(t, p.Stop())
223 }(p)
224
225 plugins[i] = p
226 }
227
228 dsw := cache.NewDesiredStateOfWorld()
229 newWatcher(t, socketDir, dsw, wait.NeverStop)
230
231 var wg sync.WaitGroup
232 for i := 0; i < len(plugins); i++ {
233 wg.Add(1)
234 go func(p *examplePlugin) {
235 defer wg.Done()
236
237 pluginInfo := GetPluginInfo(p)
238
239 waitForRegistration(t, pluginInfo.SocketPath, dsw)
240 }(plugins[i])
241 }
242
243 c := make(chan struct{})
244 go func() {
245 defer close(c)
246 wg.Wait()
247 }()
248
249 select {
250 case <-c:
251 return
252 case <-time.After(wait.ForeverTestTimeout):
253 t.Fatalf("Timeout while waiting for the plugin registration status")
254 }
255 }
256
257 func newWatcher(t *testing.T, socketDir string, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
258 w := NewWatcher(socketDir, desiredStateOfWorldCache)
259 require.NoError(t, w.Start(stopCh))
260
261 return w
262 }
263
View as plain text