1 package diodes 2 3 import ( 4 "sync/atomic" 5 "unsafe" 6 ) 7 8 // GenericDataType is the data type the diodes operate on. 9 type GenericDataType unsafe.Pointer 10 11 // Alerter is used to report how many values were overwritten since the 12 // last write. 13 type Alerter interface { 14 Alert(missed int) 15 } 16 17 // AlertFunc type is an adapter to allow the use of ordinary functions as 18 // Alert handlers. 19 type AlertFunc func(missed int) 20 21 // Alert calls f(missed) 22 func (f AlertFunc) Alert(missed int) { 23 f(missed) 24 } 25 26 type bucket struct { 27 data GenericDataType 28 seq uint64 // seq is the recorded write index at the time of writing 29 } 30 31 // OneToOne diode is meant to be used by a single reader and a single writer. 32 // It is not thread safe if used otherwise. 33 type OneToOne struct { 34 writeIndex uint64 35 readIndex uint64 36 buffer []unsafe.Pointer 37 alerter Alerter 38 } 39 40 // NewOneToOne creates a new diode is meant to be used by a single reader and 41 // a single writer. The alerter is invoked on the read's go-routine. It is 42 // called when it notices that the writer go-routine has passed it and wrote 43 // over data. A nil can be used to ignore alerts. 44 func NewOneToOne(size int, alerter Alerter) *OneToOne { 45 if alerter == nil { 46 alerter = AlertFunc(func(int) {}) 47 } 48 49 return &OneToOne{ 50 buffer: make([]unsafe.Pointer, size), 51 alerter: alerter, 52 } 53 } 54 55 // Set sets the data in the next slot of the ring buffer. 56 func (d *OneToOne) Set(data GenericDataType) { 57 idx := d.writeIndex % uint64(len(d.buffer)) 58 59 newBucket := &bucket{ 60 data: data, 61 seq: d.writeIndex, 62 } 63 d.writeIndex++ 64 65 atomic.StorePointer(&d.buffer[idx], unsafe.Pointer(newBucket)) 66 } 67 68 // TryNext will attempt to read from the next slot of the ring buffer. 69 // If there is no data available, it will return (nil, false). 70 func (d *OneToOne) TryNext() (data GenericDataType, ok bool) { 71 // Read a value from the ring buffer based on the readIndex. 72 idx := d.readIndex % uint64(len(d.buffer)) 73 result := (*bucket)(atomic.SwapPointer(&d.buffer[idx], nil)) 74 75 // When the result is nil that means the writer has not had the 76 // opportunity to write a value into the diode. This value must be ignored 77 // and the read head must not increment. 78 if result == nil { 79 return nil, false 80 } 81 82 // When the seq value is less than the current read index that means a 83 // value was read from idx that was previously written but since has 84 // been dropped. This value must be ignored and the read head must not 85 // increment. 86 // 87 // The simulation for this scenario assumes the fast forward occurred as 88 // detailed below. 89 // 90 // 5. The reader reads again getting seq 5. It then reads again expecting 91 // seq 6 but gets seq 2. This is a read of a stale value that was 92 // effectively "dropped" so the read fails and the read head stays put. 93 // `| 4 | 5 | 2 | 3 |` r: 7, w: 6 94 // 95 if result.seq < d.readIndex { 96 return nil, false 97 } 98 99 // When the seq value is greater than the current read index that means a 100 // value was read from idx that overwrote the value that was expected to 101 // be at this idx. This happens when the writer has lapped the reader. The 102 // reader needs to catch up to the writer so it moves its write head to 103 // the new seq, effectively dropping the messages that were not read in 104 // between the two values. 105 // 106 // Here is a simulation of this scenario: 107 // 108 // 1. Both the read and write heads start at 0. 109 // `| nil | nil | nil | nil |` r: 0, w: 0 110 // 2. The writer fills the buffer. 111 // `| 0 | 1 | 2 | 3 |` r: 0, w: 4 112 // 3. The writer laps the read head. 113 // `| 4 | 5 | 2 | 3 |` r: 0, w: 6 114 // 4. The reader reads the first value, expecting a seq of 0 but reads 4, 115 // this forces the reader to fast forward to 5. 116 // `| 4 | 5 | 2 | 3 |` r: 5, w: 6 117 // 118 if result.seq > d.readIndex { 119 dropped := result.seq - d.readIndex 120 d.readIndex = result.seq 121 d.alerter.Alert(int(dropped)) 122 } 123 124 // Only increment read index if a regular read occurred (where seq was 125 // equal to readIndex) or a value was read that caused a fast forward 126 // (where seq was greater than readIndex). 127 d.readIndex++ 128 return result.data, true 129 } 130