...
1 package cushion
2
3 import (
4 "context"
5 "time"
6
7 "github.com/go-kivik/kivik/v4"
8 )
9
10 type BatchProcessor interface {
11 BatchProcess(ctx context.Context, database *kivik.DB, msgs ...*Message)
12 }
13
14
15 type MessageBuffer struct {
16 DB *kivik.DB
17
18 bp BatchProcessor
19
20 buffer chan *Message
21 bufferSize int
22 waitUntil time.Duration
23 shutDownCtx context.Context
24 cancel context.CancelFunc
25 deleted bool
26 }
27
28 func NewMessageBuffer(ctx context.Context, bp BatchProcessor, db *kivik.DB, bufferSize int, waitUntil time.Duration) *MessageBuffer {
29 ctx, cancel := context.WithCancel(ctx)
30 mb := &MessageBuffer{
31 shutDownCtx: ctx,
32 cancel: cancel,
33 bp: bp,
34 DB: db,
35 bufferSize: bufferSize,
36 buffer: make(chan *Message, bufferSize),
37 waitUntil: waitUntil,
38 }
39 go mb.run()
40 return mb
41 }
42
43
44 func (m *MessageBuffer) Add(msg *Message) {
45 m.buffer <- msg
46 }
47
48
49 func (m *MessageBuffer) run() {
50 buffer := make([]*Message, m.bufferSize)
51 for {
52 clear(buffer)
53 maxWaitTimer := time.NewTimer(m.waitUntil)
54 select {
55 case <-m.shutDownCtx.Done():
56 m.done()
57 return
58 default:
59 msgInBuffer := 0
60 outer:
61 for i := 0; i < m.bufferSize; i++ {
62 select {
63 case <-m.shutDownCtx.Done():
64 m.done()
65 return
66 case <-maxWaitTimer.C:
67 break outer
68 case msg, ok := <-m.buffer:
69 if ok && msg != nil {
70 buffer[msgInBuffer] = msg
71 msgInBuffer++
72 }
73 }
74 }
75
76 if msgInBuffer > 0 {
77 bufferCopy := make([]*Message, msgInBuffer)
78 copy(bufferCopy, buffer)
79 m.bp.BatchProcess(m.shutDownCtx, m.DB, bufferCopy...)
80 }
81 }
82 }
83 }
84
85 func (m *MessageBuffer) done() {
86 close(m.buffer)
87 for msg := range m.buffer {
88 if msg != nil {
89 msg.Nack()
90 }
91 }
92 }
93
94 func (m *MessageBuffer) Stop() {
95 m.deleted = true
96 m.cancel()
97 }
98
View as plain text