...

Source file src/github.com/go-kit/kit/metrics/influx/influx.go

Documentation: github.com/go-kit/kit/metrics/influx

     1  // Package influx provides an InfluxDB implementation for metrics. The model is
     2  // similar to other push-based instrumentation systems. Observations are
     3  // aggregated locally and emitted to the Influx server on regular intervals.
     4  package influx
     5  
     6  import (
     7  	"context"
     8  	"time"
     9  
    10  	influxdb "github.com/influxdata/influxdb1-client/v2"
    11  
    12  	"github.com/go-kit/kit/metrics"
    13  	"github.com/go-kit/kit/metrics/generic"
    14  	"github.com/go-kit/kit/metrics/internal/lv"
    15  	"github.com/go-kit/log"
    16  )
    17  
    18  // Influx is a store for metrics that will be emitted to an Influx database.
    19  //
    20  // Influx is a general purpose time-series database, and has no native concepts
    21  // of counters, gauges, or histograms. Counters are modeled as a timeseries with
    22  // one data point per flush, with a "count" field that reflects all adds since
    23  // the last flush. Gauges are modeled as a timeseries with one data point per
    24  // flush, with a "value" field that reflects the current state of the gauge.
    25  // Histograms are modeled as a timeseries with one data point per combination of tags,
    26  // with a set of quantile fields that reflects the p50, p90, p95 & p99.
    27  //
    28  // Influx tags are attached to the Influx object, can be given to each
    29  // metric at construction and can be updated anytime via With function. Influx fields
    30  // are mapped to Go kit label values directly by this collector. Actual metric
    31  // values are provided as fields with specific names depending on the metric.
    32  //
    33  // All observations are collected in memory locally, and flushed on demand.
    34  type Influx struct {
    35  	counters   *lv.Space
    36  	gauges     *lv.Space
    37  	histograms *lv.Space
    38  	tags       map[string]string
    39  	conf       influxdb.BatchPointsConfig
    40  	logger     log.Logger
    41  }
    42  
    43  // New returns an Influx, ready to create metrics and collect observations. Tags
    44  // are applied to all metrics created from this object. The BatchPointsConfig is
    45  // used during flushing.
    46  func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
    47  	return &Influx{
    48  		counters:   lv.NewSpace(),
    49  		gauges:     lv.NewSpace(),
    50  		histograms: lv.NewSpace(),
    51  		tags:       tags,
    52  		conf:       conf,
    53  		logger:     logger,
    54  	}
    55  }
    56  
    57  // NewCounter returns an Influx counter.
    58  func (in *Influx) NewCounter(name string) *Counter {
    59  	return &Counter{
    60  		name: name,
    61  		obs:  in.counters.Observe,
    62  	}
    63  }
    64  
    65  // NewGauge returns an Influx gauge.
    66  func (in *Influx) NewGauge(name string) *Gauge {
    67  	return &Gauge{
    68  		name: name,
    69  		obs:  in.gauges.Observe,
    70  		add:  in.gauges.Add,
    71  	}
    72  }
    73  
    74  // NewHistogram returns an Influx histogram.
    75  func (in *Influx) NewHistogram(name string) *Histogram {
    76  	return &Histogram{
    77  		name: name,
    78  		obs:  in.histograms.Observe,
    79  	}
    80  }
    81  
    82  // BatchPointsWriter captures a subset of the influxdb.Client methods necessary
    83  // for emitting metrics observations.
    84  type BatchPointsWriter interface {
    85  	Write(influxdb.BatchPoints) error
    86  }
    87  
    88  // WriteLoop is a helper method that invokes WriteTo to the passed writer every
    89  // time the passed channel fires. This method blocks until the channel is
    90  // closed, so clients probably want to run it in its own goroutine. For typical
    91  // usage, create a time.Ticker and pass its C channel to this method.
    92  func (in *Influx) WriteLoop(ctx context.Context, c <-chan time.Time, w BatchPointsWriter) {
    93  	for {
    94  		select {
    95  		case <-c:
    96  			if err := in.WriteTo(w); err != nil {
    97  				in.logger.Log("during", "WriteTo", "err", err)
    98  			}
    99  		case <-ctx.Done():
   100  			return
   101  		}
   102  	}
   103  }
   104  
   105  // WriteTo flushes the buffered content of the metrics to the writer, in an
   106  // Influx BatchPoints format. WriteTo abides best-effort semantics, so
   107  // observations are lost if there is a problem with the write. Clients should be
   108  // sure to call WriteTo regularly, ideally through the WriteLoop helper method.
   109  func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
   110  	bp, err := influxdb.NewBatchPoints(in.conf)
   111  	if err != nil {
   112  		return err
   113  	}
   114  
   115  	now := time.Now()
   116  
   117  	in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   118  		tags := mergeTags(in.tags, lvs)
   119  		var p *influxdb.Point
   120  		fields := map[string]interface{}{"count": sum(values)}
   121  		p, err = influxdb.NewPoint(name, tags, fields, now)
   122  		if err != nil {
   123  			return false
   124  		}
   125  		bp.AddPoint(p)
   126  		return true
   127  	})
   128  	if err != nil {
   129  		return err
   130  	}
   131  
   132  	in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   133  		tags := mergeTags(in.tags, lvs)
   134  		var p *influxdb.Point
   135  		fields := map[string]interface{}{"value": last(values)}
   136  		p, err = influxdb.NewPoint(name, tags, fields, now)
   137  		if err != nil {
   138  			return false
   139  		}
   140  		bp.AddPoint(p)
   141  		return true
   142  	})
   143  	if err != nil {
   144  		return err
   145  	}
   146  
   147  	in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   148  		histogram := generic.NewHistogram(name, 50)
   149  		tags := mergeTags(in.tags, lvs)
   150  		var p *influxdb.Point
   151  		for _, v := range values {
   152  			histogram.Observe(v)
   153  		}
   154  		fields := map[string]interface{}{
   155  			"p50": histogram.Quantile(0.50),
   156  			"p90": histogram.Quantile(0.90),
   157  			"p95": histogram.Quantile(0.95),
   158  			"p99": histogram.Quantile(0.99),
   159  		}
   160  		p, err = influxdb.NewPoint(name, tags, fields, now)
   161  		if err != nil {
   162  			return false
   163  		}
   164  		bp.AddPoint(p)
   165  		return true
   166  	})
   167  	if err != nil {
   168  		return err
   169  	}
   170  
   171  	return w.Write(bp)
   172  }
   173  
   174  func mergeTags(tags map[string]string, labelValues []string) map[string]string {
   175  	if len(labelValues)%2 != 0 {
   176  		panic("mergeTags received a labelValues with an odd number of strings")
   177  	}
   178  	ret := make(map[string]string, len(tags)+len(labelValues)/2)
   179  	for k, v := range tags {
   180  		ret[k] = v
   181  	}
   182  	for i := 0; i < len(labelValues); i += 2 {
   183  		ret[labelValues[i]] = labelValues[i+1]
   184  	}
   185  	return ret
   186  }
   187  
   188  func sum(a []float64) float64 {
   189  	var v float64
   190  	for _, f := range a {
   191  		v += f
   192  	}
   193  	return v
   194  }
   195  
   196  func last(a []float64) float64 {
   197  	return a[len(a)-1]
   198  }
   199  
