...

Source file src/github.com/linkerd/linkerd2/viz/cmd/tap.go

Documentation: github.com/linkerd/linkerd2/viz/cmd

     1  package cmd
     2  
     3  import (
     4  	"bufio"
     5  	"context"
     6  	"encoding/json"
     7  	"errors"
     8  	"fmt"
     9  	"io"
    10  	"os"
    11  	"sort"
    12  	"strings"
    13  
    14  	"github.com/golang/protobuf/ptypes/duration"
    15  	netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
    16  	"github.com/linkerd/linkerd2/pkg/addr"
    17  	pkgcmd "github.com/linkerd/linkerd2/pkg/cmd"
    18  	"github.com/linkerd/linkerd2/pkg/healthcheck"
    19  	"github.com/linkerd/linkerd2/pkg/k8s"
    20  	"github.com/linkerd/linkerd2/pkg/protohttp"
    21  	metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    22  	"github.com/linkerd/linkerd2/viz/pkg/api"
    23  	hc "github.com/linkerd/linkerd2/viz/pkg/healthcheck"
    24  	"github.com/linkerd/linkerd2/viz/pkg/jsonpath"
    25  	vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
    26  	tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
    27  	"github.com/linkerd/linkerd2/viz/tap/pkg"
    28  	log "github.com/sirupsen/logrus"
    29  	"github.com/spf13/cobra"
    30  	"google.golang.org/grpc/codes"
    31  )
    32  
// renderTapEventFunc turns a single tap event into the string that is written
// for it. The variadic renderOptions are handed through to the renderer; the
// plain and wide renderers in this file ignore them.
type renderTapEventFunc func(*tapPb.TapEvent, ...renderOptions) string
    34  
// tapOptions holds the values of the flags registered by NewCmdTap. Each field
// is bound to one flag (for example output to --output/-o and labelSelector to
// --selector/-l) and is copied into the tap request when the command runs.
type tapOptions struct {
	namespace     string
	toResource    string
	toNamespace   string
	maxRps        float32
	scheme        string
	method        string
	authority     string
	path          string
	output        string
	labelSelector string
}
    47  
// endpoint is the JSON form of one side of a tapped connection (the source or
// the destination of a tapEvent): its IP, port and metadata labels.
type endpoint struct {
	IP       string            `json:"ip"`
	Port     uint32            `json:"port"`
	Metadata map[string]string `json:"metadata"`
}
    53  
// streamID is the JSON form of the identifier carried by request and response
// events; it is printed as "base:stream" by the plain-text renderer.
type streamID struct {
	Base   uint32 `json:"base"`
	Stream uint64 `json:"stream"`
}
    58  
// metadata is a marker interface for a single header or trailer entry in the
// JSON output. It is implemented by *metadataStr and *metadataBin.
type metadata interface {
	isMetadata()
}
    62  
// metadataStr is a header or trailer entry whose value is a string.
type metadataStr struct {
	Name     string `json:"name"`
	ValueStr string `json:"valueStr"`
}

// isMetadata marks *metadataStr as an implementation of metadata.
func (*metadataStr) isMetadata() {}
    69  
// metadataBin is a header or trailer entry whose value is raw bytes.
type metadataBin struct {
	Name     string `json:"name"`
	ValueBin []byte `json:"valueBin"`
}

// isMetadata marks *metadataBin as an implementation of metadata.
func (*metadataBin) isMetadata() {}
    76  
// requestInitEvent is the JSON form of a request-init tap event: the stream
// identifier plus the request's method, scheme, authority, path and headers.
type requestInitEvent struct {
	ID        *streamID  `json:"id"`
	Method    string     `json:"method"`
	Scheme    string     `json:"scheme"`
	Authority string     `json:"authority"`
	Path      string     `json:"path"`
	Headers   []metadata `json:"headers"`
}
    85  
// responseInitEvent is the JSON form of a response-init tap event: the stream
// identifier, the time elapsed since the request was initiated, the HTTP
// status and the response headers.
type responseInitEvent struct {
	ID               *streamID          `json:"id"`
	SinceRequestInit *duration.Duration `json:"sinceRequestInit"`
	HTTPStatus       uint32             `json:"httpStatus"`
	Headers          []metadata         `json:"headers"`
}
    92  
