...

Source file src/k8s.io/kubernetes/test/integration/volume/attach_detach_test.go

Documentation: k8s.io/kubernetes/test/integration/volume

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package volume
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"testing"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/apimachinery/pkg/api/resource"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	clientgoinformers "k8s.io/client-go/informers"
    30  	clientset "k8s.io/client-go/kubernetes"
    31  	restclient "k8s.io/client-go/rest"
    32  	"k8s.io/client-go/tools/cache"
    33  	fakecloud "k8s.io/cloud-provider/fake"
    34  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    35  	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
    36  	volumecache "k8s.io/kubernetes/pkg/controller/volume/attachdetach/cache"
    37  	"k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
    38  	persistentvolumeoptions "k8s.io/kubernetes/pkg/controller/volume/persistentvolume/options"
    39  	"k8s.io/kubernetes/pkg/volume"
    40  	volumetest "k8s.io/kubernetes/pkg/volume/testing"
    41  	"k8s.io/kubernetes/pkg/volume/util"
    42  	"k8s.io/kubernetes/test/integration/framework"
    43  	"k8s.io/kubernetes/test/utils/ktesting"
    44  )
    45  
    46  func fakePodWithVol(namespace string) *v1.Pod {
    47  	fakePod := &v1.Pod{
    48  		ObjectMeta: metav1.ObjectMeta{
    49  			Namespace: namespace,
    50  			Name:      "fakepod",
    51  		},
    52  		Spec: v1.PodSpec{
    53  			Containers: []v1.Container{
    54  				{
    55  					Name:  "fake-container",
    56  					Image: "nginx",
    57  					VolumeMounts: []v1.VolumeMount{
    58  						{
    59  							Name:      "fake-mount",
    60  							MountPath: "/var/www/html",
    61  						},
    62  					},
    63  				},
    64  			},
    65  			Volumes: []v1.Volume{
    66  				{
    67  					Name: "fake-mount",
    68  					VolumeSource: v1.VolumeSource{
    69  						HostPath: &v1.HostPathVolumeSource{
    70  							Path: "/var/www/html",
    71  						},
    72  					},
    73  				},
    74  			},
    75  			NodeName: "node-sandbox",
    76  		},
    77  	}
    78  	return fakePod
    79  }
    80  
    81  func fakePodWithPVC(name, pvcName, namespace string) (*v1.Pod, *v1.PersistentVolumeClaim) {
    82  	fakePod := &v1.Pod{
    83  		ObjectMeta: metav1.ObjectMeta{
    84  			Namespace: namespace,
    85  			Name:      name,
    86  		},
    87  		Spec: v1.PodSpec{
    88  			Containers: []v1.Container{
    89  				{
    90  					Name:  "fake-container",
    91  					Image: "nginx",
    92  					VolumeMounts: []v1.VolumeMount{
    93  						{
    94  							Name:      "fake-mount",
    95  							MountPath: "/var/www/html",
    96  						},
    97  					},
    98  				},
    99  			},
   100  			Volumes: []v1.Volume{
   101  				{
   102  					Name: "fake-mount",
   103  					VolumeSource: v1.VolumeSource{
   104  						PersistentVolumeClaim: &v1.PersistentVolumeClaimVolumeSource{
   105  							ClaimName: pvcName,
   106  						},
   107  					},
   108  				},
   109  			},
   110  			NodeName: "node-sandbox",
   111  		},
   112  	}
   113  	class := "fake-sc"
   114  	fakePVC := &v1.PersistentVolumeClaim{
   115  		ObjectMeta: metav1.ObjectMeta{
   116  			Namespace: namespace,
   117  			Name:      pvcName,
   118  		},
   119  		Spec: v1.PersistentVolumeClaimSpec{
   120  			AccessModes: []v1.PersistentVolumeAccessMode{
   121  				v1.ReadWriteOnce,
   122  			},
   123  			Resources: v1.VolumeResourceRequirements{
   124  				Requests: v1.ResourceList{
   125  					v1.ResourceName(v1.ResourceStorage): resource.MustParse("5Gi"),
   126  				},
   127  			},
   128  			StorageClassName: &class,
   129  		},
   130  	}
   131  	return fakePod, fakePVC
   132  }
   133  
   134  var defaultTimerConfig = attachdetach.TimerConfig{
   135  	ReconcilerLoopPeriod:                              100 * time.Millisecond,
   136  	ReconcilerMaxWaitForUnmountDuration:               6 * time.Second,
   137  	DesiredStateOfWorldPopulatorLoopSleepPeriod:       1 * time.Second,
   138  	DesiredStateOfWorldPopulatorListPodsRetryDuration: 3 * time.Second,
   139  }
   140  
   141  // Via integration test we can verify that if pod delete
   142  // event is somehow missed by AttachDetach controller - it still
   143  // gets cleaned up by Desired State of World populator.
   144  func TestPodDeletionWithDswp(t *testing.T) {
   145  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   146  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   147  	defer server.TearDownFn()
   148  
   149  	namespaceName := "test-pod-deletion"
   150  	node := &v1.Node{
   151  		ObjectMeta: metav1.ObjectMeta{
   152  			Name: "node-sandbox",
   153  			Annotations: map[string]string{
   154  				util.ControllerManagedAttachAnnotation: "true",
   155  			},
   156  		},
   157  	}
   158  
   159  	tCtx := ktesting.Init(t)
   160  	defer tCtx.Cancel("test has completed")
   161  	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
   162  
   163  	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
   164  	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
   165  
   166  	pod := fakePodWithVol(namespaceName)
   167  
   168  	if _, err := testClient.CoreV1().Nodes().Create(tCtx, node, metav1.CreateOptions{}); err != nil {
   169  		t.Fatalf("Failed to created node : %v", err)
   170  	}
   171  
   172  	// start controller loop
   173  	go informers.Core().V1().Nodes().Informer().Run(tCtx.Done())
   174  	if _, err := testClient.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err != nil {
   175  		t.Errorf("Failed to create pod : %v", err)
   176  	}
   177  
   178  	podInformer := informers.Core().V1().Pods().Informer()
   179  	go podInformer.Run(tCtx.Done())
   180  
   181  	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
   182  	go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
   183  	go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
   184  	initCSIObjects(tCtx.Done(), informers)
   185  	go ctrl.Run(tCtx)
   186  	// Run pvCtrl to avoid leaking goroutines started during its creation.
   187  	go pvCtrl.Run(tCtx)
   188  
   189  	waitToObservePods(t, podInformer, 1)
   190  	podKey, err := cache.MetaNamespaceKeyFunc(pod)
   191  	if err != nil {
   192  		t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
   193  	}
   194  
   195  	podInformerObj, _, err := podInformer.GetStore().GetByKey(podKey)
   196  
   197  	if err != nil {
   198  		t.Fatalf("Pod not found in Pod Informer cache : %v", err)
   199  	}
   200  
   201  	waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
   202  	// let's stop pod events from getting triggered
   203  	err = podInformer.GetStore().Delete(podInformerObj)
   204  	if err != nil {
   205  		t.Fatalf("Error deleting pod : %v", err)
   206  	}
   207  
   208  	waitToObservePods(t, podInformer, 0)
   209  	// the populator loop turns every 1 minute
   210  	waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 80*time.Second, "expected 0 pods in dsw after pod delete", 0)
   211  }
   212  
   213  func initCSIObjects(stopCh <-chan struct{}, informers clientgoinformers.SharedInformerFactory) {
   214  	go informers.Storage().V1().CSINodes().Informer().Run(stopCh)
   215  	go informers.Storage().V1().CSIDrivers().Informer().Run(stopCh)
   216  }
   217  
   218  func TestPodUpdateWithWithADC(t *testing.T) {
   219  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   220  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   221  	defer server.TearDownFn()
   222  	namespaceName := "test-pod-update"
   223  
   224  	node := &v1.Node{
   225  		ObjectMeta: metav1.ObjectMeta{
   226  			Name: "node-sandbox",
   227  			Annotations: map[string]string{
   228  				util.ControllerManagedAttachAnnotation: "true",
   229  			},
   230  		},
   231  	}
   232  
   233  	tCtx := ktesting.Init(t)
   234  	defer tCtx.Cancel("test has completed")
   235  	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
   236  
   237  	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
   238  	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
   239  
   240  	pod := fakePodWithVol(namespaceName)
   241  	podStopCh := make(chan struct{})
   242  	defer close(podStopCh)
   243  
   244  	if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
   245  		t.Fatalf("Failed to created node : %v", err)
   246  	}
   247  
   248  	go informers.Core().V1().Nodes().Informer().Run(podStopCh)
   249  
   250  	if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
   251  		t.Errorf("Failed to create pod : %v", err)
   252  	}
   253  
   254  	podInformer := informers.Core().V1().Pods().Informer()
   255  	go podInformer.Run(podStopCh)
   256  
   257  	// start controller loop
   258  	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
   259  	go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
   260  	go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
   261  	initCSIObjects(tCtx.Done(), informers)
   262  	go ctrl.Run(tCtx)
   263  	// Run pvCtrl to avoid leaking goroutines started during its creation.
   264  	go pvCtrl.Run(tCtx)
   265  
   266  	waitToObservePods(t, podInformer, 1)
   267  	podKey, err := cache.MetaNamespaceKeyFunc(pod)
   268  	if err != nil {
   269  		t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
   270  	}
   271  
   272  	_, _, err = podInformer.GetStore().GetByKey(podKey)
   273  
   274  	if err != nil {
   275  		t.Fatalf("Pod not found in Pod Informer cache : %v", err)
   276  	}
   277  
   278  	waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
   279  
   280  	pod.Status.Phase = v1.PodSucceeded
   281  
   282  	if _, err := testClient.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
   283  		t.Errorf("Failed to update pod : %v", err)
   284  	}
   285  
   286  	waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected 0 pods in dsw after pod completion", 0)
   287  }
   288  
   289  func TestPodUpdateWithKeepTerminatedPodVolumes(t *testing.T) {
   290  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   291  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   292  	defer server.TearDownFn()
   293  	namespaceName := "test-pod-update"
   294  
   295  	node := &v1.Node{
   296  		ObjectMeta: metav1.ObjectMeta{
   297  			Name: "node-sandbox",
   298  			Annotations: map[string]string{
   299  				util.ControllerManagedAttachAnnotation:  "true",
   300  				util.KeepTerminatedPodVolumesAnnotation: "true",
   301  			},
   302  		},
   303  	}
   304  
   305  	tCtx := ktesting.Init(t)
   306  	defer tCtx.Cancel("test has completed")
   307  	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
   308  
   309  	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
   310  	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
   311  
   312  	pod := fakePodWithVol(namespaceName)
   313  	podStopCh := make(chan struct{})
   314  	defer close(podStopCh)
   315  
   316  	if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
   317  		t.Fatalf("Failed to created node : %v", err)
   318  	}
   319  
   320  	go informers.Core().V1().Nodes().Informer().Run(podStopCh)
   321  
   322  	if _, err := testClient.CoreV1().Pods(ns.Name).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
   323  		t.Errorf("Failed to create pod : %v", err)
   324  	}
   325  
   326  	podInformer := informers.Core().V1().Pods().Informer()
   327  	go podInformer.Run(podStopCh)
   328  
   329  	// start controller loop
   330  	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
   331  	go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
   332  	go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
   333  	initCSIObjects(tCtx.Done(), informers)
   334  	go ctrl.Run(tCtx)
   335  	// Run pvCtrl to avoid leaking goroutines started during its creation.
   336  	go pvCtrl.Run(tCtx)
   337  
   338  	waitToObservePods(t, podInformer, 1)
   339  	podKey, err := cache.MetaNamespaceKeyFunc(pod)
   340  	if err != nil {
   341  		t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
   342  	}
   343  
   344  	_, _, err = podInformer.GetStore().GetByKey(podKey)
   345  
   346  	if err != nil {
   347  		t.Fatalf("Pod not found in Pod Informer cache : %v", err)
   348  	}
   349  
   350  	waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
   351  
   352  	pod.Status.Phase = v1.PodSucceeded
   353  
   354  	if _, err := testClient.CoreV1().Pods(ns.Name).UpdateStatus(context.TODO(), pod, metav1.UpdateOptions{}); err != nil {
   355  		t.Errorf("Failed to update pod : %v", err)
   356  	}
   357  
   358  	waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 20*time.Second, "expected non-zero pods in dsw if KeepTerminatedPodVolumesAnnotation is set", 1)
   359  }
   360  
   361  // wait for the podInformer to observe the pods. Call this function before
   362  // running the RC manager to prevent the rc manager from creating new pods
   363  // rather than adopting the existing ones.
   364  func waitToObservePods(t *testing.T, podInformer cache.SharedIndexInformer, podNum int) {
   365  	if err := wait.Poll(100*time.Millisecond, 60*time.Second, func() (bool, error) {
   366  		objects := podInformer.GetIndexer().List()
   367  		if len(objects) == podNum {
   368  			return true, nil
   369  		}
   370  		return false, nil
   371  	}); err != nil {
   372  		t.Fatal(err)
   373  	}
   374  }
   375  
   376  // wait for pods to be observed in desired state of world
   377  func waitForPodsInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld) {
   378  	if err := wait.Poll(time.Millisecond*500, wait.ForeverTestTimeout, func() (bool, error) {
   379  		pods := dswp.GetPodToAdd()
   380  		if len(pods) > 0 {
   381  			return true, nil
   382  		}
   383  		return false, nil
   384  	}); err != nil {
   385  		t.Fatalf("Pod not added to desired state of world : %v", err)
   386  	}
   387  }
   388  
   389  // wait for pods to be observed in desired state of world
   390  func waitForPodFuncInDSWP(t *testing.T, dswp volumecache.DesiredStateOfWorld, checkTimeout time.Duration, failMessage string, podCount int) {
   391  	if err := wait.Poll(time.Millisecond*500, checkTimeout, func() (bool, error) {
   392  		pods := dswp.GetPodToAdd()
   393  		if len(pods) == podCount {
   394  			return true, nil
   395  		}
   396  		return false, nil
   397  	}); err != nil {
   398  		t.Fatalf("%s but got error %v", failMessage, err)
   399  	}
   400  }
   401  
   402  func createAdClients(ctx context.Context, t *testing.T, server *kubeapiservertesting.TestServer, syncPeriod time.Duration, timers attachdetach.TimerConfig) (*clientset.Clientset, attachdetach.AttachDetachController, *persistentvolume.PersistentVolumeController, clientgoinformers.SharedInformerFactory) {
   403  	config := restclient.CopyConfig(server.ClientConfig)
   404  	config.QPS = 1000000
   405  	config.Burst = 1000000
   406  	resyncPeriod := 12 * time.Hour
   407  	testClient := clientset.NewForConfigOrDie(server.ClientConfig)
   408  
   409  	host := volumetest.NewFakeVolumeHost(t, "/tmp/fake", nil, nil)
   410  	plugin := &volumetest.FakeVolumePlugin{
   411  		PluginName:             provisionerPluginName,
   412  		Host:                   host,
   413  		Config:                 volume.VolumeConfig{},
   414  		LastProvisionerOptions: volume.VolumeOptions{},
   415  		NewAttacherCallCount:   0,
   416  		NewDetacherCallCount:   0,
   417  		Mounters:               nil,
   418  		Unmounters:             nil,
   419  		Attachers:              nil,
   420  		Detachers:              nil,
   421  	}
   422  	plugins := []volume.VolumePlugin{plugin}
   423  	cloud := &fakecloud.Cloud{}
   424  	informers := clientgoinformers.NewSharedInformerFactory(testClient, resyncPeriod)
   425  	ctrl, err := attachdetach.NewAttachDetachController(
   426  		ctx,
   427  		testClient,
   428  		informers.Core().V1().Pods(),
   429  		informers.Core().V1().Nodes(),
   430  		informers.Core().V1().PersistentVolumeClaims(),
   431  		informers.Core().V1().PersistentVolumes(),
   432  		informers.Storage().V1().CSINodes(),
   433  		informers.Storage().V1().CSIDrivers(),
   434  		informers.Storage().V1().VolumeAttachments(),
   435  		cloud,
   436  		plugins,
   437  		nil, /* prober */
   438  		false,
   439  		5*time.Second,
   440  		false,
   441  		timers,
   442  	)
   443  
   444  	if err != nil {
   445  		t.Fatalf("Error creating AttachDetach : %v", err)
   446  	}
   447  
   448  	// create pv controller
   449  	controllerOptions := persistentvolumeoptions.NewPersistentVolumeControllerOptions()
   450  	params := persistentvolume.ControllerParameters{
   451  		KubeClient:                testClient,
   452  		SyncPeriod:                controllerOptions.PVClaimBinderSyncPeriod,
   453  		VolumePlugins:             plugins,
   454  		Cloud:                     nil,
   455  		ClusterName:               "volume-test-cluster",
   456  		VolumeInformer:            informers.Core().V1().PersistentVolumes(),
   457  		ClaimInformer:             informers.Core().V1().PersistentVolumeClaims(),
   458  		ClassInformer:             informers.Storage().V1().StorageClasses(),
   459  		PodInformer:               informers.Core().V1().Pods(),
   460  		NodeInformer:              informers.Core().V1().Nodes(),
   461  		EnableDynamicProvisioning: false,
   462  	}
   463  	pvCtrl, err := persistentvolume.NewController(ctx, params)
   464  	if err != nil {
   465  		t.Fatalf("Failed to create PV controller: %v", err)
   466  	}
   467  	return testClient, ctrl, pvCtrl, informers
   468  }
   469  
   470  // Via integration test we can verify that if pod add
   471  // event is somehow missed by AttachDetach controller - it still
   472  // gets added by Desired State of World populator.
   473  func TestPodAddedByDswp(t *testing.T) {
   474  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   475  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   476  	defer server.TearDownFn()
   477  	namespaceName := "test-pod-deletion"
   478  
   479  	node := &v1.Node{
   480  		ObjectMeta: metav1.ObjectMeta{
   481  			Name: "node-sandbox",
   482  			Annotations: map[string]string{
   483  				util.ControllerManagedAttachAnnotation: "true",
   484  			},
   485  		},
   486  	}
   487  
   488  	tCtx := ktesting.Init(t)
   489  	defer tCtx.Cancel("test has completed")
   490  	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, defaultTimerConfig)
   491  
   492  	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
   493  	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
   494  
   495  	pod := fakePodWithVol(namespaceName)
   496  	podStopCh := make(chan struct{})
   497  
   498  	if _, err := testClient.CoreV1().Nodes().Create(tCtx, node, metav1.CreateOptions{}); err != nil {
   499  		t.Fatalf("Failed to created node : %v", err)
   500  	}
   501  
   502  	go informers.Core().V1().Nodes().Informer().Run(podStopCh)
   503  
   504  	if _, err := testClient.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{}); err != nil {
   505  		t.Errorf("Failed to create pod : %v", err)
   506  	}
   507  
   508  	podInformer := informers.Core().V1().Pods().Informer()
   509  	go podInformer.Run(podStopCh)
   510  
   511  	// start controller loop
   512  	go informers.Core().V1().PersistentVolumeClaims().Informer().Run(tCtx.Done())
   513  	go informers.Core().V1().PersistentVolumes().Informer().Run(tCtx.Done())
   514  	go informers.Storage().V1().VolumeAttachments().Informer().Run(tCtx.Done())
   515  	initCSIObjects(tCtx.Done(), informers)
   516  	go ctrl.Run(tCtx)
   517  	// Run pvCtrl to avoid leaking goroutines started during its creation.
   518  	go pvCtrl.Run(tCtx)
   519  
   520  	waitToObservePods(t, podInformer, 1)
   521  	podKey, err := cache.MetaNamespaceKeyFunc(pod)
   522  	if err != nil {
   523  		t.Fatalf("MetaNamespaceKeyFunc failed with : %v", err)
   524  	}
   525  
   526  	_, _, err = podInformer.GetStore().GetByKey(podKey)
   527  
   528  	if err != nil {
   529  		t.Fatalf("Pod not found in Pod Informer cache : %v", err)
   530  	}
   531  
   532  	waitForPodsInDSWP(t, ctrl.GetDesiredStateOfWorld())
   533  
   534  	// let's stop pod events from getting triggered
   535  	close(podStopCh)
   536  	podNew := pod.DeepCopy()
   537  	newPodName := "newFakepod"
   538  	podNew.SetName(newPodName)
   539  	err = podInformer.GetStore().Add(podNew)
   540  	if err != nil {
   541  		t.Fatalf("Error adding pod : %v", err)
   542  	}
   543  
   544  	waitToObservePods(t, podInformer, 2)
   545  
   546  	// the findAndAddActivePods loop turns every 3 minute
   547  	waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 200*time.Second, "expected 2 pods in dsw after pod addition", 2)
   548  }
   549  
   550  func TestPVCBoundWithADC(t *testing.T) {
   551  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   552  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   553  	defer server.TearDownFn()
   554  
   555  	tCtx := ktesting.Init(t)
   556  	defer tCtx.Cancel("test has completed")
   557  
   558  	namespaceName := "test-pod-deletion"
   559  
   560  	testClient, ctrl, pvCtrl, informers := createAdClients(tCtx, t, server, defaultSyncPeriod, attachdetach.TimerConfig{
   561  		ReconcilerLoopPeriod:                        100 * time.Millisecond,
   562  		ReconcilerMaxWaitForUnmountDuration:         6 * time.Second,
   563  		DesiredStateOfWorldPopulatorLoopSleepPeriod: 24 * time.Hour,
   564  		// Use high duration to disable DesiredStateOfWorldPopulator.findAndAddActivePods loop in test.
   565  		DesiredStateOfWorldPopulatorListPodsRetryDuration: 24 * time.Hour,
   566  	})
   567  
   568  	ns := framework.CreateNamespaceOrDie(testClient, namespaceName, t)
   569  	defer framework.DeleteNamespaceOrDie(testClient, ns, t)
   570  
   571  	node := &v1.Node{
   572  		ObjectMeta: metav1.ObjectMeta{
   573  			Name: "node-sandbox",
   574  			Annotations: map[string]string{
   575  				util.ControllerManagedAttachAnnotation: "true",
   576  			},
   577  		},
   578  	}
   579  	if _, err := testClient.CoreV1().Nodes().Create(context.TODO(), node, metav1.CreateOptions{}); err != nil {
   580  		t.Fatalf("Failed to created node : %v", err)
   581  	}
   582  
   583  	// pods with pvc not bound
   584  	pvcs := []*v1.PersistentVolumeClaim{}
   585  	for i := 0; i < 3; i++ {
   586  		pod, pvc := fakePodWithPVC(fmt.Sprintf("fakepod-pvcnotbound-%d", i), fmt.Sprintf("fakepvc-%d", i), namespaceName)
   587  		if _, err := testClient.CoreV1().Pods(pod.Namespace).Create(context.TODO(), pod, metav1.CreateOptions{}); err != nil {
   588  			t.Errorf("Failed to create pod : %v", err)
   589  		}
   590  		if _, err := testClient.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(context.TODO(), pvc, metav1.CreateOptions{}); err != nil {
   591  			t.Errorf("Failed to create pvc : %v", err)
   592  		}
   593  		pvcs = append(pvcs, pvc)
   594  	}
   595  	// pod with no pvc
   596  	podNew := fakePodWithVol(namespaceName)
   597  	podNew.SetName("fakepod")
   598  	if _, err := testClient.CoreV1().Pods(podNew.Namespace).Create(context.TODO(), podNew, metav1.CreateOptions{}); err != nil {
   599  		t.Errorf("Failed to create pod : %v", err)
   600  	}
   601  
   602  	// start controller loop
   603  	informers.Start(tCtx.Done())
   604  	informers.WaitForCacheSync(tCtx.Done())
   605  	initCSIObjects(tCtx.Done(), informers)
   606  	go ctrl.Run(tCtx)
   607  	go pvCtrl.Run(tCtx)
   608  
   609  	waitToObservePods(t, informers.Core().V1().Pods().Informer(), 4)
   610  	// Give attachdetach controller enough time to populate pods into DSWP.
   611  	time.Sleep(10 * time.Second)
   612  	waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 1 pod in dsw", 1)
   613  	for _, pvc := range pvcs {
   614  		createPVForPVC(t, testClient, pvc)
   615  	}
   616  	waitForPodFuncInDSWP(t, ctrl.GetDesiredStateOfWorld(), 60*time.Second, "expected 4 pods in dsw after PVCs are bound", 4)
   617  }
   618  
   619  // Create PV for PVC, pv controller will bind them together.
   620  func createPVForPVC(t *testing.T, testClient *clientset.Clientset, pvc *v1.PersistentVolumeClaim) {
   621  	pv := &v1.PersistentVolume{
   622  		ObjectMeta: metav1.ObjectMeta{
   623  			Name: fmt.Sprintf("fakepv-%s", pvc.Name),
   624  		},
   625  		Spec: v1.PersistentVolumeSpec{
   626  			Capacity:    pvc.Spec.Resources.Requests,
   627  			AccessModes: pvc.Spec.AccessModes,
   628  			PersistentVolumeSource: v1.PersistentVolumeSource{
   629  				HostPath: &v1.HostPathVolumeSource{
   630  					Path: "/var/www/html",
   631  				},
   632  			},
   633  			ClaimRef:         &v1.ObjectReference{Name: pvc.Name, Namespace: pvc.Namespace},
   634  			StorageClassName: *pvc.Spec.StorageClassName,
   635  		},
   636  	}
   637  	if _, err := testClient.CoreV1().PersistentVolumes().Create(context.TODO(), pv, metav1.CreateOptions{}); err != nil {
   638  		t.Errorf("Failed to create pv : %v", err)
   639  	}
   640  }
   641  

View as plain text