...

Source file src/k8s.io/kubernetes/pkg/controller/nodeipam/ipam/controller_legacyprovider.go

Documentation: k8s.io/kubernetes/pkg/controller/nodeipam/ipam

     1  //go:build !providerless
     2  // +build !providerless
     3  
     4  /*
     5  Copyright 2017 The Kubernetes Authors.
     6  
     7  Licensed under the Apache License, Version 2.0 (the "License");
     8  you may not use this file except in compliance with the License.
     9  You may obtain a copy of the License at
    10  
    11      http://www.apache.org/licenses/LICENSE-2.0
    12  
    13  Unless required by applicable law or agreed to in writing, software
    14  distributed under the License is distributed on an "AS IS" BASIS,
    15  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    16  See the License for the specific language governing permissions and
    17  limitations under the License.
    18  */
    19  
    20  package ipam
    21  
    22  import (
    23  	"context"
    24  	"fmt"
    25  	"net"
    26  	"sync"
    27  	"time"
    28  
    29  	"k8s.io/klog/v2"
    30  	netutils "k8s.io/utils/net"
    31  
    32  	v1 "k8s.io/api/core/v1"
    33  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    34  	informers "k8s.io/client-go/informers/core/v1"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/tools/cache"
    37  	cloudprovider "k8s.io/cloud-provider"
    38  	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
    39  	nodesync "k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync"
    40  	controllerutil "k8s.io/kubernetes/pkg/controller/util/node"
    41  	"k8s.io/legacy-cloud-providers/gce"
    42  )
    43  
// Config for the IPAM controller.
type Config struct {
	// Resync is the default timeout duration when there are no errors.
	Resync time.Duration
	// MaxBackoff is the maximum timeout when in an error backoff state.
	MaxBackoff time.Duration
	// InitialRetry is the initial retry interval when an error is reported.
	InitialRetry time.Duration
	// Mode to use to synchronize.
	Mode nodesync.NodeSyncMode
}
    55  
// Controller is the controller for synchronizing cluster and cloud node
// pod CIDR range assignments.
type Controller struct {
	// config holds the timing/backoff settings and sync mode the
	// controller was constructed with.
	config *Config
	// adapter bridges access to the Kubernetes API and the GCE cloud.
	adapter *adapter

	// lock guards syncers; handlers registered on the node informer may
	// fire concurrently with Start's initial population.
	lock sync.Mutex
	// syncers maps node name to the per-node sync loop driving that
	// node's CIDR assignment.
	syncers map[string]*nodesync.NodeSync

	// set tracks which pod CIDR ranges within the cluster CIDR are
	// allocated or free.
	set *cidrset.CidrSet
}
    67  
    68  // NewController returns a new instance of the IPAM controller.
    69  func NewController(
    70  	ctx context.Context,
    71  	config *Config,
    72  	kubeClient clientset.Interface,
    73  	cloud cloudprovider.Interface,
    74  	clusterCIDR, serviceCIDR *net.IPNet,
    75  	nodeCIDRMaskSize int) (*Controller, error) {
    76  
    77  	if !nodesync.IsValidMode(config.Mode) {
    78  		return nil, fmt.Errorf("invalid IPAM controller mode %q", config.Mode)
    79  	}
    80  
    81  	gceCloud, ok := cloud.(*gce.Cloud)
    82  	if !ok {
    83  		return nil, fmt.Errorf("cloud IPAM controller does not support %q provider", cloud.ProviderName())
    84  	}
    85  
    86  	set, err := cidrset.NewCIDRSet(clusterCIDR, nodeCIDRMaskSize)
    87  	if err != nil {
    88  		return nil, err
    89  	}
    90  
    91  	c := &Controller{
    92  		config:  config,
    93  		adapter: newAdapter(ctx, kubeClient, gceCloud),
    94  		syncers: make(map[string]*nodesync.NodeSync),
    95  		set:     set,
    96  	}
    97  
    98  	if err := occupyServiceCIDR(c.set, clusterCIDR, serviceCIDR); err != nil {
    99  		return nil, err
   100  	}
   101  
   102  	//check whether there is a remaining cidr after occupyServiceCIDR
   103  	cidr, err := c.set.AllocateNext()
   104  	switch err {
   105  	case cidrset.ErrCIDRRangeNoCIDRsRemaining:
   106  		return nil, fmt.Errorf("failed after occupy serviceCIDR: %v", err)
   107  	case nil:
   108  		err := c.set.Release(cidr)
   109  		return c, err
   110  	default:
   111  		return nil, fmt.Errorf("unexpected error when check remaining CIDR range: %v", err)
   112  	}
   113  }
   114  
   115  // Start initializes the Controller with the existing list of nodes and
   116  // registers the informers for node changes. This will start synchronization
   117  // of the node and cloud CIDR range allocations.
   118  func (c *Controller) Start(logger klog.Logger, nodeInformer informers.NodeInformer) error {
   119  	logger.Info("Starting IPAM controller", "config", c.config)
   120  
   121  	nodes, err := listNodes(logger, c.adapter.k8s)
   122  	if err != nil {
   123  		return err
   124  	}
   125  	for _, node := range nodes.Items {
   126  		if node.Spec.PodCIDR != "" {
   127  			_, cidrRange, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
   128  			if err == nil {
   129  				c.set.Occupy(cidrRange)
   130  				logger.V(3).Info("Occupying CIDR for node", "CIDR", node.Spec.PodCIDR, "node", klog.KObj(&node))
   131  			} else {
   132  				logger.Error(err, "Node has an invalid CIDR", "node", klog.KObj(&node), "CIDR", node.Spec.PodCIDR)
   133  			}
   134  		}
   135  
   136  		func() {
   137  			c.lock.Lock()
   138  			defer c.lock.Unlock()
   139  
   140  			// XXX/bowei -- stagger the start of each sync cycle.
   141  			syncer := c.newSyncer(node.Name)
   142  			c.syncers[node.Name] = syncer
   143  			go syncer.Loop(logger, nil)
   144  		}()
   145  	}
   146  
   147  	nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
   148  		AddFunc: controllerutil.CreateAddNodeHandler(func(node *v1.Node) error {
   149  			return c.onAdd(logger, node)
   150  		}),
   151  		UpdateFunc: controllerutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
   152  			return c.onUpdate(logger, newNode)
   153  		}),
   154  		DeleteFunc: controllerutil.CreateDeleteNodeHandler(logger, func(node *v1.Node) error {
   155  			return c.onDelete(logger, node)
   156  		}),
   157  	})
   158  
   159  	return nil
   160  }
   161  
