...

Source file src/k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller/repair.go

Documentation: k8s.io/kubernetes/pkg/registry/core/service/ipallocator/controller

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package controller
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"sync"
    24  	"time"
    25  
    26  	v1 "k8s.io/api/core/v1"
    27  	"k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/util/runtime"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    32  	eventsv1client "k8s.io/client-go/kubernetes/typed/events/v1"
    33  	"k8s.io/client-go/tools/events"
    34  	"k8s.io/client-go/util/retry"
    35  	"k8s.io/kubernetes/pkg/api/legacyscheme"
    36  	api "k8s.io/kubernetes/pkg/apis/core"
    37  	"k8s.io/kubernetes/pkg/apis/core/v1/helper"
    38  	"k8s.io/kubernetes/pkg/registry/core/rangeallocation"
    39  	"k8s.io/kubernetes/pkg/registry/core/service/ipallocator"
    40  	netutils "k8s.io/utils/net"
    41  )
    42  
// Repair is a controller loop that periodically examines all service ClusterIP allocations
// and logs any errors, and then sets the compacted and accurate list of all allocated IPs.
//
// Handles:
// * Duplicate ClusterIP assignments caused by operator action or undetected race conditions
// * ClusterIPs that do not match the currently configured range
// * Allocations to services that were not actually created due to a crash or power loss
// * Migrates old versions of Kubernetes services into the atomic ipallocator model automatically
//
// Can be run at infrequent intervals, and is best performed on startup of the master.
// Is level driven and idempotent - all valid ClusterIPs will be updated into the ipallocator
// map at the end of a single execution loop if no race is encountered.
//
// All per-family maps are keyed by IP family and are populated together by
// NewRepair, so a family is either present in all of them or in none.
//
// TODO: allocate new IPs if necessary
// TODO: perform repair?
type Repair struct {
	// interval is the period handed to wait.Until in RunUntil, i.e. the
	// delay between two repair passes.
	interval time.Duration
	// serviceClient is used to list the Services of every namespace.
	serviceClient corev1client.ServicesGetter

	networkByFamily   map[v1.IPFamily]*net.IPNet                    // networks we operate on, by their family
	allocatorByFamily map[v1.IPFamily]rangeallocation.RangeRegistry // allocators we use, by their family

	leaksByFamily map[v1.IPFamily]map[string]int // counter per leaked IP per family
	// broadcaster is started by RunUntil and shut down when RunUntil returns.
	broadcaster events.EventBroadcaster
	// recorder emits the warning events attached to Services with a
	// problematic ClusterIP.
	recorder events.EventRecorder
}
    69  
