1 package cushion
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "os"
9 "os/signal"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/go-kivik/kivik/v4"
15 _ "github.com/go-kivik/kivik/v4/couchdb"
16 "github.com/go-logr/logr"
17 "golang.org/x/sys/unix"
18
19 "k8s.io/apimachinery/pkg/api/errors"
20 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
21 "k8s.io/apimachinery/pkg/fields"
22 "k8s.io/apimachinery/pkg/labels"
23 "k8s.io/apimachinery/pkg/selection"
24 "k8s.io/apimachinery/pkg/types"
25 kwatch "k8s.io/apimachinery/pkg/watch"
26 "sigs.k8s.io/controller-runtime/pkg/client"
27
28 "edge-infra.dev/pkg/edge/chariot"
29 "edge-infra.dev/pkg/edge/clientutils"
30 dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
31 "edge-infra.dev/pkg/edge/datasync/couchdb"
32 "edge-infra.dev/pkg/k8s/runtime/conditions"
33 )
34
// Message texts written to the structured pubsub log by
// ackAndLogPubSubRequest and nackAndLogPubSubRequest.
const (
	// SuccessLogMessage describes a message that was processed successfully.
	SuccessLogMessage = "successfully processed pubsub message"
	// WarnLogMessage describes a nacked message that carries no error.
	WarnLogMessage = "failed to process pubsub message, will retry"
	// FailureLogMessage describes a message whose processing returned an error.
	FailureLogMessage = "failed to process pubsub message"
)