// responseEndEvent is the JSON form of a response-end tap event: the stream
// identifier, elapsed times since request and response init, the response
// size, the trailers and the end-of-stream codes. ResetErrorCode is omitted
// from the JSON when it is zero.
type responseEndEvent struct {
	ID                *streamID          `json:"id"`
	SinceRequestInit  *duration.Duration `json:"sinceRequestInit"`
	SinceResponseInit *duration.Duration `json:"sinceResponseInit"`
	ResponseBytes     uint64             `json:"responseBytes"`
	Trailers          []metadata         `json:"trailers"`
	GrpcStatusCode    uint32             `json:"grpcStatusCode"`
	ResetErrorCode    uint32             `json:"resetErrorCode,omitempty"`
}
   102  
// tapEvent is the private type used for displaying JSON encoded tap events.
// It carries both endpoints, the route labels and the proxy direction; of the
// three event pointers only the one matching the underlying HTTP event is
// populated by mapPublicToDisplayTapEvent, the others are nil and therefore
// omitted from the JSON.
type tapEvent struct {
	Source            *endpoint          `json:"source"`
	Destination       *endpoint          `json:"destination"`
	RouteMeta         map[string]string  `json:"routeMeta"`
	ProxyDirection    string             `json:"proxyDirection"`
	RequestInitEvent  *requestInitEvent  `json:"requestInitEvent,omitempty"`
	ResponseInitEvent *responseInitEvent `json:"responseInitEvent,omitempty"`
	ResponseEndEvent  *responseEndEvent  `json:"responseEndEvent,omitempty"`
}
   113  
   114  func newTapOptions() *tapOptions {
   115  	return &tapOptions{
   116  		toResource:    "",
   117  		toNamespace:   "",
   118  		maxRps:        maxRps,
   119  		scheme:        "",
   120  		method:        "",
   121  		authority:     "",
   122  		path:          "",
   123  		output:        "",
   124  		labelSelector: "",
   125  	}
   126  }
   127  
// renderFilter collects the settings applied by renderOptions. JsonPath, when
// non-empty, is the JSONPath expression renderTapEventJSON filters its output
// with.
type renderFilter struct {
	JsonPath string
}
   131  
