package engine import ( "context" "fmt" "time" "github.com/go-logr/logr" "edge-infra.dev/pkg/edge/rollouts" "edge-infra.dev/pkg/edge/rollouts/drivers" ) // The RolloutEngine struct is the primary interface to running rollouts for a given // environment. Creating a new engine and calling Run will run the engine in the background // and process any waiting rollouts. // // The Engine relies on reading from a rollouts.NodeExecutionResult channel in order // to process the rollout steps. The easiest way to run the engine is to also call // Listen which will log the results messages and progress the rollouts. type RolloutEngine struct { logger logr.Logger store drivers.RolloutStore interval time.Duration resultChan chan rollouts.NodeExecutionResult } // TODO: handler for Listen so behavior can be injected that listens on the // result channel as an option // type ListenHandler func(*RolloutEngine, chan rollouts.NodeExecutionResult) error func NewRolloutEngine(store drivers.RolloutStore, logger logr.Logger, interval time.Duration) *RolloutEngine { // TODO(dk185217): consider passing in the result channel instead, so callers can control what // they do with the results resultChan := make(chan rollouts.NodeExecutionResult) return &RolloutEngine{ logger: logger, store: store, interval: interval, resultChan: resultChan, } } // TODO(dk185217): maybe move out of engine. this is concerned with consuming the events // emitted by the engine // Default function for progressing rollout event channels func (e *RolloutEngine) Listen(ctx context.Context) error { // Infinitely pull/log from eventChan go func() { for { select { case nodeResult := <-e.resultChan: e.logger.Info(nodeResult.Message) case <-ctx.Done(): return } } }() return nil } func (e *RolloutEngine) Run(ctx context.Context) error { log := e.logger.WithName("rollout") go func(ctx context.Context, interval time.Duration) { for { // TODO(dk185217): Instead of polling, wait for input on an input channel // <-inputChan log.Info("getting rollouts") rollouts, err := e.store.GetRollouts(ctx) if err != nil { log.Error(err, "failed to get clusters matching labels") } for _, rollout := range rollouts { e.logger.Info(fmt.Sprintf("running rollout %s", rollout.ID)) instance, err := e.store.NewRolloutInstance(ctx, e.logger.WithName("rollout-instance"), rollout, e.resultChan) if err != nil { log.Error(err, fmt.Sprintf("rollout %s errored", rollout.ID)) } done, err := drivers.RunRollout(ctx, instance) if err != nil { log.Error(err, fmt.Sprintf("rollout %s errored", rollout.ID)) } if done { log.Info(fmt.Sprintf("rollout %s is done", rollout.ID)) } } select { case <-time.After(interval): continue case <-ctx.Done(): log.Info("ending continuous rollout") return } } }(ctx, e.interval) return nil }