1 package cushion
2
3 import (
4 "context"
5 "fmt"
6 "testing"
7 "time"
8
9 "cloud.google.com/go/pubsub"
10
11 "github.com/go-kivik/kivik/v4"
12 "github.com/stretchr/testify/assert"
13
14 "edge-infra.dev/pkg/edge/chariot"
15 )
16
17 type FakeBatchProcessor struct {
18 Buckets [][]*Message
19 MessageCount int
20 }
21
22 func (f *FakeBatchProcessor) BatchProcess(_ context.Context, _ *kivik.DB, msgs ...*Message) {
23 f.Buckets = append(f.Buckets, msgs)
24 f.MessageCount += len(msgs)
25 }
26
27 func TestDBMessageBufferBySize(t *testing.T) {
28 f := &FakeBatchProcessor{Buckets: make([][]*Message, 0)}
29
30 ctx, shutDown := context.WithCancel(context.Background())
31
32 totalMessages := 20
33 bufferSize := 5
34 waitUntil := 1 * time.Minute
35
36 mb := NewMessageBuffer(ctx, f, &kivik.DB{}, bufferSize, waitUntil)
37
38 for i := 0; i < totalMessages; i++ {
39 id := fmt.Sprint(i)
40 mb.Add(&Message{
41 Req: &Request{
42 TenantID: "test",
43 DBName: "test",
44 EntityID: id,
45 },
46 Msg: chariot.NewPubSubMessageFromMessage(&pubsub.Message{
47 ID: id,
48 Attributes: map[string]string{
49 "tenant_id": "test",
50 "db_name": "test",
51 "entity_id": id,
52 },
53 Data: []byte(id),
54 })})
55 }
56
57 assert.Eventually(t, func() bool {
58 return f.MessageCount == totalMessages
59 }, 2*time.Second, 10*time.Millisecond, "Message Count: %d", f.MessageCount)
60
61
62 assert.Len(t, f.Buckets, totalMessages/bufferSize)
63
64
65 assert.Equal(t, f.Buckets[0][0].Req.EntityID, "0")
66
67
68 assert.Equal(t, f.Buckets[3][4].Req.EntityID, "19")
69
70 shutDown()
71 }
72
73 func TestDBMessageBufferByWaitTime(t *testing.T) {
74 f := &FakeBatchProcessor{Buckets: make([][]*Message, 0)}
75
76 ctx, shutDown := context.WithCancel(context.Background())
77
78 totalMessages := 20
79 bufferSize := 5
80
81 waitUntil := 1 * time.Microsecond
82 messageRate := 2 * time.Microsecond
83
84 mb := NewMessageBuffer(ctx, f, &kivik.DB{}, bufferSize, waitUntil)
85
86 for i := 0; i < totalMessages; i++ {
87 id := fmt.Sprint(i)
88 mb.Add(&Message{
89 Req: &Request{
90 TenantID: "test",
91 DBName: "test",
92 EntityID: id,
93 },
94 Msg: chariot.NewPubSubMessageFromMessage(&pubsub.Message{
95 ID: id,
96 Attributes: map[string]string{
97 "tenant_id": "test",
98 "db_name": "test",
99 "entity_id": id,
100 },
101 Data: []byte(id),
102 })})
103 time.Sleep(messageRate)
104 }
105
106 assert.Eventually(t, func() bool {
107 return f.MessageCount == totalMessages
108 }, 2*time.Second, 10*time.Millisecond, "Message Count: %d", f.MessageCount)
109
110
111 assert.Equal(t, f.Buckets[0][0].Req.EntityID, "0")
112
113
114 l := len(f.Buckets)
115 elemLen := len(f.Buckets[l-1])
116
117 assert.Equal(t, f.Buckets[l-1][elemLen-1].Req.EntityID, "19")
118
119 shutDown()
120 }
121
View as plain text