...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package view
17
18 import (
19 "errors"
20 "fmt"
21 "strings"
22 "time"
23
24 "go.opencensus.io/stats"
25 "go.opencensus.io/stats/internal"
26 "go.opencensus.io/tag"
27 )
28
// command is the interface satisfied by every request type in this file
// (getViewByNameReq, registerViewReq, unregisterFromViewReq, retrieveDataReq,
// recordReq and setReportingPeriodReq). handleCommand receives the worker
// whose state the request reads or modifies.
type command interface {
	handleCommand(w *worker)
}
32
33
// getViewByNameReq is the command that looks up a view by its name.
type getViewByNameReq struct {
	// name is the key looked up in the worker's views map.
	name string
	// c receives the single response that handleCommand sends.
	c chan *getViewByNameResp
}
38
// getViewByNameResp is the reply to a getViewByNameReq.
type getViewByNameResp struct {
	// v is the view found for the requested name, or nil when the lookup
	// found nothing.
	v *View
}
42
43 func (cmd *getViewByNameReq) handleCommand(w *worker) {
44 v := w.views[cmd.name]
45 if v == nil {
46 cmd.c <- &getViewByNameResp{nil}
47 return
48 }
49 cmd.c <- &getViewByNameResp{v.view}
50 }
51
52
// registerViewReq is the command that registers a batch of views with the
// worker.
type registerViewReq struct {
	// views are the views to canonicalize and register.
	views []*View
	// err receives the single result that handleCommand sends: nil on
	// success, otherwise the error describing what failed.
	err chan error
}
57
58 func (cmd *registerViewReq) handleCommand(w *worker) {
59 for _, v := range cmd.views {
60 if err := v.canonicalize(); err != nil {
61 cmd.err <- err
62 return
63 }
64 }
65 var errstr []string
66 for _, view := range cmd.views {
67 vi, err := w.tryRegisterView(view)
68 if err != nil {
69 errstr = append(errstr, fmt.Sprintf("%s: %v", view.Name, err))
70 continue
71 }
72 internal.SubscriptionReporter(view.Measure.Name())
73 vi.subscribe()
74 }
75 if len(errstr) > 0 {
76 cmd.err <- errors.New(strings.Join(errstr, "\n"))
77 } else {
78 cmd.err <- nil
79 }
80 }
81
82
83
84
// unregisterFromViewReq is the command that unsubscribes and unregisters
// views by name.
type unregisterFromViewReq struct {
	// views holds the names of the views to remove; names that are not in
	// the worker's views map are skipped by handleCommand.
	views []string
	// done receives one empty struct once handleCommand has processed every
	// name.
	done chan struct{}
}
89
90 func (cmd *unregisterFromViewReq) handleCommand(w *worker) {
91 for _, name := range cmd.views {
92 vi, ok := w.views[name]
93 if !ok {
94 continue
95 }
96
97
98 w.reportView(vi)
99
100 vi.unsubscribe()
101 if !vi.isSubscribed() {
102
103
104 vi.clearRows()
105 }
106 w.unregisterView(vi)
107 }
108 cmd.done <- struct{}{}
109 }
110
111
// retrieveDataReq is the command that fetches the rows collected for a
// single view.
type retrieveDataReq struct {
	// now is a timestamp carried with the request; this type's
	// handleCommand does not read it.
	now time.Time
	// v is the name of the view whose rows are requested.
	v string
	// c receives the single response that handleCommand sends.
	c chan *retrieveDataResp
}
117
// retrieveDataResp is the reply to a retrieveDataReq. handleCommand sets
// exactly one of the two fields.
type retrieveDataResp struct {
	// rows are the rows collected for the view; nil when err is set.
	rows []*Row
	// err explains why no rows could be returned; nil on success.
	err error
}
122
123 func (cmd *retrieveDataReq) handleCommand(w *worker) {
124 w.mu.Lock()
125 defer w.mu.Unlock()
126 vi, ok := w.views[cmd.v]
127 if !ok {
128 cmd.c <- &retrieveDataResp{
129 nil,
130 fmt.Errorf("cannot retrieve data; view %q is not registered", cmd.v),
131 }
132 return
133 }
134
135 if !vi.isSubscribed() {
136 cmd.c <- &retrieveDataResp{
137 nil,
138 fmt.Errorf("cannot retrieve data; view %q has no subscriptions or collection is not forcibly started", cmd.v),
139 }
140 return
141 }
142 cmd.c <- &retrieveDataResp{
143 vi.collectedRows(),
144 nil,
145 }
146 }
147
148
149
// recordReq is the command that records a batch of measurements. For every
// non-zero measurement, handleCommand hands tm, the measurement's value,
// attachments and t to addSample on each view referenced by that
// measurement's measure.
type recordReq struct {
	// tm is the tag map passed to addSample with every sample.
	tm *tag.Map
	// ms are the measurements to record; zero-value entries are skipped.
	ms []stats.Measurement
	// attachments is passed through unchanged to addSample.
	attachments map[string]interface{}
	// t is the timestamp passed to addSample with every sample.
	t time.Time
}
156
157 func (cmd *recordReq) handleCommand(w *worker) {
158 w.mu.Lock()
159 defer w.mu.Unlock()
160 for _, m := range cmd.ms {
161 if (m == stats.Measurement{}) {
162 continue
163 }
164 ref := w.getMeasureRef(m.Measure().Name())
165 for v := range ref.views {
166 v.addSample(cmd.tm, m.Value(), cmd.attachments, cmd.t)
167 }
168 }
169 }
170
171
172
// setReportingPeriodReq is the command that replaces the worker's ticker
// with one firing at a new period.
type setReportingPeriodReq struct {
	// d is the requested period; a value <= 0 makes handleCommand use
	// defaultReportingDuration instead.
	d time.Duration
	// c receives true once the new ticker is installed.
	c chan bool
}
177
178 func (cmd *setReportingPeriodReq) handleCommand(w *worker) {
179 w.timer.Stop()
180 if cmd.d <= 0 {
181 w.timer = time.NewTicker(defaultReportingDuration)
182 } else {
183 w.timer = time.NewTicker(cmd.d)
184 }
185 cmd.c <- true
186 }
187
View as plain text