...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/stat_summary.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"reflect"
     7  	"sort"
     8  	"strings"
     9  
    10  	"github.com/linkerd/linkerd2/pkg/k8s"
    11  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    12  	vizutil "github.com/linkerd/linkerd2/viz/pkg/util"
    13  	"github.com/prometheus/common/model"
    14  	log "github.com/sirupsen/logrus"
    15  	"google.golang.org/protobuf/proto"
    16  	corev1 "k8s.io/api/core/v1"
    17  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    18  	"k8s.io/apimachinery/pkg/api/meta"
    19  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    20  	"k8s.io/apimachinery/pkg/labels"
    21  	"k8s.io/apimachinery/pkg/runtime"
    22  )
    23  
// resourceResult carries the outcome of a single per-resource-type stat
// query: the resulting table on success, or the error that aborted the query.
type resourceResult struct {
	res *pb.StatTable
	err error
}
// k8sStat pairs the metadata accessor of a Kubernetes object with the pod
// statistics gathered for that object.
type k8sStat struct {
	object   metav1.Object
	podStats *podStats
}
    32  
// rKey identifies a resource by namespace, type and name. It is used as the
// map key that matches stats queried from Prometheus with the objects queried
// from Kubernetes.
type rKey struct {
	Namespace string
	Type      string
	Name      string
}
    38  
// dstKey identifies a destination of a service. It is used as the map key for
// service metrics and for the weights read from a ServiceProfile.
type dstKey struct {
	Namespace string
	Service   string
	Dst       string
	// NOTE(review): Weight is not populated by any code visible in this
	// file; confirm whether it is still needed.
	Weight string
}
    45  
const (
	// Values of the "classification" label that are tallied into the
	// success and failure counters.
	success = "success"
	failure = "failure"

	// Prometheus query templates. The placeholders are, in order: the label
	// selector, the time window (where present) and the group-by label names.
	// latencyQuantileQuery takes the quantile as its first placeholder.
	reqQuery             = "sum(increase(response_total%s[%s])) by (%s, classification, tls)"
	latencyQuantileQuery = "histogram_quantile(%s, sum(irate(response_latency_ms_bucket%s[%s])) by (le, %s))"
	tcpConnectionsQuery  = "sum(tcp_open_connections%s) by (%s)"
	tcpReadBytesQuery    = "sum(increase(tcp_read_bytes_total%s[%s])) by (%s)"
	tcpWriteBytesQuery   = "sum(increase(tcp_write_bytes_total%s[%s])) by (%s)"

	// regexAny stands in for the service name when the request does not
	// name a specific service.
	regexAny = ".+"
)
    58  
