1
16
17 package node
18
19 import (
20 "context"
21 "fmt"
22 "regexp"
23 "time"
24
25 v1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/fields"
28 "k8s.io/apimachinery/pkg/util/wait"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/kubernetes/test/e2e/framework"
31 )
32
// sleepTime is the pause CheckReady takes between two consecutive attempts to
// count ready nodes.
const sleepTime time.Duration = 20 * time.Second
34
// requiredPerNodePods lists name patterns of system pods. WaitForTotalHealthy
// requires that, for each pattern, at least one Running pod from the system
// namespace with a matching name is present on every node it checks.
var requiredPerNodePods = []*regexp.Regexp{
	regexp.MustCompile(`.*kube-proxy.*`),
	regexp.MustCompile(`.*fluentd-elasticsearch.*`),
	regexp.MustCompile(`.*node-problem-detector.*`),
}
40
41
42
43 func WaitForReadyNodes(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) error {
44 _, err := CheckReady(ctx, c, size, timeout)
45 return err
46 }
47
48
49 func WaitForTotalHealthy(ctx context.Context, c clientset.Interface, timeout time.Duration) error {
50 framework.Logf("Waiting up to %v for all nodes to be ready", timeout)
51
52 var notReady []v1.Node
53 var missingPodsPerNode map[string][]string
54 err := wait.PollUntilContextTimeout(ctx, poll, timeout, true, func(ctx context.Context) (bool, error) {
55 notReady = nil
56
57 nodes, err := c.CoreV1().Nodes().List(ctx, metav1.ListOptions{ResourceVersion: "0"})
58 if err != nil {
59 return false, err
60 }
61 for _, node := range nodes.Items {
62 if !IsConditionSetAsExpected(&node, v1.NodeReady, true) {
63 notReady = append(notReady, node)
64 }
65 }
66 pods, err := c.CoreV1().Pods(metav1.NamespaceAll).List(ctx, metav1.ListOptions{ResourceVersion: "0"})
67 if err != nil {
68 return false, err
69 }
70
71 systemPodsPerNode := make(map[string][]string)
72 for _, pod := range pods.Items {
73 if pod.Namespace == metav1.NamespaceSystem && pod.Status.Phase == v1.PodRunning {
74 if pod.Spec.NodeName != "" {
75 systemPodsPerNode[pod.Spec.NodeName] = append(systemPodsPerNode[pod.Spec.NodeName], pod.Name)
76 }
77 }
78 }
79 missingPodsPerNode = make(map[string][]string)
80 for _, node := range nodes.Items {
81 if isNodeSchedulableWithoutTaints(&node) {
82 for _, requiredPod := range requiredPerNodePods {
83 foundRequired := false
84 for _, presentPod := range systemPodsPerNode[node.Name] {
85 if requiredPod.MatchString(presentPod) {
86 foundRequired = true
87 break
88 }
89 }
90 if !foundRequired {
91 missingPodsPerNode[node.Name] = append(missingPodsPerNode[node.Name], requiredPod.String())
92 }
93 }
94 }
95 }
96 return len(notReady) == 0 && len(missingPodsPerNode) == 0, nil
97 })
98
99 if err != nil && !wait.Interrupted(err) {
100 return err
101 }
102
103 if len(notReady) > 0 {
104 return fmt.Errorf("Not ready nodes: %v", notReady)
105 }
106 if len(missingPodsPerNode) > 0 {
107 return fmt.Errorf("Not running system Pods: %v", missingPodsPerNode)
108 }
109 return nil
110
111 }
112
113
114
115
116
117 func WaitConditionToBe(ctx context.Context, c clientset.Interface, name string, conditionType v1.NodeConditionType, wantTrue bool, timeout time.Duration) bool {
118 framework.Logf("Waiting up to %v for node %s condition %s to be %t", timeout, name, conditionType, wantTrue)
119 for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
120 node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
121 if err != nil {
122 framework.Logf("Couldn't get node %s", name)
123 continue
124 }
125
126 if IsConditionSetAsExpected(node, conditionType, wantTrue) {
127 return true
128 }
129 }
130 framework.Logf("Node %s didn't reach desired %s condition status (%t) within %v", name, conditionType, wantTrue, timeout)
131 return false
132 }
133
134
135
136
137 func WaitForNodeToBeNotReady(ctx context.Context, c clientset.Interface, name string, timeout time.Duration) bool {
138 return WaitConditionToBe(ctx, c, name, v1.NodeReady, false, timeout)
139 }
140
141
142 func WaitForNodeToBeReady(ctx context.Context, c clientset.Interface, name string, timeout time.Duration) bool {
143 return WaitConditionToBe(ctx, c, name, v1.NodeReady, true, timeout)
144 }
145
146 func WaitForNodeSchedulable(ctx context.Context, c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool {
147 framework.Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable)
148 for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
149 node, err := c.CoreV1().Nodes().Get(ctx, name, metav1.GetOptions{})
150 if err != nil {
151 framework.Logf("Couldn't get node %s", name)
152 continue
153 }
154
155 if IsNodeSchedulable(node) == wantSchedulable {
156 return true
157 }
158 }
159 framework.Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout)
160 return false
161 }
162
163
164
165 func CheckReady(ctx context.Context, c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
166 for start := time.Now(); time.Since(start) < timeout; time.Sleep(sleepTime) {
167 nodes, err := waitListSchedulableNodes(ctx, c)
168 if err != nil {
169 framework.Logf("Failed to list nodes: %v", err)
170 continue
171 }
172 numNodes := len(nodes.Items)
173
174
175 Filter(nodes, func(node v1.Node) bool {
176 nodeReady := IsConditionSetAsExpected(&node, v1.NodeReady, true)
177 networkReady := isConditionUnset(&node, v1.NodeNetworkUnavailable) || IsConditionSetAsExpected(&node, v1.NodeNetworkUnavailable, false)
178 return nodeReady && networkReady
179 })
180 numReady := len(nodes.Items)
181
182 if numNodes == size && numReady == size {
183 framework.Logf("Cluster has reached the desired number of ready nodes %d", size)
184 return nodes.Items, nil
185 }
186 framework.Logf("Waiting for ready nodes %d, current ready %d, not ready nodes %d", size, numReady, numNodes-numReady)
187 }
188 return nil, fmt.Errorf("timeout waiting %v for number of ready nodes to be %d", timeout, size)
189 }
190
191
192 func waitListSchedulableNodes(ctx context.Context, c clientset.Interface) (*v1.NodeList, error) {
193 var nodes *v1.NodeList
194 var err error
195 if wait.PollUntilContextTimeout(ctx, poll, singleCallTimeout, true, func(ctx context.Context) (bool, error) {
196 nodes, err = c.CoreV1().Nodes().List(ctx, metav1.ListOptions{FieldSelector: fields.Set{
197 "spec.unschedulable": "false",
198 }.AsSelector().String()})
199 if err != nil {
200 return false, err
201 }
202 return true, nil
203 }) != nil {
204 return nodes, err
205 }
206 return nodes, nil
207 }
208
209
210 func checkWaitListSchedulableNodes(ctx context.Context, c clientset.Interface) (*v1.NodeList, error) {
211 nodes, err := waitListSchedulableNodes(ctx, c)
212 if err != nil {
213 return nil, fmt.Errorf("error: %s. Non-retryable failure or timed out while listing nodes for e2e cluster", err)
214 }
215 return nodes, nil
216 }
217
218
219 func CheckReadyForTests(ctx context.Context, c clientset.Interface, nonblockingTaints string, allowedNotReadyNodes, largeClusterThreshold int) func(ctx context.Context) (bool, error) {
220 attempt := 0
221 return func(ctx context.Context) (bool, error) {
222 if allowedNotReadyNodes == -1 {
223 return true, nil
224 }
225 attempt++
226 var nodesNotReadyYet []v1.Node
227 opts := metav1.ListOptions{
228 ResourceVersion: "0",
229
230 FieldSelector: fields.Set{"spec.unschedulable": "false"}.AsSelector().String(),
231 }
232 allNodes, err := c.CoreV1().Nodes().List(ctx, opts)
233 if err != nil {
234 var terminalListNodesErr error
235 framework.Logf("Unexpected error listing nodes: %v", err)
236 if attempt >= 3 {
237 terminalListNodesErr = err
238 }
239 return false, terminalListNodesErr
240 }
241 for _, node := range allNodes.Items {
242 if !readyForTests(&node, nonblockingTaints) {
243 nodesNotReadyYet = append(nodesNotReadyYet, node)
244 }
245 }
246
247
248
249
250
251
252
253 if len(nodesNotReadyYet) > 0 {
254
255 if len(nodesNotReadyYet) < largeClusterThreshold || attempt%10 == 0 {
256 framework.Logf("Unschedulable nodes= %v, maximum value for starting tests= %v", len(nodesNotReadyYet), allowedNotReadyNodes)
257 for _, node := range nodesNotReadyYet {
258 framework.Logf(" -> Node %s [[[ Ready=%t, Network(available)=%t, Taints=%v, NonblockingTaints=%v ]]]",
259 node.Name,
260 IsConditionSetAsExpectedSilent(&node, v1.NodeReady, true),
261 IsConditionSetAsExpectedSilent(&node, v1.NodeNetworkUnavailable, false),
262 node.Spec.Taints,
263 nonblockingTaints,
264 )
265
266 }
267 if len(nodesNotReadyYet) > allowedNotReadyNodes {
268 ready := len(allNodes.Items) - len(nodesNotReadyYet)
269 remaining := len(nodesNotReadyYet) - allowedNotReadyNodes
270 framework.Logf("==== node wait: %v out of %v nodes are ready, max notReady allowed %v. Need %v more before starting.", ready, len(allNodes.Items), allowedNotReadyNodes, remaining)
271 }
272 }
273 }
274 return len(nodesNotReadyYet) <= allowedNotReadyNodes, nil
275 }
276 }
277
278
279
280
281
282 func readyForTests(node *v1.Node, nonblockingTaints string) bool {
283 if hasNonblockingTaint(node, nonblockingTaints) {
284
285
286 if !IsNodeReady(node) || !isNodeUntaintedWithNonblocking(node, nonblockingTaints) {
287 return false
288 }
289 } else {
290 if !IsNodeSchedulable(node) || !isNodeUntainted(node) {
291 return false
292 }
293 }
294 return true
295 }
296
View as plain text