1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package adt
16
17 import (
18 "fmt"
19 "strings"
20 "testing"
21
22 "cuelang.org/go/internal/cuetest"
23 )
24
25 const (
26 c1 condition = 1 << iota
27 c2
28 c3
29 c4
30
31
32 auto
33 )
34
35
// TestScheduler is a table-driven test that populates nodes with synthetic
// tasks and then checks two things for every case: the log written by the
// tasks while they run, and a textual dump of the final state of every node
// and task.
//
// Each case builds its nodes and tasks in init using the helper closures
// defined below (node, success, signal, completes, fail). The helpers share
// the nodeID counter, the log builder w and the nodes slice, all of which are
// reset before each case.
func TestScheduler(t *testing.T) {
	// The context only tracks counters for c1 through c4 (auto is not in the
	// mask) and its complete callback always reports the zero condition.
	ctx := &OpContext{
		taskContext: taskContext{
			counterMask: c1 | c2 | c3 | c4,
			complete:    func(s *scheduler) condition { return 0 },
		},
	}

	// State shared between the helper closures and the per-case runner.
	nodeID := 0                // id handed to the next node that is created
	w := &strings.Builder{}    // execution log written by the running tasks
	nodes := []*nodeContext{}  // all nodes of the current case, in creation order

	// node creates a nodeContext bound to ctx and records it in nodes. The
	// first node of a case must be created with a nil parent and all later
	// nodes with a non-nil parent; parent is only validated, it is not stored
	// on the new node. The node's id is kept in its refCount field, which is
	// what the log and state dump print as "v<id>".
	//
	// NOTE(review): misuse is reported through the enclosing test's t, while
	// init (and therefore node) runs inside the cuetest.Run callback. If that
	// callback executes as a subtest, this calls Fatal on the parent test --
	// confirm.
	node := func(parent *nodeContext) *nodeContext {
		if nodeID == 0 {
			if parent != nil {
				t.Fatal("root node must be created first")
			}
		} else {
			if parent == nil {
				t.Fatal("non-root node must have parent")
			}
		}

		n := &nodeContext{scheduler: scheduler{ctx: ctx}, refCount: nodeID}
		nodeID++
		nodes = append(nodes, n)
		return n
	}

	// dep describes a dependency of a task on a node: the task asks node to
	// be processed until it meets the conditions in needs.
	type dep struct {
		node  *nodeContext
		needs condition
	}

	// process is the common body of the simulated tasks. It logs that the
	// task named name is running and then, for each dependency in order,
	// calls d.node.process(d.needs, yield). It reports whether every one of
	// those calls returned true.
	//
	// Each dependency is handled in its own function literal so that a
	// deferred recover can intercept a panic raised while processing that
	// dependency, append a "waiting for ... BLOCKED" line to the log, and
	// then re-panic with the same value so the panic keeps propagating.
	//
	// The t parameter is not used in the body.
	process := func(name string, t *task, deps ...dep) (ok bool) {
		fmt.Fprintf(w, "\n\t\t running task %s", name)
		ok = true
		for _, d := range deps {
			func() {
				defer func() {
					if x := recover(); x != nil {
						fmt.Fprintf(w, "\n\t\t task %s waiting for v%d meeting %x", name, d.node.refCount, d.needs)
						fmt.Fprint(w, ": BLOCKED")
						panic(x)
					}
				}()
				if !d.node.process(d.needs, yield) {
					ok = false
				}
			}()
		}
		return ok
	}

	// success inserts a task into n whose runner declares the given completes
	// and needs conditions and, when run, processes deps. The result of
	// process is ignored.
	success := func(name string, n *nodeContext, completes, needs condition, deps ...dep) *task {
		t := &task{
			run: &runner{
				f: func(ctx *OpContext, t *task, mode runMode) {
					process(name, t, deps...)
				},
				completes: completes,
				needs:     needs,
			},
			node: n,
			x:    &String{Str: name},
		}
		n.insertTask(t)
		return t
	}

	// signal inserts a task into n that processes deps and, only if process
	// reports success, calls n.scheduler.signal(completes). The runner
	// declares no needs.
	signal := func(name string, n *nodeContext, completes condition, deps ...dep) *task {
		t := &task{
			run: &runner{
				f: func(ctx *OpContext, t *task, mode runMode) {
					if process(name, t, deps...) {
						n.scheduler.signal(completes)
					}
				},
				completes: completes,
			},
			node: n,
			x:    &String{Str: name},
		}
		n.insertTask(t)
		return t
	}

	// completes inserts a task into n that affects the counters of another
	// node: at creation time it increments the counts for the completes
	// conditions on other's scheduler, and when the task runs and process
	// reports success it decrements them again.
	completes := func(name string, n, other *nodeContext, completes condition, deps ...dep) *task {
		other.scheduler.incrementCounts(completes)
		t := &task{
			run: &runner{
				f: func(ctx *OpContext, t *task, mode runMode) {
					if process(name, t, deps...) {
						other.scheduler.decrementCounts(completes)
					}
				},
				completes: completes,
			},
			node: n,
			x:    &String{Str: name},
		}
		n.insertTask(t)
		return t
	}

	// fail inserts a task into n whose runner logs "running task <name>:
	// FAIL" and sets the task's err to an empty Bottom. The deps argument is
	// accepted but not used.
	fail := func(name string, n *nodeContext, completes, needs condition, deps ...dep) *task {
		t := &task{

			run: &runner{
				f: func(ctx *OpContext, t *task, mode runMode) {
					fmt.Fprintf(w, "\n\t\t running task %s:", name)
					t.err = &Bottom{}
					fmt.Fprint(w, " FAIL")
				},
				completes: completes,
				needs:     needs,
			},
			node: n,
			x:    &String{Str: name},
		}
		n.insertTask(t)
		return t
	}

	type testCase struct {
		// name identifies the case.
		name string
		// init creates the nodes and tasks of the case.
		init func()

		// log is the expected output written by the tasks while running.
		log string
		// state is the expected dump of all nodes and their tasks after
		// every node has been finalized.
		state string

		// err is not read anywhere in this function.
		// NOTE(review): possibly a leftover or reserved field -- confirm.
		err string
	}

	cases := []testCase{{
		// A single node without tasks: nothing is logged.
		name: "empty scheduler",
		init: func() {
			node(nil)
		},
		log: ``,

		state: `
			v0 (SUCCESS):`,
	}, {
		name: "node with one task",
		init: func() {
			v0 := node(nil)
			success("t1", v0, c1, 0)
		},
		log: `
		 running task t1`,

		state: `
			v0 (SUCCESS):
			 task: t1: SUCCESS`,
	}, {
		// Two independent tasks are expected to run in insertion order.
		name: "node with two tasks",
		init: func() {
			v0 := node(nil)
			success("t1", v0, c1, 0)
			success("t2", v0, c2, 0)
		},
		log: `
		 running task t1
		 running task t2`,

		state: `
			v0 (SUCCESS):
			 task: t1: SUCCESS
			 task: t2: SUCCESS`,
	}, {
		// The task is expected to end up FAILED while the node itself still
		// reports SUCCESS.
		name: "node failing task",
		init: func() {
			v0 := node(nil)
			fail("t1", v0, c1, 0)
		},
		log: `
		 running task t1: FAIL`,

		state: `
			v0 (SUCCESS):
			 task: t1: FAILED`,
	}, {
		// The tasks are inserted out of order, and each one needs the
		// condition completed by its predecessor in the chain
		// first(c1) -> second(c2) -> third(c3) -> fourth(c4). The expected
		// log shows them running in dependency order, while the state dump
		// lists them in insertion order.
		name: "dependency chain on nodes within scheduler",
		init: func() {
			v0 := node(nil)
			success("third", v0, c3, c2)
			success("fourth", v0, c4, c3)
			success("second", v0, c2, c1)
			success("first", v0, c1, 0)
		},
		log: `
		 running task first
		 running task second
		 running task third
		 running task fourth`,

		state: `
			v0 (SUCCESS):
			 task: third: SUCCESS
			 task: fourth: SUCCESS
			 task: second: SUCCESS
			 task: first: SUCCESS`,
	}, {
		// t1 needs c1, but no task in the node completes c1. The task is
		// still expected to run and succeed.
		name: "task depends on state for which there is no task",
		init: func() {
			v0 := node(nil)
			success("t1", v0, c2, c1)
		},
		log: `
		 running task t1`,
		state: `
			v0 (SUCCESS):
			 task: t1: SUCCESS`,
	}, {
		// Same idea across nodes: t1 on v1 depends on c2 of v2, which has no
		// tasks at all.
		name: "task depends on state of other node for which there is no task",
		init: func() {
			v0 := node(nil)
			v1 := node(v0)
			v2 := node(v0)
			success("t1", v1, c1, 0, dep{node: v2, needs: c2})
		},
		log: `
		 running task t1`,
		state: `
			v0 (SUCCESS):
			 v1 (SUCCESS):
			 task: t1: SUCCESS
			v2 (SUCCESS):`,
	}, {
		// "block" needs c1|c2 and "last" needs c1|c2|c3; the tasks that
		// complete c1 and c2 are inserted first and last. The expected log
		// runs both "before" tasks first, then "block", then "last".
		name: "tasks depend on multiple other tasks within same scheduler",
		init: func() {
			v0 := node(nil)
			success("before1", v0, c2, 0)
			success("last", v0, c4, c1|c2|c3)
			success("block", v0, c3, c1|c2)
			success("before2", v0, c1, 0)
		},
		log: `
		 running task before1
		 running task before2
		 running task block
		 running task last`,

		state: `
			v0 (SUCCESS):
			 task: before1: SUCCESS
			 task: last: SUCCESS
			 task: block: SUCCESS
			 task: before2: SUCCESS`,
	}, {
		// Three nodes: the root v0 and its children baz (v1) and foo (v2).
		//
		//   - t0 on baz completes c1.
		//   - t1:bar on v0 holds a count for c2 on foo (via completes) and
		//     depends on c1 of baz.
		//   - t2:baz on v0 depends on c2 of foo.
		//
		// The expected log shows t1:bar starting first, t0 running while
		// t1:bar processes its dependency on baz, and t2:baz running last.
		name: "non-cyclic dependencies between nodes p1",
		init: func() {
			v0 := node(nil)
			baz := node(v0)
			success("t0", baz, c1, 0)
			foo := node(v0)

			completes("t1:bar", v0, foo, c2, dep{node: baz, needs: c1})
			success("t2:baz", v0, c1, 0, dep{node: foo, needs: c2})
		},
		log: `
		 running task t1:bar
		 running task t0
		 running task t2:baz`,
		state: `
			v0 (SUCCESS):
			 task: t1:bar: SUCCESS
			 task: t2:baz: SUCCESS
			v1 (SUCCESS):
			 task: t0: SUCCESS
			v2 (SUCCESS):`,
	}, {
		// Same setup as the previous case, but the two tasks on v0 are
		// inserted in the opposite order, so the task that depends on foo is
		// started before the task that completes c2 on foo. The expected log
		// shows t2:baz, then t1:bar, then the task on baz (named "foo"
		// here).
		name: "non-cyclic dependencies between nodes p2",
		init: func() {
			v0 := node(nil)
			baz := node(v0)
			success("foo", baz, c1, 0)
			foo := node(v0)

			success("t2:baz", v0, c1, 0, dep{node: foo, needs: c2})
			completes("t1:bar", v0, foo, c2, dep{node: baz, needs: c1})
		},
		log: `
		 running task t2:baz
		 running task t1:bar
		 running task foo`,
		state: `
			v0 (SUCCESS):
			 task: t2:baz: SUCCESS
			 task: t1:bar: SUCCESS
			v1 (SUCCESS):
			 task: foo: SUCCESS
			v2 (SUCCESS):`,
	}, {
		// Two sibling nodes whose only tasks depend on c1 of each other.
		// Both tasks are expected to be logged as BLOCKED, to be run a
		// second time, and to end up SUCCESS (unblocked) with both nodes
		// frozen.
		name: "cycle in mutually referencing expressions",
		init: func() {
			v0 := node(nil)
			v1 := node(v0)
			v2 := node(v0)
			success("a-10", v1, c1|c2, 0, dep{node: v2, needs: c1})
			success("b+10", v2, c1|c2, 0, dep{node: v1, needs: c1})
		},
		log: `
		 running task a-10
		 running task b+10
		 task b+10 waiting for v1 meeting 1: BLOCKED
		 task a-10 waiting for v2 meeting 1: BLOCKED
		 running task b+10
		 running task a-10`,
		state: `
			v0 (SUCCESS):
			 v1 (SUCCESS): (frozen)
			 task: a-10: SUCCESS (unblocked)
			v2 (SUCCESS): (frozen)
			 task: b+10: SUCCESS (unblocked)`,
	}, {
		// The same mutual dependency as the previous case, plus a third
		// task "5" on v2 that signals c1 without depending on anything. Only
		// b+10 is expected to be logged as BLOCKED; after "5" has run, b+10
		// runs again, and neither node is reported frozen nor any task
		// unblocked.
		name: "broken cyclic reference in expressions",
		init: func() {
			v0 := node(nil)
			v1 := node(v0)
			v2 := node(v0)
			success("a-10", v1, c1|c2, 0, dep{node: v2, needs: c1})
			success("b+10", v2, c1|c2, 0, dep{node: v1, needs: c1})

			// This task provides c1 on v2 independently of the two tasks
			// above.
			signal("5", v2, c1)
		},
		log: `
		 running task a-10
		 running task b+10
		 task b+10 waiting for v1 meeting 1: BLOCKED
		 running task 5
		 running task b+10`,
		state: `
			v0 (SUCCESS):
			 v1 (SUCCESS):
			 task: a-10: SUCCESS
			v2 (SUCCESS):
			 task: b+10: SUCCESS
			 task: 5: SUCCESS`,
	}, {
		// The "comprehension" task on the root node x depends on c1 of x
		// itself, which is the condition that same task completes. It is
		// expected to be logged as BLOCKED on v0, to be run a second time,
		// and to end up SUCCESS (unblocked) with v0 frozen. The unrelated
		// task "5" on the child node runs afterwards.
		name: "self cyclic",
		init: func() {
			x := node(nil)
			foo := node(x)
			success("5", foo, c1, 0)
			success("comprehension", x, c1, 0, dep{node: x, needs: c1})
		},
		log: `
		 running task comprehension
		 task comprehension waiting for v0 meeting 1: BLOCKED
		 running task comprehension
		 running task 5`,
		state: `
			v0 (SUCCESS): (frozen)
			 task: comprehension: SUCCESS (unblocked)
			v1 (SUCCESS):
			 task: 5: SUCCESS`,
	}, {
		// NOTE(review): in this view, init and both expectations are
		// identical to the preceding "self cyclic" case; only the name
		// differs. Confirm whether the two cases are meant to diverge.
		name: "self cyclic not allowed",
		init: func() {
			x := node(nil)
			foo := node(x)
			success("5", foo, c1, 0)
			success("comprehension", x, c1, 0, dep{node: x, needs: c1})
		},
		log: `
		 running task comprehension
		 task comprehension waiting for v0 meeting 1: BLOCKED
		 running task comprehension
		 running task 5`,
		state: `
			v0 (SUCCESS): (frozen)
			 task: comprehension: SUCCESS (unblocked)
			v1 (SUCCESS):
			 task: 5: SUCCESS`,
	}, {
		// Sibling nodes x (v1) and y (v2) each carry a task named
		// "comprehension" that completes c1 on its own node and depends on
		// c1 of the other node. Both are expected to be logged as BLOCKED,
		// to be run again, and to end up SUCCESS (unblocked) with both nodes
		// frozen.
		name: "mutually cyclic projection",
		init: func() {
			v0 := node(nil)
			x := node(v0)
			y := node(v0)

			success("comprehension", x, c1, 0, dep{node: y, needs: c1})
			success("comprehension", y, c1, 0, dep{node: x, needs: c1})

		},
		log: `
		 running task comprehension
		 running task comprehension
		 task comprehension waiting for v1 meeting 1: BLOCKED
		 task comprehension waiting for v2 meeting 1: BLOCKED
		 running task comprehension
		 running task comprehension`,
		state: `
			v0 (SUCCESS):
			 v1 (SUCCESS): (frozen)
			 task: comprehension: SUCCESS (unblocked)
			v2 (SUCCESS): (frozen)
			 task: comprehension: SUCCESS (unblocked)`,
	}, {
		// The same mutual dependency as the previous case, with an
		// additional node foo (v3) under y that carries an independent task
		// "5". The expectations for v1 and v2 are unchanged; "5" is expected
		// to run last.
		name: "disallowed mutually cyclic projection",
		init: func() {
			v0 := node(nil)
			x := node(v0)
			y := node(v0)
			foo := node(y)
			success("5", foo, c1, 0)

			success("comprehension", x, c1, 0, dep{node: y, needs: c1})
			success("comprehension", y, c1, 0, dep{node: x, needs: c1})

		},
		log: `
		 running task comprehension
		 running task comprehension
		 task comprehension waiting for v1 meeting 1: BLOCKED
		 task comprehension waiting for v2 meeting 1: BLOCKED
		 running task comprehension
		 running task comprehension
		 running task 5`,
		state: `
			v0 (SUCCESS):
			 v1 (SUCCESS): (frozen)
			 task: comprehension: SUCCESS (unblocked)
			v2 (SUCCESS): (frozen)
			 task: comprehension: SUCCESS (unblocked)
			v3 (SUCCESS):
			 task: 5: SUCCESS`,
	}}

	cuetest.Run(t, cases, func(t *cuetest.T, tc *testCase) {
		// Reset the state shared with the helper closures so that node ids
		// start at 0 again and the log only contains output of this case.
		nodeID = 0
		nodes = nodes[:0]
		w.Reset()

		// Build the nodes and tasks, mark auto as provided on every node and
		// signal that no more tasks will be added. Only after that has been
		// done for all nodes is each node finalized, in creation order.
		tc.init()
		for _, n := range nodes {
			n.provided |= auto
			n.signalDoneAdding()
		}
		for _, n := range nodes {
			n.finalize(auto)
		}

		t.Equal(w.String(), tc.log)

		// Dump the final state of every node. This builder deliberately
		// shadows the shared log builder w for the rest of the callback; the
		// log has already been compared above.
		w := &strings.Builder{}
		for _, n := range nodes {
			fmt.Fprintf(w, "\n\t\t\tv%d (%v):", n.refCount, n.state)
			if n.scheduler.isFrozen {
				fmt.Fprint(w, " (frozen)")
			}
			for _, t := range n.tasks {
				fmt.Fprintf(w, "\n\t\t\t task: %s: %v", t.x.(*String).Str, t.state)
				if t.unblocked {
					fmt.Fprint(w, " (unblocked)")
				}
			}
			// Only tasks that still record what they are blocked on are
			// listed as blocked.
			for _, t := range n.blocking {
				if t.blockedOn != nil {
					fmt.Fprintf(w, "\n\t\t\t blocked: %s: %v", t.x.(*String).Str, t.state)
				}
			}
		}

		t.Equal(w.String(), tc.state)
	})
}
622
View as plain text