1
16
17 package preflight
18
19 import (
20 "bufio"
21 "bytes"
22 "crypto/tls"
23 "crypto/x509"
24 "encoding/json"
25 "fmt"
26 "io"
27 "net"
28 "net/http"
29 "net/url"
30 "os"
31 "path/filepath"
32 "runtime"
33 "strings"
34 "time"
35
36 "github.com/pkg/errors"
37
38 v1 "k8s.io/api/core/v1"
39 netutil "k8s.io/apimachinery/pkg/util/net"
40 "k8s.io/apimachinery/pkg/util/sets"
41 "k8s.io/apimachinery/pkg/util/validation"
42 versionutil "k8s.io/apimachinery/pkg/util/version"
43 kubeadmversion "k8s.io/component-base/version"
44 "k8s.io/klog/v2"
45 system "k8s.io/system-validators/validators"
46 utilsexec "k8s.io/utils/exec"
47 netutils "k8s.io/utils/net"
48
49 kubeadmapi "k8s.io/kubernetes/cmd/kubeadm/app/apis/kubeadm"
50 kubeadmconstants "k8s.io/kubernetes/cmd/kubeadm/app/constants"
51 "k8s.io/kubernetes/cmd/kubeadm/app/images"
52 "k8s.io/kubernetes/cmd/kubeadm/app/util/initsystem"
53 utilruntime "k8s.io/kubernetes/cmd/kubeadm/app/util/runtime"
54 )
55
const (
	// ipv4Forward is the procfs path of the IPv4 forwarding setting.
	ipv4Forward = "/proc/sys/net/ipv4/ip_forward"
	// ipv6DefaultForwarding is the procfs path of the default IPv6 forwarding setting.
	ipv6DefaultForwarding = "/proc/sys/net/ipv6/conf/default/forwarding"
	// externalEtcdRequestTimeout is the timeout set on the HTTP client used to query external etcd.
	externalEtcdRequestTimeout = 10 * time.Second
	// externalEtcdRequestRetries is the number of additional attempts made after a failed etcd version request.
	externalEtcdRequestRetries = 3
	// externalEtcdRequestInterval is the pause between two consecutive etcd version request attempts.
	externalEtcdRequestInterval = 5 * time.Second
)

var (
	// minExternalEtcdVersion is the parsed form of kubeadmconstants.MinExternalEtcdVersion;
	// external etcd servers reporting an older version fail ExternalEtcdVersionCheck.
	minExternalEtcdVersion = versionutil.MustParseSemantic(kubeadmconstants.MinExternalEtcdVersion)
)
67
68
// Error is the error type returned when one or more preflight checks fail.
type Error struct {
	// Msg holds the already formatted list of failed checks.
	Msg string
}

// Error implements the error interface. It frames Msg between a fixed header
// and a hint explaining how a failing check can be made non-fatal.
func (e *Error) Error() string {
	const (
		header = "[preflight] Some fatal errors occurred:\n"
		hint   = "[preflight] If you know what you are doing, you can make a check non-fatal with `--ignore-preflight-errors=...`"
	)
	// All operands are strings, so fmt.Sprint joins them without separators.
	return fmt.Sprint(header, e.Msg, hint)
}

// Preflight always reports true, marking the error as a preflight error.
func (e *Error) Preflight() bool {
	return true
}
82
83
84
// Checker is the contract every preflight check implements.
type Checker interface {
	// Check runs the validation and returns non-fatal warnings and fatal errors separately.
	Check() (warnings, errorList []error)
	// Name returns the identifier of the check; RunChecks uses it in its output
	// and to match entries of the ignore list.
	Name() string
}
89
90
91 type ContainerRuntimeCheck struct {
92 runtime utilruntime.ContainerRuntime
93 }
94
95
96 func (ContainerRuntimeCheck) Name() string {
97 return "CRI"
98 }
99
100
101 func (crc ContainerRuntimeCheck) Check() (warnings, errorList []error) {
102 klog.V(1).Infoln("validating the container runtime")
103 if err := crc.runtime.IsRunning(); err != nil {
104 errorList = append(errorList, err)
105 }
106 return warnings, errorList
107 }
108
109
110
111
112 type ServiceCheck struct {
113 Service string
114 CheckIfActive bool
115 Label string
116 }
117
118
119 func (sc ServiceCheck) Name() string {
120 if sc.Label != "" {
121 return sc.Label
122 }
123 return fmt.Sprintf("Service-%s", strings.Title(sc.Service))
124 }
125
126
127 func (sc ServiceCheck) Check() (warnings, errorList []error) {
128 klog.V(1).Infof("validating if the %q service is enabled and active", sc.Service)
129 initSystem, err := initsystem.GetInitSystem()
130 if err != nil {
131 return []error{err}, nil
132 }
133
134 if !initSystem.ServiceExists(sc.Service) {
135 return []error{errors.Errorf("%s service does not exist", sc.Service)}, nil
136 }
137
138 if !initSystem.ServiceIsEnabled(sc.Service) {
139 warnings = append(warnings,
140 errors.Errorf("%s service is not enabled, please run '%s'",
141 sc.Service, initSystem.EnableCommand(sc.Service)))
142 }
143
144 if sc.CheckIfActive && !initSystem.ServiceIsActive(sc.Service) {
145 errorList = append(errorList,
146 errors.Errorf("%s service is not active, please run 'systemctl start %s.service'",
147 sc.Service, sc.Service))
148 }
149
150 return warnings, errorList
151 }
152
153
154
155 type FirewalldCheck struct {
156 ports []int
157 }
158
159
160 func (FirewalldCheck) Name() string {
161 return "Firewalld"
162 }
163
164
165 func (fc FirewalldCheck) Check() (warnings, errorList []error) {
166 klog.V(1).Infoln("validating if the firewall is enabled and active")
167 initSystem, err := initsystem.GetInitSystem()
168 if err != nil {
169 return []error{err}, nil
170 }
171
172 if !initSystem.ServiceExists("firewalld") {
173 return nil, nil
174 }
175
176 if initSystem.ServiceIsActive("firewalld") {
177 err := errors.Errorf("firewalld is active, please ensure ports %v are open or your cluster may not function correctly",
178 fc.ports)
179 return []error{err}, nil
180 }
181
182 return nil, nil
183 }
184
185
186 type PortOpenCheck struct {
187 port int
188 label string
189 }
190
191
192 func (poc PortOpenCheck) Name() string {
193 if poc.label != "" {
194 return poc.label
195 }
196 return fmt.Sprintf("Port-%d", poc.port)
197 }
198
199
200 func (poc PortOpenCheck) Check() (warnings, errorList []error) {
201 klog.V(1).Infof("validating availability of port %d", poc.port)
202
203 ln, err := net.Listen("tcp", fmt.Sprintf(":%d", poc.port))
204 if err != nil {
205 errorList = []error{errors.Errorf("Port %d is in use", poc.port)}
206 }
207 if ln != nil {
208 if err = ln.Close(); err != nil {
209 warnings = append(warnings,
210 errors.Errorf("when closing port %d, encountered %v", poc.port, err))
211 }
212 }
213
214 return warnings, errorList
215 }
216
217
// IsPrivilegedUserCheck is the check for the privileges of the current user.
// Its Check method is not defined in this part of the file.
type IsPrivilegedUserCheck struct{}

