     1  package multiclustertraffic
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"os"
     8  	"strings"
     9  	"testing"
    10  	"time"
    12  	mcHealthcheck "github.com/linkerd/linkerd2/multicluster/cmd"
    13  	"github.com/linkerd/linkerd2/pkg/healthcheck"
    14  	"github.com/linkerd/linkerd2/pkg/k8s"
    15  	"github.com/linkerd/linkerd2/testutil"
    16  	"github.com/linkerd/linkerd2/testutil/prommatch"
    17  )
    19  var (
    20  	TestHelper *testutil.TestHelper
    21  	targetCtx  string
    22  	sourceCtx  string
    23  	contexts   map[string]string
    24  )
var (
	// nginxTargetLabels describes the destination labels expected on gateway
	// proxy metrics for traffic sent to the nginx-statefulset-0 pod of the
	// nginx-statefulset statefulset in the linkerd-multicluster-statefulset
	// namespace: outbound direction, TLS enabled, and the server identity of
	// that namespace's default service account.
	nginxTargetLabels = prommatch.Labels{
		"direction":            prommatch.Equals("outbound"),
		"tls":                  prommatch.Equals("true"),
		"server_id":            prommatch.Equals("default.linkerd-multicluster-statefulset.serviceaccount.identity.linkerd.cluster.local"),
		"dst_control_plane_ns": prommatch.Equals("linkerd"),
		"dst_namespace":        prommatch.Equals("linkerd-multicluster-statefulset"),
		"dst_pod":              prommatch.Equals("nginx-statefulset-0"),
		"dst_serviceaccount":   prommatch.Equals("default"),
		"dst_statefulset":      prommatch.Equals("nginx-statefulset"),
	}

	// tcpConnMatcher matches a tcp_open_total series with peer="dst" carrying
	// nginxTargetLabels and a positive value, i.e. the gateway opened at least
	// one outbound TCP connection to nginx-statefulset-0.
	// NOTE(review): TargetAddrLabels() presumably requires the target address
	// label(s) to be present — confirm against the prommatch package.
	tcpConnMatcher = prommatch.NewMatcher("tcp_open_total",
		prommatch.Labels{
			"peer": prommatch.Equals("dst"),
		},
		prommatch.TargetAddrLabels(),
		nginxTargetLabels,
		prommatch.HasPositiveValue(),
	)
	// httpReqMatcher matches a request_total series carrying nginxTargetLabels
	// and a positive value, i.e. the gateway sent at least one request to
	// nginx-statefulset-0.
	httpReqMatcher = prommatch.NewMatcher("request_total",
		prommatch.TargetAddrLabels(),
		nginxTargetLabels,
		prommatch.HasPositiveValue(),
	)
)
    53  func TestMain(m *testing.M) {
    54  	TestHelper = testutil.NewTestHelper()
    55  	// Before starting, initialize contexts
    56  	contexts = TestHelper.GetMulticlusterContexts()
    57  	sourceCtx = contexts[testutil.SourceContextKey]
    58  	targetCtx = contexts[testutil.TargetContextKey]
    59  	// Then, re-build clientset with source cluster context instead of context
    60  	// inferred from environment.
    61  	if err := TestHelper.SwitchContext(sourceCtx); err != nil {
    62  		out := fmt.Sprintf("Error running test: failed to switch Kubernetes client to context [%s]: %s\n", sourceCtx, err)
    63  		os.Stderr.Write([]byte(out))
    64  		os.Exit(1)
    65  	}
    66  	// Block until gateway & service mirror deploys are running successfully in
    67  	// source cluster.
    68  	TestHelper.WaitUntilDeployReady(testutil.MulticlusterSourceReplicas)
    69  	os.Exit(m.Run())
    70  }
    72  // TestGateways tests the `linkerd multicluster gateways` command by installing
    73  // three emojivoto services in target cluster and asserting the output against
    74  // the source cluster.
    75  func TestGateways(t *testing.T) {
    76  	t.Run("install resources in target cluster", func(t *testing.T) {
    77  		// Create namespace in source cluster
    78  		out, err := TestHelper.KubectlWithContext("", contexts[testutil.SourceContextKey], "create", "namespace", "linkerd-nginx-gateway-deploy")
    79  		if err != nil {
    80  			testutil.AnnotatedFatalf(t, "failed to create namespace", "failed to create namespace 'linkerd-nginx-gateway-deploy': %s\n%s", err, out)
    81  		}
    83  		out, err = TestHelper.KubectlApplyWithContext("", contexts[testutil.TargetContextKey], "-n", "linkerd-nginx-gateway-deploy", "-f", "testdata/nginx-gateway-deploy.yaml")
    84  		if err != nil {
    85  			testutil.AnnotatedFatalf(t, "failed to install nginx deploy", "failed to install nginx deploy: %s\n%s", err, out)
    86  		}
    88  		// Wait for workloads to spin up in target cluster. These workloads will have
    89  		// their services mirrored in the source cluster. The test will check whether
    90  		// the gateway keeps track of the mirror services.
    91  		tgtWorkloadRollouts := map[string]testutil.DeploySpec{
    92  			"nginx-deploy": {Namespace: "linkerd-nginx-gateway-deploy", Replicas: 1},
    93  		}
    94  		TestHelper.WaitRolloutWithContext(t, tgtWorkloadRollouts, contexts[testutil.TargetContextKey])
    95  	})
    97  	timeout := time.Minute
    98  	err := testutil.RetryFor(timeout, func() error {
    99  		out, err := TestHelper.LinkerdRun("--context="+contexts[testutil.SourceContextKey], "multicluster", "gateways")
   100  		if err != nil {
   101  			return err
   102  		}
   103  		rows := strings.Split(out, "\n")
   104  		if len(rows) < 2 {
   105  			return errors.New("response is empty")
   106  		}
   107  		fields := strings.Fields(rows[1])
   108  		if len(fields) < 4 {
   109  			return fmt.Errorf("unexpected number of columns: %d", len(fields))
   110  		}
   111  		if fields[0] != "target" {
   112  			return fmt.Errorf("unexpected target cluster name: %s", fields[0])
   113  		}
   114  		if fields[1] != "True" {
   115  			return errors.New("target cluster is not alive")
   116  		}
   117  		if fields[2] != "1" {
   118  			return fmt.Errorf("invalid NUM_SVC: %s", fields[2])
   119  		}
   121  		return nil
   122  	})
   123  	if err != nil {
   124  		testutil.AnnotatedFatal(t, fmt.Sprintf("'linkerd multicluster gateways' command timed-out (%s)", timeout), err)
   125  	}
   126  }
   128  // TestCheckGatewayAfterRepairEndpoints calls `linkerd mc check` again after 1 minute,
   129  // so that the RepairEndpoints event has already been processed, making sure
   130  // that resyncing didn't break things.
   131  func TestCheckGatewayAfterRepairEndpoints(t *testing.T) {
   132  	// Re-build the clientset with the source context
   133  	if err := TestHelper.SwitchContext(contexts[testutil.SourceContextKey]); err != nil {
   134  		testutil.AnnotatedFatalf(t,
   135  			"failed to rebuild helper clientset with new context",
   136  			"failed to rebuild helper clientset with new context [%s]: %v",
   137  			contexts[testutil.SourceContextKey], err)
   138  	}
   139  	time.Sleep(time.Minute + 5*time.Second)
   140  	err := TestHelper.TestCheckWith([]healthcheck.CategoryID{mcHealthcheck.LinkerdMulticlusterExtensionCheck}, "--context", contexts[testutil.SourceContextKey])
   141  	if err != nil {
   142  		t.Fatalf("'linkerd check' command failed: %s", err)
   143  	}
   144  }
   146  // TestTargetTraffic inspects the target cluster's web-svc pod to see if the
   147  // source cluster's vote-bot has been able to hit it with requests. If it has
   148  // successfully issued requests, then we'll see log messages indicating that the
   149  // web-svc can't reach the voting-svc (because it's not running).
   150  //
   151  // TODO it may be clearer to invoke `linkerd diagnostics proxy-metrics` to check whether we see
   152  // connections from the gateway pod to the web-svc?
   153  func TestTargetTraffic(t *testing.T) {
   154  	if err := TestHelper.SwitchContext(contexts[testutil.TargetContextKey]); err != nil {
   155  		testutil.AnnotatedFatalf(t,
   156  			"failed to rebuild helper clientset with new context",
   157  			"failed to rebuild helper clientset with new context [%s]: %v",
   158  			contexts[testutil.TargetContextKey], err)
   159  	}
   161  	ctx := context.Background()
   162  	// Create emojivoto in target cluster, to be deleted at the end of the test.
   163  	annotations := map[string]string{
   164  		// "config.linkerd.io/proxy-log-level": "linkerd=debug,info",
   165  	}
   166  	TestHelper.WithDataPlaneNamespace(ctx, "emojivoto", annotations, t, func(t *testing.T, ns string) {
   167  		t.Run("Deploy resources in source and target clusters", func(t *testing.T) {
   168  			// Deploy vote-bot client in source-cluster
   169  			o, err := TestHelper.KubectlWithContext("", contexts[testutil.SourceContextKey], "create", "ns", ns)
   170  			if err != nil {
   171  				testutil.AnnotatedFatalf(t, "failed to create ns", "failed to create ns: %s\n%s", err, o)
   172  			}
   173  			o, err = TestHelper.KubectlApplyWithContext("", contexts[testutil.SourceContextKey], "--namespace", ns, "-f", "testdata/vote-bot.yml")
   174  			if err != nil {
   175  				testutil.AnnotatedFatalf(t, "failed to install vote-bot", "failed to install vote-bot: %s\n%s", err, o)
   176  			}
   178  			out, err := TestHelper.KubectlApplyWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "-f", "testdata/emojivoto-no-bot.yml")
   179  			if err != nil {
   180  				testutil.AnnotatedFatalf(t, "failed to install emojivoto", "failed to install emojivoto: %s\n%s", err, out)
   181  			}
   183  			timeout := time.Minute
   184  			err = testutil.RetryFor(timeout, func() error {
   185  				out, err = TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "label", "service/web-svc", "mirror.linkerd.io/exported=true")
   186  				return err
   187  			})
   188  			if err != nil {
   189  				testutil.AnnotatedFatalf(t, "failed to label web-svc", "%s\n%s", err, out)
   190  			}
   191  		})
   193  		t.Run("Wait until target workloads are ready", func(t *testing.T) {
   194  			// Wait until client is up and running in source cluster
   195  			voteBotDeployReplica := map[string]testutil.DeploySpec{"vote-bot": {Namespace: ns, Replicas: 1}}
   196  			TestHelper.WaitRolloutWithContext(t, voteBotDeployReplica, contexts[testutil.SourceContextKey])
   198  			// Wait until "target" services and replicas are up and running.
   199  			emojiDeployReplicas := map[string]testutil.DeploySpec{
   200  				"web":    {Namespace: ns, Replicas: 1},
   201  				"emoji":  {Namespace: ns, Replicas: 1},
   202  				"voting": {Namespace: ns, Replicas: 1},
   203  			}
   204  			TestHelper.WaitRolloutWithContext(t, emojiDeployReplicas, targetCtx)
   205  		})
   207  		timeout := time.Minute
   208  		err := testutil.RetryFor(timeout, func() error {
   209  			out, err := TestHelper.KubectlWithContext("",
   210  				targetCtx,
   211  				"--namespace", ns,
   212  				"logs",
   213  				"--selector", "app=web-svc",
   214  				"--container", "web-svc",
   215  			)
   216  			if err != nil {
   217  				return fmt.Errorf("%w\n%s", err, out)
   218  			}
   219  			// Check for expected error messages
   220  			for _, row := range strings.Split(out, "\n") {
   221  				if strings.Contains(row, " /api/vote?choice=:doughnut: ") {
   222  					return nil
   223  				}
   224  			}
   225  			return fmt.Errorf("web-svc logs in target cluster do not include voting errors\n%s", out)
   226  		})
   227  		if err != nil {
   228  			testutil.AnnotatedFatal(t, fmt.Sprintf("'linkerd multicluster gateways' command timed-out (%s)", timeout), err)
   229  		}
   230  	})
   231  }
   233  // TestMulticlusterStatefulSetTargetTraffic will test that a statefulset can be
   234  // mirrored from a target cluster to a source cluster. The test deploys two
   235  // workloads: a slow cooker (as a client) in the src, and an nginx statefulset in
   236  // (as a server) in the tgt. The slow-cooker is configured to send traffic to an
   237  // nginx endpoint mirror (nginx-statefulset-0). The traffic should be received
   238  // by the nginx pod in the tgt. To assert this, we get proxy metrics from the
   239  // gateway to make sure our connections from the source cluster were routed
   240  // correctly.
   241  func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) {
   242  	if err := TestHelper.SwitchContext(contexts[testutil.TargetContextKey]); err != nil {
   243  		testutil.AnnotatedFatalf(t, "failed to rebuild helper clientset with new context", "failed to rebuild helper clientset with new context [%s]: %v", contexts[testutil.TargetContextKey], err)
   244  	}
   246  	ctx := context.Background()
   247  	// Create 'multicluster-statefulset' namespace in target cluster, to be deleted at the end of the test.
   248  	TestHelper.WithDataPlaneNamespace(ctx, "multicluster-statefulset", map[string]string{}, t, func(t *testing.T, ns string) {
   249  		t.Run("Deploy resources in source and target clusters", func(t *testing.T) {
   250  			// Create slow-cooker client in source cluster
   251  			out, err := TestHelper.KubectlApplyWithContext("", contexts[testutil.SourceContextKey], "-f", "testdata/slow-cooker.yml")
   252  			if err != nil {
   253  				testutil.AnnotatedFatalf(t, "failed to install slow-cooker", "failed to install slow-cooker: %s\ngot: %s", err, out)
   254  			}
   256  			// Create statefulset deployment in target cluster
   257  			out, err = TestHelper.KubectlApplyWithContext("", contexts[testutil.TargetContextKey], "-f", "testdata/nginx-ss.yml")
   258  			if err != nil {
   259  				testutil.AnnotatedFatalf(t, "failed to install nginx-ss", "failed to install nginx-ss: %s\n%s", err, out)
   260  			}
   261  		})
   263  		t.Run("Wait until workloads are ready", func(t *testing.T) {
   264  			// Wait until client is up and running in source cluster
   265  			scDeployReplica := map[string]testutil.DeploySpec{"slow-cooker": {Namespace: ns, Replicas: 1}}
   266  			TestHelper.WaitRolloutWithContext(t, scDeployReplica, contexts[testutil.SourceContextKey])
   268  			// Wait until "target" statefulset is up and running.
   269  			nginxSpec := testutil.DeploySpec{Namespace: ns, Replicas: 1}
   270  			o, err := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+nginxSpec.Namespace, "rollout", "status", "--timeout=60m", "statefulset/nginx-statefulset")
   271  			if err != nil {
   272  				oEvt, _ := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+nginxSpec.Namespace, "get", "event", "--field-selector", "involvedObject.name=nginx-statefulset")
   273  				testutil.AnnotatedFatalf(t,
   274  					fmt.Sprintf("failed to wait rollout of deploy/%s", "nginx-statefulset"),
   275  					"failed to wait for rollout of deploy/%s: %s: %s\nEvents:\n%s", "nginx-statefulset", err, o, oEvt)
   276  			}
   277  		})
   279  		_, err := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+ns, "label", "svc", "nginx-statefulset-svc", k8s.DefaultExportedServiceSelector+"=true")
   280  		if err != nil {
   281  			testutil.AnnotatedFatal(t, "failed to label nginx-statefulset-svc service", err)
   282  		}
   284  		dgCmd := []string{"--context=" + targetCtx, "diagnostics", "proxy-metrics", "--namespace",
   285  			"linkerd-multicluster", "deploy/linkerd-gateway"}
   286  		t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) {
   287  			// Use a short time window so that slow-cooker can warm-up and send
   288  			// requests.
   289  			err := testutil.RetryFor(1*time.Minute, func() error {
   290  				// Check gateway metrics
   291  				metrics, err := TestHelper.LinkerdRun(dgCmd...)
   292  				if err != nil {
   293  					return fmt.Errorf("failed to get metrics for gateway deployment: %w", err)
   294  				}
   296  				s := prommatch.Suite{}.
   297  					MustContain("TCP connection from gateway to nginx", tcpConnMatcher).
   298  					MustContain("HTTP requests from gateway to nginx", httpReqMatcher)
   300  				if err := s.CheckString(metrics); err != nil {
   301  					return fmt.Errorf("invalid metrics for gateway deployment: %w", err)
   302  				}
   304  				return nil
   305  			})
   307  			if err != nil {
   308  				testutil.AnnotatedFatalf(t, "unexpected error", "unexpected error: %v", err)
   309  			}
   310  		})
   311  	})
   312  }
   314  func TestSourceResourcesAreCleaned(t *testing.T) {
   315  	if err := TestHelper.SwitchContext(contexts[testutil.SourceContextKey]); err != nil {
   316  		testutil.AnnotatedFatalf(t, "failed to rebuild helper clientset with new context", "failed to rebuild helper clientset with new context [%s]: %v", contexts[testutil.SourceContextKey], err)
   317  	}
   319  	ctx := context.Background()
   320  	if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-multicluster-statefulset"); err != nil {
   321  		testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-multicluster-statefulset"),
   322  			"failed to delete %s namespace: %s", "linkerd-multicluster-statefulset", err)
   323  	}
   325  	if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-emojivoto"); err != nil {
   326  		testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-emojivoto"),
   327  			"failed to delete %s namespace: %s", "linkerd-emojivoto", err)
   328  	}
   330  	if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-nginx-gateway-deploy"); err != nil {
   331  		testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-nginx-gateway-deploy"),
   332  			"failed to delete %s namespace: %s", "linkerd-nginx-gateway-deploy", err)
   333  	}
   334  }
   336  // At the end of the test, we have one resource left to clean 'linkerd-nginx-gateway-deploy',
   337  // so we just switch the context again and delete its corresponding namespace.
   338  func TestTargetResourcesAreCleaned(t *testing.T) {
   339  	if err := TestHelper.SwitchContext(contexts[testutil.TargetContextKey]); err != nil {
   340  		testutil.AnnotatedFatalf(t, "failed to rebuild helper clientset with new context", "failed to rebuild helper clientset with new context [%s]: %v", contexts[testutil.TargetContextKey], err)
   341  	}
   343  	ctx := context.Background()
   344  	if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-nginx-gateway-deploy"); err != nil {
   345  		testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-nginx-gateway-deploy"),
   346  			"failed to delete %s namespace: %s", "linkerd-nginx-gateway-deploy", err)
   347  	}
   348  }

