...
1 package engine
2
3 import (
4 "context"
5 "fmt"
6 "time"
7
8 "github.com/go-logr/logr"
9
10 "edge-infra.dev/pkg/edge/rollouts"
11 "edge-infra.dev/pkg/edge/rollouts/drivers"
12 )
13
14
15
16
17
18
19
20
21 type RolloutEngine struct {
22 logger logr.Logger
23 store drivers.RolloutStore
24 interval time.Duration
25 resultChan chan rollouts.NodeExecutionResult
26 }
27
28
29
30
31
32 func NewRolloutEngine(store drivers.RolloutStore, logger logr.Logger, interval time.Duration) *RolloutEngine {
33
34
35 resultChan := make(chan rollouts.NodeExecutionResult)
36 return &RolloutEngine{
37 logger: logger,
38 store: store,
39 interval: interval,
40 resultChan: resultChan,
41 }
42 }
43
44
45
46
47 func (e *RolloutEngine) Listen(ctx context.Context) error {
48
49 go func() {
50 for {
51 select {
52 case nodeResult := <-e.resultChan:
53 e.logger.Info(nodeResult.Message)
54 case <-ctx.Done():
55 return
56 }
57 }
58 }()
59 return nil
60 }
61
62 func (e *RolloutEngine) Run(ctx context.Context) error {
63 log := e.logger.WithName("rollout")
64 go func(ctx context.Context, interval time.Duration) {
65 for {
66
67
68 log.Info("getting rollouts")
69 rollouts, err := e.store.GetRollouts(ctx)
70 if err != nil {
71 log.Error(err, "failed to get clusters matching labels")
72 }
73 for _, rollout := range rollouts {
74 e.logger.Info(fmt.Sprintf("running rollout %s", rollout.ID))
75 instance, err := e.store.NewRolloutInstance(ctx, e.logger.WithName("rollout-instance"), rollout, e.resultChan)
76 if err != nil {
77 log.Error(err, fmt.Sprintf("rollout %s errored", rollout.ID))
78 }
79 done, err := drivers.RunRollout(ctx, instance)
80 if err != nil {
81 log.Error(err, fmt.Sprintf("rollout %s errored", rollout.ID))
82 }
83 if done {
84 log.Info(fmt.Sprintf("rollout %s is done", rollout.ID))
85 }
86 }
87 select {
88 case <-time.After(interval):
89 continue
90 case <-ctx.Done():
91 log.Info("ending continuous rollout")
92 return
93 }
94 }
95 }(ctx, e.interval)
96 return nil
97 }
98
View as plain text