...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package scheduler_test
16
17 import (
18 "fmt"
19 "testing"
20 "time"
21
22 "cloud.google.com/go/pubsub/internal/scheduler"
23 )
24
25 type pair struct {
26 k string
27 v int
28 }
29
30 func TestReceiveScheduler_Put_Basic(t *testing.T) {
31 done := make(chan struct{})
32 defer close(done)
33
34 keysHandled := map[string]chan int{}
35 handle := func(itemi interface{}) {
36 item := itemi.(pair)
37 keysHandled[item.k] <- item.v
38 }
39
40
41 numItems := 100
42 numKeys := 10
43
44 s := scheduler.NewReceiveScheduler(1)
45 defer s.Shutdown()
46 for ki := 0; ki < numKeys; ki++ {
47 k := fmt.Sprintf("some_key_%d", ki)
48 keysHandled[k] = make(chan int, numItems)
49 }
50
51 for ki := 0; ki < numKeys; ki++ {
52 k := fmt.Sprintf("some_key_%d", ki)
53 go func() {
54 for i := 0; i < numItems; i++ {
55 select {
56 case <-done:
57 return
58 default:
59 }
60 if err := s.Add(k, pair{k: k, v: i}, handle); err != nil {
61 t.Error(err)
62 }
63 }
64 }()
65 }
66
67 for ki := 0; ki < numKeys; ki++ {
68 k := fmt.Sprintf("some_key_%d", ki)
69 for want := 0; want < numItems; want++ {
70 select {
71 case got := <-keysHandled[k]:
72 if got != want {
73 t.Fatalf("%s: got %d, want %d", k, got, want)
74 }
75 case <-time.After(5 * time.Second):
76 t.Fatalf("%s: expected key %s - item %d to be handled but never was", k, k, want)
77 }
78 }
79 }
80 }
81
82
83
84 func TestReceiveScheduler_Put_ManyWithOneKey(t *testing.T) {
85 done := make(chan struct{})
86 defer close(done)
87
88 recvd := make(chan int)
89 handle := func(itemi interface{}) {
90 recvd <- itemi.(int)
91 }
92
93
94 numItems := 10000
95 s := scheduler.NewReceiveScheduler(10)
96 defer s.Shutdown()
97
98 go func() {
99 for i := 0; i < numItems; i++ {
100 select {
101 case <-done:
102 return
103 default:
104 }
105 if err := s.Add("some-key", i, handle); err != nil {
106 t.Error(err)
107 }
108 }
109 }()
110
111 for want := 0; want < numItems; want++ {
112 select {
113 case got := <-recvd:
114 if got != want {
115 t.Fatalf("got %d, want %d", got, want)
116 }
117 case <-time.After(5 * time.Second):
118 t.Fatalf("timed out waiting for item %d to be handled", want)
119 }
120 }
121 }
122
123
124 func TestReceiveScheduler_FlushAndStop(t *testing.T) {
125 for _, tc := range []struct {
126 name string
127 input map[string][]int
128 }{
129 {
130 name: "two messages with the same key",
131 input: map[string][]int{"foo": {1, 2}},
132 },
133 {
134 name: "two messages with different keys",
135 input: map[string][]int{"foo": {1}, "bar": {2}},
136 },
137 {
138 name: "two messages with no key",
139 input: map[string][]int{"": {1, 2}},
140 },
141 } {
142 t.Run(tc.name, func(t *testing.T) {
143 recvd := make(chan int, 10)
144 handle := func(itemi interface{}) {
145 recvd <- itemi.(int)
146 }
147 s := scheduler.NewReceiveScheduler(1)
148 for k, vs := range tc.input {
149 for _, v := range vs {
150 if err := s.Add(k, v, handle); err != nil {
151 t.Fatal(err)
152 }
153 }
154 }
155
156 go func() {
157 s.Shutdown()
158 }()
159
160 time.Sleep(10 * time.Millisecond)
161
162 select {
163 case <-recvd:
164 case <-time.After(time.Second):
165 t.Fatal("timed out waiting for first message to arrive")
166 }
167
168 select {
169 case <-recvd:
170 case <-time.After(time.Second):
171 t.Fatal("timed out waiting for second message to arrive")
172 }
173 })
174 }
175 }
176
View as plain text