package chariot

import (
	"context"
	"encoding/json"
	"fmt"
	"testing"
	"time"

	"github.com/fsouza/fake-gcs-server/fakestorage"
)

// mockResponsePublisher is a response publisher test double. Every response
// passed to Publish is handed to the Responses channel so that a test can
// receive and inspect it.
type mockResponsePublisher struct {
	Responses chan IResponseMessage
}

// newMockResponsePublisher returns a mockResponsePublisher whose Responses
// channel can buffer a single message.
func newMockResponsePublisher() *mockResponsePublisher {
	return &mockResponsePublisher{
		Responses: make(chan IResponseMessage, 1),
	}
}

// Publish blocks until r has been placed on m.Responses or ctx is done. It
// returns nil when the response was handed over and ctx.Err() otherwise.
func (m *mockResponsePublisher) Publish(ctx context.Context, r IResponseMessage) error {
	select {
	case m.Responses <- r:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// responseTestHarness holds the two endpoints the response tests use to talk
// to a daemon started by startResponseTestDaemon.
type responseTestHarness struct {
	// responses yields the messages the daemon hands to its response publisher.
	responses <-chan IResponseMessage
	// send forwards msg to the mock pubsub service the daemon was built with.
	send func(ctx context.Context, msg *PubSubMessage) error
}

// startResponseTestDaemon wires a Daemon to a mock pubsub receiver, a mock
// response publisher and a fake GCS server, then runs it in the background
// until ctx is done.
//
// The fake GCS server is seeded with one object inside bucket; that object is
// never read by the tests, it only makes the fake server create the bucket.
//
// The returned stop function shuts down the fake GCS server and then closes
// the mock pubsub service. Callers must invoke it (typically via defer) once
// they are finished. Any setup failure aborts the test with t.Fatal after
// releasing whatever had already been started.
func startResponseTestDaemon(ctx context.Context, t *testing.T, bucket string) (*responseTestHarness, func()) {
	t.Helper()

	mrp := newMockResponsePublisher()
	mpss := newMockPubSubService()

	existingIgnoredObject := StorageObject{
		Location: fmt.Sprintf("gs://%s/ignored.txt", bucket),
		Content:  "this file causes the fake-gcs-server to create the test bucket",
	}
	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
		ObjectAttrs: fakestorage.ObjectAttrs{
			BucketName: existingIgnoredObject.getGcsBucket(),
			Name:       existingIgnoredObject.getGcsPath(),
		},
		Content: []byte(existingIgnoredObject.Content),
	})
	if err != nil {
		mpss.Close()
		t.Fatal(err)
	}

	// Tear down in reverse order of construction.
	stop := func() {
		stopFakeGcsServer()
		mpss.Close()
	}

	daemon, err := NewDaemon(
		OptionPubSubReceiver(mpss),
		OptionGoogleCloudStorage(gcsClient),
		OptionPubSubResponsePublisher(mrp),
	)
	if err != nil {
		stop()
		t.Fatal(err)
	}
	go daemon.Run(ctx) //nolint:errcheck

	harness := &responseTestHarness{
		responses: mrp.Responses,
		send: func(ctx context.Context, msg *PubSubMessage) error {
			return mpss.Send(ctx, msg)
		},
	}
	return harness, stop
}

// exchangeRequest sends req to the daemon as pubsub message number i and
// returns the decoded response the daemon published for it.
//
// It fails the test when the message is nacked, when ctx ends before the ack
// or the response arrives, when the response's "owner" attribute is missing or
// differs from req.Owner, or when the response data is not valid JSON.
//
// The ack/nacker created for the message is closed when this function returns,
// so each loop iteration in the callers releases its own ack/nacker instead of
// keeping all of them open until the end of the test.
func exchangeRequest(ctx context.Context, t *testing.T, h *responseTestHarness, i int, req Request) ResponseJSON {
	t.Helper()

	an := newMockPubSubAckNacker()
	defer an.Close()

	data, err := json.Marshal(req)
	if err != nil {
		t.Fatal(err)
	}

	err = h.send(ctx, &PubSubMessage{
		ackNack:     an,
		id:          fmt.Sprintf("%d", i),
		data:        data,
		publishTime: time.Now(),
	})
	if err != nil {
		t.Fatal(err)
	}

	// Valid and invalid requests alike are expected to be acked.
	select {
	case <-an.Acked:
		t.Logf("Got ack from message %d", i+1)
	case <-an.Nacked:
		t.Fatalf("Message should not send a nack signal")
	case <-ctx.Done():
		t.Fatal(ctx.Err())
	}

	// Check for response
	var resp IResponseMessage
	select {
	case resp = <-h.responses:
		t.Logf("Got response data: %s", resp.Data())
		t.Logf("Got response attributes: %v", resp.Attributes())
	case <-ctx.Done():
		t.Fatal("Did not get response in appropriate time.", ctx.Err())
	}

	// Check that the attribute is correctly set.
	attr := resp.Attributes()
	if len(attr) == 0 {
		t.Fatalf("Got empty response pubsub attributes")
	}
	owner, ok := attr["owner"]
	if !ok {
		t.Fatalf("owner attribute not set.")
	}
	if owner != req.Owner {
		t.Fatalf("owner attribute %q does not equal expected %q", owner, req.Owner)
	}

	// Decode the response data so the caller can check its contents.
	var chresp ResponseJSON
	if err := json.Unmarshal(resp.Data(), &chresp); err != nil {
		t.Fatal(err)
	}
	return chresp
}

