1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package storage
16
17 import (
18 "bytes"
19 "compress/gzip"
20 "context"
21 "errors"
22 "fmt"
23 "io"
24 "io/ioutil"
25 "net/http"
26 "net/http/httptest"
27 "net/url"
28 "strconv"
29 "strings"
30 "testing"
31
32 "google.golang.org/api/option"
33 )
34
// readData is the object content served by handleRangeRead and the reference
// value the reader tests in this file slice their expected results from.
const readData = "0123456789"
36
37 func TestRangeReader(t *testing.T) {
38 ctx := context.Background()
39 hc, close := newTestServer(handleRangeRead)
40 defer close()
41
42 multiReaderTest(ctx, t, func(t *testing.T, c *Client) {
43 obj := c.Bucket("b").Object("o")
44 for _, test := range []struct {
45 offset, length int64
46 want string
47 }{
48 {0, -1, readData},
49 {0, 10, readData},
50 {0, 5, readData[:5]},
51 {1, 3, readData[1:4]},
52 {6, -1, readData[6:]},
53 {4, 20, readData[4:]},
54 {-20, -1, readData},
55 {-6, -1, readData[4:]},
56 } {
57 r, err := obj.NewRangeReader(ctx, test.offset, test.length)
58 if err != nil {
59 t.Errorf("%d/%d: %v", test.offset, test.length, err)
60 continue
61 }
62 gotb, err := ioutil.ReadAll(r)
63 if err != nil {
64 t.Errorf("%d/%d: %v", test.offset, test.length, err)
65 continue
66 }
67 if got := string(gotb); got != test.want {
68 t.Errorf("%d/%d: got %q, want %q", test.offset, test.length, got, test.want)
69 }
70 }
71 }, option.WithHTTPClient(hc))
72 }
73
74 func handleRangeRead(w http.ResponseWriter, r *http.Request) {
75 rh := strings.TrimSpace(r.Header.Get("Range"))
76 data := readData
77 var from, to int
78 if rh == "" {
79 from = 0
80 to = len(data)
81 } else {
82
83 var err error
84 i := strings.IndexRune(rh, '=')
85 j := strings.IndexRune(rh, '-')
86 hasPositiveStartOffset := i+1 != j
87 if hasPositiveStartOffset {
88 from, err = strconv.Atoi(rh[i+1 : j])
89 } else {
90 from, err = strconv.Atoi(rh[i+1:])
91 from += len(data)
92 if from < 0 {
93 from = 0
94 }
95 }
96 if err != nil {
97 w.WriteHeader(500)
98 return
99 }
100 to = len(data)
101 if hasPositiveStartOffset && j+1 < len(rh) {
102 to, err = strconv.Atoi(rh[j+1:])
103 if err != nil {
104 w.WriteHeader(500)
105 return
106 }
107 to++
108 }
109 if from >= len(data) && to != from {
110 w.WriteHeader(416)
111 return
112 }
113 if from > len(data) {
114 from = len(data)
115 }
116 if to > len(data) {
117 to = len(data)
118 }
119 }
120 data = data[from:to]
121 if data != readData {
122 w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", from, to-1, len(readData)))
123 w.WriteHeader(http.StatusPartialContent)
124 }
125 if _, err := w.Write([]byte(data)); err != nil {
126 panic(err)
127 }
128 }
129
// http2Error is a string-backed error type. TestRangeReaderRetry uses it to
// build errors whose text looks like HTTP/2 stream failures.
type http2Error string

