...

Source file src/k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient/wait.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/util/apiclient

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apiclient
    18  
    19  import (
    20  	"context"
    21  	"crypto/tls"
    22  	"fmt"
    23  	"io"
    24  	"net/http"
    25  	"time"
    26  
    27  	"github.com/pkg/errors"
    28  
    29  	v1 "k8s.io/api/core/v1"
    30  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    31  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    32  	utilerrors "k8s.io/apimachinery/pkg/util/errors"
    33  	netutil "k8s.io/apimachinery/pkg/util/net"
    34  	"k8s.io/apimachinery/pkg/util/wait"
    35  	clientset "k8s.io/client-go/kubernetes"
    36  
    37  	kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
    38  	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
    39  )
    40  
// Waiter is an interface for waiting for criteria in Kubernetes to happen.
// Every wait is bounded by a timeout that can be changed with SetTimeout.
type Waiter interface {
	// WaitForControlPlaneComponents waits for all control plane components to report "ok" on /healthz
	WaitForControlPlaneComponents(cfg *kubeadmapi.ClusterConfiguration) error
	// WaitForAPI waits for the API Server's /healthz endpoint to become "ok"
	// TODO: remove WaitForAPI once WaitForAllControlPlaneComponents goes GA:
	// https://github.com/kubernetes/kubeadm/issues/2907
	WaitForAPI() error
	// WaitForPodsWithLabel waits for Pods in the kube-system namespace that match the
	// given label selector to become Ready (the KubeWaiter implementation in this file
	// checks that at least one Pod exists and that all of them are in the Running phase)
	WaitForPodsWithLabel(kvLabel string) error
	// WaitForPodToDisappear waits for the given Pod in the kube-system namespace to be deleted
	WaitForPodToDisappear(staticPodName string) error
	// WaitForStaticPodSingleHash fetches sha256 hash for the control plane static pod
	// of the given component on the given node
	WaitForStaticPodSingleHash(nodeName string, component string) (string, error)
	// WaitForStaticPodHashChange waits for the given static pod component's static pod hash to get updated.
	// By doing that we can be sure that the kubelet has restarted the given Static Pod
	WaitForStaticPodHashChange(nodeName, component, previousHash string) error
	// WaitForStaticPodControlPlaneHashes fetches sha256 hashes for the control plane static pods.
	// The returned map is keyed by component name
	WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
	// WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'
	WaitForKubelet() error
	// SetTimeout adjusts the timeout to the specified duration
	SetTimeout(timeout time.Duration)
}
    65  
// KubeWaiter is an implementation of Waiter that is backed by a Kubernetes client
type KubeWaiter struct {
	// client is used for the requests sent to the API server (health check and Pod lookups).
	client  clientset.Interface
	// timeout is the upper bound passed to every poll loop; see SetTimeout.
	timeout time.Duration
	// writer receives the progress messages emitted by WaitForPodsWithLabel.
	writer  io.Writer
}
    72  
    73  // NewKubeWaiter returns a new Waiter object that talks to the given Kubernetes cluster
    74  func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io.Writer) Waiter {
    75  	return &KubeWaiter{
    76  		client:  client,
    77  		timeout: timeout,
    78  		writer:  writer,
    79  	}
    80  }
    81  
