1
16
17 package metrics
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "net/http"
24 "sort"
25 "strconv"
26 "strings"
27 "time"
28
29 "k8s.io/apimachinery/pkg/util/sets"
30 clientset "k8s.io/client-go/kubernetes"
31 "k8s.io/component-base/metrics/testutil"
32 "k8s.io/kubernetes/test/e2e/framework"
33 )
34
const (
	// proxyTimeout is a two-minute timeout. It is not referenced in the visible
	// part of this file; presumably it bounds proxied metric requests made
	// elsewhere in the package (TODO confirm).
	proxyTimeout = 2 * time.Minute

	// dockerOperationsLatencyKey is one of the metric names selected by
	// GetDefaultKubeletLatencyMetrics. Like the other *Key constants below it is
	// written without the "kubelet_" prefix, matching the keys produced by
	// GetKubeletMetrics.
	dockerOperationsLatencyKey = "docker_operations_duration_seconds"

	// kubeletSubsystem, followed by "_", is the metric-name prefix that
	// GetKubeletMetrics requires and strips.
	kubeletSubsystem = "kubelet"

	// podWorkerDurationKey is a latency metric name selected by
	// GetDefaultKubeletLatencyMetrics.
	podWorkerDurationKey = "pod_worker_duration_seconds"

	// podStartDurationKey is a latency metric name selected by
	// GetDefaultKubeletLatencyMetrics.
	podStartDurationKey = "pod_start_duration_seconds"

	// podStartSLIDurationKey is a latency metric name selected by
	// GetDefaultKubeletLatencyMetrics.
	podStartSLIDurationKey = "pod_start_sli_duration_seconds"

	// cgroupManagerOperationsKey is a latency metric name selected by
	// GetDefaultKubeletLatencyMetrics.
	cgroupManagerOperationsKey = "cgroup_manager_duration_seconds"

	// podWorkerStartDurationKey is a latency metric name selected by
	// GetDefaultKubeletLatencyMetrics.
	podWorkerStartDurationKey = "pod_worker_start_duration_seconds"

	// plegRelistDurationKey is a latency metric name selected by
	// GetDefaultKubeletLatencyMetrics.
	plegRelistDurationKey = "pleg_relist_duration_seconds"
)
55
56
// KubeletMetrics holds metrics scraped from a kubelet. It is a named type over
// testutil.Metrics and is iterated in this file as metric name -> samples.
type KubeletMetrics testutil.Metrics
58
59
60 func (m *KubeletMetrics) Equal(o KubeletMetrics) bool {
61 return (*testutil.Metrics)(m).Equal(testutil.Metrics(o))
62 }
63
64
65 func NewKubeletMetrics() KubeletMetrics {
66 result := testutil.NewMetrics()
67 return KubeletMetrics(result)
68 }
69
70
71 func GrabKubeletMetricsWithoutProxy(ctx context.Context, nodeName, path string) (KubeletMetrics, error) {
72 req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("http://%s%s", nodeName, path), nil)
73 if err != nil {
74 return KubeletMetrics{}, err
75 }
76 resp, err := http.DefaultClient.Do(req)
77 if err != nil {
78 return KubeletMetrics{}, err
79 }
80 defer resp.Body.Close()
81 body, err := io.ReadAll(resp.Body)
82 if err != nil {
83 return KubeletMetrics{}, err
84 }
85 return parseKubeletMetrics(string(body))
86 }
87
88 func parseKubeletMetrics(data string) (KubeletMetrics, error) {
89 result := NewKubeletMetrics()
90 if err := testutil.ParseMetrics(data, (*testutil.Metrics)(&result)); err != nil {
91 return KubeletMetrics{}, err
92 }
93 return result, nil
94 }
95
96
97
// KubeletLatencyMetric is a single latency data point extracted from kubelet
// metrics.
type KubeletLatencyMetric struct {
	// Operation is the value of the sample's "operation_type" label; it is
	// empty when the sample has no such label.
	Operation string
	// Method is the name of the metric the sample belongs to.
	Method string
	// Quantile is the parsed quantile label of the sample, or zero when the
	// sample carries no quantile label.
	Quantile float64
	// Latency is the latency reported by the sample.
	Latency time.Duration
}

