1
2
3
4
5
6
7
8
9
10
11 package dogstatsd
12
13 import (
14 "context"
15 "fmt"
16 "io"
17 "math/rand"
18 "strings"
19 "sync"
20 "sync/atomic"
21 "time"
22
23 "github.com/go-kit/kit/metrics"
24 "github.com/go-kit/kit/metrics/generic"
25 "github.com/go-kit/kit/metrics/internal/lv"
26 "github.com/go-kit/kit/metrics/internal/ratemap"
27 "github.com/go-kit/kit/util/conn"
28 "github.com/go-kit/log"
29 )
30
31
32
33
34
35
36
37
38
39
40
41 type Dogstatsd struct {
42 mtx sync.RWMutex
43 prefix string
44 rates *ratemap.RateMap
45 counters *lv.Space
46 gauges map[string]*gaugeNode
47 timings *lv.Space
48 histograms *lv.Space
49 logger log.Logger
50 lvs lv.LabelValues
51 }
52
53
54
55
56 func New(prefix string, logger log.Logger, lvs ...string) *Dogstatsd {
57 if len(lvs)%2 != 0 {
58 panic("odd number of LabelValues; programmer error!")
59 }
60 return &Dogstatsd{
61 prefix: prefix,
62 rates: ratemap.New(),
63 counters: lv.NewSpace(),
64 gauges: map[string]*gaugeNode{},
65 timings: lv.NewSpace(),
66 histograms: lv.NewSpace(),
67 logger: logger,
68 lvs: lvs,
69 }
70 }
71
72
73 func (d *Dogstatsd) NewCounter(name string, sampleRate float64) *Counter {
74 d.rates.Set(name, sampleRate)
75 return &Counter{
76 name: name,
77 obs: sampleObservations(d.counters.Observe, sampleRate),
78 }
79 }
80
81
82 func (d *Dogstatsd) NewGauge(name string) *Gauge {
83 d.mtx.Lock()
84 n, ok := d.gauges[name]
85 if !ok {
86 n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), ddog: d}}
87 d.gauges[name] = n
88 }
89 d.mtx.Unlock()
90 return n.gauge
91 }
92
93
94
95 func (d *Dogstatsd) NewTiming(name string, sampleRate float64) *Timing {
96 d.rates.Set(name, sampleRate)
97 return &Timing{
98 name: name,
99 obs: sampleObservations(d.timings.Observe, sampleRate),
100 }
101 }
102
103
104
105 func (d *Dogstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
106 d.rates.Set(name, sampleRate)
107 return &Histogram{
108 name: name,
109 obs: sampleObservations(d.histograms.Observe, sampleRate),
110 }
111 }
112
113
114
115
116
117 func (d *Dogstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
118 for {
119 select {
120 case <-c:
121 if _, err := d.WriteTo(w); err != nil {
122 d.logger.Log("during", "WriteTo", "err", err)
123 }
124 case <-ctx.Done():
125 return
126 }
127 }
128 }
129
130
131
132
133
134
135 func (d *Dogstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
136 d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
137 }
138
139
140
141
142
143 func (d *Dogstatsd) WriteTo(w io.Writer) (count int64, err error) {
144 var n int
145
146 d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
147 n, err = fmt.Fprintf(w, "%s%s:%f|c%s%s\n", d.prefix, name, sum(values), sampling(d.rates.Get(name)), d.tagValues(lvs))
148 if err != nil {
149 return false
150 }
151 count += int64(n)
152 return true
153 })
154 if err != nil {
155 return count, err
156 }
157
158 d.mtx.RLock()
159 for _, root := range d.gauges {
160 root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
161 n, err = fmt.Fprintf(w, "%s%s:%f|g%s\n", d.prefix, name, value, d.tagValues(lvs))
162 if err != nil {
163 return false
164 }
165 count += int64(n)
166 return true
167 })
168 }
169 d.mtx.RUnlock()
170
171 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
172 sampleRate := d.rates.Get(name)
173 for _, value := range values {
174 n, err = fmt.Fprintf(w, "%s%s:%f|ms%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
175 if err != nil {
176 return false
177 }
178 count += int64(n)
179 }
180 return true
181 })
182 if err != nil {
183 return count, err
184 }
185
186 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
187 sampleRate := d.rates.Get(name)
188 for _, value := range values {
189 n, err = fmt.Fprintf(w, "%s%s:%f|h%s%s\n", d.prefix, name, value, sampling(sampleRate), d.tagValues(lvs))
190 if err != nil {
191 return false
192 }
193 count += int64(n)
194 }
195 return true
196 })
197 if err != nil {
198 return count, err
199 }
200
201 return count, err
202 }
203
// sum adds the elements of a from first to last and returns the total.
// An empty or nil slice yields 0.
func sum(a []float64) float64 {
	total := 0.0
	for i := range a {
		total += a[i]
	}
	return total
}
211
// sampling renders the sample-rate segment of a metric line. Rates below 1
// produce "|@<rate>" with the rate formatted by %f; any other rate produces
// the empty string.
func sampling(r float64) string {
	if r < 1.0 {
		return fmt.Sprintf("|@%f", r)
	}
	return ""
}
219
220 func (d *Dogstatsd) tagValues(labelValues []string) string {
221 if len(labelValues) == 0 && len(d.lvs) == 0 {
222 return ""
223 }
224 if len(labelValues)%2 != 0 {
225 panic("tagValues received a labelValues with an odd number of strings")
226 }
227 pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
228 for i := 0; i < len(d.lvs); i += 2 {
229 pairs = append(pairs, d.lvs[i]+":"+d.lvs[i+1])
230 }
231 for i := 0; i < len(labelValues); i += 2 {
232 pairs = append(pairs, labelValues[i]+":"+labelValues[i+1])
233 }
234 return "|#" + strings.Join(pairs, ",")
235 }
236
237 type observeFunc func(name string, lvs lv.LabelValues, value float64)
238
239
240 func sampleObservations(obs observeFunc, sampleRate float64) observeFunc {
241 if sampleRate >= 1 {
242 return obs
243 }
244 return func(name string, lvs lv.LabelValues, value float64) {
245 if rand.Float64() > sampleRate {
246 return
247 }
248 obs(name, lvs, value)
249 }
250 }
251
252
253
254 type Counter struct {
255 name string
256 lvs lv.LabelValues
257 obs observeFunc
258 }
259
260
261 func (c *Counter) With(labelValues ...string) metrics.Counter {
262 return &Counter{
263 name: c.name,
264 lvs: c.lvs.With(labelValues...),
265 obs: c.obs,
266 }
267 }
268
269
270 func (c *Counter) Add(delta float64) {
271 c.obs(c.name, c.lvs, delta)
272 }
273
274
275
276 type Gauge struct {
277 g *generic.Gauge
278 ddog *Dogstatsd
279 set int32
280 }
281
282
283 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
284 g.ddog.mtx.RLock()
285 node := g.ddog.gauges[g.g.Name]
286 g.ddog.mtx.RUnlock()
287
288 ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), ddog: g.ddog}
289 return node.addGauge(ga, ga.g.LabelValues())
290 }
291
292
293 func (g *Gauge) Set(value float64) {
294 g.g.Set(value)
295 g.touch()
296 }
297
298
299 func (g *Gauge) Add(delta float64) {
300 g.g.Add(delta)
301 g.touch()
302 }
303
304
305
306
307 type Timing struct {
308 name string
309 lvs lv.LabelValues
310 obs observeFunc
311 }
312
313
314 func (t *Timing) With(labelValues ...string) metrics.Histogram {
315 return &Timing{
316 name: t.name,
317 lvs: t.lvs.With(labelValues...),
318 obs: t.obs,
319 }
320 }
321
322
323 func (t *Timing) Observe(value float64) {
324 t.obs(t.name, t.lvs, value)
325 }
326
327
328
329 type Histogram struct {
330 name string
331 lvs lv.LabelValues
332 obs observeFunc
333 }
334
335
336 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
337 return &Histogram{
338 name: h.name,
339 lvs: h.lvs.With(labelValues...),
340 obs: h.obs,
341 }
342 }
343
344
345 func (h *Histogram) Observe(value float64) {
346 h.obs(h.name, h.lvs, value)
347 }
348
// pair is one label together with its value. It is the key type of a
// gaugeNode's children map.
type pair struct {
	label string
	value string
}
350
351 type gaugeNode struct {
352 mtx sync.RWMutex
353 gauge *Gauge
354 children map[pair]*gaugeNode
355 }
356
357 func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
358 n.mtx.Lock()
359 defer n.mtx.Unlock()
360 if len(lvs) == 0 {
361 if n.gauge == nil {
362 n.gauge = g
363 }
364 return n.gauge
365 }
366 if len(lvs) < 2 {
367 panic("too few LabelValues; programmer error!")
368 }
369 head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
370 if n.children == nil {
371 n.children = map[pair]*gaugeNode{}
372 }
373 child, ok := n.children[head]
374 if !ok {
375 child = &gaugeNode{}
376 n.children[head] = child
377 }
378 return child.addGauge(g, tail)
379 }
380
381 func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
382 n.mtx.RLock()
383 defer n.mtx.RUnlock()
384 if n.gauge != nil {
385 value, ok := n.gauge.read()
386 if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
387 return false
388 }
389 }
390 for _, child := range n.children {
391 if !child.walk(fn) {
392 return false
393 }
394 }
395 return true
396 }
397
398 func (g *Gauge) touch() {
399 atomic.StoreInt32(&(g.set), 1)
400 }
401
402 func (g *Gauge) read() (float64, bool) {
403 set := atomic.SwapInt32(&(g.set), 0)
404 return g.g.Value(), set != 0
405 }
406
View as plain text