...

Source file src/cloud.google.com/go/pubsub/flow_controller_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2017 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"sync/atomic"
    22  	"testing"
    23  	"time"
    24  
    25  	"golang.org/x/sync/errgroup"
    26  )
    27  
    28  func fcSettings(c int, s int, l LimitExceededBehavior) FlowControlSettings {
    29  	return FlowControlSettings{
    30  		MaxOutstandingMessages: c,
    31  		MaxOutstandingBytes:    s,
    32  		LimitExceededBehavior:  l,
    33  	}
    34  }
    35  
    36  func TestFlowControllerCancel(t *testing.T) {
    37  	// Test canceling a flow controller's context.
    38  	t.Parallel()
    39  	fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
    40  	if err := fc.acquire(context.Background(), 5); err != nil {
    41  		t.Fatal(err)
    42  	}
    43  	// Experiment: a context that times out should always return an error.
    44  	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Millisecond)
    45  	defer cancel()
    46  	if err := fc.acquire(ctx, 6); err != context.DeadlineExceeded {
    47  		t.Fatalf("got %v, expected DeadlineExceeded", err)
    48  	}
    49  	// Control: a context that is not done should always return nil.
    50  	go func() {
    51  		time.Sleep(5 * time.Millisecond)
    52  		fc.release(ctx, 5)
    53  	}()
    54  	if err := fc.acquire(context.Background(), 6); err != nil {
    55  		t.Errorf("got %v, expected nil", err)
    56  	}
    57  }
    58  
    59  func TestFlowControllerLargeRequest(t *testing.T) {
    60  	// Large requests succeed, consuming the entire allotment.
    61  	t.Parallel()
    62  	fc := newFlowController(fcSettings(3, 10, FlowControlBlock))
    63  	err := fc.acquire(context.Background(), 11)
    64  	if err != nil {
    65  		t.Fatal(err)
    66  	}
    67  }
    68  
    69  func TestFlowControllerNoStarve(t *testing.T) {
    70  	// A large request won't starve, because the flowController is
    71  	// (best-effort) FIFO.
    72  	t.Parallel()
    73  	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
    74  	defer cancel()
    75  	fc := newFlowController(fcSettings(10, 10, FlowControlBlock))
    76  	first := make(chan int)
    77  	for i := 0; i < 20; i++ {
    78  		go func() {
    79  			for {
    80  				if err := fc.acquire(ctx, 1); err != nil {
    81  					if err != context.Canceled {
    82  						t.Error(err)
    83  					}
    84  					return
    85  				}
    86  				select {
    87  				case first <- 1:
    88  				default:
    89  				}
    90  				fc.release(ctx, 1)
    91  			}
    92  		}()
    93  	}
    94  	<-first // Wait until the flowController's state is non-zero.
    95  	if err := fc.acquire(ctx, 11); err != nil {
    96  		t.Errorf("got %v, want nil", err)
    97  	}
    98  }
    99  
   100  func TestFlowControllerSaturation(t *testing.T) {
   101  	t.Parallel()
   102  	const (
   103  		maxCount = 6
   104  		maxSize  = 10
   105  	)
   106  	for _, test := range []struct {
   107  		acquireSize         int
   108  		wantCount, wantSize int64
   109  	}{
   110  		{
   111  			// Many small acquires cause the flow controller to reach its max count.
   112  			acquireSize: 1,
   113  			wantCount:   6,
   114  			wantSize:    6,
   115  		},
   116  		{
   117  			// Five acquires of size 2 will cause the flow controller to reach its max size,
   118  			// but not its max count.
   119  			acquireSize: 2,
   120  			wantCount:   5,
   121  			wantSize:    10,
   122  		},
   123  		{
   124  			// If the requests are the right size (relatively prime to maxSize),
   125  			// the flow controller will not saturate on size. (In this case, not on count either.)
   126  			acquireSize: 3,
   127  			wantCount:   3,
   128  			wantSize:    9,
   129  		},
   130  	} {
   131  		fc := newFlowController(fcSettings(maxCount, maxSize, FlowControlBlock))
   132  		// Atomically track flow controller state.
   133  		// The flowController itself tracks count.
   134  		var curSize int64
   135  		success := errors.New("")
   136  		// Time out if wantSize or wantCount is never reached.
   137  		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
   138  		defer cancel()
   139  		g, ctx := errgroup.WithContext(ctx)
   140  		for i := 0; i < 10; i++ {
   141  			g.Go(func() error {
   142  				var hitCount, hitSize bool
   143  				// Run at least until we hit the expected values, and at least
   144  				// for enough iterations to exceed them if the flow controller
   145  				// is broken.
   146  				for i := 0; i < 100 || !hitCount || !hitSize; i++ {
   147  					select {
   148  					case <-ctx.Done():
   149  						return ctx.Err()
   150  					default:
   151  					}
   152  					if err := fc.acquire(ctx, test.acquireSize); err != nil {
   153  						return err
   154  					}
   155  					c := int64(fc.count())
   156  					if c > test.wantCount {
   157  						return fmt.Errorf("count %d exceeds want %d", c, test.wantCount)
   158  					}
   159  					if c == test.wantCount {
   160  						hitCount = true
   161  					}
   162  					s := atomic.AddInt64(&curSize, int64(test.acquireSize))
   163  					if s > test.wantSize {
   164  						return fmt.Errorf("size %d exceeds want %d", s, test.wantSize)
   165  					}
   166  					if s == test.wantSize {
   167  						hitSize = true
   168  					}
   169  					time.Sleep(5 * time.Millisecond) // Let other goroutines make progress.
   170  					if atomic.AddInt64(&curSize, -int64(test.acquireSize)) < 0 {
   171  						return errors.New("negative size")
   172  					}
   173  					fc.release(ctx, test.acquireSize)
   174  				}
   175  				return success
   176  			})
   177  		}
   178  		if err := g.Wait(); err != success {
   179  			t.Errorf("%+v: %v", test, err)
   180  			continue
   181  		}
   182  	}
   183  }
   184  
   185  func TestFlowControllerUnboundedCount(t *testing.T) {
   186  	t.Parallel()
   187  	ctx := context.Background()
   188  	fc := newFlowController(fcSettings(0, 10, FlowControlSignalError))
   189  
   190  	// Successfully acquire 4 bytes.
   191  	if err := fc.acquire(ctx, 4); err != nil {
   192  		t.Errorf("got %v, wanted no error", err)
   193  	}
   194  
   195  	// Successfully acquire 4 bytes.
   196  	if err := fc.acquire(ctx, 4); err != nil {
   197  		t.Errorf("got %v, wanted no error", err)
   198  	}
   199  
   200  	// Fail to acquire 3 bytes.
   201  	if err := fc.acquire(ctx, 3); err == nil {
   202  		t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingBytes)
   203  	}
   204  }
   205  
   206  func TestFlowControllerUnboundedCount2(t *testing.T) {
   207  	t.Parallel()
   208  	ctx := context.Background()
   209  	fc := newFlowController(fcSettings(0, 0, FlowControlSignalError))
   210  	// Successfully acquire 4 bytes.
   211  	if err := fc.acquire(ctx, 4); err != nil {
   212  		t.Errorf("got %v, wanted no error", err)
   213  	}
   214  	fc.release(ctx, 1)
   215  	fc.release(ctx, 1)
   216  	fc.release(ctx, 1)
   217  	wantCount := int64(0)
   218  	c := int64(fc.count())
   219  	if c != wantCount {
   220  		t.Fatalf("got count %d, want %d", c, wantCount)
   221  	}
   222  }
   223  
   224  func TestFlowControllerUnboundedBytes(t *testing.T) {
   225  	t.Parallel()
   226  	ctx := context.Background()
   227  	fc := newFlowController(fcSettings(2, 0, FlowControlSignalError))
   228  
   229  	// Successfully acquire 4GB.
   230  	if err := fc.acquire(ctx, 4e9); err != nil {
   231  		t.Errorf("got %v, wanted no error", err)
   232  	}
   233  
   234  	// Successfully acquired 4GB bytes.
   235  	if err := fc.acquire(ctx, 4e9); err != nil {
   236  		t.Errorf("got %v, wanted no error", err)
   237  	}
   238  
   239  	// Fail to acquire a third message.
   240  	if err := fc.acquire(ctx, 3); err == nil {
   241  		t.Errorf("got nil, wanted %v", ErrFlowControllerMaxOutstandingMessages)
   242  	}
   243  }
   244  

View as plain text