1
16
17 package e2enode
18
19 import (
20 "archive/tar"
21 "context"
22 "encoding/json"
23 "fmt"
24 "io"
25 "net/http"
26 "os"
27 "strings"
28 "time"
29
30 "github.com/onsi/ginkgo/v2"
31 v1 "k8s.io/api/core/v1"
32 apierrors "k8s.io/apimachinery/pkg/api/errors"
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 clientset "k8s.io/client-go/kubernetes"
35 restclient "k8s.io/client-go/rest"
36 "k8s.io/kubernetes/test/e2e/framework"
37 e2emetrics "k8s.io/kubernetes/test/e2e/framework/metrics"
38 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
39 "k8s.io/kubernetes/test/e2e/nodefeature"
40 testutils "k8s.io/kubernetes/test/utils"
41 imageutils "k8s.io/kubernetes/test/utils/image"
42 admissionapi "k8s.io/pod-security-admission/api"
43
44 "github.com/onsi/gomega"
45 )
46
47 const (
48
49 proxyTimeout = 2 * time.Minute
50 )
51
52 type checkpointResult struct {
53 Items []string `json:"items"`
54 }
55
56
57 func proxyPostRequest(ctx context.Context, c clientset.Interface, node, endpoint string, port int) (restclient.Result, error) {
58
59 var result restclient.Result
60 finished := make(chan struct{}, 1)
61 go func() {
62 result = c.CoreV1().RESTClient().Post().
63 Resource("nodes").
64 SubResource("proxy").
65 Name(fmt.Sprintf("%v:%v", node, port)).
66 Suffix(endpoint).
67 Do(ctx)
68
69 finished <- struct{}{}
70 }()
71 select {
72 case <-finished:
73 return result, nil
74 case <-ctx.Done():
75 return restclient.Result{}, nil
76 case <-time.After(proxyTimeout):
77 return restclient.Result{}, nil
78 }
79 }
80
// getCheckpointContainerMetric returns the kubelet_runtime_operations_total
// sample with operation_type "checkpoint_container" from the kubelet on the
// node running pod. A missing metric family or series is reported as 0.
func getCheckpointContainerMetric(ctx context.Context, f *framework.Framework, pod *v1.Pod) (int, error) {
	framework.Logf("Getting 'checkpoint_container' metrics from %q", pod.Spec.NodeName)
	return getCheckpointOperationCount(ctx, f, pod, "runtime_operations_total")
}

// getCheckpointContainerErrorMetric returns the
// kubelet_runtime_operations_errors_total sample with operation_type
// "checkpoint_container" from the kubelet on the node running pod. A missing
// metric family or series is reported as 0.
func getCheckpointContainerErrorMetric(ctx context.Context, f *framework.Framework, pod *v1.Pod) (int, error) {
	framework.Logf("Getting 'checkpoint_container' error metrics from %q", pod.Spec.NodeName)
	return getCheckpointOperationCount(ctx, f, pod, "runtime_operations_errors_total")
}

