...

Source file src/edge-infra.dev/pkg/edge/datasync/chirp/chirp_test.go

Documentation: edge-infra.dev/pkg/edge/datasync/chirp

     1  package chirp
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/base64"
     7  	"encoding/json"
     8  	"fmt"
     9  	"io"
    10  	"net/http"
    11  	"os"
    12  	"testing"
    13  	"time"
    14  
    15  	"edge-infra.dev/pkg/edge/datasync/kafkaclient"
    16  
    17  	"github.com/google/uuid"
    18  	"github.com/stretchr/testify/assert"
    19  	"github.com/stretchr/testify/require"
    20  )
    21  
    22  const (
    23  	bigFile = ""
    24  	num     = 1000
    25  	// k port-forward -n data-sync-messaging data-sync-messaging-0 3000:80
    26  	msgURL = "http://localhost:3000/send-message"
    27  	topic  = "chat-room"
    28  )
    29  
    30  var (
    31  	defaultConfig = &kafkaclient.Config{
    32  		// k port-forward -n redpanda redpanda-0 9092:9092
    33  		// telepresence intercept --namespace redpanda redpanda --local-only
    34  		Brokers: []string{"localhost:9092"},
    35  
    36  		Topic: topic,
    37  
    38  		MessageMaxBytes: 10485760,
    39  
    40  		GroupID:            "data-sync___",
    41  		BulkSize:           100,
    42  		ReadTimeout:        5 * time.Second,
    43  		ReadWindowTimeout:  10 * time.Second,
    44  		ReturnOnEmptyFetch: true,
    45  	}
    46  )
    47  
    48  func TestSendLarge(t *testing.T) {
    49  	payload, err := os.ReadFile(bigFile)
    50  	if err != nil {
    51  		t.Skip("no file found for testing")
    52  	}
    53  
    54  	data := map[string]string{
    55  		"Type":    topic,
    56  		"Payload": base64.StdEncoding.EncodeToString(payload),
    57  	}
    58  
    59  	for i := 0; i < num; i++ {
    60  		data["ID"] = uuid.NewString()
    61  		body, err := json.Marshal(data)
    62  		assert.NoError(t, err)
    63  
    64  		req, err := http.NewRequest(http.MethodPost, msgURL, bytes.NewReader(body))
    65  		assert.NoError(t, err)
    66  		req.Header.Add("Content-Type", "application/json")
    67  		req.Header.Add("Accept", "application/json")
    68  		resp, err := http.DefaultClient.Do(req)
    69  		assert.NoError(t, err)
    70  
    71  		jsonResp := make(map[string]interface{})
    72  		respData, err := io.ReadAll(resp.Body)
    73  		assert.NoError(t, err)
    74  		err = json.Unmarshal(respData, &jsonResp)
    75  		assert.NoError(t, err)
    76  		fmt.Println("count", i+1, "response", jsonResp)
    77  		assert.False(t, jsonResp["IsError"].(bool))
    78  	}
    79  }
    80  
    81  func TestRedPandaBigFileSettings(t *testing.T) {
    82  	payload, err := os.ReadFile(bigFile)
    83  	if err != nil {
    84  		t.Skip("no file found for testing")
    85  	}
    86  
    87  	// rpk cluster config set kafka_batch_max_bytes 10485760
    88  	// rpk cluster config get kafka_batch_max_bytes 10485760
    89  	ctx := context.Background()
    90  
    91  	admin, err := kafkaclient.NewAdmin(defaultConfig)
    92  	require.NoError(t, err)
    93  
    94  	err = admin.EnsureTopic(ctx, topic)
    95  	require.NoError(t, err)
    96  
    97  	producer, err := kafkaclient.NewProducer(defaultConfig)
    98  	require.NoError(t, err)
    99  
   100  	consumer, err := kafkaclient.NewConsumer(defaultConfig)
   101  	require.NoError(t, err)
   102  
   103  	err = producer.Ping(ctx)
   104  	require.NoError(t, err)
   105  
   106  	err = consumer.Ping(ctx)
   107  	require.NoError(t, err)
   108  
   109  	err = producer.Produce(ctx, buildRecords(num, payload)...)
   110  	require.NoError(t, err)
   111  
   112  	wait, cancel := context.WithTimeout(ctx, 30*time.Second)
   113  	defer cancel()
   114  	count := 0
   115  	for {
   116  		records, err := consumer.Read(ctx)
   117  		require.NoError(t, err)
   118  		select {
   119  		case <-wait.Done():
   120  			assert.Equal(t, num, count)
   121  			return
   122  		default:
   123  			require.True(t, len(records) > 0)
   124  			for _, record := range records {
   125  				assert.Len(t, record.Headers, 9)
   126  				assert.Equal(t, len(payload), len(record.Value))
   127  				err = consumer.Ack(ctx, record)
   128  				assert.NoError(t, err)
   129  				count++
   130  			}
   131  		}
   132  	}
   133  }
   134  
   135  func buildRecords(num int, payload []byte) []*kafkaclient.Record {
   136  	r := make([]*kafkaclient.Record, num)
   137  	for i := 0; i < num; i++ {
   138  		id := uuid.NewString()
   139  		r[i] = &kafkaclient.Record{
   140  			Topic: topic,
   141  			//Partition: Not Set,
   142  			Headers: []kafkaclient.Header{
   143  				{Key: "id", Value: []byte(id)},
   144  				{Key: "type", Value: []byte("system-status")},
   145  				{Key: "organization", Value: []byte("mailmen")},
   146  				{Key: "organization_name", Value: []byte("mailmen")},
   147  				{Key: "organization_id", Value: []byte("9b387a16540e442a8223e739ba83dbbc")},
   148  				{Key: "site_id", Value: []byte("ecc4593801264c50af25d6856459152c")},
   149  				{Key: "site_name", Value: []byte("k3d-jan16-9")},
   150  				{Key: "signature", Value: []byte("")},
   151  				{Key: "created", Value: []byte(time.Now().String())},
   152  			},
   153  			Value: payload,
   154  		}
   155  	}
   156  	return r
   157  }
   158  

View as plain text