...

Source file src/github.com/go-kit/kit/metrics/statsd/statsd.go

Documentation: github.com/go-kit/kit/metrics/statsd

     1  // Package statsd provides a StatsD backend for package metrics. StatsD has no
     2  // concept of arbitrary key-value tagging, so label values are not supported,
     3  // and With is a no-op on all metrics.
     4  //
     5  // This package batches observations and emits them on some schedule to the
     6  // remote server. This is useful even if you connect to your StatsD server over
     7  // UDP. Emitting one network packet per observation can quickly overwhelm even
     8  // the fastest internal network.
     9  package statsd
    10  
    11  import (
    12  	"context"
    13  	"fmt"
    14  	"io"
    15  	"time"
    16  
    17  	"github.com/go-kit/kit/metrics"
    18  	"github.com/go-kit/kit/metrics/internal/lv"
    19  	"github.com/go-kit/kit/metrics/internal/ratemap"
    20  	"github.com/go-kit/kit/util/conn"
    21  	"github.com/go-kit/log"
    22  )
    23  
    24  // Statsd receives metrics observations and forwards them to a StatsD server.
    25  // Create a Statsd object, use it to create metrics, and pass those metrics as
    26  // dependencies to the components that will use them.
    27  //
    28  // All metrics are buffered until WriteTo is called. Counters and gauges are
    29  // aggregated into a single observation per timeseries per write. Timings are
    30  // buffered but not aggregated.
    31  //
    32  // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
    33  // To send to a StatsD server, use the SendLoop helper method.
    34  type Statsd struct {
    35  	prefix string
    36  	rates  *ratemap.RateMap
    37  
    38  	// The observations are collected in an N-dimensional vector space, even
    39  	// though they only take advantage of a single dimension (name). This is an
    40  	// implementation detail born purely from convenience. It would be more
    41  	// accurate to collect them in a map[string][]float64, but we already have
    42  	// this nice data structure and helper methods.
    43  	counters *lv.Space
    44  	gauges   *lv.Space
    45  	timings  *lv.Space
    46  
    47  	logger log.Logger
    48  }
    49  
    50  // New returns a Statsd object that may be used to create metrics. Prefix is
    51  // applied to all created metrics. Callers must ensure that regular calls to
    52  // WriteTo are performed, either manually or with one of the helper methods.
    53  func New(prefix string, logger log.Logger) *Statsd {
    54  	return &Statsd{
    55  		prefix:   prefix,
    56  		rates:    ratemap.New(),
    57  		counters: lv.NewSpace(),
    58  		gauges:   lv.NewSpace(),
    59  		timings:  lv.NewSpace(),
    60  		logger:   logger,
    61  	}
    62  }
    63  
    64  // NewCounter returns a counter, sending observations to this Statsd object.
    65  func (s *Statsd) NewCounter(name string, sampleRate float64) *Counter {
    66  	s.rates.Set(s.prefix+name, sampleRate)
    67  	return &Counter{
    68  		name: s.prefix + name,
    69  		obs:  s.counters.Observe,
    70  	}
    71  }
    72  
    73  // NewGauge returns a gauge, sending observations to this Statsd object.
    74  func (s *Statsd) NewGauge(name string) *Gauge {
    75  	return &Gauge{
    76  		name: s.prefix + name,
    77  		obs:  s.gauges.Observe,
    78  		add:  s.gauges.Add,
    79  	}
    80  }
    81  
    82  // NewTiming returns a histogram whose observations are interpreted as
    83  // millisecond durations, and are forwarded to this Statsd object.
    84  func (s *Statsd) NewTiming(name string, sampleRate float64) *Timing {
    85  	s.rates.Set(s.prefix+name, sampleRate)
    86  	return &Timing{
    87  		name: s.prefix + name,
    88  		obs:  s.timings.Observe,
    89  	}
    90  }
    91  
    92  // WriteLoop is a helper method that invokes WriteTo to the passed writer every
    93  // time the passed channel fires. This method blocks until ctx is canceled,
    94  // so clients probably want to run it in its own goroutine. For typical
    95  // usage, create a time.Ticker and pass its C channel to this method.
    96  func (s *Statsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
    97  	for {
    98  		select {
    99  		case <-c:
   100  			if _, err := s.WriteTo(w); err != nil {
   101  				s.logger.Log("during", "WriteTo", "err", err)
   102  			}
   103  		case <-ctx.Done():
   104  			return
   105  		}
   106  	}
   107  }
   108  
   109  // SendLoop is a helper method that wraps WriteLoop, passing a managed
   110  // connection to the network and address. Like WriteLoop, this method blocks
   111  // until ctx is canceled, so clients probably want to start it in its own
   112  // goroutine. For typical usage, create a time.Ticker and pass its C channel to
   113  // this method.
   114  func (s *Statsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
   115  	s.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, s.logger))
   116  }
   117  
   118  // WriteTo flushes the buffered content of the metrics to the writer, in
   119  // StatsD format. WriteTo abides best-effort semantics, so observations are
   120  // lost if there is a problem with the write. Clients should be sure to call
   121  // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
   122  func (s *Statsd) WriteTo(w io.Writer) (count int64, err error) {
   123  	var n int
   124  
   125  	s.counters.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
   126  		n, err = fmt.Fprintf(w, "%s:%f|c%s\n", name, sum(values), sampling(s.rates.Get(name)))
   127  		if err != nil {
   128  			return false
   129  		}
   130  		count += int64(n)
   131  		return true
   132  	})
   133  	if err != nil {
   134  		return count, err
   135  	}
   136  
   137  	s.gauges.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
   138  		n, err = fmt.Fprintf(w, "%s:%f|g\n", name, last(values))
   139  		if err != nil {
   140  			return false
   141  		}
   142  		count += int64(n)
   143  		return true
   144  	})
   145  	if err != nil {
   146  		return count, err
   147  	}
   148  
   149  	s.timings.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
   150  		sampleRate := s.rates.Get(name)
   151  		for _, value := range values {
   152  			n, err = fmt.Fprintf(w, "%s:%f|ms%s\n", name, value, sampling(sampleRate))
   153  			if err != nil {
   154  				return false
   155  			}
   156  			count += int64(n)
   157  		}
   158  		return true
   159  	})
   160  	if err != nil {
   161  		return count, err
   162  	}
   163  
   164  	return count, err
   165  }
   166  
   167  func sum(a []float64) float64 {
   168  	var v float64
   169  	for _, f := range a {
   170  		v += f
   171  	}
   172  	return v
   173  }
   174  
   175  func last(a []float64) float64 {
   176  	return a[len(a)-1]
   177  }
   178  
   179  func sampling(r float64) string {
   180  	var sv string
   181  	if r < 1.0 {
   182  		sv = fmt.Sprintf("|@%f", r)
   183  	}
   184  	return sv
   185  }
   186  
   187  type observeFunc func(name string, lvs lv.LabelValues, value float64)
   188  
   189  // Counter is a StatsD counter. Observations are forwarded to a Statsd object,
   190  // and aggregated (summed) per timeseries.
   191  type Counter struct {
   192  	name string
   193  	obs  observeFunc
   194  }
   195  
   196  // With is a no-op.
   197  func (c *Counter) With(...string) metrics.Counter {
   198  	return c
   199  }
   200  
   201  // Add implements metrics.Counter.
   202  func (c *Counter) Add(delta float64) {
   203  	c.obs(c.name, lv.LabelValues{}, delta)
   204  }
   205  
   206  // Gauge is a StatsD gauge. Observations are forwarded to a Statsd object, and
   207  // aggregated (the last observation selected) per timeseries.
   208  type Gauge struct {
   209  	name string
   210  	obs  observeFunc
   211  	add  observeFunc
   212  }
   213  
   214  // With is a no-op.
   215  func (g *Gauge) With(...string) metrics.Gauge {
   216  	return g
   217  }
   218  
   219  // Set implements metrics.Gauge.
   220  func (g *Gauge) Set(value float64) {
   221  	g.obs(g.name, lv.LabelValues{}, value)
   222  }
   223  
   224  // Add implements metrics.Gauge.
   225  func (g *Gauge) Add(delta float64) {
   226  	g.add(g.name, lv.LabelValues{}, delta)
   227  }
   228  
   229  // Timing is a StatsD timing, or metrics.Histogram. Observations are
   230  // forwarded to a Statsd object, and collected (but not aggregated) per
   231  // timeseries.
   232  type Timing struct {
   233  	name string
   234  	obs  observeFunc
   235  }
   236  
   237  // With is a no-op.
   238  func (t *Timing) With(...string) metrics.Histogram {
   239  	return t
   240  }
   241  
   242  // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
   243  func (t *Timing) Observe(value float64) {
   244  	t.obs(t.name, lv.LabelValues{}, value)
   245  }
   246  

View as plain text