...

Source file src/k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync/sync.go

Documentation: k8s.io/kubernetes/pkg/controller/nodeipam/ipam/sync

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package sync
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"time"
    24  
    25  	"k8s.io/klog/v2"
    26  	netutils "k8s.io/utils/net"
    27  
    28  	v1 "k8s.io/api/core/v1"
    29  	"k8s.io/kubernetes/pkg/controller/nodeipam/ipam/cidrset"
    30  )
    31  
const (
	// InvalidPodCIDR is the reason of the warning event recorded when a
	// node is found with a PodCIDR that cannot be parsed.
	InvalidPodCIDR = "CloudCIDRAllocatorInvalidPodCIDR"
	// InvalidModeEvent is the reason of the warning event recorded when the
	// CIDR range cannot be sync'd because the requested operation is not
	// permitted in the configured NodeSyncMode.
	InvalidModeEvent = "CloudCIDRAllocatorInvalidMode"
	// MismatchEvent is the reason of the warning event recorded when the
	// CIDR range allocated in the node spec does not match what has been
	// allocated in the cloud.
	MismatchEvent = "CloudCIDRAllocatorMismatch"
)
    43  
// cloudAlias is the interface to the cloud platform APIs used to read and
// assign the IP alias range of a node.
type cloudAlias interface {
	// Alias returns the IP alias range for the node. The syncer treats a
	// nil range returned together with a nil error as "no alias assigned".
	Alias(ctx context.Context, node *v1.Node) (*net.IPNet, error)
	// AddAlias adds cidrRange as an alias to the node.
	AddAlias(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error
}
    51  
// kubeAPI is the interface to the Kubernetes APIs used by the syncer.
type kubeAPI interface {
	// Node returns the Node object with the given name.
	Node(ctx context.Context, name string) (*v1.Node, error)
	// UpdateNodePodCIDR updates the PodCIDR in the Node spec to cidrRange.
	UpdateNodePodCIDR(ctx context.Context, node *v1.Node, cidrRange *net.IPNet) error
	// UpdateNodeNetworkUnavailable updates the network unavailable status
	// for the named node.
	UpdateNodeNetworkUnavailable(nodeName string, unavailable bool) error
	// EmitNodeWarningEvent emits a warning event with the given reason for
	// the named node. Callers in this file pass fmt as a format string that
	// is combined with args to build the event message.
	EmitNodeWarningEvent(nodeName, reason, fmt string, args ...interface{})
}
    63  
// controller is the interface to the controller that owns the syncer.
type controller interface {
	// ReportResult updates the controller with the result of the latest
	// sync operation. The sync loop passes the error returned by the
	// operation, which is nil when the operation succeeded.
	ReportResult(err error)
	// ResyncTimeout returns the amount of time to wait before retrying
	// a sync with a node. The sync loop queries it again after every
	// operation.
	ResyncTimeout() time.Duration
}
    73  
// NodeSyncMode is the mode the cloud CIDR allocator runs in. It determines
// in which direction allocations may be synchronized.
type NodeSyncMode string

var (
	// SyncFromCloud is the mode that synchronizes the IP allocation from the cloud
	// platform to the node. It is the only mode in which a node's PodCIDR is
	// filled in from an existing cloud alias.
	SyncFromCloud NodeSyncMode = "SyncFromCloud"
	// SyncFromCluster is the mode that synchronizes the IP allocation determined
	// by the k8s controller to the cloud provider. It is the only mode in
	// which new ranges are allocated or pushed to the cloud as aliases.
	SyncFromCluster NodeSyncMode = "SyncFromCluster"
)
    85  
    86  // IsValidMode returns true if the given mode is valid.
    87  func IsValidMode(m NodeSyncMode) bool {
    88  	switch m {
    89  	case SyncFromCloud:
    90  	case SyncFromCluster:
    91  	default:
    92  		return false
    93  	}
    94  	return true
    95  }
    96  
// NodeSync synchronizes the state for a single node in the cluster.
type NodeSync struct {
	// c receives the result of every operation and supplies the resync
	// timeout.
	c controller
	// cloudAlias reads and assigns the node's alias range in the cloud.
	cloudAlias cloudAlias
	// kubeAPI reads and updates the Node object and emits events.
	kubeAPI kubeAPI
	// mode restricts which sync directions are permitted.
	mode NodeSyncMode
	// nodeName is the name of the node handled by this syncer.
	nodeName string
	// opChan carries pending operations to Loop; Delete closes it to stop
	// the loop.
	opChan chan syncOp
	// set tracks which CIDR ranges are allocated, occupied, and released.
	set *cidrset.CidrSet
}
   107  
   108  // New returns a new syncer for a given node.
   109  func New(c controller, cloudAlias cloudAlias, kubeAPI kubeAPI, mode NodeSyncMode, nodeName string, set *cidrset.CidrSet) *NodeSync {
   110  	return &NodeSync{
   111  		c:          c,
   112  		cloudAlias: cloudAlias,
   113  		kubeAPI:    kubeAPI,
   114  		mode:       mode,
   115  		nodeName:   nodeName,
   116  		opChan:     make(chan syncOp, 1),
   117  		set:        set,
   118  	}
   119  }
   120  
   121  // Loop runs the sync loop for a given node. done is an optional channel that
   122  // is closed when the Loop() returns.
   123  func (sync *NodeSync) Loop(logger klog.Logger, done chan struct{}) {
   124  	logger.V(2).Info("Starting sync loop", "node", klog.KRef("", sync.nodeName))
   125  
   126  	defer func() {
   127  		if done != nil {
   128  			close(done)
   129  		}
   130  	}()
   131  
   132  	timeout := sync.c.ResyncTimeout()
   133  	delayTimer := time.NewTimer(timeout)
   134  	logger.V(4).Info("Try to resync node later", "node", klog.KRef("", sync.nodeName), "resyncTime", timeout)
   135  
   136  	for {
   137  		select {
   138  		case op, more := <-sync.opChan:
   139  			if !more {
   140  				logger.V(2).Info("Stopping sync loop")
   141  				return
   142  			}
   143  			sync.c.ReportResult(op.run(logger, sync))
   144  			if !delayTimer.Stop() {
   145  				<-delayTimer.C
   146  			}
   147  		case <-delayTimer.C:
   148  			logger.V(4).Info("Running resync", "node", klog.KRef("", sync.nodeName))
   149  			sync.c.ReportResult((&updateOp{}).run(logger, sync))
   150  		}
   151  
   152  		timeout := sync.c.ResyncTimeout()
   153  		delayTimer.Reset(timeout)
   154  		logger.V(4).Info("Try to resync node later", "node", klog.KRef("", sync.nodeName), "resyncTime", timeout)
   155  	}
   156  }
   157  
   158  // Update causes an update operation on the given node. If node is nil, then
   159  // the syncer will fetch the node spec from the API server before syncing.
   160  //
   161  // This method is safe to call from multiple goroutines.
   162  func (sync *NodeSync) Update(node *v1.Node) {
   163  	sync.opChan <- &updateOp{node}
   164  }
   165  
   166  // Delete performs the sync operations necessary to remove the node from the
   167  // IPAM state.
   168  //
   169  // This method is safe to call from multiple goroutines.
   170  func (sync *NodeSync) Delete(node *v1.Node) {
   171  	sync.opChan <- &deleteOp{node}
   172  	close(sync.opChan)
   173  }
   174  
// syncOp is the interface for generic sync operation. Operations are queued
// on a NodeSync and executed one at a time by its sync loop.
type syncOp interface {
	// run the requested sync operation against the node handled by sync,
	// logging through logger. The returned error is reported to the
	// controller by the sync loop.
	run(logger klog.Logger, sync *NodeSync) error
}
   180  
   181  // updateOp handles creation and updates of a node.
   182  type updateOp struct {
   183  	node *v1.Node
   184  }
   185  
   186  func (op *updateOp) String() string {
   187  	if op.node == nil {
   188  		return fmt.Sprintf("updateOp(nil)")
   189  	}
   190  	return fmt.Sprintf("updateOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR)
   191  }
   192  
   193  func (op *updateOp) run(logger klog.Logger, sync *NodeSync) error {
   194  	logger.V(3).Info("Running updateOp", "updateOp", op)
   195  
   196  	ctx := context.Background()
   197  
   198  	if op.node == nil {
   199  		logger.V(3).Info("Getting node spec", "node", klog.KRef("", sync.nodeName))
   200  		node, err := sync.kubeAPI.Node(ctx, sync.nodeName)
   201  		if err != nil {
   202  			logger.Error(err, "Error getting node pec", "node", klog.KRef("", sync.nodeName))
   203  			return err
   204  		}
   205  		op.node = node
   206  	}
   207  
   208  	aliasRange, err := sync.cloudAlias.Alias(ctx, op.node)
   209  	if err != nil {
   210  		logger.Error(err, "Error getting cloud alias for node", "node", klog.KRef("", sync.nodeName))
   211  		return err
   212  	}
   213  
   214  	switch {
   215  	case op.node.Spec.PodCIDR == "" && aliasRange == nil:
   216  		err = op.allocateRange(ctx, sync, op.node)
   217  	case op.node.Spec.PodCIDR == "" && aliasRange != nil:
   218  		err = op.updateNodeFromAlias(ctx, sync, op.node, aliasRange)
   219  	case op.node.Spec.PodCIDR != "" && aliasRange == nil:
   220  		err = op.updateAliasFromNode(ctx, sync, op.node)
   221  	case op.node.Spec.PodCIDR != "" && aliasRange != nil:
   222  		err = op.validateRange(ctx, sync, op.node, aliasRange)
   223  	}
   224  
   225  	return err
   226  }
   227  
   228  // validateRange checks that the allocated range and the alias range
   229  // match.
   230  func (op *updateOp) validateRange(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error {
   231  	if node.Spec.PodCIDR != aliasRange.String() {
   232  		klog.FromContext(ctx).Error(nil, "Inconsistency detected between node PodCIDR and node alias", "podCIDR", node.Spec.PodCIDR, "alias", aliasRange)
   233  		sync.kubeAPI.EmitNodeWarningEvent(node.Name, MismatchEvent,
   234  			"Node.Spec.PodCIDR != cloud alias (%v != %v)", node.Spec.PodCIDR, aliasRange)
   235  		// User intervention is required in this case, as this is most likely due
   236  		// to the user mucking around with their VM aliases on the side.
   237  	} else {
   238  		klog.FromContext(ctx).V(4).Info("Node CIDR range is matches cloud assignment", "node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
   239  	}
   240  	return nil
   241  }
   242  
   243  // updateNodeFromAlias updates the node from the cloud allocated
   244  // alias.
   245  func (op *updateOp) updateNodeFromAlias(ctx context.Context, sync *NodeSync, node *v1.Node, aliasRange *net.IPNet) error {
   246  	if sync.mode != SyncFromCloud {
   247  		sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent,
   248  			"Cannot sync from cloud in mode %q", sync.mode)
   249  		return fmt.Errorf("cannot sync from cloud in mode %q", sync.mode)
   250  	}
   251  	logger := klog.FromContext(ctx)
   252  	logger.V(2).Info("Updating node spec with alias range", "podCIDR", aliasRange)
   253  
   254  	if err := sync.set.Occupy(aliasRange); err != nil {
   255  		logger.Error(nil, "Error occupying range for node", "node", klog.KRef("", sync.nodeName), "alias", aliasRange)
   256  		return err
   257  	}
   258  
   259  	if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, aliasRange); err != nil {
   260  		logger.Error(err, "Could not update node PodCIDR", "node", klog.KObj(node), "podCIDR", aliasRange)
   261  		return err
   262  	}
   263  
   264  	logger.V(2).Info("Node PodCIDR updated", "node", klog.KObj(node), "podCIDR", aliasRange)
   265  
   266  	if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
   267  		logger.Error(err, "Could not update node NetworkUnavailable status to false")
   268  		return err
   269  	}
   270  
   271  	logger.V(2).Info("Updated node PodCIDR from cloud alias", "node", klog.KObj(node), "alias", aliasRange)
   272  
   273  	return nil
   274  }
   275  
   276  // updateAliasFromNode updates the cloud alias given the node allocation.
   277  func (op *updateOp) updateAliasFromNode(ctx context.Context, sync *NodeSync, node *v1.Node) error {
   278  	if sync.mode != SyncFromCluster {
   279  		sync.kubeAPI.EmitNodeWarningEvent(
   280  			node.Name, InvalidModeEvent, "Cannot sync to cloud in mode %q", sync.mode)
   281  		return fmt.Errorf("cannot sync to cloud in mode %q", sync.mode)
   282  	}
   283  
   284  	_, aliasRange, err := netutils.ParseCIDRSloppy(node.Spec.PodCIDR)
   285  
   286  	logger := klog.FromContext(ctx)
   287  	if err != nil {
   288  		logger.Error(err, "Could not parse PodCIDR for node", "node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
   289  		return err
   290  	}
   291  
   292  	if err := sync.set.Occupy(aliasRange); err != nil {
   293  		logger.Error(nil, "Error occupying range for node", "node", klog.KRef("", sync.nodeName), "alias", aliasRange)
   294  		return err
   295  	}
   296  
   297  	if err := sync.cloudAlias.AddAlias(ctx, node, aliasRange); err != nil {
   298  		logger.Error(err, "Could not add alias for node", "node", klog.KObj(node), "alias", aliasRange)
   299  		return err
   300  	}
   301  
   302  	if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
   303  		logger.Error(err, "Could not update node NetworkUnavailable status to false")
   304  		return err
   305  	}
   306  
   307  	logger.V(2).Info("Updated node cloud alias with node spec", "node", klog.KObj(node), "podCIDR", node.Spec.PodCIDR)
   308  
   309  	return nil
   310  }
   311  
   312  // allocateRange allocates a new range and updates both the cloud
   313  // platform and the node allocation.
   314  func (op *updateOp) allocateRange(ctx context.Context, sync *NodeSync, node *v1.Node) error {
   315  	if sync.mode != SyncFromCluster {
   316  		sync.kubeAPI.EmitNodeWarningEvent(node.Name, InvalidModeEvent,
   317  			"Cannot allocate CIDRs in mode %q", sync.mode)
   318  		return fmt.Errorf("controller cannot allocate CIDRS in mode %q", sync.mode)
   319  	}
   320  
   321  	cidrRange, err := sync.set.AllocateNext()
   322  	if err != nil {
   323  		return err
   324  	}
   325  	// If addAlias returns a hard error, cidrRange will be leaked as there
   326  	// is no durable record of the range. The missing space will be
   327  	// recovered on the next restart of the controller.
   328  	logger := klog.FromContext(ctx)
   329  	if err := sync.cloudAlias.AddAlias(ctx, node, cidrRange); err != nil {
   330  		logger.Error(err, "Could not add alias for node", "node", klog.KObj(node), "alias", cidrRange)
   331  		return err
   332  	}
   333  
   334  	if err := sync.kubeAPI.UpdateNodePodCIDR(ctx, node, cidrRange); err != nil {
   335  		logger.Error(err, "Could not update node PodCIDR", "node", klog.KObj(node), "podCIDR", cidrRange)
   336  		return err
   337  	}
   338  
   339  	if err := sync.kubeAPI.UpdateNodeNetworkUnavailable(node.Name, false); err != nil {
   340  		logger.Error(err, "Could not update node NetworkUnavailable status to false")
   341  		return err
   342  	}
   343  
   344  	logger.V(2).Info("Allocated PodCIDR for node", "node", klog.KObj(node), "podCIDR", cidrRange)
   345  
   346  	return nil
   347  }
   348  
   349  // deleteOp handles deletion of a node.
   350  type deleteOp struct {
   351  	node *v1.Node
   352  }
   353  
   354  func (op *deleteOp) String() string {
   355  	if op.node == nil {
   356  		return fmt.Sprintf("deleteOp(nil)")
   357  	}
   358  	return fmt.Sprintf("deleteOp(%q,%v)", op.node.Name, op.node.Spec.PodCIDR)
   359  }
   360  
   361  func (op *deleteOp) run(logger klog.Logger, sync *NodeSync) error {
   362  	logger.V(3).Info("Running deleteOp", "deleteOp", op)
   363  	if op.node.Spec.PodCIDR == "" {
   364  		logger.V(2).Info("Node was deleted, node had no PodCIDR range assigned", "node", klog.KObj(op.node))
   365  		return nil
   366  	}
   367  
   368  	_, cidrRange, err := netutils.ParseCIDRSloppy(op.node.Spec.PodCIDR)
   369  	if err != nil {
   370  		logger.Error(err, "Deleted node has an invalid podCIDR", "node", klog.KObj(op.node), "podCIDR", op.node.Spec.PodCIDR)
   371  		sync.kubeAPI.EmitNodeWarningEvent(op.node.Name, InvalidPodCIDR,
   372  			"Node %q has an invalid PodCIDR: %q", op.node.Name, op.node.Spec.PodCIDR)
   373  		return nil
   374  	}
   375  
   376  	sync.set.Release(cidrRange)
   377  	logger.V(2).Info("Node was deleted, releasing CIDR range", "node", klog.KObj(op.node), "podCIDR", op.node.Spec.PodCIDR)
   378  
   379  	return nil
   380  }
   381  

View as plain text