// KubeletLatencyMetrics is a list of latency data points. It implements
// sort.Interface with a descending order on Latency, so sort.Sort places the
// slowest entries first.
type KubeletLatencyMetrics []KubeletLatencyMetric

// Len returns the number of entries.
func (a KubeletLatencyMetrics) Len() int {
	return len(a)
}

// Swap exchanges the entries at positions i and j.
func (a KubeletLatencyMetrics) Swap(i, j int) {
	tmp := a[i]
	a[i] = a[j]
	a[j] = tmp
}

// Less reports whether entry i must sort before entry j, which is the case
// when entry i has the strictly larger latency.
func (a KubeletLatencyMetrics) Less(i, j int) bool {
	return a[j].Latency < a[i].Latency
}
115
116
117
118 func getKubeletMetricsFromNode(ctx context.Context, c clientset.Interface, nodeName string) (KubeletMetrics, error) {
119 if c == nil {
120 return GrabKubeletMetricsWithoutProxy(ctx, nodeName, "/metrics")
121 }
122 grabber, err := NewMetricsGrabber(ctx, c, nil, nil, true, false, false, false, false, false)
123 if err != nil {
124 return KubeletMetrics{}, err
125 }
126 return grabber.GrabFromKubelet(ctx, nodeName)
127 }
128
129
130
131 func GetKubeletMetrics(ctx context.Context, c clientset.Interface, nodeName string) (KubeletMetrics, error) {
132 ms, err := getKubeletMetricsFromNode(ctx, c, nodeName)
133 if err != nil {
134 return KubeletMetrics{}, err
135 }
136
137 kubeletMetrics := make(KubeletMetrics)
138 for name, samples := range ms {
139 const prefix = kubeletSubsystem + "_"
140 if !strings.HasPrefix(name, prefix) {
141
142 continue
143 }
144 method := strings.TrimPrefix(name, prefix)
145 kubeletMetrics[method] = samples
146 }
147 return kubeletMetrics, nil
148 }
149
150
151
152
153 func GetDefaultKubeletLatencyMetrics(ms KubeletMetrics) KubeletLatencyMetrics {
154 latencyMetricNames := sets.NewString(
155 podWorkerDurationKey,
156 podWorkerStartDurationKey,
157 podStartDurationKey,
158 podStartSLIDurationKey,
159 cgroupManagerOperationsKey,
160 dockerOperationsLatencyKey,
161 podWorkerStartDurationKey,
162 plegRelistDurationKey,
163 )
164 return GetKubeletLatencyMetrics(ms, latencyMetricNames)
165 }
166
167
168
169 func GetKubeletLatencyMetrics(ms KubeletMetrics, filterMetricNames sets.String) KubeletLatencyMetrics {
170 var latencyMetrics KubeletLatencyMetrics
171 for name, samples := range ms {
172 if !filterMetricNames.Has(name) {
173 continue
174 }
175 for _, sample := range samples {
176 latency := sample.Value
177 operation := string(sample.Metric["operation_type"])
178 var quantile float64
179 if val, ok := sample.Metric[testutil.QuantileLabel]; ok {
180 var err error
181 if quantile, err = strconv.ParseFloat(string(val), 64); err != nil {
182 continue
183 }
184 }
185
186 latencyMetrics = append(latencyMetrics, KubeletLatencyMetric{
187 Operation: operation,
188 Method: name,
189 Quantile: quantile,
190 Latency: time.Duration(int64(latency)) * time.Microsecond,
191 })
192 }
193 }
194 return latencyMetrics
195 }
196
197
198 func HighLatencyKubeletOperations(ctx context.Context, c clientset.Interface, threshold time.Duration, nodeName string, logFunc func(fmt string, args ...interface{})) (KubeletLatencyMetrics, error) {
199 ms, err := GetKubeletMetrics(ctx, c, nodeName)
200 if err != nil {
201 return KubeletLatencyMetrics{}, err
202 }
203 latencyMetrics := GetDefaultKubeletLatencyMetrics(ms)
204 sort.Sort(latencyMetrics)
205 var badMetrics KubeletLatencyMetrics
206 logFunc("\nLatency metrics for node %v", nodeName)
207 for _, m := range latencyMetrics {
208 if m.Latency > threshold {
209 badMetrics = append(badMetrics, m)
210 framework.Logf("%+v", m)
211 }
212 }
213 return badMetrics, nil
214 }
215
View as plain text