...

Source file src/google.golang.org/api/internal/gensupport/resumable_test.go

Documentation: google.golang.org/api/internal/gensupport

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package gensupport
     6  
     7  import (
     8  	"context"
     9  	"fmt"
    10  	"io"
    11  	"net/http"
    12  	"reflect"
    13  	"strings"
    14  	"testing"
    15  	"time"
    16  )
    17  
    18  type unexpectedReader struct{}
    19  
    20  func (unexpectedReader) Read([]byte) (int, error) {
    21  	return 0, fmt.Errorf("unexpected read in test")
    22  }
    23  
    24  // event is an expected request/response pair
    25  type event struct {
    26  	// the byte range header that should be present in a request.
    27  	byteRange string
    28  	// the http status code to send in response.
    29  	responseStatus int
    30  }
    31  
    32  // interruptibleTransport is configured with a canned set of requests/responses.
    33  // It records the incoming data, unless the corresponding event is configured to return
    34  // http.StatusServiceUnavailable.
    35  type interruptibleTransport struct {
    36  	events []event
    37  	buf    []byte
    38  	bodies bodyTracker
    39  }
    40  
    41  // bodyTracker keeps track of response bodies that have not been closed.
    42  type bodyTracker map[io.ReadCloser]struct{}
    43  
    44  func (bt bodyTracker) Add(body io.ReadCloser) {
    45  	bt[body] = struct{}{}
    46  }
    47  
    48  func (bt bodyTracker) Close(body io.ReadCloser) {
    49  	delete(bt, body)
    50  }
    51  
    52  type trackingCloser struct {
    53  	io.Reader
    54  	tracker bodyTracker
    55  }
    56  
    57  func (tc *trackingCloser) Close() error {
    58  	tc.tracker.Close(tc)
    59  	return nil
    60  }
    61  
    62  func (tc *trackingCloser) Open() {
    63  	tc.tracker.Add(tc)
    64  }
    65  
    66  func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
    67  	if len(t.events) == 0 {
    68  		panic("ran out of events, but got a request")
    69  	}
    70  	ev := t.events[0]
    71  	t.events = t.events[1:]
    72  	if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
    73  		return nil, fmt.Errorf("byte range: got %s; want %s", got, want)
    74  	}
    75  
    76  	if ev.responseStatus != http.StatusServiceUnavailable {
    77  		buf, err := io.ReadAll(req.Body)
    78  		if err != nil {
    79  			return nil, fmt.Errorf("error reading from request data: %v", err)
    80  		}
    81  		t.buf = append(t.buf, buf...)
    82  	}
    83  
    84  	tc := &trackingCloser{unexpectedReader{}, t.bodies}
    85  	tc.Open()
    86  	h := http.Header{}
    87  	status := ev.responseStatus
    88  
    89  	// Support "X-GUploader-No-308" like Google:
    90  	if status == 308 && req.Header.Get("X-GUploader-No-308") == "yes" {
    91  		status = 200
    92  		h.Set("X-Http-Status-Code-Override", "308")
    93  	}
    94  
    95  	res := &http.Response{
    96  		StatusCode: status,
    97  		Header:     h,
    98  		Body:       tc,
    99  	}
   100  	return res, nil
   101  }
   102  
   103  // progressRecorder records updates, and calls f for every invocation of ProgressUpdate.
   104  type progressRecorder struct {
   105  	updates []int64
   106  	f       func()
   107  }
   108  
   109  func (pr *progressRecorder) ProgressUpdate(current int64) {
   110  	pr.updates = append(pr.updates, current)
   111  	if pr.f != nil {
   112  		pr.f()
   113  	}
   114  }
   115  
   116  func TestInterruptedTransferChunks(t *testing.T) {
   117  	type testCase struct {
   118  		name         string
   119  		data         string
   120  		chunkSize    int
   121  		events       []event
   122  		wantProgress []int64
   123  	}
   124  
   125  	for _, tc := range []testCase{
   126  		{
   127  			name:      "large",
   128  			data:      strings.Repeat("a", 300),
   129  			chunkSize: 90,
   130  			events: []event{
   131  				{"bytes 0-89/*", http.StatusServiceUnavailable},
   132  				{"bytes 0-89/*", 308},
   133  				{"bytes 90-179/*", 308},
   134  				{"bytes 180-269/*", http.StatusServiceUnavailable},
   135  				{"bytes 180-269/*", 308},
   136  				{"bytes 270-299/300", 200},
   137  			},
   138  			wantProgress: []int64{90, 180, 270, 300},
   139  		},
   140  		{
   141  			name:      "small",
   142  			data:      strings.Repeat("a", 20),
   143  			chunkSize: 10,
   144  			events: []event{
   145  				{"bytes 0-9/*", http.StatusServiceUnavailable},
   146  				{"bytes 0-9/*", 308},
   147  				{"bytes 10-19/*", http.StatusServiceUnavailable},
   148  				{"bytes 10-19/*", 308},
   149  				// 0 byte final request demands a byte range with leading asterix.
   150  				{"bytes */20", http.StatusServiceUnavailable},
   151  				{"bytes */20", 200},
   152  			},
   153  			wantProgress: []int64{10, 20},
   154  		},
   155  	} {
   156  		t.Run(tc.name, func(t *testing.T) {
   157  			media := strings.NewReader(tc.data)
   158  
   159  			tr := &interruptibleTransport{
   160  				buf:    make([]byte, 0, len(tc.data)),
   161  				events: tc.events,
   162  				bodies: bodyTracker{},
   163  			}
   164  
   165  			pr := progressRecorder{}
   166  			rx := &ResumableUpload{
   167  				Client:    &http.Client{Transport: tr},
   168  				Media:     NewMediaBuffer(media, tc.chunkSize),
   169  				MediaType: "text/plain",
   170  				Callback:  pr.ProgressUpdate,
   171  			}
   172  
   173  			oldBackoff := backoff
   174  			backoff = func() Backoff { return new(NoPauseBackoff) }
   175  			defer func() { backoff = oldBackoff }()
   176  
   177  			res, err := rx.Upload(context.Background())
   178  			if err == nil {
   179  				res.Body.Close()
   180  			}
   181  			if err != nil || res == nil || res.StatusCode != http.StatusOK {
   182  				if res == nil {
   183  					t.Fatalf("Upload not successful, res=nil: %v", err)
   184  				} else {
   185  					t.Fatalf("Upload not successful, statusCode=%v, err=%v", res.StatusCode, err)
   186  				}
   187  			}
   188  			if !reflect.DeepEqual(tr.buf, []byte(tc.data)) {
   189  				t.Fatalf("transferred contents:\ngot %s\nwant %s", tr.buf, tc.data)
   190  			}
   191  
   192  			if !reflect.DeepEqual(pr.updates, tc.wantProgress) {
   193  				t.Fatalf("progress updates: got %v, want %v", pr.updates, tc.wantProgress)
   194  			}
   195  
   196  			if len(tr.events) > 0 {
   197  				t.Fatalf("did not observe all expected events.  leftover events: %v", tr.events)
   198  			}
   199  			if len(tr.bodies) > 0 {
   200  				t.Errorf("unclosed request bodies: %v", tr.bodies)
   201  			}
   202  		})
   203  	}
   204  }
   205  
   206  func TestCancelUploadFast(t *testing.T) {
   207  	const (
   208  		chunkSize = 90
   209  		mediaSize = 300
   210  	)
   211  	media := strings.NewReader(strings.Repeat("a", mediaSize))
   212  
   213  	tr := &interruptibleTransport{
   214  		buf: make([]byte, 0, mediaSize),
   215  	}
   216  
   217  	pr := progressRecorder{}
   218  	rx := &ResumableUpload{
   219  		Client:    &http.Client{Transport: tr},
   220  		Media:     NewMediaBuffer(media, chunkSize),
   221  		MediaType: "text/plain",
   222  		Callback:  pr.ProgressUpdate,
   223  	}
   224  
   225  	oldBackoff := backoff
   226  	backoff = func() Backoff { return new(NoPauseBackoff) }
   227  	defer func() { backoff = oldBackoff }()
   228  
   229  	ctx, cancelFunc := context.WithCancel(context.Background())
   230  	cancelFunc() // stop the upload that hasn't started yet
   231  	res, err := rx.Upload(ctx)
   232  	if err != context.Canceled {
   233  		t.Fatalf("Upload err: got: %v; want: context cancelled", err)
   234  	}
   235  	if res != nil {
   236  		t.Fatalf("Upload result: got: %v; want: nil", res)
   237  	}
   238  	if pr.updates != nil {
   239  		t.Errorf("progress updates: got %v; want: nil", pr.updates)
   240  	}
   241  }
   242  
   243  func TestCancelUploadBasic(t *testing.T) {
   244  	const (
   245  		chunkSize = 90
   246  		mediaSize = 300
   247  	)
   248  	media := strings.NewReader(strings.Repeat("a", mediaSize))
   249  
   250  	tr := &interruptibleTransport{
   251  		buf: make([]byte, 0, mediaSize),
   252  		events: []event{
   253  			{"bytes 0-89/*", http.StatusServiceUnavailable},
   254  			{"bytes 0-89/*", 308},
   255  			{"bytes 90-179/*", 308},
   256  			{"bytes 180-269/*", 308}, // Upload should be cancelled before this event.
   257  		},
   258  		bodies: bodyTracker{},
   259  	}
   260  
   261  	ctx, cancelFunc := context.WithCancel(context.Background())
   262  	numUpdates := 0
   263  
   264  	pr := progressRecorder{f: func() {
   265  		numUpdates++
   266  		if numUpdates >= 2 {
   267  			cancelFunc()
   268  		}
   269  	}}
   270  
   271  	rx := &ResumableUpload{
   272  		Client:    &http.Client{Transport: tr},
   273  		Media:     NewMediaBuffer(media, chunkSize),
   274  		MediaType: "text/plain",
   275  		Callback:  pr.ProgressUpdate,
   276  	}
   277  
   278  	oldBackoff := backoff
   279  	backoff = func() Backoff { return new(PauseOneSecond) }
   280  	defer func() { backoff = oldBackoff }()
   281  
   282  	res, err := rx.Upload(ctx)
   283  	if err != context.Canceled {
   284  		t.Fatalf("Upload err: got: %v; want: context cancelled", err)
   285  	}
   286  	if res != nil {
   287  		t.Fatalf("Upload result: got: %v; want: nil", res)
   288  	}
   289  	if got, want := tr.buf, []byte(strings.Repeat("a", chunkSize*2)); !reflect.DeepEqual(got, want) {
   290  		t.Fatalf("transferred contents:\ngot %s\nwant %s", got, want)
   291  	}
   292  	if got, want := pr.updates, []int64{chunkSize, chunkSize * 2}; !reflect.DeepEqual(got, want) {
   293  		t.Fatalf("progress updates: got %v; want: %v", got, want)
   294  	}
   295  	if len(tr.bodies) > 0 {
   296  		t.Errorf("unclosed request bodies: %v", tr.bodies)
   297  	}
   298  }
   299  
   300  func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
   301  	const (
   302  		chunkSize = 90
   303  		mediaSize = 300
   304  	)
   305  	media := strings.NewReader(strings.Repeat("a", mediaSize))
   306  
   307  	// This transport returns multiple errors on both the first chunk and third
   308  	// chunk of the upload. If the timeout were not reset between chunks, the
   309  	// errors on the third chunk would not retry and cause a failure.
   310  	tr := &interruptibleTransport{
   311  		buf: make([]byte, 0, mediaSize),
   312  		events: []event{
   313  			{"bytes 0-89/*", http.StatusServiceUnavailable},
   314  			// cum: 1s sleep
   315  			{"bytes 0-89/*", http.StatusServiceUnavailable},
   316  			// cum: 2s sleep
   317  			{"bytes 0-89/*", http.StatusServiceUnavailable},
   318  			// cum: 3s sleep
   319  			{"bytes 0-89/*", http.StatusServiceUnavailable},
   320  			// cum: 4s sleep
   321  			{"bytes 0-89/*", 308},
   322  			// cum: 1s sleep <-- resets because it's a new chunk
   323  			{"bytes 90-179/*", 308},
   324  			// cum: 1s sleep <-- resets because it's a new chunk
   325  			{"bytes 180-269/*", http.StatusServiceUnavailable},
   326  			// cum: 1s sleep on later chunk
   327  			{"bytes 180-269/*", http.StatusServiceUnavailable},
   328  			// cum: 2s sleep on later chunk
   329  			{"bytes 180-269/*", 308},
   330  			// cum: 3s sleep <-- resets because it's a new chunk
   331  			{"bytes 270-299/300", 200},
   332  		},
   333  		bodies: bodyTracker{},
   334  	}
   335  
   336  	rx := &ResumableUpload{
   337  		Client:             &http.Client{Transport: tr},
   338  		Media:              NewMediaBuffer(media, chunkSize),
   339  		MediaType:          "text/plain",
   340  		Callback:           func(int64) {},
   341  		ChunkRetryDeadline: 5 * time.Second,
   342  	}
   343  
   344  	oldBackoff := backoff
   345  	backoff = func() Backoff { return new(PauseOneSecond) }
   346  	defer func() { backoff = oldBackoff }()
   347  
   348  	resCode := make(chan int, 1)
   349  	go func() {
   350  		resp, err := rx.Upload(context.Background())
   351  		if err != nil {
   352  			t.Error(err)
   353  			return
   354  		}
   355  		resCode <- resp.StatusCode
   356  	}()
   357  
   358  	select {
   359  	case <-time.After(15 * time.Second):
   360  		t.Fatal("timed out waiting for Upload to complete")
   361  	case got := <-resCode:
   362  		if want := http.StatusOK; got != want {
   363  			t.Fatalf("want %d, got %d", want, got)
   364  		}
   365  	}
   366  }
   367  

View as plain text