...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package schedule
16
17 import (
18 "context"
19 "sync"
20 )
21
22 type Job func(context.Context)
23
24
25 type Scheduler interface {
26
27
28 Schedule(j Job)
29
30
31 Pending() int
32
33
34 Scheduled() int
35
36
37 Finished() int
38
39
40 WaitFinish(n int)
41
42
43 Stop()
44 }
45
46 type fifo struct {
47 mu sync.Mutex
48
49 resume chan struct{}
50 scheduled int
51 finished int
52 pendings []Job
53
54 ctx context.Context
55 cancel context.CancelFunc
56
57 finishCond *sync.Cond
58 donec chan struct{}
59 }
60
61
62
63 func NewFIFOScheduler() Scheduler {
64 f := &fifo{
65 resume: make(chan struct{}, 1),
66 donec: make(chan struct{}, 1),
67 }
68 f.finishCond = sync.NewCond(&f.mu)
69 f.ctx, f.cancel = context.WithCancel(context.Background())
70 go f.run()
71 return f
72 }
73
74
75 func (f *fifo) Schedule(j Job) {
76 f.mu.Lock()
77 defer f.mu.Unlock()
78
79 if f.cancel == nil {
80 panic("schedule: schedule to stopped scheduler")
81 }
82
83 if len(f.pendings) == 0 {
84 select {
85 case f.resume <- struct{}{}:
86 default:
87 }
88 }
89 f.pendings = append(f.pendings, j)
90 }
91
92 func (f *fifo) Pending() int {
93 f.mu.Lock()
94 defer f.mu.Unlock()
95 return len(f.pendings)
96 }
97
98 func (f *fifo) Scheduled() int {
99 f.mu.Lock()
100 defer f.mu.Unlock()
101 return f.scheduled
102 }
103
104 func (f *fifo) Finished() int {
105 f.finishCond.L.Lock()
106 defer f.finishCond.L.Unlock()
107 return f.finished
108 }
109
110 func (f *fifo) WaitFinish(n int) {
111 f.finishCond.L.Lock()
112 for f.finished < n || len(f.pendings) != 0 {
113 f.finishCond.Wait()
114 }
115 f.finishCond.L.Unlock()
116 }
117
118
119 func (f *fifo) Stop() {
120 f.mu.Lock()
121 f.cancel()
122 f.cancel = nil
123 f.mu.Unlock()
124 <-f.donec
125 }
126
127 func (f *fifo) run() {
128
129 defer func() {
130 close(f.donec)
131 close(f.resume)
132 }()
133
134 for {
135 var todo Job
136 f.mu.Lock()
137 if len(f.pendings) != 0 {
138 f.scheduled++
139 todo = f.pendings[0]
140 }
141 f.mu.Unlock()
142 if todo == nil {
143 select {
144 case <-f.resume:
145 case <-f.ctx.Done():
146 f.mu.Lock()
147 pendings := f.pendings
148 f.pendings = nil
149 f.mu.Unlock()
150
151 for _, todo := range pendings {
152 todo(f.ctx)
153 }
154 return
155 }
156 } else {
157 todo(f.ctx)
158 f.finishCond.L.Lock()
159 f.finished++
160 f.pendings = f.pendings[1:]
161 f.finishCond.Broadcast()
162 f.finishCond.L.Unlock()
163 }
164 }
165 }
166
View as plain text