...
1 package cron
2
3 import (
4 "fmt"
5 "runtime"
6 "sync"
7 "time"
8 )
9
10
11 type JobWrapper func(Job) Job
12
13
14
15 type Chain struct {
16 wrappers []JobWrapper
17 }
18
19
20 func NewChain(c ...JobWrapper) Chain {
21 return Chain{c}
22 }
23
24
25
26
27
28
29
30 func (c Chain) Then(j Job) Job {
31 for i := range c.wrappers {
32 j = c.wrappers[len(c.wrappers)-i-1](j)
33 }
34 return j
35 }
36
37
38 func Recover(logger Logger) JobWrapper {
39 return func(j Job) Job {
40 return FuncJob(func() {
41 defer func() {
42 if r := recover(); r != nil {
43 const size = 64 << 10
44 buf := make([]byte, size)
45 buf = buf[:runtime.Stack(buf, false)]
46 err, ok := r.(error)
47 if !ok {
48 err = fmt.Errorf("%v", r)
49 }
50 logger.Error(err, "panic", "stack", "...\n"+string(buf))
51 }
52 }()
53 j.Run()
54 })
55 }
56 }
57
58
59
60
61 func DelayIfStillRunning(logger Logger) JobWrapper {
62 return func(j Job) Job {
63 var mu sync.Mutex
64 return FuncJob(func() {
65 start := time.Now()
66 mu.Lock()
67 defer mu.Unlock()
68 if dur := time.Since(start); dur > time.Minute {
69 logger.Info("delay", "duration", dur)
70 }
71 j.Run()
72 })
73 }
74 }
75
76
77
78 func SkipIfStillRunning(logger Logger) JobWrapper {
79 return func(j Job) Job {
80 var ch = make(chan struct{}, 1)
81 ch <- struct{}{}
82 return FuncJob(func() {
83 select {
84 case v := <-ch:
85 j.Run()
86 ch <- v
87 default:
88 logger.Info("skip")
89 }
90 })
91 }
92 }
93
View as plain text