1
18
19 package transport
20
21 import (
22 "bytes"
23 "context"
24 "errors"
25 "fmt"
26 "io"
27 "math"
28 "net"
29 "net/http"
30 "strconv"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37 "google.golang.org/grpc/internal/grpclog"
38 "google.golang.org/grpc/internal/grpcutil"
39 "google.golang.org/grpc/internal/pretty"
40 "google.golang.org/grpc/internal/syscall"
41 "google.golang.org/protobuf/proto"
42
43 "google.golang.org/grpc/codes"
44 "google.golang.org/grpc/credentials"
45 "google.golang.org/grpc/internal/channelz"
46 "google.golang.org/grpc/internal/grpcrand"
47 "google.golang.org/grpc/internal/grpcsync"
48 "google.golang.org/grpc/keepalive"
49 "google.golang.org/grpc/metadata"
50 "google.golang.org/grpc/peer"
51 "google.golang.org/grpc/stats"
52 "google.golang.org/grpc/status"
53 "google.golang.org/grpc/tap"
54 )
55
56 var (
57
58
59 ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
60
61
62 ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
63 )
64
65
66
67 var serverConnectionCounter uint64
68
69
70 type http2Server struct {
71 lastRead int64
72 done chan struct{}
73 conn net.Conn
74 loopy *loopyWriter
75 readerDone chan struct{}
76 loopyWriterDone chan struct{}
77 peer peer.Peer
78 inTapHandle tap.ServerInHandle
79 framer *framer
80
81 maxStreams uint32
82
83
84 controlBuf *controlBuffer
85 fc *trInFlow
86 stats []stats.Handler
87
88 kp keepalive.ServerParameters
89
90 kep keepalive.EnforcementPolicy
91
92 lastPingAt time.Time
93
94 pingStrikes uint8
95
96
97
98 resetPingStrikes uint32
99 initialWindowSize int32
100 bdpEst *bdpEstimator
101 maxSendHeaderListSize *uint32
102
103 mu sync.Mutex
104
105
106
107
108
109
110
111 drainEvent *grpcsync.Event
112 state transportState
113 activeStreams map[uint32]*Stream
114
115
116
117
118 idle time.Time
119
120
121 channelz *channelz.Socket
122 bufferPool *bufferPool
123
124 connectionID uint64
125
126
127
128 maxStreamMu sync.Mutex
129 maxStreamID uint32
130
131 logger *grpclog.PrefixLogger
132 }
133
134
135
136
137
138
139
140
141 func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
142 var authInfo credentials.AuthInfo
143 rawConn := conn
144 if config.Credentials != nil {
145 var err error
146 conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
147 if err != nil {
148
149
150
151
152 if err == credentials.ErrConnDispatched || err == io.EOF {
153 return nil, err
154 }
155 return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
156 }
157 }
158 writeBufSize := config.WriteBufferSize
159 readBufSize := config.ReadBufferSize
160 maxHeaderListSize := defaultServerMaxHeaderListSize
161 if config.MaxHeaderListSize != nil {
162 maxHeaderListSize = *config.MaxHeaderListSize
163 }
164 framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
165
166 isettings := []http2.Setting{{
167 ID: http2.SettingMaxFrameSize,
168 Val: http2MaxFrameLen,
169 }}
170 if config.MaxStreams != math.MaxUint32 {
171 isettings = append(isettings, http2.Setting{
172 ID: http2.SettingMaxConcurrentStreams,
173 Val: config.MaxStreams,
174 })
175 }
176 dynamicWindow := true
177 iwz := int32(initialWindowSize)
178 if config.InitialWindowSize >= defaultWindowSize {
179 iwz = config.InitialWindowSize
180 dynamicWindow = false
181 }
182 icwz := int32(initialWindowSize)
183 if config.InitialConnWindowSize >= defaultWindowSize {
184 icwz = config.InitialConnWindowSize
185 dynamicWindow = false
186 }
187 if iwz != defaultWindowSize {
188 isettings = append(isettings, http2.Setting{
189 ID: http2.SettingInitialWindowSize,
190 Val: uint32(iwz)})
191 }
192 if config.MaxHeaderListSize != nil {
193 isettings = append(isettings, http2.Setting{
194 ID: http2.SettingMaxHeaderListSize,
195 Val: *config.MaxHeaderListSize,
196 })
197 }
198 if config.HeaderTableSize != nil {
199 isettings = append(isettings, http2.Setting{
200 ID: http2.SettingHeaderTableSize,
201 Val: *config.HeaderTableSize,
202 })
203 }
204 if err := framer.fr.WriteSettings(isettings...); err != nil {
205 return nil, connectionErrorf(false, err, "transport: %v", err)
206 }
207
208 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
209 if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
210 return nil, connectionErrorf(false, err, "transport: %v", err)
211 }
212 }
213 kp := config.KeepaliveParams
214 if kp.MaxConnectionIdle == 0 {
215 kp.MaxConnectionIdle = defaultMaxConnectionIdle
216 }
217 if kp.MaxConnectionAge == 0 {
218 kp.MaxConnectionAge = defaultMaxConnectionAge
219 }
220
221 kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
222 if kp.MaxConnectionAgeGrace == 0 {
223 kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
224 }
225 if kp.Time == 0 {
226 kp.Time = defaultServerKeepaliveTime
227 }
228 if kp.Timeout == 0 {
229 kp.Timeout = defaultServerKeepaliveTimeout
230 }
231 if kp.Time != infinity {
232 if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
233 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
234 }
235 }
236 kep := config.KeepalivePolicy
237 if kep.MinTime == 0 {
238 kep.MinTime = defaultKeepalivePolicyMinTime
239 }
240
241 done := make(chan struct{})
242 peer := peer.Peer{
243 Addr: conn.RemoteAddr(),
244 LocalAddr: conn.LocalAddr(),
245 AuthInfo: authInfo,
246 }
247 t := &http2Server{
248 done: done,
249 conn: conn,
250 peer: peer,
251 framer: framer,
252 readerDone: make(chan struct{}),
253 loopyWriterDone: make(chan struct{}),
254 maxStreams: config.MaxStreams,
255 inTapHandle: config.InTapHandle,
256 fc: &trInFlow{limit: uint32(icwz)},
257 state: reachable,
258 activeStreams: make(map[uint32]*Stream),
259 stats: config.StatsHandlers,
260 kp: kp,
261 idle: time.Now(),
262 kep: kep,
263 initialWindowSize: iwz,
264 bufferPool: newBufferPool(),
265 }
266 var czSecurity credentials.ChannelzSecurityValue
267 if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
268 czSecurity = au.GetSecurityValue()
269 }
270 t.channelz = channelz.RegisterSocket(
271 &channelz.Socket{
272 SocketType: channelz.SocketTypeNormal,
273 Parent: config.ChannelzParent,
274 SocketMetrics: channelz.SocketMetrics{},
275 EphemeralMetrics: t.socketMetrics,
276 LocalAddr: t.peer.LocalAddr,
277 RemoteAddr: t.peer.Addr,
278 SocketOptions: channelz.GetSocketOption(t.conn),
279 Security: czSecurity,
280 },
281 )
282 t.logger = prefixLoggerForServerTransport(t)
283
284 t.controlBuf = newControlBuffer(t.done)
285 if dynamicWindow {
286 t.bdpEst = &bdpEstimator{
287 bdp: initialWindowSize,
288 updateFlowControl: t.updateFlowControl,
289 }
290 }
291
292 t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
293 t.framer.writer.Flush()
294
295 defer func() {
296 if err != nil {
297 t.Close(err)
298 }
299 }()
300
301
302 preface := make([]byte, len(clientPreface))
303 if _, err := io.ReadFull(t.conn, preface); err != nil {
304
305
306
307
308
309 if err == io.EOF {
310 return nil, io.EOF
311 }
312 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
313 }
314 if !bytes.Equal(preface, clientPreface) {
315 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
316 }
317
318 frame, err := t.framer.fr.ReadFrame()
319 if err == io.EOF || err == io.ErrUnexpectedEOF {
320 return nil, err
321 }
322 if err != nil {
323 return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
324 }
325 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
326 sf, ok := frame.(*http2.SettingsFrame)
327 if !ok {
328 return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
329 }
330 t.handleSettings(sf)
331
332 go func() {
333 t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
334 err := t.loopy.run()
335 close(t.loopyWriterDone)
336 if !isIOError(err) {
337
338
339
340
341
342
343
344
345
346 timer := time.NewTimer(time.Second)
347 defer timer.Stop()
348 select {
349 case <-t.readerDone:
350 case <-timer.C:
351 }
352 t.conn.Close()
353 }
354 }()
355 go t.keepalive()
356 return t, nil
357 }
358
359
360
361 func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
362
363 t.maxStreamMu.Lock()
364 defer t.maxStreamMu.Unlock()
365
366 streamID := frame.Header().StreamID
367
368
369
370 if frame.Truncated {
371 t.controlBuf.put(&cleanupStream{
372 streamID: streamID,
373 rst: true,
374 rstCode: http2.ErrCodeFrameSize,
375 onWrite: func() {},
376 })
377 return nil
378 }
379
380 if streamID%2 != 1 || streamID <= t.maxStreamID {
381
382 return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
383 }
384 t.maxStreamID = streamID
385
386 buf := newRecvBuffer()
387 s := &Stream{
388 id: streamID,
389 st: t,
390 buf: buf,
391 fc: &inFlow{limit: uint32(t.initialWindowSize)},
392 headerWireLength: int(frame.Header().Length),
393 }
394 var (
395
396 isGRPC = false
397 contentType = ""
398 mdata = make(metadata.MD, len(frame.Fields))
399 httpMethod string
400
401 protocolError bool
402 headerError *status.Status
403
404 timeoutSet bool
405 timeout time.Duration
406 )
407
408 for _, hf := range frame.Fields {
409 switch hf.Name {
410 case "content-type":
411 contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
412 if !validContentType {
413 contentType = hf.Value
414 break
415 }
416 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
417 s.contentSubtype = contentSubtype
418 isGRPC = true
419
420 case "grpc-accept-encoding":
421 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
422 if hf.Value == "" {
423 continue
424 }
425 compressors := hf.Value
426 if s.clientAdvertisedCompressors != "" {
427 compressors = s.clientAdvertisedCompressors + "," + compressors
428 }
429 s.clientAdvertisedCompressors = compressors
430 case "grpc-encoding":
431 s.recvCompress = hf.Value
432 case ":method":
433 httpMethod = hf.Value
434 case ":path":
435 s.method = hf.Value
436 case "grpc-timeout":
437 timeoutSet = true
438 var err error
439 if timeout, err = decodeTimeout(hf.Value); err != nil {
440 headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
441 }
442
443
444 case "connection":
445 if t.logger.V(logLevel) {
446 t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
447 }
448 protocolError = true
449 default:
450 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
451 break
452 }
453 v, err := decodeMetadataHeader(hf.Name, hf.Value)
454 if err != nil {
455 headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
456 t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
457 break
458 }
459 mdata[hf.Name] = append(mdata[hf.Name], v)
460 }
461 }
462
463
464
465
466
467
468 if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
469 errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
470 if t.logger.V(logLevel) {
471 t.logger.Infof("Aborting the stream early: %v", errMsg)
472 }
473 t.controlBuf.put(&earlyAbortStream{
474 httpStatus: http.StatusBadRequest,
475 streamID: streamID,
476 contentSubtype: s.contentSubtype,
477 status: status.New(codes.Internal, errMsg),
478 rst: !frame.StreamEnded(),
479 })
480 return nil
481 }
482
483 if protocolError {
484 t.controlBuf.put(&cleanupStream{
485 streamID: streamID,
486 rst: true,
487 rstCode: http2.ErrCodeProtocol,
488 onWrite: func() {},
489 })
490 return nil
491 }
492 if !isGRPC {
493 t.controlBuf.put(&earlyAbortStream{
494 httpStatus: http.StatusUnsupportedMediaType,
495 streamID: streamID,
496 contentSubtype: s.contentSubtype,
497 status: status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
498 rst: !frame.StreamEnded(),
499 })
500 return nil
501 }
502 if headerError != nil {
503 t.controlBuf.put(&earlyAbortStream{
504 httpStatus: http.StatusBadRequest,
505 streamID: streamID,
506 contentSubtype: s.contentSubtype,
507 status: headerError,
508 rst: !frame.StreamEnded(),
509 })
510 return nil
511 }
512
513
514 if len(mdata[":authority"]) == 0 {
515
516
517 if host, ok := mdata["host"]; ok {
518 mdata[":authority"] = host
519 delete(mdata, "host")
520 }
521 } else {
522
523 delete(mdata, "host")
524 }
525
526 if frame.StreamEnded() {
527
528 s.state = streamReadDone
529 }
530 if timeoutSet {
531 s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
532 } else {
533 s.ctx, s.cancel = context.WithCancel(ctx)
534 }
535
536
537 if len(mdata) > 0 {
538 s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
539 if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
540 s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
541 }
542 if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
543 s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
544 }
545 }
546 t.mu.Lock()
547 if t.state != reachable {
548 t.mu.Unlock()
549 s.cancel()
550 return nil
551 }
552 if uint32(len(t.activeStreams)) >= t.maxStreams {
553 t.mu.Unlock()
554 t.controlBuf.put(&cleanupStream{
555 streamID: streamID,
556 rst: true,
557 rstCode: http2.ErrCodeRefusedStream,
558 onWrite: func() {},
559 })
560 s.cancel()
561 return nil
562 }
563 if httpMethod != http.MethodPost {
564 t.mu.Unlock()
565 errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
566 if t.logger.V(logLevel) {
567 t.logger.Infof("Aborting the stream early: %v", errMsg)
568 }
569 t.controlBuf.put(&earlyAbortStream{
570 httpStatus: 405,
571 streamID: streamID,
572 contentSubtype: s.contentSubtype,
573 status: status.New(codes.Internal, errMsg),
574 rst: !frame.StreamEnded(),
575 })
576 s.cancel()
577 return nil
578 }
579 if t.inTapHandle != nil {
580 var err error
581 if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
582 t.mu.Unlock()
583 if t.logger.V(logLevel) {
584 t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
585 }
586 stat, ok := status.FromError(err)
587 if !ok {
588 stat = status.New(codes.PermissionDenied, err.Error())
589 }
590 t.controlBuf.put(&earlyAbortStream{
591 httpStatus: 200,
592 streamID: s.id,
593 contentSubtype: s.contentSubtype,
594 status: stat,
595 rst: !frame.StreamEnded(),
596 })
597 return nil
598 }
599 }
600 t.activeStreams[streamID] = s
601 if len(t.activeStreams) == 1 {
602 t.idle = time.Time{}
603 }
604 t.mu.Unlock()
605 if channelz.IsOn() {
606 t.channelz.SocketMetrics.StreamsStarted.Add(1)
607 t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
608 }
609 s.requestRead = func(n int) {
610 t.adjustWindow(s, uint32(n))
611 }
612 s.ctxDone = s.ctx.Done()
613 s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
614 s.trReader = &transportReader{
615 reader: &recvBufferReader{
616 ctx: s.ctx,
617 ctxDone: s.ctxDone,
618 recv: s.buf,
619 freeBuffer: t.bufferPool.put,
620 },
621 windowHandler: func(n int) {
622 t.updateWindow(s, uint32(n))
623 },
624 }
625
626 t.controlBuf.put(®isterStream{
627 streamID: s.id,
628 wq: s.wq,
629 })
630 handle(s)
631 return nil
632 }
633
634
635
636
637 func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
638 defer func() {
639 close(t.readerDone)
640 <-t.loopyWriterDone
641 }()
642 for {
643 t.controlBuf.throttle()
644 frame, err := t.framer.fr.ReadFrame()
645 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
646 if err != nil {
647 if se, ok := err.(http2.StreamError); ok {
648 if t.logger.V(logLevel) {
649 t.logger.Warningf("Encountered http2.StreamError: %v", se)
650 }
651 t.mu.Lock()
652 s := t.activeStreams[se.StreamID]
653 t.mu.Unlock()
654 if s != nil {
655 t.closeStream(s, true, se.Code, false)
656 } else {
657 t.controlBuf.put(&cleanupStream{
658 streamID: se.StreamID,
659 rst: true,
660 rstCode: se.Code,
661 onWrite: func() {},
662 })
663 }
664 continue
665 }
666 t.Close(err)
667 return
668 }
669 switch frame := frame.(type) {
670 case *http2.MetaHeadersFrame:
671 if err := t.operateHeaders(ctx, frame, handle); err != nil {
672
673
674 t.controlBuf.put(&goAway{
675 code: http2.ErrCodeProtocol,
676 debugData: []byte(err.Error()),
677 closeConn: err,
678 })
679 continue
680 }
681 case *http2.DataFrame:
682 t.handleData(frame)
683 case *http2.RSTStreamFrame:
684 t.handleRSTStream(frame)
685 case *http2.SettingsFrame:
686 t.handleSettings(frame)
687 case *http2.PingFrame:
688 t.handlePing(frame)
689 case *http2.WindowUpdateFrame:
690 t.handleWindowUpdate(frame)
691 case *http2.GoAwayFrame:
692
693 default:
694 if t.logger.V(logLevel) {
695 t.logger.Infof("Received unsupported frame type %T", frame)
696 }
697 }
698 }
699 }
700
701 func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
702 t.mu.Lock()
703 defer t.mu.Unlock()
704 if t.activeStreams == nil {
705
706 return nil, false
707 }
708 s, ok := t.activeStreams[f.Header().StreamID]
709 if !ok {
710
711 return nil, false
712 }
713 return s, true
714 }
715
716
717
718
719 func (t *http2Server) adjustWindow(s *Stream, n uint32) {
720 if w := s.fc.maybeAdjust(n); w > 0 {
721 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
722 }
723
724 }
725
726
727
728
729 func (t *http2Server) updateWindow(s *Stream, n uint32) {
730 if w := s.fc.onRead(n); w > 0 {
731 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
732 increment: w,
733 })
734 }
735 }
736
737
738
739
740 func (t *http2Server) updateFlowControl(n uint32) {
741 t.mu.Lock()
742 for _, s := range t.activeStreams {
743 s.fc.newLimit(n)
744 }
745 t.initialWindowSize = int32(n)
746 t.mu.Unlock()
747 t.controlBuf.put(&outgoingWindowUpdate{
748 streamID: 0,
749 increment: t.fc.newLimit(n),
750 })
751 t.controlBuf.put(&outgoingSettings{
752 ss: []http2.Setting{
753 {
754 ID: http2.SettingInitialWindowSize,
755 Val: n,
756 },
757 },
758 })
759
760 }
761
762 func (t *http2Server) handleData(f *http2.DataFrame) {
763 size := f.Header().Length
764 var sendBDPPing bool
765 if t.bdpEst != nil {
766 sendBDPPing = t.bdpEst.add(size)
767 }
768
769
770
771
772
773
774
775
776 if w := t.fc.onData(size); w > 0 {
777 t.controlBuf.put(&outgoingWindowUpdate{
778 streamID: 0,
779 increment: w,
780 })
781 }
782 if sendBDPPing {
783
784
785 if w := t.fc.reset(); w > 0 {
786 t.controlBuf.put(&outgoingWindowUpdate{
787 streamID: 0,
788 increment: w,
789 })
790 }
791 t.controlBuf.put(bdpPing)
792 }
793
794 s, ok := t.getStream(f)
795 if !ok {
796 return
797 }
798 if s.getState() == streamReadDone {
799 t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
800 return
801 }
802 if size > 0 {
803 if err := s.fc.onData(size); err != nil {
804 t.closeStream(s, true, http2.ErrCodeFlowControl, false)
805 return
806 }
807 if f.Header().Flags.Has(http2.FlagDataPadded) {
808 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
809 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
810 }
811 }
812
813
814
815 if len(f.Data()) > 0 {
816 buffer := t.bufferPool.get()
817 buffer.Reset()
818 buffer.Write(f.Data())
819 s.write(recvMsg{buffer: buffer})
820 }
821 }
822 if f.StreamEnded() {
823
824 s.compareAndSwapState(streamActive, streamReadDone)
825 s.write(recvMsg{err: io.EOF})
826 }
827 }
828
829 func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
830
831 if s, ok := t.getStream(f); ok {
832 t.closeStream(s, false, 0, false)
833 return
834 }
835
836 t.controlBuf.put(&cleanupStream{
837 streamID: f.Header().StreamID,
838 rst: false,
839 rstCode: 0,
840 onWrite: func() {},
841 })
842 }
843
844 func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
845 if f.IsAck() {
846 return
847 }
848 var ss []http2.Setting
849 var updateFuncs []func()
850 f.ForeachSetting(func(s http2.Setting) error {
851 switch s.ID {
852 case http2.SettingMaxHeaderListSize:
853 updateFuncs = append(updateFuncs, func() {
854 t.maxSendHeaderListSize = new(uint32)
855 *t.maxSendHeaderListSize = s.Val
856 })
857 default:
858 ss = append(ss, s)
859 }
860 return nil
861 })
862 t.controlBuf.executeAndPut(func() bool {
863 for _, f := range updateFuncs {
864 f()
865 }
866 return true
867 }, &incomingSettings{
868 ss: ss,
869 })
870 }
871
// maxPingStrikes is the number of ping strikes handlePing tolerates; one more
// strike makes it send a GOAWAY that closes the connection.
const maxPingStrikes = 2

