...

Source file src/golang.org/x/sync/semaphore/semaphore_test.go

Documentation: golang.org/x/sync/semaphore

     1  // Copyright 2017 The Go Authors. All rights reserved.
     2  // Use of this source code is governed by a BSD-style
     3  // license that can be found in the LICENSE file.
     4  
     5  package semaphore_test
     6  
     7  import (
     8  	"context"
     9  	"math/rand"
    10  	"runtime"
    11  	"sync"
    12  	"testing"
    13  	"time"
    14  
    15  	"golang.org/x/sync/errgroup"
    16  	"golang.org/x/sync/semaphore"
    17  )
    18  
    19  const maxSleep = 1 * time.Millisecond
    20  
    21  func HammerWeighted(sem *semaphore.Weighted, n int64, loops int) {
    22  	for i := 0; i < loops; i++ {
    23  		sem.Acquire(context.Background(), n)
    24  		time.Sleep(time.Duration(rand.Int63n(int64(maxSleep/time.Nanosecond))) * time.Nanosecond)
    25  		sem.Release(n)
    26  	}
    27  }
    28  
    29  func TestWeighted(t *testing.T) {
    30  	t.Parallel()
    31  
    32  	n := runtime.GOMAXPROCS(0)
    33  	loops := 10000 / n
    34  	sem := semaphore.NewWeighted(int64(n))
    35  	var wg sync.WaitGroup
    36  	wg.Add(n)
    37  	for i := 0; i < n; i++ {
    38  		i := i
    39  		go func() {
    40  			defer wg.Done()
    41  			HammerWeighted(sem, int64(i), loops)
    42  		}()
    43  	}
    44  	wg.Wait()
    45  }
    46  
    47  func TestWeightedPanic(t *testing.T) {
    48  	t.Parallel()
    49  
    50  	defer func() {
    51  		if recover() == nil {
    52  			t.Fatal("release of an unacquired weighted semaphore did not panic")
    53  		}
    54  	}()
    55  	w := semaphore.NewWeighted(1)
    56  	w.Release(1)
    57  }
    58  
    59  func TestWeightedTryAcquire(t *testing.T) {
    60  	t.Parallel()
    61  
    62  	ctx := context.Background()
    63  	sem := semaphore.NewWeighted(2)
    64  	tries := []bool{}
    65  	sem.Acquire(ctx, 1)
    66  	tries = append(tries, sem.TryAcquire(1))
    67  	tries = append(tries, sem.TryAcquire(1))
    68  
    69  	sem.Release(2)
    70  
    71  	tries = append(tries, sem.TryAcquire(1))
    72  	sem.Acquire(ctx, 1)
    73  	tries = append(tries, sem.TryAcquire(1))
    74  
    75  	want := []bool{true, false, true, false}
    76  	for i := range tries {
    77  		if tries[i] != want[i] {
    78  			t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
    79  		}
    80  	}
    81  }
    82  
    83  func TestWeightedAcquire(t *testing.T) {
    84  	t.Parallel()
    85  
    86  	ctx := context.Background()
    87  	sem := semaphore.NewWeighted(2)
    88  	tryAcquire := func(n int64) bool {
    89  		ctx, cancel := context.WithTimeout(ctx, 10*time.Millisecond)
    90  		defer cancel()
    91  		return sem.Acquire(ctx, n) == nil
    92  	}
    93  
    94  	tries := []bool{}
    95  	sem.Acquire(ctx, 1)
    96  	tries = append(tries, tryAcquire(1))
    97  	tries = append(tries, tryAcquire(1))
    98  
    99  	sem.Release(2)
   100  
   101  	tries = append(tries, tryAcquire(1))
   102  	sem.Acquire(ctx, 1)
   103  	tries = append(tries, tryAcquire(1))
   104  
   105  	want := []bool{true, false, true, false}
   106  	for i := range tries {
   107  		if tries[i] != want[i] {
   108  			t.Errorf("tries[%d]: got %t, want %t", i, tries[i], want[i])
   109  		}
   110  	}
   111  }
   112  
   113  func TestWeightedDoesntBlockIfTooBig(t *testing.T) {
   114  	t.Parallel()
   115  
   116  	const n = 2
   117  	sem := semaphore.NewWeighted(n)
   118  	{
   119  		ctx, cancel := context.WithCancel(context.Background())
   120  		defer cancel()
   121  		go sem.Acquire(ctx, n+1)
   122  	}
   123  
   124  	g, ctx := errgroup.WithContext(context.Background())
   125  	for i := n * 3; i > 0; i-- {
   126  		g.Go(func() error {
   127  			err := sem.Acquire(ctx, 1)
   128  			if err == nil {
   129  				time.Sleep(1 * time.Millisecond)
   130  				sem.Release(1)
   131  			}
   132  			return err
   133  		})
   134  	}
   135  	if err := g.Wait(); err != nil {
   136  		t.Errorf("semaphore.NewWeighted(%v) failed to AcquireCtx(_, 1) with AcquireCtx(_, %v) pending", n, n+1)
   137  	}
   138  }
   139  
   140  // TestLargeAcquireDoesntStarve times out if a large call to Acquire starves.
   141  // Merely returning from the test function indicates success.
   142  func TestLargeAcquireDoesntStarve(t *testing.T) {
   143  	t.Parallel()
   144  
   145  	ctx := context.Background()
   146  	n := int64(runtime.GOMAXPROCS(0))
   147  	sem := semaphore.NewWeighted(n)
   148  	running := true
   149  
   150  	var wg sync.WaitGroup
   151  	wg.Add(int(n))
   152  	for i := n; i > 0; i-- {
   153  		sem.Acquire(ctx, 1)
   154  		go func() {
   155  			defer func() {
   156  				sem.Release(1)
   157  				wg.Done()
   158  			}()
   159  			for running {
   160  				time.Sleep(1 * time.Millisecond)
   161  				sem.Release(1)
   162  				sem.Acquire(ctx, 1)
   163  			}
   164  		}()
   165  	}
   166  
   167  	sem.Acquire(ctx, n)
   168  	running = false
   169  	sem.Release(n)
   170  	wg.Wait()
   171  }
   172  
   173  // translated from https://github.com/zhiqiangxu/util/blob/master/mutex/crwmutex_test.go#L43
   174  func TestAllocCancelDoesntStarve(t *testing.T) {
   175  	sem := semaphore.NewWeighted(10)
   176  
   177  	// Block off a portion of the semaphore so that Acquire(_, 10) can eventually succeed.
   178  	sem.Acquire(context.Background(), 1)
   179  
   180  	// In the background, Acquire(_, 10).
   181  	ctx, cancel := context.WithCancel(context.Background())
   182  	defer cancel()
   183  	go func() {
   184  		sem.Acquire(ctx, 10)
   185  	}()
   186  
   187  	// Wait until the Acquire(_, 10) call blocks.
   188  	for sem.TryAcquire(1) {
   189  		sem.Release(1)
   190  		runtime.Gosched()
   191  	}
   192  
   193  	// Now try to grab a read lock, and simultaneously unblock the Acquire(_, 10) call.
   194  	// Both Acquire calls should unblock and return, in either order.
   195  	go cancel()
   196  
   197  	err := sem.Acquire(context.Background(), 1)
   198  	if err != nil {
   199  		t.Fatalf("Acquire(_, 1) failed unexpectedly: %v", err)
   200  	}
   201  	sem.Release(1)
   202  }
   203  
   204  func TestWeightedAcquireCanceled(t *testing.T) {
   205  	// https://go.dev/issue/63615
   206  	sem := semaphore.NewWeighted(2)
   207  	ctx, cancel := context.WithCancel(context.Background())
   208  	sem.Acquire(context.Background(), 1)
   209  	ch := make(chan struct{})
   210  	go func() {
   211  		// Synchronize with the Acquire(2) below.
   212  		for sem.TryAcquire(1) {
   213  			sem.Release(1)
   214  		}
   215  		// Now cancel ctx, and then release the token.
   216  		cancel()
   217  		sem.Release(1)
   218  		close(ch)
   219  	}()
   220  	// Since the context closing happens before enough tokens become available,
   221  	// this Acquire must fail.
   222  	if err := sem.Acquire(ctx, 2); err != context.Canceled {
   223  		t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err)
   224  	}
   225  	// There must always be two tokens in the semaphore after the other
   226  	// goroutine releases the one we held at the start.
   227  	<-ch
   228  	if !sem.TryAcquire(2) {
   229  		t.Fatal("TryAcquire after canceled Acquire failed")
   230  	}
   231  	// Additionally verify that we don't acquire with a done context even when
   232  	// we wouldn't need to block to do so.
   233  	sem.Release(2)
   234  	if err := sem.Acquire(ctx, 1); err != context.Canceled {
   235  		t.Errorf("Acquire with canceled context returned wrong error: want context.Canceled, got %v", err)
   236  	}
   237  }
   238  

View as plain text