1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package main
17
18 import (
19 "context"
20 cryptotls "crypto/tls"
21 "flag"
22 "fmt"
23 "io"
24 "log"
25 "net/http"
26 "os"
27 "runtime"
28 "runtime/pprof"
29 "time"
30
31 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
32 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
33 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test"
34 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/resource/v3"
35 testv3 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/v3"
36 )
37
// Command-line configuration of the test driver. Every variable in this block
// is bound to a flag in init and receives its value when main calls
// flag.Parse.
var (
	// Boolean switches.
	debug        bool // -debug: use debug logging
	tls          bool // -tls: enable TLS on all listeners, secrets delivered via SDS
	mux          bool // -mux: enable the muxed linear cache for EDS
	pprofEnabled bool // -pprof: enable use of the pprof profiler

	// TCP ports.
	port         uint // -port: xDS management server
	gatewayPort  uint // -gateway: management HTTP gateway (HTTP to xDS)
	upstreamPort uint // -upstream: upstream HTTP/1.1 server
	basePort     uint // -base: Envoy listener port; callEcho probes basePort+i
	alsPort      uint // -als: control plane access log server

	// Pacing of the update/request sequence.
	delay    time.Duration // -delay: interval between request batch retries
	requests int           // -r: number of requests between snapshot updates
	updates  int           // -u: number of snapshot updates

	// Number of resources of each kind placed in the generated snapshots.
	clusters            int // -clusters
	httpListeners       int // -http: HTTP listeners (and RDS configs)
	scopedHTTPListeners int // -scopedhttp: HTTP listeners (and SRDS configs)
	vhdsHTTPListeners   int // -vhdshttp: VHDS HTTP listeners
	tcpListeners        int // -tcp: TCP pass-through listeners
	runtimes            int // -runtimes: RTDS layers
	extensionNum        int // -extension

	// String-valued settings.
	mode            string // -xds: management protocol to test
	nodeID          string // -nodeID: node ID the snapshots are stored under
	upstreamMessage string // -message: body expected from the upstream HTTP server
)
67
68 func init() {
69 flag.BoolVar(&debug, "debug", false, "Use debug logging")
70
71
72
73
74
75
76
77
78 flag.UintVar(&port, "port", 18000, "xDS management server port")
79
80
81
82 flag.UintVar(&gatewayPort, "gateway", 18001, "Management HTTP gateway (from HTTP to xDS) server port")
83
84
85
86 flag.UintVar(&upstreamPort, "upstream", 18080, "Upstream HTTP/1.1 port")
87
88
89
90 flag.UintVar(&basePort, "base", 9000, "Envoy Proxy listener port")
91
92
93 flag.UintVar(&alsPort, "als", 18090, "Control plane accesslog server port")
94
95
96
97
98
99
100
101 flag.StringVar(&mode, "xds", resource.Ads, "Management protocol to test (ADS, xDS, REST, DELTA, DELTA-ADS)")
102
103
104 flag.StringVar(&nodeID, "nodeID", "test-id", "Node ID")
105
106
107 flag.BoolVar(&tls, "tls", false, "Enable TLS on all listeners and use SDS for secret delivery")
108
109
110 flag.IntVar(&clusters, "clusters", 4, "Number of clusters")
111
112
113
114 flag.IntVar(&runtimes, "runtimes", 1, "Number of RTDS layers")
115
116
117
118
119
120
121
122 flag.StringVar(&upstreamMessage, "message", "Default message", "Upstream HTTP server response message")
123
124
125 flag.DurationVar(&delay, "delay", 500*time.Millisecond, "Interval between request batch retries")
126
127
128
129
130 flag.IntVar(&updates, "u", 3, "Number of snapshot updates")
131
132
133
134 flag.IntVar(&requests, "r", 5, "Number of requests between snapshot updates")
135
136
137 flag.IntVar(&httpListeners, "http", 2, "Number of HTTP listeners (and RDS configs)")
138
139 flag.IntVar(&scopedHTTPListeners, "scopedhttp", 2, "Number of HTTP listeners (and SRDS configs)")
140
141 flag.IntVar(&vhdsHTTPListeners, "vhdshttp", 2, "Number of VHDS HTTP listeners")
142
143 flag.IntVar(&tcpListeners, "tcp", 2, "Number of TCP pass-through listeners")
144
145
146 flag.BoolVar(&mux, "mux", false, "Enable muxed linear cache for EDS")
147
148
149 flag.IntVar(&extensionNum, "extension", 1, "Number of Extension")
150
151
152
153
154
155 flag.BoolVar(&pprofEnabled, "pprof", false, "Enable use of the pprof profiler")
156
157 }
158
159
160 func main() {
161 flag.Parse()
162 ctx := context.Background()
163
164 if pprofEnabled {
165 runtime.SetBlockProfileRate(1)
166 for _, prof := range []string{"block", "goroutine", "mutex"} {
167 log.Printf("turn on pprof %s profiler", prof)
168 if pprof.Lookup(prof) == nil {
169 pprof.NewProfile(prof)
170 }
171 }
172 }
173
174
175 signal := make(chan struct{})
176 cb := &testv3.Callbacks{Signal: signal, Debug: debug}
177
178
179
180 config := cache.NewSnapshotCache(mode == resource.Ads, cache.IDHash{}, nil)
181 var configCache cache.Cache = config
182 typeURL := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
183 eds := cache.NewLinearCache(typeURL)
184 if mux {
185 configCache = &cache.MuxCache{
186 Classify: func(req *cache.Request) string {
187 if req.TypeUrl == typeURL {
188 return "eds"
189 }
190 return "default"
191 },
192 Caches: map[string]cache.Cache{
193 "default": config,
194 "eds": eds,
195 },
196 }
197 }
198 srv := server.NewServer(context.Background(), configCache, cb)
199 als := &testv3.AccessLogService{}
200
201 if mode != "delta" {
202 vhdsHTTPListeners = 0
203 }
204
205
206 snapshots := resource.TestSnapshot{
207 Xds: mode,
208 UpstreamPort: uint32(upstreamPort),
209 BasePort: uint32(basePort),
210 NumClusters: clusters,
211 NumHTTPListeners: httpListeners,
212 NumScopedHTTPListeners: scopedHTTPListeners,
213 NumVHDSHTTPListeners: vhdsHTTPListeners,
214 NumTCPListeners: tcpListeners,
215 TLS: tls,
216 NumRuntimes: runtimes,
217 NumExtension: extensionNum,
218 }
219
220
221 go test.RunAccessLogServer(ctx, als, alsPort)
222 go test.RunManagementServer(ctx, srv, port)
223 go test.RunManagementGateway(ctx, srv, gatewayPort)
224
225 log.Println("waiting for the first request...")
226 select {
227 case <-signal:
228 break
229 case <-time.After(1 * time.Minute):
230 log.Println("timeout waiting for the first request")
231 os.Exit(1)
232 }
233 log.Printf("initial snapshot %+v\n", snapshots)
234 log.Printf("executing sequence updates=%d request=%d\n", updates, requests)
235
236 for i := 0; i < updates; i++ {
237 snapshots.Version = fmt.Sprintf("v%d", i)
238 log.Printf("update snapshot %v\n", snapshots.Version)
239
240 snapshot := snapshots.Generate()
241 if err := snapshot.Consistent(); err != nil {
242 log.Printf("snapshot inconsistency: %+v\n%+v\n", snapshot, err)
243 }
244
245 err := config.SetSnapshot(context.Background(), nodeID, snapshot)
246 if err != nil {
247 log.Printf("snapshot error %q for %+v\n", err, snapshot)
248 os.Exit(1)
249 }
250
251 if mux {
252 for name, res := range snapshot.GetResources(typeURL) {
253 if err := eds.UpdateResource(name, res); err != nil {
254 log.Printf("update error %q for %+v\n", err, name)
255 os.Exit(1)
256
257 }
258 }
259 }
260
261
262 pass := false
263 for j := 0; j < requests; j++ {
264 ok, failed := callEcho()
265 if failed == 0 && !pass {
266 pass = true
267 }
268 log.Printf("request batch %d, ok %v, failed %v, pass %v\n", j, ok, failed, pass)
269 select {
270 case <-time.After(delay):
271 case <-ctx.Done():
272 return
273 }
274 }
275
276 als.Dump(func(s string) {
277 if debug {
278 log.Println(s)
279 }
280 })
281 cb.Report()
282
283 if !pass {
284 log.Printf("failed all requests in a run %d\n", i)
285 os.Exit(1)
286 }
287 }
288
289 if pprofEnabled {
290 for _, prof := range []string{"block", "goroutine", "mutex"} {
291 p := pprof.Lookup(prof)
292 filePath := fmt.Sprintf("%s_profile_%s.pb.gz", prof, mode)
293 log.Printf("storing %s profile for %s in %s", prof, mode, filePath)
294 f, err := os.Create(filePath)
295 if err != nil {
296 log.Fatalf("could not create %s profile %s: %s", prof, filePath, err)
297 }
298 p.WriteTo(f, 1)
299 f.Close()
300 }
301 }
302
303 log.Printf("Test for %s passed!\n", mode)
304 }
305
306
307
308 func callEcho() (int, int) {
309 total := httpListeners + scopedHTTPListeners + tcpListeners + vhdsHTTPListeners
310 ok, failed := 0, 0
311 ch := make(chan error, total)
312
313 client := http.Client{
314 Timeout: 100 * time.Millisecond,
315 Transport: &http.Transport{
316 TLSClientConfig: &cryptotls.Config{InsecureSkipVerify: true},
317 },
318 }
319
320 get := func(count int) (*http.Response, error) {
321 proto := "http"
322 if tls {
323 proto = "https"
324 }
325
326 req, err := http.NewRequestWithContext(
327 context.Background(),
328 http.MethodGet,
329 fmt.Sprintf("%s://127.0.0.1:%d", proto, basePort+uint(count)),
330 nil,
331 )
332 if err != nil {
333 return nil, err
334 }
335 return client.Do(req)
336 }
337
338
339 for i := 0; i < total; i++ {
340 go func(i int) {
341 resp, err := get(i)
342 if err != nil {
343 ch <- err
344 return
345 }
346 body, err := io.ReadAll(resp.Body)
347 if err != nil {
348 resp.Body.Close()
349 ch <- err
350 return
351 }
352 if err := resp.Body.Close(); err != nil {
353 ch <- err
354 return
355 }
356 if string(body) != upstreamMessage {
357 ch <- fmt.Errorf("unexpected return %q", string(body))
358 return
359 }
360 ch <- nil
361 }(i)
362 }
363
364 for {
365 out := <-ch
366 if out == nil {
367 ok++
368 } else {
369 failed++
370 }
371 if ok+failed == total {
372 return ok, failed
373 }
374 }
375 }
376
View as plain text