...

Source file src/cuelang.org/go/tools/flow/flow_test.go

Documentation: cuelang.org/go/tools/flow

     1  // Copyright 2020 CUE Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package flow_test
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"os"
    21  	"path"
    22  	"strings"
    23  	"sync"
    24  	"testing"
    25  	"time"
    26  
    27  	"cuelang.org/go/cue"
    28  	"cuelang.org/go/cue/cuecontext"
    29  	"cuelang.org/go/cue/errors"
    30  	"cuelang.org/go/cue/format"
    31  	"cuelang.org/go/cue/stats"
    32  	"cuelang.org/go/internal/cuetxtar"
    33  	"cuelang.org/go/tools/flow"
    34  )
    35  
    36  // TestTasks tests the logic that determines which nodes are tasks and what are
    37  // their dependencies.
    38  func TestFlow(t *testing.T) {
    39  	test := cuetxtar.TxTarTest{
    40  		Root: "./testdata",
    41  		Name: "run",
    42  	}
    43  
    44  	test.Run(t, func(t *cuetxtar.Test) {
    45  		v := cuecontext.New().BuildInstance(t.Instance())
    46  		if err := v.Err(); err != nil {
    47  			t.Fatal(errors.Details(err, nil))
    48  		}
    49  
    50  		seqNum = 0
    51  
    52  		var tasksTotal stats.Counts
    53  
    54  		updateFunc := func(c *flow.Controller, task *flow.Task) error {
    55  			str := flow.MermaidGraph(c)
    56  			step := fmt.Sprintf("t%d", seqNum)
    57  			fmt.Fprintln(t.Writer(step), str)
    58  
    59  			if task != nil {
    60  				n := task.Value().Syntax(cue.Final())
    61  				b, err := format.Node(n)
    62  				if err != nil {
    63  					t.Fatal(err)
    64  				}
    65  				fmt.Fprintln(t.Writer(path.Join(step, "value")), string(b))
    66  
    67  				stats := task.Stats()
    68  				tasksTotal.Add(stats)
    69  				fmt.Fprintln(t.Writer(path.Join(step, "stats")), &stats)
    70  			}
    71  
    72  			incSeqNum()
    73  
    74  			return nil
    75  		}
    76  
    77  		cfg := &flow.Config{
    78  			Root:            cue.ParsePath("root"),
    79  			InferTasks:      t.Bool("InferTasks"),
    80  			IgnoreConcrete:  t.Bool("IgnoreConcrete"),
    81  			FindHiddenTasks: t.Bool("FindHiddenTasks"),
    82  			UpdateFunc:      updateFunc,
    83  		}
    84  
    85  		c := flow.New(cfg, v, taskFunc)
    86  
    87  		w := t.Writer("errors")
    88  		if err := c.Run(context.Background()); err != nil {
    89  			cwd, _ := os.Getwd()
    90  			fmt.Fprint(w, "error: ")
    91  			errors.Print(w, err, &errors.Config{
    92  				Cwd:     cwd,
    93  				ToSlash: true,
    94  			})
    95  		}
    96  
    97  		totals := c.Stats()
    98  		if tasksTotal != zeroStats && totals != tasksTotal {
    99  			t.Errorf(diffMsg, tasksTotal, totals, tasksTotal.Since(totals))
   100  		}
   101  		fmt.Fprintln(t.Writer("stats/totals"), totals)
   102  	})
   103  }
   104  
   105  var zeroStats stats.Counts
   106  
   107  const diffMsg = `
   108  stats: task totals different from controller:
   109  task totals:
   110  %v
   111  
   112  controller totals:
   113  %v
   114  
   115  task totals - controller totals:
   116  %v`
   117  
   118  func TestFlowValuePanic(t *testing.T) {
   119  	f := `
   120      root: {
   121          a: {
   122              $id: "slow"
   123              out: string
   124          }
   125          b: {
   126              $id:    "slow"
   127              $after: a
   128              out:    string
   129          }
   130      }
   131      `
   132  	ctx := cuecontext.New()
   133  	v := ctx.CompileString(f)
   134  
   135  	ch := make(chan bool, 1)
   136  
   137  	cfg := &flow.Config{
   138  		Root: cue.ParsePath("root"),
   139  		UpdateFunc: func(c *flow.Controller, t *flow.Task) error {
   140  			ch <- true
   141  			return nil
   142  		},
   143  	}
   144  
   145  	c := flow.New(cfg, v, taskFunc)
   146  
   147  	defer func() { recover() }()
   148  
   149  	go c.Run(context.TODO())
   150  
   151  	// Call Value amidst two task runs. This should trigger a panic as the flow
   152  	// is not terminated.
   153  	<-ch
   154  	c.Value()
   155  	<-ch
   156  
   157  	t.Errorf("Value() did not panic")
   158  }
   159  
   160  func taskFunc(v cue.Value) (flow.Runner, error) {
   161  	switch name, err := v.Lookup("$id").String(); name {
   162  	default:
   163  		if err == nil {
   164  			return flow.RunnerFunc(func(t *flow.Task) error {
   165  				t.Fill(map[string]string{"stdout": "foo"})
   166  				return nil
   167  			}), nil
   168  		} else if v.LookupPath(cue.MakePath(cue.Str("$id"))).Exists() {
   169  			return nil, err
   170  		}
   171  
   172  	case "valToOut":
   173  		return flow.RunnerFunc(func(t *flow.Task) error {
   174  			if str, err := t.Value().Lookup("val").String(); err == nil {
   175  				t.Fill(map[string]string{"out": str})
   176  			}
   177  			return nil
   178  		}), nil
   179  
   180  	case "failure":
   181  		return flow.RunnerFunc(func(t *flow.Task) error {
   182  			return errors.New("failure")
   183  		}), nil
   184  
   185  	case "abort":
   186  		return flow.RunnerFunc(func(t *flow.Task) error {
   187  			return flow.ErrAbort
   188  		}), nil
   189  
   190  	case "list":
   191  		return flow.RunnerFunc(func(t *flow.Task) error {
   192  			t.Fill(map[string][]int{"out": {1, 2}})
   193  			return nil
   194  		}), nil
   195  
   196  	case "slow":
   197  		return flow.RunnerFunc(func(t *flow.Task) error {
   198  			time.Sleep(10 * time.Millisecond)
   199  			t.Fill(map[string]string{"out": "finished"})
   200  			return nil
   201  		}), nil
   202  
   203  	case "sequenced":
   204  		// This task is used to serialize different runners in case
   205  		// non-deterministic scheduling is possible.
   206  		return flow.RunnerFunc(func(t *flow.Task) error {
   207  			seq, err := t.Value().Lookup("seq").Int64()
   208  			if err != nil {
   209  				return err
   210  			}
   211  
   212  			waitSeqNum(seq)
   213  
   214  			if str, err := t.Value().Lookup("val").String(); err == nil {
   215  				t.Fill(map[string]string{"out": str})
   216  			}
   217  
   218  			return nil
   219  		}), nil
   220  	}
   221  	return nil, nil
   222  }
   223  
   224  // These vars are used to serialize tasks that are run in parallel. This allows
   225  // for testing running tasks in parallel, while obtaining deterministic output.
   226  var (
   227  	seqNum  int64
   228  	seqLock sync.Mutex
   229  	seqCond = sync.NewCond(&seqLock)
   230  )
   231  
   232  func incSeqNum() {
   233  	seqCond.L.Lock()
   234  	seqNum++
   235  	seqCond.Broadcast()
   236  	seqCond.L.Unlock()
   237  }
   238  
   239  func waitSeqNum(seq int64) {
   240  	seqCond.L.Lock()
   241  	for seq != seqNum {
   242  		seqCond.Wait()
   243  	}
   244  	seqCond.L.Unlock()
   245  }
   246  
   247  // DO NOT REMOVE: for testing purposes.
   248  func TestX(t *testing.T) {
   249  	in := `
   250  	`
   251  
   252  	if strings.TrimSpace(in) == "" {
   253  		t.Skip()
   254  	}
   255  
   256  	rt := cuecontext.New()
   257  	v := rt.CompileString(in)
   258  	if err := v.Err(); err != nil {
   259  		t.Fatal(err)
   260  	}
   261  
   262  	c := flow.New(&flow.Config{
   263  		Root: cue.ParsePath("root"),
   264  		UpdateFunc: func(c *flow.Controller, ft *flow.Task) error {
   265  			if ft != nil {
   266  				t.Errorf("\nTASK:\n%s", ft.Stats())
   267  			}
   268  			return nil
   269  		},
   270  	}, v, taskFunc)
   271  
   272  	t.Error(flow.MermaidGraph(c))
   273  
   274  	if err := c.Run(context.Background()); err != nil {
   275  		t.Fatal(errors.Details(err, nil))
   276  	}
   277  
   278  	t.Errorf("\nCONTROLLER:\n%s", c.Stats())
   279  }
   280  

View as plain text