...

Source file src/k8s.io/kubernetes/pkg/proxy/config/config_test.go

Documentation: k8s.io/kubernetes/pkg/proxy/config

     1  /*
     2  Copyright 2014 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package config
    18  
    19  import (
    20  	"reflect"
    21  	"sort"
    22  	"sync"
    23  	"testing"
    24  	"time"
    25  
    26  	"k8s.io/api/core/v1"
    27  	discoveryv1 "k8s.io/api/discovery/v1"
    28  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    29  	"k8s.io/apimachinery/pkg/types"
    30  	"k8s.io/apimachinery/pkg/util/wait"
    31  	"k8s.io/apimachinery/pkg/watch"
    32  	informers "k8s.io/client-go/informers"
    33  	"k8s.io/client-go/kubernetes/fake"
    34  	ktesting "k8s.io/client-go/testing"
    35  	"k8s.io/utils/ptr"
    36  )
    37  
    38  type sortedServices []*v1.Service
    39  
    40  func (s sortedServices) Len() int {
    41  	return len(s)
    42  }
    43  func (s sortedServices) Swap(i, j int) {
    44  	s[i], s[j] = s[j], s[i]
    45  }
    46  func (s sortedServices) Less(i, j int) bool {
    47  	return s[i].Name < s[j].Name
    48  }
    49  
    50  type ServiceHandlerMock struct {
    51  	lock sync.Mutex
    52  
    53  	state   map[types.NamespacedName]*v1.Service
    54  	synced  bool
    55  	updated chan []*v1.Service
    56  	process func([]*v1.Service)
    57  }
    58  
    59  func NewServiceHandlerMock() *ServiceHandlerMock {
    60  	shm := &ServiceHandlerMock{
    61  		state:   make(map[types.NamespacedName]*v1.Service),
    62  		updated: make(chan []*v1.Service, 5),
    63  	}
    64  	shm.process = func(services []*v1.Service) {
    65  		shm.updated <- services
    66  	}
    67  	return shm
    68  }
    69  
    70  func (h *ServiceHandlerMock) OnServiceAdd(service *v1.Service) {
    71  	h.lock.Lock()
    72  	defer h.lock.Unlock()
    73  	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    74  	h.state[namespacedName] = service
    75  	h.sendServices()
    76  }
    77  
    78  func (h *ServiceHandlerMock) OnServiceUpdate(oldService, service *v1.Service) {
    79  	h.lock.Lock()
    80  	defer h.lock.Unlock()
    81  	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    82  	h.state[namespacedName] = service
    83  	h.sendServices()
    84  }
    85  
    86  func (h *ServiceHandlerMock) OnServiceDelete(service *v1.Service) {
    87  	h.lock.Lock()
    88  	defer h.lock.Unlock()
    89  	namespacedName := types.NamespacedName{Namespace: service.Namespace, Name: service.Name}
    90  	delete(h.state, namespacedName)
    91  	h.sendServices()
    92  }
    93  
    94  func (h *ServiceHandlerMock) OnServiceSynced() {
    95  	h.lock.Lock()
    96  	defer h.lock.Unlock()
    97  	h.synced = true
    98  	h.sendServices()
    99  }
   100  
   101  func (h *ServiceHandlerMock) sendServices() {
   102  	if !h.synced {
   103  		return
   104  	}
   105  	services := make([]*v1.Service, 0, len(h.state))
   106  	for _, svc := range h.state {
   107  		services = append(services, svc)
   108  	}
   109  	sort.Sort(sortedServices(services))
   110  	h.process(services)
   111  }
   112  
   113  func (h *ServiceHandlerMock) ValidateServices(t *testing.T, expectedServices []*v1.Service) {
   114  	// We might get 1 or more updates for N service updates, because we
   115  	// over write older snapshots of services from the producer go-routine
   116  	// if the consumer falls behind.
   117  	var services []*v1.Service
   118  	for {
   119  		select {
   120  		case services = <-h.updated:
   121  			if reflect.DeepEqual(services, expectedServices) {
   122  				return
   123  			}
   124  		// Unittests will hard timeout in 5m with a stack trace, prevent that
   125  		// and surface a clearer reason for failure.
   126  		case <-time.After(wait.ForeverTestTimeout):
   127  			t.Errorf("Timed out. Expected %#v, Got %#v", expectedServices, services)
   128  			return
   129  		}
   130  	}
   131  }
   132  
   133  type sortedEndpointSlices []*discoveryv1.EndpointSlice
   134  
   135  func (s sortedEndpointSlices) Len() int {
   136  	return len(s)
   137  }
   138  func (s sortedEndpointSlices) Swap(i, j int) {
   139  	s[i], s[j] = s[j], s[i]
   140  }
   141  func (s sortedEndpointSlices) Less(i, j int) bool {
   142  	return s[i].Name < s[j].Name
   143  }
   144  
   145  type EndpointSliceHandlerMock struct {
   146  	lock sync.Mutex
   147  
   148  	state   map[types.NamespacedName]*discoveryv1.EndpointSlice
   149  	synced  bool
   150  	updated chan []*discoveryv1.EndpointSlice
   151  	process func([]*discoveryv1.EndpointSlice)
   152  }
   153  
   154  func NewEndpointSliceHandlerMock() *EndpointSliceHandlerMock {
   155  	ehm := &EndpointSliceHandlerMock{
   156  		state:   make(map[types.NamespacedName]*discoveryv1.EndpointSlice),
   157  		updated: make(chan []*discoveryv1.EndpointSlice, 5),
   158  	}
   159  	ehm.process = func(endpoints []*discoveryv1.EndpointSlice) {
   160  		ehm.updated <- endpoints
   161  	}
   162  	return ehm
   163  }
   164  
   165  func (h *EndpointSliceHandlerMock) OnEndpointSliceAdd(slice *discoveryv1.EndpointSlice) {
   166  	h.lock.Lock()
   167  	defer h.lock.Unlock()
   168  	namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
   169  	h.state[namespacedName] = slice
   170  	h.sendEndpointSlices()
   171  }
   172  
   173  func (h *EndpointSliceHandlerMock) OnEndpointSliceUpdate(oldSlice, slice *discoveryv1.EndpointSlice) {
   174  	h.lock.Lock()
   175  	defer h.lock.Unlock()
   176  	namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
   177  	h.state[namespacedName] = slice
   178  	h.sendEndpointSlices()
   179  }
   180  
   181  func (h *EndpointSliceHandlerMock) OnEndpointSliceDelete(slice *discoveryv1.EndpointSlice) {
   182  	h.lock.Lock()
   183  	defer h.lock.Unlock()
   184  	namespacedName := types.NamespacedName{Namespace: slice.Namespace, Name: slice.Name}
   185  	delete(h.state, namespacedName)
   186  	h.sendEndpointSlices()
   187  }
   188  
   189  func (h *EndpointSliceHandlerMock) OnEndpointSlicesSynced() {
   190  	h.lock.Lock()
   191  	defer h.lock.Unlock()
   192  	h.synced = true
   193  	h.sendEndpointSlices()
   194  }
   195  
   196  func (h *EndpointSliceHandlerMock) sendEndpointSlices() {
   197  	if !h.synced {
   198  		return
   199  	}
   200  	slices := make([]*discoveryv1.EndpointSlice, 0, len(h.state))
   201  	for _, eps := range h.state {
   202  		slices = append(slices, eps)
   203  	}
   204  	sort.Sort(sortedEndpointSlices(slices))
   205  	h.process(slices)
   206  }
   207  
   208  func (h *EndpointSliceHandlerMock) ValidateEndpointSlices(t *testing.T, expectedSlices []*discoveryv1.EndpointSlice) {
   209  	// We might get 1 or more updates for N endpointslice updates, because we
   210  	// over write older snapshots of endpointslices from the producer go-routine
   211  	// if the consumer falls behind. Unittests will hard timeout in 5m.
   212  	var slices []*discoveryv1.EndpointSlice
   213  	for {
   214  		select {
   215  		case slices = <-h.updated:
   216  			if reflect.DeepEqual(slices, expectedSlices) {
   217  				return
   218  			}
   219  		// Unittests will hard timeout in 5m with a stack trace, prevent that
   220  		// and surface a clearer reason for failure.
   221  		case <-time.After(wait.ForeverTestTimeout):
   222  			t.Errorf("Timed out. Expected %#v, Got %#v", expectedSlices, slices)
   223  			return
   224  		}
   225  	}
   226  }
   227  
   228  func TestNewServiceAddedAndNotified(t *testing.T) {
   229  	client := fake.NewSimpleClientset()
   230  	fakeWatch := watch.NewFake()
   231  	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
   232  
   233  	stopCh := make(chan struct{})
   234  	defer close(stopCh)
   235  
   236  	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
   237  
   238  	config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
   239  	handler := NewServiceHandlerMock()
   240  	config.RegisterEventHandler(handler)
   241  	go sharedInformers.Start(stopCh)
   242  	go config.Run(stopCh)
   243  
   244  	service := &v1.Service{
   245  		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
   246  		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
   247  	}
   248  	fakeWatch.Add(service)
   249  	handler.ValidateServices(t, []*v1.Service{service})
   250  }
   251  
   252  func TestServiceAddedRemovedSetAndNotified(t *testing.T) {
   253  	client := fake.NewSimpleClientset()
   254  	fakeWatch := watch.NewFake()
   255  	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
   256  
   257  	stopCh := make(chan struct{})
   258  	defer close(stopCh)
   259  
   260  	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
   261  
   262  	config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
   263  	handler := NewServiceHandlerMock()
   264  	config.RegisterEventHandler(handler)
   265  	go sharedInformers.Start(stopCh)
   266  	go config.Run(stopCh)
   267  
   268  	service1 := &v1.Service{
   269  		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
   270  		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
   271  	}
   272  	fakeWatch.Add(service1)
   273  	handler.ValidateServices(t, []*v1.Service{service1})
   274  
   275  	service2 := &v1.Service{
   276  		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
   277  		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
   278  	}
   279  	fakeWatch.Add(service2)
   280  	services := []*v1.Service{service2, service1}
   281  	handler.ValidateServices(t, services)
   282  
   283  	fakeWatch.Delete(service1)
   284  	services = []*v1.Service{service2}
   285  	handler.ValidateServices(t, services)
   286  }
   287  
   288  func TestNewServicesMultipleHandlersAddedAndNotified(t *testing.T) {
   289  	client := fake.NewSimpleClientset()
   290  	fakeWatch := watch.NewFake()
   291  	client.PrependWatchReactor("services", ktesting.DefaultWatchReactor(fakeWatch, nil))
   292  
   293  	stopCh := make(chan struct{})
   294  	defer close(stopCh)
   295  
   296  	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
   297  
   298  	config := NewServiceConfig(sharedInformers.Core().V1().Services(), time.Minute)
   299  	handler := NewServiceHandlerMock()
   300  	handler2 := NewServiceHandlerMock()
   301  	config.RegisterEventHandler(handler)
   302  	config.RegisterEventHandler(handler2)
   303  	go sharedInformers.Start(stopCh)
   304  	go config.Run(stopCh)
   305  
   306  	service1 := &v1.Service{
   307  		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
   308  		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 10}}},
   309  	}
   310  	service2 := &v1.Service{
   311  		ObjectMeta: metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
   312  		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Protocol: "TCP", Port: 20}}},
   313  	}
   314  	fakeWatch.Add(service1)
   315  	fakeWatch.Add(service2)
   316  
   317  	services := []*v1.Service{service2, service1}
   318  	handler.ValidateServices(t, services)
   319  	handler2.ValidateServices(t, services)
   320  }
   321  
   322  func TestNewEndpointsMultipleHandlersAddedAndNotified(t *testing.T) {
   323  	client := fake.NewSimpleClientset()
   324  	fakeWatch := watch.NewFake()
   325  	client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
   326  
   327  	stopCh := make(chan struct{})
   328  	defer close(stopCh)
   329  
   330  	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
   331  
   332  	config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
   333  	handler := NewEndpointSliceHandlerMock()
   334  	handler2 := NewEndpointSliceHandlerMock()
   335  	config.RegisterEventHandler(handler)
   336  	config.RegisterEventHandler(handler2)
   337  	go sharedInformers.Start(stopCh)
   338  	go config.Run(stopCh)
   339  
   340  	endpoints1 := &discoveryv1.EndpointSlice{
   341  		ObjectMeta:  metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
   342  		AddressType: discoveryv1.AddressTypeIPv4,
   343  		Endpoints: []discoveryv1.Endpoint{{
   344  			Addresses: []string{"1.1.1.1"},
   345  		}, {
   346  			Addresses: []string{"2.2.2.2"},
   347  		}},
   348  		Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
   349  	}
   350  	endpoints2 := &discoveryv1.EndpointSlice{
   351  		ObjectMeta:  metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
   352  		AddressType: discoveryv1.AddressTypeIPv4,
   353  		Endpoints: []discoveryv1.Endpoint{{
   354  			Addresses: []string{"3.3.3.3"},
   355  		}, {
   356  			Addresses: []string{"4.4.4.4"},
   357  		}},
   358  		Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
   359  	}
   360  	fakeWatch.Add(endpoints1)
   361  	fakeWatch.Add(endpoints2)
   362  
   363  	endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
   364  	handler.ValidateEndpointSlices(t, endpoints)
   365  	handler2.ValidateEndpointSlices(t, endpoints)
   366  }
   367  
   368  func TestNewEndpointsMultipleHandlersAddRemoveSetAndNotified(t *testing.T) {
   369  	client := fake.NewSimpleClientset()
   370  	fakeWatch := watch.NewFake()
   371  	client.PrependWatchReactor("endpointslices", ktesting.DefaultWatchReactor(fakeWatch, nil))
   372  
   373  	stopCh := make(chan struct{})
   374  	defer close(stopCh)
   375  
   376  	sharedInformers := informers.NewSharedInformerFactory(client, time.Minute)
   377  
   378  	config := NewEndpointSliceConfig(sharedInformers.Discovery().V1().EndpointSlices(), time.Minute)
   379  	handler := NewEndpointSliceHandlerMock()
   380  	handler2 := NewEndpointSliceHandlerMock()
   381  	config.RegisterEventHandler(handler)
   382  	config.RegisterEventHandler(handler2)
   383  	go sharedInformers.Start(stopCh)
   384  	go config.Run(stopCh)
   385  
   386  	endpoints1 := &discoveryv1.EndpointSlice{
   387  		ObjectMeta:  metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
   388  		AddressType: discoveryv1.AddressTypeIPv4,
   389  		Endpoints: []discoveryv1.Endpoint{{
   390  			Addresses: []string{"1.1.1.1"},
   391  		}, {
   392  			Addresses: []string{"2.2.2.2"},
   393  		}},
   394  		Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
   395  	}
   396  	endpoints2 := &discoveryv1.EndpointSlice{
   397  		ObjectMeta:  metav1.ObjectMeta{Namespace: "testnamespace", Name: "bar"},
   398  		AddressType: discoveryv1.AddressTypeIPv4,
   399  		Endpoints: []discoveryv1.Endpoint{{
   400  			Addresses: []string{"3.3.3.3"},
   401  		}, {
   402  			Addresses: []string{"4.4.4.4"},
   403  		}},
   404  		Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
   405  	}
   406  	fakeWatch.Add(endpoints1)
   407  	fakeWatch.Add(endpoints2)
   408  
   409  	endpoints := []*discoveryv1.EndpointSlice{endpoints2, endpoints1}
   410  	handler.ValidateEndpointSlices(t, endpoints)
   411  	handler2.ValidateEndpointSlices(t, endpoints)
   412  
   413  	// Add one more
   414  	endpoints3 := &discoveryv1.EndpointSlice{
   415  		ObjectMeta:  metav1.ObjectMeta{Namespace: "testnamespace", Name: "foobar"},
   416  		AddressType: discoveryv1.AddressTypeIPv4,
   417  		Endpoints: []discoveryv1.Endpoint{{
   418  			Addresses: []string{"5.5.5.5"},
   419  		}, {
   420  			Addresses: []string{"6.6.6.6"},
   421  		}},
   422  		Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
   423  	}
   424  	fakeWatch.Add(endpoints3)
   425  	endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1, endpoints3}
   426  	handler.ValidateEndpointSlices(t, endpoints)
   427  	handler2.ValidateEndpointSlices(t, endpoints)
   428  
   429  	// Update the "foo" service with new endpoints
   430  	endpoints1v2 := &discoveryv1.EndpointSlice{
   431  		ObjectMeta:  metav1.ObjectMeta{Namespace: "testnamespace", Name: "foo"},
   432  		AddressType: discoveryv1.AddressTypeIPv4,
   433  		Endpoints: []discoveryv1.Endpoint{{
   434  			Addresses: []string{"7.7.7.7"},
   435  		}},
   436  		Ports: []discoveryv1.EndpointPort{{Port: ptr.To[int32](80)}},
   437  	}
   438  	fakeWatch.Modify(endpoints1v2)
   439  	endpoints = []*discoveryv1.EndpointSlice{endpoints2, endpoints1v2, endpoints3}
   440  	handler.ValidateEndpointSlices(t, endpoints)
   441  	handler2.ValidateEndpointSlices(t, endpoints)
   442  
   443  	// Remove "bar" endpoints
   444  	fakeWatch.Delete(endpoints2)
   445  	endpoints = []*discoveryv1.EndpointSlice{endpoints1v2, endpoints3}
   446  	handler.ValidateEndpointSlices(t, endpoints)
   447  	handler2.ValidateEndpointSlices(t, endpoints)
   448  }
   449  
// TODO: Add a unit test for interrupts getting processed in a timely manner.
// Currently this module has a circular dependency with config, and so it's
// named config_test, which means even test methods need to be public. This
// is a refactoring that we can avoid by resolving the dependency.
   454  

View as plain text