...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/external-workload/endpoints_reconciler_test.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/external-workload

     1  package externalworkload
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"reflect"
     7  	"strings"
     8  	"testing"
     9  
    10  	ewv1beta1 "github.com/linkerd/linkerd2/controller/gen/apis/externalworkload/v1beta1"
    11  	"github.com/linkerd/linkerd2/controller/k8s"
    12  
    13  	corev1 "k8s.io/api/core/v1"
    14  	discoveryv1 "k8s.io/api/discovery/v1"
    15  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    16  	"k8s.io/apimachinery/pkg/labels"
    17  	"k8s.io/apimachinery/pkg/runtime"
    18  	"k8s.io/apimachinery/pkg/runtime/schema"
    19  	k8stesting "k8s.io/client-go/testing"
    20  	epsliceutil "k8s.io/endpointslice/util"
    21  
    22  	"k8s.io/apimachinery/pkg/types"
    23  	"k8s.io/apimachinery/pkg/util/intstr"
    24  	"k8s.io/apimachinery/pkg/util/rand"
    25  	"sigs.k8s.io/yaml"
    26  )
    27  
    28  var (
    29  	httpUnnamedPort = corev1.ServicePort{
    30  		Port: 8080,
    31  		TargetPort: intstr.IntOrString{
    32  			Type:   intstr.Int,
    33  			IntVal: 8080,
    34  		},
    35  	}
    36  
    37  	httpNamedPort = corev1.ServicePort{
    38  		TargetPort: intstr.IntOrString{
    39  			Type:   intstr.String,
    40  			StrVal: "http",
    41  		},
    42  	}
    43  
    44  	defaultTestEndpointsQuota = 100
    45  
    46  	testControllerName = "test-controller"
    47  )
    48  
    49  // === Test create / update / delete ===
    50  
    51  // Test that when a service has no endpointslices written to the API Server, reconciling
    52  // with a workload will create a new endpointslice.
    53  func TestReconcilerCreatesNewEndpointSlice(t *testing.T) {
    54  	// We do not need to receive anything through the informers so
    55  	// create a client with no cached resources
    56  	k8sAPI, err := k8s.NewFakeAPI([]string{}...)
    57  	if err != nil {
    58  		t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
    59  	}
    60  
    61  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
    62  	ew := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": ""}, map[int32]string{8080: ""}, []string{"192.0.2.0"})
    63  	ew.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ew.Namespace, ew.Name))
    64  
    65  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
    66  	err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ew}, nil)
    67  	if err != nil {
    68  		t.Fatalf("unexpected error when reconciling endpoints: %v", err)
    69  	}
    70  
    71  	expectedEndpoint := makeEndpoint([]string{"192.0.2.0"}, true, ew)
    72  	es := fetchEndpointSlices(t, k8sAPI, svc)
    73  	if len(es) != 1 {
    74  		t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es))
    75  	}
    76  
    77  	if len(es[0].Endpoints) != 1 {
    78  		t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es[0].Endpoints))
    79  	}
    80  
    81  	if es[0].AddressType != discoveryv1.AddressTypeIPv4 {
    82  		t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, es[0].AddressType)
    83  	}
    84  	ep := es[0].Endpoints[0]
    85  	diffEndpoints(t, ep, expectedEndpoint)
    86  }
    87  
    88  // Test that when a service has no endpointslices written to the API Server, reconciling
    89  // with a workload will create a new endpointslice. Since it is a headless
    90  // service, we will also get a hostname
    91  func TestReconcilerCreatesNewEndpointSliceHeadless(t *testing.T) {
    92  	// We do not need to receive anything through the informers so
    93  	// create a client with no cached resources
    94  	k8sAPI, err := k8s.NewFakeAPI([]string{}...)
    95  	if err != nil {
    96  		t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
    97  	}
    98  
    99  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
   100  	svc.Spec.ClusterIP = corev1.ClusterIPNone
   101  	ew := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": ""}, map[int32]string{8080: ""}, []string{"192.0.2.0"})
   102  	ew.Namespace = "default"
   103  	ew.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ew.Namespace, ew.Name))
   104  
   105  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   106  	err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ew}, nil)
   107  	if err != nil {
   108  		t.Fatalf("unexpected error when reconciling endpoints: %v", err)
   109  	}
   110  
   111  	expectedEndpoint := makeEndpoint([]string{"192.0.2.0"}, true, ew)
   112  	es := fetchEndpointSlices(t, k8sAPI, svc)
   113  	if len(es) != 1 {
   114  		t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es))
   115  	}
   116  
   117  	if len(es[0].Endpoints) != 1 {
   118  		t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(es[0].Endpoints))
   119  	}
   120  
   121  	if es[0].AddressType != discoveryv1.AddressTypeIPv4 {
   122  		t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, es[0].AddressType)
   123  	}
   124  	ep := es[0].Endpoints[0]
   125  	diffEndpoints(t, ep, expectedEndpoint)
   126  
   127  	if _, ok := es[0].Labels[corev1.IsHeadlessService]; !ok {
   128  		t.Errorf("expected \"%s\" label to be present on the service", corev1.IsHeadlessService)
   129  	}
   130  
   131  	if ep.Hostname == nil {
   132  		t.Fatalf("expected endpoint to have a hostname")
   133  	}
   134  
   135  	if *ep.Hostname != ew.Name {
   136  		t.Errorf("expected \"%s\" as a hostname, got: %s", ew.Name, *ep.Hostname)
   137  	}
   138  
   139  }
   140  
   141  // Test that when a service has an endpointslice written to the API Server,
   142  // reconciling with the two workloads updates the endpointslice
   143  func TestReconcilerUpdatesEndpointSlice(t *testing.T) {
   144  	// Create a service
   145  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
   146  
   147  	// Create our existing workload
   148  	ewCreated := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{"192.0.2.1"})
   149  	ewCreated.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ewCreated.Namespace, ewCreated.Name))
   150  
   151  	// Create an endpointslice
   152  	port := int32(8080)
   153  	ports := []discoveryv1.EndpointPort{{
   154  		Port: &port,
   155  	}}
   156  	es := makeEndpointSlice(svc, ports)
   157  	endpoints := []discoveryv1.Endpoint{}
   158  	endpoints = append(endpoints, externalWorkloadToEndpoint(discoveryv1.AddressTypeIPv4, ewCreated, svc))
   159  	es.Endpoints = endpoints
   160  	es.Generation = 1
   161  
   162  	// Create our "new" workload
   163  	ewUpdated := makeExternalWorkload("1", "wlkd-2", map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{"192.0.2.0"})
   164  	ewUpdated.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ewUpdated.Namespace, ewUpdated.Name))
   165  
   166  	// Convert endpointslice to string and register with fake client
   167  	k8sAPI, err := k8s.NewFakeAPI(endpointSliceAsYaml(t, es))
   168  	if err != nil {
   169  		t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
   170  	}
   171  
   172  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   173  	err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ewCreated, ewUpdated}, []*discoveryv1.EndpointSlice{es})
   174  	if err != nil {
   175  		t.Fatalf("unexpected error when reconciling endpoints: %v", err)
   176  	}
   177  
   178  	slice, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Get(context.Background(), es.Name, metav1.GetOptions{})
   179  	if err != nil {
   180  		t.Fatalf("unexpected error when retrieving endpointslice: %v", err)
   181  	}
   182  	if len(slice.Endpoints) != 2 {
   183  		t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 2, len(slice.Endpoints))
   184  	}
   185  
   186  	if slice.AddressType != discoveryv1.AddressTypeIPv4 {
   187  		t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, slice.AddressType)
   188  	}
   189  
   190  	for _, ep := range slice.Endpoints {
   191  		if ep.TargetRef.Name == ewUpdated.Name {
   192  			expectedEndpoint := makeEndpoint([]string{"192.0.2.0"}, true, ewUpdated)
   193  			diffEndpoints(t, ep, expectedEndpoint)
   194  		} else if ep.TargetRef.Name == ewCreated.Name {
   195  			expectedEndpoint := makeEndpoint([]string{"192.0.2.1"}, true, ewCreated)
   196  			diffEndpoints(t, ep, expectedEndpoint)
   197  		} else {
   198  			t.Errorf("found unexpected targetRef name %s", ep.TargetRef.Name)
   199  		}
   200  	}
   201  }
   202  
   203  // When an endpoint has changed, we should see the endpointslice change its
   204  // endpoint
   205  func TestReconcilerUpdatesEndpointSliceInPlace(t *testing.T) {
   206  	// Create a service
   207  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "")
   208  
   209  	// Create our existing workload
   210  	ewCreated := makeExternalWorkload("1", "wlkd-1", map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{"192.0.2.1"})
   211  	ewCreated.ObjectMeta.UID = types.UID(fmt.Sprintf("%s-%s", ewCreated.Namespace, ewCreated.Name))
   212  
   213  	// Create an endpointslice
   214  	port := int32(8080)
   215  	ports := []discoveryv1.EndpointPort{{
   216  		Port: &port,
   217  	}}
   218  	es := makeEndpointSlice(svc, ports)
   219  	endpoints := []discoveryv1.Endpoint{}
   220  	endpoints = append(endpoints, externalWorkloadToEndpoint(discoveryv1.AddressTypeIPv4, ewCreated, svc))
   221  	es.Endpoints = endpoints
   222  	es.Generation = 1
   223  
   224  	// Convert endpointslice to string and register with fake client
   225  	k8sAPI, err := k8s.NewFakeAPI(endpointSliceAsYaml(t, es))
   226  	if err != nil {
   227  		t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
   228  	}
   229  
   230  	if err != nil {
   231  		t.Fatalf("unexpected error when retrieving endpointslice: %v", err)
   232  	}
   233  
   234  	// Change the workload
   235  	ewCreated.Labels = map[string]string{
   236  		corev1.LabelTopologyZone: "zone1",
   237  	}
   238  
   239  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   240  	err = r.reconcile(svc, []*ewv1beta1.ExternalWorkload{ewCreated, ewCreated}, []*discoveryv1.EndpointSlice{es})
   241  	if err != nil {
   242  		t.Fatalf("unexpected error when reconciling endpoints: %v", err)
   243  	}
   244  
   245  	slice, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Get(context.Background(), es.Name, metav1.GetOptions{})
   246  	if err != nil {
   247  		t.Fatalf("unexpected error when retrieving endpointslice: %v", err)
   248  	}
   249  	if len(slice.Endpoints) != 1 {
   250  		t.Fatalf("expected %d endpointslices after reconciliation, got %d instead", 1, len(slice.Endpoints))
   251  	}
   252  
   253  	if slice.AddressType != discoveryv1.AddressTypeIPv4 {
   254  		t.Fatalf("expected endpointslice to have AF %s, got %s instead", discoveryv1.AddressTypeIPv4, slice.AddressType)
   255  	}
   256  
   257  	if slice.Generation == 1 {
   258  		t.Fatalf("expected endpointslice to have its generation bumped after update")
   259  	}
   260  
   261  	if *slice.Endpoints[0].Zone != "zone1" {
   262  		t.Fatalf("expected endpoint to be updated with new zone topology")
   263  	}
   264  }
   265  
   266  // === Test ports ===
   267  
   268  // A named port on a service can target a different port on a workload
   269  func TestReconcileEndpointSlicesNamedPorts(t *testing.T) {
   270  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpNamedPort}, "192.0.2.1")
   271  	ews := []*ewv1beta1.ExternalWorkload{}
   272  	// Generate a large number of external workloads
   273  	// randomise ports so that a named port maps to different target values
   274  	for i := 0; i < 300; i++ {
   275  		ready := !(i%3 == 0)
   276  		offset := i % 5
   277  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   278  		genPort := int32(8080 + offset)
   279  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{genPort: "http"}, []string{genIp})
   280  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   281  		ews = append(ews, ew)
   282  	}
   283  
   284  	k8sAPI, err := k8s.NewFakeAPI([]string{}...)
   285  	if err != nil {
   286  		t.Fatalf("unexpected error when initializing API client: %v", err)
   287  	}
   288  
   289  	// Start with 100 endpoints max quota. Since we have 5 possible ports
   290  	// mapping to name 'http' we will generate 5 slices
   291  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   292  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
   293  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   294  	expectedNumSlices := 5
   295  	if len(slices) != expectedNumSlices {
   296  		t.Fatalf("expected %d slices to be created, got %d instead", expectedNumSlices, len(slices))
   297  	}
   298  
   299  	// We should have 5 slices with 60 endpoints each
   300  	expectSlicesWithLengths(t, []int{60, 60, 60, 60, 60}, slices)
   301  	expectedSlices := []discoveryv1.EndpointSlice{}
   302  	for i := range slices {
   303  		port := int32(8080 + i)
   304  		expectedSlices = append(expectedSlices, discoveryv1.EndpointSlice{
   305  			Ports: []discoveryv1.EndpointPort{
   306  				{
   307  					Port: &port,
   308  				},
   309  			},
   310  			AddressType: discoveryv1.AddressTypeIPv4,
   311  		})
   312  	}
   313  
   314  	// Diff the ports
   315  	diffEndpointSlicePorts(t, expectedSlices, slices)
   316  }
   317  
   318  // === Test packing logic ===
   319  
   320  // a simple use case with 250 workloads matching a service and no existing slices
   321  // reconcile should create 3 slices, completely filling 2 of them
   322  func TestReconcileManyWorkloads(t *testing.T) {
   323  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
   324  	// start with 250 workloads
   325  	ews := []*ewv1beta1.ExternalWorkload{}
   326  	for i := 0; i < 250; i++ {
   327  		ready := !(i%3 == 0)
   328  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   329  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
   330  
   331  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   332  		ews = append(ews, ew)
   333  	}
   334  
   335  	k8sAPI, actions := newClientset(t, []string{})
   336  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   337  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
   338  	expectActions(t, actions(), 3, "create", "endpointslices")
   339  
   340  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   341  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   342  }
   343  
   344  // Test with preexisting slices. 250 pods matching a service:
   345  // * First es: 62 endpoints (all desired)
   346  // * Second es: 61 endpoints (all desired)
   347  // We have 127 leftover to add.
   348  //
   349  // We will drop 27 in the first slice closest to full
   350  func TestReconcileEndpointSlicesSomePreexisting(t *testing.T) {
   351  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
   352  	// start with 250 workloads
   353  	ews := []*ewv1beta1.ExternalWorkload{}
   354  	for i := 0; i < 250; i++ {
   355  		ready := !(i%3 == 0)
   356  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   357  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
   358  
   359  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   360  		ews = append(ews, ew)
   361  	}
   362  
   363  	// Create an endpointslice
   364  	port := int32(8080)
   365  	esPorts := []discoveryv1.EndpointPort{{
   366  		Port: &port,
   367  	}}
   368  
   369  	es1 := makeEndpointSlice(svc, esPorts)
   370  	// Take a quarter of workloads in the first slice
   371  	for i := 1; i < len(ews)-4; i += 4 {
   372  		addrs := []string{ews[i].Spec.WorkloadIPs[0].Ip}
   373  		isReady := IsEwReady(ews[i])
   374  		es1.Endpoints = append(es1.Endpoints, makeEndpoint(addrs, isReady, ews[i]))
   375  	}
   376  
   377  	es2 := makeEndpointSlice(svc, esPorts)
   378  	// Take a quarter of workloads in the second slice
   379  	for i := 3; i < len(ews)-4; i += 4 {
   380  		addrs := []string{ews[i].Spec.WorkloadIPs[0].Ip}
   381  		isReady := IsEwReady(ews[i])
   382  		es2.Endpoints = append(es2.Endpoints, makeEndpoint(addrs, isReady, ews[i]))
   383  	}
   384  
   385  	existingSlices := []*discoveryv1.EndpointSlice{es1, es2}
   386  	cmc := newCacheMutationCheck(existingSlices)
   387  	k8sAPI, actions := newClientset(t, []string{})
   388  	for _, slice := range existingSlices {
   389  		_, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
   390  		if err != nil {
   391  			t.Fatalf("unexpected error when creating Kubernetes obj: %v", err)
   392  		}
   393  	}
   394  
   395  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   396  	r.reconcile(svc, ews, existingSlices)
   397  	expectActions(t, actions(), 2, "update", "endpointslices")
   398  
   399  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   400  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   401  
   402  	// ensure cache mutation has not occurred
   403  	cmc.Check(t)
   404  }
   405  
   406  // Ensure reconciler updates everything in-place when a service requires a
   407  // change. That means we expect to only see updates, no creates.
   408  func TestReconcileEndpointSlicesUpdatingSvc(t *testing.T) {
   409  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
   410  	// start with 250 workloads
   411  	ews := []*ewv1beta1.ExternalWorkload{}
   412  	for i := 0; i < 250; i++ {
   413  		ready := !(i%3 == 0)
   414  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   415  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
   416  
   417  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   418  		ews = append(ews, ew)
   419  	}
   420  
   421  	k8sAPI, actions := newClientset(t, []string{})
   422  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   423  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
   424  
   425  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   426  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   427  	for _, ew := range ews {
   428  		ew.Spec.Ports[0].Port = int32(81)
   429  	}
   430  	svc.Spec.Ports[0].TargetPort.IntVal = 81
   431  
   432  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{&slices[0], &slices[1], &slices[2]})
   433  	expectActions(t, actions(), 3, "update", "endpointslices")
   434  	slices = fetchEndpointSlices(t, k8sAPI, svc)
   435  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   436  	for _, slice := range slices {
   437  		if *slice.Ports[0].Port != 81 {
   438  			t.Errorf("expected targetPort value to be 81, got: %d", slice.Ports[0].Port)
   439  		}
   440  	}
   441  }
   442  
   443  // When service labels update, all slices will require a change.
   444  //
   445  // This test will ensure that we update slices with the appropriate labels when
   446  // a service has changed.
   447  func TestReconcileEndpointSlicesLabelsUpdatingSvc(t *testing.T) {
   448  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
   449  	// start with 250 workloads
   450  	ews := []*ewv1beta1.ExternalWorkload{}
   451  	for i := 0; i < 250; i++ {
   452  		ready := !(i%3 == 0)
   453  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   454  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
   455  
   456  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   457  		ews = append(ews, ew)
   458  	}
   459  
   460  	k8sAPI, actions := newClientset(t, []string{})
   461  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   462  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
   463  
   464  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   465  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   466  
   467  	// update service with new labels
   468  	svc.Labels = map[string]string{"foo": "bar"}
   469  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{&slices[0], &slices[1], &slices[2]})
   470  	expectActions(t, actions(), 3, "update", "endpointslices")
   471  
   472  	slices = fetchEndpointSlices(t, k8sAPI, svc)
   473  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   474  	// check that the labels were updated
   475  	for _, slice := range slices {
   476  		w, ok := slice.Labels["foo"]
   477  		if !ok {
   478  			t.Errorf("expected label \"foo\" from parent service not found")
   479  		} else if "bar" != w {
   480  			t.Errorf("expected EndpointSlice to have parent service labels: have %s value, expected bar", w)
   481  		}
   482  	}
   483  }
   484  
   485  // In some cases, such as service labels updates, all slices for that service will require a change
   486  // However, this should not happen for reserved labels
   487  func TestReconcileEndpointSlicesReservedLabelsSvc(t *testing.T) {
   488  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
   489  	// start with 250 workloads
   490  	ews := []*ewv1beta1.ExternalWorkload{}
   491  	for i := 0; i < 250; i++ {
   492  		ready := !(i%3 == 0)
   493  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   494  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
   495  
   496  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   497  		ews = append(ews, ew)
   498  	}
   499  
   500  	k8sAPI, actions := newClientset(t, []string{})
   501  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   502  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{})
   503  	numActionExpected := 3
   504  
   505  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   506  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   507  	numActionExpected++
   508  
   509  	// update service with new labels
   510  	svc.Labels = map[string]string{discoveryv1.LabelServiceName: "bad", discoveryv1.LabelManagedBy: "actor", corev1.IsHeadlessService: "invalid"}
   511  	r.reconcile(svc, ews, []*discoveryv1.EndpointSlice{&slices[0], &slices[1], &slices[2]})
   512  	slices = fetchEndpointSlices(t, k8sAPI, svc)
   513  	numActionExpected++
   514  	if len(actions()) != numActionExpected {
   515  		t.Errorf("expected %d actions, got %d instead", numActionExpected, len(actions()))
   516  	}
   517  
   518  	expectSlicesWithLengths(t, []int{100, 100, 50}, slices)
   519  	// check that the labels were updated
   520  	for _, slice := range slices {
   521  		if v := slice.Labels[discoveryv1.LabelServiceName]; v == "bad" {
   522  			t.Errorf("unexpected label value \"%s\" from parent service found on slice", "bad")
   523  		}
   524  
   525  		if v := slice.Labels[discoveryv1.LabelManagedBy]; v == "actor" {
   526  			t.Errorf("unexpected label value \"%s\" from parent service found on slice", "actor")
   527  		}
   528  
   529  		if v := slice.Labels[corev1.IsHeadlessService]; v == "invalid" {
   530  			t.Errorf("unexpected label value \"%s\" from parent service found on slice", "invalid")
   531  		}
   532  	}
   533  }
   534  
   535  func TestEndpointSlicesAreRecycled(t *testing.T) {
   536  	svc := makeIPv4Service(map[string]string{"app": "test"}, []corev1.ServicePort{httpUnnamedPort}, "10.0.2.1")
   537  	// start with 250 workloads
   538  	ews := []*ewv1beta1.ExternalWorkload{}
   539  	for i := 0; i < 300; i++ {
   540  		ready := !(i%3 == 0)
   541  		genIp := fmt.Sprintf("192.%d.%d.%d", i%5, i%3, i%2)
   542  		ew := makeExternalWorkload("1", fmt.Sprintf("wlkd-%d", i), map[string]string{"app": "test"}, map[int32]string{8080: ""}, []string{genIp})
   543  
   544  		ew.Status.Conditions = []ewv1beta1.WorkloadCondition{newStatusCondition(ready)}
   545  		ews = append(ews, ew)
   546  	}
   547  
   548  	// Create an endpointslice
   549  	port := int32(8080)
   550  	esPorts := []discoveryv1.EndpointPort{{
   551  		Port: &port,
   552  	}}
   553  
   554  	// generate 10 existing slices with 30 endpoints each
   555  	existingSlices := []*discoveryv1.EndpointSlice{}
   556  	for i, ew := range ews {
   557  		sliceNum := i / 30
   558  		if i%30 == 0 {
   559  			existingSlices = append(existingSlices, makeEndpointSlice(svc, esPorts))
   560  		}
   561  
   562  		addrs := []string{ews[i].Spec.WorkloadIPs[0].Ip}
   563  		isReady := IsEwReady(ews[i])
   564  		existingSlices[sliceNum].Endpoints = append(existingSlices[sliceNum].Endpoints, makeEndpoint(addrs, isReady, ew))
   565  	}
   566  
   567  	cmc := newCacheMutationCheck(existingSlices)
   568  	k8sAPI, err := k8s.NewFakeAPI([]string{}...)
   569  	if err != nil {
   570  		t.Fatalf("unexpected error when creating Kubernetes clientset: %v", err)
   571  	}
   572  
   573  	for _, slice := range existingSlices {
   574  		_, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).Create(context.TODO(), slice, metav1.CreateOptions{})
   575  		if err != nil {
   576  			t.Fatalf("unexpected error when creating Kubernetes obj: %v", err)
   577  		}
   578  	}
   579  
   580  	for _, ew := range ews {
   581  		ew.Spec.Ports[0].Port = int32(81)
   582  	}
   583  
   584  	// changing a service port should require all slices to be updated, time for a repack
   585  	svc.Spec.Ports[0].TargetPort.IntVal = 81
   586  	r := newEndpointsReconciler(k8sAPI, testControllerName, defaultTestEndpointsQuota)
   587  	r.reconcile(svc, ews, existingSlices)
   588  
   589  	slices := fetchEndpointSlices(t, k8sAPI, svc)
   590  	expectSlicesWithLengths(t, []int{100, 100, 100}, slices)
   591  	// ensure cache mutation has not occurred
   592  	cmc.Check(t)
   593  }
   594  
   595  func newClientset(t *testing.T, k8sConfigs []string) (*k8s.API, func() []k8stesting.Action) {
   596  	k8sAPI, actions, err := k8s.NewFakeAPIWithActions(k8sConfigs...)
   597  
   598  	if err != nil {
   599  		t.Fatalf("unexpected error %v", err)
   600  	}
   601  
   602  	return k8sAPI, actions
   603  }
   604  
   605  func makeEndpointSlice(svc *corev1.Service, ports []discoveryv1.EndpointPort) *discoveryv1.EndpointSlice {
   606  	// We need an ownerRef to point to our service
   607  	ownerRef := metav1.NewControllerRef(svc, schema.GroupVersionKind{Version: "v1", Kind: "Service"})
   608  	slice := &discoveryv1.EndpointSlice{
   609  		ObjectMeta: metav1.ObjectMeta{
   610  			Name:            fmt.Sprintf("linkerd-external-%s-%s", svc.Name, rand.String(8)),
   611  			Namespace:       svc.Namespace,
   612  			Labels:          map[string]string{},
   613  			OwnerReferences: []metav1.OwnerReference{*ownerRef},
   614  		},
   615  		AddressType: discoveryv1.AddressTypeIPv4,
   616  		Endpoints:   []discoveryv1.Endpoint{},
   617  		Ports:       ports,
   618  	}
   619  	labels, _ := setEndpointSliceLabels(slice, svc, testControllerName)
   620  	slice.Labels = labels
   621  	return slice
   622  }
   623  
   624  // Helper function that tests a set of slices matches a list of expected lengths
   625  // for number of endpoints
   626  func expectSlicesWithLengths(t *testing.T, expectedLengths []int, es []discoveryv1.EndpointSlice) {
   627  	t.Helper()
   628  	noMatch := []string{}
   629  	for _, slice := range es {
   630  		epLen := len(slice.Endpoints)
   631  		matched := false
   632  		for i := 0; i < len(expectedLengths); i++ {
   633  			if epLen == expectedLengths[i] {
   634  				matched = true
   635  				expectedLengths = append(expectedLengths[:i], expectedLengths[i+1:]...)
   636  				break
   637  			}
   638  		}
   639  
   640  		if !matched {
   641  			noMatch = append(noMatch, fmt.Sprintf("%s/%s (%d)", slice.Namespace, slice.Name, len(slice.Endpoints)))
   642  		}
   643  	}
   644  
   645  	if len(noMatch) > 0 {
   646  		t.Fatalf("slices %s did not match the required lengths, unmatched lengths: %v", strings.Join(noMatch, ", "), expectedLengths)
   647  	}
   648  }
   649  
   650  func diffEndpointSlicePorts(t *testing.T, expected, actual []discoveryv1.EndpointSlice) {
   651  	t.Helper()
   652  	if len(expected) != len(actual) {
   653  		t.Fatalf("expected %d slices, got %d instead", len(expected), len(actual))
   654  	}
   655  
   656  	unmatched := []discoveryv1.EndpointSlice{}
   657  	for _, actualSlice := range actual {
   658  		matched := false
   659  		for i := 0; i < len(expected); i++ {
   660  			expectedSlice := expected[i]
   661  			expectedHash := epsliceutil.NewPortMapKey(expectedSlice.Ports)
   662  			actualHash := epsliceutil.NewPortMapKey(actualSlice.Ports)
   663  
   664  			if (actualSlice.AddressType == expectedSlice.AddressType) &&
   665  				(actualHash == expectedHash) {
   666  				matched = true
   667  				expected = append(expected[:i], expected[i+1:]...)
   668  				break
   669  			}
   670  		}
   671  
   672  		if !matched {
   673  			unmatched = append(unmatched, actualSlice)
   674  		}
   675  	}
   676  
   677  	if len(expected) != 0 {
   678  		t.Errorf("expected slices not found in actual list of EndpointSlices")
   679  	}
   680  
   681  	if len(unmatched) > 0 {
   682  		t.Errorf("found %d slices that do not match expected ports", len(unmatched))
   683  	}
   684  }
   685  
   686  // === Test utilities ===
   687  
   688  // Modify a slice's name in-place since the fake API server does not support
   689  // generated names
   690  func endpointSliceAsYaml(t *testing.T, es *discoveryv1.EndpointSlice) string {
   691  	if es.Name == "" {
   692  		es.Name = fmt.Sprintf("%s-%s", es.ObjectMeta.GenerateName, rand.String(5))
   693  		es.GenerateName = ""
   694  	}
   695  	es.TypeMeta = metav1.TypeMeta{
   696  		APIVersion: "discovery.k8s.io/v1",
   697  		Kind:       "EndpointSlice",
   698  	}
   699  
   700  	b, err := yaml.Marshal(es)
   701  	if err != nil {
   702  		t.Fatalf("unexpected error when serializing endpointslices to yaml")
   703  	}
   704  
   705  	return string(b)
   706  
   707  }
   708  
   709  func makeIPv4Service(selector map[string]string, ports []corev1.ServicePort, clusterIP string) *corev1.Service {
   710  	return &corev1.Service{
   711  		ObjectMeta: metav1.ObjectMeta{
   712  			Name:      "test-svc",
   713  			Namespace: "default",
   714  			UID:       "default-test-svc",
   715  		},
   716  		Spec: corev1.ServiceSpec{
   717  			Ports:      ports,
   718  			Selector:   selector,
   719  			ClusterIP:  clusterIP,
   720  			IPFamilies: []corev1.IPFamily{corev1.IPv4Protocol},
   721  		},
   722  		Status: corev1.ServiceStatus{},
   723  	}
   724  }
   725  
   726  func makeEndpoint(addrs []string, isReady bool, ew *ewv1beta1.ExternalWorkload) discoveryv1.Endpoint {
   727  	rdy := &isReady
   728  	term := !isReady
   729  	ep := discoveryv1.Endpoint{
   730  		Addresses: addrs,
   731  		Conditions: discoveryv1.EndpointConditions{
   732  			Ready:       rdy,
   733  			Serving:     rdy,
   734  			Terminating: &term,
   735  		},
   736  		TargetRef: &corev1.ObjectReference{
   737  			Kind:      ew.Kind,
   738  			Namespace: ew.Namespace,
   739  			Name:      ew.Name,
   740  			UID:       ew.UID,
   741  		},
   742  	}
   743  	return ep
   744  }
   745  
   746  func fetchEndpointSlices(t *testing.T, k8sAPI *k8s.API, svc *corev1.Service) []discoveryv1.EndpointSlice {
   747  	t.Helper()
   748  	selector := labels.Set(map[string]string{
   749  		discoveryv1.LabelServiceName: svc.Name,
   750  		discoveryv1.LabelManagedBy:   testControllerName,
   751  	}).AsSelectorPreValidated()
   752  	fetchedSlices, err := k8sAPI.Client.DiscoveryV1().EndpointSlices(svc.Namespace).List(context.Background(), metav1.ListOptions{
   753  		LabelSelector: selector.String(),
   754  	})
   755  	if err != nil {
   756  		t.Fatalf("unexpected error when fetching endpointslices: %v", err)
   757  	}
   758  
   759  	return fetchedSlices.Items
   760  }
   761  
   762  func diffEndpoints(t *testing.T, actual, expected discoveryv1.Endpoint) {
   763  	t.Helper()
   764  	if len(actual.Addresses) != len(expected.Addresses) {
   765  		t.Errorf("expected %d addresses, got %d instead", len(expected.Addresses), len(actual.Addresses))
   766  	}
   767  
   768  	if actual.Conditions.Ready != nil && expected.Conditions.Ready != nil {
   769  		if *actual.Conditions.Ready != *expected.Conditions.Ready {
   770  			t.Errorf("expected \"ready\" condition to be %t, got %t instead", *expected.Conditions.Ready, *actual.Conditions.Ready)
   771  		}
   772  	}
   773  
   774  	if actual.Conditions.Serving != nil && expected.Conditions.Serving != nil {
   775  		if *actual.Conditions.Serving != *expected.Conditions.Serving {
   776  			t.Errorf("expected \"serving\" condition to be %t, got %t instead", *expected.Conditions.Serving, *actual.Conditions.Serving)
   777  		}
   778  	}
   779  
   780  	if actual.Conditions.Terminating != nil && expected.Conditions.Terminating != nil {
   781  		if *actual.Conditions.Terminating != *expected.Conditions.Terminating {
   782  			t.Errorf("expected \"terminating\" condition to be %t, got %t instead", *expected.Conditions.Terminating, *actual.Conditions.Terminating)
   783  		}
   784  	}
   785  
   786  	if actual.Zone != nil && expected.Zone != nil {
   787  		if *actual.Zone != *expected.Zone {
   788  			t.Errorf("expected \"zone=%s\", got \"zone=%s\" instead", *expected.Zone, *actual.Zone)
   789  		}
   790  	}
   791  
   792  	actualAddrs := toSet(actual.Addresses)
   793  	expAddrs := toSet(expected.Addresses)
   794  	for actualAddr := range actualAddrs {
   795  		if _, found := expAddrs[actualAddr]; !found {
   796  			t.Errorf("found unexpected address %s in the actual endpoint", actualAddr)
   797  		}
   798  	}
   799  
   800  	for expAddr := range expAddrs {
   801  		if _, found := actualAddrs[expAddr]; !found {
   802  			t.Errorf("expected to find address %s in the actual endpoint", expAddr)
   803  		}
   804  	}
   805  
   806  	expRef := expected.TargetRef
   807  	actRef := actual.TargetRef
   808  	if expRef.UID != actRef.UID {
   809  		t.Errorf("expected targetRef with UID %s; got %s instead", expRef.UID, actRef.UID)
   810  	}
   811  
   812  	if expRef.Name != actRef.Name {
   813  		t.Errorf("expected targetRef with name %s; got %s instead", expRef.Name, actRef.Name)
   814  	}
   815  
   816  }
   817  
