package couchctl

import (
	"context"
	"fmt"
	"reflect"
	"time"

	"github.com/go-logr/logr"
	"github.com/google/uuid"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/dynamic"
	kuberecorder "k8s.io/client-go/tools/record"
	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	ctrlReconcile "sigs.k8s.io/controller-runtime/pkg/reconcile"

	"edge-infra.dev/pkg/edge/clientutils"
	"edge-infra.dev/pkg/edge/controllers/envctl/pkg/nameutils"
	dsapi "edge-infra.dev/pkg/edge/datasync/apis/v1alpha1"
	"edge-infra.dev/pkg/edge/datasync/couchdb"
	"edge-infra.dev/pkg/k8s/meta/status"
	"edge-infra.dev/pkg/k8s/runtime/conditions"
	"edge-infra.dev/pkg/k8s/runtime/controller/metrics"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile"
	"edge-infra.dev/pkg/k8s/runtime/controller/reconcile/recerr"
	"edge-infra.dev/pkg/k8s/runtime/inventory"
	"edge-infra.dev/pkg/k8s/runtime/patch"
	"edge-infra.dev/pkg/k8s/runtime/sap"
	"edge-infra.dev/pkg/k8s/unstructured"
)

var (
	// persistenceConditions lists the status conditions this reconciler owns
	// and how they are summarized into the Ready condition.
	persistenceConditions = reconcile.Conditions{
		Target: status.ReadyCondition,
		Owned: []string{
			dsapi.PersistenceSetupSucceededReason,
			status.ReadyCondition,
			status.ReconcilingCondition,
			status.StalledCondition,
		},
		Summarize: []string{
			dsapi.PersistenceSetupSucceededReason,
			status.StalledCondition,
		},
		NegativePolarity: []string{
			status.ReconcilingCondition,
			status.StalledCondition,
		},
	}

	// oldPVCs maps a workload name to a PVC-related name.
	// NOTE(review): presumably statefulset name -> legacy volume claim
	// template name; confirm against oldPVCsSuffixes.
	oldPVCs = map[string]string{
		"data-sync-couchdb":   "database-storage",
		"data-sync-messaging": "outbox",
	}
)

// CouchDBPersistenceReconciler reconciles CouchDBPersistence objects by
// applying the resources listed in their spec, once per node substitution,
// and pruning resources that are no longer part of the inventory.
type CouchDBPersistenceReconciler struct {
	client.Client
	LeaderElector
	kuberecorder.EventRecorder
	// ResourceManager is created in SetupWithManager.
	ResourceManager *sap.ResourceManager
	// Name is used as the field owner for applies and patches.
	Name    string
	Config  *Config
	Metrics metrics.Metrics
	// patchOptions is derived from the owned conditions in SetupWithManager.
	patchOptions []patch.Option
	// replicationDB caches Config.ReplicationDB(); set in SetupWithManager.
	replicationDB string
	PersistenceLeaderElector
}

// SetupWithManager initializes the reconciler's derived fields (replication
// database name, patch options, resource manager) and registers it with mgr.
// The controller reconciles CouchDBPersistence objects, re-enqueues all of
// them on relevant Node events, and owns the CouchDB custom resources and
// StatefulSets it creates.
func (r *CouchDBPersistenceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.replicationDB = r.Config.ReplicationDB()
	r.patchOptions = getPatchOptions(persistenceConditions.Owned, r.Name)

	d, err := dynamic.NewForConfig(mgr.GetConfig())
	if err != nil {
		return fmt.Errorf("fail to create dynamic client: %w", err)
	}

	r.ResourceManager = sap.NewResourceManager(
		r.Client,
		watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
		sap.Owner{Field: r.Name},
	)

	return ctrl.NewControllerManagedBy(mgr).
		For(&dsapi.CouchDBPersistence{}, r.persistencePredicates()).
		Watches(
			&corev1.Node{},
			handler.EnqueueRequestsFromMapFunc(r.enqueue),
			builder.WithPredicates(nodePredicate()),
		).
		Owns(&dsapi.CouchDBServer{}).
		Owns(&dsapi.CouchDBDatabase{}).
		Owns(&dsapi.CouchDBUser{}).
		Owns(&dsapi.CouchDBReplicationSet{}).
		Owns(&appsv1.StatefulSet{}).
		Complete(r)
}

