1 // Copyright (c) 2021 Uber Technologies, Inc. 2 // 3 // Permission is hereby granted, free of charge, to any person obtaining a copy 4 // of this software and associated documentation files (the "Software"), to deal 5 // in the Software without restriction, including without limitation the rights 6 // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell 7 // copies of the Software, and to permit persons to whom the Software is 8 // furnished to do so, subject to the following conditions: 9 // 10 // The above copyright notice and this permission notice shall be included in 11 // all copies or substantial portions of the Software. 12 // 13 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR 14 // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, 15 // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE 16 // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER 17 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, 18 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN 19 // THE SOFTWARE. 20 21 package zapcore 22 23 import ( 24 "bufio" 25 "sync" 26 "time" 27 28 "go.uber.org/multierr" 29 ) 30 31 const ( 32 // _defaultBufferSize specifies the default size used by Buffer. 33 _defaultBufferSize = 256 * 1024 // 256 kB 34 35 // _defaultFlushInterval specifies the default flush interval for 36 // Buffer. 37 _defaultFlushInterval = 30 * time.Second 38 ) 39 40 // A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before 41 // flushing them to a wrapped WriteSyncer after reaching some limit, or at some 42 // fixed interval--whichever comes first. 43 // 44 // BufferedWriteSyncer is safe for concurrent use. You don't need to use 45 // zapcore.Lock for WriteSyncers with BufferedWriteSyncer. 46 // 47 // To set up a BufferedWriteSyncer, construct a WriteSyncer for your log 48 // destination (*os.File is a valid WriteSyncer), wrap it with 49 // BufferedWriteSyncer, and defer a Stop() call for when you no longer need the 50 // object. 51 // 52 // func main() { 53 // ws := ... // your log destination 54 // bws := &zapcore.BufferedWriteSyncer{WS: ws} 55 // defer bws.Stop() 56 // 57 // // ... 58 // core := zapcore.NewCore(enc, bws, lvl) 59 // logger := zap.New(core) 60 // 61 // // ... 62 // } 63 // 64 // By default, a BufferedWriteSyncer will buffer up to 256 kilobytes of logs, 65 // waiting at most 30 seconds between flushes. 66 // You can customize these parameters by setting the Size or FlushInterval 67 // fields. 68 // For example, the following buffers up to 512 kB of logs before flushing them 69 // to Stderr, with a maximum of one minute between each flush. 70 // 71 // ws := &BufferedWriteSyncer{ 72 // WS: os.Stderr, 73 // Size: 512 * 1024, // 512 kB 74 // FlushInterval: time.Minute, 75 // } 76 // defer ws.Stop() 77 type BufferedWriteSyncer struct { 78 // WS is the WriteSyncer around which BufferedWriteSyncer will buffer 79 // writes. 80 // 81 // This field is required. 82 WS WriteSyncer 83 84 // Size specifies the maximum amount of data the writer will buffered 85 // before flushing. 86 // 87 // Defaults to 256 kB if unspecified. 88 Size int 89 90 // FlushInterval specifies how often the writer should flush data if 91 // there have been no writes. 92 // 93 // Defaults to 30 seconds if unspecified. 94 FlushInterval time.Duration 95 96 // Clock, if specified, provides control of the source of time for the 97 // writer. 98 // 99 // Defaults to the system clock. 100 Clock Clock 101 102 // unexported fields for state 103 mu sync.Mutex 104 initialized bool // whether initialize() has run 105 stopped bool // whether Stop() has run 106 writer *bufio.Writer 107 ticker *time.Ticker 108 stop chan struct{} // closed when flushLoop should stop 109 done chan struct{} // closed when flushLoop has stopped 110 } 111 112 func (s *BufferedWriteSyncer) initialize() { 113 size := s.Size 114 if size == 0 { 115 size = _defaultBufferSize 116 } 117 118 flushInterval := s.FlushInterval 119 if flushInterval == 0 { 120 flushInterval = _defaultFlushInterval 121 } 122 123 if s.Clock == nil { 124 s.Clock = DefaultClock 125 } 126 127 s.ticker = s.Clock.NewTicker(flushInterval) 128 s.writer = bufio.NewWriterSize(s.WS, size) 129 s.stop = make(chan struct{}) 130 s.done = make(chan struct{}) 131 s.initialized = true 132 go s.flushLoop() 133 } 134 135 // Write writes log data into buffer syncer directly, multiple Write calls will be batched, 136 // and log data will be flushed to disk when the buffer is full or periodically. 137 func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) { 138 s.mu.Lock() 139 defer s.mu.Unlock() 140 141 if !s.initialized { 142 s.initialize() 143 } 144 145 // To avoid partial writes from being flushed, we manually flush the existing buffer if: 146 // * The current write doesn't fit into the buffer fully, and 147 // * The buffer is not empty (since bufio will not split large writes when the buffer is empty) 148 if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 { 149 if err := s.writer.Flush(); err != nil { 150 return 0, err 151 } 152 } 153 154 return s.writer.Write(bs) 155 } 156 157 // Sync flushes buffered log data into disk directly. 158 func (s *BufferedWriteSyncer) Sync() error { 159 s.mu.Lock() 160 defer s.mu.Unlock() 161 162 var err error 163 if s.initialized { 164 err = s.writer.Flush() 165 } 166 167 return multierr.Append(err, s.WS.Sync()) 168 } 169 170 // flushLoop flushes the buffer at the configured interval until Stop is 171 // called. 172 func (s *BufferedWriteSyncer) flushLoop() { 173 defer close(s.done) 174 175 for { 176 select { 177 case <-s.ticker.C: 178 // we just simply ignore error here 179 // because the underlying bufio writer stores any errors 180 // and we return any error from Sync() as part of the close 181 _ = s.Sync() 182 case <-s.stop: 183 return 184 } 185 } 186 } 187 188 // Stop closes the buffer, cleans up background goroutines, and flushes 189 // remaining unwritten data. 190 func (s *BufferedWriteSyncer) Stop() (err error) { 191 var stopped bool 192 193 // Critical section. 194 func() { 195 s.mu.Lock() 196 defer s.mu.Unlock() 197 198 if !s.initialized { 199 return 200 } 201 202 stopped = s.stopped 203 if stopped { 204 return 205 } 206 s.stopped = true 207 208 s.ticker.Stop() 209 close(s.stop) // tell flushLoop to stop 210 <-s.done // and wait until it has 211 }() 212 213 // Don't call Sync on consecutive Stops. 214 if !stopped { 215 err = s.Sync() 216 } 217 218 return err 219 } 220