1
16
17 package apiclient
18
19 import (
20 "context"
21 "encoding/json"
22 "time"
23
24 "github.com/pkg/errors"
25
26 apps "k8s.io/api/apps/v1"
27 v1 "k8s.io/api/core/v1"
28 rbac "k8s.io/api/rbac/v1"
29 apierrors "k8s.io/apimachinery/pkg/api/errors"
30 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
31 "k8s.io/apimachinery/pkg/types"
32 "k8s.io/apimachinery/pkg/util/strategicpatch"
33 "k8s.io/apimachinery/pkg/util/wait"
34 clientset "k8s.io/client-go/kubernetes"
35
36 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
37 "k8s.io/kubernetes/cmd/kubeadm/app/constants"
38 )
39
40
// ConfigMapMutator is a callback that receives a ConfigMap and may modify it
// in place. Returning a non-nil error signals that the mutation failed.
type ConfigMapMutator func(*v1.ConfigMap) error
42
43
// apiCallRetryInterval is the interval between retries used by the polling
// helpers in this file. It is initialized from
// constants.KubernetesAPICallRetryInterval.
// NOTE(review): presumably a variable (not a direct use of the constant) so
// that it can be overridden, e.g. in tests — confirm.
var apiCallRetryInterval = constants.KubernetesAPICallRetryInterval
45
46
47
48
49 func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error {
50 var lastError error
51 err := wait.PollUntilContextTimeout(context.Background(),
52 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
53 true, func(_ context.Context) (bool, error) {
54 ctx := context.Background()
55 if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
56 if !apierrors.IsAlreadyExists(err) {
57 lastError = errors.Wrap(err, "unable to create ConfigMap")
58 return false, nil
59 }
60 if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
61 lastError = errors.Wrap(err, "unable to update ConfigMap")
62 return false, nil
63 }
64 }
65 return true, nil
66 })
67 if err == nil {
68 return nil
69 }
70 return lastError
71 }
72
73
74
75
76
77 func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
78 var lastError error
79 err := wait.PollUntilContextTimeout(context.Background(),
80 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
81 true, func(_ context.Context) (bool, error) {
82 if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
83 lastError = err
84 if apierrors.IsAlreadyExists(err) {
85 lastError = mutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
86 return lastError == nil, nil
87 }
88 return false, nil
89 }
90 return true, nil
91 })
92 if err == nil {
93 return nil
94 }
95 return lastError
96 }
97
98
99
100
101
102 func mutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
103 ctx := context.Background()
104 configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
105 if err != nil {
106 return errors.Wrap(err, "unable to get ConfigMap")
107 }
108 if err = mutator(configMap); err != nil {
109 return errors.Wrap(err, "unable to mutate ConfigMap")
110 }
111 _, err = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(ctx, configMap, metav1.UpdateOptions{})
112 return err
113 }
114
115
116 func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
117 var lastError error
118 err := wait.PollUntilContextTimeout(context.Background(),
119 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
120 true, func(_ context.Context) (bool, error) {
121 ctx := context.Background()
122 if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(ctx, configMapName, metav1.GetOptions{}); err != nil {
123 if !apierrors.IsNotFound(err) {
124 lastError = errors.Wrap(err, "unable to get ConfigMap")
125 return false, nil
126 }
127 if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
128 lastError = errors.Wrap(err, "unable to create ConfigMap")
129 return false, nil
130 }
131 }
132 return true, nil
133 })
134 if err == nil {
135 return nil
136 }
137 return lastError
138 }
139
140
141 func CreateOrUpdateSecret(client clientset.Interface, secret *v1.Secret) error {
142 var lastError error
143 err := wait.PollUntilContextTimeout(context.Background(),
144 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
145 true, func(_ context.Context) (bool, error) {
146 ctx := context.Background()
147 if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Create(ctx, secret, metav1.CreateOptions{}); err != nil {
148 if !apierrors.IsAlreadyExists(err) {
149 lastError = errors.Wrap(err, "unable to create Secret")
150 return false, nil
151 }
152 if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
153 lastError = errors.Wrap(err, "unable to update Secret")
154 return false, nil
155 }
156 }
157 return true, nil
158 })
159 if err == nil {
160 return nil
161 }
162 return lastError
163 }
164
165
166 func CreateOrUpdateServiceAccount(client clientset.Interface, sa *v1.ServiceAccount) error {
167 var lastError error
168 err := wait.PollUntilContextTimeout(context.Background(),
169 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
170 true, func(_ context.Context) (bool, error) {
171 ctx := context.Background()
172 if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Create(ctx, sa, metav1.CreateOptions{}); err != nil {
173 if !apierrors.IsAlreadyExists(err) {
174 lastError = errors.Wrap(err, "unable to create ServicAccount")
175 return false, nil
176 }
177 if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Update(ctx, sa, metav1.UpdateOptions{}); err != nil {
178 lastError = errors.Wrap(err, "unable to update ServicAccount")
179 return false, nil
180 }
181 }
182 return true, nil
183 })
184 if err == nil {
185 return nil
186 }
187 return lastError
188 }
189
190
191 func CreateOrUpdateDeployment(client clientset.Interface, deploy *apps.Deployment) error {
192 var lastError error
193 err := wait.PollUntilContextTimeout(context.Background(),
194 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
195 true, func(_ context.Context) (bool, error) {
196 ctx := context.Background()
197 if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
198 if !apierrors.IsAlreadyExists(err) {
199 lastError = errors.Wrap(err, "unable to create Deployment")
200 return false, nil
201 }
202 if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Update(ctx, deploy, metav1.UpdateOptions{}); err != nil {
203 lastError = errors.Wrap(err, "unable to update Deployment")
204 return false, nil
205 }
206 }
207 return true, nil
208 })
209 if err == nil {
210 return nil
211 }
212 return lastError
213 }
214
215
216 func CreateOrRetainDeployment(client clientset.Interface, deploy *apps.Deployment, deployName string) error {
217 var lastError error
218 err := wait.PollUntilContextTimeout(context.Background(),
219 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
220 true, func(_ context.Context) (bool, error) {
221 ctx := context.Background()
222 if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Get(ctx, deployName, metav1.GetOptions{}); err != nil {
223 if !apierrors.IsNotFound(err) {
224 lastError = errors.Wrap(err, "unable to get Deployment")
225 return false, nil
226 }
227 if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
228 if !apierrors.IsAlreadyExists(err) {
229 lastError = errors.Wrap(err, "unable to create Deployment")
230 return false, nil
231 }
232 }
233 }
234 return true, nil
235 })
236 if err == nil {
237 return nil
238 }
239 return lastError
240 }
241
242
243 func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) error {
244 var lastError error
245 err := wait.PollUntilContextTimeout(context.Background(),
246 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
247 true, func(_ context.Context) (bool, error) {
248 ctx := context.Background()
249 if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Create(ctx, ds, metav1.CreateOptions{}); err != nil {
250 if !apierrors.IsAlreadyExists(err) {
251 lastError = errors.Wrap(err, "unable to create DaemonSet")
252 return false, nil
253 }
254 if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Update(ctx, ds, metav1.UpdateOptions{}); err != nil {
255 lastError = errors.Wrap(err, "unable to update DaemonSet")
256 return false, nil
257 }
258 }
259 return true, nil
260 })
261 if err == nil {
262 return nil
263 }
264 return lastError
265 }
266
267
268 func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
269 var lastError error
270 err := wait.PollUntilContextTimeout(context.Background(),
271 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
272 true, func(_ context.Context) (bool, error) {
273 ctx := context.Background()
274 if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(ctx, role, metav1.CreateOptions{}); err != nil {
275 if !apierrors.IsAlreadyExists(err) {
276 lastError = errors.Wrap(err, "unable to create Role")
277 return false, nil
278 }
279 if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(ctx, role, metav1.UpdateOptions{}); err != nil {
280 lastError = errors.Wrap(err, "unable to update Role")
281 return false, nil
282 }
283 }
284 return true, nil
285 })
286 if err == nil {
287 return nil
288 }
289 return lastError
290 }
291
292
293 func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error {
294 var lastError error
295 err := wait.PollUntilContextTimeout(context.Background(),
296 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
297 true, func(_ context.Context) (bool, error) {
298 ctx := context.Background()
299 if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(ctx, roleBinding, metav1.CreateOptions{}); err != nil {
300 if !apierrors.IsAlreadyExists(err) {
301 lastError = errors.Wrap(err, "unable to create RoleBinding")
302 return false, nil
303 }
304 if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(ctx, roleBinding, metav1.UpdateOptions{}); err != nil {
305 lastError = errors.Wrap(err, "unable to update RoleBinding")
306 return false, nil
307 }
308 }
309 return true, nil
310 })
311 if err == nil {
312 return nil
313 }
314 return lastError
315 }
316
317
318 func CreateOrUpdateClusterRole(client clientset.Interface, clusterRole *rbac.ClusterRole) error {
319 var lastError error
320 err := wait.PollUntilContextTimeout(context.Background(),
321 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
322 true, func(_ context.Context) (bool, error) {
323 ctx := context.Background()
324 if _, err := client.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{}); err != nil {
325 if !apierrors.IsAlreadyExists(err) {
326 lastError = errors.Wrap(err, "unable to create ClusterRole")
327 return false, nil
328 }
329 if _, err := client.RbacV1().ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{}); err != nil {
330 lastError = errors.Wrap(err, "unable to update ClusterRole")
331 return false, nil
332 }
333 }
334 return true, nil
335 })
336 if err == nil {
337 return nil
338 }
339 return lastError
340 }
341
342
343 func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBinding *rbac.ClusterRoleBinding) error {
344 var lastError error
345 err := wait.PollUntilContextTimeout(context.Background(),
346 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
347 true, func(_ context.Context) (bool, error) {
348 ctx := context.Background()
349 if _, err := client.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{}); err != nil {
350 if !apierrors.IsAlreadyExists(err) {
351 lastError = errors.Wrap(err, "unable to create ClusterRoleBinding")
352 return false, nil
353 }
354 if _, err := client.RbacV1().ClusterRoleBindings().Update(ctx, clusterRoleBinding, metav1.UpdateOptions{}); err != nil {
355 lastError = errors.Wrap(err, "unable to update ClusterRoleBinding")
356 return false, nil
357 }
358 }
359 return true, nil
360 })
361 if err == nil {
362 return nil
363 }
364 return lastError
365 }
366
367
368 func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) {
369 return func(_ context.Context) (bool, error) {
370
371 ctx := context.Background()
372 n, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
373 if err != nil {
374 *lastError = err
375 return false, nil
376 }
377
378
379
380 if _, found := n.ObjectMeta.Labels[v1.LabelHostname]; !found {
381 return false, nil
382 }
383
384 oldData, err := json.Marshal(n)
385 if err != nil {
386 *lastError = errors.Wrapf(err, "failed to marshal unmodified node %q into JSON", n.Name)
387 return false, *lastError
388 }
389
390
391 patchFn(n)
392
393 newData, err := json.Marshal(n)
394 if err != nil {
395 *lastError = errors.Wrapf(err, "failed to marshal modified node %q into JSON", n.Name)
396 return false, *lastError
397 }
398
399 patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
400 if err != nil {
401 *lastError = errors.Wrap(err, "failed to create two way merge patch")
402 return false, *lastError
403 }
404
405 if _, err := client.CoreV1().Nodes().Patch(ctx, n.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
406 *lastError = errors.Wrapf(err, "error patching Node %q", n.Name)
407 if apierrors.IsTimeout(err) || apierrors.IsConflict(err) || apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
408 return false, nil
409 }
410 return false, *lastError
411 }
412
413 return true, nil
414 }
415 }
416
417
418
419 func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error {
420 var lastError error
421 err := wait.PollUntilContextTimeout(context.Background(),
422 apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
423 true, PatchNodeOnce(client, nodeName, patchFn, &lastError))
424 if err == nil {
425 return nil
426 }
427 return lastError
428 }
429
430
431
432
433 func GetConfigMapWithShortRetry(client clientset.Interface, namespace, name string) (*v1.ConfigMap, error) {
434 var cm *v1.ConfigMap
435 var lastError error
436 err := wait.PollUntilContextTimeout(context.Background(),
437 time.Millisecond*50, time.Millisecond*350,
438 true, func(_ context.Context) (bool, error) {
439 var err error
440
441
442
443 cm, err = client.CoreV1().ConfigMaps(namespace).Get(context.Background(), name, metav1.GetOptions{})
444 if err == nil {
445 return true, nil
446 }
447 lastError = err
448 return false, nil
449 })
450 if err == nil {
451 return cm, nil
452 }
453 return nil, lastError
454 }
455
View as plain text