package lifecycle

import (
	"context"
	"fmt"
	"time"

	corev1 "k8s.io/api/core/v1"
	k8stypes "k8s.io/apimachinery/pkg/types"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/builder"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	"edge-infra.dev/pkg/lib/fog"
	v1etcd "edge-infra.dev/pkg/sds/etcd/operator/apis/etcdmember/v1"
	"edge-infra.dev/pkg/sds/etcd/operator/internal/config"
	"edge-infra.dev/pkg/sds/etcd/operator/internal/metrics"
	"edge-infra.dev/pkg/sds/etcd/operator/internal/resources"
)

// Reconciler deletes and recreates EtcdMember resources in response to Node
// and EtcdMember events. It embeds the operator configuration (clients,
// manager, name) and the metrics recorders used by each reconciliation.
type Reconciler struct {
	config.Config
	*metrics.Metrics
}

// resultOptions carries the data recordResults needs to record metrics for a
// single reconciliation.
type resultOptions struct {
	startTime time.Time
	handlers  *Handlers
	recErr    error
}

// Handlers holds the handlers to manage the kubernetes resources
// that the Reconciler depends on.
type Handlers struct {
	member *resources.EtcdMemberHandler
	node   *resources.NodeHandler
}

// SetupWithManager stores cfg on the Reconciler, initialises its metrics
// (starting the custom metrics with initialMembers), and registers the
// controller with the manager. The controller reconciles EtcdMembers (filtered
// by etcdMemberEventFilter) and Nodes (filtered by nodeEventFilter and mapped
// to a request keyed by the Node name).
func (r *Reconciler) SetupWithManager(cfg config.Config, initialMembers *v1etcd.EtcdMemberList) error {
	r.Config = cfg
	r.Metrics = metrics.New(r.Mgr, "lifecycle")
	r.Metrics.Custom.Run(initialMembers)

	return ctrl.NewControllerManagedBy(r.Mgr).
		For(&v1etcd.EtcdMember{}, builder.WithPredicates(r.etcdMemberEventFilter())).
		Watches(
			&corev1.Node{},
			handler.EnqueueRequestsFromMapFunc(r.nodeReconcileRequests),
			builder.WithPredicates(r.nodeEventFilter()),
		).
		Complete(r)
}

// nodeReconcileRequests maps a Node event to a single reconcile request keyed
// by the Node's name (no namespace). The EtcdMember for a Node shares its name,
// so the same key is used to look up both objects.
func (r *Reconciler) nodeReconcileRequests(_ context.Context, obj client.Object) []ctrl.Request {
	return []reconcile.Request{
		{NamespacedName: k8stypes.NamespacedName{Name: obj.GetName()}},
	}
}

// etcdMemberEventFilter filters EtcdMember events so that only updates which
// mark an EtcdMember for deletion (a non-zero deletion timestamp) trigger a
// reconcile. Create and delete events are ignored; generic events are not
// overridden and therefore pass (predicate.Funcs defaults a nil func to true).
//
// NOTE(review): an earlier comment also promised reconciles when an EtcdMember
// becomes unhealthy, but this filter does not inspect health — confirm whether
// that trigger is expected to come from elsewhere.
func (r *Reconciler) etcdMemberEventFilter() predicate.Predicate {
	return predicate.Funcs{
		CreateFunc: func(_ event.CreateEvent) bool { return false },
		UpdateFunc: func(e event.UpdateEvent) bool {
			// GetDeletionTimestamp is available on every client.Object and
			// (*metav1.Time).IsZero is nil-safe, so neither a type assertion
			// (which would panic on an unexpected type) nor a deep copy is
			// needed just to read the timestamp.
			return e.ObjectNew != nil && !e.ObjectNew.GetDeletionTimestamp().IsZero()
		},
		DeleteFunc: func(_ event.DeleteEvent) bool { return false },
	}
}

