...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/policy.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  
     8  	"github.com/linkerd/linkerd2/pkg/k8s"
     9  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    10  	"github.com/prometheus/common/model"
    11  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    12  	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
    13  	"k8s.io/apimachinery/pkg/runtime/schema"
    14  )
    15  
    16  const (
    17  	httpAuthzDenyQuery  = "sum(increase(inbound_http_authz_deny_total%s[%s])) by (%s)"
    18  	httpAuthzAllowQuery = "sum(increase(inbound_http_authz_allow_total%s[%s])) by (%s)"
    19  )
    20  
    21  func isPolicyResource(resource *pb.Resource) bool {
    22  	if resource != nil {
    23  		if resource.GetType() == k8s.Server ||
    24  			resource.GetType() == k8s.ServerAuthorization ||
    25  			resource.GetType() == k8s.AuthorizationPolicy ||
    26  			resource.GetType() == k8s.HTTPRoute {
    27  			return true
    28  		}
    29  	}
    30  	return false
    31  }
    32  
    33  func (s *grpcServer) policyResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
    34  
    35  	policyResources, err := s.getPolicyResourceKeys(req)
    36  	if err != nil {
    37  		return resourceResult{res: nil, err: err}
    38  	}
    39  
    40  	var requestMetrics map[rKey]*pb.BasicStats
    41  	var tcpMetrics map[rKey]*pb.TcpStats
    42  	var authzMetrics map[rKey]*pb.ServerStats
    43  	if !req.SkipStats {
    44  		requestMetrics, tcpMetrics, authzMetrics, err = s.getPolicyMetrics(ctx, req, req.TimeWindow)
    45  		if err != nil {
    46  			return resourceResult{res: nil, err: err}
    47  		}
    48  	}
    49  
    50  	rows := make([]*pb.StatTable_PodGroup_Row, 0)
    51  	for _, key := range policyResources {
    52  		row := pb.StatTable_PodGroup_Row{
    53  			Resource: &pb.Resource{
    54  				Name:      key.Name,
    55  				Namespace: key.Namespace,
    56  				Type:      req.GetSelector().GetResource().GetType(),
    57  			},
    58  			TimeWindow: req.TimeWindow,
    59  			Stats:      requestMetrics[key],
    60  			TcpStats:   tcpMetrics[key],
    61  			SrvStats:   authzMetrics[key],
    62  		}
    63  
    64  		rows = append(rows, &row)
    65  	}
    66  
    67  	rsp := pb.StatTable{
    68  		Table: &pb.StatTable_PodGroup_{
    69  			PodGroup: &pb.StatTable_PodGroup{
    70  				Rows: rows,
    71  			},
    72  		},
    73  	}
    74  	return resourceResult{res: &rsp, err: nil}
    75  }
    76  
    77  func (s *grpcServer) getPolicyResourceKeys(req *pb.StatSummaryRequest) ([]rKey, error) {
    78  	var err error
    79  	var unstructuredResources *unstructured.UnstructuredList
    80  
    81  	// TODO(ver): We should use a typed client
    82  	var gvr schema.GroupVersionResource
    83  	if req.GetSelector().Resource.GetType() == k8s.Server {
    84  		gvr = k8s.ServerGVR
    85  	} else if req.GetSelector().Resource.GetType() == k8s.ServerAuthorization {
    86  		gvr = k8s.SazGVR
    87  	} else if req.GetSelector().Resource.GetType() == k8s.AuthorizationPolicy {
    88  		gvr = k8s.AuthorizationPolicyGVR
    89  	} else if req.GetSelector().Resource.GetType() == k8s.HTTPRoute {
    90  		gvr = k8s.HTTPRouteGVR
    91  	}
    92  
    93  	res := req.GetSelector().GetResource()
    94  	labelSelector, err := getLabelSelector(req)
    95  	if err != nil {
    96  		return nil, err
    97  	}
    98  
    99  	if res.GetNamespace() == "" {
   100  		unstructuredResources, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace("").
   101  			List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
   102  	} else if res.GetName() == "" {
   103  		unstructuredResources, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace(res.GetNamespace()).
   104  			List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector.String()})
   105  	} else {
   106  		var ts *unstructured.Unstructured
   107  		ts, err = s.k8sAPI.DynamicClient.Resource(gvr).Namespace(res.GetNamespace()).
   108  			Get(context.TODO(), res.GetName(), metav1.GetOptions{})
   109  		if err != nil {
   110  			return nil, err
   111  		}
   112  		unstructuredResources = &unstructured.UnstructuredList{Items: []unstructured.Unstructured{*ts}}
   113  	}
   114  	if err != nil {
   115  		return nil, err
   116  	}
   117  
   118  	var resourceKeys []rKey
   119  	for _, resource := range unstructuredResources.Items {
   120  		// Resource Key's type should be singular and lowercased while the kind isn't
   121  		resourceKeys = append(resourceKeys, rKey{
   122  			Namespace: resource.GetNamespace(),
   123  			// TODO(ver) This isn't a reliable way to make a plural name singular.
   124  			Type: strings.ToLower(resource.GetKind()[0:len(resource.GetKind())]),
   125  			Name: resource.GetName(),
   126  		})
   127  	}
   128  	return resourceKeys, nil
   129  }
   130  
   131  func (s *grpcServer) getPolicyMetrics(
   132  	ctx context.Context,
   133  	req *pb.StatSummaryRequest,
   134  	timeWindow string,
   135  ) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, map[rKey]*pb.ServerStats, error) {
   136  	labels, groupBy := buildServerRequestLabels(req)
   137  	// These metrics are always inbound.
   138  	reqLabels := labels.Merge(model.LabelSet{
   139  		"direction": model.LabelValue("inbound"),
   140  	})
   141  
   142  	promQueries := make(map[promType]string)
   143  	if req.GetSelector().GetResource().GetType() == k8s.Server {
   144  		// TCP metrics are only supported with servers
   145  		if req.TcpStats {
   146  			// peer is always `src` as these are inbound metrics
   147  			tcpLabels := reqLabels.Merge(promPeerLabel("src"))
   148  			promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, tcpLabels.String(), groupBy.String())
   149  			promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabels.String(), timeWindow, groupBy.String())
   150  			promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabels.String(), timeWindow, groupBy.String())
   151  		}
   152  	}
   153  
   154  	promQueries[promRequests] = fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String())
   155  	// Use `labels` as direction isn't present with authorization metrics
   156  	promQueries[promAllowedRequests] = fmt.Sprintf(httpAuthzAllowQuery, labels, timeWindow, groupBy.String())
   157  	promQueries[promDeniedRequests] = fmt.Sprintf(httpAuthzDenyQuery, labels, timeWindow, groupBy.String())
   158  	quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
   159  	results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
   160  	if err != nil {
   161  		return nil, nil, nil, err
   162  	}
   163  
   164  	basicStats, tcpStats, authzStats := processPrometheusMetrics(req, results, groupBy)
   165  	return basicStats, tcpStats, authzStats, nil
   166  }
   167  
   168  func buildServerRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
   169  	if req.GetSelector().GetResource().GetNamespace() != "" {
   170  		labels = labels.Merge(model.LabelSet{
   171  			namespaceLabel: model.LabelValue(req.GetSelector().GetResource().GetNamespace()),
   172  		})
   173  	}
   174  	var groupBy model.LabelNames
   175  	if req.GetSelector().GetResource().GetType() == k8s.Server {
   176  		// note that metricToKey assumes the label ordering (..., namespace, name)
   177  		groupBy = model.LabelNames{serverKindLabel, namespaceLabel, serverNameLabel}
   178  		labels = labels.Merge(model.LabelSet{
   179  			serverKindLabel: model.LabelValue("server"),
   180  		})
   181  		if req.GetSelector().GetResource().GetName() != "" {
   182  			labels = labels.Merge(model.LabelSet{
   183  				serverNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
   184  			})
   185  		}
   186  	} else if req.GetSelector().GetResource().GetType() == k8s.ServerAuthorization {
   187  		// note that metricToKey assumes the label ordering (..., namespace, name)
   188  		groupBy = model.LabelNames{namespaceLabel, authorizationNameLabel}
   189  		labels = labels.Merge(model.LabelSet{
   190  			authorizationKindLabel: model.LabelValue("serverauthorization"),
   191  		})
   192  		if req.GetSelector().GetResource().GetName() != "" {
   193  			labels = labels.Merge(model.LabelSet{
   194  				authorizationNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
   195  			})
   196  		}
   197  	} else if req.GetSelector().GetResource().GetType() == k8s.AuthorizationPolicy {
   198  		// note that metricToKey assumes the label ordering (..., namespace, name)
   199  		groupBy = model.LabelNames{namespaceLabel, authorizationNameLabel}
   200  		labels = labels.Merge(model.LabelSet{
   201  			authorizationKindLabel: model.LabelValue("authorizationpolicy"),
   202  		})
   203  		if req.GetSelector().GetResource().GetName() != "" {
   204  			labels = labels.Merge(model.LabelSet{
   205  				authorizationNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
   206  			})
   207  		}
   208  	} else if req.GetSelector().GetResource().GetType() == k8s.HTTPRoute {
   209  		// note that metricToKey assumes the label ordering (..., namespace, name)
   210  		groupBy = model.LabelNames{serverNameLabel, serverKindLabel, routeNameLabel, routeKindLabel, namespaceLabel, routeNameLabel}
   211  		if req.GetSelector().GetResource().GetName() != "" {
   212  			labels = labels.Merge(model.LabelSet{
   213  				routeNameLabel: model.LabelValue(req.GetSelector().GetResource().GetName()),
   214  			})
   215  		}
   216  	}
   217  
   218  	switch out := req.Outbound.(type) {
   219  	case *pb.StatSummaryRequest_ToResource:
   220  		// if --to flag is passed, Calculate traffic sent to the policy resource
   221  		// with additional filtering narrowing down to the workload
   222  		// it is sent to.
   223  		labels = labels.Merge(promQueryLabels(out.ToResource))
   224  
   225  	// No FromResource case as policy metrics are all inbound
   226  	default:
   227  		// no extra labels needed
   228  	}
   229  
   230  	return labels, groupBy
   231  }
   232  

View as plain text