1
2
3
4
5 package gensupport
6
7 import (
8 "bytes"
9 "fmt"
10 "io"
11 "mime"
12 "mime/multipart"
13 "net/http"
14 "net/textproto"
15 "strings"
16 "sync"
17 "time"
18
19 gax "github.com/googleapis/gax-go/v2"
20 "google.golang.org/api/googleapi"
21 )
22
23 type typeReader struct {
24 io.Reader
25 typ string
26 }
27
28
29
30 type multipartReader struct {
31 pr *io.PipeReader
32 ctype string
33 mu sync.Mutex
34 pipeOpen bool
35 }
36
37
38 func newMultipartReader(parts []typeReader, boundary string) *multipartReader {
39 mp := &multipartReader{pipeOpen: true}
40 var pw *io.PipeWriter
41 mp.pr, pw = io.Pipe()
42 mpw := multipart.NewWriter(pw)
43 if boundary != "" {
44 mpw.SetBoundary(boundary)
45 }
46 mp.ctype = "multipart/related; boundary=" + mpw.Boundary()
47 go func() {
48 for _, part := range parts {
49 w, err := mpw.CreatePart(typeHeader(part.typ))
50 if err != nil {
51 mpw.Close()
52 pw.CloseWithError(fmt.Errorf("googleapi: CreatePart failed: %v", err))
53 return
54 }
55 _, err = io.Copy(w, part.Reader)
56 if err != nil {
57 mpw.Close()
58 pw.CloseWithError(fmt.Errorf("googleapi: Copy failed: %v", err))
59 return
60 }
61 }
62
63 mpw.Close()
64 pw.Close()
65 }()
66 return mp
67 }
68
69 func (mp *multipartReader) Read(data []byte) (n int, err error) {
70 return mp.pr.Read(data)
71 }
72
73 func (mp *multipartReader) Close() error {
74 mp.mu.Lock()
75 if !mp.pipeOpen {
76 mp.mu.Unlock()
77 return nil
78 }
79 mp.pipeOpen = false
80 mp.mu.Unlock()
81 return mp.pr.Close()
82 }
83
84
85
86
87
88 func CombineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType string) (io.ReadCloser, string) {
89 return combineBodyMedia(body, bodyContentType, media, mediaContentType, "")
90 }
91
92
93 func combineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType, mimeBoundary string) (io.ReadCloser, string) {
94 mp := newMultipartReader([]typeReader{
95 {body, bodyContentType},
96 {media, mediaContentType},
97 }, mimeBoundary)
98 return mp, mp.ctype
99 }
100
101 func typeHeader(contentType string) textproto.MIMEHeader {
102 h := make(textproto.MIMEHeader)
103 if contentType != "" {
104 h.Set("Content-Type", contentType)
105 }
106 return h
107 }
108
109
110
111
112
113
114
115
116
117
118
119
120
121 func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
122 if chunkSize == 0 {
123 return media, nil, true
124 }
125 mb = NewMediaBuffer(media, chunkSize)
126 _, _, _, err := mb.Chunk()
127
128
129
130
131 return nil, mb, err == io.EOF
132 }
133
134
135
136 type MediaInfo struct {
137
138 media io.Reader
139 buffer *MediaBuffer
140 singleChunk bool
141 mType string
142 size int64
143 progressUpdater googleapi.ProgressUpdater
144 chunkRetryDeadline time.Duration
145 }
146
147
148
149
150 func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
151 mi := &MediaInfo{}
152 opts := googleapi.ProcessMediaOptions(options)
153 if !opts.ForceEmptyContentType {
154 mi.mType = opts.ContentType
155 if mi.mType == "" {
156 r, mi.mType = gax.DetermineContentType(r)
157 }
158 }
159 mi.chunkRetryDeadline = opts.ChunkRetryDeadline
160 mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
161 return mi
162 }
163
164
165
166 func NewInfoFromResumableMedia(r io.ReaderAt, size int64, mediaType string) *MediaInfo {
167 rdr := ReaderAtToReader(r, size)
168 mType := mediaType
169 if mType == "" {
170 rdr, mType = gax.DetermineContentType(rdr)
171 }
172
173 return &MediaInfo{
174 size: size,
175 mType: mType,
176 buffer: NewMediaBuffer(rdr, googleapi.DefaultUploadChunkSize),
177 media: nil,
178 singleChunk: false,
179 }
180 }
181
182
183 func (mi *MediaInfo) SetProgressUpdater(pu googleapi.ProgressUpdater) {
184 if mi != nil {
185 mi.progressUpdater = pu
186 }
187 }
188
189
190
191 func (mi *MediaInfo) UploadType() string {
192 if mi.singleChunk {
193 return "multipart"
194 }
195 return "resumable"
196 }
197
198
199
200 func (mi *MediaInfo) UploadRequest(reqHeaders http.Header, body io.Reader) (newBody io.Reader, getBody func() (io.ReadCloser, error), cleanup func()) {
201 cleanup = func() {}
202 if mi == nil {
203 return body, nil, cleanup
204 }
205 var media io.Reader
206 if mi.media != nil {
207
208
209 media = mi.media
210 } else if mi.singleChunk {
211
212
213
214 media, _, _, _ = mi.buffer.Chunk()
215 }
216 toCleanup := []io.Closer{}
217 if media != nil {
218 fb := readerFunc(body)
219 fm := readerFunc(media)
220 combined, ctype := CombineBodyMedia(body, "application/json", media, mi.mType)
221 toCleanup = append(toCleanup, combined)
222 if fb != nil && fm != nil {
223 getBody = func() (io.ReadCloser, error) {
224 rb := io.NopCloser(fb())
225 rm := io.NopCloser(fm())
226 var mimeBoundary string
227 if _, params, err := mime.ParseMediaType(ctype); err == nil {
228 mimeBoundary = params["boundary"]
229 }
230 r, _ := combineBodyMedia(rb, "application/json", rm, mi.mType, mimeBoundary)
231 toCleanup = append(toCleanup, r)
232 return r, nil
233 }
234 }
235 reqHeaders.Set("Content-Type", ctype)
236 body = combined
237 }
238 if mi.buffer != nil && mi.mType != "" && !mi.singleChunk {
239
240
241
242 fb := readerFunc(body)
243 if fb != nil {
244 getBody = func() (io.ReadCloser, error) {
245 rb := io.NopCloser(fb())
246 toCleanup = append(toCleanup, rb)
247 return rb, nil
248 }
249 }
250 reqHeaders.Set("X-Upload-Content-Type", mi.mType)
251 }
252
253 cleanup = func() {
254 for _, closer := range toCleanup {
255 _ = closer.Close()
256 }
257
258 }
259 return body, getBody, cleanup
260 }
261
262
263
264
265
266 func readerFunc(r io.Reader) func() io.Reader {
267 switch r := r.(type) {
268 case *bytes.Buffer:
269 buf := r.Bytes()
270 return func() io.Reader { return bytes.NewReader(buf) }
271 case *bytes.Reader:
272 snapshot := *r
273 return func() io.Reader { r := snapshot; return &r }
274 case *strings.Reader:
275 snapshot := *r
276 return func() io.Reader { r := snapshot; return &r }
277 default:
278 return nil
279 }
280 }
281
282
283
284 func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
285 if mi == nil || mi.singleChunk {
286 return nil
287 }
288 return &ResumableUpload{
289 URI: locURI,
290 Media: mi.buffer,
291 MediaType: mi.mType,
292 Callback: func(curr int64) {
293 if mi.progressUpdater != nil {
294 mi.progressUpdater(curr, mi.size)
295 }
296 },
297 ChunkRetryDeadline: mi.chunkRetryDeadline,
298 }
299 }
300
301
302
303
304
305
306
307
308
309 func SetGetBody(req *http.Request, f func() (io.ReadCloser, error)) {
310 req.GetBody = f
311 }
312
View as plain text