...

Source file src/k8s.io/kubernetes/pkg/kubelet/cri/remote/remote_runtime.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cri/remote

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package remote
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"strings"
    25  	"time"
    26  
    27  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    28  	"go.opentelemetry.io/otel/trace"
    29  	"google.golang.org/grpc"
    30  	"google.golang.org/grpc/backoff"
    31  	"google.golang.org/grpc/codes"
    32  	"google.golang.org/grpc/credentials/insecure"
    33  	"google.golang.org/grpc/status"
    34  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    35  	"k8s.io/component-base/logs/logreduction"
    36  	tracing "k8s.io/component-base/tracing"
    37  	internalapi "k8s.io/cri-api/pkg/apis"
    38  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    39  	"k8s.io/klog/v2"
    40  	"k8s.io/kubernetes/pkg/features"
    41  	"k8s.io/kubernetes/pkg/kubelet/metrics"
    42  	"k8s.io/kubernetes/pkg/kubelet/util"
    43  	"k8s.io/kubernetes/pkg/probe/exec"
    44  
    45  	utilexec "k8s.io/utils/exec"
    46  )
    47  
// remoteRuntimeService is a gRPC implementation of internalapi.RuntimeService.
//
// timeout is the base deadline applied to calls made through this service;
// individual methods may extend it (for example RunPodSandbox doubles it).
// runtimeClient is the CRI v1 client and is assigned by
// validateServiceConnection.
type remoteRuntimeService struct {
	timeout       time.Duration
	runtimeClient runtimeapi.RuntimeServiceClient
	// Cache last per-container error message to reduce log spam
	logReduction *logreduction.LogReduction
}
    55  
const (
	// How frequently to report identical errors. This is the interval handed
	// to logreduction.NewLogReduction when the service is constructed.
	identicalErrorDelay = 1 * time.Minute

	// Connection parameters for the gRPC client. maxBackoffDelay and
	// baseBackoffDelay replace the MaxDelay and BaseDelay of the default gRPC
	// backoff configuration, and minConnectionTimeout is used as the
	// MinConnectTimeout (see NewRemoteRuntimeService).
	maxBackoffDelay      = 3 * time.Second
	baseBackoffDelay     = 100 * time.Millisecond
	minConnectionTimeout = 5 * time.Second
)
    65  
// CRIVersion is the type for valid Container Runtime Interface (CRI) API
// versions. It is a plain string so values can be compared and logged
// directly.
type CRIVersion string

// ErrContainerStatusNil indicates that the returned container status is nil.
// It is a sentinel error, so callers can match it with errors.Is.
var ErrContainerStatusNil = errors.New("container status is nil")

