...

Source file src/google.golang.org/api/internal/gensupport/resumable.go

Documentation: google.golang.org/api/internal/gensupport

     1  // Copyright 2016 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package gensupport
     6  
     7  import (
     8  	"context"
     9  	"errors"
    10  	"fmt"
    11  	"io"
    12  	"net/http"
    13  	"strings"
    14  	"sync"
    15  	"time"
    16  
    17  	"github.com/google/uuid"
    18  	"google.golang.org/api/internal"
    19  )
    20  
    21  // ResumableUpload is used by the generated APIs to provide resumable uploads.
    22  // It is not used by developers directly.
    23  type ResumableUpload struct {
    24  	Client *http.Client
    25  	// URI is the resumable resource destination provided by the server after specifying "&uploadType=resumable".
    26  	URI       string
    27  	UserAgent string // User-Agent for header of the request
    28  	// Media is the object being uploaded.
    29  	Media *MediaBuffer
    30  	// MediaType defines the media type, e.g. "image/jpeg".
    31  	MediaType string
    32  
    33  	mu       sync.Mutex // guards progress
    34  	progress int64      // number of bytes uploaded so far
    35  
    36  	// Callback is an optional function that will be periodically called with the cumulative number of bytes uploaded.
    37  	Callback func(int64)
    38  
    39  	// Retry optionally configures retries for requests made against the upload.
    40  	Retry *RetryConfig
    41  
    42  	// ChunkRetryDeadline configures the per-chunk deadline after which no further
    43  	// retries should happen.
    44  	ChunkRetryDeadline time.Duration
    45  
    46  	// Track current request invocation ID and attempt count for retry metrics
    47  	// and idempotency headers.
    48  	invocationID string
    49  	attempts     int
    50  }
    51  
    52  // Progress returns the number of bytes uploaded at this point.
    53  func (rx *ResumableUpload) Progress() int64 {
    54  	rx.mu.Lock()
    55  	defer rx.mu.Unlock()
    56  	return rx.progress
    57  }
    58  
    59  // doUploadRequest performs a single HTTP request to upload data.
    60  // off specifies the offset in rx.Media from which data is drawn.
    61  // size is the number of bytes in data.
    62  // final specifies whether data is the final chunk to be uploaded.
    63  func (rx *ResumableUpload) doUploadRequest(ctx context.Context, data io.Reader, off, size int64, final bool) (*http.Response, error) {
    64  	req, err := http.NewRequest("POST", rx.URI, data)
    65  	if err != nil {
    66  		return nil, err
    67  	}
    68  
    69  	req.ContentLength = size
    70  	var contentRange string
    71  	if final {
    72  		if size == 0 {
    73  			contentRange = fmt.Sprintf("bytes */%v", off)
    74  		} else {
    75  			contentRange = fmt.Sprintf("bytes %v-%v/%v", off, off+size-1, off+size)
    76  		}
    77  	} else {
    78  		contentRange = fmt.Sprintf("bytes %v-%v/*", off, off+size-1)
    79  	}
    80  	req.Header.Set("Content-Range", contentRange)
    81  	req.Header.Set("Content-Type", rx.MediaType)
    82  	req.Header.Set("User-Agent", rx.UserAgent)
    83  
    84  	// TODO(b/274504690): Consider dropping gccl-invocation-id key since it
    85  	// duplicates the X-Goog-Gcs-Idempotency-Token header (added in v0.115.0).
    86  	baseXGoogHeader := "gl-go/" + GoVersion() + " gdcl/" + internal.Version
    87  	invocationHeader := fmt.Sprintf("gccl-invocation-id/%s gccl-attempt-count/%d", rx.invocationID, rx.attempts)
    88  	req.Header.Set("X-Goog-Api-Client", strings.Join([]string{baseXGoogHeader, invocationHeader}, " "))
    89  
    90  	// Set idempotency token header which is used by GCS uploads.
    91  	req.Header.Set("X-Goog-Gcs-Idempotency-Token", rx.invocationID)
    92  
    93  	// Google's upload endpoint uses status code 308 for a
    94  	// different purpose than the "308 Permanent Redirect"
    95  	// since-standardized in RFC 7238. Because of the conflict in
    96  	// semantics, Google added this new request header which
    97  	// causes it to not use "308" and instead reply with 200 OK
    98  	// and sets the upload-specific "X-HTTP-Status-Code-Override:
    99  	// 308" response header.
   100  	req.Header.Set("X-GUploader-No-308", "yes")
   101  
   102  	return SendRequest(ctx, rx.Client, req)
   103  }
   104  
   105  func statusResumeIncomplete(resp *http.Response) bool {
   106  	// This is how the server signals "status resume incomplete"
   107  	// when X-GUploader-No-308 is set to "yes":
   108  	return resp != nil && resp.Header.Get("X-Http-Status-Code-Override") == "308"
   109  }
   110  
   111  // reportProgress calls a user-supplied callback to report upload progress.
   112  // If old==updated, the callback is not called.
   113  func (rx *ResumableUpload) reportProgress(old, updated int64) {
   114  	if updated-old == 0 {
   115  		return
   116  	}
   117  	rx.mu.Lock()
   118  	rx.progress = updated
   119  	rx.mu.Unlock()
   120  	if rx.Callback != nil {
   121  		rx.Callback(updated)
   122  	}
   123  }
   124  
   125  // transferChunk performs a single HTTP request to upload a single chunk from rx.Media.
   126  func (rx *ResumableUpload) transferChunk(ctx context.Context) (*http.Response, error) {
   127  	chunk, off, size, err := rx.Media.Chunk()
   128  
   129  	done := err == io.EOF
   130  	if !done && err != nil {
   131  		return nil, err
   132  	}
   133  
   134  	res, err := rx.doUploadRequest(ctx, chunk, off, int64(size), done)
   135  	if err != nil {
   136  		return res, err
   137  	}
   138  
   139  	// We sent "X-GUploader-No-308: yes" (see comment elsewhere in
   140  	// this file), so we don't expect to get a 308.
   141  	if res.StatusCode == 308 {
   142  		return nil, errors.New("unexpected 308 response status code")
   143  	}
   144  
   145  	if res.StatusCode == http.StatusOK {
   146  		rx.reportProgress(off, off+int64(size))
   147  	}
   148  
   149  	if statusResumeIncomplete(res) {
   150  		rx.Media.Next()
   151  	}
   152  	return res, nil
   153  }
   154  
   155  // Upload starts the process of a resumable upload with a cancellable context.
   156  // It retries using the provided back off strategy until cancelled or the
   157  // strategy indicates to stop retrying.
   158  // It is called from the auto-generated API code and is not visible to the user.
   159  // Before sending an HTTP request, Upload calls any registered hook functions,
   160  // and calls the returned functions after the request returns (see send.go).
   161  // rx is private to the auto-generated API code.
   162  // Exactly one of resp or err will be nil.  If resp is non-nil, the caller must call resp.Body.Close.
   163  func (rx *ResumableUpload) Upload(ctx context.Context) (resp *http.Response, err error) {
   164  
   165  	// There are a couple of cases where it's possible for err and resp to both
   166  	// be non-nil. However, we expose a simpler contract to our callers: exactly
   167  	// one of resp and err will be non-nil. This means that any response body
   168  	// must be closed here before returning a non-nil error.
   169  	var prepareReturn = func(resp *http.Response, err error) (*http.Response, error) {
   170  		if err != nil {
   171  			if resp != nil && resp.Body != nil {
   172  				resp.Body.Close()
   173  			}
   174  			return nil, err
   175  		}
   176  		// This case is very unlikely but possible only if rx.ChunkRetryDeadline is
   177  		// set to a very small value, in which case no requests will be sent before
   178  		// the deadline. Return an error to avoid causing a panic.
   179  		if resp == nil {
   180  			return nil, fmt.Errorf("upload request to %v not sent, choose larger value for ChunkRetryDealine", rx.URI)
   181  		}
   182  		return resp, nil
   183  	}
   184  	// Configure retryable error criteria.
   185  	errorFunc := rx.Retry.errorFunc()
   186  
   187  	// Configure per-chunk retry deadline.
   188  	var retryDeadline time.Duration
   189  	if rx.ChunkRetryDeadline != 0 {
   190  		retryDeadline = rx.ChunkRetryDeadline
   191  	} else {
   192  		retryDeadline = defaultRetryDeadline
   193  	}
   194  
   195  	// Send all chunks.
   196  	for {
   197  		var pause time.Duration
   198  
   199  		// Each chunk gets its own initialized-at-zero backoff and invocation ID.
   200  		bo := rx.Retry.backoff()
   201  		quitAfterTimer := time.NewTimer(retryDeadline)
   202  		rx.attempts = 1
   203  		rx.invocationID = uuid.New().String()
   204  
   205  		// Retry loop for a single chunk.
   206  		for {
   207  			pauseTimer := time.NewTimer(pause)
   208  			select {
   209  			case <-ctx.Done():
   210  				quitAfterTimer.Stop()
   211  				pauseTimer.Stop()
   212  				if err == nil {
   213  					err = ctx.Err()
   214  				}
   215  				return prepareReturn(resp, err)
   216  			case <-pauseTimer.C:
   217  			case <-quitAfterTimer.C:
   218  				pauseTimer.Stop()
   219  				return prepareReturn(resp, err)
   220  			}
   221  			pauseTimer.Stop()
   222  
   223  			// Check for context cancellation or timeout once more. If more than one
   224  			// case in the select statement above was satisfied at the same time, Go
   225  			// will choose one arbitrarily.
   226  			// That can cause an operation to go through even if the context was
   227  			// canceled before or the timeout was reached.
   228  			select {
   229  			case <-ctx.Done():
   230  				quitAfterTimer.Stop()
   231  				if err == nil {
   232  					err = ctx.Err()
   233  				}
   234  				return prepareReturn(resp, err)
   235  			case <-quitAfterTimer.C:
   236  				return prepareReturn(resp, err)
   237  			default:
   238  			}
   239  
   240  			resp, err = rx.transferChunk(ctx)
   241  
   242  			var status int
   243  			if resp != nil {
   244  				status = resp.StatusCode
   245  			}
   246  
   247  			// Check if we should retry the request.
   248  			if !errorFunc(status, err) {
   249  				quitAfterTimer.Stop()
   250  				break
   251  			}
   252  
   253  			rx.attempts++
   254  			pause = bo.Pause()
   255  			if resp != nil && resp.Body != nil {
   256  				resp.Body.Close()
   257  			}
   258  		}
   259  
   260  		// If the chunk was uploaded successfully, but there's still
   261  		// more to go, upload the next chunk without any delay.
   262  		if statusResumeIncomplete(resp) {
   263  			resp.Body.Close()
   264  			continue
   265  		}
   266  
   267  		return prepareReturn(resp, err)
   268  	}
   269  }
   270  

View as plain text