package apps

import (
	"context"
	"fmt"
	"strconv"
	"time"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/uuid"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/apimachinery/pkg/watch"
	clientset "k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/kubernetes/pkg/cluster/ports"
	kubeschedulerconfig "k8s.io/kubernetes/pkg/scheduler/apis/config"
	"k8s.io/kubernetes/test/e2e/framework"
	e2edebug "k8s.io/kubernetes/test/e2e/framework/debug"
	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
	e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
	e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper"
	e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
	testutils "k8s.io/kubernetes/test/utils"
	imageutils "k8s.io/kubernetes/test/utils/image"
	admissionapi "k8s.io/pod-security-admission/api"

	"github.com/onsi/ginkgo/v2"
)
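
// The specs below exercise restarting cluster daemons while pods are in a
// steady state:
//   - the controller manager should not create or delete replicas across a restart,
//   - the scheduler should keep assigning pods to nodes across a restart,
//   - the kubelet should not restart containers across a restart,
//   - kube-proxy should recover after being killed.
// Each daemon is expected to come back up within restartTimeout.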
const (
	restartPollInterval = 5 * time.Second
	restartTimeout      = 10 * time.Minute
	numPods             = 10

	// ADD is the event type recorded by the pod tracker for informer adds.
	ADD = "ADD"
	// DEL is the event type recorded by the pod tracker for informer deletes.
	DEL = "DEL"
	// UPDATE is the event type recorded by the pod tracker for informer updates.
	UPDATE = "UPDATE"
)
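
// RestartDaemonConfig describes a daemon to restart on a node: it is identified
// by process name, health-checked on healthzPort (over HTTPS when enableHTTPS
// is set), and polled every pollInterval for up to pollTimeout.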
type RestartDaemonConfig struct {
	nodeName     string
	daemonName   string
	healthzPort  int
	pollInterval time.Duration
	pollTimeout  time.Duration
	enableHTTPS  bool
}
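
// NewRestartConfig creates a RestartDaemonConfig for the given node and daemon.
// Illustrative use (the node address below is a placeholder, not a real cluster IP):
//
//	restarter := NewRestartConfig("10.0.0.1", "kubelet", ports.KubeletReadOnlyPort,
//		restartPollInterval, restartTimeout, false)
//	restarter.restart(ctx)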
func NewRestartConfig(nodeName, daemonName string, healthzPort int, pollInterval, pollTimeout time.Duration, enableHTTPS bool) *RestartDaemonConfig {
	if !framework.ProviderIs("gce") {
		framework.Logf("WARNING: SSH through the restart config might not work on %s", framework.TestContext.Provider)
	}
	return &RestartDaemonConfig{
		nodeName:     nodeName,
		daemonName:   daemonName,
		healthzPort:  healthzPort,
		pollInterval: pollInterval,
		pollTimeout:  pollTimeout,
		enableHTTPS:  enableHTTPS,
	}
}

func (r *RestartDaemonConfig) String() string {
	return fmt.Sprintf("Daemon %v on node %v", r.daemonName, r.nodeName)
}
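
// waitUp polls the daemon's /healthz endpoint over SSH until it returns a 200,
// or fails the test once pollTimeout expires.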
func (r *RestartDaemonConfig) waitUp(ctx context.Context) {
	framework.Logf("Checking if %v is up by polling for a 200 on its /healthz endpoint", r)
	nullDev := "/dev/null"
	if framework.NodeOSDistroIs("windows") {
		nullDev = "NUL"
	}
	var healthzCheck string
	if r.enableHTTPS {
		healthzCheck = fmt.Sprintf(
			"curl -sk -o %v -I -w \"%%{http_code}\" https://localhost:%v/healthz", nullDev, r.healthzPort)
	} else {
		healthzCheck = fmt.Sprintf(
			"curl -s -o %v -I -w \"%%{http_code}\" http://localhost:%v/healthz", nullDev, r.healthzPort)
	}
	err := wait.PollWithContext(ctx, r.pollInterval, r.pollTimeout, func(ctx context.Context) (bool, error) {
		result, err := e2essh.NodeExec(ctx, r.nodeName, healthzCheck, framework.TestContext.Provider)
		if err != nil {
			return false, err
		}
		if result.Code == 0 {
			httpCode, err := strconv.Atoi(result.Stdout)
			if err != nil {
				framework.Logf("Unable to parse healthz http return code: %v", err)
			} else if httpCode == 200 {
				return true, nil
			}
		}
		framework.Logf("node %v exec command, '%v' failed with exitcode %v: \n\tstdout: %v\n\tstderr: %v",
			r.nodeName, healthzCheck, result.Code, result.Stdout, result.Stderr)
		return false, nil
	})
	framework.ExpectNoError(err, "%v did not respond with a 200 via %v within %v", r, healthzCheck, r.pollTimeout)
}
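
// kill terminates the daemon process over SSH (kill on Linux, taskkill on Windows nodes).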
func (r *RestartDaemonConfig) kill(ctx context.Context) {
	killCmd := fmt.Sprintf("pgrep %v | xargs -I {} sudo kill {}", r.daemonName)
	if framework.NodeOSDistroIs("windows") {
		killCmd = fmt.Sprintf("taskkill /im %v.exe /f", r.daemonName)
	}
	framework.Logf("Killing %v", r)
	_, err := e2essh.NodeExec(ctx, r.nodeName, killCmd, framework.TestContext.Provider)
	framework.ExpectNoError(err)
}
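
// restart verifies the daemon is up, kills it, and waits for it to come back up.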
func (r *RestartDaemonConfig) restart(ctx context.Context) {
	r.waitUp(ctx)
	r.kill(ctx)
	r.waitUp(ctx)
}
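
// podTracker records pod events observed by the informer so they can be dumped
// when a test fails.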
type podTracker struct {
	cache.ThreadSafeStore
}

// remember records the given event for the pod. Steady-state UPDATE events for
// running pods are ignored, so only interesting transitions are kept.
func (p *podTracker) remember(pod *v1.Pod, eventType string) {
	if eventType == UPDATE && pod.Status.Phase == v1.PodRunning {
		return
	}
	p.Add(fmt.Sprintf("[%v] %v: %v", time.Now(), eventType, pod.Name), pod)
}

// String dumps every tracked event along with the pod's phase and node.
func (p *podTracker) String() (msg string) {
	for _, k := range p.ListKeys() {
		obj, exists := p.Get(k)
		if !exists {
			continue
		}
		pod := obj.(*v1.Pod)
		msg += fmt.Sprintf("%v Phase %v Host %v\n", k, pod.Status.Phase, pod.Spec.NodeName)
	}
	return
}
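
// newPodTracker returns a podTracker backed by an empty thread-safe store.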
func newPodTracker() *podTracker {
	return &podTracker{cache.NewThreadSafeStore(
		cache.Indexers{}, cache.Indices{})}
}
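
// replacePods replaces the contents of the store with the given pods.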
func replacePods(pods []*v1.Pod, store cache.Store) {
	found := make([]interface{}, 0, len(pods))
	for i := range pods {
		found = append(found, pods[i])
	}
	framework.ExpectNoError(store.Replace(found, "0"))
}
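
// getContainerRestarts returns the number of container restarts across all pods
// in ns that match labelSelector, and the names of the nodes those restarts
// happened on.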
func getContainerRestarts(ctx context.Context, c clientset.Interface, ns string, labelSelector labels.Selector) (int, []string) {
	options := metav1.ListOptions{LabelSelector: labelSelector.String()}
	pods, err := c.CoreV1().Pods(ns).List(ctx, options)
	framework.ExpectNoError(err)
	failedContainers := 0
	containerRestartNodes := sets.NewString()
	for _, p := range pods.Items {
		for _, v := range testutils.FailedContainers(&p) {
			failedContainers = failedContainers + v.Restarts
			containerRestartNodes.Insert(p.Spec.NodeName)
		}
	}
	return failedContainers, containerRestartNodes.List()
}

var _ = SIGDescribe("DaemonRestart", framework.WithDisruptive(), func() {

	f := framework.NewDefaultFramework("daemonrestart")
	f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged
	rcName := "daemonrestart" + strconv.Itoa(numPods) + "-" + string(uuid.NewUUID())
	labelSelector := labels.Set(map[string]string{"name": rcName}).AsSelector()
	existingPods := cache.NewStore(cache.MetaNamespaceKeyFunc)
	var ns string
	var config testutils.RCConfig
	var controller cache.Controller
	var newPods cache.Store
	var tracker *podTracker

	ginkgo.BeforeEach(func(ctx context.Context) {
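
		// These tests require SSH access to the nodes.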
		e2eskipper.SkipUnlessProviderIs(framework.ProvidersWithSSH...)
		ns = f.Namespace.Name
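
		// Every restart test needs an RC with numPods replicas; the pods it
		// creates are remembered in existingPods for later comparison.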
		config = testutils.RCConfig{
			Client:      f.ClientSet,
			Name:        rcName,
			Namespace:   ns,
			Image:       imageutils.GetPauseImageName(),
			Replicas:    numPods,
			CreatedPods: &[]*v1.Pod{},
		}
		framework.ExpectNoError(e2erc.RunRC(ctx, config))
		replacePods(*config.CreatedPods, existingPods)
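
		// Watch the RC's pods in the background and record every ADD/UPDATE/DEL
		// event in the tracker, so unexpected churn can be reported on failure.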
		backgroundCtx, cancel := context.WithCancel(context.Background())
		ginkgo.DeferCleanup(cancel)
		tracker = newPodTracker()
		newPods, controller = cache.NewInformer(
			&cache.ListWatch{
				ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
					options.LabelSelector = labelSelector.String()
					obj, err := f.ClientSet.CoreV1().Pods(ns).List(backgroundCtx, options)
					return runtime.Object(obj), err
				},
				WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
					options.LabelSelector = labelSelector.String()
					return f.ClientSet.CoreV1().Pods(ns).Watch(backgroundCtx, options)
				},
			},
			&v1.Pod{},
			0,
			cache.ResourceEventHandlerFuncs{
				AddFunc: func(obj interface{}) {
					tracker.remember(obj.(*v1.Pod), ADD)
				},
				UpdateFunc: func(oldObj, newObj interface{}) {
					tracker.remember(newObj.(*v1.Pod), UPDATE)
				},
				DeleteFunc: func(obj interface{}) {
					tracker.remember(obj.(*v1.Pod), DEL)
				},
			},
		)
		go controller.Run(backgroundCtx.Done())
	})

	ginkgo.It("Controller Manager should not create/delete replicas across restart", func(ctx context.Context) {
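
		// Restarting the controller manager requires SSH access to the control plane host.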
		e2eskipper.SkipUnlessProviderIs("gce", "aws")
		restarter := NewRestartConfig(
			framework.APIAddress(), "kube-controller", ports.KubeControllerManagerPort, restartPollInterval, restartTimeout, true)
		restarter.restart(ctx)
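
		// Scaling the RC to its current size forces the controller manager to
		// observe and report the RC's status at least once after the restart,
		// without changing the desired replica count.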
		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods, true))
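
		// Only the pod keys are compared; any new or missing key means the
		// controller manager created or deleted pods across the restart.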
		existingKeys := sets.NewString()
		newKeys := sets.NewString()
		for _, k := range existingPods.ListKeys() {
			existingKeys.Insert(k)
		}
		for _, k := range newPods.ListKeys() {
			newKeys.Insert(k)
		}
		if len(newKeys.List()) != len(existingKeys.List()) ||
			!newKeys.IsSuperset(existingKeys) {
			framework.Failf("RcManager created/deleted pods after restart \n\n %+v", tracker)
		}
	})

	ginkgo.It("Scheduler should continue assigning pods to nodes across restart", func(ctx context.Context) {
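
		// Restarting the scheduler requires SSH access to the control plane host.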
		e2eskipper.SkipUnlessProviderIs("gce", "aws")
		restarter := NewRestartConfig(
			framework.APIAddress(), "kube-scheduler", kubeschedulerconfig.DefaultKubeSchedulerPort, restartPollInterval, restartTimeout, true)
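
		// Create pods while the scheduler is down and make sure it assigns them
		// to nodes once it comes back up.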
		restarter.waitUp(ctx)
		restarter.kill(ctx)
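
		// Scale up while the scheduler is down, without waiting for the new
		// replicas: nothing can schedule them until the scheduler returns.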
		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, false))
		restarter.waitUp(ctx)
		framework.ExpectNoError(e2erc.ScaleRC(ctx, f.ClientSet, f.ScalesGetter, ns, rcName, numPods+5, true))
	})

	ginkgo.It("Kubelet should not restart containers across restart", func(ctx context.Context) {
		nodeIPs, err := e2enode.GetPublicIps(ctx, f.ClientSet)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		framework.ExpectNoErrorWithOffset(0, err)
		preRestarts, badNodes := getContainerRestarts(ctx, f.ClientSet, ns, labelSelector)
		if preRestarts != 0 {
			framework.Logf("WARNING: Non-zero container restart count: %d across nodes %v", preRestarts, badNodes)
		}
		for _, ip := range nodeIPs {
			restarter := NewRestartConfig(
				ip, "kubelet", ports.KubeletReadOnlyPort, restartPollInterval, restartTimeout, false)
			restarter.restart(ctx)
		}
		postRestarts, badNodes := getContainerRestarts(ctx, f.ClientSet, ns, labelSelector)
		if postRestarts != preRestarts {
			e2edebug.DumpNodeDebugInfo(ctx, f.ClientSet, badNodes, framework.Logf)
			framework.Failf("Net container restart count went from %v -> %v after kubelet restart on nodes %v \n\n %+v", preRestarts, postRestarts, badNodes, tracker)
		}
	})

	ginkgo.It("Kube-proxy should recover after being killed accidentally", func(ctx context.Context) {
		nodeIPs, err := e2enode.GetPublicIps(ctx, f.ClientSet)
		if err != nil {
			framework.Logf("Unexpected error occurred: %v", err)
		}
		for _, ip := range nodeIPs {
			restarter := NewRestartConfig(
				ip, "kube-proxy", ports.ProxyHealthzPort, restartPollInterval, restartTimeout, false)
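
			// restart kills kube-proxy and waits for it to report healthy again;
			// waitUp fails the test if it does not recover within restartTimeout.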
			restarter.restart(ctx)
		}
	})
})