// nodeEventFilter filters Node events so that a reconcile is triggered when a
// Node is created, or when an update changes the Node's IEN version or its
// worker role. Delete events are ignored.
func (r *Reconciler) nodeEventFilter() predicate.Predicate {
	return predicate.Funcs{
		CreateFunc: func(_ event.CreateEvent) bool { return true },
		UpdateFunc: func(e event.UpdateEvent) bool {
			return hasIENVersionChanged(e.ObjectOld, e.ObjectNew) ||
				hasNodeRoleChanged(e.ObjectOld, e.ObjectNew)
		},
		DeleteFunc: func(_ event.DeleteEvent) bool { return false },
	}
}

// hasNodeRoleChanged reports whether the worker role differs between the old
// and new versions of a Node. It returns false if either object is not a Node.
func hasNodeRoleChanged(oldObj, newObj client.Object) bool {
	oldHandler, newHandler, ok := nodeHandlerPair(oldObj, newObj)
	if !ok {
		return false
	}
	return oldHandler.IsWorker() != newHandler.IsWorker()
}

// hasIENVersionChanged reports whether the IEN version differs between the old
// and new versions of a Node. It returns false if either object is not a Node.
func hasIENVersionChanged(oldObj, newObj client.Object) bool {
	oldHandler, newHandler, ok := nodeHandlerPair(oldObj, newObj)
	if !ok {
		return false
	}
	return newHandler.IENVersion() != oldHandler.IENVersion()
}

// nodeHandlerPair wraps deep copies of the old and new Node objects in
// NodeHandlers so their role and IEN version can be compared. ok is false if
// either object is not a *corev1.Node.
func nodeHandlerPair(oldObj, newObj client.Object) (oldHandler, newHandler *resources.NodeHandler, ok bool) {
	oldNode, oldOK := oldObj.(*corev1.Node)
	newNode, newOK := newObj.(*corev1.Node)
	if !oldOK || !newOK {
		return nil, nil, false
	}

	oldHandler = resources.NewNodeHandlerBuilder().Build()
	oldHandler.DeepCopyFrom(oldNode)

	newHandler = resources.NewNodeHandlerBuilder().Build()
	newHandler.DeepCopyFrom(newNode)

	return oldHandler, newHandler, true
}

