1 package couchctl
2
3 import (
4 "context"
5 "errors"
6 "fmt"
7 "net/url"
8 "slices"
9
10 "maps"
11
12 "github.com/go-kivik/kivik/v4"
13 "github.com/go-logr/logr"
14
15 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
16 "edge-infra.dev/pkg/edge/datasync/couchdb"
17 )
18
19 type ReplicationInfo struct {
20 SourceURI string
21 SourceUsername string
22 SourcePassword string
23
24 TargetURI string
25 TargetUsername string
26 TargetPassword string
27 }
28
29 type State int
30
31 const (
32 Processing State = iota
33 Done
34 Error
35 )
36
37 type Stat struct {
38 total int
39 processing int
40 done int
41 error int
42 }
43
44 func (s Stat) String() string {
45 return fmt.Sprintf("Total: %d, Processing: %d, Done: %d, Error: %d", s.total, s.processing, s.done, s.error)
46 }
47 func (s Stat) Done() bool {
48 return s.processing == 0
49 }
50
51 type BulkDocs struct {
52 Docs map[string]*BulkDoc
53 }
54
55 type BulkDoc struct {
56 State
57 Doc map[string]interface{}
58 Error error
59 Dataset dsapi.Dataset
60 }
61
62 func (d *BulkDoc) SetError(err error) {
63 d.State = Error
64 d.Error = err
65 }
66
67 func (d *BulkDoc) SetDone() {
68 d.State = Done
69 }
70
71 func (b *BulkDocs) SetProcessing(id string) bool {
72 return b.SetState(id, Processing)
73 }
74
75 func (b *BulkDocs) SetDone(id string) bool {
76 return b.SetState(id, Done)
77 }
78
79 func (b *BulkDocs) SetError(id string, err error) bool {
80 if doc, ok := b.Docs[id]; ok {
81 doc.State = Error
82 doc.Error = err
83 }
84 return false
85 }
86
87 func (b *BulkDocs) Remove(dbs ...string) {
88 for _, dbname := range dbs {
89 delete(b.Docs, dbname)
90 }
91 }
92
93
94 func (b *BulkDocs) SetErrors(err error) {
95 for _, doc := range b.Docs {
96 if doc.State != Done {
97 doc.Error = err
98 doc.State = Error
99 }
100 }
101 }
102
103 func (b *BulkDocs) SetState(id string, state State) bool {
104 if doc, ok := b.Docs[id]; ok {
105 doc.State = state
106 }
107 return false
108 }
109
110 func (b *BulkDocs) DoneProcessing() bool {
111 for _, doc := range b.Docs {
112 if doc.State == Processing {
113 return false
114 }
115 }
116 return true
117 }
118
119 func (b *BulkDocs) ProcessingDocs() []interface{} {
120 var results []interface{}
121 for _, doc := range b.Docs {
122 if doc.State == Processing && doc.Doc != nil {
123 results = append(results, doc.Doc)
124 }
125 }
126 return results
127 }
128
129 func (b *BulkDocs) SetRevision(id string, rev string) bool {
130 if doc, ok := b.Docs[id]; ok && doc.Doc != nil {
131 doc.State = Processing
132 doc.Doc["_rev"] = rev
133 return true
134 }
135 return false
136 }
137
138 func (b *BulkDocs) JoinErrors() error {
139 var errs []error
140 for _, doc := range b.Docs {
141 if err := doc.Error; err != nil {
142 errs = append(errs, err)
143 }
144 }
145 return errors.Join(errs...)
146 }
147
148 func (b *BulkDocs) Stats() Stat {
149 processing := 0
150 done := 0
151 err := 0
152 for _, doc := range b.Docs {
153 switch doc.State {
154 case Processing:
155 processing++
156 case Done:
157 done++
158 case Error:
159 err++
160 }
161 }
162
163 return Stat{total: len(b.Docs), processing: processing, done: done, error: err}
164 }
165
166 func (b *BulkDocs) GetAllDocs() []*BulkDoc {
167 return slices.Collect(maps.Values(b.Docs))
168 }
169
170 func (b *BulkDocs) GetDocs(state State) []string {
171 docs := make([]string, 0)
172 for dbname, doc := range b.Docs {
173 if doc.State == state {
174 docs = append(docs, dbname)
175 }
176 }
177 return docs
178 }
179
180 func (b *BulkDocs) Equals(id string, doc map[string]interface{}) bool {
181 if bulkDoc, ok := b.Docs[id]; ok && bulkDoc.Doc != nil && doc != nil {
182 return doc["_id"] == bulkDoc.Doc["_id"] &&
183 doc["continuous"] == bulkDoc.Doc["continuous"] &&
184 doc["create_target"] == bulkDoc.Doc["create_target"] &&
185 doc["source"] == bulkDoc.Doc["source"] &&
186 doc["target"] == bulkDoc.Doc["target"] &&
187 doc["cancel"] == bulkDoc.Doc["cancel"]
188
189 }
190 return false
191 }
192
193
194 func (r *CouchReplicationReconciler) bulkInsert(ctx context.Context, client *couchdb.CouchDB, docs *BulkDocs) error {
195 log := logr.FromContextOrDiscard(ctx)
196
197 log.Info("bulkInsert", "stats", docs.Stats())
198
199 if docs.DoneProcessing() {
200 log.Info("bulk insert done processing", "stats", docs.Stats().String())
201 return nil
202 }
203
204 db, err := client.GetReplicatorDB()
205 if err != nil {
206 docs.SetErrors(err)
207 log.Error(err, "fail to get replicator DB")
208 return err
209 }
210
211 bulkResults, bulkErr := db.BulkDocs(ctx, docs.ProcessingDocs())
212 if bulkErr != nil {
213 docs.SetErrors(err)
214 log.Error(bulkErr, "failed to create replications in bulk")
215 return bulkErr
216 }
217
218 var conflictDocs []kivik.BulkGetReference
219 for _, bulkResult := range bulkResults {
220 dbname := bulkResult.ID
221 err := bulkResult.Error
222 if err == nil {
223 docs.SetDone(dbname)
224 continue
225 }
226 if couchdb.IsConflict(err) {
227 conflictDocs = append(conflictDocs, kivik.BulkGetReference{ID: dbname})
228 docs.SetProcessing(dbname)
229 continue
230 }
231
232 docs.SetError(dbname, err)
233 }
234
235 if len(conflictDocs) == 0 {
236 return r.bulkInsert(ctx, client, docs)
237 }
238
239 rows := db.BulkGet(ctx, conflictDocs)
240 if rows.Err() != nil {
241 docs.SetErrors(rows.Err())
242 log.Error(rows.Err(), "error getting bulk replications")
243 return rows.Err()
244 }
245
246 for rows.Next() {
247 dbname, err := rows.ID()
248 if err != nil {
249 docs.SetError(dbname, err)
250 continue
251 }
252 doc := make(map[string]interface{})
253 if scanErr := rows.ScanDoc(&doc); scanErr != nil {
254 docs.SetError(dbname, scanErr)
255 continue
256 }
257 if docs.Equals(dbname, doc) {
258 docs.SetDone(dbname)
259 continue
260 }
261 rev := doc["_rev"].(string)
262 docs.SetRevision(dbname, rev)
263 }
264
265 return r.bulkInsert(ctx, client, docs)
266 }
267
268 func toBulkReplicationDocs(r *ReplicationInfo, ds []dsapi.Dataset, cancel bool) *BulkDocs {
269 bd := &BulkDocs{Docs: map[string]*BulkDoc{}}
270 for _, d := range ds {
271 dbname := d.Name
272 sourceDSN, err1 := buildDSNName(r.SourceUsername, r.SourcePassword, r.SourceURI, dbname)
273 targetDSN, err2 := buildDSNName(r.TargetUsername, r.TargetPassword, r.TargetURI, dbname)
274 if err := errors.Join(err1, err2); err != nil {
275 bd.Docs[dbname] = &BulkDoc{
276 Dataset: d,
277 State: Error,
278 Error: err,
279 }
280 } else {
281 replDoc := toReplicationSettings(d.Config, dbname)
282 replDoc["source"] = sourceDSN
283 replDoc["target"] = targetDSN
284 if cancel {
285 replDoc["cancel"] = cancel
286 }
287 bd.Docs[dbname] = &BulkDoc{
288 Dataset: d,
289 Doc: replDoc,
290 State: Processing,
291 }
292 }
293 }
294 return bd
295 }
296
297 func buildDSNName(username, password, uri, dbname string) (string, error) {
298 u, err := url.Parse(uri)
299 if err != nil {
300 return "", err
301 }
302 if !u.IsAbs() {
303 u.Scheme = "http"
304 }
305 u.User = url.UserPassword(username, password)
306 u.Path = dbname
307 return u.String(), err
308 }
309
310
311 func toReplicationSettings(r dsapi.ReplConfig, id string) map[string]interface{} {
312 m := make(map[string]interface{})
313 m["_id"] = id
314 m["continuous"] = r.Continuous
315 m["create_target"] = r.CreateTarget
316 return m
317 }
318
View as plain text