// renderOptions is a functional option that mutates a renderFilter.
type renderOptions func(f *renderFilter)
   133  
   134  func WithJsonPath(jsonPath string) renderOptions {
   135  	return func(r *renderFilter) {
   136  		r.JsonPath = jsonPath
   137  	}
   138  }
   139  
   140  func (o *tapOptions) validate() error {
   141  	if o.output == "" || o.output == wideOutput || o.output == jsonOutput || strings.HasPrefix(o.output, jsonPathOutput) {
   142  		return nil
   143  	}
   144  
   145  	return fmt.Errorf("output format \"%s\" not recognized", o.output)
   146  }
   147  
   148  // NewCmdTap creates a new cobra command `tap` for tap functionality
   149  func NewCmdTap() *cobra.Command {
   150  	options := newTapOptions()
   151  
   152  	cmd := &cobra.Command{
   153  		Use:   "tap [flags] (RESOURCE)",
   154  		Short: "Listen to a traffic stream",
   155  		Long: `Listen to a traffic stream.
   156  
   157    The RESOURCE argument specifies the target resource(s) to tap:
   158    (TYPE [NAME] | TYPE/NAME)
   159  
   160    Examples:
   161    * cronjob/my-cronjob
   162    * deploy
   163    * deploy/my-deploy
   164    * deploy my-deploy
   165    * ds/my-daemonset
   166    * job/my-job
   167    * ns/my-ns
   168    * rs
   169    * rs/my-replicaset
   170    * sts
   171    * sts/my-statefulset
   172  
   173    Valid resource types include:
   174    * cronjobs
   175    * daemonsets
   176    * deployments
   177    * jobs
   178    * namespaces
   179    * pods
   180    * replicasets
   181    * replicationcontrollers
   182    * statefulsets
   183    * services (only supported as a --to resource)`,
   184  		Example: `  # tap the web deployment in the default namespace
   185    linkerd viz tap deploy/web
   186  
   187    # tap the web-dlbvj pod in the default namespace
   188    linkerd viz tap pod/web-dlbvj
   189  
   190    # tap the test namespace, filter by request to prod namespace
   191    linkerd viz tap ns/test --to ns/prod`,
   192  		Args: cobra.RangeArgs(1, 2),
   193  		ValidArgsFunction: func(cmd *cobra.Command, args []string, toComplete string) ([]string, cobra.ShellCompDirective) {
   194  			// This command requires at most two arguments if we already have
   195  			// two after requesting autocompletion i.e. [tab][tab]
   196  			// skip running validArgsFunction
   197  			if len(args) > 1 {
   198  				return nil, cobra.ShellCompDirectiveError
   199  			}
   200  
   201  			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
   202  			if err != nil {
   203  				return nil, cobra.ShellCompDirectiveError
   204  			}
   205  
   206  			if options.namespace == "" {
   207  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   208  			}
   209  
   210  			cc := k8s.NewCommandCompletion(k8sAPI, options.namespace)
   211  
   212  			results, err := cc.Complete(args, toComplete)
   213  			if err != nil {
   214  				return nil, cobra.ShellCompDirectiveError
   215  			}
   216  
   217  			return results, cobra.ShellCompDirectiveDefault
   218  		},
   219  		RunE: func(cmd *cobra.Command, args []string) error {
   220  			if options.namespace == "" {
   221  				options.namespace = pkgcmd.GetDefaultNamespace(kubeconfigPath, kubeContext)
   222  			}
   223  
   224  			api.CheckClientOrExit(hc.VizOptions{
   225  				Options: &healthcheck.Options{
   226  					ControlPlaneNamespace: controlPlaneNamespace,
   227  					KubeConfig:            kubeconfigPath,
   228  					Impersonate:           impersonate,
   229  					ImpersonateGroup:      impersonateGroup,
   230  					KubeContext:           kubeContext,
   231  					APIAddr:               apiAddr,
   232  				},
   233  				VizNamespaceOverride: vizNamespace,
   234  			})
   235  
   236  			requestParams := pkg.TapRequestParams{
   237  				Resource:      strings.Join(args, "/"),
   238  				Namespace:     options.namespace,
   239  				ToResource:    options.toResource,
   240  				ToNamespace:   options.toNamespace,
   241  				MaxRps:        options.maxRps,
   242  				Scheme:        options.scheme,
   243  				Method:        options.method,
   244  				Authority:     options.authority,
   245  				Path:          options.path,
   246  				Extract:       options.output == jsonOutput,
   247  				LabelSelector: options.labelSelector,
   248  			}
   249  
   250  			err := options.validate()
   251  			if err != nil {
   252  				return fmt.Errorf("validation error when executing tap command: %w", err)
   253  			}
   254  
   255  			req, err := pkg.BuildTapByResourceRequest(requestParams)
   256  			if err != nil {
   257  				fmt.Fprintln(os.Stderr, err.Error())
   258  				os.Exit(1)
   259  			}
   260  
   261  			k8sAPI, err := k8s.NewAPI(kubeconfigPath, kubeContext, impersonate, impersonateGroup, 0)
   262  			if err != nil {
   263  				fmt.Fprintln(os.Stderr, err.Error())
   264  				os.Exit(1)
   265  			}
   266  
   267  			err = requestTapByResourceFromAPI(cmd.Context(), os.Stdout, k8sAPI, req, options)
   268  			if err != nil {
   269  				fmt.Fprintln(os.Stderr, err.Error())
   270  				os.Exit(1)
   271  			}
   272  
   273  			return nil
   274  		},
   275  	}
   276  
   277  	cmd.PersistentFlags().StringVarP(&options.namespace, "namespace", "n", options.namespace,
   278  		"Namespace of the specified resource")
   279  	cmd.PersistentFlags().StringVar(&options.toResource, "to", options.toResource,
   280  		"Display requests to this resource")
   281  	cmd.PersistentFlags().StringVar(&options.toNamespace, "to-namespace", options.toNamespace,
   282  		"Sets the namespace used to lookup the \"--to\" resource; by default the current \"--namespace\" is used")
   283  	cmd.PersistentFlags().Float32Var(&options.maxRps, "max-rps", options.maxRps,
   284  		"Maximum requests per second to tap.")
   285  	cmd.PersistentFlags().StringVar(&options.scheme, "scheme", options.scheme,
   286  		"Display requests with this scheme")
   287  	cmd.PersistentFlags().StringVar(&options.method, "method", options.method,
   288  		"Display requests with this HTTP method")
   289  	cmd.PersistentFlags().StringVar(&options.authority, "authority", options.authority,
   290  		"Display requests with this :authority")
   291  	cmd.PersistentFlags().StringVar(&options.path, "path", options.path,
   292  		"Display requests with paths that start with this prefix")
   293  	cmd.PersistentFlags().StringVarP(&options.output, "output", "o", options.output,
   294  		fmt.Sprintf("Output format. One of: \"%s\", \"%s\", \"%s\"", wideOutput, jsonOutput, jsonPathOutput))
   295  	cmd.PersistentFlags().StringVarP(&options.labelSelector, "selector", "l", options.labelSelector,
   296  		"Selector (label query) to filter on, supports '=', '==', and '!='")
   297  
   298  	pkgcmd.ConfigureNamespaceFlagCompletion(
   299  		cmd, []string{"namespace", "to-namespace"},
   300  		kubeconfigPath, impersonate, impersonateGroup, kubeContext)
   301  	return cmd
   302  }
   303  
   304  func requestTapByResourceFromAPI(ctx context.Context, w io.Writer, k8sAPI *k8s.KubernetesAPI, req *tapPb.TapByResourceRequest, options *tapOptions) error {
   305  	reader, body, err := pkg.Reader(ctx, k8sAPI, req)
   306  	if err != nil {
   307  		return err
   308  	}
   309  	defer body.Close()
   310  
   311  	return writeTapEventsToBuffer(w, reader, options)
   312  }
   313  
   314  func writeTapEventsToBuffer(w io.Writer, tapByteStream *bufio.Reader, options *tapOptions) error {
   315  	output := options.output
   316  
   317  	switch {
   318  	case output == "":
   319  		return renderTapEvents(tapByteStream, w, renderTapEvent)
   320  	case output == wideOutput:
   321  		return renderTapEvents(tapByteStream, w, renderTapEventWide)
   322  	case output == jsonOutput:
   323  		return renderTapEvents(tapByteStream, w, renderTapEventJSON)
   324  	case strings.HasPrefix(output, jsonPathOutput):
   325  		jPathFilter, err := jsonpath.GetJsonPathFlagVal(output)
   326  		if err != nil {
   327  			return err
   328  		}
   329  		return renderTapEvents(tapByteStream, w, renderTapEventJSON, WithJsonPath(jPathFilter))
   330  	default:
   331  		return fmt.Errorf("unknown output format: %q", options.output)
   332  	}
   333  }
   334  
   335  func renderTapEvents(tapByteStream *bufio.Reader, w io.Writer, render renderTapEventFunc, opts ...renderOptions) error {
   336  	for {
   337  		log.Debug("Waiting for data...")
   338  		event := tapPb.TapEvent{}
   339  		err := protohttp.FromByteStreamToProtocolBuffers(tapByteStream, &event)
   340  		if err != nil {
   341  			if errors.Is(err, io.EOF) {
   342  				break
   343  			}
   344  			fmt.Fprintln(os.Stderr, err)
   345  			break
   346  		}
   347  		_, err = fmt.Fprintln(w, render(&event, opts...))
   348  		if err != nil {
   349  			return err
   350  		}
   351  	}
   352  
   353  	return nil
   354  }
   355  
   356  func renderTapEventWide(event *tapPb.TapEvent, _ ...renderOptions) string {
   357  	dst := dst(event)
   358  	src := src(event)
   359  
   360  	out := []string{renderTapEvent(event)}
   361  	out = append(out, src.formatResource()...)
   362  	out = append(out, dst.formatResource()...)
   363  	out = append(out, routeLabels(event)...)
   364  	return strings.Join(out, " ")
   365  }
   366  
   367  // renderTapEvent renders a Public API TapEvent to a string.
   368  func renderTapEvent(event *tapPb.TapEvent, _ ...renderOptions) string {
   369  	dst := dst(event)
   370  	src := src(event)
   371  
   372  	proxy := "???"
   373  	tls := ""
   374  	switch event.GetProxyDirection() {
   375  	case tapPb.TapEvent_INBOUND:
   376  		proxy = "in " // A space is added so it aligns with `out`.
   377  		tls = src.tlsStatus()
   378  	case tapPb.TapEvent_OUTBOUND:
   379  		proxy = "out"
   380  		tls = dst.tlsStatus()
   381  	default:
   382  		// Too old for TLS.
   383  	}
   384  
   385  	flow := fmt.Sprintf("proxy=%s %s %s tls=%s",
   386  		proxy,
   387  		src.formatAddr(),
   388  		dst.formatAddr(),
   389  		tls,
   390  	)
   391  
   392  	switch ev := event.GetHttp().GetEvent().(type) {
   393  	case *tapPb.TapEvent_Http_RequestInit_:
   394  		return fmt.Sprintf("req id=%d:%d %s :method=%s :authority=%s :path=%s",
   395  			ev.RequestInit.GetId().GetBase(),
   396  			ev.RequestInit.GetId().GetStream(),
   397  			flow,
   398  			vizutil.HTTPMethodToString(ev.RequestInit.GetMethod()),
   399  			ev.RequestInit.GetAuthority(),
   400  			ev.RequestInit.GetPath(),
   401  		)
   402  
   403  	case *tapPb.TapEvent_Http_ResponseInit_:
   404  		return fmt.Sprintf("rsp id=%d:%d %s :status=%d latency=%dµs",
   405  			ev.ResponseInit.GetId().GetBase(),
   406  			ev.ResponseInit.GetId().GetStream(),
   407  			flow,
   408  			ev.ResponseInit.GetHttpStatus(),
   409  			ev.ResponseInit.GetSinceRequestInit().AsDuration().Microseconds(),
   410  		)
   411  
   412  	case *tapPb.TapEvent_Http_ResponseEnd_:
   413  		switch eos := ev.ResponseEnd.GetEos().GetEnd().(type) {
   414  		case *metricsPb.Eos_GrpcStatusCode:
   415  			return fmt.Sprintf(
   416  				"end id=%d:%d %s grpc-status=%s duration=%dµs response-length=%dB",
   417  				ev.ResponseEnd.GetId().GetBase(),
   418  				ev.ResponseEnd.GetId().GetStream(),
   419  				flow,
   420  				codes.Code(eos.GrpcStatusCode),
   421  				ev.ResponseEnd.GetSinceResponseInit().AsDuration().Microseconds(),
   422  				ev.ResponseEnd.GetResponseBytes(),
   423  			)
   424  
   425  		case *metricsPb.Eos_ResetErrorCode:
   426  			return fmt.Sprintf(
   427  				"end id=%d:%d %s reset-error=%+v duration=%dµs response-length=%dB",
   428  				ev.ResponseEnd.GetId().GetBase(),
   429  				ev.ResponseEnd.GetId().GetStream(),
   430  				flow,
   431  				eos.ResetErrorCode,
   432  				ev.ResponseEnd.GetSinceResponseInit().AsDuration().Microseconds(),
   433  				ev.ResponseEnd.GetResponseBytes(),
   434  			)
   435  
   436  		default:
   437  			return fmt.Sprintf("end id=%d:%d %s duration=%dµs response-length=%dB",
   438  				ev.ResponseEnd.GetId().GetBase(),
   439  				ev.ResponseEnd.GetId().GetStream(),
   440  				flow,
   441  				ev.ResponseEnd.GetSinceResponseInit().AsDuration().Microseconds(),
   442  				ev.ResponseEnd.GetResponseBytes(),
   443  			)
   444  		}
   445  
   446  	default:
   447  		return fmt.Sprintf("unknown %s", flow)
   448  	}
   449  }
   450  
   451  // renderTapEventJSON renders a Public API TapEvent to a string in JSON format.
   452  func renderTapEventJSON(event *tapPb.TapEvent, opts ...renderOptions) string {
   453  	filter := &renderFilter{}
   454  	for _, opt := range opts {
   455  		opt(filter)
   456  	}
   457  	m := mapPublicToDisplayTapEvent(event)
   458  	if filter.JsonPath != "" {
   459  		filteredJson, err := jsonpath.GetJsonFilteredByJPath(m, filter.JsonPath)
   460  		if err != nil {
   461  			return err.Error()
   462  		}
   463  		return filteredJson[0]
   464  	}
   465  	e, err := json.MarshalIndent(m, "", "  ")
   466  	if err != nil {
   467  		return fmt.Sprintf("{\"error marshalling JSON\": \"%s\"}", err)
   468  	}
   469  	return string(e)
   470  }
   471  
   472  // Map public API `TapEvent`s to `displayTapEvent`s
   473  func mapPublicToDisplayTapEvent(event *tapPb.TapEvent) *tapEvent {
   474  	// Map source endpoint
   475  	sip := addr.PublicIPToString(event.GetSource().GetIp())
   476  	src := &endpoint{
   477  		IP:       sip,
   478  		Port:     event.GetSource().GetPort(),
   479  		Metadata: event.GetSourceMeta().GetLabels(),
   480  	}
   481  
   482  	// Map destination endpoint
   483  	dip := addr.PublicIPToString(event.GetDestination().GetIp())
   484  	dst := &endpoint{
   485  		IP:       dip,
   486  		Port:     event.GetDestination().GetPort(),
   487  		Metadata: event.GetDestinationMeta().GetLabels(),
   488  	}
   489  
   490  	return &tapEvent{
   491  		Source:            src,
   492  		Destination:       dst,
   493  		RouteMeta:         event.GetRouteMeta().GetLabels(),
   494  		ProxyDirection:    event.GetProxyDirection().String(),
   495  		RequestInitEvent:  getRequestInitEvent(event.GetHttp()),
   496  		ResponseInitEvent: getResponseInitEvent(event.GetHttp()),
   497  		ResponseEndEvent:  getResponseEndEvent(event.GetHttp()),
   498  	}
   499  }
   500  
   501  // Attempt to map a `TapEvent_Http_RequestInit event to a `requestInitEvent`
   502  func getRequestInitEvent(pubEv *tapPb.TapEvent_Http) *requestInitEvent {
   503  	reqI := pubEv.GetRequestInit()
   504  	if reqI == nil {
   505  		return nil
   506  	}
   507  	sid := &streamID{
   508  		Base:   reqI.GetId().GetBase(),
   509  		Stream: reqI.GetId().GetStream(),
   510  	}
   511  	return &requestInitEvent{
   512  		ID:        sid,
   513  		Method:    formatMethod(reqI.GetMethod()),
   514  		Scheme:    formatScheme(reqI.GetScheme()),
   515  		Authority: reqI.GetAuthority(),
   516  		Path:      reqI.GetPath(),
   517  		Headers:   formatHeadersTrailers(reqI.GetHeaders()),
   518  	}
   519  }
   520  
   521  func formatMethod(m *metricsPb.HttpMethod) string {
   522  	if x, ok := m.GetType().(*metricsPb.HttpMethod_Registered_); ok {
   523  		return x.Registered.String()
   524  	}
   525  	if s, ok := m.GetType().(*metricsPb.HttpMethod_Unregistered); ok {
   526  		return s.Unregistered
   527  	}
   528  	return ""
   529  }
   530  
   531  func formatScheme(s *metricsPb.Scheme) string {
   532  	if x, ok := s.GetType().(*metricsPb.Scheme_Registered_); ok {
   533  		return x.Registered.String()
   534  	}
   535  	if str, ok := s.GetType().(*metricsPb.Scheme_Unregistered); ok {
   536  		return str.Unregistered
   537  	}
   538  	return ""
   539  }
   540  
   541  // Attempt to map a `TapEvent_Http_ResponseInit` event to a `responseInitEvent`
   542  func getResponseInitEvent(pubEv *tapPb.TapEvent_Http) *responseInitEvent {
   543  	resI := pubEv.GetResponseInit()
   544  	if resI == nil {
   545  		return nil
   546  	}
   547  	sid := &streamID{
   548  		Base:   resI.GetId().GetBase(),
   549  		Stream: resI.GetId().GetStream(),
   550  	}
   551  	return &responseInitEvent{
   552  		ID:               sid,
   553  		SinceRequestInit: resI.GetSinceRequestInit(),
   554  		HTTPStatus:       resI.GetHttpStatus(),
   555  		Headers:          formatHeadersTrailers(resI.GetHeaders()),
   556  	}
   557  }
   558  
   559  // Attempt to map a `TapEvent_Http_ResponseEnd` event to a `responseEndEvent`
   560  func getResponseEndEvent(pubEv *tapPb.TapEvent_Http) *responseEndEvent {
   561  	resE := pubEv.GetResponseEnd()
   562  	if resE == nil {
   563  		return nil
   564  	}
   565  	sid := &streamID{
   566  		Base:   resE.GetId().GetBase(),
   567  		Stream: resE.GetId().GetStream(),
   568  	}
   569  	return &responseEndEvent{
   570  		ID:                sid,
   571  		SinceRequestInit:  resE.GetSinceRequestInit(),
   572  		SinceResponseInit: resE.GetSinceResponseInit(),
   573  		ResponseBytes:     resE.GetResponseBytes(),
   574  		Trailers:          formatHeadersTrailers(resE.GetTrailers()),
   575  		GrpcStatusCode:    resE.GetEos().GetGrpcStatusCode(),
   576  		ResetErrorCode:    resE.GetEos().GetResetErrorCode(),
   577  	}
   578  }
   579  
   580  func formatHeadersTrailers(hs *metricsPb.Headers) []metadata {
   581  	var fm []metadata
   582  	for _, h := range hs.GetHeaders() {
   583  		switch h.GetValue().(type) {
   584  		case *metricsPb.Headers_Header_ValueStr:
   585  			fht := &metadataStr{Name: h.GetName(), ValueStr: h.GetValueStr()}
   586  			fm = append(fm, fht)
   587  			continue
   588  		case *metricsPb.Headers_Header_ValueBin:
   589  			fht := &metadataBin{Name: h.GetName(), ValueBin: h.GetValueBin()}
   590  			fm = append(fm, fht)
   591  			continue
   592  		}
   593  	}
   594  	return fm
   595  }
   596  
   597  // src returns the source peer of a `TapEvent`.
   598  func src(event *tapPb.TapEvent) peer {
   599  	return peer{
   600  		address:   event.GetSource(),
   601  		labels:    event.GetSourceMeta().GetLabels(),
   602  		direction: "src",
   603  	}
   604  }
   605  
   606  // dst returns the destination peer of a `TapEvent`.
   607  func dst(event *tapPb.TapEvent) peer {
   608  	return peer{
   609  		address:   event.GetDestination(),
   610  		labels:    event.GetDestinationMeta().GetLabels(),
   611  		direction: "dst",
   612  	}
   613  }
   614  
