...

Source file src/cuelang.org/go/tools/flow/flow.go

Documentation: cuelang.org/go/tools/flow

     1  // Copyright 2020 CUE Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Package flow provides a low-level workflow manager based on a CUE Instance.
    16  //
    17  // A Task defines an operational unit in a Workflow and corresponds to a struct
    18  // in a CUE instance. This package does not define what a Task looks like in a
    19  // CUE Instance. Instead, the user of this package must supply a TaskFunc that
    20  // creates a Runner for cue.Values that are deemed to be a Task.
    21  //
    22  // Tasks may depend on other tasks. Cyclic dependencies are thereby not allowed.
    23  // A Task A depends on another Task B if A, directly or indirectly, has a
    24  // reference to any field of Task B, including its root.
    25  package flow
    26  
    27  // TODO: Add hooks. This would allow UIs, for instance, to report on progress.
    28  //
    29  // - New(inst *cue.Instance, options ...Option)
    30  // - AddTask(v cue.Value, r Runner) *Task
    31  // - AddDependency(a, b *Task)
    32  // - AddTaskGraph(root cue.Value, fn taskFunc)
    33  // - AddSequence(list cue.Value, fn taskFunc)
    34  // - Err()
    35  
    36  // TODO:
    37  // Should we allow lists as a shorthand for a sequence of tasks?
    38  // If so, how do we specify termination behavior?
    39  
    40  // TODO:
    41  // Should we allow tasks to be a child of another task? Currently, the search
    42  // for tasks end once a task root is found.
    43  //
    44  // Semantically it is somewhat unclear to do so: for instance, if an $after
    45  // is used to refer to an explicit task dependency, it is logically
    46  // indistinguishable whether this should be a subtask or is a dependency.
    47  // Using higher-order constructs for analysis is generally undesirable.
    48  //
    49  // A possible solution would be to define specific "grouping tasks" whose sole
    50  // purpose is to define sub tasks. The user of this package would then need
    51  // to explicitly distinguish between tasks that are dependencies and tasks that
    52  // are subtasks.
    53  
    54  // TODO: streaming tasks/ server applications
    55  //
    56  // Workflows are currently implemented for batch processing, for instance to
    57  // implement shell scripting or other kinds of batch processing.
    58  //
    59  // This API has been designed, however, to also allow for streaming
    60  // applications. For instance, a streaming Task could listen for Etcd changes
    61  // or incoming HTTP requests and send updates each time an input changes.
    62  // Downstream tasks could then alternate between a Waiting and Running state.
    63  //
    64  // Note that such streaming applications would also cause configurations to
    65  // potentially not become increasingly more specific. Instead, a Task would
    66  // replace its old result each time it is updated. This would require tracking
    67  // of which conjunct was previously created by a task.
    68  
    69  import (
    70  	"context"
    71  	"fmt"
    72  	"os"
    73  	"strings"
    74  	"sync/atomic"
    75  
    76  	"cuelang.org/go/cue"
    77  	"cuelang.org/go/cue/errors"
    78  	"cuelang.org/go/cue/stats"
    79  	"cuelang.org/go/internal/core/adt"
    80  	"cuelang.org/go/internal/core/convert"
    81  	"cuelang.org/go/internal/core/eval"
    82  	"cuelang.org/go/internal/value"
    83  )
    84  
    85  var (
    86  	// ErrAbort may be returned by a task to avoid processing downstream tasks.
    87  	// This can be used by control nodes to influence execution.
    88  	ErrAbort = errors.New("abort dependant tasks without failure")
    89  
    90  	// TODO: ErrUpdate: update and run a dependency, but don't complete a
    91  	// dependency as more results may come. This is useful in server mode.
    92  
    93  	debug = os.Getenv("CUE_DEBUG_TOOLS_FLOW") != ""
    94  )
    95  
    96  // A TaskFunc creates a Runner for v if v defines a task or reports nil
    97  // otherwise. It reports an error for illformed tasks.
    98  //
    99  // If TaskFunc returns a non-nil Runner the search for task within v stops.
   100  // That is, subtasks are not supported.
   101  type TaskFunc func(v cue.Value) (Runner, error)
   102  
   103  // A Runner executes a Task.
   104  type Runner interface {
   105  	// Run runs a Task. If any of the tasks it depends on returned an error it
   106  	// is passed to this task. It reports an error upon failure.
   107  	//
   108  	// Any results to be returned can be set by calling Fill on the passed task.
   109  	//
   110  	// TODO: what is a good contract for receiving and passing errors and abort.
   111  	//
   112  	// If for a returned error x errors.Is(x, ErrAbort), all dependant tasks
   113  	// will not be run, without this being an error.
   114  	Run(t *Task, err error) error
   115  }
   116  
   117  // A RunnerFunc runs a Task.
   118  type RunnerFunc func(t *Task) error
   119  
   120  func (f RunnerFunc) Run(t *Task, err error) error {
   121  	return f(t)
   122  }
   123  
   124  // A Config defines options for interpreting an Instance as a Workflow.
   125  type Config struct {
   126  	// Root limits the search for tasks to be within the path indicated to root.
   127  	// For the cue command, this is set to ["command"]. The default value is
   128  	// for all tasks to be root.
   129  	Root cue.Path
   130  
   131  	// InferTasks allows tasks to be defined outside of the Root. Such tasks
   132  	// will only be included in the workflow if any of its fields is referenced
   133  	// by any of the tasks defined within Root.
   134  	//
   135  	// CAVEAT EMPTOR: this features is mostly provided for backwards
   136  	// compatibility with v0.2. A problem with this approach is that it will
   137  	// look for task structs within arbitrary data. So if not careful, there may
   138  	// be spurious matches.
   139  	InferTasks bool
   140  
   141  	// IgnoreConcrete ignores references for which the values are already
   142  	// concrete and cannot change.
   143  	IgnoreConcrete bool
   144  
   145  	// FindHiddenTasks allows tasks to be defined in hidden fields.
   146  	FindHiddenTasks bool
   147  
   148  	// UpdateFunc is called whenever the information in the controller is
   149  	// updated. This includes directly after initialization. The task may be
   150  	// nil if this call is not the result of a task completing.
   151  	UpdateFunc func(c *Controller, t *Task) error
   152  }
   153  
   154  // A Controller defines a set of Tasks to be executed.
   155  type Controller struct {
   156  	cfg    Config
   157  	isTask TaskFunc
   158  
   159  	inst        cue.Value
   160  	valueSeqNum int64
   161  
   162  	env *adt.Environment
   163  
   164  	conjuncts   []adt.Conjunct
   165  	conjunctSeq int64
   166  
   167  	taskCh chan *Task
   168  
   169  	opCtx      *adt.OpContext
   170  	context    context.Context
   171  	cancelFunc context.CancelFunc
   172  
   173  	// taskStats tracks counters for auxiliary operations done by tasks. It does
   174  	// not include the CUE operations done by the Controller on behalf of tasks,
   175  	// which is likely going to tbe the bulk of the operations.
   176  	taskStats stats.Counts
   177  
   178  	done atomic.Bool
   179  
   180  	// keys maps task keys to their index. This allows a recreation of the
   181  	// Instance while retaining the original task indices.
   182  	//
   183  	// TODO: do instance updating in place to allow for more efficient
   184  	// processing.
   185  	keys  map[string]*Task
   186  	tasks []*Task
   187  
   188  	// Only used during task initialization.
   189  	nodes map[*adt.Vertex]*Task
   190  
   191  	errs errors.Error
   192  }
   193  
   194  // Stats reports statistics on the total number of CUE operations used.
   195  //
   196  // This is an experimental method and the API is likely to change. The
   197  // Counts.String method will likely stay and is the safest way to use this API.
   198  //
   199  // This currently should only be called after completion or within a call to
   200  // UpdateFunc.
   201  func (c *Controller) Stats() (counts stats.Counts) {
   202  	counts = *c.opCtx.Stats()
   203  	counts.Add(c.taskStats)
   204  	return counts
   205  }
   206  
   207  // Tasks reports the tasks that are currently registered with the controller.
   208  //
   209  // This may currently only be called before Run is called or from within
   210  // a call to UpdateFunc. Task pointers returned by this call are not guaranteed
   211  // to be the same between successive calls to this method.
   212  func (c *Controller) Tasks() []*Task {
   213  	return c.tasks
   214  }
   215  
   216  func (c *Controller) cancel() {
   217  	if c.cancelFunc != nil {
   218  		c.cancelFunc()
   219  	}
   220  }
   221  
   222  func (c *Controller) addErr(err error, msg string) {
   223  	c.errs = errors.Append(c.errs, errors.Promote(err, msg))
   224  }
   225  
   226  // New creates a Controller for a given Instance and TaskFunc.
   227  //
   228  // The instance value can either be a *cue.Instance or a cue.Value.
   229  func New(cfg *Config, inst cue.InstanceOrValue, f TaskFunc) *Controller {
   230  	v := inst.Value()
   231  	ctx := eval.NewContext(value.ToInternal(v))
   232  
   233  	c := &Controller{
   234  		isTask: f,
   235  		inst:   v,
   236  		opCtx:  ctx,
   237  
   238  		taskCh: make(chan *Task),
   239  		keys:   map[string]*Task{},
   240  	}
   241  
   242  	if cfg != nil {
   243  		c.cfg = *cfg
   244  	}
   245  
   246  	c.initTasks()
   247  	return c
   248  
   249  }
   250  
   251  // Run runs the tasks of a workflow until completion.
   252  func (c *Controller) Run(ctx context.Context) error {
   253  	c.context, c.cancelFunc = context.WithCancel(ctx)
   254  	defer c.cancelFunc()
   255  
   256  	c.runLoop()
   257  
   258  	// NOTE: track state here as runLoop might add more tasks to the flow
   259  	// during the execution so checking current tasks state may not be
   260  	// accurate enough to determine that the flow is terminated.
   261  	// This is used to determine if the controller value can be retrieved.
   262  	// When the controller value is safe to be read concurrently this tracking
   263  	// can be removed.
   264  	c.done.Store(true)
   265  
   266  	return c.errs
   267  }
   268  
   269  // Value returns the value managed by the controller.
   270  //
   271  // It is safe to use the value only after Run() has returned.
   272  // It panics if the flow is running.
   273  func (c *Controller) Value() cue.Value {
   274  	if !c.done.Load() {
   275  		panic("can't retrieve value before flow has terminated")
   276  	}
   277  	return c.inst
   278  }
   279  
   280  // We need to escape quotes in the path, per
   281  // https://mermaid-js.github.io/mermaid/#/flowchart?id=entity-codes-to-escape-characters
   282  // This also requires that we escape the quoting character #.
   283  var mermaidQuote = strings.NewReplacer("#", "#35;", `"`, "#quot;")
   284  
   285  // mermaidGraph generates a mermaid graph of the current state. This can be
   286  // pasted into https://mermaid-js.github.io/mermaid-live-editor/ for
   287  // visualization.
   288  func mermaidGraph(c *Controller) string {
   289  	w := &strings.Builder{}
   290  	fmt.Fprintln(w, "graph TD")
   291  	for i, t := range c.Tasks() {
   292  		path := mermaidQuote.Replace(t.Path().String())
   293  		fmt.Fprintf(w, "  t%d(\"%s [%s]\")\n", i, path, t.State())
   294  		for _, t := range t.Dependencies() {
   295  			fmt.Fprintf(w, "  t%d-->t%d\n", i, t.Index())
   296  		}
   297  	}
   298  	return w.String()
   299  }
   300  
   301  // A State indicates the state of a Task.
   302  //
   303  // The following state diagram indicates the possible state transitions:
   304  //
   305  //	       Ready
   306  //	    ↗︎        ↘︎
   307  //	Waiting  ←  Running
   308  //	    ↘︎        ↙︎
   309  //	    Terminated
   310  //
   311  // A Task may move from Waiting to Terminating if one of
   312  // the tasks on which it depends fails.
   313  //
   314  // NOTE: transitions from Running to Waiting are currently not supported. In
   315  // the future this may be possible if a task depends on continuously running
   316  // tasks that send updates.
   317  type State int
   318  
   319  const (
   320  	// Waiting indicates a task is blocked on input from another task.
   321  	//
   322  	// NOTE: although this is currently not implemented, a task could
   323  	// theoretically move from the Running to Waiting state.
   324  	Waiting State = iota
   325  
   326  	// Ready means a tasks is ready to run, but currently not running.
   327  	Ready
   328  
   329  	// Running indicates a goroutine is currently active for a task and that
   330  	// it is not Waiting.
   331  	Running
   332  
   333  	// Terminated means a task has stopped running either because it terminated
   334  	// while Running or was aborted by task on which it depends. The error
   335  	// value of a Task indicates the reason for the termination.
   336  	Terminated
   337  )
   338  
   339  var stateStrings = map[State]string{
   340  	Waiting:    "Waiting",
   341  	Ready:      "Ready",
   342  	Running:    "Running",
   343  	Terminated: "Terminated",
   344  }
   345  
   346  // String reports a human readable string of status s.
   347  func (s State) String() string {
   348  	return stateStrings[s]
   349  }
   350  
   351  // A Task contains the context for a single task execution.
   352  // Tasks may be run concurrently.
   353  type Task struct {
   354  	// Static
   355  	c    *Controller
   356  	ctxt *adt.OpContext
   357  	r    Runner
   358  
   359  	index  int
   360  	path   cue.Path
   361  	key    string
   362  	labels []adt.Feature
   363  
   364  	// Dynamic
   365  	update   adt.Expr
   366  	deps     map[*Task]bool
   367  	pathDeps map[string][]*Task
   368  
   369  	conjunctSeq int64
   370  	valueSeq    int64
   371  	v           cue.Value
   372  	err         errors.Error
   373  	state       State
   374  	depTasks    []*Task
   375  
   376  	stats stats.Counts
   377  }
   378  
   379  // Stats reports statistics on the number of CUE operations used to complete
   380  // this task.
   381  //
   382  // This is an experimental method and the API is likely to change.
   383  //
   384  // It only shows numbers upon completion. This may change in the future.
   385  func (t *Task) Stats() stats.Counts {
   386  	return t.stats
   387  }
   388  
   389  // Context reports the Controller's Context.
   390  func (t *Task) Context() context.Context {
   391  	return t.c.context
   392  }
   393  
   394  // Path reports the path of Task within the Instance in which it is defined.
   395  // The Path is always valid.
   396  func (t *Task) Path() cue.Path {
   397  	return t.path
   398  }
   399  
   400  // Index reports the sequence number of the Task. This will not change over
   401  // time.
   402  func (t *Task) Index() int {
   403  	return t.index
   404  }
   405  
   406  func (t *Task) done() bool {
   407  	return t.state > Running
   408  }
   409  
   410  func (t *Task) isReady() bool {
   411  	for _, d := range t.depTasks {
   412  		if !d.done() {
   413  			return false
   414  		}
   415  	}
   416  	return true
   417  }
   418  
   419  func (t *Task) vertex() *adt.Vertex {
   420  	_, x := value.ToInternal(t.v)
   421  	return x
   422  }
   423  
   424  func (t *Task) addDep(path string, dep *Task) {
   425  	if dep == nil || dep == t {
   426  		return
   427  	}
   428  	if t.deps == nil {
   429  		t.deps = map[*Task]bool{}
   430  		t.pathDeps = map[string][]*Task{}
   431  	}
   432  
   433  	// Add the dependencies for a given path to the controller. We could compute
   434  	// this again later, but this ensures there will be no discrepancies.
   435  	a := t.pathDeps[path]
   436  	found := false
   437  	for _, t := range a {
   438  		if t == dep {
   439  			found = true
   440  			break
   441  		}
   442  	}
   443  	if !found {
   444  		t.pathDeps[path] = append(a, dep)
   445  
   446  	}
   447  
   448  	if !t.deps[dep] {
   449  		t.deps[dep] = true
   450  		t.depTasks = append(t.depTasks, dep)
   451  	}
   452  }
   453  
   454  // Fill fills in values of the Controller's configuration for the current task.
   455  // The changes take effect after the task completes.
   456  //
   457  // This method may currently only be called by the runner.
   458  func (t *Task) Fill(x interface{}) error {
   459  	expr := convert.GoValueToExpr(t.ctxt, true, x)
   460  	if t.update == nil {
   461  		t.update = expr
   462  		return nil
   463  	}
   464  	t.update = &adt.BinaryExpr{
   465  		Op: adt.AndOp,
   466  		X:  t.update,
   467  		Y:  expr,
   468  	}
   469  	return nil
   470  }
   471  
   472  // Value reports the latest value of this task.
   473  //
   474  // This method may currently only be called before Run is called or after a
   475  // Task completed, or from within a call to UpdateFunc.
   476  func (t *Task) Value() cue.Value {
   477  	// TODO: synchronize
   478  	return t.v
   479  }
   480  
   481  // Dependencies reports the Tasks t depends on.
   482  //
   483  // This method may currently only be called before Run is called or after a
   484  // Task completed, or from within a call to UpdateFunc.
   485  func (t *Task) Dependencies() []*Task {
   486  	// TODO: add synchronization.
   487  	return t.depTasks
   488  }
   489  
   490  // PathDependencies reports the dependencies found for a value at the given
   491  // path.
   492  //
   493  // This may currently only be called before Run is called or from within
   494  // a call to UpdateFunc.
   495  func (t *Task) PathDependencies(p cue.Path) []*Task {
   496  	return t.pathDeps[p.String()]
   497  }
   498  
   499  // Err returns the error of a completed Task.
   500  //
   501  // This method may currently only be called before Run is called, after a
   502  // Task completed, or from within a call to UpdateFunc.
   503  func (t *Task) Err() error {
   504  	return t.err
   505  }
   506  
   507  // State is the current state of the Task.
   508  //
   509  // This method may currently only be called before Run is called or after a
   510  // Task completed, or from within a call to UpdateFunc.
   511  func (t *Task) State() State {
   512  	return t.state
   513  }
   514  

View as plain text