1
2
3
4 package influx
5
6 import (
7 "context"
8 "time"
9
10 influxdb "github.com/influxdata/influxdb1-client/v2"
11
12 "github.com/go-kit/kit/metrics"
13 "github.com/go-kit/kit/metrics/generic"
14 "github.com/go-kit/kit/metrics/internal/lv"
15 "github.com/go-kit/log"
16 )
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34 type Influx struct {
35 counters *lv.Space
36 gauges *lv.Space
37 histograms *lv.Space
38 tags map[string]string
39 conf influxdb.BatchPointsConfig
40 logger log.Logger
41 }
42
43
44
45
46 func New(tags map[string]string, conf influxdb.BatchPointsConfig, logger log.Logger) *Influx {
47 return &Influx{
48 counters: lv.NewSpace(),
49 gauges: lv.NewSpace(),
50 histograms: lv.NewSpace(),
51 tags: tags,
52 conf: conf,
53 logger: logger,
54 }
55 }
56
57
58 func (in *Influx) NewCounter(name string) *Counter {
59 return &Counter{
60 name: name,
61 obs: in.counters.Observe,
62 }
63 }
64
65
66 func (in *Influx) NewGauge(name string) *Gauge {
67 return &Gauge{
68 name: name,
69 obs: in.gauges.Observe,
70 add: in.gauges.Add,
71 }
72 }
73
74
75 func (in *Influx) NewHistogram(name string) *Histogram {
76 return &Histogram{
77 name: name,
78 obs: in.histograms.Observe,
79 }
80 }
81
82
83
84 type BatchPointsWriter interface {
85 Write(influxdb.BatchPoints) error
86 }
87
88
89
90
91
92 func (in *Influx) WriteLoop(ctx context.Context, c <-chan time.Time, w BatchPointsWriter) {
93 for {
94 select {
95 case <-c:
96 if err := in.WriteTo(w); err != nil {
97 in.logger.Log("during", "WriteTo", "err", err)
98 }
99 case <-ctx.Done():
100 return
101 }
102 }
103 }
104
105
106
107
108
109 func (in *Influx) WriteTo(w BatchPointsWriter) (err error) {
110 bp, err := influxdb.NewBatchPoints(in.conf)
111 if err != nil {
112 return err
113 }
114
115 now := time.Now()
116
117 in.counters.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
118 tags := mergeTags(in.tags, lvs)
119 var p *influxdb.Point
120 fields := map[string]interface{}{"count": sum(values)}
121 p, err = influxdb.NewPoint(name, tags, fields, now)
122 if err != nil {
123 return false
124 }
125 bp.AddPoint(p)
126 return true
127 })
128 if err != nil {
129 return err
130 }
131
132 in.gauges.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
133 tags := mergeTags(in.tags, lvs)
134 var p *influxdb.Point
135 fields := map[string]interface{}{"value": last(values)}
136 p, err = influxdb.NewPoint(name, tags, fields, now)
137 if err != nil {
138 return false
139 }
140 bp.AddPoint(p)
141 return true
142 })
143 if err != nil {
144 return err
145 }
146
147 in.histograms.Reset().Walk(func(name string, lvs lv.LabelValues, values []float64) bool {
148 histogram := generic.NewHistogram(name, 50)
149 tags := mergeTags(in.tags, lvs)
150 var p *influxdb.Point
151 for _, v := range values {
152 histogram.Observe(v)
153 }
154 fields := map[string]interface{}{
155 "p50": histogram.Quantile(0.50),
156 "p90": histogram.Quantile(0.90),
157 "p95": histogram.Quantile(0.95),
158 "p99": histogram.Quantile(0.99),
159 }
160 p, err = influxdb.NewPoint(name, tags, fields, now)
161 if err != nil {
162 return false
163 }
164 bp.AddPoint(p)
165 return true
166 })
167 if err != nil {
168 return err
169 }
170
171 return w.Write(bp)
172 }
173
174 func mergeTags(tags map[string]string, labelValues []string) map[string]string {
175 if len(labelValues)%2 != 0 {
176 panic("mergeTags received a labelValues with an odd number of strings")
177 }
178 ret := make(map[string]string, len(tags)+len(labelValues)/2)
179 for k, v := range tags {
180 ret[k] = v
181 }
182 for i := 0; i < len(labelValues); i += 2 {
183 ret[labelValues[i]] = labelValues[i+1]
184 }
185 return ret
186 }
187
188 func sum(a []float64) float64 {
189 var v float64
190 for _, f := range a {
191 v += f
192 }
193 return v
194 }
195
196 func last(a []float64) float64 {
197 return a[len(a)-1]
198 }
199
200 type observeFunc func(name string, lvs lv.LabelValues, value float64)
201
202
203
204 type Counter struct {
205 name string
206 lvs lv.LabelValues
207 obs observeFunc
208 }
209
210
211 func (c *Counter) With(labelValues ...string) metrics.Counter {
212 return &Counter{
213 name: c.name,
214 lvs: c.lvs.With(labelValues...),
215 obs: c.obs,
216 }
217 }
218
219
220 func (c *Counter) Add(delta float64) {
221 c.obs(c.name, c.lvs, delta)
222 }
223
224
225
226 type Gauge struct {
227 name string
228 lvs lv.LabelValues
229 obs observeFunc
230 add observeFunc
231 }
232
233
234 func (g *Gauge) With(labelValues ...string) metrics.Gauge {
235 return &Gauge{
236 name: g.name,
237 lvs: g.lvs.With(labelValues...),
238 obs: g.obs,
239 add: g.add,
240 }
241 }
242
243
244 func (g *Gauge) Set(value float64) {
245 g.obs(g.name, g.lvs, value)
246 }
247
248
249 func (g *Gauge) Add(delta float64) {
250 g.add(g.name, g.lvs, delta)
251 }
252
253
254
255 type Histogram struct {
256 name string
257 lvs lv.LabelValues
258 obs observeFunc
259 }
260
261
262 func (h *Histogram) With(labelValues ...string) metrics.Histogram {
263 return &Histogram{
264 name: h.name,
265 lvs: h.lvs.With(labelValues...),
266 obs: h.obs,
267 }
268 }
269
270
271 func (h *Histogram) Observe(value float64) {
272 h.obs(h.name, h.lvs, value)
273 }
274
View as plain text