1
18
19
20
21
22
23
24 package transport
25
26 import (
27 "bytes"
28 "context"
29 "errors"
30 "fmt"
31 "io"
32 "net"
33 "net/http"
34 "strings"
35 "sync"
36 "time"
37
38 "golang.org/x/net/http2"
39 "google.golang.org/grpc/codes"
40 "google.golang.org/grpc/credentials"
41 "google.golang.org/grpc/internal/grpclog"
42 "google.golang.org/grpc/internal/grpcutil"
43 "google.golang.org/grpc/metadata"
44 "google.golang.org/grpc/peer"
45 "google.golang.org/grpc/stats"
46 "google.golang.org/grpc/status"
47 "google.golang.org/protobuf/proto"
48 )
49
50
51
52
53 func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats []stats.Handler) (ServerTransport, error) {
54 if r.Method != http.MethodPost {
55 w.Header().Set("Allow", http.MethodPost)
56 msg := fmt.Sprintf("invalid gRPC request method %q", r.Method)
57 http.Error(w, msg, http.StatusMethodNotAllowed)
58 return nil, errors.New(msg)
59 }
60 contentType := r.Header.Get("Content-Type")
61
62 contentSubtype, validContentType := grpcutil.ContentSubtype(contentType)
63 if !validContentType {
64 msg := fmt.Sprintf("invalid gRPC request content-type %q", contentType)
65 http.Error(w, msg, http.StatusUnsupportedMediaType)
66 return nil, errors.New(msg)
67 }
68 if r.ProtoMajor != 2 {
69 msg := "gRPC requires HTTP/2"
70 http.Error(w, msg, http.StatusHTTPVersionNotSupported)
71 return nil, errors.New(msg)
72 }
73 if _, ok := w.(http.Flusher); !ok {
74 msg := "gRPC requires a ResponseWriter supporting http.Flusher"
75 http.Error(w, msg, http.StatusInternalServerError)
76 return nil, errors.New(msg)
77 }
78
79 var localAddr net.Addr
80 if la := r.Context().Value(http.LocalAddrContextKey); la != nil {
81 localAddr, _ = la.(net.Addr)
82 }
83 var authInfo credentials.AuthInfo
84 if r.TLS != nil {
85 authInfo = credentials.TLSInfo{State: *r.TLS, CommonAuthInfo: credentials.CommonAuthInfo{SecurityLevel: credentials.PrivacyAndIntegrity}}
86 }
87 p := peer.Peer{
88 Addr: strAddr(r.RemoteAddr),
89 LocalAddr: localAddr,
90 AuthInfo: authInfo,
91 }
92 st := &serverHandlerTransport{
93 rw: w,
94 req: r,
95 closedCh: make(chan struct{}),
96 writes: make(chan func()),
97 peer: p,
98 contentType: contentType,
99 contentSubtype: contentSubtype,
100 stats: stats,
101 }
102 st.logger = prefixLoggerForServerHandlerTransport(st)
103
104 if v := r.Header.Get("grpc-timeout"); v != "" {
105 to, err := decodeTimeout(v)
106 if err != nil {
107 msg := fmt.Sprintf("malformed grpc-timeout: %v", err)
108 http.Error(w, msg, http.StatusBadRequest)
109 return nil, status.Error(codes.Internal, msg)
110 }
111 st.timeoutSet = true
112 st.timeout = to
113 }
114
115 metakv := []string{"content-type", contentType}
116 if r.Host != "" {
117 metakv = append(metakv, ":authority", r.Host)
118 }
119 for k, vv := range r.Header {
120 k = strings.ToLower(k)
121 if isReservedHeader(k) && !isWhitelistedHeader(k) {
122 continue
123 }
124 for _, v := range vv {
125 v, err := decodeMetadataHeader(k, v)
126 if err != nil {
127 msg := fmt.Sprintf("malformed binary metadata %q in header %q: %v", v, k, err)
128 http.Error(w, msg, http.StatusBadRequest)
129 return nil, status.Error(codes.Internal, msg)
130 }
131 metakv = append(metakv, k, v)
132 }
133 }
134 st.headerMD = metadata.Pairs(metakv...)
135
136 return st, nil
137 }
138
139
140
141
142
143
144 type serverHandlerTransport struct {
145 rw http.ResponseWriter
146 req *http.Request
147 timeoutSet bool
148 timeout time.Duration
149
150 headerMD metadata.MD
151
152 peer peer.Peer
153
154 closeOnce sync.Once
155 closedCh chan struct{}
156
157
158
159
160 writes chan func()
161
162
163
164 writeStatusMu sync.Mutex
165
166
167 contentType string
168
169
170 contentSubtype string
171
172 stats []stats.Handler
173 logger *grpclog.PrefixLogger
174 }
175
176 func (ht *serverHandlerTransport) Close(err error) {
177 ht.closeOnce.Do(func() {
178 if ht.logger.V(logLevel) {
179 ht.logger.Infof("Closing: %v", err)
180 }
181 close(ht.closedCh)
182 })
183 }
184
185 func (ht *serverHandlerTransport) Peer() *peer.Peer {
186 return &peer.Peer{
187 Addr: ht.peer.Addr,
188 LocalAddr: ht.peer.LocalAddr,
189 AuthInfo: ht.peer.AuthInfo,
190 }
191 }
192
193
194
195 type strAddr string
196
197 func (a strAddr) Network() string {
198 if a != "" {
199
200
201
202
203
204
205
206
207
208 return "tcp"
209 }
210 return ""
211 }
212
213 func (a strAddr) String() string { return string(a) }
214
215
216 func (ht *serverHandlerTransport) do(fn func()) error {
217 select {
218 case <-ht.closedCh:
219 return ErrConnClosing
220 case ht.writes <- fn:
221 return nil
222 }
223 }
224
225 func (ht *serverHandlerTransport) WriteStatus(s *Stream, st *status.Status) error {
226 ht.writeStatusMu.Lock()
227 defer ht.writeStatusMu.Unlock()
228
229 headersWritten := s.updateHeaderSent()
230 err := ht.do(func() {
231 if !headersWritten {
232 ht.writePendingHeaders(s)
233 }
234
235
236
237
238 ht.rw.(http.Flusher).Flush()
239
240 h := ht.rw.Header()
241 h.Set("Grpc-Status", fmt.Sprintf("%d", st.Code()))
242 if m := st.Message(); m != "" {
243 h.Set("Grpc-Message", encodeGrpcMessage(m))
244 }
245
246 s.hdrMu.Lock()
247 if p := st.Proto(); p != nil && len(p.Details) > 0 {
248 delete(s.trailer, grpcStatusDetailsBinHeader)
249 stBytes, err := proto.Marshal(p)
250 if err != nil {
251
252 panic(err)
253 }
254
255 h.Set(grpcStatusDetailsBinHeader, encodeBinHeader(stBytes))
256 }
257
258 if len(s.trailer) > 0 {
259 for k, vv := range s.trailer {
260
261 if isReservedHeader(k) {
262 continue
263 }
264 for _, v := range vv {
265
266
267 h.Add(http2.TrailerPrefix+k, encodeMetadataHeader(k, v))
268 }
269 }
270 }
271 s.hdrMu.Unlock()
272 })
273
274 if err == nil {
275
276
277 for _, sh := range ht.stats {
278 sh.HandleRPC(s.Context(), &stats.OutTrailer{
279 Trailer: s.trailer.Copy(),
280 })
281 }
282 }
283 ht.Close(errors.New("finished writing status"))
284 return err
285 }
286
287
288
289 func (ht *serverHandlerTransport) writePendingHeaders(s *Stream) {
290 ht.writeCommonHeaders(s)
291 ht.writeCustomHeaders(s)
292 }
293
294
295
296 func (ht *serverHandlerTransport) writeCommonHeaders(s *Stream) {
297 h := ht.rw.Header()
298 h["Date"] = nil
299 h.Set("Content-Type", ht.contentType)
300
301
302
303
304
305
306 h.Add("Trailer", "Grpc-Status")
307 h.Add("Trailer", "Grpc-Message")
308 h.Add("Trailer", "Grpc-Status-Details-Bin")
309
310 if s.sendCompress != "" {
311 h.Set("Grpc-Encoding", s.sendCompress)
312 }
313 }
314
315
316
317 func (ht *serverHandlerTransport) writeCustomHeaders(s *Stream) {
318 h := ht.rw.Header()
319
320 s.hdrMu.Lock()
321 for k, vv := range s.header {
322 if isReservedHeader(k) {
323 continue
324 }
325 for _, v := range vv {
326 h.Add(k, encodeMetadataHeader(k, v))
327 }
328 }
329
330 s.hdrMu.Unlock()
331 }
332
333 func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
334 headersWritten := s.updateHeaderSent()
335 return ht.do(func() {
336 if !headersWritten {
337 ht.writePendingHeaders(s)
338 }
339 ht.rw.Write(hdr)
340 ht.rw.Write(data)
341 ht.rw.(http.Flusher).Flush()
342 })
343 }
344
345 func (ht *serverHandlerTransport) WriteHeader(s *Stream, md metadata.MD) error {
346 if err := s.SetHeader(md); err != nil {
347 return err
348 }
349
350 headersWritten := s.updateHeaderSent()
351 err := ht.do(func() {
352 if !headersWritten {
353 ht.writePendingHeaders(s)
354 }
355
356 ht.rw.WriteHeader(200)
357 ht.rw.(http.Flusher).Flush()
358 })
359
360 if err == nil {
361 for _, sh := range ht.stats {
362
363
364 sh.HandleRPC(s.Context(), &stats.OutHeader{
365 Header: md.Copy(),
366 Compression: s.sendCompress,
367 })
368 }
369 }
370 return err
371 }
372
373 func (ht *serverHandlerTransport) HandleStreams(ctx context.Context, startStream func(*Stream)) {
374
375 var cancel context.CancelFunc
376 if ht.timeoutSet {
377 ctx, cancel = context.WithTimeout(ctx, ht.timeout)
378 } else {
379 ctx, cancel = context.WithCancel(ctx)
380 }
381
382
383 requestOver := make(chan struct{})
384 go func() {
385 select {
386 case <-requestOver:
387 case <-ht.closedCh:
388 case <-ht.req.Context().Done():
389 }
390 cancel()
391 ht.Close(errors.New("request is done processing"))
392 }()
393
394 ctx = metadata.NewIncomingContext(ctx, ht.headerMD)
395 req := ht.req
396 s := &Stream{
397 id: 0,
398 ctx: ctx,
399 requestRead: func(int) {},
400 cancel: cancel,
401 buf: newRecvBuffer(),
402 st: ht,
403 method: req.URL.Path,
404 recvCompress: req.Header.Get("grpc-encoding"),
405 contentSubtype: ht.contentSubtype,
406 headerWireLength: 0,
407 }
408 s.trReader = &transportReader{
409 reader: &recvBufferReader{ctx: s.ctx, ctxDone: s.ctx.Done(), recv: s.buf, freeBuffer: func(*bytes.Buffer) {}},
410 windowHandler: func(int) {},
411 }
412
413
414 readerDone := make(chan struct{})
415 go func() {
416 defer close(readerDone)
417
418
419 const readSize = 8196
420 for buf := make([]byte, readSize); ; {
421 n, err := req.Body.Read(buf)
422 if n > 0 {
423 s.buf.put(recvMsg{buffer: bytes.NewBuffer(buf[:n:n])})
424 buf = buf[n:]
425 }
426 if err != nil {
427 s.buf.put(recvMsg{err: mapRecvMsgError(err)})
428 return
429 }
430 if len(buf) == 0 {
431 buf = make([]byte, readSize)
432 }
433 }
434 }()
435
436
437
438
439
440 startStream(s)
441
442 ht.runStream()
443 close(requestOver)
444
445
446 req.Body.Close()
447 <-readerDone
448 }
449
450 func (ht *serverHandlerTransport) runStream() {
451 for {
452 select {
453 case fn := <-ht.writes:
454 fn()
455 case <-ht.closedCh:
456 return
457 }
458 }
459 }
460
461 func (ht *serverHandlerTransport) IncrMsgSent() {}
462
463 func (ht *serverHandlerTransport) IncrMsgRecv() {}
464
465 func (ht *serverHandlerTransport) Drain(debugData string) {
466 panic("Drain() is not implemented")
467 }
468
469
470
471
472
473
474
475
476 func mapRecvMsgError(err error) error {
477 if err == io.EOF || err == io.ErrUnexpectedEOF {
478 return err
479 }
480 if se, ok := err.(http2.StreamError); ok {
481 if code, ok := http2ErrConvTab[se.Code]; ok {
482 return status.Error(code, se.Error())
483 }
484 }
485 if strings.Contains(err.Error(), "body closed by handler") {
486 return status.Error(codes.Canceled, err.Error())
487 }
488 return connectionErrorf(true, err, err.Error())
489 }
490
View as plain text