...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/envoytest/controller.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/envoytest

     1  package envoytest
     2  
     3  import (
     4  	// standard library
     5  	"context"
     6  	"fmt"
     7  	"sync"
     8  
     9  	// third-party libraries
    10  	"google.golang.org/genproto/googleapis/rpc/status"
    11  	"google.golang.org/grpc"
    12  
    13  	// envoy api v3
    14  	v3core "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/config/core/v3"
    15  	v3cluster "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/cluster/v3"
    16  	v3discovery "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/discovery/v3"
    17  	v3endpoint "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/endpoint/v3"
    18  	v3listener "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/listener/v3"
    19  	v3route "github.com/emissary-ingress/emissary/v3/pkg/api/envoy/service/route/v3"
    20  
    21  	// envoy control plane
    22  	ecp_v3_cache "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/cache/v3"
    23  	ecp_log "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/log"
    24  	ecp_v3_resource "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/resource/v3"
    25  	ecp_v3_server "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
    26  
    27  	// first-party-libraries
    28  	"github.com/datawire/dlib/dhttp"
    29  	"github.com/datawire/dlib/dlog"
    30  )
    31  
    32  // EnvoyController runs a go control plane for envoy that tracks ACKS/NACKS for configuration
    33  // updates. This allows code to know when envoy has successfully reconfigured as well as have access
    34  // to the error details when envoy is fed invalid configuration.
    35  type EnvoyController struct {
    36  	address string
    37  
    38  	configCache ecp_v3_cache.SnapshotCache
    39  
    40  	cond        *sync.Cond            // Protects the 'results' and 'outstanding'
    41  	results     map[string]*errorInfo // Maps config version to error info related to that config
    42  	outstanding map[string]ackInfo    // Maps response nonce to config version and typeURL
    43  
    44  	// logCtx gets set when .Run() starts.
    45  	logCtx context.Context
    46  }
    47  
    48  // ackInfo is used to correlate the nonce supplied in discovery responses to the error detail
    49  // supplied in discovery requests.
    50  type ackInfo struct {
    51  	version string
    52  	typeURL string
    53  }
    54  
    55  // Holds the error info associated with a configuration version. The details map is keyed by typeURL
    56  // and has
    57  type errorInfo struct {
    58  	version string
    59  	details map[string]*status.Status // keyed by typeURL
    60  }
    61  
    62  func (e *errorInfo) String() string {
    63  	return fmt.Sprintf("%s %v", e.version, e.details)
    64  }
    65  
    66  // NewEnvoyControler creates a new envoy controller that binds to the supplied address when Run.
    67  func NewEnvoyController(address string) *EnvoyController {
    68  	ret := &EnvoyController{
    69  		address:     address,
    70  		cond:        sync.NewCond(&sync.Mutex{}),
    71  		results:     map[string]*errorInfo{},
    72  		outstanding: map[string]ackInfo{},
    73  	}
    74  
    75  	ret.configCache = ecp_v3_cache.NewSnapshotCache(
    76  		true,              // ads
    77  		ecNodeHash{},      // hash
    78  		ecLogger{ec: ret}, // logger
    79  	)
    80  	return ret
    81  }
    82  
    83  // Configure will update the envoy configuration and block until the reconfiguration either succeeds
    84  // or signals an error.
    85  func (e *EnvoyController) Configure(ctx context.Context, node, version string, snapshot ecp_v3_cache.ResourceSnapshot) (*status.Status, error) {
    86  
    87  	err := e.configCache.SetSnapshot(ctx, node, snapshot)
    88  	if err != nil {
    89  		return nil, err
    90  	}
    91  
    92  	// Versioning happens on a per type basis, so we need to figure out how many versions will be
    93  	// requested in order to figure out how to properly check that the entire snapshot was
    94  	// acked/nacked.
    95  	var typeURLs []string
    96  
    97  	if len(snapshot.GetResources(ecp_v3_resource.EndpointType)) > 0 {
    98  		typeURLs = append(typeURLs, ecp_v3_resource.EndpointType)
    99  	}
   100  	if len(snapshot.GetResources(ecp_v3_resource.ClusterType)) > 0 {
   101  		typeURLs = append(typeURLs, ecp_v3_resource.ClusterType)
   102  	}
   103  	if len(snapshot.GetResources(ecp_v3_resource.RouteType)) > 0 {
   104  		typeURLs = append(typeURLs, ecp_v3_resource.RouteType)
   105  	}
   106  	if len(snapshot.GetResources(ecp_v3_resource.ListenerType)) > 0 {
   107  		typeURLs = append(typeURLs, ecp_v3_resource.ListenerType)
   108  	}
   109  
   110  	for _, t := range typeURLs {
   111  		status, err := e.waitFor(ctx, version, t)
   112  		if err != nil {
   113  			return nil, err
   114  		}
   115  		if status != nil {
   116  			return status, nil
   117  		}
   118  	}
   119  
   120  	return nil, nil
   121  }
   122  
   123  // waitFor blocks until the supplied version and typeURL are acknowledged by envoy. It returns the
   124  // status if there is an error and nil if the configuration is successfully accepted by envoy.
   125  func (e *EnvoyController) waitFor(ctx context.Context, version string, typeURL string) (*status.Status, error) {
   126  	ctx, cancel := context.WithCancel(ctx)
   127  	defer func() {
   128  		cancel()
   129  	}()
   130  	go func() {
   131  		<-ctx.Done()
   132  		e.cond.L.Lock()
   133  		defer e.cond.L.Unlock()
   134  		e.cond.Broadcast()
   135  	}()
   136  
   137  	var (
   138  		retStatus *status.Status
   139  		retErr    error
   140  	)
   141  
   142  	condition := func() bool {
   143  		// If the Context was canceled, then go ahead and bail early.
   144  		if err := ctx.Err(); err != nil {
   145  			retErr = err
   146  			return true
   147  		}
   148  		// See if our 'version' has a result yet.
   149  		result, ok := e.results[version]
   150  		if !ok {
   151  			return false
   152  		}
   153  		// Does our typeURL within that result have a status?
   154  		if status, ok := result.details[typeURL]; ok {
   155  			retStatus = status
   156  			return true
   157  		}
   158  		// OK, our 'version' has a result, but our typeURL doesn't have a status within it.
   159  		// Do any other typeURLs within the result have an error status that we can return?
   160  		for _, status := range result.details {
   161  			if status != nil {
   162  				retStatus = status
   163  				return true
   164  			}
   165  		}
   166  		// Darn, we didn't find anything worth returning.
   167  		return false
   168  	}
   169  
   170  	e.cond.L.Lock()
   171  	defer e.cond.L.Unlock()
   172  	for !condition() {
   173  		e.cond.Wait()
   174  	}
   175  	return retStatus, retErr
   176  }
   177  
   178  // Run the ADS server.
   179  func (e *EnvoyController) Run(ctx context.Context) error {
   180  	// The callbacks don't have access to a context, so we'll capture this one for them to use.
   181  	e.logCtx = ctx
   182  
   183  	srv := ecp_v3_server.NewServer(ctx,
   184  		e.configCache,      // config
   185  		ecCallbacks{ec: e}, // calbacks
   186  	)
   187  
   188  	grpcMux := grpc.NewServer()
   189  	v3discovery.RegisterAggregatedDiscoveryServiceServer(grpcMux, srv)
   190  	v3endpoint.RegisterEndpointDiscoveryServiceServer(grpcMux, srv)
   191  	v3cluster.RegisterClusterDiscoveryServiceServer(grpcMux, srv)
   192  	v3route.RegisterRouteDiscoveryServiceServer(grpcMux, srv)
   193  	v3listener.RegisterListenerDiscoveryServiceServer(grpcMux, srv)
   194  
   195  	sc := &dhttp.ServerConfig{
   196  		Handler: grpcMux,
   197  	}
   198  	return sc.ListenAndServe(ctx, e.address)
   199  }
   200  
   201  ////////////////////////////////////////////////////////////////////////////////
   202  
   203  type ecNodeHash struct{}
   204  
   205  var _ ecp_v3_cache.NodeHash = ecNodeHash{}
   206  
   207  // ID implements ecp_v3_cache.NodeHash.
   208  func (ecNodeHash) ID(node *v3core.Node) string {
   209  	if node == nil {
   210  		return "unknown"
   211  	}
   212  	return node.Id
   213  }
   214  
   215  ////////////////////////////////////////////////////////////////////////////////
   216  
   217  type ecCallbacks struct {
   218  	ec *EnvoyController
   219  }
   220  
   221  var _ ecp_v3_server.Callbacks = ecCallbacks{}
   222  
   223  // OnStreamOpen implements ecp_v3_server.Callbacks.
   224  func (ecc ecCallbacks) OnStreamOpen(_ context.Context, sid int64, stype string) error {
   225  	//e.Infof("Stream open[%v]: %v", sid, stype)
   226  	return nil
   227  }
   228  
   229  // OnStreamClosed implements ecp_v3_server.Callbacks.
   230  func (ecc ecCallbacks) OnStreamClosed(sid int64, node *v3core.Node) {
   231  	//e.Infof("Stream closed[%v]", sid)
   232  }
   233  
   234  // OnStreamRequest implements ecp_v2_server.Callbacks.
   235  func (ecc ecCallbacks) OnStreamRequest(sid int64, req *v3discovery.DiscoveryRequest) error {
   236  	//e.Infof("Stream request[%v]: %v", sid, req.TypeURL)
   237  
   238  	ecc.ec.cond.L.Lock()
   239  	defer ecc.ec.cond.L.Unlock()
   240  	defer ecc.ec.cond.Broadcast()
   241  
   242  	if ackInfo, ok := ecc.ec.outstanding[req.ResponseNonce]; ok {
   243  		results, ok := ecc.ec.results[ackInfo.version]
   244  		if !ok {
   245  			results = &errorInfo{version: ackInfo.version, details: map[string]*status.Status{}}
   246  			ecc.ec.results[ackInfo.version] = results
   247  		}
   248  		results.details[ackInfo.typeURL] = req.ErrorDetail
   249  		delete(ecc.ec.outstanding, req.ResponseNonce)
   250  	}
   251  
   252  	return nil
   253  }
   254  
   255  // OnStreamResponse implements ecp_v3_server.Callbacks.
   256  func (ecc ecCallbacks) OnStreamResponse(ctx context.Context, sid int64, req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
   257  	//e.Infof("Stream response[%v]: %v -> %v", sid, req.TypeURL, res.Nonce)
   258  
   259  	ecc.ec.cond.L.Lock()
   260  	defer ecc.ec.cond.L.Unlock()
   261  	defer ecc.ec.cond.Broadcast()
   262  
   263  	ecc.ec.outstanding[res.Nonce] = ackInfo{res.VersionInfo, res.TypeUrl}
   264  
   265  }
   266  
   267  // OnFetchRequest implements ecp_v3_server.Callbacks.
   268  func (ecc ecCallbacks) OnFetchRequest(_ context.Context, r *v3discovery.DiscoveryRequest) error {
   269  	//e.Infof("Fetch request: %v", r)
   270  	return nil
   271  }
   272  
   273  // OnFetchResponse implements ecp_v3_server.Callbacks.
   274  func (ecc ecCallbacks) OnFetchResponse(req *v3discovery.DiscoveryRequest, res *v3discovery.DiscoveryResponse) {
   275  	//e.Infof("Fetch response: %v -> %v", req, res)
   276  }
   277  
   278  // OnDeltaStreamOpen implements ecp_v3_server.Callbacks.
   279  func (ecc ecCallbacks) OnDeltaStreamOpen(ctx context.Context, sid int64, stype string) error {
   280  	return nil
   281  }
   282  
   283  // OnDeltaStreamClosed implements ecp_v3_server.Callbacks.
   284  func (ecc ecCallbacks) OnDeltaStreamClosed(sid int64, node *v3core.Node) {
   285  }
   286  
   287  // OnStreamDeltaRequest implements ecp_v3_server.Callbacks.
   288  func (ecc ecCallbacks) OnStreamDeltaRequest(sid int64, req *v3discovery.DeltaDiscoveryRequest) error {
   289  	return nil
   290  }
   291  
   292  // OnStreamDelatResponse implements ecp_v3_server.Callbacks.
   293  func (ecc ecCallbacks) OnStreamDeltaResponse(sid int64, req *v3discovery.DeltaDiscoveryRequest, res *v3discovery.DeltaDiscoveryResponse) {
   294  }
   295  
   296  ////////////////////////////////////////////////////////////////////////////////
   297  
   298  type ecLogger struct {
   299  	ec *EnvoyController
   300  }
   301  
   302  var _ ecp_log.Logger = ecLogger{}
   303  
   304  // Debugf implements ecp_log.Logger.
   305  func (ecl ecLogger) Debugf(format string, args ...interface{}) {
   306  	dlog.Debugf(ecl.ec.logCtx, format, args...)
   307  }
   308  
   309  // Infof implements ecp_log.Logger.
   310  func (ecl ecLogger) Infof(format string, args ...interface{}) {
   311  	dlog.Infof(ecl.ec.logCtx, format, args...)
   312  }
   313  
   314  // Warnf implements ecp_log.Logger.
   315  func (ecl ecLogger) Warnf(format string, args ...interface{}) {
   316  	dlog.Warnf(ecl.ec.logCtx, format, args...)
   317  }
   318  
   319  // Errorf implements ecp_log.Logger.
   320  func (ecl ecLogger) Errorf(format string, args ...interface{}) {
   321  	dlog.Errorf(ecl.ec.logCtx, format, args...)
   322  }
   323  

View as plain text