...

Source file src/k8s.io/kubernetes/cmd/kubeadm/app/util/runtime/runtime.go

Documentation: k8s.io/kubernetes/cmd/kubeadm/app/util/runtime

     1  /*
     2  Copyright 2018 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package runtime
    18  
    19  import (
    20  	"os"
    21  	"strings"
    22  
    23  	"github.com/pkg/errors"
    24  
    25  	errorsutil "k8s.io/apimachinery/pkg/util/errors"
    26  	"k8s.io/klog/v2"
    27  	utilsexec "k8s.io/utils/exec"
    28  
    29  	"k8s.io/kubernetes/cmd/kubeadm/app/constants"
    30  )
    31  
    32  // defaultKnownCRISockets holds the set of known CRI endpoints that DetectCRISocket probes
    33  var defaultKnownCRISockets = []string{
    34  	constants.CRISocketContainerd,
    35  	constants.CRISocketCRIO,
    36  	constants.CRISocketDocker,
    37  }
    38  
    39  // ContainerRuntime is an interface for working with container runtimes
    40  type ContainerRuntime interface {
    41  	Socket() string // returns the CRI socket endpoint
    42  	IsRunning() error // returns a non-nil error when the runtime is not running
    43  	ListKubeContainers() ([]string, error) // lists the IDs of the running k8s CRI pods
    44  	RemoveContainers(containers []string) error // stops and removes the given pods
    45  	PullImage(image string) error // pulls a single image
    46  	PullImagesInParallel(images []string, ifNotPresent bool) error // pulls several images concurrently; ifNotPresent skips images that already exist
    47  	ImageExists(image string) (bool, error) // reports whether the image exists on the system
    48  	SandboxImage() (string, error) // returns the sandbox image used by the container runtime
    49  }
    50  
    51  // CRIRuntime is a struct that interfaces with the CRI by invoking the crictl command-line tool
    52  type CRIRuntime struct {
    53  	exec       utilsexec.Interface // used to build the crictl commands
    54  	criSocket  string // endpoint passed to crictl through both the -r and -i flags
    55  	crictlPath string // path of the crictl binary, as resolved by NewContainerRuntime
    56  }
    57  
    58  // NewContainerRuntime sets up and returns a ContainerRuntime struct
    59  func NewContainerRuntime(execer utilsexec.Interface, criSocket string) (ContainerRuntime, error) {
    60  	const toolName = "crictl"
    61  	crictlPath, err := execer.LookPath(toolName)
    62  	if err != nil {
    63  		return nil, errors.Wrapf(err, "%s is required by the container runtime", toolName)
    64  	}
    65  	return &CRIRuntime{execer, criSocket, crictlPath}, nil
    66  }
    67  
    68  // Socket returns the CRI socket endpoint this runtime was created with
    69  func (runtime *CRIRuntime) Socket() string {
    70  	return runtime.criSocket
    71  }
    72  
    73  // crictl creates a crictl command for the provided args.
    74  func (runtime *CRIRuntime) crictl(args ...string) utilsexec.Cmd {
    75  	cmd := runtime.exec.Command(runtime.crictlPath, append([]string{"-r", runtime.Socket(), "-i", runtime.Socket()}, args...)...)
    76  	cmd.SetEnv(os.Environ())
    77  	return cmd
    78  }
    79  
    80  // IsRunning checks if runtime is running
    81  func (runtime *CRIRuntime) IsRunning() error {
    82  	if out, err := runtime.crictl("info").CombinedOutput(); err != nil {
    83  		return errors.Wrapf(err, "container runtime is not running: output: %s, error", string(out))
    84  	}
    85  	return nil
    86  }
    87  
    88  // ListKubeContainers lists running k8s CRI pods
    89  func (runtime *CRIRuntime) ListKubeContainers() ([]string, error) {
    90  	// Disable debug mode regardless how the crictl is configured so that the debug info won't be
    91  	// iterpreted to the Pod ID.
    92  	args := []string{"-D=false", "pods", "-q"}
    93  	out, err := runtime.crictl(args...).CombinedOutput()
    94  	if err != nil {
    95  		return nil, errors.Wrapf(err, "output: %s, error", string(out))
    96  	}
    97  	pods := []string{}
    98  	pods = append(pods, strings.Fields(string(out))...)
    99  	return pods, nil
   100  }
   101  
   102  // RemoveContainers removes running k8s pods
   103  func (runtime *CRIRuntime) RemoveContainers(containers []string) error {
   104  	errs := []error{}
   105  	for _, container := range containers {
   106  		var lastErr error
   107  		for i := 0; i < constants.RemoveContainerRetry; i++ {
   108  			klog.V(5).Infof("Attempting to remove container %v", container)
   109  			out, err := runtime.crictl("stopp", container).CombinedOutput()
   110  			if err != nil {
   111  				lastErr = errors.Wrapf(err, "failed to stop running pod %s: output: %s", container, string(out))
   112  				continue
   113  			}
   114  			out, err = runtime.crictl("rmp", container).CombinedOutput()
   115  			if err != nil {
   116  				lastErr = errors.Wrapf(err, "failed to remove running container %s: output: %s", container, string(out))
   117  				continue
   118  			}
   119  			lastErr = nil
   120  			break
   121  		}
   122  
   123  		if lastErr != nil {
   124  			errs = append(errs, lastErr)
   125  		}
   126  	}
   127  	return errorsutil.NewAggregate(errs)
   128  }
   129  
   130  // PullImage pulls the image
   131  func (runtime *CRIRuntime) PullImage(image string) error {
   132  	var err error
   133  	var out []byte
   134  	for i := 0; i < constants.PullImageRetry; i++ {
   135  		out, err = runtime.crictl("pull", image).CombinedOutput()
   136  		if err == nil {
   137  			return nil
   138  		}
   139  	}
   140  	return errors.Wrapf(err, "output: %s, error", out)
   141  }
   142  
   143  // PullImagesInParallel pulls a list of images in parallel
   144  func (runtime *CRIRuntime) PullImagesInParallel(images []string, ifNotPresent bool) error {
   145  	errs := pullImagesInParallelImpl(images, ifNotPresent, runtime.ImageExists, runtime.PullImage)
   146  	return errorsutil.NewAggregate(errs)
   147  }
   148  
   149  func pullImagesInParallelImpl(images []string, ifNotPresent bool,
   150  	imageExistsFunc func(string) (bool, error), pullImageFunc func(string) error) []error {
   151  
   152  	var errs []error
   153  	errChan := make(chan error, len(images))
   154  
   155  	klog.V(1).Info("pulling all images in parallel")
   156  	for _, img := range images {
   157  		image := img
   158  		go func() {
   159  			if ifNotPresent {
   160  				exists, err := imageExistsFunc(image)
   161  				if err != nil {
   162  					errChan <- errors.WithMessagef(err, "failed to check if image %s exists", image)
   163  					return
   164  				}
   165  				if exists {
   166  					klog.V(1).Infof("image exists: %s", image)
   167  					errChan <- nil
   168  					return
   169  				}
   170  			}
   171  			err := pullImageFunc(image)
   172  			if err != nil {
   173  				err = errors.WithMessagef(err, "failed to pull image %s", image)
   174  			} else {
   175  				klog.V(1).Infof("done pulling: %s", image)
   176  			}
   177  			errChan <- err
   178  		}()
   179  	}
   180  
   181  	for i := 0; i < len(images); i++ {
   182  		if err := <-errChan; err != nil {
   183  			errs = append(errs, err)
   184  		}
   185  	}
   186  
   187  	return errs
   188  }
   189  
   190  // ImageExists checks to see if the image exists on the system
   191  func (runtime *CRIRuntime) ImageExists(image string) (bool, error) {
   192  	err := runtime.crictl("inspecti", image).Run()
   193  	return err == nil, nil
   194  }
   195  
   196  // detectCRISocketImpl is separated out only for test purposes, DON'T call it directly, use DetectCRISocket instead
   197  func detectCRISocketImpl(isSocket func(string) bool, knownCRISockets []string) (string, error) {
   198  	foundCRISockets := []string{}
   199  
   200  	for _, socket := range knownCRISockets {
   201  		if isSocket(socket) {
   202  			foundCRISockets = append(foundCRISockets, socket)
   203  		}
   204  	}
   205  
   206  	switch len(foundCRISockets) {
   207  	case 0:
   208  		// Fall back to the default socket if no CRI is detected, we can error out later on if we need it
   209  		return constants.DefaultCRISocket, nil
   210  	case 1:
   211  		// Precisely one CRI found, use that
   212  		return foundCRISockets[0], nil
   213  	default:
   214  		// Multiple CRIs installed?
   215  		return "", errors.Errorf("Found multiple CRI endpoints on the host. Please define which one do you wish "+
   216  			"to use by setting the 'criSocket' field in the kubeadm configuration file: %s",
   217  			strings.Join(foundCRISockets, ", "))
   218  	}
   219  }
   220  
   221  // DetectCRISocket uses a list of known CRI sockets to detect one. If none is discovered, the default CRI socket is returned; if more than one is discovered, an error is returned.
   222  func DetectCRISocket() (string, error) {
   223  	return detectCRISocketImpl(isExistingSocket, defaultKnownCRISockets)
   224  }
   225  
   226  // SandboxImage returns the sandbox image used by the container runtime
   227  func (runtime *CRIRuntime) SandboxImage() (string, error) {
   228  	args := []string{"-D=false", "info", "-o", "go-template", "--template", "{{.config.sandboxImage}}"}
   229  	out, err := runtime.crictl(args...).CombinedOutput()
   230  	if err != nil {
   231  		return "", errors.Wrapf(err, "output: %s, error", string(out))
   232  	}
   233  
   234  	sandboxImage := strings.TrimSpace(string(out))
   235  	if len(sandboxImage) > 0 {
   236  		return sandboxImage, nil
   237  	}
   238  
   239  	return "", errors.Errorf("the detected sandbox image is empty")
   240  }
   241  

View as plain text