1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package view
17
18 import (
19 "fmt"
20 "sync"
21 "time"
22
23 "go.opencensus.io/resource"
24
25 "go.opencensus.io/metric/metricdata"
26 "go.opencensus.io/metric/metricproducer"
27 "go.opencensus.io/stats"
28 "go.opencensus.io/stats/internal"
29 "go.opencensus.io/tag"
30 )
31
32 func init() {
33 defaultWorker = NewMeter().(*worker)
34 go defaultWorker.start()
35 internal.DefaultRecorder = record
36 internal.MeasurementRecorder = recordMeasurement
37 }
38
39 type measureRef struct {
40 measure string
41 views map[*viewInternal]struct{}
42 }
43
44 type worker struct {
45 measures map[string]*measureRef
46 views map[string]*viewInternal
47 viewStartTimes map[*viewInternal]time.Time
48
49 timer *time.Ticker
50 c chan command
51 quit, done chan bool
52 mu sync.RWMutex
53 r *resource.Resource
54
55 exportersMu sync.RWMutex
56 exporters map[Exporter]struct{}
57 }
58
59
60
61
62
63
64
65
66 type Meter interface {
67 stats.Recorder
68
69
70 Find(name string) *View
71
72
73 Register(views ...*View) error
74
75
76
77
78 Unregister(views ...*View)
79
80
81
82
83
84
85
86 SetReportingPeriod(time.Duration)
87
88
89
90
91
92
93
94
95 RegisterExporter(Exporter)
96
97 UnregisterExporter(Exporter)
98
99
100
101 SetResource(*resource.Resource)
102
103
104
105 Start()
106
107 Stop()
108
109
110
111 RetrieveData(viewName string) ([]*Row, error)
112 }
113
114 var _ Meter = (*worker)(nil)
115
116 var defaultWorker *worker
117
118 var defaultReportingDuration = 10 * time.Second
119
120
121
122 func Find(name string) (v *View) {
123 return defaultWorker.Find(name)
124 }
125
126
127
128 func (w *worker) Find(name string) (v *View) {
129 req := &getViewByNameReq{
130 name: name,
131 c: make(chan *getViewByNameResp),
132 }
133 w.c <- req
134 resp := <-req.c
135 return resp.v
136 }
137
138
139
140 func Register(views ...*View) error {
141 return defaultWorker.Register(views...)
142 }
143
144
145
146 func (w *worker) Register(views ...*View) error {
147 req := ®isterViewReq{
148 views: views,
149 err: make(chan error),
150 }
151 w.c <- req
152 return <-req.err
153 }
154
155
156
157
158
159 func Unregister(views ...*View) {
160 defaultWorker.Unregister(views...)
161 }
162
163
164
165
166
167 func (w *worker) Unregister(views ...*View) {
168 names := make([]string, len(views))
169 for i := range views {
170 names[i] = views[i].Name
171 }
172 req := &unregisterFromViewReq{
173 views: names,
174 done: make(chan struct{}),
175 }
176 w.c <- req
177 <-req.done
178 }
179
180
181
182 func RetrieveData(viewName string) ([]*Row, error) {
183 return defaultWorker.RetrieveData(viewName)
184 }
185
186
187
188 func (w *worker) RetrieveData(viewName string) ([]*Row, error) {
189 req := &retrieveDataReq{
190 now: time.Now(),
191 v: viewName,
192 c: make(chan *retrieveDataResp),
193 }
194 w.c <- req
195 resp := <-req.c
196 return resp.rows, resp.err
197 }
198
199 func record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
200 defaultWorker.Record(tags, ms, attachments)
201 }
202
203 func recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
204 defaultWorker.recordMeasurement(tags, ms, attachments)
205 }
206
207
208 func (w *worker) Record(tags *tag.Map, ms interface{}, attachments map[string]interface{}) {
209 w.recordMeasurement(tags, ms.([]stats.Measurement), attachments)
210 }
211
212
213
214 func (w *worker) recordMeasurement(tags *tag.Map, ms []stats.Measurement, attachments map[string]interface{}) {
215 req := &recordReq{
216 tm: tags,
217 ms: ms,
218 attachments: attachments,
219 t: time.Now(),
220 }
221 w.c <- req
222 }
223
224
225
226
227
228
229
230
231 func SetReportingPeriod(d time.Duration) {
232 defaultWorker.SetReportingPeriod(d)
233 }
234
235
236 func Stop() {
237 defaultWorker.Stop()
238 }
239
240
241
242
243
244
245
246
247 func (w *worker) SetReportingPeriod(d time.Duration) {
248
249
250 req := &setReportingPeriodReq{
251 d: d,
252 c: make(chan bool),
253 }
254 w.c <- req
255 <-req.c
256 }
257
258
259
260
261 func NewMeter() Meter {
262 return &worker{
263 measures: make(map[string]*measureRef),
264 views: make(map[string]*viewInternal),
265 viewStartTimes: make(map[*viewInternal]time.Time),
266 timer: time.NewTicker(defaultReportingDuration),
267 c: make(chan command, 1024),
268 quit: make(chan bool),
269 done: make(chan bool),
270
271 exporters: make(map[Exporter]struct{}),
272 }
273 }
274
275
276
277
278
279 func (w *worker) SetResource(r *resource.Resource) {
280 w.r = r
281 }
282
283 func (w *worker) Start() {
284 go w.start()
285 }
286
287 func (w *worker) start() {
288 prodMgr := metricproducer.GlobalManager()
289 prodMgr.AddProducer(w)
290
291 for {
292 select {
293 case cmd := <-w.c:
294 cmd.handleCommand(w)
295 case <-w.timer.C:
296 w.reportUsage()
297 case <-w.quit:
298 w.timer.Stop()
299 close(w.c)
300 close(w.done)
301 return
302 }
303 }
304 }
305
306 func (w *worker) Stop() {
307 prodMgr := metricproducer.GlobalManager()
308 prodMgr.DeleteProducer(w)
309 select {
310 case <-w.quit:
311 default:
312 close(w.quit)
313 }
314 <-w.done
315 }
316
317 func (w *worker) getMeasureRef(name string) *measureRef {
318 if mr, ok := w.measures[name]; ok {
319 return mr
320 }
321 mr := &measureRef{
322 measure: name,
323 views: make(map[*viewInternal]struct{}),
324 }
325 w.measures[name] = mr
326 return mr
327 }
328
329 func (w *worker) tryRegisterView(v *View) (*viewInternal, error) {
330 w.mu.Lock()
331 defer w.mu.Unlock()
332 vi, err := newViewInternal(v)
333 if err != nil {
334 return nil, err
335 }
336 if x, ok := w.views[vi.view.Name]; ok {
337 if !x.view.same(vi.view) {
338 return nil, fmt.Errorf("cannot register view %q; a different view with the same name is already registered", v.Name)
339 }
340
341
342
343 return x, nil
344 }
345 w.views[vi.view.Name] = vi
346 w.viewStartTimes[vi] = time.Now()
347 ref := w.getMeasureRef(vi.view.Measure.Name())
348 ref.views[vi] = struct{}{}
349 return vi, nil
350 }
351
352 func (w *worker) unregisterView(v *viewInternal) {
353 w.mu.Lock()
354 defer w.mu.Unlock()
355 delete(w.views, v.view.Name)
356 delete(w.viewStartTimes, v)
357 if measure := w.measures[v.view.Measure.Name()]; measure != nil {
358 delete(measure.views, v)
359 }
360 }
361
362 func (w *worker) reportView(v *viewInternal) {
363 if !v.isSubscribed() {
364 return
365 }
366 rows := v.collectedRows()
367 viewData := &Data{
368 View: v.view,
369 Start: w.viewStartTimes[v],
370 End: time.Now(),
371 Rows: rows,
372 }
373 w.exportersMu.Lock()
374 defer w.exportersMu.Unlock()
375 for e := range w.exporters {
376 e.ExportView(viewData)
377 }
378 }
379
380 func (w *worker) reportUsage() {
381 w.mu.Lock()
382 defer w.mu.Unlock()
383 for _, v := range w.views {
384 w.reportView(v)
385 }
386 }
387
388 func (w *worker) toMetric(v *viewInternal, now time.Time) *metricdata.Metric {
389 if !v.isSubscribed() {
390 return nil
391 }
392
393 return viewToMetric(v, w.r, now)
394 }
395
396
397
398 func (w *worker) Read() []*metricdata.Metric {
399 w.mu.Lock()
400 defer w.mu.Unlock()
401 now := time.Now()
402 metrics := make([]*metricdata.Metric, 0, len(w.views))
403 for _, v := range w.views {
404 metric := w.toMetric(v, now)
405 if metric != nil {
406 metrics = append(metrics, metric)
407 }
408 }
409 return metrics
410 }
411
412 func (w *worker) RegisterExporter(e Exporter) {
413 w.exportersMu.Lock()
414 defer w.exportersMu.Unlock()
415
416 w.exporters[e] = struct{}{}
417 }
418
419 func (w *worker) UnregisterExporter(e Exporter) {
420 w.exportersMu.Lock()
421 defer w.exportersMu.Unlock()
422
423 delete(w.exporters, e)
424 }
425
View as plain text