1 package cushion
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "net/http"
8 "slices"
9 "strings"
10 "sync"
11
12 "maps"
13
14 "github.com/go-kivik/kivik/v4"
15
16 "edge-infra.dev/pkg/edge/chariot"
17 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
18 "edge-infra.dev/pkg/edge/datasync/couchdb"
19 )
20
21 type DBGetterFunc func(dbname string) (*MessageBuffer, error)
22
23 type ReplicationDocCache struct {
24 sync.RWMutex
25 replDB string
26 datasets map[string]dsapi.Dataset
27 storer *CouchDBStorage
28 dbGetter DBGetterFunc
29 }
30
31 func BuildReplicationDocCache(ctx context.Context, dbname string, storer *CouchDBStorage, dbGetter DBGetterFunc) (*ReplicationDocCache, error) {
32 r := &ReplicationDocCache{
33 replDB: dbname,
34 datasets: map[string]dsapi.Dataset{},
35 storer: storer,
36 dbGetter: dbGetter,
37 }
38 repl, err := r.GetReplicationDoc(ctx, dbGetter)
39 if err != nil {
40 if kivik.HTTPStatus(err) != http.StatusNotFound {
41 return nil, err
42 }
43 repl = &dsapi.ReplicationSet{}
44 }
45 for _, ds := range repl.Datasets {
46 r.datasets[ds.Name] = ds
47 }
48 return r, nil
49 }
50
51 func (r *ReplicationDocCache) GetReplicationDoc(ctx context.Context, getDB DBGetterFunc) (*dsapi.ReplicationSet, error) {
52 r.RLock()
53 defer r.RUnlock()
54
55 mb, err := getDB(r.replDB)
56 if err != nil {
57 return nil, err
58 }
59
60 return r.getReplicationDoc(ctx, mb.DB)
61 }
62
63 func (r *ReplicationDocCache) CreateOrUpdateReplicationDoc(req *Request) error {
64 r.Lock()
65 defer r.Unlock()
66 dataset := createDataset(req)
67 if !r.valid(dataset) {
68 ctx := context.Background()
69 mb, err := r.dbGetter(r.replDB)
70 if err != nil {
71 return err
72 }
73 db := mb.DB
74 rsd, err := r.getReplicationDoc(ctx, db)
75 switch {
76 case kivik.HTTPStatus(err) == http.StatusNotFound:
77 return r.updateRepDoc(ctx, db, createReplicationSetDoc(dataset), dataset)
78 case err != nil:
79 return err
80 }
81 updateReplicationSetDoc(rsd, dataset)
82
83 return r.updateRepDoc(ctx, db, rsd, dataset)
84 }
85 return nil
86 }
87
88 func (r *ReplicationDocCache) valid(ds dsapi.Dataset) bool {
89 oldDS, ok := r.datasets[ds.Name]
90 if !ok {
91 return false
92 }
93 if oldDS.Deleted != ds.Deleted {
94 return false
95 }
96 if oldDS.EnterpriseUnitID == "" && ds.EnterpriseUnitID != "" {
97 return false
98 }
99 if ds.Provider.Empty() {
100 return true
101 }
102 return ds.Provider.Equal(oldDS.Provider)
103 }
104
105 func (r *ReplicationDocCache) getReplicationDoc(ctx context.Context, db *kivik.DB) (*dsapi.ReplicationSet, error) {
106 row := db.Get(ctx, couchdb.ReplicationDocument)
107 if row.Err() != nil {
108 return nil, row.Err()
109 }
110 rsd := &dsapi.ReplicationSet{}
111 err := row.ScanDoc(rsd)
112 if err != nil {
113 return nil, err
114 }
115 return rsd, nil
116 }
117
118 func (r *ReplicationDocCache) updateRepDoc(ctx context.Context, db *kivik.DB, replSet *dsapi.ReplicationSet, ds dsapi.Dataset) error {
119 so, err := toStorageObject(replSet)
120 if err != nil {
121 return err
122 }
123
124 _, err = r.storer.Put(ctx, db, so)
125 if err != nil {
126 return err
127 }
128 r.datasets[ds.Name] = ds
129
130 return nil
131 }
132
133 func toStorageObject(replSet *dsapi.ReplicationSet) (chariot.StorageObject, error) {
134 so := chariot.StorageObject{Location: couchdb.ReplicationDocument}
135 data, err := json.Marshal(replSet)
136 so.Content = string(data)
137 return so, err
138 }
139
140
141 func createDataset(req *Request) dsapi.Dataset {
142 d := dsapi.Dataset{
143 Name: normalizeDBName(req.DBName),
144 Config: defaultReplConfig(),
145 Stores: splitEmpty(req.SiteID),
146 Touchpoints: splitEmpty(req.TouchpointID),
147 EnterpriseUnitID: req.EnterpriseUnitID,
148 Deleted: req.Deleted && req.EntityID == couchdb.AllDocs,
149 }
150
151 if req.Provider != "" {
152 d.Provider = &dsapi.Provider{Name: req.Provider}
153 }
154 return d
155 }
156
157 func createReplicationSetDoc(ds dsapi.Dataset) *dsapi.ReplicationSet {
158 rsd := &dsapi.ReplicationSet{
159 Datasets: []dsapi.Dataset{ds},
160 }
161 if !ds.Provider.Empty() {
162 rsd.Providers = append(rsd.Providers, *ds.Provider)
163 }
164 return rsd
165 }
166
167 func updateReplicationSetDoc(replicationSet *dsapi.ReplicationSet, dataset dsapi.Dataset) {
168 provider := dataset.Provider
169 if !provider.Empty() {
170 replicationSet.Providers = mergeProviders(replicationSet.Providers, *provider)
171 }
172 replicationSet.Datasets = mergeDatasets(replicationSet.Datasets, dataset)
173 }
174
175 func mergeProviders(providers []dsapi.Provider, p dsapi.Provider) []dsapi.Provider {
176 for _, provider := range providers {
177 if provider.Name == p.Name {
178 return providers
179 }
180 }
181 return append(providers, p)
182 }
183
184 func defaultReplConfig() dsapi.ReplConfig {
185 return dsapi.ReplConfig{
186 Interval: fmt.Sprintf("%d", couchdb.ReplicationInterval.Milliseconds()),
187 Continuous: true,
188 CreateTarget: true,
189 }
190 }
191
192 func mergeDatasets(datasets []dsapi.Dataset, d dsapi.Dataset) []dsapi.Dataset {
193 for i := range datasets {
194 dataset := &datasets[i]
195 if dataset.Name == d.Name {
196 updateDataset(dataset, &d)
197 return datasets
198 }
199 }
200 return append(datasets, d)
201 }
202
203 func updateDataset(a, b *dsapi.Dataset) {
204 if a.Provider.Empty() && !b.Provider.Empty() {
205 a.Provider = b.Provider
206 }
207 a.Deleted = b.Deleted
208 a.Config = b.Config
209 a.Stores = mergeStringArray(a.Stores, b.Stores)
210 a.Touchpoints = mergeStringArray(a.Touchpoints, b.Touchpoints)
211 if a.EnterpriseUnitID == "" {
212 a.EnterpriseUnitID = b.EnterpriseUnitID
213 }
214 }
215
216 func mergeStringArray(a, b []string) []string {
217 result := map[string]struct{}{}
218 for _, v := range a {
219 result[v] = struct{}{}
220 }
221 for _, v := range b {
222 result[v] = struct{}{}
223 }
224 sliceResult := slices.Collect(maps.Keys(result))
225 if len(sliceResult) == 0 {
226 return []string{}
227 }
228 return sliceResult
229 }
230
231 func splitEmpty(str string) []string {
232 str = strings.TrimSpace(str)
233 if len(str) != 0 {
234 return strings.Split(str, ",")
235 }
236 return []string{}
237 }
238
View as plain text