1 package couchctl
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "net/url"
9 "sync"
10
11 "k8s.io/apimachinery/pkg/types"
12 "sigs.k8s.io/controller-runtime/pkg/client"
13
14 "edge-infra.dev/pkg/edge/api/testutils"
15 "edge-infra.dev/pkg/edge/api/utils"
16 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
17 "edge-infra.dev/pkg/edge/datasync/couchdb"
18 )
19
20 const (
21 db1 = "db-1"
22 _rev = "1-ebb9921e747510f296532e95a6a74d2e"
23 )
24
25 type MockCouchDBCluster struct {
26 sync.Mutex
27 *utils.MockHTTPTestServer
28 client client.Client
29 config *Config
30 server *dsapi.CouchDBServer
31 ctx context.Context
32 }
33
34
35
36 func NewMockCouchDBCluster(config *Config, client client.Client,
37 server *dsapi.CouchDBServer,
38 database *dsapi.CouchDBDatabase,
39 user *dsapi.CouchDBUser,
40 index *dsapi.CouchDBIndex,
41 ddoc *dsapi.CouchDBDesignDoc,
42 replication *dsapi.CouchDBReplicationSet) (*MockCouchDBCluster, error) {
43 testServer := utils.NewMockHTTPTestServer().AddAllowedContentType("application/json")
44
45 cluster := &MockCouchDBCluster{
46 MockHTTPTestServer: testServer,
47 client: client,
48 server: server,
49 config: config,
50 ctx: context.Background(),
51 }
52
53 cluster.mockCouchDBServer()
54 cluster.mockCouchDBDatabase(database, user)
55 cluster.mockCouchDBIndex(database, index)
56 cluster.mockCouchDBDesignDoc(database, ddoc)
57 cluster.mockCouchDBUser(server, user)
58 cluster.mockCouchDBReplicationSet(server, replication, user)
59 return cluster, cluster.validate()
60 }
61
62
63 func (c *MockCouchDBCluster) validate() error {
64 set := map[string]struct{}{}
65 for _, route := range c.Routes {
66 path := fmt.Sprintf("%s:%s", route.Method, route.Path)
67 if _, ok := set[path]; ok {
68 return fmt.Errorf("duplicated route found: %s", path)
69 }
70 set[path] = struct{}{}
71 }
72 return nil
73 }
74
75
76 func sourceURL(ctx context.Context, cl client.Client, replication *dsapi.CouchDBReplicationSet, dbname string) (string, error) {
77 replCreds, err := replicationSourceCredentials(ctx, cl, replication)
78 if err != nil {
79 return "", err
80 }
81 return buildDSNName(string(replCreds.Username), string(replCreds.Password), string(replCreds.URI), dbname)
82 }
83
84
85 func targetURL(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer, dbname, port string) (string, error) {
86 creds, err := serverAdminCreds(ctx, cl, server)
87 if err != nil {
88 return "", err
89 }
90 uri := fmt.Sprintf("http://localhost:%s", port)
91 return buildDSNName(string(creds.Username), string(creds.Password), uri, dbname)
92 }
93
94 func (c *MockCouchDBCluster) NotFound() {
95 c.MockHTTPTestServer.AddNotFound(func(w http.ResponseWriter, r *http.Request) {
96 c.Lock()
97 defer c.Unlock()
98 fmt.Println("NotFound-------------", r.Method, r.URL, r.Header)
99 w.WriteHeader(404)
100 })
101 }
102 func (c *MockCouchDBCluster) mockCouchDBServer() {
103 c.Get("Check Cluster Setup", "/_cluster_setup", c.clusterSetUp, nil)
104 c.Post("Setup Cluster", "/_cluster_setup", c.clusterSetUp, nil)
105 c.Head("Ping Cluster", "/_up", c.emptyOk, nil)
106 }
107 func (c *MockCouchDBCluster) mockCouchDBDatabase(database *dsapi.CouchDBDatabase, user *dsapi.CouchDBUser) {
108 c.Put("Create DB", "/"+database.Name, c.emptyOk, nil)
109 c.Head("DB Exists", "/"+database.Name, c.emptyOk, nil)
110
111 c.Put("Create DB Auth", fmt.Sprintf("/%s/_design/auth", database.Name), c.emptyOk, nil)
112
113 c.Put("Create Security", fmt.Sprintf("/%s/_security", database.Name), c.emptyOk, nil)
114 c.Get("Security Exists", fmt.Sprintf("/%s/_security", database.Name), func(w http.ResponseWriter, _ *http.Request) {
115 security := couchdbSecurity(database)
116
117
118 security.Members.Names = append(security.Members.Names, user.Spec.User.Name)
119 c.ok(w, security)
120 }, nil)
121
122 c.Get("Security Exists for db", fmt.Sprintf("/%s/_security", db1), func(w http.ResponseWriter, _ *http.Request) {
123 security := couchdbSecurity(database)
124
125
126 security.Members.Names = append(security.Members.Names, user.Spec.User.Name)
127 c.ok(w, security)
128 }, nil)
129 }
130
131 func (c *MockCouchDBCluster) mockCouchDBUser(server *dsapi.CouchDBServer, user *dsapi.CouchDBUser) {
132 couchDBUserPath := "/_users/org.couchdb.user%3A" + user.Spec.User.Name
133 couchServerUserPath := "/_users/org.couchdb.user%3A" + server.Name
134
135 c.Get("Get CouchDB Test User", couchDBUserPath, func(w http.ResponseWriter, _ *http.Request) {
136 doc := c.mockUser(user.Spec.User.Name, user.Spec.User.Roles)
137 c.ok(w, &doc)
138 }, nil)
139
140 c.Get("Get CouchDB Server Test User", couchServerUserPath, func(w http.ResponseWriter, _ *http.Request) {
141 doc := c.mockUser(server.Name, []string{couchdb.ReplicationUser})
142 c.ok(w, &doc)
143 }, nil)
144
145 c.Put("Create User", couchDBUserPath, c.emptyOk, nil)
146 c.Put("Create Store Server User", couchServerUserPath, c.emptyOk, nil)
147 }
148 func (c *MockCouchDBCluster) mockUser(name string, roles []string) map[string]interface{} {
149 return map[string]interface{}{
150 "_id": fmt.Sprintf("_users/org.couchdb.user:%s", name),
151 "name": name,
152 "type": "user",
153 "roles": roles,
154 "password": "this is generated",
155 "_rev": _rev,
156 }
157 }
158
159 func (c *MockCouchDBCluster) mockCouchDBReplicationSet(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, user *dsapi.CouchDBUser) {
160 replDocDB := replSet.Spec.Datasets[0].Name
161 replDocPath := fmt.Sprintf("/%s/repl_doc", replDocDB)
162
163 provider := user.Spec.Provider
164
165 c.Head("Replication DB Exists", fmt.Sprintf("/%s", replDocDB), c.emptyOk, nil)
166 c.Head("Replication DB Exists DB1", fmt.Sprintf("/%s", db1), c.emptyOk, nil)
167
168 c.Head("HEAD: Replication Jobs", "/_scheduler/jobs", c.emptyOk, nil)
169
170 c.Get("Get Replication Doc", replDocPath, func(w http.ResponseWriter, _ *http.Request) {
171 doc := map[string]interface{}{
172 "_id": fmt.Sprintf("_users/org.couchdb.user:%s", user.Spec.User.Name),
173 "_rev": _rev,
174 "datasets": datasets(db1, provider.Name),
175 "providers": []dsapi.Provider{*provider},
176 }
177 c.ok(w, &doc)
178 }, nil)
179 c.Post("Create Replication", "/_replicator", c.emptyOk, nil)
180 c.Post("Get DBs Info", "/_dbs_info", func(w http.ResponseWriter, _ *http.Request) {
181 docs := []map[string]interface{}{
182 {
183 "key": replDocDB,
184 "info": map[string]interface{}{
185 "docs_read": 187,
186 "docs_written": 187,
187 "doc_write_failures": 0,
188 "changes_pending": 0,
189 },
190 "doc_count": 1,
191 },
192 }
193 c.ok(w, &docs)
194 }, nil)
195
196 c.Get("Get Replication Docs", "/_replicator/_all_docs?include_docs=true", func(w http.ResponseWriter, _ *http.Request) {
197 doc := map[string]interface{}{
198 "total_rows": 1,
199 "offset": 0,
200 "rows": []map[string]interface{}{
201 {
202 "id": replDocDB,
203 "key": replDocDB,
204 "value": map[string]string{
205 "rev": _rev,
206 },
207 "doc": c.replConfig(server, replSet, replDocDB),
208 },
209 {
210 "id": db1,
211 "key": db1,
212 "value": map[string]string{
213 "rev": _rev,
214 },
215 "doc": c.replConfig(server, replSet, db1),
216 },
217 },
218 }
219 c.ok(w, &doc)
220 }, nil)
221
222 c.Get("Get Scheduler Docs", "/_scheduler/docs", func(w http.ResponseWriter, _ *http.Request) {
223 doc := map[string]interface{}{
224 "total_rows": 1,
225 "offset": 0,
226 "docs": []map[string]interface{}{
227 c.schedulerDoc(server, replSet, replDocDB),
228 c.schedulerDoc(server, replSet, db1),
229 },
230 }
231 c.ok(w, &doc)
232 }, nil)
233
234 c.Get("Get Replication Config", fmt.Sprintf("/_replicator/%s", replDocDB), func(w http.ResponseWriter, _ *http.Request) {
235 cfg := c.replConfig(server, replSet, replDocDB)
236 c.ok(w, &cfg)
237 }, nil)
238
239 c.Get("Get Replication Config DB1", fmt.Sprintf("/_replicator/%s", db1), func(w http.ResponseWriter, _ *http.Request) {
240 cfg := c.replConfig(server, replSet, db1)
241 c.ok(w, &cfg)
242 }, nil)
243
244 c.Head("Head: Make ReadOnly", fmt.Sprintf("/%s/_design/auth", replDocDB), func(w http.ResponseWriter, r *http.Request) {
245 c.Lock()
246 w.Header().Set("ETag", _rev)
247 c.Unlock()
248 c.emptyOk(w, r)
249 }, nil)
250 c.Head("Head: Make ReadOnly DB1", fmt.Sprintf("/%s/_design/auth", db1), func(w http.ResponseWriter, r *http.Request) {
251 c.Lock()
252 w.Header().Set("ETag", _rev)
253 c.Unlock()
254 c.emptyOk(w, r)
255 }, nil)
256
257 c.Put("Make ReadOnly", fmt.Sprintf("/%s/_design/auth", replDocDB), c.emptyOk, nil)
258 c.Put("Make ReadOnly db1", fmt.Sprintf("/%s/_design/auth", db1), c.emptyOk, nil)
259 }
260
261 func (c *MockCouchDBCluster) schedulerDoc(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, db string) map[string]interface{} {
262 sURL, _ := sourceURL(c.ctx, c.client, replSet, db)
263 tURL, _ := targetURL(c.ctx, c.client, server, db, c.config.CouchDBPort)
264 return map[string]interface{}{
265 "database": "_replicator",
266 "id": db,
267 "doc_id": db,
268 "source": sURL + "/",
269 "target": tURL + "/",
270 "start_time": "2023-06-29T10:49:51Z",
271 "last_updated": "2023-06-30T03:09:20Z",
272 "state": "running",
273 "info": map[string]interface{}{
274 "docs_read": 187,
275 "docs_written": 187,
276 "doc_write_failures": 0,
277 "changes_pending": 0,
278 },
279 }
280 }
281 func (c *MockCouchDBCluster) replConfig(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, db string) map[string]interface{} {
282 sURL, _ := sourceURL(c.ctx, c.client, replSet, db)
283 tURL, _ := targetURL(c.ctx, c.client, server, db, c.config.CouchDBPort)
284 return map[string]interface{}{
285 "_id": db,
286 "_rev": _rev,
287 "continuous": true,
288 "create_target": true,
289 "source": sURL,
290 "target": tURL,
291 }
292 }
293
294 func datasets(dbName, provider string) []dsapi.Dataset {
295 return []dsapi.Dataset{
296 {
297 Name: dbName,
298 Provider: &dsapi.Provider{
299 Name: provider,
300 },
301 Config: dsapi.ReplConfig{
302 Interval: fmt.Sprintf("%d", couchdb.ReplicationInterval.Milliseconds()),
303 Continuous: true,
304 CreateTarget: true,
305 },
306 },
307 }
308 }
309
310 func (c *MockCouchDBCluster) Port() string {
311 serverURL, _ := url.Parse(c.Server.URL)
312 return serverURL.Port()
313 }
314
315 func (c *MockCouchDBCluster) clusterSetUp(w http.ResponseWriter, _ *http.Request) {
316 state := "single_node_enabled"
317 if c.server.IsCloud() {
318 state = "cluster_finished"
319 }
320 resp := &ServerSetupResponse{
321 State: state,
322 Error: "",
323 Reason: "",
324 }
325 c.ok(w, resp)
326 }
327
328 func (c *MockCouchDBCluster) ok(w http.ResponseWriter, data interface{}) {
329 c.Lock()
330 defer c.Unlock()
331 res, err := json.Marshal(data)
332 if err != nil {
333 testutils.WriteHTTPBadResponse(w)
334 return
335 }
336 w.Header().Set("Content-Type", "application/json")
337 _, _ = w.Write(res)
338 }
339
340 func (c *MockCouchDBCluster) emptyOk(w http.ResponseWriter, _ *http.Request) {
341 c.ok(w, struct{}{})
342 }
343
344 func (c *MockCouchDBCluster) mockCouchDBIndex(database *dsapi.CouchDBDatabase, index *dsapi.CouchDBIndex) {
345 c.Post("Create Index", fmt.Sprintf("/%s/_index", database.Name), c.emptyOk, nil)
346 c.Get("Get Index", fmt.Sprintf("/%s/_index", database.Name), func(w http.ResponseWriter, _ *http.Request) {
347 _index := map[string]interface{}{
348 "total_rows": 1,
349 "indexes": []map[string]interface{}{
350 {
351 "ddoc": "_design/" + index.Spec.DDoc,
352 "name": index.IndexName(),
353 "type": "json",
354 "def": map[string]interface{}{
355 "fields": []map[string]interface{}{
356 {index.Spec.Index.Fields[0]: "asc"},
357 },
358 "partial_filter_selector": index.Spec.Index.PartialFilterSelector,
359 },
360 },
361 },
362 }
363 c.ok(w, &_index)
364 }, nil)
365 }
366
367 func (c *MockCouchDBCluster) mockCouchDBDesignDoc(database *dsapi.CouchDBDatabase, ddoc *dsapi.CouchDBDesignDoc) {
368 c.Put("Create Design Doc", fmt.Sprintf("/%s/_design/%s", database.Name, ddoc.Spec.ID), c.emptyOk, nil)
369 c.Get("Get Design Doc", fmt.Sprintf("/%s/_design/%s", database.Name, ddoc.Spec.ID), func(w http.ResponseWriter, _ *http.Request) {
370 doc := map[string]interface{}{
371 "_id": "_design/" + ddoc.Spec.ID,
372 "_rev": _rev,
373 "views": map[string]map[string]string{
374 "test": {
375 "map": "function(doc) { if (doc.type === 'test') { emit(doc._id, doc); } }",
376 },
377 },
378 }
379 c.ok(w, &doc)
380 }, nil)
381 }
382
383 func serverAdminCreds(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) (*couchdb.AdminCredentials, error) {
384 ref := server.AdminCredentials()
385 creds := &couchdb.AdminCredentials{}
386 _, err := creds.FromSecret(ctx, cl, types.NamespacedName{
387 Name: ref.Name,
388 Namespace: ref.Namespace,
389 })
390 if err != nil {
391 return nil, err
392 }
393 return creds, nil
394 }
395
396 func replicationSourceCredentials(ctx context.Context, cl client.Client, replication *dsapi.CouchDBReplicationSet) (*couchdb.ReplicationCredentials, error) {
397 sourceCreds := &couchdb.ReplicationCredentials{}
398 nn := types.NamespacedName{
399 Name: replication.Spec.Source.Name,
400 Namespace: replication.Spec.Source.Namespace,
401 }
402 _, err := sourceCreds.FromSecret(ctx, cl, nn)
403 return sourceCreds, err
404 }
405
View as plain text