...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/replicationset_controller.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"slices"
     7  	"time"
     8  
     9  	"github.com/go-logr/logr"
    10  
    11  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    12  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    13  	"edge-infra.dev/pkg/k8s/meta/status"
    14  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    15  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    16  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    17  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    18  	"edge-infra.dev/pkg/k8s/runtime/patch"
    19  
    20  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    21  	"k8s.io/apimachinery/pkg/types"
    22  	kuberecorder "k8s.io/client-go/tools/record"
    23  	"k8s.io/client-go/util/workqueue"
    24  
    25  	ctrl "sigs.k8s.io/controller-runtime"
    26  	"sigs.k8s.io/controller-runtime/pkg/builder"
    27  	"sigs.k8s.io/controller-runtime/pkg/client"
    28  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    29  	re "sigs.k8s.io/controller-runtime/pkg/reconcile"
    30  )
    31  
    32  type CouchReplicationReconciler struct {
    33  	client.Client
    34  	NodeResourcePredicate
    35  	kuberecorder.EventRecorder
    36  	Name         string
    37  	Config       *Config
    38  	Metrics      metrics.Metrics
    39  	patchOptions []patch.Option
    40  
    41  	interlockClient  *InterlockClient
    42  	replicationEvent *ReplicationEvent
    43  	log              logr.Logger
    44  }
    45  
    46  var (
    47  	replicationConditions = reconcile.Conditions{
    48  		Target: status.ReadyCondition,
    49  		Owned: []string{
    50  			string(dsapi.ReplicationSucceededStatus),
    51  			status.ReadyCondition,
    52  			status.ReconcilingCondition,
    53  			status.StalledCondition,
    54  		},
    55  		Summarize: []string{
    56  			string(dsapi.ReplicationSucceededStatus),
    57  			status.StalledCondition,
    58  		},
    59  		NegativePolarity: []string{
    60  			status.ReconcilingCondition,
    61  			status.StalledCondition,
    62  		},
    63  	}
    64  )
    65  
    66  // SetupWithManager sets up CouchReplicationReconciler with the manager
    67  func (r *CouchReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
    68  	r.patchOptions = getPatchOptions(replicationConditions.Owned, r.Name)
    69  	r.log = mgr.GetLogger()
    70  
    71  	b := ctrl.NewControllerManagedBy(mgr).
    72  		For(&dsapi.CouchDBReplicationSet{}, r.replicationPredicates())
    73  	if r.Config.IsDSDS() {
    74  		r.interlockClient = NewInterlockClient(r.Config.InterlockAPIURL, r.EnQueue)
    75  		b.WatchesRawSource(r.interlockClient)
    76  	}
    77  	r.replicationEvent.log = r.log // override default logger
    78  	b.WatchesRawSource(r.replicationEvent)
    79  
    80  	return b.Complete(r)
    81  }
    82  
    83  func (r *CouchReplicationReconciler) replicationPredicates() builder.Predicates {
    84  	return builder.WithPredicates(
    85  		predicate.GenerationChangedPredicate{},
    86  		predicate.NewPredicateFuncs(func(obj client.Object) bool {
    87  			if r.Config.IsDSDS() {
    88  				return r.ShouldReconcile(r.Config, obj)
    89  			}
    90  			return true
    91  		}))
    92  }
    93  
    94  func (r *CouchReplicationReconciler) EnQueue(_ HostState, queue workqueue.RateLimitingInterface) {
    95  	repls := &dsapi.CouchDBReplicationSetList{}
    96  	opts := []client.ListOption{client.MatchingLabels{couchdb.NodeUIDLabel: r.Config.NodeUID}}
    97  	err := r.Client.List(context.Background(), repls, opts...)
    98  	if err != nil {
    99  		r.log.Error(err, "fail to list replication sets")
   100  		return
   101  	}
   102  
   103  	for _, item := range repls.Items {
   104  		queue.Add(re.Request{
   105  			NamespacedName: types.NamespacedName{
   106  				Namespace: item.Namespace,
   107  				Name:      item.Name,
   108  			},
   109  		})
   110  	}
   111  }
   112  
   113  func (r *CouchReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
   114  	reconcileStart := time.Now()
   115  	log := ctrl.LoggerFrom(ctx)
   116  
   117  	replicationSet := &dsapi.CouchDBReplicationSet{}
   118  	if err := r.Client.Get(ctx, req.NamespacedName, replicationSet); err != nil {
   119  		return ctrl.Result{}, client.IgnoreNotFound(err)
   120  	}
   121  	replicationSet.WithRetry(r.Config.RequeueTime)
   122  	replicationSet.WithInterval(r.Config.ReplicationPollingInterval)
   123  
   124  	log = log.WithValues("replication-db", replicationSet.Spec.Datasets[0].Name)
   125  	ctx = logr.NewContext(ctx, log)
   126  
   127  	patcher := patch.NewSerialPatcher(replicationSet, r.Client)
   128  	if err := reconcile.Progressing(ctx, replicationSet, patcher, r.patchOptions...); err != nil {
   129  		log.Error(err, "unable to update status")
   130  		return ctrl.Result{}, err
   131  	}
   132  
   133  	recResult := reconcile.ResultEmpty
   134  	var recErr recerr.Error
   135  
   136  	defer func() {
   137  		summarizer := reconcile.NewSummarizer(patcher)
   138  		res, err = summarizer.SummarizeAndPatch(ctx, replicationSet, []reconcile.SummarizeOption{
   139  			reconcile.WithConditions(replicationConditions),
   140  			reconcile.WithResult(recResult),
   141  			reconcile.WithError(recErr),
   142  			reconcile.WithIgnoreNotFound(),
   143  			reconcile.WithProcessors(
   144  				reconcile.RecordResult,
   145  			),
   146  			reconcile.WithFieldOwner(r.Name),
   147  			reconcile.WithEventRecorder(r.EventRecorder),
   148  		}...)
   149  		r.Metrics.RecordDuration(ctx, replicationSet, reconcileStart)
   150  		r.Metrics.RecordReadiness(ctx, replicationSet)
   151  	}()
   152  
   153  	if recErr = r.reconcile(ctx, replicationSet); recErr != nil {
   154  		if !couchDBNotReadyOrNotFound(recErr) {
   155  			recErr.ToCondition(replicationSet, string(dsapi.ReplicationSucceededStatus))
   156  			err = recErr
   157  			return
   158  		}
   159  	}
   160  	recResult = reconcile.ResultSuccess
   161  	conditions.MarkTrue(replicationSet, string(dsapi.ReplicationSucceededStatus), status.SucceededReason, "Successfully created replication")
   162  	log.Info("Successfully created replication")
   163  
   164  	return
   165  }
   166  
   167  func (r *CouchReplicationReconciler) reconcile(ctx context.Context, repl *dsapi.CouchDBReplicationSet) recerr.Error {
   168  	log := logr.FromContextOrDiscard(ctx)
   169  
   170  	var inLom bool
   171  	if r.Config.IsDSDS() {
   172  		hs, err := r.interlockClient.GetHostState(context.Background())
   173  		if err != nil {
   174  			r.log.Error(err, "fail to get host state from interlock API")
   175  			return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.RequeueTime)
   176  		}
   177  		inLom = hs.InLOM()
   178  	}
   179  	// get the source secret
   180  	sourceCreds := &couchdb.ReplicationCredentials{}
   181  	sourceNN := types.NamespacedName{Name: repl.Spec.Source.Name, Namespace: repl.Spec.Source.Namespace}
   182  	reqTime := r.Config.ServerNotReady
   183  	fromCloud := cloudReplication(sourceNN)
   184  	if fromCloud {
   185  		// for cloud replication datasync might not be enabled
   186  		reqTime = r.Config.EnablementWatchInterval
   187  	}
   188  	_, err := sourceCreds.FromSecret(ctx, r.Client, sourceNN)
   189  	if err != nil {
   190  		msg := "replication secret not found"
   191  		if fromCloud {
   192  			msg = "datasync not enabled: " + msg
   193  		}
   194  		log.Error(err, msg, "NamespacedName", sourceNN)
   195  		return recerr.NewWait(err, status.DependencyNotReadyReason, reqTime)
   196  	}
   197  	if !fromCloud {
   198  		replicationDB := repl.Spec.Datasets[0].Name
   199  		sourceCreds.DBName = []byte(replicationDB)
   200  		leaderURL, err := r.leaderURL(ctx)
   201  		if err != nil {
   202  			log.Error(err, "leader couchdb not found", "NamespacedName", sourceNN)
   203  			return recerr.NewWait(err, status.DependencyNotReadyReason, reqTime)
   204  		}
   205  		sourceCreds.URI = []byte(leaderURL)
   206  	}
   207  
   208  	targetServerNN := repl.ServerRef()
   209  	ready, targetServer, err := checkIfServerIsReady(ctx, r.Client, repl)
   210  	if err != nil {
   211  		return recerr.NewWait(err, dsapi.ServerInvalidReason, r.Config.ServerNotReady)
   212  	}
   213  
   214  	if !ready {
   215  		err := fmt.Errorf("%v %w", targetServerNN, ErrServerNotReady)
   216  		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
   217  	}
   218  
   219  	log = log.WithValues("IN_LOM", inLom, "server", client.ObjectKeyFromObject(targetServer), "URI", targetServer.Spec.URI)
   220  	ctx = logr.NewContext(ctx, log)
   221  
   222  	// get the target server admin credentials to be able to create replication
   223  	targetAdminCreds := &couchdb.AdminCredentials{}
   224  	targetAdminRef := targetServer.AdminCredentials()
   225  	targetAdminNN := types.NamespacedName{Name: targetAdminRef.Name, Namespace: targetAdminRef.Namespace}
   226  	_, err = targetAdminCreds.FromSecret(ctx, r.Client, targetAdminNN)
   227  	switch {
   228  	case err != nil && kerrors.IsNotFound(err):
   229  		log.Error(err, "error target server AdminCredentials Not found", "NamespacedName", targetAdminNN)
   230  		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
   231  	case err != nil:
   232  		log.Error(err, "error getting target server AdminCredentials", "NamespacedName", targetAdminNN)
   233  		return recerr.New(err, status.DependencyInvalidReason)
   234  	}
   235  
   236  	// create the couchdb client using the server admin credentials
   237  	targetClient := &couchdb.CouchDB{}
   238  	err = targetClient.New(couchdb.Driver, string(targetAdminCreds.Username), string(targetAdminCreds.Password), targetServer.Spec.URI, r.Config.CouchDBPort)
   239  	if err != nil {
   240  		log.Error(err, "error initializing couchdb client", "NamespacedName", targetAdminNN)
   241  		return recerr.NewWait(err, string(dsapi.ReplicationCredentialsInvalidStatus), r.Config.ServerNotReady)
   242  	}
   243  	// defer closing the client
   244  	defer targetClient.Close(ctx)
   245  
   246  	sourceClient := &couchdb.CouchDB{}
   247  	err = sourceClient.NewFromURL(string(sourceCreds.Username), string(sourceCreds.Password), string(sourceCreds.URI))
   248  	if err != nil {
   249  		log.Error(err, "error initializing cloud couchdb client", "NamespacedName", sourceNN)
   250  		return recerr.NewWait(err, string(dsapi.ReplicationCredentialsInvalidStatus), r.Config.ServerNotReady)
   251  	}
   252  	// defer closing the client
   253  	defer sourceClient.Close(ctx)
   254  
   255  	// Replicate the replication doc from cloud to store and from store to touchpoints
   256  	dsapi.ResetCouchDBReplicationSetInventory(repl)
   257  
   258  	ri := &ReplicationInfo{
   259  		SourceURI:      string(sourceCreds.URI),
   260  		SourceUsername: string(sourceCreds.Username),
   261  		SourcePassword: string(sourceCreds.Password),
   262  
   263  		TargetURI:      fmt.Sprintf(couchdb.ReplicationHostFormat, r.Config.CouchDBPort),
   264  		TargetUsername: string(targetAdminCreds.Username),
   265  		TargetPassword: string(targetAdminCreds.Password),
   266  	}
   267  
   268  	replDB, rErr := r.createReplication(ctx, repl, targetClient, sourceClient, ri, repl.Spec.Datasets, inLom)
   269  	if rErr != nil {
   270  		return rErr
   271  	}
   272  
   273  	// source and target db names are the same
   274  	replicationDBName := string(sourceCreds.DBName)
   275  	replSet := &dsapi.ReplicationSet{}
   276  	if err := targetClient.GetReplicationSetDoc(ctx, replicationDBName, replSet); err != nil {
   277  		return recerr.NewWait(err, status.DependencyInvalidReason, r.Config.DatabaseNotFound)
   278  	}
   279  
   280  	// Replicate the databases found in the replication docs from cloud to store and from store to touchpoints
   281  	// TODO success status
   282  	replDBs, rErr := r.createReplication(ctx, repl, targetClient, sourceClient, ri, replSet.Datasets, inLom)
   283  	if rErr != nil {
   284  		return rErr
   285  	}
   286  	cleanReplicationStatus(repl, replDB, replDBs)
   287  
   288  	if inLom {
   289  		r.replicationEvent.Stop()
   290  	} else if err := r.listenToReplicationEvents(repl, targetServer, sourceCreds, targetAdminCreds); err != nil {
   291  		return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.RequeueTime)
   292  	}
   293  	return nil
   294  }
   295  
   296  func (r *CouchReplicationReconciler) listenToReplicationEvents(repl *dsapi.CouchDBReplicationSet,
   297  	targetServer *dsapi.CouchDBServer,
   298  	sourceCreds *couchdb.ReplicationCredentials,
   299  	targetAdminCreds *couchdb.AdminCredentials) error {
   300  	var err error
   301  	if r.Config.ReplicationEventFromSource {
   302  		err = r.replicationEvent.Listen(repl,
   303  			string(sourceCreds.Username), string(sourceCreds.Password), string(sourceCreds.URI))
   304  	} else {
   305  		err = r.replicationEvent.Listen(repl, string(targetAdminCreds.Username), string(targetAdminCreds.Password),
   306  			fmt.Sprintf("http://%s:%s", targetServer.Spec.URI, r.Config.CouchDBPort))
   307  	}
   308  	return err
   309  }
   310  
   311  func (r *CouchReplicationReconciler) leaderURL(ctx context.Context) (string, error) {
   312  	list := &dsapi.CouchDBServerList{}
   313  	err := r.Client.List(ctx, list)
   314  	if err != nil {
   315  		return "", err
   316  	}
   317  	for _, item := range list.Items {
   318  		if item.Labels[couchdb.NodeLeaderLabel] == "true" {
   319  			return getServerURL(r.Config, &item), nil // #nosec G601
   320  		}
   321  	}
   322  	return "", fmt.Errorf("leader CouchDBServer not found")
   323  }
   324  
   325  func (r *CouchReplicationReconciler) createReplication(ctx context.Context,
   326  	repl *dsapi.CouchDBReplicationSet,
   327  	targetClient, sourceClient *couchdb.CouchDB,
   328  	ri *ReplicationInfo, datasets []dsapi.Dataset, cancel bool) (*BulkDocs, recerr.Error) {
   329  	log := logr.FromContextOrDiscard(ctx)
   330  
   331  	replicationDB := repl.Spec.Datasets[0].Name
   332  
   333  	bulkDocs := toBulkReplicationDocs(ri, datasets, cancel)
   334  	defer r.updateStatus(repl, bulkDocs)
   335  
   336  	r.compactDatabase(ctx, targetClient, bulkDocs)
   337  
   338  	r.validateReplication(ctx, targetClient, bulkDocs, replicationDB)
   339  
   340  	err := r.bulkInsert(ctx, targetClient, bulkDocs)
   341  	if err != nil {
   342  		log.Error(err, "bulk insert error")
   343  	}
   344  
   345  	err = r.waitForReplicationDBCreation(ctx, targetClient, bulkDocs)
   346  	if err != nil {
   347  		log.Error(err, "database not replicated")
   348  	}
   349  
   350  	r.makeDBsReadOnly(ctx, targetClient, bulkDocs)
   351  
   352  	r.updateMetrics(ctx, repl.Spec.Target.Name, targetClient, sourceClient, bulkDocs)
   353  
   354  	if err := bulkDocs.JoinErrors(); err != nil {
   355  		log.Error(err, "fail to replicate all databases", "stats", bulkDocs.Stats())
   356  		err = fmt.Errorf("fail to replicate all databases")
   357  		return bulkDocs, wait(repl, err, string(dsapi.ReplicationBadStateStatus), r.Config.DatabaseNotFound)
   358  	}
   359  	return bulkDocs, nil
   360  }
   361  
   362  func (r *CouchReplicationReconciler) updateStatus(repl *dsapi.CouchDBReplicationSet, bulkDocs *BulkDocs) {
   363  	for dbname, doc := range bulkDocs.Docs {
   364  		if doc.State == Done {
   365  			dsapi.SetCouchDBReplicationSetInfo(repl, dbname, dsapi.ReplicationSucceededStatus, "replication created successfully")
   366  		} else {
   367  			dsapi.SetCouchDBReplicationSetInfo(repl, dbname, dsapi.ReplicationCreationFailedStatus, errorOrMessage(doc.Error, "replication failed"))
   368  		}
   369  	}
   370  }
   371  
   372  func (r *CouchReplicationReconciler) makeDBsReadOnly(ctx context.Context, targetClient *couchdb.CouchDB, bulkDocs *BulkDocs) {
   373  	for _, dbname := range bulkDocs.GetDocs(Done) {
   374  		exists, err := targetClient.CheckIfDBExists(ctx, dbname) // remove this
   375  		if err != nil || !exists {
   376  			err = fmt.Errorf("error getting replicated db: %s", dbname)
   377  			bulkDocs.SetError(dbname, err)
   378  			continue
   379  		}
   380  
   381  		if err = targetClient.MakeReadOnly(ctx, dbname); err != nil { // 3 calls
   382  			err = fmt.Errorf("fail to make replication database read-only: %w", err)
   383  			bulkDocs.SetError(dbname, err)
   384  			continue
   385  		}
   386  	}
   387  }
   388  
   389  // updateMetrics stats document count  and diffs
   390  func (r *CouchReplicationReconciler) updateMetrics(ctx context.Context, servername string, targetClient, sourceClient *couchdb.CouchDB, bulkDocs *BulkDocs) {
   391  	for dbname, doc := range bulkDocs.Docs {
   392  		if doc.State == Done {
   393  			targetDBStat, err := targetClient.Client.DB(dbname).Stats(ctx)
   394  			if err != nil {
   395  				doc.SetError(err)
   396  				ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), err.Error())
   397  				continue
   398  			}
   399  
   400  			DatabaseDocumentCountSet(servername, dbname, float64(targetDBStat.DocCount))
   401  
   402  			sourceDBStat, err := sourceClient.Client.DB(dbname).Stats(ctx)
   403  			if err != nil {
   404  				doc.SetError(err)
   405  				ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), err.Error())
   406  				continue
   407  			}
   408  
   409  			diff := sourceDBStat.DocCount - targetDBStat.DocCount
   410  			if diff < 1 {
   411  				diff = 0
   412  			}
   413  
   414  			DatabaseDocumentDiffInc(servername, dbname, float64(diff))
   415  
   416  			ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationSucceededStatus), "replication created successfully")
   417  		} else {
   418  			ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), errorOrMessage(doc.Error, "replication failed"))
   419  		}
   420  	}
   421  }
   422  
   423  func (r *CouchReplicationReconciler) compactDatabase(ctx context.Context, targetDB *couchdb.CouchDB, bulkDocs *BulkDocs) {
   424  	if r.Config.CompactRatio < 1 {
   425  		return
   426  	}
   427  	for dbname, doc := range bulkDocs.Docs {
   428  		db := targetDB.Client.DB(dbname)
   429  		stats, err := db.Stats(ctx)
   430  		if err != nil {
   431  			if couchdb.IgnoreNotFound(err) != nil {
   432  				doc.SetError(err)
   433  			}
   434  			continue
   435  		}
   436  		if stats.DiskSize == 0 || stats.ActiveSize == 0 {
   437  			continue
   438  		}
   439  		if float64(stats.DiskSize)/float64(stats.ActiveSize) > r.Config.CompactRatio {
   440  			if err := db.Compact(ctx); err != nil {
   441  				doc.SetError(err)
   442  				continue
   443  			}
   444  		}
   445  	}
   446  }
   447  
   448  func (r *CouchReplicationReconciler) validateReplication(ctx context.Context, targetDB *couchdb.CouchDB, bulkDocs *BulkDocs, replicationDB string) {
   449  	log := logr.FromContextOrDiscard(ctx)
   450  	var skipDocs []string
   451  	defer func() {
   452  		bulkDocs.Remove(skipDocs...)
   453  	}()
   454  	for dbname, doc := range bulkDocs.Docs {
   455  		if !shouldReplicate(r.Config, doc.Dataset, replicationDB) {
   456  			skipDocs = append(skipDocs, dbname) // do not replicate or process this database
   457  			err := targetDB.DeleteReplication(ctx, dbname)
   458  			if err != nil {
   459  				log.Error(err, "failed to delete replication", "dbname", dbname)
   460  				continue
   461  			}
   462  			if doc.Dataset.Deleted {
   463  				err = targetDB.Client.DestroyDB(ctx, dbname)
   464  				if err != nil && !couchdb.IsNotFound(err) {
   465  					log.Error(err, "failed to delete couch database", "dbname", dbname)
   466  					continue
   467  				}
   468  			}
   469  		}
   470  	}
   471  }
   472  
   473  func (r *CouchReplicationReconciler) waitForReplicationDBCreation(ctx context.Context, cc *couchdb.CouchDB, bulkDocs *BulkDocs) error {
   474  	replicatedDBs := bulkDocs.GetDocs(Done)
   475  	if len(replicatedDBs) == 0 {
   476  		return nil
   477  	}
   478  	m := make(map[string]bool)
   479  	for {
   480  		select {
   481  		case <-ctx.Done():
   482  			return ctx.Err()
   483  		case <-time.After(r.Config.DatabaseNotFound):
   484  			return fmt.Errorf("timeout waiting for databases creation")
   485  		default:
   486  			dbs, err := cc.Client.AllDBs(ctx)
   487  			if err != nil {
   488  				return fmt.Errorf("error getting all dbs: %w", err)
   489  			}
   490  			if len(dbs) == 0 {
   491  				break
   492  			}
   493  			for _, replicatedDB := range replicatedDBs {
   494  				if m[replicatedDB] {
   495  					continue
   496  				}
   497  				if slices.Contains(dbs, replicatedDB) {
   498  					m[replicatedDB] = true
   499  				}
   500  			}
   501  			if len(m) == len(replicatedDBs) {
   502  				return nil
   503  			}
   504  		}
   505  		time.Sleep(r.Config.ReplicationDBCreated)
   506  	}
   507  }
   508  
   509  func shouldReplicate(cfg *Config, ds dsapi.Dataset, replicationDB string) bool {
   510  	if ds.Deleted {
   511  		return false
   512  	}
   513  	// replication doc overrides config
   514  	if ds.Name == replicationDB {
   515  		return true
   516  	}
   517  	if ds.EnterpriseUnitID == "" {
   518  		return true
   519  	}
   520  
   521  	if cfg.SiteID != "" { // should not be nil for stores
   522  		return cfg.SiteID == ds.EnterpriseUnitID
   523  	}
   524  	return true
   525  }
   526  
   527  func errorOrMessage(err error, msg string) string {
   528  	if err != nil {
   529  		return err.Error()
   530  	}
   531  	return msg
   532  }
   533  
   534  func cloudReplication(nn types.NamespacedName) bool {
   535  	ref := dsapi.CloudReplicationCredentials()
   536  	return ref.Name == nn.Name && ref.Namespace == nn.Namespace
   537  }
   538  
   539  // cleanReplicationStatus if db is deleted remove it from status
   540  func cleanReplicationStatus(repl *dsapi.CouchDBReplicationSet, dbs ...*BulkDocs) {
   541  	var validDbs []string
   542  	for _, db := range dbs {
   543  		for name := range db.Docs {
   544  			validDbs = append(validDbs, name)
   545  		}
   546  	}
   547  	dsapi.CleanReplications(repl, validDbs)
   548  }
   549  

View as plain text