...

Source file src/google.golang.org/api/transport/bytestream/client.go

Documentation: google.golang.org/api/transport/bytestream

     1  // Copyright 2016 Google LLC.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  // Package bytestream provides a client for any service that exposes a ByteStream API.
     6  //
     7  // Note: This package is a work-in-progress.  Backwards-incompatible changes should be expected.
     8  package bytestream
     9  
    10  // This file contains the client implementation of Bytestream declared at:
    11  // https://github.com/googleapis/googleapis/blob/master/google/bytestream/bytestream.proto
    12  
    13  import (
    14  	"context"
    15  	"fmt"
    16  	"math/rand"
    17  	"time"
    18  
    19  	"google.golang.org/grpc"
    20  
    21  	pb "google.golang.org/genproto/googleapis/bytestream"
    22  )
    23  
    24  const (
    25  	// MaxBufSize is the maximum buffer size (in bytes) received in a read chunk or sent in a write chunk.
    26  	MaxBufSize  = 2 * 1024 * 1024
    27  	backoffBase = 10 * time.Millisecond
    28  	backoffMax  = 1 * time.Second
    29  	maxTries    = 5
    30  )
    31  
    32  // Client is the go wrapper around a ByteStreamClient and provides an interface to it.
    33  type Client struct {
    34  	client  pb.ByteStreamClient
    35  	options []grpc.CallOption
    36  	conn    *grpc.ClientConn
    37  }
    38  
    39  // NewClient creates a new bytestream.Client.
    40  func NewClient(cc *grpc.ClientConn, options ...grpc.CallOption) *Client {
    41  	return &Client{
    42  		client:  pb.NewByteStreamClient(cc),
    43  		options: options,
    44  		conn:    cc,
    45  	}
    46  }
    47  
    48  // Reader reads from a byte stream.
    49  type Reader struct {
    50  	ctx          context.Context
    51  	c            *Client
    52  	readClient   pb.ByteStream_ReadClient
    53  	resourceName string
    54  	err          error
    55  	buf          []byte
    56  }
    57  
    58  // ResourceName gets the resource name this Reader is reading.
    59  func (r *Reader) ResourceName() string {
    60  	return r.resourceName
    61  }
    62  
    63  // Read implements io.Reader.
    64  // Read buffers received bytes that do not fit in p.
    65  func (r *Reader) Read(p []byte) (int, error) {
    66  	if r.err != nil {
    67  		return 0, r.err
    68  	}
    69  
    70  	var backoffDelay time.Duration
    71  	for tries := 0; len(r.buf) == 0 && tries < maxTries; tries++ {
    72  		// No data in buffer.
    73  		resp, err := r.readClient.Recv()
    74  		if err != nil {
    75  			r.err = err
    76  			return 0, err
    77  		}
    78  		r.buf = resp.Data
    79  		if len(r.buf) != 0 {
    80  			break
    81  		}
    82  
    83  		// back off
    84  		if backoffDelay < backoffBase {
    85  			backoffDelay = backoffBase
    86  		} else {
    87  			backoffDelay = time.Duration(float64(backoffDelay) * 1.3 * (1 - 0.4*rand.Float64()))
    88  		}
    89  		if backoffDelay > backoffMax {
    90  			backoffDelay = backoffMax
    91  		}
    92  		t := time.NewTimer(backoffDelay)
    93  		select {
    94  		case <-t.C:
    95  		case <-r.ctx.Done():
    96  			t.Stop()
    97  			if err := r.ctx.Err(); err != nil {
    98  				r.err = err
    99  			}
   100  			return 0, r.err
   101  		}
   102  	}
   103  
   104  	// Copy from buffer.
   105  	n := copy(p, r.buf)
   106  	r.buf = r.buf[n:]
   107  	return n, nil
   108  }
   109  
   110  // Close implements io.Closer.
   111  func (r *Reader) Close() error {
   112  	if r.readClient == nil {
   113  		return nil
   114  	}
   115  	err := r.readClient.CloseSend()
   116  	r.readClient = nil
   117  	return err
   118  }
   119  
   120  // NewReader creates a new Reader to read a resource.
   121  func (c *Client) NewReader(ctx context.Context, resourceName string) (*Reader, error) {
   122  	return c.NewReaderAt(ctx, resourceName, 0)
   123  }
   124  
   125  // NewReaderAt creates a new Reader to read a resource from the given offset.
   126  func (c *Client) NewReaderAt(ctx context.Context, resourceName string, offset int64) (*Reader, error) {
   127  	// readClient is set up for Read(). ReadAt() will copy needed fields into its reentrantReader.
   128  	readClient, err := c.client.Read(ctx, &pb.ReadRequest{
   129  		ResourceName: resourceName,
   130  		ReadOffset:   offset,
   131  	}, c.options...)
   132  	if err != nil {
   133  		return nil, err
   134  	}
   135  
   136  	return &Reader{
   137  		ctx:          ctx,
   138  		c:            c,
   139  		resourceName: resourceName,
   140  		readClient:   readClient,
   141  	}, nil
   142  }
   143  
   144  // Writer writes to a byte stream.
   145  type Writer struct {
   146  	ctx          context.Context
   147  	writeClient  pb.ByteStream_WriteClient
   148  	resourceName string
   149  	offset       int64
   150  	err          error
   151  }
   152  
   153  // ResourceName gets the resource name this Writer is writing.
   154  func (w *Writer) ResourceName() string {
   155  	return w.resourceName
   156  }
   157  
   158  // Write implements io.Writer.
   159  func (w *Writer) Write(p []byte) (int, error) {
   160  	if w.err != nil {
   161  		return 0, w.err
   162  	}
   163  
   164  	n := 0
   165  	for n < len(p) {
   166  		bufSize := len(p) - n
   167  		if bufSize > MaxBufSize {
   168  			bufSize = MaxBufSize
   169  		}
   170  		r := pb.WriteRequest{
   171  			WriteOffset: w.offset,
   172  			FinishWrite: false,
   173  			Data:        p[n : n+bufSize],
   174  		}
   175  		// Bytestream only requires the resourceName to be sent in the first WriteRequest.
   176  		if w.offset == 0 {
   177  			r.ResourceName = w.resourceName
   178  		}
   179  		err := w.writeClient.Send(&r)
   180  		if err != nil {
   181  			w.err = err
   182  			return n, err
   183  		}
   184  		w.offset += int64(bufSize)
   185  		n += bufSize
   186  	}
   187  	return n, nil
   188  }
   189  
   190  // Close implements io.Closer. It is the caller's responsibility to call Close() when writing is done.
   191  func (w *Writer) Close() error {
   192  	err := w.writeClient.Send(&pb.WriteRequest{
   193  		ResourceName: w.resourceName,
   194  		WriteOffset:  w.offset,
   195  		FinishWrite:  true,
   196  		Data:         nil,
   197  	})
   198  	if err != nil {
   199  		w.err = err
   200  		return fmt.Errorf("Send(WriteRequest< FinishWrite >) failed: %v", err)
   201  	}
   202  	resp, err := w.writeClient.CloseAndRecv()
   203  	if err != nil {
   204  		w.err = err
   205  		return fmt.Errorf("CloseAndRecv: %v", err)
   206  	}
   207  	if resp == nil {
   208  		err = fmt.Errorf("expected a response on close, got %v", resp)
   209  	} else if resp.CommittedSize != w.offset {
   210  		err = fmt.Errorf("server only wrote %d bytes, want %d", resp.CommittedSize, w.offset)
   211  	}
   212  	w.err = err
   213  	return err
   214  }
   215  
   216  // NewWriter creates a new Writer to write a resource.
   217  //
   218  // resourceName specifies the name of the resource.
   219  // The resource will be available after Close has been called.
   220  //
   221  // It is the caller's responsibility to call Close when writing is done.
   222  //
   223  // TODO: There is currently no way to resume a write. Maybe NewWriter should begin with a call to QueryWriteStatus.
   224  func (c *Client) NewWriter(ctx context.Context, resourceName string) (*Writer, error) {
   225  	wc, err := c.client.Write(ctx, c.options...)
   226  	if err != nil {
   227  		return nil, err
   228  	}
   229  	return &Writer{
   230  		ctx:          ctx,
   231  		writeClient:  wc,
   232  		resourceName: resourceName,
   233  	}, nil
   234  }
   235  
   236  // Close closes the connection to the API service. The user should invoke this when
   237  // the client is no longer required.
   238  func (c *Client) Close() {
   239  	c.conn.Close()
   240  }
   241  

View as plain text