1
16
17 package utils
18
19 import (
20 "context"
21 "fmt"
22 "io"
23 "os"
24 "path"
25 "regexp"
26 "strings"
27 "time"
28
29 "github.com/onsi/ginkgo/v2"
30 "github.com/onsi/gomega"
31 v1 "k8s.io/api/core/v1"
32 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
33 clientset "k8s.io/client-go/kubernetes"
34 "k8s.io/kubernetes/test/e2e/framework"
35 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
36 e2essh "k8s.io/kubernetes/test/e2e/framework/ssh"
37 "k8s.io/kubernetes/test/e2e/storage/podlogs"
38 )
39
40
41
42
43
44
45
46 func StartPodLogs(ctx context.Context, f *framework.Framework, driverNamespace *v1.Namespace) func() {
47 ctx, cancel := context.WithCancel(ctx)
48 cs := f.ClientSet
49
50 ns := driverNamespace.Name
51
52 var podEventLog io.Writer = ginkgo.GinkgoWriter
53 var podEventLogCloser io.Closer
54 to := podlogs.LogOutput{
55 StatusWriter: ginkgo.GinkgoWriter,
56 }
57 if framework.TestContext.ReportDir == "" {
58 to.LogWriter = ginkgo.GinkgoWriter
59 } else {
60 test := ginkgo.CurrentSpecReport()
61
62
63
64 reg := regexp.MustCompile("[^a-zA-Z0-9_-]+")
65 var testName []string
66 for _, text := range test.ContainerHierarchyTexts {
67 testName = append(testName, reg.ReplaceAllString(text, "_"))
68 if len(test.LeafNodeText) > 0 {
69 testName = append(testName, reg.ReplaceAllString(test.LeafNodeText, "_"))
70 }
71 }
72
73
74
75
76
77
78
79
80 logDir := framework.TestContext.ReportDir + "/" + strings.Join(testName, "/")
81 to.LogPathPrefix = logDir + "/"
82
83 err := os.MkdirAll(logDir, 0755)
84 framework.ExpectNoError(err, "create pod log directory")
85 f, err := os.Create(path.Join(logDir, "pod-event.log"))
86 framework.ExpectNoError(err, "create pod events log file")
87 podEventLog = f
88 podEventLogCloser = f
89 }
90 podlogs.CopyAllLogs(ctx, cs, ns, to)
91
92
93
94
95 podlogs.WatchPods(ctx, cs, ns, podEventLog, podEventLogCloser)
96
97 return cancel
98 }
99
100
101
102
103
104
105
106 func KubeletCommand(ctx context.Context, kOp KubeletOpt, c clientset.Interface, pod *v1.Pod) {
107 command := ""
108 systemctlPresent := false
109 kubeletPid := ""
110
111 nodeIP, err := getHostAddress(ctx, c, pod)
112 framework.ExpectNoError(err)
113 nodeIP = nodeIP + ":22"
114
115 framework.Logf("Checking if systemctl command is present")
116 sshResult, err := e2essh.SSH(ctx, "systemctl --version", nodeIP, framework.TestContext.Provider)
117 framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
118 if !strings.Contains(sshResult.Stderr, "command not found") {
119 command = fmt.Sprintf("systemctl %s kubelet", string(kOp))
120 systemctlPresent = true
121 } else {
122 command = fmt.Sprintf("service kubelet %s", string(kOp))
123 }
124
125 sudoPresent := isSudoPresent(ctx, nodeIP, framework.TestContext.Provider)
126 if sudoPresent {
127 command = fmt.Sprintf("sudo %s", command)
128 }
129
130 if kOp == KRestart {
131 kubeletPid = getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
132 }
133
134 framework.Logf("Attempting `%s`", command)
135 sshResult, err = e2essh.SSH(ctx, command, nodeIP, framework.TestContext.Provider)
136 framework.ExpectNoError(err, fmt.Sprintf("SSH to Node %q errored.", pod.Spec.NodeName))
137 e2essh.LogResult(sshResult)
138 gomega.Expect(sshResult.Code).To(gomega.BeZero(), "Failed to [%s] kubelet:\n%#v", string(kOp), sshResult)
139
140 if kOp == KStop {
141 if ok := e2enode.WaitForNodeToBeNotReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
142 framework.Failf("Node %s failed to enter NotReady state", pod.Spec.NodeName)
143 }
144 }
145 if kOp == KRestart {
146
147 isPidChanged := false
148 for start := time.Now(); time.Since(start) < 1*time.Minute; time.Sleep(2 * time.Second) {
149 if ctx.Err() != nil {
150 framework.Fail("timed out waiting for Kubelet POD change")
151 }
152 kubeletPidAfterRestart := getKubeletMainPid(ctx, nodeIP, sudoPresent, systemctlPresent)
153 if kubeletPid != kubeletPidAfterRestart {
154 isPidChanged = true
155 break
156 }
157 }
158 if !isPidChanged {
159 framework.Fail("Kubelet PID remained unchanged after restarting Kubelet")
160 }
161
162 framework.Logf("Noticed that kubelet PID is changed. Waiting for 30 Seconds for Kubelet to come back")
163 time.Sleep(30 * time.Second)
164 }
165 if kOp == KStart || kOp == KRestart {
166
167 if ok := e2enode.WaitForNodeToBeReady(ctx, c, pod.Spec.NodeName, NodeStateTimeout); !ok {
168 framework.Failf("Node %s failed to enter Ready state", pod.Spec.NodeName)
169 }
170 }
171 }
172
173
174
175
176 func getHostAddress(ctx context.Context, client clientset.Interface, p *v1.Pod) (string, error) {
177 node, err := client.CoreV1().Nodes().Get(ctx, p.Spec.NodeName, metav1.GetOptions{})
178 if err != nil {
179 return "", err
180 }
181
182 for _, address := range node.Status.Addresses {
183 if address.Type == v1.NodeExternalIP {
184 if address.Address != "" {
185 return address.Address, nil
186 }
187 }
188 }
189
190 for _, address := range node.Status.Addresses {
191 if address.Type == v1.NodeInternalIP {
192 if address.Address != "" {
193 return address.Address, nil
194 }
195 }
196 }
197
198
199 return "", fmt.Errorf("No address for pod %v on node %v",
200 p.Name, p.Spec.NodeName)
201 }
202
View as plain text