1 // Copyright 2014 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package storage 16 17 import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "sync" 23 "time" 24 "unicode/utf8" 25 26 "cloud.google.com/go/internal/trace" 27 ) 28 29 // A Writer writes a Cloud Storage object. 30 type Writer struct { 31 // ObjectAttrs are optional attributes to set on the object. Any attributes 32 // must be initialized before the first Write call. Nil or zero-valued 33 // attributes are ignored. 34 ObjectAttrs 35 36 // SendCRC32C specifies whether to transmit a CRC32C field. It should be set 37 // to true in addition to setting the Writer's CRC32C field, because zero 38 // is a valid CRC and normally a zero would not be transmitted. 39 // If a CRC32C is sent, and the data written does not match the checksum, 40 // the write will be rejected. 41 // 42 // Note: SendCRC32C must be set to true BEFORE the first call to 43 // Writer.Write() in order to send the checksum. If it is set after that 44 // point, the checksum will be ignored. 45 SendCRC32C bool 46 47 // ChunkSize controls the maximum number of bytes of the object that the 48 // Writer will attempt to send to the server in a single request. Objects 49 // smaller than the size will be sent in a single request, while larger 50 // objects will be split over multiple requests. The value will be rounded up 51 // to the nearest multiple of 256K. The default ChunkSize is 16MiB. 52 // 53 // Each Writer will internally allocate a buffer of size ChunkSize. This is 54 // used to buffer input data and allow for the input to be sent again if a 55 // request must be retried. 56 // 57 // If you upload small objects (< 16MiB), you should set ChunkSize 58 // to a value slightly larger than the objects' sizes to avoid memory bloat. 59 // This is especially important if you are uploading many small objects 60 // concurrently. See 61 // https://cloud.google.com/storage/docs/json_api/v1/how-tos/upload#size 62 // for more information about performance trade-offs related to ChunkSize. 63 // 64 // If ChunkSize is set to zero, chunking will be disabled and the object will 65 // be uploaded in a single request without the use of a buffer. This will 66 // further reduce memory used during uploads, but will also prevent the writer 67 // from retrying in case of a transient error from the server or resuming an 68 // upload that fails midway through, since the buffer is required in order to 69 // retry the failed request. 70 // 71 // ChunkSize must be set before the first Write call. 72 ChunkSize int 73 74 // ChunkRetryDeadline sets a per-chunk retry deadline for multi-chunk 75 // resumable uploads. 76 // 77 // For uploads of larger files, the Writer will attempt to retry if the 78 // request to upload a particular chunk fails with a transient error. 79 // If a single chunk has been attempting to upload for longer than this 80 // deadline and the request fails, it will no longer be retried, and the error 81 // will be returned to the caller. This is only applicable for files which are 82 // large enough to require a multi-chunk resumable upload. The default value 83 // is 32s. Users may want to pick a longer deadline if they are using larger 84 // values for ChunkSize or if they expect to have a slow or unreliable 85 // internet connection. 86 // 87 // To set a deadline on the entire upload, use context timeout or 88 // cancellation. 89 ChunkRetryDeadline time.Duration 90 91 // ForceEmptyContentType is an optional parameter that is used to disable 92 // auto-detection of Content-Type. By default, if a blank Content-Type 93 // is provided, then gax.DetermineContentType is called to sniff the type. 94 ForceEmptyContentType bool 95 96 // ProgressFunc can be used to monitor the progress of a large write 97 // operation. If ProgressFunc is not nil and writing requires multiple 98 // calls to the underlying service (see 99 // https://cloud.google.com/storage/docs/json_api/v1/how-tos/resumable-upload), 100 // then ProgressFunc will be invoked after each call with the number of bytes of 101 // content copied so far. 102 // 103 // ProgressFunc should return quickly without blocking. 104 ProgressFunc func(int64) 105 106 ctx context.Context 107 o *ObjectHandle 108 109 opened bool 110 pw *io.PipeWriter 111 112 donec chan struct{} // closed after err and obj are set. 113 obj *ObjectAttrs 114 115 mu sync.Mutex 116 err error 117 } 118 119 // Write appends to w. It implements the io.Writer interface. 120 // 121 // Since writes happen asynchronously, Write may return a nil 122 // error even though the write failed (or will fail). Always 123 // use the error returned from Writer.Close to determine if 124 // the upload was successful. 125 // 126 // Writes will be retried on transient errors from the server, unless 127 // Writer.ChunkSize has been set to zero. 128 func (w *Writer) Write(p []byte) (n int, err error) { 129 w.mu.Lock() 130 werr := w.err 131 w.mu.Unlock() 132 if werr != nil { 133 return 0, werr 134 } 135 if !w.opened { 136 if err := w.openWriter(); err != nil { 137 return 0, err 138 } 139 } 140 n, err = w.pw.Write(p) 141 if err != nil { 142 w.mu.Lock() 143 werr := w.err 144 w.mu.Unlock() 145 // Preserve existing functionality that when context is canceled, Write will return 146 // context.Canceled instead of "io: read/write on closed pipe". This hides the 147 // pipe implementation detail from users and makes Write seem as though it's an RPC. 148 if errors.Is(werr, context.Canceled) || errors.Is(werr, context.DeadlineExceeded) { 149 return n, werr 150 } 151 } 152 return n, err 153 } 154 155 // Close completes the write operation and flushes any buffered data. 156 // If Close doesn't return an error, metadata about the written object 157 // can be retrieved by calling Attrs. 158 func (w *Writer) Close() error { 159 if !w.opened { 160 if err := w.openWriter(); err != nil { 161 return err 162 } 163 } 164 165 // Closing either the read or write causes the entire pipe to close. 166 if err := w.pw.Close(); err != nil { 167 return err 168 } 169 170 <-w.donec 171 w.mu.Lock() 172 defer w.mu.Unlock() 173 trace.EndSpan(w.ctx, w.err) 174 return w.err 175 } 176 177 func (w *Writer) openWriter() (err error) { 178 if err := w.validateWriteAttrs(); err != nil { 179 return err 180 } 181 if w.o.gen != defaultGen { 182 return fmt.Errorf("storage: generation not supported on Writer, got %v", w.o.gen) 183 } 184 185 isIdempotent := w.o.conds != nil && (w.o.conds.GenerationMatch >= 0 || w.o.conds.DoesNotExist == true) 186 opts := makeStorageOpts(isIdempotent, w.o.retry, w.o.userProject) 187 params := &openWriterParams{ 188 ctx: w.ctx, 189 chunkSize: w.ChunkSize, 190 chunkRetryDeadline: w.ChunkRetryDeadline, 191 bucket: w.o.bucket, 192 attrs: &w.ObjectAttrs, 193 conds: w.o.conds, 194 encryptionKey: w.o.encryptionKey, 195 sendCRC32C: w.SendCRC32C, 196 donec: w.donec, 197 setError: w.error, 198 progress: w.progress, 199 setObj: func(o *ObjectAttrs) { w.obj = o }, 200 forceEmptyContentType: w.ForceEmptyContentType, 201 } 202 if err := w.ctx.Err(); err != nil { 203 return err // short-circuit 204 } 205 w.pw, err = w.o.c.tc.OpenWriter(params, opts...) 206 if err != nil { 207 return err 208 } 209 w.opened = true 210 go w.monitorCancel() 211 212 return nil 213 } 214 215 // monitorCancel is intended to be used as a background goroutine. It monitors the 216 // context, and when it observes that the context has been canceled, it manually 217 // closes things that do not take a context. 218 func (w *Writer) monitorCancel() { 219 select { 220 case <-w.ctx.Done(): 221 w.mu.Lock() 222 werr := w.ctx.Err() 223 w.err = werr 224 w.mu.Unlock() 225 226 // Closing either the read or write causes the entire pipe to close. 227 w.CloseWithError(werr) 228 case <-w.donec: 229 } 230 } 231 232 // CloseWithError aborts the write operation with the provided error. 233 // CloseWithError always returns nil. 234 // 235 // Deprecated: cancel the context passed to NewWriter instead. 236 func (w *Writer) CloseWithError(err error) error { 237 if !w.opened { 238 return nil 239 } 240 return w.pw.CloseWithError(err) 241 } 242 243 // Attrs returns metadata about a successfully-written object. 244 // It's only valid to call it after Close returns nil. 245 func (w *Writer) Attrs() *ObjectAttrs { 246 return w.obj 247 } 248 249 func (w *Writer) validateWriteAttrs() error { 250 attrs := w.ObjectAttrs 251 // Check the developer didn't change the object Name (this is unfortunate, but 252 // we don't want to store an object under the wrong name). 253 if attrs.Name != w.o.object { 254 return fmt.Errorf("storage: Writer.Name %q does not match object name %q", attrs.Name, w.o.object) 255 } 256 if !utf8.ValidString(attrs.Name) { 257 return fmt.Errorf("storage: object name %q is not valid UTF-8", attrs.Name) 258 } 259 if attrs.KMSKeyName != "" && w.o.encryptionKey != nil { 260 return errors.New("storage: cannot use KMSKeyName with a customer-supplied encryption key") 261 } 262 if w.ChunkSize < 0 { 263 return errors.New("storage: Writer.ChunkSize must be non-negative") 264 } 265 return nil 266 } 267 268 // progress is a convenience wrapper that reports write progress to the Writer 269 // ProgressFunc if it is set and progress is non-zero. 270 func (w *Writer) progress(p int64) { 271 if w.ProgressFunc != nil && p != 0 { 272 w.ProgressFunc(p) 273 } 274 } 275 276 // error acquires the Writer's lock, sets the Writer's err to the given error, 277 // then relinquishes the lock. 278 func (w *Writer) error(err error) { 279 w.mu.Lock() 280 w.err = err 281 w.mu.Unlock() 282 } 283