...

Source file src/k8s.io/kubernetes/pkg/proxy/config/config.go

Documentation: k8s.io/kubernetes/pkg/proxy/config

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package config
    18  
    19  import (
    20  	"fmt"
    21  	"sync"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	discoveryv1 "k8s.io/api/discovery/v1"
    26  	networkingv1alpha1 "k8s.io/api/networking/v1alpha1"
    27  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    28  	"k8s.io/apimachinery/pkg/util/sets"
    29  	v1informers "k8s.io/client-go/informers/core/v1"
    30  	discoveryv1informers "k8s.io/client-go/informers/discovery/v1"
    31  	networkingv1alpha1informers "k8s.io/client-go/informers/networking/v1alpha1"
    32  	"k8s.io/client-go/tools/cache"
    33  	"k8s.io/klog/v2"
    34  )
    35  
// ServiceHandler is an abstract interface of objects which receive
// notifications about service object changes. Implementations are
// registered with ServiceConfig.RegisterEventHandler.
type ServiceHandler interface {
	// OnServiceAdd is called whenever creation of a new service object
	// is observed.
	OnServiceAdd(service *v1.Service)
	// OnServiceUpdate is called whenever modification of an existing
	// service object is observed. oldService is the previously observed
	// state and service is the new state.
	OnServiceUpdate(oldService, service *v1.Service)
	// OnServiceDelete is called whenever deletion of an existing service
	// object is observed.
	OnServiceDelete(service *v1.Service)
	// OnServiceSynced is called once all the initial event handlers were
	// called and the state is fully propagated to local cache.
	OnServiceSynced()
}
    52  
// EndpointSliceHandler is an abstract interface of objects which receive
// notifications about endpoint slice object changes. Implementations are
// registered with EndpointSliceConfig.RegisterEventHandler.
type EndpointSliceHandler interface {
	// OnEndpointSliceAdd is called whenever creation of a new endpoint slice
	// object is observed.
	OnEndpointSliceAdd(endpointSlice *discoveryv1.EndpointSlice)
	// OnEndpointSliceUpdate is called whenever modification of an existing
	// endpoint slice object is observed. oldEndpointSlice is the previously
	// observed state and newEndpointSlice is the new state.
	OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice *discoveryv1.EndpointSlice)
	// OnEndpointSliceDelete is called whenever deletion of an existing
	// endpoint slice object is observed.
	OnEndpointSliceDelete(endpointSlice *discoveryv1.EndpointSlice)
	// OnEndpointSlicesSynced is called once all the initial event handlers were
	// called and the state is fully propagated to local cache.
	OnEndpointSlicesSynced()
}
    69  
// EndpointSliceConfig fans out endpoint slice informer events to a list of
// registered EndpointSliceHandlers. listerSynced is the predicate Run waits
// on before announcing the initial sync; eventHandlers holds the handlers in
// registration order.
type EndpointSliceConfig struct {
	listerSynced  cache.InformerSynced
	eventHandlers []EndpointSliceHandler
}
    75  
    76  // NewEndpointSliceConfig creates a new EndpointSliceConfig.
    77  func NewEndpointSliceConfig(endpointSliceInformer discoveryv1informers.EndpointSliceInformer, resyncPeriod time.Duration) *EndpointSliceConfig {
    78  	result := &EndpointSliceConfig{
    79  		listerSynced: endpointSliceInformer.Informer().HasSynced,
    80  	}
    81  
    82  	_, _ = endpointSliceInformer.Informer().AddEventHandlerWithResyncPeriod(
    83  		cache.ResourceEventHandlerFuncs{
    84  			AddFunc:    result.handleAddEndpointSlice,
    85  			UpdateFunc: result.handleUpdateEndpointSlice,
    86  			DeleteFunc: result.handleDeleteEndpointSlice,
    87  		},
    88  		resyncPeriod,
    89  	)
    90  
    91  	return result
    92  }
    93  
