1
16
17 package pod
18
19 import (
20 "context"
21 "fmt"
22 "os"
23 "path/filepath"
24 "strconv"
25 "strings"
26 "time"
27
28 "github.com/onsi/ginkgo/v2"
29 "github.com/onsi/gomega"
30 v1 "k8s.io/api/core/v1"
31 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
32 "k8s.io/apimachinery/pkg/labels"
33 clientset "k8s.io/client-go/kubernetes"
34 "k8s.io/klog/v2"
35
36 "k8s.io/kubernetes/test/e2e/framework"
37 testutils "k8s.io/kubernetes/test/utils"
38 imageutils "k8s.io/kubernetes/test/utils/image"
39 )
40
41
42
43
const (
	// LabelLogOnPodFailure is the pod label key that logPodLogs looks for:
	// when at least one pod in the list carries this label, only the labeled
	// pods have their container logs written to the report directory.
	LabelLogOnPodFailure = "log-on-pod-failure"
)
45
46
47
48 func expectNoError(err error, explain ...interface{}) {
49 expectNoErrorWithOffset(1, err, explain...)
50 }
51
52
53
54
55 func expectNoErrorWithOffset(offset int, err error, explain ...interface{}) {
56 if err != nil {
57 framework.Logf("Unexpected error occurred: %v", err)
58 }
59 gomega.ExpectWithOffset(1+offset, err).NotTo(gomega.HaveOccurred(), explain...)
60 }
61
62
63 func PodsCreated(ctx context.Context, c clientset.Interface, ns, name string, replicas int32) (*v1.PodList, error) {
64 label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
65 return PodsCreatedByLabel(ctx, c, ns, name, replicas, label)
66 }
67
68
69 func PodsCreatedByLabel(ctx context.Context, c clientset.Interface, ns, name string, replicas int32, label labels.Selector) (*v1.PodList, error) {
70 timeout := 2 * time.Minute
71 for start := time.Now(); time.Since(start) < timeout; time.Sleep(5 * time.Second) {
72 options := metav1.ListOptions{LabelSelector: label.String()}
73
74
75 pods, err := c.CoreV1().Pods(ns).List(ctx, options)
76 if err != nil {
77 return nil, err
78 }
79
80 created := []v1.Pod{}
81 for _, pod := range pods.Items {
82 if pod.DeletionTimestamp != nil {
83 continue
84 }
85 created = append(created, pod)
86 }
87 framework.Logf("Pod name %s: Found %d pods out of %d", name, len(created), replicas)
88
89 if int32(len(created)) == replicas {
90 pods.Items = created
91 return pods, nil
92 }
93 }
94 return nil, fmt.Errorf("Pod name %s: Gave up waiting %v for %d pods to come up", name, timeout, replicas)
95 }
96
97
98 func VerifyPods(ctx context.Context, c clientset.Interface, ns, name string, wantName bool, replicas int32) error {
99 return podRunningMaybeResponding(ctx, c, ns, name, wantName, replicas, true)
100 }
101
102
103 func VerifyPodsRunning(ctx context.Context, c clientset.Interface, ns, name string, wantName bool, replicas int32) error {
104 return podRunningMaybeResponding(ctx, c, ns, name, wantName, replicas, false)
105 }
106
107 func podRunningMaybeResponding(ctx context.Context, c clientset.Interface, ns, name string, wantName bool, replicas int32, checkResponding bool) error {
108 pods, err := PodsCreated(ctx, c, ns, name, replicas)
109 if err != nil {
110 return err
111 }
112 e := podsRunning(ctx, c, pods)
113 if len(e) > 0 {
114 return fmt.Errorf("failed to wait for pods running: %v", e)
115 }
116 if checkResponding {
117 return WaitForPodsResponding(ctx, c, ns, name, wantName, podRespondingTimeout, pods)
118 }
119 return nil
120 }
121
122 func podsRunning(ctx context.Context, c clientset.Interface, pods *v1.PodList) []error {
123
124
125 ginkgo.By("ensuring each pod is running")
126 e := []error{}
127 errorChan := make(chan error)
128
129 for _, pod := range pods.Items {
130 go func(p v1.Pod) {
131 errorChan <- WaitForPodRunningInNamespace(ctx, c, &p)
132 }(pod)
133 }
134
135 for range pods.Items {
136 err := <-errorChan
137 if err != nil {
138 e = append(e, err)
139 }
140 }
141
142 return e
143 }
144
145
146 func LogPodStates(pods []v1.Pod) {
147
148 maxPodW, maxNodeW, maxPhaseW, maxGraceW := len("POD"), len("NODE"), len("PHASE"), len("GRACE")
149 for i := range pods {
150 pod := &pods[i]
151 if len(pod.ObjectMeta.Name) > maxPodW {
152 maxPodW = len(pod.ObjectMeta.Name)
153 }
154 if len(pod.Spec.NodeName) > maxNodeW {
155 maxNodeW = len(pod.Spec.NodeName)
156 }
157 if len(pod.Status.Phase) > maxPhaseW {
158 maxPhaseW = len(pod.Status.Phase)
159 }
160 }
161
162 maxPodW++
163 maxNodeW++
164 maxPhaseW++
165 maxGraceW++
166
167
168 framework.Logf("%-[1]*[2]s %-[3]*[4]s %-[5]*[6]s %-[7]*[8]s %[9]s",
169 maxPodW, "POD", maxNodeW, "NODE", maxPhaseW, "PHASE", maxGraceW, "GRACE", "CONDITIONS")
170 for _, pod := range pods {
171 grace := ""
172 if pod.DeletionGracePeriodSeconds != nil {
173 grace = fmt.Sprintf("%ds", *pod.DeletionGracePeriodSeconds)
174 }
175 framework.Logf("%-[1]*[2]s %-[3]*[4]s %-[5]*[6]s %-[7]*[8]s %[9]s",
176 maxPodW, pod.ObjectMeta.Name, maxNodeW, pod.Spec.NodeName, maxPhaseW, pod.Status.Phase, maxGraceW, grace, pod.Status.Conditions)
177 }
178 framework.Logf("")
179 }
180
181
182
183 func logPodTerminationMessages(pods []v1.Pod) {
184 for _, pod := range pods {
185 for _, status := range pod.Status.InitContainerStatuses {
186 if status.LastTerminationState.Terminated != nil && len(status.LastTerminationState.Terminated.Message) > 0 {
187 framework.Logf("%s[%s].initContainer[%s]=%s", pod.Name, pod.Namespace, status.Name, status.LastTerminationState.Terminated.Message)
188 }
189 }
190 for _, status := range pod.Status.ContainerStatuses {
191 if status.LastTerminationState.Terminated != nil && len(status.LastTerminationState.Terminated.Message) > 0 {
192 framework.Logf("%s[%s].container[%s]=%s", pod.Name, pod.Namespace, status.Name, status.LastTerminationState.Terminated.Message)
193 }
194 }
195 }
196 }
197
198
199
200
201
202
203 func logPodLogs(ctx context.Context, c clientset.Interface, namespace string, pods []v1.Pod, reportDir string) {
204 if reportDir == "" {
205 return
206 }
207
208 var logPods []v1.Pod
209 for _, pod := range pods {
210 if _, ok := pod.Labels[LabelLogOnPodFailure]; ok {
211 logPods = append(logPods, pod)
212 }
213 }
214 maxPods := len(logPods)
215
216
217 if maxPods == 0 {
218 logPods = pods
219 maxPods = len(pods)
220 if maxPods > 5 {
221 maxPods = 5
222 }
223 }
224
225 tailLen := 42
226 for i := 0; i < maxPods; i++ {
227 pod := logPods[i]
228 for _, container := range pod.Spec.Containers {
229 logs, err := getPodLogsInternal(ctx, c, namespace, pod.Name, container.Name, false, nil, &tailLen)
230 if err != nil {
231 framework.Logf("Unable to fetch %s/%s/%s logs: %v", pod.Namespace, pod.Name, container.Name, err)
232 continue
233 }
234
235 logDir := filepath.Join(reportDir, namespace, pod.Name, container.Name)
236 err = os.MkdirAll(logDir, 0755)
237 if err != nil {
238 framework.Logf("Unable to create path '%s'. Err: %v", logDir, err)
239 continue
240 }
241
242 logPath := filepath.Join(logDir, "logs.txt")
243 err = os.WriteFile(logPath, []byte(logs), 0644)
244 if err != nil {
245 framework.Logf("Could not write the container logs in: %s. Err: %v", logPath, err)
246 }
247 }
248 }
249 }
250
251
252 func DumpAllPodInfoForNamespace(ctx context.Context, c clientset.Interface, namespace, reportDir string) {
253 pods, err := c.CoreV1().Pods(namespace).List(ctx, metav1.ListOptions{})
254 if err != nil {
255 framework.Logf("unable to fetch pod debug info: %v", err)
256 }
257 LogPodStates(pods.Items)
258 logPodTerminationMessages(pods.Items)
259 logPodLogs(ctx, c, namespace, pods.Items, reportDir)
260 }
261
262
263
264 func FilterNonRestartablePods(pods []*v1.Pod) []*v1.Pod {
265 var results []*v1.Pod
266 for _, p := range pods {
267 if isNotRestartAlwaysMirrorPod(p) {
268
269
270
271
272 continue
273 }
274 results = append(results, p)
275 }
276 return results
277 }
278
279 func isNotRestartAlwaysMirrorPod(p *v1.Pod) bool {
280
281 if _, ok := p.Annotations[v1.MirrorPodAnnotationKey]; !ok {
282 return false
283 }
284 return p.Spec.RestartPolicy != v1.RestartPolicyAlways
285 }
286
287
288
289
290 func NewAgnhostPod(ns, podName string, volumes []v1.Volume, mounts []v1.VolumeMount, ports []v1.ContainerPort, args ...string) *v1.Pod {
291 immediate := int64(0)
292 pod := &v1.Pod{
293 ObjectMeta: metav1.ObjectMeta{
294 Name: podName,
295 Namespace: ns,
296 },
297 Spec: v1.PodSpec{
298 Containers: []v1.Container{
299 NewAgnhostContainer("agnhost-container", mounts, ports, args...),
300 },
301 Volumes: volumes,
302 SecurityContext: &v1.PodSecurityContext{},
303 TerminationGracePeriodSeconds: &immediate,
304 },
305 }
306 return pod
307 }
308
309 func NewAgnhostPodFromContainers(ns, podName string, volumes []v1.Volume, containers ...v1.Container) *v1.Pod {
310 immediate := int64(0)
311 pod := &v1.Pod{
312 ObjectMeta: metav1.ObjectMeta{
313 Name: podName,
314 Namespace: ns,
315 },
316 Spec: v1.PodSpec{
317 Containers: containers[:],
318 Volumes: volumes,
319 SecurityContext: &v1.PodSecurityContext{},
320 TerminationGracePeriodSeconds: &immediate,
321 },
322 }
323 return pod
324 }
325
326
327 func NewAgnhostContainer(containerName string, mounts []v1.VolumeMount, ports []v1.ContainerPort, args ...string) v1.Container {
328 if len(args) == 0 {
329 args = []string{"pause"}
330 }
331 return v1.Container{
332 Name: containerName,
333 Image: imageutils.GetE2EImage(imageutils.Agnhost),
334 Args: args,
335 VolumeMounts: mounts,
336 Ports: ports,
337 SecurityContext: &v1.SecurityContext{},
338 ImagePullPolicy: v1.PullIfNotPresent,
339 }
340 }
341
342
343 func NewExecPodSpec(ns, name string, hostNetwork bool) *v1.Pod {
344 pod := NewAgnhostPod(ns, name, nil, nil, nil)
345 pod.Spec.HostNetwork = hostNetwork
346 return pod
347 }
348
349
350 func newExecPodSpec(ns, generateName string) *v1.Pod {
351
352
353 pod := NewAgnhostPod(ns, "", nil, nil, nil)
354 pod.ObjectMeta.GenerateName = generateName
355 return pod
356 }
357
358
359
360 func CreateExecPodOrFail(ctx context.Context, client clientset.Interface, ns, generateName string, tweak func(*v1.Pod)) *v1.Pod {
361 framework.Logf("Creating new exec pod")
362 pod := newExecPodSpec(ns, generateName)
363 if tweak != nil {
364 tweak(pod)
365 }
366 execPod, err := client.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
367 expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
368 err = WaitForPodNameRunningInNamespace(ctx, client, execPod.Name, execPod.Namespace)
369 expectNoError(err, "failed to create new exec pod in namespace: %s", ns)
370 return execPod
371 }
372
373
374
375
376
377
378
379 func WithWindowsHostProcess(pod *v1.Pod, username string) {
380 if pod.Spec.SecurityContext == nil {
381 pod.Spec.SecurityContext = &v1.PodSecurityContext{}
382 }
383 if pod.Spec.SecurityContext.WindowsOptions == nil {
384 pod.Spec.SecurityContext.WindowsOptions = &v1.WindowsSecurityContextOptions{}
385 }
386
387 trueVar := true
388 if username == "" {
389 username = "NT AUTHORITY\\SYSTEM"
390 }
391 pod.Spec.SecurityContext.WindowsOptions.HostProcess = &trueVar
392 pod.Spec.SecurityContext.WindowsOptions.RunAsUserName = &username
393 }
394
395
396
397
398 func CheckPodsRunningReady(ctx context.Context, c clientset.Interface, ns string, podNames []string, timeout time.Duration) bool {
399 return checkPodsCondition(ctx, c, ns, podNames, timeout, testutils.PodRunningReady, "running and ready")
400 }
401
402
403
404
405 func CheckPodsRunningReadyOrSucceeded(ctx context.Context, c clientset.Interface, ns string, podNames []string, timeout time.Duration) bool {
406 return checkPodsCondition(ctx, c, ns, podNames, timeout, testutils.PodRunningReadyOrSucceeded, "running and ready, or succeeded")
407 }
408
409
410
411 func checkPodsCondition(ctx context.Context, c clientset.Interface, ns string, podNames []string, timeout time.Duration, condition podCondition, desc string) bool {
412 np := len(podNames)
413 framework.Logf("Waiting up to %v for %d pods to be %s: %s", timeout, np, desc, podNames)
414 type waitPodResult struct {
415 success bool
416 podName string
417 }
418 result := make(chan waitPodResult, len(podNames))
419 for _, podName := range podNames {
420
421 go func(name string) {
422 err := WaitForPodCondition(ctx, c, ns, name, desc, timeout, condition)
423 result <- waitPodResult{err == nil, name}
424 }(podName)
425 }
426
427 success := true
428 for range podNames {
429 res := <-result
430 if !res.success {
431 framework.Logf("Pod %[1]s failed to be %[2]s.", res.podName, desc)
432 success = false
433 }
434 }
435 framework.Logf("Wanted all %d pods to be %s. Result: %t. Pods: %v", np, desc, success, podNames)
436 return success
437 }
438
439
440 func GetPodLogs(ctx context.Context, c clientset.Interface, namespace, podName, containerName string) (string, error) {
441 return getPodLogsInternal(ctx, c, namespace, podName, containerName, false, nil, nil)
442 }
443
444
445 func GetPodLogsSince(ctx context.Context, c clientset.Interface, namespace, podName, containerName string, since time.Time) (string, error) {
446 sinceTime := metav1.NewTime(since)
447 return getPodLogsInternal(ctx, c, namespace, podName, containerName, false, &sinceTime, nil)
448 }
449
450
451
452 func GetPreviousPodLogs(ctx context.Context, c clientset.Interface, namespace, podName, containerName string) (string, error) {
453 return getPodLogsInternal(ctx, c, namespace, podName, containerName, true, nil, nil)
454 }
455
456
457 func getPodLogsInternal(ctx context.Context, c clientset.Interface, namespace, podName, containerName string, previous bool, sinceTime *metav1.Time, tailLines *int) (string, error) {
458 request := c.CoreV1().RESTClient().Get().
459 Resource("pods").
460 Namespace(namespace).
461 Name(podName).SubResource("log").
462 Param("container", containerName).
463 Param("previous", strconv.FormatBool(previous))
464 if sinceTime != nil {
465 request.Param("sinceTime", sinceTime.Format(time.RFC3339))
466 }
467 if tailLines != nil {
468 request.Param("tailLines", strconv.Itoa(*tailLines))
469 }
470 logs, err := request.Do(ctx).Raw()
471 if err != nil {
472 return "", err
473 }
474 if strings.Contains(string(logs), "Internal Error") {
475 return "", fmt.Errorf("Fetched log contains \"Internal Error\": %q", string(logs))
476 }
477 return string(logs), err
478 }
479
480
481 func GetPodsInNamespace(ctx context.Context, c clientset.Interface, ns string, ignoreLabels map[string]string) ([]*v1.Pod, error) {
482 pods, err := c.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
483 if err != nil {
484 return []*v1.Pod{}, err
485 }
486 ignoreSelector := labels.SelectorFromSet(ignoreLabels)
487 var filtered []*v1.Pod
488 for i := range pods.Items {
489 p := pods.Items[i]
490 if len(ignoreLabels) != 0 && ignoreSelector.Matches(labels.Set(p.Labels)) {
491 continue
492 }
493 filtered = append(filtered, &p)
494 }
495 return filtered, nil
496 }
497
498
499 func GetPods(ctx context.Context, c clientset.Interface, ns string, matchLabels map[string]string) ([]v1.Pod, error) {
500 label := labels.SelectorFromSet(matchLabels)
501 listOpts := metav1.ListOptions{LabelSelector: label.String()}
502 pods, err := c.CoreV1().Pods(ns).List(ctx, listOpts)
503 if err != nil {
504 return []v1.Pod{}, err
505 }
506 return pods.Items, nil
507 }
508
509
510 func GetPodSecretUpdateTimeout(ctx context.Context, c clientset.Interface) time.Duration {
511
512
513
514
515
516 secretTTL, err := getNodeTTLAnnotationValue(ctx, c)
517 if err != nil {
518 framework.Logf("Couldn't get node TTL annotation (using default value of 0): %v", err)
519 }
520 podLogTimeout := 240*time.Second + secretTTL
521 return podLogTimeout
522 }
523
524
525 func VerifyPodHasConditionWithType(ctx context.Context, f *framework.Framework, pod *v1.Pod, cType v1.PodConditionType) {
526 pod, err := f.ClientSet.CoreV1().Pods(f.Namespace.Name).Get(ctx, pod.Name, metav1.GetOptions{})
527 framework.ExpectNoError(err, "Failed to get the recent pod object for name: %q", pod.Name)
528 if condition := FindPodConditionByType(&pod.Status, cType); condition == nil {
529 framework.Failf("pod %q should have the condition: %q, pod status: %v", pod.Name, cType, pod.Status)
530 }
531 }
532
533 func getNodeTTLAnnotationValue(ctx context.Context, c clientset.Interface) (time.Duration, error) {
534 nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
535 if err != nil || len(nodes.Items) == 0 {
536 return time.Duration(0), fmt.Errorf("Couldn't list any nodes to get TTL annotation: %w", err)
537 }
538
539
540 node := &nodes.Items[0]
541 if node.Annotations == nil {
542 return time.Duration(0), fmt.Errorf("No annotations found on the node")
543 }
544 value, ok := node.Annotations[v1.ObjectTTLAnnotationKey]
545 if !ok {
546 return time.Duration(0), fmt.Errorf("No TTL annotation found on the node")
547 }
548 intValue, err := strconv.Atoi(value)
549 if err != nil {
550 return time.Duration(0), fmt.Errorf("Cannot convert TTL annotation from %#v to int", *node)
551 }
552 return time.Duration(intValue) * time.Second, nil
553 }
554
555
556 func FilterActivePods(pods []*v1.Pod) []*v1.Pod {
557 var result []*v1.Pod
558 for _, p := range pods {
559 if IsPodActive(p) {
560 result = append(result, p)
561 } else {
562 klog.V(4).Infof("Ignoring inactive pod %v/%v in state %v, deletion time %v",
563 p.Namespace, p.Name, p.Status.Phase, p.DeletionTimestamp)
564 }
565 }
566 return result
567 }
568
569
570 func IsPodActive(p *v1.Pod) bool {
571 return v1.PodSucceeded != p.Status.Phase &&
572 v1.PodFailed != p.Status.Phase &&
573 p.DeletionTimestamp == nil
574 }
575
View as plain text