...

Source file src/github.com/linkerd/linkerd2/cli/cmd/metrics_diagnostics_util.go

Documentation: github.com/linkerd/linkerd2/cli/cmd

     1  package cmd
     2  
     3  import (
     4  	"bytes"
     5  	"crypto/sha256"
     6  	"fmt"
     7  	"sort"
     8  	"sync/atomic"
     9  	"time"
    10  
    11  	"github.com/linkerd/linkerd2/pkg/k8s"
    12  	"github.com/prometheus/common/expfmt"
    13  	corev1 "k8s.io/api/core/v1"
    14  )
    15  
    16  // shared between metrics and diagnostics command
    17  type metricsResult struct {
    18  	pod       string
    19  	container string
    20  	metrics   []byte
    21  	err       error
    22  }
    23  type byResult []metricsResult
    24  
    25  func (s byResult) Len() int {
    26  	return len(s)
    27  }
    28  func (s byResult) Swap(i, j int) {
    29  	s[i], s[j] = s[j], s[i]
    30  }
    31  func (s byResult) Less(i, j int) bool {
    32  	return s[i].pod < s[j].pod || ((s[i].pod == s[j].pod) && s[i].container < s[j].container)
    33  }
    34  
    35  // getAllContainersWithPort returns all the containers within
    36  // a pod which exposes metrics at a port with name portName
    37  func getAllContainersWithPort(
    38  	pod corev1.Pod,
    39  	portName string,
    40  ) ([]corev1.Container, error) {
    41  	if pod.Status.Phase != corev1.PodRunning {
    42  		return nil, fmt.Errorf("pod not running: %s", pod.GetName())
    43  	}
    44  	var containers []corev1.Container
    45  
    46  	allContainers := append(pod.Spec.InitContainers, pod.Spec.Containers...)
    47  	for _, c := range allContainers {
    48  		for _, p := range c.Ports {
    49  			if p.Name == portName {
    50  				containers = append(containers, c)
    51  			}
    52  		}
    53  	}
    54  	return containers, nil
    55  }
    56  
    57  // getMetrics returns the metrics exposed by all the containers of the passed in list of pods
    58  // which exposes their metrics at portName
    59  func getMetrics(
    60  	k8sAPI *k8s.KubernetesAPI,
    61  	pods []corev1.Pod,
    62  	portName string,
    63  	waitingTime time.Duration,
    64  	emitLogs bool,
    65  ) []metricsResult {
    66  	var results []metricsResult
    67  
    68  	resultChan := make(chan metricsResult)
    69  	var activeRoutines int32
    70  	for _, pod := range pods {
    71  		atomic.AddInt32(&activeRoutines, 1)
    72  		go func(p corev1.Pod) {
    73  			defer atomic.AddInt32(&activeRoutines, -1)
    74  			containers, err := getAllContainersWithPort(p, portName)
    75  			if err != nil {
    76  				resultChan <- metricsResult{
    77  					pod: p.GetName(),
    78  					err: err,
    79  				}
    80  				return
    81  			}
    82  
    83  			for _, c := range containers {
    84  				bytes, err := k8s.GetContainerMetrics(k8sAPI, p, c, emitLogs, portName)
    85  
    86  				resultChan <- metricsResult{
    87  					pod:       p.GetName(),
    88  					container: c.Name,
    89  					metrics:   bytes,
    90  					err:       err,
    91  				}
    92  			}
    93  		}(pod)
    94  	}
    95  
    96  	timeout := time.NewTimer(waitingTime)
    97  	defer timeout.Stop()
    98  wait:
    99  	for {
   100  		select {
   101  		case result := <-resultChan:
   102  			results = append(results, result)
   103  		case <-timeout.C:
   104  			break wait // timed out
   105  		}
   106  		if atomic.LoadInt32(&activeRoutines) == 0 {
   107  			break
   108  		}
   109  	}
   110  
   111  	sort.Sort(byResult(results))
   112  
   113  	return results
   114  }
   115  
   116  var obfuscationMap = map[string]struct{}{
   117  	"authority":     {},
   118  	"client_id":     {},
   119  	"server_id":     {},
   120  	"target_addr":   {},
   121  	"dst_service":   {},
   122  	"dst_namespace": {},
   123  }
   124  
   125  func obfuscateMetrics(metrics []byte) ([]byte, error) {
   126  	reader := bytes.NewReader(metrics)
   127  
   128  	var metricsParser expfmt.TextParser
   129  
   130  	parsedMetrics, err := metricsParser.TextToMetricFamilies(reader)
   131  	if err != nil {
   132  		return nil, err
   133  	}
   134  
   135  	var writer bytes.Buffer
   136  	for _, v := range parsedMetrics {
   137  		for _, m := range v.Metric {
   138  			for _, l := range m.Label {
   139  				if _, ok := obfuscationMap[l.GetName()]; ok {
   140  					obfuscatedValue := obfuscate(l.GetValue())
   141  					l.Value = &obfuscatedValue
   142  				}
   143  			}
   144  		}
   145  		// We'll assume MetricFamilyToText errors are insignificant
   146  		//nolint:errcheck
   147  		expfmt.MetricFamilyToText(&writer, v)
   148  	}
   149  
   150  	return writer.Bytes(), nil
   151  }
   152  
   153  func obfuscate(s string) string {
   154  	hash := sha256.Sum256([]byte(s))
   155  	return fmt.Sprintf("%x", hash[:4])
   156  }
   157  

View as plain text