// persistencePredicates filters CouchDBPersistence events: only generation
// changes pass, and when the config reports DSDS only the leader instance
// processes them.
func (r *CouchDBPersistenceReconciler) persistencePredicates() builder.Predicates {
	return builder.WithPredicates(
		predicate.GenerationChangedPredicate{},
		predicate.NewPredicateFuncs(func(_ client.Object) bool {
			if r.Config.IsDSDS() {
				return r.IsLeader()
			}
			return true
		}))
}

// enqueue maps a Node event to one reconcile request per existing
// CouchDBPersistence object. It returns nil when running in DSDS mode on a
// non-leader instance or when the objects cannot be listed.
func (r *CouchDBPersistenceReconciler) enqueue(ctx context.Context, _ client.Object) []ctrlReconcile.Request {
	if r.Config.IsDSDS() && !r.IsLeader() {
		return nil
	}

	persList := &dsapi.CouchDBPersistenceList{}
	if err := r.Client.List(ctx, persList); client.IgnoreNotFound(err) != nil {
		// A map function cannot return an error, so log it instead of
		// dropping it silently.
		ctrl.LoggerFrom(ctx).Error(err, "fail to list CouchDBPersistence objects")
		return nil
	}

	requests := make([]ctrlReconcile.Request, 0, len(persList.Items))
	for i := range persList.Items {
		p := &persList.Items[i]
		requests = append(requests, ctrlReconcile.Request{
			NamespacedName: types.NamespacedName{
				Name:      p.Name,
				Namespace: p.Namespace,
			},
		})
	}
	return requests
}

// nodePredicate accepts every Node create and delete event and delegates
// update events to nodeUpdatePredicate.
func nodePredicate() predicate.Predicate {
	return predicate.Funcs{
		UpdateFunc: nodeUpdatePredicate,
		CreateFunc: func(_ event.CreateEvent) bool {
			return true
		},
		DeleteFunc: func(_ event.DeleteEvent) bool {
			return true
		},
	}
}

// nodeUpdatePredicate reports whether a Node update is relevant: either the
// labels changed or the Spec.Unschedulable flag flipped. Events that are
// missing an object, or whose objects are not Nodes, are rejected.
func nodeUpdatePredicate(e event.UpdateEvent) bool {
	if e.ObjectNew == nil || e.ObjectOld == nil {
		return false
	}
	if !reflect.DeepEqual(e.ObjectNew.GetLabels(), e.ObjectOld.GetLabels()) {
		return true
	}

	updatedNode, ok := e.ObjectNew.(*corev1.Node)
	if !ok {
		return false
	}
	oldNode, ok := e.ObjectOld.(*corev1.Node)
	if !ok {
		return false
	}
	return updatedNode.Spec.Unschedulable != oldNode.Spec.Unschedulable
}

