...

Source file src/github.com/go-kit/kit/metrics/dogstatsd/dogstatsd.go

Documentation: github.com/go-kit/kit/metrics/dogstatsd

     1  // Package dogstatsd provides a DogStatsD backend for package metrics. It's very
     2  // similar to StatsD, but supports arbitrary tags per-metric, which map to Go
     3  // kit's label values. So, while label values are no-ops in StatsD, they are
     4  // supported here. For more details, see the documentation at
     5  // http://docs.datadoghq.com/guides/dogstatsd/.
     6  //
     7  // This package batches observations and emits them on some schedule to the
     8  // remote server. This is useful even if you connect to your DogStatsD server
     9  // over UDP. Emitting one network packet per observation can quickly overwhelm
    10  // even the fastest internal network.
    11  package dogstatsd
    12  
    13  import (
    14  	"context"
    15  	"fmt"
    16  	"io"
    17  	"math/rand"
    18  	"strings"
    19  	"sync"
    20  	"sync/atomic"
    21  	"time"
    22  
    23  	"github.com/go-kit/kit/metrics"
    24  	"github.com/go-kit/kit/metrics/generic"
    25  	"github.com/go-kit/kit/metrics/internal/lv"
    26  	"github.com/go-kit/kit/metrics/internal/ratemap"
    27  	"github.com/go-kit/kit/util/conn"
    28  	"github.com/go-kit/log"
    29  )
    30  
// Dogstatsd receives metrics observations and forwards them to a DogStatsD
// server. Create a Dogstatsd object, use it to create metrics, and pass those
// metrics as dependencies to the components that will use them.
//
// All metrics are buffered until WriteTo is called. Counters and gauges are
// aggregated into a single observation per timeseries per write. Timings and
// histograms are buffered but not aggregated.
//
// To regularly report metrics to an io.Writer, use the WriteLoop helper method.
// To send to a DogStatsD server, use the SendLoop helper method.
type Dogstatsd struct {
	// mtx guards the gauges map.
	mtx        sync.RWMutex
	// prefix is prepended to every metric name when it is written.
	prefix     string
	// rates records the sample rate registered for each metric name.
	rates      *ratemap.RateMap
	// counters buffers counter observations until the next WriteTo.
	counters   *lv.Space
	// gauges maps a gauge name to the root of its tree of labeled gauges.
	gauges     map[string]*gaugeNode
	// timings buffers timing observations until the next WriteTo.
	timings    *lv.Space
	// histograms buffers histogram observations until the next WriteTo.
	histograms *lv.Space
	// logger receives WriteTo errors from WriteLoop and is handed to the
	// connection manager created by SendLoop.
	logger     log.Logger
	// lvs holds the label/value pairs emitted as tags on every metric.
	lvs        lv.LabelValues
}
    52  
    53  // New returns a Dogstatsd object that may be used to create metrics. Prefix is
    54  // applied to all created metrics. Callers must ensure that regular calls to
    55  // WriteTo are performed, either manually or with one of the helper methods.
    56  func New(prefix string, logger log.Logger, lvs ...string) *Dogstatsd {
    57  	if len(lvs)%2 != 0 {
    58  		panic("odd number of LabelValues; programmer error!")
    59  	}
    60  	return &Dogstatsd{
    61  		prefix:     prefix,
    62  		rates:      ratemap.New(),
    63  		counters:   lv.NewSpace(),
    64  		gauges:     map[string]*gaugeNode{},
    65  		timings:    lv.NewSpace(),
    66  		histograms: lv.NewSpace(),
    67  		logger:     logger,
    68  		lvs:        lvs,
    69  	}
    70  }
    71  
    72  // NewCounter returns a counter, sending observations to this Dogstatsd object.
    73  func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
    74  	d.rates.Set(name, sampleRate)
    75  	return &Counter{
    76  		name: name,
    77  		obs:  sampleObservations(d.counters.Observe, sampleRate),
    78  	}
    79  }
    80  
    81  // NewGauge returns a gauge, sending observations to this Dogstatsd object.
    82  func (d *Dogstatsd) NewGauge(name string) *Gauge {
    83  	d.mtx.Lock()
    84  	n, ok := d.gauges[name]
    85  	if !ok {
    86  		n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), ddog: d}}
    87  		d.gauges[name] = n
    88  	}
    89  	d.mtx.Unlock()
    90  	return n.gauge
    91  }
    92  
    93  // NewTiming returns a histogram whose observations are interpreted as
    94  // millisecond durations, and are forwarded to this Dogstatsd object.
    95  func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
    96  	d.rates.Set(name, sampleRate)
    97  	return &Timing{
    98  		name: name,
    99  		obs:  sampleObservations(d.timings.Observe, sampleRate),
   100  	}
   101  }
   102  
   103  // NewHistogram returns a histogram whose observations are of an unspecified
   104  // unit, and are forwarded to this Dogstatsd object.
   105  func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
   106  	d.rates.Set(name, sampleRate)
   107  	return &Histogram{
   108  		name: name,
   109  		obs:  sampleObservations(d.histograms.Observe, sampleRate),
   110  	}
   111  }
   112  
   113  // WriteLoop is a helper method that invokes WriteTo to the passed writer every
   114  // time the passed channel fires. This method blocks until ctx is canceled,
   115  // so clients probably want to run it in its own goroutine. For typical
   116  // usage, create a time.Ticker and pass its C channel to this method.
   117  func (d *Dogstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
   118  	for {
   119  		select {
   120  		case <-c:
   121  			if _, err := d.WriteTo(w); err != nil {
   122  				d.logger.Log("during", "WriteTo", "err", err)
   123  			}
   124  		case <-ctx.Done():
   125  			return
   126  		}
   127  	}
   128  }
   129  
   130  // SendLoop is a helper method that wraps WriteLoop, passing a managed
   131  // connection to the network and address. Like WriteLoop, this method blocks
   132  // until ctx is canceled, so clients probably want to start it in its own
   133  // goroutine. For typical usage, create a time.Ticker and pass its C channel to
   134  // this method.
   135  func (d *Dogstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
   136  	d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
   137  }
   138  
   139  // WriteTo flushes the buffered content of the metrics to the writer, in
   140  // DogStatsD format. WriteTo abides best-effort semantics, so observations are
   141  // lost if there is a problem with the write. Clients should be sure to call
   142  // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
   143  func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
   144  	var n int
   145  
   146  	d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   147  		n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs))
   148  		if err != nil {
   149  			return false
   150  		}
   151  		count += int64(n)
   152  		return true
   153  	})
   154  	if err != nil {
   155  		return count, err
   156  	}
   157  
   158  	d.mtx.RLock()
   159  	for _, root := range d.gauges {
   160  		root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
   161  			n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs))
   162  			if err != nil {
   163  				return false
   164  			}
   165  			count += int64(n)
   166  			return true
   167  		})
   168  	}
   169  	d.mtx.RUnlock()
   170  
   171  	d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   172  		sampleRate := d.rates.Get(name)
   173  		for _, value := range values {
   174  			n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
   175  			if err != nil {
   176  				return false
   177  			}
   178  			count += int64(n)
   179  		}
   180  		return true
   181  	})
   182  	if err != nil {
   183  		return count, err
   184  	}
   185  
   186  	d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   187  		sampleRate := d.rates.Get(name)
   188  		for _, value := range values {
   189  			n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
   190  			if err != nil {
   191  				return false
   192  			}
   193  			count += int64(n)
   194  		}
   195  		return true
   196  	})
   197  	if err != nil {
   198  		return count, err
   199  	}
   200  
   201  	return count, err
   202  }
   203  
   204  func sum(a []float64) float64 {
   205  	var v float64
   206  	for _, f := range a {
   207  		v += f
   208  	}
   209  	return v
   210  }
   211  
   212  func sampling(r float64) string {
   213  	var sv string
   214  	if r < 1.0 {
   215  		sv = fmt.Sprintf("|@%f", r)
   216  	}
   217  	return sv
   218  }
   219  
   220  func (d *Dogstatsd) tagValues(labelValues []string) string {
   221  	if len(labelValues) == 0 && len(d.lvs) == 0 {
   222  		return ""
   223  	}
   224  	if len(labelValues)%2 != 0 {
   225  		panic("tagValues received a labelValues with an odd number of strings")
   226  	}
   227  	pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
   228  	for i := 0; i < len(d.lvs); i += 2 {
   229  		pairs = append(pairs, d.lvs[i]+":"+d.lvs[i+1])
   230  	}
   231  	for i := 0; i < len(labelValues); i += 2 {
   232  		pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
   233  	}
   234  	return "|#" + strings.Join(pairs, ",")
   235  }
   236  
