1 package cloudtoedge
2
3 import (
4 "encoding/base64"
5 "fmt"
6 "net/http"
7 "strings"
8 "testing"
9
10 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
11 "edge-infra.dev/pkg/edge/datasync/couchdb"
12 "edge-infra.dev/pkg/k8s/testing/kmp"
13 "edge-infra.dev/test/f2"
14 "edge-infra.dev/test/f2/x/ktest"
15
16 "cloud.google.com/go/pubsub"
17
18 "gotest.tools/v3/assert"
19 "gotest.tools/v3/assert/cmp"
20 "gotest.tools/v3/poll"
21
22 corev1 "k8s.io/api/core/v1"
23 "sigs.k8s.io/controller-runtime/pkg/client"
24 )
25
26 func TestSendMessage(t *testing.T) {
27 feature := f2.NewFeature("Datasync Message E2C").
28 Test("Publish Message To Datasync", func(ctx f2.Context, t *testing.T) f2.Context {
29 result := pubSubClient.Topic(pubSubTopic).Publish(ctx, &pubsub.Message{
30 Attributes: map[string]string{
31 "tenant_id": bslInfo.OrganizationID,
32 "db_name": dbName,
33 "entity_id": ctx.RunID,
34 "entity_type": "json",
35 },
36 Data: tlog,
37 })
38
39 msgID, err := result.Get(ctx)
40 assert.NilError(t, err, "fail to send C2E message to pub/sub")
41 t.Log("Sent pub/sub message Id:", msgID)
42 return ctx
43 }).
44 Test("Verify Message In CouchDB Master", func(ctx f2.Context, t *testing.T) f2.Context {
45 k := ktest.FromContextT(ctx, t)
46 secret := corev1.Secret{}
47 key := client.ObjectKey{Namespace: "couchctl", Name: couchdb.StoreReplicationSecretName}
48 err := k.Client.Get(ctx, key, &secret)
49 assert.NilError(t, err, "fail to get couchdb master secret")
50
51 req, err := couchDBRequest(secret, string(secret.Data["uri"]), dbName, ctx.RunID)
52 assert.NilError(t, err, "fail to create couchdb request")
53
54 var resp *http.Response
55 poll.WaitOn(t, func(_ poll.LogT) poll.Result {
56 resp, err = http.DefaultClient.Do(req)
57 if err == nil && resp.StatusCode == 200 {
58 return poll.Success()
59 }
60 return poll.Continue("fail to get doc from couchdb master: %s", errMessage(resp, req, err))
61 }, poll.WithDelay(k.Tick), poll.WithTimeout(k.Timeout*timeoutMultiplier))
62 return ctx
63 }).
64 Test("Verify CouchDBServer in ready state", func(ctx f2.Context, t *testing.T) f2.Context {
65 k := ktest.FromContextT(ctx, t)
66 servers := dsapi.CouchDBServerList{}
67 err := k.Client.List(ctx, &servers, client.InNamespace(couchNamespace))
68 assert.NilError(t, err, "fail to list couchdb servers")
69 assert.Check(t, cmp.Len(servers.Items, len(nodes.Items)), "number of couchdb servers not equal to number of nodes")
70
71 for _, server := range servers.Items {
72 server := server
73 k.WaitOn(t, k.Check(&server, kmp.IsReady()))
74 }
75 return ctx
76 }).
77 Test("Verify CouchDBReplicationSet in ready state", func(ctx f2.Context, t *testing.T) f2.Context {
78 k := ktest.FromContextT(ctx, t)
79 replications := dsapi.CouchDBReplicationSetList{}
80 err := k.Client.List(ctx, &replications, client.InNamespace(couchNamespace))
81 assert.NilError(t, err, "fail to list couchdb replications")
82 assert.Check(t, cmp.Len(replications.Items, len(nodes.Items)), "number of couchdb replications not equal to number of nodes")
83
84 for _, replication := range replications.Items {
85 repl := replication
86 k.WaitOn(t, k.Check(&repl, kmp.IsReady()))
87 }
88 return ctx
89 }).
90 Test("Verify Message In CouchDB Store", func(ctx f2.Context, t *testing.T) f2.Context {
91 k := ktest.FromContextT(ctx, t)
92 secret := corev1.Secret{}
93 key := client.ObjectKey{Namespace: couchNamespace, Name: couchdb.StoreSecretName}
94
95 err := k.Client.Get(ctx, key, &secret)
96 assert.NilError(t, err, "fail to get couchdb store secret")
97
98 servers := dsapi.CouchDBServerList{}
99 assert.NilError(t, k.Client.List(ctx, &servers), "fail to list couchdb servers")
100
101 for _, server := range servers.Items {
102 pod := strings.Split(server.Spec.URI, ".")[0]
103 portForward := portMapping[pod]
104 if portForward == nil {
105 t.Errorf("portForward is nil for pod %s", pod)
106 continue
107 }
108 addr := portForward.Retrieve(t)
109 req, err := couchDBRequest(secret, fmt.Sprintf("http://%s", addr), dbName, ctx.RunID)
110 assert.NilError(t, err, "fail to create couchdb request")
111
112 var resp *http.Response
113 poll.WaitOn(t, func(_ poll.LogT) poll.Result {
114 resp, err = http.DefaultClient.Do(req)
115 if err == nil && resp.StatusCode == 200 {
116 return poll.Success()
117 }
118 return poll.Continue("fail to get doc from couchdb Store: %s", errMessage(resp, req, err))
119 }, poll.WithDelay(k.Tick), poll.WithTimeout(k.Timeout*timeoutMultiplier))
120 }
121
122 return ctx
123 }).
124 Feature()
125 f.Test(t, feature)
126 }
127
128 func couchDBRequest(s corev1.Secret, url, db, docID string) (*http.Request, error) {
129 docURL := fmt.Sprintf("%s/%s/%s", url, db, docID)
130
131 req, err := http.NewRequest(http.MethodGet, docURL, nil)
132 if err != nil {
133 return nil, err
134 }
135
136 req.Header.Add("Content-Type", "application/json")
137 req.Header.Add("Accept", "application/json")
138
139 auth := fmt.Sprintf("%s:%s", string(s.Data["username"]), string(s.Data["password"]))
140 token := base64.StdEncoding.EncodeToString([]byte(auth))
141
142 req.Header.Add("Authorization", fmt.Sprintf("Basic %s", token))
143 return req, err
144 }
145
// errMessage summarises the outcome of an HTTP attempt for use in poll
// failure messages. Each argument is optional; only the parts that are
// available are included, in this order and separated by ", ":
//
//   - "error: <err>" when err is non-nil
//   - "status code: <code>" and "status: <status>" when resp is non-nil
//   - "URL: <url>" when req is non-nil and has a URL
//
// An empty string is returned when nothing is available.
func errMessage(resp *http.Response, req *http.Request, err error) string {
	parts := make([]string, 0, 4)

	if err != nil {
		parts = append(parts, fmt.Sprintf("error: %s", err))
	}

	if resp != nil {
		parts = append(parts,
			fmt.Sprintf("status code: %d", resp.StatusCode),
			fmt.Sprintf("status: %s", resp.Status),
		)
	}

	// A hand-built request may have no URL; calling String on it would panic.
	if req != nil && req.URL != nil {
		parts = append(parts, fmt.Sprintf("URL: %s", req.URL.String()))
	}

	// Joining with a separator keeps the fragments from running together.
	return strings.Join(parts, ", ")
}
164
View as plain text