...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/buffer_messages_test.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion

     1  package cushion
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"testing"
     7  	"time"
     8  
     9  	"cloud.google.com/go/pubsub"
    10  
    11  	"github.com/go-kivik/kivik/v4"
    12  	"github.com/stretchr/testify/assert"
    13  
    14  	"edge-infra.dev/pkg/edge/chariot"
    15  )
    16  
    17  type FakeBatchProcessor struct {
    18  	Buckets      [][]*Message
    19  	MessageCount int
    20  }
    21  
    22  func (f *FakeBatchProcessor) BatchProcess(_ context.Context, _ *kivik.DB, msgs ...*Message) {
    23  	f.Buckets = append(f.Buckets, msgs)
    24  	f.MessageCount += len(msgs)
    25  }
    26  
    27  func TestDBMessageBufferBySize(t *testing.T) {
    28  	f := &FakeBatchProcessor{Buckets: make([][]*Message, 0)}
    29  
    30  	ctx, shutDown := context.WithCancel(context.Background())
    31  
    32  	totalMessages := 20
    33  	bufferSize := 5
    34  	waitUntil := 1 * time.Minute // ensure messages are process by buffer size instead of wait time
    35  
    36  	mb := NewMessageBuffer(ctx, f, &kivik.DB{}, bufferSize, waitUntil)
    37  
    38  	for i := 0; i < totalMessages; i++ {
    39  		id := fmt.Sprint(i)
    40  		mb.Add(&Message{
    41  			Req: &Request{
    42  				TenantID: "test",
    43  				DBName:   "test",
    44  				EntityID: id,
    45  			},
    46  			Msg: chariot.NewPubSubMessageFromMessage(&pubsub.Message{
    47  				ID: id,
    48  				Attributes: map[string]string{
    49  					"tenant_id": "test",
    50  					"db_name":   "test",
    51  					"entity_id": id,
    52  				},
    53  				Data: []byte(id),
    54  			})})
    55  	}
    56  
    57  	assert.Eventually(t, func() bool {
    58  		return f.MessageCount == totalMessages
    59  	}, 2*time.Second, 10*time.Millisecond, "Message Count: %d", f.MessageCount)
    60  
    61  	// batch of 20/5 = 4 buckets
    62  	assert.Len(t, f.Buckets, totalMessages/bufferSize)
    63  
    64  	// first item at first position
    65  	assert.Equal(t, f.Buckets[0][0].Req.EntityID, "0")
    66  
    67  	// last item at last position
    68  	assert.Equal(t, f.Buckets[3][4].Req.EntityID, "19")
    69  
    70  	shutDown()
    71  }
    72  
    73  func TestDBMessageBufferByWaitTime(t *testing.T) {
    74  	f := &FakeBatchProcessor{Buckets: make([][]*Message, 0)}
    75  
    76  	ctx, shutDown := context.WithCancel(context.Background())
    77  
    78  	totalMessages := 20
    79  	bufferSize := 5
    80  
    81  	waitUntil := 1 * time.Microsecond
    82  	messageRate := 2 * time.Microsecond
    83  
    84  	mb := NewMessageBuffer(ctx, f, &kivik.DB{}, bufferSize, waitUntil)
    85  
    86  	for i := 0; i < totalMessages; i++ {
    87  		id := fmt.Sprint(i)
    88  		mb.Add(&Message{
    89  			Req: &Request{
    90  				TenantID: "test",
    91  				DBName:   "test",
    92  				EntityID: id,
    93  			},
    94  			Msg: chariot.NewPubSubMessageFromMessage(&pubsub.Message{
    95  				ID: id,
    96  				Attributes: map[string]string{
    97  					"tenant_id": "test",
    98  					"db_name":   "test",
    99  					"entity_id": id,
   100  				},
   101  				Data: []byte(id),
   102  			})})
   103  		time.Sleep(messageRate)
   104  	}
   105  
   106  	assert.Eventually(t, func() bool {
   107  		return f.MessageCount == totalMessages
   108  	}, 2*time.Second, 10*time.Millisecond, "Message Count: %d", f.MessageCount)
   109  
   110  	// first item at first position
   111  	assert.Equal(t, f.Buckets[0][0].Req.EntityID, "0")
   112  
   113  	// last item at last position
   114  	l := len(f.Buckets)
   115  	elemLen := len(f.Buckets[l-1])
   116  
   117  	assert.Equal(t, f.Buckets[l-1][elemLen-1].Req.EntityID, "19")
   118  
   119  	shutDown()
   120  }
   121  

View as plain text