
Source file src/github.com/datawire/ambassador/v2/pkg/ambex/main.go

Documentation: github.com/datawire/ambassador/v2/pkg/ambex

package ambex

/**********************************************
 * ambex: Ambassador Experimental ADS server
 *
 * Here's the deal.
 *
 * In go-control-plane, several different classes manage this stuff:
 *
 * - The root of the world is a SnapshotCache.
 *   - import github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2, then refer
 *     to cache.SnapshotCache.
 *   - A collection of internally consistent configuration objects is a
 *     Snapshot (cache.Snapshot).
 *   - Snapshots are collected in the SnapshotCache.
 *   - A given SnapshotCache can hold configurations for multiple Envoys,
 *     identified by the Envoy 'node ID', which must be configured for the
 *     Envoy.
 * - The SnapshotCache can only hold go-control-plane configuration objects,
 *   so you have to build these up to hand to the SnapshotCache.
 * - The gRPC stuff is handled by a Server.
 *   - import github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server, then refer
 *     to server.Server.
 *   - Our runManagementServer (largely ripped off from the go-control-plane
 *     tests) gets this running. It takes a SnapshotCache (cleverly called a
 *     "config" for no reason I understand) and a gRPCServer as arguments.
 *   - _ALL_ the gRPC madness is handled by the Server, with the assistance
 *     of the methods in a callback object.
 * - Once the Server is running, Envoy can open a gRPC stream to it.
 *   - On connection, Envoy will get handed the most recent Snapshot that
 *     the Server's SnapshotCache knows about.
 *   - Whenever a newer Snapshot is added to the SnapshotCache, that Snapshot
 *     will get sent to the Envoy.
 * - We manage the SnapshotCache by loading envoy configuration from
 *   json and/or protobuf files on disk.
 *   - By default, when we get a SIGHUP, we reload configuration.
 *   - When passed the -watch argument, we reload whenever any file in
 *     the directory changes.
 */
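
// A minimal sketch of that flow, to make the moving parts concrete. This is a
// simplified illustration rather than the exact code ambex runs, and
// "example-node-id" is a placeholder:
//
//	snapshotCache := ecp_v2_cache.NewSnapshotCache(true, HasherV2{}, nil)
//	snapshot := ecp_v2_cache.NewSnapshot(
//		"v1",                         // version
//		nil, nil, nil, nil, nil, nil, // endpoints, clusters, routes, listeners, runtimes, secrets
//	)
//	if err := snapshotCache.SetSnapshot("example-node-id", snapshot); err != nil {
//		// handle the error
//	}
//	// Any Envoy connecting with node id "example-node-id" now gets this snapshot.
//
// update() below is the real version of this sequence.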

import (
	// standard library
	"context"
	"encoding/json"
	"flag"
	"fmt"
	"io/ioutil"
	"net"
	"os"
	"os/signal"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"syscall"

	// third-party libraries
	"github.com/fsnotify/fsnotify"
	"google.golang.org/grpc"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/encoding/prototext"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/anypb"

	// envoy control plane
	ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
	ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
	ecp_v3_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v3"
	ecp_log "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/log"
	ecp_v2_server "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v2"
	ecp_v3_server "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v3"

	// Envoy API v2
	// Be sure to import the package of any types that're referenced with "@type" in our
	// generated Envoy config, even if that package is otherwise not used by ambex.
	v2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/auth"
	v2core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/accesslog/v2"
	v2bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/buffer/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/ext_authz/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/gzip/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/lua/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/rate_limit/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/rbac/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/http/router/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/network/http_connection_manager/v2"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/filter/network/tcp_proxy/v2"
	v2discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v2"

	// Envoy API v3
	// Be sure to import the package of any types that're referenced with "@type" in our
	// generated Envoy config, even if that package is otherwise not used by ambex.
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/config/accesslog/v3"
	v3bootstrap "github.com/datawire/ambassador/v2/pkg/api/envoy/config/bootstrap/v3"
	v3clusterconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/cluster/v3"
	v3core "github.com/datawire/ambassador/v2/pkg/api/envoy/config/core/v3"
	v3endpointconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/endpoint/v3"
	v3listenerconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/listener/v3"
	v3routeconfig "github.com/datawire/ambassador/v2/pkg/api/envoy/config/route/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/access_loggers/file/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/access_loggers/grpc/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/compression/gzip/compressor/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/buffer/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/compressor/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/ext_authz/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/grpc_stats/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/gzip/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/lua/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/ratelimit/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/rbac/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/response_map/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/http/router/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/network/http_connection_manager/v3"
	_ "github.com/datawire/ambassador/v2/pkg/api/envoy/extensions/filters/network/tcp_proxy/v3"
	v3cluster "github.com/datawire/ambassador/v2/pkg/api/envoy/service/cluster/v3"
	v3discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v3"
	v3endpoint "github.com/datawire/ambassador/v2/pkg/api/envoy/service/endpoint/v3"
	v3listener "github.com/datawire/ambassador/v2/pkg/api/envoy/service/listener/v3"
	v3route "github.com/datawire/ambassador/v2/pkg/api/envoy/service/route/v3"
	v3runtime "github.com/datawire/ambassador/v2/pkg/api/envoy/service/runtime/v3"

	// first-party libraries
	"github.com/datawire/dlib/dgroup"
	"github.com/datawire/dlib/dhttp"
	"github.com/datawire/dlib/dlog"
)

type Args struct {
	watch bool

	adsNetwork string
	adsAddress string

	dirs []string

	snapdirPath string
	numsnaps    int

	// edsBypass bypasses EDS and inserts endpoints into the cluster data manually.
	// This is a stopgap solution to resolve 503s on certificate rotation.
	edsBypass bool
}

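// parseArgs parses ambex's command-line flags and environment variables into
// an Args. For example, a hypothetical invocation watching two config
// directories (the paths here are illustrative, not required by ambex):
//
//	ambex -watch -ads-listen-address=:18000 /ambassador/envoy /ambassador/envoy-extra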
func parseArgs(ctx context.Context, rawArgs ...string) (*Args, error) {
	var args Args
	flagset := flag.NewFlagSet("ambex", flag.ContinueOnError)

	flagset.BoolVar(&args.watch, "watch", false, "Watch for file changes")

	// TODO(lukeshu): Consider changing the default here so we don't need to put it in entrypoint.sh
	flagset.StringVar(&args.adsNetwork, "ads-listen-network", "tcp", "network for ADS to listen on")
	flagset.StringVar(&args.adsAddress, "ads-listen-address", ":18000", "address (on --ads-listen-network) for ADS to listen on")

	var legacyAdsPort uint
	flagset.UintVar(&legacyAdsPort, "ads", 0, "port number for ADS to listen on (deprecated; use --ads-listen-address=:1234 instead)")

	if err := flagset.Parse(rawArgs); err != nil {
		return nil, err
	}

	if legacyAdsPort != 0 {
		args.adsAddress = fmt.Sprintf(":%v", legacyAdsPort)
	}

	args.dirs = flagset.Args()
	if len(args.dirs) == 0 {
		args.dirs = []string{"."}
	}

	// ambex logs its own snapshots, separately from the ones provided by the Python
	// side of the world, in $rootdir/snapshots/ambex-#.json, where rootdir is taken
	// from $AMBASSADOR_CONFIG_BASE_DIR if set, else $ambassador_root if set, else
	// /ambassador.
	snapdirPath := os.Getenv("AMBASSADOR_CONFIG_BASE_DIR")
	if snapdirPath == "" {
		snapdirPath = os.Getenv("ambassador_root")
	}
	if snapdirPath == "" {
		snapdirPath = "/ambassador"
	}
	args.snapdirPath = path.Join(snapdirPath, "snapshots")

	// We'll keep $AMBASSADOR_AMBEX_SNAPSHOT_COUNT snapshots. If it's unset, or set to
	// something we can't parse as an int, use 30 (which Flynn just made up, so don't
	// be afraid to change it if need be).
	numsnapStr := os.Getenv("AMBASSADOR_AMBEX_SNAPSHOT_COUNT")
	if numsnapStr == "" {
		numsnapStr = "30"
	}
	var err error
	args.numsnaps, err = strconv.Atoi(numsnapStr)
	if (err != nil) || (args.numsnaps < 0) {
		args.numsnaps = 30
		dlog.Errorf(ctx, "Invalid AMBASSADOR_AMBEX_SNAPSHOT_COUNT: %s, using %d", numsnapStr, args.numsnaps)
	}

	// edsBypass bypasses EDS and inserts endpoints into the cluster data manually.
	// This is a stopgap solution to resolve 503s on certificate rotation.
	edsBypass := os.Getenv("AMBASSADOR_EDS_BYPASS")
	if v, err := strconv.ParseBool(edsBypass); err == nil && v {
		dlog.Info(ctx, "AMBASSADOR_EDS_BYPASS has been set to true. EDS will be bypassed and endpoints will be inserted manually.")
		args.edsBypass = v
	}

	return &args, nil
}

// HasherV2 hashes a V2 node to its ID, for indexing the V2 SnapshotCache.
type HasherV2 struct{}

// ID returns the node's ID, or "unknown" if the node is nil.
func (h HasherV2) ID(node *v2core.Node) string {
	if node == nil {
		return "unknown"
	}
	return node.Id
}

// HasherV3 hashes a V3 node to its ID, for indexing the V3 SnapshotCache.
type HasherV3 struct{}

// ID returns the node's ID, or "unknown" if the node is nil.
func (h HasherV3) ID(node *v3core.Node) string {
	if node == nil {
		return "unknown"
	}
	return node.Id
}

// end Hasher stuff

// runManagementServer starts the ADS gRPC server on the given network and
// address, serving both the V2 and V3 xDS APIs.
func runManagementServer(ctx context.Context, server ecp_v2_server.Server, serverv3 ecp_v3_server.Server, adsNetwork, adsAddress string) error {
	grpcServer := grpc.NewServer()

	lis, err := net.Listen(adsNetwork, adsAddress)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	// register services
	v2discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, server)
	v2.RegisterEndpointDiscoveryServiceServer(grpcServer, server)
	v2.RegisterClusterDiscoveryServiceServer(grpcServer, server)
	v2.RegisterRouteDiscoveryServiceServer(grpcServer, server)
	v2.RegisterListenerDiscoveryServiceServer(grpcServer, server)

	v3discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, serverv3)
	v3endpoint.RegisterEndpointDiscoveryServiceServer(grpcServer, serverv3)
	v3cluster.RegisterClusterDiscoveryServiceServer(grpcServer, serverv3)
	v3route.RegisterRouteDiscoveryServiceServer(grpcServer, serverv3)
	v3listener.RegisterListenerDiscoveryServiceServer(grpcServer, serverv3)

	dlog.Infof(ctx, "Listening on %s:%s", adsNetwork, adsAddress)

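	// dhttp serves the gRPC server for us; as I understand dlib's dhttp, Serve
	// blocks until ctx is cancelled and then shuts the listener down gracefully.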
	sc := &dhttp.ServerConfig{
		Handler: grpcServer,
	}
	return sc.Serve(ctx, lis)
}

// Decoders for unmarshalling our config
var decoders = map[string](func([]byte, proto.Message) error){
	".json": protojson.Unmarshal,
	".pb":   prototext.Unmarshal,
}

func isDecodable(name string) bool {
	if strings.HasPrefix(name, ".") {
		return false
	}

	ext := filepath.Ext(name)
	_, ok := decoders[ext]
	return ok
}
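
// For example (illustrative, matching the decoders map above):
//
//	isDecodable("envoy.json")   // true: .json has a decoder
//	isDecodable(".hidden.json") // false: dotfiles are skipped
//	isDecodable("README.md")    // false: no decoder for .md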

// Not sure if there is a better way to do this, but we cast to this
// so we can call the generated Validate method.
type Validatable interface {
	proto.Message
	Validate() error
}

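// Decode reads a JSON- or text-format protobuf file from disk and returns it
// as a validated proto message. Each file is expected to hold a protobuf Any,
// so its "@type" field selects the concrete Go type; that's why the blank
// imports at the top of this file matter.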
func Decode(ctx context.Context, name string) (proto.Message, error) {
	any := &anypb.Any{}
	contents, err := ioutil.ReadFile(name)
	if err != nil {
		return nil, err
	}

	ext := filepath.Ext(name)
	decoder := decoders[ext]
	err = decoder(contents, any)
	if err != nil {
		return nil, err
	}

	m, err := any.UnmarshalNew()
	if err != nil {
		return nil, err
	}

	v := m.(Validatable)

	if err := v.Validate(); err != nil {
		return nil, err
	}
	dlog.Infof(ctx, "Loaded file %s", name)
	return v, nil
}

// Observability:
//
// These "expanded" snapshots make the snapshots we log easier to read: instead
// of indexing the resources solely by Go type, they make the JSON marshal with
// real field names.
type v2ExpandedSnapshot struct {
	Endpoints ecp_v2_cache.Resources `json:"endpoints"`
	Clusters  ecp_v2_cache.Resources `json:"clusters"`
	Routes    ecp_v2_cache.Resources `json:"routes"`
	Listeners ecp_v2_cache.Resources `json:"listeners"`
	Runtimes  ecp_v2_cache.Resources `json:"runtimes"`
}

func NewV2ExpandedSnapshot(v2snap *ecp_v2_cache.Snapshot) v2ExpandedSnapshot {
	return v2ExpandedSnapshot{
		Endpoints: v2snap.Resources[ecp_cache_types.Endpoint],
		Clusters:  v2snap.Resources[ecp_cache_types.Cluster],
		Routes:    v2snap.Resources[ecp_cache_types.Route],
		Listeners: v2snap.Resources[ecp_cache_types.Listener],
		Runtimes:  v2snap.Resources[ecp_cache_types.Runtime],
	}
}

type v3ExpandedSnapshot struct {
	Endpoints ecp_v3_cache.Resources `json:"endpoints"`
	Clusters  ecp_v3_cache.Resources `json:"clusters"`
	Routes    ecp_v3_cache.Resources `json:"routes"`
	Listeners ecp_v3_cache.Resources `json:"listeners"`
	Runtimes  ecp_v3_cache.Resources `json:"runtimes"`
}

func NewV3ExpandedSnapshot(v3snap *ecp_v3_cache.Snapshot) v3ExpandedSnapshot {
	return v3ExpandedSnapshot{
		Endpoints: v3snap.Resources[ecp_cache_types.Endpoint],
		Clusters:  v3snap.Resources[ecp_cache_types.Cluster],
		Routes:    v3snap.Resources[ecp_cache_types.Route],
		Listeners: v3snap.Resources[ecp_cache_types.Listener],
		Runtimes:  v3snap.Resources[ecp_cache_types.Runtime],
	}
}

// A combinedSnapshot has both a V2 and V3 snapshot, for logging.
type combinedSnapshot struct {
	Version string             `json:"version"`
	V2      v2ExpandedSnapshot `json:"v2"`
	V3      v3ExpandedSnapshot `json:"v3"`
}

// csDump creates a combinedSnapshot from a V2 snapshot and a V3 snapshot, then
// dumps the combinedSnapshot to disk. Only numsnaps snapshots are kept: ambex-1.json
// is the newest, then ambex-2.json, etc., so ambex-$numsnaps.json is the oldest.
// Every time we write a new one, we rename all the older ones, ditching the oldest
// after we've written numsnaps snapshots.
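//
// For example, with numsnaps == 3 a dump leaves ambex-1.json (newest) through
// ambex-3.json (oldest); the just-written ambex-0.json is immediately renamed
// to ambex-1.json by the rotation loop.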
func csDump(ctx context.Context, snapdirPath string, numsnaps int, generation int, v2snap *ecp_v2_cache.Snapshot, v3snap *ecp_v3_cache.Snapshot) {
	if numsnaps <= 0 {
		// Don't do snapshotting at all.
		return
	}

	// OK, they want snapshots. Make a proper version string...
	version := fmt.Sprintf("v%d", generation)

	// ...and a combinedSnapshot.
	cs := combinedSnapshot{
		Version: version,
		V2:      NewV2ExpandedSnapshot(v2snap),
		V3:      NewV3ExpandedSnapshot(v3snap),
	}

	// Next up, marshal as JSON and write to ambex-0.json. Note that we
	// didn't say anything about a -0 file; that's because it's about to
	// be renamed.

	bs, err := json.MarshalIndent(cs, "", "  ")

	if err != nil {
		dlog.Errorf(ctx, "CSNAP: marshal failure: %s", err)
		return
	}

	csPath := path.Join(snapdirPath, "ambex-0.json")

	err = ioutil.WriteFile(csPath, bs, 0644)

	if err != nil {
		dlog.Errorf(ctx, "CSNAP: write failure: %s", err)
	} else {
		dlog.Infof(ctx, "Saved snapshot %s", version)
	}

	// Rotate everything one file down. This includes renaming the just-written
	// ambex-0 to ambex-1.
	for i := numsnaps; i > 0; i-- {
		previous := i - 1

		fromPath := path.Join(snapdirPath, fmt.Sprintf("ambex-%d.json", previous))
		toPath := path.Join(snapdirPath, fmt.Sprintf("ambex-%d.json", i))

		err := os.Rename(fromPath, toPath)

		if (err != nil) && !os.IsNotExist(err) {
			dlog.Infof(ctx, "CSNAP: could not rename %s -> %s: %#v", fromPath, toPath, err)
		}
	}
}

// update loads the current configuration from disk, folds in the fastpath
// snapshot and EDS endpoints, builds consistent V2 and V3 snapshots, and
// sends them down the updates channel.
func update(
	ctx context.Context,
	snapdirPath string,
	numsnaps int,
	edsBypass bool,
	config ecp_v2_cache.SnapshotCache,
	configv3 ecp_v3_cache.SnapshotCache,
	generation *int,
	dirs []string,
	edsEndpoints map[string]*v2.ClusterLoadAssignment,
	edsEndpointsV3 map[string]*v3endpointconfig.ClusterLoadAssignment,
	fastpathSnapshot *FastpathSnapshot,
	updates chan<- Update,
) error {
	clusters := []ecp_cache_types.Resource{}  // v2.Cluster
	routes := []ecp_cache_types.Resource{}    // v2.RouteConfiguration
	listeners := []ecp_cache_types.Resource{} // v2.Listener
	runtimes := []ecp_cache_types.Resource{}  // discovery.Runtime

	clustersv3 := []ecp_cache_types.Resource{}  // v3.Cluster
	routesv3 := []ecp_cache_types.Resource{}    // v3.RouteConfiguration
	listenersv3 := []ecp_cache_types.Resource{} // v3.Listener
	runtimesv3 := []ecp_cache_types.Resource{}  // v3.Runtime

	var filenames []string

	for _, dir := range dirs {
		files, err := ioutil.ReadDir(dir)
		if err != nil {
			dlog.Warnf(ctx, "Error listing %q: %v", dir, err)
			continue
		}
		for _, file := range files {
			name := file.Name()
			if isDecodable(name) {
				filenames = append(filenames, filepath.Join(dir, name))
			}
		}
	}

	for _, name := range filenames {
		m, e := Decode(ctx, name)
		if e != nil {
			dlog.Warnf(ctx, "%s: %v", name, e)
			continue
		}
		var dst *[]ecp_cache_types.Resource
		switch m.(type) {
		case *v2.Cluster:
			dst = &clusters
		case *v2.RouteConfiguration:
			dst = &routes
		case *v2.Listener:
			dst = &listeners
		case *v2discovery.Runtime:
			dst = &runtimes
		case *v2bootstrap.Bootstrap:
			bs := m.(*v2bootstrap.Bootstrap)
			sr := bs.StaticResources
			for _, lst := range sr.Listeners {
				// When the RouteConfiguration is embedded in the listener, it will cause envoy to
				// go through a complete drain cycle whenever there is a routing change and that
				// will potentially disrupt in-flight requests. By converting all listeners to use
				// RDS rather than inlining their routing configuration, we significantly reduce the
				// set of circumstances where the listener definition itself changes, and this in
				// turn reduces the set of circumstances where envoy has to go through that drain
				// process and disrupt in-flight requests.
				rdsListener, routeConfigs, err := ListenerToRdsListener(lst)
				if err != nil {
					dlog.Errorf(ctx, "Error converting listener to RDS: %+v", err)
					listeners = append(listeners, proto.Clone(lst).(ecp_cache_types.Resource))
					continue
				}
				listeners = append(listeners, rdsListener)
				for _, rc := range routeConfigs {
					// These routes will get included in the configuration snapshot created below.
					routes = append(routes, rc)
				}
			}
			for _, cls := range sr.Clusters {
				clusters = append(clusters, proto.Clone(cls).(ecp_cache_types.Resource))
			}
			continue
		case *v3clusterconfig.Cluster:
			dst = &clustersv3
		case *v3routeconfig.RouteConfiguration:
			dst = &routesv3
		case *v3listenerconfig.Listener:
			dst = &listenersv3
		case *v3runtime.Runtime:
			dst = &runtimesv3
		case *v3bootstrap.Bootstrap:
			bs := m.(*v3bootstrap.Bootstrap)
			sr := bs.StaticResources
			for _, lst := range sr.Listeners {
				// When the RouteConfiguration is embedded in the listener, it will cause envoy to
				// go through a complete drain cycle whenever there is a routing change and that
				// will potentially disrupt in-flight requests. By converting all listeners to use
				// RDS rather than inlining their routing configuration, we significantly reduce the
				// set of circumstances where the listener definition itself changes, and this in
				// turn reduces the set of circumstances where envoy has to go through that drain
				// process and disrupt in-flight requests.
				rdsListener, routeConfigs, err := V3ListenerToRdsListener(lst)
				if err != nil {
					dlog.Errorf(ctx, "Error converting listener to RDS: %+v", err)
					listenersv3 = append(listenersv3, proto.Clone(lst).(ecp_cache_types.Resource))
					continue
				}
				listenersv3 = append(listenersv3, rdsListener)
				for _, rc := range routeConfigs {
					// These routes will get included in the configuration snapshot created below.
					routesv3 = append(routesv3, rc)
				}
			}
			for _, cls := range sr.Clusters {
				clustersv3 = append(clustersv3, proto.Clone(cls).(ecp_cache_types.Resource))
			}
			continue
		default:
			dlog.Warnf(ctx, "Unrecognized resource %s: %T", name, m)
			continue
		}
		*dst = append(*dst, m.(ecp_cache_types.Resource))
	}

	if fastpathSnapshot != nil && fastpathSnapshot.Snapshot != nil {
		for _, lst := range fastpathSnapshot.Snapshot.Resources[ecp_cache_types.Listener].Items {
			listeners = append(listeners, lst.Resource)
		}
		for _, route := range fastpathSnapshot.Snapshot.Resources[ecp_cache_types.Route].Items {
			routes = append(routes, route.Resource)
		}
		for _, clu := range fastpathSnapshot.Snapshot.Resources[ecp_cache_types.Cluster].Items {
			clusters = append(clusters, clu.Resource)
		}
		// We intentionally omit endpoints since those are carried separately.
	}

	// The configuration data that reaches us here arrives via two parallel paths that race each
	// other. The endpoint data comes in realtime directly from the golang watcher in the entrypoint
	// package. The cluster configuration comes from the python code. Either one can win, which means
	// we might at times see endpoint data with no corresponding cluster, and we might also see
	// clusters with no corresponding endpoint data. Both of these circumstances should be
	// transient.
	//
	// To produce a consistent configuration, we do an outer join operation on the endpoint and
	// cluster configuration that we have at this moment. If there is no endpoint information for a
	// given cluster, we will synthesize an empty ClusterLoadAssignment.
	//
	// Note that a cluster not existing is very different to envoy than a cluster existing but
	// having an empty ClusterLoadAssignment. When envoy first discovers clusters, it goes through a
	// warmup process to be sure the cluster is properly bootstrapped before routing traffic to
	// it. See here for more details:
	//
	// https://www.envoyproxy.io/docs/envoy/latest/intro/arch_overview/upstream/cluster_manager.html?highlight=cluster%20warming
	//
	// For this reason, if there is no endpoint data for a cluster, we will synthesize an empty
	// ClusterLoadAssignment rather than filtering out the cluster. This avoids triggering the
	// warmup sequence in scenarios where the endpoint data for a cluster is really flapping into
	// and out of existence. In that circumstance, we want to faithfully relay to envoy that the
	// cluster exists but currently has no endpoints.
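	//
	// Roughly: given clusters {A, B} and endpoint data for {B, C}, the join
	// yields a synthesized empty ClusterLoadAssignment for A and the real one
	// for B, while the orphaned data for C simply goes unreferenced until
	// cluster C shows up.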
	endpoints := JoinEdsClusters(ctx, clusters, edsEndpoints, edsBypass)
	endpointsv3 := JoinEdsClustersV3(ctx, clustersv3, edsEndpointsV3, edsBypass)

	// Create a new configuration snapshot from everything we have just loaded from disk.
	curgen := *generation
	*generation++

	version := fmt.Sprintf("v%d", curgen)
	snapshot := ecp_v2_cache.NewSnapshot(
		version,
		endpoints,
		clusters,
		routes,
		listeners,
		runtimes,
		nil, // secrets
	)

	if err := snapshot.Consistent(); err != nil {
		bs, _ := json.Marshal(snapshot)
		dlog.Errorf(ctx, "V2 Snapshot inconsistency: %v: %s", err, bs)
		return nil // TODO: should we return the error, rather than just logging it?
	}

	snapshotv3 := ecp_v3_cache.NewSnapshot(
		version,
		endpointsv3,
		clustersv3,
		routesv3,
		listenersv3,
		runtimesv3,
		nil, // secrets
	)

	if err := snapshotv3.Consistent(); err != nil {
		bs, _ := json.Marshal(snapshotv3)
		dlog.Errorf(ctx, "V3 Snapshot inconsistency: %v: %s", err, bs)
		return nil // TODO: should we return the error, rather than just logging it?
	}

	// This used to just directly update envoy. Since we want ratelimiting, we now send an
	// Update object down the channel with a function that knows how to do the update if/when
	// the ratelimiting logic decides.

	dlog.Debugf(ctx, "Created snapshot %s", version)
	csDump(ctx, snapdirPath, numsnaps, curgen, &snapshot, &snapshotv3)

	update := Update{version, func() error {
		dlog.Debugf(ctx, "Accepting snapshot %s", version)

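		// The node ID used here must match the "id" the connecting Envoy is
		// configured with (see the header comment); ambex uses the fixed ID
		// "test-id".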
		err := config.SetSnapshot("test-id", snapshot)
		if err != nil {
			return fmt.Errorf("V2 Snapshot error %q for %+v", err, snapshot)
		}

		err = configv3.SetSnapshot("test-id", snapshotv3)
		if err != nil {
			return fmt.Errorf("V3 Snapshot error %q for %+v", err, snapshotv3)
		}

		return nil
	}}

	// We also need to pay attention to contexts here so we can shutdown properly. If we didn't
	// have the context portion, the ratelimit goroutine could shutdown first and we could end
	// up blocking here and never shutting down.
	select {
	case updates <- update:
	case <-ctx.Done():
	}
	return nil
}

type logAdapterBase struct {
	prefix string
}

type logAdapterV2 struct {
	logAdapterBase
}

var _ ecp_v2_server.Callbacks = logAdapterV2{}
var _ ecp_log.Logger = logAdapterV2{}

type logAdapterV3 struct {
	logAdapterBase
}

var _ ecp_v3_server.Callbacks = logAdapterV3{}
var _ ecp_log.Logger = logAdapterV3{}

// Debugf implements ecp_log.Logger.
func (l logAdapterBase) Debugf(format string, args ...interface{}) {
	dlog.Debugf(context.TODO(), format, args...)
}

// Infof implements ecp_log.Logger.
func (l logAdapterBase) Infof(format string, args ...interface{}) {
	dlog.Infof(context.TODO(), format, args...)
}

// Warnf implements ecp_log.Logger.
func (l logAdapterBase) Warnf(format string, args ...interface{}) {
	dlog.Warnf(context.TODO(), format, args...)
}

// Errorf implements ecp_log.Logger.
func (l logAdapterBase) Errorf(format string, args ...interface{}) {
	dlog.Errorf(context.TODO(), format, args...)
}

// OnStreamOpen implements ecp_v2_server.Callbacks and ecp_v3_server.Callbacks.
func (l logAdapterBase) OnStreamOpen(ctx context.Context, sid int64, stype string) error {
	dlog.Debugf(ctx, "%v Stream open[%v]: %v", l.prefix, sid, stype)
	return nil
}

// OnStreamClosed implements ecp_v2_server.Callbacks and ecp_v3_server.Callbacks.
func (l logAdapterBase) OnStreamClosed(sid int64) {
	dlog.Debugf(context.TODO(), "%v Stream closed[%v]", l.prefix, sid)
}

// OnStreamRequest implements ecp_v2_server.Callbacks.
func (l logAdapterV2) OnStreamRequest(sid int64, req *v2.DiscoveryRequest) error {
	dlog.Debugf(context.TODO(), "V2 Stream request[%v] for type %s: requesting %d resources", sid, req.TypeUrl, len(req.ResourceNames))
	dlog.Debugf(context.TODO(), "V2 Stream request[%v] dump: %v", sid, req)
	return nil
}

// OnStreamRequest implements ecp_v3_server.Callbacks.
func (l logAdapterV3) OnStreamRequest(sid int64, req *v3discovery.DiscoveryRequest) error {
	dlog.Debugf(context.TODO(), "V3 Stream request[%v] for type %s: requesting %d resources", sid, req.TypeUrl, len(req.ResourceNames))
	dlog.Debugf(context.TODO(), "V3 Stream request[%v] dump: %v", sid, req)
	return nil
}

// OnStreamResponse implements ecp_v2_server.Callbacks.
func (l logAdapterV2) OnStreamResponse(sid int64, req *v2.DiscoveryRequest, res *v2.DiscoveryResponse) {
	dlog.Debugf(context.TODO(), "V2 Stream response[%v] for type %s: returning %d resources", sid, res.TypeUrl, len(res.Resources))
	dlog.Debugf(context.TODO(), "V2 Stream dump response[%v]: %v -> %v", sid, req, res)
}

// OnStreamResponse implements ecp_v3_server.Callbacks.
func (l logAdapterV3) OnStreamResponse(sid int64, req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
	dlog.Debugf(context.TODO(), "V3 Stream response[%v] for type %s: returning %d resources", sid, res.TypeUrl, len(res.Resources))
	dlog.Debugf(context.TODO(), "V3 Stream dump response[%v]: %v -> %v", sid, req, res)
}

// OnFetchRequest implements ecp_v2_server.Callbacks.
func (l logAdapterV2) OnFetchRequest(ctx context.Context, r *v2.DiscoveryRequest) error {
	dlog.Debugf(ctx, "V2 Fetch request: %v", r)
	return nil
}

// OnFetchRequest implements ecp_v3_server.Callbacks.
func (l logAdapterV3) OnFetchRequest(ctx context.Context, r *v3discovery.DiscoveryRequest) error {
	dlog.Debugf(ctx, "V3 Fetch request: %v", r)
	return nil
}

// OnFetchResponse implements ecp_v2_server.Callbacks.
func (l logAdapterV2) OnFetchResponse(req *v2.DiscoveryRequest, res *v2.DiscoveryResponse) {
	dlog.Debugf(context.TODO(), "V2 Fetch response: %v -> %v", req, res)
}

// OnFetchResponse implements ecp_v3_server.Callbacks.
func (l logAdapterV3) OnFetchResponse(req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
	dlog.Debugf(context.TODO(), "V3 Fetch response: %v -> %v", req, res)
}

func Main(
	ctx context.Context,
	Version string,
	getUsage MemoryGetter,
	fastpathCh <-chan *FastpathSnapshot,
	rawArgs ...string,
) error {
	args, err := parseArgs(ctx, rawArgs...)
	if err != nil {
		return err
	}

	dlog.Infof(ctx, "Ambex %s starting, snapdirPath %s", Version, args.snapdirPath)

	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		return err
	}
	defer watcher.Close()

	if args.watch {
		for _, d := range args.dirs {
			if err := watcher.Add(d); err != nil {
				return err
			}
		}
	}

	// The golang signal package does not block when it writes to the channel. We therefore need a
	// nonzero buffer for the channel to minimize the possibility that we miss out on a signal that
	// comes in while we are doing work and not reading from the channel. To minimize the chance
	// of that happening, we will choose a buffer size of 100. That may well be overkill, but
	// better to not have to consider the possibility that we lose a signal.
	sigCh := make(chan os.Signal, 100)
	signal.Notify(sigCh, syscall.SIGHUP)
	defer func() { signal.Stop(sigCh) }()

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	config := ecp_v2_cache.NewSnapshotCache(true, HasherV2{}, logAdapterV2{logAdapterBase{"V2"}})
	configv3 := ecp_v3_cache.NewSnapshotCache(true, HasherV3{}, logAdapterV3{logAdapterBase{"V3"}})
	server := ecp_v2_server.NewServer(ctx, config, logAdapterV2{logAdapterBase{"V2"}})
	serverv3 := ecp_v3_server.NewServer(ctx, configv3, logAdapterV3{logAdapterBase{"V3"}})

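	// dgroup ties the goroutines below to ctx: as I understand dlib's dgroup,
	// if any worker returns an error or ctx is cancelled, the whole group is
	// shut down and Wait returns.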
	grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})

	grp.Go("management-server", func(ctx context.Context) error {
		return runManagementServer(ctx, server, serverv3, args.adsNetwork, args.adsAddress)
	})

	pid := os.Getpid()
	file := "ambex.pid"
	if err := ioutil.WriteFile(file, []byte(fmt.Sprintf("%v", pid)), 0644); err != nil {
		dlog.Warn(ctx, err)
	} else {
		ctx := dlog.WithField(ctx, "pid", pid)
		ctx = dlog.WithField(ctx, "file", file)
		dlog.Info(ctx, "Wrote PID")
	}

	updates := make(chan Update)
	grp.Go("updater", func(ctx context.Context) error {
		return Updater(ctx, updates, getUsage)
	})
	grp.Go("main-loop", func(ctx context.Context) error {
		generation := 0
		var fastpathSnapshot *FastpathSnapshot
		edsEndpoints := map[string]*v2.ClusterLoadAssignment{}
		edsEndpointsV3 := map[string]*v3endpointconfig.ClusterLoadAssignment{}

		// We always start by updating with a totally empty snapshot.
		//
		// XXX This seems questionable: why do we do this? Envoy isn't currently started until
		// we have a real configuration...
		err = update(
			ctx,
			args.snapdirPath,
			args.numsnaps,
			args.edsBypass,
			config,
			configv3,
			&generation,
			args.dirs,
			edsEndpoints,
			edsEndpointsV3,
			fastpathSnapshot,
			updates,
		)
		if err != nil {
			return err
		}

		// This is the main loop where the magic happens.
		for {
			select {
			case <-sigCh:
				err := update(
					ctx,
					args.snapdirPath,
					args.numsnaps,
					args.edsBypass,
					config,
					configv3,
					&generation,
					args.dirs,
					edsEndpoints,
					edsEndpointsV3,
					fastpathSnapshot,
					updates,
				)
				if err != nil {
					return err
				}
			case fpSnap := <-fastpathCh:
				// Fastpath update. Grab new endpoints and update.
				if fpSnap.Endpoints != nil {
					edsEndpoints = fpSnap.Endpoints.ToMap_v2()
					edsEndpointsV3 = fpSnap.Endpoints.ToMap_v3()
				}
				fastpathSnapshot = fpSnap
				err := update(
					ctx,
					args.snapdirPath,
					args.numsnaps,
					args.edsBypass,
					config,
					configv3,
					&generation,
					args.dirs,
					edsEndpoints,
					edsEndpointsV3,
					fastpathSnapshot,
					updates,
				)
				if err != nil {
					return err
				}
			case <-watcher.Events:
				// Non-fastpath update. Just update.
				err := update(
					ctx,
					args.snapdirPath,
					args.numsnaps,
					args.edsBypass,
					config,
					configv3,
					&generation,
					args.dirs,
					edsEndpoints,
					edsEndpointsV3,
					fastpathSnapshot,
					updates,
				)
				if err != nil {
					return err
				}
			case err := <-watcher.Errors:
				// Something went wrong, so scream about that.
				dlog.Warnf(ctx, "Watcher error: %v", err)
			case <-ctx.Done():
				return nil
			}
		}
	})

	return grp.Wait()
}

