...

Source file src/k8s.io/kubernetes/pkg/controlplane/reconcilers/endpointsadapter.go

Documentation: k8s.io/kubernetes/pkg/controlplane/reconcilers

     1  /*
     2  Copyright 2019 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package reconcilers
    18  
    19  import (
    20  	"context"
    21  
    22  	corev1 "k8s.io/api/core/v1"
    23  	discovery "k8s.io/api/discovery/v1"
    24  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    25  	"k8s.io/apimachinery/pkg/api/errors"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
    28  	discoveryclient "k8s.io/client-go/kubernetes/typed/discovery/v1"
    29  	utilnet "k8s.io/utils/net"
    30  )
    31  
    32  // EndpointsAdapter provides a simple interface for reading and writing both
    33  // Endpoints and Endpoint Slices.
    34  // NOTE: This is an incomplete adapter implementation that is only suitable for
    35  // use in this package. This takes advantage of the Endpoints used in this
    36  // package always having a consistent set of ports, a single subset, and a small
    37  // set of addresses. Any more complex Endpoints resource would likely translate
    38  // into multiple Endpoint Slices creating significantly more complexity instead
    39  // of the 1:1 mapping this allows.
    40  type EndpointsAdapter struct {
    41  	endpointClient      corev1client.EndpointsGetter
    42  	endpointSliceClient discoveryclient.EndpointSlicesGetter
    43  }
    44  
    45  // NewEndpointsAdapter returns a new EndpointsAdapter.
    46  func NewEndpointsAdapter(endpointClient corev1client.EndpointsGetter, endpointSliceClient discoveryclient.EndpointSlicesGetter) EndpointsAdapter {
    47  	return EndpointsAdapter{
    48  		endpointClient:      endpointClient,
    49  		endpointSliceClient: endpointSliceClient,
    50  	}
    51  }
    52  
    53  // Get takes the name and namespace of the Endpoints resource, and returns a
    54  // corresponding Endpoints object if it exists, and an error if there is any.
    55  func (adapter *EndpointsAdapter) Get(namespace, name string, getOpts metav1.GetOptions) (*corev1.Endpoints, error) {
    56  	return adapter.endpointClient.Endpoints(namespace).Get(context.TODO(), name, getOpts)
    57  }
    58  
    59  // Create accepts a namespace and Endpoints object and creates the Endpoints
    60  // object and matching EndpointSlice. The created Endpoints object or an error will be
    61  // returned.
    62  func (adapter *EndpointsAdapter) Create(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
    63  	endpoints, err := adapter.endpointClient.Endpoints(namespace).Create(context.TODO(), endpoints, metav1.CreateOptions{})
    64  	if err == nil {
    65  		err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
    66  	}
    67  	return endpoints, err
    68  }
    69  
    70  // Update accepts a namespace and Endpoints object and updates it and its
    71  // matching EndpointSlice. The updated Endpoints object or an error will be returned.
    72  func (adapter *EndpointsAdapter) Update(namespace string, endpoints *corev1.Endpoints) (*corev1.Endpoints, error) {
    73  	endpoints, err := adapter.endpointClient.Endpoints(namespace).Update(context.TODO(), endpoints, metav1.UpdateOptions{})
    74  	if err == nil {
    75  		err = adapter.EnsureEndpointSliceFromEndpoints(namespace, endpoints)
    76  	}
    77  	return endpoints, err
    78  }
    79  
    80  // EnsureEndpointSliceFromEndpoints accepts a namespace and Endpoints resource
    81  // and creates or updates a corresponding EndpointSlice. An error will be returned
    82  // if it fails to sync the EndpointSlice.
    83  func (adapter *EndpointsAdapter) EnsureEndpointSliceFromEndpoints(namespace string, endpoints *corev1.Endpoints) error {
    84  	endpointSlice := endpointSliceFromEndpoints(endpoints)
    85  	currentEndpointSlice, err := adapter.endpointSliceClient.EndpointSlices(namespace).Get(context.TODO(), endpointSlice.Name, metav1.GetOptions{})
    86  
    87  	if err != nil {
    88  		if errors.IsNotFound(err) {
    89  			if _, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{}); errors.IsAlreadyExists(err) {
    90  				err = nil
    91  			}
    92  		}
    93  		return err
    94  	}
    95  
    96  	// required for transition from IP to IPv4 address type.
    97  	if currentEndpointSlice.AddressType != endpointSlice.AddressType {
    98  		err = adapter.endpointSliceClient.EndpointSlices(namespace).Delete(context.TODO(), endpointSlice.Name, metav1.DeleteOptions{})
    99  		if err != nil {
   100  			return err
   101  		}
   102  		_, err = adapter.endpointSliceClient.EndpointSlices(namespace).Create(context.TODO(), endpointSlice, metav1.CreateOptions{})
   103  		return err
   104  	}
   105  
   106  	if apiequality.Semantic.DeepEqual(currentEndpointSlice.Endpoints, endpointSlice.Endpoints) &&
   107  		apiequality.Semantic.DeepEqual(currentEndpointSlice.Ports, endpointSlice.Ports) &&
   108  		apiequality.Semantic.DeepEqual(currentEndpointSlice.Labels, endpointSlice.Labels) {
   109  		return nil
   110  	}
   111  
   112  	_, err = adapter.endpointSliceClient.EndpointSlices(namespace).Update(context.TODO(), endpointSlice, metav1.UpdateOptions{})
   113  	return err
   114  }
   115  
   116  // endpointSliceFromEndpoints generates an EndpointSlice from an Endpoints
   117  // resource.
   118  func endpointSliceFromEndpoints(endpoints *corev1.Endpoints) *discovery.EndpointSlice {
   119  	endpointSlice := &discovery.EndpointSlice{}
   120  	endpointSlice.Name = endpoints.Name
   121  	endpointSlice.Namespace = endpoints.Namespace
   122  	endpointSlice.Labels = map[string]string{discovery.LabelServiceName: endpoints.Name}
   123  
   124  	// TODO: Add support for dual stack here (and in the rest of
   125  	// EndpointsAdapter).
   126  	endpointSlice.AddressType = discovery.AddressTypeIPv4
   127  
   128  	if len(endpoints.Subsets) > 0 {
   129  		subset := endpoints.Subsets[0]
   130  		for i := range subset.Ports {
   131  			endpointSlice.Ports = append(endpointSlice.Ports, discovery.EndpointPort{
   132  				Port:     &subset.Ports[i].Port,
   133  				Name:     &subset.Ports[i].Name,
   134  				Protocol: &subset.Ports[i].Protocol,
   135  			})
   136  		}
   137  
   138  		if allAddressesIPv6(append(subset.Addresses, subset.NotReadyAddresses...)) {
   139  			endpointSlice.AddressType = discovery.AddressTypeIPv6
   140  		}
   141  
   142  		endpointSlice.Endpoints = append(endpointSlice.Endpoints, getEndpointsFromAddresses(subset.Addresses, endpointSlice.AddressType, true)...)
   143  		endpointSlice.Endpoints = append(endpointSlice.Endpoints, getEndpointsFromAddresses(subset.NotReadyAddresses, endpointSlice.AddressType, false)...)
   144  	}
   145  
   146  	return endpointSlice
   147  }
   148  
   149  // getEndpointsFromAddresses returns a list of Endpoints from addresses that
   150  // match the provided address type.
   151  func getEndpointsFromAddresses(addresses []corev1.EndpointAddress, addressType discovery.AddressType, ready bool) []discovery.Endpoint {
   152  	endpoints := []discovery.Endpoint{}
   153  	isIPv6AddressType := addressType == discovery.AddressTypeIPv6
   154  
   155  	for _, address := range addresses {
   156  		if utilnet.IsIPv6String(address.IP) == isIPv6AddressType {
   157  			endpoints = append(endpoints, endpointFromAddress(address, ready))
   158  		}
   159  	}
   160  
   161  	return endpoints
   162  }
   163  
   164  // endpointFromAddress generates an Endpoint from an EndpointAddress resource.
   165  func endpointFromAddress(address corev1.EndpointAddress, ready bool) discovery.Endpoint {
   166  	ep := discovery.Endpoint{
   167  		Addresses:  []string{address.IP},
   168  		Conditions: discovery.EndpointConditions{Ready: &ready},
   169  		TargetRef:  address.TargetRef,
   170  	}
   171  
   172  	if address.NodeName != nil {
   173  		ep.NodeName = address.NodeName
   174  	}
   175  
   176  	return ep
   177  }
   178  
   179  // allAddressesIPv6 returns true if all provided addresses are IPv6.
   180  func allAddressesIPv6(addresses []corev1.EndpointAddress) bool {
   181  	if len(addresses) == 0 {
   182  		return false
   183  	}
   184  
   185  	for _, address := range addresses {
   186  		if !utilnet.IsIPv6String(address.IP) {
   187  			return false
   188  		}
   189  	}
   190  
   191  	return true
   192  }
   193  
   194  // setSkipMirrorTrue sets endpointslice.kubernetes.io/skip-mirror to true. It
   195  // returns true if this has resulted in a change to the Endpoints resource.
   196  func setSkipMirrorTrue(e *corev1.Endpoints) bool {
   197  	skipMirrorVal, ok := e.Labels[discovery.LabelSkipMirror]
   198  	if !ok || skipMirrorVal != "true" {
   199  		if e.Labels == nil {
   200  			e.Labels = map[string]string{}
   201  		}
   202  		e.Labels[discovery.LabelSkipMirror] = "true"
   203  		return true
   204  	}
   205  	return false
   206  }
   207  

View as plain text