package couchctl

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
	"edge-infra.dev/pkg/k8s/runtime/patch"
	"github.com/go-logr/logr"
	kuberecorder "k8s.io/client-go/tools/record"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
)

// designDocIDPrefix is the prefix that design document IDs carry in CouchDB.
const designDocIDPrefix = "_design/"

// CouchDBDesignDocReconciler reconciles CouchDBDesignDoc objects: it creates or
// updates the design document described by the spec in the leader CouchDB
// server and deletes it again when the object is being deleted.
type CouchDBDesignDocReconciler struct {
	client.Client
	NodeResourcePredicate
	kuberecorder.EventRecorder
	// Name is used as the field owner when patching and to build patch options.
	Name string
	// Config supplies requeue/polling intervals and CouchDB leader lookup settings.
	Config *Config
	// Metrics records reconcile duration and readiness.
	Metrics metrics.Metrics
	// patchOptions is populated by SetupWithManager.
	patchOptions []patch.Option
}

var (
	// dDocConditions describes the conditions owned and summarized for a
	// CouchDBDesignDoc.
	dDocConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		Owned: []string{
			dsapi.DesignDocSetupSucceededReason,
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		Summarize: []string{
			dsapi.DesignDocSetupSucceededReason,
			status.StalledCondition,
		},
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}
)

// SetupWithManager sets up CouchDBDesignDocReconciler with the manager
func (r *CouchDBDesignDocReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.patchOptions = getPatchOptions(dDocConditions.Owned, r.Name)
	return ctrl.NewControllerManagedBy(mgr).
		For(&dsapi.CouchDBDesignDoc{}, r.designDocPredicates()).
		Complete(r)
}

// designDocPredicates returns the event filters for CouchDBDesignDoc objects:
// only generation changes are considered, and when the config reports DSDS
// mode the event is additionally gated on ShouldReconcile for the current
// CouchDB leader.
func (r *CouchDBDesignDocReconciler) designDocPredicates() builder.Predicates {
	return builder.WithPredicates(
		predicate.GenerationChangedPredicate{},
		predicate.NewPredicateFuncs(func(_ client.Object) bool {
			if !r.Config.IsDSDS() {
				return true
			}
			// Predicates receive no context, hence context.Background().
			server, err := couchDBLeader(context.Background(), r.Client, r.Config)
			if err != nil {
				// Without a known leader the event is dropped.
				return false
			}
			return r.ShouldReconcile(r.Config, server)
		}))
}

