...

Source file src/github.com/linkerd/linkerd2/viz/metrics-api/grpc_server.go

Documentation: github.com/linkerd/linkerd2/viz/metrics-api

     1  package api
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/golang/protobuf/ptypes/duration"
    10  	"github.com/linkerd/linkerd2/controller/k8s"
    11  	pkgK8s "github.com/linkerd/linkerd2/pkg/k8s"
    12  	"github.com/linkerd/linkerd2/pkg/prometheus"
    13  	pb "github.com/linkerd/linkerd2/viz/metrics-api/gen/viz"
    14  	"github.com/linkerd/linkerd2/viz/metrics-api/util"
    15  	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
    16  	log "github.com/sirupsen/logrus"
    17  	"google.golang.org/grpc"
    18  	"gopkg.in/yaml.v2"
    19  	corev1 "k8s.io/api/core/v1"
    20  	"k8s.io/apimachinery/pkg/labels"
    21  )
    22  
// Server specifies the interface the Viz metrics API server should implement.
// It is exactly pb.ApiServer, the API server interface from the gen/viz
// package; grpcServer in this file is the implementation built by
// NewGrpcServer.
type Server interface {
	pb.ApiServer
}
    27  
// grpcServer implements the Viz metrics API (pb.ApiServer).
//
// Fields:
//   - pb.UnimplementedApiServer is embedded so the type satisfies
//     pb.ApiServer; presumably it supplies default responses for RPCs that are
//     not implemented in this file (TODO confirm against the generated code).
//   - prometheusAPI is the Prometheus client. It may be nil: SelfCheck skips
//     its Prometheus check in that case.
//   - k8sAPI provides the pod/service listers and owner lookups.
//   - controllerNamespace and clusterDomain are stored for use elsewhere in
//     the package; nothing in this file reads them.
//   - ignoredNamespaces lists namespaces whose pods shouldIgnore filters out
//     of ListPods.
type grpcServer struct {
	pb.UnimplementedApiServer
	prometheusAPI       promv1.API
	k8sAPI              *k8s.API
	controllerNamespace string
	clusterDomain       string
	ignoredNamespaces   []string
}
    36  
// podReport holds what ListPods learns about a pod from Prometheus:
//   - lastReport is the timestamp of the pod's most recent
//     process_start_time_seconds sample.
//   - processStartTimeSeconds is the process start time taken from that
//     sample's value. Despite its name it is a time.Time (an absolute instant,
//     truncated to whole seconds by ListPods), not a number of seconds.
type podReport struct {
	lastReport              time.Time
	processStartTimeSeconds time.Time
}
    41  
