1
16
17
22
23 package main
24
25 import (
26 "context"
27 "flag"
28 "fmt"
29 "os"
30 "path/filepath"
31 "time"
32
33 v1 "k8s.io/api/core/v1"
34 apierrors "k8s.io/apimachinery/pkg/api/errors"
35 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
36 "k8s.io/apimachinery/pkg/runtime"
37 "k8s.io/apimachinery/pkg/util/intstr"
38 clientset "k8s.io/client-go/kubernetes"
39 restclient "k8s.io/client-go/rest"
40 "k8s.io/client-go/tools/clientcmd"
41 "k8s.io/kubernetes/pkg/api/legacyscheme"
42 e2e "k8s.io/kubernetes/test/e2e/framework"
43 "k8s.io/kubernetes/test/e2e/framework/service"
44
45 "k8s.io/klog/v2"
46 )
47
// Command-line flags that size and bound the soak test.
var (
	// queriesAverage: hostname queries per iteration, expressed per pod on average.
	queriesAverage = flag.Int("queries", 100, "Number of hostname queries to make in each iteration per pod on average")

	// podsPerNode: number of serve-hostname pods pinned to each node.
	podsPerNode = flag.Int("pods_per_node", 1, "Number of serve_hostname pods per node")

	// upTo: iteration count; -1 means keep iterating without limit.
	upTo = flag.Int("up_to", 1, "Number of iterations or -1 for no limit")

	// maxPar: cap on concurrently outstanding queries.
	maxPar = flag.Int("max_par", 500, "Maximum number of queries in flight")

	// gke: when non-empty, the kubeconfig context (gke_{project}_{zone}_{cluster-name}) to select.
	gke = flag.String("gke_context", "", "Target GKE cluster with context gke_{project}_{zone}_{cluster-name}")
)
55
// Upper bounds for the retry/poll loops in main. Each bounds how long one
// step keeps retrying before the step is abandoned.
const (
	// Setup phase.
	nodeListTimeout      = 2 * time.Minute
	serviceCreateTimeout = 2 * time.Minute
	podCreateTimeout     = 2 * time.Minute
	podStartTimeout      = 30 * time.Minute
	endpointTimeout      = 5 * time.Minute

	// Teardown phase.
	deleteTimeout          = 2 * time.Minute
	namespaceDeleteTimeout = 5 * time.Minute
)
65
66 func main() {
67 flag.Parse()
68
69 klog.Infof("Starting serve_hostnames soak test with queries=%d and podsPerNode=%d upTo=%d",
70 *queriesAverage, *podsPerNode, *upTo)
71
72 var spec string
73 if *gke != "" {
74 spec = filepath.Join(os.Getenv("HOME"), ".config", "gcloud", "kubernetes", "kubeconfig")
75 } else {
76 spec = filepath.Join(os.Getenv("HOME"), ".kube", "config")
77 }
78 settings, err := clientcmd.LoadFromFile(spec)
79 if err != nil {
80 klog.Fatalf("Error loading configuration: %v", err.Error())
81 }
82 if *gke != "" {
83 settings.CurrentContext = *gke
84 }
85 config, err := clientcmd.NewDefaultClientConfig(*settings, &clientcmd.ConfigOverrides{}).ClientConfig()
86 if err != nil {
87 klog.Fatalf("Failed to construct config: %v", err)
88 }
89
90 client, err := clientset.NewForConfig(config)
91 if err != nil {
92 klog.Fatalf("Failed to make client: %v", err)
93 }
94
95 var nodes *v1.NodeList
96 for start := time.Now(); time.Since(start) < nodeListTimeout; time.Sleep(2 * time.Second) {
97 nodes, err = client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
98 if err == nil {
99 break
100 }
101 klog.Warningf("Failed to list nodes: %v", err)
102 }
103 if err != nil {
104 klog.Fatalf("Giving up trying to list nodes: %v", err)
105 }
106
107 if len(nodes.Items) == 0 {
108 klog.Fatalf("Failed to find any nodes.")
109 }
110
111 klog.Infof("Found %d nodes on this cluster:", len(nodes.Items))
112 for i, node := range nodes.Items {
113 klog.Infof("%d: %s", i, node.Name)
114 }
115
116 queries := *queriesAverage * len(nodes.Items) * *podsPerNode
117
118
119 got, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{GenerateName: "serve-hostnames-"}}, metav1.CreateOptions{})
120 if err != nil {
121 klog.Fatalf("Failed to create namespace: %v", err)
122 }
123 ns := got.Name
124 defer func(ns string) {
125 if err := client.CoreV1().Namespaces().Delete(context.TODO(), ns, metav1.DeleteOptions{}); err != nil {
126 klog.Warningf("Failed to delete namespace %s: %v", ns, err)
127 } else {
128
129 for i := 0; i < int(namespaceDeleteTimeout/time.Second); i++ {
130 if _, err := client.CoreV1().Namespaces().Get(context.TODO(), ns, metav1.GetOptions{}); err != nil {
131 if apierrors.IsNotFound(err) {
132 return
133 }
134 }
135 time.Sleep(time.Second)
136 }
137 }
138 }(ns)
139 klog.Infof("Created namespace %s", ns)
140
141
142 klog.Infof("Creating service %s/serve-hostnames", ns)
143
144 var svc *v1.Service
145 for start := time.Now(); time.Since(start) < serviceCreateTimeout; time.Sleep(2 * time.Second) {
146 t := time.Now()
147 svc, err = client.CoreV1().Services(ns).Create(context.TODO(), &v1.Service{
148 ObjectMeta: metav1.ObjectMeta{
149 Name: "serve-hostnames",
150 Labels: map[string]string{
151 "name": "serve-hostname",
152 },
153 },
154 Spec: v1.ServiceSpec{
155 Ports: []v1.ServicePort{{
156 Protocol: "TCP",
157 Port: 9376,
158 TargetPort: intstr.FromInt32(9376),
159 }},
160 Selector: map[string]string{
161 "name": "serve-hostname",
162 },
163 },
164 }, metav1.CreateOptions{})
165 klog.V(4).Infof("Service create %s/server-hostnames took %v", ns, time.Since(t))
166 if err == nil {
167 break
168 }
169 klog.Warningf("After %v failed to create service %s/serve-hostnames: %v", time.Since(start), ns, err)
170 }
171 if err != nil {
172 klog.Warningf("Unable to create service %s/%s: %v", ns, svc.Name, err)
173 return
174 }
175
176 defer func() {
177 klog.Infof("Cleaning up service %s/serve-hostnames", ns)
178
179 for start := time.Now(); time.Since(start) < deleteTimeout; time.Sleep(1 * time.Second) {
180 if err := client.CoreV1().Services(ns).Delete(context.TODO(), svc.Name, metav1.DeleteOptions{}); err == nil {
181 return
182 }
183 klog.Warningf("After %v unable to delete service %s/%s: %v", time.Since(start), ns, svc.Name, err)
184 }
185 }()
186
187
188 podNames := []string{}
189 for i, node := range nodes.Items {
190 for j := 0; j < *podsPerNode; j++ {
191 podName := fmt.Sprintf("serve-hostname-%d-%d", i, j)
192 podNames = append(podNames, podName)
193
194 for start := time.Now(); time.Since(start) < podCreateTimeout; time.Sleep(2 * time.Second) {
195 klog.Infof("Creating pod %s/%s on node %s", ns, podName, node.Name)
196 t := time.Now()
197 _, err = client.CoreV1().Pods(ns).Create(context.TODO(), &v1.Pod{
198 ObjectMeta: metav1.ObjectMeta{
199 Name: podName,
200 Labels: map[string]string{
201 "name": "serve-hostname",
202 },
203 },
204 Spec: v1.PodSpec{
205 Containers: []v1.Container{
206 {
207 Name: "serve-hostname",
208 Image: e2e.ServeHostnameImage,
209 Ports: []v1.ContainerPort{{ContainerPort: 9376}},
210 },
211 },
212 NodeName: node.Name,
213 },
214 }, metav1.CreateOptions{})
215 klog.V(4).Infof("Pod create %s/%s request took %v", ns, podName, time.Since(t))
216 if err == nil {
217 break
218 }
219 klog.Warningf("After %s failed to create pod %s/%s: %v", time.Since(start), ns, podName, err)
220 }
221 if err != nil {
222 klog.Warningf("Failed to create pod %s/%s: %v", ns, podName, err)
223 return
224 }
225 }
226 }
227
228 defer func() {
229 klog.Info("Cleaning up pods")
230
231 for _, podName := range podNames {
232 for start := time.Now(); time.Since(start) < deleteTimeout; time.Sleep(1 * time.Second) {
233 if err = client.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{}); err == nil {
234 break
235 }
236 klog.Warningf("After %v failed to delete pod %s/%s: %v", time.Since(start), ns, podName, err)
237 }
238 }
239 }()
240
241 klog.Info("Waiting for the serve-hostname pods to be ready")
242 for _, podName := range podNames {
243 var pod *v1.Pod
244 for start := time.Now(); time.Since(start) < podStartTimeout; time.Sleep(5 * time.Second) {
245 pod, err = client.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
246 if err != nil {
247 klog.Warningf("Get pod %s/%s failed, ignoring for %v: %v", ns, podName, err, podStartTimeout)
248 continue
249 }
250 if pod.Status.Phase == v1.PodRunning {
251 break
252 }
253 }
254 if pod.Status.Phase != v1.PodRunning {
255 klog.Warningf("Gave up waiting on pod %s/%s to be running (saw %v)", ns, podName, pod.Status.Phase)
256 } else {
257 klog.Infof("%s/%s is running", ns, podName)
258 }
259 }
260
261 rclient, err := restclient.RESTClientFor(config)
262 if err != nil {
263 klog.Warningf("Failed to build restclient: %v", err)
264 return
265 }
266 proxyRequest, errProxy := service.GetServicesProxyRequest(client, rclient.Get())
267 if errProxy != nil {
268 klog.Warningf("Get services proxy request failed: %v", errProxy)
269 return
270 }
271
272
273 for start := time.Now(); time.Since(start) < endpointTimeout; time.Sleep(10 * time.Second) {
274 hostname, err := proxyRequest.
275 Namespace(ns).
276 Name("serve-hostnames").
277 DoRaw(context.TODO())
278 if err != nil {
279 klog.Infof("After %v while making a proxy call got error %v", time.Since(start), err)
280 continue
281 }
282 var r metav1.Status
283 if err := runtime.DecodeInto(legacyscheme.Codecs.UniversalDecoder(), hostname, &r); err != nil {
284 break
285 }
286 if r.Status == metav1.StatusFailure {
287 klog.Infof("After %v got status %v", time.Since(start), string(hostname))
288 continue
289 }
290 break
291 }
292
293
294 for iteration := 0; iteration != *upTo; iteration++ {
295 responseChan := make(chan string, queries)
296
297
298 inFlight := make(chan struct{}, *maxPar)
299 start := time.Now()
300 for q := 0; q < queries; q++ {
301 go func(i int, query int) {
302 inFlight <- struct{}{}
303 t := time.Now()
304 hostname, err := proxyRequest.
305 Namespace(ns).
306 Name("serve-hostnames").
307 DoRaw(context.TODO())
308 klog.V(4).Infof("Proxy call in namespace %s took %v", ns, time.Since(t))
309 if err != nil {
310 klog.Warningf("Call failed during iteration %d query %d : %v", i, query, err)
311
312
313 responseChan <- fmt.Sprintf("!failed in iteration %d to issue query %d: %v", i, query, err)
314 } else {
315 responseChan <- string(hostname)
316 }
317 <-inFlight
318 }(iteration, q)
319 }
320 responses := make(map[string]int, *podsPerNode*len(nodes.Items))
321 missing := 0
322 for q := 0; q < queries; q++ {
323 r := <-responseChan
324 klog.V(4).Infof("Got response from %s", r)
325 responses[r]++
326
327
328 if len(r) > 0 && r[0] == '!' {
329 klog.V(3).Infof("Got response %s", r)
330 missing++
331 }
332 }
333 if missing > 0 {
334 klog.Warningf("Missing %d responses out of %d", missing, queries)
335 }
336
337 for n, node := range nodes.Items {
338 for i := 0; i < *podsPerNode; i++ {
339 name := fmt.Sprintf("serve-hostname-%d-%d", n, i)
340 if _, ok := responses[name]; !ok {
341 klog.Warningf("No response from pod %s on node %s at iteration %d", name, node.Name, iteration)
342 }
343 }
344 }
345 klog.Infof("Iteration %d took %v for %d queries (%.2f QPS) with %d missing",
346 iteration, time.Since(start), queries-missing, float64(queries-missing)/time.Since(start).Seconds(), missing)
347 }
348 }
349
View as plain text