...

Source file src/github.com/linkerd/linkerd2/controller/api/destination/watcher/endpoints_watcher_test.go

Documentation: github.com/linkerd/linkerd2/controller/api/destination/watcher

     1  package watcher
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  	"sort"
     8  	"sync"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/linkerd/linkerd2/controller/k8s"
    13  	consts "github.com/linkerd/linkerd2/pkg/k8s"
    14  	"github.com/linkerd/linkerd2/testutil"
    15  	logging "github.com/sirupsen/logrus"
    16  	corev1 "k8s.io/api/core/v1"
    17  	dv1 "k8s.io/api/discovery/v1"
    18  	kerrors "k8s.io/apimachinery/pkg/api/errors"
    19  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    20  )
    21  
    22  type bufferingEndpointListener struct {
    23  	added              []string
    24  	removed            []string
    25  	localTrafficPolicy bool
    26  	noEndpointsCalled  bool
    27  	noEndpointsExist   bool
    28  	sync.Mutex
    29  }
    30  
    31  func newBufferingEndpointListener() *bufferingEndpointListener {
    32  	return &bufferingEndpointListener{
    33  		added:   []string{},
    34  		removed: []string{},
    35  		Mutex:   sync.Mutex{},
    36  	}
    37  }
    38  
    39  func addressString(address Address) string {
    40  	addressString := fmt.Sprintf("%s:%d", address.IP, address.Port)
    41  	if address.Identity != "" {
    42  		addressString = fmt.Sprintf("%s/%s", addressString, address.Identity)
    43  	}
    44  	if address.AuthorityOverride != "" {
    45  		addressString = fmt.Sprintf("%s/%s", addressString, address.AuthorityOverride)
    46  	}
    47  	return addressString
    48  }
    49  
    50  func (bel *bufferingEndpointListener) ExpectAdded(expected []string, t *testing.T) {
    51  	bel.Lock()
    52  	defer bel.Unlock()
    53  	t.Helper()
    54  	sort.Strings(bel.added)
    55  	testCompare(t, expected, bel.added)
    56  }
    57  
    58  func (bel *bufferingEndpointListener) ExpectRemoved(expected []string, t *testing.T) {
    59  	bel.Lock()
    60  	defer bel.Unlock()
    61  	t.Helper()
    62  	sort.Strings(bel.removed)
    63  	testCompare(t, expected, bel.removed)
    64  }
    65  
    66  func (bel *bufferingEndpointListener) endpointsAreNotCalled() bool {
    67  	bel.Lock()
    68  	defer bel.Unlock()
    69  	return bel.noEndpointsCalled
    70  }
    71  
    72  func (bel *bufferingEndpointListener) endpointsDoNotExist() bool {
    73  	bel.Lock()
    74  	defer bel.Unlock()
    75  	return bel.noEndpointsExist
    76  }
    77  
    78  func (bel *bufferingEndpointListener) Add(set AddressSet) {
    79  	bel.Lock()
    80  	defer bel.Unlock()
    81  	for _, address := range set.Addresses {
    82  		bel.added = append(bel.added, addressString(address))
    83  	}
    84  	bel.localTrafficPolicy = set.LocalTrafficPolicy
    85  }
    86  
    87  func (bel *bufferingEndpointListener) Remove(set AddressSet) {
    88  	bel.Lock()
    89  	defer bel.Unlock()
    90  	for _, address := range set.Addresses {
    91  		bel.removed = append(bel.removed, addressString(address))
    92  	}
    93  	bel.localTrafficPolicy = set.LocalTrafficPolicy
    94  }
    95  
    96  func (bel *bufferingEndpointListener) NoEndpoints(exists bool) {
    97  	bel.Lock()
    98  	defer bel.Unlock()
    99  	bel.noEndpointsCalled = true
   100  	bel.noEndpointsExist = exists
   101  }
   102  
   103  type bufferingEndpointListenerWithResVersion struct {
   104  	added   []string
   105  	removed []string
   106  	sync.Mutex
   107  }
   108  
   109  func newBufferingEndpointListenerWithResVersion() *bufferingEndpointListenerWithResVersion {
   110  	return &bufferingEndpointListenerWithResVersion{
   111  		added:   []string{},
   112  		removed: []string{},
   113  		Mutex:   sync.Mutex{},
   114  	}
   115  }
   116  
   117  func addressStringWithResVersion(address Address) string {
   118  	return fmt.Sprintf("%s:%d:%s", address.IP, address.Port, address.Pod.ResourceVersion)
   119  }
   120  
   121  func (bel *bufferingEndpointListenerWithResVersion) ExpectAdded(expected []string, t *testing.T) {
   122  	bel.Lock()
   123  	defer bel.Unlock()
   124  	sort.Strings(bel.added)
   125  	testCompare(t, expected, bel.added)
   126  }
   127  
   128  func (bel *bufferingEndpointListenerWithResVersion) ExpectRemoved(expected []string, t *testing.T) {
   129  	bel.Lock()
   130  	defer bel.Unlock()
   131  	sort.Strings(bel.removed)
   132  	testCompare(t, expected, bel.removed)
   133  }
   134  
   135  func (bel *bufferingEndpointListenerWithResVersion) Add(set AddressSet) {
   136  	bel.Lock()
   137  	defer bel.Unlock()
   138  	for _, address := range set.Addresses {
   139  		bel.added = append(bel.added, addressStringWithResVersion(address))
   140  	}
   141  }
   142  
   143  func (bel *bufferingEndpointListenerWithResVersion) Remove(set AddressSet) {
   144  	bel.Lock()
   145  	defer bel.Unlock()
   146  	for _, address := range set.Addresses {
   147  		bel.removed = append(bel.removed, addressStringWithResVersion(address))
   148  	}
   149  }
   150  
   151  func (bel *bufferingEndpointListenerWithResVersion) NoEndpoints(exists bool) {}
   152  
   153  func TestEndpointsWatcher(t *testing.T) {
   154  	for _, tt := range []struct {
   155  		serviceType                      string
   156  		k8sConfigs                       []string
   157  		id                               ServiceID
   158  		hostname                         string
   159  		port                             Port
   160  		expectedAddresses                []string
   161  		expectedNoEndpoints              bool
   162  		expectedNoEndpointsServiceExists bool
   163  		expectedError                    bool
   164  	}{
   165  		{
   166  			serviceType: "local services",
   167  			k8sConfigs: []string{`
   168  apiVersion: v1
   169  kind: Service
   170  metadata:
   171    name: name1
   172    namespace: ns
   173  spec:
   174    type: LoadBalancer
   175    ports:
   176    - port: 8989`,
   177  				`
   178  apiVersion: v1
   179  kind: Endpoints
   180  metadata:
   181    name: name1
   182    namespace: ns
   183  subsets:
   184  - addresses:
   185    - ip: 172.17.0.12
   186      targetRef:
   187        kind: Pod
   188        name: name1-1
   189        namespace: ns
   190    - ip: 172.17.0.19
   191      targetRef:
   192        kind: Pod
   193        name: name1-2
   194        namespace: ns
   195    - ip: 172.17.0.20
   196      targetRef:
   197        kind: Pod
   198        name: name1-3
   199        namespace: ns
   200    - ip: 172.17.0.21
   201    ports:
   202    - port: 8989`,
   203  				`
   204  apiVersion: v1
   205  kind: Pod
   206  metadata:
   207    name: name1-1
   208    namespace: ns
   209    ownerReferences:
   210    - kind: ReplicaSet
   211      name: rs-1
   212  status:
   213    phase: Running
   214    podIP: 172.17.0.12`,
   215  				`
   216  apiVersion: v1
   217  kind: Pod
   218  metadata:
   219    name: name1-2
   220    namespace: ns
   221    ownerReferences:
   222    - kind: ReplicaSet
   223      name: rs-1
   224  status:
   225    phase: Running
   226    podIP: 172.17.0.19`,
   227  				`
   228  apiVersion: v1
   229  kind: Pod
   230  metadata:
   231    name: name1-3
   232    namespace: ns
   233    ownerReferences:
   234    - kind: ReplicaSet
   235      name: rs-1
   236  status:
   237    phase: Running
   238    podIP: 172.17.0.20`,
   239  			},
   240  			id:   ServiceID{Name: "name1", Namespace: "ns"},
   241  			port: 8989,
   242  			expectedAddresses: []string{
   243  				"172.17.0.12:8989",
   244  				"172.17.0.19:8989",
   245  				"172.17.0.20:8989",
   246  				"172.17.0.21:8989",
   247  			},
   248  			expectedNoEndpoints:              false,
   249  			expectedNoEndpointsServiceExists: false,
   250  			expectedError:                    false,
   251  		},
   252  		{
   253  			// Test for the issue described in linkerd/linkerd2#1405.
   254  			serviceType: "local NodePort service with unnamed port",
   255  			k8sConfigs: []string{`
   256  apiVersion: v1
   257  kind: Service
   258  metadata:
   259    name: name1
   260    namespace: ns
   261  spec:
   262    type: NodePort
   263    ports:
   264    - port: 8989
   265      targetPort: port1`,
   266  				`
   267  apiVersion: v1
   268  kind: Endpoints
   269  metadata:
   270    name: name1
   271    namespace: ns
   272  subsets:
   273  - addresses:
   274    - ip: 10.233.66.239
   275      targetRef:
   276        kind: Pod
   277        name: name1-f748fb6b4-hpwpw
   278        namespace: ns
   279    - ip: 10.233.88.244
   280      targetRef:
   281        kind: Pod
   282        name: name1-f748fb6b4-6vcmw
   283        namespace: ns
   284    ports:
   285    - port: 8990
   286      protocol: TCP`,
   287  				`
   288  apiVersion: v1
   289  kind: Pod
   290  metadata:
   291    name: name1-f748fb6b4-hpwpw
   292    namespace: ns
   293    ownerReferences:
   294    - kind: ReplicaSet
   295      name: rs-1
   296  status:
   297    podIp: 10.233.66.239
   298    phase: Running`,
   299  				`
   300  apiVersion: v1
   301  kind: Pod
   302  metadata:
   303    name: name1-f748fb6b4-6vcmw
   304    namespace: ns
   305    ownerReferences:
   306    - kind: ReplicaSet
   307      name: rs-1
   308  status:
   309    podIp: 10.233.88.244
   310    phase: Running`,
   311  			},
   312  			id:   ServiceID{Name: "name1", Namespace: "ns"},
   313  			port: 8989,
   314  			expectedAddresses: []string{
   315  				"10.233.66.239:8990",
   316  				"10.233.88.244:8990",
   317  			},
   318  			expectedNoEndpoints:              false,
   319  			expectedNoEndpointsServiceExists: false,
   320  			expectedError:                    false,
   321  		},
   322  		{
   323  			// Test for the issue described in linkerd/linkerd2#1853.
   324  			serviceType: "local service with named target port and differently-named service port",
   325  			k8sConfigs: []string{`
   326  apiVersion: v1
   327  kind: Service
   328  metadata:
   329    name: world
   330    namespace: ns
   331  spec:
   332    type: ClusterIP
   333    ports:
   334      - name: app
   335        port: 7778
   336        targetPort: http`,
   337  				`
   338  apiVersion: v1
   339  kind: Endpoints
   340  metadata:
   341    name: world
   342    namespace: ns
   343  subsets:
   344  - addresses:
   345    - ip: 10.1.30.135
   346      targetRef:
   347        kind: Pod
   348        name: world-575bf846b4-tp4hw
   349        namespace: ns
   350    ports:
   351    - name: app
   352      port: 7779
   353      protocol: TCP`,
   354  				`
   355  apiVersion: v1
   356  kind: Pod
   357  metadata:
   358    name: world-575bf846b4-tp4hw
   359    namespace: ns
   360    ownerReferences:
   361    - kind: ReplicaSet
   362      name: rs-1
   363  status:
   364    podIp: 10.1.30.135
   365    phase: Running`,
   366  			},
   367  			id:   ServiceID{Name: "world", Namespace: "ns"},
   368  			port: 7778,
   369  			expectedAddresses: []string{
   370  				"10.1.30.135:7779",
   371  			},
   372  			expectedNoEndpoints:              false,
   373  			expectedNoEndpointsServiceExists: false,
   374  			expectedError:                    false,
   375  		},
   376  		{
   377  			serviceType: "local services with missing addresses",
   378  			k8sConfigs: []string{`
   379  apiVersion: v1
   380  kind: Service
   381  metadata:
   382    name: name1
   383    namespace: ns
   384  spec:
   385    type: LoadBalancer
   386    ports:
   387    - port: 8989`,
   388  				`
   389  apiVersion: v1
   390  kind: Endpoints
   391  metadata:
   392    name: name1
   393    namespace: ns
   394  subsets:
   395  - addresses:
   396    - ip: 172.17.0.23
   397      targetRef:
   398        kind: Pod
   399        name: name1-1
   400        namespace: ns
   401    - ip: 172.17.0.24
   402      targetRef:
   403        kind: Pod
   404        name: name1-2
   405        namespace: ns
   406    - ip: 172.17.0.25
   407      targetRef:
   408        kind: Pod
   409        name: name1-3
   410        namespace: ns
   411    ports:
   412    - port: 8989`,
   413  				`
   414  apiVersion: v1
   415  kind: Pod
   416  metadata:
   417    name: name1-3
   418    namespace: ns
   419    ownerReferences:
   420    - kind: ReplicaSet
   421      name: rs-1
   422  status:
   423    phase: Running
   424    podIP: 172.17.0.25`,
   425  			},
   426  			id:   ServiceID{Name: "name1", Namespace: "ns"},
   427  			port: 8989,
   428  			expectedAddresses: []string{
   429  				"172.17.0.25:8989",
   430  			},
   431  			expectedNoEndpoints:              false,
   432  			expectedNoEndpointsServiceExists: false,
   433  			expectedError:                    false,
   434  		},
   435  		{
   436  			serviceType: "local services with no endpoints",
   437  			k8sConfigs: []string{`
   438  apiVersion: v1
   439  kind: Service
   440  metadata:
   441    name: name2
   442    namespace: ns
   443  spec:
   444    type: LoadBalancer
   445    ports:
   446    - port: 7979`,
   447  			},
   448  			id:                               ServiceID{Name: "name2", Namespace: "ns"},
   449  			port:                             7979,
   450  			expectedAddresses:                []string{},
   451  			expectedNoEndpoints:              true,
   452  			expectedNoEndpointsServiceExists: true,
   453  			expectedError:                    false,
   454  		},
   455  		{
   456  			serviceType: "external name services",
   457  			k8sConfigs: []string{`
   458  apiVersion: v1
   459  kind: Service
   460  metadata:
   461    name: name3
   462    namespace: ns
   463  spec:
   464    type: ExternalName
   465    externalName: foo`,
   466  			},
   467  			id:                               ServiceID{Name: "name3", Namespace: "ns"},
   468  			port:                             6969,
   469  			expectedAddresses:                []string{},
   470  			expectedNoEndpoints:              false,
   471  			expectedNoEndpointsServiceExists: false,
   472  			expectedError:                    true,
   473  		},
   474  		{
   475  			serviceType:                      "services that do not yet exist",
   476  			k8sConfigs:                       []string{},
   477  			id:                               ServiceID{Name: "name4", Namespace: "ns"},
   478  			port:                             5959,
   479  			expectedAddresses:                []string{},
   480  			expectedNoEndpoints:              true,
   481  			expectedNoEndpointsServiceExists: false,
   482  			expectedError:                    false,
   483  		},
   484  		{
   485  			serviceType: "stateful sets",
   486  			k8sConfigs: []string{`
   487  apiVersion: v1
   488  kind: Service
   489  metadata:
   490    name: name1
   491    namespace: ns
   492  spec:
   493    type: LoadBalancer
   494    ports:
   495    - port: 8989`,
   496  				`
   497  apiVersion: v1
   498  kind: Endpoints
   499  metadata:
   500    name: name1
   501    namespace: ns
   502  subsets:
   503  - addresses:
   504    - ip: 172.17.0.12
   505      hostname: name1-1
   506      targetRef:
   507        kind: Pod
   508        name: name1-1
   509        namespace: ns
   510    - ip: 172.17.0.19
   511      hostname: name1-2
   512      targetRef:
   513        kind: Pod
   514        name: name1-2
   515        namespace: ns
   516    - ip: 172.17.0.20
   517      hostname: name1-3
   518      targetRef:
   519        kind: Pod
   520        name: name1-3
   521        namespace: ns
   522    ports:
   523    - port: 8989`,
   524  				`
   525  apiVersion: v1
   526  kind: Pod
   527  metadata:
   528    name: name1-1
   529    namespace: ns
   530    ownerReferences:
   531    - kind: ReplicaSet
   532      name: rs-1
   533  status:
   534    phase: Running
   535    podIP: 172.17.0.12`,
   536  				`
   537  apiVersion: v1
   538  kind: Pod
   539  metadata:
   540    name: name1-2
   541    namespace: ns
   542    ownerReferences:
   543    - kind: ReplicaSet
   544      name: rs-1
   545  status:
   546    phase: Running
   547    podIP: 172.17.0.19`,
   548  				`
   549  apiVersion: v1
   550  kind: Pod
   551  metadata:
   552    name: name1-3
   553    namespace: ns
   554    ownerReferences:
   555    - kind: ReplicaSet
   556      name: rs-1
   557  status:
   558    phase: Running
   559    podIP: 172.17.0.20`,
   560  			},
   561  			id:                               ServiceID{Name: "name1", Namespace: "ns"},
   562  			hostname:                         "name1-3",
   563  			port:                             5959,
   564  			expectedAddresses:                []string{"172.17.0.20:5959"},
   565  			expectedNoEndpoints:              false,
   566  			expectedNoEndpointsServiceExists: false,
   567  		},
   568  		{
   569  			serviceType: "local service with new named port mid rollout and two subsets but only first subset is relevant",
   570  			k8sConfigs: []string{`
   571  apiVersion: v1
   572  kind: Service
   573  metadata:
   574    name: name1
   575    namespace: ns
   576  spec:
   577    type: ClusterIP
   578    ports:
   579      - name: port1
   580        port: 8989
   581        targetPort: port1
   582      - name: port2
   583        port: 9999
   584        targetPort: port2`,
   585  				`
   586  apiVersion: v1
   587  kind: Endpoints
   588  metadata:
   589    labels:
   590      app: name1
   591    name: name1
   592    namespace: ns
   593  subsets:
   594  - addresses:
   595    - ip: 172.17.0.1
   596      nodeName: name1-1
   597      targetRef:
   598        kind: Pod
   599        name: name1-1
   600        namespace: ns
   601    - ip: 172.17.0.2
   602      nodeName: name1-2
   603      targetRef:
   604        kind: Pod
   605        name: name1-2
   606        namespace: ns
   607    ports:
   608    - name: port1
   609      port: 8989
   610      protocol: TCP
   611  - addresses:
   612    - ip: 172.17.0.1
   613      nodeName: name1-1
   614      targetRef:
   615        kind: Pod
   616        name: name1-1
   617        namespace: ns
   618    notReadyAddresses:
   619    - ip: 172.17.0.2
   620      nodeName: name1-2
   621      targetRef:
   622        kind: Pod
   623        name: name1-2
   624        namespace: ns
   625    ports:
   626    - name: port2
   627      port: 9999
   628      protocol: TCP
   629      `,
   630  				`
   631  apiVersion: v1
   632  kind: Pod
   633  metadata:
   634    name: name1-1
   635    namespace: ns
   636    ownerReferences:
   637    - kind: ReplicaSet
   638      name: rs-1
   639  status:
   640    phase: Running
   641    podIP: 172.17.0.1`,
   642  				`
   643  apiVersion: v1
   644  kind: Pod
   645  metadata:
   646    name: name1-2
   647    namespace: ns
   648    ownerReferences:
   649    - kind: ReplicaSet
   650      name: rs-2
   651  status:
   652    phase: Running
   653    podIP: 172.17.0.2`,
   654  			},
   655  			id:   ServiceID{Name: "name1", Namespace: "ns"},
   656  			port: 8989,
   657  			expectedAddresses: []string{
   658  				"172.17.0.1:8989",
   659  				"172.17.0.2:8989",
   660  			},
   661  			expectedNoEndpoints:              false,
   662  			expectedNoEndpointsServiceExists: false,
   663  			expectedError:                    false,
   664  		},
   665  	} {
   666  		tt := tt // pin
   667  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
   668  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
   669  			if err != nil {
   670  				t.Fatalf("NewFakeAPI returned an error: %s", err)
   671  			}
   672  
   673  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
   674  			if err != nil {
   675  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
   676  			}
   677  
   678  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
   679  			if err != nil {
   680  				t.Fatalf("can't create Endpoints watcher: %s", err)
   681  			}
   682  
   683  			k8sAPI.Sync(nil)
   684  			metadataAPI.Sync(nil)
   685  
   686  			listener := newBufferingEndpointListener()
   687  
   688  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
   689  			if tt.expectedError && err == nil {
   690  				t.Fatal("Expected error but was ok")
   691  			}
   692  			if !tt.expectedError && err != nil {
   693  				t.Fatalf("Expected no error, got [%s]", err)
   694  			}
   695  
   696  			listener.ExpectAdded(tt.expectedAddresses, t)
   697  
   698  			if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
   699  				t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
   700  					tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
   701  			}
   702  
   703  			if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
   704  				t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
   705  					tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
   706  			}
   707  		})
   708  	}
   709  }
   710  
   711  func TestEndpointsWatcherWithEndpointSlices(t *testing.T) {
   712  	for _, tt := range []struct {
   713  		serviceType                      string
   714  		k8sConfigs                       []string
   715  		id                               ServiceID
   716  		hostname                         string
   717  		port                             Port
   718  		expectedAddresses                []string
   719  		expectedNoEndpoints              bool
   720  		expectedNoEndpointsServiceExists bool
   721  		expectedError                    bool
   722  		expectedLocalTrafficPolicy       bool
   723  	}{
   724  		{
   725  			serviceType: "local services with EndpointSlice",
   726  			k8sConfigs: []string{`
   727  kind: APIResourceList
   728  apiVersion: v1
   729  groupVersion: discovery.k8s.io/v1
   730  resources:
   731    - name: endpointslices
   732      singularName: endpointslice
   733      namespaced: true
   734      kind: EndpointSlice
   735      verbs:
   736        - delete
   737        - deletecollection
   738        - get
   739        - list
   740        - patch
   741        - create
   742        - update
   743        - watch
   744  `, `
   745  apiVersion: v1
   746  kind: Service
   747  metadata:
   748    name: name-1
   749    namespace: ns
   750  spec:
   751    type: LoadBalancer
   752    ports:
   753    - port: 8989
   754    internalTrafficPolicy: Local`,
   755  				`
   756  addressType: IPv4
   757  apiVersion: discovery.k8s.io/v1
   758  endpoints:
   759  - addresses:
   760    - 172.17.0.12
   761    conditions:
   762      ready: true
   763    targetRef:
   764      kind: Pod
   765      name: name-1-1
   766      namespace: ns
   767    topology:
   768      kubernetes.io/hostname: node-1
   769  - addresses:
   770    - 172.17.0.19
   771    conditions:
   772      ready: true
   773    targetRef:
   774      kind: Pod
   775      name: name-1-2
   776      namespace: ns
   777    topology:
   778      kubernetes.io/hostname: node-1
   779  - addresses:
   780    - 172.17.0.20
   781    conditions:
   782      ready: true
   783    targetRef:
   784      kind: Pod
   785      name: name-1-3
   786      namespace: ns
   787    topology:
   788      kubernetes.io/hostname: node-2
   789  - addresses:
   790    - 172.17.0.21
   791    conditions:
   792      ready: true
   793    topology:
   794      kubernetes.io/hostname: node-2
   795  kind: EndpointSlice
   796  metadata:
   797    labels:
   798      kubernetes.io/service-name: name-1
   799    name: name-1-bhnqh
   800    namespace: ns
   801  ports:
   802  - name: ""
   803    port: 8989`,
   804  				`
   805  apiVersion: v1
   806  kind: Pod
   807  metadata:
   808    name: name-1-1
   809    namespace: ns
   810    ownerReferences:
   811    - kind: ReplicaSet
   812      name: rs-1
   813  status:
   814    phase: Running
   815    podIP: 172.17.0.12`,
   816  				`
   817  apiVersion: v1
   818  kind: Pod
   819  metadata:
   820    name: name-1-2
   821    namespace: ns
   822    ownerReferences:
   823    - kind: ReplicaSet
   824      name: rs-1
   825  status:
   826    phase: Running
   827    podIP: 172.17.0.19`,
   828  				`
   829  apiVersion: v1
   830  kind: Pod
   831  metadata:
   832    name: name-1-3
   833    namespace: ns
   834    ownerReferences:
   835    - kind: ReplicaSet
   836      name: rs-1
   837  status:
   838    phase: Running
   839    podIP: 172.17.0.20`,
   840  			},
   841  			id:   ServiceID{Name: "name-1", Namespace: "ns"},
   842  			port: 8989,
   843  			expectedAddresses: []string{
   844  				"172.17.0.12:8989",
   845  				"172.17.0.19:8989",
   846  				"172.17.0.20:8989",
   847  				"172.17.0.21:8989",
   848  			},
   849  			expectedNoEndpoints:              false,
   850  			expectedNoEndpointsServiceExists: false,
   851  			expectedError:                    false,
   852  			expectedLocalTrafficPolicy:       true,
   853  		},
   854  		{
   855  			serviceType: "local services with missing addresses and EndpointSlice",
   856  			k8sConfigs: []string{`
   857  kind: APIResourceList
   858  apiVersion: v1
   859  groupVersion: discovery.k8s.io/v1
   860  resources:
   861    - name: endpointslices
   862      singularName: endpointslice
   863      namespaced: true
   864      kind: EndpointSlice
   865      verbs:
   866        - delete
   867        - deletecollection
   868        - get
   869        - list
   870        - patch
   871        - create
   872        - update
   873        - watch
   874  `, `
   875  apiVersion: v1
   876  kind: Service
   877  metadata:
   878    name: name-1
   879    namespace: ns
   880  spec:
   881    type: LoadBalancer
   882    ports:
   883    - port: 8989`, `
   884  addressType: IPv4
   885  apiVersion: discovery.k8s.io/v1
   886  endpoints:
   887  - addresses:
   888    - 172.17.0.23
   889    conditions:
   890      ready: true
   891    targetRef:
   892      kind: Pod
   893      name: name-1-1
   894      namespace: ns
   895    topology:
   896      kubernetes.io/hostname: node-1
   897  - addresses:
   898    - 172.17.0.24
   899    conditions:
   900      ready: true
   901    targetRef:
   902      kind: Pod
   903      name: name-1-2
   904      namespace: ns
   905    topology:
   906      kubernetes.io/hostname: node-1
   907  - addresses:
   908    - 172.17.0.25
   909    conditions:
   910      ready: true
   911    targetRef:
   912      kind: Pod
   913      name: name-1-3
   914      namespace: ns
   915    topology:
   916      kubernetes.io/hostname: node-2
   917  kind: EndpointSlice
   918  metadata:
   919    labels:
   920      kubernetes.io/service-name: name-1
   921    name: name1-f5fad
   922    namespace: ns
   923  ports:
   924  - name: ""
   925    port: 8989`, `
   926  apiVersion: v1
   927  kind: Pod
   928  metadata:
   929    name: name-1-3
   930    namespace: ns
   931    ownerReferences:
   932    - kind: ReplicaSet
   933      name: rs-1
   934  status:
   935    podIP: 172.17.0.25
   936    phase: Running`,
   937  			},
   938  			id:                               ServiceID{Name: "name-1", Namespace: "ns"},
   939  			port:                             8989,
   940  			expectedAddresses:                []string{"172.17.0.25:8989"},
   941  			expectedNoEndpoints:              false,
   942  			expectedNoEndpointsServiceExists: false,
   943  			expectedError:                    false,
   944  		},
   945  		{
   946  			serviceType: "local services with no EndpointSlices",
   947  			k8sConfigs: []string{`
   948  kind: APIResourceList
   949  apiVersion: v1
   950  groupVersion: discovery.k8s.io/v1
   951  resources:
   952    - name: endpointslices
   953      singularName: endpointslice
   954      namespaced: true
   955      kind: EndpointSlice
   956      verbs:
   957        - delete
   958        - deletecollection
   959        - get
   960        - list
   961        - patch
   962        - create
   963        - update
   964        - watch
   965  `, `
   966  apiVersion: v1
   967  kind: Service
   968  metadata:
   969    name: name-2
   970    namespace: ns
   971  spec:
   972    type: LoadBalancer
   973    ports:
   974    - port: 7979`,
   975  			},
   976  			id:                               ServiceID{Name: "name-2", Namespace: "ns"},
   977  			port:                             7979,
   978  			expectedAddresses:                []string{},
   979  			expectedNoEndpoints:              true,
   980  			expectedNoEndpointsServiceExists: true,
   981  			expectedError:                    false,
   982  		},
   983  		{
   984  			serviceType: "external name services with EndpointSlices",
   985  			k8sConfigs: []string{`
   986  kind: APIResourceList
   987  apiVersion: v1
   988  groupVersion: discovery.k8s.io/v1
   989  resources:
   990    - name: endpointslices
   991      singularName: endpointslice
   992      namespaced: true
   993      kind: EndpointSlice
   994      verbs:
   995        - delete
   996        - deletecollection
   997        - get
   998        - list
   999        - patch
  1000        - create
  1001        - update
  1002        - watch
  1003  `, `
  1004  apiVersion: v1
  1005  kind: Service
  1006  metadata:
  1007    name: name-3-external-svc
  1008    namespace: ns
  1009  spec:
  1010    type: ExternalName
  1011    externalName: foo`,
  1012  			},
  1013  			id:                               ServiceID{Name: "name-3-external-svc", Namespace: "ns"},
  1014  			port:                             7777,
  1015  			expectedAddresses:                []string{},
  1016  			expectedNoEndpoints:              false,
  1017  			expectedNoEndpointsServiceExists: false,
  1018  			expectedError:                    true,
  1019  		},
  1020  		{
  1021  			serviceType:                      "services that do not exist",
  1022  			k8sConfigs:                       []string{},
  1023  			id:                               ServiceID{Name: "name-4-inexistent-svc", Namespace: "ns"},
  1024  			port:                             5555,
  1025  			expectedAddresses:                []string{},
  1026  			expectedNoEndpoints:              true,
  1027  			expectedNoEndpointsServiceExists: false,
  1028  			expectedError:                    false,
  1029  		},
  1030  		{
  1031  			serviceType: "stateful sets with EndpointSlices",
  1032  			k8sConfigs: []string{`
  1033  kind: APIResourceList
  1034  apiVersion: v1
  1035  groupVersion: discovery.k8s.io/v1
  1036  resources:
  1037    - name: endpointslices
  1038      singularName: endpointslice
  1039      namespaced: true
  1040      kind: EndpointSlice
  1041      verbs:
  1042        - delete
  1043        - deletecollection
  1044        - get
  1045        - list
  1046        - patch
  1047        - create
  1048        - update
  1049        - watch
  1050  `, `
  1051  apiVersion: v1
  1052  kind: Service
  1053  metadata:
  1054    name: name-1
  1055    namespace: ns
  1056  spec:
  1057    type: LoadBalancer
  1058    ports:
  1059    - port: 8989`, `
  1060  addressType: IPv4
  1061  apiVersion: discovery.k8s.io/v1
  1062  endpoints:
  1063  - addresses:
  1064    - 172.17.0.12
  1065    conditions:
  1066      ready: true
  1067    hostname: name-1-1
  1068    targetRef:
  1069      kind: Pod
  1070      name: name-1-1
  1071      namespace: ns
  1072    topology:
  1073      kubernetes.io/hostname: node-1
  1074  - addresses:
  1075    - 172.17.0.19
  1076    hostname: name-1-2
  1077    conditions:
  1078      ready: true
  1079    targetRef:
  1080      kind: Pod
  1081      name: name-1-2
  1082      namespace: ns
  1083    topology:
  1084      kubernetes.io/hostname: node-1
  1085  - addresses:
  1086    - 172.17.0.20
  1087    hostname: name-1-3
  1088    conditions:
  1089      ready: true
  1090    targetRef:
  1091      kind: Pod
  1092      name: name-1-3
  1093      namespace: ns
  1094    topology:
  1095      kubernetes.io/hostname: node-2
  1096  kind: EndpointSlice
  1097  metadata:
  1098    labels:
  1099      kubernetes.io/service-name: name-1
  1100    name: name-1-f5fad
  1101    namespace: ns
  1102  ports:
  1103  - name: ""
  1104    port: 8989`, `
  1105  apiVersion: v1
  1106  kind: Pod
  1107  metadata:
  1108    name: name-1-1
  1109    namespace: ns
  1110    ownerReferences:
  1111    - kind: ReplicaSet
  1112      name: rs-1
  1113  status:
  1114    phase: Running
  1115    podIP: 172.17.0.12`,
  1116  				`
  1117  apiVersion: v1
  1118  kind: Pod
  1119  metadata:
  1120    name: name-1-2
  1121    namespace: ns
  1122    ownerReferences:
  1123    - kind: ReplicaSet
  1124      name: rs-1
  1125  status:
  1126    phase: Running
  1127    podIP: 172.17.0.19`,
  1128  				`
  1129  apiVersion: v1
  1130  kind: Pod
  1131  metadata:
  1132    name: name-1-3
  1133    namespace: ns
  1134    ownerReferences:
  1135    - kind: ReplicaSet
  1136      name: rs-1
  1137  status:
  1138    phase: Running
  1139    podIP: 172.17.0.20`,
  1140  			},
  1141  			id:                               ServiceID{Name: "name-1", Namespace: "ns"},
  1142  			hostname:                         "name-1-3",
  1143  			port:                             6000,
  1144  			expectedAddresses:                []string{"172.17.0.20:6000"},
  1145  			expectedNoEndpoints:              false,
  1146  			expectedNoEndpointsServiceExists: false,
  1147  			expectedError:                    false,
  1148  		},
  1149  		{
  1150  			serviceType: "service with EndpointSlice without labels",
  1151  			k8sConfigs: []string{`
  1152  kind: APIResourceList
  1153  apiVersion: v1
  1154  groupVersion: discovery.k8s.io/v1
  1155  resources:
  1156    - name: endpointslices
  1157      singularName: endpointslice
  1158      namespaced: true
  1159      kind: EndpointSlice
  1160      verbs:
  1161        - delete
  1162        - deletecollection
  1163        - get
  1164        - list
  1165        - patch
  1166        - create
  1167        - update
  1168        - watch
  1169  `, `
  1170  apiVersion: v1
  1171  kind: Service
  1172  metadata:
  1173    name: name-5
  1174    namespace: ns
  1175  spec:
  1176    type: LoadBalancer
  1177    ports:
  1178    - port: 8989`, `
  1179  addressType: IPv4
  1180  apiVersion: discovery.k8s.io/v1
  1181  endpoints:
  1182  - addresses:
  1183    - 172.17.0.12
  1184    conditions:
  1185      ready: true
  1186    hostname: name-1-1
  1187    targetRef:
  1188      kind: Pod
  1189      name: name-1-1
  1190      namespace: ns
  1191    topology:
  1192      kubernetes.io/hostname: node-1
  1193  kind: EndpointSlice
  1194  metadata:
  1195    labels:
  1196    name: name-1-f5fad
  1197    namespace: ns
  1198  ports:
  1199  - name: ""
  1200    port: 8989`, `
  1201  apiVersion: v1
  1202  kind: Pod
  1203  metadata:
  1204    name: name-1-1
  1205    namespace: ns
  1206    ownerReferences:
  1207    - kind: ReplicaSet
  1208      name: rs-1
  1209  status:
  1210    phase: Running
  1211    podIP: 172.17.0.12`,
  1212  			},
  1213  			id:                               ServiceID{Name: "name-5", Namespace: "ns"},
  1214  			port:                             8989,
  1215  			expectedAddresses:                []string{},
  1216  			expectedNoEndpoints:              true,
  1217  			expectedNoEndpointsServiceExists: true,
  1218  			expectedError:                    false,
  1219  		},
  1220  		{
  1221  			serviceType: "service with IPv6 address type EndpointSlice",
  1222  			k8sConfigs: []string{`
  1223  kind: APIResourceList
  1224  apiVersion: v1
  1225  groupVersion: discovery.k8s.io/v1
  1226  resources:
  1227    - name: endpointslices
  1228      singularName: endpointslice
  1229      namespaced: true
  1230      kind: EndpointSlice
  1231      verbs:
  1232        - delete
  1233        - deletecollection
  1234        - get
  1235        - list
  1236        - patch
  1237        - create
  1238        - update
  1239        - watch
  1240  `, `
  1241  apiVersion: v1
  1242  kind: Service
  1243  metadata:
  1244    name: name-5
  1245    namespace: ns
  1246  spec:
  1247    type: LoadBalancer
  1248    ports:
  1249    - port: 9000`, `
  1250  addressType: IPv6
  1251  apiVersion: discovery.k8s.io/v1
  1252  endpoints:
  1253  - addresses:
  1254    - 0:0:0:0:0:0:0:1
  1255    conditions:
  1256      ready: true
  1257    targetRef:
  1258      kind: Pod
  1259      name: name-5-1
  1260      namespace: ns
  1261    topology:
  1262      kubernetes.io/hostname: node-1
  1263  kind: EndpointSlice
  1264  metadata:
  1265    labels:
  1266    name: name-5-f65dv
  1267    namespace: ns
  1268    ownerReferences:
  1269    - apiVersion: v1
  1270      kind: Service
  1271      name: name-5
  1272  ports:
  1273  - name: ""
  1274    port: 9000`, `
  1275  apiVersion: v1
  1276  kind: Pod
  1277  metadata:
  1278    name: name-5-1
  1279    namespace: ns
  1280    ownerReferences:
  1281    - kind: ReplicaSet
  1282      name: rs-1
  1283  status:
  1284    phase: Running
  1285    podIP: 0:0:0:0:0:0:0:1`,
  1286  			},
  1287  			id:                               ServiceID{Name: "name-5", Namespace: "ns"},
  1288  			port:                             9000,
  1289  			expectedAddresses:                []string{},
  1290  			expectedNoEndpoints:              true,
  1291  			expectedNoEndpointsServiceExists: true,
  1292  			expectedError:                    false,
  1293  		}} {
  1294  		tt := tt // pin
  1295  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  1296  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
  1297  			if err != nil {
  1298  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  1299  			}
  1300  
  1301  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  1302  			if err != nil {
  1303  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  1304  			}
  1305  
  1306  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  1307  			if err != nil {
  1308  				t.Fatalf("can't create Endpoints watcher: %s", err)
  1309  			}
  1310  
  1311  			k8sAPI.Sync(nil)
  1312  			metadataAPI.Sync(nil)
  1313  
  1314  			listener := newBufferingEndpointListener()
  1315  
  1316  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  1317  			if tt.expectedError && err == nil {
  1318  				t.Fatal("Expected error but was ok")
  1319  			}
  1320  			if !tt.expectedError && err != nil {
  1321  				t.Fatalf("Expected no error, got [%s]", err)
  1322  			}
  1323  
  1324  			if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
  1325  				t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
  1326  			}
  1327  
  1328  			listener.ExpectAdded(tt.expectedAddresses, t)
  1329  
  1330  			if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
  1331  				t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
  1332  					tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
  1333  			}
  1334  
  1335  			if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
  1336  				t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
  1337  					tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
  1338  			}
  1339  		})
  1340  	}
  1341  }
  1342  
  1343  func TestEndpointsWatcherWithEndpointSlicesExternalWorkload(t *testing.T) {
  1344  	for _, tt := range []struct {
  1345  		serviceType                      string
  1346  		k8sConfigs                       []string
  1347  		id                               ServiceID
  1348  		hostname                         string
  1349  		port                             Port
  1350  		expectedAddresses                []string
  1351  		expectedNoEndpoints              bool
  1352  		expectedNoEndpointsServiceExists bool
  1353  		expectedError                    bool
  1354  		expectedLocalTrafficPolicy       bool
  1355  	}{
  1356  		{
  1357  			serviceType: "local services with EndpointSlice",
  1358  			k8sConfigs: []string{`
  1359  kind: APIResourceList
  1360  apiVersion: v1
  1361  groupVersion: discovery.k8s.io/v1
  1362  resources:
  1363    - name: endpointslices
  1364      singularName: endpointslice
  1365      namespaced: true
  1366      kind: EndpointSlice
  1367      verbs:
  1368        - delete
  1369        - deletecollection
  1370        - get
  1371        - list
  1372        - patch
  1373        - create
  1374        - update
  1375        - watch
  1376  `, `
  1377  apiVersion: v1
  1378  kind: Service
  1379  metadata:
  1380    name: name-1
  1381    namespace: ns
  1382  spec:
  1383    type: LoadBalancer
  1384    ports:
  1385    - port: 8989
  1386    internalTrafficPolicy: Local`,
  1387  				`
  1388  addressType: IPv4
  1389  apiVersion: discovery.k8s.io/v1
  1390  endpoints:
  1391  - addresses:
  1392    - 172.17.0.12
  1393    conditions:
  1394      ready: true
  1395    targetRef:
  1396      kind: ExternalWorkload
  1397      name: name-1-1
  1398      namespace: ns
  1399    topology:
  1400      kubernetes.io/hostname: node-1
  1401  - addresses:
  1402    - 172.17.0.19
  1403    conditions:
  1404      ready: true
  1405    targetRef:
  1406      kind: ExternalWorkload
  1407      name: name-1-2
  1408      namespace: ns
  1409    topology:
  1410      kubernetes.io/hostname: node-1
  1411  - addresses:
  1412    - 172.17.0.20
  1413    conditions:
  1414      ready: true
  1415    targetRef:
  1416      kind: ExternalWorkload
  1417      name: name-1-3
  1418      namespace: ns
  1419    topology:
  1420      kubernetes.io/hostname: node-2
  1421  - addresses:
  1422    - 172.17.0.21
  1423    conditions:
  1424      ready: true
  1425    topology:
  1426      kubernetes.io/hostname: node-2
  1427  kind: EndpointSlice
  1428  metadata:
  1429    labels:
  1430      kubernetes.io/service-name: name-1
  1431    name: name-1-bhnqh
  1432    namespace: ns
  1433  ports:
  1434  - name: ""
  1435    port: 8989`,
  1436  				`
  1437  apiVersion: workload.linkerd.io/v1beta1
  1438  kind: ExternalWorkload
  1439  metadata:
  1440    name: name-1-1
  1441    namespace: ns
  1442  status:
  1443    conditions:
  1444    ready: true`,
  1445  				`
  1446  apiVersion: workload.linkerd.io/v1beta1
  1447  kind: ExternalWorkload
  1448  metadata:
  1449    name: name-1-2
  1450    namespace: ns
  1451  status:
  1452    conditions:
  1453    ready: true`,
  1454  				`
  1455  apiVersion: workload.linkerd.io/v1beta1
  1456  kind: ExternalWorkload
  1457  metadata:
  1458    name: name-1-3
  1459    namespace: ns
  1460  status:
  1461    conditions:
  1462    ready: true`,
  1463  			},
  1464  			id:   ExternalWorkloadID{Name: "name-1", Namespace: "ns"},
  1465  			port: 8989,
  1466  			expectedAddresses: []string{
  1467  				"172.17.0.12:8989",
  1468  				"172.17.0.19:8989",
  1469  				"172.17.0.20:8989",
  1470  				"172.17.0.21:8989",
  1471  			},
  1472  			expectedNoEndpoints:              false,
  1473  			expectedNoEndpointsServiceExists: false,
  1474  			expectedError:                    false,
  1475  			expectedLocalTrafficPolicy:       true,
  1476  		},
  1477  		{
  1478  			serviceType: "local services with missing addresses and EndpointSlice",
  1479  			k8sConfigs: []string{`
  1480  kind: APIResourceList
  1481  apiVersion: v1
  1482  groupVersion: discovery.k8s.io/v1
  1483  resources:
  1484    - name: endpointslices
  1485      singularName: endpointslice
  1486      namespaced: true
  1487      kind: EndpointSlice
  1488      verbs:
  1489        - delete
  1490        - deletecollection
  1491        - get
  1492        - list
  1493        - patch
  1494        - create
  1495        - update
  1496        - watch
  1497  `, `
  1498  apiVersion: v1
  1499  kind: Service
  1500  metadata:
  1501    name: name-1
  1502    namespace: ns
  1503  spec:
  1504    type: LoadBalancer
  1505    ports:
  1506    - port: 8989`, `
  1507  addressType: IPv4
  1508  apiVersion: discovery.k8s.io/v1
  1509  endpoints:
  1510  - addresses:
  1511    - 172.17.0.23
  1512    conditions:
  1513      ready: true
  1514    targetRef:
  1515      kind: ExternalWorkload
  1516      name: name-1-1
  1517      namespace: ns
  1518    topology:
  1519      kubernetes.io/hostname: node-1
  1520  - addresses:
  1521    - 172.17.0.24
  1522    conditions:
  1523      ready: true
  1524    targetRef:
  1525      kind: ExternalWorkload
  1526      name: name-1-2
  1527      namespace: ns
  1528    topology:
  1529      kubernetes.io/hostname: node-1
  1530  - addresses:
  1531    - 172.17.0.25
  1532    conditions:
  1533      ready: true
  1534    targetRef:
  1535      kind: ExternalWorkload
  1536      name: name-1-3
  1537      namespace: ns
  1538    topology:
  1539      kubernetes.io/hostname: node-2
  1540  kind: EndpointSlice
  1541  metadata:
  1542    labels:
  1543      kubernetes.io/service-name: name-1
  1544    name: name1-f5fad
  1545    namespace: ns
  1546  ports:
  1547  - name: ""
  1548    port: 8989`, `
  1549  apiVersion: workload.linkerd.io/v1beta1
  1550  kind: ExternalWorkload
  1551  metadata:
  1552    name: name-1-3
  1553    namespace: ns
  1554  status:
  1555    conditions:
  1556    ready: true`,
  1557  			},
  1558  			id:                               ServiceID{Name: "name-1", Namespace: "ns"},
  1559  			port:                             8989,
  1560  			expectedAddresses:                []string{"172.17.0.25:8989"},
  1561  			expectedNoEndpoints:              false,
  1562  			expectedNoEndpointsServiceExists: false,
  1563  			expectedError:                    false,
  1564  		},
  1565  		{
  1566  			serviceType: "service with EndpointSlice without labels",
  1567  			k8sConfigs: []string{`
  1568  kind: APIResourceList
  1569  apiVersion: v1
  1570  groupVersion: discovery.k8s.io/v1
  1571  resources:
  1572    - name: endpointslices
  1573      singularName: endpointslice
  1574      namespaced: true
  1575      kind: EndpointSlice
  1576      verbs:
  1577        - delete
  1578        - deletecollection
  1579        - get
  1580        - list
  1581        - patch
  1582        - create
  1583        - update
  1584        - watch
  1585  `, `
  1586  apiVersion: v1
  1587  kind: Service
  1588  metadata:
  1589    name: name-5
  1590    namespace: ns
  1591  spec:
  1592    type: LoadBalancer
  1593    ports:
  1594    - port: 8989`, `
  1595  addressType: IPv4
  1596  apiVersion: discovery.k8s.io/v1
  1597  endpoints:
  1598  - addresses:
  1599    - 172.17.0.12
  1600    conditions:
  1601      ready: true
  1602    hostname: name-1-1
  1603    targetRef:
  1604      kind: ExternalWorkload
  1605      name: name-1-1
  1606      namespace: ns
  1607    topology:
  1608      kubernetes.io/hostname: node-1
  1609  kind: EndpointSlice
  1610  metadata:
  1611    labels:
  1612    name: name-1-f5fad
  1613    namespace: ns
  1614  ports:
  1615  - name: ""
  1616    port: 8989`, `
  1617  apiVersion: workload.linkerd.io/v1beta1
  1618  kind: ExternalWorkload
  1619  metadata:
  1620    name: name-1-1
  1621    namespace: ns
  1622  status:
  1623    conditions:
  1624    ready: true`,
  1625  			},
  1626  			id:                               ServiceID{Name: "name-5", Namespace: "ns"},
  1627  			port:                             8989,
  1628  			expectedAddresses:                []string{},
  1629  			expectedNoEndpoints:              true,
  1630  			expectedNoEndpointsServiceExists: true,
  1631  			expectedError:                    false,
  1632  		},
  1633  
  1634  		{
  1635  			serviceType: "service with IPv6 address type EndpointSlice",
  1636  			k8sConfigs: []string{`
  1637  kind: APIResourceList
  1638  apiVersion: v1
  1639  groupVersion: discovery.k8s.io/v1
  1640  resources:
  1641    - name: endpointslices
  1642      singularName: endpointslice
  1643      namespaced: true
  1644      kind: EndpointSlice
  1645      verbs:
  1646        - delete
  1647        - deletecollection
  1648        - get
  1649        - list
  1650        - patch
  1651        - create
  1652        - update
  1653        - watch
  1654  `, `
  1655  apiVersion: v1
  1656  kind: Service
  1657  metadata:
  1658    name: name-5
  1659    namespace: ns
  1660  spec:
  1661    type: LoadBalancer
  1662    ports:
  1663    - port: 9000`, `
  1664  addressType: IPv6
  1665  apiVersion: discovery.k8s.io/v1
  1666  endpoints:
  1667  - addresses:
  1668    - 0:0:0:0:0:0:0:1
  1669    conditions:
  1670      ready: true
  1671    targetRef:
  1672      kind: ExternalWorkload
  1673      name: name-5-1
  1674      namespace: ns
  1675    topology:
  1676      kubernetes.io/hostname: node-1
  1677  kind: EndpointSlice
  1678  metadata:
  1679    labels:
  1680    name: name-5-f65dv
  1681    namespace: ns
  1682    ownerReferences:
  1683    - apiVersion: v1
  1684      kind: Service
  1685      name: name-5
  1686  ports:
  1687  - name: ""
  1688    port: 9000`, `
  1689  apiVersion: workload.linkerd.io/v1beta1
  1690  kind: ExternalWorkload
  1691  metadata:
  1692    name: name-5-1
  1693    namespace: ns
  1694  status:
  1695    conditions:
  1696    ready: true`,
  1697  			},
  1698  			id:                               ServiceID{Name: "name-5", Namespace: "ns"},
  1699  			port:                             9000,
  1700  			expectedAddresses:                []string{},
  1701  			expectedNoEndpoints:              true,
  1702  			expectedNoEndpointsServiceExists: true,
  1703  			expectedError:                    false,
  1704  		},
  1705  	} {
  1706  		tt := tt // pin
  1707  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  1708  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
  1709  			if err != nil {
  1710  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  1711  			}
  1712  
  1713  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  1714  			if err != nil {
  1715  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  1716  			}
  1717  
  1718  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  1719  			if err != nil {
  1720  				t.Fatalf("can't create Endpoints watcher: %s", err)
  1721  			}
  1722  
  1723  			k8sAPI.Sync(nil)
  1724  			metadataAPI.Sync(nil)
  1725  
  1726  			listener := newBufferingEndpointListener()
  1727  
  1728  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  1729  			if tt.expectedError && err == nil {
  1730  				t.Fatal("Expected error but was ok")
  1731  			}
  1732  			if !tt.expectedError && err != nil {
  1733  				t.Fatalf("Expected no error, got [%s]", err)
  1734  			}
  1735  
  1736  			if listener.localTrafficPolicy != tt.expectedLocalTrafficPolicy {
  1737  				t.Fatalf("Expected localTrafficPolicy [%v], got [%v]", tt.expectedLocalTrafficPolicy, listener.localTrafficPolicy)
  1738  			}
  1739  
  1740  			listener.ExpectAdded(tt.expectedAddresses, t)
  1741  
  1742  			if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
  1743  				t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
  1744  					tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
  1745  			}
  1746  
  1747  			if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
  1748  				t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
  1749  					tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
  1750  			}
  1751  		})
  1752  	}
  1753  }
  1754  
  1755  func TestEndpointsWatcherDeletion(t *testing.T) {
  1756  	k8sConfigs := []string{`
  1757  apiVersion: v1
  1758  kind: Service
  1759  metadata:
  1760    name: name1
  1761    namespace: ns
  1762  spec:
  1763    type: LoadBalancer
  1764    ports:
  1765    - port: 8989`,
  1766  		`
  1767  apiVersion: v1
  1768  kind: Endpoints
  1769  metadata:
  1770    name: name1
  1771    namespace: ns
  1772  subsets:
  1773  - addresses:
  1774    - ip: 172.17.0.12
  1775      targetRef:
  1776        kind: Pod
  1777        name: name1-1
  1778        namespace: ns
  1779    ports:
  1780    - port: 8989`,
  1781  		`
  1782  apiVersion: v1
  1783  kind: Pod
  1784  metadata:
  1785    name: name1-1
  1786    namespace: ns
  1787  status:
  1788    phase: Running
  1789    podIP: 172.17.0.12`}
  1790  
  1791  	for _, tt := range []struct {
  1792  		serviceType      string
  1793  		k8sConfigs       []string
  1794  		id               ServiceID
  1795  		hostname         string
  1796  		port             Port
  1797  		objectToDelete   interface{}
  1798  		deletingServices bool
  1799  	}{
  1800  		{
  1801  			serviceType:    "can delete endpoints",
  1802  			k8sConfigs:     k8sConfigs,
  1803  			id:             ServiceID{Name: "name1", Namespace: "ns"},
  1804  			port:           8989,
  1805  			hostname:       "name1-1",
  1806  			objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
  1807  		},
  1808  		{
  1809  			serviceType:    "can delete endpoints when wrapped in a DeletedFinalStateUnknown",
  1810  			k8sConfigs:     k8sConfigs,
  1811  			id:             ServiceID{Name: "name1", Namespace: "ns"},
  1812  			port:           8989,
  1813  			hostname:       "name1-1",
  1814  			objectToDelete: &corev1.Endpoints{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
  1815  		},
  1816  		{
  1817  			serviceType:      "can delete services",
  1818  			k8sConfigs:       k8sConfigs,
  1819  			id:               ServiceID{Name: "name1", Namespace: "ns"},
  1820  			port:             8989,
  1821  			hostname:         "name1-1",
  1822  			objectToDelete:   &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
  1823  			deletingServices: true,
  1824  		},
  1825  		{
  1826  			serviceType:      "can delete services when wrapped in a DeletedFinalStateUnknown",
  1827  			k8sConfigs:       k8sConfigs,
  1828  			id:               ServiceID{Name: "name1", Namespace: "ns"},
  1829  			port:             8989,
  1830  			hostname:         "name1-1",
  1831  			objectToDelete:   &corev1.Service{ObjectMeta: metav1.ObjectMeta{Name: "name1", Namespace: "ns"}},
  1832  			deletingServices: true,
  1833  		},
  1834  	} {
  1835  
  1836  		tt := tt // pin
  1837  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  1838  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
  1839  			if err != nil {
  1840  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  1841  			}
  1842  
  1843  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  1844  			if err != nil {
  1845  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  1846  			}
  1847  
  1848  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
  1849  			if err != nil {
  1850  				t.Fatalf("can't create Endpoints watcher: %s", err)
  1851  			}
  1852  
  1853  			k8sAPI.Sync(nil)
  1854  			metadataAPI.Sync(nil)
  1855  
  1856  			listener := newBufferingEndpointListener()
  1857  
  1858  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  1859  			if err != nil {
  1860  				t.Fatal(err)
  1861  			}
  1862  
  1863  			if tt.deletingServices {
  1864  				watcher.deleteService(tt.objectToDelete)
  1865  			} else {
  1866  				watcher.deleteEndpoints(tt.objectToDelete)
  1867  			}
  1868  
  1869  			if !listener.endpointsAreNotCalled() {
  1870  				t.Fatal("Expected NoEndpoints to be Called")
  1871  			}
  1872  		})
  1873  
  1874  	}
  1875  }
  1876  
  1877  func TestEndpointsWatcherDeletionWithEndpointSlices(t *testing.T) {
  1878  	k8sConfigsWithES := []string{`
  1879  kind: APIResourceList
  1880  apiVersion: v1
  1881  groupVersion: discovery.k8s.io/v1
  1882  resources:
  1883    - name: endpointslices
  1884      singularName: endpointslice
  1885      namespaced: true
  1886      kind: EndpointSlice
  1887      verbs:
  1888        - delete
  1889        - deletecollection
  1890        - get
  1891        - list
  1892        - patch
  1893        - create
  1894        - update
  1895        - watch
  1896  `, `
  1897  apiVersion: v1
  1898  kind: Service
  1899  metadata:
  1900    name: name1
  1901    namespace: ns
  1902  spec:
  1903    type: LoadBalancer
  1904    ports:
  1905    - port: 8989`, `
  1906  addressType: IPv4
  1907  apiVersion: discovery.k8s.io/v1
  1908  endpoints:
  1909  - addresses:
  1910    - 172.17.0.12
  1911    conditions:
  1912      ready: true
  1913    targetRef:
  1914      kind: Pod
  1915      name: name1-1
  1916      namespace: ns
  1917    topology:
  1918      kubernetes.io/hostname: node-1
  1919  kind: EndpointSlice
  1920  metadata:
  1921    labels:
  1922      kubernetes.io/service-name: name1
  1923    name: name1-del
  1924    namespace: ns
  1925  ports:
  1926  - name: ""
  1927    port: 8989`, `
  1928  apiVersion: v1
  1929  kind: Pod
  1930  metadata:
  1931    name: name1-1
  1932    namespace: ns
  1933  status:
  1934    phase: Running
  1935    podIP: 172.17.0.12`}
  1936  
  1937  	k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
  1938  addressType: IPv4
  1939  apiVersion: discovery.k8s.io/v1
  1940  endpoints:
  1941  - addresses:
  1942    - 172.17.0.13
  1943    conditions:
  1944      ready: true
  1945    targetRef:
  1946      kind: Pod
  1947      name: name1-2
  1948      namespace: ns
  1949    topology:
  1950      kubernetes.io/hostname: node-1
  1951  kind: EndpointSlice
  1952  metadata:
  1953    labels:
  1954      kubernetes.io/service-name: name1
  1955    name: name1-live
  1956    namespace: ns
  1957  ports:
  1958  - name: ""
  1959    port: 8989`, `apiVersion: v1
  1960  kind: Pod
  1961  metadata:
  1962    name: name1-2
  1963    namespace: ns
  1964  status:
  1965    phase: Running
  1966    podIP: 172.17.0.13`}...)
  1967  
  1968  	for _, tt := range []struct {
  1969  		serviceType       string
  1970  		k8sConfigs        []string
  1971  		id                ServiceID
  1972  		hostname          string
  1973  		port              Port
  1974  		objectToDelete    interface{}
  1975  		deletingServices  bool
  1976  		hasSliceAccess    bool
  1977  		noEndpointsCalled bool
  1978  	}{
  1979  		{
  1980  			serviceType:       "can delete an EndpointSlice",
  1981  			k8sConfigs:        k8sConfigsWithES,
  1982  			id:                ServiceID{Name: "name1", Namespace: "ns"},
  1983  			port:              8989,
  1984  			hostname:          "name1-1",
  1985  			objectToDelete:    createTestEndpointSlice(consts.PodKind),
  1986  			hasSliceAccess:    true,
  1987  			noEndpointsCalled: true,
  1988  		},
  1989  		{
  1990  			serviceType:       "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
  1991  			k8sConfigs:        k8sConfigsWithES,
  1992  			id:                ServiceID{Name: "name1", Namespace: "ns"},
  1993  			port:              8989,
  1994  			hostname:          "name1-1",
  1995  			objectToDelete:    createTestEndpointSlice(consts.PodKind),
  1996  			hasSliceAccess:    true,
  1997  			noEndpointsCalled: true,
  1998  		},
  1999  		{
  2000  			serviceType:       "can delete an EndpointSlice when there are multiple ones",
  2001  			k8sConfigs:        k8sConfigWithMultipleES,
  2002  			id:                ServiceID{Name: "name1", Namespace: "ns"},
  2003  			port:              8989,
  2004  			hostname:          "name1-1",
  2005  			objectToDelete:    createTestEndpointSlice(consts.PodKind),
  2006  			hasSliceAccess:    true,
  2007  			noEndpointsCalled: false,
  2008  		},
  2009  	} {
  2010  		tt := tt // pin
  2011  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  2012  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
  2013  			if err != nil {
  2014  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  2015  			}
  2016  
  2017  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2018  			if err != nil {
  2019  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2020  			}
  2021  
  2022  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  2023  			if err != nil {
  2024  				t.Fatalf("can't create Endpoints watcher: %s", err)
  2025  			}
  2026  
  2027  			k8sAPI.Sync(nil)
  2028  			metadataAPI.Sync(nil)
  2029  
  2030  			listener := newBufferingEndpointListener()
  2031  
  2032  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  2033  			if err != nil {
  2034  				t.Fatal(err)
  2035  			}
  2036  
  2037  			watcher.deleteEndpointSlice(tt.objectToDelete)
  2038  
  2039  			if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
  2040  				t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
  2041  					tt.noEndpointsCalled, listener.endpointsAreNotCalled())
  2042  			}
  2043  		})
  2044  	}
  2045  }
  2046  
  2047  func TestEndpointsWatcherDeletionWithEndpointSlicesExternalWorkload(t *testing.T) {
  2048  	k8sConfigsWithES := []string{`
  2049  kind: APIResourceList
  2050  apiVersion: v1
  2051  groupVersion: discovery.k8s.io/v1
  2052  resources:
  2053    - name: endpointslices
  2054      singularName: endpointslice
  2055      namespaced: true
  2056      kind: EndpointSlice
  2057      verbs:
  2058        - delete
  2059        - deletecollection
  2060        - get
  2061        - list
  2062        - patch
  2063        - create
  2064        - update
  2065        - watch
  2066  `, `
  2067  apiVersion: v1
  2068  kind: Service
  2069  metadata:
  2070    name: name1
  2071    namespace: ns
  2072  spec:
  2073    type: LoadBalancer
  2074    ports:
  2075    - port: 8989`, `
  2076  addressType: IPv4
  2077  apiVersion: discovery.k8s.io/v1
  2078  endpoints:
  2079  - addresses:
  2080    - 172.17.0.12
  2081    conditions:
  2082      ready: true
  2083    targetRef:
  2084      kind: ExternalWorkload
  2085      name: name1-1
  2086      namespace: ns
  2087    topology:
  2088      kubernetes.io/hostname: node-1
  2089  kind: EndpointSlice
  2090  metadata:
  2091    labels:
  2092      kubernetes.io/service-name: name1
  2093    name: name1-del
  2094    namespace: ns
  2095  ports:
  2096  - name: ""
  2097    port: 8989`, `
  2098  apiVersion: workload.linkerd.io/v1beta1
  2099  kind: ExternalWorkload
  2100  metadata:
  2101    name: name1-1
  2102    namespace: ns
  2103  status:
  2104    conditions:
  2105    ready: true`}
  2106  
  2107  	k8sConfigWithMultipleES := append(k8sConfigsWithES, []string{`
  2108  addressType: IPv4
  2109  apiVersion: discovery.k8s.io/v1
  2110  endpoints:
  2111  - addresses:
  2112    - 172.17.0.13
  2113    conditions:
  2114      ready: true
  2115    targetRef:
  2116      kind: ExternalWorkload
  2117      name: name1-2
  2118      namespace: ns
  2119    topology:
  2120      kubernetes.io/hostname: node-1
  2121  kind: EndpointSlice
  2122  metadata:
  2123    labels:
  2124      kubernetes.io/service-name: name1
  2125    name: name1-live
  2126    namespace: ns
  2127  ports:
  2128  - name: ""
  2129    port: 8989`, `apiVersion: workload.linkerd.io/v1beta1
  2130  kind: ExternalWorkload
  2131  metadata:
  2132    name: name1-2
  2133    namespace: ns
  2134  status:
  2135    conditions:
  2136    ready: true`}...)
  2137  
  2138  	for _, tt := range []struct {
  2139  		serviceType       string
  2140  		k8sConfigs        []string
  2141  		id                ServiceID
  2142  		hostname          string
  2143  		port              Port
  2144  		objectToDelete    interface{}
  2145  		deletingServices  bool
  2146  		hasSliceAccess    bool
  2147  		noEndpointsCalled bool
  2148  	}{
  2149  		{
  2150  			serviceType:       "can delete an EndpointSlice",
  2151  			k8sConfigs:        k8sConfigsWithES,
  2152  			id:                ServiceID{Name: "name1", Namespace: "ns"},
  2153  			port:              8989,
  2154  			hostname:          "name1-1",
  2155  			objectToDelete:    createTestEndpointSlice(consts.ExtWorkloadKind),
  2156  			hasSliceAccess:    true,
  2157  			noEndpointsCalled: true,
  2158  		},
  2159  		{
  2160  			serviceType:       "can delete an EndpointSlice when wrapped in a DeletedFinalStateUnknown",
  2161  			k8sConfigs:        k8sConfigsWithES,
  2162  			id:                ServiceID{Name: "name1", Namespace: "ns"},
  2163  			port:              8989,
  2164  			hostname:          "name1-1",
  2165  			objectToDelete:    createTestEndpointSlice(consts.ExtWorkloadKind),
  2166  			hasSliceAccess:    true,
  2167  			noEndpointsCalled: true,
  2168  		},
  2169  		{
  2170  			serviceType:       "can delete an EndpointSlice when there are multiple ones",
  2171  			k8sConfigs:        k8sConfigWithMultipleES,
  2172  			id:                ServiceID{Name: "name1", Namespace: "ns"},
  2173  			port:              8989,
  2174  			hostname:          "name1-1",
  2175  			objectToDelete:    createTestEndpointSlice(consts.ExtWorkloadKind),
  2176  			hasSliceAccess:    true,
  2177  			noEndpointsCalled: false,
  2178  		},
  2179  	} {
  2180  		tt := tt // pin
  2181  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  2182  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
  2183  			if err != nil {
  2184  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  2185  			}
  2186  
  2187  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2188  			if err != nil {
  2189  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2190  			}
  2191  
  2192  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  2193  			if err != nil {
  2194  				t.Fatalf("can't create Endpoints watcher: %s", err)
  2195  			}
  2196  
  2197  			k8sAPI.Sync(nil)
  2198  			metadataAPI.Sync(nil)
  2199  
  2200  			listener := newBufferingEndpointListener()
  2201  
  2202  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  2203  			if err != nil {
  2204  				t.Fatal(err)
  2205  			}
  2206  
  2207  			watcher.deleteEndpointSlice(tt.objectToDelete)
  2208  
  2209  			if listener.endpointsAreNotCalled() != tt.noEndpointsCalled {
  2210  				t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
  2211  					tt.noEndpointsCalled, listener.endpointsAreNotCalled())
  2212  			}
  2213  		})
  2214  	}
  2215  }
  2216  
  2217  func TestEndpointsWatcherServiceMirrors(t *testing.T) {
  2218  	for _, tt := range []struct {
  2219  		serviceType                      string
  2220  		k8sConfigs                       []string
  2221  		id                               ServiceID
  2222  		hostname                         string
  2223  		port                             Port
  2224  		expectedAddresses                []string
  2225  		expectedNoEndpoints              bool
  2226  		expectedNoEndpointsServiceExists bool
  2227  		enableEndpointSlices             bool
  2228  	}{
  2229  		{
  2230  			k8sConfigs: []string{`
  2231  apiVersion: v1
  2232  kind: Service
  2233  metadata:
  2234    name: name1-remote
  2235    namespace: ns
  2236  spec:
  2237    type: LoadBalancer
  2238    ports:
  2239    - port: 8989`,
  2240  				`
  2241  apiVersion: v1
  2242  kind: Endpoints
  2243  metadata:
  2244    name: name1-remote
  2245    namespace: ns
  2246    annotations:
  2247      mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
  2248      mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
  2249    labels:
  2250      mirror.linkerd.io/mirrored-service: "true"
  2251  subsets:
  2252  - addresses:
  2253    - ip: 172.17.0.12
  2254    ports:
  2255    - port: 8989`,
  2256  			},
  2257  			serviceType: "mirrored service with identity",
  2258  			id:          ServiceID{Name: "name1-remote", Namespace: "ns"},
  2259  			port:        8989,
  2260  			expectedAddresses: []string{
  2261  				"172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989",
  2262  			},
  2263  			expectedNoEndpoints:              false,
  2264  			expectedNoEndpointsServiceExists: false,
  2265  		},
  2266  		{
  2267  			k8sConfigs: []string{`
  2268  apiVersion: v1
  2269  kind: Service
  2270  metadata:
  2271    name: name1-remote
  2272    namespace: ns
  2273  spec:
  2274    type: LoadBalancer
  2275    ports:
  2276    - port: 8989`,
  2277  				`
  2278  apiVersion: discovery.k8s.io/v1
  2279  kind: EndpointSlice
  2280  metadata:
  2281    name: name1-remote-xxxx
  2282    namespace: ns
  2283    annotations:
  2284      mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
  2285      mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
  2286    labels:
  2287      mirror.linkerd.io/mirrored-service: "true"
  2288      kubernetes.io/service-name: name1-remote
  2289  endpoints:
  2290  - addresses:
  2291    - 172.17.0.12
  2292  ports:
  2293  - port: 8989`,
  2294  			},
  2295  			serviceType: "mirrored service with identity and endpoint slices",
  2296  			id:          ServiceID{Name: "name1-remote", Namespace: "ns"},
  2297  			port:        8989,
  2298  			expectedAddresses: []string{
  2299  				"172.17.0.12:8989/gateway-identity-1/name1-remote-fq:8989",
  2300  			},
  2301  			expectedNoEndpoints:              false,
  2302  			expectedNoEndpointsServiceExists: false,
  2303  			enableEndpointSlices:             true,
  2304  		},
  2305  		{
  2306  			k8sConfigs: []string{`
  2307  apiVersion: v1
  2308  kind: Service
  2309  metadata:
  2310    name: name1-remote
  2311    namespace: ns
  2312  spec:
  2313    type: LoadBalancer
  2314    ports:
  2315    - port: 8989`,
  2316  				`
  2317  apiVersion: v1
  2318  kind: Endpoints
  2319  metadata:
  2320    name: name1-remote
  2321    namespace: ns
  2322    annotations:
  2323      mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
  2324    labels:
  2325      mirror.linkerd.io/mirrored-service: "true"
  2326  subsets:
  2327  - addresses:
  2328    - ip: 172.17.0.12
  2329    ports:
  2330    - port: 8989`,
  2331  			},
  2332  			serviceType: "mirrored service without identity",
  2333  			id:          ServiceID{Name: "name1-remote", Namespace: "ns"},
  2334  			port:        8989,
  2335  			expectedAddresses: []string{
  2336  				"172.17.0.12:8989/name1-remote-fq:8989",
  2337  			},
  2338  			expectedNoEndpoints:              false,
  2339  			expectedNoEndpointsServiceExists: false,
  2340  		},
  2341  
  2342  		{
  2343  			k8sConfigs: []string{`
  2344  apiVersion: v1
  2345  kind: Service
  2346  metadata:
  2347    name: name1-remote
  2348    namespace: ns
  2349  spec:
  2350    type: LoadBalancer
  2351    ports:
  2352    - port: 8989`,
  2353  				`
  2354  apiVersion: v1
  2355  kind: Endpoints
  2356  metadata:
  2357    name: name1-remote
  2358    namespace: ns
  2359    annotations:
  2360      mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
  2361      mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
  2362    labels:
  2363      mirror.linkerd.io/mirrored-service: "true"
  2364  subsets:
  2365  - addresses:
  2366    - ip: 172.17.0.12
  2367    ports:
  2368    - port: 9999`,
  2369  			},
  2370  			serviceType: "mirrored service with remapped port in endpoints",
  2371  			id:          ServiceID{Name: "name1-remote", Namespace: "ns"},
  2372  			port:        8989,
  2373  			expectedAddresses: []string{
  2374  				"172.17.0.12:9999/gateway-identity-1/name1-remote-fq:8989",
  2375  			},
  2376  			expectedNoEndpoints:              false,
  2377  			expectedNoEndpointsServiceExists: false,
  2378  		},
  2379  		{
  2380  			k8sConfigs: []string{`
  2381  apiVersion: v1
  2382  kind: Service
  2383  metadata:
  2384    name: name1-remote
  2385    namespace: ns
  2386  spec:
  2387    type: LoadBalancer
  2388    ports:
  2389    - port: 8989`,
  2390  				`
  2391  apiVersion: v1
  2392  kind: Endpoints
  2393  metadata:
  2394    name: name1-remote
  2395    namespace: ns
  2396    annotations:
  2397      mirror.linkerd.io/remote-gateway-identity: ""
  2398      mirror.linkerd.io/remote-svc-fq-name: "name1-remote-fq"
  2399    labels:
  2400      mirror.linkerd.io/mirrored-service: "true"
  2401  subsets:
  2402  - addresses:
  2403    - ip: 172.17.0.12
  2404    ports:
  2405    - port: 9999`,
  2406  			},
  2407  			serviceType: "mirrored service with empty identity and remapped port in endpoints",
  2408  			id:          ServiceID{Name: "name1-remote", Namespace: "ns"},
  2409  			port:        8989,
  2410  			expectedAddresses: []string{
  2411  				"172.17.0.12:9999/name1-remote-fq:8989",
  2412  			},
  2413  			expectedNoEndpoints:              false,
  2414  			expectedNoEndpointsServiceExists: false,
  2415  		},
  2416  	} {
  2417  		tt := tt // pin
  2418  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  2419  			k8sAPI, err := k8s.NewFakeAPI(tt.k8sConfigs...)
  2420  			if err != nil {
  2421  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  2422  			}
  2423  
  2424  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2425  			if err != nil {
  2426  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2427  			}
  2428  
  2429  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), tt.enableEndpointSlices, "local")
  2430  			if err != nil {
  2431  				t.Fatalf("can't create Endpoints watcher: %s", err)
  2432  			}
  2433  
  2434  			k8sAPI.Sync(nil)
  2435  			metadataAPI.Sync(nil)
  2436  
  2437  			listener := newBufferingEndpointListener()
  2438  
  2439  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  2440  
  2441  			if err != nil {
  2442  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  2443  			}
  2444  
  2445  			listener.ExpectAdded(tt.expectedAddresses, t)
  2446  
  2447  			if listener.endpointsAreNotCalled() != tt.expectedNoEndpoints {
  2448  				t.Fatalf("Expected noEndpointsCalled to be [%t], got [%t]",
  2449  					tt.expectedNoEndpoints, listener.endpointsAreNotCalled())
  2450  			}
  2451  
  2452  			if listener.endpointsDoNotExist() != tt.expectedNoEndpointsServiceExists {
  2453  				t.Fatalf("Expected noEndpointsExist to be [%t], got [%t]",
  2454  					tt.expectedNoEndpointsServiceExists, listener.endpointsDoNotExist())
  2455  			}
  2456  		})
  2457  	}
  2458  }
  2459  
  2460  func testPod(resVersion string) *corev1.Pod {
  2461  	return &corev1.Pod{
  2462  		TypeMeta: metav1.TypeMeta{
  2463  			Kind:       "Pod",
  2464  			APIVersion: "v1",
  2465  		},
  2466  		ObjectMeta: metav1.ObjectMeta{
  2467  			ResourceVersion: resVersion,
  2468  			Name:            "name1-1",
  2469  			Namespace:       "ns",
  2470  		},
  2471  		Status: corev1.PodStatus{
  2472  			Phase: corev1.PodRunning,
  2473  			PodIP: "172.17.0.12",
  2474  		},
  2475  	}
  2476  }
  2477  
  2478  func endpoints(identity string) *corev1.Endpoints {
  2479  	return &corev1.Endpoints{
  2480  		TypeMeta: metav1.TypeMeta{
  2481  			Kind:       "Endpoints",
  2482  			APIVersion: "v1",
  2483  		},
  2484  		ObjectMeta: metav1.ObjectMeta{
  2485  			Name:      "remote-service",
  2486  			Namespace: "ns",
  2487  			Annotations: map[string]string{
  2488  				consts.RemoteGatewayIdentity: identity,
  2489  				consts.RemoteServiceFqName:   "remote-service.svc.default.cluster.local",
  2490  			},
  2491  			Labels: map[string]string{
  2492  				consts.MirroredResourceLabel: "true",
  2493  			},
  2494  		},
  2495  		Subsets: []corev1.EndpointSubset{
  2496  			{
  2497  				Addresses: []corev1.EndpointAddress{
  2498  					{
  2499  						IP: "1.2.3.4",
  2500  					},
  2501  				},
  2502  				Ports: []corev1.EndpointPort{
  2503  					{
  2504  						Port: 80,
  2505  					},
  2506  				},
  2507  			},
  2508  		},
  2509  	}
  2510  }
  2511  
  2512  func createTestEndpointSlice(targetRefKind string) *dv1.EndpointSlice {
  2513  	return &dv1.EndpointSlice{
  2514  		AddressType: "IPv4",
  2515  		ObjectMeta:  metav1.ObjectMeta{Name: "name1-del", Namespace: "ns", Labels: map[string]string{dv1.LabelServiceName: "name1"}},
  2516  		Endpoints: []dv1.Endpoint{
  2517  			{
  2518  				Addresses:  []string{"172.17.0.12"},
  2519  				Conditions: dv1.EndpointConditions{Ready: func(b bool) *bool { return &b }(true)},
  2520  				TargetRef:  &corev1.ObjectReference{Name: "name1-1", Namespace: "ns", Kind: targetRefKind},
  2521  			},
  2522  		},
  2523  		Ports: []dv1.EndpointPort{
  2524  			{
  2525  				Name: func(s string) *string { return &s }(""),
  2526  				Port: func(i int32) *int32 { return &i }(8989),
  2527  			},
  2528  		},
  2529  	}
  2530  }
  2531  
  2532  func TestEndpointsChangeDetection(t *testing.T) {
  2533  
  2534  	k8sConfigs := []string{`
  2535  apiVersion: v1
  2536  kind: Service
  2537  metadata:
  2538    name: remote-service
  2539    namespace: ns
  2540  spec:
  2541    ports:
  2542    - port: 80
  2543      targetPort: 80`,
  2544  		`
  2545  apiVersion: v1
  2546  kind: Endpoints
  2547  metadata:
  2548    name: remote-service
  2549    namespace: ns
  2550    annotations:
  2551      mirror.linkerd.io/remote-gateway-identity: "gateway-identity-1"
  2552      mirror.linkerd.io/remote-svc-fq-name: "remote-service.svc.default.cluster.local"
  2553    labels:
  2554      mirror.linkerd.io/mirrored-service: "true"
  2555  subsets:
  2556  - addresses:
  2557    - ip: 1.2.3.4
  2558    ports:
  2559    - port: 80`,
  2560  	}
  2561  
  2562  	for _, tt := range []struct {
  2563  		serviceType       string
  2564  		id                ServiceID
  2565  		port              Port
  2566  		newEndpoints      *corev1.Endpoints
  2567  		expectedAddresses []string
  2568  	}{
  2569  		{
  2570  			serviceType:       "will update endpoints if identity is different",
  2571  			id:                ServiceID{Name: "remote-service", Namespace: "ns"},
  2572  			port:              80,
  2573  			newEndpoints:      endpoints("gateway-identity-2"),
  2574  			expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80", "1.2.3.4:80/gateway-identity-2/remote-service.svc.default.cluster.local:80"},
  2575  		},
  2576  
  2577  		{
  2578  			serviceType:       "will not update endpoints if identity is the same",
  2579  			id:                ServiceID{Name: "remote-service", Namespace: "ns"},
  2580  			port:              80,
  2581  			newEndpoints:      endpoints("gateway-identity-1"),
  2582  			expectedAddresses: []string{"1.2.3.4:80/gateway-identity-1/remote-service.svc.default.cluster.local:80"},
  2583  		},
  2584  	} {
  2585  
  2586  		tt := tt // pin
  2587  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  2588  			k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
  2589  			if err != nil {
  2590  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  2591  			}
  2592  
  2593  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2594  			if err != nil {
  2595  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2596  			}
  2597  
  2598  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
  2599  			if err != nil {
  2600  				t.Fatalf("can't create Endpoints watcher: %s", err)
  2601  			}
  2602  
  2603  			k8sAPI.Sync(nil)
  2604  			metadataAPI.Sync(nil)
  2605  
  2606  			listener := newBufferingEndpointListener()
  2607  
  2608  			err = watcher.Subscribe(tt.id, tt.port, "", listener)
  2609  			if err != nil {
  2610  				t.Fatal(err)
  2611  			}
  2612  
  2613  			k8sAPI.Sync(nil)
  2614  
  2615  			watcher.addEndpoints(tt.newEndpoints)
  2616  
  2617  			listener.ExpectAdded(tt.expectedAddresses, t)
  2618  		})
  2619  	}
  2620  }
  2621  
  2622  func TestPodChangeDetection(t *testing.T) {
  2623  	endpoints := &corev1.Endpoints{
  2624  		TypeMeta: metav1.TypeMeta{
  2625  			Kind:       "Endpoints",
  2626  			APIVersion: "v1",
  2627  		},
  2628  		ObjectMeta: metav1.ObjectMeta{
  2629  			Name:      "name1",
  2630  			Namespace: "ns",
  2631  		},
  2632  		Subsets: []corev1.EndpointSubset{
  2633  			{
  2634  				Addresses: []corev1.EndpointAddress{
  2635  					{
  2636  						IP:       "172.17.0.12",
  2637  						Hostname: "name1-1",
  2638  						TargetRef: &corev1.ObjectReference{
  2639  							Kind:      "Pod",
  2640  							Namespace: "ns",
  2641  							Name:      "name1-1",
  2642  						},
  2643  					},
  2644  				},
  2645  				Ports: []corev1.EndpointPort{
  2646  					{
  2647  						Port: 8989,
  2648  					},
  2649  				},
  2650  			},
  2651  		},
  2652  	}
  2653  
  2654  	k8sConfigs := []string{`
  2655  apiVersion: v1
  2656  kind: Service
  2657  metadata:
  2658    name: name1
  2659    namespace: ns
  2660  spec:
  2661    type: LoadBalancer
  2662    ports:
  2663    - port: 8989`,
  2664  		`
  2665  apiVersion: v1
  2666  kind: Endpoints
  2667  metadata:
  2668    name: name1
  2669    namespace: ns
  2670  subsets:
  2671  - addresses:
  2672    - ip: 172.17.0.12
  2673      hostname: name1-1
  2674      targetRef:
  2675        kind: Pod
  2676        name: name1-1
  2677        namespace: ns
  2678    ports:
  2679    - port: 8989`,
  2680  		`
  2681  apiVersion: v1
  2682  kind: Pod
  2683  metadata:
  2684    name: name1-1
  2685    namespace: ns
  2686    resourceVersion: "1"
  2687  status:
  2688    phase: Running
  2689    podIP: 172.17.0.12`}
  2690  
  2691  	for _, tt := range []struct {
  2692  		serviceType       string
  2693  		id                ServiceID
  2694  		hostname          string
  2695  		port              Port
  2696  		newPod            *corev1.Pod
  2697  		expectedAddresses []string
  2698  	}{
  2699  		{
  2700  			serviceType: "will update pod if resource version is different",
  2701  			id:          ServiceID{Name: "name1", Namespace: "ns"},
  2702  			port:        8989,
  2703  			hostname:    "name1-1",
  2704  			newPod:      testPod("2"),
  2705  
  2706  			expectedAddresses: []string{"172.17.0.12:8989:1", "172.17.0.12:8989:2"},
  2707  		},
  2708  		{
  2709  			serviceType: "will not update pod if resource version is the same",
  2710  			id:          ServiceID{Name: "name1", Namespace: "ns"},
  2711  			port:        8989,
  2712  			hostname:    "name1-1",
  2713  			newPod:      testPod("1"),
  2714  
  2715  			expectedAddresses: []string{"172.17.0.12:8989:1"},
  2716  		},
  2717  	} {
  2718  		tt := tt // pin
  2719  		t.Run("subscribes listener to "+tt.serviceType, func(t *testing.T) {
  2720  			k8sAPI, err := k8s.NewFakeAPI(k8sConfigs...)
  2721  			if err != nil {
  2722  				t.Fatalf("NewFakeAPI returned an error: %s", err)
  2723  			}
  2724  
  2725  			metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2726  			if err != nil {
  2727  				t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2728  			}
  2729  
  2730  			watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), false, "local")
  2731  			if err != nil {
  2732  				t.Fatalf("can't create Endpoints watcher: %s", err)
  2733  			}
  2734  
  2735  			k8sAPI.Sync(nil)
  2736  			metadataAPI.Sync(nil)
  2737  
  2738  			listener := newBufferingEndpointListenerWithResVersion()
  2739  
  2740  			err = watcher.Subscribe(tt.id, tt.port, tt.hostname, listener)
  2741  			if err != nil {
  2742  				t.Fatal(err)
  2743  			}
  2744  
  2745  			err = k8sAPI.Pod().Informer().GetStore().Add(tt.newPod)
  2746  			if err != nil {
  2747  				t.Fatal(err)
  2748  			}
  2749  			k8sAPI.Sync(nil)
  2750  
  2751  			watcher.addEndpoints(endpoints)
  2752  			listener.ExpectAdded(tt.expectedAddresses, t)
  2753  		})
  2754  	}
  2755  }
  2756  
  2757  // Test that when an EndpointSlice is scaled down, the EndpointsWatcher sends
  2758  // all of the Remove events, even if the associated pod / workload is no longer available
  2759  // from the API.
  2760  func TestEndpointSliceScaleDown(t *testing.T) {
  2761  	k8sConfigsWithES := []string{`
  2762  kind: APIResourceList
  2763  apiVersion: v1
  2764  groupVersion: discovery.k8s.io/v1
  2765  resources:
  2766  - name: endpointslices
  2767    singularName: endpointslice
  2768    namespaced: true
  2769    kind: EndpointSlice
  2770    verbs:
  2771      - delete
  2772      - deletecollection
  2773      - get
  2774      - list
  2775      - patch
  2776      - create
  2777      - update
  2778      - watch
  2779  `, `
  2780  apiVersion: v1
  2781  kind: Service
  2782  metadata:
  2783    name: name1
  2784    namespace: ns
  2785  spec:
  2786    type: LoadBalancer
  2787    ports:
  2788    - port: 8989`, `
  2789  addressType: IPv4
  2790  apiVersion: discovery.k8s.io/v1
  2791  endpoints:
  2792  - addresses:
  2793    - 172.17.0.12
  2794    conditions:
  2795    ready: true
  2796    targetRef:
  2797      kind: Pod
  2798      name: name1-1
  2799      namespace: ns
  2800    topology:
  2801    kubernetes.io/hostname: node-1
  2802  kind: EndpointSlice
  2803  metadata:
  2804    labels:
  2805      kubernetes.io/service-name: name1
  2806    name: name1-es
  2807    namespace: ns
  2808  ports:
  2809  - name: ""
  2810    port: 8989`, `
  2811  apiVersion: v1
  2812  kind: Pod
  2813  metadata:
  2814    name: name1-1
  2815    namespace: ns
  2816  status:
  2817    phase: Running
  2818    podIP: 172.17.0.12`}
  2819  
  2820  	// Create an EndpointSlice with one endpoint, backed by a pod.
  2821  
  2822  	k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
  2823  	if err != nil {
  2824  		t.Fatalf("NewFakeAPI returned an error: %s", err)
  2825  	}
  2826  
  2827  	metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2828  	if err != nil {
  2829  		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2830  	}
  2831  
  2832  	watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  2833  	if err != nil {
  2834  		t.Fatalf("can't create Endpoints watcher: %s", err)
  2835  	}
  2836  
  2837  	k8sAPI.Sync(nil)
  2838  	metadataAPI.Sync(nil)
  2839  
  2840  	listener := newBufferingEndpointListener()
  2841  
  2842  	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
  2843  	if err != nil {
  2844  		t.Fatal(err)
  2845  	}
  2846  
  2847  	k8sAPI.Sync(nil)
  2848  
  2849  	listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
  2850  
  2851  	// Delete the backing pod and scale the EndpointSlice to 0 endpoints.
  2852  
  2853  	err = k8sAPI.Client.CoreV1().Pods("ns").Delete(context.Background(), "name1-1", metav1.DeleteOptions{})
  2854  	if err != nil {
  2855  		t.Fatal(err)
  2856  	}
  2857  
  2858  	// It may take some time before the pod deletion is recognized by the
  2859  	// lister. We wait until the lister sees the pod as deleted.
  2860  	err = testutil.RetryFor(time.Second*30, func() error {
  2861  		_, err := k8sAPI.Pod().Lister().Pods("ns").Get("name1-1")
  2862  		if kerrors.IsNotFound(err) {
  2863  			return nil
  2864  		}
  2865  		if err == nil {
  2866  			return errors.New("pod should be deleted, but still exists in lister")
  2867  		}
  2868  		return err
  2869  	})
  2870  	if err != nil {
  2871  		t.Fatal(err)
  2872  	}
  2873  
  2874  	ES, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
  2875  	if err != nil {
  2876  		t.Fatal(err)
  2877  	}
  2878  
  2879  	emptyES := &dv1.EndpointSlice{
  2880  		AddressType: "IPv4",
  2881  		ObjectMeta: metav1.ObjectMeta{
  2882  			Name: "name1-es", Namespace: "ns",
  2883  			Labels: map[string]string{dv1.LabelServiceName: "name1"},
  2884  		},
  2885  		Endpoints: []dv1.Endpoint{},
  2886  		Ports:     []dv1.EndpointPort{},
  2887  	}
  2888  
  2889  	watcher.updateEndpointSlice(ES, emptyES)
  2890  
  2891  	// Ensure the watcher emits a remove event.
  2892  
  2893  	listener.ExpectRemoved([]string{"172.17.0.12:8989"}, t)
  2894  }
  2895  
  2896  // Test that when an endpointslice's endpoints change their readiness status to
  2897  // not ready, this is correctly picked up by the subscribers
  2898  func TestEndpointSliceChangeNotReady(t *testing.T) {
  2899  	k8sConfigsWithES := []string{`
  2900  kind: APIResourceList
  2901  apiVersion: v1
  2902  groupVersion: discovery.k8s.io/v1
  2903  resources:
  2904  - name: endpointslices
  2905    singularName: endpointslice
  2906    namespaced: true
  2907    kind: EndpointSlice
  2908    verbs:
  2909      - delete
  2910      - deletecollection
  2911      - get
  2912      - list
  2913      - patch
  2914      - create
  2915      - update
  2916      - watch
  2917  `, `
  2918  apiVersion: v1
  2919  kind: Service
  2920  metadata:
  2921    name: name1
  2922    namespace: ns
  2923  spec:
  2924    type: LoadBalancer
  2925    ports:
  2926    - port: 8989`, `
  2927  addressType: IPv4
  2928  apiVersion: discovery.k8s.io/v1
  2929  endpoints:
  2930  - addresses:
  2931    - 172.17.0.12
  2932    conditions:
  2933      ready: true
  2934    targetRef:
  2935      kind: Pod
  2936      name: name1-1
  2937      namespace: ns
  2938  - addresses:
  2939    - 192.0.2.0
  2940    conditions:
  2941      ready: true
  2942    targetRef:
  2943      kind: ExternalWorkload
  2944      name: wlkd1
  2945      namespace: ns
  2946    topology:
  2947      kubernetes.io/hostname: node-1
  2948  kind: EndpointSlice
  2949  metadata:
  2950    labels:
  2951      kubernetes.io/service-name: name1
  2952    name: name1-es
  2953    namespace: ns
  2954  ports:
  2955  - name: ""
  2956    port: 8989`, `
  2957  apiVersion: v1
  2958  kind: Pod
  2959  metadata:
  2960    name: name1-1
  2961    namespace: ns
  2962  status:
  2963    phase: Running
  2964    podIP: 172.17.0.12`, `
  2965  apiVersion: workload.linkerd.io/v1beta1
  2966  kind: ExternalWorkload
  2967  metadata:
  2968    name: wlkd1
  2969    namespace: ns
  2970  spec:
  2971    meshTLS:
  2972      identity: foo
  2973      serverName: foo
  2974    ports:
  2975    - port: 8989
  2976    workloadIPs:
  2977    - ip: 192.0.2.0
  2978  status:
  2979    conditions:
  2980    - type: Ready
  2981      status: "True"
  2982  `,
  2983  	}
  2984  
  2985  	k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
  2986  	if err != nil {
  2987  		t.Fatalf("NewFakeAPI returned an error: %s", err)
  2988  	}
  2989  
  2990  	metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  2991  	if err != nil {
  2992  		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  2993  	}
  2994  
  2995  	watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  2996  	if err != nil {
  2997  		t.Fatalf("can't create Endpoints watcher: %s", err)
  2998  	}
  2999  
  3000  	k8sAPI.Sync(nil)
  3001  	metadataAPI.Sync(nil)
  3002  
  3003  	listener := newBufferingEndpointListener()
  3004  
  3005  	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
  3006  	if err != nil {
  3007  		t.Fatal(err)
  3008  	}
  3009  
  3010  	k8sAPI.Sync(nil)
  3011  	metadataAPI.Sync(nil)
  3012  
  3013  	listener.ExpectAdded([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
  3014  
  3015  	// Change readiness status for pod and for external workload
  3016  	es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
  3017  	if err != nil {
  3018  		t.Fatal(err)
  3019  	}
  3020  
  3021  	unready := false
  3022  	es.Endpoints[0].Conditions.Ready = &unready
  3023  	es.Endpoints[1].Conditions.Ready = &unready
  3024  
  3025  	_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
  3026  	if err != nil {
  3027  		t.Fatal(err)
  3028  	}
  3029  
  3030  	k8sAPI.Sync(nil)
  3031  	metadataAPI.Sync(nil)
  3032  
  3033  	// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
  3034  	time.Sleep(50 * time.Millisecond)
  3035  
  3036  	listener.ExpectRemoved([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
  3037  }
  3038  
  3039  // Test that when an endpointslice's endpoints change their readiness status to
  3040  // ready, this is correctly picked up by the subscribers
  3041  func TestEndpointSliceChangeToReady(t *testing.T) {
  3042  	k8sConfigsWithES := []string{`
  3043  kind: APIResourceList
  3044  apiVersion: v1
  3045  groupVersion: discovery.k8s.io/v1
  3046  resources:
  3047  - name: endpointslices
  3048    singularName: endpointslice
  3049    namespaced: true
  3050    kind: EndpointSlice
  3051    verbs:
  3052      - delete
  3053      - deletecollection
  3054      - get
  3055      - list
  3056      - patch
  3057      - create
  3058      - update
  3059      - watch
  3060  `, `
  3061  apiVersion: v1
  3062  kind: Service
  3063  metadata:
  3064    name: name1
  3065    namespace: ns
  3066  spec:
  3067    type: LoadBalancer
  3068    ports:
  3069    - port: 8989`, `
  3070  addressType: IPv4
  3071  apiVersion: discovery.k8s.io/v1
  3072  endpoints:
  3073  - addresses:
  3074    - 172.17.0.12
  3075    conditions:
  3076      ready: true
  3077    targetRef:
  3078      kind: Pod
  3079      name: name1-1
  3080      namespace: ns
  3081  - addresses:
  3082    - 172.17.0.13
  3083    conditions:
  3084      ready: false
  3085    targetRef:
  3086      kind: Pod
  3087      name: name1-2
  3088      namespace: ns
  3089  - addresses:
  3090    - 192.0.2.0
  3091    conditions:
  3092      ready: true
  3093    targetRef:
  3094      kind: ExternalWorkload
  3095      name: wlkd1
  3096      namespace: ns
  3097    topology:
  3098      kubernetes.io/hostname: node-1
  3099  - addresses:
  3100    - 192.0.2.1
  3101    conditions:
  3102      ready: false
  3103    targetRef:
  3104      kind: ExternalWorkload
  3105      name: wlkd2
  3106      namespace: ns
  3107    topology:
  3108      kubernetes.io/hostname: node-1
  3109  kind: EndpointSlice
  3110  metadata:
  3111    labels:
  3112      kubernetes.io/service-name: name1
  3113    name: name1-es
  3114    namespace: ns
  3115  ports:
  3116  - name: ""
  3117    port: 8989`, `
  3118  apiVersion: v1
  3119  kind: Pod
  3120  metadata:
  3121    name: name1-1
  3122    namespace: ns
  3123  status:
  3124    phase: Running
  3125    podIP: 172.17.0.12`, `
  3126  apiVersion: v1
  3127  kind: Pod
  3128  metadata:
  3129    name: name1-2
  3130    namespace: ns
  3131  status:
  3132    phase: Running
  3133    podIP: 172.17.0.13`, `
  3134  apiVersion: workload.linkerd.io/v1beta1
  3135  kind: ExternalWorkload
  3136  metadata:
  3137    name: wlkd1
  3138    namespace: ns
  3139  spec:
  3140    meshTLS:
  3141      identity: foo
  3142      serverName: foo
  3143    ports:
  3144    - port: 8989
  3145    workloadIPs:
  3146    - ip: 192.0.2.0
  3147  status:
  3148    conditions:
  3149    - type: Ready
  3150      status: "True"
  3151  `, `
  3152  apiVersion: workload.linkerd.io/v1beta1
  3153  kind: ExternalWorkload
  3154  metadata:
  3155    name: wlkd2
  3156    namespace: ns
  3157  spec:
  3158    meshTLS:
  3159      identity: foo
  3160      serverName: foo
  3161    ports:
  3162    - port: 8989
  3163    workloadIPs:
  3164    - ip: 192.0.2.1
  3165  status:
  3166    conditions:
  3167    - type: Ready
  3168      status: "True"
  3169  `,
  3170  	}
  3171  
  3172  	k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
  3173  	if err != nil {
  3174  		t.Fatalf("NewFakeAPI returned an error: %s", err)
  3175  	}
  3176  
  3177  	metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  3178  	if err != nil {
  3179  		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  3180  	}
  3181  
  3182  	watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  3183  	if err != nil {
  3184  		t.Fatalf("can't create Endpoints watcher: %s", err)
  3185  	}
  3186  
  3187  	k8sAPI.Sync(nil)
  3188  	metadataAPI.Sync(nil)
  3189  
  3190  	listener := newBufferingEndpointListener()
  3191  
  3192  	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
  3193  	if err != nil {
  3194  		t.Fatal(err)
  3195  	}
  3196  
  3197  	k8sAPI.Sync(nil)
  3198  	metadataAPI.Sync(nil)
  3199  
  3200  	// Expect only two endpoints to be added, the rest are not ready
  3201  	listener.ExpectAdded([]string{"172.17.0.12:8989", "192.0.2.0:8989"}, t)
  3202  
  3203  	es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
  3204  	if err != nil {
  3205  		t.Fatal(err)
  3206  	}
  3207  
  3208  	// Change readiness status for pod and for external workload only if they
  3209  	// are unready
  3210  	rdy := true
  3211  	es.Endpoints[1].Conditions.Ready = &rdy
  3212  	es.Endpoints[3].Conditions.Ready = &rdy
  3213  
  3214  	_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
  3215  	if err != nil {
  3216  		t.Fatal(err)
  3217  	}
  3218  
  3219  	k8sAPI.Sync(nil)
  3220  	metadataAPI.Sync(nil)
  3221  
  3222  	// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
  3223  	time.Sleep(50 * time.Millisecond)
  3224  
  3225  	listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.13:8989", "192.0.2.0:8989", "192.0.2.1:8989"}, t)
  3226  
  3227  }
  3228  
  3229  // Test that when an endpointslice gets a hint added, then mark it as a change
  3230  func TestEndpointSliceAddHints(t *testing.T) {
  3231  	k8sConfigsWithES := []string{`
  3232  kind: APIResourceList
  3233  apiVersion: v1
  3234  groupVersion: discovery.k8s.io/v1
  3235  resources:
  3236  - name: endpointslices
  3237    singularName: endpointslice
  3238    namespaced: true
  3239    kind: EndpointSlice
  3240    verbs:
  3241      - delete
  3242      - deletecollection
  3243      - get
  3244      - list
  3245      - patch
  3246      - create
  3247      - update
  3248      - watch
  3249  `, `
  3250  apiVersion: v1
  3251  kind: Service
  3252  metadata:
  3253    name: name1
  3254    namespace: ns
  3255  spec:
  3256    type: LoadBalancer
  3257    ports:
  3258    - port: 8989`, `
  3259  addressType: IPv4
  3260  apiVersion: discovery.k8s.io/v1
  3261  endpoints:
  3262  - addresses:
  3263    - 172.17.0.12
  3264    conditions:
  3265    ready: true
  3266    targetRef:
  3267      kind: Pod
  3268      name: name1-1
  3269      namespace: ns
  3270    topology:
  3271      kubernetes.io/hostname: node-1
  3272  kind: EndpointSlice
  3273  metadata:
  3274    labels:
  3275      kubernetes.io/service-name: name1
  3276    name: name1-es
  3277    namespace: ns
  3278  ports:
  3279  - name: ""
  3280    port: 8989`, `
  3281  apiVersion: v1
  3282  kind: Pod
  3283  metadata:
  3284    name: name1-1
  3285    namespace: ns
  3286  status:
  3287    phase: Running
  3288    podIP: 172.17.0.12`}
  3289  
  3290  	// Create an EndpointSlice with one endpoint, backed by a pod.
  3291  
  3292  	k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
  3293  	if err != nil {
  3294  		t.Fatalf("NewFakeAPI returned an error: %s", err)
  3295  	}
  3296  
  3297  	metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  3298  	if err != nil {
  3299  		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  3300  	}
  3301  
  3302  	watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  3303  	if err != nil {
  3304  		t.Fatalf("can't create Endpoints watcher: %s", err)
  3305  	}
  3306  
  3307  	k8sAPI.Sync(nil)
  3308  	metadataAPI.Sync(nil)
  3309  
  3310  	listener := newBufferingEndpointListener()
  3311  
  3312  	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
  3313  	if err != nil {
  3314  		t.Fatal(err)
  3315  	}
  3316  
  3317  	k8sAPI.Sync(nil)
  3318  	metadataAPI.Sync(nil)
  3319  
  3320  	listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
  3321  
  3322  	// Add a hint to the EndpointSlice
  3323  	es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
  3324  	if err != nil {
  3325  		t.Fatal(err)
  3326  	}
  3327  
  3328  	es.Endpoints[0].Hints = &dv1.EndpointHints{
  3329  		ForZones: []dv1.ForZone{{Name: "zone1"}},
  3330  	}
  3331  
  3332  	_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
  3333  	if err != nil {
  3334  		t.Fatal(err)
  3335  	}
  3336  
  3337  	k8sAPI.Sync(nil)
  3338  	metadataAPI.Sync(nil)
  3339  
  3340  	// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
  3341  	time.Sleep(50 * time.Millisecond)
  3342  
  3343  	listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
  3344  }
  3345  
  3346  // Test that when an endpointslice loses a hint, then mark it as a change
  3347  func TestEndpointSliceRemoveHints(t *testing.T) {
  3348  	k8sConfigsWithES := []string{`
  3349  kind: APIResourceList
  3350  apiVersion: v1
  3351  groupVersion: discovery.k8s.io/v1
  3352  resources:
  3353  - name: endpointslices
  3354    singularName: endpointslice
  3355    namespaced: true
  3356    kind: EndpointSlice
  3357    verbs:
  3358      - delete
  3359      - deletecollection
  3360      - get
  3361      - list
  3362      - patch
  3363      - create
  3364      - update
  3365      - watch
  3366  `, `
  3367  apiVersion: v1
  3368  kind: Service
  3369  metadata:
  3370    name: name1
  3371    namespace: ns
  3372  spec:
  3373    type: LoadBalancer
  3374    ports:
  3375    - port: 8989`, `
  3376  addressType: IPv4
  3377  apiVersion: discovery.k8s.io/v1
  3378  endpoints:
  3379  - addresses:
  3380    - 172.17.0.12
  3381    conditions:
  3382    hints:
  3383      forZones:
  3384      - name: zone1
  3385    ready: true
  3386    targetRef:
  3387      kind: Pod
  3388      name: name1-1
  3389      namespace: ns
  3390    topology:
  3391      kubernetes.io/hostname: node-1
  3392  kind: EndpointSlice
  3393  metadata:
  3394    labels:
  3395      kubernetes.io/service-name: name1
  3396    name: name1-es
  3397    namespace: ns
  3398  ports:
  3399  - name: ""
  3400    port: 8989`, `
  3401  apiVersion: v1
  3402  kind: Pod
  3403  metadata:
  3404    name: name1-1
  3405    namespace: ns
  3406  status:
  3407    phase: Running
  3408    podIP: 172.17.0.12`}
  3409  
  3410  	// Create an EndpointSlice with one endpoint, backed by a pod.
  3411  
  3412  	k8sAPI, err := k8s.NewFakeAPI(k8sConfigsWithES...)
  3413  	if err != nil {
  3414  		t.Fatalf("NewFakeAPI returned an error: %s", err)
  3415  	}
  3416  
  3417  	metadataAPI, err := k8s.NewFakeMetadataAPI(nil)
  3418  	if err != nil {
  3419  		t.Fatalf("NewFakeMetadataAPI returned an error: %s", err)
  3420  	}
  3421  
  3422  	watcher, err := NewEndpointsWatcher(k8sAPI, metadataAPI, logging.WithField("test", t.Name()), true, "local")
  3423  	if err != nil {
  3424  		t.Fatalf("can't create Endpoints watcher: %s", err)
  3425  	}
  3426  
  3427  	k8sAPI.Sync(nil)
  3428  	metadataAPI.Sync(nil)
  3429  
  3430  	listener := newBufferingEndpointListener()
  3431  
  3432  	err = watcher.Subscribe(ServiceID{Name: "name1", Namespace: "ns"}, 8989, "", listener)
  3433  	if err != nil {
  3434  		t.Fatal(err)
  3435  	}
  3436  
  3437  	k8sAPI.Sync(nil)
  3438  	metadataAPI.Sync(nil)
  3439  
  3440  	listener.ExpectAdded([]string{"172.17.0.12:8989"}, t)
  3441  
  3442  	// Remove a hint from the EndpointSlice
  3443  	es, err := k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Get(context.Background(), "name1-es", metav1.GetOptions{})
  3444  	if err != nil {
  3445  		t.Fatal(err)
  3446  	}
  3447  
  3448  	es.Endpoints[0].Hints = &dv1.EndpointHints{
  3449  		//ForZones: []dv1.ForZone{{Name: "zone1"}},
  3450  	}
  3451  
  3452  	_, err = k8sAPI.Client.DiscoveryV1().EndpointSlices("ns").Update(context.Background(), es, metav1.UpdateOptions{})
  3453  	if err != nil {
  3454  		t.Fatal(err)
  3455  	}
  3456  
  3457  	k8sAPI.Sync(nil)
  3458  	metadataAPI.Sync(nil)
  3459  
  3460  	// Wait for the update to be processed because there is no blocking call currently in k8s that we can wait on
  3461  	time.Sleep(50 * time.Millisecond)
  3462  
  3463  	listener.ExpectAdded([]string{"172.17.0.12:8989", "172.17.0.12:8989"}, t)
  3464  }
  3465  

View as plain text