...

Source file src/github.com/linkerd/linkerd2/controller/heartbeat/heartbeat.go

Documentation: github.com/linkerd/linkerd2/controller/heartbeat

     1  package heartbeat
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"math"
     7  	"net/http"
     8  	"net/url"
     9  	"strconv"
    10  	"time"
    11  
    12  	pkgK8s "github.com/linkerd/linkerd2/controller/k8s"
    13  	"github.com/linkerd/linkerd2/pkg/config"
    14  	"github.com/linkerd/linkerd2/pkg/k8s"
    15  	"github.com/linkerd/linkerd2/pkg/util"
    16  	"github.com/linkerd/linkerd2/pkg/version"
    17  	promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
    18  	"github.com/prometheus/common/model"
    19  	log "github.com/sirupsen/logrus"
    20  	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    21  )
    22  
    23  type containerMeta struct {
    24  	name model.LabelValue
    25  	ns   model.LabelValue
    26  }
    27  
    28  // K8sValues gathers relevant heartbeat information from Kubernetes
    29  func K8sValues(ctx context.Context, kubeAPI *k8s.KubernetesAPI, controlPlaneNamespace string) url.Values {
    30  	v := url.Values{}
    31  
    32  	cm, err := config.FetchLinkerdConfigMap(ctx, kubeAPI, controlPlaneNamespace)
    33  	if err != nil {
    34  		log.Errorf("Failed to fetch linkerd-config: %s", err)
    35  	} else {
    36  		v.Set("uuid", string(cm.GetUID()))
    37  		v.Set("install-time", strconv.FormatInt(cm.GetCreationTimestamp().Unix(), 10))
    38  	}
    39  
    40  	versionInfo, err := kubeAPI.GetVersionInfo()
    41  	if err != nil {
    42  		log.Errorf("Failed to fetch Kubernetes version info: %s", err)
    43  	} else {
    44  		v.Set("k8s-version", versionInfo.String())
    45  	}
    46  
    47  	namespaces, err := kubeAPI.GetAllNamespacesWithExtensionLabel(ctx)
    48  	if err != nil {
    49  		log.Errorf("Failed to fetch namespaces with %s label: %s", k8s.LinkerdExtensionLabel, err)
    50  	} else {
    51  		for _, ns := range namespaces {
    52  			extensionNameParam := fmt.Sprintf("ext-%s", ns.Labels[k8s.LinkerdExtensionLabel])
    53  			v.Set(extensionNameParam, "1")
    54  		}
    55  	}
    56  
    57  	err = k8s.ServiceProfilesAccess(ctx, kubeAPI)
    58  	if err != nil {
    59  		log.Errorf("Failed to verify service profile access: %s", err)
    60  		return v
    61  	}
    62  
    63  	l5dCrdClient, err := pkgK8s.NewL5DCRDClient(kubeAPI.Config)
    64  	if err != nil {
    65  		log.Errorf("Failed to create Linkerd CRD client: %s", err)
    66  		return v
    67  	}
    68  
    69  	spList, err := l5dCrdClient.LinkerdV1alpha2().ServiceProfiles("").List(ctx, v1.ListOptions{})
    70  	if err != nil {
    71  		log.Errorf("Failed to get service profiles: %s", err)
    72  		return v
    73  	}
    74  
    75  	v.Set("service-profile-count", strconv.Itoa(len(spList.Items)))
    76  
    77  	return v
    78  }
    79  
    80  // PromValues gathers relevant heartbeat information from Prometheus
    81  func PromValues(promAPI promv1.API, controlPlaneNamespace string) url.Values {
    82  	v := url.Values{}
    83  
    84  	jobProxyLabels := model.LabelSet{"job": "linkerd-proxy"}
    85  
    86  	// total-rps
    87  	query := fmt.Sprintf("sum(rate(request_total%s[30s]))", jobProxyLabels.Merge(model.LabelSet{"direction": "inbound"}))
    88  	value, err := promQuery(promAPI, query, 0)
    89  	if err != nil {
    90  		log.Errorf("Prometheus query failed: %s", err)
    91  	} else {
    92  		v.Set("total-rps", value)
    93  	}
    94  
    95  	// meshed-pods
    96  	query = fmt.Sprintf("count(count by (pod) (request_total%s))", jobProxyLabels)
    97  	value, err = promQuery(promAPI, query, 0)
    98  	if err != nil {
    99  		log.Errorf("Prometheus query failed: %s", err)
   100  	} else {
   101  		v.Set("meshed-pods", value)
   102  	}
   103  
   104  	// p95-handle-us
   105  	query = fmt.Sprintf("histogram_quantile(0.99, sum(rate(request_handle_us_bucket%s[24h])) by (le))", jobProxyLabels)
   106  	value, err = promQuery(promAPI, query, 0)
   107  	if err != nil {
   108  		log.Errorf("Prometheus query failed: %s", err)
   109  	} else {
   110  		v.Set("p99-handle-us", value)
   111  	}
   112  
   113  	// proxy-injector-injections
   114  	jobInjectorLabels := model.LabelSet{
   115  		"job":  "linkerd-controller",
   116  		"skip": "false",
   117  	}
   118  	query = fmt.Sprintf("sum(proxy_inject_admission_responses_total%s)", jobInjectorLabels)
   119  	value, err = promQuery(promAPI, query, 0)
   120  	if err != nil {
   121  		log.Errorf("Prometheus query failed: %s", err)
   122  	} else {
   123  		v.Set("proxy-injector-injections", value)
   124  	}
   125  
   126  	// container metrics
   127  	for _, container := range []containerMeta{
   128  		{
   129  			name: "linkerd-proxy",
   130  		},
   131  		{
   132  			name: "destination",
   133  			ns:   "linkerd",
   134  		},
   135  		{
   136  			name: "prometheus",
   137  			ns:   "linkerd",
   138  		},
   139  	} {
   140  		// as of k8s 1.16 cadvisor labels container names with just `container`
   141  		containerLabelsPre16 := getLabelSet(container, "container_name")
   142  		containerLabelsPost16 := getLabelSet(container, "container")
   143  
   144  		// max-mem
   145  		query = fmt.Sprintf("max(container_memory_working_set_bytes%s or container_memory_working_set_bytes%s)",
   146  			containerLabelsPre16, containerLabelsPost16)
   147  		value, err = promQuery(promAPI, query, 0)
   148  		if err != nil {
   149  			log.Errorf("Prometheus query failed: %s", err)
   150  		} else {
   151  			param := fmt.Sprintf("max-mem-%s", container.name)
   152  			v.Set(param, value)
   153  		}
   154  
   155  		// p95-cpu
   156  		query = fmt.Sprintf("max(quantile_over_time(0.95,rate(container_cpu_usage_seconds_total%s[5m])[24h:5m]) "+
   157  			"or quantile_over_time(0.95,rate(container_cpu_usage_seconds_total%s[5m])[24h:5m]))",
   158  			containerLabelsPre16, containerLabelsPost16)
   159  		value, err = promQuery(promAPI, query, 3)
   160  		if err != nil {
   161  			log.Errorf("Prometheus query failed: %s", err)
   162  		} else {
   163  			param := fmt.Sprintf("p95-cpu-%s", container.name)
   164  			v.Set(param, value)
   165  		}
   166  	}
   167  
   168  	return v
   169  }
   170  
   171  func getLabelSet(container containerMeta, containerKey model.LabelName) model.LabelSet {
   172  	containerLabels := model.LabelSet{
   173  		"job":        "kubernetes-nodes-cadvisor",
   174  		containerKey: container.name,
   175  	}
   176  	if container.ns != "" {
   177  		containerLabels["namespace"] = container.ns
   178  	}
   179  	return containerLabels
   180  }
   181  
   182  func promQuery(promAPI promv1.API, query string, precision int) (string, error) {
   183  	log.Debugf("Prometheus query: %s", query)
   184  
   185  	res, warn, err := promAPI.Query(context.Background(), query, time.Time{})
   186  	if err != nil {
   187  		return "", err
   188  	}
   189  	if warn != nil {
   190  		log.Warnf("%v", warn)
   191  	}
   192  
   193  	if result, ok := res.(model.Vector); ok {
   194  		if len(result) != 1 {
   195  			return "", fmt.Errorf("unexpected result Prometheus result vector length: %d", len(result))
   196  		}
   197  		f := float64(result[0].Value)
   198  		if math.IsNaN(f) {
   199  			return "", fmt.Errorf("unexpected sample value: %v", result[0].Value)
   200  		}
   201  
   202  		return strconv.FormatFloat(f, 'f', precision, 64), nil
   203  	}
   204  
   205  	return "", fmt.Errorf("unexpected query result type (expected Vector): %s", res.Type())
   206  }
   207  
   208  // MergeValues merges two url.Values
   209  func MergeValues(v1, v2 url.Values) url.Values {
   210  	v := url.Values{}
   211  	for key, val := range v1 {
   212  		v[key] = val
   213  	}
   214  	for key, val := range v2 {
   215  		v[key] = val
   216  	}
   217  	return v
   218  }
   219  
   220  // Send takes a map of url.Values and sends them to versioncheck.linkerd.io
   221  func Send(v url.Values) error {
   222  	return send(http.DefaultClient, version.CheckURL, v)
   223  }
   224  
   225  func send(client *http.Client, baseURL string, v url.Values) error {
   226  	req, err := http.NewRequest("GET", baseURL, nil)
   227  	if err != nil {
   228  		return fmt.Errorf("failed to create HTTP request for base URL [%s]: %w", baseURL, err)
   229  	}
   230  	req.URL.RawQuery = v.Encode()
   231  
   232  	log.Infof("Sending heartbeat: %s", req.URL.String())
   233  	resp, err := client.Do(req)
   234  	if err != nil {
   235  		return fmt.Errorf("check URL [%s] request failed with: %w", req.URL.String(), err)
   236  	}
   237  
   238  	defer resp.Body.Close()
   239  
   240  	body, err := util.ReadAllLimit(resp.Body, util.MB)
   241  	if err != nil {
   242  		return fmt.Errorf("failed to read response body: %w", err)
   243  	}
   244  	if resp.StatusCode != http.StatusOK {
   245  		return fmt.Errorf("request failed with code %d; response body: %s", resp.StatusCode, string(body))
   246  	}
   247  
   248  	log.Infof("Successfully sent heartbeat: %s", string(body))
   249  
   250  	return nil
   251  }
   252  

View as plain text