package couchctl

import (
	"context"
	"fmt"
	"slices"
	"time"

	"github.com/go-logr/logr"

	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
	"edge-infra.dev/pkg/k8s/runtime/patch"

	kerrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/types"
	kuberecorder "k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	re "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// CouchReplicationReconciler reconciles CouchDBReplicationSet objects by
// creating replication documents on the target CouchDB server.
type CouchReplicationReconciler struct {
	client.Client
	NodeResourcePredicate
	kuberecorder.EventRecorder
	Name             string
	Config           *Config
	Metrics          metrics.Metrics
	patchOptions     []patch.Option
	interlockClient  *InterlockClient
	replicationEvent *ReplicationEvent
	log              logr.Logger
}

var (
	// replicationConditions lists the status conditions this reconciler owns
	// and how they are summarized into the Ready condition.
	replicationConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		Owned: []string{
			string(dsapi.ReplicationSucceededStatus),
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		Summarize: []string{
			string(dsapi.ReplicationSucceededStatus),
			status.StalledCondition,
		},
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}
)

// SetupWithManager sets up CouchReplicationReconciler with the manager.
//
// It watches CouchDBReplicationSet objects filtered by replicationPredicates
// and the replication event source. When the config reports DSDS it also
// creates the interlock client (with EnQueue as its callback) and watches it.
func (r *CouchReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.patchOptions = getPatchOptions(replicationConditions.Owned, r.Name)
	r.log = mgr.GetLogger()

	b := ctrl.NewControllerManagedBy(mgr).
		For(&dsapi.CouchDBReplicationSet{}, r.replicationPredicates())

	if r.Config.IsDSDS() {
		r.interlockClient = NewInterlockClient(r.Config.InterlockAPIURL, r.EnQueue)
		b.WatchesRawSource(r.interlockClient)
	}

	r.replicationEvent.log = r.log // override default logger
	b.WatchesRawSource(r.replicationEvent)

	return b.Complete(r)
}

// replicationPredicates returns the event filters for CouchDBReplicationSet:
// generation changes only and, when the config reports DSDS, only objects for
// which ShouldReconcile returns true.
func (r *CouchReplicationReconciler) replicationPredicates() builder.Predicates {
	return builder.WithPredicates(
		predicate.GenerationChangedPredicate{},
		predicate.NewPredicateFuncs(func(obj client.Object) bool {
			if r.Config.IsDSDS() {
				return r.ShouldReconcile(r.Config, obj)
			}
			return true
		}))
}

// EnQueue lists the CouchDBReplicationSets labelled with this node's UID and
// adds a reconcile request for each of them to queue. It is handed to
// NewInterlockClient as a callback in SetupWithManager; the HostState argument
// is ignored. A list failure is logged and nothing is queued.
func (r *CouchReplicationReconciler) EnQueue(_ HostState, queue workqueue.RateLimitingInterface) {
	repls := &dsapi.CouchDBReplicationSetList{}
	opts := []client.ListOption{client.MatchingLabels{couchdb.NodeUIDLabel: r.Config.NodeUID}}
	// No caller context is available in this callback signature.
	err := r.Client.List(context.Background(), repls, opts...)
	if err != nil {
		r.log.Error(err, "fail to list replication sets")
		return
	}
	for _, item := range repls.Items {
		queue.Add(re.Request{
			NamespacedName: types.NamespacedName{
				Namespace: item.Namespace,
				Name:      item.Name,
			},
		})
	}
}

// Reconcile fetches the requested CouchDBReplicationSet, marks it as
// progressing, runs the replication logic and then, in a deferred step,
// summarizes the conditions, patches the status and records metrics.
//
// The named results are overwritten by the deferred summarizer, which decides
// the final ctrl.Result and error from recResult and recErr.
func (r *CouchReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	reconcileStart := time.Now()
	log := ctrl.LoggerFrom(ctx)

	replicationSet := &dsapi.CouchDBReplicationSet{}
	if err := r.Client.Get(ctx, req.NamespacedName, replicationSet); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	replicationSet.WithRetry(r.Config.RequeueTime)
	replicationSet.WithInterval(r.Config.ReplicationPollingInterval)

	// NOTE(review): indexes Datasets[0] without a length check; assumes the
	// spec always carries at least one dataset — confirm against CRD validation.
	log = log.WithValues("replication-db", replicationSet.Spec.Datasets[0].Name)
	ctx = logr.NewContext(ctx, log)

	patcher := patch.NewSerialPatcher(replicationSet, r.Client)
	if err := reconcile.Progressing(ctx, replicationSet, patcher, r.patchOptions...); err != nil {
		log.Error(err, "unable to update status")
		return ctrl.Result{}, err
	}

	recResult := reconcile.ResultEmpty
	var recErr recerr.Error

	// The closure reads recResult and recErr at return time, so assignments
	// made below are visible to the summarizer.
	defer func() {
		summarizer := reconcile.NewSummarizer(patcher)
		res, err = summarizer.SummarizeAndPatch(ctx, replicationSet, []reconcile.SummarizeOption{
			reconcile.WithConditions(replicationConditions),
			reconcile.WithResult(recResult),
			reconcile.WithError(recErr),
			reconcile.WithIgnoreNotFound(),
			reconcile.WithProcessors(
				reconcile.RecordResult,
			),
			reconcile.WithFieldOwner(r.Name),
			reconcile.WithEventRecorder(r.EventRecorder),
		}...)
		r.Metrics.RecordDuration(ctx, replicationSet, reconcileStart)
		r.Metrics.RecordReadiness(ctx, replicationSet)
	}()

	if recErr = r.reconcile(ctx, replicationSet); recErr != nil {
		// NOTE(review): errors matched by couchDBNotReadyOrNotFound fall
		// through to the success path below while recErr stays set — confirm
		// this is intended.
		if !couchDBNotReadyOrNotFound(recErr) {
			recErr.ToCondition(replicationSet, string(dsapi.ReplicationSucceededStatus))
			err = recErr
			return
		}
	}

	recResult = reconcile.ResultSuccess
	conditions.MarkTrue(replicationSet, string(dsapi.ReplicationSucceededStatus), status.SucceededReason, "Successfully created replication")
	log.Info("Successfully created replication")
	return
}

// reconcile performs the replication work for repl:
//
//  1. resolves the source credentials (and, for non-cloud sources, the leader
//     CouchDB URL),
//  2. waits for the target server and loads its admin credentials,
//  3. creates the replication for the datasets in the spec, then for the
//     datasets listed in the replication set document read from the target,
//  4. prunes status entries and starts or stops the replication event
//     listener depending on the LOM state.
//
// It returns nil on success or a recerr.Error describing what to wait for.
func (r *CouchReplicationReconciler) reconcile(ctx context.Context, repl *dsapi.CouchDBReplicationSet) recerr.Error {
	log := logr.FromContextOrDiscard(ctx)

	var inLom bool
	if r.Config.IsDSDS() {
		// Use the reconcile context so the call is cancelled with the reconcile.
		hs, err := r.interlockClient.GetHostState(ctx)
		if err != nil {
			r.log.Error(err, "fail to get host state from interlock API")
			return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.RequeueTime)
		}
		inLom = hs.InLOM()
	}

	// get the source secret
	sourceCreds := &couchdb.ReplicationCredentials{}
	sourceNN := types.NamespacedName{Name: repl.Spec.Source.Name, Namespace: repl.Spec.Source.Namespace}

	reqTime := r.Config.ServerNotReady
	fromCloud := cloudReplication(sourceNN)
	if fromCloud { // for cloud replication datasync might not be enabled
		reqTime = r.Config.EnablementWatchInterval
	}

	_, err := sourceCreds.FromSecret(ctx, r.Client, sourceNN)
	if err != nil {
		msg := "replication secret not found"
		if fromCloud {
			msg = "datasync not enabled: " + msg
		}
		log.Error(err, msg, "NamespacedName", sourceNN)
		return recerr.NewWait(err, status.DependencyNotReadyReason, reqTime)
	}

	if !fromCloud {
		// For non-cloud sources the DB name and URI are overwritten with the
		// first dataset name and the leader server URL.
		replicationDB := repl.Spec.Datasets[0].Name
		sourceCreds.DBName = []byte(replicationDB)
		leaderURL, err := r.leaderURL(ctx)
		if err != nil {
			log.Error(err, "leader couchdb not found", "NamespacedName", sourceNN)
			return recerr.NewWait(err, status.DependencyNotReadyReason, reqTime)
		}
		sourceCreds.URI = []byte(leaderURL)
	}

	targetServerNN := repl.ServerRef()
	ready, targetServer, err := checkIfServerIsReady(ctx, r.Client, repl)
	if err != nil {
		return recerr.NewWait(err, dsapi.ServerInvalidReason, r.Config.ServerNotReady)
	}
	if !ready {
		err := fmt.Errorf("%v %w", targetServerNN, ErrServerNotReady)
		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
	}

	log = log.WithValues("IN_LOM", inLom, "server", client.ObjectKeyFromObject(targetServer), "URI", targetServer.Spec.URI)
	ctx = logr.NewContext(ctx, log)

	// get the target server admin credentials to be able to create replication
	targetAdminCreds := &couchdb.AdminCredentials{}
	targetAdminRef := targetServer.AdminCredentials()
	targetAdminNN := types.NamespacedName{Name: targetAdminRef.Name, Namespace: targetAdminRef.Namespace}
	_, err = targetAdminCreds.FromSecret(ctx, r.Client, targetAdminNN)
	switch {
	case err != nil && kerrors.IsNotFound(err):
		log.Error(err, "error target server AdminCredentials Not found", "NamespacedName", targetAdminNN)
		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
	case err != nil:
		log.Error(err, "error getting target server AdminCredentials", "NamespacedName", targetAdminNN)
		return recerr.New(err, status.DependencyInvalidReason)
	}

	// create the couchdb client using the server admin credentials
	targetClient := &couchdb.CouchDB{}
	err = targetClient.New(couchdb.Driver, string(targetAdminCreds.Username), string(targetAdminCreds.Password), targetServer.Spec.URI, r.Config.CouchDBPort)
	if err != nil {
		log.Error(err, "error initializing couchdb client", "NamespacedName", targetAdminNN)
		return recerr.NewWait(err, string(dsapi.ReplicationCredentialsInvalidStatus), r.Config.ServerNotReady)
	}
	// defer closing the client
	defer targetClient.Close(ctx)

	sourceClient := &couchdb.CouchDB{}
	err = sourceClient.NewFromURL(string(sourceCreds.Username), string(sourceCreds.Password), string(sourceCreds.URI))
	if err != nil {
		log.Error(err, "error initializing cloud couchdb client", "NamespacedName", sourceNN)
		return recerr.NewWait(err, string(dsapi.ReplicationCredentialsInvalidStatus), r.Config.ServerNotReady)
	}
	// defer closing the client
	defer sourceClient.Close(ctx)

	// Replicate the replication doc from cloud to store and from store to touchpoints
	dsapi.ResetCouchDBReplicationSetInventory(repl)
	ri := &ReplicationInfo{
		SourceURI:      string(sourceCreds.URI),
		SourceUsername: string(sourceCreds.Username),
		SourcePassword: string(sourceCreds.Password),
		TargetURI:      fmt.Sprintf(couchdb.ReplicationHostFormat, r.Config.CouchDBPort),
		TargetUsername: string(targetAdminCreds.Username),
		TargetPassword: string(targetAdminCreds.Password),
	}
	replDB, rErr := r.createReplication(ctx, repl, targetClient, sourceClient, ri, repl.Spec.Datasets, inLom)
	if rErr != nil {
		return rErr
	}

	// source and target db names are the same
	replicationDBName := string(sourceCreds.DBName)
	replSet := &dsapi.ReplicationSet{}
	if err := targetClient.GetReplicationSetDoc(ctx, replicationDBName, replSet); err != nil {
		return recerr.NewWait(err, status.DependencyInvalidReason, r.Config.DatabaseNotFound)
	}

	// Replicate the databases found in the replication docs from cloud to store and from store to touchpoints
	// TODO success status
	replDBs, rErr := r.createReplication(ctx, repl, targetClient, sourceClient, ri, replSet.Datasets, inLom)
	if rErr != nil {
		return rErr
	}

	cleanReplicationStatus(repl, replDB, replDBs)

	if inLom {
		r.replicationEvent.Stop()
	} else if err := r.listenToReplicationEvents(repl, targetServer, sourceCreds, targetAdminCreds); err != nil {
		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.RequeueTime)
	}
	return nil
}

