...

Source file src/google.golang.org/api/transport/bytestream/client_test.go

Documentation: google.golang.org/api/transport/bytestream

     1  // Copyright 2017 Google LLC.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package bytestream
     6  
     7  import (
     8  	"bytes"
     9  	"context"
    10  	"errors"
    11  	"fmt"
    12  	"io"
    13  	"log"
    14  	"net"
    15  	"testing"
    16  
    17  	"google.golang.org/api/transport/bytestream/internal"
    18  	"google.golang.org/grpc"
    19  	"google.golang.org/grpc/metadata"
    20  
    21  	pb "google.golang.org/genproto/googleapis/bytestream"
    22  )
    23  
// testData is the payload used by the read and write tests in this file.
const testData = "0123456789"
    25  
// A grpcServer is an in-process gRPC server, listening on a system-chosen port on
// the local loopback interface. Servers are for testing only and are not
// intended to be used in production code.
// (Copied from "cloud.google.com/internal/testutil/server_test.go")
//
// To create a server, call newGRPCServer, register your handlers, then call
// Start:
//
//	srv, err := newGRPCServer()
//	...
//	mypb.RegisterMyServiceServer(srv.Gsrv, &myHandler)
//	....
//	srv.Start()
//
// Clients should connect to the server with no security:
//
//	conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
//	...
type grpcServer struct {
	// Addr is the listener's address in string form, suitable for dialing.
	Addr string
	// l is the listener that Gsrv serves on once Start is called.
	l net.Listener
	// Gsrv is the underlying gRPC server; register handlers on it before Start.
	Gsrv *grpc.Server
}
    49  
