...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/storage.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion

     1  package cushion
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"errors"
     7  	"fmt"
     8  
     9  	"edge-infra.dev/pkg/edge/chariot"
    10  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    11  
    12  	"github.com/go-kivik/kivik/v4"
    13  	_ "github.com/go-kivik/kivik/v4/couchdb" // The CouchDB driver
    14  )
    15  
    16  // todo - will have to go back and probably add this in, maybe align with chariot2
    17  // // Storer is a simple interface for putting and deleting storage objects.
    18  // type Storer interface {
    19  // 	Put(ctx context.Context, objects ...chariot.StorageObject) (chariot.PutInfo, error)
    20  // 	Delete(ctx context.Context, objects ...chariot.StorageObject) (chariot.DeleteInfo, error)
    21  // }
    22  
    23  type CouchDBStorage struct {
    24  	client *kivik.Client
    25  	d      *Daemon
    26  }
    27  
    28  func NewCouchDBStorage(client *kivik.Client, d *Daemon) *CouchDBStorage {
    29  	return &CouchDBStorage{
    30  		client: client,
    31  		d:      d,
    32  	}
    33  }
    34  
    35  func (s *CouchDBStorage) BulKPut(ctx context.Context, db *kivik.DB, msgs ...*Message) (chariot.StorageInfo, error) {
    36  	if len(msgs) == 0 {
    37  		return chariot.StorageInfo{ObjectsEmpty: true}, nil
    38  	}
    39  
    40  	if err := s.bulkInsertWithRetry(ctx, db, msgs...); err != nil {
    41  		return chariot.StorageInfo{}, err
    42  	}
    43  	return chariot.StorageInfo{}, nil
    44  }
    45  
    46  func (s *CouchDBStorage) Put(ctx context.Context, db *kivik.DB, objs ...chariot.StorageObject) (chariot.StorageInfo, error) { //nolint:dupl
    47  	if len(objs) == 0 {
    48  		return chariot.StorageInfo{ObjectsEmpty: true}, nil
    49  	}
    50  	// don't support right now, eventually this could possibly end up being bulk updates
    51  	// but more refactoring will need to be done as per message wouldn't work
    52  	if len(objs) > 1 {
    53  		return chariot.StorageInfo{}, fmt.Errorf("idk")
    54  	}
    55  
    56  	obj := objs[0]
    57  	var pi chariot.StorageInfo
    58  	// entity_id == doc id
    59  	if err := s.insertWithRetry(ctx, db, &obj); err != nil {
    60  		pi.Errors = append(pi.Errors, chariot.StorageObjectError{
    61  			Object: obj,
    62  			Error:  err.Error(),
    63  		})
    64  	} else {
    65  		pi.ObjectsPut = append(pi.ObjectsPut, obj)
    66  	}
    67  
    68  	var reterr error
    69  	if len(pi.Errors) > 0 {
    70  		reterr = fmt.Errorf("got errors putting objects: %v", pi.Errors)
    71  	}
    72  	return pi, reterr
    73  }
    74  
    75  // Delete deletes objects at the provided StorageObject.Location fields.
    76  func (s *CouchDBStorage) Delete(ctx context.Context, db *kivik.DB, objs ...chariot.StorageObject) (chariot.StorageInfo, error) { //nolint:dupl
    77  	if len(objs) == 0 {
    78  		return chariot.StorageInfo{ObjectsEmpty: true}, nil
    79  	}
    80  	// don't support right now, eventually this could possibly end up being bulk updates
    81  	// but more refactoring will need to be done as per message wouldn't work
    82  	if len(objs) > 1 {
    83  		return chariot.StorageInfo{}, fmt.Errorf("idk")
    84  	}
    85  
    86  	obj := objs[0]
    87  	var di chariot.StorageInfo
    88  	rev, err := db.GetRev(ctx, obj.Location)
    89  	if couchdb.IgnoreNotFound(err) != nil {
    90  		di.Errors = append(di.Errors, chariot.StorageObjectError{
    91  			Object: obj,
    92  			Error:  fmt.Sprintf("fail to get entity metadata: %s", err.Error()),
    93  		})
    94  	}
    95  
    96  	// todo - add retry logic at some point
    97  	if len(di.Errors) == 0 && rev != "" {
    98  		// entity_id == doc id
    99  		if _, err := db.Delete(ctx, obj.Location, rev); couchdb.IgnoreNotFound(err) != nil {
   100  			di.Errors = append(di.Errors, chariot.StorageObjectError{
   101  				Object: obj,
   102  				Error:  fmt.Sprintf("fail to delete entity: %s", err.Error()),
   103  			})
   104  		}
   105  		di.ObjectsDeleted = append(di.ObjectsDeleted, obj)
   106  	}
   107  
   108  	var reterr error
   109  	if len(di.Errors) > 0 {
   110  		reterr = fmt.Errorf("got errors deleting objects: %v", di.Errors)
   111  	}
   112  	return di, reterr
   113  }
   114  
   115  func (s *CouchDBStorage) insertWithRetry(ctx context.Context, db *kivik.DB, obj *chariot.StorageObject) error {
   116  	_, err := db.Put(ctx, obj.Location, obj.Content)
   117  	if err == nil {
   118  		return nil
   119  	}
   120  	if kivik.HTTPStatus(err) != 409 {
   121  		return err
   122  	}
   123  
   124  	rev, metaErr := db.GetRev(ctx, obj.Location)
   125  	if metaErr != nil {
   126  		return metaErr
   127  	}
   128  
   129  	content := make(map[string]interface{})
   130  	if unmarErr := json.Unmarshal([]byte(obj.Content), &content); unmarErr != nil {
   131  		return unmarErr
   132  	}
   133  	content["_rev"] = rev
   134  
   135  	contentBytes, marErr := json.Marshal(content)
   136  	if marErr != nil {
   137  		return marErr
   138  	}
   139  	obj.Content = string(contentBytes)
   140  	return s.insertWithRetry(ctx, db, obj)
   141  }
   142  
   143  // bulkInsertWithRetry  assumes doc ids are unique
   144  func (s *CouchDBStorage) bulkInsertWithRetry(ctx context.Context, db *kivik.DB, msgs ...*Message) error {
   145  	_msgs := Messages(msgs)
   146  	if _msgs.Processed() {
   147  		return nil
   148  	}
   149  
   150  	bulkMsgs, bulkDocs := s.toBulkDocs(msgs)
   151  	if len(bulkDocs) == 0 {
   152  		return nil // no messages left to processed
   153  	}
   154  
   155  	bulkResults, bulkErr := db.BulkDocs(ctx, bulkDocs)
   156  	if bulkErr != nil {
   157  		// ensure message can be re-processed as soon as possible
   158  		for _, msg := range bulkMsgs {
   159  			s.nack(msg, bulkErr)
   160  		}
   161  		s.d.logger.Error(bulkErr, "can't put messages in bulk")
   162  		return bulkErr
   163  	}
   164  
   165  	var conflictDocs []kivik.BulkGetReference
   166  	for _, bulkResult := range bulkResults {
   167  		id := bulkResult.ID
   168  		err := bulkResult.Error
   169  
   170  		if err == nil {
   171  			if msg, ok := bulkMsgs[id]; ok {
   172  				s.ack(msg)
   173  			}
   174  			continue
   175  		}
   176  
   177  		if couchdb.IsConflict(err) {
   178  			conflictDocs = append(conflictDocs, kivik.BulkGetReference{ID: id})
   179  			continue
   180  		}
   181  
   182  		if msg, ok := bulkMsgs[id]; ok {
   183  			s.nack(msg, nil)
   184  		}
   185  
   186  		s.d.logger.Error(errors.Join(err), "msg has been nacked", "id", id, "db", db.Name())
   187  	}
   188  
   189  	if len(conflictDocs) == 0 { // only uniq ids are processed by bulk docs
   190  		return s.bulkInsertWithRetry(ctx, db, msgs...)
   191  	}
   192  
   193  	rows := db.BulkGet(ctx, conflictDocs, kivik.Param("revs", true))
   194  	if rows.Err() != nil {
   195  		for _, ref := range conflictDocs {
   196  			if msg, ok := bulkMsgs[ref.ID]; ok {
   197  				s.nack(msg, rows.Err())
   198  			}
   199  		}
   200  		s.d.logger.Error(rows.Err(), "bulk get errors, messages nacked")
   201  		return rows.Err()
   202  	}
   203  
   204  	for rows.Next() {
   205  		// Err handle?
   206  		id, _ := rows.ID()
   207  		msg, ok := bulkMsgs[id]
   208  		if !ok {
   209  			continue
   210  		}
   211  		doc := make(map[string]interface{})
   212  		if scanErr := rows.ScanDoc(&doc); scanErr != nil {
   213  			s.nack(msg, scanErr)
   214  			continue
   215  		}
   216  		msg.Rev = doc["_rev"].(string)
   217  	}
   218  
   219  	return s.bulkInsertWithRetry(ctx, db, msgs...)
   220  }
   221  
   222  // ack convenience method for acking a message
   223  func (s *CouchDBStorage) ack(msg *Message) {
   224  	// TODO simplify CreateOrUpdateReplicationDoc
   225  	if err := s.d.CreateOrUpdateReplicationDoc(msg.Req); err != nil {
   226  		s.nack(msg, err)
   227  		return
   228  	}
   229  	msg.Ack()
   230  	s.d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(msg.Req.TenantID, msg.Req.DBName)...).Inc()
   231  }
   232  
   233  // nack convenience method for nacking a message
   234  func (s *CouchDBStorage) nack(msg *Message, err error) {
   235  	msg.NackAndLog(err)
   236  	s.d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(msg.Req.TenantID, msg.Req.DBName)...).Inc()
   237  }
   238  
   239  // toBulkDocs converts messages to valid couchdb bulk docs: remove duplicates, nacks invalid messages
   240  func (s *CouchDBStorage) toBulkDocs(msgs []*Message) (map[string]*Message, []interface{}) {
   241  	bulkMsgs := map[string]*Message{}
   242  	var bulkDocs []interface{}
   243  	exists := map[string]bool{}
   244  	for _, msg := range msgs {
   245  		if msg.Processed() {
   246  			continue
   247  		}
   248  		id := msg.Req.EntityID
   249  		if exists[id] {
   250  			// can't bulk update duplicate documents
   251  			continue
   252  		}
   253  		doc, err := JSONUpdate(msg.Msg.Data(), map[string]string{"_id": id, "_rev": msg.Rev})
   254  		if err != nil {
   255  			s.nack(msg, err)
   256  			continue
   257  		}
   258  		bulkDocs = append(bulkDocs, doc)
   259  		bulkMsgs[id] = msg
   260  		exists[msg.Req.EntityID] = true
   261  	}
   262  	return bulkMsgs, bulkDocs
   263  }
   264  
   265  // JSONUpdate adds _id and _rev to docs for bulk update
   266  func JSONUpdate(doc []byte, kvs map[string]string) (map[string]interface{}, error) {
   267  	m := make(map[string]interface{})
   268  	if err := json.Unmarshal(doc, &m); err != nil {
   269  		return nil, err
   270  	}
   271  	for key, val := range kvs {
   272  		if val != "" {
   273  			m[key] = val
   274  		}
   275  	}
   276  	return m, nil
   277  }
   278  

View as plain text