...

Source file src/k8s.io/kubernetes/pkg/controlplane/reconcilers/instancecount.go

Documentation: k8s.io/kubernetes/pkg/controlplane/reconcilers

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  // Package reconcilers provides the master count based reconciler.
    18  package reconcilers
    19  
    20  import (
    21  	"net"
    22  	"sync"
    23  
    24  	corev1 "k8s.io/api/core/v1"
    25  	"k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/client-go/util/retry"
    28  	"k8s.io/klog/v2"
    29  	endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
    30  )
    31  
    32  // masterCountEndpointReconciler reconciles endpoints based on a specified expected number of
    33  // masters. masterCountEndpointReconciler implements EndpointReconciler.
    34  type masterCountEndpointReconciler struct {
    35  	masterCount           int
    36  	epAdapter             EndpointsAdapter
    37  	stopReconcilingCalled bool
    38  	reconcilingLock       sync.Mutex
    39  }
    40  
    41  // NewMasterCountEndpointReconciler creates a new EndpointReconciler that reconciles based on a
    42  // specified expected number of masters.
    43  func NewMasterCountEndpointReconciler(masterCount int, epAdapter EndpointsAdapter) EndpointReconciler {
    44  	return &masterCountEndpointReconciler{
    45  		masterCount: masterCount,
    46  		epAdapter:   epAdapter,
    47  	}
    48  }
    49  
    50  // ReconcileEndpoints sets the endpoints for the given apiserver service (ro or rw).
    51  // ReconcileEndpoints expects that the endpoints objects it manages will all be
    52  // managed only by ReconcileEndpoints; therefore, to understand this, you need only
    53  // understand the requirements and the body of this function.
    54  //
    55  // Requirements:
    56  //   - All apiservers MUST use the same ports for their {rw, ro} services.
    57  //   - All apiservers MUST use ReconcileEndpoints and only ReconcileEndpoints to manage the
    58  //     endpoints for their {rw, ro} services.
    59  //   - All apiservers MUST know and agree on the number of apiservers expected
    60  //     to be running (c.masterCount).
    61  //   - ReconcileEndpoints is called periodically from all apiservers.
    62  func (r *masterCountEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
    63  	r.reconcilingLock.Lock()
    64  	defer r.reconcilingLock.Unlock()
    65  
    66  	if r.stopReconcilingCalled {
    67  		return nil
    68  	}
    69  
    70  	e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
    71  	if err != nil {
    72  		e = &corev1.Endpoints{
    73  			ObjectMeta: metav1.ObjectMeta{
    74  				Name:      serviceName,
    75  				Namespace: metav1.NamespaceDefault,
    76  			},
    77  		}
    78  	}
    79  
    80  	// Don't use the EndpointSliceMirroring controller to mirror this to
    81  	// EndpointSlices. This may change in the future.
    82  	skipMirrorChanged := setSkipMirrorTrue(e)
    83  
    84  	if errors.IsNotFound(err) {
    85  		// Simply create non-existing endpoints for the service.
    86  		e.Subsets = []corev1.EndpointSubset{{
    87  			Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
    88  			Ports:     endpointPorts,
    89  		}}
    90  		_, err = r.epAdapter.Create(metav1.NamespaceDefault, e)
    91  		return err
    92  	}
    93  
    94  	// First, determine if the endpoint is in the format we expect (one
    95  	// subset, ports matching endpointPorts, N IP addresses).
    96  	formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormat(e, ip.String(), endpointPorts, r.masterCount, reconcilePorts)
    97  	if !formatCorrect {
    98  		// Something is egregiously wrong, just re-make the endpoints record.
    99  		e.Subsets = []corev1.EndpointSubset{{
   100  			Addresses: []corev1.EndpointAddress{{IP: ip.String()}},
   101  			Ports:     endpointPorts,
   102  		}}
   103  		klog.Warningf("Resetting endpoints for master service %q to %#v", serviceName, e)
   104  		_, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
   105  		return err
   106  	}
   107  
   108  	if !skipMirrorChanged && ipCorrect && portsCorrect {
   109  		return r.epAdapter.EnsureEndpointSliceFromEndpoints(metav1.NamespaceDefault, e)
   110  	}
   111  	if !ipCorrect {
   112  		// We *always* add our own IP address.
   113  		e.Subsets[0].Addresses = append(e.Subsets[0].Addresses, corev1.EndpointAddress{IP: ip.String()})
   114  
   115  		// Lexicographic order is retained by this step.
   116  		e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
   117  
   118  		// If too many IP addresses, remove the ones lexicographically after our
   119  		// own IP address.  Given the requirements stated at the top of
   120  		// this function, this should cause the list of IP addresses to
   121  		// become eventually correct.
   122  		if addrs := &e.Subsets[0].Addresses; len(*addrs) > r.masterCount {
   123  			// addrs is a pointer because we're going to mutate it.
   124  			for i, addr := range *addrs {
   125  				if addr.IP == ip.String() {
   126  					for len(*addrs) > r.masterCount {
   127  						// wrap around if necessary.
   128  						remove := (i + 1) % len(*addrs)
   129  						*addrs = append((*addrs)[:remove], (*addrs)[remove+1:]...)
   130  					}
   131  					break
   132  				}
   133  			}
   134  		}
   135  	}
   136  	if !portsCorrect {
   137  		// Reset ports.
   138  		e.Subsets[0].Ports = endpointPorts
   139  	}
   140  	klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, e)
   141  	_, err = r.epAdapter.Update(metav1.NamespaceDefault, e)
   142  	return err
   143  }
   144  
   145  func (r *masterCountEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
   146  	r.reconcilingLock.Lock()
   147  	defer r.reconcilingLock.Unlock()
   148  
   149  	e, err := r.epAdapter.Get(metav1.NamespaceDefault, serviceName, metav1.GetOptions{})
   150  	if err != nil {
   151  		if errors.IsNotFound(err) {
   152  			// Endpoint doesn't exist
   153  			return nil
   154  		}
   155  		return err
   156  	}
   157  
   158  	if len(e.Subsets) == 0 {
   159  		// no action is needed to remove the endpoint
   160  		return nil
   161  	}
   162  	// Remove our IP from the list of addresses
   163  	new := []corev1.EndpointAddress{}
   164  	for _, addr := range e.Subsets[0].Addresses {
   165  		if addr.IP != ip.String() {
   166  			new = append(new, addr)
   167  		}
   168  	}
   169  	e.Subsets[0].Addresses = new
   170  	e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
   171  	err = retry.RetryOnConflict(retry.DefaultBackoff, func() error {
   172  		_, err := r.epAdapter.Update(metav1.NamespaceDefault, e)
   173  		return err
   174  	})
   175  	return err
   176  }
   177  
   178  func (r *masterCountEndpointReconciler) StopReconciling() {
   179  	r.reconcilingLock.Lock()
   180  	defer r.reconcilingLock.Unlock()
   181  	r.stopReconcilingCalled = true
   182  }
   183  
   184  func (r *masterCountEndpointReconciler) Destroy() {
   185  }
   186  
   187  // Determine if the endpoint is in the format ReconcileEndpoints expects.
   188  //
   189  // Return values:
   190  //   - formatCorrect is true if exactly one subset is found.
   191  //   - ipCorrect is true when current master's IP is found and the number
   192  //     of addresses is less than or equal to the master count.
   193  //   - portsCorrect is true when endpoint ports exactly match provided ports.
   194  //     portsCorrect is only evaluated when reconcilePorts is set to true.
   195  func checkEndpointSubsetFormat(e *corev1.Endpoints, ip string, ports []corev1.EndpointPort, count int, reconcilePorts bool) (formatCorrect bool, ipCorrect bool, portsCorrect bool) {
   196  	if len(e.Subsets) != 1 {
   197  		return false, false, false
   198  	}
   199  	sub := &e.Subsets[0]
   200  	portsCorrect = true
   201  	if reconcilePorts {
   202  		if len(sub.Ports) != len(ports) {
   203  			portsCorrect = false
   204  		}
   205  		for i, port := range ports {
   206  			if len(sub.Ports) <= i || port != sub.Ports[i] {
   207  				portsCorrect = false
   208  				break
   209  			}
   210  		}
   211  	}
   212  	for _, addr := range sub.Addresses {
   213  		if addr.IP == ip {
   214  			ipCorrect = len(sub.Addresses) <= count
   215  			break
   216  		}
   217  	}
   218  	return true, ipCorrect, portsCorrect
   219  }
   220  

View as plain text