...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/edges.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sort"
     7  
     8  	"github.com/linkerd/linkerd2/pkg/k8s"
     9  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    10  	"github.com/prometheus/common/model"
    11  	log "github.com/sirupsen/logrus"
    12  	v1 "k8s.io/api/core/v1"
    13  )
    14  
    15  const (
    16  	edgesQuery = "sum(%s%s) by (%s, dst_%s, pod, server_id, namespace, dst_namespace, no_tls_reason)"
    17  )
    18  
    19  var formatMsg = map[string]string{
    20  	"disabled":                          "Disabled",
    21  	"loopback":                          "Loopback",
    22  	"no_authority_in_http_request":      "No Authority In HTTP Request",
    23  	"not_http":                          "Not HTTP",
    24  	"not_provided_by_remote":            "Not Provided By Remote",
    25  	"not_provided_by_service_discovery": "Not Provided By Service Discovery",
    26  }
    27  
    28  type edgeKey struct {
    29  	src   string
    30  	srcNs string
    31  	dst   string
    32  	dstNs string
    33  }
    34  
    35  func (s *grpcServer) Edges(ctx context.Context, req *pb.EdgesRequest) (*pb.EdgesResponse, error) {
    36  	log.Debugf("Edges request: %+v", req)
    37  	if req.GetSelector().GetResource() == nil {
    38  		return edgesError(req, "Edges request missing Selector Resource"), nil
    39  	}
    40  
    41  	resourceType := promResourceType(req.GetSelector().GetResource())
    42  	dstResourceType := "dst_" + resourceType
    43  	labelsOutbound := promDirectionLabels("outbound")
    44  	labelsOutboundStr := generateLabelStringWithExclusion(labelsOutbound, string(resourceType), string(dstResourceType))
    45  	query := fmt.Sprintf(edgesQuery, "tcp_open_connections", labelsOutboundStr, resourceType, resourceType)
    46  
    47  	promResult, err := s.queryProm(ctx, query)
    48  	if err != nil {
    49  		return edgesError(req, err.Error()), nil
    50  	}
    51  
    52  	edgeMap := make(map[edgeKey]*pb.Edge)
    53  
    54  	for _, sample := range promResult {
    55  		if sample.Value == 0.0 {
    56  			continue
    57  		}
    58  		key := edgeKey{
    59  			src:   string(sample.Metric[resourceType]),
    60  			srcNs: string(sample.Metric[model.LabelName("namespace")]),
    61  			dst:   string(sample.Metric[dstResourceType]),
    62  			dstNs: string(sample.Metric[model.LabelName("dst_namespace")]),
    63  		}
    64  		requestedNs := req.GetSelector().GetResource().GetNamespace()
    65  		if requestedNs != v1.NamespaceAll {
    66  			if requestedNs != key.srcNs && requestedNs != key.dstNs {
    67  				continue
    68  			}
    69  		}
    70  		if _, ok := edgeMap[key]; !ok {
    71  
    72  			clientID, err := s.getPodIdentity(string(sample.Metric[model.LabelName("pod")]), key.srcNs)
    73  			if err != nil {
    74  				log.Warnf("failed to get pod identity for %s: %v", sample.Metric[model.LabelName("pod")], err)
    75  				continue
    76  			}
    77  
    78  			edgeMap[key] = &pb.Edge{
    79  				Src: &pb.Resource{
    80  					Namespace: key.srcNs,
    81  					Name:      key.src,
    82  					Type:      string(resourceType),
    83  				},
    84  				Dst: &pb.Resource{
    85  					Namespace: key.dstNs,
    86  					Name:      key.dst,
    87  					Type:      string(resourceType),
    88  				},
    89  				ServerId:      string(sample.Metric[model.LabelName("server_id")]),
    90  				ClientId:      clientID,
    91  				NoIdentityMsg: formatMsg[string(sample.Metric[model.LabelName("no_tls_reason")])],
    92  			}
    93  		}
    94  	}
    95  
    96  	edges := []*pb.Edge{}
    97  	for _, edge := range edgeMap {
    98  		edges = append(edges, edge)
    99  	}
   100  	edges = sortEdgeRows(edges)
   101  
   102  	return &pb.EdgesResponse{
   103  		Response: &pb.EdgesResponse_Ok_{
   104  			Ok: &pb.EdgesResponse_Ok{
   105  				Edges: edges,
   106  			},
   107  		},
   108  	}, nil
   109  }
   110  
   111  func edgesError(req *pb.EdgesRequest, message string) *pb.EdgesResponse {
   112  	return &pb.EdgesResponse{
   113  		Response: &pb.EdgesResponse_Error{
   114  			Error: &pb.ResourceError{
   115  				Resource: req.GetSelector().GetResource(),
   116  				Error:    message,
   117  			},
   118  		},
   119  	}
   120  }
   121  
   122  func (s *grpcServer) getPodIdentity(pod string, namespace string) (string, error) {
   123  	po, err := s.k8sAPI.Pod().Lister().Pods(namespace).Get(pod)
   124  	if err != nil {
   125  		return "", err
   126  	}
   127  	return k8s.PodIdentity(po)
   128  }
   129  
   130  func sortEdgeRows(rows []*pb.Edge) []*pb.Edge {
   131  	sort.Slice(rows, func(i, j int) bool {
   132  		keyI := rows[i].GetSrc().GetNamespace() + rows[i].GetDst().GetNamespace() + rows[i].GetSrc().GetName() + rows[i].GetDst().GetName()
   133  		keyJ := rows[j].GetSrc().GetNamespace() + rows[j].GetDst().GetNamespace() + rows[j].GetSrc().GetName() + rows[j].GetDst().GetName()
   134  		return keyI < keyJ
   135  	})
   136  	return rows
   137  }
   138  

View as plain text