1 package cmd
2
3 import (
4 "bytes"
5 "crypto/sha256"
6 "fmt"
7 "sort"
8 "sync/atomic"
9 "time"
10
11 "github.com/linkerd/linkerd2/pkg/k8s"
12 "github.com/prometheus/common/expfmt"
13 corev1 "k8s.io/api/core/v1"
14 )
15
16
// metricsResult is the outcome of one metrics collection attempt.
type metricsResult struct {
	// pod is the name of the pod the attempt was made against; container is
	// the name of the queried container and is left empty when the attempt
	// failed before any container was selected.
	pod, container string
	// metrics is the raw payload returned for the container, if any.
	metrics []byte
	// err is the error encountered while collecting, or nil on success.
	err error
}
23 type byResult []metricsResult
24
25 func (s byResult) Len() int {
26 return len(s)
27 }
28 func (s byResult) Swap(i, j int) {
29 s[i], s[j] = s[j], s[i]
30 }
31 func (s byResult) Less(i, j int) bool {
32 return s[i].pod < s[j].pod || ((s[i].pod == s[j].pod) && s[i].container < s[j].container)
33 }
34
35
36
37 func getAllContainersWithPort(
38 pod corev1.Pod,
39 portName string,
40 ) ([]corev1.Container, error) {
41 if pod.Status.Phase != corev1.PodRunning {
42 return nil, fmt.Errorf("pod not running: %s", pod.GetName())
43 }
44 var containers []corev1.Container
45
46 allContainers := append(pod.Spec.InitContainers, pod.Spec.Containers...)
47 for _, c := range allContainers {
48 for _, p := range c.Ports {
49 if p.Name == portName {
50 containers = append(containers, c)
51 }
52 }
53 }
54 return containers, nil
55 }
56
57
58
59 func getMetrics(
60 k8sAPI *k8s.KubernetesAPI,
61 pods []corev1.Pod,
62 portName string,
63 waitingTime time.Duration,
64 emitLogs bool,
65 ) []metricsResult {
66 var results []metricsResult
67
68 resultChan := make(chan metricsResult)
69 var activeRoutines int32
70 for _, pod := range pods {
71 atomic.AddInt32(&activeRoutines, 1)
72 go func(p corev1.Pod) {
73 defer atomic.AddInt32(&activeRoutines, -1)
74 containers, err := getAllContainersWithPort(p, portName)
75 if err != nil {
76 resultChan <- metricsResult{
77 pod: p.GetName(),
78 err: err,
79 }
80 return
81 }
82
83 for _, c := range containers {
84 bytes, err := k8s.GetContainerMetrics(k8sAPI, p, c, emitLogs, portName)
85
86 resultChan <- metricsResult{
87 pod: p.GetName(),
88 container: c.Name,
89 metrics: bytes,
90 err: err,
91 }
92 }
93 }(pod)
94 }
95
96 timeout := time.NewTimer(waitingTime)
97 defer timeout.Stop()
98 wait:
99 for {
100 select {
101 case result := <-resultChan:
102 results = append(results, result)
103 case <-timeout.C:
104 break wait
105 }
106 if atomic.LoadInt32(&activeRoutines) == 0 {
107 break
108 }
109 }
110
111 sort.Sort(byResult(results))
112
113 return results
114 }
115
// obfuscationMap is the set of metric label names whose values
// obfuscateMetrics replaces with the output of obfuscate. Only key presence
// matters; keys are listed alphabetically.
var obfuscationMap = map[string]struct{}{
	"authority":     {},
	"client_id":     {},
	"dst_namespace": {},
	"dst_service":   {},
	"server_id":     {},
	"target_addr":   {},
}
124
125 func obfuscateMetrics(metrics []byte) ([]byte, error) {
126 reader := bytes.NewReader(metrics)
127
128 var metricsParser expfmt.TextParser
129
130 parsedMetrics, err := metricsParser.TextToMetricFamilies(reader)
131 if err != nil {
132 return nil, err
133 }
134
135 var writer bytes.Buffer
136 for _, v := range parsedMetrics {
137 for _, m := range v.Metric {
138 for _, l := range m.Label {
139 if _, ok := obfuscationMap[l.GetName()]; ok {
140 obfuscatedValue := obfuscate(l.GetValue())
141 l.Value = &obfuscatedValue
142 }
143 }
144 }
145
146
147 expfmt.MetricFamilyToText(&writer, v)
148 }
149
150 return writer.Bytes(), nil
151 }
152
// obfuscate returns the first four bytes of the SHA-256 digest of s, encoded
// as eight lowercase hexadecimal characters.
func obfuscate(s string) string {
	const prefixLen = 4

	digest := sha256.Sum256([]byte(s))
	prefix := digest[:prefixLen]
	return fmt.Sprintf("%x", prefix)
}
157
View as plain text