// listenToReplicationEvents starts the replication event listener for repl.
// It listens on the source URI with the source credentials when
// Config.ReplicationEventFromSource is set, otherwise on the target server
// (http://<Spec.URI>:<CouchDBPort>) with the target admin credentials.
func (r *CouchReplicationReconciler) listenToReplicationEvents(repl *dsapi.CouchDBReplicationSet,
	targetServer *dsapi.CouchDBServer,
	sourceCreds *couchdb.ReplicationCredentials,
	targetAdminCreds *couchdb.AdminCredentials) error {
	if r.Config.ReplicationEventFromSource {
		return r.replicationEvent.Listen(repl,
			string(sourceCreds.Username),
			string(sourceCreds.Password),
			string(sourceCreds.URI))
	}
	return r.replicationEvent.Listen(repl,
		string(targetAdminCreds.Username),
		string(targetAdminCreds.Password),
		fmt.Sprintf("http://%s:%s", targetServer.Spec.URI, r.Config.CouchDBPort))
}

// leaderURL returns the URL of the first listed CouchDBServer whose
// NodeLeaderLabel is "true", or an error when the list call fails or no such
// server exists.
func (r *CouchReplicationReconciler) leaderURL(ctx context.Context) (string, error) {
	list := &dsapi.CouchDBServerList{}
	err := r.Client.List(ctx, list)
	if err != nil {
		return "", err
	}
	for _, item := range list.Items {
		if item.Labels[couchdb.NodeLeaderLabel] == "true" {
			return getServerURL(r.Config, &item), nil // #nosec G601
		}
	}
	return "", fmt.Errorf("leader CouchDBServer not found")
}

