...

Source file src/edge-infra.dev/pkg/edge/datasync/cushion/daemon.go

Documentation: edge-infra.dev/pkg/edge/datasync/cushion

     1  package cushion
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"io"
     8  	"os"
     9  	"os/signal"
    10  	"strings"
    11  	"sync"
    12  	"time"
    13  
    14  	"github.com/go-kivik/kivik/v4"
    15  	_ "github.com/go-kivik/kivik/v4/couchdb" // The CouchDB driver
    16  	"github.com/go-logr/logr"
    17  	"golang.org/x/sys/unix"
    18  
    19  	"k8s.io/apimachinery/pkg/api/errors"
    20  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    21  	"k8s.io/apimachinery/pkg/fields"
    22  	"k8s.io/apimachinery/pkg/labels"
    23  	"k8s.io/apimachinery/pkg/selection"
    24  	"k8s.io/apimachinery/pkg/types"
    25  	kwatch "k8s.io/apimachinery/pkg/watch"
    26  	"sigs.k8s.io/controller-runtime/pkg/client"
    27  
    28  	"edge-infra.dev/pkg/edge/chariot"
    29  	"edge-infra.dev/pkg/edge/clientutils"
    30  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    31  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    32  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    33  )
    34  
    35  const (
    36  	SuccessLogMessage = "successfully processed pubsub message"
    37  	WarnLogMessage    = "failed to process pubsub message, will retry"
    38  	FailureLogMessage = "failed to process pubsub message"
    39  
    40  	// needs to align with values in manifests
    41  	componentLabel   = "platform.edge.ncr.com/component"
    42  	cushionNamespace = "cushion"
    43  	cushionUser      = "cushion"
    44  	cushionRole      = "cushion-admin"
    45  )
    46  
    47  var (
    48  	// loggerOutput is defined in a variable to allow automated tests to verify the output of logs.
    49  	loggerOutput io.ReadWriter = os.Stdout
    50  )
    51  
