18
19 package transport
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "math"
26 "net"
27 "net/http"
28 "path/filepath"
29 "strconv"
30 "strings"
31 "sync"
32 "sync/atomic"
33 "time"
34
35 "golang.org/x/net/http2"
36 "golang.org/x/net/http2/hpack"
37 "google.golang.org/grpc/codes"
38 "google.golang.org/grpc/credentials"
39 "google.golang.org/grpc/internal"
40 "google.golang.org/grpc/internal/channelz"
41 icredentials "google.golang.org/grpc/internal/credentials"
42 "google.golang.org/grpc/internal/grpclog"
43 "google.golang.org/grpc/internal/grpcsync"
44 "google.golang.org/grpc/internal/grpcutil"
45 imetadata "google.golang.org/grpc/internal/metadata"
46 istatus "google.golang.org/grpc/internal/status"
47 isyscall "google.golang.org/grpc/internal/syscall"
48 "google.golang.org/grpc/internal/transport/networktype"
49 "google.golang.org/grpc/keepalive"
50 "google.golang.org/grpc/metadata"
51 "google.golang.org/grpc/peer"
52 "google.golang.org/grpc/resolver"
53 "google.golang.org/grpc/stats"
54 "google.golang.org/grpc/status"
55 )
56
57
58
59
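// clientConnectionCounter counts the number of connections the client has
// initiated (i.e. the number of http2Clients created). Must be accessed atomically.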
60 var clientConnectionCounter uint64
61
62 var metadataFromOutgoingContextRaw = internal.FromOutgoingContextRaw.(func(context.Context) (metadata.MD, [][]string, bool))
63
64
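// http2Client implements the client-side ClientTransport interface over HTTP/2.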
65 type http2Client struct {
66 lastRead int64
67 ctx context.Context
68 cancel context.CancelFunc
69 ctxDone <-chan struct{}
70 userAgent string
71
72
73
74 address resolver.Address
75 md metadata.MD
76 conn net.Conn
77 loopy *loopyWriter
78 remoteAddr net.Addr
79 localAddr net.Addr
80 authInfo credentials.AuthInfo
81
82 readerDone chan struct{}
83 writerDone chan struct{}
84
85
86 goAway chan struct{}
87
88 framer *framer
89
90
91
92 controlBuf *controlBuffer
93 fc *trInFlow
94
95 scheme string
96
97 isSecure bool
98
99 perRPCCreds []credentials.PerRPCCredentials
100
101 kp keepalive.ClientParameters
102 keepaliveEnabled bool
103
104 statsHandlers []stats.Handler
105
106 initialWindowSize int32
107
108
109 maxSendHeaderListSize *uint32
110
111 bdpEst *bdpEstimator
112
113 maxConcurrentStreams uint32
114 streamQuota int64
115 streamsQuotaAvailable chan struct{}
116 waitingStreams uint32
117 registeredCompressors string
118
119
120 mu sync.Mutex
121 nextID uint32
122 state transportState
123 activeStreams map[uint32]*Stream
124
125 prevGoAwayID uint32
126
127
128 goAwayReason GoAwayReason
129
130
131 goAwayDebugMessage string
132
133
134
135
136
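// kpDormancyCond is used to put the keepalive goroutine to sleep when there are
// no active streams and PermitWithoutStream is false; it is signaled when a new
// stream is created or the transport is being closed. Guarded by mu.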
137 kpDormancyCond *sync.Cond
138
139
140
141 kpDormant bool
142
143 channelz *channelz.Socket
144
145 onClose func(GoAwayReason)
146
147 bufferPool *bufferPool
148
149 connectionID uint64
150 logger *grpclog.PrefixLogger
151 }
152
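// dial connects to addr. If a custom dialer fn is provided it is used, with
// non-abstract unix addresses rewritten to the "unix://" (absolute path) or
// "unix:" scheme first; otherwise the address is dialed directly, going through
// a proxy for TCP when useProxy is set.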
153 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr resolver.Address, useProxy bool, grpcUA string) (net.Conn, error) {
154 address := addr.Addr
155 networkType, ok := networktype.Get(addr)
156 if fn != nil {
157
158
159
160
161
162
163
164 if networkType == "unix" && !strings.HasPrefix(address, "\x00") {
165
166
167 if filepath.IsAbs(address) {
168 return fn(ctx, "unix://"+address)
169 }
170 return fn(ctx, "unix:"+address)
171 }
172 return fn(ctx, address)
173 }
174 if !ok {
175 networkType, address = parseDialTarget(address)
176 }
177 if networkType == "tcp" && useProxy {
178 return proxyDial(ctx, address, grpcUA)
179 }
180 return internal.NetDialerWithTCPKeepalive().DialContext(ctx, networkType, address)
181 }
182
183 func isTemporary(err error) bool {
184 switch err := err.(type) {
185 case interface {
186 Temporary() bool
187 }:
188 return err.Temporary()
189 case interface {
190 Timeout() bool
191 }:
192
193
194 return err.Timeout()
195 }
196 return true
197 }
198
199
200
201
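// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP/2
// and begins receiving messages on it. A non-nil error is returned if construction fails.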
202 func newHTTP2Client(connectCtx, ctx context.Context, addr resolver.Address, opts ConnectOptions, onClose func(GoAwayReason)) (_ *http2Client, err error) {
203 scheme := "http"
204 ctx, cancel := context.WithCancel(ctx)
205 defer func() {
206 if err != nil {
207 cancel()
208 }
209 }()
210
211
212
213
214
215 connectCtx = icredentials.NewClientHandshakeInfoContext(connectCtx, credentials.ClientHandshakeInfo{Attributes: addr.Attributes})
216
217 conn, err := dial(connectCtx, opts.Dialer, addr, opts.UseProxy, opts.UserAgent)
218 if err != nil {
219 if opts.FailOnNonTempDialError {
220 return nil, connectionErrorf(isTemporary(err), err, "transport: error while dialing: %v", err)
221 }
222 return nil, connectionErrorf(true, err, "transport: error while dialing: %v", err)
223 }
224
225
226 defer func(conn net.Conn) {
227 if err != nil {
228 conn.Close()
229 }
230 }(conn)
231
232
233
234
235
236
237
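// Spawn a goroutine that closes the connection if the connect deadline expires
// before this constructor returns, so a hung handshake cannot outlive connectCtx.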
238 ctxMonitorDone := grpcsync.NewEvent()
239 newClientCtx, newClientDone := context.WithCancel(connectCtx)
240 defer func() {
241 newClientDone()
242 <-ctxMonitorDone.Done()
243 }()
244 go func(conn net.Conn) {
245 defer ctxMonitorDone.Fire()
246 <-newClientCtx.Done()
247 if err := connectCtx.Err(); err != nil {
248
249 if logger.V(logLevel) {
250 logger.Infof("Aborting due to connect deadline expiring: %v", err)
251 }
252 conn.Close()
253 }
254 }(conn)
255
256 kp := opts.KeepaliveParams
257
258 if kp.Time == 0 {
259 kp.Time = defaultClientKeepaliveTime
260 }
261 if kp.Timeout == 0 {
262 kp.Timeout = defaultClientKeepaliveTimeout
263 }
264 keepaliveEnabled := false
265 if kp.Time != infinity {
266 if err = isyscall.SetTCPUserTimeout(conn, kp.Timeout); err != nil {
267 return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
268 }
269 keepaliveEnabled = true
270 }
271 var (
272 isSecure bool
273 authInfo credentials.AuthInfo
274 )
275 transportCreds := opts.TransportCredentials
276 perRPCCreds := opts.PerRPCCredentials
277
278 if b := opts.CredsBundle; b != nil {
279 if t := b.TransportCredentials(); t != nil {
280 transportCreds = t
281 }
282 if t := b.PerRPCCredentials(); t != nil {
283 perRPCCreds = append(perRPCCreds, t)
284 }
285 }
286 if transportCreds != nil {
287 conn, authInfo, err = transportCreds.ClientHandshake(connectCtx, addr.ServerName, conn)
288 if err != nil {
289 return nil, connectionErrorf(isTemporary(err), err, "transport: authentication handshake failed: %v", err)
290 }
291 for _, cd := range perRPCCreds {
292 if cd.RequireTransportSecurity() {
293 if ci, ok := authInfo.(interface {
294 GetCommonAuthInfo() credentials.CommonAuthInfo
295 }); ok {
296 secLevel := ci.GetCommonAuthInfo().SecurityLevel
297 if secLevel != credentials.InvalidSecurityLevel && secLevel < credentials.PrivacyAndIntegrity {
298 return nil, connectionErrorf(true, nil, "transport: cannot send secure credentials on an insecure connection")
299 }
300 }
301 }
302 }
303 isSecure = true
304 if transportCreds.Info().SecurityProtocol == "tls" {
305 scheme = "https"
306 }
307 }
308 dynamicWindow := true
309 icwz := int32(initialWindowSize)
310 if opts.InitialConnWindowSize >= defaultWindowSize {
311 icwz = opts.InitialConnWindowSize
312 dynamicWindow = false
313 }
314 writeBufSize := opts.WriteBufferSize
315 readBufSize := opts.ReadBufferSize
316 maxHeaderListSize := defaultClientMaxHeaderListSize
317 if opts.MaxHeaderListSize != nil {
318 maxHeaderListSize = *opts.MaxHeaderListSize
319 }
320
321 t := &http2Client{
322 ctx: ctx,
323 ctxDone: ctx.Done(),
324 cancel: cancel,
325 userAgent: opts.UserAgent,
326 registeredCompressors: grpcutil.RegisteredCompressors(),
327 address: addr,
328 conn: conn,
329 remoteAddr: conn.RemoteAddr(),
330 localAddr: conn.LocalAddr(),
331 authInfo: authInfo,
332 readerDone: make(chan struct{}),
333 writerDone: make(chan struct{}),
334 goAway: make(chan struct{}),
335 framer: newFramer(conn, writeBufSize, readBufSize, opts.SharedWriteBuffer, maxHeaderListSize),
336 fc: &trInFlow{limit: uint32(icwz)},
337 scheme: scheme,
338 activeStreams: make(map[uint32]*Stream),
339 isSecure: isSecure,
340 perRPCCreds: perRPCCreds,
341 kp: kp,
342 statsHandlers: opts.StatsHandlers,
343 initialWindowSize: initialWindowSize,
344 nextID: 1,
345 maxConcurrentStreams: defaultMaxStreamsClient,
346 streamQuota: defaultMaxStreamsClient,
347 streamsQuotaAvailable: make(chan struct{}, 1),
348 keepaliveEnabled: keepaliveEnabled,
349 bufferPool: newBufferPool(),
350 onClose: onClose,
351 }
352 var czSecurity credentials.ChannelzSecurityValue
353 if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
354 czSecurity = au.GetSecurityValue()
355 }
356 t.channelz = channelz.RegisterSocket(
357 &channelz.Socket{
358 SocketType: channelz.SocketTypeNormal,
359 Parent: opts.ChannelzParent,
360 SocketMetrics: channelz.SocketMetrics{},
361 EphemeralMetrics: t.socketMetrics,
362 LocalAddr: t.localAddr,
363 RemoteAddr: t.remoteAddr,
364 SocketOptions: channelz.GetSocketOption(t.conn),
365 Security: czSecurity,
366 })
367 t.logger = prefixLoggerForClientTransport(t)
368
369 t.ctx = peer.NewContext(t.ctx, t.getPeer())
370
371 if md, ok := addr.Metadata.(*metadata.MD); ok {
372 t.md = *md
373 } else if md := imetadata.Get(addr); md != nil {
374 t.md = md
375 }
376 t.controlBuf = newControlBuffer(t.ctxDone)
377 if opts.InitialWindowSize >= defaultWindowSize {
378 t.initialWindowSize = opts.InitialWindowSize
379 dynamicWindow = false
380 }
381 if dynamicWindow {
382 t.bdpEst = &bdpEstimator{
383 bdp: initialWindowSize,
384 updateFlowControl: t.updateFlowControl,
385 }
386 }
387 for _, sh := range t.statsHandlers {
388 t.ctx = sh.TagConn(t.ctx, &stats.ConnTagInfo{
389 RemoteAddr: t.remoteAddr,
390 LocalAddr: t.localAddr,
391 })
392 connBegin := &stats.ConnBegin{
393 Client: true,
394 }
395 sh.HandleConn(t.ctx, connBegin)
396 }
397 if t.keepaliveEnabled {
398 t.kpDormancyCond = sync.NewCond(&t.mu)
399 go t.keepalive()
400 }
401
402
403
404
405
406
407
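// Start the reader goroutine for incoming frames. Each transport has a dedicated
// goroutine that reads HTTP/2 frames from the network and dispatches them to the
// corresponding stream; readerErrCh reports whether the server preface was read successfully.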
408 readerErrCh := make(chan error, 1)
409 go t.reader(readerErrCh)
410 defer func() {
411 if err != nil {
412
413
414 close(t.writerDone)
415 t.Close(err)
416 }
417 }()
418
419
420 n, err := t.conn.Write(clientPreface)
421 if err != nil {
422 err = connectionErrorf(true, err, "transport: failed to write client preface: %v", err)
423 return nil, err
424 }
425 if n != len(clientPreface) {
426 err = connectionErrorf(true, nil, "transport: preface mismatch, wrote %d bytes; want %d", n, len(clientPreface))
427 return nil, err
428 }
429 var ss []http2.Setting
430
431 if t.initialWindowSize != defaultWindowSize {
432 ss = append(ss, http2.Setting{
433 ID: http2.SettingInitialWindowSize,
434 Val: uint32(t.initialWindowSize),
435 })
436 }
437 if opts.MaxHeaderListSize != nil {
438 ss = append(ss, http2.Setting{
439 ID: http2.SettingMaxHeaderListSize,
440 Val: *opts.MaxHeaderListSize,
441 })
442 }
443 err = t.framer.fr.WriteSettings(ss...)
444 if err != nil {
445 err = connectionErrorf(true, err, "transport: failed to write initial settings frame: %v", err)
446 return nil, err
447 }
448
449 if delta := uint32(icwz - defaultWindowSize); delta > 0 {
450 if err := t.framer.fr.WriteWindowUpdate(0, delta); err != nil {
451 err = connectionErrorf(true, err, "transport: failed to write window update: %v", err)
452 return nil, err
453 }
454 }
455
456 t.connectionID = atomic.AddUint64(&clientConnectionCounter, 1)
457
458 if err := t.framer.writer.Flush(); err != nil {
459 return nil, err
460 }
461
462 if err = <-readerErrCh; err != nil {
463 return nil, err
464 }
465 go func() {
466 t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
467 if err := t.loopy.run(); !isIOError(err) {
468
469
470
471
472 t.conn.Close()
473 }
474 close(t.writerDone)
475 }()
476 return t, nil
477 }
478
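// newStream constructs a Stream for the given CallHdr. The stream is not yet
// registered with the transport; that happens in NewStream once a stream ID is assigned.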
479 func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
480
481 s := &Stream{
482 ct: t,
483 done: make(chan struct{}),
484 method: callHdr.Method,
485 sendCompress: callHdr.SendCompress,
486 buf: newRecvBuffer(),
487 headerChan: make(chan struct{}),
488 contentSubtype: callHdr.ContentSubtype,
489 doneFunc: callHdr.DoneFunc,
490 }
491 s.wq = newWriteQuota(defaultWriteQuota, s.done)
492 s.requestRead = func(n int) {
493 t.adjustWindow(s, uint32(n))
494 }
495
496
497
498 s.ctx = ctx
499 s.trReader = &transportReader{
500 reader: &recvBufferReader{
501 ctx: s.ctx,
502 ctxDone: s.ctx.Done(),
503 recv: s.buf,
504 closeStream: func(err error) {
505 t.CloseStream(s, err)
506 },
507 freeBuffer: t.bufferPool.put,
508 },
509 windowHandler: func(n int) {
510 t.updateWindow(s, uint32(n))
511 },
512 }
513 return s
514 }
515
516 func (t *http2Client) getPeer() *peer.Peer {
517 return &peer.Peer{
518 Addr: t.remoteAddr,
519 AuthInfo: t.authInfo,
520 LocalAddr: t.localAddr,
521 }
522 }
523
524
525
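// outgoingGoAwayHandler writes a GOAWAY frame advertising the last stream ID
// initiated by this client (t.nextID-2) and propagates g.closeConn back to loopy.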
526 func (t *http2Client) outgoingGoAwayHandler(g *goAway) (bool, error) {
527 t.mu.Lock()
528 defer t.mu.Unlock()
529 if err := t.framer.fr.WriteGoAway(t.nextID-2, http2.ErrCodeNo, g.debugData); err != nil {
530 return false, err
531 }
532 return false, g.closeConn
533 }
534
535 func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {
536 aud := t.createAudience(callHdr)
537 ri := credentials.RequestInfo{
538 Method: callHdr.Method,
539 AuthInfo: t.authInfo,
540 }
541 ctxWithRequestInfo := icredentials.NewRequestInfoContext(ctx, ri)
542 authData, err := t.getTrAuthData(ctxWithRequestInfo, aud)
543 if err != nil {
544 return nil, err
545 }
546 callAuthData, err := t.getCallAuthData(ctxWithRequestInfo, aud, callHdr)
547 if err != nil {
548 return nil, err
549 }
550
551
552
553 hfLen := 7
554 hfLen += len(authData) + len(callAuthData)
555 headerFields := make([]hpack.HeaderField, 0, hfLen)
556 headerFields = append(headerFields, hpack.HeaderField{Name: ":method", Value: "POST"})
557 headerFields = append(headerFields, hpack.HeaderField{Name: ":scheme", Value: t.scheme})
558 headerFields = append(headerFields, hpack.HeaderField{Name: ":path", Value: callHdr.Method})
559 headerFields = append(headerFields, hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
560 headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(callHdr.ContentSubtype)})
561 headerFields = append(headerFields, hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
562 headerFields = append(headerFields, hpack.HeaderField{Name: "te", Value: "trailers"})
563 if callHdr.PreviousAttempts > 0 {
564 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-previous-rpc-attempts", Value: strconv.Itoa(callHdr.PreviousAttempts)})
565 }
566
567 registeredCompressors := t.registeredCompressors
568 if callHdr.SendCompress != "" {
569 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: callHdr.SendCompress})
570
571
572
573 if !grpcutil.IsCompressorNameRegistered(callHdr.SendCompress) {
574 if registeredCompressors != "" {
575 registeredCompressors += ","
576 }
577 registeredCompressors += callHdr.SendCompress
578 }
579 }
580
581 if registeredCompressors != "" {
582 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-accept-encoding", Value: registeredCompressors})
583 }
584 if dl, ok := ctx.Deadline(); ok {
585
586
587 timeout := time.Until(dl)
588 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})
589 }
590 for k, v := range authData {
591 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
592 }
593 for k, v := range callAuthData {
594 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
595 }
596 if b := stats.OutgoingTags(ctx); b != nil {
597 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
598 }
599 if b := stats.OutgoingTrace(ctx); b != nil {
600 headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
601 }
602
603 if md, added, ok := metadataFromOutgoingContextRaw(ctx); ok {
604 var k string
605 for k, vv := range md {
606
607 if isReservedHeader(k) {
608 continue
609 }
610 for _, v := range vv {
611 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
612 }
613 }
614 for _, vv := range added {
615 for i, v := range vv {
616 if i%2 == 0 {
617 k = strings.ToLower(v)
618 continue
619 }
620
621 if isReservedHeader(k) {
622 continue
623 }
624 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
625 }
626 }
627 }
628 for k, vv := range t.md {
629 if isReservedHeader(k) {
630 continue
631 }
632 for _, v := range vv {
633 headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
634 }
635 }
636 return headerFields, nil
637 }
638
639 func (t *http2Client) createAudience(callHdr *CallHdr) string {
640
641 if len(t.perRPCCreds) == 0 && callHdr.Creds == nil {
642 return ""
643 }
644
645
646 host := strings.TrimSuffix(callHdr.Host, ":443")
647 pos := strings.LastIndex(callHdr.Method, "/")
648 if pos == -1 {
649 pos = len(callHdr.Method)
650 }
651 return "https://" + host + callHdr.Method[:pos]
652 }
653
654 func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[string]string, error) {
655 if len(t.perRPCCreds) == 0 {
656 return nil, nil
657 }
658 authData := map[string]string{}
659 for _, c := range t.perRPCCreds {
660 data, err := c.GetRequestMetadata(ctx, audience)
661 if err != nil {
662 if st, ok := status.FromError(err); ok {
663
664 if istatus.IsRestrictedControlPlaneCode(st) {
665 err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
666 }
667 return nil, err
668 }
669
670 return nil, status.Errorf(codes.Unauthenticated, "transport: per-RPC creds failed due to error: %v", err)
671 }
672 for k, v := range data {
673
674 k = strings.ToLower(k)
675 authData[k] = v
676 }
677 }
678 return authData, nil
679 }
680
681 func (t *http2Client) getCallAuthData(ctx context.Context, audience string, callHdr *CallHdr) (map[string]string, error) {
682 var callAuthData map[string]string
683
684
685
686 if callCreds := callHdr.Creds; callCreds != nil {
687 if callCreds.RequireTransportSecurity() {
688 ri, _ := credentials.RequestInfoFromContext(ctx)
689 if !t.isSecure || credentials.CheckSecurityLevel(ri.AuthInfo, credentials.PrivacyAndIntegrity) != nil {
690 return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
691 }
692 }
693 data, err := callCreds.GetRequestMetadata(ctx, audience)
694 if err != nil {
695 if st, ok := status.FromError(err); ok {
696
697 if istatus.IsRestrictedControlPlaneCode(st) {
698 err = status.Errorf(codes.Internal, "transport: received per-RPC creds error with illegal status: %v", err)
699 }
700 return nil, err
701 }
702 return nil, status.Errorf(codes.Internal, "transport: per-RPC creds failed due to error: %v", err)
703 }
704 callAuthData = make(map[string]string, len(data))
705 for k, v := range data {
706
707 k = strings.ToLower(k)
708 callAuthData[k] = v
709 }
710 }
711 return callAuthData, nil
712 }
713
714
715
716
717
718
719
720
721
722
723
724
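// NewStreamError wraps an error that occurred while creating a new stream and
// reports whether a transparent retry of the RPC is allowed.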
725 type NewStreamError struct {
726 Err error
727
728 AllowTransparentRetry bool
729 }
730
731 func (e NewStreamError) Error() string {
732 return e.Err.Error()
733 }
734
735
736
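// NewStream creates a stream and registers it into the transport as an "active"
// stream. All non-nil errors returned are *NewStreamError.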
737 func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {
738 ctx = peer.NewContext(ctx, t.getPeer())
739
740
741
742
743
744
745 if t.address.ServerName != "" {
746 newCallHdr := *callHdr
747 newCallHdr.Host = t.address.ServerName
748 callHdr = &newCallHdr
749 }
750
751 headerFields, err := t.createHeaderFields(ctx, callHdr)
752 if err != nil {
753 return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}
754 }
755 s := t.newStream(ctx, callHdr)
756 cleanup := func(err error) {
757 if s.swapState(streamDone) == streamDone {
758
759 return
760 }
761
762 atomic.StoreUint32(&s.unprocessed, 1)
763 s.write(recvMsg{err: err})
764 close(s.done)
765
766 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
767 close(s.headerChan)
768 }
769 }
770 hdr := &headerFrame{
771 hf: headerFields,
772 endStream: false,
773 initStream: func(id uint32) error {
774 t.mu.Lock()
775
776
777 if t.state == closing {
778 t.mu.Unlock()
779 cleanup(ErrConnClosing)
780 return ErrConnClosing
781 }
782 if channelz.IsOn() {
783 t.channelz.SocketMetrics.StreamsStarted.Add(1)
784 t.channelz.SocketMetrics.LastLocalStreamCreatedTimestamp.Store(time.Now().UnixNano())
785 }
786
787 if t.kpDormant {
788 t.kpDormancyCond.Signal()
789 }
790 t.mu.Unlock()
791 return nil
792 },
793 onOrphaned: cleanup,
794 wq: s.wq,
795 }
796 firstTry := true
797 var ch chan struct{}
798 transportDrainRequired := false
799 checkForStreamQuota := func() bool {
800 if t.streamQuota <= 0 {
801 if firstTry {
802 t.waitingStreams++
803 }
804 ch = t.streamsQuotaAvailable
805 return false
806 }
807 if !firstTry {
808 t.waitingStreams--
809 }
810 t.streamQuota--
811
812 t.mu.Lock()
813 if t.state == draining || t.activeStreams == nil {
814 t.mu.Unlock()
815 return false
816 }
817
818 hdr.streamID = t.nextID
819 t.nextID += 2
820
821
822 transportDrainRequired = t.nextID > MaxStreamID
823
824 s.id = hdr.streamID
825 s.fc = &inFlow{limit: uint32(t.initialWindowSize)}
826 t.activeStreams[s.id] = s
827 t.mu.Unlock()
828
829 if t.streamQuota > 0 && t.waitingStreams > 0 {
830 select {
831 case t.streamsQuotaAvailable <- struct{}{}:
832 default:
833 }
834 }
835 return true
836 }
837 var hdrListSizeErr error
838 checkForHeaderListSize := func() bool {
839 if t.maxSendHeaderListSize == nil {
840 return true
841 }
842 var sz int64
843 for _, f := range hdr.hf {
844 if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
845 hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
846 return false
847 }
848 }
849 return true
850 }
851 for {
852 success, err := t.controlBuf.executeAndPut(func() bool {
853 return checkForHeaderListSize() && checkForStreamQuota()
854 }, hdr)
855 if err != nil {
856
857 return nil, &NewStreamError{Err: err, AllowTransparentRetry: true}
858 }
859 if success {
860 break
861 }
862 if hdrListSizeErr != nil {
863 return nil, &NewStreamError{Err: hdrListSizeErr}
864 }
865 firstTry = false
866 select {
867 case <-ch:
868 case <-ctx.Done():
869 return nil, &NewStreamError{Err: ContextErr(ctx.Err())}
870 case <-t.goAway:
871 return nil, &NewStreamError{Err: errStreamDrain, AllowTransparentRetry: true}
872 case <-t.ctx.Done():
873 return nil, &NewStreamError{Err: ErrConnClosing, AllowTransparentRetry: true}
874 }
875 }
876 if len(t.statsHandlers) != 0 {
877 header, ok := metadata.FromOutgoingContext(ctx)
878 if ok {
879 header.Set("user-agent", t.userAgent)
880 } else {
881 header = metadata.Pairs("user-agent", t.userAgent)
882 }
883 for _, sh := range t.statsHandlers {
884
885
886
887 outHeader := &stats.OutHeader{
888 Client: true,
889 FullMethod: callHdr.Method,
890 RemoteAddr: t.remoteAddr,
891 LocalAddr: t.localAddr,
892 Compression: callHdr.SendCompress,
893 Header: header,
894 }
895 sh.HandleRPC(s.ctx, outHeader)
896 }
897 }
898 if transportDrainRequired {
899 if t.logger.V(logLevel) {
900 t.logger.Infof("Draining transport: t.nextID > MaxStreamID")
901 }
902 t.GracefulClose()
903 }
904 return s, nil
905 }
906
907
908
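// CloseStream clears the footprint of a stream when it is no longer needed,
// sending an RST_STREAM with CANCEL if err is non-nil.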
909 func (t *http2Client) CloseStream(s *Stream, err error) {
910 var (
911 rst bool
912 rstCode http2.ErrCode
913 )
914 if err != nil {
915 rst = true
916 rstCode = http2.ErrCodeCancel
917 }
918 t.closeStream(s, err, rst, rstCode, status.Convert(err), nil, false)
919 }
920
921 func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.ErrCode, st *status.Status, mdata map[string][]string, eosReceived bool) {
922
923 if s.swapState(streamDone) == streamDone {
924
925
926 <-s.done
927 return
928 }
929
930
931
932 s.status = st
933 if len(mdata) > 0 {
934 s.trailer = mdata
935 }
936 if err != nil {
937
938 s.write(recvMsg{err: err})
939 }
940
941 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
942 s.noHeaders = true
943 close(s.headerChan)
944 }
945 cleanup := &cleanupStream{
946 streamID: s.id,
947 onWrite: func() {
948 t.mu.Lock()
949 if t.activeStreams != nil {
950 delete(t.activeStreams, s.id)
951 }
952 t.mu.Unlock()
953 if channelz.IsOn() {
954 if eosReceived {
955 t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
956 } else {
957 t.channelz.SocketMetrics.StreamsFailed.Add(1)
958 }
959 }
960 },
961 rst: rst,
962 rstCode: rstCode,
963 }
964 addBackStreamQuota := func() bool {
965 t.streamQuota++
966 if t.streamQuota > 0 && t.waitingStreams > 0 {
967 select {
968 case t.streamsQuotaAvailable <- struct{}{}:
969 default:
970 }
971 }
972 return true
973 }
974 t.controlBuf.executeAndPut(addBackStreamQuota, cleanup)
975
976 close(s.done)
977 if s.doneFunc != nil {
978 s.doneFunc()
979 }
980 }
981
982
983
984
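// Close kicks off the shutdown process of the transport: it drains the writer via
// a GOAWAY, closes the underlying connection, and fails all active streams. It is
// a no-op if the transport is already closing.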
985 func (t *http2Client) Close(err error) {
986 t.mu.Lock()
987
988 if t.state == closing {
989 t.mu.Unlock()
990 return
991 }
992 if t.logger.V(logLevel) {
993 t.logger.Infof("Closing: %v", err)
994 }
995
996
997 if t.state != draining {
998 t.onClose(GoAwayInvalid)
999 }
1000 t.state = closing
1001 streams := t.activeStreams
1002 t.activeStreams = nil
1003 if t.kpDormant {
1004
1005
1006 t.kpDormancyCond.Signal()
1007 }
1008 t.mu.Unlock()
1009
1010
1011 t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte("client transport shutdown"), closeConn: err})
1012 <-t.writerDone
1013 t.cancel()
1014 t.conn.Close()
1015 channelz.RemoveEntry(t.channelz.ID)
1016
1017
1018 _, goAwayDebugMessage := t.GetGoAwayReason()
1019
1020 var st *status.Status
1021 if len(goAwayDebugMessage) > 0 {
1022 st = status.Newf(codes.Unavailable, "closing transport due to: %v, received prior goaway: %v", err, goAwayDebugMessage)
1023 err = st.Err()
1024 } else {
1025 st = status.New(codes.Unavailable, err.Error())
1026 }
1027
1028
1029 for _, s := range streams {
1030 t.closeStream(s, err, false, http2.ErrCodeNo, st, nil, false)
1031 }
1032 for _, sh := range t.statsHandlers {
1033 connEnd := &stats.ConnEnd{
1034 Client: true,
1035 }
1036 sh.HandleConn(t.ctx, connEnd)
1037 }
1038 }
1039
1040
1041
1042
1043
1044
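// GracefulClose moves the transport to the draining state, preventing new streams
// from being created. If there are no active streams the transport is closed
// immediately; otherwise an incomingGoAway is queued so in-flight streams can finish first.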
1045 func (t *http2Client) GracefulClose() {
1046 t.mu.Lock()
1047
1048 if t.state == draining || t.state == closing {
1049 t.mu.Unlock()
1050 return
1051 }
1052 if t.logger.V(logLevel) {
1053 t.logger.Infof("GracefulClose called")
1054 }
1055 t.onClose(GoAwayInvalid)
1056 t.state = draining
1057 active := len(t.activeStreams)
1058 t.mu.Unlock()
1059 if active == 0 {
1060 t.Close(connectionErrorf(true, nil, "no active streams left to process while draining"))
1061 return
1062 }
1063 t.controlBuf.put(&incomingGoAway{})
1064 }
1065
1066
1067
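// Write formats hdr and data into an HTTP/2 data frame and queues it for sending;
// opts.Last marks the end of the stream from the client side.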
1068 func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
1069 if opts.Last {
1070
1071 if !s.compareAndSwapState(streamActive, streamWriteDone) {
1072 return errStreamDone
1073 }
1074 } else if s.getState() != streamActive {
1075 return errStreamDone
1076 }
1077 df := &dataFrame{
1078 streamID: s.id,
1079 endStream: opts.Last,
1080 h: hdr,
1081 d: data,
1082 }
1083 if hdr != nil || data != nil {
1084 if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
1085 return err
1086 }
1087 }
1088 return t.controlBuf.put(df)
1089 }
1090
1091 func (t *http2Client) getStream(f http2.Frame) *Stream {
1092 t.mu.Lock()
1093 s := t.activeStreams[f.Header().StreamID]
1094 t.mu.Unlock()
1095 return s
1096 }
1097
1098
1099
1100
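// adjustWindow sends out an extra window update for the stream if the application
// is requesting more data than the current window can deliver.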
1101 func (t *http2Client) adjustWindow(s *Stream, n uint32) {
1102 if w := s.fc.maybeAdjust(n); w > 0 {
1103 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
1104 }
1105 }
1106
1107
1108
1109
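// updateWindow adjusts the inbound quota for the stream after the application has
// consumed n bytes; a window update is sent once the cumulative quota exceeds the threshold.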
1110 func (t *http2Client) updateWindow(s *Stream, n uint32) {
1111 if w := s.fc.onRead(n); w > 0 {
1112 t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
1113 }
1114 }
1115
1116
1117
1118
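// updateFlowControl updates the incoming flow-control window of the transport and
// all active streams to n, and advertises the change to the server via a window
// update and a SETTINGS frame.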
1119 func (t *http2Client) updateFlowControl(n uint32) {
1120 updateIWS := func() bool {
1121 t.initialWindowSize = int32(n)
1122 t.mu.Lock()
1123 for _, s := range t.activeStreams {
1124 s.fc.newLimit(n)
1125 }
1126 t.mu.Unlock()
1127 return true
1128 }
1129 t.controlBuf.executeAndPut(updateIWS, &outgoingWindowUpdate{streamID: 0, increment: t.fc.newLimit(n)})
1130 t.controlBuf.put(&outgoingSettings{
1131 ss: []http2.Setting{
1132 {
1133 ID: http2.SettingInitialWindowSize,
1134 Val: n,
1135 },
1136 },
1137 })
1138 }
1139
1140 func (t *http2Client) handleData(f *http2.DataFrame) {
1141 size := f.Header().Length
1142 var sendBDPPing bool
1143 if t.bdpEst != nil {
1144 sendBDPPing = t.bdpEst.add(size)
1145 }
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155 if w := t.fc.onData(size); w > 0 {
1156 t.controlBuf.put(&outgoingWindowUpdate{
1157 streamID: 0,
1158 increment: w,
1159 })
1160 }
1161 if sendBDPPing {
1162
1163
1164
1165 if w := t.fc.reset(); w > 0 {
1166 t.controlBuf.put(&outgoingWindowUpdate{
1167 streamID: 0,
1168 increment: w,
1169 })
1170 }
1171
1172 t.controlBuf.put(bdpPing)
1173 }
1174
1175 s := t.getStream(f)
1176 if s == nil {
1177 return
1178 }
1179 if size > 0 {
1180 if err := s.fc.onData(size); err != nil {
1181 t.closeStream(s, io.EOF, true, http2.ErrCodeFlowControl, status.New(codes.Internal, err.Error()), nil, false)
1182 return
1183 }
1184 if f.Header().Flags.Has(http2.FlagDataPadded) {
1185 if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
1186 t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
1187 }
1188 }
1189
1190
1191
1192 if len(f.Data()) > 0 {
1193 buffer := t.bufferPool.get()
1194 buffer.Reset()
1195 buffer.Write(f.Data())
1196 s.write(recvMsg{buffer: buffer})
1197 }
1198 }
1199
1200
1201 if f.StreamEnded() {
1202 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.New(codes.Internal, "server closed the stream without sending trailers"), nil, true)
1203 }
1204 }
1205
1206 func (t *http2Client) handleRSTStream(f *http2.RSTStreamFrame) {
1207 s := t.getStream(f)
1208 if s == nil {
1209 return
1210 }
1211 if f.ErrCode == http2.ErrCodeRefusedStream {
1212
1213 atomic.StoreUint32(&s.unprocessed, 1)
1214 }
1215 statusCode, ok := http2ErrConvTab[f.ErrCode]
1216 if !ok {
1217 if t.logger.V(logLevel) {
1218 t.logger.Infof("Received a RST_STREAM frame with code %q, but found no mapped gRPC status", f.ErrCode)
1219 }
1220 statusCode = codes.Unknown
1221 }
1222 if statusCode == codes.Canceled {
1223 if d, ok := s.ctx.Deadline(); ok && !d.After(time.Now()) {
1224
1225
1226 statusCode = codes.DeadlineExceeded
1227 }
1228 }
1229 t.closeStream(s, io.EOF, false, http2.ErrCodeNo, status.Newf(statusCode, "stream terminated by RST_STREAM with error code: %v", f.ErrCode), nil, false)
1230 }
1231
1232 func (t *http2Client) handleSettings(f *http2.SettingsFrame, isFirst bool) {
1233 if f.IsAck() {
1234 return
1235 }
1236 var maxStreams *uint32
1237 var ss []http2.Setting
1238 var updateFuncs []func()
1239 f.ForeachSetting(func(s http2.Setting) error {
1240 switch s.ID {
1241 case http2.SettingMaxConcurrentStreams:
1242 maxStreams = new(uint32)
1243 *maxStreams = s.Val
1244 case http2.SettingMaxHeaderListSize:
1245 updateFuncs = append(updateFuncs, func() {
1246 t.maxSendHeaderListSize = new(uint32)
1247 *t.maxSendHeaderListSize = s.Val
1248 })
1249 default:
1250 ss = append(ss, s)
1251 }
1252 return nil
1253 })
1254 if isFirst && maxStreams == nil {
1255 maxStreams = new(uint32)
1256 *maxStreams = math.MaxUint32
1257 }
1258 sf := &incomingSettings{
1259 ss: ss,
1260 }
1261 if maxStreams != nil {
1262 updateStreamQuota := func() {
1263 delta := int64(*maxStreams) - int64(t.maxConcurrentStreams)
1264 t.maxConcurrentStreams = *maxStreams
1265 t.streamQuota += delta
1266 if delta > 0 && t.waitingStreams > 0 {
1267 close(t.streamsQuotaAvailable)
1268 t.streamsQuotaAvailable = make(chan struct{}, 1)
1269 }
1270 }
1271 updateFuncs = append(updateFuncs, updateStreamQuota)
1272 }
1273 t.controlBuf.executeAndPut(func() bool {
1274 for _, f := range updateFuncs {
1275 f()
1276 }
1277 return true
1278 }, sf)
1279 }
1280
1281 func (t *http2Client) handlePing(f *http2.PingFrame) {
1282 if f.IsAck() {
1283
1284 if t.bdpEst != nil {
1285 t.bdpEst.calculate(f.Data)
1286 }
1287 return
1288 }
1289 pingAck := &ping{ack: true}
1290 copy(pingAck.data[:], f.Data[:])
1291 t.controlBuf.put(pingAck)
1292 }
1293
1294 func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
1295 t.mu.Lock()
1296 if t.state == closing {
1297 t.mu.Unlock()
1298 return
1299 }
1300 if f.ErrCode == http2.ErrCodeEnhanceYourCalm && string(f.DebugData()) == "too_many_pings" {
1301
1302
1303
1304
1305 logger.Errorf("Client received GoAway with error code ENHANCE_YOUR_CALM and debug data equal to ASCII \"too_many_pings\".")
1306 }
1307 id := f.LastStreamID
1308 if id > 0 && id%2 == 0 {
1309 t.mu.Unlock()
1310 t.Close(connectionErrorf(true, nil, "received goaway with non-zero even-numbered stream id: %v", id))
1311 return
1312 }
1313
1314
1315
1316
1317
1318
1319
1320
1321
1322
1323 select {
1324 case <-t.goAway:
1325
1326 if id > t.prevGoAwayID {
1327 t.mu.Unlock()
1328 t.Close(connectionErrorf(true, nil, "received goaway with stream id: %v, which exceeds stream id of previous goaway: %v", id, t.prevGoAwayID))
1329 return
1330 }
1331 default:
1332 t.setGoAwayReason(f)
1333 close(t.goAway)
1334 defer t.controlBuf.put(&incomingGoAway{})
1335
1336
1337
1338 if t.state != draining {
1339 t.onClose(t.goAwayReason)
1340 t.state = draining
1341 }
1342 }
1343
1344
1345 upperLimit := t.prevGoAwayID
1346 if upperLimit == 0 {
1347 upperLimit = math.MaxUint32
1348 }
1349
1350 t.prevGoAwayID = id
1351 if len(t.activeStreams) == 0 {
1352 t.mu.Unlock()
1353 t.Close(connectionErrorf(true, nil, "received goaway and there are no active streams"))
1354 return
1355 }
1356
1357 streamsToClose := make([]*Stream, 0)
1358 for streamID, stream := range t.activeStreams {
1359 if streamID > id && streamID <= upperLimit {
1360
1361 atomic.StoreUint32(&stream.unprocessed, 1)
1362 streamsToClose = append(streamsToClose, stream)
1363 }
1364 }
1365 t.mu.Unlock()
1366
1367
1368 for _, stream := range streamsToClose {
1369 t.closeStream(stream, errStreamDrain, false, http2.ErrCodeNo, statusGoAway, nil, false)
1370 }
1371 }
1372
1373
1374
1375
1376
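// setGoAwayReason records why the server sent the GOAWAY frame and saves a
// human-readable debug message. Callers must hold t.mu.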
1377 func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
1378 t.goAwayReason = GoAwayNoReason
1379 switch f.ErrCode {
1380 case http2.ErrCodeEnhanceYourCalm:
1381 if string(f.DebugData()) == "too_many_pings" {
1382 t.goAwayReason = GoAwayTooManyPings
1383 }
1384 }
1385 if len(f.DebugData()) == 0 {
1386 t.goAwayDebugMessage = fmt.Sprintf("code: %s", f.ErrCode)
1387 } else {
1388 t.goAwayDebugMessage = fmt.Sprintf("code: %s, debug data: %q", f.ErrCode, string(f.DebugData()))
1389 }
1390 }
1391
1392 func (t *http2Client) GetGoAwayReason() (GoAwayReason, string) {
1393 t.mu.Lock()
1394 defer t.mu.Unlock()
1395 return t.goAwayReason, t.goAwayDebugMessage
1396 }
1397
1398 func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
1399 t.controlBuf.put(&incomingWindowUpdate{
1400 streamID: f.Header().StreamID,
1401 increment: f.Increment,
1402 })
1403 }
1404
1405
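// operateHeaders takes action on the decoded headers: it records header or trailer
// metadata on the stream and closes the stream when the frame carries END_STREAM.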
1406 func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
1407 s := t.getStream(frame)
1408 if s == nil {
1409 return
1410 }
1411 endStream := frame.StreamEnded()
1412 atomic.StoreUint32(&s.bytesReceived, 1)
1413 initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
1414
1415 if !initialHeader && !endStream {
1416
1417 st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
1418 t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
1419 return
1420 }
1421
1422
1423
1424 if frame.Truncated {
1425 se := status.New(codes.Internal, "peer header list size exceeded limit")
1426 t.closeStream(s, se.Err(), true, http2.ErrCodeFrameSize, se, nil, endStream)
1427 return
1428 }
1429
1430 var (
1431
1432
1433 isGRPC = !initialHeader
1434 mdata = make(map[string][]string)
1435 contentTypeErr = "malformed header: missing HTTP content-type"
1436 grpcMessage string
1437 recvCompress string
1438 httpStatusCode *int
1439 httpStatusErr string
1440 rawStatusCode = codes.Unknown
1441
1442 headerError string
1443 )
1444
1445 if initialHeader {
1446 httpStatusErr = "malformed header: missing HTTP status"
1447 }
1448
1449 for _, hf := range frame.Fields {
1450 switch hf.Name {
1451 case "content-type":
1452 if _, validContentType := grpcutil.ContentSubtype(hf.Value); !validContentType {
1453 contentTypeErr = fmt.Sprintf("transport: received unexpected content-type %q", hf.Value)
1454 break
1455 }
1456 contentTypeErr = ""
1457 mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
1458 isGRPC = true
1459 case "grpc-encoding":
1460 recvCompress = hf.Value
1461 case "grpc-status":
1462 code, err := strconv.ParseInt(hf.Value, 10, 32)
1463 if err != nil {
1464 se := status.New(codes.Internal, fmt.Sprintf("transport: malformed grpc-status: %v", err))
1465 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1466 return
1467 }
1468 rawStatusCode = codes.Code(uint32(code))
1469 case "grpc-message":
1470 grpcMessage = decodeGrpcMessage(hf.Value)
1471 case ":status":
1472 if hf.Value == "200" {
1473 httpStatusErr = ""
1474 statusCode := 200
1475 httpStatusCode = &statusCode
1476 break
1477 }
1478
1479 c, err := strconv.ParseInt(hf.Value, 10, 32)
1480 if err != nil {
1481 se := status.New(codes.Internal, fmt.Sprintf("transport: malformed http-status: %v", err))
1482 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1483 return
1484 }
1485 statusCode := int(c)
1486 httpStatusCode = &statusCode
1487
1488 httpStatusErr = fmt.Sprintf(
1489 "unexpected HTTP status code received from server: %d (%s)",
1490 statusCode,
1491 http.StatusText(statusCode),
1492 )
1493 default:
1494 if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
1495 break
1496 }
1497 v, err := decodeMetadataHeader(hf.Name, hf.Value)
1498 if err != nil {
1499 headerError = fmt.Sprintf("transport: malformed %s: %v", hf.Name, err)
1500 logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
1501 break
1502 }
1503 mdata[hf.Name] = append(mdata[hf.Name], v)
1504 }
1505 }
1506
1507 if !isGRPC || httpStatusErr != "" {
1508 var code = codes.Internal
1509
1510 if httpStatusCode != nil {
1511 var ok bool
1512 code, ok = HTTPStatusConvTab[*httpStatusCode]
1513 if !ok {
1514 code = codes.Unknown
1515 }
1516 }
1517 var errs []string
1518 if httpStatusErr != "" {
1519 errs = append(errs, httpStatusErr)
1520 }
1521 if contentTypeErr != "" {
1522 errs = append(errs, contentTypeErr)
1523 }
1524
1525 se := status.New(code, strings.Join(errs, "; "))
1526 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1527 return
1528 }
1529
1530 if headerError != "" {
1531 se := status.New(codes.Internal, headerError)
1532 t.closeStream(s, se.Err(), true, http2.ErrCodeProtocol, se, nil, endStream)
1533 return
1534 }
1535
1536
1537
1538
1539 if !endStream {
1540
1541
1542
1543 if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
1544 s.headerValid = true
1545
1546
1547
1548 s.recvCompress = recvCompress
1549 if len(mdata) > 0 {
1550 s.header = mdata
1551 }
1552 close(s.headerChan)
1553 }
1554 }
1555
1556 for _, sh := range t.statsHandlers {
1557 if !endStream {
1558 inHeader := &stats.InHeader{
1559 Client: true,
1560 WireLength: int(frame.Header().Length),
1561 Header: metadata.MD(mdata).Copy(),
1562 Compression: s.recvCompress,
1563 }
1564 sh.HandleRPC(s.ctx, inHeader)
1565 } else {
1566 inTrailer := &stats.InTrailer{
1567 Client: true,
1568 WireLength: int(frame.Header().Length),
1569 Trailer: metadata.MD(mdata).Copy(),
1570 }
1571 sh.HandleRPC(s.ctx, inTrailer)
1572 }
1573 }
1574
1575 if !endStream {
1576 return
1577 }
1578
1579 status := istatus.NewWithProto(rawStatusCode, grpcMessage, mdata[grpcStatusDetailsBinHeader])
1580
1581
1582
1583 rstStream := s.getState() == streamActive
1584 t.closeStream(s, io.EOF, rstStream, http2.ErrCodeNo, status, mdata, true)
1585 }
1586
1587
1588
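// readServerPreface reads and handles the initial SETTINGS frame from the server;
// any other frame, or a read error, is treated as a connection error.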
1589 func (t *http2Client) readServerPreface() error {
1590 frame, err := t.framer.fr.ReadFrame()
1591 if err != nil {
1592 return connectionErrorf(true, err, "error reading server preface: %v", err)
1593 }
1594 sf, ok := frame.(*http2.SettingsFrame)
1595 if !ok {
1596 return connectionErrorf(true, nil, "initial http2 frame from server is not a settings frame: %T", frame)
1597 }
1598 t.handleSettings(sf, true)
1599 return nil
1600 }
1601
1602
1603
1604
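// reader verifies the server preface and then runs the frame-dispatch loop for the
// connection. If the server preface is not read successfully an error is pushed to
// errCh; otherwise errCh is closed without a value.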
1605 func (t *http2Client) reader(errCh chan<- error) {
1606 defer close(t.readerDone)
1607
1608 if err := t.readServerPreface(); err != nil {
1609 errCh <- err
1610 return
1611 }
1612 close(errCh)
1613 if t.keepaliveEnabled {
1614 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1615 }
1616
1617
1618 for {
1619 t.controlBuf.throttle()
1620 frame, err := t.framer.fr.ReadFrame()
1621 if t.keepaliveEnabled {
1622 atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
1623 }
1624 if err != nil {
1625
1626
1627
1628 if se, ok := err.(http2.StreamError); ok {
1629 t.mu.Lock()
1630 s := t.activeStreams[se.StreamID]
1631 t.mu.Unlock()
1632 if s != nil {
1633
1634 code := http2ErrConvTab[se.Code]
1635 errorDetail := t.framer.fr.ErrorDetail()
1636 var msg string
1637 if errorDetail != nil {
1638 msg = errorDetail.Error()
1639 } else {
1640 msg = "received invalid frame"
1641 }
1642 t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
1643 }
1644 continue
1645 } else {
1646
1647 t.Close(connectionErrorf(true, err, "error reading from server: %v", err))
1648 return
1649 }
1650 }
1651 switch frame := frame.(type) {
1652 case *http2.MetaHeadersFrame:
1653 t.operateHeaders(frame)
1654 case *http2.DataFrame:
1655 t.handleData(frame)
1656 case *http2.RSTStreamFrame:
1657 t.handleRSTStream(frame)
1658 case *http2.SettingsFrame:
1659 t.handleSettings(frame, false)
1660 case *http2.PingFrame:
1661 t.handlePing(frame)
1662 case *http2.GoAwayFrame:
1663 t.handleGoAway(frame)
1664 case *http2.WindowUpdateFrame:
1665 t.handleWindowUpdate(frame)
1666 default:
1667 if logger.V(logLevel) {
1668 logger.Errorf("transport: http2Client.reader got unhandled frame type %v.", frame)
1669 }
1670 }
1671 }
1672 }
1673
1674 func minTime(a, b time.Duration) time.Duration {
1675 if a < b {
1676 return a
1677 }
1678 return b
1679 }
1680
1681
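// keepalive runs in a separate goroutine and keeps the connection alive by sending
// pings, going dormant when there are no active streams and PermitWithoutStream is false.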
1682 func (t *http2Client) keepalive() {
1683 p := &ping{data: [8]byte{}}
1684
1685 outstandingPing := false
1686
1687
1688 timeoutLeft := time.Duration(0)
1689
1690
1691 prevNano := time.Now().UnixNano()
1692 timer := time.NewTimer(t.kp.Time)
1693 for {
1694 select {
1695 case <-timer.C:
1696 lastRead := atomic.LoadInt64(&t.lastRead)
1697 if lastRead > prevNano {
1698
1699 outstandingPing = false
1700
1701 timer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
1702 prevNano = lastRead
1703 continue
1704 }
1705 if outstandingPing && timeoutLeft <= 0 {
1706 t.Close(connectionErrorf(true, nil, "keepalive ping failed to receive ACK within timeout"))
1707 return
1708 }
1709 t.mu.Lock()
1710 if t.state == closing {
1711
1712
1713
1714
1715
1716
1717 t.mu.Unlock()
1718 return
1719 }
1720 if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream {
1721
1722
1723
1724
1725
1726 outstandingPing = false
1727 t.kpDormant = true
1728 t.kpDormancyCond.Wait()
1729 }
1730 t.kpDormant = false
1731 t.mu.Unlock()
1732
1733
1734
1735
1736 if !outstandingPing {
1737 if channelz.IsOn() {
1738 t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
1739 }
1740 t.controlBuf.put(p)
1741 timeoutLeft = t.kp.Timeout
1742 outstandingPing = true
1743 }
1744
1745
1746
1747
1748 sleepDuration := minTime(t.kp.Time, timeoutLeft)
1749 timeoutLeft -= sleepDuration
1750 timer.Reset(sleepDuration)
1751 case <-t.ctx.Done():
1752 if !timer.Stop() {
1753 <-timer.C
1754 }
1755 return
1756 }
1757 }
1758 }
1759
1760 func (t *http2Client) Error() <-chan struct{} {
1761 return t.ctx.Done()
1762 }
1763
1764 func (t *http2Client) GoAway() <-chan struct{} {
1765 return t.goAway
1766 }
1767
1768 func (t *http2Client) socketMetrics() *channelz.EphemeralSocketMetrics {
1769 return &channelz.EphemeralSocketMetrics{
1770 LocalFlowControlWindow: int64(t.fc.getSize()),
1771 RemoteFlowControlWindow: t.getOutFlowWindow(),
1772 }
1773 }
1774
1775 func (t *http2Client) RemoteAddr() net.Addr { return t.remoteAddr }
1776
1777 func (t *http2Client) IncrMsgSent() {
1778 t.channelz.SocketMetrics.MessagesSent.Add(1)
1779 t.channelz.SocketMetrics.LastMessageSentTimestamp.Store(time.Now().UnixNano())
1780 }
1781
1782 func (t *http2Client) IncrMsgRecv() {
1783 t.channelz.SocketMetrics.MessagesReceived.Add(1)
1784 t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Store(time.Now().UnixNano())
1785 }
1786
1787 func (t *http2Client) getOutFlowWindow() int64 {
1788 resp := make(chan uint32, 1)
1789 timer := time.NewTimer(time.Second)
1790 defer timer.Stop()
1791 t.controlBuf.put(&outFlowControlSizeRequest{resp})
1792 select {
1793 case sz := <-resp:
1794 return int64(sz)
1795 case <-t.ctxDone:
1796 return -1
1797 case <-timer.C:
1798 return -2
1799 }
1800 }
1801
1802 func (t *http2Client) stateForTesting() transportState {
1803 t.mu.Lock()
1804 defer t.mu.Unlock()
1805 return t.state
1806 }
1807