// assertResponseEchoesRequest fails the test unless chresp carries the same
// operation as req and the pubsub ID that exchangeRequest assigned to message
// number i.
func assertResponseEchoesRequest(t *testing.T, chresp ResponseJSON, req Request, i int) {
	t.Helper()

	if chresp.Operation != req.Operation {
		t.Fatalf("Response operation %q does not equal request operation %q", chresp.Operation, req.Operation)
	}
	wantID := fmt.Sprintf("%d", i)
	if chresp.PubSubRequestID != wantID {
		t.Fatalf("Response did not contain correct ID. Want %q Got %q", wantID, chresp.PubSubRequestID)
	}
}

// TestChariotSuccessfulResponse sends well-formed CREATE requests to a running
// daemon and checks that each one is acked and answered with a response that
// has Ok=true, no error, and the request's owner, operation and ID.
func TestChariotSuccessfulResponse(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	const testbucket = "testbucket"

	h, stop := startResponseTestDaemon(ctx, t, testbucket)
	defer stop()

	var createReqs = []Request{
		{
			Banner:    testbucket,
			Cluster:   "", // empty for banner-wide objects
			Operation: "CREATE",
			Objects: [][]byte{
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
			},
			Owner: "test",
		},
		{
			Banner:    testbucket,
			Cluster:   "cluster1",
			Operation: "CREATE",
			Objects: [][]byte{
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
			},
			Owner: "test",
		},
	}

	// Send requests over pubsub.
	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
	for i, req := range createReqs {
		chresp := exchangeRequest(ctx, t, h, i, req)

		if !chresp.Ok {
			t.Fatalf("The response should set Ok=true")
		}
		if chresp.Error != "" {
			t.Fatalf("Do not expect response error: %q", chresp.Error)
		}
		assertResponseEchoesRequest(t, chresp, req, i)
	}

	t.Logf("Got acks from all %d requests", len(createReqs))
}

// TestChariotErrorResponse sends requests that are each invalid in a different
// way and checks that every one is still acked and answered with a response
// that has Ok=false, a non-empty error, and the request's owner, operation and
// ID.
func TestChariotErrorResponse(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()

	const testbucket = "testbucket"

	h, stop := startResponseTestDaemon(ctx, t, testbucket)
	defer stop()

	var createReqs = []Request{
		{
			Banner:    "", // Empty to cause an error
			Operation: "CREATE",
			Objects: [][]byte{
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
			},
			Owner: "test",
		},
		{
			Banner:    testbucket,
			Cluster:   "cluster1",
			Operation: "DANCE", // An invalid operation to cause an error.
			Objects: [][]byte{
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
			},
			Owner: "test",
		},
		{
			Banner:    testbucket,
			Cluster:   "",
			Operation: "CREATE",
			Objects: [][]byte{
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				randomChariotYamlObject(),
				[]byte("AN INVALID REQUEST"), // non yaml to cause error.
				randomChariotYamlObject(),
			},
			Owner: "test",
		},
	}

	// Send requests over pubsub.
	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
	for i, req := range createReqs {
		chresp := exchangeRequest(ctx, t, h, i, req)

		if chresp.Ok {
			t.Fatalf("The response should set Ok=false")
		}
		if chresp.Error == "" {
			t.Fatalf("Did not get response error.")
		}
		assertResponseEchoesRequest(t, chresp, req, i)
	}

	t.Logf("Got acks from all %d requests", len(createReqs))
}