...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package trace
16
17 import (
18 "context"
19 "sync"
20 "sync/atomic"
21 "time"
22
23 "go.opentelemetry.io/otel"
24 "go.opentelemetry.io/otel/internal/global"
25 "go.opentelemetry.io/otel/sdk/internal/env"
26 "go.opentelemetry.io/otel/trace"
27 )
28
29
30 const (
31 DefaultMaxQueueSize = 2048
32 DefaultScheduleDelay = 5000
33 DefaultExportTimeout = 30000
34 DefaultMaxExportBatchSize = 512
35 )
36
37
38 type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
39
40
41
42 type BatchSpanProcessorOptions struct {
43
44
45
46 MaxQueueSize int
47
48
49
50
51 BatchTimeout time.Duration
52
53
54
55
56 ExportTimeout time.Duration
57
58
59
60
61
62 MaxExportBatchSize int
63
64
65
66
67
68 BlockOnQueueFull bool
69 }
70
71
72
73 type batchSpanProcessor struct {
74 e SpanExporter
75 o BatchSpanProcessorOptions
76
77 queue chan ReadOnlySpan
78 dropped uint32
79
80 batch []ReadOnlySpan
81 batchMutex sync.Mutex
82 timer *time.Timer
83 stopWait sync.WaitGroup
84 stopOnce sync.Once
85 stopCh chan struct{}
86 stopped atomic.Bool
87 }
88
89 var _ SpanProcessor = (*batchSpanProcessor)(nil)
90
91
92
93
94
95 func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
96 maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
97 maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
98
99 if maxExportBatchSize > maxQueueSize {
100 if DefaultMaxExportBatchSize > maxQueueSize {
101 maxExportBatchSize = maxQueueSize
102 } else {
103 maxExportBatchSize = DefaultMaxExportBatchSize
104 }
105 }
106
107 o := BatchSpanProcessorOptions{
108 BatchTimeout: time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
109 ExportTimeout: time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
110 MaxQueueSize: maxQueueSize,
111 MaxExportBatchSize: maxExportBatchSize,
112 }
113 for _, opt := range options {
114 opt(&o)
115 }
116 bsp := &batchSpanProcessor{
117 e: exporter,
118 o: o,
119 batch: make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
120 timer: time.NewTimer(o.BatchTimeout),
121 queue: make(chan ReadOnlySpan, o.MaxQueueSize),
122 stopCh: make(chan struct{}),
123 }
124
125 bsp.stopWait.Add(1)
126 go func() {
127 defer bsp.stopWait.Done()
128 bsp.processQueue()
129 bsp.drainQueue()
130 }()
131
132 return bsp
133 }
134
135
136 func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
137
138
139 func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
140
141 if bsp.stopped.Load() {
142 return
143 }
144
145
146 if bsp.e == nil {
147 return
148 }
149 bsp.enqueue(s)
150 }
151
152
153
154 func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
155 var err error
156 bsp.stopOnce.Do(func() {
157 bsp.stopped.Store(true)
158 wait := make(chan struct{})
159 go func() {
160 close(bsp.stopCh)
161 bsp.stopWait.Wait()
162 if bsp.e != nil {
163 if err := bsp.e.Shutdown(ctx); err != nil {
164 otel.Handle(err)
165 }
166 }
167 close(wait)
168 }()
169
170 select {
171 case <-wait:
172 case <-ctx.Done():
173 err = ctx.Err()
174 }
175 })
176 return err
177 }
178
179 type forceFlushSpan struct {
180 ReadOnlySpan
181 flushed chan struct{}
182 }
183
184 func (f forceFlushSpan) SpanContext() trace.SpanContext {
185 return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
186 }
187
188
189 func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
190
191 if err := ctx.Err(); err != nil {
192 return err
193 }
194
195
196 if bsp.stopped.Load() {
197 return nil
198 }
199
200 var err error
201 if bsp.e != nil {
202 flushCh := make(chan struct{})
203 if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
204 select {
205 case <-bsp.stopCh:
206
207 return nil
208 case <-flushCh:
209
210 case <-ctx.Done():
211 return ctx.Err()
212 }
213 }
214
215 wait := make(chan error)
216 go func() {
217 wait <- bsp.exportSpans(ctx)
218 close(wait)
219 }()
220
221 select {
222 case err = <-wait:
223 case <-ctx.Done():
224 err = ctx.Err()
225 }
226 }
227 return err
228 }
229
230
231
232 func WithMaxQueueSize(size int) BatchSpanProcessorOption {
233 return func(o *BatchSpanProcessorOptions) {
234 o.MaxQueueSize = size
235 }
236 }
237
238
239
240 func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
241 return func(o *BatchSpanProcessorOptions) {
242 o.MaxExportBatchSize = size
243 }
244 }
245
246
247
248
249 func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
250 return func(o *BatchSpanProcessorOptions) {
251 o.BatchTimeout = delay
252 }
253 }
254
255
256
257
258 func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
259 return func(o *BatchSpanProcessorOptions) {
260 o.ExportTimeout = timeout
261 }
262 }
263
264
265
266
267 func WithBlocking() BatchSpanProcessorOption {
268 return func(o *BatchSpanProcessorOptions) {
269 o.BlockOnQueueFull = true
270 }
271 }
272
273
274 func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
275 bsp.timer.Reset(bsp.o.BatchTimeout)
276
277 bsp.batchMutex.Lock()
278 defer bsp.batchMutex.Unlock()
279
280 if bsp.o.ExportTimeout > 0 {
281 var cancel context.CancelFunc
282 ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
283 defer cancel()
284 }
285
286 if l := len(bsp.batch); l > 0 {
287 global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
288 err := bsp.e.ExportSpans(ctx, bsp.batch)
289
290
291
292
293
294 bsp.batch = bsp.batch[:0]
295
296 if err != nil {
297 return err
298 }
299 }
300 return nil
301 }
302
303
304
305
306 func (bsp *batchSpanProcessor) processQueue() {
307 defer bsp.timer.Stop()
308
309 ctx, cancel := context.WithCancel(context.Background())
310 defer cancel()
311 for {
312 select {
313 case <-bsp.stopCh:
314 return
315 case <-bsp.timer.C:
316 if err := bsp.exportSpans(ctx); err != nil {
317 otel.Handle(err)
318 }
319 case sd := <-bsp.queue:
320 if ffs, ok := sd.(forceFlushSpan); ok {
321 close(ffs.flushed)
322 continue
323 }
324 bsp.batchMutex.Lock()
325 bsp.batch = append(bsp.batch, sd)
326 shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
327 bsp.batchMutex.Unlock()
328 if shouldExport {
329 if !bsp.timer.Stop() {
330 <-bsp.timer.C
331 }
332 if err := bsp.exportSpans(ctx); err != nil {
333 otel.Handle(err)
334 }
335 }
336 }
337 }
338 }
339
340
341
342 func (bsp *batchSpanProcessor) drainQueue() {
343 ctx, cancel := context.WithCancel(context.Background())
344 defer cancel()
345 for {
346 select {
347 case sd := <-bsp.queue:
348 if _, ok := sd.(forceFlushSpan); ok {
349
350 continue
351 }
352
353 bsp.batchMutex.Lock()
354 bsp.batch = append(bsp.batch, sd)
355 shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
356 bsp.batchMutex.Unlock()
357
358 if shouldExport {
359 if err := bsp.exportSpans(ctx); err != nil {
360 otel.Handle(err)
361 }
362 }
363 default:
364
365 if err := bsp.exportSpans(ctx); err != nil {
366 otel.Handle(err)
367 }
368 return
369 }
370 }
371 }
372
373 func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
374 ctx := context.TODO()
375 if bsp.o.BlockOnQueueFull {
376 bsp.enqueueBlockOnQueueFull(ctx, sd)
377 } else {
378 bsp.enqueueDrop(ctx, sd)
379 }
380 }
381
382 func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
383 if !sd.SpanContext().IsSampled() {
384 return false
385 }
386
387 select {
388 case bsp.queue <- sd:
389 return true
390 case <-ctx.Done():
391 return false
392 }
393 }
394
395 func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
396 if !sd.SpanContext().IsSampled() {
397 return false
398 }
399
400 select {
401 case bsp.queue <- sd:
402 return true
403 default:
404 atomic.AddUint32(&bsp.dropped, 1)
405 }
406 return false
407 }
408
409
410 func (bsp *batchSpanProcessor) MarshalLog() interface{} {
411 return struct {
412 Type string
413 SpanExporter SpanExporter
414 Config BatchSpanProcessorOptions
415 }{
416 Type: "BatchSpanProcessor",
417 SpanExporter: bsp.e,
418 Config: bsp.o,
419 }
420 }
421
View as plain text