// createReplication builds replication documents for datasets and drives them
// through compaction, validation, bulk insert, waiting for database creation,
// read-only marking and metrics. Per-database outcomes are written to repl's
// status by the deferred updateStatus call.
//
// Bulk insert and wait failures are logged only; the function returns a wait
// error when bulkDocs.JoinErrors reports any per-document error. The returned
// BulkDocs is non-nil in both cases.
func (r *CouchReplicationReconciler) createReplication(ctx context.Context,
	repl *dsapi.CouchDBReplicationSet,
	targetClient, sourceClient *couchdb.CouchDB,
	ri *ReplicationInfo,
	datasets []dsapi.Dataset,
	cancel bool) (*BulkDocs, recerr.Error) {
	log := logr.FromContextOrDiscard(ctx)

	replicationDB := repl.Spec.Datasets[0].Name
	bulkDocs := toBulkReplicationDocs(ri, datasets, cancel)
	defer r.updateStatus(repl, bulkDocs)

	r.compactDatabase(ctx, targetClient, bulkDocs)
	r.validateReplication(ctx, targetClient, bulkDocs, replicationDB)

	err := r.bulkInsert(ctx, targetClient, bulkDocs)
	if err != nil {
		log.Error(err, "bulk insert error")
	}

	err = r.waitForReplicationDBCreation(ctx, targetClient, bulkDocs)
	if err != nil {
		log.Error(err, "database not replicated")
	}

	r.makeDBsReadOnly(ctx, targetClient, bulkDocs)
	r.updateMetrics(ctx, repl.Spec.Target.Name, targetClient, sourceClient, bulkDocs)

	if err := bulkDocs.JoinErrors(); err != nil {
		log.Error(err, "fail to replicate all databases", "stats", bulkDocs.Stats())
		// The joined error is logged above; the returned error is a short summary.
		err = fmt.Errorf("fail to replicate all databases")
		return bulkDocs, wait(repl, err, string(dsapi.ReplicationBadStateStatus), r.Config.DatabaseNotFound)
	}
	return bulkDocs, nil
}

