1
16
17 package remote
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "time"
24
25 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
26 "go.opentelemetry.io/otel/trace"
27 "google.golang.org/grpc"
28 "google.golang.org/grpc/backoff"
29 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/credentials/insecure"
31 "google.golang.org/grpc/status"
32 utilfeature "k8s.io/apiserver/pkg/util/feature"
33 tracing "k8s.io/component-base/tracing"
34 "k8s.io/klog/v2"
35
36 internalapi "k8s.io/cri-api/pkg/apis"
37 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
38 "k8s.io/kubernetes/pkg/features"
39 "k8s.io/kubernetes/pkg/kubelet/util"
40 )
41
42
43 type remoteImageService struct {
44 timeout time.Duration
45 imageClient runtimeapi.ImageServiceClient
46 }
47
48
49 func NewRemoteImageService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.ImageManagerService, error) {
50 klog.V(3).InfoS("Connecting to image service", "endpoint", endpoint)
51 addr, dialer, err := util.GetAddressAndDialer(endpoint)
52 if err != nil {
53 return nil, err
54 }
55
56 ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
57 defer cancel()
58
59 var dialOpts []grpc.DialOption
60 dialOpts = append(dialOpts,
61 grpc.WithTransportCredentials(insecure.NewCredentials()),
62 grpc.WithContextDialer(dialer),
63 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
64 if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
65 tracingOpts := []otelgrpc.Option{
66 otelgrpc.WithPropagators(tracing.Propagators()),
67 otelgrpc.WithTracerProvider(tp),
68 }
69
70
71 dialOpts = append(dialOpts,
72 grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
73 grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
74 }
75
76 connParams := grpc.ConnectParams{
77 Backoff: backoff.DefaultConfig,
78 }
79 connParams.MinConnectTimeout = minConnectionTimeout
80 connParams.Backoff.BaseDelay = baseBackoffDelay
81 connParams.Backoff.MaxDelay = maxBackoffDelay
82 dialOpts = append(dialOpts,
83 grpc.WithConnectParams(connParams),
84 )
85
86 conn, err := grpc.DialContext(ctx, addr, dialOpts...)
87 if err != nil {
88 klog.ErrorS(err, "Connect remote image service failed", "address", addr)
89 return nil, err
90 }
91
92 service := &remoteImageService{timeout: connectionTimeout}
93 if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
94 return nil, fmt.Errorf("validate service connection: %w", err)
95 }
96
97 return service, nil
98
99 }
100
101
102
103 func (r *remoteImageService) validateServiceConnection(ctx context.Context, conn *grpc.ClientConn, endpoint string) error {
104 klog.V(4).InfoS("Validating the CRI v1 API image version")
105 r.imageClient = runtimeapi.NewImageServiceClient(conn)
106
107 if _, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{}); err != nil {
108 return fmt.Errorf("validate CRI v1 image API for endpoint %q: %w", endpoint, err)
109 }
110
111 klog.V(2).InfoS("Validated CRI v1 image API")
112 return nil
113 }
114
115
116 func (r *remoteImageService) ListImages(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
117 ctx, cancel := context.WithTimeout(ctx, r.timeout)
118 defer cancel()
119
120 return r.listImagesV1(ctx, filter)
121 }
122
123 func (r *remoteImageService) listImagesV1(ctx context.Context, filter *runtimeapi.ImageFilter) ([]*runtimeapi.Image, error) {
124 resp, err := r.imageClient.ListImages(ctx, &runtimeapi.ListImagesRequest{
125 Filter: filter,
126 })
127 if err != nil {
128 klog.ErrorS(err, "ListImages with filter from image service failed", "filter", filter)
129 return nil, err
130 }
131
132 return resp.Images, nil
133 }
134
135
136 func (r *remoteImageService) ImageStatus(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error) {
137 ctx, cancel := context.WithTimeout(ctx, r.timeout)
138 defer cancel()
139
140 return r.imageStatusV1(ctx, image, verbose)
141 }
142
143 func (r *remoteImageService) imageStatusV1(ctx context.Context, image *runtimeapi.ImageSpec, verbose bool) (*runtimeapi.ImageStatusResponse, error) {
144 resp, err := r.imageClient.ImageStatus(ctx, &runtimeapi.ImageStatusRequest{
145 Image: image,
146 Verbose: verbose,
147 })
148 if err != nil {
149 klog.ErrorS(err, "Get ImageStatus from image service failed", "image", image.Image)
150 return nil, err
151 }
152
153 if resp.Image != nil {
154 if resp.Image.Id == "" || resp.Image.Size_ == 0 {
155 errorMessage := fmt.Sprintf("Id or size of image %q is not set", image.Image)
156 err := errors.New(errorMessage)
157 klog.ErrorS(err, "ImageStatus failed", "image", image.Image)
158 return nil, err
159 }
160 }
161
162 return resp, nil
163 }
164
165
166 func (r *remoteImageService) PullImage(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
167 ctx, cancel := context.WithCancel(ctx)
168 defer cancel()
169
170 return r.pullImageV1(ctx, image, auth, podSandboxConfig)
171 }
172
173 func (r *remoteImageService) pullImageV1(ctx context.Context, image *runtimeapi.ImageSpec, auth *runtimeapi.AuthConfig, podSandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
174 resp, err := r.imageClient.PullImage(ctx, &runtimeapi.PullImageRequest{
175 Image: image,
176 Auth: auth,
177 SandboxConfig: podSandboxConfig,
178 })
179 if err != nil {
180 klog.ErrorS(err, "PullImage from image service failed", "image", image.Image)
181
182
183
184
185
186
187 statusErr, ok := status.FromError(err)
188 if ok && statusErr.Code() == codes.Unknown {
189 return "", errors.New(statusErr.Message())
190 }
191
192 return "", err
193 }
194
195 if resp.ImageRef == "" {
196 klog.ErrorS(errors.New("PullImage failed"), "ImageRef of image is not set", "image", image.Image)
197 errorMessage := fmt.Sprintf("imageRef of image %q is not set", image.Image)
198 return "", errors.New(errorMessage)
199 }
200
201 return resp.ImageRef, nil
202 }
203
204
205 func (r *remoteImageService) RemoveImage(ctx context.Context, image *runtimeapi.ImageSpec) (err error) {
206 ctx, cancel := context.WithTimeout(ctx, r.timeout)
207 defer cancel()
208
209 if _, err = r.imageClient.RemoveImage(ctx, &runtimeapi.RemoveImageRequest{
210 Image: image,
211 }); err != nil {
212 klog.ErrorS(err, "RemoveImage from image service failed", "image", image.Image)
213 return err
214 }
215
216 return nil
217 }
218
219
220 func (r *remoteImageService) ImageFsInfo(ctx context.Context) (*runtimeapi.ImageFsInfoResponse, error) {
221
222
223 ctx, cancel := context.WithCancel(ctx)
224 defer cancel()
225
226 return r.imageFsInfoV1(ctx)
227 }
228
229 func (r *remoteImageService) imageFsInfoV1(ctx context.Context) (*runtimeapi.ImageFsInfoResponse, error) {
230 resp, err := r.imageClient.ImageFsInfo(ctx, &runtimeapi.ImageFsInfoRequest{})
231 if err != nil {
232 klog.ErrorS(err, "ImageFsInfo from image service failed")
233 return nil, err
234 }
235 return resp, nil
236 }
237
View as plain text