package chariot

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"testing"
	"time"

	"cloud.google.com/go/storage"
	"github.com/fsouza/fake-gcs-server/fakestorage"
)

// daemonTestBucket is the bucket (and Request banner) used by every Daemon
// test in this file. It is left untyped so it can be assigned to any
// string-kinded field.
const daemonTestBucket = "testbucket"

// mockPubSubAckNacker records Ack and Nack calls on buffered channels so a
// test can wait for the outcome of a message.
type mockPubSubAckNacker struct {
	Acked  chan bool
	Nacked chan bool
}

// newMockPubSubAckNacker returns a mock whose channels each buffer a single
// signal.
func newMockPubSubAckNacker() *mockPubSubAckNacker {
	return &mockPubSubAckNacker{
		Acked:  make(chan bool, 1),
		Nacked: make(chan bool, 1),
	}
}

// Close closes both signal channels. Calling Ack or Nack afterwards panics.
func (mpsan *mockPubSubAckNacker) Close() error {
	close(mpsan.Acked)
	close(mpsan.Nacked)
	return nil
}

// Ack signals on the Acked channel.
func (mpsan *mockPubSubAckNacker) Ack() {
	mpsan.Acked <- true
}

// Nack signals on the Nacked channel.
func (mpsan *mockPubSubAckNacker) Nack() {
	mpsan.Nacked <- true
}

// mockPubSubService is an in-memory stand-in for a pubsub subscription:
// messages passed to Send are queued on a channel and handed to the callback
// given to Receive.
type mockPubSubService struct {
	messages chan IPubSubMessage
}

// newMockPubSubService returns a service that can queue up to 100 messages.
func newMockPubSubService() *mockPubSubService {
	return &mockPubSubService{
		messages: make(chan IPubSubMessage, 100),
	}
}

// Close closes the message queue, which makes a running Receive return.
func (mpss *mockPubSubService) Close() error {
	close(mpss.messages)
	return nil
}

// Send queues ipsm for delivery, or returns ctx.Err() if ctx is done before
// there is room in the queue.
func (mpss *mockPubSubService) Send(ctx context.Context, ipsm IPubSubMessage) error {
	select {
	case mpss.messages <- ipsm:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// Receive invokes f in a new goroutine for every queued message until the
// queue is closed, then returns a non-nil error. ctx is only forwarded to f;
// it does not stop the loop.
func (mpss *mockPubSubService) Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error {
	for msg := range mpss.messages {
		go f(ctx, msg)
	}
	return fmt.Errorf("All calls to Receive return with an error per the pubsub.Subscription.Receive specs")
}

// startDaemonUnderTest starts a fake GCS server seeded with one placeholder
// object, builds a Daemon wired to that server and to a fresh mock pubsub
// service, and runs the Daemon in a goroutine. Teardown of the mock service
// and the fake server is registered with t.Cleanup.
func startDaemonUnderTest(ctx context.Context, t *testing.T) (*mockPubSubService, *storage.Client) {
	t.Helper()

	mpss := newMockPubSubService()
	t.Cleanup(func() { _ = mpss.Close() })

	existingIgnoredObject := StorageObject{
		Location: fmt.Sprintf("gs://%s/ignored.txt", daemonTestBucket),
		Content:  "this file causes the fake-gcs-server to create the test bucket",
	}
	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{
			BucketName: existingIgnoredObject.getGcsBucket(),
			Name:       existingIgnoredObject.getGcsPath(),
		},
		Content: []byte(existingIgnoredObject.Content),
	})
	if err != nil {
		t.Fatal(err)
	}
	t.Cleanup(func() { stopFakeGcsServer() })

	daemon, err := NewDaemon(
		OptionPubSubReceiver(mpss),
		OptionGoogleCloudStorage(gcsClient),
	)
	if err != nil {
		t.Fatal(err)
	}
	go daemon.Run(ctx) //nolint:errcheck

	return mpss, gcsClient
}

// daemonTestObjects returns n freshly generated chariot YAML objects.
func daemonTestObjects(n int) [][]byte {
	objects := make([][]byte, 0, n)
	for i := 0; i < n; i++ {
		objects = append(objects, randomChariotYamlObject())
	}
	return objects
}

