1 package cmd
2
3 import (
4 "bytes"
5 "context"
6 "net/http"
7 "net/http/httptest"
8 "os"
9 "strings"
10 "testing"
11
12 "github.com/golang/protobuf/ptypes/duration"
13 netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
14 "github.com/linkerd/linkerd2/pkg/addr"
15 "github.com/linkerd/linkerd2/pkg/k8s"
16 "github.com/linkerd/linkerd2/pkg/protohttp"
17 metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
18 tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
19 "github.com/linkerd/linkerd2/viz/tap/pkg"
20 "google.golang.org/grpc/codes"
21 )
22
const (
	// targetName is the pod name the tests in this file append to the resource
	// type when building the tap request's Resource ("<type>/<name>").
	targetName = "pod-666"
)
24
25 func busyTest(t *testing.T, output string) {
26 resourceType := k8s.Pod
27 params := pkg.TapRequestParams{
28 Resource: resourceType + "/" + targetName,
29 Scheme: "https",
30 Method: "GET",
31 Authority: "localhost",
32 Path: "/some/path",
33 }
34
35 req, err := pkg.BuildTapByResourceRequest(params)
36 if err != nil {
37 t.Fatalf("Unexpected error: %v", err)
38 }
39
40 event1 := pkg.CreateTapEvent(
41 &tapPb.TapEvent_Http{
42 Event: &tapPb.TapEvent_Http_RequestInit_{
43 RequestInit: &tapPb.TapEvent_Http_RequestInit{
44 Id: &tapPb.TapEvent_Http_StreamId{
45 Base: 1,
46 },
47 Method: &metricsPb.HttpMethod{
48 Type: &metricsPb.HttpMethod_Registered_{
49 Registered: metricsPb.HttpMethod_GET,
50 },
51 },
52 Scheme: &metricsPb.Scheme{
53 Type: &metricsPb.Scheme_Registered_{
54 Registered: metricsPb.Scheme_HTTPS,
55 },
56 },
57 Authority: params.Authority,
58 Path: params.Path,
59 Headers: &metricsPb.Headers{
60 Headers: []*metricsPb.Headers_Header{
61 {
62 Name: "header-name-1",
63 Value: &metricsPb.Headers_Header_ValueStr{
64 ValueStr: "header-value-str-1",
65 },
66 },
67 {
68 Name: "header-name-2",
69 Value: &metricsPb.Headers_Header_ValueBin{
70 ValueBin: []byte("header-value-bin-2"),
71 },
72 },
73 },
74 },
75 },
76 },
77 },
78 map[string]string{
79 "pod": "my-pod",
80 "tls": "true",
81 },
82 tapPb.TapEvent_OUTBOUND,
83 )
84 event2 := pkg.CreateTapEvent(
85 &tapPb.TapEvent_Http{
86 Event: &tapPb.TapEvent_Http_ResponseEnd_{
87 ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
88 Id: &tapPb.TapEvent_Http_StreamId{
89 Base: 1,
90 },
91 Eos: &metricsPb.Eos{
92 End: &metricsPb.Eos_GrpcStatusCode{GrpcStatusCode: 666},
93 },
94 SinceRequestInit: &duration.Duration{
95 Seconds: 10,
96 },
97 SinceResponseInit: &duration.Duration{
98 Seconds: 100,
99 },
100 ResponseBytes: 1337,
101 Trailers: &metricsPb.Headers{
102 Headers: []*metricsPb.Headers_Header{
103 {
104 Name: "trailer-name",
105 Value: &metricsPb.Headers_Header_ValueBin{
106 ValueBin: []byte("header-value-bin"),
107 },
108 },
109 },
110 },
111 },
112 },
113 },
114 map[string]string{},
115 tapPb.TapEvent_OUTBOUND,
116 )
117 kubeAPI, err := k8s.NewFakeAPI()
118 if err != nil {
119 t.Fatalf("Unexpected error: %v", err)
120 }
121 ts := httptest.NewServer(http.HandlerFunc(
122 func(w http.ResponseWriter, r *http.Request) {
123 for _, event := range []*tapPb.TapEvent{event1, event2} {
124 err = protohttp.WriteProtoToHTTPResponse(w, event)
125 if err != nil {
126 t.Fatalf("Unexpected error: %v", err)
127 }
128 }
129 }),
130 )
131 defer ts.Close()
132 kubeAPI.Config.Host = ts.URL
133
134 options := newTapOptions()
135 options.output = output
136
137 writer := bytes.NewBufferString("")
138 err = requestTapByResourceFromAPI(context.Background(), writer, kubeAPI, req, options)
139 if err != nil {
140 t.Fatalf("Unexpected error: %v", err)
141 }
142
143 var goldenFilePath string
144 switch {
145 case options.output == wideOutput:
146 goldenFilePath = "testdata/tap_busy_output_wide.golden"
147 case options.output == jsonOutput:
148 goldenFilePath = "testdata/tap_busy_output_json.golden"
149 case strings.HasPrefix(options.output, jsonPathOutput):
150 goldenFilePath = "testdata/tap_busy_output_jsonpath.golden"
151 default:
152 goldenFilePath = "testdata/tap_busy_output.golden"
153 }
154
155 goldenFileBytes, err := os.ReadFile(goldenFilePath)
156 if err != nil {
157 t.Fatalf("Unexpected error: %v", err)
158 }
159 expectedContent := string(goldenFileBytes)
160 actual := writer.String()
161 if expectedContent != actual {
162 t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, actual)
163 }
164 }
165
166 func TestRequestTapByResourceFromAPI(t *testing.T) {
167
168 ctx := context.Background()
169 t.Run("Should render busy response if everything went well", func(t *testing.T) {
170 busyTest(t, "")
171 })
172
173 t.Run("Should render wide busy response if everything went well", func(t *testing.T) {
174 busyTest(t, "wide")
175 })
176
177 t.Run("Should render JSON busy response if everything went well", func(t *testing.T) {
178 busyTest(t, "json")
179 })
180
181 t.Run("Should render jsonpath busy response if everything went well", func(t *testing.T) {
182 busyTest(t, "jsonpath={.source}")
183 })
184
185 t.Run("Should render empty response if no events returned", func(t *testing.T) {
186 resourceType := k8s.Pod
187 params := pkg.TapRequestParams{
188 Resource: resourceType + "/" + targetName,
189 Scheme: "https",
190 Method: "GET",
191 Authority: "localhost",
192 Path: "/some/path",
193 }
194
195 req, err := pkg.BuildTapByResourceRequest(params)
196 if err != nil {
197 t.Fatalf("Unexpected error: %v", err)
198 }
199
200 kubeAPI, err := k8s.NewFakeAPI()
201 if err != nil {
202 t.Fatalf("Unexpected error: %v", err)
203 }
204 ts := httptest.NewServer(http.HandlerFunc(
205 func(w http.ResponseWriter, r *http.Request) {}),
206 )
207 defer ts.Close()
208 kubeAPI.Config.Host = ts.URL
209
210 options := newTapOptions()
211 writer := bytes.NewBufferString("")
212 err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options)
213 if err != nil {
214 t.Fatalf("Unexpected error: %v", err)
215 }
216
217 goldenFileBytes, err := os.ReadFile("testdata/tap_empty_output.golden")
218 if err != nil {
219 t.Fatalf("Unexpected error: %v", err)
220 }
221 expectedContent := string(goldenFileBytes)
222 output := writer.String()
223 if expectedContent != output {
224 t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, output)
225 }
226 })
227
228 t.Run("Should return error if stream returned error", func(t *testing.T) {
229 t.SkipNow()
230 resourceType := k8s.Pod
231 params := pkg.TapRequestParams{
232 Resource: resourceType + "/" + targetName,
233 Scheme: "https",
234 Method: "GET",
235 Authority: "localhost",
236 Path: "/some/path",
237 }
238
239 req, err := pkg.BuildTapByResourceRequest(params)
240 if err != nil {
241 t.Fatalf("Unexpected error: %v", err)
242 }
243
244 kubeAPI, err := k8s.NewFakeAPI()
245 if err != nil {
246 t.Fatalf("Unexpected error: %v", err)
247 }
248
249 options := newTapOptions()
250 writer := bytes.NewBufferString("")
251 err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options)
252 if err == nil {
253 t.Fatalf("Expecting error, got nothing but output [%s]", writer.String())
254 }
255 })
256 }
257
258 func TestEventToString(t *testing.T) {
259 toTapEvent := func(httpEvent *tapPb.TapEvent_Http) *tapPb.TapEvent {
260 streamID := &tapPb.TapEvent_Http_StreamId{
261 Base: 7,
262 Stream: 8,
263 }
264
265 switch httpEvent.Event.(type) {
266 case *tapPb.TapEvent_Http_RequestInit_:
267 httpEvent.GetRequestInit().Id = streamID
268 case *tapPb.TapEvent_Http_ResponseInit_:
269 httpEvent.GetResponseInit().Id = streamID
270 case *tapPb.TapEvent_Http_ResponseEnd_:
271 httpEvent.GetResponseEnd().Id = streamID
272 }
273
274 srcIP, _ := addr.ParsePublicIP("1.2.3.4")
275 destIP, _ := addr.ParsePublicIP("2.3.4.5")
276 return &tapPb.TapEvent{
277 ProxyDirection: tapPb.TapEvent_OUTBOUND,
278 Source: &netPb.TcpAddress{
279 Ip: srcIP,
280 Port: 5555,
281 },
282 Destination: &netPb.TcpAddress{
283 Ip: destIP,
284 Port: 6666,
285 },
286 Event: &tapPb.TapEvent_Http_{Http: httpEvent},
287 }
288 }
289
290 t.Run("Converts HTTP request init event to string", func(t *testing.T) {
291 event := toTapEvent(&tapPb.TapEvent_Http{
292 Event: &tapPb.TapEvent_Http_RequestInit_{
293 RequestInit: &tapPb.TapEvent_Http_RequestInit{
294 Method: &metricsPb.HttpMethod{
295 Type: &metricsPb.HttpMethod_Registered_{
296 Registered: metricsPb.HttpMethod_POST,
297 },
298 },
299 Scheme: &metricsPb.Scheme{
300 Type: &metricsPb.Scheme_Registered_{
301 Registered: metricsPb.Scheme_HTTPS,
302 },
303 },
304 Authority: "hello.default:7777",
305 Path: "/hello.v1.HelloService/Hello",
306 },
307 },
308 })
309
310 expectedOutput := "req id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :method=POST :authority=hello.default:7777 :path=/hello.v1.HelloService/Hello"
311 output := renderTapEvent(event)
312 if output != expectedOutput {
313 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
314 }
315 })
316
317 t.Run("Converts HTTP response init event to string", func(t *testing.T) {
318 event := toTapEvent(&tapPb.TapEvent_Http{
319 Event: &tapPb.TapEvent_Http_ResponseInit_{
320 ResponseInit: &tapPb.TapEvent_Http_ResponseInit{
321 SinceRequestInit: &duration.Duration{Seconds: 9, Nanos: 999000},
322 HttpStatus: http.StatusOK,
323 },
324 },
325 })
326
327 expectedOutput := "rsp id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :status=200 latency=9000999µs"
328 output := renderTapEvent(event)
329 if output != expectedOutput {
330 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
331 }
332 })
333
334 t.Run("Converts gRPC response end event to string", func(t *testing.T) {
335 event := toTapEvent(&tapPb.TapEvent_Http{
336 Event: &tapPb.TapEvent_Http_ResponseEnd_{
337 ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
338 SinceRequestInit: &duration.Duration{Nanos: 999000},
339 SinceResponseInit: &duration.Duration{Nanos: 888000},
340 ResponseBytes: 111,
341 Eos: &metricsPb.Eos{
342 End: &metricsPb.Eos_GrpcStatusCode{GrpcStatusCode: uint32(codes.OK)},
343 },
344 },
345 },
346 })
347
348 expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= grpc-status=OK duration=888µs response-length=111B"
349 output := renderTapEvent(event)
350 if output != expectedOutput {
351 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
352 }
353 })
354
355 t.Run("Converts HTTP response end event with reset error code to string", func(t *testing.T) {
356 event := toTapEvent(&tapPb.TapEvent_Http{
357 Event: &tapPb.TapEvent_Http_ResponseEnd_{
358 ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
359 SinceRequestInit: &duration.Duration{Nanos: 999000},
360 SinceResponseInit: &duration.Duration{Nanos: 888000},
361 ResponseBytes: 111,
362 Eos: &metricsPb.Eos{
363 End: &metricsPb.Eos_ResetErrorCode{ResetErrorCode: 123},
364 },
365 },
366 },
367 })
368
369 expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= reset-error=123 duration=888µs response-length=111B"
370 output := renderTapEvent(event)
371 if output != expectedOutput {
372 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
373 }
374 })
375
376 t.Run("Converts HTTP response end event with empty EOS context string", func(t *testing.T) {
377 event := toTapEvent(&tapPb.TapEvent_Http{
378 Event: &tapPb.TapEvent_Http_ResponseEnd_{
379 ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
380 SinceRequestInit: &duration.Duration{Nanos: 999000},
381 SinceResponseInit: &duration.Duration{Nanos: 888000},
382 ResponseBytes: 111,
383 Eos: &metricsPb.Eos{},
384 },
385 },
386 })
387
388 expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B"
389 output := renderTapEvent(event)
390 if output != expectedOutput {
391 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
392 }
393 })
394
395 t.Run("Converts HTTP response end event without EOS context string", func(t *testing.T) {
396 event := toTapEvent(&tapPb.TapEvent_Http{
397 Event: &tapPb.TapEvent_Http_ResponseEnd_{
398 ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
399 SinceRequestInit: &duration.Duration{Nanos: 999000},
400 SinceResponseInit: &duration.Duration{Nanos: 888000},
401 ResponseBytes: 111,
402 },
403 },
404 })
405
406 expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B"
407 output := renderTapEvent(event)
408 if output != expectedOutput {
409 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
410 }
411 })
412
413 t.Run("Handles unknown event types", func(t *testing.T) {
414 event := toTapEvent(&tapPb.TapEvent_Http{})
415
416 expectedOutput := "unknown proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls="
417 output := renderTapEvent(event)
418 if output != expectedOutput {
419 t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
420 }
421 })
422 }
423
View as plain text