1
16
17 package network
18
19 import (
20 "context"
21 "fmt"
22 "sort"
23 "strings"
24 "time"
25
26 v1 "k8s.io/api/core/v1"
27 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28 "k8s.io/apimachinery/pkg/runtime"
29 "k8s.io/apimachinery/pkg/util/sets"
30 "k8s.io/apimachinery/pkg/watch"
31 "k8s.io/client-go/kubernetes"
32 "k8s.io/client-go/tools/cache"
33 "k8s.io/client-go/util/flowcontrol"
34 "k8s.io/kubernetes/test/e2e/framework"
35 e2erc "k8s.io/kubernetes/test/e2e/framework/rc"
36 "k8s.io/kubernetes/test/e2e/network/common"
37 testutils "k8s.io/kubernetes/test/utils"
38 imageutils "k8s.io/kubernetes/test/utils/image"
39 admissionapi "k8s.io/pod-security-admission/api"
40
41 "github.com/onsi/ginkgo/v2"
42 )
43
// durations is a slice of measured latencies. It provides Len, Less and Swap
// so the samples can be put in ascending order with sort.Sort before
// percentiles are read from it.
type durations []time.Duration

// Len reports how many samples are held.
func (ds durations) Len() int {
	return len(ds)
}

// Less orders samples from shortest to longest.
func (ds durations) Less(a, b int) bool {
	return ds[a] < ds[b]
}

