...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/persistence_controller.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"reflect"
     7  	"time"
     8  
     9  	"edge-infra.dev/pkg/edge/clientutils"
    10  
    11  	"github.com/go-logr/logr"
    12  	"github.com/google/uuid"
    13  	appsv1 "k8s.io/api/apps/v1"
    14  	corev1 "k8s.io/api/core/v1"
    15  	"k8s.io/apimachinery/pkg/api/errors"
    16  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    17  	"k8s.io/apimachinery/pkg/types"
    18  	"k8s.io/client-go/dynamic"
    19  	kuberecorder "k8s.io/client-go/tools/record"
    20  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    21  	ctrl "sigs.k8s.io/controller-runtime"
    22  	"sigs.k8s.io/controller-runtime/pkg/builder"
    23  	"sigs.k8s.io/controller-runtime/pkg/client"
    24  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    25  	"sigs.k8s.io/controller-runtime/pkg/event"
    26  	"sigs.k8s.io/controller-runtime/pkg/handler"
    27  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    28  	ctrlReconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"
    29  
    30  	"edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
    31  	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
    32  	"edge-infra.dev/pkg/edge/datasync/couchdb"
    33  	"edge-infra.dev/pkg/k8s/meta/status"
    34  	"edge-infra.dev/pkg/k8s/runtime/conditions"
    35  	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
    36  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
    37  	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
    38  	"edge-infra.dev/pkg/k8s/runtime/inventory"
    39  	"edge-infra.dev/pkg/k8s/runtime/patch"
    40  	"edge-infra.dev/pkg/k8s/runtime/sap"
    41  	"edge-infra.dev/pkg/k8s/unstructured"
    42  )
    43  
    44  var (
    45  	persistenceConditions = reconcile.Conditions{
    46  		Target: status.ReadyCondition,
    47  		Owned: []string{
    48  			dsapi.PersistenceSetupSucceededReason,
    49  			status.ReadyCondition,
    50  			status.ReconcilingCondition,
    51  			status.StalledCondition,
    52  		},
    53  		Summarize: []string{
    54  			dsapi.PersistenceSetupSucceededReason,
    55  			status.StalledCondition,
    56  		},
    57  		NegativePolarity: []string{
    58  			status.ReconcilingCondition,
    59  			status.StalledCondition,
    60  		},
    61  	}
    62  	oldPVCs = map[string]string{
    63  		"data-sync-couchdb":   "database-storage",
    64  		"data-sync-messaging": "outbox",
    65  	}
    66  )
    67  
