...

Source file src/github.com/linkerd/linkerd2/viz/tap/api/grpc_server.go

Documentation: github.com/linkerd/linkerd2/viz/tap/api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"io"
     8  	"net"
     9  	"strconv"
    10  	"strings"
    11  	"time"
    12  	"unicode/utf8"
    13  
    14  	"k8s.io/apimachinery/pkg/labels"
    15  
    16  	httpPb "github.com/linkerd/linkerd2-proxy-api/go/http_types"
    17  	proxy "github.com/linkerd/linkerd2-proxy-api/go/tap"
    18  	netPb "github.com/linkerd/linkerd2/controller/gen/common/net"
    19  	"github.com/linkerd/linkerd2/controller/k8s"
    20  	"github.com/linkerd/linkerd2/pkg/addr"
    21  	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
    22  	"github.com/linkerd/linkerd2/pkg/prometheus"
    23  	"github.com/linkerd/linkerd2/pkg/util"
    24  	metricsPb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    25  	vizLabels "github.com/linkerd/linkerd2/viz/pkg/labels"
    26  	pkgUtil "github.com/linkerd/linkerd2/viz/pkg/util"
    27  	tapPb "github.com/linkerd/linkerd2/viz/tap/gen/tap"
    28  	log "github.com/sirupsen/logrus"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/codes"
    31  	"google.golang.org/grpc/credentials/insecure"
    32  	"google.golang.org/grpc/metadata"
    33  	"google.golang.org/grpc/status"
    34  	corev1 "k8s.io/api/core/v1"
    35  	"k8s.io/apimachinery/pkg/runtime"
    36  	"k8s.io/client-go/tools/cache"
    37  )
    38  
    39  const ipIndex = "ip"
    40  const defaultMaxRps = 100.0
    41  
    42  // GRPCTapServer describes the gRPC server implementing pb.TapServer
    43  type GRPCTapServer struct {
    44  	tapPb.UnimplementedTapServer
    45  	tapPort             uint
    46  	k8sAPI              *k8s.API
    47  	controllerNamespace string
    48  	trustDomain         string
    49  	ignoreHeaders       map[string]bool
    50  }
    51  
    52  var (
    53  	tapInterval = 1 * time.Second
    54  )
    55  
    56  // Tap is deprecated, use TapByResource.
    57  // This API endpoint is marked as deprecated but it's still used.
    58  //
    59  //nolint:staticcheck
    60  func (s *GRPCTapServer) Tap(req *tapPb.TapRequest, stream tapPb.Tap_TapServer) error {
    61  	return status.Error(codes.Unimplemented, "Tap is deprecated, use TapByResource")
    62  }
    63  
    64  // TapByResource taps all resources matched by the request object.
    65  func (s *GRPCTapServer) TapByResource(req *tapPb.TapByResourceRequest, stream tapPb.Tap_TapByResourceServer) error {
    66  	if req == nil {
    67  		return status.Error(codes.InvalidArgument, "TapByResource received nil TapByResourceRequest")
    68  	}
    69  	if req.GetTarget() == nil {
    70  		return status.Error(codes.InvalidArgument, "TapByResource received nil target ResourceSelection")
    71  	}
    72  	res := req.GetTarget().GetResource()
    73  	labelSelector, err := getLabelSelector(req)
    74  	if err != nil {
    75  		return err
    76  	}
    77  	if res == nil {
    78  		return status.Error(codes.InvalidArgument, "TapByResource received nil target Resource")
    79  	}
    80  	if req.GetMaxRps() == 0.0 {
    81  		req.MaxRps = defaultMaxRps
    82  	}
    83  
    84  	objects, err := s.k8sAPI.GetObjects(res.GetNamespace(), res.GetType(), res.GetName(), labelSelector)
    85  	if err != nil {
    86  		return pkgUtil.GRPCError(err)
    87  	}
    88  
    89  	pods := []*corev1.Pod{}
    90  	tapDisabled := []*corev1.Pod{}
    91  	tapNotEnabled := []*corev1.Pod{}
    92  	for _, object := range objects {
    93  		podsFor, err := s.k8sAPI.GetPodsFor(object, false)
    94  		if err != nil {
    95  			return pkgUtil.GRPCError(err)
    96  		}
    97  
    98  		for _, pod := range podsFor {
    99  			if pkgK8s.IsMeshed(pod, s.controllerNamespace) {
   100  				if vizLabels.IsTapDisabled(pod) {
   101  					tapDisabled = append(tapDisabled, pod)
   102  				} else if !vizLabels.IsTapEnabled(pod) {
   103  					tapNotEnabled = append(tapNotEnabled, pod)
   104  				} else {
   105  					pods = append(pods, pod)
   106  				}
   107  			}
   108  		}
   109  	}
   110  
   111  	if len(pods) == 0 {
   112  		var errs strings.Builder
   113  		fmt.Fprintf(&errs, "no pods to tap for type=%q name=%q\n", res.GetType(), res.GetName())
   114  		if len(tapDisabled) > 0 {
   115  			fmt.Fprintf(&errs, "%d pods found with tap disabled via the %s annotation:\n", len(tapDisabled), vizLabels.VizTapDisabled)
   116  			for _, pod := range tapDisabled {
   117  				fmt.Fprintf(&errs, "\t* %s\n", pod.Name)
   118  			}
   119  			fmt.Fprintln(&errs, "remove this annotation to make these pods valid tap targets")
   120  		}
   121  		if len(tapNotEnabled) > 0 {
   122  			fmt.Fprintf(&errs, "%d pods found with tap not enabled:\n", len(tapNotEnabled))
   123  			for _, pod := range tapNotEnabled {
   124  				fmt.Fprintf(&errs, "\t* %s\n", pod.Name)
   125  			}
   126  			fmt.Fprintln(&errs, "restart these pods to enable tap and make them valid tap targets")
   127  		}
   128  		return status.Errorf(codes.NotFound, errs.String())
   129  	}
   130  
   131  	log.Infof("Tapping %d pods for target: %q", len(pods), res.String())
   132  
   133  	events := make(chan *tapPb.TapEvent)
   134  
   135  	// divide the rps evenly between all pods to tap
   136  	rpsPerPod := req.GetMaxRps() / float32(len(pods))
   137  	if rpsPerPod < 1 {
   138  		rpsPerPod = 1
   139  	}
   140  
   141  	match, err := makeByResourceMatch(req.GetMatch())
   142  	if err != nil {
   143  		return pkgUtil.GRPCError(err)
   144  	}
   145  
   146  	extract := &proxy.ObserveRequest_Extract{}
   147  
   148  	// HTTP is the only protocol supported for extracting metadata, so this is
   149  	// the only field checked.
   150  	extractHTTP := req.GetExtract().GetHttp()
   151  	if extractHTTP != nil {
   152  		extract = buildExtractHTTP(extractHTTP)
   153  	}
   154  
   155  	for _, pod := range pods {
   156  		// create the expected pod identity from the pod spec
   157  		ns := res.GetNamespace()
   158  		if res.GetType() == pkgK8s.Namespace {
   159  			ns = res.GetName()
   160  		}
   161  		name := fmt.Sprintf("%s.%s.serviceaccount.identity.%s.%s", pod.Spec.ServiceAccountName, ns, s.controllerNamespace, s.trustDomain)
   162  		log.Debugf("initiating tap request to %s with required name %s", pod.Spec.ServiceAccountName, name)
   163  
   164  		// pass the header metadata into the request context
   165  		ctx := stream.Context()
   166  		ctx = metadata.AppendToOutgoingContext(ctx, pkgK8s.RequireIDHeader, name)
   167  
   168  		// initiate a tap on the pod
   169  		go s.tapProxy(ctx, rpsPerPod, match, extract, pod.Status.PodIP, events)
   170  	}
   171  
   172  	// read events from the taps and send them back
   173  	for {
   174  		select {
   175  		case <-stream.Context().Done():
   176  			return nil
   177  		case event := <-events:
   178  			err := stream.Send(event)
   179  			if err != nil {
   180  				return pkgUtil.GRPCError(err)
   181  			}
   182  		}
   183  	}
   184  }
   185  
   186  func makeByResourceMatch(match *tapPb.TapByResourceRequest_Match) (*proxy.ObserveRequest_Match, error) {
   187  	// TODO: for now assume it's always a single, flat `All` match list
   188  	seq := match.GetAll()
   189  	if seq == nil {
   190  		return nil, status.Errorf(codes.Unimplemented, "unexpected match specified: %+v", match)
   191  	}
   192  
   193  	matches := []*proxy.ObserveRequest_Match{}
   194  
   195  	for _, reqMatch := range seq.Matches {
   196  		switch typed := reqMatch.Match.(type) {
   197  		case *tapPb.TapByResourceRequest_Match_Destinations:
   198  
   199  			for k, v := range destinationLabels(typed.Destinations.Resource) {
   200  				matches = append(matches, &proxy.ObserveRequest_Match{
   201  					Match: &proxy.ObserveRequest_Match_DestinationLabel{
   202  						DestinationLabel: &proxy.ObserveRequest_Match_Label{
   203  							Key:   k,
   204  							Value: v,
   205  						},
   206  					},
   207  				})
   208  			}
   209  
   210  		case *tapPb.TapByResourceRequest_Match_Http_:
   211  
   212  			httpMatch := proxy.ObserveRequest_Match_Http{}
   213  
   214  			switch httpTyped := typed.Http.Match.(type) {
   215  			case *tapPb.TapByResourceRequest_Match_Http_Scheme:
   216  				httpMatch = proxy.ObserveRequest_Match_Http{
   217  					Match: &proxy.ObserveRequest_Match_Http_Scheme{
   218  						Scheme: util.ParseScheme(httpTyped.Scheme),
   219  					},
   220  				}
   221  			case *tapPb.TapByResourceRequest_Match_Http_Method:
   222  				httpMatch = proxy.ObserveRequest_Match_Http{
   223  					Match: &proxy.ObserveRequest_Match_Http_Method{
   224  						Method: util.ParseMethod(httpTyped.Method),
   225  					},
   226  				}
   227  			case *tapPb.TapByResourceRequest_Match_Http_Authority:
   228  				httpMatch = proxy.ObserveRequest_Match_Http{
   229  					Match: &proxy.ObserveRequest_Match_Http_Authority{
   230  						Authority: &proxy.ObserveRequest_Match_Http_StringMatch{
   231  							Match: &proxy.ObserveRequest_Match_Http_StringMatch_Exact{
   232  								Exact: httpTyped.Authority,
   233  							},
   234  						},
   235  					},
   236  				}
   237  			case *tapPb.TapByResourceRequest_Match_Http_Path:
   238  				httpMatch = proxy.ObserveRequest_Match_Http{
   239  					Match: &proxy.ObserveRequest_Match_Http_Path{
   240  						Path: &proxy.ObserveRequest_Match_Http_StringMatch{
   241  							Match: &proxy.ObserveRequest_Match_Http_StringMatch_Prefix{
   242  								Prefix: httpTyped.Path,
   243  							},
   244  						},
   245  					},
   246  				}
   247  			default:
   248  				return nil, status.Errorf(codes.Unimplemented, "unknown HTTP match type: %v", httpTyped)
   249  			}
   250  
   251  			matches = append(matches, &proxy.ObserveRequest_Match{
   252  				Match: &proxy.ObserveRequest_Match_Http_{
   253  					Http: &httpMatch,
   254  				},
   255  			})
   256  
   257  		default:
   258  			return nil, status.Errorf(codes.Unimplemented, "unknown match type: %v", typed)
   259  		}
   260  	}
   261  
   262  	return &proxy.ObserveRequest_Match{
   263  		Match: &proxy.ObserveRequest_Match_All{
   264  			All: &proxy.ObserveRequest_Match_Seq{
   265  				Matches: matches,
   266  			},
   267  		},
   268  	}, nil
   269  }
   270  
   271  // TODO: factor out with `promLabels` in public-api
   272  func destinationLabels(resource *metricsPb.Resource) map[string]string {
   273  	dstLabels := map[string]string{}
   274  	if resource.Name != "" {
   275  		l5dLabel := pkgK8s.KindToL5DLabel(resource.Type)
   276  		dstLabels[l5dLabel] = resource.Name
   277  	}
   278  	if resource.Type != pkgK8s.Namespace && resource.Namespace != "" {
   279  		dstLabels["namespace"] = resource.Namespace
   280  	}
   281  	return dstLabels
   282  }
   283  
   284  func buildExtractHTTP(extract *tapPb.TapByResourceRequest_Extract_Http) *proxy.ObserveRequest_Extract {
   285  	if extract.GetHeaders() != nil {
   286  		return &proxy.ObserveRequest_Extract{
   287  			Extract: &proxy.ObserveRequest_Extract_Http_{
   288  				Http: &proxy.ObserveRequest_Extract_Http{
   289  					Extract: &proxy.ObserveRequest_Extract_Http_Headers_{
   290  						Headers: &proxy.ObserveRequest_Extract_Http_Headers{},
   291  					},
   292  				},
   293  			},
   294  		}
   295  	}
   296  	return nil
   297  }
   298  
   299  // Tap a pod.
   300  // This method will run continuously until an error is encountered or the
   301  // request is cancelled via the context.  Thus it should be called as a
   302  // go-routine.
   303  // To limit the rps to maxRps, this method calls Observe on the pod with a limit
   304  // of maxRps * 1s at most once per 1s window.  If this limit is reached in
   305  // less than 1s, we sleep until the end of the window before calling Observe
   306  // again.
   307  func (s *GRPCTapServer) tapProxy(ctx context.Context, maxRps float32, match *proxy.ObserveRequest_Match, extract *proxy.ObserveRequest_Extract, addr string, events chan *tapPb.TapEvent) {
   308  	strPort := strconv.Itoa(int(s.tapPort))
   309  	tapAddr := net.JoinHostPort(addr, strPort)
   310  	log.Infof("Establishing tap on %s", tapAddr)
   311  	conn, err := grpc.NewClient(tapAddr, grpc.WithTransportCredentials(insecure.NewCredentials()))
   312  	if err != nil {
   313  		log.Error(err)
   314  		return
   315  	}
   316  	client := proxy.NewTapClient(conn)
   317  	defer conn.Close()
   318  
   319  	req := &proxy.ObserveRequest{
   320  		Limit:   uint32(maxRps * float32(tapInterval.Seconds())),
   321  		Match:   match,
   322  		Extract: extract,
   323  	}
   324  
   325  	for { // Request loop
   326  		windowStart := time.Now()
   327  		windowEnd := windowStart.Add(tapInterval)
   328  		rsp, err := client.Observe(ctx, req)
   329  		if err != nil {
   330  			log.Error(err)
   331  			return
   332  		}
   333  		for { // Stream loop
   334  			event, err := rsp.Recv()
   335  			if err != nil {
   336  				if errors.Is(err, io.EOF) {
   337  					log.Debugf("[%s] proxy terminated the stream", addr)
   338  					break
   339  				}
   340  				log.Errorf("[%s] encountered an error: %s", addr, err)
   341  				return
   342  			}
   343  
   344  			translatedEvent := s.translateEvent(ctx, event)
   345  
   346  			select {
   347  			case <-ctx.Done():
   348  				log.Debugf("[%s] client terminated the stream", addr)
   349  				return
   350  			default:
   351  				events <- translatedEvent
   352  			}
   353  		}
   354  		if time.Now().Before(windowEnd) {
   355  			time.Sleep(time.Until(windowEnd))
   356  		}
   357  	}
   358  }
   359  
   360  func (s *GRPCTapServer) translateEvent(ctx context.Context, orig *proxy.TapEvent) *tapPb.TapEvent {
   361  	direction := func(orig proxy.TapEvent_ProxyDirection) tapPb.TapEvent_ProxyDirection {
   362  		switch orig {
   363  		case proxy.TapEvent_INBOUND:
   364  			return tapPb.TapEvent_INBOUND
   365  		case proxy.TapEvent_OUTBOUND:
   366  			return tapPb.TapEvent_OUTBOUND
   367  		default:
   368  			return tapPb.TapEvent_UNKNOWN
   369  		}
   370  	}
   371  
   372  	event := func(orig *proxy.TapEvent_Http) *tapPb.TapEvent_Http_ {
   373  		id := func(orig *proxy.TapEvent_Http_StreamId) *tapPb.TapEvent_Http_StreamId {
   374  			return &tapPb.TapEvent_Http_StreamId{
   375  				Base:   orig.GetBase(),
   376  				Stream: orig.GetStream(),
   377  			}
   378  		}
   379  
   380  		method := func(orig *httpPb.HttpMethod) *metricsPb.HttpMethod {
   381  			switch m := orig.GetType().(type) {
   382  			case *httpPb.HttpMethod_Registered_:
   383  				return &metricsPb.HttpMethod{
   384  					Type: &metricsPb.HttpMethod_Registered_{
   385  						Registered: metricsPb.HttpMethod_Registered(m.Registered),
   386  					},
   387  				}
   388  			case *httpPb.HttpMethod_Unregistered:
   389  				return &metricsPb.HttpMethod{
   390  					Type: &metricsPb.HttpMethod_Unregistered{
   391  						Unregistered: m.Unregistered,
   392  					},
   393  				}
   394  			default:
   395  				return nil
   396  			}
   397  		}
   398  
   399  		scheme := func(orig *httpPb.Scheme) *metricsPb.Scheme {
   400  			switch s := orig.GetType().(type) {
   401  			case *httpPb.Scheme_Registered_:
   402  				return &metricsPb.Scheme{
   403  					Type: &metricsPb.Scheme_Registered_{
   404  						Registered: metricsPb.Scheme_Registered(s.Registered),
   405  					},
   406  				}
   407  			case *httpPb.Scheme_Unregistered:
   408  				return &metricsPb.Scheme{
   409  					Type: &metricsPb.Scheme_Unregistered{
   410  						Unregistered: s.Unregistered,
   411  					},
   412  				}
   413  			default:
   414  				return nil
   415  			}
   416  		}
   417  
   418  		headers := func(orig *httpPb.Headers) *metricsPb.Headers {
   419  			if orig == nil {
   420  				return nil
   421  			}
   422  			var headers []*metricsPb.Headers_Header
   423  			for _, header := range orig.GetHeaders() {
   424  				n := header.GetName()
   425  				if s.ignoreHeaders[n] {
   426  					continue
   427  				}
   428  				b := header.GetValue()
   429  				h := metricsPb.Headers_Header{Name: n, Value: &metricsPb.Headers_Header_ValueBin{ValueBin: b}}
   430  				if utf8.Valid(b) {
   431  					h = metricsPb.Headers_Header{Name: n, Value: &metricsPb.Headers_Header_ValueStr{ValueStr: string(b)}}
   432  				}
   433  				headers = append(headers, &h)
   434  			}
   435  			return &metricsPb.Headers{
   436  				Headers: headers,
   437  			}
   438  		}
   439  
   440  		switch orig := orig.GetEvent().(type) {
   441  		case *proxy.TapEvent_Http_RequestInit_:
   442  			return &tapPb.TapEvent_Http_{
   443  				Http: &tapPb.TapEvent_Http{
   444  					Event: &tapPb.TapEvent_Http_RequestInit_{
   445  						RequestInit: &tapPb.TapEvent_Http_RequestInit{
   446  							Id:        id(orig.RequestInit.GetId()),
   447  							Method:    method(orig.RequestInit.GetMethod()),
   448  							Scheme:    scheme(orig.RequestInit.GetScheme()),
   449  							Authority: orig.RequestInit.Authority,
   450  							Path:      orig.RequestInit.Path,
   451  							Headers:   headers(orig.RequestInit.GetHeaders()),
   452  						},
   453  					},
   454  				},
   455  			}
   456  
   457  		case *proxy.TapEvent_Http_ResponseInit_:
   458  			return &tapPb.TapEvent_Http_{
   459  				Http: &tapPb.TapEvent_Http{
   460  					Event: &tapPb.TapEvent_Http_ResponseInit_{
   461  						ResponseInit: &tapPb.TapEvent_Http_ResponseInit{
   462  							Id:               id(orig.ResponseInit.GetId()),
   463  							SinceRequestInit: orig.ResponseInit.GetSinceRequestInit(),
   464  							HttpStatus:       orig.ResponseInit.GetHttpStatus(),
   465  							Headers:          headers(orig.ResponseInit.GetHeaders()),
   466  						},
   467  					},
   468  				},
   469  			}
   470  
   471  		case *proxy.TapEvent_Http_ResponseEnd_:
   472  			eos := func(orig *proxy.Eos) *metricsPb.Eos {
   473  				switch e := orig.GetEnd().(type) {
   474  				case *proxy.Eos_ResetErrorCode:
   475  					return &metricsPb.Eos{
   476  						End: &metricsPb.Eos_ResetErrorCode{
   477  							ResetErrorCode: e.ResetErrorCode,
   478  						},
   479  					}
   480  				case *proxy.Eos_GrpcStatusCode:
   481  					return &metricsPb.Eos{
   482  						End: &metricsPb.Eos_GrpcStatusCode{
   483  							GrpcStatusCode: e.GrpcStatusCode,
   484  						},
   485  					}
   486  				default:
   487  					return nil
   488  				}
   489  			}
   490  
   491  			return &tapPb.TapEvent_Http_{
   492  				Http: &tapPb.TapEvent_Http{
   493  					Event: &tapPb.TapEvent_Http_ResponseEnd_{
   494  						ResponseEnd: &tapPb.TapEvent_Http_ResponseEnd{
   495  							Id:                id(orig.ResponseEnd.GetId()),
   496  							SinceRequestInit:  orig.ResponseEnd.GetSinceRequestInit(),
   497  							SinceResponseInit: orig.ResponseEnd.GetSinceResponseInit(),
   498  							ResponseBytes:     orig.ResponseEnd.GetResponseBytes(),
   499  							Eos:               eos(orig.ResponseEnd.GetEos()),
   500  							Trailers:          headers(orig.ResponseEnd.GetTrailers()),
   501  						},
   502  					},
   503  				},
   504  			}
   505  
   506  		default:
   507  			return nil
   508  		}
   509  	}
   510  
   511  	sourceLabels := orig.GetSourceMeta().GetLabels()
   512  	if sourceLabels == nil {
   513  		sourceLabels = make(map[string]string)
   514  	}
   515  	destinationLabels := orig.GetDestinationMeta().GetLabels()
   516  	if destinationLabels == nil {
   517  		destinationLabels = make(map[string]string)
   518  	}
   519  
   520  	ev := &tapPb.TapEvent{
   521  		Source: addr.NetToPublic(orig.GetSource()),
   522  		SourceMeta: &tapPb.TapEvent_EndpointMeta{
   523  			Labels: sourceLabels,
   524  		},
   525  		Destination: addr.NetToPublic(orig.GetDestination()),
   526  		DestinationMeta: &tapPb.TapEvent_EndpointMeta{
   527  			Labels: destinationLabels,
   528  		},
   529  		RouteMeta: &tapPb.TapEvent_RouteMeta{
   530  			Labels: orig.GetRouteMeta().GetLabels(),
   531  		},
   532  		ProxyDirection: direction(orig.GetProxyDirection()),
   533  		Event:          event(orig.GetHttp()),
   534  	}
   535  
   536  	s.hydrateEventLabels(ctx, ev)
   537  
   538  	return ev
   539  }
   540  
   541  // NewGrpcTapServer creates a new gRPC Tap server
   542  func NewGrpcTapServer(
   543  	tapPort uint,
   544  	controllerNamespace string,
   545  	trustDomain string,
   546  	k8sAPI *k8s.API,
   547  	ignoreHeaders map[string]bool,
   548  ) (*GRPCTapServer, error) {
   549  	if err := k8sAPI.Pod().Informer().AddIndexers(cache.Indexers{ipIndex: indexByIP}); err != nil {
   550  		return nil, err
   551  	}
   552  	if err := k8sAPI.Node().Informer().AddIndexers(cache.Indexers{ipIndex: indexByIP}); err != nil {
   553  		return nil, err
   554  	}
   555  
   556  	return newGRPCTapServer(tapPort, controllerNamespace, trustDomain, k8sAPI, ignoreHeaders), nil
   557  }
   558  
   559  func newGRPCTapServer(
   560  	tapPort uint,
   561  	controllerNamespace string,
   562  	trustDomain string,
   563  	k8sAPI *k8s.API,
   564  	ignoreHeaders map[string]bool,
   565  ) *GRPCTapServer {
   566  	srv := &GRPCTapServer{
   567  		tapPort:             tapPort,
   568  		k8sAPI:              k8sAPI,
   569  		controllerNamespace: controllerNamespace,
   570  		trustDomain:         trustDomain,
   571  		ignoreHeaders:       ignoreHeaders,
   572  	}
   573  
   574  	s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
   575  	tapPb.RegisterTapServer(s, srv)
   576  
   577  	return srv
   578  }
   579  
   580  func indexByIP(obj interface{}) ([]string, error) {
   581  	switch v := obj.(type) {
   582  	case *corev1.Pod:
   583  		return []string{v.Status.PodIP}, nil
   584  	case *corev1.Node:
   585  		addresses := make([]string, 0)
   586  		for _, address := range v.Status.Addresses {
   587  			if address.Type == corev1.NodeInternalIP {
   588  				log.Debugf("Indexing node address: %s", address.Address)
   589  				addresses = append(addresses, address.Address)
   590  			}
   591  		}
   592  		return addresses, nil
   593  	}
   594  	return []string{""}, fmt.Errorf("object is not a pod nor a node")
   595  }
   596  
   597  // hydrateEventLabels attempts to hydrate the metadata labels for an event's
   598  // source and (if the event was reported by an inbound proxy) destination,
   599  // and adds them to the event's `SourceMeta` and `DestinationMeta` fields.
   600  //
   601  // Since errors encountered while hydrating metadata are non-fatal and result
   602  // only in missing labels, any errors are logged at the WARN level.
   603  func (s *GRPCTapServer) hydrateEventLabels(ctx context.Context, ev *tapPb.TapEvent) {
   604  	err := s.hydrateIPLabels(ctx, ev.GetSource().GetIp(), ev.GetSourceMeta().GetLabels())
   605  	if err != nil {
   606  		log.Warnf("error hydrating source labels: %s", err)
   607  	}
   608  
   609  	if ev.ProxyDirection == tapPb.TapEvent_INBOUND {
   610  		// Events emitted by an inbound proxies don't have destination labels,
   611  		// since the inbound proxy _is_ the destination, and proxies don't know
   612  		// their own labels.
   613  		err = s.hydrateIPLabels(ctx, ev.GetDestination().GetIp(), ev.GetDestinationMeta().GetLabels())
   614  		if err != nil {
   615  			log.Warnf("error hydrating destination labels: %s", err)
   616  		}
   617  	}
   618  
   619  }
   620  
   621  // hydrateIPLabels attempts to determine the metadata labels for `ip` and, if
   622  // successful, adds them to `labels`.
   623  func (s *GRPCTapServer) hydrateIPLabels(ctx context.Context, ip *netPb.IPAddress, labels map[string]string) error {
   624  	res, err := s.resourceForIP(ip)
   625  	if err != nil {
   626  		return err
   627  	}
   628  
   629  	switch v := res.(type) {
   630  	case *corev1.Pod:
   631  		if v == nil {
   632  			log.Debugf("no pod found for IP %s", addr.PublicIPToString(ip))
   633  			return nil
   634  		}
   635  		ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(ctx, v, false)
   636  		podLabels := pkgK8s.GetPodLabels(ownerKind, ownerName, v)
   637  		for key, value := range podLabels {
   638  			labels[key] = value
   639  		}
   640  		labels[pkgK8s.Namespace] = v.Namespace
   641  	case *corev1.Node:
   642  		labels[pkgK8s.Node] = v.Name
   643  	}
   644  	return nil
   645  }
   646  
   647  // resourceForIP returns the node or pod corresponding to a given IP address.
   648  //
   649  // First it checks if the IP corresponds to a Node's internal IP and returns the
   650  // node if that's the case. Otherwise it checks the running pods that match the
   651  // IP. If exactly one is found, it's returned. Otherwise it returns nil. Errors
   652  // are returned only in the event of an error searching the indices.
   653  func (s *GRPCTapServer) resourceForIP(ip *netPb.IPAddress) (runtime.Object, error) {
   654  	ipStr := addr.PublicIPToString(ip)
   655  
   656  	nodes, err := s.k8sAPI.Node().Informer().GetIndexer().ByIndex(ipIndex, ipStr)
   657  	if err != nil {
   658  		return nil, err
   659  	}
   660  	if len(nodes) == 1 {
   661  		log.Debugf("found one node at IP %s", ipStr)
   662  		return nodes[0].(*corev1.Node), nil
   663  	}
   664  
   665  	pods, err := s.k8sAPI.Pod().Informer().GetIndexer().ByIndex(ipIndex, ipStr)
   666  	if err != nil {
   667  		return nil, err
   668  	}
   669  
   670  	if len(pods) == 1 {
   671  		log.Debugf("found one pod at IP %s", ipStr)
   672  		return pods[0].(*corev1.Pod), nil
   673  	}
   674  
   675  	var singleRunningPod *corev1.Pod
   676  	for _, obj := range pods {
   677  		pod := obj.(*corev1.Pod)
   678  		if pod.Status.Phase == corev1.PodRunning {
   679  			if singleRunningPod != nil {
   680  				log.Warnf(
   681  					"could not uniquely identify pod at %s (found %d pods)",
   682  					ipStr,
   683  					len(pods),
   684  				)
   685  				return nil, nil
   686  			}
   687  			singleRunningPod = pod
   688  		}
   689  	}
   690  
   691  	return singleRunningPod, nil
   692  }
   693  
   694  func getLabelSelector(req *tapPb.TapByResourceRequest) (labels.Selector, error) {
   695  	labelSelector := labels.Everything()
   696  	if s := req.GetTarget().GetLabelSelector(); s != "" {
   697  		var err error
   698  		labelSelector, err = labels.Parse(s)
   699  		if err != nil {
   700  			return nil, fmt.Errorf("invalid label selector \"%s\": %w", s, err)
   701  		}
   702  	}
   703  	return labelSelector, nil
   704  }
   705  

View as plain text