1 package cushion
2
3 import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8
9 "edge-infra.dev/pkg/edge/chariot"
10 "edge-infra.dev/pkg/edge/datasync/couchdb"
11
12 "github.com/go-kivik/kivik/v4"
13 _ "github.com/go-kivik/kivik/v4/couchdb"
14 )
15
16
17
18
19
20
21
22
23 type CouchDBStorage struct {
24 client *kivik.Client
25 d *Daemon
26 }
27
28 func NewCouchDBStorage(client *kivik.Client, d *Daemon) *CouchDBStorage {
29 return &CouchDBStorage{
30 client: client,
31 d: d,
32 }
33 }
34
35 func (s *CouchDBStorage) BulKPut(ctx context.Context, db *kivik.DB, msgs ...*Message) (chariot.StorageInfo, error) {
36 if len(msgs) == 0 {
37 return chariot.StorageInfo{ObjectsEmpty: true}, nil
38 }
39
40 if err := s.bulkInsertWithRetry(ctx, db, msgs...); err != nil {
41 return chariot.StorageInfo{}, err
42 }
43 return chariot.StorageInfo{}, nil
44 }
45
46 func (s *CouchDBStorage) Put(ctx context.Context, db *kivik.DB, objs ...chariot.StorageObject) (chariot.StorageInfo, error) {
47 if len(objs) == 0 {
48 return chariot.StorageInfo{ObjectsEmpty: true}, nil
49 }
50
51
52 if len(objs) > 1 {
53 return chariot.StorageInfo{}, fmt.Errorf("idk")
54 }
55
56 obj := objs[0]
57 var pi chariot.StorageInfo
58
59 if err := s.insertWithRetry(ctx, db, &obj); err != nil {
60 pi.Errors = append(pi.Errors, chariot.StorageObjectError{
61 Object: obj,
62 Error: err.Error(),
63 })
64 } else {
65 pi.ObjectsPut = append(pi.ObjectsPut, obj)
66 }
67
68 var reterr error
69 if len(pi.Errors) > 0 {
70 reterr = fmt.Errorf("got errors putting objects: %v", pi.Errors)
71 }
72 return pi, reterr
73 }
74
75
76 func (s *CouchDBStorage) Delete(ctx context.Context, db *kivik.DB, objs ...chariot.StorageObject) (chariot.StorageInfo, error) {
77 if len(objs) == 0 {
78 return chariot.StorageInfo{ObjectsEmpty: true}, nil
79 }
80
81
82 if len(objs) > 1 {
83 return chariot.StorageInfo{}, fmt.Errorf("idk")
84 }
85
86 obj := objs[0]
87 var di chariot.StorageInfo
88 rev, err := db.GetRev(ctx, obj.Location)
89 if couchdb.IgnoreNotFound(err) != nil {
90 di.Errors = append(di.Errors, chariot.StorageObjectError{
91 Object: obj,
92 Error: fmt.Sprintf("fail to get entity metadata: %s", err.Error()),
93 })
94 }
95
96
97 if len(di.Errors) == 0 && rev != "" {
98
99 if _, err := db.Delete(ctx, obj.Location, rev); couchdb.IgnoreNotFound(err) != nil {
100 di.Errors = append(di.Errors, chariot.StorageObjectError{
101 Object: obj,
102 Error: fmt.Sprintf("fail to delete entity: %s", err.Error()),
103 })
104 }
105 di.ObjectsDeleted = append(di.ObjectsDeleted, obj)
106 }
107
108 var reterr error
109 if len(di.Errors) > 0 {
110 reterr = fmt.Errorf("got errors deleting objects: %v", di.Errors)
111 }
112 return di, reterr
113 }
114
115 func (s *CouchDBStorage) insertWithRetry(ctx context.Context, db *kivik.DB, obj *chariot.StorageObject) error {
116 _, err := db.Put(ctx, obj.Location, obj.Content)
117 if err == nil {
118 return nil
119 }
120 if kivik.HTTPStatus(err) != 409 {
121 return err
122 }
123
124 rev, metaErr := db.GetRev(ctx, obj.Location)
125 if metaErr != nil {
126 return metaErr
127 }
128
129 content := make(map[string]interface{})
130 if unmarErr := json.Unmarshal([]byte(obj.Content), &content); unmarErr != nil {
131 return unmarErr
132 }
133 content["_rev"] = rev
134
135 contentBytes, marErr := json.Marshal(content)
136 if marErr != nil {
137 return marErr
138 }
139 obj.Content = string(contentBytes)
140 return s.insertWithRetry(ctx, db, obj)
141 }
142
143
144 func (s *CouchDBStorage) bulkInsertWithRetry(ctx context.Context, db *kivik.DB, msgs ...*Message) error {
145 _msgs := Messages(msgs)
146 if _msgs.Processed() {
147 return nil
148 }
149
150 bulkMsgs, bulkDocs := s.toBulkDocs(msgs)
151 if len(bulkDocs) == 0 {
152 return nil
153 }
154
155 bulkResults, bulkErr := db.BulkDocs(ctx, bulkDocs)
156 if bulkErr != nil {
157
158 for _, msg := range bulkMsgs {
159 s.nack(msg, bulkErr)
160 }
161 s.d.logger.Error(bulkErr, "can't put messages in bulk")
162 return bulkErr
163 }
164
165 var conflictDocs []kivik.BulkGetReference
166 for _, bulkResult := range bulkResults {
167 id := bulkResult.ID
168 err := bulkResult.Error
169
170 if err == nil {
171 if msg, ok := bulkMsgs[id]; ok {
172 s.ack(msg)
173 }
174 continue
175 }
176
177 if couchdb.IsConflict(err) {
178 conflictDocs = append(conflictDocs, kivik.BulkGetReference{ID: id})
179 continue
180 }
181
182 if msg, ok := bulkMsgs[id]; ok {
183 s.nack(msg, nil)
184 }
185
186 s.d.logger.Error(errors.Join(err), "msg has been nacked", "id", id, "db", db.Name())
187 }
188
189 if len(conflictDocs) == 0 {
190 return s.bulkInsertWithRetry(ctx, db, msgs...)
191 }
192
193 rows := db.BulkGet(ctx, conflictDocs, kivik.Param("revs", true))
194 if rows.Err() != nil {
195 for _, ref := range conflictDocs {
196 if msg, ok := bulkMsgs[ref.ID]; ok {
197 s.nack(msg, rows.Err())
198 }
199 }
200 s.d.logger.Error(rows.Err(), "bulk get errors, messages nacked")
201 return rows.Err()
202 }
203
204 for rows.Next() {
205
206 id, _ := rows.ID()
207 msg, ok := bulkMsgs[id]
208 if !ok {
209 continue
210 }
211 doc := make(map[string]interface{})
212 if scanErr := rows.ScanDoc(&doc); scanErr != nil {
213 s.nack(msg, scanErr)
214 continue
215 }
216 msg.Rev = doc["_rev"].(string)
217 }
218
219 return s.bulkInsertWithRetry(ctx, db, msgs...)
220 }
221
222
223 func (s *CouchDBStorage) ack(msg *Message) {
224
225 if err := s.d.CreateOrUpdateReplicationDoc(msg.Req); err != nil {
226 s.nack(msg, err)
227 return
228 }
229 msg.Ack()
230 s.d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(msg.Req.TenantID, msg.Req.DBName)...).Inc()
231 }
232
233
234 func (s *CouchDBStorage) nack(msg *Message, err error) {
235 msg.NackAndLog(err)
236 s.d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(msg.Req.TenantID, msg.Req.DBName)...).Inc()
237 }
238
239
240 func (s *CouchDBStorage) toBulkDocs(msgs []*Message) (map[string]*Message, []interface{}) {
241 bulkMsgs := map[string]*Message{}
242 var bulkDocs []interface{}
243 exists := map[string]bool{}
244 for _, msg := range msgs {
245 if msg.Processed() {
246 continue
247 }
248 id := msg.Req.EntityID
249 if exists[id] {
250
251 continue
252 }
253 doc, err := JSONUpdate(msg.Msg.Data(), map[string]string{"_id": id, "_rev": msg.Rev})
254 if err != nil {
255 s.nack(msg, err)
256 continue
257 }
258 bulkDocs = append(bulkDocs, doc)
259 bulkMsgs[id] = msg
260 exists[msg.Req.EntityID] = true
261 }
262 return bulkMsgs, bulkDocs
263 }
264
265
266 func JSONUpdate(doc []byte, kvs map[string]string) (map[string]interface{}, error) {
267 m := make(map[string]interface{})
268 if err := json.Unmarshal(doc, &m); err != nil {
269 return nil, err
270 }
271 for key, val := range kvs {
272 if val != "" {
273 m[key] = val
274 }
275 }
276 return m, nil
277 }
278
View as plain text