...

Source file src/github.com/go-kit/kit/metrics/cloudwatch2/cloudwatch2.go

Documentation: github.com/go-kit/kit/metrics/cloudwatch2

     1  // Package cloudwatch2 emits all data as a StatisticsSet (rather than
     2  // a singular Value) to CloudWatch via the aws-sdk-go-v2 SDK.
     3  package cloudwatch2
     4  
     5  import (
     6  	"context"
     7  	"math"
     8  	"sync"
     9  	"time"
    10  
    11  	"github.com/aws/aws-sdk-go-v2/aws"
    12  	"github.com/aws/aws-sdk-go-v2/service/cloudwatch"
    13  	"github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
    14  	"golang.org/x/sync/errgroup"
    15  
    16  	"github.com/go-kit/kit/metrics"
    17  	"github.com/go-kit/kit/metrics/internal/convert"
    18  	"github.com/go-kit/kit/metrics/internal/lv"
    19  	"github.com/go-kit/log"
    20  )
    21  
// maxConcurrentRequests is the hard ceiling that WithConcurrentRequests
// enforces on the number of PutMetricData calls that may be in flight at
// the same time.
const (
	maxConcurrentRequests = 20
)
    25  
// CloudWatchAPI is the subset of Amazon CloudWatch API operations that the
// CloudWatch type in this package needs. Only PutMetricData is required.
// Because it is an interface, callers may pass any implementation, for
// example a test double.
type CloudWatchAPI interface {
	// PutMetricData publishes the metric data points described by params.
	PutMetricData(ctx context.Context, params *cloudwatch.PutMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.PutMetricDataOutput, error)
}
    30  
// CloudWatch receives metrics observations and forwards them to CloudWatch.
// Create a CloudWatch object, use it to create metrics, and pass those metrics as
// dependencies to the components that will use them.
//
// To regularly report metrics to CloudWatch, use the WriteLoop helper method.
type CloudWatch struct {
	// mtx is held for reading for the whole duration of Send.
	mtx                   sync.RWMutex
	// sem is a buffered channel used as a counting semaphore: each
	// PutMetricData call in Send occupies one slot while it runs. New
	// creates it with capacity numConcurrentRequests.
	sem                   chan struct{}
	// namespace is sent as the Namespace of every PutMetricData request.
	namespace             string
	// svc is the client used to issue PutMetricData requests.
	svc                   CloudWatchAPI
	// counters collects the observations of every metric created from this
	// object; Send resets it and walks the result to build the datums.
	counters              *lv.Space
	// logger receives errors returned by Send while WriteLoop is running.
	logger                log.Logger
	// numConcurrentRequests is the capacity given to sem. New defaults it
	// to 10; WithConcurrentRequests overrides it.
	numConcurrentRequests int
}
    45  