// Error implements the error interface by returning the underlying string.
func (e http2Error) Error() string { return string(e) }
135
136
137
138
139
140 func TestRangeReaderRetry(t *testing.T) {
141 internalErr := http2Error("blah blah INTERNAL_ERROR")
142 goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"")
143 readBytes := []byte(readData)
144 hc, close := newTestServer(handleRangeRead)
145 defer close()
146 ctx := context.Background()
147
148 multiReaderTest(ctx, t, func(t *testing.T, c *Client) {
149 obj := c.Bucket("b").Object("o")
150 for i, test := range []struct {
151 offset, length int64
152 bodies []fakeReadCloser
153 want string
154 }{
155 {
156 offset: 0,
157 length: -1,
158 bodies: []fakeReadCloser{
159 {data: readBytes, counts: []int{10}, err: io.EOF},
160 },
161 want: readData,
162 },
163 {
164 offset: 0,
165 length: -1,
166 bodies: []fakeReadCloser{
167 {data: readBytes, counts: []int{3}, err: internalErr},
168 {data: readBytes[3:], counts: []int{5, 2}, err: io.EOF},
169 },
170 want: readData,
171 },
172 {
173 offset: 0,
174 length: -1,
175 bodies: []fakeReadCloser{
176 {data: readBytes, counts: []int{5}, err: internalErr},
177 {data: readBytes[5:], counts: []int{1, 3}, err: goawayErr},
178 {data: readBytes[9:], counts: []int{1}, err: io.EOF},
179 },
180 want: readData,
181 },
182 {
183 offset: 0,
184 length: 5,
185 bodies: []fakeReadCloser{
186 {data: readBytes, counts: []int{3}, err: internalErr},
187 {data: readBytes[3:], counts: []int{2}, err: io.EOF},
188 },
189 want: readData[:5],
190 },
191 {
192 offset: 0,
193 length: 5,
194 bodies: []fakeReadCloser{
195 {data: readBytes, counts: []int{3}, err: goawayErr},
196 {data: readBytes[3:], counts: []int{2}, err: io.EOF},
197 },
198 want: readData[:5],
199 },
200 {
201 offset: 1,
202 length: 5,
203 bodies: []fakeReadCloser{
204 {data: readBytes, counts: []int{3}, err: internalErr},
205 {data: readBytes[3:], counts: []int{2}, err: io.EOF},
206 },
207 want: readData[:5],
208 },
209 {
210 offset: 1,
211 length: 3,
212 bodies: []fakeReadCloser{
213 {data: readBytes[1:], counts: []int{1}, err: internalErr},
214 {data: readBytes[2:], counts: []int{2}, err: io.EOF},
215 },
216 want: readData[1:4],
217 },
218 {
219 offset: 4,
220 length: -1,
221 bodies: []fakeReadCloser{
222 {data: readBytes[4:], counts: []int{1}, err: internalErr},
223 {data: readBytes[5:], counts: []int{4}, err: internalErr},
224 {data: readBytes[9:], counts: []int{1}, err: io.EOF},
225 },
226 want: readData[4:],
227 },
228 {
229 offset: -4,
230 length: -1,
231 bodies: []fakeReadCloser{
232 {data: readBytes[6:], counts: []int{1}, err: internalErr},
233 {data: readBytes[7:], counts: []int{3}, err: io.EOF},
234 },
235 want: readData[6:],
236 },
237 } {
238 r, err := obj.NewRangeReader(ctx, test.offset, test.length)
239 if err != nil {
240 t.Errorf("#%d: %v", i, err)
241 continue
242 }
243 b := 0
244 r.reader = &httpReader{
245 body: &test.bodies[0],
246 reopen: func(int64) (*http.Response, error) {
247 b++
248 return &http.Response{Body: &test.bodies[b]}, nil
249 },
250 }
251 buf := make([]byte, len(readData)/2)
252 var gotb []byte
253 for {
254 n, err := r.Read(buf)
255 gotb = append(gotb, buf[:n]...)
256 if err == io.EOF {
257 break
258 }
259 if err != nil {
260 t.Fatalf("#%d: %v", i, err)
261 }
262 }
263 if err != nil {
264 t.Errorf("#%d: %v", i, err)
265 continue
266 }
267 if got := string(gotb); got != test.want {
268 t.Errorf("#%d: got %q, want %q", i, got, test.want)
269 }
270 if r.Attrs.Size != int64(len(readData)) {
271 t.Errorf("#%d: got Attrs.Size=%q, want %q", i, r.Attrs.Size, len(readData))
272 }
273 wantOffset := test.offset
274 if wantOffset < 0 {
275 wantOffset += int64(len(readData))
276 if wantOffset < 0 {
277 wantOffset = 0
278 }
279 }
280 if got := r.Attrs.StartOffset; got != wantOffset {
281 t.Errorf("#%d: got Attrs.Offset=%q, want %q", i, got, wantOffset)
282 }
283 }
284 r, err := obj.NewRangeReader(ctx, -100, 10)
285 if err == nil {
286 t.Fatal("Expected a non-nil error with negative offset and positive length")
287 } else if want := "storage: invalid offset"; !strings.HasPrefix(err.Error(), want) {
288 t.Errorf("Error mismatch\nGot: %q\nWant prefix: %q\n", err.Error(), want)
289 }
290 if r != nil {
291 t.Errorf("Expected nil reader")
292 }
293 }, option.WithHTTPClient(hc))
294 }
295
// fakeReadCloser is a scripted io.ReadCloser for tests. Successive Read calls
// hand out data in chunks whose sizes are given by counts; err is returned
// together with the last scripted chunk and on every Read after it.
type fakeReadCloser struct {
	data   []byte // bytes handed out by Read
	counts []int  // size of the chunk delivered by each scripted Read
	err    error  // error reported with the final chunk and afterwards

	d int // index into data of the next byte to deliver
	c int // index into counts of the chunk currently being delivered
}

