1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package trace
16
17 import (
18 "sync"
19 "time"
20
21 "go.opencensus.io/internal"
22 )
23
24 const (
25 maxBucketSize = 100000
26 defaultBucketSize = 10
27 )
28
29 var (
30 ssmu sync.RWMutex
31 spanStores = make(map[string]*spanStore)
32 )
33
34
35 type internalOnly struct{}
36
37 func init() {
38
39 internal.Trace = &internalOnly{}
40 }
41
42
43 func (i internalOnly) ReportActiveSpans(name string) []*SpanData {
44 s := spanStoreForName(name)
45 if s == nil {
46 return nil
47 }
48 var out []*SpanData
49 s.mu.Lock()
50 defer s.mu.Unlock()
51 for activeSpan := range s.active {
52 if s, ok := activeSpan.(*span); ok {
53 out = append(out, s.makeSpanData())
54 }
55 }
56 return out
57 }
58
59
60
61
62 func (i internalOnly) ReportSpansByError(name string, code int32) []*SpanData {
63 s := spanStoreForName(name)
64 if s == nil {
65 return nil
66 }
67 var out []*SpanData
68 s.mu.Lock()
69 defer s.mu.Unlock()
70 if code != 0 {
71 if b, ok := s.errors[code]; ok {
72 for _, sd := range b.buffer {
73 if sd == nil {
74 break
75 }
76 out = append(out, sd)
77 }
78 }
79 } else {
80 for _, b := range s.errors {
81 for _, sd := range b.buffer {
82 if sd == nil {
83 break
84 }
85 out = append(out, sd)
86 }
87 }
88 }
89 return out
90 }
91
92
93
94 func (i internalOnly) ConfigureBucketSizes(bcs []internal.BucketConfiguration) {
95 for _, bc := range bcs {
96 latencyBucketSize := bc.MaxRequestsSucceeded
97 if latencyBucketSize < 0 {
98 latencyBucketSize = 0
99 }
100 if latencyBucketSize > maxBucketSize {
101 latencyBucketSize = maxBucketSize
102 }
103 errorBucketSize := bc.MaxRequestsErrors
104 if errorBucketSize < 0 {
105 errorBucketSize = 0
106 }
107 if errorBucketSize > maxBucketSize {
108 errorBucketSize = maxBucketSize
109 }
110 spanStoreSetSize(bc.Name, latencyBucketSize, errorBucketSize)
111 }
112 }
113
114
115 func (i internalOnly) ReportSpansPerMethod() map[string]internal.PerMethodSummary {
116 out := make(map[string]internal.PerMethodSummary)
117 ssmu.RLock()
118 defer ssmu.RUnlock()
119 for name, s := range spanStores {
120 s.mu.Lock()
121 p := internal.PerMethodSummary{
122 Active: len(s.active),
123 }
124 for code, b := range s.errors {
125 p.ErrorBuckets = append(p.ErrorBuckets, internal.ErrorBucketSummary{
126 ErrorCode: code,
127 Size: b.size(),
128 })
129 }
130 for i, b := range s.latency {
131 min, max := latencyBucketBounds(i)
132 p.LatencyBuckets = append(p.LatencyBuckets, internal.LatencyBucketSummary{
133 MinLatency: min,
134 MaxLatency: max,
135 Size: b.size(),
136 })
137 }
138 s.mu.Unlock()
139 out[name] = p
140 }
141 return out
142 }
143
144
145
146
147
148 func (i internalOnly) ReportSpansByLatency(name string, minLatency, maxLatency time.Duration) []*SpanData {
149 s := spanStoreForName(name)
150 if s == nil {
151 return nil
152 }
153 var out []*SpanData
154 s.mu.Lock()
155 defer s.mu.Unlock()
156 for i, b := range s.latency {
157 min, max := latencyBucketBounds(i)
158 if i+1 != len(s.latency) && max <= minLatency {
159 continue
160 }
161 if maxLatency != 0 && maxLatency < min {
162 continue
163 }
164 for _, sd := range b.buffer {
165 if sd == nil {
166 break
167 }
168 if minLatency != 0 || maxLatency != 0 {
169 d := sd.EndTime.Sub(sd.StartTime)
170 if d < minLatency {
171 continue
172 }
173 if maxLatency != 0 && d > maxLatency {
174 continue
175 }
176 }
177 out = append(out, sd)
178 }
179 }
180 return out
181 }
182
183
184
185
186
187
188 type spanStore struct {
189 mu sync.Mutex
190 active map[SpanInterface]struct{}
191 errors map[int32]*bucket
192 latency []bucket
193 maxSpansPerErrorBucket int
194 }
195
196
197 func newSpanStore(name string, latencyBucketSize int, errorBucketSize int) *spanStore {
198 s := &spanStore{
199 active: make(map[SpanInterface]struct{}),
200 latency: make([]bucket, len(defaultLatencies)+1),
201 maxSpansPerErrorBucket: errorBucketSize,
202 }
203 for i := range s.latency {
204 s.latency[i] = makeBucket(latencyBucketSize)
205 }
206 return s
207 }
208
209
210
211
212 func spanStoreForName(name string) *spanStore {
213 var s *spanStore
214 ssmu.RLock()
215 s, _ = spanStores[name]
216 ssmu.RUnlock()
217 return s
218 }
219
220
221
222
223 func spanStoreForNameCreateIfNew(name string) *spanStore {
224 ssmu.RLock()
225 s, ok := spanStores[name]
226 ssmu.RUnlock()
227 if ok {
228 return s
229 }
230 ssmu.Lock()
231 defer ssmu.Unlock()
232 s, ok = spanStores[name]
233 if ok {
234 return s
235 }
236 s = newSpanStore(name, defaultBucketSize, defaultBucketSize)
237 spanStores[name] = s
238 return s
239 }
240
241
242
243
244 func spanStoreSetSize(name string, latencyBucketSize int, errorBucketSize int) {
245 ssmu.RLock()
246 s, ok := spanStores[name]
247 ssmu.RUnlock()
248 if ok {
249 s.resize(latencyBucketSize, errorBucketSize)
250 return
251 }
252 ssmu.Lock()
253 defer ssmu.Unlock()
254 s, ok = spanStores[name]
255 if ok {
256 s.resize(latencyBucketSize, errorBucketSize)
257 return
258 }
259 s = newSpanStore(name, latencyBucketSize, errorBucketSize)
260 spanStores[name] = s
261 }
262
263 func (s *spanStore) resize(latencyBucketSize int, errorBucketSize int) {
264 s.mu.Lock()
265 for i := range s.latency {
266 s.latency[i].resize(latencyBucketSize)
267 }
268 for _, b := range s.errors {
269 b.resize(errorBucketSize)
270 }
271 s.maxSpansPerErrorBucket = errorBucketSize
272 s.mu.Unlock()
273 }
274
275
276 func (s *spanStore) add(span SpanInterface) {
277 s.mu.Lock()
278 s.active[span] = struct{}{}
279 s.mu.Unlock()
280 }
281
282
283
284 func (s *spanStore) finished(span SpanInterface, sd *SpanData) {
285 latency := sd.EndTime.Sub(sd.StartTime)
286 if latency < 0 {
287 latency = 0
288 }
289 code := sd.Status.Code
290
291 s.mu.Lock()
292 delete(s.active, span)
293 if code == 0 {
294 s.latency[latencyBucket(latency)].add(sd)
295 } else {
296 if s.errors == nil {
297 s.errors = make(map[int32]*bucket)
298 }
299 if b := s.errors[code]; b != nil {
300 b.add(sd)
301 } else {
302 b := makeBucket(s.maxSpansPerErrorBucket)
303 s.errors[code] = &b
304 b.add(sd)
305 }
306 }
307 s.mu.Unlock()
308 }
309
View as plain text