1 package cron
2
3 import (
4 "io/ioutil"
5 "log"
6 "reflect"
7 "sync"
8 "testing"
9 "time"
10 )
11
12 func appendingJob(slice *[]int, value int) Job {
13 var m sync.Mutex
14 return FuncJob(func() {
15 m.Lock()
16 *slice = append(*slice, value)
17 m.Unlock()
18 })
19 }
20
21 func appendingWrapper(slice *[]int, value int) JobWrapper {
22 return func(j Job) Job {
23 return FuncJob(func() {
24 appendingJob(slice, value).Run()
25 j.Run()
26 })
27 }
28 }
29
30 func TestChain(t *testing.T) {
31 var nums []int
32 var (
33 append1 = appendingWrapper(&nums, 1)
34 append2 = appendingWrapper(&nums, 2)
35 append3 = appendingWrapper(&nums, 3)
36 append4 = appendingJob(&nums, 4)
37 )
38 NewChain(append1, append2, append3).Then(append4).Run()
39 if !reflect.DeepEqual(nums, []int{1, 2, 3, 4}) {
40 t.Error("unexpected order of calls:", nums)
41 }
42 }
43
44 func TestChainRecover(t *testing.T) {
45 panickingJob := FuncJob(func() {
46 panic("panickingJob panics")
47 })
48
49 t.Run("panic exits job by default", func(t *testing.T) {
50 defer func() {
51 if err := recover(); err == nil {
52 t.Errorf("panic expected, but none received")
53 }
54 }()
55 NewChain().Then(panickingJob).
56 Run()
57 })
58
59 t.Run("Recovering JobWrapper recovers", func(t *testing.T) {
60 NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
61 Then(panickingJob).
62 Run()
63 })
64
65 t.Run("composed with the *IfStillRunning wrappers", func(t *testing.T) {
66 NewChain(Recover(PrintfLogger(log.New(ioutil.Discard, "", 0)))).
67 Then(panickingJob).
68 Run()
69 })
70 }
71
// countJob is a test job that counts how many of its runs have started and
// how many have finished, optionally sleeping between the two.
type countJob struct {
	m       sync.Mutex    // guards started and done
	started int           // number of Run calls that have begun
	done    int           // number of Run calls that have completed
	delay   time.Duration // how long each Run sleeps between start and finish
}

// Run records a start, sleeps for j.delay, then records a completion. The
// mutex is not held during the sleep, so the counters can be read (and other
// runs can begin) while this run is still in progress.
func (j *countJob) Run() {
	j.m.Lock()
	j.started++
	j.m.Unlock()

	time.Sleep(j.delay)

	j.m.Lock()
	j.done++
	j.m.Unlock()
}

// Started returns the number of Run calls that have begun so far.
func (j *countJob) Started() int {
	j.m.Lock()
	defer j.m.Unlock()
	return j.started
}

// Done returns the number of Run calls that have completed so far.
func (j *countJob) Done() int {
	j.m.Lock()
	defer j.m.Unlock()
	return j.done
}
100
101 func TestChainDelayIfStillRunning(t *testing.T) {
102
103 t.Run("runs immediately", func(t *testing.T) {
104 var j countJob
105 wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
106 go wrappedJob.Run()
107 time.Sleep(2 * time.Millisecond)
108 if c := j.Done(); c != 1 {
109 t.Errorf("expected job run once, immediately, got %d", c)
110 }
111 })
112
113 t.Run("second run immediate if first done", func(t *testing.T) {
114 var j countJob
115 wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
116 go func() {
117 go wrappedJob.Run()
118 time.Sleep(time.Millisecond)
119 go wrappedJob.Run()
120 }()
121 time.Sleep(3 * time.Millisecond)
122 if c := j.Done(); c != 2 {
123 t.Errorf("expected job run twice, immediately, got %d", c)
124 }
125 })
126
127 t.Run("second run delayed if first not done", func(t *testing.T) {
128 var j countJob
129 j.delay = 10 * time.Millisecond
130 wrappedJob := NewChain(DelayIfStillRunning(DiscardLogger)).Then(&j)
131 go func() {
132 go wrappedJob.Run()
133 time.Sleep(time.Millisecond)
134 go wrappedJob.Run()
135 }()
136
137
138
139 time.Sleep(5 * time.Millisecond)
140 started, done := j.Started(), j.Done()
141 if started != 1 || done != 0 {
142 t.Error("expected first job started, but not finished, got", started, done)
143 }
144
145
146 time.Sleep(25 * time.Millisecond)
147 started, done = j.Started(), j.Done()
148 if started != 2 || done != 2 {
149 t.Error("expected both jobs done, got", started, done)
150 }
151 })
152
153 }
154
155 func TestChainSkipIfStillRunning(t *testing.T) {
156
157 t.Run("runs immediately", func(t *testing.T) {
158 var j countJob
159 wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
160 go wrappedJob.Run()
161 time.Sleep(2 * time.Millisecond)
162 if c := j.Done(); c != 1 {
163 t.Errorf("expected job run once, immediately, got %d", c)
164 }
165 })
166
167 t.Run("second run immediate if first done", func(t *testing.T) {
168 var j countJob
169 wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
170 go func() {
171 go wrappedJob.Run()
172 time.Sleep(time.Millisecond)
173 go wrappedJob.Run()
174 }()
175 time.Sleep(3 * time.Millisecond)
176 if c := j.Done(); c != 2 {
177 t.Errorf("expected job run twice, immediately, got %d", c)
178 }
179 })
180
181 t.Run("second run skipped if first not done", func(t *testing.T) {
182 var j countJob
183 j.delay = 10 * time.Millisecond
184 wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
185 go func() {
186 go wrappedJob.Run()
187 time.Sleep(time.Millisecond)
188 go wrappedJob.Run()
189 }()
190
191
192
193 time.Sleep(5 * time.Millisecond)
194 started, done := j.Started(), j.Done()
195 if started != 1 || done != 0 {
196 t.Error("expected first job started, but not finished, got", started, done)
197 }
198
199
200 time.Sleep(25 * time.Millisecond)
201 started, done = j.Started(), j.Done()
202 if started != 1 || done != 1 {
203 t.Error("expected second job skipped, got", started, done)
204 }
205 })
206
207 t.Run("skip 10 jobs on rapid fire", func(t *testing.T) {
208 var j countJob
209 j.delay = 10 * time.Millisecond
210 wrappedJob := NewChain(SkipIfStillRunning(DiscardLogger)).Then(&j)
211 for i := 0; i < 11; i++ {
212 go wrappedJob.Run()
213 }
214 time.Sleep(200 * time.Millisecond)
215 done := j.Done()
216 if done != 1 {
217 t.Error("expected 1 jobs executed, 10 jobs dropped, got", done)
218 }
219 })
220
221 t.Run("different jobs independent", func(t *testing.T) {
222 var j1, j2 countJob
223 j1.delay = 10 * time.Millisecond
224 j2.delay = 10 * time.Millisecond
225 chain := NewChain(SkipIfStillRunning(DiscardLogger))
226 wrappedJob1 := chain.Then(&j1)
227 wrappedJob2 := chain.Then(&j2)
228 for i := 0; i < 11; i++ {
229 go wrappedJob1.Run()
230 go wrappedJob2.Run()
231 }
232 time.Sleep(100 * time.Millisecond)
233 var (
234 done1 = j1.Done()
235 done2 = j2.Done()
236 )
237 if done1 != 1 || done2 != 1 {
238 t.Error("expected both jobs executed once, got", done1, "and", done2)
239 }
240 })
241
242 }
243
View as plain text