// Copyright 2020 CUE Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package flow provides a low-level workflow manager based on a CUE Instance.
//
// A Task defines an operational unit in a Workflow and corresponds to a struct
// in a CUE instance. This package does not define what a Task looks like in a
// CUE Instance. Instead, the user of this package must supply a TaskFunc that
// creates a Runner for cue.Values that are deemed to be a Task.
//
// Tasks may depend on other tasks. Cyclic dependencies are therefore not
// allowed. A Task A depends on another Task B if A, directly or indirectly,
// has a reference to any field of Task B, including its root.
package flow

// TODO: Add hooks. This would allow UIs, for instance, to report on progress.
//
//   - New(inst *cue.Instance, options ...Option)
//   - AddTask(v cue.Value, r Runner) *Task
//   - AddDependency(a, b *Task)
//   - AddTaskGraph(root cue.Value, fn taskFunc)
//   - AddSequence(list cue.Value, fn taskFunc)
//   - Err()

// TODO:
// Should we allow lists as a shorthand for a sequence of tasks?
// If so, how do we specify termination behavior?

// TODO:
// Should we allow tasks to be a child of another task? Currently, the search
// for tasks ends once a task root is found.
//
// Semantically it is somewhat unclear to do so: for instance, if an $after
// is used to refer to an explicit task dependency, it is logically
// indistinguishable whether this should be a subtask or is a dependency.
// Using higher-order constructs for analysis is generally undesirable.
//
// A possible solution would be to define specific "grouping tasks" whose sole
// purpose is to define sub tasks. The user of this package would then need
// to explicitly distinguish between tasks that are dependencies and tasks that
// are subtasks.

// TODO: streaming tasks/ server applications
//
// Workflows are currently implemented for batch processing, for instance to
// implement shell scripting or other kinds of batch processing.
//
// This API has been designed, however, to also allow for streaming
// applications. For instance, a streaming Task could listen for Etcd changes
// or incoming HTTP requests and send updates each time an input changes.
// Downstream tasks could then alternate between a Waiting and Running state.
//
// Note that such streaming applications would also cause configurations to
// potentially not become increasingly more specific. Instead, a Task would
// replace its old result each time it is updated. This would require tracking
// of which conjunct was previously created by a task.

import (
	"context"
	"fmt"
	"os"
	"strings"
	"sync/atomic"

	"cuelang.org/go/cue"
	"cuelang.org/go/cue/errors"
	"cuelang.org/go/cue/stats"
	"cuelang.org/go/internal/core/adt"
	"cuelang.org/go/internal/core/convert"
	"cuelang.org/go/internal/core/eval"
	"cuelang.org/go/internal/value"
)

var (
	// ErrAbort may be returned by a task to avoid processing downstream tasks.
	// This can be used by control nodes to influence execution.
	ErrAbort = errors.New("abort dependant tasks without failure")

	// TODO: ErrUpdate: update and run a dependency, but don't complete a
	// dependency as more results may come. This is useful in server mode.

	// debug is true when the CUE_DEBUG_TOOLS_FLOW environment variable is set
	// to a non-empty value. It is evaluated once, at package initialization.
	debug = os.Getenv("CUE_DEBUG_TOOLS_FLOW") != ""
)

// A TaskFunc creates a Runner for v if v defines a task or reports nil
// otherwise. It reports an error for ill-formed tasks.
//
// If TaskFunc returns a non-nil Runner the search for tasks within v stops.
// That is, subtasks are not supported.
type TaskFunc func(v cue.Value) (Runner, error)

// A Runner executes a Task.
type Runner interface {
	// Run runs a Task. If any of the tasks it depends on returned an error it
	// is passed to this task. It reports an error upon failure.
	//
	// Any results to be returned can be set by calling Fill on the passed task.
	//
	// TODO: what is a good contract for receiving and passing errors and abort.
	//
	// If for a returned error x errors.Is(x, ErrAbort), all dependent tasks
	// will not be run, without this being an error.
	Run(t *Task, err error) error
}

// A RunnerFunc runs a Task.
type RunnerFunc func(t *Task) error

