1
2
3 package cloudwatch2
4
5 import (
6 "context"
7 "math"
8 "sync"
9 "time"
10
11 "github.com/aws/aws-sdk-go-v2/aws"
12 "github.com/aws/aws-sdk-go-v2/service/cloudwatch"
13 "github.com/aws/aws-sdk-go-v2/service/cloudwatch/types"
14 "golang.org/x/sync/errgroup"
15
16 "github.com/go-kit/kit/metrics"
17 "github.com/go-kit/kit/metrics/internal/convert"
18 "github.com/go-kit/kit/metrics/internal/lv"
19 "github.com/go-kit/log"
20 )
21
22 const (
23 maxConcurrentRequests = 20
24 )
25
26
27 type CloudWatchAPI interface {
28 PutMetricData(ctx context.Context, params *cloudwatch.PutMetricDataInput, optFns ...func(*cloudwatch.Options)) (*cloudwatch.PutMetricDataOutput, error)
29 }
30
31
32
33
34
35
36 type CloudWatch struct {
37 mtx sync.RWMutex
38 sem chan struct{}
39 namespace string
40 svc CloudWatchAPI
41 counters *lv.Space
42 logger log.Logger
43 numConcurrentRequests int
44 }
45
46
47 type Option func(*CloudWatch)
48
49
50
51 func WithLogger(logger log.Logger) Option {
52 return func(cw *CloudWatch) {
53 cw.logger = logger
54 }
55 }
56
57
58
59
60
61 func WithConcurrentRequests(n int) Option {
62 return func(cw *CloudWatch) {
63 if n > maxConcurrentRequests {
64 n = maxConcurrentRequests
65 }
66 cw.numConcurrentRequests = n
67 }
68 }
69
70
71
72
73
74 func New(namespace string, svc CloudWatchAPI, options ...Option) *CloudWatch {
75 cw := &CloudWatch{
76 namespace: namespace,
77 svc: svc,
78 counters: lv.NewSpace(),
79 numConcurrentRequests: 10,
80 logger: log.NewNopLogger(),
81 }
82
83 for _, optFunc := range options {
84 optFunc(cw)
85 }
86
87 cw.sem = make(chan struct{}, cw.numConcurrentRequests)
88
89 return cw
90 }
91
92
93
94 func (cw *CloudWatch) NewCounter(name string) metrics.Counter {
95 return &Counter{
96 name: name,
97 obs: cw.counters.Observe,
98 }
99 }
100
101
102
103
104 func (cw *CloudWatch) NewGauge(name string) metrics.Gauge {
105 return convert.NewCounterAsGauge(cw.NewCounter(name))
106 }
107
108
109
110
111 func (cw *CloudWatch) NewHistogram(name string) metrics.Histogram {
112 return convert.NewCounterAsHistogram(cw.NewCounter(name))
113 }
114
115
116
117
118
119 func (cw *CloudWatch) WriteLoop(ctx context.Context, c <-chan time.Time) {
120 for {
121 select {
122 case <-c:
123 if err := cw.Send(); err != nil {
124 cw.logger.Log("during", "Send", "err", err)
125 }
126 case <-ctx.Done():
127 return
128 }
129 }
130 }
131
132
133
134 func (cw *CloudWatch) Send() error {
135 cw.mtx.RLock()
136 defer cw.mtx.RUnlock()
137 now := time.Now()
138
139 var datums []types.MetricDatum
140
141 cw.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
142 datums = append(datums, types.MetricDatum{
143 MetricName: aws.String(name),
144 Dimensions: makeDimensions(lvs...),
145 StatisticValues: stats(values),
146 Timestamp: aws.Time(now),
147 })
148 return true
149 })
150
151 var batches [][]types.MetricDatum
152 for len(datums) > 0 {
153 var batch []types.MetricDatum
154 lim := len(datums)
155 if lim > maxConcurrentRequests {
156 lim = maxConcurrentRequests
157 }
158 batch, datums = datums[:lim], datums[lim:]
159 batches = append(batches, batch)
160 }
161
162 var g errgroup.Group
163 for _, batch := range batches {
164 batch := batch
165 g.Go(func() error {
166 cw.sem <- struct{}{}
167 defer func() {
168 <-cw.sem
169 }()
170 _, err := cw.svc.PutMetricData(context.TODO(), &cloudwatch.PutMetricDataInput{
171 Namespace: aws.String(cw.namespace),
172 MetricData: batch,
173 })
174 return err
175 })
176 }
177 return g.Wait()
178 }
179
180 var zero = float64(0.0)
181
182
183
184 var zeros = types.StatisticSet{
185 Maximum: &zero,
186 Minimum: &zero,
187 Sum: &zero,
188 SampleCount: &zero,
189 }
190
191 func stats(a []float64) *types.StatisticSet {
192 count := float64(len(a))
193 if count == 0 {
194 return &zeros
195 }
196
197 var sum float64
198 var min = math.MaxFloat64
199 var max = math.MaxFloat64 * -1
200 for _, f := range a {
201 sum += f
202 if f < min {
203 min = f
204 }
205 if f > max {
206 max = f
207 }
208 }
209
210 return &types.StatisticSet{
211 Maximum: &max,
212 Minimum: &min,
213 Sum: &sum,
214 SampleCount: &count,
215 }
216 }
217
218 func makeDimensions(labelValues ...string) []types.Dimension {
219 dimensions := make([]types.Dimension, len(labelValues)/2)
220 for i, j := 0, 0; i < len(labelValues); i, j = i+2, j+1 {
221 dimensions[j] = types.Dimension{
222 Name: aws.String(labelValues[i]),
223 Value: aws.String(labelValues[i+1]),
224 }
225 }
226 return dimensions
227 }
228
229 type observeFunc func(name string, lvs lv.LabelValues, value float64)
230
231
232
233 type Counter struct {
234 name string
235 lvs lv.LabelValues
236 obs observeFunc
237 }
238
239
240 func (c *Counter) With(labelValues ...string) metrics.Counter {
241 return &Counter{
242 name: c.name,
243 lvs: c.lvs.With(labelValues...),
244 obs: c.obs,
245 }
246 }
247
248
249 func (c *Counter) Add(delta float64) {
250 c.obs(c.name, c.lvs, delta)
251 }
252
View as plain text