1
18
19 package binarylog
20
21 import (
22 "context"
23 "net"
24 "strings"
25 "sync/atomic"
26 "time"
27
28 binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
29 "google.golang.org/grpc/metadata"
30 "google.golang.org/grpc/status"
31 "google.golang.org/protobuf/proto"
32 "google.golang.org/protobuf/types/known/durationpb"
33 "google.golang.org/protobuf/types/known/timestamppb"
34 )
35
// callIDGenerator hands out monotonically increasing IDs. The zero value is
// ready to use and the first ID returned is 1.
type callIDGenerator struct {
	// id is the last ID handed out. It must only be touched through
	// sync/atomic so that next and reset never race with each other.
	id uint64
}

// next atomically advances the counter and returns the new value.
func (g *callIDGenerator) next() uint64 {
	return atomic.AddUint64(&g.id, 1)
}

// reset rewinds the counter so that the following call to next returns 1.
//
// The store is atomic: next updates the same word with atomic.AddUint64, and
// mixing a plain write with atomic access to one variable is a data race.
func (g *callIDGenerator) reset() {
	atomic.StoreUint64(&g.id, 0)
}

// idGen is the package-wide generator; NewTruncatingMethodLogger draws each
// logger's call ID from it.
var idGen callIDGenerator
51
52
53
54
55
56 type MethodLogger interface {
57 Log(context.Context, LogEntryConfig)
58 }
59
60
61
62 type TruncatingMethodLogger struct {
63 headerMaxLen, messageMaxLen uint64
64
65 callID uint64
66 idWithinCallGen *callIDGenerator
67
68 sink Sink
69 }
70
71
72
73
74
75 func NewTruncatingMethodLogger(h, m uint64) *TruncatingMethodLogger {
76 return &TruncatingMethodLogger{
77 headerMaxLen: h,
78 messageMaxLen: m,
79
80 callID: idGen.next(),
81 idWithinCallGen: &callIDGenerator{},
82
83 sink: DefaultSink,
84 }
85 }
86
87
88
89
90 func (ml *TruncatingMethodLogger) Build(c LogEntryConfig) *binlogpb.GrpcLogEntry {
91 m := c.toProto()
92 timestamp := timestamppb.Now()
93 m.Timestamp = timestamp
94 m.CallId = ml.callID
95 m.SequenceIdWithinCall = ml.idWithinCallGen.next()
96
97 switch pay := m.Payload.(type) {
98 case *binlogpb.GrpcLogEntry_ClientHeader:
99 m.PayloadTruncated = ml.truncateMetadata(pay.ClientHeader.GetMetadata())
100 case *binlogpb.GrpcLogEntry_ServerHeader:
101 m.PayloadTruncated = ml.truncateMetadata(pay.ServerHeader.GetMetadata())
102 case *binlogpb.GrpcLogEntry_Message:
103 m.PayloadTruncated = ml.truncateMessage(pay.Message)
104 }
105 return m
106 }
107
108
109 func (ml *TruncatingMethodLogger) Log(ctx context.Context, c LogEntryConfig) {
110 ml.sink.Write(ml.Build(c))
111 }
112
113 func (ml *TruncatingMethodLogger) truncateMetadata(mdPb *binlogpb.Metadata) (truncated bool) {
114 if ml.headerMaxLen == maxUInt {
115 return false
116 }
117 var (
118 bytesLimit = ml.headerMaxLen
119 index int
120 )
121
122
123
124
125 for ; index < len(mdPb.Entry); index++ {
126 entry := mdPb.Entry[index]
127 if entry.Key == "grpc-trace-bin" {
128
129
130 continue
131 }
132 currentEntryLen := uint64(len(entry.GetKey())) + uint64(len(entry.GetValue()))
133 if currentEntryLen > bytesLimit {
134 break
135 }
136 bytesLimit -= currentEntryLen
137 }
138 truncated = index < len(mdPb.Entry)
139 mdPb.Entry = mdPb.Entry[:index]
140 return truncated
141 }
142
143 func (ml *TruncatingMethodLogger) truncateMessage(msgPb *binlogpb.Message) (truncated bool) {
144 if ml.messageMaxLen == maxUInt {
145 return false
146 }
147 if ml.messageMaxLen >= uint64(len(msgPb.Data)) {
148 return false
149 }
150 msgPb.Data = msgPb.Data[:ml.messageMaxLen]
151 return true
152 }
153
154
155
156
157
// LogEntryConfig is implemented by every event type that can be logged. The
// single method is unexported, so only types in this package can satisfy it.
type LogEntryConfig interface {
	// toProto renders the event as a GrpcLogEntry proto.
	toProto() *binlogpb.GrpcLogEntry
}
161
162
163 type ClientHeader struct {
164 OnClientSide bool
165 Header metadata.MD
166 MethodName string
167 Authority string
168 Timeout time.Duration
169
170 PeerAddr net.Addr
171 }
172
173 func (c *ClientHeader) toProto() *binlogpb.GrpcLogEntry {
174
175
176 clientHeader := &binlogpb.ClientHeader{
177 Metadata: mdToMetadataProto(c.Header),
178 MethodName: c.MethodName,
179 Authority: c.Authority,
180 }
181 if c.Timeout > 0 {
182 clientHeader.Timeout = durationpb.New(c.Timeout)
183 }
184 ret := &binlogpb.GrpcLogEntry{
185 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HEADER,
186 Payload: &binlogpb.GrpcLogEntry_ClientHeader{
187 ClientHeader: clientHeader,
188 },
189 }
190 if c.OnClientSide {
191 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
192 } else {
193 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
194 }
195 if c.PeerAddr != nil {
196 ret.Peer = addrToProto(c.PeerAddr)
197 }
198 return ret
199 }
200
201
202 type ServerHeader struct {
203 OnClientSide bool
204 Header metadata.MD
205
206 PeerAddr net.Addr
207 }
208
209 func (c *ServerHeader) toProto() *binlogpb.GrpcLogEntry {
210 ret := &binlogpb.GrpcLogEntry{
211 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_HEADER,
212 Payload: &binlogpb.GrpcLogEntry_ServerHeader{
213 ServerHeader: &binlogpb.ServerHeader{
214 Metadata: mdToMetadataProto(c.Header),
215 },
216 },
217 }
218 if c.OnClientSide {
219 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
220 } else {
221 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
222 }
223 if c.PeerAddr != nil {
224 ret.Peer = addrToProto(c.PeerAddr)
225 }
226 return ret
227 }
228
229
230 type ClientMessage struct {
231 OnClientSide bool
232
233
234 Message any
235 }
236
237 func (c *ClientMessage) toProto() *binlogpb.GrpcLogEntry {
238 var (
239 data []byte
240 err error
241 )
242 if m, ok := c.Message.(proto.Message); ok {
243 data, err = proto.Marshal(m)
244 if err != nil {
245 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
246 }
247 } else if b, ok := c.Message.([]byte); ok {
248 data = b
249 } else {
250 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
251 }
252 ret := &binlogpb.GrpcLogEntry{
253 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_MESSAGE,
254 Payload: &binlogpb.GrpcLogEntry_Message{
255 Message: &binlogpb.Message{
256 Length: uint32(len(data)),
257 Data: data,
258 },
259 },
260 }
261 if c.OnClientSide {
262 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
263 } else {
264 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
265 }
266 return ret
267 }
268
269
270 type ServerMessage struct {
271 OnClientSide bool
272
273
274 Message any
275 }
276
277 func (c *ServerMessage) toProto() *binlogpb.GrpcLogEntry {
278 var (
279 data []byte
280 err error
281 )
282 if m, ok := c.Message.(proto.Message); ok {
283 data, err = proto.Marshal(m)
284 if err != nil {
285 grpclogLogger.Infof("binarylogging: failed to marshal proto message: %v", err)
286 }
287 } else if b, ok := c.Message.([]byte); ok {
288 data = b
289 } else {
290 grpclogLogger.Infof("binarylogging: message to log is neither proto.message nor []byte")
291 }
292 ret := &binlogpb.GrpcLogEntry{
293 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_MESSAGE,
294 Payload: &binlogpb.GrpcLogEntry_Message{
295 Message: &binlogpb.Message{
296 Length: uint32(len(data)),
297 Data: data,
298 },
299 },
300 }
301 if c.OnClientSide {
302 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
303 } else {
304 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
305 }
306 return ret
307 }
308
309
310 type ClientHalfClose struct {
311 OnClientSide bool
312 }
313
314 func (c *ClientHalfClose) toProto() *binlogpb.GrpcLogEntry {
315 ret := &binlogpb.GrpcLogEntry{
316 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CLIENT_HALF_CLOSE,
317 Payload: nil,
318 }
319 if c.OnClientSide {
320 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
321 } else {
322 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
323 }
324 return ret
325 }
326
327
328 type ServerTrailer struct {
329 OnClientSide bool
330 Trailer metadata.MD
331
332 Err error
333
334
335 PeerAddr net.Addr
336 }
337
338 func (c *ServerTrailer) toProto() *binlogpb.GrpcLogEntry {
339 st, ok := status.FromError(c.Err)
340 if !ok {
341 grpclogLogger.Info("binarylogging: error in trailer is not a status error")
342 }
343 var (
344 detailsBytes []byte
345 err error
346 )
347 stProto := st.Proto()
348 if stProto != nil && len(stProto.Details) != 0 {
349 detailsBytes, err = proto.Marshal(stProto)
350 if err != nil {
351 grpclogLogger.Infof("binarylogging: failed to marshal status proto: %v", err)
352 }
353 }
354 ret := &binlogpb.GrpcLogEntry{
355 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_SERVER_TRAILER,
356 Payload: &binlogpb.GrpcLogEntry_Trailer{
357 Trailer: &binlogpb.Trailer{
358 Metadata: mdToMetadataProto(c.Trailer),
359 StatusCode: uint32(st.Code()),
360 StatusMessage: st.Message(),
361 StatusDetails: detailsBytes,
362 },
363 },
364 }
365 if c.OnClientSide {
366 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
367 } else {
368 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
369 }
370 if c.PeerAddr != nil {
371 ret.Peer = addrToProto(c.PeerAddr)
372 }
373 return ret
374 }
375
376
377 type Cancel struct {
378 OnClientSide bool
379 }
380
381 func (c *Cancel) toProto() *binlogpb.GrpcLogEntry {
382 ret := &binlogpb.GrpcLogEntry{
383 Type: binlogpb.GrpcLogEntry_EVENT_TYPE_CANCEL,
384 Payload: nil,
385 }
386 if c.OnClientSide {
387 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_CLIENT
388 } else {
389 ret.Logger = binlogpb.GrpcLogEntry_LOGGER_SERVER
390 }
391 return ret
392 }
393
394
395
// metadataKeyOmit reports whether a metadata entry with the given key must be
// left out of the binary log.
//
// A fixed list of keys and every key with the "grpc-" prefix are omitted. The
// one exception is "grpc-trace-bin", which is always logged.
func metadataKeyOmit(key string) bool {
	// Checked first so the "grpc-" prefix rule below cannot swallow it.
	if key == "grpc-trace-bin" {
		return false
	}
	switch key {
	case "lb-token", ":path", ":authority", "content-encoding", "content-type", "user-agent", "te":
		return true
	default:
		return strings.HasPrefix(key, "grpc-")
	}
}
405
406 func mdToMetadataProto(md metadata.MD) *binlogpb.Metadata {
407 ret := &binlogpb.Metadata{}
408 for k, vv := range md {
409 if metadataKeyOmit(k) {
410 continue
411 }
412 for _, v := range vv {
413 ret.Entry = append(ret.Entry,
414 &binlogpb.MetadataEntry{
415 Key: k,
416 Value: []byte(v),
417 },
418 )
419 }
420 }
421 return ret
422 }
423
424 func addrToProto(addr net.Addr) *binlogpb.Address {
425 ret := &binlogpb.Address{}
426 switch a := addr.(type) {
427 case *net.TCPAddr:
428 if a.IP.To4() != nil {
429 ret.Type = binlogpb.Address_TYPE_IPV4
430 } else if a.IP.To16() != nil {
431 ret.Type = binlogpb.Address_TYPE_IPV6
432 } else {
433 ret.Type = binlogpb.Address_TYPE_UNKNOWN
434
435 break
436 }
437 ret.Address = a.IP.String()
438 ret.IpPort = uint32(a.Port)
439 case *net.UnixAddr:
440 ret.Type = binlogpb.Address_TYPE_UNIX
441 ret.Address = a.String()
442 default:
443 ret.Type = binlogpb.Address_TYPE_UNKNOWN
444 }
445 return ret
446 }
447
View as plain text