package couchctl

import (
	"context"
	"fmt"
	"sync"
	"time"

	"edge-infra.dev/pkg/lib/fog"
	"github.com/go-logr/logr"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// PersistenceLeaderElector reports whether the current process is the leader.
type PersistenceLeaderElector interface {
	IsLeader() bool
}

// PersistenceLeaderElectorFunc adapts a plain func() bool so it can be used
// as a PersistenceLeaderElector.
type PersistenceLeaderElectorFunc func() bool

// IsLeader calls the underlying function and returns its result.
func (f PersistenceLeaderElectorFunc) IsLeader() bool {
	return f()
}

// Compile-time check that *LeaderElection satisfies PersistenceLeaderElector.
var _ PersistenceLeaderElector = (*LeaderElection)(nil)

// LeaderElection tracks whether this instance (identified by id) is the
// holder of a Lease-based leader election lock.
type LeaderElection struct {
	// mu guards leader.
	mu sync.RWMutex
	// Lock is the resource lock handed to the leader election loop.
	Lock resourcelock.Interface
	log  logr.Logger
	// id is this participant's identity; it is compared against the identity
	// reported by the election callbacks.
	id string
	// leader is the most recently recorded leadership state of this instance.
	leader bool
}

// NewLeaderElection builds a LeaderElection backed by a Lease lock with the
// given name in the given namespace, using id as this participant's identity.
// The logger is labelled with the same id.
func NewLeaderElection(cs *coordinationv1client.CoordinationV1Client, namespace, name, id string) *LeaderElection {
	return &LeaderElection{
		id: id,
		Lock: &resourcelock.LeaseLock{
			LeaseMeta: metav1.ObjectMeta{
				Name:      name,
				Namespace: namespace,
			},
			Client:     cs,
			LockConfig: resourcelock.ResourceLockConfig{Identity: id},
		},
		log: fog.WithLabels(fog.New(), "id", id),
	}
}

// IsLeader returns the last recorded leadership state. It is safe to call
// concurrently with the election callbacks, including from inside the
// callback passed to OnNewLeader.
func (l *LeaderElection) IsLeader() bool {
	l.mu.RLock()
	defer l.mu.RUnlock()
	return l.leader
}

// setLeader records the leadership state under the write lock.
func (l *LeaderElection) setLeader(leader bool) {
	l.mu.Lock()
	l.leader = leader
	l.mu.Unlock()
}

// OnNewLeader runs the leader election loop via leaderelection.RunOrDie using
// l.Lock, with a 15s lease duration, 10s renew deadline, 2s retry period and
// ReleaseOnCancel enabled.
//
// Each time the election reports a leader identity, the internal leader flag
// is set to whether that identity equals this instance's id and, if cb is
// non-nil, cb is invoked with the same value. cb invocations are serialized
// with respect to each other, but cb runs without the state lock held, so it
// may call IsLeader.
//
// When the election reports that this instance stopped leading, the flag is
// reset to false; cb is not invoked for that event.
func (l *LeaderElection) OnNewLeader(ctx context.Context, cb func(leader bool)) {
	// notifyMu keeps "update state, notify, log" atomic per leader change
	// without holding l.mu across cb: sync.RWMutex is not reentrant, so a cb
	// that called IsLeader while l.mu was write-locked would deadlock.
	var notifyMu sync.Mutex

	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
		Lock:            l.Lock,
		ReleaseOnCancel: true,
		LeaseDuration:   15 * time.Second,
		RenewDeadline:   10 * time.Second,
		RetryPeriod:     2 * time.Second,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: func(_ context.Context) {
				l.log.Info("leader election started")
			},
			OnStoppedLeading: func() {
				// Without this reset IsLeader would keep reporting true after
				// the lease has been lost or released.
				l.setLeader(false)
				l.log.Info("leader election stopped")
			},
			OnNewLeader: func(identity string) {
				notifyMu.Lock()
				defer notifyMu.Unlock()

				leader := identity == l.id
				l.setLeader(leader)
				if cb != nil {
					cb(leader)
				}
				l.log.Info(fmt.Sprintf("is new leader: %v", leader))
			},
		},
	})
}