1
16
17 package resource
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 "github.com/onsi/ginkgo/v2"
25
26 v1 "k8s.io/api/core/v1"
27 apierrors "k8s.io/apimachinery/pkg/api/errors"
28 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
29 "k8s.io/apimachinery/pkg/fields"
30 "k8s.io/apimachinery/pkg/runtime"
31 "k8s.io/apimachinery/pkg/runtime/schema"
32 "k8s.io/apimachinery/pkg/util/wait"
33 "k8s.io/client-go/dynamic"
34 clientset "k8s.io/client-go/kubernetes"
35 scaleclient "k8s.io/client-go/scale"
36 "k8s.io/kubernetes/test/e2e/framework"
37 e2epod "k8s.io/kubernetes/test/e2e/framework/pod"
38 testutils "k8s.io/kubernetes/test/utils"
39 )
40
// gcThroughput is the divisor deleteObjectAndWaitForGC applies to a replica
// count to obtain a number of seconds, i.e. the assumed number of pods the
// garbage collector removes per second when sizing the wait timeout.
const gcThroughput = 10
46
47
48 func ScaleResource(
49 ctx context.Context,
50 clientset clientset.Interface,
51 scalesGetter scaleclient.ScalesGetter,
52 ns, name string,
53 size uint,
54 wait bool,
55 kind schema.GroupKind,
56 gvr schema.GroupVersionResource,
57 ) error {
58 ginkgo.By(fmt.Sprintf("Scaling %v %s in namespace %s to %d", kind, name, ns, size))
59 if err := testutils.ScaleResourceWithRetries(scalesGetter, ns, name, size, gvr); err != nil {
60 return fmt.Errorf("error while scaling RC %s to %d replicas: %w", name, size, err)
61 }
62 if !wait {
63 return nil
64 }
65 return WaitForControlledPodsRunning(ctx, clientset, ns, name, kind)
66 }
67
68
69 func DeleteResourceAndWaitForGC(ctx context.Context, c clientset.Interface, kind schema.GroupKind, ns, name string) error {
70 ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", kind, name, ns))
71
72 rtObject, err := GetRuntimeObjectForKind(ctx, c, kind, ns, name)
73 if err != nil {
74 if apierrors.IsNotFound(err) {
75 framework.Logf("%v %s not found: %v", kind, name, err)
76 return nil
77 }
78 return err
79 }
80 deleteObject := func() error {
81 background := metav1.DeletePropagationBackground
82 return testutils.DeleteResource(c, kind, ns, name, metav1.DeleteOptions{PropagationPolicy: &background})
83 }
84 return deleteObjectAndWaitForGC(ctx, c, rtObject, deleteObject, ns, name, kind.String())
85 }
86
87
88
89 func DeleteCustomResourceAndWaitForGC(ctx context.Context, c clientset.Interface, dynamicClient dynamic.Interface, scaleClient scaleclient.ScalesGetter, gvr schema.GroupVersionResource, ns, name string) error {
90 ginkgo.By(fmt.Sprintf("deleting %v %s in namespace %s, will wait for the garbage collector to delete the pods", gvr, name, ns))
91 resourceClient := dynamicClient.Resource(gvr).Namespace(ns)
92 _, err := resourceClient.Get(ctx, name, metav1.GetOptions{})
93 if err != nil {
94 if apierrors.IsNotFound(err) {
95 framework.Logf("%v %s not found: %v", gvr, name, err)
96 return nil
97 }
98 return err
99 }
100 scaleObj, err := scaleClient.Scales(ns).Get(ctx, gvr.GroupResource(), name, metav1.GetOptions{})
101 if err != nil {
102 framework.Logf("error while trying to get scale subresource of kind %v with name %v: %v", gvr, name, err)
103 return nil
104 }
105 deleteObject := func() error {
106 background := metav1.DeletePropagationBackground
107 return resourceClient.Delete(ctx, name, metav1.DeleteOptions{PropagationPolicy: &background})
108 }
109 return deleteObjectAndWaitForGC(ctx, c, scaleObj, deleteObject, ns, name, gvr.String())
110 }
111
112 func deleteObjectAndWaitForGC(ctx context.Context, c clientset.Interface, rtObject runtime.Object, deleteObject func() error, ns, name, description string) error {
113 selector, err := GetSelectorFromRuntimeObject(rtObject)
114 if err != nil {
115 return err
116 }
117 replicas, err := GetReplicasFromRuntimeObject(rtObject)
118 if err != nil {
119 return err
120 }
121
122 ps, err := testutils.NewPodStore(c, ns, selector, fields.Everything())
123 if err != nil {
124 return err
125 }
126
127 defer ps.Stop()
128 startTime := time.Now()
129 if err := testutils.RetryWithExponentialBackOff(func() (bool, error) {
130 err := deleteObject()
131 if err == nil || apierrors.IsNotFound(err) {
132 return true, nil
133 }
134 return false, fmt.Errorf("failed to delete object with non-retriable error: %w", err)
135 }); err != nil {
136 return err
137 }
138 deleteTime := time.Since(startTime)
139 framework.Logf("Deleting %v %s took: %v", description, name, deleteTime)
140
141 var interval, timeout time.Duration
142 switch {
143 case replicas < 100:
144 interval = 100 * time.Millisecond
145 case replicas < 1000:
146 interval = 1 * time.Second
147 default:
148 interval = 10 * time.Second
149 }
150 if replicas < 5000 {
151 timeout = 10 * time.Minute
152 } else {
153 timeout = time.Duration(replicas/gcThroughput) * time.Second
154
155 timeout = timeout + 3*time.Minute
156 }
157
158 err = waitForPodsInactive(ctx, ps, interval, timeout)
159 if err != nil {
160 return fmt.Errorf("error while waiting for pods to become inactive %s: %w", name, err)
161 }
162 terminatePodTime := time.Since(startTime) - deleteTime
163 framework.Logf("Terminating %v %s pods took: %v", description, name, terminatePodTime)
164
165
166
167
168 err = waitForPodsGone(ctx, ps, interval, 20*time.Minute)
169 if err != nil {
170 return fmt.Errorf("error while waiting for pods gone %s: %w", name, err)
171 }
172 return nil
173 }
174
175
176 func waitForPodsGone(ctx context.Context, ps *testutils.PodStore, interval, timeout time.Duration) error {
177 var pods []*v1.Pod
178 err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
179 if pods = ps.List(); len(pods) == 0 {
180 return true, nil
181 }
182 return false, nil
183 })
184
185 if wait.Interrupted(err) {
186 for _, pod := range pods {
187 framework.Logf("ERROR: Pod %q still exists. Node: %q", pod.Name, pod.Spec.NodeName)
188 }
189 return fmt.Errorf("there are %d pods left. E.g. %q on node %q", len(pods), pods[0].Name, pods[0].Spec.NodeName)
190 }
191 return err
192 }
193
194
195
196
197
198 func waitForPodsInactive(ctx context.Context, ps *testutils.PodStore, interval, timeout time.Duration) error {
199 var activePods []*v1.Pod
200 err := wait.PollUntilContextTimeout(ctx, interval, timeout, true, func(ctx context.Context) (bool, error) {
201 pods := ps.List()
202 activePods = e2epod.FilterActivePods(pods)
203 if len(activePods) != 0 {
204 return false, nil
205 }
206 return true, nil
207 })
208
209 if wait.Interrupted(err) {
210 for _, pod := range activePods {
211 framework.Logf("ERROR: Pod %q running on %q is still active", pod.Name, pod.Spec.NodeName)
212 }
213 return fmt.Errorf("there are %d active pods. E.g. %q on node %q", len(activePods), activePods[0].Name, activePods[0].Spec.NodeName)
214 }
215 return err
216 }
217
218
219 func WaitForControlledPodsRunning(ctx context.Context, c clientset.Interface, ns, name string, kind schema.GroupKind) error {
220 rtObject, err := GetRuntimeObjectForKind(ctx, c, kind, ns, name)
221 if err != nil {
222 return err
223 }
224 selector, err := GetSelectorFromRuntimeObject(rtObject)
225 if err != nil {
226 return err
227 }
228 replicas, err := GetReplicasFromRuntimeObject(rtObject)
229 if err != nil {
230 return err
231 }
232 err = testutils.WaitForEnoughPodsWithLabelRunning(c, ns, selector, int(replicas))
233 if err != nil {
234 return fmt.Errorf("Error while waiting for replication controller %s pods to be running: %w", name, err)
235 }
236 return nil
237 }
238
239
240 func WaitForControlledPods(ctx context.Context, c clientset.Interface, ns, name string, kind schema.GroupKind) (pods *v1.PodList, err error) {
241 rtObject, err := GetRuntimeObjectForKind(ctx, c, kind, ns, name)
242 if err != nil {
243 return nil, err
244 }
245 selector, err := GetSelectorFromRuntimeObject(rtObject)
246 if err != nil {
247 return nil, err
248 }
249 return e2epod.WaitForPodsWithLabel(ctx, c, ns, selector)
250 }
251
View as plain text