// Reconcile handles a single CouchDBPersistence object: it marks the object
// as progressing, ensures the datasync finalizer is present, runs finalizer
// logic on deletion, and otherwise applies the persistence resources. The
// final result and error are produced by the deferred summarizer, which also
// patches status and records metrics.
func (r *CouchDBPersistenceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, err error) {
	reconcileStart := time.Now()
	log := ctrl.LoggerFrom(ctx)

	p := &dsapi.CouchDBPersistence{}
	if err := r.Client.Get(ctx, req.NamespacedName, p); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	ctx = logr.NewContext(ctx, log)

	patcher := patch.NewSerialPatcher(p, r.Client)
	if patchErr := reconcile.Progressing(ctx, p, patcher, r.patchOptions...); patchErr != nil {
		log.Error(patchErr, "unable to update status")
		// Return patchErr itself: the named result err is still nil here, so
		// returning it would silently drop the failure.
		return ctrl.Result{}, patchErr
	}

	recResult := reconcile.ResultEmpty
	var recErr recerr.Error

	// The closure reads recResult and recErr when it runs, so it sees the
	// values assigned further down. It overwrites the named results.
	defer func() {
		summarizer := reconcile.NewSummarizer(patcher)
		res, err = summarizer.SummarizeAndPatch(ctx, p, []reconcile.SummarizeOption{
			reconcile.WithConditions(persistenceConditions),
			reconcile.WithResult(recResult),
			reconcile.WithError(recErr),
			reconcile.WithIgnoreNotFound(),
			reconcile.WithProcessors(
				reconcile.RecordResult,
			),
			reconcile.WithFieldOwner(r.Name),
			reconcile.WithEventRecorder(r.EventRecorder),
		}...)
		r.Metrics.RecordDuration(ctx, p, reconcileStart)
		r.Metrics.RecordReadiness(ctx, p)
	}()

	// Add datasync finalizer if it does not exist
	if !controllerutil.ContainsFinalizer(p, DatasyncFinalizer) {
		controllerutil.AddFinalizer(p, DatasyncFinalizer)
		recResult = reconcile.ResultRequeue
		return
	}

	// execute finalizer logic
	if !p.ObjectMeta.DeletionTimestamp.IsZero() {
		log.Info("executing finalizer")
		if fErr := pruneInventory(ctx, r.ResourceManager, p); fErr != nil {
			// NOTE(review): the deferred SummarizeAndPatch reassigns err, so
			// fErr is not what the caller ultimately receives; confirm
			// whether it should be routed through recErr instead.
			err = fErr
			return
		}
		controllerutil.RemoveFinalizer(p, DatasyncFinalizer)
		log.Info("finalizer executed")
		return
	}

	if recErr = r.reconcile(ctx, p); recErr != nil {
		recErr.ToCondition(p, dsapi.PersistenceSetupSucceededReason)
		err = recErr
		return
	}

	recResult = reconcile.ResultSuccess
	conditions.MarkTrue(p, dsapi.PersistenceSetupSucceededReason, status.SucceededReason, "Successfully created CouchDBPersistence resources")
	log.Info("Successfully created CouchDBPersistence resources")
	return
}

// reconcile renders every persistence object of p once per matching node
// substitution, applies the rendered objects, prunes objects that dropped out
// of the inventory, and stores the new inventory in p.Status.Inventory.
func (r *CouchDBPersistenceReconciler) reconcile(ctx context.Context, p *dsapi.CouchDBPersistence) recerr.Error {
	log := logr.FromContextOrDiscard(ctx)

	objs := p.PersistenceObjects()
	if len(objs) == 0 {
		specErr := fmt.Errorf("invalid spec")
		log.Error(specErr, "no resources provided")
		return recerr.NewStalled(specErr, "no resources provided")
	}

	substitutions, err := r.buildNodeSubstitutions(ctx)
	if err != nil {
		log.Error(err, "could not build substitutions")
		return recerr.NewWait(err, "could not build substitutions", r.Config.RequeueTime)
	}

	selectNodeByRole, role := p.NodeRoleFilter()
	selectNodeByClass, class := p.NodeClassFilter()

	var uns []*unstructured.Unstructured
	for _, obj := range objs {
		for _, s := range substitutions {
			// The role/class filters only apply to DSDS substitutions.
			if selectNodeByRole && s.DSDS && role != s.NodeRole() {
				continue
			}
			if selectNodeByClass && s.DSDS && class != s.NodeClass() {
				continue
			}

			un, err := ApplySubstitutions(obj, s)
			if err != nil {
				return recerr.NewStalled(err, "spec invalid substitution")
			}
			un.SetOwnerReferences(r.ownerRef(p))
			un.SetNamespace(p.Namespace)
			uns = append(uns, un)
		}
	}

	changeSet, err := r.ResourceManager.ApplyAll(ctx, uns, sap.ApplyOptions{Force: true})
	if err != nil {
		log.Error(err, "fail to apply persistence resources")
		return recerr.New(err, dsapi.PersistenceObjectsCreationFailedReason)
	}

	i := inventory.New(inventory.FromSapChangeSet(changeSet))
	if err := r.prune(ctx, p, i); err != nil {
		log.Error(err, "fail to prune resources")
		return recerr.New(err, dsapi.PruneFailed)
	}
	p.Status.Inventory = i
	return nil
}

