...

Source file src/cuelang.org/go/internal/core/adt/sched_test.go

Documentation: cuelang.org/go/internal/core/adt

     1  // Copyright 2023 CUE Authors
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package adt
    16  
    17  import (
    18  	"fmt"
    19  	"strings"
    20  	"testing"
    21  
    22  	"cuelang.org/go/internal/cuetest"
    23  )
    24  
    25  const (
    26  	c1 condition = 1 << iota
    27  	c2
    28  	c3
    29  	c4
    30  
    31  	// auto is a condition that is automatically set by the simulator.
    32  	auto
    33  )
    34  
    35  // TestScheduler tests the non-CUE specific scheduler functionality.
    36  func TestScheduler(t *testing.T) {
    37  	ctx := &OpContext{
    38  		taskContext: taskContext{
    39  			counterMask: c1 | c2 | c3 | c4,
    40  			complete:    func(s *scheduler) condition { return 0 },
    41  		},
    42  	}
    43  
    44  	// shared state
    45  	nodeID := 0
    46  	w := &strings.Builder{}
    47  	nodes := []*nodeContext{}
    48  
    49  	node := func(parent *nodeContext) *nodeContext {
    50  		if nodeID == 0 {
    51  			if parent != nil {
    52  				t.Fatal("root node must be created first")
    53  			}
    54  		} else {
    55  			if parent == nil {
    56  				t.Fatal("non-root node must have parent")
    57  			}
    58  		}
    59  
    60  		n := &nodeContext{scheduler: scheduler{ctx: ctx}, refCount: nodeID}
    61  		nodeID++
    62  		nodes = append(nodes, n)
    63  		return n
    64  	}
    65  
    66  	// dep encodes a dependency on a node uncovered while running a task. It
    67  	// corresponds to a single evaluation of a top-level expression within a
    68  	// task.
    69  	type dep struct {
    70  		node  *nodeContext
    71  		needs condition
    72  	}
    73  
    74  	// process simulates the running of a task with the given dependencies on
    75  	// other nodes/ schedulers.
    76  	//
    77  	// Note that tasks indicate their dependencies at runtime, and that these
    78  	// are not statically declared at the time of task creation. This is because
    79  	// dependencies may only be known after evaluating some CUE. As a
    80  	// consequence, it may be possible for a tasks to be started before one of
    81  	// its dependencies is run. Blocking only occurs if there is a mutual
    82  	// dependency that cannot be resolved without first blocking the task and
    83  	// coming back to it later.
    84  	process := func(name string, t *task, deps ...dep) (ok bool) {
    85  		fmt.Fprintf(w, "\n\t\t    running task %s", name)
    86  		ok = true
    87  		for _, d := range deps {
    88  			func() {
    89  				defer func() {
    90  					if x := recover(); x != nil {
    91  						fmt.Fprintf(w, "\n\t\t        task %s waiting for v%d meeting %x", name, d.node.refCount, d.needs)
    92  						fmt.Fprint(w, ": BLOCKED")
    93  						panic(x)
    94  					}
    95  				}()
    96  				if !d.node.process(d.needs, yield) {
    97  					ok = false
    98  				}
    99  			}()
   100  		}
   101  		return ok
   102  	}
   103  
   104  	// success creates a task that will succeed.
   105  	success := func(name string, n *nodeContext, completes, needs condition, deps ...dep) *task {
   106  		t := &task{
   107  			run: &runner{
   108  				f: func(ctx *OpContext, t *task, mode runMode) {
   109  					process(name, t, deps...)
   110  				},
   111  				completes: completes,
   112  				needs:     needs,
   113  			},
   114  			node: n,
   115  			x:    &String{Str: name}, // Set name for debugging purposes.
   116  		}
   117  		n.insertTask(t)
   118  		return t
   119  	}
   120  
   121  	// signal is a task that unconditionally sets a completion bit.
   122  	signal := func(name string, n *nodeContext, completes condition, deps ...dep) *task {
   123  		t := &task{
   124  			run: &runner{
   125  				f: func(ctx *OpContext, t *task, mode runMode) {
   126  					if process(name, t, deps...) {
   127  						n.scheduler.signal(completes)
   128  					}
   129  				},
   130  				completes: completes,
   131  			},
   132  			node: n,
   133  			x:    &String{Str: name}, // Set name for debugging purposes.
   134  		}
   135  		n.insertTask(t)
   136  		return t
   137  	}
   138  
   139  	// completes creates a task that completes some state in another node.
   140  	completes := func(name string, n, other *nodeContext, completes condition, deps ...dep) *task {
   141  		other.scheduler.incrementCounts(completes)
   142  		t := &task{
   143  			run: &runner{
   144  				f: func(ctx *OpContext, t *task, mode runMode) {
   145  					if process(name, t, deps...) {
   146  						other.scheduler.decrementCounts(completes)
   147  					}
   148  				},
   149  				completes: completes,
   150  			},
   151  			node: n,
   152  			x:    &String{Str: name}, // Set name for debugging purposes.
   153  		}
   154  		n.insertTask(t)
   155  		return t
   156  	}
   157  
   158  	// fail creates a task that will fail.
   159  	fail := func(name string, n *nodeContext, completes, needs condition, deps ...dep) *task {
   160  		t := &task{
   161  
   162  			run: &runner{
   163  				f: func(ctx *OpContext, t *task, mode runMode) {
   164  					fmt.Fprintf(w, "\n\t\t    running task %s:", name)
   165  					t.err = &Bottom{}
   166  					fmt.Fprint(w, " FAIL")
   167  				},
   168  				completes: completes,
   169  				needs:     needs,
   170  			},
   171  			node: n,
   172  			x:    &String{Str: name}, // Set name for debugging purposes.
   173  		}
   174  		n.insertTask(t)
   175  		return t
   176  	}
   177  
   178  	type testCase struct {
   179  		name string
   180  		init func()
   181  
   182  		log   string // A lot
   183  		state string // A textual representation of the task state
   184  
   185  		// err holds all errors or "" if none.
   186  		err string
   187  	}
   188  
   189  	cases := []testCase{{
   190  		name: "empty scheduler",
   191  		init: func() {
   192  			node(nil)
   193  		},
   194  		log: ``,
   195  
   196  		state: `
   197  			v0 (SUCCESS):`,
   198  	}, {
   199  		name: "node with one task",
   200  		init: func() {
   201  			v0 := node(nil)
   202  			success("t1", v0, c1, 0)
   203  		},
   204  		log: `
   205  		    running task t1`,
   206  
   207  		state: `
   208  			v0 (SUCCESS):
   209  			    task:    t1: SUCCESS`,
   210  	}, {
   211  		name: "node with two tasks",
   212  		init: func() {
   213  			v0 := node(nil)
   214  			success("t1", v0, c1, 0)
   215  			success("t2", v0, c2, 0)
   216  		},
   217  		log: `
   218  		    running task t1
   219  		    running task t2`,
   220  
   221  		state: `
   222  			v0 (SUCCESS):
   223  			    task:    t1: SUCCESS
   224  			    task:    t2: SUCCESS`,
   225  	}, {
   226  		name: "node failing task",
   227  		init: func() {
   228  			v0 := node(nil)
   229  			fail("t1", v0, c1, 0)
   230  		},
   231  		log: `
   232  		    running task t1: FAIL`,
   233  
   234  		state: `
   235  			v0 (SUCCESS):
   236  			    task:    t1: FAILED`,
   237  	}, {
   238  		// Tasks will have to be run in order according to their dependencies.
   239  		// Note that the tasks will be run in order, as they all depend on the
   240  		// same node, in which case the order must be and will be strictly
   241  		// enforced.
   242  		name: "dependency chain on nodes within scheduler",
   243  		init: func() {
   244  			v0 := node(nil)
   245  			success("third", v0, c3, c2)
   246  			success("fourth", v0, c4, c3)
   247  			success("second", v0, c2, c1)
   248  			success("first", v0, c1, 0)
   249  		},
   250  		log: `
   251  		    running task first
   252  		    running task second
   253  		    running task third
   254  		    running task fourth`,
   255  
   256  		state: `
   257  			v0 (SUCCESS):
   258  			    task:    third: SUCCESS
   259  			    task:    fourth: SUCCESS
   260  			    task:    second: SUCCESS
   261  			    task:    first: SUCCESS`,
   262  	}, {
   263  		// If a task depends on a state completion for which there is no task,
   264  		// it should be considered as completed, because essentially all
   265  		// information is known about that state.
   266  		name: "task depends on state for which there is no task",
   267  		init: func() {
   268  			v0 := node(nil)
   269  			success("t1", v0, c2, c1)
   270  		},
   271  		log: `
   272  		    running task t1`,
   273  		state: `
   274  			v0 (SUCCESS):
   275  			    task:    t1: SUCCESS`,
   276  	}, {
   277  		// Same as previous, but now for another node.
   278  		name: "task depends on state of other node for which there is no task",
   279  		init: func() {
   280  			v0 := node(nil)
   281  			v1 := node(v0)
   282  			v2 := node(v0)
   283  			success("t1", v1, c1, 0, dep{node: v2, needs: c2})
   284  		},
   285  		log: `
   286  		    running task t1`,
   287  		state: `
   288  			v0 (SUCCESS):
   289  			v1 (SUCCESS):
   290  			    task:    t1: SUCCESS
   291  			v2 (SUCCESS):`,
   292  	}, {
   293  		name: "tasks depend on multiple other tasks within same scheduler",
   294  		init: func() {
   295  			v0 := node(nil)
   296  			success("before1", v0, c2, 0)
   297  			success("last", v0, c4, c1|c2|c3)
   298  			success("block", v0, c3, c1|c2)
   299  			success("before2", v0, c1, 0)
   300  		},
   301  		log: `
   302  		    running task before1
   303  		    running task before2
   304  		    running task block
   305  		    running task last`,
   306  
   307  		state: `
   308  			v0 (SUCCESS):
   309  			    task:    before1: SUCCESS
   310  			    task:    last: SUCCESS
   311  			    task:    block: SUCCESS
   312  			    task:    before2: SUCCESS`,
   313  	}, {
   314  		// In this test we simulate dynamic reference that are dependent
   315  		// on each other in a chain to form the fields. Task t0 would not be
   316  		// a task in the regular evaluator, but it is included there as a
   317  		// task in absence of the ability to simulate static elements.
   318  		//
   319  		//	v0: {
   320  		//		(v0.baz): "bar" // task t1
   321  		//		(v0.foo): "baz" // task t2
   322  		//		baz: "foo"      // task t0
   323  		//	}
   324  		//
   325  		name: "non-cyclic dependencies between nodes p1",
   326  		init: func() {
   327  			v0 := node(nil)
   328  			baz := node(v0)
   329  			success("t0", baz, c1, 0)
   330  			foo := node(v0)
   331  
   332  			completes("t1:bar", v0, foo, c2, dep{node: baz, needs: c1})
   333  			success("t2:baz", v0, c1, 0, dep{node: foo, needs: c2})
   334  		},
   335  		log: `
   336  		    running task t1:bar
   337  		    running task t0
   338  		    running task t2:baz`,
   339  		state: `
   340  			v0 (SUCCESS):
   341  			    task:    t1:bar: SUCCESS
   342  			    task:    t2:baz: SUCCESS
   343  			v1 (SUCCESS):
   344  			    task:    t0: SUCCESS
   345  			v2 (SUCCESS):`,
   346  	}, {
   347  		// Like the previous test, but different order of execution.
   348  		//
   349  		//	v0: {
   350  		//		(v0.foo): "baz" // task t2
   351  		//		(v0.baz): "bar" // task t1
   352  		//		baz: "foo"      // task t0
   353  		//	}
   354  		//
   355  		name: "non-cyclic dependencies between nodes p2",
   356  		init: func() {
   357  			v0 := node(nil)
   358  			baz := node(v0)
   359  			success("foo", baz, c1, 0)
   360  			foo := node(v0)
   361  
   362  			success("t2:baz", v0, c1, 0, dep{node: foo, needs: c2})
   363  			completes("t1:bar", v0, foo, c2, dep{node: baz, needs: c1})
   364  		},
   365  		log: `
   366  		    running task t2:baz
   367  		    running task t1:bar
   368  		    running task foo`,
   369  		state: `
   370  			v0 (SUCCESS):
   371  			    task:    t2:baz: SUCCESS
   372  			    task:    t1:bar: SUCCESS
   373  			v1 (SUCCESS):
   374  			    task:    foo: SUCCESS
   375  			v2 (SUCCESS):`,
   376  	}, {
   377  		//	    b: a - 10
   378  		//	    a: b + 10
   379  		name: "cycle in mutually referencing expressions",
   380  		init: func() {
   381  			v0 := node(nil)
   382  			v1 := node(v0)
   383  			v2 := node(v0)
   384  			success("a-10", v1, c1|c2, 0, dep{node: v2, needs: c1})
   385  			success("b+10", v2, c1|c2, 0, dep{node: v1, needs: c1})
   386  		},
   387  		log: `
   388  		    running task a-10
   389  		    running task b+10
   390  		        task b+10 waiting for v1 meeting 1: BLOCKED
   391  		        task a-10 waiting for v2 meeting 1: BLOCKED
   392  		    running task b+10
   393  		    running task a-10`,
   394  		state: `
   395  			v0 (SUCCESS):
   396  			v1 (SUCCESS): (frozen)
   397  			    task:    a-10: SUCCESS (unblocked)
   398  			v2 (SUCCESS): (frozen)
   399  			    task:    b+10: SUCCESS (unblocked)`,
   400  	}, {
   401  		//	    b: a - 10
   402  		//	    a: b + 10
   403  		//	    a: 5
   404  		name: "broken cyclic reference in expressions",
   405  		init: func() {
   406  			v0 := node(nil)
   407  			v1 := node(v0)
   408  			v2 := node(v0)
   409  			success("a-10", v1, c1|c2, 0, dep{node: v2, needs: c1})
   410  			success("b+10", v2, c1|c2, 0, dep{node: v1, needs: c1})
   411  
   412  			// NOTE: using success("5", v2, c1, 0) here would cause the cyclic
   413  			// references to block, as they would both provide and depend on
   414  			// v1 and v2 becoming scalars. Once a field is known to be a scalar,
   415  			// it can safely be signaled as unification cannot make it more
   416  			// concrete. Further unification could result in an error, but that
   417  			// will be caught by completing the unification.
   418  			signal("5", v2, c1)
   419  		},
   420  		log: `
   421  		    running task a-10
   422  		    running task b+10
   423  		        task b+10 waiting for v1 meeting 1: BLOCKED
   424  		    running task 5
   425  		    running task b+10`,
   426  		state: `
   427  			v0 (SUCCESS):
   428  			v1 (SUCCESS):
   429  			    task:    a-10: SUCCESS
   430  			v2 (SUCCESS):
   431  			    task:    b+10: SUCCESS
   432  			    task:    5: SUCCESS`,
   433  	}, {
   434  		// This test simulates a case where a comprehension projects
   435  		// onto itself. The cycle is broken by allowing a required state
   436  		// to be dropped upon detecting a cycle. For comprehensions,
   437  		// for instance, one usually would define that it provides fields in
   438  		// the vertex in which it is defined. However, for self-projections
   439  		// this results in a cycle. By dropping the requirement that all fields
   440  		// need be specified the cycle is broken. However, this means the
   441  		// comprehension may no longer add new fields to the vertex.
   442  		//
   443  		//	x: {
   444  		//		for k, v in x {
   445  		//			(k): v
   446  		//		}
   447  		//		foo: 5
   448  		//	}
   449  		name: "self cyclic",
   450  		init: func() {
   451  			x := node(nil)
   452  			foo := node(x)
   453  			success("5", foo, c1, 0)
   454  			success("comprehension", x, c1, 0, dep{node: x, needs: c1})
   455  		},
   456  		log: `
   457  		    running task comprehension
   458  		        task comprehension waiting for v0 meeting 1: BLOCKED
   459  		    running task comprehension
   460  		    running task 5`,
   461  		state: `
   462  			v0 (SUCCESS): (frozen)
   463  			    task:    comprehension: SUCCESS (unblocked)
   464  			v1 (SUCCESS):
   465  			    task:    5: SUCCESS`,
   466  	}, {
   467  		// This test simulates a case where comprehensions are not allowed to
   468  		// project on themselves. CUE allows this, but it is to test that
   469  		// similar constructions where this is not allowed do not cause
   470  		// infinite loops.
   471  		//
   472  		//	x: {
   473  		//		for k, v in x {
   474  		//			(k+"X"): v
   475  		//		}
   476  		//		foo: 5
   477  		//	}
   478  		// TODO: override freeze.
   479  		name: "self cyclic not allowed",
   480  		init: func() {
   481  			x := node(nil)
   482  			foo := node(x)
   483  			success("5", foo, c1, 0)
   484  			success("comprehension", x, c1, 0, dep{node: x, needs: c1})
   485  		},
   486  		log: `
   487  		    running task comprehension
   488  		        task comprehension waiting for v0 meeting 1: BLOCKED
   489  		    running task comprehension
   490  		    running task 5`,
   491  		state: `
   492  			v0 (SUCCESS): (frozen)
   493  			    task:    comprehension: SUCCESS (unblocked)
   494  			v1 (SUCCESS):
   495  			    task:    5: SUCCESS`,
   496  	}, {
   497  		// This test simulates a case where comprehensions mutually project
   498  		// on each other.
   499  		//
   500  		//	x: {
   501  		//		for k, v in y {
   502  		//			(k): v
   503  		//		}
   504  		//	}
   505  		//	y: {
   506  		//		for k, v in x {
   507  		//			(k): v
   508  		//		}
   509  		//	}
   510  		name: "mutually cyclic projection",
   511  		init: func() {
   512  			v0 := node(nil)
   513  			x := node(v0)
   514  			y := node(v0)
   515  
   516  			success("comprehension", x, c1, 0, dep{node: y, needs: c1})
   517  			success("comprehension", y, c1, 0, dep{node: x, needs: c1})
   518  
   519  		},
   520  		log: `
   521  		    running task comprehension
   522  		    running task comprehension
   523  		        task comprehension waiting for v1 meeting 1: BLOCKED
   524  		        task comprehension waiting for v2 meeting 1: BLOCKED
   525  		    running task comprehension
   526  		    running task comprehension`,
   527  		state: `
   528  			v0 (SUCCESS):
   529  			v1 (SUCCESS): (frozen)
   530  			    task:    comprehension: SUCCESS (unblocked)
   531  			v2 (SUCCESS): (frozen)
   532  			    task:    comprehension: SUCCESS (unblocked)`,
   533  	}, {
   534  		// This test simulates a case where comprehensions are not allowed to
   535  		// project on each other cyclicly. CUE allows this, but it is to test
   536  		// that similar constructions where this is not allowed do not cause
   537  		// infinite loops.
   538  		//
   539  		//	x: {
   540  		//		for k, v in y {
   541  		//			(k): v
   542  		//		}
   543  		//	}
   544  		//	y: {
   545  		//		for k, v in x {
   546  		//			(k): v
   547  		//		}
   548  		//		foo: 5
   549  		//	}
   550  		name: "disallowed mutually cyclic projection",
   551  		init: func() {
   552  			v0 := node(nil)
   553  			x := node(v0)
   554  			y := node(v0)
   555  			foo := node(y)
   556  			success("5", foo, c1, 0)
   557  
   558  			success("comprehension", x, c1, 0, dep{node: y, needs: c1})
   559  			success("comprehension", y, c1, 0, dep{node: x, needs: c1})
   560  
   561  		},
   562  		log: `
   563  		    running task comprehension
   564  		    running task comprehension
   565  		        task comprehension waiting for v1 meeting 1: BLOCKED
   566  		        task comprehension waiting for v2 meeting 1: BLOCKED
   567  		    running task comprehension
   568  		    running task comprehension
   569  		    running task 5`,
   570  		state: `
   571  			v0 (SUCCESS):
   572  			v1 (SUCCESS): (frozen)
   573  			    task:    comprehension: SUCCESS (unblocked)
   574  			v2 (SUCCESS): (frozen)
   575  			    task:    comprehension: SUCCESS (unblocked)
   576  			v3 (SUCCESS):
   577  			    task:    5: SUCCESS`,
   578  	}}
   579  
   580  	cuetest.Run(t, cases, func(t *cuetest.T, tc *testCase) {
   581  		// t.Update(true)
   582  		// t.Select("non-cyclic_dependencies_between_nodes_p2")
   583  
   584  		nodeID = 0
   585  		nodes = nodes[:0]
   586  		w.Reset()
   587  
   588  		// Create and run root scheduler.
   589  		tc.init()
   590  		for _, n := range nodes {
   591  			n.provided |= auto
   592  			n.signalDoneAdding()
   593  		}
   594  		for _, n := range nodes {
   595  			n.finalize(auto)
   596  		}
   597  
   598  		t.Equal(w.String(), tc.log)
   599  
   600  		w := &strings.Builder{}
   601  		for _, n := range nodes {
   602  			fmt.Fprintf(w, "\n\t\t\tv%d (%v):", n.refCount, n.state)
   603  			if n.scheduler.isFrozen {
   604  				fmt.Fprint(w, " (frozen)")
   605  			}
   606  			for _, t := range n.tasks {
   607  				fmt.Fprintf(w, "\n\t\t\t    task:    %s: %v", t.x.(*String).Str, t.state)
   608  				if t.unblocked {
   609  					fmt.Fprint(w, " (unblocked)")
   610  				}
   611  			}
   612  			for _, t := range n.blocking {
   613  				if t.blockedOn != nil {
   614  					fmt.Fprintf(w, "\n\t\t\t    blocked: %s: %v", t.x.(*String).Str, t.state)
   615  				}
   616  			}
   617  		}
   618  
   619  		t.Equal(w.String(), tc.state)
   620  	})
   621  }
   622  

View as plain text