const (
	// CRIVersionV1 references the v1 CRI API.
	CRIVersionV1 CRIVersion = "v1"
)
    77  
    78  // NewRemoteRuntimeService creates a new internalapi.RuntimeService.
    79  func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
    80  	klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
    81  	addr, dialer, err := util.GetAddressAndDialer(endpoint)
    82  	if err != nil {
    83  		return nil, err
    84  	}
    85  	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
    86  	defer cancel()
    87  
    88  	var dialOpts []grpc.DialOption
    89  	dialOpts = append(dialOpts,
    90  		grpc.WithTransportCredentials(insecure.NewCredentials()),
    91  		grpc.WithContextDialer(dialer),
    92  		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
    93  	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
    94  		tracingOpts := []otelgrpc.Option{
    95  			otelgrpc.WithPropagators(tracing.Propagators()),
    96  			otelgrpc.WithTracerProvider(tp),
    97  		}
    98  		// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
    99  		// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
   100  		dialOpts = append(dialOpts,
   101  			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
   102  			grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
   103  	}
   104  
   105  	connParams := grpc.ConnectParams{
   106  		Backoff: backoff.DefaultConfig,
   107  	}
   108  	connParams.MinConnectTimeout = minConnectionTimeout
   109  	connParams.Backoff.BaseDelay = baseBackoffDelay
   110  	connParams.Backoff.MaxDelay = maxBackoffDelay
   111  	dialOpts = append(dialOpts,
   112  		grpc.WithConnectParams(connParams),
   113  	)
   114  
   115  	conn, err := grpc.DialContext(ctx, addr, dialOpts...)
   116  	if err != nil {
   117  		klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
   118  		return nil, err
   119  	}
   120  
   121  	service := &remoteRuntimeService{
   122  		timeout:      connectionTimeout,
   123  		logReduction: logreduction.NewLogReduction(identicalErrorDelay),
   124  	}
   125  
   126  	if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
   127  		return nil, fmt.Errorf("validate service connection: %w", err)
   128  	}
   129  
   130  	return service, nil
   131  }
   132  
   133  // validateServiceConnection tries to connect to the remote runtime service by
   134  // using the CRI v1 API version and fails if that's not possible.
   135  func (r *remoteRuntimeService) validateServiceConnection(ctx context.Context, conn *grpc.ClientConn, endpoint string) error {
   136  	klog.V(4).InfoS("Validating the CRI v1 API runtime version")
   137  	r.runtimeClient = runtimeapi.NewRuntimeServiceClient(conn)
   138  
   139  	if _, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{}); err != nil {
   140  		return fmt.Errorf("validate CRI v1 runtime API for endpoint %q: %w", endpoint, err)
   141  	}
   142  
   143  	klog.V(2).InfoS("Validated CRI v1 runtime API")
   144  	return nil
   145  }
   146  
   147  // Version returns the runtime name, runtime version and runtime API version.
   148  func (r *remoteRuntimeService) Version(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error) {
   149  	klog.V(10).InfoS("[RemoteRuntimeService] Version", "apiVersion", apiVersion, "timeout", r.timeout)
   150  
   151  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   152  	defer cancel()
   153  
   154  	return r.versionV1(ctx, apiVersion)
   155  }
   156  
   157  func (r *remoteRuntimeService) versionV1(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error) {
   158  	typedVersion, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{
   159  		Version: apiVersion,
   160  	})
   161  	if err != nil {
   162  		klog.ErrorS(err, "Version from runtime service failed")
   163  		return nil, err
   164  	}
   165  
   166  	klog.V(10).InfoS("[RemoteRuntimeService] Version Response", "apiVersion", typedVersion)
   167  
   168  	if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" {
   169  		return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion)
   170  	}
   171  
   172  	return typedVersion, err
   173  }
   174  
   175  // RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
   176  // the sandbox is in ready state.
   177  func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
   178  	// Use 2 times longer timeout for sandbox operation (4 mins by default)
   179  	// TODO: Make the pod sandbox timeout configurable.
   180  	timeout := r.timeout * 2
   181  
   182  	klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
   183  
   184  	ctx, cancel := context.WithTimeout(ctx, timeout)
   185  	defer cancel()
   186  
   187  	resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
   188  		Config:         config,
   189  		RuntimeHandler: runtimeHandler,
   190  	})
   191  
   192  	if err != nil {
   193  		klog.ErrorS(err, "RunPodSandbox from runtime service failed")
   194  		return "", err
   195  	}
   196  
   197  	podSandboxID := resp.PodSandboxId
   198  
   199  	if podSandboxID == "" {
   200  		errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.Metadata)
   201  		err := errors.New(errorMessage)
   202  		klog.ErrorS(err, "RunPodSandbox failed")
   203  		return "", err
   204  	}
   205  
   206  	klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox Response", "podSandboxID", podSandboxID)
   207  
   208  	return podSandboxID, nil
   209  }
   210  
   211  // StopPodSandbox stops the sandbox. If there are any running containers in the
   212  // sandbox, they should be forced to termination.
   213  func (r *remoteRuntimeService) StopPodSandbox(ctx context.Context, podSandBoxID string) (err error) {
   214  	klog.V(10).InfoS("[RemoteRuntimeService] StopPodSandbox", "podSandboxID", podSandBoxID, "timeout", r.timeout)
   215  
   216  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   217  	defer cancel()
   218  
   219  	if _, err := r.runtimeClient.StopPodSandbox(ctx, &runtimeapi.StopPodSandboxRequest{
   220  		PodSandboxId: podSandBoxID,
   221  	}); err != nil {
   222  		klog.ErrorS(err, "StopPodSandbox from runtime service failed", "podSandboxID", podSandBoxID)
   223  		return err
   224  	}
   225  
   226  	klog.V(10).InfoS("[RemoteRuntimeService] StopPodSandbox Response", "podSandboxID", podSandBoxID)
   227  
   228  	return nil
   229  }
   230  
   231  // RemovePodSandbox removes the sandbox. If there are any containers in the
   232  // sandbox, they should be forcibly removed.
   233  func (r *remoteRuntimeService) RemovePodSandbox(ctx context.Context, podSandBoxID string) (err error) {
   234  	klog.V(10).InfoS("[RemoteRuntimeService] RemovePodSandbox", "podSandboxID", podSandBoxID, "timeout", r.timeout)
   235  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   236  	defer cancel()
   237  
   238  	if _, err := r.runtimeClient.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{
   239  		PodSandboxId: podSandBoxID,
   240  	}); err != nil {
   241  		klog.ErrorS(err, "RemovePodSandbox from runtime service failed", "podSandboxID", podSandBoxID)
   242  		return err
   243  	}
   244  
   245  	klog.V(10).InfoS("[RemoteRuntimeService] RemovePodSandbox Response", "podSandboxID", podSandBoxID)
   246  
   247  	return nil
   248  }
   249  
   250  // PodSandboxStatus returns the status of the PodSandbox.
   251  func (r *remoteRuntimeService) PodSandboxStatus(ctx context.Context, podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) {
   252  	klog.V(10).InfoS("[RemoteRuntimeService] PodSandboxStatus", "podSandboxID", podSandBoxID, "timeout", r.timeout)
   253  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   254  	defer cancel()
   255  
   256  	return r.podSandboxStatusV1(ctx, podSandBoxID, verbose)
   257  }
   258  
   259  func (r *remoteRuntimeService) podSandboxStatusV1(ctx context.Context, podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) {
   260  	resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{
   261  		PodSandboxId: podSandBoxID,
   262  		Verbose:      verbose,
   263  	})
   264  	if err != nil {
   265  		return nil, err
   266  	}
   267  
   268  	klog.V(10).InfoS("[RemoteRuntimeService] PodSandboxStatus Response", "podSandboxID", podSandBoxID, "status", resp.Status)
   269  
   270  	status := resp.Status
   271  	if resp.Status != nil {
   272  		if err := verifySandboxStatus(status); err != nil {
   273  			return nil, err
   274  		}
   275  	}
   276  
   277  	return resp, nil
   278  }
   279  
   280  // ListPodSandbox returns a list of PodSandboxes.
   281  func (r *remoteRuntimeService) ListPodSandbox(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
   282  	klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandbox", "filter", filter, "timeout", r.timeout)
   283  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   284  	defer cancel()
   285  
   286  	return r.listPodSandboxV1(ctx, filter)
   287  }
   288  
   289  func (r *remoteRuntimeService) listPodSandboxV1(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
   290  	resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
   291  		Filter: filter,
   292  	})
   293  	if err != nil {
   294  		klog.ErrorS(err, "ListPodSandbox with filter from runtime service failed", "filter", filter)
   295  		return nil, err
   296  	}
   297  
   298  	klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandbox Response", "filter", filter, "items", resp.Items)
   299  
   300  	return resp.Items, nil
   301  }
   302  
   303  // CreateContainer creates a new container in the specified PodSandbox.
   304  func (r *remoteRuntimeService) CreateContainer(ctx context.Context, podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
   305  	klog.V(10).InfoS("[RemoteRuntimeService] CreateContainer", "podSandboxID", podSandBoxID, "timeout", r.timeout)
   306  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   307  	defer cancel()
   308  
   309  	return r.createContainerV1(ctx, podSandBoxID, config, sandboxConfig)
   310  }
   311  
   312  func (r *remoteRuntimeService) createContainerV1(ctx context.Context, podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
   313  	resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
   314  		PodSandboxId:  podSandBoxID,
   315  		Config:        config,
   316  		SandboxConfig: sandboxConfig,
   317  	})
   318  	if err != nil {
   319  		klog.ErrorS(err, "CreateContainer in sandbox from runtime service failed", "podSandboxID", podSandBoxID)
   320  		return "", err
   321  	}
   322  
   323  	klog.V(10).InfoS("[RemoteRuntimeService] CreateContainer", "podSandboxID", podSandBoxID, "containerID", resp.ContainerId)
   324  	if resp.ContainerId == "" {
   325  		errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.Metadata)
   326  		err := errors.New(errorMessage)
   327  		klog.ErrorS(err, "CreateContainer failed")
   328  		return "", err
   329  	}
   330  
   331  	return resp.ContainerId, nil
   332  }
   333  
   334  // StartContainer starts the container.
   335  func (r *remoteRuntimeService) StartContainer(ctx context.Context, containerID string) (err error) {
   336  	klog.V(10).InfoS("[RemoteRuntimeService] StartContainer", "containerID", containerID, "timeout", r.timeout)
   337  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   338  	defer cancel()
   339  
   340  	if _, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
   341  		ContainerId: containerID,
   342  	}); err != nil {
   343  		klog.ErrorS(err, "StartContainer from runtime service failed", "containerID", containerID)
   344  		return err
   345  	}
   346  	klog.V(10).InfoS("[RemoteRuntimeService] StartContainer Response", "containerID", containerID)
   347  
   348  	return nil
   349  }
   350  
   351  // StopContainer stops a running container with a grace period (i.e., timeout).
   352  func (r *remoteRuntimeService) StopContainer(ctx context.Context, containerID string, timeout int64) (err error) {
   353  	klog.V(10).InfoS("[RemoteRuntimeService] StopContainer", "containerID", containerID, "timeout", timeout)
   354  	// Use timeout + default timeout (2 minutes) as timeout to leave extra time
   355  	// for SIGKILL container and request latency.
   356  	t := r.timeout + time.Duration(timeout)*time.Second
   357  	ctx, cancel := context.WithTimeout(ctx, t)
   358  	defer cancel()
   359  
   360  	r.logReduction.ClearID(containerID)
   361  
   362  	if _, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{
   363  		ContainerId: containerID,
   364  		Timeout:     timeout,
   365  	}); err != nil {
   366  		klog.ErrorS(err, "StopContainer from runtime service failed", "containerID", containerID)
   367  		return err
   368  	}
   369  	klog.V(10).InfoS("[RemoteRuntimeService] StopContainer Response", "containerID", containerID)
   370  
   371  	return nil
   372  }
   373  
   374  // RemoveContainer removes the container. If the container is running, the container
   375  // should be forced to removal.
   376  func (r *remoteRuntimeService) RemoveContainer(ctx context.Context, containerID string) (err error) {
   377  	klog.V(10).InfoS("[RemoteRuntimeService] RemoveContainer", "containerID", containerID, "timeout", r.timeout)
   378  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   379  	defer cancel()
   380  
   381  	r.logReduction.ClearID(containerID)
   382  	if _, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
   383  		ContainerId: containerID,
   384  	}); err != nil {
   385  		klog.ErrorS(err, "RemoveContainer from runtime service failed", "containerID", containerID)
   386  		return err
   387  	}
   388  	klog.V(10).InfoS("[RemoteRuntimeService] RemoveContainer Response", "containerID", containerID)
   389  
   390  	return nil
   391  }
   392  
   393  // ListContainers lists containers by filters.
   394  func (r *remoteRuntimeService) ListContainers(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
   395  	klog.V(10).InfoS("[RemoteRuntimeService] ListContainers", "filter", filter, "timeout", r.timeout)
   396  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   397  	defer cancel()
   398  
   399  	return r.listContainersV1(ctx, filter)
   400  }
   401  
   402  func (r *remoteRuntimeService) listContainersV1(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
   403  	resp, err := r.runtimeClient.ListContainers(ctx, &runtimeapi.ListContainersRequest{
   404  		Filter: filter,
   405  	})
   406  	if err != nil {
   407  		klog.ErrorS(err, "ListContainers with filter from runtime service failed", "filter", filter)
   408  		return nil, err
   409  	}
   410  	klog.V(10).InfoS("[RemoteRuntimeService] ListContainers Response", "filter", filter, "containers", resp.Containers)
   411  
   412  	return resp.Containers, nil
   413  }
   414  
   415  // ContainerStatus returns the container status.
   416  func (r *remoteRuntimeService) ContainerStatus(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) {
   417  	klog.V(10).InfoS("[RemoteRuntimeService] ContainerStatus", "containerID", containerID, "timeout", r.timeout)
   418  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   419  	defer cancel()
   420  
   421  	return r.containerStatusV1(ctx, containerID, verbose)
   422  }
   423  
   424  func (r *remoteRuntimeService) containerStatusV1(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) {
   425  	resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
   426  		ContainerId: containerID,
   427  		Verbose:     verbose,
   428  	})
   429  	if err != nil {
   430  		// Don't spam the log with endless messages about the same failure.
   431  		if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
   432  			klog.ErrorS(err, "ContainerStatus from runtime service failed", "containerID", containerID)
   433  		}
   434  		return nil, err
   435  	}
   436  	r.logReduction.ClearID(containerID)
   437  	klog.V(10).InfoS("[RemoteRuntimeService] ContainerStatus Response", "containerID", containerID, "status", resp.Status)
   438  
   439  	status := resp.Status
   440  	if resp.Status != nil {
   441  		if err := verifyContainerStatus(status); err != nil {
   442  			klog.ErrorS(err, "verify ContainerStatus failed", "containerID", containerID)
   443  			return nil, err
   444  		}
   445  	}
   446  
   447  	return resp, nil
   448  }
   449  
   450  // UpdateContainerResources updates a containers resource config
   451  func (r *remoteRuntimeService) UpdateContainerResources(ctx context.Context, containerID string, resources *runtimeapi.ContainerResources) (err error) {
   452  	klog.V(10).InfoS("[RemoteRuntimeService] UpdateContainerResources", "containerID", containerID, "timeout", r.timeout)
   453  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   454  	defer cancel()
   455  
   456  	if _, err := r.runtimeClient.UpdateContainerResources(ctx, &runtimeapi.UpdateContainerResourcesRequest{
   457  		ContainerId: containerID,
   458  		Linux:       resources.GetLinux(),
   459  		Windows:     resources.GetWindows(),
   460  	}); err != nil {
   461  		klog.ErrorS(err, "UpdateContainerResources from runtime service failed", "containerID", containerID)
   462  		return err
   463  	}
   464  	klog.V(10).InfoS("[RemoteRuntimeService] UpdateContainerResources Response", "containerID", containerID)
   465  
   466  	return nil
   467  }
   468  
   469  // ExecSync executes a command in the container, and returns the stdout output.
   470  // If command exits with a non-zero exit code, an error is returned.
   471  func (r *remoteRuntimeService) ExecSync(ctx context.Context, containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
   472  	klog.V(10).InfoS("[RemoteRuntimeService] ExecSync", "containerID", containerID, "timeout", timeout)
   473  	// Do not set timeout when timeout is 0.
   474  	var cancel context.CancelFunc
   475  	if timeout != 0 {
   476  		// Use timeout + default timeout (2 minutes) as timeout to leave some time for
   477  		// the runtime to do cleanup.
   478  		ctx, cancel = context.WithTimeout(ctx, r.timeout+timeout)
   479  	} else {
   480  		ctx, cancel = context.WithCancel(ctx)
   481  	}
   482  	defer cancel()
   483  
   484  	return r.execSyncV1(ctx, containerID, cmd, timeout)
   485  }
   486  
   487  func (r *remoteRuntimeService) execSyncV1(ctx context.Context, containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
   488  	timeoutSeconds := int64(timeout.Seconds())
   489  	req := &runtimeapi.ExecSyncRequest{
   490  		ContainerId: containerID,
   491  		Cmd:         cmd,
   492  		Timeout:     timeoutSeconds,
   493  	}
   494  	resp, err := r.runtimeClient.ExecSync(ctx, req)
   495  	if err != nil {
   496  		klog.ErrorS(err, "ExecSync cmd from runtime service failed", "containerID", containerID, "cmd", cmd)
   497  
   498  		// interpret DeadlineExceeded gRPC errors as timedout probes
   499  		if status.Code(err) == codes.DeadlineExceeded {
   500  			err = exec.NewTimeoutError(fmt.Errorf("command %q timed out", strings.Join(cmd, " ")), timeout)
   501  		}
   502  
   503  		return nil, nil, err
   504  	}
   505  
   506  	klog.V(10).InfoS("[RemoteRuntimeService] ExecSync Response", "containerID", containerID, "exitCode", resp.ExitCode)
   507  	err = nil
   508  	if resp.ExitCode != 0 {
   509  		err = utilexec.CodeExitError{
   510  			Err:  fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.ExitCode, resp.Stderr),
   511  			Code: int(resp.ExitCode),
   512  		}
   513  	}
   514  
   515  	return resp.Stdout, resp.Stderr, err
   516  }
   517  
   518  // Exec prepares a streaming endpoint to execute a command in the container, and returns the address.
   519  func (r *remoteRuntimeService) Exec(ctx context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
   520  	klog.V(10).InfoS("[RemoteRuntimeService] Exec", "timeout", r.timeout)
   521  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   522  	defer cancel()
   523  
   524  	return r.execV1(ctx, req)
   525  }
   526  
   527  func (r *remoteRuntimeService) execV1(ctx context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
   528  	resp, err := r.runtimeClient.Exec(ctx, req)
   529  	if err != nil {
   530  		klog.ErrorS(err, "Exec cmd from runtime service failed", "containerID", req.ContainerId, "cmd", req.Cmd)
   531  		return nil, err
   532  	}
   533  	klog.V(10).InfoS("[RemoteRuntimeService] Exec Response")
   534  
   535  	if resp.Url == "" {
   536  		errorMessage := "URL is not set"
   537  		err := errors.New(errorMessage)
   538  		klog.ErrorS(err, "Exec failed")
   539  		return nil, err
   540  	}
   541  
   542  	return resp, nil
   543  }
   544  
   545  // Attach prepares a streaming endpoint to attach to a running container, and returns the address.
   546  func (r *remoteRuntimeService) Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
   547  	klog.V(10).InfoS("[RemoteRuntimeService] Attach", "containerID", req.ContainerId, "timeout", r.timeout)
   548  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   549  	defer cancel()
   550  
   551  	return r.attachV1(ctx, req)
   552  }
   553  
   554  func (r *remoteRuntimeService) attachV1(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
   555  	resp, err := r.runtimeClient.Attach(ctx, req)
   556  	if err != nil {
   557  		klog.ErrorS(err, "Attach container from runtime service failed", "containerID", req.ContainerId)
   558  		return nil, err
   559  	}
   560  	klog.V(10).InfoS("[RemoteRuntimeService] Attach Response", "containerID", req.ContainerId)
   561  
   562  	if resp.Url == "" {
   563  		errorMessage := "URL is not set"
   564  		err := errors.New(errorMessage)
   565  		klog.ErrorS(err, "Attach failed")
   566  		return nil, err
   567  	}
   568  	return resp, nil
   569  }
   570  
   571  // PortForward prepares a streaming endpoint to forward ports from a PodSandbox, and returns the address.
   572  func (r *remoteRuntimeService) PortForward(ctx context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
   573  	klog.V(10).InfoS("[RemoteRuntimeService] PortForward", "podSandboxID", req.PodSandboxId, "port", req.Port, "timeout", r.timeout)
   574  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   575  	defer cancel()
   576  
   577  	return r.portForwardV1(ctx, req)
   578  }
   579  
   580  func (r *remoteRuntimeService) portForwardV1(ctx context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
   581  	resp, err := r.runtimeClient.PortForward(ctx, req)
   582  	if err != nil {
   583  		klog.ErrorS(err, "PortForward from runtime service failed", "podSandboxID", req.PodSandboxId)
   584  		return nil, err
   585  	}
   586  	klog.V(10).InfoS("[RemoteRuntimeService] PortForward Response", "podSandboxID", req.PodSandboxId)
   587  
   588  	if resp.Url == "" {
   589  		errorMessage := "URL is not set"
   590  		err := errors.New(errorMessage)
   591  		klog.ErrorS(err, "PortForward failed")
   592  		return nil, err
   593  	}
   594  
   595  	return resp, nil
   596  }
   597  
   598  // UpdateRuntimeConfig updates the config of a runtime service. The only
   599  // update payload currently supported is the pod CIDR assigned to a node,
   600  // and the runtime service just proxies it down to the network plugin.
   601  func (r *remoteRuntimeService) UpdateRuntimeConfig(ctx context.Context, runtimeConfig *runtimeapi.RuntimeConfig) (err error) {
   602  	klog.V(10).InfoS("[RemoteRuntimeService] UpdateRuntimeConfig", "runtimeConfig", runtimeConfig, "timeout", r.timeout)
   603  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   604  	defer cancel()
   605  
   606  	// Response doesn't contain anything of interest. This translates to an
   607  	// Event notification to the network plugin, which can't fail, so we're
   608  	// really looking to surface destination unreachable.
   609  	if _, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeapi.UpdateRuntimeConfigRequest{
   610  		RuntimeConfig: runtimeConfig,
   611  	}); err != nil {
   612  		return err
   613  	}
   614  	klog.V(10).InfoS("[RemoteRuntimeService] UpdateRuntimeConfig Response", "runtimeConfig", runtimeConfig)
   615  
   616  	return nil
   617  }
   618  
   619  // Status returns the status of the runtime.
   620  func (r *remoteRuntimeService) Status(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error) {
   621  	klog.V(10).InfoS("[RemoteRuntimeService] Status", "timeout", r.timeout)
   622  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   623  	defer cancel()
   624  
   625  	return r.statusV1(ctx, verbose)
   626  }
   627  
   628  func (r *remoteRuntimeService) statusV1(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error) {
   629  	resp, err := r.runtimeClient.Status(ctx, &runtimeapi.StatusRequest{
   630  		Verbose: verbose,
   631  	})
   632  	if err != nil {
   633  		klog.ErrorS(err, "Status from runtime service failed")
   634  		return nil, err
   635  	}
   636  
   637  	klog.V(10).InfoS("[RemoteRuntimeService] Status Response", "status", resp.Status)
   638  
   639  	if resp.Status == nil || len(resp.Status.Conditions) < 2 {
   640  		errorMessage := "RuntimeReady or NetworkReady condition are not set"
   641  		err := errors.New(errorMessage)
   642  		klog.ErrorS(err, "Status failed")
   643  		return nil, err
   644  	}
   645  
   646  	return resp, nil
   647  }
   648  
   649  // ContainerStats returns the stats of the container.
   650  func (r *remoteRuntimeService) ContainerStats(ctx context.Context, containerID string) (*runtimeapi.ContainerStats, error) {
   651  	klog.V(10).InfoS("[RemoteRuntimeService] ContainerStats", "containerID", containerID, "timeout", r.timeout)
   652  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   653  	defer cancel()
   654  
   655  	return r.containerStatsV1(ctx, containerID)
   656  }
   657  
   658  func (r *remoteRuntimeService) containerStatsV1(ctx context.Context, containerID string) (*runtimeapi.ContainerStats, error) {
   659  	resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{
   660  		ContainerId: containerID,
   661  	})
   662  	if err != nil {
   663  		if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
   664  			klog.ErrorS(err, "ContainerStats from runtime service failed", "containerID", containerID)
   665  		}
   666  		return nil, err
   667  	}
   668  	r.logReduction.ClearID(containerID)
   669  	klog.V(10).InfoS("[RemoteRuntimeService] ContainerStats Response", "containerID", containerID, "stats", resp.GetStats())
   670  
   671  	return resp.GetStats(), nil
   672  }
   673  
   674  // ListContainerStats returns the list of ContainerStats given the filter.
   675  func (r *remoteRuntimeService) ListContainerStats(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
   676  	klog.V(10).InfoS("[RemoteRuntimeService] ListContainerStats", "filter", filter)
   677  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   678  	defer cancel()
   679  
   680  	return r.listContainerStatsV1(ctx, filter)
   681  }
   682  
   683  func (r *remoteRuntimeService) listContainerStatsV1(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
   684  	resp, err := r.runtimeClient.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{
   685  		Filter: filter,
   686  	})
   687  	if err != nil {
   688  		klog.ErrorS(err, "ListContainerStats with filter from runtime service failed", "filter", filter)
   689  		return nil, err
   690  	}
   691  	klog.V(10).InfoS("[RemoteRuntimeService] ListContainerStats Response", "filter", filter, "stats", resp.GetStats())
   692  
   693  	return resp.GetStats(), nil
   694  }
   695  
   696  // PodSandboxStats returns the stats of the pod.
   697  func (r *remoteRuntimeService) PodSandboxStats(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) {
   698  	klog.V(10).InfoS("[RemoteRuntimeService] PodSandboxStats", "podSandboxID", podSandboxID, "timeout", r.timeout)
   699  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   700  	defer cancel()
   701  
   702  	return r.podSandboxStatsV1(ctx, podSandboxID)
   703  }
   704  
   705  func (r *remoteRuntimeService) podSandboxStatsV1(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) {
   706  	resp, err := r.runtimeClient.PodSandboxStats(ctx, &runtimeapi.PodSandboxStatsRequest{
   707  		PodSandboxId: podSandboxID,
   708  	})
   709  	if err != nil {
   710  		if r.logReduction.ShouldMessageBePrinted(err.Error(), podSandboxID) {
   711  			klog.ErrorS(err, "PodSandbox from runtime service failed", "podSandboxID", podSandboxID)
   712  		}
   713  		return nil, err
   714  	}
   715  	r.logReduction.ClearID(podSandboxID)
   716  	klog.V(10).InfoS("[RemoteRuntimeService] PodSandbox Response", "podSandboxID", podSandboxID, "stats", resp.GetStats())
   717  
   718  	return resp.GetStats(), nil
   719  }
   720  
   721  // ListPodSandboxStats returns the list of pod sandbox stats given the filter
   722  func (r *remoteRuntimeService) ListPodSandboxStats(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) {
   723  	klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandboxStats", "filter", filter)
   724  	// Set timeout, because runtimes are able to cache disk stats results
   725  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   726  	defer cancel()
   727  
   728  	return r.listPodSandboxStatsV1(ctx, filter)
   729  }
   730  
   731  func (r *remoteRuntimeService) listPodSandboxStatsV1(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) {
   732  	resp, err := r.runtimeClient.ListPodSandboxStats(ctx, &runtimeapi.ListPodSandboxStatsRequest{
   733  		Filter: filter,
   734  	})
   735  	if err != nil {
   736  		klog.ErrorS(err, "ListPodSandboxStats with filter from runtime service failed", "filter", filter)
   737  		return nil, err
   738  	}
   739  	klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandboxStats Response", "filter", filter, "stats", resp.GetStats())
   740  
   741  	return resp.GetStats(), nil
   742  }
   743  
   744  // ReopenContainerLog reopens the container log file.
   745  func (r *remoteRuntimeService) ReopenContainerLog(ctx context.Context, containerID string) (err error) {
   746  	klog.V(10).InfoS("[RemoteRuntimeService] ReopenContainerLog", "containerID", containerID, "timeout", r.timeout)
   747  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   748  	defer cancel()
   749  
   750  	if _, err := r.runtimeClient.ReopenContainerLog(ctx, &runtimeapi.ReopenContainerLogRequest{ContainerId: containerID}); err != nil {
   751  		klog.ErrorS(err, "ReopenContainerLog from runtime service failed", "containerID", containerID)
   752  		return err
   753  	}
   754  
   755  	klog.V(10).InfoS("[RemoteRuntimeService] ReopenContainerLog Response", "containerID", containerID)
   756  	return nil
   757  }
   758  
   759  // CheckpointContainer triggers a checkpoint of the given CheckpointContainerRequest
   760  func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error {
   761  	klog.V(10).InfoS(
   762  		"[RemoteRuntimeService] CheckpointContainer",
   763  		"options",
   764  		options,
   765  	)
   766  	if options == nil {
   767  		return errors.New("CheckpointContainer requires non-nil CheckpointRestoreOptions parameter")
   768  	}
   769  	if options.Timeout < 0 {
   770  		return errors.New("CheckpointContainer requires the timeout value to be > 0")
   771  	}
   772  
   773  	ctx, cancel := func(ctx context.Context) (context.Context, context.CancelFunc) {
   774  		defaultTimeout := int64(r.timeout / time.Second)
   775  		if options.Timeout > defaultTimeout {
   776  			// The user requested a specific timeout, let's use that if it
   777  			// is larger than the CRI default.
   778  			return context.WithTimeout(ctx, time.Duration(options.Timeout)*time.Second)
   779  		}
   780  		// If the user requested a timeout less than the
   781  		// CRI default, let's use the CRI default.
   782  		options.Timeout = defaultTimeout
   783  		return context.WithTimeout(ctx, r.timeout)
   784  	}(ctx)
   785  	defer cancel()
   786  
   787  	_, err := r.runtimeClient.CheckpointContainer(
   788  		ctx,
   789  		options,
   790  	)
   791  
   792  	if err != nil {
   793  		klog.ErrorS(
   794  			err,
   795  			"CheckpointContainer from runtime service failed",
   796  			"containerID",
   797  			options.ContainerId,
   798  		)
   799  		return err
   800  	}
   801  	klog.V(10).InfoS(
   802  		"[RemoteRuntimeService] CheckpointContainer Response",
   803  		"containerID",
   804  		options.ContainerId,
   805  	)
   806  
   807  	return nil
   808  }
   809  
   810  func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
   811  	containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
   812  	if err != nil {
   813  		klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
   814  		return err
   815  	}
   816  
   817  	// The connection is successfully established and we have a streaming client ready for use.
   818  	metrics.EventedPLEGConn.Inc()
   819  
   820  	for {
   821  		resp, err := containerEventsStreamingClient.Recv()
   822  		if err == io.EOF {
   823  			klog.ErrorS(err, "container events stream is closed")
   824  			return err
   825  		}
   826  		if err != nil {
   827  			klog.ErrorS(err, "failed to receive streaming container event")
   828  			return err
   829  		}
   830  		if resp != nil {
   831  			containerEventsCh <- resp
   832  			klog.V(4).InfoS("container event received", "resp", resp)
   833  		}
   834  	}
   835  }
   836  
   837  // ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics.
   838  func (r *remoteRuntimeService) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) {
   839  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   840  	defer cancel()
   841  
   842  	resp, err := r.runtimeClient.ListMetricDescriptors(ctx, &runtimeapi.ListMetricDescriptorsRequest{})
   843  	if err != nil {
   844  		klog.ErrorS(err, "ListMetricDescriptors from runtime service failed")
   845  		return nil, err
   846  	}
   847  	klog.V(10).InfoS("[RemoteRuntimeService] ListMetricDescriptors Response", "stats", resp.GetDescriptors())
   848  
   849  	return resp.GetDescriptors(), nil
   850  }
   851  
   852  // ListPodSandboxMetrics retrieves the metrics for all pod sandboxes.
   853  func (r *remoteRuntimeService) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
   854  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   855  	defer cancel()
   856  
   857  	resp, err := r.runtimeClient.ListPodSandboxMetrics(ctx, &runtimeapi.ListPodSandboxMetricsRequest{})
   858  	if err != nil {
   859  		klog.ErrorS(err, "ListPodSandboxMetrics from runtime service failed")
   860  		return nil, err
   861  	}
   862  	klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandboxMetrics Response", "stats", resp.GetPodMetrics())
   863  
   864  	return resp.GetPodMetrics(), nil
   865  }
   866  
   867  // RuntimeConfig returns the configuration information of the runtime.
   868  func (r *remoteRuntimeService) RuntimeConfig(ctx context.Context) (*runtimeapi.RuntimeConfigResponse, error) {
   869  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   870  	defer cancel()
   871  
   872  	resp, err := r.runtimeClient.RuntimeConfig(ctx, &runtimeapi.RuntimeConfigRequest{})
   873  	if err != nil {
   874  		klog.ErrorS(err, "RuntimeConfig from runtime service failed")
   875  		return nil, err
   876  	}
   877  	klog.V(10).InfoS("[RemoteRuntimeService] RuntimeConfigResponse", "linuxConfig", resp.GetLinux())
   878  
   879  	return resp, nil
   880  }
   881  

View as plain text