...

Source file src/k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice/controller.go

Documentation: k8s.io/kubernetes/pkg/controlplane/controller/kubernetesservice

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package kubernetesservice
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"net"
    23  	"net/http"
    24  	"sync"
    25  	"time"
    26  
    27  	corev1 "k8s.io/api/core/v1"
    28  	"k8s.io/apimachinery/pkg/api/errors"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/util/intstr"
    31  	"k8s.io/apimachinery/pkg/util/runtime"
    32  	"k8s.io/apimachinery/pkg/util/wait"
    33  	"k8s.io/apiserver/pkg/storage"
    34  	v1informers "k8s.io/client-go/informers/core/v1"
    35  	"k8s.io/client-go/kubernetes"
    36  	v1listers "k8s.io/client-go/listers/core/v1"
    37  	"k8s.io/client-go/tools/cache"
    38  	"k8s.io/klog/v2"
    39  
    40  	"k8s.io/kubernetes/pkg/controlplane/reconcilers"
    41  )
    42  
    43  const (
    44  	kubernetesServiceName = "kubernetes"
    45  )
    46  
    47  // Controller is the controller manager for the core bootstrap Kubernetes
    48  // controller loops, which manage creating the "kubernetes" service and
    49  // provide the IP repair check on service IPs
    50  type Controller struct {
    51  	Config
    52  
    53  	client        kubernetes.Interface
    54  	serviceLister v1listers.ServiceLister
    55  	serviceSynced cache.InformerSynced
    56  
    57  	lock   sync.Mutex
    58  	stopCh chan struct{} // closed by Stop()
    59  }
    60  
    61  type Config struct {
    62  	PublicIP net.IP
    63  
    64  	EndpointReconciler reconcilers.EndpointReconciler
    65  	EndpointInterval   time.Duration
    66  
    67  	// ServiceIP indicates where the kubernetes service will live.  It may not be nil.
    68  	ServiceIP                 net.IP
    69  	ServicePort               int
    70  	PublicServicePort         int
    71  	KubernetesServiceNodePort int
    72  }
    73  
    74  // New returns a controller for watching the kubernetes service endpoints.
    75  func New(config Config, client kubernetes.Interface, serviceInformer v1informers.ServiceInformer) *Controller {
    76  	return &Controller{
    77  		Config:        config,
    78  		client:        client,
    79  		serviceLister: serviceInformer.Lister(),
    80  		serviceSynced: serviceInformer.Informer().HasSynced,
    81  		stopCh:        make(chan struct{}),
    82  	}
    83  }
    84  
    85  // Start begins the core controller loops that must exist for bootstrapping
    86  // a cluster.
    87  func (c *Controller) Start(stopCh <-chan struct{}) {
    88  	if !cache.WaitForCacheSync(stopCh, c.serviceSynced) {
    89  		runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
    90  		return
    91  	}
    92  	// Reconcile during first run removing itself until server is ready.
    93  	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
    94  	if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err == nil {
    95  		klog.Error("Found stale data, removed previous endpoints on kubernetes service, apiserver didn't exit successfully previously")
    96  	} else if !storage.IsNotFound(err) {
    97  		klog.Errorf("Error removing old endpoints from kubernetes service: %v", err)
    98  	}
    99  
   100  	localStopCh := make(chan struct{})
   101  	go func() {
   102  		defer close(localStopCh)
   103  		select {
   104  		case <-stopCh: // from Start
   105  		case <-c.stopCh: // from Stop
   106  		}
   107  	}()
   108  
   109  	go c.Run(localStopCh)
   110  }
   111  
   112  // Stop cleans up this API Servers endpoint reconciliation leases so another master can take over more quickly.
   113  func (c *Controller) Stop() {
   114  	c.lock.Lock()
   115  	defer c.lock.Unlock()
   116  
   117  	select {
   118  	case <-c.stopCh:
   119  		return // only close once
   120  	default:
   121  		close(c.stopCh)
   122  	}
   123  
   124  	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
   125  	finishedReconciling := make(chan struct{})
   126  	go func() {
   127  		defer close(finishedReconciling)
   128  		klog.Infof("Shutting down kubernetes service endpoint reconciler")
   129  		c.EndpointReconciler.StopReconciling()
   130  		if err := c.EndpointReconciler.RemoveEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts); err != nil {
   131  			klog.Errorf("Unable to remove endpoints from kubernetes service: %v", err)
   132  		}
   133  		c.EndpointReconciler.Destroy()
   134  	}()
   135  
   136  	select {
   137  	case <-finishedReconciling:
   138  		// done
   139  	case <-time.After(2 * c.EndpointInterval):
   140  		// don't block server shutdown forever if we can't reach etcd to remove ourselves
   141  		klog.Warning("RemoveEndpoints() timed out")
   142  	}
   143  }
   144  
   145  // Run periodically updates the kubernetes service
   146  func (c *Controller) Run(ch <-chan struct{}) {
   147  	// wait until process is ready
   148  	wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
   149  		var code int
   150  		c.client.CoreV1().RESTClient().Get().AbsPath("/readyz").Do(context.TODO()).StatusCode(&code)
   151  		return code == http.StatusOK, nil
   152  	}, ch)
   153  
   154  	wait.NonSlidingUntil(func() {
   155  		// Service definition is not reconciled after first
   156  		// run, ports and type will be corrected only during
   157  		// start.
   158  		if err := c.UpdateKubernetesService(false); err != nil {
   159  			runtime.HandleError(fmt.Errorf("unable to sync kubernetes service: %v", err))
   160  		}
   161  	}, c.EndpointInterval, ch)
   162  }
   163  
   164  // UpdateKubernetesService attempts to update the default Kube service.
   165  func (c *Controller) UpdateKubernetesService(reconcile bool) error {
   166  	// Update service & endpoint records.
   167  	servicePorts, serviceType := createPortAndServiceSpec(c.ServicePort, c.PublicServicePort, c.KubernetesServiceNodePort, "https")
   168  	if err := c.CreateOrUpdateMasterServiceIfNeeded(kubernetesServiceName, c.ServiceIP, servicePorts, serviceType, reconcile); err != nil {
   169  		return err
   170  	}
   171  	endpointPorts := createEndpointPortSpec(c.PublicServicePort, "https")
   172  	if err := c.EndpointReconciler.ReconcileEndpoints(kubernetesServiceName, c.PublicIP, endpointPorts, reconcile); err != nil {
   173  		return err
   174  	}
   175  	return nil
   176  }
   177  
   178  // createPortAndServiceSpec creates an array of service ports.
   179  // If the NodePort value is 0, just the servicePort is used, otherwise, a node port is exposed.
   180  func createPortAndServiceSpec(servicePort int, targetServicePort int, nodePort int, servicePortName string) ([]corev1.ServicePort, corev1.ServiceType) {
   181  	// Use the Cluster IP type for the service port if NodePort isn't provided.
   182  	// Otherwise, we will be binding the master service to a NodePort.
   183  	servicePorts := []corev1.ServicePort{{
   184  		Protocol:   corev1.ProtocolTCP,
   185  		Port:       int32(servicePort),
   186  		Name:       servicePortName,
   187  		TargetPort: intstr.FromInt32(int32(targetServicePort)),
   188  	}}
   189  	serviceType := corev1.ServiceTypeClusterIP
   190  	if nodePort > 0 {
   191  		servicePorts[0].NodePort = int32(nodePort)
   192  		serviceType = corev1.ServiceTypeNodePort
   193  	}
   194  	return servicePorts, serviceType
   195  }
   196  
   197  // createEndpointPortSpec creates the endpoint ports
   198  func createEndpointPortSpec(endpointPort int, endpointPortName string) []corev1.EndpointPort {
   199  	return []corev1.EndpointPort{{
   200  		Protocol: corev1.ProtocolTCP,
   201  		Port:     int32(endpointPort),
   202  		Name:     endpointPortName,
   203  	}}
   204  }
   205  
   206  // CreateOrUpdateMasterServiceIfNeeded will create the specified service if it
   207  // doesn't already exist.
   208  func (c *Controller) CreateOrUpdateMasterServiceIfNeeded(serviceName string, serviceIP net.IP, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType, reconcile bool) error {
   209  	if s, err := c.serviceLister.Services(metav1.NamespaceDefault).Get(serviceName); err == nil {
   210  		// The service already exists.
   211  		// This path is no executed since 1.17 2a9a9fa, keeping it in case it needs to be revisited
   212  		if reconcile {
   213  			if svc, updated := getMasterServiceUpdateIfNeeded(s, servicePorts, serviceType); updated {
   214  				klog.Warningf("Resetting master service %q to %#v", serviceName, svc)
   215  				_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Update(context.TODO(), svc, metav1.UpdateOptions{})
   216  				return err
   217  			}
   218  		}
   219  		return nil
   220  	}
   221  	singleStack := corev1.IPFamilyPolicySingleStack
   222  	svc := &corev1.Service{
   223  		ObjectMeta: metav1.ObjectMeta{
   224  			Name:      serviceName,
   225  			Namespace: metav1.NamespaceDefault,
   226  			Labels:    map[string]string{"provider": "kubernetes", "component": "apiserver"},
   227  		},
   228  		Spec: corev1.ServiceSpec{
   229  			Ports: servicePorts,
   230  			// maintained by this code, not by the pod selector
   231  			Selector:        nil,
   232  			ClusterIP:       serviceIP.String(),
   233  			IPFamilyPolicy:  &singleStack,
   234  			SessionAffinity: corev1.ServiceAffinityNone,
   235  			Type:            serviceType,
   236  		},
   237  	}
   238  
   239  	_, err := c.client.CoreV1().Services(metav1.NamespaceDefault).Create(context.TODO(), svc, metav1.CreateOptions{})
   240  	if errors.IsAlreadyExists(err) {
   241  		return c.CreateOrUpdateMasterServiceIfNeeded(serviceName, serviceIP, servicePorts, serviceType, reconcile)
   242  	}
   243  	return err
   244  }
   245  
   246  // getMasterServiceUpdateIfNeeded sets service attributes for the given apiserver service.
   247  func getMasterServiceUpdateIfNeeded(svc *corev1.Service, servicePorts []corev1.ServicePort, serviceType corev1.ServiceType) (s *corev1.Service, updated bool) {
   248  	// Determine if the service is in the format we expect
   249  	// (servicePorts are present and service type matches)
   250  	formatCorrect := checkServiceFormat(svc, servicePorts, serviceType)
   251  	if formatCorrect {
   252  		return svc, false
   253  	}
   254  	svc.Spec.Ports = servicePorts
   255  	svc.Spec.Type = serviceType
   256  	return svc, true
   257  }
   258  
   259  // Determine if the service is in the correct format
   260  // getMasterServiceUpdateIfNeeded expects (servicePorts are correct
   261  // and service type matches).
   262  func checkServiceFormat(s *corev1.Service, ports []corev1.ServicePort, serviceType corev1.ServiceType) (formatCorrect bool) {
   263  	if s.Spec.Type != serviceType {
   264  		return false
   265  	}
   266  	if len(ports) != len(s.Spec.Ports) {
   267  		return false
   268  	}
   269  	for i, port := range ports {
   270  		if port != s.Spec.Ports[i] {
   271  			return false
   272  		}
   273  	}
   274  	return true
   275  }
   276  

View as plain text