...

Source file src/k8s.io/kubernetes/test/e2e/storage/volume_metrics.go

Documentation: k8s.io/kubernetes/test/e2e/storage

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package storage
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"strings"
    23  	"time"
    24  
    25  	"github.com/onsi/ginkgo/v2"
    26  	"github.com/onsi/gomega"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	storagev1 "k8s.io/api/storage/v1"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/component-base/metrics/testutil"
    34  	"k8s.io/component-helpers/storage/ephemeral"
    35  	kubeletmetrics "k8s.io/kubernetes/pkg/kubelet/metrics"
    36  	"k8s.io/kubernetes/test/e2e/framework"
    37  	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    38  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    39  	e2epv "k8s.io/kubernetes/test/e2e/framework/pv"
    40  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    41  	"k8s.io/kubernetes/test/e2e/storage/testsuites"
    42  	"k8s.io/kubernetes/test/e2e/storage/utils"
    43  	admissionapi "k8s.io/pod-security-admission/api"
    44  )
    45  
    46  // This test needs to run in serial because other tests could interfere
    47  // with metrics being tested here.
    48  var _ = utils.SIGDescribe(framework.WithSerial(), "Volume metrics", func() {
    49  	var (
    50  		c              clientset.Interface
    51  		ns             string
    52  		pvc            *v1.PersistentVolumeClaim
    53  		pvcBlock       *v1.PersistentVolumeClaim
    54  		metricsGrabber *e2emetrics.Grabber
    55  		invalidSc      *storagev1.StorageClass
    56  		defaultScName  string
    57  		err            error
    58  	)
    59  	f := framework.NewDefaultFramework("pv")
    60  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
    61  
    62  	ginkgo.BeforeEach(func(ctx context.Context) {
    63  		c = f.ClientSet
    64  		ns = f.Namespace.Name
    65  		var err error
    66  
    67  		// The tests below make various assumptions about the cluster
    68  		// and the underlying storage driver and therefore don't pass
    69  		// with other kinds of clusters and drivers.
    70  		e2eskipper.SkipUnlessProviderIs("gce", "gke", "aws")
    71  		e2epv.SkipIfNoDefaultStorageClass(ctx, c)
    72  		defaultScName, err = e2epv.GetDefaultStorageClassName(ctx, c)
    73  		framework.ExpectNoError(err)
    74  
    75  		test := testsuites.StorageClassTest{
    76  			Name:      "default",
    77  			Timeouts:  f.Timeouts,
    78  			ClaimSize: "2Gi",
    79  		}
    80  
    81  		fsMode := v1.PersistentVolumeFilesystem
    82  		pvc = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
    83  			ClaimSize:  test.ClaimSize,
    84  			VolumeMode: &fsMode,
    85  		}, ns)
    86  
    87  		// selected providers all support PersistentVolumeBlock
    88  		blockMode := v1.PersistentVolumeBlock
    89  		pvcBlock = e2epv.MakePersistentVolumeClaim(e2epv.PersistentVolumeClaimConfig{
    90  			ClaimSize:  test.ClaimSize,
    91  			VolumeMode: &blockMode,
    92  		}, ns)
    93  
    94  		metricsGrabber, err = e2emetrics.NewMetricsGrabber(ctx, c, nil, f.ClientConfig(), true, false, true, false, false, false)
    95  
    96  		if err != nil {
    97  			framework.Failf("Error creating metrics grabber: %v", err)
    98  		}
    99  	})
   100  
   101  	ginkgo.AfterEach(func(ctx context.Context) {
   102  		newPvc, err := c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Get(ctx, pvc.Name, metav1.GetOptions{})
   103  		if err != nil {
   104  			framework.Logf("Failed to get pvc %s/%s: %v", pvc.Namespace, pvc.Name, err)
   105  		} else {
   106  			framework.ExpectNoError(e2epv.DeletePersistentVolumeClaim(ctx, c, newPvc.Name, newPvc.Namespace), "Failed to delete PVC %s/%s", newPvc.Namespace, newPvc.Name)
   107  			if newPvc.Spec.VolumeName != "" {
   108  				err = e2epv.WaitForPersistentVolumeDeleted(ctx, c, newPvc.Spec.VolumeName, 5*time.Second, 5*time.Minute)
   109  				framework.ExpectNoError(err, "Persistent Volume %v not deleted by dynamic provisioner", newPvc.Spec.VolumeName)
   110  			}
   111  		}
   112  
   113  		if invalidSc != nil {
   114  			err := c.StorageV1().StorageClasses().Delete(ctx, invalidSc.Name, metav1.DeleteOptions{})
   115  			framework.ExpectNoError(err, "Error deleting storageclass %v: %v", invalidSc.Name, err)
   116  			invalidSc = nil
   117  		}
   118  	})
   119  
   120  	provisioning := func(ctx context.Context, isEphemeral bool) {
   121  		if !metricsGrabber.HasControlPlanePods() {
   122  			e2eskipper.Skipf("Environment does not support getting controller-manager metrics - skipping")
   123  		}
   124  
   125  		ginkgo.By("Getting plugin name")
   126  		defaultClass, err := c.StorageV1().StorageClasses().Get(ctx, defaultScName, metav1.GetOptions{})
   127  		framework.ExpectNoError(err, "Error getting default storageclass: %v", err)
   128  		pluginName := defaultClass.Provisioner
   129  
   130  		controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   131  
   132  		framework.ExpectNoError(err, "Error getting c-m metrics : %v", err)
   133  
   134  		storageOpMetrics := getControllerStorageMetrics(controllerMetrics, pluginName)
   135  
   136  		if !isEphemeral {
   137  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   138  			framework.ExpectNoError(err)
   139  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   140  		}
   141  
   142  		pod := makePod(f, pvc, isEphemeral)
   143  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   144  		framework.ExpectNoError(err)
   145  
   146  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   147  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   148  
   149  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   150  
   151  		updatedStorageMetrics := waitForDetachAndGrabMetrics(ctx, storageOpMetrics, metricsGrabber, pluginName)
   152  
   153  		gomega.Expect(updatedStorageMetrics.latencyMetrics).ToNot(gomega.BeEmpty(), "Error fetching c-m updated storage metrics")
   154  		gomega.Expect(updatedStorageMetrics.statusMetrics).ToNot(gomega.BeEmpty(), "Error fetching c-m updated storage metrics")
   155  
   156  		volumeOperations := []string{"volume_detach", "volume_attach"}
   157  
   158  		for _, volumeOp := range volumeOperations {
   159  			verifyMetricCount(storageOpMetrics, updatedStorageMetrics, volumeOp, false)
   160  		}
   161  	}
   162  
   163  	provisioningError := func(ctx context.Context, isEphemeral bool) {
   164  		if !metricsGrabber.HasControlPlanePods() {
   165  			e2eskipper.Skipf("Environment does not support getting controller-manager metrics - skipping")
   166  		}
   167  
   168  		ginkgo.By("Getting default storageclass")
   169  		defaultClass, err := c.StorageV1().StorageClasses().Get(ctx, defaultScName, metav1.GetOptions{})
   170  		framework.ExpectNoError(err, "Error getting default storageclass: %v", err)
   171  		pluginName := defaultClass.Provisioner
   172  
   173  		invalidSc = &storagev1.StorageClass{
   174  			ObjectMeta: metav1.ObjectMeta{
   175  				Name: fmt.Sprintf("fail-metrics-invalid-sc-%s", pvc.Namespace),
   176  			},
   177  			Provisioner: defaultClass.Provisioner,
   178  			Parameters: map[string]string{
   179  				"invalidparam": "invalidvalue",
   180  			},
   181  		}
   182  		_, err = c.StorageV1().StorageClasses().Create(ctx, invalidSc, metav1.CreateOptions{})
   183  		framework.ExpectNoError(err, "Error creating new storageclass: %v", err)
   184  
   185  		pvc.Spec.StorageClassName = &invalidSc.Name
   186  		if !isEphemeral {
   187  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   188  			framework.ExpectNoError(err, "failed to create PVC %s/%s", pvc.Namespace, pvc.Name)
   189  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   190  		}
   191  
   192  		ginkgo.By("Creating a pod and expecting it to fail")
   193  		pod := makePod(f, pvc, isEphemeral)
   194  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   195  		framework.ExpectNoError(err, "failed to create Pod %s/%s", pod.Namespace, pod.Name)
   196  
   197  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   198  		framework.ExpectError(err)
   199  
   200  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   201  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   202  
   203  		ginkgo.By("Checking failure metrics")
   204  		updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   205  		framework.ExpectNoError(err, "failed to get controller manager metrics")
   206  		updatedStorageMetrics := getControllerStorageMetrics(updatedControllerMetrics, pluginName)
   207  
   208  		gomega.Expect(updatedStorageMetrics.statusMetrics).ToNot(gomega.BeEmpty(), "Error fetching c-m updated storage metrics")
   209  	}
   210  
   211  	filesystemMode := func(ctx context.Context, isEphemeral bool) {
   212  		if !isEphemeral {
   213  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   214  			framework.ExpectNoError(err)
   215  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   216  		}
   217  
   218  		pod := makePod(f, pvc, isEphemeral)
   219  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   220  		framework.ExpectNoError(err)
   221  
   222  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   223  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   224  
   225  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   226  		framework.ExpectNoError(err)
   227  
   228  		pvcName := pvc.Name
   229  		if isEphemeral {
   230  			pvcName = ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[0])
   231  		}
   232  		pvcNamespace := pod.Namespace
   233  
   234  		// Verify volume stat metrics were collected for the referenced PVC
   235  		volumeStatKeys := []string{
   236  			kubeletmetrics.VolumeStatsUsedBytesKey,
   237  			kubeletmetrics.VolumeStatsCapacityBytesKey,
   238  			kubeletmetrics.VolumeStatsAvailableBytesKey,
   239  			kubeletmetrics.VolumeStatsInodesKey,
   240  			kubeletmetrics.VolumeStatsInodesFreeKey,
   241  			kubeletmetrics.VolumeStatsInodesUsedKey,
   242  		}
   243  		key := volumeStatKeys[0]
   244  		kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   245  		// Poll kubelet metrics waiting for the volume to be picked up
   246  		// by the volume stats collector
   247  		var kubeMetrics e2emetrics.KubeletMetrics
   248  		waitErr := wait.PollWithContext(ctx, 30*time.Second, 5*time.Minute, func(ctx context.Context) (bool, error) {
   249  			framework.Logf("Grabbing Kubelet metrics")
   250  			// Grab kubelet metrics from the node the pod was scheduled on
   251  			var err error
   252  			kubeMetrics, err = metricsGrabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
   253  			if err != nil {
   254  				framework.Logf("Error fetching kubelet metrics")
   255  				return false, err
   256  			}
   257  			if !findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics) {
   258  				return false, nil
   259  			}
   260  			return true, nil
   261  		})
   262  		framework.ExpectNoError(waitErr, "Unable to find metric %s for PVC %s/%s", kubeletKeyName, pvcNamespace, pvcName)
   263  
   264  		for _, key := range volumeStatKeys {
   265  			kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   266  			found := findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics)
   267  			if !found {
   268  				framework.Failf("PVC %s, Namespace %s not found for %s", pvcName, pvcNamespace, kubeletKeyName)
   269  			}
   270  		}
   271  
   272  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   273  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   274  	}
   275  
   276  	blockmode := func(ctx context.Context, isEphemeral bool) {
   277  		if !isEphemeral {
   278  			pvcBlock, err = c.CoreV1().PersistentVolumeClaims(pvcBlock.Namespace).Create(ctx, pvcBlock, metav1.CreateOptions{})
   279  			framework.ExpectNoError(err)
   280  			gomega.Expect(pvcBlock).ToNot(gomega.BeNil())
   281  		}
   282  
   283  		pod := makePod(f, pvcBlock, isEphemeral)
   284  		pod.Spec.Containers[0].VolumeDevices = []v1.VolumeDevice{{
   285  			Name:       pod.Spec.Volumes[0].Name,
   286  			DevicePath: "/mnt/" + pvcBlock.Name,
   287  		}}
   288  		pod.Spec.Containers[0].VolumeMounts = nil
   289  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   290  		framework.ExpectNoError(err)
   291  
   292  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   293  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   294  
   295  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   296  		framework.ExpectNoError(err)
   297  
   298  		// Verify volume stat metrics were collected for the referenced PVC
   299  		volumeStatKeys := []string{
   300  			// BlockMode PVCs only support capacity (for now)
   301  			kubeletmetrics.VolumeStatsCapacityBytesKey,
   302  		}
   303  		key := volumeStatKeys[0]
   304  		kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   305  		pvcName := pvcBlock.Name
   306  		pvcNamespace := pvcBlock.Namespace
   307  		if isEphemeral {
   308  			pvcName = ephemeral.VolumeClaimName(pod, &pod.Spec.Volumes[0])
   309  			pvcNamespace = pod.Namespace
   310  		}
   311  		// Poll kubelet metrics waiting for the volume to be picked up
   312  		// by the volume stats collector
   313  		var kubeMetrics e2emetrics.KubeletMetrics
   314  		waitErr := wait.PollWithContext(ctx, 30*time.Second, 5*time.Minute, func(ctx context.Context) (bool, error) {
   315  			framework.Logf("Grabbing Kubelet metrics")
   316  			// Grab kubelet metrics from the node the pod was scheduled on
   317  			var err error
   318  			kubeMetrics, err = metricsGrabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
   319  			if err != nil {
   320  				framework.Logf("Error fetching kubelet metrics")
   321  				return false, err
   322  			}
   323  			if !findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics) {
   324  				return false, nil
   325  			}
   326  			return true, nil
   327  		})
   328  		framework.ExpectNoError(waitErr, "Unable to find metric %s for PVC %s/%s", kubeletKeyName, pvcNamespace, pvcName)
   329  
   330  		for _, key := range volumeStatKeys {
   331  			kubeletKeyName := fmt.Sprintf("%s_%s", kubeletmetrics.KubeletSubsystem, key)
   332  			found := findVolumeStatMetric(kubeletKeyName, pvcNamespace, pvcName, kubeMetrics)
   333  			if !found {
   334  				framework.Failf("PVC %s, Namespace %s not found for %s", pvcName, pvcNamespace, kubeletKeyName)
   335  			}
   336  		}
   337  
   338  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   339  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   340  	}
   341  
   342  	totalTime := func(ctx context.Context, isEphemeral bool) {
   343  		if !isEphemeral {
   344  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   345  			framework.ExpectNoError(err)
   346  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   347  		}
   348  
   349  		pod := makePod(f, pvc, isEphemeral)
   350  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   351  		framework.ExpectNoError(err)
   352  
   353  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   354  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   355  
   356  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   357  		framework.ExpectNoError(err)
   358  
   359  		controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   360  		if err != nil {
   361  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   362  		}
   363  
   364  		metricKey := "volume_operation_total_seconds_count"
   365  		dimensions := []string{"operation_name", "plugin_name"}
   366  		err = testutil.ValidateMetrics(testutil.Metrics(controllerMetrics), metricKey, dimensions...)
   367  		framework.ExpectNoError(err, "Invalid metric in P/V Controller metrics: %q", metricKey)
   368  
   369  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   370  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   371  	}
   372  
   373  	volumeManager := func(ctx context.Context, isEphemeral bool) {
   374  		if !isEphemeral {
   375  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   376  			framework.ExpectNoError(err)
   377  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   378  		}
   379  
   380  		pod := makePod(f, pvc, isEphemeral)
   381  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   382  		framework.ExpectNoError(err)
   383  
   384  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   385  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   386  
   387  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   388  		framework.ExpectNoError(err)
   389  
   390  		kubeMetrics, err := metricsGrabber.GrabFromKubelet(ctx, pod.Spec.NodeName)
   391  		framework.ExpectNoError(err)
   392  
   393  		// Metrics should have dimensions plugin_name and state available
   394  		totalVolumesKey := "volume_manager_total_volumes"
   395  		dimensions := []string{"state", "plugin_name"}
   396  		err = testutil.ValidateMetrics(testutil.Metrics(kubeMetrics), totalVolumesKey, dimensions...)
   397  		framework.ExpectNoError(err, "Invalid metric in Volume Manager metrics: %q", totalVolumesKey)
   398  
   399  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   400  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   401  	}
   402  
   403  	adController := func(ctx context.Context, isEphemeral bool) {
   404  		if !isEphemeral {
   405  			pvc, err = c.CoreV1().PersistentVolumeClaims(pvc.Namespace).Create(ctx, pvc, metav1.CreateOptions{})
   406  			framework.ExpectNoError(err)
   407  			gomega.Expect(pvc).ToNot(gomega.BeNil())
   408  		}
   409  
   410  		pod := makePod(f, pvc, isEphemeral)
   411  
   412  		// Get metrics
   413  		controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   414  		if err != nil {
   415  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   416  		}
   417  
   418  		// Create pod
   419  		pod, err = c.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
   420  		framework.ExpectNoError(err)
   421  		err = e2epod.WaitTimeoutForPodRunningInNamespace(ctx, c, pod.Name, pod.Namespace, f.Timeouts.PodStart)
   422  		framework.ExpectNoError(err, "Error starting pod %s", pod.Name)
   423  		pod, err = c.CoreV1().Pods(ns).Get(ctx, pod.Name, metav1.GetOptions{})
   424  		framework.ExpectNoError(err)
   425  
   426  		// Get updated metrics
   427  		updatedControllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   428  		if err != nil {
   429  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   430  		}
   431  
   432  		// Forced detach metric should be present
   433  		forceDetachKey := "attachdetach_controller_forced_detaches"
   434  		_, ok := updatedControllerMetrics[forceDetachKey]
   435  		if !ok {
   436  			framework.Failf("Key %q not found in A/D Controller metrics", forceDetachKey)
   437  		}
   438  
   439  		// Wait and validate
   440  		totalVolumesKey := "attachdetach_controller_total_volumes"
   441  		states := []string{"actual_state_of_world", "desired_state_of_world"}
   442  		dimensions := []string{"state", "plugin_name"}
   443  		waitForADControllerStatesMetrics(ctx, metricsGrabber, totalVolumesKey, dimensions, states)
   444  
   445  		// The total number of volumes in both ActualStateOfWorld and DesiredStateOfWorld
   446  		// should be greater than or equal to what it was before.
   447  		oldStates := getStatesMetrics(totalVolumesKey, testutil.Metrics(controllerMetrics))
   448  		updatedStates := getStatesMetrics(totalVolumesKey, testutil.Metrics(updatedControllerMetrics))
   449  		for _, stateName := range states {
   450  			if _, ok := oldStates[stateName]; !ok {
   451  				continue
   452  			}
   453  			for pluginName, numVolumes := range updatedStates[stateName] {
   454  				oldNumVolumes := oldStates[stateName][pluginName]
   455  				gomega.Expect(numVolumes).To(gomega.BeNumerically(">=", oldNumVolumes),
   456  					"Wrong number of volumes in state %q, plugin %q: wanted >=%d, got %d",
   457  					stateName, pluginName, oldNumVolumes, numVolumes)
   458  			}
   459  		}
   460  
   461  		framework.Logf("Deleting pod %q/%q", pod.Namespace, pod.Name)
   462  		framework.ExpectNoError(e2epod.DeletePodWithWait(ctx, c, pod))
   463  	}
   464  
   465  	testAll := func(isEphemeral bool) {
   466  		ginkgo.It("should create prometheus metrics for volume provisioning and attach/detach", func(ctx context.Context) {
   467  			provisioning(ctx, isEphemeral)
   468  		})
   469  		// TODO(mauriciopoppe): after CSIMigration is turned on we no longer report
   470  		// the volume_provision metric (removed in #106609); #106773 tracks investigating this.
   471  		f.It("should create prometheus metrics for volume provisioning errors", f.WithSlow(), func(ctx context.Context) {
   472  			provisioningError(ctx, isEphemeral)
   473  		})
   474  		ginkgo.It("should create volume metrics with the correct FilesystemMode PVC ref", func(ctx context.Context) {
   475  			filesystemMode(ctx, isEphemeral)
   476  		})
   477  		ginkgo.It("should create volume metrics with the correct BlockMode PVC ref", func(ctx context.Context) {
   478  			blockmode(ctx, isEphemeral)
   479  		})
   480  		ginkgo.It("should create metrics for total time taken in volume operations in P/V Controller", func(ctx context.Context) {
   481  			totalTime(ctx, isEphemeral)
   482  		})
   483  		ginkgo.It("should create volume metrics in Volume Manager", func(ctx context.Context) {
   484  			volumeManager(ctx, isEphemeral)
   485  		})
   486  		ginkgo.It("should create metrics for total number of volumes in A/D Controller", func(ctx context.Context) {
   487  			adController(ctx, isEphemeral)
   488  		})
   489  	}
   490  
   491  	ginkgo.Context("PVC", func() {
   492  		testAll(false)
   493  	})
   494  
   495  	ginkgo.Context("Ephemeral", func() {
   496  		testAll(true)
   497  	})
   498  
   499  	// Test for pv controller metrics, concretely: bound/unbound pv/pvc count.
   500  	ginkgo.Describe("PVController", func() {
   501  		const (
   502  			classKey      = "storage_class"
   503  			namespaceKey  = "namespace"
   504  			pluginNameKey = "plugin_name"
   505  			volumeModeKey = "volume_mode"
   506  
   507  			totalPVKey    = "pv_collector_total_pv_count"
   508  			boundPVKey    = "pv_collector_bound_pv_count"
   509  			unboundPVKey  = "pv_collector_unbound_pv_count"
   510  			boundPVCKey   = "pv_collector_bound_pvc_count"
   511  			unboundPVCKey = "pv_collector_unbound_pvc_count"
   512  		)
   513  
   514  		var (
   515  			pv  *v1.PersistentVolume
   516  			pvc *v1.PersistentVolumeClaim
   517  
   518  			className = "bound-unbound-count-test-sc"
   519  			pvConfig  = e2epv.PersistentVolumeConfig{
   520  				PVSource: v1.PersistentVolumeSource{
   521  					HostPath: &v1.HostPathVolumeSource{Path: "/data"},
   522  				},
   523  				NamePrefix:       "pv-test-",
   524  				StorageClassName: className,
   525  			}
   526  			pvcConfig = e2epv.PersistentVolumeClaimConfig{StorageClassName: &className}
   527  
   528  			metricsToCheck = []struct {
   529  				name      string
   530  				dimension string
   531  			}{
   532  				{boundPVKey, classKey},
   533  				{unboundPVKey, classKey},
   534  				{boundPVCKey, namespaceKey},
   535  				{unboundPVCKey, namespaceKey},
   536  			}
   537  
   538  			// Original metric values captured before we create any PVs/PVCs. The slice has
   539  			// length 4, with the elements being, in order: bound PV count, unbound PV count,
   540  			// bound PVC count, unbound PVC count.
   541  			// We use these values to calculate the relative increment in each test.
   542  			originMetricValues []map[string]int64
   543  		)
   544  
   545  		// validator validates each metric's values. metricValues must have length 4, with
   546  		// the elements being, in order: bound PV count, unbound PV count, bound PVC count,
   547  		// unbound PVC count.
   548  		validator := func(ctx context.Context, metricValues []map[string]int64) {
   549  			gomega.Expect(metricValues).To(gomega.HaveLen(4), "Wrong metric size: %d", len(metricValues))
   550  
   551  			controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   552  			framework.ExpectNoError(err, "Error getting c-m metricValues: %v", err)
   553  
   554  			for i, metric := range metricsToCheck {
   555  				expectValues := metricValues[i]
   556  				if expectValues == nil {
   557  					expectValues = make(map[string]int64)
   558  				}
   559  				// We use relative increments instead of absolute values to reduce unexpected flakes.
   560  				// Concretely, we expect the difference between the updated values and the original
   561  				// values for each test case to equal expectValues.
   562  				actualValues := calculateRelativeValues(originMetricValues[i],
   563  					testutil.GetMetricValuesForLabel(testutil.Metrics(controllerMetrics), metric.name, metric.dimension))
   564  				gomega.Expect(actualValues).To(gomega.Equal(expectValues), "Wrong pv controller metric %s(%s): wanted %v, got %v",
   565  					metric.name, metric.dimension, expectValues, actualValues)
   566  			}
   567  		}
   568  
   569  		ginkgo.BeforeEach(func(ctx context.Context) {
   570  			if !metricsGrabber.HasControlPlanePods() {
   571  				e2eskipper.Skipf("Environment does not support getting controller-manager metrics - skipping")
   572  			}
   573  
   574  			pv = e2epv.MakePersistentVolume(pvConfig)
   575  			pvc = e2epv.MakePersistentVolumeClaim(pvcConfig, ns)
   576  
   577  			// Initializes all original metric values.
   578  			controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   579  			framework.ExpectNoError(err, "Error getting c-m metricValues: %v", err)
   580  			for _, metric := range metricsToCheck {
   581  				originMetricValues = append(originMetricValues,
   582  					testutil.GetMetricValuesForLabel(testutil.Metrics(controllerMetrics), metric.name, metric.dimension))
   583  			}
   584  		})
   585  
   586  		ginkgo.AfterEach(func(ctx context.Context) {
   587  			if err := e2epv.DeletePersistentVolume(ctx, c, pv.Name); err != nil {
   588  				framework.Failf("Error deleting pv: %v", err)
   589  			}
   590  			if err := e2epv.DeletePersistentVolumeClaim(ctx, c, pvc.Name, pvc.Namespace); err != nil {
   591  				framework.Failf("Error deleting pvc: %v", err)
   592  			}
   593  
   594  			// Clear original metric values.
   595  			originMetricValues = nil
   596  		})
   597  
   598  		ginkgo.It("should create no metrics for pvc controller before creating any PV or PVC", func(ctx context.Context) {
   599  			validator(ctx, []map[string]int64{nil, nil, nil, nil})
   600  		})
   601  
   602  		ginkgo.It("should create unbound pv count metrics for pvc controller after creating pv only",
   603  			func(ctx context.Context) {
   604  				var err error
   605  				pv, err = e2epv.CreatePV(ctx, c, f.Timeouts, pv)
   606  				framework.ExpectNoError(err, "Error creating pv: %v", err)
   607  				waitForPVControllerSync(ctx, metricsGrabber, unboundPVKey, classKey)
   608  				validator(ctx, []map[string]int64{nil, {className: 1}, nil, nil})
   609  			})
   610  
   611  		ginkgo.It("should create unbound pvc count metrics for pvc controller after creating pvc only",
   612  			func(ctx context.Context) {
   613  				var err error
   614  				pvc, err = e2epv.CreatePVC(ctx, c, ns, pvc)
   615  				framework.ExpectNoError(err, "Error creating pvc: %v", err)
   616  				waitForPVControllerSync(ctx, metricsGrabber, unboundPVCKey, namespaceKey)
   617  				validator(ctx, []map[string]int64{nil, nil, nil, {ns: 1}})
   618  			})
   619  
   620  		ginkgo.It("should create bound pv/pvc count metrics for pvc controller after creating both pv and pvc",
   621  			func(ctx context.Context) {
   622  				var err error
   623  				pv, pvc, err = e2epv.CreatePVPVC(ctx, c, f.Timeouts, pvConfig, pvcConfig, ns, true)
   624  				framework.ExpectNoError(err, "Error creating pv pvc: %v", err)
   625  				waitForPVControllerSync(ctx, metricsGrabber, boundPVKey, classKey)
   626  				waitForPVControllerSync(ctx, metricsGrabber, boundPVCKey, namespaceKey)
   627  				validator(ctx, []map[string]int64{{className: 1}, nil, {ns: 1}, nil})
   628  
   629  			})
   630  		ginkgo.It("should create total pv count metrics with plugin and volume mode labels after creating pv",
   631  			func(ctx context.Context) {
   632  				var err error
   633  				dimensions := []string{pluginNameKey, volumeModeKey}
   634  				pv, err = e2epv.CreatePV(ctx, c, f.Timeouts, pv)
   635  				framework.ExpectNoError(err, "Error creating pv: %v", err)
   636  				waitForPVControllerSync(ctx, metricsGrabber, totalPVKey, pluginNameKey)
   637  				controllerMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   638  				framework.ExpectNoError(err, "Error getting c-m metricValues: %v", err)
   639  				err = testutil.ValidateMetrics(testutil.Metrics(controllerMetrics), totalPVKey, dimensions...)
   640  				framework.ExpectNoError(err, "Invalid metric in Controller Manager metrics: %q", totalPVKey)
   641  			})
   642  	})
   643  })
   644  
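        // storageControllerMetrics holds per-operation latency counts and success/failure
        // status counts extracted from controller-manager storage operation metrics for a
        // single volume plugin.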
   645  type storageControllerMetrics struct {
   646  	latencyMetrics map[string]int64
   647  	statusMetrics  map[string]statusMetricCounts
   648  }
   649  
   650  type statusMetricCounts struct {
   651  	successCount int64
   652  	failCount    int64
   653  	otherCount   int64
   654  }
   655  
   656  func newStorageControllerMetrics() *storageControllerMetrics {
   657  	return &storageControllerMetrics{
   658  		latencyMetrics: make(map[string]int64),
   659  		statusMetrics:  make(map[string]statusMetricCounts),
   660  	}
   661  }
   662  
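        // waitForDetachAndGrabMetrics polls the controller-manager until the volume_detach
        // latency count for the given plugin rises above the value recorded in oldMetrics,
        // then returns the freshly grabbed storage metrics.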
   663  func waitForDetachAndGrabMetrics(ctx context.Context, oldMetrics *storageControllerMetrics, metricsGrabber *e2emetrics.Grabber, pluginName string) *storageControllerMetrics {
   664  	backoff := wait.Backoff{
   665  		Duration: 10 * time.Second,
   666  		Factor:   1.2,
   667  		Steps:    21,
   668  	}
   669  
   670  	updatedStorageMetrics := newStorageControllerMetrics()
   671  	oldDetachCount, ok := oldMetrics.latencyMetrics["volume_detach"]
   672  	if !ok {
   673  		oldDetachCount = 0
   674  	}
   675  
   676  	verifyMetricFunc := func(ctx context.Context) (bool, error) {
   677  		updatedMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   678  
   679  		if err != nil {
   680  			framework.Logf("Error fetching controller-manager metrics")
   681  			return false, err
   682  		}
   683  
   684  		updatedStorageMetrics = getControllerStorageMetrics(updatedMetrics, pluginName)
   685  		newDetachCount, ok := updatedStorageMetrics.latencyMetrics["volume_detach"]
   686  
   687  		// if detach metrics are not yet there, we need to retry
   688  		if !ok {
   689  			return false, nil
   690  		}
   691  
   692  		// if the old detach count is greater than or equal to the new detach count, the
   693  		// detach event has not been observed yet.
   694  		if oldDetachCount >= newDetachCount {
   695  			return false, nil
   696  		}
   697  
   698  		return true, nil
   699  	}
   700  
   701  	waitErr := wait.ExponentialBackoffWithContext(ctx, backoff, verifyMetricFunc)
   702  	framework.ExpectNoError(waitErr, "Unable to get updated metrics for plugin %s", pluginName)
   703  	return updatedStorageMetrics
   704  }
   705  
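        // verifyMetricCount checks that the success (or, when expectFailure is set, failure)
        // status count for metricName grew between oldMetrics and newMetrics; in the success
        // case it also checks that the latency count grew.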
   706  func verifyMetricCount(oldMetrics, newMetrics *storageControllerMetrics, metricName string, expectFailure bool) {
   707  	oldLatencyCount, ok := oldMetrics.latencyMetrics[metricName]
   708  	// if the metric does not exist in oldMetrics, it probably hasn't been emitted yet.
   709  	if !ok {
   710  		oldLatencyCount = 0
   711  	}
   712  
   713  	oldStatusCount := int64(0)
   714  	if oldStatusCounts, ok := oldMetrics.statusMetrics[metricName]; ok {
   715  		if expectFailure {
   716  			oldStatusCount = oldStatusCounts.failCount
   717  		} else {
   718  			oldStatusCount = oldStatusCounts.successCount
   719  		}
   720  	}
   721  
   722  	newLatencyCount, ok := newMetrics.latencyMetrics[metricName]
   723  	if !expectFailure {
   724  		if !ok {
   725  			framework.Failf("Error getting updated latency metrics for %s", metricName)
   726  		}
   727  	}
   728  	newStatusCounts, ok := newMetrics.statusMetrics[metricName]
   729  	if !ok {
   730  		framework.Failf("Error getting updated status metrics for %s", metricName)
   731  	}
   732  
   733  	newStatusCount := int64(0)
   734  	if expectFailure {
   735  		newStatusCount = newStatusCounts.failCount
   736  	} else {
   737  		newStatusCount = newStatusCounts.successCount
   738  	}
   739  
   740  	// It appears that in a busy cluster some spurious detaches are unavoidable
   741  	// even if the test is run serially. We therefore only verify that the new count
   742  	// is greater than the old count.
   743  	if !expectFailure {
   744  		gomega.Expect(newLatencyCount).To(gomega.BeNumerically(">", oldLatencyCount), "New latency count %d should be more than old count %d for action %s", newLatencyCount, oldLatencyCount, metricName)
   745  	}
   746  	gomega.Expect(newStatusCount).To(gomega.BeNumerically(">", oldStatusCount), "New status count %d should be more than old count %d for action %s", newStatusCount, oldStatusCount, metricName)
   747  }
   748  
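        // getControllerStorageMetrics extracts the storage_operation_duration_seconds_count
        // samples that belong to pluginName from the controller-manager metrics and groups
        // them per operation into latency and status counters.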
   749  func getControllerStorageMetrics(ms e2emetrics.ControllerManagerMetrics, pluginName string) *storageControllerMetrics {
   750  	result := newStorageControllerMetrics()
   751  
   752  	for method, samples := range ms {
   753  		switch method {
   754  		// from the base metric name "storage_operation_duration_seconds"
   755  		case "storage_operation_duration_seconds_count":
   756  			for _, sample := range samples {
   757  				count := int64(sample.Value)
   758  				operation := string(sample.Metric["operation_name"])
   759  				// if the volumes were provisioned with a CSI driver,
   760  				// the metric's volume_plugin label will be prefixed with
   761  				// "kubernetes.io/csi:"
   762  				metricPluginName := string(sample.Metric["volume_plugin"])
   763  				status := string(sample.Metric["status"])
   764  				if !strings.Contains(metricPluginName, pluginName) {
   765  					// the metric volume plugin field doesn't match
   766  					// the default storageClass.Provisioner field
   767  					continue
   768  				}
   769  
   770  				statusCounts := result.statusMetrics[operation]
   771  				switch status {
   772  				case "success":
   773  					statusCounts.successCount = count
   774  				case "fail-unknown":
   775  					statusCounts.failCount = count
   776  				default:
   777  					statusCounts.otherCount = count
   778  				}
   779  				result.statusMetrics[operation] = statusCounts
   780  				result.latencyMetrics[operation] = count
   781  			}
   782  		}
   783  	}
   784  	return result
   785  }
   786  
   787  // findVolumeStatMetric reports whether the named metric in the given KubeletMetrics
   788  // has a sample tagged with the specified namespace and PVC name.
   789  func findVolumeStatMetric(metricKeyName string, namespace string, pvcName string, kubeletMetrics e2emetrics.KubeletMetrics) bool {
   790  	found := false
   791  	errCount := 0
   792  	framework.Logf("Looking for sample in metric `%s` tagged with namespace `%s`, PVC `%s`", metricKeyName, namespace, pvcName)
   793  	if samples, ok := kubeletMetrics[metricKeyName]; ok {
   794  		for _, sample := range samples {
   795  			framework.Logf("Found sample %s", sample.String())
   796  			samplePVC, ok := sample.Metric["persistentvolumeclaim"]
   797  			if !ok {
   798  				framework.Logf("Error getting pvc for metric %s, sample %s", metricKeyName, sample.String())
   799  				errCount++
   800  			}
   801  			sampleNS, ok := sample.Metric["namespace"]
   802  			if !ok {
   803  				framework.Logf("Error getting namespace for metric %s, sample %s", metricKeyName, sample.String())
   804  				errCount++
   805  			}
   806  
   807  			if string(samplePVC) == pvcName && string(sampleNS) == namespace {
   808  				found = true
   809  				break
   810  			}
   811  		}
   812  	}
   813  	gomega.Expect(errCount).To(gomega.Equal(0), "Found invalid samples")
   814  	return found
   815  }
   816  
   817  // waitForPVControllerSync waits until the PV controller metric specified by metricName and dimension reports a count greater than zero.
   818  func waitForPVControllerSync(ctx context.Context, metricsGrabber *e2emetrics.Grabber, metricName, dimension string) {
   819  	backoff := wait.Backoff{
   820  		Duration: 10 * time.Second,
   821  		Factor:   1.2,
   822  		Steps:    21,
   823  	}
   824  	verifyMetricFunc := func(ctx context.Context) (bool, error) {
   825  		updatedMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   826  		if err != nil {
   827  			framework.Logf("Error fetching controller-manager metrics")
   828  			return false, err
   829  		}
   830  		return len(testutil.GetMetricValuesForLabel(testutil.Metrics(updatedMetrics), metricName, dimension)) > 0, nil
   831  	}
   832  	waitErr := wait.ExponentialBackoffWithContext(ctx, backoff, verifyMetricFunc)
   833  	framework.ExpectNoError(waitErr, "Unable to get pv controller metrics")
   834  }
   835  
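        // calculateRelativeValues returns the per-key difference between updatedValues and
        // originValues, omitting keys whose value did not change; keys that disappeared are
        // reported with a negative value.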
   836  func calculateRelativeValues(originValues, updatedValues map[string]int64) map[string]int64 {
   837  	relativeValues := make(map[string]int64)
   838  	for key, value := range updatedValues {
   839  		relativeValue := value - originValues[key]
   840  		if relativeValue != 0 {
   841  			relativeValues[key] = relativeValue
   842  		}
   843  	}
   844  	for key, value := range originValues {
   845  		if _, exist := updatedValues[key]; !exist && value > 0 {
   846  			relativeValues[key] = -value
   847  		}
   848  	}
   849  	return relativeValues
   850  }
   851  
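        // getStatesMetrics groups the sample values of the given metric by their "state"
        // and "plugin_name" labels.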
   852  func getStatesMetrics(metricKey string, givenMetrics testutil.Metrics) map[string]map[string]int64 {
   853  	states := make(map[string]map[string]int64)
   854  	for _, sample := range givenMetrics[metricKey] {
   855  		framework.Logf("Found sample %q", sample.String())
   856  		state := string(sample.Metric["state"])
   857  		pluginName := string(sample.Metric["plugin_name"])
   858  		if states[state] == nil {
        			states[state] = make(map[string]int64)
        		}
        		states[state][pluginName] = int64(sample.Value)
   859  	}
   860  	return states
   861  }
   862  
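        // waitForADControllerStatesMetrics polls the controller-manager until the given
        // attach/detach controller metric exposes the expected dimensions and all of the
        // requested state values.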
   863  func waitForADControllerStatesMetrics(ctx context.Context, metricsGrabber *e2emetrics.Grabber, metricName string, dimensions []string, stateNames []string) {
   864  	backoff := wait.Backoff{
   865  		Duration: 10 * time.Second,
   866  		Factor:   1.2,
   867  		Steps:    21,
   868  	}
   869  	verifyMetricFunc := func(ctx context.Context) (bool, error) {
   870  		updatedMetrics, err := metricsGrabber.GrabFromControllerManager(ctx)
   871  		if err != nil {
   872  			e2eskipper.Skipf("Could not get controller-manager metrics - skipping")
   873  			return false, err
   874  		}
   875  		err = testutil.ValidateMetrics(testutil.Metrics(updatedMetrics), metricName, dimensions...)
   876  		if err != nil {
   877  			return false, fmt.Errorf("could not get valid metrics: %w", err)
   878  		}
   879  		states := getStatesMetrics(metricName, testutil.Metrics(updatedMetrics))
   880  		for _, name := range stateNames {
   881  			if _, ok := states[name]; !ok {
   882  				return false, fmt.Errorf("could not get state %q from A/D Controller metrics", name)
   883  			}
   884  		}
   885  		return true, nil
   886  	}
   887  	waitErr := wait.ExponentialBackoffWithContext(ctx, backoff, verifyMetricFunc)
   888  	framework.ExpectNoError(waitErr, "Unable to get A/D controller metrics")
   889  }
   890  
   891  // makePod creates a pod which either references the PVC or creates it via a
   892  // generic ephemeral volume claim template.
   893  func makePod(f *framework.Framework, pvc *v1.PersistentVolumeClaim, isEphemeral bool) *v1.Pod {
   894  	claims := []*v1.PersistentVolumeClaim{pvc}
   895  	pod := e2epod.MakePod(f.Namespace.Name, nil, claims, f.NamespacePodSecurityLevel, "")
   896  	if isEphemeral {
   897  		volSrc := pod.Spec.Volumes[0]
   898  		volSrc.PersistentVolumeClaim = nil
   899  		volSrc.Ephemeral = &v1.EphemeralVolumeSource{
   900  			VolumeClaimTemplate: &v1.PersistentVolumeClaimTemplate{
   901  				Spec: pvc.Spec,
   902  			},
   903  		}
   904  		pod.Spec.Volumes[0] = volSrc
   905  	}
   906  	return pod
   907  }
   908  
