1
2
3
4
5
6
7
8
9
10
11
12
13
14 package storage_test
15
import (
	"context"
	"errors"
	"fmt"
	"io/ioutil"
	"net/http"
	"net/http/httptest"
	"strings"
	"sync"
	"sync/atomic"
	"testing"
	"time"

	"golang.org/x/oauth2"

	"cloud.google.com/go/storage"
	"google.golang.org/api/googleapi"
	"google.golang.org/api/option"
)
34
35 func TestIndefiniteRetries(t *testing.T) {
36 uploadRoute := "/upload"
37
38 var resumableUploadIDs atomic.Value
39 resumableUploadIDs.Store(make(map[string]time.Time))
40
41 lookupUploadID := func(resumableUploadID string) bool {
42 _, ok := resumableUploadIDs.Load().(map[string]time.Time)[resumableUploadID]
43 return ok
44 }
45
46 memoizeUploadID := func(resumableUploadID string) {
47 resumableUploadIDs.Load().(map[string]time.Time)[resumableUploadID] = time.Now().UTC()
48 }
49
50 cst := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
51 resumableUploadID := r.URL.Query().Get("upload_id")
52 path := r.URL.Path
53
54 switch {
55 case path == "/b":
56 w.Write([]byte(`{"kind":"storage#bucket","id":"bucket","name":"bucket"}`))
57 return
58
59 case (strings.HasPrefix(path, "/b/") || strings.HasPrefix(path, "/upload/storage/v1/b/")) && strings.HasSuffix(path, "/o"):
60 if resumableUploadID == "" {
61 uploadID := time.Now().Format(time.RFC3339Nano)
62 w.Header().Set("X-GUploader-UploadID", uploadID)
63
64 w.Header().Set("Location", fmt.Sprintf("http://%s?upload_id=%s", r.Host+uploadRoute, uploadID))
65 } else {
66 w.Write([]byte(`{"kind":"storage#object","bucket":"bucket","name":"bucket"}`))
67 }
68 return
69
70 case path == uploadRoute:
71 start, completedUpload, spamThem := parseContentRange(r.Header)
72
73 if resumableUploadID != "" {
74 if !lookupUploadID(resumableUploadID) {
75 if start == "0" {
76
77
78 memoizeUploadID(resumableUploadID)
79 } else {
80
81
82
83
84 errStr := fmt.Sprintf("mismatched_content_start (Invalid request. According to the Content-Range header,"+
85 "the upload offset is %s byte(s), which exceeds already uploaded size of 0 byte(s).)\n%s", start, r.Header["Content-Range"])
86 http.Error(w, errStr, http.StatusServiceUnavailable)
87 return
88 }
89 }
90 }
91 if spamThem {
92
93
94 w.WriteHeader(http.StatusTooManyRequests)
95 return
96 }
97 if completedUpload {
98
99 return
100 }
101
102
103 ioutil.ReadAll(r.Body)
104 w.Header().Set("X-Http-Status-Code-Override", "308")
105 return
106
107 default:
108 http.Error(w, "Unimplemented", http.StatusNotFound)
109 return
110 }
111 }))
112 defer cst.Close()
113
114 hc := &http.Client{
115 Transport: &oauth2.Transport{
116 Source: new(tokenSupplier),
117 },
118 }
119
120 ctx, cancel := context.WithCancel(context.Background())
121 defer cancel()
122
123 opts := []option.ClientOption{option.WithHTTPClient(hc), option.WithEndpoint(cst.URL)}
124
125 sc, err := storage.NewClient(ctx, opts...)
126 if err != nil {
127 t.Fatalf("Failed to create storage client: %v", err)
128 }
129 defer sc.Close()
130
131 obj := sc.Bucket("issue-1507").Object("object")
132 w := obj.NewWriter(ctx)
133
134 maxFileSize := 1 << 20
135 w.ChunkSize = maxFileSize / 4
136
137
138 w.ChunkRetryDeadline = time.Second
139
140 for i := 0; i < maxFileSize; {
141 nowStr := time.Now().Format(time.RFC3339Nano)
142 n, err := fmt.Fprintf(w, "%s", nowStr)
143 if err != nil {
144 t.Fatalf("Failed to write to object: %v", err)
145 }
146 i += n
147 }
148
149 closeDone := make(chan error, 1)
150 go func() {
151
152 closeDone <- w.Close()
153 }()
154
155
156
157 maxWait := 10 * time.Second
158 select {
159 case <-time.After(maxWait):
160 t.Fatalf("Test took longer than %s to return", maxWait)
161 case err := <-closeDone:
162 var ge *googleapi.Error
163 if !errors.As(err, &ge) {
164 t.Fatalf("Got error (%v) of type %T, expected *googleapi.Error", err, err)
165 }
166 if ge.Code != http.StatusTooManyRequests {
167 t.Fatalf("Got unexpected error: %#v\nWant statusCode of %d", ge, http.StatusTooManyRequests)
168 }
169 }
170 }
171
172 type tokenSupplier int
173
174 func (ts *tokenSupplier) Token() (*oauth2.Token, error) {
175 return &oauth2.Token{
176 AccessToken: "access-token",
177 TokenType: "Bearer",
178 RefreshToken: "refresh-token",
179 Expiry: time.Now().Add(time.Hour),
180 }, nil
181 }
182
// parseContentRange inspects the Content-Range request header, expected in the
// form "bytes <start>-<end>/<total>", "bytes */<total>" or with "*" as total.
//
// Results:
//   - start: the text before the first "-" of the range part, or "" when the
//     range part is empty or "*".
//   - completed: true when the range part is empty or "*", i.e. the request
//     carries no byte range.
//   - spamThem: true when a total is present and is anything other than "*".
//
// A missing header, or one without a "/", is tolerated: the total is then
// treated as absent (spamThem stays false) instead of panicking on an
// out-of-range index.
func parseContentRange(hdr http.Header) (start string, completed, spamThem bool) {
	cRange := strings.TrimPrefix(hdr.Get("Content-Range"), "bytes ")
	parts := strings.Split(cRange, "/")
	prelude := parts[0]

	// Only look at the total when the header actually contained a "/".
	if len(parts) > 1 && parts[1] != "*" {
		spamThem = true
	}
	if prelude == "" || prelude == "*" {
		// No byte range was supplied.
		completed = true
		return start, completed, spamThem
	}

	start = strings.Split(prelude, "-")[0]
	return start, completed, spamThem
}
202
View as plain text