...

Source file src/github.com/docker/go-connections/proxy/tcp_proxy.go

Documentation: github.com/docker/go-connections/proxy

     1  package proxy
     2  
import (
	"errors"
	"io"
	"net"
	"syscall"
)
     8  
// TCPProxy is a proxy for TCP connections. It implements the Proxy interface to
// handle TCP traffic forwarding between the frontend and backend addresses.
type TCPProxy struct {
	// Logger receives the proxy's diagnostic messages: backend dial failures
	// and the reason the accept loop stopped. NewTCPProxy sets it to a
	// noopLogger before applying any options.
	Logger       logger
	// listener is the TCP listener bound to the frontend address.
	listener     *net.TCPListener
	// frontendAddr is the address reported by listener, which may differ
	// from the requested one when port 0 was asked for.
	frontendAddr *net.TCPAddr
	// backendAddr is the address dialed for every accepted connection.
	backendAddr  *net.TCPAddr
}
    17  
    18  // NewTCPProxy creates a new TCPProxy.
    19  func NewTCPProxy(frontendAddr, backendAddr *net.TCPAddr, ops ...func(*TCPProxy)) (*TCPProxy, error) {
    20  	listener, err := net.ListenTCP("tcp", frontendAddr)
    21  	if err != nil {
    22  		return nil, err
    23  	}
    24  	// If the port in frontendAddr was 0 then ListenTCP will have a picked
    25  	// a port to listen on, hence the call to Addr to get that actual port:
    26  	proxy := &TCPProxy{
    27  		listener:     listener,
    28  		frontendAddr: listener.Addr().(*net.TCPAddr),
    29  		backendAddr:  backendAddr,
    30  		Logger:       &noopLogger{},
    31  	}
    32  
    33  	for _, op := range ops {
    34  		op(proxy)
    35  	}
    36  
    37  	return proxy, nil
    38  }
    39  
    40  func (proxy *TCPProxy) clientLoop(client *net.TCPConn, quit chan bool) {
    41  	backend, err := net.DialTCP("tcp", nil, proxy.backendAddr)
    42  	if err != nil {
    43  		proxy.Logger.Printf("Can't forward traffic to backend tcp/%v: %s\n", proxy.backendAddr, err)
    44  		_ = client.Close()
    45  		return
    46  	}
    47  
    48  	event := make(chan int64)
    49  	broker := func(to, from *net.TCPConn) {
    50  		written, err := io.Copy(to, from)
    51  		if err != nil {
    52  			// If the socket we are writing to is shutdown with
    53  			// SHUT_WR, forward it to the other end of the pipe:
    54  			if err, ok := err.(*net.OpError); ok && err.Err == syscall.EPIPE {
    55  				_ = from.CloseRead()
    56  			}
    57  		}
    58  		_ = to.CloseWrite()
    59  		event <- written
    60  	}
    61  
    62  	go broker(client, backend)
    63  	go broker(backend, client)
    64  
    65  	var transferred int64
    66  	for i := 0; i < 2; i++ {
    67  		select {
    68  		case written := <-event:
    69  			transferred += written
    70  		case <-quit:
    71  			// Interrupt the two brokers and "join" them.
    72  			_ = client.Close()
    73  			_ = backend.Close()
    74  			for ; i < 2; i++ {
    75  				transferred += <-event
    76  			}
    77  			return
    78  		}
    79  	}
    80  	_ = client.Close()
    81  	_ = backend.Close()
    82  }
    83  
    84  // Run starts forwarding the traffic using TCP.
    85  func (proxy *TCPProxy) Run() {
    86  	quit := make(chan bool)
    87  	defer close(quit)
    88  	for {
    89  		client, err := proxy.listener.Accept()
    90  		if err != nil {
    91  			proxy.Logger.Printf("Stopping proxy on tcp/%v for tcp/%v (%s)", proxy.frontendAddr, proxy.backendAddr, err)
    92  			return
    93  		}
    94  		go proxy.clientLoop(client.(*net.TCPConn), quit)
    95  	}
    96  }
    97  
    98  // Close stops forwarding the traffic.
    99  func (proxy *TCPProxy) Close() { _ = proxy.listener.Close() }
   100  
   101  // FrontendAddr returns the TCP address on which the proxy is listening.
   102  func (proxy *TCPProxy) FrontendAddr() net.Addr { return proxy.frontendAddr }
   103  
   104  // BackendAddr returns the TCP proxied address.
   105  func (proxy *TCPProxy) BackendAddr() net.Addr { return proxy.backendAddr }
   106  

View as plain text