...

Source file src/edge-infra.dev/pkg/edge/datasync/couchdb/couchdb.go

Documentation: edge-infra.dev/pkg/edge/datasync/couchdb

     1  package couchdb
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net/http"
     8  	"net/url"
     9  	"strings"
    10  
    11  	_ "embed"
    12  
    13  	"edge-infra.dev/pkg/lib/logging"
    14  
    15  	"github.com/go-kivik/kivik/v4"
    16  	"github.com/go-kivik/kivik/v4/couchdb" // The CouchDB driver
    17  )
    18  
// CouchDB bundles the kivik client used for every server call in this
// package with a logger. Both fields are populated by New and NewFromURL.
type CouchDB struct {
	Client *kivik.Client       // kivik handle; assigned by New / NewFromURL
	Logger *logging.EdgeLogger // assigned from logging.NewLogger() by New / NewFromURL
}
    23  
    24  type Security struct {
    25  	Admins  NameRole
    26  	Members NameRole
    27  }
    28  
    29  type NameRole struct {
    30  	Names []string
    31  	Roles []string
    32  }
    33  
// User is the subset of a `_users` document that CheckUserAndRolesExists
// decodes in order to compare the stored name and roles.
type User struct {
	Name string   `json:"name"`  // value of the document's "name" field
	Role []string `json:"roles"` // value of the document's "roles" field
}
    38  
// ReplicatorConfigDoc is the shape GetReplicationConfigDoc decodes a
// `_replicator` document into.
//
// TODO: add the remaining replication settings in the next PR, see
// https://docs.couchdb.org/en/stable/json-structure.html#replication-settings
type ReplicatorConfigDoc struct {
	ID           string      `json:"_id"`           // document id
	Rev          string      `json:"_rev"`          // revision; passed to Delete when a replication is removed
	Source       interface{} `json:"source"`        // can be string or struct
	Target       interface{} `json:"target"`        // can be string or struct
	CreateTarget bool        `json:"create_target"` // value of the "create_target" setting
	Continuous   bool        `json:"continuous"`    // value of the "continuous" setting
}
    48  
// jsFunction holds the contents of readOnly.js, embedded at build time.
// MakeReadOnly stores it (after replacing literal `\n` sequences with spaces)
// under the ReadOnlyDesignDoc key of the AuthDesignDoc design document.
//
//go:embed readOnly.js
var jsFunction string
    51  
// Sentinel errors produced by checkStatusCode, which maps the HTTP status
// carried by a kivik error to one of these values. Match them with errors.Is.
var (
	ErrNotFound            = errors.New("not found")                          // HTTP 404
	ErrAuthorizationNeeded = errors.New("authentication required")            // HTTP 401
	ErrNotAuthorized       = errors.New("you are not authorized")             // HTTP 403
	ErrPreconditionFailed  = errors.New("cannot create a duplicate database") // HTTP 412
	ErrConflict            = errors.New("cannot create a duplicate user")     // HTTP 409
)
    59  
    60  // create a new instance of CouchDB
    61  func (cdb *CouchDB) New(driver, u, p, uri, port string) error {
    62  	clientKeepAlive := &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
    63  	client, err := kivik.New(driver, FormatURI(u, p, uri, port), couchdb.BasicAuth(u, p), couchdb.OptionHTTPClient(clientKeepAlive))
    64  	if err != nil {
    65  		return err
    66  	}
    67  
    68  	cdb.Client = client
    69  	cdb.Logger = logging.NewLogger()
    70  	return nil
    71  }
    72  
    73  // create a new instance of CouchDB
    74  func (cdb *CouchDB) NewFromURL(u, p, url string, options ...kivik.Option) error {
    75  	fURL, err := FormatURL(u, p, url)
    76  	if err != nil {
    77  		return err
    78  	}
    79  	clientKeepAlive := &http.Client{Transport: &http.Transport{DisableKeepAlives: true}}
    80  	options = append(options, couchdb.BasicAuth(u, p), couchdb.OptionHTTPClient(clientKeepAlive))
    81  	client, err := kivik.New(Driver, fURL, options...)
    82  	if err != nil {
    83  		return err
    84  	}
    85  
    86  	cdb.Client = client
    87  	cdb.Logger = logging.NewLogger()
    88  	return nil
    89  }
    90  
    91  func (cdb *CouchDB) GetReplicatorDB() (*kivik.DB, error) {
    92  	db := cdb.Client.DB("_replicator")
    93  	err := db.Err()
    94  	if err != nil {
    95  		return nil, fmt.Errorf("fail to get replication database: %w", err)
    96  	}
    97  	return db, nil
    98  }
    99  
   100  func (cdb *CouchDB) GetReplicationConfigDoc(ctx context.Context, dbname string) (*ReplicatorConfigDoc, error) {
   101  	db, err := cdb.GetReplicatorDB()
   102  	if err != nil {
   103  		return nil, err
   104  	}
   105  	replDoc := &ReplicatorConfigDoc{}
   106  	row := db.Get(ctx, dbname)
   107  	err = row.Err()
   108  	if err != nil {
   109  		return nil, fmt.Errorf("fail to get replication document %s: %w", dbname, err)
   110  	}
   111  
   112  	err = row.ScanDoc(replDoc)
   113  	if err != nil {
   114  		return nil, fmt.Errorf("fail to scan replication document %s: %w", dbname, err)
   115  	}
   116  
   117  	return replDoc, err
   118  }
   119  
