...

Source file src/github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing/client_interceptors.go

Documentation: github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing

     1  // Copyright 2017 Michal Witkowski. All Rights Reserved.
     2  // See LICENSE for licensing terms.
     3  
     4  package grpc_opentracing
     5  
     6  import (
     7  	"context"
     8  	"io"
     9  	"sync"
    10  
    11  	"github.com/grpc-ecosystem/go-grpc-middleware/util/metautils"
    12  	opentracing "github.com/opentracing/opentracing-go"
    13  	"github.com/opentracing/opentracing-go/ext"
    14  	"github.com/opentracing/opentracing-go/log"
    15  	"google.golang.org/grpc"
    16  	"google.golang.org/grpc/grpclog"
    17  	"google.golang.org/grpc/metadata"
    18  )
    19  
    20  // UnaryClientInterceptor returns a new unary client interceptor for OpenTracing.
    21  func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
    22  	o := evaluateOptions(opts)
    23  	return func(parentCtx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    24  		if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
    25  			return invoker(parentCtx, method, req, reply, cc, opts...)
    26  		}
    27  		newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
    28  		if o.unaryRequestHandlerFunc != nil {
    29  			o.unaryRequestHandlerFunc(clientSpan, req)
    30  		}
    31  		err := invoker(newCtx, method, req, reply, cc, opts...)
    32  		finishClientSpan(clientSpan, err)
    33  		return err
    34  	}
    35  }
    36  
    37  // StreamClientInterceptor returns a new streaming client interceptor for OpenTracing.
    38  func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
    39  	o := evaluateOptions(opts)
    40  	return func(parentCtx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    41  		if o.filterOutFunc != nil && !o.filterOutFunc(parentCtx, method) {
    42  			return streamer(parentCtx, desc, cc, method, opts...)
    43  		}
    44  		newCtx, clientSpan := newClientSpanFromContext(parentCtx, o.tracer, method)
    45  		clientStream, err := streamer(newCtx, desc, cc, method, opts...)
    46  		if err != nil {
    47  			finishClientSpan(clientSpan, err)
    48  			return nil, err
    49  		}
    50  		return &tracedClientStream{ClientStream: clientStream, clientSpan: clientSpan}, nil
    51  	}
    52  }
    53  
    54  // type serverStreamingRetryingStream is the implementation of grpc.ClientStream that acts as a
    55  // proxy to the underlying call. If any of the RecvMsg() calls fail, it will try to reestablish
    56  // a new ClientStream according to the retry policy.
    57  type tracedClientStream struct {
    58  	grpc.ClientStream
    59  	mu              sync.Mutex
    60  	alreadyFinished bool
    61  	clientSpan      opentracing.Span
    62  }
    63  
    64  func (s *tracedClientStream) Header() (metadata.MD, error) {
    65  	h, err := s.ClientStream.Header()
    66  	if err != nil {
    67  		s.finishClientSpan(err)
    68  	}
    69  	return h, err
    70  }
    71  
    72  func (s *tracedClientStream) SendMsg(m interface{}) error {
    73  	err := s.ClientStream.SendMsg(m)
    74  	if err != nil {
    75  		s.finishClientSpan(err)
    76  	}
    77  	return err
    78  }
    79  
    80  func (s *tracedClientStream) CloseSend() error {
    81  	err := s.ClientStream.CloseSend()
    82  	s.finishClientSpan(err)
    83  	return err
    84  }
    85  
    86  func (s *tracedClientStream) RecvMsg(m interface{}) error {
    87  	err := s.ClientStream.RecvMsg(m)
    88  	if err != nil {
    89  		s.finishClientSpan(err)
    90  	}
    91  	return err
    92  }
    93  
    94  func (s *tracedClientStream) finishClientSpan(err error) {
    95  	s.mu.Lock()
    96  	defer s.mu.Unlock()
    97  	if !s.alreadyFinished {
    98  		finishClientSpan(s.clientSpan, err)
    99  		s.alreadyFinished = true
   100  	}
   101  }
   102  
   103  // ClientAddContextTags returns a context with specified opentracing tags, which
   104  // are used by UnaryClientInterceptor/StreamClientInterceptor when creating a
   105  // new span.
   106  func ClientAddContextTags(ctx context.Context, tags opentracing.Tags) context.Context {
   107  	return context.WithValue(ctx, clientSpanTagKey{}, tags)
   108  }
   109  
   110  type clientSpanTagKey struct{}
   111  
   112  func newClientSpanFromContext(ctx context.Context, tracer opentracing.Tracer, fullMethodName string) (context.Context, opentracing.Span) {
   113  	var parentSpanCtx opentracing.SpanContext
   114  	if parent := opentracing.SpanFromContext(ctx); parent != nil {
   115  		parentSpanCtx = parent.Context()
   116  	}
   117  	opts := []opentracing.StartSpanOption{
   118  		opentracing.ChildOf(parentSpanCtx),
   119  		ext.SpanKindRPCClient,
   120  		grpcTag,
   121  	}
   122  	if tagx := ctx.Value(clientSpanTagKey{}); tagx != nil {
   123  		if opt, ok := tagx.(opentracing.StartSpanOption); ok {
   124  			opts = append(opts, opt)
   125  		}
   126  	}
   127  	clientSpan := tracer.StartSpan(fullMethodName, opts...)
   128  	// Make sure we add this to the metadata of the call, so it gets propagated:
   129  	md := metautils.ExtractOutgoing(ctx).Clone()
   130  	if err := tracer.Inject(clientSpan.Context(), opentracing.HTTPHeaders, metadataTextMap(md)); err != nil {
   131  		grpclog.Infof("grpc_opentracing: failed serializing trace information: %v", err)
   132  	}
   133  	ctxWithMetadata := md.ToOutgoing(ctx)
   134  	return opentracing.ContextWithSpan(ctxWithMetadata, clientSpan), clientSpan
   135  }
   136  
   137  func finishClientSpan(clientSpan opentracing.Span, err error) {
   138  	if err != nil && err != io.EOF {
   139  		ext.Error.Set(clientSpan, true)
   140  		clientSpan.LogFields(log.String("event", "error"), log.String("message", err.Error()))
   141  	}
   142  	clientSpan.Finish()
   143  }
   144  

View as plain text