// Close does nothing and always returns nil.
func (f *fakeReadCloser) Close() error { return nil }

// Read copies the current scripted chunk into buf and returns its size.
//
// When buf is too small for the chunk, Read fills buf, shrinks the remaining
// chunk size in counts by the amount delivered, and returns a nil error so
// the rest of the chunk arrives on later calls. Otherwise it advances to the
// next chunk, returning f.err if this was the last scripted chunk (or if the
// script was already exhausted, in which case zero bytes are returned).
func (f *fakeReadCloser) Read(buf []byte) (int, error) {
	step := f.c
	size := 0
	if step < len(f.counts) {
		size = f.counts[step]
	}
	copy(buf, f.data[f.d:f.d+size])

	if size > len(buf) {
		// Partial delivery: stay on this chunk and remember what is left.
		delivered := len(buf)
		f.counts[step] -= delivered
		f.d += delivered
		return delivered, nil
	}

	f.c++
	f.d += size
	if step >= len(f.counts)-1 {
		return size, f.err
	}
	return size, nil
}
330
331 func TestFakeReadCloser(t *testing.T) {
332 e := errors.New("")
333 f := &fakeReadCloser{
334 data: []byte(readData),
335 counts: []int{1, 2, 3},
336 err: e,
337 }
338 wants := []string{"0", "12", "345"}
339 buf := make([]byte, 10)
340 for i := 0; i < 3; i++ {
341 n, err := f.Read(buf)
342 if got, want := n, f.counts[i]; got != want {
343 t.Fatalf("i=%d: got %d, want %d", i, got, want)
344 }
345 var wantErr error
346 if i == 2 {
347 wantErr = e
348 }
349 if err != wantErr {
350 t.Fatalf("i=%d: got error %v, want %v", i, err, wantErr)
351 }
352 if got, want := string(buf[:n]), wants[i]; got != want {
353 t.Fatalf("i=%d: got %q, want %q", i, got, want)
354 }
355 }
356 }
357
358 func TestContentEncodingGzipWithReader(t *testing.T) {
359 bucketName := "my-bucket"
360 objectName := "gzip-test"
361 getAttrsURL := fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName)
362 downloadObjectXMLurl := fmt.Sprintf("/%s/%s", bucketName, objectName)
363 downloadObjectJSONurl := fmt.Sprintf("/b/%s/o/%s?alt=media&prettyPrint=false&projection=full", bucketName, objectName)
364
365 original := bytes.Repeat([]byte("a"), 4<<10)
366 mockGCS := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
367 switch r.URL.String() {
368 case getAttrsURL:
369 fmt.Fprintf(w, `{
370 "bucket": "bucket", "name": "name", "contentEncoding": "gzip",
371 "contentLength": 43,
372 "contentType": "text/plain","timeCreated": "2020-04-10T16:08:58-07:00",
373 "updated": "2020-04-14T16:08:58-07:00"
374 }`)
375 return
376 case downloadObjectXMLurl, downloadObjectJSONurl:
377
378 w.Header().Set("Content-Type", "text/plain")
379 w.Header().Set("Content-Encoding", "gzip")
380 w.Header().Set("Etag", `"c50e3e41c9bc9df34e84c94ce073f928"`)
381 w.Header().Set("X-Goog-Generation", "1587012235914578")
382 w.Header().Set("X-Goog-MetaGeneration", "2")
383 w.Header().Set("X-Goog-Stored-Content-Encoding", "gzip")
384 w.Header().Set("vary", "Accept-Encoding")
385 w.Header().Set("x-goog-stored-content-length", "43")
386 w.Header().Set("x-goog-hash", "crc32c=pYIWwQ==")
387 w.Header().Set("x-goog-hash", "md5=xQ4+Qcm8nfNOhMlM4HP5KA==")
388 w.Header().Set("x-goog-storage-class", "STANDARD")
389 gz := gzip.NewWriter(w)
390 gz.Write(original)
391 gz.Close()
392 default:
393 fmt.Fprintf(w, "unrecognized URL %s", r.URL)
394 }
395 }))
396 mockGCS.EnableHTTP2 = true
397 mockGCS.StartTLS()
398 defer mockGCS.Close()
399
400 ctx := context.Background()
401 hc := mockGCS.Client()
402 ux, _ := url.Parse(mockGCS.URL)
403 hc.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
404 wrt := &alwaysToTargetURLRoundTripper{
405 destURL: ux,
406 hc: hc,
407 }
408
409 whc := &http.Client{Transport: wrt}
410
411
412 readerCreators := []struct {
413 name string
414 create func(ctx context.Context, obj *ObjectHandle) (*Reader, error)
415 }{
416 {
417 "NewReader", func(cxt context.Context, obj *ObjectHandle) (*Reader, error) {
418 return obj.NewReader(ctx)
419 },
420 },
421 {
422 "NewRangeReader(0, -1)",
423 func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
424 return obj.NewRangeReader(ctx, 0, -1)
425 },
426 },
427 {
428 "NewRangeReader(1kB, 2kB)",
429 func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
430 return obj.NewRangeReader(ctx, 1<<10, 2<<10)
431 },
432 },
433 {
434 "NewRangeReader(2kB, -1)",
435 func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
436 return obj.NewRangeReader(ctx, 2<<10, -1)
437 },
438 },
439 {
440 "NewRangeReader(2kB, 3kB)",
441 func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
442 return obj.NewRangeReader(ctx, 2<<10, 3<<10)
443 },
444 },
445 }
446
447 multiReaderTest(ctx, t, func(t *testing.T, c *Client) {
448 for _, tt := range readerCreators {
449 t.Run(tt.name, func(t *testing.T) {
450 obj := c.Bucket(bucketName).Object(objectName)
451 _, err := obj.Attrs(ctx)
452 if err != nil {
453 t.Fatal(err)
454 }
455 rd, err := tt.create(ctx, obj)
456 if err != nil {
457 t.Fatal(err)
458 }
459 defer rd.Close()
460
461 got, err := ioutil.ReadAll(rd)
462 if err != nil {
463 t.Fatal(err)
464 }
465 if g, w := got, original; !bytes.Equal(g, w) {
466 t.Fatalf("Response mismatch\nGot:\n%q\n\nWant:\n%q", g, w)
467 }
468 })
469 }
470 }, option.WithEndpoint(mockGCS.URL), option.WithoutAuthentication(), option.WithHTTPClient(whc))
471 }
472
473
474
475
// alwaysToTargetURLRoundTripper is an http.RoundTripper for tests that sends
// every request to the host of destURL, whatever host the request originally
// named, and removes the Range header before sending.
type alwaysToTargetURLRoundTripper struct {
	destURL *url.URL     // its Host replaces the host of every request
	hc      *http.Client // client that actually performs the rewritten request
}

