...

Source file src/go.etcd.io/etcd/server/v3/embed/config_tracing.go

Documentation: go.etcd.io/etcd/server/v3/embed

     1  // Copyright 2022 The etcd Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package embed
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  
    21  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    22  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    23  	"go.opentelemetry.io/otel/propagation"
    24  	"go.opentelemetry.io/otel/sdk/resource"
    25  	tracesdk "go.opentelemetry.io/otel/sdk/trace"
    26  	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
    27  	"go.uber.org/zap"
    28  )
    29  
    30  const maxSamplingRatePerMillion = 1000000
    31  
    32  func validateTracingConfig(samplingRate int) error {
    33  	if samplingRate < 0 {
    34  		return fmt.Errorf("tracing sampling rate must be positive")
    35  	}
    36  	if samplingRate > maxSamplingRatePerMillion {
    37  		return fmt.Errorf("tracing sampling rate must be less than %d", maxSamplingRatePerMillion)
    38  	}
    39  
    40  	return nil
    41  }
    42  
    43  type tracingExporter struct {
    44  	exporter tracesdk.SpanExporter
    45  	opts     []otelgrpc.Option
    46  	provider *tracesdk.TracerProvider
    47  }
    48  
    49  func newTracingExporter(ctx context.Context, cfg *Config) (*tracingExporter, error) {
    50  	exporter, err := otlptracegrpc.New(ctx,
    51  		otlptracegrpc.WithInsecure(),
    52  		otlptracegrpc.WithEndpoint(cfg.ExperimentalDistributedTracingAddress),
    53  	)
    54  	if err != nil {
    55  		return nil, err
    56  	}
    57  
    58  	res, err := resource.New(ctx,
    59  		resource.WithAttributes(
    60  			semconv.ServiceNameKey.String(cfg.ExperimentalDistributedTracingServiceName),
    61  		),
    62  	)
    63  	if err != nil {
    64  		return nil, err
    65  	}
    66  
    67  	if resWithIDKey := determineResourceWithIDKey(cfg.ExperimentalDistributedTracingServiceInstanceID); resWithIDKey != nil {
    68  		// Merge resources into a new
    69  		// resource in case of duplicates.
    70  		res, err = resource.Merge(res, resWithIDKey)
    71  		if err != nil {
    72  			return nil, err
    73  		}
    74  	}
    75  
    76  	traceProvider := tracesdk.NewTracerProvider(
    77  		tracesdk.WithBatcher(exporter),
    78  		tracesdk.WithResource(res),
    79  		tracesdk.WithSampler(
    80  			tracesdk.ParentBased(determineSampler(cfg.ExperimentalDistributedTracingSamplingRatePerMillion)),
    81  		),
    82  	)
    83  
    84  	options := []otelgrpc.Option{
    85  		otelgrpc.WithPropagators(
    86  			propagation.NewCompositeTextMapPropagator(
    87  				propagation.TraceContext{},
    88  				propagation.Baggage{},
    89  			),
    90  		),
    91  		otelgrpc.WithTracerProvider(
    92  			traceProvider,
    93  		),
    94  	}
    95  
    96  	cfg.logger.Debug(
    97  		"distributed tracing enabled",
    98  		zap.String("address", cfg.ExperimentalDistributedTracingAddress),
    99  		zap.String("service-name", cfg.ExperimentalDistributedTracingServiceName),
   100  		zap.String("service-instance-id", cfg.ExperimentalDistributedTracingServiceInstanceID),
   101  	)
   102  
   103  	return &tracingExporter{
   104  		exporter: exporter,
   105  		opts:     options,
   106  		provider: traceProvider,
   107  	}, nil
   108  }
   109  
   110  func (te *tracingExporter) Close(ctx context.Context) {
   111  	if te.provider != nil {
   112  		te.provider.Shutdown(ctx)
   113  	}
   114  
   115  	if te.exporter != nil {
   116  		te.exporter.Shutdown(ctx)
   117  	}
   118  }
   119  
   120  func determineSampler(samplingRate int) tracesdk.Sampler {
   121  	sampler := tracesdk.NeverSample()
   122  	if samplingRate == 0 {
   123  		return sampler
   124  	}
   125  	return tracesdk.TraceIDRatioBased(float64(samplingRate) / float64(maxSamplingRatePerMillion))
   126  }
   127  
   128  // As Tracing service Instance ID must be unique, it should
   129  // never use the empty default string value, it's set if
   130  // if it's a non empty string.
   131  func determineResourceWithIDKey(serviceInstanceID string) *resource.Resource {
   132  	if serviceInstanceID != "" {
   133  		return resource.NewSchemaless(
   134  			(semconv.ServiceInstanceIDKey.String(serviceInstanceID)),
   135  		)
   136  	}
   137  	return nil
   138  }
   139  

View as plain text