1
2
3
4
5
6
7
8
9 package statsd
10
11 import (
12 "context"
13 "fmt"
14 "io"
15 "time"
16
17 "github.com/go-kit/kit/metrics"
18 "github.com/go-kit/kit/metrics/internal/lv"
19 "github.com/go-kit/kit/metrics/internal/ratemap"
20 "github.com/go-kit/kit/util/conn"
21 "github.com/go-kit/log"
22 )
23
24
25
26
27
28
29
30
31
32
33
34 type Statsd struct {
35 prefix string
36 rates *ratemap.RateMap
37
38
39
40
41
42
43 counters *lv.Space
44 gauges *lv.Space
45 timings *lv.Space
46
47 logger log.Logger
48 }
49
50
51
52
53 func New(prefix string, logger log.Logger) *Statsd {
54 return &Statsd{
55 prefix: prefix,
56 rates: ratemap.New(),
57 counters: lv.NewSpace(),
58 gauges: lv.NewSpace(),
59 timings: lv.NewSpace(),
60 logger: logger,
61 }
62 }
63
64
65 func (s *Statsd) NewCounter(name string, sampleRate float64) *Counter {
66 s.rates.Set(s.prefix+name, sampleRate)
67 return &Counter{
68 name: s.prefix + name,
69 obs: s.counters.Observe,
70 }
71 }
72
73
74 func (s *Statsd) NewGauge(name string) *Gauge {
75 return &Gauge{
76 name: s.prefix + name,
77 obs: s.gauges.Observe,
78 add: s.gauges.Add,
79 }
80 }
81
82
83
84 func (s *Statsd) NewTiming(name string, sampleRate float64) *Timing {
85 s.rates.Set(s.prefix+name, sampleRate)
86 return &Timing{
87 name: s.prefix + name,
88 obs: s.timings.Observe,
89 }
90 }
91
92
93
94
95
96 func (s *Statsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
97 for {
98 select {
99 case <-c:
100 if _, err := s.WriteTo(w); err != nil {
101 s.logger.Log("during", "WriteTo", "err", err)
102 }
103 case <-ctx.Done():
104 return
105 }
106 }
107 }
108
109
110
111
112
113
114 func (s *Statsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
115 s.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, s.logger))
116 }
117
118
119
120
121
122 func (s *Statsd) WriteTo(w io.Writer) (count int64, err error) {
123 var n int
124
125 s.counters.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
126 n, err = fmt.Fprintf(w, "%s:%f|c%s\n", name, sum(values), sampling(s.rates.Get(name)))
127 if err != nil {
128 return false
129 }
130 count += int64(n)
131 return true
132 })
133 if err != nil {
134 return count, err
135 }
136
137 s.gauges.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
138 n, err = fmt.Fprintf(w, "%s:%f|g\n", name, last(values))
139 if err != nil {
140 return false
141 }
142 count += int64(n)
143 return true
144 })
145 if err != nil {
146 return count, err
147 }
148
149 s.timings.Reset().Walk(func(name string, _ lv.LabelValues, values []float64) bool {
150 sampleRate := s.rates.Get(name)
151 for _, value := range values {
152 n, err = fmt.Fprintf(w, "%s:%f|ms%s\n", name, value, sampling(sampleRate))
153 if err != nil {
154 return false
155 }
156 count += int64(n)
157 }
158 return true
159 })
160 if err != nil {
161 return count, err
162 }
163
164 return count, err
165 }
166
// sum returns the total of all values in a, added in slice order. An empty or
// nil slice yields 0.
func sum(a []float64) float64 {
	total := 0.0
	for i := range a {
		total += a[i]
	}
	return total
}
174
// last returns the final element of a, which is the value a gauge reports
// when it is flushed. An empty or nil slice yields 0 instead of panicking on
// an out-of-range index.
func last(a []float64) float64 {
	if len(a) == 0 {
		return 0
	}
	return a[len(a)-1]
}
178
// sampling renders the sample-rate suffix of a StatsD line: "|@<rate>" with
// six decimal places when r is below 1, and the empty string otherwise.
func sampling(r float64) string {
	if r < 1.0 {
		return fmt.Sprintf("|@%f", r)
	}
	return ""
}
186
// observeFunc is the callback through which Counter, Gauge, and Timing record
// a value. It receives the metric name, the label values, and the value
// itself.
type observeFunc func(name string, lvs lv.LabelValues, value float64)
188
189
190
191 type Counter struct {
192 name string
193 obs observeFunc
194 }
195
196
197 func (c *Counter) With(...string) metrics.Counter {
198 return c
199 }
200
201
202 func (c *Counter) Add(delta float64) {
203 c.obs(c.name, lv.LabelValues{}, delta)
204 }
205
206
207
208 type Gauge struct {
209 name string
210 obs observeFunc
211 add observeFunc
212 }
213
214
215 func (g *Gauge) With(...string) metrics.Gauge {
216 return g
217 }
218
219
220 func (g *Gauge) Set(value float64) {
221 g.obs(g.name, lv.LabelValues{}, value)
222 }
223
224
225 func (g *Gauge) Add(delta float64) {
226 g.add(g.name, lv.LabelValues{}, delta)
227 }
228
229
230
231
232 type Timing struct {
233 name string
234 obs observeFunc
235 }
236
237
238 func (t *Timing) With(...string) metrics.Histogram {
239 return t
240 }
241
242
243 func (t *Timing) Observe(value float64) {
244 t.obs(t.name, lv.LabelValues{}, value)
245 }
246
View as plain text