package couchctl import ( "context" "encoding/json" "fmt" "net/http" "net/url" "sync" "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/controller-runtime/pkg/client" "edge-infra.dev/pkg/edge/api/testutils" "edge-infra.dev/pkg/edge/api/utils" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" ) const ( db1 = "db-1" _rev = "1-ebb9921e747510f296532e95a6a74d2e" ) type MockCouchDBCluster struct { sync.Mutex *utils.MockHTTPTestServer client client.Client config *Config server *dsapi.CouchDBServer ctx context.Context } // NewMockCouchDBCluster allows us to run all integration tests without connecting to couchdb. // Note: this does not replace integration test for couchdb func NewMockCouchDBCluster(config *Config, client client.Client, server *dsapi.CouchDBServer, database *dsapi.CouchDBDatabase, user *dsapi.CouchDBUser, index *dsapi.CouchDBIndex, ddoc *dsapi.CouchDBDesignDoc, replication *dsapi.CouchDBReplicationSet) (*MockCouchDBCluster, error) { testServer := utils.NewMockHTTPTestServer().AddAllowedContentType("application/json") cluster := &MockCouchDBCluster{ MockHTTPTestServer: testServer, client: client, server: server, config: config, ctx: context.Background(), } cluster.mockCouchDBServer() cluster.mockCouchDBDatabase(database, user) cluster.mockCouchDBIndex(database, index) cluster.mockCouchDBDesignDoc(database, ddoc) cluster.mockCouchDBUser(server, user) cluster.mockCouchDBReplicationSet(server, replication, user) return cluster, cluster.validate() } // validate checks for duplicate urls which are almost impossible to spot at compile time. func (c *MockCouchDBCluster) validate() error { set := map[string]struct{}{} for _, route := range c.Routes { path := fmt.Sprintf("%s:%s", route.Method, route.Path) if _, ok := set[path]; ok { return fmt.Errorf("duplicated route found: %s", path) } set[path] = struct{}{} } return nil } // sourceURL replication source url: cloud or store server url from source replication secret func sourceURL(ctx context.Context, cl client.Client, replication *dsapi.CouchDBReplicationSet, dbname string) (string, error) { replCreds, err := replicationSourceCredentials(ctx, cl, replication) if err != nil { return "", err } return buildDSNName(string(replCreds.Username), string(replCreds.Password), string(replCreds.URI), dbname) } // targetURL replication target url: store server url and from admin credentials func targetURL(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer, dbname, port string) (string, error) { creds, err := serverAdminCreds(ctx, cl, server) if err != nil { return "", err } uri := fmt.Sprintf("http://localhost:%s", port) return buildDSNName(string(creds.Username), string(creds.Password), uri, dbname) } func (c *MockCouchDBCluster) NotFound() { c.MockHTTPTestServer.AddNotFound(func(w http.ResponseWriter, r *http.Request) { c.Lock() defer c.Unlock() fmt.Println("NotFound-------------", r.Method, r.URL, r.Header) w.WriteHeader(404) }) } func (c *MockCouchDBCluster) mockCouchDBServer() { c.Get("Check Cluster Setup", "/_cluster_setup", c.clusterSetUp, nil) c.Post("Setup Cluster", "/_cluster_setup", c.clusterSetUp, nil) c.Head("Ping Cluster", "/_up", c.emptyOk, nil) } func (c *MockCouchDBCluster) mockCouchDBDatabase(database *dsapi.CouchDBDatabase, user *dsapi.CouchDBUser) { c.Put("Create DB", "/"+database.Name, c.emptyOk, nil) c.Head("DB Exists", "/"+database.Name, c.emptyOk, nil) c.Put("Create DB Auth", fmt.Sprintf("/%s/_design/auth", database.Name), c.emptyOk, nil) c.Put("Create Security", fmt.Sprintf("/%s/_security", database.Name), c.emptyOk, nil) c.Get("Security Exists", fmt.Sprintf("/%s/_security", database.Name), func(w http.ResponseWriter, _ *http.Request) { security := couchdbSecurity(database) // Note: for users with special roles like `read-only-user`, we add the user's name to the database security security.Members.Names = append(security.Members.Names, user.Spec.User.Name) c.ok(w, security) }, nil) c.Get("Security Exists for db", fmt.Sprintf("/%s/_security", db1), func(w http.ResponseWriter, _ *http.Request) { security := couchdbSecurity(database) // Note: for users with special roles like `read-only-user`, we add the user's name to the database security security.Members.Names = append(security.Members.Names, user.Spec.User.Name) c.ok(w, security) }, nil) } func (c *MockCouchDBCluster) mockCouchDBUser(server *dsapi.CouchDBServer, user *dsapi.CouchDBUser) { couchDBUserPath := "/_users/org.couchdb.user%3A" + user.Spec.User.Name couchServerUserPath := "/_users/org.couchdb.user%3A" + server.Name c.Get("Get CouchDB Test User", couchDBUserPath, func(w http.ResponseWriter, _ *http.Request) { doc := c.mockUser(user.Spec.User.Name, user.Spec.User.Roles) c.ok(w, &doc) }, nil) c.Get("Get CouchDB Server Test User", couchServerUserPath, func(w http.ResponseWriter, _ *http.Request) { doc := c.mockUser(server.Name, []string{couchdb.ReplicationUser}) c.ok(w, &doc) }, nil) c.Put("Create User", couchDBUserPath, c.emptyOk, nil) c.Put("Create Store Server User", couchServerUserPath, c.emptyOk, nil) } func (c *MockCouchDBCluster) mockUser(name string, roles []string) map[string]interface{} { return map[string]interface{}{ "_id": fmt.Sprintf("_users/org.couchdb.user:%s", name), "name": name, "type": "user", "roles": roles, "password": "this is generated", "_rev": _rev, } } func (c *MockCouchDBCluster) mockCouchDBReplicationSet(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, user *dsapi.CouchDBUser) { replDocDB := replSet.Spec.Datasets[0].Name replDocPath := fmt.Sprintf("/%s/repl_doc", replDocDB) provider := user.Spec.Provider c.Head("Replication DB Exists", fmt.Sprintf("/%s", replDocDB), c.emptyOk, nil) c.Head("Replication DB Exists DB1", fmt.Sprintf("/%s", db1), c.emptyOk, nil) c.Head("HEAD: Replication Jobs", "/_scheduler/jobs", c.emptyOk, nil) c.Get("Get Replication Doc", replDocPath, func(w http.ResponseWriter, _ *http.Request) { doc := map[string]interface{}{ "_id": fmt.Sprintf("_users/org.couchdb.user:%s", user.Spec.User.Name), "_rev": _rev, "datasets": datasets(db1, provider.Name), "providers": []dsapi.Provider{*provider}, } c.ok(w, &doc) }, nil) c.Post("Create Replication", "/_replicator", c.emptyOk, nil) c.Post("Get DBs Info", "/_dbs_info", func(w http.ResponseWriter, _ *http.Request) { docs := []map[string]interface{}{ { "key": replDocDB, "info": map[string]interface{}{ "docs_read": 187, "docs_written": 187, "doc_write_failures": 0, "changes_pending": 0, }, "doc_count": 1, }, } c.ok(w, &docs) }, nil) c.Get("Get Replication Docs", "/_replicator/_all_docs?include_docs=true", func(w http.ResponseWriter, _ *http.Request) { doc := map[string]interface{}{ "total_rows": 1, "offset": 0, "rows": []map[string]interface{}{ { "id": replDocDB, "key": replDocDB, "value": map[string]string{ "rev": _rev, }, "doc": c.replConfig(server, replSet, replDocDB), }, { "id": db1, "key": db1, "value": map[string]string{ "rev": _rev, }, "doc": c.replConfig(server, replSet, db1), }, }, } c.ok(w, &doc) }, nil) c.Get("Get Scheduler Docs", "/_scheduler/docs", func(w http.ResponseWriter, _ *http.Request) { doc := map[string]interface{}{ "total_rows": 1, "offset": 0, "docs": []map[string]interface{}{ c.schedulerDoc(server, replSet, replDocDB), c.schedulerDoc(server, replSet, db1), }, } c.ok(w, &doc) }, nil) c.Get("Get Replication Config", fmt.Sprintf("/_replicator/%s", replDocDB), func(w http.ResponseWriter, _ *http.Request) { cfg := c.replConfig(server, replSet, replDocDB) c.ok(w, &cfg) }, nil) c.Get("Get Replication Config DB1", fmt.Sprintf("/_replicator/%s", db1), func(w http.ResponseWriter, _ *http.Request) { cfg := c.replConfig(server, replSet, db1) c.ok(w, &cfg) }, nil) c.Head("Head: Make ReadOnly", fmt.Sprintf("/%s/_design/auth", replDocDB), func(w http.ResponseWriter, r *http.Request) { c.Lock() w.Header().Set("ETag", _rev) c.Unlock() c.emptyOk(w, r) }, nil) c.Head("Head: Make ReadOnly DB1", fmt.Sprintf("/%s/_design/auth", db1), func(w http.ResponseWriter, r *http.Request) { c.Lock() w.Header().Set("ETag", _rev) c.Unlock() c.emptyOk(w, r) }, nil) c.Put("Make ReadOnly", fmt.Sprintf("/%s/_design/auth", replDocDB), c.emptyOk, nil) c.Put("Make ReadOnly db1", fmt.Sprintf("/%s/_design/auth", db1), c.emptyOk, nil) } func (c *MockCouchDBCluster) schedulerDoc(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, db string) map[string]interface{} { sURL, _ := sourceURL(c.ctx, c.client, replSet, db) tURL, _ := targetURL(c.ctx, c.client, server, db, c.config.CouchDBPort) return map[string]interface{}{ "database": "_replicator", "id": db, "doc_id": db, "source": sURL + "/", "target": tURL + "/", "start_time": "2023-06-29T10:49:51Z", "last_updated": "2023-06-30T03:09:20Z", "state": "running", "info": map[string]interface{}{ "docs_read": 187, "docs_written": 187, "doc_write_failures": 0, "changes_pending": 0, }, } } func (c *MockCouchDBCluster) replConfig(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, db string) map[string]interface{} { sURL, _ := sourceURL(c.ctx, c.client, replSet, db) // TODO tURL, _ := targetURL(c.ctx, c.client, server, db, c.config.CouchDBPort) return map[string]interface{}{ "_id": db, "_rev": _rev, "continuous": true, "create_target": true, "source": sURL, "target": tURL, } } func datasets(dbName, provider string) []dsapi.Dataset { return []dsapi.Dataset{ { Name: dbName, Provider: &dsapi.Provider{ Name: provider, }, Config: dsapi.ReplConfig{ Interval: fmt.Sprintf("%d", couchdb.ReplicationInterval.Milliseconds()), Continuous: true, CreateTarget: true, }, }, } } func (c *MockCouchDBCluster) Port() string { serverURL, _ := url.Parse(c.Server.URL) return serverURL.Port() } func (c *MockCouchDBCluster) clusterSetUp(w http.ResponseWriter, _ *http.Request) { state := "single_node_enabled" if c.server.IsCloud() { state = "cluster_finished" } resp := &ServerSetupResponse{ State: state, Error: "", Reason: "", } c.ok(w, resp) } func (c *MockCouchDBCluster) ok(w http.ResponseWriter, data interface{}) { c.Lock() defer c.Unlock() res, err := json.Marshal(data) if err != nil { testutils.WriteHTTPBadResponse(w) return } w.Header().Set("Content-Type", "application/json") _, _ = w.Write(res) } func (c *MockCouchDBCluster) emptyOk(w http.ResponseWriter, _ *http.Request) { //nolint:unused c.ok(w, struct{}{}) } func (c *MockCouchDBCluster) mockCouchDBIndex(database *dsapi.CouchDBDatabase, index *dsapi.CouchDBIndex) { c.Post("Create Index", fmt.Sprintf("/%s/_index", database.Name), c.emptyOk, nil) c.Get("Get Index", fmt.Sprintf("/%s/_index", database.Name), func(w http.ResponseWriter, _ *http.Request) { _index := map[string]interface{}{ "total_rows": 1, "indexes": []map[string]interface{}{ { "ddoc": "_design/" + index.Spec.DDoc, "name": index.IndexName(), "type": "json", "def": map[string]interface{}{ "fields": []map[string]interface{}{ {index.Spec.Index.Fields[0]: "asc"}, }, "partial_filter_selector": index.Spec.Index.PartialFilterSelector, }, }, }, } c.ok(w, &_index) }, nil) } func (c *MockCouchDBCluster) mockCouchDBDesignDoc(database *dsapi.CouchDBDatabase, ddoc *dsapi.CouchDBDesignDoc) { c.Put("Create Design Doc", fmt.Sprintf("/%s/_design/%s", database.Name, ddoc.Spec.ID), c.emptyOk, nil) c.Get("Get Design Doc", fmt.Sprintf("/%s/_design/%s", database.Name, ddoc.Spec.ID), func(w http.ResponseWriter, _ *http.Request) { doc := map[string]interface{}{ "_id": "_design/" + ddoc.Spec.ID, "_rev": _rev, "views": map[string]map[string]string{ "test": { "map": "function(doc) { if (doc.type === 'test') { emit(doc._id, doc); } }", }, }, } c.ok(w, &doc) }, nil) } func serverAdminCreds(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) (*couchdb.AdminCredentials, error) { ref := server.AdminCredentials() creds := &couchdb.AdminCredentials{} _, err := creds.FromSecret(ctx, cl, types.NamespacedName{ Name: ref.Name, Namespace: ref.Namespace, }) if err != nil { return nil, err } return creds, nil } func replicationSourceCredentials(ctx context.Context, cl client.Client, replication *dsapi.CouchDBReplicationSet) (*couchdb.ReplicationCredentials, error) { sourceCreds := &couchdb.ReplicationCredentials{} nn := types.NamespacedName{ Name: replication.Spec.Source.Name, Namespace: replication.Spec.Source.Namespace, } _, err := sourceCreds.FromSecret(ctx, cl, nn) return sourceCreds, err }