...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/mock_couchdb_cluster.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"net/http"
     8  	"net/url"
     9  	"sync"
    10  
    11  	"k8s.io/apimachinery/pkg/types"
    12  	"sigs.k8s.io/controller-runtime/pkg/client"
    13  
    14  	"edge-infra.dev/pkg/edge/api/testutils"
    15  	"edge-infra.dev/pkg/edge/api/utils"
    16  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    17  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    18  )
    19  
    20  const (
    21  	db1  = "db-1"
    22  	_rev = "1-ebb9921e747510f296532e95a6a74d2e"
    23  )
    24  
    25  type MockCouchDBCluster struct {
    26  	sync.Mutex
    27  	*utils.MockHTTPTestServer
    28  	client client.Client
    29  	config *Config
    30  	server *dsapi.CouchDBServer
    31  	ctx    context.Context
    32  }
    33  
    34  // NewMockCouchDBCluster allows us to run all integration tests without connecting to couchdb.
    35  // Note: this does not replace integration test for couchdb
    36  func NewMockCouchDBCluster(config *Config, client client.Client,
    37  	server *dsapi.CouchDBServer,
    38  	database *dsapi.CouchDBDatabase,
    39  	user *dsapi.CouchDBUser,
    40  	index *dsapi.CouchDBIndex,
    41  	ddoc *dsapi.CouchDBDesignDoc,
    42  	replication *dsapi.CouchDBReplicationSet) (*MockCouchDBCluster, error) {
    43  	testServer := utils.NewMockHTTPTestServer().AddAllowedContentType("application/json")
    44  
    45  	cluster := &MockCouchDBCluster{
    46  		MockHTTPTestServer: testServer,
    47  		client:             client,
    48  		server:             server,
    49  		config:             config,
    50  		ctx:                context.Background(),
    51  	}
    52  
    53  	cluster.mockCouchDBServer()
    54  	cluster.mockCouchDBDatabase(database, user)
    55  	cluster.mockCouchDBIndex(database, index)
    56  	cluster.mockCouchDBDesignDoc(database, ddoc)
    57  	cluster.mockCouchDBUser(server, user)
    58  	cluster.mockCouchDBReplicationSet(server, replication, user)
    59  	return cluster, cluster.validate()
    60  }
    61  
    62  // validate checks for duplicate urls which are almost impossible to spot at compile time.
    63  func (c *MockCouchDBCluster) validate() error {
    64  	set := map[string]struct{}{}
    65  	for _, route := range c.Routes {
    66  		path := fmt.Sprintf("%s:%s", route.Method, route.Path)
    67  		if _, ok := set[path]; ok {
    68  			return fmt.Errorf("duplicated route found: %s", path)
    69  		}
    70  		set[path] = struct{}{}
    71  	}
    72  	return nil
    73  }
    74  
    75  // sourceURL replication source url: cloud or store server url from source replication secret
    76  func sourceURL(ctx context.Context, cl client.Client, replication *dsapi.CouchDBReplicationSet, dbname string) (string, error) {
    77  	replCreds, err := replicationSourceCredentials(ctx, cl, replication)
    78  	if err != nil {
    79  		return "", err
    80  	}
    81  	return buildDSNName(string(replCreds.Username), string(replCreds.Password), string(replCreds.URI), dbname)
    82  }
    83  
    84  // targetURL replication target url: store server url and from admin credentials
    85  func targetURL(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer, dbname, port string) (string, error) {
    86  	creds, err := serverAdminCreds(ctx, cl, server)
    87  	if err != nil {
    88  		return "", err
    89  	}
    90  	uri := fmt.Sprintf("http://localhost:%s", port)
    91  	return buildDSNName(string(creds.Username), string(creds.Password), uri, dbname)
    92  }
    93  
    94  func (c *MockCouchDBCluster) NotFound() {
    95  	c.MockHTTPTestServer.AddNotFound(func(w http.ResponseWriter, r *http.Request) {
    96  		c.Lock()
    97  		defer c.Unlock()
    98  		fmt.Println("NotFound-------------", r.Method, r.URL, r.Header)
    99  		w.WriteHeader(404)
   100  	})
   101  }
   102  func (c *MockCouchDBCluster) mockCouchDBServer() {
   103  	c.Get("Check Cluster Setup", "/_cluster_setup", c.clusterSetUp, nil)
   104  	c.Post("Setup Cluster", "/_cluster_setup", c.clusterSetUp, nil)
   105  	c.Head("Ping Cluster", "/_up", c.emptyOk, nil)
   106  }
   107  func (c *MockCouchDBCluster) mockCouchDBDatabase(database *dsapi.CouchDBDatabase, user *dsapi.CouchDBUser) {
   108  	c.Put("Create DB", "/"+database.Name, c.emptyOk, nil)
   109  	c.Head("DB Exists", "/"+database.Name, c.emptyOk, nil)
   110  
   111  	c.Put("Create DB Auth", fmt.Sprintf("/%s/_design/auth", database.Name), c.emptyOk, nil)
   112  
   113  	c.Put("Create Security", fmt.Sprintf("/%s/_security", database.Name), c.emptyOk, nil)
   114  	c.Get("Security Exists", fmt.Sprintf("/%s/_security", database.Name), func(w http.ResponseWriter, _ *http.Request) {
   115  		security := couchdbSecurity(database)
   116  
   117  		// Note: for users with special roles like `read-only-user`, we add the user's name to the database security
   118  		security.Members.Names = append(security.Members.Names, user.Spec.User.Name)
   119  		c.ok(w, security)
   120  	}, nil)
   121  
   122  	c.Get("Security Exists for db", fmt.Sprintf("/%s/_security", db1), func(w http.ResponseWriter, _ *http.Request) {
   123  		security := couchdbSecurity(database)
   124  
   125  		// Note: for users with special roles like `read-only-user`, we add the user's name to the database security
   126  		security.Members.Names = append(security.Members.Names, user.Spec.User.Name)
   127  		c.ok(w, security)
   128  	}, nil)
   129  }
   130  
   131  func (c *MockCouchDBCluster) mockCouchDBUser(server *dsapi.CouchDBServer, user *dsapi.CouchDBUser) {
   132  	couchDBUserPath := "/_users/org.couchdb.user%3A" + user.Spec.User.Name
   133  	couchServerUserPath := "/_users/org.couchdb.user%3A" + server.Name
   134  
   135  	c.Get("Get CouchDB Test User", couchDBUserPath, func(w http.ResponseWriter, _ *http.Request) {
   136  		doc := c.mockUser(user.Spec.User.Name, user.Spec.User.Roles)
   137  		c.ok(w, &doc)
   138  	}, nil)
   139  
   140  	c.Get("Get CouchDB Server Test User", couchServerUserPath, func(w http.ResponseWriter, _ *http.Request) {
   141  		doc := c.mockUser(server.Name, []string{couchdb.ReplicationUser})
   142  		c.ok(w, &doc)
   143  	}, nil)
   144  
   145  	c.Put("Create User", couchDBUserPath, c.emptyOk, nil)
   146  	c.Put("Create Store Server User", couchServerUserPath, c.emptyOk, nil)
   147  }
   148  func (c *MockCouchDBCluster) mockUser(name string, roles []string) map[string]interface{} {
   149  	return map[string]interface{}{
   150  		"_id":      fmt.Sprintf("_users/org.couchdb.user:%s", name),
   151  		"name":     name,
   152  		"type":     "user",
   153  		"roles":    roles,
   154  		"password": "this is generated",
   155  		"_rev":     _rev,
   156  	}
   157  }
   158  
   159  func (c *MockCouchDBCluster) mockCouchDBReplicationSet(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, user *dsapi.CouchDBUser) {
   160  	replDocDB := replSet.Spec.Datasets[0].Name
   161  	replDocPath := fmt.Sprintf("/%s/repl_doc", replDocDB)
   162  
   163  	provider := user.Spec.Provider
   164  
   165  	c.Head("Replication DB Exists", fmt.Sprintf("/%s", replDocDB), c.emptyOk, nil)
   166  	c.Head("Replication DB Exists DB1", fmt.Sprintf("/%s", db1), c.emptyOk, nil)
   167  
   168  	c.Head("HEAD: Replication Jobs", "/_scheduler/jobs", c.emptyOk, nil)
   169  
   170  	c.Get("Get Replication Doc", replDocPath, func(w http.ResponseWriter, _ *http.Request) {
   171  		doc := map[string]interface{}{
   172  			"_id":       fmt.Sprintf("_users/org.couchdb.user:%s", user.Spec.User.Name),
   173  			"_rev":      _rev,
   174  			"datasets":  datasets(db1, provider.Name),
   175  			"providers": []dsapi.Provider{*provider},
   176  		}
   177  		c.ok(w, &doc)
   178  	}, nil)
   179  	c.Post("Create Replication", "/_replicator", c.emptyOk, nil)
   180  	c.Post("Get DBs Info", "/_dbs_info", func(w http.ResponseWriter, _ *http.Request) {
   181  		docs := []map[string]interface{}{
   182  			{
   183  				"key": replDocDB,
   184  				"info": map[string]interface{}{
   185  					"docs_read":          187,
   186  					"docs_written":       187,
   187  					"doc_write_failures": 0,
   188  					"changes_pending":    0,
   189  				},
   190  				"doc_count": 1,
   191  			},
   192  		}
   193  		c.ok(w, &docs)
   194  	}, nil)
   195  
   196  	c.Get("Get Replication Docs", "/_replicator/_all_docs?include_docs=true", func(w http.ResponseWriter, _ *http.Request) {
   197  		doc := map[string]interface{}{
   198  			"total_rows": 1,
   199  			"offset":     0,
   200  			"rows": []map[string]interface{}{
   201  				{
   202  					"id":  replDocDB,
   203  					"key": replDocDB,
   204  					"value": map[string]string{
   205  						"rev": _rev,
   206  					},
   207  					"doc": c.replConfig(server, replSet, replDocDB),
   208  				},
   209  				{
   210  					"id":  db1,
   211  					"key": db1,
   212  					"value": map[string]string{
   213  						"rev": _rev,
   214  					},
   215  					"doc": c.replConfig(server, replSet, db1),
   216  				},
   217  			},
   218  		}
   219  		c.ok(w, &doc)
   220  	}, nil)
   221  
   222  	c.Get("Get Scheduler Docs", "/_scheduler/docs", func(w http.ResponseWriter, _ *http.Request) {
   223  		doc := map[string]interface{}{
   224  			"total_rows": 1,
   225  			"offset":     0,
   226  			"docs": []map[string]interface{}{
   227  				c.schedulerDoc(server, replSet, replDocDB),
   228  				c.schedulerDoc(server, replSet, db1),
   229  			},
   230  		}
   231  		c.ok(w, &doc)
   232  	}, nil)
   233  
   234  	c.Get("Get Replication Config", fmt.Sprintf("/_replicator/%s", replDocDB), func(w http.ResponseWriter, _ *http.Request) {
   235  		cfg := c.replConfig(server, replSet, replDocDB)
   236  		c.ok(w, &cfg)
   237  	}, nil)
   238  
   239  	c.Get("Get Replication Config DB1", fmt.Sprintf("/_replicator/%s", db1), func(w http.ResponseWriter, _ *http.Request) {
   240  		cfg := c.replConfig(server, replSet, db1)
   241  		c.ok(w, &cfg)
   242  	}, nil)
   243  
   244  	c.Head("Head: Make ReadOnly", fmt.Sprintf("/%s/_design/auth", replDocDB), func(w http.ResponseWriter, r *http.Request) {
   245  		c.Lock()
   246  		w.Header().Set("ETag", _rev)
   247  		c.Unlock()
   248  		c.emptyOk(w, r)
   249  	}, nil)
   250  	c.Head("Head: Make ReadOnly DB1", fmt.Sprintf("/%s/_design/auth", db1), func(w http.ResponseWriter, r *http.Request) {
   251  		c.Lock()
   252  		w.Header().Set("ETag", _rev)
   253  		c.Unlock()
   254  		c.emptyOk(w, r)
   255  	}, nil)
   256  
   257  	c.Put("Make ReadOnly", fmt.Sprintf("/%s/_design/auth", replDocDB), c.emptyOk, nil)
   258  	c.Put("Make ReadOnly db1", fmt.Sprintf("/%s/_design/auth", db1), c.emptyOk, nil)
   259  }
   260  
   261  func (c *MockCouchDBCluster) schedulerDoc(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, db string) map[string]interface{} {
   262  	sURL, _ := sourceURL(c.ctx, c.client, replSet, db)
   263  	tURL, _ := targetURL(c.ctx, c.client, server, db, c.config.CouchDBPort)
   264  	return map[string]interface{}{
   265  		"database":     "_replicator",
   266  		"id":           db,
   267  		"doc_id":       db,
   268  		"source":       sURL + "/",
   269  		"target":       tURL + "/",
   270  		"start_time":   "2023-06-29T10:49:51Z",
   271  		"last_updated": "2023-06-30T03:09:20Z",
   272  		"state":        "running",
   273  		"info": map[string]interface{}{
   274  			"docs_read":          187,
   275  			"docs_written":       187,
   276  			"doc_write_failures": 0,
   277  			"changes_pending":    0,
   278  		},
   279  	}
   280  }
   281  func (c *MockCouchDBCluster) replConfig(server *dsapi.CouchDBServer, replSet *dsapi.CouchDBReplicationSet, db string) map[string]interface{} {
   282  	sURL, _ := sourceURL(c.ctx, c.client, replSet, db) // TODO
   283  	tURL, _ := targetURL(c.ctx, c.client, server, db, c.config.CouchDBPort)
   284  	return map[string]interface{}{
   285  		"_id":           db,
   286  		"_rev":          _rev,
   287  		"continuous":    true,
   288  		"create_target": true,
   289  		"source":        sURL,
   290  		"target":        tURL,
   291  	}
   292  }
   293  
   294  func datasets(dbName, provider string) []dsapi.Dataset {
   295  	return []dsapi.Dataset{
   296  		{
   297  			Name: dbName,
   298  			Provider: &dsapi.Provider{
   299  				Name: provider,
   300  			},
   301  			Config: dsapi.ReplConfig{
   302  				Interval:     fmt.Sprintf("%d", couchdb.ReplicationInterval.Milliseconds()),
   303  				Continuous:   true,
   304  				CreateTarget: true,
   305  			},
   306  		},
   307  	}
   308  }
   309  
   310  func (c *MockCouchDBCluster) Port() string {
   311  	serverURL, _ := url.Parse(c.Server.URL)
   312  	return serverURL.Port()
   313  }
   314  
   315  func (c *MockCouchDBCluster) clusterSetUp(w http.ResponseWriter, _ *http.Request) {
   316  	state := "single_node_enabled"
   317  	if c.server.IsCloud() {
   318  		state = "cluster_finished"
   319  	}
   320  	resp := &ServerSetupResponse{
   321  		State:  state,
   322  		Error:  "",
   323  		Reason: "",
   324  	}
   325  	c.ok(w, resp)
   326  }
   327  
   328  func (c *MockCouchDBCluster) ok(w http.ResponseWriter, data interface{}) {
   329  	c.Lock()
   330  	defer c.Unlock()
   331  	res, err := json.Marshal(data)
   332  	if err != nil {
   333  		testutils.WriteHTTPBadResponse(w)
   334  		return
   335  	}
   336  	w.Header().Set("Content-Type", "application/json")
   337  	_, _ = w.Write(res)
   338  }
   339  
   340  func (c *MockCouchDBCluster) emptyOk(w http.ResponseWriter, _ *http.Request) { //nolint:unused
   341  	c.ok(w, struct{}{})
   342  }
   343  
   344  func (c *MockCouchDBCluster) mockCouchDBIndex(database *dsapi.CouchDBDatabase, index *dsapi.CouchDBIndex) {
   345  	c.Post("Create Index", fmt.Sprintf("/%s/_index", database.Name), c.emptyOk, nil)
   346  	c.Get("Get Index", fmt.Sprintf("/%s/_index", database.Name), func(w http.ResponseWriter, _ *http.Request) {
   347  		_index := map[string]interface{}{
   348  			"total_rows": 1,
   349  			"indexes": []map[string]interface{}{
   350  				{
   351  					"ddoc": "_design/" + index.Spec.DDoc,
   352  					"name": index.IndexName(),
   353  					"type": "json",
   354  					"def": map[string]interface{}{
   355  						"fields": []map[string]interface{}{
   356  							{index.Spec.Index.Fields[0]: "asc"},
   357  						},
   358  						"partial_filter_selector": index.Spec.Index.PartialFilterSelector,
   359  					},
   360  				},
   361  			},
   362  		}
   363  		c.ok(w, &_index)
   364  	}, nil)
   365  }
   366  
   367  func (c *MockCouchDBCluster) mockCouchDBDesignDoc(database *dsapi.CouchDBDatabase, ddoc *dsapi.CouchDBDesignDoc) {
   368  	c.Put("Create Design Doc", fmt.Sprintf("/%s/_design/%s", database.Name, ddoc.Spec.ID), c.emptyOk, nil)
   369  	c.Get("Get Design Doc", fmt.Sprintf("/%s/_design/%s", database.Name, ddoc.Spec.ID), func(w http.ResponseWriter, _ *http.Request) {
   370  		doc := map[string]interface{}{
   371  			"_id":  "_design/" + ddoc.Spec.ID,
   372  			"_rev": _rev,
   373  			"views": map[string]map[string]string{
   374  				"test": {
   375  					"map": "function(doc) { if (doc.type === 'test') { emit(doc._id, doc); } }",
   376  				},
   377  			},
   378  		}
   379  		c.ok(w, &doc)
   380  	}, nil)
   381  }
   382  
   383  func serverAdminCreds(ctx context.Context, cl client.Client, server *dsapi.CouchDBServer) (*couchdb.AdminCredentials, error) {
   384  	ref := server.AdminCredentials()
   385  	creds := &couchdb.AdminCredentials{}
   386  	_, err := creds.FromSecret(ctx, cl, types.NamespacedName{
   387  		Name:      ref.Name,
   388  		Namespace: ref.Namespace,
   389  	})
   390  	if err != nil {
   391  		return nil, err
   392  	}
   393  	return creds, nil
   394  }
   395  
   396  func replicationSourceCredentials(ctx context.Context, cl client.Client, replication *dsapi.CouchDBReplicationSet) (*couchdb.ReplicationCredentials, error) {
   397  	sourceCreds := &couchdb.ReplicationCredentials{}
   398  	nn := types.NamespacedName{
   399  		Name:      replication.Spec.Source.Name,
   400  		Namespace: replication.Spec.Source.Namespace,
   401  	}
   402  	_, err := sourceCreds.FromSecret(ctx, cl, nn)
   403  	return sourceCreds, err
   404  }
   405  

View as plain text