1
2
3
4
5 package jsonrpc2
6
7 import (
8 "context"
9 "fmt"
10 "io"
11 "runtime"
12 "sync"
13 "sync/atomic"
14 "time"
15 )
16
17
18 type Listener interface {
19
20
21 Accept(context.Context) (io.ReadWriteCloser, error)
22
23
24
25 Close() error
26
27
28
29
30 Dialer() Dialer
31 }
32
33
34 type Dialer interface {
35
36 Dial(ctx context.Context) (io.ReadWriteCloser, error)
37 }
38
39
40 type Server struct {
41 listener Listener
42 binder Binder
43 async *async
44
45 shutdownOnce sync.Once
46 closing int32
47 }
48
49
50
51
52
53
54
55
56
57 func Dial(ctx context.Context, dialer Dialer, binder Binder) (*Connection, error) {
58
59 rwc, err := dialer.Dial(ctx)
60 if err != nil {
61 return nil, err
62 }
63 return newConnection(ctx, rwc, binder, nil), nil
64 }
65
66
67
68
69
70
71
72
73 func NewServer(ctx context.Context, listener Listener, binder Binder) *Server {
74 server := &Server{
75 listener: listener,
76 binder: binder,
77 async: newAsync(),
78 }
79 go server.run(ctx)
80 return server
81 }
82
83
84 func (s *Server) Wait() error {
85 return s.async.wait()
86 }
87
88
89 func (s *Server) Shutdown() {
90 s.shutdownOnce.Do(func() {
91 atomic.StoreInt32(&s.closing, 1)
92 s.listener.Close()
93 })
94 }
95
96
97
98
99 func (s *Server) run(ctx context.Context) {
100 defer s.async.done()
101
102 var activeConns sync.WaitGroup
103 for {
104 rwc, err := s.listener.Accept(ctx)
105 if err != nil {
106
107
108
109 if atomic.LoadInt32(&s.closing) == 0 {
110 s.async.setError(err)
111 }
112
113 break
114 }
115
116
117 activeConns.Add(1)
118 _ = newConnection(ctx, rwc, s.binder, activeConns.Done)
119 }
120 activeConns.Wait()
121 }
122
123
124
125
126
127
128
129 func NewIdleListener(timeout time.Duration, wrap Listener) Listener {
130 l := &idleListener{
131 wrapped: wrap,
132 timeout: timeout,
133 active: make(chan int, 1),
134 timedOut: make(chan struct{}),
135 idleTimer: make(chan *time.Timer, 1),
136 }
137 l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
138 return l
139 }
140
141 type idleListener struct {
142 wrapped Listener
143 timeout time.Duration
144
145
146 active chan int
147 timedOut chan struct{}
148 idleTimer chan *time.Timer
149 }
150
151
152
153
154
155 func (l *idleListener) Accept(ctx context.Context) (io.ReadWriteCloser, error) {
156 rwc, err := l.wrapped.Accept(ctx)
157
158 select {
159 case n, ok := <-l.active:
160 if err != nil {
161 if ok {
162 l.active <- n
163 }
164 return nil, err
165 }
166 if ok {
167 l.active <- n + 1
168 } else {
169
170
171
172
173 }
174 return l.newConn(rwc), nil
175
176 case <-l.timedOut:
177 if err == nil {
178
179
180
181 rwc.Close()
182 } else {
183
184
185
186
187 }
188 return nil, ErrIdleTimeout
189
190 case timer := <-l.idleTimer:
191 if err != nil {
192
193
194
195
196 l.idleTimer <- timer
197 return nil, err
198 }
199
200 if !timer.Stop() {
201
202
203
204
205 l.idleTimer <- timer
206 rwc.Close()
207 <-l.timedOut
208 return nil, ErrIdleTimeout
209 }
210
211 l.active <- 1
212 return l.newConn(rwc), nil
213 }
214 }
215
216 func (l *idleListener) Close() error {
217 select {
218 case _, ok := <-l.active:
219 if ok {
220 close(l.active)
221 }
222
223 case <-l.timedOut:
224
225
226
227 return ErrIdleTimeout
228
229 case timer := <-l.idleTimer:
230 if !timer.Stop() {
231
232
233
234
235 l.idleTimer <- timer
236 <-l.timedOut
237 return ErrIdleTimeout
238 }
239 close(l.active)
240 }
241
242 return l.wrapped.Close()
243 }
244
245 func (l *idleListener) Dialer() Dialer {
246 return l.wrapped.Dialer()
247 }
248
249 func (l *idleListener) timerExpired() {
250 select {
251 case n, ok := <-l.active:
252 if ok {
253 panic(fmt.Sprintf("jsonrpc2: idleListener idle timer fired with %d connections still active", n))
254 } else {
255 panic("jsonrpc2: Close finished with idle timer still running")
256 }
257
258 case <-l.timedOut:
259 panic("jsonrpc2: idleListener idle timer fired more than once")
260
261 case <-l.idleTimer:
262
263 }
264
265
266
267 defer close(l.timedOut)
268 l.wrapped.Close()
269 }
270
271 func (l *idleListener) connClosed() {
272 select {
273 case n, ok := <-l.active:
274 if !ok {
275
276
277 return
278 }
279 n--
280 if n == 0 {
281 l.idleTimer <- time.AfterFunc(l.timeout, l.timerExpired)
282 } else {
283 l.active <- n
284 }
285
286 case <-l.timedOut:
287 panic("jsonrpc2: idleListener idle timer fired before last active connection was closed")
288
289 case <-l.idleTimer:
290 panic("jsonrpc2: idleListener idle timer active before last active connection was closed")
291 }
292 }
293
294 type idleListenerConn struct {
295 wrapped io.ReadWriteCloser
296 l *idleListener
297 closeOnce sync.Once
298 }
299
300 func (l *idleListener) newConn(rwc io.ReadWriteCloser) *idleListenerConn {
301 c := &idleListenerConn{
302 wrapped: rwc,
303 l: l,
304 }
305
306
307
308
309
310
311
312 runtime.SetFinalizer(c, func(c *idleListenerConn) {
313 panic("jsonrpc2: IdleListener connection became unreachable without a call to Close")
314 })
315
316 return c
317 }
318
319 func (c *idleListenerConn) Read(p []byte) (int, error) { return c.wrapped.Read(p) }
320 func (c *idleListenerConn) Write(p []byte) (int, error) { return c.wrapped.Write(p) }
321
322 func (c *idleListenerConn) Close() error {
323 defer c.closeOnce.Do(func() {
324 c.l.connClosed()
325 runtime.SetFinalizer(c, nil)
326 })
327 return c.wrapped.Close()
328 }
329
View as plain text