...

Source file src/go.uber.org/zap/zapcore/buffered_write_syncer.go

Documentation: go.uber.org/zap/zapcore

     1  // Copyright (c) 2021 Uber Technologies, Inc.
     2  //
     3  // Permission is hereby granted, free of charge, to any person obtaining a copy
     4  // of this software and associated documentation files (the "Software"), to deal
     5  // in the Software without restriction, including without limitation the rights
     6  // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
     7  // copies of the Software, and to permit persons to whom the Software is
     8  // furnished to do so, subject to the following conditions:
     9  //
    10  // The above copyright notice and this permission notice shall be included in
    11  // all copies or substantial portions of the Software.
    12  //
    13  // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    14  // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    15  // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
    16  // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    17  // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
    18  // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
    19  // THE SOFTWARE.
    20  
    21  package zapcore
    22  
    23  import (
    24  	"bufio"
    25  	"sync"
    26  	"time"
    27  
    28  	"go.uber.org/multierr"
    29  )
    30  
    31  const (
    32  	// _defaultBufferSize specifies the default size used by Buffer.
    33  	_defaultBufferSize = 256 * 1024 // 256 kB
    34  
    35  	// _defaultFlushInterval specifies the default flush interval for
    36  	// Buffer.
    37  	_defaultFlushInterval = 30 * time.Second
    38  )
    39  
    40  // A BufferedWriteSyncer is a WriteSyncer that buffers writes in-memory before
    41  // flushing them to a wrapped WriteSyncer after reaching some limit, or at some
    42  // fixed interval--whichever comes first.
    43  //
    44  // BufferedWriteSyncer is safe for concurrent use. You don't need to use
    45  // zapcore.Lock for WriteSyncers with BufferedWriteSyncer.
    46  //
    47  // To set up a BufferedWriteSyncer, construct a WriteSyncer for your log
    48  // destination (*os.File is a valid WriteSyncer), wrap it with
    49  // BufferedWriteSyncer, and defer a Stop() call for when you no longer need the
    50  // object.
    51  //
    52  //	 func main() {
    53  //	   ws := ... // your log destination
    54  //	   bws := &zapcore.BufferedWriteSyncer{WS: ws}
    55  //	   defer bws.Stop()
    56  //
    57  //	   // ...
    58  //	   core := zapcore.NewCore(enc, bws, lvl)
    59  //	   logger := zap.New(core)
    60  //
    61  //	   // ...
    62  //	}
    63  //
    64  // By default, a BufferedWriteSyncer will buffer up to 256 kilobytes of logs,
    65  // waiting at most 30 seconds between flushes.
    66  // You can customize these parameters by setting the Size or FlushInterval
    67  // fields.
    68  // For example, the following buffers up to 512 kB of logs before flushing them
    69  // to Stderr, with a maximum of one minute between each flush.
    70  //
    71  //	ws := &BufferedWriteSyncer{
    72  //	  WS:            os.Stderr,
    73  //	  Size:          512 * 1024, // 512 kB
    74  //	  FlushInterval: time.Minute,
    75  //	}
    76  //	defer ws.Stop()
    77  type BufferedWriteSyncer struct {
    78  	// WS is the WriteSyncer around which BufferedWriteSyncer will buffer
    79  	// writes.
    80  	//
    81  	// This field is required.
    82  	WS WriteSyncer
    83  
    84  	// Size specifies the maximum amount of data the writer will buffered
    85  	// before flushing.
    86  	//
    87  	// Defaults to 256 kB if unspecified.
    88  	Size int
    89  
    90  	// FlushInterval specifies how often the writer should flush data if
    91  	// there have been no writes.
    92  	//
    93  	// Defaults to 30 seconds if unspecified.
    94  	FlushInterval time.Duration
    95  
    96  	// Clock, if specified, provides control of the source of time for the
    97  	// writer.
    98  	//
    99  	// Defaults to the system clock.
   100  	Clock Clock
   101  
   102  	// unexported fields for state
   103  	mu          sync.Mutex
   104  	initialized bool // whether initialize() has run
   105  	stopped     bool // whether Stop() has run
   106  	writer      *bufio.Writer
   107  	ticker      *time.Ticker
   108  	stop        chan struct{} // closed when flushLoop should stop
   109  	done        chan struct{} // closed when flushLoop has stopped
   110  }
   111  
   112  func (s *BufferedWriteSyncer) initialize() {
   113  	size := s.Size
   114  	if size == 0 {
   115  		size = _defaultBufferSize
   116  	}
   117  
   118  	flushInterval := s.FlushInterval
   119  	if flushInterval == 0 {
   120  		flushInterval = _defaultFlushInterval
   121  	}
   122  
   123  	if s.Clock == nil {
   124  		s.Clock = DefaultClock
   125  	}
   126  
   127  	s.ticker = s.Clock.NewTicker(flushInterval)
   128  	s.writer = bufio.NewWriterSize(s.WS, size)
   129  	s.stop = make(chan struct{})
   130  	s.done = make(chan struct{})
   131  	s.initialized = true
   132  	go s.flushLoop()
   133  }
   134  
   135  // Write writes log data into buffer syncer directly, multiple Write calls will be batched,
   136  // and log data will be flushed to disk when the buffer is full or periodically.
   137  func (s *BufferedWriteSyncer) Write(bs []byte) (int, error) {
   138  	s.mu.Lock()
   139  	defer s.mu.Unlock()
   140  
   141  	if !s.initialized {
   142  		s.initialize()
   143  	}
   144  
   145  	// To avoid partial writes from being flushed, we manually flush the existing buffer if:
   146  	// * The current write doesn't fit into the buffer fully, and
   147  	// * The buffer is not empty (since bufio will not split large writes when the buffer is empty)
   148  	if len(bs) > s.writer.Available() && s.writer.Buffered() > 0 {
   149  		if err := s.writer.Flush(); err != nil {
   150  			return 0, err
   151  		}
   152  	}
   153  
   154  	return s.writer.Write(bs)
   155  }
   156  
   157  // Sync flushes buffered log data into disk directly.
   158  func (s *BufferedWriteSyncer) Sync() error {
   159  	s.mu.Lock()
   160  	defer s.mu.Unlock()
   161  
   162  	var err error
   163  	if s.initialized {
   164  		err = s.writer.Flush()
   165  	}
   166  
   167  	return multierr.Append(err, s.WS.Sync())
   168  }
   169  
   170  // flushLoop flushes the buffer at the configured interval until Stop is
   171  // called.
   172  func (s *BufferedWriteSyncer) flushLoop() {
   173  	defer close(s.done)
   174  
   175  	for {
   176  		select {
   177  		case <-s.ticker.C:
   178  			// we just simply ignore error here
   179  			// because the underlying bufio writer stores any errors
   180  			// and we return any error from Sync() as part of the close
   181  			_ = s.Sync()
   182  		case <-s.stop:
   183  			return
   184  		}
   185  	}
   186  }
   187  
   188  // Stop closes the buffer, cleans up background goroutines, and flushes
   189  // remaining unwritten data.
   190  func (s *BufferedWriteSyncer) Stop() (err error) {
   191  	var stopped bool
   192  
   193  	// Critical section.
   194  	func() {
   195  		s.mu.Lock()
   196  		defer s.mu.Unlock()
   197  
   198  		if !s.initialized {
   199  			return
   200  		}
   201  
   202  		stopped = s.stopped
   203  		if stopped {
   204  			return
   205  		}
   206  		s.stopped = true
   207  
   208  		s.ticker.Stop()
   209  		close(s.stop) // tell flushLoop to stop
   210  		<-s.done      // and wait until it has
   211  	}()
   212  
   213  	// Don't call Sync on consecutive Stops.
   214  	if !stopped {
   215  		err = s.Sync()
   216  	}
   217  
   218  	return err
   219  }
   220  

View as plain text