...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/replication.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion

     1  package cushion
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"net/http"
     8  	"slices"
     9  	"strings"
    10  	"sync"
    11  
    12  	"maps"
    13  
    14  	"github.com/go-kivik/kivik/v4"
    15  
    16  	"edge-infra.dev/pkg/edge/chariot"
    17  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    18  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    19  )
    20  
// DBGetterFunc resolves a database name to the *MessageBuffer for that
// database. Code in this file uses the returned buffer's DB field as the
// handle for reading and writing the replication document.
type DBGetterFunc func(dbname string) (*MessageBuffer, error)
    22  
// ReplicationDocCache keeps an in-memory copy of the datasets recorded in the
// replication document so that requests which would not change the document
// can be skipped without touching the database.
//
// The embedded RWMutex is taken by the exported methods:
// CreateOrUpdateReplicationDoc holds the write lock and GetReplicationDoc
// holds the read lock.
type ReplicationDocCache struct {
	sync.RWMutex
	// replDB is the name of the database holding the replication document;
	// it is the argument handed to the DBGetterFunc.
	replDB string
	// datasets holds the last dataset written (or loaded) per dataset name.
	datasets map[string]dsapi.Dataset
	// storer performs the Put of the serialized replication document.
	storer *CouchDBStorage
	// dbGetter resolves replDB to a database handle when a write is needed.
	dbGetter DBGetterFunc
}
    30  
    31  func BuildReplicationDocCache(ctx context.Context, dbname string, storer *CouchDBStorage, dbGetter DBGetterFunc) (*ReplicationDocCache, error) {
    32  	r := &ReplicationDocCache{
    33  		replDB:   dbname,
    34  		datasets: map[string]dsapi.Dataset{},
    35  		storer:   storer,
    36  		dbGetter: dbGetter,
    37  	}
    38  	repl, err := r.GetReplicationDoc(ctx, dbGetter)
    39  	if err != nil {
    40  		if kivik.HTTPStatus(err) != http.StatusNotFound {
    41  			return nil, err
    42  		}
    43  		repl = &dsapi.ReplicationSet{}
    44  	}
    45  	for _, ds := range repl.Datasets {
    46  		r.datasets[ds.Name] = ds
    47  	}
    48  	return r, nil
    49  }
    50  
    51  func (r *ReplicationDocCache) GetReplicationDoc(ctx context.Context, getDB DBGetterFunc) (*dsapi.ReplicationSet, error) {
    52  	r.RLock()
    53  	defer r.RUnlock()
    54  
    55  	mb, err := getDB(r.replDB)
    56  	if err != nil {
    57  		return nil, err
    58  	}
    59  
    60  	return r.getReplicationDoc(ctx, mb.DB)
    61  }
    62  
    63  func (r *ReplicationDocCache) CreateOrUpdateReplicationDoc(req *Request) error {
    64  	r.Lock()
    65  	defer r.Unlock()
    66  	dataset := createDataset(req)
    67  	if !r.valid(dataset) {
    68  		ctx := context.Background()
    69  		mb, err := r.dbGetter(r.replDB)
    70  		if err != nil {
    71  			return err
    72  		}
    73  		db := mb.DB
    74  		rsd, err := r.getReplicationDoc(ctx, db)
    75  		switch {
    76  		case kivik.HTTPStatus(err) == http.StatusNotFound:
    77  			return r.updateRepDoc(ctx, db, createReplicationSetDoc(dataset), dataset)
    78  		case err != nil:
    79  			return err
    80  		}
    81  		updateReplicationSetDoc(rsd, dataset)
    82  
    83  		return r.updateRepDoc(ctx, db, rsd, dataset)
    84  	}
    85  	return nil
    86  }
    87  
    88  func (r *ReplicationDocCache) valid(ds dsapi.Dataset) bool {
    89  	oldDS, ok := r.datasets[ds.Name]
    90  	if !ok {
    91  		return false
    92  	}
    93  	if oldDS.Deleted != ds.Deleted {
    94  		return false
    95  	}
    96  	if oldDS.EnterpriseUnitID == "" && ds.EnterpriseUnitID != "" {
    97  		return false
    98  	}
    99  	if ds.Provider.Empty() {
   100  		return true
   101  	}
   102  	return ds.Provider.Equal(oldDS.Provider)
   103  }
   104  
   105  func (r *ReplicationDocCache) getReplicationDoc(ctx context.Context, db *kivik.DB) (*dsapi.ReplicationSet, error) {
   106  	row := db.Get(ctx, couchdb.ReplicationDocument)
   107  	if row.Err() != nil {
   108  		return nil, row.Err()
   109  	}
   110  	rsd := &dsapi.ReplicationSet{}
   111  	err := row.ScanDoc(rsd)
   112  	if err != nil {
   113  		return nil, err
   114  	}
   115  	return rsd, nil
   116  }
   117  
   118  func (r *ReplicationDocCache) updateRepDoc(ctx context.Context, db *kivik.DB, replSet *dsapi.ReplicationSet, ds dsapi.Dataset) error {
   119  	so, err := toStorageObject(replSet)
   120  	if err != nil {
   121  		return err
   122  	}
   123  
   124  	_, err = r.storer.Put(ctx, db, so)
   125  	if err != nil {
   126  		return err
   127  	}
   128  	r.datasets[ds.Name] = ds
   129  	//logStorageInfo(si)
   130  	return nil
   131  }
   132  
   133  func toStorageObject(replSet *dsapi.ReplicationSet) (chariot.StorageObject, error) {
   134  	so := chariot.StorageObject{Location: couchdb.ReplicationDocument}
   135  	data, err := json.Marshal(replSet)
   136  	so.Content = string(data)
   137  	return so, err
   138  }
   139  
   140  // https://ncr-saas.slack.com/archives/C0479SBMV88/p1670888542993859?thread_ts=1670887917.004499&cid=C0479SBMV88
   141  func createDataset(req *Request) dsapi.Dataset {
   142  	d := dsapi.Dataset{
   143  		Name:             normalizeDBName(req.DBName),
   144  		Config:           defaultReplConfig(),
   145  		Stores:           splitEmpty(req.SiteID),
   146  		Touchpoints:      splitEmpty(req.TouchpointID),
   147  		EnterpriseUnitID: req.EnterpriseUnitID,
   148  		Deleted:          req.Deleted && req.EntityID == couchdb.AllDocs,
   149  	}
   150  
   151  	if req.Provider != "" {
   152  		d.Provider = &dsapi.Provider{Name: req.Provider}
   153  	}
   154  	return d
   155  }
   156  
   157  func createReplicationSetDoc(ds dsapi.Dataset) *dsapi.ReplicationSet {
   158  	rsd := &dsapi.ReplicationSet{
   159  		Datasets: []dsapi.Dataset{ds},
   160  	}
   161  	if !ds.Provider.Empty() {
   162  		rsd.Providers = append(rsd.Providers, *ds.Provider)
   163  	}
   164  	return rsd
   165  }
   166  
   167  func updateReplicationSetDoc(replicationSet *dsapi.ReplicationSet, dataset dsapi.Dataset) {
   168  	provider := dataset.Provider
   169  	if !provider.Empty() {
   170  		replicationSet.Providers = mergeProviders(replicationSet.Providers, *provider)
   171  	}
   172  	replicationSet.Datasets = mergeDatasets(replicationSet.Datasets, dataset)
   173  }
   174  
   175  func mergeProviders(providers []dsapi.Provider, p dsapi.Provider) []dsapi.Provider {
   176  	for _, provider := range providers {
   177  		if provider.Name == p.Name {
   178  			return providers
   179  		}
   180  	}
   181  	return append(providers, p)
   182  }
   183  
   184  func defaultReplConfig() dsapi.ReplConfig {
   185  	return dsapi.ReplConfig{
   186  		Interval:     fmt.Sprintf("%d", couchdb.ReplicationInterval.Milliseconds()),
   187  		Continuous:   true,
   188  		CreateTarget: true,
   189  	}
   190  }
   191  
   192  func mergeDatasets(datasets []dsapi.Dataset, d dsapi.Dataset) []dsapi.Dataset {
   193  	for i := range datasets {
   194  		dataset := &datasets[i]
   195  		if dataset.Name == d.Name {
   196  			updateDataset(dataset, &d)
   197  			return datasets
   198  		}
   199  	}
   200  	return append(datasets, d)
   201  }
   202  
   203  func updateDataset(a, b *dsapi.Dataset) {
   204  	if a.Provider.Empty() && !b.Provider.Empty() {
   205  		a.Provider = b.Provider
   206  	}
   207  	a.Deleted = b.Deleted
   208  	a.Config = b.Config
   209  	a.Stores = mergeStringArray(a.Stores, b.Stores)
   210  	a.Touchpoints = mergeStringArray(a.Touchpoints, b.Touchpoints)
   211  	if a.EnterpriseUnitID == "" {
   212  		a.EnterpriseUnitID = b.EnterpriseUnitID
   213  	}
   214  }
   215  
   216  func mergeStringArray(a, b []string) []string {
   217  	result := map[string]struct{}{}
   218  	for _, v := range a {
   219  		result[v] = struct{}{}
   220  	}
   221  	for _, v := range b {
   222  		result[v] = struct{}{}
   223  	}
   224  	sliceResult := slices.Collect(maps.Keys(result))
   225  	if len(sliceResult) == 0 {
   226  		return []string{}
   227  	}
   228  	return sliceResult
   229  }
   230  
   231  func splitEmpty(str string) []string {
   232  	str = strings.TrimSpace(str)
   233  	if len(str) != 0 {
   234  		return strings.Split(str, ",")
   235  	}
   236  	return []string{}
   237  }
   238  

View as plain text