// Run implements Runner by calling f(t). The err argument is ignored.
func (f RunnerFunc) Run(t *Task, err error) error {
	return f(t)
}

// A Config defines options for interpreting an Instance as a Workflow.
type Config struct {
	// Root limits the search for tasks to be within the path indicated to root.
	// For the cue command, this is set to ["command"]. The default value is
	// for all tasks to be root.
	Root cue.Path

	// InferTasks allows tasks to be defined outside of the Root. Such tasks
	// will only be included in the workflow if any of its fields is referenced
	// by any of the tasks defined within Root.
	//
	// CAVEAT EMPTOR: this feature is mostly provided for backwards
	// compatibility with v0.2. A problem with this approach is that it will
	// look for task structs within arbitrary data. So if not careful, there may
	// be spurious matches.
	InferTasks bool

	// IgnoreConcrete ignores references for which the values are already
	// concrete and cannot change.
	IgnoreConcrete bool

	// FindHiddenTasks allows tasks to be defined in hidden fields.
	FindHiddenTasks bool

	// UpdateFunc is called whenever the information in the controller is
	// updated. This includes directly after initialization. The task may be
	// nil if this call is not the result of a task completing.
	UpdateFunc func(c *Controller, t *Task) error
}

// A Controller defines a set of Tasks to be executed.
type Controller struct {
	// cfg is a copy of the Config passed to New, or the zero Config if nil
	// was passed.
	cfg Config
	// isTask is the TaskFunc passed to New.
	isTask TaskFunc

	// inst is the value of the instance passed to New. It is what Value
	// returns once Run has completed.
	inst        cue.Value
	valueSeqNum int64

	env *adt.Environment

	conjuncts   []adt.Conjunct
	conjunctSeq int64

	// taskCh is created unbuffered by New.
	// NOTE(review): presumably used to signal task completion to the run
	// loop; the run loop is not defined in this part of the file — confirm.
	taskCh chan *Task

	// opCtx is the evaluation context created by New from inst.
	opCtx *adt.OpContext
	// context and cancelFunc are set by Run, which derives them from the
	// context.Context it is given.
	context    context.Context
	cancelFunc context.CancelFunc

	// taskStats tracks counters for auxiliary operations done by tasks. It does
	// not include the CUE operations done by the Controller on behalf of tasks,
	// which is likely going to be the bulk of the operations.
	taskStats stats.Counts

	// done is set to true by Run after the run loop has returned. Value
	// panics while it is still false.
	done atomic.Bool

	// keys maps task keys to their index. This allows a recreation of the
	// Instance while retaining the original task indices.
	//
	// TODO: do instance updating in place to allow for more efficient
	// processing.
	keys  map[string]*Task
	tasks []*Task

	// Only used during task initialization.
	nodes map[*adt.Vertex]*Task

	// errs accumulates the errors recorded through addErr; Run returns it.
	errs errors.Error
}