// podStats summarizes the pods that belong to a Kubernetes object.
type podStats struct {
	// status is only set when the object itself is a Pod.
	status string
	// inMesh counts the non-failed pods that are meshed.
	inMesh uint64
	// total counts the pods that are not in the Failed phase.
	total uint64
	// failed counts the pods that are in the Failed phase.
	failed uint64
	// errors holds the container errors found, keyed by pod name.
	errors map[string]*pb.PodErrors
}
    66  
    67  func (s *grpcServer) StatSummary(ctx context.Context, req *pb.StatSummaryRequest) (*pb.StatSummaryResponse, error) {
    68  
    69  	// check for well-formed request
    70  	if req.GetSelector().GetResource() == nil {
    71  		return statSummaryError(req, "StatSummary request missing Selector Resource"), nil
    72  	}
    73  
    74  	// err if --from is a service
    75  	if req.GetFromResource() != nil && req.GetFromResource().GetType() == k8s.Service {
    76  		return statSummaryError(req, "service is not supported as a target on 'from' queries, or as a target with 'to' queries"), nil
    77  	}
    78  
    79  	// err if --from is added with policy resources
    80  	if req.GetFromResource() != nil {
    81  		if isPolicyResource(req.GetSelector().GetResource()) ||
    82  			isPolicyResource(req.GetFromResource()) {
    83  			return statSummaryError(req, "'from' queries are not supported with policy resources, as they have inbound metrics only"), nil
    84  		}
    85  	}
    86  
    87  	if req.GetToResource() != nil {
    88  		if isPolicyResource(req.GetSelector().GetResource()) ||
    89  			isPolicyResource(req.GetToResource()) {
    90  			return statSummaryError(req, "'to' queries are not supported with policy resources, as they have inbound metrics only"), nil
    91  		}
    92  	}
    93  
    94  	switch ob := req.Outbound.(type) {
    95  	case *pb.StatSummaryRequest_ToResource:
    96  		if ob.ToResource.Type == k8s.All {
    97  			return statSummaryError(req, "resource type 'all' is not supported as a filter"), nil
    98  		}
    99  	case *pb.StatSummaryRequest_FromResource:
   100  		if ob.FromResource.Type == k8s.All {
   101  			return statSummaryError(req, "resource type 'all' is not supported as a filter"), nil
   102  		}
   103  	}
   104  
   105  	err := s.validateTimeWindow(ctx, req.TimeWindow)
   106  	if err != nil {
   107  		return statSummaryError(req, fmt.Sprintf("invalid time window: %s", err)), nil
   108  	}
   109  
   110  	statTables := make([]*pb.StatTable, 0)
   111  
   112  	var resourcesToQuery []string
   113  	if req.Selector.Resource.Type == k8s.All {
   114  		resourcesToQuery = k8s.StatAllResourceTypes
   115  	} else {
   116  		resourcesToQuery = []string{req.Selector.Resource.Type}
   117  	}
   118  
   119  	// request stats for the resourcesToQuery, in parallel
   120  	resultChan := make(chan resourceResult)
   121  
   122  	for _, resource := range resourcesToQuery {
   123  		statReq := proto.Clone(req).(*pb.StatSummaryRequest)
   124  		statReq.Selector.Resource.Type = resource
   125  
   126  		go func() {
   127  			if isNonK8sResourceQuery(statReq.GetSelector().GetResource().GetType()) {
   128  				resultChan <- s.nonK8sResourceQuery(ctx, statReq)
   129  			} else if statReq.GetSelector().GetResource().GetType() == k8s.Service {
   130  				resultChan <- s.serviceResourceQuery(ctx, statReq)
   131  			} else if isPolicyResource(statReq.GetSelector().GetResource()) {
   132  				resultChan <- s.policyResourceQuery(ctx, statReq)
   133  			} else {
   134  				resultChan <- s.k8sResourceQuery(ctx, statReq)
   135  			}
   136  		}()
   137  	}
   138  
   139  	for i := 0; i < len(resourcesToQuery); i++ {
   140  		result := <-resultChan
   141  		if result.err != nil {
   142  			return nil, vizutil.GRPCError(result.err)
   143  		}
   144  		statTables = append(statTables, result.res)
   145  	}
   146  
   147  	rsp := pb.StatSummaryResponse{
   148  		Response: &pb.StatSummaryResponse_Ok_{ // https://github.com/golang/protobuf/issues/205
   149  			Ok: &pb.StatSummaryResponse_Ok{
   150  				StatTables: statTables,
   151  			},
   152  		},
   153  	}
   154  
   155  	log.Debugf("Sent response as %+v\n", statTables)
   156  	return &rsp, nil
   157  }
   158  
   159  func statSummaryError(req *pb.StatSummaryRequest, message string) *pb.StatSummaryResponse {
   160  	return &pb.StatSummaryResponse{
   161  		Response: &pb.StatSummaryResponse_Error{
   162  			Error: &pb.ResourceError{
   163  				Resource: req.GetSelector().GetResource(),
   164  				Error:    message,
   165  			},
   166  		},
   167  	}
   168  }
   169  
   170  func (s *grpcServer) getKubernetesObjectStats(req *pb.StatSummaryRequest) (map[rKey]k8sStat, error) {
   171  	requestedResource := req.GetSelector().GetResource()
   172  
   173  	labelSelector, err := getLabelSelector(req)
   174  	if err != nil {
   175  		return nil, err
   176  	}
   177  
   178  	objects, err := s.k8sAPI.GetObjects(requestedResource.Namespace, requestedResource.Type, requestedResource.Name, labelSelector)
   179  	if err != nil {
   180  		return nil, err
   181  	}
   182  
   183  	objectMap := map[rKey]k8sStat{}
   184  
   185  	for _, object := range objects {
   186  		metaObj, err := meta.Accessor(object)
   187  		if err != nil {
   188  			return nil, err
   189  		}
   190  
   191  		key := rKey{
   192  			Name:      metaObj.GetName(),
   193  			Namespace: metaObj.GetNamespace(),
   194  			Type:      requestedResource.GetType(),
   195  		}
   196  
   197  		podStats, err := s.getPodStats(object)
   198  		if err != nil {
   199  			return nil, err
   200  		}
   201  
   202  		objectMap[key] = k8sStat{
   203  			object:   metaObj,
   204  			podStats: podStats,
   205  		}
   206  	}
   207  	return objectMap, nil
   208  }
   209  
   210  func (s *grpcServer) k8sResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
   211  
   212  	k8sObjects, err := s.getKubernetesObjectStats(req)
   213  	if err != nil {
   214  		return resourceResult{res: nil, err: err}
   215  	}
   216  
   217  	var requestMetrics map[rKey]*pb.BasicStats
   218  	var tcpMetrics map[rKey]*pb.TcpStats
   219  	if !req.SkipStats {
   220  		requestMetrics, tcpMetrics, err = s.getStatMetrics(ctx, req, req.TimeWindow)
   221  		if err != nil {
   222  			return resourceResult{res: nil, err: err}
   223  		}
   224  	}
   225  
   226  	rows := make([]*pb.StatTable_PodGroup_Row, 0)
   227  	keys := getResultKeys(req, k8sObjects, requestMetrics)
   228  
   229  	for _, key := range keys {
   230  		objInfo, ok := k8sObjects[key]
   231  		if !ok {
   232  			continue
   233  		}
   234  
   235  		var tcpStats *pb.TcpStats
   236  		if req.TcpStats {
   237  			tcpStats = tcpMetrics[key]
   238  		}
   239  
   240  		var basicStats *pb.BasicStats
   241  		if !reflect.DeepEqual(requestMetrics[key], &pb.BasicStats{}) {
   242  			basicStats = requestMetrics[key]
   243  		}
   244  
   245  		k8sResource := objInfo.object
   246  		row := pb.StatTable_PodGroup_Row{
   247  			Resource: &pb.Resource{
   248  				Name:      k8sResource.GetName(),
   249  				Namespace: k8sResource.GetNamespace(),
   250  				Type:      req.GetSelector().GetResource().GetType(),
   251  			},
   252  			TimeWindow: req.TimeWindow,
   253  			Stats:      basicStats,
   254  			TcpStats:   tcpStats,
   255  		}
   256  
   257  		podStat := objInfo.podStats
   258  		row.Status = podStat.status
   259  		row.MeshedPodCount = podStat.inMesh
   260  		row.RunningPodCount = podStat.total
   261  		row.FailedPodCount = podStat.failed
   262  		row.ErrorsByPod = podStat.errors
   263  
   264  		rows = append(rows, &row)
   265  	}
   266  
   267  	rsp := pb.StatTable{
   268  		Table: &pb.StatTable_PodGroup_{
   269  			PodGroup: &pb.StatTable_PodGroup{
   270  				Rows: rows,
   271  			},
   272  		},
   273  	}
   274  
   275  	return resourceResult{res: &rsp, err: nil}
   276  }
   277  
   278  func (s *grpcServer) serviceResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
   279  
   280  	rows := make([]*pb.StatTable_PodGroup_Row, 0)
   281  	dstBasicStats := make(map[dstKey]*pb.BasicStats)
   282  	dstTCPStats := make(map[dstKey]*pb.TcpStats)
   283  
   284  	if !req.SkipStats {
   285  		var err error
   286  		dstBasicStats, dstTCPStats, err = s.getServiceMetrics(ctx, req, req.TimeWindow)
   287  		if err != nil {
   288  			return resourceResult{res: nil, err: err}
   289  		}
   290  	}
   291  
   292  	weights := make(map[dstKey]string)
   293  	for k := range dstBasicStats {
   294  		weights[k] = ""
   295  	}
   296  
   297  	name := req.GetSelector().GetResource().GetName()
   298  	namespace := req.GetSelector().GetResource().GetNamespace()
   299  
   300  	// Check if a ServiceProfile exists for the Service
   301  	spName := fmt.Sprintf("%s.%s.svc.%s", name, namespace, s.clusterDomain)
   302  	sp, err := s.k8sAPI.SP().Lister().ServiceProfiles(namespace).Get(spName)
   303  	if err == nil {
   304  		for _, weightedDst := range sp.Spec.DstOverrides {
   305  			weights[dstKey{
   306  				Namespace: namespace,
   307  				Service:   name,
   308  				Dst:       dstFromAuthority(weightedDst.Authority),
   309  			}] = weightedDst.Weight.String()
   310  		}
   311  	} else if !kerrors.IsNotFound(err) {
   312  		log.Errorf("Failed to get weights from ServiceProfile %q: %v", spName, err)
   313  	}
   314  
   315  	for k, weight := range weights {
   316  		row := pb.StatTable_PodGroup_Row{
   317  			Resource: &pb.Resource{
   318  				Name:      k.Service,
   319  				Namespace: k.Namespace,
   320  				Type:      req.GetSelector().GetResource().GetType(),
   321  			},
   322  			TimeWindow: req.TimeWindow,
   323  			Stats:      dstBasicStats[k],
   324  			TcpStats:   dstTCPStats[k],
   325  		}
   326  
   327  		// Set TrafficSplitStats only when weight is not empty
   328  		if weight != "" {
   329  			row.TsStats = &pb.TrafficSplitStats{
   330  				Apex:   k.Service,
   331  				Leaf:   k.Dst,
   332  				Weight: weight,
   333  			}
   334  		}
   335  		rows = append(rows, &row)
   336  	}
   337  
   338  	// sort rows before returning in order to have a consistent order for tests
   339  	rows = sortTrafficSplitRows(rows)
   340  
   341  	rsp := pb.StatTable{
   342  		Table: &pb.StatTable_PodGroup_{
   343  			PodGroup: &pb.StatTable_PodGroup{
   344  				Rows: rows,
   345  			},
   346  		},
   347  	}
   348  
   349  	return resourceResult{res: &rsp, err: nil}
   350  }
   351  
   352  func sortTrafficSplitRows(rows []*pb.StatTable_PodGroup_Row) []*pb.StatTable_PodGroup_Row {
   353  	sort.Slice(rows, func(i, j int) bool {
   354  		if rows[i].TsStats != nil && rows[j].TsStats != nil {
   355  			key1 := rows[i].TsStats.Apex + rows[i].TsStats.Leaf
   356  			key2 := rows[j].TsStats.Apex + rows[j].TsStats.Leaf
   357  			return key1 < key2
   358  		}
   359  		return false
   360  	})
   361  	return rows
   362  }
   363  
   364  func (s *grpcServer) nonK8sResourceQuery(ctx context.Context, req *pb.StatSummaryRequest) resourceResult {
   365  	var requestMetrics map[rKey]*pb.BasicStats
   366  	if !req.SkipStats {
   367  		var err error
   368  		requestMetrics, _, err = s.getStatMetrics(ctx, req, req.TimeWindow)
   369  		if err != nil {
   370  			return resourceResult{res: nil, err: err}
   371  		}
   372  	}
   373  	rows := make([]*pb.StatTable_PodGroup_Row, 0)
   374  
   375  	for rkey, metrics := range requestMetrics {
   376  		rkey.Type = req.GetSelector().GetResource().GetType()
   377  
   378  		row := pb.StatTable_PodGroup_Row{
   379  			Resource: &pb.Resource{
   380  				Type:      rkey.Type,
   381  				Namespace: rkey.Namespace,
   382  				Name:      rkey.Name,
   383  			},
   384  			TimeWindow: req.TimeWindow,
   385  			Stats:      metrics,
   386  		}
   387  		rows = append(rows, &row)
   388  	}
   389  
   390  	rsp := pb.StatTable{
   391  		Table: &pb.StatTable_PodGroup_{
   392  			PodGroup: &pb.StatTable_PodGroup{
   393  				Rows: rows,
   394  			},
   395  		},
   396  	}
   397  	return resourceResult{res: &rsp, err: nil}
   398  }
   399  
   400  func isNonK8sResourceQuery(resourceType string) bool {
   401  	return resourceType == k8s.Authority
   402  }
   403  
   404  // get the list of objects for which we want to return results
   405  func getResultKeys(
   406  	req *pb.StatSummaryRequest,
   407  	k8sObjects map[rKey]k8sStat,
   408  	metricResults map[rKey]*pb.BasicStats,
   409  ) []rKey {
   410  	var keys []rKey
   411  
   412  	if req.GetOutbound() == nil || req.GetNone() != nil {
   413  		// if the request doesn't have outbound filtering, return all rows
   414  		for key := range k8sObjects {
   415  			keys = append(keys, key)
   416  		}
   417  	} else {
   418  		// if the request does have outbound filtering,
   419  		// only return rows for which we have stats
   420  		for key := range metricResults {
   421  			keys = append(keys, key)
   422  		}
   423  	}
   424  	return keys
   425  }
   426  
   427  func buildRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
   428  	// labelNames: the group by in the prometheus query
   429  	// labels: the labels for the resource we want to query for
   430  
   431  	switch out := req.Outbound.(type) {
   432  	case *pb.StatSummaryRequest_ToResource:
   433  		labelNames = promGroupByLabelNames(req.Selector.Resource)
   434  
   435  		labels = labels.Merge(promDstQueryLabels(out.ToResource))
   436  		labels = labels.Merge(promQueryLabels(req.Selector.Resource))
   437  		labels = labels.Merge(promDirectionLabels("outbound"))
   438  
   439  	case *pb.StatSummaryRequest_FromResource:
   440  		labelNames = promDstGroupByLabelNames(req.Selector.Resource)
   441  
   442  		labels = labels.Merge(promQueryLabels(out.FromResource))
   443  		labels = labels.Merge(promDstQueryLabels(req.Selector.Resource))
   444  		labels = labels.Merge(promDirectionLabels("outbound"))
   445  
   446  	default:
   447  		labelNames = promGroupByLabelNames(req.Selector.Resource)
   448  
   449  		labels = labels.Merge(promQueryLabels(req.Selector.Resource))
   450  		labels = labels.Merge(promDirectionLabels("inbound"))
   451  	}
   452  
   453  	return
   454  }
   455  
   456  func buildServiceRequestLabels(req *pb.StatSummaryRequest) (labels model.LabelSet, labelNames model.LabelNames) {
   457  	// Service Request labels are always direction="outbound". If the --from or --to flags were used,
   458  	// we merge an additional ToResource or FromResource label. Service metrics results are
   459  	// always grouped by dst_service, and dst_namespace (to avoid conflicts) .
   460  	labels = model.LabelSet{
   461  		"direction": model.LabelValue("outbound"),
   462  	}
   463  
   464  	switch out := req.Outbound.(type) {
   465  	case *pb.StatSummaryRequest_ToResource:
   466  		// if --to flag is passed, Calculate traffic sent to the service
   467  		// with additional filtering narrowing down to the workload
   468  		// it is sent to.
   469  		labels = labels.Merge(promDstQueryLabels(out.ToResource))
   470  
   471  	case *pb.StatSummaryRequest_FromResource:
   472  		// if --from flag is passed, FromResource is never a service here
   473  		labels = labels.Merge(promQueryLabels(out.FromResource))
   474  
   475  	default:
   476  		// no extra labels needed
   477  	}
   478  
   479  	groupBy := model.LabelNames{model.LabelName("dst_namespace"), model.LabelName("dst_service")}
   480  
   481  	return labels, groupBy
   482  }
   483  
   484  func buildTCPStatsRequestLabels(req *pb.StatSummaryRequest, reqLabels model.LabelSet) string {
   485  	switch req.Outbound.(type) {
   486  	case *pb.StatSummaryRequest_ToResource, *pb.StatSummaryRequest_FromResource:
   487  		// If TCP stats are queried from a resource to another one (i.e outbound -- from/to), then append peer='dst'
   488  		reqLabels = reqLabels.Merge(promPeerLabel("dst"))
   489  
   490  	default:
   491  		// If TCP stats are not queried from a specific resource (i.e inbound -- no to/from), then append peer='src'
   492  		reqLabels = reqLabels.Merge(promPeerLabel("src"))
   493  	}
   494  	return reqLabels.String()
   495  }
   496  
   497  func (s *grpcServer) getStatMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, error) {
   498  	reqLabels, groupBy := buildRequestLabels(req)
   499  	promQueries := map[promType]string{
   500  		promRequests: fmt.Sprintf(reqQuery, reqLabels.String(), timeWindow, groupBy.String()),
   501  	}
   502  
   503  	if req.TcpStats {
   504  		promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, reqLabels.String(), groupBy.String())
   505  		// For TCP read/write bytes total we add an additional 'peer' label with a value of either 'src' or 'dst'
   506  		tcpLabels := buildTCPStatsRequestLabels(req, reqLabels)
   507  		promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabels, timeWindow, groupBy.String())
   508  		promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabels, timeWindow, groupBy.String())
   509  	}
   510  
   511  	quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels.String(), timeWindow, groupBy.String())
   512  	results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
   513  
   514  	if err != nil {
   515  		return nil, nil, err
   516  	}
   517  
   518  	basicStats, tcpStats, _ := processPrometheusMetrics(req, results, groupBy)
   519  	return basicStats, tcpStats, nil
   520  }
   521  
   522  func (s *grpcServer) getServiceMetrics(ctx context.Context, req *pb.StatSummaryRequest, timeWindow string) (map[dstKey]*pb.BasicStats, map[dstKey]*pb.TcpStats, error) {
   523  	dstBasicStats := make(map[dstKey]*pb.BasicStats)
   524  	dstTCPStats := make(map[dstKey]*pb.TcpStats)
   525  	labels, groupBy := buildServiceRequestLabels(req)
   526  
   527  	service := req.GetSelector().GetResource().GetName()
   528  	namespace := req.GetSelector().GetResource().GetNamespace()
   529  
   530  	if service == "" {
   531  		service = regexAny
   532  	}
   533  	authority := fmt.Sprintf("%s.%s.svc.%s", service, namespace, s.clusterDomain)
   534  
   535  	reqLabels := generateLabelStringWithRegex(labels, string(authorityLabel), authority)
   536  
   537  	promQueries := map[promType]string{
   538  		promRequests: fmt.Sprintf(reqQuery, reqLabels, timeWindow, groupBy.String()),
   539  	}
   540  
   541  	if req.TcpStats {
   542  		// Service stats always need to have `peer=dst`, cuz there is no `src` with `authority` label
   543  		tcpLabels := labels.Merge(promPeerLabel("dst"))
   544  		tcpLabelString := generateLabelStringWithRegex(tcpLabels, string(authorityLabel), authority)
   545  		promQueries[promTCPConnections] = fmt.Sprintf(tcpConnectionsQuery, tcpLabelString, groupBy.String())
   546  		promQueries[promTCPReadBytes] = fmt.Sprintf(tcpReadBytesQuery, tcpLabelString, timeWindow, groupBy.String())
   547  		promQueries[promTCPWriteBytes] = fmt.Sprintf(tcpWriteBytesQuery, tcpLabelString, timeWindow, groupBy.String())
   548  	}
   549  
   550  	quantileQueries := generateQuantileQueries(latencyQuantileQuery, reqLabels, timeWindow, groupBy.String())
   551  	results, err := s.getPrometheusMetrics(ctx, promQueries, quantileQueries)
   552  	if err != nil {
   553  		return nil, nil, err
   554  	}
   555  
   556  	basicStats, tcpStats, _ := processPrometheusMetrics(req, results, groupBy)
   557  
   558  	for rKey, basicStatsVal := range basicStats {
   559  
   560  		// Use the returned `dst_service` in the `all` svc case
   561  		svcName := service
   562  		if svcName == regexAny {
   563  			svcName = rKey.Name
   564  		}
   565  
   566  		dstBasicStats[dstKey{
   567  			Namespace: rKey.Namespace,
   568  			Service:   svcName,
   569  			Dst:       rKey.Name,
   570  		}] = basicStatsVal
   571  	}
   572  
   573  	for rKey, tcpStatsVal := range tcpStats {
   574  
   575  		// Use the returned `dst_service` in the `all` svc case
   576  		svcName := service
   577  		if svcName == regexAny {
   578  			svcName = rKey.Name
   579  		}
   580  
   581  		dstTCPStats[dstKey{
   582  			Namespace: rKey.Namespace,
   583  			Service:   svcName,
   584  			Dst:       rKey.Name,
   585  		}] = tcpStatsVal
   586  	}
   587  
   588  	return dstBasicStats, dstTCPStats, nil
   589  }
   590  
   591  func processPrometheusMetrics(req *pb.StatSummaryRequest, results []promResult, groupBy model.LabelNames) (map[rKey]*pb.BasicStats, map[rKey]*pb.TcpStats, map[rKey]*pb.ServerStats) {
   592  	basicStats := make(map[rKey]*pb.BasicStats)
   593  	tcpStats := make(map[rKey]*pb.TcpStats)
   594  	authzStats := make(map[rKey]*pb.ServerStats)
   595  
   596  	for _, result := range results {
   597  		for _, sample := range result.vec {
   598  			resource := metricToKey(req, sample.Metric, groupBy)
   599  
   600  			addBasicStats := func() {
   601  				if basicStats[resource] == nil {
   602  					basicStats[resource] = &pb.BasicStats{}
   603  				}
   604  			}
   605  			addTCPStats := func() {
   606  				if tcpStats[resource] == nil {
   607  					tcpStats[resource] = &pb.TcpStats{}
   608  				}
   609  			}
   610  
   611  			if authzStats[resource] == nil {
   612  				srv := pb.Resource{
   613  					Type: string(sample.Metric[serverKindLabel]),
   614  					Name: string(sample.Metric[serverNameLabel]),
   615  				}
   616  				route := pb.Resource{
   617  					Type: string(sample.Metric[routeKindLabel]),
   618  					Name: string(sample.Metric[routeNameLabel]),
   619  				}
   620  				authz := pb.Resource{
   621  					Type: string(sample.Metric[authorizationKindLabel]),
   622  					Name: string(sample.Metric[authorizationNameLabel]),
   623  				}
   624  				authzStats[resource] = &pb.ServerStats{
   625  					Srv:   &srv,
   626  					Route: &route,
   627  					Authz: &authz,
   628  				}
   629  			}
   630  
   631  			value := extractSampleValue(sample)
   632  
   633  			switch result.prom {
   634  			case promRequests:
   635  				addBasicStats()
   636  				switch string(sample.Metric[model.LabelName("classification")]) {
   637  				case success:
   638  					basicStats[resource].SuccessCount += value
   639  				case failure:
   640  					basicStats[resource].FailureCount += value
   641  				}
   642  			case promLatencyP50:
   643  				addBasicStats()
   644  				basicStats[resource].LatencyMsP50 = value
   645  			case promLatencyP95:
   646  				addBasicStats()
   647  				basicStats[resource].LatencyMsP95 = value
   648  			case promLatencyP99:
   649  				addBasicStats()
   650  				basicStats[resource].LatencyMsP99 = value
   651  			case promTCPConnections:
   652  				addTCPStats()
   653  				tcpStats[resource].OpenConnections = value
   654  			case promTCPReadBytes:
   655  				addTCPStats()
   656  				tcpStats[resource].ReadBytesTotal = value
   657  			case promTCPWriteBytes:
   658  				addTCPStats()
   659  				tcpStats[resource].WriteBytesTotal = value
   660  			case promAllowedRequests:
   661  				authzStats[resource].AllowedCount = value
   662  			case promDeniedRequests:
   663  				authzStats[resource].DeniedCount = value
   664  			}
   665  		}
   666  	}
   667  
   668  	return basicStats, tcpStats, authzStats
   669  }
   670  
   671  func metricToKey(req *pb.StatSummaryRequest, metric model.Metric, groupBy model.LabelNames) rKey {
   672  	// this key is used to match the metric stats we queried from prometheus
   673  	// with the k8s object stats we queried from k8s
   674  	// ASSUMPTION: this code assumes that groupBy is always ordered (..., namespace, name)
   675  	key := rKey{
   676  		Type: req.GetSelector().GetResource().GetType(),
   677  		Name: string(metric[groupBy[len(groupBy)-1]]),
   678  	}
   679  
   680  	if len(groupBy) >= 2 {
   681  		key.Namespace = string(metric[groupBy[len(groupBy)-2]])
   682  	}
   683  
   684  	return key
   685  }
   686  
   687  func (s *grpcServer) getPodStats(obj runtime.Object) (*podStats, error) {
   688  	pods, err := s.k8sAPI.GetPodsFor(obj, true)
   689  	if err != nil {
   690  		return nil, err
   691  	}
   692  	podErrors := make(map[string]*pb.PodErrors)
   693  	meshCount := &podStats{}
   694  
   695  	if pod, ok := obj.(*corev1.Pod); ok {
   696  		meshCount.status = k8s.GetPodStatus(*pod)
   697  	}
   698  
   699  	for _, pod := range pods {
   700  		if pod.Status.Phase == corev1.PodFailed {
   701  			meshCount.failed++
   702  		} else {
   703  			meshCount.total++
   704  			if k8s.IsMeshed(pod, s.controllerNamespace) {
   705  				meshCount.inMesh++
   706  			}
   707  		}
   708  
   709  		errors := checkContainerErrors(pod.Status.ContainerStatuses)
   710  		errors = append(errors, checkContainerErrors(pod.Status.InitContainerStatuses)...)
   711  
   712  		if len(errors) > 0 {
   713  			podErrors[pod.Name] = &pb.PodErrors{Errors: errors}
   714  		}
   715  	}
   716  	meshCount.errors = podErrors
   717  	return meshCount, nil
   718  }
   719  
   720  func toPodError(container, image, reason, message string) *pb.PodErrors_PodError {
   721  	return &pb.PodErrors_PodError{
   722  		Error: &pb.PodErrors_PodError_Container{
   723  			Container: &pb.PodErrors_PodError_ContainerError{
   724  				Message:   message,
   725  				Container: container,
   726  				Image:     image,
   727  				Reason:    reason,
   728  			},
   729  		},
   730  	}
   731  }
   732  
   733  func checkContainerErrors(containerStatuses []corev1.ContainerStatus) []*pb.PodErrors_PodError {
   734  	errors := []*pb.PodErrors_PodError{}
   735  	for _, st := range containerStatuses {
   736  		if !st.Ready {
   737  			if st.State.Waiting != nil {
   738  				errors = append(errors, toPodError(st.Name, st.Image, st.State.Waiting.Reason, st.State.Waiting.Message))
   739  			}
   740  
   741  			if st.State.Terminated != nil && (st.State.Terminated.ExitCode != 0 || st.State.Terminated.Signal != 0) {
   742  				errors = append(errors, toPodError(st.Name, st.Image, st.State.Terminated.Reason, st.State.Terminated.Message))
   743  			}
   744  
   745  			if st.LastTerminationState.Waiting != nil {
   746  				errors = append(errors, toPodError(st.Name, st.Image, st.LastTerminationState.Waiting.Reason, st.LastTerminationState.Waiting.Message))
   747  			}
   748  
   749  			if st.LastTerminationState.Terminated != nil {
   750  				errors = append(errors, toPodError(st.Name, st.Image, st.LastTerminationState.Terminated.Reason, st.LastTerminationState.Terminated.Message))
   751  			}
   752  		}
   753  	}
   754  	return errors
   755  }
   756  
   757  func getLabelSelector(req *pb.StatSummaryRequest) (labels.Selector, error) {
   758  	labelSelector := labels.Everything()
   759  	if s := req.GetSelector().GetLabelSelector(); s != "" {
   760  		var err error
   761  		labelSelector, err = labels.Parse(s)
   762  		if err != nil {
   763  			return nil, fmt.Errorf("invalid label selector %q: %w", s, err)
   764  		}
   765  	}
   766  	return labelSelector, nil
   767  }
   768  
   769  func dstFromAuthority(authority string) string {
   770  	// name.namespace.svc.suffix
   771  	labels := strings.Split(authority, ".")
   772  	if len(labels) >= 3 && labels[2] == "svc" {
   773  		// name
   774  		return labels[0]
   775  	}
   776  	return authority
   777  }
   778  

View as plain text