1
16
17 package remote
18
19 import (
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "strings"
25 "time"
26
27 "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
28 "go.opentelemetry.io/otel/trace"
29 "google.golang.org/grpc"
30 "google.golang.org/grpc/backoff"
31 "google.golang.org/grpc/codes"
32 "google.golang.org/grpc/credentials/insecure"
33 "google.golang.org/grpc/status"
34 utilfeature "k8s.io/apiserver/pkg/util/feature"
35 "k8s.io/component-base/logs/logreduction"
36 tracing "k8s.io/component-base/tracing"
37 internalapi "k8s.io/cri-api/pkg/apis"
38 runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1"
39 "k8s.io/klog/v2"
40 "k8s.io/kubernetes/pkg/features"
41 "k8s.io/kubernetes/pkg/kubelet/metrics"
42 "k8s.io/kubernetes/pkg/kubelet/util"
43 "k8s.io/kubernetes/pkg/probe/exec"
44
45 utilexec "k8s.io/utils/exec"
46 )
47
48
49 type remoteRuntimeService struct {
50 timeout time.Duration
51 runtimeClient runtimeapi.RuntimeServiceClient
52
53 logReduction *logreduction.LogReduction
54 }
55
const (
	// identicalErrorDelay is the interval handed to
	// logreduction.NewLogReduction for throttling repeated identical errors.
	identicalErrorDelay = time.Minute

	// gRPC connection backoff and connect-timeout settings used when dialing
	// the runtime endpoint.
	maxBackoffDelay      = 3 * time.Second
	baseBackoffDelay     = 100 * time.Millisecond
	minConnectionTimeout = 5 * time.Second
)
65
66
67
// CRIVersion names a version of the Container Runtime Interface API.
type CRIVersion string

// ErrContainerStatusNil is the sentinel error for a nil container status.
var ErrContainerStatusNil = errors.New("container status is nil")

