package engine

import (
	"context"
	"fmt"
	"strings"
	"testing"
	"time"

	"github.com/go-logr/logr"
	"github.com/go-logr/logr/funcr"
	"gotest.tools/v3/assert"
	"gotest.tools/v3/poll"

	"edge-infra.dev/pkg/edge/rollouts"
	"edge-infra.dev/pkg/edge/rollouts/drivers"
	"edge-infra.dev/pkg/edge/rollouts/internal"
)

// newStdoutLogger returns a funcr-backed logr.Logger that writes every log
// line to stdout, prefixed with "<prefix>: " when the prefix is non-empty.
func newStdoutLogger() logr.Logger {
	write := func(prefix, args string) {
		if prefix == "" {
			fmt.Println(args)
			return
		}
		fmt.Printf("%s: %s\n", prefix, args)
	}
	return funcr.New(write, funcr.Options{})
}

// TestNewEngine checks that a rollout engine can be constructed on top of the
// example in-memory store.
func TestNewEngine(t *testing.T) {
	store := drivers.NewInMemoryRolloutStore(internal.NewExampleInMemStore())
	assert.Assert(t, store != nil)

	logger := newStdoutLogger()
	engine := NewRolloutEngine(store, logger, time.Millisecond*10)
	assert.Assert(t, engine != nil)
}

