...

Source file src/k8s.io/kubernetes/pkg/volume/testing/volume_host.go

Documentation: k8s.io/kubernetes/pkg/volume/testing

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package testing
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"net"
    24  	"path/filepath"
    25  	"strings"
    26  	"sync"
    27  	"testing"
    28  	"time"
    29  
    30  	authenticationv1 "k8s.io/api/authentication/v1"
    31  	v1 "k8s.io/api/core/v1"
    32  	storagev1 "k8s.io/api/storage/v1"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/labels"
    35  	"k8s.io/apimachinery/pkg/types"
    36  	"k8s.io/apimachinery/pkg/util/sets"
    37  	"k8s.io/apimachinery/pkg/util/wait"
    38  	"k8s.io/client-go/informers"
    39  	clientset "k8s.io/client-go/kubernetes"
    40  	storagelistersv1 "k8s.io/client-go/listers/storage/v1"
    41  	"k8s.io/client-go/tools/cache"
    42  	"k8s.io/client-go/tools/record"
    43  	cloudprovider "k8s.io/cloud-provider"
    44  	csilibplugins "k8s.io/csi-translation-lib/plugins"
    45  	. "k8s.io/kubernetes/pkg/volume"
    46  	"k8s.io/kubernetes/pkg/volume/util/hostutil"
    47  	"k8s.io/kubernetes/pkg/volume/util/subpath"
    48  	"k8s.io/mount-utils"
    49  	"k8s.io/utils/exec"
    50  	testingexec "k8s.io/utils/exec/testing"
    51  )
    52  
    53  type FakeVolumeHost interface {
    54  	VolumeHost
    55  
    56  	GetPluginMgr() *VolumePluginMgr
    57  }
    58  
    59  // fakeVolumeHost is useful for testing volume plugins.
    60  // TODO: Extract fields specific to fakeKubeletVolumeHost and fakeAttachDetachVolumeHost.
    61  type fakeVolumeHost struct {
    62  	rootDir                string
    63  	kubeClient             clientset.Interface
    64  	pluginMgr              *VolumePluginMgr
    65  	cloud                  cloudprovider.Interface
    66  	mounter                mount.Interface
    67  	hostUtil               hostutil.HostUtils
    68  	exec                   *testingexec.FakeExec
    69  	nodeLabels             map[string]string
    70  	nodeName               string
    71  	subpather              subpath.Interface
    72  	node                   *v1.Node
    73  	csiDriverLister        storagelistersv1.CSIDriverLister
    74  	volumeAttachmentLister storagelistersv1.VolumeAttachmentLister
    75  	informerFactory        informers.SharedInformerFactory
    76  	kubeletErr             error
    77  	mux                    sync.Mutex
    78  }
    79  
    80  var _ VolumeHost = &fakeVolumeHost{}
    81  var _ FakeVolumeHost = &fakeVolumeHost{}
    82  
    83  func NewFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) FakeVolumeHost {
    84  	return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
    85  }
    86  
    87  func NewFakeVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) FakeVolumeHost {
    88  	return newFakeVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
    89  }
    90  
    91  func NewFakeVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
    92  	return newFakeVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
    93  }
    94  
    95  func newFakeVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
    96  	host := &fakeVolumeHost{rootDir: rootDir, kubeClient: kubeClient, cloud: cloud, nodeName: nodeName, csiDriverLister: driverLister, volumeAttachmentLister: volumeAttachLister}
    97  	host.mounter = mount.NewFakeMounter(nil)
    98  	host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
    99  	host.exec = &testingexec.FakeExec{DisableScripts: true}
   100  	host.pluginMgr = &VolumePluginMgr{}
   101  	if err := host.pluginMgr.InitPlugins(plugins, nil /* prober */, host); err != nil {
   102  		t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
   103  	}
   104  	host.subpather = &subpath.FakeSubpath{}
   105  	host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
   106  	// Wait until the InitPlugins setup is finished before returning from this setup func
   107  	if err := host.WaitForKubeletErrNil(); err != nil {
   108  		t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
   109  	}
   110  	return host
   111  }
   112  
   113  func (f *fakeVolumeHost) GetPluginDir(podUID string) string {
   114  	return filepath.Join(f.rootDir, "plugins", podUID)
   115  }
   116  
   117  func (f *fakeVolumeHost) GetVolumeDevicePluginDir(pluginName string) string {
   118  	return filepath.Join(f.rootDir, "plugins", pluginName, "volumeDevices")
   119  }
   120  
   121  func (f *fakeVolumeHost) GetPodsDir() string {
   122  	return filepath.Join(f.rootDir, "pods")
   123  }
   124  
   125  func (f *fakeVolumeHost) GetPodVolumeDir(podUID types.UID, pluginName, volumeName string) string {
   126  	return filepath.Join(f.rootDir, "pods", string(podUID), "volumes", pluginName, volumeName)
   127  }
   128  
   129  func (f *fakeVolumeHost) GetPodVolumeDeviceDir(podUID types.UID, pluginName string) string {
   130  	return filepath.Join(f.rootDir, "pods", string(podUID), "volumeDevices", pluginName)
   131  }
   132  
   133  func (f *fakeVolumeHost) GetPodPluginDir(podUID types.UID, pluginName string) string {
   134  	return filepath.Join(f.rootDir, "pods", string(podUID), "plugins", pluginName)
   135  }
   136  
   137  func (f *fakeVolumeHost) GetKubeClient() clientset.Interface {
   138  	return f.kubeClient
   139  }
   140  
   141  func (f *fakeVolumeHost) GetCloudProvider() cloudprovider.Interface {
   142  	return f.cloud
   143  }
   144  
   145  func (f *fakeVolumeHost) GetMounter(pluginName string) mount.Interface {
   146  	return f.mounter
   147  }
   148  
   149  func (f *fakeVolumeHost) GetSubpather() subpath.Interface {
   150  	return f.subpather
   151  }
   152  
   153  func (f *fakeVolumeHost) GetPluginMgr() *VolumePluginMgr {
   154  	return f.pluginMgr
   155  }
   156  
   157  func (f *fakeVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
   158  	return map[v1.UniqueVolumeName]string{}, nil
   159  }
   160  
   161  func (f *fakeVolumeHost) NewWrapperMounter(volName string, spec Spec, pod *v1.Pod, opts VolumeOptions) (Mounter, error) {
   162  	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
   163  	wrapperVolumeName := "wrapped_" + volName
   164  	if spec.Volume != nil {
   165  		spec.Volume.Name = wrapperVolumeName
   166  	}
   167  	plug, err := f.pluginMgr.FindPluginBySpec(&spec)
   168  	if err != nil {
   169  		return nil, err
   170  	}
   171  	return plug.NewMounter(&spec, pod, opts)
   172  }
   173  
   174  func (f *fakeVolumeHost) NewWrapperUnmounter(volName string, spec Spec, podUID types.UID) (Unmounter, error) {
   175  	// The name of wrapper volume is set to "wrapped_{wrapped_volume_name}"
   176  	wrapperVolumeName := "wrapped_" + volName
   177  	if spec.Volume != nil {
   178  		spec.Volume.Name = wrapperVolumeName
   179  	}
   180  	plug, err := f.pluginMgr.FindPluginBySpec(&spec)
   181  	if err != nil {
   182  		return nil, err
   183  	}
   184  	return plug.NewUnmounter(spec.Name(), podUID)
   185  }
   186  
   187  // Returns the hostname of the host kubelet is running on
   188  func (f *fakeVolumeHost) GetHostName() string {
   189  	return "fakeHostName"
   190  }
   191  
   192  // Returns host IP or nil in the case of error.
   193  func (f *fakeVolumeHost) GetHostIP() (net.IP, error) {
   194  	return nil, fmt.Errorf("GetHostIP() not implemented")
   195  }
   196  
   197  func (f *fakeVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
   198  	return v1.ResourceList{}, nil
   199  }
   200  
   201  func (f *fakeVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
   202  	return func(namespace, name string) (*v1.Secret, error) {
   203  		return f.kubeClient.CoreV1().Secrets(namespace).Get(context.TODO(), name, metav1.GetOptions{})
   204  	}
   205  }
   206  
   207  func (f *fakeVolumeHost) GetExec(pluginName string) exec.Interface {
   208  	return f.exec
   209  }
   210  
   211  func (f *fakeVolumeHost) GetConfigMapFunc() func(namespace, name string) (*v1.ConfigMap, error) {
   212  	return func(namespace, name string) (*v1.ConfigMap, error) {
   213  		return f.kubeClient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
   214  	}
   215  }
   216  
   217  func (f *fakeVolumeHost) GetServiceAccountTokenFunc() func(string, string, *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
   218  	return func(namespace, name string, tr *authenticationv1.TokenRequest) (*authenticationv1.TokenRequest, error) {
   219  		return f.kubeClient.CoreV1().ServiceAccounts(namespace).CreateToken(context.TODO(), name, tr, metav1.CreateOptions{})
   220  	}
   221  }
   222  
   223  func (f *fakeVolumeHost) DeleteServiceAccountTokenFunc() func(types.UID) {
   224  	return func(types.UID) {}
   225  }
   226  
   227  func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) {
   228  	if f.nodeLabels == nil {
   229  		f.nodeLabels = map[string]string{"test-label": "test-value"}
   230  	}
   231  	return f.nodeLabels, nil
   232  }
   233  
   234  func (f *fakeVolumeHost) GetNodeName() types.NodeName {
   235  	return types.NodeName(f.nodeName)
   236  }
   237  
   238  func (f *fakeVolumeHost) GetEventRecorder() record.EventRecorder {
   239  	return nil
   240  }
   241  
   242  func (f *fakeVolumeHost) ScriptCommands(scripts []CommandScript) {
   243  	ScriptCommands(f.exec, scripts)
   244  }
   245  
   246  func (f *fakeVolumeHost) WaitForKubeletErrNil() error {
   247  	return wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) {
   248  		f.mux.Lock()
   249  		defer f.mux.Unlock()
   250  		return f.kubeletErr == nil, nil
   251  	})
   252  }
   253  
   254  type fakeAttachDetachVolumeHost struct {
   255  	fakeVolumeHost
   256  }
   257  
   258  var _ AttachDetachVolumeHost = &fakeAttachDetachVolumeHost{}
   259  var _ FakeVolumeHost = &fakeAttachDetachVolumeHost{}
   260  
   261  func NewFakeAttachDetachVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
   262  	return newFakeAttachDetachVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
   263  }
   264  
   265  func newFakeAttachDetachVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) FakeVolumeHost {
   266  	host := &fakeAttachDetachVolumeHost{}
   267  	host.rootDir = rootDir
   268  	host.kubeClient = kubeClient
   269  	host.cloud = cloud
   270  	host.nodeName = nodeName
   271  	host.csiDriverLister = driverLister
   272  	host.volumeAttachmentLister = volumeAttachLister
   273  	host.mounter = mount.NewFakeMounter(nil)
   274  	host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
   275  	host.exec = &testingexec.FakeExec{DisableScripts: true}
   276  	host.pluginMgr = &VolumePluginMgr{}
   277  	if err := host.pluginMgr.InitPlugins(plugins, nil /* prober */, host); err != nil {
   278  		t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
   279  	}
   280  	host.subpather = &subpath.FakeSubpath{}
   281  	host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
   282  	// Wait until the InitPlugins setup is finished before returning from this setup func
   283  	if err := host.WaitForKubeletErrNil(); err != nil {
   284  		t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
   285  	}
   286  	return host
   287  }
   288  
   289  func (f *fakeAttachDetachVolumeHost) CSINodeLister() storagelistersv1.CSINodeLister {
   290  	csiNode := &storagev1.CSINode{
   291  		ObjectMeta: metav1.ObjectMeta{Name: f.nodeName},
   292  		Spec: storagev1.CSINodeSpec{
   293  			Drivers: []storagev1.CSINodeDriver{},
   294  		},
   295  	}
   296  	enableMigrationOnNode(csiNode, csilibplugins.GCEPDInTreePluginName)
   297  	return getFakeCSINodeLister(csiNode)
   298  }
   299  
   300  func enableMigrationOnNode(csiNode *storagev1.CSINode, pluginName string) {
   301  	nodeInfoAnnotations := csiNode.GetAnnotations()
   302  	if nodeInfoAnnotations == nil {
   303  		nodeInfoAnnotations = map[string]string{}
   304  	}
   305  
   306  	newAnnotationSet := sets.NewString()
   307  	newAnnotationSet.Insert(pluginName)
   308  	nas := strings.Join(newAnnotationSet.List(), ",")
   309  	nodeInfoAnnotations[v1.MigratedPluginsAnnotationKey] = nas
   310  
   311  	csiNode.Annotations = nodeInfoAnnotations
   312  }
   313  
   314  func (f *fakeAttachDetachVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
   315  	return f.csiDriverLister
   316  }
   317  
   318  func (f *fakeAttachDetachVolumeHost) VolumeAttachmentLister() storagelistersv1.VolumeAttachmentLister {
   319  	return f.volumeAttachmentLister
   320  }
   321  
   322  func (f *fakeAttachDetachVolumeHost) IsAttachDetachController() bool {
   323  	return true
   324  }
   325  
   326  type fakeKubeletVolumeHost struct {
   327  	fakeVolumeHost
   328  }
   329  
   330  var _ KubeletVolumeHost = &fakeKubeletVolumeHost{}
   331  var _ FakeVolumeHost = &fakeKubeletVolumeHost{}
   332  
   333  func NewFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin) *fakeKubeletVolumeHost {
   334  	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, "", nil, nil)
   335  }
   336  
   337  func NewFakeKubeletVolumeHostWithCloudProvider(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface) *fakeKubeletVolumeHost {
   338  	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, cloud, nil, "", nil, nil)
   339  }
   340  
   341  func NewFakeKubeletVolumeHostWithCSINodeName(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
   342  	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, nil, nodeName, driverLister, volumeAttachLister)
   343  }
   344  
   345  func NewFakeKubeletVolumeHostWithMounterFSType(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, pathToTypeMap map[string]hostutil.FileType) *fakeKubeletVolumeHost {
   346  	return newFakeKubeletVolumeHost(t, rootDir, kubeClient, plugins, nil, pathToTypeMap, "", nil, nil)
   347  }
   348  
   349  func newFakeKubeletVolumeHost(t *testing.T, rootDir string, kubeClient clientset.Interface, plugins []VolumePlugin, cloud cloudprovider.Interface, pathToTypeMap map[string]hostutil.FileType, nodeName string, driverLister storagelistersv1.CSIDriverLister, volumeAttachLister storagelistersv1.VolumeAttachmentLister) *fakeKubeletVolumeHost {
   350  	host := &fakeKubeletVolumeHost{}
   351  	host.rootDir = rootDir
   352  	host.kubeClient = kubeClient
   353  	host.cloud = cloud
   354  	host.nodeName = nodeName
   355  	host.csiDriverLister = driverLister
   356  	host.volumeAttachmentLister = volumeAttachLister
   357  	host.mounter = mount.NewFakeMounter(nil)
   358  	host.hostUtil = hostutil.NewFakeHostUtil(pathToTypeMap)
   359  	host.exec = &testingexec.FakeExec{DisableScripts: true}
   360  	host.pluginMgr = &VolumePluginMgr{}
   361  	if err := host.pluginMgr.InitPlugins(plugins, nil /* prober */, host); err != nil {
   362  		t.Fatalf("Failed to init plugins while creating fake volume host: %v", err)
   363  	}
   364  	host.subpather = &subpath.FakeSubpath{}
   365  	host.informerFactory = informers.NewSharedInformerFactory(kubeClient, time.Minute)
   366  	// Wait until the InitPlugins setup is finished before returning from this setup func
   367  	if err := host.WaitForKubeletErrNil(); err != nil {
   368  		t.Fatalf("Failed to wait for kubelet err to be nil while creating fake volume host: %v", err)
   369  	}
   370  	return host
   371  }
   372  
   373  func (f *fakeKubeletVolumeHost) WithNode(node *v1.Node) *fakeKubeletVolumeHost {
   374  	f.node = node
   375  	return f
   376  }
   377  
   378  type CSINodeLister []storagev1.CSINode
   379  
   380  // Get returns a fake CSINode object.
   381  func (n CSINodeLister) Get(name string) (*storagev1.CSINode, error) {
   382  	for _, cn := range n {
   383  		if cn.Name == name {
   384  			return &cn, nil
   385  		}
   386  	}
   387  	return nil, fmt.Errorf("csiNode %q not found", name)
   388  }
   389  
   390  // List lists all CSINodes in the indexer.
   391  func (n CSINodeLister) List(selector labels.Selector) (ret []*storagev1.CSINode, err error) {
   392  	return nil, fmt.Errorf("not implemented")
   393  }
   394  
   395  func getFakeCSINodeLister(csiNode *storagev1.CSINode) CSINodeLister {
   396  	csiNodeLister := CSINodeLister{}
   397  	if csiNode != nil {
   398  		csiNodeLister = append(csiNodeLister, *csiNode.DeepCopy())
   399  	}
   400  	return csiNodeLister
   401  }
   402  
   403  func (f *fakeKubeletVolumeHost) SetKubeletError(err error) {
   404  	f.mux.Lock()
   405  	defer f.mux.Unlock()
   406  	f.kubeletErr = err
   407  	return
   408  }
   409  
   410  func (f *fakeKubeletVolumeHost) GetInformerFactory() informers.SharedInformerFactory {
   411  	return f.informerFactory
   412  }
   413  
   414  func (f *fakeKubeletVolumeHost) GetAttachedVolumesFromNodeStatus() (map[v1.UniqueVolumeName]string, error) {
   415  	result := map[v1.UniqueVolumeName]string{}
   416  	if f.node != nil {
   417  		for _, av := range f.node.Status.VolumesAttached {
   418  			result[av.Name] = av.DevicePath
   419  		}
   420  	}
   421  
   422  	return result, nil
   423  }
   424  
   425  func (f *fakeKubeletVolumeHost) CSIDriverLister() storagelistersv1.CSIDriverLister {
   426  	return f.csiDriverLister
   427  }
   428  
   429  func (f *fakeKubeletVolumeHost) CSIDriversSynced() cache.InformerSynced {
   430  	// not needed for testing
   431  	return nil
   432  }
   433  
   434  func (f *fakeKubeletVolumeHost) WaitForCacheSync() error {
   435  	return nil
   436  }
   437  
   438  func (f *fakeKubeletVolumeHost) GetHostUtil() hostutil.HostUtils {
   439  	return f.hostUtil
   440  }
   441  
   442  func (f *fakeKubeletVolumeHost) GetTrustAnchorsByName(name string, allowMissing bool) ([]byte, error) {
   443  	ctb, err := f.kubeClient.CertificatesV1alpha1().ClusterTrustBundles().Get(context.Background(), name, metav1.GetOptions{})
   444  	if err != nil {
   445  		return nil, fmt.Errorf("while getting ClusterTrustBundle %s: %w", name, err)
   446  	}
   447  
   448  	return []byte(ctb.Spec.TrustBundle), nil
   449  }
   450  
   451  // Note: we do none of the deduplication and sorting that the real deal should do.
   452  func (f *fakeKubeletVolumeHost) GetTrustAnchorsBySigner(signerName string, labelSelector *metav1.LabelSelector, allowMissing bool) ([]byte, error) {
   453  	ctbList, err := f.kubeClient.CertificatesV1alpha1().ClusterTrustBundles().List(context.Background(), metav1.ListOptions{})
   454  	if err != nil {
   455  		return nil, fmt.Errorf("while listing all ClusterTrustBundles: %w", err)
   456  	}
   457  
   458  	fullSet := bytes.Buffer{}
   459  	for i, ctb := range ctbList.Items {
   460  		fullSet.WriteString(ctb.Spec.TrustBundle)
   461  		if i != len(ctbList.Items)-1 {
   462  			fullSet.WriteString("\n")
   463  		}
   464  	}
   465  
   466  	return fullSet.Bytes(), nil
   467  }
   468  

View as plain text