1 package grpc_logrus
2
3 import (
4 "bytes"
5 "context"
6 "fmt"
7
8 "github.com/golang/protobuf/jsonpb"
9 "github.com/golang/protobuf/proto"
10 "github.com/grpc-ecosystem/go-grpc-middleware/logging"
11 "github.com/grpc-ecosystem/go-grpc-middleware/logging/logrus/ctxlogrus"
12 "github.com/sirupsen/logrus"
13 "google.golang.org/grpc"
14 )
15
16 var (
17
18
19 JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
20 )
21
22
23
24
25
26 func PayloadUnaryServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
27 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
28 if !decider(ctx, info.FullMethod, info.Server) {
29 return handler(ctx, req)
30 }
31
32 logEntry := entry.WithFields(ctxlogrus.Extract(ctx).Data)
33 logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
34 resp, err := handler(ctx, req)
35 if err == nil {
36 logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.request.content field")
37 }
38 return resp, err
39 }
40 }
41
42
43
44
45
46 func PayloadStreamServerInterceptor(entry *logrus.Entry, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
47 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
48 if !decider(stream.Context(), info.FullMethod, srv) {
49 return handler(srv, stream)
50 }
51
52 logEntry := entry.WithFields(ctxlogrus.Extract(stream.Context()).Data)
53 newStream := &loggingServerStream{ServerStream: stream, entry: logEntry}
54 return handler(srv, newStream)
55 }
56 }
57
58
59 func PayloadUnaryClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
60 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
61 if !decider(ctx, method) {
62 return invoker(ctx, method, req, reply, cc, opts...)
63 }
64 logEntry := entry.WithFields(newClientLoggerFields(ctx, method))
65 logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content")
66 err := invoker(ctx, method, req, reply, cc, opts...)
67 if err == nil {
68 logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
69 }
70 return err
71 }
72 }
73
74
75 func PayloadStreamClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
76 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
77 if !decider(ctx, method) {
78 return streamer(ctx, desc, cc, method, opts...)
79 }
80 logEntry := entry.WithFields(newClientLoggerFields(ctx, method))
81 clientStream, err := streamer(ctx, desc, cc, method, opts...)
82 newStream := &loggingClientStream{ClientStream: clientStream, entry: logEntry}
83 return newStream, err
84 }
85 }
86
87 type loggingClientStream struct {
88 grpc.ClientStream
89 entry *logrus.Entry
90 }
91
92 func (l *loggingClientStream) SendMsg(m interface{}) error {
93 err := l.ClientStream.SendMsg(m)
94 if err == nil {
95 logProtoMessageAsJson(l.entry, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
96 }
97 return err
98 }
99
100 func (l *loggingClientStream) RecvMsg(m interface{}) error {
101 err := l.ClientStream.RecvMsg(m)
102 if err == nil {
103 logProtoMessageAsJson(l.entry, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
104 }
105 return err
106 }
107
108 type loggingServerStream struct {
109 grpc.ServerStream
110 entry *logrus.Entry
111 }
112
113 func (l *loggingServerStream) SendMsg(m interface{}) error {
114 err := l.ServerStream.SendMsg(m)
115 if err == nil {
116 logProtoMessageAsJson(l.entry, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
117 }
118 return err
119 }
120
121 func (l *loggingServerStream) RecvMsg(m interface{}) error {
122 err := l.ServerStream.RecvMsg(m)
123 if err == nil {
124 logProtoMessageAsJson(l.entry, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
125 }
126 return err
127 }
128
129 func logProtoMessageAsJson(entry *logrus.Entry, pbMsg interface{}, key string, msg string) {
130 if p, ok := pbMsg.(proto.Message); ok {
131 entry.WithField(key, &jsonpbMarshalleble{p}).Info(msg)
132 }
133 }
134
135 type jsonpbMarshalleble struct {
136 proto.Message
137 }
138
139 func (j *jsonpbMarshalleble) MarshalJSON() ([]byte, error) {
140 b := &bytes.Buffer{}
141 if err := JsonPbMarshaller.Marshal(b, j.Message); err != nil {
142 return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
143 }
144 return b.Bytes(), nil
145 }
146
View as plain text