...

Source file src/go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/stats_handler.go

Documentation: go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc

     1  // Copyright The OpenTelemetry Authors
     2  // SPDX-License-Identifier: Apache-2.0
     3  
     4  package otelgrpc // import "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
     5  
     6  import (
     7  	"context"
     8  	"sync/atomic"
     9  	"time"
    10  
    11  	grpc_codes "google.golang.org/grpc/codes"
    12  	"google.golang.org/grpc/peer"
    13  	"google.golang.org/grpc/stats"
    14  	"google.golang.org/grpc/status"
    15  
    16  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
    17  	"go.opentelemetry.io/otel/attribute"
    18  	"go.opentelemetry.io/otel/codes"
    19  	"go.opentelemetry.io/otel/metric"
    20  	semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
    21  	"go.opentelemetry.io/otel/trace"
    22  )
    23  
    24  type gRPCContextKey struct{}
    25  
    26  type gRPCContext struct {
    27  	messagesReceived int64
    28  	messagesSent     int64
    29  	metricAttrs      []attribute.KeyValue
    30  }
    31  
    32  type serverHandler struct {
    33  	*config
    34  }
    35  
    36  // NewServerHandler creates a stats.Handler for a gRPC server.
    37  func NewServerHandler(opts ...Option) stats.Handler {
    38  	h := &serverHandler{
    39  		config: newConfig(opts, "server"),
    40  	}
    41  
    42  	return h
    43  }
    44  
    45  // TagConn can attach some information to the given context.
    46  func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
    47  	return ctx
    48  }
    49  
    50  // HandleConn processes the Conn stats.
    51  func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
    52  }
    53  
    54  // TagRPC can attach some information to the given context.
    55  func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
    56  	ctx = extract(ctx, h.config.Propagators)
    57  
    58  	name, attrs := internal.ParseFullMethod(info.FullMethodName)
    59  	attrs = append(attrs, RPCSystemGRPC)
    60  	ctx, _ = h.tracer.Start(
    61  		trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
    62  		name,
    63  		trace.WithSpanKind(trace.SpanKindServer),
    64  		trace.WithAttributes(attrs...),
    65  	)
    66  
    67  	gctx := gRPCContext{
    68  		metricAttrs: attrs,
    69  	}
    70  	return context.WithValue(ctx, gRPCContextKey{}, &gctx)
    71  }
    72  
    73  // HandleRPC processes the RPC stats.
    74  func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
    75  	isServer := true
    76  	h.handleRPC(ctx, rs, isServer)
    77  }
    78  
    79  type clientHandler struct {
    80  	*config
    81  }
    82  
    83  // NewClientHandler creates a stats.Handler for a gRPC client.
    84  func NewClientHandler(opts ...Option) stats.Handler {
    85  	h := &clientHandler{
    86  		config: newConfig(opts, "client"),
    87  	}
    88  
    89  	return h
    90  }
    91  
    92  // TagRPC can attach some information to the given context.
    93  func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
    94  	name, attrs := internal.ParseFullMethod(info.FullMethodName)
    95  	attrs = append(attrs, RPCSystemGRPC)
    96  	ctx, _ = h.tracer.Start(
    97  		ctx,
    98  		name,
    99  		trace.WithSpanKind(trace.SpanKindClient),
   100  		trace.WithAttributes(attrs...),
   101  	)
   102  
   103  	gctx := gRPCContext{
   104  		metricAttrs: attrs,
   105  	}
   106  
   107  	return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
   108  }
   109  
   110  // HandleRPC processes the RPC stats.
   111  func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
   112  	isServer := false
   113  	h.handleRPC(ctx, rs, isServer)
   114  }
   115  
   116  // TagConn can attach some information to the given context.
   117  func (h *clientHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
   118  	return ctx
   119  }
   120  
   121  // HandleConn processes the Conn stats.
   122  func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
   123  	// no-op
   124  }
   125  
   126  func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) { // nolint: revive  // isServer is not a control flag.
   127  	span := trace.SpanFromContext(ctx)
   128  	var metricAttrs []attribute.KeyValue
   129  	var messageId int64
   130  
   131  	gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
   132  	if gctx != nil {
   133  		metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
   134  		metricAttrs = append(metricAttrs, gctx.metricAttrs...)
   135  	}
   136  
   137  	switch rs := rs.(type) {
   138  	case *stats.Begin:
   139  	case *stats.InPayload:
   140  		if gctx != nil {
   141  			messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
   142  			c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
   143  		}
   144  
   145  		if c.ReceivedEvent {
   146  			span.AddEvent("message",
   147  				trace.WithAttributes(
   148  					semconv.MessageTypeReceived,
   149  					semconv.MessageIDKey.Int64(messageId),
   150  					semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
   151  					semconv.MessageUncompressedSizeKey.Int(rs.Length),
   152  				),
   153  			)
   154  		}
   155  	case *stats.OutPayload:
   156  		if gctx != nil {
   157  			messageId = atomic.AddInt64(&gctx.messagesSent, 1)
   158  			c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
   159  		}
   160  
   161  		if c.SentEvent {
   162  			span.AddEvent("message",
   163  				trace.WithAttributes(
   164  					semconv.MessageTypeSent,
   165  					semconv.MessageIDKey.Int64(messageId),
   166  					semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
   167  					semconv.MessageUncompressedSizeKey.Int(rs.Length),
   168  				),
   169  			)
   170  		}
   171  	case *stats.OutTrailer:
   172  	case *stats.OutHeader:
   173  		if p, ok := peer.FromContext(ctx); ok {
   174  			span.SetAttributes(peerAttr(p.Addr.String())...)
   175  		}
   176  	case *stats.End:
   177  		var rpcStatusAttr attribute.KeyValue
   178  
   179  		if rs.Error != nil {
   180  			s, _ := status.FromError(rs.Error)
   181  			if isServer {
   182  				statusCode, msg := serverStatus(s)
   183  				span.SetStatus(statusCode, msg)
   184  			} else {
   185  				span.SetStatus(codes.Error, s.Message())
   186  			}
   187  			rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
   188  		} else {
   189  			rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
   190  		}
   191  		span.SetAttributes(rpcStatusAttr)
   192  		span.End()
   193  
   194  		metricAttrs = append(metricAttrs, rpcStatusAttr)
   195  
   196  		// Use floating point division here for higher precision (instead of Millisecond method).
   197  		elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)
   198  
   199  		c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
   200  		if gctx != nil {
   201  			c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...))
   202  			c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...))
   203  		}
   204  	default:
   205  		return
   206  	}
   207  }
   208  

View as plain text