1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package trace
16
17 import (
18 "context"
19 "fmt"
20 "sync"
21 "sync/atomic"
22
23 "go.opentelemetry.io/otel"
24 "go.opentelemetry.io/otel/internal/global"
25 "go.opentelemetry.io/otel/sdk/instrumentation"
26 "go.opentelemetry.io/otel/sdk/resource"
27 "go.opentelemetry.io/otel/trace"
28 "go.opentelemetry.io/otel/trace/embedded"
29 "go.opentelemetry.io/otel/trace/noop"
30 )
31
32 const (
33 defaultTracerName = "go.opentelemetry.io/otel/sdk/tracer"
34 )
35
36
37 type tracerProviderConfig struct {
38
39
40
41
42
43 processors []SpanProcessor
44
45
46 sampler Sampler
47
48
49 idGenerator IDGenerator
50
51
52 spanLimits SpanLimits
53
54
55 resource *resource.Resource
56 }
57
58
59 func (cfg tracerProviderConfig) MarshalLog() interface{} {
60 return struct {
61 SpanProcessors []SpanProcessor
62 SamplerType string
63 IDGeneratorType string
64 SpanLimits SpanLimits
65 Resource *resource.Resource
66 }{
67 SpanProcessors: cfg.processors,
68 SamplerType: fmt.Sprintf("%T", cfg.sampler),
69 IDGeneratorType: fmt.Sprintf("%T", cfg.idGenerator),
70 SpanLimits: cfg.spanLimits,
71 Resource: cfg.resource,
72 }
73 }
74
75
76
77 type TracerProvider struct {
78 embedded.TracerProvider
79
80 mu sync.Mutex
81 namedTracer map[instrumentation.Scope]*tracer
82 spanProcessors atomic.Pointer[spanProcessorStates]
83
84 isShutdown atomic.Bool
85
86
87
88 sampler Sampler
89 idGenerator IDGenerator
90 spanLimits SpanLimits
91 resource *resource.Resource
92 }
93
94 var _ trace.TracerProvider = &TracerProvider{}
95
96
97
98
99
100
101
102
103
104
105
106 func NewTracerProvider(opts ...TracerProviderOption) *TracerProvider {
107 o := tracerProviderConfig{
108 spanLimits: NewSpanLimits(),
109 }
110 o = applyTracerProviderEnvConfigs(o)
111
112 for _, opt := range opts {
113 o = opt.apply(o)
114 }
115
116 o = ensureValidTracerProviderConfig(o)
117
118 tp := &TracerProvider{
119 namedTracer: make(map[instrumentation.Scope]*tracer),
120 sampler: o.sampler,
121 idGenerator: o.idGenerator,
122 spanLimits: o.spanLimits,
123 resource: o.resource,
124 }
125 global.Info("TracerProvider created", "config", o)
126
127 spss := make(spanProcessorStates, 0, len(o.processors))
128 for _, sp := range o.processors {
129 spss = append(spss, newSpanProcessorState(sp))
130 }
131 tp.spanProcessors.Store(&spss)
132
133 return tp
134 }
135
136
137
138
139
140
141
142
143 func (p *TracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer {
144
145 if p.isShutdown.Load() {
146 return noop.NewTracerProvider().Tracer(name, opts...)
147 }
148 c := trace.NewTracerConfig(opts...)
149 if name == "" {
150 name = defaultTracerName
151 }
152 is := instrumentation.Scope{
153 Name: name,
154 Version: c.InstrumentationVersion(),
155 SchemaURL: c.SchemaURL(),
156 }
157
158 t, ok := func() (trace.Tracer, bool) {
159 p.mu.Lock()
160 defer p.mu.Unlock()
161
162
163 if p.isShutdown.Load() {
164 return noop.NewTracerProvider().Tracer(name, opts...), true
165 }
166 t, ok := p.namedTracer[is]
167 if !ok {
168 t = &tracer{
169 provider: p,
170 instrumentationScope: is,
171 }
172 p.namedTracer[is] = t
173 }
174 return t, ok
175 }()
176 if !ok {
177
178
179
180
181
182 global.Info("Tracer created", "name", name, "version", is.Version, "schemaURL", is.SchemaURL)
183 }
184 return t
185 }
186
187
188 func (p *TracerProvider) RegisterSpanProcessor(sp SpanProcessor) {
189
190 if p.isShutdown.Load() {
191 return
192 }
193 p.mu.Lock()
194 defer p.mu.Unlock()
195
196 if p.isShutdown.Load() {
197 return
198 }
199
200 current := p.getSpanProcessors()
201 newSPS := make(spanProcessorStates, 0, len(current)+1)
202 newSPS = append(newSPS, current...)
203 newSPS = append(newSPS, newSpanProcessorState(sp))
204 p.spanProcessors.Store(&newSPS)
205 }
206
207
208 func (p *TracerProvider) UnregisterSpanProcessor(sp SpanProcessor) {
209
210 if p.isShutdown.Load() {
211 return
212 }
213 p.mu.Lock()
214 defer p.mu.Unlock()
215
216 if p.isShutdown.Load() {
217 return
218 }
219 old := p.getSpanProcessors()
220 if len(old) == 0 {
221 return
222 }
223 spss := make(spanProcessorStates, len(old))
224 copy(spss, old)
225
226
227 var stopOnce *spanProcessorState
228 var idx int
229 for i, sps := range spss {
230 if sps.sp == sp {
231 stopOnce = sps
232 idx = i
233 }
234 }
235 if stopOnce != nil {
236 stopOnce.state.Do(func() {
237 if err := sp.Shutdown(context.Background()); err != nil {
238 otel.Handle(err)
239 }
240 })
241 }
242 if len(spss) > 1 {
243 copy(spss[idx:], spss[idx+1:])
244 }
245 spss[len(spss)-1] = nil
246 spss = spss[:len(spss)-1]
247
248 p.spanProcessors.Store(&spss)
249 }
250
251
252
253 func (p *TracerProvider) ForceFlush(ctx context.Context) error {
254 spss := p.getSpanProcessors()
255 if len(spss) == 0 {
256 return nil
257 }
258
259 for _, sps := range spss {
260 select {
261 case <-ctx.Done():
262 return ctx.Err()
263 default:
264 }
265
266 if err := sps.sp.ForceFlush(ctx); err != nil {
267 return err
268 }
269 }
270 return nil
271 }
272
273
274
275
276 func (p *TracerProvider) Shutdown(ctx context.Context) error {
277
278 if p.isShutdown.Load() {
279 return nil
280 }
281 p.mu.Lock()
282 defer p.mu.Unlock()
283
284 if !p.isShutdown.CompareAndSwap(false, true) {
285 return nil
286 }
287
288 var retErr error
289 for _, sps := range p.getSpanProcessors() {
290 select {
291 case <-ctx.Done():
292 return ctx.Err()
293 default:
294 }
295
296 var err error
297 sps.state.Do(func() {
298 err = sps.sp.Shutdown(ctx)
299 })
300 if err != nil {
301 if retErr == nil {
302 retErr = err
303 } else {
304
305 retErr = fmt.Errorf("%v; %v", retErr, err)
306 }
307 }
308 }
309 p.spanProcessors.Store(&spanProcessorStates{})
310 return retErr
311 }
312
313 func (p *TracerProvider) getSpanProcessors() spanProcessorStates {
314 return *(p.spanProcessors.Load())
315 }
316
317
318 type TracerProviderOption interface {
319 apply(tracerProviderConfig) tracerProviderConfig
320 }
321
322 type traceProviderOptionFunc func(tracerProviderConfig) tracerProviderConfig
323
324 func (fn traceProviderOptionFunc) apply(cfg tracerProviderConfig) tracerProviderConfig {
325 return fn(cfg)
326 }
327
328
329
330
331
332
333
334
335
336 func WithSyncer(e SpanExporter) TracerProviderOption {
337 return WithSpanProcessor(NewSimpleSpanProcessor(e))
338 }
339
340
341
342 func WithBatcher(e SpanExporter, opts ...BatchSpanProcessorOption) TracerProviderOption {
343 return WithSpanProcessor(NewBatchSpanProcessor(e, opts...))
344 }
345
346
347 func WithSpanProcessor(sp SpanProcessor) TracerProviderOption {
348 return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
349 cfg.processors = append(cfg.processors, sp)
350 return cfg
351 })
352 }
353
354
355
356
357
358
359
360
361 func WithResource(r *resource.Resource) TracerProviderOption {
362 return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
363 var err error
364 cfg.resource, err = resource.Merge(resource.Environment(), r)
365 if err != nil {
366 otel.Handle(err)
367 }
368 return cfg
369 })
370 }
371
372
373
374
375
376
377
378
379 func WithIDGenerator(g IDGenerator) TracerProviderOption {
380 return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
381 if g != nil {
382 cfg.idGenerator = g
383 }
384 return cfg
385 })
386 }
387
388
389
390
391
392
393
394
395
396
397
398 func WithSampler(s Sampler) TracerProviderOption {
399 return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
400 if s != nil {
401 cfg.sampler = s
402 }
403 return cfg
404 })
405 }
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422 func WithSpanLimits(sl SpanLimits) TracerProviderOption {
423 if sl.AttributeValueLengthLimit <= 0 {
424 sl.AttributeValueLengthLimit = DefaultAttributeValueLengthLimit
425 }
426 if sl.AttributeCountLimit <= 0 {
427 sl.AttributeCountLimit = DefaultAttributeCountLimit
428 }
429 if sl.EventCountLimit <= 0 {
430 sl.EventCountLimit = DefaultEventCountLimit
431 }
432 if sl.AttributePerEventCountLimit <= 0 {
433 sl.AttributePerEventCountLimit = DefaultAttributePerEventCountLimit
434 }
435 if sl.LinkCountLimit <= 0 {
436 sl.LinkCountLimit = DefaultLinkCountLimit
437 }
438 if sl.AttributePerLinkCountLimit <= 0 {
439 sl.AttributePerLinkCountLimit = DefaultAttributePerLinkCountLimit
440 }
441 return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
442 cfg.spanLimits = sl
443 return cfg
444 })
445 }
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462 func WithRawSpanLimits(limits SpanLimits) TracerProviderOption {
463 return traceProviderOptionFunc(func(cfg tracerProviderConfig) tracerProviderConfig {
464 cfg.spanLimits = limits
465 return cfg
466 })
467 }
468
469 func applyTracerProviderEnvConfigs(cfg tracerProviderConfig) tracerProviderConfig {
470 for _, opt := range tracerProviderOptionsFromEnv() {
471 cfg = opt.apply(cfg)
472 }
473
474 return cfg
475 }
476
477 func tracerProviderOptionsFromEnv() []TracerProviderOption {
478 var opts []TracerProviderOption
479
480 sampler, err := samplerFromEnv()
481 if err != nil {
482 otel.Handle(err)
483 }
484
485 if sampler != nil {
486 opts = append(opts, WithSampler(sampler))
487 }
488
489 return opts
490 }
491
492
493 func ensureValidTracerProviderConfig(cfg tracerProviderConfig) tracerProviderConfig {
494 if cfg.sampler == nil {
495 cfg.sampler = ParentBased(AlwaysSample())
496 }
497 if cfg.idGenerator == nil {
498 cfg.idGenerator = defaultIDGenerator()
499 }
500 if cfg.resource == nil {
501 cfg.resource = resource.Default()
502 }
503 return cfg
504 }
505
View as plain text