1 package heartbeat
2
3 import (
4 "context"
5 "fmt"
6 "math"
7 "net/http"
8 "net/url"
9 "strconv"
10 "time"
11
12 pkgK8s "github.com/linkerd/linkerd2/controller/k8s"
13 "github.com/linkerd/linkerd2/pkg/config"
14 "github.com/linkerd/linkerd2/pkg/k8s"
15 "github.com/linkerd/linkerd2/pkg/util"
16 "github.com/linkerd/linkerd2/pkg/version"
17 promv1 "github.com/prometheus/client_golang/api/prometheus/v1"
18 "github.com/prometheus/common/model"
19 log "github.com/sirupsen/logrus"
20 v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21 )
22
23 type containerMeta struct {
24 name model.LabelValue
25 ns model.LabelValue
26 }
27
28
29 func K8sValues(ctx context.Context, kubeAPI *k8s.KubernetesAPI, controlPlaneNamespace string) url.Values {
30 v := url.Values{}
31
32 cm, err := config.FetchLinkerdConfigMap(ctx, kubeAPI, controlPlaneNamespace)
33 if err != nil {
34 log.Errorf("Failed to fetch linkerd-config: %s", err)
35 } else {
36 v.Set("uuid", string(cm.GetUID()))
37 v.Set("install-time", strconv.FormatInt(cm.GetCreationTimestamp().Unix(), 10))
38 }
39
40 versionInfo, err := kubeAPI.GetVersionInfo()
41 if err != nil {
42 log.Errorf("Failed to fetch Kubernetes version info: %s", err)
43 } else {
44 v.Set("k8s-version", versionInfo.String())
45 }
46
47 namespaces, err := kubeAPI.GetAllNamespacesWithExtensionLabel(ctx)
48 if err != nil {
49 log.Errorf("Failed to fetch namespaces with %s label: %s", k8s.LinkerdExtensionLabel, err)
50 } else {
51 for _, ns := range namespaces {
52 extensionNameParam := fmt.Sprintf("ext-%s", ns.Labels[k8s.LinkerdExtensionLabel])
53 v.Set(extensionNameParam, "1")
54 }
55 }
56
57 err = k8s.ServiceProfilesAccess(ctx, kubeAPI)
58 if err != nil {
59 log.Errorf("Failed to verify service profile access: %s", err)
60 return v
61 }
62
63 l5dCrdClient, err := pkgK8s.NewL5DCRDClient(kubeAPI.Config)
64 if err != nil {
65 log.Errorf("Failed to create Linkerd CRD client: %s", err)
66 return v
67 }
68
69 spList, err := l5dCrdClient.LinkerdV1alpha2().ServiceProfiles("").List(ctx, v1.ListOptions{})
70 if err != nil {
71 log.Errorf("Failed to get service profiles: %s", err)
72 return v
73 }
74
75 v.Set("service-profile-count", strconv.Itoa(len(spList.Items)))
76
77 return v
78 }
79
80
81 func PromValues(promAPI promv1.API, controlPlaneNamespace string) url.Values {
82 v := url.Values{}
83
84 jobProxyLabels := model.LabelSet{"job": "linkerd-proxy"}
85
86
87 query := fmt.Sprintf("sum(rate(request_total%s[30s]))", jobProxyLabels.Merge(model.LabelSet{"direction": "inbound"}))
88 value, err := promQuery(promAPI, query, 0)
89 if err != nil {
90 log.Errorf("Prometheus query failed: %s", err)
91 } else {
92 v.Set("total-rps", value)
93 }
94
95
96 query = fmt.Sprintf("count(count by (pod) (request_total%s))", jobProxyLabels)
97 value, err = promQuery(promAPI, query, 0)
98 if err != nil {
99 log.Errorf("Prometheus query failed: %s", err)
100 } else {
101 v.Set("meshed-pods", value)
102 }
103
104
105 query = fmt.Sprintf("histogram_quantile(0.99, sum(rate(request_handle_us_bucket%s[24h])) by (le))", jobProxyLabels)
106 value, err = promQuery(promAPI, query, 0)
107 if err != nil {
108 log.Errorf("Prometheus query failed: %s", err)
109 } else {
110 v.Set("p99-handle-us", value)
111 }
112
113
114 jobInjectorLabels := model.LabelSet{
115 "job": "linkerd-controller",
116 "skip": "false",
117 }
118 query = fmt.Sprintf("sum(proxy_inject_admission_responses_total%s)", jobInjectorLabels)
119 value, err = promQuery(promAPI, query, 0)
120 if err != nil {
121 log.Errorf("Prometheus query failed: %s", err)
122 } else {
123 v.Set("proxy-injector-injections", value)
124 }
125
126
127 for _, container := range []containerMeta{
128 {
129 name: "linkerd-proxy",
130 },
131 {
132 name: "destination",
133 ns: "linkerd",
134 },
135 {
136 name: "prometheus",
137 ns: "linkerd",
138 },
139 } {
140
141 containerLabelsPre16 := getLabelSet(container, "container_name")
142 containerLabelsPost16 := getLabelSet(container, "container")
143
144
145 query = fmt.Sprintf("max(container_memory_working_set_bytes%s or container_memory_working_set_bytes%s)",
146 containerLabelsPre16, containerLabelsPost16)
147 value, err = promQuery(promAPI, query, 0)
148 if err != nil {
149 log.Errorf("Prometheus query failed: %s", err)
150 } else {
151 param := fmt.Sprintf("max-mem-%s", container.name)
152 v.Set(param, value)
153 }
154
155
156 query = fmt.Sprintf("max(quantile_over_time(0.95,rate(container_cpu_usage_seconds_total%s[5m])[24h:5m]) "+
157 "or quantile_over_time(0.95,rate(container_cpu_usage_seconds_total%s[5m])[24h:5m]))",
158 containerLabelsPre16, containerLabelsPost16)
159 value, err = promQuery(promAPI, query, 3)
160 if err != nil {
161 log.Errorf("Prometheus query failed: %s", err)
162 } else {
163 param := fmt.Sprintf("p95-cpu-%s", container.name)
164 v.Set(param, value)
165 }
166 }
167
168 return v
169 }
170
171 func getLabelSet(container containerMeta, containerKey model.LabelName) model.LabelSet {
172 containerLabels := model.LabelSet{
173 "job": "kubernetes-nodes-cadvisor",
174 containerKey: container.name,
175 }
176 if container.ns != "" {
177 containerLabels["namespace"] = container.ns
178 }
179 return containerLabels
180 }
181
182 func promQuery(promAPI promv1.API, query string, precision int) (string, error) {
183 log.Debugf("Prometheus query: %s", query)
184
185 res, warn, err := promAPI.Query(context.Background(), query, time.Time{})
186 if err != nil {
187 return "", err
188 }
189 if warn != nil {
190 log.Warnf("%v", warn)
191 }
192
193 if result, ok := res.(model.Vector); ok {
194 if len(result) != 1 {
195 return "", fmt.Errorf("unexpected result Prometheus result vector length: %d", len(result))
196 }
197 f := float64(result[0].Value)
198 if math.IsNaN(f) {
199 return "", fmt.Errorf("unexpected sample value: %v", result[0].Value)
200 }
201
202 return strconv.FormatFloat(f, 'f', precision, 64), nil
203 }
204
205 return "", fmt.Errorf("unexpected query result type (expected Vector): %s", res.Type())
206 }
207
208
// MergeValues returns a new url.Values containing every key of v1 and v2.
// When a key is present in both, the values from v2 replace those from v1;
// they are not concatenated. The result is never nil, even for nil inputs.
//
// Neither input is modified, and each value slice is copied so that later
// changes to the result (index assignment, Add, ...) cannot write through to
// the slices still held by v1 or v2.
func MergeValues(v1, v2 url.Values) url.Values {
	merged := make(url.Values, len(v1)+len(v2))
	// v2 is applied second so its entries take precedence on key collisions.
	for _, src := range []url.Values{v1, v2} {
		for key, vals := range src {
			merged[key] = append([]string(nil), vals...)
		}
	}
	return merged
}
219
220
221 func Send(v url.Values) error {
222 return send(http.DefaultClient, version.CheckURL, v)
223 }
224
225 func send(client *http.Client, baseURL string, v url.Values) error {
226 req, err := http.NewRequest("GET", baseURL, nil)
227 if err != nil {
228 return fmt.Errorf("failed to create HTTP request for base URL [%s]: %w", baseURL, err)
229 }
230 req.URL.RawQuery = v.Encode()
231
232 log.Infof("Sending heartbeat: %s", req.URL.String())
233 resp, err := client.Do(req)
234 if err != nil {
235 return fmt.Errorf("check URL [%s] request failed with: %w", req.URL.String(), err)
236 }
237
238 defer resp.Body.Close()
239
240 body, err := util.ReadAllLimit(resp.Body, util.MB)
241 if err != nil {
242 return fmt.Errorf("failed to read response body: %w", err)
243 }
244 if resp.StatusCode != http.StatusOK {
245 return fmt.Errorf("request failed with code %d; response body: %s", resp.StatusCode, string(body))
246 }
247
248 log.Infof("Successfully sent heartbeat: %s", string(body))
249
250 return nil
251 }
252
View as plain text