...

Source file src/go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/client.go

Documentation: go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc

     1  // Copyright The OpenTelemetry Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package otlptracegrpc // import "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"sync"
    21  	"time"
    22  
    23  	"google.golang.org/genproto/googleapis/rpc/errdetails"
    24  	"google.golang.org/grpc"
    25  	"google.golang.org/grpc/codes"
    26  	"google.golang.org/grpc/metadata"
    27  	"google.golang.org/grpc/status"
    28  
    29  	"go.opentelemetry.io/otel"
    30  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
    31  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal"
    32  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/otlpconfig"
    33  	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc/internal/retry"
    34  	coltracepb "go.opentelemetry.io/proto/otlp/collector/trace/v1"
    35  	tracepb "go.opentelemetry.io/proto/otlp/trace/v1"
    36  )
    37  
// client is the gRPC implementation of otlptrace.Client. It holds the
// configuration resolved by newClient, the connection established (or
// adopted) by Start, and the state used to coordinate Stop with in-flight
// UploadTraces calls.
type client struct {
	// endpoint is the dial target used by Start when no conn was provided.
	endpoint string
	// dialOpts are passed to grpc.DialContext by Start.
	dialOpts []grpc.DialOption
	// metadata, when non-empty, is attached to the outgoing context of every
	// export.
	metadata metadata.MD
	// exportTimeout bounds each UploadTraces call when greater than zero.
	exportTimeout time.Duration
	// requestFunc wraps each export attempt; it is built from the retry
	// configuration in newClient.
	requestFunc retry.RequestFunc

	// stopCtx is used as a parent context for all exports. Therefore, when it
	// is canceled with the stopFunc all exports are canceled.
	stopCtx context.Context
	// stopFunc cancels stopCtx, stopping any active exports.
	stopFunc context.CancelFunc

	// ourConn keeps track of where conn was created: true if created here on
	// Start, or false if passed with an option. This is important on Shutdown
	// as the conn should only be closed if created here on start. Otherwise,
	// it is up to the processes that passed the conn to close it.
	ourConn bool
	// conn is the gRPC connection exports are sent over.
	conn *grpc.ClientConn
	// tscMu guards tsc. UploadTraces holds it for reading for the duration of
	// an export; Start and Stop take it for writing.
	tscMu sync.RWMutex
	// tsc is the trace service stub. It is nil until Start runs and is reset
	// to nil by Stop, which is how a stopped client is recognized.
	tsc coltracepb.TraceServiceClient
}
    60  
// Compile time check *client implements otlptrace.Client. The build fails
// here if a method required by the interface is missing from *client.
var _ otlptrace.Client = (*client)(nil)
    63  
    64  // NewClient creates a new gRPC trace client.
    65  func NewClient(opts ...Option) otlptrace.Client {
    66  	return newClient(opts...)
    67  }
    68  
    69  func newClient(opts ...Option) *client {
    70  	cfg := otlpconfig.NewGRPCConfig(asGRPCOptions(opts)...)
    71  
    72  	ctx, cancel := context.WithCancel(context.Background())
    73  
    74  	c := &client{
    75  		endpoint:      cfg.Traces.Endpoint,
    76  		exportTimeout: cfg.Traces.Timeout,
    77  		requestFunc:   cfg.RetryConfig.RequestFunc(retryable),
    78  		dialOpts:      cfg.DialOptions,
    79  		stopCtx:       ctx,
    80  		stopFunc:      cancel,
    81  		conn:          cfg.GRPCConn,
    82  	}
    83  
    84  	if len(cfg.Traces.Headers) > 0 {
    85  		c.metadata = metadata.New(cfg.Traces.Headers)
    86  	}
    87  
    88  	return c
    89  }
    90  
    91  // Start establishes a gRPC connection to the collector.
    92  func (c *client) Start(ctx context.Context) error {
    93  	if c.conn == nil {
    94  		// If the caller did not provide a ClientConn when the client was
    95  		// created, create one using the configuration they did provide.
    96  		conn, err := grpc.DialContext(ctx, c.endpoint, c.dialOpts...)
    97  		if err != nil {
    98  			return err
    99  		}
   100  		// Keep track that we own the lifecycle of this conn and need to close
   101  		// it on Shutdown.
   102  		c.ourConn = true
   103  		c.conn = conn
   104  	}
   105  
   106  	// The otlptrace.Client interface states this method is called just once,
   107  	// so no need to check if already started.
   108  	c.tscMu.Lock()
   109  	c.tsc = coltracepb.NewTraceServiceClient(c.conn)
   110  	c.tscMu.Unlock()
   111  
   112  	return nil
   113  }
   114  
