...
1 package testqueue
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "strings"
8 "sync"
9 "testing"
10 "time"
11
12 "github.com/stretchr/testify/assert"
13
14 "github.com/datawire/dlib/dgroup"
15 "github.com/datawire/dlib/dlog"
16 )
17
18
19
20
// Queue is an append-only list of arbitrary objects shared between the
// goroutines of a test. Add appends to it and Get searches it for a matching
// entry, waiting for one to show up if necessary.
type Queue struct {
	// cond provides both the lock (cond.L) that the methods take before
	// touching entries or offset, and the wake-up signal for blocked Gets.
	cond *sync.Cond

	// entries holds every object passed to Add, in insertion order.
	entries []interface{}
	// offset is the index into entries at which the next Get begins its
	// search.
	offset int

	// timeout is how long a single Get keeps looking before it fails the
	// test.
	timeout time.Duration
}
27
28
29 func NewQueue(t *testing.T, timeout time.Duration) *Queue {
30 q := &Queue{
31 timeout: timeout,
32 cond: sync.NewCond(&sync.Mutex{}),
33 }
34 ctx, cancel := context.WithCancel(dlog.NewTestContext(t, true))
35 grp := dgroup.NewGroup(ctx, dgroup.GroupConfig{})
36 t.Cleanup(func() {
37 cancel()
38 assert.NoError(t, grp.Wait())
39 })
40
41
42 grp.Go("ticker", func(ctx context.Context) error {
43 ticker := time.NewTicker(3 * time.Second)
44 for {
45 select {
46 case <-ticker.C:
47 q.cond.Broadcast()
48 case <-ctx.Done():
49 return nil
50 }
51 }
52 })
53 return q
54 }
55
56
57 func (q *Queue) Add(t *testing.T, obj interface{}) {
58 t.Helper()
59 q.cond.L.Lock()
60 defer q.cond.L.Unlock()
61 q.entries = append(q.entries, obj)
62 q.cond.Broadcast()
63 }
64
65
66 func (q *Queue) Get(t *testing.T, predicate func(interface{}) bool) (interface{}, error) {
67 t.Helper()
68 start := time.Now()
69 q.cond.L.Lock()
70 defer q.cond.L.Unlock()
71
72 for {
73 for idx, obj := range q.entries[q.offset:] {
74 if predicate(obj) {
75 q.offset += idx + 1
76 return obj, nil
77 }
78 }
79
80 if time.Since(start) > q.timeout {
81 msg := &strings.Builder{}
82 for idx, entry := range q.entries {
83 bytes, err := json.MarshalIndent(entry, "", " ")
84 if err != nil {
85 return nil, err
86 }
87 var extra string
88 if idx < q.offset {
89 extra = "(Before Offset)"
90 } else if idx == q.offset {
91 extra = "(Offset Here)"
92 } else {
93 extra = "(After Offset)"
94 }
95 msg.WriteString(fmt.Sprintf("\n--- Queue Entry[%d] %s---\n%s\n", idx, extra, string(bytes)))
96 }
97
98 t.Fatalf("Get timed out!\n%s", msg)
99 }
100 q.cond.Wait()
101 }
102 }
103
104
105 func (q *Queue) AssertEmpty(t *testing.T, timeout time.Duration, msg string) {
106 t.Helper()
107 time.Sleep(timeout)
108 q.cond.L.Lock()
109 defer q.cond.L.Unlock()
110 assert.Empty(t, q.entries, msg)
111 }
112
View as plain text