// CRIVersionV1 is the v1 CRI API version string.
const CRIVersionV1 CRIVersion = "v1"
77
78
79 func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
80 klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
81 addr, dialer, err := util.GetAddressAndDialer(endpoint)
82 if err != nil {
83 return nil, err
84 }
85 ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
86 defer cancel()
87
88 var dialOpts []grpc.DialOption
89 dialOpts = append(dialOpts,
90 grpc.WithTransportCredentials(insecure.NewCredentials()),
91 grpc.WithContextDialer(dialer),
92 grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
93 if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
94 tracingOpts := []otelgrpc.Option{
95 otelgrpc.WithPropagators(tracing.Propagators()),
96 otelgrpc.WithTracerProvider(tp),
97 }
98
99
100 dialOpts = append(dialOpts,
101 grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
102 grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
103 }
104
105 connParams := grpc.ConnectParams{
106 Backoff: backoff.DefaultConfig,
107 }
108 connParams.MinConnectTimeout = minConnectionTimeout
109 connParams.Backoff.BaseDelay = baseBackoffDelay
110 connParams.Backoff.MaxDelay = maxBackoffDelay
111 dialOpts = append(dialOpts,
112 grpc.WithConnectParams(connParams),
113 )
114
115 conn, err := grpc.DialContext(ctx, addr, dialOpts...)
116 if err != nil {
117 klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
118 return nil, err
119 }
120
121 service := &remoteRuntimeService{
122 timeout: connectionTimeout,
123 logReduction: logreduction.NewLogReduction(identicalErrorDelay),
124 }
125
126 if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
127 return nil, fmt.Errorf("validate service connection: %w", err)
128 }
129
130 return service, nil
131 }
132
133
134
135 func (r *remoteRuntimeService) validateServiceConnection(ctx context.Context, conn *grpc.ClientConn, endpoint string) error {
136 klog.V(4).InfoS("Validating the CRI v1 API runtime version")
137 r.runtimeClient = runtimeapi.NewRuntimeServiceClient(conn)
138
139 if _, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{}); err != nil {
140 return fmt.Errorf("validate CRI v1 runtime API for endpoint %q: %w", endpoint, err)
141 }
142
143 klog.V(2).InfoS("Validated CRI v1 runtime API")
144 return nil
145 }
146
147
148 func (r *remoteRuntimeService) Version(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error) {
149 klog.V(10).InfoS("[RemoteRuntimeService] Version", "apiVersion", apiVersion, "timeout", r.timeout)
150
151 ctx, cancel := context.WithTimeout(ctx, r.timeout)
152 defer cancel()
153
154 return r.versionV1(ctx, apiVersion)
155 }
156
157 func (r *remoteRuntimeService) versionV1(ctx context.Context, apiVersion string) (*runtimeapi.VersionResponse, error) {
158 typedVersion, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{
159 Version: apiVersion,
160 })
161 if err != nil {
162 klog.ErrorS(err, "Version from runtime service failed")
163 return nil, err
164 }
165
166 klog.V(10).InfoS("[RemoteRuntimeService] Version Response", "apiVersion", typedVersion)
167
168 if typedVersion.Version == "" || typedVersion.RuntimeName == "" || typedVersion.RuntimeApiVersion == "" || typedVersion.RuntimeVersion == "" {
169 return nil, fmt.Errorf("not all fields are set in VersionResponse (%q)", *typedVersion)
170 }
171
172 return typedVersion, err
173 }
174
175
176
177 func (r *remoteRuntimeService) RunPodSandbox(ctx context.Context, config *runtimeapi.PodSandboxConfig, runtimeHandler string) (string, error) {
178
179
180 timeout := r.timeout * 2
181
182 klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox", "config", config, "runtimeHandler", runtimeHandler, "timeout", timeout)
183
184 ctx, cancel := context.WithTimeout(ctx, timeout)
185 defer cancel()
186
187 resp, err := r.runtimeClient.RunPodSandbox(ctx, &runtimeapi.RunPodSandboxRequest{
188 Config: config,
189 RuntimeHandler: runtimeHandler,
190 })
191
192 if err != nil {
193 klog.ErrorS(err, "RunPodSandbox from runtime service failed")
194 return "", err
195 }
196
197 podSandboxID := resp.PodSandboxId
198
199 if podSandboxID == "" {
200 errorMessage := fmt.Sprintf("PodSandboxId is not set for sandbox %q", config.Metadata)
201 err := errors.New(errorMessage)
202 klog.ErrorS(err, "RunPodSandbox failed")
203 return "", err
204 }
205
206 klog.V(10).InfoS("[RemoteRuntimeService] RunPodSandbox Response", "podSandboxID", podSandboxID)
207
208 return podSandboxID, nil
209 }
210
211
212
213 func (r *remoteRuntimeService) StopPodSandbox(ctx context.Context, podSandBoxID string) (err error) {
214 klog.V(10).InfoS("[RemoteRuntimeService] StopPodSandbox", "podSandboxID", podSandBoxID, "timeout", r.timeout)
215
216 ctx, cancel := context.WithTimeout(ctx, r.timeout)
217 defer cancel()
218
219 if _, err := r.runtimeClient.StopPodSandbox(ctx, &runtimeapi.StopPodSandboxRequest{
220 PodSandboxId: podSandBoxID,
221 }); err != nil {
222 klog.ErrorS(err, "StopPodSandbox from runtime service failed", "podSandboxID", podSandBoxID)
223 return err
224 }
225
226 klog.V(10).InfoS("[RemoteRuntimeService] StopPodSandbox Response", "podSandboxID", podSandBoxID)
227
228 return nil
229 }
230
231
232
233 func (r *remoteRuntimeService) RemovePodSandbox(ctx context.Context, podSandBoxID string) (err error) {
234 klog.V(10).InfoS("[RemoteRuntimeService] RemovePodSandbox", "podSandboxID", podSandBoxID, "timeout", r.timeout)
235 ctx, cancel := context.WithTimeout(ctx, r.timeout)
236 defer cancel()
237
238 if _, err := r.runtimeClient.RemovePodSandbox(ctx, &runtimeapi.RemovePodSandboxRequest{
239 PodSandboxId: podSandBoxID,
240 }); err != nil {
241 klog.ErrorS(err, "RemovePodSandbox from runtime service failed", "podSandboxID", podSandBoxID)
242 return err
243 }
244
245 klog.V(10).InfoS("[RemoteRuntimeService] RemovePodSandbox Response", "podSandboxID", podSandBoxID)
246
247 return nil
248 }
249
250
251 func (r *remoteRuntimeService) PodSandboxStatus(ctx context.Context, podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) {
252 klog.V(10).InfoS("[RemoteRuntimeService] PodSandboxStatus", "podSandboxID", podSandBoxID, "timeout", r.timeout)
253 ctx, cancel := context.WithTimeout(ctx, r.timeout)
254 defer cancel()
255
256 return r.podSandboxStatusV1(ctx, podSandBoxID, verbose)
257 }
258
259 func (r *remoteRuntimeService) podSandboxStatusV1(ctx context.Context, podSandBoxID string, verbose bool) (*runtimeapi.PodSandboxStatusResponse, error) {
260 resp, err := r.runtimeClient.PodSandboxStatus(ctx, &runtimeapi.PodSandboxStatusRequest{
261 PodSandboxId: podSandBoxID,
262 Verbose: verbose,
263 })
264 if err != nil {
265 return nil, err
266 }
267
268 klog.V(10).InfoS("[RemoteRuntimeService] PodSandboxStatus Response", "podSandboxID", podSandBoxID, "status", resp.Status)
269
270 status := resp.Status
271 if resp.Status != nil {
272 if err := verifySandboxStatus(status); err != nil {
273 return nil, err
274 }
275 }
276
277 return resp, nil
278 }
279
280
281 func (r *remoteRuntimeService) ListPodSandbox(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
282 klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandbox", "filter", filter, "timeout", r.timeout)
283 ctx, cancel := context.WithTimeout(ctx, r.timeout)
284 defer cancel()
285
286 return r.listPodSandboxV1(ctx, filter)
287 }
288
289 func (r *remoteRuntimeService) listPodSandboxV1(ctx context.Context, filter *runtimeapi.PodSandboxFilter) ([]*runtimeapi.PodSandbox, error) {
290 resp, err := r.runtimeClient.ListPodSandbox(ctx, &runtimeapi.ListPodSandboxRequest{
291 Filter: filter,
292 })
293 if err != nil {
294 klog.ErrorS(err, "ListPodSandbox with filter from runtime service failed", "filter", filter)
295 return nil, err
296 }
297
298 klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandbox Response", "filter", filter, "items", resp.Items)
299
300 return resp.Items, nil
301 }
302
303
304 func (r *remoteRuntimeService) CreateContainer(ctx context.Context, podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
305 klog.V(10).InfoS("[RemoteRuntimeService] CreateContainer", "podSandboxID", podSandBoxID, "timeout", r.timeout)
306 ctx, cancel := context.WithTimeout(ctx, r.timeout)
307 defer cancel()
308
309 return r.createContainerV1(ctx, podSandBoxID, config, sandboxConfig)
310 }
311
312 func (r *remoteRuntimeService) createContainerV1(ctx context.Context, podSandBoxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
313 resp, err := r.runtimeClient.CreateContainer(ctx, &runtimeapi.CreateContainerRequest{
314 PodSandboxId: podSandBoxID,
315 Config: config,
316 SandboxConfig: sandboxConfig,
317 })
318 if err != nil {
319 klog.ErrorS(err, "CreateContainer in sandbox from runtime service failed", "podSandboxID", podSandBoxID)
320 return "", err
321 }
322
323 klog.V(10).InfoS("[RemoteRuntimeService] CreateContainer", "podSandboxID", podSandBoxID, "containerID", resp.ContainerId)
324 if resp.ContainerId == "" {
325 errorMessage := fmt.Sprintf("ContainerId is not set for container %q", config.Metadata)
326 err := errors.New(errorMessage)
327 klog.ErrorS(err, "CreateContainer failed")
328 return "", err
329 }
330
331 return resp.ContainerId, nil
332 }
333
334
335 func (r *remoteRuntimeService) StartContainer(ctx context.Context, containerID string) (err error) {
336 klog.V(10).InfoS("[RemoteRuntimeService] StartContainer", "containerID", containerID, "timeout", r.timeout)
337 ctx, cancel := context.WithTimeout(ctx, r.timeout)
338 defer cancel()
339
340 if _, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
341 ContainerId: containerID,
342 }); err != nil {
343 klog.ErrorS(err, "StartContainer from runtime service failed", "containerID", containerID)
344 return err
345 }
346 klog.V(10).InfoS("[RemoteRuntimeService] StartContainer Response", "containerID", containerID)
347
348 return nil
349 }
350
351
352 func (r *remoteRuntimeService) StopContainer(ctx context.Context, containerID string, timeout int64) (err error) {
353 klog.V(10).InfoS("[RemoteRuntimeService] StopContainer", "containerID", containerID, "timeout", timeout)
354
355
356 t := r.timeout + time.Duration(timeout)*time.Second
357 ctx, cancel := context.WithTimeout(ctx, t)
358 defer cancel()
359
360 r.logReduction.ClearID(containerID)
361
362 if _, err := r.runtimeClient.StopContainer(ctx, &runtimeapi.StopContainerRequest{
363 ContainerId: containerID,
364 Timeout: timeout,
365 }); err != nil {
366 klog.ErrorS(err, "StopContainer from runtime service failed", "containerID", containerID)
367 return err
368 }
369 klog.V(10).InfoS("[RemoteRuntimeService] StopContainer Response", "containerID", containerID)
370
371 return nil
372 }
373
374
375
376 func (r *remoteRuntimeService) RemoveContainer(ctx context.Context, containerID string) (err error) {
377 klog.V(10).InfoS("[RemoteRuntimeService] RemoveContainer", "containerID", containerID, "timeout", r.timeout)
378 ctx, cancel := context.WithTimeout(ctx, r.timeout)
379 defer cancel()
380
381 r.logReduction.ClearID(containerID)
382 if _, err := r.runtimeClient.RemoveContainer(ctx, &runtimeapi.RemoveContainerRequest{
383 ContainerId: containerID,
384 }); err != nil {
385 klog.ErrorS(err, "RemoveContainer from runtime service failed", "containerID", containerID)
386 return err
387 }
388 klog.V(10).InfoS("[RemoteRuntimeService] RemoveContainer Response", "containerID", containerID)
389
390 return nil
391 }
392
393
394 func (r *remoteRuntimeService) ListContainers(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
395 klog.V(10).InfoS("[RemoteRuntimeService] ListContainers", "filter", filter, "timeout", r.timeout)
396 ctx, cancel := context.WithTimeout(ctx, r.timeout)
397 defer cancel()
398
399 return r.listContainersV1(ctx, filter)
400 }
401
402 func (r *remoteRuntimeService) listContainersV1(ctx context.Context, filter *runtimeapi.ContainerFilter) ([]*runtimeapi.Container, error) {
403 resp, err := r.runtimeClient.ListContainers(ctx, &runtimeapi.ListContainersRequest{
404 Filter: filter,
405 })
406 if err != nil {
407 klog.ErrorS(err, "ListContainers with filter from runtime service failed", "filter", filter)
408 return nil, err
409 }
410 klog.V(10).InfoS("[RemoteRuntimeService] ListContainers Response", "filter", filter, "containers", resp.Containers)
411
412 return resp.Containers, nil
413 }
414
415
416 func (r *remoteRuntimeService) ContainerStatus(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) {
417 klog.V(10).InfoS("[RemoteRuntimeService] ContainerStatus", "containerID", containerID, "timeout", r.timeout)
418 ctx, cancel := context.WithTimeout(ctx, r.timeout)
419 defer cancel()
420
421 return r.containerStatusV1(ctx, containerID, verbose)
422 }
423
424 func (r *remoteRuntimeService) containerStatusV1(ctx context.Context, containerID string, verbose bool) (*runtimeapi.ContainerStatusResponse, error) {
425 resp, err := r.runtimeClient.ContainerStatus(ctx, &runtimeapi.ContainerStatusRequest{
426 ContainerId: containerID,
427 Verbose: verbose,
428 })
429 if err != nil {
430
431 if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
432 klog.ErrorS(err, "ContainerStatus from runtime service failed", "containerID", containerID)
433 }
434 return nil, err
435 }
436 r.logReduction.ClearID(containerID)
437 klog.V(10).InfoS("[RemoteRuntimeService] ContainerStatus Response", "containerID", containerID, "status", resp.Status)
438
439 status := resp.Status
440 if resp.Status != nil {
441 if err := verifyContainerStatus(status); err != nil {
442 klog.ErrorS(err, "verify ContainerStatus failed", "containerID", containerID)
443 return nil, err
444 }
445 }
446
447 return resp, nil
448 }
449
450
451 func (r *remoteRuntimeService) UpdateContainerResources(ctx context.Context, containerID string, resources *runtimeapi.ContainerResources) (err error) {
452 klog.V(10).InfoS("[RemoteRuntimeService] UpdateContainerResources", "containerID", containerID, "timeout", r.timeout)
453 ctx, cancel := context.WithTimeout(ctx, r.timeout)
454 defer cancel()
455
456 if _, err := r.runtimeClient.UpdateContainerResources(ctx, &runtimeapi.UpdateContainerResourcesRequest{
457 ContainerId: containerID,
458 Linux: resources.GetLinux(),
459 Windows: resources.GetWindows(),
460 }); err != nil {
461 klog.ErrorS(err, "UpdateContainerResources from runtime service failed", "containerID", containerID)
462 return err
463 }
464 klog.V(10).InfoS("[RemoteRuntimeService] UpdateContainerResources Response", "containerID", containerID)
465
466 return nil
467 }
468
469
470
471 func (r *remoteRuntimeService) ExecSync(ctx context.Context, containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
472 klog.V(10).InfoS("[RemoteRuntimeService] ExecSync", "containerID", containerID, "timeout", timeout)
473
474 var cancel context.CancelFunc
475 if timeout != 0 {
476
477
478 ctx, cancel = context.WithTimeout(ctx, r.timeout+timeout)
479 } else {
480 ctx, cancel = context.WithCancel(ctx)
481 }
482 defer cancel()
483
484 return r.execSyncV1(ctx, containerID, cmd, timeout)
485 }
486
487 func (r *remoteRuntimeService) execSyncV1(ctx context.Context, containerID string, cmd []string, timeout time.Duration) (stdout []byte, stderr []byte, err error) {
488 timeoutSeconds := int64(timeout.Seconds())
489 req := &runtimeapi.ExecSyncRequest{
490 ContainerId: containerID,
491 Cmd: cmd,
492 Timeout: timeoutSeconds,
493 }
494 resp, err := r.runtimeClient.ExecSync(ctx, req)
495 if err != nil {
496 klog.ErrorS(err, "ExecSync cmd from runtime service failed", "containerID", containerID, "cmd", cmd)
497
498
499 if status.Code(err) == codes.DeadlineExceeded {
500 err = exec.NewTimeoutError(fmt.Errorf("command %q timed out", strings.Join(cmd, " ")), timeout)
501 }
502
503 return nil, nil, err
504 }
505
506 klog.V(10).InfoS("[RemoteRuntimeService] ExecSync Response", "containerID", containerID, "exitCode", resp.ExitCode)
507 err = nil
508 if resp.ExitCode != 0 {
509 err = utilexec.CodeExitError{
510 Err: fmt.Errorf("command '%s' exited with %d: %s", strings.Join(cmd, " "), resp.ExitCode, resp.Stderr),
511 Code: int(resp.ExitCode),
512 }
513 }
514
515 return resp.Stdout, resp.Stderr, err
516 }
517
518
519 func (r *remoteRuntimeService) Exec(ctx context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
520 klog.V(10).InfoS("[RemoteRuntimeService] Exec", "timeout", r.timeout)
521 ctx, cancel := context.WithTimeout(ctx, r.timeout)
522 defer cancel()
523
524 return r.execV1(ctx, req)
525 }
526
527 func (r *remoteRuntimeService) execV1(ctx context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) {
528 resp, err := r.runtimeClient.Exec(ctx, req)
529 if err != nil {
530 klog.ErrorS(err, "Exec cmd from runtime service failed", "containerID", req.ContainerId, "cmd", req.Cmd)
531 return nil, err
532 }
533 klog.V(10).InfoS("[RemoteRuntimeService] Exec Response")
534
535 if resp.Url == "" {
536 errorMessage := "URL is not set"
537 err := errors.New(errorMessage)
538 klog.ErrorS(err, "Exec failed")
539 return nil, err
540 }
541
542 return resp, nil
543 }
544
545
546 func (r *remoteRuntimeService) Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
547 klog.V(10).InfoS("[RemoteRuntimeService] Attach", "containerID", req.ContainerId, "timeout", r.timeout)
548 ctx, cancel := context.WithTimeout(ctx, r.timeout)
549 defer cancel()
550
551 return r.attachV1(ctx, req)
552 }
553
554 func (r *remoteRuntimeService) attachV1(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) {
555 resp, err := r.runtimeClient.Attach(ctx, req)
556 if err != nil {
557 klog.ErrorS(err, "Attach container from runtime service failed", "containerID", req.ContainerId)
558 return nil, err
559 }
560 klog.V(10).InfoS("[RemoteRuntimeService] Attach Response", "containerID", req.ContainerId)
561
562 if resp.Url == "" {
563 errorMessage := "URL is not set"
564 err := errors.New(errorMessage)
565 klog.ErrorS(err, "Attach failed")
566 return nil, err
567 }
568 return resp, nil
569 }
570
571
572 func (r *remoteRuntimeService) PortForward(ctx context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
573 klog.V(10).InfoS("[RemoteRuntimeService] PortForward", "podSandboxID", req.PodSandboxId, "port", req.Port, "timeout", r.timeout)
574 ctx, cancel := context.WithTimeout(ctx, r.timeout)
575 defer cancel()
576
577 return r.portForwardV1(ctx, req)
578 }
579
580 func (r *remoteRuntimeService) portForwardV1(ctx context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) {
581 resp, err := r.runtimeClient.PortForward(ctx, req)
582 if err != nil {
583 klog.ErrorS(err, "PortForward from runtime service failed", "podSandboxID", req.PodSandboxId)
584 return nil, err
585 }
586 klog.V(10).InfoS("[RemoteRuntimeService] PortForward Response", "podSandboxID", req.PodSandboxId)
587
588 if resp.Url == "" {
589 errorMessage := "URL is not set"
590 err := errors.New(errorMessage)
591 klog.ErrorS(err, "PortForward failed")
592 return nil, err
593 }
594
595 return resp, nil
596 }
597
598
599
600
601 func (r *remoteRuntimeService) UpdateRuntimeConfig(ctx context.Context, runtimeConfig *runtimeapi.RuntimeConfig) (err error) {
602 klog.V(10).InfoS("[RemoteRuntimeService] UpdateRuntimeConfig", "runtimeConfig", runtimeConfig, "timeout", r.timeout)
603 ctx, cancel := context.WithTimeout(ctx, r.timeout)
604 defer cancel()
605
606
607
608
609 if _, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeapi.UpdateRuntimeConfigRequest{
610 RuntimeConfig: runtimeConfig,
611 }); err != nil {
612 return err
613 }
614 klog.V(10).InfoS("[RemoteRuntimeService] UpdateRuntimeConfig Response", "runtimeConfig", runtimeConfig)
615
616 return nil
617 }
618
619
620 func (r *remoteRuntimeService) Status(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error) {
621 klog.V(10).InfoS("[RemoteRuntimeService] Status", "timeout", r.timeout)
622 ctx, cancel := context.WithTimeout(ctx, r.timeout)
623 defer cancel()
624
625 return r.statusV1(ctx, verbose)
626 }
627
628 func (r *remoteRuntimeService) statusV1(ctx context.Context, verbose bool) (*runtimeapi.StatusResponse, error) {
629 resp, err := r.runtimeClient.Status(ctx, &runtimeapi.StatusRequest{
630 Verbose: verbose,
631 })
632 if err != nil {
633 klog.ErrorS(err, "Status from runtime service failed")
634 return nil, err
635 }
636
637 klog.V(10).InfoS("[RemoteRuntimeService] Status Response", "status", resp.Status)
638
639 if resp.Status == nil || len(resp.Status.Conditions) < 2 {
640 errorMessage := "RuntimeReady or NetworkReady condition are not set"
641 err := errors.New(errorMessage)
642 klog.ErrorS(err, "Status failed")
643 return nil, err
644 }
645
646 return resp, nil
647 }
648
649
650 func (r *remoteRuntimeService) ContainerStats(ctx context.Context, containerID string) (*runtimeapi.ContainerStats, error) {
651 klog.V(10).InfoS("[RemoteRuntimeService] ContainerStats", "containerID", containerID, "timeout", r.timeout)
652 ctx, cancel := context.WithTimeout(ctx, r.timeout)
653 defer cancel()
654
655 return r.containerStatsV1(ctx, containerID)
656 }
657
658 func (r *remoteRuntimeService) containerStatsV1(ctx context.Context, containerID string) (*runtimeapi.ContainerStats, error) {
659 resp, err := r.runtimeClient.ContainerStats(ctx, &runtimeapi.ContainerStatsRequest{
660 ContainerId: containerID,
661 })
662 if err != nil {
663 if r.logReduction.ShouldMessageBePrinted(err.Error(), containerID) {
664 klog.ErrorS(err, "ContainerStats from runtime service failed", "containerID", containerID)
665 }
666 return nil, err
667 }
668 r.logReduction.ClearID(containerID)
669 klog.V(10).InfoS("[RemoteRuntimeService] ContainerStats Response", "containerID", containerID, "stats", resp.GetStats())
670
671 return resp.GetStats(), nil
672 }
673
674
675 func (r *remoteRuntimeService) ListContainerStats(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
676 klog.V(10).InfoS("[RemoteRuntimeService] ListContainerStats", "filter", filter)
677 ctx, cancel := context.WithTimeout(ctx, r.timeout)
678 defer cancel()
679
680 return r.listContainerStatsV1(ctx, filter)
681 }
682
683 func (r *remoteRuntimeService) listContainerStatsV1(ctx context.Context, filter *runtimeapi.ContainerStatsFilter) ([]*runtimeapi.ContainerStats, error) {
684 resp, err := r.runtimeClient.ListContainerStats(ctx, &runtimeapi.ListContainerStatsRequest{
685 Filter: filter,
686 })
687 if err != nil {
688 klog.ErrorS(err, "ListContainerStats with filter from runtime service failed", "filter", filter)
689 return nil, err
690 }
691 klog.V(10).InfoS("[RemoteRuntimeService] ListContainerStats Response", "filter", filter, "stats", resp.GetStats())
692
693 return resp.GetStats(), nil
694 }
695
696
697 func (r *remoteRuntimeService) PodSandboxStats(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) {
698 klog.V(10).InfoS("[RemoteRuntimeService] PodSandboxStats", "podSandboxID", podSandboxID, "timeout", r.timeout)
699 ctx, cancel := context.WithTimeout(ctx, r.timeout)
700 defer cancel()
701
702 return r.podSandboxStatsV1(ctx, podSandboxID)
703 }
704
705 func (r *remoteRuntimeService) podSandboxStatsV1(ctx context.Context, podSandboxID string) (*runtimeapi.PodSandboxStats, error) {
706 resp, err := r.runtimeClient.PodSandboxStats(ctx, &runtimeapi.PodSandboxStatsRequest{
707 PodSandboxId: podSandboxID,
708 })
709 if err != nil {
710 if r.logReduction.ShouldMessageBePrinted(err.Error(), podSandboxID) {
711 klog.ErrorS(err, "PodSandbox from runtime service failed", "podSandboxID", podSandboxID)
712 }
713 return nil, err
714 }
715 r.logReduction.ClearID(podSandboxID)
716 klog.V(10).InfoS("[RemoteRuntimeService] PodSandbox Response", "podSandboxID", podSandboxID, "stats", resp.GetStats())
717
718 return resp.GetStats(), nil
719 }
720
721
722 func (r *remoteRuntimeService) ListPodSandboxStats(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) {
723 klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandboxStats", "filter", filter)
724
725 ctx, cancel := context.WithTimeout(ctx, r.timeout)
726 defer cancel()
727
728 return r.listPodSandboxStatsV1(ctx, filter)
729 }
730
731 func (r *remoteRuntimeService) listPodSandboxStatsV1(ctx context.Context, filter *runtimeapi.PodSandboxStatsFilter) ([]*runtimeapi.PodSandboxStats, error) {
732 resp, err := r.runtimeClient.ListPodSandboxStats(ctx, &runtimeapi.ListPodSandboxStatsRequest{
733 Filter: filter,
734 })
735 if err != nil {
736 klog.ErrorS(err, "ListPodSandboxStats with filter from runtime service failed", "filter", filter)
737 return nil, err
738 }
739 klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandboxStats Response", "filter", filter, "stats", resp.GetStats())
740
741 return resp.GetStats(), nil
742 }
743
744
745 func (r *remoteRuntimeService) ReopenContainerLog(ctx context.Context, containerID string) (err error) {
746 klog.V(10).InfoS("[RemoteRuntimeService] ReopenContainerLog", "containerID", containerID, "timeout", r.timeout)
747 ctx, cancel := context.WithTimeout(ctx, r.timeout)
748 defer cancel()
749
750 if _, err := r.runtimeClient.ReopenContainerLog(ctx, &runtimeapi.ReopenContainerLogRequest{ContainerId: containerID}); err != nil {
751 klog.ErrorS(err, "ReopenContainerLog from runtime service failed", "containerID", containerID)
752 return err
753 }
754
755 klog.V(10).InfoS("[RemoteRuntimeService] ReopenContainerLog Response", "containerID", containerID)
756 return nil
757 }
758
759
760 func (r *remoteRuntimeService) CheckpointContainer(ctx context.Context, options *runtimeapi.CheckpointContainerRequest) error {
761 klog.V(10).InfoS(
762 "[RemoteRuntimeService] CheckpointContainer",
763 "options",
764 options,
765 )
766 if options == nil {
767 return errors.New("CheckpointContainer requires non-nil CheckpointRestoreOptions parameter")
768 }
769 if options.Timeout < 0 {
770 return errors.New("CheckpointContainer requires the timeout value to be > 0")
771 }
772
773 ctx, cancel := func(ctx context.Context) (context.Context, context.CancelFunc) {
774 defaultTimeout := int64(r.timeout / time.Second)
775 if options.Timeout > defaultTimeout {
776
777
778 return context.WithTimeout(ctx, time.Duration(options.Timeout)*time.Second)
779 }
780
781
782 options.Timeout = defaultTimeout
783 return context.WithTimeout(ctx, r.timeout)
784 }(ctx)
785 defer cancel()
786
787 _, err := r.runtimeClient.CheckpointContainer(
788 ctx,
789 options,
790 )
791
792 if err != nil {
793 klog.ErrorS(
794 err,
795 "CheckpointContainer from runtime service failed",
796 "containerID",
797 options.ContainerId,
798 )
799 return err
800 }
801 klog.V(10).InfoS(
802 "[RemoteRuntimeService] CheckpointContainer Response",
803 "containerID",
804 options.ContainerId,
805 )
806
807 return nil
808 }
809
810 func (r *remoteRuntimeService) GetContainerEvents(containerEventsCh chan *runtimeapi.ContainerEventResponse) error {
811 containerEventsStreamingClient, err := r.runtimeClient.GetContainerEvents(context.Background(), &runtimeapi.GetEventsRequest{})
812 if err != nil {
813 klog.ErrorS(err, "GetContainerEvents failed to get streaming client")
814 return err
815 }
816
817
818 metrics.EventedPLEGConn.Inc()
819
820 for {
821 resp, err := containerEventsStreamingClient.Recv()
822 if err == io.EOF {
823 klog.ErrorS(err, "container events stream is closed")
824 return err
825 }
826 if err != nil {
827 klog.ErrorS(err, "failed to receive streaming container event")
828 return err
829 }
830 if resp != nil {
831 containerEventsCh <- resp
832 klog.V(4).InfoS("container event received", "resp", resp)
833 }
834 }
835 }
836
837
838 func (r *remoteRuntimeService) ListMetricDescriptors(ctx context.Context) ([]*runtimeapi.MetricDescriptor, error) {
839 ctx, cancel := context.WithTimeout(ctx, r.timeout)
840 defer cancel()
841
842 resp, err := r.runtimeClient.ListMetricDescriptors(ctx, &runtimeapi.ListMetricDescriptorsRequest{})
843 if err != nil {
844 klog.ErrorS(err, "ListMetricDescriptors from runtime service failed")
845 return nil, err
846 }
847 klog.V(10).InfoS("[RemoteRuntimeService] ListMetricDescriptors Response", "stats", resp.GetDescriptors())
848
849 return resp.GetDescriptors(), nil
850 }
851
852
853 func (r *remoteRuntimeService) ListPodSandboxMetrics(ctx context.Context) ([]*runtimeapi.PodSandboxMetrics, error) {
854 ctx, cancel := context.WithTimeout(ctx, r.timeout)
855 defer cancel()
856
857 resp, err := r.runtimeClient.ListPodSandboxMetrics(ctx, &runtimeapi.ListPodSandboxMetricsRequest{})
858 if err != nil {
859 klog.ErrorS(err, "ListPodSandboxMetrics from runtime service failed")
860 return nil, err
861 }
862 klog.V(10).InfoS("[RemoteRuntimeService] ListPodSandboxMetrics Response", "stats", resp.GetPodMetrics())
863
864 return resp.GetPodMetrics(), nil
865 }
866
867
868 func (r *remoteRuntimeService) RuntimeConfig(ctx context.Context) (*runtimeapi.RuntimeConfigResponse, error) {
869 ctx, cancel := context.WithTimeout(ctx, r.timeout)
870 defer cancel()
871
872 resp, err := r.runtimeClient.RuntimeConfig(ctx, &runtimeapi.RuntimeConfigRequest{})
873 if err != nil {
874 klog.ErrorS(err, "RuntimeConfig from runtime service failed")
875 return nil, err
876 }
877 klog.V(10).InfoS("[RemoteRuntimeService] RuntimeConfigResponse", "linuxConfig", resp.GetLinux())
878
879 return resp, nil
880 }
881
View as plain text