// Swap exchanges the samples at positions a and b in place.
func (ds durations) Swap(a, b int) {
	ds[b], ds[a] = ds[a], ds[b]
}
49
50 var _ = common.SIGDescribe("Service endpoints latency", func() {
51 f := framework.NewDefaultFramework("svc-latency")
52 f.NamespacePodSecurityLevel = admissionapi.LevelBaseline
53
54
59 framework.ConformanceIt("should not be very high", func(ctx context.Context) {
60 const (
61
62
63
64 limitMedian = time.Second * 20
65 limitTail = time.Second * 50
66
67
68
69
70
71
72
73
74
75
76 totalTrials = 200
77 parallelTrials = 15
78 minSampleSize = 100
79
80
81 acceptableFailureRatio = .05
82 )
83
84
85 cfg, err := framework.LoadConfig()
86 if err != nil {
87 framework.Failf("Unable to load config: %v", err)
88 }
89 cfg.RateLimiter = flowcontrol.NewFakeAlwaysRateLimiter()
90 f.ClientSet = kubernetes.NewForConfigOrDie(cfg)
91
92 failing := sets.NewString()
93 d, err := runServiceLatencies(ctx, f, parallelTrials, totalTrials, acceptableFailureRatio)
94 if err != nil {
95 failing.Insert(fmt.Sprintf("Not all RC/pod/service trials succeeded: %v", err))
96 }
97 dSorted := durations(d)
98 sort.Sort(dSorted)
99 n := len(dSorted)
100 if n < minSampleSize {
101 failing.Insert(fmt.Sprintf("Did not get a good sample size: %v", dSorted))
102 }
103 if n < 2 {
104 failing.Insert("Less than two runs succeeded; aborting.")
105 framework.Failf(strings.Join(failing.List(), "\n"))
106 }
107 percentile := func(p int) time.Duration {
108 est := n * p / 100
109 if est >= n {
110 return dSorted[n-1]
111 }
112 return dSorted[est]
113 }
114 framework.Logf("Latencies: %v", dSorted)
115 p50 := percentile(50)
116 p90 := percentile(90)
117 p99 := percentile(99)
118 framework.Logf("50 %%ile: %v", p50)
119 framework.Logf("90 %%ile: %v", p90)
120 framework.Logf("99 %%ile: %v", p99)
121 framework.Logf("Total sample count: %v", len(dSorted))
122
123 if p50 > limitMedian {
124 failing.Insert("Median latency should be less than " + limitMedian.String())
125 }
126 if p99 > limitTail {
127 failing.Insert("Tail (99 percentile) latency should be less than " + limitTail.String())
128 }
129 if failing.Len() > 0 {
130 errList := strings.Join(failing.List(), "\n")
131 helpfulInfo := fmt.Sprintf("\n50, 90, 99 percentiles: %v %v %v", p50, p90, p99)
132 framework.Failf(errList + helpfulInfo)
133 }
134 })
135 })
136
137 func runServiceLatencies(ctx context.Context, f *framework.Framework, inParallel, total int, acceptableFailureRatio float32) (output []time.Duration, err error) {
138 cfg := testutils.RCConfig{
139 Client: f.ClientSet,
140 Image: imageutils.GetPauseImageName(),
141 Name: "svc-latency-rc",
142 Namespace: f.Namespace.Name,
143 Replicas: 1,
144 PollInterval: time.Second,
145 }
146 if err := e2erc.RunRC(ctx, cfg); err != nil {
147 return nil, err
148 }
149
150
151
152
153
154 endpointQueries := newQuerier()
155 startEndpointWatcher(ctx, f, endpointQueries)
156 defer close(endpointQueries.stop)
157
158
159
160 _, err = singleServiceLatency(ctx, f, cfg.Name, endpointQueries)
161 framework.ExpectNoError(err)
162
163
164
165
166 errs := make(chan error, total)
167 durations := make(chan time.Duration, total)
168
169 blocker := make(chan struct{}, inParallel)
170 for i := 0; i < total; i++ {
171 go func() {
172 defer ginkgo.GinkgoRecover()
173 blocker <- struct{}{}
174 defer func() { <-blocker }()
175 if d, err := singleServiceLatency(ctx, f, cfg.Name, endpointQueries); err != nil {
176 errs <- err
177 } else {
178 durations <- d
179 }
180 }()
181 }
182
183 errCount := 0
184 for i := 0; i < total; i++ {
185 select {
186 case e := <-errs:
187 framework.Logf("Got error: %v", e)
188 errCount++
189 case d := <-durations:
190 output = append(output, d)
191 }
192 }
193 if errCount != 0 {
194 framework.Logf("Got %d errors out of %d tries", errCount, total)
195 errRatio := float32(errCount) / float32(total)
196 if errRatio > acceptableFailureRatio {
197 return output, fmt.Errorf("error ratio %g is higher than the acceptable ratio %g", errRatio, acceptableFailureRatio)
198 }
199 }
200 return output, nil
201 }
202
203 type endpointQuery struct {
204 endpointsName string
205 endpoints *v1.Endpoints
206 result chan<- struct{}
207 }
208
209 type endpointQueries struct {
210 requests map[string]*endpointQuery
211
212 stop chan struct{}
213 requestChan chan *endpointQuery
214 seenChan chan *v1.Endpoints
215 }
216
217 func newQuerier() *endpointQueries {
218 eq := &endpointQueries{
219 requests: map[string]*endpointQuery{},
220
221 stop: make(chan struct{}, 100),
222 requestChan: make(chan *endpointQuery),
223 seenChan: make(chan *v1.Endpoints, 100),
224 }
225 go eq.join()
226 return eq
227 }
228
229
230
231
232
// join is the single goroutine that owns eq.requests. It pairs queries
// arriving on requestChan with Endpoints arriving on seenChan, in whichever
// order they show up, and closes a query's result channel once the query is
// answered. It returns when eq.stop is closed.
//
// Each map entry is either a pending query (result != nil) or an Endpoints
// object that was seen before anyone asked for it (endpoints != nil,
// result == nil).
func (eq *endpointQueries) join() {
	defer func() {
		// On shutdown, release every caller still blocked in request().
		// Their req.endpoints is still nil, which tells them there is no
		// result.
		for _, req := range eq.requests {
			if req.result != nil {
				close(req.result)
			}
		}
	}()

	for {
		select {
		case <-eq.stop:
			return
		case req := <-eq.requestChan:
			if cur, ok := eq.requests[req.endpointsName]; ok && cur.endpoints != nil {
				// The Endpoints arrived before the query: answer at once
				// and drop the cached entry.
				delete(eq.requests, req.endpointsName)
				req.endpoints = cur.endpoints
				close(req.result)
			} else {
				// Not seen yet: park the query until the watcher reports it.
				// NOTE(review): a second query for the same name would
				// overwrite a pending one, whose result would then never be
				// closed. Callers query server-generated service names
				// (GenerateName), which are presumably unique.
				eq.requests[req.endpointsName] = req
			}
		case got := <-eq.seenChan:
			if req, ok := eq.requests[got.Name]; ok {
				if req.result != nil {
					// A query is waiting for this name: answer it.
					delete(eq.requests, got.Name)
					req.endpoints = got
					close(req.result)
				}
				// Otherwise the entry is an already-cached Endpoints with no
				// waiter. The first cached object is kept and this one is
				// ignored.
			} else {
				// Nobody has asked yet: remember it for a later query.
				eq.requests[got.Name] = &endpointQuery{
					endpoints: got,
				}
			}
		}
	}
}
280
281
282 func (eq *endpointQueries) request(endpointsName string) *v1.Endpoints {
283 result := make(chan struct{})
284 req := &endpointQuery{
285 endpointsName: endpointsName,
286 result: result,
287 }
288 eq.requestChan <- req
289 <-result
290 return req.endpoints
291 }
292
293
294 func (eq *endpointQueries) added(e *v1.Endpoints) {
295 eq.seenChan <- e
296 }
297
298
299 func startEndpointWatcher(ctx context.Context, f *framework.Framework, q *endpointQueries) {
300 _, controller := cache.NewInformer(
301 &cache.ListWatch{
302 ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
303 obj, err := f.ClientSet.CoreV1().Endpoints(f.Namespace.Name).List(ctx, options)
304 return runtime.Object(obj), err
305 },
306 WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
307 return f.ClientSet.CoreV1().Endpoints(f.Namespace.Name).Watch(ctx, options)
308 },
309 },
310 &v1.Endpoints{},
311 0,
312 cache.ResourceEventHandlerFuncs{
313 AddFunc: func(obj interface{}) {
314 if e, ok := obj.(*v1.Endpoints); ok {
315 if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
316 q.added(e)
317 }
318 }
319 },
320 UpdateFunc: func(old, cur interface{}) {
321 if e, ok := cur.(*v1.Endpoints); ok {
322 if len(e.Subsets) > 0 && len(e.Subsets[0].Addresses) > 0 {
323 q.added(e)
324 }
325 }
326 },
327 },
328 )
329
330 go controller.Run(q.stop)
331
332
333 for !controller.HasSynced() {
334 time.Sleep(100 * time.Millisecond)
335 }
336 }
337
338 func singleServiceLatency(ctx context.Context, f *framework.Framework, name string, q *endpointQueries) (time.Duration, error) {
339
340 svc := &v1.Service{
341 ObjectMeta: metav1.ObjectMeta{
342 GenerateName: "latency-svc-",
343 },
344 Spec: v1.ServiceSpec{
345 Ports: []v1.ServicePort{{Protocol: v1.ProtocolTCP, Port: 80}},
346 Selector: map[string]string{"name": name},
347 Type: v1.ServiceTypeClusterIP,
348 SessionAffinity: v1.ServiceAffinityNone,
349 },
350 }
351 startTime := time.Now()
352 gotSvc, err := f.ClientSet.CoreV1().Services(f.Namespace.Name).Create(ctx, svc, metav1.CreateOptions{})
353 if err != nil {
354 return 0, err
355 }
356 framework.Logf("Created: %v", gotSvc.Name)
357
358 if e := q.request(gotSvc.Name); e == nil {
359 return 0, fmt.Errorf("never got a result for endpoint %v", gotSvc.Name)
360 }
361 stopTime := time.Now()
362 d := stopTime.Sub(startTime)
363 framework.Logf("Got endpoints: %v [%v]", gotSvc.Name, d)
364 return d, nil
365 }
366
View as plain text