package cushion import ( "context" "encoding/json" "errors" "fmt" "edge-infra.dev/pkg/edge/chariot" "edge-infra.dev/pkg/edge/datasync/couchdb" "github.com/go-kivik/kivik/v4" _ "github.com/go-kivik/kivik/v4/couchdb" // The CouchDB driver ) // todo - will have to go back and probably add this in, maybe align with chariot2 // // Storer is a simple interface for putting and deleting storage objects. // type Storer interface { // Put(ctx context.Context, objects ...chariot.StorageObject) (chariot.PutInfo, error) // Delete(ctx context.Context, objects ...chariot.StorageObject) (chariot.DeleteInfo, error) // } type CouchDBStorage struct { client *kivik.Client d *Daemon } func NewCouchDBStorage(client *kivik.Client, d *Daemon) *CouchDBStorage { return &CouchDBStorage{ client: client, d: d, } } func (s *CouchDBStorage) BulKPut(ctx context.Context, db *kivik.DB, msgs ...*Message) (chariot.StorageInfo, error) { if len(msgs) == 0 { return chariot.StorageInfo{ObjectsEmpty: true}, nil } if err := s.bulkInsertWithRetry(ctx, db, msgs...); err != nil { return chariot.StorageInfo{}, err } return chariot.StorageInfo{}, nil } func (s *CouchDBStorage) Put(ctx context.Context, db *kivik.DB, objs ...chariot.StorageObject) (chariot.StorageInfo, error) { //nolint:dupl if len(objs) == 0 { return chariot.StorageInfo{ObjectsEmpty: true}, nil } // don't support right now, eventually this could possibly end up being bulk updates // but more refactoring will need to be done as per message wouldn't work if len(objs) > 1 { return chariot.StorageInfo{}, fmt.Errorf("idk") } obj := objs[0] var pi chariot.StorageInfo // entity_id == doc id if err := s.insertWithRetry(ctx, db, &obj); err != nil { pi.Errors = append(pi.Errors, chariot.StorageObjectError{ Object: obj, Error: err.Error(), }) } else { pi.ObjectsPut = append(pi.ObjectsPut, obj) } var reterr error if len(pi.Errors) > 0 { reterr = fmt.Errorf("got errors putting objects: %v", pi.Errors) } return pi, reterr } // Delete deletes objects at the provided StorageObject.Location fields. func (s *CouchDBStorage) Delete(ctx context.Context, db *kivik.DB, objs ...chariot.StorageObject) (chariot.StorageInfo, error) { //nolint:dupl if len(objs) == 0 { return chariot.StorageInfo{ObjectsEmpty: true}, nil } // don't support right now, eventually this could possibly end up being bulk updates // but more refactoring will need to be done as per message wouldn't work if len(objs) > 1 { return chariot.StorageInfo{}, fmt.Errorf("idk") } obj := objs[0] var di chariot.StorageInfo rev, err := db.GetRev(ctx, obj.Location) if couchdb.IgnoreNotFound(err) != nil { di.Errors = append(di.Errors, chariot.StorageObjectError{ Object: obj, Error: fmt.Sprintf("fail to get entity metadata: %s", err.Error()), }) } // todo - add retry logic at some point if len(di.Errors) == 0 && rev != "" { // entity_id == doc id if _, err := db.Delete(ctx, obj.Location, rev); couchdb.IgnoreNotFound(err) != nil { di.Errors = append(di.Errors, chariot.StorageObjectError{ Object: obj, Error: fmt.Sprintf("fail to delete entity: %s", err.Error()), }) } di.ObjectsDeleted = append(di.ObjectsDeleted, obj) } var reterr error if len(di.Errors) > 0 { reterr = fmt.Errorf("got errors deleting objects: %v", di.Errors) } return di, reterr } func (s *CouchDBStorage) insertWithRetry(ctx context.Context, db *kivik.DB, obj *chariot.StorageObject) error { _, err := db.Put(ctx, obj.Location, obj.Content) if err == nil { return nil } if kivik.HTTPStatus(err) != 409 { return err } rev, metaErr := db.GetRev(ctx, obj.Location) if metaErr != nil { return metaErr } content := make(map[string]interface{}) if unmarErr := json.Unmarshal([]byte(obj.Content), &content); unmarErr != nil { return unmarErr } content["_rev"] = rev contentBytes, marErr := json.Marshal(content) if marErr != nil { return marErr } obj.Content = string(contentBytes) return s.insertWithRetry(ctx, db, obj) } // bulkInsertWithRetry assumes doc ids are unique func (s *CouchDBStorage) bulkInsertWithRetry(ctx context.Context, db *kivik.DB, msgs ...*Message) error { _msgs := Messages(msgs) if _msgs.Processed() { return nil } bulkMsgs, bulkDocs := s.toBulkDocs(msgs) if len(bulkDocs) == 0 { return nil // no messages left to processed } bulkResults, bulkErr := db.BulkDocs(ctx, bulkDocs) if bulkErr != nil { // ensure message can be re-processed as soon as possible for _, msg := range bulkMsgs { s.nack(msg, bulkErr) } s.d.logger.Error(bulkErr, "can't put messages in bulk") return bulkErr } var conflictDocs []kivik.BulkGetReference for _, bulkResult := range bulkResults { id := bulkResult.ID err := bulkResult.Error if err == nil { if msg, ok := bulkMsgs[id]; ok { s.ack(msg) } continue } if couchdb.IsConflict(err) { conflictDocs = append(conflictDocs, kivik.BulkGetReference{ID: id}) continue } if msg, ok := bulkMsgs[id]; ok { s.nack(msg, nil) } s.d.logger.Error(errors.Join(err), "msg has been nacked", "id", id, "db", db.Name()) } if len(conflictDocs) == 0 { // only uniq ids are processed by bulk docs return s.bulkInsertWithRetry(ctx, db, msgs...) } rows := db.BulkGet(ctx, conflictDocs, kivik.Param("revs", true)) if rows.Err() != nil { for _, ref := range conflictDocs { if msg, ok := bulkMsgs[ref.ID]; ok { s.nack(msg, rows.Err()) } } s.d.logger.Error(rows.Err(), "bulk get errors, messages nacked") return rows.Err() } for rows.Next() { // Err handle? id, _ := rows.ID() msg, ok := bulkMsgs[id] if !ok { continue } doc := make(map[string]interface{}) if scanErr := rows.ScanDoc(&doc); scanErr != nil { s.nack(msg, scanErr) continue } msg.Rev = doc["_rev"].(string) } return s.bulkInsertWithRetry(ctx, db, msgs...) } // ack convenience method for acking a message func (s *CouchDBStorage) ack(msg *Message) { // TODO simplify CreateOrUpdateReplicationDoc if err := s.d.CreateOrUpdateReplicationDoc(msg.Req); err != nil { s.nack(msg, err) return } msg.Ack() s.d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(msg.Req.TenantID, msg.Req.DBName)...).Inc() } // nack convenience method for nacking a message func (s *CouchDBStorage) nack(msg *Message, err error) { msg.NackAndLog(err) s.d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(msg.Req.TenantID, msg.Req.DBName)...).Inc() } // toBulkDocs converts messages to valid couchdb bulk docs: remove duplicates, nacks invalid messages func (s *CouchDBStorage) toBulkDocs(msgs []*Message) (map[string]*Message, []interface{}) { bulkMsgs := map[string]*Message{} var bulkDocs []interface{} exists := map[string]bool{} for _, msg := range msgs { if msg.Processed() { continue } id := msg.Req.EntityID if exists[id] { // can't bulk update duplicate documents continue } doc, err := JSONUpdate(msg.Msg.Data(), map[string]string{"_id": id, "_rev": msg.Rev}) if err != nil { s.nack(msg, err) continue } bulkDocs = append(bulkDocs, doc) bulkMsgs[id] = msg exists[msg.Req.EntityID] = true } return bulkMsgs, bulkDocs } // JSONUpdate adds _id and _rev to docs for bulk update func JSONUpdate(doc []byte, kvs map[string]string) (map[string]interface{}, error) { m := make(map[string]interface{}) if err := json.Unmarshal(doc, &m); err != nil { return nil, err } for key, val := range kvs { if val != "" { m[key] = val } } return m, nil }