...
1 package couchctl
2
3 import (
4 "context"
5 "fmt"
6 "sync"
7 "time"
8
9 "edge-infra.dev/pkg/lib/fog"
10
11 "github.com/go-logr/logr"
12
13 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
14 coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
15 "k8s.io/client-go/tools/leaderelection"
16 "k8s.io/client-go/tools/leaderelection/resourcelock"
17 )
18
19 type PersistenceLeaderElector interface {
20 IsLeader() bool
21 }
22
23 type PersistenceLeaderElectorFunc func() bool
24
25 func (r PersistenceLeaderElectorFunc) IsLeader() bool {
26 return r()
27 }
28
29 type LeaderElection struct {
30 mu sync.RWMutex
31 Lock resourcelock.Interface
32 log logr.Logger
33 id string
34 leader bool
35 }
36
37 func NewLeaderElection(cs *coordinationv1client.CoordinationV1Client, namespace, name, id string) *LeaderElection {
38 return &LeaderElection{
39 id: id,
40 Lock: &resourcelock.LeaseLock{
41 LeaseMeta: metav1.ObjectMeta{
42 Name: name,
43 Namespace: namespace,
44 },
45 Client: cs,
46 LockConfig: resourcelock.ResourceLockConfig{Identity: id},
47 },
48 log: fog.WithLabels(fog.New(), "id", id),
49 }
50 }
51
52 func (l *LeaderElection) IsLeader() bool {
53 l.mu.RLock()
54 defer l.mu.RUnlock()
55 return l.leader
56 }
57
58 func (l *LeaderElection) OnNewLeader(ctx context.Context, cb func(leader bool)) {
59 leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
60 Lock: l.Lock,
61 ReleaseOnCancel: true,
62 LeaseDuration: 15 * time.Second,
63 RenewDeadline: 10 * time.Second,
64 RetryPeriod: 2 * time.Second,
65 Callbacks: leaderelection.LeaderCallbacks{
66 OnStartedLeading: func(_ context.Context) {
67 l.log.Info("leader election started")
68 },
69 OnStoppedLeading: func() {
70 l.log.Info("leader election stopped")
71 },
72 OnNewLeader: func(identity string) {
73 l.mu.Lock()
74 defer l.mu.Unlock()
75 leader := identity == l.id
76 l.leader = leader
77 if cb != nil {
78 cb(leader)
79 }
80 l.log.Info(fmt.Sprintf("is new leader: %v", leader))
81 },
82 },
83 })
84 }
85
View as plain text