1
2
3
4
5 package gensupport
6
7 import (
8 "context"
9 "fmt"
10 "io"
11 "net/http"
12 "reflect"
13 "strings"
14 "testing"
15 "time"
16 )
17
// unexpectedReader is an io.Reader stub whose Read always fails. The fake
// transport in this file uses it as the payload of the response bodies it
// fabricates, so any attempt to consume such a body surfaces as an error.
type unexpectedReader struct{}

// Read ignores the supplied buffer and reports zero bytes read together with
// a fixed error.
func (unexpectedReader) Read(_ []byte) (n int, err error) {
	err = fmt.Errorf("unexpected read in test")
	return n, err
}
23
24
// event scripts one exchange with the fake transport: the Content-Range
// header the next request is expected to carry, and the HTTP status to answer
// it with.
//
// The tests build events with positional literals, so the field order is part
// of the type's contract.
type event struct {
	// byteRange is compared verbatim against the request's Content-Range
	// header; a mismatch makes the round trip fail.
	byteRange string

	// responseStatus is the status code of the fabricated response. A 503
	// additionally means the request payload is not recorded by the
	// transport.
	responseStatus int
}
31
32
33
34
35 type interruptibleTransport struct {
36 events []event
37 buf []byte
38 bodies bodyTracker
39 }
40
41
// bodyTracker is a set of bodies that are currently open. Tests inspect its
// length to detect bodies that were never closed.
type bodyTracker map[io.ReadCloser]struct{}

// Add records body as open. Adding the same body twice keeps a single entry.
func (set bodyTracker) Add(body io.ReadCloser) {
	var member struct{}
	set[body] = member
}

// Close drops body from the set. Dropping a body that is not tracked is a
// no-op.
func (set bodyTracker) Close(body io.ReadCloser) {
	delete(set, body)
}
51
52 type trackingCloser struct {
53 io.Reader
54 tracker bodyTracker
55 }
56
57 func (tc *trackingCloser) Close() error {
58 tc.tracker.Close(tc)
59 return nil
60 }
61
62 func (tc *trackingCloser) Open() {
63 tc.tracker.Add(tc)
64 }
65
66 func (t *interruptibleTransport) RoundTrip(req *http.Request) (*http.Response, error) {
67 if len(t.events) == 0 {
68 panic("ran out of events, but got a request")
69 }
70 ev := t.events[0]
71 t.events = t.events[1:]
72 if got, want := req.Header.Get("Content-Range"), ev.byteRange; got != want {
73 return nil, fmt.Errorf("byte range: got %s; want %s", got, want)
74 }
75
76 if ev.responseStatus != http.StatusServiceUnavailable {
77 buf, err := io.ReadAll(req.Body)
78 if err != nil {
79 return nil, fmt.Errorf("error reading from request data: %v", err)
80 }
81 t.buf = append(t.buf, buf...)
82 }
83
84 tc := &trackingCloser{unexpectedReader{}, t.bodies}
85 tc.Open()
86 h := http.Header{}
87 status := ev.responseStatus
88
89
90 if status == 308 && req.Header.Get("X-GUploader-No-308") == "yes" {
91 status = 200
92 h.Set("X-Http-Status-Code-Override", "308")
93 }
94
95 res := &http.Response{
96 StatusCode: status,
97 Header: h,
98 Body: tc,
99 }
100 return res, nil
101 }
102
103
// progressRecorder collects the values passed to a progress callback and can
// optionally run a hook after each one.
type progressRecorder struct {
	// updates lists every reported value in arrival order.
	updates []int64
	// f, when non-nil, is invoked after each update has been recorded.
	f func()
}

