...

Source file src/edge-infra.dev/pkg/sds/lanoutage/scheduler/controller.go

Documentation: edge-infra.dev/pkg/sds/lanoutage/scheduler

     1  package scheduler
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"time"
     7  
     8  	"github.com/hashicorp/go-multierror"
     9  	"k8s.io/client-go/kubernetes"
    10  	"k8s.io/client-go/rest"
    11  	ctrl "sigs.k8s.io/controller-runtime"
    12  	"sigs.k8s.io/controller-runtime/pkg/client"
    13  	cruntimeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
    14  	kmanager "sigs.k8s.io/controller-runtime/pkg/manager"
    15  
    16  	"edge-infra.dev/pkg/k8s/runtime/controller"
    17  
    18  	"edge-infra.dev/pkg/sds/lanoutage/scheduler/internal/filters"
    19  
    20  	v1 "k8s.io/api/core/v1"
    21  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    22  	kruntime "k8s.io/apimachinery/pkg/runtime"
    23  	"k8s.io/apimachinery/pkg/types"
    24  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    25  	clientgoscheme "k8s.io/client-go/kubernetes/scheme"
    26  
    27  	kctl "edge-infra.dev/pkg/k8s/runtime/controller"
    28  
    29  	"edge-infra.dev/pkg/lib/fog"
    30  )
    31  
    32  var (
    33  	ControllerName = "lanoutagescheduler"
    34  	RequeueTime    = time.Second * 20
    35  )
    36  