// Kubernetes and CouchDB identifiers cushion uses for the resources it
// manages. Note that cushionNamespace doubles as the componentLabel value
// on CouchDBDatabase resources created by this package.
const (
	componentLabel   = "platform.edge.ncr.com/component"
	cushionNamespace = "cushion"
	cushionUser      = "cushion"
	cushionRole      = "cushion-admin"
)
46
// loggerOutput receives the JSON pubsub log entries written by
// ackAndLogPubSubRequest and nackAndLogPubSubRequest. It is a variable (and
// a ReadWriter) so it can be swapped out — presumably tests substitute an
// in-memory buffer and read the entries back; confirm against the tests.
var loggerOutput io.ReadWriter = os.Stdout
51
52 type DatabaseStatus interface {
53 WatchCouchDBDatabase(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error
54 }
55
56 type DatabaseStatusFunc func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error
57
58 func (db DatabaseStatusFunc) WatchCouchDBDatabase(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
59 return db(ctx, k8sDB, duration)
60 }
61
62
63 type Option func(*Daemon) error
64
65 func OptionLogger(logger logr.Logger) Option {
66 return func(d *Daemon) error {
67 d.logger = logger
68 return nil
69 }
70 }
71
72
73 func OptionPubSubReceiver(ipsr chariot.IPubSubReceiver) Option {
74 return func(d *Daemon) error {
75 if ipsr == nil {
76 return fmt.Errorf("IPubSubReceiver must not be nil")
77 }
78 d.pubSubReceiver = ipsr
79 return nil
80 }
81 }
82
83
84 func OptionCouchDBStorage(c *kivik.Client) Option {
85 return func(d *Daemon) error {
86 if c == nil {
87 return fmt.Errorf("CouchDB client must not be nil")
88 }
89 d.storer = NewCouchDBStorage(c, d)
90 return nil
91 }
92 }
93
94 func OptionKubeClient(c client.WithWatch) Option {
95 return func(d *Daemon) error {
96 if c == nil {
97 return fmt.Errorf("kube client must not be nil")
98 }
99 d.kube = c
100 return nil
101 }
102 }
103
104 func OptionDatabaseStatus(dbStatus DatabaseStatus) Option {
105 return func(d *Daemon) error {
106 if dbStatus == nil {
107 return fmt.Errorf("status func must not be nil")
108 }
109 d.DatabaseStatus = dbStatus
110 return nil
111 }
112 }
113
114
115 type Daemon struct {
116 sync.Mutex
117 pubSubReceiver chariot.IPubSubReceiver
118 storer *CouchDBStorage
119 kube client.WithWatch
120 *ReplicationDocCache
121
122 databases map[string]*MessageBuffer
123 logger logr.Logger
124 metrics *Metrics
125 cfg *Config
126 ShutDownContext context.Context
127 DatabaseStatus
128 replicationDB string
129 }
130
131
132 func NewDaemon(cfg *Config, options ...Option) (*Daemon, error) {
133 var d = new(Daemon)
134 d.cfg = cfg
135 d.ShutDownContext = ShutDownContext()
136 d.metrics = NewMetrics()
137 d.metrics.Reset()
138 d.databases = make(map[string]*MessageBuffer)
139
140 ctx := context.Background()
141 for _, opt := range options {
142 if err := opt(d); err != nil {
143 return nil, err
144 }
145 }
146
147 if err := migrateCushionK8sDBs(ctx, d.kube); err != nil {
148 return nil, err
149 }
150
151 if err := d.buildDBCache(); err != nil {
152 return nil, err
153 }
154
155 d.replicationDB = cfg.ReplicationDB()
156 replDocCache, err := BuildReplicationDocCache(ctx, d.replicationDB, d.storer, d.getDatabase)
157 if err != nil {
158 return nil, err
159 }
160 d.ReplicationDocCache = replDocCache
161 return d, nil
162 }
163
164 func ShutDownContext() context.Context {
165 ctx, cancel := context.WithCancel(context.Background())
166 sigs := make(chan os.Signal, 1)
167 signal.Notify(sigs, unix.SIGINT, unix.SIGTERM)
168 go func() {
169 <-sigs
170 cancel()
171 }()
172 return ctx
173 }
174
175
176 func (d *Daemon) Run(ctx context.Context) error {
177 return d.pubSubReceiver.Receive(ctx, d.pubSubReceiverHandle)
178 }
179
180
181
182 func (d *Daemon) pubSubReceiverHandle(ctx context.Context, ipsm chariot.IPubSubMessage) {
183 req := &Request{}
184 if err := req.FromAttributes(ipsm.Attributes()); err != nil {
185 nackAndLogPubSubRequest(ipsm, "invalid message attributes", err)
186 d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
187 return
188 }
189
190 messageBuffer, err := d.getOrCreateDatabaseForMessage(req.DBName, req.TenantID, ipsm)
191 if err != nil {
192 return
193 }
194
195 if req.Deleted {
196 if req.DBName == d.replicationDB {
197 ackAndLogPubSubRequest(ipsm, fmt.Errorf("replication can never be deleted: %s", req.DBName))
198 return
199 }
200 if strings.HasPrefix(req.DBName, "_") {
201 ackAndLogPubSubRequest(ipsm, fmt.Errorf("reserved database cannot deleted: %s", req.DBName))
202 return
203 }
204 if req.EntityID == couchdb.AllDocs {
205 d.DeleteDB(ctx, ipsm, req)
206 return
207 }
208 d.DeleteDocument(ctx, messageBuffer.DB, ipsm, req)
209 }
210
211 messageBuffer.Add(NewMessage(req, ipsm))
212 }
213
214 func (d *Daemon) BatchProcess(ctx context.Context, db *kivik.DB, msgs ...*Message) {
215 _, err := d.storer.BulKPut(ctx, db, msgs...)
216 if err != nil {
217 d.logger.Error(err, "bulk update failure")
218 }
219
220
221 go Messages(msgs).NackAll()
222 }
223
224 func (d *Daemon) DeleteDocument(ctx context.Context, database *kivik.DB, ipsm chariot.IPubSubMessage, req *Request) {
225 so := chariot.StorageObject{
226 Location: req.EntityID,
227 Content: string(ipsm.Data()),
228 }
229 _, err := d.storer.Delete(ctx, database, so)
230 if err != nil {
231 nackAndLogPubSubRequest(ipsm, "", err)
232 d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
233 return
234 }
235 d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
236 ackAndLogPubSubRequest(ipsm, err)
237 }
238
239
240 func (d *Daemon) CancelMessageBuffer(req *Request) {
241 d.Lock()
242 defer d.Unlock()
243 messageBuffer := d.databases[req.DBName]
244 if messageBuffer != nil {
245 messageBuffer.Stop()
246 }
247 }
248
249
250 func (d *Daemon) DeleteDB(ctx context.Context, ipsm chariot.IPubSubMessage, req *Request) {
251 d.CancelMessageBuffer(req)
252
253 db := &dsapi.CouchDBDatabase{
254 TypeMeta: metav1.TypeMeta{APIVersion: dsapi.GroupVersion.String(), Kind: "CouchDBDatabase"},
255 ObjectMeta: metav1.ObjectMeta{Name: req.K8sDBName(), Namespace: cushionNamespace},
256 }
257 err := d.kube.Delete(ctx, db)
258 if err != nil && !errors.IsNotFound(err) {
259 nackAndLogPubSubRequest(ipsm, "", err)
260 d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
261 return
262 }
263
264 err = d.storer.client.DestroyDB(ctx, req.DBName)
265 if err != nil && !couchdb.IsNotFound(err) {
266 nackAndLogPubSubRequest(ipsm, "", err)
267 d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
268 return
269 }
270
271 err = d.ReplicationDocCache.CreateOrUpdateReplicationDoc(req)
272 if err != nil {
273 nackAndLogPubSubRequest(ipsm, "", err)
274 d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
275 return
276 }
277
278 d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
279 ackAndLogPubSubRequest(ipsm, err)
280 d.logger.Info("Successfully deleted database from couchdb", "db", req.DBName)
281 }
282
283 func (d *Daemon) getDatabase(dbName string) (*MessageBuffer, error) {
284 d.Lock()
285 defer d.Unlock()
286
287 db := d.databases[dbName]
288 if db != nil && !db.deleted {
289 return db, nil
290 }
291
292
293 k8sdbName := K8sDBName(dbName)
294 kubeCtx := context.Background()
295 nn := types.NamespacedName{
296 Name: k8sdbName,
297 Namespace: cushionNamespace,
298 }
299 newDB := newCouchDBDatabase(nn, dbName, couchdb.Namespace)
300 if err := clientutils.CreateOrUpdateCouchDBDatabase(kubeCtx, d.kube, newDB); err != nil {
301 d.logger.Error(err, "fail to create/update CouchDBDatabase resource", "dbname", dbName, "k8sdbName", k8sdbName)
302 return nil, err
303 }
304 err := d.WatchCouchDBDatabase(context.Background(), newDB, time.Duration(1)*time.Minute)
305 if err != nil {
306 return nil, err
307 }
308
309
310
311
312 couchCtx := context.Background()
313 ok, err := d.storer.client.DBExists(couchCtx, dbName)
314 if err != nil {
315 return nil, err
316 } else if !ok {
317 nfErr := fmt.Errorf("couch database %s not found", dbName)
318 return nil, nfErr
319 }
320
321 d.logger.Info("creating kivik CouchDB database resource:", "dbName", dbName)
322
323 database := d.storer.client.DB(dbName)
324 mb := NewMessageBuffer(d.ShutDownContext, d, database, d.cfg.BulkSize, d.cfg.MaxWaitInterval)
325 d.databases[dbName] = mb
326
327 return mb, nil
328 }
329
330 func dbStatus(cl client.WithWatch) func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
331 return func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
332 list := &dsapi.CouchDBDatabaseList{}
333 watch, err := cl.Watch(ctx, list, &client.ListOptions{
334 FieldSelector: fields.OneTermEqualSelector("metadata.name", k8sDB.Name),
335 Namespace: k8sDB.Namespace,
336 })
337 if err != nil {
338 return err
339 }
340 loop:
341 for {
342 select {
343 case event := <-watch.ResultChan():
344 db := event.Object.(*dsapi.CouchDBDatabase)
345 switch event.Type {
346 case kwatch.Added, kwatch.Modified:
347 ready := conditions.IsReady(db)
348 if !ready {
349 continue
350 }
351 break loop
352 case kwatch.Error:
353 return fmt.Errorf("error retrieving CouchDBDatabase")
354 }
355 case <-time.After(duration):
356 return fmt.Errorf("error timing out while watching CouchDBDatabase")
357 }
358 }
359 return nil
360 }
361 }
362
363 func newCouchDBDatabase(nn types.NamespacedName, dbName, serverNS string) *dsapi.CouchDBDatabase {
364 return &dsapi.CouchDBDatabase{
365 TypeMeta: metav1.TypeMeta{
366 APIVersion: dsapi.GroupVersion.String(),
367 Kind: "CouchDBDatabase",
368 },
369 ObjectMeta: metav1.ObjectMeta{
370 Name: nn.Name,
371 Namespace: nn.Namespace,
372 Labels: map[string]string{componentLabel: cushionNamespace},
373 },
374 Spec: dsapi.CouchDBDatabaseSpec{
375 Name: dbName,
376 ServerRef: dsapi.ServerReference{
377 Name: couchdb.AdminServerName,
378 Namespace: serverNS,
379 },
380 Security: dsapi.Security{
381 Admins: dsapi.NameRole{
382 Names: []string{cushionUser},
383 Roles: []string{cushionRole},
384 },
385 },
386 },
387 }
388 }
389
390 func (d *Daemon) getOrCreateDatabaseForMessage(dbName string, tenant string, ipsm chariot.IPubSubMessage) (*MessageBuffer, error) {
391 db, err := d.getDatabase(dbName)
392 if err != nil {
393 nackAndLogPubSubRequest(ipsm, "unable to get db for message", err)
394 d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(tenant, dbName)...).Inc()
395 return nil, err
396 }
397 return db, nil
398 }
399
400
401 func (d *Daemon) buildDBCache() error {
402 couchCtx := context.Background()
403 d.Lock()
404 defer d.Unlock()
405
406
407 r, err := labels.NewRequirement(componentLabel, selection.In, []string{cushionNamespace})
408 if err != nil {
409 return fmt.Errorf("error creating componentLabel requirement: %w", err)
410 }
411 k8sDBs := &dsapi.CouchDBDatabaseList{}
412 err = d.kube.List(couchCtx, k8sDBs, &client.ListOptions{
413 LabelSelector: labels.NewSelector().Add(*r),
414 Namespace: cushionNamespace,
415 })
416 if err != nil {
417 d.logger.Error(err, "failed to get all dbs")
418 return err
419 }
420
421 for i := range k8sDBs.Items {
422 k8sDB := k8sDBs.Items[i]
423 ready := conditions.IsReady(&k8sDB)
424 if !ready {
425 continue
426 }
427 database := d.storer.client.DB(k8sDB.Spec.Name)
428 d.databases[k8sDB.Spec.Name] = NewMessageBuffer(d.ShutDownContext, d, database, d.cfg.BulkSize, d.cfg.MaxWaitInterval)
429 }
430 return nil
431 }
432
433
// PubSubLogMessageObject is one JSON log line describing the outcome of
// handling a pubsub message. Field order is significant: it fixes the key
// order of the encoded JSON.
type PubSubLogMessageObject struct {
	// Log metadata.
	Severity string `json:"severity"`
	Message  string `json:"message"`

	// Err holds the processing error text; omitted when empty.
	Err string `json:"error,omitempty"`

	// Copies of the pubsub message's own properties.
	ID              string            `json:"pubsub_id"`
	Data            []byte            `json:"pubsub_data"`
	Attributes      map[string]string `json:"pubsub_attributes"`
	PublishTime     time.Time         `json:"pubsub_publish_time"`
	DeliveryAttempt int               `json:"pubsub_delivery_attempt"`
	OrderingKey     string            `json:"pubsub_ordering_key"`
}
449
450 func ackAndLogPubSubRequest(ipsm chariot.IPubSubMessage, optionalErr error) {
451 defer ipsm.Ack()
452
453 var msg = SuccessLogMessage
454 var sev = LogSeverityInfo
455 var err string
456 if optionalErr != nil {
457 msg = FailureLogMessage
458 sev = LogSeverityError
459 err = optionalErr.Error()
460 }
461
462
463 var deliveryAttempt int
464 if ptrDeliveryAttempt := ipsm.DeliveryAttempt(); ptrDeliveryAttempt != nil {
465 deliveryAttempt = *ptrDeliveryAttempt
466 }
467
468
469 var logObject = PubSubLogMessageObject{
470 Severity: sev,
471 Message: msg,
472 Err: err,
473 ID: ipsm.ID(),
474 Data: ipsm.Data(),
475 Attributes: ipsm.Attributes(),
476 PublishTime: ipsm.PublishTime(),
477 DeliveryAttempt: deliveryAttempt,
478 OrderingKey: ipsm.OrderingKey(),
479 }
480 if optionalErr != nil {
481 json.NewEncoder(loggerOutput).Encode(logObject)
482 }
483 }
484
485 func nackAndLogPubSubRequest(ipsm chariot.IPubSubMessage, msg string, optionalErr error) {
486 defer ipsm.Nack()
487
488
489 var sev = LogSeverityInfo
490 if msg == "" {
491 msg = WarnLogMessage
492 sev = LogSeverityWarn
493 }
494 var err string
495 if optionalErr != nil {
496 msg = FailureLogMessage
497 sev = LogSeverityError
498 err = optionalErr.Error()
499 }
500
501
502 var deliveryAttempt int
503 if ptrDeliveryAttempt := ipsm.DeliveryAttempt(); ptrDeliveryAttempt != nil {
504 deliveryAttempt = *ptrDeliveryAttempt
505 }
506
507
508 var logObject = PubSubLogMessageObject{
509 Severity: sev,
510 Message: msg,
511 Err: err,
512 ID: ipsm.ID(),
513 Data: ipsm.Data(),
514 Attributes: ipsm.Attributes(),
515 PublishTime: ipsm.PublishTime(),
516 DeliveryAttempt: deliveryAttempt,
517 OrderingKey: ipsm.OrderingKey(),
518 }
519 json.NewEncoder(loggerOutput).Encode(logObject)
520 }
521
522 type StorageInfoLogMessageObject struct {
523 Severity string `json:"severity"`
524 Message string `json:"message"`
525
526 StorageInfo chariot.StorageInfo `json:"storage_info"`
527 }
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
// validTenantIDAndDBName returns the metric label values
// [tenant, database] passed to WithLabelValues throughout this file.
//
// Empty inputs are replaced with the placeholders "Empty_Tenant" and
// "Empty_DB". Both values are then lower-cased, placeholders included.
func validTenantIDAndDBName(tenantID string, dbname string) []string {
	orDefault := func(value, fallback string) string {
		if value != "" {
			return value
		}
		return fallback
	}

	tenantLabel := strings.ToLower(orDefault(tenantID, "Empty_Tenant"))
	dbLabel := strings.ToLower(orDefault(dbname, "Empty_DB"))
	return []string{tenantLabel, dbLabel}
}
553
View as plain text