...

Source file src/github.com/datawire/ambassador/v2/pkg/envoytest/controller.go

Documentation: github.com/datawire/ambassador/v2/pkg/envoytest

     1  package envoytest
     2  
     3  import (
     4  	// standard library
     5  	"context"
     6  	"fmt"
     7  	"net"
     8  	"sync"
     9  	"testing"
    10  
    11  	// third-party libraries
    12  	"google.golang.org/genproto/googleapis/rpc/status"
    13  	"google.golang.org/grpc"
    14  
    15  	// envoy api v2
    16  	apiv2 "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2"
    17  	apiv2_core "github.com/datawire/ambassador/v2/pkg/api/envoy/api/v2/core"
    18  	apiv2_discovery "github.com/datawire/ambassador/v2/pkg/api/envoy/service/discovery/v2"
    19  
    20  	// envoy control plane
    21  	ecp_cache_types "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/types"
    22  	ecp_v2_cache "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/cache/v2"
    23  	ecp_v2_server "github.com/datawire/ambassador/v2/pkg/envoy-control-plane/server/v2"
    24  
    25  	// first-party-libraries
    26  	"github.com/datawire/dlib/dhttp"
    27  	"github.com/datawire/dlib/dlog"
    28  )
    29  
    30  // EnvoyController runs a go control plane for envoy that tracks ACKS/NACKS for configuration
    31  // updates. This allows code to know when envoy has successfully reconfigured as well as have access
    32  // to the error details when envoy is fed invalid configuration.
    33  type EnvoyController struct {
    34  	address string
    35  
    36  	configCache ecp_v2_cache.SnapshotCache
    37  
    38  	// Protects the errors and outstanding fields.
    39  	cond        *sync.Cond
    40  	errors      map[string]*errorInfo // Maps config version to error info related to that config
    41  	outstanding map[string]ackInfo    // Maps response nonce to config version and typeUrl
    42  
    43  	// Captured context for logging callbacks.
    44  	logCtx context.Context
    45  }
    46  
    47  // ackInfo is used to correlate the nonce supplied in discovery responses to the error detail
    48  // supplied in discovery requests.
    49  type ackInfo struct {
    50  	version string
    51  	typeUrl string
    52  }
    53  
    54  // Holds the error info associated with a configuration version. The details map is keyed by typeUrl and has
    55  type errorInfo struct {
    56  	version string
    57  	details map[string]*status.Status // keyed by typeUrl
    58  }
    59  
    60  func (e *errorInfo) String() string {
    61  	return fmt.Sprintf("%s %v", e.version, e.details)
    62  }
    63  
    64  // NewEnvoyControler creates a new envoy controller that binds to the supplied address when Run.
    65  func NewEnvoyController(address string) *EnvoyController {
    66  	result := &EnvoyController{
    67  		address:     address,
    68  		cond:        sync.NewCond(&sync.Mutex{}),
    69  		errors:      map[string]*errorInfo{},
    70  		outstanding: map[string]ackInfo{},
    71  	}
    72  	result.configCache = ecp_v2_cache.NewSnapshotCache(true, result, result)
    73  	return result
    74  }
    75  
    76  // Configure will update the envoy configuration and block until the reconfiguration either succeeds
    77  // or signals an error.
    78  func (e *EnvoyController) Configure(node, version string, snapshot ecp_v2_cache.Snapshot) (*status.Status, error) {
    79  	err := e.configCache.SetSnapshot(node, snapshot)
    80  	if err != nil {
    81  		return nil, err
    82  	}
    83  
    84  	// Versioning happens on a per type basis, so we need to figure out how many versions will be
    85  	// requested in order to figure out how to properly check that the entire snapshot was
    86  	// acked/nacked.
    87  	typeUrls := []string{}
    88  	if len(snapshot.Resources[ecp_cache_types.Endpoint].Items) > 0 {
    89  		typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.ClusterLoadAssignment")
    90  	}
    91  	if len(snapshot.Resources[ecp_cache_types.Cluster].Items) > 0 {
    92  		typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.Cluster")
    93  	}
    94  	if len(snapshot.Resources[ecp_cache_types.Route].Items) > 0 {
    95  		typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.RouteConfiguration")
    96  	}
    97  	if len(snapshot.Resources[ecp_cache_types.Listener].Items) > 0 {
    98  		typeUrls = append(typeUrls, "type.googleapis.com/envoy.api.v2.Listener")
    99  	}
   100  
   101  	for _, t := range typeUrls {
   102  		status := e.waitFor(version, t)
   103  		if status != nil {
   104  			return status, nil
   105  		}
   106  	}
   107  
   108  	return nil, nil
   109  }
   110  
   111  // waitFor blocks until the supplied version and typeUrl are acknowledged by envoy. It returns the
   112  // status if there is an error and nil if the configuration is successfully accepted by envoy.
   113  func (e *EnvoyController) waitFor(version string, typeUrl string) *status.Status {
   114  	e.cond.L.Lock()
   115  	defer e.cond.L.Unlock()
   116  	for {
   117  		error, ok := e.errors[version]
   118  		if ok {
   119  			for k, v := range error.details {
   120  				if v != nil {
   121  					return v
   122  				}
   123  				if k == typeUrl {
   124  					return v
   125  				}
   126  			}
   127  		}
   128  		e.cond.Wait()
   129  	}
   130  }
   131  
   132  // Run the ADS server.
   133  func (e *EnvoyController) Run(ctx context.Context) error {
   134  	// The callbacks don't have access to a context, so we'll capture this one for them to use.
   135  	e.logCtx = ctx
   136  
   137  	grpcServer := grpc.NewServer()
   138  	srv := ecp_v2_server.NewServer(ctx, e.configCache, e)
   139  
   140  	apiv2_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, srv)
   141  	apiv2.RegisterEndpointDiscoveryServiceServer(grpcServer, srv)
   142  	apiv2.RegisterClusterDiscoveryServiceServer(grpcServer, srv)
   143  	apiv2.RegisterRouteDiscoveryServiceServer(grpcServer, srv)
   144  	apiv2.RegisterListenerDiscoveryServiceServer(grpcServer, srv)
   145  
   146  	lis, err := net.Listen("tcp", e.address)
   147  	if err != nil {
   148  		return err
   149  	}
   150  
   151  	sc := &dhttp.ServerConfig{
   152  		Handler: grpcServer,
   153  	}
   154  	if err := sc.Serve(ctx, lis); err != nil {
   155  		if err != nil && err != context.Canceled {
   156  			return err
   157  		}
   158  	}
   159  	return nil
   160  }
   161  
   162  // SetupEnvoyController will create and run an EnvoyController with the supplied address as well as
   163  // registering a Cleanup function to shutdown the EnvoyController.
   164  func SetupEnvoyController(t *testing.T, address string) *EnvoyController {
   165  	e := NewEnvoyController(address)
   166  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, false))
   167  	done := make(chan struct{})
   168  	t.Cleanup(func() {
   169  		cancel()
   170  		<-done
   171  	})
   172  	go func() {
   173  		err := e.Run(ctx)
   174  		if err != nil {
   175  			t.Errorf("envoy controller exited with error: %+v", err)
   176  		}
   177  		close(done)
   178  	}()
   179  	return e
   180  }
   181  
   182  // ID is a callback function that the go control plane uses. I don't know what it does.
   183  func (e EnvoyController) ID(node *apiv2_core.Node) string {
   184  	if node == nil {
   185  		return "unknown"
   186  	}
   187  	return node.Id
   188  }
   189  
   190  // OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
   191  func (e *EnvoyController) OnStreamOpen(_ context.Context, sid int64, stype string) error {
   192  	//e.Infof("Stream open[%v]: %v", sid, stype)
   193  	return nil
   194  }
   195  
   196  // OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
   197  func (e *EnvoyController) OnStreamClosed(sid int64) {
   198  	//e.Infof("Stream closed[%v]", sid)
   199  }
   200  
   201  // OnStreamRequest is called once a request is received on a stream.
   202  func (e *EnvoyController) OnStreamRequest(sid int64, req *apiv2.DiscoveryRequest) error {
   203  	//e.Infof("Stream request[%v]: %v", sid, req.TypeUrl)
   204  
   205  	func() {
   206  		e.cond.L.Lock()
   207  		defer e.cond.L.Unlock()
   208  		ackInfo, ok := e.outstanding[req.ResponseNonce]
   209  		if ok {
   210  			errors, ok := e.errors[ackInfo.version]
   211  			if !ok {
   212  				errors = &errorInfo{version: ackInfo.version, details: map[string]*status.Status{}}
   213  				e.errors[ackInfo.version] = errors
   214  			}
   215  			errors.details[ackInfo.typeUrl] = req.ErrorDetail
   216  			delete(e.outstanding, req.ResponseNonce)
   217  		}
   218  		e.cond.Broadcast()
   219  	}()
   220  
   221  	return nil
   222  }
   223  
   224  // OnStreamResponse is called immediately prior to sending a response on a stream.
   225  func (e *EnvoyController) OnStreamResponse(sid int64, req *apiv2.DiscoveryRequest, res *apiv2.DiscoveryResponse) {
   226  	//e.Infof("Stream response[%v]: %v -> %v", sid, req.TypeUrl, res.Nonce)
   227  	func() {
   228  		e.cond.L.Lock()
   229  		defer e.cond.L.Unlock()
   230  		e.outstanding[res.Nonce] = ackInfo{res.VersionInfo, res.TypeUrl}
   231  	}()
   232  }
   233  
   234  // OnFetchRequest is called for each Fetch request
   235  func (e *EnvoyController) OnFetchRequest(_ context.Context, r *apiv2.DiscoveryRequest) error {
   236  	//e.Infof("Fetch request: %v", r)
   237  	return nil
   238  }
   239  
   240  // OnFetchResponse is called immediately prior to sending a response.
   241  func (e *EnvoyController) OnFetchResponse(req *apiv2.DiscoveryRequest, res *apiv2.DiscoveryResponse) {
   242  	//e.Infof("Fetch response: %v -> %v", req, res)
   243  }
   244  
   245  // The go control plane requires a logger to be injected. These methods implement the logger
   246  // interface.
   247  func (e *EnvoyController) Debugf(format string, args ...interface{}) {
   248  	dlog.Debugf(e.logCtx, format, args...)
   249  }
   250  func (e *EnvoyController) Infof(format string, args ...interface{}) {
   251  	dlog.Infof(e.logCtx, format, args...)
   252  }
   253  func (e *EnvoyController) Warnf(format string, args ...interface{}) {
   254  	dlog.Warnf(e.logCtx, format, args...)
   255  }
   256  func (e *EnvoyController) Errorf(format string, args ...interface{}) {
   257  	dlog.Errorf(e.logCtx, format, args...)
   258  }
   259  

View as plain text