// updateStatus records, for every document in bulkDocs, either a succeeded
// status (state Done) or a creation-failed status with the document's error
// (or "replication failed" when it has none) on repl.
func (r *CouchReplicationReconciler) updateStatus(repl *dsapi.CouchDBReplicationSet, bulkDocs *BulkDocs) {
	for dbname, doc := range bulkDocs.Docs {
		if doc.State == Done {
			dsapi.SetCouchDBReplicationSetInfo(repl, dbname, dsapi.ReplicationSucceededStatus, "replication created successfully")
		} else {
			dsapi.SetCouchDBReplicationSetInfo(repl, dbname, dsapi.ReplicationCreationFailedStatus, errorOrMessage(doc.Error, "replication failed"))
		}
	}
}

// makeDBsReadOnly marks every database whose document is Done as read-only on
// the target. A failed or negative existence check, or a MakeReadOnly failure,
// is recorded on the corresponding document via bulkDocs.SetError.
func (r *CouchReplicationReconciler) makeDBsReadOnly(ctx context.Context, targetClient *couchdb.CouchDB, bulkDocs *BulkDocs) {
	for _, dbname := range bulkDocs.GetDocs(Done) {
		exists, err := targetClient.CheckIfDBExists(ctx, dbname) // remove this
		switch {
		case err != nil:
			// Keep the underlying cause instead of discarding it.
			bulkDocs.SetError(dbname, fmt.Errorf("error getting replicated db: %s: %w", dbname, err))
			continue
		case !exists:
			bulkDocs.SetError(dbname, fmt.Errorf("error getting replicated db: %s", dbname))
			continue
		}

		if err = targetClient.MakeReadOnly(ctx, dbname); err != nil { // 3 calls
			bulkDocs.SetError(dbname, fmt.Errorf("fail to make replication database read-only: %w", err))
		}
	}
}

