...

Source file src/github.com/go-kit/kit/metrics/cloudwatch/cloudwatch.go

Documentation: github.com/go-kit/kit/metrics/cloudwatch

     1  package cloudwatch
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"os"
     7  	"strconv"
     8  	"sync"
     9  	"time"
    10  
    11  	"github.com/aws/aws-sdk-go/aws"
    12  	"github.com/aws/aws-sdk-go/service/cloudwatch"
    13  	"github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
    14  
    15  	"github.com/go-kit/kit/metrics"
    16  	"github.com/go-kit/kit/metrics/generic"
    17  	"github.com/go-kit/kit/metrics/internal/lv"
    18  	"github.com/go-kit/log"
    19  )
    20  
    21  const (
    22  	maxConcurrentRequests = 20
    23  	maxValuesInABatch     = 150
    24  )
    25  
    26  // CloudWatch receives metrics observations and forwards them to CloudWatch.
    27  // Create a CloudWatch object, use it to create metrics, and pass those metrics as
    28  // dependencies to the components that will use them.
    29  //
    30  // To regularly report metrics to CloudWatch, use the WriteLoop helper method.
    31  type CloudWatch struct {
    32  	mtx                   sync.RWMutex
    33  	sem                   chan struct{}
    34  	namespace             string
    35  	svc                   cloudwatchiface.CloudWatchAPI
    36  	counters              *lv.Space
    37  	gauges                *lv.Space
    38  	histograms            *lv.Space
    39  	percentiles           []float64 // percentiles to track
    40  	logger                log.Logger
    41  	numConcurrentRequests int
    42  }
    43  
    44  // Option is a function adapter to change config of the CloudWatch struct
    45  type Option func(*CloudWatch)
    46  
    47  // WithLogger sets the Logger that will receive error messages generated
    48  // during the WriteLoop. By default, fmt logger is used.
    49  func WithLogger(logger log.Logger) Option {
    50  	return func(c *CloudWatch) {
    51  		c.logger = logger
    52  	}
    53  }
    54  
    55  // WithPercentiles registers the percentiles to track, overriding the
    56  // existing/default values.
    57  // Reason is that Cloudwatch makes you pay per metric, so you can save half the money
    58  // by only using 2 metrics instead of the default 4.
    59  func WithPercentiles(percentiles ...float64) Option {
    60  	return func(c *CloudWatch) {
    61  		c.percentiles = make([]float64, 0, len(percentiles))
    62  		for _, p := range percentiles {
    63  			if p < 0 || p > 1 {
    64  				continue // illegal entry; ignore
    65  			}
    66  			c.percentiles = append(c.percentiles, p)
    67  		}
    68  	}
    69  }
    70  
    71  // WithConcurrentRequests sets the upper limit on how many
    72  // cloudwatch.PutMetricDataRequest may be under way at any
    73  // given time. If n is greater than 20, 20 is used. By default,
    74  // the max is set at 10 concurrent requests.
    75  func WithConcurrentRequests(n int) Option {
    76  	return func(c *CloudWatch) {
    77  		if n > maxConcurrentRequests {
    78  			n = maxConcurrentRequests
    79  		}
    80  		c.numConcurrentRequests = n
    81  	}
    82  }
    83  
    84  // New returns a CloudWatch object that may be used to create metrics.
    85  // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
    86  // Callers must ensure that regular calls to Send are performed, either
    87  // manually or with one of the helper methods.
    88  func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...Option) *CloudWatch {
    89  	cw := &CloudWatch{
    90  		sem:                   nil, // set below
    91  		namespace:             namespace,
    92  		svc:                   svc,
    93  		counters:              lv.NewSpace(),
    94  		gauges:                lv.NewSpace(),
    95  		histograms:            lv.NewSpace(),
    96  		numConcurrentRequests: 10,
    97  		logger:                log.NewLogfmtLogger(os.Stderr),
    98  		percentiles:           []float64{0.50, 0.90, 0.95, 0.99},
    99  	}
   100  
   101  	for _, opt := range options {
   102  		opt(cw)
   103  	}
   104  
   105  	cw.sem = make(chan struct{}, cw.numConcurrentRequests)
   106  
   107  	return cw
   108  }
   109  
   110  // NewCounter returns a counter. Observations are aggregated and emitted once
   111  // per write invocation.
   112  func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
   113  	return &Counter{
   114  		name: name,
   115  		obs:  cw.counters.Observe,
   116  	}
   117  }
   118  
   119  // NewGauge returns an gauge.
   120  func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
   121  	return &Gauge{
   122  		name: name,
   123  		obs:  cw.gauges.Observe,
   124  		add:  cw.gauges.Add,
   125  	}
   126  }
   127  
   128  // NewHistogram returns a histogram.
   129  func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
   130  	return &Histogram{
   131  		name: name,
   132  		obs:  cw.histograms.Observe,
   133  	}
   134  }
   135  
   136  // WriteLoop is a helper method that invokes Send every time the passed
   137  // channel fires. This method blocks until ctx is canceled, so clients
   138  // probably want to run it in its own goroutine. For typical usage, create a
   139  // time.Ticker and pass its C channel to this method.
   140  func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
   141  	for {
   142  		select {
   143  		case <-c:
   144  			if err := cw.Send(); err != nil {
   145  				cw.logger.Log("during", "Send", "err", err)
   146  			}
   147  		case <-ctx.Done():
   148  			return
   149  		}
   150  	}
   151  }
   152  
   153  // Send will fire an API request to CloudWatch with the latest stats for
   154  // all metrics. It is preferred that the WriteLoop method is used.
   155  func (cw *CloudWatch) Send() error {
   156  	cw.mtx.RLock()
   157  	defer cw.mtx.RUnlock()
   158  	now := time.Now()
   159  
   160  	var datums []*cloudwatch.MetricDatum
   161  
   162  	cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   163  		value := sum(values)
   164  		datums = append(datums, &cloudwatch.MetricDatum{
   165  			MetricName: aws.String(name),
   166  			Dimensions: makeDimensions(lvs...),
   167  			Value:      aws.Float64(value),
   168  			Timestamp:  aws.Time(now),
   169  		})
   170  		return true
   171  	})
   172  
   173  	cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   174  		if len(values) == 0 {
   175  			return true
   176  		}
   177  
   178  		datum := &cloudwatch.MetricDatum{
   179  			MetricName: aws.String(name),
   180  			Dimensions: makeDimensions(lvs...),
   181  			Timestamp:  aws.Time(now),
   182  		}
   183  
   184  		// CloudWatch Put Metrics API (https://docs.aws.amazon.com/AmazonCloudWatch/latest/APIReference/API_MetricDatum.html)
   185  		// expects batch of unique values including the array of corresponding counts
   186  		valuesCounter := make(map[float64]int)
   187  		for _, v := range values {
   188  			valuesCounter[v]++
   189  		}
   190  
   191  		for value, count := range valuesCounter {
   192  			if len(datum.Values) == maxValuesInABatch {
   193  				break
   194  			}
   195  			datum.Values = append(datum.Values, aws.Float64(value))
   196  			datum.Counts = append(datum.Counts, aws.Float64(float64(count)))
   197  		}
   198  
   199  		datums = append(datums, datum)
   200  		return true
   201  	})
   202  
   203  	// format a [0,1]-float value to a percentile value, with minimum nr of decimals
   204  	// 0.90 -> "90"
   205  	// 0.95 -> "95"
   206  	// 0.999 -> "99.9"
   207  	formatPerc := func(p float64) string {
   208  		return strconv.FormatFloat(p*100, 'f', -1, 64)
   209  	}
   210  
   211  	cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   212  		histogram := generic.NewHistogram(name, 50)
   213  
   214  		for _, v := range values {
   215  			histogram.Observe(v)
   216  		}
   217  
   218  		for _, perc := range cw.percentiles {
   219  			value := histogram.Quantile(perc)
   220  			datums = append(datums, &cloudwatch.MetricDatum{
   221  				MetricName: aws.String(fmt.Sprintf("%s_%s", name, formatPerc(perc))),
   222  				Dimensions: makeDimensions(lvs...),
   223  				Value:      aws.Float64(value),
   224  				Timestamp:  aws.Time(now),
   225  			})
   226  		}
   227  		return true
   228  	})
   229  
   230  	var batches [][]*cloudwatch.MetricDatum
   231  	for len(datums) > 0 {
   232  		var batch []*cloudwatch.MetricDatum
   233  		lim := min(len(datums), maxConcurrentRequests)
   234  		batch, datums = datums[:lim], datums[lim:]
   235  		batches = append(batches, batch)
   236  	}
   237  
   238  	var errors = make(chan error, len(batches))
   239  	for _, batch := range batches {
   240  		go func(batch []*cloudwatch.MetricDatum) {
   241  			cw.sem <- struct{}{}
   242  			defer func() {
   243  				<-cw.sem
   244  			}()
   245  			_, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
   246  				Namespace:  aws.String(cw.namespace),
   247  				MetricData: batch,
   248  			})
   249  			errors <- err
   250  		}(batch)
   251  	}
   252  	var firstErr error
   253  	for i := 0; i < cap(errors); i++ {
   254  		if err := <-errors; err != nil && firstErr == nil {
   255  			firstErr = err
   256  		}
   257  	}
   258  
   259  	return firstErr
   260  }
   261  
   262  func sum(a []float64) float64 {
   263  	var v float64
   264  	for _, f := range a {
   265  		v += f
   266  	}
   267  	return v
   268  }
   269  
   270  func min(a, b int) int {
   271  	if a < b {
   272  		return a
   273  	}
   274  	return b
   275  }
   276  
   277  type observeFunc func(name string, lvs lv.LabelValues, value float64)
   278  
   279  // Counter is a counter. Observations are forwarded to a node
   280  // object, and aggregated (summed) per timeseries.
   281  type Counter struct {
   282  	name string
   283  	lvs  lv.LabelValues
   284  	obs  observeFunc
   285  }
   286  
   287  // With implements metrics.Counter.
   288  func (c *Counter) With(labelValues ...string) metrics.Counter {
   289  	return &Counter{
   290  		name: c.name,
   291  		lvs:  c.lvs.With(labelValues...),
   292  		obs:  c.obs,
   293  	}
   294  }
   295  
   296  // Add implements metrics.Counter.
   297  func (c *Counter) Add(delta float64) {
   298  	c.obs(c.name, c.lvs, delta)
   299  }
   300  
   301  // Gauge is a gauge. Observations are forwarded to a node
   302  // object, and aggregated (the last observation selected) per timeseries.
   303  type Gauge struct {
   304  	name string
   305  	lvs  lv.LabelValues
   306  	obs  observeFunc
   307  	add  observeFunc
   308  }
   309  
   310  // With implements metrics.Gauge.
   311  func (g *Gauge) With(labelValues ...string) metrics.Gauge {
   312  	return &Gauge{
   313  		name: g.name,
   314  		lvs:  g.lvs.With(labelValues...),
   315  		obs:  g.obs,
   316  		add:  g.add,
   317  	}
   318  }
   319  
   320  // Set implements metrics.Gauge.
   321  func (g *Gauge) Set(value float64) {
   322  	g.obs(g.name, g.lvs, value)
   323  }
   324  
   325  // Add implements metrics.Gauge.
   326  func (g *Gauge) Add(delta float64) {
   327  	g.add(g.name, g.lvs, delta)
   328  }
   329  
   330  // Histogram is an Influx histrogram. Observations are aggregated into a
   331  // generic.Histogram and emitted as per-quantile gauges to the Influx server.
   332  type Histogram struct {
   333  	name string
   334  	lvs  lv.LabelValues
   335  	obs  observeFunc
   336  }
   337  
   338  // With implements metrics.Histogram.
   339  func (h *Histogram) With(labelValues ...string) metrics.Histogram {
   340  	return &Histogram{
   341  		name: h.name,
   342  		lvs:  h.lvs.With(labelValues...),
   343  		obs:  h.obs,
   344  	}
   345  }
   346  
   347  // Observe implements metrics.Histogram.
   348  func (h *Histogram) Observe(value float64) {
   349  	h.obs(h.name, h.lvs, value)
   350  }
   351  
   352  func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
   353  	dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
   354  	for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
   355  		dimensions[j] = &cloudwatch.Dimension{
   356  			Name:  aws.String(labelValues[i]),
   357  			Value: aws.String(labelValues[i+1]),
   358  		}
   359  	}
   360  	return dimensions
   361  }
   362  

View as plain text