1 package chariot
2
3 import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "io"
8 "testing"
9 "time"
10
11 "cloud.google.com/go/storage"
12 "github.com/fsouza/fake-gcs-server/fakestorage"
13 )
14
15 type mockPubSubAckNacker struct {
16 Acked chan bool
17 Nacked chan bool
18 }
19
20 func newMockPubSubAckNacker() *mockPubSubAckNacker {
21 return &mockPubSubAckNacker{
22 Acked: make(chan bool, 1),
23 Nacked: make(chan bool, 1),
24 }
25 }
26
27 func (mpsan *mockPubSubAckNacker) Close() error {
28 close(mpsan.Acked)
29 close(mpsan.Nacked)
30 return nil
31 }
32
33 func (mpsan *mockPubSubAckNacker) Ack() {
34 mpsan.Acked <- true
35 }
36
37 func (mpsan *mockPubSubAckNacker) Nack() {
38 mpsan.Nacked <- true
39 }
40
41 type mockPubSubService struct {
42 messages chan IPubSubMessage
43 }
44
45 func newMockPubSubService() *mockPubSubService {
46 return &mockPubSubService{
47 messages: make(chan IPubSubMessage, 100),
48 }
49 }
50
51 func (mpss *mockPubSubService) Close() error {
52 close(mpss.messages)
53 return nil
54 }
55
56 func (mpss *mockPubSubService) Send(ctx context.Context, ipsm IPubSubMessage) error {
57 select {
58 case mpss.messages <- ipsm:
59 return nil
60 case <-ctx.Done():
61 return ctx.Err()
62 }
63 }
64
65 func (mpss *mockPubSubService) Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error {
66 for msg := range mpss.messages {
67 go f(ctx, msg)
68 }
69 return fmt.Errorf("All calls to Receive return with an error per the pubsub.Subscription.Receive specs")
70 }
71
72 func TestDaemonCreatesObjects(t *testing.T) {
73 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
74 defer cancel()
75
76 var mpss = newMockPubSubService()
77 defer mpss.Close()
78
79 const testbucket = "testbucket"
80 var existingIgnoredObject = StorageObject{
81 Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
82 Content: "this file causes the fake-gcs-server to create the test bucket",
83 }
84 gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
85 ObjectAttrs: fakestorage.ObjectAttrs{
86 BucketName: existingIgnoredObject.getGcsBucket(),
87 Name: existingIgnoredObject.getGcsPath(),
88 },
89 Content: []byte(existingIgnoredObject.Content),
90 })
91 if err != nil {
92 t.Fatal(err)
93 }
94 defer stopFakeGcsServer()
95
96 var (
97 psrOption = OptionPubSubReceiver(mpss)
98 gcsOption = OptionGoogleCloudStorage(gcsClient)
99 )
100
101 daemon, err := NewDaemon(psrOption, gcsOption)
102 if err != nil {
103 t.Fatal(err)
104 }
105 go daemon.Run(ctx)
106
107 var createReqs = []Request{
108 {
109 Banner: testbucket,
110 Cluster: "",
111 Operation: "CREATE",
112 Objects: [][]byte{
113 randomChariotYamlObject(),
114 randomChariotYamlObject(),
115 randomChariotYamlObject(),
116 randomChariotYamlObject(),
117 randomChariotYamlObject(),
118 },
119 Owner: "test",
120 },
121 {
122 Banner: testbucket,
123 Cluster: "cluster1",
124 Operation: "CREATE",
125 Objects: [][]byte{
126 randomChariotYamlObject(),
127 randomChariotYamlObject(),
128 randomChariotYamlObject(),
129 randomChariotYamlObject(),
130 randomChariotYamlObject(),
131 },
132 Owner: "test",
133 },
134 {
135 Banner: testbucket,
136 Cluster: "",
137 Operation: "CREATE",
138 Objects: [][]byte{
139 randomChariotYamlObject(),
140 randomChariotYamlObject(),
141 randomChariotYamlObject(),
142 randomChariotYamlObject(),
143 randomChariotYamlObject(),
144 },
145 Owner: "test",
146 },
147 {
148 Banner: testbucket,
149 Cluster: "cluster2",
150 Operation: "CREATE",
151 Objects: [][]byte{
152 randomChariotYamlObject(),
153 randomChariotYamlObject(),
154 randomChariotYamlObject(),
155 randomChariotYamlObject(),
156 randomChariotYamlObject(),
157 },
158 Owner: "test",
159 },
160 }
161
162
163 t.Logf("Sending %d create requests to the Daemon", len(createReqs))
164 for i, req := range createReqs {
165 var an = newMockPubSubAckNacker()
166 defer an.Close()
167 data, err := json.Marshal(req)
168 if err != nil {
169 t.Fatal(err)
170 }
171 err = mpss.Send(ctx, &PubSubMessage{
172 ackNack: an,
173 id: fmt.Sprintf("%d", i),
174 data: data,
175 publishTime: time.Now(),
176 })
177 if err != nil {
178 t.Fatal(err)
179 }
180 select {
181 case <-an.Acked:
182 t.Logf("Got ack from message %d", i+1)
183 case <-an.Nacked:
184 t.Fatalf("Message should not send a nack signal")
185 case <-ctx.Done():
186 t.Fatal(ctx.Err())
187 }
188 }
189 t.Logf("Got acks from all %d requests", len(createReqs))
190
191
192 for _, req := range createReqs {
193 so, err := req.StorageObjects()
194 if err != nil {
195 t.Fatal(err)
196 }
197 t.Logf("Checking %d objects exist in bucket", len(so))
198 err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
199 if err != nil {
200 t.Fatal(err)
201 }
202 }
203 t.Logf("Successfully created all objects in GCS storage")
204 }
205
206 func TestDaemonDeletesObjects(t *testing.T) {
207 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
208 defer cancel()
209
210 var mpss = newMockPubSubService()
211 defer mpss.Close()
212
213 const testbucket = "testbucket"
214 var existingIgnoredObject = StorageObject{
215 Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
216 Content: "this file causes the fake-gcs-server to create the test bucket",
217 }
218 gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
219 ObjectAttrs: fakestorage.ObjectAttrs{
220 BucketName: existingIgnoredObject.getGcsBucket(),
221 Name: existingIgnoredObject.getGcsPath(),
222 },
223 Content: []byte(existingIgnoredObject.Content),
224 })
225 if err != nil {
226 t.Fatal(err)
227 }
228 defer stopFakeGcsServer()
229
230 var (
231 psrOption = OptionPubSubReceiver(mpss)
232 gcsOption = OptionGoogleCloudStorage(gcsClient)
233 )
234
235 daemon, err := NewDaemon(psrOption, gcsOption)
236 if err != nil {
237 t.Fatal(err)
238 }
239 go daemon.Run(ctx)
240
241 var createReqs = []Request{
242 {
243 Banner: testbucket,
244 Cluster: "",
245 Operation: "CREATE",
246 Objects: [][]byte{
247 randomChariotYamlObject(),
248 randomChariotYamlObject(),
249 randomChariotYamlObject(),
250 randomChariotYamlObject(),
251 randomChariotYamlObject(),
252 },
253 Owner: "test",
254 },
255 {
256 Banner: testbucket,
257 Cluster: "cluster1",
258 Operation: "CREATE",
259 Objects: [][]byte{
260 randomChariotYamlObject(),
261 randomChariotYamlObject(),
262 randomChariotYamlObject(),
263 randomChariotYamlObject(),
264 randomChariotYamlObject(),
265 },
266 Owner: "test",
267 },
268 {
269 Banner: testbucket,
270 Cluster: "",
271 Operation: "CREATE",
272 Objects: [][]byte{
273 randomChariotYamlObject(),
274 randomChariotYamlObject(),
275 randomChariotYamlObject(),
276 randomChariotYamlObject(),
277 randomChariotYamlObject(),
278 },
279 Owner: "test",
280 },
281 {
282 Banner: testbucket,
283 Cluster: "cluster2",
284 Operation: "CREATE",
285 Objects: [][]byte{
286 randomChariotYamlObject(),
287 randomChariotYamlObject(),
288 randomChariotYamlObject(),
289 randomChariotYamlObject(),
290 randomChariotYamlObject(),
291 },
292 Owner: "test",
293 },
294 }
295
296
297 t.Logf("Sending %d create requests to the Daemon", len(createReqs))
298 for i, req := range createReqs {
299 var an = newMockPubSubAckNacker()
300 defer an.Close()
301 data, err := json.Marshal(req)
302 if err != nil {
303 t.Fatal(err)
304 }
305 err = mpss.Send(ctx, &PubSubMessage{
306 ackNack: an,
307 id: fmt.Sprintf("%d", i),
308 data: data,
309 publishTime: time.Now(),
310 })
311 if err != nil {
312 t.Fatal(err)
313 }
314 select {
315 case <-an.Acked:
316 t.Logf("Got ack from message %d", i+1)
317 case <-an.Nacked:
318 t.Fatalf("Message should not send a nack signal")
319 case <-ctx.Done():
320 t.Fatal(ctx.Err())
321 }
322 }
323 t.Logf("Got acks from all %d requests", len(createReqs))
324
325
326 for _, req := range createReqs {
327 so, err := req.StorageObjects()
328 if err != nil {
329 t.Fatal(err)
330 }
331 t.Logf("Checking %d objects exist in bucket", len(so))
332 err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
333 if err != nil {
334 t.Fatal(err)
335 }
336 }
337 t.Logf("Created all objects in GCS storage")
338
339 t.Logf("Now testing the deletion of the created objects in GCS")
340 for i, req := range createReqs {
341
342 var keepObjects [][]byte
343 var deleteObjects [][]byte
344 for j, object := range req.Objects {
345 if j%2 == 0 {
346 keepObjects = append(keepObjects, object)
347 } else {
348 deleteObjects = append(deleteObjects, object)
349 }
350 }
351
352
353 req.Operation = "DELETE"
354 req.Objects = deleteObjects
355
356 var an = newMockPubSubAckNacker()
357 defer an.Close()
358 data, err := json.Marshal(req)
359 if err != nil {
360 t.Fatal(err)
361 }
362 err = mpss.Send(ctx, &PubSubMessage{
363 ackNack: an,
364 id: fmt.Sprintf("%d", i),
365 data: data,
366 publishTime: time.Now(),
367 })
368 if err != nil {
369 t.Fatal(err)
370 }
371 select {
372 case <-an.Acked:
373 t.Logf("Got ack from message %d", i+1)
374 case <-an.Nacked:
375 t.Fatalf("Message should not send a nack signal")
376 case <-ctx.Done():
377 t.Fatal(ctx.Err())
378 }
379
380 so, err := req.StorageObjects()
381 if err != nil {
382 t.Fatal(err)
383 }
384 t.Logf("Checking %d objects do not exist in bucket", len(so))
385 err = assertStorageObjectsDoNotExistInBucket(ctx, gcsClient, so...)
386 if err != nil {
387 t.Fatal(err)
388 }
389
390
391 req.Operation = "CREATE"
392 req.Objects = keepObjects
393 so, err = req.StorageObjects()
394 if err != nil {
395 t.Fatal(err)
396 }
397 t.Logf("Checking %d objects exist in bucket", len(so))
398 err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
399 if err != nil {
400 t.Fatal(err)
401 }
402 }
403 t.Logf("Verified the correct objects were deleted and not deleted")
404 }
405
406 func TestDaemonDeletesObjectsThatDoNotExist(t *testing.T) {
407 ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
408 defer cancel()
409
410 var mpss = newMockPubSubService()
411 defer mpss.Close()
412
413 const testbucket = "testbucket"
414 var existingIgnoredObject = StorageObject{
415 Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
416 Content: "this file causes the fake-gcs-server to create the test bucket",
417 }
418 gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
419 ObjectAttrs: fakestorage.ObjectAttrs{
420 BucketName: existingIgnoredObject.getGcsBucket(),
421 Name: existingIgnoredObject.getGcsPath(),
422 },
423 Content: []byte(existingIgnoredObject.Content),
424 })
425 if err != nil {
426 t.Fatal(err)
427 }
428 defer stopFakeGcsServer()
429
430 var (
431 psrOption = OptionPubSubReceiver(mpss)
432 gcsOption = OptionGoogleCloudStorage(gcsClient)
433 )
434
435 daemon, err := NewDaemon(psrOption, gcsOption)
436 if err != nil {
437 t.Fatal(err)
438 }
439 go daemon.Run(ctx)
440
441 var createReqs = []Request{
442 {
443 Banner: testbucket,
444 Cluster: "",
445 Operation: "CREATE",
446 Objects: [][]byte{
447 randomChariotYamlObject(),
448 randomChariotYamlObject(),
449 randomChariotYamlObject(),
450 randomChariotYamlObject(),
451 randomChariotYamlObject(),
452 },
453 Owner: "test",
454 },
455 {
456 Banner: testbucket,
457 Cluster: "cluster1",
458 Operation: "CREATE",
459 Objects: [][]byte{
460 randomChariotYamlObject(),
461 randomChariotYamlObject(),
462 randomChariotYamlObject(),
463 randomChariotYamlObject(),
464 randomChariotYamlObject(),
465 },
466 Owner: "test",
467 },
468 {
469 Banner: testbucket,
470 Cluster: "",
471 Operation: "CREATE",
472 Objects: [][]byte{
473 randomChariotYamlObject(),
474 randomChariotYamlObject(),
475 randomChariotYamlObject(),
476 randomChariotYamlObject(),
477 randomChariotYamlObject(),
478 },
479 Owner: "test",
480 },
481 {
482 Banner: testbucket,
483 Cluster: "cluster2",
484 Operation: "CREATE",
485 Objects: [][]byte{
486 randomChariotYamlObject(),
487 randomChariotYamlObject(),
488 randomChariotYamlObject(),
489 randomChariotYamlObject(),
490 randomChariotYamlObject(),
491 },
492 Owner: "test",
493 },
494 }
495
496
497 t.Logf("Sending %d create requests to the Daemon", len(createReqs))
498 for i, req := range createReqs {
499 var an = newMockPubSubAckNacker()
500 defer an.Close()
501 data, err := json.Marshal(req)
502 if err != nil {
503 t.Fatal(err)
504 }
505 err = mpss.Send(ctx, &PubSubMessage{
506 ackNack: an,
507 id: fmt.Sprintf("%d", i),
508 data: data,
509 publishTime: time.Now(),
510 })
511 if err != nil {
512 t.Fatal(err)
513 }
514 select {
515 case <-an.Acked:
516 t.Logf("Got ack from message %d", i+1)
517 case <-an.Nacked:
518 t.Fatalf("Message should not send a nack signal")
519 case <-ctx.Done():
520 t.Fatal(ctx.Err())
521 }
522 }
523 t.Logf("Got acks from all %d requests", len(createReqs))
524
525
526 for _, req := range createReqs {
527 so, err := req.StorageObjects()
528 if err != nil {
529 t.Fatal(err)
530 }
531 t.Logf("Checking %d objects exist in bucket", len(so))
532 err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
533 if err != nil {
534 t.Fatal(err)
535 }
536 }
537 t.Logf("Created all objects in GCS storage")
538
539 t.Logf("Now testing the deletion of the created objects in GCS")
540 for i, req := range createReqs {
541
542 var keepObjects [][]byte
543 var deleteObjects [][]byte
544 for j, object := range req.Objects {
545 if j%2 == 0 {
546 keepObjects = append(keepObjects, object)
547 } else {
548 deleteObjects = append(deleteObjects, object)
549 }
550 }
551
552
553 req.Operation = "DELETE"
554 req.Objects = deleteObjects
555
556 var an = newMockPubSubAckNacker()
557 defer an.Close()
558 data, err := json.Marshal(req)
559 if err != nil {
560 t.Fatal(err)
561 }
562 err = mpss.Send(ctx, &PubSubMessage{
563 ackNack: an,
564 id: fmt.Sprintf("%d", i),
565 data: data,
566 publishTime: time.Now(),
567 })
568 if err != nil {
569 t.Fatal(err)
570 }
571 select {
572 case <-an.Acked:
573 t.Logf("Got ack from message %d", i+1)
574 case <-an.Nacked:
575 t.Fatalf("Message should not send a nack signal")
576 case <-ctx.Done():
577 t.Fatal(ctx.Err())
578 }
579
580 so, err := req.StorageObjects()
581 if err != nil {
582 t.Fatal(err)
583 }
584 t.Logf("Checking %d objects do not exist in bucket", len(so))
585 err = assertStorageObjectsDoNotExistInBucket(ctx, gcsClient, so...)
586 if err != nil {
587 t.Fatal(err)
588 }
589
590 t.Logf("Attempting to delete objects again")
591 err = mpss.Send(ctx, &PubSubMessage{
592 ackNack: an,
593 id: fmt.Sprintf("%d", i),
594 data: data,
595 publishTime: time.Now(),
596 })
597 if err != nil {
598 t.Fatal(err)
599 }
600 select {
601 case <-an.Acked:
602 t.Logf("Got ack from message %d", i+1)
603 case <-an.Nacked:
604 t.Fatalf("Message should not send a nack signal")
605 case <-ctx.Done():
606 t.Fatal(ctx.Err())
607 }
608
609
610 req.Operation = "CREATE"
611 req.Objects = keepObjects
612 so, err = req.StorageObjects()
613 if err != nil {
614 t.Fatal(err)
615 }
616 t.Logf("Checking %d objects exist in bucket", len(so))
617 err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
618 if err != nil {
619 t.Fatal(err)
620 }
621 }
622 t.Logf("Verified the correct objects were deleted and not deleted")
623 }
624
625 func assertStorageObjectsExistInBucket(ctx context.Context, client *storage.Client, objects ...StorageObject) error {
626 for _, object := range objects {
627 var bucket = object.getGcsBucket()
628 var path = object.getGcsPath()
629 r, err := client.Bucket(bucket).Object(path).NewReader(ctx)
630 if err != nil {
631 return err
632 }
633 b, err := io.ReadAll(r)
634 if err != nil {
635 return err
636 } else if err = r.Close(); err != nil {
637 return err
638 } else if string(b) != object.Content {
639 return fmt.Errorf("Expected object content does not match. Got %q, want %q", string(b), object.Content)
640 }
641 }
642 return nil
643 }
644
645 func assertStorageObjectsDoNotExistInBucket(ctx context.Context, client *storage.Client, objects ...StorageObject) error {
646 for _, object := range objects {
647 var bucket = object.getGcsBucket()
648 var path = object.getGcsPath()
649 r, err := client.Bucket(bucket).Object(path).NewReader(ctx)
650 if err != nil && err == storage.ErrObjectNotExist {
651
652 continue
653 } else if err != nil {
654 return err
655 }
656 r.Close()
657 return fmt.Errorf("Object at location %q should not exist", object.Location)
658 }
659 return nil
660 }
661
View as plain text