...

Source file src/go.opentelemetry.io/otel/sdk/trace/batch_span_processor.go

Documentation: go.opentelemetry.io/otel/sdk/trace

     1  // Copyright The OpenTelemetry Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package trace // import "go.opentelemetry.io/otel/sdk/trace"
    16  
    17  import (
    18  	"context"
    19  	"sync"
    20  	"sync/atomic"
    21  	"time"
    22  
    23  	"go.opentelemetry.io/otel"
    24  	"go.opentelemetry.io/otel/internal/global"
    25  	"go.opentelemetry.io/otel/sdk/internal/env"
    26  	"go.opentelemetry.io/otel/trace"
    27  )
    28  
    29  // Defaults for BatchSpanProcessorOptions.
    30  const (
    31  	DefaultMaxQueueSize       = 2048  // spans
    32  	DefaultScheduleDelay      = 5000  // milliseconds
    33  	DefaultExportTimeout      = 30000 // milliseconds
    34  	DefaultMaxExportBatchSize = 512   // spans
    35  )
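
        // The defaults above can be overridden at construction time through the
        // environment variables read by the sdk/internal/env helpers used in
        // NewBatchSpanProcessor below. A rough sketch of the expected mapping,
        // assuming the standard OTEL_BSP_* variables from the OpenTelemetry
        // specification:
        //
        //	OTEL_BSP_MAX_QUEUE_SIZE        -> MaxQueueSize        (default 2048)
        //	OTEL_BSP_SCHEDULE_DELAY        -> BatchTimeout        (default 5000 ms)
        //	OTEL_BSP_EXPORT_TIMEOUT        -> ExportTimeout       (default 30000 ms)
        //	OTEL_BSP_MAX_EXPORT_BATCH_SIZE -> MaxExportBatchSize  (default 512)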
    36  
    37  // BatchSpanProcessorOption configures a BatchSpanProcessor.
    38  type BatchSpanProcessorOption func(o *BatchSpanProcessorOptions)
    39  
    40  // BatchSpanProcessorOptions is configuration settings for a
    41  // BatchSpanProcessor.
    42  type BatchSpanProcessorOptions struct {
    43  	// MaxQueueSize is the maximum queue size to buffer spans for delayed processing. If the
    44  	// queue gets full, newly ended spans are dropped. Use BlockOnQueueFull to change this behavior.
    45  	// The default value of MaxQueueSize is 2048.
    46  	MaxQueueSize int
    47  
    48  	// BatchTimeout is the maximum duration for constructing a batch. The processor
    49  	// forcefully sends the available spans when this timeout is reached.
    50  	// The default value of BatchTimeout is 5000 msec.
    51  	BatchTimeout time.Duration
    52  
    53  	// ExportTimeout specifies the maximum duration for exporting spans. If the timeout
    54  	// is reached, the export will be cancelled.
    55  	// The default value of ExportTimeout is 30000 msec.
    56  	ExportTimeout time.Duration
    57  
    58  	// MaxExportBatchSize is the maximum number of spans to process in a single batch.
    59  	// If more than one batch worth of spans is queued, the processor exports multiple
    60  	// batches, one after the other, without any delay.
    61  	// The default value of MaxExportBatchSize is 512.
    62  	MaxExportBatchSize int
    63  
    64  	// BlockOnQueueFull blocks OnEnd() when the queue is full instead of dropping
    65  	// spans, if BlockOnQueueFull is set to true.
    66  	// The blocking option should be used carefully, as it can severely affect the
    67  	// performance of an application.
    68  	BlockOnQueueFull bool
    69  }
    70  
    71  // batchSpanProcessor is a SpanProcessor that batches asynchronously-received
    72  // spans and sends them to a SpanExporter when a batch is complete.
    73  type batchSpanProcessor struct {
    74  	e SpanExporter
    75  	o BatchSpanProcessorOptions
    76  
    77  	queue   chan ReadOnlySpan
    78  	dropped uint32
    79  
    80  	batch      []ReadOnlySpan
    81  	batchMutex sync.Mutex
    82  	timer      *time.Timer
    83  	stopWait   sync.WaitGroup
    84  	stopOnce   sync.Once
    85  	stopCh     chan struct{}
    86  	stopped    atomic.Bool
    87  }
    88  
    89  var _ SpanProcessor = (*batchSpanProcessor)(nil)
    90  
    91  // NewBatchSpanProcessor creates a new SpanProcessor that will send completed
    92  // span batches to the exporter with the supplied options.
    93  //
    94  // If the exporter is nil, the span processor will perform no action.
    95  func NewBatchSpanProcessor(exporter SpanExporter, options ...BatchSpanProcessorOption) SpanProcessor {
    96  	maxQueueSize := env.BatchSpanProcessorMaxQueueSize(DefaultMaxQueueSize)
    97  	maxExportBatchSize := env.BatchSpanProcessorMaxExportBatchSize(DefaultMaxExportBatchSize)
    98  
    99  	if maxExportBatchSize > maxQueueSize {
   100  		if DefaultMaxExportBatchSize > maxQueueSize {
   101  			maxExportBatchSize = maxQueueSize
   102  		} else {
   103  			maxExportBatchSize = DefaultMaxExportBatchSize
   104  		}
   105  	}
   106  
   107  	o := BatchSpanProcessorOptions{
   108  		BatchTimeout:       time.Duration(env.BatchSpanProcessorScheduleDelay(DefaultScheduleDelay)) * time.Millisecond,
   109  		ExportTimeout:      time.Duration(env.BatchSpanProcessorExportTimeout(DefaultExportTimeout)) * time.Millisecond,
   110  		MaxQueueSize:       maxQueueSize,
   111  		MaxExportBatchSize: maxExportBatchSize,
   112  	}
   113  	for _, opt := range options {
   114  		opt(&o)
   115  	}
   116  	bsp := &batchSpanProcessor{
   117  		e:      exporter,
   118  		o:      o,
   119  		batch:  make([]ReadOnlySpan, 0, o.MaxExportBatchSize),
   120  		timer:  time.NewTimer(o.BatchTimeout),
   121  		queue:  make(chan ReadOnlySpan, o.MaxQueueSize),
   122  		stopCh: make(chan struct{}),
   123  	}
   124  
   125  	bsp.stopWait.Add(1)
   126  	go func() {
   127  		defer bsp.stopWait.Done()
   128  		bsp.processQueue()
   129  		bsp.drainQueue()
   130  	}()
   131  
   132  	return bsp
   133  }
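
        // Illustrative usage (a minimal sketch): wiring the processor into a
        // TracerProvider from a consumer package, assuming this package is imported as
        // sdktrace and exp is any configured SpanExporter.
        //
        //	bsp := sdktrace.NewBatchSpanProcessor(exp)
        //	tp := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(bsp))
        //	defer func() { _ = tp.Shutdown(context.Background()) }()
        //	otel.SetTracerProvider(tp)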
   134  
   135  // OnStart method does nothing.
   136  func (bsp *batchSpanProcessor) OnStart(parent context.Context, s ReadWriteSpan) {}
   137  
   138  // OnEnd method enqueues a ReadOnlySpan for later processing.
   139  func (bsp *batchSpanProcessor) OnEnd(s ReadOnlySpan) {
   140  	// Do not enqueue spans after Shutdown.
   141  	if bsp.stopped.Load() {
   142  		return
   143  	}
   144  
   145  	// Do not enqueue spans if we are just going to drop them.
   146  	if bsp.e == nil {
   147  		return
   148  	}
   149  	bsp.enqueue(s)
   150  }
   151  
   152  // Shutdown flushes the queue and waits until all spans are processed.
   153  // It only executes once. Subsequent calls do nothing.
   154  func (bsp *batchSpanProcessor) Shutdown(ctx context.Context) error {
   155  	var err error
   156  	bsp.stopOnce.Do(func() {
   157  		bsp.stopped.Store(true)
   158  		wait := make(chan struct{})
   159  		go func() {
   160  			close(bsp.stopCh)
   161  			bsp.stopWait.Wait()
   162  			if bsp.e != nil {
   163  				if err := bsp.e.Shutdown(ctx); err != nil {
   164  					otel.Handle(err)
   165  				}
   166  			}
   167  			close(wait)
   168  		}()
   169  		// Wait until the wait group is done or the context is cancelled
   170  		select {
   171  		case <-wait:
   172  		case <-ctx.Done():
   173  			err = ctx.Err()
   174  		}
   175  	})
   176  	return err
   177  }
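
        // Illustrative usage (a minimal sketch): bounding Shutdown with a deadline so a
        // slow or hung exporter cannot block process exit indefinitely. bsp is assumed
        // to be a processor created by NewBatchSpanProcessor.
        //
        //	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        //	defer cancel()
        //	if err := bsp.Shutdown(ctx); err != nil {
        //		otel.Handle(err)
        //	}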
   178  
   179  type forceFlushSpan struct {
   180  	ReadOnlySpan
   181  	flushed chan struct{}
   182  }
   183  
   184  func (f forceFlushSpan) SpanContext() trace.SpanContext {
   185  	return trace.NewSpanContext(trace.SpanContextConfig{TraceFlags: trace.FlagsSampled})
   186  }
   187  
   188  // ForceFlush exports all ended spans that have not yet been exported.
   189  func (bsp *batchSpanProcessor) ForceFlush(ctx context.Context) error {
   190  	// Interrupt if context is already canceled.
   191  	if err := ctx.Err(); err != nil {
   192  		return err
   193  	}
   194  
   195  	// Do nothing after Shutdown.
   196  	if bsp.stopped.Load() {
   197  		return nil
   198  	}
   199  
   200  	var err error
   201  	if bsp.e != nil {
   202  		flushCh := make(chan struct{})
   203  		if bsp.enqueueBlockOnQueueFull(ctx, forceFlushSpan{flushed: flushCh}) {
   204  			select {
   205  			case <-bsp.stopCh:
   206  				// The batchSpanProcessor is Shutdown.
   207  				return nil
   208  			case <-flushCh:
   209  				// All items that were in the queue prior to the ForceFlush call have been processed.
   210  			case <-ctx.Done():
   211  				return ctx.Err()
   212  			}
   213  		}
   214  
   215  		wait := make(chan error)
   216  		go func() {
   217  			wait <- bsp.exportSpans(ctx)
   218  			close(wait)
   219  		}()
   220  		// Wait until the export is finished or the context is cancelled/timed out
   221  		select {
   222  		case err = <-wait:
   223  		case <-ctx.Done():
   224  			err = ctx.Err()
   225  		}
   226  	}
   227  	return err
   228  }
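
        // Illustrative usage (a minimal sketch): flushing pending spans before a
        // short-lived process (for example a CLI or serverless handler) returns,
        // again bounded by a deadline. bsp is assumed to be a processor created by
        // NewBatchSpanProcessor.
        //
        //	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
        //	defer cancel()
        //	if err := bsp.ForceFlush(ctx); err != nil {
        //		otel.Handle(err)
        //	}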
   229  
   230  // WithMaxQueueSize returns a BatchSpanProcessorOption that configures the
   231  // maximum queue size allowed for a BatchSpanProcessor.
   232  func WithMaxQueueSize(size int) BatchSpanProcessorOption {
   233  	return func(o *BatchSpanProcessorOptions) {
   234  		o.MaxQueueSize = size
   235  	}
   236  }
   237  
   238  // WithMaxExportBatchSize returns a BatchSpanProcessorOption that configures
   239  // the maximum export batch size allowed for a BatchSpanProcessor.
   240  func WithMaxExportBatchSize(size int) BatchSpanProcessorOption {
   241  	return func(o *BatchSpanProcessorOptions) {
   242  		o.MaxExportBatchSize = size
   243  	}
   244  }
   245  
   246  // WithBatchTimeout returns a BatchSpanProcessorOption that configures the
   247  // maximum delay allowed for a BatchSpanProcessor before it will export any
   248  // held span (whether the queue is full or not).
   249  func WithBatchTimeout(delay time.Duration) BatchSpanProcessorOption {
   250  	return func(o *BatchSpanProcessorOptions) {
   251  		o.BatchTimeout = delay
   252  	}
   253  }
   254  
   255  // WithExportTimeout returns a BatchSpanProcessorOption that configures the
   256  // amount of time a BatchSpanProcessor waits for an exporter to export before
   257  // abandoning the export.
   258  func WithExportTimeout(timeout time.Duration) BatchSpanProcessorOption {
   259  	return func(o *BatchSpanProcessorOptions) {
   260  		o.ExportTimeout = timeout
   261  	}
   262  }
   263  
   264  // WithBlocking returns a BatchSpanProcessorOption that configures a
   265  // BatchSpanProcessor to wait for enqueue operations to succeed instead of
   266  // dropping data when the queue is full.
   267  func WithBlocking() BatchSpanProcessorOption {
   268  	return func(o *BatchSpanProcessorOptions) {
   269  		o.BlockOnQueueFull = true
   270  	}
   271  }
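
        // Illustrative usage (a minimal sketch): combining the options above when
        // constructing a processor. The values are illustrative only; exp is assumed
        // to be any configured SpanExporter and sdktrace the import alias for this
        // package.
        //
        //	bsp := sdktrace.NewBatchSpanProcessor(exp,
        //		sdktrace.WithMaxQueueSize(4096),
        //		sdktrace.WithMaxExportBatchSize(256),
        //		sdktrace.WithBatchTimeout(2*time.Second),
        //		sdktrace.WithExportTimeout(10*time.Second),
        //		sdktrace.WithBlocking(),
        //	)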
   272  
   273  // exportSpans exports the current batch. It is called from processQueue and drainQueue.
   274  func (bsp *batchSpanProcessor) exportSpans(ctx context.Context) error {
   275  	bsp.timer.Reset(bsp.o.BatchTimeout)
   276  
   277  	bsp.batchMutex.Lock()
   278  	defer bsp.batchMutex.Unlock()
   279  
   280  	if bsp.o.ExportTimeout > 0 {
   281  		var cancel context.CancelFunc
   282  		ctx, cancel = context.WithTimeout(ctx, bsp.o.ExportTimeout)
   283  		defer cancel()
   284  	}
   285  
   286  	if l := len(bsp.batch); l > 0 {
   287  		global.Debug("exporting spans", "count", len(bsp.batch), "total_dropped", atomic.LoadUint32(&bsp.dropped))
   288  		err := bsp.e.ExportSpans(ctx, bsp.batch)
   289  
   290  		// A new batch is always created after exporting, even if the batch failed to be exported.
   291  		//
   292  		// It is up to the exporter to implement any type of retry logic if a batch is failing
   293  		// to be exported, since it is specific to the protocol and backend being sent to.
   294  		bsp.batch = bsp.batch[:0]
   295  
   296  		if err != nil {
   297  			return err
   298  		}
   299  	}
   300  	return nil
   301  }
   302  
   303  // processQueue removes spans from the `queue` channel until the processor
   304  // is shut down. It calls the exporter in batches of up to MaxExportBatchSize,
   305  // waiting up to BatchTimeout to form a batch.
   306  func (bsp *batchSpanProcessor) processQueue() {
   307  	defer bsp.timer.Stop()
   308  
   309  	ctx, cancel := context.WithCancel(context.Background())
   310  	defer cancel()
   311  	for {
   312  		select {
   313  		case <-bsp.stopCh:
   314  			return
   315  		case <-bsp.timer.C:
   316  			if err := bsp.exportSpans(ctx); err != nil {
   317  				otel.Handle(err)
   318  			}
   319  		case sd := <-bsp.queue:
   320  			if ffs, ok := sd.(forceFlushSpan); ok {
   321  				close(ffs.flushed)
   322  				continue
   323  			}
   324  			bsp.batchMutex.Lock()
   325  			bsp.batch = append(bsp.batch, sd)
   326  			shouldExport := len(bsp.batch) >= bsp.o.MaxExportBatchSize
   327  			bsp.batchMutex.Unlock()
   328  			if shouldExport {
   329  				if !bsp.timer.Stop() {
   330  					<-bsp.timer.C
   331  				}
   332  				if err := bsp.exportSpans(ctx); err != nil {
   333  					otel.Handle(err)
   334  				}
   335  			}
   336  		}
   337  	}
   338  }
   339  
   340  // drainQueue drains any spans remaining in the queue after processQueue has
   341  // returned, then exports the final batch.
   342  func (bsp *batchSpanProcessor) drainQueue() {
   343  	ctx, cancel := context.WithCancel(context.Background())
   344  	defer cancel()
   345  	for {
   346  		select {
   347  		case sd := <-bsp.queue:
   348  			if _, ok := sd.(forceFlushSpan); ok {
   349  				// Ignore flush requests as they are not valid spans.
   350  				continue
   351  			}
   352  
   353  			bsp.batchMutex.Lock()
   354  			bsp.batch = append(bsp.batch, sd)
   355  			shouldExport := len(bsp.batch) == bsp.o.MaxExportBatchSize
   356  			bsp.batchMutex.Unlock()
   357  
   358  			if shouldExport {
   359  				if err := bsp.exportSpans(ctx); err != nil {
   360  					otel.Handle(err)
   361  				}
   362  			}
   363  		default:
   364  			// There are no more enqueued spans. Make final export.
   365  			if err := bsp.exportSpans(ctx); err != nil {
   366  				otel.Handle(err)
   367  			}
   368  			return
   369  		}
   370  	}
   371  }
   372  
   373  func (bsp *batchSpanProcessor) enqueue(sd ReadOnlySpan) {
   374  	ctx := context.TODO()
   375  	if bsp.o.BlockOnQueueFull {
   376  		bsp.enqueueBlockOnQueueFull(ctx, sd)
   377  	} else {
   378  		bsp.enqueueDrop(ctx, sd)
   379  	}
   380  }
   381  
   382  func (bsp *batchSpanProcessor) enqueueBlockOnQueueFull(ctx context.Context, sd ReadOnlySpan) bool {
   383  	if !sd.SpanContext().IsSampled() {
   384  		return false
   385  	}
   386  
   387  	select {
   388  	case bsp.queue <- sd:
   389  		return true
   390  	case <-ctx.Done():
   391  		return false
   392  	}
   393  }
   394  
   395  func (bsp *batchSpanProcessor) enqueueDrop(ctx context.Context, sd ReadOnlySpan) bool {
   396  	if !sd.SpanContext().IsSampled() {
   397  		return false
   398  	}
   399  
   400  	select {
   401  	case bsp.queue <- sd:
   402  		return true
   403  	default:
   404  		atomic.AddUint32(&bsp.dropped, 1)
   405  	}
   406  	return false
   407  }
   408  
   409  // MarshalLog is the marshaling function used by the logging system to represent this Span Processor.
   410  func (bsp *batchSpanProcessor) MarshalLog() interface{} {
   411  	return struct {
   412  		Type         string
   413  		SpanExporter SpanExporter
   414  		Config       BatchSpanProcessorOptions
   415  	}{
   416  		Type:         "BatchSpanProcessor",
   417  		SpanExporter: bsp.e,
   418  		Config:       bsp.o,
   419  	}
   420  }
   421  
