1
16
17 package leaderelection
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "sync"
24 "testing"
25 "time"
26
27 "github.com/google/go-cmp/cmp"
28 coordinationv1 "k8s.io/api/coordination/v1"
29 corev1 "k8s.io/api/core/v1"
30 "k8s.io/apimachinery/pkg/api/equality"
31 "k8s.io/apimachinery/pkg/api/errors"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 "k8s.io/apimachinery/pkg/runtime"
34 "k8s.io/apimachinery/pkg/util/wait"
35 "k8s.io/client-go/kubernetes/fake"
36 fakeclient "k8s.io/client-go/testing"
37 rl "k8s.io/client-go/tools/leaderelection/resourcelock"
38 "k8s.io/client-go/tools/record"
39 "k8s.io/utils/clock"
40
41 "github.com/stretchr/testify/assert"
42 )
43
44 func createLockObject(t *testing.T, objectType, namespace, name string, record *rl.LeaderElectionRecord) (obj runtime.Object) {
45 objectMeta := metav1.ObjectMeta{
46 Namespace: namespace,
47 Name: name,
48 }
49 if record != nil {
50 recordBytes, _ := json.Marshal(record)
51 objectMeta.Annotations = map[string]string{
52 rl.LeaderElectionRecordAnnotationKey: string(recordBytes),
53 }
54 }
55 switch objectType {
56 case "endpoints":
57 obj = &corev1.Endpoints{ObjectMeta: objectMeta}
58 case "configmaps":
59 obj = &corev1.ConfigMap{ObjectMeta: objectMeta}
60 case "leases":
61 var spec coordinationv1.LeaseSpec
62 if record != nil {
63 spec = rl.LeaderElectionRecordToLeaseSpec(record)
64 }
65 obj = &coordinationv1.Lease{ObjectMeta: objectMeta, Spec: spec}
66 default:
67 t.Fatal("unexpected objType:" + objectType)
68 }
69 return
70 }
71
72 type Reactor struct {
73 verb string
74 objectType string
75 reaction fakeclient.ReactionFunc
76 }
77
78 func testTryAcquireOrRenew(t *testing.T, objectType string) {
79 clock := clock.RealClock{}
80 future := clock.Now().Add(1000 * time.Hour)
81 past := clock.Now().Add(-1000 * time.Hour)
82
83 tests := []struct {
84 name string
85 observedRecord rl.LeaderElectionRecord
86 observedTime time.Time
87 retryAfter time.Duration
88 reactors []Reactor
89 expectedEvents []string
90
91 expectSuccess bool
92 transitionLeader bool
93 outHolder string
94 }{
95 {
96 name: "acquire from no object",
97 reactors: []Reactor{
98 {
99 verb: "get",
100 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
101 return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
102 },
103 },
104 {
105 verb: "create",
106 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
107 return true, action.(fakeclient.CreateAction).GetObject(), nil
108 },
109 },
110 },
111 expectSuccess: true,
112 outHolder: "baz",
113 },
114 {
115 name: "acquire from object without annotations",
116 reactors: []Reactor{
117 {
118 verb: "get",
119 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
120 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), nil), nil
121 },
122 },
123 {
124 verb: "update",
125 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
126 return true, action.(fakeclient.CreateAction).GetObject(), nil
127 },
128 },
129 },
130 expectSuccess: true,
131 transitionLeader: true,
132 outHolder: "baz",
133 },
134 {
135 name: "acquire from led object with the lease duration seconds",
136 reactors: []Reactor{
137 {
138 verb: "get",
139 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
140 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
141 },
142 },
143 {
144 verb: "get",
145 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
146 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing", LeaseDurationSeconds: 3}), nil
147 },
148 },
149 {
150 verb: "update",
151 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
152 return true, action.(fakeclient.CreateAction).GetObject(), nil
153 },
154 },
155 },
156 retryAfter: 3 * time.Second,
157 expectSuccess: true,
158 transitionLeader: true,
159 outHolder: "baz",
160 },
161 {
162 name: "acquire from unled object",
163 reactors: []Reactor{
164 {
165 verb: "get",
166 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
167 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{}), nil
168 },
169 },
170 {
171 verb: "update",
172 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
173 return true, action.(fakeclient.CreateAction).GetObject(), nil
174 },
175 },
176 },
177
178 expectSuccess: true,
179 transitionLeader: true,
180 outHolder: "baz",
181 },
182 {
183 name: "acquire from led, unacked object",
184 reactors: []Reactor{
185 {
186 verb: "get",
187 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
188 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
189 },
190 },
191 {
192 verb: "update",
193 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
194 return true, action.(fakeclient.CreateAction).GetObject(), nil
195 },
196 },
197 },
198 observedRecord: rl.LeaderElectionRecord{HolderIdentity: "bing"},
199 observedTime: past,
200
201 expectSuccess: true,
202 transitionLeader: true,
203 outHolder: "baz",
204 },
205 {
206 name: "acquire from empty led, acked object",
207 reactors: []Reactor{
208 {
209 verb: "get",
210 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
211 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: ""}), nil
212 },
213 },
214 {
215 verb: "update",
216 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
217 return true, action.(fakeclient.CreateAction).GetObject(), nil
218 },
219 },
220 },
221 observedTime: future,
222
223 expectSuccess: true,
224 transitionLeader: true,
225 outHolder: "baz",
226 },
227 {
228 name: "don't acquire from led, acked object",
229 reactors: []Reactor{
230 {
231 verb: "get",
232 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
233 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "bing"}), nil
234 },
235 },
236 },
237 observedTime: future,
238
239 expectSuccess: false,
240 outHolder: "bing",
241 },
242 {
243 name: "renew already acquired object",
244 reactors: []Reactor{
245 {
246 verb: "get",
247 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
248 return true, createLockObject(t, objectType, action.GetNamespace(), action.(fakeclient.GetAction).GetName(), &rl.LeaderElectionRecord{HolderIdentity: "baz"}), nil
249 },
250 },
251 {
252 verb: "update",
253 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
254 return true, action.(fakeclient.CreateAction).GetObject(), nil
255 },
256 },
257 },
258 observedTime: future,
259 observedRecord: rl.LeaderElectionRecord{HolderIdentity: "baz"},
260
261 expectSuccess: true,
262 outHolder: "baz",
263 },
264 }
265
266 for i := range tests {
267 test := &tests[i]
268 t.Run(test.name, func(t *testing.T) {
269
270 var wg sync.WaitGroup
271 wg.Add(1)
272 var reportedLeader string
273 var lock rl.Interface
274
275 objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
276 recorder := record.NewFakeRecorder(100)
277 resourceLockConfig := rl.ResourceLockConfig{
278 Identity: "baz",
279 EventRecorder: recorder,
280 }
281 c := &fake.Clientset{}
282 for _, reactor := range test.reactors {
283 c.AddReactor(reactor.verb, objectType, reactor.reaction)
284 }
285 c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
286 t.Errorf("unreachable action. testclient called too many times: %+v", action)
287 return true, nil, fmt.Errorf("unreachable action")
288 })
289
290 switch objectType {
291 case "leases":
292 lock = &rl.LeaseLock{
293 LeaseMeta: objectMeta,
294 LockConfig: resourceLockConfig,
295 Client: c.CoordinationV1(),
296 }
297 default:
298 t.Fatalf("Unknown objectType: %v", objectType)
299 }
300
301 lec := LeaderElectionConfig{
302 Lock: lock,
303 LeaseDuration: 10 * time.Second,
304 Callbacks: LeaderCallbacks{
305 OnNewLeader: func(l string) {
306 defer wg.Done()
307 reportedLeader = l
308 },
309 },
310 }
311 observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
312 le := &LeaderElector{
313 config: lec,
314 observedRecord: test.observedRecord,
315 observedRawRecord: observedRawRecord,
316 observedTime: test.observedTime,
317 clock: clock,
318 metrics: globalMetricsFactory.newLeaderMetrics(),
319 }
320 if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
321 if test.retryAfter != 0 {
322 time.Sleep(test.retryAfter)
323 if test.expectSuccess != le.tryAcquireOrRenew(context.Background()) {
324 t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
325 }
326 } else {
327 t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", !test.expectSuccess)
328 }
329 }
330
331 le.observedRecord.AcquireTime = metav1.Time{}
332 le.observedRecord.RenewTime = metav1.Time{}
333 if le.observedRecord.HolderIdentity != test.outHolder {
334 t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
335 }
336 if len(test.reactors) != len(c.Actions()) {
337 t.Errorf("wrong number of api interactions")
338 }
339 if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
340 t.Errorf("leader should have transitioned but did not")
341 }
342 if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
343 t.Errorf("leader should not have transitioned but did")
344 }
345
346 le.maybeReportTransition()
347 wg.Wait()
348 if reportedLeader != test.outHolder {
349 t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
350 }
351 assertEqualEvents(t, test.expectedEvents, recorder.Events)
352 })
353 }
354 }
355
356
357 func TestTryAcquireOrRenewLeases(t *testing.T) {
358 testTryAcquireOrRenew(t, "leases")
359 }
360
361 func TestLeaseSpecToLeaderElectionRecordRoundTrip(t *testing.T) {
362 holderIdentity := "foo"
363 leaseDurationSeconds := int32(10)
364 leaseTransitions := int32(1)
365 oldSpec := coordinationv1.LeaseSpec{
366 HolderIdentity: &holderIdentity,
367 LeaseDurationSeconds: &leaseDurationSeconds,
368 AcquireTime: &metav1.MicroTime{Time: time.Now()},
369 RenewTime: &metav1.MicroTime{Time: time.Now()},
370 LeaseTransitions: &leaseTransitions,
371 }
372
373 oldRecord := rl.LeaseSpecToLeaderElectionRecord(&oldSpec)
374 newSpec := rl.LeaderElectionRecordToLeaseSpec(oldRecord)
375
376 if !equality.Semantic.DeepEqual(oldSpec, newSpec) {
377 t.Errorf("diff: %v", cmp.Diff(oldSpec, newSpec))
378 }
379
380 newRecord := rl.LeaseSpecToLeaderElectionRecord(&newSpec)
381
382 if !equality.Semantic.DeepEqual(oldRecord, newRecord) {
383 t.Errorf("diff: %v", cmp.Diff(oldRecord, newRecord))
384 }
385 }
386
387 func GetRawRecordOrDie(t *testing.T, objectType string, ler rl.LeaderElectionRecord) (ret []byte) {
388 var err error
389 switch objectType {
390 case "leases":
391 ret, err = json.Marshal(ler)
392 if err != nil {
393 t.Fatalf("lock %s get raw record %v failed: %v", objectType, ler, err)
394 }
395 default:
396 t.Fatal("unexpected objType:" + objectType)
397 }
398 return
399 }
400
401 func testReleaseLease(t *testing.T, objectType string) {
402 tests := []struct {
403 name string
404 observedRecord rl.LeaderElectionRecord
405 observedTime time.Time
406 reactors []Reactor
407 expectedEvents []string
408
409 expectSuccess bool
410 transitionLeader bool
411 outHolder string
412 }{
413 {
414 name: "release acquired lock from no object",
415 reactors: []Reactor{
416 {
417 verb: "get",
418 objectType: objectType,
419 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
420 return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
421 },
422 },
423 {
424 verb: "create",
425 objectType: objectType,
426 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
427 return true, action.(fakeclient.CreateAction).GetObject(), nil
428 },
429 },
430 {
431 verb: "update",
432 objectType: objectType,
433 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
434 return true, action.(fakeclient.UpdateAction).GetObject(), nil
435 },
436 },
437 },
438 expectSuccess: true,
439 outHolder: "",
440 },
441 }
442
443 for i := range tests {
444 test := &tests[i]
445 t.Run(test.name, func(t *testing.T) {
446
447 var wg sync.WaitGroup
448 wg.Add(1)
449 var reportedLeader string
450 var lock rl.Interface
451
452 objectMeta := metav1.ObjectMeta{Namespace: "foo", Name: "bar"}
453 recorder := record.NewFakeRecorder(100)
454 resourceLockConfig := rl.ResourceLockConfig{
455 Identity: "baz",
456 EventRecorder: recorder,
457 }
458 c := &fake.Clientset{}
459 for _, reactor := range test.reactors {
460 c.AddReactor(reactor.verb, objectType, reactor.reaction)
461 }
462 c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
463 t.Errorf("unreachable action. testclient called too many times: %+v", action)
464 return true, nil, fmt.Errorf("unreachable action")
465 })
466
467 switch objectType {
468 case "leases":
469 lock = &rl.LeaseLock{
470 LeaseMeta: objectMeta,
471 LockConfig: resourceLockConfig,
472 Client: c.CoordinationV1(),
473 }
474 default:
475 t.Fatalf("Unknown objectType: %v", objectType)
476 }
477
478 lec := LeaderElectionConfig{
479 Lock: lock,
480 LeaseDuration: 10 * time.Second,
481 Callbacks: LeaderCallbacks{
482 OnNewLeader: func(l string) {
483 defer wg.Done()
484 reportedLeader = l
485 },
486 },
487 }
488 observedRawRecord := GetRawRecordOrDie(t, objectType, test.observedRecord)
489 le := &LeaderElector{
490 config: lec,
491 observedRecord: test.observedRecord,
492 observedRawRecord: observedRawRecord,
493 observedTime: test.observedTime,
494 clock: clock.RealClock{},
495 metrics: globalMetricsFactory.newLeaderMetrics(),
496 }
497 if !le.tryAcquireOrRenew(context.Background()) {
498 t.Errorf("unexpected result of tryAcquireOrRenew: [succeeded=%v]", true)
499 }
500
501 le.maybeReportTransition()
502
503
504 wg.Wait()
505 wg.Add(1)
506
507 if test.expectSuccess != le.release() {
508 t.Errorf("unexpected result of release: [succeeded=%v]", !test.expectSuccess)
509 }
510
511 le.observedRecord.AcquireTime = metav1.Time{}
512 le.observedRecord.RenewTime = metav1.Time{}
513 if le.observedRecord.HolderIdentity != test.outHolder {
514 t.Errorf("expected holder:\n\t%+v\ngot:\n\t%+v", test.outHolder, le.observedRecord.HolderIdentity)
515 }
516 if len(test.reactors) != len(c.Actions()) {
517 t.Errorf("wrong number of api interactions")
518 }
519 if test.transitionLeader && le.observedRecord.LeaderTransitions != 1 {
520 t.Errorf("leader should have transitioned but did not")
521 }
522 if !test.transitionLeader && le.observedRecord.LeaderTransitions != 0 {
523 t.Errorf("leader should not have transitioned but did")
524 }
525 le.maybeReportTransition()
526 wg.Wait()
527 if reportedLeader != test.outHolder {
528 t.Errorf("reported leader was not the new leader. expected %q, got %q", test.outHolder, reportedLeader)
529 }
530 assertEqualEvents(t, test.expectedEvents, recorder.Events)
531 })
532 }
533 }
534
535
536 func TestReleaseLeaseLeases(t *testing.T) {
537 testReleaseLease(t, "leases")
538 }
539
540 func TestReleaseOnCancellation_Leases(t *testing.T) {
541 testReleaseOnCancellation(t, "leases")
542 }
543
// testReleaseOnCancellation runs a LeaderElector configured with
// ReleaseOnCancel against a fake clientset, cancels its context while a
// scripted get or update is blocked, and then expects one more update to close
// onRelease, followed by the expected recorder events.
//
// The channels and counters declared below are shared by the reactors of all
// test cases and are re-initialized by resetVars at the start of each subtest.
func testReleaseOnCancellation(t *testing.T, objectType string) {
	var (
		// onNewLeader is closed by the OnStartedLeading callback.
		onNewLeader = make(chan struct{})
		// onRenewCalled is closed by a reactor right before it blocks.
		onRenewCalled = make(chan struct{})
		// onRenewResume is closed by the test to unblock that reactor.
		onRenewResume = make(chan struct{})
		// onRelease is closed by the update reactor on the update that the
		// case designates as the final one (presumably the release).
		onRelease = make(chan struct{})

		// lockObj is the object last stored by a create/update reactor; nil
		// means "not created yet" and makes get return NotFound.
		lockObj runtime.Object
		gets    int
		updates int
		// wg is marked done by the same update that closes onRelease, once the
		// reactor function returns.
		wg sync.WaitGroup
	)
	resetVars := func() {
		onNewLeader = make(chan struct{})
		onRenewCalled = make(chan struct{})
		onRenewResume = make(chan struct{})
		onRelease = make(chan struct{})

		lockObj = nil
		gets = 0
		updates = 0
	}
	lec := LeaderElectionConfig{
		LeaseDuration: 15 * time.Second,
		RenewDeadline: 2 * time.Second,
		RetryPeriod:   1 * time.Second,

		// The behaviour under test.
		ReleaseOnCancel: true,

		Callbacks: LeaderCallbacks{
			OnNewLeader:      func(identity string) {},
			OnStoppedLeading: func() {},
			OnStartedLeading: func(context.Context) {
				close(onNewLeader)
			},
		},
	}

	tests := []struct {
		name           string
		reactors       []Reactor
		expectedEvents []string
	}{
		{
			name: "release acquired lock on cancellation of update",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						gets++
						if lockObj != nil {
							return true, lockObj, nil
						}
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						lockObj = action.(fakeclient.CreateAction).GetObject()
						return true, lockObj, nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						updates++
						// Updates 1 and 3 fail with context.Canceled without
						// touching lockObj.
						if updates%2 == 1 && updates < 5 {
							return true, nil, context.Canceled
						}
						// Update 4 signals the test, blocks until the test
						// closes onRenewResume, then also fails with
						// context.Canceled.
						if updates == 4 {
							close(onRenewCalled)
							<-onRenewResume
							return true, nil, context.Canceled
						} else if updates == 5 {
							// Update 5 signals onRelease and marks wg done when
							// this reactor returns.
							defer wg.Done()
							close(onRelease)
						}

						lockObj = action.(fakeclient.UpdateAction).GetObject()
						return true, lockObj, nil
					},
				},
			},
			expectedEvents: []string{
				"Normal LeaderElection baz became leader",
				"Normal LeaderElection baz stopped leading",
			},
		},
		{
			name: "release acquired lock on cancellation of get",
			reactors: []Reactor{
				{
					verb:       "get",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						gets++
						if lockObj != nil {
							// From the third get onwards (once the lock object
							// exists): signal the test, block until resumed,
							// then fail with context.Canceled.
							if gets >= 3 {
								close(onRenewCalled)
								<-onRenewResume
								return true, nil, context.Canceled
							}
							return true, lockObj, nil
						}
						return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
					},
				},
				{
					verb:       "create",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						lockObj = action.(fakeclient.CreateAction).GetObject()
						return true, lockObj, nil
					},
				},
				{
					verb:       "update",
					objectType: objectType,
					reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
						updates++
						// Every odd-numbered update fails with
						// context.Canceled.
						if updates%2 == 1 {
							return true, nil, context.Canceled
						}
						// Update 4 signals onRelease and marks wg done when
						// this reactor returns.
						if updates == 4 {
							defer wg.Done()
							close(onRelease)
						}

						lockObj = action.(fakeclient.UpdateAction).GetObject()
						return true, lockObj, nil
					},
				},
			},
			expectedEvents: []string{
				"Normal LeaderElection baz became leader",
				"Normal LeaderElection baz stopped leading",
			},
		},
	}

	for i := range tests {
		test := &tests[i]
		t.Run(test.name, func(t *testing.T) {
			wg.Add(1)
			resetVars()

			recorder := record.NewFakeRecorder(100)
			resourceLockConfig := rl.ResourceLockConfig{
				Identity:      "baz",
				EventRecorder: recorder,
			}
			c := &fake.Clientset{}
			for _, reactor := range test.reactors {
				c.AddReactor(reactor.verb, objectType, reactor.reaction)
			}
			// Catch-all: any action not handled by the scripted reactors fails
			// the test.
			c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
				t.Errorf("unreachable action. testclient called too many times: %+v", action)
				return true, nil, fmt.Errorf("unreachable action")
			})
			lock, err := rl.New(objectType, "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
			if err != nil {
				t.Fatal("resourcelock.New() = ", err)
			}

			// lec is shared by all subtests; only its Lock is swapped per case.
			lec.Lock = lock
			elector, err := NewLeaderElector(lec)
			if err != nil {
				t.Fatal("Failed to create leader elector: ", err)
			}

			ctx, cancel := context.WithCancel(context.Background())

			go elector.Run(ctx)

			// Wait for OnStartedLeading to fire.
			select {
			case <-onNewLeader:
			case <-time.After(10 * time.Second):
				t.Fatal("failed to become the leader")
			}

			// Wait until a reactor has signalled onRenewCalled; it is now
			// blocked on onRenewResume.
			select {
			case <-onRenewCalled:
			case <-time.After(10 * time.Second):
				t.Fatal("the elector failed to renew the lock")
			}

			// Cancel the context while the reactor is still blocked, so the
			// cancellation lands in the middle of that API call.
			cancel()

			// Let the blocked reactor return its context.Canceled error.
			close(onRenewResume)

			select {
			case <-onRelease:
			case <-time.After(10 * time.Second):
				t.Fatal("the lock was not released")
			}
			// Wait for the update reactor that closed onRelease to return
			// before checking the events.
			wg.Wait()
			assertEqualEvents(t, test.expectedEvents, recorder.Events)
		})
	}
}
767
768 func TestLeaderElectionConfigValidation(t *testing.T) {
769 resourceLockConfig := rl.ResourceLockConfig{
770 Identity: "baz",
771 }
772
773 lock := &rl.LeaseLock{
774 LockConfig: resourceLockConfig,
775 }
776
777 lec := LeaderElectionConfig{
778 Lock: lock,
779 LeaseDuration: 15 * time.Second,
780 RenewDeadline: 2 * time.Second,
781 RetryPeriod: 1 * time.Second,
782
783 ReleaseOnCancel: true,
784
785 Callbacks: LeaderCallbacks{
786 OnNewLeader: func(identity string) {},
787 OnStoppedLeading: func() {},
788 OnStartedLeading: func(context.Context) {},
789 },
790 }
791
792 _, err := NewLeaderElector(lec)
793 assert.NoError(t, err)
794
795
796 resourceLockConfig.Identity = ""
797 lock.LockConfig = resourceLockConfig
798 lec.Lock = lock
799 _, err = NewLeaderElector(lec)
800 assert.Error(t, err, fmt.Errorf("Lock identity is empty"))
801 }
802
803 func assertEqualEvents(t *testing.T, expected []string, actual <-chan string) {
804 c := time.After(wait.ForeverTestTimeout)
805 for _, e := range expected {
806 select {
807 case a := <-actual:
808 if e != a {
809 t.Errorf("Expected event %q, got %q", e, a)
810 return
811 }
812 case <-c:
813 t.Errorf("Expected event %q, got nothing", e)
814
815 }
816 }
817 for {
818 select {
819 case a := <-actual:
820 t.Errorf("Unexpected event: %q", a)
821 default:
822 return
823 }
824 }
825 }
826
827 func TestFastPathLeaderElection(t *testing.T) {
828 objectType := "leases"
829 var (
830 lockObj runtime.Object
831 updates int
832 lockOps []string
833 cancelFunc func()
834 )
835 resetVars := func() {
836 lockObj = nil
837 updates = 0
838 lockOps = []string{}
839 cancelFunc = nil
840 }
841 lec := LeaderElectionConfig{
842 LeaseDuration: 15 * time.Second,
843 RenewDeadline: 2 * time.Second,
844 RetryPeriod: 1 * time.Second,
845
846 Callbacks: LeaderCallbacks{
847 OnNewLeader: func(identity string) {},
848 OnStoppedLeading: func() {},
849 OnStartedLeading: func(context.Context) {
850 },
851 },
852 }
853
854 tests := []struct {
855 name string
856 reactors []Reactor
857 expectedLockOps []string
858 }{
859 {
860 name: "Exercise fast path after lock acquired",
861 reactors: []Reactor{
862 {
863 verb: "get",
864 objectType: objectType,
865 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
866 lockOps = append(lockOps, "get")
867 if lockObj != nil {
868 return true, lockObj, nil
869 }
870 return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
871 },
872 },
873 {
874 verb: "create",
875 objectType: objectType,
876 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
877 lockOps = append(lockOps, "create")
878 lockObj = action.(fakeclient.CreateAction).GetObject()
879 return true, lockObj, nil
880 },
881 },
882 {
883 verb: "update",
884 objectType: objectType,
885 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
886 updates++
887 lockOps = append(lockOps, "update")
888 if updates == 2 {
889 cancelFunc()
890 }
891 lockObj = action.(fakeclient.UpdateAction).GetObject()
892 return true, lockObj, nil
893 },
894 },
895 },
896 expectedLockOps: []string{"get", "create", "update", "update"},
897 },
898 {
899 name: "Fallback to slow path after fast path fails",
900 reactors: []Reactor{
901 {
902 verb: "get",
903 objectType: objectType,
904 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
905 lockOps = append(lockOps, "get")
906 if lockObj != nil {
907 return true, lockObj, nil
908 }
909 return true, nil, errors.NewNotFound(action.(fakeclient.GetAction).GetResource().GroupResource(), action.(fakeclient.GetAction).GetName())
910 },
911 },
912 {
913 verb: "create",
914 objectType: objectType,
915 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
916 lockOps = append(lockOps, "create")
917 lockObj = action.(fakeclient.CreateAction).GetObject()
918 return true, lockObj, nil
919 },
920 },
921 {
922 verb: "update",
923 objectType: objectType,
924 reaction: func(action fakeclient.Action) (handled bool, ret runtime.Object, err error) {
925 updates++
926 lockOps = append(lockOps, "update")
927 switch updates {
928 case 2:
929 return true, nil, errors.NewConflict(action.(fakeclient.UpdateAction).GetResource().GroupResource(), "fake conflict", nil)
930 case 4:
931 cancelFunc()
932 }
933 lockObj = action.(fakeclient.UpdateAction).GetObject()
934 return true, lockObj, nil
935 },
936 },
937 },
938 expectedLockOps: []string{"get", "create", "update", "update", "get", "update", "update"},
939 },
940 }
941
942 for i := range tests {
943 test := &tests[i]
944 t.Run(test.name, func(t *testing.T) {
945 resetVars()
946
947 recorder := record.NewFakeRecorder(100)
948 resourceLockConfig := rl.ResourceLockConfig{
949 Identity: "baz",
950 EventRecorder: recorder,
951 }
952 c := &fake.Clientset{}
953 for _, reactor := range test.reactors {
954 c.AddReactor(reactor.verb, objectType, reactor.reaction)
955 }
956 c.AddReactor("*", "*", func(action fakeclient.Action) (bool, runtime.Object, error) {
957 t.Errorf("unreachable action. testclient called too many times: %+v", action)
958 return true, nil, fmt.Errorf("unreachable action")
959 })
960 lock, err := rl.New("leases", "foo", "bar", c.CoreV1(), c.CoordinationV1(), resourceLockConfig)
961 if err != nil {
962 t.Fatal("resourcelock.New() = ", err)
963 }
964
965 lec.Lock = lock
966 elector, err := NewLeaderElector(lec)
967 if err != nil {
968 t.Fatal("Failed to create leader elector: ", err)
969 }
970
971 ctx, cancel := context.WithCancel(context.Background())
972 cancelFunc = cancel
973
974 elector.Run(ctx)
975 assert.Equal(t, test.expectedLockOps, lockOps, "Expected lock ops %q, got %q", test.expectedLockOps, lockOps)
976 })
977 }
978 }
979
View as plain text