// peer describes one side of a tapped connection for plain-text rendering:
// its TCP address, its metadata labels and a direction tag ("src" or "dst",
// as set by src and dst) that prefixes the rendered fields.
type peer struct {
	address   *netPb.TcpAddress
	labels    map[string]string
	direction string
}
   620  
   621  // formatAddr formats the peer's TCP address for the `src` or `dst` element in
   622  // the tap output corresponding to this peer.
   623  func (p *peer) formatAddr() string {
   624  	return fmt.Sprintf(
   625  		"%s=%s",
   626  		p.direction,
   627  		addr.PublicAddressToString(p.address),
   628  	)
   629  }
   630  
   631  // formatResource returns the peer's labels formatted and sorted.
   632  func (p *peer) formatResource() []string {
   633  	labels := []string{}
   634  	for k, v := range p.labels {
   635  		labels = append(labels, fmt.Sprintf("%s_%s=%s", p.direction, k, v))
   636  	}
   637  	sort.Strings(labels)
   638  	return labels
   639  }
   640  
   641  func (p *peer) tlsStatus() string {
   642  	return p.labels["tls"]
   643  }
   644  
   645  func routeLabels(event *tapPb.TapEvent) []string {
   646  	out := []string{}
   647  	for key, val := range event.GetRouteMeta().GetLabels() {
   648  		out = append(out, fmt.Sprintf("rt_%s=%s", key, val))
   649  	}
   650  	return out
   651  }
   652  

View as plain text