// updateMetrics reports, for each Done document, the target document count
// and the source-minus-target document difference (floored at zero), and
// increments the replication status metric. A stats failure is recorded on
// the document and reported as a bad-state status; documents that are not
// Done are reported as bad state with their error or "replication failed".
func (r *CouchReplicationReconciler) updateMetrics(ctx context.Context, servername string, targetClient, sourceClient *couchdb.CouchDB, bulkDocs *BulkDocs) {
	for dbname, doc := range bulkDocs.Docs {
		if doc.State != Done {
			ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), errorOrMessage(doc.Error, "replication failed"))
			continue
		}

		targetDBStat, err := targetClient.Client.DB(dbname).Stats(ctx)
		if err != nil {
			doc.SetError(err)
			ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), err.Error())
			continue
		}
		DatabaseDocumentCountSet(servername, dbname, float64(targetDBStat.DocCount))

		sourceDBStat, err := sourceClient.Client.DB(dbname).Stats(ctx)
		if err != nil {
			doc.SetError(err)
			ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), err.Error())
			continue
		}

		diff := sourceDBStat.DocCount - targetDBStat.DocCount
		if diff < 1 {
			diff = 0
		}
		DatabaseDocumentDiffInc(servername, dbname, float64(diff))
		ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationSucceededStatus), "replication created successfully")
	}
}

// compactDatabase compacts each target database whose disk-size to
// active-size ratio exceeds Config.CompactRatio. It does nothing when the
// ratio is below 1. Stats errors other than not-found, and compaction errors,
// are recorded on the corresponding document.
func (r *CouchReplicationReconciler) compactDatabase(ctx context.Context, targetDB *couchdb.CouchDB, bulkDocs *BulkDocs) {
	if r.Config.CompactRatio < 1 {
		return
	}
	for dbname, doc := range bulkDocs.Docs {
		db := targetDB.Client.DB(dbname)
		stats, err := db.Stats(ctx)
		if err != nil {
			if couchdb.IgnoreNotFound(err) != nil {
				doc.SetError(err)
			}
			continue
		}
		// Skip databases without size information; this also avoids dividing by zero.
		if stats.DiskSize == 0 || stats.ActiveSize == 0 {
			continue
		}
		if float64(stats.DiskSize)/float64(stats.ActiveSize) > r.Config.CompactRatio {
			if err := db.Compact(ctx); err != nil {
				doc.SetError(err)
			}
		}
	}
}

