...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package adapter
16
17 import (
18 "context"
19 "errors"
20
21 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22 "google.golang.org/grpc"
23 )
24
// errAlreadySentHeader is a package-level sentinel error.
// NOTE(review): it is not referenced in this part of the file; presumably the
// stream implementation elsewhere in the package returns it when a header is
// sent more than once — confirm against its users.
var (
	errAlreadySentHeader = errors.New("adapter: already sent header")
)
26
27 type ws2wc struct{ wserv pb.WatchServer }
28
29 func WatchServerToWatchClient(wserv pb.WatchServer) pb.WatchClient {
30 return &ws2wc{wserv}
31 }
32
33 func (s *ws2wc) Watch(ctx context.Context, opts ...grpc.CallOption) (pb.Watch_WatchClient, error) {
34 cs := newPipeStream(ctx, func(ss chanServerStream) error {
35 return s.wserv.Watch(&ws2wcServerStream{ss})
36 })
37 return &ws2wcClientStream{cs}, nil
38 }
39
40
41 type ws2wcClientStream struct{ chanClientStream }
42
43
44 type ws2wcServerStream struct{ chanServerStream }
45
46 func (s *ws2wcClientStream) Send(wr *pb.WatchRequest) error {
47 return s.SendMsg(wr)
48 }
49 func (s *ws2wcClientStream) Recv() (*pb.WatchResponse, error) {
50 var v interface{}
51 if err := s.RecvMsg(&v); err != nil {
52 return nil, err
53 }
54 return v.(*pb.WatchResponse), nil
55 }
56
57 func (s *ws2wcServerStream) Send(wr *pb.WatchResponse) error {
58 return s.SendMsg(wr)
59 }
60 func (s *ws2wcServerStream) Recv() (*pb.WatchRequest, error) {
61 var v interface{}
62 if err := s.RecvMsg(&v); err != nil {
63 return nil, err
64 }
65 return v.(*pb.WatchRequest), nil
66 }
67
View as plain text