package drivers

import (
	"context"
	"fmt"
	"strings"

	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/edge/rollouts"
)

// RolloutStore abstracts the backend that holds rollout graphs and rollout
// plans. Going by its method set, it supports fetching graphs and plans by ID,
// listing them (overall and per banner), opening approval gates, and building
// a RolloutInstance that RunRollout can drive. The precise semantics of each
// method are defined by the implementations.
type RolloutStore interface {
	GetRollout(ctx context.Context, rolloutID string) (*rollouts.RolloutGraph, error)
	GetRollouts(ctx context.Context) ([]*rollouts.RolloutGraph, error)
	GetBannerRollouts(ctx context.Context, bannerID string) ([]*rollouts.RolloutGraph, error)

	GetRolloutPlan(ctx context.Context, planID string) (rollouts.RolloutPlan, error)
	GetRolloutPlans(ctx context.Context) ([]rollouts.RolloutPlan, error)
	GetBannerRolloutPlans(ctx context.Context, bannerID string) ([]rollouts.RolloutPlan, error)

	OpenApprovalGate(ctx context.Context, rolloutID string, nodeKey rollouts.NodeKey) error

	// NewRolloutInstance builds a RolloutInstance for rollout. notifyChan is
	// presumably stored as the instance's NotifyChan (confirm in each
	// implementation), which is where RunRollout publishes results.
	NewRolloutInstance(ctx context.Context, logger logr.Logger, rollout *rollouts.RolloutGraph, notifyChan chan rollouts.NodeExecutionResult) (*RolloutInstance, error)
}

// RolloutInstance binds a rollout graph, reached through its driver, to a
// logger and to the channel on which RunRollout publishes execution results.
type RolloutInstance struct {
	driver rolloutDriver
	logger logr.Logger

	// NotifyChan receives, for every RunRollout call, one result per node
	// executed in that step, plus a final RolloutComplete result once the
	// graph is finished. Sends block until the channel accepts the value, so
	// a consumer must keep draining it.
	NotifyChan chan rollouts.NodeExecutionResult
}

// sendEvent publishes event on ri.NotifyChan. It blocks until the channel
// accepts the value and does not observe any context.
func (ri *RolloutInstance) sendEvent(event rollouts.NodeExecutionResult) {
	ri.NotifyChan <- event
}

// rolloutDriver is the backend-specific engine (for example an in-memory or
// a SQL-backed driver) that RunRollout and processStep delegate to.
type rolloutDriver interface {
	// getRollout returns the graph being driven. processStep mutates the
	// returned graph in place (Current, NodeExecutionResults) and relies on
	// later getRollout calls observing those writes.
	getRollout() *rollouts.RolloutGraph
	execute(ctx context.Context, node rollouts.RolloutGraphNode) (rollouts.NodeExecutionResult, error)
	executeTargetGroup(ctx context.Context, tg *rollouts.TargetGroup) (rollouts.NodeExecutionResult, error)
	checkTimerGate(ctx context.Context, tg *rollouts.TimerGate) (rollouts.NodeExecutionResult, error)
	checkApprovalGate(ctx context.Context, ag *rollouts.ApprovalGate) (rollouts.NodeExecutionResult, error)
	// persistGraph stores the current graph state. RunRollout calls it after
	// every successful step.
	persistGraph(ctx context.Context) error
}

// RunRollout advances the rollout held by ri by a single step.
//
// It executes every current node whose dependencies are complete, persists
// the graph, and publishes each node result on ri.NotifyChan with
// RolloutState set to RolloutInProgress. When the step leaves no current
// nodes, it additionally publishes a RolloutComplete result and returns true.
// Callers presumably invoke it repeatedly until it reports done.
//
// Otherwise it returns false. It returns an error if node execution or
// persistence fails, or if a current node is InProgress while one of its
// dependencies is not Complete.
func RunRollout(ctx context.Context, ri *RolloutInstance) (bool, error) {
	done, results, err := processStep(ctx, ri)
	if err != nil {
		return false, err
	}

	// Persist after every step so stored state tracks progress. Per the
	// original note, the in-memory driver does nothing here while a SQL driver
	// updates the graph's jsonb column.
	if err := ri.driver.persistGraph(ctx); err != nil {
		return false, fmt.Errorf("persisting rollout %s: %w", ri.driver.getRollout().ID, err)
	}

	// Publish this step's node results before any completion event, so the
	// results of the final step are not silently dropped.
	for _, result := range results {
		result.RolloutState = rollouts.RolloutInProgress
		ri.sendEvent(result)
	}

	if done {
		ri.sendEvent(rollouts.NodeExecutionResult{
			Message:      fmt.Sprintf("rollout %s complete", ri.driver.getRollout().ID),
			RolloutState: rollouts.RolloutComplete,
		})
		return true, nil
	}

	// Invariant: a node must not be in progress while any of its dependencies
	// is still unfinished.
	for _, currKey := range ri.driver.getRollout().Current {
		curr := ri.driver.getRollout().Nodes[currKey]
		if curr.GetState() != rollouts.InProgress {
			continue
		}
		for _, dep := range curr.GetDependsOn() {
			if dep.GetState() != rollouts.Complete {
				return false, fmt.Errorf("node %s was started without finishing its deps first. incomplete dep: %s", curr.GetKey(), dep.GetKey())
			}
		}
	}
	return false, nil
}