// buildNodeSubstitutions create substitutions mapping based on node labels
// new nodes can be added and deleted
//
// Outside DSDS mode the map holds a single store-level substitution whose
// NodeUID comes from getNodeUIDGeneric. In DSDS mode it holds one
// substitution per schedulable node, keyed by server name; each such node is
// updated with the node UID label, and the leader label is set or removed
// according to the substitution. An unschedulable node that still carries the
// leader label has that label removed.
func (r *CouchDBPersistenceReconciler) buildNodeSubstitutions(ctx context.Context) (map[string]Substitution, error) {
	log := logr.FromContextOrDiscard(ctx)
	m := map[string]Substitution{}

	if !r.Config.IsDSDS() {
		genericUID, err := r.getNodeUIDGeneric(ctx)
		if err != nil {
			log.Error(err, "failed to fetch Node UID for generic cluster")
			return nil, err
		}
		su := StoreSubstitution(r.replicationDB)
		su.NodeUID = genericUID
		m[su.ServerName] = su
		return m, nil
	}

	nodes := &corev1.NodeList{}
	if err := r.Client.List(ctx, nodes); client.IgnoreNotFound(err) != nil {
		log.Error(err, "failed to get dsds nodes")
		return nil, err
	}

	// Named pvcSuffixes so it does not shadow the package-level oldPVCs.
	pvcSuffixes, err := oldPVCsSuffixes(ctx, r.Client)
	if err != nil {
		log.Error(err, "fail to get old pvcs")
		return nil, err
	}

	leaderNode, err := r.LeaderElector.Elect(nodes.Items)
	if err != nil {
		log.Error(err, "fail to elect leader node")
		return nil, err
	}
	log.Info("LEADER NODE", "node", leaderNode.Name)

	var oldLeader *corev1.Node
	for i := range nodes.Items {
		node := nodes.Items[i]

		if node.Spec.Unschedulable {
			// Remember an unschedulable node that is still labelled leader
			// so the label can be removed after the loop.
			if node.Labels[couchdb.NodeLeaderLabel] == couchdb.LabelValueTrue {
				oldLeader = &node
			}
			continue
		}

		ni, err := nameutils.GetNodeInfo(node, LaneNumberSubstitutionMaxLength)
		if err != nil {
			log.Info("Fail to acquire Node Info", "node", node.Name, "err", err)
			continue
		}

		su := LaneSubstitution(ni, pvcSuffixes, r.replicationDB, string(leaderNode.UID))

		// add label to node to be able to schedule statefulsets
		if node.Labels == nil {
			// Writing to a nil map panics.
			node.Labels = map[string]string{}
		}
		node.Labels[couchdb.NodeUIDLabel] = string(node.UID)
		if su.Leader {
			node.Labels[couchdb.NodeLeaderLabel] = couchdb.LabelValueTrue
		} else {
			delete(node.Labels, couchdb.NodeLeaderLabel)
		}

		nodeWithLabel := node
		if err = r.Client.Update(ctx, &nodeWithLabel); err != nil {
			return nil, fmt.Errorf("fail to update/annotate node: %s, %w", node.Name, err)
		}
		m[su.ServerName] = su
	}

	if oldLeader != nil {
		delete(oldLeader.Labels, couchdb.NodeLeaderLabel)
		// A node that disappeared in the meantime is not an error.
		if err = r.Client.Update(ctx, oldLeader); err != nil && !errors.IsNotFound(err) {
			return nil, fmt.Errorf("fail to update/annotate node: %s, %w", oldLeader.Name, err)
		}
	}
	return m, nil
}

