package couchctl import ( "context" "errors" "fmt" "net/url" "slices" "maps" "github.com/go-kivik/kivik/v4" "github.com/go-logr/logr" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" ) type ReplicationInfo struct { SourceURI string SourceUsername string SourcePassword string TargetURI string // http://localhost:5984 TargetUsername string TargetPassword string } type State int const ( Processing State = iota Done Error ) type Stat struct { total int processing int done int error int } func (s Stat) String() string { return fmt.Sprintf("Total: %d, Processing: %d, Done: %d, Error: %d", s.total, s.processing, s.done, s.error) } func (s Stat) Done() bool { return s.processing == 0 } type BulkDocs struct { Docs map[string]*BulkDoc } type BulkDoc struct { State Doc map[string]interface{} Error error Dataset dsapi.Dataset } func (d *BulkDoc) SetError(err error) { d.State = Error d.Error = err } func (d *BulkDoc) SetDone() { d.State = Done } func (b *BulkDocs) SetProcessing(id string) bool { return b.SetState(id, Processing) } func (b *BulkDocs) SetDone(id string) bool { return b.SetState(id, Done) } func (b *BulkDocs) SetError(id string, err error) bool { if doc, ok := b.Docs[id]; ok { doc.State = Error doc.Error = err } return false } func (b *BulkDocs) Remove(dbs ...string) { for _, dbname := range dbs { delete(b.Docs, dbname) } } // SetErrors can't no longer process doc, set all remaining doc as error func (b *BulkDocs) SetErrors(err error) { for _, doc := range b.Docs { if doc.State != Done { doc.Error = err doc.State = Error } } } func (b *BulkDocs) SetState(id string, state State) bool { if doc, ok := b.Docs[id]; ok { doc.State = state } return false } func (b *BulkDocs) DoneProcessing() bool { for _, doc := range b.Docs { if doc.State == Processing { return false } } return true } func (b *BulkDocs) ProcessingDocs() []interface{} { var results []interface{} for _, doc := range b.Docs { if doc.State == Processing && doc.Doc != nil { results = append(results, doc.Doc) } } return results } func (b *BulkDocs) SetRevision(id string, rev string) bool { if doc, ok := b.Docs[id]; ok && doc.Doc != nil { doc.State = Processing doc.Doc["_rev"] = rev return true } return false } func (b *BulkDocs) JoinErrors() error { var errs []error for _, doc := range b.Docs { if err := doc.Error; err != nil { errs = append(errs, err) } } return errors.Join(errs...) } func (b *BulkDocs) Stats() Stat { processing := 0 done := 0 err := 0 for _, doc := range b.Docs { switch doc.State { case Processing: processing++ case Done: done++ case Error: err++ } } return Stat{total: len(b.Docs), processing: processing, done: done, error: err} } func (b *BulkDocs) GetAllDocs() []*BulkDoc { return slices.Collect(maps.Values(b.Docs)) } func (b *BulkDocs) GetDocs(state State) []string { docs := make([]string, 0) for dbname, doc := range b.Docs { if doc.State == state { docs = append(docs, dbname) } } return docs } func (b *BulkDocs) Equals(id string, doc map[string]interface{}) bool { if bulkDoc, ok := b.Docs[id]; ok && bulkDoc.Doc != nil && doc != nil { return doc["_id"] == bulkDoc.Doc["_id"] && doc["continuous"] == bulkDoc.Doc["continuous"] && doc["create_target"] == bulkDoc.Doc["create_target"] && doc["source"] == bulkDoc.Doc["source"] && doc["target"] == bulkDoc.Doc["target"] && doc["cancel"] == bulkDoc.Doc["cancel"] // note we are ignoring doc["owner"] } return false } // bulkInsert put bulk docs into _replicator db func (r *CouchReplicationReconciler) bulkInsert(ctx context.Context, client *couchdb.CouchDB, docs *BulkDocs) error { log := logr.FromContextOrDiscard(ctx) log.Info("bulkInsert", "stats", docs.Stats()) if docs.DoneProcessing() { log.Info("bulk insert done processing", "stats", docs.Stats().String()) return nil } db, err := client.GetReplicatorDB() if err != nil { docs.SetErrors(err) log.Error(err, "fail to get replicator DB") return err } bulkResults, bulkErr := db.BulkDocs(ctx, docs.ProcessingDocs()) if bulkErr != nil { docs.SetErrors(err) log.Error(bulkErr, "failed to create replications in bulk") return bulkErr } var conflictDocs []kivik.BulkGetReference for _, bulkResult := range bulkResults { dbname := bulkResult.ID err := bulkResult.Error if err == nil { docs.SetDone(dbname) continue } if couchdb.IsConflict(err) { conflictDocs = append(conflictDocs, kivik.BulkGetReference{ID: dbname}) docs.SetProcessing(dbname) continue } docs.SetError(dbname, err) } if len(conflictDocs) == 0 { return r.bulkInsert(ctx, client, docs) } rows := db.BulkGet(ctx, conflictDocs) if rows.Err() != nil { docs.SetErrors(rows.Err()) log.Error(rows.Err(), "error getting bulk replications") return rows.Err() } for rows.Next() { dbname, err := rows.ID() if err != nil { docs.SetError(dbname, err) continue } doc := make(map[string]interface{}) if scanErr := rows.ScanDoc(&doc); scanErr != nil { docs.SetError(dbname, scanErr) continue } if docs.Equals(dbname, doc) { docs.SetDone(dbname) continue } rev := doc["_rev"].(string) docs.SetRevision(dbname, rev) } return r.bulkInsert(ctx, client, docs) } func toBulkReplicationDocs(r *ReplicationInfo, ds []dsapi.Dataset, cancel bool) *BulkDocs { bd := &BulkDocs{Docs: map[string]*BulkDoc{}} for _, d := range ds { dbname := d.Name sourceDSN, err1 := buildDSNName(r.SourceUsername, r.SourcePassword, r.SourceURI, dbname) targetDSN, err2 := buildDSNName(r.TargetUsername, r.TargetPassword, r.TargetURI, dbname) if err := errors.Join(err1, err2); err != nil { bd.Docs[dbname] = &BulkDoc{ Dataset: d, State: Error, Error: err, } } else { replDoc := toReplicationSettings(d.Config, dbname) replDoc["source"] = sourceDSN replDoc["target"] = targetDSN if cancel { replDoc["cancel"] = cancel } bd.Docs[dbname] = &BulkDoc{ Dataset: d, Doc: replDoc, State: Processing, } } } return bd } func buildDSNName(username, password, uri, dbname string) (string, error) { u, err := url.Parse(uri) if err != nil { return "", err } if !u.IsAbs() { u.Scheme = "http" } u.User = url.UserPassword(username, password) u.Path = dbname return u.String(), err } // ReplicationSettings convert ReplConfig to replication settings. TODO add all the settings in next pr func toReplicationSettings(r dsapi.ReplConfig, id string) map[string]interface{} { m := make(map[string]interface{}) m["_id"] = id m["continuous"] = r.Continuous m["create_target"] = r.CreateTarget return m }