// CouchDBPersistenceReconciler reconciles CouchDBPersistence objects: it
// renders the persistence objects from the spec once per node substitution,
// applies them through the ResourceManager and prunes objects that dropped
// out of the recorded inventory.
type CouchDBPersistenceReconciler struct {
	// Client is used to read and write CouchDBPersistence, Node and ConfigMap objects.
	client.Client
	// LeaderElector elects the leader node from the node list (see buildNodeSubstitutions).
	LeaderElector
	// EventRecorder is handed to the summarizer when the reconcile result is patched.
	kuberecorder.EventRecorder
	// ResourceManager applies and deletes the rendered objects; it is created in SetupWithManager.
	ResourceManager *sap.ResourceManager
	// Name is used as the field owner for patches and for the ResourceManager.
	Name            string
	// Config supplies IsDSDS, ReplicationDB, RequeueTime and CouchNamespace.
	Config          *Config
	// Metrics records reconcile duration and readiness.
	Metrics         metrics.Metrics
	// patchOptions is computed in SetupWithManager from the owned conditions and Name.
	patchOptions    []patch.Option
	// replicationDB caches Config.ReplicationDB(); it is set in SetupWithManager.
	replicationDB   string
	// NOTE(review): PersistenceLeaderElector is declared elsewhere; presumably
	// it provides the IsLeader check used by the predicates -- confirm.
	PersistenceLeaderElector
}
    80  
    81  func (r *CouchDBPersistenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
    82  	r.replicationDB = r.Config.ReplicationDB()
    83  	r.patchOptions = getPatchOptions(persistenceConditions.Owned, r.Name)
    84  	d, err := dynamic.NewForConfig(mgr.GetConfig())
    85  	if err != nil {
    86  		return fmt.Errorf("fail to create dynamic client: %w", err)
    87  	}
    88  	r.ResourceManager = sap.NewResourceManager(
    89  		r.Client,
    90  		watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
    91  		sap.Owner{Field: r.Name},
    92  	)
    93  	return ctrl.NewControllerManagedBy(mgr).
    94  		For(&dsapi.CouchDBPersistence{}, r.persistencePredicates()).
    95  		Watches(
    96  			&corev1.Node{},
    97  			handler.EnqueueRequestsFromMapFunc(r.enqueue),
    98  			builder.WithPredicates(nodePredicate()),
    99  		).
   100  		Owns(&dsapi.CouchDBServer{}).
   101  		Owns(&dsapi.CouchDBDatabase{}).
   102  		Owns(&dsapi.CouchDBUser{}).
   103  		Owns(&dsapi.CouchDBReplicationSet{}).
   104  		Owns(&appsv1.StatefulSet{}).
   105  		Complete(r)
   106  }
   107  
   108  func (r *CouchDBPersistenceReconciler) persistencePredicates() builder.Predicates {
   109  	return builder.WithPredicates(
   110  		predicate.GenerationChangedPredicate{},
   111  		predicate.NewPredicateFuncs(func(_ client.Object) bool {
   112  			if r.Config.IsDSDS() {
   113  				return r.IsLeader()
   114  			}
   115  			return true
   116  		}))
   117  }
   118  
   119  func (r *CouchDBPersistenceReconciler) enqueue(ctx context.Context, _ client.Object) []ctrlReconcile.Request {
   120  	if r.Config.IsDSDS() && !r.IsLeader() {
   121  		return nil
   122  	}
   123  	persList := &dsapi.CouchDBPersistenceList{}
   124  	if err := r.Client.List(ctx, persList); client.IgnoreNotFound(err) != nil {
   125  		return nil
   126  	}
   127  	var requests []ctrlReconcile.Request
   128  	for _, p := range persList.Items {
   129  		requests = append(requests,
   130  			ctrlReconcile.Request{
   131  				NamespacedName: types.NamespacedName{
   132  					Name:      p.Name,
   133  					Namespace: p.Namespace,
   134  				}})
   135  	}
   136  	return requests
   137  }
   138  
   139  func nodePredicate() predicate.Predicate {
   140  	return predicate.Funcs{
   141  		UpdateFunc: nodeUpdatePredicate,
   142  		CreateFunc: func(_ event.CreateEvent) bool {
   143  			return true
   144  		},
   145  		DeleteFunc: func(_ event.DeleteEvent) bool {
   146  			return true
   147  		},
   148  	}
   149  }
   150  
   151  func nodeUpdatePredicate(e event.UpdateEvent) bool {
   152  	if !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels()) {
   153  		return true
   154  	}
   155  	updatedNode, ok := e.ObjectNew.(*corev1.Node)
   156  	if !ok {
   157  		return false
   158  	}
   159  	oldNode, ok := e.ObjectOld.(*corev1.Node)
   160  	if !ok {
   161  		return false
   162  	}
   163  	return updatedNode.Spec.Unschedulable != oldNode.Spec.Unschedulable
   164  }
   165  
   166  func (r *CouchDBPersistenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
   167  	reconcileStart := time.Now()
   168  	log := ctrl.LoggerFrom(ctx)
   169  
   170  	p := &dsapi.CouchDBPersistence{}
   171  	if err := r.Client.Get(ctx, req.NamespacedName, p); err != nil {
   172  		return ctrl.Result{}, client.IgnoreNotFound(err)
   173  	}
   174  
   175  	ctx = logr.NewContext(ctx, log)
   176  
   177  	patcher := patch.NewSerialPatcher(p, r.Client)
   178  	if patchErr := reconcile.Progressing(ctx, p, patcher, r.patchOptions...); patchErr != nil {
   179  		log.Error(patchErr, "unable to update status")
   180  		return ctrl.Result{}, err
   181  	}
   182  
   183  	recResult := reconcile.ResultEmpty
   184  	var recErr recerr.Error
   185  
   186  	defer func() {
   187  		summarizer := reconcile.NewSummarizer(patcher)
   188  		res, err = summarizer.SummarizeAndPatch(ctx, p, []reconcile.SummarizeOption{
   189  			reconcile.WithConditions(persistenceConditions),
   190  			reconcile.WithResult(recResult),
   191  			reconcile.WithError(recErr),
   192  			reconcile.WithIgnoreNotFound(),
   193  			reconcile.WithProcessors(
   194  				reconcile.RecordResult,
   195  			),
   196  			reconcile.WithFieldOwner(r.Name),
   197  			reconcile.WithEventRecorder(r.EventRecorder),
   198  		}...)
   199  		r.Metrics.RecordDuration(ctx, p, reconcileStart)
   200  		r.Metrics.RecordReadiness(ctx, p)
   201  	}()
   202  
   203  	// Add datasync finalizer if it does not exist
   204  	if !controllerutil.ContainsFinalizer(p, DatasyncFinalizer) {
   205  		controllerutil.AddFinalizer(p, DatasyncFinalizer)
   206  		recResult = reconcile.ResultRequeue
   207  		return
   208  	}
   209  
   210  	// execute finalizer logic
   211  	if !p.ObjectMeta.DeletionTimestamp.IsZero() {
   212  		log.Info("executing finalizer")
   213  		if fErr := pruneInventory(ctx, r.ResourceManager, p); fErr != nil {
   214  			err = fErr
   215  			return
   216  		}
   217  		controllerutil.RemoveFinalizer(p, DatasyncFinalizer)
   218  		log.Info("finalizer executed")
   219  		return
   220  	}
   221  
   222  	if recErr = r.reconcile(ctx, p); recErr != nil {
   223  		recErr.ToCondition(p, dsapi.PersistenceSetupSucceededReason)
   224  		err = recErr
   225  		return
   226  	}
   227  
   228  	recResult = reconcile.ResultSuccess
   229  	conditions.MarkTrue(p, dsapi.PersistenceSetupSucceededReason, status.SucceededReason, "Successfully created CouchDBPersistence resources")
   230  	log.Info("Successfully created CouchDBPersistence resources")
   231  
   232  	return
   233  }
   234  
   235  func (r *CouchDBPersistenceReconciler) reconcile(ctx context.Context, p *dsapi.CouchDBPersistence) recerr.Error {
   236  	log := logr.FromContextOrDiscard(ctx)
   237  	objs := p.PersistenceObjects()
   238  	if len(objs) == 0 {
   239  		log.Error(fmt.Errorf("invalid spec"), "no resources provided")
   240  		return recerr.NewStalled(fmt.Errorf("invalid spec"), "no resources provided")
   241  	}
   242  
   243  	substitutions, err := r.buildNodeSubstitutions(ctx)
   244  	if err != nil {
   245  		log.Error(err, "could not build substitutions")
   246  		return recerr.NewWait(err, "could not build substitutions", r.Config.RequeueTime)
   247  	}
   248  
   249  	selectNodeByRole, role := p.NodeRoleFilter()
   250  	selectNodeByClass, class := p.NodeClassFilter()
   251  	var uns []*unstructured.Unstructured
   252  	for _, obj := range objs {
   253  		for _, s := range substitutions {
   254  			if selectNodeByRole && s.DSDS && role != s.NodeRole() {
   255  				continue
   256  			}
   257  			if selectNodeByClass && s.DSDS && class != s.NodeClass() {
   258  				continue
   259  			}
   260  			un, err := ApplySubstitutions(obj, s)
   261  			if err != nil {
   262  				return recerr.NewStalled(err, "spec invalid substitution")
   263  			}
   264  			un.SetOwnerReferences(r.ownerRef(p))
   265  			un.SetNamespace(p.Namespace)
   266  			uns = append(uns, un)
   267  		}
   268  	}
   269  
   270  	changeSet, err := r.ResourceManager.ApplyAll(ctx, uns, sap.ApplyOptions{Force: true})
   271  	if err != nil {
   272  		log.Error(err, "fail to apply persistence resources")
   273  		return recerr.New(err, dsapi.PersistenceObjectsCreationFailedReason)
   274  	}
   275  
   276  	i := inventory.New(inventory.FromSapChangeSet(changeSet))
   277  	if err := r.prune(ctx, p, i); err != nil {
   278  		log.Error(err, "fail to prune resources")
   279  		return recerr.New(err, dsapi.PruneFailed)
   280  	}
   281  	p.Status.Inventory = i
   282  
   283  	return nil
   284  }
   285  
   286  // buildNodeSubstitutions create substitutions mapping based on node labels
   287  // new nodes can be added and deleted
   288  func (r *CouchDBPersistenceReconciler) buildNodeSubstitutions(ctx context.Context) (map[string]Substitution, error) {
   289  	log := logr.FromContextOrDiscard(ctx)
   290  	m := map[string]Substitution{}
   291  	if !r.Config.IsDSDS() {
   292  		su := StoreSubstitution(r.replicationDB)
   293  		genericUID, err := r.getNodeUIDGeneric(ctx)
   294  		su.NodeUID = genericUID
   295  		if err != nil {
   296  			log.Error(err, "failed to fetch Node UID for generic cluster")
   297  			return nil, err
   298  		}
   299  		m[su.ServerName] = su
   300  		return m, nil
   301  	}
   302  	nodes := &corev1.NodeList{}
   303  	if err := r.Client.List(ctx, nodes); client.IgnoreNotFound(err) != nil {
   304  		log.Error(err, "failed to get dsds nodes")
   305  		return nil, err
   306  	}
   307  	oldPVCs, err := oldPVCsSuffixes(ctx, r.Client)
   308  	if err != nil {
   309  		log.Error(err, "fail to get old pvcs")
   310  		return nil, err
   311  	}
   312  	leaderNode, err := r.LeaderElector.Elect(nodes.Items)
   313  	if err != nil {
   314  		log.Error(err, "fail to elect leader node")
   315  		return nil, err
   316  	}
   317  	log.Info("LEADER NODE", "node", leaderNode.Name)
   318  	var oldLeader *corev1.Node
   319  	for i := range nodes.Items {
   320  		node := nodes.Items[i]
   321  		if node.Spec.Unschedulable {
   322  			if node.Labels[couchdb.NodeLeaderLabel] == couchdb.LabelValueTrue {
   323  				oldLeader = &node
   324  			}
   325  			continue
   326  		}
   327  
   328  		ni, err := nameutils.GetNodeInfo(node, LaneNumberSubstitutionMaxLength)
   329  		if err != nil {
   330  			log.Info("Fail to acquire Node Info", "node", node.Name, "err", err)
   331  			continue
   332  		}
   333  
   334  		su := LaneSubstitution(ni, oldPVCs, r.replicationDB, string(leaderNode.UID))
   335  
   336  		// add label to node to be able to schedule statefulsets
   337  		node.Labels[couchdb.NodeUIDLabel] = string(node.UID)
   338  		if su.Leader {
   339  			node.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
   340  		} else {
   341  			delete(node.Labels, couchdb.NodeLeaderLabel)
   342  		}
   343  
   344  		nodeWithLabel := node
   345  		if err = r.Client.Update(ctx, &nodeWithLabel); err != nil {
   346  			return nil, fmt.Errorf("fail to update/annotate node: %s, %w", node.Name, err)
   347  		}
   348  		m[su.ServerName] = su
   349  	}
   350  	if oldLeader != nil {
   351  		delete(oldLeader.Labels, couchdb.NodeLeaderLabel)
   352  		if err = r.Client.Update(ctx, oldLeader); err != nil && !errors.IsNotFound(err) {
   353  			return nil, fmt.Errorf("fail to update/annotate node: %s, %w", oldLeader.Name, err)
   354  		}
   355  	}
   356  	return m, nil
   357  }
   358  
   359  func (r *CouchDBPersistenceReconciler) getNodeUIDGeneric(ctx context.Context) (string, error) {
   360  	log := logr.FromContextOrDiscard(ctx)
   361  
   362  	cm := &corev1.ConfigMap{}
   363  	err := r.Client.Get(context.TODO(), client.ObjectKey{
   364  		Name:      ConfigMapUID,
   365  		Namespace: r.Config.CouchNamespace,
   366  	}, cm)
   367  
   368  	if err != nil && errors.IsNotFound(err) {
   369  		u := uuid.New().String()
   370  		newCM := &corev1.ConfigMap{
   371  			ObjectMeta: metav1.ObjectMeta{
   372  				Name:      ConfigMapUID,
   373  				Namespace: r.Config.CouchNamespace,
   374  			},
   375  			Data: map[string]string{
   376  				"uuid": u,
   377  			},
   378  		}
   379  		err = r.Client.Create(ctx, newCM)
   380  		if err != nil {
   381  			log.Error(err, "Failed to create ConfigMap with uuid")
   382  			return "", err
   383  		}
   384  		return u, nil
   385  	} else if err != nil {
   386  		log.Error(err, "Failed to fetch ConfigMap")
   387  		return "", err
   388  	}
   389  
   390  	if u, exists := cm.Data["uuid"]; exists {
   391  		return u, nil
   392  	}
   393  	log.Info("ConfigMap exists but no uuid found, recreating")
   394  	newUUID := uuid.New().String()
   395  	cm.Data["uuid"] = newUUID
   396  	err = clientutils.CreateOrUpdateConfigmap(ctx, r.Client, cm)
   397  	if err != nil {
   398  		log.Error(err, "Failed to update ConfigMap with new uuid")
   399  		return "", err
   400  	}
   401  	return newUUID, nil
   402  }
   403  func (r *CouchDBPersistenceReconciler) prune(ctx context.Context, p *dsapi.CouchDBPersistence, i *inventory.ResourceInventory) error {
   404  	if p.Status.Inventory != nil {
   405  		diff, err := inventory.Diff(p.Status.Inventory, i)
   406  		if err != nil {
   407  			return nil
   408  		}
   409  		if len(diff) > 0 {
   410  			changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
   411  			if err != nil {
   412  				return err
   413  			}
   414  			log := logr.FromContextOrDiscard(ctx)
   415  			log.Info("pruned objects", "changeset", changeSet.ToMap())
   416  		}
   417  	}
   418  	return nil
   419  }
   420  
   421  // ownerRef TODO there is a conflict with persistence
   422  func (r *CouchDBPersistenceReconciler) ownerRef(p *dsapi.CouchDBPersistence) []metav1.OwnerReference {
   423  	return []metav1.OwnerReference{
   424  		*metav1.NewControllerRef(
   425  			p,
   426  			dsapi.GroupVersion.WithKind("CouchDBPersistence"),
   427  		),
   428  	}
   429  }
   430  

View as plain text