// getNodeUIDGeneric returns the UUID stored under the "uuid" key of the
// ConfigMapUID ConfigMap in the couch namespace. When the ConfigMap does not
// exist it is created with a freshly generated UUID; when it exists without a
// "uuid" key a new UUID is generated and written back.
func (r *CouchDBPersistenceReconciler) getNodeUIDGeneric(ctx context.Context) (string, error) {
	log := logr.FromContextOrDiscard(ctx)

	cm := &corev1.ConfigMap{}
	key := client.ObjectKey{
		Name:      ConfigMapUID,
		Namespace: r.Config.CouchNamespace,
	}

	// Use the caller's ctx so cancellation and deadlines are honored.
	err := r.Client.Get(ctx, key, cm)
	switch {
	case errors.IsNotFound(err):
		u := uuid.New().String()
		newCM := &corev1.ConfigMap{
			ObjectMeta: metav1.ObjectMeta{
				Name:      ConfigMapUID,
				Namespace: r.Config.CouchNamespace,
			},
			Data: map[string]string{
				"uuid": u,
			},
		}
		if err = r.Client.Create(ctx, newCM); err != nil {
			log.Error(err, "Failed to create ConfigMap with uuid")
			return "", err
		}
		return u, nil
	case err != nil:
		log.Error(err, "Failed to fetch ConfigMap")
		return "", err
	}

	if u, exists := cm.Data["uuid"]; exists {
		return u, nil
	}

	log.Info("ConfigMap exists but no uuid found, recreating")
	newUUID := uuid.New().String()
	if cm.Data == nil {
		// A ConfigMap without data has a nil map; writing to it would panic.
		cm.Data = map[string]string{}
	}
	cm.Data["uuid"] = newUUID
	if err = clientutils.CreateOrUpdateConfigmap(ctx, r.Client, cm); err != nil {
		log.Error(err, "Failed to update ConfigMap with new uuid")
		return "", err
	}
	return newUUID, nil
}

// prune deletes the objects that are present in p.Status.Inventory but absent
// from the new inventory i. It is a no-op when p has no previous inventory or
// the diff is empty. Errors from computing the diff or deleting are returned.
func (r *CouchDBPersistenceReconciler) prune(ctx context.Context, p *dsapi.CouchDBPersistence, i *inventory.ResourceInventory) error {
	if p.Status.Inventory == nil {
		return nil
	}

	diff, err := inventory.Diff(p.Status.Inventory, i)
	if err != nil {
		// Surface the failure so the caller can report it rather than treat
		// the prune as successful.
		return err
	}
	if len(diff) == 0 {
		return nil
	}

	changeSet, err := r.ResourceManager.DeleteAll(ctx, diff, sap.DefaultDeleteOptions())
	if err != nil {
		return err
	}
	log := logr.FromContextOrDiscard(ctx)
	log.Info("pruned objects", "changeset", changeSet.ToMap())
	return nil
}

// ownerRef returns a single controller owner reference pointing at p.
// TODO there is a conflict with persistence
func (r *CouchDBPersistenceReconciler) ownerRef(p *dsapi.CouchDBPersistence) []metav1.OwnerReference {
	return []metav1.OwnerReference{
		*metav1.NewControllerRef(
			p,
			dsapi.GroupVersion.WithKind("CouchDBPersistence"),
		),
	}
}