...

Source file src/github.com/rs/zerolog/diode/diode.go

Documentation: github.com/rs/zerolog/diode

// Package diode provides a thread-safe, lock-free, non-blocking io.Writer
// wrapper.
package diode

import (
	"context"
	"io"
	"sync"
	"time"

	"github.com/rs/zerolog/diode/internal/diodes"
)

// bufPool recycles the byte slices used to copy each log line into the
// diode, so Write does not allocate a fresh buffer per call.
var bufPool = &sync.Pool{
	New: func() interface{} {
		return make([]byte, 0, 500)
	},
}

// Alerter is called when the diode has dropped messages; missed is the
// number of messages lost since the previous alert.
type Alerter func(missed int)

// diodeFetcher is implemented by both the poller and the waiter returned by
// the internal diodes package.
type diodeFetcher interface {
	diodes.Diode
	Next() diodes.GenericDataType
}

// Writer is an io.Writer wrapper that uses a diode to make Write lock-free,
// non-blocking and thread-safe.
type Writer struct {
	w    io.Writer
	d    diodeFetcher
	c    context.CancelFunc
	done chan struct{}
}

// NewWriter creates a writer wrapping w with a many-to-one diode in order to
// never block log producers and to drop events if the writer can't keep up
// with the flow of data.
//
// Use a diode.Writer as follows:
//
//     wr := diode.NewWriter(w, 1000, 0, func(missed int) {
//         log.Printf("Dropped %d messages", missed)
//     })
//     log := zerolog.New(wr)
//
// If pollInterval is greater than 0, a poller is used; otherwise a waiter is
// used.
//
// See code.cloudfoundry.org/go-diodes for more info on diodes.
func NewWriter(w io.Writer, size int, pollInterval time.Duration, f Alerter) Writer {
	ctx, cancel := context.WithCancel(context.Background())
	dw := Writer{
		w:    w,
		c:    cancel,
		done: make(chan struct{}),
	}
	if f == nil {
		f = func(int) {}
	}
	d := diodes.NewManyToOne(size, diodes.AlertFunc(f))
	if pollInterval > 0 {
		dw.d = diodes.NewPoller(d,
			diodes.WithPollingInterval(pollInterval),
			diodes.WithPollingContext(ctx))
	} else {
		dw.d = diodes.NewWaiter(d,
			diodes.WithWaiterContext(ctx))
	}
	go dw.poll()
	return dw
}

func (dw Writer) Write(p []byte) (n int, err error) {
	// p is pooled in zerolog so we can't hold it past this call, hence the
	// copy.
	p = append(bufPool.Get().([]byte), p...)
	dw.d.Set(diodes.GenericDataType(&p))
	return len(p), nil
}

// Close releases the diode poller and calls Close on the wrapped writer if
// io.Closer is implemented.
func (dw Writer) Close() error {
	dw.c()
	<-dw.done
	if w, ok := dw.w.(io.Closer); ok {
		return w.Close()
	}
	return nil
}

func (dw Writer) poll() {
	defer close(dw.done)
	for {
		d := dw.d.Next()
		if d == nil {
			return
		}
		p := *(*[]byte)(d)
		// Errors from the wrapped writer are ignored here: Write already
		// reported success to the producer by the time the line is flushed.
		dw.w.Write(p)

		// Proper usage of a sync.Pool requires each entry to have approximately
		// the same memory cost. To obtain this property when the stored type
		// contains a variably-sized buffer, we add a hard limit on the maximum
		// buffer to place back in the pool.
		//
		// See https://golang.org/issue/23199
		const maxSize = 1 << 16 // 64KiB
		if cap(p) <= maxSize {
			bufPool.Put(p[:0])
		}
	}
}

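Usage sketch (not part of the source file above): the program below wires a
zerolog logger through a diode.Writer and closes it on shutdown. The buffer
size of 1000 and the 10ms poll interval are arbitrary illustration values;
zerolog.New and the fluent event API are standard zerolog calls.

	package main

	import (
		"fmt"
		"os"
		"time"

		"github.com/rs/zerolog"
		"github.com/rs/zerolog/diode"
	)

	func main() {
		// Wrap stderr in a non-blocking writer. A pollInterval > 0 selects
		// the poller; passing 0 would select the waiter instead.
		wr := diode.NewWriter(os.Stderr, 1000, 10*time.Millisecond, func(missed int) {
			fmt.Printf("Logger dropped %d messages\n", missed)
		})
		// Stop the background goroutine and close the wrapped writer if it
		// implements io.Closer (os.Stderr does).
		defer wr.Close()

		log := zerolog.New(wr).With().Timestamp().Logger()
		log.Info().Msg("hello from a non-blocking logger")
	}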

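A second sketch, illustrating the drop behavior the package doc describes:
with a deliberately slow sink and a small diode, Write never blocks and the
Alerter reports lost lines. The slowWriter type and every size and duration
here are invented for the illustration, and exact drop counts vary from run
to run.

	package main

	import (
		"fmt"
		"time"

		"github.com/rs/zerolog/diode"
	)

	// slowWriter simulates a sink that cannot keep up with its producers.
	type slowWriter struct{}

	func (slowWriter) Write(p []byte) (int, error) {
		time.Sleep(10 * time.Millisecond)
		return len(p), nil
	}

	func main() {
		// A pollInterval of 0 selects the waiter; the 8-entry buffer fills fast.
		wr := diode.NewWriter(slowWriter{}, 8, 0, func(missed int) {
			fmt.Printf("dropped %d lines\n", missed)
		})

		// Each write returns immediately even though the sink sleeps; once
		// the diode is full, newer writes overwrite unread entries and the
		// overwritten count is reported to the Alerter.
		for i := 0; i < 100; i++ {
			fmt.Fprintf(wr, "line %d\n", i)
		}

		time.Sleep(300 * time.Millisecond) // let the drain goroutine catch up
		wr.Close()
	}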