1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package trace_test
16
17 import (
18 "context"
19 "encoding/binary"
20 "errors"
21 "fmt"
22 "os"
23 "sync"
24 "testing"
25 "time"
26
27 ottest "go.opentelemetry.io/otel/sdk/internal/internaltest"
28
29 "github.com/go-logr/logr/funcr"
30 "github.com/stretchr/testify/assert"
31 "github.com/stretchr/testify/require"
32
33 "go.opentelemetry.io/otel/internal/global"
34 "go.opentelemetry.io/otel/sdk/internal/env"
35 sdktrace "go.opentelemetry.io/otel/sdk/trace"
36 "go.opentelemetry.io/otel/sdk/trace/tracetest"
37 "go.opentelemetry.io/otel/trace"
38 )
39
40 type testBatchExporter struct {
41 mu sync.Mutex
42 spans []sdktrace.ReadOnlySpan
43 sizes []int
44 batchCount int
45 shutdownCount int
46 errors []error
47 droppedCount int
48 idx int
49 err error
50 }
51
52 func (t *testBatchExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error {
53 t.mu.Lock()
54 defer t.mu.Unlock()
55
56 if t.idx < len(t.errors) {
57 t.droppedCount += len(spans)
58 err := t.errors[t.idx]
59 t.idx++
60 return err
61 }
62
63 select {
64 case <-ctx.Done():
65 t.err = ctx.Err()
66 return ctx.Err()
67 default:
68 }
69
70 t.spans = append(t.spans, spans...)
71 t.sizes = append(t.sizes, len(spans))
72 t.batchCount++
73 return nil
74 }
75
76 func (t *testBatchExporter) Shutdown(context.Context) error {
77 t.shutdownCount++
78 return nil
79 }
80
81 func (t *testBatchExporter) len() int {
82 t.mu.Lock()
83 defer t.mu.Unlock()
84 return len(t.spans)
85 }
86
87 func (t *testBatchExporter) getBatchCount() int {
88 t.mu.Lock()
89 defer t.mu.Unlock()
90 return t.batchCount
91 }
92
93 var _ sdktrace.SpanExporter = (*testBatchExporter)(nil)
94
95 func TestNewBatchSpanProcessorWithNilExporter(t *testing.T) {
96 tp := basicTracerProvider(t)
97 bsp := sdktrace.NewBatchSpanProcessor(nil)
98 tp.RegisterSpanProcessor(bsp)
99 tr := tp.Tracer("NilExporter")
100
101 _, span := tr.Start(context.Background(), "foo")
102 span.End()
103
104
105 bsp.OnStart(context.Background(), span.(sdktrace.ReadWriteSpan))
106 bsp.OnEnd(span.(sdktrace.ReadOnlySpan))
107 if err := bsp.ForceFlush(context.Background()); err != nil {
108 t.Errorf("failed to ForceFlush the BatchSpanProcessor: %v", err)
109 }
110 if err := bsp.Shutdown(context.Background()); err != nil {
111 t.Errorf("failed to Shutdown the BatchSpanProcessor: %v", err)
112 }
113 }
114
115 type testOption struct {
116 name string
117 o []sdktrace.BatchSpanProcessorOption
118 wantNumSpans int
119 wantBatchCount int
120 genNumSpans int
121 parallel bool
122 envs map[string]string
123 }
124
125 func TestNewBatchSpanProcessorWithOptions(t *testing.T) {
126 schDelay := 200 * time.Millisecond
127 options := []testOption{
128 {
129 name: "default BatchSpanProcessorOptions",
130 wantNumSpans: 2053,
131 wantBatchCount: 4,
132 genNumSpans: 2053,
133 },
134 {
135 name: "non-default BatchTimeout",
136 o: []sdktrace.BatchSpanProcessorOption{
137 sdktrace.WithBatchTimeout(schDelay),
138 },
139 wantNumSpans: 2053,
140 wantBatchCount: 4,
141 genNumSpans: 2053,
142 },
143 {
144 name: "non-default MaxQueueSize and BatchTimeout",
145 o: []sdktrace.BatchSpanProcessorOption{
146 sdktrace.WithBatchTimeout(schDelay),
147 sdktrace.WithMaxQueueSize(200),
148 },
149 wantNumSpans: 205,
150 wantBatchCount: 1,
151 genNumSpans: 205,
152 },
153 {
154 name: "non-default MaxQueueSize, BatchTimeout and MaxExportBatchSize",
155 o: []sdktrace.BatchSpanProcessorOption{
156 sdktrace.WithBatchTimeout(schDelay),
157 sdktrace.WithMaxQueueSize(205),
158 sdktrace.WithMaxExportBatchSize(20),
159 },
160 wantNumSpans: 210,
161 wantBatchCount: 11,
162 genNumSpans: 210,
163 },
164 {
165 name: "blocking option",
166 o: []sdktrace.BatchSpanProcessorOption{
167 sdktrace.WithBatchTimeout(schDelay),
168 sdktrace.WithMaxQueueSize(200),
169 sdktrace.WithMaxExportBatchSize(20),
170 },
171 wantNumSpans: 205,
172 wantBatchCount: 11,
173 genNumSpans: 205,
174 },
175 {
176 name: "parallel span generation",
177 o: []sdktrace.BatchSpanProcessorOption{
178 sdktrace.WithBatchTimeout(schDelay),
179 sdktrace.WithMaxQueueSize(200),
180 },
181 wantNumSpans: 205,
182 wantBatchCount: 1,
183 genNumSpans: 205,
184 parallel: true,
185 },
186 {
187 name: "parallel span blocking",
188 o: []sdktrace.BatchSpanProcessorOption{
189 sdktrace.WithBatchTimeout(schDelay),
190 sdktrace.WithMaxExportBatchSize(200),
191 },
192 wantNumSpans: 2000,
193 wantBatchCount: 10,
194 genNumSpans: 2000,
195 parallel: true,
196 },
197 }
198 for _, option := range options {
199 t.Run(option.name, func(t *testing.T) {
200 te := testBatchExporter{}
201 tp := basicTracerProvider(t)
202 ssp := createAndRegisterBatchSP(option, &te)
203 if ssp == nil {
204 t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
205 }
206 tp.RegisterSpanProcessor(ssp)
207 tr := tp.Tracer("BatchSpanProcessorWithOptions")
208
209 if option.parallel {
210 generateSpanParallel(t, tr, option)
211 } else {
212 generateSpan(t, tr, option)
213 }
214
215 tp.UnregisterSpanProcessor(ssp)
216
217 gotNumOfSpans := te.len()
218 if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans {
219 t.Errorf("number of exported span: got %+v, want %+v\n",
220 gotNumOfSpans, option.wantNumSpans)
221 }
222
223 gotBatchCount := te.getBatchCount()
224 if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount {
225 t.Errorf("number batches: got %+v, want >= %+v\n",
226 gotBatchCount, option.wantBatchCount)
227 t.Errorf("Batches %v\n", te.sizes)
228 }
229 })
230 }
231 }
232
233 func TestNewBatchSpanProcessorWithEnvOptions(t *testing.T) {
234 options := []testOption{
235 {
236 name: "BatchSpanProcessorEnvOptions - Basic",
237 wantNumSpans: 2053,
238 wantBatchCount: 1,
239 genNumSpans: 2053,
240 envs: map[string]string{
241 env.BatchSpanProcessorMaxQueueSizeKey: "5000",
242 env.BatchSpanProcessorMaxExportBatchSizeKey: "5000",
243 },
244 },
245 {
246 name: "BatchSpanProcessorEnvOptions - A lager max export batch size than queue size",
247 wantNumSpans: 2053,
248 wantBatchCount: 4,
249 genNumSpans: 2053,
250 envs: map[string]string{
251 env.BatchSpanProcessorMaxQueueSizeKey: "5000",
252 env.BatchSpanProcessorMaxExportBatchSizeKey: "10000",
253 },
254 },
255 {
256 name: "BatchSpanProcessorEnvOptions - A lage max export batch size with a small queue size",
257 wantNumSpans: 2053,
258 wantBatchCount: 42,
259 genNumSpans: 2053,
260 envs: map[string]string{
261 env.BatchSpanProcessorMaxQueueSizeKey: "50",
262 env.BatchSpanProcessorMaxExportBatchSizeKey: "10000",
263 },
264 },
265 }
266
267 envStore := ottest.NewEnvStore()
268 envStore.Record(env.BatchSpanProcessorScheduleDelayKey)
269 envStore.Record(env.BatchSpanProcessorExportTimeoutKey)
270 envStore.Record(env.BatchSpanProcessorMaxQueueSizeKey)
271 envStore.Record(env.BatchSpanProcessorMaxExportBatchSizeKey)
272
273 defer func() {
274 require.NoError(t, envStore.Restore())
275 }()
276
277 for _, option := range options {
278 t.Run(option.name, func(t *testing.T) {
279 for k, v := range option.envs {
280 require.NoError(t, os.Setenv(k, v))
281 }
282
283 te := testBatchExporter{}
284 tp := basicTracerProvider(t)
285 ssp := createAndRegisterBatchSP(option, &te)
286 if ssp == nil {
287 t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
288 }
289 tp.RegisterSpanProcessor(ssp)
290 tr := tp.Tracer("BatchSpanProcessorWithOptions")
291
292 if option.parallel {
293 generateSpanParallel(t, tr, option)
294 } else {
295 generateSpan(t, tr, option)
296 }
297
298 tp.UnregisterSpanProcessor(ssp)
299
300 gotNumOfSpans := te.len()
301 if option.wantNumSpans > 0 && option.wantNumSpans != gotNumOfSpans {
302 t.Errorf("number of exported span: got %+v, want %+v\n",
303 gotNumOfSpans, option.wantNumSpans)
304 }
305
306 gotBatchCount := te.getBatchCount()
307 if option.wantBatchCount > 0 && gotBatchCount < option.wantBatchCount {
308 t.Errorf("number batches: got %+v, want >= %+v\n",
309 gotBatchCount, option.wantBatchCount)
310 t.Errorf("Batches %v\n", te.sizes)
311 }
312 })
313 }
314 }
315
316 type stuckExporter struct {
317 testBatchExporter
318 }
319
320
321 func (e *stuckExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
322 <-ctx.Done()
323 e.err = ctx.Err()
324 return ctx.Err()
325 }
326
327 func TestBatchSpanProcessorExportTimeout(t *testing.T) {
328 exp := new(stuckExporter)
329 bsp := sdktrace.NewBatchSpanProcessor(
330 exp,
331
332 sdktrace.WithExportTimeout(1*time.Microsecond),
333 sdktrace.WithBlocking(),
334 )
335 tp := basicTracerProvider(t)
336 tp.RegisterSpanProcessor(bsp)
337
338 tr := tp.Tracer("BatchSpanProcessorExportTimeout")
339 generateSpan(t, tr, testOption{genNumSpans: 1})
340 tp.UnregisterSpanProcessor(bsp)
341
342 if exp.err != context.DeadlineExceeded {
343 t.Errorf("context deadline error not returned: got %+v", exp.err)
344 }
345 }
346
347 func createAndRegisterBatchSP(option testOption, te *testBatchExporter) sdktrace.SpanProcessor {
348
349 options := append(option.o, sdktrace.WithBlocking())
350 return sdktrace.NewBatchSpanProcessor(te, options...)
351 }
352
353 func generateSpan(t *testing.T, tr trace.Tracer, option testOption) {
354 sc := getSpanContext()
355
356 for i := 0; i < option.genNumSpans; i++ {
357 tid := sc.TraceID()
358 binary.BigEndian.PutUint64(tid[0:8], uint64(i+1))
359 newSc := sc.WithTraceID(tid)
360 ctx := trace.ContextWithRemoteSpanContext(context.Background(), newSc)
361 _, span := tr.Start(ctx, option.name)
362 span.End()
363 }
364 }
365
366 func generateSpanParallel(t *testing.T, tr trace.Tracer, option testOption) {
367 sc := getSpanContext()
368
369 wg := &sync.WaitGroup{}
370 for i := 0; i < option.genNumSpans; i++ {
371 tid := sc.TraceID()
372 binary.BigEndian.PutUint64(tid[0:8], uint64(i+1))
373
374 wg.Add(1)
375 go func(sc trace.SpanContext) {
376 ctx := trace.ContextWithRemoteSpanContext(context.Background(), sc)
377 _, span := tr.Start(ctx, option.name)
378 span.End()
379 wg.Done()
380 }(sc.WithTraceID(tid))
381 }
382 wg.Wait()
383 }
384
385 func getSpanContext() trace.SpanContext {
386 tid, _ := trace.TraceIDFromHex("01020304050607080102040810203040")
387 sid, _ := trace.SpanIDFromHex("0102040810203040")
388 return trace.NewSpanContext(trace.SpanContextConfig{
389 TraceID: tid,
390 SpanID: sid,
391 TraceFlags: 0x1,
392 })
393 }
394
395 func TestBatchSpanProcessorShutdown(t *testing.T) {
396 var bp testBatchExporter
397 bsp := sdktrace.NewBatchSpanProcessor(&bp)
398
399 err := bsp.Shutdown(context.Background())
400 if err != nil {
401 t.Error("Error shutting the BatchSpanProcessor down\n")
402 }
403 assert.Equal(t, 1, bp.shutdownCount, "shutdown from span exporter not called")
404
405
406 err = bsp.Shutdown(context.Background())
407 if err != nil {
408 t.Error("Error shutting the BatchSpanProcessor down\n")
409 }
410 assert.Equal(t, 1, bp.shutdownCount)
411 }
412
413 func TestBatchSpanProcessorPostShutdown(t *testing.T) {
414 tp := basicTracerProvider(t)
415 be := testBatchExporter{}
416 bsp := sdktrace.NewBatchSpanProcessor(&be)
417
418 tp.RegisterSpanProcessor(bsp)
419 tr := tp.Tracer("Normal")
420
421 generateSpanParallel(t, tr, testOption{
422 o: []sdktrace.BatchSpanProcessorOption{
423 sdktrace.WithMaxExportBatchSize(50),
424 },
425 genNumSpans: 60,
426 })
427
428 require.NoError(t, bsp.Shutdown(context.Background()), "shutting down BatchSpanProcessor")
429 lenJustAfterShutdown := be.len()
430
431 _, span := tr.Start(context.Background(), "foo")
432 span.End()
433 assert.NoError(t, bsp.ForceFlush(context.Background()), "force flushing BatchSpanProcessor")
434
435 assert.Equal(t, lenJustAfterShutdown, be.len(), "OnEnd and ForceFlush should have no effect after Shutdown")
436 }
437
438 func TestBatchSpanProcessorForceFlushSucceeds(t *testing.T) {
439 te := testBatchExporter{}
440 tp := basicTracerProvider(t)
441 option := testOption{
442 name: "default BatchSpanProcessorOptions",
443 o: []sdktrace.BatchSpanProcessorOption{
444 sdktrace.WithMaxQueueSize(0),
445 sdktrace.WithMaxExportBatchSize(3000),
446 },
447 wantNumSpans: 2053,
448 wantBatchCount: 1,
449 genNumSpans: 2053,
450 }
451 ssp := createAndRegisterBatchSP(option, &te)
452 if ssp == nil {
453 t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
454 }
455 tp.RegisterSpanProcessor(ssp)
456 tr := tp.Tracer("BatchSpanProcessorWithOption")
457 if option.parallel {
458 generateSpanParallel(t, tr, option)
459 } else {
460 generateSpan(t, tr, option)
461 }
462
463
464 err := ssp.ForceFlush(context.Background())
465
466 assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)
467
468 gotBatchCount := te.getBatchCount()
469 if gotBatchCount < option.wantBatchCount {
470 t.Errorf("number batches: got %+v, want >= %+v\n",
471 gotBatchCount, option.wantBatchCount)
472 t.Errorf("Batches %v\n", te.sizes)
473 }
474 assert.NoError(t, err)
475 }
476
477 func TestBatchSpanProcessorDropBatchIfFailed(t *testing.T) {
478 te := testBatchExporter{
479 errors: []error{errors.New("fail to export")},
480 }
481 tp := basicTracerProvider(t)
482 option := testOption{
483 o: []sdktrace.BatchSpanProcessorOption{
484 sdktrace.WithMaxQueueSize(0),
485 sdktrace.WithMaxExportBatchSize(2000),
486 },
487 wantNumSpans: 1000,
488 wantBatchCount: 1,
489 genNumSpans: 1000,
490 }
491 ssp := createAndRegisterBatchSP(option, &te)
492 if ssp == nil {
493 t.Fatalf("%s: Error creating new instance of BatchSpanProcessor\n", option.name)
494 }
495 tp.RegisterSpanProcessor(ssp)
496 tr := tp.Tracer("BatchSpanProcessorWithOption")
497 if option.parallel {
498 generateSpanParallel(t, tr, option)
499 } else {
500 generateSpan(t, tr, option)
501 }
502
503
504 err := ssp.ForceFlush(context.Background())
505 assert.Error(t, err)
506 assert.EqualError(t, err, "fail to export")
507
508
509 assertMaxSpanDiff(t, te.droppedCount, option.wantNumSpans, 10)
510 assert.Equal(t, 0, te.len())
511 assert.Equal(t, 0, te.getBatchCount())
512
513
514 if option.parallel {
515 generateSpanParallel(t, tr, option)
516 } else {
517 generateSpan(t, tr, option)
518 }
519
520
521 err = ssp.ForceFlush(context.Background())
522 assert.NoError(t, err)
523
524 assertMaxSpanDiff(t, te.len(), option.wantNumSpans, 10)
525 gotBatchCount := te.getBatchCount()
526 if gotBatchCount < option.wantBatchCount {
527 t.Errorf("number batches: got %+v, want >= %+v\n",
528 gotBatchCount, option.wantBatchCount)
529 t.Errorf("Batches %v\n", te.sizes)
530 }
531 }
532
// assertMaxSpanDiff reports a test error when got and want differ by more
// than maxDif in either direction.
//
// The first value is the observed count and the second the expected one,
// which is the order every caller in this file uses. The parameters were
// previously declared the other way round, so the failure message printed the
// two values under swapped labels.
func assertMaxSpanDiff(t *testing.T, got, want, maxDif int) {
	// Attribute failures to the calling test line rather than this helper.
	t.Helper()

	diff := got - want
	if diff < 0 {
		diff = -diff
	}
	if diff > maxDif {
		t.Errorf("number of exported span not equal to or within %d less than: got %+v, want %+v\n",
			maxDif, got, want)
	}
}
543
544 type indefiniteExporter struct{}
545
546 func (indefiniteExporter) Shutdown(context.Context) error { return nil }
547 func (indefiniteExporter) ExportSpans(ctx context.Context, _ []sdktrace.ReadOnlySpan) error {
548 <-ctx.Done()
549 return ctx.Err()
550 }
551
552 func TestBatchSpanProcessorForceFlushCancellation(t *testing.T) {
553 ctx, cancel := context.WithCancel(context.Background())
554
555 cancel()
556
557 bsp := sdktrace.NewBatchSpanProcessor(indefiniteExporter{})
558 if got, want := bsp.ForceFlush(ctx), context.Canceled; !errors.Is(got, want) {
559 t.Errorf("expected %q error, got %v", want, got)
560 }
561 }
562
563 func TestBatchSpanProcessorForceFlushTimeout(t *testing.T) {
564
565 ctx, cancel := context.WithTimeout(context.Background(), time.Nanosecond)
566 defer cancel()
567 <-ctx.Done()
568
569 bsp := sdktrace.NewBatchSpanProcessor(indefiniteExporter{})
570 if got, want := bsp.ForceFlush(ctx), context.DeadlineExceeded; !errors.Is(got, want) {
571 t.Errorf("expected %q error, got %v", want, got)
572 }
573 }
574
575 func TestBatchSpanProcessorForceFlushQueuedSpans(t *testing.T) {
576 ctx := context.Background()
577
578 exp := tracetest.NewInMemoryExporter()
579
580 tp := sdktrace.NewTracerProvider(
581 sdktrace.WithBatcher(exp),
582 )
583
584 tracer := tp.Tracer("tracer")
585
586 for i := 0; i < 10; i++ {
587 _, span := tracer.Start(ctx, fmt.Sprintf("span%d", i))
588 span.End()
589
590 err := tp.ForceFlush(ctx)
591 assert.NoError(t, err)
592
593 assert.Len(t, exp.GetSpans(), i+1)
594 }
595 }
596
597 func TestBatchSpanProcessorConcurrentSafe(t *testing.T) {
598 ctx := context.Background()
599 var bp testBatchExporter
600 bsp := sdktrace.NewBatchSpanProcessor(&bp)
601 tp := basicTracerProvider(t)
602 tp.RegisterSpanProcessor(bsp)
603 tr := tp.Tracer(t.Name())
604
605 var wg sync.WaitGroup
606
607 wg.Add(1)
608 go func() {
609 defer wg.Done()
610 generateSpan(t, tr, testOption{genNumSpans: 1})
611 }()
612
613 wg.Add(1)
614 go func() {
615 defer wg.Done()
616 _ = bsp.ForceFlush(ctx)
617 }()
618
619 wg.Add(1)
620 go func() {
621 defer wg.Done()
622 _ = bsp.Shutdown(ctx)
623 }()
624
625 wg.Add(1)
626 go func() {
627 defer wg.Done()
628 _ = tp.ForceFlush(ctx)
629 }()
630
631 wg.Add(1)
632 go func() {
633 defer wg.Done()
634 _ = tp.Shutdown(ctx)
635 }()
636
637 wg.Wait()
638 }
639
640 func BenchmarkSpanProcessor(b *testing.B) {
641 tp := sdktrace.NewTracerProvider(
642 sdktrace.WithBatcher(
643 tracetest.NewNoopExporter(),
644 sdktrace.WithMaxExportBatchSize(10),
645 ))
646 tracer := tp.Tracer("bench")
647 ctx := context.Background()
648
649 b.ResetTimer()
650 b.ReportAllocs()
651
652 for i := 0; i < b.N; i++ {
653 for j := 0; j < 10; j++ {
654 _, span := tracer.Start(ctx, "bench")
655 span.End()
656 }
657 }
658 }
659
660 func BenchmarkSpanProcessorVerboseLogging(b *testing.B) {
661 global.SetLogger(funcr.New(func(prefix, args string) {}, funcr.Options{Verbosity: 5}))
662 tp := sdktrace.NewTracerProvider(
663 sdktrace.WithBatcher(
664 tracetest.NewNoopExporter(),
665 sdktrace.WithMaxExportBatchSize(10),
666 ))
667 tracer := tp.Tracer("bench")
668 ctx := context.Background()
669
670 b.ResetTimer()
671 b.ReportAllocs()
672
673 for i := 0; i < b.N; i++ {
674 for j := 0; j < 10; j++ {
675 _, span := tracer.Start(ctx, "bench")
676 span.End()
677 }
678 }
679 }
680
View as plain text