...

Source file src/k8s.io/kubernetes/test/e2e/instrumentation/monitoring/custom_metrics_stackdriver.go

Documentation: k8s.io/kubernetes/test/e2e/instrumentation/monitoring

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package monitoring
    18  
    19  import (
    20  	"context"
    21  	"strings"
    22  	"time"
    23  
    24  	gcm "google.golang.org/api/monitoring/v3"
    25  	v1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/labels"
    28  	"k8s.io/apimachinery/pkg/runtime/schema"
    29  	"k8s.io/apimachinery/pkg/selection"
    30  	"k8s.io/client-go/discovery"
    31  	cacheddiscovery "k8s.io/client-go/discovery/cached/memory"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	"k8s.io/client-go/restmapper"
    34  	"k8s.io/kubernetes/test/e2e/feature"
    35  	"k8s.io/kubernetes/test/e2e/framework"
    36  	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
    37  	instrumentation "k8s.io/kubernetes/test/e2e/instrumentation/common"
    38  	customclient "k8s.io/metrics/pkg/client/custom_metrics"
    39  	externalclient "k8s.io/metrics/pkg/client/external_metrics"
    40  	admissionapi "k8s.io/pod-security-admission/api"
    41  
    42  	"github.com/onsi/ginkgo/v2"
    43  	"golang.org/x/oauth2/google"
    44  	"google.golang.org/api/option"
    45  )
    46  
    47  const (
    48  	stackdriverExporterPod1  = "stackdriver-exporter-1"
    49  	stackdriverExporterPod2  = "stackdriver-exporter-2"
    50  	stackdriverExporterLabel = "stackdriver-exporter"
    51  )
    52  
    53  var _ = instrumentation.SIGDescribe("Stackdriver Monitoring", func() {
    54  	ginkgo.BeforeEach(func() {
    55  		e2eskipper.SkipUnlessProviderIs("gce", "gke")
    56  	})
    57  
    58  	f := framework.NewDefaultFramework("stackdriver-monitoring")
    59  	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
    60  
    61  	f.It("should run Custom Metrics - Stackdriver Adapter for old resource model", feature.StackdriverCustomMetrics, func(ctx context.Context) {
    62  		kubeClient := f.ClientSet
    63  		config, err := framework.LoadConfig()
    64  		if err != nil {
    65  			framework.Failf("Failed to load config: %s", err)
    66  		}
    67  		discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config)
    68  		cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
    69  		restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoClient)
    70  		restMapper.Reset()
    71  		apiVersionsGetter := customclient.NewAvailableAPIsGetter(discoveryClient)
    72  		customMetricsClient := customclient.NewForConfig(config, restMapper, apiVersionsGetter)
    73  		testCustomMetrics(ctx, f, kubeClient, customMetricsClient, discoveryClient, AdapterForOldResourceModel)
    74  	})
    75  
    76  	f.It("should run Custom Metrics - Stackdriver Adapter for new resource model", feature.StackdriverCustomMetrics, func(ctx context.Context) {
    77  		kubeClient := f.ClientSet
    78  		config, err := framework.LoadConfig()
    79  		if err != nil {
    80  			framework.Failf("Failed to load config: %s", err)
    81  		}
    82  		discoveryClient := discovery.NewDiscoveryClientForConfigOrDie(config)
    83  		cachedDiscoClient := cacheddiscovery.NewMemCacheClient(discoveryClient)
    84  		restMapper := restmapper.NewDeferredDiscoveryRESTMapper(cachedDiscoClient)
    85  		restMapper.Reset()
    86  		apiVersionsGetter := customclient.NewAvailableAPIsGetter(discoveryClient)
    87  		customMetricsClient := customclient.NewForConfig(config, restMapper, apiVersionsGetter)
    88  		testCustomMetrics(ctx, f, kubeClient, customMetricsClient, discoveryClient, AdapterForNewResourceModel)
    89  	})
    90  
    91  	f.It("should run Custom Metrics - Stackdriver Adapter for external metrics", feature.StackdriverExternalMetrics, func(ctx context.Context) {
    92  		kubeClient := f.ClientSet
    93  		config, err := framework.LoadConfig()
    94  		if err != nil {
    95  			framework.Failf("Failed to load config: %s", err)
    96  		}
    97  		externalMetricsClient := externalclient.NewForConfigOrDie(config)
    98  		testExternalMetrics(ctx, f, kubeClient, externalMetricsClient)
    99  	})
   100  })
   101  
   102  func testCustomMetrics(ctx context.Context, f *framework.Framework, kubeClient clientset.Interface, customMetricsClient customclient.CustomMetricsClient, discoveryClient *discovery.DiscoveryClient, adapterDeployment string) {
   103  	projectID := framework.TestContext.CloudConfig.ProjectID
   104  
   105  	client, err := google.DefaultClient(ctx, gcm.CloudPlatformScope)
   106  	framework.ExpectNoError(err)
   107  
   108  	gcmService, err := gcm.NewService(ctx, option.WithHTTPClient(client))
   109  	if err != nil {
   110  		framework.Failf("Failed to create gcm service, %v", err)
   111  	}
   112  
   113  	// Set up a cluster: create a custom metric and set up k8s-sd adapter
   114  	err = CreateDescriptors(gcmService, projectID)
   115  	if err != nil {
   116  		if strings.Contains(err.Error(), "Request throttled") {
   117  			e2eskipper.Skipf("Skipping...hitting rate limits on creating and updating metrics/labels")
   118  		}
   119  		framework.Failf("Failed to create metric descriptor: %s", err)
   120  	}
   121  	ginkgo.DeferCleanup(CleanupDescriptors, gcmService, projectID)
   122  
   123  	err = CreateAdapter(adapterDeployment)
   124  	if err != nil {
   125  		framework.Failf("Failed to set up: %s", err)
   126  	}
   127  	ginkgo.DeferCleanup(CleanupAdapter, adapterDeployment)
   128  
   129  	_, err = kubeClient.RbacV1().ClusterRoleBindings().Create(ctx, HPAPermissions, metav1.CreateOptions{})
   130  	if err != nil {
   131  		framework.Failf("Failed to create ClusterRoleBindings: %v", err)
   132  	}
   133  	ginkgo.DeferCleanup(kubeClient.RbacV1().ClusterRoleBindings().Delete, HPAPermissions.Name, metav1.DeleteOptions{})
   134  
   135  	// Run application that exports the metric
   136  	_, err = createSDExporterPods(ctx, f, kubeClient)
   137  	if err != nil {
   138  		framework.Failf("Failed to create stackdriver-exporter pod: %s", err)
   139  	}
   140  	ginkgo.DeferCleanup(cleanupSDExporterPod, f, kubeClient)
   141  
   142  	// Wait a short amount of time to create a pod and export some metrics
   143  	// TODO: add some events to wait for instead of fixed amount of time
   144  	//       i.e. pod creation, first time series exported
   145  	time.Sleep(60 * time.Second)
   146  
   147  	verifyResponsesFromCustomMetricsAPI(f, customMetricsClient, discoveryClient)
   148  }
   149  
   150  // TODO(kawych): migrate this test to new resource model
   151  func testExternalMetrics(ctx context.Context, f *framework.Framework, kubeClient clientset.Interface, externalMetricsClient externalclient.ExternalMetricsClient) {
   152  	projectID := framework.TestContext.CloudConfig.ProjectID
   153  
   154  	client, err := google.DefaultClient(ctx, gcm.CloudPlatformScope)
   155  	framework.ExpectNoError(err)
   156  
   157  	gcmService, err := gcm.NewService(ctx, option.WithHTTPClient(client))
   158  	if err != nil {
   159  		framework.Failf("Failed to create gcm service, %v", err)
   160  	}
   161  
   162  	// Set up a cluster: create a custom metric and set up k8s-sd adapter
   163  	err = CreateDescriptors(gcmService, projectID)
   164  	if err != nil {
   165  		if strings.Contains(err.Error(), "Request throttled") {
   166  			e2eskipper.Skipf("Skipping...hitting rate limits on creating and updating metrics/labels")
   167  		}
   168  		framework.Failf("Failed to create metric descriptor: %s", err)
   169  	}
   170  	ginkgo.DeferCleanup(CleanupDescriptors, gcmService, projectID)
   171  
   172  	// Both deployments - for old and new resource model - expose External Metrics API.
   173  	err = CreateAdapter(AdapterForOldResourceModel)
   174  	if err != nil {
   175  		framework.Failf("Failed to set up: %s", err)
   176  	}
   177  	ginkgo.DeferCleanup(CleanupAdapter, AdapterForOldResourceModel)
   178  
   179  	_, err = kubeClient.RbacV1().ClusterRoleBindings().Create(ctx, HPAPermissions, metav1.CreateOptions{})
   180  	if err != nil {
   181  		framework.Failf("Failed to create ClusterRoleBindings: %v", err)
   182  	}
   183  	ginkgo.DeferCleanup(kubeClient.RbacV1().ClusterRoleBindings().Delete, HPAPermissions.Name, metav1.DeleteOptions{})
   184  
   185  	// Run application that exports the metric
   186  	pod, err := createSDExporterPods(ctx, f, kubeClient)
   187  	if err != nil {
   188  		framework.Failf("Failed to create stackdriver-exporter pod: %s", err)
   189  	}
   190  	ginkgo.DeferCleanup(cleanupSDExporterPod, f, kubeClient)
   191  
   192  	// Wait a short amount of time to create a pod and export some metrics
   193  	// TODO: add some events to wait for instead of fixed amount of time
   194  	//       i.e. pod creation, first time series exported
   195  	time.Sleep(60 * time.Second)
   196  
   197  	verifyResponseFromExternalMetricsAPI(f, externalMetricsClient, pod)
   198  }
   199  
   200  func verifyResponsesFromCustomMetricsAPI(f *framework.Framework, customMetricsClient customclient.CustomMetricsClient, discoveryClient *discovery.DiscoveryClient) {
   201  	resources, err := discoveryClient.ServerResourcesForGroupVersion("custom.metrics.k8s.io/v1beta1")
   202  	if err != nil {
   203  		framework.Failf("Failed to retrieve a list of supported metrics: %s", err)
   204  	}
   205  	if !containsResource(resources.APIResources, "*/custom.googleapis.com|"+CustomMetricName) {
   206  		framework.Failf("Metric '%s' expected but not received", CustomMetricName)
   207  	}
   208  	if !containsResource(resources.APIResources, "*/custom.googleapis.com|"+UnusedMetricName) {
   209  		framework.Failf("Metric '%s' expected but not received", UnusedMetricName)
   210  	}
   211  	value, err := customMetricsClient.NamespacedMetrics(f.Namespace.Name).GetForObject(schema.GroupKind{Group: "", Kind: "Pod"}, stackdriverExporterPod1, CustomMetricName, labels.NewSelector())
   212  	if err != nil {
   213  		framework.Failf("Failed query: %s", err)
   214  	}
   215  	if value.Value.Value() != CustomMetricValue {
   216  		framework.Failf("Unexpected metric value for metric %s: expected %v but received %v", CustomMetricName, CustomMetricValue, value.Value)
   217  	}
   218  	filter, err := labels.NewRequirement("name", selection.Equals, []string{stackdriverExporterLabel})
   219  	if err != nil {
   220  		framework.Failf("Couldn't create a label filter")
   221  	}
   222  	values, err := customMetricsClient.NamespacedMetrics(f.Namespace.Name).GetForObjects(schema.GroupKind{Group: "", Kind: "Pod"}, labels.NewSelector().Add(*filter), CustomMetricName, labels.NewSelector())
   223  	if err != nil {
   224  		framework.Failf("Failed query: %s", err)
   225  	}
   226  	if len(values.Items) != 1 {
   227  		framework.Failf("Expected results for exactly 1 pod, but %v results received", len(values.Items))
   228  	}
   229  	if values.Items[0].DescribedObject.Name != stackdriverExporterPod1 || values.Items[0].Value.Value() != CustomMetricValue {
   230  		framework.Failf("Unexpected metric value for metric %s and pod %s: %v", CustomMetricName, values.Items[0].DescribedObject.Name, values.Items[0].Value.Value())
   231  	}
   232  }
   233  
   234  func containsResource(resourcesList []metav1.APIResource, resourceName string) bool {
   235  	for _, resource := range resourcesList {
   236  		if resource.Name == resourceName {
   237  			return true
   238  		}
   239  	}
   240  	return false
   241  }
   242  
   243  func verifyResponseFromExternalMetricsAPI(f *framework.Framework, externalMetricsClient externalclient.ExternalMetricsClient, pod *v1.Pod) {
   244  	req1, _ := labels.NewRequirement("resource.type", selection.Equals, []string{"gke_container"})
   245  	// It's important to filter out only metrics from the right namespace, since multiple e2e tests
   246  	// may run in the same project concurrently. "dummy" is added to test
   247  	req2, _ := labels.NewRequirement("resource.labels.pod_id", selection.In, []string{string(pod.UID), "dummy"})
   248  	req3, _ := labels.NewRequirement("resource.labels.namespace_id", selection.Exists, []string{})
   249  	req4, _ := labels.NewRequirement("resource.labels.zone", selection.NotEquals, []string{"dummy"})
   250  	req5, _ := labels.NewRequirement("resource.labels.cluster_name", selection.NotIn, []string{"foo", "bar"})
   251  	values, err := externalMetricsClient.
   252  		NamespacedMetrics("dummy").
   253  		List("custom.googleapis.com|"+CustomMetricName, labels.NewSelector().Add(*req1, *req2, *req3, *req4, *req5))
   254  	if err != nil {
   255  		framework.Failf("Failed query: %s", err)
   256  	}
   257  	if len(values.Items) != 1 {
   258  		framework.Failf("Expected exactly one external metric value, but % values received", len(values.Items))
   259  	}
   260  	if values.Items[0].MetricName != "custom.googleapis.com|"+CustomMetricName ||
   261  		values.Items[0].Value.Value() != CustomMetricValue ||
   262  		// Check one label just to make sure labels are included
   263  		values.Items[0].MetricLabels["resource.labels.pod_id"] != string(pod.UID) {
   264  		framework.Failf("Unexpected result for metric %s: %v", CustomMetricName, values.Items[0])
   265  	}
   266  }
   267  
   268  func cleanupSDExporterPod(ctx context.Context, f *framework.Framework, cs clientset.Interface) {
   269  	err := cs.CoreV1().Pods(f.Namespace.Name).Delete(ctx, stackdriverExporterPod1, metav1.DeleteOptions{})
   270  	if err != nil {
   271  		framework.Logf("Failed to delete %s pod: %v", stackdriverExporterPod1, err)
   272  	}
   273  	err = cs.CoreV1().Pods(f.Namespace.Name).Delete(ctx, stackdriverExporterPod2, metav1.DeleteOptions{})
   274  	if err != nil {
   275  		framework.Logf("Failed to delete %s pod: %v", stackdriverExporterPod2, err)
   276  	}
   277  }
   278  
   279  func createSDExporterPods(ctx context.Context, f *framework.Framework, cs clientset.Interface) (*v1.Pod, error) {
   280  	pod, err := cs.CoreV1().Pods(f.Namespace.Name).Create(ctx, StackdriverExporterPod(stackdriverExporterPod1, f.Namespace.Name, stackdriverExporterLabel, CustomMetricName, CustomMetricValue), metav1.CreateOptions{})
   281  	if err != nil {
   282  		return nil, err
   283  	}
   284  	_, err = cs.CoreV1().Pods(f.Namespace.Name).Create(ctx, StackdriverExporterPod(stackdriverExporterPod2, f.Namespace.Name, stackdriverExporterLabel, UnusedMetricName, UnusedMetricValue), metav1.CreateOptions{})
   285  	return pod, err
   286  }
   287  

View as plain text