// NOTE(review): stray pagination artifacts ("1", "16") removed — the
// original file presumably begins with a license header; restore it from
// the upstream source.
17 package kubemark
18
19 import (
20 "context"
21 "fmt"
22 "math/rand"
23 "sync"
24 "time"
25
26 apiv1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/fields"
29 "k8s.io/apimachinery/pkg/labels"
30 "k8s.io/client-go/informers"
31 informersv1 "k8s.io/client-go/informers/core/v1"
32 kubeclient "k8s.io/client-go/kubernetes"
33 listersv1 "k8s.io/client-go/listers/core/v1"
34 "k8s.io/client-go/tools/cache"
35
36 "k8s.io/klog/v2"
37 )
38
const (
	// namespaceKubemark is the namespace in the external cluster where
	// hollow-node ReplicationControllers and pods live.
	namespaceKubemark = "kubemark"
	// nodeGroupLabel is the label carrying a hollow node's node-group name.
	nodeGroupLabel = "autoscaling.k8s.io/nodegroup"
	// numRetries is how many times RC create/delete API calls are attempted.
	numRetries = 3
)
44
45
46
47
// KubemarkController coordinates two clusters: it manages hollow-node
// ReplicationControllers in an "external" cluster and the corresponding
// Node objects in the kubemark cluster.
type KubemarkController struct {
	// nodeTemplate is the RC template used to create new hollow nodes;
	// populated in Run via getNodeTemplate.
	nodeTemplate *apiv1.ReplicationController
	// externalCluster gives cached and direct access to the cluster that
	// hosts the hollow-node RCs and pods.
	externalCluster externalCluster
	// kubemarkCluster gives access to the simulated cluster in which the
	// hollow nodes appear as Node objects.
	kubemarkCluster kubemarkCluster
	// rand supplies the random suffix for new hollow-node RC names.
	rand *rand.Rand
	// createNodeQueue carries one node-group name per pending node
	// creation; drained by runNodeCreation.
	createNodeQueue chan string
	// nodeGroupQueueSize counts queued-but-not-yet-created nodes per
	// group; guarded by nodeGroupQueueSizeLock.
	nodeGroupQueueSize     map[string]int
	nodeGroupQueueSizeLock sync.Mutex
}
57
58
59
60
// externalCluster bundles the listers, sync checks and client for the
// cluster where hollow-node ReplicationControllers and pods actually run.
type externalCluster struct {
	rcLister  listersv1.ReplicationControllerLister // cached RC access
	rcSynced  cache.InformerSynced
	podLister listersv1.PodLister // cached pod access
	podSynced cache.InformerSynced
	client    kubeclient.Interface // used for RC create/delete calls
}
68
69
70
71
72
// kubemarkCluster bundles access to the simulated cluster whose Node
// objects correspond to hollow nodes.
type kubemarkCluster struct {
	client     kubeclient.Interface
	nodeLister listersv1.NodeLister
	nodeSynced cache.InformerSynced
	// nodesToDelete flags nodes whose Node object should be removed once
	// they go NotReady; guarded by nodesToDeleteLock.
	nodesToDelete     map[string]bool
	nodesToDeleteLock sync.Mutex
}
80
81
82
83 func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
84 kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
85 rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
86 podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
87 controller := &KubemarkController{
88 externalCluster: externalCluster{
89 rcLister: listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
90 rcSynced: rcInformer.HasSynced,
91 podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
92 podSynced: podInformer.HasSynced,
93 client: externalClient,
94 },
95 kubemarkCluster: kubemarkCluster{
96 nodeLister: kubemarkNodeInformer.Lister(),
97 nodeSynced: kubemarkNodeInformer.Informer().HasSynced,
98 client: kubemarkClient,
99 nodesToDelete: make(map[string]bool),
100 nodesToDeleteLock: sync.Mutex{},
101 },
102 rand: rand.New(rand.NewSource(time.Now().UnixNano())),
103 createNodeQueue: make(chan string, 1000),
104 nodeGroupQueueSize: make(map[string]int),
105 nodeGroupQueueSizeLock: sync.Mutex{},
106 }
107
108 kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
109 UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
110 })
111
112 return controller, nil
113 }
114
115
116 func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
117 return cache.WaitForNamedCacheSync("kubemark", stopCh,
118 kubemarkController.externalCluster.rcSynced,
119 kubemarkController.externalCluster.podSynced,
120 kubemarkController.kubemarkCluster.nodeSynced)
121 }
122
123
124
125 func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
126 nodeTemplate, err := kubemarkController.getNodeTemplate()
127 if err != nil {
128 klog.Fatalf("failed to get node template: %s", err)
129 }
130 kubemarkController.nodeTemplate = nodeTemplate
131
132 go kubemarkController.runNodeCreation(stopCh)
133 <-stopCh
134 }
135
136
137 func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
138 selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
139 pods, err := kubemarkController.externalCluster.podLister.List(selector)
140 if err != nil {
141 return nil, err
142 }
143 result := make([]string, 0, len(pods))
144 for _, pod := range pods {
145 result = append(result, pod.ObjectMeta.Name)
146 }
147 return result, nil
148 }
149
150
151 func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
152 selector := labels.SelectorFromSet(labels.Set(map[string]string{nodeGroupLabel: nodeGroup}))
153 nodes, err := kubemarkController.externalCluster.rcLister.List(selector)
154 if err != nil {
155 return 0, err
156 }
157 return len(nodes), nil
158 }
159
160
161
162 func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
163 kubemarkController.nodeGroupQueueSizeLock.Lock()
164 defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
165 realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
166 if err != nil {
167 return realSize, err
168 }
169 return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
170 }
171
172
173 func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
174 currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
175 if err != nil {
176 return err
177 }
178 switch delta := size - currSize; {
179 case delta < 0:
180 absDelta := -delta
181 nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
182 if err != nil {
183 return err
184 }
185 if len(nodes) < absDelta {
186 return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
187 }
188 for i, node := range nodes {
189 if i == absDelta {
190 return nil
191 }
192 if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
193 return err
194 }
195 }
196 case delta > 0:
197 kubemarkController.nodeGroupQueueSizeLock.Lock()
198 kubemarkController.nodeGroupQueueSize[nodeGroup] += delta
199 kubemarkController.nodeGroupQueueSizeLock.Unlock()
200 for i := 0; i < delta; i++ {
201 kubemarkController.createNodeQueue <- nodeGroup
202 }
203 }
204
205 return nil
206 }
207
208
209
210 func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
211 pod := kubemarkController.getPodByName(node)
212 if pod == nil {
213 return "", fmt.Errorf("node %s does not exist", node)
214 }
215 nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
216 if ok {
217 return nodeGroup, nil
218 }
219 return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
220 }
221
222 func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
223 node := kubemarkController.nodeTemplate.DeepCopy()
224 node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
225 node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
226 node.Spec.Template.Labels = node.Labels
227
228 var err error
229 for i := 0; i < numRetries; i++ {
230 _, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(context.TODO(), node, metav1.CreateOptions{})
231 if err == nil {
232 return nil
233 }
234 }
235
236 return err
237 }
238
239 func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error {
240 pod := kubemarkController.getPodByName(node)
241 if pod == nil {
242 klog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
243 return nil
244 }
245 if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
246 return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
247 }
248 policy := metav1.DeletePropagationForeground
249 var err error
250 for i := 0; i < numRetries; i++ {
251 err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(context.TODO(), pod.ObjectMeta.Labels["name"],
252 metav1.DeleteOptions{PropagationPolicy: &policy})
253 if err == nil {
254 klog.Infof("marking node %s for deletion", node)
255
256
257
258
259
260 kubemarkController.kubemarkCluster.markNodeForDeletion(node)
261 return nil
262 }
263 }
264 return fmt.Errorf("Failed to delete node %s: %v", node, err)
265 }
266
267 func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
268 rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
269 if err != nil {
270 return nil
271 }
272 for _, rc := range rcs {
273 if rc.ObjectMeta.Name == name {
274 return rc
275 }
276 }
277 return nil
278 }
279
280 func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
281 pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
282 if err != nil {
283 return nil
284 }
285 for _, pod := range pods {
286 if pod.ObjectMeta.Name == name {
287 return pod
288 }
289 }
290 return nil
291 }
292
293 func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
294 pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
295 if err != nil {
296 return "", err
297 }
298 for _, pod := range pods {
299 if pod.ObjectMeta.Name == podName {
300 return pod.Labels["name"], nil
301 }
302 }
303 return "", fmt.Errorf("pod %s not found", podName)
304 }
305
306
307
308
309 func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
310 podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
311 if err != nil {
312 return nil, err
313 }
314 hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
315 if err != nil {
316 return nil, err
317 }
318 if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
319 nodeTemplate := &apiv1.ReplicationController{
320 Spec: apiv1.ReplicationControllerSpec{
321 Template: hollowNode.Spec.Template,
322 },
323 }
324
325 nodeTemplate.Spec.Selector = nil
326 nodeTemplate.Namespace = namespaceKubemark
327 one := int32(1)
328 nodeTemplate.Spec.Replicas = &one
329
330 return nodeTemplate, nil
331 }
332 return nil, fmt.Errorf("can't get hollow node template")
333 }
334
335 func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
336 for {
337 select {
338 case nodeGroup := <-kubemarkController.createNodeQueue:
339 kubemarkController.nodeGroupQueueSizeLock.Lock()
340 err := kubemarkController.addNodeToNodeGroup(nodeGroup)
341 if err != nil {
342 klog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
343 } else {
344 kubemarkController.nodeGroupQueueSize[nodeGroup]--
345 }
346 kubemarkController.nodeGroupQueueSizeLock.Unlock()
347 case <-stop:
348 return
349 }
350 }
351 }
352
353 func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
354 selector, _ := labels.Parse(nodeGroupLabel)
355 nodes, err := kubemarkCluster.nodeLister.List(selector)
356 if err != nil {
357 return "", err
358 }
359 for _, node := range nodes {
360 return node.Name, nil
361 }
362 return "", fmt.Errorf("did not find any hollow nodes in the cluster")
363 }
364
365 func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
366 node, ok := newObj.(*apiv1.Node)
367 if !ok {
368 return
369 }
370 for _, condition := range node.Status.Conditions {
371
372
373 if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
374 kubemarkCluster.nodesToDeleteLock.Lock()
375 defer kubemarkCluster.nodesToDeleteLock.Unlock()
376 if kubemarkCluster.nodesToDelete[node.Name] {
377 kubemarkCluster.nodesToDelete[node.Name] = false
378 if err := kubemarkCluster.client.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{}); err != nil {
379 klog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
380 }
381 }
382 return
383 }
384 }
385 }
386
387 func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
388 kubemarkCluster.nodesToDeleteLock.Lock()
389 defer kubemarkCluster.nodesToDeleteLock.Unlock()
390 kubemarkCluster.nodesToDelete[name] = true
391 }
392
393 func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
394 rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
395 return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
396 }
397
398 func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
399 podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
400 return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
401 }
402
// NOTE(review): removed trailing "View as plain text" pkg.go.dev page artifact.