
Source file src/cloud.google.com/go/bigquery/storage/managedwriter/managed_stream_test.go

Documentation: cloud.google.com/go/bigquery/storage/managedwriter

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    15  package managedwriter
    17  import (
    18  	"context"
    19  	"errors"
    20  	"io"
    21  	"runtime"
    22  	"sync"
    23  	"testing"
    24  	"time"
    26  	"cloud.google.com/go/bigquery/storage/apiv1/storagepb"
    27  	"github.com/googleapis/gax-go/v2"
    28  	"google.golang.org/grpc/codes"
    29  	"google.golang.org/grpc/status"
    30  	"google.golang.org/protobuf/types/descriptorpb"
    31  )
    33  type testRecvResponse struct {
    34  	resp *storagepb.AppendRowsResponse
    35  	err  error
    36  }
    38  type testAppendRowsClient struct {
    39  	storagepb.BigQueryWrite_AppendRowsClient
    40  	openCount int
    41  	requests  []*storagepb.AppendRowsRequest
    42  	responses []*testRecvResponse
    43  	sendF     func(*storagepb.AppendRowsRequest) error
    44  	recvF     func() (*storagepb.AppendRowsResponse, error)
    45  	closeF    func() error
    46  }
    48  func (tarc *testAppendRowsClient) Send(req *storagepb.AppendRowsRequest) error {
    49  	return tarc.sendF(req)
    50  }
    52  func (tarc *testAppendRowsClient) Recv() (*storagepb.AppendRowsResponse, error) {
    53  	return tarc.recvF()
    54  }
    56  func (tarc *testAppendRowsClient) CloseSend() error {
    57  	return tarc.closeF()
    58  }
    60  // openTestArc handles wiring in a test AppendRowsClient into a managedstream by providing the open function.
    61  func openTestArc(testARC *testAppendRowsClient, sendF func(req *storagepb.AppendRowsRequest) error, recvF func() (*storagepb.AppendRowsResponse, error)) func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
    62  	sF := func(req *storagepb.AppendRowsRequest) error {
    63  		testARC.requests = append(testARC.requests, req)
    64  		return nil
    65  	}
    66  	if sendF != nil {
    67  		sF = sendF
    68  	}
    69  	rF := func() (*storagepb.AppendRowsResponse, error) {
    70  		return &storagepb.AppendRowsResponse{
    71  			Response: &storagepb.AppendRowsResponse_AppendResult_{},
    72  		}, nil
    73  	}
    74  	if recvF != nil {
    75  		rF = recvF
    76  	}
    77  	testARC.sendF = sF
    78  	testARC.recvF = rF
    79  	testARC.closeF = func() error {
    80  		return nil
    81  	}
    82  	return func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
    83  		testARC.openCount = testARC.openCount + 1
    84  		// Simulate grpc finalizer goroutine
    85  		go func() {
    86  			<-ctx.Done()
    87  		}()
    88  		return testARC, nil
    89  	}
    90  }
    92  func TestManagedStream_RequestOptimization(t *testing.T) {
    94  	ctx := context.Background()
    95  	testARC := &testAppendRowsClient{}
    96  	pool := &connectionPool{
    97  		ctx:                ctx,
    98  		open:               openTestArc(testARC, nil, nil),
    99  		baseFlowController: newFlowController(0, 0),
   100  	}
   101  	if err := pool.activateRouter(newSimpleRouter("")); err != nil {
   102  		t.Errorf("activateRouter: %v", err)
   103  	}
   104  	ms := &ManagedStream{
   105  		id:             "foo",
   106  		ctx:            ctx,
   107  		streamSettings: defaultStreamSettings(),
   108  	}
   109  	if err := pool.addWriter(ms); err != nil {
   110  		t.Errorf("addWriter: %v", err)
   111  	}
   112  	ms.streamSettings.streamID = "FOO"
   113  	ms.streamSettings.TraceID = "TRACE"
   114  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   116  	fakeData := [][]byte{
   117  		[]byte("foo"),
   118  		[]byte("bar"),
   119  	}
   121  	wantReqs := 3
   123  	for i := 0; i < wantReqs; i++ {
   124  		_, err := ms.AppendRows(ctx, fakeData, WithOffset(int64(i)))
   125  		if err != nil {
   126  			t.Errorf("AppendRows; %v", err)
   127  		}
   128  	}
   130  	if testARC.openCount != 1 {
   131  		t.Errorf("expected a single open, got %d", testARC.openCount)
   132  	}
   134  	if len(testARC.requests) != wantReqs {
   135  		t.Errorf("expected %d requests, got %d", wantReqs, len(testARC.requests))
   136  	}
   138  	for k, v := range testARC.requests {
   139  		if v == nil {
   140  			t.Errorf("request %d was nil", k)
   141  		}
   142  		if v.GetOffset() == nil {
   143  			t.Errorf("request %d had no offset", k)
   144  		} else {
   145  			gotOffset := v.GetOffset().GetValue()
   146  			if gotOffset != int64(k) {
   147  				t.Errorf("request %d wanted offset %d, got %d", k, k, gotOffset)
   148  			}
   149  		}
   150  		if k == 0 {
   151  			if v.GetTraceId() == "" {
   152  				t.Errorf("expected TraceId on first request, was empty")
   153  			}
   154  			if v.GetWriteStream() == "" {
   155  				t.Errorf("expected WriteStream on first request, was empty")
   156  			}
   157  			if v.GetProtoRows().GetWriterSchema().GetProtoDescriptor() == nil {
   158  				t.Errorf("expected WriterSchema on first request, was empty")
   159  			}
   161  		} else {
   162  			// TODO: add validation to ensure we're optimizing requests on the wire.
   163  			// Sending consecutive requests with same dest/schema we should redact.
   164  		}
   165  	}
   166  }
   168  func TestManagedStream_FlowControllerFailure(t *testing.T) {
   170  	ctx := context.Background()
   172  	pool := &connectionPool{
   173  		ctx:                ctx,
   174  		open:               openTestArc(&testAppendRowsClient{}, nil, nil),
   175  		baseFlowController: newFlowController(1, 0),
   176  	}
   177  	router := newSimpleRouter("")
   178  	if err := pool.activateRouter(router); err != nil {
   179  		t.Errorf("activateRouter: %v", err)
   180  	}
   182  	ms := &ManagedStream{
   183  		id:             "foo",
   184  		ctx:            ctx,
   185  		streamSettings: defaultStreamSettings(),
   186  	}
   187  	if err := pool.addWriter(ms); err != nil {
   188  		t.Errorf("addWritre: %v", err)
   189  	}
   191  	// Exhaust inflight requests on the single connection.
   192  	router.conn.fc = newFlowController(1, 0)
   193  	router.conn.fc.acquire(ctx, 0)
   195  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   197  	fakeData := [][]byte{
   198  		[]byte("foo"),
   199  		[]byte("bar"),
   200  	}
   202  	// Create a context that will expire during the append.
   203  	// This is expected to surface a flowcontroller error, as there's no
   204  	// capacity.
   205  	expireCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
   206  	defer cancel()
   207  	_, err := ms.AppendRows(expireCtx, fakeData)
   208  	if err == nil {
   209  		t.Errorf("expected AppendRows to error, but it succeeded")
   210  	}
   211  }
   213  func TestManagedStream_AppendWithDeadline(t *testing.T) {
   214  	ctx := context.Background()
   216  	pool := &connectionPool{
   217  		ctx:                ctx,
   218  		baseFlowController: newFlowController(0, 0),
   219  		open: openTestArc(&testAppendRowsClient{},
   220  			func(req *storagepb.AppendRowsRequest) error {
   221  				// Append is intentionally slow.
   222  				time.Sleep(200 * time.Millisecond)
   223  				return nil
   224  			}, nil),
   225  	}
   226  	router := newSimpleRouter("")
   227  	if err := pool.activateRouter(router); err != nil {
   228  		t.Errorf("activateRouter: %v", err)
   229  	}
   231  	ms := &ManagedStream{
   232  		id:             "foo",
   233  		ctx:            ctx,
   234  		streamSettings: defaultStreamSettings(),
   235  	}
   236  	if err := pool.addWriter(ms); err != nil {
   237  		t.Errorf("addWriter: %v", err)
   238  	}
   239  	conn := router.conn
   240  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   242  	fakeData := [][]byte{
   243  		[]byte("foo"),
   244  	}
   246  	wantCount := 0
   247  	if ct := conn.fc.count(); ct != wantCount {
   248  		t.Errorf("flowcontroller count mismatch, got %d want %d", ct, wantCount)
   249  	}
   251  	// Create a context that will expire during the append, to verify the passed in
   252  	// context expires.
   253  	expireCtx, cancel := context.WithTimeout(ctx, 100*time.Millisecond)
   254  	defer cancel()
   256  	_, err := ms.AppendRows(expireCtx, fakeData)
   257  	if err == nil {
   258  		t.Errorf("expected AppendRows to error, but it succeeded")
   259  	}
   261  	// We expect the flowcontroller count to still be occupied, as the Send is slow.
   262  	wantCount = 1
   263  	if ct := conn.fc.count(); ct != wantCount {
   264  		t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount)
   265  	}
   267  	// Wait for the append to finish, then check again.
   268  	time.Sleep(300 * time.Millisecond)
   269  	wantCount = 0
   270  	if ct := conn.fc.count(); ct != wantCount {
   271  		t.Errorf("flowcontroller post-append count mismatch, got %d want %d", ct, wantCount)
   272  	}
   273  }
   275  func TestManagedStream_ContextExpiry(t *testing.T) {
   276  	// Issue: retaining error from append as stream error
   277  	// https://github.com/googleapis/google-cloud-go/issues/6657
   278  	ctx := context.Background()
   280  	pool := &connectionPool{
   281  		ctx:                ctx,
   282  		baseFlowController: newFlowController(0, 0),
   283  		open: openTestArc(&testAppendRowsClient{},
   284  			func(req *storagepb.AppendRowsRequest) error {
   285  				return nil
   286  			}, nil),
   287  	}
   288  	if err := pool.activateRouter(newSimpleRouter("")); err != nil {
   289  		t.Errorf("activateRouter: %v", err)
   290  	}
   292  	ms := &ManagedStream{
   293  		id:             "foo",
   294  		ctx:            ctx,
   295  		streamSettings: defaultStreamSettings(),
   296  	}
   297  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   298  	if err := pool.addWriter(ms); err != nil {
   299  		t.Errorf("addWriter: %v", err)
   300  	}
   302  	fakeData := [][]byte{
   303  		[]byte("foo"),
   304  	}
   305  	fakeReq := &storagepb.AppendRowsRequest{
   306  		Rows: &storagepb.AppendRowsRequest_ProtoRows{
   307  			ProtoRows: &storagepb.AppendRowsRequest_ProtoData{
   308  				Rows: &storagepb.ProtoRows{
   309  					SerializedRows: fakeData,
   310  				},
   311  			},
   312  		},
   313  	}
   315  	// Create a context and immediately cancel it.
   316  	cancelCtx, cancel := context.WithCancel(ctx)
   317  	cancel()
   319  	// First, append with an invalid context.
   320  	pw := newPendingWrite(cancelCtx, ms, fakeReq, ms.curTemplate, "", "")
   321  	err := ms.appendWithRetry(pw)
   322  	if err != context.Canceled {
   323  		t.Errorf("expected cancelled context error, got: %v", err)
   324  	}
   326  	// a second append with a valid context should succeed
   327  	_, err = ms.AppendRows(ctx, fakeData)
   328  	if err != nil {
   329  		t.Errorf("expected second append to succeed, but failed: %v", err)
   330  	}
   331  }
   333  func TestManagedStream_AppendDeadlocks(t *testing.T) {
   334  	// Ensure we don't deadlock by issing two appends.
   335  	testCases := []struct {
   336  		desc       string
   337  		openErrors []error
   338  		ctx        context.Context
   339  		respErr    error
   340  	}{
   341  		{
   342  			desc:       "no errors",
   343  			openErrors: []error{nil, nil},
   344  			ctx:        context.Background(),
   345  			respErr:    nil,
   346  		},
   347  		{
   348  			desc:       "cancelled caller context",
   349  			openErrors: []error{nil, nil},
   350  			ctx: func() context.Context {
   351  				cctx, cancel := context.WithCancel(context.Background())
   352  				cancel()
   353  				return cctx
   354  			}(),
   355  			respErr: context.Canceled,
   356  		},
   357  		{
   358  			desc:       "expired caller context",
   359  			openErrors: []error{nil, nil},
   360  			ctx: func() context.Context {
   361  				cctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
   362  				defer cancel()
   363  				time.Sleep(2 * time.Millisecond)
   364  				return cctx
   365  			}(),
   366  			respErr: context.DeadlineExceeded,
   367  		},
   368  		{
   369  			desc:       "errored getstream",
   370  			openErrors: []error{status.Errorf(codes.ResourceExhausted, "some error"), status.Errorf(codes.ResourceExhausted, "some error")},
   371  			ctx:        context.Background(),
   372  			respErr:    status.Errorf(codes.ResourceExhausted, "some error"),
   373  		},
   374  	}
   376  	for _, tc := range testCases {
   377  		ctx := context.Background()
   378  		openF := openTestArc(&testAppendRowsClient{}, nil, nil)
   379  		pool := &connectionPool{
   380  			ctx: ctx,
   381  			open: func(ctx context.Context, opts ...gax.CallOption) (storagepb.BigQueryWrite_AppendRowsClient, error) {
   382  				if len(tc.openErrors) == 0 {
   383  					panic("out of open errors")
   384  				}
   385  				curErr := tc.openErrors[0]
   386  				tc.openErrors = tc.openErrors[1:]
   387  				if curErr == nil {
   388  					return openF(ctx, opts...)
   389  				}
   390  				return nil, curErr
   391  			},
   392  		}
   393  		router := newSimpleRouter("")
   394  		if err := pool.activateRouter(router); err != nil {
   395  			t.Errorf("activateRouter: %v", err)
   396  		}
   397  		ms := &ManagedStream{
   398  			id: "foo",
   399  			streamSettings: &streamSettings{
   400  				streamID: "foo",
   401  			},
   402  		}
   403  		ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
   404  		if err := pool.addWriter(ms); err != nil {
   405  			t.Errorf("addWriter: %v", err)
   406  		}
   408  		testReq := ms.buildRequest([][]byte{[]byte("foo")})
   409  		// first append
   410  		pw := newPendingWrite(tc.ctx, ms, testReq, nil, "", "")
   411  		gotErr := ms.appendWithRetry(pw)
   412  		if !errors.Is(gotErr, tc.respErr) {
   413  			t.Errorf("%s first response: got %v, want %v", tc.desc, gotErr, tc.respErr)
   414  		}
   415  		// second append
   416  		pw = newPendingWrite(tc.ctx, ms, testReq, nil, "", "")
   417  		gotErr = ms.appendWithRetry(pw)
   418  		if !errors.Is(gotErr, tc.respErr) {
   419  			t.Errorf("%s second response: got %v, want %v", tc.desc, gotErr, tc.respErr)
   420  		}
   422  		// Issue two closes, to ensure we're not deadlocking there either.
   423  		ms.Close()
   424  		ms.Close()
   426  		// Issue two more appends, ensure we're not deadlocked as the writer is closed.
   427  		gotErr = ms.appendWithRetry(pw)
   428  		if !errors.Is(gotErr, io.EOF) {
   429  			t.Errorf("expected io.EOF, got %v", gotErr)
   430  		}
   431  		gotErr = ms.appendWithRetry(pw)
   432  		if !errors.Is(gotErr, io.EOF) {
   433  			t.Errorf("expected io.EOF, got %v", gotErr)
   434  		}
   436  	}
   438  }
   440  func TestManagedStream_LeakingGoroutines(t *testing.T) {
   441  	ctx := context.Background()
   443  	pool := &connectionPool{
   444  		ctx: ctx,
   445  		open: openTestArc(&testAppendRowsClient{},
   446  			func(req *storagepb.AppendRowsRequest) error {
   447  				// Append is intentionally slower than context to cause pressure.
   448  				time.Sleep(40 * time.Millisecond)
   449  				return nil
   450  			}, nil),
   451  		baseFlowController: newFlowController(10, 0),
   452  	}
   453  	if err := pool.activateRouter(newSimpleRouter("")); err != nil {
   454  		t.Errorf("activateRouter: %v", err)
   455  	}
   456  	ms := &ManagedStream{
   457  		id:             "foo",
   458  		ctx:            ctx,
   459  		streamSettings: defaultStreamSettings(),
   460  	}
   461  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   462  	if err := pool.addWriter(ms); err != nil {
   463  		t.Errorf("addWriter: %v", err)
   464  	}
   466  	fakeData := [][]byte{
   467  		[]byte("foo"),
   468  	}
   470  	threshold := runtime.NumGoroutine() + 20
   472  	// Send a bunch of appends that expire quicker than response, and monitor that
   473  	// goroutine growth stays within bounded threshold.
   474  	for i := 0; i < 250; i++ {
   475  		expireCtx, cancel := context.WithTimeout(ctx, 25*time.Millisecond)
   476  		defer cancel()
   477  		ms.AppendRows(expireCtx, fakeData)
   478  		if i%50 == 0 {
   479  			if current := runtime.NumGoroutine(); current > threshold {
   480  				t.Errorf("potential goroutine leak, append %d: current %d, threshold %d", i, current, threshold)
   481  			}
   482  		}
   483  	}
   484  }
   486  func TestManagedStream_LeakingGoroutinesReconnect(t *testing.T) {
   487  	ctx := context.Background()
   489  	reqCount := 0
   490  	testArc := &testAppendRowsClient{}
   491  	pool := &connectionPool{
   492  		ctx: ctx,
   493  		open: openTestArc(testArc,
   494  			func(req *storagepb.AppendRowsRequest) error {
   495  				reqCount++
   496  				if reqCount%2 == 1 {
   497  					return io.EOF
   498  				}
   499  				return nil
   500  			}, nil),
   501  		baseFlowController: newFlowController(1000, 0),
   502  	}
   503  	if err := pool.activateRouter(newSimpleRouter("")); err != nil {
   504  		t.Errorf("activateRouter: %v", err)
   505  	}
   506  	ms := &ManagedStream{
   507  		id:             "foo",
   508  		ctx:            ctx,
   509  		streamSettings: defaultStreamSettings(),
   510  		retry:          newStatelessRetryer(),
   511  	}
   512  	ms.retry.maxAttempts = 4
   513  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   514  	if err := pool.addWriter(ms); err != nil {
   515  		t.Errorf("addWriter: %v", err)
   516  	}
   518  	fakeData := [][]byte{
   519  		[]byte("foo"),
   520  	}
   522  	threshold := runtime.NumGoroutine() + 5
   524  	// Send a bunch of appends that will trigger reconnects and monitor that
   525  	// goroutine growth stays within bounded threshold.
   526  	for i := 0; i < 30; i++ {
   527  		writeCtx := context.Background()
   528  		r, err := ms.AppendRows(writeCtx, fakeData)
   529  		if err != nil {
   530  			t.Fatalf("failed to append row: %v", err)
   531  		}
   532  		_, err = r.GetResult(context.Background())
   533  		if err != nil {
   534  			t.Fatalf("failed to get result: %v", err)
   535  		}
   536  		if r.totalAttempts != 2 {
   537  			t.Fatalf("should trigger a retry, but found: %d attempts", r.totalAttempts)
   538  		}
   539  		if testArc.openCount != i+2 {
   540  			t.Errorf("should trigger a reconnect, but found openCount %d", testArc.openCount)
   541  		}
   542  		if i%10 == 0 {
   543  			if current := runtime.NumGoroutine(); current > threshold {
   544  				t.Errorf("potential goroutine leak, append %d: current %d, threshold %d", i, current, threshold)
   545  			}
   546  		}
   547  	}
   548  }
   550  func TestManagedWriter_CancellationDuringRetry(t *testing.T) {
   551  	// Issue: double close of pending write.
   552  	// https://github.com/googleapis/google-cloud-go/issues/7380
   553  	ctx, cancel := context.WithCancel(context.Background())
   554  	pool := &connectionPool{
   555  		ctx: ctx,
   556  		open: openTestArc(&testAppendRowsClient{},
   557  			func(req *storagepb.AppendRowsRequest) error {
   558  				// Append doesn't error, but is slow.
   559  				time.Sleep(time.Second)
   560  				return nil
   561  			},
   562  			func() (*storagepb.AppendRowsResponse, error) {
   563  				// Response is slow and always returns a retriable error.
   564  				time.Sleep(2 * time.Second)
   565  				return nil, io.EOF
   566  			}),
   567  		baseFlowController: newFlowController(10, 0),
   568  	}
   569  	if err := pool.activateRouter(newSimpleRouter("")); err != nil {
   570  		t.Errorf("activateRouter: %v", err)
   571  	}
   573  	ms := &ManagedStream{
   574  		id:             "foo",
   575  		ctx:            ctx,
   576  		streamSettings: defaultStreamSettings(),
   577  		retry:          newStatelessRetryer(),
   578  	}
   579  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   580  	if err := pool.addWriter(ms); err != nil {
   581  		t.Errorf("addWriter: %v", err)
   582  	}
   584  	fakeData := [][]byte{
   585  		[]byte("foo"),
   586  	}
   588  	res, err := ms.AppendRows(context.Background(), fakeData)
   589  	if err != nil {
   590  		t.Errorf("AppendRows send err: %v", err)
   591  	}
   592  	cancel()
   594  	select {
   596  	case <-res.Ready():
   597  		if _, err := res.GetResult(context.Background()); err == nil {
   598  			t.Errorf("expected failure, got success")
   599  		}
   601  	case <-time.After(5 * time.Second):
   602  		t.Errorf("result was not ready in expected time")
   603  	}
   604  }
   606  func TestManagedStream_Closure(t *testing.T) {
   607  	ctx, cancel := context.WithCancel(context.Background())
   609  	pool := &connectionPool{
   610  		ctx:                ctx,
   611  		cancel:             cancel,
   612  		baseFlowController: newFlowController(0, 0),
   613  		open: openTestArc(&testAppendRowsClient{},
   614  			func(req *storagepb.AppendRowsRequest) error {
   615  				return nil
   616  			}, nil),
   617  	}
   618  	router := newSimpleRouter("")
   619  	if err := pool.activateRouter(router); err != nil {
   620  		t.Errorf("activateRouter: %v", err)
   621  	}
   623  	ms := &ManagedStream{
   624  		id:             "foo",
   625  		streamSettings: defaultStreamSettings(),
   626  	}
   627  	ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
   628  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   629  	if err := pool.addWriter(ms); err != nil {
   630  		t.Errorf("addWriter A: %v", err)
   631  	}
   633  	if router.conn == nil {
   634  		t.Errorf("expected non-nil connection")
   635  	}
   637  	if err := ms.Close(); err != io.EOF {
   638  		t.Errorf("msB.Close, want %v got %v", io.EOF, err)
   639  	}
   640  	if router.conn != nil {
   641  		t.Errorf("expected nil connection")
   642  	}
   643  	if ms.ctx.Err() == nil {
   644  		t.Errorf("expected writer ctx to be dead, is alive")
   645  	}
   646  }
   648  // This test exists to try to surface data races by sharing
   649  // a single writer with multiple goroutines.  It doesn't assert
   650  // anything about the behavior of the system.
   651  func TestManagedStream_RaceFinder(t *testing.T) {
   652  	ctx, cancel := context.WithCancel(context.Background())
   654  	var totalsMu sync.Mutex
   655  	totalSends := 0
   656  	totalRecvs := 0
   657  	pool := &connectionPool{
   658  		ctx:                ctx,
   659  		cancel:             cancel,
   660  		baseFlowController: newFlowController(0, 0),
   661  		open: openTestArc(&testAppendRowsClient{},
   662  			func(req *storagepb.AppendRowsRequest) error {
   663  				totalsMu.Lock()
   664  				totalSends = totalSends + 1
   665  				curSends := totalSends
   666  				totalsMu.Unlock()
   667  				if curSends%25 == 0 {
   668  					//time.Sleep(10 * time.Millisecond)
   669  					return io.EOF
   670  				}
   671  				return nil
   672  			},
   673  			func() (*storagepb.AppendRowsResponse, error) {
   674  				totalsMu.Lock()
   675  				totalRecvs = totalRecvs + 1
   676  				curRecvs := totalRecvs
   677  				totalsMu.Unlock()
   678  				if curRecvs%15 == 0 {
   679  					return nil, io.EOF
   680  				}
   681  				return &storagepb.AppendRowsResponse{}, nil
   682  			}),
   683  	}
   684  	router := newSimpleRouter("")
   685  	if err := pool.activateRouter(router); err != nil {
   686  		t.Errorf("activateRouter: %v", err)
   687  	}
   689  	ms := &ManagedStream{
   690  		id:             "foo",
   691  		streamSettings: defaultStreamSettings(),
   692  		retry:          newStatelessRetryer(),
   693  	}
   694  	ms.retry.maxAttempts = 4
   695  	ms.ctx, ms.cancel = context.WithCancel(pool.ctx)
   696  	ms.curTemplate = newVersionedTemplate().revise(reviseProtoSchema(&descriptorpb.DescriptorProto{}))
   697  	if err := pool.addWriter(ms); err != nil {
   698  		t.Errorf("addWriter A: %v", err)
   699  	}
   701  	if router.conn == nil {
   702  		t.Errorf("expected non-nil connection")
   703  	}
   705  	numWriters := 5
   706  	numWrites := 15
   708  	var wg sync.WaitGroup
   709  	wg.Add(numWriters)
   710  	for i := 0; i < numWriters; i++ {
   711  		go func() {
   712  			for j := 0; j < numWrites; j++ {
   713  				result, err := ms.AppendRows(ctx, [][]byte{[]byte("foo")})
   714  				if err != nil {
   715  					continue
   716  				}
   717  				_, err = result.GetResult(ctx)
   718  				if err != nil {
   719  					continue
   720  				}
   721  			}
   722  			wg.Done()
   723  		}()
   724  	}
   725  	wg.Wait()
   726  	cancel()
   727  }

View as plain text