...

Source file src/github.com/linkerd/linkerd2/multicluster/service-mirror/cluster_watcher_mirroring_test.go

Documentation: github.com/linkerd/linkerd2/multicluster/service-mirror

     1  package servicemirror
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"testing"
     7  
     8  	"github.com/go-test/deep"
     9  	"github.com/linkerd/linkerd2/controller/k8s"
    10  	consts "github.com/linkerd/linkerd2/pkg/k8s"
    11  	"github.com/linkerd/linkerd2/pkg/multicluster"
    12  	logging "github.com/sirupsen/logrus"
    13  	corev1 "k8s.io/api/core/v1"
    14  	"k8s.io/apimachinery/pkg/api/errors"
    15  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    16  	"k8s.io/client-go/tools/record"
    17  	"k8s.io/client-go/util/workqueue"
    18  )
    19  
    20  type mirroringTestCase struct {
    21  	description            string
    22  	environment            *testEnvironment
    23  	expectedLocalServices  []*corev1.Service
    24  	expectedLocalEndpoints []*corev1.Endpoints
    25  	expectedEventsInQueue  []interface{}
    26  }
    27  
    28  func (tc *mirroringTestCase) run(t *testing.T) {
    29  	t.Run(tc.description, func(t *testing.T) {
    30  
    31  		q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
    32  		localAPI, err := tc.environment.runEnvironment(q)
    33  		if err != nil {
    34  			t.Fatal(err)
    35  		}
    36  		if tc.expectedLocalServices == nil {
    37  			// ensure the are no local services
    38  			services, err := localAPI.Client.CoreV1().Services(corev1.NamespaceAll).List(context.Background(), metav1.ListOptions{})
    39  			if err != nil {
    40  				t.Fatal(err)
    41  			}
    42  
    43  			if len(services.Items) > 0 {
    44  				t.Fatalf("Was expecting no local services but instead found %v", services.Items)
    45  
    46  			}
    47  		} else {
    48  			for _, expected := range tc.expectedLocalServices {
    49  				actual, err := localAPI.Client.CoreV1().Services(expected.Namespace).Get(context.Background(), expected.Name, metav1.GetOptions{})
    50  				if err != nil {
    51  					t.Fatalf("Could not find mirrored service with name %s", expected.Name)
    52  				}
    53  
    54  				if err := diffServices(expected, actual); err != nil {
    55  					t.Fatal(err)
    56  				}
    57  			}
    58  		}
    59  
    60  		if tc.expectedLocalEndpoints == nil {
    61  			// In a real Kubernetes cluster, deleting the service is sufficient
    62  			// to delete the endpoints.
    63  		} else {
    64  			for _, expected := range tc.expectedLocalEndpoints {
    65  				actual, err := localAPI.Client.CoreV1().Endpoints(expected.Namespace).Get(context.Background(), expected.Name, metav1.GetOptions{})
    66  				if err != nil {
    67  					t.Fatalf("Could not find endpoints with name %s", expected.Name)
    68  				}
    69  
    70  				if err := diffEndpoints(expected, actual); err != nil {
    71  					t.Fatal(err)
    72  				}
    73  			}
    74  		}
    75  
    76  		expectedNumEvents := len(tc.expectedEventsInQueue)
    77  		actualNumEvents := q.Len()
    78  
    79  		if expectedNumEvents != actualNumEvents {
    80  			t.Fatalf("Was expecting %d events but got %d", expectedNumEvents, actualNumEvents)
    81  		}
    82  
    83  		for _, ev := range tc.expectedEventsInQueue {
    84  			evInQueue, _ := q.Get()
    85  			if diff := deep.Equal(ev, evInQueue); diff != nil {
    86  				t.Errorf("%v", diff)
    87  			}
    88  		}
    89  	})
    90  }
    91  
    92  func TestRemoteServiceCreatedMirroring(t *testing.T) {
    93  	for _, tt := range []mirroringTestCase{
    94  		{
    95  			description: "create service and endpoints when gateway can be resolved",
    96  			environment: createExportedService,
    97  			expectedLocalServices: []*corev1.Service{
    98  				mirrorService(
    99  					"service-one-remote",
   100  					"ns1",
   101  					"111",
   102  					[]corev1.ServicePort{
   103  						{
   104  							Name:     "port1",
   105  							Protocol: "TCP",
   106  							Port:     555,
   107  						},
   108  						{
   109  							Name:     "port2",
   110  							Protocol: "TCP",
   111  							Port:     666,
   112  						},
   113  					}),
   114  			},
   115  			expectedLocalEndpoints: []*corev1.Endpoints{
   116  				endpoints("service-one-remote", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
   117  					{
   118  						Name:     "port1",
   119  						Port:     888,
   120  						Protocol: "TCP",
   121  					},
   122  					{
   123  						Name:     "port2",
   124  						Port:     888,
   125  						Protocol: "TCP",
   126  					},
   127  				}),
   128  			},
   129  		},
   130  		{
   131  			description: "create headless service and endpoints when gateway can be resolved",
   132  			environment: createExportedHeadlessService,
   133  			expectedLocalServices: []*corev1.Service{
   134  				headlessMirrorService(
   135  					"service-one-remote",
   136  					"ns2",
   137  					"111",
   138  					[]corev1.ServicePort{
   139  						{
   140  							Name:     "port1",
   141  							Protocol: "TCP",
   142  							Port:     555,
   143  						},
   144  						{
   145  							Name:     "port2",
   146  							Protocol: "TCP",
   147  							Port:     666,
   148  						},
   149  					}),
   150  				endpointMirrorService(
   151  					"pod-0",
   152  					"service-one-remote",
   153  					"ns2",
   154  					"112",
   155  					[]corev1.ServicePort{
   156  						{
   157  							Name:     "port1",
   158  							Protocol: "TCP",
   159  							Port:     555,
   160  						},
   161  						{
   162  							Name:     "port2",
   163  							Protocol: "TCP",
   164  							Port:     666,
   165  						},
   166  					},
   167  				),
   168  			},
   169  			expectedLocalEndpoints: []*corev1.Endpoints{
   170  				headlessMirrorEndpoints("service-one-remote", "ns2", "pod-0", "", "gateway-identity", []corev1.EndpointPort{
   171  					{
   172  						Name:     "port1",
   173  						Port:     555,
   174  						Protocol: "TCP",
   175  					},
   176  					{
   177  						Name:     "port2",
   178  						Port:     666,
   179  						Protocol: "TCP",
   180  					},
   181  				}),
   182  				endpointMirrorEndpoints(
   183  					"service-one-remote",
   184  					"ns2",
   185  					"pod-0",
   186  					"192.0.2.129",
   187  					"gateway-identity",
   188  					[]corev1.EndpointPort{
   189  						{
   190  							Name:     "port1",
   191  							Port:     889,
   192  							Protocol: "TCP",
   193  						},
   194  						{
   195  							Name:     "port2",
   196  							Port:     889,
   197  							Protocol: "TCP",
   198  						},
   199  					}),
   200  			},
   201  		},
   202  		{
   203  			description: "remote discovery mirroring",
   204  			environment: createRemoteDiscoveryService,
   205  			expectedLocalServices: []*corev1.Service{
   206  				remoteDiscoveryMirrorService(
   207  					"service-one",
   208  					"ns1",
   209  					"111",
   210  					[]corev1.ServicePort{
   211  						{
   212  							Name:     "port1",
   213  							Protocol: "TCP",
   214  							Port:     555,
   215  						},
   216  						{
   217  							Name:     "port2",
   218  							Protocol: "TCP",
   219  							Port:     666,
   220  						},
   221  					}),
   222  			},
   223  			expectedLocalEndpoints: []*corev1.Endpoints{},
   224  		},
   225  		{
   226  			description: "link with no gateway mirrors only remote discovery",
   227  			environment: noGatewayLink,
   228  			expectedLocalServices: []*corev1.Service{
   229  				remoteDiscoveryMirrorService(
   230  					"service-one",
   231  					"ns1",
   232  					"111",
   233  					[]corev1.ServicePort{
   234  						{
   235  							Name:     "port1",
   236  							Protocol: "TCP",
   237  							Port:     555,
   238  						},
   239  						{
   240  							Name:     "port2",
   241  							Protocol: "TCP",
   242  							Port:     666,
   243  						},
   244  					}),
   245  			},
   246  			expectedLocalEndpoints: []*corev1.Endpoints{},
   247  		},
   248  	} {
   249  		tc := tt // pin
   250  		tc.run(t)
   251  	}
   252  }
   253  
   254  func TestLocalNamespaceCreatedAfterServiceExport(t *testing.T) {
   255  	remoteAPI, err := k8s.NewFakeAPI(
   256  		gatewayAsYaml("existing-gateway", "existing-namespace", "222", "192.0.2.127", "mc-gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
   257  		remoteServiceAsYaml("service-one", "ns1", "111", []corev1.ServicePort{}),
   258  		endpointsAsYaml("service-one", "ns1", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{}),
   259  	)
   260  	if err != nil {
   261  		t.Fatal(err)
   262  	}
   263  	localAPI, err := k8s.NewFakeAPI()
   264  	if err != nil {
   265  		t.Fatal(err)
   266  	}
   267  	remoteAPI.Sync(nil)
   268  	localAPI.Sync(nil)
   269  
   270  	q := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
   271  	eventRecorder := record.NewFakeRecorder(100)
   272  
   273  	watcher := RemoteClusterServiceWatcher{
   274  		link: &multicluster.Link{
   275  			TargetClusterName:       clusterName,
   276  			TargetClusterDomain:     clusterDomain,
   277  			GatewayIdentity:         "gateway-identity",
   278  			GatewayAddress:          "192.0.2.127",
   279  			GatewayPort:             888,
   280  			ProbeSpec:               defaultProbeSpec,
   281  			Selector:                *defaultSelector,
   282  			RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
   283  		},
   284  		remoteAPIClient:         remoteAPI,
   285  		localAPIClient:          localAPI,
   286  		stopper:                 nil,
   287  		recorder:                eventRecorder,
   288  		log:                     logging.WithFields(logging.Fields{"cluster": clusterName}),
   289  		eventsQueue:             q,
   290  		requeueLimit:            0,
   291  		gatewayAlive:            true,
   292  		headlessServicesEnabled: true,
   293  	}
   294  
   295  	q.Add(&RemoteServiceCreated{
   296  		service: remoteService("service-one", "ns1", "111", map[string]string{
   297  			consts.DefaultExportedServiceSelector: "true",
   298  		}, []corev1.ServicePort{
   299  			{
   300  				Name:     "port1",
   301  				Protocol: "TCP",
   302  				Port:     555,
   303  			},
   304  			{
   305  				Name:     "port2",
   306  				Protocol: "TCP",
   307  				Port:     666,
   308  			},
   309  		}),
   310  	})
   311  	for q.Len() > 0 {
   312  		watcher.processNextEvent(context.Background())
   313  	}
   314  
   315  	_, err = localAPI.Svc().Lister().Services("ns1").Get("service-one-remote")
   316  	if err == nil {
   317  		t.Fatalf("service-one should not exist in local cluster before namespace is created")
   318  	} else if !errors.IsNotFound(err) {
   319  		t.Fatalf("unexpected error: %v", err)
   320  	}
   321  
   322  	skippedEvent := <-eventRecorder.Events
   323  	if skippedEvent != fmt.Sprintf("%s %s %s", corev1.EventTypeNormal, eventTypeSkipped, "Skipped mirroring service: namespace does not exist") {
   324  		t.Error("Expected skipped event, got:", skippedEvent)
   325  	}
   326  
   327  	ns, err := localAPI.Client.CoreV1().Namespaces().Create(context.Background(), &corev1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "ns1"}}, metav1.CreateOptions{})
   328  	if err != nil {
   329  		t.Fatal(err)
   330  	}
   331  
   332  	q.Add(&OnLocalNamespaceAdded{ns})
   333  	for q.Len() > 0 {
   334  		watcher.processNextEvent(context.Background())
   335  	}
   336  
   337  	_, err = localAPI.Client.CoreV1().Services("ns1").Get(context.Background(), "service-one-remote", metav1.GetOptions{})
   338  	if err != nil {
   339  		t.Fatalf("error getting service-one locally: %v", err)
   340  	}
   341  }
   342  
   343  func TestServiceCreatedGatewayAlive(t *testing.T) {
   344  	remoteAPI, err := k8s.NewFakeAPI(
   345  		gatewayAsYaml("gateway", "gateway-ns", "1", "192.0.0.1", "gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
   346  		remoteServiceAsYaml("svc", "ns", "1", []corev1.ServicePort{}),
   347  		endpointsAsYaml("svc", "ns", "192.0.0.1", "gateway-identity", []corev1.EndpointPort{}),
   348  	)
   349  	if err != nil {
   350  		t.Fatal(err)
   351  	}
   352  	localAPI, err := k8s.NewFakeAPI(
   353  		namespaceAsYaml("ns"),
   354  	)
   355  	if err != nil {
   356  		t.Fatal(err)
   357  	}
   358  	remoteAPI.Sync(nil)
   359  	localAPI.Sync(nil)
   360  
   361  	events := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
   362  	watcher := RemoteClusterServiceWatcher{
   363  		link: &multicluster.Link{
   364  			TargetClusterName:       clusterName,
   365  			TargetClusterDomain:     clusterDomain,
   366  			GatewayIdentity:         "gateway-identity",
   367  			GatewayAddress:          "192.0.0.1",
   368  			GatewayPort:             888,
   369  			ProbeSpec:               defaultProbeSpec,
   370  			Selector:                *defaultSelector,
   371  			RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
   372  		},
   373  		remoteAPIClient: remoteAPI,
   374  		localAPIClient:  localAPI,
   375  		log:             logging.WithFields(logging.Fields{"cluster": clusterName}),
   376  		eventsQueue:     events,
   377  		requeueLimit:    0,
   378  		gatewayAlive:    true,
   379  	}
   380  
   381  	events.Add(&RemoteServiceCreated{
   382  		service: remoteService("svc", "ns", "1", map[string]string{
   383  			consts.DefaultExportedServiceSelector: "true",
   384  		}, []corev1.ServicePort{
   385  			{
   386  				Name:     "port",
   387  				Protocol: "TCP",
   388  				Port:     111,
   389  			},
   390  		}),
   391  	})
   392  	for events.Len() > 0 {
   393  		watcher.processNextEvent(context.Background())
   394  	}
   395  
   396  	// Expect Service svc-remote to be created with ready endpoints because
   397  	// the Namespace ns exists and the gateway is alive.
   398  	_, err = localAPI.Client.CoreV1().Services("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   399  	if err != nil {
   400  		t.Fatalf("error getting svc-remote Service: %v", err)
   401  	}
   402  	endpoints, err := localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   403  	if err != nil {
   404  		t.Fatalf("error getting svc-remote Endpoints: %v", err)
   405  	}
   406  	if len(endpoints.Subsets) == 0 {
   407  		t.Fatal("expected svc-remote Endpoints subsets")
   408  	}
   409  	for _, ss := range endpoints.Subsets {
   410  		if len(ss.Addresses) == 0 {
   411  			t.Fatal("svc-remote Endpoints should have addresses")
   412  		}
   413  		if len(ss.NotReadyAddresses) != 0 {
   414  			t.Fatalf("svc-remote Endpoints should not have not ready addresses: %v", ss.NotReadyAddresses)
   415  		}
   416  	}
   417  
   418  	// The gateway is now down which triggers repairing Endpoints on the local
   419  	// cluster.
   420  	watcher.gatewayAlive = false
   421  	events.Add(&RepairEndpoints{})
   422  	for events.Len() > 0 {
   423  		watcher.processNextEvent(context.Background())
   424  	}
   425  
   426  	// When repairing Endpoints on the local cluster, the gateway address
   427  	// should have been moved to NotReadyAddresses meaning that Endpoints
   428  	// for the mirrored Service svc-remote should have no ready addresses.
   429  	endpoints, err = localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   430  	if err != nil {
   431  		t.Fatalf("error getting svc-remote Endpoints locally: %v", err)
   432  	}
   433  	if len(endpoints.Subsets) == 0 {
   434  		t.Fatal("expected svc-remote Endpoints subsets")
   435  	}
   436  	for _, ss := range endpoints.Subsets {
   437  		if len(ss.NotReadyAddresses) == 0 {
   438  			t.Fatal("svc-remote Endpoints should have not ready addresses")
   439  		}
   440  		if len(ss.Addresses) != 0 {
   441  			t.Fatalf("svc-remote Endpoints should not have addresses: %v", ss.Addresses)
   442  		}
   443  	}
   444  
   445  	// Issue an update for the remote Service which adds a new label
   446  	// 'new-label'. This should exercise RemoteServiceUpdated which should
   447  	// update svc-remote; the gateway is still not alive though so we expect
   448  	// the Endpoints of svc-remote to still have no ready addresses.
   449  	events.Add(&RemoteServiceUpdated{
   450  		localService:   remoteService("svc-remote", "ns", "2", nil, nil),
   451  		localEndpoints: endpoints,
   452  		remoteUpdate: remoteService("svc", "ns", "2", map[string]string{
   453  			consts.DefaultExportedServiceSelector: "true",
   454  			"new-label":                           "hi",
   455  		}, []corev1.ServicePort{
   456  			{
   457  				Name:     "port",
   458  				Protocol: "TCP",
   459  				Port:     111,
   460  			},
   461  		}),
   462  	})
   463  	for events.Len() > 0 {
   464  		watcher.processNextEvent(context.Background())
   465  	}
   466  	service, err := localAPI.Client.CoreV1().Services("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   467  	if err != nil {
   468  		t.Fatalf("error getting svc-remote Service: %v", err)
   469  	}
   470  	_, ok := service.Labels["new-label"]
   471  	if !ok {
   472  		t.Fatalf("error updating svc-remote Service: %v", err)
   473  	}
   474  	endpoints, err = localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   475  	if err != nil {
   476  		t.Fatalf("error getting svc-remote Endpoints: %v", err)
   477  	}
   478  	if len(endpoints.Subsets) == 0 {
   479  		t.Fatal("expected svc-remote Endpoints subsets")
   480  	}
   481  	for _, ss := range endpoints.Subsets {
   482  		if len(ss.NotReadyAddresses) == 0 {
   483  			t.Fatal("svc-remote Endpoints should have not ready addresses")
   484  		}
   485  		if len(ss.Addresses) != 0 {
   486  			t.Fatalf("svc-remote Endpoints should not have addresses: %v", ss.Addresses)
   487  		}
   488  	}
   489  }
   490  
   491  func TestServiceCreatedGatewayDown(t *testing.T) {
   492  	remoteAPI, err := k8s.NewFakeAPI(
   493  		gatewayAsYaml("gateway", "gateway-ns", "1", "192.0.0.1", "gateway", 888, "gateway-identity", defaultProbePort, defaultProbePath, defaultProbePeriod),
   494  		remoteServiceAsYaml("svc", "ns", "1", []corev1.ServicePort{}),
   495  		endpointsAsYaml("svc", "ns", "192.0.0.1", "gateway-identity", []corev1.EndpointPort{}),
   496  	)
   497  	if err != nil {
   498  		t.Fatal(err)
   499  	}
   500  	localAPI, err := k8s.NewFakeAPI(
   501  		namespaceAsYaml("ns"),
   502  	)
   503  	if err != nil {
   504  		t.Fatal(err)
   505  	}
   506  	remoteAPI.Sync(nil)
   507  	localAPI.Sync(nil)
   508  
   509  	events := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
   510  	watcher := RemoteClusterServiceWatcher{
   511  		link: &multicluster.Link{
   512  			TargetClusterName:       clusterName,
   513  			TargetClusterDomain:     clusterDomain,
   514  			GatewayIdentity:         "gateway-identity",
   515  			GatewayAddress:          "192.0.0.1",
   516  			GatewayPort:             888,
   517  			ProbeSpec:               defaultProbeSpec,
   518  			Selector:                *defaultSelector,
   519  			RemoteDiscoverySelector: *defaultRemoteDiscoverySelector,
   520  		},
   521  		remoteAPIClient: remoteAPI,
   522  		localAPIClient:  localAPI,
   523  		log:             logging.WithFields(logging.Fields{"cluster": clusterName}),
   524  		eventsQueue:     events,
   525  		requeueLimit:    0,
   526  		gatewayAlive:    false,
   527  	}
   528  
   529  	events.Add(&RemoteServiceCreated{
   530  		service: remoteService("svc", "ns", "1", map[string]string{
   531  			consts.DefaultExportedServiceSelector: "true",
   532  		}, []corev1.ServicePort{
   533  			{
   534  				Name:     "port",
   535  				Protocol: "TCP",
   536  				Port:     111,
   537  			},
   538  		}),
   539  	})
   540  	for events.Len() > 0 {
   541  		watcher.processNextEvent(context.Background())
   542  	}
   543  
   544  	// Expect Service svc-remote to be created with Endpoints subsets
   545  	// that are not ready because the gateway is down.
   546  	_, err = localAPI.Client.CoreV1().Services("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   547  	if err != nil {
   548  		t.Fatalf("error getting svc-remote Service: %v", err)
   549  	}
   550  	endpoints, err := localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   551  	if err != nil {
   552  		t.Fatalf("error getting svc-remote Endpoints: %v", err)
   553  	}
   554  	if len(endpoints.Subsets) == 0 {
   555  		t.Fatal("expected svc-remote Endpoints subsets")
   556  	}
   557  	for _, ss := range endpoints.Subsets {
   558  		if len(ss.NotReadyAddresses) == 0 {
   559  			t.Fatal("svc-remote Endpoints should have not ready addresses")
   560  		}
   561  		if len(ss.Addresses) != 0 {
   562  			t.Fatalf("svc-remote Endpoints should not have addresses: %v", ss.Addresses)
   563  		}
   564  	}
   565  
   566  	// The gateway is now alive which triggers repairing Endpoints on the
   567  	// local cluster.
   568  	watcher.gatewayAlive = true
   569  	events.Add(&RepairEndpoints{})
   570  	for events.Len() > 0 {
   571  		watcher.processNextEvent(context.Background())
   572  	}
   573  	endpoints, err = localAPI.Client.CoreV1().Endpoints("ns").Get(context.Background(), "svc-remote", metav1.GetOptions{})
   574  	if err != nil {
   575  		t.Fatalf("error getting svc-remote Endpoints locally: %v", err)
   576  	}
   577  	if len(endpoints.Subsets) == 0 {
   578  		t.Fatal("expected svc-remote Endpoints subsets")
   579  	}
   580  	for _, ss := range endpoints.Subsets {
   581  		if len(ss.Addresses) == 0 {
   582  			t.Fatal("svc-remote Endpoints should have addresses")
   583  		}
   584  		if len(ss.NotReadyAddresses) != 0 {
   585  			t.Fatalf("svc-remote Service endpoints should not have not ready addresses: %v", ss.NotReadyAddresses)
   586  		}
   587  	}
   588  }
   589  
   590  func TestRemoteServiceDeletedMirroring(t *testing.T) {
   591  	for _, tt := range []mirroringTestCase{
   592  		{
   593  			description: "deletes locally mirrored service",
   594  			environment: deleteMirrorService,
   595  		},
   596  	} {
   597  		tc := tt // pin
   598  		tc.run(t)
   599  	}
   600  }
   601  
   602  func TestRemoteServiceUpdatedMirroring(t *testing.T) {
   603  	for _, tt := range []mirroringTestCase{
   604  		{
   605  			description: "updates service ports on both service and endpoints",
   606  			environment: updateServiceWithChangedPorts,
   607  			expectedLocalServices: []*corev1.Service{
   608  				mirrorService("test-service-remote", "test-namespace", "currentServiceResVersion",
   609  					[]corev1.ServicePort{
   610  						{
   611  							Name:     "port1",
   612  							Protocol: "TCP",
   613  							Port:     111,
   614  						},
   615  						{
   616  							Name:     "port3",
   617  							Protocol: "TCP",
   618  							Port:     333,
   619  						},
   620  					}),
   621  			},
   622  
   623  			expectedLocalEndpoints: []*corev1.Endpoints{
   624  				endpoints("test-service-remote", "test-namespace", "192.0.2.127", "gateway-identity", []corev1.EndpointPort{
   625  					{
   626  						Name:     "port1",
   627  						Port:     888,
   628  						Protocol: "TCP",
   629  					},
   630  					{
   631  						Name:     "port3",
   632  						Port:     888,
   633  						Protocol: "TCP",
   634  					},
   635  				}),
   636  			},
   637  		},
   638  	} {
   639  		tc := tt // pin
   640  		tc.run(t)
   641  	}
   642  }
   643  
   644  // TestEmptyRemoteSelectors asserts that empty selectors do not introduce side
   645  // effects, such as mirroring unexported services. An empty label selector
   646  // functions as a catch-all (i.e. matches everything), the cluster watcher must
   647  // uphold an invariant whereby empty selectors do not export everything by
   648  // default.
   649  func TestEmptyRemoteSelectors(t *testing.T) {
   650  	for _, tt := range []mirroringTestCase{
   651  		{
   652  			description: "empty remote discovery selector does not result in exports",
   653  			environment: createEnvWithSelector(defaultSelector, &metav1.LabelSelector{}),
   654  			expectedEventsInQueue: []interface{}{&RemoteServiceCreated{
   655  				service: remoteService("service-one", "ns1", "111", map[string]string{
   656  					consts.DefaultExportedServiceSelector: "true",
   657  				}, []corev1.ServicePort{
   658  					{
   659  						Name:     "default1",
   660  						Protocol: "TCP",
   661  						Port:     555,
   662  					},
   663  					{
   664  						Name:     "default2",
   665  						Protocol: "TCP",
   666  						Port:     666,
   667  					},
   668  				}),
   669  			},
   670  			},
   671  		},
   672  		{
   673  			description: "empty default selector does not result in exports",
   674  			environment: createEnvWithSelector(&metav1.LabelSelector{}, defaultRemoteDiscoverySelector),
   675  			expectedEventsInQueue: []interface{}{&RemoteServiceCreated{
   676  				service: remoteService("service-two", "ns1", "111", map[string]string{
   677  					consts.DefaultExportedServiceSelector: "remote-discovery",
   678  				}, []corev1.ServicePort{
   679  					{
   680  						Name:     "remote1",
   681  						Protocol: "TCP",
   682  						Port:     777,
   683  					},
   684  					{
   685  						Name:     "remote2",
   686  						Protocol: "TCP",
   687  						Port:     888,
   688  					},
   689  				}),
   690  			}},
   691  		},
   692  		{
   693  			description: "no selector in link does not result in exports",
   694  			environment: createEnvWithSelector(&metav1.LabelSelector{}, &metav1.LabelSelector{}),
   695  		},
   696  	} {
   697  		tc := tt
   698  		tc.run(t)
   699  	}
   700  }
   701  
   702  func TestRemoteEndpointsUpdatedMirroring(t *testing.T) {
   703  	for _, tt := range []mirroringTestCase{
   704  		{
   705  			description: "updates headless mirror service with new remote Endpoints hosts",
   706  			environment: updateEndpointsWithChangedHosts,
   707  			expectedLocalServices: []*corev1.Service{
   708  				headlessMirrorService("service-two-remote", "eptest", "222", []corev1.ServicePort{
   709  					{
   710  						Name:     "port1",
   711  						Protocol: "TCP",
   712  						Port:     555,
   713  					},
   714  					{
   715  						Name:     "port2",
   716  						Protocol: "TCP",
   717  						Port:     666,
   718  					},
   719  				}),
   720  				endpointMirrorService("pod-0", "service-two-remote", "eptest", "333", []corev1.ServicePort{
   721  					{
   722  						Name:     "port1",
   723  						Protocol: "TCP",
   724  						Port:     555,
   725  					},
   726  					{
   727  						Name:     "port2",
   728  						Protocol: "TCP",
   729  						Port:     666,
   730  					},
   731  				}),
   732  				endpointMirrorService("pod-1", "service-two-remote", "eptest", "112", []corev1.ServicePort{
   733  					{
   734  						Name:     "port1",
   735  						Protocol: "TCP",
   736  						Port:     555,
   737  					},
   738  					{
   739  						Name:     "port2",
   740  						Protocol: "TCP",
   741  						Port:     666,
   742  					},
   743  				}),
   744  			},
   745  			expectedLocalEndpoints: []*corev1.Endpoints{
   746  				headlessMirrorEndpointsUpdated(
   747  					"service-two-remote",
   748  					"eptest",
   749  					[]string{"pod-0", "pod-1"},
   750  					[]string{"", ""},
   751  					"gateway-identity",
   752  					[]corev1.EndpointPort{
   753  						{
   754  							Name:     "port1",
   755  							Port:     555,
   756  							Protocol: "TCP",
   757  						},
   758  						{
   759  							Name:     "port2",
   760  							Port:     666,
   761  							Protocol: "TCP",
   762  						},
   763  					}),
   764  				endpointMirrorEndpoints(
   765  					"service-two-remote",
   766  					"eptest",
   767  					"pod-0",
   768  					"192.0.2.127",
   769  					"gateway-identity",
   770  					[]corev1.EndpointPort{
   771  						{
   772  							Name:     "port1",
   773  							Port:     888,
   774  							Protocol: "TCP",
   775  						},
   776  						{
   777  							Name:     "port2",
   778  							Port:     888,
   779  							Protocol: "TCP",
   780  						},
   781  					}),
   782  				endpointMirrorEndpoints(
   783  					"service-two-remote",
   784  					"eptest",
   785  					"pod-1",
   786  					"192.0.2.127",
   787  					"gateway-identity",
   788  					[]corev1.EndpointPort{
   789  						{
   790  							Name:     "port1",
   791  							Port:     888,
   792  							Protocol: "TCP",
   793  						},
   794  						{
   795  							Name:     "port2",
   796  							Port:     888,
   797  							Protocol: "TCP",
   798  						},
   799  					}),
   800  			},
   801  		},
   802  	} {
   803  		tc := tt // pin
   804  		tc.run(t)
   805  	}
   806  }
   807  
   808  func TestClusterUnregisteredMirroring(t *testing.T) {
   809  	for _, tt := range []mirroringTestCase{
   810  		{
   811  			description: "unregisters cluster and cleans up all mirrored resources",
   812  			environment: clusterUnregistered,
   813  		},
   814  	} {
   815  		tc := tt // pin
   816  		tc.run(t)
   817  	}
   818  }
   819  
   820  func TestGcOrphanedServicesMirroring(t *testing.T) {
   821  	for _, tt := range []mirroringTestCase{
   822  		{
   823  			description: "deletes mirrored resources that are no longer present on the remote cluster",
   824  			environment: gcTriggered,
   825  			expectedLocalServices: []*corev1.Service{
   826  				mirrorService("test-service-1-remote", "test-namespace", "", nil),
   827  				headlessMirrorService("test-headless-service-remote", "test-namespace", "", nil),
   828  				endpointMirrorService("pod-0", "test-headless-service-remote", "test-namespace", "", nil),
   829  			},
   830  
   831  			expectedLocalEndpoints: []*corev1.Endpoints{
   832  				endpoints("test-service-1-remote", "test-namespace", "", "", nil),
   833  				headlessMirrorEndpoints("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
   834  				endpointMirrorEndpoints("test-headless-service-remote", "test-namespace", "pod-0", "", "", nil),
   835  			},
   836  		},
   837  	} {
   838  		tc := tt // pin
   839  		tc.run(t)
   840  	}
   841  }
   842  
   843  func onAddOrUpdateTestCases(isAdd bool) []mirroringTestCase {
   844  
   845  	testType := "ADD"
   846  	if !isAdd {
   847  		testType = "UPDATE"
   848  	}
   849  
   850  	return []mirroringTestCase{
   851  		{
   852  			description: fmt.Sprintf("enqueue a RemoteServiceCreated event when this is not a gateway and we have the needed annotations (%s)", testType),
   853  			environment: onAddOrUpdateExportedSvc(isAdd),
   854  			expectedEventsInQueue: []interface{}{&RemoteServiceCreated{
   855  				service: remoteService("test-service", "test-namespace", "resVersion", map[string]string{
   856  					consts.DefaultExportedServiceSelector: "true",
   857  				}, nil),
   858  			}},
   859  		},
   860  		{
   861  			description: fmt.Sprintf("enqueue a RemoteServiceUpdated event if this is a service that we have already mirrored and its res version is different (%s)", testType),
   862  			environment: onAddOrUpdateRemoteServiceUpdated(isAdd),
   863  			expectedEventsInQueue: []interface{}{&RemoteServiceUpdated{
   864  				localService:   mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil),
   865  				localEndpoints: endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
   866  				remoteUpdate: remoteService("test-service", "test-namespace", "currentResVersion", map[string]string{
   867  					consts.DefaultExportedServiceSelector: "true",
   868  				}, nil),
   869  			}},
   870  			expectedLocalServices: []*corev1.Service{
   871  				mirrorService("test-service-remote", "test-namespace", "pastResourceVersion", nil),
   872  			},
   873  			expectedLocalEndpoints: []*corev1.Endpoints{
   874  				endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
   875  			},
   876  		},
   877  		{
   878  			description: fmt.Sprintf("not enqueue any events as this update does not really tell us anything new (res version is the same...) (%s)", testType),
   879  			environment: onAddOrUpdateSameResVersion(isAdd),
   880  			expectedLocalServices: []*corev1.Service{
   881  				mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil),
   882  			},
   883  			expectedLocalEndpoints: []*corev1.Endpoints{
   884  				endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
   885  			},
   886  		},
   887  		{
   888  			description: fmt.Sprintf("enqueue RemoteServiceDeleted event as this service is not mirrorable anymore (%s)", testType),
   889  			environment: serviceNotExportedAnymore(isAdd),
   890  			expectedEventsInQueue: []interface{}{&RemoteServiceDeleted{
   891  				Name:      "test-service",
   892  				Namespace: "test-namespace",
   893  			}},
   894  
   895  			expectedLocalServices: []*corev1.Service{
   896  				mirrorService("test-service-remote", "test-namespace", "currentResVersion", nil),
   897  			},
   898  			expectedLocalEndpoints: []*corev1.Endpoints{
   899  				endpoints("test-service-remote", "test-namespace", "0.0.0.0", "", nil),
   900  			},
   901  		},
   902  	}
   903  }
   904  
   905  func TestOnAdd(t *testing.T) {
   906  	for _, tt := range onAddOrUpdateTestCases(true) {
   907  		tc := tt // pin
   908  		tc.run(t)
   909  	}
   910  }
   911  
   912  func TestOnUpdate(t *testing.T) {
   913  	for _, tt := range onAddOrUpdateTestCases(false) {
   914  		tc := tt // pin
   915  		tc.run(t)
   916  	}
   917  }
   918  
   919  func TestOnDelete(t *testing.T) {
   920  	for _, tt := range []mirroringTestCase{
   921  		{
   922  			description: "enqueues a RemoteServiceDeleted because there is gateway metadata present on the service",
   923  			environment: onDeleteExportedService,
   924  			expectedEventsInQueue: []interface{}{
   925  				&RemoteServiceDeleted{
   926  					Name:      "test-service",
   927  					Namespace: "test-namespace",
   928  				},
   929  			},
   930  		},
   931  		{
   932  			description:           "skips because there is no gateway metadata present on the service",
   933  			environment:           onDeleteNonExportedService,
   934  			expectedEventsInQueue: []interface{}{},
   935  		},
   936  	} {
   937  		tc := tt // pin
   938  		tc.run(t)
   939  	}
   940  }
   941  

View as plain text