// TestSetup bundles everything one test needs: an in-process gRPC server with
// the bytestream service registered on it, and a Client connected to that
// server. Create one with newTestSetup and release it with Close.
type TestSetup struct {
	// ctx is the context the tests pass to NewReader and NewWriter.
	ctx context.Context
	// rpcTest is the in-process gRPC server the client talks to.
	rpcTest *grpcServer
	// server is the value returned by internal.NewServer for rpcTest.Gsrv.
	server *internal.Server
	// client is the Client under test, dialed against rpcTest.Addr.
	client *Client
}
    56  
    57  func TestClientRead(t *testing.T) {
    58  	testCases := []struct {
    59  		name         string
    60  		input        string
    61  		resourceName string
    62  		extraBufSize int
    63  		extraError   error
    64  		want         string
    65  		wantErr      bool
    66  		wantEOF      bool
    67  	}{
    68  		{
    69  			name:         "test foo",
    70  			input:        testData,
    71  			resourceName: "foo",
    72  			want:         testData,
    73  		}, {
    74  			name:         "test bar",
    75  			input:        testData,
    76  			resourceName: "bar",
    77  			want:         testData,
    78  		}, {
    79  			name:         "test bar extraBufSize=1",
    80  			input:        testData,
    81  			resourceName: "bar",
    82  			extraBufSize: 1,
    83  			want:         testData,
    84  			wantEOF:      true,
    85  		}, {
    86  			name:         "test bar extraBufSize=2",
    87  			input:        testData,
    88  			resourceName: "bar",
    89  			extraBufSize: 2,
    90  			want:         testData,
    91  			wantEOF:      true,
    92  		}, {
    93  			name:         "empty resource name",
    94  			input:        testData,
    95  			resourceName: "",
    96  			extraBufSize: 1,
    97  			wantErr:      true,
    98  		}, {
    99  			name:         "read after error returns error again",
   100  			input:        testData,
   101  			resourceName: "does not matter",
   102  			extraBufSize: 1,
   103  			extraError:   errors.New("some error"),
   104  			wantErr:      true,
   105  		},
   106  	}
   107  
   108  	for _, tc := range testCases {
   109  		bufSize := len(tc.want) + tc.extraBufSize
   110  		if bufSize == 0 {
   111  			t.Errorf("%s: This is probably wrong. Read returning 0 bytes?", tc.name)
   112  			continue
   113  		}
   114  
   115  		setup := newTestSetup(tc.input)
   116  		r, err := setup.client.NewReader(setup.ctx, tc.resourceName)
   117  		if err != nil {
   118  			t.Errorf("%s: NewReader(%q): %v", tc.name, tc.resourceName, err)
   119  			continue
   120  		}
   121  		if tc.extraError != nil {
   122  			r.err = tc.extraError
   123  		}
   124  		buf := make([]byte, bufSize)
   125  		gotEOF := false
   126  		total := 0
   127  		for total < bufSize && err == nil {
   128  			var n int
   129  			n, err = r.Read(buf[total:])
   130  			total += n
   131  		}
   132  		if err == io.EOF {
   133  			gotEOF = true
   134  			err = nil
   135  			doubleCheckBuf := make([]byte, bufSize)
   136  			n2, err2 := r.Read(doubleCheckBuf)
   137  			if err2 != io.EOF {
   138  				t.Errorf("%s: read and got EOF, double-check: read %d bytes got err=%v", tc.name, n2, err2)
   139  				continue
   140  			}
   141  		}
   142  		setup.Close()
   143  
   144  		if gotErr := err != nil; tc.wantErr != gotErr {
   145  			t.Errorf("%s: read %d bytes, got err=%v, wantErr=%t", tc.name, total, err, tc.wantErr)
   146  			continue
   147  		}
   148  		if tc.wantEOF != gotEOF {
   149  			t.Errorf("%s: read %d bytes, gotEOF=%t, wantEOF=%t", tc.name, total, gotEOF, tc.wantEOF)
   150  			continue
   151  		}
   152  		if got := string(buf[:total]); got != tc.want {
   153  			t.Errorf("%s: read %q, want %q", tc.name, got, tc.want)
   154  			continue
   155  		}
   156  	}
   157  }
   158  
   159  func TestClientWrite(t *testing.T) {
   160  	testCases := []struct {
   161  		name         string
   162  		resourceName string
   163  		data         string
   164  		results      []int
   165  		wantWriteErr bool
   166  		wantCloseErr bool
   167  	}{
   168  		{
   169  			name:         "test foo",
   170  			resourceName: "foo",
   171  			data:         testData,
   172  			results:      []int{len(testData)},
   173  		}, {
   174  			name:         "empty resource name",
   175  			resourceName: "",
   176  			data:         testData,
   177  			results:      []int{10},
   178  			//wantWriteErr: true,
   179  			wantCloseErr: true,
   180  		}, {
   181  			name:         "test bar",
   182  			resourceName: "bar",
   183  			data:         testData,
   184  			results:      []int{len(testData)},
   185  		},
   186  	}
   187  
   188  	var setup *TestSetup
   189  
   190  tcFor:
   191  	for _, tc := range testCases {
   192  		if setup != nil {
   193  			setup.Close()
   194  		}
   195  		setup = newTestSetup("")
   196  		buf := []byte(tc.data)
   197  		var ofs int
   198  		w, err := setup.client.NewWriter(setup.ctx, tc.resourceName)
   199  		if err != nil {
   200  			t.Errorf("%s: NewWriter(): %v", tc.name, err)
   201  			continue
   202  		}
   203  
   204  		for i := 0; i < len(tc.results); i++ {
   205  			if ofs >= len(tc.data) {
   206  				t.Errorf("%s [%d]: Attempting to write more than tc.input: ofs=%d len(buf)=%d",
   207  					tc.name, i, ofs, len(tc.data))
   208  				continue tcFor
   209  			}
   210  			n, err := w.Write(buf[ofs:])
   211  			ofs += n
   212  			if gotErr := err != nil; gotErr != tc.wantWriteErr {
   213  				t.Errorf("%s [%d]: Write() got n=%d err=%v, wantWriteErr=%t", tc.name, i, n, err, tc.wantWriteErr)
   214  				continue tcFor
   215  			} else if tc.wantWriteErr && i+1 < len(tc.results) {
   216  				t.Errorf("%s: wantWriteErr and got err after %d results, len(results)=%d is too long.", tc.name, i+1, len(tc.results))
   217  				continue tcFor
   218  			}
   219  			if n != tc.results[i] {
   220  				t.Errorf("%s [%d]: Write() wrote %d bytes, want %d bytes", tc.name, i, n, tc.results[i])
   221  				continue tcFor
   222  			}
   223  		}
   224  
   225  		err = w.Close()
   226  		if gotErr := err != nil; gotErr != tc.wantCloseErr {
   227  			t.Errorf("%s: Close() got err=%v, wantCloseErr=%t", tc.name, err, tc.wantCloseErr)
   228  			continue tcFor
   229  		}
   230  	}
   231  	setup.Close()
   232  }
   233  
   234  func TestClientRead_AfterSetupClose(t *testing.T) {
   235  	setup := newTestSetup("closed")
   236  	setup.Close()
   237  	_, err := setup.client.NewReader(setup.ctx, "should fail")
   238  	if err == nil {
   239  		t.Errorf("NewReader(%q): err=%v", "should fail", err)
   240  	}
   241  }
   242  
   243  func TestClientWrite_AfterSetupClose(t *testing.T) {
   244  	setup := newTestSetup("closed")
   245  	setup.Close()
   246  	_, err := setup.client.NewWriter(setup.ctx, "should fail")
   247  	if err == nil {
   248  		t.Fatalf("NewWriter(%q): err=%v", "should fail", err)
   249  	}
   250  }
   251  
   252  type UnsendableWriteClient struct {
   253  	closeAndRecvWriteResponse *pb.WriteResponse
   254  	closeAndRecvError         error
   255  }
   256  
   257  func (w *UnsendableWriteClient) Send(*pb.WriteRequest) error {
   258  	if w.closeAndRecvError != nil {
   259  		return nil
   260  	}
   261  	return errors.New("UnsendableWriteClient.Send() fails unless closeAndRecvError is set")
   262  }
   263  
   264  func (w *UnsendableWriteClient) CloseAndRecv() (*pb.WriteResponse, error) {
   265  	if w.closeAndRecvError == nil {
   266  		log.Fatalf("UnsendableWriteClient.Close() when closeAndRecvError == nil.")
   267  	}
   268  	return w.closeAndRecvWriteResponse, w.closeAndRecvError
   269  }
   270  
   271  func (w *UnsendableWriteClient) Context() context.Context {
   272  	log.Fatalf("UnsendableWriteClient.Context() should never be called")
   273  	return context.Background()
   274  }
   275  func (w *UnsendableWriteClient) CloseSend() error {
   276  	return errors.New("UnsendableWriteClient.CloseSend() should never be called")
   277  }
   278  func (w *UnsendableWriteClient) Header() (metadata.MD, error) {
   279  	log.Fatalf("UnsendableWriteClient.Header() should never be called")
   280  	return metadata.MD{}, nil
   281  }
   282  func (w *UnsendableWriteClient) Trailer() metadata.MD {
   283  	log.Fatalf("UnsendableWriteClient.Trailer() should never be called")
   284  	return metadata.MD{}
   285  }
   286  func (w *UnsendableWriteClient) SendMsg(m interface{}) error {
   287  	log.Fatalf("UnsendableWriteClient.SendMsg() should never be called")
   288  	return nil
   289  }
   290  func (w *UnsendableWriteClient) RecvMsg(m interface{}) error {
   291  	log.Fatalf("UnsendableWriteClient.RecvMsg() should never be called")
   292  	return nil
   293  }
   294  
   295  func TestClientWrite_WriteFails(t *testing.T) {
   296  	setup := newTestSetup("")
   297  	w, err := setup.client.NewWriter(setup.ctx, "")
   298  	if err != nil {
   299  		t.Fatalf("NewWriter(): %v", err)
   300  	}
   301  	defer setup.Close()
   302  	w.writeClient = &UnsendableWriteClient{}
   303  	_, err = w.Write([]byte(testData))
   304  	if err == nil {
   305  		t.Errorf("Write() should fail")
   306  	}
   307  }
   308  
   309  func TestClientWrite_CloseAndRecvFails(t *testing.T) {
   310  	setup := newTestSetup("")
   311  	w, err := setup.client.NewWriter(setup.ctx, "CloseAndRecvFails")
   312  	if err != nil {
   313  		t.Fatalf("NewWriter(): %v", err)
   314  	}
   315  	defer setup.Close()
   316  	n, err := w.Write([]byte(testData))
   317  	if err != nil {
   318  		t.Errorf("Write() failed: %v", err)
   319  		return
   320  	}
   321  	if n != len(testData) {
   322  		t.Errorf("Write() got n=%d, want n=%d", n, len(testData))
   323  		return
   324  	}
   325  	w.writeClient = &UnsendableWriteClient{
   326  		closeAndRecvError: errors.New("CloseAndRecv() must fail"),
   327  	}
   328  	if err = w.Close(); err == nil {
   329  		t.Errorf("Close() should fail")
   330  		return
   331  	}
   332  }
   333  
   334  type TestWriteHandler struct {
   335  	buf  bytes.Buffer // bytes.Buffer implements io.Writer
   336  	name string       // This service can handle one name only.
   337  }
   338  
   339  func (w *TestWriteHandler) GetWriter(ctx context.Context, name string, initOffset int64) (io.Writer, error) {
   340  	if w.name == "" {
   341  		w.name = name
   342  	} else if w.name != name {
   343  		return nil, fmt.Errorf("writer already has name=%q, now a new name=%q confuses me", w.name, name)
   344  	}
   345  	// initOffset is ignored.
   346  	return &w.buf, nil
   347  }
   348  
   349  func (w *TestWriteHandler) Close(ctx context.Context, name string) error {
   350  	w.name = ""
   351  	w.buf.Reset()
   352  	return nil
   353  }
   354  
   355  type TestReadHandler struct {
   356  	buf  string
   357  	name string // This service can handle one name only.
   358  }
   359  
   360  // GetWriter() returns an io.ReaderAt to accept reads from the given name.
   361  func (r *TestReadHandler) GetReader(ctx context.Context, name string) (io.ReaderAt, error) {
   362  	if r.name == "" {
   363  		r.name = name
   364  	} else if r.name != name {
   365  		return nil, fmt.Errorf("reader already has name=%q, now a new name=%q confuses me", r.name, name)
   366  	}
   367  	return bytes.NewReader([]byte(r.buf)), nil
   368  }
   369  
   370  // Close does nothing.
   371  func (r *TestReadHandler) Close(ctx context.Context, name string) error {
   372  	return nil
   373  }
   374  
   375  // newGRPCServer creates a new grpcServer. The grpcServer will be listening for gRPC connections
   376  // at the address named by the Addr field, without TLS.
   377  func newGRPCServer() (*grpcServer, error) {
   378  	l, err := net.Listen("tcp", "127.0.0.1:0")
   379  	if err != nil {
   380  		return nil, err
   381  	}
   382  	s := &grpcServer{
   383  		Addr: l.Addr().String(),
   384  		l:    l,
   385  		Gsrv: grpc.NewServer(),
   386  	}
   387  	return s, nil
   388  }
   389  
