...

Source file src/github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/server.go

Documentation: github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test

     1  // Package test contains test utilities
     2  package test
     3  
     4  import (
     5  	"context"
     6  	"fmt"
     7  	"log"
     8  	"net"
     9  	"net/http"
    10  	"time"
    11  
    12  	"google.golang.org/grpc"
    13  	"google.golang.org/grpc/keepalive"
    14  
    15  	server "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
    16  	"github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/v3"
    17  
    18  	gcplogger "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/log"
    19  )
    20  
    21  const (
    22  	grpcKeepaliveTime        = 30 * time.Second
    23  	grpcKeepaliveTimeout     = 5 * time.Second
    24  	grpcKeepaliveMinTime     = 30 * time.Second
    25  	grpcMaxConcurrentStreams = 1000000
    26  )
    27  
    28  // HTTPGateway is a custom implementation of [gRPC gateway](https://github.com/grpc-ecosystem/grpc-gateway)
    29  // specialized to Envoy xDS API.
    30  type HTTPGateway struct {
    31  	// Log is an optional log for errors in response write
    32  	Log gcplogger.Logger
    33  
    34  	Gateway server.HTTPGateway
    35  }
    36  
    37  // RunAccessLogServer starts an accesslog server.
    38  func RunAccessLogServer(ctx context.Context, als *test.AccessLogService, alsPort uint) {
    39  	grpcServer := grpc.NewServer()
    40  	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", alsPort))
    41  	if err != nil {
    42  		log.Fatal(err)
    43  	}
    44  
    45  	test.RegisterAccessLogServer(grpcServer, als)
    46  	log.Printf("access log server listening on %d\n", alsPort)
    47  
    48  	go func() {
    49  		if err = grpcServer.Serve(lis); err != nil {
    50  			log.Println(err)
    51  		}
    52  	}()
    53  	<-ctx.Done()
    54  
    55  	grpcServer.GracefulStop()
    56  }
    57  
    58  // RunManagementServer starts an xDS server at the given port.
    59  func RunManagementServer(ctx context.Context, srv server.Server, port uint) {
    60  	// gRPC golang library sets a very small upper bound for the number gRPC/h2
    61  	// streams over a single TCP connection. If a proxy multiplexes requests over
    62  	// a single connection to the management server, then it might lead to
    63  	// availability problems. Keepalive timeouts based on connection_keepalive parameter https://www.envoyproxy.io/docs/envoy/latest/configuration/overview/examples#dynamic
    64  	var grpcOptions []grpc.ServerOption
    65  	grpcOptions = append(grpcOptions,
    66  		grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
    67  		grpc.KeepaliveParams(keepalive.ServerParameters{
    68  			Time:    grpcKeepaliveTime,
    69  			Timeout: grpcKeepaliveTimeout,
    70  		}),
    71  		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
    72  			MinTime:             grpcKeepaliveMinTime,
    73  			PermitWithoutStream: true,
    74  		}),
    75  	)
    76  	grpcServer := grpc.NewServer(grpcOptions...)
    77  
    78  	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
    79  	if err != nil {
    80  		log.Fatal(err)
    81  	}
    82  
    83  	test.RegisterServer(grpcServer, srv)
    84  
    85  	log.Printf("management server listening on %d\n", port)
    86  	go func() {
    87  		if err = grpcServer.Serve(lis); err != nil {
    88  			log.Println(err)
    89  		}
    90  	}()
    91  	<-ctx.Done()
    92  
    93  	grpcServer.GracefulStop()
    94  }
    95  
    96  // RunManagementGateway starts an HTTP gateway to an xDS server.
    97  func RunManagementGateway(ctx context.Context, srv server.Server, port uint) {
    98  	log.Printf("gateway listening HTTP/1.1 on %d\n", port)
    99  	// Ignore: G114: Use of net/http serve function that has no support for setting timeouts
   100  	// nolint:gosec
   101  	server := &http.Server{
   102  		Addr: fmt.Sprintf(":%d", port),
   103  		Handler: &HTTPGateway{
   104  			Gateway: server.HTTPGateway{Server: srv},
   105  		},
   106  	}
   107  	go func() {
   108  		if err := server.ListenAndServe(); err != nil {
   109  			log.Printf("failed to start listening: %s", err)
   110  		}
   111  	}()
   112  	<-ctx.Done()
   113  
   114  	// Cleanup our gateway if we receive a shutdown
   115  	if err := server.Shutdown(ctx); err != nil {
   116  		log.Printf("failed to shut down: %s", err)
   117  	}
   118  }
   119  
   120  func (h *HTTPGateway) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
   121  	bytes, code, err := h.Gateway.ServeHTTP(req)
   122  
   123  	if err != nil {
   124  		http.Error(resp, err.Error(), code)
   125  		return
   126  	}
   127  
   128  	if bytes == nil {
   129  		resp.WriteHeader(http.StatusNotModified)
   130  		return
   131  	}
   132  
   133  	if _, err = resp.Write(bytes); err != nil && h.Log != nil {
   134  		h.Log.Errorf("gateway error: %v", err)
   135  	}
   136  }
   137  

View as plain text