// RoundTrip rewrites a copy of req to target adrt.destURL.Host, drops its
// Range header, and executes it with adrt.hc, returning that client's
// response and error unchanged.
//
// The rewrite is applied to a clone because the http.RoundTripper contract
// says RoundTrip should not modify the caller's request.
func (adrt *alwaysToTargetURLRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	out := req.Clone(req.Context())
	out.URL.Host = adrt.destURL.Host
	// With the Range header gone, the server sees a request for the whole
	// object.
	out.Header.Del("Range")
	return adrt.hc.Do(out)
}
490
491
492
493
494
495
496 func multiReaderTest(ctx context.Context, t *testing.T, test func(*testing.T, *Client), opts ...option.ClientOption) {
497 jsonOpts := append(opts, WithJSONReads())
498 xmlOpts := append(opts, WithXMLReads())
499 jsonClient, err := NewClient(ctx, jsonOpts...)
500 if err != nil {
501 t.Fatal(err)
502 }
503 xmlClient, err := NewClient(ctx, xmlOpts...)
504 if err != nil {
505 t.Fatal(err)
506 }
507
508 clients := map[string]*Client{
509 "xmlReads": xmlClient,
510 "jsonReads": jsonClient,
511 }
512
513 for transport, client := range clients {
514 t.Run(transport, func(t *testing.T) {
515 defer client.Close()
516
517 test(t, client)
518 })
519 }
520 }
521
View as plain text