// Start causes the server to start accepting incoming connections.
// Call Start after registering handlers.
//
// Serve is launched on a new goroutine and Start returns immediately; the
// value returned by Serve is discarded.
func (s *grpcServer) Start() {
	go s.Gsrv.Serve(s.l)
}
   395  
// Close shuts down the server: it stops the gRPC server first and then closes
// the listener. Any error from closing the listener is discarded.
func (s *grpcServer) Close() {
	s.Gsrv.Stop()
	s.l.Close()
}
   401  
   402  func newTestSetup(input string) *TestSetup {
   403  	testSetup := &TestSetup{
   404  		ctx: context.Background(),
   405  	}
   406  	testReadHandler := &TestReadHandler{
   407  		buf: input,
   408  	}
   409  	var err error
   410  	if testSetup.rpcTest, err = newGRPCServer(); err != nil {
   411  		log.Fatalf("newGRPCServer: %v", err)
   412  	}
   413  	if testSetup.server, err = internal.NewServer(testSetup.rpcTest.Gsrv, testReadHandler, &TestWriteHandler{}); err != nil {
   414  		log.Fatalf("internal.NewServer: %v", err)
   415  	}
   416  	testSetup.rpcTest.Start()
   417  
   418  	conn, err := grpc.Dial(testSetup.rpcTest.Addr, grpc.WithInsecure())
   419  	if err != nil {
   420  		log.Fatalf("grpc.Dial: %v", err)
   421  	}
   422  	testSetup.client = NewClient(conn, grpc.FailFast(true))
   423  	return testSetup
   424  }
   425  
   426  func (testSetup *TestSetup) Close() {
   427  	testSetup.client.Close()
   428  	testSetup.rpcTest.Close()
   429  }
   430  

View as plain text