1
2
3 package retryclient
4
5 import (
6 "context"
7 "fmt"
8 "time"
9
10 ctrlclient "sigs.k8s.io/controller-runtime/pkg/client"
11
12 "edge-infra.dev/pkg/sds/lib/k8s/retryclient/types"
13 )
14
15
16 type RetryClient struct {
17 client ctrlclient.Client
18 reader ctrlclient.Reader
19 Config
20 useCache bool
21 }
22
23
24
25
26
27
28
29
30
31
32
33 type Config struct {
34 RequestTimeout time.Duration
35 InitialBackoff time.Duration
36 BackoffFactor float64
37 MaxRetries int
38 }
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54 func New(client ctrlclient.Client, reader ctrlclient.Reader, config Config) types.Retrier {
55 if config.RequestTimeout == 0*time.Second {
56 config.RequestTimeout = 5 * time.Second
57 }
58 if config.InitialBackoff == 0*time.Second {
59 config.InitialBackoff = 500 * time.Millisecond
60 }
61 if config.BackoffFactor == 0 {
62 config.BackoffFactor = 1
63 }
64 if config.MaxRetries == 0 {
65 config.MaxRetries = 3
66 }
67 return &RetryClient{
68 client,
69 reader,
70 config,
71 true,
72 }
73 }
74
75
76
77
78
79
80
81
82 func (r *RetryClient) withRetry(ctx context.Context, fn func(ctx context.Context) error) error {
83 var err error
84 backoff := r.InitialBackoff
85 retries := 0
86
87
88
89 for {
90 ctx, cancel := context.WithTimeout(ctx, r.RequestTimeout)
91 defer cancel()
92 err = fn(ctx)
93 if err == nil {
94 break
95 }
96
97 if retries >= r.MaxRetries {
98 break
99 }
100
101 time.Sleep(backoff)
102 backoff = time.Duration(float64(backoff) * r.BackoffFactor)
103 retries++
104 }
105 return err
106 }
107
108
109 func (r *RetryClient) SafeGet(ctx context.Context, objKey ctrlclient.ObjectKey, obj ctrlclient.Object, opts ...ctrlclient.GetOption) error {
110 return r.withRetry(ctx, func(ctx context.Context) error {
111 if r.useCache {
112 return r.client.Get(ctx, objKey, obj, opts...)
113 }
114 return r.reader.Get(ctx, objKey, obj, opts...)
115 })
116 }
117
118
119 func (r *RetryClient) SafeList(ctx context.Context, list ctrlclient.ObjectList, opts ...ctrlclient.ListOption) error {
120 return r.withRetry(ctx, func(ctx context.Context) error {
121 if r.useCache {
122 return r.client.List(ctx, list, opts...)
123 }
124 return r.reader.List(ctx, list, opts...)
125 })
126 }
127
128
129 func (r *RetryClient) SafeCreate(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.CreateOption) error {
130 return r.withRetry(ctx, func(ctx context.Context) error {
131 return r.client.Create(ctx, obj, opts...)
132 })
133 }
134
135
136 func (r *RetryClient) SafeDelete(ctx context.Context, obj ctrlclient.Object, opts ...ctrlclient.DeleteOption) error {
137 return r.withRetry(ctx, func(ctx context.Context) error {
138 return r.client.Delete(ctx, obj, opts...)
139 })
140 }
141
142
143
144 func (r *RetryClient) SafeUpdate(ctx context.Context, objKey ctrlclient.ObjectKey, obj ctrlclient.Object, fn func(context.Context, ctrlclient.Object) error, updateOpts ...ctrlclient.UpdateOption) error {
145 return r.withRetry(ctx, func(ctx context.Context) error {
146 var err error
147 switch r.useCache {
148 case true:
149 err = r.client.Get(ctx, objKey, obj)
150 case false:
151 err = r.reader.Get(ctx, objKey, obj)
152 }
153 if err != nil {
154 return fmt.Errorf("failed to get Kubernetes object: %w", err)
155 }
156
157 if err := fn(ctx, obj); err != nil {
158 return err
159 }
160
161 return r.client.Update(ctx, obj, updateOpts...)
162 })
163 }
164
165
166
167
168
169
170
171 func (r RetryClient) IgnoreCache() types.Retrier {
172 r.useCache = false
173 return &r
174 }
175
176
177
178 func (r *RetryClient) Client() ctrlclient.Client {
179 return r.client
180 }
181
182
183
184 func (r *RetryClient) Reader() ctrlclient.Reader {
185 return r.reader
186 }
187
View as plain text