// DatabaseStatus abstracts waiting on a CouchDBDatabase resource.
//
// WatchCouchDBDatabase receives the resource to observe and a duration, and
// reports the outcome as an error. The caller in this file (getDatabase)
// treats a nil error as "the database can be used" and passes one minute as
// the duration.
type DatabaseStatus interface {
	WatchCouchDBDatabase(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error
}
    55  
    56  type DatabaseStatusFunc func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error
    57  
    58  func (db DatabaseStatusFunc) WatchCouchDBDatabase(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
    59  	return db(ctx, k8sDB, duration)
    60  }
    61  
// Option is a functional option applied by NewDaemon to set private fields
// in the Daemon. A non-nil error returned by an Option makes NewDaemon fail
// with that error.
type Option func(*Daemon) error
    64  
    65  func OptionLogger(logger logr.Logger) Option {
    66  	return func(d *Daemon) error {
    67  		d.logger = logger
    68  		return nil
    69  	}
    70  }
    71  
    72  // OptionPubSubReceiver sets the PubSub interface in the Daemon.
    73  func OptionPubSubReceiver(ipsr chariot.IPubSubReceiver) Option {
    74  	return func(d *Daemon) error {
    75  		if ipsr == nil {
    76  			return fmt.Errorf("IPubSubReceiver must not be nil")
    77  		}
    78  		d.pubSubReceiver = ipsr
    79  		return nil
    80  	}
    81  }
    82  
    83  // OptionCouchDBStorage sets the CouchDB client in the Daemon.
    84  func OptionCouchDBStorage(c *kivik.Client) Option {
    85  	return func(d *Daemon) error {
    86  		if c == nil {
    87  			return fmt.Errorf("CouchDB client must not be nil")
    88  		}
    89  		d.storer = NewCouchDBStorage(c, d)
    90  		return nil
    91  	}
    92  }
    93  
    94  func OptionKubeClient(c client.WithWatch) Option {
    95  	return func(d *Daemon) error {
    96  		if c == nil {
    97  			return fmt.Errorf("kube client must not be nil")
    98  		}
    99  		d.kube = c
   100  		return nil
   101  	}
   102  }
   103  
   104  func OptionDatabaseStatus(dbStatus DatabaseStatus) Option {
   105  	return func(d *Daemon) error {
   106  		if dbStatus == nil {
   107  			return fmt.Errorf("status func must not be nil")
   108  		}
   109  		d.DatabaseStatus = dbStatus
   110  		return nil
   111  	}
   112  }
   113  
// Daemon is the cushion process that pulls and processes PubSub messages.
type Daemon struct {
	// Mutex guards the databases map: it is taken by getDatabase,
	// CancelMessageBuffer and buildDBCache around every access to it.
	sync.Mutex
	// pubSubReceiver delivers messages to pubSubReceiverHandle (see Run).
	pubSubReceiver chariot.IPubSubReceiver
	// storer wraps the CouchDB client used for document and database operations.
	storer *CouchDBStorage
	// kube is the Kubernetes client used for CouchDBDatabase resources.
	kube client.WithWatch
	// ReplicationDocCache is built by NewDaemon for the replication database.
	*ReplicationDocCache
	// databases is a cache of each database the couchdb client uses,
	// keyed by CouchDB database name.
	databases map[string]*MessageBuffer
	logger    logr.Logger
	metrics   *Metrics
	cfg       *Config
	// ShutDownContext is handed to every MessageBuffer the Daemon creates;
	// NewDaemon sets it from ShutDownContext().
	ShutDownContext context.Context
	// DatabaseStatus is used by getDatabase to wait on a CouchDBDatabase
	// resource after it has been created or updated.
	DatabaseStatus
	// replicationDB is the database name taken from cfg.ReplicationDB();
	// pubSubReceiverHandle refuses to delete it.
	replicationDB string
}
   130  
   131  // NewDaemon returns a Daemon with the provided options set.
   132  func NewDaemon(cfg *Config, options ...Option) (*Daemon, error) {
   133  	var d = new(Daemon)
   134  	d.cfg = cfg
   135  	d.ShutDownContext = ShutDownContext()
   136  	d.metrics = NewMetrics()
   137  	d.metrics.Reset()
   138  	d.databases = make(map[string]*MessageBuffer)
   139  
   140  	ctx := context.Background()
   141  	for _, opt := range options {
   142  		if err := opt(d); err != nil {
   143  			return nil, err
   144  		}
   145  	}
   146  
   147  	if err := migrateCushionK8sDBs(ctx, d.kube); err != nil {
   148  		return nil, err
   149  	}
   150  
   151  	if err := d.buildDBCache(); err != nil {
   152  		return nil, err
   153  	}
   154  
   155  	d.replicationDB = cfg.ReplicationDB()
   156  	replDocCache, err := BuildReplicationDocCache(ctx, d.replicationDB, d.storer, d.getDatabase)
   157  	if err != nil {
   158  		return nil, err
   159  	}
   160  	d.ReplicationDocCache = replDocCache
   161  	return d, nil
   162  }
   163  
   164  func ShutDownContext() context.Context {
   165  	ctx, cancel := context.WithCancel(context.Background())
   166  	sigs := make(chan os.Signal, 1)
   167  	signal.Notify(sigs, unix.SIGINT, unix.SIGTERM)
   168  	go func() {
   169  		<-sigs
   170  		cancel()
   171  	}()
   172  	return ctx
   173  }
   174  
   175  // Run starts the daemon and blocks until an error occurs.
   176  func (d *Daemon) Run(ctx context.Context) error {
   177  	return d.pubSubReceiver.Receive(ctx, d.pubSubReceiverHandle)
   178  }
   179  
   180  // pubSubReceiverHandle is called for each PubSub message. Unlike chariot version we don't want to
   181  // ack every message. Need to work out what response to other services will look like here.
   182  func (d *Daemon) pubSubReceiverHandle(ctx context.Context, ipsm chariot.IPubSubMessage) {
   183  	req := &Request{}
   184  	if err := req.FromAttributes(ipsm.Attributes()); err != nil {
   185  		nackAndLogPubSubRequest(ipsm, "invalid message attributes", err)
   186  		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   187  		return
   188  	}
   189  
   190  	messageBuffer, err := d.getOrCreateDatabaseForMessage(req.DBName, req.TenantID, ipsm)
   191  	if err != nil {
   192  		return // ack/nack logic handled in method
   193  	}
   194  
   195  	if req.Deleted {
   196  		if req.DBName == d.replicationDB {
   197  			ackAndLogPubSubRequest(ipsm, fmt.Errorf("replication can never be deleted: %s", req.DBName))
   198  			return
   199  		}
   200  		if strings.HasPrefix(req.DBName, "_") {
   201  			ackAndLogPubSubRequest(ipsm, fmt.Errorf("reserved database cannot deleted: %s", req.DBName))
   202  			return
   203  		}
   204  		if req.EntityID == couchdb.AllDocs {
   205  			d.DeleteDB(ctx, ipsm, req)
   206  			return
   207  		}
   208  		d.DeleteDocument(ctx, messageBuffer.DB, ipsm, req)
   209  	}
   210  
   211  	messageBuffer.Add(NewMessage(req, ipsm))
   212  }
   213  
   214  func (d *Daemon) BatchProcess(ctx context.Context, db *kivik.DB, msgs ...*Message) {
   215  	_, err := d.storer.BulKPut(ctx, db, msgs...)
   216  	if err != nil {
   217  		d.logger.Error(err, "bulk update failure")
   218  	}
   219  
   220  	// in case any messages fail to process and not acked
   221  	go Messages(msgs).NackAll()
   222  }
   223  
   224  func (d *Daemon) DeleteDocument(ctx context.Context, database *kivik.DB, ipsm chariot.IPubSubMessage, req *Request) {
   225  	so := chariot.StorageObject{
   226  		Location: req.EntityID, // used as docID
   227  		Content:  string(ipsm.Data()),
   228  	}
   229  	_, err := d.storer.Delete(ctx, database, so)
   230  	if err != nil {
   231  		nackAndLogPubSubRequest(ipsm, "", err)
   232  		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   233  		return
   234  	}
   235  	d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   236  	ackAndLogPubSubRequest(ipsm, err)
   237  }
   238  
   239  // CancelMessageBuffer stop processing incoming messages for a database
   240  func (d *Daemon) CancelMessageBuffer(req *Request) {
   241  	d.Lock()
   242  	defer d.Unlock()
   243  	messageBuffer := d.databases[req.DBName]
   244  	if messageBuffer != nil {
   245  		messageBuffer.Stop()
   246  	}
   247  }
   248  
   249  // DeleteDB deletes the k8s CouchDBDatabase resource and its associated CouchDB database.
   250  func (d *Daemon) DeleteDB(ctx context.Context, ipsm chariot.IPubSubMessage, req *Request) {
   251  	d.CancelMessageBuffer(req)
   252  
   253  	db := &dsapi.CouchDBDatabase{
   254  		TypeMeta:   metav1.TypeMeta{APIVersion: dsapi.GroupVersion.String(), Kind: "CouchDBDatabase"},
   255  		ObjectMeta: metav1.ObjectMeta{Name: req.K8sDBName(), Namespace: cushionNamespace},
   256  	}
   257  	err := d.kube.Delete(ctx, db)
   258  	if err != nil && !errors.IsNotFound(err) {
   259  		nackAndLogPubSubRequest(ipsm, "", err)
   260  		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   261  		return
   262  	}
   263  
   264  	err = d.storer.client.DestroyDB(ctx, req.DBName)
   265  	if err != nil && !couchdb.IsNotFound(err) {
   266  		nackAndLogPubSubRequest(ipsm, "", err)
   267  		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   268  		return
   269  	}
   270  
   271  	err = d.ReplicationDocCache.CreateOrUpdateReplicationDoc(req)
   272  	if err != nil {
   273  		nackAndLogPubSubRequest(ipsm, "", err)
   274  		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   275  		return
   276  	}
   277  
   278  	d.metrics.CushionSuccessfulAcksTotal.WithLabelValues(validTenantIDAndDBName(req.TenantID, req.DBName)...).Inc()
   279  	ackAndLogPubSubRequest(ipsm, err)
   280  	d.logger.Info("Successfully deleted database from couchdb", "db", req.DBName)
   281  }
   282  
   283  func (d *Daemon) getDatabase(dbName string) (*MessageBuffer, error) {
   284  	d.Lock()
   285  	defer d.Unlock()
   286  
   287  	db := d.databases[dbName]
   288  	if db != nil && !db.deleted {
   289  		return db, nil
   290  	}
   291  
   292  	// ensure kube friendly name when looking for and creating CouchDBDatabase resource
   293  	k8sdbName := K8sDBName(dbName)
   294  	kubeCtx := context.Background()
   295  	nn := types.NamespacedName{
   296  		Name:      k8sdbName,
   297  		Namespace: cushionNamespace,
   298  	}
   299  	newDB := newCouchDBDatabase(nn, dbName, couchdb.Namespace) // todo - make logging better
   300  	if err := clientutils.CreateOrUpdateCouchDBDatabase(kubeCtx, d.kube, newDB); err != nil {
   301  		d.logger.Error(err, "fail to create/update CouchDBDatabase resource", "dbname", dbName, "k8sdbName", k8sdbName)
   302  		return nil, err
   303  	}
   304  	err := d.WatchCouchDBDatabase(context.Background(), newDB, time.Duration(1)*time.Minute)
   305  	if err != nil {
   306  		return nil, err
   307  	}
   308  	// todo - rely on message back off for now but need to look into acking errors
   309  	// we know can't be handled
   310  
   311  	// double check db exists with couch client and can auth with it
   312  	couchCtx := context.Background()
   313  	ok, err := d.storer.client.DBExists(couchCtx, dbName)
   314  	if err != nil {
   315  		return nil, err
   316  	} else if !ok {
   317  		nfErr := fmt.Errorf("couch database %s not found", dbName)
   318  		return nil, nfErr
   319  	}
   320  
   321  	d.logger.Info("creating kivik CouchDB database resource:", "dbName", dbName)
   322  	// cache the database resource to prevent having to go through this logic every message
   323  	database := d.storer.client.DB(dbName)
   324  	mb := NewMessageBuffer(d.ShutDownContext, d, database, d.cfg.BulkSize, d.cfg.MaxWaitInterval)
   325  	d.databases[dbName] = mb
   326  
   327  	return mb, nil
   328  }
   329  
   330  func dbStatus(cl client.WithWatch) func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
   331  	return func(ctx context.Context, k8sDB *dsapi.CouchDBDatabase, duration time.Duration) error {
   332  		list := &dsapi.CouchDBDatabaseList{}
   333  		watch, err := cl.Watch(ctx, list, &client.ListOptions{
   334  			FieldSelector: fields.OneTermEqualSelector("metadata.name", k8sDB.Name),
   335  			Namespace:     k8sDB.Namespace,
   336  		})
   337  		if err != nil {
   338  			return err
   339  		}
   340  	loop:
   341  		for {
   342  			select {
   343  			case event := <-watch.ResultChan():
   344  				db := event.Object.(*dsapi.CouchDBDatabase)
   345  				switch event.Type {
   346  				case kwatch.Added, kwatch.Modified:
   347  					ready := conditions.IsReady(db)
   348  					if !ready {
   349  						continue
   350  					}
   351  					break loop
   352  				case kwatch.Error:
   353  					return fmt.Errorf("error retrieving CouchDBDatabase")
   354  				}
   355  			case <-time.After(duration):
   356  				return fmt.Errorf("error timing out while watching CouchDBDatabase")
   357  			}
   358  		}
   359  		return nil
   360  	}
   361  }
   362  
   363  func newCouchDBDatabase(nn types.NamespacedName, dbName, serverNS string) *dsapi.CouchDBDatabase {
   364  	return &dsapi.CouchDBDatabase{
   365  		TypeMeta: metav1.TypeMeta{
   366  			APIVersion: dsapi.GroupVersion.String(),
   367  			Kind:       "CouchDBDatabase",
   368  		},
   369  		ObjectMeta: metav1.ObjectMeta{
   370  			Name:      nn.Name,
   371  			Namespace: nn.Namespace,
   372  			Labels:    map[string]string{componentLabel: cushionNamespace},
   373  		},
   374  		Spec: dsapi.CouchDBDatabaseSpec{
   375  			Name: dbName,
   376  			ServerRef: dsapi.ServerReference{
   377  				Name:      couchdb.AdminServerName,
   378  				Namespace: serverNS,
   379  			},
   380  			Security: dsapi.Security{
   381  				Admins: dsapi.NameRole{
   382  					Names: []string{cushionUser},
   383  					Roles: []string{cushionRole},
   384  				},
   385  			},
   386  		},
   387  	}
   388  }
   389  
   390  func (d *Daemon) getOrCreateDatabaseForMessage(dbName string, tenant string, ipsm chariot.IPubSubMessage) (*MessageBuffer, error) {
   391  	db, err := d.getDatabase(dbName)
   392  	if err != nil {
   393  		nackAndLogPubSubRequest(ipsm, "unable to get db for message", err)
   394  		d.metrics.CushionFailedAcksTotal.WithLabelValues(validTenantIDAndDBName(tenant, dbName)...).Inc()
   395  		return nil, err
   396  	}
   397  	return db, nil
   398  }
   399  
   400  // buildDBCache precache existing dbs
   401  func (d *Daemon) buildDBCache() error {
   402  	couchCtx := context.Background()
   403  	d.Lock()
   404  	defer d.Unlock()
   405  
   406  	// migration already happened
   407  	r, err := labels.NewRequirement(componentLabel, selection.In, []string{cushionNamespace})
   408  	if err != nil {
   409  		return fmt.Errorf("error creating componentLabel requirement: %w", err)
   410  	}
   411  	k8sDBs := &dsapi.CouchDBDatabaseList{}
   412  	err = d.kube.List(couchCtx, k8sDBs, &client.ListOptions{
   413  		LabelSelector: labels.NewSelector().Add(*r),
   414  		Namespace:     cushionNamespace,
   415  	})
   416  	if err != nil {
   417  		d.logger.Error(err, "failed to get all dbs")
   418  		return err
   419  	}
   420  
   421  	for i := range k8sDBs.Items {
   422  		k8sDB := k8sDBs.Items[i]
   423  		ready := conditions.IsReady(&k8sDB)
   424  		if !ready {
   425  			continue
   426  		}
   427  		database := d.storer.client.DB(k8sDB.Spec.Name)
   428  		d.databases[k8sDB.Spec.Name] = NewMessageBuffer(d.ShutDownContext, d, database, d.cfg.BulkSize, d.cfg.MaxWaitInterval)
   429  	}
   430  	return nil
   431  }
   432  
