...

Source file src/k8s.io/kubernetes/test/e2e/upgrades/apps/etcd.go

Documentation: k8s.io/kubernetes/test/e2e/upgrades/apps

     1  /*
     2  Copyright 2017 The Kubernetes Authors.
     3  
     4  Licensed under the Apache License, Version 2.0 (the "License");
     5  you may not use this file except in compliance with the License.
     6  You may obtain a copy of the License at
     7  
     8      http://www.apache.org/licenses/LICENSE-2.0
     9  
    10  Unless required by applicable law or agreed to in writing, software
    11  distributed under the License is distributed on an "AS IS" BASIS,
    12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    13  See the License for the specific language governing permissions and
    14  limitations under the License.
    15  */
    16  
    17  package apps
    18  
    19  import (
    20  	"context"
    21  	"encoding/json"
    22  	"fmt"
    23  	"io"
    24  	"net"
    25  	"net/http"
    26  	"path/filepath"
    27  	"sync"
    28  	"time"
    29  
    30  	"github.com/onsi/ginkgo/v2"
    31  	"github.com/onsi/gomega"
    32  
    33  	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    34  	"k8s.io/apimachinery/pkg/util/version"
    35  	"k8s.io/apimachinery/pkg/util/wait"
    36  
    37  	"k8s.io/kubernetes/test/e2e/framework"
    38  	e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
    39  	e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
    40  	e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
    41  	"k8s.io/kubernetes/test/e2e/upgrades"
    42  )
    43  
    44  const manifestPath = "test/e2e/testing-manifests/statefulset/etcd"
    45  
    46  // EtcdUpgradeTest tests that etcd is writable before and after a cluster upgrade.
    47  type EtcdUpgradeTest struct {
    48  	ip               string
    49  	successfulWrites int
    50  }
    51  
    52  // Name returns the tracking name of the test.
    53  func (EtcdUpgradeTest) Name() string { return "etcd-upgrade" }
    54  
    55  // Skip returns true when this test can be skipped.
    56  func (EtcdUpgradeTest) Skip(upgCtx upgrades.UpgradeContext) bool {
    57  	minVersion := version.MustParseSemantic("1.6.0")
    58  	for _, vCtx := range upgCtx.Versions {
    59  		if vCtx.Version.LessThan(minVersion) {
    60  			return true
    61  		}
    62  	}
    63  	return false
    64  }
    65  
    66  func kubectlCreate(ns, file string) {
    67  	data, err := e2etestfiles.Read(filepath.Join(manifestPath, file))
    68  	if err != nil {
    69  		framework.Fail(err.Error())
    70  	}
    71  	input := string(data)
    72  	e2ekubectl.RunKubectlOrDieInput(ns, input, "create", "-f", "-")
    73  }
    74  
    75  // Setup creates etcd statefulset and then verifies that the etcd is writable.
    76  func (t *EtcdUpgradeTest) Setup(ctx context.Context, f *framework.Framework) {
    77  	ns := f.Namespace.Name
    78  	statefulsetPoll := 30 * time.Second
    79  	statefulsetTimeout := 10 * time.Minute
    80  
    81  	ginkgo.By("Creating a PDB")
    82  	kubectlCreate(ns, "pdb.yaml")
    83  
    84  	ginkgo.By("Creating an etcd StatefulSet")
    85  	e2estatefulset.CreateStatefulSet(ctx, f.ClientSet, manifestPath, ns)
    86  
    87  	ginkgo.By("Creating an etcd--test-server deployment")
    88  	kubectlCreate(ns, "tester.yaml")
    89  
    90  	ginkgo.By("Getting the ingress IPs from the services")
    91  	err := wait.PollUntilContextTimeout(ctx, statefulsetPoll, statefulsetTimeout, true, func(ctx context.Context) (bool, error) {
    92  		if t.ip = t.getServiceIP(ctx, f, ns, "test-server"); t.ip == "" {
    93  			return false, nil
    94  		}
    95  		if _, err := t.listUsers(); err != nil {
    96  			framework.Logf("Service endpoint is up but isn't responding")
    97  			return false, nil
    98  		}
    99  		return true, nil
   100  	})
   101  	framework.ExpectNoError(err)
   102  	framework.Logf("Service endpoint is up")
   103  
   104  	ginkgo.By("Adding 2 dummy users")
   105  	err = t.addUser("Alice")
   106  	framework.ExpectNoError(err)
   107  	err = t.addUser("Bob")
   108  	framework.ExpectNoError(err)
   109  	t.successfulWrites = 2
   110  
   111  	ginkgo.By("Verifying that the users exist")
   112  	users, err := t.listUsers()
   113  	framework.ExpectNoError(err)
   114  	gomega.Expect(users).To(gomega.HaveLen(2))
   115  }
   116  
   117  func (t *EtcdUpgradeTest) listUsers() ([]string, error) {
   118  	r, err := http.Get(fmt.Sprintf("http://%s/list", net.JoinHostPort(t.ip, "8080")))
   119  	if err != nil {
   120  		return nil, err
   121  	}
   122  	defer r.Body.Close()
   123  	if r.StatusCode != http.StatusOK {
   124  		b, err := io.ReadAll(r.Body)
   125  		if err != nil {
   126  			return nil, err
   127  		}
   128  		return nil, fmt.Errorf(string(b))
   129  	}
   130  	var names []string
   131  	if err := json.NewDecoder(r.Body).Decode(&names); err != nil {
   132  		return nil, err
   133  	}
   134  	return names, nil
   135  }
   136  
   137  func (t *EtcdUpgradeTest) addUser(name string) error {
   138  	val := map[string][]string{"name": {name}}
   139  	r, err := http.PostForm(fmt.Sprintf("http://%s/add", net.JoinHostPort(t.ip, "8080")), val)
   140  	if err != nil {
   141  		return err
   142  	}
   143  	defer r.Body.Close()
   144  	if r.StatusCode != http.StatusOK {
   145  		b, err := io.ReadAll(r.Body)
   146  		if err != nil {
   147  			return err
   148  		}
   149  		return fmt.Errorf(string(b))
   150  	}
   151  	return nil
   152  }
   153  
   154  func (t *EtcdUpgradeTest) getServiceIP(ctx context.Context, f *framework.Framework, ns, svcName string) string {
   155  	svc, err := f.ClientSet.CoreV1().Services(ns).Get(ctx, svcName, metav1.GetOptions{})
   156  	framework.ExpectNoError(err)
   157  	ingress := svc.Status.LoadBalancer.Ingress
   158  	if len(ingress) == 0 {
   159  		return ""
   160  	}
   161  	return ingress[0].IP
   162  }
   163  
   164  // Test waits for upgrade to complete and verifies if etcd is writable.
   165  func (t *EtcdUpgradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
   166  	ginkgo.By("Continuously polling the database during upgrade.")
   167  	var (
   168  		success, failures, writeAttempts, lastUserCount int
   169  		mu                                              sync.Mutex
   170  		errors                                          = map[string]int{}
   171  	)
   172  	// Write loop.
   173  	go wait.Until(func() {
   174  		writeAttempts++
   175  		if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil {
   176  			framework.Logf("Unable to add user: %v", err)
   177  			mu.Lock()
   178  			errors[err.Error()]++
   179  			mu.Unlock()
   180  			return
   181  		}
   182  		t.successfulWrites++
   183  	}, 10*time.Millisecond, done)
   184  	// Read loop.
   185  	wait.Until(func() {
   186  		users, err := t.listUsers()
   187  		if err != nil {
   188  			framework.Logf("Could not retrieve users: %v", err)
   189  			failures++
   190  			mu.Lock()
   191  			errors[err.Error()]++
   192  			mu.Unlock()
   193  			return
   194  		}
   195  		success++
   196  		lastUserCount = len(users)
   197  	}, 10*time.Millisecond, done)
   198  	framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites)
   199  	gomega.Expect(lastUserCount).To(gomega.BeNumerically(">=", t.successfulWrites), "lastUserCount is too small")
   200  	ratio := float64(success) / float64(success+failures)
   201  	framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio)
   202  	ratio = float64(t.successfulWrites) / float64(writeAttempts)
   203  	framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio)
   204  	framework.Logf("Errors: %v", errors)
   205  	// TODO(maisem): tweak this value once we have a few test runs.
   206  	gomega.Expect(ratio).To(gomega.BeNumerically(">", 0.75), "ratio too small")
   207  }
   208  
   209  // Teardown does one final check of the data's availability.
   210  func (t *EtcdUpgradeTest) Teardown(ctx context.Context, f *framework.Framework) {
   211  	users, err := t.listUsers()
   212  	framework.ExpectNoError(err)
   213  	gomega.Expect(len(users)).To(gomega.BeNumerically(">=", t.successfulWrites), "len(users) is too small")
   214  }
   215  

View as plain text