...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package flow
16
17
18
19
20
21
22
23
24
25
26
27
28 import (
29 "fmt"
30 "os"
31
32 "cuelang.org/go/cue/errors"
33 "cuelang.org/go/internal/core/adt"
34 "cuelang.org/go/internal/core/eval"
35 "cuelang.org/go/internal/value"
36 )
37
38 func (c *Controller) runLoop() {
39 _, root := value.ToInternal(c.inst)
40
41
42 n := len(root.Conjuncts)
43 c.conjuncts = make([]adt.Conjunct, n, n+len(c.tasks))
44 copy(c.conjuncts, root.Conjuncts)
45
46 c.markReady(nil)
47
48 for c.errs == nil {
49
50
51
52 waiting := false
53 running := false
54
55
56 for _, t := range c.tasks {
57 switch t.state {
58 case Waiting:
59 waiting = true
60
61 case Ready:
62 running = true
63
64 t.state = Running
65 c.updateTaskValue(t)
66
67 t.ctxt = eval.NewContext(value.ToInternal(t.v))
68
69 go func(t *Task) {
70 if err := t.r.Run(t, nil); err != nil {
71 t.err = errors.Promote(err, "task failed")
72 }
73
74 t.c.taskCh <- t
75 }(t)
76
77 case Running:
78 running = true
79
80 case Terminated:
81 }
82 }
83
84 if !running {
85 if waiting {
86
87
88 c.addErr(errors.New("deadlock"), "run loop")
89 }
90 break
91 }
92
93 select {
94 case <-c.context.Done():
95 return
96
97 case t := <-c.taskCh:
98 t.state = Terminated
99
100 taskStats := *t.ctxt.Stats()
101 t.stats.Add(taskStats)
102 c.taskStats.Add(taskStats)
103
104 start := *c.opCtx.Stats()
105
106 switch t.err {
107 case nil:
108 c.updateTaskResults(t)
109
110 case ErrAbort:
111
112 fallthrough
113
114 default:
115 c.addErr(t.err, "task failure")
116 return
117 }
118
119
120 if c.updateValue() {
121
122
123 c.initTasks()
124 }
125
126 c.updateTaskValue(t)
127
128 t.stats.Add(c.opCtx.Stats().Since(start))
129
130 c.markReady(t)
131 }
132 }
133 }
134
135 func (c *Controller) markReady(t *Task) {
136 for _, x := range c.tasks {
137 if x.state == Waiting && x.isReady() {
138 x.state = Ready
139 }
140 }
141
142 if debug {
143 fmt.Fprintln(os.Stderr, "tools/flow task dependency graph:")
144 fmt.Fprintln(os.Stderr, "```mermaid")
145 fmt.Fprint(os.Stderr, mermaidGraph(c))
146 fmt.Fprintln(os.Stderr, "```")
147 }
148
149 if c.cfg.UpdateFunc != nil {
150 if err := c.cfg.UpdateFunc(c, t); err != nil {
151 c.addErr(err, "task completed")
152 c.cancel()
153 return
154 }
155 }
156 }
157
158
159
160 func (c *Controller) updateValue() bool {
161
162 if c.valueSeqNum == c.conjunctSeq {
163 return false
164 }
165
166
167
168
169
170 v := &adt.Vertex{Conjuncts: c.conjuncts}
171 v.Finalize(c.opCtx)
172
173 c.inst = value.Make(c.opCtx, v)
174 c.valueSeqNum = c.conjunctSeq
175 return true
176 }
177
178
179
180 func (c *Controller) updateTaskValue(t *Task) {
181 required := t.conjunctSeq
182 for _, dep := range t.depTasks {
183 if dep.conjunctSeq > required {
184 required = dep.conjunctSeq
185 }
186 }
187
188 if t.valueSeq == required {
189 return
190 }
191
192 if c.valueSeqNum < required {
193 c.updateValue()
194 }
195
196 t.v = c.inst.LookupPath(t.path)
197 t.valueSeq = required
198 }
199
200
201
202 func (c *Controller) updateTaskResults(t *Task) bool {
203 if t.update == nil {
204 return false
205 }
206
207 expr := t.update
208 for i := len(t.labels) - 1; i >= 0; i-- {
209 label := t.labels[i]
210 switch label.Typ() {
211 case adt.StringLabel, adt.HiddenLabel:
212 expr = &adt.StructLit{
213 Decls: []adt.Decl{
214 &adt.Field{
215 Label: t.labels[i],
216 Value: expr,
217 },
218 },
219 }
220 case adt.IntLabel:
221 i := label.Index()
222 list := &adt.ListLit{}
223 any := &adt.Top{}
224
225 for k := 0; k < i; k++ {
226 list.Elems = append(list.Elems, any)
227 }
228 list.Elems = append(list.Elems, expr, &adt.Ellipsis{})
229 expr = list
230 default:
231 panic(fmt.Errorf("unexpected label type %v", label.Typ()))
232 }
233 }
234
235 t.update = nil
236
237
238
239 c.conjuncts = append(c.conjuncts, adt.MakeRootConjunct(c.env, expr))
240 c.conjunctSeq++
241 t.conjunctSeq = c.conjunctSeq
242
243 return true
244 }
245
View as plain text