1
2
3
4 package otelgrpc
5
6
7
8 import (
9 "context"
10 "io"
11 "net"
12 "strconv"
13 "time"
14
15 "google.golang.org/grpc"
16 grpc_codes "google.golang.org/grpc/codes"
17 "google.golang.org/grpc/metadata"
18 "google.golang.org/grpc/peer"
19 "google.golang.org/grpc/status"
20 "google.golang.org/protobuf/proto"
21
22 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc/internal"
23 "go.opentelemetry.io/otel/attribute"
24 "go.opentelemetry.io/otel/codes"
25 "go.opentelemetry.io/otel/metric"
26 semconv "go.opentelemetry.io/otel/semconv/v1.17.0"
27 "go.opentelemetry.io/otel/trace"
28 )
29
30 type messageType attribute.KeyValue
31
32
33
34 func (m messageType) Event(ctx context.Context, id int, _ interface{}) {
35 span := trace.SpanFromContext(ctx)
36 if !span.IsRecording() {
37 return
38 }
39 span.AddEvent("message", trace.WithAttributes(
40 attribute.KeyValue(m),
41 RPCMessageIDKey.Int(id),
42 ))
43 }
44
45 var (
46 messageSent = messageType(RPCMessageTypeSent)
47 messageReceived = messageType(RPCMessageTypeReceived)
48 )
49
50
51
52
53
54 func UnaryClientInterceptor(opts ...Option) grpc.UnaryClientInterceptor {
55 cfg := newConfig(opts, "client")
56 tracer := cfg.TracerProvider.Tracer(
57 ScopeName,
58 trace.WithInstrumentationVersion(Version()),
59 )
60
61 return func(
62 ctx context.Context,
63 method string,
64 req, reply interface{},
65 cc *grpc.ClientConn,
66 invoker grpc.UnaryInvoker,
67 callOpts ...grpc.CallOption,
68 ) error {
69 i := &InterceptorInfo{
70 Method: method,
71 Type: UnaryClient,
72 }
73 if cfg.Filter != nil && !cfg.Filter(i) {
74 return invoker(ctx, method, req, reply, cc, callOpts...)
75 }
76
77 name, attr, _ := telemetryAttributes(method, cc.Target())
78
79 startOpts := append([]trace.SpanStartOption{
80 trace.WithSpanKind(trace.SpanKindClient),
81 trace.WithAttributes(attr...),
82 },
83 cfg.SpanStartOptions...,
84 )
85
86 ctx, span := tracer.Start(
87 ctx,
88 name,
89 startOpts...,
90 )
91 defer span.End()
92
93 ctx = inject(ctx, cfg.Propagators)
94
95 if cfg.SentEvent {
96 messageSent.Event(ctx, 1, req)
97 }
98
99 err := invoker(ctx, method, req, reply, cc, callOpts...)
100
101 if cfg.ReceivedEvent {
102 messageReceived.Event(ctx, 1, reply)
103 }
104
105 if err != nil {
106 s, _ := status.FromError(err)
107 span.SetStatus(codes.Error, s.Message())
108 span.SetAttributes(statusCodeAttr(s.Code()))
109 } else {
110 span.SetAttributes(statusCodeAttr(grpc_codes.OK))
111 }
112
113 return err
114 }
115 }
116
117
118
119 type clientStream struct {
120 grpc.ClientStream
121 desc *grpc.StreamDesc
122
123 span trace.Span
124
125 receivedEvent bool
126 sentEvent bool
127
128 receivedMessageID int
129 sentMessageID int
130 }
131
132 var _ = proto.Marshal
133
134 func (w *clientStream) RecvMsg(m interface{}) error {
135 err := w.ClientStream.RecvMsg(m)
136
137 if err == nil && !w.desc.ServerStreams {
138 w.endSpan(nil)
139 } else if err == io.EOF {
140 w.endSpan(nil)
141 } else if err != nil {
142 w.endSpan(err)
143 } else {
144 w.receivedMessageID++
145
146 if w.receivedEvent {
147 messageReceived.Event(w.Context(), w.receivedMessageID, m)
148 }
149 }
150
151 return err
152 }
153
154 func (w *clientStream) SendMsg(m interface{}) error {
155 err := w.ClientStream.SendMsg(m)
156
157 w.sentMessageID++
158
159 if w.sentEvent {
160 messageSent.Event(w.Context(), w.sentMessageID, m)
161 }
162
163 if err != nil {
164 w.endSpan(err)
165 }
166
167 return err
168 }
169
170 func (w *clientStream) Header() (metadata.MD, error) {
171 md, err := w.ClientStream.Header()
172 if err != nil {
173 w.endSpan(err)
174 }
175
176 return md, err
177 }
178
179 func (w *clientStream) CloseSend() error {
180 err := w.ClientStream.CloseSend()
181 if err != nil {
182 w.endSpan(err)
183 }
184
185 return err
186 }
187
188 func wrapClientStream(ctx context.Context, s grpc.ClientStream, desc *grpc.StreamDesc, span trace.Span, cfg *config) *clientStream {
189 return &clientStream{
190 ClientStream: s,
191 span: span,
192 desc: desc,
193 receivedEvent: cfg.ReceivedEvent,
194 sentEvent: cfg.SentEvent,
195 }
196 }
197
198 func (w *clientStream) endSpan(err error) {
199 if err != nil {
200 s, _ := status.FromError(err)
201 w.span.SetStatus(codes.Error, s.Message())
202 w.span.SetAttributes(statusCodeAttr(s.Code()))
203 } else {
204 w.span.SetAttributes(statusCodeAttr(grpc_codes.OK))
205 }
206
207 w.span.End()
208 }
209
210
211
212
213
214 func StreamClientInterceptor(opts ...Option) grpc.StreamClientInterceptor {
215 cfg := newConfig(opts, "client")
216 tracer := cfg.TracerProvider.Tracer(
217 ScopeName,
218 trace.WithInstrumentationVersion(Version()),
219 )
220
221 return func(
222 ctx context.Context,
223 desc *grpc.StreamDesc,
224 cc *grpc.ClientConn,
225 method string,
226 streamer grpc.Streamer,
227 callOpts ...grpc.CallOption,
228 ) (grpc.ClientStream, error) {
229 i := &InterceptorInfo{
230 Method: method,
231 Type: StreamClient,
232 }
233 if cfg.Filter != nil && !cfg.Filter(i) {
234 return streamer(ctx, desc, cc, method, callOpts...)
235 }
236
237 name, attr, _ := telemetryAttributes(method, cc.Target())
238
239 startOpts := append([]trace.SpanStartOption{
240 trace.WithSpanKind(trace.SpanKindClient),
241 trace.WithAttributes(attr...),
242 },
243 cfg.SpanStartOptions...,
244 )
245
246 ctx, span := tracer.Start(
247 ctx,
248 name,
249 startOpts...,
250 )
251
252 ctx = inject(ctx, cfg.Propagators)
253
254 s, err := streamer(ctx, desc, cc, method, callOpts...)
255 if err != nil {
256 grpcStatus, _ := status.FromError(err)
257 span.SetStatus(codes.Error, grpcStatus.Message())
258 span.SetAttributes(statusCodeAttr(grpcStatus.Code()))
259 span.End()
260 return s, err
261 }
262 stream := wrapClientStream(ctx, s, desc, span, cfg)
263 return stream, nil
264 }
265 }
266
267
268
269
270
271 func UnaryServerInterceptor(opts ...Option) grpc.UnaryServerInterceptor {
272 cfg := newConfig(opts, "server")
273 tracer := cfg.TracerProvider.Tracer(
274 ScopeName,
275 trace.WithInstrumentationVersion(Version()),
276 )
277
278 return func(
279 ctx context.Context,
280 req interface{},
281 info *grpc.UnaryServerInfo,
282 handler grpc.UnaryHandler,
283 ) (interface{}, error) {
284 i := &InterceptorInfo{
285 UnaryServerInfo: info,
286 Type: UnaryServer,
287 }
288 if cfg.Filter != nil && !cfg.Filter(i) {
289 return handler(ctx, req)
290 }
291
292 ctx = extract(ctx, cfg.Propagators)
293 name, attr, metricAttrs := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
294
295 startOpts := append([]trace.SpanStartOption{
296 trace.WithSpanKind(trace.SpanKindServer),
297 trace.WithAttributes(attr...),
298 },
299 cfg.SpanStartOptions...,
300 )
301
302 ctx, span := tracer.Start(
303 trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
304 name,
305 startOpts...,
306 )
307 defer span.End()
308
309 if cfg.ReceivedEvent {
310 messageReceived.Event(ctx, 1, req)
311 }
312
313 before := time.Now()
314
315 resp, err := handler(ctx, req)
316
317 s, _ := status.FromError(err)
318 if err != nil {
319 statusCode, msg := serverStatus(s)
320 span.SetStatus(statusCode, msg)
321 if cfg.SentEvent {
322 messageSent.Event(ctx, 1, s.Proto())
323 }
324 } else {
325 if cfg.SentEvent {
326 messageSent.Event(ctx, 1, resp)
327 }
328 }
329 grpcStatusCodeAttr := statusCodeAttr(s.Code())
330 span.SetAttributes(grpcStatusCodeAttr)
331
332
333 elapsedTime := float64(time.Since(before)) / float64(time.Millisecond)
334
335 metricAttrs = append(metricAttrs, grpcStatusCodeAttr)
336 cfg.rpcDuration.Record(ctx, elapsedTime, metric.WithAttributes(metricAttrs...))
337
338 return resp, err
339 }
340 }
341
342
343
344 type serverStream struct {
345 grpc.ServerStream
346 ctx context.Context
347
348 receivedMessageID int
349 sentMessageID int
350
351 receivedEvent bool
352 sentEvent bool
353 }
354
355 func (w *serverStream) Context() context.Context {
356 return w.ctx
357 }
358
359 func (w *serverStream) RecvMsg(m interface{}) error {
360 err := w.ServerStream.RecvMsg(m)
361
362 if err == nil {
363 w.receivedMessageID++
364 if w.receivedEvent {
365 messageReceived.Event(w.Context(), w.receivedMessageID, m)
366 }
367 }
368
369 return err
370 }
371
372 func (w *serverStream) SendMsg(m interface{}) error {
373 err := w.ServerStream.SendMsg(m)
374
375 w.sentMessageID++
376 if w.sentEvent {
377 messageSent.Event(w.Context(), w.sentMessageID, m)
378 }
379
380 return err
381 }
382
383 func wrapServerStream(ctx context.Context, ss grpc.ServerStream, cfg *config) *serverStream {
384 return &serverStream{
385 ServerStream: ss,
386 ctx: ctx,
387 receivedEvent: cfg.ReceivedEvent,
388 sentEvent: cfg.SentEvent,
389 }
390 }
391
392
393
394
395
396 func StreamServerInterceptor(opts ...Option) grpc.StreamServerInterceptor {
397 cfg := newConfig(opts, "server")
398 tracer := cfg.TracerProvider.Tracer(
399 ScopeName,
400 trace.WithInstrumentationVersion(Version()),
401 )
402
403 return func(
404 srv interface{},
405 ss grpc.ServerStream,
406 info *grpc.StreamServerInfo,
407 handler grpc.StreamHandler,
408 ) error {
409 ctx := ss.Context()
410 i := &InterceptorInfo{
411 StreamServerInfo: info,
412 Type: StreamServer,
413 }
414 if cfg.Filter != nil && !cfg.Filter(i) {
415 return handler(srv, wrapServerStream(ctx, ss, cfg))
416 }
417
418 ctx = extract(ctx, cfg.Propagators)
419 name, attr, _ := telemetryAttributes(info.FullMethod, peerFromCtx(ctx))
420
421 startOpts := append([]trace.SpanStartOption{
422 trace.WithSpanKind(trace.SpanKindServer),
423 trace.WithAttributes(attr...),
424 },
425 cfg.SpanStartOptions...,
426 )
427
428 ctx, span := tracer.Start(
429 trace.ContextWithRemoteSpanContext(ctx, trace.SpanContextFromContext(ctx)),
430 name,
431 startOpts...,
432 )
433 defer span.End()
434
435 err := handler(srv, wrapServerStream(ctx, ss, cfg))
436 if err != nil {
437 s, _ := status.FromError(err)
438 statusCode, msg := serverStatus(s)
439 span.SetStatus(statusCode, msg)
440 span.SetAttributes(statusCodeAttr(s.Code()))
441 } else {
442 span.SetAttributes(statusCodeAttr(grpc_codes.OK))
443 }
444
445 return err
446 }
447 }
448
449
450
451 func telemetryAttributes(fullMethod, peerAddress string) (string, []attribute.KeyValue, []attribute.KeyValue) {
452 name, methodAttrs := internal.ParseFullMethod(fullMethod)
453 peerAttrs := peerAttr(peerAddress)
454
455 attrs := make([]attribute.KeyValue, 0, 1+len(methodAttrs)+len(peerAttrs))
456 attrs = append(attrs, RPCSystemGRPC)
457 attrs = append(attrs, methodAttrs...)
458 metricAttrs := attrs[:1+len(methodAttrs)]
459 attrs = append(attrs, peerAttrs...)
460 return name, attrs, metricAttrs
461 }
462
463
464 func peerAttr(addr string) []attribute.KeyValue {
465 host, p, err := net.SplitHostPort(addr)
466 if err != nil {
467 return nil
468 }
469
470 if host == "" {
471 host = "127.0.0.1"
472 }
473 port, err := strconv.Atoi(p)
474 if err != nil {
475 return nil
476 }
477
478 var attr []attribute.KeyValue
479 if ip := net.ParseIP(host); ip != nil {
480 attr = []attribute.KeyValue{
481 semconv.NetSockPeerAddr(host),
482 semconv.NetSockPeerPort(port),
483 }
484 } else {
485 attr = []attribute.KeyValue{
486 semconv.NetPeerName(host),
487 semconv.NetPeerPort(port),
488 }
489 }
490
491 return attr
492 }
493
494
495 func peerFromCtx(ctx context.Context) string {
496 p, ok := peer.FromContext(ctx)
497 if !ok {
498 return ""
499 }
500 return p.Addr.String()
501 }
502
503
504 func statusCodeAttr(c grpc_codes.Code) attribute.KeyValue {
505 return GRPCStatusCodeKey.Int64(int64(c))
506 }
507
508
509
510
511
512
513
514
515
516
517 func serverStatus(grpcStatus *status.Status) (codes.Code, string) {
518 switch grpcStatus.Code() {
519 case grpc_codes.Unknown,
520 grpc_codes.DeadlineExceeded,
521 grpc_codes.Unimplemented,
522 grpc_codes.Internal,
523 grpc_codes.Unavailable,
524 grpc_codes.DataLoss:
525 return codes.Error, grpcStatus.Message()
526 default:
527 return codes.Unset, ""
528 }
529 }
530
View as plain text