...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rpcreplay
16
17 import (
18 "context"
19 "io"
20 "log"
21 "net"
22
23 pb "cloud.google.com/go/rpcreplay/proto/intstore"
24 "google.golang.org/grpc"
25 "google.golang.org/grpc/codes"
26 "google.golang.org/grpc/status"
27 )
28
29
30 type intStoreServer struct {
31 pb.IntStoreServer
32
33 Addr string
34 l net.Listener
35 gsrv *grpc.Server
36
37 items map[string]int32
38 }
39
40 func newIntStoreServer() *intStoreServer {
41 l, err := net.Listen("tcp", "localhost:0")
42 if err != nil {
43 log.Fatal(err)
44 }
45 s := &intStoreServer{
46 Addr: l.Addr().String(),
47 l: l,
48 gsrv: grpc.NewServer(),
49 }
50 pb.RegisterIntStoreServer(s.gsrv, s)
51 go s.gsrv.Serve(s.l)
52 return s
53 }
54
55 func (s *intStoreServer) stop() {
56 s.gsrv.Stop()
57 s.l.Close()
58 }
59
60 func (s *intStoreServer) Set(_ context.Context, item *pb.Item) (*pb.SetResponse, error) {
61 old := s.setItem(item)
62 return &pb.SetResponse{PrevValue: old}, nil
63 }
64
65 func (s *intStoreServer) setItem(item *pb.Item) int32 {
66 if s.items == nil {
67 s.items = map[string]int32{}
68 }
69 old := s.items[item.Name]
70 s.items[item.Name] = item.Value
71 return old
72 }
73
74 func (s *intStoreServer) Get(_ context.Context, req *pb.GetRequest) (*pb.Item, error) {
75 val, ok := s.items[req.Name]
76 if !ok {
77 return nil, status.Errorf(codes.NotFound, "%q", req.Name)
78 }
79 return &pb.Item{Name: req.Name, Value: val}, nil
80 }
81
82 func (s *intStoreServer) ListItems(req *pb.ListItemsRequest, ss pb.IntStore_ListItemsServer) error {
83 for name, val := range s.items {
84 if val > req.GreaterThan {
85 if err := ss.Send(&pb.Item{Name: name, Value: val}); err != nil {
86 return err
87 }
88 }
89 }
90 return nil
91 }
92
93 func (s *intStoreServer) SetStream(ss pb.IntStore_SetStreamServer) error {
94 n := 0
95 for {
96 item, err := ss.Recv()
97 if err == io.EOF {
98 break
99 }
100 if err != nil {
101 return err
102 }
103 s.setItem(item)
104 n++
105 }
106 return ss.SendAndClose(&pb.Summary{Count: int32(n)})
107 }
108
109 func (s *intStoreServer) StreamChat(ss pb.IntStore_StreamChatServer) error {
110 for {
111 item, err := ss.Recv()
112 if err == io.EOF {
113 break
114 }
115 if err != nil {
116 return err
117 }
118 if err := ss.Send(item); err != nil {
119 return err
120 }
121 }
122 return nil
123 }
124
View as plain text