...

Source file src/github.com/Shopify/go-storage/timeout_wrapper.go

Documentation: github.com/Shopify/go-storage

     1  package storage
     2  
     3  import (
     4  	"context"
     5  	"io"
     6  	"time"
     7  )
     8  
     9  // NewTimeoutWrapper creates a FS which wraps fs and adds a timeout to most operations:
    10  // read: Open, Attributes, URL
    11  // write: Create, Delete
    12  //
    13  // Note that the Open and Create methods are only for resolving the object, NOT actually reading or writing the contents.
    14  // These operations should be fairly quick, on the same order as Attribute and Delete, respectively.
    15  //
    16  // This depends on the underlying implementation to honour context's errors.
    17  // It is at least supported on the CloudStorageFS.
    18  //
    19  // Walk is not covered, since its duration is highly unpredictable.
    20  func NewTimeoutWrapper(fs FS, read time.Duration, write time.Duration) FS {
    21  	return &timeoutWrapper{
    22  		fs:    fs,
    23  		read:  read,
    24  		write: write,
    25  	}
    26  }
    27  
    28  type timeoutWrapper struct {
    29  	fs    FS
    30  	read  time.Duration
    31  	write time.Duration
    32  }
    33  
    34  // timeoutCall watches the context to be sure it's not Done yet,
    35  // but does NOT modify the context being passed to the underlying call.
    36  // This is important, because the context needs to continue to be alive while the returned object (File, Writer, etc)
    37  // is being used by the caller.
    38  func timeoutCall(ctx context.Context, timeout time.Duration, call func() (interface{}, error)) (interface{}, error) {
    39  	var out interface{}
    40  	var err error
    41  	done := make(chan struct{})
    42  	go func() {
    43  		out, err = call()
    44  		close(done)
    45  	}()
    46  
    47  	select {
    48  	case <-time.After(timeout):
    49  		return nil, context.DeadlineExceeded
    50  	case <-ctx.Done():
    51  		return nil, ctx.Err()
    52  	case <-done:
    53  		return out, err
    54  	}
    55  }
    56  
    57  // Open implements FS.
    58  func (t *timeoutWrapper) Open(ctx context.Context, path string, options *ReaderOptions) (*File, error) {
    59  	out, err := timeoutCall(ctx, t.read, func() (interface{}, error) {
    60  		return t.fs.Open(ctx, path, options)
    61  	})
    62  	if file, ok := out.(*File); ok {
    63  		return file, err
    64  	}
    65  
    66  	return nil, err
    67  }
    68  
    69  // Attributes() implements FS.
    70  func (t *timeoutWrapper) Attributes(ctx context.Context, path string, options *ReaderOptions) (*Attributes, error) {
    71  	out, err := timeoutCall(ctx, t.read, func() (interface{}, error) {
    72  		return t.fs.Attributes(ctx, path, options)
    73  	})
    74  	if attrs, ok := out.(*Attributes); ok {
    75  		return attrs, err
    76  	}
    77  
    78  	return nil, err
    79  }
    80  
    81  // Create implements FS.
    82  func (t *timeoutWrapper) Create(ctx context.Context, path string, options *WriterOptions) (io.WriteCloser, error) {
    83  	out, err := timeoutCall(ctx, t.write, func() (interface{}, error) {
    84  		return t.fs.Create(ctx, path, options)
    85  	})
    86  	if w, ok := out.(io.WriteCloser); ok {
    87  		return w, err
    88  	}
    89  
    90  	return nil, err
    91  }
    92  
    93  // Delete implements FS.
    94  func (t *timeoutWrapper) Delete(ctx context.Context, path string) error {
    95  	_, err := timeoutCall(ctx, t.write, func() (interface{}, error) {
    96  		return nil, t.fs.Delete(ctx, path)
    97  	})
    98  
    99  	return err
   100  }
   101  
   102  // Walk transverses all paths underneath path, calling fn on each visited path.
   103  func (t *timeoutWrapper) Walk(ctx context.Context, path string, fn WalkFn) error {
   104  	return t.fs.Walk(ctx, path, fn)
   105  }
   106  
   107  func (t *timeoutWrapper) URL(ctx context.Context, path string, options *SignedURLOptions) (string, error) {
   108  	out, err := timeoutCall(ctx, t.write, func() (interface{}, error) {
   109  		return t.fs.URL(ctx, path, options)
   110  	})
   111  	if url, ok := out.(string); ok {
   112  		return url, err
   113  	}
   114  
   115  	return "", err
   116  }
   117  

View as plain text