...

Source file src/k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr/default_servicecidr_controller.go

Documentation: k8s.io/kubernetes/pkg/controlplane/controller/defaultservicecidr

     1  /*
     2  Copyright 2023 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package defaultservicecidr
    18  
    19  import (
    20  	"context"
    21  	"net"
    22  	"reflect"
    23  	"time"
    24  
    25  	v1 "k8s.io/api/core/v1"
    26  	networkingapiv1alpha1 "k8s.io/api/networking/v1alpha1"
    27  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/fields"
    30  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    31  	"k8s.io/apimachinery/pkg/util/wait"
    32  	metav1apply "k8s.io/client-go/applyconfigurations/meta/v1"
    33  	networkingapiv1alpha1apply "k8s.io/client-go/applyconfigurations/networking/v1alpha1"
    34  	networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  	"k8s.io/client-go/kubernetes/scheme"
    37  	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
    38  	networkingv1alpha1listers "k8s.io/client-go/listers/networking/v1alpha1"
    39  	"k8s.io/client-go/tools/cache"
    40  	"k8s.io/client-go/tools/record"
    41  	"k8s.io/klog/v2"
    42  )
    43  
    44  const (
    45  	controllerName         = "kubernetes-service-cidr-controller"
    46  	DefaultServiceCIDRName = "kubernetes"
    47  )
    48  
    49  // NewController returns a new *Controller that generates the default ServiceCIDR
    50  // from the `--service-cluster-ip-range` flag and recreates it if necessary,
    51  // but doesn't update it if is different.
    52  // It follows the same logic that the kubernetes.default Service.
    53  func NewController(
    54  	primaryRange net.IPNet,
    55  	secondaryRange net.IPNet,
    56  	client clientset.Interface,
    57  ) *Controller {
    58  	broadcaster := record.NewBroadcaster()
    59  	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: controllerName})
    60  
    61  	c := &Controller{
    62  		client:   client,
    63  		interval: 10 * time.Second, // same as DefaultEndpointReconcilerInterval
    64  	}
    65  
    66  	// obtain configuration from flags
    67  	c.cidrs = append(c.cidrs, primaryRange.String())
    68  	if secondaryRange.IP != nil {
    69  		c.cidrs = append(c.cidrs, secondaryRange.String())
    70  	}
    71  	// instead of using the shared informers from the controlplane instance, we construct our own informer
    72  	// because we need such a small subset of the information available, only the kubernetes.default ServiceCIDR
    73  	c.serviceCIDRInformer = networkingv1alpha1informers.NewFilteredServiceCIDRInformer(client, 12*time.Hour,
    74  		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
    75  		func(options *metav1.ListOptions) {
    76  			options.FieldSelector = fields.OneTermEqualSelector("metadata.name", DefaultServiceCIDRName).String()
    77  		})
    78  
    79  	c.serviceCIDRLister = networkingv1alpha1listers.NewServiceCIDRLister(c.serviceCIDRInformer.GetIndexer())
    80  	c.serviceCIDRsSynced = c.serviceCIDRInformer.HasSynced
    81  
    82  	c.eventBroadcaster = broadcaster
    83  	c.eventRecorder = recorder
    84  
    85  	return c
    86  }
    87  
    88  // Controller manages selector-based service ipAddress.
    89  type Controller struct {
    90  	cidrs []string // order matters, first cidr defines the default IP family
    91  
    92  	client           clientset.Interface
    93  	eventBroadcaster record.EventBroadcaster
    94  	eventRecorder    record.EventRecorder
    95  
    96  	serviceCIDRInformer cache.SharedIndexInformer
    97  	serviceCIDRLister   networkingv1alpha1listers.ServiceCIDRLister
    98  	serviceCIDRsSynced  cache.InformerSynced
    99  
   100  	interval time.Duration
   101  }
   102  
   103  // Start will not return until the default ServiceCIDR exists or stopCh is closed.
   104  func (c *Controller) Start(stopCh <-chan struct{}) {
   105  	defer utilruntime.HandleCrash()
   106  
   107  	c.eventBroadcaster.StartStructuredLogging(0)
   108  	c.eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: c.client.CoreV1().Events("")})
   109  	defer c.eventBroadcaster.Shutdown()
   110  
   111  	klog.Infof("Starting %s", controllerName)
   112  	defer klog.Infof("Shutting down %s", controllerName)
   113  
   114  	go c.serviceCIDRInformer.Run(stopCh)
   115  	if !cache.WaitForNamedCacheSync(controllerName, stopCh, c.serviceCIDRsSynced) {
   116  		return
   117  	}
   118  
   119  	// derive a context from the stopCh so we can cancel the poll loop
   120  	ctx := wait.ContextForChannel(stopCh)
   121  	// wait until first successfully sync
   122  	// this blocks apiserver startup so poll with a short interval
   123  	err := wait.PollUntilContextCancel(ctx, 100*time.Millisecond, true, func(ctx context.Context) (bool, error) {
   124  		syncErr := c.sync()
   125  		return syncErr == nil, nil
   126  	})
   127  	if err != nil {
   128  		klog.Infof("error initializing the default ServiceCIDR: %v", err)
   129  
   130  	}
   131  
   132  	// run the sync loop in the background with the defined interval
   133  	go wait.Until(func() {
   134  		err := c.sync()
   135  		if err != nil {
   136  			klog.Infof("error trying to sync the default ServiceCIDR: %v", err)
   137  		}
   138  	}, c.interval, stopCh)
   139  }
   140  
   141  func (c *Controller) sync() error {
   142  	// check if the default ServiceCIDR already exist
   143  	serviceCIDR, err := c.serviceCIDRLister.Get(DefaultServiceCIDRName)
   144  	// if exists
   145  	if err == nil {
   146  		c.syncStatus(serviceCIDR)
   147  		return nil
   148  	}
   149  
   150  	// unknown error
   151  	if !apierrors.IsNotFound(err) {
   152  		return err
   153  	}
   154  
   155  	// default ServiceCIDR does not exist
   156  	klog.Infof("Creating default ServiceCIDR with CIDRs: %v", c.cidrs)
   157  	serviceCIDR = &networkingapiv1alpha1.ServiceCIDR{
   158  		ObjectMeta: metav1.ObjectMeta{
   159  			Name: DefaultServiceCIDRName,
   160  		},
   161  		Spec: networkingapiv1alpha1.ServiceCIDRSpec{
   162  			CIDRs: c.cidrs,
   163  		},
   164  	}
   165  	serviceCIDR, err = c.client.NetworkingV1alpha1().ServiceCIDRs().Create(context.Background(), serviceCIDR, metav1.CreateOptions{})
   166  	if err != nil && !apierrors.IsAlreadyExists(err) {
   167  		c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, "KubernetesDefaultServiceCIDRError", "The default ServiceCIDR can not be created")
   168  		return err
   169  	}
   170  	c.syncStatus(serviceCIDR)
   171  	return nil
   172  }
   173  
   174  func (c *Controller) syncStatus(serviceCIDR *networkingapiv1alpha1.ServiceCIDR) {
   175  	// don't sync the status of the ServiceCIDR if is being deleted,
   176  	// deletion must be handled by the controller-manager
   177  	if !serviceCIDR.GetDeletionTimestamp().IsZero() {
   178  		return
   179  	}
   180  
   181  	// This controller will set the Ready condition to true if the Ready condition
   182  	// does not exist and the CIDR values match this controller CIDR values.
   183  	for _, condition := range serviceCIDR.Status.Conditions {
   184  		if condition.Type == networkingapiv1alpha1.ServiceCIDRConditionReady {
   185  			if condition.Status == metav1.ConditionTrue {
   186  				return
   187  			}
   188  			klog.Infof("default ServiceCIDR condition Ready is not True: %v", condition.Status)
   189  			c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, condition.Reason, condition.Message)
   190  			return
   191  		}
   192  	}
   193  	// set status to ready if the ServiceCIDR matches this configuration
   194  	if reflect.DeepEqual(c.cidrs, serviceCIDR.Spec.CIDRs) {
   195  		klog.Infof("Setting default ServiceCIDR condition Ready to True")
   196  		svcApplyStatus := networkingapiv1alpha1apply.ServiceCIDRStatus().WithConditions(
   197  			metav1apply.Condition().
   198  				WithType(networkingapiv1alpha1.ServiceCIDRConditionReady).
   199  				WithStatus(metav1.ConditionTrue).
   200  				WithMessage("Kubernetes default Service CIDR is ready").
   201  				WithLastTransitionTime(metav1.Now()))
   202  		svcApply := networkingapiv1alpha1apply.ServiceCIDR(DefaultServiceCIDRName).WithStatus(svcApplyStatus)
   203  		if _, errApply := c.client.NetworkingV1alpha1().ServiceCIDRs().ApplyStatus(context.Background(), svcApply, metav1.ApplyOptions{FieldManager: controllerName, Force: true}); errApply != nil {
   204  			klog.Infof("error updating default ServiceCIDR status: %v", errApply)
   205  			c.eventRecorder.Eventf(serviceCIDR, v1.EventTypeWarning, "KubernetesDefaultServiceCIDRError", "The default ServiceCIDR Status can not be set to Ready=True")
   206  		}
   207  	}
   208  }
   209  

View as plain text