...

Source file src/edge-infra.dev/pkg/edge/rollouts/engine/engine.go

Documentation: edge-infra.dev/pkg/edge/rollouts/engine

     1  package engine
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"time"
     7  
     8  	"github.com/go-logr/logr"
     9  
    10  	"edge-infra.dev/pkg/edge/rollouts"
    11  	"edge-infra.dev/pkg/edge/rollouts/drivers"
    12  )
    13  
    14  // The RolloutEngine struct is the primary interface to running rollouts for a given
    15  // environment. Creating a new engine and calling Run will run the engine in the background
    16  // and process any waiting rollouts.
    17  //
    18  // The Engine relies on reading from a rollouts.NodeExecutionResult channel in order
    19  // to process the rollout steps. The easiest way to run the engine is to also call
    20  // Listen which will log the results messages and progress the rollouts.
    21  type RolloutEngine struct {
    22  	logger     logr.Logger
    23  	store      drivers.RolloutStore
    24  	interval   time.Duration
    25  	resultChan chan rollouts.NodeExecutionResult
    26  }
    27  
    28  // TODO: handler for Listen so behavior can be injected that listens on the
    29  // result channel as an option
    30  // type ListenHandler func(*RolloutEngine, chan rollouts.NodeExecutionResult) error
    31  
    32  func NewRolloutEngine(store drivers.RolloutStore, logger logr.Logger, interval time.Duration) *RolloutEngine {
    33  	// TODO(dk185217): consider passing in the result channel instead, so callers can control what
    34  	// they do with the results
    35  	resultChan := make(chan rollouts.NodeExecutionResult)
    36  	return &RolloutEngine{
    37  		logger:     logger,
    38  		store:      store,
    39  		interval:   interval,
    40  		resultChan: resultChan,
    41  	}
    42  }
    43  
    44  // TODO(dk185217): maybe move out of engine. this is concerned with consuming the events
    45  // emitted by the engine
    46  // Default function for progressing rollout event channels
    47  func (e *RolloutEngine) Listen(ctx context.Context) error {
    48  	// Infinitely pull/log from eventChan
    49  	go func() {
    50  		for {
    51  			select {
    52  			case nodeResult := <-e.resultChan:
    53  				e.logger.Info(nodeResult.Message)
    54  			case <-ctx.Done():
    55  				return
    56  			}
    57  		}
    58  	}()
    59  	return nil
    60  }
    61  
    62  func (e *RolloutEngine) Run(ctx context.Context) error {
    63  	log := e.logger.WithName("rollout")
    64  	go func(ctx context.Context, interval time.Duration) {
    65  		for {
    66  			// TODO(dk185217): Instead of polling, wait for input on an input channel
    67  			// <-inputChan
    68  			log.Info("getting rollouts")
    69  			rollouts, err := e.store.GetRollouts(ctx)
    70  			if err != nil {
    71  				log.Error(err, "failed to get clusters matching labels")
    72  			}
    73  			for _, rollout := range rollouts {
    74  				e.logger.Info(fmt.Sprintf("running rollout %s", rollout.ID))
    75  				instance, err := e.store.NewRolloutInstance(ctx, e.logger.WithName("rollout-instance"), rollout, e.resultChan)
    76  				if err != nil {
    77  					log.Error(err, fmt.Sprintf("rollout %s errored", rollout.ID))
    78  				}
    79  				done, err := drivers.RunRollout(ctx, instance)
    80  				if err != nil {
    81  					log.Error(err, fmt.Sprintf("rollout %s errored", rollout.ID))
    82  				}
    83  				if done {
    84  					log.Info(fmt.Sprintf("rollout %s is done", rollout.ID))
    85  				}
    86  			}
    87  			select {
    88  			case <-time.After(interval):
    89  				continue
    90  			case <-ctx.Done():
    91  				log.Info("ending continuous rollout")
    92  				return
    93  			}
    94  		}
    95  	}(ctx, e.interval)
    96  	return nil
    97  }
    98  

View as plain text