// === Cache mutation check implementation ===
   819  
   820  // Code originally forked from:
   821  //
   822  // https://github.com/kubernetes/endpointslice/commit/a09c1c9580d13f5020248d25c7fd11f5dde6dd9b
   823  
   824  // cacheMutationCheck helps ensure that cached objects have not been changed
   825  // in any way throughout a test run.
   826  type cacheMutationCheck struct {
   827  	objects []cacheObject
   828  }
   829  
   830  // cacheObject stores a reference to an original object as well as a deep copy
   831  // of that object to track any mutations in the original object.
   832  type cacheObject struct {
   833  	original runtime.Object
   834  	deepCopy runtime.Object
   835  }
   836  
   837  // newCacheMutationCheck initializes a cacheMutationCheck with EndpointSlices.
   838  func newCacheMutationCheck(endpointSlices []*discoveryv1.EndpointSlice) cacheMutationCheck {
   839  	cmc := cacheMutationCheck{}
   840  	for _, endpointSlice := range endpointSlices {
   841  		cmc.Add(endpointSlice)
   842  	}
   843  	return cmc
   844  }
   845  
   846  // Add appends a runtime.Object and a deep copy of that object into the
   847  // cacheMutationCheck.
   848  func (cmc *cacheMutationCheck) Add(o runtime.Object) {
   849  	cmc.objects = append(cmc.objects, cacheObject{
   850  		original: o,
   851  		deepCopy: o.DeepCopyObject(),
   852  	})
   853  }
   854  
   855  // Check verifies that no objects in the cacheMutationCheck have been mutated.
   856  func (cmc *cacheMutationCheck) Check(t *testing.T) {
   857  	for _, o := range cmc.objects {
   858  		if !reflect.DeepEqual(o.original, o.deepCopy) {
   859  			// Cached objects can't be safely mutated and instead should be deep
   860  			// copied before changed in any way.
   861  			t.Errorf("Cached object was unexpectedly mutated. Original: %+v, Mutated: %+v", o.deepCopy, o.original)
   862  		}
   863  	}
   864  }
   865  
   866  func toSet(s []string) map[string]struct{} {
   867  	set := map[string]struct{}{}
   868  	for _, k := range s {
   869  		set[k] = struct{}{}
   870  	}
   871  	return set
   872  }
   873  

View as plain text