1
2
3
4
5
6
7
8
9
10
11 package influxstatsd
12
13 import (
14 "context"
15 "fmt"
16 "io"
17 "strings"
18 "sync"
19 "sync/atomic"
20 "time"
21
22 "github.com/go-kit/kit/metrics"
23 "github.com/go-kit/kit/metrics/generic"
24 "github.com/go-kit/kit/metrics/internal/lv"
25 "github.com/go-kit/kit/metrics/internal/ratemap"
26 "github.com/go-kit/kit/util/conn"
27 "github.com/go-kit/log"
28 )
29
30
31
32
33
34
35
36
37
38
39
40 type Influxstatsd struct {
41 mtx sync.RWMutex
42 prefix string
43 rates *ratemap.RateMap
44 counters *lv.Space
45 gauges map[string]*gaugeNode
46 timings *lv.Space
47 histograms *lv.Space
48 logger log.Logger
49 lvs lv.LabelValues
50 }
51
52
53
54
55 func New(prefix string, logger log.Logger, lvs ...string) *Influxstatsd {
56 if len(lvs)%2 != 0 {
57 panic("odd number of LabelValues; programmer error!")
58 }
59 return &Influxstatsd{
60 prefix: prefix,
61 rates: ratemap.New(),
62 counters: lv.NewSpace(),
63 gauges: map[string]*gaugeNode{},
64 timings: lv.NewSpace(),
65 histograms: lv.NewSpace(),
66 logger: logger,
67 lvs: lvs,
68 }
69 }
70
71
72 func (d *Influxstatsd) NewCounter(name string, sampleRate float64) *Counter {
73 d.rates.Set(name, sampleRate)
74 return &Counter{
75 name: name,
76 obs: d.counters.Observe,
77 }
78 }
79
80
81 func (d *Influxstatsd) NewGauge(name string) *Gauge {
82 d.mtx.Lock()
83 n, ok := d.gauges[name]
84 if !ok {
85 n = &gaugeNode{gauge: &Gauge{g: generic.NewGauge(name), influx: d}}
86 d.gauges[name] = n
87 }
88 d.mtx.Unlock()
89 return n.gauge
90 }
91
92
93
94 func (d *Influxstatsd) NewTiming(name string, sampleRate float64) *Timing {
95 d.rates.Set(name, sampleRate)
96 return &Timing{
97 name: name,
98 obs: d.timings.Observe,
99 }
100 }
101
102
103
104 func (d *Influxstatsd) NewHistogram(name string, sampleRate float64) *Histogram {
105 d.rates.Set(name, sampleRate)
106 return &Histogram{
107 name: name,
108 obs: d.histograms.Observe,
109 }
110 }
111
112
113
114
115
116 func (d *Influxstatsd) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
117 for {
118 select {
119 case <-c:
120 if _, err := d.WriteTo(w); err != nil {
121 d.logger.Log("during", "WriteTo", "err", err)
122 }
123 case <-ctx.Done():
124 return
125 }
126 }
127 }
128
129
130
131
132
133
134 func (d *Influxstatsd) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
135 d.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, d.logger))
136 }
137
138
139
140
141
142 func (d *Influxstatsd) WriteTo(w io.Writer) (count int64, err error) {
143 var n int
144
145 d.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
146 n, err = fmt.Fprintf(w, "%s%s%s:%f|c%s\n", d.prefix, name, d.tagValues(lvs), sum(values), sampling(d.rates.Get(name)))
147 if err != nil {
148 return false
149 }
150 count += int64(n)
151 return true
152 })
153 if err != nil {
154 return count, err
155 }
156
157 d.mtx.RLock()
158 for _, root := range d.gauges {
159 root.walk(func(name string, lvs lv.LabelValues, value float64) bool {
160 n, err = fmt.Fprintf(w, "%s%s%s:%f|g\n", d.prefix, name, d.tagValues(lvs), value)
161 if err != nil {
162 return false
163 }
164 count += int64(n)
165 return true
166 })
167 }
168 d.mtx.RUnlock()
169
170 d.timings.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
171 sampleRate := d.rates.Get(name)
172 for _, value := range values {
173 n, err = fmt.Fprintf(w, "%s%s%s:%f|ms%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
174 if err != nil {
175 return false
176 }
177 count += int64(n)
178 }
179 return true
180 })
181 if err != nil {
182 return count, err
183 }
184
185 d.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
186 sampleRate := d.rates.Get(name)
187 for _, value := range values {
188 n, err = fmt.Fprintf(w, "%s%s%s:%f|h%s\n", d.prefix, name, d.tagValues(lvs), value, sampling(sampleRate))
189 if err != nil {
190 return false
191 }
192 count += int64(n)
193 }
194 return true
195 })
196 if err != nil {
197 return count, err
198 }
199
200 return count, err
201 }
202
203 func sum(a []float64) float64 {
204 var v float64
205 for _, f := range a {
206 v += f
207 }
208 return v
209 }
210
211 func sampling(r float64) string {
212 var sv string
213 if r < 1.0 {
214 sv = fmt.Sprintf("|@%f", r)
215 }
216 return sv
217 }
218
219 func (d *Influxstatsd) tagValues(labelValues []string) string {
220 if len(labelValues) == 0 && len(d.lvs) == 0 {
221 return ""
222 }
223 if len(labelValues)%2 != 0 {
224 panic("tagValues received a labelValues with an odd number of strings")
225 }
226 pairs := make([]string, 0, (len(d.lvs)+len(labelValues))/2)
227 for i := 0; i < len(d.lvs); i += 2 {
228 pairs = append(pairs, d.lvs[i]+"="+d.lvs[i+1])
229 }
230 for i := 0; i < len(labelValues); i += 2 {
231 pairs = append(pairs, labelValues[i]+"="+labelValues[i+1])
232 }
233 return "," + strings.Join(pairs, ",")
234 }
235
236 type observeFunc func(name string, lvs lv.LabelValues, value float64)
237
238
239
240 type Counter struct {
241 name string
242 lvs lv.LabelValues
243 obs observeFunc
244 }
245
246
247 func (c *Counter) With(labelValues ...string) metrics.Counter {
248 return &Counter{
249 name: c.name,
250 lvs: c.lvs.With(labelValues...),
251 obs: c.obs,
252 }
253 }
254
255
256 func (c *Counter) Add(delta float64) {
257 c.obs(c.name, c.lvs, delta)
258 }
259
260
261
262 type Gauge struct {
263 g *generic.Gauge
264 influx *Influxstatsd
265 set int32
266 }
267
268
269 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
270 g.influx.mtx.RLock()
271 node := g.influx.gauges[g.g.Name]
272 g.influx.mtx.RUnlock()
273
274 ga := &Gauge{g: g.g.With(labelValues...).(*generic.Gauge), influx: g.influx}
275 return node.addGauge(ga, ga.g.LabelValues())
276 }
277
278
279 func (g *Gauge) Set(value float64) {
280 g.g.Set(value)
281 g.touch()
282 }
283
284
285 func (g *Gauge) Add(delta float64) {
286 g.g.Add(delta)
287 g.touch()
288 }
289
290
291
292
293 type Timing struct {
294 name string
295 lvs lv.LabelValues
296 obs observeFunc
297 }
298
299
300 func (t *Timing) With(labelValues ...string) metrics.Histogram {
301 return &Timing{
302 name: t.name,
303 lvs: t.lvs.With(labelValues...),
304 obs: t.obs,
305 }
306 }
307
308
309 func (t *Timing) Observe(value float64) {
310 t.obs(t.name, t.lvs, value)
311 }
312
313
314
315 type Histogram struct {
316 name string
317 lvs lv.LabelValues
318 obs observeFunc
319 }
320
321
322 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
323 return &Histogram{
324 name: h.name,
325 lvs: h.lvs.With(labelValues...),
326 obs: h.obs,
327 }
328 }
329
330
331 func (h *Histogram) Observe(value float64) {
332 h.obs(h.name, h.lvs, value)
333 }
334
335 type pair struct{ label, value string }
336
337 type gaugeNode struct {
338 mtx sync.RWMutex
339 gauge *Gauge
340 children map[pair]*gaugeNode
341 }
342
343 func (n *gaugeNode) addGauge(g *Gauge, lvs lv.LabelValues) *Gauge {
344 n.mtx.Lock()
345 defer n.mtx.Unlock()
346 if len(lvs) == 0 {
347 if n.gauge == nil {
348 n.gauge = g
349 }
350 return n.gauge
351 }
352 if len(lvs) < 2 {
353 panic("too few LabelValues; programmer error!")
354 }
355 head, tail := pair{lvs[0], lvs[1]}, lvs[2:]
356 if n.children == nil {
357 n.children = map[pair]*gaugeNode{}
358 }
359 child, ok := n.children[head]
360 if !ok {
361 child = &gaugeNode{}
362 n.children[head] = child
363 }
364 return child.addGauge(g, tail)
365 }
366
367 func (n *gaugeNode) walk(fn func(string, lv.LabelValues, float64) bool) bool {
368 n.mtx.RLock()
369 defer n.mtx.RUnlock()
370 if n.gauge != nil {
371 value, ok := n.gauge.read()
372 if ok && !fn(n.gauge.g.Name, n.gauge.g.LabelValues(), value) {
373 return false
374 }
375 }
376 for _, child := range n.children {
377 if !child.walk(fn) {
378 return false
379 }
380 }
381 return true
382 }
383
384 func (g *Gauge) touch() {
385 atomic.StoreInt32(&(g.set), 1)
386 }
387
388 func (g *Gauge) read() (float64, bool) {
389 set := atomic.SwapInt32(&(g.set), 0)
390 return g.g.Value(), set != 0
391 }
392
View as plain text