// ShouldReCreateFn reports whether an existing replication must be replaced.
// A replication cannot be mutated in place; it can only be deleted and
// re-created. CreateOrUpdateReplication calls the function with the stored
// replication document and deletes + re-creates it when true is returned.
type ShouldReCreateFn func(replDoc *ReplicatorConfigDoc) bool
   122  
   123  func (cdb *CouchDB) CreateOrUpdateReplication(ctx context.Context, targetDSN, sourceDSN string, options map[string]interface{}, shouldRecreate ShouldReCreateFn) error {
   124  	dbname := options["_id"].(string)
   125  	if len(dbname) == 0 {
   126  		return fmt.Errorf("invalid replication settings configuration, id not found")
   127  	}
   128  	replDoc, err := cdb.GetReplicationConfigDoc(ctx, dbname)
   129  	if err != nil {
   130  		if IsNotFound(err) {
   131  			_, err = cdb.Client.Replicate(ctx, targetDSN, sourceDSN, kivik.Params(options))
   132  			return err
   133  		}
   134  		return err
   135  	}
   136  	if !shouldRecreate(replDoc) {
   137  		return nil
   138  	}
   139  	db, err := cdb.GetReplicatorDB()
   140  	if err != nil {
   141  		return err
   142  	}
   143  	_, err = db.Delete(ctx, dbname, replDoc.Rev)
   144  	if err != nil {
   145  		return fmt.Errorf("fail to remove old replication: %w", err)
   146  	}
   147  	_, err = cdb.Client.Replicate(ctx, targetDSN, sourceDSN, kivik.Params(options))
   148  	return err
   149  }
   150  
   151  func (cdb *CouchDB) DeleteReplication(ctx context.Context, dbname string) error {
   152  	replDoc, err := cdb.GetReplicationConfigDoc(ctx, dbname)
   153  	if IgnoreNotFound(err) != nil {
   154  		return err
   155  	}
   156  	if replDoc == nil { // not found, no need to delete
   157  		return nil
   158  	}
   159  	db, err := cdb.GetReplicatorDB()
   160  	if err != nil {
   161  		return nil
   162  	}
   163  	_, err = db.Delete(ctx, dbname, replDoc.Rev)
   164  	return err
   165  }
   166  
   167  // GetReplicationSetDoc return the replication set document
   168  func (cdb *CouchDB) GetReplicationSetDoc(ctx context.Context, dbname string, dest interface{}) error {
   169  	db := cdb.Client.DB(dbname)
   170  	row := db.Get(ctx, ReplicationDocument)
   171  	if row.Err() != nil {
   172  		return row.Err()
   173  	}
   174  	return row.ScanDoc(dest)
   175  }
   176  
   177  func (cdb *CouchDB) CheckReplication(ctx context.Context, docID string) error {
   178  	repls, err := cdb.Client.GetReplications(ctx)
   179  	if err != nil {
   180  		return fmt.Errorf("couldn't list replications: %w", err)
   181  	}
   182  	docID = fmt.Sprintf("/%s/", docID)
   183  	found := false
   184  	// Note: library does not return replication state for a single replication, we are forced to loop through
   185  	for _, r := range repls {
   186  		if strings.HasSuffix(r.Source, docID) && strings.HasSuffix(r.Target, docID) {
   187  			found = true
   188  			state := r.State()
   189  			badReplication := state == kivik.ReplicationError ||
   190  				state == kivik.ReplicationCrashing ||
   191  				state == kivik.ReplicationFailed
   192  			if badReplication {
   193  				return fmt.Errorf("replication is in bad state %s: %w", state, r.Err())
   194  			}
   195  			break
   196  		}
   197  	}
   198  	if found {
   199  		return nil
   200  	}
   201  	return fmt.Errorf("could not find replication for %s", docID)
   202  }
   203  
   204  // CreateDB creates a new database
   205  // return a bool that indicates a duplication was attempted and the error
   206  func (cdb *CouchDB) CreateDB(ctx context.Context, dbname string) error {
   207  	return cdb.checkStatusCode(cdb.Client.CreateDB(ctx, dbname))
   208  }
   209  
   210  func (cdb *CouchDB) CheckIfDBExists(ctx context.Context, dbname string) (bool, error) {
   211  	exists, err := cdb.Client.DBExists(ctx, dbname)
   212  	return exists, cdb.checkStatusCode(err)
   213  }
   214  
   215  // DBsInfo bulk stats, default 100
   216  // TODO split per 100 DB
   217  func (cdb *CouchDB) DBsInfo(ctx context.Context, dbs []string) (map[string]*kivik.DBStats, error) {
   218  	// RECOVERING FROM DATA NOT FOUND IN COUCHDB.
   219  	// THE KIVIK LIBRARY FALSY PANICS, RESULT SHOULD BE EMPTY INSTEAD
   220  	defer func() { _ = recover() }()
   221  
   222  	result := make(map[string]*kivik.DBStats)
   223  	if len(dbs) > 0 {
   224  		stats, err := cdb.Client.DBsStats(ctx, dbs)
   225  		if err != nil {
   226  			return nil, err
   227  		}
   228  		for _, stat := range stats {
   229  			result[stat.Name] = stat
   230  		}
   231  	}
   232  	return result, nil
   233  }
   234  
   235  // CreateNewUser creates a new user
   236  // return the rev key for the new user, a bool for if a duplication was attempted to be inserted, and an error
   237  func (cdb *CouchDB) CreateNewUser(ctx context.Context, u, p string, r []string) (string, error) {
   238  	db := cdb.Client.DB("_users")
   239  	user := formatUserString(u)
   240  	doc := map[string]interface{}{
   241  		"_id":      user,
   242  		"name":     u,
   243  		"type":     "user",
   244  		"roles":    r,
   245  		"password": p,
   246  	}
   247  	rev, err := db.Put(ctx, user, doc)
   248  	return rev, cdb.checkStatusCode(err)
   249  }
   250  
   251  func (cdb *CouchDB) DeleteUser(ctx context.Context, username string) error {
   252  	db := cdb.Client.DB("_users")
   253  	user := formatUserString(username)
   254  	rev, err := db.GetRev(ctx, user)
   255  	if err != nil {
   256  		return err
   257  	}
   258  	_, err = db.Delete(ctx, user, rev)
   259  	return err
   260  }
   261  
   262  func (cdb *CouchDB) CheckUserAndRolesExists(ctx context.Context, u string, roles []string) (bool, error) {
   263  	db := cdb.Client.DB("_users")
   264  	username := formatUserString(u)
   265  	row := db.Get(ctx, username)
   266  	if row.Err() != nil {
   267  		return false, row.Err()
   268  	}
   269  
   270  	var user User
   271  	if err := row.ScanDoc(&user); err != nil {
   272  		return false, err
   273  	}
   274  
   275  	if user.Name != u {
   276  		return false, nil
   277  	}
   278  
   279  	for _, role := range roles {
   280  		if !contains(user.Role, role) {
   281  			return false, nil
   282  		}
   283  	}
   284  
   285  	return true, nil
   286  }
   287  
   288  // AddMemberUserAndRoleToMultipleDBs adds a user to a list of databases
   289  func (cdb *CouchDB) AddMemberUserAndRoleToMultipleDBs(ctx context.Context, s Security, dbs []string) error {
   290  	for _, db := range dbs {
   291  		err := cdb.AddMemberUserAndRolesToDB(ctx, s, db)
   292  		if err != nil {
   293  			return err
   294  		}
   295  	}
   296  	return nil
   297  }
   298  
   299  // AddMemberUserAndRolesToDB adds a user to a given databases Member list
   300  // takes in a list of strings for Names (users) and Roles
   301  func (cdb *CouchDB) AddMemberUserAndRolesToDB(ctx context.Context, s Security, dbname string) error {
   302  	m := &kivik.Members{
   303  		Names: s.Members.Names,
   304  		Roles: s.Members.Roles,
   305  	}
   306  
   307  	a := &kivik.Members{
   308  		Names: s.Admins.Names,
   309  		Roles: s.Admins.Roles,
   310  	}
   311  
   312  	// connect to the correct db and get the current security details
   313  	db := cdb.Client.DB(dbname)
   314  	sec, err := db.Security(ctx)
   315  	if err != nil {
   316  		return err
   317  	}
   318  
   319  	// add the new roles to the security list so we dont overwrite
   320  	for _, name := range a.Names {
   321  		if !contains(sec.Admins.Names, name) {
   322  			sec.Admins.Names = append(sec.Admins.Names, name)
   323  		}
   324  	}
   325  
   326  	for _, role := range a.Roles {
   327  		if !contains(sec.Admins.Roles, role) {
   328  			sec.Admins.Roles = append(sec.Admins.Roles, role)
   329  		}
   330  	}
   331  
   332  	for _, name := range m.Names {
   333  		if !contains(sec.Members.Names, name) {
   334  			sec.Members.Names = append(sec.Members.Names, name)
   335  		}
   336  	}
   337  
   338  	for _, role := range m.Roles {
   339  		if !contains(sec.Members.Roles, role) {
   340  			sec.Members.Roles = append(sec.Members.Roles, role)
   341  		}
   342  	}
   343  
   344  	return cdb.checkStatusCode(db.SetSecurity(ctx, sec))
   345  }
   346  
   347  func (cdb *CouchDB) CheckDBUsersAndRoles(ctx context.Context, s Security, dbname string) (bool, error) {
   348  	// connect to the correct db and get the current security details
   349  	db := cdb.Client.DB(dbname)
   350  	sec, err := db.Security(ctx)
   351  	if err != nil {
   352  		return false, err
   353  	}
   354  
   355  	// check that the database contains the user defined security members and roles
   356  	exists := securityContains(s, sec)
   357  	return exists, nil
   358  }
   359  
   360  func (cdb *CouchDB) MakeReadOnly(ctx context.Context, dbname string) error {
   361  	db := cdb.Client.DB(dbname)
   362  	_, err := db.GetRev(ctx, AuthDesignDoc)
   363  	if !IsNotFound(err) {
   364  		return err
   365  	}
   366  	if err == nil { // do not re-create design doc
   367  		return nil
   368  	}
   369  	validUserFunc := strings.ReplaceAll(jsFunction, "\\n", " ")
   370  	doc := map[string]string{ReadOnlyDesignDoc: validUserFunc}
   371  	return cdb.makeReadOnly(ctx, db, doc)
   372  }
   373  
   374  func (cdb *CouchDB) makeReadOnly(ctx context.Context, db *kivik.DB, doc map[string]string) error {
   375  	_, err := db.Put(ctx, AuthDesignDoc, doc)
   376  	if err == nil {
   377  		return nil
   378  	}
   379  	if kivik.HTTPStatus(err) != 409 {
   380  		return err
   381  	}
   382  	rev, err := db.GetRev(ctx, AuthDesignDoc)
   383  	if err != nil {
   384  		return err
   385  	}
   386  	doc["_rev"] = rev
   387  	return cdb.makeReadOnly(ctx, db, doc)
   388  }
   389  
   390  func (cdb *CouchDB) RemoveReadOnly(ctx context.Context, dbname string) error {
   391  	db := cdb.Client.DB(dbname)
   392  	rev, err := db.GetRev(ctx, AuthDesignDoc)
   393  	if IsNotFound(err) {
   394  		return nil
   395  	}
   396  	if err != nil {
   397  		return err
   398  	}
   399  	_, err = db.Delete(ctx, AuthDesignDoc, rev)
   400  	return err
   401  }
   402  
   403  func securityContains(s Security, sec *kivik.Security) bool {
   404  	// check the admin names / roles exist
   405  	for _, name := range s.Admins.Names {
   406  		if !contains(sec.Admins.Names, name) {
   407  			return false
   408  		}
   409  	}
   410  
   411  	for _, role := range s.Admins.Roles {
   412  		if !contains(sec.Admins.Roles, role) {
   413  			return false
   414  		}
   415  	}
   416  
   417  	// check the members names / roles exist
   418  	for _, name := range s.Members.Names {
   419  		if !contains(sec.Members.Names, name) {
   420  			return false
   421  		}
   422  	}
   423  
   424  	for _, role := range s.Members.Roles {
   425  		if !contains(sec.Members.Roles, role) {
   426  			return false
   427  		}
   428  	}
   429  	return true
   430  }
   431  
   432  // :)
   433  func contains(arr []string, str string) bool {
   434  	for _, v := range arr {
   435  		if v == str {
   436  			return true
   437  		}
   438  	}
   439  
   440  	return false
   441  }
   442  
   443  // Close closes the client connection
   444  func (cdb *CouchDB) Close(_ context.Context) error {
   445  	return cdb.Client.Close()
   446  }
   447  
   448  // wrap some of the well known errors with our own sentinels
   449  // this should expand over time
   450  func (cdb *CouchDB) checkStatusCode(err error) error {
   451  	// if there's no error dont create an error
   452  	if err == nil {
   453  		return err
   454  	}
   455  
   456  	switch kivik.HTTPStatus(err) {
   457  	case http.StatusNotFound:
   458  		return fmt.Errorf("%w", ErrNotFound)
   459  	case http.StatusUnauthorized:
   460  		return fmt.Errorf("%w", ErrAuthorizationNeeded)
   461  	case http.StatusForbidden:
   462  		return fmt.Errorf("%w", ErrNotAuthorized)
   463  	case http.StatusConflict:
   464  		return fmt.Errorf("%w", ErrConflict)
   465  	case http.StatusPreconditionFailed:
   466  		return fmt.Errorf("%w", ErrPreconditionFailed)
   467  	}
   468  	return err
   469  }
   470  
   471  func IsConflict(err error) bool {
   472  	return kivik.HTTPStatus(err) == http.StatusConflict
   473  }
   474  
   475  func IsNotFound(err error) bool {
   476  	return kivik.HTTPStatus(err) == http.StatusNotFound
   477  }
   478  
   479  func IsBadGateway(err error) bool {
   480  	return kivik.HTTPStatus(err) == http.StatusBadGateway
   481  }
   482  
   483  func IgnoreNotFound(err error) error {
   484  	if IsNotFound(err) {
   485  		return nil
   486  	}
   487  	return err
   488  }
   489  
   490  // format username with couchdb mandatory prefix (org.couchdb.user:)
   491  func formatUserString(username string) string {
   492  	return fmt.Sprintf("%s%s", kivik.UserPrefix, username)
   493  }
   494  
   495  func FormatURI(_, _, uri, port string) string {
   496  	url := url.URL{
   497  		Scheme: "http",
   498  		Host:   fmt.Sprintf("%s:%s", uri, port),
   499  		// User:   url.UserPassword(username, password),
   500  	}
   501  	return url.String()
   502  }
   503  
   504  func FormatURL(username, password, uri string) (string, error) {
   505  	parsedURL, err := url.Parse(uri)
   506  
   507  	if err != nil {
   508  		return "", err
   509  	}
   510  
   511  	parsedURL.User = url.UserPassword(username, password)
   512  	return parsedURL.String(), nil
   513  }
   514  
   515  func FormatFinishClusterURI(username, password, uri, port string) string {
   516  	return FormatClusterURI(username, password, uri, port, "_cluster_setup")
   517  }
   518  
   519  func FormatClusterURI(username, password, uri, port, path string) string {
   520  	url := url.URL{
   521  		Scheme: "http",
   522  		Host:   fmt.Sprintf("%s:%s", uri, port),
   523  		User:   url.UserPassword(username, password),
   524  		Path:   path,
   525  	}
   526  	return url.String()
   527  }
   528  

View as plain text