...

Source file src/edge-infra.dev/pkg/sds/lib/k8s/retryclient/retry_client.go

Documentation: edge-infra.dev/pkg/sds/lib/k8s/retryclient

     1  // Package retryclient provides a Kubernetes client wrapper that implements retries
     2  // for Kubernetes methods. This is primarily to handle issues related to quorum instability.
     3  package retryclient
     4  
     5  import (
     6  	"context"
     7  	"fmt"
     8  	"time"
     9  
    10  	ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
    11  
    12  	"edge-infra.dev/pkg/sds/lib/k8s/retryclient/types"
    13  )
    14  
// RetryClient is a Kubernetes client that incorporates retries in its operations.
// Every Safe* method runs its request through withRetry, which uses the
// embedded Config to decide timeouts, backoff and the number of retries.
type RetryClient struct {
	// client performs all writes (Create, Delete, Update) and serves reads
	// while useCache is true.
	client ctrlclient.Client
	// reader serves reads (Get, List) while useCache is false.
	reader ctrlclient.Reader
	Config
	// useCache selects client (true) or reader (false) for reads. New sets
	// it to true; IgnoreCache returns a copy with it set to false.
	useCache bool
}
    22  
// Config is a data structure used to configure the RetryClient. Any field
// left at its zero value is replaced with a default by New.
type Config struct {
	// RequestTimeout is the timeout used for client requests. Each attempt
	// gets its own context with this timeout.
	RequestTimeout time.Duration
	// InitialBackoff is the period to wait after the first failed attempt.
	InitialBackoff time.Duration
	// BackoffFactor is the factor by which the backoff period is multiplied
	// after each failed attempt.
	BackoffFactor  float64
	// MaxRetries is the maximum number of retries attempted after the
	// initial attempt before the last error is returned.
	MaxRetries     int
}
    39  
    40  // New creates a new instance of RetryClient as a Retrier interface. If not provided via
    41  // the config then the RetryClient will have the default args:
    42  //
    43  // RequestTimeout: 5 Seconds
    44  //
    45  // InitialBackoff: 0.5 Seconds
    46  //
    47  // BackoffFactor: 1.5
    48  //
    49  // MaxRetries: 3
    50  //
    51  // You can keep all of the defaults or overwrite individual values by only including the
    52  // values you wish to overwrite in the provided config. You can not set any of these fields
    53  // to their nil values because these will not overwrite the defaults.
    54  func New(client ctrlclient.Client, reader ctrlclient.Reader, config Config) types.Retrier {
    55  	if config.RequestTimeout == 0*time.Second {
    56  		config.RequestTimeout = 5 * time.Second
    57  	}
    58  	if config.InitialBackoff == 0*time.Second {
    59  		config.InitialBackoff = 500 * time.Millisecond
    60  	}
    61  	if config.BackoffFactor == 0 {
    62  		config.BackoffFactor = 1
    63  	}
    64  	if config.MaxRetries == 0 {
    65  		config.MaxRetries = 3
    66  	}
    67  	return &RetryClient{
    68  		client,
    69  		reader,
    70  		config,
    71  		true,
    72  	}
    73  }
    74  
    75  // withRetry is a method to add retry capabilities to Kubernetes client requests.
    76  // It will add a timeout to the target function, configured by the RequestTimeout config
    77  // parameter. It will continue to attempt to complete the target function until the
    78  // MaxRetries has been reached or a non-nil error response returned from the target function.
    79  // It sleeps after each unsuccessful attempt, starting with the InitialBackoff duration.
    80  // This sleep duration will increase exponentially as it is multiplied by the BackoffFactor
    81  // after each unsuccessful attempt.
    82  func (r *RetryClient) withRetry(ctx context.Context, fn func(ctx context.Context) error) error {
    83  	var err error
    84  	backoff := r.InitialBackoff
    85  	retries := 0
    86  
    87  	// continue trying the function until the MaxRetries is reached, or the
    88  	// target function returns a non-nil error response
    89  	for {
    90  		ctx, cancel := context.WithTimeout(ctx, r.RequestTimeout)
    91  		defer cancel()
    92  		err = fn(ctx)
    93  		if err == nil {
    94  			break
    95  		}
    96  
    97  		if retries >= r.MaxRetries {
    98  			break
    99  		}
   100  
   101  		time.Sleep(backoff)
   102  		backoff = time.Duration(float64(backoff) * r.BackoffFactor)
   103  		retries++
   104  	}
   105  	return err
   106  }
   107  
   108  // SafeGet attempts to retrieve a Kubernetes object, with retries.
   109  func (r *RetryClient) SafeGet(ctx context.Context, objKey ctrlclient.ObjectKey, obj ctrlclient.Object, opts ...ctrlclient.GetOption) error {
   110  	return r.withRetry(ctx, func(ctx context.Context) error {
   111  		if r.useCache {
   112  			return r.client.Get(ctx, objKey, obj, opts...)
   113  		}
   114  		return r.reader.Get(ctx, objKey, obj, opts...)
   115  	})
   116  }
   117  
   118  // SafeList attempts to list Kubernetes objects, with retries.
   119  func (r *RetryClient) SafeList(ctx context.Context, list ctrlclient.ObjectList, opts ...ctrlclient.ListOption) error {
   120  	return r.withRetry(ctx, func(ctx context.Context) error {
   121  		if r.useCache {
   122  			return r.client.List(ctx, list, opts...)
   123  		}
   124  		return r.reader.List(ctx, list, opts...)
   125  	})
   126  }
   127  
   128  // SafeCreate attempts to create a Kubernetes object, with retries.
   129  func (r *RetryClient) SafeCreate(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.CreateOption) error {
   130  	return r.withRetry(ctx, func(ctx context.Context) error {
   131  		return r.client.Create(ctx, obj, opts...)
   132  	})
   133  }
   134  
   135  // SafeDelete attempts to delete a Kubernetes object, with retries.
   136  func (r *RetryClient) SafeDelete(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.DeleteOption) error {
   137  	return r.withRetry(ctx, func(ctx context.Context) error {
   138  		return r.client.Delete(ctx, obj, opts...)
   139  	})
   140  }
   141  
   142  // SafeUpdate attempts to update a Kubernetes object using the provided
   143  // function, with retries.
   144  func (r *RetryClient) SafeUpdate(ctx context.Context, objKey ctrlclient.ObjectKey, obj ctrlclient.Object, fn func(context.Context, ctrlclient.Object) error, updateOpts ...ctrlclient.UpdateOption) error {
   145  	return r.withRetry(ctx, func(ctx context.Context) error {
   146  		var err error
   147  		switch r.useCache {
   148  		case true:
   149  			err = r.client.Get(ctx, objKey, obj)
   150  		case false:
   151  			err = r.reader.Get(ctx, objKey, obj)
   152  		}
   153  		if err != nil {
   154  			return fmt.Errorf("failed to get Kubernetes object: %w", err)
   155  		}
   156  
   157  		if err := fn(ctx, obj); err != nil {
   158  			return err
   159  		}
   160  
   161  		return r.client.Update(ctx, obj, updateOpts...)
   162  	})
   163  }
   164  
   165  // IgnoreCache returns a retryclient object that can be chained to make
   166  // direct API server requests without using the client cache. This should
   167  // be used sparingly as it will increase direct requests to the API server.
   168  // The original retryclient will not be altered and will continue to use
   169  // cache. This is done to make sure cache is only ignored where it is
   170  // absolutely necessary.
   171  func (r RetryClient) IgnoreCache() types.Retrier {
   172  	r.useCache = false
   173  	return &r
   174  }
   175  
   176  // Client will return the controller runtime client used by the
   177  // retry client
   178  func (r *RetryClient) Client() ctrlclient.Client {
   179  	return r.client
   180  }
   181  
   182  // Reader will return the controller runtime API reader used by the
   183  // retry client
   184  func (r *RetryClient) Reader() ctrlclient.Reader {
   185  	return r.reader
   186  }
   187  

View as plain text