...

Source file src/github.com/datawire/ambassador/v2/pkg/agent/directive_handler.go

Documentation: github.com/datawire/ambassador/v2/pkg/agent

     1  package agent
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	agentapi "github.com/datawire/ambassador/v2/pkg/api/agent"
     8  	"github.com/datawire/dlib/dlog"
     9  )
    10  
    11  type DirectiveHandler interface {
    12  	HandleDirective(context.Context, *Agent, *agentapi.Directive)
    13  }
    14  
    15  type BasicDirectiveHandler struct {
    16  	DefaultMinReportPeriod time.Duration
    17  	rolloutsGetterFactory  rolloutsGetterFactory
    18  }
    19  
    20  func (dh *BasicDirectiveHandler) HandleDirective(ctx context.Context, a *Agent, directive *agentapi.Directive) {
    21  	if directive == nil {
    22  		dlog.Warn(ctx, "Received empty directive, ignoring.")
    23  		return
    24  	}
    25  	ctx = dlog.WithField(ctx, "directive", directive.ID)
    26  
    27  	dlog.Debug(ctx, "Directive received")
    28  
    29  	if directive.StopReporting {
    30  		// The Director wants us to stop reporting
    31  		a.StopReporting(ctx)
    32  	}
    33  
    34  	if directive.MinReportPeriod != nil {
    35  		// The Director wants to adjust the minimum time we wait between reports
    36  		protoDur := directive.MinReportPeriod
    37  		// Note: This conversion ignores potential overflow. In practice this
    38  		// shouldn't be a problem, as the server will be constructing this
    39  		// durationpb.Duration from a valid time.Duration.
    40  		dur := time.Duration(protoDur.Seconds)*time.Second + time.Duration(protoDur.Nanos)*time.Nanosecond
    41  		dur = MaxDuration(dur, dh.DefaultMinReportPeriod) // respect configured minimum
    42  		a.SetMinReportPeriod(ctx, dur)
    43  	}
    44  
    45  	for _, command := range directive.Commands {
    46  		if command.Message != "" {
    47  			dlog.Info(ctx, command.Message)
    48  		}
    49  
    50  		if command.RolloutCommand != nil {
    51  			dh.handleRolloutCommand(ctx, command.RolloutCommand, a)
    52  		}
    53  	}
    54  
    55  	a.SetLastDirectiveID(ctx, directive.ID)
    56  }
    57  
    58  func (dh *BasicDirectiveHandler) handleRolloutCommand(ctx context.Context, cmdSchema *agentapi.RolloutCommand, a *Agent) {
    59  	if dh.rolloutsGetterFactory == nil {
    60  		dlog.Warn(ctx, "Received rollout command but does not know how to talk to Argo Rollouts.")
    61  		return
    62  	}
    63  
    64  	rolloutName := cmdSchema.GetName()
    65  	namespace := cmdSchema.GetNamespace()
    66  	action := int32(cmdSchema.GetAction())
    67  	commandID := cmdSchema.GetCommandId()
    68  
    69  	if rolloutName == "" {
    70  		dlog.Warn(ctx, "Rollout command received without a rollout name.")
    71  		return
    72  	}
    73  
    74  	if namespace == "" {
    75  		dlog.Warn(ctx, "Rollout command received without a namespace.")
    76  		return
    77  	}
    78  
    79  	if commandID == "" {
    80  		dlog.Warn(ctx, "Rollout command received without a command ID.")
    81  		return
    82  	}
    83  
    84  	cmd := &rolloutCommand{
    85  		rolloutName: rolloutName,
    86  		namespace:   namespace,
    87  		action:      rolloutAction(agentapi.RolloutCommand_Action_name[action]),
    88  	}
    89  	err := cmd.RunWithClientFactory(ctx, dh.rolloutsGetterFactory)
    90  	if err != nil {
    91  		dlog.Errorf(ctx, "error running rollout command %s: %s", cmd, err)
    92  	}
    93  	dh.reportCommandResult(ctx, commandID, cmd, err, a)
    94  }
    95  
    96  func (dh *BasicDirectiveHandler) reportCommandResult(ctx context.Context, commandID string, cmd *rolloutCommand, cmdError error, a *Agent) {
    97  	result := &agentapi.CommandResult{CommandId: commandID, Success: true}
    98  	if cmdError != nil {
    99  		result.Success = false
   100  		result.Message = cmdError.Error()
   101  	}
   102  	a.ambassadorAPIKeyMutex.Lock()
   103  	apiKey := a.ambassadorAPIKey
   104  	a.ambassadorAPIKeyMutex.Unlock()
   105  	err := a.comm.ReportCommandResult(ctx, result, apiKey)
   106  	if err != nil {
   107  		dlog.Errorf(ctx, "error reporting result of rollout command %s: %s", cmd, err)
   108  	}
   109  }
   110  

View as plain text