// ProgressUpdate appends current to the recorded updates and then runs the
// hook, if one is set.
func (pr *progressRecorder) ProgressUpdate(current int64) {
	pr.updates = append(pr.updates, current)
	if pr.f == nil {
		return
	}
	pr.f()
}
115
116 func TestInterruptedTransferChunks(t *testing.T) {
117 type testCase struct {
118 name string
119 data string
120 chunkSize int
121 events []event
122 wantProgress []int64
123 }
124
125 for _, tc := range []testCase{
126 {
127 name: "large",
128 data: strings.Repeat("a", 300),
129 chunkSize: 90,
130 events: []event{
131 {"bytes 0-89/*", http.StatusServiceUnavailable},
132 {"bytes 0-89/*", 308},
133 {"bytes 90-179/*", 308},
134 {"bytes 180-269/*", http.StatusServiceUnavailable},
135 {"bytes 180-269/*", 308},
136 {"bytes 270-299/300", 200},
137 },
138 wantProgress: []int64{90, 180, 270, 300},
139 },
140 {
141 name: "small",
142 data: strings.Repeat("a", 20),
143 chunkSize: 10,
144 events: []event{
145 {"bytes 0-9/*", http.StatusServiceUnavailable},
146 {"bytes 0-9/*", 308},
147 {"bytes 10-19/*", http.StatusServiceUnavailable},
148 {"bytes 10-19/*", 308},
149
150 {"bytes */20", http.StatusServiceUnavailable},
151 {"bytes */20", 200},
152 },
153 wantProgress: []int64{10, 20},
154 },
155 } {
156 t.Run(tc.name, func(t *testing.T) {
157 media := strings.NewReader(tc.data)
158
159 tr := &interruptibleTransport{
160 buf: make([]byte, 0, len(tc.data)),
161 events: tc.events,
162 bodies: bodyTracker{},
163 }
164
165 pr := progressRecorder{}
166 rx := &ResumableUpload{
167 Client: &http.Client{Transport: tr},
168 Media: NewMediaBuffer(media, tc.chunkSize),
169 MediaType: "text/plain",
170 Callback: pr.ProgressUpdate,
171 }
172
173 oldBackoff := backoff
174 backoff = func() Backoff { return new(NoPauseBackoff) }
175 defer func() { backoff = oldBackoff }()
176
177 res, err := rx.Upload(context.Background())
178 if err == nil {
179 res.Body.Close()
180 }
181 if err != nil || res == nil || res.StatusCode != http.StatusOK {
182 if res == nil {
183 t.Fatalf("Upload not successful, res=nil: %v", err)
184 } else {
185 t.Fatalf("Upload not successful, statusCode=%v, err=%v", res.StatusCode, err)
186 }
187 }
188 if !reflect.DeepEqual(tr.buf, []byte(tc.data)) {
189 t.Fatalf("transferred contents:\ngot %s\nwant %s", tr.buf, tc.data)
190 }
191
192 if !reflect.DeepEqual(pr.updates, tc.wantProgress) {
193 t.Fatalf("progress updates: got %v, want %v", pr.updates, tc.wantProgress)
194 }
195
196 if len(tr.events) > 0 {
197 t.Fatalf("did not observe all expected events. leftover events: %v", tr.events)
198 }
199 if len(tr.bodies) > 0 {
200 t.Errorf("unclosed request bodies: %v", tr.bodies)
201 }
202 })
203 }
204 }
205
206 func TestCancelUploadFast(t *testing.T) {
207 const (
208 chunkSize = 90
209 mediaSize = 300
210 )
211 media := strings.NewReader(strings.Repeat("a", mediaSize))
212
213 tr := &interruptibleTransport{
214 buf: make([]byte, 0, mediaSize),
215 }
216
217 pr := progressRecorder{}
218 rx := &ResumableUpload{
219 Client: &http.Client{Transport: tr},
220 Media: NewMediaBuffer(media, chunkSize),
221 MediaType: "text/plain",
222 Callback: pr.ProgressUpdate,
223 }
224
225 oldBackoff := backoff
226 backoff = func() Backoff { return new(NoPauseBackoff) }
227 defer func() { backoff = oldBackoff }()
228
229 ctx, cancelFunc := context.WithCancel(context.Background())
230 cancelFunc()
231 res, err := rx.Upload(ctx)
232 if err != context.Canceled {
233 t.Fatalf("Upload err: got: %v; want: context cancelled", err)
234 }
235 if res != nil {
236 t.Fatalf("Upload result: got: %v; want: nil", res)
237 }
238 if pr.updates != nil {
239 t.Errorf("progress updates: got %v; want: nil", pr.updates)
240 }
241 }
242
243 func TestCancelUploadBasic(t *testing.T) {
244 const (
245 chunkSize = 90
246 mediaSize = 300
247 )
248 media := strings.NewReader(strings.Repeat("a", mediaSize))
249
250 tr := &interruptibleTransport{
251 buf: make([]byte, 0, mediaSize),
252 events: []event{
253 {"bytes 0-89/*", http.StatusServiceUnavailable},
254 {"bytes 0-89/*", 308},
255 {"bytes 90-179/*", 308},
256 {"bytes 180-269/*", 308},
257 },
258 bodies: bodyTracker{},
259 }
260
261 ctx, cancelFunc := context.WithCancel(context.Background())
262 numUpdates := 0
263
264 pr := progressRecorder{f: func() {
265 numUpdates++
266 if numUpdates >= 2 {
267 cancelFunc()
268 }
269 }}
270
271 rx := &ResumableUpload{
272 Client: &http.Client{Transport: tr},
273 Media: NewMediaBuffer(media, chunkSize),
274 MediaType: "text/plain",
275 Callback: pr.ProgressUpdate,
276 }
277
278 oldBackoff := backoff
279 backoff = func() Backoff { return new(PauseOneSecond) }
280 defer func() { backoff = oldBackoff }()
281
282 res, err := rx.Upload(ctx)
283 if err != context.Canceled {
284 t.Fatalf("Upload err: got: %v; want: context cancelled", err)
285 }
286 if res != nil {
287 t.Fatalf("Upload result: got: %v; want: nil", res)
288 }
289 if got, want := tr.buf, []byte(strings.Repeat("a", chunkSize*2)); !reflect.DeepEqual(got, want) {
290 t.Fatalf("transferred contents:\ngot %s\nwant %s", got, want)
291 }
292 if got, want := pr.updates, []int64{chunkSize, chunkSize * 2}; !reflect.DeepEqual(got, want) {
293 t.Fatalf("progress updates: got %v; want: %v", got, want)
294 }
295 if len(tr.bodies) > 0 {
296 t.Errorf("unclosed request bodies: %v", tr.bodies)
297 }
298 }
299
300 func TestRetry_EachChunkHasItsOwnRetryDeadline(t *testing.T) {
301 const (
302 chunkSize = 90
303 mediaSize = 300
304 )
305 media := strings.NewReader(strings.Repeat("a", mediaSize))
306
307
308
309
310 tr := &interruptibleTransport{
311 buf: make([]byte, 0, mediaSize),
312 events: []event{
313 {"bytes 0-89/*", http.StatusServiceUnavailable},
314
315 {"bytes 0-89/*", http.StatusServiceUnavailable},
316
317 {"bytes 0-89/*", http.StatusServiceUnavailable},
318
319 {"bytes 0-89/*", http.StatusServiceUnavailable},
320
321 {"bytes 0-89/*", 308},
322
323 {"bytes 90-179/*", 308},
324
325 {"bytes 180-269/*", http.StatusServiceUnavailable},
326
327 {"bytes 180-269/*", http.StatusServiceUnavailable},
328
329 {"bytes 180-269/*", 308},
330
331 {"bytes 270-299/300", 200},
332 },
333 bodies: bodyTracker{},
334 }
335
336 rx := &ResumableUpload{
337 Client: &http.Client{Transport: tr},
338 Media: NewMediaBuffer(media, chunkSize),
339 MediaType: "text/plain",
340 Callback: func(int64) {},
341 ChunkRetryDeadline: 5 * time.Second,
342 }
343
344 oldBackoff := backoff
345 backoff = func() Backoff { return new(PauseOneSecond) }
346 defer func() { backoff = oldBackoff }()
347
348 resCode := make(chan int, 1)
349 go func() {
350 resp, err := rx.Upload(context.Background())
351 if err != nil {
352 t.Error(err)
353 return
354 }
355 resCode <- resp.StatusCode
356 }()
357
358 select {
359 case <-time.After(15 * time.Second):
360 t.Fatal("timed out waiting for Upload to complete")
361 case got := <-resCode:
362 if want := http.StatusOK; got != want {
363 t.Fatalf("want %d, got %d", want, got)
364 }
365 }
366 }
367
View as plain text