1
2
3
4
5 package gensupport
6
7 import (
8 "context"
9 "errors"
10 "fmt"
11 "io"
12 "net/http"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/google/uuid"
18 "google.golang.org/api/internal"
19 )
20
21
22
// ResumableUpload carries the state needed to send a media payload to a
// resumable-upload URI chunk by chunk (see Upload).
type ResumableUpload struct {
	// Client is the HTTP client handed to SendRequest for every chunk request.
	Client *http.Client

	// URI is the URL every chunk is POSTed to.
	URI string
	// UserAgent is sent as the User-Agent header of every chunk request.
	UserAgent string

	// Media supplies the chunks to upload (via its Chunk and Next methods).
	Media *MediaBuffer

	// MediaType is sent as the Content-Type header of every chunk request.
	MediaType string

	mu       sync.Mutex // guards progress
	progress int64      // value last recorded by reportProgress: end offset of the last chunk answered with 200 OK

	// Callback, when non-nil, is invoked by reportProgress with the new
	// progress value each time progress changes.
	Callback func(int64)

	// Retry supplies the retryable-error predicate and the backoff used by
	// Upload.
	Retry *RetryConfig

	// ChunkRetryDeadline bounds how long Upload keeps retrying a single chunk.
	// A zero value makes Upload fall back to defaultRetryDeadline.
	ChunkRetryDeadline time.Duration

	// invocationID is regenerated by Upload for every chunk and sent in the
	// X-Goog-Api-Client and X-Goog-Gcs-Idempotency-Token request headers.
	invocationID string
	// attempts is reset to 1 for every chunk, incremented before each retry,
	// and sent as the gccl-attempt-count value of X-Goog-Api-Client.
	attempts int
}
51
52
53 func (rx *ResumableUpload) Progress() int64 {
54 rx.mu.Lock()
55 defer rx.mu.Unlock()
56 return rx.progress
57 }
58
59
60
61
62
63 func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
64 req, err := http.NewRequest("POST", rx.URI, data)
65 if err != nil {
66 return nil, err
67 }
68
69 req.ContentLength = size
70 var contentRange string
71 if final {
72 if size == 0 {
73 contentRange = fmt.Sprintf("bytes */%v", off)
74 } else {
75 contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
76 }
77 } else {
78 contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
79 }
80 req.Header.Set("Content-Range", contentRange)
81 req.Header.Set("Content-Type", rx.MediaType)
82 req.Header.Set("User-Agent", rx.UserAgent)
83
84
85
86 baseXGoogHeader := "gl-go/" + GoVersion() + " gdcl/" + internal.Version
87 invocationHeader := fmt.Sprintf("gccl-invocation-id/%s gccl-attempt-count/%d", rx.invocationID, rx.attempts)
88 req.Header.Set("X-Goog-Api-Client", strings.Join([]string{baseXGoogHeader, invocationHeader}, " "))
89
90
91 req.Header.Set("X-Goog-Gcs-Idempotency-Token", rx.invocationID)
92
93
94
95
96
97
98
99
100 req.Header.Set("X-GUploader-No-308", "yes")
101
102 return SendRequest(ctx, rx.Client, req)
103 }
104
// statusResumeIncomplete reports whether resp carries the response header
// "X-Http-Status-Code-Override: 308". A nil resp yields false.
func statusResumeIncomplete(resp *http.Response) bool {
	if resp == nil {
		return false
	}
	override := resp.Header.Get("X-Http-Status-Code-Override")
	return override == "308"
}
110
111
112
113 func (rx *ResumableUpload) reportProgress(old, updated int64) {
114 if updated-old == 0 {
115 return
116 }
117 rx.mu.Lock()
118 rx.progress = updated
119 rx.mu.Unlock()
120 if rx.Callback != nil {
121 rx.Callback(updated)
122 }
123 }
124
125
126 func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
127 chunk, off, size, err := rx.Media.Chunk()
128
129 done := err == io.EOF
130 if !done && err != nil {
131 return nil, err
132 }
133
134 res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
135 if err != nil {
136 return res, err
137 }
138
139
140
141 if res.StatusCode == 308 {
142 return nil, errors.New("unexpected 308 response status code")
143 }
144
145 if res.StatusCode == http.StatusOK {
146 rx.reportProgress(off, off+int64(size))
147 }
148
149 if statusResumeIncomplete(res) {
150 rx.Media.Next()
151 }
152 return res, nil
153 }
154
155
156
157
158
159
160
161
162
163 func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
164
165
166
167
168
169 var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
170 if err != nil {
171 if resp != nil && resp.Body != nil {
172 resp.Body.Close()
173 }
174 return nil, err
175 }
176
177
178
179 if resp == nil {
180 return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDealine", rx.URI)
181 }
182 return resp, nil
183 }
184
185 errorFunc := rx.Retry.errorFunc()
186
187
188 var retryDeadline time.Duration
189 if rx.ChunkRetryDeadline != 0 {
190 retryDeadline = rx.ChunkRetryDeadline
191 } else {
192 retryDeadline = defaultRetryDeadline
193 }
194
195
196 for {
197 var pause time.Duration
198
199
200 bo := rx.Retry.backoff()
201 quitAfterTimer := time.NewTimer(retryDeadline)
202 rx.attempts = 1
203 rx.invocationID = uuid.New().String()
204
205
206 for {
207 pauseTimer := time.NewTimer(pause)
208 select {
209 case <-ctx.Done():
210 quitAfterTimer.Stop()
211 pauseTimer.Stop()
212 if err == nil {
213 err = ctx.Err()
214 }
215 return prepareReturn(resp, err)
216 case <-pauseTimer.C:
217 case <-quitAfterTimer.C:
218 pauseTimer.Stop()
219 return prepareReturn(resp, err)
220 }
221 pauseTimer.Stop()
222
223
224
225
226
227
228 select {
229 case <-ctx.Done():
230 quitAfterTimer.Stop()
231 if err == nil {
232 err = ctx.Err()
233 }
234 return prepareReturn(resp, err)
235 case <-quitAfterTimer.C:
236 return prepareReturn(resp, err)
237 default:
238 }
239
240 resp, err = rx.transferChunk(ctx)
241
242 var status int
243 if resp != nil {
244 status = resp.StatusCode
245 }
246
247
248 if !errorFunc(status, err) {
249 quitAfterTimer.Stop()
250 break
251 }
252
253 rx.attempts++
254 pause = bo.Pause()
255 if resp != nil && resp.Body != nil {
256 resp.Body.Close()
257 }
258 }
259
260
261
262 if statusResumeIncomplete(resp) {
263 resp.Body.Close()
264 continue
265 }
266
267 return prepareReturn(resp, err)
268 }
269 }
270
View as plain text