1
16
17 package apiclient
18
19 import (
20 "context"
21 "crypto/tls"
22 "fmt"
23 "io"
24 "net/http"
25 "time"
26
27 "github.com/pkg/errors"
28
29 v1 "k8s.io/api/core/v1"
30 apierrors "k8s.io/apimachinery/pkg/api/errors"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 utilerrors "k8s.io/apimachinery/pkg/util/errors"
33 netutil "k8s.io/apimachinery/pkg/util/net"
34 "k8s.io/apimachinery/pkg/util/wait"
35 clientset "k8s.io/client-go/kubernetes"
36
37 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
38 "k8s.io/kubernetes/cmd/kubeadm/app/constants"
39 )
40
41
42 type Waiter interface {
43
44 WaitForControlPlaneComponents(cfg *kubeadmapi.ClusterConfiguration) error
45
46
47
48 WaitForAPI() error
49
50 WaitForPodsWithLabel(kvLabel string) error
51
52 WaitForPodToDisappear(staticPodName string) error
53
54 WaitForStaticPodSingleHash(nodeName string, component string) (string, error)
55
56
57 WaitForStaticPodHashChange(nodeName, component, previousHash string) error
58
59 WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error)
60
61 WaitForKubelet() error
62
63 SetTimeout(timeout time.Duration)
64 }
65
66
// KubeWaiter implements Waiter by querying a cluster through a clientset and by
// probing local HTTP health endpoints.
type KubeWaiter struct {
	// client is used for the API server health check and for Pod lookups.
	client clientset.Interface
	// timeout bounds every wait; it is set by NewKubeWaiter and by SetTimeout.
	timeout time.Duration
	// writer receives the progress messages printed by WaitForPodsWithLabel.
	writer io.Writer
}
72
73
74 func NewKubeWaiter(client clientset.Interface, timeout time.Duration, writer io.Writer) Waiter {
75 return &KubeWaiter{
76 client: client,
77 timeout: timeout,
78 writer: writer,
79 }
80 }
81
// controlPlaneComponent pairs a control plane component's name with the
// health-check URL that is polled for it.
type controlPlaneComponent struct {
	name, url string
}
86
87
88
89 func getControlPlaneComponents(cfg *kubeadmapi.ClusterConfiguration) []controlPlaneComponent {
90 portArg := "secure-port"
91 portAPIServer, idx := kubeadmapi.GetArgValue(cfg.APIServer.ExtraArgs, portArg, -1)
92 if idx == -1 {
93 portAPIServer = "6443"
94 }
95 portKCM, idx := kubeadmapi.GetArgValue(cfg.ControllerManager.ExtraArgs, portArg, -1)
96 if idx == -1 {
97 portKCM = "10257"
98 }
99 portScheduler, idx := kubeadmapi.GetArgValue(cfg.Scheduler.ExtraArgs, portArg, -1)
100 if idx == -1 {
101 portScheduler = "10259"
102 }
103 urlFormat := "https://127.0.0.1:%s/healthz"
104 return []controlPlaneComponent{
105 {name: "kube-apiserver", url: fmt.Sprintf(urlFormat, portAPIServer)},
106 {name: "kube-controller-manager", url: fmt.Sprintf(urlFormat, portKCM)},
107 {name: "kube-scheduler", url: fmt.Sprintf(urlFormat, portScheduler)},
108 }
109 }
110
111
112 func (w *KubeWaiter) WaitForControlPlaneComponents(cfg *kubeadmapi.ClusterConfiguration) error {
113 fmt.Printf("[control-plane-check] Waiting for healthy control plane components."+
114 " This can take up to %v\n", w.timeout)
115
116 components := getControlPlaneComponents(cfg)
117
118 var errs []error
119 errChan := make(chan error, len(components))
120
121 for _, comp := range components {
122 fmt.Printf("[control-plane-check] Checking %s at %s\n", comp.name, comp.url)
123
124 go func(comp controlPlaneComponent) {
125 tr := &http.Transport{
126 TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
127 }
128 client := &http.Client{Transport: tr}
129 start := time.Now()
130 var lastError error
131
132 err := wait.PollUntilContextTimeout(
133 context.Background(),
134 constants.KubernetesAPICallRetryInterval,
135 w.timeout,
136 true, func(ctx context.Context) (bool, error) {
137 resp, err := client.Get(comp.url)
138 if err != nil {
139 lastError = errors.WithMessagef(err, "%s /healthz check failed", comp.name)
140 return false, nil
141 }
142
143 defer func() {
144 _ = resp.Body.Close()
145 }()
146 if resp.StatusCode != http.StatusOK {
147 lastError = errors.Errorf("%s /healthz check failed with status: %d", comp.name, resp.StatusCode)
148 return false, nil
149 }
150
151 return true, nil
152 })
153 if err != nil {
154 fmt.Printf("[control-plane-check] %s is not healthy after %v\n", comp.name, time.Since(start))
155 errChan <- lastError
156 return
157 }
158 fmt.Printf("[control-plane-check] %s is healthy after %v\n", comp.name, time.Since(start))
159 errChan <- nil
160 }(comp)
161 }
162
163 for i := 0; i < len(components); i++ {
164 if err := <-errChan; err != nil {
165 errs = append(errs, err)
166 }
167 }
168 return utilerrors.NewAggregate(errs)
169 }
170
171
172 func (w *KubeWaiter) WaitForAPI() error {
173 fmt.Printf("[api-check] Waiting for a healthy API server. This can take up to %v\n", w.timeout)
174
175 start := time.Now()
176 err := wait.PollUntilContextTimeout(
177 context.Background(),
178 constants.KubernetesAPICallRetryInterval,
179 w.timeout,
180 true, func(ctx context.Context) (bool, error) {
181 healthStatus := 0
182 w.client.Discovery().RESTClient().Get().AbsPath("/healthz").Do(ctx).StatusCode(&healthStatus)
183 if healthStatus != http.StatusOK {
184 return false, nil
185 }
186 return true, nil
187 })
188 if err != nil {
189 fmt.Printf("[api-check] The API server is not healthy after %v\n", time.Since(start))
190 return err
191 }
192
193 fmt.Printf("[api-check] The API server is healthy after %v\n", time.Since(start))
194 return nil
195 }
196
197
198
199 func (w *KubeWaiter) WaitForPodsWithLabel(kvLabel string) error {
200
201 lastKnownPodNumber := -1
202 return wait.PollUntilContextTimeout(context.Background(),
203 constants.KubernetesAPICallRetryInterval, w.timeout,
204 true, func(_ context.Context) (bool, error) {
205 listOpts := metav1.ListOptions{LabelSelector: kvLabel}
206 pods, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).List(context.TODO(), listOpts)
207 if err != nil {
208 fmt.Fprintf(w.writer, "[apiclient] Error getting Pods with label selector %q [%v]\n", kvLabel, err)
209 return false, nil
210 }
211
212 if lastKnownPodNumber != len(pods.Items) {
213 fmt.Fprintf(w.writer, "[apiclient] Found %d Pods for label selector %s\n", len(pods.Items), kvLabel)
214 lastKnownPodNumber = len(pods.Items)
215 }
216
217 if len(pods.Items) == 0 {
218 return false, nil
219 }
220
221 for _, pod := range pods.Items {
222 if pod.Status.Phase != v1.PodRunning {
223 return false, nil
224 }
225 }
226
227 return true, nil
228 })
229 }
230
231
232 func (w *KubeWaiter) WaitForPodToDisappear(podName string) error {
233 return wait.PollUntilContextTimeout(context.Background(),
234 constants.KubernetesAPICallRetryInterval, w.timeout,
235 true, func(_ context.Context) (bool, error) {
236 _, err := w.client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), podName, metav1.GetOptions{})
237 if err != nil && apierrors.IsNotFound(err) {
238 fmt.Printf("[apiclient] The old Pod %q is now removed (which is desired)\n", podName)
239 return true, nil
240 }
241 return false, nil
242 })
243 }
244
245
246 func (w *KubeWaiter) WaitForKubelet() error {
247 var (
248 lastError error
249 start = time.Now()
250 healthzEndpoint = fmt.Sprintf("http://localhost:%d/healthz", constants.KubeletHealthzPort)
251 )
252
253 fmt.Printf("[kubelet-check] Waiting for a healthy kubelet. This can take up to %v\n", w.timeout)
254
255 formatError := func(cause string) error {
256 return errors.Errorf("The HTTP call equal to 'curl -sSL %s' returned %s\n",
257 healthzEndpoint, cause)
258 }
259
260 err := wait.PollUntilContextTimeout(
261 context.Background(),
262 constants.KubernetesAPICallRetryInterval,
263 w.timeout,
264 true, func(ctx context.Context) (bool, error) {
265 client := &http.Client{Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
266 req, err := http.NewRequestWithContext(ctx, http.MethodGet, healthzEndpoint, nil)
267 if err != nil {
268 lastError = formatError(fmt.Sprintf("error: %v", err))
269 return false, err
270 }
271 resp, err := client.Do(req)
272 if err != nil {
273 lastError = formatError(fmt.Sprintf("error: %v", err))
274 return false, nil
275 }
276 defer func() {
277 _ = resp.Body.Close()
278 }()
279 if resp.StatusCode != http.StatusOK {
280 lastError = formatError(fmt.Sprintf("status code: %d", resp.StatusCode))
281 return false, nil
282 }
283
284 return true, nil
285 })
286 if err != nil {
287 fmt.Printf("[kubelet-check] The kubelet is not healthy after %v\n", time.Since(start))
288 return lastError
289 }
290
291 fmt.Printf("[kubelet-check] The kubelet is healthy after %v\n", time.Since(start))
292 return nil
293 }
294
295
296 func (w *KubeWaiter) SetTimeout(timeout time.Duration) {
297 w.timeout = timeout
298 }
299
300
301 func (w *KubeWaiter) WaitForStaticPodControlPlaneHashes(nodeName string) (map[string]string, error) {
302
303 componentHash := ""
304 var err, lastErr error
305 mirrorPodHashes := map[string]string{}
306 for _, component := range constants.ControlPlaneComponents {
307 err = wait.PollUntilContextTimeout(context.Background(),
308 constants.KubernetesAPICallRetryInterval, w.timeout,
309 true, func(_ context.Context) (bool, error) {
310 componentHash, err = getStaticPodSingleHash(w.client, nodeName, component)
311 if err != nil {
312 lastErr = err
313 return false, nil
314 }
315 return true, nil
316 })
317 if err != nil {
318 return nil, lastErr
319 }
320 mirrorPodHashes[component] = componentHash
321 }
322
323 return mirrorPodHashes, nil
324 }
325
326
327 func (w *KubeWaiter) WaitForStaticPodSingleHash(nodeName string, component string) (string, error) {
328
329 componentPodHash := ""
330 var err, lastErr error
331 err = wait.PollUntilContextTimeout(context.Background(),
332 constants.KubernetesAPICallRetryInterval, w.timeout,
333 true, func(_ context.Context) (bool, error) {
334 componentPodHash, err = getStaticPodSingleHash(w.client, nodeName, component)
335 if err != nil {
336 lastErr = err
337 return false, nil
338 }
339 return true, nil
340 })
341
342 if err != nil {
343 err = lastErr
344 }
345 return componentPodHash, err
346 }
347
348
349
350 func (w *KubeWaiter) WaitForStaticPodHashChange(nodeName, component, previousHash string) error {
351 var err, lastErr error
352 err = wait.PollUntilContextTimeout(context.Background(),
353 constants.KubernetesAPICallRetryInterval, w.timeout,
354 true, func(_ context.Context) (bool, error) {
355 hash, err := getStaticPodSingleHash(w.client, nodeName, component)
356 if err != nil {
357 lastErr = err
358 return false, nil
359 }
360
361 lastErr = nil
362
363 if hash == previousHash {
364 return false, nil
365 }
366
367 return true, nil
368 })
369
370
371 if lastErr != nil {
372 return lastErr
373 }
374 return errors.Wrapf(err, "static Pod hash for component %s on Node %s did not change after %v", component, nodeName, w.timeout)
375 }
376
377
378 func getStaticPodSingleHash(client clientset.Interface, nodeName string, component string) (string, error) {
379
380 staticPodName := fmt.Sprintf("%s-%s", component, nodeName)
381 staticPod, err := client.CoreV1().Pods(metav1.NamespaceSystem).Get(context.TODO(), staticPodName, metav1.GetOptions{})
382 if err != nil {
383 return "", errors.Wrapf(err, "failed to obtain static Pod hash for component %s on Node %s", component, nodeName)
384 }
385
386 staticPodHash := staticPod.Annotations["kubernetes.io/config.hash"]
387 return staticPodHash, nil
388 }
389
View as plain text