// Copyright 2023 CUE Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package adt

import (
	"math/bits"
)

// The CUE scheduler schedules tasks for evaluation.
//
// A task is a computation unit associated with a single node. Each task may
// depend on knowing certain properties of one or more fields, namely:
//
//   - whether the field exists
//   - the scalar value of a field, if any
//   - the set of all conjuncts
//   - the set of all sub fields
//   - the recursively evaluated value
//
// Each task, in turn, may mark itself as providing knowledge about one or more
// of these properties. If it is not known upfront whether a task may contribute
// to a certain property, it must mark itself as (potentially) contributing to
// this property.
//
// DEPENDENCY GRAPH
//
// A task may depend on zero or more fields, including the field for which it
// is defined. The graph of all dependencies is defined as follows:
//
//   - Each task and each <field, property> pair is a node in the graph.
//   - A task T for field F that (possibly) computes property P for F is
//     represented by an edge from <F, P> to T.
//   - A task T for field F that depends on property P of field G is represented
//     by an edge from <G, P> to T.
//
// A task T is involved in an evaluation cycle if there is a path from T back to
// itself in the dependency graph. Processing stops in the event of such a
// cycle. In that case, the scheduler commences an unblocking mechanism.
//
// As a general rule, once a node is detected to be blocking, it may no longer
// become more specific. In other words, it is "frozen".
// The unblocking consists of two phases: the scheduler first freezes and
// unblocks all blocked nodes for the properties marked as autoUnblock-ing in
// taskContext. Subsequently, all tasks that are unblocked by this are run.
// In the next phase all remaining tasks are unblocked.
// See taskContext.autoUnblock for more information.
//
// Note that some tasks, like references, may depend on other fields without
// requiring a certain property. These do not count as dependencies.

// A taskContext manages the task memory and task stack.
// It is typically associated with an OpContext.
type taskContext struct {
	// stack tracks the current execution of tasks. This is a stack as tasks
	// may trigger the evaluation of other tasks to complete.
	stack []*task

	// blocking lists all tasks that were blocked during a round of evaluation.
	// Evaluation finalizes one node at a time, which includes the evaluation
	// of all nodes necessary to evaluate that node. Any task that is blocked
	// during such a round of evaluation is recorded here. Any mutual cycles
	// will result in unresolved tasks. At the end of such a round, computation
	// can be frozen and the tasks unblocked.
	blocking []*task

	// counterMask marks which conditions use counters. Other conditions are
	// handled by signals only.
	counterMask condition

	// autoUnblock marks the flags that get unblocked automatically when there
	// is a deadlock between nodes. These are properties that may become
	// meaningful once it is known that a value may not become more specific.
	// An example of this is the property "scalar". If something is not a scalar
	// yet, and it is known that the value may never become more specific, it is
	// known that this value will never become a scalar, thus effectively
	// making it known.
	autoUnblock condition

	// complete is called upon completion of states, allowing other states to
	// be updated atomically.
	complete func(s *scheduler) condition
}

func (p *taskContext) current() *task {
	return p.stack[len(p.stack)-1]
}

func (p *taskContext) pushTask(t *task) {
	p.stack = append(p.stack, t)
}

func (p *taskContext) popTask() {
	p.stack = p.stack[:len(p.stack)-1]
}

func (p *taskContext) newTask() *task {
	// TODO: allocate from pool.
	return &task{}
}

type taskState uint8

const (
	taskREADY taskState = iota

	taskRUNNING // processing conjunct(s)
	taskWAITING // task is blocked on a property of an arc to hold
	taskSUCCESS
	taskFAILED
)

type schedState uint8

const (
	schedREADY schedState = iota

	schedRUNNING    // processing conjunct(s)
	schedFINALIZING // all tasks completed, run new tasks immediately
	schedSUCCESS
	schedFAILED
)

func (s schedState) done() bool { return s >= schedSUCCESS }

func (s taskState) String() string {
	switch s {
	case taskREADY:
		return "READY"
	case taskRUNNING:
		return "RUNNING"
	case taskWAITING:
		return "WAITING"
	case taskSUCCESS:
		return "SUCCESS"
	case taskFAILED:
		return "FAILED"
	default:
		return "UNKNOWN"
	}
}

func (s schedState) String() string {
	switch s {
	case schedREADY:
		return "READY"
	case schedRUNNING:
		return "RUNNING"
	case schedFINALIZING:
		return "FINALIZING"
	case schedSUCCESS:
		return "SUCCESS"
	case schedFAILED:
		return "FAILED"
	default:
		return "UNKNOWN"
	}
}

// runMode indicates how to proceed after a condition could not be met.
type runMode uint8

const (
	// ignore indicates that the new evaluator should not do any processing.
	// This is mostly used in the transition from old to new evaluator and
	// should probably eventually be removed.
	ignore runMode = 1 + iota

	// attemptOnly indicates that execution should continue even if the
	// condition is not met.
	attemptOnly

	// yield means that execution should be yielded if the condition is not met.
	// That is, the task is marked as a dependency and control is returned to
	// the runloop. The task will resume once the dependency is met.
	yield

	// finalize means that uncompleted tasks should be turned into errors to
	// complete the evaluation of a Vertex.
	finalize
)

func (r runMode) String() string {
	switch r {
	case ignore:
		return "ignore"
	case attemptOnly:
		return "attemptOnly"
	case yield:
		return "yield"
	case finalize:
		return "finalize"
	}
	return "unknown"
}

// condition is a bit mask of states that a task may depend on.
//
// There are generally two types of states: states that are met if all tasks
// that contribute to that state are completed (counter states), and states
// that are met if some global set of conditions are met.
type condition uint16

const (
	// allKnown indicates that all possible states are completed.
	allKnown condition = 0x7fff

	// neverKnown is a special condition that is never met. It can be used to
	// mark a task as impossible to complete.
	neverKnown condition = 0x8000
)

func (c condition) meets(x condition) bool {
	return c&x == x
}

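// exampleConditionMeets is an illustrative sketch only and is not used by the
// evaluator: it shows how condition bits are combined with | and how meets
// checks that every required bit is present. The named bits are local
// placeholders, not the package's real condition constants.
func exampleConditionMeets() {
	const (
		exampleExists condition = 1 << iota // placeholder: field existence known
		exampleScalar                       // placeholder: scalar value known
		exampleFields                       // placeholder: set of sub fields known
	)
	have := exampleExists | exampleScalar
	_ = have.meets(exampleExists)                 // true: the single required bit is set
	_ = have.meets(exampleExists | exampleFields) // false: exampleFields is still missing
	_ = have.meets(neverKnown)                    // false: neverKnown is never part of a completion mask
}
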
const numCompletionStates = 10 // TODO: make this configurable

// A scheduler represents the set of outstanding tasks for a node.
type scheduler struct {
	ctx  *OpContext
	node *nodeContext

	state schedState

	// completed is a bit set of completed states.
	completed condition

	// needs specifies all the states needed to complete tasks in this scheduler.
	needs condition

	// provided specifies all the states that are provided by tasks added
	// to this scheduler.
	provided condition // TODO: rename to "provides"? To be consistent with "needs".

	// frozen indicates all states that are frozen. These bits should be checked
	// before making a node more specific.
	// TODO: do we need a separate field for this, or can we use completed?
	frozen condition

	// isFrozen indicates if freeze was called explicitly.
	//
	// TODO: rename to isExplicitlyFrozen if it turns out we need both frozen
	// and isFrozen. We probably do not. Check once the implementation of the
	// new evaluator is complete.
	isFrozen bool

	// counters keeps track of the number of uncompleted tasks that are
	// outstanding for each of the possible conditions. A state is
	// considered completed if the corresponding counter reaches zero.
	counters [numCompletionStates]int

	// tasks lists all tasks that were scheduled for this scheduler.
	// The list only contains tasks that are associated with this node.
	// TODO: rename to queue and taskPos to nextQueueIndex.
	tasks   []*task
	taskPos int

	// blocking is a list of tasks that are blocked on the completion of
	// the indicated conditions. This can hold tasks from other nodes or tasks
	// originating from this node itself.
	blocking []*task
}

func (s *scheduler) clear() {
	// TODO(perf): free tasks into task pool

	*s = scheduler{
		ctx:      s.ctx,
		tasks:    s.tasks[:0],
		blocking: s.blocking[:0],
	}
}

// cloneInto initializes the state of dst to be the same as s.
//
// NOTE: this is deliberately not a pointer receiver: this approach allows
// cloning s into dst while preserving the buffers of dst and not having to
// explicitly clone any non-buffer fields.
func (s scheduler) cloneInto(dst *scheduler) {
	s.tasks = append(dst.tasks, s.tasks...)
	s.blocking = append(dst.blocking, s.blocking...)

	*dst = s
}

// incrementCounts increments the counters for each of the given conditions.
// See also decrementCounts.
func (s *scheduler) incrementCounts(x condition) {
	x &= s.ctx.counterMask

	for {
		n := bits.TrailingZeros16(uint16(x))
		if n == 16 {
			break
		}
		bit := condition(1 << n)
		x &^= bit

		s.counters[n]++
	}
}

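// forEachConditionBit is an illustrative helper, not used by the evaluator:
// it spells out the bit-iteration pattern used by incrementCounts above and
// decrementCounts below, visiting the index of every set bit in a condition.
func forEachConditionBit(x condition, f func(index int, bit condition)) {
	for {
		n := bits.TrailingZeros16(uint16(x))
		if n == 16 {
			break // no set bits remain
		}
		bit := condition(1 << n)
		x &^= bit // clear the bit so the next iteration finds the following one
		f(n, bit)
	}
}
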
// decrementCounts decrements the counters for each of the given conditions.
// If a counter for a condition reaches zero, it means that condition is met
// and all blocking tasks depending on that state can be run.
func (s *scheduler) decrementCounts(x condition) {
	x &= s.ctx.counterMask

	var completed condition
	for {
		n := bits.TrailingZeros16(uint16(x))
		if n == 16 {
			break
		}
		bit := condition(1 << n)
		x &^= bit

		s.counters[n]--
		if s.counters[n] == 0 {
			completed |= bit
		}
	}

	s.signal(completed)
}

// finalize runs all remaining tasks and, upon completion, signals the given
// conditions and marks the scheduler as done.
func (s *scheduler) finalize(completed condition) {
	// Do not panic on cycle detection. Instead, post-process the tasks
	// by collecting and marking cycle errors.
	s.process(allKnown, finalize)
	s.signal(completed)
	if s.state == schedRUNNING {
		if s.meets(s.needs) {
			s.state = schedSUCCESS
		} else {
			s.state = schedFAILED
		}
	}
}

// process advances a scheduler by executing tasks that are required.
// Depending on mode, if the scheduler is blocked on a condition, it will
// forcefully unblock the tasks.
func (s *scheduler) process(needs condition, mode runMode) bool {
	c := s.ctx

	// Update completions, if necessary.
	if f := c.taskContext.complete; f != nil {
		s.signal(f(s))
	}

	if Debug && len(s.tasks) > 0 {
		if v := s.tasks[0].node.node; v != nil {
			c.nest++
			c.Logf(v, "START Process %v -- mode: %v", v.Label, mode)
			defer func() {
				c.Logf(v, "END Process")
				c.nest--
			}()
		}
	}

	// hasRunning := false
	s.state = schedRUNNING
	// Use an index variable instead of range, because s.tasks may grow during
	// processing.

processNextTask:
	for s.taskPos < len(s.tasks) {
		t := s.tasks[s.taskPos]
		s.taskPos++

		if t.state != taskREADY {
			// TODO(perf): Figure out how it is possible to reach this and if we
			// should optimize.
			// panic("task not READY")
		}

		switch {
		case t.state == taskRUNNING:
			// TODO: we could store the current referring node that caused
			// the cycle and then proceed up the stack to mark all tasks
			// that are involved in the cycle as well. Further, we could
			// mark the cycle with a generation counter, instead of a boolean
			// value, so that it will be trivial to reconstruct a detailed
			// cycle report when generating an error message.

		case t.state != taskREADY:

		default:
			runTask(t, mode)
		}
	}

	switch mode {
	default: // case attemptOnly:
		return s.meets(needs)

	case yield:
		if s.meets(needs) {
			return true
		}
		c.current().waitFor(s, needs)
		s.yield()
		panic("unreachable")

	case finalize:
		// remainder of function
	}

unblockTasks:
	// Unblocking proceeds in three stages. Each of the stages may cause
	// formerly blocked tasks to become unblocked. To ensure that unblocking
	// does not happen in an order-dependent way, we want to ensure that we
	// have unblocked all tasks from one stage before commencing the next.

	// The types of the node can no longer be altered. We can unblock the
	// relevant states first to finish up any tasks that were just waiting for
	// types, such as lists.
	for _, t := range c.blocking {
		if t.blockedOn != nil {
			t.blockedOn.signal(s.ctx.autoUnblock)
		}
	}

	// Mark all remaining conditions as "frozen" before actually running the
	// tasks. Doing this before running the remaining tasks ensures that we get
	// the same errors, regardless of the order in which tasks are unblocked.
	for _, t := range c.blocking {
		if t.blockedOn != nil {
			t.blockedOn.freeze(t.blockCondition)
			t.unblocked = true
		}
	}

	// Run the remaining blocked tasks.
	numBlocked := len(c.blocking)
	for _, t := range c.blocking {
		if t.blockedOn != nil {
			n, cond := t.blockedOn, t.blockCondition
			t.blockedOn, t.blockCondition = nil, neverKnown
			n.signal(cond)
			runTask(t, attemptOnly) // Does this need to be final? Probably not if we do a fixed point computation.
		}
	}

	// The running of tasks above may result in more tasks being added to the
	// queue. Process these first before continuing.
	if s.taskPos < len(s.tasks) {
		goto processNextTask
	}

	// Similarly, the running of tasks may result in more tasks being blocked.
	// Ensure we processed them all.
	if numBlocked < len(c.blocking) {
		goto unblockTasks
	}

	c.blocking = c.blocking[:0]

	return true
}

// yield causes the current task to be suspended until the given conditions
// are met.
func (s *scheduler) yield() {
	panic(s)
}

// meets reports whether all needed completion states in s are met.
func (s *scheduler) meets(needs condition) bool {
	if s.state != schedREADY {
		// Automatically qualify for conditions that are not provided by this node.
		// NOTE: in the evaluator this is generally not the case, as tasks may
		// still be added during evaluation until all ancestor nodes are
		// evaluated. This can be encoded by the scheduler by adding a state
		// "ancestorsCompleted", which all other conditions depend on.
		needs &= s.provided
	}
	return s.completed&needs == needs
}

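// exampleSchedulerMeets is an illustrative sketch, not used by the evaluator:
// it shows the masking behavior documented in meets. Once a scheduler has left
// the READY state, conditions that it never provides are treated as satisfied.
// The condition bits are placeholders, not the package's real constants.
func exampleSchedulerMeets() bool {
	const (
		condA condition = 1 << iota // placeholder condition bit
		condB                       // placeholder condition bit
	)
	s := &scheduler{
		state:     schedRUNNING,
		provided:  condA, // this scheduler only ever contributes condA
		completed: condA, // ... and condA has completed
	}
	// condB is not provided here, so it is masked away and the combined
	// requirement is reported as met.
	return s.meets(condA | condB) // true
}
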
// blockOn marks a state as uncompleted.
func (s *scheduler) blockOn(cond condition) {
	// TODO: should we allow this to be used for counter states?
	// if s.ctx.counterMask&cond != 0 {
	// 	panic("cannot block on counter states")
	// }
	s.provided |= cond
}

// signal causes tasks that are blocking on the given completion to be run
// for this scheduler. Tasks are only run if the completion state was not
// already reached before.
func (s *scheduler) signal(completed condition) {
	was := s.completed
	s.completed |= completed
	if was == s.completed {
		s.frozen |= completed
		return
	}

	s.completed |= s.ctx.complete(s)
	s.frozen |= completed

	// TODO: this could benefit from a linked list where tasks are removed
	// from the list before being run.
	for _, t := range s.blocking {
		if t.blockCondition&s.completed == t.blockCondition {
			// Prevent task from running again.
			t.blockCondition = neverKnown
			t.blockedOn = nil
			runTask(t, attemptOnly) // TODO: does this ever need to be final?
			// TODO: should only be run once for each blocking queue.
		}
	}
}

// freeze indicates no more tasks satisfying the given condition may be added.
// It is also used to freeze certain elements of the task.
func (s *scheduler) freeze(c condition) {
	s.frozen |= c
	s.completed |= c
	s.ctx.complete(s)
	s.isFrozen = true
}

// signalDoneAdding signals that no more tasks will be added to this scheduler.
// This allows unblocking tasks that depend on states for which there are no
// tasks in this scheduler.
func (s *scheduler) signalDoneAdding() {
	s.signal(s.needs &^ s.provided)
}

// runner defines properties of a type of task, including a function to run it.
type runner struct {
	name string

	// The mode argument indicates whether the scheduler
	// of this field is finalizing. It is passed as a component of the required
	// state to various evaluation methods.
	f func(ctx *OpContext, t *task, mode runMode)

	// completes indicates which states this task contributes to.
	completes condition

	// needs indicates which states of the corresponding node need to be
	// completed before this task can be run.
	needs condition
}

type task struct {
	state taskState

	completes condition // cycles may alter the completion mask. TODO: is this still true?

	// unblocked indicates this task was unblocked by force.
	unblocked bool

	// The following fields indicate what this task is blocked on, including
	// the scheduler, which conditions it is blocking on, and the stack of
	// tasks executed leading to the block.

	blockedOn      *scheduler
	blockCondition condition
	blockStack     []*task // TODO: use; for error reporting.

	err *Bottom

	// The node from which this conjunct originates.
	node *nodeContext

	run *runner // TODO: use struct to make debugging easier?

	// The Conjunct processed by this task.
	env *Environment
	id  CloseInfo // TODO: rename to closeInfo?
	x   Node      // The conjunct Expression or Value.

	// For Comprehensions:
	comp *envComprehension
	leaf *Comprehension
}

func (s *scheduler) insertTask(t *task) {
	completes := t.run.completes
	needs := t.run.needs

	s.needs |= needs
	s.provided |= completes

	if needs&completes != 0 {
		panic("task depends on its own completion")
	}
	t.completes = completes

	if s.state == schedFINALIZING {
		runTask(t, finalize)
		return
	}

	s.incrementCounts(completes)
	if cc := t.id.cc; cc != nil {
		// may be nil for "group" tasks, such as processLists.
		dep := cc.incDependent(TASK, nil)
		if dep != nil {
			dep.taskID = len(s.tasks)
			dep.task = t
		}
	}
	s.tasks = append(s.tasks, t)
	if s.completed&needs != needs {
		t.waitFor(s, needs)
	}
}

func runTask(t *task, mode runMode) {
	ctx := t.node.ctx

	switch t.state {
	case taskSUCCESS, taskFAILED:
		return
	case taskRUNNING:
		// TODO: should we mark this as a cycle?
	}

	defer func() {
		switch r := recover().(type) {
		case nil:
		case *scheduler:
			// Task must be WAITING.
			if t.state == taskRUNNING {
				t.state = taskSUCCESS // XXX: something else? Do we know the dependency?
				if t.err != nil {
					t.state = taskFAILED
				}
			}
		default:
			panic(r)
		}
	}()

	defer ctx.PopArc(ctx.PushArc(t.node.node))

	// TODO: merge these two mechanisms once we get rid of the old evaluator.
	ctx.pushTask(t)
	defer ctx.popTask()
	if t.env != nil {
		id := t.id
		id.cc = nil // this is done to avoid struct args from passing fields up.
		s := ctx.PushConjunct(MakeConjunct(t.env, t.x, id))
		defer ctx.PopState(s)
	}

	t.state = taskRUNNING
	// A task may have recorded an error on a previous try. Clear it.
	t.err = nil

	t.run.f(ctx, t, mode)

	if t.state != taskWAITING {
		t.blockedOn = nil
		t.blockCondition = neverKnown

		// TODO: always reporting errors in the current task would avoid us
		// having to collect and assign errors here.
		t.err = CombineErrors(nil, t.err, ctx.Err())
		if t.err == nil {
			t.state = taskSUCCESS
		} else {
			t.state = taskFAILED
		}
		t.node.addBottom(t.err) // TODO: replace with something more principled.

		if t.id.cc != nil {
			t.id.cc.decDependent(ctx, TASK, nil)
		}
		t.node.decrementCounts(t.completes)
		t.completes = 0 // safety
	}
}

// waitFor blocks task t until the needs for scheduler s are met.
func (t *task) waitFor(s *scheduler, needs condition) {
	if s.meets(needs) {
		panic("waiting for condition that already completed")
	}
	// TODO: this line causes the scheduler state to fail if tasks are blocking
	// on it. Is this desirable? At the very least we should then ensure that
	// the scheduler where the tasks originate from will fail in that case.
	s.needs |= needs

	t.state = taskWAITING

	t.blockCondition = needs
	t.blockedOn = s
	s.blocking = append(s.blocking, t)
	s.ctx.blocking = append(s.ctx.blocking, t)
}

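// exampleRunner is an illustrative sketch of how a task kind could be
// described; it is not one of the evaluator's real runners and is not
// referenced anywhere. It only shows the shape of a runner: the callback that
// processes the task, the conditions it promises to complete, and the
// conditions that must hold before it may run. The condition bits are
// placeholders rather than the package's actual constants. A task pointed at
// such a runner would be queued with scheduler.insertTask, which records its
// needs and completes, and either runs it or blocks it via waitFor.
var exampleRunner = runner{
	name: "example",
	f: func(ctx *OpContext, t *task, mode runMode) {
		// A real runner would evaluate the task's conjunct here, possibly
		// consulting mode to decide how to proceed when a condition is unmet.
	},
	completes: condition(1 << 0), // placeholder: the state this task provides
	needs:     condition(1 << 1), // placeholder: the state required before running
}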