// TestEngineRun calls Run and Listen on an engine backed by the example
// in-memory store, then updates the store step by step and polls the rollout's
// nodes until every one of them reports rollouts.Complete.
func TestEngineRun(t *testing.T) {
	pollDelay := 5 * time.Millisecond
	pollTimeout := 50 * time.Millisecond

	inMemStore := internal.NewExampleInMemStore()
	store := drivers.NewInMemoryRolloutStore(inMemStore)
	assert.Assert(t, store != nil)

	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	logger := newStdoutLogger()
	engine := NewRolloutEngine(store, logger, time.Millisecond*10)
	assert.Assert(t, engine != nil)

	// waitOn polls check using the delay and timeout shared by every step of
	// this test.
	waitOn := func(check poll.Check) {
		t.Helper()
		poll.WaitOn(t, check, poll.WithDelay(pollDelay), poll.WithTimeout(pollTimeout))
	}

	// get the initial graph state
	// NOTE(review): the checks below read testRollout.Nodes while the engine is
	// running; this presumably relies on the store handing out shared node
	// values - confirm the access is race-free under `go test -race`.
	testRollout, err := store.GetRollout(ctx, internal.ExampleRolloutID)
	assert.NilError(t, err)

	// start engine in background which will modify the state concurrently
	assert.NilError(t, engine.Run(ctx))

	// listen to progress rollout via channel by default
	assert.NilError(t, engine.Listen(ctx))

	// check that the first "blocking" state is reached - node will remain in
	// progress until something asynchronously updates the underlying state
	waitOn(func(_ poll.LogT) poll.Result {
		tg1 := testRollout.Nodes[rollouts.NodeKey("tg1")]
		if tg1.GetState() == rollouts.InProgress {
			fmt.Println("reached tg1Check success")
			return poll.Success()
		}
		return poll.Continue("waiting on tg1 to be InProgress. current state: %s", tg1.GetState())
	})

	// set tg1 "dev" cluster versions to Ready
	devClusterIDs, err := inMemStore.GetClusterLabelMatches("dev")
	assert.NilError(t, err)
	inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, devClusterIDs)

	waitOn(func(_ poll.LogT) poll.Result {
		tg1 := testRollout.Nodes[rollouts.NodeKey("tg1")]
		if tg1.GetState() == rollouts.Complete {
			fmt.Println("reached tg1CompleteCheck success")
			return poll.Success()
		}
		return poll.Continue("waiting on tg1 to be %s. current state: %s", rollouts.Complete, tg1.GetState())
	})

	// set tg2 and 3 "staging:east" + "staging:west" to Ready
	stageEastClusterIDs, err := inMemStore.GetClusterLabelMatches("staging:east")
	assert.NilError(t, err)
	stageWestClusterIDs, err := inMemStore.GetClusterLabelMatches("staging:west")
	assert.NilError(t, err)
	inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, stageEastClusterIDs)
	inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, stageWestClusterIDs)

	waitOn(func(_ poll.LogT) poll.Result {
		tg2 := testRollout.Nodes[rollouts.NodeKey("tg2")]
		tg3 := testRollout.Nodes[rollouts.NodeKey("tg3")]
		if tg2.GetState() == rollouts.Complete && tg3.GetState() == rollouts.Complete {
			fmt.Println("reached tg2And3CompleteCheck success")
			return poll.Success()
		}
		return poll.Continue("waiting on tg2 and tg3 to be %s. current state: tg2: %s tg3: %s", rollouts.Complete, tg2.GetState(), tg3.GetState())
	})

	// check for approval gate in pending
	waitOn(func(_ poll.LogT) poll.Result {
		ag1 := testRollout.Nodes[rollouts.NodeKey("ag1")]
		if ag1.GetState() == rollouts.Pending {
			fmt.Println("reached ag1PendingCheck success")
			return poll.Success()
		}
		return poll.Continue("waiting on ag1 to be %s. current state: %s", rollouts.Pending, ag1.GetState())
	})

	// open the approval gate, then wait for ag1 to complete; the checked
	// assertion fails the test instead of panicking if ag1 has another type
	ag1, ok := testRollout.Nodes[rollouts.NodeKey("ag1")].(*rollouts.ApprovalGate)
	assert.Assert(t, ok, "expected node ag1 to be a *rollouts.ApprovalGate")
	err = inMemStore.OpenApprovalGate(ag1.GetKey())
	assert.NilError(t, err)

	waitOn(func(_ poll.LogT) poll.Result {
		if ag1.GetState() == rollouts.Complete {
			fmt.Println("reached ag1CompleteCheck success")
			return poll.Success()
		}
		return poll.Continue("waiting on ag1 to be %s. current state: %s", rollouts.Complete, ag1.GetState())
	})

	// check for tg4 in pending
	waitOn(func(_ poll.LogT) poll.Result {
		tg4 := testRollout.Nodes[rollouts.NodeKey("tg4")]
		if tg4.GetState() == rollouts.Pending {
			fmt.Println("reached tg4PendingCheck success")
			return poll.Success()
		}
		return poll.Continue("waiting on tg4 to be %s. current state: %s", rollouts.Pending, tg4.GetState())
	})

	// mark tg4 ready
	prodClusterIDs, err := inMemStore.GetClusterLabelMatches("prod:us")
	assert.NilError(t, err)
	inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, prodClusterIDs)

	waitOn(func(_ poll.LogT) poll.Result {
		tg4 := testRollout.Nodes[rollouts.NodeKey("tg4")]
		if tg4.GetState() == rollouts.Complete {
			fmt.Println("reached tg4CompleteCheck success")
			return poll.Success()
		}
		return poll.Continue("waiting on tg4 to be %s. current state: %s", rollouts.Complete, tg4.GetState())
	})

	// assert final graph state once the rollout is complete
	waitOn(func(_ poll.LogT) poll.Result {
		var incompleteNodes []string
		for key, node := range testRollout.Nodes {
			if node.GetState() != rollouts.Complete {
				incompleteNodes = append(incompleteNodes, string(key))
			}
		}
		if len(incompleteNodes) > 0 {
			return poll.Continue("waiting on nodes [%s] to complete", strings.Join(incompleteNodes, ","))
		}
		// no incomplete node means all nodes have NodeState == complete
		return poll.Success()
	})

	// cancel the background engine.Run routine
	cancel(nil)
	<-ctx.Done()
}

