1 package cron
2
3 import (
4 "context"
5 "sort"
6 "sync"
7 "time"
8 )
9
10
11
12
13 type Cron struct {
14 entries []*Entry
15 chain Chain
16 stop chan struct{}
17 add chan *Entry
18 remove chan EntryID
19 snapshot chan chan []Entry
20 running bool
21 logger Logger
22 runningMu sync.Mutex
23 location *time.Location
24 parser ScheduleParser
25 nextID EntryID
26 jobWaiter sync.WaitGroup
27 }
28
29
30 type ScheduleParser interface {
31 Parse(spec string) (Schedule, error)
32 }
33
34
// Job is a unit of work that can be scheduled on a Cron.
type Job interface {
	// Run performs the work. The scheduler invokes the (chain-wrapped) job
	// in a goroutine of its own each time the entry becomes due.
	Run()
}
38
39
// Schedule describes when a job should run.
type Schedule interface {
	// Next is given a reference time (the scheduler passes the current time
	// or the time it last woke up) and returns the entry's next activation
	// time. The scheduler treats a zero result as "no further activation":
	// such entries sort last and are never started.
	Next(time.Time) time.Time
}
45
46
// EntryID identifies an entry within a Cron instance. Schedule hands out IDs
// by incrementing a counter before use, so 0 is never assigned to an entry.
type EntryID int
48
49
50 type Entry struct {
51
52
53 ID EntryID
54
55
56 Schedule Schedule
57
58
59
60 Next time.Time
61
62
63 Prev time.Time
64
65
66 WrappedJob Job
67
68
69
70
71 Job Job
72 }
73
74
75 func (e Entry) Valid() bool { return e.ID != 0 }
76
77
78
79 type byTime []*Entry
80
81 func (s byTime) Len() int { return len(s) }
82 func (s byTime) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
83 func (s byTime) Less(i, j int) bool {
84
85
86
87 if s[i].Next.IsZero() {
88 return false
89 }
90 if s[j].Next.IsZero() {
91 return true
92 }
93 return s[i].Next.Before(s[j].Next)
94 }
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113 func New(opts ...Option) *Cron {
114 c := &Cron{
115 entries: nil,
116 chain: NewChain(),
117 add: make(chan *Entry),
118 stop: make(chan struct{}),
119 snapshot: make(chan chan []Entry),
120 remove: make(chan EntryID),
121 running: false,
122 runningMu: sync.Mutex{},
123 logger: DefaultLogger,
124 location: time.Local,
125 parser: standardParser,
126 }
127 for _, opt := range opts {
128 opt(c)
129 }
130 return c
131 }
132
133
// FuncJob adapts a plain func() so that it can be used as a Job.
type FuncJob func()