// daemonTestCreateRequests returns the CREATE fixture shared by the Daemon
// tests: two banner-wide requests and one request for each of two clusters,
// each carrying five random objects.
func daemonTestCreateRequests() []Request {
	return []Request{
		{
			Banner:    daemonTestBucket,
			Cluster:   "", // empty for banner-wide objects
			Operation: "CREATE",
			Objects:   daemonTestObjects(5),
			Owner:     "test",
		},
		{
			Banner:    daemonTestBucket,
			Cluster:   "cluster1",
			Operation: "CREATE",
			Objects:   daemonTestObjects(5),
			Owner:     "test",
		},
		{
			Banner:    daemonTestBucket,
			Cluster:   "", // empty for banner-wide objects
			Operation: "CREATE",
			Objects:   daemonTestObjects(5),
			Owner:     "test",
		},
		{
			Banner:    daemonTestBucket,
			Cluster:   "cluster2",
			Operation: "CREATE",
			Objects:   daemonTestObjects(5),
			Owner:     "test",
		},
	}
}

// sendAndAwaitAck publishes req as message number i on mpss and blocks until
// the message is acked. A nack, a send failure, or ctx expiring first fails
// the test.
func sendAndAwaitAck(ctx context.Context, t *testing.T, mpss *mockPubSubService, i int, req Request) {
	t.Helper()

	an := newMockPubSubAckNacker()
	// Close at the end of the test rather than right after the ack, so a late
	// Ack or Nack from the Daemon cannot hit a closed channel mid-test.
	t.Cleanup(func() { _ = an.Close() })

	data, err := json.Marshal(req)
	if err != nil {
		t.Fatal(err)
	}
	err = mpss.Send(ctx, &PubSubMessage{
		ackNack:     an,
		id:          fmt.Sprintf("%d", i),
		data:        data,
		publishTime: time.Now(),
	})
	if err != nil {
		t.Fatal(err)
	}

	select {
	case <-an.Acked:
		t.Logf("Got ack from message %d", i+1)
	case <-an.Nacked:
		t.Fatal("Message should not send a nack signal")
	case <-ctx.Done():
		t.Fatal(ctx.Err())
	}
}

// requireObjectsExist fails the test unless every storage object derived from
// req is present in GCS with the expected content.
func requireObjectsExist(ctx context.Context, t *testing.T, client *storage.Client, req Request) {
	t.Helper()

	so, err := req.StorageObjects()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("Checking %d objects exist in bucket", len(so))
	if err := assertStorageObjectsExistInBucket(ctx, client, so...); err != nil {
		t.Fatal(err)
	}
}

// requireObjectsAbsent fails the test if any storage object derived from req
// is still present in GCS.
func requireObjectsAbsent(ctx context.Context, t *testing.T, client *storage.Client, req Request) {
	t.Helper()

	so, err := req.StorageObjects()
	if err != nil {
		t.Fatal(err)
	}
	t.Logf("Checking %d objects do not exist in bucket", len(so))
	if err := assertStorageObjectsDoNotExistInBucket(ctx, client, so...); err != nil {
		t.Fatal(err)
	}
}

// createAndVerify sends every request in createReqs to the Daemon, waits for
// each ack, and then checks that all resulting objects exist in GCS.
func createAndVerify(ctx context.Context, t *testing.T, mpss *mockPubSubService, client *storage.Client, createReqs []Request) {
	t.Helper()

	// Send requests over pubsub.
	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
	for i, req := range createReqs {
		sendAndAwaitAck(ctx, t, mpss, i, req)
	}
	t.Logf("Got acks from all %d requests", len(createReqs))

	// Check the bucket for the files we created.
	for _, req := range createReqs {
		requireObjectsExist(ctx, t, client, req)
	}
}

// splitAlternating partitions objects by index: even positions are returned
// in keep, odd positions in drop.
func splitAlternating(objects [][]byte) (keep, drop [][]byte) {
	for j, object := range objects {
		if j%2 == 0 {
			keep = append(keep, object)
		} else {
			drop = append(drop, object)
		}
	}
	return keep, drop
}

// TestDaemonCreatesObjects verifies that CREATE requests sent over pubsub are
// acked and that their objects end up in GCS.
func TestDaemonCreatesObjects(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	mpss, gcsClient := startDaemonUnderTest(ctx, t)

	createAndVerify(ctx, t, mpss, gcsClient, daemonTestCreateRequests())
	t.Logf("Successfully created all objects in GCS storage")
}