// RegisterEventHandler registers a handler which is called on every endpoint slice change.
// The handler is appended to the list, so handlers are invoked in registration
// order. No locking is done here.
func (c *EndpointSliceConfig) RegisterEventHandler(handler EndpointSliceHandler) {
	c.eventHandlers = append(c.eventHandlers, handler)
}
    98  
    99  // Run waits for cache synced and invokes handlers after syncing.
   100  func (c *EndpointSliceConfig) Run(stopCh <-chan struct{}) {
   101  	klog.InfoS("Starting endpoint slice config controller")
   102  
   103  	if !cache.WaitForNamedCacheSync("endpoint slice config", stopCh, c.listerSynced) {
   104  		return
   105  	}
   106  
   107  	for _, h := range c.eventHandlers {
   108  		klog.V(3).InfoS("Calling handler.OnEndpointSlicesSynced()")
   109  		h.OnEndpointSlicesSynced()
   110  	}
   111  }
   112  
   113  func (c *EndpointSliceConfig) handleAddEndpointSlice(obj interface{}) {
   114  	endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
   115  	if !ok {
   116  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
   117  		return
   118  	}
   119  	for _, h := range c.eventHandlers {
   120  		klog.V(4).InfoS("Calling handler.OnEndpointSliceAdd", "endpoints", klog.KObj(endpointSlice))
   121  		h.OnEndpointSliceAdd(endpointSlice)
   122  	}
   123  }
   124  
   125  func (c *EndpointSliceConfig) handleUpdateEndpointSlice(oldObj, newObj interface{}) {
   126  	oldEndpointSlice, ok := oldObj.(*discoveryv1.EndpointSlice)
   127  	if !ok {
   128  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
   129  		return
   130  	}
   131  	newEndpointSlice, ok := newObj.(*discoveryv1.EndpointSlice)
   132  	if !ok {
   133  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", newObj))
   134  		return
   135  	}
   136  	for _, h := range c.eventHandlers {
   137  		klog.V(4).InfoS("Calling handler.OnEndpointSliceUpdate")
   138  		h.OnEndpointSliceUpdate(oldEndpointSlice, newEndpointSlice)
   139  	}
   140  }
   141  
   142  func (c *EndpointSliceConfig) handleDeleteEndpointSlice(obj interface{}) {
   143  	endpointSlice, ok := obj.(*discoveryv1.EndpointSlice)
   144  	if !ok {
   145  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   146  		if !ok {
   147  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
   148  			return
   149  		}
   150  		if endpointSlice, ok = tombstone.Obj.(*discoveryv1.EndpointSlice); !ok {
   151  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
   152  			return
   153  		}
   154  	}
   155  	for _, h := range c.eventHandlers {
   156  		klog.V(4).InfoS("Calling handler.OnEndpointsDelete")
   157  		h.OnEndpointSliceDelete(endpointSlice)
   158  	}
   159  }
   160  
// ServiceConfig fans out service informer events to a list of registered
// ServiceHandlers. listerSynced is the predicate Run waits on before
// announcing the initial sync; eventHandlers holds the handlers in
// registration order.
type ServiceConfig struct {
	listerSynced  cache.InformerSynced
	eventHandlers []ServiceHandler
}
   166  
   167  // NewServiceConfig creates a new ServiceConfig.
   168  func NewServiceConfig(serviceInformer v1informers.ServiceInformer, resyncPeriod time.Duration) *ServiceConfig {
   169  	result := &ServiceConfig{
   170  		listerSynced: serviceInformer.Informer().HasSynced,
   171  	}
   172  
   173  	_, _ = serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
   174  		cache.ResourceEventHandlerFuncs{
   175  			AddFunc:    result.handleAddService,
   176  			UpdateFunc: result.handleUpdateService,
   177  			DeleteFunc: result.handleDeleteService,
   178  		},
   179  		resyncPeriod,
   180  	)
   181  
   182  	return result
   183  }
   184  
