...

Source file src/github.com/letsencrypt/boulder/semaphore/semaphore_test.go

Documentation: github.com/letsencrypt/boulder/semaphore

     1  // Copyright 2017 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package semaphore_test
     6  
     7  import (
     8  	"context"
     9  	"math/rand"
    10  	"runtime"
    11  	"sync"
    12  	"testing"
    13  	"time"
    14  
    15  	"github.com/letsencrypt/boulder/semaphore"
    16  	"golang.org/x/sync/errgroup"
    17  )
    18  
    19  const maxSleep = 1 * time.Millisecond
    20  
    21  func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) {
    22  	for i := 0; i < loops; i++ {
    23  		_ = sem.Acquire(context.Background(), n)
    24  		time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
    25  		sem.Release(n)
    26  	}
    27  }
    28  
    29  func TestWeighted(t *testing.T) {
    30  	t.Parallel()
    31  
    32  	n := runtime.GOMAXPROCS(0)
    33  	loops := 10000 / n
    34  	sem := semaphore.NewWeighted(int64(n), 0)
    35  	var wg sync.WaitGroup
    36  	wg.Add(n)
    37  	for i := 0; i < n; i++ {
    38  		i := i
    39  		go func() {
    40  			defer wg.Done()
    41  			HammerWeighted(sem, int64(i), loops)
    42  		}()
    43  	}
    44  	wg.Wait()
    45  }
    46  
    47  func TestWeightedPanic(t *testing.T) {
    48  	t.Parallel()
    49  
    50  	defer func() {
    51  		if recover() == nil {
    52  			t.Fatal("release of an unacquired weighted semaphore did not panic")
    53  		}
    54  	}()
    55  	w := semaphore.NewWeighted(1, 0)
    56  	w.Release(1)
    57  }
    58  
    59  func TestWeightedTryAcquire(t *testing.T) {
    60  	t.Parallel()
    61  
    62  	ctx := context.Background()
    63  	sem := semaphore.NewWeighted(2, 0)
    64  	tries := []bool{}
    65  	_ = sem.Acquire(ctx, 1)
    66  	tries = append(tries, sem.TryAcquire(1))
    67  	tries = append(tries, sem.TryAcquire(1))
    68  
    69  	sem.Release(2)
    70  
    71  	tries = append(tries, sem.TryAcquire(1))
    72  	_ = sem.Acquire(ctx, 1)
    73  	tries = append(tries, sem.TryAcquire(1))
    74  
    75  	want := []bool{true, false, true, false}
    76  	for i := range tries {
    77  		if tries[i] != want[i] {
    78  			t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
    79  		}
    80  	}
    81  }
    82  
    83  func TestWeightedAcquire(t *testing.T) {
    84  	t.Parallel()
    85  
    86  	ctx := context.Background()
    87  	sem := semaphore.NewWeighted(2, 0)
    88  	tryAcquire := func(n int64) bool {
    89  		ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
    90  		defer cancel()
    91  		return sem.Acquire(ctx, n) == nil
    92  	}
    93  
    94  	tries := []bool{}
    95  	_ = sem.Acquire(ctx, 1)
    96  	tries = append(tries, tryAcquire(1))
    97  	tries = append(tries, tryAcquire(1))
    98  
    99  	sem.Release(2)
   100  
   101  	tries = append(tries, tryAcquire(1))
   102  	_ = sem.Acquire(ctx, 1)
   103  	tries = append(tries, tryAcquire(1))
   104  
   105  	want := []bool{true, false, true, false}
   106  	for i := range tries {
   107  		if tries[i] != want[i] {
   108  			t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
   109  		}
   110  	}
   111  }
   112  
   113  func TestWeightedDoesntBlockIfTooBig(t *testing.T) {
   114  	t.Parallel()
   115  
   116  	const n = 2
   117  	sem := semaphore.NewWeighted(n, 0)
   118  	{
   119  		ctx, cancel := context.WithCancel(context.Background())
   120  		defer cancel()
   121  		go func() {
   122  			_ = sem.Acquire(ctx, n+1)
   123  		}()
   124  	}
   125  
   126  	g, ctx := errgroup.WithContext(context.Background())
   127  	for i := n * 3; i > 0; i-- {
   128  		g.Go(func() error {
   129  			err := sem.Acquire(ctx, 1)
   130  			if err == nil {
   131  				time.Sleep(1 * time.Millisecond)
   132  				sem.Release(1)
   133  			}
   134  			return err
   135  		})
   136  	}
   137  	if err := g.Wait(); err != nil {
   138  		t.Errorf("semaphore.NewWeighted(%v, 0) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1)
   139  	}
   140  }
   141  
   142  // TestLargeAcquireDoesntStarve times out if a large call to Acquire starves.
   143  // Merely returning from the test function indicates success.
   144  func TestLargeAcquireDoesntStarve(t *testing.T) {
   145  	t.Parallel()
   146  
   147  	ctx := context.Background()
   148  	n := int64(runtime.GOMAXPROCS(0))
   149  	sem := semaphore.NewWeighted(n, 0)
   150  	running := true
   151  
   152  	var wg sync.WaitGroup
   153  	wg.Add(int(n))
   154  	for i := n; i > 0; i-- {
   155  		_ = sem.Acquire(ctx, 1)
   156  		go func() {
   157  			defer func() {
   158  				sem.Release(1)
   159  				wg.Done()
   160  			}()
   161  			for running {
   162  				time.Sleep(1 * time.Millisecond)
   163  				sem.Release(1)
   164  				_ = sem.Acquire(ctx, 1)
   165  			}
   166  		}()
   167  	}
   168  
   169  	_ = sem.Acquire(ctx, n)
   170  	running = false
   171  	sem.Release(n)
   172  	wg.Wait()
   173  }
   174  
   175  // translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43
   176  func TestAllocCancelDoesntStarve(t *testing.T) {
   177  	sem := semaphore.NewWeighted(10, 0)
   178  
   179  	// Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed.
   180  	_ = sem.Acquire(context.Background(), 1)
   181  
   182  	// In the background, Acquire(_, 10).
   183  	ctx, cancel := context.WithCancel(context.Background())
   184  	defer cancel()
   185  	go func() {
   186  		_ = sem.Acquire(ctx, 10)
   187  	}()
   188  
   189  	// Wait until the Acquire(_, 10) call blocks.
   190  	for sem.TryAcquire(1) {
   191  		sem.Release(1)
   192  		runtime.Gosched()
   193  	}
   194  
   195  	// Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call.
   196  	// Both Acquire calls should unblock and return, in either order.
   197  	go cancel()
   198  
   199  	err := sem.Acquire(context.Background(), 1)
   200  	if err != nil {
   201  		t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err)
   202  	}
   203  	sem.Release(1)
   204  }
   205  
   206  func TestMaxWaiters(t *testing.T) {
   207  	ctx, cancel := context.WithCancel(context.Background())
   208  	defer cancel()
   209  	sem := semaphore.NewWeighted(1, 10)
   210  	_ = sem.Acquire(ctx, 1)
   211  
   212  	for i := 0; i < 10; i++ {
   213  		go func() {
   214  			_ = sem.Acquire(ctx, 1)
   215  			<-ctx.Done()
   216  		}()
   217  	}
   218  
   219  	// Since the goroutines that act as waiters are intended to block in
   220  	// sem.Acquire, there's no principled wait to trigger here once they're
   221  	// blocked. Instead, loop until we reach the expected number of waiters.
   222  	for sem.NumWaiters() < 10 {
   223  		time.Sleep(10 * time.Millisecond)
   224  	}
   225  	err := sem.Acquire(ctx, 1)
   226  	if err != semaphore.ErrMaxWaiters {
   227  		t.Errorf("expected error when maxWaiters was reached, but got %#v", err)
   228  	}
   229  }
   230  

View as plain text