// Controller reconciles Pods and binds them to the Node named by hostname
// when the pod and node predicate filters allow it.
type Controller struct {
	// Client is the controller-runtime client used to read Pods and the Node.
	client.Client
	// Clientset is the client-go clientset used for the pod binding call.
	*kubernetes.Clientset
	// hostname is the name of the Node this controller schedules pods onto.
	hostname string
}
    42  
    43  func Run(hostname string) error {
    44  	ctrl.SetLogger(fog.New().WithName("scheduler"))
    45  
    46  	mgr, err := CreateControllerManager()
    47  	if err != nil {
    48  		return fmt.Errorf("error creating controller manager: %v", err)
    49  	}
    50  
    51  	config, err := cruntimeconfig.GetConfig()
    52  	if err != nil {
    53  		return err
    54  	}
    55  
    56  	if err := RegisterController(mgr, config, hostname); err != nil {
    57  		return err
    58  	}
    59  
    60  	return mgr.Start(ctrl.SetupSignalHandler())
    61  }
    62  
    63  func RegisterController(mgr kmanager.Manager, cfg *rest.Config, hostname string) error {
    64  	c := Controller{}
    65  	c.Client = mgr.GetClient()
    66  	clientset, err := kubernetes.NewForConfig(cfg)
    67  	if err != nil {
    68  		return err
    69  	}
    70  	c.Clientset = clientset
    71  	c.hostname = hostname
    72  	return c.SetupWithManager(mgr)
    73  }
    74  
    75  func (c *Controller) SetupWithManager(mgr kmanager.Manager) error {
    76  	return ctrl.NewControllerManagedBy(mgr).
    77  		For(&v1.Pod{}).
    78  		Complete(c)
    79  }
    80  
    81  func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
    82  	log := ctrl.LoggerFrom(ctx)
    83  
    84  	pod := &v1.Pod{}
    85  	err := c.Client.Get(ctx, req.NamespacedName, pod)
    86  
    87  	// requeue on any non-not found errors
    88  	if client.IgnoreNotFound(err) != nil {
    89  		log.Error(err, "error occurred fetching the pod")
    90  		return ctrl.Result{RequeueAfter: RequeueTime}, nil
    91  	}
    92  
    93  	// ignore pods that are not found
    94  	if err != nil {
    95  		return ctrl.Result{}, nil
    96  	}
    97  
    98  	// fetch node resource
    99  	node, err := c.node(ctx)
   100  	if err != nil {
   101  		log.Error(err, "error occurred fetching the node")
   102  		return ctrl.Result{RequeueAfter: RequeueTime}, nil
   103  	}
   104  
   105  	// filter out all pods which are not schedulable during lan-outage
   106  	for funcName, predicateFunc := range filters.PodPredicateFuncs {
   107  		if schedulable, _ := predicateFunc(pod, &v1.Node{}); !schedulable {
   108  			log.V(1).Info("pod was ignored due to predicate function", "function", funcName)
   109  			return ctrl.Result{}, nil
   110  		}
   111  	}
   112  
   113  	scheduledNode, err := c.findSchedulableNode(pod, node)
   114  	if scheduledNode == nil {
   115  		log.Error(err, "failed to schedule pod to node")
   116  		return ctrl.Result{RequeueAfter: RequeueTime}, nil
   117  	}
   118  
   119  	if err := c.schedulePod(ctx, pod, scheduledNode); err != nil {
   120  		log.Error(err, "failed to schedule pod to node", "node", scheduledNode.ObjectMeta.Name)
   121  		return ctrl.Result{RequeueAfter: RequeueTime}, nil
   122  	}
   123  
   124  	log.Info("pod was scheduled to node", "node", scheduledNode.ObjectMeta.Name)
   125  
   126  	return ctrl.Result{}, nil
   127  }
   128  
   129  func (c *Controller) findSchedulableNode(pod *v1.Pod, node *v1.Node) (*v1.Node, error) {
   130  	var schedulingErrs error
   131  	for _, predicateFunc := range filters.NodePredicateFuncs {
   132  		schedulable, err := predicateFunc(pod, node)
   133  		if err != nil {
   134  			schedulingErrs = multierror.Append(schedulingErrs, err)
   135  			continue
   136  		}
   137  		if schedulable {
   138  			return node, err
   139  		}
   140  	}
   141  	return nil, schedulingErrs
   142  }
   143  
   144  // until controller-runtime 0.16.0 we have to send HTTP request to bind the pod to a node
   145  // the following enables us to use the clientset using the configuration loaded by controller-runtime
   146  func (c *Controller) schedulePod(ctx context.Context, pod *v1.Pod, node *v1.Node) error {
   147  	binding := podBinding(pod, node)
   148  	return c.Clientset.CoreV1().Pods(pod.ObjectMeta.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
   149  }
   150  
   151  func (c *Controller) node(ctx context.Context) (*v1.Node, error) {
   152  	node := &v1.Node{}
   153  	if err := c.Client.Get(ctx, types.NamespacedName{Name: c.hostname}, node); err != nil {
   154  		return &v1.Node{}, err
   155  	}
   156  	return node, nil
   157  }
   158  
   159  func CreateControllerManager(options ...controller.Option) (ctrl.Manager, error) {
   160  	ctlCfg, opts := kctl.ProcessOptions(options...)
   161  	opts.Scheme = createScheme()
   162  
   163  	mgr, err := ctrl.NewManager(ctlCfg, opts)
   164  	if err != nil {
   165  		return nil, fmt.Errorf("failed to create controller manager: %v", err)
   166  	}
   167  
   168  	return mgr, nil
   169  }
   170  
   171  func createScheme() *kruntime.Scheme {
   172  	scheme := kruntime.NewScheme()
   173  	utilruntime.Must(clientgoscheme.AddToScheme(scheme))
   174  	return scheme
   175  }
   176  
   177  func podBinding(pod *v1.Pod, node *v1.Node) *v1.Binding {
   178  	return &v1.Binding{
   179  		ObjectMeta: metav1.ObjectMeta{
   180  			Name:      pod.ObjectMeta.Name,
   181  			Namespace: pod.ObjectMeta.Namespace,
   182  		},
   183  		Target: v1.ObjectReference{
   184  			APIVersion: v1.SchemeGroupVersion.Version,
   185  			Kind:       "Node",
   186  			Name:       node.ObjectMeta.Name,
   187  		},
   188  	}
   189  }
   190  

View as plain text