Source file src/k8s.io/kubernetes/pkg/kubelet/volumemanager/volume_manager_test.go

Documentation: k8s.io/kubernetes/pkg/kubelet/volumemanager

/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package volumemanager

import (
	"context"
	"os"
	"reflect"
	"strconv"
	"strings"
	"testing"
	"time"

	"k8s.io/mount-utils"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	kubetypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/kubernetes/fake"
	"k8s.io/client-go/tools/record"
	utiltesting "k8s.io/client-go/util/testing"
	"k8s.io/kubernetes/pkg/kubelet/config"
	containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
	kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
	"k8s.io/kubernetes/pkg/volume"
	volumetest "k8s.io/kubernetes/pkg/volume/testing"
	"k8s.io/kubernetes/pkg/volume/util"
	"k8s.io/kubernetes/pkg/volume/util/hostutil"
	"k8s.io/kubernetes/pkg/volume/util/types"
)

const (
	testHostname = "test-hostname"
)

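// TestGetMountedVolumesForPodAndGetVolumesInUse verifies that a filesystem or
// block PV whose mode matches the pod's request is mounted (or mapped) and
// reported by GetVolumesInUse, while a mode mismatch returns an error and
// leaves nothing mounted for the pod.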
func TestGetMountedVolumesForPodAndGetVolumesInUse(t *testing.T) {
	tests := []struct {
		name            string
		pvMode, podMode v1.PersistentVolumeMode
		expectMount     bool
		expectError     bool
	}{
		{
			name:        "filesystem volume",
			pvMode:      v1.PersistentVolumeFilesystem,
			podMode:     v1.PersistentVolumeFilesystem,
			expectMount: true,
			expectError: false,
		},
		{
			name:        "block volume",
			pvMode:      v1.PersistentVolumeBlock,
			podMode:     v1.PersistentVolumeBlock,
			expectMount: true,
			expectError: false,
		},
		{
			name:        "mismatched volume",
			pvMode:      v1.PersistentVolumeBlock,
			podMode:     v1.PersistentVolumeFilesystem,
			expectMount: false,
			expectError: true,
		},
	}

	for _, test := range tests {
		t.Run(test.name, func(t *testing.T) {
			tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
			if err != nil {
				t.Fatalf("can't make a temp dir: %v", err)
			}
			defer os.RemoveAll(tmpDir)
			podManager := kubepod.NewBasicPodManager()

			node, pod, pv, claim := createObjects(test.pvMode, test.podMode)
			kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

			manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

			stopCh := runVolumeManager(manager)
			defer close(stopCh)

			podManager.SetPods([]*v1.Pod{pod})

			// Fake node status update
			go simulateVolumeInUseUpdate(
				v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
				stopCh,
				manager)

			err = manager.WaitForAttachAndMount(context.Background(), pod)
			if err != nil && !test.expectError {
				t.Errorf("Expected success: %v", err)
			}
			if err == nil && test.expectError {
				t.Errorf("Expected error, got none")
			}

			expectedMounted := pod.Spec.Volumes[0].Name
			actualMounted := manager.GetMountedVolumesForPod(types.UniquePodName(pod.ObjectMeta.UID))
			if test.expectMount {
				if _, ok := actualMounted[expectedMounted]; !ok || (len(actualMounted) != 1) {
					t.Errorf("Expected %v to be mounted to pod but got %v", expectedMounted, actualMounted)
				}
			} else {
				if _, ok := actualMounted[expectedMounted]; ok || (len(actualMounted) != 0) {
					t.Errorf("Expected %v not to be mounted to pod but got %v", expectedMounted, actualMounted)
				}
			}

			expectedInUse := []v1.UniqueVolumeName{}
			if test.expectMount {
				expectedInUse = []v1.UniqueVolumeName{v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name)}
			}
			actualInUse := manager.GetVolumesInUse()
			if !reflect.DeepEqual(expectedInUse, actualInUse) {
				t.Errorf("Expected %v to be in use but got %v", expectedInUse, actualInUse)
			}
		})
	}
}

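// TestWaitForAttachAndMountError verifies that WaitForAttachAndMount fails for
// a pod whose volumes cannot all be attached and mounted, and that the error
// message lists both the still-unattached volumes and the volumes that failed
// to process.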
func TestWaitForAttachAndMountError(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	podManager := kubepod.NewBasicPodManager()

	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "abc",
			Namespace: "nsA",
			UID:       "1234",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name: "container1",
					VolumeMounts: []v1.VolumeMount{
						{
							Name:      volumetest.FailMountDeviceVolumeName,
							MountPath: "/vol1",
						},
						{
							Name:      "vol2",
							MountPath: "/vol2",
						},
						{
							Name:      "vol02",
							MountPath: "/vol02",
						},
						{
							Name:      "vol3",
							MountPath: "/vol3",
						},
						{
							Name:      "vol03",
							MountPath: "/vol03",
						},
					},
				},
			},
			Volumes: []v1.Volume{
				{
					Name: volumetest.FailMountDeviceVolumeName,
					VolumeSource: v1.VolumeSource{
						ConfigMap: &v1.ConfigMapVolumeSource{},
					},
				},
				{
					Name: "vol2",
					VolumeSource: v1.VolumeSource{
						RBD: &v1.RBDVolumeSource{},
					},
				},
				{
					Name: "vol02",
					VolumeSource: v1.VolumeSource{
						RBD: &v1.RBDVolumeSource{},
					},
				},
				{
					Name: "vol3",
					VolumeSource: v1.VolumeSource{
						AzureDisk: &v1.AzureDiskVolumeSource{},
					},
				},
				{
					Name: "vol03",
					VolumeSource: v1.VolumeSource{
						AzureDisk: &v1.AzureDiskVolumeSource{},
					},
				},
			},
		},
	}

	kubeClient := fake.NewSimpleClientset(pod)

	manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, nil)

	stopCh := runVolumeManager(manager)
	defer close(stopCh)

	podManager.SetPods([]*v1.Pod{pod})

	err = manager.WaitForAttachAndMount(context.Background(), pod)
	if err == nil {
		t.Fatalf("Expected error, got none")
	}
	if !strings.Contains(err.Error(),
		"unattached volumes=[vol02 vol2], failed to process volumes=[vol03 vol3]") {
		t.Errorf("Unexpected error info: %v", err)
	}
}

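// TestInitialPendingVolumesForPodAndGetVolumesInUse starts with the PVC in the
// Pending phase and asynchronously flips it to Bound, then polls
// WaitForAttachAndMount until it succeeds, verifying that mounting eventually
// goes through once the claim is bound.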
func TestInitialPendingVolumesForPodAndGetVolumesInUse(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	podManager := kubepod.NewBasicPodManager()

	node, pod, pv, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)
	claim.Status = v1.PersistentVolumeClaimStatus{
		Phase: v1.ClaimPending,
	}

	kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

	manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

	stopCh := runVolumeManager(manager)
	defer close(stopCh)

	podManager.SetPods([]*v1.Pod{pod})

	// Fake node status update
	go simulateVolumeInUseUpdate(
		v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
		stopCh,
		manager)

	// delayed claim binding
	go delayClaimBecomesBound(kubeClient, claim.GetNamespace(), claim.ObjectMeta.Name)

	err = wait.Poll(100*time.Millisecond, 1*time.Second, func() (bool, error) {
		err = manager.WaitForAttachAndMount(context.Background(), pod)
		if err != nil {
			// A few "PVC not bound" errors are expected while the claim is still pending.
			return false, nil
		}
		return true, nil
	})
	if err != nil {
		t.Errorf("Expected a volume to be mounted, got: %s", err)
	}
}

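// TestGetExtraSupplementalGroupsForPod checks how the volume GID annotation on
// a PV is translated into extra supplemental groups for the pod: a new GID is
// returned, while GIDs already present in the pod's SecurityContext, empty
// values, and unparsable values are ignored.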
func TestGetExtraSupplementalGroupsForPod(t *testing.T) {
	tmpDir, err := utiltesting.MkTmpdir("volumeManagerTest")
	if err != nil {
		t.Fatalf("can't make a temp dir: %v", err)
	}
	defer os.RemoveAll(tmpDir)
	podManager := kubepod.NewBasicPodManager()

	node, pod, _, claim := createObjects(v1.PersistentVolumeFilesystem, v1.PersistentVolumeFilesystem)

	existingGid := pod.Spec.SecurityContext.SupplementalGroups[0]

	cases := []struct {
		gidAnnotation string
		expected      []int64
	}{
		{
			gidAnnotation: "777",
			expected:      []int64{777},
		},
		{
			gidAnnotation: strconv.FormatInt(int64(existingGid), 10),
			expected:      []int64{},
		},
		{
			gidAnnotation: "a",
			expected:      []int64{},
		},
		{
			gidAnnotation: "",
			expected:      []int64{},
		},
	}

	for _, tc := range cases {
		fs := v1.PersistentVolumeFilesystem
		pv := &v1.PersistentVolume{
			ObjectMeta: metav1.ObjectMeta{
				Name: "pvA",
				Annotations: map[string]string{
					util.VolumeGidAnnotationKey: tc.gidAnnotation,
				},
			},
			Spec: v1.PersistentVolumeSpec{
				PersistentVolumeSource: v1.PersistentVolumeSource{
					RBD: &v1.RBDPersistentVolumeSource{
						RBDImage: "fake-device",
					},
				},
				ClaimRef: &v1.ObjectReference{
					Name:      claim.ObjectMeta.Name,
					Namespace: claim.ObjectMeta.Namespace,
				},
				VolumeMode: &fs,
			},
		}
		kubeClient := fake.NewSimpleClientset(node, pod, pv, claim)

		manager := newTestVolumeManager(t, tmpDir, podManager, kubeClient, node)

		stopCh := runVolumeManager(manager)
		defer close(stopCh)

		podManager.SetPods([]*v1.Pod{pod})

		// Fake node status update
		go simulateVolumeInUseUpdate(
			v1.UniqueVolumeName(node.Status.VolumesAttached[0].Name),
			stopCh,
			manager)

		err = manager.WaitForAttachAndMount(context.Background(), pod)
		if err != nil {
			t.Errorf("Expected success: %v", err)
			continue
		}

		actual := manager.GetExtraSupplementalGroupsForPod(pod)
		if !reflect.DeepEqual(tc.expected, actual) {
			t.Errorf("Expected supplemental groups %v, got %v", tc.expected, actual)
		}
	}
}

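// fakePodStateProvider is a stub pod state provider passed to NewVolumeManager;
// a pod is considered removable or terminating only if its UID is present in
// the corresponding set.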
type fakePodStateProvider struct {
	shouldRemove map[kubetypes.UID]struct{}
	terminating  map[kubetypes.UID]struct{}
}

func (p *fakePodStateProvider) ShouldPodRuntimeBeRemoved(uid kubetypes.UID) bool {
	_, ok := p.shouldRemove[uid]
	return ok
}

func (p *fakePodStateProvider) ShouldPodContainersBeTerminating(uid kubetypes.UID) bool {
	_, ok := p.terminating[uid]
	return ok
}

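// newTestVolumeManager builds a VolumeManager wired to fake plugins (an
// attachable plugin claiming RBD volumes and a non-attachable plugin claiming
// ConfigMap volumes), a fake mounter, fake host utilities, and a fake
// container runtime.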
func newTestVolumeManager(t *testing.T, tmpDir string, podManager kubepod.Manager, kubeClient clientset.Interface, node *v1.Node) VolumeManager {
	attachablePlug := &volumetest.FakeVolumePlugin{
		PluginName: "fake",
		Host:       nil,
		CanSupportFn: func(spec *volume.Spec) bool {
			return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.RBD != nil) ||
				(spec.Volume != nil && spec.Volume.RBD != nil)
		},
	}
	unattachablePlug := &volumetest.FakeVolumePlugin{
		PluginName:    "unattachable-fake-plugin",
		Host:          nil,
		NonAttachable: true,
		CanSupportFn: func(spec *volume.Spec) bool {
			return spec.Volume != nil && spec.Volume.ConfigMap != nil
		},
	}
	fakeRecorder := &record.FakeRecorder{}
	plugMgr := &volume.VolumePluginMgr{}
	// TODO (#51147) inject mock prober
	fakeVolumeHost := volumetest.NewFakeKubeletVolumeHost(t, tmpDir, kubeClient, nil)
	fakeVolumeHost.WithNode(node)

	plugMgr.InitPlugins([]volume.VolumePlugin{attachablePlug, unattachablePlug}, nil /* prober */, fakeVolumeHost)
	stateProvider := &fakePodStateProvider{}
	fakePathHandler := volumetest.NewBlockVolumePathHandler()
	vm := NewVolumeManager(
		true,
		testHostname,
		podManager,
		stateProvider,
		kubeClient,
		plugMgr,
		&containertest.FakeRuntime{},
		mount.NewFakeMounter(nil),
		hostutil.NewFakeHostUtil(nil),
		"",
		fakeRecorder,
		false, /* keepTerminatedPodVolumes */
		fakePathHandler)

	return vm
}

// createObjects returns objects for making a fake clientset. The pv is
// already attached to the node and bound to the claim used by the pod.
func createObjects(pvMode, podMode v1.PersistentVolumeMode) (*v1.Node, *v1.Pod, *v1.PersistentVolume, *v1.PersistentVolumeClaim) {
	node := &v1.Node{
		ObjectMeta: metav1.ObjectMeta{Name: testHostname},
		Status: v1.NodeStatus{
			VolumesAttached: []v1.AttachedVolume{
				{
					Name:       "fake/fake-device",
					DevicePath: "fake/path",
				},
			}},
	}
	pod := &v1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "abc",
			Namespace: "nsA",
			UID:       "1234",
		},
		Spec: v1.PodSpec{
			Containers: []v1.Container{
				{
					Name: "container1",
				},
			},
			Volumes: []v1.Volume{
				{
					Name: "vol1",
					VolumeSource: v1.VolumeSource{
						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
							ClaimName: "claimA",
						},
					},
				},
			},
			SecurityContext: &v1.PodSecurityContext{
				SupplementalGroups: []int64{555},
			},
		},
	}
	switch podMode {
	case v1.PersistentVolumeBlock:
		pod.Spec.Containers[0].VolumeDevices = []v1.VolumeDevice{
			{
				Name:       "vol1",
				DevicePath: "/dev/vol1",
			},
		}
	case v1.PersistentVolumeFilesystem:
		pod.Spec.Containers[0].VolumeMounts = []v1.VolumeMount{
			{
				Name:      "vol1",
				MountPath: "/mnt/vol1",
			},
		}
	default:
		// The volume is neither mounted nor mapped
	}
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name: "pvA",
		},
		Spec: v1.PersistentVolumeSpec{
			PersistentVolumeSource: v1.PersistentVolumeSource{
				RBD: &v1.RBDPersistentVolumeSource{
					RBDImage: "fake-device",
				},
			},
			ClaimRef: &v1.ObjectReference{
				Namespace: "nsA",
				Name:      "claimA",
			},
			VolumeMode: &pvMode,
		},
	}
	claim := &v1.PersistentVolumeClaim{
		ObjectMeta: metav1.ObjectMeta{
			Name:      "claimA",
			Namespace: "nsA",
		},
		Spec: v1.PersistentVolumeClaimSpec{
			VolumeName: "pvA",
			VolumeMode: &pvMode,
		},
		Status: v1.PersistentVolumeClaimStatus{
			Phase: v1.ClaimBound,
		},
	}
	return node, pod, pv, claim
}

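// simulateVolumeInUseUpdate stands in for the node status updater: every
// 100ms it marks the given volume as reported in use, until stopCh is closed.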
func simulateVolumeInUseUpdate(volumeName v1.UniqueVolumeName, stopCh <-chan struct{}, volumeManager VolumeManager) {
	ticker := time.NewTicker(100 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			volumeManager.MarkVolumesAsReportedInUse(
				[]v1.UniqueVolumeName{volumeName})
		case <-stopCh:
			return
		}
	}
}

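// delayClaimBecomesBound waits 500ms and then updates the named PVC's status
// to Bound, so tests can exercise the transition from a pending claim to a
// bound one.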
func delayClaimBecomesBound(
	kubeClient clientset.Interface,
	namespace, claimName string,
) {
	time.Sleep(500 * time.Millisecond)
	volumeClaim, _ :=
		kubeClient.CoreV1().PersistentVolumeClaims(namespace).Get(context.TODO(), claimName, metav1.GetOptions{})
	volumeClaim.Status = v1.PersistentVolumeClaimStatus{
		Phase: v1.ClaimBound,
	}
	kubeClient.CoreV1().PersistentVolumeClaims(namespace).Update(context.TODO(), volumeClaim, metav1.UpdateOptions{})
}

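// runVolumeManager starts the volume manager with a sources-ready check that
// always reports ready, and returns the stop channel the caller should close
// to shut it down.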
func runVolumeManager(manager VolumeManager) chan struct{} {
	stopCh := make(chan struct{})
	//readyCh := make(chan bool, 1)
	//readyCh <- true
	sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
	go manager.Run(sourcesReady, stopCh)
	return stopCh
}
