...

Source file src/github.com/linkerd/linkerd2/viz/cmd/tap_test.go

Documentation: github.com/linkerd/linkerd2/viz/cmd

     1  package cmd
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"net/http"
     7  	"net/http/httptest"
     8  	"os"
     9  	"strings"
    10  	"testing"
    11  
    12  	"github.com/golang/protobuf/ptypes/duration"
    13  	netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
    14  	"github.com/linkerd/linkerd2/pkg/addr"
    15  	"github.com/linkerd/linkerd2/pkg/k8s"
    16  	"github.com/linkerd/linkerd2/pkg/protohttp"
    17  	metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    18  	tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
    19  	"github.com/linkerd/linkerd2/viz/tap/pkg"
    20  	"google.golang.org/grpc/codes"
    21  )
    22  
// targetName is the resource name the tests in this file append to the pod
// resource type when building tap request parameters.
const targetName = "pod-666"
    24  
    25  func busyTest(t *testing.T, output string) {
    26  	resourceType := k8s.Pod
    27  	params := pkg.TapRequestParams{
    28  		Resource:  resourceType + "/" + targetName,
    29  		Scheme:    "https",
    30  		Method:    "GET",
    31  		Authority: "localhost",
    32  		Path:      "/some/path",
    33  	}
    34  
    35  	req, err := pkg.BuildTapByResourceRequest(params)
    36  	if err != nil {
    37  		t.Fatalf("Unexpected error: %v", err)
    38  	}
    39  
    40  	event1 := pkg.CreateTapEvent(
    41  		&tapPb.TapEvent_Http{
    42  			Event: &tapPb.TapEvent_Http_RequestInit_{
    43  				RequestInit: &tapPb.TapEvent_Http_RequestInit{
    44  					Id: &tapPb.TapEvent_Http_StreamId{
    45  						Base: 1,
    46  					},
    47  					Method: &metricsPb.HttpMethod{
    48  						Type: &metricsPb.HttpMethod_Registered_{
    49  							Registered: metricsPb.HttpMethod_GET,
    50  						},
    51  					},
    52  					Scheme: &metricsPb.Scheme{
    53  						Type: &metricsPb.Scheme_Registered_{
    54  							Registered: metricsPb.Scheme_HTTPS,
    55  						},
    56  					},
    57  					Authority: params.Authority,
    58  					Path:      params.Path,
    59  					Headers: &metricsPb.Headers{
    60  						Headers: []*metricsPb.Headers_Header{
    61  							{
    62  								Name: "header-name-1",
    63  								Value: &metricsPb.Headers_Header_ValueStr{
    64  									ValueStr: "header-value-str-1",
    65  								},
    66  							},
    67  							{
    68  								Name: "header-name-2",
    69  								Value: &metricsPb.Headers_Header_ValueBin{
    70  									ValueBin: []byte("header-value-bin-2"),
    71  								},
    72  							},
    73  						},
    74  					},
    75  				},
    76  			},
    77  		},
    78  		map[string]string{
    79  			"pod": "my-pod",
    80  			"tls": "true",
    81  		},
    82  		tapPb.TapEvent_OUTBOUND,
    83  	)
    84  	event2 := pkg.CreateTapEvent(
    85  		&tapPb.TapEvent_Http{
    86  			Event: &tapPb.TapEvent_Http_ResponseEnd_{
    87  				ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
    88  					Id: &tapPb.TapEvent_Http_StreamId{
    89  						Base: 1,
    90  					},
    91  					Eos: &metricsPb.Eos{
    92  						End: &metricsPb.Eos_GrpcStatusCode{GrpcStatusCode: 666},
    93  					},
    94  					SinceRequestInit: &duration.Duration{
    95  						Seconds: 10,
    96  					},
    97  					SinceResponseInit: &duration.Duration{
    98  						Seconds: 100,
    99  					},
   100  					ResponseBytes: 1337,
   101  					Trailers: &metricsPb.Headers{
   102  						Headers: []*metricsPb.Headers_Header{
   103  							{
   104  								Name: "trailer-name",
   105  								Value: &metricsPb.Headers_Header_ValueBin{
   106  									ValueBin: []byte("header-value-bin"),
   107  								},
   108  							},
   109  						},
   110  					},
   111  				},
   112  			},
   113  		},
   114  		map[string]string{},
   115  		tapPb.TapEvent_OUTBOUND,
   116  	)
   117  	kubeAPI, err := k8s.NewFakeAPI()
   118  	if err != nil {
   119  		t.Fatalf("Unexpected error: %v", err)
   120  	}
   121  	ts := httptest.NewServer(http.HandlerFunc(
   122  		func(w http.ResponseWriter, r *http.Request) {
   123  			for _, event := range []*tapPb.TapEvent{event1, event2} {
   124  				err = protohttp.WriteProtoToHTTPResponse(w, event)
   125  				if err != nil {
   126  					t.Fatalf("Unexpected error: %v", err)
   127  				}
   128  			}
   129  		}),
   130  	)
   131  	defer ts.Close()
   132  	kubeAPI.Config.Host = ts.URL
   133  
   134  	options := newTapOptions()
   135  	options.output = output
   136  
   137  	writer := bytes.NewBufferString("")
   138  	err = requestTapByResourceFromAPI(context.Background(), writer, kubeAPI, req, options)
   139  	if err != nil {
   140  		t.Fatalf("Unexpected error: %v", err)
   141  	}
   142  
   143  	var goldenFilePath string
   144  	switch {
   145  	case options.output == wideOutput:
   146  		goldenFilePath = "testdata/tap_busy_output_wide.golden"
   147  	case options.output == jsonOutput:
   148  		goldenFilePath = "testdata/tap_busy_output_json.golden"
   149  	case strings.HasPrefix(options.output, jsonPathOutput):
   150  		goldenFilePath = "testdata/tap_busy_output_jsonpath.golden"
   151  	default:
   152  		goldenFilePath = "testdata/tap_busy_output.golden"
   153  	}
   154  
   155  	goldenFileBytes, err := os.ReadFile(goldenFilePath)
   156  	if err != nil {
   157  		t.Fatalf("Unexpected error: %v", err)
   158  	}
   159  	expectedContent := string(goldenFileBytes)
   160  	actual := writer.String()
   161  	if expectedContent != actual {
   162  		t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, actual)
   163  	}
   164  }
   165  
   166  func TestRequestTapByResourceFromAPI(t *testing.T) {
   167  
   168  	ctx := context.Background()
   169  	t.Run("Should render busy response if everything went well", func(t *testing.T) {
   170  		busyTest(t, "")
   171  	})
   172  
   173  	t.Run("Should render wide busy response if everything went well", func(t *testing.T) {
   174  		busyTest(t, "wide")
   175  	})
   176  
   177  	t.Run("Should render JSON busy response if everything went well", func(t *testing.T) {
   178  		busyTest(t, "json")
   179  	})
   180  
   181  	t.Run("Should render jsonpath busy response if everything went well", func(t *testing.T) {
   182  		busyTest(t, "jsonpath={.source}")
   183  	})
   184  
   185  	t.Run("Should render empty response if no events returned", func(t *testing.T) {
   186  		resourceType := k8s.Pod
   187  		params := pkg.TapRequestParams{
   188  			Resource:  resourceType + "/" + targetName,
   189  			Scheme:    "https",
   190  			Method:    "GET",
   191  			Authority: "localhost",
   192  			Path:      "/some/path",
   193  		}
   194  
   195  		req, err := pkg.BuildTapByResourceRequest(params)
   196  		if err != nil {
   197  			t.Fatalf("Unexpected error: %v", err)
   198  		}
   199  
   200  		kubeAPI, err := k8s.NewFakeAPI()
   201  		if err != nil {
   202  			t.Fatalf("Unexpected error: %v", err)
   203  		}
   204  		ts := httptest.NewServer(http.HandlerFunc(
   205  			func(w http.ResponseWriter, r *http.Request) {}),
   206  		)
   207  		defer ts.Close()
   208  		kubeAPI.Config.Host = ts.URL
   209  
   210  		options := newTapOptions()
   211  		writer := bytes.NewBufferString("")
   212  		err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options)
   213  		if err != nil {
   214  			t.Fatalf("Unexpected error: %v", err)
   215  		}
   216  
   217  		goldenFileBytes, err := os.ReadFile("testdata/tap_empty_output.golden")
   218  		if err != nil {
   219  			t.Fatalf("Unexpected error: %v", err)
   220  		}
   221  		expectedContent := string(goldenFileBytes)
   222  		output := writer.String()
   223  		if expectedContent != output {
   224  			t.Fatalf("Expected function to render:\n%s\bbut got:\n%s", expectedContent, output)
   225  		}
   226  	})
   227  
   228  	t.Run("Should return error if stream returned error", func(t *testing.T) {
   229  		t.SkipNow()
   230  		resourceType := k8s.Pod
   231  		params := pkg.TapRequestParams{
   232  			Resource:  resourceType + "/" + targetName,
   233  			Scheme:    "https",
   234  			Method:    "GET",
   235  			Authority: "localhost",
   236  			Path:      "/some/path",
   237  		}
   238  
   239  		req, err := pkg.BuildTapByResourceRequest(params)
   240  		if err != nil {
   241  			t.Fatalf("Unexpected error: %v", err)
   242  		}
   243  
   244  		kubeAPI, err := k8s.NewFakeAPI()
   245  		if err != nil {
   246  			t.Fatalf("Unexpected error: %v", err)
   247  		}
   248  
   249  		options := newTapOptions()
   250  		writer := bytes.NewBufferString("")
   251  		err = requestTapByResourceFromAPI(ctx, writer, kubeAPI, req, options)
   252  		if err == nil {
   253  			t.Fatalf("Expecting error, got nothing but output [%s]", writer.String())
   254  		}
   255  	})
   256  }
   257  
   258  func TestEventToString(t *testing.T) {
   259  	toTapEvent := func(httpEvent *tapPb.TapEvent_Http) *tapPb.TapEvent {
   260  		streamID := &tapPb.TapEvent_Http_StreamId{
   261  			Base:   7,
   262  			Stream: 8,
   263  		}
   264  
   265  		switch httpEvent.Event.(type) {
   266  		case *tapPb.TapEvent_Http_RequestInit_:
   267  			httpEvent.GetRequestInit().Id = streamID
   268  		case *tapPb.TapEvent_Http_ResponseInit_:
   269  			httpEvent.GetResponseInit().Id = streamID
   270  		case *tapPb.TapEvent_Http_ResponseEnd_:
   271  			httpEvent.GetResponseEnd().Id = streamID
   272  		}
   273  
   274  		srcIP, _ := addr.ParsePublicIP("1.2.3.4")
   275  		destIP, _ := addr.ParsePublicIP("2.3.4.5")
   276  		return &tapPb.TapEvent{
   277  			ProxyDirection: tapPb.TapEvent_OUTBOUND,
   278  			Source: &netPb.TcpAddress{
   279  				Ip:   srcIP,
   280  				Port: 5555,
   281  			},
   282  			Destination: &netPb.TcpAddress{
   283  				Ip:   destIP,
   284  				Port: 6666,
   285  			},
   286  			Event: &tapPb.TapEvent_Http_{Http: httpEvent},
   287  		}
   288  	}
   289  
   290  	t.Run("Converts HTTP request init event to string", func(t *testing.T) {
   291  		event := toTapEvent(&tapPb.TapEvent_Http{
   292  			Event: &tapPb.TapEvent_Http_RequestInit_{
   293  				RequestInit: &tapPb.TapEvent_Http_RequestInit{
   294  					Method: &metricsPb.HttpMethod{
   295  						Type: &metricsPb.HttpMethod_Registered_{
   296  							Registered: metricsPb.HttpMethod_POST,
   297  						},
   298  					},
   299  					Scheme: &metricsPb.Scheme{
   300  						Type: &metricsPb.Scheme_Registered_{
   301  							Registered: metricsPb.Scheme_HTTPS,
   302  						},
   303  					},
   304  					Authority: "hello.default:7777",
   305  					Path:      "/hello.v1.HelloService/Hello",
   306  				},
   307  			},
   308  		})
   309  
   310  		expectedOutput := "req id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :method=POST :authority=hello.default:7777 :path=/hello.v1.HelloService/Hello"
   311  		output := renderTapEvent(event)
   312  		if output != expectedOutput {
   313  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   314  		}
   315  	})
   316  
   317  	t.Run("Converts HTTP response init event to string", func(t *testing.T) {
   318  		event := toTapEvent(&tapPb.TapEvent_Http{
   319  			Event: &tapPb.TapEvent_Http_ResponseInit_{
   320  				ResponseInit: &tapPb.TapEvent_Http_ResponseInit{
   321  					SinceRequestInit: &duration.Duration{Seconds: 9, Nanos: 999000},
   322  					HttpStatus:       http.StatusOK,
   323  				},
   324  			},
   325  		})
   326  
   327  		expectedOutput := "rsp id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= :status=200 latency=9000999µs"
   328  		output := renderTapEvent(event)
   329  		if output != expectedOutput {
   330  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   331  		}
   332  	})
   333  
   334  	t.Run("Converts gRPC response end event to string", func(t *testing.T) {
   335  		event := toTapEvent(&tapPb.TapEvent_Http{
   336  			Event: &tapPb.TapEvent_Http_ResponseEnd_{
   337  				ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
   338  					SinceRequestInit:  &duration.Duration{Nanos: 999000},
   339  					SinceResponseInit: &duration.Duration{Nanos: 888000},
   340  					ResponseBytes:     111,
   341  					Eos: &metricsPb.Eos{
   342  						End: &metricsPb.Eos_GrpcStatusCode{GrpcStatusCode: uint32(codes.OK)},
   343  					},
   344  				},
   345  			},
   346  		})
   347  
   348  		expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= grpc-status=OK duration=888µs response-length=111B"
   349  		output := renderTapEvent(event)
   350  		if output != expectedOutput {
   351  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   352  		}
   353  	})
   354  
   355  	t.Run("Converts HTTP response end event with reset error code to string", func(t *testing.T) {
   356  		event := toTapEvent(&tapPb.TapEvent_Http{
   357  			Event: &tapPb.TapEvent_Http_ResponseEnd_{
   358  				ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
   359  					SinceRequestInit:  &duration.Duration{Nanos: 999000},
   360  					SinceResponseInit: &duration.Duration{Nanos: 888000},
   361  					ResponseBytes:     111,
   362  					Eos: &metricsPb.Eos{
   363  						End: &metricsPb.Eos_ResetErrorCode{ResetErrorCode: 123},
   364  					},
   365  				},
   366  			},
   367  		})
   368  
   369  		expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= reset-error=123 duration=888µs response-length=111B"
   370  		output := renderTapEvent(event)
   371  		if output != expectedOutput {
   372  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   373  		}
   374  	})
   375  
   376  	t.Run("Converts HTTP response end event with empty EOS context string", func(t *testing.T) {
   377  		event := toTapEvent(&tapPb.TapEvent_Http{
   378  			Event: &tapPb.TapEvent_Http_ResponseEnd_{
   379  				ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
   380  					SinceRequestInit:  &duration.Duration{Nanos: 999000},
   381  					SinceResponseInit: &duration.Duration{Nanos: 888000},
   382  					ResponseBytes:     111,
   383  					Eos:               &metricsPb.Eos{},
   384  				},
   385  			},
   386  		})
   387  
   388  		expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B"
   389  		output := renderTapEvent(event)
   390  		if output != expectedOutput {
   391  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   392  		}
   393  	})
   394  
   395  	t.Run("Converts HTTP response end event without EOS context string", func(t *testing.T) {
   396  		event := toTapEvent(&tapPb.TapEvent_Http{
   397  			Event: &tapPb.TapEvent_Http_ResponseEnd_{
   398  				ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
   399  					SinceRequestInit:  &duration.Duration{Nanos: 999000},
   400  					SinceResponseInit: &duration.Duration{Nanos: 888000},
   401  					ResponseBytes:     111,
   402  				},
   403  			},
   404  		})
   405  
   406  		expectedOutput := "end id=7:8 proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls= duration=888µs response-length=111B"
   407  		output := renderTapEvent(event)
   408  		if output != expectedOutput {
   409  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   410  		}
   411  	})
   412  
   413  	t.Run("Handles unknown event types", func(t *testing.T) {
   414  		event := toTapEvent(&tapPb.TapEvent_Http{})
   415  
   416  		expectedOutput := "unknown proxy=out src=1.2.3.4:5555 dst=2.3.4.5:6666 tls="
   417  		output := renderTapEvent(event)
   418  		if output != expectedOutput {
   419  			t.Fatalf("Expecting command output to be [%s], got [%s]", expectedOutput, output)
   420  		}
   421  	})
   422  }
   423  

View as plain text