package cushion

import (
	"context"
	"time"

	"github.com/go-kivik/kivik/v4"
)

// BatchProcessor handles a batch of messages destined for a single database.
//
// MessageBuffer calls BatchProcess with its shutdown context, its database and
// the non-nil messages collected for one batch.
type BatchProcessor interface {
	BatchProcess(ctx context.Context, database *kivik.DB, msgs ...*Message)
}

// MessageBuffer buffers valid messages to be processed in bulk for a single
// database.
type MessageBuffer struct {
	// DB is the database handed to the BatchProcessor with every batch.
	DB *kivik.DB
	// bp receives every non-empty batch.
	bp BatchProcessor
	// buffer carries messages from Add to the run goroutine.
	buffer chan *Message
	// bufferSize is both the channel capacity and the maximum batch size.
	bufferSize int
	// waitUntil is the maximum time spent collecting a single batch.
	waitUntil time.Duration
	// shutDownCtx is cancelled by Stop (or by the parent context) and ends run.
	shutDownCtx context.Context
	// cancel cancels shutDownCtx.
	cancel context.CancelFunc
	// deleted is set by Stop. It is not read in this file and is written
	// without synchronisation.
	// NOTE(review): confirm readers elsewhere in the package tolerate that.
	deleted bool
}

// NewMessageBuffer creates a MessageBuffer for db and starts the goroutine
// that batches messages and forwards them to bp.
//
// A batch is flushed once bufferSize receives have happened or waitUntil has
// elapsed since the batch was started, whichever comes first. bufferSize values
// below 1 are raised to 1: a negative size would panic when allocating the
// channel and a zero size would make the worker spin without ever receiving.
// The worker stops when ctx is cancelled or Stop is called.
func NewMessageBuffer(ctx context.Context, bp BatchProcessor, db *kivik.DB, bufferSize int, waitUntil time.Duration) *MessageBuffer {
	bufferSize = max(bufferSize, 1)
	ctx, cancel := context.WithCancel(ctx)
	mb := &MessageBuffer{
		shutDownCtx: ctx,
		cancel:      cancel,
		bp:          bp,
		DB:          db,
		bufferSize:  bufferSize,
		buffer:      make(chan *Message, bufferSize),
		waitUntil:   waitUntil,
	}
	go mb.run()
	return mb
}

// Add appends a message to a channel to be processed in bulk.
//
// Add blocks while the channel is full. Once the buffer has been shut down the
// message is Nacked instead of queued, so Add neither panics nor blocks forever
// after Stop.
func (m *MessageBuffer) Add(msg *Message) {
	// Give shutdown priority: select picks randomly when several cases are ready.
	select {
	case <-m.shutDownCtx.Done():
		if msg != nil {
			msg.Nack()
		}
		return
	default:
	}

	select {
	case m.buffer <- msg:
		// Shutdown may have raced with the send after run already drained the
		// channel; drain again so the message is not left stranded.
		if m.shutDownCtx.Err() != nil {
			m.done()
		}
	case <-m.shutDownCtx.Done():
		if msg != nil {
			msg.Nack()
		}
	}
}

// run watches for the buffer to be full or for the maximum wait time to be
// exceeded, and hands each non-empty batch to the BatchProcessor. It returns
// once shutDownCtx is cancelled, after Nacking every message it still holds.
func (m *MessageBuffer) run() {
	scratch := make([]*Message, m.bufferSize)
	for {
		// Do not start a new batch once shutdown has been requested.
		select {
		case <-m.shutDownCtx.Done():
			m.done()
			return
		default:
		}

		clear(scratch) // always clear before re-populating the buffer
		n, alive := m.collect(scratch)
		if !alive {
			// Messages already taken off the channel would otherwise be lost:
			// they are neither processed nor still queued for done to Nack.
			for _, msg := range scratch[:n] {
				msg.Nack()
			}
			m.done()
			return
		}

		// only call batch process if there are messages to be processed
		if n > 0 {
			// Hand over a copy because scratch is reused for the next batch.
			batch := make([]*Message, n)
			copy(batch, scratch)
			m.bp.BatchProcess(m.shutDownCtx, m.DB, batch...)
		}
	}
}

// collect fills scratch with valid messages until len(scratch) receives have
// happened, waitUntil has elapsed, or shutdown is requested. It returns the
// number of messages stored and false when collection ended due to shutdown.
func (m *MessageBuffer) collect(scratch []*Message) (n int, alive bool) {
	timer := time.NewTimer(m.waitUntil)
	defer timer.Stop() // release the timer when the batch fills up early

	for i := 0; i < len(scratch); i++ {
		select {
		case <-m.shutDownCtx.Done():
			return n, false
		case <-timer.C:
			return n, true
		case msg, ok := <-m.buffer:
			if ok && msg != nil { // only valid messages in the batch
				scratch[n] = msg
				n++
			}
		}
	}
	return n, true
}

// done Nacks every message currently queued in the channel.
//
// The channel is deliberately left open: producers may still be inside Add,
// and sending on a closed channel panics. The drain is non-blocking and safe
// to call from several goroutines.
func (m *MessageBuffer) done() {
	for {
		select {
		case msg, ok := <-m.buffer:
			if !ok {
				return
			}
			if msg != nil {
				msg.Nack()
			}
		default:
			return
		}
	}
}

// Stop marks the buffer as deleted and cancels the shutdown context, which
// makes the run goroutine Nack pending messages and exit. Stop does not wait
// for that goroutine to finish.
func (m *MessageBuffer) Stop() {
	m.deleted = true
	m.cancel()
}