1
2
3
4
5 package bytestream
6
7 import (
8 "bytes"
9 "context"
10 "errors"
11 "fmt"
12 "io"
13 "log"
14 "net"
15 "testing"
16
17 "google.golang.org/api/transport/bytestream/internal"
18 "google.golang.org/grpc"
19 "google.golang.org/grpc/metadata"
20
21 pb "google.golang.org/genproto/googleapis/bytestream"
22 )
23
24 const testData = "0123456789"
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44 type grpcServer struct {
45 Addr string
46 l net.Listener
47 Gsrv *grpc.Server
48 }
49
50 type TestSetup struct {
51 ctx context.Context
52 rpcTest *grpcServer
53 server *internal.Server
54 client *Client
55 }
56
57 func TestClientRead(t *testing.T) {
58 testCases := []struct {
59 name string
60 input string
61 resourceName string
62 extraBufSize int
63 extraError error
64 want string
65 wantErr bool
66 wantEOF bool
67 }{
68 {
69 name: "test foo",
70 input: testData,
71 resourceName: "foo",
72 want: testData,
73 }, {
74 name: "test bar",
75 input: testData,
76 resourceName: "bar",
77 want: testData,
78 }, {
79 name: "test bar extraBufSize=1",
80 input: testData,
81 resourceName: "bar",
82 extraBufSize: 1,
83 want: testData,
84 wantEOF: true,
85 }, {
86 name: "test bar extraBufSize=2",
87 input: testData,
88 resourceName: "bar",
89 extraBufSize: 2,
90 want: testData,
91 wantEOF: true,
92 }, {
93 name: "empty resource name",
94 input: testData,
95 resourceName: "",
96 extraBufSize: 1,
97 wantErr: true,
98 }, {
99 name: "read after error returns error again",
100 input: testData,
101 resourceName: "does not matter",
102 extraBufSize: 1,
103 extraError: errors.New("some error"),
104 wantErr: true,
105 },
106 }
107
108 for _, tc := range testCases {
109 bufSize := len(tc.want) + tc.extraBufSize
110 if bufSize == 0 {
111 t.Errorf("%s: This is probably wrong. Read returning 0 bytes?", tc.name)
112 continue
113 }
114
115 setup := newTestSetup(tc.input)
116 r, err := setup.client.NewReader(setup.ctx, tc.resourceName)
117 if err != nil {
118 t.Errorf("%s: NewReader(%q): %v", tc.name, tc.resourceName, err)
119 continue
120 }
121 if tc.extraError != nil {
122 r.err = tc.extraError
123 }
124 buf := make([]byte, bufSize)
125 gotEOF := false
126 total := 0
127 for total < bufSize && err == nil {
128 var n int
129 n, err = r.Read(buf[total:])
130 total += n
131 }
132 if err == io.EOF {
133 gotEOF = true
134 err = nil
135 doubleCheckBuf := make([]byte, bufSize)
136 n2, err2 := r.Read(doubleCheckBuf)
137 if err2 != io.EOF {
138 t.Errorf("%s: read and got EOF, double-check: read %d bytes got err=%v", tc.name, n2, err2)
139 continue
140 }
141 }
142 setup.Close()
143
144 if gotErr := err != nil; tc.wantErr != gotErr {
145 t.Errorf("%s: read %d bytes, got err=%v, wantErr=%t", tc.name, total, err, tc.wantErr)
146 continue
147 }
148 if tc.wantEOF != gotEOF {
149 t.Errorf("%s: read %d bytes, gotEOF=%t, wantEOF=%t", tc.name, total, gotEOF, tc.wantEOF)
150 continue
151 }
152 if got := string(buf[:total]); got != tc.want {
153 t.Errorf("%s: read %q, want %q", tc.name, got, tc.want)
154 continue
155 }
156 }
157 }
158
159 func TestClientWrite(t *testing.T) {
160 testCases := []struct {
161 name string
162 resourceName string
163 data string
164 results []int
165 wantWriteErr bool
166 wantCloseErr bool
167 }{
168 {
169 name: "test foo",
170 resourceName: "foo",
171 data: testData,
172 results: []int{len(testData)},
173 }, {
174 name: "empty resource name",
175 resourceName: "",
176 data: testData,
177 results: []int{10},
178
179 wantCloseErr: true,
180 }, {
181 name: "test bar",
182 resourceName: "bar",
183 data: testData,
184 results: []int{len(testData)},
185 },
186 }
187
188 var setup *TestSetup
189
190 tcFor:
191 for _, tc := range testCases {
192 if setup != nil {
193 setup.Close()
194 }
195 setup = newTestSetup("")
196 buf := []byte(tc.data)
197 var ofs int
198 w, err := setup.client.NewWriter(setup.ctx, tc.resourceName)
199 if err != nil {
200 t.Errorf("%s: NewWriter(): %v", tc.name, err)
201 continue
202 }
203
204 for i := 0; i < len(tc.results); i++ {
205 if ofs >= len(tc.data) {
206 t.Errorf("%s [%d]: Attempting to write more than tc.input: ofs=%d len(buf)=%d",
207 tc.name, i, ofs, len(tc.data))
208 continue tcFor
209 }
210 n, err := w.Write(buf[ofs:])
211 ofs += n
212 if gotErr := err != nil; gotErr != tc.wantWriteErr {
213 t.Errorf("%s [%d]: Write() got n=%d err=%v, wantWriteErr=%t", tc.name, i, n, err, tc.wantWriteErr)
214 continue tcFor
215 } else if tc.wantWriteErr && i+1 < len(tc.results) {
216 t.Errorf("%s: wantWriteErr and got err after %d results, len(results)=%d is too long.", tc.name, i+1, len(tc.results))
217 continue tcFor
218 }
219 if n != tc.results[i] {
220 t.Errorf("%s [%d]: Write() wrote %d bytes, want %d bytes", tc.name, i, n, tc.results[i])
221 continue tcFor
222 }
223 }
224
225 err = w.Close()
226 if gotErr := err != nil; gotErr != tc.wantCloseErr {
227 t.Errorf("%s: Close() got err=%v, wantCloseErr=%t", tc.name, err, tc.wantCloseErr)
228 continue tcFor
229 }
230 }
231 setup.Close()
232 }
233
234 func TestClientRead_AfterSetupClose(t *testing.T) {
235 setup := newTestSetup("closed")
236 setup.Close()
237 _, err := setup.client.NewReader(setup.ctx, "should fail")
238 if err == nil {
239 t.Errorf("NewReader(%q): err=%v", "should fail", err)
240 }
241 }
242
243 func TestClientWrite_AfterSetupClose(t *testing.T) {
244 setup := newTestSetup("closed")
245 setup.Close()
246 _, err := setup.client.NewWriter(setup.ctx, "should fail")
247 if err == nil {
248 t.Fatalf("NewWriter(%q): err=%v", "should fail", err)
249 }
250 }
251
252 type UnsendableWriteClient struct {
253 closeAndRecvWriteResponse *pb.WriteResponse
254 closeAndRecvError error
255 }
256
257 func (w *UnsendableWriteClient) Send(*pb.WriteRequest) error {
258 if w.closeAndRecvError != nil {
259 return nil
260 }
261 return errors.New("UnsendableWriteClient.Send() fails unless closeAndRecvError is set")
262 }
263
264 func (w *UnsendableWriteClient) CloseAndRecv() (*pb.WriteResponse, error) {
265 if w.closeAndRecvError == nil {
266 log.Fatalf("UnsendableWriteClient.Close() when closeAndRecvError == nil.")
267 }
268 return w.closeAndRecvWriteResponse, w.closeAndRecvError
269 }
270
271 func (w *UnsendableWriteClient) Context() context.Context {
272 log.Fatalf("UnsendableWriteClient.Context() should never be called")
273 return context.Background()
274 }
275 func (w *UnsendableWriteClient) CloseSend() error {
276 return errors.New("UnsendableWriteClient.CloseSend() should never be called")
277 }
278 func (w *UnsendableWriteClient) Header() (metadata.MD, error) {
279 log.Fatalf("UnsendableWriteClient.Header() should never be called")
280 return metadata.MD{}, nil
281 }
282 func (w *UnsendableWriteClient) Trailer() metadata.MD {
283 log.Fatalf("UnsendableWriteClient.Trailer() should never be called")
284 return metadata.MD{}
285 }
286 func (w *UnsendableWriteClient) SendMsg(m interface{}) error {
287 log.Fatalf("UnsendableWriteClient.SendMsg() should never be called")
288 return nil
289 }
290 func (w *UnsendableWriteClient) RecvMsg(m interface{}) error {
291 log.Fatalf("UnsendableWriteClient.RecvMsg() should never be called")
292 return nil
293 }
294
295 func TestClientWrite_WriteFails(t *testing.T) {
296 setup := newTestSetup("")
297 w, err := setup.client.NewWriter(setup.ctx, "")
298 if err != nil {
299 t.Fatalf("NewWriter(): %v", err)
300 }
301 defer setup.Close()
302 w.writeClient = &UnsendableWriteClient{}
303 _, err = w.Write([]byte(testData))
304 if err == nil {
305 t.Errorf("Write() should fail")
306 }
307 }
308
309 func TestClientWrite_CloseAndRecvFails(t *testing.T) {
310 setup := newTestSetup("")
311 w, err := setup.client.NewWriter(setup.ctx, "CloseAndRecvFails")
312 if err != nil {
313 t.Fatalf("NewWriter(): %v", err)
314 }
315 defer setup.Close()
316 n, err := w.Write([]byte(testData))
317 if err != nil {
318 t.Errorf("Write() failed: %v", err)
319 return
320 }
321 if n != len(testData) {
322 t.Errorf("Write() got n=%d, want n=%d", n, len(testData))
323 return
324 }
325 w.writeClient = &UnsendableWriteClient{
326 closeAndRecvError: errors.New("CloseAndRecv() must fail"),
327 }
328 if err = w.Close(); err == nil {
329 t.Errorf("Close() should fail")
330 return
331 }
332 }
333
334 type TestWriteHandler struct {
335 buf bytes.Buffer
336 name string
337 }
338
339 func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
340 if w.name == "" {
341 w.name = name
342 } else if w.name != name {
343 return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name)
344 }
345
346 return &w.buf, nil
347 }
348
349 func (w *TestWriteHandler) Close(ctx context.Context, name string) error {
350 w.name = ""
351 w.buf.Reset()
352 return nil
353 }
354
355 type TestReadHandler struct {
356 buf string
357 name string
358 }
359
360
361 func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
362 if r.name == "" {
363 r.name = name
364 } else if r.name != name {
365 return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name)
366 }
367 return bytes.NewReader([]byte(r.buf)), nil
368 }
369
370
371 func (r *TestReadHandler) Close(ctx context.Context, name string) error {
372 return nil
373 }
374
375
376
377 func newGRPCServer() (*grpcServer, error) {
378 l, err := net.Listen("tcp", "127.0.0.1:0")
379 if err != nil {
380 return nil, err
381 }
382 s := &grpcServer{
383 Addr: l.Addr().String(),
384 l: l,
385 Gsrv: grpc.NewServer(),
386 }
387 return s, nil
388 }
389
390
391
392 func (s *grpcServer) Start() {
393 go s.Gsrv.Serve(s.l)
394 }
395
396
397 func (s *grpcServer) Close() {
398 s.Gsrv.Stop()
399 s.l.Close()
400 }
401
402 func newTestSetup(input string) *TestSetup {
403 testSetup := &TestSetup{
404 ctx: context.Background(),
405 }
406 testReadHandler := &TestReadHandler{
407 buf: input,
408 }
409 var err error
410 if testSetup.rpcTest, err = newGRPCServer(); err != nil {
411 log.Fatalf("newGRPCServer: %v", err)
412 }
413 if testSetup.server, err = internal.NewServer(testSetup.rpcTest.Gsrv, testReadHandler, &TestWriteHandler{}); err != nil {
414 log.Fatalf("internal.NewServer: %v", err)
415 }
416 testSetup.rpcTest.Start()
417
418 conn, err := grpc.Dial(testSetup.rpcTest.Addr, grpc.WithInsecure())
419 if err != nil {
420 log.Fatalf("grpc.Dial: %v", err)
421 }
422 testSetup.client = NewClient(conn, grpc.FailFast(true))
423 return testSetup
424 }
425
426 func (testSetup *TestSetup) Close() {
427 testSetup.client.Close()
428 testSetup.rpcTest.Close()
429 }
430
View as plain text