...

Source file src/k8s.io/kubernetes/pkg/controller/nodeipam/node_ipam_controller.go

Documentation: k8s.io/kubernetes/pkg/controller/nodeipam

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package nodeipam
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"time"
    24  
    25  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    26  	coreinformers "k8s.io/client-go/informers/core/v1"
    27  	clientset "k8s.io/client-go/kubernetes"
    28  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    29  	corelisters "k8s.io/client-go/listers/core/v1"
    30  	"k8s.io/client-go/tools/cache"
    31  	"k8s.io/client-go/tools/record"
    32  	cloudprovider "k8s.io/cloud-provider"
    33  	controllersmetrics "k8s.io/component-base/metrics/prometheus/controllers"
    34  	"k8s.io/klog/v2"
    35  	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam"
    36  )
    37  
    38  const (
    39  	// ipamResyncInterval is the amount of time between when the cloud and node
    40  	// CIDR range assignments are synchronized.
    41  	ipamResyncInterval = 30 * time.Second
    42  	// ipamMaxBackoff is the maximum backoff for retrying synchronization of a
    43  	// given in the error state.
    44  	ipamMaxBackoff = 10 * time.Second
    45  	// ipamInitialRetry is the initial retry interval for retrying synchronization of a
    46  	// given in the error state.
    47  	ipamInitialBackoff = 250 * time.Millisecond
    48  )
    49  
    50  // ipamController is an interface abstracting an interface for
    51  // legacy mode. It is needed to ensure correct building for
    52  // both provider-specific and providerless environments.
    53  type ipamController interface {
    54  	Run(ctx context.Context)
    55  }
    56  
    57  // Controller is the controller that manages node ipam state.
    58  type Controller struct {
    59  	allocatorType ipam.CIDRAllocatorType
    60  
    61  	cloud                cloudprovider.Interface
    62  	clusterCIDRs         []*net.IPNet
    63  	serviceCIDR          *net.IPNet
    64  	secondaryServiceCIDR *net.IPNet
    65  	kubeClient           clientset.Interface
    66  	eventBroadcaster     record.EventBroadcaster
    67  
    68  	nodeLister         corelisters.NodeLister
    69  	nodeInformerSynced cache.InformerSynced
    70  
    71  	legacyIPAM    ipamController
    72  	cidrAllocator ipam.CIDRAllocator
    73  }
    74  
    75  // NewNodeIpamController returns a new node IP Address Management controller to
    76  // sync instances from cloudprovider.
    77  // This method returns an error if it is unable to initialize the CIDR bitmap with
    78  // podCIDRs it has already allocated to nodes. Since we don't allow podCIDR changes
    79  // currently, this should be handled as a fatal error.
    80  func NewNodeIpamController(
    81  	ctx context.Context,
    82  	nodeInformer coreinformers.NodeInformer,
    83  	cloud cloudprovider.Interface,
    84  	kubeClient clientset.Interface,
    85  	clusterCIDRs []*net.IPNet,
    86  	serviceCIDR *net.IPNet,
    87  	secondaryServiceCIDR *net.IPNet,
    88  	nodeCIDRMaskSizes []int,
    89  	allocatorType ipam.CIDRAllocatorType) (*Controller, error) {
    90  
    91  	if kubeClient == nil {
    92  		return nil, fmt.Errorf("kubeClient is nil when starting Controller")
    93  	}
    94  
    95  	// Cloud CIDR allocator does not rely on clusterCIDR or nodeCIDRMaskSize for allocation.
    96  	if allocatorType != ipam.CloudAllocatorType {
    97  		if len(clusterCIDRs) == 0 {
    98  			return nil, fmt.Errorf("Controller: Must specify --cluster-cidr if --allocate-node-cidrs is set")
    99  		}
   100  
   101  		for idx, cidr := range clusterCIDRs {
   102  			mask := cidr.Mask
   103  			if maskSize, _ := mask.Size(); maskSize > nodeCIDRMaskSizes[idx] {
   104  				return nil, fmt.Errorf("Controller: Invalid --cluster-cidr, mask size of cluster CIDR must be less than or equal to --node-cidr-mask-size configured for CIDR family")
   105  			}
   106  		}
   107  	}
   108  
   109  	ic := &Controller{
   110  		cloud:                cloud,
   111  		kubeClient:           kubeClient,
   112  		eventBroadcaster:     record.NewBroadcaster(record.WithContext(ctx)),
   113  		clusterCIDRs:         clusterCIDRs,
   114  		serviceCIDR:          serviceCIDR,
   115  		secondaryServiceCIDR: secondaryServiceCIDR,
   116  		allocatorType:        allocatorType,
   117  	}
   118  
   119  	// TODO: Abstract this check into a generic controller manager should run method.
   120  	if ic.allocatorType == ipam.IPAMFromClusterAllocatorType || ic.allocatorType == ipam.IPAMFromCloudAllocatorType {
   121  		var err error
   122  		ic.legacyIPAM, err = createLegacyIPAM(ctx, ic, nodeInformer, cloud, kubeClient, clusterCIDRs, serviceCIDR, nodeCIDRMaskSizes)
   123  		if err != nil {
   124  			return nil, err
   125  		}
   126  	} else {
   127  		var err error
   128  
   129  		allocatorParams := ipam.CIDRAllocatorParams{
   130  			ClusterCIDRs:         clusterCIDRs,
   131  			ServiceCIDR:          ic.serviceCIDR,
   132  			SecondaryServiceCIDR: ic.secondaryServiceCIDR,
   133  			NodeCIDRMaskSizes:    nodeCIDRMaskSizes,
   134  		}
   135  
   136  		ic.cidrAllocator, err = ipam.New(ctx, kubeClient, cloud, nodeInformer, ic.allocatorType, allocatorParams)
   137  		if err != nil {
   138  			return nil, err
   139  		}
   140  	}
   141  
   142  	ic.nodeLister = nodeInformer.Lister()
   143  	ic.nodeInformerSynced = nodeInformer.Informer().HasSynced
   144  
   145  	return ic, nil
   146  }
   147  
   148  // Run starts an asynchronous loop that monitors the status of cluster nodes.
   149  func (nc *Controller) Run(ctx context.Context) {
   150  	defer utilruntime.HandleCrash()
   151  
   152  	// Start event processing pipeline.
   153  	nc.eventBroadcaster.StartStructuredLogging(3)
   154  	nc.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: nc.kubeClient.CoreV1().Events("")})
   155  	defer nc.eventBroadcaster.Shutdown()
   156  	klog.FromContext(ctx).Info("Starting ipam controller")
   157  	defer klog.FromContext(ctx).Info("Shutting down ipam controller")
   158  
   159  	if !cache.WaitForNamedCacheSync("node", ctx.Done(), nc.nodeInformerSynced) {
   160  		return
   161  	}
   162  
   163  	if nc.allocatorType == ipam.IPAMFromClusterAllocatorType || nc.allocatorType == ipam.IPAMFromCloudAllocatorType {
   164  		go nc.legacyIPAM.Run(ctx)
   165  	} else {
   166  		go nc.cidrAllocator.Run(ctx)
   167  	}
   168  
   169  	<-ctx.Done()
   170  }
   171  
   172  // RunWithMetrics is a wrapper for Run that also tracks starting and stopping of the nodeipam controller with additional metric
   173  func (nc *Controller) RunWithMetrics(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
   174  	controllerManagerMetrics.ControllerStarted("nodeipam")
   175  	defer controllerManagerMetrics.ControllerStopped("nodeipam")
   176  	nc.Run(ctx)
   177  }
   178  

View as plain text