// defaultPingTimeout is the minimum spacing handlePing expects between pings
// when there are no active streams and pings without streams are not
// permitted.
const defaultPingTimeout = 2 * time.Hour
876
877 func (t *http2Server) handlePing(f *http2.PingFrame) {
878 if f.IsAck() {
879 if f.Data == goAwayPing.data && t.drainEvent != nil {
880 t.drainEvent.Fire()
881 return
882 }
883
884 if t.bdpEst != nil {
885 t.bdpEst.calculate(f.Data)
886 }
887 return
888 }
889 pingAck := &ping{ack: true}
890 copy(pingAck.data[:], f.Data[:])
891 t.controlBuf.put(pingAck)
892
893 now := time.Now()
894 defer func() {
895 t.lastPingAt = now
896 }()
897
898
899
900 if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
901 t.pingStrikes = 0
902 return
903 }
904 t.mu.Lock()
905 ns := len(t.activeStreams)
906 t.mu.Unlock()
907 if ns < 1 && !t.kep.PermitWithoutStream {
908
909
910 if t.lastPingAt.Add(defaultPingTimeout).After(now) {
911 t.pingStrikes++
912 }
913 } else {
914
915 if t.lastPingAt.Add(t.kep.MinTime).After(now) {
916 t.pingStrikes++
917 }
918 }
919
920 if t.pingStrikes > maxPingStrikes {
921
922 t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
923 }
924 }
925
926 func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
927 t.controlBuf.put(&incomingWindowUpdate{
928 streamID: f.Header().StreamID,
929 increment: f.Increment,
930 })
931 }
932
933 func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
934 for k, vv := range md {
935 if isReservedHeader(k) {
936
937 continue
938 }
939 for _, v := range vv {
940 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
941 }
942 }
943 return headerFields
944 }
945
946 func (t *http2Server) checkForHeaderListSize(it any) bool {
947 if t.maxSendHeaderListSize == nil {
948 return true
949 }
950 hdrFrame := it.(*headerFrame)
951 var sz int64
952 for _, f := range hdrFrame.hf {
953 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
954 if t.logger.V(logLevel) {
955 t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
956 }
957 return false
958 }
959 }
960 return true
961 }
962
963 func (t *http2Server) streamContextErr(s *Stream) error {
964 select {
965 case <-t.done:
966 return ErrConnClosing
967 default:
968 }
969 return ContextErr(s.ctx.Err())
970 }
971
972
973 func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
974 s.hdrMu.Lock()
975 defer s.hdrMu.Unlock()
976 if s.getState() == streamDone {
977 return t.streamContextErr(s)
978 }
979
980 if s.updateHeaderSent() {
981 return ErrIllegalHeaderWrite
982 }
983
984 if md.Len() > 0 {
985 if s.header.Len() > 0 {
986 s.header = metadata.Join(s.header, md)
987 } else {
988 s.header = md
989 }
990 }
991 if err := t.writeHeaderLocked(s); err != nil {
992 switch e := err.(type) {
993 case ConnectionError:
994 return status.Error(codes.Unavailable, e.Desc)
995 default:
996 return status.Convert(err).Err()
997 }
998 }
999 return nil
1000 }
1001
1002 func (t *http2Server) setResetPingStrikes() {
1003 atomic.StoreUint32(&t.resetPingStrikes, 1)
1004 }
1005
1006 func (t *http2Server) writeHeaderLocked(s *Stream) error {
1007
1008
1009 headerFields := make([]hpack.HeaderField, 0, 2)
1010 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1011 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1012 if s.sendCompress != "" {
1013 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
1014 }
1015 headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
1016 hf := &headerFrame{
1017 streamID: s.id,
1018 hf: headerFields,
1019 endStream: false,
1020 onWrite: t.setResetPingStrikes,
1021 }
1022 success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
1023 if !success {
1024 if err != nil {
1025 return err
1026 }
1027 t.closeStream(s, true, http2.ErrCodeInternal, false)
1028 return ErrHeaderListSizeLimitViolation
1029 }
1030 for _, sh := range t.stats {
1031
1032
1033 outHeader := &stats.OutHeader{
1034 Header: s.header.Copy(),
1035 Compression: s.sendCompress,
1036 }
1037 sh.HandleRPC(s.Context(), outHeader)
1038 }
1039 return nil
1040 }
1041
1042
1043
1044
1045
1046 func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
1047 s.hdrMu.Lock()
1048 defer s.hdrMu.Unlock()
1049
1050 if s.getState() == streamDone {
1051 return nil
1052 }
1053
1054
1055
1056 headerFields := make([]hpack.HeaderField, 0, 2)
1057 if !s.updateHeaderSent() {
1058 if len(s.header) > 0 {
1059 if err := t.writeHeaderLocked(s); err != nil {
1060 return err
1061 }
1062 } else {
1063 headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
1064 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
1065 }
1066 }
1067 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
1068 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
1069
1070 if p := st.Proto(); p != nil && len(p.Details) > 0 {
1071
1072
1073 delete(s.trailer, grpcStatusDetailsBinHeader)
1074 stBytes, err := proto.Marshal(p)
1075 if err != nil {
1076
1077 t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
1078 } else {
1079 headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
1080 }
1081 }
1082
1083
1084 headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
1085 trailingHeader := &headerFrame{
1086 streamID: s.id,
1087 hf: headerFields,
1088 endStream: true,
1089 onWrite: t.setResetPingStrikes,
1090 }
1091
1092 success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
1093 if !success {
1094 if err != nil {
1095 return err
1096 }
1097 t.closeStream(s, true, http2.ErrCodeInternal, false)
1098 return ErrHeaderListSizeLimitViolation
1099 }
1100
1101 rst := s.getState() == streamActive
1102 t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
1103 for _, sh := range t.stats {
1104
1105
1106 sh.HandleRPC(s.Context(), &stats.OutTrailer{
1107 Trailer: s.trailer.Copy(),
1108 })
1109 }
1110 return nil
1111 }
1112
1113
1114
1115 func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
1116 if !s.isHeaderSent() {
1117 if err := t.WriteHeader(s, nil); err != nil {
1118 return err
1119 }
1120 } else {
1121
1122 if s.getState() == streamDone {
1123 return t.streamContextErr(s)
1124 }
1125 }
1126 df := &dataFrame{
1127 streamID: s.id,
1128 h: hdr,
1129 d: data,
1130 onEachWrite: t.setResetPingStrikes,
1131 }
1132 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
1133 return t.streamContextErr(s)
1134 }
1135 return t.controlBuf.put(df)
1136 }
1137
1138
1139
1140
1141
1142
1143
1144 func (t *http2Server) keepalive() {
1145 p := &ping{}
1146
1147 outstandingPing := false
1148
1149
1150 kpTimeoutLeft := time.Duration(0)
1151
1152
1153 prevNano := time.Now().UnixNano()
1154
1155 idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
1156 ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
1157 kpTimer := time.NewTimer(t.kp.Time)
1158 defer func() {
1159
1160
1161
1162 idleTimer.Stop()
1163 ageTimer.Stop()
1164 kpTimer.Stop()
1165 }()
1166
1167 for {
1168 select {
1169 case <-idleTimer.C:
1170 t.mu.Lock()
1171 idle := t.idle
1172 if idle.IsZero() {
1173 t.mu.Unlock()
1174 idleTimer.Reset(t.kp.MaxConnectionIdle)
1175 continue
1176 }
1177 val := t.kp.MaxConnectionIdle - time.Since(idle)
1178 t.mu.Unlock()
1179 if val <= 0 {
1180
1181
1182 t.Drain("max_idle")
1183 return
1184 }
1185 idleTimer.Reset(val)
1186 case <-ageTimer.C:
1187 t.Drain("max_age")
1188 ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
1189 select {
1190 case <-ageTimer.C:
1191
1192 if t.logger.V(logLevel) {
1193 t.logger.Infof("Closing server transport due to maximum connection age")
1194 }
1195 t.controlBuf.put(closeConnection{})
1196 case <-t.done:
1197 }
1198 return
1199 case <-kpTimer.C:
1200 lastRead := atomic.LoadInt64(&t.lastRead)
1201 if lastRead > prevNano {
1202
1203
1204
1205 outstandingPing = false
1206 kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1207 prevNano = lastRead
1208 continue
1209 }
1210 if outstandingPing && kpTimeoutLeft <= 0 {
1211 t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
1212 return
1213 }
1214 if !outstandingPing {
1215 if channelz.IsOn() {
1216 t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
1217 }
1218 t.controlBuf.put(p)
1219 kpTimeoutLeft = t.kp.Timeout
1220 outstandingPing = true
1221 }
1222
1223
1224
1225
1226 sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
1227 kpTimeoutLeft -= sleepDuration
1228 kpTimer.Reset(sleepDuration)
1229 case <-t.done:
1230 return
1231 }
1232 }
1233 }
1234
1235
1236
1237
1238 func (t *http2Server) Close(err error) {
1239 t.mu.Lock()
1240 if t.state == closing {
1241 t.mu.Unlock()
1242 return
1243 }
1244 if t.logger.V(logLevel) {
1245 t.logger.Infof("Closing: %v", err)
1246 }
1247 t.state = closing
1248 streams := t.activeStreams
1249 t.activeStreams = nil
1250 t.mu.Unlock()
1251 t.controlBuf.finish()
1252 close(t.done)
1253 if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
1254 t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
1255 }
1256 channelz.RemoveEntry(t.channelz.ID)
1257
1258 for _, s := range streams {
1259 s.cancel()
1260 }
1261 }
1262
1263
1264 func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
1265
1266 t.mu.Lock()
1267 if _, ok := t.activeStreams[s.id]; ok {
1268 delete(t.activeStreams, s.id)
1269 if len(t.activeStreams) == 0 {
1270 t.idle = time.Now()
1271 }
1272 }
1273 t.mu.Unlock()
1274
1275 if channelz.IsOn() {
1276 if eosReceived {
1277 t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
1278 } else {
1279 t.channelz.SocketMetrics.StreamsFailed.Add(1)
1280 }
1281 }
1282 }
1283
1284
1285 func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
1286
1287
1288
1289 s.cancel()
1290
1291 oldState := s.swapState(streamDone)
1292 if oldState == streamDone {
1293
1294 return
1295 }
1296
1297 hdr.cleanup = &cleanupStream{
1298 streamID: s.id,
1299 rst: rst,
1300 rstCode: rstCode,
1301 onWrite: func() {
1302 t.deleteStream(s, eosReceived)
1303 },
1304 }
1305 t.controlBuf.put(hdr)
1306 }
1307
1308
1309 func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
1310
1311
1312
1313 s.cancel()
1314
1315 s.swapState(streamDone)
1316 t.deleteStream(s, eosReceived)
1317
1318 t.controlBuf.put(&cleanupStream{
1319 streamID: s.id,
1320 rst: rst,
1321 rstCode: rstCode,
1322 onWrite: func() {},
1323 })
1324 }
1325
1326 func (t *http2Server) Drain(debugData string) {
1327 t.mu.Lock()
1328 defer t.mu.Unlock()
1329 if t.drainEvent != nil {
1330 return
1331 }
1332 t.drainEvent = grpcsync.NewEvent()
1333 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
1334 }
1335
1336 var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
1337
1338
1339
1340 func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
1341 t.maxStreamMu.Lock()
1342 t.mu.Lock()
1343 if t.state == closing {
1344 t.mu.Unlock()
1345 t.maxStreamMu.Unlock()
1346
1347 return false, ErrConnClosing
1348 }
1349 if !g.headsUp {
1350
1351 t.state = draining
1352 sid := t.maxStreamID
1353 retErr := g.closeConn
1354 if len(t.activeStreams) == 0 {
1355 retErr = errors.New("second GOAWAY written and no active streams left to process")
1356 }
1357 t.mu.Unlock()
1358 t.maxStreamMu.Unlock()
1359 if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
1360 return false, err
1361 }
1362 t.framer.writer.Flush()
1363 if retErr != nil {
1364 return false, retErr
1365 }
1366 return true, nil
1367 }
1368 t.mu.Unlock()
1369 t.maxStreamMu.Unlock()
1370
1371
1372
1373
1374
1375
1376 if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
1377 return false, err
1378 }
1379 if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
1380 return false, err
1381 }
1382 go func() {
1383 timer := time.NewTimer(5 * time.Second)
1384 defer timer.Stop()
1385 select {
1386 case <-t.drainEvent.Done():
1387 case <-timer.C:
1388 case <-t.done:
1389 return
1390 }
1391 t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
1392 }()
1393 return false, nil
1394 }
1395
1396 func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
1397 return &channelz.EphemeralSocketMetrics{
1398 LocalFlowControlWindow: int64(t.fc.getSize()),
1399 RemoteFlowControlWindow: t.getOutFlowWindow(),
1400 }
1401 }
1402
1403 func (t *http2Server) IncrMsgSent() {
1404 t.channelz.SocketMetrics.MessagesSent.Add(1)
1405 t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
1406 }
1407
1408 func (t *http2Server) IncrMsgRecv() {
1409 t.channelz.SocketMetrics.MessagesReceived.Add(1)
1410 t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
1411 }
1412
1413 func (t *http2Server) getOutFlowWindow() int64 {
1414 resp := make(chan uint32, 1)
1415 timer := time.NewTimer(time.Second)
1416 defer timer.Stop()
1417 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1418 select {
1419 case sz := <-resp:
1420 return int64(sz)
1421 case <-t.done:
1422 return -1
1423 case <-timer.C:
1424 return -2
1425 }
1426 }
1427
1428
1429 func (t *http2Server) Peer() *peer.Peer {
1430 return &peer.Peer{
1431 Addr: t.peer.Addr,
1432 LocalAddr: t.peer.LocalAddr,
1433 AuthInfo: t.peer.AuthInfo,
1434 }
1435 }
1436
1437 func getJitter(v time.Duration) time.Duration {
1438 if v == infinity {
1439 return 0
1440 }
1441
1442 r := int64(v / 10)
1443 j := grpcrand.Int63n(2*r) - r
1444 return time.Duration(j)
1445 }
1446
// connectionKey is the context key under which SetConnection stores the
// connection.
type connectionKey struct{}

// GetConnection returns the net.Conn stored in ctx by SetConnection, or nil
// if ctx carries none.
func GetConnection(ctx context.Context) net.Conn {
	if c, ok := ctx.Value(connectionKey{}).(net.Conn); ok {
		return c
	}
	return nil
}

// SetConnection returns a context derived from ctx that carries conn, for
// later retrieval with GetConnection.
func SetConnection(ctx context.Context, conn net.Conn) context.Context {
	var key connectionKey
	return context.WithValue(ctx, key, conn)
}
View as plain text