// RegisterEventHandler registers a handler which is called on every service change.
// The handler is appended to the list, so handlers are invoked in registration
// order. No locking is done here.
func (c *ServiceConfig) RegisterEventHandler(handler ServiceHandler) {
	c.eventHandlers = append(c.eventHandlers, handler)
}
   189  
   190  // Run waits for cache synced and invokes handlers after syncing.
   191  func (c *ServiceConfig) Run(stopCh <-chan struct{}) {
   192  	klog.InfoS("Starting service config controller")
   193  
   194  	if !cache.WaitForNamedCacheSync("service config", stopCh, c.listerSynced) {
   195  		return
   196  	}
   197  
   198  	for i := range c.eventHandlers {
   199  		klog.V(3).InfoS("Calling handler.OnServiceSynced()")
   200  		c.eventHandlers[i].OnServiceSynced()
   201  	}
   202  }
   203  
   204  func (c *ServiceConfig) handleAddService(obj interface{}) {
   205  	service, ok := obj.(*v1.Service)
   206  	if !ok {
   207  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   208  		return
   209  	}
   210  	for i := range c.eventHandlers {
   211  		klog.V(4).InfoS("Calling handler.OnServiceAdd")
   212  		c.eventHandlers[i].OnServiceAdd(service)
   213  	}
   214  }
   215  
   216  func (c *ServiceConfig) handleUpdateService(oldObj, newObj interface{}) {
   217  	oldService, ok := oldObj.(*v1.Service)
   218  	if !ok {
   219  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
   220  		return
   221  	}
   222  	service, ok := newObj.(*v1.Service)
   223  	if !ok {
   224  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
   225  		return
   226  	}
   227  	for i := range c.eventHandlers {
   228  		klog.V(4).InfoS("Calling handler.OnServiceUpdate")
   229  		c.eventHandlers[i].OnServiceUpdate(oldService, service)
   230  	}
   231  }
   232  
   233  func (c *ServiceConfig) handleDeleteService(obj interface{}) {
   234  	service, ok := obj.(*v1.Service)
   235  	if !ok {
   236  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   237  		if !ok {
   238  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   239  			return
   240  		}
   241  		if service, ok = tombstone.Obj.(*v1.Service); !ok {
   242  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   243  			return
   244  		}
   245  	}
   246  	for i := range c.eventHandlers {
   247  		klog.V(4).InfoS("Calling handler.OnServiceDelete")
   248  		c.eventHandlers[i].OnServiceDelete(service)
   249  	}
   250  }
   251  
// NodeHandler is an abstract interface of objects which receive
// notifications about node object changes. Implementations are registered
// with NodeConfig.RegisterEventHandler.
type NodeHandler interface {
	// OnNodeAdd is called whenever creation of a new node object
	// is observed.
	OnNodeAdd(node *v1.Node)
	// OnNodeUpdate is called whenever modification of an existing
	// node object is observed. oldNode is the previously observed state
	// and node is the new state.
	OnNodeUpdate(oldNode, node *v1.Node)
	// OnNodeDelete is called whenever deletion of an existing node
	// object is observed.
	OnNodeDelete(node *v1.Node)
	// OnNodeSynced is called once all the initial event handlers were
	// called and the state is fully propagated to local cache.
	OnNodeSynced()
}
   268  
// NoopNodeHandler is a NodeHandler whose methods all do nothing. It is
// intended for proxiers that have not yet implemented a full NodeHandler.
type NoopNodeHandler struct{}

// OnNodeAdd is a noop handler for Node creates.
func (*NoopNodeHandler) OnNodeAdd(node *v1.Node) {}

// OnNodeUpdate is a noop handler for Node updates.
func (*NoopNodeHandler) OnNodeUpdate(oldNode, node *v1.Node) {}

// OnNodeDelete is a noop handler for Node deletes.
func (*NoopNodeHandler) OnNodeDelete(node *v1.Node) {}

// OnNodeSynced is a noop handler for Node syncs.
func (*NoopNodeHandler) OnNodeSynced() {}

// Compile-time assertion that *NoopNodeHandler implements NodeHandler.
var _ NodeHandler = &NoopNodeHandler{}
   286  