// Run starts the adapter's background processing and then blocks until the
// context is canceled, at which point the controller shuts down.
func (c *Controller) Run(ctx context.Context) {
	defer utilruntime.HandleCrash()

	// The adapter runs in its own goroutine; this goroutine only waits for
	// shutdown. Cancellation of ctx stops the adapter as well.
	go c.adapter.Run(ctx)
	<-ctx.Done()
}
   168  
// nodeState holds the per-node retry/resync bookkeeping consulted by the
// node sync loop between sync attempts.
type nodeState struct {
	// t computes the next resync interval, backing off after errors.
	t Timeout
}
   172  
   173  func (ns *nodeState) ReportResult(err error) {
   174  	ns.t.Update(err == nil)
   175  }
   176  
   177  func (ns *nodeState) ResyncTimeout() time.Duration {
   178  	return ns.t.Next()
   179  }
   180  
   181  func (c *Controller) newSyncer(name string) *nodesync.NodeSync {
   182  	ns := &nodeState{
   183  		Timeout{
   184  			Resync:       c.config.Resync,
   185  			MaxBackoff:   c.config.MaxBackoff,
   186  			InitialRetry: c.config.InitialRetry,
   187  		},
   188  	}
   189  	return nodesync.New(ns, c.adapter, c.adapter, c.config.Mode, name, c.set)
   190  }
   191  
   192  func (c *Controller) onAdd(logger klog.Logger, node *v1.Node) error {
   193  	c.lock.Lock()
   194  	defer c.lock.Unlock()
   195  
   196  	syncer, ok := c.syncers[node.Name]
   197  	if !ok {
   198  		syncer = c.newSyncer(node.Name)
   199  		c.syncers[node.Name] = syncer
   200  		go syncer.Loop(logger, nil)
   201  	} else {
   202  		logger.Info("Add for node that already exists", "node", klog.KObj(node))
   203  	}
   204  	syncer.Update(node)
   205  
   206  	return nil
   207  }
   208  
   209  func (c *Controller) onUpdate(logger klog.Logger, node *v1.Node) error {
   210  	c.lock.Lock()
   211  	defer c.lock.Unlock()
   212  
   213  	if sync, ok := c.syncers[node.Name]; ok {
   214  		sync.Update(node)
   215  	} else {
   216  		logger.Error(nil, "Received update for non-existent node", "node", klog.KObj(node))
   217  		return fmt.Errorf("unknown node %q", node.Name)
   218  	}
   219  
   220  	return nil
   221  }
   222  
   223  func (c *Controller) onDelete(logger klog.Logger, node *v1.Node) error {
   224  	c.lock.Lock()
   225  	defer c.lock.Unlock()
   226  
   227  	if syncer, ok := c.syncers[node.Name]; ok {
   228  		syncer.Delete(node)
   229  		delete(c.syncers, node.Name)
   230  	} else {
   231  		logger.Info("Node was already deleted", "node", klog.KObj(node))
   232  	}
   233  
   234  	return nil
   235  }
   236  

View as plain text