...

Source file src/github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/payload_interceptors.go

Documentation: github.com/grpc-ecosystem/go-grpc-middleware/logging/kit

     1  package kit
     2  
     3  import (
     4  	"bytes"
     5  	"fmt"
     6  
     7  	"context"
     8  
     9  	"github.com/go-kit/log"
    10  	"github.com/go-kit/log/level"
    11  	"github.com/golang/protobuf/jsonpb"
    12  	"github.com/golang/protobuf/proto"
    13  	grpc_logging "github.com/grpc-ecosystem/go-grpc-middleware/logging"
    14  	"github.com/grpc-ecosystem/go-grpc-middleware/logging/kit/ctxkit"
    15  	"google.golang.org/grpc"
    16  )
    17  
    18  var (
    19  	// JsonPbMarshaller is the marshaller used for serializing protobuf messages.
    20  	// If needed, this variable can be reassigned with a different marshaller with the same Marshal() signature.
    21  	JsonPbMarshaller grpc_logging.JsonPbMarshaler = &jsonpb.Marshaler{}
    22  )
    23  
    24  // PayloadUnaryServerInterceptor returns a new unary server interceptors that logs the payloads of requests.
    25  //
    26  // This *only* works when placed *after* the `kit.UnaryServerInterceptor`. However, the logging can be done to a
    27  // separate instance of the logger.
    28  func PayloadUnaryServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.UnaryServerInterceptor {
    29  	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
    30  		if !decider(ctx, info.FullMethod, info.Server) {
    31  			return handler(ctx, req)
    32  		}
    33  		// Use the provided log.Logger for logging but use the fields from context.
    34  		logEntry := log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(ctx)...)...)
    35  		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "server request payload logged as grpc.request.content field")
    36  		resp, err := handler(ctx, req)
    37  		if err == nil {
    38  			logProtoMessageAsJson(logEntry, resp, "grpc.response.content", "server response payload logged as grpc.request.content field")
    39  		}
    40  		return resp, err
    41  	}
    42  }
    43  
    44  // PayloadStreamServerInterceptor returns a new server server interceptors that logs the payloads of requests.
    45  //
    46  // This *only* works when placed *after* the `kit.StreamServerInterceptor`. However, the logging can be done to a
    47  // separate instance of the logger.
    48  func PayloadStreamServerInterceptor(logger log.Logger, decider grpc_logging.ServerPayloadLoggingDecider) grpc.StreamServerInterceptor {
    49  	return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    50  		if !decider(stream.Context(), info.FullMethod, srv) {
    51  			return handler(srv, stream)
    52  		}
    53  		logEntry := log.With(logger, append(serverCallFields(info.FullMethod), ctxkit.TagsToFields(stream.Context())...)...)
    54  		newStream := &loggingServerStream{ServerStream: stream, logger: logEntry}
    55  		return handler(srv, newStream)
    56  	}
    57  }
    58  
    59  // PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the paylods of requests and responses.
    60  func PayloadUnaryClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.UnaryClientInterceptor {
    61  	return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
    62  		if !decider(ctx, method) {
    63  			return invoker(ctx, method, req, reply, cc, opts...)
    64  		}
    65  		logEntry := log.With(logger, newClientLoggerFields(ctx, method)...)
    66  		logProtoMessageAsJson(logEntry, req, "grpc.request.content", "client request payload logged as grpc.request.content")
    67  		err := invoker(ctx, method, req, reply, cc, opts...)
    68  		if err == nil {
    69  			logProtoMessageAsJson(logEntry, reply, "grpc.response.content", "client response payload logged as grpc.response.content")
    70  		}
    71  		return err
    72  	}
    73  }
    74  
    75  // PayloadStreamClientInterceptor returns a new streaming client interceptor that logs the paylods of requests and responses.
    76  func PayloadStreamClientInterceptor(logger log.Logger, decider grpc_logging.ClientPayloadLoggingDecider) grpc.StreamClientInterceptor {
    77  	return func(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
    78  		if !decider(ctx, method) {
    79  			return streamer(ctx, desc, cc, method, opts...)
    80  		}
    81  		logEntry := log.With(logger, newClientLoggerFields(ctx, method)...)
    82  		clientStream, err := streamer(ctx, desc, cc, method, opts...)
    83  		newStream := &loggingClientStream{ClientStream: clientStream, logger: logEntry}
    84  		return newStream, err
    85  	}
    86  }
    87  
    88  type loggingClientStream struct {
    89  	grpc.ClientStream
    90  	logger log.Logger
    91  }
    92  
    93  func (l *loggingClientStream) SendMsg(m interface{}) error {
    94  	err := l.ClientStream.SendMsg(m)
    95  	if err == nil {
    96  		logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
    97  	}
    98  	return err
    99  }
   100  
   101  func (l *loggingClientStream) RecvMsg(m interface{}) error {
   102  	err := l.ClientStream.RecvMsg(m)
   103  	if err == nil {
   104  		logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
   105  	}
   106  	return err
   107  }
   108  
   109  type loggingServerStream struct {
   110  	grpc.ServerStream
   111  	logger log.Logger
   112  }
   113  
   114  func (l *loggingServerStream) SendMsg(m interface{}) error {
   115  	err := l.ServerStream.SendMsg(m)
   116  	if err == nil {
   117  		logProtoMessageAsJson(l.logger, m, "grpc.response.content", "server response payload logged as grpc.response.content field")
   118  	}
   119  	return err
   120  }
   121  
   122  func (l *loggingServerStream) RecvMsg(m interface{}) error {
   123  	err := l.ServerStream.RecvMsg(m)
   124  	if err == nil {
   125  		logProtoMessageAsJson(l.logger, m, "grpc.request.content", "server request payload logged as grpc.request.content field")
   126  	}
   127  	return err
   128  }
   129  
   130  func logProtoMessageAsJson(logger log.Logger, pbMsg interface{}, key string, msg string) {
   131  	if p, ok := pbMsg.(proto.Message); ok {
   132  		payload, err := (&jsonpbObjectMarshaler{pb: p}).marshalJSON()
   133  		if err != nil {
   134  			level.Info(logger).Log(key, err)
   135  		}
   136  		level.Info(logger).Log(key, string(payload))
   137  	}
   138  }
   139  
   140  type jsonpbObjectMarshaler struct {
   141  	pb proto.Message
   142  }
   143  
   144  func (j *jsonpbObjectMarshaler) marshalJSON() ([]byte, error) {
   145  	b := &bytes.Buffer{}
   146  	if err := JsonPbMarshaller.Marshal(b, j.pb); err != nil {
   147  		return nil, fmt.Errorf("jsonpb serializer failed: %v", err)
   148  	}
   149  	return b.Bytes(), nil
   150  }
   151  

View as plain text