...
1
2
3
4
5 package jsonrpc2
6
7 import (
8 "context"
9 "errors"
10 "io"
11 "math"
12 "net"
13 "os"
14 "time"
15
16 "golang.org/x/tools/internal/event"
17 )
18
19
20
21
22
23
24
25
26 type StreamServer interface {
27 ServeStream(context.Context, Conn) error
28 }
29
30
31
32 type ServerFunc func(context.Context, Conn) error
33
34
35 func (f ServerFunc) ServeStream(ctx context.Context, c Conn) error {
36 return f(ctx, c)
37 }
38
39
40
41 func HandlerServer(h Handler) StreamServer {
42 return ServerFunc(func(ctx context.Context, conn Conn) error {
43 conn.Go(ctx, h)
44 <-conn.Done()
45 return conn.Err()
46 })
47 }
48
49
50
51
52 func ListenAndServe(ctx context.Context, network, addr string, server StreamServer, idleTimeout time.Duration) error {
53 ln, err := net.Listen(network, addr)
54 if err != nil {
55 return err
56 }
57 defer ln.Close()
58 if network == "unix" {
59 defer os.Remove(addr)
60 }
61 return Serve(ctx, ln, server, idleTimeout)
62 }
63
64
65
66
67 func Serve(ctx context.Context, ln net.Listener, server StreamServer, idleTimeout time.Duration) error {
68 newConns := make(chan net.Conn)
69 closedConns := make(chan error)
70 activeConns := 0
71 var acceptErr error
72 go func() {
73 defer close(newConns)
74 for {
75 var nc net.Conn
76 nc, acceptErr = ln.Accept()
77 if acceptErr != nil {
78 return
79 }
80 newConns <- nc
81 }
82 }()
83
84 ctx, cancel := context.WithCancel(ctx)
85 defer func() {
86
87
88 ln.Close()
89 for nc := range newConns {
90 nc.Close()
91 }
92
93 cancel()
94 for activeConns > 0 {
95 err := <-closedConns
96 if !isClosingError(err) {
97 event.Error(ctx, "closed a connection", err)
98 }
99 activeConns--
100 }
101 }()
102
103
104 const forever = math.MaxInt64
105 if idleTimeout <= 0 {
106 idleTimeout = forever
107 }
108 connTimer := time.NewTimer(idleTimeout)
109 defer connTimer.Stop()
110
111 for {
112 select {
113 case netConn, ok := <-newConns:
114 if !ok {
115 return acceptErr
116 }
117 if activeConns == 0 && !connTimer.Stop() {
118
119
120 <-connTimer.C
121 }
122 activeConns++
123 stream := NewHeaderStream(netConn)
124 go func() {
125 conn := NewConn(stream)
126 err := server.ServeStream(ctx, conn)
127 stream.Close()
128 closedConns <- err
129 }()
130
131 case err := <-closedConns:
132 if !isClosingError(err) {
133 event.Error(ctx, "closed a connection", err)
134 }
135 activeConns--
136 if activeConns == 0 {
137 connTimer.Reset(idleTimeout)
138 }
139
140 case <-connTimer.C:
141 return ErrIdleTimeout
142
143 case <-ctx.Done():
144 return nil
145 }
146 }
147 }
148
149
150
151
// isClosingError reports whether err is one of the errors expected when a
// network connection is shut down: io.EOF (possibly wrapped), or an error
// whose message is exactly "use of closed network connection".
//
// A nil err is not a closing error; the function returns false for it instead
// of calling Error on a nil interface, which would panic.
//
// The check is a heuristic used only to decide whether an error is worth
// logging, so an occasional false negative merely produces an extra log line.
func isClosingError(err error) bool {
	if err == nil {
		return false
	}
	if errors.Is(err, io.EOF) {
		return true
	}
	// Operations on an already-closed network connection surface with this
	// fixed message, so matching the text identifies them.
	return err.Error() == "use of closed network connection"
}
164
View as plain text