...

Source file src/k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient/idempotency.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiclient
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"time"
    23  
    24  	"github.com/pkg/errors"
    25  
    26  	apps "k8s.io/api/apps/v1"
    27  	v1 "k8s.io/api/core/v1"
    28  	rbac "k8s.io/api/rbac/v1"
    29  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    30  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    31  	"k8s.io/apimachinery/pkg/types"
    32  	"k8s.io/apimachinery/pkg/util/strategicpatch"
    33  	"k8s.io/apimachinery/pkg/util/wait"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  
    36  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    37  	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
    38  )
    39  
// ConfigMapMutator is a callback that modifies the given ConfigMap in place.
// A non-nil return value signals that the mutation failed.
type ConfigMapMutator func(*v1.ConfigMap) error
    42  
// apiCallRetryInterval holds a local copy of constants.KubernetesAPICallRetryInterval
// so that it can be overridden for testing purposes. It is the interval between
// attempts in the polling helpers of this file.
var apiCallRetryInterval = constants.KubernetesAPICallRetryInterval
    45  
    46  // TODO: We should invent a dynamic mechanism for this using the dynamic client instead of hard-coding these functions per-type
    47  
    48  // CreateOrUpdateConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
    49  func CreateOrUpdateConfigMap(client clientset.Interface, cm *v1.ConfigMap) error {
    50  	var lastError error
    51  	err := wait.PollUntilContextTimeout(context.Background(),
    52  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
    53  		true, func(_ context.Context) (bool, error) {
    54  			ctx := context.Background()
    55  			if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
    56  				if !apierrors.IsAlreadyExists(err) {
    57  					lastError = errors.Wrap(err, "unable to create ConfigMap")
    58  					return false, nil
    59  				}
    60  				if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Update(ctx, cm, metav1.UpdateOptions{}); err != nil {
    61  					lastError = errors.Wrap(err, "unable to update ConfigMap")
    62  					return false, nil
    63  				}
    64  			}
    65  			return true, nil
    66  		})
    67  	if err == nil {
    68  		return nil
    69  	}
    70  	return lastError
    71  }
    72  
    73  // CreateOrMutateConfigMap tries to create the ConfigMap provided as cm. If the resource exists already, the latest version will be fetched from
    74  // the cluster and mutator callback will be called on it, then an Update of the mutated ConfigMap will be performed. This function is resilient
    75  // to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
    76  // taking place)
    77  func CreateOrMutateConfigMap(client clientset.Interface, cm *v1.ConfigMap, mutator ConfigMapMutator) error {
    78  	var lastError error
    79  	err := wait.PollUntilContextTimeout(context.Background(),
    80  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
    81  		true, func(_ context.Context) (bool, error) {
    82  			if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(context.Background(), cm, metav1.CreateOptions{}); err != nil {
    83  				lastError = err
    84  				if apierrors.IsAlreadyExists(err) {
    85  					lastError = mutateConfigMap(client, metav1.ObjectMeta{Namespace: cm.ObjectMeta.Namespace, Name: cm.ObjectMeta.Name}, mutator)
    86  					return lastError == nil, nil
    87  				}
    88  				return false, nil
    89  			}
    90  			return true, nil
    91  		})
    92  	if err == nil {
    93  		return nil
    94  	}
    95  	return lastError
    96  }
    97  
    98  // mutateConfigMap takes a ConfigMap Object Meta (namespace and name), retrieves the resource from the server and tries to mutate it
    99  // by calling to the mutator callback, then an Update of the mutated ConfigMap will be performed. This function is resilient
   100  // to conflicts, and a retry will be issued if the ConfigMap was modified on the server between the refresh and the update (while the mutation was
   101  // taking place).
   102  func mutateConfigMap(client clientset.Interface, meta metav1.ObjectMeta, mutator ConfigMapMutator) error {
   103  	ctx := context.Background()
   104  	configMap, err := client.CoreV1().ConfigMaps(meta.Namespace).Get(ctx, meta.Name, metav1.GetOptions{})
   105  	if err != nil {
   106  		return errors.Wrap(err, "unable to get ConfigMap")
   107  	}
   108  	if err = mutator(configMap); err != nil {
   109  		return errors.Wrap(err, "unable to mutate ConfigMap")
   110  	}
   111  	_, err = client.CoreV1().ConfigMaps(configMap.ObjectMeta.Namespace).Update(ctx, configMap, metav1.UpdateOptions{})
   112  	return err
   113  }
   114  
   115  // CreateOrRetainConfigMap creates a ConfigMap if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
   116  func CreateOrRetainConfigMap(client clientset.Interface, cm *v1.ConfigMap, configMapName string) error {
   117  	var lastError error
   118  	err := wait.PollUntilContextTimeout(context.Background(),
   119  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   120  		true, func(_ context.Context) (bool, error) {
   121  			ctx := context.Background()
   122  			if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Get(ctx, configMapName, metav1.GetOptions{}); err != nil {
   123  				if !apierrors.IsNotFound(err) {
   124  					lastError = errors.Wrap(err, "unable to get ConfigMap")
   125  					return false, nil
   126  				}
   127  				if _, err := client.CoreV1().ConfigMaps(cm.ObjectMeta.Namespace).Create(ctx, cm, metav1.CreateOptions{}); err != nil {
   128  					lastError = errors.Wrap(err, "unable to create ConfigMap")
   129  					return false, nil
   130  				}
   131  			}
   132  			return true, nil
   133  		})
   134  	if err == nil {
   135  		return nil
   136  	}
   137  	return lastError
   138  }
   139  
   140  // CreateOrUpdateSecret creates a Secret if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   141  func CreateOrUpdateSecret(client clientset.Interface, secret *v1.Secret) error {
   142  	var lastError error
   143  	err := wait.PollUntilContextTimeout(context.Background(),
   144  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   145  		true, func(_ context.Context) (bool, error) {
   146  			ctx := context.Background()
   147  			if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Create(ctx, secret, metav1.CreateOptions{}); err != nil {
   148  				if !apierrors.IsAlreadyExists(err) {
   149  					lastError = errors.Wrap(err, "unable to create Secret")
   150  					return false, nil
   151  				}
   152  				if _, err := client.CoreV1().Secrets(secret.ObjectMeta.Namespace).Update(ctx, secret, metav1.UpdateOptions{}); err != nil {
   153  					lastError = errors.Wrap(err, "unable to update Secret")
   154  					return false, nil
   155  				}
   156  			}
   157  			return true, nil
   158  		})
   159  	if err == nil {
   160  		return nil
   161  	}
   162  	return lastError
   163  }
   164  
   165  // CreateOrUpdateServiceAccount creates a ServiceAccount if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   166  func CreateOrUpdateServiceAccount(client clientset.Interface, sa *v1.ServiceAccount) error {
   167  	var lastError error
   168  	err := wait.PollUntilContextTimeout(context.Background(),
   169  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   170  		true, func(_ context.Context) (bool, error) {
   171  			ctx := context.Background()
   172  			if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Create(ctx, sa, metav1.CreateOptions{}); err != nil {
   173  				if !apierrors.IsAlreadyExists(err) {
   174  					lastError = errors.Wrap(err, "unable to create ServicAccount")
   175  					return false, nil
   176  				}
   177  				if _, err := client.CoreV1().ServiceAccounts(sa.ObjectMeta.Namespace).Update(ctx, sa, metav1.UpdateOptions{}); err != nil {
   178  					lastError = errors.Wrap(err, "unable to update ServicAccount")
   179  					return false, nil
   180  				}
   181  			}
   182  			return true, nil
   183  		})
   184  	if err == nil {
   185  		return nil
   186  	}
   187  	return lastError
   188  }
   189  
   190  // CreateOrUpdateDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   191  func CreateOrUpdateDeployment(client clientset.Interface, deploy *apps.Deployment) error {
   192  	var lastError error
   193  	err := wait.PollUntilContextTimeout(context.Background(),
   194  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   195  		true, func(_ context.Context) (bool, error) {
   196  			ctx := context.Background()
   197  			if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
   198  				if !apierrors.IsAlreadyExists(err) {
   199  					lastError = errors.Wrap(err, "unable to create Deployment")
   200  					return false, nil
   201  				}
   202  				if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Update(ctx, deploy, metav1.UpdateOptions{}); err != nil {
   203  					lastError = errors.Wrap(err, "unable to update Deployment")
   204  					return false, nil
   205  				}
   206  			}
   207  			return true, nil
   208  		})
   209  	if err == nil {
   210  		return nil
   211  	}
   212  	return lastError
   213  }
   214  
   215  // CreateOrRetainDeployment creates a Deployment if the target resource doesn't exist. If the resource exists already, this function will retain the resource instead.
   216  func CreateOrRetainDeployment(client clientset.Interface, deploy *apps.Deployment, deployName string) error {
   217  	var lastError error
   218  	err := wait.PollUntilContextTimeout(context.Background(),
   219  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   220  		true, func(_ context.Context) (bool, error) {
   221  			ctx := context.Background()
   222  			if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Get(ctx, deployName, metav1.GetOptions{}); err != nil {
   223  				if !apierrors.IsNotFound(err) {
   224  					lastError = errors.Wrap(err, "unable to get Deployment")
   225  					return false, nil
   226  				}
   227  				if _, err := client.AppsV1().Deployments(deploy.ObjectMeta.Namespace).Create(ctx, deploy, metav1.CreateOptions{}); err != nil {
   228  					if !apierrors.IsAlreadyExists(err) {
   229  						lastError = errors.Wrap(err, "unable to create Deployment")
   230  						return false, nil
   231  					}
   232  				}
   233  			}
   234  			return true, nil
   235  		})
   236  	if err == nil {
   237  		return nil
   238  	}
   239  	return lastError
   240  }
   241  
   242  // CreateOrUpdateDaemonSet creates a DaemonSet if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   243  func CreateOrUpdateDaemonSet(client clientset.Interface, ds *apps.DaemonSet) error {
   244  	var lastError error
   245  	err := wait.PollUntilContextTimeout(context.Background(),
   246  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   247  		true, func(_ context.Context) (bool, error) {
   248  			ctx := context.Background()
   249  			if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Create(ctx, ds, metav1.CreateOptions{}); err != nil {
   250  				if !apierrors.IsAlreadyExists(err) {
   251  					lastError = errors.Wrap(err, "unable to create DaemonSet")
   252  					return false, nil
   253  				}
   254  				if _, err := client.AppsV1().DaemonSets(ds.ObjectMeta.Namespace).Update(ctx, ds, metav1.UpdateOptions{}); err != nil {
   255  					lastError = errors.Wrap(err, "unable to update DaemonSet")
   256  					return false, nil
   257  				}
   258  			}
   259  			return true, nil
   260  		})
   261  	if err == nil {
   262  		return nil
   263  	}
   264  	return lastError
   265  }
   266  
   267  // CreateOrUpdateRole creates a Role if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   268  func CreateOrUpdateRole(client clientset.Interface, role *rbac.Role) error {
   269  	var lastError error
   270  	err := wait.PollUntilContextTimeout(context.Background(),
   271  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   272  		true, func(_ context.Context) (bool, error) {
   273  			ctx := context.Background()
   274  			if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Create(ctx, role, metav1.CreateOptions{}); err != nil {
   275  				if !apierrors.IsAlreadyExists(err) {
   276  					lastError = errors.Wrap(err, "unable to create Role")
   277  					return false, nil
   278  				}
   279  				if _, err := client.RbacV1().Roles(role.ObjectMeta.Namespace).Update(ctx, role, metav1.UpdateOptions{}); err != nil {
   280  					lastError = errors.Wrap(err, "unable to update Role")
   281  					return false, nil
   282  				}
   283  			}
   284  			return true, nil
   285  		})
   286  	if err == nil {
   287  		return nil
   288  	}
   289  	return lastError
   290  }
   291  
   292  // CreateOrUpdateRoleBinding creates a RoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   293  func CreateOrUpdateRoleBinding(client clientset.Interface, roleBinding *rbac.RoleBinding) error {
   294  	var lastError error
   295  	err := wait.PollUntilContextTimeout(context.Background(),
   296  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   297  		true, func(_ context.Context) (bool, error) {
   298  			ctx := context.Background()
   299  			if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Create(ctx, roleBinding, metav1.CreateOptions{}); err != nil {
   300  				if !apierrors.IsAlreadyExists(err) {
   301  					lastError = errors.Wrap(err, "unable to create RoleBinding")
   302  					return false, nil
   303  				}
   304  				if _, err := client.RbacV1().RoleBindings(roleBinding.ObjectMeta.Namespace).Update(ctx, roleBinding, metav1.UpdateOptions{}); err != nil {
   305  					lastError = errors.Wrap(err, "unable to update RoleBinding")
   306  					return false, nil
   307  				}
   308  			}
   309  			return true, nil
   310  		})
   311  	if err == nil {
   312  		return nil
   313  	}
   314  	return lastError
   315  }
   316  
   317  // CreateOrUpdateClusterRole creates a ClusterRole if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   318  func CreateOrUpdateClusterRole(client clientset.Interface, clusterRole *rbac.ClusterRole) error {
   319  	var lastError error
   320  	err := wait.PollUntilContextTimeout(context.Background(),
   321  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   322  		true, func(_ context.Context) (bool, error) {
   323  			ctx := context.Background()
   324  			if _, err := client.RbacV1().ClusterRoles().Create(ctx, clusterRole, metav1.CreateOptions{}); err != nil {
   325  				if !apierrors.IsAlreadyExists(err) {
   326  					lastError = errors.Wrap(err, "unable to create ClusterRole")
   327  					return false, nil
   328  				}
   329  				if _, err := client.RbacV1().ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{}); err != nil {
   330  					lastError = errors.Wrap(err, "unable to update ClusterRole")
   331  					return false, nil
   332  				}
   333  			}
   334  			return true, nil
   335  		})
   336  	if err == nil {
   337  		return nil
   338  	}
   339  	return lastError
   340  }
   341  
   342  // CreateOrUpdateClusterRoleBinding creates a ClusterRoleBinding if the target resource doesn't exist. If the resource exists already, this function will update the resource instead.
   343  func CreateOrUpdateClusterRoleBinding(client clientset.Interface, clusterRoleBinding *rbac.ClusterRoleBinding) error {
   344  	var lastError error
   345  	err := wait.PollUntilContextTimeout(context.Background(),
   346  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   347  		true, func(_ context.Context) (bool, error) {
   348  			ctx := context.Background()
   349  			if _, err := client.RbacV1().ClusterRoleBindings().Create(ctx, clusterRoleBinding, metav1.CreateOptions{}); err != nil {
   350  				if !apierrors.IsAlreadyExists(err) {
   351  					lastError = errors.Wrap(err, "unable to create ClusterRoleBinding")
   352  					return false, nil
   353  				}
   354  				if _, err := client.RbacV1().ClusterRoleBindings().Update(ctx, clusterRoleBinding, metav1.UpdateOptions{}); err != nil {
   355  					lastError = errors.Wrap(err, "unable to update ClusterRoleBinding")
   356  					return false, nil
   357  				}
   358  			}
   359  			return true, nil
   360  		})
   361  	if err == nil {
   362  		return nil
   363  	}
   364  	return lastError
   365  }
   366  
   367  // PatchNodeOnce executes patchFn on the node object found by the node name.
   368  func PatchNodeOnce(client clientset.Interface, nodeName string, patchFn func(*v1.Node), lastError *error) func(context.Context) (bool, error) {
   369  	return func(_ context.Context) (bool, error) {
   370  		// First get the node object
   371  		ctx := context.Background()
   372  		n, err := client.CoreV1().Nodes().Get(ctx, nodeName, metav1.GetOptions{})
   373  		if err != nil {
   374  			*lastError = err
   375  			return false, nil // retry on any error
   376  		}
   377  
   378  		// The node may appear to have no labels at first,
   379  		// so we wait for it to get hostname label.
   380  		if _, found := n.ObjectMeta.Labels[v1.LabelHostname]; !found {
   381  			return false, nil
   382  		}
   383  
   384  		oldData, err := json.Marshal(n)
   385  		if err != nil {
   386  			*lastError = errors.Wrapf(err, "failed to marshal unmodified node %q into JSON", n.Name)
   387  			return false, *lastError
   388  		}
   389  
   390  		// Execute the mutating function
   391  		patchFn(n)
   392  
   393  		newData, err := json.Marshal(n)
   394  		if err != nil {
   395  			*lastError = errors.Wrapf(err, "failed to marshal modified node %q into JSON", n.Name)
   396  			return false, *lastError
   397  		}
   398  
   399  		patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
   400  		if err != nil {
   401  			*lastError = errors.Wrap(err, "failed to create two way merge patch")
   402  			return false, *lastError
   403  		}
   404  
   405  		if _, err := client.CoreV1().Nodes().Patch(ctx, n.Name, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil {
   406  			*lastError = errors.Wrapf(err, "error patching Node %q", n.Name)
   407  			if apierrors.IsTimeout(err) || apierrors.IsConflict(err) || apierrors.IsServerTimeout(err) || apierrors.IsServiceUnavailable(err) {
   408  				return false, nil
   409  			}
   410  			return false, *lastError
   411  		}
   412  
   413  		return true, nil
   414  	}
   415  }
   416  
   417  // PatchNode tries to patch a node using patchFn for the actual mutating logic.
   418  // Retries are provided by the wait package.
   419  func PatchNode(client clientset.Interface, nodeName string, patchFn func(*v1.Node)) error {
   420  	var lastError error
   421  	err := wait.PollUntilContextTimeout(context.Background(),
   422  		apiCallRetryInterval, kubeadmapi.GetActiveTimeouts().KubernetesAPICall.Duration,
   423  		true, PatchNodeOnce(client, nodeName, patchFn, &lastError))
   424  	if err == nil {
   425  		return nil
   426  	}
   427  	return lastError
   428  }
   429  
   430  // GetConfigMapWithShortRetry tries to retrieve a ConfigMap using the given client, retrying for a short
   431  // time if it gets an unexpected error. The main usage of this function is in areas of the code that
   432  // fallback to a default ConfigMap value in case the one from the API cannot be quickly obtained.
   433  func GetConfigMapWithShortRetry(client clientset.Interface, namespace, name string) (*v1.ConfigMap, error) {
   434  	var cm *v1.ConfigMap
   435  	var lastError error
   436  	err := wait.PollUntilContextTimeout(context.Background(),
   437  		time.Millisecond*50, time.Millisecond*350,
   438  		true, func(_ context.Context) (bool, error) {
   439  			var err error
   440  			// Intentionally pass a new context to this API call. This will let the API call run
   441  			// independently of the parent context timeout, which is quite short and can cause the API
   442  			// call to return abruptly.
   443  			cm, err = client.CoreV1().ConfigMaps(namespace).Get(context.Background(), name, metav1.GetOptions{})
   444  			if err == nil {
   445  				return true, nil
   446  			}
   447  			lastError = err
   448  			return false, nil
   449  		})
   450  	if err == nil {
   451  		return cm, nil
   452  	}
   453  	return nil, lastError
   454  }
   455  

View as plain text