...

Source file src/github.com/Microsoft/hcsshim/internal/queue/queue_test.go

Documentation: github.com/Microsoft/hcsshim/internal/queue

     1  package queue
     2  
     3  import (
     4  	"fmt"
     5  	"sync"
     6  	"testing"
     7  	"time"
     8  )
     9  
    10  func TestEnqueueDequeue(t *testing.T) {
    11  	q := NewMessageQueue()
    12  
    13  	vals := []int{1, 2, 3, 4, 5}
    14  	for _, val := range vals {
    15  		// Enqueue vals to the queue and read later.
    16  		if err := q.Enqueue(val); err != nil {
    17  			t.Fatal(err)
    18  		}
    19  	}
    20  
    21  	for _, val := range vals {
    22  		// Dequeueing from an empty queue should block forever until a write occurs.
    23  		qVal, err := q.Dequeue()
    24  		if err != nil {
    25  			t.Fatal(err)
    26  		}
    27  
    28  		if qVal != val {
    29  			t.Fatalf("expected %d, got: %d", val, qVal)
    30  		}
    31  	}
    32  }
    33  
    34  func TestEnqueueDequeueClose(t *testing.T) {
    35  	q := NewMessageQueue()
    36  
    37  	vals := []int{1, 2, 3}
    38  	go func() {
    39  		for _, val := range vals {
    40  			_ = q.Enqueue(val)
    41  		}
    42  	}()
    43  
    44  	read := 0
    45  	for {
    46  		if _, err := q.Dequeue(); err == nil {
    47  			read++
    48  			if read == len(vals) {
    49  				// Close after we've read all of our values, then on the next
    50  				// go around make sure we get ErrClosed()
    51  				q.Close()
    52  			}
    53  			continue
    54  		} else if err != ErrQueueClosed {
    55  			t.Fatalf("expected to receive ErrQueueClosed, instead got: %s", err)
    56  		}
    57  		break
    58  	}
    59  }
    60  
    61  func TestMultipleReaders(t *testing.T) {
    62  	q := NewMessageQueue()
    63  	errChan := make(chan error)
    64  	done := make(chan struct{})
    65  	go func() {
    66  		for i := 0; i < 50; i++ {
    67  			if err := q.Enqueue(1); err != nil {
    68  				errChan <- err
    69  			}
    70  		}
    71  	}()
    72  
    73  	wg := sync.WaitGroup{}
    74  	wg.Add(2)
    75  
    76  	// Reader 1
    77  	go func() {
    78  		for i := 0; i < 25; i++ {
    79  			if _, err := q.Dequeue(); err != nil {
    80  				errChan <- err
    81  			}
    82  		}
    83  		wg.Done()
    84  	}()
    85  
    86  	// Reader 2
    87  	go func() {
    88  		for i := 0; i < 25; i++ {
    89  			if _, err := q.Dequeue(); err != nil {
    90  				errChan <- err
    91  			}
    92  		}
    93  		wg.Done()
    94  	}()
    95  
    96  	go func() {
    97  		wg.Wait()
    98  		done <- struct{}{}
    99  	}()
   100  
   101  	select {
   102  	case err := <-errChan:
   103  		t.Fatalf("failed in read or write: %s", err)
   104  	case <-done:
   105  	case <-time.After(time.Second * 20):
   106  		t.Fatalf("timeout exceeded waiting for reads to complete")
   107  	}
   108  }
   109  
   110  func TestMultipleReadersClose(t *testing.T) {
   111  	q := NewMessageQueue()
   112  	errChan := make(chan error)
   113  	done := make(chan struct{})
   114  
   115  	wg := sync.WaitGroup{}
   116  	wg.Add(2)
   117  
   118  	// Reader 1
   119  	go func() {
   120  		if _, err := q.Dequeue(); err != ErrQueueClosed {
   121  			errChan <- err
   122  		}
   123  		wg.Done()
   124  	}()
   125  
   126  	// Reader 2
   127  	go func() {
   128  		if _, err := q.Dequeue(); err != ErrQueueClosed {
   129  			errChan <- err
   130  		}
   131  		wg.Done()
   132  	}()
   133  
   134  	go func() {
   135  		wg.Wait()
   136  		done <- struct{}{}
   137  	}()
   138  
   139  	time.Sleep(time.Second * 2)
   140  	// Close the queue and this should signal both readers to return ErrQueueClosed.
   141  	q.Close()
   142  
   143  	select {
   144  	case err := <-errChan:
   145  		t.Fatalf("failed in read or write: %s", err)
   146  	case <-done:
   147  	case <-time.After(time.Second * 20):
   148  		t.Fatalf("timeout exceeded waiting for reads to complete")
   149  	}
   150  }
   151  
   152  func TestDequeueBlock(t *testing.T) {
   153  	q := NewMessageQueue()
   154  	errChan := make(chan error)
   155  	testVal := 1
   156  
   157  	go func() {
   158  		// Intentionally dequeue right away with no elements so we know we actually block on
   159  		// no elements.
   160  		val, err := q.Dequeue()
   161  		if err != nil {
   162  			errChan <- err
   163  		}
   164  		if val != testVal {
   165  			errChan <- fmt.Errorf("expected %d, but got %d", testVal, val)
   166  		}
   167  		close(errChan)
   168  	}()
   169  
   170  	// Ensure dequeue has started
   171  	time.Sleep(time.Second * 3)
   172  	if err := q.Enqueue(testVal); err != nil {
   173  		t.Fatal(err)
   174  	}
   175  
   176  	select {
   177  	case err := <-errChan:
   178  		if err != nil {
   179  			t.Fatal(err)
   180  		}
   181  	case <-time.After(10 * time.Second):
   182  		// Closing the queue will finish the Dequeue go routine.
   183  		q.Close()
   184  		t.Fatal("timeout waiting for Dequeue go routine to complete")
   185  	}
   186  }
   187  

View as plain text