// PubSubLogMessageObject is the JSON log entry written to loggerOutput when
// a PubSub message is acked with an error or nacked.
type PubSubLogMessageObject struct {
	Severity string `json:"severity"` // "info", "error"
	Message  string `json:"message"`  // The log message

	// Err is the text of the error that caused the request to not be
	// processed. It is omitted from the JSON output when empty.
	Err string `json:"error,omitempty"`

	// PubSub info, copied from the message being acked or nacked.
	ID              string            `json:"pubsub_id"`
	Data            []byte            `json:"pubsub_data"`
	Attributes      map[string]string `json:"pubsub_attributes"`
	PublishTime     time.Time         `json:"pubsub_publish_time"`
	DeliveryAttempt int               `json:"pubsub_delivery_attempt"` // 0 when the message reports no delivery attempt
	OrderingKey     string            `json:"pubsub_ordering_key"`
}
   449  
   450  func ackAndLogPubSubRequest(ipsm chariot.IPubSubMessage, optionalErr error) {
   451  	defer ipsm.Ack() // Ack after logging for unit test corectness purposes.
   452  	// Determine the log message and severity.
   453  	var msg = SuccessLogMessage
   454  	var sev = LogSeverityInfo
   455  	var err string
   456  	if optionalErr != nil {
   457  		msg = FailureLogMessage
   458  		sev = LogSeverityError
   459  		err = optionalErr.Error()
   460  	}
   461  
   462  	// Dereference the delivery attempt value.
   463  	var deliveryAttempt int
   464  	if ptrDeliveryAttempt := ipsm.DeliveryAttempt(); ptrDeliveryAttempt != nil {
   465  		deliveryAttempt = *ptrDeliveryAttempt
   466  	}
   467  
   468  	// Log to the `loggerOutput` variable for unit tests that need to read logs.
   469  	var logObject = PubSubLogMessageObject{
   470  		Severity:        sev,
   471  		Message:         msg,
   472  		Err:             err,
   473  		ID:              ipsm.ID(),
   474  		Data:            ipsm.Data(),
   475  		Attributes:      ipsm.Attributes(),
   476  		PublishTime:     ipsm.PublishTime(),
   477  		DeliveryAttempt: deliveryAttempt,
   478  		OrderingKey:     ipsm.OrderingKey(),
   479  	}
   480  	if optionalErr != nil {
   481  		json.NewEncoder(loggerOutput).Encode(logObject) //nolint:errcheck
   482  	}
   483  }
   484  
   485  func nackAndLogPubSubRequest(ipsm chariot.IPubSubMessage, msg string, optionalErr error) {
   486  	defer ipsm.Nack() // Nack after logging for unit test corectness purposes.
   487  
   488  	// Determine the log message and severity.
   489  	var sev = LogSeverityInfo
   490  	if msg == "" {
   491  		msg = WarnLogMessage
   492  		sev = LogSeverityWarn
   493  	}
   494  	var err string
   495  	if optionalErr != nil {
   496  		msg = FailureLogMessage
   497  		sev = LogSeverityError
   498  		err = optionalErr.Error()
   499  	}
   500  
   501  	// Dereference the delivery attempt value.
   502  	var deliveryAttempt int
   503  	if ptrDeliveryAttempt := ipsm.DeliveryAttempt(); ptrDeliveryAttempt != nil {
   504  		deliveryAttempt = *ptrDeliveryAttempt
   505  	}
   506  
   507  	// Log to the `loggerOutput` variable for unit tests that need to read logs.
   508  	var logObject = PubSubLogMessageObject{
   509  		Severity:        sev,
   510  		Message:         msg,
   511  		Err:             err,
   512  		ID:              ipsm.ID(),
   513  		Data:            ipsm.Data(),
   514  		Attributes:      ipsm.Attributes(),
   515  		PublishTime:     ipsm.PublishTime(),
   516  		DeliveryAttempt: deliveryAttempt,
   517  		OrderingKey:     ipsm.OrderingKey(),
   518  	}
   519  	json.NewEncoder(loggerOutput).Encode(logObject) //nolint:errcheck
   520  }
   521  
