...

Source file src/edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/scaler.go

Documentation: edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe

     1  package pxe
     2  
     3  import (
     4  	"context"
     5  
     6  	corev1 "k8s.io/api/core/v1"
     7  	kerrors "k8s.io/apimachinery/pkg/api/errors"
     8  	"k8s.io/client-go/dynamic"
     9  	"sigs.k8s.io/cli-utils/pkg/kstatus/watcher"
    10  	ctrl "sigs.k8s.io/controller-runtime"
    11  	"sigs.k8s.io/controller-runtime/pkg/builder"
    12  	"sigs.k8s.io/controller-runtime/pkg/client"
    13  	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
    14  	"sigs.k8s.io/controller-runtime/pkg/event"
    15  	"sigs.k8s.io/controller-runtime/pkg/handler"
    16  	"sigs.k8s.io/controller-runtime/pkg/predicate"
    17  	"sigs.k8s.io/controller-runtime/pkg/reconcile"
    18  
    19  	"edge-infra.dev/pkg/k8s/runtime/patch"
    20  	"edge-infra.dev/pkg/k8s/runtime/sap"
    21  	"edge-infra.dev/pkg/lib/fog"
    22  	"edge-infra.dev/pkg/sds/ien/bootoptions"
    23  	v1ienode "edge-infra.dev/pkg/sds/ien/k8s/apis/v1"
    24  	v1pxe "edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/apis/v1"
    25  	"edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/dnsmasq"
    26  	"edge-infra.dev/pkg/sds/ien/k8s/controllers/pxe/staticfileserver"
    27  )
    28  
    29  const scalerPXEName = "scaler"
    30  
    31  // Scaler acts as a controller that can be used to scale the cluster resources
    32  // needed to facilitate PXE booting. This is done based on whether there are
    33  // nodes that require registration and whether PXE booting is enabled for the
    34  // cluster
    35  type Scaler struct {
    36  	Reconciler
    37  }
    38  
    39  // NewScaler returns a new Scaler using the provided client
    40  func NewScaler(mgr ctrl.Manager, cli client.Client) (*Scaler, error) {
    41  	name := scalerPXEName
    42  
    43  	d, err := dynamic.NewForConfig(mgr.GetConfig())
    44  	if err != nil {
    45  		return nil, err
    46  	}
    47  
    48  	manager := sap.NewResourceManager(
    49  		cli,
    50  		watcher.NewDefaultStatusWatcher(d, mgr.GetRESTMapper()),
    51  		sap.Owner{Field: name},
    52  	)
    53  
    54  	return &Scaler{
    55  		Reconciler: Reconciler{
    56  			name,
    57  			cli,
    58  			manager,
    59  		},
    60  	}, nil
    61  }
    62  
    63  // SetupWithManager sets up the scaler controller with the provided manager
    64  func (s *Scaler) SetupWithManager(mgr ctrl.Manager) error {
    65  	return ctrl.NewControllerManagedBy(mgr).
    66  		For(
    67  			&v1pxe.PXE{},
    68  			builder.WithPredicates(predicate.GenerationChangedPredicate{}, s.isScaler(), s.ignoreDelete(), predicate.Or(s.isDeleteRequested(), s.isSuspendChanged())),
    69  		).
    70  		Watches( // react to boot-options changes as PXE may have been enabled/disabled
    71  			&corev1.ConfigMap{},
    72  			handler.EnqueueRequestsFromMapFunc(s.createReconcileRequest),
    73  			builder.WithPredicates(
    74  				predicate.ResourceVersionChangedPredicate{},
    75  				s.isBootOptionsConfigMap(),
    76  				s.ignoreDelete(),
    77  			),
    78  		).
    79  		Watches( // react to node events as they represent nodes being installed/uninstalled
    80  			&corev1.Node{},
    81  			handler.EnqueueRequestsFromMapFunc(s.createReconcileRequest),
    82  			builder.WithPredicates(s.ignoreUpdate()),
    83  		).
    84  		Watches( // react to ienode events as they represent nodes being registered/unregistered
    85  			&v1ienode.IENode{},
    86  			handler.EnqueueRequestsFromMapFunc(s.createReconcileRequest),
    87  			builder.WithPredicates(s.ignoreUpdate()),
    88  		).Complete(s)
    89  }
    90  
    91  // isScaler returns a predicate func that will return true if the object has the
    92  // scaler pxe name, otherwise it will return false
    93  func (s *Scaler) isScaler() predicate.Funcs {
    94  	return predicate.NewPredicateFuncs(func(obj client.Object) bool {
    95  		return obj.GetName() == scalerPXEName
    96  	})
    97  }
    98  
    99  // isDeleteRequested returns a predicate func that will return true if the
   100  // object has been requested to be deleted, otherwise it will return false
   101  func (s *Scaler) isDeleteRequested() predicate.Funcs {
   102  	return predicate.NewPredicateFuncs(func(obj client.Object) bool {
   103  		return !obj.GetDeletionTimestamp().IsZero()
   104  	})
   105  }
   106  
   107  // isSuspendChanged returns a predicate func that will return true if the object
   108  // has been updated and in that update the suspend option was changed, otherwise
   109  // will return false
   110  func (s *Scaler) isSuspendChanged() predicate.Funcs {
   111  	return predicate.Funcs{
   112  		CreateFunc: func(_ event.CreateEvent) bool {
   113  			return false
   114  		},
   115  		UpdateFunc: func(e event.UpdateEvent) bool {
   116  			pxeOld := e.ObjectOld.(*v1pxe.PXE)
   117  			pxeNew := e.ObjectNew.(*v1pxe.PXE)
   118  
   119  			return pxeOld.Spec.Suspend != pxeNew.Spec.Suspend
   120  		},
   121  		DeleteFunc: func(_ event.DeleteEvent) bool {
   122  			return false
   123  		},
   124  	}
   125  }
   126  
   127  // isBootOptionsConfigMap returns a predicate func that will return true if the
   128  // provided object is the boot options configmap, otherwise it will return false
   129  func (s *Scaler) isBootOptionsConfigMap() predicate.Funcs {
   130  	return predicate.NewPredicateFuncs(func(obj client.Object) bool {
   131  		return bootoptions.IsBootOptionsConfigMap(obj)
   132  	})
   133  }
   134  
   135  // ignoreDelete returns a predicate func that ignores object delete events
   136  func (s *Scaler) ignoreDelete() predicate.Funcs {
   137  	return predicate.Funcs{
   138  		DeleteFunc: func(_ event.DeleteEvent) bool {
   139  			return false
   140  		},
   141  	}
   142  }
   143  
   144  // ignoreUpdate returns a predicate func that ignores object update events
   145  func (s *Scaler) ignoreUpdate() predicate.Funcs {
   146  	return predicate.Funcs{
   147  		UpdateFunc: func(_ event.UpdateEvent) bool {
   148  			return false
   149  		},
   150  	}
   151  }
   152  
   153  // createReconcileRequest returns a reconcile request for the scaler pxe
   154  // instance
   155  func (s *Scaler) createReconcileRequest(_ context.Context, _ client.Object) []reconcile.Request {
   156  	return []reconcile.Request{
   157  		{
   158  			NamespacedName: client.ObjectKey{
   159  				Name: scalerPXEName,
   160  			},
   161  		},
   162  	}
   163  }
   164  
   165  // Reconcile on the scaler PXE instance. During reconciliation, it will be
   166  // determined whether the resources needed to PXE boot nodes are currently
   167  // required. If they are, then the dnsmasq-controller and static-file-server
   168  // deployments will be scaled up to 1 replica each and the global dnsmasq
   169  // options configuration will be created. If not, this process is reversed and
   170  // the dnsmasq-controller and static-file-server deployments will be scaled to 0
   171  // replicas and the global dnsmasq options configuration will be deleted
   172  func (s *Scaler) Reconcile(ctx context.Context, req ctrl.Request) (res ctrl.Result, recErr error) {
   173  	log := fog.FromContext(ctx).WithValues("reconciler", s.name)
   174  	ctx = fog.IntoContext(ctx, log)
   175  
   176  	log.Info("reconciling")
   177  
   178  	pxe := &v1pxe.PXE{}
   179  	if err := s.client.Get(ctx, req.NamespacedName, pxe); err != nil {
   180  		return ctrl.Result{}, client.IgnoreNotFound(err)
   181  	}
   182  
   183  	// if scaler pxe is disabled, do nothing
   184  	if pxe.Spec.Suspend {
   185  		log.Info("reconciliation suspended")
   186  
   187  		return ctrl.Result{}, nil
   188  	}
   189  
   190  	pxeMgr := newPXEManager(s.manager, pxe)
   191  
   192  	options, err := bootoptions.FromClient(ctx, s.client)
   193  	if err != nil {
   194  		return ctrl.Result{}, err
   195  	}
   196  
   197  	patcher := patch.NewSerialPatcher(pxe, s.client)
   198  	defer func() {
   199  		res, recErr = s.summarize(ctx, patcher, pxe, recErr)
   200  	}()
   201  
   202  	// is pxe booting is disabled, disable the pxe booting resources
   203  	if !options.PXEEnabled {
   204  		log.Info("PXE booting is disabled - scaling down PXE resources")
   205  
   206  		return ctrl.Result{}, s.disablePXEBooting(ctx, pxeMgr)
   207  	}
   208  
   209  	// is pxe resource has been scheduled for deletion
   210  	if !pxe.GetDeletionTimestamp().IsZero() {
   211  		log.Info("PXE resource scheduled for deletion - scaling down PXE resources")
   212  
   213  		controllerutil.RemoveFinalizer(pxe, v1pxe.Finalizer)
   214  		return ctrl.Result{}, s.disablePXEBooting(ctx, pxeMgr)
   215  	}
   216  
   217  	required, err := nodesNeedInstallation(ctx, s.client)
   218  	if err != nil {
   219  		return ctrl.Result{}, err
   220  	}
   221  	// is no nodes require installation, disable the pxe booting resources
   222  	if !required {
   223  		log.Info("no nodes currently require installation - scaling down PXE resources")
   224  
   225  		return ctrl.Result{}, s.disablePXEBooting(ctx, pxeMgr)
   226  	}
   227  
   228  	log.Info("scaling up PXE resources")
   229  	return ctrl.Result{}, s.enablePXEBooting(ctx, pxeMgr)
   230  }
   231  
   232  // nodesNeedInstallation returns true if there are any ienodes in the cluster
   233  // that do not have a corresponding node resource, otherwise returns false. The
   234  // presence of a node resource implies that the node has been successfully
   235  // installed and the kubelet has registered the node
   236  func nodesNeedInstallation(ctx context.Context, cli client.Client) (bool, error) {
   237  	ienodeList := &v1ienode.IENodeList{}
   238  	if err := cli.List(ctx, ienodeList); err != nil {
   239  		return false, err
   240  	}
   241  
   242  	for i := range ienodeList.Items {
   243  		err := cli.Get(ctx, client.ObjectKeyFromObject(&ienodeList.Items[i]), &corev1.Node{})
   244  		if client.IgnoreNotFound(err) != nil {
   245  			return false, err
   246  		}
   247  		// if node is not found, it needs installation
   248  		if kerrors.IsNotFound(err) {
   249  			return true, nil
   250  		}
   251  	}
   252  	return false, nil
   253  }
   254  
   255  // enablePXEBooting will scale both the static-file-server and
   256  // dnsmasq-controller deployments up to 1 replica and create/update the global
   257  // dnsmasqoptions configuration
   258  func (s *Scaler) enablePXEBooting(ctx context.Context, pxeMgr pxeManager) error {
   259  	dnsmasqDeployment, err := dnsmasq.ScaledUpDeployment()
   260  	if err != nil {
   261  		return err
   262  	}
   263  
   264  	sfsConfigMap, err := staticfileserver.ConfigMap()
   265  	if err != nil {
   266  		return err
   267  	}
   268  
   269  	sfsDeployment, err := staticfileserver.ScaledUpDeployment()
   270  	if err != nil {
   271  		return err
   272  	}
   273  
   274  	ienodeList := &v1ienode.IENodeList{}
   275  	if err := s.client.List(ctx, ienodeList); err != nil {
   276  		return err
   277  	}
   278  	globalDNSMasq, err := dnsmasq.GlobalDNSMasqManifest(ienodeList)
   279  	if err != nil {
   280  		return err
   281  	}
   282  
   283  	return pxeMgr.apply(ctx, dnsmasqDeployment, sfsConfigMap, sfsDeployment, globalDNSMasq)
   284  }
   285  
   286  // disablePXEBooting scales both the static-file-server and dnsmasq-controller
   287  // deployments down to 0 replicas and deletes the global dnsmasqoptions
   288  // configuration
   289  func (s *Scaler) disablePXEBooting(ctx context.Context, pxeMgr pxeManager) error {
   290  	dnsmasqDeployment, err := dnsmasq.ScaledDownDeployment()
   291  	if err != nil {
   292  		return err
   293  	}
   294  
   295  	sfsDeployment, err := staticfileserver.ScaledDownDeployment()
   296  	if err != nil {
   297  		return err
   298  	}
   299  
   300  	return pxeMgr.apply(ctx, dnsmasqDeployment, sfsDeployment)
   301  }
   302  

View as plain text