1
2
3
4 package otelgrpc
5
6 import (
7 "context"
8 "sync/atomic"
9 "time"
10
11 grpc_codes "google.golang.org/grpc/codes"
12 "google.golang.org/grpc/peer"
13 "google.golang.org/grpc/stats"
14 "google.golang.org/grpc/status"
15
16 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
17 "go.opentelemetry.io/otel/attribute"
18 "go.opentelemetry.io/otel/codes"
19 "go.opentelemetry.io/otel/metric"
20 semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
21 "go.opentelemetry.io/otel/trace"
22 )
23
// gRPCContextKey is the context key under which the handlers' TagRPC methods
// store the per-RPC *gRPCContext, and under which handleRPC looks it up.
type gRPCContextKey struct{}
25
26 type gRPCContext struct {
27 messagesReceived int64
28 messagesSent int64
29 metricAttrs []attribute.KeyValue
30 }
31
// serverHandler is the handler returned by NewServerHandler. It embeds
// *config, so the configuration's fields and its handleRPC method are
// available directly on the handler.
type serverHandler struct {
	*config
}
35
36
37 func NewServerHandler(opts ...Option) stats.Handler {
38 h := &serverHandler{
39 config: newConfig(opts, "server"),
40 }
41
42 return h
43 }
44
45
46 func (h *serverHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
47 return ctx
48 }
49
50
51 func (h *serverHandler) HandleConn(ctx context.Context, info stats.ConnStats) {
52 }
53
54
55 func (h *serverHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
56 ctx = extract(ctx, h.config.Propagators)
57
58 name, attrs := internal.ParseFullMethod(info.FullMethodName)
59 attrs = append(attrs, RPCSystemGRPC)
60 ctx, _ = h.tracer.Start(
61 trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
62 name,
63 trace.WithSpanKind(trace.SpanKindServer),
64 trace.WithAttributes(attrs...),
65 )
66
67 gctx := gRPCContext{
68 metricAttrs: attrs,
69 }
70 return context.WithValue(ctx, gRPCContextKey{}, &gctx)
71 }
72
73
74 func (h *serverHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
75 isServer := true
76 h.handleRPC(ctx, rs, isServer)
77 }
78
// clientHandler is the handler returned by NewClientHandler. It embeds
// *config, so the configuration's fields and its handleRPC method are
// available directly on the handler.
type clientHandler struct {
	*config
}
82
83
84 func NewClientHandler(opts ...Option) stats.Handler {
85 h := &clientHandler{
86 config: newConfig(opts, "client"),
87 }
88
89 return h
90 }
91
92
93 func (h *clientHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
94 name, attrs := internal.ParseFullMethod(info.FullMethodName)
95 attrs = append(attrs, RPCSystemGRPC)
96 ctx, _ = h.tracer.Start(
97 ctx,
98 name,
99 trace.WithSpanKind(trace.SpanKindClient),
100 trace.WithAttributes(attrs...),
101 )
102
103 gctx := gRPCContext{
104 metricAttrs: attrs,
105 }
106
107 return inject(context.WithValue(ctx, gRPCContextKey{}, &gctx), h.config.Propagators)
108 }
109
110
111 func (h *clientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) {
112 isServer := false
113 h.handleRPC(ctx, rs, isServer)
114 }
115
116
117 func (h *clientHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
118 return ctx
119 }
120
121
122 func (h *clientHandler) HandleConn(context.Context, stats.ConnStats) {
123
124 }
125
126 func (c *config) handleRPC(ctx context.Context, rs stats.RPCStats, isServer bool) {
127 span := trace.SpanFromContext(ctx)
128 var metricAttrs []attribute.KeyValue
129 var messageId int64
130
131 gctx, _ := ctx.Value(gRPCContextKey{}).(*gRPCContext)
132 if gctx != nil {
133 metricAttrs = make([]attribute.KeyValue, 0, len(gctx.metricAttrs)+1)
134 metricAttrs = append(metricAttrs, gctx.metricAttrs...)
135 }
136
137 switch rs := rs.(type) {
138 case *stats.Begin:
139 case *stats.InPayload:
140 if gctx != nil {
141 messageId = atomic.AddInt64(&gctx.messagesReceived, 1)
142 c.rpcRequestSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
143 }
144
145 if c.ReceivedEvent {
146 span.AddEvent("message",
147 trace.WithAttributes(
148 semconv.MessageTypeReceived,
149 semconv.MessageIDKey.Int64(messageId),
150 semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
151 semconv.MessageUncompressedSizeKey.Int(rs.Length),
152 ),
153 )
154 }
155 case *stats.OutPayload:
156 if gctx != nil {
157 messageId = atomic.AddInt64(&gctx.messagesSent, 1)
158 c.rpcResponseSize.Record(ctx, int64(rs.Length), metric.WithAttributes(metricAttrs...))
159 }
160
161 if c.SentEvent {
162 span.AddEvent("message",
163 trace.WithAttributes(
164 semconv.MessageTypeSent,
165 semconv.MessageIDKey.Int64(messageId),
166 semconv.MessageCompressedSizeKey.Int(rs.CompressedLength),
167 semconv.MessageUncompressedSizeKey.Int(rs.Length),
168 ),
169 )
170 }
171 case *stats.OutTrailer:
172 case *stats.OutHeader:
173 if p, ok := peer.FromContext(ctx); ok {
174 span.SetAttributes(peerAttr(p.Addr.String())...)
175 }
176 case *stats.End:
177 var rpcStatusAttr attribute.KeyValue
178
179 if rs.Error != nil {
180 s, _ := status.FromError(rs.Error)
181 if isServer {
182 statusCode, msg := serverStatus(s)
183 span.SetStatus(statusCode, msg)
184 } else {
185 span.SetStatus(codes.Error, s.Message())
186 }
187 rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(s.Code()))
188 } else {
189 rpcStatusAttr = semconv.RPCGRPCStatusCodeKey.Int(int(grpc_codes.OK))
190 }
191 span.SetAttributes(rpcStatusAttr)
192 span.End()
193
194 metricAttrs = append(metricAttrs, rpcStatusAttr)
195
196
197 elapsedTime := float64(rs.EndTime.Sub(rs.BeginTime)) / float64(time.Millisecond)
198
199 c.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
200 if gctx != nil {
201 c.rpcRequestsPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesReceived), metric.WithAttributes(metricAttrs...))
202 c.rpcResponsesPerRPC.Record(ctx, atomic.LoadInt64(&gctx.messagesSent), metric.WithAttributes(metricAttrs...))
203 }
204 default:
205 return
206 }
207 }
208
View as plain text