// StorageInfoLogMessageObject is a JSON log entry that carries a
// chariot.StorageInfo alongside a severity and message.
// NOTE(review): the only user visible in this file is the commented-out
// logStorageInfo below; confirm whether the type is still needed.
type StorageInfoLogMessageObject struct {
	Severity string `json:"severity"` // "info", "error"
	Message  string `json:"message"`  // The log message

	StorageInfo chariot.StorageInfo `json:"storage_info"`
}
   528  
   529  //func logStorageInfo(si chariot.StorageInfo) {
   530  //	const msg = "Storage Info"
   531  //	var sev = sevInfo
   532  //	if len(si.Errors) > 0 {
   533  //		sev = sevError
   534  //	}
   535  //	var logObject = StorageInfoLogMessageObject{
   536  //		Message:     msg,
   537  //		Severity:    sev,
   538  //		StorageInfo: si,
   539  //	}
   540  //	json.NewEncoder(loggerOutput).Encode(logObject) //nolint:errcheck
   541  //}
   542  
   543  // validTenantIDAndDBName validates tenant_id and db_name from the message attributes
   544  func validTenantIDAndDBName(tenantID string, dbname string) []string {
   545  	if dbname == "" {
   546  		dbname = "Empty_DB"
   547  	}
   548  	if tenantID == "" {
   549  		tenantID = "Empty_Tenant"
   550  	}
   551  	return []string{strings.ToLower(tenantID), strings.ToLower(dbname)}
   552  }
   553  

View as plain text