...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/index_controller.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"time"
     8  
     9  	"github.com/go-kivik/kivik/v4"
    10  
    11  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    12  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    13  	"edge-infra.dev/pkg/k8s/meta/status"
    14  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    15  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    16  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    17  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    18  	"edge-infra.dev/pkg/k8s/runtime/patch"
    19  
    20  	"github.com/go-logr/logr"
    21  
    22  	kuberecorder "k8s.io/client-go/tools/record"
    23  	ctrl "sigs.k8s.io/controller-runtime"
    24  	"sigs.k8s.io/controller-runtime/pkg/builder"
    25  	"sigs.k8s.io/controller-runtime/pkg/client"
    26  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    27  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    28  )
    29  
    30  type CouchIndexReconciler struct {
    31  	client.Client
    32  	NodeResourcePredicate
    33  	kuberecorder.EventRecorder
    34  	Name         string
    35  	Config       *Config
    36  	Metrics      metrics.Metrics
    37  	patchOptions []patch.Option
    38  }
    39  
    40  var (
    41  	indexConditions = reconcile.Conditions{
    42  		Target: status.ReadyCondition,
    43  		Owned: []string{
    44  			dsapi.IndexSetupSucceededReason,
    45  			status.ReadyCondition,
    46  			status.ReconcilingCondition,
    47  			status.StalledCondition,
    48  		},
    49  		Summarize: []string{
    50  			dsapi.IndexSetupSucceededReason,
    51  			status.StalledCondition,
    52  		},
    53  		NegativePolarity: []string{
    54  			status.ReconcilingCondition,
    55  			status.StalledCondition,
    56  		},
    57  	}
    58  )
    59  
    60  // SetupWithManager sets up CouchIndexReconciler with the manager
    61  func (r *CouchIndexReconciler) SetupWithManager(mgr ctrl.Manager) error {
    62  	r.patchOptions = getPatchOptions(indexConditions.Owned, r.Name)
    63  	return ctrl.NewControllerManagedBy(mgr).
    64  		For(&dsapi.CouchDBIndex{}, r.indexPredicates()).
    65  		Complete(r)
    66  }
    67  
    68  func (r *CouchIndexReconciler) indexPredicates() builder.Predicates {
    69  	return builder.WithPredicates(
    70  		predicate.GenerationChangedPredicate{},
    71  		predicate.NewPredicateFuncs(func(_ client.Object) bool {
    72  			if r.Config.IsDSDS() {
    73  				server, err := couchDBLeader(context.Background(), r.Client, r.Config)
    74  				if err != nil {
    75  					return false
    76  				}
    77  				return r.ShouldReconcile(r.Config, server)
    78  			}
    79  			return true
    80  		}))
    81  }
    82  
    83  func (r *CouchIndexReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
    84  	reconcileStart := time.Now()
    85  
    86  	log := ctrl.LoggerFrom(ctx)
    87  
    88  	index := &dsapi.CouchDBIndex{}
    89  	if err := r.Client.Get(ctx, req.NamespacedName, index); err != nil {
    90  		return ctrl.Result{}, client.IgnoreNotFound(err)
    91  	}
    92  	index.Spec.SetRetryInterval(r.Config.RequeueTime)
    93  	index.Spec.SetInterval(r.Config.PollingInterval)
    94  
    95  	log = log.WithValues("dbname", index.Spec.DB)
    96  	ctx = logr.NewContext(ctx, log)
    97  
    98  	patcher := patch.NewSerialPatcher(index, r.Client)
    99  	if err := reconcile.Progressing(ctx, index, patcher, r.patchOptions...); err != nil {
   100  		log.Error(err, "unable to update CouchDBIndex status")
   101  		return ctrl.Result{}, err
   102  	}
   103  
   104  	recResult := reconcile.ResultEmpty
   105  	var recErr recerr.Error
   106  
   107  	defer func() {
   108  		summarizer := reconcile.NewSummarizer(patcher)
   109  		res, err = summarizer.SummarizeAndPatch(ctx, index, []reconcile.SummarizeOption{
   110  			reconcile.WithConditions(serverConditions),
   111  			reconcile.WithResult(recResult),
   112  			reconcile.WithError(recErr),
   113  			reconcile.WithIgnoreNotFound(),
   114  			reconcile.WithProcessors(reconcile.RecordResult),
   115  			reconcile.WithFieldOwner(r.Name),
   116  			reconcile.WithEventRecorder(r.EventRecorder),
   117  		}...)
   118  		r.Metrics.RecordDuration(ctx, index, reconcileStart)
   119  		r.Metrics.RecordReadiness(ctx, index)
   120  		log.Info("Reconcile finished", "duration", time.Since(reconcileStart).String())
   121  	}()
   122  
   123  	// Check if finalizer exists
   124  	if !controllerutil.ContainsFinalizer(index, DatasyncFinalizer) {
   125  		controllerutil.AddFinalizer(index, DatasyncFinalizer)
   126  		// Return immediately so that we requeue and reconcile object with finalizer
   127  		// added.
   128  		recResult = reconcile.ResultRequeue
   129  		return
   130  	}
   131  
   132  	if !index.ObjectMeta.DeletionTimestamp.IsZero() {
   133  		recErr = r.finalize(ctx, index)
   134  		return
   135  	}
   136  
   137  	if recErr = r.reconcile(ctx, index); recErr != nil {
   138  		recErr.ToCondition(index, dsapi.IndexCreationFailedReason)
   139  		err = recErr
   140  		return
   141  	}
   142  
   143  	recResult = reconcile.ResultSuccess
   144  	conditions.MarkTrue(index, dsapi.ServerSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDBIndex")
   145  	log.Info("Successfully set up CouchDBIndex")
   146  	return
   147  }
   148  
   149  func (r *CouchIndexReconciler) reconcile(ctx context.Context, index *dsapi.CouchDBIndex) recerr.Error {
   150  	log := logr.FromContextOrDiscard(ctx)
   151  
   152  	name := index.IndexName()
   153  	indexSelector := map[string]interface{}{}
   154  	indexSelector["fields"] = index.Spec.Index.Fields
   155  	if len(index.Spec.Index.PartialFilterSelector) > 0 {
   156  		pfs := map[string]interface{}{}
   157  		err := json.Unmarshal([]byte(index.Spec.Index.PartialFilterSelector), &pfs)
   158  		if err != nil {
   159  			log.Error(err, "fail to Unmarshal PartialFilterSelector, will not requeue")
   160  			return recerr.NewStalled(fmt.Errorf("fail to Unmarshal PartialFilterSelector, will not requeue: %w", err), dsapi.IndexMappingFailedReason)
   161  		}
   162  		indexSelector["partial_filter_selector"] = pfs
   163  	}
   164  
   165  	cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
   166  	if err != nil {
   167  		log.Error(err, "failed to get leader couchdb client")
   168  		return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
   169  	}
   170  
   171  	defer func(cc *couchdb.CouchDB, ctx context.Context) {
   172  		err := cc.Close(ctx)
   173  		if err != nil {
   174  			log.Error(err, "failed to close couchdb client")
   175  		}
   176  	}(cc, ctx)
   177  
   178  	err = cc.Client.DB(index.Spec.DB).CreateIndex(ctx, index.Spec.DDoc, name, indexSelector, kivik.Param("partitioned", index.Spec.Partitioned))
   179  	if err != nil {
   180  		log.Error(err, "failed to create couchdb index")
   181  		return recerr.New(fmt.Errorf("failed to create couchdb index: %w", err), dsapi.IndexCreationFailedReason)
   182  	}
   183  	return nil
   184  }
   185  
   186  func (r *CouchIndexReconciler) finalize(ctx context.Context, index *dsapi.CouchDBIndex) recerr.Error {
   187  	log := logr.FromContextOrDiscard(ctx)
   188  
   189  	cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
   190  	if err != nil {
   191  		log.Error(err, "failed to get leader couchdb client")
   192  		return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
   193  	}
   194  
   195  	defer func(cc *couchdb.CouchDB, ctx context.Context) {
   196  		err := cc.Close(ctx)
   197  		if err != nil {
   198  			log.Error(err, "failed to close couchdb client")
   199  		}
   200  	}(cc, ctx)
   201  
   202  	err = cc.Client.DB(index.Spec.DB).DeleteIndex(ctx, index.Spec.DDoc, index.IndexName())
   203  	if err != nil {
   204  		if couchdb.IsNotFound(err) {
   205  			controllerutil.RemoveFinalizer(index, DatasyncFinalizer)
   206  			return nil
   207  		}
   208  		log.Error(err, "failed to delete couchdb index")
   209  		return recerr.New(fmt.Errorf("failed to delete couchdb index: %w", err), dsapi.IndexCreationFailedReason)
   210  	}
   211  	controllerutil.RemoveFinalizer(index, DatasyncFinalizer)
   212  	return nil
   213  }
   214  

View as plain text