...

Source file src/edge-infra.dev/pkg/edge/datasync/controllers/couchctl/leader_election.go

Documentation: edge-infra.dev/pkg/edge/datasync/controllers/couchctl

     1  package couchctl
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"sync"
     7  	"time"
     8  
     9  	"edge-infra.dev/pkg/lib/fog"
    10  
    11  	"github.com/go-logr/logr"
    12  
    13  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    14  	coordinationv1client "k8s.io/client-go/kubernetes/typed/coordination/v1"
    15  	"k8s.io/client-go/tools/leaderelection"
    16  	"k8s.io/client-go/tools/leaderelection/resourcelock"
    17  )
    18  
    19  type PersistenceLeaderElector interface {
    20  	IsLeader() bool
    21  }
    22  
    23  type PersistenceLeaderElectorFunc func() bool
    24  
    25  func (r PersistenceLeaderElectorFunc) IsLeader() bool {
    26  	return r()
    27  }
    28  
    29  type LeaderElection struct {
    30  	mu     sync.RWMutex
    31  	Lock   resourcelock.Interface
    32  	log    logr.Logger
    33  	id     string
    34  	leader bool
    35  }
    36  
    37  func NewLeaderElection(cs *coordinationv1client.CoordinationV1Client, namespace, name, id string) *LeaderElection {
    38  	return &LeaderElection{
    39  		id: id,
    40  		Lock: &resourcelock.LeaseLock{
    41  			LeaseMeta: metav1.ObjectMeta{
    42  				Name:      name,
    43  				Namespace: namespace,
    44  			},
    45  			Client:     cs,
    46  			LockConfig: resourcelock.ResourceLockConfig{Identity: id},
    47  		},
    48  		log: fog.WithLabels(fog.New(), "id", id),
    49  	}
    50  }
    51  
    52  func (l *LeaderElection) IsLeader() bool {
    53  	l.mu.RLock()
    54  	defer l.mu.RUnlock()
    55  	return l.leader
    56  }
    57  
    58  func (l *LeaderElection) OnNewLeader(ctx context.Context, cb func(leader bool)) {
    59  	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
    60  		Lock:            l.Lock,
    61  		ReleaseOnCancel: true,
    62  		LeaseDuration:   15 * time.Second,
    63  		RenewDeadline:   10 * time.Second,
    64  		RetryPeriod:     2 * time.Second,
    65  		Callbacks: leaderelection.LeaderCallbacks{
    66  			OnStartedLeading: func(_ context.Context) {
    67  				l.log.Info("leader election started")
    68  			},
    69  			OnStoppedLeading: func() {
    70  				l.log.Info("leader election stopped")
    71  			},
    72  			OnNewLeader: func(identity string) {
    73  				l.mu.Lock()
    74  				defer l.mu.Unlock()
    75  				leader := identity == l.id
    76  				l.leader = leader
    77  				if cb != nil {
    78  					cb(leader)
    79  				}
    80  				l.log.Info(fmt.Sprintf("is new leader: %v", leader))
    81  			},
    82  		},
    83  	})
    84  }
    85  

View as plain text