// Package chirp contains manual integration tests for the data-sync
// messaging stack (Redpanda + data-sync-messaging). Each test skips
// automatically unless a local payload file path is set in bigFile.
package chirp

import (
	"bytes"
	"context"
	"encoding/base64"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"os"
	"testing"
	"time"

	"github.com/google/uuid"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
)

const (
	// bigFile is the path to a large local payload file; tests skip when
	// it is empty or unreadable.
	bigFile = ""
	// num is how many messages/records each test sends.
	num = 1000
	// k port-forward -n data-sync-messaging data-sync-messaging-0 3000:80
	msgURL = "http://localhost:3000/send-message"
	topic  = "chat-room"
)

var (
	defaultConfig = &kafkaclient.Config{
		// k port-forward -n redpanda redpanda-0 9092:9092
		// telepresence intercept --namespace redpanda redpanda --local-only
		Brokers:            []string{"localhost:9092"},
		Topic:              topic,
		MessageMaxBytes:    10485760,
		GroupID:            "data-sync___",
		BulkSize:           100,
		ReadTimeout:        5 * time.Second,
		ReadWindowTimeout:  10 * time.Second,
		ReturnOnEmptyFetch: true,
	}

	// httpClient bounds every send-message request. The original used
	// http.DefaultClient, which has no timeout and can hang the test
	// indefinitely on a stuck port-forward.
	httpClient = &http.Client{Timeout: 30 * time.Second}
)

// TestSendLarge POSTs num copies of the base64-encoded payload to the
// data-sync-messaging HTTP endpoint, asserting each response reports no
// error. Requires the port-forward noted next to msgURL.
func TestSendLarge(t *testing.T) {
	payload, err := os.ReadFile(bigFile)
	if err != nil {
		t.Skip("no file found for testing")
	}
	data := map[string]string{
		"Type":    topic,
		"Payload": base64.StdEncoding.EncodeToString(payload),
	}
	for i := 0; i < num; i++ {
		data["ID"] = uuid.NewString()
		jsonResp := postMessage(t, data)
		fmt.Println("count", i+1, "response", jsonResp)
		// Checked type assertion: a missing or non-bool IsError field now
		// fails the test instead of panicking (the original did
		// jsonResp["IsError"].(bool) unchecked).
		isErr, ok := jsonResp["IsError"].(bool)
		assert.True(t, ok, "response missing boolean IsError field")
		assert.False(t, isErr)
	}
}

// postMessage marshals data, POSTs it to msgURL as JSON, and returns the
// decoded JSON response. Extracted from the send loop so that the response
// body is closed per request (deferred in a loop would pile up until the
// test returns, leaking 1000 connections).
func postMessage(t *testing.T, data map[string]string) map[string]interface{} {
	t.Helper()
	body, err := json.Marshal(data)
	require.NoError(t, err)
	req, err := http.NewRequest(http.MethodPost, msgURL, bytes.NewReader(body))
	require.NoError(t, err)
	req.Header.Add("Content-Type", "application/json")
	req.Header.Add("Accept", "application/json")
	resp, err := httpClient.Do(req)
	require.NoError(t, err)
	// BUG FIX: the original never closed resp.Body, leaking the connection
	// on every one of the num iterations.
	defer resp.Body.Close()
	respData, err := io.ReadAll(resp.Body)
	assert.NoError(t, err)
	jsonResp := make(map[string]interface{})
	assert.NoError(t, json.Unmarshal(respData, &jsonResp))
	return jsonResp
}

// TestRedPandaBigFileSettings produces num large records straight to the
// broker and consumes them back, verifying header count and payload size.
// Requires the broker port-forward noted on defaultConfig and the cluster
// setting below:
//
//	rpk cluster config set kafka_batch_max_bytes 10485760
//	rpk cluster config get kafka_batch_max_bytes
func TestRedPandaBigFileSettings(t *testing.T) {
	payload, err := os.ReadFile(bigFile)
	if err != nil {
		t.Skip("no file found for testing")
	}
	ctx := context.Background()

	admin, err := kafkaclient.NewAdmin(defaultConfig)
	require.NoError(t, err)
	require.NoError(t, admin.EnsureTopic(ctx, topic))

	producer, err := kafkaclient.NewProducer(defaultConfig)
	require.NoError(t, err)
	consumer, err := kafkaclient.NewConsumer(defaultConfig)
	require.NoError(t, err)
	require.NoError(t, producer.Ping(ctx))
	require.NoError(t, consumer.Ping(ctx))

	require.NoError(t, producer.Produce(ctx, buildRecords(num, payload)...))

	wait, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()
	count := 0
	for {
		// BUG FIX: read with the deadline context (the original passed the
		// unbounded ctx), so a blocked fetch cannot outlive the 30s window.
		records, err := consumer.Read(wait)
		if wait.Err() != nil {
			// Window elapsed: everything produced should have been consumed.
			assert.Equal(t, num, count)
			return
		}
		require.NoError(t, err)
		require.True(t, len(records) > 0)
		for _, record := range records {
			assert.Len(t, record.Headers, 9)
			assert.Equal(t, len(payload), len(record.Value))
			assert.NoError(t, consumer.Ack(ctx, record))
			count++
		}
	}
}

// buildRecords returns num records for topic, each carrying the same
// payload, a fresh UUID id header, and a fixed set of nine metadata
// headers (the consumer side asserts Len(Headers) == 9).
func buildRecords(num int, payload []byte) []*kafkaclient.Record {
	r := make([]*kafkaclient.Record, num)
	for i := 0; i < num; i++ {
		id := uuid.NewString()
		r[i] = &kafkaclient.Record{
			Topic: topic,
			//Partition: Not Set,
			Headers: []kafkaclient.Header{
				{Key: "id", Value: []byte(id)},
				{Key: "type", Value: []byte("system-status")},
				{Key: "organization", Value: []byte("mailmen")},
				{Key: "organization_name", Value: []byte("mailmen")},
				{Key: "organization_id", Value: []byte("9b387a16540e442a8223e739ba83dbbc")},
				{Key: "site_id", Value: []byte("ecc4593801264c50af25d6856459152c")},
				{Key: "site_name", Value: []byte("k3d-jan16-9")},
				{Key: "signature", Value: []byte("")},
				{Key: "created", Value: []byte(time.Now().String())},
			},
			Value: payload,
		}
	}
	return r
}