1
2
3
4
5
6
7
8 package graphite
9
10 import (
11 "context"
12 "fmt"
13 "io"
14 "sync"
15 "time"
16
17 "github.com/go-kit/kit/metrics"
18 "github.com/go-kit/kit/metrics/generic"
19 "github.com/go-kit/kit/util/conn"
20 "github.com/go-kit/log"
21 )
22
23
24
25
26
27
28
29
30
31
32
33 type Graphite struct {
34 mtx sync.RWMutex
35 prefix string
36 counters map[string]*Counter
37 gauges map[string]*Gauge
38 histograms map[string]*Histogram
39 logger log.Logger
40 }
41
42
43
44
45 func New(prefix string, logger log.Logger) *Graphite {
46 return &Graphite{
47 prefix: prefix,
48 counters: map[string]*Counter{},
49 gauges: map[string]*Gauge{},
50 histograms: map[string]*Histogram{},
51 logger: logger,
52 }
53 }
54
55
56
57 func (g *Graphite) NewCounter(name string) *Counter {
58 c := NewCounter(g.prefix + name)
59 g.mtx.Lock()
60 g.counters[g.prefix+name] = c
61 g.mtx.Unlock()
62 return c
63 }
64
65
66
67 func (g *Graphite) NewGauge(name string) *Gauge {
68 ga := NewGauge(g.prefix + name)
69 g.mtx.Lock()
70 g.gauges[g.prefix+name] = ga
71 g.mtx.Unlock()
72 return ga
73 }
74
75
76
77
78 func (g *Graphite) NewHistogram(name string, buckets int) *Histogram {
79 h := NewHistogram(g.prefix+name, buckets)
80 g.mtx.Lock()
81 g.histograms[g.prefix+name] = h
82 g.mtx.Unlock()
83 return h
84 }
85
86
87
88
89
90 func (g *Graphite) WriteLoop(ctx context.Context, c <-chan time.Time, w io.Writer) {
91 for {
92 select {
93 case <-c:
94 if _, err := g.WriteTo(w); err != nil {
95 g.logger.Log("during", "WriteTo", "err", err)
96 }
97 case <-ctx.Done():
98 return
99 }
100 }
101 }
102
103
104
105
106
107
108 func (g *Graphite) SendLoop(ctx context.Context, c <-chan time.Time, network, address string) {
109 g.WriteLoop(ctx, c, conn.NewDefaultManager(network, address, g.logger))
110 }
111
112
113
114
115
116
117 func (g *Graphite) WriteTo(w io.Writer) (count int64, err error) {
118 g.mtx.RLock()
119 defer g.mtx.RUnlock()
120 now := time.Now().Unix()
121
122 for name, c := range g.counters {
123 n, err := fmt.Fprintf(w, "%s %f %d\n", name, c.c.ValueReset(), now)
124 if err != nil {
125 return count, err
126 }
127 count += int64(n)
128 }
129
130 for name, ga := range g.gauges {
131 n, err := fmt.Fprintf(w, "%s %f %d\n", name, ga.g.Value(), now)
132 if err != nil {
133 return count, err
134 }
135 count += int64(n)
136 }
137
138 for name, h := range g.histograms {
139 for _, p := range []struct {
140 s string
141 f float64
142 }{
143 {"50", 0.50},
144 {"90", 0.90},
145 {"95", 0.95},
146 {"99", 0.99},
147 } {
148 n, err := fmt.Fprintf(w, "%s.p%s %f %d\n", name, p.s, h.h.Quantile(p.f), now)
149 if err != nil {
150 return count, err
151 }
152 count += int64(n)
153 }
154 }
155
156 return count, err
157 }
158
159
160 type Counter struct {
161 c *generic.Counter
162 }
163
164
165 func NewCounter(name string) *Counter {
166 return &Counter{generic.NewCounter(name)}
167 }
168
169
170 func (c *Counter) With(...string) metrics.Counter { return c }
171
172
173 func (c *Counter) Add(delta float64) { c.c.Add(delta) }
174
175
176 type Gauge struct {
177 g *generic.Gauge
178 }
179
180
181 func NewGauge(name string) *Gauge {
182 return &Gauge{generic.NewGauge(name)}
183 }
184
185
186 func (g *Gauge) With(...string) metrics.Gauge { return g }
187
188
189 func (g *Gauge) Set(value float64) { g.g.Set(value) }
190
191
192 func (g *Gauge) Add(delta float64) { g.g.Add(delta) }
193
194
195
196 type Histogram struct {
197 h *generic.Histogram
198 }
199
200
201 func NewHistogram(name string, buckets int) *Histogram {
202 return &Histogram{generic.NewHistogram(name, buckets)}
203 }
204
205
206 func (h *Histogram) With(...string) metrics.Histogram { return h }
207
208
209 func (h *Histogram) Observe(value float64) { h.h.Observe(value) }
210
View as plain text