...

Source file src/k8s.io/kubernetes/pkg/kubelet/cri/remote/remote_image.go

Documentation: k8s.io/kubernetes/pkg/kubelet/cri/remote

     1  /*
     2  Copyright 2016 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package remote
    18  
    19  import (
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"time"
    24  
    25  	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
    26  	"go.opentelemetry.io/otel/trace"
    27  	"google.golang.org/grpc"
    28  	"google.golang.org/grpc/backoff"
    29  	"google.golang.org/grpc/codes"
    30  	"google.golang.org/grpc/credentials/insecure"
    31  	"google.golang.org/grpc/status"
    32  	utilfeature "k8s.io/apiserver/pkg/util/feature"
    33  	tracing "k8s.io/component-base/tracing"
    34  	"k8s.io/klog/v2"
    35  
    36  	internalapi "k8s.io/cri-api/pkg/apis"
    37  	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
    38  	"k8s.io/kubernetes/pkg/features"
    39  	"k8s.io/kubernetes/pkg/kubelet/util"
    40  )
    41  
    42  // remoteImageService is a gRPC implementation of internalapi.ImageManagerService.
    43  type remoteImageService struct {
    44  	timeout     time.Duration
    45  	imageClient runtimeapi.ImageServiceClient
    46  }
    47  
    48  // NewRemoteImageService creates a new internalapi.ImageManagerService.
    49  func NewRemoteImageService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.ImageManagerService, error) {
    50  	klog.V(3).InfoS("Connecting to image service", "endpoint", endpoint)
    51  	addr, dialer, err := util.GetAddressAndDialer(endpoint)
    52  	if err != nil {
    53  		return nil, err
    54  	}
    55  
    56  	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
    57  	defer cancel()
    58  
    59  	var dialOpts []grpc.DialOption
    60  	dialOpts = append(dialOpts,
    61  		grpc.WithTransportCredentials(insecure.NewCredentials()),
    62  		grpc.WithContextDialer(dialer),
    63  		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
    64  	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
    65  		tracingOpts := []otelgrpc.Option{
    66  			otelgrpc.WithPropagators(tracing.Propagators()),
    67  			otelgrpc.WithTracerProvider(tp),
    68  		}
    69  		// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
    70  		// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
    71  		dialOpts = append(dialOpts,
    72  			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
    73  			grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
    74  	}
    75  
    76  	connParams := grpc.ConnectParams{
    77  		Backoff: backoff.DefaultConfig,
    78  	}
    79  	connParams.MinConnectTimeout = minConnectionTimeout
    80  	connParams.Backoff.BaseDelay = baseBackoffDelay
    81  	connParams.Backoff.MaxDelay = maxBackoffDelay
    82  	dialOpts = append(dialOpts,
    83  		grpc.WithConnectParams(connParams),
    84  	)
    85  
    86  	conn, err := grpc.DialContext(ctx, addr, dialOpts...)
    87  	if err != nil {
    88  		klog.ErrorS(err, "Connect remote image service failed", "address", addr)
    89  		return nil, err
    90  	}
    91  
    92  	service := &remoteImageService{timeout: connectionTimeout}
    93  	if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
    94  		return nil, fmt.Errorf("validate service connection: %w", err)
    95  	}
    96  
    97  	return service, nil
    98  
    99  }
   100  
   101  // validateServiceConnection tries to connect to the remote image service by
   102  // using the CRI v1 API version and fails if that's not possible.
   103  func (r *remoteImageService) validateServiceConnection(ctx context.Context, conn *grpc.ClientConn, endpoint string) error {
   104  	klog.V(4).InfoS("Validating the CRI v1 API image version")
   105  	r.imageClient = runtimeapi.NewImageServiceClient(conn)
   106  
   107  	if _, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}); err != nil {
   108  		return fmt.Errorf("validate CRI v1 image API for endpoint %q: %w", endpoint, err)
   109  	}
   110  
   111  	klog.V(2).InfoS("Validated CRI v1 image API")
   112  	return nil
   113  }
   114  
   115  // ListImages lists available images.
   116  func (r *remoteImageService) ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
   117  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   118  	defer cancel()
   119  
   120  	return r.listImagesV1(ctx, filter)
   121  }
   122  
   123  func (r *remoteImageService) listImagesV1(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
   124  	resp, err := r.imageClient.ListImages(ctx, &runtimeapi.ListImagesRequest{
   125  		Filter: filter,
   126  	})
   127  	if err != nil {
   128  		klog.ErrorS(err, "ListImages with filter from image service failed", "filter", filter)
   129  		return nil, err
   130  	}
   131  
   132  	return resp.Images, nil
   133  }
   134  
   135  // ImageStatus returns the status of the image.
   136  func (r *remoteImageService) ImageStatus(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error) {
   137  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   138  	defer cancel()
   139  
   140  	return r.imageStatusV1(ctx, image, verbose)
   141  }
   142  
   143  func (r *remoteImageService) imageStatusV1(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error) {
   144  	resp, err := r.imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
   145  		Image:   image,
   146  		Verbose: verbose,
   147  	})
   148  	if err != nil {
   149  		klog.ErrorS(err, "Get ImageStatus from image service failed", "image", image.Image)
   150  		return nil, err
   151  	}
   152  
   153  	if resp.Image != nil {
   154  		if resp.Image.Id == "" || resp.Image.Size_ == 0 {
   155  			errorMessage := fmt.Sprintf("Id or size of image %q is not set", image.Image)
   156  			err := errors.New(errorMessage)
   157  			klog.ErrorS(err, "ImageStatus failed", "image", image.Image)
   158  			return nil, err
   159  		}
   160  	}
   161  
   162  	return resp, nil
   163  }
   164  
   165  // PullImage pulls an image with authentication config.
   166  func (r *remoteImageService) PullImage(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
   167  	ctx, cancel := context.WithCancel(ctx)
   168  	defer cancel()
   169  
   170  	return r.pullImageV1(ctx, image, auth, podSandboxConfig)
   171  }
   172  
   173  func (r *remoteImageService) pullImageV1(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
   174  	resp, err := r.imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
   175  		Image:         image,
   176  		Auth:          auth,
   177  		SandboxConfig: podSandboxConfig,
   178  	})
   179  	if err != nil {
   180  		klog.ErrorS(err, "PullImage from image service failed", "image", image.Image)
   181  
   182  		// We can strip the code from unknown status errors since they add no value
   183  		// and will make them easier to read in the logs/events.
   184  		//
   185  		// It also ensures that checking custom error types from pkg/kubelet/images/types.go
   186  		// works in `imageManager.EnsureImageExists` (pkg/kubelet/images/image_manager.go).
   187  		statusErr, ok := status.FromError(err)
   188  		if ok && statusErr.Code() == codes.Unknown {
   189  			return "", errors.New(statusErr.Message())
   190  		}
   191  
   192  		return "", err
   193  	}
   194  
   195  	if resp.ImageRef == "" {
   196  		klog.ErrorS(errors.New("PullImage failed"), "ImageRef of image is not set", "image", image.Image)
   197  		errorMessage := fmt.Sprintf("imageRef of image %q is not set", image.Image)
   198  		return "", errors.New(errorMessage)
   199  	}
   200  
   201  	return resp.ImageRef, nil
   202  }
   203  
   204  // RemoveImage removes the image.
   205  func (r *remoteImageService) RemoveImage(ctx context.Context, image *runtimeapi.ImageSpec) (err error) {
   206  	ctx, cancel := context.WithTimeout(ctx, r.timeout)
   207  	defer cancel()
   208  
   209  	if _, err = r.imageClient.RemoveImage(ctx, &runtimeapi.RemoveImageRequest{
   210  		Image: image,
   211  	}); err != nil {
   212  		klog.ErrorS(err, "RemoveImage from image service failed", "image", image.Image)
   213  		return err
   214  	}
   215  
   216  	return nil
   217  }
   218  
   219  // ImageFsInfo returns information of the filesystem that is used to store images.
   220  func (r *remoteImageService) ImageFsInfo(ctx context.Context) (*runtimeapi.ImageFsInfoResponse, error) {
   221  	// Do not set timeout, because `ImageFsInfo` takes time.
   222  	// TODO(random-liu): Should we assume runtime should cache the result, and set timeout here?
   223  	ctx, cancel := context.WithCancel(ctx)
   224  	defer cancel()
   225  
   226  	return r.imageFsInfoV1(ctx)
   227  }
   228  
   229  func (r *remoteImageService) imageFsInfoV1(ctx context.Context) (*runtimeapi.ImageFsInfoResponse, error) {
   230  	resp, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{})
   231  	if err != nil {
   232  		klog.ErrorS(err, "ImageFsInfo from image service failed")
   233  		return nil, err
   234  	}
   235  	return resp, nil
   236  }
   237  

View as plain text