1 package chirp
2
3 import (
4 "bytes"
5 "context"
6 "encoding/base64"
7 "encoding/json"
8 "fmt"
9 "io"
10 "net/http"
11 "os"
12 "testing"
13 "time"
14
15 "edge-infra.dev/pkg/edge/datasync/kafkaclient"
16
17 "github.com/google/uuid"
18 "github.com/stretchr/testify/assert"
19 "github.com/stretchr/testify/require"
20 )
21
22 const (
23 bigFile = ""
24 num = 1000
25
26 msgURL = "http://localhost:3000/send-message"
27 topic = "chat-room"
28 )
29
30 var (
31 defaultConfig = &kafkaclient.Config{
32
33
34 Brokers: []string{"localhost:9092"},
35
36 Topic: topic,
37
38 MessageMaxBytes: 10485760,
39
40 GroupID: "data-sync___",
41 BulkSize: 100,
42 ReadTimeout: 5 * time.Second,
43 ReadWindowTimeout: 10 * time.Second,
44 ReturnOnEmptyFetch: true,
45 }
46 )
47
48 func TestSendLarge(t *testing.T) {
49 payload, err := os.ReadFile(bigFile)
50 if err != nil {
51 t.Skip("no file found for testing")
52 }
53
54 data := map[string]string{
55 "Type": topic,
56 "Payload": base64.StdEncoding.EncodeToString(payload),
57 }
58
59 for i := 0; i < num; i++ {
60 data["ID"] = uuid.NewString()
61 body, err := json.Marshal(data)
62 assert.NoError(t, err)
63
64 req, err := http.NewRequest(http.MethodPost, msgURL, bytes.NewReader(body))
65 assert.NoError(t, err)
66 req.Header.Add("Content-Type", "application/json")
67 req.Header.Add("Accept", "application/json")
68 resp, err := http.DefaultClient.Do(req)
69 assert.NoError(t, err)
70
71 jsonResp := make(map[string]interface{})
72 respData, err := io.ReadAll(resp.Body)
73 assert.NoError(t, err)
74 err = json.Unmarshal(respData, &jsonResp)
75 assert.NoError(t, err)
76 fmt.Println("count", i+1, "response", jsonResp)
77 assert.False(t, jsonResp["IsError"].(bool))
78 }
79 }
80
81 func TestRedPandaBigFileSettings(t *testing.T) {
82 payload, err := os.ReadFile(bigFile)
83 if err != nil {
84 t.Skip("no file found for testing")
85 }
86
87
88
89 ctx := context.Background()
90
91 admin, err := kafkaclient.NewAdmin(defaultConfig)
92 require.NoError(t, err)
93
94 err = admin.EnsureTopic(ctx, topic)
95 require.NoError(t, err)
96
97 producer, err := kafkaclient.NewProducer(defaultConfig)
98 require.NoError(t, err)
99
100 consumer, err := kafkaclient.NewConsumer(defaultConfig)
101 require.NoError(t, err)
102
103 err = producer.Ping(ctx)
104 require.NoError(t, err)
105
106 err = consumer.Ping(ctx)
107 require.NoError(t, err)
108
109 err = producer.Produce(ctx, buildRecords(num, payload)...)
110 require.NoError(t, err)
111
112 wait, cancel := context.WithTimeout(ctx, 30*time.Second)
113 defer cancel()
114 count := 0
115 for {
116 records, err := consumer.Read(ctx)
117 require.NoError(t, err)
118 select {
119 case <-wait.Done():
120 assert.Equal(t, num, count)
121 return
122 default:
123 require.True(t, len(records) > 0)
124 for _, record := range records {
125 assert.Len(t, record.Headers, 9)
126 assert.Equal(t, len(payload), len(record.Value))
127 err = consumer.Ack(ctx, record)
128 assert.NoError(t, err)
129 count++
130 }
131 }
132 }
133 }
134
135 func buildRecords(num int, payload []byte) []*kafkaclient.Record {
136 r := make([]*kafkaclient.Record, num)
137 for i := 0; i < num; i++ {
138 id := uuid.NewString()
139 r[i] = &kafkaclient.Record{
140 Topic: topic,
141
142 Headers: []kafkaclient.Header{
143 {Key: "id", Value: []byte(id)},
144 {Key: "type", Value: []byte("system-status")},
145 {Key: "organization", Value: []byte("mailmen")},
146 {Key: "organization_name", Value: []byte("mailmen")},
147 {Key: "organization_id", Value: []byte("9b387a16540e442a8223e739ba83dbbc")},
148 {Key: "site_id", Value: []byte("ecc4593801264c50af25d6856459152c")},
149 {Key: "site_name", Value: []byte("k3d-jan16-9")},
150 {Key: "signature", Value: []byte("")},
151 {Key: "created", Value: []byte(time.Now().String())},
152 },
153 Value: payload,
154 }
155 }
156 return r
157 }
158
View as plain text