1 package couchctl
2
3 import (
4 "context"
5 "strings"
6 "testing"
7
8 "github.com/go-logr/logr"
9 "github.com/google/uuid"
10
11 "gotest.tools/v3/assert"
12 "gotest.tools/v3/poll"
13
14 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
15 "edge-infra.dev/pkg/lib/fog"
16
17 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
18 "k8s.io/client-go/util/workqueue"
19
20 "sigs.k8s.io/controller-runtime/pkg/controller/controllertest"
21 )
22
23 var (
24 username = "admin"
25 password = "password"
26 couchURL = "http://localhost:5984"
27 _ Changes = (*ChangesIter)(nil)
28 )
29
30 type ChangesResults struct {
31 Results []ChangesResult `json:"results"`
32 }
33
// ChangesResult is a single change entry.
type ChangesResult struct {
	// Seq is the entry's sequence value (JSON key "seq").
	Seq string `json:"seq"`
	// ID is the identifier of the changed document (JSON key "id").
	ID string `json:"id"`
	// Changes lists the revisions reported for this entry (JSON key "changes").
	Changes []struct {
		Rev string `json:"rev"`
	} `json:"changes"`
	// Doc carries the document body as string key/value pairs (JSON key "doc").
	Doc map[string]string `json:"doc"`
}
42
43 type ChangesIter struct {
44 changes ChangesResults
45 index int
46 err error
47 }
48
49 func NewChangesIter(results ...ChangesResult) *ChangesIter {
50 return &ChangesIter{
51 changes: ChangesResults{
52 Results: results,
53 },
54 index: -1,
55 }
56 }
57
58 func (f *ChangesIter) Next() bool {
59 f.index++
60 if len(f.changes.Results) == 0 ||
61 f.index >= len(f.changes.Results) {
62 return false
63 }
64 return true
65 }
66
67 func (f *ChangesIter) Err() error {
68 return f.err
69 }
70
71 func (f *ChangesIter) ID() string {
72 return f.changes.Results[f.index].ID
73 }
74
75 func (f *ChangesIter) Changes() []string {
76 changes := f.changes.Results[f.index].Changes
77 results := make([]string, len(changes))
78 for i, c := range changes {
79 results[i] = c.Rev
80 }
81 return results
82 }
83
84 func (f *ChangesIter) AsChangesFunc() ChangesFunc {
85 return func(_ context.Context, _, _, _ string) (Changes, error) {
86 return f, nil
87 }
88 }
89
90 func TestReplicationEvent(t *testing.T) {
91 cfg := Config{BannerEdgeID: uuid.NewString()}
92
93 ctx, cancel := context.WithCancel(context.Background())
94 ctx = logr.NewContext(ctx, fog.New())
95 q := &controllertest.Queue{Interface: workqueue.New()}
96
97 re := NewReplicationEvent(&cfg)
98 re.changesFunc = NewChangesIter(mockChangesResults()...).AsChangesFunc()
99
100 err := re.Start(ctx, q)
101 assert.NilError(t, err, "failed to start replication event")
102
103 assert.Equal(t, q.Len(), 0, "queue should be empty before listen is called")
104
105 repl1 := &dsapi.CouchDBReplicationSet{ObjectMeta: metav1.ObjectMeta{Name: "test1", Namespace: "test"}}
106 err = re.Listen(repl1, username, password, couchURL)
107 assert.NilError(t, err, "replication changes failed")
108
109 poll.WaitOn(t, func(_ poll.LogT) poll.Result {
110 re.Lock()
111 defer re.Unlock()
112 l := len(re.cache)
113 if l == 1 {
114 return poll.Success()
115 }
116 return poll.Continue("invalid queue length: %d, wanted 1", l)
117 })
118 assert.Equal(t, re.cache[repl1.Namespace+"/"+repl1.Name], username+":"+password+"@"+couchURL)
119
120 re.changesFunc = NewChangesIter(mockChangesResults()...).AsChangesFunc()
121 repl2 := &dsapi.CouchDBReplicationSet{ObjectMeta: metav1.ObjectMeta{Name: "test2", Namespace: "test"}}
122 err = re.Listen(repl2, username, password, couchURL)
123 assert.NilError(t, err, "failed to listen to replication changes")
124
125 poll.WaitOn(t, func(_ poll.LogT) poll.Result {
126 re.Lock()
127 defer re.Unlock()
128 l := len(re.cache)
129 if l == 2 {
130 return poll.Success()
131 }
132 return poll.Continue("invalid queue length: %d, wanted 2", l)
133 })
134 assert.Equal(t, re.cache[repl2.Namespace+"/"+repl2.Name], username+":"+password+"@"+couchURL)
135
136 poll.WaitOn(t, func(_ poll.LogT) poll.Result {
137 if q.Len() == 2 {
138 return poll.Success()
139 }
140 return poll.Continue("queue length should be 2. length: %d", q.Len())
141 })
142
143 cancel()
144 poll.WaitOn(t, func(_ poll.LogT) poll.Result {
145 re.Lock()
146 defer re.Unlock()
147 l := len(re.cache)
148 if l == 0 {
149 return poll.Success()
150 }
151 return poll.Continue("on cancel, cache should be empty. length: %d", l)
152 })
153 }
154
155 func mockChangesResults() []ChangesResult {
156 var results []ChangesResult
157
158 uid := strings.Replace(uuid.NewString(), "-", "", -1)
159 results = append(results, ChangesResult{
160 Seq: "1",
161 ID: "repl_doc",
162 Changes: []struct {
163 Rev string `json:"rev"`
164 }{
165 {Rev: "1-" + uid},
166 },
167 Doc: map[string]string{"_id": "repl_doc", "_rev": "1-" + uid},
168 })
169
170 uid = strings.Replace(uuid.NewString(), "-", "", -1)
171 results = append(results, ChangesResult{
172 Seq: "2",
173 ID: "repl_doc",
174 Changes: []struct {
175 Rev string `json:"rev"`
176 }{
177 {Rev: "2-" + uid},
178 },
179 Doc: map[string]string{"_id": "repl_doc", "_rev": "2-" + uid},
180 })
181
182 return results
183 }
184
View as plain text