...

Source file src/github.com/datawire/ambassador/v2/pkg/agent/rollouts.go

Documentation: github.com/datawire/ambassador/v2/pkg/agent

     1  package agent
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  
     7  	argov1alpha1 "github.com/argoproj/argo-rollouts/pkg/client/clientset/versioned/typed/rollouts/v1alpha1"
     8  	"github.com/datawire/dlib/dlog"
     9  	k8serrors "k8s.io/apimachinery/pkg/api/errors"
    10  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    11  	"k8s.io/apimachinery/pkg/types"
    12  	"k8s.io/client-go/rest"
    13  	"k8s.io/client-go/tools/clientcmd"
    14  )
    15  
    16  // rolloutAction indicates the action to be performed on a Rollout object
    17  type rolloutAction string
    18  
    19  const (
    20  	// rolloutActionPause represents the "pause" action on a Rollout
    21  	rolloutActionPause = rolloutAction("PAUSE")
    22  	// rolloutActionResume represents the "resume" action on a Rollout
    23  	rolloutActionResume = rolloutAction("RESUME")
    24  	// rolloutActionAbort represents the "abort" action on a Rollout
    25  	rolloutActionAbort = rolloutAction("ABORT")
    26  )
    27  
    28  // rolloutsGetterFactory is a factory for creating RolloutsGetter.
    29  type rolloutsGetterFactory func() (argov1alpha1.RolloutsGetter, error)
    30  
    31  // rolloutCommand holds a reference to a Rollout command to be ran.
    32  type rolloutCommand struct {
    33  	namespace   string
    34  	rolloutName string
    35  	action      rolloutAction
    36  }
    37  
    38  func (r *rolloutCommand) String() string {
    39  	return fmt.Sprintf("<rollout=%s namespace=%s action=%s>", r.rolloutName, r.namespace, r.action)
    40  }
    41  
    42  // RunWithClientFactory runs the given Rollout command using rolloutsClientFactory to get a RolloutsGetter.
    43  func (r *rolloutCommand) RunWithClientFactory(ctx context.Context, rolloutsClientFactory rolloutsGetterFactory) error {
    44  	client, err := rolloutsClientFactory()
    45  	if err != nil {
    46  		return err
    47  	}
    48  	return r.patchRollout(ctx, client)
    49  }
    50  
    51  const unpausePatch = `{"spec":{"paused":false}}`
    52  const abortPatch = `{"status":{"abort":true}}`
    53  const retryPatch = `{"status":{"abort":false}}`
    54  const pausePatch = `{"spec":{"paused":true}}`
    55  
    56  func (r *rolloutCommand) patchRollout(ctx context.Context, client argov1alpha1.RolloutsGetter) error {
    57  	var err error
    58  	switch r.action {
    59  	// The "Resume" action in the DCP should be able to recover from Rollout that is either paused or aborted.
    60  	// For more information about the need for rolloutCommand.applyRetryPatch to apply the "retry" patch, please check its godoc.
    61  	case rolloutActionResume:
    62  		err = r.applyPatch(ctx, client, unpausePatch)
    63  		if err == nil {
    64  			err = r.applyStatusPatch(ctx, client, retryPatch)
    65  		}
    66  	case rolloutActionAbort:
    67  		err = r.applyStatusPatch(ctx, client, abortPatch)
    68  	case rolloutActionPause:
    69  		err = r.applyPatch(ctx, client, pausePatch)
    70  	default:
    71  		err := fmt.Errorf(
    72  			"tried to perform unknown action '%s' on rollout %s (%s)",
    73  			r.action,
    74  			r.rolloutName,
    75  			r.namespace,
    76  		)
    77  		dlog.Errorln(ctx, err)
    78  		return err
    79  	}
    80  	if err != nil {
    81  		errMsg := fmt.Errorf(
    82  			"failed to %s rollout %s (%s): %w",
    83  			r.action,
    84  			r.rolloutName,
    85  			r.namespace,
    86  			err,
    87  		)
    88  		dlog.Errorln(ctx, errMsg)
    89  		return err
    90  	}
    91  	return nil
    92  }
    93  
    94  func (r *rolloutCommand) applyPatch(ctx context.Context, client argov1alpha1.RolloutsGetter, patch string) error {
    95  	rollout := client.Rollouts(r.namespace)
    96  	_, err := rollout.Patch(
    97  		ctx,
    98  		r.rolloutName,
    99  		types.MergePatchType,
   100  		[]byte(patch),
   101  		metav1.PatchOptions{},
   102  	)
   103  	return err
   104  }
   105  
   106  // applyStatusPatch exists because any change to a Rollout status (Rollout Abort or Retry)
   107  // requires a patch the rollouts/status subresource. If that fails, then base "rollouts" rollout is patched.
   108  // This is based on the logic of the Argo Rollouts CLI, as seen at https://github.com/argoproj/argo-rollouts/blob/v1.1.1/pkg/kubectl-argo-rollouts/cmd/retry/retry.go#L84.
   109  func (r *rolloutCommand) applyStatusPatch(ctx context.Context, client argov1alpha1.RolloutsGetter, patch string) error {
   110  	rollout := client.Rollouts(r.namespace)
   111  	_, err := rollout.Patch(
   112  		ctx,
   113  		r.rolloutName,
   114  		types.MergePatchType,
   115  		[]byte(patch),
   116  		metav1.PatchOptions{},
   117  		"status",
   118  	)
   119  	if err != nil && k8serrors.IsNotFound(err) {
   120  		_, err = rollout.Patch(
   121  			ctx,
   122  			r.rolloutName,
   123  			types.MergePatchType,
   124  			[]byte(patch),
   125  			metav1.PatchOptions{},
   126  		)
   127  	}
   128  	return err
   129  }
   130  
   131  // NewArgoRolloutsGetter creates a RolloutsGetter from Argo's v1alpha1 API.
   132  func NewArgoRolloutsGetter() (argov1alpha1.RolloutsGetter, error) {
   133  	kubeConfig, err := newK8sRestClient()
   134  	if err != nil {
   135  		return nil, err
   136  	}
   137  
   138  	argoClient, err := argov1alpha1.NewForConfig(kubeConfig)
   139  	if err != nil {
   140  		return nil, err
   141  	}
   142  
   143  	return argoClient, nil
   144  }
   145  
   146  func newK8sRestClient() (*rest.Config, error) {
   147  	config, err := clientcmd.NewNonInteractiveDeferredLoadingClientConfig(
   148  		clientcmd.NewDefaultClientConfigLoadingRules(),
   149  		&clientcmd.ConfigOverrides{},
   150  	).ClientConfig()
   151  	if err != nil {
   152  		return nil, err
   153  	}
   154  	return config, nil
   155  }
   156  

View as plain text