// TestDaemonDeletesObjects verifies that a DELETE request removes exactly the
// objects it names and leaves the remaining created objects in place.
func TestDaemonDeletesObjects(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	mpss, gcsClient := startDaemonUnderTest(ctx, t)

	createReqs := daemonTestCreateRequests()
	createAndVerify(ctx, t, mpss, gcsClient, createReqs)
	t.Logf("Created all objects in GCS storage")

	t.Logf("Now testing the deletion of the created objects in GCS")
	for i, req := range createReqs {
		// Keep half of the objects and delete the other half.
		keepObjects, deleteObjects := splitAlternating(req.Objects)

		// req is a per-iteration copy, so mutating it leaves createReqs intact.
		req.Operation = "DELETE"
		req.Objects = deleteObjects
		sendAndAwaitAck(ctx, t, mpss, i, req)

		// Check for the deleted objects.
		requireObjectsAbsent(ctx, t, gcsClient, req)

		// Now check for the objects that should be kept.
		req.Operation = "CREATE"
		req.Objects = keepObjects
		requireObjectsExist(ctx, t, gcsClient, req)
	}
	t.Logf("Verified the correct objects were deleted and not deleted")
}

// TestDaemonDeletesObjectsThatDoNotExist verifies that repeating a DELETE for
// objects that are already gone is still acked and does not disturb the
// objects that were kept.
func TestDaemonDeletesObjectsThatDoNotExist(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	mpss, gcsClient := startDaemonUnderTest(ctx, t)

	createReqs := daemonTestCreateRequests()
	createAndVerify(ctx, t, mpss, gcsClient, createReqs)
	t.Logf("Created all objects in GCS storage")

	t.Logf("Now testing the deletion of the created objects in GCS")
	for i, req := range createReqs {
		// Keep half of the objects and delete the other half.
		keepObjects, deleteObjects := splitAlternating(req.Objects)

		// req is a per-iteration copy, so mutating it leaves createReqs intact.
		req.Operation = "DELETE"
		req.Objects = deleteObjects
		sendAndAwaitAck(ctx, t, mpss, i, req)

		// Check for the deleted objects.
		requireObjectsAbsent(ctx, t, gcsClient, req)

		// Attempt to delete the objects again
		t.Logf("Attempting to delete objects again")
		sendAndAwaitAck(ctx, t, mpss, i, req)

		// Now check for the objects that should be kept.
		req.Operation = "CREATE"
		req.Objects = keepObjects
		requireObjectsExist(ctx, t, gcsClient, req)
	}
	t.Logf("Verified the correct objects were deleted and not deleted")
}

// assertStorageObjectsExistInBucket returns an error unless every object can
// be read from GCS and its stored content equals object.Content.
func assertStorageObjectsExistInBucket(ctx context.Context, client *storage.Client, objects ...StorageObject) error {
	for _, object := range objects {
		r, err := client.Bucket(object.getGcsBucket()).Object(object.getGcsPath()).NewReader(ctx)
		if err != nil {
			return err
		}

		// Always close the reader, even when the read fails, so it is not
		// leaked; the read error takes precedence over the close error.
		b, readErr := io.ReadAll(r)
		closeErr := r.Close()
		if readErr != nil {
			return readErr
		}
		if closeErr != nil {
			return closeErr
		}

		if string(b) != object.Content {
			return fmt.Errorf("Expected object content does not match. Got %q, want %q", string(b), object.Content)
		}
	}
	return nil
}

// assertStorageObjectsDoNotExistInBucket returns an error if any object can
// be opened in GCS, or if opening it fails for a reason other than the object
// not existing.
func assertStorageObjectsDoNotExistInBucket(ctx context.Context, client *storage.Client, objects ...StorageObject) error {
	for _, object := range objects {
		r, err := client.Bucket(object.getGcsBucket()).Object(object.getGcsPath()).NewReader(ctx)
		// errors.Is also matches when the sentinel has been wrapped.
		if errors.Is(err, storage.ErrObjectNotExist) {
			// Yay it does not exist.
			continue
		}
		if err != nil {
			return err
		}

		// The object exists; release the reader before reporting. The close
		// error is irrelevant to the assertion being made.
		_ = r.Close()
		return fmt.Errorf("Object at location %q should not exist", object.Location)
	}
	return nil
}