1 package diodes 2 3 import ( 4 "log" 5 "sync/atomic" 6 "unsafe" 7 ) 8 9 // ManyToOne diode is optimal for many writers (go-routines B-n) and a single 10 // reader (go-routine A). It is not thread safe for multiple readers. 11 type ManyToOne struct { 12 writeIndex uint64 13 readIndex uint64 14 buffer []unsafe.Pointer 15 alerter Alerter 16 } 17 18 // NewManyToOne creates a new diode (ring buffer). The ManyToOne diode 19 // is optimized for many writers (on go-routines B-n) and a single reader 20 // (on go-routine A). The alerter is invoked on the read's go-routine. It is 21 // called when it notices that the writer go-routine has passed it and wrote 22 // over data. A nil can be used to ignore alerts. 23 func NewManyToOne(size int, alerter Alerter) *ManyToOne { 24 if alerter == nil { 25 alerter = AlertFunc(func(int) {}) 26 } 27 28 d := &ManyToOne{ 29 buffer: make([]unsafe.Pointer, size), 30 alerter: alerter, 31 } 32 33 // Start write index at the value before 0 34 // to allow the first write to use AddUint64 35 // and still have a beginning index of 0 36 d.writeIndex = ^d.writeIndex 37 return d 38 } 39 40 // Set sets the data in the next slot of the ring buffer. 41 func (d *ManyToOne) Set(data GenericDataType) { 42 for { 43 writeIndex := atomic.AddUint64(&d.writeIndex, 1) 44 idx := writeIndex % uint64(len(d.buffer)) 45 old := atomic.LoadPointer(&d.buffer[idx]) 46 47 if old != nil && 48 (*bucket)(old) != nil && 49 (*bucket)(old).seq > writeIndex-uint64(len(d.buffer)) { 50 log.Println("Diode set collision: consider using a larger diode") 51 continue 52 } 53 54 newBucket := &bucket{ 55 data: data, 56 seq: writeIndex, 57 } 58 59 if !atomic.CompareAndSwapPointer(&d.buffer[idx], old, unsafe.Pointer(newBucket)) { 60 log.Println("Diode set collision: consider using a larger diode") 61 continue 62 } 63 64 return 65 } 66 } 67 68 // TryNext will attempt to read from the next slot of the ring buffer. 69 // If there is no data available, it will return (nil, false). 70 func (d *ManyToOne) TryNext() (data GenericDataType, ok bool) { 71 // Read a value from the ring buffer based on the readIndex. 72 idx := d.readIndex % uint64(len(d.buffer)) 73 result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) 74 75 // When the result is nil that means the writer has not had the 76 // opportunity to write a value into the diode. This value must be ignored 77 // and the read head must not increment. 78 if result == nil { 79 return nil, false 80 } 81 82 // When the seq value is less than the current read index that means a 83 // value was read from idx that was previously written but since has 84 // been dropped. This value must be ignored and the read head must not 85 // increment. 86 // 87 // The simulation for this scenario assumes the fast forward occurred as 88 // detailed below. 89 // 90 // 5. The reader reads again getting seq 5. It then reads again expecting 91 // seq 6 but gets seq 2. This is a read of a stale value that was 92 // effectively "dropped" so the read fails and the read head stays put. 93 // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 94 // 95 if result.seq < d.readIndex { 96 return nil, false 97 } 98 99 // When the seq value is greater than the current read index that means a 100 // value was read from idx that overwrote the value that was expected to 101 // be at this idx. This happens when the writer has lapped the reader. The 102 // reader needs to catch up to the writer so it moves its write head to 103 // the new seq, effectively dropping the messages that were not read in 104 // between the two values. 105 // 106 // Here is a simulation of this scenario: 107 // 108 // 1. Both the read and write heads start at 0. 109 // `| nil | nil | nil | nil |` r: 0, w: 0 110 // 2. The writer fills the buffer. 111 // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 112 // 3. The writer laps the read head. 113 // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 114 // 4. The reader reads the first value, expecting a seq of 0 but reads 4, 115 // this forces the reader to fast forward to 5. 116 // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 117 // 118 if result.seq > d.readIndex { 119 dropped := result.seq - d.readIndex 120 d.readIndex = result.seq 121 d.alerter.Alert(int(dropped)) 122 } 123 124 // Only increment read index if a regular read occurred (where seq was 125 // equal to readIndex) or a value was read that caused a fast forward 126 // (where seq was greater than readIndex). 127 // 128 d.readIndex++ 129 return result.data, true 130 } 131