...

Source file src/cuelang.org/go/tools/flow/tasks.go

Documentation: cuelang.org/go/tools/flow

     1  // Copyright 2020 CUE Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package flow
    16  
    17  // This file contains functionality for identifying tasks in the configuration
    18  // and annotating the dependencies between them.
    19  
    20  import (
    21  	"cuelang.org/go/cue"
    22  	"cuelang.org/go/cue/errors"
    23  	"cuelang.org/go/internal/core/adt"
    24  	"cuelang.org/go/internal/core/dep"
    25  	"cuelang.org/go/internal/value"
    26  )
    27  
    28  // initTasks takes the current configuration and adds tasks to the list of
    29  // tasks. It can be run multiple times on increasingly more concrete
    30  // configurations to add more tasks, whereby the task pointers of previously
    31  // found tasks are preserved.
    32  func (c *Controller) initTasks() {
    33  	// Clear previous cache.
    34  	c.nodes = map[*adt.Vertex]*Task{}
    35  
    36  	v := c.inst.LookupPath(c.cfg.Root)
    37  	if err := v.Err(); err != nil {
    38  		c.addErr(err, "invalid root")
    39  		c.cancel()
    40  		return
    41  	}
    42  
    43  	// Mark any task that is located under the root.
    44  	c.findRootTasks(v)
    45  
    46  	// Mark any tasks that are implied by dependencies.
    47  	// Note that the list of tasks may grow as this loop progresses.
    48  	for i := 0; i < len(c.tasks); i++ {
    49  		t := c.tasks[i]
    50  		c.markTaskDependencies(t, t.vertex())
    51  	}
    52  
    53  	// Check if there are cycles in the task dependencies.
    54  	if err := checkCycle(c.tasks); err != nil {
    55  		c.addErr(err, "cyclic task")
    56  	}
    57  
    58  	if c.errs != nil {
    59  		c.cancel()
    60  	}
    61  }
    62  
    63  // findRootTasks finds tasks under the root.
    64  func (c *Controller) findRootTasks(v cue.Value) {
    65  	t := c.getTask(nil, v)
    66  
    67  	if t != nil {
    68  		return
    69  	}
    70  
    71  	opts := []cue.Option{}
    72  
    73  	if c.cfg.FindHiddenTasks {
    74  		opts = append(opts, cue.Hidden(true), cue.Definitions(false))
    75  	}
    76  
    77  	for iter, _ := v.Fields(opts...); iter.Next(); {
    78  		c.findRootTasks(iter.Value())
    79  	}
    80  	for iter, _ := v.List(); iter.Next(); {
    81  		c.findRootTasks(iter.Value())
    82  	}
    83  
    84  }
    85  
    86  // This file contains the functionality to locate and record the tasks of
    87  // a configuration. It:
    88  //   - create Task struct for each node that is a task
    89  //   - associate nodes in a configuration with a Task, if applicable.
    90  // The node-to-task map is used to determine task dependencies.
    91  
    92  // getTask finds and marks tasks that are descendents of v.
    93  func (c *Controller) getTask(scope *Task, v cue.Value) *Task {
    94  	// Look up cached node.
    95  	_, w := value.ToInternal(v)
    96  	if t, ok := c.nodes[w]; ok {
    97  		return t
    98  	}
    99  
   100  	// Look up cached task from previous evaluation.
   101  	p := v.Path()
   102  	key := p.String()
   103  
   104  	t := c.keys[key]
   105  
   106  	if t == nil {
   107  		r, err := c.isTask(v)
   108  
   109  		var errs errors.Error
   110  		if err != nil {
   111  			if !c.inRoot(w) {
   112  				// Must be in InferTask mode. In this case we ignore the error.
   113  				r = nil
   114  			} else {
   115  				c.addErr(err, "invalid task")
   116  				errs = errors.Promote(err, "create task")
   117  			}
   118  		}
   119  
   120  		if r != nil {
   121  			index := len(c.tasks)
   122  			t = &Task{
   123  				v:      v,
   124  				c:      c,
   125  				r:      r,
   126  				path:   p,
   127  				labels: w.Path(),
   128  				key:    key,
   129  				index:  index,
   130  				err:    errs,
   131  			}
   132  			c.tasks = append(c.tasks, t)
   133  			c.keys[key] = t
   134  		}
   135  	}
   136  
   137  	// Process nodes of task for this evaluation.
   138  	if t != nil {
   139  		scope = t
   140  		if t.state <= Ready {
   141  			// Don't set the value if the task is currently running as this may
   142  			// result in all kinds of inconsistency issues.
   143  			t.v = v
   144  		}
   145  
   146  		c.tagChildren(w, t)
   147  	}
   148  
   149  	c.nodes[w] = scope
   150  
   151  	return t
   152  }
   153  
   154  func (c *Controller) tagChildren(n *adt.Vertex, t *Task) {
   155  	for _, a := range n.Arcs {
   156  		c.nodes[a] = t
   157  		c.tagChildren(a, t)
   158  	}
   159  }
   160  
   161  // findImpliedTask determines the task of corresponding to node n, if any. If n
   162  // is not already associated with a task, it tries to determine whether n is
   163  // part of a task by checking if any of the parent nodes is a task.
   164  //
   165  // TODO: it is actually more accurate to check for tasks from top down. TODO:
   166  // What should be done if a subtasks is referenced that is embedded in another
   167  // task. Should the surrounding tasks be added as well?
   168  func (c *Controller) findImpliedTask(d dep.Dependency) *Task {
   169  	// Ignore references into packages. Fill will fundamentally not work for
   170  	// packages, and packages cannot point back to the main package as cycles
   171  	// are not allowed.
   172  	if d.Import() != nil {
   173  		return nil
   174  	}
   175  
   176  	n := d.Node
   177  
   178  	// This Finalize should not be necessary, as the input to dep is already
   179  	// finalized. However, cue cmd uses some legacy instance stitching code
   180  	// where some of the backlink Environments are not properly initialized.
   181  	// Finalizing should patch those up at the expense of doing some duplicate
   182  	// work. The plan is to replace `cue cmd` with a much more clean
   183  	// implementation (probably a separate tool called `cuerun`) where this
   184  	// issue is fixed. For now we leave this patch.
   185  	//
   186  	// Note that this issue predates package flow, but that it just surfaced in
   187  	// flow and having a different evaluation order.
   188  	//
   189  	// Note: this call is cheap if n is already Finalized.
   190  	n.Finalize(c.opCtx)
   191  
   192  	for ; n != nil; n = n.Parent {
   193  		if c.cfg.IgnoreConcrete && n.IsConcrete() {
   194  			if k := n.BaseValue.Kind(); k != adt.StructKind && k != adt.ListKind {
   195  				return nil
   196  			}
   197  		}
   198  
   199  		t, ok := c.nodes[n]
   200  		if ok || !c.cfg.InferTasks {
   201  			return t
   202  		}
   203  
   204  		if !d.IsRoot() {
   205  			v := value.Make(c.opCtx, n)
   206  
   207  			if t := c.getTask(nil, v); t != nil {
   208  				return t
   209  			}
   210  		}
   211  	}
   212  
   213  	return nil
   214  }
   215  
   216  // markTaskDependencies traces through all conjuncts of a Task and marks
   217  // any dependencies on other tasks.
   218  //
   219  // The dependencies for a node by traversing the nodes of a task and then
   220  // traversing the dependencies of the conjuncts.
   221  //
   222  // This terminates because:
   223  //
   224  //   - traversing all nodes of all tasks is guaranteed finite (CUE does not
   225  //     evaluate to infinite structure).
   226  //
   227  //   - traversing conjuncts of all nodes is finite, as the input syntax is
   228  //     inherently finite.
   229  //
   230  //   - as regular nodes are traversed recursively they are marked with a cycle
   231  //     marker to detect cycles, ensuring a finite traversal as well.
   232  func (c *Controller) markTaskDependencies(t *Task, n *adt.Vertex) {
   233  	cfg := &dep.Config{
   234  		Dynamic: true,
   235  	}
   236  	dep.Visit(cfg, c.opCtx, n, func(d dep.Dependency) error {
   237  		depTask := c.findImpliedTask(d)
   238  		if depTask != nil {
   239  			if depTask != cycleMarker {
   240  				v := value.Make(c.opCtx, d.Node)
   241  				t.addDep(v.Path().String(), depTask)
   242  			}
   243  			return nil
   244  		}
   245  
   246  		// If this points to a non-task node, it may itself point to a task.
   247  		// Handling this allows for dynamic references. For instance, such a
   248  		// value may reference the result value of a task, or even create
   249  		// new tasks based on the result of another task.
   250  		if d.Import() == nil {
   251  			if c.nodes[d.Node] == cycleMarker {
   252  				return nil
   253  			}
   254  			c.nodes[d.Node] = cycleMarker
   255  			d.Recurse()
   256  			c.nodes[d.Node] = nil
   257  		}
   258  		return nil
   259  	})
   260  }
   261  
   262  func (c *Controller) inRoot(n *adt.Vertex) bool {
   263  	path := value.Make(c.opCtx, n).Path().Selectors()
   264  	root := c.cfg.Root.Selectors()
   265  	if len(path) < len(root) {
   266  		return false
   267  	}
   268  	for i, sel := range root {
   269  		if path[i] != sel {
   270  			return false
   271  		}
   272  	}
   273  	return true
   274  }
   275  
   276  var cycleMarker = &Task{}
   277  

View as plain text