// Package retryclient provides a Kubernetes client wrapper that implements retries
// for Kubernetes methods. This is primarily to handle issues related to quorum instability.
package retryclient

import (
	"context"
	"fmt"
	"time"

	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"

	"edge-infra.dev/pkg/sds/lib/k8s/retryclient/types"
)

// RetryClient is a Kubernetes client that incorporates retries in its operations.
//
// NOTE: New populates this struct with an unkeyed literal, so the order of the
// fields below must not change without updating New as well.
type RetryClient struct {
	// client performs Create, Delete and Update requests, and also serves Get
	// and List requests while useCache is true.
	client ctrlclient.Client
	// reader serves Get and List requests while useCache is false.
	reader ctrlclient.Reader
	// Config is embedded so the retry settings (RequestTimeout, InitialBackoff,
	// BackoffFactor, MaxRetries) are accessible directly on the RetryClient.
	Config
	// useCache selects between client (true) and reader (false) for reads. New
	// sets it to true; IgnoreCache returns a copy with it set to false.
	useCache bool
}

// Config is a data structure used to configure the RetryClient. A field left at
// its zero value is replaced with a default by New.
type Config struct {
	// RequestTimeout is the timeout applied to each individual request attempt.
	RequestTimeout time.Duration
	// InitialBackoff is the period to wait after the first failed attempt.
	InitialBackoff time.Duration
	// BackoffFactor is the factor by which the backoff period is multiplied
	// after each failed attempt.
	BackoffFactor float64
	// MaxRetries is the maximum number of retries to attempt, after the initial
	// attempt, before returning an error.
	MaxRetries int
}

// New creates a new instance of RetryClient and returns it as a types.Retrier.
// Any field not provided via the config receives its default value:
//
//	RequestTimeout: 5 seconds
//	InitialBackoff: 0.5 seconds
//	BackoffFactor:  1.5
//	MaxRetries:     3
//
// You can keep all of the defaults or overwrite individual values by only including the
// values you wish to overwrite in the provided config. You cannot set any of these fields
// to their zero values, because a zero value is treated as "not set" and is replaced by
// the default.
func New(client ctrlclient.Client, reader ctrlclient.Reader, config Config) types.Retrier { if config.RequestTimeout == 0*time.Second { config.RequestTimeout = 5 * time.Second } if config.InitialBackoff == 0*time.Second { config.InitialBackoff = 500 * time.Millisecond } if config.BackoffFactor == 0 { config.BackoffFactor = 1 } if config.MaxRetries == 0 { config.MaxRetries = 3 } return &RetryClient{ client, reader, config, true, } } // withRetry is a method to add retry capabilities to Kubernetes client requests. // It will add a timeout to the target function, configured by the RequestTimeout config // parameter. It will continue to attempt to complete the target function until the // MaxRetries has been reached or a non-nil error response returned from the target function. // It sleeps after each unsuccessful attempt, starting with the InitialBackoff duration. // This sleep duration will increase exponentially as it is multiplied by the BackoffFactor // after each unsuccessful attempt. func (r *RetryClient) withRetry(ctx context.Context, fn func(ctx context.Context) error) error { var err error backoff := r.InitialBackoff retries := 0 // continue trying the function until the MaxRetries is reached, or the // target function returns a non-nil error response for { ctx, cancel := context.WithTimeout(ctx, r.RequestTimeout) defer cancel() err = fn(ctx) if err == nil { break } if retries >= r.MaxRetries { break } time.Sleep(backoff) backoff = time.Duration(float64(backoff) * r.BackoffFactor) retries++ } return err } // SafeGet attempts to retrieve a Kubernetes object, with retries. func (r *RetryClient) SafeGet(ctx context.Context, objKey ctrlclient.ObjectKey, obj ctrlclient.Object, opts ...ctrlclient.GetOption) error { return r.withRetry(ctx, func(ctx context.Context) error { if r.useCache { return r.client.Get(ctx, objKey, obj, opts...) } return r.reader.Get(ctx, objKey, obj, opts...) }) } // SafeList attempts to list Kubernetes objects, with retries. 
func (r *RetryClient) SafeList(ctx context.Context, list ctrlclient.ObjectList, opts ...ctrlclient.ListOption) error { return r.withRetry(ctx, func(ctx context.Context) error { if r.useCache { return r.client.List(ctx, list, opts...) } return r.reader.List(ctx, list, opts...) }) } // SafeCreate attempts to create a Kubernetes object, with retries. func (r *RetryClient) SafeCreate(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.CreateOption) error { return r.withRetry(ctx, func(ctx context.Context) error { return r.client.Create(ctx, obj, opts...) }) } // SafeDelete attempts to delete a Kubernetes object, with retries. func (r *RetryClient) SafeDelete(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.DeleteOption) error { return r.withRetry(ctx, func(ctx context.Context) error { return r.client.Delete(ctx, obj, opts...) }) } // SafeUpdate attempts to update a Kubernetes object using the provided // function, with retries. func (r *RetryClient) SafeUpdate(ctx context.Context, objKey ctrlclient.ObjectKey, obj ctrlclient.Object, fn func(context.Context, ctrlclient.Object) error, updateOpts ...ctrlclient.UpdateOption) error { return r.withRetry(ctx, func(ctx context.Context) error { var err error switch r.useCache { case true: err = r.client.Get(ctx, objKey, obj) case false: err = r.reader.Get(ctx, objKey, obj) } if err != nil { return fmt.Errorf("failed to get Kubernetes object: %w", err) } if err := fn(ctx, obj); err != nil { return err } return r.client.Update(ctx, obj, updateOpts...) }) } // IgnoreCache returns a retryclient object that can be chained to make // direct API server requests without using the client cache. This should // be used sparingly as it will increase direct requests to the API server. // The original retryclient will not be altered and will continue to use // cache. This is done to make sure cache is only ignored where it is // absolutely necessary. 
func (r RetryClient) IgnoreCache() types.Retrier { r.useCache = false return &r } // Client will return the controller runtime client used by the // retry client func (r *RetryClient) Client() ctrlclient.Client { return r.client } // Reader will return the controller runtime API reader used by the // retry client func (r *RetryClient) Reader() ctrlclient.Reader { return r.reader }