...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/design_doc_controller.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"bytes"
     5  	"context"
     6  	"encoding/json"
     7  	"fmt"
     8  	"strings"
     9  	"time"
    10  
    11  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    12  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    13  	"edge-infra.dev/pkg/k8s/meta/status"
    14  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    15  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    16  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    17  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    18  	"edge-infra.dev/pkg/k8s/runtime/patch"
    19  
    20  	"github.com/go-logr/logr"
    21  
    22  	kuberecorder "k8s.io/client-go/tools/record"
    23  	ctrl "sigs.k8s.io/controller-runtime"
    24  	"sigs.k8s.io/controller-runtime/pkg/builder"
    25  	"sigs.k8s.io/controller-runtime/pkg/client"
    26  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    27  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    28  )
    29  
// CouchDBDesignDocReconciler reconciles dsapi.CouchDBDesignDoc resources
// against the design documents stored in CouchDB.
//
// Field usage within this file:
//   - Client: embedded Kubernetes client; used to fetch the resource, to build
//     the serial patcher and to locate the leader CouchDB server.
//   - NodeResourcePredicate: embedded; presumably the source of the
//     ShouldReconcile check called from designDocPredicates — TODO confirm.
//   - EventRecorder: embedded; passed to the summarizer at the end of Reconcile.
//   - Name: passed as the field owner to the summarizer and to getPatchOptions.
//   - Config: read for IsDSDS, RequeueTime and PollingInterval, and passed to
//     the CouchDB leader helpers.
//   - Metrics: records reconcile duration and readiness after each Reconcile.
//   - patchOptions: populated by SetupWithManager and used when marking the
//     resource as progressing.
type CouchDBDesignDocReconciler struct {
	client.Client
	NodeResourcePredicate
	kuberecorder.EventRecorder
	Name         string
	Config       *Config
	Metrics      metrics.Metrics
	patchOptions []patch.Option
}
    39  
