...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package metricexport
17
18 import (
19 "context"
20 "fmt"
21 "sync"
22 "time"
23
24 "go.opencensus.io/metric/metricdata"
25 "go.opencensus.io/metric/metricproducer"
26 "go.opencensus.io/trace"
27 )
28
29 var (
30 defaultSampler = trace.ProbabilitySampler(0.0001)
31 errReportingIntervalTooLow = fmt.Errorf("reporting interval less than %d", minimumReportingDuration)
32 errAlreadyStarted = fmt.Errorf("already started")
33 errIntervalReaderNil = fmt.Errorf("interval reader is nil")
34 errExporterNil = fmt.Errorf("exporter is nil")
35 errReaderNil = fmt.Errorf("reader is nil")
36 )
37
38 const (
39 defaultReportingDuration = 60 * time.Second
40 minimumReportingDuration = 1 * time.Second
41 defaultSpanName = "ExportMetrics"
42 )
43
44
45 type ReaderOptions struct {
46
47 SpanName string
48 }
49
50
51
52
53 type Reader struct {
54 sampler trace.Sampler
55
56 spanName string
57 }
58
59
60
61
62 type IntervalReader struct {
63
64
65
66 ReportingInterval time.Duration
67
68 exporter Exporter
69 timer *time.Ticker
70 quit, done chan bool
71 mu sync.RWMutex
72 reader *Reader
73 }
74
75
76 type ReaderOption func(*ReaderOptions)
77
78
79 func WithSpanName(spanName string) ReaderOption {
80 return func(o *ReaderOptions) {
81 o.SpanName = spanName
82 }
83 }
84
85
86 func NewReader(o ...ReaderOption) *Reader {
87 var opts ReaderOptions
88 for _, op := range o {
89 op(&opts)
90 }
91 reader := &Reader{defaultSampler, defaultSpanName}
92 if opts.SpanName != "" {
93 reader.spanName = opts.SpanName
94 }
95 return reader
96 }
97
98
99
100 func NewIntervalReader(reader *Reader, exporter Exporter) (*IntervalReader, error) {
101 if exporter == nil {
102 return nil, errExporterNil
103 }
104 if reader == nil {
105 return nil, errReaderNil
106 }
107
108 r := &IntervalReader{
109 exporter: exporter,
110 reader: reader,
111 }
112 return r, nil
113 }
114
115
116
117
118
119 func (ir *IntervalReader) Start() error {
120 if ir == nil {
121 return errIntervalReaderNil
122 }
123 ir.mu.Lock()
124 defer ir.mu.Unlock()
125 var reportingInterval = defaultReportingDuration
126 if ir.ReportingInterval != 0 {
127 if ir.ReportingInterval < minimumReportingDuration {
128 return errReportingIntervalTooLow
129 }
130 reportingInterval = ir.ReportingInterval
131 }
132
133 if ir.quit != nil {
134 return errAlreadyStarted
135 }
136 ir.timer = time.NewTicker(reportingInterval)
137 ir.quit = make(chan bool)
138 ir.done = make(chan bool)
139
140 go ir.startInternal()
141 return nil
142 }
143
144 func (ir *IntervalReader) startInternal() {
145 for {
146 select {
147 case <-ir.timer.C:
148 ir.reader.ReadAndExport(ir.exporter)
149 case <-ir.quit:
150 ir.timer.Stop()
151 ir.done <- true
152 return
153 }
154 }
155 }
156
157
158
159 func (ir *IntervalReader) Stop() {
160 if ir == nil {
161 return
162 }
163 ir.mu.Lock()
164 defer ir.mu.Unlock()
165 if ir.quit == nil {
166 return
167 }
168 ir.quit <- true
169 <-ir.done
170 close(ir.quit)
171 close(ir.done)
172 ir.quit = nil
173 }
174
175
176 func (ir *IntervalReader) Flush() {
177 ir.mu.Lock()
178 defer ir.mu.Unlock()
179
180
181 if ir.quit != nil {
182 return
183 }
184
185 ir.reader.ReadAndExport(ir.exporter)
186 }
187
188
189
190 func (r *Reader) ReadAndExport(exporter Exporter) {
191 ctx, span := trace.StartSpan(context.Background(), r.spanName, trace.WithSampler(r.sampler))
192 defer span.End()
193 producers := metricproducer.GlobalManager().GetAll()
194 data := []*metricdata.Metric{}
195 for _, producer := range producers {
196 data = append(data, producer.Read()...)
197 }
198
199 exporter.ExportMetrics(ctx, data)
200 }
201
View as plain text