...

Source file src/github.com/go-kit/kit/util/conn/manager.go

Documentation: github.com/go-kit/kit/util/conn

     1  package conn
     2  
     3  import (
     4  	"errors"
     5  	"math/rand"
     6  	"net"
     7  	"time"
     8  
     9  	"github.com/go-kit/log"
    10  )
    11  
    12  // Dialer imitates net.Dial. Dialer is assumed to yield connections that are
    13  // safe for use by multiple concurrent goroutines.
    14  type Dialer func(network, address string) (net.Conn, error)
    15  
    16  // AfterFunc imitates time.After.
    17  type AfterFunc func(time.Duration) <-chan time.Time
    18  
    19  // Manager manages a net.Conn.
    20  //
    21  // Clients provide a way to create the connection with a Dialer, network, and
    22  // address. Clients should Take the connection when they want to use it, and Put
    23  // back whatever error they receive from its use. When a non-nil error is Put,
    24  // the connection is invalidated, and a new connection is established.
    25  // Connection failures are retried after an exponential backoff.
    26  type Manager struct {
    27  	dialer  Dialer
    28  	network string
    29  	address string
    30  	after   AfterFunc
    31  	logger  log.Logger
    32  
    33  	takec chan net.Conn
    34  	putc  chan error
    35  }
    36  
    37  // NewManager returns a connection manager using the passed Dialer, network, and
    38  // address. The AfterFunc is used to control exponential backoff and retries.
    39  // The logger is used to log errors; pass a log.NopLogger if you don't care to
    40  // receive them. For normal use, prefer NewDefaultManager.
    41  func NewManager(d Dialer, network, address string, after AfterFunc, logger log.Logger) *Manager {
    42  	m := &Manager{
    43  		dialer:  d,
    44  		network: network,
    45  		address: address,
    46  		after:   after,
    47  		logger:  logger,
    48  
    49  		takec: make(chan net.Conn),
    50  		putc:  make(chan error),
    51  	}
    52  	go m.loop()
    53  	return m
    54  }
    55  
    56  // NewDefaultManager is a helper constructor, suitable for most normal use in
    57  // real (non-test) code. It uses the real net.Dial and time.After functions.
    58  func NewDefaultManager(network, address string, logger log.Logger) *Manager {
    59  	return NewManager(net.Dial, network, address, time.After, logger)
    60  }
    61  
    62  // Take yields the current connection. It may be nil.
    63  func (m *Manager) Take() net.Conn {
    64  	return <-m.takec
    65  }
    66  
    67  // Put accepts an error that came from a previously yielded connection. If the
    68  // error is non-nil, the manager will invalidate the current connection and try
    69  // to reconnect, with exponential backoff. Putting a nil error is a no-op.
    70  func (m *Manager) Put(err error) {
    71  	m.putc <- err
    72  }
    73  
    74  // Write writes the passed data to the connection in a single Take/Put cycle.
    75  func (m *Manager) Write(b []byte) (int, error) {
    76  	conn := m.Take()
    77  	if conn == nil {
    78  		return 0, ErrConnectionUnavailable
    79  	}
    80  	n, err := conn.Write(b)
    81  	defer m.Put(err)
    82  	return n, err
    83  }
    84  
    85  func (m *Manager) loop() {
    86  	var (
    87  		conn       = dial(m.dialer, m.network, m.address, m.logger) // may block slightly
    88  		connc      = make(chan net.Conn, 1)
    89  		reconnectc <-chan time.Time // initially nil
    90  		backoff    = time.Second
    91  	)
    92  
    93  	// If the initial dial fails, we need to trigger a reconnect via the loop
    94  	// body, below. If we did this in a goroutine, we would race on the conn
    95  	// variable. So we use a buffered chan instead.
    96  	connc <- conn
    97  
    98  	for {
    99  		select {
   100  		case <-reconnectc:
   101  			reconnectc = nil // one-shot
   102  			go func() { connc <- dial(m.dialer, m.network, m.address, m.logger) }()
   103  
   104  		case conn = <-connc:
   105  			if conn == nil {
   106  				// didn't work
   107  				backoff = Exponential(backoff) // wait longer
   108  				reconnectc = m.after(backoff)  // try again
   109  			} else {
   110  				// worked!
   111  				backoff = time.Second // reset wait time
   112  				reconnectc = nil      // no retry necessary
   113  			}
   114  
   115  		case m.takec <- conn:
   116  
   117  		case err := <-m.putc:
   118  			if err != nil && conn != nil {
   119  				m.logger.Log("err", err)
   120  				conn.Close()
   121  				conn = nil                            // connection is bad
   122  				reconnectc = m.after(time.Nanosecond) // trigger immediately
   123  			}
   124  		}
   125  	}
   126  }
   127  
   128  func dial(d Dialer, network, address string, logger log.Logger) net.Conn {
   129  	conn, err := d(network, address)
   130  	if err != nil {
   131  		logger.Log("err", err)
   132  		conn = nil // just to be sure
   133  	}
   134  	return conn
   135  }
   136  
   137  // Exponential takes a duration and returns another one that is twice as long, +/- 50%. It is
   138  // used to provide backoff for operations that may fail and should avoid thundering herds.
   139  // See https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/ for rationale
   140  func Exponential(d time.Duration) time.Duration {
   141  	d *= 2
   142  	jitter := rand.Float64() + 0.5
   143  	d = time.Duration(int64(float64(d.Nanoseconds()) * jitter))
   144  	if d > time.Minute {
   145  		d = time.Minute
   146  	}
   147  	return d
   148  
   149  }
   150  
   151  // ErrConnectionUnavailable is returned by the Manager's Write method when the
   152  // manager cannot yield a good connection.
   153  var ErrConnectionUnavailable = errors.New("connection unavailable")
   154  

View as plain text