...

Source file src/github.com/spf13/afero/gcsfs/file_resource.go

Documentation: github.com/spf13/afero/gcsfs

     1  // Copyright © 2021 Vasily Ovchinnikov <vasily@remerge.io>.
     2  //
     3  // The code in this file is derived from afero fork github.com/Zatte/afero by Mikael Rapp
     4  // licensed under Apache License 2.0.
     5  //
     6  // Licensed under the Apache License, Version 2.0 (the "License");
     7  // you may not use this file except in compliance with the License.
     8  // You may obtain a copy of the License at
     9  // http://www.apache.org/licenses/LICENSE-2.0
    10  //
    11  // Unless required by applicable law or agreed to in writing, software
    12  // distributed under the License is distributed on an "AS IS" BASIS,
    13  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    14  // See the License for the specific language governing permissions and
    15  // limitations under the License.
    16  
    17  package gcsfs
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"os"
    25  	"syscall"
    26  
    27  	"github.com/googleapis/google-cloud-go-testing/storage/stiface"
    28  )
    29  
const (
	// maxWriteSize caps the size of a single padding write performed by
	// Truncate, so padding is produced in bounded chunks rather than one
	// potentially huge in-memory buffer.
	maxWriteSize = 10000
)
    33  
// gcsFileResource represents a singleton version of each GCS object;
// Google cloud storage allows users to open multiple writers(!) to the same
// underlying resource; once a writer is closed the written stream is committed.
// We are doing some magic where we read and write to the same file, which
// requires synchronization of the underlying resource.

type gcsFileResource struct {
	// ctx is used for every GCS API call made on behalf of this resource.
	ctx context.Context

	// fs points back at the owning filesystem; ReadAt uses it to stat the
	// name and detect directory reads.
	fs *Fs

	// obj is the handle of the underlying GCS object; name is its object
	// name and fileMode the mode it is reported with.
	obj      stiface.ObjectHandle
	name     string
	fileMode os.FileMode

	// currentGcsSize caches the object size observed when a writer was
	// opened (see WriteAt); maybeCloseWriter uses it to decide whether the
	// original tail must be re-appended before committing.
	// offset is the position the currently open reader/writer is at; only
	// one of reader/writer is expected to be non-nil at a time.
	currentGcsSize int64
	offset         int64
	reader         io.ReadCloser
	writer         io.WriteCloser

	// closed is set once Close has been called.
	closed bool
}
    56  
    57  func (o *gcsFileResource) Close() error {
    58  	o.closed = true
    59  	// TODO rawGcsObjectsMap ?
    60  	return o.maybeCloseIo()
    61  }
    62  
    63  func (o *gcsFileResource) maybeCloseIo() error {
    64  	if err := o.maybeCloseReader(); err != nil {
    65  		return fmt.Errorf("error closing reader: %v", err)
    66  	}
    67  	if err := o.maybeCloseWriter(); err != nil {
    68  		return fmt.Errorf("error closing writer: %v", err)
    69  	}
    70  
    71  	return nil
    72  }
    73  
    74  func (o *gcsFileResource) maybeCloseReader() error {
    75  	if o.reader == nil {
    76  		return nil
    77  	}
    78  	if err := o.reader.Close(); err != nil {
    79  		return err
    80  	}
    81  	o.reader = nil
    82  	return nil
    83  }
    84  
    85  func (o *gcsFileResource) maybeCloseWriter() error {
    86  	if o.writer == nil {
    87  		return nil
    88  	}
    89  
    90  	// In cases of partial writes (e.g. to the middle of a file stream), we need to
    91  	// append any remaining data from the original file before we close the reader (and
    92  	// commit the results.)
    93  	// For small writes it can be more efficient
    94  	// to keep the original reader but that is for another iteration
    95  	if o.currentGcsSize > o.offset {
    96  		currentFile, err := o.obj.NewRangeReader(o.ctx, o.offset, -1)
    97  		if err != nil {
    98  			return fmt.Errorf(
    99  				"couldn't simulate a partial write; the closing (and thus"+
   100  					" the whole file write) is NOT commited to GCS. %v", err)
   101  		}
   102  		if currentFile != nil && currentFile.Remain() > 0 {
   103  			if _, err := io.Copy(o.writer, currentFile); err != nil {
   104  				return fmt.Errorf("error writing: %v", err)
   105  			}
   106  		}
   107  	}
   108  
   109  	if err := o.writer.Close(); err != nil {
   110  		return err
   111  	}
   112  	o.writer = nil
   113  	return nil
   114  }
   115  
