...

Source file src/cloud.google.com/go/storage/writer.go

Documentation: cloud.google.com/go/storage

     1  // Copyright 2014 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package storage
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"sync"
    23  	"time"
    24  	"unicode/utf8"
    25  
    26  	"cloud.google.com/go/internal/trace"
    27  )
    28  
// A Writer writes a Cloud Storage object.
//
// Writes are asynchronous: the first call to Write or Close opens an
// in-memory pipe (an io.PipeWriter) to the underlying transport, and data
// passed to Write is forwarded into that pipe. The outcome of the upload is
// only known once Close returns.
type Writer struct {
	// ObjectAttrs are optional attributes to set on the object. Any attributes
	// must be initialized before the first Write call. Nil or zero-valued
	// attributes are ignored.
	ObjectAttrs

	// SendCRC32C specifies whether to transmit a CRC32C field. It should be set
	// to true in addition to setting the Writer's CRC32C field, because zero
	// is a valid CRC and normally a zero would not be transmitted.
	// If a CRC32C is sent, and the data written does not match the checksum,
	// the write will be rejected.
	//
	// Note: SendCRC32C must be set to true BEFORE the first call to
	// Writer.Write() in order to send the checksum. If it is set after that
	// point, the checksum will be ignored.
	SendCRC32C bool

	// ChunkSize controls the maximum number of bytes of the object that the
	// Writer will attempt to send to the server in a single request. Objects
	// smaller than the size will be sent in a single request, while larger
	// objects will be split over multiple requests. The value will be rounded up
	// to the nearest multiple of 256K. The default ChunkSize is 16MiB.
	//
	// Each Writer will internally allocate a buffer of size ChunkSize. This is
	// used to buffer input data and allow for the input to be sent again if a
	// request must be retried.
	//
	// If you upload small objects (< 16MiB), you should set ChunkSize
	// to a value slightly larger than the objects' sizes to avoid memory bloat.
	// This is especially important if you are uploading many small objects
	// concurrently. See
	// https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload#size
	// for more information about performance trade-offs related to ChunkSize.
	//
	// If ChunkSize is set to zero, chunking will be disabled and the object will
	// be uploaded in a single request without the use of a buffer. This will
	// further reduce memory used during uploads, but will also prevent the writer
	// from retrying in case of a transient error from the server or resuming an
	// upload that fails midway through, since the buffer is required in order to
	// retry the failed request.
	//
	// ChunkSize must be set before the first Write call. Negative values are
	// rejected when the upload is opened.
	ChunkSize int

	// ChunkRetryDeadline sets a per-chunk retry deadline for multi-chunk
	// resumable uploads.
	//
	// For uploads of larger files, the Writer will attempt to retry if the
	// request to upload a particular chunk fails with a transient error.
	// If a single chunk has been attempting to upload for longer than this
	// deadline and the request fails, it will no longer be retried, and the error
	// will be returned to the caller. This is only applicable for files which are
	// large enough to require a multi-chunk resumable upload. The default value
	// is 32s. Users may want to pick a longer deadline if they are using larger
	// values for ChunkSize or if they expect to have a slow or unreliable
	// internet connection.
	//
	// To set a deadline on the entire upload, use context timeout or
	// cancellation.
	ChunkRetryDeadline time.Duration

	// ForceEmptyContentType is an optional parameter that is used to disable
	// auto-detection of Content-Type. By default, if a blank Content-Type
	// is provided, then gax.DetermineContentType is called to sniff the type.
	ForceEmptyContentType bool

	// ProgressFunc can be used to monitor the progress of a large write
	// operation. If ProgressFunc is not nil and writing requires multiple
	// calls to the underlying service (see
	// https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload),
	// then ProgressFunc will be invoked after each call with the number of bytes of
	// content copied so far.
	//
	// ProgressFunc should return quickly without blocking.
	ProgressFunc func(int64)

	// ctx governs the upload: openWriter returns ctx.Err() if it is already
	// done, monitorCancel aborts the pipe once it is done, and Close passes
	// it to trace.EndSpan. o is the handle of the destination object; it
	// supplies the bucket, object name, preconditions, encryption key, retry
	// settings and the client used to open the upload.
	ctx context.Context
	o   *ObjectHandle

	// opened is set once openWriter has successfully opened pw. pw is the
	// write end of the pipe returned by the transport's OpenWriter; Write
	// forwards data into it and Close closes it.
	opened bool
	pw     *io.PipeWriter

	// donec is handed to the transport through openWriterParams; Close and
	// monitorCancel wait on it. obj receives the written object's attributes
	// through the setObj callback and is returned by Attrs (it is not
	// guarded by mu).
	donec chan struct{} // closed after err and obj are set.
	obj   *ObjectAttrs

	// mu guards err. err records the upload's failure: it is set through the
	// setError callback passed to OpenWriter (Writer.error), and by
	// monitorCancel to ctx.Err() when the context is done.
	mu  sync.Mutex
	err error
}
   118  
   119  // Write appends to w. It implements the io.Writer interface.
   120  //
   121  // Since writes happen asynchronously, Write may return a nil
   122  // error even though the write failed (or will fail). Always
   123  // use the error returned from Writer.Close to determine if
   124  // the upload was successful.
   125  //
   126  // Writes will be retried on transient errors from the server, unless
   127  // Writer.ChunkSize has been set to zero.
   128  func (w *Writer) Write(p []byte) (n int, err error) {
   129  	w.mu.Lock()
   130  	werr := w.err
   131  	w.mu.Unlock()
   132  	if werr != nil {
   133  		return 0, werr
   134  	}
   135  	if !w.opened {
   136  		if err := w.openWriter(); err != nil {
   137  			return 0, err
   138  		}
   139  	}
   140  	n, err = w.pw.Write(p)
   141  	if err != nil {
   142  		w.mu.Lock()
   143  		werr := w.err
   144  		w.mu.Unlock()
   145  		// Preserve existing functionality that when context is canceled, Write will return
   146  		// context.Canceled instead of "io: read/write on closed pipe". This hides the
   147  		// pipe implementation detail from users and makes Write seem as though it's an RPC.
   148  		if errors.Is(werr, context.Canceled) || errors.Is(werr, context.DeadlineExceeded) {
   149  			return n, werr
   150  		}
   151  	}
   152  	return n, err
   153  }
   154  
   155  // Close completes the write operation and flushes any buffered data.
   156  // If Close doesn't return an error, metadata about the written object
   157  // can be retrieved by calling Attrs.
   158  func (w *Writer) Close() error {
   159  	if !w.opened {
   160  		if err := w.openWriter(); err != nil {
   161  			return err
   162  		}
   163  	}
   164  
   165  	// Closing either the read or write causes the entire pipe to close.
   166  	if err := w.pw.Close(); err != nil {
   167  		return err
   168  	}
   169  
   170  	<-w.donec
   171  	w.mu.Lock()
   172  	defer w.mu.Unlock()
   173  	trace.EndSpan(w.ctx, w.err)
   174  	return w.err
   175  }
   176  
   177  func (w *Writer) openWriter() (err error) {
   178  	if err := w.validateWriteAttrs(); err != nil {
   179  		return err
   180  	}
   181  	if w.o.gen != defaultGen {
   182  		return fmt.Errorf("storage: generation not supported on Writer, got %v", w.o.gen)
   183  	}
   184  
   185  	isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true)
   186  	opts := makeStorageOpts(isIdempotent, w.o.retry, w.o.userProject)
   187  	params := &openWriterParams{
   188  		ctx:                   w.ctx,
   189  		chunkSize:             w.ChunkSize,
   190  		chunkRetryDeadline:    w.ChunkRetryDeadline,
   191  		bucket:                w.o.bucket,
   192  		attrs:                 &w.ObjectAttrs,
   193  		conds:                 w.o.conds,
   194  		encryptionKey:         w.o.encryptionKey,
   195  		sendCRC32C:            w.SendCRC32C,
   196  		donec:                 w.donec,
   197  		setError:              w.error,
   198  		progress:              w.progress,
   199  		setObj:                func(o *ObjectAttrs) { w.obj = o },
   200  		forceEmptyContentType: w.ForceEmptyContentType,
   201  	}
   202  	if err := w.ctx.Err(); err != nil {
   203  		return err // short-circuit
   204  	}
   205  	w.pw, err = w.o.c.tc.OpenWriter(params, opts...)
   206  	if err != nil {
   207  		return err
   208  	}
   209  	w.opened = true
   210  	go w.monitorCancel()
   211  
   212  	return nil
   213  }
   214  
   215  // monitorCancel is intended to be used as a background goroutine. It monitors the
   216  // context, and when it observes that the context has been canceled, it manually
   217  // closes things that do not take a context.
   218  func (w *Writer) monitorCancel() {
   219  	select {
   220  	case <-w.ctx.Done():
   221  		w.mu.Lock()
   222  		werr := w.ctx.Err()
   223  		w.err = werr
   224  		w.mu.Unlock()
   225  
   226  		// Closing either the read or write causes the entire pipe to close.
   227  		w.CloseWithError(werr)
   228  	case <-w.donec:
   229  	}
   230  }
   231  
   232  // CloseWithError aborts the write operation with the provided error.
   233  // CloseWithError always returns nil.
   234  //
   235  // Deprecated: cancel the context passed to NewWriter instead.
   236  func (w *Writer) CloseWithError(err error) error {
   237  	if !w.opened {
   238  		return nil
   239  	}
   240  	return w.pw.CloseWithError(err)
   241  }
   242  
   243  // Attrs returns metadata about a successfully-written object.
   244  // It's only valid to call it after Close returns nil.
   245  func (w *Writer) Attrs() *ObjectAttrs {
   246  	return w.obj
   247  }
   248  
   249  func (w *Writer) validateWriteAttrs() error {
   250  	attrs := w.ObjectAttrs
   251  	// Check the developer didn't change the object Name (this is unfortunate, but
   252  	// we don't want to store an object under the wrong name).
   253  	if attrs.Name != w.o.object {
   254  		return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object)
   255  	}
   256  	if !utf8.ValidString(attrs.Name) {
   257  		return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name)
   258  	}
   259  	if attrs.KMSKeyName != "" && w.o.encryptionKey != nil {
   260  		return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key")
   261  	}
   262  	if w.ChunkSize < 0 {
   263  		return errors.New("storage: Writer.ChunkSize must be non-negative")
   264  	}
   265  	return nil
   266  }
   267  
   268  // progress is a convenience wrapper that reports write progress to the Writer
   269  // ProgressFunc if it is set and progress is non-zero.
   270  func (w *Writer) progress(p int64) {
   271  	if w.ProgressFunc != nil && p != 0 {
   272  		w.ProgressFunc(p)
   273  	}
   274  }
   275  
   276  // error acquires the Writer's lock, sets the Writer's err to the given error,
   277  // then relinquishes the lock.
   278  func (w *Writer) error(err error) {
   279  	w.mu.Lock()
   280  	w.err = err
   281  	w.mu.Unlock()
   282  }
   283  

View as plain text