// validateReplication removes from bulkDocs every document that
// shouldReplicate rejects. For each rejected database it deletes the
// replication on the target and, when the dataset is flagged Deleted, destroys
// the database as well. Failures are logged; the document is still removed.
func (r *CouchReplicationReconciler) validateReplication(ctx context.Context, targetDB *couchdb.CouchDB, bulkDocs *BulkDocs, replicationDB string) {
	log := logr.FromContextOrDiscard(ctx)

	var skipDocs []string
	// Removal is deferred until after the loop over bulkDocs.Docs has finished.
	defer func() {
		bulkDocs.Remove(skipDocs...)
	}()

	for dbname, doc := range bulkDocs.Docs {
		if shouldReplicate(r.Config, doc.Dataset, replicationDB) {
			continue
		}
		skipDocs = append(skipDocs, dbname) // do not replicate or process this database

		err := targetDB.DeleteReplication(ctx, dbname)
		if err != nil {
			log.Error(err, "failed to delete replication", "dbname", dbname)
			continue
		}
		if doc.Dataset.Deleted {
			err = targetDB.Client.DestroyDB(ctx, dbname)
			if err != nil && !couchdb.IsNotFound(err) {
				log.Error(err, "failed to delete couch database", "dbname", dbname)
			}
		}
	}
}

// waitForReplicationDBCreation polls the server until every database whose
// document is Done shows up in AllDBs. It polls every
// Config.ReplicationDBCreated and gives up after Config.DatabaseNotFound.
//
// It returns nil when all databases exist (or there is nothing to wait for),
// ctx.Err() on cancellation, a wrapped error when listing databases fails, or
// a timeout error. At least one check is always performed before the timeout
// can fire.
func (r *CouchReplicationReconciler) waitForReplicationDBCreation(ctx context.Context, cc *couchdb.CouchDB, bulkDocs *BulkDocs) error {
	replicatedDBs := bulkDocs.GetDocs(Done)
	if len(replicatedDBs) == 0 {
		return nil
	}

	// One timer for the whole wait. Re-creating it on every iteration (as a
	// fresh time.After inside a select with a default case) never fires.
	deadline := time.NewTimer(r.Config.DatabaseNotFound)
	defer deadline.Stop()

	found := make(map[string]bool, len(replicatedDBs))
	for {
		if err := ctx.Err(); err != nil {
			return err
		}

		dbs, err := cc.Client.AllDBs(ctx)
		if err != nil {
			return fmt.Errorf("error getting all dbs: %w", err)
		}
		for _, replicatedDB := range replicatedDBs {
			if !found[replicatedDB] && slices.Contains(dbs, replicatedDB) {
				found[replicatedDB] = true
			}
		}
		if len(found) == len(replicatedDBs) {
			return nil
		}

		// Pause before the next poll, but stay responsive to cancellation
		// and to the overall timeout.
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-deadline.C:
			return fmt.Errorf("timeout waiting for databases creation")
		case <-time.After(r.Config.ReplicationDBCreated):
		}
	}
}

// shouldReplicate reports whether dataset ds should be replicated.
//
// Deleted datasets are never replicated. The replication database itself and
// datasets without an EnterpriseUnitID always are. Otherwise, when cfg.SiteID
// is set, the dataset is replicated only if its EnterpriseUnitID matches it.
func shouldReplicate(cfg *Config, ds dsapi.Dataset, replicationDB string) bool {
	if ds.Deleted {
		return false
	}
	// replication doc overrides config
	if ds.Name == replicationDB {
		return true
	}
	if ds.EnterpriseUnitID == "" {
		return true
	}
	if cfg.SiteID != "" { // should not be nil for stores
		return cfg.SiteID == ds.EnterpriseUnitID
	}
	return true
}

// errorOrMessage returns err.Error() when err is non-nil, otherwise msg.
func errorOrMessage(err error, msg string) string {
	if err != nil {
		return err.Error()
	}
	return msg
}

// cloudReplication reports whether nn names the cloud replication credentials
// returned by dsapi.CloudReplicationCredentials.
func cloudReplication(nn types.NamespacedName) bool {
	ref := dsapi.CloudReplicationCredentials()
	return ref.Name == nn.Name && ref.Namespace == nn.Namespace
}

// cleanReplicationStatus if db is deleted remove it from status: it collects
// the database names present in dbs and passes them to dsapi.CleanReplications
// as the set of valid entries.
func cleanReplicationStatus(repl *dsapi.CouchDBReplicationSet, dbs ...*BulkDocs) {
	var validDbs []string
	for _, db := range dbs {
		for name := range db.Docs {
			validDbs = append(validDbs, name)
		}
	}
	dsapi.CleanReplications(repl, validDbs)
}