1
16
17 package pod
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "regexp"
24 "sync"
25 "time"
26
27 v1 "k8s.io/api/core/v1"
28 apierrors "k8s.io/apimachinery/pkg/api/errors"
29 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30 "k8s.io/apimachinery/pkg/types"
31 "k8s.io/apimachinery/pkg/util/sets"
32 "k8s.io/apimachinery/pkg/util/strategicpatch"
33 "k8s.io/apimachinery/pkg/util/wait"
34 "k8s.io/client-go/kubernetes/scheme"
35 v1core "k8s.io/client-go/kubernetes/typed/core/v1"
36 "k8s.io/kubectl/pkg/util/podutils"
37
38 "github.com/onsi/ginkgo/v2"
39 "github.com/onsi/gomega"
40
41 "k8s.io/kubernetes/test/e2e/framework"
42 )
43
const (
	// DefaultPodDeletionTimeout is a three-minute duration.
	// NOTE(review): by its name it is the default timeout for waiting on pod
	// deletion; it is not referenced in the visible code — confirm with callers.
	DefaultPodDeletionTimeout = 3 * time.Minute

	// killingContainer is an event reason that WaitForErrorEventOrSuccess
	// reports as an error event.
	killingContainer = "Killing"

	// failedToCreateContainer is an event reason that
	// WaitForErrorEventOrSuccess reports as an error event.
	failedToCreateContainer = "Failed"

	// startedContainer is the event reason that WaitForErrorEventOrSuccess
	// treats as success.
	startedContainer = "Started"

	// forbiddenReason is an event reason that WaitForErrorEventOrSuccess
	// reports as an error event.
	forbiddenReason = "SysctlForbidden"
)
60
61
62
63
// ImagePrePullList is the set of image names that mungeSpec accepts for
// containers when framework.TestContext.PrepullImages is set.
// NOTE(review): it is not populated in the visible code; presumably the test
// suite fills it in during setup — confirm.
var ImagePrePullList sets.String
65
66
67
68
69 func NewPodClient(f *framework.Framework) *PodClient {
70 return &PodClient{
71 f: f,
72 PodInterface: f.ClientSet.CoreV1().Pods(f.Namespace.Name),
73 namespace: f.Namespace.Name,
74 }
75 }
76
77
78
79
80 func PodClientNS(f *framework.Framework, namespace string) *PodClient {
81 return &PodClient{
82 f: f,
83 PodInterface: f.ClientSet.CoreV1().Pods(namespace),
84 namespace: namespace,
85 }
86 }
87
88
// PodClient wraps the typed core/v1 pod interface for a single namespace and
// layers test-oriented helpers (create/update/delete/wait) on top of it.
type PodClient struct {
	// f is the framework the client was built from; its ClientSet is passed to
	// the wait and log helpers.
	f *framework.Framework
	// PodInterface is embedded, so its methods are callable directly on the
	// PodClient.
	v1core.PodInterface
	// namespace is the namespace the embedded PodInterface was created for.
	namespace string
}
94
95
96 func (c *PodClient) Create(ctx context.Context, pod *v1.Pod) *v1.Pod {
97 c.mungeSpec(pod)
98 p, err := c.PodInterface.Create(ctx, pod, metav1.CreateOptions{})
99 framework.ExpectNoError(err, "Error creating Pod")
100 return p
101 }
102
103
104 func (c *PodClient) CreateSync(ctx context.Context, pod *v1.Pod) *v1.Pod {
105 p := c.Create(ctx, pod)
106 framework.ExpectNoError(WaitTimeoutForPodReadyInNamespace(ctx, c.f.ClientSet, p.Name, c.namespace, framework.PodStartTimeout))
107
108 p, err := c.Get(ctx, p.Name, metav1.GetOptions{})
109 framework.ExpectNoError(err)
110 return p
111 }
112
113
114 func (c *PodClient) CreateBatch(ctx context.Context, pods []*v1.Pod) []*v1.Pod {
115 ps := make([]*v1.Pod, len(pods))
116 var wg sync.WaitGroup
117 for i, pod := range pods {
118 wg.Add(1)
119 go func(i int, pod *v1.Pod) {
120 defer wg.Done()
121 defer ginkgo.GinkgoRecover()
122 ps[i] = c.CreateSync(ctx, pod)
123 }(i, pod)
124 }
125 wg.Wait()
126 return ps
127 }
128
129
130
131
132 func (c *PodClient) Update(ctx context.Context, name string, updateFn func(pod *v1.Pod)) {
133 framework.ExpectNoError(wait.PollWithContext(ctx, time.Millisecond*500, time.Second*30, func(ctx context.Context) (bool, error) {
134 pod, err := c.PodInterface.Get(ctx, name, metav1.GetOptions{})
135 if err != nil {
136 return false, fmt.Errorf("failed to get pod %q: %w", name, err)
137 }
138 updateFn(pod)
139 _, err = c.PodInterface.Update(ctx, pod, metav1.UpdateOptions{})
140 if err == nil {
141 framework.Logf("Successfully updated pod %q", name)
142 return true, nil
143 }
144 if apierrors.IsConflict(err) {
145 framework.Logf("Conflicting update to pod %q, re-get and re-update: %v", name, err)
146 return false, nil
147 }
148 return false, fmt.Errorf("failed to update pod %q: %w", name, err)
149 }))
150 }
151
152
153 func (c *PodClient) AddEphemeralContainerSync(ctx context.Context, pod *v1.Pod, ec *v1.EphemeralContainer, timeout time.Duration) error {
154 podJS, err := json.Marshal(pod)
155 framework.ExpectNoError(err, "error creating JSON for pod %q", FormatPod(pod))
156
157 ecPod := pod.DeepCopy()
158 ecPod.Spec.EphemeralContainers = append(ecPod.Spec.EphemeralContainers, *ec)
159 ecJS, err := json.Marshal(ecPod)
160 framework.ExpectNoError(err, "error creating JSON for pod with ephemeral container %q", FormatPod(pod))
161
162 patch, err := strategicpatch.CreateTwoWayMergePatch(podJS, ecJS, pod)
163 framework.ExpectNoError(err, "error creating patch to add ephemeral container %q", FormatPod(pod))
164
165
166 if _, err := c.Patch(ctx, pod.Name, types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "ephemeralcontainers"); err != nil {
167 return err
168 }
169
170 framework.ExpectNoError(WaitForContainerRunning(ctx, c.f.ClientSet, c.namespace, pod.Name, ec.Name, timeout))
171 return nil
172 }
173
174
175
176
177
178 func FormatPod(pod *v1.Pod) string {
179 if pod == nil {
180 return "<nil>"
181 }
182 return fmt.Sprintf("%s_%s(%s)", pod.Name, pod.Namespace, pod.UID)
183 }
184
185
186
187 func (c *PodClient) DeleteSync(ctx context.Context, name string, options metav1.DeleteOptions, timeout time.Duration) {
188 err := c.Delete(ctx, name, options)
189 if err != nil && !apierrors.IsNotFound(err) {
190 framework.Failf("Failed to delete pod %q: %v", name, err)
191 }
192 framework.ExpectNoError(WaitForPodNotFoundInNamespace(ctx, c.f.ClientSet, name, c.namespace, timeout), "wait for pod %q to disappear", name)
193 }
194
195
196 func (c *PodClient) mungeSpec(pod *v1.Pod) {
197 if !framework.TestContext.NodeE2E {
198 return
199 }
200
201 gomega.Expect(pod.Spec.NodeName).To(gomega.Or(gomega.BeZero(), gomega.Equal(framework.TestContext.NodeName)), "Test misconfigured")
202 pod.Spec.NodeName = framework.TestContext.NodeName
203
204
205 pod.Spec.DNSPolicy = v1.DNSDefault
206
207
208
209 if !framework.TestContext.PrepullImages {
210 return
211 }
212
213
214 for i := range pod.Spec.Containers {
215 c := &pod.Spec.Containers[i]
216 if c.ImagePullPolicy == v1.PullAlways {
217
218
219
220 continue
221 }
222
223
224 gomega.Expect(ImagePrePullList.Has(c.Image)).To(gomega.BeTrue(), "Image %q is not in the pre-pull list, consider adding it to PrePulledImages in test/e2e/common/util.go or NodePrePullImageList in test/e2e_node/image_list.go", c.Image)
225
226
227 c.ImagePullPolicy = v1.PullNever
228 }
229 }
230
231
232
233 func (c *PodClient) WaitForSuccess(ctx context.Context, name string, timeout time.Duration) {
234 gomega.Expect(WaitForPodCondition(ctx, c.f.ClientSet, c.namespace, name, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout,
235 func(pod *v1.Pod) (bool, error) {
236 switch pod.Status.Phase {
237 case v1.PodFailed:
238 return true, fmt.Errorf("pod %q failed with reason: %q, message: %q", name, pod.Status.Reason, pod.Status.Message)
239 case v1.PodSucceeded:
240 return true, nil
241 default:
242 return false, nil
243 }
244 },
245 )).To(gomega.Succeed(), "wait for pod %q to succeed", name)
246 }
247
248
249 func (c *PodClient) WaitForFinish(ctx context.Context, name string, timeout time.Duration) {
250 gomega.Expect(WaitForPodCondition(ctx, c.f.ClientSet, c.namespace, name, fmt.Sprintf("%s or %s", v1.PodSucceeded, v1.PodFailed), timeout,
251 func(pod *v1.Pod) (bool, error) {
252 switch pod.Status.Phase {
253 case v1.PodFailed:
254 return true, nil
255 case v1.PodSucceeded:
256 return true, nil
257 default:
258 return false, nil
259 }
260 },
261 )).To(gomega.Succeed(), "wait for pod %q to finish running", name)
262 }
263
264
265 func (c *PodClient) WaitForErrorEventOrSuccess(ctx context.Context, pod *v1.Pod) (*v1.Event, error) {
266 var ev *v1.Event
267 err := wait.PollWithContext(ctx, framework.Poll, framework.PodStartTimeout, func(ctx context.Context) (bool, error) {
268 evnts, err := c.f.ClientSet.CoreV1().Events(pod.Namespace).Search(scheme.Scheme, pod)
269 if err != nil {
270 return false, fmt.Errorf("error in listing events: %w", err)
271 }
272 for _, e := range evnts.Items {
273 switch e.Reason {
274 case killingContainer, failedToCreateContainer, forbiddenReason:
275 ev = &e
276 return true, nil
277 case startedContainer:
278 return true, nil
279 default:
280
281 }
282 }
283 return false, nil
284 })
285 return ev, err
286 }
287
288
289 func (c *PodClient) MatchContainerOutput(ctx context.Context, name string, containerName string, expectedRegexp string) error {
290 f := c.f
291 output, err := GetPodLogs(ctx, f.ClientSet, f.Namespace.Name, name, containerName)
292 if err != nil {
293 return fmt.Errorf("failed to get output for container %q of pod %q", containerName, name)
294 }
295 regex, err := regexp.Compile(expectedRegexp)
296 if err != nil {
297 return fmt.Errorf("failed to compile regexp %q: %w", expectedRegexp, err)
298 }
299 if !regex.MatchString(output) {
300 return fmt.Errorf("failed to match regexp %q in output %q", expectedRegexp, output)
301 }
302 return nil
303 }
304
305
306 func (c *PodClient) PodIsReady(ctx context.Context, name string) bool {
307 pod, err := c.Get(ctx, name, metav1.GetOptions{})
308 framework.ExpectNoError(err)
309 return podutils.IsPodReady(pod)
310 }
311
312
313
314
315
// removeString returns a new slice holding every element of slice that is not
// equal to s, in the original order. The input slice is not modified. When
// nothing remains the result is nil rather than an empty slice.
func removeString(slice []string, s string) []string {
	// Starts as nil and is only allocated on the first kept element, so an
	// empty result is nil.
	var kept []string
	for idx := range slice {
		if slice[idx] == s {
			continue
		}
		kept = append(kept, slice[idx])
	}
	return kept
}
330
331
332 func (c *PodClient) RemoveFinalizer(ctx context.Context, podName string, finalizerName string) {
333 framework.Logf("Removing pod's %q finalizer: %q", podName, finalizerName)
334 c.Update(ctx, podName, func(pod *v1.Pod) {
335 pod.ObjectMeta.Finalizers = removeString(pod.ObjectMeta.Finalizers, finalizerName)
336 })
337 }
338
View as plain text