...

Source file src/github.com/rs/zerolog/diode/internal/diodes/many_to_one.go

Documentation: github.com/rs/zerolog/diode/internal/diodes

     1  package diodes
     2  
     3  import (
     4  	"log"
     5  	"sync/atomic"
     6  	"unsafe"
     7  )
     8  
     9  // ManyToOne diode is optimal for many writers (go-routines B-n) and a single
    10  // reader (go-routine A). It is not thread safe for multiple readers.
    11  type ManyToOne struct {
    12  	writeIndex uint64
    13  	readIndex  uint64
    14  	buffer     []unsafe.Pointer
    15  	alerter    Alerter
    16  }
    17  
    18  // NewManyToOne creates a new diode (ring buffer). The ManyToOne diode
    19  // is optimized for many writers (on go-routines B-n) and a single reader
    20  // (on go-routine A). The alerter is invoked on the read's go-routine. It is
    21  // called when it notices that the writer go-routine has passed it and wrote
    22  // over data. A nil can be used to ignore alerts.
    23  func NewManyToOne(size int, alerter Alerter) *ManyToOne {
    24  	if alerter == nil {
    25  		alerter = AlertFunc(func(int) {})
    26  	}
    27  
    28  	d := &ManyToOne{
    29  		buffer:  make([]unsafe.Pointer, size),
    30  		alerter: alerter,
    31  	}
    32  
    33  	// Start write index at the value before 0
    34  	// to allow the first write to use AddUint64
    35  	// and still have a beginning index of 0
    36  	d.writeIndex = ^d.writeIndex
    37  	return d
    38  }
    39  
    40  // Set sets the data in the next slot of the ring buffer.
    41  func (d *ManyToOne) Set(data GenericDataType) {
    42  	for {
    43  		writeIndex := atomic.AddUint64(&d.writeIndex, 1)
    44  		idx := writeIndex % uint64(len(d.buffer))
    45  		old := atomic.LoadPointer(&d.buffer[idx])
    46  
    47  		if old != nil &&
    48  			(*bucket)(old) != nil &&
    49  			(*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) {
    50  			log.Println("Diode set collision: consider using a larger diode")
    51  			continue
    52  		}
    53  
    54  		newBucket := &bucket{
    55  			data: data,
    56  			seq:  writeIndex,
    57  		}
    58  
    59  		if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) {
    60  			log.Println("Diode set collision: consider using a larger diode")
    61  			continue
    62  		}
    63  
    64  		return
    65  	}
    66  }
    67  
    68  // TryNext will attempt to read from the next slot of the ring buffer.
    69  // If there is no data available, it will return (nil, false).
    70  func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) {
    71  	// Read a value from the ring buffer based on the readIndex.
    72  	idx := d.readIndex % uint64(len(d.buffer))
    73  	result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil))
    74  
    75  	// When the result is nil that means the writer has not had the
    76  	// opportunity to write a value into the diode. This value must be ignored
    77  	// and the read head must not increment.
    78  	if result == nil {
    79  		return nil, false
    80  	}
    81  
    82  	// When the seq value is less than the current read index that means a
    83  	// value was read from idx that was previously written but since has
    84  	// been dropped. This value must be ignored and the read head must not
    85  	// increment.
    86  	//
    87  	// The simulation for this scenario assumes the fast forward occurred as
    88  	// detailed below.
    89  	//
    90  	// 5. The reader reads again getting seq 5. It then reads again expecting
    91  	//    seq 6 but gets seq 2. This is a read of a stale value that was
    92  	//    effectively "dropped" so the read fails and the read head stays put.
    93  	//    `| 4 | 5 | 2 | 3 |` r: 7, w: 6
    94  	//
    95  	if result.seq < d.readIndex {
    96  		return nil, false
    97  	}
    98  
    99  	// When the seq value is greater than the current read index that means a
   100  	// value was read from idx that overwrote the value that was expected to
   101  	// be at this idx. This happens when the writer has lapped the reader. The
   102  	// reader needs to catch up to the writer so it moves its write head to
   103  	// the new seq, effectively dropping the messages that were not read in
   104  	// between the two values.
   105  	//
   106  	// Here is a simulation of this scenario:
   107  	//
   108  	// 1. Both the read and write heads start at 0.
   109  	//    `| nil | nil | nil | nil |` r: 0, w: 0
   110  	// 2. The writer fills the buffer.
   111  	//    `| 0 | 1 | 2 | 3 |` r: 0, w: 4
   112  	// 3. The writer laps the read head.
   113  	//    `| 4 | 5 | 2 | 3 |` r: 0, w: 6
   114  	// 4. The reader reads the first value, expecting a seq of 0 but reads 4,
   115  	//    this forces the reader to fast forward to 5.
   116  	//    `| 4 | 5 | 2 | 3 |` r: 5, w: 6
   117  	//
   118  	if result.seq > d.readIndex {
   119  		dropped := result.seq - d.readIndex
   120  		d.readIndex = result.seq
   121  		d.alerter.Alert(int(dropped))
   122  	}
   123  
   124  	// Only increment read index if a regular read occurred (where seq was
   125  	// equal to readIndex) or a value was read that caused a fast forward
   126  	// (where seq was greater than readIndex).
   127  	//
   128  	d.readIndex++
   129  	return result.data, true
   130  }
   131  

View as plain text