...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/replication_event.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"strings"
     7  	"sync"
     8  	"time"
     9  
    10  	"github.com/go-kivik/kivik/v4"
    11  
    12  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    13  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    14  	"edge-infra.dev/pkg/lib/fog"
    15  
    16  	"github.com/go-logr/logr"
    17  
    18  	"k8s.io/apimachinery/pkg/types"
    19  	"k8s.io/client-go/util/workqueue"
    20  
    21  	re "sigs.k8s.io/controller-runtime/pkg/reconcile"
    22  )
    23  
    24  type Changes interface {
    25  	Next() bool
    26  	Err() error
    27  	ID() string        // document ID
    28  	Changes() []string // list of revisions
    29  }
    30  
    31  type ChangesFunc func(ctx context.Context, username, password, url string) (Changes, error)
    32  type RetryableErrorFunc func(err error) bool
    33  
    34  type ReplicationEvent struct {
    35  	sync.Mutex
    36  	log       logr.Logger
    37  	config    *Config
    38  	queue     workqueue.RateLimitingInterface
    39  	cache     map[string]string
    40  	cancelCtx map[string]context.CancelFunc
    41  	parentCtx context.Context
    42  
    43  	changesFunc      ChangesFunc
    44  	isRetryableError RetryableErrorFunc
    45  }
    46  
    47  func NewReplicationEvent(cfg *Config) *ReplicationEvent {
    48  	r := &ReplicationEvent{
    49  		config:    cfg,
    50  		cache:     make(map[string]string),
    51  		cancelCtx: make(map[string]context.CancelFunc),
    52  		log:       fog.New().WithName("replication-event"),
    53  	}
    54  	r.changesFunc = r.changes
    55  	r.isRetryableError = commonNetworkIssues
    56  	return r
    57  }
    58  
    59  func (c *ReplicationEvent) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
    60  	c.Lock()
    61  	defer c.Unlock()
    62  	c.queue = queue
    63  	c.parentCtx = ctx
    64  	c.log.Info("replication event queue started")
    65  	return nil
    66  }
    67  
    68  func (c *ReplicationEvent) Listen(repl *dsapi.CouchDBReplicationSet, username, password, url string) error {
    69  	c.Lock()
    70  	defer c.Unlock()
    71  	if c.queue == nil || c.parentCtx == nil {
    72  		return fmt.Errorf("replication event queue has not started")
    73  	}
    74  
    75  	replKey := fmt.Sprintf("%s/%s", repl.Namespace, repl.Name)
    76  	couchKey := fmt.Sprintf("%s:%s@%s", username, password, url)
    77  
    78  	if _couchKey := c.cache[replKey]; _couchKey == couchKey {
    79  		return nil
    80  	} else if _couchKey != "" && c.cancelCtx[replKey] != nil {
    81  		// if couch replication values are different, cancel the previous context.
    82  		c.cancelCtx[replKey]()
    83  		delete(c.cache, replKey)
    84  		delete(c.cancelCtx, replKey)
    85  	}
    86  
    87  	ctx, cancel := context.WithCancel(c.parentCtx)
    88  	changes, err := c.changesFunc(ctx, username, password, url)
    89  	if err != nil {
    90  		cancel()
    91  		return err
    92  	}
    93  
    94  	go c.listen(changes, repl.Namespace, repl.Name)
    95  
    96  	c.cache[replKey] = couchKey
    97  	c.cancelCtx[replKey] = cancel
    98  	c.log.Info("listening for new replication events", "CHANGES_URL", fmt.Sprintf("%s/_changes", url))
    99  	return nil
   100  }
   101  
   102  func (c *ReplicationEvent) listen(changes Changes, namespace, name string) {
   103  	defer func() {
   104  		c.removeKey(fmt.Sprintf("%s/%s", namespace, name))
   105  		c.log.Info("replication event stopped")
   106  	}()
   107  
   108  	var lastRevision string
   109  	for {
   110  		for changes.Next() {
   111  			if err := changes.Err(); err != nil {
   112  				c.log.Error(err, "error in changes feed for replication event")
   113  				break
   114  			}
   115  
   116  			if changes.ID() != couchdb.ReplicationDocument {
   117  				continue
   118  			}
   119  
   120  			revisions := changes.Changes()
   121  			if len(revisions) == 0 {
   122  				continue
   123  			}
   124  
   125  			rev := revisions[0] // only one revision is expected
   126  
   127  			if lastRevision == "" {
   128  				lastRevision = rev
   129  				continue
   130  			}
   131  
   132  			if lastRevision != rev {
   133  				lastRevision = rev
   134  				c.queue.Add(re.Request{NamespacedName: types.NamespacedName{Namespace: namespace, Name: name}})
   135  				c.log.Info("replication event triggered", "namespace", namespace, "name", name, "rev", rev)
   136  			}
   137  		}
   138  		err := changes.Err()
   139  		if err != nil {
   140  			if c.isRetryableError(err) {
   141  				time.Sleep(c.config.ReplicationChangesInterval)
   142  				c.log.Info("retrying replication event")
   143  				continue
   144  			}
   145  			c.log.Error(err, "replication event stopped with error")
   146  		}
   147  		break
   148  	}
   149  }
   150  
   151  // commonNetworkIssues default implementation of RetryableErrorFunc
   152  func commonNetworkIssues(err error) bool {
   153  	if err == nil {
   154  		return false
   155  	}
   156  	errString := err.Error()
   157  	return strings.Contains(errString, "Bad Gateway") ||
   158  		strings.Contains(errString, "timeout")
   159  }
   160  
   161  func (c *ReplicationEvent) Stop() {
   162  	c.Lock()
   163  	defer c.Unlock()
   164  	if len(c.cache) == 0 {
   165  		return
   166  	}
   167  	for _, cancel := range c.cancelCtx {
   168  		cancel()
   169  	}
   170  	// reset the cache
   171  	c.cache = make(map[string]string)
   172  	c.cancelCtx = make(map[string]context.CancelFunc)
   173  }
   174  
   175  func (c *ReplicationEvent) changes(ctx context.Context, username, password, url string) (Changes, error) {
   176  	couch := &couchdb.CouchDB{}
   177  	err := couch.NewFromURL(username, password, url)
   178  	if err != nil {
   179  		return nil, fmt.Errorf("fail to create couchdb client: %w", err)
   180  	}
   181  
   182  	db := couch.Client.DB(c.config.ReplicationDB())
   183  	options := map[string]interface{}{
   184  		"feed":       "continuous",
   185  		"heartbeat":  true, // keep connection alive
   186  		"descending": false,
   187  	}
   188  	changes := db.Changes(ctx, kivik.Params(options))
   189  	if changes.Err() != nil {
   190  		return nil, fmt.Errorf("fail to listen to replication event db: %s, changes: %w", db.Name(), err)
   191  	}
   192  	return changes, nil
   193  }
   194  
   195  func (c *ReplicationEvent) removeKey(replKey string) {
   196  	c.Lock()
   197  	defer c.Unlock()
   198  	delete(c.cache, replKey)
   199  	delete(c.cancelCtx, replKey)
   200  }
   201  

View as plain text