...

Source file src/k8s.io/kubernetes/test/integration/dualstack/dualstack_endpoints_test.go

Documentation: k8s.io/kubernetes/test/integration/dualstack

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package dualstack
    18  
    19  import (
    20  	"fmt"
    21  	"testing"
    22  	"time"
    23  
    24  	v1 "k8s.io/api/core/v1"
    25  	discovery "k8s.io/api/discovery/v1"
    26  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    27  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    28  	"k8s.io/apimachinery/pkg/util/intstr"
    29  	"k8s.io/apimachinery/pkg/util/wait"
    30  	"k8s.io/client-go/informers"
    31  	"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
    32  	"k8s.io/kubernetes/pkg/controller/endpoint"
    33  	"k8s.io/kubernetes/pkg/controller/endpointslice"
    34  	"k8s.io/kubernetes/test/integration/framework"
    35  	"k8s.io/kubernetes/test/utils/ktesting"
    36  )
    37  
    38  func TestDualStackEndpoints(t *testing.T) {
    39  	// Create an IPv4IPv6 dual stack control-plane
    40  	serviceCIDR := "10.0.0.0/16"
    41  	secondaryServiceCIDR := "2001:db8:1::/112"
    42  	labelMap := func() map[string]string {
    43  		return map[string]string{"foo": "bar"}
    44  	}
    45  
    46  	tCtx := ktesting.Init(t)
    47  
    48  	client, _, tearDownFn := framework.StartTestServer(tCtx, t, framework.TestServerSetup{
    49  		ModifyServerRunOptions: func(opts *options.ServerRunOptions) {
    50  			opts.ServiceClusterIPRanges = fmt.Sprintf("%s,%s", serviceCIDR, secondaryServiceCIDR)
    51  			// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
    52  			opts.Admission.GenericAdmission.DisablePlugins = []string{"ServiceAccount"}
    53  		},
    54  	})
    55  	defer tearDownFn()
    56  
    57  	// Wait until the default "kubernetes" service is created.
    58  	if err := wait.Poll(250*time.Millisecond, time.Minute, func() (bool, error) {
    59  		_, err := client.CoreV1().Services(metav1.NamespaceDefault).Get(tCtx, "kubernetes", metav1.GetOptions{})
    60  		if err != nil && !apierrors.IsNotFound(err) {
    61  			return false, err
    62  		}
    63  		return !apierrors.IsNotFound(err), nil
    64  	}); err != nil {
    65  		t.Fatalf("Creating kubernetes service timed out")
    66  	}
    67  
    68  	resyncPeriod := 0 * time.Hour
    69  	informers := informers.NewSharedInformerFactory(client, resyncPeriod)
    70  
    71  	// Create fake node
    72  	testNode := &v1.Node{
    73  		ObjectMeta: metav1.ObjectMeta{
    74  			Name: "fakenode",
    75  		},
    76  		Spec: v1.NodeSpec{Unschedulable: false},
    77  		Status: v1.NodeStatus{
    78  			Conditions: []v1.NodeCondition{
    79  				{
    80  					Type:              v1.NodeReady,
    81  					Status:            v1.ConditionTrue,
    82  					Reason:            fmt.Sprintf("schedulable condition"),
    83  					LastHeartbeatTime: metav1.Time{Time: time.Now()},
    84  				},
    85  			},
    86  		},
    87  	}
    88  	if _, err := client.CoreV1().Nodes().Create(tCtx, testNode, metav1.CreateOptions{}); err != nil {
    89  		t.Fatalf("Failed to create Node %q: %v", testNode.Name, err)
    90  	}
    91  
    92  	epController := endpoint.NewEndpointController(
    93  		tCtx,
    94  		informers.Core().V1().Pods(),
    95  		informers.Core().V1().Services(),
    96  		informers.Core().V1().Endpoints(),
    97  		client,
    98  		1*time.Second)
    99  
   100  	epsController := endpointslice.NewController(
   101  		tCtx,
   102  		informers.Core().V1().Pods(),
   103  		informers.Core().V1().Services(),
   104  		informers.Core().V1().Nodes(),
   105  		informers.Discovery().V1().EndpointSlices(),
   106  		int32(100),
   107  		client,
   108  		1*time.Second)
   109  
   110  	// Start informer and controllers
   111  	informers.Start(tCtx.Done())
   112  	// use only one worker to serialize the updates
   113  	go epController.Run(tCtx, 1)
   114  	go epsController.Run(tCtx, 1)
   115  
   116  	var testcases = []struct {
   117  		name           string
   118  		serviceType    v1.ServiceType
   119  		ipFamilies     []v1.IPFamily
   120  		ipFamilyPolicy v1.IPFamilyPolicy
   121  	}{
   122  		{
   123  			name:           "Service IPv4 Only",
   124  			serviceType:    v1.ServiceTypeClusterIP,
   125  			ipFamilies:     []v1.IPFamily{v1.IPv4Protocol},
   126  			ipFamilyPolicy: v1.IPFamilyPolicySingleStack,
   127  		},
   128  		{
   129  			name:           "Service IPv6 Only",
   130  			serviceType:    v1.ServiceTypeClusterIP,
   131  			ipFamilies:     []v1.IPFamily{v1.IPv6Protocol},
   132  			ipFamilyPolicy: v1.IPFamilyPolicySingleStack,
   133  		},
   134  		{
   135  			name:           "Service IPv6 IPv4 Dual Stack",
   136  			serviceType:    v1.ServiceTypeClusterIP,
   137  			ipFamilies:     []v1.IPFamily{v1.IPv6Protocol, v1.IPv4Protocol},
   138  			ipFamilyPolicy: v1.IPFamilyPolicyRequireDualStack,
   139  		},
   140  		{
   141  			name:           "Service IPv4 IPv6 Dual Stack",
   142  			serviceType:    v1.ServiceTypeClusterIP,
   143  			ipFamilies:     []v1.IPFamily{v1.IPv4Protocol, v1.IPv6Protocol},
   144  			ipFamilyPolicy: v1.IPFamilyPolicyRequireDualStack,
   145  		},
   146  	}
   147  
   148  	for i, tc := range testcases {
   149  		t.Run(tc.name, func(t *testing.T) {
   150  			ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-dualstack-%d", i), t)
   151  			defer framework.DeleteNamespaceOrDie(client, ns, t)
   152  
   153  			// Create a pod with labels
   154  			pod := &v1.Pod{
   155  				ObjectMeta: metav1.ObjectMeta{
   156  					Name:      "test-pod",
   157  					Namespace: ns.Name,
   158  					Labels:    labelMap(),
   159  				},
   160  				Spec: v1.PodSpec{
   161  					NodeName: "fakenode",
   162  					Containers: []v1.Container{
   163  						{
   164  							Name:  "fake-name",
   165  							Image: "fakeimage",
   166  						},
   167  					},
   168  				},
   169  			}
   170  
   171  			createdPod, err := client.CoreV1().Pods(ns.Name).Create(tCtx, pod, metav1.CreateOptions{})
   172  			if err != nil {
   173  				t.Fatalf("Failed to create pod %s: %v", pod.Name, err)
   174  			}
   175  
   176  			// Set pod IPs
   177  			podIPbyFamily := map[v1.IPFamily]string{v1.IPv4Protocol: "1.1.1.1", v1.IPv6Protocol: "2001:db2::65"}
   178  			createdPod.Status = v1.PodStatus{
   179  				Phase:  v1.PodRunning,
   180  				PodIPs: []v1.PodIP{{IP: podIPbyFamily[v1.IPv4Protocol]}, {IP: podIPbyFamily[v1.IPv6Protocol]}},
   181  			}
   182  			_, err = client.CoreV1().Pods(ns.Name).UpdateStatus(tCtx, createdPod, metav1.UpdateOptions{})
   183  			if err != nil {
   184  				t.Fatalf("Failed to update status of pod %s: %v", pod.Name, err)
   185  			}
   186  
   187  			svc := &v1.Service{
   188  				ObjectMeta: metav1.ObjectMeta{
   189  					Name:      fmt.Sprintf("svc-test-%d", i), // use different services for each test
   190  					Namespace: ns.Name,
   191  					Labels:    labelMap(),
   192  				},
   193  				Spec: v1.ServiceSpec{
   194  					Type:           v1.ServiceTypeClusterIP,
   195  					IPFamilies:     tc.ipFamilies,
   196  					IPFamilyPolicy: &tc.ipFamilyPolicy,
   197  					Selector:       labelMap(),
   198  					Ports: []v1.ServicePort{
   199  						{
   200  							Name:       fmt.Sprintf("port-test-%d", i),
   201  							Port:       443,
   202  							TargetPort: intstr.IntOrString{IntVal: 443},
   203  							Protocol:   "TCP",
   204  						},
   205  					},
   206  				},
   207  			}
   208  
   209  			// create a service
   210  			_, err = client.CoreV1().Services(ns.Name).Create(tCtx, svc, metav1.CreateOptions{})
   211  			if err != nil {
   212  				t.Fatalf("Error creating service: %v", err)
   213  			}
   214  
   215  			// wait until endpoints are created
   216  			// legacy endpoints are not dual stack
   217  			// and use the address of the first IP family
   218  			if err := wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
   219  				e, err := client.CoreV1().Endpoints(ns.Name).Get(tCtx, svc.Name, metav1.GetOptions{})
   220  				if err != nil {
   221  					t.Logf("Error fetching endpoints: %v", err)
   222  					return false, nil
   223  				}
   224  				// check if the endpoint addresses match the pod IP of the first IPFamily of the service
   225  				// since this is an integration test PodIPs are not "ready"
   226  				if len(e.Subsets) > 0 && len(e.Subsets[0].NotReadyAddresses) > 0 {
   227  					if e.Subsets[0].NotReadyAddresses[0].IP == podIPbyFamily[tc.ipFamilies[0]] {
   228  						return true, nil
   229  					}
   230  					t.Logf("Endpoint address %s does not match PodIP %s ", e.Subsets[0].Addresses[0].IP, podIPbyFamily[tc.ipFamilies[0]])
   231  				}
   232  				t.Logf("Endpoint does not contain addresses: %s", e.Name)
   233  				return false, nil
   234  			}); err != nil {
   235  				t.Fatalf("Endpoints not found: %v", err)
   236  			}
   237  
   238  			// wait until the endpoint slices are created
   239  			err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
   240  				lSelector := discovery.LabelServiceName + "=" + svc.Name
   241  				esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
   242  				if err != nil {
   243  					t.Logf("Error listing EndpointSlices: %v", err)
   244  					return false, nil
   245  				}
   246  				// there must be an endpoint slice per ipFamily
   247  				if len(esList.Items) != len(tc.ipFamilies) {
   248  					t.Logf("Waiting for EndpointSlice to be created %v", esList)
   249  					return false, nil
   250  				}
   251  				// there must be an endpoint address per each IP family
   252  				for _, ipFamily := range tc.ipFamilies {
   253  					found := false
   254  					for _, slice := range esList.Items {
   255  						// check if the endpoint addresses match the pod IPs
   256  						if len(slice.Endpoints) > 0 && len(slice.Endpoints[0].Addresses) > 0 {
   257  							if string(ipFamily) == string(slice.AddressType) &&
   258  								slice.Endpoints[0].Addresses[0] == podIPbyFamily[ipFamily] {
   259  								found = true
   260  								break
   261  							}
   262  						}
   263  						t.Logf("Waiting endpoint slice to contain addresses")
   264  					}
   265  					if !found {
   266  						t.Logf("Endpoint slices does not contain PodIP %s", podIPbyFamily[ipFamily])
   267  						return false, nil
   268  					}
   269  				}
   270  				return true, nil
   271  			})
   272  			if err != nil {
   273  				t.Fatalf("Error waiting for endpoint slices: %v", err)
   274  			}
   275  		})
   276  	}
   277  }
   278  

View as plain text