// getCheckpointOperationCount fetches the kubelet metrics of pod's node and
// returns, as an int, the first sample of metric whose operation_type label is
// "checkpoint_container".
//
// metric is the key used in the map returned by e2emetrics.GetKubeletMetrics;
// that key omits the "kubelet_" prefix which the sample's __name__ label is
// matched against. If the metric family or the series is absent, it returns
// 0 and a nil error.
func getCheckpointOperationCount(ctx context.Context, f *framework.Framework, pod *v1.Pod, metric string) (int, error) {
	ms, err := e2emetrics.GetKubeletMetrics(ctx, f.ClientSet, pod.Spec.NodeName)
	if err != nil {
		return 0, fmt.Errorf("getting kubelet metrics from node %q: %w", pod.Spec.NodeName, err)
	}

	samples, ok := ms[metric]
	if !ok {
		// The metric family has not been reported; treat it as zero.
		return 0, nil
	}

	fullName := "kubelet_" + metric
	for _, sample := range samples {
		if string(sample.Metric["__name__"]) == fullName && sample.Metric["operation_type"] == "checkpoint_container" {
			return int(sample.Value), nil
		}
	}
	return 0, nil
}
132
133 var _ = SIGDescribe("Checkpoint Container", nodefeature.CheckpointContainer, func() {
134 f := framework.NewDefaultFramework("checkpoint-container-test")
135 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
136 ginkgo.It("will checkpoint a container out of a pod", func(ctx context.Context) {
137 ginkgo.By("creating a target pod")
138 podClient := e2epod.NewPodClient(f)
139 pod := podClient.CreateSync(ctx, &v1.Pod{
140 ObjectMeta: metav1.ObjectMeta{
141 Name: "checkpoint-container-pod",
142 Namespace: f.Namespace.Name,
143 },
144 Spec: v1.PodSpec{
145 Containers: []v1.Container{
146 {
147 Name: "test-container-1",
148 Image: imageutils.GetE2EImage(imageutils.BusyBox),
149 Command: []string{"/bin/sleep"},
150 Args: []string{"10000"},
151 },
152 },
153 },
154 })
155
156 p, err := podClient.Get(
157 ctx,
158 pod.Name,
159 metav1.GetOptions{},
160 )
161
162 framework.ExpectNoError(err)
163 isReady, err := testutils.PodRunningReady(p)
164 framework.ExpectNoError(err)
165 if !isReady {
166 framework.Failf("pod %q should be ready", p.Name)
167 }
168
169
170 checkpointContainerMetric, err := getCheckpointContainerMetric(ctx, f, pod)
171 framework.ExpectNoError(err)
172 gomega.Expect(checkpointContainerMetric).To(gomega.Equal(0))
173
174 checkpointContainerErrorMetric, err := getCheckpointContainerErrorMetric(ctx, f, pod)
175 framework.ExpectNoError(err)
176 gomega.Expect(checkpointContainerErrorMetric).To(gomega.Equal(0))
177
178 framework.Logf(
179 "About to checkpoint container %q on %q",
180 pod.Spec.Containers[0].Name,
181 pod.Spec.NodeName,
182 )
183 result, err := proxyPostRequest(
184 ctx,
185 f.ClientSet,
186 pod.Spec.NodeName,
187 fmt.Sprintf(
188 "checkpoint/%s/%s/%s",
189 f.Namespace.Name,
190 pod.Name,
191 pod.Spec.Containers[0].Name,
192 ),
193 framework.KubeletPort,
194 )
195
196 framework.ExpectNoError(err)
197
198 err = result.Error()
199 if err != nil {
200 statusError, ok := err.(*apierrors.StatusError)
201 if !ok {
202 framework.Failf("got error %#v, expected StatusError", err)
203 }
204
205
206 if (int(statusError.ErrStatus.Code)) == http.StatusNotFound {
207 ginkgo.Skip("Feature 'ContainerCheckpoint' is not enabled and not available")
208 return
209 }
210
211
212
213
214
215
216
217
218
219
220 if (int(statusError.ErrStatus.Code)) == http.StatusInternalServerError {
221 if strings.Contains(
222 statusError.ErrStatus.Message,
223 "(rpc error: code = Unimplemented desc = unknown method CheckpointContainer",
224 ) {
225 ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
226 return
227 }
228 if strings.Contains(
229 statusError.ErrStatus.Message,
230 "(rpc error: code = Unimplemented desc = method CheckpointContainer not implemented)",
231 ) {
232 ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
233 return
234 }
235 if strings.Contains(
236 statusError.ErrStatus.Message,
237 "(rpc error: code = Unknown desc = checkpoint/restore support not available)",
238 ) {
239 ginkgo.Skip("Container engine does not implement 'CheckpointContainer'")
240 return
241 }
242 }
243 framework.Failf(
244 "Unexpected status code (%d) during 'CheckpointContainer': %q",
245 statusError.ErrStatus.Code,
246 statusError.ErrStatus.Message,
247 )
248 }
249
250 framework.ExpectNoError(err)
251
252
253
254
255 raw, err := result.Raw()
256 framework.ExpectNoError(err)
257 answer := checkpointResult{}
258 err = json.Unmarshal(raw, &answer)
259 framework.ExpectNoError(err)
260
261 for _, item := range answer.Items {
262
263 _, err := os.Stat(item)
264 framework.ExpectNoError(err)
265
266
267
268
269
270
271
272 checkForFiles := map[string]bool{
273 "spec.dump": false,
274 "config.dump": false,
275 "checkpoint/inventory.img": false,
276 }
277 fileReader, err := os.Open(item)
278 framework.ExpectNoError(err)
279 tr := tar.NewReader(fileReader)
280 for {
281 hdr, err := tr.Next()
282 if err == io.EOF {
283
284 break
285 }
286 framework.ExpectNoError(err)
287 if _, key := checkForFiles[hdr.Name]; key {
288 checkForFiles[hdr.Name] = true
289 }
290 }
291 for fileName := range checkForFiles {
292 if !checkForFiles[fileName] {
293 framework.Failf("File %q not found in checkpoint archive %q", fileName, item)
294 }
295 }
296
297 os.RemoveAll(item)
298 }
299
300 checkpointContainerMetric, err = getCheckpointContainerMetric(ctx, f, pod)
301 framework.ExpectNoError(err)
302 gomega.Expect(checkpointContainerMetric).To(gomega.Equal(1))
303
304 checkpointContainerErrorMetric, err = getCheckpointContainerErrorMetric(ctx, f, pod)
305 framework.ExpectNoError(err)
306 gomega.Expect(checkpointContainerErrorMetric).To(gomega.Equal(0))
307 })
308 })
309
View as plain text