
Source file src/k8s.io/kubernetes/pkg/controlplane/reconcilers/lease.go

Documentation: k8s.io/kubernetes/pkg/controlplane/reconcilers

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     8      http://www.apache.org/licenses/LICENSE-2.0
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    17  package reconcilers
    19  /*
    20  Original Source:
    21  https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler.go
    22  */
    24  import (
    25  	"fmt"
    26  	"net"
    27  	"path"
    28  	"sync"
    29  	"sync/atomic"
    30  	"time"
    32  	"k8s.io/klog/v2"
    34  	corev1 "k8s.io/api/core/v1"
    35  	"k8s.io/apimachinery/pkg/api/errors"
    36  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    37  	kruntime "k8s.io/apimachinery/pkg/runtime"
    38  	apirequest "k8s.io/apiserver/pkg/endpoints/request"
    39  	"k8s.io/apiserver/pkg/registry/rest"
    40  	"k8s.io/apiserver/pkg/storage"
    41  	"k8s.io/apiserver/pkg/storage/storagebackend"
    42  	storagefactory "k8s.io/apiserver/pkg/storage/storagebackend/factory"
    43  	endpointsv1 "k8s.io/kubernetes/pkg/api/v1/endpoints"
    44  )
    46  // Leases is an interface which assists in managing the set of active masters
    47  type Leases interface {
    48  	// ListLeases retrieves a list of the current master IPs
    49  	ListLeases() ([]string, error)
    51  	// UpdateLease adds or refreshes a master's lease
    52  	UpdateLease(ip string) error
    54  	// RemoveLease removes a master's lease
    55  	RemoveLease(ip string) error
    57  	// Destroy cleans up everything on shutdown.
    58  	Destroy()
    59  }
    61  type storageLeases struct {
    62  	storage   storage.Interface
    63  	destroyFn func()
    64  	baseKey   string
    65  	leaseTime time.Duration
    66  }
    68  var _ Leases = &storageLeases{}
    70  // ListLeases retrieves a list of the current master IPs from storage
    71  func (s *storageLeases) ListLeases() ([]string, error) {
    72  	ipInfoList := &corev1.EndpointsList{}
    73  	storageOpts := storage.ListOptions{
    74  		ResourceVersion:      "0",
    75  		ResourceVersionMatch: metav1.ResourceVersionMatchNotOlderThan,
    76  		Predicate:            storage.Everything,
    77  		Recursive:            true,
    78  	}
    79  	if err := s.storage.GetList(apirequest.NewDefaultContext(), s.baseKey, storageOpts, ipInfoList); err != nil {
    80  		return nil, err
    81  	}
    83  	ipList := make([]string, 0, len(ipInfoList.Items))
    84  	for _, ip := range ipInfoList.Items {
    85  		if len(ip.Subsets) > 0 && len(ip.Subsets[0].Addresses) > 0 && len(ip.Subsets[0].Addresses[0].IP) > 0 {
    86  			ipList = append(ipList, ip.Subsets[0].Addresses[0].IP)
    87  		}
    88  	}
    90  	klog.V(6).Infof("Current master IPs listed in storage are %v", ipList)
    92  	return ipList, nil
    93  }
    95  // UpdateLease resets the TTL on a master IP in storage
    96  // UpdateLease will create a new key if it doesn't exist.
    97  func (s *storageLeases) UpdateLease(ip string) error {
    98  	key := path.Join(s.baseKey, ip)
    99  	return s.storage.GuaranteedUpdate(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, true, nil, func(input kruntime.Object, respMeta storage.ResponseMeta) (kruntime.Object, *uint64, error) {
   100  		// just make sure we've got the right IP set, and then refresh the TTL
   101  		existing := input.(*corev1.Endpoints)
   102  		existing.Subsets = []corev1.EndpointSubset{
   103  			{
   104  				Addresses: []corev1.EndpointAddress{{IP: ip}},
   105  			},
   106  		}
   108  		// leaseTime needs to be in seconds
   109  		leaseTime := uint64(s.leaseTime / time.Second)
   111  		// NB: GuaranteedUpdate does not perform the store operation unless
   112  		// something changed between load and store (not including resource
   113  		// version), meaning we can't refresh the TTL without actually
   114  		// changing a field.
   115  		existing.Generation++
   117  		klog.V(6).Infof("Resetting TTL on master IP %q listed in storage to %v", ip, leaseTime)
   119  		return existing, &leaseTime, nil
   120  	}, nil)
   121  }
   123  // RemoveLease removes the lease on a master IP in storage
   124  func (s *storageLeases) RemoveLease(ip string) error {
   125  	key := path.Join(s.baseKey, ip)
   126  	return s.storage.Delete(apirequest.NewDefaultContext(), key, &corev1.Endpoints{}, nil, rest.ValidateAllObjectFunc, nil)
   127  }
   129  func (s *storageLeases) Destroy() {
   130  	s.destroyFn()
   131  }
   133  // NewLeases creates a new etcd-based Leases implementation.
   134  func NewLeases(config *storagebackend.ConfigForResource, baseKey string, leaseTime time.Duration) (Leases, error) {
   135  	// note that newFunc, newListFunc and resourcePrefix
   136  	// can be left blank unless the storage.Watch method is used
   137  	leaseStorage, destroyFn, err := storagefactory.Create(*config, nil, nil, "")
   138  	if err != nil {
   139  		return nil, fmt.Errorf("error creating storage factory: %v", err)
   140  	}
   141  	var once sync.Once
   142  	return &storageLeases{
   143  		storage:   leaseStorage,
   144  		destroyFn: func() { once.Do(destroyFn) },
   145  		baseKey:   baseKey,
   146  		leaseTime: leaseTime,
   147  	}, nil
   148  }
   150  type leaseEndpointReconciler struct {
   151  	epAdapter             EndpointsAdapter
   152  	masterLeases          Leases
   153  	stopReconcilingCalled atomic.Bool
   154  	reconcilingLock       sync.Mutex
   155  }
   157  // NewLeaseEndpointReconciler creates a new LeaseEndpoint reconciler
   158  func NewLeaseEndpointReconciler(epAdapter EndpointsAdapter, masterLeases Leases) EndpointReconciler {
   159  	return &leaseEndpointReconciler{
   160  		epAdapter:    epAdapter,
   161  		masterLeases: masterLeases,
   162  	}
   163  }
   165  // ReconcileEndpoints lists keys in a special etcd directory.
   166  // Each key is expected to have a TTL of R+n, where R is the refresh interval
   167  // at which this function is called, and n is some small value.  If an
   168  // apiserver goes down, it will fail to refresh its key's TTL and the key will
   169  // expire. ReconcileEndpoints will notice that the endpoints object is
   170  // different from the directory listing, and update the endpoints object
   171  // accordingly.
   172  func (r *leaseEndpointReconciler) ReconcileEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
   173  	// reconcile endpoints only if apiserver was not shutdown
   174  	if r.stopReconcilingCalled.Load() {
   175  		return nil
   176  	}
   178  	// Ensure that there will be no race condition with the RemoveEndpoints.
   179  	r.reconcilingLock.Lock()
   180  	defer r.reconcilingLock.Unlock()
   182  	// Refresh the TTL on our key, independently of whether any error or
   183  	// update conflict happens below. This makes sure that at least some of
   184  	// the masters will add our endpoint.
   185  	if err := r.masterLeases.UpdateLease(ip.String()); err != nil {
   186  		return err
   187  	}
   189  	return r.doReconcile(serviceName, endpointPorts, reconcilePorts)
   190  }
   192  // doReconcile can be called from ReconcileEndpoints() or RemoveEndpoints().
   193  // it is NOT SAFE to call it from multiple goroutines.
   194  func (r *leaseEndpointReconciler) doReconcile(serviceName string, endpointPorts []corev1.EndpointPort, reconcilePorts bool) error {
   195  	e, err := r.epAdapter.Get(corev1.NamespaceDefault, serviceName, metav1.GetOptions{})
   196  	shouldCreate := false
   197  	if err != nil {
   198  		if !errors.IsNotFound(err) {
   199  			return err
   200  		}
   202  		// there are no endpoints and we should stop reconciling
   203  		if r.stopReconcilingCalled.Load() {
   204  			return nil
   205  		}
   207  		shouldCreate = true
   208  		e = &corev1.Endpoints{
   209  			ObjectMeta: metav1.ObjectMeta{
   210  				Name:      serviceName,
   211  				Namespace: corev1.NamespaceDefault,
   212  			},
   213  		}
   214  	}
   216  	// ... and the list of master IP keys from etcd
   217  	masterIPs, err := r.masterLeases.ListLeases()
   218  	if err != nil {
   219  		return err
   220  	}
   222  	// Since we just refreshed our own key, assume that zero endpoints
   223  	// returned from storage indicates an issue or invalid state, and thus do
   224  	// not update the endpoints list based on the result.
   225  	// If the controller was ordered to stop and is this is the last apiserver
   226  	// we keep going to remove our endpoint before shutting down.
   227  	if !r.stopReconcilingCalled.Load() && len(masterIPs) == 0 {
   228  		return fmt.Errorf("no API server IP addresses were listed in storage, refusing to erase all endpoints for the kubernetes Service")
   229  	}
   231  	// Don't use the EndpointSliceMirroring controller to mirror this to
   232  	// EndpointSlices. This may change in the future.
   233  	skipMirrorChanged := setSkipMirrorTrue(e)
   235  	// Next, we compare the current list of endpoints with the list of master IP keys
   236  	formatCorrect, ipCorrect, portsCorrect := checkEndpointSubsetFormatWithLease(e, masterIPs, endpointPorts, reconcilePorts)
   237  	if !skipMirrorChanged && formatCorrect && ipCorrect && portsCorrect {
   238  		return r.epAdapter.EnsureEndpointSliceFromEndpoints(corev1.NamespaceDefault, e)
   239  	}
   241  	if !formatCorrect {
   242  		// Something is egregiously wrong, just re-make the endpoints record.
   243  		e.Subsets = []corev1.EndpointSubset{{
   244  			Addresses: []corev1.EndpointAddress{},
   245  			Ports:     endpointPorts,
   246  		}}
   247  	}
   249  	if !formatCorrect || !ipCorrect {
   250  		// repopulate the addresses according to the expected IPs from etcd
   251  		e.Subsets[0].Addresses = make([]corev1.EndpointAddress, len(masterIPs))
   252  		for ind, ip := range masterIPs {
   253  			e.Subsets[0].Addresses[ind] = corev1.EndpointAddress{IP: ip}
   254  		}
   256  		// Lexicographic order is retained by this step.
   257  		e.Subsets = endpointsv1.RepackSubsets(e.Subsets)
   258  	}
   260  	if len(e.Subsets) != 0 && !portsCorrect {
   261  		// Reset ports.
   262  		e.Subsets[0].Ports = endpointPorts
   263  	}
   265  	klog.Warningf("Resetting endpoints for master service %q to %v", serviceName, masterIPs)
   266  	if shouldCreate {
   267  		if _, err = r.epAdapter.Create(corev1.NamespaceDefault, e); errors.IsAlreadyExists(err) {
   268  			err = nil
   269  		}
   270  	} else {
   271  		_, err = r.epAdapter.Update(corev1.NamespaceDefault, e)
   272  	}
   273  	return err
   274  }
   276  // checkEndpointSubsetFormatWithLease determines if the endpoint is in the
   277  // format ReconcileEndpoints expects when the controller is using leases.
   278  //
   279  // Return values:
   280  //   - formatCorrect is true if exactly one subset is found.
   281  //   - ipsCorrect when the addresses in the endpoints match the expected addresses list
   282  //   - portsCorrect is true when endpoint ports exactly match provided ports.
   283  //     portsCorrect is only evaluated when reconcilePorts is set to true.
   284  func checkEndpointSubsetFormatWithLease(e *corev1.Endpoints, expectedIPs []string, ports []corev1.EndpointPort, reconcilePorts bool) (formatCorrect bool, ipsCorrect bool, portsCorrect bool) {
   285  	if len(e.Subsets) != 1 {
   286  		return false, false, false
   287  	}
   288  	sub := &e.Subsets[0]
   289  	portsCorrect = true
   290  	if reconcilePorts {
   291  		if len(sub.Ports) != len(ports) {
   292  			portsCorrect = false
   293  		} else {
   294  			for i, port := range ports {
   295  				if port != sub.Ports[i] {
   296  					portsCorrect = false
   297  					break
   298  				}
   299  			}
   300  		}
   301  	}
   303  	ipsCorrect = true
   304  	if len(sub.Addresses) != len(expectedIPs) {
   305  		ipsCorrect = false
   306  	} else {
   307  		// check the actual content of the addresses
   308  		// present addrs is used as a set (the keys) and to indicate if a
   309  		// value was already found (the values)
   310  		presentAddrs := make(map[string]bool, len(expectedIPs))
   311  		for _, ip := range expectedIPs {
   312  			presentAddrs[ip] = false
   313  		}
   315  		// uniqueness is assumed amongst all Addresses.
   316  		for _, addr := range sub.Addresses {
   317  			if alreadySeen, ok := presentAddrs[addr.IP]; alreadySeen || !ok {
   318  				ipsCorrect = false
   319  				break
   320  			}
   322  			presentAddrs[addr.IP] = true
   323  		}
   324  	}
   326  	return true, ipsCorrect, portsCorrect
   327  }
   329  func (r *leaseEndpointReconciler) RemoveEndpoints(serviceName string, ip net.IP, endpointPorts []corev1.EndpointPort) error {
   330  	// Ensure that there will be no race condition with the ReconcileEndpoints.
   331  	r.reconcilingLock.Lock()
   332  	defer r.reconcilingLock.Unlock()
   334  	if err := r.masterLeases.RemoveLease(ip.String()); err != nil {
   335  		return err
   336  	}
   338  	return r.doReconcile(serviceName, endpointPorts, true)
   339  }
   341  func (r *leaseEndpointReconciler) StopReconciling() {
   342  	r.stopReconcilingCalled.Store(true)
   343  }
   345  func (r *leaseEndpointReconciler) Destroy() {
   346  	r.masterLeases.Destroy()
   347  }

View as plain text