1 package cloudwatch
2
3 import (
4 "context"
5 "fmt"
6 "os"
7 "strconv"
8 "sync"
9 "time"
10
11 "github.com/aws/aws-sdk-go/aws"
12 "github.com/aws/aws-sdk-go/service/cloudwatch"
13 "github.com/aws/aws-sdk-go/service/cloudwatch/cloudwatchiface"
14
15 "github.com/go-kit/kit/metrics"
16 "github.com/go-kit/kit/metrics/generic"
17 "github.com/go-kit/kit/metrics/internal/lv"
18 "github.com/go-kit/log"
19 )
20
// maxConcurrentRequests is the ceiling WithConcurrentRequests applies to the
// number of simultaneous PutMetricData calls. Send also uses it as the number
// of datums placed in a single request.
const maxConcurrentRequests = 20

// maxValuesInABatch is the largest number of unique values (with their
// counts) that Send puts into one gauge datum.
const maxValuesInABatch = 150
25
26
27
28
29
30
31 type CloudWatch struct {
32 mtx sync.RWMutex
33 sem chan struct{}
34 namespace string
35 svc cloudwatchiface.CloudWatchAPI
36 counters *lv.Space
37 gauges *lv.Space
38 histograms *lv.Space
39 percentiles []float64
40 logger log.Logger
41 numConcurrentRequests int
42 }
43
44
45 type Option func(*CloudWatch)
46
47
48
49 func WithLogger(logger log.Logger) Option {
50 return func(c *CloudWatch) {
51 c.logger = logger
52 }
53 }
54
55
56
57
58
59 func WithPercentiles(percentiles ...float64) Option {
60 return func(c *CloudWatch) {
61 c.percentiles = make([]float64, 0, len(percentiles))
62 for _, p := range percentiles {
63 if p < 0 || p > 1 {
64 continue
65 }
66 c.percentiles = append(c.percentiles, p)
67 }
68 }
69 }
70
71
72
73
74
75 func WithConcurrentRequests(n int) Option {
76 return func(c *CloudWatch) {
77 if n > maxConcurrentRequests {
78 n = maxConcurrentRequests
79 }
80 c.numConcurrentRequests = n
81 }
82 }
83
84
85
86
87
88 func New(namespace string, svc cloudwatchiface.CloudWatchAPI, options ...Option) *CloudWatch {
89 cw := &CloudWatch{
90 sem: nil,
91 namespace: namespace,
92 svc: svc,
93 counters: lv.NewSpace(),
94 gauges: lv.NewSpace(),
95 histograms: lv.NewSpace(),
96 numConcurrentRequests: 10,
97 logger: log.NewLogfmtLogger(os.Stderr),
98 percentiles: []float64{0.50, 0.90, 0.95, 0.99},
99 }
100
101 for _, opt := range options {
102 opt(cw)
103 }
104
105 cw.sem = make(chan struct{}, cw.numConcurrentRequests)
106
107 return cw
108 }
109
110
111
112 func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
113 return &Counter{
114 name: name,
115 obs: cw.counters.Observe,
116 }
117 }
118
119
120 func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
121 return &Gauge{
122 name: name,
123 obs: cw.gauges.Observe,
124 add: cw.gauges.Add,
125 }
126 }
127
128
129 func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
130 return &Histogram{
131 name: name,
132 obs: cw.histograms.Observe,
133 }
134 }
135
136
137
138
139
140 func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
141 for {
142 select {
143 case <-c:
144 if err := cw.Send(); err != nil {
145 cw.logger.Log("during", "Send", "err", err)
146 }
147 case <-ctx.Done():
148 return
149 }
150 }
151 }
152
153
154
155 func (cw *CloudWatch) Send() error {
156 cw.mtx.RLock()
157 defer cw.mtx.RUnlock()
158 now := time.Now()
159
160 var datums []*cloudwatch.MetricDatum
161
162 cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
163 value := sum(values)
164 datums = append(datums, &cloudwatch.MetricDatum{
165 MetricName: aws.String(name),
166 Dimensions: makeDimensions(lvs...),
167 Value: aws.Float64(value),
168 Timestamp: aws.Time(now),
169 })
170 return true
171 })
172
173 cw.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
174 if len(values) == 0 {
175 return true
176 }
177
178 datum := &cloudwatch.MetricDatum{
179 MetricName: aws.String(name),
180 Dimensions: makeDimensions(lvs...),
181 Timestamp: aws.Time(now),
182 }
183
184
185
186 valuesCounter := make(map[float64]int)
187 for _, v := range values {
188 valuesCounter[v]++
189 }
190
191 for value, count := range valuesCounter {
192 if len(datum.Values) == maxValuesInABatch {
193 break
194 }
195 datum.Values = append(datum.Values, aws.Float64(value))
196 datum.Counts = append(datum.Counts, aws.Float64(float64(count)))
197 }
198
199 datums = append(datums, datum)
200 return true
201 })
202
203
204
205
206
207 formatPerc := func(p float64) string {
208 return strconv.FormatFloat(p*100, 'f', -1, 64)
209 }
210
211 cw.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
212 histogram := generic.NewHistogram(name, 50)
213
214 for _, v := range values {
215 histogram.Observe(v)
216 }
217
218 for _, perc := range cw.percentiles {
219 value := histogram.Quantile(perc)
220 datums = append(datums, &cloudwatch.MetricDatum{
221 MetricName: aws.String(fmt.Sprintf("%s_%s", name, formatPerc(perc))),
222 Dimensions: makeDimensions(lvs...),
223 Value: aws.Float64(value),
224 Timestamp: aws.Time(now),
225 })
226 }
227 return true
228 })
229
230 var batches [][]*cloudwatch.MetricDatum
231 for len(datums) > 0 {
232 var batch []*cloudwatch.MetricDatum
233 lim := min(len(datums), maxConcurrentRequests)
234 batch, datums = datums[:lim], datums[lim:]
235 batches = append(batches, batch)
236 }
237
238 var errors = make(chan error, len(batches))
239 for _, batch := range batches {
240 go func(batch []*cloudwatch.MetricDatum) {
241 cw.sem <- struct{}{}
242 defer func() {
243 <-cw.sem
244 }()
245 _, err := cw.svc.PutMetricData(&cloudwatch.PutMetricDataInput{
246 Namespace: aws.String(cw.namespace),
247 MetricData: batch,
248 })
249 errors <- err
250 }(batch)
251 }
252 var firstErr error
253 for i := 0; i < cap(errors); i++ {
254 if err := <-errors; err != nil && firstErr == nil {
255 firstErr = err
256 }
257 }
258
259 return firstErr
260 }
261
// sum returns the total of the values in a, added left to right. It returns 0
// for an empty or nil slice.
func sum(a []float64) float64 {
	total := 0.0
	for i := 0; i < len(a); i++ {
		total += a[i]
	}
	return total
}
269
// min returns the smaller of a and b.
func min(a, b int) int {
	if b <= a {
		return b
	}
	return a
}
276
277 type observeFunc func(name string, lvs lv.LabelValues, value float64)
278
279
280
281 type Counter struct {
282 name string
283 lvs lv.LabelValues
284 obs observeFunc
285 }
286
287
288 func (c *Counter) With(labelValues ...string) metrics.Counter {
289 return &Counter{
290 name: c.name,
291 lvs: c.lvs.With(labelValues...),
292 obs: c.obs,
293 }
294 }
295
296
297 func (c *Counter) Add(delta float64) {
298 c.obs(c.name, c.lvs, delta)
299 }
300
301
302
303 type Gauge struct {
304 name string
305 lvs lv.LabelValues
306 obs observeFunc
307 add observeFunc
308 }
309
310
311 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
312 return &Gauge{
313 name: g.name,
314 lvs: g.lvs.With(labelValues...),
315 obs: g.obs,
316 add: g.add,
317 }
318 }
319
320
321 func (g *Gauge) Set(value float64) {
322 g.obs(g.name, g.lvs, value)
323 }
324
325
326 func (g *Gauge) Add(delta float64) {
327 g.add(g.name, g.lvs, delta)
328 }
329
330
331
332 type Histogram struct {
333 name string
334 lvs lv.LabelValues
335 obs observeFunc
336 }
337
338
339 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
340 return &Histogram{
341 name: h.name,
342 lvs: h.lvs.With(labelValues...),
343 obs: h.obs,
344 }
345 }
346
347
348 func (h *Histogram) Observe(value float64) {
349 h.obs(h.name, h.lvs, value)
350 }
351
352 func makeDimensions(labelValues ...string) []*cloudwatch.Dimension {
353 dimensions := make([]*cloudwatch.Dimension, len(labelValues)/2)
354 for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
355 dimensions[j] = &cloudwatch.Dimension{
356 Name: aws.String(labelValues[i]),
357 Value: aws.String(labelValues[i+1]),
358 }
359 }
360 return dimensions
361 }
362
View as plain text