...

Source file src/github.com/go-kit/kit/metrics/influxstatsd/influxstatsd.go

Documentation: github.com/go-kit/kit/metrics/influxstatsd

     1  // Package influxstatsd provides support for InfluxData's StatsD Telegraf plugin. It's very
     2  // similar to StatsD, but supports arbitrary tags per-metric, which map to Go
     3  // kit's label values. So, while label values are no-ops in StatsD, they are
     4  // supported here. For more details, see the article at
     5  // https://www.influxdata.com/blog/getting-started-with-sending-statsd-metrics-to-telegraf-influxdb/
     6  //
     7  // This package batches observations and emits them on some schedule to the
     8  // remote server. This is useful even if you connect to your service
     9  // over UDP. Emitting one network packet per observation can quickly overwhelm
    10  // even the fastest internal network.
    11  package influxstatsd
    12  
    13  import (
    14  	"context"
    15  	"fmt"
    16  	"io"
    17  	"strings"
    18  	"sync"
    19  	"sync/atomic"
    20  	"time"
    21  
    22  	"github.com/go-kit/kit/metrics"
    23  	"github.com/go-kit/kit/metrics/generic"
    24  	"github.com/go-kit/kit/metrics/internal/lv"
    25  	"github.com/go-kit/kit/metrics/internal/ratemap"
    26  	"github.com/go-kit/kit/util/conn"
    27  	"github.com/go-kit/log"
    28  )
    29  
    30  // Influxstatsd receives metrics observations and forwards them to a server.
    31  // Create a Influxstatsd object, use it to create metrics, and pass those
    32  // metrics as dependencies to the components that will use them.
    33  //
    34  // All metrics are buffered until WriteTo is called. Counters and gauges are
    35  // aggregated into a single observation per timeseries per write. Timings and
    36  // histograms are buffered but not aggregated.
    37  //
    38  // To regularly report metrics to an io.Writer, use the WriteLoop helper method.
    39  // To send to a InfluxStatsD server, use the SendLoop helper method.
    40  type Influxstatsd struct {
    41  	mtx        sync.RWMutex
    42  	prefix     string
    43  	rates      *ratemap.RateMap
    44  	counters   *lv.Space
    45  	gauges     map[string]*gaugeNode
    46  	timings    *lv.Space
    47  	histograms *lv.Space
    48  	logger     log.Logger
    49  	lvs        lv.LabelValues
    50  }
    51  
    52  // New returns a Influxstatsd object that may be used to create metrics. Prefix is
    53  // applied to all created metrics. Callers must ensure that regular calls to
    54  // WriteTo are performed, either manually or with one of the helper methods.
    55  func New(prefix string, logger log.Logger, lvs ...string) *Influxstatsd {
    56  	if len(lvs)%2 != 0 {
    57  		panic("odd number of LabelValues; programmer error!")
    58  	}
    59  	return &Influxstatsd{
    60  		prefix:     prefix,
    61  		rates:      ratemap.New(),
    62  		counters:   lv.NewSpace(),
    63  		gauges:     map[string]*gaugeNode{}, // https://github.com/go-kit/kit/pull/588
    64  		timings:    lv.NewSpace(),
    65  		histograms: lv.NewSpace(),
    66  		logger:     logger,
    67  		lvs:        lvs,
    68  	}
    69  }
    70  
    71  // NewCounter returns a counter, sending observations to this Influxstatsd object.
    72  func (d *Influxstatsd) NewCounter(name string, sampleRate float64) *Counter {
    73  	d.rates.Set(name, sampleRate)
    74  	return &Counter{
    75  		name: name,
    76  		obs:  d.counters.Observe,
    77  	}
    78  }
    79  
    80  // NewGauge returns a gauge, sending observations to this Influxstatsd object.
    81  func (d *Influxstatsd) NewGauge(name string) *Gauge {
    82  	d.mtx.Lock()
    83  	n, ok := d.gauges[name]
    84  	if !ok {
    85  		n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), influx: d}}
    86  		d.gauges[name] = n
    87  	}
    88  	d.mtx.Unlock()
    89  	return n.gauge
    90  }
    91  
    92  // NewTiming returns a histogram whose observations are interpreted as
    93  // millisecond durations, and are forwarded to this Influxstatsd object.
    94  func (d *Influxstatsd) NewTiming(name string, sampleRate float64) *Timing {
    95  	d.rates.Set(name, sampleRate)
    96  	return &Timing{
    97  		name: name,
    98  		obs:  d.timings.Observe,
    99  	}
   100  }
   101  
   102  // NewHistogram returns a histogram whose observations are of an unspecified
   103  // unit, and are forwarded to this Influxstatsd object.
   104  func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
   105  	d.rates.Set(name, sampleRate)
   106  	return &Histogram{
   107  		name: name,
   108  		obs:  d.histograms.Observe,
   109  	}
   110  }
   111  
   112  // WriteLoop is a helper method that invokes WriteTo to the passed writer every
   113  // time the passed channel fires. This method blocks until ctx is canceled,
   114  // so clients probably want to run it in its own goroutine. For typical
   115  // usage, create a time.Ticker and pass its C channel to this method.
   116  func (d *Influxstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
   117  	for {
   118  		select {
   119  		case <-c:
   120  			if _, err := d.WriteTo(w); err != nil {
   121  				d.logger.Log("during", "WriteTo", "err", err)
   122  			}
   123  		case <-ctx.Done():
   124  			return
   125  		}
   126  	}
   127  }
   128  
   129  // SendLoop is a helper method that wraps WriteLoop, passing a managed
   130  // connection to the network and address. Like WriteLoop, this method blocks
   131  // until ctx is canceled, so clients probably want to start it in its own
   132  // goroutine. For typical usage, create a time.Ticker and pass its C channel to
   133  // this method.
   134  func (d *Influxstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
   135  	d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
   136  }
   137  
   138  // WriteTo flushes the buffered content of the metrics to the writer, in
   139  // InfluxStatsD format. WriteTo abides best-effort semantics, so observations are
   140  // lost if there is a problem with the write. Clients should be sure to call
   141  // WriteTo regularly, ideally through the WriteLoop or SendLoop helper methods.
   142  func (d *Influxstatsd) WriteTo(w io.Writer) (count int64, err error) {
   143  	var n int
   144  
   145  	d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   146  		n, err = fmt.Fprintf(w, "%s%s%s:%f|c%s\n", d.prefix, name, d.tagValues(lvs), sum(values), sampling(d.rates.Get(name)))
   147  		if err != nil {
   148  			return false
   149  		}
   150  		count += int64(n)
   151  		return true
   152  	})
   153  	if err != nil {
   154  		return count, err
   155  	}
   156  
   157  	d.mtx.RLock()
   158  	for _, root := range d.gauges {
   159  		root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
   160  			n, err = fmt.Fprintf(w, "%s%s%s:%f|g\n", d.prefix, name, d.tagValues(lvs), value)
   161  			if err != nil {
   162  				return false
   163  			}
   164  			count += int64(n)
   165  			return true
   166  		})
   167  	}
   168  	d.mtx.RUnlock()
   169  
   170  	d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   171  		sampleRate := d.rates.Get(name)
   172  		for _, value := range values {
   173  			n, err = fmt.Fprintf(w, "%s%s%s:%f|ms%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
   174  			if err != nil {
   175  				return false
   176  			}
   177  			count += int64(n)
   178  		}
   179  		return true
   180  	})
   181  	if err != nil {
   182  		return count, err
   183  	}
   184  
   185  	d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   186  		sampleRate := d.rates.Get(name)
   187  		for _, value := range values {
   188  			n, err = fmt.Fprintf(w, "%s%s%s:%f|h%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
   189  			if err != nil {
   190  				return false
   191  			}
   192  			count += int64(n)
   193  		}
   194  		return true
   195  	})
   196  	if err != nil {
   197  		return count, err
   198  	}
   199  
   200  	return count, err
   201  }
   202  
   203  func sum(a []float64) float64 {
   204  	var v float64
   205  	for _, f := range a {
   206  		v += f
   207  	}
   208  	return v
   209  }
   210  
   211  func sampling(r float64) string {
   212  	var sv string
   213  	if r < 1.0 {
   214  		sv = fmt.Sprintf("|@%f", r)
   215  	}
   216  	return sv
   217  }
   218  
   219  func (d *Influxstatsd) tagValues(labelValues []string) string {
   220  	if len(labelValues) == 0 && len(d.lvs) == 0 {
   221  		return ""
   222  	}
   223  	if len(labelValues)%2 != 0 {
   224  		panic("tagValues received a labelValues with an odd number of strings")
   225  	}
   226  	pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
   227  	for i := 0; i < len(d.lvs); i += 2 {
   228  		pairs = append(pairs, d.lvs[i]+"="+d.lvs[i+1])
   229  	}
   230  	for i := 0; i < len(labelValues); i += 2 {
   231  		pairs = append(pairs, labelValues[i]+"="+labelValues[i+1])
   232  	}
   233  	return "," + strings.Join(pairs, ",")
   234  }
   235  
   236  type observeFunc func(name string, lvs lv.LabelValues, value float64)
   237  
   238  // Counter is a InfluxStatsD counter. Observations are forwarded to a Influxstatsd
   239  // object, and aggregated (summed) per timeseries.
   240  type Counter struct {
   241  	name string
   242  	lvs  lv.LabelValues
   243  	obs  observeFunc
   244  }
   245  
   246  // With implements metrics.Counter.
   247  func (c *Counter) With(labelValues ...string) metrics.Counter {
   248  	return &Counter{
   249  		name: c.name,
   250  		lvs:  c.lvs.With(labelValues...),
   251  		obs:  c.obs,
   252  	}
   253  }
   254  
   255  // Add implements metrics.Counter.
   256  func (c *Counter) Add(delta float64) {
   257  	c.obs(c.name, c.lvs, delta)
   258  }
   259  
   260  // Gauge is a InfluxStatsD gauge. Observations are forwarded to a Influxstatsd
   261  // object, and aggregated (the last observation selected) per timeseries.
   262  type Gauge struct {
   263  	g      *generic.Gauge
   264  	influx *Influxstatsd
   265  	set    int32
   266  }
   267  
   268  // With implements metrics.Gauge.
   269  func (g *Gauge) With(labelValues ...string) metrics.Gauge {
   270  	g.influx.mtx.RLock()
   271  	node := g.influx.gauges[g.g.Name]
   272  	g.influx.mtx.RUnlock()
   273  
   274  	ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), influx: g.influx}
   275  	return node.addGauge(ga, ga.g.LabelValues())
   276  }
   277  
   278  // Set implements metrics.Gauge.
   279  func (g *Gauge) Set(value float64) {
   280  	g.g.Set(value)
   281  	g.touch()
   282  }
   283  
   284  // Add implements metrics.Gauge.
   285  func (g *Gauge) Add(delta float64) {
   286  	g.g.Add(delta)
   287  	g.touch()
   288  }
   289  
   290  // Timing is a InfluxStatsD timing, or metrics.Histogram. Observations are
   291  // forwarded to a Influxstatsd object, and collected (but not aggregated) per
   292  // timeseries.
   293  type Timing struct {
   294  	name string
   295  	lvs  lv.LabelValues
   296  	obs  observeFunc
   297  }
   298  
   299  // With implements metrics.Timing.
   300  func (t *Timing) With(labelValues ...string) metrics.Histogram {
   301  	return &Timing{
   302  		name: t.name,
   303  		lvs:  t.lvs.With(labelValues...),
   304  		obs:  t.obs,
   305  	}
   306  }
   307  
   308  // Observe implements metrics.Histogram. Value is interpreted as milliseconds.
   309  func (t *Timing) Observe(value float64) {
   310  	t.obs(t.name, t.lvs, value)
   311  }
   312  
   313  // Histogram is a InfluxStatsD histrogram. Observations are forwarded to a
   314  // Influxstatsd object, and collected (but not aggregated) per timeseries.
   315  type Histogram struct {
   316  	name string
   317  	lvs  lv.LabelValues
   318  	obs  observeFunc
   319  }
   320  
   321  // With implements metrics.Histogram.
   322  func (h *Histogram) With(labelValues ...string) metrics.Histogram {
   323  	return &Histogram{
   324  		name: h.name,
   325  		lvs:  h.lvs.With(labelValues...),
   326  		obs:  h.obs,
   327  	}
   328  }
   329  
   330  // Observe implements metrics.Histogram.
   331  func (h *Histogram) Observe(value float64) {
   332  	h.obs(h.name, h.lvs, value)
   333  }
   334  
   335  type pair struct{ label, value string }
   336  
   337  type gaugeNode struct {
   338  	mtx      sync.RWMutex
   339  	gauge    *Gauge
   340  	children map[pair]*gaugeNode
   341  }
   342  
   343  func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
   344  	n.mtx.Lock()
   345  	defer n.mtx.Unlock()
   346  	if len(lvs) == 0 {
   347  		if n.gauge == nil {
   348  			n.gauge = g
   349  		}
   350  		return n.gauge
   351  	}
   352  	if len(lvs) < 2 {
   353  		panic("too few LabelValues; programmer error!")
   354  	}
   355  	head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
   356  	if n.children == nil {
   357  		n.children = map[pair]*gaugeNode{}
   358  	}
   359  	child, ok := n.children[head]
   360  	if !ok {
   361  		child = &gaugeNode{}
   362  		n.children[head] = child
   363  	}
   364  	return child.addGauge(g, tail)
   365  }
   366  
   367  func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
   368  	n.mtx.RLock()
   369  	defer n.mtx.RUnlock()
   370  	if n.gauge != nil {
   371  		value, ok := n.gauge.read()
   372  		if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
   373  			return false
   374  		}
   375  	}
   376  	for _, child := range n.children {
   377  		if !child.walk(fn) {
   378  			return false
   379  		}
   380  	}
   381  	return true
   382  }
   383  
   384  func (g *Gauge) touch() {
   385  	atomic.StoreInt32(&(g.set), 1)
   386  }
   387  
   388  func (g *Gauge) read() (float64, bool) {
   389  	set := atomic.SwapInt32(&(g.set), 0)
   390  	return g.g.Value(), set != 0
   391  }
   392  

View as plain text