var (
	// dDocConditions is the condition configuration handed to the summarizer
	// in Reconcile; its Owned list is also used to build the patch options in
	// SetupWithManager.
	//
	// NOTE(review): the exact meaning of Target/Owned/Summarize/NegativePolarity
	// is defined by reconcile.Conditions, which is not visible in this file.
	// Presumably Target is the condition computed from the Summarize set, and
	// NegativePolarity lists conditions whose True status signals a problem —
	// confirm against that package.
	dDocConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		Owned: []string{
			dsapi.DesignDocSetupSucceededReason,
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		Summarize: []string{
			dsapi.DesignDocSetupSucceededReason,
			status.StalledCondition,
		},
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}
)
    59  
    60  // SetupWithManager sets up CouchDBDesignDocReconciler with the manager
    61  func (r *CouchDBDesignDocReconciler) SetupWithManager(mgr ctrl.Manager) error {
    62  	r.patchOptions = getPatchOptions(dDocConditions.Owned, r.Name)
    63  	return ctrl.NewControllerManagedBy(mgr).
    64  		For(&dsapi.CouchDBDesignDoc{}, r.designDocPredicates()).
    65  		Complete(r)
    66  }
    67  
    68  func (r *CouchDBDesignDocReconciler) designDocPredicates() builder.Predicates {
    69  	return builder.WithPredicates(
    70  		predicate.GenerationChangedPredicate{},
    71  		predicate.NewPredicateFuncs(func(_ client.Object) bool {
    72  			if r.Config.IsDSDS() {
    73  				server, err := couchDBLeader(context.Background(), r.Client, r.Config)
    74  				if err != nil {
    75  					return false
    76  				}
    77  				return r.ShouldReconcile(r.Config, server)
    78  			}
    79  			return true
    80  		}))
    81  }
    82  
    83  func (r *CouchDBDesignDocReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
    84  	reconcileStart := time.Now()
    85  
    86  	log := ctrl.LoggerFrom(ctx)
    87  
    88  	ddoc := &dsapi.CouchDBDesignDoc{}
    89  	if err := r.Client.Get(ctx, req.NamespacedName, ddoc); err != nil {
    90  		return ctrl.Result{}, client.IgnoreNotFound(err)
    91  	}
    92  	ddoc.Spec.SetRetryInterval(r.Config.RequeueTime)
    93  	ddoc.Spec.SetInterval(r.Config.PollingInterval)
    94  	log = log.WithValues("design doc", ddoc.Spec.ID)
    95  	ctx = logr.NewContext(ctx, log)
    96  
    97  	patcher := patch.NewSerialPatcher(ddoc, r.Client)
    98  	if err := reconcile.Progressing(ctx, ddoc, patcher, r.patchOptions...); err != nil {
    99  		log.Error(err, "unable to update CouchDBDesignDoc status")
   100  		return ctrl.Result{}, err
   101  	}
   102  
   103  	recResult := reconcile.ResultEmpty
   104  	var recErr recerr.Error
   105  
   106  	defer func() {
   107  		summarizer := reconcile.NewSummarizer(patcher)
   108  		res, err = summarizer.SummarizeAndPatch(ctx, ddoc, []reconcile.SummarizeOption{
   109  			reconcile.WithConditions(dDocConditions),
   110  			reconcile.WithResult(recResult),
   111  			reconcile.WithError(recErr),
   112  			reconcile.WithIgnoreNotFound(),
   113  			reconcile.WithProcessors(reconcile.RecordResult),
   114  			reconcile.WithFieldOwner(r.Name),
   115  			reconcile.WithEventRecorder(r.EventRecorder),
   116  		}...)
   117  		r.Metrics.RecordDuration(ctx, ddoc, reconcileStart)
   118  		r.Metrics.RecordReadiness(ctx, ddoc)
   119  	}()
   120  
   121  	if recErr = r.reconcile(ctx, ddoc); recErr != nil {
   122  		recErr.ToCondition(ddoc, dsapi.DesignDocCreationFailedReason)
   123  		return
   124  	}
   125  
   126  	// Check if finalizer exists
   127  	if !controllerutil.ContainsFinalizer(ddoc, DatasyncFinalizer) {
   128  		controllerutil.AddFinalizer(ddoc, DatasyncFinalizer)
   129  		// Return immediately so that we requeue and reconcile object with finalizer
   130  		// added.
   131  		recResult = reconcile.ResultRequeue
   132  		return
   133  	}
   134  
   135  	if !ddoc.ObjectMeta.DeletionTimestamp.IsZero() {
   136  		recErr = r.finalize(ctx, ddoc)
   137  		return
   138  	}
   139  
   140  	recResult = reconcile.ResultSuccess
   141  	conditions.MarkTrue(ddoc, dsapi.DesignDocSetupSucceededReason, status.SucceededReason, "Successfully set up CouchDBDesignDoc")
   142  	log.Info("Successfully set up CouchDBDesignDoc")
   143  	return
   144  }
   145  
   146  func (r *CouchDBDesignDocReconciler) reconcile(ctx context.Context, ddoc *dsapi.CouchDBDesignDoc) recerr.Error {
   147  	log := logr.FromContextOrDiscard(ctx)
   148  
   149  	cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
   150  	if err != nil {
   151  		log.Error(err, "failed to get leader couchdb client")
   152  		return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
   153  	}
   154  
   155  	defer func(cc *couchdb.CouchDB, ctx context.Context) {
   156  		err := cc.Close(ctx)
   157  		if err != nil {
   158  			log.Error(err, "failed to close couchdb client")
   159  		}
   160  	}(cc, ctx)
   161  
   162  	db := cc.Client.DB(ddoc.Spec.DB)
   163  	docID := ddoc.Spec.ID
   164  	if !strings.HasPrefix(docID, "_design/") {
   165  		docID = fmt.Sprintf("_design/%s", ddoc.Spec.ID)
   166  	}
   167  
   168  	var rev string
   169  	switch row := db.Get(ctx, docID); {
   170  	case row.Err() == nil:
   171  		doc := map[string]interface{}{}
   172  		if err = row.ScanDoc(&doc); err != nil {
   173  			log.Error(err, "failed to scan design doc")
   174  			return recerr.New(fmt.Errorf("failed to scan design doc: %w", err), status.DependencyInvalidReason)
   175  		}
   176  		rev = doc["_rev"].(string)
   177  		if designDocEqual(doc, ddoc.Spec.DesignDoc) { // Note: avoid increasing revision if design doc is the same
   178  			log.Info("design doc already exists", "revision", rev)
   179  			return nil
   180  		}
   181  	case couchdb.IsNotFound(row.Err()):
   182  		rev = ""
   183  	default:
   184  		return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", row.Err()), status.DependencyInvalidReason)
   185  	}
   186  
   187  	var doc interface{} = ddoc.Spec.DesignDoc
   188  	if rev != "" {
   189  		doc = addRevisionDoc(doc, rev)
   190  	}
   191  	_, err = db.Put(ctx, docID, doc)
   192  	if err != nil {
   193  		log.Error(err, "failed to create couchdb design doc")
   194  		return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", err), dsapi.DesignDocCreationFailedReason)
   195  	}
   196  	return nil
   197  }
   198  
   199  func (r *CouchDBDesignDocReconciler) finalize(ctx context.Context, ddoc *dsapi.CouchDBDesignDoc) recerr.Error {
   200  	log := logr.FromContextOrDiscard(ctx)
   201  
   202  	cc, err := couchDBLeaderClient(ctx, r.Client, r.Config)
   203  	if err != nil {
   204  		log.Error(err, "failed to get leader couchdb client")
   205  		return recerr.New(fmt.Errorf("fail to get leader couchdb client: %w", err), status.DependencyInvalidReason)
   206  	}
   207  
   208  	defer func(cc *couchdb.CouchDB, ctx context.Context) {
   209  		err := cc.Close(ctx)
   210  		if err != nil {
   211  			log.Error(err, "failed to close couchdb client")
   212  		}
   213  	}(cc, ctx)
   214  
   215  	db := cc.Client.DB(ddoc.Spec.DB)
   216  
   217  	ddocID := ddoc.Spec.ID
   218  	if !strings.HasPrefix(ddocID, "_design/") {
   219  		ddocID = fmt.Sprintf("_design/%s", ddoc.Spec.ID)
   220  	}
   221  	rev, err := db.GetRev(ctx, ddocID)
   222  	if err != nil {
   223  		if couchdb.IsNotFound(err) {
   224  			return nil
   225  		}
   226  		log.Error(err, "failed to get couchdb design doc")
   227  		return recerr.New(fmt.Errorf("failed to get couchdb design doc: %w", err), status.DependencyInvalidReason)
   228  	}
   229  	_, err = db.Delete(ctx, ddocID, rev)
   230  	if err != nil {
   231  		log.Error(err, "failed to create couchdb design doc")
   232  		return recerr.New(fmt.Errorf("failed to create couchdb design doc: %w", err), dsapi.DesignDocCreationFailedReason)
   233  	}
   234  	controllerutil.RemoveFinalizer(ddoc, DatasyncFinalizer)
   235  	return nil
   236  }
   237  
   238  func designDocEqual(doc map[string]interface{}, specDoc dsapi.DesignDoc) bool {
   239  	lang, ok := doc["language"]
   240  	if specDoc.Language != "" && ok && specDoc.Language != lang.(string) {
   241  		return false
   242  	}
   243  	vdu, ok := doc["validateDocUpdate"]
   244  	if specDoc.ValidateDocUpdate != "" && ok && specDoc.ValidateDocUpdate != vdu.(string) {
   245  		return false
   246  	}
   247  	if !docValueEqual(specDoc.Views, doc["views"]) {
   248  		return false
   249  	}
   250  	if !docValueEqual(specDoc.Updates, doc["updates"]) {
   251  		return false
   252  	}
   253  	if !docValueEqual(specDoc.Filters, doc["filters"]) {
   254  		return false
   255  	}
   256  	return true
   257  }
   258  
   259  func docValueEqual(doc1 map[string]map[string]string, doc2 interface{}) bool {
   260  	if len(doc1) == 0 && doc2 == nil {
   261  		return true
   262  	}
   263  	if len(doc1) > 0 && doc2 == nil {
   264  		return false
   265  	}
   266  	if doc2 != nil && len(doc1) == 0 {
   267  		return false
   268  	}
   269  	data1, err := json.Marshal(doc1)
   270  	if err != nil {
   271  		return false
   272  	}
   273  	data2, err := json.Marshal(doc2)
   274  	if err != nil {
   275  		return false
   276  	}
   277  	return bytes.Equal(data1, data2)
   278  }
   279  
   280  func addRevisionDoc(doc interface{}, rev string) map[string]interface{} {
   281  	m := map[string]interface{}{}
   282  	m["_rev"] = rev
   283  	data, _ := json.Marshal(doc)
   284  	_ = json.Unmarshal(data, &m)
   285  	return m
   286  }
   287  

View as plain text