// observeFunc records a single observation of value for the metric called
// name, qualified by the label values lvs.
type observeFunc func(name string, lvs lv.LabelValues, value float64)
   238  
   239  // sampleObservations returns a modified observeFunc that samples observations.
   240  func sampleObservations(obs observeFunc, sampleRate float64) observeFunc {
   241  	if sampleRate >= 1 {
   242  		return obs
   243  	}
   244  	return func(name string, lvs lv.LabelValues, value float64) {
   245  		if rand.Float64() > sampleRate {
   246  			return
   247  		}
   248  		obs(name, lvs, value)
   249  	}
   250  }
   251  
// Counter is a DogStatsD counter. Observations are forwarded to a Dogstatsd
// object, and aggregated (summed) per timeseries.
type Counter struct {
	// name is the metric name passed along with every observation.
	name string
	// lvs holds the label values accumulated through With.
	lvs  lv.LabelValues
	// obs receives each delta passed to Add.
	obs  observeFunc
}
   259  
   260  // With implements metrics.Counter.
   261  func (c *Counter) With(labelValues ...string) metrics.Counter {
   262  	return &Counter{
   263  		name: c.name,
   264  		lvs:  c.lvs.With(labelValues...),
   265  		obs:  c.obs,
   266  	}
   267  }
   268  
   269  // Add implements metrics.Counter.
   270  func (c *Counter) Add(delta float64) {
   271  	c.obs(c.name, c.lvs, delta)
   272  }
   273  
