...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/main/main.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/main

     1  // Copyright 2018 Envoyproxy Authors
     2  //
     3  //   Licensed under the Apache License, Version 2.0 (the "License");
     4  //   you may not use this file except in compliance with the License.
     5  //   You may obtain a copy of the License at
     6  //
     7  //       http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  //   Unless required by applicable law or agreed to in writing, software
    10  //   distributed under the License is distributed on an "AS IS" BASIS,
    11  //   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  //   See the License for the specific language governing permissions and
    13  //   limitations under the License.
    14  
    15  // Package main contains the test driver for testing xDS manually.
    16  package main
    17  
    18  import (
    19  	"context"
    20  	cryptotls "crypto/tls"
    21  	"flag"
    22  	"fmt"
    23  	"io"
    24  	"log"
    25  	"net/http"
    26  	"os"
    27  	"runtime"
    28  	"runtime/pprof"
    29  	"time"
    30  
    31  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
    32  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
    33  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test"
    34  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/resource/v3"
    35  	testv3 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/v3"
    36  )
    37  
    38  var (
    39  	debug bool
    40  
    41  	port            uint
    42  	gatewayPort     uint
    43  	upstreamPort    uint
    44  	upstreamMessage string
    45  	basePort        uint
    46  	alsPort         uint
    47  
    48  	delay    time.Duration
    49  	requests int
    50  	updates  int
    51  
    52  	mode                string
    53  	clusters            int
    54  	httpListeners       int
    55  	scopedHTTPListeners int
    56  	vhdsHTTPListeners   int
    57  	tcpListeners        int
    58  	runtimes            int
    59  	tls                 bool
    60  	mux                 bool
    61  	extensionNum        int
    62  
    63  	nodeID string
    64  
    65  	pprofEnabled bool
    66  )
    67  
    68  func init() {
    69  	flag.BoolVar(&debug, "debug", false, "Use debug logging")
    70  
    71  	//
    72  	// These parameters control the ports that the integration test
    73  	// components use to talk to one another
    74  	//
    75  
    76  	// The port that the Envoy xDS client uses to talk to the control
    77  	// plane xDS server (part of this program)
    78  	flag.UintVar(&port, "port", 18000, "xDS management server port")
    79  
    80  	// The port that the Envoy REST client uses to talk to the control
    81  	// plane gateway (which translates from REST to xDS)
    82  	flag.UintVar(&gatewayPort, "gateway", 18001, "Management HTTP gateway (from HTTP to xDS) server port")
    83  
    84  	// The port that Envoy uses to talk to the upstream http "echo"
    85  	// server
    86  	flag.UintVar(&upstreamPort, "upstream", 18080, "Upstream HTTP/1.1 port")
    87  
    88  	// The port that the tests below use to talk to Envoy's proxy of the
    89  	// upstream server
    90  	flag.UintVar(&basePort, "base", 9000, "Envoy Proxy listener port")
    91  
    92  	// The control plane accesslog server port
    93  	flag.UintVar(&alsPort, "als", 18090, "Control plane accesslog server port")
    94  
    95  	//
    96  	// These parameters control Envoy configuration
    97  	//
    98  
    99  	// Tell Envoy to request configurations from the control plane using
   100  	// this protocol
   101  	flag.StringVar(&mode, "xds", resource.Ads, "Management protocol to test (ADS, xDS, REST, DELTA, DELTA-ADS)")
   102  
   103  	// Tell Envoy to use this Node ID
   104  	flag.StringVar(&nodeID, "nodeID", "test-id", "Node ID")
   105  
   106  	// Tell Envoy to use TLS to talk to the control plane
   107  	flag.BoolVar(&tls, "tls", false, "Enable TLS on all listeners and use SDS for secret delivery")
   108  
   109  	// Tell Envoy to configure this many clusters for each snapshot
   110  	flag.IntVar(&clusters, "clusters", 4, "Number of clusters")
   111  
   112  	// Tell Envoy to configure this many Runtime Discovery Service
   113  	// layers for each snapshot
   114  	flag.IntVar(&runtimes, "runtimes", 1, "Number of RTDS layers")
   115  
   116  	//
   117  	// These parameters control the test harness
   118  	//
   119  
   120  	// The message that the tests expect to receive from the upstream
   121  	// server
   122  	flag.StringVar(&upstreamMessage, "message", "Default message", "Upstream HTTP server response message")
   123  
   124  	// Time to wait between test request batches
   125  	flag.DurationVar(&delay, "delay", 500*time.Millisecond, "Interval between request batch retries")
   126  
   127  	// Each test loads a configuration snapshot into the control plane
   128  	// which is then picked up by Envoy.  This parameter specifies how
   129  	// many snapshots to test
   130  	flag.IntVar(&updates, "u", 3, "Number of snapshot updates")
   131  
   132  	// Each snapshot test sends this many requests to the upstream
   133  	// server for each snapshot for each listener port
   134  	flag.IntVar(&requests, "r", 5, "Number of requests between snapshot updates")
   135  
   136  	// Test this many HTTP listeners per snapshot
   137  	flag.IntVar(&httpListeners, "http", 2, "Number of HTTP listeners (and RDS configs)")
   138  	// Test this many scoped HTTP listeners per snapshot
   139  	flag.IntVar(&scopedHTTPListeners, "scopedhttp", 2, "Number of HTTP listeners (and SRDS configs)")
   140  	// Test this many VHDS HTTP listeners per snapshot
   141  	flag.IntVar(&vhdsHTTPListeners, "vhdshttp", 2, "Number of VHDS HTTP listeners")
   142  	// Test this many TCP listeners per snapshot
   143  	flag.IntVar(&tcpListeners, "tcp", 2, "Number of TCP pass-through listeners")
   144  
   145  	// Enable a muxed cache with partial snapshots
   146  	flag.BoolVar(&mux, "mux", false, "Enable muxed linear cache for EDS")
   147  
   148  	// Number of ExtensionConfig
   149  	flag.IntVar(&extensionNum, "extension", 1, "Number of Extension")
   150  	//
   151  	// These parameters control the the use of the pprof profiler
   152  	//
   153  
   154  	// Enable use of the pprof profiler
   155  	flag.BoolVar(&pprofEnabled, "pprof", false, "Enable use of the pprof profiler")
   156  
   157  }
   158  
   159  // main returns code 1 if any of the batches failed to pass all requests
   160  func main() {
   161  	flag.Parse()
   162  	ctx := context.Background()
   163  
   164  	if pprofEnabled {
   165  		runtime.SetBlockProfileRate(1)
   166  		for _, prof := range []string{"block", "goroutine", "mutex"} {
   167  			log.Printf("turn on pprof %s profiler", prof)
   168  			if pprof.Lookup(prof) == nil {
   169  				pprof.NewProfile(prof)
   170  			}
   171  		}
   172  	}
   173  
   174  	// create a cache
   175  	signal := make(chan struct{})
   176  	cb := &testv3.Callbacks{Signal: signal, Debug: debug}
   177  
   178  	// mux integration
   179  	// nil for logger uses default logger
   180  	config := cache.NewSnapshotCache(mode == resource.Ads, cache.IDHash{}, nil)
   181  	var configCache cache.Cache = config
   182  	typeURL := "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment"
   183  	eds := cache.NewLinearCache(typeURL)
   184  	if mux {
   185  		configCache = &cache.MuxCache{
   186  			Classify: func(req *cache.Request) string {
   187  				if req.TypeUrl == typeURL {
   188  					return "eds"
   189  				}
   190  				return "default"
   191  			},
   192  			Caches: map[string]cache.Cache{
   193  				"default": config,
   194  				"eds":     eds,
   195  			},
   196  		}
   197  	}
   198  	srv := server.NewServer(context.Background(), configCache, cb)
   199  	als := &testv3.AccessLogService{}
   200  
   201  	if mode != "delta" {
   202  		vhdsHTTPListeners = 0
   203  	}
   204  
   205  	// create a test snapshot
   206  	snapshots := resource.TestSnapshot{
   207  		Xds:                    mode,
   208  		UpstreamPort:           uint32(upstreamPort),
   209  		BasePort:               uint32(basePort),
   210  		NumClusters:            clusters,
   211  		NumHTTPListeners:       httpListeners,
   212  		NumScopedHTTPListeners: scopedHTTPListeners,
   213  		NumVHDSHTTPListeners:   vhdsHTTPListeners,
   214  		NumTCPListeners:        tcpListeners,
   215  		TLS:                    tls,
   216  		NumRuntimes:            runtimes,
   217  		NumExtension:           extensionNum,
   218  	}
   219  
   220  	// start the xDS server
   221  	go test.RunAccessLogServer(ctx, als, alsPort)
   222  	go test.RunManagementServer(ctx, srv, port)
   223  	go test.RunManagementGateway(ctx, srv, gatewayPort)
   224  
   225  	log.Println("waiting for the first request...")
   226  	select {
   227  	case <-signal:
   228  		break
   229  	case <-time.After(1 * time.Minute):
   230  		log.Println("timeout waiting for the first request")
   231  		os.Exit(1)
   232  	}
   233  	log.Printf("initial snapshot %+v\n", snapshots)
   234  	log.Printf("executing sequence updates=%d request=%d\n", updates, requests)
   235  
   236  	for i := 0; i < updates; i++ {
   237  		snapshots.Version = fmt.Sprintf("v%d", i)
   238  		log.Printf("update snapshot %v\n", snapshots.Version)
   239  
   240  		snapshot := snapshots.Generate()
   241  		if err := snapshot.Consistent(); err != nil {
   242  			log.Printf("snapshot inconsistency: %+v\n%+v\n", snapshot, err)
   243  		}
   244  
   245  		err := config.SetSnapshot(context.Background(), nodeID, snapshot)
   246  		if err != nil {
   247  			log.Printf("snapshot error %q for %+v\n", err, snapshot)
   248  			os.Exit(1)
   249  		}
   250  
   251  		if mux {
   252  			for name, res := range snapshot.GetResources(typeURL) {
   253  				if err := eds.UpdateResource(name, res); err != nil {
   254  					log.Printf("update error %q for %+v\n", err, name)
   255  					os.Exit(1)
   256  
   257  				}
   258  			}
   259  		}
   260  
   261  		// pass is true if all requests succeed at least once in a run
   262  		pass := false
   263  		for j := 0; j < requests; j++ {
   264  			ok, failed := callEcho()
   265  			if failed == 0 && !pass {
   266  				pass = true
   267  			}
   268  			log.Printf("request batch %d, ok %v, failed %v, pass %v\n", j, ok, failed, pass)
   269  			select {
   270  			case <-time.After(delay):
   271  			case <-ctx.Done():
   272  				return
   273  			}
   274  		}
   275  
   276  		als.Dump(func(s string) {
   277  			if debug {
   278  				log.Println(s)
   279  			}
   280  		})
   281  		cb.Report()
   282  
   283  		if !pass {
   284  			log.Printf("failed all requests in a run %d\n", i)
   285  			os.Exit(1)
   286  		}
   287  	}
   288  
   289  	if pprofEnabled {
   290  		for _, prof := range []string{"block", "goroutine", "mutex"} {
   291  			p := pprof.Lookup(prof)
   292  			filePath := fmt.Sprintf("%s_profile_%s.pb.gz", prof, mode)
   293  			log.Printf("storing %s profile for %s in %s", prof, mode, filePath)
   294  			f, err := os.Create(filePath)
   295  			if err != nil {
   296  				log.Fatalf("could not create %s profile %s: %s", prof, filePath, err)
   297  			}
   298  			p.WriteTo(f, 1) // nolint:errcheck
   299  			f.Close()
   300  		}
   301  	}
   302  
   303  	log.Printf("Test for %s passed!\n", mode)
   304  }
   305  
   306  // callEcho calls upstream echo service on all listener ports and returns an error
   307  // if any of the listeners returned an error.
   308  func callEcho() (int, int) {
   309  	total := httpListeners + scopedHTTPListeners + tcpListeners + vhdsHTTPListeners
   310  	ok, failed := 0, 0
   311  	ch := make(chan error, total)
   312  
   313  	client := http.Client{
   314  		Timeout: 100 * time.Millisecond,
   315  		Transport: &http.Transport{
   316  			TLSClientConfig: &cryptotls.Config{InsecureSkipVerify: true}, // nolint:gosec
   317  		},
   318  	}
   319  
   320  	get := func(count int) (*http.Response, error) {
   321  		proto := "http"
   322  		if tls {
   323  			proto = "https"
   324  		}
   325  
   326  		req, err := http.NewRequestWithContext(
   327  			context.Background(),
   328  			http.MethodGet,
   329  			fmt.Sprintf("%s://127.0.0.1:%d", proto, basePort+uint(count)),
   330  			nil,
   331  		)
   332  		if err != nil {
   333  			return nil, err
   334  		}
   335  		return client.Do(req)
   336  	}
   337  
   338  	// spawn requests
   339  	for i := 0; i < total; i++ {
   340  		go func(i int) {
   341  			resp, err := get(i)
   342  			if err != nil {
   343  				ch <- err
   344  				return
   345  			}
   346  			body, err := io.ReadAll(resp.Body)
   347  			if err != nil {
   348  				resp.Body.Close()
   349  				ch <- err
   350  				return
   351  			}
   352  			if err := resp.Body.Close(); err != nil {
   353  				ch <- err
   354  				return
   355  			}
   356  			if string(body) != upstreamMessage {
   357  				ch <- fmt.Errorf("unexpected return %q", string(body))
   358  				return
   359  			}
   360  			ch <- nil
   361  		}(i)
   362  	}
   363  
   364  	for {
   365  		out := <-ch
   366  		if out == nil {
   367  			ok++
   368  		} else {
   369  			failed++
   370  		}
   371  		if ok+failed == total {
   372  			return ok, failed
   373  		}
   374  	}
   375  }
   376  

View as plain text