1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package flow_test
16
17 import (
18 "context"
19 "fmt"
20 "os"
21 "path"
22 "strings"
23 "sync"
24 "testing"
25 "time"
26
27 "cuelang.org/go/cue"
28 "cuelang.org/go/cue/cuecontext"
29 "cuelang.org/go/cue/errors"
30 "cuelang.org/go/cue/format"
31 "cuelang.org/go/cue/stats"
32 "cuelang.org/go/internal/cuetxtar"
33 "cuelang.org/go/tools/flow"
34 )
35
36
37
38 func TestFlow(t *testing.T) {
39 test := cuetxtar.TxTarTest{
40 Root: "./testdata",
41 Name: "run",
42 }
43
44 test.Run(t, func(t *cuetxtar.Test) {
45 v := cuecontext.New().BuildInstance(t.Instance())
46 if err := v.Err(); err != nil {
47 t.Fatal(errors.Details(err, nil))
48 }
49
50 seqNum = 0
51
52 var tasksTotal stats.Counts
53
54 updateFunc := func(c *flow.Controller, task *flow.Task) error {
55 str := flow.MermaidGraph(c)
56 step := fmt.Sprintf("t%d", seqNum)
57 fmt.Fprintln(t.Writer(step), str)
58
59 if task != nil {
60 n := task.Value().Syntax(cue.Final())
61 b, err := format.Node(n)
62 if err != nil {
63 t.Fatal(err)
64 }
65 fmt.Fprintln(t.Writer(path.Join(step, "value")), string(b))
66
67 stats := task.Stats()
68 tasksTotal.Add(stats)
69 fmt.Fprintln(t.Writer(path.Join(step, "stats")), &stats)
70 }
71
72 incSeqNum()
73
74 return nil
75 }
76
77 cfg := &flow.Config{
78 Root: cue.ParsePath("root"),
79 InferTasks: t.Bool("InferTasks"),
80 IgnoreConcrete: t.Bool("IgnoreConcrete"),
81 FindHiddenTasks: t.Bool("FindHiddenTasks"),
82 UpdateFunc: updateFunc,
83 }
84
85 c := flow.New(cfg, v, taskFunc)
86
87 w := t.Writer("errors")
88 if err := c.Run(context.Background()); err != nil {
89 cwd, _ := os.Getwd()
90 fmt.Fprint(w, "error: ")
91 errors.Print(w, err, &errors.Config{
92 Cwd: cwd,
93 ToSlash: true,
94 })
95 }
96
97 totals := c.Stats()
98 if tasksTotal != zeroStats && totals != tasksTotal {
99 t.Errorf(diffMsg, tasksTotal, totals, tasksTotal.Since(totals))
100 }
101 fmt.Fprintln(t.Writer("stats/totals"), totals)
102 })
103 }
104
105 var zeroStats stats.Counts
106
107 const diffMsg = `
108 stats: task totals different from controller:
109 task totals:
110 %v
111
112 controller totals:
113 %v
114
115 task totals - controller totals:
116 %v`
117
118 func TestFlowValuePanic(t *testing.T) {
119 f := `
120 root: {
121 a: {
122 $id: "slow"
123 out: string
124 }
125 b: {
126 $id: "slow"
127 $after: a
128 out: string
129 }
130 }
131 `
132 ctx := cuecontext.New()
133 v := ctx.CompileString(f)
134
135 ch := make(chan bool, 1)
136
137 cfg := &flow.Config{
138 Root: cue.ParsePath("root"),
139 UpdateFunc: func(c *flow.Controller, t *flow.Task) error {
140 ch <- true
141 return nil
142 },
143 }
144
145 c := flow.New(cfg, v, taskFunc)
146
147 defer func() { recover() }()
148
149 go c.Run(context.TODO())
150
151
152
153 <-ch
154 c.Value()
155 <-ch
156
157 t.Errorf("Value() did not panic")
158 }
159
160 func taskFunc(v cue.Value) (flow.Runner, error) {
161 switch name, err := v.Lookup("$id").String(); name {
162 default:
163 if err == nil {
164 return flow.RunnerFunc(func(t *flow.Task) error {
165 t.Fill(map[string]string{"stdout": "foo"})
166 return nil
167 }), nil
168 } else if v.LookupPath(cue.MakePath(cue.Str("$id"))).Exists() {
169 return nil, err
170 }
171
172 case "valToOut":
173 return flow.RunnerFunc(func(t *flow.Task) error {
174 if str, err := t.Value().Lookup("val").String(); err == nil {
175 t.Fill(map[string]string{"out": str})
176 }
177 return nil
178 }), nil
179
180 case "failure":
181 return flow.RunnerFunc(func(t *flow.Task) error {
182 return errors.New("failure")
183 }), nil
184
185 case "abort":
186 return flow.RunnerFunc(func(t *flow.Task) error {
187 return flow.ErrAbort
188 }), nil
189
190 case "list":
191 return flow.RunnerFunc(func(t *flow.Task) error {
192 t.Fill(map[string][]int{"out": {1, 2}})
193 return nil
194 }), nil
195
196 case "slow":
197 return flow.RunnerFunc(func(t *flow.Task) error {
198 time.Sleep(10 * time.Millisecond)
199 t.Fill(map[string]string{"out": "finished"})
200 return nil
201 }), nil
202
203 case "sequenced":
204
205
206 return flow.RunnerFunc(func(t *flow.Task) error {
207 seq, err := t.Value().Lookup("seq").Int64()
208 if err != nil {
209 return err
210 }
211
212 waitSeqNum(seq)
213
214 if str, err := t.Value().Lookup("val").String(); err == nil {
215 t.Fill(map[string]string{"out": str})
216 }
217
218 return nil
219 }), nil
220 }
221 return nil, nil
222 }
223
224
225
226 var (
227 seqNum int64
228 seqLock sync.Mutex
229 seqCond = sync.NewCond(&seqLock)
230 )
231
232 func incSeqNum() {
233 seqCond.L.Lock()
234 seqNum++
235 seqCond.Broadcast()
236 seqCond.L.Unlock()
237 }
238
239 func waitSeqNum(seq int64) {
240 seqCond.L.Lock()
241 for seq != seqNum {
242 seqCond.Wait()
243 }
244 seqCond.L.Unlock()
245 }
246
247
248 func TestX(t *testing.T) {
249 in := `
250 `
251
252 if strings.TrimSpace(in) == "" {
253 t.Skip()
254 }
255
256 rt := cuecontext.New()
257 v := rt.CompileString(in)
258 if err := v.Err(); err != nil {
259 t.Fatal(err)
260 }
261
262 c := flow.New(&flow.Config{
263 Root: cue.ParsePath("root"),
264 UpdateFunc: func(c *flow.Controller, ft *flow.Task) error {
265 if ft != nil {
266 t.Errorf("\nTASK:\n%s", ft.Stats())
267 }
268 return nil
269 },
270 }, v, taskFunc)
271
272 t.Error(flow.MermaidGraph(c))
273
274 if err := c.Run(context.Background()); err != nil {
275 t.Fatal(errors.Details(err, nil))
276 }
277
278 t.Errorf("\nCONTROLLER:\n%s", c.Stats())
279 }
280
View as plain text