...

Source file src/golang.org/x/net/http2/netconn_test.go

Documentation: golang.org/x/net/http2

     1  // Copyright 2024 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package http2
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"errors"
    11  	"io"
    12  	"math"
    13  	"net"
    14  	"net/netip"
    15  	"os"
    16  	"sync"
    17  	"time"
    18  )
    19  
    20  // synctestNetPipe creates an in-memory, full duplex network connection.
    21  // Read and write timeouts are managed by the synctest group.
    22  //
    23  // Unlike net.Pipe, the connection is not synchronous.
    24  // Writes are made to a buffer, and return immediately.
    25  // By default, the buffer size is unlimited.
    26  func synctestNetPipe(group *synctestGroup) (r, w *synctestNetConn) {
    27  	s1addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8000"))
    28  	s2addr := net.TCPAddrFromAddrPort(netip.MustParseAddrPort("127.0.0.1:8001"))
    29  	s1 := newSynctestNetConnHalf(s1addr)
    30  	s2 := newSynctestNetConnHalf(s2addr)
    31  	r = &synctestNetConn{group: group, loc: s1, rem: s2}
    32  	w = &synctestNetConn{group: group, loc: s2, rem: s1}
    33  	r.peer = w
    34  	w.peer = r
    35  	return r, w
    36  }
    37  
    38  // A synctestNetConn is one endpoint of the connection created by synctestNetPipe.
    39  type synctestNetConn struct {
    40  	group *synctestGroup
    41  
    42  	// local and remote connection halves.
    43  	// Each half contains a buffer.
    44  	// Reads pull from the local buffer, and writes push to the remote buffer.
    45  	loc, rem *synctestNetConnHalf
    46  
    47  	// When set, group.Wait is automatically called before reads and after writes.
    48  	autoWait bool
    49  
    50  	// peer is the other endpoint.
    51  	peer *synctestNetConn
    52  }
    53  
    54  // Read reads data from the connection.
    55  func (c *synctestNetConn) Read(b []byte) (n int, err error) {
    56  	if c.autoWait {
    57  		c.group.Wait()
    58  	}
    59  	return c.loc.read(b)
    60  }
    61  
    62  // Peek returns the available unread read buffer,
    63  // without consuming its contents.
    64  func (c *synctestNetConn) Peek() []byte {
    65  	if c.autoWait {
    66  		c.group.Wait()
    67  	}
    68  	return c.loc.peek()
    69  }
    70  
    71  // Write writes data to the connection.
    72  func (c *synctestNetConn) Write(b []byte) (n int, err error) {
    73  	if c.autoWait {
    74  		defer c.group.Wait()
    75  	}
    76  	return c.rem.write(b)
    77  }
    78  
    79  // IsClosed reports whether the peer has closed its end of the connection.
    80  func (c *synctestNetConn) IsClosedByPeer() bool {
    81  	if c.autoWait {
    82  		c.group.Wait()
    83  	}
    84  	return c.loc.isClosedByPeer()
    85  }
    86  
    87  // Close closes the connection.
    88  func (c *synctestNetConn) Close() error {
    89  	c.loc.setWriteError(errors.New("connection closed by peer"))
    90  	c.rem.setReadError(io.EOF)
    91  	if c.autoWait {
    92  		c.group.Wait()
    93  	}
    94  	return nil
    95  }
    96  
    97  // LocalAddr returns the (fake) local network address.
    98  func (c *synctestNetConn) LocalAddr() net.Addr {
    99  	return c.loc.addr
   100  }
   101  
   102  // LocalAddr returns the (fake) remote network address.
   103  func (c *synctestNetConn) RemoteAddr() net.Addr {
   104  	return c.rem.addr
   105  }
   106  
   107  // SetDeadline sets the read and write deadlines for the connection.
   108  func (c *synctestNetConn) SetDeadline(t time.Time) error {
   109  	c.SetReadDeadline(t)
   110  	c.SetWriteDeadline(t)
   111  	return nil
   112  }
   113  
   114  // SetReadDeadline sets the read deadline for the connection.
   115  func (c *synctestNetConn) SetReadDeadline(t time.Time) error {
   116  	c.loc.rctx.setDeadline(c.group, t)
   117  	return nil
   118  }
   119  
   120  // SetWriteDeadline sets the write deadline for the connection.
   121  func (c *synctestNetConn) SetWriteDeadline(t time.Time) error {
   122  	c.rem.wctx.setDeadline(c.group, t)
   123  	return nil
   124  }
   125  
   126  // SetReadBufferSize sets the read buffer limit for the connection.
   127  // Writes by the peer will block so long as the buffer is full.
   128  func (c *synctestNetConn) SetReadBufferSize(size int) {
   129  	c.loc.setReadBufferSize(size)
   130  }
   131  
   132  // synctestNetConnHalf is one data flow in the connection created by synctestNetPipe.
   133  // Each half contains a buffer. Writes to the half push to the buffer, and reads pull from it.
   134  type synctestNetConnHalf struct {
   135  	addr net.Addr
   136  
   137  	// Read and write timeouts.
   138  	rctx, wctx deadlineContext
   139  
   140  	// A half can be readable and/or writable.
   141  	//
   142  	// These four channels act as a lock,
   143  	// and allow waiting for readability/writability.
   144  	// When the half is unlocked, exactly one channel contains a value.
   145  	// When the half is locked, all channels are empty.
   146  	lockr  chan struct{} // readable
   147  	lockw  chan struct{} // writable
   148  	lockrw chan struct{} // readable and writable
   149  	lockc  chan struct{} // neither readable nor writable
   150  
   151  	bufMax   int // maximum buffer size
   152  	buf      bytes.Buffer
   153  	readErr  error // error returned by reads
   154  	writeErr error // error returned by writes
   155  }
   156  
   157  func newSynctestNetConnHalf(addr net.Addr) *synctestNetConnHalf {
   158  	h := &synctestNetConnHalf{
   159  		addr:   addr,
   160  		lockw:  make(chan struct{}, 1),
   161  		lockr:  make(chan struct{}, 1),
   162  		lockrw: make(chan struct{}, 1),
   163  		lockc:  make(chan struct{}, 1),
   164  		bufMax: math.MaxInt, // unlimited
   165  	}
   166  	h.unlock()
   167  	return h
   168  }
   169  
   170  func (h *synctestNetConnHalf) lock() {
   171  	select {
   172  	case <-h.lockw:
   173  	case <-h.lockr:
   174  	case <-h.lockrw:
   175  	case <-h.lockc:
   176  	}
   177  }
   178  
   179  func (h *synctestNetConnHalf) unlock() {
   180  	canRead := h.readErr != nil || h.buf.Len() > 0
   181  	canWrite := h.writeErr != nil || h.bufMax > h.buf.Len()
   182  	switch {
   183  	case canRead && canWrite:
   184  		h.lockrw <- struct{}{}
   185  	case canRead:
   186  		h.lockr <- struct{}{}
   187  	case canWrite:
   188  		h.lockw <- struct{}{}
   189  	default:
   190  		h.lockc <- struct{}{}
   191  	}
   192  }
   193  
   194  func (h *synctestNetConnHalf) readWaitAndLock() error {
   195  	select {
   196  	case <-h.lockr:
   197  		return nil
   198  	case <-h.lockrw:
   199  		return nil
   200  	default:
   201  	}
   202  	ctx := h.rctx.context()
   203  	select {
   204  	case <-h.lockr:
   205  		return nil
   206  	case <-h.lockrw:
   207  		return nil
   208  	case <-ctx.Done():
   209  		return context.Cause(ctx)
   210  	}
   211  }
   212  
   213  func (h *synctestNetConnHalf) writeWaitAndLock() error {
   214  	select {
   215  	case <-h.lockw:
   216  		return nil
   217  	case <-h.lockrw:
   218  		return nil
   219  	default:
   220  	}
   221  	ctx := h.wctx.context()
   222  	select {
   223  	case <-h.lockw:
   224  		return nil
   225  	case <-h.lockrw:
   226  		return nil
   227  	case <-ctx.Done():
   228  		return context.Cause(ctx)
   229  	}
   230  }
   231  
   232  func (h *synctestNetConnHalf) peek() []byte {
   233  	h.lock()
   234  	defer h.unlock()
   235  	return h.buf.Bytes()
   236  }
   237  
   238  func (h *synctestNetConnHalf) isClosedByPeer() bool {
   239  	h.lock()
   240  	defer h.unlock()
   241  	return h.readErr != nil
   242  }
   243  
   244  func (h *synctestNetConnHalf) read(b []byte) (n int, err error) {
   245  	if err := h.readWaitAndLock(); err != nil {
   246  		return 0, err
   247  	}
   248  	defer h.unlock()
   249  	if h.buf.Len() == 0 && h.readErr != nil {
   250  		return 0, h.readErr
   251  	}
   252  	return h.buf.Read(b)
   253  }
   254  
   255  func (h *synctestNetConnHalf) setReadBufferSize(size int) {
   256  	h.lock()
   257  	defer h.unlock()
   258  	h.bufMax = size
   259  }
   260  
   261  func (h *synctestNetConnHalf) write(b []byte) (n int, err error) {
   262  	for n < len(b) {
   263  		nn, err := h.writePartial(b[n:])
   264  		n += nn
   265  		if err != nil {
   266  			return n, err
   267  		}
   268  	}
   269  	return n, nil
   270  }
   271  
   272  func (h *synctestNetConnHalf) writePartial(b []byte) (n int, err error) {
   273  	if err := h.writeWaitAndLock(); err != nil {
   274  		return 0, err
   275  	}
   276  	defer h.unlock()
   277  	if h.writeErr != nil {
   278  		return 0, h.writeErr
   279  	}
   280  	writeMax := h.bufMax - h.buf.Len()
   281  	if writeMax < len(b) {
   282  		b = b[:writeMax]
   283  	}
   284  	return h.buf.Write(b)
   285  }
   286  
   287  func (h *synctestNetConnHalf) setReadError(err error) {
   288  	h.lock()
   289  	defer h.unlock()
   290  	if h.readErr == nil {
   291  		h.readErr = err
   292  	}
   293  }
   294  
   295  func (h *synctestNetConnHalf) setWriteError(err error) {
   296  	h.lock()
   297  	defer h.unlock()
   298  	if h.writeErr == nil {
   299  		h.writeErr = err
   300  	}
   301  }
   302  
   303  // deadlineContext converts a changable deadline (as in net.Conn.SetDeadline) into a Context.
   304  type deadlineContext struct {
   305  	mu     sync.Mutex
   306  	ctx    context.Context
   307  	cancel context.CancelCauseFunc
   308  	timer  timer
   309  }
   310  
   311  // context returns a Context which expires when the deadline does.
   312  func (t *deadlineContext) context() context.Context {
   313  	t.mu.Lock()
   314  	defer t.mu.Unlock()
   315  	if t.ctx == nil {
   316  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   317  	}
   318  	return t.ctx
   319  }
   320  
   321  // setDeadline sets the current deadline.
   322  func (t *deadlineContext) setDeadline(group *synctestGroup, deadline time.Time) {
   323  	t.mu.Lock()
   324  	defer t.mu.Unlock()
   325  	// If t.ctx is non-nil and t.cancel is nil, then t.ctx was canceled
   326  	// and we should create a new one.
   327  	if t.ctx == nil || t.cancel == nil {
   328  		t.ctx, t.cancel = context.WithCancelCause(context.Background())
   329  	}
   330  	// Stop any existing deadline from expiring.
   331  	if t.timer != nil {
   332  		t.timer.Stop()
   333  	}
   334  	if deadline.IsZero() {
   335  		// No deadline.
   336  		return
   337  	}
   338  	if !deadline.After(group.Now()) {
   339  		// Deadline has already expired.
   340  		t.cancel(os.ErrDeadlineExceeded)
   341  		t.cancel = nil
   342  		return
   343  	}
   344  	if t.timer != nil {
   345  		// Reuse existing deadline timer.
   346  		t.timer.Reset(deadline.Sub(group.Now()))
   347  		return
   348  	}
   349  	// Create a new timer to cancel the context at the deadline.
   350  	t.timer = group.AfterFunc(deadline.Sub(group.Now()), func() {
   351  		t.mu.Lock()
   352  		defer t.mu.Unlock()
   353  		t.cancel(os.ErrDeadlineExceeded)
   354  		t.cancel = nil
   355  	})
   356  }
   357  

View as plain text