1
16
17 package network
18
19 import (
20 "context"
21 "fmt"
22 "time"
23
24 appsv1 "k8s.io/api/apps/v1"
25 v1 "k8s.io/api/core/v1"
26 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27 "k8s.io/apimachinery/pkg/labels"
28 "k8s.io/apimachinery/pkg/util/wait"
29 clientset "k8s.io/client-go/kubernetes"
30 "k8s.io/kubernetes/test/e2e/framework"
31 e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
32 e2enode "k8s.io/kubernetes/test/e2e/framework/node"
33 "k8s.io/kubernetes/test/e2e/upgrades"
34
35 "github.com/onsi/ginkgo/v2"
36 )
37
38 const (
39 defaultTestTimeout = time.Duration(5 * time.Minute)
40 clusterAddonLabelKey = "k8s-app"
41 clusterComponentKey = "component"
42 kubeProxyLabelName = "kube-proxy"
43 )
44
45
46 type KubeProxyUpgradeTest struct {
47 }
48
49
50 func (KubeProxyUpgradeTest) Name() string { return "[sig-network] kube-proxy-upgrade" }
51
52
53 func (t *KubeProxyUpgradeTest) Setup(ctx context.Context, f *framework.Framework) {
54 ginkgo.By("Waiting for kube-proxy static pods running and ready")
55 err := waitForKubeProxyStaticPodsRunning(ctx, f.ClientSet)
56 framework.ExpectNoError(err)
57 }
58
59
60 func (t *KubeProxyUpgradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
61 c := f.ClientSet
62
63
64 ginkgo.By("Waiting for upgrade to finish")
65 <-done
66
67 ginkgo.By("Waiting for kube-proxy static pods disappear")
68 err := waitForKubeProxyStaticPodsDisappear(ctx, c)
69 framework.ExpectNoError(err)
70
71 ginkgo.By("Waiting for kube-proxy DaemonSet running and ready")
72 err = waitForKubeProxyDaemonSetRunning(ctx, f, c)
73 framework.ExpectNoError(err)
74 }
75
76
77 func (t *KubeProxyUpgradeTest) Teardown(ctx context.Context, f *framework.Framework) {
78 }
79
80
81 type KubeProxyDowngradeTest struct {
82 }
83
84
85 func (KubeProxyDowngradeTest) Name() string { return "[sig-network] kube-proxy-downgrade" }
86
87
88 func (t *KubeProxyDowngradeTest) Setup(ctx context.Context, f *framework.Framework) {
89 ginkgo.By("Waiting for kube-proxy DaemonSet running and ready")
90 err := waitForKubeProxyDaemonSetRunning(ctx, f, f.ClientSet)
91 framework.ExpectNoError(err)
92 }
93
94
95 func (t *KubeProxyDowngradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
96 c := f.ClientSet
97
98
99 ginkgo.By("Waiting for upgrade to finish")
100 <-done
101
102 ginkgo.By("Waiting for kube-proxy DaemonSet disappear")
103 err := waitForKubeProxyDaemonSetDisappear(ctx, c)
104 framework.ExpectNoError(err)
105
106 ginkgo.By("Waiting for kube-proxy static pods running and ready")
107 err = waitForKubeProxyStaticPodsRunning(ctx, c)
108 framework.ExpectNoError(err)
109 }
110
111
112 func (t *KubeProxyDowngradeTest) Teardown(ctx context.Context, f *framework.Framework) {
113 }
114
115 func waitForKubeProxyStaticPodsRunning(ctx context.Context, c clientset.Interface) error {
116 framework.Logf("Waiting up to %v for kube-proxy static pods running", defaultTestTimeout)
117
118 condition := func() (bool, error) {
119 pods, err := getKubeProxyStaticPods(ctx, c)
120 if err != nil {
121 framework.Logf("Failed to get kube-proxy static pods: %v", err)
122 return false, nil
123 }
124
125 nodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
126 if err != nil {
127 framework.Logf("Failed to get nodes: %v", err)
128 return false, nil
129 }
130
131 numberSchedulableNodes := len(nodes.Items)
132 numberkubeProxyPods := 0
133 for _, pod := range pods.Items {
134 if pod.Status.Phase == v1.PodRunning {
135 numberkubeProxyPods = numberkubeProxyPods + 1
136 }
137 }
138 if numberkubeProxyPods != numberSchedulableNodes {
139 framework.Logf("Expect %v kube-proxy static pods running, got %v running, %v in total", numberSchedulableNodes, numberkubeProxyPods, len(pods.Items))
140 return false, nil
141 }
142 return true, nil
143 }
144
145 if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
146 return fmt.Errorf("error waiting for kube-proxy static pods running: %w", err)
147 }
148 return nil
149 }
150
151 func waitForKubeProxyStaticPodsDisappear(ctx context.Context, c clientset.Interface) error {
152 framework.Logf("Waiting up to %v for kube-proxy static pods disappear", defaultTestTimeout)
153
154 condition := func() (bool, error) {
155 pods, err := getKubeProxyStaticPods(ctx, c)
156 if err != nil {
157 framework.Logf("Failed to get kube-proxy static pods: %v", err)
158 return false, nil
159 }
160
161 if len(pods.Items) != 0 {
162 framework.Logf("Expect kube-proxy static pods to disappear, got %v pods", len(pods.Items))
163 return false, nil
164 }
165 return true, nil
166 }
167
168 if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
169 return fmt.Errorf("error waiting for kube-proxy static pods disappear: %w", err)
170 }
171 return nil
172 }
173
174 func waitForKubeProxyDaemonSetRunning(ctx context.Context, f *framework.Framework, c clientset.Interface) error {
175 framework.Logf("Waiting up to %v for kube-proxy DaemonSet running", defaultTestTimeout)
176
177 condition := func() (bool, error) {
178 daemonSets, err := getKubeProxyDaemonSet(ctx, c)
179 if err != nil {
180 framework.Logf("Failed to get kube-proxy DaemonSet: %v", err)
181 return false, nil
182 }
183
184 if len(daemonSets.Items) != 1 {
185 framework.Logf("Expect only one kube-proxy DaemonSet, got %v", len(daemonSets.Items))
186 return false, nil
187 }
188
189 return e2edaemonset.CheckRunningOnAllNodes(ctx, f, &daemonSets.Items[0])
190 }
191
192 if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
193 return fmt.Errorf("error waiting for kube-proxy DaemonSet running: %w", err)
194 }
195 return nil
196 }
197
198 func waitForKubeProxyDaemonSetDisappear(ctx context.Context, c clientset.Interface) error {
199 framework.Logf("Waiting up to %v for kube-proxy DaemonSet disappear", defaultTestTimeout)
200
201 condition := func() (bool, error) {
202 daemonSets, err := getKubeProxyDaemonSet(ctx, c)
203 if err != nil {
204 framework.Logf("Failed to get kube-proxy DaemonSet: %v", err)
205 return false, nil
206 }
207
208 if len(daemonSets.Items) != 0 {
209 framework.Logf("Expect kube-proxy DaemonSet to disappear, got %v DaemonSet", len(daemonSets.Items))
210 return false, nil
211 }
212 return true, nil
213 }
214
215 if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
216 return fmt.Errorf("error waiting for kube-proxy DaemonSet disappear: %w", err)
217 }
218 return nil
219 }
220
221 func getKubeProxyStaticPods(ctx context.Context, c clientset.Interface) (*v1.PodList, error) {
222 label := labels.SelectorFromSet(labels.Set(map[string]string{clusterComponentKey: kubeProxyLabelName}))
223 listOpts := metav1.ListOptions{LabelSelector: label.String()}
224 return c.CoreV1().Pods(metav1.NamespaceSystem).List(ctx, listOpts)
225 }
226
227 func getKubeProxyDaemonSet(ctx context.Context, c clientset.Interface) (*appsv1.DaemonSetList, error) {
228 label := labels.SelectorFromSet(labels.Set(map[string]string{clusterAddonLabelKey: kubeProxyLabelName}))
229 listOpts := metav1.ListOptions{LabelSelector: label.String()}
230 return c.AppsV1().DaemonSets(metav1.NamespaceSystem).List(ctx, listOpts)
231 }
232
View as plain text