...

Source file src/google.golang.org/grpc/internal/transport/http2_server.go

Documentation: google.golang.org/grpc/internal/transport

     1  /*
     2   *
     3   * Copyright 2014 gRPC authors.
     4   *
     5   * Licensed under the Apache License, Version 2.0 (the "License");
     6   * you may not use this file except in compliance with the License.
     7   * You may obtain a copy of the License at
     8   *
     9   *     http://www.apache.org/licenses/LICENSE-2.0
    10   *
    11   * Unless required by applicable law or agreed to in writing, software
    12   * distributed under the License is distributed on an "AS IS" BASIS,
    13   * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14   * See the License for the specific language governing permissions and
    15   * limitations under the License.
    16   *
    17   */
    18  
    19  package transport
    20  
    21  import (
    22  	"bytes"
    23  	"context"
    24  	"errors"
    25  	"fmt"
    26  	"io"
    27  	"math"
    28  	"net"
    29  	"net/http"
    30  	"strconv"
    31  	"sync"
    32  	"sync/atomic"
    33  	"time"
    34  
    35  	"golang.org/x/net/http2"
    36  	"golang.org/x/net/http2/hpack"
    37  	"google.golang.org/grpc/internal/grpclog"
    38  	"google.golang.org/grpc/internal/grpcutil"
    39  	"google.golang.org/grpc/internal/pretty"
    40  	"google.golang.org/grpc/internal/syscall"
    41  	"google.golang.org/protobuf/proto"
    42  
    43  	"google.golang.org/grpc/codes"
    44  	"google.golang.org/grpc/credentials"
    45  	"google.golang.org/grpc/internal/channelz"
    46  	"google.golang.org/grpc/internal/grpcrand"
    47  	"google.golang.org/grpc/internal/grpcsync"
    48  	"google.golang.org/grpc/keepalive"
    49  	"google.golang.org/grpc/metadata"
    50  	"google.golang.org/grpc/peer"
    51  	"google.golang.org/grpc/stats"
    52  	"google.golang.org/grpc/status"
    53  	"google.golang.org/grpc/tap"
    54  )
    55  
    56  var (
    57  	// ErrIllegalHeaderWrite indicates that setting header is illegal because of
    58  	// the stream's state.
    59  	ErrIllegalHeaderWrite = status.Error(codes.Internal, "transport: SendHeader called multiple times")
    60  	// ErrHeaderListSizeLimitViolation indicates that the header list size is larger
    61  	// than the limit set by peer.
    62  	ErrHeaderListSizeLimitViolation = status.Error(codes.Internal, "transport: trying to send header list size larger than the limit set by peer")
    63  )
    64  
    65  // serverConnectionCounter counts the number of connections a server has seen
    66  // (equal to the number of http2Servers created). Must be accessed atomically.
    67  var serverConnectionCounter uint64
    68  
    69  // http2Server implements the ServerTransport interface with HTTP2.
    70  type http2Server struct {
    71  	lastRead        int64 // Keep this field 64-bit aligned. Accessed atomically.
    72  	done            chan struct{}
    73  	conn            net.Conn
    74  	loopy           *loopyWriter
    75  	readerDone      chan struct{} // sync point to enable testing.
    76  	loopyWriterDone chan struct{}
    77  	peer            peer.Peer
    78  	inTapHandle     tap.ServerInHandle
    79  	framer          *framer
    80  	// The max number of concurrent streams.
    81  	maxStreams uint32
    82  	// controlBuf delivers all the control related tasks (e.g., window
    83  	// updates, reset streams, and various settings) to the controller.
    84  	controlBuf *controlBuffer
    85  	fc         *trInFlow
    86  	stats      []stats.Handler
    87  	// Keepalive and max-age parameters for the server.
    88  	kp keepalive.ServerParameters
    89  	// Keepalive enforcement policy.
    90  	kep keepalive.EnforcementPolicy
    91  	// The time instance last ping was received.
    92  	lastPingAt time.Time
    93  	// Number of times the client has violated keepalive ping policy so far.
    94  	pingStrikes uint8
    95  	// Flag to signify that number of ping strikes should be reset to 0.
    96  	// This is set whenever data or header frames are sent.
    97  	// 1 means yes.
    98  	resetPingStrikes      uint32 // Accessed atomically.
    99  	initialWindowSize     int32
   100  	bdpEst                *bdpEstimator
   101  	maxSendHeaderListSize *uint32
   102  
   103  	mu sync.Mutex // guard the following
   104  
   105  	// drainEvent is initialized when Drain() is called the first time. After
   106  	// which the server writes out the first GoAway(with ID 2^31-1) frame. Then
   107  	// an independent goroutine will be launched to later send the second
   108  	// GoAway. During this time we don't want to write another first GoAway(with
   109  	// ID 2^31 -1) frame. Thus call to Drain() will be a no-op if drainEvent is
   110  	// already initialized since draining is already underway.
   111  	drainEvent    *grpcsync.Event
   112  	state         transportState
   113  	activeStreams map[uint32]*Stream
   114  	// idle is the time instant when the connection went idle.
   115  	// This is either the beginning of the connection or when the number of
   116  	// RPCs go down to 0.
   117  	// When the connection is busy, this value is set to 0.
   118  	idle time.Time
   119  
   120  	// Fields below are for channelz metric collection.
   121  	channelz   *channelz.Socket
   122  	bufferPool *bufferPool
   123  
   124  	connectionID uint64
   125  
   126  	// maxStreamMu guards the maximum stream ID
   127  	// This lock may not be taken if mu is already held.
   128  	maxStreamMu sync.Mutex
   129  	maxStreamID uint32 // max stream ID ever seen
   130  
   131  	logger *grpclog.PrefixLogger
   132  }
   133  
   134  // NewServerTransport creates a http2 transport with conn and configuration
   135  // options from config.
   136  //
   137  // It returns a non-nil transport and a nil error on success. On failure, it
   138  // returns a nil transport and a non-nil error. For a special case where the
   139  // underlying conn gets closed before the client preface could be read, it
   140  // returns a nil transport and a nil error.
   141  func NewServerTransport(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
   142  	var authInfo credentials.AuthInfo
   143  	rawConn := conn
   144  	if config.Credentials != nil {
   145  		var err error
   146  		conn, authInfo, err = config.Credentials.ServerHandshake(rawConn)
   147  		if err != nil {
   148  			// ErrConnDispatched means that the connection was dispatched away
   149  			// from gRPC; those connections should be left open. io.EOF means
   150  			// the connection was closed before handshaking completed, which can
   151  			// happen naturally from probers. Return these errors directly.
   152  			if err == credentials.ErrConnDispatched || err == io.EOF {
   153  				return nil, err
   154  			}
   155  			return nil, connectionErrorf(false, err, "ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err)
   156  		}
   157  	}
   158  	writeBufSize := config.WriteBufferSize
   159  	readBufSize := config.ReadBufferSize
   160  	maxHeaderListSize := defaultServerMaxHeaderListSize
   161  	if config.MaxHeaderListSize != nil {
   162  		maxHeaderListSize = *config.MaxHeaderListSize
   163  	}
   164  	framer := newFramer(conn, writeBufSize, readBufSize, config.SharedWriteBuffer, maxHeaderListSize)
   165  	// Send initial settings as connection preface to client.
   166  	isettings := []http2.Setting{{
   167  		ID:  http2.SettingMaxFrameSize,
   168  		Val: http2MaxFrameLen,
   169  	}}
   170  	if config.MaxStreams != math.MaxUint32 {
   171  		isettings = append(isettings, http2.Setting{
   172  			ID:  http2.SettingMaxConcurrentStreams,
   173  			Val: config.MaxStreams,
   174  		})
   175  	}
   176  	dynamicWindow := true
   177  	iwz := int32(initialWindowSize)
   178  	if config.InitialWindowSize >= defaultWindowSize {
   179  		iwz = config.InitialWindowSize
   180  		dynamicWindow = false
   181  	}
   182  	icwz := int32(initialWindowSize)
   183  	if config.InitialConnWindowSize >= defaultWindowSize {
   184  		icwz = config.InitialConnWindowSize
   185  		dynamicWindow = false
   186  	}
   187  	if iwz != defaultWindowSize {
   188  		isettings = append(isettings, http2.Setting{
   189  			ID:  http2.SettingInitialWindowSize,
   190  			Val: uint32(iwz)})
   191  	}
   192  	if config.MaxHeaderListSize != nil {
   193  		isettings = append(isettings, http2.Setting{
   194  			ID:  http2.SettingMaxHeaderListSize,
   195  			Val: *config.MaxHeaderListSize,
   196  		})
   197  	}
   198  	if config.HeaderTableSize != nil {
   199  		isettings = append(isettings, http2.Setting{
   200  			ID:  http2.SettingHeaderTableSize,
   201  			Val: *config.HeaderTableSize,
   202  		})
   203  	}
   204  	if err := framer.fr.WriteSettings(isettings...); err != nil {
   205  		return nil, connectionErrorf(false, err, "transport: %v", err)
   206  	}
   207  	// Adjust the connection flow control window if needed.
   208  	if delta := uint32(icwz - defaultWindowSize); delta > 0 {
   209  		if err := framer.fr.WriteWindowUpdate(0, delta); err != nil {
   210  			return nil, connectionErrorf(false, err, "transport: %v", err)
   211  		}
   212  	}
   213  	kp := config.KeepaliveParams
   214  	if kp.MaxConnectionIdle == 0 {
   215  		kp.MaxConnectionIdle = defaultMaxConnectionIdle
   216  	}
   217  	if kp.MaxConnectionAge == 0 {
   218  		kp.MaxConnectionAge = defaultMaxConnectionAge
   219  	}
   220  	// Add a jitter to MaxConnectionAge.
   221  	kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
   222  	if kp.MaxConnectionAgeGrace == 0 {
   223  		kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
   224  	}
   225  	if kp.Time == 0 {
   226  		kp.Time = defaultServerKeepaliveTime
   227  	}
   228  	if kp.Timeout == 0 {
   229  		kp.Timeout = defaultServerKeepaliveTimeout
   230  	}
   231  	if kp.Time != infinity {
   232  		if err = syscall.SetTCPUserTimeout(rawConn, kp.Timeout); err != nil {
   233  			return nil, connectionErrorf(false, err, "transport: failed to set TCP_USER_TIMEOUT: %v", err)
   234  		}
   235  	}
   236  	kep := config.KeepalivePolicy
   237  	if kep.MinTime == 0 {
   238  		kep.MinTime = defaultKeepalivePolicyMinTime
   239  	}
   240  
   241  	done := make(chan struct{})
   242  	peer := peer.Peer{
   243  		Addr:      conn.RemoteAddr(),
   244  		LocalAddr: conn.LocalAddr(),
   245  		AuthInfo:  authInfo,
   246  	}
   247  	t := &http2Server{
   248  		done:              done,
   249  		conn:              conn,
   250  		peer:              peer,
   251  		framer:            framer,
   252  		readerDone:        make(chan struct{}),
   253  		loopyWriterDone:   make(chan struct{}),
   254  		maxStreams:        config.MaxStreams,
   255  		inTapHandle:       config.InTapHandle,
   256  		fc:                &trInFlow{limit: uint32(icwz)},
   257  		state:             reachable,
   258  		activeStreams:     make(map[uint32]*Stream),
   259  		stats:             config.StatsHandlers,
   260  		kp:                kp,
   261  		idle:              time.Now(),
   262  		kep:               kep,
   263  		initialWindowSize: iwz,
   264  		bufferPool:        newBufferPool(),
   265  	}
   266  	var czSecurity credentials.ChannelzSecurityValue
   267  	if au, ok := authInfo.(credentials.ChannelzSecurityInfo); ok {
   268  		czSecurity = au.GetSecurityValue()
   269  	}
   270  	t.channelz = channelz.RegisterSocket(
   271  		&channelz.Socket{
   272  			SocketType:       channelz.SocketTypeNormal,
   273  			Parent:           config.ChannelzParent,
   274  			SocketMetrics:    channelz.SocketMetrics{},
   275  			EphemeralMetrics: t.socketMetrics,
   276  			LocalAddr:        t.peer.LocalAddr,
   277  			RemoteAddr:       t.peer.Addr,
   278  			SocketOptions:    channelz.GetSocketOption(t.conn),
   279  			Security:         czSecurity,
   280  		},
   281  	)
   282  	t.logger = prefixLoggerForServerTransport(t)
   283  
   284  	t.controlBuf = newControlBuffer(t.done)
   285  	if dynamicWindow {
   286  		t.bdpEst = &bdpEstimator{
   287  			bdp:               initialWindowSize,
   288  			updateFlowControl: t.updateFlowControl,
   289  		}
   290  	}
   291  
   292  	t.connectionID = atomic.AddUint64(&serverConnectionCounter, 1)
   293  	t.framer.writer.Flush()
   294  
   295  	defer func() {
   296  		if err != nil {
   297  			t.Close(err)
   298  		}
   299  	}()
   300  
   301  	// Check the validity of client preface.
   302  	preface := make([]byte, len(clientPreface))
   303  	if _, err := io.ReadFull(t.conn, preface); err != nil {
   304  		// In deployments where a gRPC server runs behind a cloud load balancer
   305  		// which performs regular TCP level health checks, the connection is
   306  		// closed immediately by the latter.  Returning io.EOF here allows the
   307  		// grpc server implementation to recognize this scenario and suppress
   308  		// logging to reduce spam.
   309  		if err == io.EOF {
   310  			return nil, io.EOF
   311  		}
   312  		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to receive the preface from client: %v", err)
   313  	}
   314  	if !bytes.Equal(preface, clientPreface) {
   315  		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams received bogus greeting from client: %q", preface)
   316  	}
   317  
   318  	frame, err := t.framer.fr.ReadFrame()
   319  	if err == io.EOF || err == io.ErrUnexpectedEOF {
   320  		return nil, err
   321  	}
   322  	if err != nil {
   323  		return nil, connectionErrorf(false, err, "transport: http2Server.HandleStreams failed to read initial settings frame: %v", err)
   324  	}
   325  	atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
   326  	sf, ok := frame.(*http2.SettingsFrame)
   327  	if !ok {
   328  		return nil, connectionErrorf(false, nil, "transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
   329  	}
   330  	t.handleSettings(sf)
   331  
   332  	go func() {
   333  		t.loopy = newLoopyWriter(serverSide, t.framer, t.controlBuf, t.bdpEst, t.conn, t.logger, t.outgoingGoAwayHandler)
   334  		err := t.loopy.run()
   335  		close(t.loopyWriterDone)
   336  		if !isIOError(err) {
   337  			// Close the connection if a non-I/O error occurs (for I/O errors
   338  			// the reader will also encounter the error and close).  Wait 1
   339  			// second before closing the connection, or when the reader is done
   340  			// (i.e. the client already closed the connection or a connection
   341  			// error occurred).  This avoids the potential problem where there
   342  			// is unread data on the receive side of the connection, which, if
   343  			// closed, would lead to a TCP RST instead of FIN, and the client
   344  			// encountering errors.  For more info:
   345  			// https://github.com/grpc/grpc-go/issues/5358
   346  			timer := time.NewTimer(time.Second)
   347  			defer timer.Stop()
   348  			select {
   349  			case <-t.readerDone:
   350  			case <-timer.C:
   351  			}
   352  			t.conn.Close()
   353  		}
   354  	}()
   355  	go t.keepalive()
   356  	return t, nil
   357  }
   358  
   359  // operateHeaders takes action on the decoded headers. Returns an error if fatal
   360  // error encountered and transport needs to close, otherwise returns nil.
   361  func (t *http2Server) operateHeaders(ctx context.Context, frame *http2.MetaHeadersFrame, handle func(*Stream)) error {
   362  	// Acquire max stream ID lock for entire duration
   363  	t.maxStreamMu.Lock()
   364  	defer t.maxStreamMu.Unlock()
   365  
   366  	streamID := frame.Header().StreamID
   367  
   368  	// frame.Truncated is set to true when framer detects that the current header
   369  	// list size hits MaxHeaderListSize limit.
   370  	if frame.Truncated {
   371  		t.controlBuf.put(&cleanupStream{
   372  			streamID: streamID,
   373  			rst:      true,
   374  			rstCode:  http2.ErrCodeFrameSize,
   375  			onWrite:  func() {},
   376  		})
   377  		return nil
   378  	}
   379  
   380  	if streamID%2 != 1 || streamID <= t.maxStreamID {
   381  		// illegal gRPC stream id.
   382  		return fmt.Errorf("received an illegal stream id: %v. headers frame: %+v", streamID, frame)
   383  	}
   384  	t.maxStreamID = streamID
   385  
   386  	buf := newRecvBuffer()
   387  	s := &Stream{
   388  		id:               streamID,
   389  		st:               t,
   390  		buf:              buf,
   391  		fc:               &inFlow{limit: uint32(t.initialWindowSize)},
   392  		headerWireLength: int(frame.Header().Length),
   393  	}
   394  	var (
   395  		// if false, content-type was missing or invalid
   396  		isGRPC      = false
   397  		contentType = ""
   398  		mdata       = make(metadata.MD, len(frame.Fields))
   399  		httpMethod  string
   400  		// these are set if an error is encountered while parsing the headers
   401  		protocolError bool
   402  		headerError   *status.Status
   403  
   404  		timeoutSet bool
   405  		timeout    time.Duration
   406  	)
   407  
   408  	for _, hf := range frame.Fields {
   409  		switch hf.Name {
   410  		case "content-type":
   411  			contentSubtype, validContentType := grpcutil.ContentSubtype(hf.Value)
   412  			if !validContentType {
   413  				contentType = hf.Value
   414  				break
   415  			}
   416  			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
   417  			s.contentSubtype = contentSubtype
   418  			isGRPC = true
   419  
   420  		case "grpc-accept-encoding":
   421  			mdata[hf.Name] = append(mdata[hf.Name], hf.Value)
   422  			if hf.Value == "" {
   423  				continue
   424  			}
   425  			compressors := hf.Value
   426  			if s.clientAdvertisedCompressors != "" {
   427  				compressors = s.clientAdvertisedCompressors + "," + compressors
   428  			}
   429  			s.clientAdvertisedCompressors = compressors
   430  		case "grpc-encoding":
   431  			s.recvCompress = hf.Value
   432  		case ":method":
   433  			httpMethod = hf.Value
   434  		case ":path":
   435  			s.method = hf.Value
   436  		case "grpc-timeout":
   437  			timeoutSet = true
   438  			var err error
   439  			if timeout, err = decodeTimeout(hf.Value); err != nil {
   440  				headerError = status.Newf(codes.Internal, "malformed grpc-timeout: %v", err)
   441  			}
   442  		// "Transports must consider requests containing the Connection header
   443  		// as malformed." - A41
   444  		case "connection":
   445  			if t.logger.V(logLevel) {
   446  				t.logger.Infof("Received a HEADERS frame with a :connection header which makes the request malformed, as per the HTTP/2 spec")
   447  			}
   448  			protocolError = true
   449  		default:
   450  			if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {
   451  				break
   452  			}
   453  			v, err := decodeMetadataHeader(hf.Name, hf.Value)
   454  			if err != nil {
   455  				headerError = status.Newf(codes.Internal, "malformed binary metadata %q in header %q: %v", hf.Value, hf.Name, err)
   456  				t.logger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)
   457  				break
   458  			}
   459  			mdata[hf.Name] = append(mdata[hf.Name], v)
   460  		}
   461  	}
   462  
   463  	// "If multiple Host headers or multiple :authority headers are present, the
   464  	// request must be rejected with an HTTP status code 400 as required by Host
   465  	// validation in RFC 7230 ยง5.4, gRPC status code INTERNAL, or RST_STREAM
   466  	// with HTTP/2 error code PROTOCOL_ERROR." - A41. Since this is a HTTP/2
   467  	// error, this takes precedence over a client not speaking gRPC.
   468  	if len(mdata[":authority"]) > 1 || len(mdata["host"]) > 1 {
   469  		errMsg := fmt.Sprintf("num values of :authority: %v, num values of host: %v, both must only have 1 value as per HTTP/2 spec", len(mdata[":authority"]), len(mdata["host"]))
   470  		if t.logger.V(logLevel) {
   471  			t.logger.Infof("Aborting the stream early: %v", errMsg)
   472  		}
   473  		t.controlBuf.put(&earlyAbortStream{
   474  			httpStatus:     http.StatusBadRequest,
   475  			streamID:       streamID,
   476  			contentSubtype: s.contentSubtype,
   477  			status:         status.New(codes.Internal, errMsg),
   478  			rst:            !frame.StreamEnded(),
   479  		})
   480  		return nil
   481  	}
   482  
   483  	if protocolError {
   484  		t.controlBuf.put(&cleanupStream{
   485  			streamID: streamID,
   486  			rst:      true,
   487  			rstCode:  http2.ErrCodeProtocol,
   488  			onWrite:  func() {},
   489  		})
   490  		return nil
   491  	}
   492  	if !isGRPC {
   493  		t.controlBuf.put(&earlyAbortStream{
   494  			httpStatus:     http.StatusUnsupportedMediaType,
   495  			streamID:       streamID,
   496  			contentSubtype: s.contentSubtype,
   497  			status:         status.Newf(codes.InvalidArgument, "invalid gRPC request content-type %q", contentType),
   498  			rst:            !frame.StreamEnded(),
   499  		})
   500  		return nil
   501  	}
   502  	if headerError != nil {
   503  		t.controlBuf.put(&earlyAbortStream{
   504  			httpStatus:     http.StatusBadRequest,
   505  			streamID:       streamID,
   506  			contentSubtype: s.contentSubtype,
   507  			status:         headerError,
   508  			rst:            !frame.StreamEnded(),
   509  		})
   510  		return nil
   511  	}
   512  
   513  	// "If :authority is missing, Host must be renamed to :authority." - A41
   514  	if len(mdata[":authority"]) == 0 {
   515  		// No-op if host isn't present, no eventual :authority header is a valid
   516  		// RPC.
   517  		if host, ok := mdata["host"]; ok {
   518  			mdata[":authority"] = host
   519  			delete(mdata, "host")
   520  		}
   521  	} else {
   522  		// "If :authority is present, Host must be discarded" - A41
   523  		delete(mdata, "host")
   524  	}
   525  
   526  	if frame.StreamEnded() {
   527  		// s is just created by the caller. No lock needed.
   528  		s.state = streamReadDone
   529  	}
   530  	if timeoutSet {
   531  		s.ctx, s.cancel = context.WithTimeout(ctx, timeout)
   532  	} else {
   533  		s.ctx, s.cancel = context.WithCancel(ctx)
   534  	}
   535  
   536  	// Attach the received metadata to the context.
   537  	if len(mdata) > 0 {
   538  		s.ctx = metadata.NewIncomingContext(s.ctx, mdata)
   539  		if statsTags := mdata["grpc-tags-bin"]; len(statsTags) > 0 {
   540  			s.ctx = stats.SetIncomingTags(s.ctx, []byte(statsTags[len(statsTags)-1]))
   541  		}
   542  		if statsTrace := mdata["grpc-trace-bin"]; len(statsTrace) > 0 {
   543  			s.ctx = stats.SetIncomingTrace(s.ctx, []byte(statsTrace[len(statsTrace)-1]))
   544  		}
   545  	}
   546  	t.mu.Lock()
   547  	if t.state != reachable {
   548  		t.mu.Unlock()
   549  		s.cancel()
   550  		return nil
   551  	}
   552  	if uint32(len(t.activeStreams)) >= t.maxStreams {
   553  		t.mu.Unlock()
   554  		t.controlBuf.put(&cleanupStream{
   555  			streamID: streamID,
   556  			rst:      true,
   557  			rstCode:  http2.ErrCodeRefusedStream,
   558  			onWrite:  func() {},
   559  		})
   560  		s.cancel()
   561  		return nil
   562  	}
   563  	if httpMethod != http.MethodPost {
   564  		t.mu.Unlock()
   565  		errMsg := fmt.Sprintf("Received a HEADERS frame with :method %q which should be POST", httpMethod)
   566  		if t.logger.V(logLevel) {
   567  			t.logger.Infof("Aborting the stream early: %v", errMsg)
   568  		}
   569  		t.controlBuf.put(&earlyAbortStream{
   570  			httpStatus:     405,
   571  			streamID:       streamID,
   572  			contentSubtype: s.contentSubtype,
   573  			status:         status.New(codes.Internal, errMsg),
   574  			rst:            !frame.StreamEnded(),
   575  		})
   576  		s.cancel()
   577  		return nil
   578  	}
   579  	if t.inTapHandle != nil {
   580  		var err error
   581  		if s.ctx, err = t.inTapHandle(s.ctx, &tap.Info{FullMethodName: s.method, Header: mdata}); err != nil {
   582  			t.mu.Unlock()
   583  			if t.logger.V(logLevel) {
   584  				t.logger.Infof("Aborting the stream early due to InTapHandle failure: %v", err)
   585  			}
   586  			stat, ok := status.FromError(err)
   587  			if !ok {
   588  				stat = status.New(codes.PermissionDenied, err.Error())
   589  			}
   590  			t.controlBuf.put(&earlyAbortStream{
   591  				httpStatus:     200,
   592  				streamID:       s.id,
   593  				contentSubtype: s.contentSubtype,
   594  				status:         stat,
   595  				rst:            !frame.StreamEnded(),
   596  			})
   597  			return nil
   598  		}
   599  	}
   600  	t.activeStreams[streamID] = s
   601  	if len(t.activeStreams) == 1 {
   602  		t.idle = time.Time{}
   603  	}
   604  	t.mu.Unlock()
   605  	if channelz.IsOn() {
   606  		t.channelz.SocketMetrics.StreamsStarted.Add(1)
   607  		t.channelz.SocketMetrics.LastRemoteStreamCreatedTimestamp.Store(time.Now().UnixNano())
   608  	}
   609  	s.requestRead = func(n int) {
   610  		t.adjustWindow(s, uint32(n))
   611  	}
   612  	s.ctxDone = s.ctx.Done()
   613  	s.wq = newWriteQuota(defaultWriteQuota, s.ctxDone)
   614  	s.trReader = &transportReader{
   615  		reader: &recvBufferReader{
   616  			ctx:        s.ctx,
   617  			ctxDone:    s.ctxDone,
   618  			recv:       s.buf,
   619  			freeBuffer: t.bufferPool.put,
   620  		},
   621  		windowHandler: func(n int) {
   622  			t.updateWindow(s, uint32(n))
   623  		},
   624  	}
   625  	// Register the stream with loopy.
   626  	t.controlBuf.put(&registerStream{
   627  		streamID: s.id,
   628  		wq:       s.wq,
   629  	})
   630  	handle(s)
   631  	return nil
   632  }
   633  
   634  // HandleStreams receives incoming streams using the given handler. This is
   635  // typically run in a separate goroutine.
   636  // traceCtx attaches trace to ctx and returns the new context.
   637  func (t *http2Server) HandleStreams(ctx context.Context, handle func(*Stream)) {
   638  	defer func() {
   639  		close(t.readerDone)
   640  		<-t.loopyWriterDone
   641  	}()
   642  	for {
   643  		t.controlBuf.throttle()
   644  		frame, err := t.framer.fr.ReadFrame()
   645  		atomic.StoreInt64(&t.lastRead, time.Now().UnixNano())
   646  		if err != nil {
   647  			if se, ok := err.(http2.StreamError); ok {
   648  				if t.logger.V(logLevel) {
   649  					t.logger.Warningf("Encountered http2.StreamError: %v", se)
   650  				}
   651  				t.mu.Lock()
   652  				s := t.activeStreams[se.StreamID]
   653  				t.mu.Unlock()
   654  				if s != nil {
   655  					t.closeStream(s, true, se.Code, false)
   656  				} else {
   657  					t.controlBuf.put(&cleanupStream{
   658  						streamID: se.StreamID,
   659  						rst:      true,
   660  						rstCode:  se.Code,
   661  						onWrite:  func() {},
   662  					})
   663  				}
   664  				continue
   665  			}
   666  			t.Close(err)
   667  			return
   668  		}
   669  		switch frame := frame.(type) {
   670  		case *http2.MetaHeadersFrame:
   671  			if err := t.operateHeaders(ctx, frame, handle); err != nil {
   672  				// Any error processing client headers, e.g. invalid stream ID,
   673  				// is considered a protocol violation.
   674  				t.controlBuf.put(&goAway{
   675  					code:      http2.ErrCodeProtocol,
   676  					debugData: []byte(err.Error()),
   677  					closeConn: err,
   678  				})
   679  				continue
   680  			}
   681  		case *http2.DataFrame:
   682  			t.handleData(frame)
   683  		case *http2.RSTStreamFrame:
   684  			t.handleRSTStream(frame)
   685  		case *http2.SettingsFrame:
   686  			t.handleSettings(frame)
   687  		case *http2.PingFrame:
   688  			t.handlePing(frame)
   689  		case *http2.WindowUpdateFrame:
   690  			t.handleWindowUpdate(frame)
   691  		case *http2.GoAwayFrame:
   692  			// TODO: Handle GoAway from the client appropriately.
   693  		default:
   694  			if t.logger.V(logLevel) {
   695  				t.logger.Infof("Received unsupported frame type %T", frame)
   696  			}
   697  		}
   698  	}
   699  }
   700  
   701  func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
   702  	t.mu.Lock()
   703  	defer t.mu.Unlock()
   704  	if t.activeStreams == nil {
   705  		// The transport is closing.
   706  		return nil, false
   707  	}
   708  	s, ok := t.activeStreams[f.Header().StreamID]
   709  	if !ok {
   710  		// The stream is already done.
   711  		return nil, false
   712  	}
   713  	return s, true
   714  }
   715  
   716  // adjustWindow sends out extra window update over the initial window size
   717  // of stream if the application is requesting data larger in size than
   718  // the window.
   719  func (t *http2Server) adjustWindow(s *Stream, n uint32) {
   720  	if w := s.fc.maybeAdjust(n); w > 0 {
   721  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id, increment: w})
   722  	}
   723  
   724  }
   725  
   726  // updateWindow adjusts the inbound quota for the stream and the transport.
   727  // Window updates will deliver to the controller for sending when
   728  // the cumulative quota exceeds the corresponding threshold.
   729  func (t *http2Server) updateWindow(s *Stream, n uint32) {
   730  	if w := s.fc.onRead(n); w > 0 {
   731  		t.controlBuf.put(&outgoingWindowUpdate{streamID: s.id,
   732  			increment: w,
   733  		})
   734  	}
   735  }
   736  
   737  // updateFlowControl updates the incoming flow control windows
   738  // for the transport and the stream based on the current bdp
   739  // estimation.
   740  func (t *http2Server) updateFlowControl(n uint32) {
   741  	t.mu.Lock()
   742  	for _, s := range t.activeStreams {
   743  		s.fc.newLimit(n)
   744  	}
   745  	t.initialWindowSize = int32(n)
   746  	t.mu.Unlock()
   747  	t.controlBuf.put(&outgoingWindowUpdate{
   748  		streamID:  0,
   749  		increment: t.fc.newLimit(n),
   750  	})
   751  	t.controlBuf.put(&outgoingSettings{
   752  		ss: []http2.Setting{
   753  			{
   754  				ID:  http2.SettingInitialWindowSize,
   755  				Val: n,
   756  			},
   757  		},
   758  	})
   759  
   760  }
   761  
   762  func (t *http2Server) handleData(f *http2.DataFrame) {
   763  	size := f.Header().Length
   764  	var sendBDPPing bool
   765  	if t.bdpEst != nil {
   766  		sendBDPPing = t.bdpEst.add(size)
   767  	}
   768  	// Decouple connection's flow control from application's read.
   769  	// An update on connection's flow control should not depend on
   770  	// whether user application has read the data or not. Such a
   771  	// restriction is already imposed on the stream's flow control,
   772  	// and therefore the sender will be blocked anyways.
   773  	// Decoupling the connection flow control will prevent other
   774  	// active(fast) streams from starving in presence of slow or
   775  	// inactive streams.
   776  	if w := t.fc.onData(size); w > 0 {
   777  		t.controlBuf.put(&outgoingWindowUpdate{
   778  			streamID:  0,
   779  			increment: w,
   780  		})
   781  	}
   782  	if sendBDPPing {
   783  		// Avoid excessive ping detection (e.g. in an L7 proxy)
   784  		// by sending a window update prior to the BDP ping.
   785  		if w := t.fc.reset(); w > 0 {
   786  			t.controlBuf.put(&outgoingWindowUpdate{
   787  				streamID:  0,
   788  				increment: w,
   789  			})
   790  		}
   791  		t.controlBuf.put(bdpPing)
   792  	}
   793  	// Select the right stream to dispatch.
   794  	s, ok := t.getStream(f)
   795  	if !ok {
   796  		return
   797  	}
   798  	if s.getState() == streamReadDone {
   799  		t.closeStream(s, true, http2.ErrCodeStreamClosed, false)
   800  		return
   801  	}
   802  	if size > 0 {
   803  		if err := s.fc.onData(size); err != nil {
   804  			t.closeStream(s, true, http2.ErrCodeFlowControl, false)
   805  			return
   806  		}
   807  		if f.Header().Flags.Has(http2.FlagDataPadded) {
   808  			if w := s.fc.onRead(size - uint32(len(f.Data()))); w > 0 {
   809  				t.controlBuf.put(&outgoingWindowUpdate{s.id, w})
   810  			}
   811  		}
   812  		// TODO(bradfitz, zhaoq): A copy is required here because there is no
   813  		// guarantee f.Data() is consumed before the arrival of next frame.
   814  		// Can this copy be eliminated?
   815  		if len(f.Data()) > 0 {
   816  			buffer := t.bufferPool.get()
   817  			buffer.Reset()
   818  			buffer.Write(f.Data())
   819  			s.write(recvMsg{buffer: buffer})
   820  		}
   821  	}
   822  	if f.StreamEnded() {
   823  		// Received the end of stream from the client.
   824  		s.compareAndSwapState(streamActive, streamReadDone)
   825  		s.write(recvMsg{err: io.EOF})
   826  	}
   827  }
   828  
   829  func (t *http2Server) handleRSTStream(f *http2.RSTStreamFrame) {
   830  	// If the stream is not deleted from the transport's active streams map, then do a regular close stream.
   831  	if s, ok := t.getStream(f); ok {
   832  		t.closeStream(s, false, 0, false)
   833  		return
   834  	}
   835  	// If the stream is already deleted from the active streams map, then put a cleanupStream item into controlbuf to delete the stream from loopy writer's established streams map.
   836  	t.controlBuf.put(&cleanupStream{
   837  		streamID: f.Header().StreamID,
   838  		rst:      false,
   839  		rstCode:  0,
   840  		onWrite:  func() {},
   841  	})
   842  }
   843  
   844  func (t *http2Server) handleSettings(f *http2.SettingsFrame) {
   845  	if f.IsAck() {
   846  		return
   847  	}
   848  	var ss []http2.Setting
   849  	var updateFuncs []func()
   850  	f.ForeachSetting(func(s http2.Setting) error {
   851  		switch s.ID {
   852  		case http2.SettingMaxHeaderListSize:
   853  			updateFuncs = append(updateFuncs, func() {
   854  				t.maxSendHeaderListSize = new(uint32)
   855  				*t.maxSendHeaderListSize = s.Val
   856  			})
   857  		default:
   858  			ss = append(ss, s)
   859  		}
   860  		return nil
   861  	})
   862  	t.controlBuf.executeAndPut(func() bool {
   863  		for _, f := range updateFuncs {
   864  			f()
   865  		}
   866  		return true
   867  	}, &incomingSettings{
   868  		ss: ss,
   869  	})
   870  }
   871  
   872  const (
   873  	maxPingStrikes     = 2
   874  	defaultPingTimeout = 2 * time.Hour
   875  )
   876  
   877  func (t *http2Server) handlePing(f *http2.PingFrame) {
   878  	if f.IsAck() {
   879  		if f.Data == goAwayPing.data && t.drainEvent != nil {
   880  			t.drainEvent.Fire()
   881  			return
   882  		}
   883  		// Maybe it's a BDP ping.
   884  		if t.bdpEst != nil {
   885  			t.bdpEst.calculate(f.Data)
   886  		}
   887  		return
   888  	}
   889  	pingAck := &ping{ack: true}
   890  	copy(pingAck.data[:], f.Data[:])
   891  	t.controlBuf.put(pingAck)
   892  
   893  	now := time.Now()
   894  	defer func() {
   895  		t.lastPingAt = now
   896  	}()
   897  	// A reset ping strikes means that we don't need to check for policy
   898  	// violation for this ping and the pingStrikes counter should be set
   899  	// to 0.
   900  	if atomic.CompareAndSwapUint32(&t.resetPingStrikes, 1, 0) {
   901  		t.pingStrikes = 0
   902  		return
   903  	}
   904  	t.mu.Lock()
   905  	ns := len(t.activeStreams)
   906  	t.mu.Unlock()
   907  	if ns < 1 && !t.kep.PermitWithoutStream {
   908  		// Keepalive shouldn't be active thus, this new ping should
   909  		// have come after at least defaultPingTimeout.
   910  		if t.lastPingAt.Add(defaultPingTimeout).After(now) {
   911  			t.pingStrikes++
   912  		}
   913  	} else {
   914  		// Check if keepalive policy is respected.
   915  		if t.lastPingAt.Add(t.kep.MinTime).After(now) {
   916  			t.pingStrikes++
   917  		}
   918  	}
   919  
   920  	if t.pingStrikes > maxPingStrikes {
   921  		// Send goaway and close the connection.
   922  		t.controlBuf.put(&goAway{code: http2.ErrCodeEnhanceYourCalm, debugData: []byte("too_many_pings"), closeConn: errors.New("got too many pings from the client")})
   923  	}
   924  }
   925  
   926  func (t *http2Server) handleWindowUpdate(f *http2.WindowUpdateFrame) {
   927  	t.controlBuf.put(&incomingWindowUpdate{
   928  		streamID:  f.Header().StreamID,
   929  		increment: f.Increment,
   930  	})
   931  }
   932  
   933  func appendHeaderFieldsFromMD(headerFields []hpack.HeaderField, md metadata.MD) []hpack.HeaderField {
   934  	for k, vv := range md {
   935  		if isReservedHeader(k) {
   936  			// Clients don't tolerate reading restricted headers after some non restricted ones were sent.
   937  			continue
   938  		}
   939  		for _, v := range vv {
   940  			headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})
   941  		}
   942  	}
   943  	return headerFields
   944  }
   945  
   946  func (t *http2Server) checkForHeaderListSize(it any) bool {
   947  	if t.maxSendHeaderListSize == nil {
   948  		return true
   949  	}
   950  	hdrFrame := it.(*headerFrame)
   951  	var sz int64
   952  	for _, f := range hdrFrame.hf {
   953  		if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
   954  			if t.logger.V(logLevel) {
   955  				t.logger.Infof("Header list size to send violates the maximum size (%d bytes) set by client", *t.maxSendHeaderListSize)
   956  			}
   957  			return false
   958  		}
   959  	}
   960  	return true
   961  }
   962  
   963  func (t *http2Server) streamContextErr(s *Stream) error {
   964  	select {
   965  	case <-t.done:
   966  		return ErrConnClosing
   967  	default:
   968  	}
   969  	return ContextErr(s.ctx.Err())
   970  }
   971  
   972  // WriteHeader sends the header metadata md back to the client.
   973  func (t *http2Server) WriteHeader(s *Stream, md metadata.MD) error {
   974  	s.hdrMu.Lock()
   975  	defer s.hdrMu.Unlock()
   976  	if s.getState() == streamDone {
   977  		return t.streamContextErr(s)
   978  	}
   979  
   980  	if s.updateHeaderSent() {
   981  		return ErrIllegalHeaderWrite
   982  	}
   983  
   984  	if md.Len() > 0 {
   985  		if s.header.Len() > 0 {
   986  			s.header = metadata.Join(s.header, md)
   987  		} else {
   988  			s.header = md
   989  		}
   990  	}
   991  	if err := t.writeHeaderLocked(s); err != nil {
   992  		switch e := err.(type) {
   993  		case ConnectionError:
   994  			return status.Error(codes.Unavailable, e.Desc)
   995  		default:
   996  			return status.Convert(err).Err()
   997  		}
   998  	}
   999  	return nil
  1000  }
  1001  
  1002  func (t *http2Server) setResetPingStrikes() {
  1003  	atomic.StoreUint32(&t.resetPingStrikes, 1)
  1004  }
  1005  
  1006  func (t *http2Server) writeHeaderLocked(s *Stream) error {
  1007  	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  1008  	// first and create a slice of that exact size.
  1009  	headerFields := make([]hpack.HeaderField, 0, 2) // at least :status, content-type will be there if none else.
  1010  	headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  1011  	headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
  1012  	if s.sendCompress != "" {
  1013  		headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-encoding", Value: s.sendCompress})
  1014  	}
  1015  	headerFields = appendHeaderFieldsFromMD(headerFields, s.header)
  1016  	hf := &headerFrame{
  1017  		streamID:  s.id,
  1018  		hf:        headerFields,
  1019  		endStream: false,
  1020  		onWrite:   t.setResetPingStrikes,
  1021  	}
  1022  	success, err := t.controlBuf.executeAndPut(func() bool { return t.checkForHeaderListSize(hf) }, hf)
  1023  	if !success {
  1024  		if err != nil {
  1025  			return err
  1026  		}
  1027  		t.closeStream(s, true, http2.ErrCodeInternal, false)
  1028  		return ErrHeaderListSizeLimitViolation
  1029  	}
  1030  	for _, sh := range t.stats {
  1031  		// Note: Headers are compressed with hpack after this call returns.
  1032  		// No WireLength field is set here.
  1033  		outHeader := &stats.OutHeader{
  1034  			Header:      s.header.Copy(),
  1035  			Compression: s.sendCompress,
  1036  		}
  1037  		sh.HandleRPC(s.Context(), outHeader)
  1038  	}
  1039  	return nil
  1040  }
  1041  
  1042  // WriteStatus sends stream status to the client and terminates the stream.
  1043  // There is no further I/O operations being able to perform on this stream.
  1044  // TODO(zhaoq): Now it indicates the end of entire stream. Revisit if early
  1045  // OK is adopted.
  1046  func (t *http2Server) WriteStatus(s *Stream, st *status.Status) error {
  1047  	s.hdrMu.Lock()
  1048  	defer s.hdrMu.Unlock()
  1049  
  1050  	if s.getState() == streamDone {
  1051  		return nil
  1052  	}
  1053  
  1054  	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
  1055  	// first and create a slice of that exact size.
  1056  	headerFields := make([]hpack.HeaderField, 0, 2) // grpc-status and grpc-message will be there if none else.
  1057  	if !s.updateHeaderSent() {                      // No headers have been sent.
  1058  		if len(s.header) > 0 { // Send a separate header frame.
  1059  			if err := t.writeHeaderLocked(s); err != nil {
  1060  				return err
  1061  			}
  1062  		} else { // Send a trailer only response.
  1063  			headerFields = append(headerFields, hpack.HeaderField{Name: ":status", Value: "200"})
  1064  			headerFields = append(headerFields, hpack.HeaderField{Name: "content-type", Value: grpcutil.ContentType(s.contentSubtype)})
  1065  		}
  1066  	}
  1067  	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-status", Value: strconv.Itoa(int(st.Code()))})
  1068  	headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-message", Value: encodeGrpcMessage(st.Message())})
  1069  
  1070  	if p := st.Proto(); p != nil && len(p.Details) > 0 {
  1071  		// Do not use the user's grpc-status-details-bin (if present) if we are
  1072  		// even attempting to set our own.
  1073  		delete(s.trailer, grpcStatusDetailsBinHeader)
  1074  		stBytes, err := proto.Marshal(p)
  1075  		if err != nil {
  1076  			// TODO: return error instead, when callers are able to handle it.
  1077  			t.logger.Errorf("Failed to marshal rpc status: %s, error: %v", pretty.ToJSON(p), err)
  1078  		} else {
  1079  			headerFields = append(headerFields, hpack.HeaderField{Name: grpcStatusDetailsBinHeader, Value: encodeBinHeader(stBytes)})
  1080  		}
  1081  	}
  1082  
  1083  	// Attach the trailer metadata.
  1084  	headerFields = appendHeaderFieldsFromMD(headerFields, s.trailer)
  1085  	trailingHeader := &headerFrame{
  1086  		streamID:  s.id,
  1087  		hf:        headerFields,
  1088  		endStream: true,
  1089  		onWrite:   t.setResetPingStrikes,
  1090  	}
  1091  
  1092  	success, err := t.controlBuf.execute(t.checkForHeaderListSize, trailingHeader)
  1093  	if !success {
  1094  		if err != nil {
  1095  			return err
  1096  		}
  1097  		t.closeStream(s, true, http2.ErrCodeInternal, false)
  1098  		return ErrHeaderListSizeLimitViolation
  1099  	}
  1100  	// Send a RST_STREAM after the trailers if the client has not already half-closed.
  1101  	rst := s.getState() == streamActive
  1102  	t.finishStream(s, rst, http2.ErrCodeNo, trailingHeader, true)
  1103  	for _, sh := range t.stats {
  1104  		// Note: The trailer fields are compressed with hpack after this call returns.
  1105  		// No WireLength field is set here.
  1106  		sh.HandleRPC(s.Context(), &stats.OutTrailer{
  1107  			Trailer: s.trailer.Copy(),
  1108  		})
  1109  	}
  1110  	return nil
  1111  }
  1112  
  1113  // Write converts the data into HTTP2 data frame and sends it out. Non-nil error
  1114  // is returns if it fails (e.g., framing error, transport error).
  1115  func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) error {
  1116  	if !s.isHeaderSent() { // Headers haven't been written yet.
  1117  		if err := t.WriteHeader(s, nil); err != nil {
  1118  			return err
  1119  		}
  1120  	} else {
  1121  		// Writing headers checks for this condition.
  1122  		if s.getState() == streamDone {
  1123  			return t.streamContextErr(s)
  1124  		}
  1125  	}
  1126  	df := &dataFrame{
  1127  		streamID:    s.id,
  1128  		h:           hdr,
  1129  		d:           data,
  1130  		onEachWrite: t.setResetPingStrikes,
  1131  	}
  1132  	if err := s.wq.get(int32(len(hdr) + len(data))); err != nil {
  1133  		return t.streamContextErr(s)
  1134  	}
  1135  	return t.controlBuf.put(df)
  1136  }
  1137  
  1138  // keepalive running in a separate goroutine does the following:
  1139  // 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
  1140  // 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
  1141  // 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
  1142  // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-responsive connection
  1143  // after an additional duration of keepalive.Timeout.
  1144  func (t *http2Server) keepalive() {
  1145  	p := &ping{}
  1146  	// True iff a ping has been sent, and no data has been received since then.
  1147  	outstandingPing := false
  1148  	// Amount of time remaining before which we should receive an ACK for the
  1149  	// last sent ping.
  1150  	kpTimeoutLeft := time.Duration(0)
  1151  	// Records the last value of t.lastRead before we go block on the timer.
  1152  	// This is required to check for read activity since then.
  1153  	prevNano := time.Now().UnixNano()
  1154  	// Initialize the different timers to their default values.
  1155  	idleTimer := time.NewTimer(t.kp.MaxConnectionIdle)
  1156  	ageTimer := time.NewTimer(t.kp.MaxConnectionAge)
  1157  	kpTimer := time.NewTimer(t.kp.Time)
  1158  	defer func() {
  1159  		// We need to drain the underlying channel in these timers after a call
  1160  		// to Stop(), only if we are interested in resetting them. Clearly we
  1161  		// are not interested in resetting them here.
  1162  		idleTimer.Stop()
  1163  		ageTimer.Stop()
  1164  		kpTimer.Stop()
  1165  	}()
  1166  
  1167  	for {
  1168  		select {
  1169  		case <-idleTimer.C:
  1170  			t.mu.Lock()
  1171  			idle := t.idle
  1172  			if idle.IsZero() { // The connection is non-idle.
  1173  				t.mu.Unlock()
  1174  				idleTimer.Reset(t.kp.MaxConnectionIdle)
  1175  				continue
  1176  			}
  1177  			val := t.kp.MaxConnectionIdle - time.Since(idle)
  1178  			t.mu.Unlock()
  1179  			if val <= 0 {
  1180  				// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
  1181  				// Gracefully close the connection.
  1182  				t.Drain("max_idle")
  1183  				return
  1184  			}
  1185  			idleTimer.Reset(val)
  1186  		case <-ageTimer.C:
  1187  			t.Drain("max_age")
  1188  			ageTimer.Reset(t.kp.MaxConnectionAgeGrace)
  1189  			select {
  1190  			case <-ageTimer.C:
  1191  				// Close the connection after grace period.
  1192  				if t.logger.V(logLevel) {
  1193  					t.logger.Infof("Closing server transport due to maximum connection age")
  1194  				}
  1195  				t.controlBuf.put(closeConnection{})
  1196  			case <-t.done:
  1197  			}
  1198  			return
  1199  		case <-kpTimer.C:
  1200  			lastRead := atomic.LoadInt64(&t.lastRead)
  1201  			if lastRead > prevNano {
  1202  				// There has been read activity since the last time we were
  1203  				// here. Setup the timer to fire at kp.Time seconds from
  1204  				// lastRead time and continue.
  1205  				outstandingPing = false
  1206  				kpTimer.Reset(time.Duration(lastRead) + t.kp.Time - time.Duration(time.Now().UnixNano()))
  1207  				prevNano = lastRead
  1208  				continue
  1209  			}
  1210  			if outstandingPing && kpTimeoutLeft <= 0 {
  1211  				t.Close(fmt.Errorf("keepalive ping not acked within timeout %s", t.kp.Timeout))
  1212  				return
  1213  			}
  1214  			if !outstandingPing {
  1215  				if channelz.IsOn() {
  1216  					t.channelz.SocketMetrics.KeepAlivesSent.Add(1)
  1217  				}
  1218  				t.controlBuf.put(p)
  1219  				kpTimeoutLeft = t.kp.Timeout
  1220  				outstandingPing = true
  1221  			}
  1222  			// The amount of time to sleep here is the minimum of kp.Time and
  1223  			// timeoutLeft. This will ensure that we wait only for kp.Time
  1224  			// before sending out the next ping (for cases where the ping is
  1225  			// acked).
  1226  			sleepDuration := minTime(t.kp.Time, kpTimeoutLeft)
  1227  			kpTimeoutLeft -= sleepDuration
  1228  			kpTimer.Reset(sleepDuration)
  1229  		case <-t.done:
  1230  			return
  1231  		}
  1232  	}
  1233  }
  1234  
  1235  // Close starts shutting down the http2Server transport.
  1236  // TODO(zhaoq): Now the destruction is not blocked on any pending streams. This
  1237  // could cause some resource issue. Revisit this later.
  1238  func (t *http2Server) Close(err error) {
  1239  	t.mu.Lock()
  1240  	if t.state == closing {
  1241  		t.mu.Unlock()
  1242  		return
  1243  	}
  1244  	if t.logger.V(logLevel) {
  1245  		t.logger.Infof("Closing: %v", err)
  1246  	}
  1247  	t.state = closing
  1248  	streams := t.activeStreams
  1249  	t.activeStreams = nil
  1250  	t.mu.Unlock()
  1251  	t.controlBuf.finish()
  1252  	close(t.done)
  1253  	if err := t.conn.Close(); err != nil && t.logger.V(logLevel) {
  1254  		t.logger.Infof("Error closing underlying net.Conn during Close: %v", err)
  1255  	}
  1256  	channelz.RemoveEntry(t.channelz.ID)
  1257  	// Cancel all active streams.
  1258  	for _, s := range streams {
  1259  		s.cancel()
  1260  	}
  1261  }
  1262  
  1263  // deleteStream deletes the stream s from transport's active streams.
  1264  func (t *http2Server) deleteStream(s *Stream, eosReceived bool) {
  1265  
  1266  	t.mu.Lock()
  1267  	if _, ok := t.activeStreams[s.id]; ok {
  1268  		delete(t.activeStreams, s.id)
  1269  		if len(t.activeStreams) == 0 {
  1270  			t.idle = time.Now()
  1271  		}
  1272  	}
  1273  	t.mu.Unlock()
  1274  
  1275  	if channelz.IsOn() {
  1276  		if eosReceived {
  1277  			t.channelz.SocketMetrics.StreamsSucceeded.Add(1)
  1278  		} else {
  1279  			t.channelz.SocketMetrics.StreamsFailed.Add(1)
  1280  		}
  1281  	}
  1282  }
  1283  
  1284  // finishStream closes the stream and puts the trailing headerFrame into controlbuf.
  1285  func (t *http2Server) finishStream(s *Stream, rst bool, rstCode http2.ErrCode, hdr *headerFrame, eosReceived bool) {
  1286  	// In case stream sending and receiving are invoked in separate
  1287  	// goroutines (e.g., bi-directional streaming), cancel needs to be
  1288  	// called to interrupt the potential blocking on other goroutines.
  1289  	s.cancel()
  1290  
  1291  	oldState := s.swapState(streamDone)
  1292  	if oldState == streamDone {
  1293  		// If the stream was already done, return.
  1294  		return
  1295  	}
  1296  
  1297  	hdr.cleanup = &cleanupStream{
  1298  		streamID: s.id,
  1299  		rst:      rst,
  1300  		rstCode:  rstCode,
  1301  		onWrite: func() {
  1302  			t.deleteStream(s, eosReceived)
  1303  		},
  1304  	}
  1305  	t.controlBuf.put(hdr)
  1306  }
  1307  
  1308  // closeStream clears the footprint of a stream when the stream is not needed any more.
  1309  func (t *http2Server) closeStream(s *Stream, rst bool, rstCode http2.ErrCode, eosReceived bool) {
  1310  	// In case stream sending and receiving are invoked in separate
  1311  	// goroutines (e.g., bi-directional streaming), cancel needs to be
  1312  	// called to interrupt the potential blocking on other goroutines.
  1313  	s.cancel()
  1314  
  1315  	s.swapState(streamDone)
  1316  	t.deleteStream(s, eosReceived)
  1317  
  1318  	t.controlBuf.put(&cleanupStream{
  1319  		streamID: s.id,
  1320  		rst:      rst,
  1321  		rstCode:  rstCode,
  1322  		onWrite:  func() {},
  1323  	})
  1324  }
  1325  
  1326  func (t *http2Server) Drain(debugData string) {
  1327  	t.mu.Lock()
  1328  	defer t.mu.Unlock()
  1329  	if t.drainEvent != nil {
  1330  		return
  1331  	}
  1332  	t.drainEvent = grpcsync.NewEvent()
  1333  	t.controlBuf.put(&goAway{code: http2.ErrCodeNo, debugData: []byte(debugData), headsUp: true})
  1334  }
  1335  
  1336  var goAwayPing = &ping{data: [8]byte{1, 6, 1, 8, 0, 3, 3, 9}}
  1337  
  1338  // Handles outgoing GoAway and returns true if loopy needs to put itself
  1339  // in draining mode.
  1340  func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) {
  1341  	t.maxStreamMu.Lock()
  1342  	t.mu.Lock()
  1343  	if t.state == closing { // TODO(mmukhi): This seems unnecessary.
  1344  		t.mu.Unlock()
  1345  		t.maxStreamMu.Unlock()
  1346  		// The transport is closing.
  1347  		return false, ErrConnClosing
  1348  	}
  1349  	if !g.headsUp {
  1350  		// Stop accepting more streams now.
  1351  		t.state = draining
  1352  		sid := t.maxStreamID
  1353  		retErr := g.closeConn
  1354  		if len(t.activeStreams) == 0 {
  1355  			retErr = errors.New("second GOAWAY written and no active streams left to process")
  1356  		}
  1357  		t.mu.Unlock()
  1358  		t.maxStreamMu.Unlock()
  1359  		if err := t.framer.fr.WriteGoAway(sid, g.code, g.debugData); err != nil {
  1360  			return false, err
  1361  		}
  1362  		t.framer.writer.Flush()
  1363  		if retErr != nil {
  1364  			return false, retErr
  1365  		}
  1366  		return true, nil
  1367  	}
  1368  	t.mu.Unlock()
  1369  	t.maxStreamMu.Unlock()
  1370  	// For a graceful close, send out a GoAway with stream ID of MaxUInt32,
  1371  	// Follow that with a ping and wait for the ack to come back or a timer
  1372  	// to expire. During this time accept new streams since they might have
  1373  	// originated before the GoAway reaches the client.
  1374  	// After getting the ack or timer expiration send out another GoAway this
  1375  	// time with an ID of the max stream server intends to process.
  1376  	if err := t.framer.fr.WriteGoAway(math.MaxUint32, http2.ErrCodeNo, g.debugData); err != nil {
  1377  		return false, err
  1378  	}
  1379  	if err := t.framer.fr.WritePing(false, goAwayPing.data); err != nil {
  1380  		return false, err
  1381  	}
  1382  	go func() {
  1383  		timer := time.NewTimer(5 * time.Second)
  1384  		defer timer.Stop()
  1385  		select {
  1386  		case <-t.drainEvent.Done():
  1387  		case <-timer.C:
  1388  		case <-t.done:
  1389  			return
  1390  		}
  1391  		t.controlBuf.put(&goAway{code: g.code, debugData: g.debugData})
  1392  	}()
  1393  	return false, nil
  1394  }
  1395  
  1396  func (t *http2Server) socketMetrics() *channelz.EphemeralSocketMetrics {
  1397  	return &channelz.EphemeralSocketMetrics{
  1398  		LocalFlowControlWindow:  int64(t.fc.getSize()),
  1399  		RemoteFlowControlWindow: t.getOutFlowWindow(),
  1400  	}
  1401  }
  1402  
  1403  func (t *http2Server) IncrMsgSent() {
  1404  	t.channelz.SocketMetrics.MessagesSent.Add(1)
  1405  	t.channelz.SocketMetrics.LastMessageSentTimestamp.Add(1)
  1406  }
  1407  
  1408  func (t *http2Server) IncrMsgRecv() {
  1409  	t.channelz.SocketMetrics.MessagesReceived.Add(1)
  1410  	t.channelz.SocketMetrics.LastMessageReceivedTimestamp.Add(1)
  1411  }
  1412  
  1413  func (t *http2Server) getOutFlowWindow() int64 {
  1414  	resp := make(chan uint32, 1)
  1415  	timer := time.NewTimer(time.Second)
  1416  	defer timer.Stop()
  1417  	t.controlBuf.put(&outFlowControlSizeRequest{resp})
  1418  	select {
  1419  	case sz := <-resp:
  1420  		return int64(sz)
  1421  	case <-t.done:
  1422  		return -1
  1423  	case <-timer.C:
  1424  		return -2
  1425  	}
  1426  }
  1427  
  1428  // Peer returns the peer of the transport.
  1429  func (t *http2Server) Peer() *peer.Peer {
  1430  	return &peer.Peer{
  1431  		Addr:      t.peer.Addr,
  1432  		LocalAddr: t.peer.LocalAddr,
  1433  		AuthInfo:  t.peer.AuthInfo, // Can be nil
  1434  	}
  1435  }
  1436  
  1437  func getJitter(v time.Duration) time.Duration {
  1438  	if v == infinity {
  1439  		return 0
  1440  	}
  1441  	// Generate a jitter between +/- 10% of the value.
  1442  	r := int64(v / 10)
  1443  	j := grpcrand.Int63n(2*r) - r
  1444  	return time.Duration(j)
  1445  }
  1446  
  1447  type connectionKey struct{}
  1448  
  1449  // GetConnection gets the connection from the context.
  1450  func GetConnection(ctx context.Context) net.Conn {
  1451  	conn, _ := ctx.Value(connectionKey{}).(net.Conn)
  1452  	return conn
  1453  }
  1454  
  1455  // SetConnection adds the connection to the context to be able to get
  1456  // information about the destination ip and port for an incoming RPC. This also
  1457  // allows any unary or streaming interceptors to see the connection.
  1458  func SetConnection(ctx context.Context, conn net.Conn) context.Context {
  1459  	return context.WithValue(ctx, connectionKey{}, conn)
  1460  }
  1461  

View as plain text