...
1 package manager
2
3 import (
4 "context"
5 "errors"
6 "sync"
7
8 "sigs.k8s.io/controller-runtime/pkg/webhook"
9 )
10
11 var (
12 errRunnableGroupStopped = errors.New("can't accept new runnable as stop procedure is already engaged")
13 )
14
15
16
17 type readyRunnable struct {
18 Runnable
19 Check runnableCheck
20 signalReady bool
21 }
22
23
24
25
26 type runnableCheck func(ctx context.Context) bool
27
28
29
30 type runnables struct {
31 HTTPServers *runnableGroup
32 Webhooks *runnableGroup
33 Caches *runnableGroup
34 LeaderElection *runnableGroup
35 Others *runnableGroup
36 }
37
38
39 func newRunnables(baseContext BaseContextFunc, errChan chan error) *runnables {
40 return &runnables{
41 HTTPServers: newRunnableGroup(baseContext, errChan),
42 Webhooks: newRunnableGroup(baseContext, errChan),
43 Caches: newRunnableGroup(baseContext, errChan),
44 LeaderElection: newRunnableGroup(baseContext, errChan),
45 Others: newRunnableGroup(baseContext, errChan),
46 }
47 }
48
49
50
51
52
53
54
55 func (r *runnables) Add(fn Runnable) error {
56 switch runnable := fn.(type) {
57 case *Server:
58 if runnable.NeedLeaderElection() {
59 return r.LeaderElection.Add(fn, nil)
60 }
61 return r.HTTPServers.Add(fn, nil)
62 case hasCache:
63 return r.Caches.Add(fn, func(ctx context.Context) bool {
64 return runnable.GetCache().WaitForCacheSync(ctx)
65 })
66 case webhook.Server:
67 return r.Webhooks.Add(fn, nil)
68 case LeaderElectionRunnable:
69 if !runnable.NeedLeaderElection() {
70 return r.Others.Add(fn, nil)
71 }
72 return r.LeaderElection.Add(fn, nil)
73 default:
74 return r.LeaderElection.Add(fn, nil)
75 }
76 }
77
78
79
80
81
82
83 type runnableGroup struct {
84 ctx context.Context
85 cancel context.CancelFunc
86
87 start sync.Mutex
88 startOnce sync.Once
89 started bool
90 startQueue []*readyRunnable
91 startReadyCh chan *readyRunnable
92
93 stop sync.RWMutex
94 stopOnce sync.Once
95 stopped bool
96
97
98
99
100 errChan chan error
101
102
103 ch chan *readyRunnable
104
105
106
107 wg *sync.WaitGroup
108 }
109
110 func newRunnableGroup(baseContext BaseContextFunc, errChan chan error) *runnableGroup {
111 r := &runnableGroup{
112 startReadyCh: make(chan *readyRunnable),
113 errChan: errChan,
114 ch: make(chan *readyRunnable),
115 wg: new(sync.WaitGroup),
116 }
117
118 r.ctx, r.cancel = context.WithCancel(baseContext())
119 return r
120 }
121
122
123 func (r *runnableGroup) Started() bool {
124 r.start.Lock()
125 defer r.start.Unlock()
126 return r.started
127 }
128
129
130
131
132 func (r *runnableGroup) Start(ctx context.Context) error {
133 var retErr error
134
135 r.startOnce.Do(func() {
136 defer close(r.startReadyCh)
137
138
139 go r.reconcile()
140
141
142
143 r.start.Lock()
144 r.started = true
145 for _, rn := range r.startQueue {
146 rn.signalReady = true
147 r.ch <- rn
148 }
149 r.start.Unlock()
150
151
152 if len(r.startQueue) == 0 {
153 return
154 }
155
156
157 for {
158 select {
159 case <-ctx.Done():
160 if err := ctx.Err(); !errors.Is(err, context.Canceled) {
161 retErr = err
162 }
163 case rn := <-r.startReadyCh:
164 for i, existing := range r.startQueue {
165 if existing == rn {
166
167 r.startQueue = append(r.startQueue[:i], r.startQueue[i+1:]...)
168 break
169 }
170 }
171
172 if len(r.startQueue) == 0 {
173 return
174 }
175 }
176 }
177 })
178
179 return retErr
180 }
181
182
183
184
185 func (r *runnableGroup) reconcile() {
186 for runnable := range r.ch {
187
188
189
190
191 {
192 r.stop.RLock()
193 if r.stopped {
194
195 r.errChan <- errRunnableGroupStopped
196 r.stop.RUnlock()
197 continue
198 }
199
200
201
202
203
204
205 r.wg.Add(1)
206 r.stop.RUnlock()
207 }
208
209
210 go func(rn *readyRunnable) {
211 go func() {
212 if rn.Check(r.ctx) {
213 if rn.signalReady {
214 r.startReadyCh <- rn
215 }
216 }
217 }()
218
219
220
221
222
223 defer r.wg.Done()
224
225
226 if err := rn.Start(r.ctx); err != nil {
227 r.errChan <- err
228 }
229 }(runnable)
230 }
231 }
232
233
234
235 func (r *runnableGroup) Add(rn Runnable, ready runnableCheck) error {
236 r.stop.RLock()
237 if r.stopped {
238 r.stop.RUnlock()
239 return errRunnableGroupStopped
240 }
241 r.stop.RUnlock()
242
243 if ready == nil {
244 ready = func(_ context.Context) bool { return true }
245 }
246
247 readyRunnable := &readyRunnable{
248 Runnable: rn,
249 Check: ready,
250 }
251
252
253
254
255
256 {
257 r.start.Lock()
258
259
260 if !r.started {
261
262 r.startQueue = append(r.startQueue, readyRunnable)
263 r.start.Unlock()
264 return nil
265 }
266 r.start.Unlock()
267 }
268
269
270
271
272 r.stop.RLock()
273 defer r.stop.RUnlock()
274 if r.stopped {
275 return errRunnableGroupStopped
276 }
277
278
279 r.ch <- readyRunnable
280 return nil
281 }
282
283
284 func (r *runnableGroup) StopAndWait(ctx context.Context) {
285 r.stopOnce.Do(func() {
286
287 defer func() {
288 r.stop.Lock()
289 close(r.ch)
290 r.stop.Unlock()
291 }()
292
293 _ = r.Start(ctx)
294 r.stop.Lock()
295
296
297 r.stopped = true
298 r.stop.Unlock()
299
300
301 r.cancel()
302
303 done := make(chan struct{})
304 go func() {
305 defer close(done)
306
307 r.wg.Wait()
308 }()
309
310 select {
311 case <-done:
312
313 case <-ctx.Done():
314
315 }
316 })
317 }
318
View as plain text