// Option is a function adapter that changes the configuration of a
// CloudWatch struct. New applies the options it is given in order, after
// setting the defaults.
type Option func(*CloudWatch)
    48  
    49  // WithLogger sets the Logger that will receive error messages generated
    50  // during the WriteLoop. By default, no logger is used.
    51  func WithLogger(logger log.Logger) Option {
    52  	return func(cw *CloudWatch) {
    53  		cw.logger = logger
    54  	}
    55  }
    56  
    57  // WithConcurrentRequests sets the upper limit on how many
    58  // cloudwatch.PutMetricDataRequest may be under way at any
    59  // given time. If n is greater than 20, 20 is used. By default,
    60  // the max is set at 10 concurrent requests.
    61  func WithConcurrentRequests(n int) Option {
    62  	return func(cw *CloudWatch) {
    63  		if n > maxConcurrentRequests {
    64  			n = maxConcurrentRequests
    65  		}
    66  		cw.numConcurrentRequests = n
    67  	}
    68  }
    69  
    70  // New returns a CloudWatch object that may be used to create metrics.
    71  // Namespace is applied to all created metrics and maps to the CloudWatch namespace.
    72  // Callers must ensure that regular calls to Send are performed, either
    73  // manually or with one of the helper methods.
    74  func New(namespace string, svc CloudWatchAPI, options ...Option) *CloudWatch {
    75  	cw := &CloudWatch{
    76  		namespace:             namespace,
    77  		svc:                   svc,
    78  		counters:              lv.NewSpace(),
    79  		numConcurrentRequests: 10,
    80  		logger:                log.NewNopLogger(),
    81  	}
    82  
    83  	for _, optFunc := range options {
    84  		optFunc(cw)
    85  	}
    86  
    87  	cw.sem = make(chan struct{}, cw.numConcurrentRequests)
    88  
    89  	return cw
    90  }
    91  
    92  // NewCounter returns a counter. Observations are aggregated and emitted once
    93  // per write invocation.
    94  func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
    95  	return &Counter{
    96  		name: name,
    97  		obs:  cw.counters.Observe,
    98  	}
    99  }
   100  
   101  // NewGauge returns an gauge. Under the covers, there is no distinctions
   102  // in CloudWatch for how Counters/Histograms/Gauges are reported, so this
   103  // just wraps a cloudwatch2.Counter.
   104  func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
   105  	return convert.NewCounterAsGauge(cw.NewCounter(name))
   106  }
   107  
   108  // NewHistogram returns a histogram. Under the covers, there is no distinctions
   109  // in CloudWatch for how Counters/Histograms/Gauges are reported, so this
   110  // just wraps a cloudwatch2.Counter.
   111  func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
   112  	return convert.NewCounterAsHistogram(cw.NewCounter(name))
   113  }
   114  
   115  // WriteLoop is a helper method that invokes Send every time the passed
   116  // channel fires. This method blocks until ctx is canceled, so clients
   117  // probably want to run it in its own goroutine. For typical usage, create a
   118  // time.Ticker and pass its C channel to this method.
   119  func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
   120  	for {
   121  		select {
   122  		case <-c:
   123  			if err := cw.Send(); err != nil {
   124  				cw.logger.Log("during", "Send", "err", err)
   125  			}
   126  		case <-ctx.Done():
   127  			return
   128  		}
   129  	}
   130  }
   131  
   132  // Send will fire an API request to CloudWatch with the latest stats for
   133  // all metrics. It is preferred that the WriteLoop method is used.
   134  func (cw *CloudWatch) Send() error {
   135  	cw.mtx.RLock()
   136  	defer cw.mtx.RUnlock()
   137  	now := time.Now()
   138  
   139  	var datums []types.MetricDatum
   140  
   141  	cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
   142  		datums = append(datums, types.MetricDatum{
   143  			MetricName:      aws.String(name),
   144  			Dimensions:      makeDimensions(lvs...),
   145  			StatisticValues: stats(values),
   146  			Timestamp:       aws.Time(now),
   147  		})
   148  		return true
   149  	})
   150  
   151  	var batches [][]types.MetricDatum
   152  	for len(datums) > 0 {
   153  		var batch []types.MetricDatum
   154  		lim := len(datums)
   155  		if lim > maxConcurrentRequests {
   156  			lim = maxConcurrentRequests
   157  		}
   158  		batch, datums = datums[:lim], datums[lim:]
   159  		batches = append(batches, batch)
   160  	}
   161  
   162  	var g errgroup.Group
   163  	for _, batch := range batches {
   164  		batch := batch
   165  		g.Go(func() error {
   166  			cw.sem <- struct{}{}
   167  			defer func() {
   168  				<-cw.sem
   169  			}()
   170  			_, err := cw.svc.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{
   171  				Namespace:  aws.String(cw.namespace),
   172  				MetricData: batch,
   173  			})
   174  			return err
   175  		})
   176  	}
   177  	return g.Wait()
   178  }
   179  
   180  var zero = float64(0.0)
   181  
   182  // Just build this once to reduce construction costs whenever
   183  // someone does a Send with no aggregated values.
   184  var zeros = types.StatisticSet{
   185  	Maximum:     &zero,
   186  	Minimum:     &zero,
   187  	Sum:         &zero,
   188  	SampleCount: &zero,
   189  }
   190  
   191  func stats(a []float64) *types.StatisticSet {
   192  	count := float64(len(a))
   193  	if count == 0 {
   194  		return &zeros
   195  	}
   196  
   197  	var sum float64
   198  	var min = math.MaxFloat64
   199  	var max = math.MaxFloat64 * -1
   200  	for _, f := range a {
   201  		sum += f
   202  		if f < min {
   203  			min = f
   204  		}
   205  		if f > max {
   206  			max = f
   207  		}
   208  	}
   209  
   210  	return &types.StatisticSet{
   211  		Maximum:     &max,
   212  		Minimum:     &min,
   213  		Sum:         &sum,
   214  		SampleCount: &count,
   215  	}
   216  }
   217  
   218  func makeDimensions(labelValues ...string) []types.Dimension {
   219  	dimensions := make([]types.Dimension, len(labelValues)/2)
   220  	for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
   221  		dimensions[j] = types.Dimension{
   222  			Name:  aws.String(labelValues[i]),
   223  			Value: aws.String(labelValues[i+1]),
   224  		}
   225  	}
   226  	return dimensions
   227  }
   228  
// observeFunc records a single observed value for the timeseries identified
// by name and lvs. Counter calls it from Add.
type observeFunc func(name string, lvs lv.LabelValues, value float64)
   230  
// Counter is a counter. Observations are forwarded to a node
// object, and aggregated per timeseries.
type Counter struct {
	// name is the metric name passed to obs with every observation.
	name string
	// lvs holds the label values accumulated through With.
	lvs  lv.LabelValues
	// obs receives each observation made through Add.
	obs  observeFunc
}
   238  
   239  // With implements metrics.Counter.
   240  func (c *Counter) With(labelValues ...string) metrics.Counter {
   241  	return &Counter{
   242  		name: c.name,
   243  		lvs:  c.lvs.With(labelValues...),
   244  		obs:  c.obs,
   245  	}
   246  }
   247  
   248  // Add implements metrics.Counter.
   249  func (c *Counter) Add(delta float64) {
   250  	c.obs(c.name, c.lvs, delta)
   251  }
   252  

View as plain text