1 package grpc_zap
2
3 import (
4 "bytes"
5 "context"
6 "fmt"
7
8 "github.com/golang/protobuf/jsonpb"
9 "github.com/golang/protobuf/proto"
10 "github.com/grpc-ecosystem/go-grpc-middleware/logging"
11 "github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
12 "go.uber.org/zap"
13 "go.uber.org/zap/zapcore"
14 "google.golang.org/grpc"
15 )
16
17 var (
18
19
20 JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
21 )
22
23
24
25
26
27 func PayloadUnaryServerInterceptor(logger *zap.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
28 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
29 if !decider(ctx, info.FullMethod, info.Server) {
30 return handler(ctx, req)
31 }
32
33 logEntry := logger.With(append(serverCallFields(info.FullMethod), ctxzap.TagsToFields(ctx)...)...)
34 logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
35 resp, err := handler(ctx, req)
36 if err == nil {
37 logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.response.content field")
38 }
39 return resp, err
40 }
41 }
42
43
44
45
46
47 func PayloadStreamServerInterceptor(logger *zap.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
48 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
49 if !decider(stream.Context(), info.FullMethod, srv) {
50 return handler(srv, stream)
51 }
52 logEntry := logger.With(append(serverCallFields(info.FullMethod), ctxzap.TagsToFields(stream.Context())...)...)
53 newStream := &loggingServerStream{ServerStream: stream, logger: logEntry}
54 return handler(srv, newStream)
55 }
56 }
57
58
59 func PayloadUnaryClientInterceptor(logger *zap.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
60 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
61 if !decider(ctx, method) {
62 return invoker(ctx, method, req, reply, cc, opts...)
63 }
64 logEntry := logger.With(newClientLoggerFields(ctx, method)...)
65 logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content")
66 err := invoker(ctx, method, req, reply, cc, opts...)
67 if err == nil {
68 logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
69 }
70 return err
71 }
72 }
73
74
75 func PayloadStreamClientInterceptor(logger *zap.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
76 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
77 if !decider(ctx, method) {
78 return streamer(ctx, desc, cc, method, opts...)
79 }
80 logEntry := logger.With(newClientLoggerFields(ctx, method)...)
81 clientStream, err := streamer(ctx, desc, cc, method, opts...)
82 newStream := &loggingClientStream{ClientStream: clientStream, logger: logEntry}
83 return newStream, err
84 }
85 }
86
87 type loggingClientStream struct {
88 grpc.ClientStream
89 logger *zap.Logger
90 }
91
92 func (l *loggingClientStream) SendMsg(m interface{}) error {
93 err := l.ClientStream.SendMsg(m)
94 if err == nil {
95 logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
96 }
97 return err
98 }
99
100 func (l *loggingClientStream) RecvMsg(m interface{}) error {
101 err := l.ClientStream.RecvMsg(m)
102 if err == nil {
103 logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
104 }
105 return err
106 }
107
108 type loggingServerStream struct {
109 grpc.ServerStream
110 logger *zap.Logger
111 }
112
113 func (l *loggingServerStream) SendMsg(m interface{}) error {
114 err := l.ServerStream.SendMsg(m)
115 if err == nil {
116 logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
117 }
118 return err
119 }
120
121 func (l *loggingServerStream) RecvMsg(m interface{}) error {
122 err := l.ServerStream.RecvMsg(m)
123 if err == nil {
124 logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
125 }
126 return err
127 }
128
129 func logProtoMessageAsJson(logger *zap.Logger, pbMsg interface{}, key string, msg string) {
130 if p, ok := pbMsg.(proto.Message); ok {
131 logger.Check(zapcore.InfoLevel, msg).Write(zap.Object(key, &jsonpbObjectMarshaler{pb: p}))
132 }
133 }
134
135 type jsonpbObjectMarshaler struct {
136 pb proto.Message
137 }
138
139 func (j *jsonpbObjectMarshaler) MarshalLogObject(e zapcore.ObjectEncoder) error {
140
141 return e.AddReflected("msg", j)
142 }
143
144 func (j *jsonpbObjectMarshaler) MarshalJSON() ([]byte, error) {
145 b := &bytes.Buffer{}
146 if err := JsonPbMarshaller.Marshal(b, j.pb); err != nil {
147 return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
148 }
149 return b.Bytes(), nil
150 }
151
View as plain text