...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/replication.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"net/url"
     8  	"slices"
     9  
    10  	"maps"
    11  
    12  	"github.com/go-kivik/kivik/v4"
    13  	"github.com/go-logr/logr"
    14  
    15  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    16  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    17  )
    18  
    19  type ReplicationInfo struct {
    20  	SourceURI      string
    21  	SourceUsername string
    22  	SourcePassword string
    23  
    24  	TargetURI      string // http://localhost:5984
    25  	TargetUsername string
    26  	TargetPassword string
    27  }
    28  
    29  type State int
    30  
    31  const (
    32  	Processing State = iota
    33  	Done
    34  	Error
    35  )
    36  
    37  type Stat struct {
    38  	total      int
    39  	processing int
    40  	done       int
    41  	error      int
    42  }
    43  
    44  func (s Stat) String() string {
    45  	return fmt.Sprintf("Total: %d, Processing: %d, Done: %d, Error: %d", s.total, s.processing, s.done, s.error)
    46  }
    47  func (s Stat) Done() bool {
    48  	return s.processing == 0
    49  }
    50  
    51  type BulkDocs struct {
    52  	Docs map[string]*BulkDoc
    53  }
    54  
    55  type BulkDoc struct {
    56  	State
    57  	Doc     map[string]interface{}
    58  	Error   error
    59  	Dataset dsapi.Dataset
    60  }
    61  
    62  func (d *BulkDoc) SetError(err error) {
    63  	d.State = Error
    64  	d.Error = err
    65  }
    66  
    67  func (d *BulkDoc) SetDone() {
    68  	d.State = Done
    69  }
    70  
    71  func (b *BulkDocs) SetProcessing(id string) bool {
    72  	return b.SetState(id, Processing)
    73  }
    74  
    75  func (b *BulkDocs) SetDone(id string) bool {
    76  	return b.SetState(id, Done)
    77  }
    78  
    79  func (b *BulkDocs) SetError(id string, err error) bool {
    80  	if doc, ok := b.Docs[id]; ok {
    81  		doc.State = Error
    82  		doc.Error = err
    83  	}
    84  	return false
    85  }
    86  
    87  func (b *BulkDocs) Remove(dbs ...string) {
    88  	for _, dbname := range dbs {
    89  		delete(b.Docs, dbname)
    90  	}
    91  }
    92  
    93  // SetErrors can't no longer process doc, set all remaining doc as error
    94  func (b *BulkDocs) SetErrors(err error) {
    95  	for _, doc := range b.Docs {
    96  		if doc.State != Done {
    97  			doc.Error = err
    98  			doc.State = Error
    99  		}
   100  	}
   101  }
   102  
   103  func (b *BulkDocs) SetState(id string, state State) bool {
   104  	if doc, ok := b.Docs[id]; ok {
   105  		doc.State = state
   106  	}
   107  	return false
   108  }
   109  
   110  func (b *BulkDocs) DoneProcessing() bool {
   111  	for _, doc := range b.Docs {
   112  		if doc.State == Processing {
   113  			return false
   114  		}
   115  	}
   116  	return true
   117  }
   118  
   119  func (b *BulkDocs) ProcessingDocs() []interface{} {
   120  	var results []interface{}
   121  	for _, doc := range b.Docs {
   122  		if doc.State == Processing && doc.Doc != nil {
   123  			results = append(results, doc.Doc)
   124  		}
   125  	}
   126  	return results
   127  }
   128  
   129  func (b *BulkDocs) SetRevision(id string, rev string) bool {
   130  	if doc, ok := b.Docs[id]; ok && doc.Doc != nil {
   131  		doc.State = Processing
   132  		doc.Doc["_rev"] = rev
   133  		return true
   134  	}
   135  	return false
   136  }
   137  
   138  func (b *BulkDocs) JoinErrors() error {
   139  	var errs []error
   140  	for _, doc := range b.Docs {
   141  		if err := doc.Error; err != nil {
   142  			errs = append(errs, err)
   143  		}
   144  	}
   145  	return errors.Join(errs...)
   146  }
   147  
   148  func (b *BulkDocs) Stats() Stat {
   149  	processing := 0
   150  	done := 0
   151  	err := 0
   152  	for _, doc := range b.Docs {
   153  		switch doc.State {
   154  		case Processing:
   155  			processing++
   156  		case Done:
   157  			done++
   158  		case Error:
   159  			err++
   160  		}
   161  	}
   162  
   163  	return Stat{total: len(b.Docs), processing: processing, done: done, error: err}
   164  }
   165  
   166  func (b *BulkDocs) GetAllDocs() []*BulkDoc {
   167  	return slices.Collect(maps.Values(b.Docs))
   168  }
   169  
   170  func (b *BulkDocs) GetDocs(state State) []string {
   171  	docs := make([]string, 0)
   172  	for dbname, doc := range b.Docs {
   173  		if doc.State == state {
   174  			docs = append(docs, dbname)
   175  		}
   176  	}
   177  	return docs
   178  }
   179  
   180  func (b *BulkDocs) Equals(id string, doc map[string]interface{}) bool {
   181  	if bulkDoc, ok := b.Docs[id]; ok && bulkDoc.Doc != nil && doc != nil {
   182  		return doc["_id"] == bulkDoc.Doc["_id"] &&
   183  			doc["continuous"] == bulkDoc.Doc["continuous"] &&
   184  			doc["create_target"] == bulkDoc.Doc["create_target"] &&
   185  			doc["source"] == bulkDoc.Doc["source"] &&
   186  			doc["target"] == bulkDoc.Doc["target"] &&
   187  			doc["cancel"] == bulkDoc.Doc["cancel"]
   188  		// note we are ignoring doc["owner"]
   189  	}
   190  	return false
   191  }
   192  
   193  // bulkInsert put bulk docs into _replicator db
   194  func (r *CouchReplicationReconciler) bulkInsert(ctx context.Context, client *couchdb.CouchDB, docs *BulkDocs) error {
   195  	log := logr.FromContextOrDiscard(ctx)
   196  
   197  	log.Info("bulkInsert", "stats", docs.Stats())
   198  
   199  	if docs.DoneProcessing() {
   200  		log.Info("bulk insert done processing", "stats", docs.Stats().String())
   201  		return nil
   202  	}
   203  
   204  	db, err := client.GetReplicatorDB()
   205  	if err != nil {
   206  		docs.SetErrors(err)
   207  		log.Error(err, "fail to get replicator DB")
   208  		return err
   209  	}
   210  
   211  	bulkResults, bulkErr := db.BulkDocs(ctx, docs.ProcessingDocs())
   212  	if bulkErr != nil {
   213  		docs.SetErrors(err)
   214  		log.Error(bulkErr, "failed to create replications in bulk")
   215  		return bulkErr
   216  	}
   217  
   218  	var conflictDocs []kivik.BulkGetReference
   219  	for _, bulkResult := range bulkResults {
   220  		dbname := bulkResult.ID
   221  		err := bulkResult.Error
   222  		if err == nil {
   223  			docs.SetDone(dbname)
   224  			continue
   225  		}
   226  		if couchdb.IsConflict(err) {
   227  			conflictDocs = append(conflictDocs, kivik.BulkGetReference{ID: dbname})
   228  			docs.SetProcessing(dbname)
   229  			continue
   230  		}
   231  
   232  		docs.SetError(dbname, err)
   233  	}
   234  
   235  	if len(conflictDocs) == 0 {
   236  		return r.bulkInsert(ctx, client, docs)
   237  	}
   238  
   239  	rows := db.BulkGet(ctx, conflictDocs)
   240  	if rows.Err() != nil {
   241  		docs.SetErrors(rows.Err())
   242  		log.Error(rows.Err(), "error getting bulk replications")
   243  		return rows.Err()
   244  	}
   245  
   246  	for rows.Next() {
   247  		dbname, err := rows.ID()
   248  		if err != nil {
   249  			docs.SetError(dbname, err)
   250  			continue
   251  		}
   252  		doc := make(map[string]interface{})
   253  		if scanErr := rows.ScanDoc(&doc); scanErr != nil {
   254  			docs.SetError(dbname, scanErr)
   255  			continue
   256  		}
   257  		if docs.Equals(dbname, doc) {
   258  			docs.SetDone(dbname)
   259  			continue
   260  		}
   261  		rev := doc["_rev"].(string)
   262  		docs.SetRevision(dbname, rev)
   263  	}
   264  
   265  	return r.bulkInsert(ctx, client, docs)
   266  }
   267  
   268  func toBulkReplicationDocs(r *ReplicationInfo, ds []dsapi.Dataset, cancel bool) *BulkDocs {
   269  	bd := &BulkDocs{Docs: map[string]*BulkDoc{}}
   270  	for _, d := range ds {
   271  		dbname := d.Name
   272  		sourceDSN, err1 := buildDSNName(r.SourceUsername, r.SourcePassword, r.SourceURI, dbname)
   273  		targetDSN, err2 := buildDSNName(r.TargetUsername, r.TargetPassword, r.TargetURI, dbname)
   274  		if err := errors.Join(err1, err2); err != nil {
   275  			bd.Docs[dbname] = &BulkDoc{
   276  				Dataset: d,
   277  				State:   Error,
   278  				Error:   err,
   279  			}
   280  		} else {
   281  			replDoc := toReplicationSettings(d.Config, dbname)
   282  			replDoc["source"] = sourceDSN
   283  			replDoc["target"] = targetDSN
   284  			if cancel {
   285  				replDoc["cancel"] = cancel
   286  			}
   287  			bd.Docs[dbname] = &BulkDoc{
   288  				Dataset: d,
   289  				Doc:     replDoc,
   290  				State:   Processing,
   291  			}
   292  		}
   293  	}
   294  	return bd
   295  }
   296  
   297  func buildDSNName(username, password, uri, dbname string) (string, error) {
   298  	u, err := url.Parse(uri)
   299  	if err != nil {
   300  		return "", err
   301  	}
   302  	if !u.IsAbs() {
   303  		u.Scheme = "http"
   304  	}
   305  	u.User = url.UserPassword(username, password)
   306  	u.Path = dbname
   307  	return u.String(), err
   308  }
   309  
   310  // ReplicationSettings convert ReplConfig to replication settings. TODO add all the settings in next pr
   311  func toReplicationSettings(r dsapi.ReplConfig, id string) map[string]interface{} {
   312  	m := make(map[string]interface{})
   313  	m["_id"] = id
   314  	m["continuous"] = r.Continuous
   315  	m["create_target"] = r.CreateTarget
   316  	return m
   317  }
   318  

View as plain text