// Name returns the label of this check.
func (ipuc IsPrivilegedUserCheck) Name() string {
	const label = "IsPrivilegedUser"
	return label
}
224
225
226 type DirAvailableCheck struct {
227 Path string
228 Label string
229 }
230
231
232 func (dac DirAvailableCheck) Name() string {
233 if dac.Label != "" {
234 return dac.Label
235 }
236 return fmt.Sprintf("DirAvailable-%s", strings.Replace(dac.Path, "/", "-", -1))
237 }
238
239
240 func (dac DirAvailableCheck) Check() (warnings, errorList []error) {
241 klog.V(1).Infof("validating the existence and emptiness of directory %s", dac.Path)
242
243
244 if _, err := os.Stat(dac.Path); os.IsNotExist(err) {
245 return nil, nil
246 }
247
248 f, err := os.Open(dac.Path)
249 if err != nil {
250 return nil, []error{errors.Wrapf(err, "unable to check if %s is empty", dac.Path)}
251 }
252 defer f.Close()
253
254 _, err = f.Readdirnames(1)
255 if err != io.EOF {
256 return nil, []error{errors.Errorf("%s is not empty", dac.Path)}
257 }
258
259 return nil, nil
260 }
261
262
263 type FileAvailableCheck struct {
264 Path string
265 Label string
266 }
267
268
269 func (fac FileAvailableCheck) Name() string {
270 if fac.Label != "" {
271 return fac.Label
272 }
273 return fmt.Sprintf("FileAvailable-%s", strings.Replace(fac.Path, "/", "-", -1))
274 }
275
276
277 func (fac FileAvailableCheck) Check() (warnings, errorList []error) {
278 klog.V(1).Infof("validating the existence of file %s", fac.Path)
279
280 if _, err := os.Stat(fac.Path); err == nil {
281 return nil, []error{errors.Errorf("%s already exists", fac.Path)}
282 }
283 return nil, nil
284 }
285
286
287 type FileExistingCheck struct {
288 Path string
289 Label string
290 }
291
292
293 func (fac FileExistingCheck) Name() string {
294 if fac.Label != "" {
295 return fac.Label
296 }
297 return fmt.Sprintf("FileExisting-%s", strings.Replace(fac.Path, "/", "-", -1))
298 }
299
300
301 func (fac FileExistingCheck) Check() (warnings, errorList []error) {
302 klog.V(1).Infof("validating the existence of file %s", fac.Path)
303
304 if _, err := os.Stat(fac.Path); err != nil {
305 return nil, []error{errors.Errorf("%s doesn't exist", fac.Path)}
306 }
307 return nil, nil
308 }
309
310
311 type FileContentCheck struct {
312 Path string
313 Content []byte
314 Label string
315 }
316
317
318 func (fcc FileContentCheck) Name() string {
319 if fcc.Label != "" {
320 return fcc.Label
321 }
322 return fmt.Sprintf("FileContent-%s", strings.Replace(fcc.Path, "/", "-", -1))
323 }
324
325
326 func (fcc FileContentCheck) Check() (warnings, errorList []error) {
327 klog.V(1).Infof("validating the contents of file %s", fcc.Path)
328 f, err := os.Open(fcc.Path)
329 if err != nil {
330 return nil, []error{errors.Errorf("%s does not exist", fcc.Path)}
331 }
332
333 lr := io.LimitReader(f, int64(len(fcc.Content)))
334 defer f.Close()
335
336 buf := &bytes.Buffer{}
337 _, err = io.Copy(buf, lr)
338 if err != nil {
339 return nil, []error{errors.Errorf("%s could not be read", fcc.Path)}
340 }
341
342 if !bytes.Equal(buf.Bytes(), fcc.Content) {
343 return nil, []error{errors.Errorf("%s contents are not set to %s", fcc.Path, fcc.Content)}
344 }
345 return nil, []error{}
346
347 }
348
349
350 type InPathCheck struct {
351 executable string
352 mandatory bool
353 exec utilsexec.Interface
354 label string
355 suggestion string
356 }
357
358
359 func (ipc InPathCheck) Name() string {
360 if ipc.label != "" {
361 return ipc.label
362 }
363 return fmt.Sprintf("FileExisting-%s", strings.Replace(ipc.executable, "/", "-", -1))
364 }
365
366
367 func (ipc InPathCheck) Check() (warnings, errs []error) {
368 klog.V(1).Infof("validating the presence of executable %s", ipc.executable)
369 _, err := ipc.exec.LookPath(ipc.executable)
370 if err != nil {
371 if ipc.mandatory {
372
373 return nil, []error{errors.Errorf("%s not found in system path", ipc.executable)}
374 }
375
376 warningMessage := fmt.Sprintf("%s not found in system path", ipc.executable)
377 if ipc.suggestion != "" {
378 warningMessage += fmt.Sprintf("\nSuggestion: %s", ipc.suggestion)
379 }
380 return []error{errors.New(warningMessage)}, nil
381 }
382 return nil, nil
383 }
384
385
386
387 type HostnameCheck struct {
388 nodeName string
389 }
390
391
392 func (HostnameCheck) Name() string {
393 return "Hostname"
394 }
395
396
397
398 func (hc HostnameCheck) Check() (warnings, errorList []error) {
399 klog.V(1).Infoln("checking whether the given node name is valid and reachable using net.LookupHost")
400 for _, msg := range validation.IsQualifiedName(hc.nodeName) {
401 warnings = append(warnings, errors.Errorf("invalid node name format %q: %s", hc.nodeName, msg))
402 }
403
404 addr, err := net.LookupHost(hc.nodeName)
405 if addr == nil {
406 warnings = append(warnings, errors.Errorf("hostname \"%s\" could not be reached", hc.nodeName))
407 }
408 if err != nil {
409 warnings = append(warnings, errors.Wrapf(err, "hostname \"%s\"", hc.nodeName))
410 }
411 return warnings, errorList
412 }
413
414
415
416 type HTTPProxyCheck struct {
417 Proto string
418 Host string
419 }
420
421
422 func (hst HTTPProxyCheck) Name() string {
423 return "HTTPProxy"
424 }
425
426
427 func (hst HTTPProxyCheck) Check() (warnings, errorList []error) {
428 klog.V(1).Infoln("validating if the connectivity type is via proxy or direct")
429 u := &url.URL{Scheme: hst.Proto, Host: hst.Host}
430 if netutils.IsIPv6String(hst.Host) {
431 u.Host = net.JoinHostPort(hst.Host, "1234")
432 }
433
434 req, err := http.NewRequest("GET", u.String(), nil)
435 if err != nil {
436 return nil, []error{err}
437 }
438
439 proxy, err := netutil.SetOldTransportDefaults(&http.Transport{}).Proxy(req)
440 if err != nil {
441 return nil, []error{err}
442 }
443 if proxy != nil {
444 return []error{errors.Errorf("Connection to %q uses proxy %q. If that is not intended, adjust your proxy settings", u, proxy)}, nil
445 }
446 return nil, nil
447 }
448
449
450
451
452
453
454 type HTTPProxyCIDRCheck struct {
455 Proto string
456 CIDR string
457 }
458
459
460 func (HTTPProxyCIDRCheck) Name() string {
461 return "HTTPProxyCIDR"
462 }
463
464
465
466 func (subnet HTTPProxyCIDRCheck) Check() (warnings, errorList []error) {
467 klog.V(1).Infoln("validating http connectivity to first IP address in the CIDR")
468 if len(subnet.CIDR) == 0 {
469 return nil, nil
470 }
471
472 _, cidr, err := netutils.ParseCIDRSloppy(subnet.CIDR)
473 if err != nil {
474 return nil, []error{errors.Wrapf(err, "error parsing CIDR %q", subnet.CIDR)}
475 }
476
477 testIP, err := netutils.GetIndexedIP(cidr, 1)
478 if err != nil {
479 return nil, []error{errors.Wrapf(err, "unable to get first IP address from the given CIDR (%s)", cidr.String())}
480 }
481
482 testIPstring := testIP.String()
483 if len(testIP) == net.IPv6len {
484 testIPstring = fmt.Sprintf("[%s]:1234", testIP)
485 }
486 url := fmt.Sprintf("%s://%s/", subnet.Proto, testIPstring)
487
488 req, err := http.NewRequest("GET", url, nil)
489 if err != nil {
490 return nil, []error{err}
491 }
492
493
494 proxy, err := netutil.SetOldTransportDefaults(&http.Transport{}).Proxy(req)
495 if err != nil {
496 return nil, []error{err}
497 }
498 if proxy != nil {
499 return []error{errors.Errorf("connection to %q uses proxy %q. This may lead to malfunctional cluster setup. Make sure that Pod and Services IP ranges specified correctly as exceptions in proxy configuration", subnet.CIDR, proxy)}, nil
500 }
501 return nil, nil
502 }
503
504
505 type SystemVerificationCheck struct{}
506
507
508 func (SystemVerificationCheck) Name() string {
509 return "SystemVerification"
510 }
511
512
513 func (sysver SystemVerificationCheck) Check() (warnings, errorList []error) {
514 klog.V(1).Infoln("running all checks")
515
516
517 bufw := bufio.NewWriterSize(os.Stdout, 1*1024*1024)
518 reporter := &system.StreamReporter{WriteStream: bufw}
519
520 var errs []error
521 var warns []error
522
523 var validators = []system.Validator{
524 &system.KernelValidator{Reporter: reporter}}
525
526 validators = addOSValidator(validators, reporter)
527
528
529 for _, v := range validators {
530 warn, err := v.Validate(system.DefaultSysSpec)
531 if err != nil {
532 errs = append(errs, err...)
533 }
534 if warn != nil {
535 warns = append(warns, warn...)
536 }
537 }
538
539 if len(errs) != 0 {
540
541 fmt.Println("[preflight] The system verification failed. Printing the output from the verification:")
542 bufw.Flush()
543 return warns, errs
544 }
545 return warns, nil
546 }
547
548
549 type KubernetesVersionCheck struct {
550 KubeadmVersion string
551 KubernetesVersion string
552 }
553
554
555 func (KubernetesVersionCheck) Name() string {
556 return "KubernetesVersion"
557 }
558
559
560 func (kubever KubernetesVersionCheck) Check() (warnings, errorList []error) {
561 klog.V(1).Infoln("validating Kubernetes and kubeadm version")
562
563 if strings.HasPrefix(kubever.KubeadmVersion, "v0.0.0") {
564 return nil, nil
565 }
566
567 kadmVersion, err := versionutil.ParseSemantic(kubever.KubeadmVersion)
568 if err != nil {
569 return nil, []error{errors.Wrapf(err, "couldn't parse kubeadm version %q", kubever.KubeadmVersion)}
570 }
571
572 k8sVersion, err := versionutil.ParseSemantic(kubever.KubernetesVersion)
573 if err != nil {
574 return nil, []error{errors.Wrapf(err, "couldn't parse Kubernetes version %q", kubever.KubernetesVersion)}
575 }
576
577
578
579
580
581 firstUnsupportedVersion := versionutil.MustParseSemantic(fmt.Sprintf("%d.%d.%s", kadmVersion.Major(), kadmVersion.Minor()+1, "0-0"))
582 if k8sVersion.AtLeast(firstUnsupportedVersion) {
583 return []error{errors.Errorf("Kubernetes version is greater than kubeadm version. Please consider to upgrade kubeadm. Kubernetes version: %s. Kubeadm version: %d.%d.x", k8sVersion, kadmVersion.Components()[0], kadmVersion.Components()[1])}, nil
584 }
585
586 return nil, nil
587 }
588
589
590 type KubeletVersionCheck struct {
591 KubernetesVersion string
592 minKubeletVersion *versionutil.Version
593 exec utilsexec.Interface
594 }
595
596
597 func (KubeletVersionCheck) Name() string {
598 return "KubeletVersion"
599 }
600
601
602 func (kubever KubeletVersionCheck) Check() (warnings, errorList []error) {
603 klog.V(1).Infoln("validating kubelet version")
604 kubeletVersion, err := GetKubeletVersion(kubever.exec)
605 if err != nil {
606 return nil, []error{errors.Wrap(err, "couldn't get kubelet version")}
607 }
608 if kubever.minKubeletVersion == nil {
609 kubever.minKubeletVersion = kubeadmconstants.MinimumKubeletVersion
610 }
611 if kubeletVersion.LessThan(kubever.minKubeletVersion) {
612 return nil, []error{errors.Errorf("Kubelet version %q is lower than kubeadm can support. Please upgrade kubelet", kubeletVersion)}
613 }
614
615 if kubever.KubernetesVersion != "" {
616 k8sVersion, err := versionutil.ParseSemantic(kubever.KubernetesVersion)
617 if err != nil {
618 return nil, []error{errors.Wrapf(err, "couldn't parse Kubernetes version %q", kubever.KubernetesVersion)}
619 }
620 if kubeletVersion.Major() > k8sVersion.Major() || kubeletVersion.Minor() > k8sVersion.Minor() {
621 return nil, []error{errors.Errorf("the kubelet version is higher than the control plane version. This is not a supported version skew and may lead to a malfunctional cluster. Kubelet version: %q Control plane version: %q", kubeletVersion, k8sVersion)}
622 }
623 }
624 return nil, nil
625 }
626
627
628 type SwapCheck struct{}
629
630
631 func (SwapCheck) Name() string {
632 return "Swap"
633 }
634
635
636 func (swc SwapCheck) Check() (warnings, errorList []error) {
637 klog.V(1).Infoln("validating whether swap is enabled or not")
638 f, err := os.Open("/proc/swaps")
639 if err != nil {
640
641 return nil, nil
642 }
643 defer f.Close()
644 var buf []string
645 scanner := bufio.NewScanner(f)
646 for scanner.Scan() {
647 buf = append(buf, scanner.Text())
648 }
649 if err := scanner.Err(); err != nil {
650 return []error{errors.Wrap(err, "error parsing /proc/swaps")}, nil
651 }
652
653 if len(buf) > 1 {
654 return []error{errors.New("swap is supported for cgroup v2 only; the NodeSwap feature gate of the kubelet is beta but disabled by default")}, nil
655 }
656
657 return nil, nil
658 }
659
// etcdVersionResponse is the JSON document decoded from the "<endpoint>/version"
// URL of an external etcd server.
type etcdVersionResponse struct {
	// Etcdserver is the value of the "etcdserver" key; it is the version
	// compared against the minimum supported etcd version.
	Etcdserver string `json:"etcdserver"`
	// Etcdcluster is the value of the "etcdcluster" key.
	Etcdcluster string `json:"etcdcluster"`
}
664
665
666 type ExternalEtcdVersionCheck struct {
667 Etcd kubeadmapi.Etcd
668 }
669
670
671 func (ExternalEtcdVersionCheck) Name() string {
672 return "ExternalEtcdVersion"
673 }
674
675
676
677 func (evc ExternalEtcdVersionCheck) Check() (warnings, errorList []error) {
678 klog.V(1).Infoln("validating the external etcd version")
679
680
681 if evc.Etcd.External.Endpoints == nil {
682 return nil, nil
683 }
684
685 var config *tls.Config
686 var err error
687 if config, err = evc.configRootCAs(config); err != nil {
688 errorList = append(errorList, err)
689 return nil, errorList
690 }
691 if config, err = evc.configCertAndKey(config); err != nil {
692 errorList = append(errorList, err)
693 return nil, errorList
694 }
695
696 client := evc.getHTTPClient(config)
697 for _, endpoint := range evc.Etcd.External.Endpoints {
698 if _, err := url.Parse(endpoint); err != nil {
699 errorList = append(errorList, errors.Wrapf(err, "failed to parse external etcd endpoint %s", endpoint))
700 continue
701 }
702 resp := etcdVersionResponse{}
703 var err error
704 versionURL := fmt.Sprintf("%s/%s", endpoint, "version")
705 if tmpVersionURL, err := normalizeURLString(versionURL); err != nil {
706 errorList = append(errorList, errors.Wrapf(err, "failed to normalize external etcd version url %s", versionURL))
707 continue
708 } else {
709 versionURL = tmpVersionURL
710 }
711 if err = getEtcdVersionResponse(client, versionURL, &resp); err != nil {
712 errorList = append(errorList, err)
713 continue
714 }
715
716 etcdVersion, err := versionutil.ParseSemantic(resp.Etcdserver)
717 if err != nil {
718 errorList = append(errorList, errors.Wrapf(err, "couldn't parse external etcd version %q", resp.Etcdserver))
719 continue
720 }
721 if etcdVersion.LessThan(minExternalEtcdVersion) {
722 errorList = append(errorList, errors.Errorf("this version of kubeadm only supports external etcd version >= %s. Current version: %s", kubeadmconstants.MinExternalEtcdVersion, resp.Etcdserver))
723 continue
724 }
725 }
726
727 return nil, errorList
728 }
729
730
731 func (evc ExternalEtcdVersionCheck) configRootCAs(config *tls.Config) (*tls.Config, error) {
732 var CACertPool *x509.CertPool
733 if evc.Etcd.External.CAFile != "" {
734 CACert, err := os.ReadFile(evc.Etcd.External.CAFile)
735 if err != nil {
736 return nil, errors.Wrapf(err, "couldn't load external etcd's server certificate %s", evc.Etcd.External.CAFile)
737 }
738 CACertPool = x509.NewCertPool()
739 CACertPool.AppendCertsFromPEM(CACert)
740 }
741 if CACertPool != nil {
742 if config == nil {
743 config = &tls.Config{}
744 }
745 config.RootCAs = CACertPool
746 }
747 return config, nil
748 }
749
750
751 func (evc ExternalEtcdVersionCheck) configCertAndKey(config *tls.Config) (*tls.Config, error) {
752 var cert tls.Certificate
753 if evc.Etcd.External.CertFile != "" && evc.Etcd.External.KeyFile != "" {
754 var err error
755 cert, err = tls.LoadX509KeyPair(evc.Etcd.External.CertFile, evc.Etcd.External.KeyFile)
756 if err != nil {
757 return nil, errors.Wrapf(err, "couldn't load external etcd's certificate and key pair %s, %s", evc.Etcd.External.CertFile, evc.Etcd.External.KeyFile)
758 }
759 if config == nil {
760 config = &tls.Config{}
761 }
762 config.Certificates = []tls.Certificate{cert}
763 }
764 return config, nil
765 }
766
767 func (evc ExternalEtcdVersionCheck) getHTTPClient(config *tls.Config) *http.Client {
768 if config != nil {
769 transport := netutil.SetOldTransportDefaults(&http.Transport{
770 TLSClientConfig: config,
771 })
772 return &http.Client{
773 Transport: transport,
774 Timeout: externalEtcdRequestTimeout,
775 }
776 }
777 return &http.Client{Timeout: externalEtcdRequestTimeout, Transport: netutil.SetOldTransportDefaults(&http.Transport{})}
778 }
779
780 func getEtcdVersionResponse(client *http.Client, url string, target interface{}) error {
781 loopCount := externalEtcdRequestRetries + 1
782 var err error
783 var stopRetry bool
784 for loopCount > 0 {
785 if loopCount <= externalEtcdRequestRetries {
786 time.Sleep(externalEtcdRequestInterval)
787 }
788 stopRetry, err = func() (stopRetry bool, err error) {
789 r, err := client.Get(url)
790 if err != nil {
791 loopCount--
792 return false, err
793 }
794 defer r.Body.Close()
795
796 if r.StatusCode >= 500 && r.StatusCode <= 599 {
797 loopCount--
798 return false, errors.Errorf("server responded with non-successful status: %s", r.Status)
799 }
800 return true, json.NewDecoder(r.Body).Decode(target)
801
802 }()
803 if stopRetry {
804 break
805 }
806 }
807 return err
808 }
809
810
811 type ImagePullCheck struct {
812 runtime utilruntime.ContainerRuntime
813 imageList []string
814 sandboxImage string
815 imagePullPolicy v1.PullPolicy
816 imagePullSerial bool
817 }
818
819
820 func (ImagePullCheck) Name() string {
821 return "ImagePull"
822 }
823
824
825 func (ipc ImagePullCheck) Check() (warnings, errorList []error) {
826
827 policy := ipc.imagePullPolicy
828 switch policy {
829 case v1.PullAlways, v1.PullIfNotPresent:
830 klog.V(1).Infof("using image pull policy: %s", policy)
831 case v1.PullNever:
832 klog.V(1).Infof("skipping the pull of all images due to policy: %s", policy)
833 return warnings, errorList
834 default:
835 errorList = append(errorList, errors.Errorf("unsupported pull policy %q", policy))
836 return warnings, errorList
837 }
838
839
840 criSandboxImage, err := ipc.runtime.SandboxImage()
841 if err != nil {
842 klog.V(4).Infof("failed to detect the sandbox image for local container runtime, %v", err)
843 } else if criSandboxImage != ipc.sandboxImage {
844 klog.Warningf("detected that the sandbox image %q of the container runtime is inconsistent with that used by kubeadm."+
845 "It is recommended to use %q as the CRI sandbox image.", criSandboxImage, ipc.sandboxImage)
846 }
847
848
849 if !ipc.imagePullSerial {
850 if err := ipc.runtime.PullImagesInParallel(ipc.imageList, policy == v1.PullIfNotPresent); err != nil {
851 errorList = append(errorList, err)
852 }
853 return warnings, errorList
854 }
855
856
857 for _, image := range ipc.imageList {
858 switch policy {
859 case v1.PullIfNotPresent:
860 ret, err := ipc.runtime.ImageExists(image)
861 if ret && err == nil {
862 klog.V(1).Infof("image exists: %s", image)
863 continue
864 }
865 if err != nil {
866 errorList = append(errorList, errors.Wrapf(err, "failed to check if image %s exists", image))
867 }
868 fallthrough
869 case v1.PullAlways:
870 klog.V(1).Infof("pulling: %s", image)
871 if err := ipc.runtime.PullImage(image); err != nil {
872 errorList = append(errorList, errors.WithMessagef(err, "failed to pull image %s", image))
873 }
874 }
875 }
876
877 return warnings, errorList
878 }
879
880
881 type NumCPUCheck struct {
882 NumCPU int
883 }
884
885
886 func (NumCPUCheck) Name() string {
887 return "NumCPU"
888 }
889
890
891 func (ncc NumCPUCheck) Check() (warnings, errorList []error) {
892 numCPU := runtime.NumCPU()
893 if numCPU < ncc.NumCPU {
894 errorList = append(errorList, errors.Errorf("the number of available CPUs %d is less than the required %d", numCPU, ncc.NumCPU))
895 }
896 return warnings, errorList
897 }
898
899
// MemCheck is the check for the amount of system memory. Its Check method
// is not defined in this part of the file.
type MemCheck struct {
	// Mem is the required amount of memory. NOTE(review): the unit is not
	// visible here — confirm against the Check implementation.
	Mem uint64
}

