...

Source file src/k8s.io/kubernetes/test/e2e_node/checkpoint_container.go

Documentation: k8s.io/kubernetes/test/e2e_node

     1  /*
     2  Copyright 2022 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package e2enode
    18  
    19  import (
    20  	"archive/tar"
    21  	"context"
    22  	"encoding/json"
    23  	"fmt"
    24  	"io"
    25  	"net/http"
    26  	"os"
    27  	"strings"
    28  	"time"
    29  
    30  	"github.com/onsi/ginkgo/v2"
    31  	v1 "k8s.io/api/core/v1"
    32  	apierrors "k8s.io/apimachinery/pkg/api/errors"
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	clientset "k8s.io/client-go/kubernetes"
    35  	restclient "k8s.io/client-go/rest"
    36  	"k8s.io/kubernetes/test/e2e/framework"
    37  	e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
    38  	e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
    39  	"k8s.io/kubernetes/test/e2e/nodefeature"
    40  	testutils "k8s.io/kubernetes/test/utils"
    41  	imageutils "k8s.io/kubernetes/test/utils/image"
    42  	admissionapi "k8s.io/pod-security-admission/api"
    43  
    44  	"github.com/onsi/gomega"
    45  )
    46  
    47  const (
    48  	// timeout for proxy requests.
    49  	proxyTimeout = 2 * time.Minute
    50  )
    51  
    52  type checkpointResult struct {
    53  	Items []string `json:"items"`
    54  }
    55  
    56  // proxyPostRequest performs a post on a node proxy endpoint given the nodename and rest client.
    57  func proxyPostRequest(ctx context.Context, c clientset.Interface, node, endpoint string, port int) (restclient.Result, error) {
    58  	// proxy tends to hang in some cases when Node is not ready. Add an artificial timeout for this call. #22165
    59  	var result restclient.Result
    60  	finished := make(chan struct{}, 1)
    61  	go func() {
    62  		result = c.CoreV1().RESTClient().Post().
    63  			Resource("nodes").
    64  			SubResource("proxy").
    65  			Name(fmt.Sprintf("%v:%v", node, port)).
    66  			Suffix(endpoint).
    67  			Do(ctx)
    68  
    69  		finished <- struct{}{}
    70  	}()
    71  	select {
    72  	case <-finished:
    73  		return result, nil
    74  	case <-ctx.Done():
    75  		return restclient.Result{}, nil
    76  	case <-time.After(proxyTimeout):
    77  		return restclient.Result{}, nil
    78  	}
    79  }
    80  
    81  func getCheckpointContainerMetric(ctx context.Context, f *framework.Framework, pod *v1.Pod) (int, error) {
    82  	framework.Logf("Getting 'checkpoint_container' metrics from %q", pod.Spec.NodeName)
    83  	ms, err := e2emetrics.GetKubeletMetrics(
    84  		ctx,
    85  		f.ClientSet,
    86  		pod.Spec.NodeName,
    87  	)
    88  	if err != nil {
    89  		return 0, err
    90  	}
    91  
    92  	runtimeOperationsTotal, ok := ms["runtime_operations_total"]
    93  	if !ok {
    94  		// If the metric was not found it was probably not written to, yet.
    95  		return 0, nil
    96  	}
    97  
    98  	for _, item := range runtimeOperationsTotal {
    99  		if item.Metric["__name__"] == "kubelet_runtime_operations_total" && item.Metric["operation_type"] == "checkpoint_container" {
   100  			return int(item.Value), nil
   101  		}
   102  	}
   103  	// If the metric was not found it was probably not written to, yet.
   104  	return 0, nil
   105  }
   106  
   107  func getCheckpointContainerErrorMetric(ctx context.Context, f *framework.Framework, pod *v1.Pod) (int, error) {
   108  	framework.Logf("Getting 'checkpoint_container' error metrics from %q", pod.Spec.NodeName)
   109  	ms, err := e2emetrics.GetKubeletMetrics(
   110  		ctx,
   111  		f.ClientSet,
   112  		pod.Spec.NodeName,
   113  	)
   114  	if err != nil {
   115  		return 0, err
   116  	}
   117  
   118  	runtimeOperationsErrorsTotal, ok := ms["runtime_operations_errors_total"]
   119  	if !ok {
   120  		// If the metric was not found it was probably not written to, yet.
   121  		return 0, nil
   122  	}
   123  
   124  	for _, item := range runtimeOperationsErrorsTotal {
   125  		if item.Metric["__name__"] == "kubelet_runtime_operations_errors_total" && item.Metric["operation_type"] == "checkpoint_container" {
   126  			return int(item.Value), nil
   127  		}
   128  	}
   129  	// If the metric was not found it was probably not written to, yet.
   130  	return 0, nil
   131  }
   132  
   133  var _ = SIGDescribe("Checkpoint Container", nodefeature.CheckpointContainer, func() {
   134  	f := framework.NewDefaultFramework("checkpoint-container-test")
   135  	f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
   136  	ginkgo.It("will checkpoint a container out of a pod", func(ctx context.Context) {
   137  		ginkgo.By("creating a target pod")
   138  		podClient := e2epod.NewPodClient(f)
   139  		pod := podClient.CreateSync(ctx, &v1.Pod{
   140  			ObjectMeta: metav1.ObjectMeta{
   141  				Name:      "checkpoint-container-pod",
   142  				Namespace: f.Namespace.Name,
   143  			},
   144  			Spec: v1.PodSpec{
   145  				Containers: []v1.Container{
   146  					{
   147  						Name:    "test-container-1",
   148  						Image:   imageutils.GetE2EImage(imageutils.BusyBox),
   149  						Command: []string{"/bin/sleep"},
   150  						Args:    []string{"10000"},
   151  					},
   152  				},
   153  			},
   154  		})
   155  
   156  		p, err := podClient.Get(
   157  			ctx,
   158  			pod.Name,
   159  			metav1.GetOptions{},
   160  		)
   161  
   162  		framework.ExpectNoError(err)
   163  		isReady, err := testutils.PodRunningReady(p)
   164  		framework.ExpectNoError(err)
   165  		if !isReady {
   166  			framework.Failf("pod %q should be ready", p.Name)
   167  		}
   168  
   169  		// No checkpoint operation should have been logged
   170  		checkpointContainerMetric, err := getCheckpointContainerMetric(ctx, f, pod)
   171  		framework.ExpectNoError(err)
   172  		gomega.Expect(checkpointContainerMetric).To(gomega.Equal(0))
   173  		// No error should have been logged
   174  		checkpointContainerErrorMetric, err := getCheckpointContainerErrorMetric(ctx, f, pod)
   175  		framework.ExpectNoError(err)
   176  		gomega.Expect(checkpointContainerErrorMetric).To(gomega.Equal(0))
   177  
   178  		framework.Logf(
   179  			"About to checkpoint container %q on %q",
   180  			pod.Spec.Containers[0].Name,
   181  			pod.Spec.NodeName,
   182  		)
   183  		result, err := proxyPostRequest(
   184  			ctx,
   185  			f.ClientSet,
   186  			pod.Spec.NodeName,
   187  			fmt.Sprintf(
   188  				"checkpoint/%s/%s/%s",
   189  				f.Namespace.Name,
   190  				pod.Name,
   191  				pod.Spec.Containers[0].Name,
   192  			),
   193  			framework.KubeletPort,
   194  		)
   195  
   196  		framework.ExpectNoError(err)
   197  
   198  		err = result.Error()
   199  		if err != nil {
   200  			statusError, ok := err.(*apierrors.StatusError)
   201  			if !ok {
   202  				framework.Failf("got error %#v, expected StatusError", err)
   203  			}
   204  			// If we are testing against a kubelet with ContainerCheckpoint == false
   205  			// we should get a 404. So a 404 is (also) a good sign.
   206  			if (int(statusError.ErrStatus.Code)) == http.StatusNotFound {
   207  				ginkgo.Skip("Feature 'ContainerCheckpoint' is not enabled and not available")
   208  				return
   209  			}
   210  
   211  			// If the container engine has not implemented the Checkpoint CRI API
   212  			// we will get 500 and a message with
   213  			// '(rpc error: code = Unimplemented desc = unknown method CheckpointContainer'
   214  			// or
   215  			// '(rpc error: code = Unimplemented desc = method CheckpointContainer not implemented)'
   216  			// if the container engine returns that it explicitly has disabled support for it.
   217  			// or
   218  			// '(rpc error: code = Unknown desc = checkpoint/restore support not available)'
   219  			// if the container engine explicitly disabled the checkpoint/restore support
   220  			if (int(statusError.ErrStatus.Code)) == http.StatusInternalServerError {
   221  				if strings.Contains(
   222  					statusError.ErrStatus.Message,
   223  					"(rpc error: code = Unimplemented desc = unknown method CheckpointContainer",
   224  				) {
   225  					ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
   226  					return
   227  				}
   228  				if strings.Contains(
   229  					statusError.ErrStatus.Message,
   230  					"(rpc error: code = Unimplemented desc = method CheckpointContainer not implemented)",
   231  				) {
   232  					ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
   233  					return
   234  				}
   235  				if strings.Contains(
   236  					statusError.ErrStatus.Message,
   237  					"(rpc error: code = Unknown desc = checkpoint/restore support not available)",
   238  				) {
   239  					ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
   240  					return
   241  				}
   242  			}
   243  			framework.Failf(
   244  				"Unexpected status code (%d) during 'CheckpointContainer': %q",
   245  				statusError.ErrStatus.Code,
   246  				statusError.ErrStatus.Message,
   247  			)
   248  		}
   249  
   250  		framework.ExpectNoError(err)
   251  
   252  		// Checkpointing actually worked. Verify that the checkpoint exists and that
   253  		// it is a checkpoint.
   254  
   255  		raw, err := result.Raw()
   256  		framework.ExpectNoError(err)
   257  		answer := checkpointResult{}
   258  		err = json.Unmarshal(raw, &answer)
   259  		framework.ExpectNoError(err)
   260  
   261  		for _, item := range answer.Items {
   262  			// Check that the file exists
   263  			_, err := os.Stat(item)
   264  			framework.ExpectNoError(err)
   265  			// Check the content of the tar file
   266  			// At least looking for the following files
   267  			//  * spec.dump
   268  			//  * config.dump
   269  			//  * checkpoint/inventory.img
   270  			// If these files exist in the checkpoint archive it is
   271  			// probably a complete checkpoint.
   272  			checkForFiles := map[string]bool{
   273  				"spec.dump":                false,
   274  				"config.dump":              false,
   275  				"checkpoint/inventory.img": false,
   276  			}
   277  			fileReader, err := os.Open(item)
   278  			framework.ExpectNoError(err)
   279  			tr := tar.NewReader(fileReader)
   280  			for {
   281  				hdr, err := tr.Next()
   282  				if err == io.EOF {
   283  					// End of archive
   284  					break
   285  				}
   286  				framework.ExpectNoError(err)
   287  				if _, key := checkForFiles[hdr.Name]; key {
   288  					checkForFiles[hdr.Name] = true
   289  				}
   290  			}
   291  			for fileName := range checkForFiles {
   292  				if !checkForFiles[fileName] {
   293  					framework.Failf("File %q not found in checkpoint archive %q", fileName, item)
   294  				}
   295  			}
   296  			// cleanup checkpoint archive
   297  			os.RemoveAll(item)
   298  		}
   299  		// Exactly one checkpoint operation should have happened
   300  		checkpointContainerMetric, err = getCheckpointContainerMetric(ctx, f, pod)
   301  		framework.ExpectNoError(err)
   302  		gomega.Expect(checkpointContainerMetric).To(gomega.Equal(1))
   303  		// No error should have been logged
   304  		checkpointContainerErrorMetric, err = getCheckpointContainerErrorMetric(ctx, f, pod)
   305  		framework.ExpectNoError(err)
   306  		gomega.Expect(checkpointContainerErrorMetric).To(gomega.Equal(0))
   307  	})
   308  })
   309  

View as plain text