1 package scheduler
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/hashicorp/go-multierror"
9 "k8s.io/client-go/kubernetes"
10 "k8s.io/client-go/rest"
11 ctrl "sigs.k8s.io/controller-runtime"
12 "sigs.k8s.io/controller-runtime/pkg/client"
13 cruntimeconfig "sigs.k8s.io/controller-runtime/pkg/client/config"
14 kmanager "sigs.k8s.io/controller-runtime/pkg/manager"
15
16 "edge-infra.dev/pkg/k8s/runtime/controller"
17
18 "edge-infra.dev/pkg/sds/lanoutage/scheduler/internal/filters"
19
20 v1 "k8s.io/api/core/v1"
21 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
22 kruntime "k8s.io/apimachinery/pkg/runtime"
23 "k8s.io/apimachinery/pkg/types"
24 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
25 clientgoscheme "k8s.io/client-go/kubernetes/scheme"
26
27 kctl "edge-infra.dev/pkg/k8s/runtime/controller"
28
29 "edge-infra.dev/pkg/lib/fog"
30 )
31
// Package-level settings for the scheduler controller.
var (
	// ControllerName is the identifier string for this controller.
	ControllerName = "lanoutagescheduler"

	// RequeueTime is the delay Reconcile requests before retrying after a
	// logged failure.
	RequeueTime = 20 * time.Second
)
36
// Controller reconciles Pods and binds eligible ones to the single Node whose
// name matches hostname.
type Controller struct {
	// Client is the controller-runtime client (taken from the manager in
	// RegisterController); it is used to read Pods and the Node.
	client.Client
	// Clientset is the typed client-go clientset used to submit pod Bindings.
	*kubernetes.Clientset
	// hostname is the name of the Node fetched on every reconcile.
	hostname string
}
42
43 func Run(hostname string) error {
44 ctrl.SetLogger(fog.New().WithName("scheduler"))
45
46 mgr, err := CreateControllerManager()
47 if err != nil {
48 return fmt.Errorf("error creating controller manager: %v", err)
49 }
50
51 config, err := cruntimeconfig.GetConfig()
52 if err != nil {
53 return err
54 }
55
56 if err := RegisterController(mgr, config, hostname); err != nil {
57 return err
58 }
59
60 return mgr.Start(ctrl.SetupSignalHandler())
61 }
62
63 func RegisterController(mgr kmanager.Manager, cfg *rest.Config, hostname string) error {
64 c := Controller{}
65 c.Client = mgr.GetClient()
66 clientset, err := kubernetes.NewForConfig(cfg)
67 if err != nil {
68 return err
69 }
70 c.Clientset = clientset
71 c.hostname = hostname
72 return c.SetupWithManager(mgr)
73 }
74
75 func (c *Controller) SetupWithManager(mgr kmanager.Manager) error {
76 return ctrl.NewControllerManagedBy(mgr).
77 For(&v1.Pod{}).
78 Complete(c)
79 }
80
81 func (c *Controller) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
82 log := ctrl.LoggerFrom(ctx)
83
84 pod := &v1.Pod{}
85 err := c.Client.Get(ctx, req.NamespacedName, pod)
86
87
88 if client.IgnoreNotFound(err) != nil {
89 log.Error(err, "error occurred fetching the pod")
90 return ctrl.Result{RequeueAfter: RequeueTime}, nil
91 }
92
93
94 if err != nil {
95 return ctrl.Result{}, nil
96 }
97
98
99 node, err := c.node(ctx)
100 if err != nil {
101 log.Error(err, "error occurred fetching the node")
102 return ctrl.Result{RequeueAfter: RequeueTime}, nil
103 }
104
105
106 for funcName, predicateFunc := range filters.PodPredicateFuncs {
107 if schedulable, _ := predicateFunc(pod, &v1.Node{}); !schedulable {
108 log.V(1).Info("pod was ignored due to predicate function", "function", funcName)
109 return ctrl.Result{}, nil
110 }
111 }
112
113 scheduledNode, err := c.findSchedulableNode(pod, node)
114 if scheduledNode == nil {
115 log.Error(err, "failed to schedule pod to node")
116 return ctrl.Result{RequeueAfter: RequeueTime}, nil
117 }
118
119 if err := c.schedulePod(ctx, pod, scheduledNode); err != nil {
120 log.Error(err, "failed to schedule pod to node", "node", scheduledNode.ObjectMeta.Name)
121 return ctrl.Result{RequeueAfter: RequeueTime}, nil
122 }
123
124 log.Info("pod was scheduled to node", "node", scheduledNode.ObjectMeta.Name)
125
126 return ctrl.Result{}, nil
127 }
128
129 func (c *Controller) findSchedulableNode(pod *v1.Pod, node *v1.Node) (*v1.Node, error) {
130 var schedulingErrs error
131 for _, predicateFunc := range filters.NodePredicateFuncs {
132 schedulable, err := predicateFunc(pod, node)
133 if err != nil {
134 schedulingErrs = multierror.Append(schedulingErrs, err)
135 continue
136 }
137 if schedulable {
138 return node, err
139 }
140 }
141 return nil, schedulingErrs
142 }
143
144
145
146 func (c *Controller) schedulePod(ctx context.Context, pod *v1.Pod, node *v1.Node) error {
147 binding := podBinding(pod, node)
148 return c.Clientset.CoreV1().Pods(pod.ObjectMeta.Namespace).Bind(ctx, binding, metav1.CreateOptions{})
149 }
150
151 func (c *Controller) node(ctx context.Context) (*v1.Node, error) {
152 node := &v1.Node{}
153 if err := c.Client.Get(ctx, types.NamespacedName{Name: c.hostname}, node); err != nil {
154 return &v1.Node{}, err
155 }
156 return node, nil
157 }
158
159 func CreateControllerManager(options ...controller.Option) (ctrl.Manager, error) {
160 ctlCfg, opts := kctl.ProcessOptions(options...)
161 opts.Scheme = createScheme()
162
163 mgr, err := ctrl.NewManager(ctlCfg, opts)
164 if err != nil {
165 return nil, fmt.Errorf("failed to create controller manager: %v", err)
166 }
167
168 return mgr, nil
169 }
170
171 func createScheme() *kruntime.Scheme {
172 scheme := kruntime.NewScheme()
173 utilruntime.Must(clientgoscheme.AddToScheme(scheme))
174 return scheme
175 }
176
177 func podBinding(pod *v1.Pod, node *v1.Node) *v1.Binding {
178 return &v1.Binding{
179 ObjectMeta: metav1.ObjectMeta{
180 Name: pod.ObjectMeta.Name,
181 Namespace: pod.ObjectMeta.Namespace,
182 },
183 Target: v1.ObjectReference{
184 APIVersion: v1.SchemeGroupVersion.Version,
185 Kind: "Node",
186 Name: node.ObjectMeta.Name,
187 },
188 }
189 }
190
View as plain text