// Name returns the label of this check.
func (mc MemCheck) Name() string {
	const label = "Mem"
	return label
}
908
909
910 func InitNodeChecks(execer utilsexec.Interface, cfg *kubeadmapi.InitConfiguration, ignorePreflightErrors sets.Set[string], isSecondaryControlPlane bool, downloadCerts bool) ([]Checker, error) {
911 if !isSecondaryControlPlane {
912
913 if err := RunRootCheckOnly(ignorePreflightErrors); err != nil {
914 return nil, err
915 }
916 }
917
918 manifestsDir := filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.ManifestsSubDirName)
919 checks := []Checker{
920 NumCPUCheck{NumCPU: kubeadmconstants.ControlPlaneNumCPU},
921
922
923 MemCheck{Mem: kubeadmconstants.ControlPlaneMem},
924 KubernetesVersionCheck{KubernetesVersion: cfg.KubernetesVersion, KubeadmVersion: kubeadmversion.Get().GitVersion},
925 FirewalldCheck{ports: []int{int(cfg.LocalAPIEndpoint.BindPort), kubeadmconstants.KubeletPort}},
926 PortOpenCheck{port: int(cfg.LocalAPIEndpoint.BindPort)},
927 PortOpenCheck{port: kubeadmconstants.KubeSchedulerPort},
928 PortOpenCheck{port: kubeadmconstants.KubeControllerManagerPort},
929 FileAvailableCheck{Path: kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.KubeAPIServer, manifestsDir)},
930 FileAvailableCheck{Path: kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.KubeControllerManager, manifestsDir)},
931 FileAvailableCheck{Path: kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.KubeScheduler, manifestsDir)},
932 FileAvailableCheck{Path: kubeadmconstants.GetStaticPodFilepath(kubeadmconstants.Etcd, manifestsDir)},
933 HTTPProxyCheck{Proto: "https", Host: cfg.LocalAPIEndpoint.AdvertiseAddress},
934 }
935
936
937
938
939
940
941
942
943 IPV4Check := false
944 IPV6Check := false
945 cidrs := strings.Split(cfg.Networking.ServiceSubnet, ",")
946 for _, cidr := range cidrs {
947 checks = append(checks, HTTPProxyCIDRCheck{Proto: "https", CIDR: cidr})
948 if !IPV4Check && netutils.IsIPv4CIDRString(cidr) {
949 IPV4Check = true
950 }
951 if !IPV6Check && netutils.IsIPv6CIDRString(cidr) {
952 IPV6Check = true
953 }
954
955 }
956 cidrs = strings.Split(cfg.Networking.PodSubnet, ",")
957 for _, cidr := range cidrs {
958 checks = append(checks, HTTPProxyCIDRCheck{Proto: "https", CIDR: cidr})
959 if !IPV4Check && netutils.IsIPv4CIDRString(cidr) {
960 IPV4Check = true
961 }
962 if !IPV6Check && netutils.IsIPv6CIDRString(cidr) {
963 IPV6Check = true
964 }
965 }
966
967 if !isSecondaryControlPlane {
968 checks = addCommonChecks(execer, cfg.KubernetesVersion, &cfg.NodeRegistration, checks)
969
970
971 if ip := netutils.ParseIPSloppy(cfg.LocalAPIEndpoint.AdvertiseAddress); ip != nil {
972 if !IPV4Check && netutils.IsIPv4(ip) {
973 IPV4Check = true
974 }
975 if !IPV6Check && netutils.IsIPv6(ip) {
976 IPV6Check = true
977 }
978 }
979
980 if IPV4Check {
981 checks = addIPv4Checks(checks)
982 }
983 if IPV6Check {
984 checks = addIPv6Checks(checks)
985 }
986
987
988 if cfg.Etcd.External != nil {
989
990 checks = append(checks, ExternalEtcdVersionCheck{Etcd: cfg.Etcd})
991 }
992 }
993
994 if cfg.Etcd.Local != nil {
995
996 checks = append(checks,
997 PortOpenCheck{port: kubeadmconstants.EtcdListenClientPort},
998 PortOpenCheck{port: kubeadmconstants.EtcdListenPeerPort},
999 DirAvailableCheck{Path: cfg.Etcd.Local.DataDir},
1000 )
1001 }
1002
1003 if cfg.Etcd.External != nil && !(isSecondaryControlPlane && downloadCerts) {
1004
1005 if cfg.Etcd.External.CAFile != "" {
1006 checks = append(checks, FileExistingCheck{Path: cfg.Etcd.External.CAFile, Label: "ExternalEtcdClientCertificates"})
1007 }
1008 if cfg.Etcd.External.CertFile != "" {
1009 checks = append(checks, FileExistingCheck{Path: cfg.Etcd.External.CertFile, Label: "ExternalEtcdClientCertificates"})
1010 }
1011 if cfg.Etcd.External.KeyFile != "" {
1012 checks = append(checks, FileExistingCheck{Path: cfg.Etcd.External.KeyFile, Label: "ExternalEtcdClientCertificates"})
1013 }
1014 }
1015 return checks, nil
1016 }
1017
1018
1019
1020
1021
1022 func RunInitNodeChecks(execer utilsexec.Interface, cfg *kubeadmapi.InitConfiguration, ignorePreflightErrors sets.Set[string], isSecondaryControlPlane bool, downloadCerts bool) error {
1023 checks, err := InitNodeChecks(execer, cfg, ignorePreflightErrors, isSecondaryControlPlane, downloadCerts)
1024 if err != nil {
1025 return err
1026 }
1027 return RunChecks(checks, os.Stderr, ignorePreflightErrors)
1028 }
1029
1030
1031 func JoinNodeChecks(execer utilsexec.Interface, cfg *kubeadmapi.JoinConfiguration, ignorePreflightErrors sets.Set[string]) ([]Checker, error) {
1032
1033 if err := RunRootCheckOnly(ignorePreflightErrors); err != nil {
1034 return nil, err
1035 }
1036
1037 checks := []Checker{
1038 FileAvailableCheck{Path: filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.KubeletKubeConfigFileName)},
1039 FileAvailableCheck{Path: filepath.Join(kubeadmconstants.KubernetesDir, kubeadmconstants.KubeletBootstrapKubeConfigFileName)},
1040 }
1041 checks = addCommonChecks(execer, "", &cfg.NodeRegistration, checks)
1042 if cfg.ControlPlane == nil {
1043 checks = append(checks, FileAvailableCheck{Path: cfg.CACertPath})
1044 }
1045
1046 if cfg.Discovery.BootstrapToken != nil {
1047 ipstr, _, err := net.SplitHostPort(cfg.Discovery.BootstrapToken.APIServerEndpoint)
1048 if err == nil {
1049 checks = append(checks,
1050 HTTPProxyCheck{Proto: "https", Host: ipstr},
1051 )
1052 if ip := netutils.ParseIPSloppy(ipstr); ip != nil {
1053 if netutils.IsIPv4(ip) {
1054 checks = addIPv4Checks(checks)
1055 }
1056 if netutils.IsIPv6(ip) {
1057 checks = addIPv6Checks(checks)
1058 }
1059 }
1060 }
1061 }
1062 return checks, nil
1063 }
1064
1065
1066 func RunJoinNodeChecks(execer utilsexec.Interface, cfg *kubeadmapi.JoinConfiguration, ignorePreflightErrors sets.Set[string]) error {
1067 checks, err := JoinNodeChecks(execer, cfg, ignorePreflightErrors)
1068 if err != nil {
1069 return err
1070 }
1071 return RunChecks(checks, os.Stderr, ignorePreflightErrors)
1072 }
1073
1074
1075
1076 func addCommonChecks(execer utilsexec.Interface, k8sVersion string, nodeReg *kubeadmapi.NodeRegistrationOptions, checks []Checker) []Checker {
1077 containerRuntime, err := utilruntime.NewContainerRuntime(execer, nodeReg.CRISocket)
1078 if err != nil {
1079 klog.Warningf("[preflight] WARNING: Couldn't create the interface used for talking to the container runtime: %v\n", err)
1080 } else {
1081 checks = append(checks, ContainerRuntimeCheck{runtime: containerRuntime})
1082 }
1083
1084
1085 checks = addSwapCheck(checks)
1086 checks = addExecChecks(checks, execer)
1087 checks = append(checks,
1088 SystemVerificationCheck{},
1089 HostnameCheck{nodeName: nodeReg.Name},
1090 KubeletVersionCheck{KubernetesVersion: k8sVersion, exec: execer},
1091 ServiceCheck{Service: "kubelet", CheckIfActive: false},
1092 PortOpenCheck{port: kubeadmconstants.KubeletPort})
1093 return checks
1094 }
1095
1096
1097 func RunRootCheckOnly(ignorePreflightErrors sets.Set[string]) error {
1098 checks := []Checker{
1099 IsPrivilegedUserCheck{},
1100 }
1101
1102 return RunChecks(checks, os.Stderr, ignorePreflightErrors)
1103 }
1104
1105
1106 func RunPullImagesCheck(execer utilsexec.Interface, cfg *kubeadmapi.InitConfiguration, ignorePreflightErrors sets.Set[string]) error {
1107 containerRuntime, err := utilruntime.NewContainerRuntime(utilsexec.New(), cfg.NodeRegistration.CRISocket)
1108 if err != nil {
1109 return &Error{Msg: err.Error()}
1110 }
1111
1112 serialPull := true
1113 if cfg.NodeRegistration.ImagePullSerial != nil {
1114 serialPull = *cfg.NodeRegistration.ImagePullSerial
1115 }
1116
1117 checks := []Checker{
1118 ImagePullCheck{
1119 runtime: containerRuntime,
1120 imageList: images.GetControlPlaneImages(&cfg.ClusterConfiguration),
1121 sandboxImage: images.GetPauseImage(&cfg.ClusterConfiguration),
1122 imagePullPolicy: cfg.NodeRegistration.ImagePullPolicy,
1123 imagePullSerial: serialPull,
1124 },
1125 }
1126 return RunChecks(checks, os.Stderr, ignorePreflightErrors)
1127 }
1128
1129
1130
1131 func RunChecks(checks []Checker, ww io.Writer, ignorePreflightErrors sets.Set[string]) error {
1132 var errsBuffer bytes.Buffer
1133
1134 for _, c := range checks {
1135 name := c.Name()
1136 warnings, errs := c.Check()
1137
1138 if setHasItemOrAll(ignorePreflightErrors, name) {
1139
1140 warnings = append(warnings, errs...)
1141 errs = []error{}
1142 }
1143
1144 for _, w := range warnings {
1145 io.WriteString(ww, fmt.Sprintf("\t[WARNING %s]: %v\n", name, w))
1146 }
1147 for _, i := range errs {
1148 errsBuffer.WriteString(fmt.Sprintf("\t[ERROR %s]: %v\n", name, i.Error()))
1149 }
1150 }
1151 if errsBuffer.Len() > 0 {
1152 return &Error{Msg: errsBuffer.String()}
1153 }
1154 return nil
1155 }
1156
1157
1158 func setHasItemOrAll(s sets.Set[string], item string) bool {
1159 if s.Has("all") || s.Has(strings.ToLower(item)) {
1160 return true
1161 }
1162 return false
1163 }
1164
1165
1166
// normalizeURLString parses s as a URL, collapses every run of consecutive
// slashes in its path to a single slash and returns the re-serialized URL.
// Only the path is touched; scheme, host and query are left as parsed.
// An error is returned when s cannot be parsed.
func normalizeURLString(s string) (string, error) {
	u, err := url.Parse(s)
	if err != nil {
		return "", err
	}
	// A single ReplaceAll pass turns "///" into "//", so repeat until no
	// doubled slash is left. Every pass shortens the path, so this ends.
	for strings.Contains(u.Path, "//") {
		u.Path = strings.ReplaceAll(u.Path, "//", "/")
	}
	return u.String(), nil
}
1177
View as plain text