1 package services
2
3 import (
4
5 "bytes"
6 "context"
7 "crypto/tls"
8 "encoding/json"
9 "fmt"
10 "strconv"
11 "strings"
12 "time"
13
14
15 grpc "google.golang.org/grpc"
16 codes "google.golang.org/grpc/codes"
17 metadata "google.golang.org/grpc/metadata"
18 status "google.golang.org/grpc/status"
19
20
21 pb "github.com/emissary-ingress/emissary/v3/pkg/api/kat"
22
23
24 "github.com/datawire/dlib/dgroup"
25 "github.com/datawire/dlib/dhttp"
26 "github.com/datawire/dlib/dlog"
27 )
28
29
30 type GRPC struct {
31 Port int16
32 Backend string
33 SecurePort int16
34 SecureBackend string
35 Cert string
36 Key string
37
38 pb.UnsafeEchoServiceServer
39 }
40
41
42 func DefaultOpts() []grpc.ServerOption {
43 return []grpc.ServerOption{
44 grpc.MaxRecvMsgSize(1024 * 1024 * 5),
45 grpc.MaxSendMsgSize(1024 * 1024 * 5),
46 }
47 }
48
49
50 func (g *GRPC) Start(ctx context.Context) <-chan bool {
51 dlog.Printf(ctx, "GRPC: %s listening on %d/%d", g.Backend, g.Port, g.SecurePort)
52
53 grpcHandler := grpc.NewServer(DefaultOpts()...)
54 pb.RegisterEchoServiceServer(grpcHandler, g)
55
56 cer, err := tls.LoadX509KeyPair(g.Cert, g.Key)
57 if err != nil {
58 dlog.Error(ctx, err)
59 panic(err)
60 }
61
62 sc := &dhttp.ServerConfig{
63 Handler: grpcHandler,
64 TLSConfig: &tls.Config{
65 Certificates: []tls.Certificate{cer},
66 },
67 }
68
69 grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
70 grp.Go("cleartext", func(ctx context.Context) error {
71 return sc.ListenAndServe(ctx, fmt.Sprintf(":%v", g.Port))
72 })
73 grp.Go("tls", func(ctx context.Context) error {
74 return sc.ListenAndServeTLS(ctx, fmt.Sprintf(":%v", g.SecurePort), "", "")
75 })
76
77 dlog.Print(ctx, "starting gRPC echo service")
78
79 exited := make(chan bool)
80 go func() {
81 if err := grp.Wait(); err != nil {
82 dlog.Error(ctx, err)
83 panic(err)
84 }
85 close(exited)
86 }()
87 return exited
88 }
89
90
91 func (g *GRPC) Echo(ctx context.Context, r *pb.EchoRequest) (*pb.EchoResponse, error) {
92 md, ok := metadata.FromIncomingContext(ctx)
93 if !ok {
94 return nil, status.Error(codes.Code(13), "request has not valid context metadata")
95 }
96
97 buf := bytes.Buffer{}
98 buf.WriteString("metadata received: \n")
99 for k, v := range md {
100 buf.WriteString(fmt.Sprintf("%v : %s\n", k, strings.Join(v, ",")))
101 }
102 dlog.Println(ctx, buf.String())
103
104 request := &pb.Request{
105 Headers: make(map[string]string),
106 }
107
108 response := &pb.Response{
109 Headers: make(map[string]string),
110 }
111
112
113 for k, v := range md {
114 request.Headers[k] = strings.Join(v, ",")
115 response.Headers[k] = strings.Join(v, ",")
116 }
117
118
119 backend := g.Backend
120
121
122 if len(md["x-forwarded-proto"]) > 0 && md["x-forwarded-proto"][0] == "https" {
123
124 backend = g.SecureBackend
125 request.Tls = &pb.TLS{
126 Enabled: true,
127 }
128 }
129
130
131 if h, ok := md["kat-req-echo-requested-backend-delay"]; ok {
132 if v, err := strconv.Atoi(h[0]); err == nil {
133 dlog.Printf(ctx, "Delaying response by %v ms", v)
134 time.Sleep(time.Duration(v) * time.Millisecond)
135 }
136 }
137
138
139 response.Headers["date"] = time.Now().Format(time.RFC1123)
140
141
142 for _, v := range md["kat-req-echo-requested-headers"] {
143 if len(md[v]) > 0 {
144 s := strings.Join(md[v], ",")
145 response.Headers[v] = s
146 p := metadata.Pairs(v, s)
147 if err := grpc.SendHeader(ctx, p); err != nil {
148 return nil, err
149 }
150 }
151 }
152
153
154 echoRES := &pb.EchoResponse{
155 Backend: backend,
156 Request: request,
157 Response: response,
158 }
159
160
161 if data, err := json.MarshalIndent(echoRES, "", " "); err == nil {
162 dlog.Printf(ctx, "setting response: %s\n", string(data))
163 }
164
165
166 if len(md["kat-req-echo-requested-status"]) > 0 {
167 val, err := strconv.Atoi(md["kat-req-echo-requested-status"][0])
168 if err == nil {
169 if val < 18 || val > 0 {
170
171 return echoRES, status.Error(codes.Code(val), "kat-req-echo-requested-status")
172 }
173 }
174 }
175
176
177 return echoRES, nil
178 }
179
View as plain text