...

Source file src/github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/payload_interceptors.go

Documentation: github.com/grpc-ecosystem/go-grpc-middleware/logging/zap

     1  package grpc_zap
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"fmt"
     7  
     8  	"github.com/golang/protobuf/jsonpb"
     9  	"github.com/golang/protobuf/proto"
    10  	"github.com/grpc-ecosystem/go-grpc-middleware/logging"
    11  	"github.com/grpc-ecosystem/go-grpc-middleware/logging/zap/ctxzap"
    12  	"go.uber.org/zap"
    13  	"go.uber.org/zap/zapcore"
    14  	"google.golang.org/grpc"
    15  )
    16  
var (
	// JsonPbMarshaller is the marshaller used for serializing protobuf messages.
	// It is invoked by jsonpbObjectMarshaler.MarshalJSON for every payload that gets logged.
	// If needed, this variable can be reassigned with a different marshaller with the same Marshal() signature.
	JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
)
    22  
    23  // PayloadUnaryServerInterceptor returns a new unary server interceptors that logs the payloads of requests.
    24  //
    25  // This *only* works when placed *after* the `grpc_zap.UnaryServerInterceptor`. However, the logging can be done to a
    26  // separate instance of the logger.
    27  func PayloadUnaryServerInterceptor(logger *zap.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
    28  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    29  		if !decider(ctx, info.FullMethod, info.Server) {
    30  			return handler(ctx, req)
    31  		}
    32  		// Use the provided zap.Logger for logging but use the fields from context.
    33  		logEntry := logger.With(append(serverCallFields(info.FullMethod), ctxzap.TagsToFields(ctx)...)...)
    34  		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
    35  		resp, err := handler(ctx, req)
    36  		if err == nil {
    37  			logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.response.content field")
    38  		}
    39  		return resp, err
    40  	}
    41  }
    42  
    43  // PayloadStreamServerInterceptor returns a new server server interceptors that logs the payloads of requests.
    44  //
    45  // This *only* works when placed *after* the `grpc_zap.StreamServerInterceptor`. However, the logging can be done to a
    46  // separate instance of the logger.
    47  func PayloadStreamServerInterceptor(logger *zap.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
    48  	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    49  		if !decider(stream.Context(), info.FullMethod, srv) {
    50  			return handler(srv, stream)
    51  		}
    52  		logEntry := logger.With(append(serverCallFields(info.FullMethod), ctxzap.TagsToFields(stream.Context())...)...)
    53  		newStream := &loggingServerStream{ServerStream: stream, logger: logEntry}
    54  		return handler(srv, newStream)
    55  	}
    56  }
    57  
    58  // PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the payloads of requests and responses.
    59  func PayloadUnaryClientInterceptor(logger *zap.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
    60  	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    61  		if !decider(ctx, method) {
    62  			return invoker(ctx, method, req, reply, cc, opts...)
    63  		}
    64  		logEntry := logger.With(newClientLoggerFields(ctx, method)...)
    65  		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content")
    66  		err := invoker(ctx, method, req, reply, cc, opts...)
    67  		if err == nil {
    68  			logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
    69  		}
    70  		return err
    71  	}
    72  }
    73  
    74  // PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the payloads of requests and responses.
    75  func PayloadStreamClientInterceptor(logger *zap.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
    76  	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    77  		if !decider(ctx, method) {
    78  			return streamer(ctx, desc, cc, method, opts...)
    79  		}
    80  		logEntry := logger.With(newClientLoggerFields(ctx, method)...)
    81  		clientStream, err := streamer(ctx, desc, cc, method, opts...)
    82  		newStream := &loggingClientStream{ClientStream: clientStream, logger: logEntry}
    83  		return newStream, err
    84  	}
    85  }
    86  
// loggingClientStream wraps a grpc.ClientStream and overrides SendMsg and RecvMsg so that
// messages passing through the stream are logged. All other methods come from the embedded stream.
type loggingClientStream struct {
	grpc.ClientStream
	// logger is the destination for the payload log entries.
	logger *zap.Logger
}
    91  
    92  func (l *loggingClientStream) SendMsg(m interface{}) error {
    93  	err := l.ClientStream.SendMsg(m)
    94  	if err == nil {
    95  		logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
    96  	}
    97  	return err
    98  }
    99  
   100  func (l *loggingClientStream) RecvMsg(m interface{}) error {
   101  	err := l.ClientStream.RecvMsg(m)
   102  	if err == nil {
   103  		logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
   104  	}
   105  	return err
   106  }
   107  
// loggingServerStream wraps a grpc.ServerStream and overrides SendMsg and RecvMsg so that
// messages passing through the stream are logged. All other methods come from the embedded stream.
type loggingServerStream struct {
	grpc.ServerStream
	// logger is the destination for the payload log entries.
	logger *zap.Logger
}
   112  
   113  func (l *loggingServerStream) SendMsg(m interface{}) error {
   114  	err := l.ServerStream.SendMsg(m)
   115  	if err == nil {
   116  		logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
   117  	}
   118  	return err
   119  }
   120  
   121  func (l *loggingServerStream) RecvMsg(m interface{}) error {
   122  	err := l.ServerStream.RecvMsg(m)
   123  	if err == nil {
   124  		logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
   125  	}
   126  	return err
   127  }
   128  
   129  func logProtoMessageAsJson(logger *zap.Logger, pbMsg interface{}, key string, msg string) {
   130  	if p, ok := pbMsg.(proto.Message); ok {
   131  		logger.Check(zapcore.InfoLevel, msg).Write(zap.Object(key, &jsonpbObjectMarshaler{pb: p}))
   132  	}
   133  }
   134  
// jsonpbObjectMarshaler adapts a protobuf message for logging: MarshalLogObject hands it to the
// zap encoder and MarshalJSON serializes it with JsonPbMarshaller.
type jsonpbObjectMarshaler struct {
	// pb is the message to serialize.
	pb proto.Message
}
   138  
   139  func (j *jsonpbObjectMarshaler) MarshalLogObject(e zapcore.ObjectEncoder) error {
   140  	// ZAP jsonEncoder deals with AddReflect by using json.MarshalObject. The same thing applies for consoleEncoder.
   141  	return e.AddReflected("msg", j)
   142  }
   143  
   144  func (j *jsonpbObjectMarshaler) MarshalJSON() ([]byte, error) {
   145  	b := &bytes.Buffer{}
   146  	if err := JsonPbMarshaller.Marshal(b, j.pb); err != nil {
   147  		return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
   148  	}
   149  	return b.Bytes(), nil
   150  }
   151  

View as plain text