1 package testutil
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "os/exec"
9 "regexp"
10 "strings"
11 "testing"
12 "time"
13
14 "github.com/linkerd/linkerd2/pkg/k8s"
15 corev1 "k8s.io/api/core/v1"
16 kerrors "k8s.io/apimachinery/pkg/api/errors"
17 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18 "k8s.io/apimachinery/pkg/labels"
19 "k8s.io/client-go/kubernetes"
20 "k8s.io/client-go/tools/clientcmd"
21
22
23 _ "k8s.io/client-go/plugin/pkg/client/auth/gcp"
24 )
25
26
27
28 type KubernetesHelper struct {
29 k8sContext string
30 clientset *kubernetes.Clientset
31 retryFor func(time.Duration, func() error) error
32 }
33
34
35
36
37
38
// RestartCountError is the error type CheckPods returns when a container has
// a restart count of exactly one, so that callers can tell that case apart
// from other failures.
type RestartCountError struct {
	msg string
}

// Error implements the error interface by returning the stored message.
func (rce *RestartCountError) Error() string {
	return rce.msg
}
46
47
48 func NewKubernetesHelper(k8sContext string, retryFor func(time.Duration, func() error) error) (*KubernetesHelper, error) {
49 rules := clientcmd.NewDefaultClientConfigLoadingRules()
50 overrides := &clientcmd.ConfigOverrides{CurrentContext: k8sContext}
51 kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
52 config, err := kubeConfig.ClientConfig()
53 if err != nil {
54 return nil, err
55 }
56
57 clientset, err := kubernetes.NewForConfig(config)
58 if err != nil {
59 return nil, err
60 }
61
62 return &KubernetesHelper{
63 clientset: clientset,
64 k8sContext: k8sContext,
65 retryFor: retryFor,
66 }, nil
67 }
68
69
70 func (h *KubernetesHelper) CheckIfNamespaceExists(ctx context.Context, namespace string) error {
71 _, err := h.clientset.CoreV1().Namespaces().Get(ctx, namespace, metav1.GetOptions{})
72 return err
73 }
74
75
76
77 func (h *KubernetesHelper) SwitchContext(ctx string) error {
78 rules := clientcmd.NewDefaultClientConfigLoadingRules()
79 overrides := &clientcmd.ConfigOverrides{CurrentContext: ctx}
80 kubeConfig := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(rules, overrides)
81 config, err := kubeConfig.ClientConfig()
82 if err != nil {
83 return err
84 }
85
86 clientset, err := kubernetes.NewForConfig(config)
87 if err != nil {
88 return err
89 }
90
91 h.clientset = clientset
92 return nil
93 }
94
95
96 func (h *KubernetesHelper) GetSecret(ctx context.Context, namespace, name string) (*corev1.Secret, error) {
97 return h.clientset.CoreV1().Secrets(namespace).Get(ctx, name, metav1.GetOptions{})
98 }
99
100 func (h *KubernetesHelper) createNamespaceIfNotExists(ctx context.Context, namespace string, annotations, labels map[string]string) error {
101 err := h.CheckIfNamespaceExists(ctx, namespace)
102
103 if err != nil {
104 ns := &corev1.Namespace{
105 ObjectMeta: metav1.ObjectMeta{
106 Labels: labels,
107 Annotations: annotations,
108 Name: namespace,
109 },
110 }
111 _, err = h.clientset.CoreV1().Namespaces().Create(ctx, ns, metav1.CreateOptions{})
112
113 if err != nil {
114 return err
115 }
116 }
117
118 return nil
119 }
120
121
122
123 func (h *KubernetesHelper) DeleteNamespaceIfExists(ctx context.Context, namespace string) error {
124 err := h.clientset.CoreV1().Namespaces().Delete(ctx, namespace, metav1.DeleteOptions{})
125
126 if err != nil && !kerrors.IsNotFound(err) {
127 return err
128 }
129 return nil
130 }
131
132
133 func (h *KubernetesHelper) CreateControlPlaneNamespaceIfNotExists(ctx context.Context, namespace string) error {
134 return h.createNamespaceIfNotExists(ctx, namespace, nil, nil)
135 }
136
137
138
139 func (h *KubernetesHelper) CreateDataPlaneNamespaceIfNotExists(ctx context.Context, namespace string, annotations map[string]string) error {
140 return h.createNamespaceIfNotExists(ctx, namespace, annotations, map[string]string{"test.linkerd.io/is-test-data-plane": "true"})
141 }
142
143
144
145
146 func (h *KubernetesHelper) KubectlApply(stdin string, namespace string) (string, error) {
147 args := []string{"apply", "-f", "-"}
148 if namespace != "" {
149 args = append(args, "--namespace", namespace)
150 }
151
152 return h.Kubectl(stdin, args...)
153 }
154
155
156
157 func (h *KubernetesHelper) KubectlApplyWithArgs(stdin string, cmdArgs ...string) (string, error) {
158 args := []string{"apply"}
159 args = append(args, cmdArgs...)
160 args = append(args, "-f", "-")
161 return h.Kubectl(stdin, args...)
162 }
163
164
165 func (h *KubernetesHelper) Kubectl(stdin string, arg ...string) (string, error) {
166 withContext := append([]string{"--context=" + h.k8sContext}, arg...)
167 cmd := exec.Command("kubectl", withContext...)
168 cmd.Stdin = strings.NewReader(stdin)
169 out, err := cmd.CombinedOutput()
170 return string(out), err
171 }
172
173
174 func (h *KubernetesHelper) KubectlApplyWithContext(stdin string, context string, arg ...string) (string, error) {
175 args := append([]string{"apply"}, arg...)
176 return h.KubectlWithContext(stdin, context, args...)
177 }
178
179
180
181
182
183 func (h *KubernetesHelper) KubectlWithContext(stdin string, context string, arg ...string) (string, error) {
184 withContext := append([]string{"--context=" + context}, arg...)
185 cmd := exec.Command("kubectl", withContext...)
186 cmd.Stdin = strings.NewReader(stdin)
187 out, err := cmd.CombinedOutput()
188 return string(out), err
189 }
190
191
192
193 func (h *KubernetesHelper) GetConfigUID(ctx context.Context, namespace string) (string, error) {
194 cm, err := h.clientset.CoreV1().ConfigMaps(namespace).Get(ctx, k8s.ConfigConfigMapName, metav1.GetOptions{})
195 if err != nil {
196 return "", err
197 }
198 return string(cm.GetUID()), nil
199 }
200
201
202
203 func (h *KubernetesHelper) GetResources(ctx context.Context, containerName, deploymentName, namespace string) (corev1.ResourceRequirements, error) {
204 dep, err := h.clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
205 if err != nil {
206 return corev1.ResourceRequirements{}, err
207 }
208
209 for _, container := range dep.Spec.Template.Spec.Containers {
210 if container.Name == containerName {
211 return container.Resources, nil
212 }
213 }
214 return corev1.ResourceRequirements{}, fmt.Errorf("container %s not found in deployment %s in namespace %s", containerName, deploymentName, namespace)
215 }
216
217
218
219 func (h *KubernetesHelper) CheckPods(ctx context.Context, namespace string, deploymentName string, replicas int) error {
220 var checkedPods []corev1.Pod
221
222 err := h.retryFor(60*time.Minute, func() error {
223 checkedPods = []corev1.Pod{}
224 pods, err := h.GetPodsForDeployment(ctx, namespace, deploymentName)
225 if err != nil {
226 return err
227 }
228
229 var deploymentReplicas int
230 for _, pod := range pods {
231 checkedPods = append(checkedPods, pod)
232
233 deploymentReplicas++
234 if pod.Status.Phase != "Running" {
235 return fmt.Errorf("Pod [%s] in namespace [%s] is not running",
236 pod.Name, pod.Namespace)
237 }
238 for _, container := range pod.Status.ContainerStatuses {
239 if !container.Ready {
240 return fmt.Errorf("Container [%s] in pod [%s] in namespace [%s] is not running",
241 container.Name, pod.Name, pod.Namespace)
242 }
243 }
244 }
245
246 if deploymentReplicas != replicas {
247 return fmt.Errorf("Expected there to be [%d] pods in deployment [%s] in namespace [%s], but found [%d]",
248 replicas, deploymentName, namespace, deploymentReplicas)
249 }
250
251 return nil
252 })
253
254 if err != nil {
255 return err
256 }
257
258 for _, pod := range checkedPods {
259 for _, status := range append(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses...) {
260 errStr := fmt.Sprintf("Container [%s] in pod [%s] in namespace [%s] has restart count [%d]",
261 status.Name, pod.Name, pod.Namespace, status.RestartCount)
262 if status.RestartCount == 1 {
263 return &RestartCountError{errStr}
264 }
265 if status.RestartCount > 1 {
266 return errors.New(errStr)
267 }
268 }
269 }
270
271 return nil
272 }
273
274
275 func (h *KubernetesHelper) CheckService(ctx context.Context, namespace string, serviceName string) error {
276 return h.retryFor(10*time.Second, func() error {
277 _, err := h.clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
278 return err
279 })
280 }
281
282
283 func (h *KubernetesHelper) GetService(ctx context.Context, namespace string, serviceName string) (*corev1.Service, error) {
284 service, err := h.clientset.CoreV1().Services(namespace).Get(ctx, serviceName, metav1.GetOptions{})
285 if err != nil {
286 return nil, err
287 }
288 return service, nil
289 }
290
291
292 func (h *KubernetesHelper) GetEndpoints(ctx context.Context, namespace string, serviceName string) (*corev1.Endpoints, error) {
293 ep, err := h.clientset.CoreV1().Endpoints(namespace).Get(ctx, serviceName, metav1.GetOptions{})
294 if err != nil {
295 return nil, err
296 }
297 return ep, nil
298 }
299
300
301 func (h *KubernetesHelper) GetPods(ctx context.Context, namespace string, podLabels map[string]string) ([]corev1.Pod, error) {
302 podList, err := h.clientset.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{
303 LabelSelector: labels.Set(podLabels).AsSelector().String(),
304 })
305 if err != nil {
306 return nil, err
307 }
308
309 return podList.Items, nil
310 }
311
312
313 func (h *KubernetesHelper) GetPodsForDeployment(ctx context.Context, namespace string, deploymentName string) ([]corev1.Pod, error) {
314 deploy, err := h.clientset.AppsV1().Deployments(namespace).Get(ctx, deploymentName, metav1.GetOptions{})
315 if err != nil {
316 return nil, err
317 }
318 return h.GetPods(ctx, namespace, deploy.Spec.Selector.MatchLabels)
319 }
320
321
322 func (h *KubernetesHelper) GetPodNamesForDeployment(ctx context.Context, namespace string, deploymentName string) ([]string, error) {
323 podList, err := h.GetPodsForDeployment(ctx, namespace, deploymentName)
324 if err != nil {
325 return nil, err
326 }
327
328 pods := make([]string, 0)
329 for _, pod := range podList {
330 pods = append(pods, pod.Name)
331 }
332
333 return pods, nil
334 }
335
336
337
338
339 func (h *KubernetesHelper) ParseNamespacedResource(resource string) (string, string, error) {
340 r := regexp.MustCompile(`^(.+)\/(.+)$`)
341 matches := r.FindAllStringSubmatch(resource, 2)
342 if len(matches) == 0 {
343 return "", "", fmt.Errorf("string [%s] didn't contain expected format for namespace/resource, extracted: %v", resource, matches)
344 }
345 return matches[0][1], matches[0][2], nil
346 }
347
348
349
350
351 func (h *KubernetesHelper) URLFor(ctx context.Context, namespace, deployName string, remotePort int) (string, error) {
352 k8sAPI, err := k8s.NewAPI("", h.k8sContext, "", []string{}, 0)
353 if err != nil {
354 return "", err
355 }
356
357 pf, err := k8s.NewPortForward(ctx, k8sAPI, namespace, deployName, "localhost", 0, remotePort, false)
358 if err != nil {
359 return "", err
360 }
361
362 if err = pf.Init(); err != nil {
363 return "", err
364 }
365
366 return pf.URLFor(""), nil
367 }
368
369
370
371 func (h *KubernetesHelper) WaitRollout(t *testing.T, deploys map[string]DeploySpec) {
372 t.Helper()
373
374 h.WaitRolloutWithContext(t, deploys, h.k8sContext)
375 }
376
377
378
379 func (h *KubernetesHelper) WaitRolloutWithContext(t *testing.T, deploys map[string]DeploySpec, context string) {
380 t.Helper()
381 for deploy, deploySpec := range deploys {
382 stat, err := h.KubectlWithContext("", context, "--namespace="+deploySpec.Namespace,
383 "rollout", "status", "--timeout=5m", "deploy/"+deploy)
384 if err != nil {
385 desc, _ := h.KubectlWithContext("", context, "--namespace="+deploySpec.Namespace,
386 "describe", "po")
387 AnnotatedFatalf(t,
388 fmt.Sprintf("failed to wait rollout of deploy/%s", deploy),
389 "failed to wait for rollout of deploy/%s: %s: %s\n---\n%s", deploy, err, stat, desc)
390 }
391 }
392 }
393
394
395
396
397
398
399
400 func (h *KubernetesHelper) WaitUntilDeployReady(deploys map[string]DeploySpec) {
401 ctx := context.Background()
402 for deploy, spec := range deploys {
403 if err := h.CheckPods(ctx, spec.Namespace, deploy, 1); err != nil {
404 var out string
405
406 if rce, ok := err.(*RestartCountError); ok {
407 out = fmt.Sprintf("Error running test: failed to wait for deploy/%s to become 'ready', too many restarts (%v)\n", deploy, rce)
408 } else {
409 out = fmt.Sprintf("Error running test: failed to wait for deploy/%s to become 'ready', timed out waiting for condition\n", deploy)
410 }
411 os.Stderr.Write([]byte(out))
412 os.Exit(1)
413 }
414 }
415 }
416
View as plain text