1 package multiclustertraffic
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "os"
8 "strings"
9 "testing"
10 "time"
11
12 mcHealthcheck "github.com/linkerd/linkerd2/multicluster/cmd"
13 "github.com/linkerd/linkerd2/pkg/healthcheck"
14 "github.com/linkerd/linkerd2/pkg/k8s"
15 "github.com/linkerd/linkerd2/testutil"
16 "github.com/linkerd/linkerd2/testutil/prommatch"
17 )
18
19 var (
20 TestHelper *testutil.TestHelper
21 targetCtx string
22 sourceCtx string
23 contexts map[string]string
24 )
25
26 var (
27 nginxTargetLabels = prommatch.Labels{
28 "direction": prommatch.Equals("outbound"),
29 "tls": prommatch.Equals("true"),
30 "server_id": prommatch.Equals("default.linkerd-multicluster-statefulset.serviceaccount.identity.linkerd.cluster.local"),
31 "dst_control_plane_ns": prommatch.Equals("linkerd"),
32 "dst_namespace": prommatch.Equals("linkerd-multicluster-statefulset"),
33 "dst_pod": prommatch.Equals("nginx-statefulset-0"),
34 "dst_serviceaccount": prommatch.Equals("default"),
35 "dst_statefulset": prommatch.Equals("nginx-statefulset"),
36 }
37
38 tcpConnMatcher = prommatch.NewMatcher("tcp_open_total",
39 prommatch.Labels{
40 "peer": prommatch.Equals("dst"),
41 },
42 prommatch.TargetAddrLabels(),
43 nginxTargetLabels,
44 prommatch.HasPositiveValue(),
45 )
46 httpReqMatcher = prommatch.NewMatcher("request_total",
47 prommatch.TargetAddrLabels(),
48 nginxTargetLabels,
49 prommatch.HasPositiveValue(),
50 )
51 )
52
53 func TestMain(m *testing.M) {
54 TestHelper = testutil.NewTestHelper()
55
56 contexts = TestHelper.GetMulticlusterContexts()
57 sourceCtx = contexts[testutil.SourceContextKey]
58 targetCtx = contexts[testutil.TargetContextKey]
59
60
61 if err := TestHelper.SwitchContext(sourceCtx); err != nil {
62 out := fmt.Sprintf("Error running test: failed to switch Kubernetes client to context [%s]: %s\n", sourceCtx, err)
63 os.Stderr.Write([]byte(out))
64 os.Exit(1)
65 }
66
67
68 TestHelper.WaitUntilDeployReady(testutil.MulticlusterSourceReplicas)
69 os.Exit(m.Run())
70 }
71
72
73
74
75 func TestGateways(t *testing.T) {
76 t.Run("install resources in target cluster", func(t *testing.T) {
77
78 out, err := TestHelper.KubectlWithContext("", contexts[testutil.SourceContextKey], "create", "namespace", "linkerd-nginx-gateway-deploy")
79 if err != nil {
80 testutil.AnnotatedFatalf(t, "failed to create namespace", "failed to create namespace 'linkerd-nginx-gateway-deploy': %s\n%s", err, out)
81 }
82
83 out, err = TestHelper.KubectlApplyWithContext("", contexts[testutil.TargetContextKey], "-n", "linkerd-nginx-gateway-deploy", "-f", "testdata/nginx-gateway-deploy.yaml")
84 if err != nil {
85 testutil.AnnotatedFatalf(t, "failed to install nginx deploy", "failed to install nginx deploy: %s\n%s", err, out)
86 }
87
88
89
90
91 tgtWorkloadRollouts := map[string]testutil.DeploySpec{
92 "nginx-deploy": {Namespace: "linkerd-nginx-gateway-deploy", Replicas: 1},
93 }
94 TestHelper.WaitRolloutWithContext(t, tgtWorkloadRollouts, contexts[testutil.TargetContextKey])
95 })
96
97 timeout := time.Minute
98 err := testutil.RetryFor(timeout, func() error {
99 out, err := TestHelper.LinkerdRun("--context="+contexts[testutil.SourceContextKey], "multicluster", "gateways")
100 if err != nil {
101 return err
102 }
103 rows := strings.Split(out, "\n")
104 if len(rows) < 2 {
105 return errors.New("response is empty")
106 }
107 fields := strings.Fields(rows[1])
108 if len(fields) < 4 {
109 return fmt.Errorf("unexpected number of columns: %d", len(fields))
110 }
111 if fields[0] != "target" {
112 return fmt.Errorf("unexpected target cluster name: %s", fields[0])
113 }
114 if fields[1] != "True" {
115 return errors.New("target cluster is not alive")
116 }
117 if fields[2] != "1" {
118 return fmt.Errorf("invalid NUM_SVC: %s", fields[2])
119 }
120
121 return nil
122 })
123 if err != nil {
124 testutil.AnnotatedFatal(t, fmt.Sprintf("'linkerd multicluster gateways' command timed-out (%s)", timeout), err)
125 }
126 }
127
128
129
130
131 func TestCheckGatewayAfterRepairEndpoints(t *testing.T) {
132
133 if err := TestHelper.SwitchContext(contexts[testutil.SourceContextKey]); err != nil {
134 testutil.AnnotatedFatalf(t,
135 "failed to rebuild helper clientset with new context",
136 "failed to rebuild helper clientset with new context [%s]: %v",
137 contexts[testutil.SourceContextKey], err)
138 }
139 time.Sleep(time.Minute + 5*time.Second)
140 err := TestHelper.TestCheckWith([]healthcheck.CategoryID{mcHealthcheck.LinkerdMulticlusterExtensionCheck}, "--context", contexts[testutil.SourceContextKey])
141 if err != nil {
142 t.Fatalf("'linkerd check' command failed: %s", err)
143 }
144 }
145
146
147
148
149
150
151
152
153 func TestTargetTraffic(t *testing.T) {
154 if err := TestHelper.SwitchContext(contexts[testutil.TargetContextKey]); err != nil {
155 testutil.AnnotatedFatalf(t,
156 "failed to rebuild helper clientset with new context",
157 "failed to rebuild helper clientset with new context [%s]: %v",
158 contexts[testutil.TargetContextKey], err)
159 }
160
161 ctx := context.Background()
162
163 annotations := map[string]string{
164
165 }
166 TestHelper.WithDataPlaneNamespace(ctx, "emojivoto", annotations, t, func(t *testing.T, ns string) {
167 t.Run("Deploy resources in source and target clusters", func(t *testing.T) {
168
169 o, err := TestHelper.KubectlWithContext("", contexts[testutil.SourceContextKey], "create", "ns", ns)
170 if err != nil {
171 testutil.AnnotatedFatalf(t, "failed to create ns", "failed to create ns: %s\n%s", err, o)
172 }
173 o, err = TestHelper.KubectlApplyWithContext("", contexts[testutil.SourceContextKey], "--namespace", ns, "-f", "testdata/vote-bot.yml")
174 if err != nil {
175 testutil.AnnotatedFatalf(t, "failed to install vote-bot", "failed to install vote-bot: %s\n%s", err, o)
176 }
177
178 out, err := TestHelper.KubectlApplyWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "-f", "testdata/emojivoto-no-bot.yml")
179 if err != nil {
180 testutil.AnnotatedFatalf(t, "failed to install emojivoto", "failed to install emojivoto: %s\n%s", err, out)
181 }
182
183 timeout := time.Minute
184 err = testutil.RetryFor(timeout, func() error {
185 out, err = TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace", ns, "label", "service/web-svc", "mirror.linkerd.io/exported=true")
186 return err
187 })
188 if err != nil {
189 testutil.AnnotatedFatalf(t, "failed to label web-svc", "%s\n%s", err, out)
190 }
191 })
192
193 t.Run("Wait until target workloads are ready", func(t *testing.T) {
194
195 voteBotDeployReplica := map[string]testutil.DeploySpec{"vote-bot": {Namespace: ns, Replicas: 1}}
196 TestHelper.WaitRolloutWithContext(t, voteBotDeployReplica, contexts[testutil.SourceContextKey])
197
198
199 emojiDeployReplicas := map[string]testutil.DeploySpec{
200 "web": {Namespace: ns, Replicas: 1},
201 "emoji": {Namespace: ns, Replicas: 1},
202 "voting": {Namespace: ns, Replicas: 1},
203 }
204 TestHelper.WaitRolloutWithContext(t, emojiDeployReplicas, targetCtx)
205 })
206
207 timeout := time.Minute
208 err := testutil.RetryFor(timeout, func() error {
209 out, err := TestHelper.KubectlWithContext("",
210 targetCtx,
211 "--namespace", ns,
212 "logs",
213 "--selector", "app=web-svc",
214 "--container", "web-svc",
215 )
216 if err != nil {
217 return fmt.Errorf("%w\n%s", err, out)
218 }
219
220 for _, row := range strings.Split(out, "\n") {
221 if strings.Contains(row, " /api/vote?choice=:doughnut: ") {
222 return nil
223 }
224 }
225 return fmt.Errorf("web-svc logs in target cluster do not include voting errors\n%s", out)
226 })
227 if err != nil {
228 testutil.AnnotatedFatal(t, fmt.Sprintf("'linkerd multicluster gateways' command timed-out (%s)", timeout), err)
229 }
230 })
231 }
232
233
234
235
236
237
238
239
240
241 func TestMulticlusterStatefulSetTargetTraffic(t *testing.T) {
242 if err := TestHelper.SwitchContext(contexts[testutil.TargetContextKey]); err != nil {
243 testutil.AnnotatedFatalf(t, "failed to rebuild helper clientset with new context", "failed to rebuild helper clientset with new context [%s]: %v", contexts[testutil.TargetContextKey], err)
244 }
245
246 ctx := context.Background()
247
248 TestHelper.WithDataPlaneNamespace(ctx, "multicluster-statefulset", map[string]string{}, t, func(t *testing.T, ns string) {
249 t.Run("Deploy resources in source and target clusters", func(t *testing.T) {
250
251 out, err := TestHelper.KubectlApplyWithContext("", contexts[testutil.SourceContextKey], "-f", "testdata/slow-cooker.yml")
252 if err != nil {
253 testutil.AnnotatedFatalf(t, "failed to install slow-cooker", "failed to install slow-cooker: %s\ngot: %s", err, out)
254 }
255
256
257 out, err = TestHelper.KubectlApplyWithContext("", contexts[testutil.TargetContextKey], "-f", "testdata/nginx-ss.yml")
258 if err != nil {
259 testutil.AnnotatedFatalf(t, "failed to install nginx-ss", "failed to install nginx-ss: %s\n%s", err, out)
260 }
261 })
262
263 t.Run("Wait until workloads are ready", func(t *testing.T) {
264
265 scDeployReplica := map[string]testutil.DeploySpec{"slow-cooker": {Namespace: ns, Replicas: 1}}
266 TestHelper.WaitRolloutWithContext(t, scDeployReplica, contexts[testutil.SourceContextKey])
267
268
269 nginxSpec := testutil.DeploySpec{Namespace: ns, Replicas: 1}
270 o, err := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+nginxSpec.Namespace, "rollout", "status", "--timeout=60m", "statefulset/nginx-statefulset")
271 if err != nil {
272 oEvt, _ := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+nginxSpec.Namespace, "get", "event", "--field-selector", "involvedObject.name=nginx-statefulset")
273 testutil.AnnotatedFatalf(t,
274 fmt.Sprintf("failed to wait rollout of deploy/%s", "nginx-statefulset"),
275 "failed to wait for rollout of deploy/%s: %s: %s\nEvents:\n%s", "nginx-statefulset", err, o, oEvt)
276 }
277 })
278
279 _, err := TestHelper.KubectlWithContext("", contexts[testutil.TargetContextKey], "--namespace="+ns, "label", "svc", "nginx-statefulset-svc", k8s.DefaultExportedServiceSelector+"=true")
280 if err != nil {
281 testutil.AnnotatedFatal(t, "failed to label nginx-statefulset-svc service", err)
282 }
283
284 dgCmd := []string{"--context=" + targetCtx, "diagnostics", "proxy-metrics", "--namespace",
285 "linkerd-multicluster", "deploy/linkerd-gateway"}
286 t.Run("expect open outbound TCP connection from gateway to nginx", func(t *testing.T) {
287
288
289 err := testutil.RetryFor(1*time.Minute, func() error {
290
291 metrics, err := TestHelper.LinkerdRun(dgCmd...)
292 if err != nil {
293 return fmt.Errorf("failed to get metrics for gateway deployment: %w", err)
294 }
295
296 s := prommatch.Suite{}.
297 MustContain("TCP connection from gateway to nginx", tcpConnMatcher).
298 MustContain("HTTP requests from gateway to nginx", httpReqMatcher)
299
300 if err := s.CheckString(metrics); err != nil {
301 return fmt.Errorf("invalid metrics for gateway deployment: %w", err)
302 }
303
304 return nil
305 })
306
307 if err != nil {
308 testutil.AnnotatedFatalf(t, "unexpected error", "unexpected error: %v", err)
309 }
310 })
311 })
312 }
313
314 func TestSourceResourcesAreCleaned(t *testing.T) {
315 if err := TestHelper.SwitchContext(contexts[testutil.SourceContextKey]); err != nil {
316 testutil.AnnotatedFatalf(t, "failed to rebuild helper clientset with new context", "failed to rebuild helper clientset with new context [%s]: %v", contexts[testutil.SourceContextKey], err)
317 }
318
319 ctx := context.Background()
320 if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-multicluster-statefulset"); err != nil {
321 testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-multicluster-statefulset"),
322 "failed to delete %s namespace: %s", "linkerd-multicluster-statefulset", err)
323 }
324
325 if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-emojivoto"); err != nil {
326 testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-emojivoto"),
327 "failed to delete %s namespace: %s", "linkerd-emojivoto", err)
328 }
329
330 if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-nginx-gateway-deploy"); err != nil {
331 testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-nginx-gateway-deploy"),
332 "failed to delete %s namespace: %s", "linkerd-nginx-gateway-deploy", err)
333 }
334 }
335
336
337
338 func TestTargetResourcesAreCleaned(t *testing.T) {
339 if err := TestHelper.SwitchContext(contexts[testutil.TargetContextKey]); err != nil {
340 testutil.AnnotatedFatalf(t, "failed to rebuild helper clientset with new context", "failed to rebuild helper clientset with new context [%s]: %v", contexts[testutil.TargetContextKey], err)
341 }
342
343 ctx := context.Background()
344 if err := TestHelper.DeleteNamespaceIfExists(ctx, "linkerd-nginx-gateway-deploy"); err != nil {
345 testutil.AnnotatedFatalf(t, fmt.Sprintf("failed to delete %s namespace", "linkerd-nginx-gateway-deploy"),
346 "failed to delete %s namespace: %s", "linkerd-nginx-gateway-deploy", err)
347 }
348 }
349
View as plain text