// engineChanConditions maps a node key to the node state the test waits for
// and the store mutation to apply once that state is reported. It is passed to
// internal.ModifyStore by TestEngineChanRun.
//
// The Action signature has no error return, so lookup and gate errors are
// deliberately discarded here. NOTE(review): a failed action would presumably
// leave the rollout stuck and surface as the TestEngineChanRun timeout.
var engineChanConditions = internal.ConditionMap{
	"tg1": {
		NodeState: rollouts.InProgress,
		Action: func(inMemStore *internal.InMemStore, _ rollouts.NodeExecutionResult) {
			clusterIDs, _ := inMemStore.GetClusterLabelMatches("dev")
			inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, clusterIDs)
		}},
	"g1": {
		NodeState: rollouts.Complete,
		// nothing to change in the store for this node
		Action: func(_ *internal.InMemStore, _ rollouts.NodeExecutionResult) {
		}},
	"tg2": {
		NodeState: rollouts.InProgress,
		Action: func(inMemStore *internal.InMemStore, _ rollouts.NodeExecutionResult) {
			clusterIDs, _ := inMemStore.GetClusterLabelMatches("staging:east")
			inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, clusterIDs)
		}},
	"tg3": {
		NodeState: rollouts.InProgress,
		Action: func(inMemStore *internal.InMemStore, _ rollouts.NodeExecutionResult) {
			clusterIDs, _ := inMemStore.GetClusterLabelMatches("staging:west")
			inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, clusterIDs)
		}},
	"ag1": {
		NodeState: rollouts.Pending,
		Action: func(inMemStore *internal.InMemStore, result rollouts.NodeExecutionResult) {
			_ = inMemStore.OpenApprovalGate(result.Key)
		}},
	"tg4": {
		NodeState: rollouts.InProgress,
		Action: func(inMemStore *internal.InMemStore, _ rollouts.NodeExecutionResult) {
			clusterIDs, _ := inMemStore.GetClusterLabelMatches("prod:us")
			inMemStore.SetClusterArtifactReady(internal.StoreArtifactName, clusterIDs)
		}},
}

// TestEngineChanRun runs the engine and consumes engine.resultChan directly,
// applying engineChanConditions to the store for each result until a result
// with rollouts.RolloutComplete arrives. The test fails if that does not
// happen within two seconds.
func TestEngineChanRun(t *testing.T) {
	inMemStore := internal.NewExampleInMemStore()
	store := drivers.NewInMemoryRolloutStore(inMemStore)
	assert.Assert(t, store != nil)

	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	logger := newStdoutLogger()
	engine := NewRolloutEngine(store, logger, time.Millisecond*10)
	assert.Assert(t, engine != nil)

	if err := engine.Run(ctx); err != nil {
		t.Fatal(err)
	}

	timeout := time.NewTimer(time.Second * 2)
	// release the timer when the test returns before it fires
	defer timeout.Stop()

testloop:
	for {
		fmt.Println("in for")
		select {
		case nodeResult := <-engine.resultChan:
			fmt.Println("selecting engine event")
			switch nodeResult.RolloutState {
			case rollouts.RolloutComplete:
				t.Log("done")
				cancel(nil)
				<-ctx.Done()
				break testloop
			default:
				fmt.Println("received from event chan")
				fmt.Println(nodeResult.Message)
				fmt.Println("running event check")
				if err := internal.ModifyStore(inMemStore, engineChanConditions, nodeResult); err != nil {
					t.Fatal(err)
				}
			}
		case <-timeout.C:
			fmt.Println("timeout")
			cancel(nil)
			<-ctx.Done()
			t.Fatal("test timed out")
		}
	}
}

// TestEngineRunListen checks that Run and Listen both return without error,
// lets the engine run for one second with no store changes, and then cancels
// the context.
func TestEngineRunListen(t *testing.T) {
	inMemStore := internal.NewExampleInMemStore()
	store := drivers.NewInMemoryRolloutStore(inMemStore)
	assert.Assert(t, store != nil)

	ctx, cancel := context.WithCancelCause(context.Background())
	defer cancel(nil)

	logger := newStdoutLogger()
	engine := NewRolloutEngine(store, logger, time.Millisecond*10)
	assert.Assert(t, engine != nil)

	if err := engine.Run(ctx); err != nil {
		t.Fatal(err)
	}
	if err := engine.Listen(ctx); err != nil {
		t.Fatal(err)
	}

	// There is nothing to progress the external state of the rollout, so
	// it will just run without error and stay at the first target group.
	// Both start-up errors were already checked above, so nothing is left to
	// assert after the wait.
	<-time.After(time.Second * 1)
	cancel(nil)
}