// Reconcile brings the CouchDB design document in line with the
// CouchDBDesignDoc named by req.
//
// Objects that are being deleted are finalized first, so the design document
// is not re-created just to be deleted and a failing create/update cannot
// block finalization. Otherwise the design document is created or updated,
// the finalizer is added if missing (followed by a requeue), and the setup
// condition is marked true.
//
// The returned result and error are produced by the deferred
// SummarizeAndPatch call, which is why the function uses named results and
// bare returns after the defer is registered.
func (r *CouchDBDesignDocReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	reconcileStart := time.Now()
	log := ctrl.LoggerFrom(ctx)

	ddoc := &dsapi.CouchDBDesignDoc{}
	if err := r.Client.Get(ctx, req.NamespacedName, ddoc); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	ddoc.Spec.SetRetryInterval(r.Config.RequeueTime)
	ddoc.Spec.SetInterval(r.Config.PollingInterval)

	log = log.WithValues("design doc", ddoc.Spec.ID)
	ctx = logr.NewContext(ctx, log)

	patcher := patch.NewSerialPatcher(ddoc, r.Client)
	if err := reconcile.Progressing(ctx, ddoc, patcher, r.patchOptions...); err != nil {
		log.Error(err, "unable to update CouchDBDesignDoc status")
		return ctrl.Result{}, err
	}

	recResult := reconcile.ResultEmpty
	var recErr recerr.Error

	// The closure reads recResult and recErr when it runs, so assignments made
	// below are reflected in the summary.
	defer func() {
		summarizer := reconcile.NewSummarizer(patcher)
		res, err = summarizer.SummarizeAndPatch(ctx, ddoc, []reconcile.SummarizeOption{
			reconcile.WithConditions(dDocConditions),
			reconcile.WithResult(recResult),
			reconcile.WithError(recErr),
			reconcile.WithIgnoreNotFound(),
			reconcile.WithProcessors(reconcile.RecordResult),
			reconcile.WithFieldOwner(r.Name),
			reconcile.WithEventRecorder(r.EventRecorder),
		}...)
		r.Metrics.RecordDuration(ctx, ddoc, reconcileStart)
		r.Metrics.RecordReadiness(ctx, ddoc)
	}()

	// Handle deletion before anything else: there is no point in writing the
	// design doc for an object that is going away.
	if !ddoc.ObjectMeta.DeletionTimestamp.IsZero() {
		recErr = r.finalize(ctx, ddoc)
		return
	}

	if recErr = r.reconcile(ctx, ddoc); recErr != nil {
		recErr.ToCondition(ddoc, dsapi.DesignDocCreationFailedReason)
		return
	}

	// Check if finalizer exists
	if !controllerutil.ContainsFinalizer(ddoc, DatasyncFinalizer) {
		controllerutil.AddFinalizer(ddoc, DatasyncFinalizer)
		// Return immediately so that we requeue and reconcile object with finalizer
		// added.
		recResult = reconcile.ResultRequeue
		return
	}

	recResult = reconcile.ResultSuccess
	conditions.MarkTrue(ddoc, dsapi.DesignDocSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDBDesignDoc")
	log.Info("Successfully set up CouchDBDesignDoc")
	return
}

// reconcile creates the design document from ddoc.Spec in the leader CouchDB
// server, or updates it when the stored document differs from the spec. When
// designDocEqual reports the stored document as equal, nothing is written so
// the revision does not increase.
func (r *CouchDBDesignDocReconciler) reconcile(ctx context.Context, ddoc *dsapi.CouchDBDesignDoc) recerr.Error {
	log := logr.FromContextOrDiscard(ctx)

	cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
	if err != nil {
		log.Error(err, "failed to get leader couchdb client")
		return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
	}
	defer func(cc *couchdb.CouchDB, ctx context.Context) {
		err := cc.Close(ctx)
		if err != nil {
			log.Error(err, "failed to close couchdb client")
		}
	}(cc, ctx)

	db := cc.Client.DB(ddoc.Spec.DB)
	docID := fullDesignDocID(ddoc.Spec.ID)

	// rev stays empty when the document does not exist yet.
	var rev string
	switch row := db.Get(ctx, docID); {
	case row.Err() == nil:
		doc := map[string]interface{}{}
		if err = row.ScanDoc(&doc); err != nil {
			log.Error(err, "failed to scan design doc")
			return recerr.New(fmt.Errorf("failed to scan design doc: %w", err), status.DependencyInvalidReason)
		}
		// Checked assertion: a missing or non-string _rev must not panic the
		// controller.
		storedRev, ok := doc["_rev"].(string)
		if !ok {
			err = fmt.Errorf("design doc %q has no string _rev", docID)
			log.Error(err, "failed to read design doc revision")
			return recerr.New(err, status.DependencyInvalidReason)
		}
		rev = storedRev
		if designDocEqual(doc, ddoc.Spec.DesignDoc) {
			// Note: avoid increasing revision if design doc is the same
			log.Info("design doc already exists", "revision", rev)
			return nil
		}
	case couchdb.IsNotFound(row.Err()):
		rev = ""
	default:
		return recerr.New(fmt.Errorf("failed to get couchdb design doc: %w", row.Err()), status.DependencyInvalidReason)
	}

	var doc interface{} = ddoc.Spec.DesignDoc
	if rev != "" {
		// Updating an existing document: carry the current revision along.
		doc = addRevisionDoc(doc, rev)
	}
	_, err = db.Put(ctx, docID, doc)
	if err != nil {
		log.Error(err, "failed to create couchdb design doc")
		return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", err), dsapi.DesignDocCreationFailedReason)
	}
	return nil
}

// finalize deletes the design document from the leader CouchDB server and
// removes DatasyncFinalizer from ddoc. A design document that is already gone
// counts as successfully cleaned up, so the finalizer is removed in that case
// as well.
func (r *CouchDBDesignDocReconciler) finalize(ctx context.Context, ddoc *dsapi.CouchDBDesignDoc) recerr.Error {
	log := logr.FromContextOrDiscard(ctx)

	cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
	if err != nil {
		log.Error(err, "failed to get leader couchdb client")
		return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
	}
	defer func(cc *couchdb.CouchDB, ctx context.Context) {
		err := cc.Close(ctx)
		if err != nil {
			log.Error(err, "failed to close couchdb client")
		}
	}(cc, ctx)

	db := cc.Client.DB(ddoc.Spec.DB)
	ddocID := fullDesignDocID(ddoc.Spec.ID)

	rev, err := db.GetRev(ctx, ddocID)
	if err != nil {
		if couchdb.IsNotFound(err) {
			// Nothing left to delete; release the object.
			controllerutil.RemoveFinalizer(ddoc, DatasyncFinalizer)
			return nil
		}
		log.Error(err, "failed to get couchdb design doc")
		return recerr.New(fmt.Errorf("failed to get couchdb design doc: %w", err), status.DependencyInvalidReason)
	}

	_, err = db.Delete(ctx, ddocID, rev)
	if err != nil {
		log.Error(err, "failed to delete couchdb design doc")
		// NOTE(review): the creation-failed reason is reused for delete
		// failures; confirm whether a dedicated reason exists.
		return recerr.New(fmt.Errorf("failed to delete couchdb design doc: %w", err), dsapi.DesignDocCreationFailedReason)
	}

	controllerutil.RemoveFinalizer(ddoc, DatasyncFinalizer)
	return nil
}

// fullDesignDocID returns id with the "_design/" prefix, adding the prefix
// only when id does not already start with it.
func fullDesignDocID(id string) string {
	if strings.HasPrefix(id, designDocIDPrefix) {
		return id
	}
	return designDocIDPrefix + id
}

// designDocEqual reports whether the stored document doc matches specDoc in
// its language, validateDocUpdate, views, updates and filters entries.
//
// The string fields are only compared when the spec value is non-empty and
// the stored document has the key. A stored value that is present but not a
// string (including JSON null) is treated as a mismatch instead of panicking.
//
// NOTE(review): a stored document that lacks "language" or
// "validateDocUpdate" is still treated as matching a non-empty spec value;
// confirm whether that leniency is intended.
func designDocEqual(doc map[string]interface{}, specDoc dsapi.DesignDoc) bool {
	stringFieldMatches := func(want, key string) bool {
		if want == "" {
			return true
		}
		stored, present := doc[key]
		if !present {
			return true
		}
		got, isString := stored.(string)
		return isString && got == want
	}

	if !stringFieldMatches(specDoc.Language, "language") {
		return false
	}
	if !stringFieldMatches(specDoc.ValidateDocUpdate, "validateDocUpdate") {
		return false
	}
	return docValueEqual(specDoc.Views, doc["views"]) &&
		docValueEqual(specDoc.Updates, doc["updates"]) &&
		docValueEqual(specDoc.Filters, doc["filters"])
}

// docValueEqual reports whether doc1 and doc2 encode to the same JSON.
// An empty doc1 equals only a nil doc2, and a non-empty doc1 never equals a
// nil doc2. A value that cannot be marshalled is treated as not equal.
func docValueEqual(doc1 map[string]map[string]string, doc2 interface{}) bool {
	if doc2 == nil {
		return len(doc1) == 0
	}
	if len(doc1) == 0 {
		return false
	}
	// encoding/json writes map keys in sorted order, so the byte comparison
	// does not depend on map iteration order.
	data1, err := json.Marshal(doc1)
	if err != nil {
		return false
	}
	data2, err := json.Marshal(doc2)
	if err != nil {
		return false
	}
	return bytes.Equal(data1, data2)
}

// addRevisionDoc converts doc to a generic map via a JSON round trip and sets
// its "_rev" entry to rev. The revision is written last so that it cannot be
// overwritten by a "_rev" key coming from doc.
//
// NOTE(review): marshal/unmarshal failures cannot be reported through this
// signature; on failure the result contains whatever was decoded plus "_rev".
func addRevisionDoc(doc interface{}, rev string) map[string]interface{} {
	m := map[string]interface{}{}
	if data, err := json.Marshal(doc); err == nil {
		// Best effort: a decode error leaves m with whatever was decoded.
		_ = json.Unmarshal(data, &m)
	}
	if m == nil {
		// Unmarshalling JSON null resets the map to nil; writing to it would panic.
		m = map[string]interface{}{}
	}
	m["_rev"] = rev
	return m
}