1
16
17 package metrics
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "net"
24 "regexp"
25 "sync"
26 "time"
27
28 v1 "k8s.io/api/core/v1"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/fields"
31 "k8s.io/apimachinery/pkg/util/wait"
32 clientset "k8s.io/client-go/kubernetes"
33 "k8s.io/client-go/rest"
34 "k8s.io/klog/v2"
35
36 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
37 )
38
// Ports on which the grabber reaches the metrics endpoints of individual
// cluster components.
const (
	// kubeControllerManagerPort is the port GrabFromControllerManager reads
	// kube-controller-manager metrics from (via getSecureMetricsFromPod).
	kubeControllerManagerPort = 10257

	// kubeSchedulerPort is the port GrabFromScheduler reads kube-scheduler
	// metrics from (via getSecureMetricsFromPod).
	kubeSchedulerPort = 10259

	// snapshotControllerPort is the default volume-snapshot-controller metrics
	// port: GrabFromSnapshotController falls back to it when called with
	// port 0, and Grab always passes it explicitly.
	snapshotControllerPort = 9102
)
47
48
49
50
51
var (
	// MetricsGrabbingDisabledError is the sentinel error that GrabFromScheduler,
	// GrabFromControllerManager, GrabFromSnapshotController and
	// GrabFromClusterAutoscaler wrap (with %w and a component prefix) when
	// grabbing from that component is not possible. Match it with errors.Is.
	MetricsGrabbingDisabledError = errors.New("metrics grabbing disabled")
)
53
54
// Collection holds the outcome of a single Grabber.Grab call, one field per
// component. A field is left at its zero value when its component was not
// enabled or when grabbing from it failed; KubeletMetrics is the exception
// described on the field itself.
type Collection struct {
	// APIServerMetrics is parsed from the API server's /metrics endpoint.
	APIServerMetrics APIServerMetrics
	// APIServerMetricsSLIs is parsed from the API server's /metrics/slis endpoint.
	APIServerMetricsSLIs APIServerMetrics
	// ControllerManagerMetrics comes from the kube-controller-manager pod.
	ControllerManagerMetrics ControllerManagerMetrics
	// SnapshotControllerMetrics comes from the volume-snapshot-controller pod.
	SnapshotControllerMetrics SnapshotControllerMetrics
	// KubeletMetrics is keyed by node name. Grab stores an entry for every
	// listed node, including nodes whose scrape failed; those entries hold
	// whatever value was returned together with the error.
	KubeletMetrics map[string]KubeletMetrics
	// SchedulerMetrics comes from the kube-scheduler pod.
	SchedulerMetrics SchedulerMetrics
	// ClusterAutoscalerMetrics comes from the cluster-autoscaler pod.
	ClusterAutoscalerMetrics ClusterAutoscalerMetrics
}
64
65
// Grabber collects metrics from cluster components: the API server, kubelets,
// kube-scheduler, kube-controller-manager, the cluster autoscaler and the
// volume snapshot controller. Create one with NewMetricsGrabber.
type Grabber struct {
	// client is used for all in-cluster API requests.
	client clientset.Interface
	// externalClient, when non-nil, is used instead of client to reach the
	// cluster autoscaler, which is then looked up in the "kubemark" namespace.
	externalClient clientset.Interface
	// config is the rest config from which getSecureMetricsFromPod builds its
	// pod dialer and TLS client (kube-scheduler, kube-controller-manager).
	config *rest.Config
	// The grabFrom* flags record which components are enabled; they are
	// computed once by NewMetricsGrabber.
	grabFromAPIServer          bool
	grabFromControllerManager  bool
	grabFromKubelets           bool
	grabFromScheduler          bool
	grabFromClusterAutoscaler  bool
	grabFromSnapshotController bool
	// kubeScheduler is the kube-system pod name discovered for kube-scheduler,
	// or "" if none was found.
	kubeScheduler string
	// waitForSchedulerReadyOnce makes GrabFromScheduler wait for pod
	// readiness only on its first call.
	waitForSchedulerReadyOnce sync.Once
	// kubeControllerManager is the kube-system pod name discovered for
	// kube-controller-manager, or "" if none was found.
	kubeControllerManager string
	// waitForControllerManagerReadyOnce makes GrabFromControllerManager wait
	// for pod readiness only on its first call.
	waitForControllerManagerReadyOnce sync.Once
	// snapshotController is the kube-system pod name discovered for the
	// volume snapshot controller, or "" if none was found.
	snapshotController string
	// waitForSnapshotControllerReadyOnce makes GrabFromSnapshotController
	// wait for pod readiness only on its first call.
	waitForSnapshotControllerReadyOnce sync.Once
}
83
84
85
86
87
88
89
90
91
92 func NewMetricsGrabber(ctx context.Context, c clientset.Interface, ec clientset.Interface, config *rest.Config, kubelets bool, scheduler bool, controllers bool, apiServer bool, clusterAutoscaler bool, snapshotController bool) (*Grabber, error) {
93
94 kubeScheduler := ""
95 kubeControllerManager := ""
96 snapshotControllerManager := ""
97
98 regKubeScheduler := regexp.MustCompile("kube-scheduler-.*")
99 regKubeControllerManager := regexp.MustCompile("kube-controller-manager-.*")
100 regSnapshotController := regexp.MustCompile("volume-snapshot-controller.*")
101
102 if (scheduler || controllers) && config == nil {
103 return nil, errors.New("a rest config is required for grabbing kube-controller and kube-controller-manager metrics")
104 }
105
106 podList, err := c.CoreV1().Pods(metav1.NamespaceSystem).List(ctx, metav1.ListOptions{})
107 if err != nil {
108 return nil, err
109 }
110 if len(podList.Items) < 1 {
111 klog.Warningf("Can't find any pods in namespace %s to grab metrics from", metav1.NamespaceSystem)
112 }
113 for _, pod := range podList.Items {
114 if regKubeScheduler.MatchString(pod.Name) {
115 kubeScheduler = pod.Name
116 }
117 if regKubeControllerManager.MatchString(pod.Name) {
118 kubeControllerManager = pod.Name
119 }
120 if regSnapshotController.MatchString(pod.Name) {
121 snapshotControllerManager = pod.Name
122 }
123 if kubeScheduler != "" && kubeControllerManager != "" && snapshotControllerManager != "" {
124 break
125 }
126 }
127 if clusterAutoscaler && ec == nil {
128 klog.Warningf("Did not receive an external client interface. Grabbing metrics from ClusterAutoscaler is disabled.")
129 }
130
131 return &Grabber{
132 client: c,
133 externalClient: ec,
134 config: config,
135 grabFromAPIServer: apiServer,
136 grabFromControllerManager: checkPodDebugHandlers(ctx, c, controllers, "kube-controller-manager", kubeControllerManager),
137 grabFromKubelets: kubelets,
138 grabFromScheduler: checkPodDebugHandlers(ctx, c, scheduler, "kube-scheduler", kubeScheduler),
139 grabFromClusterAutoscaler: clusterAutoscaler,
140 grabFromSnapshotController: checkPodDebugHandlers(ctx, c, snapshotController, "snapshot-controller", snapshotControllerManager),
141 kubeScheduler: kubeScheduler,
142 kubeControllerManager: kubeControllerManager,
143 snapshotController: snapshotControllerManager,
144 }, nil
145 }
146
147 func checkPodDebugHandlers(ctx context.Context, c clientset.Interface, requested bool, component, podName string) bool {
148 if !requested {
149 return false
150 }
151 if podName == "" {
152 klog.Warningf("Can't find %s pod. Grabbing metrics from %s is disabled.", component, component)
153 return false
154 }
155
156
157
158 limit := int64(1)
159 if _, err := c.CoreV1().Pods(metav1.NamespaceSystem).GetLogs(podName, &v1.PodLogOptions{LimitBytes: &limit}).DoRaw(ctx); err != nil {
160 klog.Warningf("Can't retrieve log output of %s (%q). Debug handlers might be disabled in kubelet. Grabbing metrics from %s is disabled.",
161 podName, err, component)
162 return false
163 }
164
165
166 return true
167 }
168
169
170 func (g *Grabber) HasControlPlanePods() bool {
171 return g.kubeScheduler != "" && g.kubeControllerManager != ""
172 }
173
174
175 func (g *Grabber) GrabFromKubelet(ctx context.Context, nodeName string) (KubeletMetrics, error) {
176 nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{"metadata.name": nodeName}.AsSelector().String()})
177 if err != nil {
178 return KubeletMetrics{}, err
179 }
180 if len(nodes.Items) != 1 {
181 return KubeletMetrics{}, fmt.Errorf("Error listing nodes with name %v, got %v", nodeName, nodes.Items)
182 }
183 kubeletPort := nodes.Items[0].Status.DaemonEndpoints.KubeletEndpoint.Port
184 return g.grabFromKubeletInternal(ctx, nodeName, int(kubeletPort), "metrics")
185 }
186
187
188 func (g *Grabber) GrabResourceMetricsFromKubelet(ctx context.Context, nodeName string) (KubeletMetrics, error) {
189 nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{"metadata.name": nodeName}.AsSelector().String()})
190 if err != nil {
191 return KubeletMetrics{}, err
192 }
193 if len(nodes.Items) != 1 {
194 return KubeletMetrics{}, fmt.Errorf("Error listing nodes with name %v, got %v", nodeName, nodes.Items)
195 }
196 kubeletPort := nodes.Items[0].Status.DaemonEndpoints.KubeletEndpoint.Port
197 return g.grabFromKubeletInternal(ctx, nodeName, int(kubeletPort), "metrics/resource")
198 }
199
200 func (g *Grabber) grabFromKubeletInternal(ctx context.Context, nodeName string, kubeletPort int, pathSuffix string) (KubeletMetrics, error) {
201 if kubeletPort <= 0 || kubeletPort > 65535 {
202 return KubeletMetrics{}, fmt.Errorf("Invalid Kubelet port %v. Skipping Kubelet's metrics gathering", kubeletPort)
203 }
204 output, err := g.getMetricsFromNode(ctx, nodeName, int(kubeletPort), pathSuffix)
205 if err != nil {
206 return KubeletMetrics{}, err
207 }
208 return parseKubeletMetrics(output)
209 }
210
211 func (g *Grabber) getMetricsFromNode(ctx context.Context, nodeName string, kubeletPort int, pathSuffix string) (string, error) {
212
213 finished := make(chan struct{}, 1)
214 var err error
215 var rawOutput []byte
216 go func() {
217 rawOutput, err = g.client.CoreV1().RESTClient().Get().
218 Resource("nodes").
219 SubResource("proxy").
220 Name(fmt.Sprintf("%v:%v", nodeName, kubeletPort)).
221 Suffix(pathSuffix).
222 Do(ctx).Raw()
223 finished <- struct{}{}
224 }()
225 select {
226 case <-time.After(proxyTimeout):
227 return "", fmt.Errorf("Timed out when waiting for proxy to gather metrics from %v", nodeName)
228 case <-finished:
229 if err != nil {
230 return "", err
231 }
232 return string(rawOutput), nil
233 }
234 }
235
236
237 func (g *Grabber) GrabFromScheduler(ctx context.Context) (SchedulerMetrics, error) {
238 if !g.grabFromScheduler {
239 return SchedulerMetrics{}, fmt.Errorf("kube-scheduler: %w", MetricsGrabbingDisabledError)
240 }
241
242 var err error
243
244 g.waitForSchedulerReadyOnce.Do(func() {
245 if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, g.kubeScheduler, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
246 err = fmt.Errorf("error waiting for kube-scheduler pod to be ready: %w", readyErr)
247 }
248 })
249 if err != nil {
250 return SchedulerMetrics{}, err
251 }
252
253 var lastMetricsFetchErr error
254 var output string
255 if metricsWaitErr := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
256 output, lastMetricsFetchErr = g.getSecureMetricsFromPod(ctx, g.kubeScheduler, metav1.NamespaceSystem, kubeSchedulerPort)
257 return lastMetricsFetchErr == nil, nil
258 }); metricsWaitErr != nil {
259 err := fmt.Errorf("error waiting for kube-scheduler pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
260 return SchedulerMetrics{}, err
261 }
262
263 return parseSchedulerMetrics(output)
264 }
265
266
267 func (g *Grabber) GrabFromClusterAutoscaler(ctx context.Context) (ClusterAutoscalerMetrics, error) {
268 if !g.HasControlPlanePods() && g.externalClient == nil {
269 return ClusterAutoscalerMetrics{}, fmt.Errorf("ClusterAutoscaler: %w", MetricsGrabbingDisabledError)
270 }
271 var client clientset.Interface
272 var namespace string
273 if g.externalClient != nil {
274 client = g.externalClient
275 namespace = "kubemark"
276 } else {
277 client = g.client
278 namespace = metav1.NamespaceSystem
279 }
280 output, err := g.getMetricsFromPod(ctx, client, "cluster-autoscaler", namespace, 8085)
281 if err != nil {
282 return ClusterAutoscalerMetrics{}, err
283 }
284 return parseClusterAutoscalerMetrics(output)
285 }
286
287
288 func (g *Grabber) GrabFromControllerManager(ctx context.Context) (ControllerManagerMetrics, error) {
289 if !g.grabFromControllerManager {
290 return ControllerManagerMetrics{}, fmt.Errorf("kube-controller-manager: %w", MetricsGrabbingDisabledError)
291 }
292
293 var err error
294
295 g.waitForControllerManagerReadyOnce.Do(func() {
296 if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, g.kubeControllerManager, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
297 err = fmt.Errorf("error waiting for kube-controller-manager pod to be ready: %w", readyErr)
298 }
299 })
300 if err != nil {
301 return ControllerManagerMetrics{}, err
302 }
303
304 var output string
305 var lastMetricsFetchErr error
306 if metricsWaitErr := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
307 output, lastMetricsFetchErr = g.getSecureMetricsFromPod(ctx, g.kubeControllerManager, metav1.NamespaceSystem, kubeControllerManagerPort)
308 return lastMetricsFetchErr == nil, nil
309 }); metricsWaitErr != nil {
310 err := fmt.Errorf("error waiting for kube-controller-manager to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
311 return ControllerManagerMetrics{}, err
312 }
313
314 return parseControllerManagerMetrics(output)
315 }
316
317
318 func (g *Grabber) GrabFromSnapshotController(ctx context.Context, podName string, port int) (SnapshotControllerMetrics, error) {
319 if !g.grabFromSnapshotController {
320 return SnapshotControllerMetrics{}, fmt.Errorf("volume-snapshot-controller: %w", MetricsGrabbingDisabledError)
321 }
322
323
324
325 if podName == "" {
326 podName = g.snapshotController
327 }
328 if port == 0 {
329 port = snapshotControllerPort
330 }
331
332 var err error
333
334 g.waitForSnapshotControllerReadyOnce.Do(func() {
335 if readyErr := e2epod.WaitTimeoutForPodReadyInNamespace(ctx, g.client, podName, metav1.NamespaceSystem, 5*time.Minute); readyErr != nil {
336 err = fmt.Errorf("error waiting for volume-snapshot-controller pod to be ready: %w", readyErr)
337 }
338 })
339 if err != nil {
340 return SnapshotControllerMetrics{}, err
341 }
342
343 var output string
344 var lastMetricsFetchErr error
345 if metricsWaitErr := wait.PollUntilContextTimeout(ctx, time.Second, time.Minute, true, func(ctx context.Context) (bool, error) {
346 output, lastMetricsFetchErr = g.getMetricsFromPod(ctx, g.client, podName, metav1.NamespaceSystem, port)
347 return lastMetricsFetchErr == nil, nil
348 }); metricsWaitErr != nil {
349 err = fmt.Errorf("error waiting for volume-snapshot-controller pod to expose metrics: %v; %v", metricsWaitErr, lastMetricsFetchErr)
350 return SnapshotControllerMetrics{}, err
351 }
352
353 return parseSnapshotControllerMetrics(output)
354 }
355
356
357 func (g *Grabber) GrabFromAPIServer(ctx context.Context) (APIServerMetrics, error) {
358 output, err := g.getMetricsFromAPIServer(ctx)
359 if err != nil {
360 return APIServerMetrics{}, err
361 }
362 return parseAPIServerMetrics(output)
363 }
364
365
366 func (g *Grabber) GrabMetricsSLIsFromAPIServer(ctx context.Context) (APIServerMetrics, error) {
367 output, err := g.getMetricsSLIsFromAPIServer(ctx)
368 if err != nil {
369 return APIServerMetrics{}, err
370 }
371 return parseAPIServerMetrics(output)
372 }
373
374 func (g *Grabber) getMetricsFromAPIServer(ctx context.Context) (string, error) {
375 rawOutput, err := g.client.CoreV1().RESTClient().Get().RequestURI("/metrics").Do(ctx).Raw()
376 if err != nil {
377 return "", err
378 }
379 return string(rawOutput), nil
380 }
381
382 func (g *Grabber) getMetricsSLIsFromAPIServer(ctx context.Context) (string, error) {
383 rawOutput, err := g.client.CoreV1().RESTClient().Get().RequestURI("/metrics/slis").Do(ctx).Raw()
384 if err != nil {
385 return "", err
386 }
387 return string(rawOutput), nil
388 }
389
390
391 func (g *Grabber) Grab(ctx context.Context) (Collection, error) {
392 result := Collection{}
393 var errs []error
394 if g.grabFromAPIServer {
395 metrics, err := g.GrabFromAPIServer(ctx)
396 if err != nil {
397 errs = append(errs, err)
398 } else {
399 result.APIServerMetrics = metrics
400 }
401 metrics, err = g.GrabMetricsSLIsFromAPIServer(ctx)
402 if err != nil {
403 errs = append(errs, err)
404 } else {
405 result.APIServerMetricsSLIs = metrics
406 }
407 }
408 if g.grabFromScheduler {
409 metrics, err := g.GrabFromScheduler(ctx)
410 if err != nil {
411 errs = append(errs, err)
412 } else {
413 result.SchedulerMetrics = metrics
414 }
415 }
416 if g.grabFromControllerManager {
417 metrics, err := g.GrabFromControllerManager(ctx)
418 if err != nil {
419 errs = append(errs, err)
420 } else {
421 result.ControllerManagerMetrics = metrics
422 }
423 }
424 if g.grabFromSnapshotController {
425 metrics, err := g.GrabFromSnapshotController(ctx, g.snapshotController, snapshotControllerPort)
426 if err != nil {
427 errs = append(errs, err)
428 } else {
429 result.SnapshotControllerMetrics = metrics
430 }
431 }
432 if g.grabFromClusterAutoscaler {
433 metrics, err := g.GrabFromClusterAutoscaler(ctx)
434 if err != nil {
435 errs = append(errs, err)
436 } else {
437 result.ClusterAutoscalerMetrics = metrics
438 }
439 }
440 if g.grabFromKubelets {
441 result.KubeletMetrics = make(map[string]KubeletMetrics)
442 nodes, err := g.client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
443 if err != nil {
444 errs = append(errs, err)
445 } else {
446 for _, node := range nodes.Items {
447 kubeletPort := node.Status.DaemonEndpoints.KubeletEndpoint.Port
448 metrics, err := g.grabFromKubeletInternal(ctx, node.Name, int(kubeletPort), "metrics")
449 if err != nil {
450 errs = append(errs, err)
451 }
452 result.KubeletMetrics[node.Name] = metrics
453 }
454 }
455 }
456 if len(errs) > 0 {
457 return result, fmt.Errorf("Errors while grabbing metrics: %v", errs)
458 }
459 return result, nil
460 }
461
462
463 func (g *Grabber) getMetricsFromPod(ctx context.Context, client clientset.Interface, podName string, namespace string, port int) (string, error) {
464 rawOutput, err := client.CoreV1().RESTClient().Get().
465 Namespace(namespace).
466 Resource("pods").
467 SubResource("proxy").
468 Name(fmt.Sprintf("%s:%d", podName, port)).
469 Suffix("metrics").
470 Do(ctx).Raw()
471 if err != nil {
472 return "", err
473 }
474 return string(rawOutput), nil
475 }
476
477
478
479
480
481
// getSecureMetricsFromPod fetches "metrics" from port on pod podName in
// namespace over a TLS connection. It bypasses the API server proxy. Instead,
// it dials the container port through e2epod.NewDialer, which is built from
// g.client and g.config.
//
// The client config is a copy of g.config. Credentials are left untouched;
// only the host, TLS server name and CA settings are overridden. Certificate
// verification is turned off, so this must only be used against test
// clusters.
func (g *Grabber) getSecureMetricsFromPod(ctx context.Context, podName string, namespace string, port int) (string, error) {
	dialer := e2epod.NewDialer(g.client, g.config)
	metricConfig := rest.CopyConfig(g.config)
	addr := e2epod.Addr{
		Namespace: namespace,
		PodName:   podName,
		Port:      port,
	}
	// Every connection opened by the client goes to the pod's container port,
	// whatever network/address client-go asks for.
	metricConfig.Dial = func(ctx context.Context, network, address string) (net.Conn, error) {
		return dialer.DialContainerPort(ctx, addr)
	}

	// NOTE(review): Host looks like a placeholder. The real connection is
	// always made by Dial above, so presumably only the URL built from it
	// matters. Confirm against e2epod.Addr.String.
	metricConfig.Host = addr.String()
	// TLS server name sent to the component. Presumably this matches the
	// component's serving certificate; it is not verified anyway because of
	// Insecure below.
	metricConfig.ServerName = "localhost"

	// Skip server certificate verification and drop the API server CA.
	// NOTE(review): the CA is presumably cleared because client-go rejects
	// Insecure combined with a root CA. Confirm.
	metricConfig.Insecure = true
	metricConfig.CAFile = ""
	metricConfig.CAData = nil

	// A full clientset is built only to get at its RESTClient for a raw GET.
	metricClient, err := clientset.NewForConfig(metricConfig)
	if err != nil {
		return "", err
	}

	rawOutput, err := metricClient.RESTClient().Get().
		AbsPath("metrics").
		Do(ctx).Raw()
	if err != nil {
		return "", err
	}
	return string(rawOutput), nil
}
523
View as plain text