    17  package storage
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  	"time"
    25  	"github.com/onsi/ginkgo/v2"
    26  	"github.com/onsi/gomega"
    28  	v1 "k8s.io/api/core/v1"
    29  	storagev1 "k8s.io/api/storage/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/component-base/metrics/testutil"
    34  	"k8s.io/component-helpers/storage/ephemeral"
    35  	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
    36  	"k8s.io/kubernetes/test/e2e/framework"
    37  	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    38  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    39  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    40  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    41  	"k8s.io/kubernetes/test/e2e/storage/testsuites"
    42  	"k8s.io/kubernetes/test/e2e/storage/utils"
    43  	admissionapi "k8s.io/pod-security-admission/api"
    44  )
    46  // This test needs to run in serial because other tests could interfere
    47  // with metrics being tested here.
    48  var _ = utils.SIGDescribe(framework.WithSerial(), "Volume metrics", func() {
    49  	var (
    50  		c              clientset.Interface
    51  		ns             string
    52  		pvc            *v1.PersistentVolumeClaim
    53  		pvcBlock       *v1.PersistentVolumeClaim
    54  		metricsGrabber *e2emetrics.Grabber
    55  		invalidSc      *storagev1.StorageClass
    56  		defaultScName  string
    57  		err            error
    58  	)
    59  	f := framework.NewDefaultFramework("pv")
    60  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
    62  	ginkgo.BeforeEach(func(ctx context.Context) {
    63  		c = f.ClientSet
    64  		ns = f.Namespace.Name
    65  		var err error
    67  		// The tests below make various assumptions about the cluster
    68  		// and the underlying storage driver and therefore don't pass
    69  		// with other kinds of clusters and drivers.
    70  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
    71  		e2epv.SkipIfNoDefaultStorageClass(ctx, c)
    72  		defaultScName, err = e2epv.GetDefaultStorageClassName(ctx, c)
    73  		framework.ExpectNoError(err)
    75  		test := testsuites.StorageClassTest{
    76  			Name:      "default",
    77  			Timeouts:  f.Timeouts,
    78  			ClaimSize: "2Gi",
    79  		}
    81  		fsMode := v1.PersistentVolumeFilesystem
    82  		pvc = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
    83  			ClaimSize:  test.ClaimSize,
    84  			VolumeMode: &fsMode,
    85  		}, ns)
    87  		// selected providers all support PersistentVolumeBlock
    88  		blockMode := v1.PersistentVolumeBlock
    89  		pvcBlock = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
    90  			ClaimSize:  test.ClaimSize,
    91  			VolumeMode: &blockMode,
    92  		}, ns)
    94  		metricsGrabber, err = e2emetrics.NewMetricsGrabber(ctx, c, nil, f.ClientConfig(), true, false, true, false, false, false)
    96  		if err != nil {
    97  			framework.Failf("Error creating metrics grabber : %v", err)
    98  		}
    99  	})
   101  	ginkgo.AfterEach(func(ctx context.Context) {
   102  		newPvc, err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{})
   103  		if err != nil {
   104  			framework.Logf("Failed to get pvc %s/%s: %v", pvc.Namespace, pvc.Name, err)
   105  		} else {
   106  			e2epv.DeletePersistentVolumeClaim(ctx, c, newPvc.Name, newPvc.Namespace)
   107  			if newPvc.Spec.VolumeName != "" {
   108  				err = e2epv.WaitForPersistentVolumeDeleted(ctx, c, newPvc.Spec.VolumeName, 5*time.Second, 5*time.Minute)
   109  				framework.ExpectNoError(err, "Persistent Volume %v not deleted by dynamic provisioner", newPvc.Spec.VolumeName)
   110  			}
   111  		}
   113  		if invalidSc != nil {
   114  			err := c.StorageV1().StorageClasses().Delete(ctx, invalidSc.Name, metav1.DeleteOptions{})
   115  			framework.ExpectNoError(err, "Error deleting storageclass %v: %v", invalidSc.Name, err)
   116  			invalidSc = nil
   117  		}
   118  	})
   120  	provisioning := func(ctx context.Context, ephemeral bool) {
   121  		if !metricsGrabber.HasControlPlanePods() {
   122  			e2eskipper.Skipf("Environment does not support getting controller-manager metrics - skipping")
   123  		}
   125  		ginkgo.By("Getting plugin name")
   126  		defaultClass, err := c.StorageV1().StorageClasses().Get(ctx, defaultScName, metav1.GetOptions{})
   127  		framework.ExpectNoError(err, "Error getting default storageclass: %v", err)
   128  		pluginName := defaultClass.Provisioner
   130  		controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   132  		framework.ExpectNoError(err, "Error getting c-m metrics : %v", err)
   134  		storageOpMetrics := getControllerStorageMetrics(controllerMetrics, pluginName)
   136  		if !ephemeral {
   137  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   138  			framework.ExpectNoError(err)
   139  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   140  		}
   142  		pod := makePod(f, pvc, ephemeral)
   143  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   144  		framework.ExpectNoError(err)
   146  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   147  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   149  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   151  		updatedStorageMetrics := waitForDetachAndGrabMetrics(ctx, storageOpMetrics, metricsGrabber, pluginName)
   153  		gomega.Expect(updatedStorageMetrics.latencyMetrics).ToNot(gomega.BeEmpty(), "Error fetching c-m updated storage metrics")
   154  		gomega.Expect(updatedStorageMetrics.statusMetrics).ToNot(gomega.BeEmpty(), "Error fetching c-m updated storage metrics")
   156  		volumeOperations := []string{"volume_detach", "volume_attach"}
   158  		for _, volumeOp := range volumeOperations {
   159  			verifyMetricCount(storageOpMetrics, updatedStorageMetrics, volumeOp, false)
   160  		}
   161  	}
   163  	provisioningError := func(ctx context.Context, ephemeral bool) {
   164  		if !metricsGrabber.HasControlPlanePods() {
   165  			e2eskipper.Skipf("Environment does not support getting controller-manager metrics - skipping")
   166  		}
   168  		ginkgo.By("Getting default storageclass")
   169  		defaultClass, err := c.StorageV1().StorageClasses().Get(ctx, defaultScName, metav1.GetOptions{})
   170  		framework.ExpectNoError(err, "Error getting default storageclass: %v", err)
   171  		pluginName := defaultClass.Provisioner
   173  		invalidSc = &storagev1.StorageClass{
   174  			ObjectMeta: metav1.ObjectMeta{
   175  				Name: fmt.Sprintf("fail-metrics-invalid-sc-%s", pvc.Namespace),
   176  			},
   177  			Provisioner: defaultClass.Provisioner,
   178  			Parameters: map[string]string{
   179  				"invalidparam": "invalidvalue",
   180  			},
   181  		}
   182  		_, err = c.StorageV1().StorageClasses().Create(ctx, invalidSc, metav1.CreateOptions{})
   183  		framework.ExpectNoError(err, "Error creating new storageclass: %v", err)
   185  		pvc.Spec.StorageClassName = &invalidSc.Name
   186  		if !ephemeral {
   187  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   188  			framework.ExpectNoError(err, "failed to create PVC %s/%s", pvc.Namespace, pvc.Name)
   189  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   190  		}
   192  		ginkgo.By("Creating a pod and expecting it to fail")
   193  		pod := makePod(f, pvc, ephemeral)
   194  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   195  		framework.ExpectNoError(err, "failed to create Pod %s/%s", pod.Namespace, pod.Name)
   197  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   198  		framework.ExpectError(err)
   200  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   201  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   203  		ginkgo.By("Checking failure metrics")
   204  		updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   205  		framework.ExpectNoError(err, "failed to get controller manager metrics")
   206  		updatedStorageMetrics := getControllerStorageMetrics(updatedControllerMetrics, pluginName)
   208  		gomega.Expect(updatedStorageMetrics.statusMetrics).ToNot(gomega.BeEmpty(), "Error fetching c-m updated storage metrics")
   209  	}
   211  	filesystemMode := func(ctx context.Context, isEphemeral bool) {
   212  		if !isEphemeral {
   213  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   214  			framework.ExpectNoError(err)
   215  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   216  		}
   218  		pod := makePod(f, pvc, isEphemeral)
   219  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   220  		framework.ExpectNoError(err)
   222  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   223  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   225  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   226  		framework.ExpectNoError(err)
   228  		pvcName := pvc.Name
   229  		if isEphemeral {
   230  			pvcName = ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[0])
   231  		}
   232  		pvcNamespace := pod.Namespace
   234  		// Verify volume stat metrics were collected for the referenced PVC
   235  		volumeStatKeys := []string{
   236  			kubeletmetrics.VolumeStatsUsedBytesKey,
   237  			kubeletmetrics.VolumeStatsCapacityBytesKey,
   238  			kubeletmetrics.VolumeStatsAvailableBytesKey,
   239  			kubeletmetrics.VolumeStatsInodesKey,
   240  			kubeletmetrics.VolumeStatsInodesFreeKey,
   241  			kubeletmetrics.VolumeStatsInodesUsedKey,
   242  		}
   243  		key := volumeStatKeys[0]
   244  		kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   245  		// Poll kubelet metrics waiting for the volume to be picked up
   246  		// by the volume stats collector
   247  		var kubeMetrics e2emetrics.KubeletMetrics
   248  		waitErr := wait.PollWithContext(ctx, 30*time.Second, 5*time.Minute, func(ctx context.Context) (bool, error) {
   249  			framework.Logf("Grabbing Kubelet metrics")
   250  			// Grab kubelet metrics from the node the pod was scheduled on
   251  			var err error
   252  			kubeMetrics, err = metricsGrabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
   253  			if err != nil {
   254  				framework.Logf("Error fetching kubelet metrics")
   255  				return false, err
   256  			}
   257  			if !findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics) {
   258  				return false, nil
   259  			}
   260  			return true, nil
   261  		})
   262  		framework.ExpectNoError(waitErr, "Unable to find metric %s for PVC %s/%s", kubeletKeyName, pvcNamespace, pvcName)
   264  		for _, key := range volumeStatKeys {
   265  			kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   266  			found := findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics)
   267  			if !found {
   268  				framework.Failf("PVC %s, Namespace %s not found for %s", pvcName, pvcNamespace, kubeletKeyName)
   269  			}
   270  		}
   272  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   273  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   274  	}
   276  	blockmode := func(ctx context.Context, isEphemeral bool) {
   277  		if !isEphemeral {
   278  			pvcBlock, err = c.CoreV1().PersistentVolumeClaims(pvcBlock.Namespace).Create(ctx, pvcBlock, metav1.CreateOptions{})
   279  			framework.ExpectNoError(err)
   280  			gomega.Expect(pvcBlock).ToNot(gomega.BeNil())
   281  		}
   283  		pod := makePod(f, pvcBlock, isEphemeral)
   284  		pod.Spec.Containers[0].VolumeDevices = []v1.VolumeDevice{{
   285  			Name:       pod.Spec.Volumes[0].Name,
   286  			DevicePath: "/mnt/" + pvcBlock.Name,
   287  		}}
   288  		pod.Spec.Containers[0].VolumeMounts = nil
   289  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   290  		framework.ExpectNoError(err)
   292  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   293  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   295  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   296  		framework.ExpectNoError(err)
   298  		// Verify volume stat metrics were collected for the referenced PVC
   299  		volumeStatKeys := []string{
   300  			// BlockMode PVCs only support capacity (for now)
   301  			kubeletmetrics.VolumeStatsCapacityBytesKey,
   302  		}
   303  		key := volumeStatKeys[0]
   304  		kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   305  		pvcName := pvcBlock.Name
   306  		pvcNamespace := pvcBlock.Namespace
   307  		if isEphemeral {
   308  			pvcName = ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[0])
   309  			pvcNamespace = pod.Namespace
   310  		}
   311  		// Poll kubelet metrics waiting for the volume to be picked up
   312  		// by the volume stats collector
   313  		var kubeMetrics e2emetrics.KubeletMetrics
   314  		waitErr := wait.Poll(30*time.Second, 5*time.Minute, func() (bool, error) {
   315  			framework.Logf("Grabbing Kubelet metrics")
   316  			// Grab kubelet metrics from the node the pod was scheduled on
   317  			var err error
   318  			kubeMetrics, err = metricsGrabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
   319  			if err != nil {
   320  				framework.Logf("Error fetching kubelet metrics")
   321  				return false, err
   322  			}
   323  			if !findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics) {
   324  				return false, nil
   325  			}
   326  			return true, nil
   327  		})
   328  		framework.ExpectNoError(waitErr, "Unable to find metric %s for PVC %s/%s", kubeletKeyName, pvcNamespace, pvcName)
   330  		for _, key := range volumeStatKeys {
   331  			kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   332  			found := findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics)
   333  			if !found {
   334  				framework.Failf("PVC %s, Namespace %s not found for %s", pvcName, pvcNamespace, kubeletKeyName)
   335  			}
   336  		}
   338  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   339  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   340  	}
   342  	totalTime := func(ctx context.Context, isEphemeral bool) {
   343  		if !isEphemeral {
   344  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   345  			framework.ExpectNoError(err)
   346  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   347  		}
   349  		pod := makePod(f, pvc, isEphemeral)
   350  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   351  		framework.ExpectNoError(err)
   353  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   354  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   356  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   357  		framework.ExpectNoError(err)
   359  		controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   360  		if err != nil {
   361  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   362  		}
   364  		metricKey := "volume_operation_total_seconds_count"
   365  		dimensions := []string{"operation_name", "plugin_name"}
   366  		err = testutil.ValidateMetrics(testutil.Metrics(controllerMetrics), metricKey, dimensions...)
   367  		framework.ExpectNoError(err, "Invalid metric in P/V Controller metrics: %q", metricKey)
   369  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   370  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   371  	}
   373  	volumeManager := func(ctx context.Context, isEphemeral bool) {
   374  		if !isEphemeral {
   375  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   376  			framework.ExpectNoError(err)
   377  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   378  		}
   380  		pod := makePod(f, pvc, isEphemeral)
   381  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   382  		framework.ExpectNoError(err)
   384  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   385  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   387  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   388  		framework.ExpectNoError(err)
   390  		kubeMetrics, err := metricsGrabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
   391  		framework.ExpectNoError(err)
   393  		// Metrics should have dimensions plugin_name and state available
   394  		totalVolumesKey := "volume_manager_total_volumes"
   395  		dimensions := []string{"state", "plugin_name"}
   396  		err = testutil.ValidateMetrics(testutil.Metrics(kubeMetrics), totalVolumesKey, dimensions...)
   397  		framework.ExpectNoError(err, "Invalid metric in Volume Manager metrics: %q", totalVolumesKey)
   399  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   400  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   401  	}
   403  	adController := func(ctx context.Context, isEphemeral bool) {
   404  		if !isEphemeral {
   405  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   406  			framework.ExpectNoError(err)
   407  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   408  		}
   410  		pod := makePod(f, pvc, isEphemeral)
   412  		// Get metrics
   413  		controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   414  		if err != nil {
   415  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   416  		}
   418  		// Create pod
   419  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   420  		framework.ExpectNoError(err)
   421  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   422  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   423  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   424  		framework.ExpectNoError(err)
   426  		// Get updated metrics
   427  		updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   428  		if err != nil {
   429  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   430  		}
   432  		// Forced detach metric should be present
   433  		forceDetachKey := "attachdetach_controller_forced_detaches"
   434  		_, ok := updatedControllerMetrics[forceDetachKey]
   435  		if !ok {
   436  			framework.Failf("Key %q not found in A/D Controller metrics", forceDetachKey)
   437  		}
   439  		// Wait and validate
   440  		totalVolumesKey := "attachdetach_controller_total_volumes"
   441  		states := []string{"actual_state_of_world", "desired_state_of_world"}
   442  		dimensions := []string{"state", "plugin_name"}
   443  		waitForADControllerStatesMetrics(ctx, metricsGrabber, totalVolumesKey, dimensions, states)
   445  		// Total number of volumes in both ActualStateofWorld and DesiredStateOfWorld
   446  		// states should be higher or equal than it used to be
   447  		oldStates := getStatesMetrics(totalVolumesKey, testutil.Metrics(controllerMetrics))
   448  		updatedStates := getStatesMetrics(totalVolumesKey, testutil.Metrics(updatedControllerMetrics))
   449  		for _, stateName := range states {
   450  			if _, ok := oldStates[stateName]; !ok {
   451  				continue
   452  			}
   453  			for pluginName, numVolumes := range updatedStates[stateName] {
   454  				oldNumVolumes := oldStates[stateName][pluginName]
   455  				gomega.Expect(numVolumes).To(gomega.BeNumerically(">=", oldNumVolumes),
   456  					"Wrong number of volumes in state %q, plugin %q: wanted >=%d, got %d",
   457  					stateName, pluginName, oldNumVolumes, numVolumes)
   458  			}
   459  		}
   461  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   462  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   463  	}
	// testAll registers one spec per metric check defined above and forwards
	// isEphemeral to each of them, so the same set of checks can be registered
	// once per volume variant. The provisioning-errors spec is the only one
	// registered through f.It, which lets it carry the slow label.
	testAll := func(isEphemeral bool) {
		ginkgo.It("should create prometheus metrics for volume provisioning and attach/detach", func(ctx context.Context) {
			provisioning(ctx, isEphemeral)
		})
		// TODO(mauriciopoppe): after CSIMigration is turned on we're no longer reporting
		// the volume_provision metric (removed in #106609), issue to investigate the bug #106773
		f.It("should create prometheus metrics for volume provisioning errors", f.WithSlow(), func(ctx context.Context) {
			provisioningError(ctx, isEphemeral)
		})
		ginkgo.It("should create volume metrics with the correct FilesystemMode PVC ref", func(ctx context.Context) {
			filesystemMode(ctx, isEphemeral)
		})
		ginkgo.It("should create volume metrics with the correct BlockMode PVC ref", func(ctx context.Context) {
			blockmode(ctx, isEphemeral)
		})
		ginkgo.It("should create metrics for total time taken in volume operations in P/V Controller", func(ctx context.Context) {
			totalTime(ctx, isEphemeral)
		})
		ginkgo.It("should create volume metrics in Volume Manager", func(ctx context.Context) {
			volumeManager(ctx, isEphemeral)
		})
		ginkgo.It("should create metrics for total number of volumes in A/D Controller", func(ctx context.Context) {
			adController(ctx, isEphemeral)
		})
	}
	// Register every spec with isEphemeral=false: each check creates the PVC
	// through the API before building its pod.
	ginkgo.Context("PVC", func() {
		testAll(false)
	})

	// Register every spec again with isEphemeral=true: the checks skip the
	// explicit PVC creation and hand the flag to makePod.
	// NOTE(review): presumably makePod then embeds the claim spec as a generic
	// ephemeral volume — confirm against makePod.
	ginkgo.Context("Ephemeral", func() {
		testAll(true)
	})
   499  	// Test for pv controller metrics, concretely: bound/unbound pv/pvc count.
   500  	ginkgo.Describe("PVController", func() {
   501  		const (
   502  			classKey      = "storage_class"
   503  			namespaceKey  = "namespace"
   504  			pluginNameKey = "plugin_name"
   505  			volumeModeKey = "volume_mode"
   507  			totalPVKey    = "pv_collector_total_pv_count"
   508  			boundPVKey    = "pv_collector_bound_pv_count"
   509  			unboundPVKey  = "pv_collector_unbound_pv_count"
   510  			boundPVCKey   = "pv_collector_bound_pvc_count"
   511  			unboundPVCKey = "pv_collector_unbound_pvc_count"
   512  		)
   514  		var (
   515  			pv  *v1.PersistentVolume
   516  			pvc *v1.PersistentVolumeClaim
   518  			className = "bound-unbound-count-test-sc"
   519  			pvConfig  = e2epv.PersistentVolumeConfig{
   520  				PVSource: v1.PersistentVolumeSource{
   521  					HostPath: &v1.HostPathVolumeSource{Path: "/data"},
   522  				},
   523  				NamePrefix:       "pv-test-",
   524  				StorageClassName: className,
   525  			}
   526  			pvcConfig = e2epv.PersistentVolumeClaimConfig{StorageClassName: &className}
   528  			e2emetrics = []struct {
   529  				name      string
   530  				dimension string
   531  			}{
   532  				{boundPVKey, classKey},
   533  				{unboundPVKey, classKey},
   534  				{boundPVCKey, namespaceKey},
   535  				{unboundPVCKey, namespaceKey},
   536  			}
   538  			// Original metric values before we create any PV/PVCs. The length should be 4,
   539  			// and the elements should be bound pv count, unbound pv count, bound pvc count,
   540  			// unbound pvc count in turn.
   541  			// We use these values to calculate relative increment of each test.
   542  			originMetricValues []map[string]int64
   543  		)
   545  		// validator used to validate each metric's values, the length of metricValues
   546  		// should be 4, and the elements should be bound pv count, unbound pv count, bound
   547  		// pvc count, unbound pvc count in turn.
   548  		validator := func(ctx context.Context, metricValues []map[string]int64) {
   549  			gomega.Expect(metricValues).To(gomega.HaveLen(4), "Wrong metric size: %d", len(metricValues))
   551  			controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   552  			framework.ExpectNoError(err, "Error getting c-m metricValues: %v", err)
   554  			for i, metric := range e2emetrics {
   555  				expectValues := metricValues[i]
   556  				if expectValues == nil {
   557  					expectValues = make(map[string]int64)
   558  				}
   559  				// We using relative increment value instead of absolute value to reduce unexpected flakes.
   560  				// Concretely, we expect the difference of the updated values and original values for each
   561  				// test suit are equal to expectValues.
   562  				actualValues := calculateRelativeValues(originMetricValues[i],
   563  					testutil.GetMetricValuesForLabel(testutil.Metrics(controllerMetrics), metric.name, metric.dimension))
   564  				gomega.Expect(actualValues).To(gomega.Equal(expectValues), "Wrong pv controller metric %s(%s): wanted %v, got %v",
   565  					metric.name, metric.dimension, expectValues, actualValues)
   566  			}
   567  		}
   569  		ginkgo.BeforeEach(func(ctx context.Context) {
   570  			if !metricsGrabber.HasControlPlanePods() {
   571  				e2eskipper.Skipf("Environment does not support getting controller-manager metrics - skipping")
   572  			}
   574  			pv = e2epv.MakePersistentVolume(pvConfig)
   575  			pvc = e2epv.MakePersistentVolumeClaim(pvcConfig, ns)
   577  			// Initializes all original metric values.
   578  			controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   579  			framework.ExpectNoError(err, "Error getting c-m metricValues: %v", err)
   580  			for _, metric := range e2emetrics {
   581  				originMetricValues = append(originMetricValues,
   582  					testutil.GetMetricValuesForLabel(testutil.Metrics(controllerMetrics), metric.name, metric.dimension))
   583  			}
   584  		})
   586  		ginkgo.AfterEach(func(ctx context.Context) {
   587  			if err := e2epv.DeletePersistentVolume(ctx, c, pv.Name); err != nil {
   588  				framework.Failf("Error deleting pv: %v", err)
   589  			}
   590  			if err := e2epv.DeletePersistentVolumeClaim(ctx, c, pvc.Name, pvc.Namespace); err != nil {
   591  				framework.Failf("Error deleting pvc: %v", err)
   592  			}
   594  			// Clear original metric values.
   595  			originMetricValues = nil
   596  		})
   598  		ginkgo.It("should create none metrics for pvc controller before creating any PV or PVC", func(ctx context.Context) {
   599  			validator(ctx, []map[string]int64{nil, nil, nil, nil})
   600  		})
   602  		ginkgo.It("should create unbound pv count metrics for pvc controller after creating pv only",
   603  			func(ctx context.Context) {
   604  				var err error
   605  				pv, err = e2epv.CreatePV(ctx, c, f.Timeouts, pv)
   606  				framework.ExpectNoError(err, "Error creating pv: %v", err)
   607  				waitForPVControllerSync(ctx, metricsGrabber, unboundPVKey, classKey)
   608  				validator(ctx, []map[string]int64{nil, {className: 1}, nil, nil})
   609  			})
   611  		ginkgo.It("should create unbound pvc count metrics for pvc controller after creating pvc only",
   612  			func(ctx context.Context) {
   613  				var err error
   614  				pvc, err = e2epv.CreatePVC(ctx, c, ns, pvc)
   615  				framework.ExpectNoError(err, "Error creating pvc: %v", err)
   616  				waitForPVControllerSync(ctx, metricsGrabber, unboundPVCKey, namespaceKey)
   617  				validator(ctx, []map[string]int64{nil, nil, nil, {ns: 1}})
   618  			})
   620  		ginkgo.It("should create bound pv/pvc count metrics for pvc controller after creating both pv and pvc",
   621  			func(ctx context.Context) {
   622  				var err error
   623  				pv, pvc, err = e2epv.CreatePVPVC(ctx, c, f.Timeouts, pvConfig, pvcConfig, ns, true)
   624  				framework.ExpectNoError(err, "Error creating pv pvc: %v", err)
   625  				waitForPVControllerSync(ctx, metricsGrabber, boundPVKey, classKey)
   626  				waitForPVControllerSync(ctx, metricsGrabber, boundPVCKey, namespaceKey)
   627  				validator(ctx, []map[string]int64{{className: 1}, nil, {ns: 1}, nil})
   629  			})
   630  		ginkgo.It("should create total pv count metrics for with plugin and volume mode labels after creating pv",
   631  			func(ctx context.Context) {
   632  				var err error
   633  				dimensions := []string{pluginNameKey, volumeModeKey}
   634  				pv, err = e2epv.CreatePV(ctx, c, f.Timeouts, pv)
   635  				framework.ExpectNoError(err, "Error creating pv: %v", err)
   636  				waitForPVControllerSync(ctx, metricsGrabber, totalPVKey, pluginNameKey)
   637  				controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   638  				framework.ExpectNoError(err, "Error getting c-m metricValues: %v", err)
   639  				err = testutil.ValidateMetrics(testutil.Metrics(controllerMetrics), totalPVKey, dimensions...)
   640  				framework.ExpectNoError(err, "Invalid metric in Controller Manager metrics: %q", totalPVKey)
   641  			})
   642  	})
   643  })
   645  type storageControllerMetrics struct {
   646  	latencyMetrics map[string]int64
   647  	statusMetrics  map[string]statusMetricCounts
   648  }
   650  type statusMetricCounts struct {
   651  	successCount int64
   652  	failCount    int64
   653  	otherCount   int64
   654  }
   656  func newStorageControllerMetrics() *storageControllerMetrics {
   657  	return &storageControllerMetrics{
   658  		latencyMetrics: make(map[string]int64),
   659  		statusMetrics:  make(map[string]statusMetricCounts),
   660  	}
   661  }
   663  func waitForDetachAndGrabMetrics(ctx context.Context, oldMetrics *storageControllerMetrics, metricsGrabber *e2emetrics.Grabber, pluginName string) *storageControllerMetrics {
   664  	backoff := wait.Backoff{
   665  		Duration: 10 * time.Second,
   666  		Factor:   1.2,
   667  		Steps:    21,
   668  	}
   670  	updatedStorageMetrics := newStorageControllerMetrics()
   671  	oldDetachCount, ok := oldMetrics.latencyMetrics["volume_detach"]
   672  	if !ok {
   673  		oldDetachCount = 0
   674  	}
   676  	verifyMetricFunc := func(ctx context.Context) (bool, error) {
   677  		updatedMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   679  		if err != nil {
   680  			framework.Logf("Error fetching controller-manager metrics")
   681  			return false, err
   682  		}
   684  		updatedStorageMetrics = getControllerStorageMetrics(updatedMetrics, pluginName)
   685  		newDetachCount, ok := updatedStorageMetrics.latencyMetrics["volume_detach"]
   687  		// if detach metrics are not yet there, we need to retry
   688  		if !ok {
   689  			return false, nil
   690  		}
   692  		// if old Detach count is more or equal to new detach count, that means detach
   693  		// event has not been observed yet.
   694  		if oldDetachCount >= newDetachCount {
   695  			return false, nil
   696  		}
   698  		return true, nil
   699  	}
   701  	waitErr := wait.ExponentialBackoffWithContext(ctx, backoff, verifyMetricFunc)
   702  	framework.ExpectNoError(waitErr, "Unable to get updated metrics for plugin %s", pluginName)
   703  	return updatedStorageMetrics
   704  }
   706  func verifyMetricCount(oldMetrics, newMetrics *storageControllerMetrics, metricName string, expectFailure bool) {
   707  	oldLatencyCount, ok := oldMetrics.latencyMetrics[metricName]
   708  	// if metric does not exist in oldMap, it probably hasn't been emitted yet.
   709  	if !ok {
   710  		oldLatencyCount = 0
   711  	}
   713  	oldStatusCount := int64(0)
   714  	if oldStatusCounts, ok := oldMetrics.statusMetrics[metricName]; ok {
   715  		if expectFailure {
   716  			oldStatusCount = oldStatusCounts.failCount
   717  		} else {
   718  			oldStatusCount = oldStatusCounts.successCount
   719  		}
   720  	}
   722  	newLatencyCount, ok := newMetrics.latencyMetrics[metricName]
   723  	if !expectFailure {
   724  		if !ok {
   725  			framework.Failf("Error getting updated latency metrics for %s", metricName)
   726  		}
   727  	}
   728  	newStatusCounts, ok := newMetrics.statusMetrics[metricName]
   729  	if !ok {
   730  		framework.Failf("Error getting updated status metrics for %s", metricName)
   731  	}
   733  	newStatusCount := int64(0)
   734  	if expectFailure {
   735  		newStatusCount = newStatusCounts.failCount
   736  	} else {
   737  		newStatusCount = newStatusCounts.successCount
   738  	}
   740  	// It appears that in a busy cluster some spurious detaches are unavoidable
   741  	// even if the test is run serially.  We really just verify if new count
   742  	// is greater than old count
   743  	if !expectFailure {
   744  		gomega.Expect(newLatencyCount).To(gomega.BeNumerically(">", oldLatencyCount), "New latency count %d should be more than old count %d for action %s", newLatencyCount, oldLatencyCount, metricName)
   745  	}
   746  	gomega.Expect(newStatusCount).To(gomega.BeNumerically(">", oldStatusCount), "New status count %d should be more than old count %d for action %s", newStatusCount, oldStatusCount, metricName)
   747  }
   749  func getControllerStorageMetrics(ms e2emetrics.ControllerManagerMetrics, pluginName string) *storageControllerMetrics {
   750  	result := newStorageControllerMetrics()
   752  	for method, samples := range ms {
   753  		switch method {
   754  		// from the base metric name "storage_operation_duration_seconds"
   755  		case "storage_operation_duration_seconds_count":
   756  			for _, sample := range samples {
   757  				count := int64(sample.Value)
   758  				operation := string(sample.Metric["operation_name"])
   759  				// if the volumes were provisioned with a CSI Driver
   760  				// the metric operation name will be prefixed with
   761  				// "kubernetes.io/csi:"
   762  				metricPluginName := string(sample.Metric["volume_plugin"])
   763  				status := string(sample.Metric["status"])
   764  				if strings.Index(metricPluginName, pluginName) < 0 {
   765  					// the metric volume plugin field doesn't match
   766  					// the default storageClass.Provisioner field
   767  					continue
   768  				}
   770  				statusCounts := result.statusMetrics[operation]
   771  				switch status {
   772  				case "success":
   773  					statusCounts.successCount = count
   774  				case "fail-unknown":
   775  					statusCounts.failCount = count
   776  				default:
   777  					statusCounts.otherCount = count
   778  				}
   779  				result.statusMetrics[operation] = statusCounts
   780  				result.latencyMetrics[operation] = count
   781  			}
   782  		}
   783  	}
   784  	return result
   785  }
   787  // Finds the sample in the specified metric from `KubeletMetrics` tagged with
   788  // the specified namespace and pvc name
   789  func findVolumeStatMetric(metricKeyName string, namespace string, pvcName string, kubeletMetrics e2emetrics.KubeletMetrics) bool {
   790  	found := false
   791  	errCount := 0
   792  	framework.Logf("Looking for sample in metric `%s` tagged with namespace `%s`, PVC `%s`", metricKeyName, namespace, pvcName)
   793  	if samples, ok := kubeletMetrics[metricKeyName]; ok {
   794  		for _, sample := range samples {
   795  			framework.Logf("Found sample %s", sample.String())
   796  			samplePVC, ok := sample.Metric["persistentvolumeclaim"]
   797  			if !ok {
   798  				framework.Logf("Error getting pvc for metric %s, sample %s", metricKeyName, sample.String())
   799  				errCount++
   800  			}
   801  			sampleNS, ok := sample.Metric["namespace"]
   802  			if !ok {
   803  				framework.Logf("Error getting namespace for metric %s, sample %s", metricKeyName, sample.String())
   804  				errCount++
   805  			}
   807  			if string(samplePVC) == pvcName && string(sampleNS) == namespace {
   808  				found = true
   809  				break
   810  			}
   811  		}
   812  	}
   813  	gomega.Expect(errCount).To(gomega.Equal(0), "Found invalid samples")
   814  	return found
   815  }
   817  // Wait for the count of a pv controller's metric specified by metricName and dimension bigger than zero.
   818  func waitForPVControllerSync(ctx context.Context, metricsGrabber *e2emetrics.Grabber, metricName, dimension string) {
   819  	backoff := wait.Backoff{
   820  		Duration: 10 * time.Second,
   821  		Factor:   1.2,
   822  		Steps:    21,
   823  	}
   824  	verifyMetricFunc := func(ctx context.Context) (bool, error) {
   825  		updatedMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   826  		if err != nil {
   827  			framework.Logf("Error fetching controller-manager metrics")
   828  			return false, err
   829  		}
   830  		return len(testutil.GetMetricValuesForLabel(testutil.Metrics(updatedMetrics), metricName, dimension)) > 0, nil
   831  	}
   832  	waitErr := wait.ExponentialBackoffWithContext(ctx, backoff, verifyMetricFunc)
   833  	framework.ExpectNoError(waitErr, "Unable to get pv controller metrics")
   834  }
   836  func calculateRelativeValues(originValues, updatedValues map[string]int64) map[string]int64 {
   837  	relativeValues := make(map[string]int64)
   838  	for key, value := range updatedValues {
   839  		relativeValue := value - originValues[key]
   840  		if relativeValue != 0 {
   841  			relativeValues[key] = relativeValue
   842  		}
   843  	}
   844  	for key, value := range originValues {
   845  		if _, exist := updatedValues[key]; !exist && value > 0 {
   846  			relativeValues[key] = -value
   847  		}
   848  	}
   849  	return relativeValues
   850  }
   852  func getStatesMetrics(metricKey string, givenMetrics testutil.Metrics) map[string]map[string]int64 {
   853  	states := make(map[string]map[string]int64)
   854  	for _, sample := range givenMetrics[metricKey] {
   855  		framework.Logf("Found sample %q", sample.String())
   856  		state := string(sample.Metric["state"])
   857  		pluginName := string(sample.Metric["plugin_name"])
   858  		states[state] = map[string]int64{pluginName: int64(sample.Value)}
   859  	}
   860  	return states
   861  }
   863  func waitForADControllerStatesMetrics(ctx context.Context, metricsGrabber *e2emetrics.Grabber, metricName string, dimensions []string, stateNames []string) {
   864  	backoff := wait.Backoff{
   865  		Duration: 10 * time.Second,
   866  		Factor:   1.2,
   867  		Steps:    21,
   868  	}
   869  	verifyMetricFunc := func(ctx context.Context) (bool, error) {
   870  		updatedMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   871  		if err != nil {
   872  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   873  			return false, err
   874  		}
   875  		err = testutil.ValidateMetrics(testutil.Metrics(updatedMetrics), metricName, dimensions...)
   876  		if err != nil {
   877  			return false, fmt.Errorf("could not get valid metrics: %v ", err)
   878  		}
   879  		states := getStatesMetrics(metricName, testutil.Metrics(updatedMetrics))
   880  		for _, name := range stateNames {
   881  			if _, ok := states[name]; !ok {
   882  				return false, fmt.Errorf("could not get state %q from A/D Controller metrics", name)
   883  			}
   884  		}
   885  		return true, nil
   886  	}
   887  	waitErr := wait.ExponentialBackoffWithContext(ctx, backoff, verifyMetricFunc)
   888  	framework.ExpectNoError(waitErr, "Unable to get A/D controller metrics")
   889  }
   891  // makePod creates a pod which either references the PVC or creates it via a
   892  // generic ephemeral volume claim template.
   893  func makePod(f *framework.Framework, pvc *v1.PersistentVolumeClaim, isEphemeral bool) *v1.Pod {
   894  	claims := []*v1.PersistentVolumeClaim{pvc}
   895  	pod := e2epod.MakePod(f.Namespace.Name, nil, claims, f.NamespacePodSecurityLevel, "")
   896  	if isEphemeral {
   897  		volSrc := pod.Spec.Volumes[0]
   898  		volSrc.PersistentVolumeClaim = nil
   899  		volSrc.Ephemeral = &v1.EphemeralVolumeSource{
   900  			VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{
   901  				Spec: pvc.Spec,
   902  			},
   903  		}
   904  		pod.Spec.Volumes[0] = volSrc
   905  	}
   906  	return pod
   907  }

