...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/buffer_messages.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion

     1  package cushion
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	"github.com/go-kivik/kivik/v4"
     8  )
     9  
    10  type BatchProcessor interface {
    11  	BatchProcess(ctx context.Context, database *kivik.DB, msgs ...*Message)
    12  }
    13  
    14  // MessageBuffer buffers valid messages to be process in bulk for a single database
    15  type MessageBuffer struct {
    16  	DB *kivik.DB
    17  
    18  	bp BatchProcessor
    19  
    20  	buffer      chan *Message
    21  	bufferSize  int
    22  	waitUntil   time.Duration
    23  	shutDownCtx context.Context
    24  	cancel      context.CancelFunc
    25  	deleted     bool
    26  }
    27  
    28  func NewMessageBuffer(ctx context.Context, bp BatchProcessor, db *kivik.DB, bufferSize int, waitUntil time.Duration) *MessageBuffer {
    29  	ctx, cancel := context.WithCancel(ctx)
    30  	mb := &MessageBuffer{
    31  		shutDownCtx: ctx,
    32  		cancel:      cancel,
    33  		bp:          bp,
    34  		DB:          db,
    35  		bufferSize:  bufferSize,
    36  		buffer:      make(chan *Message, bufferSize),
    37  		waitUntil:   waitUntil,
    38  	}
    39  	go mb.run()
    40  	return mb
    41  }
    42  
    43  // Add appends a message to a channel to be processed in bulk
    44  func (m *MessageBuffer) Add(msg *Message) {
    45  	m.buffer <- msg
    46  }
    47  
    48  // run watch for the buffer to be full or for the maximum wait time to be exceeded
    49  func (m *MessageBuffer) run() {
    50  	buffer := make([]*Message, m.bufferSize)
    51  	for {
    52  		clear(buffer) // always clear before re-populating the buffer
    53  		maxWaitTimer := time.NewTimer(m.waitUntil)
    54  		select {
    55  		case <-m.shutDownCtx.Done():
    56  			m.done()
    57  			return
    58  		default:
    59  			msgInBuffer := 0
    60  		outer:
    61  			for i := 0; i < m.bufferSize; i++ {
    62  				select {
    63  				case <-m.shutDownCtx.Done():
    64  					m.done()
    65  					return
    66  				case <-maxWaitTimer.C:
    67  					break outer
    68  				case msg, ok := <-m.buffer:
    69  					if ok && msg != nil { // only valid messages in the batch
    70  						buffer[msgInBuffer] = msg
    71  						msgInBuffer++
    72  					}
    73  				}
    74  			}
    75  			// only call batch process if there is messages to be processed
    76  			if msgInBuffer > 0 {
    77  				bufferCopy := make([]*Message, msgInBuffer)
    78  				copy(bufferCopy, buffer)
    79  				m.bp.BatchProcess(m.shutDownCtx, m.DB, bufferCopy...)
    80  			}
    81  		}
    82  	}
    83  }
    84  
    85  func (m *MessageBuffer) done() {
    86  	close(m.buffer)
    87  	for msg := range m.buffer {
    88  		if msg != nil {
    89  			msg.Nack()
    90  		}
    91  	}
    92  }
    93  
    94  func (m *MessageBuffer) Stop() {
    95  	m.deleted = true
    96  	m.cancel()
    97  }
    98  

View as plain text