...

Source file src/github.com/linkerd/linkerd2/multicluster/service-mirror/probe_worker.go

Documentation: github.com/linkerd/linkerd2/multicluster/service-mirror

     1  package servicemirror
     2  
     3  import (
     4  	"fmt"
     5  	"net"
     6  	"net/http"
     7  	"strconv"
     8  	"sync"
     9  	"time"
    10  
    11  	"github.com/linkerd/linkerd2/pkg/multicluster"
    12  	"github.com/prometheus/client_golang/prometheus"
    13  	logging "github.com/sirupsen/logrus"
    14  )
    15  
    16  const httpGatewayTimeoutMillis = 50000
    17  
    18  // ProbeWorker is responsible for monitoring gateways using a probe specification
    19  type ProbeWorker struct {
    20  	localGatewayName string
    21  	alive            bool
    22  	Liveness         chan bool
    23  	*sync.RWMutex
    24  	probeSpec *multicluster.ProbeSpec
    25  	stopCh    chan struct{}
    26  	metrics   *ProbeMetrics
    27  	log       *logging.Entry
    28  }
    29  
    30  // NewProbeWorker creates a new probe worker associated with a particular gateway
    31  func NewProbeWorker(localGatewayName string, spec *multicluster.ProbeSpec, metrics *ProbeMetrics, probekey string) *ProbeWorker {
    32  	metrics.gatewayEnabled.Set(1)
    33  	return &ProbeWorker{
    34  		localGatewayName: localGatewayName,
    35  		Liveness:         make(chan bool, 10),
    36  		RWMutex:          &sync.RWMutex{},
    37  		probeSpec:        spec,
    38  		stopCh:           make(chan struct{}),
    39  		metrics:          metrics,
    40  		log: logging.WithFields(logging.Fields{
    41  			"probe-key": probekey,
    42  		}),
    43  	}
    44  }
    45  
    46  // UpdateProbeSpec is used to update the probe specification when something about the gateway changes
    47  func (pw *ProbeWorker) UpdateProbeSpec(spec *multicluster.ProbeSpec) {
    48  	pw.Lock()
    49  	pw.probeSpec = spec
    50  	pw.Unlock()
    51  }
    52  
    53  // Stop this probe worker
    54  func (pw *ProbeWorker) Stop() {
    55  	pw.metrics.unregister()
    56  	pw.log.Infof("Stopping probe worker")
    57  	close(pw.stopCh)
    58  }
    59  
    60  // Start this probe worker
    61  func (pw *ProbeWorker) Start() {
    62  
    63  	pw.log.Infof("Starting probe worker")
    64  	go pw.run()
    65  }
    66  
    67  func (pw *ProbeWorker) run() {
    68  	probeTickerPeriod := pw.probeSpec.Period
    69  	maxJitter := pw.probeSpec.Period / 10 // max jitter is 10% of period
    70  	probeTicker := NewTicker(probeTickerPeriod, maxJitter)
    71  	defer probeTicker.Stop()
    72  
    73  probeLoop:
    74  	for {
    75  		select {
    76  		case <-pw.stopCh:
    77  			break probeLoop
    78  		case <-probeTicker.C:
    79  			pw.doProbe()
    80  		}
    81  	}
    82  }
    83  
    84  func (pw *ProbeWorker) doProbe() {
    85  	pw.RLock()
    86  	defer pw.RUnlock()
    87  
    88  	successLabel := prometheus.Labels{probeSuccessfulLabel: "true"}
    89  	notSuccessLabel := prometheus.Labels{probeSuccessfulLabel: "false"}
    90  
    91  	client := http.Client{
    92  		Timeout: httpGatewayTimeoutMillis * time.Millisecond,
    93  	}
    94  
    95  	strPort := strconv.Itoa(int(pw.probeSpec.Port))
    96  	urlAddress := net.JoinHostPort(pw.localGatewayName, strPort)
    97  	req, err := http.NewRequest("GET", fmt.Sprintf("http://%s%s", urlAddress, pw.probeSpec.Path), nil)
    98  	if err != nil {
    99  		pw.log.Errorf("Could not create a GET request to gateway: %s", err)
   100  		return
   101  	}
   102  
   103  	start := time.Now()
   104  	resp, err := client.Do(req)
   105  	end := time.Since(start)
   106  	if err != nil {
   107  		pw.log.Warnf("Problem connecting with gateway. Marking as unhealthy %s", err)
   108  		pw.metrics.alive.Set(0)
   109  		pw.metrics.probes.With(notSuccessLabel).Inc()
   110  		if pw.alive {
   111  			pw.alive = false
   112  			pw.Liveness <- false
   113  		}
   114  		return
   115  	}
   116  	if resp.StatusCode != 200 {
   117  		pw.log.Warnf("Gateway returned unexpected status %d. Marking as unhealthy", resp.StatusCode)
   118  		pw.metrics.alive.Set(0)
   119  		pw.metrics.probes.With(notSuccessLabel).Inc()
   120  		if pw.alive {
   121  			pw.alive = false
   122  			pw.Liveness <- false
   123  		}
   124  	} else {
   125  		pw.log.Debug("Gateway is healthy")
   126  		pw.metrics.alive.Set(1)
   127  		pw.metrics.latency.Set(float64(end.Milliseconds()))
   128  		pw.metrics.latencies.Observe(float64(end.Milliseconds()))
   129  		pw.metrics.probes.With(successLabel).Inc()
   130  		if !pw.alive {
   131  			pw.alive = true
   132  			pw.Liveness <- true
   133  		}
   134  	}
   135  
   136  	if err := resp.Body.Close(); err != nil {
   137  		pw.log.Warnf("Failed to close response body %s", err)
   138  	}
   139  
   140  }
   141  

View as plain text