// Package throttler limits how many goroutines run concurrently while a fixed
// number of jobs is worked through, and collects any errors those jobs report.
package throttler

import (
	"fmt"
	"math"
	"sync"
	"sync/atomic"
)

// Throttler holds the state needed to run the job pool: the worker limit,
// batching parameters, job counters, the completion channel, and the errors
// reported by finished jobs.
type Throttler struct {
	maxWorkers    int32
	workerCount   int32
	batchingTotal int32
	batchSize     int32
	totalJobs     int32
	jobsStarted   int32
	jobsCompleted int32
	doneChan      chan struct{}
	errsMutex     *sync.Mutex
	errs          []error
	errorCount    int32
}

// New returns a Throttler that will run totalJobs jobs while allowing at most
// maxWorkers of them to execute concurrently. It panics if maxWorkers is less
// than 1.
func New(maxWorkers, totalJobs int) *Throttler {
	if maxWorkers < 1 {
		panic("maxWorkers has to be at least 1")
	}
	return &Throttler{
		maxWorkers: int32(maxWorkers),
		batchSize:  1,
		totalJobs:  int32(totalJobs),
		doneChan:   make(chan struct{}, totalJobs),
		errsMutex:  &sync.Mutex{},
	}
}

// NewBatchedThrottler returns a Throttler for batched work: batchingTotal items
// are split into batches of batchSize, and the resulting number of batches
// (rounded up) becomes the total job count.
func NewBatchedThrottler(maxWorkers, batchingTotal, batchSize int) *Throttler {
	totalJobs := int(math.Ceil(float64(batchingTotal) / float64(batchSize)))
	t := New(maxWorkers, totalJobs)
	t.batchSize = int32(batchSize)
	t.batchingTotal = int32(batchingTotal)
	return t
}

// SetMaxWorkers changes the concurrency limit while the Throttler is running.
// It panics if maxWorkers is less than 1.
func (t *Throttler) SetMaxWorkers(maxWorkers int) {
	if maxWorkers < 1 {
		panic("maxWorkers has to be at least 1")
	}
	atomic.StoreInt32(&t.maxWorkers, int32(maxWorkers))
}

// Throttle should be called after each goroutine is launched. It blocks when
// the maximum number of workers is already running, waiting on doneChan until
// one of them reports completion via Done, and once the final job has been
// started it waits for all remaining jobs to finish. It returns the number of
// errors reported so far.
func (t *Throttler) Throttle() int {
	if atomic.LoadInt32(&t.totalJobs) < 1 {
		return int(atomic.LoadInt32(&t.errorCount))
	}
	atomic.AddInt32(&t.jobsStarted, 1)
	atomic.AddInt32(&t.workerCount, 1)

	// If the worker pool is full, count one completion and wait for a
	// goroutine to signal on doneChan before continuing.
	if atomic.LoadInt32(&t.workerCount) == atomic.LoadInt32(&t.maxWorkers) {
		atomic.AddInt32(&t.jobsCompleted, 1)
		atomic.AddInt32(&t.workerCount, -1)
		<-t.doneChan
	}

	// Once every job has been started, drain doneChan until all of the
	// remaining jobs have reported completion.
	if atomic.LoadInt32(&t.jobsStarted) == atomic.LoadInt32(&t.totalJobs) {
		for atomic.LoadInt32(&t.jobsCompleted) < atomic.LoadInt32(&t.totalJobs) {
			atomic.AddInt32(&t.jobsCompleted, 1)
			<-t.doneChan
		}
	}

	return int(atomic.LoadInt32(&t.errorCount))
}
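
// The sketch below is not part of the package; it is a minimal illustration of
// the intended call pattern, assuming a hypothetical per-item worker function
// supplied by the caller: launch a goroutine for each job, call Throttle after
// each launch so at most maxWorkers goroutines run at once, and have every
// goroutine report its result through Done.
func exampleThrottlerUsage(items []string, work func(string) error) error {
	t := New(4, len(items))
	for _, item := range items {
		go func(item string) {
			// Report the job's result (nil or an error) so Throttle can unblock.
			t.Done(work(item))
		}(item)
		// Blocks whenever maxWorkers goroutines are already in flight.
		t.Throttle()
	}
	// Err returns nil if no job reported an error, or a multiError otherwise.
	return t.Err()
}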

// Done should be called by each goroutine when it finishes its job. A non-nil
// err is recorded under the mutex, and a signal is sent on doneChan so that a
// blocked Throttle call can proceed.
func (t *Throttler) Done(err error) {
	if err != nil {
		t.errsMutex.Lock()
		t.errs = append(t.errs, err)
		atomic.AddInt32(&t.errorCount, 1)
		t.errsMutex.Unlock()
	}
	t.doneChan <- struct{}{}
}

// Err returns nil if no errors have been reported, otherwise a multiError
// wrapping everything collected so far.
func (t *Throttler) Err() error {
	t.errsMutex.Lock()
	defer t.errsMutex.Unlock()
	if atomic.LoadInt32(&t.errorCount) == 0 {
		return nil
	}
	return multiError(t.errs)
}

// Errs returns the raw slice of errors reported so far.
func (t *Throttler) Errs() []error {
	t.errsMutex.Lock()
	defer t.errsMutex.Unlock()
	return t.errs
}

// multiError summarizes a slice of errors as the first error plus a count of
// how many more occurred.
type multiError []error

func (te multiError) Error() string {
	errString := te[0].Error()
	if len(te) > 1 {
		errString += fmt.Sprintf(" (and %d more errors)", len(te)-1)
	}
	return errString
}

// BatchStartIndex returns the inclusive start index of the current batch. It
// must be read before Throttle is called for that batch, since Throttle
// advances jobsStarted.
func (t *Throttler) BatchStartIndex() int {
	return int(atomic.LoadInt32(&t.jobsStarted) * atomic.LoadInt32(&t.batchSize))
}

// BatchEndIndex returns the exclusive end index of the current batch, capped
// at batchingTotal so the final batch is not overrun.
func (t *Throttler) BatchEndIndex() int {
	end := (atomic.LoadInt32(&t.jobsStarted) + 1) * atomic.LoadInt32(&t.batchSize)
	if end > atomic.LoadInt32(&t.batchingTotal) {
		end = atomic.LoadInt32(&t.batchingTotal)
	}
	return int(end)
}
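
// The sketch below is not part of the package; it is a minimal illustration of
// the batched call pattern, assuming a hypothetical processBatch function
// supplied by the caller. BatchStartIndex and BatchEndIndex are read before
// Throttle is called for that iteration, so each goroutine receives the slice
// bounds of its own batch.
func exampleBatchedUsage(items []string, processBatch func([]string) error) error {
	// At most 4 batches in flight, 25 items per batch.
	t := NewBatchedThrottler(4, len(items), 25)
	for i := 0; i < t.TotalJobs(); i++ {
		batch := items[t.BatchStartIndex():t.BatchEndIndex()]
		go func(batch []string) {
			t.Done(processBatch(batch))
		}(batch)
		t.Throttle()
	}
	return t.Err()
}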

// TotalJobs returns the total number of jobs the Throttler is managing.
func (t *Throttler) TotalJobs() int {
	return int(atomic.LoadInt32(&t.totalJobs))
}