1
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53 package leaderelection
54
55 import (
56 "bytes"
57 "context"
58 "fmt"
59 "sync"
60 "time"
61
62 "k8s.io/apimachinery/pkg/api/errors"
63 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
64 "k8s.io/apimachinery/pkg/util/runtime"
65 "k8s.io/apimachinery/pkg/util/wait"
66 rl "k8s.io/client-go/tools/leaderelection/resourcelock"
67 "k8s.io/klog/v2"
68 "k8s.io/utils/clock"
69 )
70
// JitterFactor is the jitter multiplier applied to RetryPeriod while trying to
// acquire the lease. NewLeaderElector also requires RenewDeadline to exceed
// RetryPeriod*JitterFactor.
const JitterFactor = 1.2
74
75
76 func NewLeaderElector(lec LeaderElectionConfig) (*LeaderElector, error) {
77 if lec.LeaseDuration <= lec.RenewDeadline {
78 return nil, fmt.Errorf("leaseDuration must be greater than renewDeadline")
79 }
80 if lec.RenewDeadline <= time.Duration(JitterFactor*float64(lec.RetryPeriod)) {
81 return nil, fmt.Errorf("renewDeadline must be greater than retryPeriod*JitterFactor")
82 }
83 if lec.LeaseDuration < 1 {
84 return nil, fmt.Errorf("leaseDuration must be greater than zero")
85 }
86 if lec.RenewDeadline < 1 {
87 return nil, fmt.Errorf("renewDeadline must be greater than zero")
88 }
89 if lec.RetryPeriod < 1 {
90 return nil, fmt.Errorf("retryPeriod must be greater than zero")
91 }
92 if lec.Callbacks.OnStartedLeading == nil {
93 return nil, fmt.Errorf("OnStartedLeading callback must not be nil")
94 }
95 if lec.Callbacks.OnStoppedLeading == nil {
96 return nil, fmt.Errorf("OnStoppedLeading callback must not be nil")
97 }
98
99 if lec.Lock == nil {
100 return nil, fmt.Errorf("Lock must not be nil.")
101 }
102 id := lec.Lock.Identity()
103 if id == "" {
104 return nil, fmt.Errorf("Lock identity is empty")
105 }
106
107 le := LeaderElector{
108 config: lec,
109 clock: clock.RealClock{},
110 metrics: globalMetricsFactory.newLeaderMetrics(),
111 }
112 le.metrics.leaderOff(le.config.Name)
113 return &le, nil
114 }
115
// LeaderElectionConfig holds the settings consumed by NewLeaderElector and the
// LeaderElector it creates.
type LeaderElectionConfig struct {
	// Lock is the resource lock used to read and write the leader election
	// record. NewLeaderElector requires it to be non-nil with a non-empty
	// Identity.
	Lock rl.Interface

	// LeaseDuration is how long an observed lease is treated as valid, measured
	// from the local time at which the record was last observed. It is stored
	// in the record as whole seconds (LeaseDurationSeconds) and must be greater
	// than RenewDeadline.
	LeaseDuration time.Duration

	// RenewDeadline bounds each renewal round of the current leader. If a round
	// does not succeed within it, the elector stops renewing. Must be greater
	// than RetryPeriod*JitterFactor.
	RenewDeadline time.Duration

	// RetryPeriod is the interval between acquire/renew attempts. It is
	// jittered by JitterFactor while acquiring.
	RetryPeriod time.Duration

	// Callbacks are invoked on leadership changes. OnStartedLeading and
	// OnStoppedLeading are required.
	Callbacks LeaderCallbacks

	// WatchDog, if non-nil, is given the elector by RunOrDie via
	// SetLeaderElection.
	WatchDog *HealthzAdaptor

	// ReleaseOnCancel makes the elector try to release the lease when renewal
	// stops. It writes a record with no holder and a 1-second lease duration.
	ReleaseOnCancel bool

	// Name identifies this election in metrics and in Check's error message.
	Name string
}
163
164
165
166
167
168
// LeaderCallbacks are the hooks a LeaderElector invokes as leadership changes.
type LeaderCallbacks struct {
	// OnStartedLeading is started in its own goroutine by Run once the lease has
	// been acquired. Its context is cancelled when Run returns (or when the
	// caller's context is cancelled).
	OnStartedLeading func(context.Context)

	// OnStoppedLeading is deferred by Run, so it is called whenever Run returns,
	// including when the lease was never acquired.
	OnStoppedLeading func()

	// OnNewLeader is optional. When non-nil, it is called in a new goroutine
	// with the holder identity each time the observed holder differs from the
	// previously reported one.
	OnNewLeader func(identity string)
}
179
180
// LeaderElector is a leader election client built on a resource lock. Create
// one with NewLeaderElector.
type LeaderElector struct {
	config LeaderElectionConfig

	// observedRecord is the last lock record read or written.
	observedRecord rl.LeaderElectionRecord
	// observedRawRecord is the raw form of the last record returned by
	// Lock.Get. It is compared byte-wise to detect whether the stored record
	// changed.
	observedRawRecord []byte
	// observedTime is the local clock time at which observedRecord was last
	// set. Lease validity is measured from it.
	observedTime time.Time

	// reportedLeader is the last holder identity handled by
	// maybeReportTransition.
	reportedLeader string

	// clock is the time source (clock.RealClock when built by
	// NewLeaderElector).
	clock clock.Clock

	// observedRecordLock guards observedRecord and observedTime in
	// setObservedRecord/getObservedRecord, which exported accessors such as
	// IsLeader and GetLeader go through.
	observedRecordLock sync.Mutex

	metrics leaderMetricsAdapter
}
200
201
202
203
204 func (le *LeaderElector) Run(ctx context.Context) {
205 defer runtime.HandleCrash()
206 defer le.config.Callbacks.OnStoppedLeading()
207
208 if !le.acquire(ctx) {
209 return
210 }
211 ctx, cancel := context.WithCancel(ctx)
212 defer cancel()
213 go le.config.Callbacks.OnStartedLeading(ctx)
214 le.renew(ctx)
215 }
216
217
218
219
220 func RunOrDie(ctx context.Context, lec LeaderElectionConfig) {
221 le, err := NewLeaderElector(lec)
222 if err != nil {
223 panic(err)
224 }
225 if lec.WatchDog != nil {
226 lec.WatchDog.SetLeaderElection(le)
227 }
228 le.Run(ctx)
229 }
230
231
232
233
234 func (le *LeaderElector) GetLeader() string {
235 return le.getObservedRecord().HolderIdentity
236 }
237
238
239 func (le *LeaderElector) IsLeader() bool {
240 return le.getObservedRecord().HolderIdentity == le.config.Lock.Identity()
241 }
242
243
244
245 func (le *LeaderElector) acquire(ctx context.Context) bool {
246 ctx, cancel := context.WithCancel(ctx)
247 defer cancel()
248 succeeded := false
249 desc := le.config.Lock.Describe()
250 klog.Infof("attempting to acquire leader lease %v...", desc)
251 wait.JitterUntil(func() {
252 succeeded = le.tryAcquireOrRenew(ctx)
253 le.maybeReportTransition()
254 if !succeeded {
255 klog.V(4).Infof("failed to acquire lease %v", desc)
256 return
257 }
258 le.config.Lock.RecordEvent("became leader")
259 le.metrics.leaderOn(le.config.Name)
260 klog.Infof("successfully acquired lease %v", desc)
261 cancel()
262 }, le.config.RetryPeriod, JitterFactor, true, ctx.Done())
263 return succeeded
264 }
265
266
267 func (le *LeaderElector) renew(ctx context.Context) {
268 defer le.config.Lock.RecordEvent("stopped leading")
269 ctx, cancel := context.WithCancel(ctx)
270 defer cancel()
271 wait.Until(func() {
272 timeoutCtx, timeoutCancel := context.WithTimeout(ctx, le.config.RenewDeadline)
273 defer timeoutCancel()
274 err := wait.PollImmediateUntil(le.config.RetryPeriod, func() (bool, error) {
275 return le.tryAcquireOrRenew(timeoutCtx), nil
276 }, timeoutCtx.Done())
277
278 le.maybeReportTransition()
279 desc := le.config.Lock.Describe()
280 if err == nil {
281 klog.V(5).Infof("successfully renewed lease %v", desc)
282 return
283 }
284 le.metrics.leaderOff(le.config.Name)
285 klog.Infof("failed to renew lease %v: %v", desc, err)
286 cancel()
287 }, le.config.RetryPeriod, ctx.Done())
288
289
290 if le.config.ReleaseOnCancel {
291 le.release()
292 }
293 }
294
295
296 func (le *LeaderElector) release() bool {
297 if !le.IsLeader() {
298 return true
299 }
300 now := metav1.NewTime(le.clock.Now())
301 leaderElectionRecord := rl.LeaderElectionRecord{
302 LeaderTransitions: le.observedRecord.LeaderTransitions,
303 LeaseDurationSeconds: 1,
304 RenewTime: now,
305 AcquireTime: now,
306 }
307 if err := le.config.Lock.Update(context.TODO(), leaderElectionRecord); err != nil {
308 klog.Errorf("Failed to release lock: %v", err)
309 return false
310 }
311
312 le.setObservedRecord(&leaderElectionRecord)
313 return true
314 }
315
316
317
318
319 func (le *LeaderElector) tryAcquireOrRenew(ctx context.Context) bool {
320 now := metav1.NewTime(le.clock.Now())
321 leaderElectionRecord := rl.LeaderElectionRecord{
322 HolderIdentity: le.config.Lock.Identity(),
323 LeaseDurationSeconds: int(le.config.LeaseDuration / time.Second),
324 RenewTime: now,
325 AcquireTime: now,
326 }
327
328
329
330 if le.IsLeader() && le.isLeaseValid(now.Time) {
331 oldObservedRecord := le.getObservedRecord()
332 leaderElectionRecord.AcquireTime = oldObservedRecord.AcquireTime
333 leaderElectionRecord.LeaderTransitions = oldObservedRecord.LeaderTransitions
334
335 err := le.config.Lock.Update(ctx, leaderElectionRecord)
336 if err == nil {
337 le.setObservedRecord(&leaderElectionRecord)
338 return true
339 }
340 klog.Errorf("Failed to update lock optimitically: %v, falling back to slow path", err)
341 }
342
343
344 oldLeaderElectionRecord, oldLeaderElectionRawRecord, err := le.config.Lock.Get(ctx)
345 if err != nil {
346 if !errors.IsNotFound(err) {
347 klog.Errorf("error retrieving resource lock %v: %v", le.config.Lock.Describe(), err)
348 return false
349 }
350 if err = le.config.Lock.Create(ctx, leaderElectionRecord); err != nil {
351 klog.Errorf("error initially creating leader election record: %v", err)
352 return false
353 }
354
355 le.setObservedRecord(&leaderElectionRecord)
356
357 return true
358 }
359
360
361 if !bytes.Equal(le.observedRawRecord, oldLeaderElectionRawRecord) {
362 le.setObservedRecord(oldLeaderElectionRecord)
363
364 le.observedRawRecord = oldLeaderElectionRawRecord
365 }
366 if len(oldLeaderElectionRecord.HolderIdentity) > 0 && le.isLeaseValid(now.Time) && !le.IsLeader() {
367 klog.V(4).Infof("lock is held by %v and has not yet expired", oldLeaderElectionRecord.HolderIdentity)
368 return false
369 }
370
371
372
373 if le.IsLeader() {
374 leaderElectionRecord.AcquireTime = oldLeaderElectionRecord.AcquireTime
375 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions
376 le.metrics.slowpathExercised(le.config.Name)
377 } else {
378 leaderElectionRecord.LeaderTransitions = oldLeaderElectionRecord.LeaderTransitions + 1
379 }
380
381
382 if err = le.config.Lock.Update(ctx, leaderElectionRecord); err != nil {
383 klog.Errorf("Failed to update lock: %v", err)
384 return false
385 }
386
387 le.setObservedRecord(&leaderElectionRecord)
388 return true
389 }
390
391 func (le *LeaderElector) maybeReportTransition() {
392 if le.observedRecord.HolderIdentity == le.reportedLeader {
393 return
394 }
395 le.reportedLeader = le.observedRecord.HolderIdentity
396 if le.config.Callbacks.OnNewLeader != nil {
397 go le.config.Callbacks.OnNewLeader(le.reportedLeader)
398 }
399 }
400
401
402 func (le *LeaderElector) Check(maxTolerableExpiredLease time.Duration) error {
403 if !le.IsLeader() {
404
405 return nil
406 }
407
408
409
410 if le.clock.Since(le.observedTime) > le.config.LeaseDuration+maxTolerableExpiredLease {
411 return fmt.Errorf("failed election to renew leadership on lease %s", le.config.Name)
412 }
413
414 return nil
415 }
416
417 func (le *LeaderElector) isLeaseValid(now time.Time) bool {
418 return le.observedTime.Add(time.Second * time.Duration(le.getObservedRecord().LeaseDurationSeconds)).After(now)
419 }
420
421
422
423 func (le *LeaderElector) setObservedRecord(observedRecord *rl.LeaderElectionRecord) {
424 le.observedRecordLock.Lock()
425 defer le.observedRecordLock.Unlock()
426
427 le.observedRecord = *observedRecord
428 le.observedTime = le.clock.Now()
429 }
430
431
432
433 func (le *LeaderElector) getObservedRecord() rl.LeaderElectionRecord {
434 le.observedRecordLock.Lock()
435 defer le.observedRecordLock.Unlock()
436
437 return le.observedRecord
438 }
439
View as plain text