1 package couchctl
2
3 import (
4 "context"
5 "fmt"
6 "strings"
7 "sync"
8 "time"
9
10 "github.com/go-kivik/kivik/v4"
11
12 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
13 "edge-infra.dev/pkg/edge/datasync/couchdb"
14 "edge-infra.dev/pkg/lib/fog"
15
16 "github.com/go-logr/logr"
17
18 "k8s.io/apimachinery/pkg/types"
19 "k8s.io/client-go/util/workqueue"
20
21 re "sigs.k8s.io/controller-runtime/pkg/reconcile"
22 )
23
type (
	// Changes is the subset of a CouchDB changes-feed iterator that
	// ReplicationEvent consumes. The feed returned by kivik's DB.Changes is
	// used through this interface.
	Changes interface {
		Next() bool
		Err() error
		ID() string
		Changes() []string
	}

	// ChangesFunc opens a changes feed at url with the given credentials.
	// ctx is handed through to the feed it opens.
	ChangesFunc func(ctx context.Context, username, password, url string) (Changes, error)

	// RetryableErrorFunc reports whether an error that ended a changes feed
	// is worth retrying rather than giving up on the listener.
	RetryableErrorFunc func(err error) bool
)
33
34 type ReplicationEvent struct {
35 sync.Mutex
36 log logr.Logger
37 config *Config
38 queue workqueue.RateLimitingInterface
39 cache map[string]string
40 cancelCtx map[string]context.CancelFunc
41 parentCtx context.Context
42
43 changesFunc ChangesFunc
44 isRetryableError RetryableErrorFunc
45 }
46
47 func NewReplicationEvent(cfg *Config) *ReplicationEvent {
48 r := &ReplicationEvent{
49 config: cfg,
50 cache: make(map[string]string),
51 cancelCtx: make(map[string]context.CancelFunc),
52 log: fog.New().WithName("replication-event"),
53 }
54 r.changesFunc = r.changes
55 r.isRetryableError = commonNetworkIssues
56 return r
57 }
58
59 func (c *ReplicationEvent) Start(ctx context.Context, queue workqueue.RateLimitingInterface) error {
60 c.Lock()
61 defer c.Unlock()
62 c.queue = queue
63 c.parentCtx = ctx
64 c.log.Info("replication event queue started")
65 return nil
66 }
67
68 func (c *ReplicationEvent) Listen(repl *dsapi.CouchDBReplicationSet, username, password, url string) error {
69 c.Lock()
70 defer c.Unlock()
71 if c.queue == nil || c.parentCtx == nil {
72 return fmt.Errorf("replication event queue has not started")
73 }
74
75 replKey := fmt.Sprintf("%s/%s", repl.Namespace, repl.Name)
76 couchKey := fmt.Sprintf("%s:%s@%s", username, password, url)
77
78 if _couchKey := c.cache[replKey]; _couchKey == couchKey {
79 return nil
80 } else if _couchKey != "" && c.cancelCtx[replKey] != nil {
81
82 c.cancelCtx[replKey]()
83 delete(c.cache, replKey)
84 delete(c.cancelCtx, replKey)
85 }
86
87 ctx, cancel := context.WithCancel(c.parentCtx)
88 changes, err := c.changesFunc(ctx, username, password, url)
89 if err != nil {
90 cancel()
91 return err
92 }
93
94 go c.listen(changes, repl.Namespace, repl.Name)
95
96 c.cache[replKey] = couchKey
97 c.cancelCtx[replKey] = cancel
98 c.log.Info("listening for new replication events", "CHANGES_URL", fmt.Sprintf("%s/_changes", url))
99 return nil
100 }
101
102 func (c *ReplicationEvent) listen(changes Changes, namespace, name string) {
103 defer func() {
104 c.removeKey(fmt.Sprintf("%s/%s", namespace, name))
105 c.log.Info("replication event stopped")
106 }()
107
108 var lastRevision string
109 for {
110 for changes.Next() {
111 if err := changes.Err(); err != nil {
112 c.log.Error(err, "error in changes feed for replication event")
113 break
114 }
115
116 if changes.ID() != couchdb.ReplicationDocument {
117 continue
118 }
119
120 revisions := changes.Changes()
121 if len(revisions) == 0 {
122 continue
123 }
124
125 rev := revisions[0]
126
127 if lastRevision == "" {
128 lastRevision = rev
129 continue
130 }
131
132 if lastRevision != rev {
133 lastRevision = rev
134 c.queue.Add(re.Request{NamespacedName: types.NamespacedName{Namespace: namespace, Name: name}})
135 c.log.Info("replication event triggered", "namespace", namespace, "name", name, "rev", rev)
136 }
137 }
138 err := changes.Err()
139 if err != nil {
140 if c.isRetryableError(err) {
141 time.Sleep(c.config.ReplicationChangesInterval)
142 c.log.Info("retrying replication event")
143 continue
144 }
145 c.log.Error(err, "replication event stopped with error")
146 }
147 break
148 }
149 }
150
151
// commonNetworkIssues reports whether err looks like a transient network
// failure. That means its message contains "Bad Gateway" or "timeout"; the
// match is case-sensitive. A nil error is never retryable.
func commonNetworkIssues(err error) bool {
	if err == nil {
		return false
	}
	msg := err.Error()
	for _, marker := range []string{"Bad Gateway", "timeout"} {
		if strings.Contains(msg, marker) {
			return true
		}
	}
	return false
}
160
161 func (c *ReplicationEvent) Stop() {
162 c.Lock()
163 defer c.Unlock()
164 if len(c.cache) == 0 {
165 return
166 }
167 for _, cancel := range c.cancelCtx {
168 cancel()
169 }
170
171 c.cache = make(map[string]string)
172 c.cancelCtx = make(map[string]context.CancelFunc)
173 }
174
175 func (c *ReplicationEvent) changes(ctx context.Context, username, password, url string) (Changes, error) {
176 couch := &couchdb.CouchDB{}
177 err := couch.NewFromURL(username, password, url)
178 if err != nil {
179 return nil, fmt.Errorf("fail to create couchdb client: %w", err)
180 }
181
182 db := couch.Client.DB(c.config.ReplicationDB())
183 options := map[string]interface{}{
184 "feed": "continuous",
185 "heartbeat": true,
186 "descending": false,
187 }
188 changes := db.Changes(ctx, kivik.Params(options))
189 if changes.Err() != nil {
190 return nil, fmt.Errorf("fail to listen to replication event db: %s, changes: %w", db.Name(), err)
191 }
192 return changes, nil
193 }
194
195 func (c *ReplicationEvent) removeKey(replKey string) {
196 c.Lock()
197 defer c.Unlock()
198 delete(c.cache, replKey)
199 delete(c.cancelCtx, replKey)
200 }
201
View as plain text