// errAlreadyStopped is returned by Stop when the client has no active trace
// service client, i.e. it was already stopped or was never started.
var errAlreadyStopped = errors.New("the client is already stopped")
   116  
// Stop shuts down the client.
//
// Any active connections to a remote endpoint are closed if they were created
// by the client. Any gRPC connection passed during creation using
// WithGRPCConn will not be closed. It is the caller's responsibility to
// handle cleanup of that resource.
//
// This method synchronizes with the UploadTraces method of the client. It
// will wait for any active calls to that method to complete unimpeded, or it
// will cancel any active calls if ctx expires. If ctx expires, the context
// error will be forwarded as the returned error. All client held resources
// will still be released in this situation.
//
// If the client has already stopped, an error will be returned describing
// this. The same error is returned if Stop is called on a client that was
// never started, because both states are detected by c.tsc being nil; in
// that case it replaces any context error.
func (c *client) Stop(ctx context.Context) error {
	// Make sure to return context error if the context is done when calling this method.
	err := ctx.Err()

	// Acquire the c.tscMu lock within the ctx lifetime. The lock is taken in
	// a separate goroutine so this function can wait on either the lock or
	// ctx expiring, whichever comes first.
	acquired := make(chan struct{})
	go func() {
		c.tscMu.Lock()
		close(acquired)
	}()

	select {
	case <-ctx.Done():
		// The Stop timeout is reached. Kill any remaining exports to force
		// the clear of the lock and save the timeout error to return and
		// signal the shutdown timed out before cleanly stopping.
		c.stopFunc()
		err = ctx.Err()

		// To ensure the client is not left in a dirty state c.tsc needs to be
		// set to nil. To avoid the race condition when doing this, ensure
		// that all the exports are killed (initiated by c.stopFunc).
		<-acquired
	case <-acquired:
	}
	// Hold the tscMu lock for the rest of the function to ensure no new
	// exports are started. The lock was taken by the goroutine above; it is
	// released here.
	defer c.tscMu.Unlock()

	// The otlptrace.Client interface states this method is called only
	// once, but there is no guarantee it is called after Start. Ensure the
	// client is started before doing anything and let the caller know if they
	// made a mistake.
	if c.tsc == nil {
		return errAlreadyStopped
	}

	// Clear c.tsc to signal the client is stopped.
	c.tsc = nil

	// Only close the connection if it was dialed by Start.
	if c.ourConn {
		closeErr := c.conn.Close()
		// A context timeout error takes precedence over this error.
		if err == nil && closeErr != nil {
			err = closeErr
		}
	}
	return err
}
   181  
