1
16
17 package testing
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "net"
24 "path/filepath"
25 "strings"
26 "sync"
27 "testing"
28 "time"
29
30 authenticationv1 "k8s.io/api/authentication/v1"
31 v1 "k8s.io/api/core/v1"
32 storagev1 "k8s.io/api/storage/v1"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/labels"
35 "k8s.io/apimachinery/pkg/types"
36 "k8s.io/apimachinery/pkg/util/sets"
37 "k8s.io/apimachinery/pkg/util/wait"
38 "k8s.io/client-go/informers"
39 clientset "k8s.io/client-go/kubernetes"
40 storagelistersv1 "k8s.io/client-go/listers/storage/v1"
41 "k8s.io/client-go/tools/cache"
42 "k8s.io/client-go/tools/record"
43 cloudprovider "k8s.io/cloud-provider"
44 csilibplugins "k8s.io/csi-translation-lib/plugins"
45 . "k8s.io/kubernetes/pkg/volume"
46 "k8s.io/kubernetes/pkg/volume/util/hostutil"
47 "k8s.io/kubernetes/pkg/volume/util/subpath"
48 "k8s.io/mount-utils"
49 "k8s.io/utils/exec"
50 testingexec "k8s.io/utils/exec/testing"
51 )
52
53 type FakeVolumeHost interface {
54 VolumeHost
55
56 GetPluginMgr() *VolumePluginMgr
57 }
58
59
60
61 type fakeVolumeHost struct {
62 rootDir string
63 kubeClient clientset.Interface
64 pluginMgr *VolumePluginMgr
65 cloud cloudprovider.Interface
66 mounter mount.Interface
67 hostUtil hostutil.HostUtils
68 exec *testingexec.FakeExec
69 nodeLabels map[string]string
70 nodeName string
71 subpather subpath.Interface
72 node *v1.Node
73 csiDriverLister storagelistersv1.CSIDriverLister
74 volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
75 informerFactory informers.SharedInformerFactory
76 kubeletErr error
77 mux sync.Mutex
78 }
79
80 var _ VolumeHost = &fakeVolumeHost{}
81 var _ FakeVolumeHost = &fakeVolumeHost{}
82
83 func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
84 return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
85 }
86
87 func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
88 return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
89 }
90
91 func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
92 return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
93 }
94
95 func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
96 host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister}
97 host.mounter = mount.NewFakeMounter(nil)
98 host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
99 host.exec = &testingexec.FakeExec{DisableScripts: true}
100 host.pluginMgr = &VolumePluginMgr{}
101 if err := host.pluginMgr.InitPlugins(plugins, nil , host); err != nil {
102 t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
103 }
104 host.subpather = &subpath.FakeSubpath{}
105 host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
106
107 if err := host.WaitForKubeletErrNil(); err != nil {
108 t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
109 }
110 return host
111 }
112
113 func (f *fakeVolumeHost) GetPluginDir(podUID string) string {
114 return filepath.Join(f.rootDir, "plugins", podUID)
115 }
116
117 func (f *fakeVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
118 return filepath.Join(f.rootDir, "plugins", pluginName, "volumeDevices")
119 }
120
121 func (f *fakeVolumeHost) GetPodsDir() string {
122 return filepath.Join(f.rootDir, "pods")
123 }
124
125 func (f *fakeVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
126 return filepath.Join(f.rootDir, "pods", string(podUID), "volumes", pluginName, volumeName)
127 }
128
129 func (f *fakeVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
130 return filepath.Join(f.rootDir, "pods", string(podUID), "volumeDevices", pluginName)
131 }
132
133 func (f *fakeVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
134 return filepath.Join(f.rootDir, "pods", string(podUID), "plugins", pluginName)
135 }
136
137 func (f *fakeVolumeHost) GetKubeClient() clientset.Interface {
138 return f.kubeClient
139 }
140
141 func (f *fakeVolumeHost) GetCloudProvider() cloudprovider.Interface {
142 return f.cloud
143 }
144
145 func (f *fakeVolumeHost) GetMounter(pluginName string) mount.Interface {
146 return f.mounter
147 }
148
149 func (f *fakeVolumeHost) GetSubpather() subpath.Interface {
150 return f.subpather
151 }
152
153 func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
154 return f.pluginMgr
155 }
156
157 func (f *fakeVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
158 return map[v1.UniqueVolumeName]string{}, nil
159 }
160
161 func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
162
163 wrapperVolumeName := "wrapped_" + volName
164 if spec.Volume != nil {
165 spec.Volume.Name = wrapperVolumeName
166 }
167 plug, err := f.pluginMgr.FindPluginBySpec(&spec)
168 if err != nil {
169 return nil, err
170 }
171 return plug.NewMounter(&spec, pod, opts)
172 }
173
174 func (f *fakeVolumeHost) NewWrapperUnmounter(volName string, spec Spec, podUID types.UID) (Unmounter, error) {
175
176 wrapperVolumeName := "wrapped_" + volName
177 if spec.Volume != nil {
178 spec.Volume.Name = wrapperVolumeName
179 }
180 plug, err := f.pluginMgr.FindPluginBySpec(&spec)
181 if err != nil {
182 return nil, err
183 }
184 return plug.NewUnmounter(spec.Name(), podUID)
185 }
186
187
188 func (f *fakeVolumeHost) GetHostName() string {
189 return "fakeHostName"
190 }
191
192
193 func (f *fakeVolumeHost) GetHostIP() (net.IP, error) {
194 return nil, fmt.Errorf("GetHostIP() not implemented")
195 }
196
197 func (f *fakeVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
198 return v1.ResourceList{}, nil
199 }
200
201 func (f *fakeVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
202 return func(namespace, name string) (*v1.Secret, error) {
203 return f.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
204 }
205 }
206
207 func (f *fakeVolumeHost) GetExec(pluginName string) exec.Interface {
208 return f.exec
209 }
210
211 func (f *fakeVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
212 return func(namespace, name string) (*v1.ConfigMap, error) {
213 return f.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
214 }
215 }
216
217 func (f *fakeVolumeHost) GetServiceAccountTokenFunc() func(string, string, *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
218 return func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
219 return f.kubeClient.CoreV1().ServiceAccounts(namespace).CreateToken(context.TODO(), name, tr, metav1.CreateOptions{})
220 }
221 }
222
223 func (f *fakeVolumeHost) DeleteServiceAccountTokenFunc() func(types.UID) {
224 return func(types.UID) {}
225 }
226
227 func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) {
228 if f.nodeLabels == nil {
229 f.nodeLabels = map[string]string{"test-label": "test-value"}
230 }
231 return f.nodeLabels, nil
232 }
233
234 func (f *fakeVolumeHost) GetNodeName() types.NodeName {
235 return types.NodeName(f.nodeName)
236 }
237
238 func (f *fakeVolumeHost) GetEventRecorder() record.EventRecorder {
239 return nil
240 }
241
242 func (f *fakeVolumeHost) ScriptCommands(scripts []CommandScript) {
243 ScriptCommands(f.exec, scripts)
244 }
245
246 func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
247 return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
248 f.mux.Lock()
249 defer f.mux.Unlock()
250 return f.kubeletErr == nil, nil
251 })
252 }
253
254 type fakeAttachDetachVolumeHost struct {
255 fakeVolumeHost
256 }
257
258 var _ AttachDetachVolumeHost = &fakeAttachDetachVolumeHost{}
259 var _ FakeVolumeHost = &fakeAttachDetachVolumeHost{}
260
261 func NewFakeAttachDetachVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
262 return newFakeAttachDetachVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
263 }
264
265 func newFakeAttachDetachVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
266 host := &fakeAttachDetachVolumeHost{}
267 host.rootDir = rootDir
268 host.kubeClient = kubeClient
269 host.cloud = cloud
270 host.nodeName = nodeName
271 host.csiDriverLister = driverLister
272 host.volumeAttachmentLister = volumeAttachLister
273 host.mounter = mount.NewFakeMounter(nil)
274 host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
275 host.exec = &testingexec.FakeExec{DisableScripts: true}
276 host.pluginMgr = &VolumePluginMgr{}
277 if err := host.pluginMgr.InitPlugins(plugins, nil , host); err != nil {
278 t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
279 }
280 host.subpather = &subpath.FakeSubpath{}
281 host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
282
283 if err := host.WaitForKubeletErrNil(); err != nil {
284 t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
285 }
286 return host
287 }
288
289 func (f *fakeAttachDetachVolumeHost) CSINodeLister() storagelistersv1.CSINodeLister {
290 csiNode := &storagev1.CSINode{
291 ObjectMeta: metav1.ObjectMeta{Name: f.nodeName},
292 Spec: storagev1.CSINodeSpec{
293 Drivers: []storagev1.CSINodeDriver{},
294 },
295 }
296 enableMigrationOnNode(csiNode, csilibplugins.GCEPDInTreePluginName)
297 return getFakeCSINodeLister(csiNode)
298 }
299
300 func enableMigrationOnNode(csiNode *storagev1.CSINode, pluginName string) {
301 nodeInfoAnnotations := csiNode.GetAnnotations()
302 if nodeInfoAnnotations == nil {
303 nodeInfoAnnotations = map[string]string{}
304 }
305
306 newAnnotationSet := sets.NewString()
307 newAnnotationSet.Insert(pluginName)
308 nas := strings.Join(newAnnotationSet.List(), ",")
309 nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
310
311 csiNode.Annotations = nodeInfoAnnotations
312 }
313
314 func (f *fakeAttachDetachVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
315 return f.csiDriverLister
316 }
317
318 func (f *fakeAttachDetachVolumeHost) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
319 return f.volumeAttachmentLister
320 }
321
322 func (f *fakeAttachDetachVolumeHost) IsAttachDetachController() bool {
323 return true
324 }
325
326 type fakeKubeletVolumeHost struct {
327 fakeVolumeHost
328 }
329
330 var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
331 var _ FakeVolumeHost = &fakeKubeletVolumeHost{}
332
333 func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeKubeletVolumeHost {
334 return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
335 }
336
337 func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeKubeletVolumeHost {
338 return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
339 }
340
341 func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
342 return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
343 }
344
345 func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeKubeletVolumeHost {
346 return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
347 }
348
349 func newFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
350 host := &fakeKubeletVolumeHost{}
351 host.rootDir = rootDir
352 host.kubeClient = kubeClient
353 host.cloud = cloud
354 host.nodeName = nodeName
355 host.csiDriverLister = driverLister
356 host.volumeAttachmentLister = volumeAttachLister
357 host.mounter = mount.NewFakeMounter(nil)
358 host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
359 host.exec = &testingexec.FakeExec{DisableScripts: true}
360 host.pluginMgr = &VolumePluginMgr{}
361 if err := host.pluginMgr.InitPlugins(plugins, nil , host); err != nil {
362 t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
363 }
364 host.subpather = &subpath.FakeSubpath{}
365 host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
366
367 if err := host.WaitForKubeletErrNil(); err != nil {
368 t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
369 }
370 return host
371 }
372
373 func (f *fakeKubeletVolumeHost) WithNode(node *v1.Node) *fakeKubeletVolumeHost {
374 f.node = node
375 return f
376 }
377
378 type CSINodeLister []storagev1.CSINode
379
380
381 func (n CSINodeLister) Get(name string) (*storagev1.CSINode, error) {
382 for _, cn := range n {
383 if cn.Name == name {
384 return &cn, nil
385 }
386 }
387 return nil, fmt.Errorf("csiNode %q not found", name)
388 }
389
390
391 func (n CSINodeLister) List(selector labels.Selector) (ret []*storagev1.CSINode, err error) {
392 return nil, fmt.Errorf("not implemented")
393 }
394
395 func getFakeCSINodeLister(csiNode *storagev1.CSINode) CSINodeLister {
396 csiNodeLister := CSINodeLister{}
397 if csiNode != nil {
398 csiNodeLister = append(csiNodeLister, *csiNode.DeepCopy())
399 }
400 return csiNodeLister
401 }
402
403 func (f *fakeKubeletVolumeHost) SetKubeletError(err error) {
404 f.mux.Lock()
405 defer f.mux.Unlock()
406 f.kubeletErr = err
407 return
408 }
409
410 func (f *fakeKubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
411 return f.informerFactory
412 }
413
414 func (f *fakeKubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
415 result := map[v1.UniqueVolumeName]string{}
416 if f.node != nil {
417 for _, av := range f.node.Status.VolumesAttached {
418 result[av.Name] = av.DevicePath
419 }
420 }
421
422 return result, nil
423 }
424
425 func (f *fakeKubeletVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
426 return f.csiDriverLister
427 }
428
429 func (f *fakeKubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
430
431 return nil
432 }
433
434 func (f *fakeKubeletVolumeHost) WaitForCacheSync() error {
435 return nil
436 }
437
438 func (f *fakeKubeletVolumeHost) GetHostUtil() hostutil.HostUtils {
439 return f.hostUtil
440 }
441
442 func (f *fakeKubeletVolumeHost) GetTrustAnchorsByName(name string, allowMissing bool) ([]byte, error) {
443 ctb, err := f.kubeClient.CertificatesV1alpha1().ClusterTrustBundles().Get(context.Background(), name, metav1.GetOptions{})
444 if err != nil {
445 return nil, fmt.Errorf("while getting ClusterTrustBundle %s: %w", name, err)
446 }
447
448 return []byte(ctb.Spec.TrustBundle), nil
449 }
450
451
452 func (f *fakeKubeletVolumeHost) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
453 ctbList, err := f.kubeClient.CertificatesV1alpha1().ClusterTrustBundles().List(context.Background(), metav1.ListOptions{})
454 if err != nil {
455 return nil, fmt.Errorf("while listing all ClusterTrustBundles: %w", err)
456 }
457
458 fullSet := bytes.Buffer{}
459 for i, ctb := range ctbList.Items {
460 fullSet.WriteString(ctb.Spec.TrustBundle)
461 if i != len(ctbList.Items)-1 {
462 fullSet.WriteString("\n")
463 }
464 }
465
466 return fullSet.Bytes(), nil
467 }
468
View as plain text