package cushion

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"os/signal"
	"strings"
	"sync"
	"time"

	"github.com/go-kivik/kivik/v4"
	_ "github.com/go-kivik/kivik/v4/couchdb" // The CouchDB driver
	"github.com/go-logr/logr"
	"golang.org/x/sys/unix"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/selection"
	"k8s.io/apimachinery/pkg/types"
	kwatch "k8s.io/apimachinery/pkg/watch"
	"sigs.k8s.io/controller-runtime/pkg/client"

	"edge-infra.dev/pkg/edge/chariot"
	"edge-infra.dev/pkg/edge/clientutils"
	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
)

const (
	SuccessLogMessage = "successfully processed pubsub message"
	WarnLogMessage    = "failed to process pubsub message, will retry"
	FailureLogMessage = "failed to process pubsub message"

	// needs to align with values in manifests
	componentLabel   = "platform.edge.ncr.com/component"
	cushionNamespace = "cushion"
	cushionUser      = "cushion"
	cushionRole      = "cushion-admin"
)

var (
	// loggerOutput is defined in a variable to allow automated tests to verify the output of logs.
	loggerOutput io.ReadWriter = os.Stdout
)

type DatabaseStatus interface {
	WatchCouchDBDatabase(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error
}

// DatabaseStatusFunc adapts a plain function to the DatabaseStatus interface.
type DatabaseStatusFunc func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error

func (db DatabaseStatusFunc) WatchCouchDBDatabase(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
	return db(ctx, k8sDB, duration)
}

// Option functions set private fields in the Daemon.
type Option func(*Daemon) error

// OptionLogger sets the logger used by the Daemon.
func OptionLogger(logger logr.Logger) Option {
	return func(d *Daemon) error {
		d.logger = logger
		return nil
	}
}

// OptionPubSubReceiver sets the PubSub interface in the Daemon.
func OptionPubSubReceiver(ipsr chariot.IPubSubReceiver) Option {
	return func(d *Daemon) error {
		if ipsr == nil {
			return fmt.Errorf("IPubSubReceiver must not be nil")
		}
		d.pubSubReceiver = ipsr
		return nil
	}
}

// OptionCouchDBStorage sets the CouchDB client in the Daemon.
func OptionCouchDBStorage(c *kivik.Client) Option {
	return func(d *Daemon) error {
		if c == nil {
			return fmt.Errorf("CouchDB client must not be nil")
		}
		d.storer = NewCouchDBStorage(c, d)
		return nil
	}
}

// OptionKubeClient sets the Kubernetes client in the Daemon.
func OptionKubeClient(c client.WithWatch) Option {
	return func(d *Daemon) error {
		if c == nil {
			return fmt.Errorf("kube client must not be nil")
		}
		d.kube = c
		return nil
	}
}

// OptionDatabaseStatus sets the DatabaseStatus implementation in the Daemon.
func OptionDatabaseStatus(dbStatus DatabaseStatus) Option {
	return func(d *Daemon) error {
		if dbStatus == nil {
			return fmt.Errorf("status func must not be nil")
		}
		d.DatabaseStatus = dbStatus
		return nil
	}
}

// Daemon is the Chariot process that pulls and processes PubSub messages.
type Daemon struct {
	sync.Mutex // this mutex protects the pubSubReceiverHandle method.

	pubSubReceiver chariot.IPubSubReceiver
	storer         *CouchDBStorage
	kube           client.WithWatch
	*ReplicationDocCache
	// databases is a cache of each database the couchdb client uses
	databases       map[string]*MessageBuffer
	logger          logr.Logger
	metrics         *Metrics
	cfg             *Config
	ShutDownContext context.Context
	DatabaseStatus
	replicationDB string
}
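// NOTE: the construction sketch below is illustrative only; cfg, logger, receiver,
// couchClient, and kubeClient are assumed to be provided by the caller and are not
// defined in this package.
//
//	d, err := NewDaemon(cfg,
//		OptionLogger(logger),
//		OptionPubSubReceiver(receiver),
//		OptionCouchDBStorage(couchClient),
//		OptionKubeClient(kubeClient),
//		OptionDatabaseStatus(DatabaseStatusFunc(dbStatus(kubeClient))),
//	)
//	if err != nil {
//		return err
//	}
//	return d.Run(d.ShutDownContext)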
// NewDaemon returns a Daemon with the provided options set.
func NewDaemon(cfg *Config, options ...Option) (*Daemon, error) {
	var d = new(Daemon)
	d.cfg = cfg
	d.ShutDownContext = ShutDownContext()
	d.metrics = NewMetrics()
	d.metrics.Reset()
	d.databases = make(map[string]*MessageBuffer)

	ctx := context.Background()
	for _, opt := range options {
		if err := opt(d); err != nil {
			return nil, err
		}
	}

	if err := migrateCushionK8sDBs(ctx, d.kube); err != nil {
		return nil, err
	}
	if err := d.buildDBCache(); err != nil {
		return nil, err
	}

	d.replicationDB = cfg.ReplicationDB()
	replDocCache, err := BuildReplicationDocCache(ctx, d.replicationDB, d.storer, d.getDatabase)
	if err != nil {
		return nil, err
	}
	d.ReplicationDocCache = replDocCache
	return d, nil
}

// ShutDownContext returns a context that is cancelled when SIGINT or SIGTERM is received.
func ShutDownContext() context.Context {
	ctx, cancel := context.WithCancel(context.Background())
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, unix.SIGINT, unix.SIGTERM)
	go func() {
		<-sigs
		cancel()
	}()
	return ctx
}

// Run starts the daemon and blocks until an error occurs.
func (d *Daemon) Run(ctx context.Context) error {
	return d.pubSubReceiver.Receive(ctx, d.pubSubReceiverHandle)
}

// pubSubReceiverHandle is called for each PubSub message. Unlike the chariot version, we don't
// want to ack every message. We still need to work out what the response to other services
// will look like here.
func (d *Daemon) pubSubReceiverHandle(ctx context.Context, ipsm chariot.IPubSubMessage) {
	req := &Request{}
	if err := req.FromAttributes(ipsm.Attributes()); err != nil {
		nackAndLogPubSubRequest(ipsm, "invalid message attributes", err)
		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
		return
	}

	messageBuffer, err := d.getOrCreateDatabaseForMessage(req.DBName, req.TenantID, ipsm)
	if err != nil {
		return // ack/nack logic handled in method
	}

	if req.Deleted {
		if req.DBName == d.replicationDB {
			ackAndLogPubSubRequest(ipsm, fmt.Errorf("replication database can never be deleted: %s", req.DBName))
			return
		}
		if strings.HasPrefix(req.DBName, "_") {
			ackAndLogPubSubRequest(ipsm, fmt.Errorf("reserved database cannot be deleted: %s", req.DBName))
			return
		}
		if req.EntityID == couchdb.AllDocs {
			d.DeleteDB(ctx, ipsm, req)
			return
		}
		d.DeleteDocument(ctx, messageBuffer.DB, ipsm, req)
		// DeleteDocument acks/nacks the message itself, so do not buffer it as well.
		return
	}

	messageBuffer.Add(NewMessage(req, ipsm))
}

// BatchProcess writes the buffered messages to CouchDB in one bulk request.
func (d *Daemon) BatchProcess(ctx context.Context, db *kivik.DB, msgs ...*Message) {
	_, err := d.storer.BulKPut(ctx, db, msgs...)
	if err != nil {
		d.logger.Error(err, "bulk update failure")
	}
	// in case any messages failed to process and were not acked
	go Messages(msgs).NackAll()
}

// DeleteDocument deletes a single document from the given database.
func (d *Daemon) DeleteDocument(ctx context.Context, database *kivik.DB, ipsm chariot.IPubSubMessage, req *Request) {
	so := chariot.StorageObject{
		Location: req.EntityID, // used as docID
		Content:  string(ipsm.Data()),
	}
	_, err := d.storer.Delete(ctx, database, so)
	if err != nil {
		nackAndLogPubSubRequest(ipsm, "", err)
		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
		return
	}
	d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
	ackAndLogPubSubRequest(ipsm, err)
}

// CancelMessageBuffer stops processing incoming messages for a database.
func (d *Daemon) CancelMessageBuffer(req *Request) {
	d.Lock()
	defer d.Unlock()
	messageBuffer := d.databases[req.DBName]
	if messageBuffer != nil {
		messageBuffer.Stop()
	}
}

// DeleteDB deletes the k8s CouchDBDatabase resource and its associated CouchDB database.
func (d *Daemon) DeleteDB(ctx context.Context, ipsm chariot.IPubSubMessage, req *Request) {
	d.CancelMessageBuffer(req)
	db := &dsapi.CouchDBDatabase{
		TypeMeta:   metav1.TypeMeta{APIVersion: dsapi.GroupVersion.String(), Kind: "CouchDBDatabase"},
		ObjectMeta: metav1.ObjectMeta{Name: req.K8sDBName(), Namespace: cushionNamespace},
	}
	err := d.kube.Delete(ctx, db)
	if err != nil && !errors.IsNotFound(err) {
		nackAndLogPubSubRequest(ipsm, "", err)
		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
		return
	}
	err = d.storer.client.DestroyDB(ctx, req.DBName)
	if err != nil && !couchdb.IsNotFound(err) {
		nackAndLogPubSubRequest(ipsm, "", err)
		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
		return
	}
	err = d.ReplicationDocCache.CreateOrUpdateReplicationDoc(req)
	if err != nil {
		nackAndLogPubSubRequest(ipsm, "", err)
		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
		return
	}
	d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
	ackAndLogPubSubRequest(ipsm, err)
	d.logger.Info("Successfully deleted database from couchdb", "db", req.DBName)
}

// getDatabase returns the cached MessageBuffer for dbName. When the database is not cached
// (or was marked deleted), it creates or updates the CouchDBDatabase resource, waits for it
// to become ready, verifies the CouchDB database exists, and caches a new MessageBuffer.
func (d *Daemon) getDatabase(dbName string) (*MessageBuffer, error) {
	d.Lock()
	defer d.Unlock()
	db := d.databases[dbName]
	if db != nil && !db.deleted {
		return db, nil
	}

	// ensure a kube-friendly name when looking for and creating the CouchDBDatabase resource
	k8sdbName := K8sDBName(dbName)
	kubeCtx := context.Background()
	nn := types.NamespacedName{
		Name:      k8sdbName,
		Namespace: cushionNamespace,
	}
	newDB := newCouchDBDatabase(nn, dbName, couchdb.Namespace)
	// todo - make logging better
	if err := clientutils.CreateOrUpdateCouchDBDatabase(kubeCtx, d.kube, newDB); err != nil {
		d.logger.Error(err, "failed to create/update CouchDBDatabase resource", "dbname", dbName, "k8sdbName", k8sdbName)
		return nil, err
	}

	err := d.WatchCouchDBDatabase(context.Background(), newDB, time.Duration(1)*time.Minute)
	if err != nil {
		return nil, err
	}

	// todo - rely on message back-off for now, but we need to look into acking errors
	// we know can't be handled.
	// Double check the db exists with the couch client and that we can auth with it.
	couchCtx := context.Background()
	ok, err := d.storer.client.DBExists(couchCtx, dbName)
	if err != nil {
		return nil, err
	} else if !ok {
		nfErr := fmt.Errorf("couch database %s not found", dbName)
		return nil, nfErr
	}

	d.logger.Info("creating kivik CouchDB database resource:", "dbName", dbName)
	// cache the database resource to prevent having to go through this logic for every message
	database := d.storer.client.DB(dbName)
	mb := NewMessageBuffer(d.ShutDownContext, d, database, d.cfg.BulkSize, d.cfg.MaxWaitInterval)
	d.databases[dbName] = mb
	return mb, nil
}

// dbStatus returns a DatabaseStatus-compatible function that watches the named
// CouchDBDatabase resource until it reports Ready or the duration elapses.
func dbStatus(cl client.WithWatch) func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
	return func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
		list := &dsapi.CouchDBDatabaseList{}
		watch, err := cl.Watch(ctx, list, &client.ListOptions{
			FieldSelector: fields.OneTermEqualSelector("metadata.name", k8sDB.Name),
			Namespace:     k8sDB.Namespace,
		})
		if err != nil {
			return err
		}
		defer watch.Stop()
	loop:
		for {
			select {
			case event := <-watch.ResultChan():
				switch event.Type {
				case kwatch.Added, kwatch.Modified:
					// guard the type assertion so unexpected objects don't panic the watcher
					db, ok := event.Object.(*dsapi.CouchDBDatabase)
					if !ok || !conditions.IsReady(db) {
						continue
					}
					break loop
				case kwatch.Error:
					return fmt.Errorf("error retrieving CouchDBDatabase")
				}
			case <-time.After(duration):
				return fmt.Errorf("timed out while watching CouchDBDatabase")
			}
		}
		return nil
	}
}

// newCouchDBDatabase builds the CouchDBDatabase resource that cushion manages for dbName.
func newCouchDBDatabase(nn types.NamespacedName, dbName, serverNS string) *dsapi.CouchDBDatabase {
	return &dsapi.CouchDBDatabase{
		TypeMeta: metav1.TypeMeta{
			APIVersion: dsapi.GroupVersion.String(),
			Kind:       "CouchDBDatabase",
		},
		ObjectMeta: metav1.ObjectMeta{
			Name:      nn.Name,
			Namespace: nn.Namespace,
			Labels:    map[string]string{componentLabel: cushionNamespace},
		},
		Spec: dsapi.CouchDBDatabaseSpec{
			Name: dbName,
			ServerRef: dsapi.ServerReference{
				Name:      couchdb.AdminServerName,
				Namespace: serverNS,
			},
			Security: dsapi.Security{
				Admins: dsapi.NameRole{
					Names: []string{cushionUser},
					Roles: []string{cushionRole},
				},
			},
		},
	}
}

// getOrCreateDatabaseForMessage wraps getDatabase, nacking the message and recording a
// failed-ack metric when the database cannot be obtained.
func (d *Daemon) getOrCreateDatabaseForMessage(dbName string, tenant string, ipsm chariot.IPubSubMessage) (*MessageBuffer, error) {
	db, err := d.getDatabase(dbName)
	if err != nil {
		nackAndLogPubSubRequest(ipsm, "unable to get db for message", err)
		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(tenant, dbName)...).Inc()
		return nil, err
	}
	return db, nil
}

// buildDBCache pre-caches MessageBuffers for existing databases.
func (d *Daemon) buildDBCache() error {
	couchCtx := context.Background()
	d.Lock()
	defer d.Unlock()
	// migration already happened
	r, err := labels.NewRequirement(componentLabel, selection.In, []string{cushionNamespace})
	if err != nil {
		return fmt.Errorf("error creating componentLabel requirement: %w", err)
	}
	k8sDBs := &dsapi.CouchDBDatabaseList{}
	err = d.kube.List(couchCtx, k8sDBs, &client.ListOptions{
		LabelSelector: labels.NewSelector().Add(*r),
		Namespace:     cushionNamespace,
	})
	if err != nil {
		d.logger.Error(err, "failed to get all dbs")
		return err
	}
	for i := range k8sDBs.Items {
		k8sDB := k8sDBs.Items[i]
		ready := conditions.IsReady(&k8sDB)
		if !ready {
			continue
		}
		database := d.storer.client.DB(k8sDB.Spec.Name)
		d.databases[k8sDB.Spec.Name] = NewMessageBuffer(d.ShutDownContext, d, database, d.cfg.BulkSize, d.cfg.MaxWaitInterval)
	}
	return nil
}

// PubSubLogMessageObject is logged when an Ack or Nack occurs.
type PubSubLogMessageObject struct {
	Severity string `json:"severity"` // "info", "error"
	Message  string `json:"message"`  // The log message
	// Err is an error that caused Chariot to not process the request.
	Err string `json:"error,omitempty"`
	// PubSub info
	ID              string            `json:"pubsub_id"`
	Data            []byte            `json:"pubsub_data"`
	Attributes      map[string]string `json:"pubsub_attributes"`
	PublishTime     time.Time         `json:"pubsub_publish_time"`
	DeliveryAttempt int               `json:"pubsub_delivery_attempt"`
	OrderingKey     string            `json:"pubsub_ordering_key"`
}

// ackAndLogPubSubRequest acks the message and, when optionalErr is non-nil, logs it.
func ackAndLogPubSubRequest(ipsm chariot.IPubSubMessage, optionalErr error) {
	defer ipsm.Ack() // Ack after logging for unit test correctness purposes.

	// Determine the log message and severity.
	var msg = SuccessLogMessage
	var sev = LogSeverityInfo
	var err string
	if optionalErr != nil {
		msg = FailureLogMessage
		sev = LogSeverityError
		err = optionalErr.Error()
	}

	// Dereference the delivery attempt value.
	var deliveryAttempt int
	if ptrDeliveryAttempt := ipsm.DeliveryAttempt(); ptrDeliveryAttempt != nil {
		deliveryAttempt = *ptrDeliveryAttempt
	}

	// Log to the `loggerOutput` variable for unit tests that need to read logs.
	var logObject = PubSubLogMessageObject{
		Severity:        sev,
		Message:         msg,
		Err:             err,
		ID:              ipsm.ID(),
		Data:            ipsm.Data(),
		Attributes:      ipsm.Attributes(),
		PublishTime:     ipsm.PublishTime(),
		DeliveryAttempt: deliveryAttempt,
		OrderingKey:     ipsm.OrderingKey(),
	}
	if optionalErr != nil {
		json.NewEncoder(loggerOutput).Encode(logObject) //nolint:errcheck
	}
}
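// ackAndLogPubSubRequest (above) writes one JSON object to loggerOutput only when it was
// handed an error; nackAndLogPubSubRequest (below) always writes one. The keys follow the
// PubSubLogMessageObject struct tags; an illustrative (made-up) error line might look like:
//
//	{"severity":"error","message":"failed to process pubsub message","error":"db missing",
//	 "pubsub_id":"42","pubsub_data":"eyJrIjoidiJ9","pubsub_attributes":{"db_name":"items"},
//	 "pubsub_publish_time":"2024-01-01T00:00:00Z","pubsub_delivery_attempt":1,"pubsub_ordering_key":""}
//
// pubsub_data is base64-encoded because the Data field is a []byte.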
// nackAndLogPubSubRequest nacks the message and logs why it was not processed.
func nackAndLogPubSubRequest(ipsm chariot.IPubSubMessage, msg string, optionalErr error) {
	defer ipsm.Nack() // Nack after logging for unit test correctness purposes.

	// Determine the log message and severity.
	var sev = LogSeverityInfo
	if msg == "" {
		msg = WarnLogMessage
		sev = LogSeverityWarn
	}
	var err string
	if optionalErr != nil {
		msg = FailureLogMessage
		sev = LogSeverityError
		err = optionalErr.Error()
	}

	// Dereference the delivery attempt value.
	var deliveryAttempt int
	if ptrDeliveryAttempt := ipsm.DeliveryAttempt(); ptrDeliveryAttempt != nil {
		deliveryAttempt = *ptrDeliveryAttempt
	}

	// Log to the `loggerOutput` variable for unit tests that need to read logs.
	var logObject = PubSubLogMessageObject{
		Severity:        sev,
		Message:         msg,
		Err:             err,
		ID:              ipsm.ID(),
		Data:            ipsm.Data(),
		Attributes:      ipsm.Attributes(),
		PublishTime:     ipsm.PublishTime(),
		DeliveryAttempt: deliveryAttempt,
		OrderingKey:     ipsm.OrderingKey(),
	}
	json.NewEncoder(loggerOutput).Encode(logObject) //nolint:errcheck
}

type StorageInfoLogMessageObject struct {
	Severity    string              `json:"severity"` // "info", "error"
	Message     string              `json:"message"`  // The log message
	StorageInfo chariot.StorageInfo `json:"storage_info"`
}

//func logStorageInfo(si chariot.StorageInfo) {
//	const msg = "Storage Info"
//	var sev = sevInfo
//	if len(si.Errors) > 0 {
//		sev = sevError
//	}
//	var logObject = StorageInfoLogMessageObject{
//		Message:     msg,
//		Severity:    sev,
//		StorageInfo: si,
//	}
//	json.NewEncoder(loggerOutput).Encode(logObject) //nolint:errcheck
//}

// validTenantIDAndDBName normalizes the tenant_id and db_name message attributes for use
// as metric label values, substituting placeholders when either value is empty.
func validTenantIDAndDBName(tenantID string, dbname string) []string {
	if dbname == "" {
		dbname = "Empty_DB"
	}
	if tenantID == "" {
		tenantID = "Empty_Tenant"
	}
	return []string{strings.ToLower(tenantID), strings.ToLower(dbname)}
}
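// For example, validTenantIDAndDBName("", "Items") returns []string{"empty_tenant", "items"},
// so the metric label values above are never empty strings.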