// processStep executes, once each, the current nodes whose dependencies are
// all Complete, then rewrites the graph's Current list.
//
// Nodes whose result is not Done stay current. Successors of Done nodes are
// added once each. The new Current list keeps first-insertion order, so runs
// are deterministic.
//
// It returns true when Current ends up empty, together with the results of
// every node executed in this step. On an execute error, it returns
// immediately without updating Current.
func processStep(ctx context.Context, ri *RolloutInstance) (bool, []rollouts.NodeExecutionResult, error) {
	// nextNodes de-duplicates candidates, since a node can be reached over
	// several incoming edges. nextOrder records first-insertion order so
	// Current does not inherit Go's randomized map iteration order. It starts
	// non-nil so an empty Current stays an empty (not nil) slice.
	nextNodes := map[rollouts.NodeKey]rollouts.RolloutGraphNode{}
	nextOrder := []rollouts.NodeKey{}
	results := []rollouts.NodeExecutionResult{}

	ri.logger.Info("processing step", "rollout", ri.driver.getRollout().ID)

	// A fan-out graph can have several current nodes; they run serially.
	// TODO: phase 2+ parallelism.
	for _, currKey := range ri.driver.getRollout().Current {
		curr := ri.driver.getRollout().Nodes[currKey]

		// Nodes track their own state (pending, in-progress, complete); an
		// alternative would be deriving it from NodeExecutionResults. A node
		// with an unfinished dependency is skipped and dropped from Current.
		// It is presumably re-queued as a successor once that dependency
		// finishes (assumes GetNext and GetDependsOn mirror each other; confirm).
		depsDone := true
		for _, dep := range curr.GetDependsOn() {
			if dep.GetState() != rollouts.Complete {
				depsDone = false
				break
			}
		}
		if !depsDone {
			continue
		}

		result, err := ri.driver.execute(ctx, curr)
		if err != nil {
			// TODO: make sure the caller can roll back (e.g. transactions).
			return false, nil, fmt.Errorf("executing node %s: %w", curr.GetKey(), err)
		}
		results = append(results, result)

		// Record the result against the node regardless of whether it is Done.
		ri.driver.getRollout().NodeExecutionResults[curr.GetKey()] = result

		if !result.Done() {
			// Not finished yet: keep the node current so it is retried next step.
			key := curr.GetKey()
			if _, seen := nextNodes[key]; !seen {
				nextOrder = append(nextOrder, key)
			}
			nextNodes[key] = curr
			continue
		}

		// Finished: queue each successor exactly once.
		for _, maybeNext := range curr.GetNext() {
			key := maybeNext.GetKey()
			if _, seen := nextNodes[key]; seen {
				continue
			}
			ri.logger.Info("first time seeing node, adding to next", "node", key)
			nextNodes[key] = maybeNext
			nextOrder = append(nextOrder, key)
		}
	}

	ri.driver.getRollout().Current = nextOrder

	// No nodes left to process: the rollout is complete.
	// TODO: consider removing the overall "Complete"ness state and tracking it
	// in the db / outside the graph; callers should use this return value.
	if len(nextOrder) == 0 {
		return true, results, nil
	}

	labels := make([]string, 0, len(nextOrder))
	for _, key := range nextOrder {
		n := nextNodes[key]
		labels = append(labels, fmt.Sprintf("%s (%s)", n.GetLabel(), n.GetKey()))
	}
	// TODO: this still logs when the new Current equals the previous one; only log on change?
	ri.logger.Info("done processing step", "current", "["+strings.Join(labels, ",")+"]")

	return false, results, nil
}