package pxe import ( "context" corev1 "k8s.io/api/core/v1" kerrors "k8s.io/apimachinery/pkg/api/errors" "k8s.io/client-go/dynamic" "sigs.k8s.io/cli-utils/pkg/kstatus/watcher" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" "edge-infra.dev/pkg/k8s/runtime/patch" "edge-infra.dev/pkg/k8s/runtime/sap" "edge-infra.dev/pkg/lib/fog" "edge-infra.dev/pkg/sds/ien/bootoptions" v1ienode "edge-infra.dev/pkg/sds/ien/k8s/apis/v1" v1pxe "edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/apis/v1" "edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/dnsmasq" "edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/staticfileserver" ) const scalerPXEName = "scaler" // Scaler acts as a controller that can be used to scale the cluster resources // needed to facilitate PXE booting. This is done based on whether there are // nodes that require registration and whether PXE booting is enabled for the // cluster type Scaler struct { Reconciler } // NewScaler returns a new Scaler using the provided client func NewScaler(mgr ctrl.Manager, cli client.Client) (*Scaler, error) { name := scalerPXEName d, err := dynamic.NewForConfig(mgr.GetConfig()) if err != nil { return nil, err } manager := sap.NewResourceManager( cli, watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()), sap.Owner{Field: name}, ) return &Scaler{ Reconciler: Reconciler{ name, cli, manager, }, }, nil } // SetupWithManager sets up the scaler controller with the provided manager func (s *Scaler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). For( &v1pxe.PXE{}, builder.WithPredicates(predicate.GenerationChangedPredicate{}, s.isScaler(), s.ignoreDelete(), predicate.Or(s.isDeleteRequested(), s.isSuspendChanged())), ). Watches( // react to boot-options changes as PXE may have been enabled/disabled &corev1.ConfigMap{}, handler.EnqueueRequestsFromMapFunc(s.createReconcileRequest), builder.WithPredicates( predicate.ResourceVersionChangedPredicate{}, s.isBootOptionsConfigMap(), s.ignoreDelete(), ), ). Watches( // react to node events as they represent nodes being installed/uninstalled &corev1.Node{}, handler.EnqueueRequestsFromMapFunc(s.createReconcileRequest), builder.WithPredicates(s.ignoreUpdate()), ). Watches( // react to ienode events as they represent nodes being registered/unregistered &v1ienode.IENode{}, handler.EnqueueRequestsFromMapFunc(s.createReconcileRequest), builder.WithPredicates(s.ignoreUpdate()), ).Complete(s) } // isScaler returns a predicate func that will return true if the object has the // scaler pxe name, otherwise it will return false func (s *Scaler) isScaler() predicate.Funcs { return predicate.NewPredicateFuncs(func(obj client.Object) bool { return obj.GetName() == scalerPXEName }) } // isDeleteRequested returns a predicate func that will return true if the // object has been requested to be deleted, otherwise it will return false func (s *Scaler) isDeleteRequested() predicate.Funcs { return predicate.NewPredicateFuncs(func(obj client.Object) bool { return !obj.GetDeletionTimestamp().IsZero() }) } // isSuspendChanged returns a predicate func that will return true if the object // has been updated and in that update the suspend option was changed, otherwise // will return false func (s *Scaler) isSuspendChanged() predicate.Funcs { return predicate.Funcs{ CreateFunc: func(_ event.CreateEvent) bool { return false }, UpdateFunc: func(e event.UpdateEvent) bool { pxeOld := e.ObjectOld.(*v1pxe.PXE) pxeNew := e.ObjectNew.(*v1pxe.PXE) return pxeOld.Spec.Suspend != pxeNew.Spec.Suspend }, DeleteFunc: func(_ event.DeleteEvent) bool { return false }, } } // isBootOptionsConfigMap returns a predicate func that will return true if the // provided object is the boot options configmap, otherwise it will return false func (s *Scaler) isBootOptionsConfigMap() predicate.Funcs { return predicate.NewPredicateFuncs(func(obj client.Object) bool { return bootoptions.IsBootOptionsConfigMap(obj) }) } // ignoreDelete returns a predicate func that ignores object delete events func (s *Scaler) ignoreDelete() predicate.Funcs { return predicate.Funcs{ DeleteFunc: func(_ event.DeleteEvent) bool { return false }, } } // ignoreUpdate returns a predicate func that ignores object update events func (s *Scaler) ignoreUpdate() predicate.Funcs { return predicate.Funcs{ UpdateFunc: func(_ event.UpdateEvent) bool { return false }, } } // createReconcileRequest returns a reconcile request for the scaler pxe // instance func (s *Scaler) createReconcileRequest(_ context.Context, _ client.Object) []reconcile.Request { return []reconcile.Request{ { NamespacedName: client.ObjectKey{ Name: scalerPXEName, }, }, } } // Reconcile on the scaler PXE instance. During reconciliation, it will be // determined whether the resources needed to PXE boot nodes are currently // required. If they are, then the dnsmasq-controller and static-file-server // deployments will be scaled up to 1 replica each and the global dnsmasq // options configuration will be created. If not, this process is reversed and // the dnsmasq-controller and static-file-server deployments will be scaled to 0 // replicas and the global dnsmasq options configuration will be deleted func (s *Scaler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) { log := fog.FromContext(ctx).WithValues("reconciler", s.name) ctx = fog.IntoContext(ctx, log) log.Info("reconciling") pxe := &v1pxe.PXE{} if err := s.client.Get(ctx, req.NamespacedName, pxe); err != nil { return ctrl.Result{}, client.IgnoreNotFound(err) } // if scaler pxe is disabled, do nothing if pxe.Spec.Suspend { log.Info("reconciliation suspended") return ctrl.Result{}, nil } pxeMgr := newPXEManager(s.manager, pxe) options, err := bootoptions.FromClient(ctx, s.client) if err != nil { return ctrl.Result{}, err } patcher := patch.NewSerialPatcher(pxe, s.client) defer func() { res, recErr = s.summarize(ctx, patcher, pxe, recErr) }() // is pxe booting is disabled, disable the pxe booting resources if !options.PXEEnabled { log.Info("PXE booting is disabled - scaling down PXE resources") return ctrl.Result{}, s.disablePXEBooting(ctx, pxeMgr) } // is pxe resource has been scheduled for deletion if !pxe.GetDeletionTimestamp().IsZero() { log.Info("PXE resource scheduled for deletion - scaling down PXE resources") controllerutil.RemoveFinalizer(pxe, v1pxe.Finalizer) return ctrl.Result{}, s.disablePXEBooting(ctx, pxeMgr) } required, err := nodesNeedInstallation(ctx, s.client) if err != nil { return ctrl.Result{}, err } // is no nodes require installation, disable the pxe booting resources if !required { log.Info("no nodes currently require installation - scaling down PXE resources") return ctrl.Result{}, s.disablePXEBooting(ctx, pxeMgr) } log.Info("scaling up PXE resources") return ctrl.Result{}, s.enablePXEBooting(ctx, pxeMgr) } // nodesNeedInstallation returns true if there are any ienodes in the cluster // that do not have a corresponding node resource, otherwise returns false. The // presence of a node resource implies that the node has been successfully // installed and the kubelet has registered the node func nodesNeedInstallation(ctx context.Context, cli client.Client) (bool, error) { ienodeList := &v1ienode.IENodeList{} if err := cli.List(ctx, ienodeList); err != nil { return false, err } for i := range ienodeList.Items { err := cli.Get(ctx, client.ObjectKeyFromObject(&ienodeList.Items[i]), &corev1.Node{}) if client.IgnoreNotFound(err) != nil { return false, err } // if node is not found, it needs installation if kerrors.IsNotFound(err) { return true, nil } } return false, nil } // enablePXEBooting will scale both the static-file-server and // dnsmasq-controller deployments up to 1 replica and create/update the global // dnsmasqoptions configuration func (s *Scaler) enablePXEBooting(ctx context.Context, pxeMgr pxeManager) error { dnsmasqDeployment, err := dnsmasq.ScaledUpDeployment() if err != nil { return err } sfsConfigMap, err := staticfileserver.ConfigMap() if err != nil { return err } sfsDeployment, err := staticfileserver.ScaledUpDeployment() if err != nil { return err } ienodeList := &v1ienode.IENodeList{} if err := s.client.List(ctx, ienodeList); err != nil { return err } globalDNSMasq, err := dnsmasq.GlobalDNSMasqManifest(ienodeList) if err != nil { return err } return pxeMgr.apply(ctx, dnsmasqDeployment, sfsConfigMap, sfsDeployment, globalDNSMasq) } // disablePXEBooting scales both the static-file-server and dnsmasq-controller // deployments down to 0 replicas and deletes the global dnsmasqoptions // configuration func (s *Scaler) disablePXEBooting(ctx context.Context, pxeMgr pxeManager) error { dnsmasqDeployment, err := dnsmasq.ScaledDownDeployment() if err != nil { return err } sfsDeployment, err := staticfileserver.ScaledDownDeployment() if err != nil { return err } return pxeMgr.apply(ctx, dnsmasqDeployment, sfsDeployment) }