...

Source file src/k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cloud_cidr_allocator.go

Documentation: k8s.io/kubernetes/pkg/controller/nodeipam/ipam

     1  //go:build !providerless
     2  // +build !providerless
     3  
     4  /*
     5  Copyright 2016 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package ipam
    21  
    22  import (
    23  	"context"
    24  	"fmt"
    25  	"math/rand"
    26  	"net"
    27  	"sync"
    28  	"time"
    29  
    30  	"github.com/google/go-cmp/cmp"
    31  
    32  	"k8s.io/klog/v2"
    33  
    34  	v1 "k8s.io/api/core/v1"
    35  	"k8s.io/apimachinery/pkg/api/errors"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	"k8s.io/apimachinery/pkg/types"
    38  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    39  	informers "k8s.io/client-go/informers/core/v1"
    40  	corelisters "k8s.io/client-go/listers/core/v1"
    41  	"k8s.io/client-go/tools/cache"
    42  	"k8s.io/client-go/tools/record"
    43  
    44  	clientset "k8s.io/client-go/kubernetes"
    45  	"k8s.io/client-go/kubernetes/scheme"
    46  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    47  	cloudprovider "k8s.io/cloud-provider"
    48  	nodeutil "k8s.io/component-helpers/node/util"
    49  	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
    50  	utiltaints "k8s.io/kubernetes/pkg/util/taints"
    51  	"k8s.io/legacy-cloud-providers/gce"
    52  	netutils "k8s.io/utils/net"
    53  )
    54  
    55  // nodeProcessingInfo tracks information related to nodes currently being processed
    56  type nodeProcessingInfo struct {
    57  	retries int
    58  }
    59  
    60  // cloudCIDRAllocator allocates node CIDRs according to IP address aliases
    61  // assigned by the cloud provider. In this case, the allocation and
    62  // deallocation is delegated to the external provider, and the controller
    63  // merely takes the assignment and updates the node spec.
    64  type cloudCIDRAllocator struct {
    65  	client clientset.Interface
    66  	cloud  *gce.Cloud
    67  
    68  	// nodeLister is able to list/get nodes and is populated by the shared informer passed to
    69  	// NewCloudCIDRAllocator.
    70  	nodeLister corelisters.NodeLister
    71  	// nodesSynced returns true if the node shared informer has been synced at least once.
    72  	nodesSynced cache.InformerSynced
    73  
    74  	// Channel used to pass Nodes that need updating to the background workers.
    75  	// This increases the throughput of CIDR assignment by parallelizing the work
    76  	// and by not blocking on long operations (which shouldn't be done from
    77  	// event handlers anyway).
    78  	nodeUpdateChannel chan string
    79  	broadcaster       record.EventBroadcaster
    80  	recorder          record.EventRecorder
    81  
    82  	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
    83  	lock              sync.Mutex
    84  	nodesInProcessing map[string]*nodeProcessingInfo
    85  }
    86  
    87  var _ CIDRAllocator = (*cloudCIDRAllocator)(nil)
    88  
    89  // NewCloudCIDRAllocator creates a new cloud CIDR allocator.
    90  func NewCloudCIDRAllocator(ctx context.Context, client clientset.Interface, cloud cloudprovider.Interface, nodeInformer informers.NodeInformer) (CIDRAllocator, error) {
    91  	logger := klog.FromContext(ctx)
    92  	if client == nil {
    93  		logger.Error(nil, "kubeClient is nil when starting cloud CIDR allocator")
    94  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    95  	}
    96  
    97  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    98  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
    99  
   100  	gceCloud, ok := cloud.(*gce.Cloud)
   101  	if !ok {
   102  		err := fmt.Errorf("cloudCIDRAllocator does not support %v provider", cloud.ProviderName())
   103  		return nil, err
   104  	}
   105  
   106  	ca := &cloudCIDRAllocator{
   107  		client:            client,
   108  		cloud:             gceCloud,
   109  		nodeLister:        nodeInformer.Lister(),
   110  		nodesSynced:       nodeInformer.Informer().HasSynced,
   111  		nodeUpdateChannel: make(chan string, cidrUpdateQueueSize),
   112  		broadcaster:       eventBroadcaster,
   113  		recorder:          recorder,
   114  		nodesInProcessing: map[string]*nodeProcessingInfo{},
   115  	}
   116  
   117  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   118  		AddFunc: controllerutil.CreateAddNodeHandler(
   119  			func(node *v1.Node) error {
   120  				return ca.AllocateOrOccupyCIDR(logger, node)
   121  			}),
   122  		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
   123  			if newNode.Spec.PodCIDR == "" {
   124  				return ca.AllocateOrOccupyCIDR(logger, newNode)
   125  			}
   126  			// Even if PodCIDR is already assigned, if the NetworkUnavailable condition
   127  			// is still true (or the taint is present), process the node to clear it.
   128  			networkUnavailableTaint := &v1.Taint{Key: v1.TaintNodeNetworkUnavailable, Effect: v1.TaintEffectNoSchedule}
   129  			_, cond := controllerutil.GetNodeCondition(&newNode.Status, v1.NodeNetworkUnavailable)
   130  			if cond == nil || cond.Status != v1.ConditionFalse || utiltaints.TaintExists(newNode.Spec.Taints, networkUnavailableTaint) {
   131  				return ca.AllocateOrOccupyCIDR(logger, newNode)
   132  			}
   133  			return nil
   134  		}),
   135  		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
   136  			return ca.ReleaseCIDR(logger, node)
   137  		}),
   138  	})
   139  	logger.Info("Using cloud CIDR allocator", "provider", cloud.ProviderName())
   140  	return ca, nil
   141  }
   142  
   143  func (ca *cloudCIDRAllocator) Run(ctx context.Context) {
   144  	defer utilruntime.HandleCrash()
   145  
   146  	// Start event processing pipeline.
   147  	ca.broadcaster.StartStructuredLogging(3)
   148  	logger := klog.FromContext(ctx)
   149  	logger.Info("Sending events to api server")
   150  	ca.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: ca.client.CoreV1().Events("")})
   151  	defer ca.broadcaster.Shutdown()
   152  
   153  	logger.Info("Starting cloud CIDR allocator")
   154  	defer logger.Info("Shutting down cloud CIDR allocator")
   155  
   156  	if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), ca.nodesSynced) {
   157  		return
   158  	}
   159  
   160  	for i := 0; i < cidrUpdateWorkers; i++ {
   161  		go ca.worker(ctx)
   162  	}
   163  
   164  	<-ctx.Done()
   165  }
   166  
   167  func (ca *cloudCIDRAllocator) worker(ctx context.Context) {
   168  	logger := klog.FromContext(ctx)
   169  	for {
   170  		select {
   171  		case workItem, ok := <-ca.nodeUpdateChannel:
   172  			if !ok {
   173  				logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
   174  				return
   175  			}
   176  			if err := ca.updateCIDRAllocation(logger, workItem); err == nil {
   177  				logger.V(3).Info("Updated CIDR", "workItem", workItem)
   178  			} else {
   179  				logger.Error(err, "Error updating CIDR", "workItem", workItem)
   180  				if canRetry, timeout := ca.retryParams(logger, workItem); canRetry {
   181  					logger.V(2).Info("Retrying update on next period", "workItem", workItem, "timeout", timeout)
   182  					time.AfterFunc(timeout, func() {
   183  						// Requeue the failed node for update again.
   184  						ca.nodeUpdateChannel <- workItem
   185  					})
   186  					continue
   187  				}
   188  				logger.Error(nil, "Exceeded retry count, dropping from queue", "workItem", workItem)
   189  			}
   190  			ca.removeNodeFromProcessing(workItem)
   191  		case <-ctx.Done():
   192  			return
   193  		}
   194  	}
   195  }
   196  
   197  func (ca *cloudCIDRAllocator) insertNodeToProcessing(nodeName string) bool {
   198  	ca.lock.Lock()
   199  	defer ca.lock.Unlock()
   200  	if _, found := ca.nodesInProcessing[nodeName]; found {
   201  		return false
   202  	}
   203  	ca.nodesInProcessing[nodeName] = &nodeProcessingInfo{}
   204  	return true
   205  }
   206  
   207  func (ca *cloudCIDRAllocator) retryParams(logger klog.Logger, nodeName string) (bool, time.Duration) {
   208  	ca.lock.Lock()
   209  	defer ca.lock.Unlock()
   210  
   211  	entry, ok := ca.nodesInProcessing[nodeName]
   212  	if !ok {
   213  		logger.Error(nil, "Cannot get retryParams for node as entry does not exist", "node", klog.KRef("", nodeName))
   214  		return false, 0
   215  	}
   216  
   217  	count := entry.retries + 1
   218  	if count > updateMaxRetries {
   219  		return false, 0
   220  	}
   221  	ca.nodesInProcessing[nodeName].retries = count
   222  
   223  	return true, nodeUpdateRetryTimeout(count)
   224  }
   225  
   226  func nodeUpdateRetryTimeout(count int) time.Duration {
   227  	timeout := updateRetryTimeout
   228  	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
   229  		timeout *= 2
   230  	}
   231  	if timeout > maxUpdateRetryTimeout {
   232  		timeout = maxUpdateRetryTimeout
   233  	}
   234  	return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
   235  }
   236  
   237  func (ca *cloudCIDRAllocator) removeNodeFromProcessing(nodeName string) {
   238  	ca.lock.Lock()
   239  	defer ca.lock.Unlock()
   240  	delete(ca.nodesInProcessing, nodeName)
   241  }
   242  
   243  // WARNING: If you add any return paths or defer any more work from this
   244  // function, you have to make sure to update nodesInProcessing properly with
   245  // the disposition of the node when the work is done.
   246  func (ca *cloudCIDRAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
   247  	if node == nil {
   248  		return nil
   249  	}
   250  	if !ca.insertNodeToProcessing(node.Name) {
   251  		logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
   252  		return nil
   253  	}
   254  
   255  	logger.V(4).Info("Putting node into the work queue", "node", klog.KObj(node))
   256  	ca.nodeUpdateChannel <- node.Name
   257  	return nil
   258  }
   259  
   260  // updateCIDRAllocation assigns CIDR to Node and sends an update to the API server.
   261  func (ca *cloudCIDRAllocator) updateCIDRAllocation(logger klog.Logger, nodeName string) error {
   262  	node, err := ca.nodeLister.Get(nodeName)
   263  	if err != nil {
   264  		if errors.IsNotFound(err) {
   265  			return nil // node no longer available, skip processing
   266  		}
   267  		logger.Error(err, "Failed while getting the node for updating Node.Spec.PodCIDR", "node", klog.KRef("", nodeName))
   268  		return err
   269  	}
   270  	if node.Spec.ProviderID == "" {
   271  		return fmt.Errorf("node %s doesn't have providerID", nodeName)
   272  	}
   273  
   274  	cidrStrings, err := ca.cloud.AliasRangesByProviderID(node.Spec.ProviderID)
   275  	if err != nil {
   276  		controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
   277  		return fmt.Errorf("failed to get cidr(s) from provider: %v", err)
   278  	}
   279  	if len(cidrStrings) == 0 {
   280  		controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRNotAvailable")
   281  		return fmt.Errorf("failed to allocate cidr: Node %v has no CIDRs", node.Name)
   282  	}
   283  	// Can have at most 2 IPs (one for v4 and one for v6)
   284  	if len(cidrStrings) > 2 {
   285  		logger.Info("Got more than 2 ips, truncating to 2", "cidrStrings", cidrStrings)
   286  		cidrStrings = cidrStrings[:2]
   287  	}
   288  
   289  	cidrs, err := netutils.ParseCIDRs(cidrStrings)
   290  	if err != nil {
   291  		return fmt.Errorf("failed to parse strings %v as CIDRs: %v", cidrStrings, err)
   292  	}
   293  
   294  	needUpdate, err := needPodCIDRsUpdate(logger, node, cidrs)
   295  	if err != nil {
   296  		return fmt.Errorf("err: %v, CIDRS: %v", err, cidrStrings)
   297  	}
   298  	if needUpdate {
   299  		if node.Spec.PodCIDR != "" {
   300  			logger.Error(nil, "PodCIDR being reassigned", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs, "cidrStrings", cidrStrings)
   301  			// We fall through and set the CIDR despite this error. This
   302  			// implements the same logic as implemented in the
   303  			// rangeAllocator.
   304  			//
   305  			// See https://github.com/kubernetes/kubernetes/pull/42147#discussion_r103357248
   306  		}
   307  		for i := 0; i < cidrUpdateRetries; i++ {
   308  			if err = nodeutil.PatchNodeCIDRs(ca.client, types.NodeName(node.Name), cidrStrings); err == nil {
   309  				logger.Info("Set the node PodCIDRs", "node", klog.KObj(node), "cidrStrings", cidrStrings)
   310  				break
   311  			}
   312  		}
   313  	}
   314  	if err != nil {
   315  		controllerutil.RecordNodeStatusChange(logger, ca.recorder, node, "CIDRAssignmentFailed")
   316  		logger.Error(err, "Failed to update the node PodCIDR after multiple attempts", "node", klog.KObj(node), "cidrStrings", cidrStrings)
   317  		return err
   318  	}
   319  
   320  	err = nodeutil.SetNodeCondition(ca.client, types.NodeName(node.Name), v1.NodeCondition{
   321  		Type:               v1.NodeNetworkUnavailable,
   322  		Status:             v1.ConditionFalse,
   323  		Reason:             "RouteCreated",
   324  		Message:            "NodeController create implicit route",
   325  		LastTransitionTime: metav1.Now(),
   326  	})
   327  	if err != nil {
   328  		logger.Error(err, "Error setting route status for the node", "node", klog.KObj(node))
   329  	}
   330  	return err
   331  }
   332  
   333  func needPodCIDRsUpdate(logger klog.Logger, node *v1.Node, podCIDRs []*net.IPNet) (bool, error) {
   334  	if node.Spec.PodCIDR == "" {
   335  		return true, nil
   336  	}
   337  	_, nodePodCIDR, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
   338  	if err != nil {
   339  		logger.Error(err, "Found invalid node.Spec.PodCIDR", "podCIDR", node.Spec.PodCIDR)
   340  		// We will try to overwrite with new CIDR(s)
   341  		return true, nil
   342  	}
   343  	nodePodCIDRs, err := netutils.ParseCIDRs(node.Spec.PodCIDRs)
   344  	if err != nil {
   345  		logger.Error(err, "Found invalid node.Spec.PodCIDRs", "podCIDRs", node.Spec.PodCIDRs)
   346  		// We will try to overwrite with new CIDR(s)
   347  		return true, nil
   348  	}
   349  
   350  	if len(podCIDRs) == 1 {
   351  		if cmp.Equal(nodePodCIDR, podCIDRs[0]) {
   352  			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "podCIDR", podCIDRs[0])
   353  			return false, nil
   354  		}
   355  	} else if len(nodePodCIDRs) == len(podCIDRs) {
   356  		if dualStack, _ := netutils.IsDualStackCIDRs(podCIDRs); !dualStack {
   357  			return false, fmt.Errorf("IPs are not dual stack")
   358  		}
   359  		for idx, cidr := range podCIDRs {
   360  			if !cmp.Equal(nodePodCIDRs[idx], cidr) {
   361  				return true, nil
   362  			}
   363  		}
   364  		logger.V(4).Info("Node already has allocated CIDRs. It matches the proposed one", "node", klog.KObj(node), "podCIDRs", podCIDRs)
   365  		return false, nil
   366  	}
   367  
   368  	return true, nil
   369  }
   370  
   371  func (ca *cloudCIDRAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
   372  	logger.V(2).Info("Node's PodCIDR will be released by external cloud provider (not managed by controller)",
   373  		"node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
   374  	return nil
   375  }
   376  
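
A minimal sketch of how this allocator is typically wired up, assuming a kubeconfig-based clientset and a cloud provider instance obtained elsewhere. The helper name runCloudAllocator and the package name are illustrative assumptions, not part of this file, and the provider passed in must be backed by *gce.Cloud or NewCloudCIDRAllocator returns an error.

package example

import (
	"context"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	cloudprovider "k8s.io/cloud-provider"

	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
)

// runCloudAllocator is a hypothetical wiring helper: build a clientset and a
// shared node informer, hand both to NewCloudCIDRAllocator together with the
// cloud provider, then start the informer factory and block in Run until the
// context is cancelled.
func runCloudAllocator(ctx context.Context, kubeconfig string, cloud cloudprovider.Interface) error {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return err
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}

	factory := informers.NewSharedInformerFactory(client, 0)
	nodeInformer := factory.Core().V1().Nodes()

	// NewCloudCIDRAllocator registers the add/update/delete handlers shown above.
	alloc, err := ipam.NewCloudCIDRAllocator(ctx, client, cloud, nodeInformer)
	if err != nil {
		return err
	}

	factory.Start(ctx.Done())
	alloc.Run(ctx) // waits for the node cache to sync, then runs the update workers
	return nil
}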

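
The retry delay produced by nodeUpdateRetryTimeout doubles with each failed attempt until it reaches a cap, then jitter is applied so the returned value falls in [timeout/2, 3*timeout/2). Below is a small standalone sketch of that behavior; the two constants are assumptions standing in for the unexported updateRetryTimeout and maxUpdateRetryTimeout defined elsewhere in this package.

package main

import (
	"fmt"
	"math/rand"
	"time"
)

// Illustrative stand-ins for the package's unexported retry constants.
const (
	updateRetryTimeout    = 250 * time.Millisecond
	maxUpdateRetryTimeout = 5 * time.Second
)

// backoff mirrors nodeUpdateRetryTimeout: double the base timeout per retry up
// to the cap, then return a random duration in [timeout/2, 3*timeout/2).
func backoff(count int) time.Duration {
	timeout := updateRetryTimeout
	for i := 0; i < count && timeout < maxUpdateRetryTimeout; i++ {
		timeout *= 2
	}
	if timeout > maxUpdateRetryTimeout {
		timeout = maxUpdateRetryTimeout
	}
	return time.Duration(timeout.Nanoseconds()/2 + rand.Int63n(timeout.Nanoseconds()))
}

func main() {
	// Print a sample delay for the first few retries of one node update.
	for retry := 1; retry <= 5; retry++ {
		fmt.Printf("retry %d: %v\n", retry, backoff(retry))
	}
}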