// Run calls the underlying function.
func (f FuncJob) Run() {
	f()
}
137
138
139
140
141 func (c *Cron) AddFunc(spec string, cmd func()) (EntryID, error) {
142 return c.AddJob(spec, FuncJob(cmd))
143 }
144
145
146
147
148 func (c *Cron) AddJob(spec string, cmd Job) (EntryID, error) {
149 schedule, err := c.parser.Parse(spec)
150 if err != nil {
151 return 0, err
152 }
153 return c.Schedule(schedule, cmd), nil
154 }
155
156
157
158 func (c *Cron) Schedule(schedule Schedule, cmd Job) EntryID {
159 c.runningMu.Lock()
160 defer c.runningMu.Unlock()
161 c.nextID++
162 entry := &Entry{
163 ID: c.nextID,
164 Schedule: schedule,
165 WrappedJob: c.chain.Then(cmd),
166 Job: cmd,
167 }
168 if !c.running {
169 c.entries = append(c.entries, entry)
170 } else {
171 c.add <- entry
172 }
173 return entry.ID
174 }
175
176
177 func (c *Cron) Entries() []Entry {
178 c.runningMu.Lock()
179 defer c.runningMu.Unlock()
180 if c.running {
181 replyChan := make(chan []Entry, 1)
182 c.snapshot <- replyChan
183 return <-replyChan
184 }
185 return c.entrySnapshot()
186 }
187
188
189 func (c *Cron) Location() *time.Location {
190 return c.location
191 }
192
193
194 func (c *Cron) Entry(id EntryID) Entry {
195 for _, entry := range c.Entries() {
196 if id == entry.ID {
197 return entry
198 }
199 }
200 return Entry{}
201 }
202
203
204 func (c *Cron) Remove(id EntryID) {
205 c.runningMu.Lock()
206 defer c.runningMu.Unlock()
207 if c.running {
208 c.remove <- id
209 } else {
210 c.removeEntry(id)
211 }
212 }
213
214
215 func (c *Cron) Start() {
216 c.runningMu.Lock()
217 defer c.runningMu.Unlock()
218 if c.running {
219 return
220 }
221 c.running = true
222 go c.run()
223 }
224
225
226 func (c *Cron) Run() {
227 c.runningMu.Lock()
228 if c.running {
229 c.runningMu.Unlock()
230 return
231 }
232 c.running = true
233 c.runningMu.Unlock()
234 c.run()
235 }
236
237
238
239 func (c *Cron) run() {
240 c.logger.Info("start")
241
242
243 now := c.now()
244 for _, entry := range c.entries {
245 entry.Next = entry.Schedule.Next(now)
246 c.logger.Info("schedule", "now", now, "entry", entry.ID, "next", entry.Next)
247 }
248
249 for {
250
251 sort.Sort(byTime(c.entries))
252
253 var timer *time.Timer
254 if len(c.entries) == 0 || c.entries[0].Next.IsZero() {
255
256
257 timer = time.NewTimer(100000 * time.Hour)
258 } else {
259 timer = time.NewTimer(c.entries[0].Next.Sub(now))
260 }
261
262 for {
263 select {
264 case now = <-timer.C:
265 now = now.In(c.location)
266 c.logger.Info("wake", "now", now)
267
268
269 for _, e := range c.entries {
270 if e.Next.After(now) || e.Next.IsZero() {
271 break
272 }
273 c.startJob(e.WrappedJob)
274 e.Prev = e.Next
275 e.Next = e.Schedule.Next(now)
276 c.logger.Info("run", "now", now, "entry", e.ID, "next", e.Next)
277 }
278
279 case newEntry := <-c.add:
280 timer.Stop()
281 now = c.now()
282 newEntry.Next = newEntry.Schedule.Next(now)
283 c.entries = append(c.entries, newEntry)
284 c.logger.Info("added", "now", now, "entry", newEntry.ID, "next", newEntry.Next)
285
286 case replyChan := <-c.snapshot:
287 replyChan <- c.entrySnapshot()
288 continue
289
290 case <-c.stop:
291 timer.Stop()
292 c.logger.Info("stop")
293 return
294
295 case id := <-c.remove:
296 timer.Stop()
297 now = c.now()
298 c.removeEntry(id)
299 c.logger.Info("removed", "entry", id)
300 }
301
302 break
303 }
304 }
305 }
306
307
308 func (c *Cron) startJob(j Job) {
309 c.jobWaiter.Add(1)
310 go func() {
311 defer c.jobWaiter.Done()
312 j.Run()
313 }()
314 }
315
316
317 func (c *Cron) now() time.Time {
318 return time.Now().In(c.location)
319 }
320
321
322
323 func (c *Cron) Stop() context.Context {
324 c.runningMu.Lock()
325 defer c.runningMu.Unlock()
326 if c.running {
327 c.stop <- struct{}{}
328 c.running = false
329 }
330 ctx, cancel := context.WithCancel(context.Background())
331 go func() {
332 c.jobWaiter.Wait()
333 cancel()
334 }()
335 return ctx
336 }
337
338
339 func (c *Cron) entrySnapshot() []Entry {
340 var entries = make([]Entry, len(c.entries))
341 for i, e := range c.entries {
342 entries[i] = *e
343 }
344 return entries
345 }
346
347 func (c *Cron) removeEntry(id EntryID) {
348 var entries []*Entry
349 for _, e := range c.entries {
350 if e.ID != id {
351 entries = append(entries, e)
352 }
353 }
354 c.entries = entries
355 }
356
View as plain text