...

Source file src/cuelang.org/go/tools/flow/run.go

Documentation: cuelang.org/go/tools/flow

     1  // Copyright 2020 CUE Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package flow
    16  
    17  // This file contains logic for running tasks.
    18  //
    19  // This implementation anticipates that workflows can also be used for defining
    20  // servers, not just batch scripts. In the future, tasks may be long running and
    21  // provide streams of results.
    22  //
    23  // The implementation starts a goroutine for each user-defined task, instead of
    24  // having a fixed pool of workers. The main reason for this is that tasks are
    25  // inherently heterogeneous and may be blocking on top of that. Also, in the
    26  // future tasks may be long running, as discussed above.
    27  
    28  import (
    29  	"fmt"
    30  	"os"
    31  
    32  	"cuelang.org/go/cue/errors"
    33  	"cuelang.org/go/internal/core/adt"
    34  	"cuelang.org/go/internal/core/eval"
    35  	"cuelang.org/go/internal/value"
    36  )
    37  
    38  func (c *Controller) runLoop() {
    39  	_, root := value.ToInternal(c.inst)
    40  
    41  	// Copy the initial conjuncts.
    42  	n := len(root.Conjuncts)
    43  	c.conjuncts = make([]adt.Conjunct, n, n+len(c.tasks))
    44  	copy(c.conjuncts, root.Conjuncts)
    45  
    46  	c.markReady(nil)
    47  
    48  	for c.errs == nil {
    49  		// Dispatch all unblocked tasks to workers. Only update
    50  		// the configuration when all have been dispatched.
    51  
    52  		waiting := false
    53  		running := false
    54  
    55  		// Mark tasks as Ready.
    56  		for _, t := range c.tasks {
    57  			switch t.state {
    58  			case Waiting:
    59  				waiting = true
    60  
    61  			case Ready:
    62  				running = true
    63  
    64  				t.state = Running
    65  				c.updateTaskValue(t)
    66  
    67  				t.ctxt = eval.NewContext(value.ToInternal(t.v))
    68  
    69  				go func(t *Task) {
    70  					if err := t.r.Run(t, nil); err != nil {
    71  						t.err = errors.Promote(err, "task failed")
    72  					}
    73  
    74  					t.c.taskCh <- t
    75  				}(t)
    76  
    77  			case Running:
    78  				running = true
    79  
    80  			case Terminated:
    81  			}
    82  		}
    83  
    84  		if !running {
    85  			if waiting {
    86  				// Should not happen ever, as cycle detection should have caught
    87  				// this. But keep this around as a defensive measure.
    88  				c.addErr(errors.New("deadlock"), "run loop")
    89  			}
    90  			break
    91  		}
    92  
    93  		select {
    94  		case <-c.context.Done():
    95  			return
    96  
    97  		case t := <-c.taskCh:
    98  			t.state = Terminated
    99  
   100  			taskStats := *t.ctxt.Stats()
   101  			t.stats.Add(taskStats)
   102  			c.taskStats.Add(taskStats)
   103  
   104  			start := *c.opCtx.Stats()
   105  
   106  			switch t.err {
   107  			case nil:
   108  				c.updateTaskResults(t)
   109  
   110  			case ErrAbort:
   111  				// TODO: do something cleverer.
   112  				fallthrough
   113  
   114  			default:
   115  				c.addErr(t.err, "task failure")
   116  				return
   117  			}
   118  
   119  			// Recompute the configuration, if necessary.
   120  			if c.updateValue() {
   121  				// initTasks was already called in New to catch initialization
   122  				// errors earlier.
   123  				c.initTasks()
   124  			}
   125  
   126  			c.updateTaskValue(t)
   127  
   128  			t.stats.Add(c.opCtx.Stats().Since(start))
   129  
   130  			c.markReady(t)
   131  		}
   132  	}
   133  }
   134  
   135  func (c *Controller) markReady(t *Task) {
   136  	for _, x := range c.tasks {
   137  		if x.state == Waiting && x.isReady() {
   138  			x.state = Ready
   139  		}
   140  	}
   141  
   142  	if debug {
   143  		fmt.Fprintln(os.Stderr, "tools/flow task dependency graph:")
   144  		fmt.Fprintln(os.Stderr, "```mermaid")
   145  		fmt.Fprint(os.Stderr, mermaidGraph(c))
   146  		fmt.Fprintln(os.Stderr, "```")
   147  	}
   148  
   149  	if c.cfg.UpdateFunc != nil {
   150  		if err := c.cfg.UpdateFunc(c, t); err != nil {
   151  			c.addErr(err, "task completed")
   152  			c.cancel()
   153  			return
   154  		}
   155  	}
   156  }
   157  
   158  // updateValue recomputes the workflow configuration if it is out of date. It
   159  // reports whether the values were updated.
   160  func (c *Controller) updateValue() bool {
   161  
   162  	if c.valueSeqNum == c.conjunctSeq {
   163  		return false
   164  	}
   165  
   166  	// TODO: incrementally update results. Currently, the entire tree is
   167  	// recomputed on every update. This should not be necessary with the right
   168  	// notification structure in place.
   169  
   170  	v := &adt.Vertex{Conjuncts: c.conjuncts}
   171  	v.Finalize(c.opCtx)
   172  
   173  	c.inst = value.Make(c.opCtx, v)
   174  	c.valueSeqNum = c.conjunctSeq
   175  	return true
   176  }
   177  
   178  // updateTaskValue updates the value of the task in the configuration if it is
   179  // out of date.
   180  func (c *Controller) updateTaskValue(t *Task) {
   181  	required := t.conjunctSeq
   182  	for _, dep := range t.depTasks {
   183  		if dep.conjunctSeq > required {
   184  			required = dep.conjunctSeq
   185  		}
   186  	}
   187  
   188  	if t.valueSeq == required {
   189  		return
   190  	}
   191  
   192  	if c.valueSeqNum < required {
   193  		c.updateValue()
   194  	}
   195  
   196  	t.v = c.inst.LookupPath(t.path)
   197  	t.valueSeq = required
   198  }
   199  
   200  // updateTaskResults updates the result status of the task and adds any result
   201  // values to the overall configuration.
   202  func (c *Controller) updateTaskResults(t *Task) bool {
   203  	if t.update == nil {
   204  		return false
   205  	}
   206  
   207  	expr := t.update
   208  	for i := len(t.labels) - 1; i >= 0; i-- {
   209  		label := t.labels[i]
   210  		switch label.Typ() {
   211  		case adt.StringLabel, adt.HiddenLabel:
   212  			expr = &adt.StructLit{
   213  				Decls: []adt.Decl{
   214  					&adt.Field{
   215  						Label: t.labels[i],
   216  						Value: expr,
   217  					},
   218  				},
   219  			}
   220  		case adt.IntLabel:
   221  			i := label.Index()
   222  			list := &adt.ListLit{}
   223  			any := &adt.Top{}
   224  			// TODO(perf): make this a constant thing. This will be possible with the query extension.
   225  			for k := 0; k < i; k++ {
   226  				list.Elems = append(list.Elems, any)
   227  			}
   228  			list.Elems = append(list.Elems, expr, &adt.Ellipsis{})
   229  			expr = list
   230  		default:
   231  			panic(fmt.Errorf("unexpected label type %v", label.Typ()))
   232  		}
   233  	}
   234  
   235  	t.update = nil
   236  
   237  	// TODO: replace rather than add conjunct if this task already added a
   238  	// conjunct before. This will allow for serving applications.
   239  	c.conjuncts = append(c.conjuncts, adt.MakeRootConjunct(c.env, expr))
   240  	c.conjunctSeq++
   241  	t.conjunctSeq = c.conjunctSeq
   242  
   243  	return true
   244  }
   245  

View as plain text