package cushion import ( "context" "encoding/json" "fmt" "net/http" "slices" "strings" "sync" "maps" "github.com/go-kivik/kivik/v4" "edge-infra.dev/pkg/edge/chariot" dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1" "edge-infra.dev/pkg/edge/datasync/couchdb" ) type DBGetterFunc func(dbname string) (*MessageBuffer, error) type ReplicationDocCache struct { sync.RWMutex replDB string datasets map[string]dsapi.Dataset storer *CouchDBStorage dbGetter DBGetterFunc } func BuildReplicationDocCache(ctx context.Context, dbname string, storer *CouchDBStorage, dbGetter DBGetterFunc) (*ReplicationDocCache, error) { r := &ReplicationDocCache{ replDB: dbname, datasets: map[string]dsapi.Dataset{}, storer: storer, dbGetter: dbGetter, } repl, err := r.GetReplicationDoc(ctx, dbGetter) if err != nil { if kivik.HTTPStatus(err) != http.StatusNotFound { return nil, err } repl = &dsapi.ReplicationSet{} } for _, ds := range repl.Datasets { r.datasets[ds.Name] = ds } return r, nil } func (r *ReplicationDocCache) GetReplicationDoc(ctx context.Context, getDB DBGetterFunc) (*dsapi.ReplicationSet, error) { r.RLock() defer r.RUnlock() mb, err := getDB(r.replDB) if err != nil { return nil, err } return r.getReplicationDoc(ctx, mb.DB) } func (r *ReplicationDocCache) CreateOrUpdateReplicationDoc(req *Request) error { r.Lock() defer r.Unlock() dataset := createDataset(req) if !r.valid(dataset) { ctx := context.Background() mb, err := r.dbGetter(r.replDB) if err != nil { return err } db := mb.DB rsd, err := r.getReplicationDoc(ctx, db) switch { case kivik.HTTPStatus(err) == http.StatusNotFound: return r.updateRepDoc(ctx, db, createReplicationSetDoc(dataset), dataset) case err != nil: return err } updateReplicationSetDoc(rsd, dataset) return r.updateRepDoc(ctx, db, rsd, dataset) } return nil } func (r *ReplicationDocCache) valid(ds dsapi.Dataset) bool { oldDS, ok := r.datasets[ds.Name] if !ok { return false } if oldDS.Deleted != ds.Deleted { return false } if oldDS.EnterpriseUnitID == "" && ds.EnterpriseUnitID != "" { return false } if ds.Provider.Empty() { return true } return ds.Provider.Equal(oldDS.Provider) } func (r *ReplicationDocCache) getReplicationDoc(ctx context.Context, db *kivik.DB) (*dsapi.ReplicationSet, error) { row := db.Get(ctx, couchdb.ReplicationDocument) if row.Err() != nil { return nil, row.Err() } rsd := &dsapi.ReplicationSet{} err := row.ScanDoc(rsd) if err != nil { return nil, err } return rsd, nil } func (r *ReplicationDocCache) updateRepDoc(ctx context.Context, db *kivik.DB, replSet *dsapi.ReplicationSet, ds dsapi.Dataset) error { so, err := toStorageObject(replSet) if err != nil { return err } _, err = r.storer.Put(ctx, db, so) if err != nil { return err } r.datasets[ds.Name] = ds //logStorageInfo(si) return nil } func toStorageObject(replSet *dsapi.ReplicationSet) (chariot.StorageObject, error) { so := chariot.StorageObject{Location: couchdb.ReplicationDocument} data, err := json.Marshal(replSet) so.Content = string(data) return so, err } // https://ncr-saas.slack.com/archives/C0479SBMV88/p1670888542993859?thread_ts=1670887917.004499&cid=C0479SBMV88 func createDataset(req *Request) dsapi.Dataset { d := dsapi.Dataset{ Name: normalizeDBName(req.DBName), Config: defaultReplConfig(), Stores: splitEmpty(req.SiteID), Touchpoints: splitEmpty(req.TouchpointID), EnterpriseUnitID: req.EnterpriseUnitID, Deleted: req.Deleted && req.EntityID == couchdb.AllDocs, } if req.Provider != "" { d.Provider = &dsapi.Provider{Name: req.Provider} } return d } func createReplicationSetDoc(ds dsapi.Dataset) *dsapi.ReplicationSet { rsd := &dsapi.ReplicationSet{ Datasets: []dsapi.Dataset{ds}, } if !ds.Provider.Empty() { rsd.Providers = append(rsd.Providers, *ds.Provider) } return rsd } func updateReplicationSetDoc(replicationSet *dsapi.ReplicationSet, dataset dsapi.Dataset) { provider := dataset.Provider if !provider.Empty() { replicationSet.Providers = mergeProviders(replicationSet.Providers, *provider) } replicationSet.Datasets = mergeDatasets(replicationSet.Datasets, dataset) } func mergeProviders(providers []dsapi.Provider, p dsapi.Provider) []dsapi.Provider { for _, provider := range providers { if provider.Name == p.Name { return providers } } return append(providers, p) } func defaultReplConfig() dsapi.ReplConfig { return dsapi.ReplConfig{ Interval: fmt.Sprintf("%d", couchdb.ReplicationInterval.Milliseconds()), Continuous: true, CreateTarget: true, } } func mergeDatasets(datasets []dsapi.Dataset, d dsapi.Dataset) []dsapi.Dataset { for i := range datasets { dataset := &datasets[i] if dataset.Name == d.Name { updateDataset(dataset, &d) return datasets } } return append(datasets, d) } func updateDataset(a, b *dsapi.Dataset) { if a.Provider.Empty() && !b.Provider.Empty() { a.Provider = b.Provider } a.Deleted = b.Deleted a.Config = b.Config a.Stores = mergeStringArray(a.Stores, b.Stores) a.Touchpoints = mergeStringArray(a.Touchpoints, b.Touchpoints) if a.EnterpriseUnitID == "" { a.EnterpriseUnitID = b.EnterpriseUnitID } } func mergeStringArray(a, b []string) []string { result := map[string]struct{}{} for _, v := range a { result[v] = struct{}{} } for _, v := range b { result[v] = struct{}{} } sliceResult := slices.Collect(maps.Keys(result)) if len(sliceResult) == 0 { return []string{} } return sliceResult } func splitEmpty(str string) []string { str = strings.TrimSpace(str) if len(str) != 0 { return strings.Split(str, ",") } return []string{} }