// controlPlaneComponent pairs a control plane component with the health
// endpoint that is polled for it.
type controlPlaneComponent struct {
	// name is the component name used in log and error messages.
	name string
	// url is the full /healthz URL that is requested for the component.
	url  string
}
    86  
    87  // getControlPlaneComponents takes a ClusterConfiguration and returns a slice of
    88  // control plane components and their secure ports.
    89  func getControlPlaneComponents(cfg *kubeadmapi.ClusterConfiguration) []controlPlaneComponent {
    90  	portArg := "secure-port"
    91  	portAPIServer, idx := kubeadmapi.GetArgValue(cfg.APIServer.ExtraArgs, portArg, -1)
    92  	if idx == -1 {
    93  		portAPIServer = "6443"
    94  	}
    95  	portKCM, idx := kubeadmapi.GetArgValue(cfg.ControllerManager.ExtraArgs, portArg, -1)
    96  	if idx == -1 {
    97  		portKCM = "10257"
    98  	}
    99  	portScheduler, idx := kubeadmapi.GetArgValue(cfg.Scheduler.ExtraArgs, portArg, -1)
   100  	if idx == -1 {
   101  		portScheduler = "10259"
   102  	}
   103  	urlFormat := "https://127.0.0.1:%s/healthz"
   104  	return []controlPlaneComponent{
   105  		{name: "kube-apiserver", url: fmt.Sprintf(urlFormat, portAPIServer)},
   106  		{name: "kube-controller-manager", url: fmt.Sprintf(urlFormat, portKCM)},
   107  		{name: "kube-scheduler", url: fmt.Sprintf(urlFormat, portScheduler)},
   108  	}
   109  }
   110  
   111  // WaitForControlPlaneComponents waits for all control plane components to report "ok" on /healthz
   112  func (w *KubeWaiter) WaitForControlPlaneComponents(cfg *kubeadmapi.ClusterConfiguration) error {
   113  	fmt.Printf("[control-plane-check] Waiting for healthy control plane components."+
   114  		" This can take up to %v\n", w.timeout)
   115  
   116  	components := getControlPlaneComponents(cfg)
   117  
   118  	var errs []error
   119  	errChan := make(chan error, len(components))
   120  
   121  	for _, comp := range components {
   122  		fmt.Printf("[control-plane-check] Checking %s at %s\n", comp.name, comp.url)
   123  
   124  		go func(comp controlPlaneComponent) {
   125  			tr := &http.Transport{
   126  				TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
   127  			}
   128  			client := &http.Client{Transport: tr}
   129  			start := time.Now()
   130  			var lastError error
   131  
   132  			err := wait.PollUntilContextTimeout(
   133  				context.Background(),
   134  				constants.KubernetesAPICallRetryInterval,
   135  				w.timeout,
   136  				true, func(ctx context.Context) (bool, error) {
   137  					resp, err := client.Get(comp.url)
   138  					if err != nil {
   139  						lastError = errors.WithMessagef(err, "%s /healthz check failed", comp.name)
   140  						return false, nil
   141  					}
   142  
   143  					defer func() {
   144  						_ = resp.Body.Close()
   145  					}()
   146  					if resp.StatusCode != http.StatusOK {
   147  						lastError = errors.Errorf("%s /healthz check failed with status: %d", comp.name, resp.StatusCode)
   148  						return false, nil
   149  					}
   150  
   151  					return true, nil
   152  				})
   153  			if err != nil {
   154  				fmt.Printf("[control-plane-check] %s is not healthy after %v\n", comp.name, time.Since(start))
   155  				errChan <- lastError
   156  				return
   157  			}
   158  			fmt.Printf("[control-plane-check] %s is healthy after %v\n", comp.name, time.Since(start))
   159  			errChan <- nil
   160  		}(comp)
   161  	}
   162  
   163  	for i := 0; i < len(components); i++ {
   164  		if err := <-errChan; err != nil {
   165  			errs = append(errs, err)
   166  		}
   167  	}
   168  	return utilerrors.NewAggregate(errs)
   169  }
   170  
   171  // WaitForAPI waits for the API Server's /healthz endpoint to report "ok"
   172  func (w *KubeWaiter) WaitForAPI() error {
   173  	fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout)
   174  
   175  	start := time.Now()
   176  	err := wait.PollUntilContextTimeout(
   177  		context.Background(),
   178  		constants.KubernetesAPICallRetryInterval,
   179  		w.timeout,
   180  		true, func(ctx context.Context) (bool, error) {
   181  			healthStatus := 0
   182  			w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
   183  			if healthStatus != http.StatusOK {
   184  				return false, nil
   185  			}
   186  			return true, nil
   187  		})
   188  	if err != nil {
   189  		fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start))
   190  		return err
   191  	}
   192  
   193  	fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start))
   194  	return nil
   195  }
   196  
   197  // WaitForPodsWithLabel will lookup pods with the given label and wait until they are all
   198  // reporting status as running.
   199  func (w *KubeWaiter) WaitForPodsWithLabel(kvLabel string) error {
   200  
   201  	lastKnownPodNumber := -1
   202  	return wait.PollUntilContextTimeout(context.Background(),
   203  		constants.KubernetesAPICallRetryInterval, w.timeout,
   204  		true, func(_ context.Context) (bool, error) {
   205  			listOpts := metav1.ListOptions{LabelSelector: kvLabel}
   206  			pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts)
   207  			if err != nil {
   208  				fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err)
   209  				return false, nil
   210  			}
   211  
   212  			if lastKnownPodNumber != len(pods.Items) {
   213  				fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel)
   214  				lastKnownPodNumber = len(pods.Items)
   215  			}
   216  
   217  			if len(pods.Items) == 0 {
   218  				return false, nil
   219  			}
   220  
   221  			for _, pod := range pods.Items {
   222  				if pod.Status.Phase != v1.PodRunning {
   223  					return false, nil
   224  				}
   225  			}
   226  
   227  			return true, nil
   228  		})
   229  }
   230  
   231  // WaitForPodToDisappear blocks until it timeouts or gets a "NotFound" response from the API Server when getting the Static Pod in question
   232  func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
   233  	return wait.PollUntilContextTimeout(context.Background(),
   234  		constants.KubernetesAPICallRetryInterval, w.timeout,
   235  		true, func(_ context.Context) (bool, error) {
   236  			_, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), podName, metav1.GetOptions{})
   237  			if err != nil && apierrors.IsNotFound(err) {
   238  				fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
   239  				return true, nil
   240  			}
   241  			return false, nil
   242  		})
   243  }
   244  
   245  // WaitForKubelet blocks until the kubelet /healthz endpoint returns 'ok'.
   246  func (w *KubeWaiter) WaitForKubelet() error {
   247  	var (
   248  		lastError       error
   249  		start           = time.Now()
   250  		healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", constants.KubeletHealthzPort)
   251  	)
   252  
   253  	fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout)
   254  
   255  	formatError := func(cause string) error {
   256  		return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n",
   257  			healthzEndpoint, cause)
   258  	}
   259  
   260  	err := wait.PollUntilContextTimeout(
   261  		context.Background(),
   262  		constants.KubernetesAPICallRetryInterval,
   263  		w.timeout,
   264  		true, func(ctx context.Context) (bool, error) {
   265  			client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
   266  			req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil)
   267  			if err != nil {
   268  				lastError = formatError(fmt.Sprintf("error: %v", err))
   269  				return false, err
   270  			}
   271  			resp, err := client.Do(req)
   272  			if err != nil {
   273  				lastError = formatError(fmt.Sprintf("error: %v", err))
   274  				return false, nil
   275  			}
   276  			defer func() {
   277  				_ = resp.Body.Close()
   278  			}()
   279  			if resp.StatusCode != http.StatusOK {
   280  				lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode))
   281  				return false, nil
   282  			}
   283  
   284  			return true, nil
   285  		})
   286  	if err != nil {
   287  		fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start))
   288  		return lastError
   289  	}
   290  
   291  	fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start))
   292  	return nil
   293  }
   294  
