package couchctl import ( "context" "encoding/json" "fmt" "time" "github.com/go-kivik/kivik/v4" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" "edge-infra.dev/pkg/k8s/meta/status" "edge-infra.dev/pkg/k8s/runtime/conditions" "edge-infra.dev/pkg/k8s/runtime/controller/metrics" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile" "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr" "edge-infra.dev/pkg/k8s/runtime/patch" "github.com/go-logr/logr" kuberecorder "k8s.io/client-go/tools/record" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/predicate" ) type CouchIndexReconciler struct { client.Client NodeResourcePredicate kuberecorder.EventRecorder Name string Config *Config Metrics metrics.Metrics patchOptions []patch.Option } var ( indexConditions = reconcile.Conditions{ Target: status.ReadyCondition, Owned: []string{ dsapi.IndexSetupSucceededReason, status.ReadyCondition, status.ReconcilingCondition, status.StalledCondition, }, Summarize: []string{ dsapi.IndexSetupSucceededReason, status.StalledCondition, }, NegativePolarity: []string{ status.ReconcilingCondition, status.StalledCondition, }, } ) // SetupWithManager sets up CouchIndexReconciler with the manager func (r *CouchIndexReconciler) SetupWithManager(mgr ctrl.Manager) error { r.patchOptions = getPatchOptions(indexConditions.Owned, r.Name) return ctrl.NewControllerManagedBy(mgr). For(&dsapi.CouchDBIndex{}, r.indexPredicates()). Complete(r) } func (r *CouchIndexReconciler) indexPredicates() builder.Predicates { return builder.WithPredicates( predicate.GenerationChangedPredicate{}, predicate.NewPredicateFuncs(func(_ client.Object) bool { if r.Config.IsDSDS() { server, err := couchDBLeader(context.Background(), r.Client, r.Config) if err != nil { return false } return r.ShouldReconcile(r.Config, server) } return true })) } func (r *CouchIndexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) { reconcileStart := time.Now() log := ctrl.LoggerFrom(ctx) index := &dsapi.CouchDBIndex{} if err := r.Client.Get(ctx, req.NamespacedName, index); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } index.Spec.SetRetryInterval(r.Config.RequeueTime) index.Spec.SetInterval(r.Config.PollingInterval) log = log.WithValues("dbname", index.Spec.DB) ctx = logr.NewContext(ctx, log) patcher := patch.NewSerialPatcher(index, r.Client) if err := reconcile.Progressing(ctx, index, patcher, r.patchOptions...); err != nil { log.Error(err, "unable to update CouchDBIndex status") return ctrl.Result{}, err } recResult := reconcile.ResultEmpty var recErr recerr.Error defer func() { summarizer := reconcile.NewSummarizer(patcher) res, err = summarizer.SummarizeAndPatch(ctx, index, []reconcile.SummarizeOption{ reconcile.WithConditions(serverConditions), reconcile.WithResult(recResult), reconcile.WithError(recErr), reconcile.WithIgnoreNotFound(), reconcile.WithProcessors(reconcile.RecordResult), reconcile.WithFieldOwner(r.Name), reconcile.WithEventRecorder(r.EventRecorder), }...) r.Metrics.RecordDuration(ctx, index, reconcileStart) r.Metrics.RecordReadiness(ctx, index) log.Info("Reconcile finished", "duration", time.Since(reconcileStart).String()) }() // Check if finalizer exists if !controllerutil.ContainsFinalizer(index, DatasyncFinalizer) { controllerutil.AddFinalizer(index, DatasyncFinalizer) // Return immediately so that we requeue and reconcile object with finalizer // added. recResult = reconcile.ResultRequeue return } if !index.ObjectMeta.DeletionTimestamp.IsZero() { recErr = r.finalize(ctx, index) return } if recErr = r.reconcile(ctx, index); recErr != nil { recErr.ToCondition(index, dsapi.IndexCreationFailedReason) err = recErr return } recResult = reconcile.ResultSuccess conditions.MarkTrue(index, dsapi.ServerSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDBIndex") log.Info("Successfully set up CouchDBIndex") return } func (r *CouchIndexReconciler) reconcile(ctx context.Context, index *dsapi.CouchDBIndex) recerr.Error { log := logr.FromContextOrDiscard(ctx) name := index.IndexName() indexSelector := map[string]interface{}{} indexSelector["fields"] = index.Spec.Index.Fields if len(index.Spec.Index.PartialFilterSelector) > 0 { pfs := map[string]interface{}{} err := json.Unmarshal([]byte(index.Spec.Index.PartialFilterSelector), &pfs) if err != nil { log.Error(err, "fail to Unmarshal PartialFilterSelector, will not requeue") return recerr.NewStalled(fmt.Errorf("fail to Unmarshal PartialFilterSelector, will not requeue: %w", err), dsapi.IndexMappingFailedReason) } indexSelector["partial_filter_selector"] = pfs } cc, err := couchDBLeaderClient(ctx, r.Client, r.Config) if err != nil { log.Error(err, "failed to get leader couchdb client") return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason) } defer func(cc *couchdb.CouchDB, ctx context.Context) { err := cc.Close(ctx) if err != nil { log.Error(err, "failed to close couchdb client") } }(cc, ctx) err = cc.Client.DB(index.Spec.DB).CreateIndex(ctx, index.Spec.DDoc, name, indexSelector, kivik.Param("partitioned", index.Spec.Partitioned)) if err != nil { log.Error(err, "failed to create couchdb index") return recerr.New(fmt.Errorf("failed to create couchdb index: %w", err), dsapi.IndexCreationFailedReason) } return nil } func (r *CouchIndexReconciler) finalize(ctx context.Context, index *dsapi.CouchDBIndex) recerr.Error { log := logr.FromContextOrDiscard(ctx) cc, err := couchDBLeaderClient(ctx, r.Client, r.Config) if err != nil { log.Error(err, "failed to get leader couchdb client") return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason) } defer func(cc *couchdb.CouchDB, ctx context.Context) { err := cc.Close(ctx) if err != nil { log.Error(err, "failed to close couchdb client") } }(cc, ctx) err = cc.Client.DB(index.Spec.DB).DeleteIndex(ctx, index.Spec.DDoc, index.IndexName()) if err != nil { if couchdb.IsNotFound(err) { controllerutil.RemoveFinalizer(index, DatasyncFinalizer) return nil } log.Error(err, "failed to delete couchdb index") return recerr.New(fmt.Errorf("failed to delete couchdb index: %w", err), dsapi.IndexCreationFailedReason) } controllerutil.RemoveFinalizer(index, DatasyncFinalizer) return nil }