...

Source file src/k8s.io/kubernetes/test/integration/endpointslice/endpointslicemirroring_test.go

Documentation: k8s.io/kubernetes/test/integration/endpointslice

     1  /*
     2  Copyright 2020 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package endpointslice
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"sort"
    23  	"testing"
    24  	"time"
    25  
    26  	corev1 "k8s.io/api/core/v1"
    27  	discovery "k8s.io/api/discovery/v1"
    28  	apiequality "k8s.io/apimachinery/pkg/api/equality"
    29  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	"k8s.io/client-go/informers"
    32  	clientset "k8s.io/client-go/kubernetes"
    33  	kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing"
    34  	"k8s.io/kubernetes/pkg/controller/endpoint"
    35  	"k8s.io/kubernetes/pkg/controller/endpointslice"
    36  	"k8s.io/kubernetes/pkg/controller/endpointslicemirroring"
    37  	"k8s.io/kubernetes/test/integration/framework"
    38  	"k8s.io/kubernetes/test/utils/ktesting"
    39  )
    40  
    41  func TestEndpointSliceMirroring(t *testing.T) {
    42  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
    43  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
    44  	defer server.TearDownFn()
    45  
    46  	client, err := clientset.NewForConfig(server.ClientConfig)
    47  	if err != nil {
    48  		t.Fatalf("Error creating clientset: %v", err)
    49  	}
    50  
    51  	tCtx := ktesting.Init(t)
    52  	resyncPeriod := 12 * time.Hour
    53  	informers := informers.NewSharedInformerFactory(client, resyncPeriod)
    54  
    55  	epController := endpoint.NewEndpointController(
    56  		tCtx,
    57  		informers.Core().V1().Pods(),
    58  		informers.Core().V1().Services(),
    59  		informers.Core().V1().Endpoints(),
    60  		client,
    61  		1*time.Second)
    62  
    63  	epsController := endpointslice.NewController(
    64  		tCtx,
    65  		informers.Core().V1().Pods(),
    66  		informers.Core().V1().Services(),
    67  		informers.Core().V1().Nodes(),
    68  		informers.Discovery().V1().EndpointSlices(),
    69  		int32(100),
    70  		client,
    71  		1*time.Second)
    72  
    73  	epsmController := endpointslicemirroring.NewController(
    74  		tCtx,
    75  		informers.Core().V1().Endpoints(),
    76  		informers.Discovery().V1().EndpointSlices(),
    77  		informers.Core().V1().Services(),
    78  		int32(100),
    79  		client,
    80  		1*time.Second)
    81  
    82  	// Start informer and controllers
    83  	informers.Start(tCtx.Done())
    84  	go epController.Run(tCtx, 5)
    85  	go epsController.Run(tCtx, 5)
    86  	go epsmController.Run(tCtx, 5)
    87  
    88  	testCases := []struct {
    89  		testName                     string
    90  		service                      *corev1.Service
    91  		customEndpoints              *corev1.Endpoints
    92  		expectEndpointSlice          int
    93  		expectEndpointSliceManagedBy string
    94  	}{{
    95  		testName: "Service with selector",
    96  		service: &corev1.Service{
    97  			ObjectMeta: metav1.ObjectMeta{
    98  				Name: "test-123",
    99  			},
   100  			Spec: corev1.ServiceSpec{
   101  				Ports: []corev1.ServicePort{{
   102  					Port: int32(80),
   103  				}},
   104  				Selector: map[string]string{
   105  					"foo": "bar",
   106  				},
   107  			},
   108  		},
   109  		expectEndpointSlice:          1,
   110  		expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
   111  	}, {
   112  		testName: "Service without selector",
   113  		service: &corev1.Service{
   114  			ObjectMeta: metav1.ObjectMeta{
   115  				Name: "test-123",
   116  			},
   117  			Spec: corev1.ServiceSpec{
   118  				Ports: []corev1.ServicePort{{
   119  					Port: int32(80),
   120  				}},
   121  			},
   122  		},
   123  		customEndpoints: &corev1.Endpoints{
   124  			ObjectMeta: metav1.ObjectMeta{
   125  				Name: "test-123",
   126  			},
   127  			Subsets: []corev1.EndpointSubset{{
   128  				Ports: []corev1.EndpointPort{{
   129  					Port: 80,
   130  				}},
   131  				Addresses: []corev1.EndpointAddress{{
   132  					IP: "10.0.0.1",
   133  				}},
   134  			}},
   135  		},
   136  		expectEndpointSlice:          1,
   137  		expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
   138  	}, {
   139  		testName: "Service without selector Endpoint multiple subsets and same address",
   140  		service: &corev1.Service{
   141  			ObjectMeta: metav1.ObjectMeta{
   142  				Name: "test-123",
   143  			},
   144  			Spec: corev1.ServiceSpec{
   145  				Ports: []corev1.ServicePort{{
   146  					Port: int32(80),
   147  				}},
   148  			},
   149  		},
   150  		customEndpoints: &corev1.Endpoints{
   151  			ObjectMeta: metav1.ObjectMeta{
   152  				Name: "test-123",
   153  			},
   154  			Subsets: []corev1.EndpointSubset{
   155  				{
   156  					Ports: []corev1.EndpointPort{{
   157  						Name: "port1",
   158  						Port: 80,
   159  					}},
   160  					Addresses: []corev1.EndpointAddress{{
   161  						IP: "10.0.0.1",
   162  					}},
   163  				},
   164  				{
   165  					Ports: []corev1.EndpointPort{{
   166  						Name: "port2",
   167  						Port: 90,
   168  					}},
   169  					Addresses: []corev1.EndpointAddress{{
   170  						IP: "10.0.0.1",
   171  					}},
   172  				},
   173  			},
   174  		},
   175  		expectEndpointSlice:          1,
   176  		expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
   177  	}, {
   178  		testName: "Service without selector Endpoint multiple subsets",
   179  		service: &corev1.Service{
   180  			ObjectMeta: metav1.ObjectMeta{
   181  				Name: "test-123",
   182  			},
   183  			Spec: corev1.ServiceSpec{
   184  				Ports: []corev1.ServicePort{{
   185  					Port: int32(80),
   186  				}},
   187  			},
   188  		},
   189  		customEndpoints: &corev1.Endpoints{
   190  			ObjectMeta: metav1.ObjectMeta{
   191  				Name: "test-123",
   192  			},
   193  			Subsets: []corev1.EndpointSubset{
   194  				{
   195  					Ports: []corev1.EndpointPort{{
   196  						Name: "port1",
   197  						Port: 80,
   198  					}},
   199  					Addresses: []corev1.EndpointAddress{{
   200  						IP: "10.0.0.1",
   201  					}},
   202  				},
   203  				{
   204  					Ports: []corev1.EndpointPort{{
   205  						Name: "port2",
   206  						Port: 90,
   207  					}},
   208  					Addresses: []corev1.EndpointAddress{{
   209  						IP: "10.0.0.2",
   210  					}},
   211  				},
   212  			},
   213  		},
   214  		expectEndpointSlice:          2,
   215  		expectEndpointSliceManagedBy: "endpointslicemirroring-controller.k8s.io",
   216  	}, {
   217  		testName: "Service without Endpoints",
   218  		service: &corev1.Service{
   219  			ObjectMeta: metav1.ObjectMeta{
   220  				Name: "test-123",
   221  			},
   222  			Spec: corev1.ServiceSpec{
   223  				Ports: []corev1.ServicePort{{
   224  					Port: int32(80),
   225  				}},
   226  				Selector: map[string]string{
   227  					"foo": "bar",
   228  				},
   229  			},
   230  		},
   231  		customEndpoints:              nil,
   232  		expectEndpointSlice:          1,
   233  		expectEndpointSliceManagedBy: "endpointslice-controller.k8s.io",
   234  	}, {
   235  		testName: "Endpoints without Service",
   236  		service:  nil,
   237  		customEndpoints: &corev1.Endpoints{
   238  			ObjectMeta: metav1.ObjectMeta{
   239  				Name: "test-123",
   240  			},
   241  			Subsets: []corev1.EndpointSubset{{
   242  				Ports: []corev1.EndpointPort{{
   243  					Port: 80,
   244  				}},
   245  				Addresses: []corev1.EndpointAddress{{
   246  					IP: "10.0.0.1",
   247  				}},
   248  			}},
   249  		},
   250  		expectEndpointSlice: 0,
   251  	}}
   252  
   253  	for i, tc := range testCases {
   254  		t.Run(tc.testName, func(t *testing.T) {
   255  			ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-mirroring-%d", i), t)
   256  			defer framework.DeleteNamespaceOrDie(client, ns, t)
   257  
   258  			resourceName := ""
   259  			if tc.service != nil {
   260  				resourceName = tc.service.Name
   261  				tc.service.Namespace = ns.Name
   262  				_, err = client.CoreV1().Services(ns.Name).Create(tCtx, tc.service, metav1.CreateOptions{})
   263  				if err != nil {
   264  					t.Fatalf("Error creating service: %v", err)
   265  				}
   266  			}
   267  
   268  			if tc.customEndpoints != nil {
   269  				resourceName = tc.customEndpoints.Name
   270  				tc.customEndpoints.Namespace = ns.Name
   271  				_, err = client.CoreV1().Endpoints(ns.Name).Create(tCtx, tc.customEndpoints, metav1.CreateOptions{})
   272  				if err != nil {
   273  					t.Fatalf("Error creating endpoints: %v", err)
   274  				}
   275  			}
   276  
   277  			err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
   278  				lSelector := discovery.LabelServiceName + "=" + resourceName
   279  				esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
   280  				if err != nil {
   281  					t.Logf("Error listing EndpointSlices: %v", err)
   282  					return false, err
   283  				}
   284  
   285  				if tc.expectEndpointSlice > 0 {
   286  					if len(esList.Items) < tc.expectEndpointSlice {
   287  						t.Logf("Waiting for EndpointSlice to be created")
   288  						return false, nil
   289  					}
   290  					if len(esList.Items) != tc.expectEndpointSlice {
   291  						return false, fmt.Errorf("Only expected %d EndpointSlice, got %d", tc.expectEndpointSlice, len(esList.Items))
   292  					}
   293  					endpointSlice := esList.Items[0]
   294  					if tc.expectEndpointSliceManagedBy != "" {
   295  						if endpointSlice.Labels[discovery.LabelManagedBy] != tc.expectEndpointSliceManagedBy {
   296  							return false, fmt.Errorf("Expected EndpointSlice to be managed by %s, got %s", tc.expectEndpointSliceManagedBy, endpointSlice.Labels[discovery.LabelManagedBy])
   297  						}
   298  					}
   299  				} else if len(esList.Items) > 0 {
   300  					t.Logf("Waiting for EndpointSlices to be removed, still %d", len(esList.Items))
   301  					return false, nil
   302  				}
   303  
   304  				return true, nil
   305  			})
   306  			if err != nil {
   307  				t.Fatalf("Timed out waiting for conditions: %v", err)
   308  			}
   309  		})
   310  	}
   311  
   312  }
   313  
   314  func TestEndpointSliceMirroringUpdates(t *testing.T) {
   315  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   316  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   317  	defer server.TearDownFn()
   318  
   319  	client, err := clientset.NewForConfig(server.ClientConfig)
   320  	if err != nil {
   321  		t.Fatalf("Error creating clientset: %v", err)
   322  	}
   323  
   324  	resyncPeriod := 12 * time.Hour
   325  	informers := informers.NewSharedInformerFactory(client, resyncPeriod)
   326  
   327  	tCtx := ktesting.Init(t)
   328  	epsmController := endpointslicemirroring.NewController(
   329  		tCtx,
   330  		informers.Core().V1().Endpoints(),
   331  		informers.Discovery().V1().EndpointSlices(),
   332  		informers.Core().V1().Services(),
   333  		int32(100),
   334  		client,
   335  		1*time.Second)
   336  
   337  	// Start informer and controllers
   338  	informers.Start(tCtx.Done())
   339  	go epsmController.Run(tCtx, 1)
   340  
   341  	testCases := []struct {
   342  		testName      string
   343  		tweakEndpoint func(ep *corev1.Endpoints)
   344  	}{
   345  		{
   346  			testName: "Update labels",
   347  			tweakEndpoint: func(ep *corev1.Endpoints) {
   348  				ep.Labels["foo"] = "bar"
   349  			},
   350  		},
   351  		{
   352  			testName: "Update annotations",
   353  			tweakEndpoint: func(ep *corev1.Endpoints) {
   354  				ep.Annotations["foo2"] = "bar2"
   355  			},
   356  		},
   357  		{
   358  			testName: "Update annotations but triggertime",
   359  			tweakEndpoint: func(ep *corev1.Endpoints) {
   360  				ep.Annotations["foo2"] = "bar2"
   361  				ep.Annotations[corev1.EndpointsLastChangeTriggerTime] = "date"
   362  			},
   363  		},
   364  		{
   365  			testName: "Update addresses",
   366  			tweakEndpoint: func(ep *corev1.Endpoints) {
   367  				ep.Subsets[0].Addresses = []corev1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}}
   368  			},
   369  		},
   370  	}
   371  
   372  	for i, tc := range testCases {
   373  		t.Run(tc.testName, func(t *testing.T) {
   374  			ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-mirroring-%d", i), t)
   375  			defer framework.DeleteNamespaceOrDie(client, ns, t)
   376  
   377  			service := &corev1.Service{
   378  				ObjectMeta: metav1.ObjectMeta{
   379  					Name:      "test-123",
   380  					Namespace: ns.Name,
   381  				},
   382  				Spec: corev1.ServiceSpec{
   383  					Ports: []corev1.ServicePort{{
   384  						Port: int32(80),
   385  					}},
   386  				},
   387  			}
   388  
   389  			customEndpoints := &corev1.Endpoints{
   390  				ObjectMeta: metav1.ObjectMeta{
   391  					Name:        "test-123",
   392  					Namespace:   ns.Name,
   393  					Labels:      map[string]string{},
   394  					Annotations: map[string]string{},
   395  				},
   396  				Subsets: []corev1.EndpointSubset{{
   397  					Ports: []corev1.EndpointPort{{
   398  						Port: 80,
   399  					}},
   400  					Addresses: []corev1.EndpointAddress{{
   401  						IP: "10.0.0.1",
   402  					}},
   403  				}},
   404  			}
   405  
   406  			_, err = client.CoreV1().Services(ns.Name).Create(tCtx, service, metav1.CreateOptions{})
   407  			if err != nil {
   408  				t.Fatalf("Error creating service: %v", err)
   409  			}
   410  
   411  			_, err = client.CoreV1().Endpoints(ns.Name).Create(tCtx, customEndpoints, metav1.CreateOptions{})
   412  			if err != nil {
   413  				t.Fatalf("Error creating endpoints: %v", err)
   414  			}
   415  
   416  			// update endpoint
   417  			tc.tweakEndpoint(customEndpoints)
   418  			_, err = client.CoreV1().Endpoints(ns.Name).Update(tCtx, customEndpoints, metav1.UpdateOptions{})
   419  			if err != nil {
   420  				t.Fatalf("Error updating endpoints: %v", err)
   421  			}
   422  
   423  			// verify the endpoint updates were mirrored
   424  			err = wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
   425  				lSelector := discovery.LabelServiceName + "=" + service.Name
   426  				esList, err := client.DiscoveryV1().EndpointSlices(ns.Name).List(tCtx, metav1.ListOptions{LabelSelector: lSelector})
   427  				if err != nil {
   428  					t.Logf("Error listing EndpointSlices: %v", err)
   429  					return false, err
   430  				}
   431  
   432  				if len(esList.Items) == 0 {
   433  					t.Logf("Waiting for EndpointSlice to be created")
   434  					return false, nil
   435  				}
   436  
   437  				for _, endpointSlice := range esList.Items {
   438  					if endpointSlice.Labels[discovery.LabelManagedBy] != "endpointslicemirroring-controller.k8s.io" {
   439  						return false, fmt.Errorf("Expected EndpointSlice to be managed by endpointslicemirroring-controller.k8s.io, got %s", endpointSlice.Labels[discovery.LabelManagedBy])
   440  					}
   441  
   442  					// compare addresses
   443  					epAddresses := []string{}
   444  					for _, address := range customEndpoints.Subsets[0].Addresses {
   445  						epAddresses = append(epAddresses, address.IP)
   446  					}
   447  
   448  					sliceAddresses := []string{}
   449  					for _, sliceEndpoint := range endpointSlice.Endpoints {
   450  						sliceAddresses = append(sliceAddresses, sliceEndpoint.Addresses...)
   451  					}
   452  
   453  					sort.Strings(epAddresses)
   454  					sort.Strings(sliceAddresses)
   455  
   456  					if !apiequality.Semantic.DeepEqual(epAddresses, sliceAddresses) {
   457  						t.Logf("Expected EndpointSlice to have the same IP addresses, expected %v got %v", epAddresses, sliceAddresses)
   458  						return false, nil
   459  					}
   460  
   461  					// check labels were mirrored
   462  					if !isSubset(customEndpoints.Labels, endpointSlice.Labels) {
   463  						t.Logf("Expected EndpointSlice to mirror labels, expected %v to be in received %v", customEndpoints.Labels, endpointSlice.Labels)
   464  						return false, nil
   465  					}
   466  
   467  					// check annotations but endpoints.kubernetes.io/last-change-trigger-time were mirrored
   468  					annotations := map[string]string{}
   469  					for k, v := range customEndpoints.Annotations {
   470  						if k == corev1.EndpointsLastChangeTriggerTime {
   471  							continue
   472  						}
   473  						annotations[k] = v
   474  					}
   475  					if !apiequality.Semantic.DeepEqual(annotations, endpointSlice.Annotations) {
   476  						t.Logf("Expected EndpointSlice to mirror annotations, expected %v received %v", customEndpoints.Annotations, endpointSlice.Annotations)
   477  						return false, nil
   478  					}
   479  				}
   480  				return true, nil
   481  			})
   482  			if err != nil {
   483  				t.Fatalf("Timed out waiting for conditions: %v", err)
   484  			}
   485  		})
   486  	}
   487  }
   488  
   489  func TestEndpointSliceMirroringSelectorTransition(t *testing.T) {
   490  	// Disable ServiceAccount admission plugin as we don't have serviceaccount controller running.
   491  	server := kubeapiservertesting.StartTestServerOrDie(t, nil, []string{"--disable-admission-plugins=ServiceAccount"}, framework.SharedEtcd())
   492  	defer server.TearDownFn()
   493  
   494  	client, err := clientset.NewForConfig(server.ClientConfig)
   495  	if err != nil {
   496  		t.Fatalf("Error creating clientset: %v", err)
   497  	}
   498  
   499  	resyncPeriod := 12 * time.Hour
   500  	informers := informers.NewSharedInformerFactory(client, resyncPeriod)
   501  
   502  	tCtx := ktesting.Init(t)
   503  	epsmController := endpointslicemirroring.NewController(
   504  		tCtx,
   505  		informers.Core().V1().Endpoints(),
   506  		informers.Discovery().V1().EndpointSlices(),
   507  		informers.Core().V1().Services(),
   508  		int32(100),
   509  		client,
   510  		1*time.Second)
   511  
   512  	// Start informer and controllers
   513  	informers.Start(tCtx.Done())
   514  	go epsmController.Run(tCtx, 1)
   515  
   516  	testCases := []struct {
   517  		testName               string
   518  		startingSelector       map[string]string
   519  		startingMirroredSlices int
   520  		endingSelector         map[string]string
   521  		endingMirroredSlices   int
   522  	}{
   523  		{
   524  			testName:               "nil -> {foo: bar} selector",
   525  			startingSelector:       nil,
   526  			startingMirroredSlices: 1,
   527  			endingSelector:         map[string]string{"foo": "bar"},
   528  			endingMirroredSlices:   0,
   529  		},
   530  		{
   531  			testName:               "{foo: bar} -> nil selector",
   532  			startingSelector:       map[string]string{"foo": "bar"},
   533  			startingMirroredSlices: 0,
   534  			endingSelector:         nil,
   535  			endingMirroredSlices:   1,
   536  		},
   537  		{
   538  			testName:               "{} -> {foo: bar} selector",
   539  			startingSelector:       map[string]string{},
   540  			startingMirroredSlices: 1,
   541  			endingSelector:         map[string]string{"foo": "bar"},
   542  			endingMirroredSlices:   0,
   543  		},
   544  		{
   545  			testName:               "{foo: bar} -> {} selector",
   546  			startingSelector:       map[string]string{"foo": "bar"},
   547  			startingMirroredSlices: 0,
   548  			endingSelector:         map[string]string{},
   549  			endingMirroredSlices:   1,
   550  		},
   551  	}
   552  
   553  	for i, tc := range testCases {
   554  		t.Run(tc.testName, func(t *testing.T) {
   555  			ns := framework.CreateNamespaceOrDie(client, fmt.Sprintf("test-endpointslice-mirroring-%d", i), t)
   556  			defer framework.DeleteNamespaceOrDie(client, ns, t)
   557  			meta := metav1.ObjectMeta{Name: "test-123", Namespace: ns.Name}
   558  
   559  			service := &corev1.Service{
   560  				ObjectMeta: meta,
   561  				Spec: corev1.ServiceSpec{
   562  					Ports: []corev1.ServicePort{{
   563  						Port: int32(80),
   564  					}},
   565  					Selector: tc.startingSelector,
   566  				},
   567  			}
   568  
   569  			customEndpoints := &corev1.Endpoints{
   570  				ObjectMeta: meta,
   571  				Subsets: []corev1.EndpointSubset{{
   572  					Ports: []corev1.EndpointPort{{
   573  						Port: 80,
   574  					}},
   575  					Addresses: []corev1.EndpointAddress{{
   576  						IP: "10.0.0.1",
   577  					}},
   578  				}},
   579  			}
   580  
   581  			_, err = client.CoreV1().Services(ns.Name).Create(context.TODO(), service, metav1.CreateOptions{})
   582  			if err != nil {
   583  				t.Fatalf("Error creating service: %v", err)
   584  			}
   585  
   586  			_, err = client.CoreV1().Endpoints(ns.Name).Create(context.TODO(), customEndpoints, metav1.CreateOptions{})
   587  			if err != nil {
   588  				t.Fatalf("Error creating endpoints: %v", err)
   589  			}
   590  
   591  			// verify the expected number of mirrored slices exist
   592  			err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.startingMirroredSlices)
   593  			if err != nil {
   594  				t.Fatalf("Timed out waiting for initial mirrored slices to match expectations: %v", err)
   595  			}
   596  
   597  			service.Spec.Selector = tc.endingSelector
   598  			_, err = client.CoreV1().Services(ns.Name).Update(context.TODO(), service, metav1.UpdateOptions{})
   599  			if err != nil {
   600  				t.Fatalf("Error updating service: %v", err)
   601  			}
   602  
   603  			// verify the expected number of mirrored slices exist
   604  			err = waitForMirroredSlices(t, client, ns.Name, service.Name, tc.endingMirroredSlices)
   605  			if err != nil {
   606  				t.Fatalf("Timed out waiting for final mirrored slices to match expectations: %v", err)
   607  			}
   608  		})
   609  	}
   610  }
   611  
   612  func waitForMirroredSlices(t *testing.T, client *clientset.Clientset, nsName, svcName string, num int) error {
   613  	t.Helper()
   614  	return wait.PollImmediate(1*time.Second, wait.ForeverTestTimeout, func() (bool, error) {
   615  		lSelector := discovery.LabelServiceName + "=" + svcName
   616  		lSelector += "," + discovery.LabelManagedBy + "=endpointslicemirroring-controller.k8s.io"
   617  		esList, err := client.DiscoveryV1().EndpointSlices(nsName).List(context.TODO(), metav1.ListOptions{LabelSelector: lSelector})
   618  		if err != nil {
   619  			t.Logf("Error listing EndpointSlices: %v", err)
   620  			return false, err
   621  		}
   622  
   623  		if len(esList.Items) != num {
   624  			t.Logf("Expected %d slices to be mirrored, got %d", num, len(esList.Items))
   625  			return false, nil
   626  		}
   627  
   628  		return true, nil
   629  	})
   630  }
   631  
   632  // isSubset check if all the elements in a exist in b
   633  func isSubset(a, b map[string]string) bool {
   634  	if len(a) > len(b) {
   635  		return false
   636  	}
   637  	for k, v1 := range a {
   638  		if v2, ok := b[k]; !ok || v1 != v2 {
   639  			return false
   640  		}
   641  	}
   642  	return true
   643  }
   644  

View as plain text