// NodeConfig fans out node informer events to a list of registered
// NodeHandlers. listerSynced is the predicate Run waits on before announcing
// the initial sync; eventHandlers holds the handlers in registration order.
type NodeConfig struct {
	listerSynced  cache.InformerSynced
	eventHandlers []NodeHandler
}
   293  
   294  // NewNodeConfig creates a new NodeConfig.
   295  func NewNodeConfig(nodeInformer v1informers.NodeInformer, resyncPeriod time.Duration) *NodeConfig {
   296  	result := &NodeConfig{
   297  		listerSynced: nodeInformer.Informer().HasSynced,
   298  	}
   299  
   300  	_, _ = nodeInformer.Informer().AddEventHandlerWithResyncPeriod(
   301  		cache.ResourceEventHandlerFuncs{
   302  			AddFunc:    result.handleAddNode,
   303  			UpdateFunc: result.handleUpdateNode,
   304  			DeleteFunc: result.handleDeleteNode,
   305  		},
   306  		resyncPeriod,
   307  	)
   308  
   309  	return result
   310  }
   311  
// RegisterEventHandler registers a handler which is called on every node change.
// The handler is appended to the list, so handlers are invoked in registration
// order. No locking is done here.
func (c *NodeConfig) RegisterEventHandler(handler NodeHandler) {
	c.eventHandlers = append(c.eventHandlers, handler)
}
   316  
   317  // Run starts the goroutine responsible for calling registered handlers.
   318  func (c *NodeConfig) Run(stopCh <-chan struct{}) {
   319  	klog.InfoS("Starting node config controller")
   320  
   321  	if !cache.WaitForNamedCacheSync("node config", stopCh, c.listerSynced) {
   322  		return
   323  	}
   324  
   325  	for i := range c.eventHandlers {
   326  		klog.V(3).InfoS("Calling handler.OnNodeSynced()")
   327  		c.eventHandlers[i].OnNodeSynced()
   328  	}
   329  }
   330  
   331  func (c *NodeConfig) handleAddNode(obj interface{}) {
   332  	node, ok := obj.(*v1.Node)
   333  	if !ok {
   334  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   335  		return
   336  	}
   337  	for i := range c.eventHandlers {
   338  		klog.V(4).InfoS("Calling handler.OnNodeAdd")
   339  		c.eventHandlers[i].OnNodeAdd(node)
   340  	}
   341  }
   342  
   343  func (c *NodeConfig) handleUpdateNode(oldObj, newObj interface{}) {
   344  	oldNode, ok := oldObj.(*v1.Node)
   345  	if !ok {
   346  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
   347  		return
   348  	}
   349  	node, ok := newObj.(*v1.Node)
   350  	if !ok {
   351  		utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
   352  		return
   353  	}
   354  	for i := range c.eventHandlers {
   355  		klog.V(5).InfoS("Calling handler.OnNodeUpdate")
   356  		c.eventHandlers[i].OnNodeUpdate(oldNode, node)
   357  	}
   358  }
   359  
   360  func (c *NodeConfig) handleDeleteNode(obj interface{}) {
   361  	node, ok := obj.(*v1.Node)
   362  	if !ok {
   363  		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
   364  		if !ok {
   365  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   366  			return
   367  		}
   368  		if node, ok = tombstone.Obj.(*v1.Node); !ok {
   369  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
   370  			return
   371  		}
   372  	}
   373  	for i := range c.eventHandlers {
   374  		klog.V(4).InfoS("Calling handler.OnNodeDelete")
   375  		c.eventHandlers[i].OnNodeDelete(node)
   376  	}
   377  }
   378  
// ServiceCIDRHandler is an abstract interface of objects which receive
// notifications about ServiceCIDR object changes. Implementations are
// registered with ServiceCIDRConfig.RegisterEventHandler.
type ServiceCIDRHandler interface {
	// OnServiceCIDRsChanged is called whenever a change is observed
	// in any of the ServiceCIDRs, and provides the complete list of
	// service CIDRs currently tracked (in no particular order).
	OnServiceCIDRsChanged(cidrs []string)
}
   386  
