...

Source file src/google.golang.org/api/internal/gensupport/media.go

Documentation: google.golang.org/api/internal/gensupport

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package gensupport
     6  
     7  import (
     8  	"bytes"
     9  	"fmt"
    10  	"io"
    11  	"mime"
    12  	"mime/multipart"
    13  	"net/http"
    14  	"net/textproto"
    15  	"strings"
    16  	"sync"
    17  	"time"
    18  
    19  	gax "github.com/googleapis/gax-go/v2"
    20  	"google.golang.org/api/googleapi"
    21  )
    22  
    23  type typeReader struct {
    24  	io.Reader
    25  	typ string
    26  }
    27  
    28  // multipartReader combines the contents of multiple readers to create a multipart/related HTTP body.
    29  // Close must be called if reads from the multipartReader are abandoned before reaching EOF.
    30  type multipartReader struct {
    31  	pr       *io.PipeReader
    32  	ctype    string
    33  	mu       sync.Mutex
    34  	pipeOpen bool
    35  }
    36  
    37  // boundary optionally specifies the MIME boundary
    38  func newMultipartReader(parts []typeReader, boundary string) *multipartReader {
    39  	mp := &multipartReader{pipeOpen: true}
    40  	var pw *io.PipeWriter
    41  	mp.pr, pw = io.Pipe()
    42  	mpw := multipart.NewWriter(pw)
    43  	if boundary != "" {
    44  		mpw.SetBoundary(boundary)
    45  	}
    46  	mp.ctype = "multipart/related; boundary=" + mpw.Boundary()
    47  	go func() {
    48  		for _, part := range parts {
    49  			w, err := mpw.CreatePart(typeHeader(part.typ))
    50  			if err != nil {
    51  				mpw.Close()
    52  				pw.CloseWithError(fmt.Errorf("googleapi: CreatePart failed: %v", err))
    53  				return
    54  			}
    55  			_, err = io.Copy(w, part.Reader)
    56  			if err != nil {
    57  				mpw.Close()
    58  				pw.CloseWithError(fmt.Errorf("googleapi: Copy failed: %v", err))
    59  				return
    60  			}
    61  		}
    62  
    63  		mpw.Close()
    64  		pw.Close()
    65  	}()
    66  	return mp
    67  }
    68  
    69  func (mp *multipartReader) Read(data []byte) (n int, err error) {
    70  	return mp.pr.Read(data)
    71  }
    72  
    73  func (mp *multipartReader) Close() error {
    74  	mp.mu.Lock()
    75  	if !mp.pipeOpen {
    76  		mp.mu.Unlock()
    77  		return nil
    78  	}
    79  	mp.pipeOpen = false
    80  	mp.mu.Unlock()
    81  	return mp.pr.Close()
    82  }
    83  
    84  // CombineBodyMedia combines a json body with media content to create a multipart/related HTTP body.
    85  // It returns a ReadCloser containing the combined body, and the overall "multipart/related" content type, with random boundary.
    86  //
    87  // The caller must call Close on the returned ReadCloser if reads are abandoned before reaching EOF.
    88  func CombineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType string) (io.ReadCloser, string) {
    89  	return combineBodyMedia(body, bodyContentType, media, mediaContentType, "")
    90  }
    91  
    92  // combineBodyMedia is CombineBodyMedia but with an optional mimeBoundary field.
    93  func combineBodyMedia(body io.Reader, bodyContentType string, media io.Reader, mediaContentType, mimeBoundary string) (io.ReadCloser, string) {
    94  	mp := newMultipartReader([]typeReader{
    95  		{body, bodyContentType},
    96  		{media, mediaContentType},
    97  	}, mimeBoundary)
    98  	return mp, mp.ctype
    99  }
   100  
   101  func typeHeader(contentType string) textproto.MIMEHeader {
   102  	h := make(textproto.MIMEHeader)
   103  	if contentType != "" {
   104  		h.Set("Content-Type", contentType)
   105  	}
   106  	return h
   107  }
   108  
   109  // PrepareUpload determines whether the data in the supplied reader should be
   110  // uploaded in a single request, or in sequential chunks.
   111  // chunkSize is the size of the chunk that media should be split into.
   112  //
   113  // If chunkSize is zero, media is returned as the first value, and the other
   114  // two return values are nil, true.
   115  //
   116  // Otherwise, a MediaBuffer is returned, along with a bool indicating whether the
   117  // contents of media fit in a single chunk.
   118  //
   119  // After PrepareUpload has been called, media should no longer be used: the
   120  // media content should be accessed via one of the return values.
   121  func PrepareUpload(media io.Reader, chunkSize int) (r io.Reader, mb *MediaBuffer, singleChunk bool) {
   122  	if chunkSize == 0 { // do not chunk
   123  		return media, nil, true
   124  	}
   125  	mb = NewMediaBuffer(media, chunkSize)
   126  	_, _, _, err := mb.Chunk()
   127  	// If err is io.EOF, we can upload this in a single request. Otherwise, err is
   128  	// either nil or a non-EOF error. If it is the latter, then the next call to
   129  	// mb.Chunk will return the same error. Returning a MediaBuffer ensures that this
   130  	// error will be handled at some point.
   131  	return nil, mb, err == io.EOF
   132  }
   133  
   134  // MediaInfo holds information for media uploads. It is intended for use by generated
   135  // code only.
   136  type MediaInfo struct {
   137  	// At most one of Media and MediaBuffer will be set.
   138  	media              io.Reader
   139  	buffer             *MediaBuffer
   140  	singleChunk        bool
   141  	mType              string
   142  	size               int64 // mediaSize, if known.  Used only for calls to progressUpdater_.
   143  	progressUpdater    googleapi.ProgressUpdater
   144  	chunkRetryDeadline time.Duration
   145  }
   146  
   147  // NewInfoFromMedia should be invoked from the Media method of a call. It returns a
   148  // MediaInfo populated with chunk size and content type, and a reader or MediaBuffer
   149  // if needed.
   150  func NewInfoFromMedia(r io.Reader, options []googleapi.MediaOption) *MediaInfo {
   151  	mi := &MediaInfo{}
   152  	opts := googleapi.ProcessMediaOptions(options)
   153  	if !opts.ForceEmptyContentType {
   154  		mi.mType = opts.ContentType
   155  		if mi.mType == "" {
   156  			r, mi.mType = gax.DetermineContentType(r)
   157  		}
   158  	}
   159  	mi.chunkRetryDeadline = opts.ChunkRetryDeadline
   160  	mi.media, mi.buffer, mi.singleChunk = PrepareUpload(r, opts.ChunkSize)
   161  	return mi
   162  }
   163  
   164  // NewInfoFromResumableMedia should be invoked from the ResumableMedia method of a
   165  // call. It returns a MediaInfo using the given reader, size and media type.
   166  func NewInfoFromResumableMedia(r io.ReaderAt, size int64, mediaType string) *MediaInfo {
   167  	rdr := ReaderAtToReader(r, size)
   168  	mType := mediaType
   169  	if mType == "" {
   170  		rdr, mType = gax.DetermineContentType(rdr)
   171  	}
   172  
   173  	return &MediaInfo{
   174  		size:        size,
   175  		mType:       mType,
   176  		buffer:      NewMediaBuffer(rdr, googleapi.DefaultUploadChunkSize),
   177  		media:       nil,
   178  		singleChunk: false,
   179  	}
   180  }
   181  
   182  // SetProgressUpdater sets the progress updater for the media info.
   183  func (mi *MediaInfo) SetProgressUpdater(pu googleapi.ProgressUpdater) {
   184  	if mi != nil {
   185  		mi.progressUpdater = pu
   186  	}
   187  }
   188  
   189  // UploadType determines the type of upload: a single request, or a resumable
   190  // series of requests.
   191  func (mi *MediaInfo) UploadType() string {
   192  	if mi.singleChunk {
   193  		return "multipart"
   194  	}
   195  	return "resumable"
   196  }
   197  
   198  // UploadRequest sets up an HTTP request for media upload. It adds headers
   199  // as necessary, and returns a replacement for the body and a function for http.Request.GetBody.
   200  func (mi *MediaInfo) UploadRequest(reqHeaders http.Header, body io.Reader) (newBody io.Reader, getBody func() (io.ReadCloser, error), cleanup func()) {
   201  	cleanup = func() {}
   202  	if mi == nil {
   203  		return body, nil, cleanup
   204  	}
   205  	var media io.Reader
   206  	if mi.media != nil {
   207  		// This only happens when the caller has turned off chunking. In that
   208  		// case, we write all of media in a single non-retryable request.
   209  		media = mi.media
   210  	} else if mi.singleChunk {
   211  		// The data fits in a single chunk, which has now been read into the MediaBuffer.
   212  		// We obtain that chunk so we can write it in a single request. The request can
   213  		// be retried because the data is stored in the MediaBuffer.
   214  		media, _, _, _ = mi.buffer.Chunk()
   215  	}
   216  	toCleanup := []io.Closer{}
   217  	if media != nil {
   218  		fb := readerFunc(body)
   219  		fm := readerFunc(media)
   220  		combined, ctype := CombineBodyMedia(body, "application/json", media, mi.mType)
   221  		toCleanup = append(toCleanup, combined)
   222  		if fb != nil && fm != nil {
   223  			getBody = func() (io.ReadCloser, error) {
   224  				rb := io.NopCloser(fb())
   225  				rm := io.NopCloser(fm())
   226  				var mimeBoundary string
   227  				if _, params, err := mime.ParseMediaType(ctype); err == nil {
   228  					mimeBoundary = params["boundary"]
   229  				}
   230  				r, _ := combineBodyMedia(rb, "application/json", rm, mi.mType, mimeBoundary)
   231  				toCleanup = append(toCleanup, r)
   232  				return r, nil
   233  			}
   234  		}
   235  		reqHeaders.Set("Content-Type", ctype)
   236  		body = combined
   237  	}
   238  	if mi.buffer != nil && mi.mType != "" && !mi.singleChunk {
   239  		// This happens when initiating a resumable upload session.
   240  		// The initial request contains a JSON body rather than media.
   241  		// It can be retried with a getBody function that re-creates the request body.
   242  		fb := readerFunc(body)
   243  		if fb != nil {
   244  			getBody = func() (io.ReadCloser, error) {
   245  				rb := io.NopCloser(fb())
   246  				toCleanup = append(toCleanup, rb)
   247  				return rb, nil
   248  			}
   249  		}
   250  		reqHeaders.Set("X-Upload-Content-Type", mi.mType)
   251  	}
   252  	// Ensure that any bodies created in getBody are cleaned up.
   253  	cleanup = func() {
   254  		for _, closer := range toCleanup {
   255  			_ = closer.Close()
   256  		}
   257  
   258  	}
   259  	return body, getBody, cleanup
   260  }
   261  
   262  // readerFunc returns a function that always returns an io.Reader that has the same
   263  // contents as r, provided that can be done without consuming r. Otherwise, it
   264  // returns nil.
   265  // See http.NewRequest (in net/http/request.go).
   266  func readerFunc(r io.Reader) func() io.Reader {
   267  	switch r := r.(type) {
   268  	case *bytes.Buffer:
   269  		buf := r.Bytes()
   270  		return func() io.Reader { return bytes.NewReader(buf) }
   271  	case *bytes.Reader:
   272  		snapshot := *r
   273  		return func() io.Reader { r := snapshot; return &r }
   274  	case *strings.Reader:
   275  		snapshot := *r
   276  		return func() io.Reader { r := snapshot; return &r }
   277  	default:
   278  		return nil
   279  	}
   280  }
   281  
   282  // ResumableUpload returns an appropriately configured ResumableUpload value if the
   283  // upload is resumable, or nil otherwise.
   284  func (mi *MediaInfo) ResumableUpload(locURI string) *ResumableUpload {
   285  	if mi == nil || mi.singleChunk {
   286  		return nil
   287  	}
   288  	return &ResumableUpload{
   289  		URI:       locURI,
   290  		Media:     mi.buffer,
   291  		MediaType: mi.mType,
   292  		Callback: func(curr int64) {
   293  			if mi.progressUpdater != nil {
   294  				mi.progressUpdater(curr, mi.size)
   295  			}
   296  		},
   297  		ChunkRetryDeadline: mi.chunkRetryDeadline,
   298  	}
   299  }
   300  
   301  // SetGetBody sets the GetBody field of req to f. This was once needed
   302  // to gracefully support Go 1.7 and earlier which didn't have that
   303  // field.
   304  //
   305  // Deprecated: the code generator no longer uses this as of
   306  // 2019-02-19. Nothing else should be calling this anyway, but we
   307  // won't delete this immediately; it will be deleted in as early as 6
   308  // months.
   309  func SetGetBody(req *http.Request, f func() (io.ReadCloser, error)) {
   310  	req.GetBody = f
   311  }
   312  

View as plain text