1 package couchctl
2
3 import (
4 "context"
5 "fmt"
6 "slices"
7 "time"
8
9 "github.com/go-logr/logr"
10
11 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
12 "edge-infra.dev/pkg/edge/datasync/couchdb"
13 "edge-infra.dev/pkg/k8s/meta/status"
14 "edge-infra.dev/pkg/k8s/runtime/conditions"
15 "edge-infra.dev/pkg/k8s/runtime/controller/metrics"
16 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
17 "edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
18 "edge-infra.dev/pkg/k8s/runtime/patch"
19
20 kerrors "k8s.io/apimachinery/pkg/api/errors"
21 "k8s.io/apimachinery/pkg/types"
22 kuberecorder "k8s.io/client-go/tools/record"
23 "k8s.io/client-go/util/workqueue"
24
25 ctrl "sigs.k8s.io/controller-runtime"
26 "sigs.k8s.io/controller-runtime/pkg/builder"
27 "sigs.k8s.io/controller-runtime/pkg/client"
28 "sigs.k8s.io/controller-runtime/pkg/predicate"
29 re "sigs.k8s.io/controller-runtime/pkg/reconcile"
30 )
31
// CouchReplicationReconciler reconciles CouchDBReplicationSet objects by
// creating CouchDB replications on the target server referenced by each set.
type CouchReplicationReconciler struct {
	client.Client
	// NodeResourcePredicate supplies ShouldReconcile, which filters events
	// when Config.IsDSDS() is true.
	NodeResourcePredicate
	// EventRecorder is handed to the status summarizer in Reconcile.
	kuberecorder.EventRecorder
	// Name is the controller name; it is used as the field owner for patches.
	Name string
	// Config holds the controller settings (requeue intervals, CouchDB port,
	// node/site identifiers, ...).
	Config *Config
	// Metrics records reconcile duration and readiness after each reconcile.
	Metrics metrics.Metrics
	// patchOptions is derived from replicationConditions in SetupWithManager.
	patchOptions []patch.Option

	// interlockClient is created in SetupWithManager only when
	// Config.IsDSDS() is true; otherwise it stays nil.
	interlockClient *InterlockClient
	// replicationEvent is dereferenced in SetupWithManager, so it must be
	// non-nil before then. NOTE(review): presumably set by the constructor — confirm.
	replicationEvent *ReplicationEvent
	// log is the manager's logger, assigned in SetupWithManager.
	log logr.Logger
}
45
46 var (
47 replicationConditions = reconcile.Conditions{
48 Target: status.ReadyCondition,
49 Owned: []string{
50 string(dsapi.ReplicationSucceededStatus),
51 status.ReadyCondition,
52 status.ReconcilingCondition,
53 status.StalledCondition,
54 },
55 Summarize: []string{
56 string(dsapi.ReplicationSucceededStatus),
57 status.StalledCondition,
58 },
59 NegativePolarity: []string{
60 status.ReconcilingCondition,
61 status.StalledCondition,
62 },
63 }
64 )
65
66
67 func (r *CouchReplicationReconciler) SetupWithManager(mgr ctrl.Manager) error {
68 r.patchOptions = getPatchOptions(replicationConditions.Owned, r.Name)
69 r.log = mgr.GetLogger()
70
71 b := ctrl.NewControllerManagedBy(mgr).
72 For(&dsapi.CouchDBReplicationSet{}, r.replicationPredicates())
73 if r.Config.IsDSDS() {
74 r.interlockClient = NewInterlockClient(r.Config.InterlockAPIURL, r.EnQueue)
75 b.WatchesRawSource(r.interlockClient)
76 }
77 r.replicationEvent.log = r.log
78 b.WatchesRawSource(r.replicationEvent)
79
80 return b.Complete(r)
81 }
82
83 func (r *CouchReplicationReconciler) replicationPredicates() builder.Predicates {
84 return builder.WithPredicates(
85 predicate.GenerationChangedPredicate{},
86 predicate.NewPredicateFuncs(func(obj client.Object) bool {
87 if r.Config.IsDSDS() {
88 return r.ShouldReconcile(r.Config, obj)
89 }
90 return true
91 }))
92 }
93
94 func (r *CouchReplicationReconciler) EnQueue(_ HostState, queue workqueue.RateLimitingInterface) {
95 repls := &dsapi.CouchDBReplicationSetList{}
96 opts := []client.ListOption{client.MatchingLabels{couchdb.NodeUIDLabel: r.Config.NodeUID}}
97 err := r.Client.List(context.Background(), repls, opts...)
98 if err != nil {
99 r.log.Error(err, "fail to list replication sets")
100 return
101 }
102
103 for _, item := range repls.Items {
104 queue.Add(re.Request{
105 NamespacedName: types.NamespacedName{
106 Namespace: item.Namespace,
107 Name: item.Name,
108 },
109 })
110 }
111 }
112
113 func (r *CouchReplicationReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
114 reconcileStart := time.Now()
115 log := ctrl.LoggerFrom(ctx)
116
117 replicationSet := &dsapi.CouchDBReplicationSet{}
118 if err := r.Client.Get(ctx, req.NamespacedName, replicationSet); err != nil {
119 return ctrl.Result{}, client.IgnoreNotFound(err)
120 }
121 replicationSet.WithRetry(r.Config.RequeueTime)
122 replicationSet.WithInterval(r.Config.ReplicationPollingInterval)
123
124 log = log.WithValues("replication-db", replicationSet.Spec.Datasets[0].Name)
125 ctx = logr.NewContext(ctx, log)
126
127 patcher := patch.NewSerialPatcher(replicationSet, r.Client)
128 if err := reconcile.Progressing(ctx, replicationSet, patcher, r.patchOptions...); err != nil {
129 log.Error(err, "unable to update status")
130 return ctrl.Result{}, err
131 }
132
133 recResult := reconcile.ResultEmpty
134 var recErr recerr.Error
135
136 defer func() {
137 summarizer := reconcile.NewSummarizer(patcher)
138 res, err = summarizer.SummarizeAndPatch(ctx, replicationSet, []reconcile.SummarizeOption{
139 reconcile.WithConditions(replicationConditions),
140 reconcile.WithResult(recResult),
141 reconcile.WithError(recErr),
142 reconcile.WithIgnoreNotFound(),
143 reconcile.WithProcessors(
144 reconcile.RecordResult,
145 ),
146 reconcile.WithFieldOwner(r.Name),
147 reconcile.WithEventRecorder(r.EventRecorder),
148 }...)
149 r.Metrics.RecordDuration(ctx, replicationSet, reconcileStart)
150 r.Metrics.RecordReadiness(ctx, replicationSet)
151 }()
152
153 if recErr = r.reconcile(ctx, replicationSet); recErr != nil {
154 if !couchDBNotReadyOrNotFound(recErr) {
155 recErr.ToCondition(replicationSet, string(dsapi.ReplicationSucceededStatus))
156 err = recErr
157 return
158 }
159 }
160 recResult = reconcile.ResultSuccess
161 conditions.MarkTrue(replicationSet, string(dsapi.ReplicationSucceededStatus), status.SucceededReason, "Successfully created replication")
162 log.Info("Successfully created replication")
163
164 return
165 }
166
167 func (r *CouchReplicationReconciler) reconcile(ctx context.Context, repl *dsapi.CouchDBReplicationSet) recerr.Error {
168 log := logr.FromContextOrDiscard(ctx)
169
170 var inLom bool
171 if r.Config.IsDSDS() {
172 hs, err := r.interlockClient.GetHostState(context.Background())
173 if err != nil {
174 r.log.Error(err, "fail to get host state from interlock API")
175 return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.RequeueTime)
176 }
177 inLom = hs.InLOM()
178 }
179
180 sourceCreds := &couchdb.ReplicationCredentials{}
181 sourceNN := types.NamespacedName{Name: repl.Spec.Source.Name, Namespace: repl.Spec.Source.Namespace}
182 reqTime := r.Config.ServerNotReady
183 fromCloud := cloudReplication(sourceNN)
184 if fromCloud {
185
186 reqTime = r.Config.EnablementWatchInterval
187 }
188 _, err := sourceCreds.FromSecret(ctx, r.Client, sourceNN)
189 if err != nil {
190 msg := "replication secret not found"
191 if fromCloud {
192 msg = "datasync not enabled: " + msg
193 }
194 log.Error(err, msg, "NamespacedName", sourceNN)
195 return recerr.NewWait(err, status.DependencyNotReadyReason, reqTime)
196 }
197 if !fromCloud {
198 replicationDB := repl.Spec.Datasets[0].Name
199 sourceCreds.DBName = []byte(replicationDB)
200 leaderURL, err := r.leaderURL(ctx)
201 if err != nil {
202 log.Error(err, "leader couchdb not found", "NamespacedName", sourceNN)
203 return recerr.NewWait(err, status.DependencyNotReadyReason, reqTime)
204 }
205 sourceCreds.URI = []byte(leaderURL)
206 }
207
208 targetServerNN := repl.ServerRef()
209 ready, targetServer, err := checkIfServerIsReady(ctx, r.Client, repl)
210 if err != nil {
211 return recerr.NewWait(err, dsapi.ServerInvalidReason, r.Config.ServerNotReady)
212 }
213
214 if !ready {
215 err := fmt.Errorf("%v %w", targetServerNN, ErrServerNotReady)
216 return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
217 }
218
219 log = log.WithValues("IN_LOM", inLom, "server", client.ObjectKeyFromObject(targetServer), "URI", targetServer.Spec.URI)
220 ctx = logr.NewContext(ctx, log)
221
222
223 targetAdminCreds := &couchdb.AdminCredentials{}
224 targetAdminRef := targetServer.AdminCredentials()
225 targetAdminNN := types.NamespacedName{Name: targetAdminRef.Name, Namespace: targetAdminRef.Namespace}
226 _, err = targetAdminCreds.FromSecret(ctx, r.Client, targetAdminNN)
227 switch {
228 case err != nil && kerrors.IsNotFound(err):
229 log.Error(err, "error target server AdminCredentials Not found", "NamespacedName", targetAdminNN)
230 return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.ServerNotReady)
231 case err != nil:
232 log.Error(err, "error getting target server AdminCredentials", "NamespacedName", targetAdminNN)
233 return recerr.New(err, status.DependencyInvalidReason)
234 }
235
236
237 targetClient := &couchdb.CouchDB{}
238 err = targetClient.New(couchdb.Driver, string(targetAdminCreds.Username), string(targetAdminCreds.Password), targetServer.Spec.URI, r.Config.CouchDBPort)
239 if err != nil {
240 log.Error(err, "error initializing couchdb client", "NamespacedName", targetAdminNN)
241 return recerr.NewWait(err, string(dsapi.ReplicationCredentialsInvalidStatus), r.Config.ServerNotReady)
242 }
243
244 defer targetClient.Close(ctx)
245
246 sourceClient := &couchdb.CouchDB{}
247 err = sourceClient.NewFromURL(string(sourceCreds.Username), string(sourceCreds.Password), string(sourceCreds.URI))
248 if err != nil {
249 log.Error(err, "error initializing cloud couchdb client", "NamespacedName", sourceNN)
250 return recerr.NewWait(err, string(dsapi.ReplicationCredentialsInvalidStatus), r.Config.ServerNotReady)
251 }
252
253 defer sourceClient.Close(ctx)
254
255
256 dsapi.ResetCouchDBReplicationSetInventory(repl)
257
258 ri := &ReplicationInfo{
259 SourceURI: string(sourceCreds.URI),
260 SourceUsername: string(sourceCreds.Username),
261 SourcePassword: string(sourceCreds.Password),
262
263 TargetURI: fmt.Sprintf(couchdb.ReplicationHostFormat, r.Config.CouchDBPort),
264 TargetUsername: string(targetAdminCreds.Username),
265 TargetPassword: string(targetAdminCreds.Password),
266 }
267
268 replDB, rErr := r.createReplication(ctx, repl, targetClient, sourceClient, ri, repl.Spec.Datasets, inLom)
269 if rErr != nil {
270 return rErr
271 }
272
273
274 replicationDBName := string(sourceCreds.DBName)
275 replSet := &dsapi.ReplicationSet{}
276 if err := targetClient.GetReplicationSetDoc(ctx, replicationDBName, replSet); err != nil {
277 return recerr.NewWait(err, status.DependencyInvalidReason, r.Config.DatabaseNotFound)
278 }
279
280
281
282 replDBs, rErr := r.createReplication(ctx, repl, targetClient, sourceClient, ri, replSet.Datasets, inLom)
283 if rErr != nil {
284 return rErr
285 }
286 cleanReplicationStatus(repl, replDB, replDBs)
287
288 if inLom {
289 r.replicationEvent.Stop()
290 } else if err := r.listenToReplicationEvents(repl, targetServer, sourceCreds, targetAdminCreds); err != nil {
291 return recerr.NewWait(err, status.DependencyNotReadyReason, r.Config.RequeueTime)
292 }
293 return nil
294 }
295
296 func (r *CouchReplicationReconciler) listenToReplicationEvents(repl *dsapi.CouchDBReplicationSet,
297 targetServer *dsapi.CouchDBServer,
298 sourceCreds *couchdb.ReplicationCredentials,
299 targetAdminCreds *couchdb.AdminCredentials) error {
300 var err error
301 if r.Config.ReplicationEventFromSource {
302 err = r.replicationEvent.Listen(repl,
303 string(sourceCreds.Username), string(sourceCreds.Password), string(sourceCreds.URI))
304 } else {
305 err = r.replicationEvent.Listen(repl, string(targetAdminCreds.Username), string(targetAdminCreds.Password),
306 fmt.Sprintf("http://%s:%s", targetServer.Spec.URI, r.Config.CouchDBPort))
307 }
308 return err
309 }
310
311 func (r *CouchReplicationReconciler) leaderURL(ctx context.Context) (string, error) {
312 list := &dsapi.CouchDBServerList{}
313 err := r.Client.List(ctx, list)
314 if err != nil {
315 return "", err
316 }
317 for _, item := range list.Items {
318 if item.Labels[couchdb.NodeLeaderLabel] == "true" {
319 return getServerURL(r.Config, &item), nil
320 }
321 }
322 return "", fmt.Errorf("leader CouchDBServer not found")
323 }
324
325 func (r *CouchReplicationReconciler) createReplication(ctx context.Context,
326 repl *dsapi.CouchDBReplicationSet,
327 targetClient, sourceClient *couchdb.CouchDB,
328 ri *ReplicationInfo, datasets []dsapi.Dataset, cancel bool) (*BulkDocs, recerr.Error) {
329 log := logr.FromContextOrDiscard(ctx)
330
331 replicationDB := repl.Spec.Datasets[0].Name
332
333 bulkDocs := toBulkReplicationDocs(ri, datasets, cancel)
334 defer r.updateStatus(repl, bulkDocs)
335
336 r.compactDatabase(ctx, targetClient, bulkDocs)
337
338 r.validateReplication(ctx, targetClient, bulkDocs, replicationDB)
339
340 err := r.bulkInsert(ctx, targetClient, bulkDocs)
341 if err != nil {
342 log.Error(err, "bulk insert error")
343 }
344
345 err = r.waitForReplicationDBCreation(ctx, targetClient, bulkDocs)
346 if err != nil {
347 log.Error(err, "database not replicated")
348 }
349
350 r.makeDBsReadOnly(ctx, targetClient, bulkDocs)
351
352 r.updateMetrics(ctx, repl.Spec.Target.Name, targetClient, sourceClient, bulkDocs)
353
354 if err := bulkDocs.JoinErrors(); err != nil {
355 log.Error(err, "fail to replicate all databases", "stats", bulkDocs.Stats())
356 err = fmt.Errorf("fail to replicate all databases")
357 return bulkDocs, wait(repl, err, string(dsapi.ReplicationBadStateStatus), r.Config.DatabaseNotFound)
358 }
359 return bulkDocs, nil
360 }
361
362 func (r *CouchReplicationReconciler) updateStatus(repl *dsapi.CouchDBReplicationSet, bulkDocs *BulkDocs) {
363 for dbname, doc := range bulkDocs.Docs {
364 if doc.State == Done {
365 dsapi.SetCouchDBReplicationSetInfo(repl, dbname, dsapi.ReplicationSucceededStatus, "replication created successfully")
366 } else {
367 dsapi.SetCouchDBReplicationSetInfo(repl, dbname, dsapi.ReplicationCreationFailedStatus, errorOrMessage(doc.Error, "replication failed"))
368 }
369 }
370 }
371
372 func (r *CouchReplicationReconciler) makeDBsReadOnly(ctx context.Context, targetClient *couchdb.CouchDB, bulkDocs *BulkDocs) {
373 for _, dbname := range bulkDocs.GetDocs(Done) {
374 exists, err := targetClient.CheckIfDBExists(ctx, dbname)
375 if err != nil || !exists {
376 err = fmt.Errorf("error getting replicated db: %s", dbname)
377 bulkDocs.SetError(dbname, err)
378 continue
379 }
380
381 if err = targetClient.MakeReadOnly(ctx, dbname); err != nil {
382 err = fmt.Errorf("fail to make replication database read-only: %w", err)
383 bulkDocs.SetError(dbname, err)
384 continue
385 }
386 }
387 }
388
389
390 func (r *CouchReplicationReconciler) updateMetrics(ctx context.Context, servername string, targetClient, sourceClient *couchdb.CouchDB, bulkDocs *BulkDocs) {
391 for dbname, doc := range bulkDocs.Docs {
392 if doc.State == Done {
393 targetDBStat, err := targetClient.Client.DB(dbname).Stats(ctx)
394 if err != nil {
395 doc.SetError(err)
396 ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), err.Error())
397 continue
398 }
399
400 DatabaseDocumentCountSet(servername, dbname, float64(targetDBStat.DocCount))
401
402 sourceDBStat, err := sourceClient.Client.DB(dbname).Stats(ctx)
403 if err != nil {
404 doc.SetError(err)
405 ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), err.Error())
406 continue
407 }
408
409 diff := sourceDBStat.DocCount - targetDBStat.DocCount
410 if diff < 1 {
411 diff = 0
412 }
413
414 DatabaseDocumentDiffInc(servername, dbname, float64(diff))
415
416 ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationSucceededStatus), "replication created successfully")
417 } else {
418 ReplicationStatusInc(servername, dbname, string(dsapi.ReplicationBadStateStatus), errorOrMessage(doc.Error, "replication failed"))
419 }
420 }
421 }
422
423 func (r *CouchReplicationReconciler) compactDatabase(ctx context.Context, targetDB *couchdb.CouchDB, bulkDocs *BulkDocs) {
424 if r.Config.CompactRatio < 1 {
425 return
426 }
427 for dbname, doc := range bulkDocs.Docs {
428 db := targetDB.Client.DB(dbname)
429 stats, err := db.Stats(ctx)
430 if err != nil {
431 if couchdb.IgnoreNotFound(err) != nil {
432 doc.SetError(err)
433 }
434 continue
435 }
436 if stats.DiskSize == 0 || stats.ActiveSize == 0 {
437 continue
438 }
439 if float64(stats.DiskSize)/float64(stats.ActiveSize) > r.Config.CompactRatio {
440 if err := db.Compact(ctx); err != nil {
441 doc.SetError(err)
442 continue
443 }
444 }
445 }
446 }
447
448 func (r *CouchReplicationReconciler) validateReplication(ctx context.Context, targetDB *couchdb.CouchDB, bulkDocs *BulkDocs, replicationDB string) {
449 log := logr.FromContextOrDiscard(ctx)
450 var skipDocs []string
451 defer func() {
452 bulkDocs.Remove(skipDocs...)
453 }()
454 for dbname, doc := range bulkDocs.Docs {
455 if !shouldReplicate(r.Config, doc.Dataset, replicationDB) {
456 skipDocs = append(skipDocs, dbname)
457 err := targetDB.DeleteReplication(ctx, dbname)
458 if err != nil {
459 log.Error(err, "failed to delete replication", "dbname", dbname)
460 continue
461 }
462 if doc.Dataset.Deleted {
463 err = targetDB.Client.DestroyDB(ctx, dbname)
464 if err != nil && !couchdb.IsNotFound(err) {
465 log.Error(err, "failed to delete couch database", "dbname", dbname)
466 continue
467 }
468 }
469 }
470 }
471 }
472
473 func (r *CouchReplicationReconciler) waitForReplicationDBCreation(ctx context.Context, cc *couchdb.CouchDB, bulkDocs *BulkDocs) error {
474 replicatedDBs := bulkDocs.GetDocs(Done)
475 if len(replicatedDBs) == 0 {
476 return nil
477 }
478 m := make(map[string]bool)
479 for {
480 select {
481 case <-ctx.Done():
482 return ctx.Err()
483 case <-time.After(r.Config.DatabaseNotFound):
484 return fmt.Errorf("timeout waiting for databases creation")
485 default:
486 dbs, err := cc.Client.AllDBs(ctx)
487 if err != nil {
488 return fmt.Errorf("error getting all dbs: %w", err)
489 }
490 if len(dbs) == 0 {
491 break
492 }
493 for _, replicatedDB := range replicatedDBs {
494 if m[replicatedDB] {
495 continue
496 }
497 if slices.Contains(dbs, replicatedDB) {
498 m[replicatedDB] = true
499 }
500 }
501 if len(m) == len(replicatedDBs) {
502 return nil
503 }
504 }
505 time.Sleep(r.Config.ReplicationDBCreated)
506 }
507 }
508
509 func shouldReplicate(cfg *Config, ds dsapi.Dataset, replicationDB string) bool {
510 if ds.Deleted {
511 return false
512 }
513
514 if ds.Name == replicationDB {
515 return true
516 }
517 if ds.EnterpriseUnitID == "" {
518 return true
519 }
520
521 if cfg.SiteID != "" {
522 return cfg.SiteID == ds.EnterpriseUnitID
523 }
524 return true
525 }
526
// errorOrMessage returns the text of err when it is non-nil, and msg otherwise.
func errorOrMessage(err error, msg string) string {
	if err == nil {
		return msg
	}
	return err.Error()
}
533
534 func cloudReplication(nn types.NamespacedName) bool {
535 ref := dsapi.CloudReplicationCredentials()
536 return ref.Name == nn.Name && ref.Namespace == nn.Namespace
537 }
538
539
540 func cleanReplicationStatus(repl *dsapi.CouchDBReplicationSet, dbs ...*BulkDocs) {
541 var validDbs []string
542 for _, db := range dbs {
543 for name := range db.Docs {
544 validDbs = append(validDbs, name)
545 }
546 }
547 dsapi.CleanReplications(repl, validDbs)
548 }
549
View as plain text