// observeFunc is the callback through which a metric hands one observation to
// its backing space: the metric name, its label values, and the observed value.
type observeFunc func(name string, lvs lv.LabelValues, value float64)
   201  
   202  // Counter is an Influx counter. Observations are forwarded to an Influx
   203  // object, and aggregated (summed) per timeseries.
   204  type Counter struct {
   205  	name string
   206  	lvs  lv.LabelValues
   207  	obs  observeFunc
   208  }
   209  
   210  // With implements metrics.Counter.
   211  func (c *Counter) With(labelValues ...string) metrics.Counter {
   212  	return &Counter{
   213  		name: c.name,
   214  		lvs:  c.lvs.With(labelValues...),
   215  		obs:  c.obs,
   216  	}
   217  }
   218  
   219  // Add implements metrics.Counter.
   220  func (c *Counter) Add(delta float64) {
   221  	c.obs(c.name, c.lvs, delta)
   222  }
   223  
   224  // Gauge is an Influx gauge. Observations are forwarded to a Dogstatsd
   225  // object, and aggregated (the last observation selected) per timeseries.
   226  type Gauge struct {
   227  	name string
   228  	lvs  lv.LabelValues
   229  	obs  observeFunc
   230  	add  observeFunc
   231  }
   232  
   233  // With implements metrics.Gauge.
   234  func (g *Gauge) With(labelValues ...string) metrics.Gauge {
   235  	return &Gauge{
   236  		name: g.name,
   237  		lvs:  g.lvs.With(labelValues...),
   238  		obs:  g.obs,
   239  		add:  g.add,
   240  	}
   241  }
   242  
   243  // Set implements metrics.Gauge.
   244  func (g *Gauge) Set(value float64) {
   245  	g.obs(g.name, g.lvs, value)
   246  }
   247  
   248  // Add implements metrics.Gauge.
   249  func (g *Gauge) Add(delta float64) {
   250  	g.add(g.name, g.lvs, delta)
   251  }
   252  
   253  // Histogram is an Influx histrogram. Observations are aggregated into a
   254  // generic.Histogram and emitted as per-quantile gauges to the Influx server.
   255  type Histogram struct {
   256  	name string
   257  	lvs  lv.LabelValues
   258  	obs  observeFunc
   259  }
   260  
   261  // With implements metrics.Histogram.
   262  func (h *Histogram) With(labelValues ...string) metrics.Histogram {
   263  	return &Histogram{
   264  		name: h.name,
   265  		lvs:  h.lvs.With(labelValues...),
   266  		obs:  h.obs,
   267  	}
   268  }
   269  
   270  // Observe implements metrics.Histogram.
   271  func (h *Histogram) Observe(value float64) {
   272  	h.obs(h.name, h.lvs, value)
   273  }
   274  

View as plain text