...

Source file src/edge-infra.dev/pkg/sds/lib/etcd/client/retry/retry_client.go

Documentation: edge-infra.dev/pkg/sds/lib/etcd/client/retry

     1  // Package retry provides an etcd client wrapper that implements retries
     2  // for etcd methods. This is primarily to handle issues related to quorum instability.
     3  package retry
     4  
     5  import (
     6  	"context"
     7  	"time"
     8  
     9  	clientv3 "go.etcd.io/etcd/client/v3"
    10  )
    11  
    12  // Retrier is an interface for interacting with an etcd cluster with retry logic.
    13  // It provides methods for safely listing members, adding members as learners, promoting members,
    14  // and removing members.
    15  //
    16  //go:generate mockgen -destination=./mocks/mock_retry_client.go -package=mocks edge-infra.dev/pkg/sds/lib/etcd/client/retry Retrier
    17  type Retrier interface {
    18  	SafeMemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
    19  	SafeMemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
    20  	SafeMemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)
    21  	SafeMemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
    22  	SafeStatus(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
    23  	SafeAlarmList(ctx context.Context) (*clientv3.AlarmResponse, error)
    24  	Endpoints() []string
    25  	Close() error
    26  }
    27  
    28  // Client is a struct that embeds the `clientv3.Client` struct and the retry client config.
    29  type Client struct {
    30  	clientv3.Client
    31  	Config
    32  }
    33  
    34  // Config is a data structure used to configure the Client. The fields can be used in the
    35  // following ways:
    36  //
    37  // RequestTimeout: The timeout used for client requests
    38  //
    39  // InitialBackoff: The initial backoff period to wait between failed requests
    40  //
    41  // BackoffFactor: The factor by which to increase the backoff period after each failed attempt
    42  //
    43  // MaxRetries: The maximum number of retries to attempt before returning an error
    44  type Config struct {
    45  	RequestTimeout time.Duration
    46  	InitialBackoff time.Duration
    47  	BackoffFactor  float64
    48  	MaxRetries     int
    49  }
    50  
    51  // New creates a new instance of Client as a Retrier interface. If not provided via
    52  // the config then the Client will have the default args:
    53  //
    54  // RequestTimeout: 5 Seconds
    55  //
    56  // InitialBackoff: 0.5 Seconds
    57  //
    58  // BackoffFactor: 1.5
    59  //
    60  // MaxRetries: 3
    61  //
    62  // You can keep all of the defaults or overwrite individual values by only including the
    63  // values you wish to overwrite in the provided config. You can not set any of these fields
    64  // to their nil values because these will not overwrite the defaults.
    65  func New(client clientv3.Client, config Config) Retrier {
    66  	if config.RequestTimeout == 0*time.Second {
    67  		config.RequestTimeout = 5 * time.Second
    68  	}
    69  	if config.InitialBackoff == 0*time.Second {
    70  		config.InitialBackoff = 500 * time.Millisecond
    71  	}
    72  	if config.BackoffFactor == 0 {
    73  		config.BackoffFactor = 1.5
    74  	}
    75  	if config.MaxRetries == 0 {
    76  		config.MaxRetries = 3
    77  	}
    78  	return &Client{
    79  		client,
    80  		config,
    81  	}
    82  }
    83  
    84  // withRetry is a method to add retry (with timeout) capabilities to etcd client requests.
    85  // It will add a timeout to the target function, configured by the RequestTimeout config
    86  // parameter. It will continue to attempt to complete the target function until the
    87  // MaxRetries has been reached or a non-nil response returned from the target function.
    88  // It sleeps after each unsuccessful attempt, starting with the InitialBackoff duration.
    89  // This sleep duration will increase exponentially as it is multiplied by the BackoffFactor
    90  // after each unsuccessful attempt.
    91  func (r *Client) withRetry(ctx context.Context, fn func(ctx context.Context) error) error {
    92  	var err error
    93  	backoff := r.InitialBackoff
    94  	retries := 0
    95  
    96  	// continue trying the function until the MaxRetries is reached, or the
    97  	// target function returns a non-nil response
    98  	for {
    99  		ctx, cancel := context.WithTimeout(ctx, r.RequestTimeout)
   100  		defer cancel()
   101  		err = fn(ctx)
   102  		if err == nil {
   103  			break
   104  		}
   105  
   106  		if retries >= r.MaxRetries {
   107  			break
   108  		}
   109  		time.Sleep(backoff)
   110  
   111  		backoff = backoff * time.Duration(r.BackoffFactor)
   112  		retries++
   113  	}
   114  	return err
   115  }
   116  
   117  // SafeMemberList attempts to retrieve a list of etcd cluster members, with retries.
   118  func (r *Client) SafeMemberList(ctx context.Context) (*clientv3.MemberListResponse, error) {
   119  	var resp *clientv3.MemberListResponse
   120  	return resp, r.withRetry(ctx, func(ctx context.Context) error {
   121  		var err error
   122  		resp, err = r.MemberList(ctx)
   123  		return err
   124  	})
   125  }
   126  
   127  // SafeMemberAddAsLearner attempts to add a node as an etcd learner member, with retries.
   128  func (r *Client) SafeMemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
   129  	var resp *clientv3.MemberAddResponse
   130  	return resp, r.withRetry(ctx, func(ctx context.Context) error {
   131  		var err error
   132  		resp, err = r.MemberAddAsLearner(ctx, peerAddrs)
   133  		return err
   134  	})
   135  }
   136  
   137  // SafeMemberPromote attempts to promote an etcd learner member to a full etcd member, with retries.
   138  func (r *Client) SafeMemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
   139  	var resp *clientv3.MemberPromoteResponse
   140  	return resp, r.withRetry(ctx, func(ctx context.Context) error {
   141  		var err error
   142  		resp, err = r.MemberPromote(ctx, id)
   143  		return err
   144  	})
   145  }
   146  
   147  // SafeMemberRemove attempts to remove an etcd member from the etcd cluster, with retries.
   148  func (r *Client) SafeMemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
   149  	var resp *clientv3.MemberRemoveResponse
   150  	return resp, r.withRetry(ctx, func(ctx context.Context) error {
   151  		var err error
   152  		resp, err = r.MemberRemove(ctx, id)
   153  		return err
   154  	})
   155  }
   156  
   157  // SafeStatus attempts to retreve an etcd member's endpoint status, with retries.
   158  func (r *Client) SafeStatus(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
   159  	var resp *clientv3.StatusResponse
   160  	return resp, r.withRetry(ctx, func(ctx context.Context) error {
   161  		var err error
   162  		resp, err = r.Status(ctx, endpoint)
   163  		return err
   164  	})
   165  }
   166  
   167  // SafeAlarmList attempts to retrieve the list of active alarms in the etcd cluster, with retries
   168  func (r *Client) SafeAlarmList(ctx context.Context) (*clientv3.AlarmResponse, error) {
   169  	var resp *clientv3.AlarmResponse
   170  	return resp, r.withRetry(ctx, func(ctx context.Context) error {
   171  		var err error
   172  		resp, err = r.AlarmList(ctx)
   173  		return err
   174  	})
   175  }
   176  
   177  func (r *Client) Close() error {
   178  	return r.Client.Close()
   179  }
   180  

View as plain text