...

Source file src/golang.org/x/net/http2/transport.go

Documentation: golang.org/x/net/http2

     1  // Copyright 2015 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Transport code.
     6  
     7  package http2
     8  
     9  import (
    10  	"bufio"
    11  	"bytes"
    12  	"compress/gzip"
    13  	"context"
    14  	"crypto/rand"
    15  	"crypto/tls"
    16  	"errors"
    17  	"fmt"
    18  	"io"
    19  	"io/fs"
    20  	"log"
    21  	"math"
    22  	"math/bits"
    23  	mathrand "math/rand"
    24  	"net"
    25  	"net/http"
    26  	"net/http/httptrace"
    27  	"net/textproto"
    28  	"sort"
    29  	"strconv"
    30  	"strings"
    31  	"sync"
    32  	"sync/atomic"
    33  	"time"
    34  
    35  	"golang.org/x/net/http/httpguts"
    36  	"golang.org/x/net/http2/hpack"
    37  	"golang.org/x/net/idna"
    38  )
    39  
    40  const (
    41  	// transportDefaultConnFlow is how many connection-level flow control
    42  	// tokens we give the server at start-up, past the default 64k.
    43  	transportDefaultConnFlow = 1 << 30
    44  
    45  	// transportDefaultStreamFlow is how many stream-level flow
    46  	// control tokens we announce to the peer, and how many bytes
    47  	// we buffer per stream.
    48  	transportDefaultStreamFlow = 4 << 20
    49  
    50  	defaultUserAgent = "Go-http-client/2.0"
    51  
     52  	// initialMaxConcurrentStreams is a connection's maxConcurrentStreams until
     53  	// it has received the server's initial SETTINGS frame, which corresponds to the
    54  	// spec's minimum recommended value.
    55  	initialMaxConcurrentStreams = 100
    56  
     57  	// defaultMaxConcurrentStreams is a connection's default maxConcurrentStreams
    58  	// if the server doesn't include one in its initial SETTINGS frame.
    59  	defaultMaxConcurrentStreams = 1000
    60  )
    61  
    62  // Transport is an HTTP/2 Transport.
    63  //
    64  // A Transport internally caches connections to servers. It is safe
    65  // for concurrent use by multiple goroutines.
    66  type Transport struct {
    67  	// DialTLSContext specifies an optional dial function with context for
    68  	// creating TLS connections for requests.
    69  	//
     70  	// If both DialTLSContext and DialTLS are nil, tls.Dial is used.
    71  	//
    72  	// If the returned net.Conn has a ConnectionState method like tls.Conn,
    73  	// it will be used to set http.Response.TLS.
    74  	DialTLSContext func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error)
    75  
    76  	// DialTLS specifies an optional dial function for creating
    77  	// TLS connections for requests.
    78  	//
     79  	// If both DialTLSContext and DialTLS are nil, tls.Dial is used.
    80  	//
    81  	// Deprecated: Use DialTLSContext instead, which allows the transport
    82  	// to cancel dials as soon as they are no longer needed.
    83  	// If both are set, DialTLSContext takes priority.
    84  	DialTLS func(network, addr string, cfg *tls.Config) (net.Conn, error)
    85  
    86  	// TLSClientConfig specifies the TLS configuration to use with
    87  	// tls.Client. If nil, the default configuration is used.
    88  	TLSClientConfig *tls.Config
    89  
    90  	// ConnPool optionally specifies an alternate connection pool to use.
    91  	// If nil, the default is used.
    92  	ConnPool ClientConnPool
    93  
    94  	// DisableCompression, if true, prevents the Transport from
    95  	// requesting compression with an "Accept-Encoding: gzip"
    96  	// request header when the Request contains no existing
    97  	// Accept-Encoding value. If the Transport requests gzip on
    98  	// its own and gets a gzipped response, it's transparently
    99  	// decoded in the Response.Body. However, if the user
   100  	// explicitly requested gzip it is not automatically
   101  	// uncompressed.
   102  	DisableCompression bool
   103  
   104  	// AllowHTTP, if true, permits HTTP/2 requests using the insecure,
   105  	// plain-text "http" scheme. Note that this does not enable h2c support.
   106  	AllowHTTP bool
   107  
   108  	// MaxHeaderListSize is the http2 SETTINGS_MAX_HEADER_LIST_SIZE to
   109  	// send in the initial settings frame. It is how many bytes
   110  	// of response headers are allowed. Unlike the http2 spec, zero here
   111  	// means to use a default limit (currently 10MB). If you actually
   112  	// want to advertise an unlimited value to the peer, Transport
   113  	// interprets the highest possible value here (0xffffffff or 1<<32-1)
   114  	// to mean no limit.
   115  	MaxHeaderListSize uint32
   116  
   117  	// MaxReadFrameSize is the http2 SETTINGS_MAX_FRAME_SIZE to send in the
   118  	// initial settings frame. It is the size in bytes of the largest frame
   119  	// payload that the sender is willing to receive. If 0, no setting is
   120  	// sent, and the value is provided by the peer, which should be 16384
   121  	// according to the spec:
   122  	// https://datatracker.ietf.org/doc/html/rfc7540#section-6.5.2.
   123  	// Values are bounded in the range 16k to 16M.
   124  	MaxReadFrameSize uint32
   125  
   126  	// MaxDecoderHeaderTableSize optionally specifies the http2
   127  	// SETTINGS_HEADER_TABLE_SIZE to send in the initial settings frame. It
   128  	// informs the remote endpoint of the maximum size of the header compression
   129  	// table used to decode header blocks, in octets. If zero, the default value
   130  	// of 4096 is used.
   131  	MaxDecoderHeaderTableSize uint32
   132  
   133  	// MaxEncoderHeaderTableSize optionally specifies an upper limit for the
   134  	// header compression table used for encoding request headers. Received
   135  	// SETTINGS_HEADER_TABLE_SIZE settings are capped at this limit. If zero,
   136  	// the default value of 4096 is used.
   137  	MaxEncoderHeaderTableSize uint32
   138  
   139  	// StrictMaxConcurrentStreams controls whether the server's
   140  	// SETTINGS_MAX_CONCURRENT_STREAMS should be respected
   141  	// globally. If false, new TCP connections are created to the
   142  	// server as needed to keep each under the per-connection
   143  	// SETTINGS_MAX_CONCURRENT_STREAMS limit. If true, the
   144  	// server's SETTINGS_MAX_CONCURRENT_STREAMS is interpreted as
   145  	// a global limit and callers of RoundTrip block when needed,
   146  	// waiting for their turn.
   147  	StrictMaxConcurrentStreams bool
   148  
   149  	// IdleConnTimeout is the maximum amount of time an idle
   150  	// (keep-alive) connection will remain idle before closing
   151  	// itself.
   152  	// Zero means no limit.
   153  	IdleConnTimeout time.Duration
   154  
    155  	// ReadIdleTimeout is the timeout after which a health check using a ping
    156  	// frame will be carried out if no frame is received on the connection.
    157  	// Note that a ping response is considered a received frame, so if
   158  	// there is no other traffic on the connection, the health check will
   159  	// be performed every ReadIdleTimeout interval.
   160  	// If zero, no health check is performed.
   161  	ReadIdleTimeout time.Duration
   162  
   163  	// PingTimeout is the timeout after which the connection will be closed
   164  	// if a response to Ping is not received.
   165  	// Defaults to 15s.
   166  	PingTimeout time.Duration
   167  
   168  	// WriteByteTimeout is the timeout after which the connection will be
    169  	// closed if no data can be written to it. The timeout begins when data is
   170  	// available to write, and is extended whenever any bytes are written.
   171  	WriteByteTimeout time.Duration
   172  
   173  	// CountError, if non-nil, is called on HTTP/2 transport errors.
   174  	// It's intended to increment a metric for monitoring, such
   175  	// as an expvar or Prometheus metric.
   176  	// The errType consists of only ASCII word characters.
   177  	CountError func(errType string)
   178  
   179  	// t1, if non-nil, is the standard library Transport using
   180  	// this transport. Its settings are used (but not its
   181  	// RoundTrip method, etc).
   182  	t1 *http.Transport
   183  
   184  	connPoolOnce  sync.Once
   185  	connPoolOrDef ClientConnPool // non-nil version of ConnPool
   186  
   187  	*transportTestHooks
   188  }
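
// Editorial example (not part of the original source): a minimal sketch of
// using Transport directly as an http.Client transport. The function name,
// URL, and duration values are illustrative assumptions, not recommended
// defaults.
func exampleTransportClient() {
	tr := &Transport{
		TLSClientConfig: &tls.Config{MinVersion: tls.VersionTLS12},
		IdleConnTimeout: 90 * time.Second,
	}
	client := &http.Client{Transport: tr, Timeout: 30 * time.Second}
	resp, err := client.Get("https://example.com/")
	if err != nil {
		log.Printf("request failed: %v", err)
		return
	}
	defer resp.Body.Close()
	log.Printf("proto=%s status=%s", resp.Proto, resp.Status)
}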
   189  
   190  // Hook points used for testing.
   191  // Outside of tests, t.transportTestHooks is nil and these all have minimal implementations.
   192  // Inside tests, see the testSyncHooks function docs.
   193  
   194  type transportTestHooks struct {
   195  	newclientconn func(*ClientConn)
   196  	group         synctestGroupInterface
   197  }
   198  
   199  func (t *Transport) markNewGoroutine() {
   200  	if t != nil && t.transportTestHooks != nil {
   201  		t.transportTestHooks.group.Join()
   202  	}
   203  }
   204  
   205  func (t *Transport) now() time.Time {
   206  	if t != nil && t.transportTestHooks != nil {
   207  		return t.transportTestHooks.group.Now()
   208  	}
   209  	return time.Now()
   210  }
   211  
   212  func (t *Transport) timeSince(when time.Time) time.Duration {
   213  	if t != nil && t.transportTestHooks != nil {
   214  		return t.now().Sub(when)
   215  	}
   216  	return time.Since(when)
   217  }
   218  
   219  // newTimer creates a new time.Timer, or a synthetic timer in tests.
   220  func (t *Transport) newTimer(d time.Duration) timer {
   221  	if t.transportTestHooks != nil {
   222  		return t.transportTestHooks.group.NewTimer(d)
   223  	}
   224  	return timeTimer{time.NewTimer(d)}
   225  }
   226  
   227  // afterFunc creates a new time.AfterFunc timer, or a synthetic timer in tests.
   228  func (t *Transport) afterFunc(d time.Duration, f func()) timer {
   229  	if t.transportTestHooks != nil {
   230  		return t.transportTestHooks.group.AfterFunc(d, f)
   231  	}
   232  	return timeTimer{time.AfterFunc(d, f)}
   233  }
   234  
   235  func (t *Transport) contextWithTimeout(ctx context.Context, d time.Duration) (context.Context, context.CancelFunc) {
   236  	if t.transportTestHooks != nil {
   237  		return t.transportTestHooks.group.ContextWithTimeout(ctx, d)
   238  	}
   239  	return context.WithTimeout(ctx, d)
   240  }
   241  
   242  func (t *Transport) maxHeaderListSize() uint32 {
   243  	n := int64(t.MaxHeaderListSize)
   244  	if t.t1 != nil && t.t1.MaxResponseHeaderBytes != 0 {
   245  		n = t.t1.MaxResponseHeaderBytes
   246  		if n > 0 {
   247  			n = adjustHTTP1MaxHeaderSize(n)
   248  		}
   249  	}
   250  	if n <= 0 {
   251  		return 10 << 20
   252  	}
   253  	if n >= 0xffffffff {
   254  		return 0
   255  	}
   256  	return uint32(n)
   257  }
   258  
   259  func (t *Transport) disableCompression() bool {
   260  	return t.DisableCompression || (t.t1 != nil && t.t1.DisableCompression)
   261  }
   262  
   263  // ConfigureTransport configures a net/http HTTP/1 Transport to use HTTP/2.
   264  // It returns an error if t1 has already been HTTP/2-enabled.
   265  //
   266  // Use ConfigureTransports instead to configure the HTTP/2 Transport.
   267  func ConfigureTransport(t1 *http.Transport) error {
   268  	_, err := ConfigureTransports(t1)
   269  	return err
   270  }
   271  
   272  // ConfigureTransports configures a net/http HTTP/1 Transport to use HTTP/2.
   273  // It returns a new HTTP/2 Transport for further configuration.
   274  // It returns an error if t1 has already been HTTP/2-enabled.
   275  func ConfigureTransports(t1 *http.Transport) (*Transport, error) {
   276  	return configureTransports(t1)
   277  }
   278  
   279  func configureTransports(t1 *http.Transport) (*Transport, error) {
   280  	connPool := new(clientConnPool)
   281  	t2 := &Transport{
   282  		ConnPool: noDialClientConnPool{connPool},
   283  		t1:       t1,
   284  	}
   285  	connPool.t = t2
   286  	if err := registerHTTPSProtocol(t1, noDialH2RoundTripper{t2}); err != nil {
   287  		return nil, err
   288  	}
   289  	if t1.TLSClientConfig == nil {
   290  		t1.TLSClientConfig = new(tls.Config)
   291  	}
   292  	if !strSliceContains(t1.TLSClientConfig.NextProtos, "h2") {
   293  		t1.TLSClientConfig.NextProtos = append([]string{"h2"}, t1.TLSClientConfig.NextProtos...)
   294  	}
   295  	if !strSliceContains(t1.TLSClientConfig.NextProtos, "http/1.1") {
   296  		t1.TLSClientConfig.NextProtos = append(t1.TLSClientConfig.NextProtos, "http/1.1")
   297  	}
   298  	upgradeFn := func(scheme, authority string, c net.Conn) http.RoundTripper {
   299  		addr := authorityAddr(scheme, authority)
   300  		if used, err := connPool.addConnIfNeeded(addr, t2, c); err != nil {
   301  			go c.Close()
   302  			return erringRoundTripper{err}
   303  		} else if !used {
   304  			// Turns out we don't need this c.
   305  			// For example, two goroutines made requests to the same host
    306  			// at the same time, both kicking off TCP dials (since the
    307  			// protocol was unknown).
   308  			go c.Close()
   309  		}
   310  		if scheme == "http" {
   311  			return (*unencryptedTransport)(t2)
   312  		}
   313  		return t2
   314  	}
   315  	if t1.TLSNextProto == nil {
   316  		t1.TLSNextProto = make(map[string]func(string, *tls.Conn) http.RoundTripper)
   317  	}
   318  	t1.TLSNextProto[NextProtoTLS] = func(authority string, c *tls.Conn) http.RoundTripper {
   319  		return upgradeFn("https", authority, c)
   320  	}
   321  	// The "unencrypted_http2" TLSNextProto key is used to pass off non-TLS HTTP/2 conns.
   322  	t1.TLSNextProto[nextProtoUnencryptedHTTP2] = func(authority string, c *tls.Conn) http.RoundTripper {
   323  		nc, err := unencryptedNetConnFromTLSConn(c)
   324  		if err != nil {
   325  			go c.Close()
   326  			return erringRoundTripper{err}
   327  		}
   328  		return upgradeFn("http", authority, nc)
   329  	}
   330  	return t2, nil
   331  }
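
// Editorial example (not part of the original source): a sketch of upgrading
// an existing net/http Transport to speak HTTP/2 via ConfigureTransport. The
// function name and timeout value are illustrative assumptions.
func exampleConfigureTransport() {
	t1 := &http.Transport{
		TLSHandshakeTimeout: 10 * time.Second,
	}
	if err := ConfigureTransport(t1); err != nil {
		log.Printf("http2 configuration failed: %v", err)
		return
	}
	// t1 now offers "h2" via ALPN and hands matching connections to the
	// HTTP/2 Transport created by ConfigureTransport.
	client := &http.Client{Transport: t1}
	_ = client
}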
   332  
   333  // unencryptedTransport is a Transport with a RoundTrip method that
   334  // always permits http:// URLs.
   335  type unencryptedTransport Transport
   336  
   337  func (t *unencryptedTransport) RoundTrip(req *http.Request) (*http.Response, error) {
   338  	return (*Transport)(t).RoundTripOpt(req, RoundTripOpt{allowHTTP: true})
   339  }
   340  
   341  func (t *Transport) connPool() ClientConnPool {
   342  	t.connPoolOnce.Do(t.initConnPool)
   343  	return t.connPoolOrDef
   344  }
   345  
   346  func (t *Transport) initConnPool() {
   347  	if t.ConnPool != nil {
   348  		t.connPoolOrDef = t.ConnPool
   349  	} else {
   350  		t.connPoolOrDef = &clientConnPool{t: t}
   351  	}
   352  }
   353  
   354  // ClientConn is the state of a single HTTP/2 client connection to an
   355  // HTTP/2 server.
   356  type ClientConn struct {
   357  	t             *Transport
   358  	tconn         net.Conn             // usually *tls.Conn, except specialized impls
   359  	tlsState      *tls.ConnectionState // nil only for specialized impls
   360  	atomicReused  uint32               // whether conn is being reused; atomic
   361  	singleUse     bool                 // whether being used for a single http.Request
   362  	getConnCalled bool                 // used by clientConnPool
   363  
   364  	// readLoop goroutine fields:
   365  	readerDone chan struct{} // closed on error
   366  	readerErr  error         // set before readerDone is closed
   367  
   368  	idleTimeout time.Duration // or 0 for never
   369  	idleTimer   timer
   370  
   371  	mu               sync.Mutex // guards following
   372  	cond             *sync.Cond // hold mu; broadcast on flow/closed changes
    373  	flow             outflow    // our conn-level flow control quota (cs.flow is per stream)
   374  	inflow           inflow     // peer's conn-level flow control
   375  	doNotReuse       bool       // whether conn is marked to not be reused for any future requests
   376  	closing          bool
   377  	closed           bool
   378  	seenSettings     bool                     // true if we've seen a settings frame, false otherwise
   379  	seenSettingsChan chan struct{}            // closed when seenSettings is true or frame reading fails
   380  	wantSettingsAck  bool                     // we sent a SETTINGS frame and haven't heard back
   381  	goAway           *GoAwayFrame             // if non-nil, the GoAwayFrame we received
   382  	goAwayDebug      string                   // goAway frame's debug data, retained as a string
   383  	streams          map[uint32]*clientStream // client-initiated
   384  	streamsReserved  int                      // incr by ReserveNewRequest; decr on RoundTrip
   385  	nextStreamID     uint32
   386  	pendingRequests  int                       // requests blocked and waiting to be sent because len(streams) == maxConcurrentStreams
   387  	pings            map[[8]byte]chan struct{} // in flight ping data to notification channel
   388  	br               *bufio.Reader
   389  	lastActive       time.Time
   390  	lastIdle         time.Time // time last idle
   391  	// Settings from peer: (also guarded by wmu)
   392  	maxFrameSize                uint32
   393  	maxConcurrentStreams        uint32
   394  	peerMaxHeaderListSize       uint64
   395  	peerMaxHeaderTableSize      uint32
   396  	initialWindowSize           uint32
   397  	initialStreamRecvWindowSize int32
   398  	readIdleTimeout             time.Duration
   399  	pingTimeout                 time.Duration
   400  	extendedConnectAllowed      bool
   401  
   402  	// rstStreamPingsBlocked works around an unfortunate gRPC behavior.
   403  	// gRPC strictly limits the number of PING frames that it will receive.
   404  	// The default is two pings per two hours, but the limit resets every time
   405  	// the gRPC endpoint sends a HEADERS or DATA frame. See golang/go#70575.
   406  	//
   407  	// rstStreamPingsBlocked is set after receiving a response to a PING frame
   408  	// bundled with an RST_STREAM (see pendingResets below), and cleared after
   409  	// receiving a HEADERS or DATA frame.
   410  	rstStreamPingsBlocked bool
   411  
   412  	// pendingResets is the number of RST_STREAM frames we have sent to the peer,
   413  	// without confirming that the peer has received them. When we send a RST_STREAM,
   414  	// we bundle it with a PING frame, unless a PING is already in flight. We count
   415  	// the reset stream against the connection's concurrency limit until we get
   416  	// a PING response. This limits the number of requests we'll try to send to a
   417  	// completely unresponsive connection.
   418  	pendingResets int
   419  
   420  	// reqHeaderMu is a 1-element semaphore channel controlling access to sending new requests.
   421  	// Write to reqHeaderMu to lock it, read from it to unlock.
    422  	// Lock reqHeaderMu BEFORE mu or wmu.
   423  	reqHeaderMu chan struct{}
   424  
   425  	// wmu is held while writing.
   426  	// Acquire BEFORE mu when holding both, to avoid blocking mu on network writes.
   427  	// Only acquire both at the same time when changing peer settings.
   428  	wmu  sync.Mutex
   429  	bw   *bufio.Writer
   430  	fr   *Framer
   431  	werr error        // first write error that has occurred
   432  	hbuf bytes.Buffer // HPACK encoder writes into this
   433  	henc *hpack.Encoder
   434  }
   435  
   436  // clientStream is the state for a single HTTP/2 stream. One of these
   437  // is created for each Transport.RoundTrip call.
   438  type clientStream struct {
   439  	cc *ClientConn
   440  
   441  	// Fields of Request that we may access even after the response body is closed.
   442  	ctx       context.Context
   443  	reqCancel <-chan struct{}
   444  
   445  	trace         *httptrace.ClientTrace // or nil
   446  	ID            uint32
   447  	bufPipe       pipe // buffered pipe with the flow-controlled response payload
   448  	requestedGzip bool
   449  	isHead        bool
   450  
   451  	abortOnce sync.Once
   452  	abort     chan struct{} // closed to signal stream should end immediately
   453  	abortErr  error         // set if abort is closed
   454  
   455  	peerClosed chan struct{} // closed when the peer sends an END_STREAM flag
   456  	donec      chan struct{} // closed after the stream is in the closed state
   457  	on100      chan struct{} // buffered; written to if a 100 is received
   458  
   459  	respHeaderRecv chan struct{}  // closed when headers are received
   460  	res            *http.Response // set if respHeaderRecv is closed
   461  
   462  	flow        outflow // guarded by cc.mu
   463  	inflow      inflow  // guarded by cc.mu
   464  	bytesRemain int64   // -1 means unknown; owned by transportResponseBody.Read
   465  	readErr     error   // sticky read error; owned by transportResponseBody.Read
   466  
   467  	reqBody              io.ReadCloser
   468  	reqBodyContentLength int64         // -1 means unknown
   469  	reqBodyClosed        chan struct{} // guarded by cc.mu; non-nil on Close, closed when done
   470  
   471  	// owned by writeRequest:
   472  	sentEndStream bool // sent an END_STREAM flag to the peer
   473  	sentHeaders   bool
   474  
   475  	// owned by clientConnReadLoop:
   476  	firstByte       bool  // got the first response byte
   477  	pastHeaders     bool  // got first MetaHeadersFrame (actual headers)
   478  	pastTrailers    bool  // got optional second MetaHeadersFrame (trailers)
   479  	readClosed      bool  // peer sent an END_STREAM flag
   480  	readAborted     bool  // read loop reset the stream
   481  	totalHeaderSize int64 // total size of 1xx headers seen
   482  
   483  	trailer    http.Header  // accumulated trailers
   484  	resTrailer *http.Header // client's Response.Trailer
   485  }
   486  
   487  var got1xxFuncForTests func(int, textproto.MIMEHeader) error
   488  
   489  // get1xxTraceFunc returns the value of request's httptrace.ClientTrace.Got1xxResponse func,
   490  // if any. It returns nil if not set or if the Go version is too old.
   491  func (cs *clientStream) get1xxTraceFunc() func(int, textproto.MIMEHeader) error {
   492  	if fn := got1xxFuncForTests; fn != nil {
   493  		return fn
   494  	}
   495  	return traceGot1xxResponseFunc(cs.trace)
   496  }
   497  
   498  func (cs *clientStream) abortStream(err error) {
   499  	cs.cc.mu.Lock()
   500  	defer cs.cc.mu.Unlock()
   501  	cs.abortStreamLocked(err)
   502  }
   503  
   504  func (cs *clientStream) abortStreamLocked(err error) {
   505  	cs.abortOnce.Do(func() {
   506  		cs.abortErr = err
   507  		close(cs.abort)
   508  	})
   509  	if cs.reqBody != nil {
   510  		cs.closeReqBodyLocked()
   511  	}
   512  	// TODO(dneil): Clean up tests where cs.cc.cond is nil.
   513  	if cs.cc.cond != nil {
   514  		// Wake up writeRequestBody if it is waiting on flow control.
   515  		cs.cc.cond.Broadcast()
   516  	}
   517  }
   518  
   519  func (cs *clientStream) abortRequestBodyWrite() {
   520  	cc := cs.cc
   521  	cc.mu.Lock()
   522  	defer cc.mu.Unlock()
   523  	if cs.reqBody != nil && cs.reqBodyClosed == nil {
   524  		cs.closeReqBodyLocked()
   525  		cc.cond.Broadcast()
   526  	}
   527  }
   528  
   529  func (cs *clientStream) closeReqBodyLocked() {
   530  	if cs.reqBodyClosed != nil {
   531  		return
   532  	}
   533  	cs.reqBodyClosed = make(chan struct{})
   534  	reqBodyClosed := cs.reqBodyClosed
   535  	go func() {
   536  		cs.cc.t.markNewGoroutine()
   537  		cs.reqBody.Close()
   538  		close(reqBodyClosed)
   539  	}()
   540  }
   541  
   542  type stickyErrWriter struct {
   543  	group   synctestGroupInterface
   544  	conn    net.Conn
   545  	timeout time.Duration
   546  	err     *error
   547  }
   548  
   549  func (sew stickyErrWriter) Write(p []byte) (n int, err error) {
   550  	if *sew.err != nil {
   551  		return 0, *sew.err
   552  	}
   553  	n, err = writeWithByteTimeout(sew.group, sew.conn, sew.timeout, p)
   554  	*sew.err = err
   555  	return n, err
   556  }
   557  
   558  // noCachedConnError is the concrete type of ErrNoCachedConn, which
   559  // needs to be detected by net/http regardless of whether it's its
   560  // bundled version (in h2_bundle.go with a rewritten type name) or
    561  // from a user's x/net/http2. As such, it has a unique method name
   562  // (IsHTTP2NoCachedConnError) that net/http sniffs for via func
   563  // isNoCachedConnError.
   564  type noCachedConnError struct{}
   565  
   566  func (noCachedConnError) IsHTTP2NoCachedConnError() {}
   567  func (noCachedConnError) Error() string             { return "http2: no cached connection was available" }
   568  
   569  // isNoCachedConnError reports whether err is of type noCachedConnError
    570  // or its equivalent renamed type in net/http's h2_bundle.go. Both types
   571  // may coexist in the same running program.
   572  func isNoCachedConnError(err error) bool {
   573  	_, ok := err.(interface{ IsHTTP2NoCachedConnError() })
   574  	return ok
   575  }
   576  
   577  var ErrNoCachedConn error = noCachedConnError{}
   578  
   579  // RoundTripOpt are options for the Transport.RoundTripOpt method.
   580  type RoundTripOpt struct {
   581  	// OnlyCachedConn controls whether RoundTripOpt may
    582  	// create a new TCP connection. If true and
   583  	// no cached connection is available, RoundTripOpt
   584  	// will return ErrNoCachedConn.
   585  	OnlyCachedConn bool
   586  
   587  	allowHTTP bool // allow http:// URLs
   588  }
   589  
   590  func (t *Transport) RoundTrip(req *http.Request) (*http.Response, error) {
   591  	return t.RoundTripOpt(req, RoundTripOpt{})
   592  }
   593  
    594  // authorityAddr converts a given authority (a host/IP, or host:port / ip:port)
    595  // to a host:port. The port 443 (or 80 for the "http" scheme) is added if needed.
   596  func authorityAddr(scheme string, authority string) (addr string) {
   597  	host, port, err := net.SplitHostPort(authority)
   598  	if err != nil { // authority didn't have a port
   599  		host = authority
   600  		port = ""
   601  	}
   602  	if port == "" { // authority's port was empty
   603  		port = "443"
   604  		if scheme == "http" {
   605  			port = "80"
   606  		}
   607  	}
   608  	if a, err := idna.ToASCII(host); err == nil {
   609  		host = a
   610  	}
   611  	// IPv6 address literal, without a port:
   612  	if strings.HasPrefix(host, "[") && strings.HasSuffix(host, "]") {
   613  		return host + ":" + port
   614  	}
   615  	return net.JoinHostPort(host, port)
   616  }
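
// Editorial note (not part of the original source): illustrative inputs and
// outputs for authorityAddr, assuming the behavior described above.
//
//	authorityAddr("https", "example.com")      // "example.com:443"
//	authorityAddr("http", "example.com")       // "example.com:80"
//	authorityAddr("https", "example.com:8443") // "example.com:8443"
//	authorityAddr("https", "[::1]")            // "[::1]:443"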
   617  
   618  // RoundTripOpt is like RoundTrip, but takes options.
   619  func (t *Transport) RoundTripOpt(req *http.Request, opt RoundTripOpt) (*http.Response, error) {
   620  	switch req.URL.Scheme {
   621  	case "https":
   622  		// Always okay.
   623  	case "http":
   624  		if !t.AllowHTTP && !opt.allowHTTP {
   625  			return nil, errors.New("http2: unencrypted HTTP/2 not enabled")
   626  		}
   627  	default:
   628  		return nil, errors.New("http2: unsupported scheme")
   629  	}
   630  
   631  	addr := authorityAddr(req.URL.Scheme, req.URL.Host)
   632  	for retry := 0; ; retry++ {
   633  		cc, err := t.connPool().GetClientConn(req, addr)
   634  		if err != nil {
   635  			t.vlogf("http2: Transport failed to get client conn for %s: %v", addr, err)
   636  			return nil, err
   637  		}
   638  		reused := !atomic.CompareAndSwapUint32(&cc.atomicReused, 0, 1)
   639  		traceGotConn(req, cc, reused)
   640  		res, err := cc.RoundTrip(req)
   641  		if err != nil && retry <= 6 {
   642  			roundTripErr := err
   643  			if req, err = shouldRetryRequest(req, err); err == nil {
   644  				// After the first retry, do exponential backoff with 10% jitter.
   645  				if retry == 0 {
   646  					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
   647  					continue
   648  				}
   649  				backoff := float64(uint(1) << (uint(retry) - 1))
   650  				backoff += backoff * (0.1 * mathrand.Float64())
   651  				d := time.Second * time.Duration(backoff)
   652  				tm := t.newTimer(d)
   653  				select {
   654  				case <-tm.C():
   655  					t.vlogf("RoundTrip retrying after failure: %v", roundTripErr)
   656  					continue
   657  				case <-req.Context().Done():
   658  					tm.Stop()
   659  					err = req.Context().Err()
   660  				}
   661  			}
   662  		}
   663  		if err == errClientConnNotEstablished {
   664  			// This ClientConn was created recently,
   665  			// this is the first request to use it,
   666  			// and the connection is closed and not usable.
   667  			//
   668  			// In this state, cc.idleTimer will remove the conn from the pool
   669  			// when it fires. Stop the timer and remove it here so future requests
   670  			// won't try to use this connection.
   671  			//
   672  			// If the timer has already fired and we're racing it, the redundant
   673  			// call to MarkDead is harmless.
   674  			if cc.idleTimer != nil {
   675  				cc.idleTimer.Stop()
   676  			}
   677  			t.connPool().MarkDead(cc)
   678  		}
   679  		if err != nil {
   680  			t.vlogf("RoundTrip failure: %v", err)
   681  			return nil, err
   682  		}
   683  		return res, nil
   684  	}
   685  }
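
// Editorial example (not part of the original source): a sketch of using
// RoundTripOpt with OnlyCachedConn to avoid dialing a new connection, falling
// back to a plain RoundTrip when no cached connection exists. The function
// name is an illustrative assumption.
func exampleRoundTripOnlyCached(t *Transport, req *http.Request) (*http.Response, error) {
	res, err := t.RoundTripOpt(req, RoundTripOpt{OnlyCachedConn: true})
	if isNoCachedConnError(err) {
		// No pooled connection was available; dial as usual.
		return t.RoundTrip(req)
	}
	return res, err
}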
   686  
    687  // CloseIdleConnections closes any connections that were established by
    688  // previous requests but are now sitting idle.
   689  // It does not interrupt any connections currently in use.
   690  func (t *Transport) CloseIdleConnections() {
   691  	if cp, ok := t.connPool().(clientConnPoolIdleCloser); ok {
   692  		cp.closeIdleConnections()
   693  	}
   694  }
   695  
   696  var (
   697  	errClientConnClosed         = errors.New("http2: client conn is closed")
   698  	errClientConnUnusable       = errors.New("http2: client conn not usable")
   699  	errClientConnNotEstablished = errors.New("http2: client conn could not be established")
   700  	errClientConnGotGoAway      = errors.New("http2: Transport received Server's graceful shutdown GOAWAY")
   701  )
   702  
   703  // shouldRetryRequest is called by RoundTrip when a request fails to get
   704  // response headers. It is always called with a non-nil error.
   705  // It returns either a request to retry (either the same request, or a
   706  // modified clone), or an error if the request can't be replayed.
   707  func shouldRetryRequest(req *http.Request, err error) (*http.Request, error) {
   708  	if !canRetryError(err) {
   709  		return nil, err
   710  	}
   711  	// If the Body is nil (or http.NoBody), it's safe to reuse
   712  	// this request and its Body.
   713  	if req.Body == nil || req.Body == http.NoBody {
   714  		return req, nil
   715  	}
   716  
   717  	// If the request body can be reset back to its original
   718  	// state via the optional req.GetBody, do that.
   719  	if req.GetBody != nil {
   720  		body, err := req.GetBody()
   721  		if err != nil {
   722  			return nil, err
   723  		}
   724  		newReq := *req
   725  		newReq.Body = body
   726  		return &newReq, nil
   727  	}
   728  
    729  	// The Request.Body can't be reset back to the beginning, but we
   730  	// don't seem to have started to read from it yet, so reuse
   731  	// the request directly.
   732  	if err == errClientConnUnusable {
   733  		return req, nil
   734  	}
   735  
   736  	return nil, fmt.Errorf("http2: Transport: cannot retry err [%v] after Request.Body was written; define Request.GetBody to avoid this error", err)
   737  }
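
// Editorial example (not part of the original source): a sketch showing how a
// caller can make a request body replayable so that shouldRetryRequest can
// clone it after a connection-level failure. The function name, URL, and
// wrapper choice are illustrative assumptions; http.NewRequest already sets
// GetBody for *bytes.Buffer, *bytes.Reader, and *strings.Reader bodies.
func exampleReplayableBody(payload []byte) (*http.Request, error) {
	// Wrap the payload in a reader type that http.NewRequest does not
	// recognize, so GetBody is not populated automatically.
	body := io.NopCloser(bytes.NewReader(payload))
	req, err := http.NewRequest("POST", "https://example.com/upload", body)
	if err != nil {
		return nil, err
	}
	// Supplying GetBody lets the transport retry with a fresh body.
	req.GetBody = func() (io.ReadCloser, error) {
		return io.NopCloser(bytes.NewReader(payload)), nil
	}
	return req, nil
}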
   738  
   739  func canRetryError(err error) bool {
   740  	if err == errClientConnUnusable || err == errClientConnGotGoAway {
   741  		return true
   742  	}
   743  	if se, ok := err.(StreamError); ok {
   744  		if se.Code == ErrCodeProtocol && se.Cause == errFromPeer {
   745  			// See golang/go#47635, golang/go#42777
   746  			return true
   747  		}
   748  		return se.Code == ErrCodeRefusedStream
   749  	}
   750  	return false
   751  }
   752  
   753  func (t *Transport) dialClientConn(ctx context.Context, addr string, singleUse bool) (*ClientConn, error) {
   754  	if t.transportTestHooks != nil {
   755  		return t.newClientConn(nil, singleUse)
   756  	}
   757  	host, _, err := net.SplitHostPort(addr)
   758  	if err != nil {
   759  		return nil, err
   760  	}
   761  	tconn, err := t.dialTLS(ctx, "tcp", addr, t.newTLSConfig(host))
   762  	if err != nil {
   763  		return nil, err
   764  	}
   765  	return t.newClientConn(tconn, singleUse)
   766  }
   767  
   768  func (t *Transport) newTLSConfig(host string) *tls.Config {
   769  	cfg := new(tls.Config)
   770  	if t.TLSClientConfig != nil {
   771  		*cfg = *t.TLSClientConfig.Clone()
   772  	}
   773  	if !strSliceContains(cfg.NextProtos, NextProtoTLS) {
   774  		cfg.NextProtos = append([]string{NextProtoTLS}, cfg.NextProtos...)
   775  	}
   776  	if cfg.ServerName == "" {
   777  		cfg.ServerName = host
   778  	}
   779  	return cfg
   780  }
   781  
   782  func (t *Transport) dialTLS(ctx context.Context, network, addr string, tlsCfg *tls.Config) (net.Conn, error) {
   783  	if t.DialTLSContext != nil {
   784  		return t.DialTLSContext(ctx, network, addr, tlsCfg)
   785  	} else if t.DialTLS != nil {
   786  		return t.DialTLS(network, addr, tlsCfg)
   787  	}
   788  
   789  	tlsCn, err := t.dialTLSWithContext(ctx, network, addr, tlsCfg)
   790  	if err != nil {
   791  		return nil, err
   792  	}
   793  	state := tlsCn.ConnectionState()
   794  	if p := state.NegotiatedProtocol; p != NextProtoTLS {
   795  		return nil, fmt.Errorf("http2: unexpected ALPN protocol %q; want %q", p, NextProtoTLS)
   796  	}
   797  	if !state.NegotiatedProtocolIsMutual {
   798  		return nil, errors.New("http2: could not negotiate protocol mutually")
   799  	}
   800  	return tlsCn, nil
   801  }
   802  
   803  // disableKeepAlives reports whether connections should be closed as
   804  // soon as possible after handling the first request.
   805  func (t *Transport) disableKeepAlives() bool {
   806  	return t.t1 != nil && t.t1.DisableKeepAlives
   807  }
   808  
   809  func (t *Transport) expectContinueTimeout() time.Duration {
   810  	if t.t1 == nil {
   811  		return 0
   812  	}
   813  	return t.t1.ExpectContinueTimeout
   814  }
   815  
   816  func (t *Transport) NewClientConn(c net.Conn) (*ClientConn, error) {
   817  	return t.newClientConn(c, t.disableKeepAlives())
   818  }
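
// Editorial example (not part of the original source): a sketch of managing a
// single connection by hand with Transport.NewClientConn, assuming the caller
// has already dialed a TLS connection that negotiated the "h2" protocol. The
// function name and URL are illustrative assumptions.
func exampleNewClientConn(t *Transport, conn *tls.Conn) error {
	cc, err := t.NewClientConn(conn)
	if err != nil {
		return err
	}
	req, err := http.NewRequest("GET", "https://example.com/", nil)
	if err != nil {
		return err
	}
	res, err := cc.RoundTrip(req)
	if err != nil {
		return err
	}
	// Drain and close the response so the stream completes before Shutdown.
	io.Copy(io.Discard, res.Body)
	res.Body.Close()
	return cc.Shutdown(req.Context())
}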
   819  
   820  func (t *Transport) newClientConn(c net.Conn, singleUse bool) (*ClientConn, error) {
   821  	conf := configFromTransport(t)
   822  	cc := &ClientConn{
   823  		t:                           t,
   824  		tconn:                       c,
   825  		readerDone:                  make(chan struct{}),
   826  		nextStreamID:                1,
   827  		maxFrameSize:                16 << 10, // spec default
   828  		initialWindowSize:           65535,    // spec default
   829  		initialStreamRecvWindowSize: conf.MaxUploadBufferPerStream,
   830  		maxConcurrentStreams:        initialMaxConcurrentStreams, // "infinite", per spec. Use a smaller value until we have received server settings.
   831  		peerMaxHeaderListSize:       0xffffffffffffffff,          // "infinite", per spec. Use 2^64-1 instead.
   832  		streams:                     make(map[uint32]*clientStream),
   833  		singleUse:                   singleUse,
   834  		seenSettingsChan:            make(chan struct{}),
   835  		wantSettingsAck:             true,
   836  		readIdleTimeout:             conf.SendPingTimeout,
   837  		pingTimeout:                 conf.PingTimeout,
   838  		pings:                       make(map[[8]byte]chan struct{}),
   839  		reqHeaderMu:                 make(chan struct{}, 1),
   840  		lastActive:                  t.now(),
   841  	}
   842  	var group synctestGroupInterface
   843  	if t.transportTestHooks != nil {
   844  		t.markNewGoroutine()
   845  		t.transportTestHooks.newclientconn(cc)
   846  		c = cc.tconn
   847  		group = t.group
   848  	}
   849  	if VerboseLogs {
   850  		t.vlogf("http2: Transport creating client conn %p to %v", cc, c.RemoteAddr())
   851  	}
   852  
   853  	cc.cond = sync.NewCond(&cc.mu)
   854  	cc.flow.add(int32(initialWindowSize))
   855  
   856  	// TODO: adjust this writer size to account for frame size +
   857  	// MTU + crypto/tls record padding.
   858  	cc.bw = bufio.NewWriter(stickyErrWriter{
   859  		group:   group,
   860  		conn:    c,
   861  		timeout: conf.WriteByteTimeout,
   862  		err:     &cc.werr,
   863  	})
   864  	cc.br = bufio.NewReader(c)
   865  	cc.fr = NewFramer(cc.bw, cc.br)
   866  	cc.fr.SetMaxReadFrameSize(conf.MaxReadFrameSize)
   867  	if t.CountError != nil {
   868  		cc.fr.countError = t.CountError
   869  	}
   870  	maxHeaderTableSize := conf.MaxDecoderHeaderTableSize
   871  	cc.fr.ReadMetaHeaders = hpack.NewDecoder(maxHeaderTableSize, nil)
   872  	cc.fr.MaxHeaderListSize = t.maxHeaderListSize()
   873  
   874  	cc.henc = hpack.NewEncoder(&cc.hbuf)
   875  	cc.henc.SetMaxDynamicTableSizeLimit(conf.MaxEncoderHeaderTableSize)
   876  	cc.peerMaxHeaderTableSize = initialHeaderTableSize
   877  
   878  	if cs, ok := c.(connectionStater); ok {
   879  		state := cs.ConnectionState()
   880  		cc.tlsState = &state
   881  	}
   882  
   883  	initialSettings := []Setting{
   884  		{ID: SettingEnablePush, Val: 0},
   885  		{ID: SettingInitialWindowSize, Val: uint32(cc.initialStreamRecvWindowSize)},
   886  	}
   887  	initialSettings = append(initialSettings, Setting{ID: SettingMaxFrameSize, Val: conf.MaxReadFrameSize})
   888  	if max := t.maxHeaderListSize(); max != 0 {
   889  		initialSettings = append(initialSettings, Setting{ID: SettingMaxHeaderListSize, Val: max})
   890  	}
   891  	if maxHeaderTableSize != initialHeaderTableSize {
   892  		initialSettings = append(initialSettings, Setting{ID: SettingHeaderTableSize, Val: maxHeaderTableSize})
   893  	}
   894  
   895  	cc.bw.Write(clientPreface)
   896  	cc.fr.WriteSettings(initialSettings...)
   897  	cc.fr.WriteWindowUpdate(0, uint32(conf.MaxUploadBufferPerConnection))
   898  	cc.inflow.init(conf.MaxUploadBufferPerConnection + initialWindowSize)
   899  	cc.bw.Flush()
   900  	if cc.werr != nil {
   901  		cc.Close()
   902  		return nil, cc.werr
   903  	}
   904  
   905  	// Start the idle timer after the connection is fully initialized.
   906  	if d := t.idleConnTimeout(); d != 0 {
   907  		cc.idleTimeout = d
   908  		cc.idleTimer = t.afterFunc(d, cc.onIdleTimeout)
   909  	}
   910  
   911  	go cc.readLoop()
   912  	return cc, nil
   913  }
   914  
   915  func (cc *ClientConn) healthCheck() {
   916  	pingTimeout := cc.pingTimeout
   917  	// We don't need to periodically ping in the health check, because the readLoop of ClientConn will
   918  	// trigger the healthCheck again if there is no frame received.
   919  	ctx, cancel := cc.t.contextWithTimeout(context.Background(), pingTimeout)
   920  	defer cancel()
   921  	cc.vlogf("http2: Transport sending health check")
   922  	err := cc.Ping(ctx)
   923  	if err != nil {
   924  		cc.vlogf("http2: Transport health check failure: %v", err)
   925  		cc.closeForLostPing()
   926  	} else {
   927  		cc.vlogf("http2: Transport health check success")
   928  	}
   929  }
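
// Editorial example (not part of the original source): a sketch of enabling
// the ping-based health check described above by setting ReadIdleTimeout and
// PingTimeout on the Transport. The function name and durations are
// illustrative assumptions.
func exampleHealthCheckConfig() *Transport {
	return &Transport{
		// Send a PING if no frame is received for 30s.
		ReadIdleTimeout: 30 * time.Second,
		// Close the connection if the PING is not answered within 10s.
		PingTimeout: 10 * time.Second,
	}
}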
   930  
   931  // SetDoNotReuse marks cc as not reusable for future HTTP requests.
   932  func (cc *ClientConn) SetDoNotReuse() {
   933  	cc.mu.Lock()
   934  	defer cc.mu.Unlock()
   935  	cc.doNotReuse = true
   936  }
   937  
   938  func (cc *ClientConn) setGoAway(f *GoAwayFrame) {
   939  	cc.mu.Lock()
   940  	defer cc.mu.Unlock()
   941  
   942  	old := cc.goAway
   943  	cc.goAway = f
   944  
   945  	// Merge the previous and current GoAway error frames.
   946  	if cc.goAwayDebug == "" {
   947  		cc.goAwayDebug = string(f.DebugData())
   948  	}
   949  	if old != nil && old.ErrCode != ErrCodeNo {
   950  		cc.goAway.ErrCode = old.ErrCode
   951  	}
   952  	last := f.LastStreamID
   953  	for streamID, cs := range cc.streams {
   954  		if streamID <= last {
   955  			// The server's GOAWAY indicates that it received this stream.
   956  			// It will either finish processing it, or close the connection
   957  			// without doing so. Either way, leave the stream alone for now.
   958  			continue
   959  		}
   960  		if streamID == 1 && cc.goAway.ErrCode != ErrCodeNo {
   961  			// Don't retry the first stream on a connection if we get a non-NO error.
   962  			// If the server is sending an error on a new connection,
   963  			// retrying the request on a new one probably isn't going to work.
   964  			cs.abortStreamLocked(fmt.Errorf("http2: Transport received GOAWAY from server ErrCode:%v", cc.goAway.ErrCode))
   965  		} else {
    966  			// Aborting the stream with errClientConnGotGoAway indicates that
   967  			// the request should be retried on a new connection.
   968  			cs.abortStreamLocked(errClientConnGotGoAway)
   969  		}
   970  	}
   971  }
   972  
   973  // CanTakeNewRequest reports whether the connection can take a new request,
    974  // meaning it has not been closed and has not received or sent a GOAWAY.
   975  //
   976  // If the caller is going to immediately make a new request on this
   977  // connection, use ReserveNewRequest instead.
   978  func (cc *ClientConn) CanTakeNewRequest() bool {
   979  	cc.mu.Lock()
   980  	defer cc.mu.Unlock()
   981  	return cc.canTakeNewRequestLocked()
   982  }
   983  
   984  // ReserveNewRequest is like CanTakeNewRequest but also reserves a
   985  // concurrent stream in cc. The reservation is decremented on the
   986  // next call to RoundTrip.
   987  func (cc *ClientConn) ReserveNewRequest() bool {
   988  	cc.mu.Lock()
   989  	defer cc.mu.Unlock()
   990  	if st := cc.idleStateLocked(); !st.canTakeNewRequest {
   991  		return false
   992  	}
   993  	cc.streamsReserved++
   994  	return true
   995  }
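
// Editorial example (not part of the original source): a sketch of the
// reserve-then-use pattern with ReserveNewRequest, where the reservation is
// consumed by the next RoundTrip on the same connection. The function name
// and error choice are illustrative assumptions.
func exampleReserveNewRequest(cc *ClientConn, req *http.Request) (*http.Response, error) {
	if !cc.ReserveNewRequest() {
		return nil, errClientConnUnusable
	}
	// The reservation taken above is decremented by this RoundTrip.
	return cc.RoundTrip(req)
}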
   996  
   997  // ClientConnState describes the state of a ClientConn.
   998  type ClientConnState struct {
   999  	// Closed is whether the connection is closed.
  1000  	Closed bool
  1001  
  1002  	// Closing is whether the connection is in the process of
  1003  	// closing. It may be closing due to shutdown, being a
  1004  	// single-use connection, being marked as DoNotReuse, or
  1005  	// having received a GOAWAY frame.
  1006  	Closing bool
  1007  
  1008  	// StreamsActive is how many streams are active.
  1009  	StreamsActive int
  1010  
  1011  	// StreamsReserved is how many streams have been reserved via
  1012  	// ClientConn.ReserveNewRequest.
  1013  	StreamsReserved int
  1014  
  1015  	// StreamsPending is how many requests have been sent in excess
  1016  	// of the peer's advertised MaxConcurrentStreams setting and
  1017  	// are waiting for other streams to complete.
  1018  	StreamsPending int
  1019  
  1020  	// MaxConcurrentStreams is how many concurrent streams the
  1021  	// peer advertised as acceptable. Zero means no SETTINGS
  1022  	// frame has been received yet.
  1023  	MaxConcurrentStreams uint32
  1024  
  1025  	// LastIdle, if non-zero, is when the connection last
  1026  	// transitioned to idle state.
  1027  	LastIdle time.Time
  1028  }
  1029  
  1030  // State returns a snapshot of cc's state.
  1031  func (cc *ClientConn) State() ClientConnState {
  1032  	cc.wmu.Lock()
  1033  	maxConcurrent := cc.maxConcurrentStreams
  1034  	if !cc.seenSettings {
  1035  		maxConcurrent = 0
  1036  	}
  1037  	cc.wmu.Unlock()
  1038  
  1039  	cc.mu.Lock()
  1040  	defer cc.mu.Unlock()
  1041  	return ClientConnState{
  1042  		Closed:               cc.closed,
  1043  		Closing:              cc.closing || cc.singleUse || cc.doNotReuse || cc.goAway != nil,
  1044  		StreamsActive:        len(cc.streams) + cc.pendingResets,
  1045  		StreamsReserved:      cc.streamsReserved,
  1046  		StreamsPending:       cc.pendingRequests,
  1047  		LastIdle:             cc.lastIdle,
  1048  		MaxConcurrentStreams: maxConcurrent,
  1049  	}
  1050  }
  1051  
  1052  // clientConnIdleState describes the suitability of a client
  1053  // connection to initiate a new RoundTrip request.
  1054  type clientConnIdleState struct {
  1055  	canTakeNewRequest bool
  1056  }
  1057  
  1058  func (cc *ClientConn) idleState() clientConnIdleState {
  1059  	cc.mu.Lock()
  1060  	defer cc.mu.Unlock()
  1061  	return cc.idleStateLocked()
  1062  }
  1063  
  1064  func (cc *ClientConn) idleStateLocked() (st clientConnIdleState) {
  1065  	if cc.singleUse && cc.nextStreamID > 1 {
  1066  		return
  1067  	}
  1068  	var maxConcurrentOkay bool
  1069  	if cc.t.StrictMaxConcurrentStreams {
  1070  		// We'll tell the caller we can take a new request to
  1071  		// prevent the caller from dialing a new TCP
  1072  		// connection, but then we'll block later before
  1073  		// writing it.
  1074  		maxConcurrentOkay = true
  1075  	} else {
  1076  		// We can take a new request if the total of
  1077  		//   - active streams;
  1078  		//   - reservation slots for new streams; and
  1079  		//   - streams for which we have sent a RST_STREAM and a PING,
  1080  		//     but received no subsequent frame
  1081  		// is less than the concurrency limit.
  1082  		maxConcurrentOkay = cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams)
  1083  	}
  1084  
  1085  	st.canTakeNewRequest = cc.goAway == nil && !cc.closed && !cc.closing && maxConcurrentOkay &&
  1086  		!cc.doNotReuse &&
  1087  		int64(cc.nextStreamID)+2*int64(cc.pendingRequests) < math.MaxInt32 &&
  1088  		!cc.tooIdleLocked()
  1089  
  1090  	// If this connection has never been used for a request and is closed,
  1091  	// then let it take a request (which will fail).
  1092  	//
  1093  	// This avoids a situation where an error early in a connection's lifetime
  1094  	// goes unreported.
  1095  	if cc.nextStreamID == 1 && cc.streamsReserved == 0 && cc.closed {
  1096  		st.canTakeNewRequest = true
  1097  	}
  1098  
  1099  	return
  1100  }
  1101  
  1102  // currentRequestCountLocked reports the number of concurrency slots currently in use,
  1103  // including active streams, reserved slots, and reset streams waiting for acknowledgement.
  1104  func (cc *ClientConn) currentRequestCountLocked() int {
  1105  	return len(cc.streams) + cc.streamsReserved + cc.pendingResets
  1106  }
  1107  
  1108  func (cc *ClientConn) canTakeNewRequestLocked() bool {
  1109  	st := cc.idleStateLocked()
  1110  	return st.canTakeNewRequest
  1111  }
  1112  
   1113  // tooIdleLocked reports whether this connection has been sitting idle
  1114  // for too much wall time.
  1115  func (cc *ClientConn) tooIdleLocked() bool {
   1116  	// The Round(0) strips the monotonic clock reading so the
  1117  	// times are compared based on their wall time. We don't want
  1118  	// to reuse a connection that's been sitting idle during
  1119  	// VM/laptop suspend if monotonic time was also frozen.
  1120  	return cc.idleTimeout != 0 && !cc.lastIdle.IsZero() && cc.t.timeSince(cc.lastIdle.Round(0)) > cc.idleTimeout
  1121  }
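
// Editorial note (not part of the original source): a minimal illustration of
// the Round(0) trick used above, which drops the monotonic clock reading so
// that only wall-clock time is compared:
//
//	t := time.Now()   // carries a monotonic clock reading
//	w := t.Round(0)   // wall-clock time only
//	_ = time.Since(w) // compares wall-clock times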
  1122  
  1123  // onIdleTimeout is called from a time.AfterFunc goroutine. It will
  1124  // only be called when we're idle, but because we're coming from a new
  1125  // goroutine, there could be a new request coming in at the same time,
  1126  // so this simply calls the synchronized closeIfIdle to shut down this
  1127  // connection. The timer could just call closeIfIdle, but this is more
  1128  // clear.
  1129  func (cc *ClientConn) onIdleTimeout() {
  1130  	cc.closeIfIdle()
  1131  }
  1132  
  1133  func (cc *ClientConn) closeConn() {
  1134  	t := time.AfterFunc(250*time.Millisecond, cc.forceCloseConn)
  1135  	defer t.Stop()
  1136  	cc.tconn.Close()
  1137  }
  1138  
  1139  // A tls.Conn.Close can hang for a long time if the peer is unresponsive.
  1140  // Try to shut it down more aggressively.
  1141  func (cc *ClientConn) forceCloseConn() {
  1142  	tc, ok := cc.tconn.(*tls.Conn)
  1143  	if !ok {
  1144  		return
  1145  	}
  1146  	if nc := tc.NetConn(); nc != nil {
  1147  		nc.Close()
  1148  	}
  1149  }
  1150  
  1151  func (cc *ClientConn) closeIfIdle() {
  1152  	cc.mu.Lock()
  1153  	if len(cc.streams) > 0 || cc.streamsReserved > 0 {
  1154  		cc.mu.Unlock()
  1155  		return
  1156  	}
  1157  	cc.closed = true
  1158  	nextID := cc.nextStreamID
  1159  	// TODO: do clients send GOAWAY too? maybe? Just Close:
  1160  	cc.mu.Unlock()
  1161  
  1162  	if VerboseLogs {
  1163  		cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, nextID-2)
  1164  	}
  1165  	cc.closeConn()
  1166  }
  1167  
  1168  func (cc *ClientConn) isDoNotReuseAndIdle() bool {
  1169  	cc.mu.Lock()
  1170  	defer cc.mu.Unlock()
  1171  	return cc.doNotReuse && len(cc.streams) == 0
  1172  }
  1173  
  1174  var shutdownEnterWaitStateHook = func() {}
  1175  
  1176  // Shutdown gracefully closes the client connection, waiting for running streams to complete.
  1177  func (cc *ClientConn) Shutdown(ctx context.Context) error {
  1178  	if err := cc.sendGoAway(); err != nil {
  1179  		return err
  1180  	}
  1181  	// Wait for all in-flight streams to complete or connection to close
  1182  	done := make(chan struct{})
  1183  	cancelled := false // guarded by cc.mu
  1184  	go func() {
  1185  		cc.t.markNewGoroutine()
  1186  		cc.mu.Lock()
  1187  		defer cc.mu.Unlock()
  1188  		for {
  1189  			if len(cc.streams) == 0 || cc.closed {
  1190  				cc.closed = true
  1191  				close(done)
  1192  				break
  1193  			}
  1194  			if cancelled {
  1195  				break
  1196  			}
  1197  			cc.cond.Wait()
  1198  		}
  1199  	}()
  1200  	shutdownEnterWaitStateHook()
  1201  	select {
  1202  	case <-done:
  1203  		cc.closeConn()
  1204  		return nil
  1205  	case <-ctx.Done():
  1206  		cc.mu.Lock()
  1207  		// Free the goroutine above
  1208  		cancelled = true
  1209  		cc.cond.Broadcast()
  1210  		cc.mu.Unlock()
  1211  		return ctx.Err()
  1212  	}
  1213  }
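
// Editorial example (not part of the original source): a sketch of a graceful
// shutdown with a deadline, falling back to an immediate Close if the running
// streams do not finish in time. The function name and the 5-second budget
// are illustrative assumptions.
func exampleGracefulShutdown(cc *ClientConn) {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := cc.Shutdown(ctx); err != nil {
		// Streams did not drain in time (or the context was canceled);
		// interrupt them and close the connection immediately.
		cc.Close()
	}
}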
  1214  
  1215  func (cc *ClientConn) sendGoAway() error {
  1216  	cc.mu.Lock()
  1217  	closing := cc.closing
  1218  	cc.closing = true
  1219  	maxStreamID := cc.nextStreamID
  1220  	cc.mu.Unlock()
  1221  	if closing {
  1222  		// GOAWAY sent already
  1223  		return nil
  1224  	}
  1225  
  1226  	cc.wmu.Lock()
  1227  	defer cc.wmu.Unlock()
  1228  	// Send a graceful shutdown frame to server
  1229  	if err := cc.fr.WriteGoAway(maxStreamID, ErrCodeNo, nil); err != nil {
  1230  		return err
  1231  	}
  1232  	if err := cc.bw.Flush(); err != nil {
  1233  		return err
  1234  	}
  1235  	// Prevent new requests
  1236  	return nil
  1237  }
  1238  
  1239  // closes the client connection immediately. In-flight requests are interrupted.
  1240  // err is sent to streams.
  1241  func (cc *ClientConn) closeForError(err error) {
  1242  	cc.mu.Lock()
  1243  	cc.closed = true
  1244  	for _, cs := range cc.streams {
  1245  		cs.abortStreamLocked(err)
  1246  	}
  1247  	cc.cond.Broadcast()
  1248  	cc.mu.Unlock()
  1249  	cc.closeConn()
  1250  }
  1251  
  1252  // Close closes the client connection immediately.
  1253  //
  1254  // In-flight requests are interrupted. For a graceful shutdown, use Shutdown instead.
  1255  func (cc *ClientConn) Close() error {
  1256  	err := errors.New("http2: client connection force closed via ClientConn.Close")
  1257  	cc.closeForError(err)
  1258  	return nil
  1259  }
  1260  
  1261  // closes the client connection immediately. In-flight requests are interrupted.
  1262  func (cc *ClientConn) closeForLostPing() {
  1263  	err := errors.New("http2: client connection lost")
  1264  	if f := cc.t.CountError; f != nil {
  1265  		f("conn_close_lost_ping")
  1266  	}
  1267  	cc.closeForError(err)
  1268  }
  1269  
  1270  // errRequestCanceled is a copy of net/http's errRequestCanceled because it's not
   1271  // exported. At least they'll be DeepEqual for h1-vs-h2 comparison tests.
  1272  var errRequestCanceled = errors.New("net/http: request canceled")
  1273  
  1274  func commaSeparatedTrailers(req *http.Request) (string, error) {
  1275  	keys := make([]string, 0, len(req.Trailer))
  1276  	for k := range req.Trailer {
  1277  		k = canonicalHeader(k)
  1278  		switch k {
  1279  		case "Transfer-Encoding", "Trailer", "Content-Length":
  1280  			return "", fmt.Errorf("invalid Trailer key %q", k)
  1281  		}
  1282  		keys = append(keys, k)
  1283  	}
  1284  	if len(keys) > 0 {
  1285  		sort.Strings(keys)
  1286  		return strings.Join(keys, ","), nil
  1287  	}
  1288  	return "", nil
  1289  }
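
// Editorial example (not part of the original source): a sketch of declaring
// request trailers that commaSeparatedTrailers serializes into the request's
// "trailer" header. "My-Checksum" is an illustrative, application-chosen key;
// Transfer-Encoding, Trailer, and Content-Length would be rejected as shown
// above.
func exampleDeclareTrailers(req *http.Request) {
	// Declare the trailer key before the request is sent; its final value
	// is set while or after the body is written.
	req.Trailer = http.Header{"My-Checksum": nil}
}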
  1290  
  1291  func (cc *ClientConn) responseHeaderTimeout() time.Duration {
  1292  	if cc.t.t1 != nil {
  1293  		return cc.t.t1.ResponseHeaderTimeout
  1294  	}
  1295  	// No way to do this (yet?) with just an http2.Transport. Probably
   1296  	// no need; Request.Cancel is the new way. We only need to support
  1297  	// this for compatibility with the old http.Transport fields when
  1298  	// we're doing transparent http2.
  1299  	return 0
  1300  }
  1301  
   1302  // checkConnHeaders checks whether req has any invalid connection-level headers,
   1303  // per RFC 7540 Section 8.1.2.2: Connection-Specific Header Fields.
  1304  // Certain headers are special-cased as okay but not transmitted later.
  1305  func checkConnHeaders(req *http.Request) error {
  1306  	if v := req.Header.Get("Upgrade"); v != "" {
  1307  		return fmt.Errorf("http2: invalid Upgrade request header: %q", req.Header["Upgrade"])
  1308  	}
  1309  	if vv := req.Header["Transfer-Encoding"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && vv[0] != "chunked") {
  1310  		return fmt.Errorf("http2: invalid Transfer-Encoding request header: %q", vv)
  1311  	}
  1312  	if vv := req.Header["Connection"]; len(vv) > 0 && (len(vv) > 1 || vv[0] != "" && !asciiEqualFold(vv[0], "close") && !asciiEqualFold(vv[0], "keep-alive")) {
  1313  		return fmt.Errorf("http2: invalid Connection request header: %q", vv)
  1314  	}
  1315  	return nil
  1316  }
  1317  
  1318  // actualContentLength returns a sanitized version of
  1319  // req.ContentLength, where 0 actually means zero (not unknown) and -1
  1320  // means unknown.
  1321  func actualContentLength(req *http.Request) int64 {
  1322  	if req.Body == nil || req.Body == http.NoBody {
  1323  		return 0
  1324  	}
  1325  	if req.ContentLength != 0 {
  1326  		return req.ContentLength
  1327  	}
  1328  	return -1
  1329  }
  1330  
  1331  func (cc *ClientConn) decrStreamReservations() {
  1332  	cc.mu.Lock()
  1333  	defer cc.mu.Unlock()
  1334  	cc.decrStreamReservationsLocked()
  1335  }
  1336  
  1337  func (cc *ClientConn) decrStreamReservationsLocked() {
  1338  	if cc.streamsReserved > 0 {
  1339  		cc.streamsReserved--
  1340  	}
  1341  }
  1342  
  1343  func (cc *ClientConn) RoundTrip(req *http.Request) (*http.Response, error) {
  1344  	return cc.roundTrip(req, nil)
  1345  }
  1346  
  1347  func (cc *ClientConn) roundTrip(req *http.Request, streamf func(*clientStream)) (*http.Response, error) {
  1348  	ctx := req.Context()
  1349  	cs := &clientStream{
  1350  		cc:                   cc,
  1351  		ctx:                  ctx,
  1352  		reqCancel:            req.Cancel,
  1353  		isHead:               req.Method == "HEAD",
  1354  		reqBody:              req.Body,
  1355  		reqBodyContentLength: actualContentLength(req),
  1356  		trace:                httptrace.ContextClientTrace(ctx),
  1357  		peerClosed:           make(chan struct{}),
  1358  		abort:                make(chan struct{}),
  1359  		respHeaderRecv:       make(chan struct{}),
  1360  		donec:                make(chan struct{}),
  1361  	}
  1362  
  1363  	// TODO(bradfitz): this is a copy of the logic in net/http. Unify somewhere?
  1364  	if !cc.t.disableCompression() &&
  1365  		req.Header.Get("Accept-Encoding") == "" &&
  1366  		req.Header.Get("Range") == "" &&
  1367  		!cs.isHead {
  1368  		// Request gzip only, not deflate. Deflate is ambiguous and
  1369  		// not as universally supported anyway.
  1370  		// See: https://zlib.net/zlib_faq.html#faq39
  1371  		//
  1372  		// Note that we don't request this for HEAD requests,
  1373  		// due to a bug in nginx:
  1374  		//   http://trac.nginx.org/nginx/ticket/358
  1375  		//   https://golang.org/issue/5522
  1376  		//
  1377  		// We don't request gzip if the request is for a range, since
  1378  		// auto-decoding a portion of a gzipped document will just fail
  1379  		// anyway. See https://golang.org/issue/8923
  1380  		cs.requestedGzip = true
  1381  	}
  1382  
  1383  	go cs.doRequest(req, streamf)
  1384  
  1385  	waitDone := func() error {
  1386  		select {
  1387  		case <-cs.donec:
  1388  			return nil
  1389  		case <-ctx.Done():
  1390  			return ctx.Err()
  1391  		case <-cs.reqCancel:
  1392  			return errRequestCanceled
  1393  		}
  1394  	}
  1395  
  1396  	handleResponseHeaders := func() (*http.Response, error) {
  1397  		res := cs.res
  1398  		if res.StatusCode > 299 {
  1399  			// On error or status code 3xx, 4xx, 5xx, etc., abort any
  1400  			// ongoing write, assuming that the server doesn't care
  1401  			// about our request body. If the server replied with 1xx or
  1402  			// 2xx, however, then assume the server DOES potentially
  1403  			// want our body (e.g. full-duplex streaming:
  1404  			// golang.org/issue/13444). If it turns out the server
  1405  			// doesn't, they'll RST_STREAM us soon enough. This is a
  1406  			// heuristic to avoid adding knobs to Transport. Hopefully
  1407  			// we can keep it.
  1408  			cs.abortRequestBodyWrite()
  1409  		}
  1410  		res.Request = req
  1411  		res.TLS = cc.tlsState
  1412  		if res.Body == noBody && actualContentLength(req) == 0 {
  1413  			// If there isn't a request or response body still being
  1414  			// written, then wait for the stream to be closed before
  1415  			// RoundTrip returns.
  1416  			if err := waitDone(); err != nil {
  1417  				return nil, err
  1418  			}
  1419  		}
  1420  		return res, nil
  1421  	}
  1422  
  1423  	cancelRequest := func(cs *clientStream, err error) error {
  1424  		cs.cc.mu.Lock()
  1425  		bodyClosed := cs.reqBodyClosed
  1426  		cs.cc.mu.Unlock()
  1427  		// Wait for the request body to be closed.
  1428  		//
  1429  		// If nothing closed the body before now, abortStreamLocked
  1430  		// will have started a goroutine to close it.
  1431  		//
  1432  		// Closing the body before returning avoids a race condition
  1433  		// with net/http checking its readTrackingBody to see if the
  1434  		// body was read from or closed. See golang/go#60041.
  1435  		//
  1436  		// The body is closed in a separate goroutine without the
  1437  		// connection mutex held, but dropping the mutex before waiting
  1438  		// will keep us from holding it indefinitely if the body
  1439  		// close is slow for some reason.
  1440  		if bodyClosed != nil {
  1441  			<-bodyClosed
  1442  		}
  1443  		return err
  1444  	}
  1445  
  1446  	for {
  1447  		select {
  1448  		case <-cs.respHeaderRecv:
  1449  			return handleResponseHeaders()
  1450  		case <-cs.abort:
  1451  			select {
  1452  			case <-cs.respHeaderRecv:
  1453  				// If both cs.respHeaderRecv and cs.abort are signaling,
  1454  				// pick respHeaderRecv. The server probably wrote the
  1455  				// response and immediately reset the stream.
  1456  				// golang.org/issue/49645
  1457  				return handleResponseHeaders()
  1458  			default:
  1459  				waitDone()
  1460  				return nil, cs.abortErr
  1461  			}
  1462  		case <-ctx.Done():
  1463  			err := ctx.Err()
  1464  			cs.abortStream(err)
  1465  			return nil, cancelRequest(cs, err)
  1466  		case <-cs.reqCancel:
  1467  			cs.abortStream(errRequestCanceled)
  1468  			return nil, cancelRequest(cs, errRequestCanceled)
  1469  		}
  1470  	}
  1471  }
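
In practice this path is reached through the exported RoundTrip method, usually via an http.Client. A minimal sketch; the host is a placeholder, and whether the response actually arrives gzipped is up to the server:

	package main

	import (
		"fmt"
		"net/http"

		"golang.org/x/net/http2"
	)

	func main() {
		c := &http.Client{Transport: &http2.Transport{}}

		resp, err := c.Get("https://example.com/") // placeholder host
		if err != nil {
			fmt.Println("round trip failed:", err)
			return
		}
		defer resp.Body.Close()

		// No Accept-Encoding was set, so the transport requested gzip itself
		// (see requestedGzip above); resp.Uncompressed reports whether the
		// response was transparently decompressed.
		fmt.Println(resp.Proto, resp.Uncompressed)
	}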
  1472  
  1473  // doRequest runs for the duration of the request lifetime.
  1474  //
  1475  // It sends the request and performs post-request cleanup (closing Request.Body, etc.).
  1476  func (cs *clientStream) doRequest(req *http.Request, streamf func(*clientStream)) {
  1477  	cs.cc.t.markNewGoroutine()
  1478  	err := cs.writeRequest(req, streamf)
  1479  	cs.cleanupWriteRequest(err)
  1480  }
  1481  
  1482  var errExtendedConnectNotSupported = errors.New("net/http: extended connect not supported by peer")
  1483  
  1484  // writeRequest sends a request.
  1485  //
  1486  // It returns nil after the request is written, the response read,
  1487  // and the request stream is half-closed by the peer.
  1488  //
  1489  // It returns non-nil if the request ends otherwise.
  1490  // If the returned error is StreamError, the error Code may be used in resetting the stream.
  1491  func (cs *clientStream) writeRequest(req *http.Request, streamf func(*clientStream)) (err error) {
  1492  	cc := cs.cc
  1493  	ctx := cs.ctx
  1494  
  1495  	if err := checkConnHeaders(req); err != nil {
  1496  		return err
  1497  	}
  1498  
  1499  	// Wait for the server's initial SETTINGS frame to be received; the server can
  1500  	// change this value later, but we only wait for that first SETTINGS frame.
  1501  	var isExtendedConnect bool
  1502  	if req.Method == "CONNECT" && req.Header.Get(":protocol") != "" {
  1503  		isExtendedConnect = true
  1504  	}
  1505  
  1506  	// Acquire the new-request lock by writing to reqHeaderMu.
  1507  	// This lock guards the critical section covering allocating a new stream ID
  1508  	// (requires mu) and creating the stream (requires wmu).
  1509  	if cc.reqHeaderMu == nil {
  1510  		panic("RoundTrip on uninitialized ClientConn") // for tests
  1511  	}
  1512  	if isExtendedConnect {
  1513  		select {
  1514  		case <-cs.reqCancel:
  1515  			return errRequestCanceled
  1516  		case <-ctx.Done():
  1517  			return ctx.Err()
  1518  		case <-cc.seenSettingsChan:
  1519  			if !cc.extendedConnectAllowed {
  1520  				return errExtendedConnectNotSupported
  1521  			}
  1522  		}
  1523  	}
  1524  	select {
  1525  	case cc.reqHeaderMu <- struct{}{}:
  1526  	case <-cs.reqCancel:
  1527  		return errRequestCanceled
  1528  	case <-ctx.Done():
  1529  		return ctx.Err()
  1530  	}
  1531  
  1532  	cc.mu.Lock()
  1533  	if cc.idleTimer != nil {
  1534  		cc.idleTimer.Stop()
  1535  	}
  1536  	cc.decrStreamReservationsLocked()
  1537  	if err := cc.awaitOpenSlotForStreamLocked(cs); err != nil {
  1538  		cc.mu.Unlock()
  1539  		<-cc.reqHeaderMu
  1540  		return err
  1541  	}
  1542  	cc.addStreamLocked(cs) // assigns stream ID
  1543  	if isConnectionCloseRequest(req) {
  1544  		cc.doNotReuse = true
  1545  	}
  1546  	cc.mu.Unlock()
  1547  
  1548  	if streamf != nil {
  1549  		streamf(cs)
  1550  	}
  1551  
  1552  	continueTimeout := cc.t.expectContinueTimeout()
  1553  	if continueTimeout != 0 {
  1554  		if !httpguts.HeaderValuesContainsToken(req.Header["Expect"], "100-continue") {
  1555  			continueTimeout = 0
  1556  		} else {
  1557  			cs.on100 = make(chan struct{}, 1)
  1558  		}
  1559  	}
  1560  
  1561  	// Past this point (where we send request headers), it is possible for
  1562  	// RoundTrip to return successfully. Since the RoundTrip contract permits
  1563  	// the caller to "mutate or reuse" the Request after closing the Response's Body,
  1564  	// we must take care when referencing the Request from here on.
  1565  	err = cs.encodeAndWriteHeaders(req)
  1566  	<-cc.reqHeaderMu
  1567  	if err != nil {
  1568  		return err
  1569  	}
  1570  
  1571  	hasBody := cs.reqBodyContentLength != 0
  1572  	if !hasBody {
  1573  		cs.sentEndStream = true
  1574  	} else {
  1575  		if continueTimeout != 0 {
  1576  			traceWait100Continue(cs.trace)
  1577  			timer := time.NewTimer(continueTimeout)
  1578  			select {
  1579  			case <-timer.C:
  1580  				err = nil
  1581  			case <-cs.on100:
  1582  				err = nil
  1583  			case <-cs.abort:
  1584  				err = cs.abortErr
  1585  			case <-ctx.Done():
  1586  				err = ctx.Err()
  1587  			case <-cs.reqCancel:
  1588  				err = errRequestCanceled
  1589  			}
  1590  			timer.Stop()
  1591  			if err != nil {
  1592  				traceWroteRequest(cs.trace, err)
  1593  				return err
  1594  			}
  1595  		}
  1596  
  1597  		if err = cs.writeRequestBody(req); err != nil {
  1598  			if err != errStopReqBodyWrite {
  1599  				traceWroteRequest(cs.trace, err)
  1600  				return err
  1601  			}
  1602  		} else {
  1603  			cs.sentEndStream = true
  1604  		}
  1605  	}
  1606  
  1607  	traceWroteRequest(cs.trace, err)
  1608  
  1609  	var respHeaderTimer <-chan time.Time
  1610  	var respHeaderRecv chan struct{}
  1611  	if d := cc.responseHeaderTimeout(); d != 0 {
  1612  		timer := cc.t.newTimer(d)
  1613  		defer timer.Stop()
  1614  		respHeaderTimer = timer.C()
  1615  		respHeaderRecv = cs.respHeaderRecv
  1616  	}
  1617  	// Wait until the peer half-closes its end of the stream,
  1618  	// or until the request is aborted (via context, error, or otherwise),
  1619  	// whichever comes first.
  1620  	for {
  1621  		select {
  1622  		case <-cs.peerClosed:
  1623  			return nil
  1624  		case <-respHeaderTimer:
  1625  			return errTimeout
  1626  		case <-respHeaderRecv:
  1627  			respHeaderRecv = nil
  1628  			respHeaderTimer = nil // keep waiting for END_STREAM
  1629  		case <-cs.abort:
  1630  			return cs.abortErr
  1631  		case <-ctx.Done():
  1632  			return ctx.Err()
  1633  		case <-cs.reqCancel:
  1634  			return errRequestCanceled
  1635  		}
  1636  	}
  1637  }
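
The Expect: 100-continue branch above is driven by the wrapped net/http transport's ExpectContinueTimeout. A hedged sketch of wiring that up; the host, path, and timeout are placeholders:

	package main

	import (
		"fmt"
		"net/http"
		"strings"
		"time"

		"golang.org/x/net/http2"
	)

	func main() {
		t1 := &http.Transport{ExpectContinueTimeout: 500 * time.Millisecond}
		if err := http2.ConfigureTransport(t1); err != nil {
			panic(err)
		}
		c := &http.Client{Transport: t1}

		req, _ := http.NewRequest("PUT", "https://example.com/upload", strings.NewReader("payload"))
		req.Header.Set("Expect", "100-continue")

		// writeRequest holds the body back until the server sends 100 Continue,
		// the timeout above fires, or the request is aborted.
		resp, err := c.Do(req)
		if err != nil {
			fmt.Println(err)
			return
		}
		resp.Body.Close()
		fmt.Println(resp.Status)
	}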
  1638  
  1639  func (cs *clientStream) encodeAndWriteHeaders(req *http.Request) error {
  1640  	cc := cs.cc
  1641  	ctx := cs.ctx
  1642  
  1643  	cc.wmu.Lock()
  1644  	defer cc.wmu.Unlock()
  1645  
  1646  	// If the request was canceled while waiting for cc.mu, just quit.
  1647  	select {
  1648  	case <-cs.abort:
  1649  		return cs.abortErr
  1650  	case <-ctx.Done():
  1651  		return ctx.Err()
  1652  	case <-cs.reqCancel:
  1653  		return errRequestCanceled
  1654  	default:
  1655  	}
  1656  
  1657  	// Encode headers.
  1658  	//
  1659  	// we send: HEADERS{1}, CONTINUATION{0,} + DATA{0,} (DATA is
  1660  	// sent by writeRequestBody below, along with any Trailers,
  1661  	// again in form HEADERS{1}, CONTINUATION{0,})
  1662  	trailers, err := commaSeparatedTrailers(req)
  1663  	if err != nil {
  1664  		return err
  1665  	}
  1666  	hasTrailers := trailers != ""
  1667  	contentLen := actualContentLength(req)
  1668  	hasBody := contentLen != 0
  1669  	hdrs, err := cc.encodeHeaders(req, cs.requestedGzip, trailers, contentLen)
  1670  	if err != nil {
  1671  		return err
  1672  	}
  1673  
  1674  	// Write the request.
  1675  	endStream := !hasBody && !hasTrailers
  1676  	cs.sentHeaders = true
  1677  	err = cc.writeHeaders(cs.ID, endStream, int(cc.maxFrameSize), hdrs)
  1678  	traceWroteHeaders(cs.trace)
  1679  	return err
  1680  }
  1681  
  1682  // cleanupWriteRequest performs post-request tasks.
  1683  //
  1684  // If err (the result of writeRequest) is non-nil and the stream is not closed,
  1685  // cleanupWriteRequest will send a reset to the peer.
  1686  func (cs *clientStream) cleanupWriteRequest(err error) {
  1687  	cc := cs.cc
  1688  
  1689  	if cs.ID == 0 {
  1690  		// We were canceled before creating the stream, so return our reservation.
  1691  		cc.decrStreamReservations()
  1692  	}
  1693  
  1694  	// TODO: write h12Compare test showing whether
  1695  	// Request.Body is closed by the Transport,
  1696  	// and in multiple cases: server replies <=299 and >299
  1697  	// while still writing request body
  1698  	cc.mu.Lock()
  1699  	mustCloseBody := false
  1700  	if cs.reqBody != nil && cs.reqBodyClosed == nil {
  1701  		mustCloseBody = true
  1702  		cs.reqBodyClosed = make(chan struct{})
  1703  	}
  1704  	bodyClosed := cs.reqBodyClosed
  1705  	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
  1706  	cc.mu.Unlock()
  1707  	if mustCloseBody {
  1708  		cs.reqBody.Close()
  1709  		close(bodyClosed)
  1710  	}
  1711  	if bodyClosed != nil {
  1712  		<-bodyClosed
  1713  	}
  1714  
  1715  	if err != nil && cs.sentEndStream {
  1716  		// If the connection is closed immediately after the response is read,
  1717  		// we may be aborted before finishing up here. If the stream was closed
  1718  		// cleanly on both sides, there is no error.
  1719  		select {
  1720  		case <-cs.peerClosed:
  1721  			err = nil
  1722  		default:
  1723  		}
  1724  	}
  1725  	if err != nil {
  1726  		cs.abortStream(err) // possibly redundant, but harmless
  1727  		if cs.sentHeaders {
  1728  			if se, ok := err.(StreamError); ok {
  1729  				if se.Cause != errFromPeer {
  1730  					cc.writeStreamReset(cs.ID, se.Code, false, err)
  1731  				}
  1732  			} else {
  1733  				// We're cancelling an in-flight request.
  1734  				//
  1735  				// This could be due to the server becoming unresponsive.
  1736  				// To avoid sending too many requests on a dead connection,
  1737  				// we let the request continue to consume a concurrency slot
  1738  				// until we can confirm the server is still responding.
  1739  				// We do this by sending a PING frame along with the RST_STREAM
  1740  				// (unless a ping is already in flight).
  1741  				//
  1742  				// For simplicity, we don't bother tracking the PING payload:
  1743  				// We reset cc.pendingResets any time we receive a PING ACK.
  1744  				//
  1745  				// We skip this if the conn is going to be closed on idle,
  1746  				// because it's short lived and will probably be closed before
  1747  				// we get the ping response.
  1748  				ping := false
  1749  				if !closeOnIdle {
  1750  					cc.mu.Lock()
  1751  					// rstStreamPingsBlocked works around a gRPC behavior:
  1752  					// see comment on the field for details.
  1753  					if !cc.rstStreamPingsBlocked {
  1754  						if cc.pendingResets == 0 {
  1755  							ping = true
  1756  						}
  1757  						cc.pendingResets++
  1758  					}
  1759  					cc.mu.Unlock()
  1760  				}
  1761  				cc.writeStreamReset(cs.ID, ErrCodeCancel, ping, err)
  1762  			}
  1763  		}
  1764  		cs.bufPipe.CloseWithError(err) // no-op if already closed
  1765  	} else {
  1766  		if cs.sentHeaders && !cs.sentEndStream {
  1767  			cc.writeStreamReset(cs.ID, ErrCodeNo, false, nil)
  1768  		}
  1769  		cs.bufPipe.CloseWithError(errRequestCanceled)
  1770  	}
  1771  	if cs.ID != 0 {
  1772  		cc.forgetStreamID(cs.ID)
  1773  	}
  1774  
  1775  	cc.wmu.Lock()
  1776  	werr := cc.werr
  1777  	cc.wmu.Unlock()
  1778  	if werr != nil {
  1779  		cc.Close()
  1780  	}
  1781  
  1782  	close(cs.donec)
  1783  }
  1784  
  1785  // awaitOpenSlotForStreamLocked waits until len(streams) < maxConcurrentStreams.
  1786  // Must hold cc.mu.
  1787  func (cc *ClientConn) awaitOpenSlotForStreamLocked(cs *clientStream) error {
  1788  	for {
  1789  		if cc.closed && cc.nextStreamID == 1 && cc.streamsReserved == 0 {
  1790  			// This is the very first request sent to this connection.
  1791  			// Return a fatal error which aborts the retry loop.
  1792  			return errClientConnNotEstablished
  1793  		}
  1794  		cc.lastActive = cc.t.now()
  1795  		if cc.closed || !cc.canTakeNewRequestLocked() {
  1796  			return errClientConnUnusable
  1797  		}
  1798  		cc.lastIdle = time.Time{}
  1799  		if cc.currentRequestCountLocked() < int(cc.maxConcurrentStreams) {
  1800  			return nil
  1801  		}
  1802  		cc.pendingRequests++
  1803  		cc.cond.Wait()
  1804  		cc.pendingRequests--
  1805  		select {
  1806  		case <-cs.abort:
  1807  			return cs.abortErr
  1808  		default:
  1809  		}
  1810  	}
  1811  }
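
Whether callers queue in this loop or the connection pool dials additional connections is influenced by the Transport.StrictMaxConcurrentStreams option; a short sketch (the behavioral note is paraphrased from that field's documentation):

	package main

	import (
		"net/http"

		"golang.org/x/net/http2"
	)

	func main() {
		// With StrictMaxConcurrentStreams set, requests beyond the server's
		// SETTINGS_MAX_CONCURRENT_STREAMS wait for a free slot (the loop above)
		// rather than prompting the pool to dial extra connections.
		t := &http2.Transport{StrictMaxConcurrentStreams: true}
		_ = &http.Client{Transport: t}
	}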
  1812  
  1813  // requires cc.wmu be held
  1814  func (cc *ClientConn) writeHeaders(streamID uint32, endStream bool, maxFrameSize int, hdrs []byte) error {
  1815  	first := true // first frame written (HEADERS is first, then CONTINUATION)
  1816  	for len(hdrs) > 0 && cc.werr == nil {
  1817  		chunk := hdrs
  1818  		if len(chunk) > maxFrameSize {
  1819  			chunk = chunk[:maxFrameSize]
  1820  		}
  1821  		hdrs = hdrs[len(chunk):]
  1822  		endHeaders := len(hdrs) == 0
  1823  		if first {
  1824  			cc.fr.WriteHeaders(HeadersFrameParam{
  1825  				StreamID:      streamID,
  1826  				BlockFragment: chunk,
  1827  				EndStream:     endStream,
  1828  				EndHeaders:    endHeaders,
  1829  			})
  1830  			first = false
  1831  		} else {
  1832  			cc.fr.WriteContinuation(streamID, endHeaders, chunk)
  1833  		}
  1834  	}
  1835  	cc.bw.Flush()
  1836  	return cc.werr
  1837  }
  1838  
  1839  // internal error values; they don't escape to callers
  1840  var (
  1841  	// abort request body write; don't send cancel
  1842  	errStopReqBodyWrite = errors.New("http2: aborting request body write")
  1843  
  1844  	// abort request body write, but also send a stream reset (cancel) to the peer.
  1845  	errStopReqBodyWriteAndCancel = errors.New("http2: canceling request")
  1846  
  1847  	errReqBodyTooLong = errors.New("http2: request body larger than specified content length")
  1848  )
  1849  
  1850  // frameScratchBufferLen returns the length of a buffer to use for
  1851  // outgoing request bodies to read/write to/from.
  1852  //
  1853  // It returns max(1, min(peer's advertised max frame size,
  1854  // Request.ContentLength+1, 512KB)).
  1855  func (cs *clientStream) frameScratchBufferLen(maxFrameSize int) int {
  1856  	const max = 512 << 10
  1857  	n := int64(maxFrameSize)
  1858  	if n > max {
  1859  		n = max
  1860  	}
  1861  	if cl := cs.reqBodyContentLength; cl != -1 && cl+1 < n {
  1862  		// Add an extra byte past the declared content-length to
  1863  		// give the caller's Request.Body io.Reader a chance to
  1864  		// give us more bytes than they declared, so we can catch it
  1865  		// early.
  1866  		n = cl + 1
  1867  	}
  1868  	if n < 1 {
  1869  		return 1
  1870  	}
  1871  	return int(n) // doesn't truncate; max is 512K
  1872  }
  1873  
  1874  // Seven bufPools manage different frame sizes. This helps to avoid scenarios where long-running
  1875  // streaming requests using small frame sizes occupy large buffers initially allocated for prior
  1876  // requests needing big buffers. The size ranges are as follows:
  1877  // (0 KB, 16 KB], (16 KB, 32 KB], (32 KB, 64 KB], (64 KB, 128 KB], (128 KB, 256 KB],
  1878  // (256 KB, 512 KB], (512 KB, infinity)
  1879  // In practice, the maximum scratch buffer size should not exceed 512 KB due to
  1880  // frameScratchBufferLen(maxFrameSize), thus the "infinity pool" should never be used.
  1881  // It exists mainly as a safety measure, for potential future increases in max buffer size.
  1882  var bufPools [7]sync.Pool // of *[]byte
  1883  func bufPoolIndex(size int) int {
  1884  	if size <= 16384 {
  1885  		return 0
  1886  	}
  1887  	size -= 1
  1888  	bits := bits.Len(uint(size))
  1889  	index := bits - 14
  1890  	if index >= len(bufPools) {
  1891  		return len(bufPools) - 1
  1892  	}
  1893  	return index
  1894  }
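
A worked example for the two sizing helpers above, using illustrative mirrors (scratchLen for frameScratchBufferLen, poolIndex for bufPoolIndex):

	package main

	import (
		"fmt"
		"math/bits"
	)

	// scratchLen mirrors frameScratchBufferLen; cl is the sanitized content
	// length (-1 means unknown).
	func scratchLen(maxFrameSize int, cl int64) int {
		const max = 512 << 10
		n := int64(maxFrameSize)
		if n > max {
			n = max
		}
		if cl != -1 && cl+1 < n {
			n = cl + 1 // one spare byte to detect bodies longer than declared
		}
		if n < 1 {
			return 1
		}
		return int(n)
	}

	// poolIndex mirrors bufPoolIndex.
	func poolIndex(size int) int {
		if size <= 16384 {
			return 0
		}
		size--
		idx := bits.Len(uint(size)) - 14
		if idx >= 7 {
			return 6
		}
		return idx
	}

	func main() {
		fmt.Println(scratchLen(16384, -1), poolIndex(16384))   // 16384 0: unknown length, one default-size frame
		fmt.Println(scratchLen(16384, 10), poolIndex(11))      // 11 0: tiny body, tiny buffer
		fmt.Println(scratchLen(1<<24, -1), poolIndex(512<<10)) // 524288 5: capped at 512 KB
		fmt.Println(poolIndex((16<<10)+1), poolIndex(64<<10))  // 1 2: (16 KB, 32 KB] and (32 KB, 64 KB]
	}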
  1895  
  1896  func (cs *clientStream) writeRequestBody(req *http.Request) (err error) {
  1897  	cc := cs.cc
  1898  	body := cs.reqBody
  1899  	sentEnd := false // whether we sent the final DATA frame w/ END_STREAM
  1900  
  1901  	hasTrailers := req.Trailer != nil
  1902  	remainLen := cs.reqBodyContentLength
  1903  	hasContentLen := remainLen != -1
  1904  
  1905  	cc.mu.Lock()
  1906  	maxFrameSize := int(cc.maxFrameSize)
  1907  	cc.mu.Unlock()
  1908  
  1909  	// Scratch buffer for reading into & writing from.
  1910  	scratchLen := cs.frameScratchBufferLen(maxFrameSize)
  1911  	var buf []byte
  1912  	index := bufPoolIndex(scratchLen)
  1913  	if bp, ok := bufPools[index].Get().(*[]byte); ok && len(*bp) >= scratchLen {
  1914  		defer bufPools[index].Put(bp)
  1915  		buf = *bp
  1916  	} else {
  1917  		buf = make([]byte, scratchLen)
  1918  		defer bufPools[index].Put(&buf)
  1919  	}
  1920  
  1921  	var sawEOF bool
  1922  	for !sawEOF {
  1923  		n, err := body.Read(buf)
  1924  		if hasContentLen {
  1925  			remainLen -= int64(n)
  1926  			if remainLen == 0 && err == nil {
  1927  				// The request body's Content-Length was predeclared and
  1928  				// we just finished reading it all, but the underlying io.Reader
  1929  				// returned the final chunk with a nil error (which is one of
  1930  				// the two valid things a Reader can do at EOF). Because we'd prefer
  1931  				// to send the END_STREAM bit early, double-check that we're actually
  1932  				// at EOF. Subsequent reads should return (0, EOF) at this point.
  1933  				// If either value is different, we return an error in one of two ways below.
  1934  				var scratch [1]byte
  1935  				var n1 int
  1936  				n1, err = body.Read(scratch[:])
  1937  				remainLen -= int64(n1)
  1938  			}
  1939  			if remainLen < 0 {
  1940  				err = errReqBodyTooLong
  1941  				return err
  1942  			}
  1943  		}
  1944  		if err != nil {
  1945  			cc.mu.Lock()
  1946  			bodyClosed := cs.reqBodyClosed != nil
  1947  			cc.mu.Unlock()
  1948  			switch {
  1949  			case bodyClosed:
  1950  				return errStopReqBodyWrite
  1951  			case err == io.EOF:
  1952  				sawEOF = true
  1953  				err = nil
  1954  			default:
  1955  				return err
  1956  			}
  1957  		}
  1958  
  1959  		remain := buf[:n]
  1960  		for len(remain) > 0 && err == nil {
  1961  			var allowed int32
  1962  			allowed, err = cs.awaitFlowControl(len(remain))
  1963  			if err != nil {
  1964  				return err
  1965  			}
  1966  			cc.wmu.Lock()
  1967  			data := remain[:allowed]
  1968  			remain = remain[allowed:]
  1969  			sentEnd = sawEOF && len(remain) == 0 && !hasTrailers
  1970  			err = cc.fr.WriteData(cs.ID, sentEnd, data)
  1971  			if err == nil {
  1972  				// TODO(bradfitz): this flush is for latency, not bandwidth.
  1973  				// Most requests won't need this. Make this opt-in or
  1974  				// opt-out?  Use some heuristic on the body type? Nagel-like
  1975  				// timers?  Based on 'n'? Only last chunk of this for loop,
  1976  				// unless flow control tokens are low? For now, always.
  1977  				// If we change this, see comment below.
  1978  				err = cc.bw.Flush()
  1979  			}
  1980  			cc.wmu.Unlock()
  1981  		}
  1982  		if err != nil {
  1983  			return err
  1984  		}
  1985  	}
  1986  
  1987  	if sentEnd {
  1988  		// Already sent END_STREAM (which implies we have no
  1989  		// trailers) and flushed, because currently all
  1990  		// WriteData frames above get a flush. So we're done.
  1991  		return nil
  1992  	}
  1993  
  1994  	// Since the RoundTrip contract permits the caller to "mutate or reuse"
  1995  	// a request after the Response's Body is closed, verify that this hasn't
  1996  	// happened before accessing the trailers.
  1997  	cc.mu.Lock()
  1998  	trailer := req.Trailer
  1999  	err = cs.abortErr
  2000  	cc.mu.Unlock()
  2001  	if err != nil {
  2002  		return err
  2003  	}
  2004  
  2005  	cc.wmu.Lock()
  2006  	defer cc.wmu.Unlock()
  2007  	var trls []byte
  2008  	if len(trailer) > 0 {
  2009  		trls, err = cc.encodeTrailers(trailer)
  2010  		if err != nil {
  2011  			return err
  2012  		}
  2013  	}
  2014  
  2015  	// Two ways to send END_STREAM: either with trailers, or
  2016  	// with an empty DATA frame.
  2017  	if len(trls) > 0 {
  2018  		err = cc.writeHeaders(cs.ID, true, maxFrameSize, trls)
  2019  	} else {
  2020  		err = cc.fr.WriteData(cs.ID, true, nil)
  2021  	}
  2022  	if ferr := cc.bw.Flush(); ferr != nil && err == nil {
  2023  		err = ferr
  2024  	}
  2025  	return err
  2026  }
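
The remainLen bookkeeping above is what surfaces errReqBodyTooLong when a body outgrows its declared length. A hedged sketch of triggering it; the URL is a placeholder and a reachable HTTP/2 server is assumed:

	package main

	import (
		"fmt"
		"io"
		"net/http"
		"strings"

		"golang.org/x/net/http2"
	)

	func main() {
		// The body yields 10 bytes but the request declares 3. The spare byte
		// in the scratch buffer lets writeRequestBody notice the overrun and
		// fail the request instead of sending a malformed stream.
		body := io.NopCloser(strings.NewReader("0123456789"))
		req, _ := http.NewRequest("POST", "https://example.com/", body) // placeholder URL
		req.ContentLength = 3

		_, err := (&http.Client{Transport: &http2.Transport{}}).Do(req)
		fmt.Println(err) // expected to wrap "request body larger than specified content length"
	}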
  2027  
  2028  // awaitFlowControl waits for [1, min(maxBytes, cc.maxFrameSize)] flow
  2029  // control tokens from the server.
  2030  // It returns either the non-zero number of tokens taken or an error
  2031  // if the stream is dead.
  2032  func (cs *clientStream) awaitFlowControl(maxBytes int) (taken int32, err error) {
  2033  	cc := cs.cc
  2034  	ctx := cs.ctx
  2035  	cc.mu.Lock()
  2036  	defer cc.mu.Unlock()
  2037  	for {
  2038  		if cc.closed {
  2039  			return 0, errClientConnClosed
  2040  		}
  2041  		if cs.reqBodyClosed != nil {
  2042  			return 0, errStopReqBodyWrite
  2043  		}
  2044  		select {
  2045  		case <-cs.abort:
  2046  			return 0, cs.abortErr
  2047  		case <-ctx.Done():
  2048  			return 0, ctx.Err()
  2049  		case <-cs.reqCancel:
  2050  			return 0, errRequestCanceled
  2051  		default:
  2052  		}
  2053  		if a := cs.flow.available(); a > 0 {
  2054  			take := a
  2055  			if int(take) > maxBytes {
  2056  
  2057  				take = int32(maxBytes) // can't truncate int; take is int32
  2058  			}
  2059  			if take > int32(cc.maxFrameSize) {
  2060  				take = int32(cc.maxFrameSize)
  2061  			}
  2062  			cs.flow.take(take)
  2063  			return take, nil
  2064  		}
  2065  		cc.cond.Wait()
  2066  	}
  2067  }
  2068  
  2069  func validateHeaders(hdrs http.Header) string {
  2070  	for k, vv := range hdrs {
  2071  		if !httpguts.ValidHeaderFieldName(k) && k != ":protocol" {
  2072  			return fmt.Sprintf("name %q", k)
  2073  		}
  2074  		for _, v := range vv {
  2075  			if !httpguts.ValidHeaderFieldValue(v) {
  2076  				// Don't include the value in the error,
  2077  				// because it may be sensitive.
  2078  				return fmt.Sprintf("value for header %q", k)
  2079  			}
  2080  		}
  2081  	}
  2082  	return ""
  2083  }
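
The two predicates this relies on are exported from golang.org/x/net/http/httpguts and can be exercised directly:

	package main

	import (
		"fmt"

		"golang.org/x/net/http/httpguts"
	)

	func main() {
		fmt.Println(httpguts.ValidHeaderFieldName("X-Trace-Id"))    // true
		fmt.Println(httpguts.ValidHeaderFieldName("Bad Header"))    // false: space is not a valid token character
		fmt.Println(httpguts.ValidHeaderFieldValue("abc def"))      // true
		fmt.Println(httpguts.ValidHeaderFieldValue("line1\nline2")) // false: control characters are rejected
	}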
  2084  
  2085  var errNilRequestURL = errors.New("http2: Request.URI is nil")
  2086  
  2087  func isNormalConnect(req *http.Request) bool {
  2088  	return req.Method == "CONNECT" && req.Header.Get(":protocol") == ""
  2089  }
  2090  
  2091  // requires cc.wmu be held.
  2092  func (cc *ClientConn) encodeHeaders(req *http.Request, addGzipHeader bool, trailers string, contentLength int64) ([]byte, error) {
  2093  	cc.hbuf.Reset()
  2094  	if req.URL == nil {
  2095  		return nil, errNilRequestURL
  2096  	}
  2097  
  2098  	host := req.Host
  2099  	if host == "" {
  2100  		host = req.URL.Host
  2101  	}
  2102  	host, err := httpguts.PunycodeHostPort(host)
  2103  	if err != nil {
  2104  		return nil, err
  2105  	}
  2106  	if !httpguts.ValidHostHeader(host) {
  2107  		return nil, errors.New("http2: invalid Host header")
  2108  	}
  2109  
  2110  	var path string
  2111  	if !isNormalConnect(req) {
  2112  		path = req.URL.RequestURI()
  2113  		if !validPseudoPath(path) {
  2114  			orig := path
  2115  			path = strings.TrimPrefix(path, req.URL.Scheme+"://"+host)
  2116  			if !validPseudoPath(path) {
  2117  				if req.URL.Opaque != "" {
  2118  					return nil, fmt.Errorf("invalid request :path %q from URL.Opaque = %q", orig, req.URL.Opaque)
  2119  				} else {
  2120  					return nil, fmt.Errorf("invalid request :path %q", orig)
  2121  				}
  2122  			}
  2123  		}
  2124  	}
  2125  
  2126  	// Check for any invalid headers+trailers and return an error before we
  2127  	// potentially pollute our hpack state. (We want to be able to
  2128  	// continue to reuse the hpack encoder for future requests)
  2129  	if err := validateHeaders(req.Header); err != "" {
  2130  		return nil, fmt.Errorf("invalid HTTP header %s", err)
  2131  	}
  2132  	if err := validateHeaders(req.Trailer); err != "" {
  2133  		return nil, fmt.Errorf("invalid HTTP trailer %s", err)
  2134  	}
  2135  
  2136  	enumerateHeaders := func(f func(name, value string)) {
  2137  		// 8.1.2.3 Request Pseudo-Header Fields
  2138  		// The :path pseudo-header field includes the path and query parts of the
  2139  		// target URI (the path-absolute production and optionally a '?' character
  2140  		// followed by the query production, see Sections 3.3 and 3.4 of
  2141  		// [RFC3986]).
  2142  		f(":authority", host)
  2143  		m := req.Method
  2144  		if m == "" {
  2145  			m = http.MethodGet
  2146  		}
  2147  		f(":method", m)
  2148  		if !isNormalConnect(req) {
  2149  			f(":path", path)
  2150  			f(":scheme", req.URL.Scheme)
  2151  		}
  2152  		if trailers != "" {
  2153  			f("trailer", trailers)
  2154  		}
  2155  
  2156  		var didUA bool
  2157  		for k, vv := range req.Header {
  2158  			if asciiEqualFold(k, "host") || asciiEqualFold(k, "content-length") {
  2159  				// Host is :authority, already sent.
  2160  				// Content-Length is automatic, set below.
  2161  				continue
  2162  			} else if asciiEqualFold(k, "connection") ||
  2163  				asciiEqualFold(k, "proxy-connection") ||
  2164  				asciiEqualFold(k, "transfer-encoding") ||
  2165  				asciiEqualFold(k, "upgrade") ||
  2166  				asciiEqualFold(k, "keep-alive") {
  2167  				// Per 8.1.2.2 Connection-Specific Header
  2168  				// Fields, don't send connection-specific
  2169  				// fields. We have already checked if any
  2170  				// are error-worthy so just ignore the rest.
  2171  				continue
  2172  			} else if asciiEqualFold(k, "user-agent") {
  2173  				// Match Go's http1 behavior: at most one
  2174  				// User-Agent. If set to nil or empty string,
  2175  				// then omit it. Otherwise if not mentioned,
  2176  				// include the default (below).
  2177  				didUA = true
  2178  				if len(vv) < 1 {
  2179  					continue
  2180  				}
  2181  				vv = vv[:1]
  2182  				if vv[0] == "" {
  2183  					continue
  2184  				}
  2185  			} else if asciiEqualFold(k, "cookie") {
  2186  				// Per 8.1.2.5 To allow for better compression efficiency, the
  2187  				// Cookie header field MAY be split into separate header fields,
  2188  				// each with one or more cookie-pairs.
  2189  				for _, v := range vv {
  2190  					for {
  2191  						p := strings.IndexByte(v, ';')
  2192  						if p < 0 {
  2193  							break
  2194  						}
  2195  						f("cookie", v[:p])
  2196  						p++
  2197  						// strip space after semicolon if any.
  2198  						for p+1 <= len(v) && v[p] == ' ' {
  2199  							p++
  2200  						}
  2201  						v = v[p:]
  2202  					}
  2203  					if len(v) > 0 {
  2204  						f("cookie", v)
  2205  					}
  2206  				}
  2207  				continue
  2208  			}
  2209  
  2210  			for _, v := range vv {
  2211  				f(k, v)
  2212  			}
  2213  		}
  2214  		if shouldSendReqContentLength(req.Method, contentLength) {
  2215  			f("content-length", strconv.FormatInt(contentLength, 10))
  2216  		}
  2217  		if addGzipHeader {
  2218  			f("accept-encoding", "gzip")
  2219  		}
  2220  		if !didUA {
  2221  			f("user-agent", defaultUserAgent)
  2222  		}
  2223  	}
  2224  
  2225  	// Do a first pass over the headers counting bytes to ensure
  2226  	// we don't exceed cc.peerMaxHeaderListSize. This is done as a
  2227  	// separate pass before encoding the headers to prevent
  2228  	// modifying the hpack state.
  2229  	hlSize := uint64(0)
  2230  	enumerateHeaders(func(name, value string) {
  2231  		hf := hpack.HeaderField{Name: name, Value: value}
  2232  		hlSize += uint64(hf.Size())
  2233  	})
  2234  
  2235  	if hlSize > cc.peerMaxHeaderListSize {
  2236  		return nil, errRequestHeaderListSize
  2237  	}
  2238  
  2239  	trace := httptrace.ContextClientTrace(req.Context())
  2240  	traceHeaders := traceHasWroteHeaderField(trace)
  2241  
  2242  	// Header list size is ok. Write the headers.
  2243  	enumerateHeaders(func(name, value string) {
  2244  		name, ascii := lowerHeader(name)
  2245  		if !ascii {
  2246  			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  2247  			// field names have to be ASCII characters (just as in HTTP/1.x).
  2248  			return
  2249  		}
  2250  		cc.writeHeader(name, value)
  2251  		if traceHeaders {
  2252  			traceWroteHeaderField(trace, name, value)
  2253  		}
  2254  	})
  2255  
  2256  	return cc.hbuf.Bytes(), nil
  2257  }
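
The size check above uses the HPACK accounting rule from RFC 7541, Section 4.1 (each field costs len(name) + len(value) + 32 octets), which hpack.HeaderField.Size implements; a small illustration:

	package main

	import (
		"fmt"

		"golang.org/x/net/http2/hpack"
	)

	func main() {
		fields := []hpack.HeaderField{
			{Name: ":method", Value: "GET"},
			{Name: ":path", Value: "/"},
			{Name: "user-agent", Value: "Go-http-client/2.0"},
		}
		var total uint64
		for _, hf := range fields {
			total += uint64(hf.Size()) // len(name) + len(value) + 32
		}
		// encodeHeaders compares this total against the peer's
		// SETTINGS_MAX_HEADER_LIST_SIZE before touching the HPACK encoder state.
		fmt.Println(total) // 140
	}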
  2258  
  2259  // shouldSendReqContentLength reports whether the http2.Transport should send
  2260  // a "content-length" request header. This logic is basically a copy of the net/http
  2261  // transferWriter.shouldSendContentLength.
  2262  // The contentLength is the corrected contentLength (so 0 means actually 0, not unknown).
  2263  // -1 means unknown.
  2264  func shouldSendReqContentLength(method string, contentLength int64) bool {
  2265  	if contentLength > 0 {
  2266  		return true
  2267  	}
  2268  	if contentLength < 0 {
  2269  		return false
  2270  	}
  2271  	// For zero bodies, whether we send a content-length depends on the method.
  2272  	// It also matters little for HTTP/2 either way, since END_STREAM marks the end of the body.
  2273  	switch method {
  2274  	case "POST", "PUT", "PATCH":
  2275  		return true
  2276  	default:
  2277  		return false
  2278  	}
  2279  }
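
A worked table of that rule, using an illustrative mirror named sendCL:

	package main

	import "fmt"

	// sendCL mirrors shouldSendReqContentLength, for illustration.
	func sendCL(method string, contentLength int64) bool {
		if contentLength > 0 {
			return true
		}
		if contentLength < 0 {
			return false
		}
		switch method {
		case "POST", "PUT", "PATCH":
			return true
		default:
			return false
		}
	}

	func main() {
		fmt.Println(sendCL("GET", 0))  // false: empty GET body, rely on END_STREAM
		fmt.Println(sendCL("POST", 0)) // true: send an explicit content-length: 0
		fmt.Println(sendCL("GET", -1)) // false: length unknown
		fmt.Println(sendCL("PUT", 5))  // true
	}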
  2280  
  2281  // requires cc.wmu be held.
  2282  func (cc *ClientConn) encodeTrailers(trailer http.Header) ([]byte, error) {
  2283  	cc.hbuf.Reset()
  2284  
  2285  	hlSize := uint64(0)
  2286  	for k, vv := range trailer {
  2287  		for _, v := range vv {
  2288  			hf := hpack.HeaderField{Name: k, Value: v}
  2289  			hlSize += uint64(hf.Size())
  2290  		}
  2291  	}
  2292  	if hlSize > cc.peerMaxHeaderListSize {
  2293  		return nil, errRequestHeaderListSize
  2294  	}
  2295  
  2296  	for k, vv := range trailer {
  2297  		lowKey, ascii := lowerHeader(k)
  2298  		if !ascii {
  2299  			// Skip writing invalid headers. Per RFC 7540, Section 8.1.2, header
  2300  			// field names have to be ASCII characters (just as in HTTP/1.x).
  2301  			continue
  2302  		}
  2303  		// Transfer-Encoding, etc. have already been filtered at the
  2304  		// start of RoundTrip
  2305  		for _, v := range vv {
  2306  			cc.writeHeader(lowKey, v)
  2307  		}
  2308  	}
  2309  	return cc.hbuf.Bytes(), nil
  2310  }
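
On the client side, trailers reach this code through Request.Trailer: declare the keys before the request is sent and fill in the values before the body returns EOF. A hedged sketch; the URL and digest value are placeholders:

	package main

	import (
		"fmt"
		"io"
		"net/http"

		"golang.org/x/net/http2"
	)

	func main() {
		pr, pw := io.Pipe()
		req, _ := http.NewRequest("POST", "https://example.com/upload", pr) // placeholder URL
		req.Trailer = http.Header{"X-Body-Digest": nil}                     // declare the trailer key up front

		go func() {
			pw.Write([]byte("some payload"))
			req.Trailer.Set("X-Body-Digest", "sha256:placeholder") // set before the body reaches EOF
			pw.Close()                                             // body EOF; encodeTrailers runs after this
		}()

		resp, err := (&http.Client{Transport: &http2.Transport{}}).Do(req)
		if err != nil {
			fmt.Println(err)
			return
		}
		resp.Body.Close()
		fmt.Println(resp.Status)
	}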
  2311  
  2312  func (cc *ClientConn) writeHeader(name, value string) {
  2313  	if VerboseLogs {
  2314  		log.Printf("http2: Transport encoding header %q = %q", name, value)
  2315  	}
  2316  	cc.henc.WriteField(hpack.HeaderField{Name: name, Value: value})
  2317  }
  2318  
  2319  type resAndError struct {
  2320  	_   incomparable
  2321  	res *http.Response
  2322  	err error
  2323  }
  2324  
  2325  // requires cc.mu be held.
  2326  func (cc *ClientConn) addStreamLocked(cs *clientStream) {
  2327  	cs.flow.add(int32(cc.initialWindowSize))
  2328  	cs.flow.setConnFlow(&cc.flow)
  2329  	cs.inflow.init(cc.initialStreamRecvWindowSize)
  2330  	cs.ID = cc.nextStreamID
  2331  	cc.nextStreamID += 2
  2332  	cc.streams[cs.ID] = cs
  2333  	if cs.ID == 0 {
  2334  		panic("assigned stream ID 0")
  2335  	}
  2336  }
  2337  
  2338  func (cc *ClientConn) forgetStreamID(id uint32) {
  2339  	cc.mu.Lock()
  2340  	slen := len(cc.streams)
  2341  	delete(cc.streams, id)
  2342  	if len(cc.streams) != slen-1 {
  2343  		panic("forgetting unknown stream id")
  2344  	}
  2345  	cc.lastActive = cc.t.now()
  2346  	if len(cc.streams) == 0 && cc.idleTimer != nil {
  2347  		cc.idleTimer.Reset(cc.idleTimeout)
  2348  		cc.lastIdle = cc.t.now()
  2349  	}
  2350  	// Wake up writeRequestBody via clientStream.awaitFlowControl and
  2351  	// wake up RoundTrip if there is a pending request.
  2352  	cc.cond.Broadcast()
  2353  
  2354  	closeOnIdle := cc.singleUse || cc.doNotReuse || cc.t.disableKeepAlives() || cc.goAway != nil
  2355  	if closeOnIdle && cc.streamsReserved == 0 && len(cc.streams) == 0 {
  2356  		if VerboseLogs {
  2357  			cc.vlogf("http2: Transport closing idle conn %p (forSingleUse=%v, maxStream=%v)", cc, cc.singleUse, cc.nextStreamID-2)
  2358  		}
  2359  		cc.closed = true
  2360  		defer cc.closeConn()
  2361  	}
  2362  
  2363  	cc.mu.Unlock()
  2364  }
  2365  
  2366  // clientConnReadLoop is the state owned by the clientConn's frame-reading readLoop.
  2367  type clientConnReadLoop struct {
  2368  	_  incomparable
  2369  	cc *ClientConn
  2370  }
  2371  
  2372  // readLoop runs in its own goroutine and reads and dispatches frames.
  2373  func (cc *ClientConn) readLoop() {
  2374  	cc.t.markNewGoroutine()
  2375  	rl := &clientConnReadLoop{cc: cc}
  2376  	defer rl.cleanup()
  2377  	cc.readerErr = rl.run()
  2378  	if ce, ok := cc.readerErr.(ConnectionError); ok {
  2379  		cc.wmu.Lock()
  2380  		cc.fr.WriteGoAway(0, ErrCode(ce), nil)
  2381  		cc.wmu.Unlock()
  2382  	}
  2383  }
  2384  
  2385  // GoAwayError is returned by the Transport when the server closes the
  2386  // TCP connection after sending a GOAWAY frame.
  2387  type GoAwayError struct {
  2388  	LastStreamID uint32
  2389  	ErrCode      ErrCode
  2390  	DebugData    string
  2391  }
  2392  
  2393  func (e GoAwayError) Error() string {
  2394  	return fmt.Sprintf("http2: server sent GOAWAY and closed the connection; LastStreamID=%v, ErrCode=%v, debug=%q",
  2395  		e.LastStreamID, e.ErrCode, e.DebugData)
  2396  }
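
Callers that want to distinguish a graceful shutdown can look for this error with errors.As; whether it surfaces depends on how the failure propagates (net/http typically wraps transport errors in a *url.Error). A sketch with a placeholder host:

	package main

	import (
		"errors"
		"fmt"
		"net/http"

		"golang.org/x/net/http2"
	)

	func main() {
		c := &http.Client{Transport: &http2.Transport{}}
		_, err := c.Get("https://example.com/") // placeholder host
		var ga http2.GoAwayError
		if errors.As(err, &ga) {
			fmt.Printf("server went away: code=%v lastStream=%d debug=%q\n",
				ga.ErrCode, ga.LastStreamID, ga.DebugData)
		} else if err != nil {
			fmt.Println("some other failure:", err)
		}
	}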
  2397  
  2398  func isEOFOrNetReadError(err error) bool {
  2399  	if err == io.EOF {
  2400  		return true
  2401  	}
  2402  	ne, ok := err.(*net.OpError)
  2403  	return ok && ne.Op == "read"
  2404  }
  2405  
  2406  func (rl *clientConnReadLoop) cleanup() {
  2407  	cc := rl.cc
  2408  	defer cc.closeConn()
  2409  	defer close(cc.readerDone)
  2410  
  2411  	if cc.idleTimer != nil {
  2412  		cc.idleTimer.Stop()
  2413  	}
  2414  
  2415  	// Close any response bodies if the server closes prematurely.
  2416  	// TODO: also do this if we've written the headers but not
  2417  	// gotten a response yet.
  2418  	err := cc.readerErr
  2419  	cc.mu.Lock()
  2420  	if cc.goAway != nil && isEOFOrNetReadError(err) {
  2421  		err = GoAwayError{
  2422  			LastStreamID: cc.goAway.LastStreamID,
  2423  			ErrCode:      cc.goAway.ErrCode,
  2424  			DebugData:    cc.goAwayDebug,
  2425  		}
  2426  	} else if err == io.EOF {
  2427  		err = io.ErrUnexpectedEOF
  2428  	}
  2429  	cc.closed = true
  2430  
  2431  	// If the connection has never been used, and has been open for only a short time,
  2432  	// leave it in the connection pool for a little while.
  2433  	//
  2434  	// This avoids a situation where new connections are constantly created,
  2435  	// added to the pool, fail, and are removed from the pool, without any error
  2436  	// being surfaced to the user.
  2437  	const unusedWaitTime = 5 * time.Second
  2438  	idleTime := cc.t.now().Sub(cc.lastActive)
  2439  	if atomic.LoadUint32(&cc.atomicReused) == 0 && idleTime < unusedWaitTime {
  2440  		cc.idleTimer = cc.t.afterFunc(unusedWaitTime-idleTime, func() {
  2441  			cc.t.connPool().MarkDead(cc)
  2442  		})
  2443  	} else {
  2444  		cc.mu.Unlock() // avoid any deadlocks in MarkDead
  2445  		cc.t.connPool().MarkDead(cc)
  2446  		cc.mu.Lock()
  2447  	}
  2448  
  2449  	for _, cs := range cc.streams {
  2450  		select {
  2451  		case <-cs.peerClosed:
  2452  			// The server closed the stream before closing the conn,
  2453  			// so no need to interrupt it.
  2454  		default:
  2455  			cs.abortStreamLocked(err)
  2456  		}
  2457  	}
  2458  	cc.cond.Broadcast()
  2459  	cc.mu.Unlock()
  2460  }
  2461  
  2462  // countReadFrameError calls Transport.CountError with a string
  2463  // representing err.
  2464  func (cc *ClientConn) countReadFrameError(err error) {
  2465  	f := cc.t.CountError
  2466  	if f == nil || err == nil {
  2467  		return
  2468  	}
  2469  	if ce, ok := err.(ConnectionError); ok {
  2470  		errCode := ErrCode(ce)
  2471  		f(fmt.Sprintf("read_frame_conn_error_%s", errCode.stringToken()))
  2472  		return
  2473  	}
  2474  	if errors.Is(err, io.EOF) {
  2475  		f("read_frame_eof")
  2476  		return
  2477  	}
  2478  	if errors.Is(err, io.ErrUnexpectedEOF) {
  2479  		f("read_frame_unexpected_eof")
  2480  		return
  2481  	}
  2482  	if errors.Is(err, ErrFrameTooLarge) {
  2483  		f("read_frame_too_large")
  2484  		return
  2485  	}
  2486  	f("read_frame_other")
  2487  }
  2488  
  2489  func (rl *clientConnReadLoop) run() error {
  2490  	cc := rl.cc
  2491  	gotSettings := false
  2492  	readIdleTimeout := cc.readIdleTimeout
  2493  	var t timer
  2494  	if readIdleTimeout != 0 {
  2495  		t = cc.t.afterFunc(readIdleTimeout, cc.healthCheck)
  2496  	}
  2497  	for {
  2498  		f, err := cc.fr.ReadFrame()
  2499  		if t != nil {
  2500  			t.Reset(readIdleTimeout)
  2501  		}
  2502  		if err != nil {
  2503  			cc.vlogf("http2: Transport readFrame error on conn %p: (%T) %v", cc, err, err)
  2504  		}
  2505  		if se, ok := err.(StreamError); ok {
  2506  			if cs := rl.streamByID(se.StreamID, notHeaderOrDataFrame); cs != nil {
  2507  				if se.Cause == nil {
  2508  					se.Cause = cc.fr.errDetail
  2509  				}
  2510  				rl.endStreamError(cs, se)
  2511  			}
  2512  			continue
  2513  		} else if err != nil {
  2514  			cc.countReadFrameError(err)
  2515  			return err
  2516  		}
  2517  		if VerboseLogs {
  2518  			cc.vlogf("http2: Transport received %s", summarizeFrame(f))
  2519  		}
  2520  		if !gotSettings {
  2521  			if _, ok := f.(*SettingsFrame); !ok {
  2522  				cc.logf("protocol error: received %T before a SETTINGS frame", f)
  2523  				return ConnectionError(ErrCodeProtocol)
  2524  			}
  2525  			gotSettings = true
  2526  		}
  2527  
  2528  		switch f := f.(type) {
  2529  		case *MetaHeadersFrame:
  2530  			err = rl.processHeaders(f)
  2531  		case *DataFrame:
  2532  			err = rl.processData(f)
  2533  		case *GoAwayFrame:
  2534  			err = rl.processGoAway(f)
  2535  		case *RSTStreamFrame:
  2536  			err = rl.processResetStream(f)
  2537  		case *SettingsFrame:
  2538  			err = rl.processSettings(f)
  2539  		case *PushPromiseFrame:
  2540  			err = rl.processPushPromise(f)
  2541  		case *WindowUpdateFrame:
  2542  			err = rl.processWindowUpdate(f)
  2543  		case *PingFrame:
  2544  			err = rl.processPing(f)
  2545  		default:
  2546  			cc.logf("Transport: unhandled response frame type %T", f)
  2547  		}
  2548  		if err != nil {
  2549  			if VerboseLogs {
  2550  				cc.vlogf("http2: Transport conn %p received error from processing frame %v: %v", cc, summarizeFrame(f), err)
  2551  			}
  2552  			if !cc.seenSettings {
  2553  				close(cc.seenSettingsChan)
  2554  			}
  2555  			return err
  2556  		}
  2557  	}
  2558  }
  2559  
  2560  func (rl *clientConnReadLoop) processHeaders(f *MetaHeadersFrame) error {
  2561  	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
  2562  	if cs == nil {
  2563  		// We'd get here if we canceled a request while the
  2564  		// server had its response still in flight. So if this
  2565  		// was just something we canceled, ignore it.
  2566  		return nil
  2567  	}
  2568  	if cs.readClosed {
  2569  		rl.endStreamError(cs, StreamError{
  2570  			StreamID: f.StreamID,
  2571  			Code:     ErrCodeProtocol,
  2572  			Cause:    errors.New("protocol error: headers after END_STREAM"),
  2573  		})
  2574  		return nil
  2575  	}
  2576  	if !cs.firstByte {
  2577  		if cs.trace != nil {
  2578  			// TODO(bradfitz): move first response byte earlier,
  2579  			// when we first read the 9 byte header, not waiting
  2580  			// until all the HEADERS+CONTINUATION frames have been
  2581  			// merged. This works for now.
  2582  			traceFirstResponseByte(cs.trace)
  2583  		}
  2584  		cs.firstByte = true
  2585  	}
  2586  	if !cs.pastHeaders {
  2587  		cs.pastHeaders = true
  2588  	} else {
  2589  		return rl.processTrailers(cs, f)
  2590  	}
  2591  
  2592  	res, err := rl.handleResponse(cs, f)
  2593  	if err != nil {
  2594  		if _, ok := err.(ConnectionError); ok {
  2595  			return err
  2596  		}
  2597  		// Any other error type is a stream error.
  2598  		rl.endStreamError(cs, StreamError{
  2599  			StreamID: f.StreamID,
  2600  			Code:     ErrCodeProtocol,
  2601  			Cause:    err,
  2602  		})
  2603  		return nil // return nil from process* funcs to keep conn alive
  2604  	}
  2605  	if res == nil {
  2606  		// (nil, nil) special case. See handleResponse docs.
  2607  		return nil
  2608  	}
  2609  	cs.resTrailer = &res.Trailer
  2610  	cs.res = res
  2611  	close(cs.respHeaderRecv)
  2612  	if f.StreamEnded() {
  2613  		rl.endStream(cs)
  2614  	}
  2615  	return nil
  2616  }
  2617  
  2618  // handleResponse may return a nil error or a ConnectionError. Any other error
  2619  // value is treated by the caller as a StreamError of code ErrCodeProtocol, with
  2620  // the returned error as its detail (Cause).
  2621  //
  2622  // As a special case, handleResponse may return (nil, nil) to skip the
  2623  // frame (currently only used for 1xx responses).
  2624  func (rl *clientConnReadLoop) handleResponse(cs *clientStream, f *MetaHeadersFrame) (*http.Response, error) {
  2625  	if f.Truncated {
  2626  		return nil, errResponseHeaderListSize
  2627  	}
  2628  
  2629  	status := f.PseudoValue("status")
  2630  	if status == "" {
  2631  		return nil, errors.New("malformed response from server: missing status pseudo header")
  2632  	}
  2633  	statusCode, err := strconv.Atoi(status)
  2634  	if err != nil {
  2635  		return nil, errors.New("malformed response from server: malformed non-numeric status pseudo header")
  2636  	}
  2637  
  2638  	regularFields := f.RegularFields()
  2639  	strs := make([]string, len(regularFields))
  2640  	header := make(http.Header, len(regularFields))
  2641  	res := &http.Response{
  2642  		Proto:      "HTTP/2.0",
  2643  		ProtoMajor: 2,
  2644  		Header:     header,
  2645  		StatusCode: statusCode,
  2646  		Status:     status + " " + http.StatusText(statusCode),
  2647  	}
  2648  	for _, hf := range regularFields {
  2649  		key := canonicalHeader(hf.Name)
  2650  		if key == "Trailer" {
  2651  			t := res.Trailer
  2652  			if t == nil {
  2653  				t = make(http.Header)
  2654  				res.Trailer = t
  2655  			}
  2656  			foreachHeaderElement(hf.Value, func(v string) {
  2657  				t[canonicalHeader(v)] = nil
  2658  			})
  2659  		} else {
  2660  			vv := header[key]
  2661  			if vv == nil && len(strs) > 0 {
  2662  				// More than likely this will be a single-element key.
  2663  				// Most headers aren't multi-valued.
  2664  				// Set the capacity on strs[0] to 1, so any future append
  2665  				// won't extend the slice into the other strings.
  2666  				vv, strs = strs[:1:1], strs[1:]
  2667  				vv[0] = hf.Value
  2668  				header[key] = vv
  2669  			} else {
  2670  				header[key] = append(vv, hf.Value)
  2671  			}
  2672  		}
  2673  	}
  2674  
  2675  	if statusCode >= 100 && statusCode <= 199 {
  2676  		if f.StreamEnded() {
  2677  			return nil, errors.New("1xx informational response with END_STREAM flag")
  2678  		}
  2679  		if fn := cs.get1xxTraceFunc(); fn != nil {
  2680  			// If the 1xx response is being delivered to the user,
  2681  			// then they're responsible for limiting the number
  2682  			// of responses.
  2683  			if err := fn(statusCode, textproto.MIMEHeader(header)); err != nil {
  2684  				return nil, err
  2685  			}
  2686  		} else {
  2687  			// If the user didn't examine the 1xx response, then we
  2688  			// limit the size of all 1xx headers.
  2689  			//
  2690  			// This differs a bit from the HTTP/1 implementation, which
  2691  			// limits the size of all 1xx headers plus the final response.
  2692  			// Use the larger limit of MaxHeaderListSize and
  2693  			// net/http.Transport.MaxResponseHeaderBytes.
  2694  			limit := int64(cs.cc.t.maxHeaderListSize())
  2695  			if t1 := cs.cc.t.t1; t1 != nil && t1.MaxResponseHeaderBytes > limit {
  2696  				limit = t1.MaxResponseHeaderBytes
  2697  			}
  2698  			for _, h := range f.Fields {
  2699  				cs.totalHeaderSize += int64(h.Size())
  2700  			}
  2701  			if cs.totalHeaderSize > limit {
  2702  				if VerboseLogs {
  2703  					log.Printf("http2: 1xx informational responses too large")
  2704  				}
  2705  				return nil, errors.New("header list too large")
  2706  			}
  2707  		}
  2708  		if statusCode == 100 {
  2709  			traceGot100Continue(cs.trace)
  2710  			select {
  2711  			case cs.on100 <- struct{}{}:
  2712  			default:
  2713  			}
  2714  		}
  2715  		cs.pastHeaders = false // do it all again
  2716  		return nil, nil
  2717  	}
  2718  
  2719  	res.ContentLength = -1
  2720  	if clens := res.Header["Content-Length"]; len(clens) == 1 {
  2721  		if cl, err := strconv.ParseUint(clens[0], 10, 63); err == nil {
  2722  			res.ContentLength = int64(cl)
  2723  		} else {
  2724  			// TODO: care? unlike http/1, it won't mess up our framing, so it's
  2725  			// safer smuggling-wise to ignore.
  2726  		}
  2727  	} else if len(clens) > 1 {
  2728  		// TODO: care? unlike http/1, it won't mess up our framing, so it's
  2729  		// safer smuggling-wise to ignore.
  2730  	} else if f.StreamEnded() && !cs.isHead {
  2731  		res.ContentLength = 0
  2732  	}
  2733  
  2734  	if cs.isHead {
  2735  		res.Body = noBody
  2736  		return res, nil
  2737  	}
  2738  
  2739  	if f.StreamEnded() {
  2740  		if res.ContentLength > 0 {
  2741  			res.Body = missingBody{}
  2742  		} else {
  2743  			res.Body = noBody
  2744  		}
  2745  		return res, nil
  2746  	}
  2747  
  2748  	cs.bufPipe.setBuffer(&dataBuffer{expected: res.ContentLength})
  2749  	cs.bytesRemain = res.ContentLength
  2750  	res.Body = transportResponseBody{cs}
  2751  
  2752  	if cs.requestedGzip && asciiEqualFold(res.Header.Get("Content-Encoding"), "gzip") {
  2753  		res.Header.Del("Content-Encoding")
  2754  		res.Header.Del("Content-Length")
  2755  		res.ContentLength = -1
  2756  		res.Body = &gzipReader{body: res.Body}
  2757  		res.Uncompressed = true
  2758  	}
  2759  	return res, nil
  2760  }
  2761  
  2762  func (rl *clientConnReadLoop) processTrailers(cs *clientStream, f *MetaHeadersFrame) error {
  2763  	if cs.pastTrailers {
  2764  		// Too many HEADERS frames for this stream.
  2765  		return ConnectionError(ErrCodeProtocol)
  2766  	}
  2767  	cs.pastTrailers = true
  2768  	if !f.StreamEnded() {
  2769  		// We expect that any HEADERS frame carrying trailers
  2770  		// also has END_STREAM set.
  2771  		return ConnectionError(ErrCodeProtocol)
  2772  	}
  2773  	if len(f.PseudoFields()) > 0 {
  2774  		// No pseudo header fields are defined for trailers.
  2775  		// TODO: ConnectionError might be overly harsh? Check.
  2776  		return ConnectionError(ErrCodeProtocol)
  2777  	}
  2778  
  2779  	trailer := make(http.Header)
  2780  	for _, hf := range f.RegularFields() {
  2781  		key := canonicalHeader(hf.Name)
  2782  		trailer[key] = append(trailer[key], hf.Value)
  2783  	}
  2784  	cs.trailer = trailer
  2785  
  2786  	rl.endStream(cs)
  2787  	return nil
  2788  }
  2789  
  2790  // transportResponseBody is the concrete type of Transport.RoundTrip's
  2791  // Response.Body. It is an io.ReadCloser.
  2792  type transportResponseBody struct {
  2793  	cs *clientStream
  2794  }
  2795  
  2796  func (b transportResponseBody) Read(p []byte) (n int, err error) {
  2797  	cs := b.cs
  2798  	cc := cs.cc
  2799  
  2800  	if cs.readErr != nil {
  2801  		return 0, cs.readErr
  2802  	}
  2803  	n, err = b.cs.bufPipe.Read(p)
  2804  	if cs.bytesRemain != -1 {
  2805  		if int64(n) > cs.bytesRemain {
  2806  			n = int(cs.bytesRemain)
  2807  			if err == nil {
  2808  				err = errors.New("net/http: server replied with more than declared Content-Length; truncated")
  2809  				cs.abortStream(err)
  2810  			}
  2811  			cs.readErr = err
  2812  			return int(cs.bytesRemain), err
  2813  		}
  2814  		cs.bytesRemain -= int64(n)
  2815  		if err == io.EOF && cs.bytesRemain > 0 {
  2816  			err = io.ErrUnexpectedEOF
  2817  			cs.readErr = err
  2818  			return n, err
  2819  		}
  2820  	}
  2821  	if n == 0 {
  2822  		// No flow control tokens to send back.
  2823  		return
  2824  	}
  2825  
  2826  	cc.mu.Lock()
  2827  	connAdd := cc.inflow.add(n)
  2828  	var streamAdd int32
  2829  	if err == nil { // No need to refresh if the stream is over or failed.
  2830  		streamAdd = cs.inflow.add(n)
  2831  	}
  2832  	cc.mu.Unlock()
  2833  
  2834  	if connAdd != 0 || streamAdd != 0 {
  2835  		cc.wmu.Lock()
  2836  		defer cc.wmu.Unlock()
  2837  		if connAdd != 0 {
  2838  			cc.fr.WriteWindowUpdate(0, mustUint31(connAdd))
  2839  		}
  2840  		if streamAdd != 0 {
  2841  			cc.fr.WriteWindowUpdate(cs.ID, mustUint31(streamAdd))
  2842  		}
  2843  		cc.bw.Flush()
  2844  	}
  2845  	return
  2846  }
  2847  
  2848  var errClosedResponseBody = errors.New("http2: response body closed")
  2849  
  2850  func (b transportResponseBody) Close() error {
  2851  	cs := b.cs
  2852  	cc := cs.cc
  2853  
  2854  	cs.bufPipe.BreakWithError(errClosedResponseBody)
  2855  	cs.abortStream(errClosedResponseBody)
  2856  
  2857  	unread := cs.bufPipe.Len()
  2858  	if unread > 0 {
  2859  		cc.mu.Lock()
  2860  		// Return connection-level flow control.
  2861  		connAdd := cc.inflow.add(unread)
  2862  		cc.mu.Unlock()
  2863  
  2864  		// TODO(dneil): Acquiring this mutex can block indefinitely.
  2865  		// Move flow control return to a goroutine?
  2866  		cc.wmu.Lock()
  2867  		// Return connection-level flow control.
  2868  		if connAdd > 0 {
  2869  			cc.fr.WriteWindowUpdate(0, uint32(connAdd))
  2870  		}
  2871  		cc.bw.Flush()
  2872  		cc.wmu.Unlock()
  2873  	}
  2874  
  2875  	select {
  2876  	case <-cs.donec:
  2877  	case <-cs.ctx.Done():
  2878  		// See golang/go#49366: The net/http package can cancel the
  2879  		// request context after the response body is fully read.
  2880  		// Don't treat this as an error.
  2881  		return nil
  2882  	case <-cs.reqCancel:
  2883  		return errRequestCanceled
  2884  	}
  2885  	return nil
  2886  }
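
A practical consequence of Read and Close above: always close response bodies, so that unread bytes are refunded to the connection-level window instead of starving other streams on the same connection. A minimal sketch with a placeholder URL:

	package main

	import (
		"fmt"
		"io"
		"net/http"
	)

	func drainAndClose(c *http.Client, url string) error {
		resp, err := c.Get(url)
		if err != nil {
			return err
		}
		// Close returns unread flow control and releases the stream even if we
		// stop reading early.
		defer resp.Body.Close()
		_, err = io.Copy(io.Discard, resp.Body)
		return err
	}

	func main() {
		err := drainAndClose(http.DefaultClient, "https://example.com/") // placeholder URL
		fmt.Println(err)
	}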
  2887  
  2888  func (rl *clientConnReadLoop) processData(f *DataFrame) error {
  2889  	cc := rl.cc
  2890  	cs := rl.streamByID(f.StreamID, headerOrDataFrame)
  2891  	data := f.Data()
  2892  	if cs == nil {
  2893  		cc.mu.Lock()
  2894  		neverSent := cc.nextStreamID
  2895  		cc.mu.Unlock()
  2896  		if f.StreamID >= neverSent {
  2897  			// We never asked for this.
  2898  			cc.logf("http2: Transport received unsolicited DATA frame; closing connection")
  2899  			return ConnectionError(ErrCodeProtocol)
  2900  		}
  2901  		// We probably did ask for this, but canceled. Just ignore it.
  2902  		// TODO: be stricter here? only silently ignore things which
  2903  		// we canceled, but not things which were closed normally
  2904  		// by the peer? Tough without accumulating too much state.
  2905  
  2906  		// But at least return their flow control:
  2907  		if f.Length > 0 {
  2908  			cc.mu.Lock()
  2909  			ok := cc.inflow.take(f.Length)
  2910  			connAdd := cc.inflow.add(int(f.Length))
  2911  			cc.mu.Unlock()
  2912  			if !ok {
  2913  				return ConnectionError(ErrCodeFlowControl)
  2914  			}
  2915  			if connAdd > 0 {
  2916  				cc.wmu.Lock()
  2917  				cc.fr.WriteWindowUpdate(0, uint32(connAdd))
  2918  				cc.bw.Flush()
  2919  				cc.wmu.Unlock()
  2920  			}
  2921  		}
  2922  		return nil
  2923  	}
  2924  	if cs.readClosed {
  2925  		cc.logf("protocol error: received DATA after END_STREAM")
  2926  		rl.endStreamError(cs, StreamError{
  2927  			StreamID: f.StreamID,
  2928  			Code:     ErrCodeProtocol,
  2929  		})
  2930  		return nil
  2931  	}
  2932  	if !cs.pastHeaders {
  2933  		cc.logf("protocol error: received DATA before a HEADERS frame")
  2934  		rl.endStreamError(cs, StreamError{
  2935  			StreamID: f.StreamID,
  2936  			Code:     ErrCodeProtocol,
  2937  		})
  2938  		return nil
  2939  	}
  2940  	if f.Length > 0 {
  2941  		if cs.isHead && len(data) > 0 {
  2942  			cc.logf("protocol error: received DATA on a HEAD request")
  2943  			rl.endStreamError(cs, StreamError{
  2944  				StreamID: f.StreamID,
  2945  				Code:     ErrCodeProtocol,
  2946  			})
  2947  			return nil
  2948  		}
  2949  		// Check connection-level flow control.
  2950  		cc.mu.Lock()
  2951  		if !takeInflows(&cc.inflow, &cs.inflow, f.Length) {
  2952  			cc.mu.Unlock()
  2953  			return ConnectionError(ErrCodeFlowControl)
  2954  		}
  2955  		// Return any padded flow control now, since we won't
  2956  		// refund it later on body reads.
  2957  		var refund int
  2958  		if pad := int(f.Length) - len(data); pad > 0 {
  2959  			refund += pad
  2960  		}
  2961  
  2962  		didReset := false
  2963  		var err error
  2964  		if len(data) > 0 {
  2965  			if _, err = cs.bufPipe.Write(data); err != nil {
  2966  				// Return len(data) now if the stream is already closed,
  2967  				// since data will never be read.
  2968  				didReset = true
  2969  				refund += len(data)
  2970  			}
  2971  		}
  2972  
  2973  		sendConn := cc.inflow.add(refund)
  2974  		var sendStream int32
  2975  		if !didReset {
  2976  			sendStream = cs.inflow.add(refund)
  2977  		}
  2978  		cc.mu.Unlock()
  2979  
  2980  		if sendConn > 0 || sendStream > 0 {
  2981  			cc.wmu.Lock()
  2982  			if sendConn > 0 {
  2983  				cc.fr.WriteWindowUpdate(0, uint32(sendConn))
  2984  			}
  2985  			if sendStream > 0 {
  2986  				cc.fr.WriteWindowUpdate(cs.ID, uint32(sendStream))
  2987  			}
  2988  			cc.bw.Flush()
  2989  			cc.wmu.Unlock()
  2990  		}
  2991  
  2992  		if err != nil {
  2993  			rl.endStreamError(cs, err)
  2994  			return nil
  2995  		}
  2996  	}
  2997  
  2998  	if f.StreamEnded() {
  2999  		rl.endStream(cs)
  3000  	}
  3001  	return nil
  3002  }
  3003  
  3004  func (rl *clientConnReadLoop) endStream(cs *clientStream) {
  3005  	// TODO: check that any declared content-length matches, like
  3006  	// server.go's (*stream).endStream method.
  3007  	if !cs.readClosed {
  3008  		cs.readClosed = true
  3009  		// Close cs.bufPipe and cs.peerClosed with cc.mu held to avoid a
  3010  		// race condition: The caller can read io.EOF from Response.Body
  3011  		// and close the body before we close cs.peerClosed, causing
  3012  		// cleanupWriteRequest to send a RST_STREAM.
  3013  		rl.cc.mu.Lock()
  3014  		defer rl.cc.mu.Unlock()
  3015  		cs.bufPipe.closeWithErrorAndCode(io.EOF, cs.copyTrailers)
  3016  		close(cs.peerClosed)
  3017  	}
  3018  }
  3019  
  3020  func (rl *clientConnReadLoop) endStreamError(cs *clientStream, err error) {
  3021  	cs.readAborted = true
  3022  	cs.abortStream(err)
  3023  }
  3024  
  3025  // Constants passed to streamByID for documentation purposes.
  3026  const (
  3027  	headerOrDataFrame    = true
  3028  	notHeaderOrDataFrame = false
  3029  )
  3030  
  3031  // streamByID returns the stream with the given id, or nil if no stream has that id.
  3032  // If headerOrData is true, it clears cc.rstStreamPingsBlocked.
  3033  func (rl *clientConnReadLoop) streamByID(id uint32, headerOrData bool) *clientStream {
  3034  	rl.cc.mu.Lock()
  3035  	defer rl.cc.mu.Unlock()
  3036  	if headerOrData {
  3037  		// Work around an unfortunate gRPC behavior.
  3038  		// See comment on ClientConn.rstStreamPingsBlocked for details.
  3039  		rl.cc.rstStreamPingsBlocked = false
  3040  	}
  3041  	cs := rl.cc.streams[id]
  3042  	if cs != nil && !cs.readAborted {
  3043  		return cs
  3044  	}
  3045  	return nil
  3046  }
  3047  
  3048  func (cs *clientStream) copyTrailers() {
  3049  	for k, vv := range cs.trailer {
  3050  		t := cs.resTrailer
  3051  		if *t == nil {
  3052  			*t = make(http.Header)
  3053  		}
  3054  		(*t)[k] = vv
  3055  	}
  3056  }
  3057  
  3058  func (rl *clientConnReadLoop) processGoAway(f *GoAwayFrame) error {
  3059  	cc := rl.cc
  3060  	cc.t.connPool().MarkDead(cc)
  3061  	if f.ErrCode != 0 {
  3062  		// TODO: deal with GOAWAY more, particularly the error code
  3063  		cc.vlogf("transport got GOAWAY with error code = %v", f.ErrCode)
  3064  		if fn := cc.t.CountError; fn != nil {
  3065  			fn("recv_goaway_" + f.ErrCode.stringToken())
  3066  		}
  3067  	}
  3068  	cc.setGoAway(f)
  3069  	return nil
  3070  }
  3071  
  3072  func (rl *clientConnReadLoop) processSettings(f *SettingsFrame) error {
  3073  	cc := rl.cc
  3074  	// Locking both mu and wmu here allows frame encoding to read settings with only wmu held.
  3075  	// Acquiring wmu when f.IsAck() is unnecessary, but convenient and mostly harmless.
  3076  	cc.wmu.Lock()
  3077  	defer cc.wmu.Unlock()
  3078  
  3079  	if err := rl.processSettingsNoWrite(f); err != nil {
  3080  		return err
  3081  	}
  3082  	if !f.IsAck() {
  3083  		cc.fr.WriteSettingsAck()
  3084  		cc.bw.Flush()
  3085  	}
  3086  	return nil
  3087  }
  3088  
  3089  func (rl *clientConnReadLoop) processSettingsNoWrite(f *SettingsFrame) error {
  3090  	cc := rl.cc
  3091  	cc.mu.Lock()
  3092  	defer cc.mu.Unlock()
  3093  
  3094  	if f.IsAck() {
  3095  		if cc.wantSettingsAck {
  3096  			cc.wantSettingsAck = false
  3097  			return nil
  3098  		}
  3099  		return ConnectionError(ErrCodeProtocol)
  3100  	}
  3101  
  3102  	var seenMaxConcurrentStreams bool
  3103  	err := f.ForeachSetting(func(s Setting) error {
  3104  		switch s.ID {
  3105  		case SettingMaxFrameSize:
  3106  			cc.maxFrameSize = s.Val
  3107  		case SettingMaxConcurrentStreams:
  3108  			cc.maxConcurrentStreams = s.Val
  3109  			seenMaxConcurrentStreams = true
  3110  		case SettingMaxHeaderListSize:
  3111  			cc.peerMaxHeaderListSize = uint64(s.Val)
  3112  		case SettingInitialWindowSize:
  3113  			// Values above the maximum flow-control
  3114  			// window size of 2^31-1 MUST be treated as a
  3115  			// connection error (Section 5.4.1) of type
  3116  			// FLOW_CONTROL_ERROR.
  3117  			if s.Val > math.MaxInt32 {
  3118  				return ConnectionError(ErrCodeFlowControl)
  3119  			}
  3120  
  3121  			// Adjust flow control of currently-open
  3122  			// streams by the difference between the old
  3123  			// initial window size and this one.
  3124  			delta := int32(s.Val) - int32(cc.initialWindowSize)
  3125  			for _, cs := range cc.streams {
  3126  				cs.flow.add(delta)
  3127  			}
  3128  			cc.cond.Broadcast()
  3129  
  3130  			cc.initialWindowSize = s.Val
  3131  		case SettingHeaderTableSize:
  3132  			cc.henc.SetMaxDynamicTableSize(s.Val)
  3133  			cc.peerMaxHeaderTableSize = s.Val
  3134  		case SettingEnableConnectProtocol:
  3135  			if err := s.Valid(); err != nil {
  3136  				return err
  3137  			}
  3138  			// If the peer wants to send us SETTINGS_ENABLE_CONNECT_PROTOCOL,
  3139  			// we require that it do so in the first SETTINGS frame.
  3140  			//
  3141  			// When we attempt to use extended CONNECT, we wait for the first
  3142  			// SETTINGS frame to see if the server supports it. If we let the
  3143  			// server enable the feature with a later SETTINGS frame, then
  3144  			// users will see inconsistent results depending on whether we've
  3145  			// seen that frame or not.
  3146  			if !cc.seenSettings {
  3147  				cc.extendedConnectAllowed = s.Val == 1
  3148  			}
  3149  		default:
  3150  			cc.vlogf("Unhandled Setting: %v", s)
  3151  		}
  3152  		return nil
  3153  	})
  3154  	if err != nil {
  3155  		return err
  3156  	}
  3157  
  3158  	if !cc.seenSettings {
  3159  		if !seenMaxConcurrentStreams {
  3160  			// This was the server's initial SETTINGS frame and it
  3161  			// didn't contain a MAX_CONCURRENT_STREAMS field, so
  3162  			// increase the number of concurrent streams this
  3163  			// connection can establish to our default.
  3164  			cc.maxConcurrentStreams = defaultMaxConcurrentStreams
  3165  		}
  3166  		close(cc.seenSettingsChan)
  3167  		cc.seenSettings = true
  3168  	}
  3169  
  3170  	return nil
  3171  }
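
// Editor's note: a minimal sketch, not part of the original file, of how
// processSettingsNoWrite above applies a SETTINGS_INITIAL_WINDOW_SIZE change:
// every open stream's send window shifts by the difference between the new
// and old initial sizes, which may temporarily drive a window negative.
// applyInitialWindowDeltaSketch is a hypothetical helper for illustration.
func applyInitialWindowDeltaSketch(oldSize, newSize uint32, sendWindows []int32) {
	delta := int32(newSize) - int32(oldSize)
	for i := range sendWindows {
		sendWindows[i] += delta
	}
}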
  3172  
  3173  func (rl *clientConnReadLoop) processWindowUpdate(f *WindowUpdateFrame) error {
  3174  	cc := rl.cc
  3175  	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
  3176  	if f.StreamID != 0 && cs == nil {
  3177  		return nil
  3178  	}
  3179  
  3180  	cc.mu.Lock()
  3181  	defer cc.mu.Unlock()
  3182  
  3183  	fl := &cc.flow
  3184  	if cs != nil {
  3185  		fl = &cs.flow
  3186  	}
  3187  	if !fl.add(int32(f.Increment)) {
  3188  		// For a stream, send RST_STREAM with an error code of FLOW_CONTROL_ERROR.
  3189  		if cs != nil {
  3190  			rl.endStreamError(cs, StreamError{
  3191  				StreamID: f.StreamID,
  3192  				Code:     ErrCodeFlowControl,
  3193  			})
  3194  			return nil
  3195  		}
  3196  
  3197  		return ConnectionError(ErrCodeFlowControl)
  3198  	}
  3199  	cc.cond.Broadcast()
  3200  	return nil
  3201  }
  3202  
  3203  func (rl *clientConnReadLoop) processResetStream(f *RSTStreamFrame) error {
  3204  	cs := rl.streamByID(f.StreamID, notHeaderOrDataFrame)
  3205  	if cs == nil {
  3206  		// TODO: return error if server tries to RST_STREAM an idle stream
  3207  		return nil
  3208  	}
  3209  	serr := streamError(cs.ID, f.ErrCode)
  3210  	serr.Cause = errFromPeer
  3211  	if f.ErrCode == ErrCodeProtocol {
  3212  		rl.cc.SetDoNotReuse()
  3213  	}
  3214  	if fn := cs.cc.t.CountError; fn != nil {
  3215  		fn("recv_rststream_" + f.ErrCode.stringToken())
  3216  	}
  3217  	cs.abortStream(serr)
  3218  
  3219  	cs.bufPipe.CloseWithError(serr)
  3220  	return nil
  3221  }
  3222  
  3223  // Ping sends a PING frame to the server and waits for the ack.
  3224  func (cc *ClientConn) Ping(ctx context.Context) error {
  3225  	c := make(chan struct{})
  3226  	// Generate a random payload
  3227  	var p [8]byte
  3228  	for {
  3229  		if _, err := rand.Read(p[:]); err != nil {
  3230  			return err
  3231  		}
  3232  		cc.mu.Lock()
  3233  		// check for dup before insert
  3234  		if _, found := cc.pings[p]; !found {
  3235  			cc.pings[p] = c
  3236  			cc.mu.Unlock()
  3237  			break
  3238  		}
  3239  		cc.mu.Unlock()
  3240  	}
  3241  	var pingError error
  3242  	errc := make(chan struct{})
  3243  	go func() {
  3244  		cc.t.markNewGoroutine()
  3245  		cc.wmu.Lock()
  3246  		defer cc.wmu.Unlock()
  3247  		if pingError = cc.fr.WritePing(false, p); pingError != nil {
  3248  			close(errc)
  3249  			return
  3250  		}
  3251  		if pingError = cc.bw.Flush(); pingError != nil {
  3252  			close(errc)
  3253  			return
  3254  		}
  3255  	}()
  3256  	select {
  3257  	case <-c:
  3258  		return nil
  3259  	case <-errc:
  3260  		return pingError
  3261  	case <-ctx.Done():
  3262  		return ctx.Err()
  3263  	case <-cc.readerDone:
  3264  		// connection closed
  3265  		return cc.readerErr
  3266  	}
  3267  }
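
// Editor's note: a usage sketch, not part of the original file, for
// ClientConn.Ping above. A bounded context keeps the health check from
// blocking indefinitely if the peer never acknowledges the PING.
// pingSketch is a hypothetical helper for illustration.
func pingSketch(cc *ClientConn) error {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	return cc.Ping(ctx) // nil once the PING ack arrives
}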
  3268  
  3269  func (rl *clientConnReadLoop) processPing(f *PingFrame) error {
  3270  	if f.IsAck() {
  3271  		cc := rl.cc
  3272  		cc.mu.Lock()
  3273  		defer cc.mu.Unlock()
  3274  		// If this is an ack, notify the listener, if any.
  3275  		if c, ok := cc.pings[f.Data]; ok {
  3276  			close(c)
  3277  			delete(cc.pings, f.Data)
  3278  		}
  3279  		if cc.pendingResets > 0 {
  3280  			// See clientStream.cleanupWriteRequest.
  3281  			cc.pendingResets = 0
  3282  			cc.rstStreamPingsBlocked = true
  3283  			cc.cond.Broadcast()
  3284  		}
  3285  		return nil
  3286  	}
  3287  	cc := rl.cc
  3288  	cc.wmu.Lock()
  3289  	defer cc.wmu.Unlock()
  3290  	if err := cc.fr.WritePing(true, f.Data); err != nil {
  3291  		return err
  3292  	}
  3293  	return cc.bw.Flush()
  3294  }
  3295  
  3296  func (rl *clientConnReadLoop) processPushPromise(f *PushPromiseFrame) error {
  3297  	// We told the peer we don't want them.
  3298  	// Spec says:
  3299  	// "PUSH_PROMISE MUST NOT be sent if the SETTINGS_ENABLE_PUSH
  3300  	// setting of the peer endpoint is set to 0. An endpoint that
  3301  	// has set this setting and has received acknowledgement MUST
  3302  	// treat the receipt of a PUSH_PROMISE frame as a connection
  3303  	// error (Section 5.4.1) of type PROTOCOL_ERROR."
  3304  	return ConnectionError(ErrCodeProtocol)
  3305  }
  3306  
  3307  // writeStreamReset sends a RST_STREAM frame.
  3308  // When ping is true, it also sends a PING frame with a random payload.
  3309  func (cc *ClientConn) writeStreamReset(streamID uint32, code ErrCode, ping bool, err error) {
  3310  	// TODO: map err to more interesting error codes, once the
  3311  	// HTTP community comes up with some. But currently for
  3312  	// RST_STREAM there's no equivalent to GOAWAY frame's debug
  3313  	// data, and the error codes are all pretty vague ("cancel").
  3314  	cc.wmu.Lock()
  3315  	cc.fr.WriteRSTStream(streamID, code)
  3316  	if ping {
  3317  		var payload [8]byte
  3318  		rand.Read(payload[:])
  3319  		cc.fr.WritePing(false, payload)
  3320  	}
  3321  	cc.bw.Flush()
  3322  	cc.wmu.Unlock()
  3323  }
  3324  
  3325  var (
  3326  	errResponseHeaderListSize = errors.New("http2: response header list larger than advertised limit")
  3327  	errRequestHeaderListSize  = errors.New("http2: request header list larger than peer's advertised limit")
  3328  )
  3329  
  3330  func (cc *ClientConn) logf(format string, args ...interface{}) {
  3331  	cc.t.logf(format, args...)
  3332  }
  3333  
  3334  func (cc *ClientConn) vlogf(format string, args ...interface{}) {
  3335  	cc.t.vlogf(format, args...)
  3336  }
  3337  
  3338  func (t *Transport) vlogf(format string, args ...interface{}) {
  3339  	if VerboseLogs {
  3340  		t.logf(format, args...)
  3341  	}
  3342  }
  3343  
  3344  func (t *Transport) logf(format string, args ...interface{}) {
  3345  	log.Printf(format, args...)
  3346  }
  3347  
  3348  var noBody io.ReadCloser = noBodyReader{}
  3349  
  3350  type noBodyReader struct{}
  3351  
  3352  func (noBodyReader) Close() error             { return nil }
  3353  func (noBodyReader) Read([]byte) (int, error) { return 0, io.EOF }
  3354  
  3355  type missingBody struct{}
  3356  
  3357  func (missingBody) Close() error             { return nil }
  3358  func (missingBody) Read([]byte) (int, error) { return 0, io.ErrUnexpectedEOF }
  3359  
  3360  func strSliceContains(ss []string, s string) bool {
  3361  	for _, v := range ss {
  3362  		if v == s {
  3363  			return true
  3364  		}
  3365  	}
  3366  	return false
  3367  }
  3368  
  3369  type erringRoundTripper struct{ err error }
  3370  
  3371  func (rt erringRoundTripper) RoundTripErr() error                             { return rt.err }
  3372  func (rt erringRoundTripper) RoundTrip(*http.Request) (*http.Response, error) { return nil, rt.err }
  3373  
  3374  // gzipReader wraps a response body so it can lazily
  3375  // call gzip.NewReader on the first call to Read
  3376  type gzipReader struct {
  3377  	_    incomparable
  3378  	body io.ReadCloser // underlying Response.Body
  3379  	zr   *gzip.Reader  // lazily-initialized gzip reader
  3380  	zerr error         // sticky error
  3381  }
  3382  
  3383  func (gz *gzipReader) Read(p []byte) (n int, err error) {
  3384  	if gz.zerr != nil {
  3385  		return 0, gz.zerr
  3386  	}
  3387  	if gz.zr == nil {
  3388  		gz.zr, err = gzip.NewReader(gz.body)
  3389  		if err != nil {
  3390  			gz.zerr = err
  3391  			return 0, err
  3392  		}
  3393  	}
  3394  	return gz.zr.Read(p)
  3395  }
  3396  
  3397  func (gz *gzipReader) Close() error {
  3398  	if err := gz.body.Close(); err != nil {
  3399  		return err
  3400  	}
  3401  	gz.zerr = fs.ErrClosed
  3402  	return nil
  3403  }
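
// Editor's note: a minimal sketch, not part of the original file, showing how
// the lazy gzipReader above is typically wired: the raw response body is
// wrapped, and gzip.NewReader runs only on the first Read, so a body that is
// closed unread never pays the decompression setup cost. wrapGzipSketch is a
// hypothetical helper for illustration.
func wrapGzipSketch(body io.ReadCloser) io.ReadCloser {
	return &gzipReader{body: body}
}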
  3404  
  3405  type errorReader struct{ err error }
  3406  
  3407  func (r errorReader) Read(p []byte) (int, error) { return 0, r.err }
  3408  
  3409  // isConnectionCloseRequest reports whether req should use its own
  3410  // connection for a single request and then close the connection.
  3411  func isConnectionCloseRequest(req *http.Request) bool {
  3412  	return req.Close || httpguts.HeaderValuesContainsToken(req.Header["Connection"], "close")
  3413  }
  3414  
  3415  // registerHTTPSProtocol calls Transport.RegisterProtocol but
  3416  // converts panics into errors.
  3417  func registerHTTPSProtocol(t *http.Transport, rt noDialH2RoundTripper) (err error) {
  3418  	defer func() {
  3419  		if e := recover(); e != nil {
  3420  			err = fmt.Errorf("%v", e)
  3421  		}
  3422  	}()
  3423  	t.RegisterProtocol("https", rt)
  3424  	return nil
  3425  }
  3426  
  3427  // noDialH2RoundTripper is a RoundTripper which only tries to complete the request
  3428  // if there's already a cached connection to the host.
  3429  // (The field is exported so it can be accessed via reflect from net/http; tested
  3430  // by TestNoDialH2RoundTripperType)
  3431  type noDialH2RoundTripper struct{ *Transport }
  3432  
  3433  func (rt noDialH2RoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
  3434  	res, err := rt.Transport.RoundTrip(req)
  3435  	if isNoCachedConnError(err) {
  3436  		return nil, http.ErrSkipAltProtocol
  3437  	}
  3438  	return res, err
  3439  }
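
// Editor's note: a minimal sketch, not part of the original file, of how the
// pieces above fit together: a noDialH2RoundTripper wrapping an HTTP/2
// Transport is registered on an *http.Transport for "https", and a request
// falls back to HTTP/1.1 (via http.ErrSkipAltProtocol) when no cached HTTP/2
// connection exists. registerNoDialSketch is a hypothetical helper for
// illustration.
func registerNoDialSketch(t1 *http.Transport, t2 *Transport) error {
	return registerHTTPSProtocol(t1, noDialH2RoundTripper{t2})
}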
  3440  
  3441  func (t *Transport) idleConnTimeout() time.Duration {
  3442  	// To keep things backwards compatible, we use a non-zero
  3443  	// IdleConnTimeout if set, then the IdleConnTimeout of the
  3444  	// underlying http1 transport, and finally 0.
  3445  	if t.IdleConnTimeout != 0 {
  3446  		return t.IdleConnTimeout
  3447  	}
  3448  
  3449  	if t.t1 != nil {
  3450  		return t.t1.IdleConnTimeout
  3451  	}
  3452  
  3453  	return 0
  3454  }
  3455  
  3456  func traceGetConn(req *http.Request, hostPort string) {
  3457  	trace := httptrace.ContextClientTrace(req.Context())
  3458  	if trace == nil || trace.GetConn == nil {
  3459  		return
  3460  	}
  3461  	trace.GetConn(hostPort)
  3462  }
  3463  
  3464  func traceGotConn(req *http.Request, cc *ClientConn, reused bool) {
  3465  	trace := httptrace.ContextClientTrace(req.Context())
  3466  	if trace == nil || trace.GotConn == nil {
  3467  		return
  3468  	}
  3469  	ci := httptrace.GotConnInfo{Conn: cc.tconn}
  3470  	ci.Reused = reused
  3471  	cc.mu.Lock()
  3472  	ci.WasIdle = len(cc.streams) == 0 && reused
  3473  	if ci.WasIdle && !cc.lastActive.IsZero() {
  3474  		ci.IdleTime = cc.t.timeSince(cc.lastActive)
  3475  	}
  3476  	cc.mu.Unlock()
  3477  
  3478  	trace.GotConn(ci)
  3479  }
  3480  
  3481  func traceWroteHeaders(trace *httptrace.ClientTrace) {
  3482  	if trace != nil && trace.WroteHeaders != nil {
  3483  		trace.WroteHeaders()
  3484  	}
  3485  }
  3486  
  3487  func traceGot100Continue(trace *httptrace.ClientTrace) {
  3488  	if trace != nil && trace.Got100Continue != nil {
  3489  		trace.Got100Continue()
  3490  	}
  3491  }
  3492  
  3493  func traceWait100Continue(trace *httptrace.ClientTrace) {
  3494  	if trace != nil && trace.Wait100Continue != nil {
  3495  		trace.Wait100Continue()
  3496  	}
  3497  }
  3498  
  3499  func traceWroteRequest(trace *httptrace.ClientTrace, err error) {
  3500  	if trace != nil && trace.WroteRequest != nil {
  3501  		trace.WroteRequest(httptrace.WroteRequestInfo{Err: err})
  3502  	}
  3503  }
  3504  
  3505  func traceFirstResponseByte(trace *httptrace.ClientTrace) {
  3506  	if trace != nil && trace.GotFirstResponseByte != nil {
  3507  		trace.GotFirstResponseByte()
  3508  	}
  3509  }
  3510  
  3511  func traceHasWroteHeaderField(trace *httptrace.ClientTrace) bool {
  3512  	return trace != nil && trace.WroteHeaderField != nil
  3513  }
  3514  
  3515  func traceWroteHeaderField(trace *httptrace.ClientTrace, k, v string) {
  3516  	if trace != nil && trace.WroteHeaderField != nil {
  3517  		trace.WroteHeaderField(k, []string{v})
  3518  	}
  3519  }
  3520  
  3521  func traceGot1xxResponseFunc(trace *httptrace.ClientTrace) func(int, textproto.MIMEHeader) error {
  3522  	if trace != nil {
  3523  		return trace.Got1xxResponse
  3524  	}
  3525  	return nil
  3526  }
  3527  
  3528  // dialTLSWithContext uses tls.Dialer, added in Go 1.15, to open a TLS
  3529  // connection.
  3530  func (t *Transport) dialTLSWithContext(ctx context.Context, network, addr string, cfg *tls.Config) (*tls.Conn, error) {
  3531  	dialer := &tls.Dialer{
  3532  		Config: cfg,
  3533  	}
  3534  	cn, err := dialer.DialContext(ctx, network, addr)
  3535  	if err != nil {
  3536  		return nil, err
  3537  	}
  3538  	tlsCn := cn.(*tls.Conn) // DialContext comment promises this will always succeed
  3539  	return tlsCn, nil
  3540  }
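
// Editor's note: a usage sketch, not part of the original file, of a Transport
// configured with DialTLSContext, the non-deprecated hook for which
// dialTLSWithContext above is the default. The config argument the transport
// supplies to the callback is derived from TLSClientConfig.
// newTransportSketch is a hypothetical helper for illustration.
func newTransportSketch(base *tls.Config) *Transport {
	return &Transport{
		TLSClientConfig: base,
		DialTLSContext: func(ctx context.Context, network, addr string, cfg *tls.Config) (net.Conn, error) {
			d := &tls.Dialer{Config: cfg}
			return d.DialContext(ctx, network, addr)
		},
	}
}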
  3541  
