...
1 package backoff
2
3 import (
4 "context"
5 "sync"
6 "time"
7 )
8
9 type controller struct {
10 ctx context.Context
11 cancel func()
12 ig IntervalGenerator
13 maxRetries int
14 mu *sync.RWMutex
15 next chan struct{}
16 resetTimer chan time.Duration
17 retries int
18 timer *time.Timer
19 }
20
21 func newController(ctx context.Context, ig IntervalGenerator, options ...ControllerOption) *controller {
22 cctx, cancel := context.WithCancel(ctx)
23
24 maxRetries := 10
25 for _, option := range options {
26 switch option.Ident() {
27 case identMaxRetries{}:
28 maxRetries = option.Value().(int)
29 }
30 }
31
32 c := &controller{
33 cancel: cancel,
34 ctx: cctx,
35 ig: ig,
36 maxRetries: maxRetries,
37 mu: &sync.RWMutex{},
38 next: make(chan struct{}, 1),
39 resetTimer: make(chan time.Duration, 1),
40 timer: time.NewTimer(ig.Next()),
41 }
42
43
44 c.next <- struct{}{}
45
46 go c.loop()
47 return c
48 }
49
50 func (c *controller) loop() {
51 for {
52 select {
53 case <-c.ctx.Done():
54 return
55 case d := <-c.resetTimer:
56 if !c.timer.Stop() {
57 select {
58 case <-c.timer.C:
59 default:
60 }
61 }
62 c.timer.Reset(d)
63 case <-c.timer.C:
64 select {
65 case <-c.ctx.Done():
66 return
67 case c.next <- struct{}{}:
68 }
69 if c.maxRetries > 0 {
70 c.retries++
71 }
72
73 if !c.check() {
74 c.cancel()
75 return
76 }
77 c.resetTimer <- c.ig.Next()
78 }
79 }
80 }
81
82 func (c *controller) check() bool {
83 if c.maxRetries > 0 && c.retries >= c.maxRetries {
84 return false
85 }
86 return true
87 }
88
89 func (c *controller) Done() <-chan struct{} {
90 c.mu.RLock()
91 defer c.mu.RUnlock()
92 return c.ctx.Done()
93 }
94
95 func (c *controller) Next() <-chan struct{} {
96 c.mu.RLock()
97 defer c.mu.RUnlock()
98 return c.next
99 }
100
View as plain text