// ReadAt reads into p starting at object offset off, lazily (re)opening a
// GCS range reader and caching it for sequential follow-up reads.
// NOTE(review): a single reader.Read may return fewer than len(p) bytes
// without an error, which is weaker than the strict io.ReaderAt contract
// ("len(p) bytes or an error") — presumably callers tolerate short reads;
// confirm against the callers in this package.
func (o *gcsFileResource) ReadAt(p []byte, off int64) (n int, err error) {
	// Empty destination: nothing to read.
	// NOTE(review): this checks cap(p) rather than len(p); a zero-length
	// slice with spare capacity still falls through and opens a reader.
	// Looks like len(p) was intended — confirm.
	if cap(p) == 0 {
		return 0, nil
	}

	// Assume that if the reader is open; it is at the correct offset
	// a good performance assumption that we must ensure holds
	if off == o.offset && o.reader != nil {
		n, err = o.reader.Read(p)
		o.offset += int64(n)
		return n, err
	}

	// we have to check, whether it's a folder; the folder must not have an open readers, or writers though,
	// so this check should not be invoked excessively and cause too much of a performance drop
	if o.reader == nil && o.writer == nil {
		var info *FileInfo
		info, err = newFileInfo(o.name, o.fs, o.fileMode)
		if err != nil {
			return 0, err
		}

		if info.IsDir() {
			// trying to read a directory must return this
			return 0, syscall.EISDIR
		}
	}

	// If any writers have written anything; commit it first so we can read it back.
	if err = o.maybeCloseIo(); err != nil {
		return 0, err
	}

	// Then read at the correct offset.
	// A new range reader replaces any previously cached one (maybeCloseIo
	// above has already closed and nil-ed it).
	r, err := o.obj.NewRangeReader(o.ctx, off, -1)
	if err != nil {
		return 0, err
	}
	o.reader = r
	o.offset = off

	read, err := o.reader.Read(p)
	o.offset += int64(read)
	return read, err
}
   161  
// WriteAt writes b at object offset off. GCS objects are append-only
// streams per writer, so a "seek" is simulated by opening a fresh writer
// and first copying the original object's prefix [0:off) into it; the
// original tail is re-appended later when the writer is closed (see
// maybeCloseWriter).
func (o *gcsFileResource) WriteAt(b []byte, off int64) (n int, err error) {
	// If the writer is opened and at the correct offset we're good!
	if off == o.offset && o.writer != nil {
		n, err = o.writer.Write(b)
		o.offset += int64(n)
		return n, err
	}

	// Ensure readers must be re-opened and that if a writer is active at another
	// offset it is first committed before we do a "seek" below
	if err = o.maybeCloseIo(); err != nil {
		return 0, err
	}

	w := o.obj.NewWriter(o.ctx)
	// TRIGGER WARNING: This can seem like a hack but it works thanks
	// to GCS strong consistency. We will open and write to the same file; First when the
	// writer is closed will the content get committed to GCS.
	// The general idea is this:
	// Objectv1[:offset] -> Objectv2
	// newData1 -> Objectv2
	// Objectv1[offset+len(newData1):] -> Objectv2
	// Objectv2.Close
	//
	// It will however require a download and upload of the original file but it
	// can't be avoided if we should support seek-write-operations on GCS.
	objAttrs, err := o.obj.Attrs(o.ctx)
	if err != nil {
		if off > 0 {
			return 0, err // WriteAt to a non existing file
		}

		// Object doesn't exist yet but we're writing from offset 0:
		// treat it as creating a new, empty object.
		o.currentGcsSize = 0
	} else {
		o.currentGcsSize = objAttrs.Size
	}

	// Writing past the current end of the object is not supported.
	if off > o.currentGcsSize {
		return 0, ErrOutOfRange
	}

	if off > 0 {
		// Copy the untouched prefix [0:off) of the original object into the
		// new writer before appending the caller's data.
		var r stiface.Reader
		r, err = o.obj.NewReader(o.ctx)
		if err != nil {
			return 0, err
		}
		if _, err = io.CopyN(w, r, off); err != nil {
			return 0, err
		}
		if err = r.Close(); err != nil {
			return 0, err
		}
	}

	o.writer = w
	o.offset = off

	written, err := o.writer.Write(b)

	o.offset += int64(written)
	return written, err
}
   225  
   226  func min(x, y int) int {
   227  	if x < y {
   228  		return x
   229  	}
   230  	return y
   231  }
   232  
   233  func (o *gcsFileResource) Truncate(wantedSize int64) error {
   234  	if wantedSize < 0 {
   235  		return ErrOutOfRange
   236  	}
   237  
   238  	if err := o.maybeCloseIo(); err != nil {
   239  		return err
   240  	}
   241  
   242  	r, err := o.obj.NewRangeReader(o.ctx, 0, wantedSize)
   243  	if err != nil {
   244  		return err
   245  	}
   246  
   247  	w := o.obj.NewWriter(o.ctx)
   248  	written, err := io.Copy(w, r)
   249  	if err != nil {
   250  		return err
   251  	}
   252  
   253  	for written < wantedSize {
   254  		// Bulk up padding writes
   255  		paddingBytes := bytes.Repeat([]byte(" "), min(maxWriteSize, int(wantedSize-written)))
   256  
   257  		n := 0
   258  		if n, err = w.Write(paddingBytes); err != nil {
   259  			return err
   260  		}
   261  
   262  		written += int64(n)
   263  	}
   264  	if err = r.Close(); err != nil {
   265  		return fmt.Errorf("error closing reader: %v", err)
   266  	}
   267  	if err = w.Close(); err != nil {
   268  		return fmt.Errorf("error closing writer: %v", err)
   269  	}
   270  	return nil
   271  }
   272  

View as plain text