...

Source file src/cuelang.org/go/internal/core/adt/sched.go

Documentation: cuelang.org/go/internal/core/adt

     1  // Copyright 2023 CUE Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package adt
    16  
    17  import (
    18  	"math/bits"
    19  )
    20  
    21  // The CUE scheduler schedules tasks for evaluation.
    22  //
    23  // A task is a computation unit associated with a single node. Each task may
    24  // depend on knowing certain properties of one or more fields, namely:
    25  //
    26  //  - whether the field exists
    27  //  - the scalar value of a field, if any
    28  //  - the set of all conjuncts
    29  //  - the set of all sub fields
    30  //  - the recursively evaluated value
    31  //
    32  // Each task, in turn, may mark itself as providing knowledge about one or more
    33  // of these properties. If it is not known upfront whether a task may contribute
    34  // to a certain property, it must mark itself as (potentially) contributing to
    35  // this property.
    36  //
    37  //
    38  // DEPENDENCY GRAPH
    39  //
    40  // A task may depend on zero or more fields, including the field for which it
    41  // is defined. The graph of all dependencies is defined as follows:
    42  //
    43  // - Each task and each <field, property> pair is a node in the graph.
    44  // - A task T for field F that (possibly) computes property P for F is
    45  //   represented by an edge from <F, P> to T.
    46  // - A task T for field F that depends on property P of field G is represented
    47  //   by an edge from <G, P> to T.
    48  //
    49  // There is an evaluation cycle for a task T if there is a path from T back to
    50  // itself in the dependency graph. Processing will stop in the event of such a
    51  // cycle. In such a case, the scheduler will commence an unblocking mechanism.
    52  //
    53  // As a general rule, once a node is detected to be blocking, it may no longer
    54  // become more specific. In other words, it is "frozen".
    55  // The unblocking consists of two phases: the scheduler will first freeze and
    56  // unblock all blocked nodes for the properties marked as autoUnblock-ing in
    57  // taskContext. Subsequently all tasks that are unblocked by this will run.
    58  // In the next phase all remaining tasks are unblocked.
    59  // See taskContext.autoUnblock for more information.
    60  //
    61  // Note that some tasks, like references, may depend on other fields without
    62  // requiring a certain property. These do not count as dependencies.
    63  
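// To make the graph model above concrete, the following is a minimal,
// self-contained sketch (the names graphNode, reaches, and hasCycle are
// illustrative and not part of this package). Task nodes and <field, property>
// nodes are both vertices; edges point as described above, and a task T is
// part of an evaluation cycle precisely when reaches(T, T, ...) holds:
//
//	type graphNode struct {
//		name string
//		out  []*graphNode // outgoing edges
//	}
//
//	// reaches reports whether target can be reached from n by following
//	// outgoing edges. seen guards against revisiting nodes.
//	func reaches(n, target *graphNode, seen map[*graphNode]bool) bool {
//		for _, m := range n.out {
//			if m == target {
//				return true
//			}
//			if !seen[m] {
//				seen[m] = true
//				if reaches(m, target, seen) {
//					return true
//				}
//			}
//		}
//		return false
//	}
//
//	// hasCycle reports whether task t participates in an evaluation cycle.
//	func hasCycle(t *graphNode) bool {
//		return reaches(t, t, map[*graphNode]bool{})
//	}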
    64  // A taskContext manages the task memory and task stack.
    65  // It is typically associated with an OpContext.
    66  type taskContext struct {
    67  	// stack tracks the current execution of tasks. This is a stack as tasks
    68  	// may trigger the evaluation of other tasks to complete.
    69  	stack []*task
    70  
    71  	// blocking lists all tasks that were blocked during a round of evaluation.
    72  	// Evaluation finalizes one node at a time, which includes the evaluation
    73  	// of all nodes necessary to evaluate that node. Any task that is blocked
    74  	// during such a round of evaluation is recorded here. Any mutual cycles
    75  	// will result in unresolved tasks. At the end of such a round, computation
    76  	// can be frozen and the tasks unblocked.
    77  	blocking []*task
    78  
    79  	// counterMask marks which conditions use counters. Other conditions are
    80  	// handled by signals only.
    81  	counterMask condition
    82  
    83  	// autoUnblock marks the flags that get unblocked automatically when there
    84  	// is a deadlock between nodes. These are properties that may become
    85  	// meaningful once it is known that a value may not become more specific.
    86  	// An example of this is the property "scalar". If something is not a scalar
    87  	// yet, and it is known that the value may never become more specific, it is
    88  	// known that this value will never become a scalar, thus effectively
    89  	// making it known.
    90  	autoUnblock condition
    91  
    92  	// This is called upon completion of states, allowing other states to be
    93  	// updated atomically.
    94  	complete func(s *scheduler) condition
    95  }
    96  
    97  func (p *taskContext) current() *task {
    98  	return p.stack[len(p.stack)-1]
    99  }
   100  
   101  func (p *taskContext) pushTask(t *task) {
   102  	p.stack = append(p.stack, t)
   103  }
   104  
   105  func (p *taskContext) popTask() {
   106  	p.stack = p.stack[:len(p.stack)-1]
   107  }
   108  
   109  func (p *taskContext) newTask() *task {
   110  	// TODO: allocate from pool.
   111  	return &task{}
   112  }
   113  
   114  type taskState uint8
   115  
   116  const (
   117  	taskREADY taskState = iota
   118  
   119  	taskRUNNING // processing conjunct(s)
   120  	taskWAITING // task is blocked on a property of an arc to hold
   121  	taskSUCCESS
   122  	taskFAILED
   123  )
   124  
   125  type schedState uint8
   126  
   127  const (
   128  	schedREADY schedState = iota
   129  
   130  	schedRUNNING    // processing conjunct(s)
   131  	schedFINALIZING // all tasks completed, run new tasks immediately
   132  	schedSUCCESS
   133  	schedFAILED
   134  )
   135  
   136  func (s schedState) done() bool { return s >= schedSUCCESS }
   137  
   138  func (s taskState) String() string {
   139  	switch s {
   140  	case taskREADY:
   141  		return "READY"
   142  	case taskRUNNING:
   143  		return "RUNNING"
   144  	case taskWAITING:
   145  		return "WAITING"
   146  	case taskSUCCESS:
   147  		return "SUCCESS"
   148  	case taskFAILED:
   149  		return "FAILED"
   150  	default:
   151  		return "UNKNOWN"
   152  	}
   153  }
   154  
   155  func (s schedState) String() string {
   156  	switch s {
   157  	case schedREADY:
   158  		return "READY"
   159  	case schedRUNNING:
   160  		return "RUNNING"
   161  	case schedFINALIZING:
   162  		return "FINALIZING"
   163  	case schedSUCCESS:
   164  		return "SUCCESS"
   165  	case schedFAILED:
   166  		return "FAILED"
   167  	default:
   168  		return "UNKNOWN"
   169  	}
   170  }
   171  
   172  // runMode indicates how to proceed after a condition could not be met.
   173  type runMode uint8
   174  
   175  const (
   176  	// ignore indicates that the new evaluator should not do any processing.
   177  	// This is mostly used in the transition from old to new evaluator and
   178  	// should probably eventually be removed.
   179  	ignore runMode = 1 + iota
   180  
   181  	// attemptOnly indicates that execution should continue even if the
   182  	// condition is not met.
   183  	attemptOnly
   184  
   185  	// yield means that execution should be yielded if the condition is not met.
   186  	// That is, the task is marked as a dependency and control is returned to
   187  	// the runloop. The task will resume once the dependency is met.
   188  	yield
   189  
   190  	// finalize means that uncompleted tasks should be turned into errors to
   191  	// complete the evaluation of a Vertex.
   192  	finalize
   193  )
   194  
   195  func (r runMode) String() string {
   196  	switch r {
   197  	case ignore:
   198  		return "ignore"
   199  	case attemptOnly:
   200  		return "attemptOnly"
   201  	case yield:
   202  		return "yield"
   203  	case finalize:
   204  		return "finalize"
   205  	}
   206  	return "unknown"
   207  }
   208  
   209  // condition is a bit mask of states that a task may depend on.
   210  //
   211  // There are generally two types of states: states that are met if all tasks
   212  // that contribute to that state are completed (counter states), and states that
   213  // are met if some global set of conditions is met.
   214  type condition uint16
   215  
   216  const (
   217  	// allKnown indicates that all possible states are completed.
   218  	allKnown condition = 0x7fff
   219  
   220  	// neverKnown is a special condition that is never met. It can be used to
   221  	// mark a task as impossible to complete.
   222  	neverKnown condition = 0x8000
   223  )
   224  
   225  func (c condition) meets(x condition) bool {
   226  	return c&x == x
   227  }
   228  
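// As a small usage illustration of the mask semantics of meets, consider two
// hypothetical condition bits (the real bits used by this package are defined
// elsewhere; fieldExists and scalarValue are placeholders):
//
//	const (
//		fieldExists condition = 1 << iota
//		scalarValue
//	)
//
//	completed := fieldExists
//	completed.meets(fieldExists)               // true: all requested bits are set
//	completed.meets(fieldExists | scalarValue) // false: scalarValue is still missing
//	completed.meets(0)                         // trivially true
//
// Since allKnown excludes the neverKnown bit, a condition containing
// neverKnown can never be met through normal signaling.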
   229  const numCompletionStates = 10 // TODO: make this configurable
   230  
   231  // A scheduler represents the set of outstanding tasks for a node.
   232  type scheduler struct {
   233  	ctx  *OpContext
   234  	node *nodeContext
   235  
   236  	state schedState
   237  
   238  	// completed is a bit set of completed states.
   239  	completed condition
   240  
   241  	// needs specifies all the states needed to complete tasks in this scheduler.
   242  	needs condition
   243  
   244  	// provided specifies all the states that are provided by tasks added
   245  	// to this scheduler.
   246  	provided condition // TODO: rename to "provides"? To be consistent with "needs".
   247  
   248  	// frozen indicates all states that are frozen. These bits should be checked
   249  	// before making a node more specific.
   250  	// TODO: do we need a separate field for this, or can we use completed?
   251  	frozen condition
   252  
   253  	// isFrozen indicates if freeze was called explicitly.
   254  	//
   255  	// TODO: rename to isExplicitlyFrozen if it turns out we need both frozen
   256  	// and isFrozen. We probably do not. Check once the implementation of the
   257  	// new evaluator is complete.
   258  	isFrozen bool
   259  
   260  	// counters keeps track of the number of uncompleted tasks that are
   261  	// outstanding for each of the possible conditions. A state is
   262  	// considered completed if the corresponding counter reaches zero.
   263  	counters [numCompletionStates]int
   264  
   265  	// tasks lists all tasks that were scheduled for this scheduler.
   266  	// The list only contains tasks that are associated with this node.
   267  	// TODO: rename to queue and taskPos to nextQueueIndex.
   268  	tasks   []*task
   269  	taskPos int
   270  
   271  	// blocking is a list of tasks that are blocked on the completion of
   272  	// the indicate conditions. This can hold tasks from other nodes or tasks
   273  	// the indicated conditions. This can hold tasks from other nodes or tasks
   274  	blocking []*task
   275  }
   276  
   277  func (s *scheduler) clear() {
   278  	// TODO(perf): free tasks into task pool
   279  
   280  	*s = scheduler{
   281  		ctx:      s.ctx,
   282  		tasks:    s.tasks[:0],
   283  		blocking: s.blocking[:0],
   284  	}
   285  }
   286  
   287  // cloneInto initializes the state of dst to be the same as s.
   288  //
   289  // NOTE: this is deliberately not a pointer receiver: this approach allows
   290  // cloning s into dst while preserving the buffers of dst and not having to
   291  // explicitly clone any non-buffer fields.
   292  func (s scheduler) cloneInto(dst *scheduler) {
   293  	s.tasks = append(dst.tasks, s.tasks...)
   294  	s.blocking = append(dst.blocking, s.blocking...)
   295  
   296  	*dst = s
   297  }
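// The buffer-preserving idiom above in isolation, using a hypothetical type
// named box: the value receiver is already a shallow copy of the source, so
// only the slice fields need to be redirected to the destination's backing
// arrays before overwriting *dst.
//
//	type box struct {
//		n     int
//		items []int
//	}
//
//	func (b box) cloneInto(dst *box) {
//		// b is a copy of the source; point its slice at dst's buffer so that
//		// dst keeps ownership of its own allocation.
//		b.items = append(dst.items, b.items...)
//		*dst = b
//	}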
   298  
   299  // incrementCounts increments the counters for each condition.
   300  // See also decrementCounts.
   301  func (s *scheduler) incrementCounts(x condition) {
   302  	x &= s.ctx.counterMask
   303  
   304  	for {
   305  		n := bits.TrailingZeros16(uint16(x))
   306  		if n == 16 {
   307  			break
   308  		}
   309  		bit := condition(1 << n)
   310  		x &^= bit
   311  
   312  		s.counters[n]++
   313  	}
   314  }
   315  
   316  // decrementCounts decrements the counters for each condition. If a counter for
   317  // a condition reaches zero, it means that condition is met and all blocking
   318  // tasks depending on that state can be run.
   319  func (s *scheduler) decrementCounts(x condition) {
   320  	x &= s.ctx.counterMask
   321  
   322  	var completed condition
   323  	for {
   324  		n := bits.TrailingZeros16(uint16(x))
   325  		if n == 16 {
   326  			break
   327  		}
   328  		bit := condition(1 << n)
   329  		x &^= bit
   330  
   331  		s.counters[n]--
   332  		if s.counters[n] == 0 {
   333  			completed |= bit
   334  		}
   335  	}
   336  
   337  	s.signal(completed)
   338  }
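// Both loops above use the same pattern for visiting every set bit of a mask
// via math/bits. A self-contained sketch of just that pattern:
//
//	package main
//
//	import (
//		"fmt"
//		"math/bits"
//	)
//
//	func main() {
//		x := uint16(0b100110) // bits 1, 2, and 5 are set
//		for {
//			n := bits.TrailingZeros16(x)
//			if n == 16 { // TrailingZeros16 returns 16 once x is zero
//				break
//			}
//			x &^= 1 << n          // clear the bit that was just visited
//			fmt.Println("bit", n) // prints: bit 1, bit 2, bit 5
//		}
//	}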
   339  
   340  // finalize runs all tasks and, upon completion, signals the given conditions
   341  // to mark the scheduler as done.
   342  func (s *scheduler) finalize(completed condition) {
   343  	// Do not panic on cycle detection. Instead, post-process the tasks
   344  	// by collecting and marking cycle errors.
   345  	s.process(allKnown, finalize)
   346  	s.signal(completed)
   347  	if s.state == schedRUNNING {
   348  		if s.meets(s.needs) {
   349  			s.state = schedSUCCESS
   350  		} else {
   351  			s.state = schedFAILED
   352  		}
   353  	}
   354  }
   355  
   356  // process advances a scheduler by executing tasks that are required.
   357  // Depending on mode, if the scheduler is blocked on a condition, it will
   358  // forcefully unblock the tasks.
   359  func (s *scheduler) process(needs condition, mode runMode) bool {
   360  	c := s.ctx
   361  
   362  	// Update completions, if necessary.
   363  	if f := c.taskContext.complete; f != nil {
   364  		s.signal(f(s))
   365  	}
   366  
   367  	if Debug && len(s.tasks) > 0 {
   368  		if v := s.tasks[0].node.node; v != nil {
   369  			c.nest++
   370  			c.Logf(v, "START Process %v -- mode: %v", v.Label, mode)
   371  			defer func() {
   372  				c.Logf(v, "END Process")
   373  				c.nest--
   374  			}()
   375  		}
   376  	}
   377  
   378  	// hasRunning := false
   379  	s.state = schedRUNNING
   380  	// Use a variable instead of range, because s.tasks may grow during processing.
   381  
   382  processNextTask:
   383  	for s.taskPos < len(s.tasks) {
   384  		t := s.tasks[s.taskPos]
   385  		s.taskPos++
   386  
   387  		if t.state != taskREADY {
   388  			// TODO(perf): Figure out how it is possible to reach this and if we
   389  			// should optimize.
   390  			// panic("task not READY")
   391  		}
   392  
   393  		switch {
   394  		case t.state == taskRUNNING:
   395  			// TODO: we could store the current referring node that caused
   396  			// the cycle and then proceed up the stack to mark all tasks
   397  		// that are involved in the cycle as well. Further, we could
   398  		// mark the cycle as a generation counter, instead of a boolean
   399  		// value, so that it will be trivial to reconstruct a detailed cycle
   400  			// report when generating an error message.
   401  
   402  		case t.state != taskREADY:
   403  
   404  		default:
   405  			runTask(t, mode)
   406  		}
   407  	}
   408  
   409  	switch mode {
   410  	default: // case attemptOnly:
   411  		return s.meets(needs)
   412  
   413  	case yield:
   414  		if s.meets(needs) {
   415  			return true
   416  		}
   417  		c.current().waitFor(s, needs)
   418  		s.yield()
   419  		panic("unreachable")
   420  
   421  	case finalize:
   422  		// remainder of function
   423  	}
   424  
   425  unblockTasks:
   426  	// Unblocking proceeds in three stages. Each of the stages may cause
   427  	// formerly blocked tasks to become unblocked. To keep unblocking from
   428  	// being order dependent, we ensure that all tasks from one stage are
   429  	// unblocked before commencing the next.
   430  
   431  	// The types of the node can no longer be altered. We can unblock the
   432  	// relevant states first to finish up any tasks that were just waiting for
   433  	// types, such as lists.
   434  	for _, t := range c.blocking {
   435  		if t.blockedOn != nil {
   436  			t.blockedOn.signal(s.ctx.autoUnblock)
   437  		}
   438  	}
   439  
   440  	// Mark all remaining conditions as "frozen" before actually running the
   441  	// tasks. Doing this before running the remaining tasks ensures that we get
   442  	// the same errors, regardless of the order in which tasks are unblocked.
   443  	for _, t := range c.blocking {
   444  		if t.blockedOn != nil {
   445  			t.blockedOn.freeze(t.blockCondition)
   446  			t.unblocked = true
   447  		}
   448  	}
   449  
   450  	// Run the remaining blocked tasks.
   451  	numBlocked := len(c.blocking)
   452  	for _, t := range c.blocking {
   453  		if t.blockedOn != nil {
   454  			n, cond := t.blockedOn, t.blockCondition
   455  			t.blockedOn, t.blockCondition = nil, neverKnown
   456  			n.signal(cond)
   457  			runTask(t, attemptOnly) // Does this need to be final? Probably not if we do a fixed point computation.
   458  		}
   459  	}
   460  
   461  	// The running of tasks above may result in more tasks being added to the
   462  	// queue. Process these first before continuing.
   463  	if s.taskPos < len(s.tasks) {
   464  		goto processNextTask
   465  	}
   466  
   467  	// Similarly, the running of tasks may result in more tasks being blocked.
   468  	// Ensure we processed them all.
   469  	if numBlocked < len(c.blocking) {
   470  		goto unblockTasks
   471  	}
   472  
   473  	c.blocking = c.blocking[:0]
   474  
   475  	return true
   476  }
   477  
   478  // yield causes the current task to be suspended until the given conditions
   479  // are met.
   480  func (s *scheduler) yield() {
   481  	panic(s)
   482  }
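// The panic in yield unwinds the stack back to the deferred recover in runTask
// (below), which recovers a *scheduler value; a task that called waitFor
// remains in the WAITING state and can be resumed once its conditions are
// signaled. A minimal sketch of that suspend-via-panic pattern, with
// hypothetical names (suspend, runOne) that are not part of this package:
//
//	type suspend struct{} // sentinel value carried by the panic
//
//	// runOne runs f and reports whether it suspended itself by panicking
//	// with a suspend value. Unrelated panics are re-raised.
//	func runOne(f func()) (suspended bool) {
//		defer func() {
//			switch r := recover().(type) {
//			case nil:
//			case suspend:
//				suspended = true // control returned to the run loop
//			default:
//				panic(r)
//			}
//		}()
//		f()
//		return false
//	}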
   483  
   484  // meets reports whether all needed completion states in s are met.
   485  func (s *scheduler) meets(needs condition) bool {
   486  	if s.state != schedREADY {
   487  		// Automatically qualify for conditions that are not provided by this node.
   488  		// NOTE: in the evaluator this is generally not the case, as tasks may still
   489  		// be added during evaluation until all ancestor nodes are evaluated. This
   490  		// can be encoded by the scheduler by adding a state "ancestorsCompleted",
   491  		// which all other conditions depend on.
   492  		needs &= s.provided
   493  	}
   494  	return s.completed&needs == needs
   495  }
   496  
   497  // blockOn marks a state as uncompleted.
   498  func (s *scheduler) blockOn(cond condition) {
   499  	// TODO: should we allow this to be used for counter states?
   500  	// if s.ctx.counterMask&cond != 0 {
   501  	// 	panic("cannot block on counter states")
   502  	// }
   503  	s.provided |= cond
   504  }
   505  
   506  // signal causes tasks that are blocking on the given completion to be run
   507  // for this scheduler. Tasks are only run if the completion state was not
   508  // already reached before.
   509  func (s *scheduler) signal(completed condition) {
   510  	was := s.completed
   511  	s.completed |= completed
   512  	if was == s.completed {
   513  		s.frozen |= completed
   514  		return
   515  	}
   516  
   517  	s.completed |= s.ctx.complete(s)
   518  	s.frozen |= completed
   519  
   520  	// TODO: this could benefit from a linked list where tasks are removed
   521  	// from the list before being run.
   522  	for _, t := range s.blocking {
   523  		if t.blockCondition&s.completed == t.blockCondition {
   524  			// Prevent task from running again.
   525  			t.blockCondition = neverKnown
   526  			t.blockedOn = nil
   527  			runTask(t, attemptOnly) // TODO: does this ever need to be final?
   528  			// TODO: should only be run once for each blocking queue.
   529  		}
   530  	}
   531  }
   532  
   533  // freeze indicates no more tasks satisfying the given condition may be added.
   534  // It is also used to freeze certain elements of the task.
   535  func (s *scheduler) freeze(c condition) {
   536  	s.frozen |= c
   537  	s.completed |= c
   538  	s.ctx.complete(s)
   539  	s.isFrozen = true
   540  }
   541  
   542  // signalDoneAdding signals that no more tasks will be added to this scheduler.
   543  // This allows unblocking tasks that depend on states for which there are no
   544  // tasks in this scheduler.
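// For example, with needs == 0b0111 and provided == 0b0101, the expression
// below signals 0b0010: the one required state that no task of this scheduler
// will provide, so anything blocked on it can be released.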
   545  func (s *scheduler) signalDoneAdding() {
   546  	s.signal(s.needs &^ s.provided)
   547  }
   548  
   549  // runner defines properties of a type of task, including a function to run it.
   550  type runner struct {
   551  	name string
   552  
   553  	// The mode argument indicates whether the scheduler
   554  	// of this field is finalizing. It is passed as a component of the required
   555  	// state to various evaluation methods.
   556  	f func(ctx *OpContext, t *task, mode runMode)
   557  
   558  	// completes indicates which states this task contributes to.
   559  	completes condition
   560  
   561  	// needs indicates which states of the corresponding node need to be
   562  	// completed before this task can be run.
   563  	needs condition
   564  }
   565  
   566  type task struct {
   567  	state taskState
   568  
   569  	completes condition // cycles may alter the completion mask. TODO: is this still true?
   570  
   571  	// unblocked indicates this task was unblocked by force.
   572  	unblocked bool
   573  
   574  	// The following fields indicate what this task is blocked on: the
   575  	// scheduler it is blocked on, the conditions it is waiting for, and the
   576  	// stack of tasks executed leading to the block.
   577  
   578  	blockedOn      *scheduler
   579  	blockCondition condition
   580  	blockStack     []*task // TODO: use; for error reporting.
   581  
   582  	err *Bottom
   583  
   584  	// The node from which this conjunct originates.
   585  	node *nodeContext
   586  
   587  	run *runner // TODO: use struct to make debugging easier?
   588  
   589  	// The Conjunct processed by this task.
   590  	env *Environment
   591  	id  CloseInfo // TODO: rename to closeInfo?
   592  	x   Node      // The conjunct Expression or Value.
   593  
   594  	// For Comprehensions:
   595  	comp *envComprehension
   596  	leaf *Comprehension
   597  }
   598  
   599  func (s *scheduler) insertTask(t *task) {
   600  	completes := t.run.completes
   601  	needs := t.run.needs
   602  
   603  	s.needs |= needs
   604  	s.provided |= completes
   605  
   606  	if needs&completes != 0 {
   607  		panic("task depends on its own completion")
   608  	}
   609  	t.completes = completes
   610  
   611  	if s.state == schedFINALIZING {
   612  		runTask(t, finalize)
   613  		return
   614  	}
   615  
   616  	s.incrementCounts(completes)
   617  	if cc := t.id.cc; cc != nil {
   618  		// may be nil for "group" tasks, such as processLists.
   619  		dep := cc.incDependent(TASK, nil)
   620  		if dep != nil {
   621  			dep.taskID = len(s.tasks)
   622  			dep.task = t
   623  		}
   624  	}
   625  	s.tasks = append(s.tasks, t)
   626  	if s.completed&needs != needs {
   627  		t.waitFor(s, needs)
   628  	}
   629  }
   630  
   631  func runTask(t *task, mode runMode) {
   632  	ctx := t.node.ctx
   633  
   634  	switch t.state {
   635  	case taskSUCCESS, taskFAILED:
   636  		return
   637  	case taskRUNNING:
   638  		// TODO: should we mark this as a cycle?
   639  	}
   640  
   641  	defer func() {
   642  		switch r := recover().(type) {
   643  		case nil:
   644  		case *scheduler:
   645  			// Task must be WAITING.
   646  			if t.state == taskRUNNING {
   647  				t.state = taskSUCCESS // XXX: something else? Do we know the dependency?
   648  				if t.err != nil {
   649  					t.state = taskFAILED
   650  				}
   651  			}
   652  		default:
   653  			panic(r)
   654  		}
   655  	}()
   656  
   657  	defer ctx.PopArc(ctx.PushArc(t.node.node))
   658  
   659  	// TODO: merge these two mechanisms once we get rid of the old evaluator.
   660  	ctx.pushTask(t)
   661  	defer ctx.popTask()
   662  	if t.env != nil {
   663  		id := t.id
   664  		id.cc = nil // this is done to prevent struct args from passing fields up.
   665  		s := ctx.PushConjunct(MakeConjunct(t.env, t.x, id))
   666  		defer ctx.PopState(s)
   667  	}
   668  
   669  	t.state = taskRUNNING
   670  	// A task may have recorded an error on a previous try. Clear it.
   671  	t.err = nil
   672  
   673  	t.run.f(ctx, t, mode)
   674  
   675  	if t.state != taskWAITING {
   676  		t.blockedOn = nil
   677  		t.blockCondition = neverKnown
   678  
   679  		// TODO: always reporting errors in the current task would avoid us
   680  		// having to collect and assign errors here.
   681  		t.err = CombineErrors(nil, t.err, ctx.Err())
   682  		if t.err == nil {
   683  			t.state = taskSUCCESS
   684  		} else {
   685  			t.state = taskFAILED
   686  		}
   687  		t.node.addBottom(t.err) // TODO: replace with something more principled.
   688  
   689  		if t.id.cc != nil {
   690  			t.id.cc.decDependent(ctx, TASK, nil)
   691  		}
   692  		t.node.decrementCounts(t.completes)
   693  		t.completes = 0 // safety
   694  	}
   695  }
   696  
   697  // waitFor blocks task t until the given needs for scheduler s are met.
   698  func (t *task) waitFor(s *scheduler, needs condition) {
   699  	if s.meets(needs) {
   700  		panic("waiting for condition that already completed")
   701  	}
   702  	// TODO: this line causes the scheduler state to fail if tasks are blocking
   703  	// on it. Is this desirable? At the very least we should then ensure that
   704  	// the scheduler where the tasks originate from will fail in that case.
   705  	s.needs |= needs
   706  
   707  	t.state = taskWAITING
   708  
   709  	t.blockCondition = needs
   710  	t.blockedOn = s
   711  	s.blocking = append(s.blocking, t)
   712  	s.ctx.blocking = append(s.ctx.blocking, t)
   713  }
   714  
