1
16
17 package apps
18
19 import (
20 "context"
21 "encoding/json"
22 "fmt"
23 "io"
24 "net"
25 "net/http"
26 "path/filepath"
27 "strconv"
28 "time"
29
30 "github.com/onsi/ginkgo/v2"
31 "github.com/onsi/gomega"
32
33 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
34 "k8s.io/apimachinery/pkg/util/version"
35 "k8s.io/apimachinery/pkg/util/wait"
36
37 "k8s.io/kubernetes/test/e2e/framework"
38 e2ekubectl "k8s.io/kubernetes/test/e2e/framework/kubectl"
39 e2estatefulset "k8s.io/kubernetes/test/e2e/framework/statefulset"
40 e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles"
41 "k8s.io/kubernetes/test/e2e/upgrades"
42 )
43
44 const mysqlManifestPath = "test/e2e/testing-manifests/statefulset/mysql-upgrade"
45
46
47 type MySQLUpgradeTest struct {
48 ip string
49 successfulWrites int
50 nextWrite int
51 }
52
53
54 func (MySQLUpgradeTest) Name() string { return "mysql-upgrade" }
55
56
57 func (MySQLUpgradeTest) Skip(upgCtx upgrades.UpgradeContext) bool {
58 minVersion := version.MustParseSemantic("1.5.0")
59
60 for _, vCtx := range upgCtx.Versions {
61 if vCtx.Version.LessThan(minVersion) {
62 return true
63 }
64 }
65 return false
66 }
67
68 func mysqlKubectlCreate(ns, file string) {
69 data, err := e2etestfiles.Read(filepath.Join(mysqlManifestPath, file))
70 if err != nil {
71 framework.Fail(err.Error())
72 }
73 input := string(data)
74 e2ekubectl.RunKubectlOrDieInput(ns, input, "create", "-f", "-")
75 }
76
77 func (t *MySQLUpgradeTest) getServiceIP(ctx context.Context, f *framework.Framework, ns, svcName string) string {
78 svc, err := f.ClientSet.CoreV1().Services(ns).Get(ctx, svcName, metav1.GetOptions{})
79 framework.ExpectNoError(err)
80 ingress := svc.Status.LoadBalancer.Ingress
81 if len(ingress) == 0 {
82 return ""
83 }
84 return ingress[0].IP
85 }
86
87
88
89
90
91 func (t *MySQLUpgradeTest) Setup(ctx context.Context, f *framework.Framework) {
92 ns := f.Namespace.Name
93 statefulsetPoll := 30 * time.Second
94 statefulsetTimeout := 10 * time.Minute
95
96 ginkgo.By("Creating a configmap")
97 mysqlKubectlCreate(ns, "configmap.yaml")
98
99 ginkgo.By("Creating a mysql StatefulSet")
100 e2estatefulset.CreateStatefulSet(ctx, f.ClientSet, mysqlManifestPath, ns)
101
102 ginkgo.By("Creating a mysql-test-server deployment")
103 mysqlKubectlCreate(ns, "tester.yaml")
104
105 ginkgo.By("Getting the ingress IPs from the test-service")
106 err := wait.PollUntilContextTimeout(ctx, statefulsetPoll, statefulsetTimeout, true, func(ctx context.Context) (bool, error) {
107 if t.ip = t.getServiceIP(ctx, f, ns, "test-server"); t.ip == "" {
108 return false, nil
109 }
110 if _, err := t.countNames(); err != nil {
111 framework.Logf("Service endpoint is up but isn't responding")
112 return false, nil
113 }
114 return true, nil
115 })
116 framework.ExpectNoError(err)
117 framework.Logf("Service endpoint is up")
118
119 ginkgo.By("Adding 2 names to the database")
120 err = t.addName(strconv.Itoa(t.nextWrite))
121 framework.ExpectNoError(err)
122 err = t.addName(strconv.Itoa(t.nextWrite))
123 framework.ExpectNoError(err)
124
125 ginkgo.By("Verifying that the 2 names have been inserted")
126 count, err := t.countNames()
127 framework.ExpectNoError(err)
128 gomega.Expect(count).To(gomega.Equal(2))
129 }
130
131
132
133 func (t *MySQLUpgradeTest) Test(ctx context.Context, f *framework.Framework, done <-chan struct{}, upgrade upgrades.UpgradeType) {
134 var writeSuccess, readSuccess, writeFailure, readFailure int
135 ginkgo.By("Continuously polling the database during upgrade.")
136 go wait.Until(func() {
137 _, err := t.countNames()
138 if err != nil {
139 framework.Logf("Error while trying to read data: %v", err)
140 readFailure++
141 } else {
142 readSuccess++
143 }
144 }, framework.Poll, done)
145
146 wait.Until(func() {
147 err := t.addName(strconv.Itoa(t.nextWrite))
148 if err != nil {
149 framework.Logf("Error while trying to write data: %v", err)
150 writeFailure++
151 } else {
152 writeSuccess++
153 }
154 }, framework.Poll, done)
155
156 t.successfulWrites = writeSuccess
157 framework.Logf("Successful reads: %d", readSuccess)
158 framework.Logf("Successful writes: %d", writeSuccess)
159 framework.Logf("Failed reads: %d", readFailure)
160 framework.Logf("Failed writes: %d", writeFailure)
161
162
163
164
165
166 readRatio := float64(readSuccess) / float64(readSuccess+readFailure)
167 writeRatio := float64(writeSuccess) / float64(writeSuccess+writeFailure)
168 if readRatio < 0.75 {
169 framework.Failf("Too many failures reading data. Success ratio: %f", readRatio)
170 }
171 if writeRatio < 0.75 {
172 framework.Failf("Too many failures writing data. Success ratio: %f", writeRatio)
173 }
174 }
175
176
177 func (t *MySQLUpgradeTest) Teardown(ctx context.Context, f *framework.Framework) {
178 count, err := t.countNames()
179 framework.ExpectNoError(err)
180 gomega.Expect(count).To(gomega.BeNumerically(">=", t.successfulWrites), "count is too small")
181 }
182
183
184 func (t *MySQLUpgradeTest) addName(name string) error {
185 val := map[string][]string{"name": {name}}
186 t.nextWrite++
187 r, err := http.PostForm(fmt.Sprintf("http://%s/addName", net.JoinHostPort(t.ip, "8080")), val)
188 if err != nil {
189 return err
190 }
191 defer r.Body.Close()
192 if r.StatusCode != http.StatusOK {
193 b, err := io.ReadAll(r.Body)
194 if err != nil {
195 return err
196 }
197 return fmt.Errorf(string(b))
198 }
199 return nil
200 }
201
202
203
204 func (t *MySQLUpgradeTest) countNames() (int, error) {
205 r, err := http.Get(fmt.Sprintf("http://%s/countNames", net.JoinHostPort(t.ip, "8080")))
206 if err != nil {
207 return 0, err
208 }
209 defer r.Body.Close()
210 if r.StatusCode != http.StatusOK {
211 b, err := io.ReadAll(r.Body)
212 if err != nil {
213 return 0, err
214 }
215 return 0, fmt.Errorf(string(b))
216 }
217 var count int
218 if err := json.NewDecoder(r.Body).Decode(&count); err != nil {
219 return 0, err
220 }
221 return count, nil
222 }
223
View as plain text