// Copyright 2020 The Go Authors. All rights reserved. // Use of this source code is governed by a BSD-style // license that can be found in the LICENSE file. package jsonrpc2 import ( "context" "fmt" "io" "runtime" "sync" "sync/atomic" "time" ) // Listener is implemented by protocols to accept new inbound connections. type Listener interface { // Accept accepts an inbound connection to a server. // It blocks until either an inbound connection is made, or the listener is closed. Accept(context.Context) (io.ReadWriteCloser, error) // Close closes the listener. // Any blocked Accept or Dial operations will unblock and return errors. Close() error // Dialer returns a dialer that can be used to connect to this listener // locally. // If a listener does not implement this it will return nil. Dialer() Dialer } // Dialer is used by clients to dial a server. type Dialer interface { // Dial returns a new communication byte stream to a listening server. Dial(ctx context.Context) (io.ReadWriteCloser, error) } // Server is a running server that is accepting incoming connections. type Server struct { listener Listener binder Binder async *async shutdownOnce sync.Once closing int32 // atomic: set to nonzero when Shutdown is called } // Dial uses the dialer to make a new connection, wraps the returned // reader and writer using the framer to make a stream, and then builds // a connection on top of that stream using the binder. // // The returned Connection will operate independently using the Preempter and/or // Handler provided by the Binder, and will release its own resources when the // connection is broken, but the caller may Close it earlier to stop accepting // (or sending) new requests. func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) { // dial a server rwc, err := dialer.Dial(ctx) if err != nil { return nil, err } return newConnection(ctx, rwc, binder, nil), nil } // NewServer starts a new server listening for incoming connections and returns // it. // This returns a fully running and connected server, it does not block on // the listener. // You can call Wait to block on the server, or Shutdown to get the sever to // terminate gracefully. // To notice incoming connections, use an intercepting Binder. func NewServer(ctx context.Context, listener Listener, binder Binder) *Server { server := &Server{ listener: listener, binder: binder, async: newAsync(), } go server.run(ctx) return server } // Wait returns only when the server has shut down. func (s *Server) Wait() error { return s.async.wait() } // Shutdown informs the server to stop accepting new connections. func (s *Server) Shutdown() { s.shutdownOnce.Do(func() { atomic.StoreInt32(&s.closing, 1) s.listener.Close() }) } // run accepts incoming connections from the listener, // If IdleTimeout is non-zero, run exits after there are no clients for this // duration, otherwise it exits only on error. func (s *Server) run(ctx context.Context) { defer s.async.done() var activeConns sync.WaitGroup for { rwc, err := s.listener.Accept(ctx) if err != nil { // Only Shutdown closes the listener. If we get an error after Shutdown is // called, assume that was the cause and don't report the error; // otherwise, report the error in case it is unexpected. if atomic.LoadInt32(&s.closing) == 0 { s.async.setError(err) } // We are done generating new connections for good. break } // A new inbound connection. activeConns.Add(1) _ = newConnection(ctx, rwc, s.binder, activeConns.Done) // unregisters itself when done } activeConns.Wait() } // NewIdleListener wraps a listener with an idle timeout. // // When there are no active connections for at least the timeout duration, // calls to Accept will fail with ErrIdleTimeout. // // A connection is considered inactive as soon as its Close method is called. func NewIdleListener(timeout time.Duration, wrap Listener) Listener { l := &idleListener{ wrapped: wrap, timeout: timeout, active: make(chan int, 1), timedOut: make(chan struct{}), idleTimer: make(chan *time.Timer, 1), } l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired) return l } type idleListener struct { wrapped Listener timeout time.Duration // Only one of these channels is receivable at any given time. active chan int // count of active connections; closed when Close is called if not timed out timedOut chan struct{} // closed when the idle timer expires idleTimer chan *time.Timer // holds the timer only when idle } // Accept accepts an incoming connection. // // If an incoming connection is accepted concurrent to the listener being closed // due to idleness, the new connection is immediately closed. func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) { rwc, err := l.wrapped.Accept(ctx) select { case n, ok := <-l.active: if err != nil { if ok { l.active <- n } return nil, err } if ok { l.active <- n + 1 } else { // l.wrapped.Close Close has been called, but Accept returned a // connection. This race can occur with concurrent Accept and Close calls // with any net.Listener, and it is benign: since the listener was closed // explicitly, it can't have also timed out. } return l.newConn(rwc), nil case <-l.timedOut: if err == nil { // Keeping the connection open would leave the listener simultaneously // active and closed due to idleness, which would be contradictory and // confusing. Close the connection and pretend that it never happened. rwc.Close() } else { // In theory the timeout could have raced with an unrelated error return // from Accept. However, ErrIdleTimeout is arguably still valid (since we // would have closed due to the timeout independent of the error), and the // harm from returning a spurious ErrIdleTimeout is negligible anyway. } return nil, ErrIdleTimeout case timer := <-l.idleTimer: if err != nil { // The idle timer doesn't run until it receives itself from the idleTimer // channel, so it can't have called l.wrapped.Close yet and thus err can't // be ErrIdleTimeout. Leave the idle timer as it was and return whatever // error we got. l.idleTimer <- timer return nil, err } if !timer.Stop() { // Failed to stop the timer — the timer goroutine is in the process of // firing. Send the timer back to the timer goroutine so that it can // safely close the timedOut channel, and then wait for the listener to // actually be closed before we return ErrIdleTimeout. l.idleTimer <- timer rwc.Close() <-l.timedOut return nil, ErrIdleTimeout } l.active <- 1 return l.newConn(rwc), nil } } func (l *idleListener) Close() error { select { case _, ok := <-l.active: if ok { close(l.active) } case <-l.timedOut: // Already closed by the timer; take care not to double-close if the caller // only explicitly invokes this Close method once, since the io.Closer // interface explicitly leaves doubled Close calls undefined. return ErrIdleTimeout case timer := <-l.idleTimer: if !timer.Stop() { // Couldn't stop the timer. It shouldn't take long to run, so just wait // (so that the Listener is guaranteed to be closed before we return) // and pretend that this call happened afterward. // That way we won't leak any timers or goroutines when Close returns. l.idleTimer <- timer <-l.timedOut return ErrIdleTimeout } close(l.active) } return l.wrapped.Close() } func (l *idleListener) Dialer() Dialer { return l.wrapped.Dialer() } func (l *idleListener) timerExpired() { select { case n, ok := <-l.active: if ok { panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n)) } else { panic("jsonrpc2: Close finished with idle timer still running") } case <-l.timedOut: panic("jsonrpc2: idleListener idle timer fired more than once") case <-l.idleTimer: // The timer for this very call! } // Close the Listener with all channels still blocked to ensure that this call // to l.wrapped.Close doesn't race with the one in l.Close. defer close(l.timedOut) l.wrapped.Close() } func (l *idleListener) connClosed() { select { case n, ok := <-l.active: if !ok { // l is already closed, so it can't close due to idleness, // and we don't need to track the number of active connections any more. return } n-- if n == 0 { l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired) } else { l.active <- n } case <-l.timedOut: panic("jsonrpc2: idleListener idle timer fired before last active connection was closed") case <-l.idleTimer: panic("jsonrpc2: idleListener idle timer active before last active connection was closed") } } type idleListenerConn struct { wrapped io.ReadWriteCloser l *idleListener closeOnce sync.Once } func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn { c := &idleListenerConn{ wrapped: rwc, l: l, } // A caller that forgets to call Close may disrupt the idleListener's // accounting, even though the file descriptor for the underlying connection // may eventually be garbage-collected anyway. // // Set a (best-effort) finalizer to verify that a Close call always occurs. // (We will clear the finalizer explicitly in Close.) runtime.SetFinalizer(c, func(c *idleListenerConn) { panic("jsonrpc2: IdleListener connection became unreachable without a call to Close") }) return c } func (c *idleListenerConn) Read(p []byte) (int, error) { return c.wrapped.Read(p) } func (c *idleListenerConn) Write(p []byte) (int, error) { return c.wrapped.Write(p) } func (c *idleListenerConn) Close() error { defer c.closeOnce.Do(func() { c.l.connClosed() runtime.SetFinalizer(c, nil) }) return c.wrapped.Close() }