...
1 package agent
2
3 import (
4 "context"
5 "time"
6
7 agentapi "github.com/datawire/ambassador/v2/pkg/api/agent"
8 "github.com/datawire/dlib/dlog"
9 )
10
11 type DirectiveHandler interface {
12 HandleDirective(context.Context, *Agent, *agentapi.Directive)
13 }
14
15 type BasicDirectiveHandler struct {
16 DefaultMinReportPeriod time.Duration
17 rolloutsGetterFactory rolloutsGetterFactory
18 }
19
20 func (dh *BasicDirectiveHandler) HandleDirective(ctx context.Context, a *Agent, directive *agentapi.Directive) {
21 if directive == nil {
22 dlog.Warn(ctx, "Received empty directive, ignoring.")
23 return
24 }
25 ctx = dlog.WithField(ctx, "directive", directive.ID)
26
27 dlog.Debug(ctx, "Directive received")
28
29 if directive.StopReporting {
30
31 a.StopReporting(ctx)
32 }
33
34 if directive.MinReportPeriod != nil {
35
36 protoDur := directive.MinReportPeriod
37
38
39
40 dur := time.Duration(protoDur.Seconds)*time.Second + time.Duration(protoDur.Nanos)*time.Nanosecond
41 dur = MaxDuration(dur, dh.DefaultMinReportPeriod)
42 a.SetMinReportPeriod(ctx, dur)
43 }
44
45 for _, command := range directive.Commands {
46 if command.Message != "" {
47 dlog.Info(ctx, command.Message)
48 }
49
50 if command.RolloutCommand != nil {
51 dh.handleRolloutCommand(ctx, command.RolloutCommand, a)
52 }
53 }
54
55 a.SetLastDirectiveID(ctx, directive.ID)
56 }
57
58 func (dh *BasicDirectiveHandler) handleRolloutCommand(ctx context.Context, cmdSchema *agentapi.RolloutCommand, a *Agent) {
59 if dh.rolloutsGetterFactory == nil {
60 dlog.Warn(ctx, "Received rollout command but does not know how to talk to Argo Rollouts.")
61 return
62 }
63
64 rolloutName := cmdSchema.GetName()
65 namespace := cmdSchema.GetNamespace()
66 action := int32(cmdSchema.GetAction())
67 commandID := cmdSchema.GetCommandId()
68
69 if rolloutName == "" {
70 dlog.Warn(ctx, "Rollout command received without a rollout name.")
71 return
72 }
73
74 if namespace == "" {
75 dlog.Warn(ctx, "Rollout command received without a namespace.")
76 return
77 }
78
79 if commandID == "" {
80 dlog.Warn(ctx, "Rollout command received without a command ID.")
81 return
82 }
83
84 cmd := &rolloutCommand{
85 rolloutName: rolloutName,
86 namespace: namespace,
87 action: rolloutAction(agentapi.RolloutCommand_Action_name[action]),
88 }
89 err := cmd.RunWithClientFactory(ctx, dh.rolloutsGetterFactory)
90 if err != nil {
91 dlog.Errorf(ctx, "error running rollout command %s: %s", cmd, err)
92 }
93 dh.reportCommandResult(ctx, commandID, cmd, err, a)
94 }
95
96 func (dh *BasicDirectiveHandler) reportCommandResult(ctx context.Context, commandID string, cmd *rolloutCommand, cmdError error, a *Agent) {
97 result := &agentapi.CommandResult{CommandId: commandID, Success: true}
98 if cmdError != nil {
99 result.Success = false
100 result.Message = cmdError.Error()
101 }
102 a.ambassadorAPIKeyMutex.Lock()
103 apiKey := a.ambassadorAPIKey
104 a.ambassadorAPIKeyMutex.Unlock()
105 err := a.comm.ReportCommandResult(ctx, result, apiKey)
106 if err != nil {
107 dlog.Errorf(ctx, "error reporting result of rollout command %s: %s", cmd, err)
108 }
109 }
110
View as plain text