...

Source file src/github.com/datawire/ambassador/v2/cmd/entrypoint/internal/testqueue/queue.go

Documentation: github.com/datawire/ambassador/v2/cmd/entrypoint/internal/testqueue

     1  package testqueue
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"strings"
     8  	"sync"
     9  	"testing"
    10  	"time"
    11  
    12  	"github.com/stretchr/testify/assert"
    13  
    14  	"github.com/datawire/dlib/dgroup"
    15  	"github.com/datawire/dlib/dlog"
    16  )
    17  
    18  // The Queue struct implements a multi-writer/multi-reader concurrent queue where the dequeue
    19  // operation (the Get() method) takes a predicate that allows it to skip past queue entries until it
    20  // finds one that satisfies the specified predicate.
    21  type Queue struct {
    22  	timeout time.Duration
    23  	cond    *sync.Cond
    24  	entries []interface{}
    25  	offset  int
    26  }
    27  
    28  // NewQueue constructs a new queue with the supplied timeout.
    29  func NewQueue(t *testing.T, timeout time.Duration) *Queue {
    30  	q := &Queue{
    31  		timeout: timeout,
    32  		cond:    sync.NewCond(&sync.Mutex{}),
    33  	}
    34  	ctx, cancel := context.WithCancel(dlog.NewTestContext(t, true))
    35  	grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
    36  	t.Cleanup(func() {
    37  		cancel()
    38  		assert.NoError(t, grp.Wait())
    39  	})
    40  	// Broadcast on Queue.cond every three seconds so that anyone waiting on the condition has a
    41  	// chance to timeout. (Go doesn't support timed wait on conditions.)
    42  	grp.Go("ticker", func(ctx context.Context) error {
    43  		ticker := time.NewTicker(3 * time.Second)
    44  		for {
    45  			select {
    46  			case <-ticker.C:
    47  				q.cond.Broadcast()
    48  			case <-ctx.Done():
    49  				return nil
    50  			}
    51  		}
    52  	})
    53  	return q
    54  }
    55  
    56  // Add an entry to the queue.
    57  func (q *Queue) Add(t *testing.T, obj interface{}) {
    58  	t.Helper()
    59  	q.cond.L.Lock()
    60  	defer q.cond.L.Unlock()
    61  	q.entries = append(q.entries, obj)
    62  	q.cond.Broadcast()
    63  }
    64  
    65  // Get will return the next entry that satisfies the supplied predicate.
    66  func (q *Queue) Get(t *testing.T, predicate func(interface{}) bool) (interface{}, error) {
    67  	t.Helper()
    68  	start := time.Now()
    69  	q.cond.L.Lock()
    70  	defer q.cond.L.Unlock()
    71  
    72  	for {
    73  		for idx, obj := range q.entries[q.offset:] {
    74  			if predicate(obj) {
    75  				q.offset += idx + 1
    76  				return obj, nil
    77  			}
    78  		}
    79  
    80  		if time.Since(start) > q.timeout {
    81  			msg := &strings.Builder{}
    82  			for idx, entry := range q.entries {
    83  				bytes, err := json.MarshalIndent(entry, "", "  ")
    84  				if err != nil {
    85  					return nil, err
    86  				}
    87  				var extra string
    88  				if idx < q.offset {
    89  					extra = "(Before Offset)"
    90  				} else if idx == q.offset {
    91  					extra = "(Offset Here)"
    92  				} else {
    93  					extra = "(After Offset)"
    94  				}
    95  				msg.WriteString(fmt.Sprintf("\n--- Queue Entry[%d] %s---\n%s\n", idx, extra, string(bytes)))
    96  			}
    97  
    98  			t.Fatalf("Get timed out!\n%s", msg)
    99  		}
   100  		q.cond.Wait()
   101  	}
   102  }
   103  
   104  // AssertEmpty will check that the queue remains empty for the supplied duration.
   105  func (q *Queue) AssertEmpty(t *testing.T, timeout time.Duration, msg string) {
   106  	t.Helper()
   107  	time.Sleep(timeout)
   108  	q.cond.L.Lock()
   109  	defer q.cond.L.Unlock()
   110  	assert.Empty(t, q.entries, msg)
   111  }
   112  

View as plain text