1 package kit
2
3 import (
4 "bytes"
5 "fmt"
6
7 "context"
8
9 "github.com/go-kit/log"
10 "github.com/go-kit/log/level"
11 "github.com/golang/protobuf/jsonpb"
12 "github.com/golang/protobuf/proto"
13 grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
14 "github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit"
15 "google.golang.org/grpc"
16 )
17
18 var (
19
20
21 JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
22 )
23
24
25
26
27
28 func PayloadUnaryServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
29 return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
30 if !decider(ctx, info.FullMethod, info.Server) {
31 return handler(ctx, req)
32 }
33
34 logEntry := log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(ctx)...)...)
35 logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
36 resp, err := handler(ctx, req)
37 if err == nil {
38 logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.request.content field")
39 }
40 return resp, err
41 }
42 }
43
44
45
46
47
48 func PayloadStreamServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
49 return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
50 if !decider(stream.Context(), info.FullMethod, srv) {
51 return handler(srv, stream)
52 }
53 logEntry := log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(stream.Context())...)...)
54 newStream := &loggingServerStream{ServerStream: stream, logger: logEntry}
55 return handler(srv, newStream)
56 }
57 }
58
59
60 func PayloadUnaryClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
61 return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
62 if !decider(ctx, method) {
63 return invoker(ctx, method, req, reply, cc, opts...)
64 }
65 logEntry := log.With(logger, newClientLoggerFields(ctx, method)...)
66 logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content")
67 err := invoker(ctx, method, req, reply, cc, opts...)
68 if err == nil {
69 logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
70 }
71 return err
72 }
73 }
74
75
76 func PayloadStreamClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
77 return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
78 if !decider(ctx, method) {
79 return streamer(ctx, desc, cc, method, opts...)
80 }
81 logEntry := log.With(logger, newClientLoggerFields(ctx, method)...)
82 clientStream, err := streamer(ctx, desc, cc, method, opts...)
83 newStream := &loggingClientStream{ClientStream: clientStream, logger: logEntry}
84 return newStream, err
85 }
86 }
87
88 type loggingClientStream struct {
89 grpc.ClientStream
90 logger log.Logger
91 }
92
93 func (l *loggingClientStream) SendMsg(m interface{}) error {
94 err := l.ClientStream.SendMsg(m)
95 if err == nil {
96 logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
97 }
98 return err
99 }
100
101 func (l *loggingClientStream) RecvMsg(m interface{}) error {
102 err := l.ClientStream.RecvMsg(m)
103 if err == nil {
104 logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
105 }
106 return err
107 }
108
109 type loggingServerStream struct {
110 grpc.ServerStream
111 logger log.Logger
112 }
113
114 func (l *loggingServerStream) SendMsg(m interface{}) error {
115 err := l.ServerStream.SendMsg(m)
116 if err == nil {
117 logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
118 }
119 return err
120 }
121
122 func (l *loggingServerStream) RecvMsg(m interface{}) error {
123 err := l.ServerStream.RecvMsg(m)
124 if err == nil {
125 logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
126 }
127 return err
128 }
129
130 func logProtoMessageAsJson(logger log.Logger, pbMsg interface{}, key string, msg string) {
131 if p, ok := pbMsg.(proto.Message); ok {
132 payload, err := (&jsonpbObjectMarshaler{pb: p}).marshalJSON()
133 if err != nil {
134 level.Info(logger).Log(key, err)
135 }
136 level.Info(logger).Log(key, string(payload))
137 }
138 }
139
140 type jsonpbObjectMarshaler struct {
141 pb proto.Message
142 }
143
144 func (j *jsonpbObjectMarshaler) marshalJSON() ([]byte, error) {
145 b := &bytes.Buffer{}
146 if err := JsonPbMarshaller.Marshal(b, j.pb); err != nil {
147 return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
148 }
149 return b.Bytes(), nil
150 }
151
View as plain text