// Stats reports statistics on the total number of CUE operations used.
//
// This is an experimental method and the API is likely to change. The
// Counts.String method will likely stay and is the safest way to use this API.
198 // 199 // This currently should only be called after completion or within a call to 200 // UpdateFunc. 201 func (c *Controller) Stats() (counts stats.Counts) { 202 counts = *c.opCtx.Stats() 203 counts.Add(c.taskStats) 204 return counts 205 } 206 207 // Tasks reports the tasks that are currently registered with the controller. 208 // 209 // This may currently only be called before Run is called or from within 210 // a call to UpdateFunc. Task pointers returned by this call are not guaranteed 211 // to be the same between successive calls to this method. 212 func (c *Controller) Tasks() []*Task { 213 return c.tasks 214 } 215 216 func (c *Controller) cancel() { 217 if c.cancelFunc != nil { 218 c.cancelFunc() 219 } 220 } 221 222 func (c *Controller) addErr(err error, msg string) { 223 c.errs = errors.Append(c.errs, errors.Promote(err, msg)) 224 } 225 226 // New creates a Controller for a given Instance and TaskFunc. 227 // 228 // The instance value can either be a *cue.Instance or a cue.Value. 229 func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller { 230 v := inst.Value() 231 ctx := eval.NewContext(value.ToInternal(v)) 232 233 c := &Controller{ 234 isTask: f, 235 inst: v, 236 opCtx: ctx, 237 238 taskCh: make(chan *Task), 239 keys: map[string]*Task{}, 240 } 241 242 if cfg != nil { 243 c.cfg = *cfg 244 } 245 246 c.initTasks() 247 return c 248 249 } 250 251 // Run runs the tasks of a workflow until completion. 252 func (c *Controller) Run(ctx context.Context) error { 253 c.context, c.cancelFunc = context.WithCancel(ctx) 254 defer c.cancelFunc() 255 256 c.runLoop() 257 258 // NOTE: track state here as runLoop might add more tasks to the flow 259 // during the execution so checking current tasks state may not be 260 // accurate enough to determine that the flow is terminated. 261 // This is used to determine if the controller value can be retrieved. 262 // When the controller value is safe to be read concurrently this tracking 263 // can be removed. 
264 c.done.Store(true) 265 266 return c.errs 267 } 268 269 // Value returns the value managed by the controller. 270 // 271 // It is safe to use the value only after Run() has returned. 272 // It panics if the flow is running. 273 func (c *Controller) Value() cue.Value { 274 if !c.done.Load() { 275 panic("can't retrieve value before flow has terminated") 276 } 277 return c.inst 278 } 279 280 // We need to escape quotes in the path, per 281 // https://mermaid-js.github.io/mermaid/#/flowchart?id=entity-codes-to-escape-characters 282 // This also requires that we escape the quoting character #. 283 var mermaidQuote = strings.NewReplacer("#", "#35;", `"`, "#quot;") 284 285 // mermaidGraph generates a mermaid graph of the current state. This can be 286 // pasted into https://mermaid-js.github.io/mermaid-live-editor/ for 287 // visualization. 288 func mermaidGraph(c *Controller) string { 289 w := &strings.Builder{} 290 fmt.Fprintln(w, "graph TD") 291 for i, t := range c.Tasks() { 292 path := mermaidQuote.Replace(t.Path().String()) 293 fmt.Fprintf(w, " t%d(\"%s [%s]\")\n", i, path, t.State()) 294 for _, t := range t.Dependencies() { 295 fmt.Fprintf(w, " t%d-->t%d\n", i, t.Index()) 296 } 297 } 298 return w.String() 299 } 300 301 // A State indicates the state of a Task. 302 // 303 // The following state diagram indicates the possible state transitions: 304 // 305 // Ready 306 // ↗︎ ↘︎ 307 // Waiting ← Running 308 // ↘︎ ↙︎ 309 // Terminated 310 // 311 // A Task may move from Waiting to Terminating if one of 312 // the tasks on which it depends fails. 313 // 314 // NOTE: transitions from Running to Waiting are currently not supported. In 315 // the future this may be possible if a task depends on continuously running 316 // tasks that send updates. 317 type State int 318 319 const ( 320 // Waiting indicates a task is blocked on input from another task. 
321 // 322 // NOTE: although this is currently not implemented, a task could 323 // theoretically move from the Running to Waiting state. 324 Waiting State = iota 325 326 // Ready means a tasks is ready to run, but currently not running. 327 Ready 328 329 // Running indicates a goroutine is currently active for a task and that 330 // it is not Waiting. 331 Running 332 333 // Terminated means a task has stopped running either because it terminated 334 // while Running or was aborted by task on which it depends. The error 335 // value of a Task indicates the reason for the termination. 336 Terminated 337 ) 338 339 var stateStrings = map[State]string{ 340 Waiting: "Waiting", 341 Ready: "Ready", 342 Running: "Running", 343 Terminated: "Terminated", 344 } 345 346 // String reports a human readable string of status s. 347 func (s State) String() string { 348 return stateStrings[s] 349 } 350 351 // A Task contains the context for a single task execution. 352 // Tasks may be run concurrently. 353 type Task struct { 354 // Static 355 c *Controller 356 ctxt *adt.OpContext 357 r Runner 358 359 index int 360 path cue.Path 361 key string 362 labels []adt.Feature 363 364 // Dynamic 365 update adt.Expr 366 deps map[*Task]bool 367 pathDeps map[string][]*Task 368 369 conjunctSeq int64 370 valueSeq int64 371 v cue.Value 372 err errors.Error 373 state State 374 depTasks []*Task 375 376 stats stats.Counts 377 } 378 379 // Stats reports statistics on the number of CUE operations used to complete 380 // this task. 381 // 382 // This is an experimental method and the API is likely to change. 383 // 384 // It only shows numbers upon completion. This may change in the future. 385 func (t *Task) Stats() stats.Counts { 386 return t.stats 387 } 388 389 // Context reports the Controller's Context. 390 func (t *Task) Context() context.Context { 391 return t.c.context 392 } 393 394 // Path reports the path of Task within the Instance in which it is defined. 395 // The Path is always valid. 
396 func (t *Task) Path() cue.Path { 397 return t.path 398 } 399 400 // Index reports the sequence number of the Task. This will not change over 401 // time. 402 func (t *Task) Index() int { 403 return t.index 404 } 405 406 func (t *Task) done() bool { 407 return t.state > Running 408 } 409 410 func (t *Task) isReady() bool { 411 for _, d := range t.depTasks { 412 if !d.done() { 413 return false 414 } 415 } 416 return true 417 } 418 419 func (t *Task) vertex() *adt.Vertex { 420 _, x := value.ToInternal(t.v) 421 return x 422 } 423 424 func (t *Task) addDep(path string, dep *Task) { 425 if dep == nil || dep == t { 426 return 427 } 428 if t.deps == nil { 429 t.deps = map[*Task]bool{} 430 t.pathDeps = map[string][]*Task{} 431 } 432 433 // Add the dependencies for a given path to the controller. We could compute 434 // this again later, but this ensures there will be no discrepancies. 435 a := t.pathDeps[path] 436 found := false 437 for _, t := range a { 438 if t == dep { 439 found = true 440 break 441 } 442 } 443 if !found { 444 t.pathDeps[path] = append(a, dep) 445 446 } 447 448 if !t.deps[dep] { 449 t.deps[dep] = true 450 t.depTasks = append(t.depTasks, dep) 451 } 452 } 453 454 // Fill fills in values of the Controller's configuration for the current task. 455 // The changes take effect after the task completes. 456 // 457 // This method may currently only be called by the runner. 458 func (t *Task) Fill(x interface{}) error { 459 expr := convert.GoValueToExpr(t.ctxt, true, x) 460 if t.update == nil { 461 t.update = expr 462 return nil 463 } 464 t.update = &adt.BinaryExpr{ 465 Op: adt.AndOp, 466 X: t.update, 467 Y: expr, 468 } 469 return nil 470 } 471 472 // Value reports the latest value of this task. 473 // 474 // This method may currently only be called before Run is called or after a 475 // Task completed, or from within a call to UpdateFunc. 
476 func (t *Task) Value() cue.Value { 477 // TODO: synchronize 478 return t.v 479 } 480 481 // Dependencies reports the Tasks t depends on. 482 // 483 // This method may currently only be called before Run is called or after a 484 // Task completed, or from within a call to UpdateFunc. 485 func (t *Task) Dependencies() []*Task { 486 // TODO: add synchronization. 487 return t.depTasks 488 } 489 490 // PathDependencies reports the dependencies found for a value at the given 491 // path. 492 // 493 // This may currently only be called before Run is called or from within 494 // a call to UpdateFunc. 495 func (t *Task) PathDependencies(p cue.Path) []*Task { 496 return t.pathDeps[p.String()] 497 } 498 499 // Err returns the error of a completed Task. 500 // 501 // This method may currently only be called before Run is called, after a 502 // Task completed, or from within a call to UpdateFunc. 503 func (t *Task) Err() error { 504 return t.err 505 } 506 507 // State is the current state of the Task. 508 // 509 // This method may currently only be called before Run is called or after a 510 // Task completed, or from within a call to UpdateFunc. 511 func (t *Task) State() State { 512 return t.state 513 } 514