1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31 package datadog
32
33 import (
34 "context"
35 "fmt"
36 "sort"
37 "strings"
38 "time"
39
40 "github.com/DataDog/datadog-go/statsd"
41 "github.com/palantir/go-baseapp/baseapp"
42 "github.com/pkg/errors"
43 "github.com/rcrowley/go-metrics"
44 )
45
46 const (
47 DefaultAddress = "127.0.0.1:8125"
48 DefaultInterval = 10 * time.Second
49 )
50
51 type Config struct {
52 Address string `yaml:"address" json:"address"`
53 Interval time.Duration `yaml:"interval" json:"interval"`
54 Tags []string `yaml:"tags" json:"tags"`
55 }
56
57
58
59 func StartEmitter(s *baseapp.Server, c Config) error {
60 if c.Address == "" {
61 c.Address = DefaultAddress
62 }
63 if c.Interval == 0 {
64 c.Interval = DefaultInterval
65 }
66
67 client, err := statsd.New(c.Address)
68 if err != nil {
69 return errors.Wrap(err, "datadog: failed to create client")
70 }
71 client.Tags = append(client.Tags, c.Tags...)
72
73 emitter := NewEmitter(client, s.Registry())
74 go emitter.Emit(context.Background(), c.Interval)
75
76 return nil
77 }
78
79 type Emitter struct {
80 client *statsd.Client
81 registry metrics.Registry
82
83 counters map[string]int64
84 }
85
86 func NewEmitter(client *statsd.Client, registry metrics.Registry) *Emitter {
87 return &Emitter{
88 registry: registry,
89 client: client,
90 counters: make(map[string]int64),
91 }
92 }
93
94 func (e *Emitter) Emit(ctx context.Context, interval time.Duration) {
95 t := time.NewTicker(interval)
96 defer t.Stop()
97
98 for {
99 select {
100 case <-t.C:
101 e.EmitOnce()
102 case <-ctx.Done():
103 return
104 }
105 }
106 }
107
108 func (e *Emitter) EmitOnce() {
109 e.registry.Each(func(name string, metric interface{}) {
110 name, tags := tagsFromName(name)
111
112 switch m := metric.(type) {
113 case metrics.Counter:
114 key := fmt.Sprintf("%s[%s]", name, strings.Join(tags, ","))
115
116
117
118
119 value := m.Count()
120 value, e.counters[key] = value-e.counters[key], value
121 _ = e.client.Count(name, value, tags, 1)
122
123 case metrics.Gauge:
124 _ = e.client.Gauge(name, float64(m.Value()), tags, 1)
125
126 case metrics.GaugeFloat64:
127 _ = e.client.Gauge(name, m.Value(), tags, 1)
128
129 case metrics.Histogram:
130 ms := m.Snapshot()
131 _ = e.client.Gauge(name+".avg", ms.Mean(), tags, 1)
132 _ = e.client.Gauge(name+".count", float64(ms.Count()), tags, 1)
133 _ = e.client.Gauge(name+".max", float64(ms.Max()), tags, 1)
134 _ = e.client.Gauge(name+".median", ms.Percentile(0.5), tags, 1)
135 _ = e.client.Gauge(name+".min", float64(ms.Min()), tags, 1)
136 _ = e.client.Gauge(name+".sum", float64(ms.Sum()), tags, 1)
137 _ = e.client.Gauge(name+".95percentile", ms.Percentile(0.95), tags, 1)
138
139 case metrics.Meter:
140 ms := m.Snapshot()
141 _ = e.client.Gauge(name+".avg", ms.RateMean(), tags, 1)
142 _ = e.client.Gauge(name+".count", float64(ms.Count()), tags, 1)
143 _ = e.client.Gauge(name+".rate1", ms.Rate1(), tags, 1)
144 _ = e.client.Gauge(name+".rate5", ms.Rate5(), tags, 1)
145 _ = e.client.Gauge(name+".rate15", ms.Rate15(), tags, 1)
146
147 case metrics.Timer:
148 ms := m.Snapshot()
149 _ = e.client.Gauge(name+".avg", ms.Mean(), tags, 1)
150 _ = e.client.Gauge(name+".count", float64(ms.Count()), tags, 1)
151 _ = e.client.Gauge(name+".max", float64(ms.Max()), tags, 1)
152 _ = e.client.Gauge(name+".median", ms.Percentile(0.5), tags, 1)
153 _ = e.client.Gauge(name+".min", float64(ms.Min()), tags, 1)
154 _ = e.client.Gauge(name+".sum", float64(ms.Sum()), tags, 1)
155 _ = e.client.Gauge(name+".95percentile", ms.Percentile(0.95), tags, 1)
156 }
157 })
158 }
159
160
161
162 func tagsFromName(name string) (string, []string) {
163 start := strings.IndexRune(name, '[')
164 if start < 0 || name[len(name)-1] != ']' {
165 return name, nil
166 }
167
168 tags := strings.Split(name[start+1:len(name)-1], ",")
169 sort.Strings(tags)
170
171 return name[:start], tags
172 }
173
View as plain text