// ServiceCIDRConfig tracks the set of CIDR strings found in ServiceCIDR
// objects and notifies registered ServiceCIDRHandlers when ServiceCIDR events
// arrive. listerSynced is the predicate Run waits on; eventHandlers holds the
// handlers in registration order; cidrs is the tracked set of CIDRs and is
// modified while holding mu.
type ServiceCIDRConfig struct {
	listerSynced  cache.InformerSynced
	eventHandlers []ServiceCIDRHandler
	mu            sync.Mutex
	cidrs         sets.Set[string]
}
   394  
   395  // NewServiceCIDRConfig creates a new ServiceCIDRConfig.
   396  func NewServiceCIDRConfig(serviceCIDRInformer networkingv1alpha1informers.ServiceCIDRInformer, resyncPeriod time.Duration) *ServiceCIDRConfig {
   397  	result := &ServiceCIDRConfig{
   398  		listerSynced: serviceCIDRInformer.Informer().HasSynced,
   399  		cidrs:        sets.New[string](),
   400  	}
   401  
   402  	_, _ = serviceCIDRInformer.Informer().AddEventHandlerWithResyncPeriod(
   403  		cache.ResourceEventHandlerFuncs{
   404  			AddFunc: func(obj interface{}) {
   405  				result.handleServiceCIDREvent(nil, obj)
   406  			},
   407  			UpdateFunc: func(oldObj, newObj interface{}) {
   408  				result.handleServiceCIDREvent(oldObj, newObj)
   409  			},
   410  			DeleteFunc: func(obj interface{}) {
   411  				result.handleServiceCIDREvent(obj, nil)
   412  			},
   413  		},
   414  		resyncPeriod,
   415  	)
   416  	return result
   417  }
   418  
// RegisterEventHandler registers a handler which is called on every ServiceCIDR change.
// The handler is appended to the list, so handlers are invoked in registration
// order. This method does not take c.mu.
func (c *ServiceCIDRConfig) RegisterEventHandler(handler ServiceCIDRHandler) {
	c.eventHandlers = append(c.eventHandlers, handler)
}
   423  
   424  // Run waits for cache synced and invokes handlers after syncing.
   425  func (c *ServiceCIDRConfig) Run(stopCh <-chan struct{}) {
   426  	klog.InfoS("Starting serviceCIDR config controller")
   427  
   428  	if !cache.WaitForNamedCacheSync("serviceCIDR config", stopCh, c.listerSynced) {
   429  		return
   430  	}
   431  	c.handleServiceCIDREvent(nil, nil)
   432  }
   433  
   434  // handleServiceCIDREvent is a helper function to handle Add, Update and Delete
   435  // events on ServiceCIDR objects and call downstream event handlers.
   436  func (c *ServiceCIDRConfig) handleServiceCIDREvent(oldObj, newObj interface{}) {
   437  	var oldServiceCIDR, newServiceCIDR *networkingv1alpha1.ServiceCIDR
   438  	var ok bool
   439  
   440  	if oldObj != nil {
   441  		oldServiceCIDR, ok = oldObj.(*networkingv1alpha1.ServiceCIDR)
   442  		if !ok {
   443  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", oldObj))
   444  			return
   445  		}
   446  	}
   447  
   448  	if newObj != nil {
   449  		newServiceCIDR, ok = newObj.(*networkingv1alpha1.ServiceCIDR)
   450  		if !ok {
   451  			utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
   452  			return
   453  		}
   454  	}
   455  
   456  	c.mu.Lock()
   457  	defer c.mu.Unlock()
   458  
   459  	if oldServiceCIDR != nil {
   460  		c.cidrs.Delete(oldServiceCIDR.Spec.CIDRs...)
   461  	}
   462  
   463  	if newServiceCIDR != nil {
   464  		c.cidrs.Insert(newServiceCIDR.Spec.CIDRs...)
   465  	}
   466  
   467  	for i := range c.eventHandlers {
   468  		klog.V(4).InfoS("Calling handler.OnServiceCIDRsChanged")
   469  		c.eventHandlers[i].OnServiceCIDRsChanged(c.cidrs.UnsortedList())
   470  	}
   471  }
   472  

View as plain text