...
1 package manager
2
3 import (
4 "context"
5 "errors"
6 "sync"
7 )
8
// errRunnableGroupStopped is the sentinel used when a runnable arrives at a
// group whose stop procedure has already begun: runnableGroup.Add returns it
// and reconcile forwards it on the group's error channel.
var errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
12
13
14 type readyRunnable struct {
15 Runnable
16 Check runnableCheck
17 signalReady bool
18 }
19
20
21
22
// runnableCheck reports whether a runnable is ready. It receives the context
// of the group the runnable belongs to.
type runnableCheck func(context.Context) bool
24
25
26
27 type runnables struct {
28
29
30
31 Others *runnableGroup
32 }
33
34
35 func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
36 return &runnables{
37
38
39
40 Others: newRunnableGroup(baseContext, errChan),
41 }
42 }
43
44
45
46
47
48
49
50 func (r *runnables) Add(fn Runnable) error {
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66 return r.Others.Add(fn, nil)
67 }
68
69
70
71
72
73
74 type runnableGroup struct {
75 ctx context.Context
76 cancel context.CancelFunc
77
78 start sync.Mutex
79 startOnce sync.Once
80 started bool
81 startQueue []*readyRunnable
82 startReadyCh chan *readyRunnable
83
84 stop sync.RWMutex
85 stopOnce sync.Once
86 stopped bool
87
88
89
90
91 errChan chan error
92
93
94 ch chan *readyRunnable
95
96
97
98 wg *sync.WaitGroup
99 }
100
101 func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
102 r := &runnableGroup{
103 startReadyCh: make(chan *readyRunnable),
104 errChan: errChan,
105 ch: make(chan *readyRunnable),
106 wg: new(sync.WaitGroup),
107 }
108
109 r.ctx, r.cancel = context.WithCancel(baseContext())
110 return r
111 }
112
113
114 func (r *runnableGroup) Started() bool {
115 r.start.Lock()
116 defer r.start.Unlock()
117 return r.started
118 }
119
120
121
122
123 func (r *runnableGroup) Start(ctx context.Context) error {
124 var retErr error
125
126 r.startOnce.Do(func() {
127 defer close(r.startReadyCh)
128
129
130 go r.reconcile()
131
132
133
134 r.start.Lock()
135 r.started = true
136 for _, rn := range r.startQueue {
137 rn.signalReady = true
138 r.ch <- rn
139 }
140 r.start.Unlock()
141
142
143 if len(r.startQueue) == 0 {
144 return
145 }
146
147
148 for {
149 select {
150 case <-ctx.Done():
151 if err := ctx.Err(); !errors.Is(err, context.Canceled) {
152 retErr = err
153 }
154 case rn := <-r.startReadyCh:
155 for i, existing := range r.startQueue {
156 if existing == rn {
157
158 r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
159 break
160 }
161 }
162
163 if len(r.startQueue) == 0 {
164 return
165 }
166 }
167 }
168 })
169
170 return retErr
171 }
172
173
174
175
176 func (r *runnableGroup) reconcile() {
177 for runnable := range r.ch {
178
179
180
181
182 {
183 r.stop.RLock()
184 if r.stopped {
185
186 r.errChan <- errRunnableGroupStopped
187 r.stop.RUnlock()
188 continue
189 }
190
191
192
193
194
195
196 r.wg.Add(1)
197 r.stop.RUnlock()
198 }
199
200
201 go func(rn *readyRunnable) {
202 go func() {
203 if rn.Check(r.ctx) {
204 if rn.signalReady {
205 r.startReadyCh <- rn
206 }
207 }
208 }()
209
210
211
212
213
214 defer r.wg.Done()
215
216
217 if err := rn.Start(r.ctx); err != nil {
218 r.errChan <- err
219 }
220 }(runnable)
221 }
222 }
223
224
225
226 func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
227 r.stop.RLock()
228 if r.stopped {
229 r.stop.RUnlock()
230 return errRunnableGroupStopped
231 }
232 r.stop.RUnlock()
233
234 if ready == nil {
235 ready = func(_ context.Context) bool { return true }
236 }
237
238 readyRunnable := &readyRunnable{
239 Runnable: rn,
240 Check: ready,
241 }
242
243
244
245
246
247 {
248 r.start.Lock()
249
250
251 if !r.started {
252
253 r.startQueue = append(r.startQueue, readyRunnable)
254 r.start.Unlock()
255 return nil
256 }
257 r.start.Unlock()
258 }
259
260
261 r.ch <- readyRunnable
262 return nil
263 }
264
265
266 func (r *runnableGroup) StopAndWait(ctx context.Context) {
267 r.stopOnce.Do(func() {
268
269 defer close(r.ch)
270
271 _ = r.Start(ctx)
272 r.stop.Lock()
273
274
275 r.stopped = true
276 r.stop.Unlock()
277
278
279 r.cancel()
280
281 done := make(chan struct{})
282 go func() {
283 defer close(done)
284
285 r.wg.Wait()
286 }()
287
288 select {
289 case <-done:
290
291 case <-ctx.Done():
292
293 }
294 })
295 }
296
View as plain text