1
16
17 package apps
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "io"
24 "net"
25 "net/http"
26 "path/filepath"
27 "sync"
28 "time"
29
30 "github.com/onsi/ginkgo/v2"
31 "github.com/onsi/gomega"
32
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/util/version"
35 "k8s.io/apimachinery/pkg/util/wait"
36
37 "k8s.io/kubernetes/test/e2e/framework"
38 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
39 e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
40 e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
41 "k8s.io/kubernetes/test/e2e/upgrades"
42 )
43
// manifestPath is the directory, relative to the test-files root, that holds
// the etcd StatefulSet manifests used by this test (pdb.yaml, tester.yaml and
// the StatefulSet definition itself).
const (
	manifestPath = "test/e2e/testing-manifests/statefulset/etcd"
)
45
46
// EtcdUpgradeTest exercises an etcd StatefulSet through a cluster upgrade by
// writing users to, and listing users from, a test server in front of it.
type EtcdUpgradeTest struct {
	// ip is the load-balancer ingress IP of the "test-server" service,
	// discovered during Setup and used as the host for every HTTP request.
	ip string

	// successfulWrites counts the addUser calls that have succeeded so far.
	// Setup sets it to 2 and Test increments it for every successful write.
	successfulWrites int
}
51
52
53 func (EtcdUpgradeTest) Name() string { return "etcd-upgrade" }
54
55
56 func (EtcdUpgradeTest) Skip(upgCtx upgrades.UpgradeContext) bool {
57 minVersion := version.MustParseSemantic("1.6.0")
58 for _, vCtx := range upgCtx.Versions {
59 if vCtx.Version.LessThan(minVersion) {
60 return true
61 }
62 }
63 return false
64 }
65
66 func kubectlCreate(ns, file string) {
67 data, err := e2etestfiles.Read(filepath.Join(manifestPath, file))
68 if err != nil {
69 framework.Fail(err.Error())
70 }
71 input := string(data)
72 e2ekubectl.RunKubectlOrDieInput(ns, input, "create", "-f", "-")
73 }
74
75
76 func (t *EtcdUpgradeTest) Setup(ctx context.Context, f *framework.Framework) {
77 ns := f.Namespace.Name
78 statefulsetPoll := 30 * time.Second
79 statefulsetTimeout := 10 * time.Minute
80
81 ginkgo.By("Creating a PDB")
82 kubectlCreate(ns, "pdb.yaml")
83
84 ginkgo.By("Creating an etcd StatefulSet")
85 e2estatefulset.CreateStatefulSet(ctx, f.ClientSet, manifestPath, ns)
86
87 ginkgo.By("Creating an etcd--test-server deployment")
88 kubectlCreate(ns, "tester.yaml")
89
90 ginkgo.By("Getting the ingress IPs from the services")
91 err := wait.PollUntilContextTimeout(ctx, statefulsetPoll, statefulsetTimeout, true, func(ctx context.Context) (bool, error) {
92 if t.ip = t.getServiceIP(ctx, f, ns, "test-server"); t.ip == "" {
93 return false, nil
94 }
95 if _, err := t.listUsers(); err != nil {
96 framework.Logf("Service endpoint is up but isn't responding")
97 return false, nil
98 }
99 return true, nil
100 })
101 framework.ExpectNoError(err)
102 framework.Logf("Service endpoint is up")
103
104 ginkgo.By("Adding 2 dummy users")
105 err = t.addUser("Alice")
106 framework.ExpectNoError(err)
107 err = t.addUser("Bob")
108 framework.ExpectNoError(err)
109 t.successfulWrites = 2
110
111 ginkgo.By("Verifying that the users exist")
112 users, err := t.listUsers()
113 framework.ExpectNoError(err)
114 gomega.Expect(users).To(gomega.HaveLen(2))
115 }
116
117 func (t *EtcdUpgradeTest) listUsers() ([]string, error) {
118 r, err := http.Get(fmt.Sprintf("http://%s/list", net.JoinHostPort(t.ip, "8080")))
119 if err != nil {
120 return nil, err
121 }
122 defer r.Body.Close()
123 if r.StatusCode != http.StatusOK {
124 b, err := io.ReadAll(r.Body)
125 if err != nil {
126 return nil, err
127 }
128 return nil, fmt.Errorf(string(b))
129 }
130 var names []string
131 if err := json.NewDecoder(r.Body).Decode(&names); err != nil {
132 return nil, err
133 }
134 return names, nil
135 }
136
137 func (t *EtcdUpgradeTest) addUser(name string) error {
138 val := map[string][]string{"name": {name}}
139 r, err := http.PostForm(fmt.Sprintf("http://%s/add", net.JoinHostPort(t.ip, "8080")), val)
140 if err != nil {
141 return err
142 }
143 defer r.Body.Close()
144 if r.StatusCode != http.StatusOK {
145 b, err := io.ReadAll(r.Body)
146 if err != nil {
147 return err
148 }
149 return fmt.Errorf(string(b))
150 }
151 return nil
152 }
153
154 func (t *EtcdUpgradeTest) getServiceIP(ctx context.Context, f *framework.Framework, ns, svcName string) string {
155 svc, err := f.ClientSet.CoreV1().Services(ns).Get(ctx, svcName, metav1.GetOptions{})
156 framework.ExpectNoError(err)
157 ingress := svc.Status.LoadBalancer.Ingress
158 if len(ingress) == 0 {
159 return ""
160 }
161 return ingress[0].IP
162 }
163
164
165 func (t *EtcdUpgradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
166 ginkgo.By("Continuously polling the database during upgrade.")
167 var (
168 success, failures, writeAttempts, lastUserCount int
169 mu sync.Mutex
170 errors = map[string]int{}
171 )
172
173 go wait.Until(func() {
174 writeAttempts++
175 if err := t.addUser(fmt.Sprintf("user-%d", writeAttempts)); err != nil {
176 framework.Logf("Unable to add user: %v", err)
177 mu.Lock()
178 errors[err.Error()]++
179 mu.Unlock()
180 return
181 }
182 t.successfulWrites++
183 }, 10*time.Millisecond, done)
184
185 wait.Until(func() {
186 users, err := t.listUsers()
187 if err != nil {
188 framework.Logf("Could not retrieve users: %v", err)
189 failures++
190 mu.Lock()
191 errors[err.Error()]++
192 mu.Unlock()
193 return
194 }
195 success++
196 lastUserCount = len(users)
197 }, 10*time.Millisecond, done)
198 framework.Logf("got %d users; want >=%d", lastUserCount, t.successfulWrites)
199 gomega.Expect(lastUserCount).To(gomega.BeNumerically(">=", t.successfulWrites), "lastUserCount is too small")
200 ratio := float64(success) / float64(success+failures)
201 framework.Logf("Successful gets %d/%d=%v", success, success+failures, ratio)
202 ratio = float64(t.successfulWrites) / float64(writeAttempts)
203 framework.Logf("Successful writes %d/%d=%v", t.successfulWrites, writeAttempts, ratio)
204 framework.Logf("Errors: %v", errors)
205
206 gomega.Expect(ratio).To(gomega.BeNumerically(">", 0.75), "ratio too small")
207 }
208
209
210 func (t *EtcdUpgradeTest) Teardown(ctx context.Context, f *framework.Framework) {
211 users, err := t.listUsers()
212 framework.ExpectNoError(err)
213 gomega.Expect(len(users)).To(gomega.BeNumerically(">=", t.successfulWrites), "len(users) is too small")
214 }
215
View as plain text