...

Source file src/google.golang.org/grpc/internal/binarylog/method_logger.go

Documentation: google.golang.org/grpc/internal/binarylog

     1  /*
     2   *
     3   * Copyright 2018 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package binarylog
    20  
    21  import (
    22  	"context"
    23  	"net"
    24  	"strings"
    25  	"sync/atomic"
    26  	"time"
    27  
    28  	binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
    29  	"google.golang.org/grpc/metadata"
    30  	"google.golang.org/grpc/status"
    31  	"google.golang.org/protobuf/proto"
    32  	"google.golang.org/protobuf/types/known/durationpb"
    33  	"google.golang.org/protobuf/types/known/timestamppb"
    34  )
    35  
    36  type callIDGenerator struct {
    37  	id uint64
    38  }
    39  
    40  func (g *callIDGenerator) next() uint64 {
    41  	id := atomic.AddUint64(&g.id, 1)
    42  	return id
    43  }
    44  
    45  // reset is for testing only, and doesn't need to be thread safe.
    46  func (g *callIDGenerator) reset() {
    47  	g.id = 0
    48  }
    49  
    50  var idGen callIDGenerator
    51  
    52  // MethodLogger is the sub-logger for each method.
    53  //
    54  // This is used in the 1.0 release of gcp/observability, and thus must not be
    55  // deleted or changed.
    56  type MethodLogger interface {
    57  	Log(context.Context, LogEntryConfig)
    58  }
    59  
    60  // TruncatingMethodLogger is a method logger that truncates headers and messages
    61  // based on configured fields.
    62  type TruncatingMethodLogger struct {
    63  	headerMaxLen, messageMaxLen uint64
    64  
    65  	callID          uint64
    66  	idWithinCallGen *callIDGenerator
    67  
    68  	sink Sink // TODO(blog): make this pluggable.
    69  }
    70  
    71  // NewTruncatingMethodLogger returns a new truncating method logger.
    72  //
    73  // This is used in the 1.0 release of gcp/observability, and thus must not be
    74  // deleted or changed.
    75  func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
    76  	return &TruncatingMethodLogger{
    77  		headerMaxLen:  h,
    78  		messageMaxLen: m,
    79  
    80  		callID:          idGen.next(),
    81  		idWithinCallGen: &callIDGenerator{},
    82  
    83  		sink: DefaultSink, // TODO(blog): make it pluggable.
    84  	}
    85  }
    86  
    87  // Build is an internal only method for building the proto message out of the
    88  // input event. It's made public to enable other library to reuse as much logic
    89  // in TruncatingMethodLogger as possible.
    90  func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
    91  	m := c.toProto()
    92  	timestamp := timestamppb.Now()
    93  	m.Timestamp = timestamp
    94  	m.CallId = ml.callID
    95  	m.SequenceIdWithinCall = ml.idWithinCallGen.next()
    96  
    97  	switch pay := m.Payload.(type) {
    98  	case *binlogpb.GrpcLogEntry_ClientHeader:
    99  		m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
   100  	case *binlogpb.GrpcLogEntry_ServerHeader:
   101  		m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
   102  	case *binlogpb.GrpcLogEntry_Message:
   103  		m.PayloadTruncated = ml.truncateMessage(pay.Message)
   104  	}
   105  	return m
   106  }
   107  
   108  // Log creates a proto binary log entry, and logs it to the sink.
   109  func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
   110  	ml.sink.Write(ml.Build(c))
   111  }
   112  
   113  func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
   114  	if ml.headerMaxLen == maxUInt {
   115  		return false
   116  	}
   117  	var (
   118  		bytesLimit = ml.headerMaxLen
   119  		index      int
   120  	)
   121  	// At the end of the loop, index will be the first entry where the total
   122  	// size is greater than the limit:
   123  	//
   124  	// len(entry[:index]) <= ml.hdr && len(entry[:index+1]) > ml.hdr.
   125  	for ; index < len(mdPb.Entry); index++ {
   126  		entry := mdPb.Entry[index]
   127  		if entry.Key == "grpc-trace-bin" {
   128  			// "grpc-trace-bin" is a special key. It's kept in the log entry,
   129  			// but not counted towards the size limit.
   130  			continue
   131  		}
   132  		currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
   133  		if currentEntryLen > bytesLimit {
   134  			break
   135  		}
   136  		bytesLimit -= currentEntryLen
   137  	}
   138  	truncated = index < len(mdPb.Entry)
   139  	mdPb.Entry = mdPb.Entry[:index]
   140  	return truncated
   141  }
   142  
   143  func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
   144  	if ml.messageMaxLen == maxUInt {
   145  		return false
   146  	}
   147  	if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
   148  		return false
   149  	}
   150  	msgPb.Data = msgPb.Data[:ml.messageMaxLen]
   151  	return true
   152  }
   153  
   154  // LogEntryConfig represents the configuration for binary log entry.
   155  //
   156  // This is used in the 1.0 release of gcp/observability, and thus must not be
   157  // deleted or changed.
   158  type LogEntryConfig interface {
   159  	toProto() *binlogpb.GrpcLogEntry
   160  }
   161  
   162  // ClientHeader configs the binary log entry to be a ClientHeader entry.
   163  type ClientHeader struct {
   164  	OnClientSide bool
   165  	Header       metadata.MD
   166  	MethodName   string
   167  	Authority    string
   168  	Timeout      time.Duration
   169  	// PeerAddr is required only when it's on server side.
   170  	PeerAddr net.Addr
   171  }
   172  
   173  func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
   174  	// This function doesn't need to set all the fields (e.g. seq ID). The Log
   175  	// function will set the fields when necessary.
   176  	clientHeader := &binlogpb.ClientHeader{
   177  		Metadata:   mdToMetadataProto(c.Header),
   178  		MethodName: c.MethodName,
   179  		Authority:  c.Authority,
   180  	}
   181  	if c.Timeout > 0 {
   182  		clientHeader.Timeout = durationpb.New(c.Timeout)
   183  	}
   184  	ret := &binlogpb.GrpcLogEntry{
   185  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
   186  		Payload: &binlogpb.GrpcLogEntry_ClientHeader{
   187  			ClientHeader: clientHeader,
   188  		},
   189  	}
   190  	if c.OnClientSide {
   191  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   192  	} else {
   193  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   194  	}
   195  	if c.PeerAddr != nil {
   196  		ret.Peer = addrToProto(c.PeerAddr)
   197  	}
   198  	return ret
   199  }
   200  
   201  // ServerHeader configs the binary log entry to be a ServerHeader entry.
   202  type ServerHeader struct {
   203  	OnClientSide bool
   204  	Header       metadata.MD
   205  	// PeerAddr is required only when it's on client side.
   206  	PeerAddr net.Addr
   207  }
   208  
   209  func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
   210  	ret := &binlogpb.GrpcLogEntry{
   211  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
   212  		Payload: &binlogpb.GrpcLogEntry_ServerHeader{
   213  			ServerHeader: &binlogpb.ServerHeader{
   214  				Metadata: mdToMetadataProto(c.Header),
   215  			},
   216  		},
   217  	}
   218  	if c.OnClientSide {
   219  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   220  	} else {
   221  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   222  	}
   223  	if c.PeerAddr != nil {
   224  		ret.Peer = addrToProto(c.PeerAddr)
   225  	}
   226  	return ret
   227  }
   228  
   229  // ClientMessage configs the binary log entry to be a ClientMessage entry.
   230  type ClientMessage struct {
   231  	OnClientSide bool
   232  	// Message can be a proto.Message or []byte. Other messages formats are not
   233  	// supported.
   234  	Message any
   235  }
   236  
   237  func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
   238  	var (
   239  		data []byte
   240  		err  error
   241  	)
   242  	if m, ok := c.Message.(proto.Message); ok {
   243  		data, err = proto.Marshal(m)
   244  		if err != nil {
   245  			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
   246  		}
   247  	} else if b, ok := c.Message.([]byte); ok {
   248  		data = b
   249  	} else {
   250  		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
   251  	}
   252  	ret := &binlogpb.GrpcLogEntry{
   253  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
   254  		Payload: &binlogpb.GrpcLogEntry_Message{
   255  			Message: &binlogpb.Message{
   256  				Length: uint32(len(data)),
   257  				Data:   data,
   258  			},
   259  		},
   260  	}
   261  	if c.OnClientSide {
   262  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   263  	} else {
   264  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   265  	}
   266  	return ret
   267  }
   268  
   269  // ServerMessage configs the binary log entry to be a ServerMessage entry.
   270  type ServerMessage struct {
   271  	OnClientSide bool
   272  	// Message can be a proto.Message or []byte. Other messages formats are not
   273  	// supported.
   274  	Message any
   275  }
   276  
   277  func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
   278  	var (
   279  		data []byte
   280  		err  error
   281  	)
   282  	if m, ok := c.Message.(proto.Message); ok {
   283  		data, err = proto.Marshal(m)
   284  		if err != nil {
   285  			grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
   286  		}
   287  	} else if b, ok := c.Message.([]byte); ok {
   288  		data = b
   289  	} else {
   290  		grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
   291  	}
   292  	ret := &binlogpb.GrpcLogEntry{
   293  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
   294  		Payload: &binlogpb.GrpcLogEntry_Message{
   295  			Message: &binlogpb.Message{
   296  				Length: uint32(len(data)),
   297  				Data:   data,
   298  			},
   299  		},
   300  	}
   301  	if c.OnClientSide {
   302  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   303  	} else {
   304  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   305  	}
   306  	return ret
   307  }
   308  
   309  // ClientHalfClose configs the binary log entry to be a ClientHalfClose entry.
   310  type ClientHalfClose struct {
   311  	OnClientSide bool
   312  }
   313  
   314  func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
   315  	ret := &binlogpb.GrpcLogEntry{
   316  		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
   317  		Payload: nil, // No payload here.
   318  	}
   319  	if c.OnClientSide {
   320  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   321  	} else {
   322  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   323  	}
   324  	return ret
   325  }
   326  
   327  // ServerTrailer configs the binary log entry to be a ServerTrailer entry.
   328  type ServerTrailer struct {
   329  	OnClientSide bool
   330  	Trailer      metadata.MD
   331  	// Err is the status error.
   332  	Err error
   333  	// PeerAddr is required only when it's on client side and the RPC is trailer
   334  	// only.
   335  	PeerAddr net.Addr
   336  }
   337  
   338  func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
   339  	st, ok := status.FromError(c.Err)
   340  	if !ok {
   341  		grpclogLogger.Info("binarylogging: error in trailer is not a status error")
   342  	}
   343  	var (
   344  		detailsBytes []byte
   345  		err          error
   346  	)
   347  	stProto := st.Proto()
   348  	if stProto != nil && len(stProto.Details) != 0 {
   349  		detailsBytes, err = proto.Marshal(stProto)
   350  		if err != nil {
   351  			grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
   352  		}
   353  	}
   354  	ret := &binlogpb.GrpcLogEntry{
   355  		Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
   356  		Payload: &binlogpb.GrpcLogEntry_Trailer{
   357  			Trailer: &binlogpb.Trailer{
   358  				Metadata:      mdToMetadataProto(c.Trailer),
   359  				StatusCode:    uint32(st.Code()),
   360  				StatusMessage: st.Message(),
   361  				StatusDetails: detailsBytes,
   362  			},
   363  		},
   364  	}
   365  	if c.OnClientSide {
   366  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   367  	} else {
   368  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   369  	}
   370  	if c.PeerAddr != nil {
   371  		ret.Peer = addrToProto(c.PeerAddr)
   372  	}
   373  	return ret
   374  }
   375  
   376  // Cancel configs the binary log entry to be a Cancel entry.
   377  type Cancel struct {
   378  	OnClientSide bool
   379  }
   380  
   381  func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
   382  	ret := &binlogpb.GrpcLogEntry{
   383  		Type:    binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
   384  		Payload: nil,
   385  	}
   386  	if c.OnClientSide {
   387  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
   388  	} else {
   389  		ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
   390  	}
   391  	return ret
   392  }
   393  
   394  // metadataKeyOmit returns whether the metadata entry with this key should be
   395  // omitted.
   396  func metadataKeyOmit(key string) bool {
   397  	switch key {
   398  	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
   399  		return true
   400  	case "grpc-trace-bin": // grpc-trace-bin is special because it's visible to users.
   401  		return false
   402  	}
   403  	return strings.HasPrefix(key, "grpc-")
   404  }
   405  
   406  func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
   407  	ret := &binlogpb.Metadata{}
   408  	for k, vv := range md {
   409  		if metadataKeyOmit(k) {
   410  			continue
   411  		}
   412  		for _, v := range vv {
   413  			ret.Entry = append(ret.Entry,
   414  				&binlogpb.MetadataEntry{
   415  					Key:   k,
   416  					Value: []byte(v),
   417  				},
   418  			)
   419  		}
   420  	}
   421  	return ret
   422  }
   423  
   424  func addrToProto(addr net.Addr) *binlogpb.Address {
   425  	ret := &binlogpb.Address{}
   426  	switch a := addr.(type) {
   427  	case *net.TCPAddr:
   428  		if a.IP.To4() != nil {
   429  			ret.Type = binlogpb.Address_TYPE_IPV4
   430  		} else if a.IP.To16() != nil {
   431  			ret.Type = binlogpb.Address_TYPE_IPV6
   432  		} else {
   433  			ret.Type = binlogpb.Address_TYPE_UNKNOWN
   434  			// Do not set address and port fields.
   435  			break
   436  		}
   437  		ret.Address = a.IP.String()
   438  		ret.IpPort = uint32(a.Port)
   439  	case *net.UnixAddr:
   440  		ret.Type = binlogpb.Address_TYPE_UNIX
   441  		ret.Address = a.String()
   442  	default:
   443  		ret.Type = binlogpb.Address_TYPE_UNKNOWN
   444  	}
   445  	return ret
   446  }
   447  

View as plain text