// numRepairsBeforeLeakCleanup is how many times we need to detect a leak
// before we clean up.  This is to avoid races between allocating an IP and
// using it.
const numRepairsBeforeLeakCleanup = 3
    73  
    74  // NewRepair creates a controller that periodically ensures that all clusterIPs are uniquely allocated across the cluster
    75  // and generates informational warnings for a cluster that is not in sync.
    76  func NewRepair(interval time.Duration, serviceClient corev1client.ServicesGetter, eventClient eventsv1client.EventsV1Interface, network *net.IPNet, alloc rangeallocation.RangeRegistry, secondaryNetwork *net.IPNet, secondaryAlloc rangeallocation.RangeRegistry) *Repair {
    77  	eventBroadcaster := events.NewBroadcaster(&events.EventSinkImpl{Interface: eventClient})
    78  	recorder := eventBroadcaster.NewRecorder(legacyscheme.Scheme, "ipallocator-repair-controller")
    79  
    80  	// build *ByFamily struct members
    81  	networkByFamily := make(map[v1.IPFamily]*net.IPNet)
    82  	allocatorByFamily := make(map[v1.IPFamily]rangeallocation.RangeRegistry)
    83  	leaksByFamily := make(map[v1.IPFamily]map[string]int)
    84  
    85  	primary := v1.IPv4Protocol
    86  	secondary := v1.IPv6Protocol
    87  	if netutils.IsIPv6(network.IP) {
    88  		primary = v1.IPv6Protocol
    89  	}
    90  
    91  	networkByFamily[primary] = network
    92  	allocatorByFamily[primary] = alloc
    93  	leaksByFamily[primary] = make(map[string]int)
    94  
    95  	if secondaryNetwork != nil && secondaryNetwork.IP != nil {
    96  		if primary == v1.IPv6Protocol {
    97  			secondary = v1.IPv4Protocol
    98  		}
    99  		networkByFamily[secondary] = secondaryNetwork
   100  		allocatorByFamily[secondary] = secondaryAlloc
   101  		leaksByFamily[secondary] = make(map[string]int)
   102  	}
   103  
   104  	registerMetrics()
   105  
   106  	return &Repair{
   107  		interval:      interval,
   108  		serviceClient: serviceClient,
   109  
   110  		networkByFamily:   networkByFamily,
   111  		allocatorByFamily: allocatorByFamily,
   112  
   113  		leaksByFamily: leaksByFamily,
   114  		broadcaster:   eventBroadcaster,
   115  		recorder:      recorder,
   116  	}
   117  }
   118  
   119  // RunUntil starts the controller until the provided ch is closed.
   120  func (c *Repair) RunUntil(onFirstSuccess func(), stopCh chan struct{}) {
   121  	c.broadcaster.StartRecordingToSink(stopCh)
   122  	defer c.broadcaster.Shutdown()
   123  
   124  	var once sync.Once
   125  	wait.Until(func() {
   126  		if err := c.runOnce(); err != nil {
   127  			runtime.HandleError(err)
   128  			return
   129  		}
   130  		once.Do(onFirstSuccess)
   131  	}, c.interval, stopCh)
   132  }
   133  
   134  // runOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
   135  func (c *Repair) runOnce() error {
   136  	return retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   137  		err := c.doRunOnce()
   138  		if err != nil {
   139  			clusterIPRepairReconcileErrors.Inc()
   140  		}
   141  		return err
   142  	})
   143  }
   144  
   145  // doRunOnce verifies the state of the cluster IP allocations and returns an error if an unrecoverable problem occurs.
   146  func (c *Repair) doRunOnce() error {
   147  	// TODO: (per smarterclayton) if Get() or ListServices() is a weak consistency read,
   148  	// or if they are executed against different leaders,
   149  	// the ordering guarantee required to ensure no IP is allocated twice is violated.
   150  	// ListServices must return a ResourceVersion higher than the etcd index Get triggers,
   151  	// and the release code must not release services that have had IPs allocated but not yet been created
   152  	// See #8295
   153  
   154  	// If etcd server is not running we should wait for some time and fail only then. This is particularly
   155  	// important when we start apiserver and etcd at the same time.
   156  	snapshotByFamily := make(map[v1.IPFamily]*api.RangeAllocation)
   157  	storedByFamily := make(map[v1.IPFamily]ipallocator.Interface)
   158  
   159  	err := wait.PollImmediate(time.Second, 10*time.Second, func() (bool, error) {
   160  		for family, allocator := range c.allocatorByFamily {
   161  			// get snapshot if it is not there
   162  			if _, ok := snapshotByFamily[family]; !ok {
   163  				snapshot, err := allocator.Get()
   164  				if err != nil {
   165  					return false, err
   166  				}
   167  
   168  				snapshotByFamily[family] = snapshot
   169  			}
   170  		}
   171  		return true, nil
   172  	})
   173  
   174  	if err != nil {
   175  		return fmt.Errorf("unable to refresh the service IP block: %v", err)
   176  	}
   177  
   178  	// ensure that ranges are assigned
   179  	for family, snapshot := range snapshotByFamily {
   180  		if snapshot.Range == "" {
   181  			snapshot.Range = c.networkByFamily[family].String()
   182  		}
   183  	}
   184  
   185  	// Create an allocator because it is easy to use.
   186  	for family, snapshot := range snapshotByFamily {
   187  		stored, err := ipallocator.NewFromSnapshot(snapshot)
   188  		if err != nil {
   189  			return fmt.Errorf("unable to rebuild allocator from snapshots for family:%v with error:%v", family, err)
   190  		}
   191  
   192  		storedByFamily[family] = stored
   193  	}
   194  
   195  	rebuiltByFamily := make(map[v1.IPFamily]*ipallocator.Range)
   196  
   197  	for family, network := range c.networkByFamily {
   198  		rebuilt, err := ipallocator.NewInMemory(network)
   199  		if err != nil {
   200  			return fmt.Errorf("unable to create CIDR range for family %v: %v", family, err)
   201  		}
   202  
   203  		rebuiltByFamily[family] = rebuilt
   204  	}
   205  	// We explicitly send no resource version, since the resource version
   206  	// of 'snapshot' is from a different collection, it's not comparable to
   207  	// the service collection. The caching layer keeps per-collection RVs,
   208  	// and this is proper, since in theory the collections could be hosted
   209  	// in separate etcd (or even non-etcd) instances.
   210  	list, err := c.serviceClient.Services(metav1.NamespaceAll).List(context.TODO(), metav1.ListOptions{})
   211  	if err != nil {
   212  		return fmt.Errorf("unable to refresh the service IP block: %v", err)
   213  	}
   214  
   215  	getFamilyByIP := func(ip net.IP) v1.IPFamily {
   216  		if netutils.IsIPv6(ip) {
   217  			return v1.IPv6Protocol
   218  		}
   219  		return v1.IPv4Protocol
   220  	}
   221  
   222  	// Check every Service's ClusterIP, and rebuild the state as we think it should be.
   223  	for _, svc := range list.Items {
   224  		if !helper.IsServiceIPSet(&svc) {
   225  			// didn't need a cluster IP
   226  			continue
   227  		}
   228  
   229  		for _, ip := range svc.Spec.ClusterIPs {
   230  			ip := netutils.ParseIPSloppy(ip)
   231  			if ip == nil {
   232  				// cluster IP is corrupt
   233  				clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
   234  				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s is not a valid IP; please recreate service", ip)
   235  				runtime.HandleError(fmt.Errorf("the cluster IP %s for service %s/%s is not a valid IP; please recreate", ip, svc.Name, svc.Namespace))
   236  				continue
   237  			}
   238  
   239  			family := getFamilyByIP(ip)
   240  			if _, ok := rebuiltByFamily[family]; !ok {
   241  				// this service is using an IPFamily no longer configured on cluster
   242  				clusterIPRepairIPErrors.WithLabelValues("invalid").Inc()
   243  				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotValid", "ClusterIPValidation", "Cluster IP %s(%s) is of ip family that is no longer configured on cluster; please recreate service", ip, family)
   244  				runtime.HandleError(fmt.Errorf("the cluster IP %s(%s) for service %s/%s is of ip family that is no longer configured on cluster; please recreate", ip, family, svc.Name, svc.Namespace))
   245  				continue
   246  			}
   247  
   248  			// mark it as in-use
   249  			actualAlloc := rebuiltByFamily[family]
   250  			switch err := actualAlloc.Allocate(ip); err {
   251  			case nil:
   252  				actualStored := storedByFamily[family]
   253  				if actualStored.Has(ip) {
   254  					// remove it from the old set, so we can find leaks
   255  					actualStored.Release(ip)
   256  				} else {
   257  					// cluster IP doesn't seem to be allocated
   258  					clusterIPRepairIPErrors.WithLabelValues("repair").Inc()
   259  					c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPNotAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s is not allocated; repairing", family, ip)
   260  					runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not allocated; repairing", family, ip, svc.Name, svc.Namespace))
   261  				}
   262  				delete(c.leaksByFamily[family], ip.String()) // it is used, so it can't be leaked
   263  			case ipallocator.ErrAllocated:
   264  				// cluster IP is duplicate
   265  				clusterIPRepairIPErrors.WithLabelValues("duplicate").Inc()
   266  				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPAlreadyAllocated", "ClusterIPAllocation", "Cluster IP [%v]:%s was assigned to multiple services; please recreate service", family, ip)
   267  				runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s was assigned to multiple services; please recreate", family, ip, svc.Name, svc.Namespace))
   268  			case err.(*ipallocator.ErrNotInRange):
   269  				// cluster IP is out of range
   270  				clusterIPRepairIPErrors.WithLabelValues("outOfRange").Inc()
   271  				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ClusterIPOutOfRange", "ClusterIPAllocation", "Cluster IP [%v]:%s is not within the service CIDR %s; please recreate service", family, ip, c.networkByFamily[family])
   272  				runtime.HandleError(fmt.Errorf("the cluster IP [%v]:%s for service %s/%s is not within the service CIDR %s; please recreate", family, ip, svc.Name, svc.Namespace, c.networkByFamily[family]))
   273  			case ipallocator.ErrFull:
   274  				// somehow we are out of IPs
   275  				clusterIPRepairIPErrors.WithLabelValues("full").Inc()
   276  				cidr := actualAlloc.CIDR()
   277  				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "ServiceCIDRFull", "ClusterIPAllocation", "Service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
   278  				return fmt.Errorf("the service CIDR %v is full; you must widen the CIDR in order to create new services for Cluster IP [%v]:%s", cidr, family, ip)
   279  			default:
   280  				clusterIPRepairIPErrors.WithLabelValues("unknown").Inc()
   281  				c.recorder.Eventf(&svc, nil, v1.EventTypeWarning, "UnknownError", "ClusterIPAllocation", "Unable to allocate cluster IP [%v]:%s due to an unknown error", family, ip)
   282  				return fmt.Errorf("unable to allocate cluster IP [%v]:%s for service %s/%s due to an unknown error, exiting: %v", family, ip, svc.Name, svc.Namespace, err)
   283  			}
   284  		}
   285  	}
   286  
   287  	// leak check
   288  	for family, leaks := range c.leaksByFamily {
   289  		c.checkLeaked(leaks, storedByFamily[family], rebuiltByFamily[family])
   290  	}
   291  
   292  	// save logic
   293  	// Blast the rebuilt state into storage.
   294  	for family, rebuilt := range rebuiltByFamily {
   295  		err = c.saveSnapShot(rebuilt, c.allocatorByFamily[family], snapshotByFamily[family])
   296  		if err != nil {
   297  			return err
   298  		}
   299  	}
   300  
   301  	return nil
   302  }
   303  
   304  func (c *Repair) saveSnapShot(rebuilt *ipallocator.Range, alloc rangeallocation.RangeRegistry, snapshot *api.RangeAllocation) error {
   305  	if err := rebuilt.Snapshot(snapshot); err != nil {
   306  		return fmt.Errorf("unable to snapshot the updated service IP allocations: %v", err)
   307  	}
   308  	if err := alloc.CreateOrUpdate(snapshot); err != nil {
   309  		if errors.IsConflict(err) {
   310  			return err
   311  		}
   312  		return fmt.Errorf("unable to persist the updated service IP allocations: %v", err)
   313  	}
   314  
   315  	return nil
   316  }
   317  
   318  func (c *Repair) checkLeaked(leaks map[string]int, stored ipallocator.Interface, rebuilt *ipallocator.Range) {
   319  	// Check for IPs that are left in the old set.  They appear to have been leaked.
   320  	stored.ForEach(func(ip net.IP) {
   321  		count, found := leaks[ip.String()]
   322  		switch {
   323  		case !found:
   324  			// flag it to be cleaned up after any races (hopefully) are gone
   325  			runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked: flagging for later clean up", ip))
   326  			count = numRepairsBeforeLeakCleanup - 1
   327  			fallthrough
   328  		case count > 0:
   329  			// pretend it is still in use until count expires
   330  			leaks[ip.String()] = count - 1
   331  			if err := rebuilt.Allocate(ip); err != nil {
   332  				// do not increment the metric here, if it is a leak it will be detected once the counter gets to 0
   333  				runtime.HandleError(fmt.Errorf("the cluster IP %s may have leaked, but can not be allocated: %v", ip, err))
   334  			}
   335  		default:
   336  			clusterIPRepairIPErrors.WithLabelValues("leak").Inc()
   337  			// do not add it to the rebuilt set, which means it will be available for reuse
   338  			runtime.HandleError(fmt.Errorf("the cluster IP %s appears to have leaked: cleaning up", ip))
   339  		}
   340  	})
   341  }
   342  

View as plain text