// Reconcile is the top-level reconciliation function. It is triggered by Node
// creations, Node updates that change the IEN version or worker role, and
// EtcdMember updates that mark the EtcdMember for deletion. After building the
// resource handlers and checking preconditions, EtcdMembers are deleted and/or
// created as required. The following scenarios are handled:
//
//  1. Node is a controlplane, or EtcdMember reconciliation is suspended - do nothing
//
//  2. Node that the EtcdMember was created for still exists - delete the current
//     EtcdMember if deletion is required and then create a new one
//
//  3. Node that the EtcdMember was created for no longer exists - delete the current
//     EtcdMember if deletion is required, but do not create a new one
func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := fog.FromContext(ctx).WithName(r.Name)
	ctx = fog.IntoContext(ctx, log)

	startTime := time.Now()
	log.V(0).Info("started EtcdMember lifecycle reconciliation loop")
	defer log.V(0).Info("ended EtcdMember lifecycle reconciliation loop")

	// NOTE(review): the etcd retry client is created per reconcile but stored on
	// the shared Reconciler; if MaxConcurrentReconciles > 1 this is presumably
	// racy — confirm.
	if err := r.WithDefaultEtcdRetryClient(ctx); err != nil {
		return ctrl.Result{}, fmt.Errorf("failed to setup etcd retry client: %w", err)
	}
	defer r.EtcdRetryClient.Close()

	handlers, err := r.generateHandlers(ctx, req)
	if err != nil {
		return ctrl.Result{}, err
	}

	if !r.checkPreconditions(ctx, handlers) {
		return ctrl.Result{}, nil
	}

	// recErr is captured by reference so the deferred metrics recording sees
	// the final outcome of r.reconcile.
	var recErr error
	defer func() {
		r.recordResults(ctx, resultOptions{
			startTime: startTime,
			handlers:  handlers,
			recErr:    recErr,
		})
	}()

	recErr = r.reconcile(ctx, handlers)
	return ctrl.Result{}, recErr
}
This includes setting up the resource // handlers and checking preconditions func (r *Reconciler) generateHandlers(ctx context.Context, req ctrl.Request) (*Handlers, error) { handlers := &Handlers{} if err := r.setNodeHandler(ctx, req, handlers); err != nil { return nil, err } if err := r.setMemberHandler(ctx, req, handlers); err != nil { return nil, err } return handlers, nil } // setNodeHandler sets the node handler for the reconciler and retrieves // the latest Node object from the kubernetes API server func (r *Reconciler) setNodeHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error { handlers.node = resources.NewNodeHandlerBuilder(). WithClient(r.KubeRetryClient). WithKey(req.NamespacedName). HandlesNode(). Named(req.Name). Build() // reconcile the local copy of the Node with updated data from the remote copy err := handlers.node.ReconcileLocal(ctx) if client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to retrieve node: %w", err) } // if the Node exists, set Found to true. This is used as a behavioural flag if err == nil { handlers.node.Found = true } return nil } // setMemberHandler sets the member handler for the reconciler and retrieves // the latest EtcdMember object from the kubernetes API server func (r *Reconciler) setMemberHandler(ctx context.Context, req ctrl.Request, handlers *Handlers) error { handlers.member = resources.NewEtcdMemberHandlerBuilder(). WithClient(r.KubeRetryClient). WithKey(req.NamespacedName). HandlesEtcdMember(). Named(req.Name). Build() // reconcile the local copy of the EtcdMember with updated data from the remote copy err := handlers.member.ReconcileLocal(ctx) if client.IgnoreNotFound(err) != nil { return fmt.Errorf("failed to retrieve EtcdMember: %w", err) } // if the EtcdMember exists, set Found to true. 
This is used as a behavioural flag if err == nil { handlers.member.Found = true } return nil } // checkPreconditions checks the preconditions for the reconciler to ensure the Node // is a worker and that the EtcdMember is not suspended func (r *Reconciler) checkPreconditions(ctx context.Context, handlers *Handlers) bool { log := fog.FromContext(ctx) // if the node did exist and is not a worker (is a controlplane), we do not want to // reconcile if handlers.node.Found && !handlers.node.IsWorker() { log.V(0).Info("node does not have the worker role") return false } if handlers.member.IsSuspended() { log.V(0).Info("EtcdMember reconciliation is suspended", "suspended", "true") return false } return true } // reconcile runs the reconcile logic for the reconciler. This includes reconciling // the deletion and creation of EtcdMembers func (r *Reconciler) reconcile(ctx context.Context, handlers *Handlers) error { log := fog.FromContext(ctx) // if the EtcdMember is being deleted, is unhealthy, or was installed for a previous // IEN version, then we want to delete the current EtcdMember IENVersion := handlers.node.IENVersion() if handlers.member.Found && handlers.member.DeletionRequired(IENVersion) { if err := r.destroy(ctx, handlers); err != nil { return err } log.V(0).Info("deleted EtcdMember", "eoaudit", "") } // if the Node that the EtcdMember is to be made for does not exist, // we do not want to continue creating a new EtcdMember if !handlers.node.Found { return nil } return r.create(ctx, handlers) } // recordResults takes the results of the reconcile and records the metrics for them func (r *Reconciler) recordResults(ctx context.Context, opts resultOptions) { r.Metrics.Default.RecordDuration(ctx, opts.handlers.member.EtcdMember, opts.startTime) r.Metrics.Custom.RecordReconciliation(opts.handlers.member.EtcdMember) r.Metrics.Custom.RecordReconciliationError(opts.recErr, opts.handlers.member.EtcdMember) }