package couchctl

import (
	"context"
	"fmt"
	"strings"
	"sync"
	"time"

	"github.com/go-kivik/kivik/v4"

	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/lib/fog"

	"github.com/go-logr/logr"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/util/workqueue"
	re "sigs.k8s.io/controller-runtime/pkg/reconcile"
)

// Changes is the subset of a changes-feed iterator that ReplicationEvent
// consumes. It exists so the feed can be substituted through ChangesFunc.
type Changes interface {
	Next() bool
	Err() error
	ID() string        // document ID
	Changes() []string // list of revisions
}

// ChangesFunc opens a changes feed for the given credentials and URL.
type ChangesFunc func(ctx context.Context, username, password, url string) (Changes, error)

// RetryableErrorFunc reports whether a changes-feed error should be retried
// instead of terminating the listener.
type RetryableErrorFunc func(err error) bool

// ReplicationEvent watches changes feeds and enqueues a reconcile request
// whenever the revision of the replication document changes.
//
// The embedded mutex guards queue, parentCtx, cache and cancelCtx.
type ReplicationEvent struct {
	sync.Mutex
	log    logr.Logger
	config *Config
	queue  workqueue.RateLimitingInterface
	// cache maps "<namespace>/<name>" to the "<user>:<password>@<url>" key of
	// the feed currently being listened to for that replication set.
	cache map[string]string
	// cancelCtx maps "<namespace>/<name>" to the cancel function of the
	// context handed to that listener's changes feed.
	cancelCtx        map[string]context.CancelFunc
	parentCtx        context.Context
	changesFunc      ChangesFunc
	isRetryableError RetryableErrorFunc
}

// NewReplicationEvent returns a ReplicationEvent that uses the built-in
// changes feed and commonNetworkIssues as its retry predicate. Start must be
// called before Listen.
func NewReplicationEvent(cfg *Config) *ReplicationEvent {
	r := &ReplicationEvent{
		config:    cfg,
		cache:     make(map[string]string),
		cancelCtx: make(map[string]context.CancelFunc),
		log:       fog.New().WithName("replication-event"),
	}
	r.changesFunc = r.changes
	r.isRetryableError = commonNetworkIssues
	return r
}

// Start records the work queue that events are added to and the parent
// context from which every listener context is derived. It always returns nil.
func (c *ReplicationEvent) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
	c.Lock()
	defer c.Unlock()

	c.queue = queue
	c.parentCtx = ctx
	c.log.Info("replication event queue started")
	return nil
}

// Listen starts a background listener on the changes feed at url for the
// given replication set.
//
// If a listener with identical credentials and URL is already registered for
// the replication set, Listen is a no-op. If one is registered with different
// values, its context is cancelled and a new listener replaces it.
//
// It returns an error when Start has not been called or when the changes feed
// cannot be opened.
func (c *ReplicationEvent) Listen(repl *dsapi.CouchDBReplicationSet, username, password, url string) error {
	c.Lock()
	defer c.Unlock()

	if c.queue == nil || c.parentCtx == nil {
		return fmt.Errorf("replication event queue has not started")
	}

	replKey := fmt.Sprintf("%s/%s", repl.Namespace, repl.Name)
	couchKey := fmt.Sprintf("%s:%s@%s", username, password, url)

	if _couchKey := c.cache[replKey]; _couchKey == couchKey {
		return nil
	} else if _couchKey != "" && c.cancelCtx[replKey] != nil {
		// if couch replication values are different, cancel the previous context.
		c.cancelCtx[replKey]()
		delete(c.cache, replKey)
		delete(c.cancelCtx, replKey)
	}

	ctx, cancel := context.WithCancel(c.parentCtx)
	changes, err := c.changesFunc(ctx, username, password, url)
	if err != nil {
		cancel() // release the derived context; no listener was started
		return err
	}

	go c.listen(changes, repl.Namespace, repl.Name)

	c.cache[replKey] = couchKey
	c.cancelCtx[replKey] = cancel

	c.log.Info("listening for new replication events", "CHANGES_URL", fmt.Sprintf("%s/_changes", url))
	return nil
}

