...

Source file src/k8s.io/kubernetes/pkg/controller/nodeipam/ipam/range_allocator.go

Documentation: k8s.io/kubernetes/pkg/controller/nodeipam/ipam

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package ipam
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"sync"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	"k8s.io/klog/v2"
    27  	netutils "k8s.io/utils/net"
    28  
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	"k8s.io/apimachinery/pkg/types"
    31  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    32  	"k8s.io/apimachinery/pkg/util/sets"
    33  	informers "k8s.io/client-go/informers/core/v1"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	"k8s.io/client-go/kubernetes/scheme"
    36  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    37  	corelisters "k8s.io/client-go/listers/core/v1"
    38  	"k8s.io/client-go/tools/cache"
    39  	"k8s.io/client-go/tools/record"
    40  	nodeutil "k8s.io/component-helpers/node/util"
    41  	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
    42  	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
    43  )
    44  
    45  type rangeAllocator struct {
    46  	client clientset.Interface
    47  	// cluster cidrs as passed in during controller creation
    48  	clusterCIDRs []*net.IPNet
    49  	// for each entry in clusterCIDRs we maintain a list of what is used and what is not
    50  	cidrSets []*cidrset.CidrSet
    51  	// nodeLister is able to list/get nodes and is populated by the shared informer passed to controller
    52  	nodeLister corelisters.NodeLister
    53  	// nodesSynced returns true if the node shared informer has been synced at least once.
    54  	nodesSynced cache.InformerSynced
    55  	// Channel that is used to pass Nodes, together with their reserved CIDRs, to the background worker.
    56  	// This increases the throughput of CIDR assignment by not blocking on long operations.
    57  	nodeCIDRUpdateChannel chan nodeReservedCIDRs
    58  	broadcaster           record.EventBroadcaster
    59  	recorder              record.EventRecorder
    60  	// Keep a set of nodes that are currently being processed to avoid races in CIDR allocation
    61  	lock              sync.Mutex
    62  	nodesInProcessing sets.String
    63  }
    64  
    65  // NewCIDRRangeAllocator returns a CIDRAllocator to allocate CIDRs for node (one from each of clusterCIDRs)
    66  // Caller must ensure subNetMaskSize is not less than cluster CIDR mask size.
    67  // Caller must ensure that ClusterCIDRs are semantically correct (e.g. 1 for non-DualStack, 2 for DualStack).
    68  // Caller must always pass in a list of existing nodes so the new allocator
    69  // can initialize its CIDR map. NodeList is only nil in testing.
    70  func NewCIDRRangeAllocator(ctx context.Context, client clientset.Interface, nodeInformer informers.NodeInformer, allocatorParams CIDRAllocatorParams, nodeList *v1.NodeList) (CIDRAllocator, error) {
    71  	logger := klog.FromContext(ctx)
    72  	if client == nil {
    73  		logger.Error(nil, "kubeClient is nil when starting CIDRRangeAllocator")
    74  		klog.FlushAndExit(klog.ExitFlushTimeout, 1)
    75  	}
    76  
    77  	eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
    78  	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cidrAllocator"})
    79  
    80  	// create a cidrSet for each cidr we operate on
    81  	// cidrSets are mapped to clusterCIDRs by index
    82  	cidrSets := make([]*cidrset.CidrSet, len(allocatorParams.ClusterCIDRs))
    83  	for idx, cidr := range allocatorParams.ClusterCIDRs {
    84  		cidrSet, err := cidrset.NewCIDRSet(cidr, allocatorParams.NodeCIDRMaskSizes[idx])
    85  		if err != nil {
    86  			return nil, err
    87  		}
    88  		cidrSets[idx] = cidrSet
    89  	}
    90  
    91  	ra := &rangeAllocator{
    92  		client:                client,
    93  		clusterCIDRs:          allocatorParams.ClusterCIDRs,
    94  		cidrSets:              cidrSets,
    95  		nodeLister:            nodeInformer.Lister(),
    96  		nodesSynced:           nodeInformer.Informer().HasSynced,
    97  		nodeCIDRUpdateChannel: make(chan nodeReservedCIDRs, cidrUpdateQueueSize),
    98  		broadcaster:           eventBroadcaster,
    99  		recorder:              recorder,
   100  		nodesInProcessing:     sets.NewString(),
   101  	}
   102  
   103  	if allocatorParams.ServiceCIDR != nil {
   104  		ra.filterOutServiceRange(logger, allocatorParams.ServiceCIDR)
   105  	} else {
   106  		logger.Info("No Service CIDR provided. Skipping filtering out service addresses")
   107  	}
   108  
   109  	if allocatorParams.SecondaryServiceCIDR != nil {
   110  		ra.filterOutServiceRange(logger, allocatorParams.SecondaryServiceCIDR)
   111  	} else {
   112  		logger.Info("No Secondary Service CIDR provided. Skipping filtering out secondary service addresses")
   113  	}
   114  
   115  	if nodeList != nil {
   116  		for _, node := range nodeList.Items {
   117  			if len(node.Spec.PodCIDRs) == 0 {
   118  				logger.V(4).Info("Node has no CIDR, ignoring", "node", klog.KObj(&node))
   119  				continue
   120  			}
   121  			logger.V(4).Info("Node has CIDR, occupying it in CIDR map", "node", klog.KObj(&node), "podCIDR", node.Spec.PodCIDR)
   122  			if err := ra.occupyCIDRs(&node); err != nil {
   123  				// This will happen if:
   124  				// 1. We find garbage in the podCIDRs field. Retrying is useless.
   125  				// 2. CIDR out of range: This means a node CIDR has changed.
   126  				// This error will keep crashing controller-manager.
   127  				return nil, err
   128  			}
   129  		}
   130  	}
   131  
   132  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   133  		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
   134  			return ra.AllocateOrOccupyCIDR(logger, node)
   135  		}),
   136  		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
   137  			// If the PodCIDRs list is not empty we either:
   138  			// - already processed a Node that already had CIDRs after NC restarted
   139  			//   (cidr is marked as used),
   140  			// - already processed a Node successfully and allocated CIDRs for it
   141  			//   (cidr is marked as used),
   142  			// - already processed a Node but saw a "timeout" response, and the
   143  			//   request eventually got through; in this case we haven't released
   144  			//   the allocated CIDRs (cidr is still marked as used).
   145  			// There's a possible error here:
   146  			// - NC sees a new Node and assigns CIDRs X,Y.. to it,
   147  			// - Update Node call fails with a timeout,
   148  			// - Node is updated by some other component, NC sees an update and
   149  			//   assigns CIDRs A,B.. to the Node,
   150  			// - Both CIDR X,Y.. and CIDR A,B.. are marked as used in the local cache,
   151  			//   even though Node sees only CIDR A,B..
   152  			// The problem here is that in the in-memory cache we see CIDR X,Y.. as marked,
   153  			// which prevents it from being assigned to any new node. The cluster
   154  			// state is correct.
   155  			// Restart of NC fixes the issue.
   156  			if len(newNode.Spec.PodCIDRs) == 0 {
   157  				return ra.AllocateOrOccupyCIDR(logger, newNode)
   158  			}
   159  			return nil
   160  		}),
   161  		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
   162  			return ra.ReleaseCIDR(logger, node)
   163  		}),
   164  	})
   165  
   166  	return ra, nil
   167  }
   168  
   169  func (r *rangeAllocator) Run(ctx context.Context) {
   170  	defer utilruntime.HandleCrash()
   171  
   172  	// Start event processing pipeline.
   173  	r.broadcaster.StartStructuredLogging(3)
   174  	logger := klog.FromContext(ctx)
   175  	logger.Info("Sending events to api server")
   176  	r.broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: r.client.CoreV1().Events("")})
   177  	defer r.broadcaster.Shutdown()
   178  
   179  	logger.Info("Starting range CIDR allocator")
   180  	defer logger.Info("Shutting down range CIDR allocator")
   181  
   182  	if !cache.WaitForNamedCacheSync("cidrallocator", ctx.Done(), r.nodesSynced) {
   183  		return
   184  	}
   185  
   186  	for i := 0; i < cidrUpdateWorkers; i++ {
   187  		go r.worker(ctx)
   188  	}
   189  
   190  	<-ctx.Done()
   191  }
   192  
   193  func (r *rangeAllocator) worker(ctx context.Context) {
   194  	logger := klog.FromContext(ctx)
   195  	for {
   196  		select {
   197  		case workItem, ok := <-r.nodeCIDRUpdateChannel:
   198  			if !ok {
   199  				logger.Info("Channel nodeCIDRUpdateChannel was unexpectedly closed")
   200  				return
   201  			}
   202  			if err := r.updateCIDRsAllocation(logger, workItem); err != nil {
   203  				// Requeue the failed node for update again.
   204  				r.nodeCIDRUpdateChannel <- workItem
   205  			}
   206  		case <-ctx.Done():
   207  			return
   208  		}
   209  	}
   210  }
   211  
   212  func (r *rangeAllocator) insertNodeToProcessing(nodeName string) bool {
   213  	r.lock.Lock()
   214  	defer r.lock.Unlock()
   215  	if r.nodesInProcessing.Has(nodeName) {
   216  		return false
   217  	}
   218  	r.nodesInProcessing.Insert(nodeName)
   219  	return true
   220  }
   221  
   222  func (r *rangeAllocator) removeNodeFromProcessing(nodeName string) {
   223  	r.lock.Lock()
   224  	defer r.lock.Unlock()
   225  	r.nodesInProcessing.Delete(nodeName)
   226  }
   227  
   228  // occupyCIDRs marks node.Spec.PodCIDRs[...] as used in the allocator's tracked cidrSets.
   229  func (r *rangeAllocator) occupyCIDRs(node *v1.Node) error {
   230  	defer r.removeNodeFromProcessing(node.Name)
   231  	if len(node.Spec.PodCIDRs) == 0 {
   232  		return nil
   233  	}
   234  	for idx, cidr := range node.Spec.PodCIDRs {
   235  		_, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
   236  		if err != nil {
   237  			return fmt.Errorf("failed to parse node %s, CIDR %s: %v", node.Name, cidr, err)
   238  		}
   239  		// If the node has a pre-allocated CIDR that does not exist in our cidrs
   240  		// (this will happen if the cluster went from dualstack (multiple cidrs) to non-dualstack),
   241  		// then we have no way of locking it.
   242  		if idx >= len(r.cidrSets) {
   243  			return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx)
   244  		}
   245  
   246  		if err := r.cidrSets[idx].Occupy(podCIDR); err != nil {
   247  			return fmt.Errorf("failed to mark cidr[%v] at idx [%v] as occupied for node: %v: %v", podCIDR, idx, node.Name, err)
   248  		}
   249  	}
   250  	return nil
   251  }
   252  
   253  // WARNING: If you're adding any return calls or deferring any more work from this
   254  // function, you have to make sure to update nodesInProcessing properly with the
   255  // disposition of the node when the work is done.
   256  func (r *rangeAllocator) AllocateOrOccupyCIDR(logger klog.Logger, node *v1.Node) error {
   257  	if node == nil {
   258  		return nil
   259  	}
   260  	if !r.insertNodeToProcessing(node.Name) {
   261  		logger.V(2).Info("Node is already in a process of CIDR assignment", "node", klog.KObj(node))
   262  		return nil
   263  	}
   264  
   265  	if len(node.Spec.PodCIDRs) > 0 {
   266  		return r.occupyCIDRs(node)
   267  	}
   268  	// allocate and queue the assignment
   269  	allocated := nodeReservedCIDRs{
   270  		nodeName:       node.Name,
   271  		allocatedCIDRs: make([]*net.IPNet, len(r.cidrSets)),
   272  	}
   273  
   274  	for idx := range r.cidrSets {
   275  		podCIDR, err := r.cidrSets[idx].AllocateNext()
   276  		if err != nil {
   277  			r.removeNodeFromProcessing(node.Name)
   278  			controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRNotAvailable")
   279  			return fmt.Errorf("failed to allocate cidr from cluster cidr at idx:%v: %v", idx, err)
   280  		}
   281  		allocated.allocatedCIDRs[idx] = podCIDR
   282  	}
   283  
   284  	// queue the assignment
   285  	logger.V(4).Info("Putting node with CIDR into the work queue", "node", klog.KObj(node), "CIDRs", allocated.allocatedCIDRs)
   286  	r.nodeCIDRUpdateChannel <- allocated
   287  	return nil
   288  }
   289  
   290  // ReleaseCIDR marks node.podCIDRs[...] as unused in our tracked cidrSets
   291  func (r *rangeAllocator) ReleaseCIDR(logger klog.Logger, node *v1.Node) error {
   292  	if node == nil || len(node.Spec.PodCIDRs) == 0 {
   293  		return nil
   294  	}
   295  
   296  	for idx, cidr := range node.Spec.PodCIDRs {
   297  		_, podCIDR, err := netutils.ParseCIDRSloppy(cidr)
   298  		if err != nil {
   299  			return fmt.Errorf("failed to parse CIDR %s on Node %v: %v", cidr, node.Name, err)
   300  		}
   301  
   302  		// If the node has a pre-allocated CIDR that does not exist in our cidrs
   303  		// (this will happen if the cluster went from dualstack (multiple cidrs) to non-dualstack),
   304  		// then we have no way of releasing it.
   305  		if idx >= len(r.cidrSets) {
   306  			return fmt.Errorf("node:%s has an allocated cidr: %v at index:%v that does not exist in cluster cidrs configuration", node.Name, cidr, idx)
   307  		}
   308  
   309  		logger.V(4).Info("Release CIDR for node", "CIDR", cidr, "node", klog.KObj(node))
   310  		if err = r.cidrSets[idx].Release(podCIDR); err != nil {
   311  			return fmt.Errorf("error when releasing CIDR %v: %v", cidr, err)
   312  		}
   313  	}
   314  	return nil
   315  }
   316  
   317  // filterOutServiceRange marks all CIDRs with subNetMaskSize that belong to serviceCIDR as used across all cidrSets,
   318  // so that they won't be assignable.
   319  func (r *rangeAllocator) filterOutServiceRange(logger klog.Logger, serviceCIDR *net.IPNet) {
   320  	// Checks if service CIDR has a nonempty intersection with cluster
   321  	// CIDR. It is the case if either clusterCIDR contains serviceCIDR with
   322  	// clusterCIDR's Mask applied (this means that clusterCIDR contains
   323  	// serviceCIDR) or vice versa (which means that serviceCIDR contains
   324  	// clusterCIDR).
   325  	for idx, cidr := range r.clusterCIDRs {
   326  		// if they don't overlap then ignore the filtering
   327  		if !cidr.Contains(serviceCIDR.IP.Mask(cidr.Mask)) && !serviceCIDR.Contains(cidr.IP.Mask(serviceCIDR.Mask)) {
   328  			continue
   329  		}
   330  
   331  		// at this point, len(r.cidrSets) == len(r.clusterCIDRs)
   332  		if err := r.cidrSets[idx].Occupy(serviceCIDR); err != nil {
   333  			logger.Error(err, "Error filtering out service cidr from cluster cidr", "CIDR", cidr, "index", idx, "serviceCIDR", serviceCIDR)
   334  		}
   335  	}
   336  }
   337  
   338  // updateCIDRsAllocation assigns CIDR to Node and sends an update to the API server.
   339  func (r *rangeAllocator) updateCIDRsAllocation(logger klog.Logger, data nodeReservedCIDRs) error {
   340  	var err error
   341  	var node *v1.Node
   342  	defer r.removeNodeFromProcessing(data.nodeName)
   343  	cidrsString := ipnetToStringList(data.allocatedCIDRs)
   344  	node, err = r.nodeLister.Get(data.nodeName)
   345  	if err != nil {
   346  		logger.Error(err, "Failed while getting node for updating Node.Spec.PodCIDRs", "node", klog.KRef("", data.nodeName))
   347  		return err
   348  	}
   349  
   350  	// If the cidr list matches the proposed one,
   351  	// then we possibly already updated this node
   352  	// and just failed to ack the success.
   353  	if len(node.Spec.PodCIDRs) == len(data.allocatedCIDRs) {
   354  		match := true
   355  		for idx, cidr := range cidrsString {
   356  			if node.Spec.PodCIDRs[idx] != cidr {
   357  				match = false
   358  				break
   359  			}
   360  		}
   361  		if match {
   362  			logger.V(4).Info("Node already has allocated CIDR. It matches the proposed one", "node", klog.KObj(node), "CIDRs", data.allocatedCIDRs)
   363  			return nil
   364  		}
   365  	}
   366  
   367  	// node already has CIDRs allocated, release the reserved ones
   368  	if len(node.Spec.PodCIDRs) != 0 {
   369  		logger.Error(nil, "Node already has a CIDR allocated. Releasing the new one", "node", klog.KObj(node), "podCIDRs", node.Spec.PodCIDRs)
   370  		for idx, cidr := range data.allocatedCIDRs {
   371  			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
   372  				logger.Error(releaseErr, "Error when releasing CIDR", "index", idx, "CIDR", cidr)
   373  			}
   374  		}
   375  		return nil
   376  	}
   377  
   378  	// If we reached here, it means that the node has no CIDR currently assigned. So we set it.
   379  	for i := 0; i < cidrUpdateRetries; i++ {
   380  		if err = nodeutil.PatchNodeCIDRs(r.client, types.NodeName(node.Name), cidrsString); err == nil {
   381  			logger.Info("Set node PodCIDR", "node", klog.KObj(node), "podCIDRs", cidrsString)
   382  			return nil
   383  		}
   384  	}
   385  	// the update failed; release the CIDRs back to the pool
   386  	logger.Error(err, "Failed to update node PodCIDR after multiple attempts", "node", klog.KObj(node), "podCIDRs", cidrsString)
   387  	controllerutil.RecordNodeStatusChange(logger, r.recorder, node, "CIDRAssignmentFailed")
   388  	// We accept the fact that we may leak CIDRs here. This is safer than releasing
   389  	// them in the case when we don't know whether the request went through.
   390  	// NodeController restart will return all falsely allocated CIDRs to the pool.
   391  	if !apierrors.IsServerTimeout(err) {
   392  		logger.Error(err, "CIDR assignment for node failed. Releasing allocated CIDR", "node", klog.KObj(node))
   393  		for idx, cidr := range data.allocatedCIDRs {
   394  			if releaseErr := r.cidrSets[idx].Release(cidr); releaseErr != nil {
   395  				logger.Error(releaseErr, "Error releasing allocated CIDR for node", "node", klog.KObj(node))
   396  			}
   397  		}
   398  	}
   399  	return err
   400  }
   401  
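
The heart of this allocator is the per-clusterCIDR cidrset.CidrSet kept in r.cidrSets: AllocateNext hands out the next free per-node range (used by AllocateOrOccupyCIDR for new nodes), Occupy marks an already assigned range as used (used by occupyCIDRs and filterOutServiceRange), and Release returns a range to the pool (used by ReleaseCIDR). The standalone sketch below is illustrative only and not part of this file; the package name, the 10.244.0.0/16 cluster CIDR, and the /24 node mask size are assumptions.

package main

import (
	"fmt"

	netutils "k8s.io/utils/net"

	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
)

func main() {
	// A /16 cluster CIDR carved into /24 per-node ranges, mirroring how
	// NewCIDRRangeAllocator builds one CidrSet per entry in ClusterCIDRs.
	_, clusterCIDR, err := netutils.ParseCIDRSloppy("10.244.0.0/16")
	if err != nil {
		panic(err)
	}
	set, err := cidrset.NewCIDRSet(clusterCIDR, 24)
	if err != nil {
		panic(err)
	}

	// AllocateNext: what AllocateOrOccupyCIDR does for a node without PodCIDRs.
	podCIDR, err := set.AllocateNext()
	if err != nil {
		panic(err)
	}
	fmt.Println("allocated:", podCIDR) // e.g. 10.244.0.0/24

	// Occupy: what occupyCIDRs does for a node that already has Spec.PodCIDRs set.
	_, existing, err := netutils.ParseCIDRSloppy("10.244.5.0/24")
	if err != nil {
		panic(err)
	}
	if err := set.Occupy(existing); err != nil {
		panic(err)
	}

	// Release: what ReleaseCIDR does when a node is deleted.
	if err := set.Release(podCIDR); err != nil {
		panic(err)
	}
}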

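For orientation, the following is a minimal, hypothetical sketch of how a caller might construct and run this allocator outside of kube-controller-manager (which performs this wiring internally). The kubeconfig path, resync period, and CIDR values are illustrative assumptions, and CIDRAllocatorParams is defined elsewhere in this package; this is a sketch, not the real wiring.

package main

import (
	"context"
	"net"
	"os"
	"os/signal"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
	netutils "k8s.io/utils/net"

	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
)

func main() {
	// Stop on Ctrl-C; Run blocks until this context is cancelled.
	ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
	defer cancel()

	// Illustrative client construction; a real controller would use the
	// controller-manager's client builder instead of a local kubeconfig.
	config, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	nodeInformer := factory.Core().V1().Nodes()

	// One cluster CIDR plus its per-node mask size, as consumed by NewCIDRRangeAllocator.
	_, clusterCIDR, err := netutils.ParseCIDRSloppy("10.244.0.0/16")
	if err != nil {
		panic(err)
	}
	params := ipam.CIDRAllocatorParams{
		ClusterCIDRs:      []*net.IPNet{clusterCIDR},
		NodeCIDRMaskSizes: []int{24},
	}

	// Existing nodes let the allocator pre-occupy CIDRs that are already assigned.
	nodeList, err := client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
	if err != nil {
		panic(err)
	}

	alloc, err := ipam.NewCIDRRangeAllocator(ctx, client, nodeInformer, params, nodeList)
	if err != nil {
		panic(err)
	}

	factory.Start(ctx.Done())
	alloc.Run(ctx)
}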