...

Source file src/edge-infra.dev/test/e2e/datasync/cloudtoedge/replication_test.go

Documentation: edge-infra.dev/test/e2e/datasync/cloudtoedge

     1  package cloudtoedge
     2  
     3  import (
     4  	"encoding/base64"
     5  	"fmt"
     6  	"net/http"
     7  	"strings"
     8  	"testing"
     9  
    10  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    11  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    12  	"edge-infra.dev/pkg/k8s/testing/kmp"
    13  	"edge-infra.dev/test/f2"
    14  	"edge-infra.dev/test/f2/x/ktest"
    15  
    16  	"cloud.google.com/go/pubsub"
    17  
    18  	"gotest.tools/v3/assert"
    19  	"gotest.tools/v3/assert/cmp"
    20  	"gotest.tools/v3/poll"
    21  
    22  	corev1 "k8s.io/api/core/v1"
    23  	"sigs.k8s.io/controller-runtime/pkg/client"
    24  )
    25  
    26  func TestSendMessage(t *testing.T) {
    27  	feature := f2.NewFeature("Datasync Message E2C").
    28  		Test("Publish Message To Datasync", func(ctx f2.Context, t *testing.T) f2.Context {
    29  			result := pubSubClient.Topic(pubSubTopic).Publish(ctx, &pubsub.Message{
    30  				Attributes: map[string]string{
    31  					"tenant_id":   bslInfo.OrganizationID,
    32  					"db_name":     dbName,
    33  					"entity_id":   ctx.RunID,
    34  					"entity_type": "json",
    35  				},
    36  				Data: tlog,
    37  			})
    38  
    39  			msgID, err := result.Get(ctx)
    40  			assert.NilError(t, err, "fail to send C2E message to pub/sub")
    41  			t.Log("Sent pub/sub message Id:", msgID)
    42  			return ctx
    43  		}).
    44  		Test("Verify Message In CouchDB Master", func(ctx f2.Context, t *testing.T) f2.Context {
    45  			k := ktest.FromContextT(ctx, t)
    46  			secret := corev1.Secret{}
    47  			key := client.ObjectKey{Namespace: "couchctl", Name: couchdb.StoreReplicationSecretName}
    48  			err := k.Client.Get(ctx, key, &secret)
    49  			assert.NilError(t, err, "fail to get couchdb master secret")
    50  
    51  			req, err := couchDBRequest(secret, string(secret.Data["uri"]), dbName, ctx.RunID)
    52  			assert.NilError(t, err, "fail to create couchdb request")
    53  
    54  			var resp *http.Response
    55  			poll.WaitOn(t, func(_ poll.LogT) poll.Result {
    56  				resp, err = http.DefaultClient.Do(req)
    57  				if err == nil && resp.StatusCode == 200 {
    58  					return poll.Success()
    59  				}
    60  				return poll.Continue("fail to get doc from couchdb master: %s", errMessage(resp, req, err))
    61  			}, poll.WithDelay(k.Tick), poll.WithTimeout(k.Timeout*timeoutMultiplier))
    62  			return ctx
    63  		}).
    64  		Test("Verify CouchDBServer in ready state", func(ctx f2.Context, t *testing.T) f2.Context {
    65  			k := ktest.FromContextT(ctx, t)
    66  			servers := dsapi.CouchDBServerList{}
    67  			err := k.Client.List(ctx, &servers, client.InNamespace(couchNamespace))
    68  			assert.NilError(t, err, "fail to list couchdb servers")
    69  			assert.Check(t, cmp.Len(servers.Items, len(nodes.Items)), "number of couchdb servers not equal to number of nodes")
    70  
    71  			for _, server := range servers.Items {
    72  				server := server
    73  				k.WaitOn(t, k.Check(&server, kmp.IsReady()))
    74  			}
    75  			return ctx
    76  		}).
    77  		Test("Verify CouchDBReplicationSet in ready state", func(ctx f2.Context, t *testing.T) f2.Context {
    78  			k := ktest.FromContextT(ctx, t)
    79  			replications := dsapi.CouchDBReplicationSetList{}
    80  			err := k.Client.List(ctx, &replications, client.InNamespace(couchNamespace))
    81  			assert.NilError(t, err, "fail to list couchdb replications")
    82  			assert.Check(t, cmp.Len(replications.Items, len(nodes.Items)), "number of couchdb replications not equal to number of nodes")
    83  
    84  			for _, replication := range replications.Items {
    85  				repl := replication
    86  				k.WaitOn(t, k.Check(&repl, kmp.IsReady()))
    87  			}
    88  			return ctx
    89  		}).
    90  		Test("Verify Message In CouchDB Store", func(ctx f2.Context, t *testing.T) f2.Context {
    91  			k := ktest.FromContextT(ctx, t)
    92  			secret := corev1.Secret{}
    93  			key := client.ObjectKey{Namespace: couchNamespace, Name: couchdb.StoreSecretName}
    94  
    95  			err := k.Client.Get(ctx, key, &secret)
    96  			assert.NilError(t, err, "fail to get couchdb store secret")
    97  
    98  			servers := dsapi.CouchDBServerList{}
    99  			assert.NilError(t, k.Client.List(ctx, &servers), "fail to list couchdb servers")
   100  
   101  			for _, server := range servers.Items {
   102  				pod := strings.Split(server.Spec.URI, ".")[0]
   103  				portForward := portMapping[pod]
   104  				if portForward == nil {
   105  					t.Errorf("portForward is nil for pod %s", pod)
   106  					continue
   107  				}
   108  				addr := portForward.Retrieve(t)
   109  				req, err := couchDBRequest(secret, fmt.Sprintf("http://%s", addr), dbName, ctx.RunID)
   110  				assert.NilError(t, err, "fail to create couchdb request")
   111  
   112  				var resp *http.Response
   113  				poll.WaitOn(t, func(_ poll.LogT) poll.Result {
   114  					resp, err = http.DefaultClient.Do(req)
   115  					if err == nil && resp.StatusCode == 200 {
   116  						return poll.Success()
   117  					}
   118  					return poll.Continue("fail to get doc from couchdb Store: %s", errMessage(resp, req, err))
   119  				}, poll.WithDelay(k.Tick), poll.WithTimeout(k.Timeout*timeoutMultiplier))
   120  			}
   121  
   122  			return ctx
   123  		}).
   124  		Feature()
   125  	f.Test(t, feature)
   126  }
   127  
   128  func couchDBRequest(s corev1.Secret, url, db, docID string) (*http.Request, error) {
   129  	docURL := fmt.Sprintf("%s/%s/%s", url, db, docID)
   130  
   131  	req, err := http.NewRequest(http.MethodGet, docURL, nil)
   132  	if err != nil {
   133  		return nil, err
   134  	}
   135  
   136  	req.Header.Add("Content-Type", "application/json")
   137  	req.Header.Add("Accept", "application/json")
   138  
   139  	auth := fmt.Sprintf("%s:%s", string(s.Data["username"]), string(s.Data["password"]))
   140  	token := base64.StdEncoding.EncodeToString([]byte(auth))
   141  
   142  	req.Header.Add("Authorization", fmt.Sprintf("Basic %s", token))
   143  	return req, err
   144  }
   145  
   146  func errMessage(resp *http.Response, req *http.Request, err error) string {
   147  	b := strings.Builder{}
   148  
   149  	if err != nil {
   150  		b.WriteString(fmt.Sprintf("error: %s", err))
   151  	}
   152  
   153  	if resp != nil {
   154  		b.WriteString(fmt.Sprintf("status code: %d", resp.StatusCode))
   155  		b.WriteString(fmt.Sprintf("status: %s", resp.Status))
   156  	}
   157  
   158  	if req != nil {
   159  		b.WriteString(fmt.Sprintf("URL: %s", req.URL.String()))
   160  	}
   161  
   162  	return b.String()
   163  }
   164  

View as plain text