1
2
3 package retry
4
5 import (
6 "context"
7 "time"
8
9 clientv3 "go.etcd.io/etcd/client/v3"
10 )
11
12
13
14
15
16
// Retrier is the set of etcd operations this package exposes. New returns a
// *Client as a Retrier; that implementation runs every Safe* call through its
// retry loop.
type Retrier interface {
	// SafeMemberList returns the response of the etcd MemberList call.
	SafeMemberList(ctx context.Context) (*clientv3.MemberListResponse, error)
	// SafeMemberAddAsLearner returns the response of the etcd
	// MemberAddAsLearner call made with peerAddrs.
	SafeMemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error)
	// SafeMemberPromote returns the response of the etcd MemberPromote call
	// for the member identified by id.
	SafeMemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error)
	// SafeMemberRemove returns the response of the etcd MemberRemove call for
	// the member identified by id.
	SafeMemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error)
	// SafeStatus returns the response of the etcd Status call for endpoint.
	SafeStatus(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error)
	// SafeAlarmList returns the response of the etcd AlarmList call.
	SafeAlarmList(ctx context.Context) (*clientv3.AlarmResponse, error)
	// Endpoints returns a list of endpoints; presumably the ones the
	// underlying etcd client is configured with (confirm against clientv3).
	Endpoints() []string
	// Close closes the underlying etcd client.
	Close() error
}
27
28
// Client combines an etcd clientv3.Client with the retry Config. Both are
// embedded, so the etcd client's methods and the Config fields are available
// directly on a Client value.
type Client struct {
	clientv3.Client
	Config
}
33
34
35
36
37
38
39
40
41
42
43
// Config holds the retry parameters of a Client. New replaces every field
// that is left at its zero value with a default.
type Config struct {
	// RequestTimeout bounds each individual attempt. New defaults it to 5s.
	RequestTimeout time.Duration
	// InitialBackoff is the wait before the first retry. New defaults it to
	// 500ms.
	InitialBackoff time.Duration
	// BackoffFactor is the multiplier applied to the wait after each retry.
	// New defaults it to 1.5.
	BackoffFactor float64
	// MaxRetries is the number of retries made after the initial attempt
	// fails. New defaults it to 3.
	MaxRetries int
}
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65 func New(client clientv3.Client, config Config) Retrier {
66 if config.RequestTimeout == 0*time.Second {
67 config.RequestTimeout = 5 * time.Second
68 }
69 if config.InitialBackoff == 0*time.Second {
70 config.InitialBackoff = 500 * time.Millisecond
71 }
72 if config.BackoffFactor == 0 {
73 config.BackoffFactor = 1.5
74 }
75 if config.MaxRetries == 0 {
76 config.MaxRetries = 3
77 }
78 return &Client{
79 client,
80 config,
81 }
82 }
83
84
85
86
87
88
89
90
91 func (r *Client) withRetry(ctx context.Context, fn func(ctx context.Context) error) error {
92 var err error
93 backoff := r.InitialBackoff
94 retries := 0
95
96
97
98 for {
99 ctx, cancel := context.WithTimeout(ctx, r.RequestTimeout)
100 defer cancel()
101 err = fn(ctx)
102 if err == nil {
103 break
104 }
105
106 if retries >= r.MaxRetries {
107 break
108 }
109 time.Sleep(backoff)
110
111 backoff = backoff * time.Duration(r.BackoffFactor)
112 retries++
113 }
114 return err
115 }
116
117
118 func (r *Client) SafeMemberList(ctx context.Context) (*clientv3.MemberListResponse, error) {
119 var resp *clientv3.MemberListResponse
120 return resp, r.withRetry(ctx, func(ctx context.Context) error {
121 var err error
122 resp, err = r.MemberList(ctx)
123 return err
124 })
125 }
126
127
128 func (r *Client) SafeMemberAddAsLearner(ctx context.Context, peerAddrs []string) (*clientv3.MemberAddResponse, error) {
129 var resp *clientv3.MemberAddResponse
130 return resp, r.withRetry(ctx, func(ctx context.Context) error {
131 var err error
132 resp, err = r.MemberAddAsLearner(ctx, peerAddrs)
133 return err
134 })
135 }
136
137
138 func (r *Client) SafeMemberPromote(ctx context.Context, id uint64) (*clientv3.MemberPromoteResponse, error) {
139 var resp *clientv3.MemberPromoteResponse
140 return resp, r.withRetry(ctx, func(ctx context.Context) error {
141 var err error
142 resp, err = r.MemberPromote(ctx, id)
143 return err
144 })
145 }
146
147
148 func (r *Client) SafeMemberRemove(ctx context.Context, id uint64) (*clientv3.MemberRemoveResponse, error) {
149 var resp *clientv3.MemberRemoveResponse
150 return resp, r.withRetry(ctx, func(ctx context.Context) error {
151 var err error
152 resp, err = r.MemberRemove(ctx, id)
153 return err
154 })
155 }
156
157
158 func (r *Client) SafeStatus(ctx context.Context, endpoint string) (*clientv3.StatusResponse, error) {
159 var resp *clientv3.StatusResponse
160 return resp, r.withRetry(ctx, func(ctx context.Context) error {
161 var err error
162 resp, err = r.Status(ctx, endpoint)
163 return err
164 })
165 }
166
167
168 func (r *Client) SafeAlarmList(ctx context.Context) (*clientv3.AlarmResponse, error) {
169 var resp *clientv3.AlarmResponse
170 return resp, r.withRetry(ctx, func(ctx context.Context) error {
171 var err error
172 resp, err = r.AlarmList(ctx)
173 return err
174 })
175 }
176
// Close closes the embedded etcd client and returns the error it reports.
// The call is made once; it does not go through the retry loop.
func (r *Client) Close() error {
	return r.Client.Close()
}
180
View as plain text