...

Source file src/github.com/linkerd/linkerd2/testutil/kubernetes_helper.go

Documentation: github.com/linkerd/linkerd2/testutil

     1  package testutil
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"os/exec"
     9  	"regexp"
    10  	"strings"
    11  	"testing"
    12  	"time"
    13  
    14  	"github.com/linkerd/linkerd2/pkg/k8s"
    15  	corev1 "k8s.io/api/core/v1"
    16  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    17  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    18  	"k8s.io/apimachinery/pkg/labels"
    19  	"k8s.io/client-go/kubernetes"
    20  	"k8s.io/client-go/tools/clientcmd"
    21  
    22  	// Loads the GCP auth plugin
    23  	_ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
    24  )
    25  
    26  // KubernetesHelper provides Kubernetes-related test helpers. It connects to the
    27  // Kubernetes API using the environment's configured kubeconfig file.
    28  type KubernetesHelper struct {
    29  	k8sContext string
    30  	clientset  *kubernetes.Clientset
    31  	retryFor   func(time.Duration, func() error) error
    32  }
    33  
    34  // RestartCountError is returned by CheckPods() whenever a pod has restarted exactly one time.
    35  // Consumers should log this type of error instead of failing the test.
    36  // This is to alleviate CI flakiness stemming from a containerd bug.
    37  // See https://github.com/kubernetes/kubernetes/issues/89064
    38  // See https://github.com/containerd/containerd/issues/4068
    39  type RestartCountError struct {
    40  	msg string
    41  }
    42  
    43  func (e *RestartCountError) Error() string {
    44  	return e.msg
    45  }
    46  
    47  // NewKubernetesHelper creates a new instance of KubernetesHelper.
    48  func NewKubernetesHelper(k8sContext string, retryFor func(time.Duration, func() error) error) (*KubernetesHelper, error) {
    49  	rules := clientcmd.NewDefaultClientConfigLoadingRules()
    50  	overrides := &clientcmd.ConfigOverrides{CurrentContext: k8sContext}
    51  	kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
    52  	config, err := kubeConfig.ClientConfig()
    53  	if err != nil {
    54  		return nil, err
    55  	}
    56  
    57  	clientset, err := kubernetes.NewForConfig(config)
    58  	if err != nil {
    59  		return nil, err
    60  	}
    61  
    62  	return &KubernetesHelper{
    63  		clientset:  clientset,
    64  		k8sContext: k8sContext,
    65  		retryFor:   retryFor,
    66  	}, nil
    67  }
    68  
    69  // CheckIfNamespaceExists checks if a namespace exists.
    70  func (h *KubernetesHelper) CheckIfNamespaceExists(ctx context.Context, namespace string) error {
    71  	_, err := h.clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
    72  	return err
    73  }
    74  
    75  // SwitchContext will re-build the clientset with the given context. Useful when
    76  // testing multiple clusters at the same time
    77  func (h *KubernetesHelper) SwitchContext(ctx string) error {
    78  	rules := clientcmd.NewDefaultClientConfigLoadingRules()
    79  	overrides := &clientcmd.ConfigOverrides{CurrentContext: ctx}
    80  	kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
    81  	config, err := kubeConfig.ClientConfig()
    82  	if err != nil {
    83  		return err
    84  	}
    85  
    86  	clientset, err := kubernetes.NewForConfig(config)
    87  	if err != nil {
    88  		return err
    89  	}
    90  
    91  	h.clientset = clientset
    92  	return nil
    93  }
    94  
    95  // GetSecret retrieves a Kubernetes Secret
    96  func (h *KubernetesHelper) GetSecret(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
    97  	return h.clientset.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
    98  }
    99  
   100  func (h *KubernetesHelper) createNamespaceIfNotExists(ctx context.Context, namespace string, annotations, labels map[string]string) error {
   101  	err := h.CheckIfNamespaceExists(ctx, namespace)
   102  
   103  	if err != nil {
   104  		ns := &corev1.Namespace{
   105  			ObjectMeta: metav1.ObjectMeta{
   106  				Labels:      labels,
   107  				Annotations: annotations,
   108  				Name:        namespace,
   109  			},
   110  		}
   111  		_, err = h.clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
   112  
   113  		if err != nil {
   114  			return err
   115  		}
   116  	}
   117  
   118  	return nil
   119  }
   120  
   121  // DeleteNamespaceIfExists attempts to delete the given namespace,
   122  // using the K8s API directly
   123  func (h *KubernetesHelper) DeleteNamespaceIfExists(ctx context.Context, namespace string) error {
   124  	err := h.clientset.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{})
   125  
   126  	if err != nil && !kerrors.IsNotFound(err) {
   127  		return err
   128  	}
   129  	return nil
   130  }
   131  
   132  // CreateControlPlaneNamespaceIfNotExists creates linkerd control plane namespace.
   133  func (h *KubernetesHelper) CreateControlPlaneNamespaceIfNotExists(ctx context.Context, namespace string) error {
   134  	return h.createNamespaceIfNotExists(ctx, namespace, nil, nil)
   135  }
   136  
   137  // CreateDataPlaneNamespaceIfNotExists creates a dataplane namespace if it does not already exist,
   138  // with a test.linkerd.io/is-test-data-plane label for easier cleanup afterwards
   139  func (h *KubernetesHelper) CreateDataPlaneNamespaceIfNotExists(ctx context.Context, namespace string, annotations map[string]string) error {
   140  	return h.createNamespaceIfNotExists(ctx, namespace, annotations, map[string]string{"test.linkerd.io/is-test-data-plane": "true"})
   141  }
   142  
   143  // KubectlApply applies a given configuration string in a namespace. If the
   144  // namespace does not exist, it creates it first. If no namespace is provided,
   145  // it does not specify the `--namespace` flag.
   146  func (h *KubernetesHelper) KubectlApply(stdin string, namespace string) (string, error) {
   147  	args := []string{"apply", "-f", "-"}
   148  	if namespace != "" {
   149  		args = append(args, "--namespace", namespace)
   150  	}
   151  
   152  	return h.Kubectl(stdin, args...)
   153  }
   154  
   155  // KubectlApplyWithArgs applies a given configuration string with the passed
   156  // flags
   157  func (h *KubernetesHelper) KubectlApplyWithArgs(stdin string, cmdArgs ...string) (string, error) {
   158  	args := []string{"apply"}
   159  	args = append(args, cmdArgs...)
   160  	args = append(args, "-f", "-")
   161  	return h.Kubectl(stdin, args...)
   162  }
   163  
   164  // Kubectl executes an arbitrary Kubectl command
   165  func (h *KubernetesHelper) Kubectl(stdin string, arg ...string) (string, error) {
   166  	withContext := append([]string{"--context=" + h.k8sContext}, arg...)
   167  	cmd := exec.Command("kubectl", withContext...)
   168  	cmd.Stdin = strings.NewReader(stdin)
   169  	out, err := cmd.CombinedOutput()
   170  	return string(out), err
   171  }
   172  
   173  // KubectlApplyWithContext applies a given configuration with the given flags
   174  func (h *KubernetesHelper) KubectlApplyWithContext(stdin string, context string, arg ...string) (string, error) {
   175  	args := append([]string{"apply"}, arg...)
   176  	return h.KubectlWithContext(stdin, context, args...)
   177  }
   178  
   179  // KubectlWithContext will call the kubectl binary with any optional arguments
   180  // provided and an arbitrary, given context. Useful when working with k8s
   181  // resources in a multi-cluster context. Optionally, stdin can be piped to
   182  // kubectl.
   183  func (h *KubernetesHelper) KubectlWithContext(stdin string, context string, arg ...string) (string, error) {
   184  	withContext := append([]string{"--context=" + context}, arg...)
   185  	cmd := exec.Command("kubectl", withContext...)
   186  	cmd.Stdin = strings.NewReader(stdin)
   187  	out, err := cmd.CombinedOutput()
   188  	return string(out), err
   189  }
   190  
   191  // GetConfigUID returns the uid associated to the linkerd-config ConfigMap resource
   192  // in the given namespace
   193  func (h *KubernetesHelper) GetConfigUID(ctx context.Context, namespace string) (string, error) {
   194  	cm, err := h.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, k8s.ConfigConfigMapName, metav1.GetOptions{})
   195  	if err != nil {
   196  		return "", err
   197  	}
   198  	return string(cm.GetUID()), nil
   199  }
   200  
   201  // GetResources returns the resource limits and requests set on a deployment
   202  // of the set name in the given namespace
   203  func (h *KubernetesHelper) GetResources(ctx context.Context, containerName, deploymentName, namespace string) (corev1.ResourceRequirements, error) {
   204  	dep, err := h.clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
   205  	if err != nil {
   206  		return corev1.ResourceRequirements{}, err
   207  	}
   208  
   209  	for _, container := range dep.Spec.Template.Spec.Containers {
   210  		if container.Name == containerName {
   211  			return container.Resources, nil
   212  		}
   213  	}
   214  	return corev1.ResourceRequirements{}, fmt.Errorf("container %s not found in deployment %s in namespace %s", containerName, deploymentName, namespace)
   215  }
   216  
   217  // CheckPods checks that a deployment in a namespace contains the expected
   218  // number of pods in the Running state, and that no pods have been restarted.
   219  func (h *KubernetesHelper) CheckPods(ctx context.Context, namespace string, deploymentName string, replicas int) error {
   220  	var checkedPods []corev1.Pod
   221  
   222  	err := h.retryFor(60*time.Minute, func() error {
   223  		checkedPods = []corev1.Pod{}
   224  		pods, err := h.GetPodsForDeployment(ctx, namespace, deploymentName)
   225  		if err != nil {
   226  			return err
   227  		}
   228  
   229  		var deploymentReplicas int
   230  		for _, pod := range pods {
   231  			checkedPods = append(checkedPods, pod)
   232  
   233  			deploymentReplicas++
   234  			if pod.Status.Phase != "Running" {
   235  				return fmt.Errorf("Pod [%s] in namespace [%s] is not running",
   236  					pod.Name, pod.Namespace)
   237  			}
   238  			for _, container := range pod.Status.ContainerStatuses {
   239  				if !container.Ready {
   240  					return fmt.Errorf("Container [%s] in pod [%s] in namespace [%s] is not running",
   241  						container.Name, pod.Name, pod.Namespace)
   242  				}
   243  			}
   244  		}
   245  
   246  		if deploymentReplicas != replicas {
   247  			return fmt.Errorf("Expected there to be [%d] pods in deployment [%s] in namespace [%s], but found [%d]",
   248  				replicas, deploymentName, namespace, deploymentReplicas)
   249  		}
   250  
   251  		return nil
   252  	})
   253  
   254  	if err != nil {
   255  		return err
   256  	}
   257  
   258  	for _, pod := range checkedPods {
   259  		for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
   260  			errStr := fmt.Sprintf("Container [%s] in pod [%s] in namespace [%s] has restart count [%d]",
   261  				status.Name, pod.Name, pod.Namespace, status.RestartCount)
   262  			if status.RestartCount == 1 {
   263  				return &RestartCountError{errStr}
   264  			}
   265  			if status.RestartCount > 1 {
   266  				return errors.New(errStr)
   267  			}
   268  		}
   269  	}
   270  
   271  	return nil
   272  }
   273  
   274  // CheckService checks that a service exists in a namespace.
   275  func (h *KubernetesHelper) CheckService(ctx context.Context, namespace string, serviceName string) error {
   276  	return h.retryFor(10*time.Second, func() error {
   277  		_, err := h.clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
   278  		return err
   279  	})
   280  }
   281  
   282  // GetService gets a service that exists in a namespace.
   283  func (h *KubernetesHelper) GetService(ctx context.Context, namespace string, serviceName string) (*corev1.Service, error) {
   284  	service, err := h.clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
   285  	if err != nil {
   286  		return nil, err
   287  	}
   288  	return service, nil
   289  }
   290  
   291  // GetEndpoints gets endpoints that exist in a namespace.
   292  func (h *KubernetesHelper) GetEndpoints(ctx context.Context, namespace string, serviceName string) (*corev1.Endpoints, error) {
   293  	ep, err := h.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
   294  	if err != nil {
   295  		return nil, err
   296  	}
   297  	return ep, nil
   298  }
   299  
   300  // GetPods returns all pods with the given labels
   301  func (h *KubernetesHelper) GetPods(ctx context.Context, namespace string, podLabels map[string]string) ([]corev1.Pod, error) {
   302  	podList, err := h.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
   303  		LabelSelector: labels.Set(podLabels).AsSelector().String(),
   304  	})
   305  	if err != nil {
   306  		return nil, err
   307  	}
   308  
   309  	return podList.Items, nil
   310  }
   311  
   312  // GetPodsForDeployment returns all pods for the given deployment
   313  func (h *KubernetesHelper) GetPodsForDeployment(ctx context.Context, namespace string, deploymentName string) ([]corev1.Pod, error) {
   314  	deploy, err := h.clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
   315  	if err != nil {
   316  		return nil, err
   317  	}
   318  	return h.GetPods(ctx, namespace, deploy.Spec.Selector.MatchLabels)
   319  }
   320  
   321  // GetPodNamesForDeployment returns all pod names for the given deployment
   322  func (h *KubernetesHelper) GetPodNamesForDeployment(ctx context.Context, namespace string, deploymentName string) ([]string, error) {
   323  	podList, err := h.GetPodsForDeployment(ctx, namespace, deploymentName)
   324  	if err != nil {
   325  		return nil, err
   326  	}
   327  
   328  	pods := make([]string, 0)
   329  	for _, pod := range podList {
   330  		pods = append(pods, pod.Name)
   331  	}
   332  
   333  	return pods, nil
   334  }
   335  
   336  // ParseNamespacedResource extracts a namespace and resource name from a string
   337  // that's in the format namespace/resource. If the strings is in a different
   338  // format it returns an error.
   339  func (h *KubernetesHelper) ParseNamespacedResource(resource string) (string, string, error) {
   340  	r := regexp.MustCompile(`^(.+)\/(.+)$`)
   341  	matches := r.FindAllStringSubmatch(resource, 2)
   342  	if len(matches) == 0 {
   343  		return "", "", fmt.Errorf("string [%s] didn't contain expected format for namespace/resource, extracted: %v", resource, matches)
   344  	}
   345  	return matches[0][1], matches[0][2], nil
   346  }
   347  
   348  // URLFor creates a kubernetes port-forward, runs it, and returns the URL that
   349  // tests can use for access to the given deployment. Note that the port-forward
   350  // remains running for the duration of the test.
   351  func (h *KubernetesHelper) URLFor(ctx context.Context, namespace, deployName string, remotePort int) (string, error) {
   352  	k8sAPI, err := k8s.NewAPI("", h.k8sContext, "", []string{}, 0)
   353  	if err != nil {
   354  		return "", err
   355  	}
   356  
   357  	pf, err := k8s.NewPortForward(ctx, k8sAPI, namespace, deployName, "localhost", 0, remotePort, false)
   358  	if err != nil {
   359  		return "", err
   360  	}
   361  
   362  	if err = pf.Init(); err != nil {
   363  		return "", err
   364  	}
   365  
   366  	return pf.URLFor(""), nil
   367  }
   368  
   369  // WaitRollout blocks until all the given deployments have been completely
   370  // rolled out (and their pods are ready)
   371  func (h *KubernetesHelper) WaitRollout(t *testing.T, deploys map[string]DeploySpec) {
   372  	t.Helper()
   373  	// Use default context
   374  	h.WaitRolloutWithContext(t, deploys, h.k8sContext)
   375  }
   376  
   377  // WaitRolloutWithContext blocks until all the given deployments in a provided
   378  // k8s context have been completely rolled out (and their pods are ready)
   379  func (h *KubernetesHelper) WaitRolloutWithContext(t *testing.T, deploys map[string]DeploySpec, context string) {
   380  	t.Helper()
   381  	for deploy, deploySpec := range deploys {
   382  		stat, err := h.KubectlWithContext("", context, "--namespace="+deploySpec.Namespace,
   383  			"rollout", "status", "--timeout=5m", "deploy/"+deploy)
   384  		if err != nil {
   385  			desc, _ := h.KubectlWithContext("", context, "--namespace="+deploySpec.Namespace,
   386  				"describe", "po")
   387  			AnnotatedFatalf(t,
   388  				fmt.Sprintf("failed to wait rollout of deploy/%s", deploy),
   389  				"failed to wait for rollout of deploy/%s: %s: %s\n---\n%s", deploy, err, stat, desc)
   390  		}
   391  	}
   392  }
   393  
   394  // WaitUntilDeployReady will block and wait until all given deploys have been
   395  // rolled out and their pods are in a 'ready' status. The difference between
   396  // this and WaitRollout is that WaitUntilDeployReady uses CheckPods underneath,
   397  // instead of relying on the 'rollout' command. WaitUntilDeployReady will also
   398  // retry for a long period time. This function is used to block tests from
   399  // running until the control plane and extensions are ready.
   400  func (h *KubernetesHelper) WaitUntilDeployReady(deploys map[string]DeploySpec) {
   401  	ctx := context.Background()
   402  	for deploy, spec := range deploys {
   403  		if err := h.CheckPods(ctx, spec.Namespace, deploy, 1); err != nil {
   404  			var out string
   405  			//nolint:errorlint
   406  			if rce, ok := err.(*RestartCountError); ok {
   407  				out = fmt.Sprintf("Error running test: failed to wait for deploy/%s to become 'ready', too many restarts (%v)\n", deploy, rce)
   408  			} else {
   409  				out = fmt.Sprintf("Error running test: failed to wait for deploy/%s to become 'ready', timed out waiting for condition\n", deploy)
   410  			}
   411  			os.Stderr.Write([]byte(out))
   412  			os.Exit(1)
   413  		}
   414  	}
   415  }
   416  

View as plain text