// errShutdown is returned by UploadTraces when the client has no active trace
// service client to export with.
var errShutdown = errors.New("the client is shutdown")
   183  
   184  // UploadTraces sends a batch of spans.
   185  //
   186  // Retryable errors from the server will be handled according to any
   187  // RetryConfig the client was created with.
   188  func (c *client) UploadTraces(ctx context.Context, protoSpans []*tracepb.ResourceSpans) error {
   189  	// Hold a read lock to ensure a shut down initiated after this starts does
   190  	// not abandon the export. This read lock acquire has less priority than a
   191  	// write lock acquire (i.e. Stop), meaning if the client is shutting down
   192  	// this will come after the shut down.
   193  	c.tscMu.RLock()
   194  	defer c.tscMu.RUnlock()
   195  
   196  	if c.tsc == nil {
   197  		return errShutdown
   198  	}
   199  
   200  	ctx, cancel := c.exportContext(ctx)
   201  	defer cancel()
   202  
   203  	return c.requestFunc(ctx, func(iCtx context.Context) error {
   204  		resp, err := c.tsc.Export(iCtx, &coltracepb.ExportTraceServiceRequest{
   205  			ResourceSpans: protoSpans,
   206  		})
   207  		if resp != nil && resp.PartialSuccess != nil {
   208  			msg := resp.PartialSuccess.GetErrorMessage()
   209  			n := resp.PartialSuccess.GetRejectedSpans()
   210  			if n != 0 || msg != "" {
   211  				err := internal.TracePartialSuccessError(n, msg)
   212  				otel.Handle(err)
   213  			}
   214  		}
   215  		// nil is converted to OK.
   216  		if status.Code(err) == codes.OK {
   217  			// Success.
   218  			return nil
   219  		}
   220  		return err
   221  	})
   222  }
   223  
   224  // exportContext returns a copy of parent with an appropriate deadline and
   225  // cancellation function.
   226  //
   227  // It is the callers responsibility to cancel the returned context once its
   228  // use is complete, via the parent or directly with the returned CancelFunc, to
   229  // ensure all resources are correctly released.
   230  func (c *client) exportContext(parent context.Context) (context.Context, context.CancelFunc) {
   231  	var (
   232  		ctx    context.Context
   233  		cancel context.CancelFunc
   234  	)
   235  
   236  	if c.exportTimeout > 0 {
   237  		ctx, cancel = context.WithTimeout(parent, c.exportTimeout)
   238  	} else {
   239  		ctx, cancel = context.WithCancel(parent)
   240  	}
   241  
   242  	if c.metadata.Len() > 0 {
   243  		ctx = metadata.NewOutgoingContext(ctx, c.metadata)
   244  	}
   245  
   246  	// Unify the client stopCtx with the parent.
   247  	go func() {
   248  		select {
   249  		case <-ctx.Done():
   250  		case <-c.stopCtx.Done():
   251  			// Cancel the export as the shutdown has timed out.
   252  			cancel()
   253  		}
   254  	}()
   255  
   256  	return ctx, cancel
   257  }
   258  
   259  // retryable returns if err identifies a request that can be retried and a
   260  // duration to wait for if an explicit throttle time is included in err.
   261  func retryable(err error) (bool, time.Duration) {
   262  	s := status.Convert(err)
   263  	return retryableGRPCStatus(s)
   264  }
   265  
   266  func retryableGRPCStatus(s *status.Status) (bool, time.Duration) {
   267  	switch s.Code() {
   268  	case codes.Canceled,
   269  		codes.DeadlineExceeded,
   270  		codes.Aborted,
   271  		codes.OutOfRange,
   272  		codes.Unavailable,
   273  		codes.DataLoss:
   274  		// Additionally handle RetryInfo.
   275  		_, d := throttleDelay(s)
   276  		return true, d
   277  	case codes.ResourceExhausted:
   278  		// Retry only if the server signals that the recovery from resource exhaustion is possible.
   279  		return throttleDelay(s)
   280  	}
   281  
   282  	// Not a retry-able error.
   283  	return false, 0
   284  }
   285  
   286  // throttleDelay returns of the status is RetryInfo
   287  // and the its duration to wait for if an explicit throttle time.
   288  func throttleDelay(s *status.Status) (bool, time.Duration) {
   289  	for _, detail := range s.Details() {
   290  		if t, ok := detail.(*errdetails.RetryInfo); ok {
   291  			return true, t.RetryDelay.AsDuration()
   292  		}
   293  	}
   294  	return false, 0
   295  }
   296  
   297  // MarshalLog is the marshaling function used by the logging system to represent this Client.
   298  func (c *client) MarshalLog() interface{} {
   299  	return struct {
   300  		Type     string
   301  		Endpoint string
   302  	}{
   303  		Type:     "otlphttpgrpc",
   304  		Endpoint: c.endpoint,
   305  	}
   306  }
   307  

View as plain text