...

Source file src/k8s.io/kubernetes/test/e2e/upgrades/network/kube_proxy_migration.go

Documentation: k8s.io/kubernetes/test/e2e/upgrades/network

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package network
    18  
    19  import (
    20  	"context"
    21  	"fmt"
    22  	"time"
    23  
    24  	appsv1 "k8s.io/api/apps/v1"
    25  	v1 "k8s.io/api/core/v1"
    26  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    27  	"k8s.io/apimachinery/pkg/labels"
    28  	"k8s.io/apimachinery/pkg/util/wait"
    29  	clientset "k8s.io/client-go/kubernetes"
    30  	"k8s.io/kubernetes/test/e2e/framework"
    31  	e2edaemonset "k8s.io/kubernetes/test/e2e/framework/daemonset"
    32  	e2enode "k8s.io/kubernetes/test/e2e/framework/node"
    33  	"k8s.io/kubernetes/test/e2e/upgrades"
    34  
    35  	"github.com/onsi/ginkgo/v2"
    36  )
    37  
    38  const (
    39  	defaultTestTimeout   = time.Duration(5 * time.Minute)
    40  	clusterAddonLabelKey = "k8s-app"
    41  	clusterComponentKey  = "component"
    42  	kubeProxyLabelName   = "kube-proxy"
    43  )
    44  
    45  // KubeProxyUpgradeTest tests kube-proxy static pods -> DaemonSet upgrade path.
    46  type KubeProxyUpgradeTest struct {
    47  }
    48  
    49  // Name returns the tracking name of the test.
    50  func (KubeProxyUpgradeTest) Name() string { return "[sig-network] kube-proxy-upgrade" }
    51  
    52  // Setup verifies kube-proxy static pods is running before upgrade.
    53  func (t *KubeProxyUpgradeTest) Setup(ctx context.Context, f *framework.Framework) {
    54  	ginkgo.By("Waiting for kube-proxy static pods running and ready")
    55  	err := waitForKubeProxyStaticPodsRunning(ctx, f.ClientSet)
    56  	framework.ExpectNoError(err)
    57  }
    58  
    59  // Test validates if kube-proxy is migrated from static pods to DaemonSet.
    60  func (t *KubeProxyUpgradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
    61  	c := f.ClientSet
    62  
    63  	// Block until upgrade is done.
    64  	ginkgo.By("Waiting for upgrade to finish")
    65  	<-done
    66  
    67  	ginkgo.By("Waiting for kube-proxy static pods disappear")
    68  	err := waitForKubeProxyStaticPodsDisappear(ctx, c)
    69  	framework.ExpectNoError(err)
    70  
    71  	ginkgo.By("Waiting for kube-proxy DaemonSet running and ready")
    72  	err = waitForKubeProxyDaemonSetRunning(ctx, f, c)
    73  	framework.ExpectNoError(err)
    74  }
    75  
    76  // Teardown does nothing.
    77  func (t *KubeProxyUpgradeTest) Teardown(ctx context.Context, f *framework.Framework) {
    78  }
    79  
    80  // KubeProxyDowngradeTest tests kube-proxy DaemonSet -> static pods downgrade path.
    81  type KubeProxyDowngradeTest struct {
    82  }
    83  
    84  // Name returns the tracking name of the test.
    85  func (KubeProxyDowngradeTest) Name() string { return "[sig-network] kube-proxy-downgrade" }
    86  
    87  // Setup verifies kube-proxy DaemonSet is running before upgrade.
    88  func (t *KubeProxyDowngradeTest) Setup(ctx context.Context, f *framework.Framework) {
    89  	ginkgo.By("Waiting for kube-proxy DaemonSet running and ready")
    90  	err := waitForKubeProxyDaemonSetRunning(ctx, f, f.ClientSet)
    91  	framework.ExpectNoError(err)
    92  }
    93  
    94  // Test validates if kube-proxy is migrated from DaemonSet to static pods.
    95  func (t *KubeProxyDowngradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
    96  	c := f.ClientSet
    97  
    98  	// Block until upgrade is done.
    99  	ginkgo.By("Waiting for upgrade to finish")
   100  	<-done
   101  
   102  	ginkgo.By("Waiting for kube-proxy DaemonSet disappear")
   103  	err := waitForKubeProxyDaemonSetDisappear(ctx, c)
   104  	framework.ExpectNoError(err)
   105  
   106  	ginkgo.By("Waiting for kube-proxy static pods running and ready")
   107  	err = waitForKubeProxyStaticPodsRunning(ctx, c)
   108  	framework.ExpectNoError(err)
   109  }
   110  
   111  // Teardown does nothing.
   112  func (t *KubeProxyDowngradeTest) Teardown(ctx context.Context, f *framework.Framework) {
   113  }
   114  
   115  func waitForKubeProxyStaticPodsRunning(ctx context.Context, c clientset.Interface) error {
   116  	framework.Logf("Waiting up to %v for kube-proxy static pods running", defaultTestTimeout)
   117  
   118  	condition := func() (bool, error) {
   119  		pods, err := getKubeProxyStaticPods(ctx, c)
   120  		if err != nil {
   121  			framework.Logf("Failed to get kube-proxy static pods: %v", err)
   122  			return false, nil
   123  		}
   124  
   125  		nodes, err := e2enode.GetReadySchedulableNodes(ctx, c)
   126  		if err != nil {
   127  			framework.Logf("Failed to get nodes: %v", err)
   128  			return false, nil
   129  		}
   130  
   131  		numberSchedulableNodes := len(nodes.Items)
   132  		numberkubeProxyPods := 0
   133  		for _, pod := range pods.Items {
   134  			if pod.Status.Phase == v1.PodRunning {
   135  				numberkubeProxyPods = numberkubeProxyPods + 1
   136  			}
   137  		}
   138  		if numberkubeProxyPods != numberSchedulableNodes {
   139  			framework.Logf("Expect %v kube-proxy static pods running, got %v running, %v in total", numberSchedulableNodes, numberkubeProxyPods, len(pods.Items))
   140  			return false, nil
   141  		}
   142  		return true, nil
   143  	}
   144  
   145  	if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
   146  		return fmt.Errorf("error waiting for kube-proxy static pods running: %w", err)
   147  	}
   148  	return nil
   149  }
   150  
   151  func waitForKubeProxyStaticPodsDisappear(ctx context.Context, c clientset.Interface) error {
   152  	framework.Logf("Waiting up to %v for kube-proxy static pods disappear", defaultTestTimeout)
   153  
   154  	condition := func() (bool, error) {
   155  		pods, err := getKubeProxyStaticPods(ctx, c)
   156  		if err != nil {
   157  			framework.Logf("Failed to get kube-proxy static pods: %v", err)
   158  			return false, nil
   159  		}
   160  
   161  		if len(pods.Items) != 0 {
   162  			framework.Logf("Expect kube-proxy static pods to disappear, got %v pods", len(pods.Items))
   163  			return false, nil
   164  		}
   165  		return true, nil
   166  	}
   167  
   168  	if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
   169  		return fmt.Errorf("error waiting for kube-proxy static pods disappear: %w", err)
   170  	}
   171  	return nil
   172  }
   173  
   174  func waitForKubeProxyDaemonSetRunning(ctx context.Context, f *framework.Framework, c clientset.Interface) error {
   175  	framework.Logf("Waiting up to %v for kube-proxy DaemonSet running", defaultTestTimeout)
   176  
   177  	condition := func() (bool, error) {
   178  		daemonSets, err := getKubeProxyDaemonSet(ctx, c)
   179  		if err != nil {
   180  			framework.Logf("Failed to get kube-proxy DaemonSet: %v", err)
   181  			return false, nil
   182  		}
   183  
   184  		if len(daemonSets.Items) != 1 {
   185  			framework.Logf("Expect only one kube-proxy DaemonSet, got %v", len(daemonSets.Items))
   186  			return false, nil
   187  		}
   188  
   189  		return e2edaemonset.CheckRunningOnAllNodes(ctx, f, &daemonSets.Items[0])
   190  	}
   191  
   192  	if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
   193  		return fmt.Errorf("error waiting for kube-proxy DaemonSet running: %w", err)
   194  	}
   195  	return nil
   196  }
   197  
   198  func waitForKubeProxyDaemonSetDisappear(ctx context.Context, c clientset.Interface) error {
   199  	framework.Logf("Waiting up to %v for kube-proxy DaemonSet disappear", defaultTestTimeout)
   200  
   201  	condition := func() (bool, error) {
   202  		daemonSets, err := getKubeProxyDaemonSet(ctx, c)
   203  		if err != nil {
   204  			framework.Logf("Failed to get kube-proxy DaemonSet: %v", err)
   205  			return false, nil
   206  		}
   207  
   208  		if len(daemonSets.Items) != 0 {
   209  			framework.Logf("Expect kube-proxy DaemonSet to disappear, got %v DaemonSet", len(daemonSets.Items))
   210  			return false, nil
   211  		}
   212  		return true, nil
   213  	}
   214  
   215  	if err := wait.PollImmediate(5*time.Second, defaultTestTimeout, condition); err != nil {
   216  		return fmt.Errorf("error waiting for kube-proxy DaemonSet disappear: %w", err)
   217  	}
   218  	return nil
   219  }
   220  
   221  func getKubeProxyStaticPods(ctx context.Context, c clientset.Interface) (*v1.PodList, error) {
   222  	label := labels.SelectorFromSet(labels.Set(map[string]string{clusterComponentKey: kubeProxyLabelName}))
   223  	listOpts := metav1.ListOptions{LabelSelector: label.String()}
   224  	return c.CoreV1().Pods(metav1.NamespaceSystem).List(ctx, listOpts)
   225  }
   226  
   227  func getKubeProxyDaemonSet(ctx context.Context, c clientset.Interface) (*appsv1.DaemonSetList, error) {
   228  	label := labels.SelectorFromSet(labels.Set(map[string]string{clusterAddonLabelKey: kubeProxyLabelName}))
   229  	listOpts := metav1.ListOptions{LabelSelector: label.String()}
   230  	return c.AppsV1().DaemonSets(metav1.NamespaceSystem).List(ctx, listOpts)
   231  }
   232  

View as plain text