1
2
3
4
5
6
7
8 package bytestream
9
10
11
12
13 import (
14 "context"
15 "fmt"
16 "math/rand"
17 "time"
18
19 "google.golang.org/grpc"
20
21 pb "google.golang.org/genproto/googleapis/bytestream"
22 )
23
const (
	// MaxBufSize caps the number of bytes Writer.Write places in a single
	// WriteRequest (2 MiB).
	MaxBufSize = 2 << 20

	// backoffBase is the first delay Reader.Read waits after the server
	// sends an empty response.
	backoffBase = 10 * time.Millisecond

	// backoffMax is the upper bound on any single backoff delay in
	// Reader.Read.
	backoffMax = time.Second

	// maxTries bounds how many times one Reader.Read call receives from the
	// stream while the responses carry no data.
	maxTries = 5
)
31
32
33 type Client struct {
34 client pb.ByteStreamClient
35 options []grpc.CallOption
36 conn *grpc.ClientConn
37 }
38
39
40 func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
41 return &Client{
42 client: pb.NewByteStreamClient(cc),
43 options: options,
44 conn: cc,
45 }
46 }
47
48
49 type Reader struct {
50 ctx context.Context
51 c *Client
52 readClient pb.ByteStream_ReadClient
53 resourceName string
54 err error
55 buf []byte
56 }
57
58
59 func (r *Reader) ResourceName() string {
60 return r.resourceName
61 }
62
63
64
65 func (r *Reader) Read(p []byte) (int, error) {
66 if r.err != nil {
67 return 0, r.err
68 }
69
70 var backoffDelay time.Duration
71 for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
72
73 resp, err := r.readClient.Recv()
74 if err != nil {
75 r.err = err
76 return 0, err
77 }
78 r.buf = resp.Data
79 if len(r.buf) != 0 {
80 break
81 }
82
83
84 if backoffDelay < backoffBase {
85 backoffDelay = backoffBase
86 } else {
87 backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
88 }
89 if backoffDelay > backoffMax {
90 backoffDelay = backoffMax
91 }
92 t := time.NewTimer(backoffDelay)
93 select {
94 case <-t.C:
95 case <-r.ctx.Done():
96 t.Stop()
97 if err := r.ctx.Err(); err != nil {
98 r.err = err
99 }
100 return 0, r.err
101 }
102 }
103
104
105 n := copy(p, r.buf)
106 r.buf = r.buf[n:]
107 return n, nil
108 }
109
110
111 func (r *Reader) Close() error {
112 if r.readClient == nil {
113 return nil
114 }
115 err := r.readClient.CloseSend()
116 r.readClient = nil
117 return err
118 }
119
120
121 func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
122 return c.NewReaderAt(ctx, resourceName, 0)
123 }
124
125
126 func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
127
128 readClient, err := c.client.Read(ctx, &pb.ReadRequest{
129 ResourceName: resourceName,
130 ReadOffset: offset,
131 }, c.options...)
132 if err != nil {
133 return nil, err
134 }
135
136 return &Reader{
137 ctx: ctx,
138 c: c,
139 resourceName: resourceName,
140 readClient: readClient,
141 }, nil
142 }
143
144
145 type Writer struct {
146 ctx context.Context
147 writeClient pb.ByteStream_WriteClient
148 resourceName string
149 offset int64
150 err error
151 }
152
153
154 func (w *Writer) ResourceName() string {
155 return w.resourceName
156 }
157
158
159 func (w *Writer) Write(p []byte) (int, error) {
160 if w.err != nil {
161 return 0, w.err
162 }
163
164 n := 0
165 for n < len(p) {
166 bufSize := len(p) - n
167 if bufSize > MaxBufSize {
168 bufSize = MaxBufSize
169 }
170 r := pb.WriteRequest{
171 WriteOffset: w.offset,
172 FinishWrite: false,
173 Data: p[n : n+bufSize],
174 }
175
176 if w.offset == 0 {
177 r.ResourceName = w.resourceName
178 }
179 err := w.writeClient.Send(&r)
180 if err != nil {
181 w.err = err
182 return n, err
183 }
184 w.offset += int64(bufSize)
185 n += bufSize
186 }
187 return n, nil
188 }
189
190
191 func (w *Writer) Close() error {
192 err := w.writeClient.Send(&pb.WriteRequest{
193 ResourceName: w.resourceName,
194 WriteOffset: w.offset,
195 FinishWrite: true,
196 Data: nil,
197 })
198 if err != nil {
199 w.err = err
200 return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err)
201 }
202 resp, err := w.writeClient.CloseAndRecv()
203 if err != nil {
204 w.err = err
205 return fmt.Errorf("CloseAndRecv: %v", err)
206 }
207 if resp == nil {
208 err = fmt.Errorf("expected a response on close, got %v", resp)
209 } else if resp.CommittedSize != w.offset {
210 err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
211 }
212 w.err = err
213 return err
214 }
215
216
217
218
219
220
221
222
223
224 func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
225 wc, err := c.client.Write(ctx, c.options...)
226 if err != nil {
227 return nil, err
228 }
229 return &Writer{
230 ctx: ctx,
231 writeClient: wc,
232 resourceName: resourceName,
233 }, nil
234 }
235
236
237
238 func (c *Client) Close() {
239 c.conn.Close()
240 }
241
View as plain text