1
2
3
4
5 package jsonrpc2
6
7 import (
8 "context"
9 "fmt"
10 "sync"
11
12 "golang.org/x/tools/internal/event"
13 )
14
15
16
17 type Handler func(ctx context.Context, reply Replier, req Request) error
18
19
20
21 type Replier func(ctx context.Context, result interface{}, err error) error
22
23
24
25
26 func MethodNotFound(ctx context.Context, reply Replier, req Request) error {
27 return reply(ctx, nil, fmt.Errorf("%w: %q", ErrMethodNotFound, req.Method()))
28 }
29
30
31
32 func MustReplyHandler(handler Handler) Handler {
33 return func(ctx context.Context, reply Replier, req Request) error {
34 called := false
35 err := handler(ctx, func(ctx context.Context, result interface{}, err error) error {
36 if called {
37 panic(fmt.Errorf("request %q replied to more than once", req.Method()))
38 }
39 called = true
40 return reply(ctx, result, err)
41 }, req)
42 if !called {
43 panic(fmt.Errorf("request %q was never replied to", req.Method()))
44 }
45 return err
46 }
47 }
48
49
50
51 func CancelHandler(handler Handler) (Handler, func(id ID)) {
52 var mu sync.Mutex
53 handling := make(map[ID]context.CancelFunc)
54 wrapped := func(ctx context.Context, reply Replier, req Request) error {
55 if call, ok := req.(*Call); ok {
56 cancelCtx, cancel := context.WithCancel(ctx)
57 ctx = cancelCtx
58 mu.Lock()
59 handling[call.ID()] = cancel
60 mu.Unlock()
61 innerReply := reply
62 reply = func(ctx context.Context, result interface{}, err error) error {
63 mu.Lock()
64 delete(handling, call.ID())
65 mu.Unlock()
66 return innerReply(ctx, result, err)
67 }
68 }
69 return handler(ctx, reply, req)
70 }
71 return wrapped, func(id ID) {
72 mu.Lock()
73 cancel, found := handling[id]
74 mu.Unlock()
75 if found {
76 cancel()
77 }
78 }
79 }
80
81
82
83
84
85
86
87 func AsyncHandler(handler Handler) Handler {
88 nextRequest := make(chan struct{})
89 close(nextRequest)
90 return func(ctx context.Context, reply Replier, req Request) error {
91 waitForPrevious := nextRequest
92 nextRequest = make(chan struct{})
93 unlockNext := nextRequest
94 innerReply := reply
95 reply = func(ctx context.Context, result interface{}, err error) error {
96 close(unlockNext)
97 return innerReply(ctx, result, err)
98 }
99 _, queueDone := event.Start(ctx, "queued")
100 go func() {
101 <-waitForPrevious
102 queueDone()
103 if err := handler(ctx, reply, req); err != nil {
104 event.Error(ctx, "jsonrpc2 async message delivery failed", err)
105 }
106 }()
107 return nil
108 }
109 }
110
View as plain text