// Gauge is a DogStatsD gauge. Observations are forwarded to a Dogstatsd
// object, and aggregated (the last observation selected) per timeseries.
type Gauge struct {
	// g stores the gauge's name, label values and current value.
	g    *generic.Gauge
	// ddog is the Dogstatsd object this gauge is registered with.
	ddog *Dogstatsd
	// set is 1 when the gauge was Set or Added to since it was last read,
	// and 0 otherwise. It is accessed with sync/atomic operations.
	set  int32
}
   281  
   282  // With implements metrics.Gauge.
   283  func (g *Gauge) With(labelValues ...string) metrics.Gauge {
   284  	g.ddog.mtx.RLock()
   285  	node := g.ddog.gauges[g.g.Name]
   286  	g.ddog.mtx.RUnlock()
   287  
   288  	ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), ddog: g.ddog}
   289  	return node.addGauge(ga, ga.g.LabelValues())
   290  }
   291  
   292  // Set implements metrics.Gauge.
   293  func (g *Gauge) Set(value float64) {
   294  	g.g.Set(value)
   295  	g.touch()
   296  }
   297  
   298  // Add implements metrics.Gauge.
   299  func (g *Gauge) Add(delta float64) {
   300  	g.g.Add(delta)
   301  	g.touch()
   302  }
   303  
// Timing is a DogStatsD timing, or metrics.Histogram. Observations are
// forwarded to a Dogstatsd object, and collected (but not aggregated) per
// timeseries.
type Timing struct {
	// name is the metric name passed along with every observation.
	name string
	// lvs holds the label values accumulated through With.
	lvs  lv.LabelValues
	// obs receives each value passed to Observe.
	obs  observeFunc
}
   312  
   313  // With implements metrics.Timing.
   314  func (t *Timing) With(labelValues ...string) metrics.Histogram {
   315  	return &Timing{
   316  		name: t.name,
   317  		lvs:  t.lvs.With(labelValues...),
   318  		obs:  t.obs,
   319  	}
   320  }
   321  
   322  // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
   323  func (t *Timing) Observe(value float64) {
   324  	t.obs(t.name, t.lvs, value)
   325  }
   326  
// Histogram is a DogStatsD histogram. Observations are forwarded to a
// Dogstatsd object, and collected (but not aggregated) per timeseries.
type Histogram struct {
	// name is the metric name passed along with every observation.
	name string
	// lvs holds the label values accumulated through With.
	lvs  lv.LabelValues
	// obs receives each value passed to Observe.
	obs  observeFunc
}
   334  
   335  // With implements metrics.Histogram.
   336  func (h *Histogram) With(labelValues ...string) metrics.Histogram {
   337  	return &Histogram{
   338  		name: h.name,
   339  		lvs:  h.lvs.With(labelValues...),
   340  		obs:  h.obs,
   341  	}
   342  }
   343  
   344  // Observe implements metrics.Histogram.
   345  func (h *Histogram) Observe(value float64) {
   346  	h.obs(h.name, h.lvs, value)
   347  }
   348  
// pair is a single label name and its value. It is the key type of a
// gaugeNode's children map.
type pair struct{ label, value string }
   350  
// gaugeNode is one node in a tree of gauges that share a name. Each edge to a
// child is a label/value pair, so the path from the root to a node spells out
// that node's label values.
type gaugeNode struct {
	// mtx guards gauge and children.
	mtx      sync.RWMutex
	// gauge is the gauge registered at this node, or nil if there is none.
	gauge    *Gauge
	// children holds the sub-trees keyed by their label/value pair; it is
	// nil until the first child is added.
	children map[pair]*gaugeNode
}
   356  
   357  func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
   358  	n.mtx.Lock()
   359  	defer n.mtx.Unlock()
   360  	if len(lvs) == 0 {
   361  		if n.gauge == nil {
   362  			n.gauge = g
   363  		}
   364  		return n.gauge
   365  	}
   366  	if len(lvs) < 2 {
   367  		panic("too few LabelValues; programmer error!")
   368  	}
   369  	head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
   370  	if n.children == nil {
   371  		n.children = map[pair]*gaugeNode{}
   372  	}
   373  	child, ok := n.children[head]
   374  	if !ok {
   375  		child = &gaugeNode{}
   376  		n.children[head] = child
   377  	}
   378  	return child.addGauge(g, tail)
   379  }
   380  
   381  func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
   382  	n.mtx.RLock()
   383  	defer n.mtx.RUnlock()
   384  	if n.gauge != nil {
   385  		value, ok := n.gauge.read()
   386  		if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
   387  			return false
   388  		}
   389  	}
   390  	for _, child := range n.children {
   391  		if !child.walk(fn) {
   392  			return false
   393  		}
   394  	}
   395  	return true
   396  }
   397  
   398  func (g *Gauge) touch() {
   399  	atomic.StoreInt32(&(g.set), 1)
   400  }
   401  
   402  func (g *Gauge) read() (float64, bool) {
   403  	set := atomic.SwapInt32(&(g.set), 0)
   404  	return g.g.Value(), set != 0
   405  }
   406  

View as plain text