...

Source file src/k8s.io/client-go/tools/leaderelection/leaderelection_test.go

Documentation: k8s.io/client-go/tools/leaderelection

     1  /*
     2  Copyright 2015 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package leaderelection
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	"github.com/google/go-cmp/cmp"
    28  	coordinationv1 "k8s.io/api/coordination/v1"
    29  	corev1 "k8s.io/api/core/v1"
    30  	"k8s.io/apimachinery/pkg/api/equality"
    31  	"k8s.io/apimachinery/pkg/api/errors"
    32  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    33  	"k8s.io/apimachinery/pkg/runtime"
    34  	"k8s.io/apimachinery/pkg/util/wait"
    35  	"k8s.io/client-go/kubernetes/fake"
    36  	fakeclient "k8s.io/client-go/testing"
    37  	rl "k8s.io/client-go/tools/leaderelection/resourcelock"
    38  	"k8s.io/client-go/tools/record"
    39  	"k8s.io/utils/clock"
    40  
    41  	"github.com/stretchr/testify/assert"
    42  )
    43  
    44  func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) {
    45  	objectMeta := metav1.ObjectMeta{
    46  		Namespace: namespace,
    47  		Name:      name,
    48  	}
    49  	if record != nil {
    50  		recordBytes, _ := json.Marshal(record)
    51  		objectMeta.Annotations = map[string]string{
    52  			rl.LeaderElectionRecordAnnotationKey: string(recordBytes),
    53  		}
    54  	}
    55  	switch objectType {
    56  	case "endpoints":
    57  		obj = &corev1.Endpoints{ObjectMeta: objectMeta}
    58  	case "configmaps":
    59  		obj = &corev1.ConfigMap{ObjectMeta: objectMeta}
    60  	case "leases":
    61  		var spec coordinationv1.LeaseSpec
    62  		if record != nil {
    63  			spec = rl.LeaderElectionRecordToLeaseSpec(record)
    64  		}
    65  		obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec}
    66  	default:
    67  		t.Fatal("unexpected objType:" + objectType)
    68  	}
    69  	return
    70  }
    71  
// Reactor describes one reaction that a test case registers on the fake
// clientset: verb is the API verb to react to (e.g. "get", "create",
// "update") and reaction is the function invoked for matching actions.
//
// NOTE(review): objectType is populated by some test cases, but the visible
// tests pass their own objectType argument to AddReactor rather than reading
// this field.
type Reactor struct {
	verb       string
	objectType string
	reaction   fakeclient.ReactionFunc
}
    77  
    78  func testTryAcquireOrRenew(t *testing.T, objectType string) {
    79  	clock := clock.RealClock{}
    80  	future := clock.Now().Add(1000 * time.Hour)
    81  	past := clock.Now().Add(-1000 * time.Hour)
    82  
    83  	tests := []struct {
    84  		name           string
    85  		observedRecord rl.LeaderElectionRecord
    86  		observedTime   time.Time
    87  		retryAfter     time.Duration
    88  		reactors       []Reactor
    89  		expectedEvents []string
    90  
    91  		expectSuccess    bool
    92  		transitionLeader bool
    93  		outHolder        string
    94  	}{
    95  		{
    96  			name: "acquire from no object",
    97  			reactors: []Reactor{
    98  				{
    99  					verb: "get",
   100  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   101  						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
   102  					},
   103  				},
   104  				{
   105  					verb: "create",
   106  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   107  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   108  					},
   109  				},
   110  			},
   111  			expectSuccess: true,
   112  			outHolder:     "baz",
   113  		},
   114  		{
   115  			name: "acquire from object without annotations",
   116  			reactors: []Reactor{
   117  				{
   118  					verb: "get",
   119  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   120  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), nil), nil
   121  					},
   122  				},
   123  				{
   124  					verb: "update",
   125  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   126  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   127  					},
   128  				},
   129  			},
   130  			expectSuccess:    true,
   131  			transitionLeader: true,
   132  			outHolder:        "baz",
   133  		},
   134  		{
   135  			name: "acquire from led object with the lease duration seconds",
   136  			reactors: []Reactor{
   137  				{
   138  					verb: "get",
   139  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   140  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
   141  					},
   142  				},
   143  				{
   144  					verb: "get",
   145  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   146  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
   147  					},
   148  				},
   149  				{
   150  					verb: "update",
   151  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   152  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   153  					},
   154  				},
   155  			},
   156  			retryAfter:       3 * time.Second,
   157  			expectSuccess:    true,
   158  			transitionLeader: true,
   159  			outHolder:        "baz",
   160  		},
   161  		{
   162  			name: "acquire from unled object",
   163  			reactors: []Reactor{
   164  				{
   165  					verb: "get",
   166  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   167  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
   168  					},
   169  				},
   170  				{
   171  					verb: "update",
   172  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   173  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   174  					},
   175  				},
   176  			},
   177  
   178  			expectSuccess:    true,
   179  			transitionLeader: true,
   180  			outHolder:        "baz",
   181  		},
   182  		{
   183  			name: "acquire from led, unacked object",
   184  			reactors: []Reactor{
   185  				{
   186  					verb: "get",
   187  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   188  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
   189  					},
   190  				},
   191  				{
   192  					verb: "update",
   193  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   194  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   195  					},
   196  				},
   197  			},
   198  			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
   199  			observedTime:   past,
   200  
   201  			expectSuccess:    true,
   202  			transitionLeader: true,
   203  			outHolder:        "baz",
   204  		},
   205  		{
   206  			name: "acquire from empty led, acked object",
   207  			reactors: []Reactor{
   208  				{
   209  					verb: "get",
   210  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   211  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: ""}), nil
   212  					},
   213  				},
   214  				{
   215  					verb: "update",
   216  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   217  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   218  					},
   219  				},
   220  			},
   221  			observedTime: future,
   222  
   223  			expectSuccess:    true,
   224  			transitionLeader: true,
   225  			outHolder:        "baz",
   226  		},
   227  		{
   228  			name: "don't acquire from led, acked object",
   229  			reactors: []Reactor{
   230  				{
   231  					verb: "get",
   232  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   233  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
   234  					},
   235  				},
   236  			},
   237  			observedTime: future,
   238  
   239  			expectSuccess: false,
   240  			outHolder:     "bing",
   241  		},
   242  		{
   243  			name: "renew already acquired object",
   244  			reactors: []Reactor{
   245  				{
   246  					verb: "get",
   247  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   248  						return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
   249  					},
   250  				},
   251  				{
   252  					verb: "update",
   253  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   254  						return true, action.(fakeclient.CreateAction).GetObject(), nil
   255  					},
   256  				},
   257  			},
   258  			observedTime:   future,
   259  			observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
   260  
   261  			expectSuccess: true,
   262  			outHolder:     "baz",
   263  		},
   264  	}
   265  
   266  	for i := range tests {
   267  		test := &tests[i]
   268  		t.Run(test.name, func(t *testing.T) {
   269  			// OnNewLeader is called async so we have to wait for it.
   270  			var wg sync.WaitGroup
   271  			wg.Add(1)
   272  			var reportedLeader string
   273  			var lock rl.Interface
   274  
   275  			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
   276  			recorder := record.NewFakeRecorder(100)
   277  			resourceLockConfig := rl.ResourceLockConfig{
   278  				Identity:      "baz",
   279  				EventRecorder: recorder,
   280  			}
   281  			c := &fake.Clientset{}
   282  			for _, reactor := range test.reactors {
   283  				c.AddReactor(reactor.verb, objectType, reactor.reaction)
   284  			}
   285  			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
   286  				t.Errorf("unreachable action. testclient called too many times: %+v", action)
   287  				return true, nil, fmt.Errorf("unreachable action")
   288  			})
   289  
   290  			switch objectType {
   291  			case "leases":
   292  				lock = &rl.LeaseLock{
   293  					LeaseMeta:  objectMeta,
   294  					LockConfig: resourceLockConfig,
   295  					Client:     c.CoordinationV1(),
   296  				}
   297  			default:
   298  				t.Fatalf("Unknown objectType: %v", objectType)
   299  			}
   300  
   301  			lec := LeaderElectionConfig{
   302  				Lock:          lock,
   303  				LeaseDuration: 10 * time.Second,
   304  				Callbacks: LeaderCallbacks{
   305  					OnNewLeader: func(l string) {
   306  						defer wg.Done()
   307  						reportedLeader = l
   308  					},
   309  				},
   310  			}
   311  			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
   312  			le := &LeaderElector{
   313  				config:            lec,
   314  				observedRecord:    test.observedRecord,
   315  				observedRawRecord: observedRawRecord,
   316  				observedTime:      test.observedTime,
   317  				clock:             clock,
   318  				metrics:           globalMetricsFactory.newLeaderMetrics(),
   319  			}
   320  			if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
   321  				if test.retryAfter != 0 {
   322  					time.Sleep(test.retryAfter)
   323  					if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
   324  						t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
   325  					}
   326  				} else {
   327  					t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
   328  				}
   329  			}
   330  
   331  			le.observedRecord.AcquireTime = metav1.Time{}
   332  			le.observedRecord.RenewTime = metav1.Time{}
   333  			if le.observedRecord.HolderIdentity != test.outHolder {
   334  				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
   335  			}
   336  			if len(test.reactors) != len(c.Actions()) {
   337  				t.Errorf("wrong number of api interactions")
   338  			}
   339  			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
   340  				t.Errorf("leader should have transitioned but did not")
   341  			}
   342  			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
   343  				t.Errorf("leader should not have transitioned but did")
   344  			}
   345  
   346  			le.maybeReportTransition()
   347  			wg.Wait()
   348  			if reportedLeader != test.outHolder {
   349  				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
   350  			}
   351  			assertEqualEvents(t, test.expectedEvents, recorder.Events)
   352  		})
   353  	}
   354  }
   355  
// TestTryAcquireOrRenewLeases runs the tryAcquireOrRenew test table with
// "leases" as the lock resource type.
func TestTryAcquireOrRenewLeases(t *testing.T) {
	testTryAcquireOrRenew(t, "leases")
}
   360  
   361  func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
   362  	holderIdentity := "foo"
   363  	leaseDurationSeconds := int32(10)
   364  	leaseTransitions := int32(1)
   365  	oldSpec := coordinationv1.LeaseSpec{
   366  		HolderIdentity:       &holderIdentity,
   367  		LeaseDurationSeconds: &leaseDurationSeconds,
   368  		AcquireTime:          &metav1.MicroTime{Time: time.Now()},
   369  		RenewTime:            &metav1.MicroTime{Time: time.Now()},
   370  		LeaseTransitions:     &leaseTransitions,
   371  	}
   372  
   373  	oldRecord := rl.LeaseSpecToLeaderElectionRecord(&oldSpec)
   374  	newSpec := rl.LeaderElectionRecordToLeaseSpec(oldRecord)
   375  
   376  	if !equality.Semantic.DeepEqual(oldSpec, newSpec) {
   377  		t.Errorf("diff: %v", cmp.Diff(oldSpec, newSpec))
   378  	}
   379  
   380  	newRecord := rl.LeaseSpecToLeaderElectionRecord(&newSpec)
   381  
   382  	if !equality.Semantic.DeepEqual(oldRecord, newRecord) {
   383  		t.Errorf("diff: %v", cmp.Diff(oldRecord, newRecord))
   384  	}
   385  }
   386  
   387  func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
   388  	var err error
   389  	switch objectType {
   390  	case "leases":
   391  		ret, err = json.Marshal(ler)
   392  		if err != nil {
   393  			t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
   394  		}
   395  	default:
   396  		t.Fatal("unexpected objType:" + objectType)
   397  	}
   398  	return
   399  }
   400  
// testReleaseLease is a table-driven test of LeaderElector.release for the
// lock kind named by objectType (only "leases" is wired up below).
//
// Each case first calls tryAcquireOrRenew against a fake clientset and waits
// for the OnNewLeader callback, then calls release and checks the resulting
// holder identity, the number of API calls, the LeaderTransitions counter,
// the leader reported after release and the recorded events.
func testReleaseLease(t *testing.T, objectType string) {
	tests := []struct {
		name           string
		observedRecord rl.LeaderElectionRecord
		observedTime   time.Time
		reactors       []Reactor
		expectedEvents []string

		expectSuccess    bool
		transitionLeader bool
		outHolder        string
	}{
		{
			name: "release acquired lock from no object",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.CreateAction).GetObject(), nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						return true, action.(fakeclient.UpdateAction).GetObject(), nil
					},
				},
			},
			expectSuccess: true,
			outHolder:     "",
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// OnNewLeader is called async so we have to wait for it.
			var wg sync.WaitGroup
			wg.Add(1)
			var reportedLeader string
			var lock rl.Interface

			objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
			recorder := record.NewFakeRecorder(100)
			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: recorder,
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			// Catch-all registered last: any action not handled by the
			// case's reactors is reported as a test error.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})

			switch objectType {
			case "leases":
				lock = &rl.LeaseLock{
					LeaseMeta:  objectMeta,
					LockConfig: resourceLockConfig,
					Client:     c.CoordinationV1(),
				}
			default:
				t.Fatalf("Unknown objectType: %v", objectType)
			}

			lec := LeaderElectionConfig{
				Lock:          lock,
				LeaseDuration: 10 * time.Second,
				Callbacks: LeaderCallbacks{
					OnNewLeader: func(l string) {
						defer wg.Done()
						reportedLeader = l
					},
				},
			}
			observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
			le := &LeaderElector{
				config:            lec,
				observedRecord:    test.observedRecord,
				observedRawRecord: observedRawRecord,
				observedTime:      test.observedTime,
				clock:             clock.RealClock{},
				metrics:           globalMetricsFactory.newLeaderMetrics(),
			}
			// The lock has to be acquired before it can be released.
			if !le.tryAcquireOrRenew(context.Background()) {
				t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
			}

			le.maybeReportTransition()

			// Wait for a response to the leader transition, and add 1 so that we can track the final transition.
			wg.Wait()
			wg.Add(1)

			if test.expectSuccess != le.release() {
				t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
			}

			// Timestamps vary between runs, so clear them before inspecting the record.
			le.observedRecord.AcquireTime = metav1.Time{}
			le.observedRecord.RenewTime = metav1.Time{}
			if le.observedRecord.HolderIdentity != test.outHolder {
				t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
			}
			if len(test.reactors) != len(c.Actions()) {
				t.Errorf("wrong number of api interactions")
			}
			if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
				t.Errorf("leader should have transitioned but did not")
			}
			if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
				t.Errorf("leader should not have transitioned but did")
			}
			le.maybeReportTransition()
			wg.Wait()
			if reportedLeader != test.outHolder {
				t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
			}
			assertEqualEvents(t, test.expectedEvents, recorder.Events)
		})
	}
}
   534  
// TestReleaseLeaseLeases runs the release test table with "leases" as the
// lock resource type.
func TestReleaseLeaseLeases(t *testing.T) {
	testReleaseLease(t, "leases")
}
   539  
// TestReleaseOnCancellation_Leases runs the release-on-cancellation scenarios
// with "leases" as the lock resource type.
func TestReleaseOnCancellation_Leases(t *testing.T) {
	testReleaseOnCancellation(t, "leases")
}
   543  
// testReleaseOnCancellation runs a LeaderElector with ReleaseOnCancel enabled
// against a fake clientset, cancels its context while a reactor is blocking an
// in-flight call, and then waits for a later update (signalled through
// onRelease) before checking the recorded events.
//
// The test and the reactors coordinate through four channels:
//   - onNewLeader is closed by the OnStartedLeading callback;
//   - onRenewCalled is closed by a reactor just before it blocks;
//   - onRenewResume is closed by the test after cancelling, unblocking that reactor;
//   - onRelease is closed by the update reactor on the update each case
//     treats as the release.
func testReleaseOnCancellation(t *testing.T, objectType string) {
	var (
		onNewLeader   = make(chan struct{})
		onRenewCalled = make(chan struct{})
		onRenewResume = make(chan struct{})
		onRelease     = make(chan struct{})

		lockObj runtime.Object
		gets    int
		updates int
		wg      sync.WaitGroup
	)
	// resetVars gives every subtest fresh channels and counters, since the
	// reactor closures below capture these variables.
	resetVars := func() {
		onNewLeader = make(chan struct{})
		onRenewCalled = make(chan struct{})
		onRenewResume = make(chan struct{})
		onRelease = make(chan struct{})

		lockObj = nil
		gets = 0
		updates = 0
	}
	lec := LeaderElectionConfig{
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 2 * time.Second,
		RetryPeriod:   1 * time.Second,

		// This is what we're testing
		ReleaseOnCancel: true,

		Callbacks: LeaderCallbacks{
			OnNewLeader:      func(identity string) {},
			OnStoppedLeading: func() {},
			OnStartedLeading: func(context.Context) {
				close(onNewLeader)
			},
		},
	}

	tests := []struct {
		name           string
		reactors       []Reactor
		expectedEvents []string
	}{
		{
			name: "release acquired lock on cancellation of update",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						gets++
						if lockObj != nil {
							return true, lockObj, nil
						}
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						lockObj = action.(fakeclient.CreateAction).GetObject()
						return true, lockObj, nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						updates++
						// Skip initial two fast path renews
						// (updates 1 and 3 are failed with context.Canceled).
						if updates%2 == 1 && updates < 5 {
							return true, nil, context.Canceled
						}

						// The fourth update blocks until the test has cancelled the context
						// and then returns our canceled error.
						// FakeClient doesn't do anything with the context so we're doing this ourselves
						if updates == 4 {
							close(onRenewCalled)
							<-onRenewResume
							return true, nil, context.Canceled
						} else if updates == 5 {
							// We update the lock after the cancellation to release it
							// This wg is to avoid the data race on lockObj
							defer wg.Done()
							close(onRelease)
						}

						lockObj = action.(fakeclient.UpdateAction).GetObject()
						return true, lockObj, nil
					},
				},
			},
			expectedEvents: []string{
				"Normal LeaderElection baz became leader",
				"Normal LeaderElection baz stopped leading",
			},
		},
		{
			name: "release acquired lock on cancellation of get",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						gets++
						if lockObj != nil {
							// Third and more get (first create, second renew) should return our canceled error
							// FakeClient doesn't do anything with the context so we're doing this ourselves
							if gets >= 3 {
								close(onRenewCalled)
								<-onRenewResume
								return true, nil, context.Canceled
							}
							return true, lockObj, nil
						}
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						lockObj = action.(fakeclient.CreateAction).GetObject()
						return true, lockObj, nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						updates++
						// Always skip fast path renew
						// (every odd-numbered update is failed with context.Canceled).
						if updates%2 == 1 {
							return true, nil, context.Canceled
						}
						// The fourth update is treated as the one that releases the lock
						if updates == 4 {
							// We update the lock after the cancellation to release it
							// This wg is to avoid the data race on lockObj
							defer wg.Done()
							close(onRelease)
						}

						lockObj = action.(fakeclient.UpdateAction).GetObject()
						return true, lockObj, nil
					},
				},
			},
			expectedEvents: []string{
				"Normal LeaderElection baz became leader",
				"Normal LeaderElection baz stopped leading",
			},
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			// Matched by the wg.Done() in the update reactor's release branch.
			wg.Add(1)
			resetVars()

			recorder := record.NewFakeRecorder(100)
			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: recorder,
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			// Catch-all registered last: any action not handled by the
			// case's reactors is reported as a test error.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})
			lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
			if err != nil {
				t.Fatal("resourcelock.New() = ", err)
			}

			lec.Lock = lock
			elector, err := NewLeaderElector(lec)
			if err != nil {
				t.Fatal("Failed to create leader elector: ", err)
			}

			ctx, cancel := context.WithCancel(context.Background())

			go elector.Run(ctx)

			// Wait for us to become the leader
			select {
			case <-onNewLeader:
			case <-time.After(10 * time.Second):
				t.Fatal("failed to become the leader")
			}

			// Wait for the reactor to signal that it is blocking the in-flight call
			select {
			case <-onRenewCalled:
			case <-time.After(10 * time.Second):
				t.Fatal("the elector failed to renew the lock")
			}

			// Cancel the context - stopping the elector while
			// it's running
			cancel()

			// Resume the tryAcquireOrRenew call to return the cancellation
			// which should trigger the release flow
			close(onRenewResume)

			select {
			case <-onRelease:
			case <-time.After(10 * time.Second):
				t.Fatal("the lock was not released")
			}
			// Let the releasing update reactor finish before reading shared state.
			wg.Wait()
			assertEqualEvents(t, test.expectedEvents, recorder.Events)
		})
	}
}
   767  
   768  func TestLeaderElectionConfigValidation(t *testing.T) {
   769  	resourceLockConfig := rl.ResourceLockConfig{
   770  		Identity: "baz",
   771  	}
   772  
   773  	lock := &rl.LeaseLock{
   774  		LockConfig: resourceLockConfig,
   775  	}
   776  
   777  	lec := LeaderElectionConfig{
   778  		Lock:          lock,
   779  		LeaseDuration: 15 * time.Second,
   780  		RenewDeadline: 2 * time.Second,
   781  		RetryPeriod:   1 * time.Second,
   782  
   783  		ReleaseOnCancel: true,
   784  
   785  		Callbacks: LeaderCallbacks{
   786  			OnNewLeader:      func(identity string) {},
   787  			OnStoppedLeading: func() {},
   788  			OnStartedLeading: func(context.Context) {},
   789  		},
   790  	}
   791  
   792  	_, err := NewLeaderElector(lec)
   793  	assert.NoError(t, err)
   794  
   795  	// Invalid lock identity
   796  	resourceLockConfig.Identity = ""
   797  	lock.LockConfig = resourceLockConfig
   798  	lec.Lock = lock
   799  	_, err = NewLeaderElector(lec)
   800  	assert.Error(t, err, fmt.Errorf("Lock identity is empty"))
   801  }
   802  
   803  func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
   804  	c := time.After(wait.ForeverTestTimeout)
   805  	for _, e := range expected {
   806  		select {
   807  		case a := <-actual:
   808  			if e != a {
   809  				t.Errorf("Expected event %q, got %q", e, a)
   810  				return
   811  			}
   812  		case <-c:
   813  			t.Errorf("Expected event %q, got nothing", e)
   814  			// continue iterating to print all expected events
   815  		}
   816  	}
   817  	for {
   818  		select {
   819  		case a := <-actual:
   820  			t.Errorf("Unexpected event: %q", a)
   821  		default:
   822  			return // No more events, as expected.
   823  		}
   824  	}
   825  }
   826  
   827  func TestFastPathLeaderElection(t *testing.T) {
   828  	objectType := "leases"
   829  	var (
   830  		lockObj    runtime.Object
   831  		updates    int
   832  		lockOps    []string
   833  		cancelFunc func()
   834  	)
   835  	resetVars := func() {
   836  		lockObj = nil
   837  		updates = 0
   838  		lockOps = []string{}
   839  		cancelFunc = nil
   840  	}
   841  	lec := LeaderElectionConfig{
   842  		LeaseDuration: 15 * time.Second,
   843  		RenewDeadline: 2 * time.Second,
   844  		RetryPeriod:   1 * time.Second,
   845  
   846  		Callbacks: LeaderCallbacks{
   847  			OnNewLeader:      func(identity string) {},
   848  			OnStoppedLeading: func() {},
   849  			OnStartedLeading: func(context.Context) {
   850  			},
   851  		},
   852  	}
   853  
   854  	tests := []struct {
   855  		name            string
   856  		reactors        []Reactor
   857  		expectedLockOps []string
   858  	}{
   859  		{
   860  			name: "Exercise fast path after lock acquired",
   861  			reactors: []Reactor{
   862  				{
   863  					verb:       "get",
   864  					objectType: objectType,
   865  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   866  						lockOps = append(lockOps, "get")
   867  						if lockObj != nil {
   868  							return true, lockObj, nil
   869  						}
   870  						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
   871  					},
   872  				},
   873  				{
   874  					verb:       "create",
   875  					objectType: objectType,
   876  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   877  						lockOps = append(lockOps, "create")
   878  						lockObj = action.(fakeclient.CreateAction).GetObject()
   879  						return true, lockObj, nil
   880  					},
   881  				},
   882  				{
   883  					verb:       "update",
   884  					objectType: objectType,
   885  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   886  						updates++
   887  						lockOps = append(lockOps, "update")
   888  						if updates == 2 {
   889  							cancelFunc()
   890  						}
   891  						lockObj = action.(fakeclient.UpdateAction).GetObject()
   892  						return true, lockObj, nil
   893  					},
   894  				},
   895  			},
   896  			expectedLockOps: []string{"get", "create", "update", "update"},
   897  		},
   898  		{
   899  			name: "Fallback to slow path after fast path fails",
   900  			reactors: []Reactor{
   901  				{
   902  					verb:       "get",
   903  					objectType: objectType,
   904  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   905  						lockOps = append(lockOps, "get")
   906  						if lockObj != nil {
   907  							return true, lockObj, nil
   908  						}
   909  						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
   910  					},
   911  				},
   912  				{
   913  					verb:       "create",
   914  					objectType: objectType,
   915  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   916  						lockOps = append(lockOps, "create")
   917  						lockObj = action.(fakeclient.CreateAction).GetObject()
   918  						return true, lockObj, nil
   919  					},
   920  				},
   921  				{
   922  					verb:       "update",
   923  					objectType: objectType,
   924  					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
   925  						updates++
   926  						lockOps = append(lockOps, "update")
   927  						switch updates {
   928  						case 2:
   929  							return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
   930  						case 4:
   931  							cancelFunc()
   932  						}
   933  						lockObj = action.(fakeclient.UpdateAction).GetObject()
   934  						return true, lockObj, nil
   935  					},
   936  				},
   937  			},
   938  			expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
   939  		},
   940  	}
   941  
   942  	for i := range tests {
   943  		test := &tests[i]
   944  		t.Run(test.name, func(t *testing.T) {
   945  			resetVars()
   946  
   947  			recorder := record.NewFakeRecorder(100)
   948  			resourceLockConfig := rl.ResourceLockConfig{
   949  				Identity:      "baz",
   950  				EventRecorder: recorder,
   951  			}
   952  			c := &fake.Clientset{}
   953  			for _, reactor := range test.reactors {
   954  				c.AddReactor(reactor.verb, objectType, reactor.reaction)
   955  			}
   956  			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
   957  				t.Errorf("unreachable action. testclient called too many times: %+v", action)
   958  				return true, nil, fmt.Errorf("unreachable action")
   959  			})
   960  			lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
   961  			if err != nil {
   962  				t.Fatal("resourcelock.New() = ", err)
   963  			}
   964  
   965  			lec.Lock = lock
   966  			elector, err := NewLeaderElector(lec)
   967  			if err != nil {
   968  				t.Fatal("Failed to create leader elector: ", err)
   969  			}
   970  
   971  			ctx, cancel := context.WithCancel(context.Background())
   972  			cancelFunc = cancel
   973  
   974  			elector.Run(ctx)
   975  			assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
   976  		})
   977  	}
   978  }
   979  

View as plain text