...

Source file src/k8s.io/kubernetes/pkg/controlplane/reconcilers/lease_test.go

Documentation: k8s.io/kubernetes/pkg/controlplane/reconcilers

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package reconcilers
    18  
    19  /*
    20  Original Source:
    21  https://github.com/openshift/origin/blob/bb340c5dd5ff72718be86fb194dedc0faed7f4c7/pkg/cmd/server/election/lease_endpoint_reconciler_test.go
    22  */
    23  
    24  import (
    25  	"reflect"
    26  	"sort"
    27  	"testing"
    28  	"time"
    29  
    30  	"github.com/google/uuid"
    31  	corev1 "k8s.io/api/core/v1"
    32  	"k8s.io/apimachinery/pkg/api/apitesting"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/runtime"
    35  	"k8s.io/apimachinery/pkg/runtime/schema"
    36  	"k8s.io/apimachinery/pkg/runtime/serializer"
    37  	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    38  	"k8s.io/apiserver/pkg/storage"
    39  	etcd3testing "k8s.io/apiserver/pkg/storage/etcd3/testing"
    40  	"k8s.io/apiserver/pkg/storage/storagebackend/factory"
    41  	"k8s.io/client-go/kubernetes/fake"
    42  	"k8s.io/kubernetes/pkg/apis/core"
    43  	netutils "k8s.io/utils/net"
    44  )
    45  
    46  func init() {
    47  	var scheme = runtime.NewScheme()
    48  
    49  	metav1.AddToGroupVersion(scheme, metav1.SchemeGroupVersion)
    50  	utilruntime.Must(core.AddToScheme(scheme))
    51  	utilruntime.Must(corev1.AddToScheme(scheme))
    52  	utilruntime.Must(scheme.SetVersionPriority(corev1.SchemeGroupVersion))
    53  
    54  	codecs = serializer.NewCodecFactory(scheme)
    55  }
    56  
    57  var codecs serializer.CodecFactory
    58  
    59  type fakeLeases struct {
    60  	storageLeases
    61  }
    62  
    63  var _ Leases = &fakeLeases{}
    64  
    65  func newFakeLeases(t *testing.T, s storage.Interface) *fakeLeases {
    66  	// use the same base key used by the controlplane, but add a random
    67  	// prefix so we can reuse the etcd instance for subtests independently.
    68  	// pkg/controlplane/instance.go:268:
    69  	// masterLeases, err := reconcilers.NewLeases(config, "/masterleases/", ttl)
    70  	// ref: https://issues.k8s.io/114049
    71  	base := "/" + uuid.New().String() + "/masterleases/"
    72  	return &fakeLeases{
    73  		storageLeases{
    74  			storage:   s,
    75  			destroyFn: func() {},
    76  			baseKey:   base,
    77  			leaseTime: 1 * time.Minute, // avoid the lease to timeout on tests
    78  		},
    79  	}
    80  }
    81  
    82  func (f *fakeLeases) SetKeys(keys []string) error {
    83  	for _, ip := range keys {
    84  		if err := f.UpdateLease(ip); err != nil {
    85  			return err
    86  		}
    87  	}
    88  	return nil
    89  }
    90  
    91  func TestLeaseEndpointReconciler(t *testing.T) {
    92  	server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
    93  	t.Cleanup(func() { server.Terminate(t) })
    94  
    95  	newFunc := func() runtime.Object { return &corev1.Endpoints{} }
    96  	newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
    97  	sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
    98  
    99  	s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
   100  	if err != nil {
   101  		t.Fatalf("Error creating storage: %v", err)
   102  	}
   103  	t.Cleanup(dFunc)
   104  
   105  	reconcileTests := []struct {
   106  		testName      string
   107  		serviceName   string
   108  		ip            string
   109  		endpointPorts []corev1.EndpointPort
   110  		endpointKeys  []string
   111  		initialState  []runtime.Object
   112  		expectUpdate  []runtime.Object
   113  		expectCreate  []runtime.Object
   114  		expectLeases  []string
   115  	}{
   116  		{
   117  			testName:      "no existing endpoints",
   118  			serviceName:   "foo",
   119  			ip:            "1.2.3.4",
   120  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   121  			initialState:  nil,
   122  			expectCreate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   123  			expectLeases:  []string{"1.2.3.4"},
   124  		},
   125  		{
   126  			testName:      "existing endpoints satisfy",
   127  			serviceName:   "foo",
   128  			ip:            "1.2.3.4",
   129  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   130  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   131  			expectLeases:  []string{"1.2.3.4"},
   132  		},
   133  		{
   134  			testName:      "existing endpoints satisfy, no endpointslice",
   135  			serviceName:   "foo",
   136  			ip:            "1.2.3.4",
   137  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   138  			initialState: []runtime.Object{
   139  				makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   140  			},
   141  			expectCreate: []runtime.Object{
   142  				makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   143  			},
   144  			expectLeases: []string{"1.2.3.4"},
   145  		},
   146  		{
   147  			testName:      "existing endpointslice satisfies, no endpoints",
   148  			serviceName:   "foo",
   149  			ip:            "1.2.3.4",
   150  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   151  			initialState: []runtime.Object{
   152  				makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   153  			},
   154  			expectCreate: []runtime.Object{
   155  				makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   156  			},
   157  			expectLeases: []string{"1.2.3.4"},
   158  		},
   159  		{
   160  			testName:      "existing endpoints satisfy, endpointslice is wrong",
   161  			serviceName:   "foo",
   162  			ip:            "1.2.3.4",
   163  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   164  			initialState: []runtime.Object{
   165  				makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   166  				makeEndpointSlice("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   167  			},
   168  			expectUpdate: []runtime.Object{
   169  				makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   170  			},
   171  			expectLeases: []string{"1.2.3.4"},
   172  		},
   173  		{
   174  			testName:      "existing endpointslice satisfies, endpoints is wrong",
   175  			serviceName:   "foo",
   176  			ip:            "1.2.3.4",
   177  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   178  			initialState: []runtime.Object{
   179  				makeEndpoints("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   180  				makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   181  			},
   182  			expectUpdate: []runtime.Object{
   183  				makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   184  			},
   185  			expectLeases: []string{"1.2.3.4"},
   186  		},
   187  		{
   188  			testName:      "existing endpoints satisfy + refresh existing key",
   189  			serviceName:   "foo",
   190  			ip:            "1.2.3.4",
   191  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   192  			endpointKeys:  []string{"1.2.3.4"},
   193  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   194  			expectLeases:  []string{"1.2.3.4"},
   195  		},
   196  		{
   197  			testName:      "existing endpoints satisfy but too many",
   198  			serviceName:   "foo",
   199  			ip:            "1.2.3.4",
   200  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   201  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   202  			expectUpdate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   203  			expectLeases:  []string{"1.2.3.4"},
   204  		},
   205  		{
   206  			testName:      "existing endpoints satisfy but too many + extra masters",
   207  			serviceName:   "foo",
   208  			ip:            "1.2.3.4",
   209  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   210  			endpointKeys:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   211  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   212  			expectUpdate:  makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   213  			expectLeases:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   214  		},
   215  		{
   216  			testName:      "existing endpoints satisfy but too many + extra masters + delete first",
   217  			serviceName:   "foo",
   218  			ip:            "4.3.2.4",
   219  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   220  			endpointKeys:  []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   221  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   222  			expectUpdate:  makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   223  			expectLeases:  []string{"4.3.2.1", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   224  		},
   225  		{
   226  			testName:      "existing endpoints current IP missing",
   227  			serviceName:   "foo",
   228  			ip:            "4.3.2.2",
   229  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   230  			endpointKeys:  []string{"4.3.2.1"},
   231  			initialState:  makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   232  			expectUpdate:  makeEndpointsArray("foo", []string{"4.3.2.1", "4.3.2.2"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   233  			expectLeases:  []string{"4.3.2.1", "4.3.2.2"},
   234  		},
   235  		{
   236  			testName:      "existing endpoints wrong name",
   237  			serviceName:   "foo",
   238  			ip:            "1.2.3.4",
   239  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   240  			initialState:  makeEndpointsArray("bar", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   241  			expectCreate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   242  			expectLeases:  []string{"1.2.3.4"},
   243  		},
   244  		{
   245  			testName:      "existing endpoints wrong IP",
   246  			serviceName:   "foo",
   247  			ip:            "1.2.3.4",
   248  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   249  			initialState:  makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   250  			expectUpdate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   251  			expectLeases:  []string{"1.2.3.4"},
   252  		},
   253  		{
   254  			testName:      "existing endpoints wrong port",
   255  			serviceName:   "foo",
   256  			ip:            "1.2.3.4",
   257  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   258  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 9090, Protocol: "TCP"}}),
   259  			expectUpdate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   260  			expectLeases:  []string{"1.2.3.4"},
   261  		},
   262  		{
   263  			testName:      "existing endpoints wrong protocol",
   264  			serviceName:   "foo",
   265  			ip:            "1.2.3.4",
   266  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   267  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "UDP"}}),
   268  			expectUpdate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   269  			expectLeases:  []string{"1.2.3.4"},
   270  		},
   271  		{
   272  			testName:      "existing endpoints wrong port name",
   273  			serviceName:   "foo",
   274  			ip:            "1.2.3.4",
   275  			endpointPorts: []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}},
   276  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   277  			expectUpdate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "baz", Port: 8080, Protocol: "TCP"}}),
   278  			expectLeases:  []string{"1.2.3.4"},
   279  		},
   280  		{
   281  			testName:      "existing endpoints without skip mirror label",
   282  			serviceName:   "foo",
   283  			ip:            "1.2.3.4",
   284  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   285  			initialState: []runtime.Object{
   286  				// can't use makeEndpointsArray() here because we don't want the
   287  				// skip-mirror label
   288  				&corev1.Endpoints{
   289  					ObjectMeta: metav1.ObjectMeta{
   290  						Namespace: metav1.NamespaceDefault,
   291  						Name:      "foo",
   292  					},
   293  					Subsets: []corev1.EndpointSubset{{
   294  						Addresses: []corev1.EndpointAddress{{IP: "1.2.3.4"}},
   295  						Ports:     []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   296  					}},
   297  				},
   298  				makeEndpointSlice("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   299  			},
   300  			expectUpdate: []runtime.Object{
   301  				makeEndpoints("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   302  				// EndpointSlice does not get updated because it was already correct
   303  			},
   304  			expectLeases: []string{"1.2.3.4"},
   305  		},
   306  		{
   307  			testName:    "existing endpoints extra service ports satisfy",
   308  			serviceName: "foo",
   309  			ip:          "1.2.3.4",
   310  			endpointPorts: []corev1.EndpointPort{
   311  				{Name: "foo", Port: 8080, Protocol: "TCP"},
   312  				{Name: "bar", Port: 1000, Protocol: "TCP"},
   313  				{Name: "baz", Port: 1010, Protocol: "TCP"},
   314  			},
   315  			initialState: makeEndpointsArray("foo", []string{"1.2.3.4"},
   316  				[]corev1.EndpointPort{
   317  					{Name: "foo", Port: 8080, Protocol: "TCP"},
   318  					{Name: "bar", Port: 1000, Protocol: "TCP"},
   319  					{Name: "baz", Port: 1010, Protocol: "TCP"},
   320  				},
   321  			),
   322  			expectLeases: []string{"1.2.3.4"},
   323  		},
   324  		{
   325  			testName:    "existing endpoints extra service ports missing port",
   326  			serviceName: "foo",
   327  			ip:          "1.2.3.4",
   328  			endpointPorts: []corev1.EndpointPort{
   329  				{Name: "foo", Port: 8080, Protocol: "TCP"},
   330  				{Name: "bar", Port: 1000, Protocol: "TCP"},
   331  			},
   332  			initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   333  			expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"},
   334  				[]corev1.EndpointPort{
   335  					{Name: "foo", Port: 8080, Protocol: "TCP"},
   336  					{Name: "bar", Port: 1000, Protocol: "TCP"},
   337  				},
   338  			),
   339  			expectLeases: []string{"1.2.3.4"},
   340  		},
   341  	}
   342  	for _, test := range reconcileTests {
   343  		t.Run(test.testName, func(t *testing.T) {
   344  			fakeLeases := newFakeLeases(t, s)
   345  			err := fakeLeases.SetKeys(test.endpointKeys)
   346  			if err != nil {
   347  				t.Errorf("unexpected error creating keys: %v", err)
   348  			}
   349  			clientset := fake.NewSimpleClientset(test.initialState...)
   350  
   351  			epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
   352  			r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
   353  			err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, true)
   354  			if err != nil {
   355  				t.Errorf("unexpected error reconciling: %v", err)
   356  			}
   357  
   358  			err = verifyCreatesAndUpdates(clientset, test.expectCreate, test.expectUpdate)
   359  			if err != nil {
   360  				t.Errorf("unexpected error in side effects: %v", err)
   361  			}
   362  
   363  			leases, err := fakeLeases.ListLeases()
   364  			if err != nil {
   365  				t.Errorf("unexpected error: %v", err)
   366  			}
   367  			// sort for comparison
   368  			sort.Strings(leases)
   369  			sort.Strings(test.expectLeases)
   370  			if !reflect.DeepEqual(leases, test.expectLeases) {
   371  				t.Errorf("expected %v got: %v", test.expectLeases, leases)
   372  			}
   373  		})
   374  	}
   375  
   376  	nonReconcileTests := []struct {
   377  		testName      string
   378  		serviceName   string
   379  		ip            string
   380  		endpointPorts []corev1.EndpointPort
   381  		endpointKeys  []string
   382  		initialState  []runtime.Object
   383  		expectUpdate  []runtime.Object
   384  		expectCreate  []runtime.Object
   385  		expectLeases  []string
   386  	}{
   387  		{
   388  			testName:    "existing endpoints extra service ports missing port no update",
   389  			serviceName: "foo",
   390  			ip:          "1.2.3.4",
   391  			endpointPorts: []corev1.EndpointPort{
   392  				{Name: "foo", Port: 8080, Protocol: "TCP"},
   393  				{Name: "bar", Port: 1000, Protocol: "TCP"},
   394  			},
   395  			initialState: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   396  			expectUpdate: nil,
   397  			expectLeases: []string{"1.2.3.4"},
   398  		},
   399  		{
   400  			testName:    "existing endpoints extra service ports, wrong ports, wrong IP",
   401  			serviceName: "foo",
   402  			ip:          "1.2.3.4",
   403  			endpointPorts: []corev1.EndpointPort{
   404  				{Name: "foo", Port: 8080, Protocol: "TCP"},
   405  				{Name: "bar", Port: 1000, Protocol: "TCP"},
   406  			},
   407  			initialState: makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   408  			expectUpdate: makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   409  			expectLeases: []string{"1.2.3.4"},
   410  		},
   411  		{
   412  			testName:      "no existing endpoints",
   413  			serviceName:   "foo",
   414  			ip:            "1.2.3.4",
   415  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   416  			initialState:  nil,
   417  			expectCreate:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   418  			expectLeases:  []string{"1.2.3.4"},
   419  		},
   420  	}
   421  	for _, test := range nonReconcileTests {
   422  		t.Run(test.testName, func(t *testing.T) {
   423  			fakeLeases := newFakeLeases(t, s)
   424  			err := fakeLeases.SetKeys(test.endpointKeys)
   425  			if err != nil {
   426  				t.Errorf("unexpected error creating keys: %v", err)
   427  			}
   428  			clientset := fake.NewSimpleClientset(test.initialState...)
   429  			epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
   430  			r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
   431  			err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
   432  			if err != nil {
   433  				t.Errorf("unexpected error reconciling: %v", err)
   434  			}
   435  
   436  			err = verifyCreatesAndUpdates(clientset, test.expectCreate, test.expectUpdate)
   437  			if err != nil {
   438  				t.Errorf("unexpected error in side effects: %v", err)
   439  			}
   440  
   441  			leases, err := fakeLeases.ListLeases()
   442  			if err != nil {
   443  				t.Errorf("unexpected error: %v", err)
   444  			}
   445  			// sort for comparison
   446  			sort.Strings(leases)
   447  			sort.Strings(test.expectLeases)
   448  			if !reflect.DeepEqual(leases, test.expectLeases) {
   449  				t.Errorf("expected %v got: %v", test.expectLeases, leases)
   450  			}
   451  		})
   452  	}
   453  }
   454  
   455  func TestLeaseRemoveEndpoints(t *testing.T) {
   456  	server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
   457  	t.Cleanup(func() { server.Terminate(t) })
   458  
   459  	newFunc := func() runtime.Object { return &corev1.Endpoints{} }
   460  	newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
   461  	sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
   462  
   463  	s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "pods"}), newFunc, newListFunc, "")
   464  	if err != nil {
   465  		t.Fatalf("Error creating storage: %v", err)
   466  	}
   467  	t.Cleanup(dFunc)
   468  
   469  	stopTests := []struct {
   470  		testName         string
   471  		serviceName      string
   472  		ip               string
   473  		endpointPorts    []corev1.EndpointPort
   474  		endpointKeys     []string
   475  		initialState     []runtime.Object
   476  		expectUpdate     []runtime.Object
   477  		expectLeases     []string
   478  		apiServerStartup bool
   479  	}{
   480  		{
   481  			testName:         "successful remove previous endpoints before apiserver starts",
   482  			serviceName:      "foo",
   483  			ip:               "1.2.3.4",
   484  			endpointPorts:    []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   485  			endpointKeys:     []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   486  			initialState:     makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   487  			expectUpdate:     makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   488  			expectLeases:     []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
   489  			apiServerStartup: true,
   490  		},
   491  		{
   492  			testName:      "successful stop reconciling",
   493  			serviceName:   "foo",
   494  			ip:            "1.2.3.4",
   495  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   496  			endpointKeys:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   497  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   498  			expectUpdate:  makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   499  			expectLeases:  []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
   500  		},
   501  		{
   502  			testName:      "stop reconciling with ip not in endpoint ip list",
   503  			serviceName:   "foo",
   504  			ip:            "5.6.7.8",
   505  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   506  			endpointKeys:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   507  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   508  			expectLeases:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   509  		},
   510  		{
   511  			testName:      "endpoint with no subset",
   512  			serviceName:   "foo",
   513  			ip:            "1.2.3.4",
   514  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   515  			endpointKeys:  []string{"1.2.3.4", "4.3.2.2", "4.3.2.3", "4.3.2.4"},
   516  			initialState:  makeEndpointsArray("foo", nil, nil),
   517  			expectUpdate:  makeEndpointsArray("foo", []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   518  			expectLeases:  []string{"4.3.2.2", "4.3.2.3", "4.3.2.4"},
   519  		},
   520  		{
   521  			testName:      "the last API server was shut down cleanly",
   522  			serviceName:   "foo",
   523  			ip:            "1.2.3.4",
   524  			endpointPorts: []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   525  			endpointKeys:  []string{"1.2.3.4"},
   526  			initialState:  makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   527  			expectUpdate:  makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
   528  			expectLeases:  []string{},
   529  		},
   530  	}
   531  	for _, test := range stopTests {
   532  		t.Run(test.testName, func(t *testing.T) {
   533  			fakeLeases := newFakeLeases(t, s)
   534  			err := fakeLeases.SetKeys(test.endpointKeys)
   535  			if err != nil {
   536  				t.Errorf("unexpected error creating keys: %v", err)
   537  			}
   538  			clientset := fake.NewSimpleClientset(test.initialState...)
   539  			epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
   540  			r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
   541  			if !test.apiServerStartup {
   542  				r.StopReconciling()
   543  			}
   544  			err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
   545  			// if the ip is not on the endpoints, it must return an storage error and stop reconciling
   546  			if !contains(test.endpointKeys, test.ip) {
   547  				if !storage.IsNotFound(err) {
   548  					t.Errorf("expected error StorageError: key not found, Code: 1, Key: /registry/base/key/%s got:  %v", test.ip, err)
   549  				}
   550  			} else if err != nil {
   551  				t.Errorf("unexpected error reconciling: %v", err)
   552  			}
   553  
   554  			err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate)
   555  			if err != nil {
   556  				t.Errorf("unexpected error in side effects: %v", err)
   557  			}
   558  
   559  			leases, err := fakeLeases.ListLeases()
   560  			if err != nil {
   561  				t.Errorf("unexpected error: %v", err)
   562  			}
   563  			// sort for comparison
   564  			sort.Strings(leases)
   565  			sort.Strings(test.expectLeases)
   566  			if !reflect.DeepEqual(leases, test.expectLeases) {
   567  				t.Errorf("expected %v got: %v", test.expectLeases, leases)
   568  			}
   569  		})
   570  	}
   571  }
   572  
   573  func contains(s []string, str string) bool {
   574  	for _, v := range s {
   575  		if v == str {
   576  			return true
   577  		}
   578  	}
   579  	return false
   580  }
   581  
   582  func TestApiserverShutdown(t *testing.T) {
   583  	server, sc := etcd3testing.NewUnsecuredEtcd3TestClientServer(t)
   584  	t.Cleanup(func() { server.Terminate(t) })
   585  
   586  	newFunc := func() runtime.Object { return &corev1.Endpoints{} }
   587  	newListFunc := func() runtime.Object { return &corev1.EndpointsList{} }
   588  	sc.Codec = apitesting.TestStorageCodec(codecs, corev1.SchemeGroupVersion)
   589  
   590  	s, dFunc, err := factory.Create(*sc.ForResource(schema.GroupResource{Resource: "endpoints"}), newFunc, newListFunc, "")
   591  	if err != nil {
   592  		t.Fatalf("Error creating storage: %v", err)
   593  	}
   594  	t.Cleanup(dFunc)
   595  
   596  	reconcileTests := []struct {
   597  		testName                string
   598  		serviceName             string
   599  		ip                      string
   600  		endpointPorts           []corev1.EndpointPort
   601  		endpointKeys            []string
   602  		initialState            []runtime.Object
   603  		expectUpdate            []runtime.Object
   604  		expectLeases            []string
   605  		shutDownBeforeReconcile bool
   606  	}{
   607  		{
   608  			testName:                "last apiserver shutdown after endpoint reconcile",
   609  			serviceName:             "foo",
   610  			ip:                      "1.2.3.4",
   611  			endpointPorts:           []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   612  			endpointKeys:            []string{"1.2.3.4"},
   613  			initialState:            makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   614  			expectUpdate:            makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
   615  			expectLeases:            []string{},
   616  			shutDownBeforeReconcile: false,
   617  		},
   618  		{
   619  			testName:                "last apiserver shutdown before endpoint reconcile",
   620  			serviceName:             "foo",
   621  			ip:                      "1.2.3.4",
   622  			endpointPorts:           []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   623  			endpointKeys:            []string{"1.2.3.4"},
   624  			initialState:            makeEndpointsArray("foo", []string{"1.2.3.4"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   625  			expectUpdate:            makeEndpointsArray("foo", []string{}, []corev1.EndpointPort{}),
   626  			expectLeases:            []string{},
   627  			shutDownBeforeReconcile: true,
   628  		},
   629  		{
   630  			testName:                "not the last apiserver which was shutdown before endpoint reconcile",
   631  			serviceName:             "foo",
   632  			ip:                      "1.2.3.4",
   633  			endpointPorts:           []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   634  			endpointKeys:            []string{"1.2.3.4", "4.3.2.1"},
   635  			initialState:            makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   636  			expectUpdate:            makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   637  			expectLeases:            []string{"4.3.2.1"},
   638  			shutDownBeforeReconcile: true,
   639  		},
   640  		{
   641  			testName:                "not the last apiserver which was shutdown after endpoint reconcile",
   642  			serviceName:             "foo",
   643  			ip:                      "1.2.3.4",
   644  			endpointPorts:           []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}},
   645  			endpointKeys:            []string{"1.2.3.4", "4.3.2.1"},
   646  			initialState:            makeEndpointsArray("foo", []string{"1.2.3.4", "4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   647  			expectUpdate:            makeEndpointsArray("foo", []string{"4.3.2.1"}, []corev1.EndpointPort{{Name: "foo", Port: 8080, Protocol: "TCP"}}),
   648  			expectLeases:            []string{"4.3.2.1"},
   649  			shutDownBeforeReconcile: false,
   650  		},
   651  	}
   652  	for _, test := range reconcileTests {
   653  		t.Run(test.testName, func(t *testing.T) {
   654  			fakeLeases := newFakeLeases(t, s)
   655  			err := fakeLeases.SetKeys(test.endpointKeys)
   656  			if err != nil {
   657  				t.Errorf("unexpected error creating keys: %v", err)
   658  			}
   659  			clientset := fake.NewSimpleClientset(test.initialState...)
   660  
   661  			epAdapter := NewEndpointsAdapter(clientset.CoreV1(), clientset.DiscoveryV1())
   662  			r := NewLeaseEndpointReconciler(epAdapter, fakeLeases)
   663  
   664  			if test.shutDownBeforeReconcile {
   665  				// shutdown apiserver first
   666  				r.StopReconciling()
   667  				err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
   668  				if err != nil {
   669  					t.Errorf("unexpected error remove endpoints: %v", err)
   670  				}
   671  
   672  				// reconcile endpoints in another goroutine
   673  				err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
   674  				if err != nil {
   675  					t.Errorf("unexpected error reconciling: %v", err)
   676  				}
   677  			} else {
   678  				// reconcile endpoints first
   679  				err = r.ReconcileEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts, false)
   680  				if err != nil {
   681  					t.Errorf("unexpected error reconciling: %v", err)
   682  				}
   683  
   684  				r.StopReconciling()
   685  				err = r.RemoveEndpoints(test.serviceName, netutils.ParseIPSloppy(test.ip), test.endpointPorts)
   686  				if err != nil {
   687  					t.Errorf("unexpected error remove endpoints: %v", err)
   688  				}
   689  			}
   690  
   691  			err = verifyCreatesAndUpdates(clientset, nil, test.expectUpdate)
   692  			if err != nil {
   693  				t.Errorf("unexpected error in side effects: %v", err)
   694  			}
   695  
   696  			leases, err := fakeLeases.ListLeases()
   697  			if err != nil {
   698  				t.Errorf("unexpected error: %v", err)
   699  			}
   700  			// sort for comparison
   701  			sort.Strings(leases)
   702  			sort.Strings(test.expectLeases)
   703  			if !reflect.DeepEqual(leases, test.expectLeases) {
   704  				t.Errorf("expected %v got: %v", test.expectLeases, leases)
   705  			}
   706  		})
   707  	}
   708  }
   709  

View as plain text