...
1
18
19 package binarylog
20
21 import (
22 "bufio"
23 "encoding/binary"
24 "io"
25 "sync"
26 "time"
27
28 binlogpb "google.golang.org/grpc/binarylog/grpc_binarylog_v1"
29 "google.golang.org/protobuf/proto"
30 )
31
32 var (
33
34
35 DefaultSink Sink = &noopSink{}
36 )
37
38
39
40
41 type Sink interface {
42
43
44
45 Write(*binlogpb.GrpcLogEntry) error
46
47 Close() error
48 }
49
50 type noopSink struct{}
51
52 func (ns *noopSink) Write(*binlogpb.GrpcLogEntry) error { return nil }
53 func (ns *noopSink) Close() error { return nil }
54
55
56
57
58
59
60
61 func newWriterSink(w io.Writer) Sink {
62 return &writerSink{out: w}
63 }
64
65 type writerSink struct {
66 out io.Writer
67 }
68
69 func (ws *writerSink) Write(e *binlogpb.GrpcLogEntry) error {
70 b, err := proto.Marshal(e)
71 if err != nil {
72 grpclogLogger.Errorf("binary logging: failed to marshal proto message: %v", err)
73 return err
74 }
75 hdr := make([]byte, 4)
76 binary.BigEndian.PutUint32(hdr, uint32(len(b)))
77 if _, err := ws.out.Write(hdr); err != nil {
78 return err
79 }
80 if _, err := ws.out.Write(b); err != nil {
81 return err
82 }
83 return nil
84 }
85
86 func (ws *writerSink) Close() error { return nil }
87
88 type bufferedSink struct {
89 mu sync.Mutex
90 closer io.Closer
91 out Sink
92 buf *bufio.Writer
93 flusherStarted bool
94
95 writeTicker *time.Ticker
96 done chan struct{}
97 }
98
99 func (fs *bufferedSink) Write(e *binlogpb.GrpcLogEntry) error {
100 fs.mu.Lock()
101 defer fs.mu.Unlock()
102 if !fs.flusherStarted {
103
104 fs.startFlushGoroutine()
105 fs.flusherStarted = true
106 }
107 if err := fs.out.Write(e); err != nil {
108 return err
109 }
110 return nil
111 }
112
113 const (
114 bufFlushDuration = 60 * time.Second
115 )
116
117 func (fs *bufferedSink) startFlushGoroutine() {
118 fs.writeTicker = time.NewTicker(bufFlushDuration)
119 go func() {
120 for {
121 select {
122 case <-fs.done:
123 return
124 case <-fs.writeTicker.C:
125 }
126 fs.mu.Lock()
127 if err := fs.buf.Flush(); err != nil {
128 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
129 }
130 fs.mu.Unlock()
131 }
132 }()
133 }
134
135 func (fs *bufferedSink) Close() error {
136 fs.mu.Lock()
137 defer fs.mu.Unlock()
138 if fs.writeTicker != nil {
139 fs.writeTicker.Stop()
140 }
141 close(fs.done)
142 if err := fs.buf.Flush(); err != nil {
143 grpclogLogger.Warningf("failed to flush to Sink: %v", err)
144 }
145 if err := fs.closer.Close(); err != nil {
146 grpclogLogger.Warningf("failed to close the underlying WriterCloser: %v", err)
147 }
148 if err := fs.out.Close(); err != nil {
149 grpclogLogger.Warningf("failed to close the Sink: %v", err)
150 }
151 return nil
152 }
153
154
155
156
157
158
159
160
161
162 func NewBufferedSink(o io.WriteCloser) Sink {
163 bufW := bufio.NewWriter(o)
164 return &bufferedSink{
165 closer: o,
166 out: newWriterSink(bufW),
167 buf: bufW,
168 done: make(chan struct{}),
169 }
170 }
171
View as plain text