1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package scheduler_test
16
17 import (
18 "fmt"
19 "testing"
20 "time"
21
22 "cloud.google.com/go/pubsub/internal/scheduler"
23 )
24
25 func TestPublishScheduler_Put_Basic(t *testing.T) {
26 done := make(chan struct{})
27 defer close(done)
28
29 keysHandled := map[string]chan int{}
30 handle := func(itemi interface{}) {
31 items := itemi.([]pair)
32 for _, item := range items {
33 keysHandled[item.k] <- item.v
34 }
35 }
36 s := scheduler.NewPublishScheduler(2, handle)
37 defer s.FlushAndStop()
38
39
40
41 numItems := 100
42 numKeys := 10
43
44 for ki := 0; ki < numKeys; ki++ {
45 k := fmt.Sprintf("some_key_%d", ki)
46 keysHandled[k] = make(chan int, numItems)
47 }
48
49 for ki := 0; ki < numKeys; ki++ {
50 k := fmt.Sprintf("some_key_%d", ki)
51 go func() {
52 for i := 0; i < numItems; i++ {
53 select {
54 case <-done:
55 return
56 default:
57 }
58 if err := s.Add(k, pair{k, i}, 1); err != nil {
59 t.Error(err)
60 }
61 }
62 }()
63 }
64
65 for ki := 0; ki < numKeys; ki++ {
66 k := fmt.Sprintf("some_key_%d", ki)
67 for want := 0; want < numItems; want++ {
68 select {
69 case got := <-keysHandled[k]:
70 if got != want {
71 t.Fatalf("%s: got %d, want %d", k, got, want)
72 }
73 case <-time.After(5 * time.Second):
74 t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
75 }
76 }
77 }
78 }
79
80
81
82 func TestPublishScheduler_Put_ManyWithOneKey(t *testing.T) {
83 done := make(chan struct{})
84 defer close(done)
85
86 recvd := make(chan int)
87 handle := func(itemi interface{}) {
88 items := itemi.([]int)
89 for _, item := range items {
90 recvd <- item
91 }
92 }
93 s := scheduler.NewPublishScheduler(10, handle)
94 defer s.FlushAndStop()
95
96
97
98 numItems := 1000
99
100 go func() {
101 for i := 0; i < numItems; i++ {
102 select {
103 case <-done:
104 return
105 default:
106 }
107 if err := s.Add("some-key", i, 1); err != nil {
108 t.Error(err)
109 }
110 }
111 }()
112
113 for want := 0; want < numItems; want++ {
114 select {
115 case got := <-recvd:
116 if got != want {
117 t.Fatalf("got %d, want %d", got, want)
118 }
119 case <-time.After(5 * time.Second):
120 t.Fatalf("timed out waiting for item %d to be handled", want)
121 }
122 }
123 }
124
125 func TestPublishScheduler_DoesntRaceWithPublisher(t *testing.T) {
126 done := make(chan struct{})
127 defer close(done)
128
129 keysHandled := map[string]chan int{}
130 handle := func(itemi interface{}) {
131 items := itemi.([]pair)
132 for _, item := range items {
133 keysHandled[item.k] <- item.v
134 }
135 }
136 s := scheduler.NewPublishScheduler(2, handle)
137 defer s.FlushAndStop()
138
139
140
141 numItems := 100
142 numKeys := 10
143
144 for ki := 0; ki < numKeys; ki++ {
145 k := fmt.Sprintf("some_key_%d", ki)
146 keysHandled[k] = make(chan int, numItems)
147 }
148
149 for ki := 0; ki < numKeys; ki++ {
150 k := fmt.Sprintf("some_key_%d", ki)
151 go func() {
152 for i := 0; i < numItems; i++ {
153 select {
154 case <-done:
155 return
156 default:
157 }
158 if err := s.Add(k, pair{k, i}, 1); err != nil {
159 t.Error(err)
160 }
161 }
162 }()
163 }
164
165 for ki := 0; ki < numKeys; ki++ {
166 k := fmt.Sprintf("some_key_%d", ki)
167 for want := 0; want < numItems; want++ {
168 select {
169 case got := <-keysHandled[k]:
170 if got != want {
171 t.Fatalf("%s: got %d, want %d", k, got, want)
172 }
173 case <-time.After(5 * time.Second):
174 t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
175 }
176 }
177 }
178 }
179
180
181 func TestPublishScheduler_FlushAndStop(t *testing.T) {
182 for _, tc := range []struct {
183 name string
184 input map[string][]int
185 }{
186 {
187 name: "two messages with the same key",
188 input: map[string][]int{"foo": {1, 2}},
189 },
190 {
191 name: "two messages with different keys",
192 input: map[string][]int{"foo": {1}, "bar": {2}},
193 },
194 {
195 name: "two messages with no key",
196 input: map[string][]int{"": {1, 2}},
197 },
198 } {
199 t.Run(tc.name, func(t *testing.T) {
200 recvd := make(chan int)
201 handle := func(itemi interface{}) {
202 for _, v := range itemi.([]int) {
203 recvd <- v
204 }
205 }
206 s := scheduler.NewPublishScheduler(1, handle)
207 for k, vs := range tc.input {
208 for _, v := range vs {
209 if err := s.Add(k, v, 1); err != nil {
210 t.Fatal(err)
211 }
212 }
213 }
214
215 doneFlushing := make(chan struct{})
216 go func() {
217 s.FlushAndStop()
218 close(doneFlushing)
219 }()
220
221 time.Sleep(10 * time.Millisecond)
222
223 select {
224 case <-doneFlushing:
225 t.Fatal("expected FlushAndStop to block until all messages handled, but it didn't")
226 default:
227 }
228
229 select {
230 case <-recvd:
231 case <-time.After(time.Second):
232 t.Fatal("timed out waiting for first message to arrive")
233 }
234
235 select {
236 case <-recvd:
237 case <-time.After(time.Second):
238 t.Fatal("timed out waiting for second message to arrive")
239 }
240
241 select {
242 case <-doneFlushing:
243 case <-time.After(time.Second):
244 t.Fatal("timed out waiting for FlushAndStop to finish blocking")
245 }
246 })
247 }
248 }
249
View as plain text