// podQuery is the PromQL template used by ListPods and SelfCheck. Its %s verb
// receives an optional label matcher (e.g. a namespace matcher, or "" for
// none), and the result has one sample per (pod, namespace) pair.
//
// The *SubsystemName and *CheckDescription constants label the results that
// SelfCheck returns for the Kubernetes and Prometheus connectivity checks.
const (
	podQuery                   = "max(process_start_time_seconds{%s}) by (pod, namespace)"
	k8sClientSubsystemName     = "kubernetes"
	k8sClientCheckDescription  = "linkerd viz can talk to Kubernetes"
	promClientSubsystemName    = "prometheus"
	promClientCheckDescription = "linkerd viz can talk to Prometheus"
)
    49  
    50  // NewGrpcServer creates a new instance of the Api server and registers it
    51  // with Prometheus.
    52  func NewGrpcServer(
    53  	promAPI promv1.API,
    54  	k8sAPI *k8s.API,
    55  	controllerNamespace string,
    56  	clusterDomain string,
    57  	ignoredNamespaces []string,
    58  ) *grpc.Server {
    59  
    60  	server := &grpcServer{
    61  		prometheusAPI:       promAPI,
    62  		k8sAPI:              k8sAPI,
    63  		controllerNamespace: controllerNamespace,
    64  		clusterDomain:       clusterDomain,
    65  		ignoredNamespaces:   ignoredNamespaces,
    66  	}
    67  
    68  	s := prometheus.NewGrpcServer(grpc.MaxConcurrentStreams(0))
    69  	pb.RegisterApiServer(s, server)
    70  
    71  	return s
    72  }
    73  
    74  func (s *grpcServer) ListPods(ctx context.Context, req *pb.ListPodsRequest) (*pb.ListPodsResponse, error) {
    75  	log.Debugf("ListPods request: %+v", req)
    76  
    77  	targetOwner := req.GetSelector().GetResource()
    78  
    79  	// Reports is a map from instance name to the absolute time of the most recent
    80  	// report from that instance and its process start time
    81  	reports := make(map[string]podReport)
    82  
    83  	labelSelector := labels.Everything()
    84  	if s := req.GetSelector().GetLabelSelector(); s != "" {
    85  		var err error
    86  		labelSelector, err = labels.Parse(s)
    87  		if err != nil {
    88  			return nil, fmt.Errorf("invalid label selector %q: %w", s, err)
    89  		}
    90  	}
    91  
    92  	nsQuery := ""
    93  	namespace := ""
    94  	if targetOwner.GetNamespace() != "" {
    95  		namespace = targetOwner.GetNamespace()
    96  	} else if targetOwner.GetType() == pkgK8s.Namespace {
    97  		namespace = targetOwner.GetName()
    98  	}
    99  	if namespace != "" {
   100  		nsQuery = fmt.Sprintf("namespace=\"%s\"", namespace)
   101  	}
   102  	processStartTimeQuery := fmt.Sprintf(podQuery, nsQuery)
   103  
   104  	// Query Prometheus for all pods present
   105  	vec, err := s.queryProm(ctx, processStartTimeQuery)
   106  	if err != nil && !errors.Is(err, ErrNoPrometheusInstance) {
   107  		return nil, err
   108  	}
   109  
   110  	for _, sample := range vec {
   111  		pod := string(sample.Metric["pod"])
   112  		timestamp := sample.Timestamp
   113  
   114  		reports[pod] = podReport{
   115  			lastReport:              time.Unix(0, int64(timestamp)*int64(time.Millisecond)),
   116  			processStartTimeSeconds: time.Unix(0, int64(sample.Value)*int64(time.Second)),
   117  		}
   118  	}
   119  
   120  	var pods []*corev1.Pod
   121  	if namespace != "" {
   122  		pods, err = s.k8sAPI.Pod().Lister().Pods(namespace).List(labelSelector)
   123  	} else {
   124  		pods, err = s.k8sAPI.Pod().Lister().List(labelSelector)
   125  	}
   126  
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  	podList := make([]*pb.Pod, 0)
   131  
   132  	for _, pod := range pods {
   133  		if s.shouldIgnore(pod) {
   134  			continue
   135  		}
   136  
   137  		ownerKind, ownerName := s.k8sAPI.GetOwnerKindAndName(ctx, pod, false)
   138  		// filter out pods without matching owner
   139  		if targetOwner.GetNamespace() != "" && targetOwner.GetNamespace() != pod.GetNamespace() {
   140  			continue
   141  		}
   142  		if targetOwner.GetType() != "" && targetOwner.GetType() != ownerKind {
   143  			continue
   144  		}
   145  		if targetOwner.GetName() != "" && targetOwner.GetName() != ownerName {
   146  			continue
   147  		}
   148  
   149  		updated, added := reports[pod.Name]
   150  
   151  		item := util.K8sPodToPublicPod(*pod, ownerKind, ownerName)
   152  		item.Added = added
   153  
   154  		if added {
   155  			since := time.Since(updated.lastReport)
   156  			item.SinceLastReport = &duration.Duration{
   157  				Seconds: int64(since / time.Second),
   158  				Nanos:   int32(since % time.Second),
   159  			}
   160  			sinceStarting := time.Since(updated.processStartTimeSeconds)
   161  			item.Uptime = &duration.Duration{
   162  				Seconds: int64(sinceStarting / time.Second),
   163  				Nanos:   int32(sinceStarting % time.Second),
   164  			}
   165  		}
   166  
   167  		podList = append(podList, item)
   168  	}
   169  
   170  	rsp := pb.ListPodsResponse{Pods: podList}
   171  
   172  	log.Debugf("ListPods response: %s", rsp.String())
   173  
   174  	return &rsp, nil
   175  }
   176  
   177  func (s *grpcServer) SelfCheck(ctx context.Context, in *pb.SelfCheckRequest) (*pb.SelfCheckResponse, error) {
   178  	k8sClientCheck := &pb.CheckResult{
   179  		SubsystemName:    k8sClientSubsystemName,
   180  		CheckDescription: k8sClientCheckDescription,
   181  		Status:           pb.CheckStatus_OK,
   182  	}
   183  	_, err := s.k8sAPI.Pod().Lister().List(labels.Everything())
   184  	if err != nil {
   185  		k8sClientCheck.Status = pb.CheckStatus_ERROR
   186  		k8sClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error calling the Kubernetes API: %s", err)
   187  	}
   188  
   189  	response := &pb.SelfCheckResponse{
   190  		Results: []*pb.CheckResult{
   191  			k8sClientCheck,
   192  		},
   193  	}
   194  
   195  	if s.prometheusAPI != nil {
   196  		promClientCheck := &pb.CheckResult{
   197  			SubsystemName:    promClientSubsystemName,
   198  			CheckDescription: promClientCheckDescription,
   199  			Status:           pb.CheckStatus_OK,
   200  		}
   201  		_, err = s.queryProm(ctx, fmt.Sprintf(podQuery, ""))
   202  		if err != nil {
   203  			promClientCheck.Status = pb.CheckStatus_ERROR
   204  			promClientCheck.FriendlyMessageToUser = fmt.Sprintf("Error calling Prometheus from the control plane: %s", err)
   205  		}
   206  
   207  		response.Results = append(response.Results, promClientCheck)
   208  	}
   209  
   210  	return response, nil
   211  }
   212  
   213  func (s *grpcServer) shouldIgnore(pod *corev1.Pod) bool {
   214  	for _, namespace := range s.ignoredNamespaces {
   215  		if pod.Namespace == namespace {
   216  			return true
   217  		}
   218  	}
   219  	return false
   220  }
   221  
   222  func (s *grpcServer) ListServices(ctx context.Context, req *pb.ListServicesRequest) (*pb.ListServicesResponse, error) {
   223  	log.Debugf("ListServices request: %+v", req)
   224  
   225  	services, err := s.k8sAPI.GetServices(req.Namespace, "")
   226  	if err != nil {
   227  		return nil, err
   228  	}
   229  
   230  	svcs := make([]*pb.Service, 0)
   231  	for _, svc := range services {
   232  		svcs = append(svcs, &pb.Service{
   233  			Name:      svc.GetName(),
   234  			Namespace: svc.GetNamespace(),
   235  		})
   236  	}
   237  
   238  	return &pb.ListServicesResponse{Services: svcs}, nil
   239  }
   240  
   241  // validateTimeWindow returns an error if the Prometheus scrape interval
   242  // is longer than the query time window. This is an opportunistic, best-effort
   243  // validation: if we cannot determine the Prometheus scrape interval for any
   244  // reason, we do not return an error.
   245  func (s *grpcServer) validateTimeWindow(ctx context.Context, window string) error {
   246  	config, err := s.prometheusAPI.Config(ctx)
   247  	if err != nil {
   248  		return nil
   249  	}
   250  
   251  	type PrometheusConfig struct {
   252  		Global map[string]string
   253  	}
   254  
   255  	var prom PrometheusConfig
   256  	err = yaml.Unmarshal([]byte(config.YAML), &prom)
   257  	if err != nil {
   258  		return nil
   259  	}
   260  
   261  	scrape_interval_str, found := prom.Global["scrape_interval"]
   262  	if !found {
   263  		return nil
   264  	}
   265  
   266  	scrape_interval, err := time.ParseDuration(scrape_interval_str)
   267  	if err != nil {
   268  		return nil
   269  	}
   270  
   271  	t, err := time.ParseDuration(window)
   272  	if err != nil {
   273  		return err
   274  	}
   275  
   276  	if t < scrape_interval {
   277  		return fmt.Errorf("time window (%s) must be at least as long as the Prometheus scrape interval (%s)", window, scrape_interval)
   278  	}
   279  	return nil
   280  }
   281  

View as plain text