...

Source file src/k8s.io/kubernetes/test/e2e/framework/service/jig.go

Documentation: k8s.io/kubernetes/test/e2e/framework/service

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package service
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"net"
    24  	"strconv"
    25  	"strings"
    26  	"time"
    27  
    28  	"github.com/onsi/ginkgo/v2"
    29  	v1 "k8s.io/api/core/v1"
    30  	discoveryv1 "k8s.io/api/discovery/v1"
    31  	policyv1 "k8s.io/api/policy/v1"
    32  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/fields"
    35  	"k8s.io/apimachinery/pkg/labels"
    36  	"k8s.io/apimachinery/pkg/runtime"
    37  	"k8s.io/apimachinery/pkg/util/intstr"
    38  	utilnet "k8s.io/apimachinery/pkg/util/net"
    39  	"k8s.io/apimachinery/pkg/util/sets"
    40  	"k8s.io/apimachinery/pkg/util/uuid"
    41  	"k8s.io/apimachinery/pkg/util/wait"
    42  	"k8s.io/apimachinery/pkg/watch"
    43  	clientset "k8s.io/client-go/kubernetes"
    44  	"k8s.io/client-go/tools/cache"
    45  	"k8s.io/kubernetes/test/e2e/framework"
    46  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    47  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    48  	e2epodoutput "k8s.io/kubernetes/test/e2e/framework/pod/output"
    49  	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
    50  	testutils "k8s.io/kubernetes/test/utils"
    51  	imageutils "k8s.io/kubernetes/test/utils/image"
    52  	netutils "k8s.io/utils/net"
    53  )
    54  
    55  // NodePortRange should match whatever the default/configured range is
    56  var NodePortRange = utilnet.PortRange{Base: 30000, Size: 2768}
    57  
    58  // It is copied from "k8s.io/kubernetes/pkg/registry/core/service/portallocator"
    59  var errAllocated = errors.New("provided port is already allocated")
    60  
    61  // TestJig is a test jig to help service testing.
    62  type TestJig struct {
    63  	Client    clientset.Interface
    64  	Namespace string
    65  	Name      string
    66  	ID        string
    67  	Labels    map[string]string
     68  	// ExternalIPs should be false for Conformance tests: conformance tests do not
     69  	// check NodePort reachability on external node addresses, but other e2e tests do.
    70  	ExternalIPs bool
    71  }
    72  
    73  // NewTestJig allocates and inits a new TestJig.
    74  func NewTestJig(client clientset.Interface, namespace, name string) *TestJig {
    75  	j := &TestJig{}
    76  	j.Client = client
    77  	j.Namespace = namespace
    78  	j.Name = name
    79  	j.ID = j.Name + "-" + string(uuid.NewUUID())
    80  	j.Labels = map[string]string{"testid": j.ID}
    81  
    82  	return j
    83  }
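
// exampleNewJig is an illustrative sketch (not part of the upstream file)
// showing how a test would typically construct a jig and create its default
// Service. The clientset cs and namespace name ns are assumed to be provided
// by the e2e framework.
func exampleNewJig(ctx context.Context, cs clientset.Interface, ns string) (*v1.Service, error) {
	jig := NewTestJig(cs, ns, "example-svc")
	// Create the default TCP Service on port 80, tweaked to type NodePort.
	return jig.CreateTCPService(ctx, func(svc *v1.Service) {
		svc.Spec.Type = v1.ServiceTypeNodePort
	})
}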
    84  
    85  // newServiceTemplate returns the default v1.Service template for this j, but
    86  // does not actually create the Service.  The default Service has the same name
    87  // as the j and exposes the given port.
    88  func (j *TestJig) newServiceTemplate(proto v1.Protocol, port int32) *v1.Service {
    89  	service := &v1.Service{
    90  		ObjectMeta: metav1.ObjectMeta{
    91  			Namespace: j.Namespace,
    92  			Name:      j.Name,
    93  			Labels:    j.Labels,
    94  		},
    95  		Spec: v1.ServiceSpec{
    96  			Selector: j.Labels,
    97  			Ports: []v1.ServicePort{
    98  				{
    99  					Protocol: proto,
   100  					Port:     port,
   101  				},
   102  			},
   103  		},
   104  	}
   105  	return service
   106  }
   107  
   108  // CreateTCPServiceWithPort creates a new TCP Service with given port based on the
   109  // j's defaults. Callers can provide a function to tweak the Service object before
   110  // it is created.
   111  func (j *TestJig) CreateTCPServiceWithPort(ctx context.Context, tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
   112  	svc := j.newServiceTemplate(v1.ProtocolTCP, port)
   113  	if tweak != nil {
   114  		tweak(svc)
   115  	}
   116  	result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
   117  	if err != nil {
   118  		return nil, fmt.Errorf("failed to create TCP Service %q: %w", svc.Name, err)
   119  	}
   120  	return j.sanityCheckService(result, svc.Spec.Type)
   121  }
   122  
   123  // CreateTCPService creates a new TCP Service based on the j's
   124  // defaults.  Callers can provide a function to tweak the Service object before
   125  // it is created.
   126  func (j *TestJig) CreateTCPService(ctx context.Context, tweak func(svc *v1.Service)) (*v1.Service, error) {
   127  	return j.CreateTCPServiceWithPort(ctx, tweak, 80)
   128  }
   129  
   130  // CreateUDPService creates a new UDP Service based on the j's
   131  // defaults.  Callers can provide a function to tweak the Service object before
   132  // it is created.
   133  func (j *TestJig) CreateUDPService(ctx context.Context, tweak func(svc *v1.Service)) (*v1.Service, error) {
   134  	svc := j.newServiceTemplate(v1.ProtocolUDP, 80)
   135  	if tweak != nil {
   136  		tweak(svc)
   137  	}
   138  	result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
   139  	if err != nil {
   140  		return nil, fmt.Errorf("failed to create UDP Service %q: %w", svc.Name, err)
   141  	}
   142  	return j.sanityCheckService(result, svc.Spec.Type)
   143  }
   144  
   145  // CreateExternalNameService creates a new ExternalName type Service based on the j's defaults.
   146  // Callers can provide a function to tweak the Service object before it is created.
   147  func (j *TestJig) CreateExternalNameService(ctx context.Context, tweak func(svc *v1.Service)) (*v1.Service, error) {
   148  	svc := &v1.Service{
   149  		ObjectMeta: metav1.ObjectMeta{
   150  			Namespace: j.Namespace,
   151  			Name:      j.Name,
   152  			Labels:    j.Labels,
   153  		},
   154  		Spec: v1.ServiceSpec{
   155  			Selector:     j.Labels,
   156  			ExternalName: "foo.example.com",
   157  			Type:         v1.ServiceTypeExternalName,
   158  		},
   159  	}
   160  	if tweak != nil {
   161  		tweak(svc)
   162  	}
   163  	result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
   164  	if err != nil {
   165  		return nil, fmt.Errorf("failed to create ExternalName Service %q: %w", svc.Name, err)
   166  	}
   167  	return j.sanityCheckService(result, svc.Spec.Type)
   168  }
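
// exampleExternalNameService is an illustrative sketch (not part of the
// upstream file): the tweak callback overrides the default external name;
// "bar.example.com" is just a placeholder host.
func exampleExternalNameService(ctx context.Context, jig *TestJig) (*v1.Service, error) {
	return jig.CreateExternalNameService(ctx, func(svc *v1.Service) {
		svc.Spec.ExternalName = "bar.example.com"
	})
}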
   169  
   170  // ChangeServiceType updates the given service's ServiceType to the given newType.
   171  func (j *TestJig) ChangeServiceType(ctx context.Context, newType v1.ServiceType, timeout time.Duration) error {
   172  	ingressIP := ""
   173  	svc, err := j.UpdateService(ctx, func(s *v1.Service) {
   174  		for _, ing := range s.Status.LoadBalancer.Ingress {
   175  			if ing.IP != "" {
   176  				ingressIP = ing.IP
   177  			}
   178  		}
   179  		s.Spec.Type = newType
   180  		s.Spec.Ports[0].NodePort = 0
   181  	})
   182  	if err != nil {
   183  		return err
   184  	}
   185  	if ingressIP != "" {
   186  		_, err = j.WaitForLoadBalancerDestroy(ctx, ingressIP, int(svc.Spec.Ports[0].Port), timeout)
   187  	}
   188  	return err
   189  }
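
// exampleDowngradeToClusterIP is an illustrative sketch (not part of the
// upstream file): it turns an existing LoadBalancer Service back into a
// ClusterIP Service and waits for the cloud load balancer to be released.
// The 10-minute timeout is an arbitrary placeholder.
func exampleDowngradeToClusterIP(ctx context.Context, jig *TestJig) error {
	return jig.ChangeServiceType(ctx, v1.ServiceTypeClusterIP, 10*time.Minute)
}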
   190  
   191  // CreateOnlyLocalNodePortService creates a NodePort service with
   192  // ExternalTrafficPolicy set to Local and sanity checks its nodePort.
   193  // If createPod is true, it also creates an RC with 1 replica of
   194  // the standard netexec container used everywhere in this test.
   195  func (j *TestJig) CreateOnlyLocalNodePortService(ctx context.Context, createPod bool) (*v1.Service, error) {
   196  	ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=NodePort and ExternalTrafficPolicy=Local")
   197  	svc, err := j.CreateTCPService(ctx, func(svc *v1.Service) {
   198  		svc.Spec.Type = v1.ServiceTypeNodePort
   199  		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
   200  		svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}}
   201  	})
   202  	if err != nil {
   203  		return nil, err
   204  	}
   205  
   206  	if createPod {
   207  		ginkgo.By("creating a pod to be part of the service " + j.Name)
   208  		_, err = j.Run(ctx, nil)
   209  		if err != nil {
   210  			return nil, err
   211  		}
   212  	}
   213  	return svc, nil
   214  }
   215  
   216  // CreateOnlyLocalLoadBalancerService creates a loadbalancer service with
   217  // ExternalTrafficPolicy set to Local and waits for it to acquire an ingress IP.
   218  // If createPod is true, it also creates an RC with 1 replica of
   219  // the standard netexec container used everywhere in this test.
   220  func (j *TestJig) CreateOnlyLocalLoadBalancerService(ctx context.Context, timeout time.Duration, createPod bool,
   221  	tweak func(svc *v1.Service)) (*v1.Service, error) {
   222  	_, err := j.CreateLoadBalancerService(ctx, timeout, func(svc *v1.Service) {
   223  		ginkgo.By("setting ExternalTrafficPolicy=Local")
   224  		svc.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
   225  		if tweak != nil {
   226  			tweak(svc)
   227  		}
   228  	})
   229  	if err != nil {
   230  		return nil, err
   231  	}
   232  
   233  	if createPod {
   234  		ginkgo.By("creating a pod to be part of the service " + j.Name)
   235  		_, err = j.Run(ctx, nil)
   236  		if err != nil {
   237  			return nil, err
   238  		}
   239  	}
   240  	ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
   241  	return j.WaitForLoadBalancer(ctx, timeout)
   242  }
   243  
   244  // CreateLoadBalancerService creates a loadbalancer service and waits
   245  // for it to acquire an ingress IP.
   246  func (j *TestJig) CreateLoadBalancerService(ctx context.Context, timeout time.Duration, tweak func(svc *v1.Service)) (*v1.Service, error) {
   247  	ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
   248  	svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
   249  	svc.Spec.Type = v1.ServiceTypeLoadBalancer
   250  	// We need to turn affinity off for our LB distribution tests
   251  	svc.Spec.SessionAffinity = v1.ServiceAffinityNone
   252  	if tweak != nil {
   253  		tweak(svc)
   254  	}
   255  	_, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
   256  	if err != nil {
   257  		return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %w", svc.Name, err)
   258  	}
   259  
   260  	ginkgo.By("waiting for loadbalancer for service " + j.Namespace + "/" + j.Name)
   261  	return j.WaitForLoadBalancer(ctx, timeout)
   262  }
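
// exampleLoadBalancerOnCustomPort is an illustrative sketch (not part of the
// upstream file): it creates a LoadBalancer Service on port 8080 instead of
// the default 80; the timeout would normally come from the test's
// cloud-provider configuration.
func exampleLoadBalancerOnCustomPort(ctx context.Context, jig *TestJig, timeout time.Duration) (*v1.Service, error) {
	return jig.CreateLoadBalancerService(ctx, timeout, func(svc *v1.Service) {
		svc.Spec.Ports = []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 8080}}
	})
}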
   263  
   264  // GetEndpointNodes returns a map of nodenames:external-ip on which the
   265  // endpoints of the Service are running.
   266  func (j *TestJig) GetEndpointNodes(ctx context.Context) (map[string][]string, error) {
   267  	return j.GetEndpointNodesWithIP(ctx, v1.NodeExternalIP)
   268  }
   269  
   270  // GetEndpointNodesWithIP returns a map of nodenames:<ip of given type> on which the
   271  // endpoints of the Service are running.
   272  func (j *TestJig) GetEndpointNodesWithIP(ctx context.Context, addressType v1.NodeAddressType) (map[string][]string, error) {
   273  	nodes, err := j.ListNodesWithEndpoint(ctx)
   274  	if err != nil {
   275  		return nil, err
   276  	}
   277  	nodeMap := map[string][]string{}
   278  	for _, node := range nodes {
   279  		nodeMap[node.Name] = e2enode.GetAddresses(&node, addressType)
   280  	}
   281  	return nodeMap, nil
   282  }
   283  
   284  // ListNodesWithEndpoint returns a list of nodes on which the
   285  // endpoints of the given Service are running.
   286  func (j *TestJig) ListNodesWithEndpoint(ctx context.Context) ([]v1.Node, error) {
   287  	nodeNames, err := j.GetEndpointNodeNames(ctx)
   288  	if err != nil {
   289  		return nil, err
   290  	}
   291  	allNodes, err := j.Client.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
   292  	if err != nil {
   293  		return nil, err
   294  	}
   295  	epNodes := make([]v1.Node, 0, nodeNames.Len())
   296  	for _, node := range allNodes.Items {
   297  		if nodeNames.Has(node.Name) {
   298  			epNodes = append(epNodes, node)
   299  		}
   300  	}
   301  	return epNodes, nil
   302  }
   303  
   304  // GetEndpointNodeNames returns a string set of node names on which the
   305  // endpoints of the given Service are running.
   306  func (j *TestJig) GetEndpointNodeNames(ctx context.Context) (sets.String, error) {
   307  	err := j.waitForAvailableEndpoint(ctx, ServiceEndpointsTimeout)
   308  	if err != nil {
   309  		return nil, err
   310  	}
   311  	endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
   312  	if err != nil {
   313  		return nil, fmt.Errorf("get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
   314  	}
   315  	if len(endpoints.Subsets) == 0 {
   316  		return nil, fmt.Errorf("endpoint has no subsets, cannot determine node addresses")
   317  	}
   318  	epNodes := sets.NewString()
   319  	for _, ss := range endpoints.Subsets {
   320  		for _, e := range ss.Addresses {
   321  			if e.NodeName != nil {
   322  				epNodes.Insert(*e.NodeName)
   323  			}
   324  		}
   325  	}
   326  	return epNodes, nil
   327  }
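
// exampleLogEndpointNodes is an illustrative sketch (not part of the upstream
// file): it logs the external addresses of every node hosting an endpoint of
// the jig's Service, assuming backend pods have already been created (for
// example with Run).
func exampleLogEndpointNodes(ctx context.Context, jig *TestJig) error {
	nodeAddrs, err := jig.GetEndpointNodes(ctx)
	if err != nil {
		return err
	}
	for name, addrs := range nodeAddrs {
		framework.Logf("endpoints of %s/%s run on node %s (external addresses %v)", jig.Namespace, jig.Name, name, addrs)
	}
	return nil
}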
   328  
   329  // WaitForEndpointOnNode waits for a service endpoint on the given node.
   330  func (j *TestJig) WaitForEndpointOnNode(ctx context.Context, nodeName string) error {
   331  	return wait.PollUntilContextTimeout(ctx, framework.Poll, KubeProxyLagTimeout, true, func(ctx context.Context) (bool, error) {
   332  		endpoints, err := j.Client.CoreV1().Endpoints(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
   333  		if err != nil {
   334  			framework.Logf("Get endpoints for service %s/%s failed (%s)", j.Namespace, j.Name, err)
   335  			return false, nil
   336  		}
   337  		if len(endpoints.Subsets) == 0 {
    338  			framework.Logf("Expected endpoints with subsets, got none.")
   339  			return false, nil
   340  		}
   341  		// TODO: Handle multiple endpoints
   342  		if len(endpoints.Subsets[0].Addresses) == 0 {
   343  			framework.Logf("Expected Ready endpoints - found none")
   344  			return false, nil
   345  		}
   346  		epHostName := *endpoints.Subsets[0].Addresses[0].NodeName
   347  		framework.Logf("Pod for service %s/%s is on node %s", j.Namespace, j.Name, epHostName)
   348  		if epHostName != nodeName {
   349  			framework.Logf("Found endpoint on wrong node, expected %v, got %v", nodeName, epHostName)
   350  			return false, nil
   351  		}
   352  		return true, nil
   353  	})
   354  }
   355  
    356  // waitForAvailableEndpoint waits until at least one endpoint is available or the timeout expires
   357  func (j *TestJig) waitForAvailableEndpoint(ctx context.Context, timeout time.Duration) error {
    358  	// Wait for endpoints to be created; this may take longer if the service's backing pods are slow to start.
   359  	endpointSelector := fields.OneTermEqualSelector("metadata.name", j.Name)
   360  	stopCh := make(chan struct{})
   361  	endpointAvailable := false
   362  	endpointSliceAvailable := false
   363  
   364  	var controller cache.Controller
   365  	_, controller = cache.NewInformer(
   366  		&cache.ListWatch{
   367  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   368  				options.FieldSelector = endpointSelector.String()
   369  				obj, err := j.Client.CoreV1().Endpoints(j.Namespace).List(ctx, options)
   370  				return runtime.Object(obj), err
   371  			},
   372  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   373  				options.FieldSelector = endpointSelector.String()
   374  				return j.Client.CoreV1().Endpoints(j.Namespace).Watch(ctx, options)
   375  			},
   376  		},
   377  		&v1.Endpoints{},
   378  		0,
   379  		cache.ResourceEventHandlerFuncs{
   380  			AddFunc: func(obj interface{}) {
   381  				if e, ok := obj.(*v1.Endpoints); ok {
   382  					if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
   383  						endpointAvailable = true
   384  					}
   385  				}
   386  			},
   387  			UpdateFunc: func(old, cur interface{}) {
   388  				if e, ok := cur.(*v1.Endpoints); ok {
   389  					if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
   390  						endpointAvailable = true
   391  					}
   392  				}
   393  			},
   394  		},
   395  	)
   396  	defer func() {
   397  		close(stopCh)
   398  	}()
   399  
   400  	go controller.Run(stopCh)
   401  
   402  	var esController cache.Controller
   403  	_, esController = cache.NewInformer(
   404  		&cache.ListWatch{
   405  			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
   406  				options.LabelSelector = "kubernetes.io/service-name=" + j.Name
   407  				obj, err := j.Client.DiscoveryV1().EndpointSlices(j.Namespace).List(ctx, options)
   408  				return runtime.Object(obj), err
   409  			},
   410  			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
   411  				options.LabelSelector = "kubernetes.io/service-name=" + j.Name
   412  				return j.Client.DiscoveryV1().EndpointSlices(j.Namespace).Watch(ctx, options)
   413  			},
   414  		},
   415  		&discoveryv1.EndpointSlice{},
   416  		0,
   417  		cache.ResourceEventHandlerFuncs{
   418  			AddFunc: func(obj interface{}) {
   419  				if es, ok := obj.(*discoveryv1.EndpointSlice); ok {
    420  					// TODO: currently we only consider addresses in 1 slice, but services with
    421  					// a large number of endpoints (>1000) may have multiple slices, some of
    422  					// which contain only a few addresses. We should check the addresses in all slices.
   423  					if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
   424  						endpointSliceAvailable = true
   425  					}
   426  				}
   427  			},
   428  			UpdateFunc: func(old, cur interface{}) {
   429  				if es, ok := cur.(*discoveryv1.EndpointSlice); ok {
    430  					// TODO: currently we only consider addresses in 1 slice, but services with
    431  					// a large number of endpoints (>1000) may have multiple slices, some of
    432  					// which contain only a few addresses. We should check the addresses in all slices.
   433  					if len(es.Endpoints) > 0 && len(es.Endpoints[0].Addresses) > 0 {
   434  						endpointSliceAvailable = true
   435  					}
   436  				}
   437  			},
   438  		},
   439  	)
   440  
   441  	go esController.Run(stopCh)
   442  
   443  	err := wait.PollWithContext(ctx, 1*time.Second, timeout, func(ctx context.Context) (bool, error) {
   444  		return endpointAvailable && endpointSliceAvailable, nil
   445  	})
   446  	if err != nil {
    447  		return fmt.Errorf("no subset of available IP addresses found for the endpoint %s within timeout %v", j.Name, timeout)
   448  	}
   449  	return nil
   450  }
   451  
   452  // sanityCheckService performs sanity checks on the given service; in particular, ensuring
   453  // that creating/updating a service allocates IPs, ports, etc, as needed. It does not
   454  // check for ingress assignment as that happens asynchronously after the Service is created.
   455  func (j *TestJig) sanityCheckService(svc *v1.Service, svcType v1.ServiceType) (*v1.Service, error) {
   456  	if svcType == "" {
   457  		svcType = v1.ServiceTypeClusterIP
   458  	}
   459  	if svc.Spec.Type != svcType {
   460  		return nil, fmt.Errorf("unexpected Spec.Type (%s) for service, expected %s", svc.Spec.Type, svcType)
   461  	}
   462  
   463  	if svcType != v1.ServiceTypeExternalName {
   464  		if svc.Spec.ExternalName != "" {
   465  			return nil, fmt.Errorf("unexpected Spec.ExternalName (%s) for service, expected empty", svc.Spec.ExternalName)
   466  		}
   467  		if svc.Spec.ClusterIP == "" {
   468  			return nil, fmt.Errorf("didn't get ClusterIP for non-ExternalName service")
   469  		}
   470  	} else {
   471  		if svc.Spec.ClusterIP != "" {
   472  			return nil, fmt.Errorf("unexpected Spec.ClusterIP (%s) for ExternalName service, expected empty", svc.Spec.ClusterIP)
   473  		}
   474  	}
   475  
   476  	expectNodePorts := needsNodePorts(svc)
   477  	for i, port := range svc.Spec.Ports {
   478  		hasNodePort := (port.NodePort != 0)
   479  		if hasNodePort != expectNodePorts {
   480  			return nil, fmt.Errorf("unexpected Spec.Ports[%d].NodePort (%d) for service", i, port.NodePort)
   481  		}
   482  		if hasNodePort {
   483  			if !NodePortRange.Contains(int(port.NodePort)) {
   484  				return nil, fmt.Errorf("out-of-range nodePort (%d) for service", port.NodePort)
   485  			}
   486  		}
   487  	}
   488  
   489  	// FIXME: this fails for tests that were changed from LoadBalancer to ClusterIP.
   490  	// if svcType != v1.ServiceTypeLoadBalancer {
   491  	// 	if len(svc.Status.LoadBalancer.Ingress) != 0 {
   492  	// 		return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress on non-LoadBalancer service")
   493  	// 	}
   494  	// }
   495  
   496  	return svc, nil
   497  }
   498  
   499  func needsNodePorts(svc *v1.Service) bool {
   500  	if svc == nil {
   501  		return false
   502  	}
   503  	// Type NodePort
   504  	if svc.Spec.Type == v1.ServiceTypeNodePort {
   505  		return true
   506  	}
   507  	if svc.Spec.Type != v1.ServiceTypeLoadBalancer {
   508  		return false
   509  	}
   510  	// Type LoadBalancer
   511  	if svc.Spec.AllocateLoadBalancerNodePorts == nil {
    512  		return true // back-compat
   513  	}
   514  	return *svc.Spec.AllocateLoadBalancerNodePorts
   515  }
   516  
   517  // UpdateService fetches a service, calls the update function on it, and
   518  // then attempts to send the updated service. It tries up to 3 times in the
   519  // face of timeouts and conflicts.
   520  func (j *TestJig) UpdateService(ctx context.Context, update func(*v1.Service)) (*v1.Service, error) {
   521  	for i := 0; i < 3; i++ {
   522  		service, err := j.Client.CoreV1().Services(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
   523  		if err != nil {
   524  			return nil, fmt.Errorf("failed to get Service %q: %w", j.Name, err)
   525  		}
   526  		update(service)
   527  		result, err := j.Client.CoreV1().Services(j.Namespace).Update(ctx, service, metav1.UpdateOptions{})
   528  		if err == nil {
   529  			return j.sanityCheckService(result, service.Spec.Type)
   530  		}
   531  		if !apierrors.IsConflict(err) && !apierrors.IsServerTimeout(err) {
   532  			return nil, fmt.Errorf("failed to update Service %q: %w", j.Name, err)
   533  		}
   534  	}
   535  	return nil, fmt.Errorf("too many retries updating Service %q", j.Name)
   536  }
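
// exampleSetLocalTrafficPolicy is an illustrative sketch (not part of the
// upstream file): UpdateService's built-in retry loop makes it safe to apply
// small mutations such as flipping the external traffic policy without
// handling conflicts manually.
func exampleSetLocalTrafficPolicy(ctx context.Context, jig *TestJig) (*v1.Service, error) {
	return jig.UpdateService(ctx, func(s *v1.Service) {
		s.Spec.ExternalTrafficPolicy = v1.ServiceExternalTrafficPolicyLocal
	})
}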
   537  
   538  // WaitForNewIngressIP waits for the given service to get a new ingress IP, or returns an error after the given timeout
   539  func (j *TestJig) WaitForNewIngressIP(ctx context.Context, existingIP string, timeout time.Duration) (*v1.Service, error) {
   540  	framework.Logf("Waiting up to %v for service %q to get a new ingress IP", timeout, j.Name)
   541  	service, err := j.waitForCondition(ctx, timeout, "have a new ingress IP", func(svc *v1.Service) bool {
   542  		if len(svc.Status.LoadBalancer.Ingress) == 0 {
   543  			return false
   544  		}
   545  		ip := svc.Status.LoadBalancer.Ingress[0].IP
   546  		if ip == "" || ip == existingIP {
   547  			return false
   548  		}
   549  		return true
   550  	})
   551  	if err != nil {
   552  		return nil, err
   553  	}
   554  	return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
   555  }
   556  
   557  // ChangeServiceNodePort changes node ports of the given service.
   558  func (j *TestJig) ChangeServiceNodePort(ctx context.Context, initial int) (*v1.Service, error) {
   559  	var err error
   560  	var service *v1.Service
   561  	for i := 1; i < NodePortRange.Size; i++ {
   562  		offs1 := initial - NodePortRange.Base
   563  		offs2 := (offs1 + i) % NodePortRange.Size
   564  		newPort := NodePortRange.Base + offs2
   565  		service, err = j.UpdateService(ctx, func(s *v1.Service) {
   566  			s.Spec.Ports[0].NodePort = int32(newPort)
   567  		})
   568  		if err != nil && strings.Contains(err.Error(), errAllocated.Error()) {
   569  			framework.Logf("tried nodePort %d, but it is in use, will try another", newPort)
   570  			continue
   571  		}
   572  		// Otherwise err was nil or err was a real error
   573  		break
   574  	}
   575  	return service, err
   576  }
   577  
    578  // WaitForLoadBalancer waits for the given service to have a LoadBalancer, or returns an error after the given timeout
   579  func (j *TestJig) WaitForLoadBalancer(ctx context.Context, timeout time.Duration) (*v1.Service, error) {
   580  	framework.Logf("Waiting up to %v for service %q to have a LoadBalancer", timeout, j.Name)
   581  	service, err := j.waitForCondition(ctx, timeout, "have a load balancer", func(svc *v1.Service) bool {
   582  		return len(svc.Status.LoadBalancer.Ingress) > 0
   583  	})
   584  	if err != nil {
   585  		return nil, err
   586  	}
   587  
   588  	for i, ing := range service.Status.LoadBalancer.Ingress {
   589  		if ing.IP == "" && ing.Hostname == "" {
   590  			return nil, fmt.Errorf("unexpected Status.LoadBalancer.Ingress[%d] for service: %#v", i, ing)
   591  		}
   592  	}
   593  
   594  	return j.sanityCheckService(service, v1.ServiceTypeLoadBalancer)
   595  }
   596  
    597  // WaitForLoadBalancerDestroy waits for the given service's LoadBalancer to be destroyed, or returns an error after the given timeout
   598  func (j *TestJig) WaitForLoadBalancerDestroy(ctx context.Context, ip string, port int, timeout time.Duration) (*v1.Service, error) {
   599  	// TODO: once support ticket 21807001 is resolved, reduce this timeout back to something reasonable
   600  	defer func() {
   601  		if err := framework.EnsureLoadBalancerResourcesDeleted(ctx, ip, strconv.Itoa(port)); err != nil {
   602  			framework.Logf("Failed to delete cloud resources for service: %s %d (%v)", ip, port, err)
   603  		}
   604  	}()
   605  
   606  	framework.Logf("Waiting up to %v for service %q to have no LoadBalancer", timeout, j.Name)
   607  	service, err := j.waitForCondition(ctx, timeout, "have no load balancer", func(svc *v1.Service) bool {
   608  		return len(svc.Status.LoadBalancer.Ingress) == 0
   609  	})
   610  	if err != nil {
   611  		return nil, err
   612  	}
   613  	return j.sanityCheckService(service, service.Spec.Type)
   614  }
   615  
   616  func (j *TestJig) waitForCondition(ctx context.Context, timeout time.Duration, message string, conditionFn func(*v1.Service) bool) (*v1.Service, error) {
   617  	var service *v1.Service
   618  	pollFunc := func(ctx context.Context) (bool, error) {
   619  		svc, err := j.Client.CoreV1().Services(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
   620  		if err != nil {
   621  			framework.Logf("Retrying .... error trying to get Service %s: %v", j.Name, err)
   622  			return false, nil
   623  		}
   624  		if conditionFn(svc) {
   625  			service = svc
   626  			return true, nil
   627  		}
   628  		return false, nil
   629  	}
   630  	if err := wait.PollUntilContextTimeout(ctx, framework.Poll, timeout, true, pollFunc); err != nil {
   631  		return nil, fmt.Errorf("timed out waiting for service %q to %s: %w", j.Name, message, err)
   632  	}
   633  	return service, nil
   634  }
   635  
   636  // newRCTemplate returns the default v1.ReplicationController object for
   637  // this j, but does not actually create the RC.  The default RC has the same
   638  // name as the j and runs the "netexec" container.
   639  func (j *TestJig) newRCTemplate() *v1.ReplicationController {
   640  	var replicas int32 = 1
   641  	var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
   642  
   643  	rc := &v1.ReplicationController{
   644  		ObjectMeta: metav1.ObjectMeta{
   645  			Namespace: j.Namespace,
   646  			Name:      j.Name,
   647  			Labels:    j.Labels,
   648  		},
   649  		Spec: v1.ReplicationControllerSpec{
   650  			Replicas: &replicas,
   651  			Selector: j.Labels,
   652  			Template: &v1.PodTemplateSpec{
   653  				ObjectMeta: metav1.ObjectMeta{
   654  					Labels: j.Labels,
   655  				},
   656  				Spec: v1.PodSpec{
   657  					Containers: []v1.Container{
   658  						{
   659  							Name:  "netexec",
   660  							Image: imageutils.GetE2EImage(imageutils.Agnhost),
   661  							Args:  []string{"netexec", "--http-port=80", "--udp-port=80"},
   662  							ReadinessProbe: &v1.Probe{
   663  								PeriodSeconds: 3,
   664  								ProbeHandler: v1.ProbeHandler{
   665  									HTTPGet: &v1.HTTPGetAction{
   666  										Port: intstr.FromInt32(80),
   667  										Path: "/hostName",
   668  									},
   669  								},
   670  							},
   671  						},
   672  					},
   673  					TerminationGracePeriodSeconds: &grace,
   674  				},
   675  			},
   676  		},
   677  	}
   678  	return rc
   679  }
   680  
   681  // AddRCAntiAffinity adds AntiAffinity to the given ReplicationController.
   682  func (j *TestJig) AddRCAntiAffinity(rc *v1.ReplicationController) {
   683  	var replicas int32 = 2
   684  
   685  	rc.Spec.Replicas = &replicas
   686  	if rc.Spec.Template.Spec.Affinity == nil {
   687  		rc.Spec.Template.Spec.Affinity = &v1.Affinity{}
   688  	}
   689  	if rc.Spec.Template.Spec.Affinity.PodAntiAffinity == nil {
   690  		rc.Spec.Template.Spec.Affinity.PodAntiAffinity = &v1.PodAntiAffinity{}
   691  	}
   692  	rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution = append(
   693  		rc.Spec.Template.Spec.Affinity.PodAntiAffinity.RequiredDuringSchedulingIgnoredDuringExecution,
   694  		v1.PodAffinityTerm{
   695  			LabelSelector: &metav1.LabelSelector{MatchLabels: j.Labels},
   696  			Namespaces:    nil,
   697  			TopologyKey:   "kubernetes.io/hostname",
   698  		})
   699  }
   700  
    701  // CreatePDB creates a PodDisruptionBudget for the given ReplicationController and returns it, or returns an error if the PodDisruptionBudget does not become ready
   702  func (j *TestJig) CreatePDB(ctx context.Context, rc *v1.ReplicationController) (*policyv1.PodDisruptionBudget, error) {
   703  	pdb := j.newPDBTemplate(rc)
   704  	newPdb, err := j.Client.PolicyV1().PodDisruptionBudgets(j.Namespace).Create(ctx, pdb, metav1.CreateOptions{})
   705  	if err != nil {
    706  		return nil, fmt.Errorf("failed to create PDB %q: %w", pdb.Name, err)
   707  	}
   708  	if err := j.waitForPdbReady(ctx); err != nil {
   709  		return nil, fmt.Errorf("failed waiting for PDB to be ready: %w", err)
   710  	}
   711  
   712  	return newPdb, nil
   713  }
   714  
   715  // newPDBTemplate returns the default policyv1.PodDisruptionBudget object for
   716  // this j, but does not actually create the PDB.  The default PDB specifies a
   717  // MinAvailable of N-1 and matches the pods created by the RC.
   718  func (j *TestJig) newPDBTemplate(rc *v1.ReplicationController) *policyv1.PodDisruptionBudget {
   719  	minAvailable := intstr.FromInt32(*rc.Spec.Replicas - 1)
   720  
   721  	pdb := &policyv1.PodDisruptionBudget{
   722  		ObjectMeta: metav1.ObjectMeta{
   723  			Namespace: j.Namespace,
   724  			Name:      j.Name,
   725  			Labels:    j.Labels,
   726  		},
   727  		Spec: policyv1.PodDisruptionBudgetSpec{
   728  			MinAvailable: &minAvailable,
   729  			Selector:     &metav1.LabelSelector{MatchLabels: j.Labels},
   730  		},
   731  	}
   732  
   733  	return pdb
   734  }
   735  
   736  // Run creates a ReplicationController and Pod(s) and waits for the
   737  // Pod(s) to be running. Callers can provide a function to tweak the RC object
   738  // before it is created.
   739  func (j *TestJig) Run(ctx context.Context, tweak func(rc *v1.ReplicationController)) (*v1.ReplicationController, error) {
   740  	rc := j.newRCTemplate()
   741  	if tweak != nil {
   742  		tweak(rc)
   743  	}
   744  	result, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).Create(ctx, rc, metav1.CreateOptions{})
   745  	if err != nil {
   746  		return nil, fmt.Errorf("failed to create RC %q: %w", rc.Name, err)
   747  	}
   748  	pods, err := j.waitForPodsCreated(ctx, int(*(rc.Spec.Replicas)))
   749  	if err != nil {
   750  		return nil, fmt.Errorf("failed to create pods: %w", err)
   751  	}
   752  	if err := j.waitForPodsReady(ctx, pods); err != nil {
   753  		return nil, fmt.Errorf("failed waiting for pods to be running: %w", err)
   754  	}
   755  	return result, nil
   756  }
   757  
    758  // Scale scales the ReplicationController to the given number of replicas and waits for all pods to be running and ready
   759  func (j *TestJig) Scale(ctx context.Context, replicas int) error {
   760  	rc := j.Name
   761  	scale, err := j.Client.CoreV1().ReplicationControllers(j.Namespace).GetScale(ctx, rc, metav1.GetOptions{})
   762  	if err != nil {
   763  		return fmt.Errorf("failed to get scale for RC %q: %w", rc, err)
   764  	}
   765  
   766  	scale.ResourceVersion = "" // indicate the scale update should be unconditional
   767  	scale.Spec.Replicas = int32(replicas)
   768  	_, err = j.Client.CoreV1().ReplicationControllers(j.Namespace).UpdateScale(ctx, rc, scale, metav1.UpdateOptions{})
   769  	if err != nil {
   770  		return fmt.Errorf("failed to scale RC %q: %w", rc, err)
   771  	}
   772  	pods, err := j.waitForPodsCreated(ctx, replicas)
   773  	if err != nil {
   774  		return fmt.Errorf("failed waiting for pods: %w", err)
   775  	}
   776  	if err := j.waitForPodsReady(ctx, pods); err != nil {
   777  		return fmt.Errorf("failed waiting for pods to be running: %w", err)
   778  	}
   779  	return nil
   780  }
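
// exampleRunAndScaleBackends is an illustrative sketch (not part of the
// upstream file): it starts the default netexec ReplicationController and
// then grows the backend, waiting for the pods to become ready at each step.
func exampleRunAndScaleBackends(ctx context.Context, jig *TestJig) error {
	// Start the default single-replica netexec ReplicationController.
	if _, err := jig.Run(ctx, nil); err != nil {
		return err
	}
	// Grow the backend to two replicas and wait for both pods to be ready.
	return jig.Scale(ctx, 2)
}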
   781  
   782  func (j *TestJig) waitForPdbReady(ctx context.Context) error {
   783  	timeout := 2 * time.Minute
   784  	for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
   785  		pdb, err := j.Client.PolicyV1().PodDisruptionBudgets(j.Namespace).Get(ctx, j.Name, metav1.GetOptions{})
   786  		if err != nil {
   787  			return err
   788  		}
   789  		if pdb.Status.DisruptionsAllowed > 0 {
   790  			return nil
   791  		}
   792  	}
   793  
   794  	return fmt.Errorf("timeout waiting for PDB %q to be ready", j.Name)
   795  }
   796  
   797  func (j *TestJig) waitForPodsCreated(ctx context.Context, replicas int) ([]string, error) {
   798  	// TODO (pohly): replace with gomega.Eventually
   799  	timeout := 2 * time.Minute
   800  	// List the pods, making sure we observe all the replicas.
   801  	label := labels.SelectorFromSet(labels.Set(j.Labels))
   802  	framework.Logf("Waiting up to %v for %d pods to be created", timeout, replicas)
   803  	for start := time.Now(); time.Since(start) < timeout && ctx.Err() == nil; time.Sleep(2 * time.Second) {
   804  		options := metav1.ListOptions{LabelSelector: label.String()}
   805  		pods, err := j.Client.CoreV1().Pods(j.Namespace).List(ctx, options)
   806  		if err != nil {
   807  			return nil, err
   808  		}
   809  
   810  		found := []string{}
   811  		for _, pod := range pods.Items {
   812  			if pod.DeletionTimestamp != nil {
   813  				continue
   814  			}
   815  			found = append(found, pod.Name)
   816  		}
   817  		if len(found) == replicas {
   818  			framework.Logf("Found all %d pods", replicas)
   819  			return found, nil
   820  		}
   821  		framework.Logf("Found %d/%d pods - will retry", len(found), replicas)
   822  	}
   823  	return nil, fmt.Errorf("timeout waiting for %d pods to be created", replicas)
   824  }
   825  
   826  func (j *TestJig) waitForPodsReady(ctx context.Context, pods []string) error {
   827  	timeout := 2 * time.Minute
   828  	if !e2epod.CheckPodsRunningReady(ctx, j.Client, j.Namespace, pods, timeout) {
   829  		return fmt.Errorf("timeout waiting for %d pods to be ready", len(pods))
   830  	}
   831  	return nil
   832  }
   833  
   834  func testReachabilityOverServiceName(ctx context.Context, serviceName string, sp v1.ServicePort, execPod *v1.Pod) error {
   835  	return testEndpointReachability(ctx, serviceName, sp.Port, sp.Protocol, execPod)
   836  }
   837  
   838  func testReachabilityOverClusterIP(ctx context.Context, clusterIP string, sp v1.ServicePort, execPod *v1.Pod) error {
    839  	// If .spec.clusterIP is set to "" or "None", no ClusterIP is allocated, so reachability cannot be tested over clusterIP:servicePort
   840  	if netutils.ParseIPSloppy(clusterIP) == nil {
   841  		return fmt.Errorf("unable to parse ClusterIP: %s", clusterIP)
   842  	}
   843  	return testEndpointReachability(ctx, clusterIP, sp.Port, sp.Protocol, execPod)
   844  }
   845  
   846  func testReachabilityOverExternalIP(ctx context.Context, externalIP string, sp v1.ServicePort, execPod *v1.Pod) error {
   847  	return testEndpointReachability(ctx, externalIP, sp.Port, sp.Protocol, execPod)
   848  }
   849  
   850  func testReachabilityOverNodePorts(ctx context.Context, nodes *v1.NodeList, sp v1.ServicePort, pod *v1.Pod, clusterIP string, externalIPs bool) error {
   851  	internalAddrs := e2enode.CollectAddresses(nodes, v1.NodeInternalIP)
   852  	isClusterIPV4 := netutils.IsIPv4String(clusterIP)
   853  
   854  	for _, internalAddr := range internalAddrs {
   855  		// If the node's internal address points to localhost, then we are not
   856  		// able to test the service reachability via that address
   857  		if isInvalidOrLocalhostAddress(internalAddr) {
   858  			framework.Logf("skipping testEndpointReachability() for internal address %s", internalAddr)
   859  			continue
   860  		}
    861  		// Only check service reachability on node internal IPs that are in the same address family as the clusterIP
   862  		if isClusterIPV4 != netutils.IsIPv4String(internalAddr) {
   863  			framework.Logf("skipping testEndpointReachability() for internal address %s as it does not match clusterIP (%s) family", internalAddr, clusterIP)
   864  			continue
   865  		}
   866  
   867  		err := testEndpointReachability(ctx, internalAddr, sp.NodePort, sp.Protocol, pod)
   868  		if err != nil {
   869  			return err
   870  		}
   871  	}
   872  	if externalIPs {
   873  		externalAddrs := e2enode.CollectAddresses(nodes, v1.NodeExternalIP)
   874  		for _, externalAddr := range externalAddrs {
   875  			if isClusterIPV4 != netutils.IsIPv4String(externalAddr) {
   876  				framework.Logf("skipping testEndpointReachability() for external address %s as it does not match clusterIP (%s) family", externalAddr, clusterIP)
   877  				continue
   878  			}
   879  			err := testEndpointReachability(ctx, externalAddr, sp.NodePort, sp.Protocol, pod)
   880  			if err != nil {
   881  				return err
   882  			}
   883  		}
   884  	}
   885  	return nil
   886  }
   887  
   888  // isInvalidOrLocalhostAddress returns `true` if the provided `ip` is either not
   889  // parsable or the loopback address. Otherwise it will return `false`.
   890  func isInvalidOrLocalhostAddress(ip string) bool {
   891  	parsedIP := netutils.ParseIPSloppy(ip)
   892  	if parsedIP == nil || parsedIP.IsLoopback() {
   893  		return true
   894  	}
   895  	return false
   896  }
   897  
    898  // testEndpointReachability tests reachability to endpoints (i.e. IP, ServiceName) and ports. The test request is initiated from the specified execPod.
    899  // Only TCP- and UDP-based services are supported at the moment.
    900  // TODO: add support to test SCTP-based services.
   901  func testEndpointReachability(ctx context.Context, endpoint string, port int32, protocol v1.Protocol, execPod *v1.Pod) error {
   902  	ep := net.JoinHostPort(endpoint, strconv.Itoa(int(port)))
   903  	cmd := ""
   904  	switch protocol {
   905  	case v1.ProtocolTCP:
   906  		cmd = fmt.Sprintf("echo hostName | nc -v -t -w 2 %s %v", endpoint, port)
   907  	case v1.ProtocolUDP:
   908  		cmd = fmt.Sprintf("echo hostName | nc -v -u -w 2 %s %v", endpoint, port)
   909  	default:
   910  		return fmt.Errorf("service reachability check is not supported for %v", protocol)
   911  	}
   912  
   913  	err := wait.PollUntilContextTimeout(ctx, 1*time.Second, ServiceReachabilityShortPollTimeout, true, func(ctx context.Context) (bool, error) {
   914  		stdout, err := e2epodoutput.RunHostCmd(execPod.Namespace, execPod.Name, cmd)
   915  		if err != nil {
   916  			framework.Logf("Service reachability failing with error: %v\nRetrying...", err)
   917  			return false, nil
   918  		}
   919  		trimmed := strings.TrimSpace(stdout)
   920  		if trimmed != "" {
   921  			return true, nil
   922  		}
   923  		return false, nil
   924  	})
   925  	if err != nil {
   926  		return fmt.Errorf("service is not reachable within %v timeout on endpoint %s over %s protocol", ServiceReachabilityShortPollTimeout, ep, protocol)
   927  	}
   928  	return nil
   929  }
   930  
    931  // checkClusterIPServiceReachability ensures that a service of type ClusterIP is reachable over
    932  // - ServiceName:ServicePort, ClusterIP:ServicePort
   933  func (j *TestJig) checkClusterIPServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
   934  	clusterIP := svc.Spec.ClusterIP
   935  	servicePorts := svc.Spec.Ports
   936  	externalIPs := svc.Spec.ExternalIPs
   937  
   938  	err := j.waitForAvailableEndpoint(ctx, ServiceEndpointsTimeout)
   939  	if err != nil {
   940  		return err
   941  	}
   942  
   943  	for _, servicePort := range servicePorts {
   944  		err = testReachabilityOverServiceName(ctx, svc.Name, servicePort, pod)
   945  		if err != nil {
   946  			return err
   947  		}
   948  		err = testReachabilityOverClusterIP(ctx, clusterIP, servicePort, pod)
   949  		if err != nil {
   950  			return err
   951  		}
   952  		if len(externalIPs) > 0 {
   953  			for _, externalIP := range externalIPs {
   954  				err = testReachabilityOverExternalIP(ctx, externalIP, servicePort, pod)
   955  				if err != nil {
   956  					return err
   957  				}
   958  			}
   959  		}
   960  	}
   961  	return nil
   962  }
   963  
    964  // checkNodePortServiceReachability ensures that services of type NodePort are reachable:
    965  //   - Internal clients should be able to reach the service over
    966  //     ServiceName:ServicePort, ClusterIP:ServicePort and NodeInternalIPs:NodePort
    967  //   - External clients should be able to reach the service over
    968  //     NodePublicIPs:NodePort
   969  func (j *TestJig) checkNodePortServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
   970  	clusterIP := svc.Spec.ClusterIP
   971  	servicePorts := svc.Spec.Ports
   972  
   973  	// Consider only 2 nodes for testing
   974  	nodes, err := e2enode.GetBoundedReadySchedulableNodes(ctx, j.Client, 2)
   975  	if err != nil {
   976  		return err
   977  	}
   978  
   979  	err = j.waitForAvailableEndpoint(ctx, ServiceEndpointsTimeout)
   980  	if err != nil {
   981  		return err
   982  	}
   983  
   984  	for _, servicePort := range servicePorts {
   985  		err = testReachabilityOverServiceName(ctx, svc.Name, servicePort, pod)
   986  		if err != nil {
   987  			return err
   988  		}
   989  		err = testReachabilityOverClusterIP(ctx, clusterIP, servicePort, pod)
   990  		if err != nil {
   991  			return err
   992  		}
   993  		err = testReachabilityOverNodePorts(ctx, nodes, servicePort, pod, clusterIP, j.ExternalIPs)
   994  		if err != nil {
   995  			return err
   996  		}
   997  	}
   998  
   999  	return nil
  1000  }
  1001  
   1002  // checkExternalServiceReachability ensures that a service of type ExternalName resolves to an IP address and that no fake externalName is set.
   1003  // The Kubernetes FQDN is used as the externalName (for air-gapped platforms).
  1004  func (j *TestJig) checkExternalServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
  1005  	// NOTE(claudiub): Windows does not support PQDN.
  1006  	svcName := fmt.Sprintf("%s.%s.svc.%s", svc.Name, svc.Namespace, framework.TestContext.ClusterDNSDomain)
  1007  	// Service must resolve to IP
  1008  	cmd := fmt.Sprintf("nslookup %s", svcName)
  1009  	return wait.PollUntilContextTimeout(ctx, framework.Poll, ServiceReachabilityShortPollTimeout, true, func(ctx context.Context) (done bool, err error) {
  1010  		_, stderr, err := e2epodoutput.RunHostCmdWithFullOutput(pod.Namespace, pod.Name, cmd)
  1011  		// NOTE(claudiub): nslookup may return 0 on Windows, even though the DNS name was not found. In this case,
  1012  		// we can check stderr for the error.
  1013  		if err != nil || (framework.NodeOSDistroIs("windows") && strings.Contains(stderr, fmt.Sprintf("can't find %s", svcName))) {
  1014  			framework.Logf("ExternalName service %q failed to resolve to IP", pod.Namespace+"/"+pod.Name)
  1015  			return false, nil
  1016  		}
  1017  		return true, nil
  1018  	})
  1019  }
  1020  
   1021  // CheckServiceReachability ensures that requests are served by the service. It supports Services of type ClusterIP, NodePort, ExternalName, and LoadBalancer (checked the same way as ClusterIP).
  1022  func (j *TestJig) CheckServiceReachability(ctx context.Context, svc *v1.Service, pod *v1.Pod) error {
  1023  	svcType := svc.Spec.Type
  1024  
  1025  	_, err := j.sanityCheckService(svc, svcType)
  1026  	if err != nil {
  1027  		return err
  1028  	}
  1029  
  1030  	switch svcType {
  1031  	case v1.ServiceTypeClusterIP:
  1032  		return j.checkClusterIPServiceReachability(ctx, svc, pod)
  1033  	case v1.ServiceTypeNodePort:
  1034  		return j.checkNodePortServiceReachability(ctx, svc, pod)
  1035  	case v1.ServiceTypeExternalName:
  1036  		return j.checkExternalServiceReachability(ctx, svc, pod)
  1037  	case v1.ServiceTypeLoadBalancer:
  1038  		return j.checkClusterIPServiceReachability(ctx, svc, pod)
  1039  	default:
   1040  		return fmt.Errorf("unsupported service type \"%s\" to verify service reachability for \"%s\" service. This may be due to diverse implementation of the service type", svcType, svc.Name)
  1041  	}
  1042  }
  1043  
   1044  // CreateServicePods creates a replication controller with the same labels as the service. The service listens on both TCP and UDP.
  1045  func (j *TestJig) CreateServicePods(ctx context.Context, replica int) error {
  1046  	config := testutils.RCConfig{
  1047  		Client:       j.Client,
  1048  		Name:         j.Name,
  1049  		Image:        framework.ServeHostnameImage,
  1050  		Command:      []string{"/agnhost", "serve-hostname", "--http=false", "--tcp", "--udp"},
  1051  		Namespace:    j.Namespace,
  1052  		Labels:       j.Labels,
  1053  		PollInterval: 3 * time.Second,
  1054  		Timeout:      framework.PodReadyBeforeTimeout,
  1055  		Replicas:     replica,
  1056  	}
  1057  	return e2erc.RunRC(ctx, config)
  1058  }
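
// exampleVerifyReachability is an illustrative sketch (not part of the
// upstream file): it backs the Service with two serve-hostname pods and then
// verifies that the Service answers requests issued from an already-running
// exec pod (created elsewhere, e.g. with the e2e pod helpers).
func exampleVerifyReachability(ctx context.Context, jig *TestJig, svc *v1.Service, execPod *v1.Pod) error {
	if err := jig.CreateServicePods(ctx, 2); err != nil {
		return err
	}
	return jig.CheckServiceReachability(ctx, svc, execPod)
}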
  1059  
  1060  // CreateSCTPServiceWithPort creates a new SCTP Service with given port based on the
  1061  // j's defaults. Callers can provide a function to tweak the Service object before
  1062  // it is created.
  1063  func (j *TestJig) CreateSCTPServiceWithPort(ctx context.Context, tweak func(svc *v1.Service), port int32) (*v1.Service, error) {
  1064  	svc := j.newServiceTemplate(v1.ProtocolSCTP, port)
  1065  	if tweak != nil {
  1066  		tweak(svc)
  1067  	}
  1068  	result, err := j.Client.CoreV1().Services(j.Namespace).Create(ctx, svc, metav1.CreateOptions{})
  1069  	if err != nil {
  1070  		return nil, fmt.Errorf("failed to create SCTP Service %q: %w", svc.Name, err)
  1071  	}
  1072  	return j.sanityCheckService(result, svc.Spec.Type)
  1073  }
  1074  
   1075  // CreateLoadBalancerServiceWaitForClusterIPOnly creates a LoadBalancer Service and returns it
   1076  // once it has a cluster IP, without waiting for an external load balancer to be provisioned
  1077  func (j *TestJig) CreateLoadBalancerServiceWaitForClusterIPOnly(tweak func(svc *v1.Service)) (*v1.Service, error) {
  1078  	ginkgo.By("creating a service " + j.Namespace + "/" + j.Name + " with type=LoadBalancer")
  1079  	svc := j.newServiceTemplate(v1.ProtocolTCP, 80)
  1080  	svc.Spec.Type = v1.ServiceTypeLoadBalancer
  1081  	// We need to turn affinity off for our LB distribution tests
  1082  	svc.Spec.SessionAffinity = v1.ServiceAffinityNone
  1083  	if tweak != nil {
  1084  		tweak(svc)
  1085  	}
  1086  	result, err := j.Client.CoreV1().Services(j.Namespace).Create(context.TODO(), svc, metav1.CreateOptions{})
  1087  	if err != nil {
  1088  		return nil, fmt.Errorf("failed to create LoadBalancer Service %q: %w", svc.Name, err)
  1089  	}
  1090  
  1091  	return j.sanityCheckService(result, v1.ServiceTypeLoadBalancer)
  1092  }
  1093  