// SetTimeout adjusts the timeout to the specified duration.
// The new value is used by wait operations started after this call.
func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
	w.timeout = timeout
}
   299  
   300  // WaitForStaticPodControlPlaneHashes blocks until it timeouts or gets a hash map for all components and their Static Pods
   301  func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) {
   302  
   303  	componentHash := ""
   304  	var err, lastErr error
   305  	mirrorPodHashes := map[string]string{}
   306  	for _, component := range constants.ControlPlaneComponents {
   307  		err = wait.PollUntilContextTimeout(context.Background(),
   308  			constants.KubernetesAPICallRetryInterval, w.timeout,
   309  			true, func(_ context.Context) (bool, error) {
   310  				componentHash, err = getStaticPodSingleHash(w.client, nodeName, component)
   311  				if err != nil {
   312  					lastErr = err
   313  					return false, nil
   314  				}
   315  				return true, nil
   316  			})
   317  		if err != nil {
   318  			return nil, lastErr
   319  		}
   320  		mirrorPodHashes[component] = componentHash
   321  	}
   322  
   323  	return mirrorPodHashes, nil
   324  }
   325  
   326  // WaitForStaticPodSingleHash blocks until it timeouts or gets a hash for a single component and its Static Pod
   327  func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component string) (string, error) {
   328  
   329  	componentPodHash := ""
   330  	var err, lastErr error
   331  	err = wait.PollUntilContextTimeout(context.Background(),
   332  		constants.KubernetesAPICallRetryInterval, w.timeout,
   333  		true, func(_ context.Context) (bool, error) {
   334  			componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component)
   335  			if err != nil {
   336  				lastErr = err
   337  				return false, nil
   338  			}
   339  			return true, nil
   340  		})
   341  
   342  	if err != nil {
   343  		err = lastErr
   344  	}
   345  	return componentPodHash, err
   346  }
   347  
   348  // WaitForStaticPodHashChange blocks until it timeouts or notices that the Mirror Pod (for the Static Pod, respectively) has changed
   349  // This implicitly means this function blocks until the kubelet has restarted the Static Pod in question
   350  func (w *KubeWaiter) WaitForStaticPodHashChange(nodeName, component, previousHash string) error {
   351  	var err, lastErr error
   352  	err = wait.PollUntilContextTimeout(context.Background(),
   353  		constants.KubernetesAPICallRetryInterval, w.timeout,
   354  		true, func(_ context.Context) (bool, error) {
   355  			hash, err := getStaticPodSingleHash(w.client, nodeName, component)
   356  			if err != nil {
   357  				lastErr = err
   358  				return false, nil
   359  			}
   360  			// Set lastErr to nil to be able to later distinguish between getStaticPodSingleHash() and timeout errors
   361  			lastErr = nil
   362  			// We should continue polling until the UID changes
   363  			if hash == previousHash {
   364  				return false, nil
   365  			}
   366  
   367  			return true, nil
   368  		})
   369  
   370  	// If lastError is not nil, this must be a getStaticPodSingleHash() error, else if err is not nil there was a poll timeout
   371  	if lastErr != nil {
   372  		return lastErr
   373  	}
   374  	return errors.Wrapf(err, "static Pod hash for component %s on Node %s did not change after %v", component, nodeName, w.timeout)
   375  }
   376  
   377  // getStaticPodSingleHash computes hashes for a single Static Pod resource
   378  func getStaticPodSingleHash(client clientset.Interface, nodeName string, component string) (string, error) {
   379  
   380  	staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
   381  	staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), staticPodName, metav1.GetOptions{})
   382  	if err != nil {
   383  		return "", errors.Wrapf(err, "failed to obtain static Pod hash for component %s on Node %s", component, nodeName)
   384  	}
   385  
   386  	staticPodHash := staticPod.Annotations["kubernetes.io/config.hash"]
   387  	return staticPodHash, nil
   388  }
   389  

View as plain text