...

Source file src/k8s.io/kubernetes/pkg/kubemark/controller.go

Documentation: k8s.io/kubernetes/pkg/kubemark

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package kubemark

import (
	"context"
	"fmt"
	"math/rand"
	"sync"
	"time"

	apiv1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/fields"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	informersv1 "k8s.io/client-go/informers/core/v1"
	kubeclient "k8s.io/client-go/kubernetes"
	listersv1 "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"

	"k8s.io/klog/v2"
)

const (
	namespaceKubemark = "kubemark"
	nodeGroupLabel    = "autoscaling.k8s.io/nodegroup"
	numRetries        = 3
)

// KubemarkController is a simplified version of a cloud provider for kubemark.
// It allows adding and deleting nodes in a kubemark cluster and introduces
// node groups by applying labels to kubemark's hollow nodes.
type KubemarkController struct {
	nodeTemplate           *apiv1.ReplicationController
	externalCluster        externalCluster
	kubemarkCluster        kubemarkCluster
	rand                   *rand.Rand
	createNodeQueue        chan string
	nodeGroupQueueSize     map[string]int
	nodeGroupQueueSizeLock sync.Mutex
}

// externalCluster is used to communicate with the external cluster that hosts
// kubemark, in order to be able to list, create and delete hollow nodes
// by manipulating the replication controllers.
type externalCluster struct {
	rcLister  listersv1.ReplicationControllerLister
	rcSynced  cache.InformerSynced
	podLister listersv1.PodLister
	podSynced cache.InformerSynced
	client    kubeclient.Interface
}

// kubemarkCluster is used to delete nodes from the kubemark cluster once their
// respective replication controllers have been deleted and the nodes have
// become unready. This compensates for the fact that there is no proper cloud
// provider for kubemark that would take care of deleting the nodes.
type kubemarkCluster struct {
	client            kubeclient.Interface
	nodeLister        listersv1.NodeLister
	nodeSynced        cache.InformerSynced
	nodesToDelete     map[string]bool
	nodesToDeleteLock sync.Mutex
}

// NewKubemarkController creates a KubemarkController that uses the provided
// clients to talk to the external and kubemark clusters.
func NewKubemarkController(externalClient kubeclient.Interface, externalInformerFactory informers.SharedInformerFactory,
	kubemarkClient kubeclient.Interface, kubemarkNodeInformer informersv1.NodeInformer) (*KubemarkController, error) {
	rcInformer := externalInformerFactory.InformerFor(&apiv1.ReplicationController{}, newReplicationControllerInformer)
	podInformer := externalInformerFactory.InformerFor(&apiv1.Pod{}, newPodInformer)
	controller := &KubemarkController{
		externalCluster: externalCluster{
			rcLister:  listersv1.NewReplicationControllerLister(rcInformer.GetIndexer()),
			rcSynced:  rcInformer.HasSynced,
			podLister: listersv1.NewPodLister(podInformer.GetIndexer()),
			podSynced: podInformer.HasSynced,
			client:    externalClient,
		},
		kubemarkCluster: kubemarkCluster{
			nodeLister:        kubemarkNodeInformer.Lister(),
			nodeSynced:        kubemarkNodeInformer.Informer().HasSynced,
			client:            kubemarkClient,
			nodesToDelete:     make(map[string]bool),
			nodesToDeleteLock: sync.Mutex{},
		},
		rand:                   rand.New(rand.NewSource(time.Now().UnixNano())),
		createNodeQueue:        make(chan string, 1000),
		nodeGroupQueueSize:     make(map[string]int),
		nodeGroupQueueSizeLock: sync.Mutex{},
	}

	kubemarkNodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		UpdateFunc: controller.kubemarkCluster.removeUnneededNodes,
	})

	return controller, nil
}
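
// A minimal wiring sketch (hypothetical; the client, factory and stop-channel
// variable names below are assumptions, not part of this package):
//
//	externalFactory := informers.NewSharedInformerFactory(externalClient, 0)
//	kubemarkFactory := informers.NewSharedInformerFactory(kubemarkClient, 0)
//	controller, err := NewKubemarkController(externalClient, externalFactory,
//		kubemarkClient, kubemarkFactory.Core().V1().Nodes())
//	externalFactory.Start(stopCh)
//	kubemarkFactory.Start(stopCh)
//	if controller.WaitForCacheSync(stopCh) {
//		controller.Run(stopCh)
//	}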

// WaitForCacheSync waits until all caches in the controller are populated.
func (kubemarkController *KubemarkController) WaitForCacheSync(stopCh chan struct{}) bool {
	return cache.WaitForNamedCacheSync("kubemark", stopCh,
		kubemarkController.externalCluster.rcSynced,
		kubemarkController.externalCluster.podSynced,
		kubemarkController.kubemarkCluster.nodeSynced)
}

// Run populates the node template needed for creation of kubemark nodes and
// starts the worker routine for creating new nodes. It blocks until stopCh
// is closed.
func (kubemarkController *KubemarkController) Run(stopCh chan struct{}) {
	nodeTemplate, err := kubemarkController.getNodeTemplate()
	if err != nil {
		klog.Fatalf("failed to get node template: %s", err)
	}
	kubemarkController.nodeTemplate = nodeTemplate

	go kubemarkController.runNodeCreation(stopCh)
	<-stopCh
}

// GetNodeNamesForNodeGroup returns a list of the names of the nodes in the
// node group.
func (kubemarkController *KubemarkController) GetNodeNamesForNodeGroup(nodeGroup string) ([]string, error) {
	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
	pods, err := kubemarkController.externalCluster.podLister.List(selector)
	if err != nil {
		return nil, err
	}
	result := make([]string, 0, len(pods))
	for _, pod := range pods {
		result = append(result, pod.ObjectMeta.Name)
	}
	return result, nil
}

// GetNodeGroupSize returns the current observed size of the node group.
func (kubemarkController *KubemarkController) GetNodeGroupSize(nodeGroup string) (int, error) {
	selector := labels.SelectorFromSet(labels.Set{nodeGroupLabel: nodeGroup})
	rcs, err := kubemarkController.externalCluster.rcLister.List(selector)
	if err != nil {
		return 0, err
	}
	return len(rcs), nil
}

// GetNodeGroupTargetSize returns the size of the node group as the sum of the
// current observed size and the number of upcoming nodes.
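// For example, with three hollow-node replication controllers already running
// for the group and two creations still queued, the target size is five.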
func (kubemarkController *KubemarkController) GetNodeGroupTargetSize(nodeGroup string) (int, error) {
	kubemarkController.nodeGroupQueueSizeLock.Lock()
	defer kubemarkController.nodeGroupQueueSizeLock.Unlock()
	realSize, err := kubemarkController.GetNodeGroupSize(nodeGroup)
	if err != nil {
		return realSize, err
	}
	return realSize + kubemarkController.nodeGroupQueueSize[nodeGroup], nil
}

// SetNodeGroupSize changes the size of the node group by adding or removing nodes.
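// A hypothetical call to grow a group named "ng-1" to five nodes (the group
// name is illustrative only):
//
//	if err := controller.SetNodeGroupSize("ng-1", 5); err != nil {
//		klog.Errorf("failed to resize ng-1: %v", err)
//	}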
func (kubemarkController *KubemarkController) SetNodeGroupSize(nodeGroup string, size int) error {
	currSize, err := kubemarkController.GetNodeGroupTargetSize(nodeGroup)
	if err != nil {
		return err
	}
	switch delta := size - currSize; {
	case delta < 0:
		absDelta := -delta
		nodes, err := kubemarkController.GetNodeNamesForNodeGroup(nodeGroup)
		if err != nil {
			return err
		}
		if len(nodes) < absDelta {
			return fmt.Errorf("can't remove %d nodes from %s nodegroup, not enough nodes: %d", absDelta, nodeGroup, len(nodes))
		}
		for i, node := range nodes {
			if i == absDelta {
				return nil
			}
			if err := kubemarkController.RemoveNodeFromNodeGroup(nodeGroup, node); err != nil {
				return err
			}
		}
	case delta > 0:
		kubemarkController.nodeGroupQueueSizeLock.Lock()
		kubemarkController.nodeGroupQueueSize[nodeGroup] += delta
		kubemarkController.nodeGroupQueueSizeLock.Unlock()
		for i := 0; i < delta; i++ {
			kubemarkController.createNodeQueue <- nodeGroup
		}
	}

	return nil
}

// GetNodeGroupForNode returns the name of the node group to which the node
// belongs.
func (kubemarkController *KubemarkController) GetNodeGroupForNode(node string) (string, error) {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		return "", fmt.Errorf("node %s does not exist", node)
	}
	nodeGroup, ok := pod.ObjectMeta.Labels[nodeGroupLabel]
	if ok {
		return nodeGroup, nil
	}
	return "", fmt.Errorf("can't find nodegroup for node %s due to missing label %s", node, nodeGroupLabel)
}

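// addNodeToNodeGroup creates a new hollow-node replication controller from the
// cached node template, labeled with the given node group, retrying up to
// numRetries times on failure.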
func (kubemarkController *KubemarkController) addNodeToNodeGroup(nodeGroup string) error {
	node := kubemarkController.nodeTemplate.DeepCopy()
	node.Name = fmt.Sprintf("%s-%d", nodeGroup, kubemarkController.rand.Int63())
	node.Labels = map[string]string{nodeGroupLabel: nodeGroup, "name": node.Name}
	node.Spec.Template.Labels = node.Labels

	var err error
	for i := 0; i < numRetries; i++ {
		_, err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(node.Namespace).Create(context.TODO(), node, metav1.CreateOptions{})
		if err == nil {
			return nil
		}
	}

	return err
}

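// RemoveNodeFromNodeGroup deletes the replication controller backing the given
// hollow node and marks the node for later removal from the kubemark cluster.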
func (kubemarkController *KubemarkController) RemoveNodeFromNodeGroup(nodeGroup string, node string) error {
	pod := kubemarkController.getPodByName(node)
	if pod == nil {
		klog.Warningf("Can't delete node %s from nodegroup %s. Node does not exist.", node, nodeGroup)
		return nil
	}
	if pod.ObjectMeta.Labels[nodeGroupLabel] != nodeGroup {
		return fmt.Errorf("can't delete node %s from nodegroup %s. Node is not in nodegroup", node, nodeGroup)
	}
	policy := metav1.DeletePropagationForeground
	var err error
	for i := 0; i < numRetries; i++ {
		err = kubemarkController.externalCluster.client.CoreV1().ReplicationControllers(namespaceKubemark).Delete(context.TODO(), pod.ObjectMeta.Labels["name"],
			metav1.DeleteOptions{PropagationPolicy: &policy})
		if err == nil {
			klog.Infof("marking node %s for deletion", node)
			// Mark node for deletion from kubemark cluster.
			// Once it becomes unready after replication controller
			// deletion has been noticed, we will delete it explicitly.
			// This is to cover for the fact that kubemark does not
			// take care of this itself.
			kubemarkController.kubemarkCluster.markNodeForDeletion(node)
			return nil
		}
	}
	return fmt.Errorf("failed to delete node %s: %v", node, err)
}

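// getReplicationControllerByName scans the cached replication controllers for
// one with the given name, returning nil if none is found.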
func (kubemarkController *KubemarkController) getReplicationControllerByName(name string) *apiv1.ReplicationController {
	rcs, err := kubemarkController.externalCluster.rcLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, rc := range rcs {
		if rc.ObjectMeta.Name == name {
			return rc
		}
	}
	return nil
}

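// getPodByName scans the cached pods for one with the given name, returning
// nil if none is found.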
func (kubemarkController *KubemarkController) getPodByName(name string) *apiv1.Pod {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return nil
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == name {
			return pod
		}
	}
	return nil
}

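// getNodeNameForPod returns the value of the "name" label of the pod with the
// given name. For hollow-node pods this label holds the name of the
// replication controller (and hence the node) backing the pod.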
func (kubemarkController *KubemarkController) getNodeNameForPod(podName string) (string, error) {
	pods, err := kubemarkController.externalCluster.podLister.List(labels.Everything())
	if err != nil {
		return "", err
	}
	for _, pod := range pods {
		if pod.ObjectMeta.Name == podName {
			return pod.Labels["name"], nil
		}
	}
	return "", fmt.Errorf("pod %s not found", podName)
}

// getNodeTemplate returns the template for hollow node replication controllers
// by looking for an existing hollow node specification. This requires at least
// one kubemark node to be present on startup.
func (kubemarkController *KubemarkController) getNodeTemplate() (*apiv1.ReplicationController, error) {
	podName, err := kubemarkController.kubemarkCluster.getHollowNodeName()
	if err != nil {
		return nil, err
	}
	hollowNodeName, err := kubemarkController.getNodeNameForPod(podName)
	if err != nil {
		return nil, err
	}
	if hollowNode := kubemarkController.getReplicationControllerByName(hollowNodeName); hollowNode != nil {
		nodeTemplate := &apiv1.ReplicationController{
			Spec: apiv1.ReplicationControllerSpec{
				Template: hollowNode.Spec.Template,
			},
		}

		nodeTemplate.Spec.Selector = nil
		nodeTemplate.Namespace = namespaceKubemark
		one := int32(1)
		nodeTemplate.Spec.Replicas = &one

		return nodeTemplate, nil
	}
	return nil, fmt.Errorf("can't get hollow node template")
}

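// runNodeCreation consumes node group names from createNodeQueue, creating one
// hollow node per message and decrementing the group's queued-size counter on
// success, until the stop channel is closed.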
func (kubemarkController *KubemarkController) runNodeCreation(stop <-chan struct{}) {
	for {
		select {
		case nodeGroup := <-kubemarkController.createNodeQueue:
			kubemarkController.nodeGroupQueueSizeLock.Lock()
			err := kubemarkController.addNodeToNodeGroup(nodeGroup)
			if err != nil {
				klog.Errorf("failed to add node to node group %s: %v", nodeGroup, err)
			} else {
				kubemarkController.nodeGroupQueueSize[nodeGroup]--
			}
			kubemarkController.nodeGroupQueueSizeLock.Unlock()
		case <-stop:
			return
		}
	}
}

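// getHollowNodeName returns the name of an arbitrary hollow node that carries
// the node group label, or an error if none exists.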
func (kubemarkCluster *kubemarkCluster) getHollowNodeName() (string, error) {
	selector, _ := labels.Parse(nodeGroupLabel)
	nodes, err := kubemarkCluster.nodeLister.List(selector)
	if err != nil {
		return "", err
	}
	if len(nodes) > 0 {
		return nodes[0].Name, nil
	}
	return "", fmt.Errorf("did not find any hollow nodes in the cluster")
}

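// removeUnneededNodes is an update handler for the kubemark node informer. It
// deletes a node from the kubemark cluster once the node reports unready and
// has been marked for deletion via markNodeForDeletion.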
func (kubemarkCluster *kubemarkCluster) removeUnneededNodes(oldObj interface{}, newObj interface{}) {
	node, ok := newObj.(*apiv1.Node)
	if !ok {
		return
	}
	for _, condition := range node.Status.Conditions {
		// Delete the node if it is unready and has been explicitly
		// marked for deletion.
		if condition.Type == apiv1.NodeReady && condition.Status != apiv1.ConditionTrue {
			kubemarkCluster.nodesToDeleteLock.Lock()
			defer kubemarkCluster.nodesToDeleteLock.Unlock()
			if kubemarkCluster.nodesToDelete[node.Name] {
				kubemarkCluster.nodesToDelete[node.Name] = false
				if err := kubemarkCluster.client.CoreV1().Nodes().Delete(context.TODO(), node.Name, metav1.DeleteOptions{}); err != nil {
					klog.Errorf("failed to delete node %s from kubemark cluster, err: %v", node.Name, err)
				}
			}
			return
		}
	}
}

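// markNodeForDeletion flags a kubemark node so that removeUnneededNodes will
// delete it once it becomes unready.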
func (kubemarkCluster *kubemarkCluster) markNodeForDeletion(name string) {
	kubemarkCluster.nodesToDeleteLock.Lock()
	defer kubemarkCluster.nodesToDeleteLock.Unlock()
	kubemarkCluster.nodesToDelete[name] = true
}

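// newReplicationControllerInformer constructs a shared informer watching
// replication controllers in the kubemark namespace of the external cluster.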
func newReplicationControllerInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	rcListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "replicationcontrollers", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(rcListWatch, &apiv1.ReplicationController{}, resyncPeriod, nil)
}

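// newPodInformer constructs a shared informer watching pods in the kubemark
// namespace of the external cluster.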
func newPodInformer(kubeClient kubeclient.Interface, resyncPeriod time.Duration) cache.SharedIndexInformer {
	podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespaceKubemark, fields.Everything())
	return cache.NewSharedIndexInformer(podListWatch, &apiv1.Pod{}, resyncPeriod, nil)
}
