...
1 package queue
2
3 import (
4 "fmt"
5 "sync"
6 "testing"
7 "time"
8 )
9
10 func TestEnqueueDequeue(t *testing.T) {
11 q := NewMessageQueue()
12
13 vals := []int{1, 2, 3, 4, 5}
14 for _, val := range vals {
15
16 if err := q.Enqueue(val); err != nil {
17 t.Fatal(err)
18 }
19 }
20
21 for _, val := range vals {
22
23 qVal, err := q.Dequeue()
24 if err != nil {
25 t.Fatal(err)
26 }
27
28 if qVal != val {
29 t.Fatalf("expected %d, got: %d", val, qVal)
30 }
31 }
32 }
33
34 func TestEnqueueDequeueClose(t *testing.T) {
35 q := NewMessageQueue()
36
37 vals := []int{1, 2, 3}
38 go func() {
39 for _, val := range vals {
40 _ = q.Enqueue(val)
41 }
42 }()
43
44 read := 0
45 for {
46 if _, err := q.Dequeue(); err == nil {
47 read++
48 if read == len(vals) {
49
50
51 q.Close()
52 }
53 continue
54 } else if err != ErrQueueClosed {
55 t.Fatalf("expected to receive ErrQueueClosed, instead got: %s", err)
56 }
57 break
58 }
59 }
60
61 func TestMultipleReaders(t *testing.T) {
62 q := NewMessageQueue()
63 errChan := make(chan error)
64 done := make(chan struct{})
65 go func() {
66 for i := 0; i < 50; i++ {
67 if err := q.Enqueue(1); err != nil {
68 errChan <- err
69 }
70 }
71 }()
72
73 wg := sync.WaitGroup{}
74 wg.Add(2)
75
76
77 go func() {
78 for i := 0; i < 25; i++ {
79 if _, err := q.Dequeue(); err != nil {
80 errChan <- err
81 }
82 }
83 wg.Done()
84 }()
85
86
87 go func() {
88 for i := 0; i < 25; i++ {
89 if _, err := q.Dequeue(); err != nil {
90 errChan <- err
91 }
92 }
93 wg.Done()
94 }()
95
96 go func() {
97 wg.Wait()
98 done <- struct{}{}
99 }()
100
101 select {
102 case err := <-errChan:
103 t.Fatalf("failed in read or write: %s", err)
104 case <-done:
105 case <-time.After(time.Second * 20):
106 t.Fatalf("timeout exceeded waiting for reads to complete")
107 }
108 }
109
110 func TestMultipleReadersClose(t *testing.T) {
111 q := NewMessageQueue()
112 errChan := make(chan error)
113 done := make(chan struct{})
114
115 wg := sync.WaitGroup{}
116 wg.Add(2)
117
118
119 go func() {
120 if _, err := q.Dequeue(); err != ErrQueueClosed {
121 errChan <- err
122 }
123 wg.Done()
124 }()
125
126
127 go func() {
128 if _, err := q.Dequeue(); err != ErrQueueClosed {
129 errChan <- err
130 }
131 wg.Done()
132 }()
133
134 go func() {
135 wg.Wait()
136 done <- struct{}{}
137 }()
138
139 time.Sleep(time.Second * 2)
140
141 q.Close()
142
143 select {
144 case err := <-errChan:
145 t.Fatalf("failed in read or write: %s", err)
146 case <-done:
147 case <-time.After(time.Second * 20):
148 t.Fatalf("timeout exceeded waiting for reads to complete")
149 }
150 }
151
152 func TestDequeueBlock(t *testing.T) {
153 q := NewMessageQueue()
154 errChan := make(chan error)
155 testVal := 1
156
157 go func() {
158
159
160 val, err := q.Dequeue()
161 if err != nil {
162 errChan <- err
163 }
164 if val != testVal {
165 errChan <- fmt.Errorf("expected %d, but got %d", testVal, val)
166 }
167 close(errChan)
168 }()
169
170
171 time.Sleep(time.Second * 3)
172 if err := q.Enqueue(testVal); err != nil {
173 t.Fatal(err)
174 }
175
176 select {
177 case err := <-errChan:
178 if err != nil {
179 t.Fatal(err)
180 }
181 case <-time.After(10 * time.Second):
182
183 q.Close()
184 t.Fatal("timeout waiting for Dequeue go routine to complete")
185 }
186 }
187
View as plain text