1 package chariot
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "testing"
8 "time"
9
10 "github.com/fsouza/fake-gcs-server/fakestorage"
11 )
12
13 type mockResponsePublisher struct {
14 Responses chan IResponseMessage
15 }
16
17 func newMockResponsePublisher() *mockResponsePublisher {
18 return &mockResponsePublisher{
19 Responses: make(chan IResponseMessage, 1),
20 }
21 }
22
23 func (m *mockResponsePublisher) Publish(ctx context.Context, r IResponseMessage) error {
24 select {
25 case m.Responses <- r:
26 return nil
27 case <-ctx.Done():
28 return ctx.Err()
29 }
30 }
31
32 func TestChariotSuccessfulResponse(t *testing.T) {
33 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
34 defer cancel()
35
36 var mrp = newMockResponsePublisher()
37
38 var mpss = newMockPubSubService()
39 defer mpss.Close()
40
41 const testbucket = "testbucket"
42 var existingIgnoredObject = StorageObject{
43 Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
44 Content: "this file causes the fake-gcs-server to create the test bucket",
45 }
46 gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
47 ObjectAttrs: fakestorage.ObjectAttrs{
48 BucketName: existingIgnoredObject.getGcsBucket(),
49 Name: existingIgnoredObject.getGcsPath(),
50 },
51 Content: []byte(existingIgnoredObject.Content),
52 })
53 if err != nil {
54 t.Fatal(err)
55 }
56 defer stopFakeGcsServer()
57
58 var (
59 mrpOption = OptionPubSubResponsePublisher(mrp)
60 psrOption = OptionPubSubReceiver(mpss)
61 gcsOption = OptionGoogleCloudStorage(gcsClient)
62 )
63
64 daemon, err := NewDaemon(psrOption, gcsOption, mrpOption)
65 if err != nil {
66 t.Fatal(err)
67 }
68 go daemon.Run(ctx)
69
70 var createReqs = []Request{
71 {
72 Banner: testbucket,
73 Cluster: "",
74 Operation: "CREATE",
75 Objects: [][]byte{
76 randomChariotYamlObject(),
77 randomChariotYamlObject(),
78 randomChariotYamlObject(),
79 randomChariotYamlObject(),
80 randomChariotYamlObject(),
81 },
82 Owner: "test",
83 },
84 {
85 Banner: testbucket,
86 Cluster: "cluster1",
87 Operation: "CREATE",
88 Objects: [][]byte{
89 randomChariotYamlObject(),
90 randomChariotYamlObject(),
91 randomChariotYamlObject(),
92 randomChariotYamlObject(),
93 randomChariotYamlObject(),
94 },
95 Owner: "test",
96 },
97 }
98
99
100 t.Logf("Sending %d create requests to the Daemon", len(createReqs))
101 for i, req := range createReqs {
102 var an = newMockPubSubAckNacker()
103 defer an.Close()
104 data, err := json.Marshal(req)
105 if err != nil {
106 t.Fatal(err)
107 }
108 err = mpss.Send(ctx, &PubSubMessage{
109 ackNack: an,
110 id: fmt.Sprintf("%d", i),
111 data: data,
112 publishTime: time.Now(),
113 })
114 if err != nil {
115 t.Fatal(err)
116 }
117 select {
118 case <-an.Acked:
119 t.Logf("Got ack from message %d", i+1)
120 case <-an.Nacked:
121 t.Fatalf("Message should not send a nack signal")
122 case <-ctx.Done():
123 t.Fatal(ctx.Err())
124 }
125
126
127 var resp IResponseMessage
128 select {
129 case resp = <-mrp.Responses:
130 t.Logf("Got response data: %s", resp.Data())
131 t.Logf("Got response attributes: %v", resp.Attributes())
132 case <-ctx.Done():
133 t.Fatal("Did not get response in appropriate time.", ctx.Err())
134 }
135
136
137 if attr := resp.Attributes(); len(attr) == 0 {
138 t.Fatalf("Got empty response pubsub attributes")
139 } else if v, ok := attr["owner"]; !ok {
140 t.Fatalf("owner attribute not set.")
141 } else if v != req.Owner {
142 t.Fatalf("owner attribute %q does not equal expected %q", v, req.Owner)
143 }
144
145
146 var chresp ResponseJSON
147 err = json.Unmarshal(resp.Data(), &chresp)
148 if err != nil {
149 t.Fatal(err)
150 }
151
152 if !chresp.Ok {
153 t.Fatalf("The response should set Ok=true")
154 }
155
156 if chresp.Operation != req.Operation {
157 t.Fatalf("Response operation %q does not equal request operation %q", chresp.Operation, req.Operation)
158 }
159
160 if chresp.Error != "" {
161 t.Fatalf("Do not expect response error: %q", chresp.Error)
162 }
163
164 wantID := fmt.Sprintf("%d", i)
165 if chresp.PubSubRequestID != wantID {
166 t.Fatalf("Response did not contain correct ID. Want %q Got %q", wantID, chresp.PubSubRequestID)
167 }
168 }
169 t.Logf("Got acks from all %d requests", len(createReqs))
170 }
171
172 func TestChariotErrorResponse(t *testing.T) {
173 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
174 defer cancel()
175
176 var mrp = newMockResponsePublisher()
177
178 var mpss = newMockPubSubService()
179 defer mpss.Close()
180
181 const testbucket = "testbucket"
182 var existingIgnoredObject = StorageObject{
183 Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
184 Content: "this file causes the fake-gcs-server to create the test bucket",
185 }
186 gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
187 ObjectAttrs: fakestorage.ObjectAttrs{
188 BucketName: existingIgnoredObject.getGcsBucket(),
189 Name: existingIgnoredObject.getGcsPath(),
190 },
191 Content: []byte(existingIgnoredObject.Content),
192 })
193 if err != nil {
194 t.Fatal(err)
195 }
196 defer stopFakeGcsServer()
197
198 var (
199 mrpOption = OptionPubSubResponsePublisher(mrp)
200 psrOption = OptionPubSubReceiver(mpss)
201 gcsOption = OptionGoogleCloudStorage(gcsClient)
202 )
203
204 daemon, err := NewDaemon(psrOption, gcsOption, mrpOption)
205 if err != nil {
206 t.Fatal(err)
207 }
208 go daemon.Run(ctx)
209
210 var createReqs = []Request{
211 {
212 Banner: "",
213 Operation: "CREATE",
214 Objects: [][]byte{
215 randomChariotYamlObject(),
216 randomChariotYamlObject(),
217 randomChariotYamlObject(),
218 randomChariotYamlObject(),
219 randomChariotYamlObject(),
220 },
221 Owner: "test",
222 },
223 {
224 Banner: testbucket,
225 Cluster: "cluster1",
226 Operation: "DANCE",
227 Objects: [][]byte{
228 randomChariotYamlObject(),
229 randomChariotYamlObject(),
230 randomChariotYamlObject(),
231 randomChariotYamlObject(),
232 randomChariotYamlObject(),
233 },
234 Owner: "test",
235 },
236 {
237 Banner: testbucket,
238 Cluster: "",
239 Operation: "CREATE",
240 Objects: [][]byte{
241 randomChariotYamlObject(),
242 randomChariotYamlObject(),
243 randomChariotYamlObject(),
244 randomChariotYamlObject(),
245 []byte("AN INVALID REQUEST"),
246 randomChariotYamlObject(),
247 },
248 Owner: "test",
249 },
250 }
251
252
253 t.Logf("Sending %d create requests to the Daemon", len(createReqs))
254 for i, req := range createReqs {
255 var an = newMockPubSubAckNacker()
256 defer an.Close()
257 data, err := json.Marshal(req)
258 if err != nil {
259 t.Fatal(err)
260 }
261 err = mpss.Send(ctx, &PubSubMessage{
262 ackNack: an,
263 id: fmt.Sprintf("%d", i),
264 data: data,
265 publishTime: time.Now(),
266 })
267 if err != nil {
268 t.Fatal(err)
269 }
270 select {
271 case <-an.Acked:
272 t.Logf("Got ack from message %d", i+1)
273 case <-an.Nacked:
274 t.Fatalf("Message should not send a nack signal")
275 case <-ctx.Done():
276 t.Fatal(ctx.Err())
277 }
278
279
280 var resp IResponseMessage
281 select {
282 case resp = <-mrp.Responses:
283 t.Logf("Got response data: %s", resp.Data())
284 t.Logf("Got response attributes: %v", resp.Attributes())
285 case <-ctx.Done():
286 t.Fatal("Did not get response in appropriate time.", ctx.Err())
287 }
288
289
290 if attr := resp.Attributes(); len(attr) == 0 {
291 t.Fatalf("Got empty response pubsub attributes")
292 } else if v, ok := attr["owner"]; !ok {
293 t.Fatalf("owner attribute not set.")
294 } else if v != req.Owner {
295 t.Fatalf("owner attribute %q does not equal expected %q", v, req.Owner)
296 }
297
298
299 var chresp ResponseJSON
300 err = json.Unmarshal(resp.Data(), &chresp)
301 if err != nil {
302 t.Fatal(err)
303 }
304
305 if chresp.Ok {
306 t.Fatalf("The response should set Ok=false")
307 }
308
309 if chresp.Error == "" {
310 t.Fatalf("Did not get response error.")
311 }
312
313 if chresp.Operation != req.Operation {
314 t.Fatalf("Response operation %q does not equal request operation %q", chresp.Operation, req.Operation)
315 }
316
317 wantID := fmt.Sprintf("%d", i)
318 if chresp.PubSubRequestID != wantID {
319 t.Fatalf("Response did not contain correct ID. Want %q Got %q", wantID, chresp.PubSubRequestID)
320 }
321 }
322 t.Logf("Got acks from all %d requests", len(createReqs))
323 }
324
View as plain text