...

Source file src/edge-infra.dev/pkg/edge/chariot/response_test.go

Documentation: edge-infra.dev/pkg/edge/chariot

     1  package chariot
     2  
     3  import (
     4  	"context"
     5  	"encoding/json"
     6  	"fmt"
     7  	"testing"
     8  	"time"
     9  
    10  	"github.com/fsouza/fake-gcs-server/fakestorage"
    11  )
    12  
    13  type mockResponsePublisher struct {
    14  	Responses chan IResponseMessage
    15  }
    16  
    17  func newMockResponsePublisher() *mockResponsePublisher {
    18  	return &mockResponsePublisher{
    19  		Responses: make(chan IResponseMessage, 1),
    20  	}
    21  }
    22  
    23  func (m *mockResponsePublisher) Publish(ctx context.Context, r IResponseMessage) error {
    24  	select {
    25  	case m.Responses <- r:
    26  		return nil
    27  	case <-ctx.Done():
    28  		return ctx.Err()
    29  	}
    30  }
    31  
    32  func TestChariotSuccessfulResponse(t *testing.T) {
    33  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    34  	defer cancel()
    35  
    36  	var mrp = newMockResponsePublisher()
    37  
    38  	var mpss = newMockPubSubService()
    39  	defer mpss.Close()
    40  
    41  	const testbucket = "testbucket"
    42  	var existingIgnoredObject = StorageObject{
    43  		Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
    44  		Content:  "this file causes the fake-gcs-server to create the test bucket",
    45  	}
    46  	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
    47  		ObjectAttrs: fakestorage.ObjectAttrs{
    48  			BucketName: existingIgnoredObject.getGcsBucket(),
    49  			Name:       existingIgnoredObject.getGcsPath(),
    50  		},
    51  		Content: []byte(existingIgnoredObject.Content),
    52  	})
    53  	if err != nil {
    54  		t.Fatal(err)
    55  	}
    56  	defer stopFakeGcsServer()
    57  
    58  	var (
    59  		mrpOption = OptionPubSubResponsePublisher(mrp)
    60  		psrOption = OptionPubSubReceiver(mpss)
    61  		gcsOption = OptionGoogleCloudStorage(gcsClient)
    62  	)
    63  
    64  	daemon, err := NewDaemon(psrOption, gcsOption, mrpOption)
    65  	if err != nil {
    66  		t.Fatal(err)
    67  	}
    68  	go daemon.Run(ctx) //nolint:errcheck
    69  
    70  	var createReqs = []Request{
    71  		{
    72  			Banner:    testbucket,
    73  			Cluster:   "", // empty for banner-wide objects
    74  			Operation: "CREATE",
    75  			Objects: [][]byte{
    76  				randomChariotYamlObject(),
    77  				randomChariotYamlObject(),
    78  				randomChariotYamlObject(),
    79  				randomChariotYamlObject(),
    80  				randomChariotYamlObject(),
    81  			},
    82  			Owner: "test",
    83  		},
    84  		{
    85  			Banner:    testbucket,
    86  			Cluster:   "cluster1",
    87  			Operation: "CREATE",
    88  			Objects: [][]byte{
    89  				randomChariotYamlObject(),
    90  				randomChariotYamlObject(),
    91  				randomChariotYamlObject(),
    92  				randomChariotYamlObject(),
    93  				randomChariotYamlObject(),
    94  			},
    95  			Owner: "test",
    96  		},
    97  	}
    98  
    99  	// Send requests over pubsub.
   100  	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
   101  	for i, req := range createReqs {
   102  		var an = newMockPubSubAckNacker()
   103  		defer an.Close()
   104  		data, err := json.Marshal(req)
   105  		if err != nil {
   106  			t.Fatal(err)
   107  		}
   108  		err = mpss.Send(ctx, &PubSubMessage{
   109  			ackNack:     an,
   110  			id:          fmt.Sprintf("%d", i),
   111  			data:        data,
   112  			publishTime: time.Now(),
   113  		})
   114  		if err != nil {
   115  			t.Fatal(err)
   116  		}
   117  		select {
   118  		case <-an.Acked:
   119  			t.Logf("Got ack from message %d", i+1)
   120  		case <-an.Nacked:
   121  			t.Fatalf("Message should not send a nack signal")
   122  		case <-ctx.Done():
   123  			t.Fatal(ctx.Err())
   124  		}
   125  
   126  		// Check for response
   127  		var resp IResponseMessage
   128  		select {
   129  		case resp = <-mrp.Responses:
   130  			t.Logf("Got response data: %s", resp.Data())
   131  			t.Logf("Got response attributes: %v", resp.Attributes())
   132  		case <-ctx.Done():
   133  			t.Fatal("Did not get response in appropriate time.", ctx.Err())
   134  		}
   135  
   136  		// Check that the attribute is correctly set.
   137  		if attr := resp.Attributes(); len(attr) == 0 {
   138  			t.Fatalf("Got empty response pubsub attributes")
   139  		} else if v, ok := attr["owner"]; !ok {
   140  			t.Fatalf("owner attribute not set.")
   141  		} else if v != req.Owner {
   142  			t.Fatalf("owner attribute %q does not equal expected %q", v, req.Owner)
   143  		}
   144  
   145  		// Decode the response data and ensure it's correct.
   146  		var chresp ResponseJSON
   147  		err = json.Unmarshal(resp.Data(), &chresp)
   148  		if err != nil {
   149  			t.Fatal(err)
   150  		}
   151  
   152  		if !chresp.Ok {
   153  			t.Fatalf("The response should set Ok=true")
   154  		}
   155  
   156  		if chresp.Operation != req.Operation {
   157  			t.Fatalf("Response operation %q does not equal request operation %q", chresp.Operation, req.Operation)
   158  		}
   159  
   160  		if chresp.Error != "" {
   161  			t.Fatalf("Do not expect response error: %q", chresp.Error)
   162  		}
   163  
   164  		wantID := fmt.Sprintf("%d", i)
   165  		if chresp.PubSubRequestID != wantID {
   166  			t.Fatalf("Response did not contain correct ID. Want %q Got %q", wantID, chresp.PubSubRequestID)
   167  		}
   168  	}
   169  	t.Logf("Got acks from all %d requests", len(createReqs))
   170  }
   171  
   172  func TestChariotErrorResponse(t *testing.T) {
   173  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
   174  	defer cancel()
   175  
   176  	var mrp = newMockResponsePublisher()
   177  
   178  	var mpss = newMockPubSubService()
   179  	defer mpss.Close()
   180  
   181  	const testbucket = "testbucket"
   182  	var existingIgnoredObject = StorageObject{
   183  		Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
   184  		Content:  "this file causes the fake-gcs-server to create the test bucket",
   185  	}
   186  	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
   187  		ObjectAttrs: fakestorage.ObjectAttrs{
   188  			BucketName: existingIgnoredObject.getGcsBucket(),
   189  			Name:       existingIgnoredObject.getGcsPath(),
   190  		},
   191  		Content: []byte(existingIgnoredObject.Content),
   192  	})
   193  	if err != nil {
   194  		t.Fatal(err)
   195  	}
   196  	defer stopFakeGcsServer()
   197  
   198  	var (
   199  		mrpOption = OptionPubSubResponsePublisher(mrp)
   200  		psrOption = OptionPubSubReceiver(mpss)
   201  		gcsOption = OptionGoogleCloudStorage(gcsClient)
   202  	)
   203  
   204  	daemon, err := NewDaemon(psrOption, gcsOption, mrpOption)
   205  	if err != nil {
   206  		t.Fatal(err)
   207  	}
   208  	go daemon.Run(ctx) //nolint:errcheck
   209  
   210  	var createReqs = []Request{
   211  		{
   212  			Banner:    "", // Empty to cause an error
   213  			Operation: "CREATE",
   214  			Objects: [][]byte{
   215  				randomChariotYamlObject(),
   216  				randomChariotYamlObject(),
   217  				randomChariotYamlObject(),
   218  				randomChariotYamlObject(),
   219  				randomChariotYamlObject(),
   220  			},
   221  			Owner: "test",
   222  		},
   223  		{
   224  			Banner:    testbucket,
   225  			Cluster:   "cluster1",
   226  			Operation: "DANCE", // An invalid operation to cause an error.
   227  			Objects: [][]byte{
   228  				randomChariotYamlObject(),
   229  				randomChariotYamlObject(),
   230  				randomChariotYamlObject(),
   231  				randomChariotYamlObject(),
   232  				randomChariotYamlObject(),
   233  			},
   234  			Owner: "test",
   235  		},
   236  		{
   237  			Banner:    testbucket,
   238  			Cluster:   "",
   239  			Operation: "CREATE",
   240  			Objects: [][]byte{
   241  				randomChariotYamlObject(),
   242  				randomChariotYamlObject(),
   243  				randomChariotYamlObject(),
   244  				randomChariotYamlObject(),
   245  				[]byte("AN INVALID REQUEST"), // non yaml to cause error.
   246  				randomChariotYamlObject(),
   247  			},
   248  			Owner: "test",
   249  		},
   250  	}
   251  
   252  	// Send requests over pubsub.
   253  	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
   254  	for i, req := range createReqs {
   255  		var an = newMockPubSubAckNacker()
   256  		defer an.Close()
   257  		data, err := json.Marshal(req)
   258  		if err != nil {
   259  			t.Fatal(err)
   260  		}
   261  		err = mpss.Send(ctx, &PubSubMessage{
   262  			ackNack:     an,
   263  			id:          fmt.Sprintf("%d", i),
   264  			data:        data,
   265  			publishTime: time.Now(),
   266  		})
   267  		if err != nil {
   268  			t.Fatal(err)
   269  		}
   270  		select {
   271  		case <-an.Acked:
   272  			t.Logf("Got ack from message %d", i+1)
   273  		case <-an.Nacked:
   274  			t.Fatalf("Message should not send a nack signal")
   275  		case <-ctx.Done():
   276  			t.Fatal(ctx.Err())
   277  		}
   278  
   279  		// Check for response
   280  		var resp IResponseMessage
   281  		select {
   282  		case resp = <-mrp.Responses:
   283  			t.Logf("Got response data: %s", resp.Data())
   284  			t.Logf("Got response attributes: %v", resp.Attributes())
   285  		case <-ctx.Done():
   286  			t.Fatal("Did not get response in appropriate time.", ctx.Err())
   287  		}
   288  
   289  		// Check that the attribute is correctly set.
   290  		if attr := resp.Attributes(); len(attr) == 0 {
   291  			t.Fatalf("Got empty response pubsub attributes")
   292  		} else if v, ok := attr["owner"]; !ok {
   293  			t.Fatalf("owner attribute not set.")
   294  		} else if v != req.Owner {
   295  			t.Fatalf("owner attribute %q does not equal expected %q", v, req.Owner)
   296  		}
   297  
   298  		// Decode the response data and ensure it's correct.
   299  		var chresp ResponseJSON
   300  		err = json.Unmarshal(resp.Data(), &chresp)
   301  		if err != nil {
   302  			t.Fatal(err)
   303  		}
   304  
   305  		if chresp.Ok {
   306  			t.Fatalf("The response should set Ok=false")
   307  		}
   308  
   309  		if chresp.Error == "" {
   310  			t.Fatalf("Did not get response error.")
   311  		}
   312  
   313  		if chresp.Operation != req.Operation {
   314  			t.Fatalf("Response operation %q does not equal request operation %q", chresp.Operation, req.Operation)
   315  		}
   316  
   317  		wantID := fmt.Sprintf("%d", i)
   318  		if chresp.PubSubRequestID != wantID {
   319  			t.Fatalf("Response did not contain correct ID. Want %q Got %q", wantID, chresp.PubSubRequestID)
   320  		}
   321  	}
   322  	t.Logf("Got acks from all %d requests", len(createReqs))
   323  }
   324  

View as plain text