...

Source file src/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go

Documentation: go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

     1  // Copyright The OpenTelemetry Authors
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
     5  
     6  // gRPC tracing middleware
     7  // https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/rpc.md
     8  import (
     9  	"context"
    10  	"io"
    11  	"net"
    12  	"strconv"
    13  	"time"
    14  
    15  	"google.golang.org/grpc"
    16  	grpc_codes "google.golang.org/grpc/codes"
    17  	"google.golang.org/grpc/metadata"
    18  	"google.golang.org/grpc/peer"
    19  	"google.golang.org/grpc/status"
    20  	"google.golang.org/protobuf/proto"
    21  
    22  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
    23  	"go.opentelemetry.io/otel/attribute"
    24  	"go.opentelemetry.io/otel/codes"
    25  	"go.opentelemetry.io/otel/metric"
    26  	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
    27  	"go.opentelemetry.io/otel/trace"
    28  )
    29  
    30  type messageType attribute.KeyValue
    31  
    32  // Event adds an event of the messageType to the span associated with the
    33  // passed context with a message id.
    34  func (m messageType) Event(ctx context.Context, id int, _ interface{}) {
    35  	span := trace.SpanFromContext(ctx)
    36  	if !span.IsRecording() {
    37  		return
    38  	}
    39  	span.AddEvent("message", trace.WithAttributes(
    40  		attribute.KeyValue(m),
    41  		RPCMessageIDKey.Int(id),
    42  	))
    43  }
    44  
    45  var (
    46  	messageSent     = messageType(RPCMessageTypeSent)
    47  	messageReceived = messageType(RPCMessageTypeReceived)
    48  )
    49  
    50  // UnaryClientInterceptor returns a grpc.UnaryClientInterceptor suitable
    51  // for use in a grpc.Dial call.
    52  //
    53  // Deprecated: Use [NewClientHandler] instead.
    54  func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
    55  	cfg := newConfig(opts, "client")
    56  	tracer := cfg.TracerProvider.Tracer(
    57  		ScopeName,
    58  		trace.WithInstrumentationVersion(Version()),
    59  	)
    60  
    61  	return func(
    62  		ctx context.Context,
    63  		method string,
    64  		req, reply interface{},
    65  		cc *grpc.ClientConn,
    66  		invoker grpc.UnaryInvoker,
    67  		callOpts ...grpc.CallOption,
    68  	) error {
    69  		i := &InterceptorInfo{
    70  			Method: method,
    71  			Type:   UnaryClient,
    72  		}
    73  		if cfg.Filter != nil && !cfg.Filter(i) {
    74  			return invoker(ctx, method, req, reply, cc, callOpts...)
    75  		}
    76  
    77  		name, attr, _ := telemetryAttributes(method, cc.Target())
    78  
    79  		startOpts := append([]trace.SpanStartOption{
    80  			trace.WithSpanKind(trace.SpanKindClient),
    81  			trace.WithAttributes(attr...),
    82  		},
    83  			cfg.SpanStartOptions...,
    84  		)
    85  
    86  		ctx, span := tracer.Start(
    87  			ctx,
    88  			name,
    89  			startOpts...,
    90  		)
    91  		defer span.End()
    92  
    93  		ctx = inject(ctx, cfg.Propagators)
    94  
    95  		if cfg.SentEvent {
    96  			messageSent.Event(ctx, 1, req)
    97  		}
    98  
    99  		err := invoker(ctx, method, req, reply, cc, callOpts...)
   100  
   101  		if cfg.ReceivedEvent {
   102  			messageReceived.Event(ctx, 1, reply)
   103  		}
   104  
   105  		if err != nil {
   106  			s, _ := status.FromError(err)
   107  			span.SetStatus(codes.Error, s.Message())
   108  			span.SetAttributes(statusCodeAttr(s.Code()))
   109  		} else {
   110  			span.SetAttributes(statusCodeAttr(grpc_codes.OK))
   111  		}
   112  
   113  		return err
   114  	}
   115  }
   116  
   117  // clientStream  wraps around the embedded grpc.ClientStream, and intercepts the RecvMsg and
   118  // SendMsg method call.
   119  type clientStream struct {
   120  	grpc.ClientStream
   121  	desc *grpc.StreamDesc
   122  
   123  	span trace.Span
   124  
   125  	receivedEvent bool
   126  	sentEvent     bool
   127  
   128  	receivedMessageID int
   129  	sentMessageID     int
   130  }
   131  
   132  var _ = proto.Marshal
   133  
   134  func (w *clientStream) RecvMsg(m interface{}) error {
   135  	err := w.ClientStream.RecvMsg(m)
   136  
   137  	if err == nil && !w.desc.ServerStreams {
   138  		w.endSpan(nil)
   139  	} else if err == io.EOF {
   140  		w.endSpan(nil)
   141  	} else if err != nil {
   142  		w.endSpan(err)
   143  	} else {
   144  		w.receivedMessageID++
   145  
   146  		if w.receivedEvent {
   147  			messageReceived.Event(w.Context(), w.receivedMessageID, m)
   148  		}
   149  	}
   150  
   151  	return err
   152  }
   153  
   154  func (w *clientStream) SendMsg(m interface{}) error {
   155  	err := w.ClientStream.SendMsg(m)
   156  
   157  	w.sentMessageID++
   158  
   159  	if w.sentEvent {
   160  		messageSent.Event(w.Context(), w.sentMessageID, m)
   161  	}
   162  
   163  	if err != nil {
   164  		w.endSpan(err)
   165  	}
   166  
   167  	return err
   168  }
   169  
   170  func (w *clientStream) Header() (metadata.MD, error) {
   171  	md, err := w.ClientStream.Header()
   172  	if err != nil {
   173  		w.endSpan(err)
   174  	}
   175  
   176  	return md, err
   177  }
   178  
   179  func (w *clientStream) CloseSend() error {
   180  	err := w.ClientStream.CloseSend()
   181  	if err != nil {
   182  		w.endSpan(err)
   183  	}
   184  
   185  	return err
   186  }
   187  
   188  func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
   189  	return &clientStream{
   190  		ClientStream:  s,
   191  		span:          span,
   192  		desc:          desc,
   193  		receivedEvent: cfg.ReceivedEvent,
   194  		sentEvent:     cfg.SentEvent,
   195  	}
   196  }
   197  
   198  func (w *clientStream) endSpan(err error) {
   199  	if err != nil {
   200  		s, _ := status.FromError(err)
   201  		w.span.SetStatus(codes.Error, s.Message())
   202  		w.span.SetAttributes(statusCodeAttr(s.Code()))
   203  	} else {
   204  		w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
   205  	}
   206  
   207  	w.span.End()
   208  }
   209  
   210  // StreamClientInterceptor returns a grpc.StreamClientInterceptor suitable
   211  // for use in a grpc.Dial call.
   212  //
   213  // Deprecated: Use [NewClientHandler] instead.
   214  func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
   215  	cfg := newConfig(opts, "client")
   216  	tracer := cfg.TracerProvider.Tracer(
   217  		ScopeName,
   218  		trace.WithInstrumentationVersion(Version()),
   219  	)
   220  
   221  	return func(
   222  		ctx context.Context,
   223  		desc *grpc.StreamDesc,
   224  		cc *grpc.ClientConn,
   225  		method string,
   226  		streamer grpc.Streamer,
   227  		callOpts ...grpc.CallOption,
   228  	) (grpc.ClientStream, error) {
   229  		i := &InterceptorInfo{
   230  			Method: method,
   231  			Type:   StreamClient,
   232  		}
   233  		if cfg.Filter != nil && !cfg.Filter(i) {
   234  			return streamer(ctx, desc, cc, method, callOpts...)
   235  		}
   236  
   237  		name, attr, _ := telemetryAttributes(method, cc.Target())
   238  
   239  		startOpts := append([]trace.SpanStartOption{
   240  			trace.WithSpanKind(trace.SpanKindClient),
   241  			trace.WithAttributes(attr...),
   242  		},
   243  			cfg.SpanStartOptions...,
   244  		)
   245  
   246  		ctx, span := tracer.Start(
   247  			ctx,
   248  			name,
   249  			startOpts...,
   250  		)
   251  
   252  		ctx = inject(ctx, cfg.Propagators)
   253  
   254  		s, err := streamer(ctx, desc, cc, method, callOpts...)
   255  		if err != nil {
   256  			grpcStatus, _ := status.FromError(err)
   257  			span.SetStatus(codes.Error, grpcStatus.Message())
   258  			span.SetAttributes(statusCodeAttr(grpcStatus.Code()))
   259  			span.End()
   260  			return s, err
   261  		}
   262  		stream := wrapClientStream(ctx, s, desc, span, cfg)
   263  		return stream, nil
   264  	}
   265  }
   266  
   267  // UnaryServerInterceptor returns a grpc.UnaryServerInterceptor suitable
   268  // for use in a grpc.NewServer call.
   269  //
   270  // Deprecated: Use [NewServerHandler] instead.
   271  func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
   272  	cfg := newConfig(opts, "server")
   273  	tracer := cfg.TracerProvider.Tracer(
   274  		ScopeName,
   275  		trace.WithInstrumentationVersion(Version()),
   276  	)
   277  
   278  	return func(
   279  		ctx context.Context,
   280  		req interface{},
   281  		info *grpc.UnaryServerInfo,
   282  		handler grpc.UnaryHandler,
   283  	) (interface{}, error) {
   284  		i := &InterceptorInfo{
   285  			UnaryServerInfo: info,
   286  			Type:            UnaryServer,
   287  		}
   288  		if cfg.Filter != nil && !cfg.Filter(i) {
   289  			return handler(ctx, req)
   290  		}
   291  
   292  		ctx = extract(ctx, cfg.Propagators)
   293  		name, attr, metricAttrs := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
   294  
   295  		startOpts := append([]trace.SpanStartOption{
   296  			trace.WithSpanKind(trace.SpanKindServer),
   297  			trace.WithAttributes(attr...),
   298  		},
   299  			cfg.SpanStartOptions...,
   300  		)
   301  
   302  		ctx, span := tracer.Start(
   303  			trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
   304  			name,
   305  			startOpts...,
   306  		)
   307  		defer span.End()
   308  
   309  		if cfg.ReceivedEvent {
   310  			messageReceived.Event(ctx, 1, req)
   311  		}
   312  
   313  		before := time.Now()
   314  
   315  		resp, err := handler(ctx, req)
   316  
   317  		s, _ := status.FromError(err)
   318  		if err != nil {
   319  			statusCode, msg := serverStatus(s)
   320  			span.SetStatus(statusCode, msg)
   321  			if cfg.SentEvent {
   322  				messageSent.Event(ctx, 1, s.Proto())
   323  			}
   324  		} else {
   325  			if cfg.SentEvent {
   326  				messageSent.Event(ctx, 1, resp)
   327  			}
   328  		}
   329  		grpcStatusCodeAttr := statusCodeAttr(s.Code())
   330  		span.SetAttributes(grpcStatusCodeAttr)
   331  
   332  		// Use floating point division here for higher precision (instead of Millisecond method).
   333  		elapsedTime := float64(time.Since(before)) / float64(time.Millisecond)
   334  
   335  		metricAttrs = append(metricAttrs, grpcStatusCodeAttr)
   336  		cfg.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
   337  
   338  		return resp, err
   339  	}
   340  }
   341  
   342  // serverStream wraps around the embedded grpc.ServerStream, and intercepts the RecvMsg and
   343  // SendMsg method call.
   344  type serverStream struct {
   345  	grpc.ServerStream
   346  	ctx context.Context
   347  
   348  	receivedMessageID int
   349  	sentMessageID     int
   350  
   351  	receivedEvent bool
   352  	sentEvent     bool
   353  }
   354  
   355  func (w *serverStream) Context() context.Context {
   356  	return w.ctx
   357  }
   358  
   359  func (w *serverStream) RecvMsg(m interface{}) error {
   360  	err := w.ServerStream.RecvMsg(m)
   361  
   362  	if err == nil {
   363  		w.receivedMessageID++
   364  		if w.receivedEvent {
   365  			messageReceived.Event(w.Context(), w.receivedMessageID, m)
   366  		}
   367  	}
   368  
   369  	return err
   370  }
   371  
   372  func (w *serverStream) SendMsg(m interface{}) error {
   373  	err := w.ServerStream.SendMsg(m)
   374  
   375  	w.sentMessageID++
   376  	if w.sentEvent {
   377  		messageSent.Event(w.Context(), w.sentMessageID, m)
   378  	}
   379  
   380  	return err
   381  }
   382  
   383  func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *serverStream {
   384  	return &serverStream{
   385  		ServerStream:  ss,
   386  		ctx:           ctx,
   387  		receivedEvent: cfg.ReceivedEvent,
   388  		sentEvent:     cfg.SentEvent,
   389  	}
   390  }
   391  
   392  // StreamServerInterceptor returns a grpc.StreamServerInterceptor suitable
   393  // for use in a grpc.NewServer call.
   394  //
   395  // Deprecated: Use [NewServerHandler] instead.
   396  func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
   397  	cfg := newConfig(opts, "server")
   398  	tracer := cfg.TracerProvider.Tracer(
   399  		ScopeName,
   400  		trace.WithInstrumentationVersion(Version()),
   401  	)
   402  
   403  	return func(
   404  		srv interface{},
   405  		ss grpc.ServerStream,
   406  		info *grpc.StreamServerInfo,
   407  		handler grpc.StreamHandler,
   408  	) error {
   409  		ctx := ss.Context()
   410  		i := &InterceptorInfo{
   411  			StreamServerInfo: info,
   412  			Type:             StreamServer,
   413  		}
   414  		if cfg.Filter != nil && !cfg.Filter(i) {
   415  			return handler(srv, wrapServerStream(ctx, ss, cfg))
   416  		}
   417  
   418  		ctx = extract(ctx, cfg.Propagators)
   419  		name, attr, _ := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
   420  
   421  		startOpts := append([]trace.SpanStartOption{
   422  			trace.WithSpanKind(trace.SpanKindServer),
   423  			trace.WithAttributes(attr...),
   424  		},
   425  			cfg.SpanStartOptions...,
   426  		)
   427  
   428  		ctx, span := tracer.Start(
   429  			trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
   430  			name,
   431  			startOpts...,
   432  		)
   433  		defer span.End()
   434  
   435  		err := handler(srv, wrapServerStream(ctx, ss, cfg))
   436  		if err != nil {
   437  			s, _ := status.FromError(err)
   438  			statusCode, msg := serverStatus(s)
   439  			span.SetStatus(statusCode, msg)
   440  			span.SetAttributes(statusCodeAttr(s.Code()))
   441  		} else {
   442  			span.SetAttributes(statusCodeAttr(grpc_codes.OK))
   443  		}
   444  
   445  		return err
   446  	}
   447  }
   448  
   449  // telemetryAttributes returns a span name and span and metric attributes from
   450  // the gRPC method and peer address.
   451  func telemetryAttributes(fullMethod, peerAddress string) (string, []attribute.KeyValue, []attribute.KeyValue) {
   452  	name, methodAttrs := internal.ParseFullMethod(fullMethod)
   453  	peerAttrs := peerAttr(peerAddress)
   454  
   455  	attrs := make([]attribute.KeyValue, 0, 1+len(methodAttrs)+len(peerAttrs))
   456  	attrs = append(attrs, RPCSystemGRPC)
   457  	attrs = append(attrs, methodAttrs...)
   458  	metricAttrs := attrs[:1+len(methodAttrs)]
   459  	attrs = append(attrs, peerAttrs...)
   460  	return name, attrs, metricAttrs
   461  }
   462  
   463  // peerAttr returns attributes about the peer address.
   464  func peerAttr(addr string) []attribute.KeyValue {
   465  	host, p, err := net.SplitHostPort(addr)
   466  	if err != nil {
   467  		return nil
   468  	}
   469  
   470  	if host == "" {
   471  		host = "127.0.0.1"
   472  	}
   473  	port, err := strconv.Atoi(p)
   474  	if err != nil {
   475  		return nil
   476  	}
   477  
   478  	var attr []attribute.KeyValue
   479  	if ip := net.ParseIP(host); ip != nil {
   480  		attr = []attribute.KeyValue{
   481  			semconv.NetSockPeerAddr(host),
   482  			semconv.NetSockPeerPort(port),
   483  		}
   484  	} else {
   485  		attr = []attribute.KeyValue{
   486  			semconv.NetPeerName(host),
   487  			semconv.NetPeerPort(port),
   488  		}
   489  	}
   490  
   491  	return attr
   492  }
   493  
   494  // peerFromCtx returns a peer address from a context, if one exists.
   495  func peerFromCtx(ctx context.Context) string {
   496  	p, ok := peer.FromContext(ctx)
   497  	if !ok {
   498  		return ""
   499  	}
   500  	return p.Addr.String()
   501  }
   502  
   503  // statusCodeAttr returns status code attribute based on given gRPC code.
   504  func statusCodeAttr(c grpc_codes.Code) attribute.KeyValue {
   505  	return GRPCStatusCodeKey.Int64(int64(c))
   506  }
   507  
   508  // serverStatus returns a span status code and message for a given gRPC
   509  // status code. It maps specific gRPC status codes to a corresponding span
   510  // status code and message. This function is intended for use on the server
   511  // side of a gRPC connection.
   512  //
   513  // If the gRPC status code is Unknown, DeadlineExceeded, Unimplemented,
   514  // Internal, Unavailable, or DataLoss, it returns a span status code of Error
   515  // and the message from the gRPC status. Otherwise, it returns a span status
   516  // code of Unset and an empty message.
   517  func serverStatus(grpcStatus *status.Status) (codes.Code, string) {
   518  	switch grpcStatus.Code() {
   519  	case grpc_codes.Unknown,
   520  		grpc_codes.DeadlineExceeded,
   521  		grpc_codes.Unimplemented,
   522  		grpc_codes.Internal,
   523  		grpc_codes.Unavailable,
   524  		grpc_codes.DataLoss:
   525  		return codes.Error, grpcStatus.Message()
   526  	default:
   527  		return codes.Unset, ""
   528  	}
   529  }
   530  

View as plain text