package cushion

import (
	"context"
	"fmt"
	"sync"
	"testing"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/go-kivik/kivik/v4"
	"github.com/stretchr/testify/assert"

	"edge-infra.dev/pkg/edge/chariot"
)

// FakeBatchProcessor is a test double that records every batch handed to
// BatchProcess so tests can inspect how messages were grouped.
//
// The tests below poll for completion, which implies batches may arrive while
// the test is reading the recorded state. BatchProcess therefore updates
// Buckets and MessageCount under mu; readers that may overlap with processing
// should go through count and snapshot instead of touching the fields directly.
type FakeBatchProcessor struct {
	// Buckets holds one entry per BatchProcess call, in call order.
	Buckets [][]*Message
	// MessageCount is the total number of messages seen across all calls.
	MessageCount int

	mu sync.Mutex
}

// BatchProcess records msgs as a new bucket and adds its length to the running
// message count. The context and database arguments are ignored.
func (f *FakeBatchProcessor) BatchProcess(_ context.Context, _ *kivik.DB, msgs ...*Message) {
	// Copy the batch so the recorded bucket stays intact even if the caller
	// reuses the backing array of msgs after this call returns.
	batch := append([]*Message(nil), msgs...)

	f.mu.Lock()
	defer f.mu.Unlock()
	f.Buckets = append(f.Buckets, batch)
	f.MessageCount += len(batch)
}

// count returns the number of messages recorded so far.
func (f *FakeBatchProcessor) count() int {
	f.mu.Lock()
	defer f.mu.Unlock()
	return f.MessageCount
}

// snapshot returns a copy of the recorded bucket list that is safe to index
// while further batches may still be appended.
func (f *FakeBatchProcessor) snapshot() [][]*Message {
	f.mu.Lock()
	defer f.mu.Unlock()
	return append([][]*Message(nil), f.Buckets...)
}

// newBufferTestMessage builds a Message whose request entity ID, pubsub
// message ID, "entity_id" attribute and payload are all set to id, with the
// tenant and database name fixed to "test".
func newBufferTestMessage(id string) *Message {
	return &Message{
		Req: &Request{
			TenantID: "test",
			DBName:   "test",
			EntityID: id,
		},
		Msg: chariot.NewPubSubMessageFromMessage(&pubsub.Message{
			ID: id,
			Attributes: map[string]string{
				"tenant_id": "test",
				"db_name":   "test",
				"entity_id": id,
			},
			Data: []byte(id),
		}),
	}
}

// TestDBMessageBufferBySize adds 20 messages to a buffer of size 5 with a long
// wait time and expects them to be delivered as 4 full batches in order.
func TestDBMessageBufferBySize(t *testing.T) {
	f := &FakeBatchProcessor{}
	ctx, shutDown := context.WithCancel(context.Background())
	// Deferred so the context is cancelled even when an assertion aborts the test.
	defer shutDown()

	totalMessages := 20
	bufferSize := 5
	waitUntil := 1 * time.Minute // ensure messages are processed by buffer size instead of wait time

	mb := NewMessageBuffer(ctx, f, &kivik.DB{}, bufferSize, waitUntil)
	for i := 0; i < totalMessages; i++ {
		mb.Add(newBufferTestMessage(fmt.Sprint(i)))
	}

	if !assert.Eventually(t, func() bool { return f.count() == totalMessages }, 2*time.Second, 10*time.Millisecond) {
		// Read the count after the timeout so the reported value is current.
		t.Fatalf("Message Count: %d", f.count())
	}

	buckets := f.snapshot()

	// batch of 20/5 = 4 buckets
	if !assert.Len(t, buckets, totalMessages/bufferSize) {
		return
	}

	// first item at first position
	if assert.NotEmpty(t, buckets[0]) {
		assert.Equal(t, "0", buckets[0][0].Req.EntityID)
	}

	// last item at last position
	last := buckets[len(buckets)-1]
	if assert.Len(t, last, bufferSize) {
		assert.Equal(t, "19", last[bufferSize-1].Req.EntityID)
	}
}

// TestDBMessageBufferByWaitTime adds 20 messages more slowly than the buffer's
// wait time and expects all of them to be delivered in order, without
// asserting how they were split into batches.
func TestDBMessageBufferByWaitTime(t *testing.T) {
	f := &FakeBatchProcessor{}
	ctx, shutDown := context.WithCancel(context.Background())
	// Deferred so the context is cancelled even when an assertion aborts the test.
	defer shutDown()

	totalMessages := 20
	bufferSize := 5
	waitUntil := 1 * time.Microsecond
	messageRate := 2 * time.Microsecond // slower than waitUntil so the wait time can elapse between adds

	mb := NewMessageBuffer(ctx, f, &kivik.DB{}, bufferSize, waitUntil)
	for i := 0; i < totalMessages; i++ {
		mb.Add(newBufferTestMessage(fmt.Sprint(i)))
		time.Sleep(messageRate)
	}

	if !assert.Eventually(t, func() bool { return f.count() == totalMessages }, 2*time.Second, 10*time.Millisecond) {
		// Read the count after the timeout so the reported value is current.
		t.Fatalf("Message Count: %d", f.count())
	}

	buckets := f.snapshot()
	if !assert.NotEmpty(t, buckets) {
		return
	}

	// first item at first position
	if assert.NotEmpty(t, buckets[0]) {
		assert.Equal(t, "0", buckets[0][0].Req.EntityID)
	}

	// last item at last position
	last := buckets[len(buckets)-1]
	if assert.NotEmpty(t, last) {
		assert.Equal(t, "19", last[len(last)-1].Req.EntityID)
	}
}