...

Source file src/cloud.google.com/go/storage/reader_test.go

Documentation: cloud.google.com/go/storage

     1  // Copyright 2018 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package storage
    16  
    17  import (
    18  	"bytes"
    19  	"compress/gzip"
    20  	"context"
    21  	"errors"
    22  	"fmt"
    23  	"io"
    24  	"io/ioutil"
    25  	"net/http"
    26  	"net/http/httptest"
    27  	"net/url"
    28  	"strconv"
    29  	"strings"
    30  	"testing"
    31  
    32  	"google.golang.org/api/option"
    33  )
    34  
// readData is the fixed object payload served by handleRangeRead and used
// as the expected content by the reader tests in this file.
const readData = "0123456789"
    36  
    37  func TestRangeReader(t *testing.T) {
    38  	ctx := context.Background()
    39  	hc, close := newTestServer(handleRangeRead)
    40  	defer close()
    41  
    42  	multiReaderTest(ctx, t, func(t *testing.T, c *Client) {
    43  		obj := c.Bucket("b").Object("o")
    44  		for _, test := range []struct {
    45  			offset, length int64
    46  			want           string
    47  		}{
    48  			{0, -1, readData},
    49  			{0, 10, readData},
    50  			{0, 5, readData[:5]},
    51  			{1, 3, readData[1:4]},
    52  			{6, -1, readData[6:]},
    53  			{4, 20, readData[4:]},
    54  			{-20, -1, readData},
    55  			{-6, -1, readData[4:]},
    56  		} {
    57  			r, err := obj.NewRangeReader(ctx, test.offset, test.length)
    58  			if err != nil {
    59  				t.Errorf("%d/%d: %v", test.offset, test.length, err)
    60  				continue
    61  			}
    62  			gotb, err := ioutil.ReadAll(r)
    63  			if err != nil {
    64  				t.Errorf("%d/%d: %v", test.offset, test.length, err)
    65  				continue
    66  			}
    67  			if got := string(gotb); got != test.want {
    68  				t.Errorf("%d/%d: got %q, want %q", test.offset, test.length, got, test.want)
    69  			}
    70  		}
    71  	}, option.WithHTTPClient(hc))
    72  }
    73  
    74  func handleRangeRead(w http.ResponseWriter, r *http.Request) {
    75  	rh := strings.TrimSpace(r.Header.Get("Range"))
    76  	data := readData
    77  	var from, to int
    78  	if rh == "" {
    79  		from = 0
    80  		to = len(data)
    81  	} else {
    82  		// assume "bytes=N-", "bytes=-N" or "bytes=N-M"
    83  		var err error
    84  		i := strings.IndexRune(rh, '=')
    85  		j := strings.IndexRune(rh, '-')
    86  		hasPositiveStartOffset := i+1 != j
    87  		if hasPositiveStartOffset { // The case of "bytes=N-"
    88  			from, err = strconv.Atoi(rh[i+1 : j])
    89  		} else { // The case of "bytes=-N"
    90  			from, err = strconv.Atoi(rh[i+1:])
    91  			from += len(data)
    92  			if from < 0 {
    93  				from = 0
    94  			}
    95  		}
    96  		if err != nil {
    97  			w.WriteHeader(500)
    98  			return
    99  		}
   100  		to = len(data)
   101  		if hasPositiveStartOffset && j+1 < len(rh) { // The case of "bytes=N-M"
   102  			to, err = strconv.Atoi(rh[j+1:])
   103  			if err != nil {
   104  				w.WriteHeader(500)
   105  				return
   106  			}
   107  			to++ // Range header is inclusive, Go slice is exclusive
   108  		}
   109  		if from >= len(data) && to != from {
   110  			w.WriteHeader(416)
   111  			return
   112  		}
   113  		if from > len(data) {
   114  			from = len(data)
   115  		}
   116  		if to > len(data) {
   117  			to = len(data)
   118  		}
   119  	}
   120  	data = data[from:to]
   121  	if data != readData {
   122  		w.Header().Set("Content-Range", fmt.Sprintf("bytes %d-%d/%d", from, to-1, len(readData)))
   123  		w.WriteHeader(http.StatusPartialContent)
   124  	}
   125  	if _, err := w.Write([]byte(data)); err != nil {
   126  		panic(err)
   127  	}
   128  }
   129  
   130  type http2Error string
   131  
   132  func (h http2Error) Error() string {
   133  	return string(h)
   134  }
   135  
   136  // TestRangeReaderRetry tests Reader resumption logic. It ensures that offset
   137  // and seen bytes are handled correctly so that data is not corrupted.
   138  // This tests only works for the HTTP Reader.
   139  // TODO: Design a similar test for gRPC.
   140  func TestRangeReaderRetry(t *testing.T) {
   141  	internalErr := http2Error("blah blah INTERNAL_ERROR")
   142  	goawayErr := http2Error("http2: server sent GOAWAY and closed the connection; LastStreamID=15, ErrCode=NO_ERROR, debug=\"load_shed\"")
   143  	readBytes := []byte(readData)
   144  	hc, close := newTestServer(handleRangeRead)
   145  	defer close()
   146  	ctx := context.Background()
   147  
   148  	multiReaderTest(ctx, t, func(t *testing.T, c *Client) {
   149  		obj := c.Bucket("b").Object("o")
   150  		for i, test := range []struct {
   151  			offset, length int64
   152  			bodies         []fakeReadCloser
   153  			want           string
   154  		}{
   155  			{
   156  				offset: 0,
   157  				length: -1,
   158  				bodies: []fakeReadCloser{
   159  					{data: readBytes, counts: []int{10}, err: io.EOF},
   160  				},
   161  				want: readData,
   162  			},
   163  			{
   164  				offset: 0,
   165  				length: -1,
   166  				bodies: []fakeReadCloser{
   167  					{data: readBytes, counts: []int{3}, err: internalErr},
   168  					{data: readBytes[3:], counts: []int{5, 2}, err: io.EOF},
   169  				},
   170  				want: readData,
   171  			},
   172  			{
   173  				offset: 0,
   174  				length: -1,
   175  				bodies: []fakeReadCloser{
   176  					{data: readBytes, counts: []int{5}, err: internalErr},
   177  					{data: readBytes[5:], counts: []int{1, 3}, err: goawayErr},
   178  					{data: readBytes[9:], counts: []int{1}, err: io.EOF},
   179  				},
   180  				want: readData,
   181  			},
   182  			{
   183  				offset: 0,
   184  				length: 5,
   185  				bodies: []fakeReadCloser{
   186  					{data: readBytes, counts: []int{3}, err: internalErr},
   187  					{data: readBytes[3:], counts: []int{2}, err: io.EOF},
   188  				},
   189  				want: readData[:5],
   190  			},
   191  			{
   192  				offset: 0,
   193  				length: 5,
   194  				bodies: []fakeReadCloser{
   195  					{data: readBytes, counts: []int{3}, err: goawayErr},
   196  					{data: readBytes[3:], counts: []int{2}, err: io.EOF},
   197  				},
   198  				want: readData[:5],
   199  			},
   200  			{
   201  				offset: 1,
   202  				length: 5,
   203  				bodies: []fakeReadCloser{
   204  					{data: readBytes, counts: []int{3}, err: internalErr},
   205  					{data: readBytes[3:], counts: []int{2}, err: io.EOF},
   206  				},
   207  				want: readData[:5],
   208  			},
   209  			{
   210  				offset: 1,
   211  				length: 3,
   212  				bodies: []fakeReadCloser{
   213  					{data: readBytes[1:], counts: []int{1}, err: internalErr},
   214  					{data: readBytes[2:], counts: []int{2}, err: io.EOF},
   215  				},
   216  				want: readData[1:4],
   217  			},
   218  			{
   219  				offset: 4,
   220  				length: -1,
   221  				bodies: []fakeReadCloser{
   222  					{data: readBytes[4:], counts: []int{1}, err: internalErr},
   223  					{data: readBytes[5:], counts: []int{4}, err: internalErr},
   224  					{data: readBytes[9:], counts: []int{1}, err: io.EOF},
   225  				},
   226  				want: readData[4:],
   227  			},
   228  			{
   229  				offset: -4,
   230  				length: -1,
   231  				bodies: []fakeReadCloser{
   232  					{data: readBytes[6:], counts: []int{1}, err: internalErr},
   233  					{data: readBytes[7:], counts: []int{3}, err: io.EOF},
   234  				},
   235  				want: readData[6:],
   236  			},
   237  		} {
   238  			r, err := obj.NewRangeReader(ctx, test.offset, test.length)
   239  			if err != nil {
   240  				t.Errorf("#%d: %v", i, err)
   241  				continue
   242  			}
   243  			b := 0
   244  			r.reader = &httpReader{
   245  				body: &test.bodies[0],
   246  				reopen: func(int64) (*http.Response, error) {
   247  					b++
   248  					return &http.Response{Body: &test.bodies[b]}, nil
   249  				},
   250  			}
   251  			buf := make([]byte, len(readData)/2)
   252  			var gotb []byte
   253  			for {
   254  				n, err := r.Read(buf)
   255  				gotb = append(gotb, buf[:n]...)
   256  				if err == io.EOF {
   257  					break
   258  				}
   259  				if err != nil {
   260  					t.Fatalf("#%d: %v", i, err)
   261  				}
   262  			}
   263  			if err != nil {
   264  				t.Errorf("#%d: %v", i, err)
   265  				continue
   266  			}
   267  			if got := string(gotb); got != test.want {
   268  				t.Errorf("#%d: got %q, want %q", i, got, test.want)
   269  			}
   270  			if r.Attrs.Size != int64(len(readData)) {
   271  				t.Errorf("#%d: got Attrs.Size=%q, want %q", i, r.Attrs.Size, len(readData))
   272  			}
   273  			wantOffset := test.offset
   274  			if wantOffset < 0 {
   275  				wantOffset += int64(len(readData))
   276  				if wantOffset < 0 {
   277  					wantOffset = 0
   278  				}
   279  			}
   280  			if got := r.Attrs.StartOffset; got != wantOffset {
   281  				t.Errorf("#%d: got Attrs.Offset=%q, want %q", i, got, wantOffset)
   282  			}
   283  		}
   284  		r, err := obj.NewRangeReader(ctx, -100, 10)
   285  		if err == nil {
   286  			t.Fatal("Expected a non-nil error with negative offset and positive length")
   287  		} else if want := "storage: invalid offset"; !strings.HasPrefix(err.Error(), want) {
   288  			t.Errorf("Error mismatch\nGot:  %q\nWant prefix: %q\n", err.Error(), want)
   289  		}
   290  		if r != nil {
   291  			t.Errorf("Expected nil reader")
   292  		}
   293  	}, option.WithHTTPClient(hc))
   294  }
   295  
   296  type fakeReadCloser struct {
   297  	data   []byte
   298  	counts []int // how much of data to deliver on each read
   299  	err    error // error to return with last count
   300  
   301  	d int // current position in data
   302  	c int // current position in counts
   303  }
   304  
   305  func (f *fakeReadCloser) Close() error {
   306  	return nil
   307  }
   308  
   309  func (f *fakeReadCloser) Read(buf []byte) (int, error) {
   310  	i := f.c
   311  	n := 0
   312  	if i < len(f.counts) {
   313  		n = f.counts[i]
   314  	}
   315  	var err error
   316  	if i >= len(f.counts)-1 {
   317  		err = f.err
   318  	}
   319  	copy(buf, f.data[f.d:f.d+n])
   320  	if len(buf) < n {
   321  		n = len(buf)
   322  		f.counts[i] -= n
   323  		err = nil
   324  	} else {
   325  		f.c++
   326  	}
   327  	f.d += n
   328  	return n, err
   329  }
   330  
   331  func TestFakeReadCloser(t *testing.T) {
   332  	e := errors.New("")
   333  	f := &fakeReadCloser{
   334  		data:   []byte(readData),
   335  		counts: []int{1, 2, 3},
   336  		err:    e,
   337  	}
   338  	wants := []string{"0", "12", "345"}
   339  	buf := make([]byte, 10)
   340  	for i := 0; i < 3; i++ {
   341  		n, err := f.Read(buf)
   342  		if got, want := n, f.counts[i]; got != want {
   343  			t.Fatalf("i=%d: got %d, want %d", i, got, want)
   344  		}
   345  		var wantErr error
   346  		if i == 2 {
   347  			wantErr = e
   348  		}
   349  		if err != wantErr {
   350  			t.Fatalf("i=%d: got error %v, want %v", i, err, wantErr)
   351  		}
   352  		if got, want := string(buf[:n]), wants[i]; got != want {
   353  			t.Fatalf("i=%d: got %q, want %q", i, got, want)
   354  		}
   355  	}
   356  }
   357  
   358  func TestContentEncodingGzipWithReader(t *testing.T) {
   359  	bucketName := "my-bucket"
   360  	objectName := "gzip-test"
   361  	getAttrsURL := fmt.Sprintf("/b/%s/o/%s?alt=json&prettyPrint=false&projection=full", bucketName, objectName)
   362  	downloadObjectXMLurl := fmt.Sprintf("/%s/%s", bucketName, objectName)
   363  	downloadObjectJSONurl := fmt.Sprintf("/b/%s/o/%s?alt=media&prettyPrint=false&projection=full", bucketName, objectName)
   364  
   365  	original := bytes.Repeat([]byte("a"), 4<<10)
   366  	mockGCS := httptest.NewUnstartedServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
   367  		switch r.URL.String() {
   368  		case getAttrsURL:
   369  			fmt.Fprintf(w, `{
   370                              "bucket": "bucket", "name": "name", "contentEncoding": "gzip",
   371                              "contentLength": 43,
   372                              "contentType": "text/plain","timeCreated": "2020-04-10T16:08:58-07:00",
   373                              "updated": "2020-04-14T16:08:58-07:00"
   374                          }`)
   375  			return
   376  		case downloadObjectXMLurl, downloadObjectJSONurl:
   377  			// Serve back the file.
   378  			w.Header().Set("Content-Type", "text/plain")
   379  			w.Header().Set("Content-Encoding", "gzip")
   380  			w.Header().Set("Etag", `"c50e3e41c9bc9df34e84c94ce073f928"`)
   381  			w.Header().Set("X-Goog-Generation", "1587012235914578")
   382  			w.Header().Set("X-Goog-MetaGeneration", "2")
   383  			w.Header().Set("X-Goog-Stored-Content-Encoding", "gzip")
   384  			w.Header().Set("vary", "Accept-Encoding")
   385  			w.Header().Set("x-goog-stored-content-length", "43")
   386  			w.Header().Set("x-goog-hash", "crc32c=pYIWwQ==")
   387  			w.Header().Set("x-goog-hash", "md5=xQ4+Qcm8nfNOhMlM4HP5KA==")
   388  			w.Header().Set("x-goog-storage-class", "STANDARD")
   389  			gz := gzip.NewWriter(w)
   390  			gz.Write(original)
   391  			gz.Close()
   392  		default:
   393  			fmt.Fprintf(w, "unrecognized URL %s", r.URL)
   394  		}
   395  	}))
   396  	mockGCS.EnableHTTP2 = true
   397  	mockGCS.StartTLS()
   398  	defer mockGCS.Close()
   399  
   400  	ctx := context.Background()
   401  	hc := mockGCS.Client()
   402  	ux, _ := url.Parse(mockGCS.URL)
   403  	hc.Transport.(*http.Transport).TLSClientConfig.InsecureSkipVerify = true
   404  	wrt := &alwaysToTargetURLRoundTripper{
   405  		destURL: ux,
   406  		hc:      hc,
   407  	}
   408  
   409  	whc := &http.Client{Transport: wrt}
   410  
   411  	// 2. Different flavours of the read should all return the body.
   412  	readerCreators := []struct {
   413  		name   string
   414  		create func(ctx context.Context, obj *ObjectHandle) (*Reader, error)
   415  	}{
   416  		{
   417  			"NewReader", func(cxt context.Context, obj *ObjectHandle) (*Reader, error) {
   418  				return obj.NewReader(ctx)
   419  			},
   420  		},
   421  		{
   422  			"NewRangeReader(0, -1)",
   423  			func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
   424  				return obj.NewRangeReader(ctx, 0, -1)
   425  			},
   426  		},
   427  		{
   428  			"NewRangeReader(1kB, 2kB)",
   429  			func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
   430  				return obj.NewRangeReader(ctx, 1<<10, 2<<10)
   431  			},
   432  		},
   433  		{
   434  			"NewRangeReader(2kB, -1)",
   435  			func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
   436  				return obj.NewRangeReader(ctx, 2<<10, -1)
   437  			},
   438  		},
   439  		{
   440  			"NewRangeReader(2kB, 3kB)",
   441  			func(ctx context.Context, obj *ObjectHandle) (*Reader, error) {
   442  				return obj.NewRangeReader(ctx, 2<<10, 3<<10)
   443  			},
   444  		},
   445  	}
   446  
   447  	multiReaderTest(ctx, t, func(t *testing.T, c *Client) {
   448  		for _, tt := range readerCreators {
   449  			t.Run(tt.name, func(t *testing.T) {
   450  				obj := c.Bucket(bucketName).Object(objectName)
   451  				_, err := obj.Attrs(ctx)
   452  				if err != nil {
   453  					t.Fatal(err)
   454  				}
   455  				rd, err := tt.create(ctx, obj)
   456  				if err != nil {
   457  					t.Fatal(err)
   458  				}
   459  				defer rd.Close()
   460  
   461  				got, err := ioutil.ReadAll(rd)
   462  				if err != nil {
   463  					t.Fatal(err)
   464  				}
   465  				if g, w := got, original; !bytes.Equal(g, w) {
   466  					t.Fatalf("Response mismatch\nGot:\n%q\n\nWant:\n%q", g, w)
   467  				}
   468  			})
   469  		}
   470  	}, option.WithEndpoint(mockGCS.URL), option.WithoutAuthentication(), option.WithHTTPClient(whc))
   471  }
   472  
   473  // alwaysToTargetURLRoundTripper ensures that every single request
   474  // is routed to a target destination. Some requests within the storage
   475  // client by-pass using the provided HTTP client, hence this enforcemenet.
   476  type alwaysToTargetURLRoundTripper struct {
   477  	destURL *url.URL
   478  	hc      *http.Client
   479  }
   480  
   481  func (adrt *alwaysToTargetURLRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
   482  	req.URL.Host = adrt.destURL.Host
   483  	// Cloud Storage has full control over the response headers for their
   484  	// HTTP server but unfortunately we don't, so we have to prune
   485  	// the Range header to mimick GCS ignoring Range header:
   486  	// https://cloud.google.com/storage/docs/transcoding#range
   487  	delete(req.Header, "Range")
   488  	return adrt.hc.Do(req)
   489  }
   490  
   491  // multiTransportTest initializes fresh clients for each transport, then runs
   492  // given testing function using each transport-specific client, supplying the
   493  // test function with the sub-test instance, the context it was given, the name
   494  // of an existing bucket to use, a bucket name to use for bucket creation, and
   495  // the client to use.
   496  func multiReaderTest(ctx context.Context, t *testing.T, test func(*testing.T, *Client), opts ...option.ClientOption) {
   497  	jsonOpts := append(opts, WithJSONReads())
   498  	xmlOpts := append(opts, WithXMLReads())
   499  	jsonClient, err := NewClient(ctx, jsonOpts...)
   500  	if err != nil {
   501  		t.Fatal(err)
   502  	}
   503  	xmlClient, err := NewClient(ctx, xmlOpts...)
   504  	if err != nil {
   505  		t.Fatal(err)
   506  	}
   507  
   508  	clients := map[string]*Client{
   509  		"xmlReads":  xmlClient,
   510  		"jsonReads": jsonClient,
   511  	}
   512  
   513  	for transport, client := range clients {
   514  		t.Run(transport, func(t *testing.T) {
   515  			defer client.Close()
   516  
   517  			test(t, client)
   518  		})
   519  	}
   520  }
   521  

View as plain text