// listen consumes the changes feed until it ends or fails with a
// non-retryable error, enqueueing a reconcile request for namespace/name each
// time the replication document's revision differs from the last one seen.
// On exit it removes the replication set's entries from cache and cancelCtx.
//
// NOTE(review): the deferred cleanup is keyed only by namespace/name, so a
// listener cancelled by Listen may remove the bookkeeping of its replacement
// when it exits -- confirm and consider tagging entries per listener.
func (c *ReplicationEvent) listen(changes Changes, namespace, name string) {
	defer func() {
		c.removeKey(fmt.Sprintf("%s/%s", namespace, name))
		c.log.Info("replication event stopped")
	}()

	var lastRevision string
	for {
		for changes.Next() {
			if err := changes.Err(); err != nil {
				c.log.Error(err, "error in changes feed for replication event")
				break
			}
			if changes.ID() != couchdb.ReplicationDocument {
				continue
			}
			revisions := changes.Changes()
			if len(revisions) == 0 {
				continue
			}
			rev := revisions[0] // only one revision is expected
			if lastRevision == "" {
				// The first revision observed is only a baseline; it does not
				// trigger a reconcile.
				lastRevision = rev
				continue
			}
			if lastRevision != rev {
				lastRevision = rev
				c.queue.Add(re.Request{NamespacedName: types.NamespacedName{Namespace: namespace, Name: name}})
				c.log.Info("replication event triggered", "namespace", namespace, "name", name, "rev", rev)
			}
		}

		err := changes.Err()
		if err != nil {
			if c.isRetryableError(err) {
				// NOTE(review): the retry polls the same Changes value again;
				// whether a feed can recover after reporting an error depends
				// on the Changes implementation -- confirm.
				time.Sleep(c.config.ReplicationChangesInterval)
				c.log.Info("retrying replication event")
				continue
			}
			c.log.Error(err, "replication event stopped with error")
		}
		break
	}
}

// commonNetworkIssues default implementation of RetryableErrorFunc
//
// It reports true when the error text contains "Bad Gateway" or "timeout",
// and false for a nil error.
func commonNetworkIssues(err error) bool {
	if err == nil {
		return false
	}
	errString := err.Error()
	return strings.Contains(errString, "Bad Gateway") || strings.Contains(errString, "timeout")
}

// Stop cancels the context of every registered listener and clears the
// bookkeeping maps. It does nothing when no listener is registered.
func (c *ReplicationEvent) Stop() {
	c.Lock()
	defer c.Unlock()

	if len(c.cache) == 0 {
		return
	}
	for _, cancel := range c.cancelCtx {
		cancel()
	}
	// reset the cache
	c.cache = make(map[string]string)
	c.cancelCtx = make(map[string]context.CancelFunc)
}

// changes is the default ChangesFunc. It builds a CouchDB client for url and
// opens a continuous changes feed on the configured replication database.
//
// It returns an error when the client cannot be created or when the feed
// reports an error immediately after being opened.
func (c *ReplicationEvent) changes(ctx context.Context, username, password, url string) (Changes, error) {
	couch := &couchdb.CouchDB{}
	if err := couch.NewFromURL(username, password, url); err != nil {
		return nil, fmt.Errorf("fail to create couchdb client: %w", err)
	}

	db := couch.Client.DB(c.config.ReplicationDB())
	options := map[string]interface{}{
		"feed":       "continuous",
		"heartbeat":  true, // keep connection alive
		"descending": false,
	}

	feed := db.Changes(ctx, kivik.Params(options))
	// Wrap the feed's own error. The client-creation error is nil by the time
	// execution reaches this point, so wrapping it would hide the real cause.
	if feedErr := feed.Err(); feedErr != nil {
		return nil, fmt.Errorf("fail to listen to replication event db: %s, changes: %w", db.Name(), feedErr)
	}
	return feed, nil
}

// removeKey deletes the cache and cancelCtx entries for replKey while holding
// the lock.
func (c *ReplicationEvent) removeKey(replKey string) {
	c.Lock()
	defer c.Unlock()

	delete(c.cache, replKey)
	delete(c.cancelCtx, replKey)
}