...
1
2 package test
3
4 import (
5 "context"
6 "fmt"
7 "log"
8 "net"
9 "net/http"
10 "time"
11
12 "google.golang.org/grpc"
13 "google.golang.org/grpc/keepalive"
14
15 server "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/server/v3"
16 "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/test/v3"
17
18 gcplogger "github.com/emissary-ingress/emissary/v3/pkg/envoy-control-plane/log"
19 )
20
// Tuning values used by RunManagementServer when it builds its gRPC server.
const (
	// grpcKeepaliveTime is supplied as keepalive.ServerParameters.Time.
	grpcKeepaliveTime = 30 * time.Second
	// grpcKeepaliveTimeout is supplied as keepalive.ServerParameters.Timeout.
	grpcKeepaliveTimeout = 5 * time.Second
	// grpcKeepaliveMinTime is supplied as keepalive.EnforcementPolicy.MinTime.
	grpcKeepaliveMinTime = 30 * time.Second
	// grpcMaxConcurrentStreams is supplied to grpc.MaxConcurrentStreams.
	grpcMaxConcurrentStreams = 1_000_000
)
27
28
29
30 type HTTPGateway struct {
31
32 Log gcplogger.Logger
33
34 Gateway server.HTTPGateway
35 }
36
37
38 func RunAccessLogServer(ctx context.Context, als *test.AccessLogService, alsPort uint) {
39 grpcServer := grpc.NewServer()
40 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", alsPort))
41 if err != nil {
42 log.Fatal(err)
43 }
44
45 test.RegisterAccessLogServer(grpcServer, als)
46 log.Printf("access log server listening on %d\n", alsPort)
47
48 go func() {
49 if err = grpcServer.Serve(lis); err != nil {
50 log.Println(err)
51 }
52 }()
53 <-ctx.Done()
54
55 grpcServer.GracefulStop()
56 }
57
58
59 func RunManagementServer(ctx context.Context, srv server.Server, port uint) {
60
61
62
63
64 var grpcOptions []grpc.ServerOption
65 grpcOptions = append(grpcOptions,
66 grpc.MaxConcurrentStreams(grpcMaxConcurrentStreams),
67 grpc.KeepaliveParams(keepalive.ServerParameters{
68 Time: grpcKeepaliveTime,
69 Timeout: grpcKeepaliveTimeout,
70 }),
71 grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
72 MinTime: grpcKeepaliveMinTime,
73 PermitWithoutStream: true,
74 }),
75 )
76 grpcServer := grpc.NewServer(grpcOptions...)
77
78 lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
79 if err != nil {
80 log.Fatal(err)
81 }
82
83 test.RegisterServer(grpcServer, srv)
84
85 log.Printf("management server listening on %d\n", port)
86 go func() {
87 if err = grpcServer.Serve(lis); err != nil {
88 log.Println(err)
89 }
90 }()
91 <-ctx.Done()
92
93 grpcServer.GracefulStop()
94 }
95
96
97 func RunManagementGateway(ctx context.Context, srv server.Server, port uint) {
98 log.Printf("gateway listening HTTP/1.1 on %d\n", port)
99
100
101 server := &http.Server{
102 Addr: fmt.Sprintf(":%d", port),
103 Handler: &HTTPGateway{
104 Gateway: server.HTTPGateway{Server: srv},
105 },
106 }
107 go func() {
108 if err := server.ListenAndServe(); err != nil {
109 log.Printf("failed to start listening: %s", err)
110 }
111 }()
112 <-ctx.Done()
113
114
115 if err := server.Shutdown(ctx); err != nil {
116 log.Printf("failed to shut down: %s", err)
117 }
118 }
119
120 func (h *HTTPGateway) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
121 bytes, code, err := h.Gateway.ServeHTTP(req)
122
123 if err != nil {
124 http.Error(resp, err.Error(), code)
125 return
126 }
127
128 if bytes == nil {
129 resp.WriteHeader(http.StatusNotModified)
130 return
131 }
132
133 if _, err = resp.Write(bytes); err != nil && h.Log != nil {
134 h.Log.Errorf("gateway error: %v", err)
135 }
136 }
137
View as plain text