...

Source file src/edge-infra.dev/pkg/edge/chariot/mockpubsubpull_test.go

Documentation: edge-infra.dev/pkg/edge/chariot

     1  package chariot
     2  
import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"testing"
	"time"

	"cloud.google.com/go/storage"
	"github.com/fsouza/fake-gcs-server/fakestorage"
)
    14  
    15  type mockPubSubAckNacker struct {
    16  	Acked  chan bool
    17  	Nacked chan bool
    18  }
    19  
    20  func newMockPubSubAckNacker() *mockPubSubAckNacker {
    21  	return &mockPubSubAckNacker{
    22  		Acked:  make(chan bool, 1),
    23  		Nacked: make(chan bool, 1),
    24  	}
    25  }
    26  
    27  func (mpsan *mockPubSubAckNacker) Close() error {
    28  	close(mpsan.Acked)
    29  	close(mpsan.Nacked)
    30  	return nil
    31  }
    32  
    33  func (mpsan *mockPubSubAckNacker) Ack() {
    34  	mpsan.Acked <- true
    35  }
    36  
    37  func (mpsan *mockPubSubAckNacker) Nack() {
    38  	mpsan.Nacked <- true
    39  }
    40  
    41  type mockPubSubService struct {
    42  	messages chan IPubSubMessage
    43  }
    44  
    45  func newMockPubSubService() *mockPubSubService {
    46  	return &mockPubSubService{
    47  		messages: make(chan IPubSubMessage, 100),
    48  	}
    49  }
    50  
    51  func (mpss *mockPubSubService) Close() error {
    52  	close(mpss.messages)
    53  	return nil
    54  }
    55  
    56  func (mpss *mockPubSubService) Send(ctx context.Context, ipsm IPubSubMessage) error {
    57  	select {
    58  	case mpss.messages <- ipsm:
    59  		return nil
    60  	case <-ctx.Done():
    61  		return ctx.Err()
    62  	}
    63  }
    64  
    65  func (mpss *mockPubSubService) Receive(ctx context.Context, f func(context.Context, IPubSubMessage)) error {
    66  	for msg := range mpss.messages {
    67  		go f(ctx, msg)
    68  	}
    69  	return fmt.Errorf("All calls to Receive return with an error per the pubsub.Subscription.Receive specs")
    70  }
    71  
    72  func TestDaemonCreatesObjects(t *testing.T) {
    73  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
    74  	defer cancel()
    75  
    76  	var mpss = newMockPubSubService()
    77  	defer mpss.Close()
    78  
    79  	const testbucket = "testbucket"
    80  	var existingIgnoredObject = StorageObject{
    81  		Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
    82  		Content:  "this file causes the fake-gcs-server to create the test bucket",
    83  	}
    84  	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
    85  		ObjectAttrs: fakestorage.ObjectAttrs{
    86  			BucketName: existingIgnoredObject.getGcsBucket(),
    87  			Name:       existingIgnoredObject.getGcsPath(),
    88  		},
    89  		Content: []byte(existingIgnoredObject.Content),
    90  	})
    91  	if err != nil {
    92  		t.Fatal(err)
    93  	}
    94  	defer stopFakeGcsServer()
    95  
    96  	var (
    97  		psrOption = OptionPubSubReceiver(mpss)
    98  		gcsOption = OptionGoogleCloudStorage(gcsClient)
    99  	)
   100  
   101  	daemon, err := NewDaemon(psrOption, gcsOption)
   102  	if err != nil {
   103  		t.Fatal(err)
   104  	}
   105  	go daemon.Run(ctx) //nolint:errcheck
   106  
   107  	var createReqs = []Request{
   108  		{
   109  			Banner:    testbucket,
   110  			Cluster:   "", // empty for banner-wide objects
   111  			Operation: "CREATE",
   112  			Objects: [][]byte{
   113  				randomChariotYamlObject(),
   114  				randomChariotYamlObject(),
   115  				randomChariotYamlObject(),
   116  				randomChariotYamlObject(),
   117  				randomChariotYamlObject(),
   118  			},
   119  			Owner: "test",
   120  		},
   121  		{
   122  			Banner:    testbucket,
   123  			Cluster:   "cluster1",
   124  			Operation: "CREATE",
   125  			Objects: [][]byte{
   126  				randomChariotYamlObject(),
   127  				randomChariotYamlObject(),
   128  				randomChariotYamlObject(),
   129  				randomChariotYamlObject(),
   130  				randomChariotYamlObject(),
   131  			},
   132  			Owner: "test",
   133  		},
   134  		{
   135  			Banner:    testbucket,
   136  			Cluster:   "", // empty for banner-wide objects
   137  			Operation: "CREATE",
   138  			Objects: [][]byte{
   139  				randomChariotYamlObject(),
   140  				randomChariotYamlObject(),
   141  				randomChariotYamlObject(),
   142  				randomChariotYamlObject(),
   143  				randomChariotYamlObject(),
   144  			},
   145  			Owner: "test",
   146  		},
   147  		{
   148  			Banner:    testbucket,
   149  			Cluster:   "cluster2",
   150  			Operation: "CREATE",
   151  			Objects: [][]byte{
   152  				randomChariotYamlObject(),
   153  				randomChariotYamlObject(),
   154  				randomChariotYamlObject(),
   155  				randomChariotYamlObject(),
   156  				randomChariotYamlObject(),
   157  			},
   158  			Owner: "test",
   159  		},
   160  	}
   161  
   162  	// Send requests over pubsub.
   163  	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
   164  	for i, req := range createReqs {
   165  		var an = newMockPubSubAckNacker()
   166  		defer an.Close()
   167  		data, err := json.Marshal(req)
   168  		if err != nil {
   169  			t.Fatal(err)
   170  		}
   171  		err = mpss.Send(ctx, &PubSubMessage{
   172  			ackNack:     an,
   173  			id:          fmt.Sprintf("%d", i),
   174  			data:        data,
   175  			publishTime: time.Now(),
   176  		})
   177  		if err != nil {
   178  			t.Fatal(err)
   179  		}
   180  		select {
   181  		case <-an.Acked:
   182  			t.Logf("Got ack from message %d", i+1)
   183  		case <-an.Nacked:
   184  			t.Fatalf("Message should not send a nack signal")
   185  		case <-ctx.Done():
   186  			t.Fatal(ctx.Err())
   187  		}
   188  	}
   189  	t.Logf("Got acks from all %d requests", len(createReqs))
   190  
   191  	// Check the bucket for the files we created.
   192  	for _, req := range createReqs {
   193  		so, err := req.StorageObjects()
   194  		if err != nil {
   195  			t.Fatal(err)
   196  		}
   197  		t.Logf("Checking %d objects exist in bucket", len(so))
   198  		err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
   199  		if err != nil {
   200  			t.Fatal(err)
   201  		}
   202  	}
   203  	t.Logf("Successfully created all objects in GCS storage")
   204  }
   205  
   206  func TestDaemonDeletesObjects(t *testing.T) {
   207  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
   208  	defer cancel()
   209  
   210  	var mpss = newMockPubSubService()
   211  	defer mpss.Close()
   212  
   213  	const testbucket = "testbucket"
   214  	var existingIgnoredObject = StorageObject{
   215  		Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
   216  		Content:  "this file causes the fake-gcs-server to create the test bucket",
   217  	}
   218  	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
   219  		ObjectAttrs: fakestorage.ObjectAttrs{
   220  			BucketName: existingIgnoredObject.getGcsBucket(),
   221  			Name:       existingIgnoredObject.getGcsPath(),
   222  		},
   223  		Content: []byte(existingIgnoredObject.Content),
   224  	})
   225  	if err != nil {
   226  		t.Fatal(err)
   227  	}
   228  	defer stopFakeGcsServer()
   229  
   230  	var (
   231  		psrOption = OptionPubSubReceiver(mpss)
   232  		gcsOption = OptionGoogleCloudStorage(gcsClient)
   233  	)
   234  
   235  	daemon, err := NewDaemon(psrOption, gcsOption)
   236  	if err != nil {
   237  		t.Fatal(err)
   238  	}
   239  	go daemon.Run(ctx) //nolint:errcheck
   240  
   241  	var createReqs = []Request{
   242  		{
   243  			Banner:    testbucket,
   244  			Cluster:   "", // empty for banner-wide objects
   245  			Operation: "CREATE",
   246  			Objects: [][]byte{
   247  				randomChariotYamlObject(),
   248  				randomChariotYamlObject(),
   249  				randomChariotYamlObject(),
   250  				randomChariotYamlObject(),
   251  				randomChariotYamlObject(),
   252  			},
   253  			Owner: "test",
   254  		},
   255  		{
   256  			Banner:    testbucket,
   257  			Cluster:   "cluster1",
   258  			Operation: "CREATE",
   259  			Objects: [][]byte{
   260  				randomChariotYamlObject(),
   261  				randomChariotYamlObject(),
   262  				randomChariotYamlObject(),
   263  				randomChariotYamlObject(),
   264  				randomChariotYamlObject(),
   265  			},
   266  			Owner: "test",
   267  		},
   268  		{
   269  			Banner:    testbucket,
   270  			Cluster:   "", // empty for banner-wide objects
   271  			Operation: "CREATE",
   272  			Objects: [][]byte{
   273  				randomChariotYamlObject(),
   274  				randomChariotYamlObject(),
   275  				randomChariotYamlObject(),
   276  				randomChariotYamlObject(),
   277  				randomChariotYamlObject(),
   278  			},
   279  			Owner: "test",
   280  		},
   281  		{
   282  			Banner:    testbucket,
   283  			Cluster:   "cluster2",
   284  			Operation: "CREATE",
   285  			Objects: [][]byte{
   286  				randomChariotYamlObject(),
   287  				randomChariotYamlObject(),
   288  				randomChariotYamlObject(),
   289  				randomChariotYamlObject(),
   290  				randomChariotYamlObject(),
   291  			},
   292  			Owner: "test",
   293  		},
   294  	}
   295  
   296  	// Send requests over pubsub.
   297  	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
   298  	for i, req := range createReqs {
   299  		var an = newMockPubSubAckNacker()
   300  		defer an.Close()
   301  		data, err := json.Marshal(req)
   302  		if err != nil {
   303  			t.Fatal(err)
   304  		}
   305  		err = mpss.Send(ctx, &PubSubMessage{
   306  			ackNack:     an,
   307  			id:          fmt.Sprintf("%d", i),
   308  			data:        data,
   309  			publishTime: time.Now(),
   310  		})
   311  		if err != nil {
   312  			t.Fatal(err)
   313  		}
   314  		select {
   315  		case <-an.Acked:
   316  			t.Logf("Got ack from message %d", i+1)
   317  		case <-an.Nacked:
   318  			t.Fatalf("Message should not send a nack signal")
   319  		case <-ctx.Done():
   320  			t.Fatal(ctx.Err())
   321  		}
   322  	}
   323  	t.Logf("Got acks from all %d requests", len(createReqs))
   324  
   325  	// Check the bucket for the files we created.
   326  	for _, req := range createReqs {
   327  		so, err := req.StorageObjects()
   328  		if err != nil {
   329  			t.Fatal(err)
   330  		}
   331  		t.Logf("Checking %d objects exist in bucket", len(so))
   332  		err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
   333  		if err != nil {
   334  			t.Fatal(err)
   335  		}
   336  	}
   337  	t.Logf("Created all objects in GCS storage")
   338  
   339  	t.Logf("Now testing the deletion of the created objects in GCS")
   340  	for i, req := range createReqs {
   341  		// Keep half of the objects and delete the other half.
   342  		var keepObjects [][]byte
   343  		var deleteObjects [][]byte
   344  		for j, object := range req.Objects {
   345  			if j%2 == 0 {
   346  				keepObjects = append(keepObjects, object)
   347  			} else {
   348  				deleteObjects = append(deleteObjects, object)
   349  			}
   350  		}
   351  
   352  		// Set the operation to DELETE
   353  		req.Operation = "DELETE"
   354  		req.Objects = deleteObjects
   355  
   356  		var an = newMockPubSubAckNacker()
   357  		defer an.Close()
   358  		data, err := json.Marshal(req)
   359  		if err != nil {
   360  			t.Fatal(err)
   361  		}
   362  		err = mpss.Send(ctx, &PubSubMessage{
   363  			ackNack:     an,
   364  			id:          fmt.Sprintf("%d", i),
   365  			data:        data,
   366  			publishTime: time.Now(),
   367  		})
   368  		if err != nil {
   369  			t.Fatal(err)
   370  		}
   371  		select {
   372  		case <-an.Acked:
   373  			t.Logf("Got ack from message %d", i+1)
   374  		case <-an.Nacked:
   375  			t.Fatalf("Message should not send a nack signal")
   376  		case <-ctx.Done():
   377  			t.Fatal(ctx.Err())
   378  		}
   379  		// Check for the deleted objects.
   380  		so, err := req.StorageObjects()
   381  		if err != nil {
   382  			t.Fatal(err)
   383  		}
   384  		t.Logf("Checking %d objects do not exist in bucket", len(so))
   385  		err = assertStorageObjectsDoNotExistInBucket(ctx, gcsClient, so...)
   386  		if err != nil {
   387  			t.Fatal(err)
   388  		}
   389  
   390  		// Now check for the objects that should be kept.
   391  		req.Operation = "CREATE"
   392  		req.Objects = keepObjects
   393  		so, err = req.StorageObjects()
   394  		if err != nil {
   395  			t.Fatal(err)
   396  		}
   397  		t.Logf("Checking %d objects exist in bucket", len(so))
   398  		err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
   399  		if err != nil {
   400  			t.Fatal(err)
   401  		}
   402  	}
   403  	t.Logf("Verified the correct objects were deleted and not deleted")
   404  }
   405  
   406  func TestDaemonDeletesObjectsThatDoNotExist(t *testing.T) {
   407  	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
   408  	defer cancel()
   409  
   410  	var mpss = newMockPubSubService()
   411  	defer mpss.Close()
   412  
   413  	const testbucket = "testbucket"
   414  	var existingIgnoredObject = StorageObject{
   415  		Location: fmt.Sprintf("gs://%s/ignored.txt", testbucket),
   416  		Content:  "this file causes the fake-gcs-server to create the test bucket",
   417  	}
   418  	gcsClient, stopFakeGcsServer, err := startFakeGcsServer(fakestorage.Object{
   419  		ObjectAttrs: fakestorage.ObjectAttrs{
   420  			BucketName: existingIgnoredObject.getGcsBucket(),
   421  			Name:       existingIgnoredObject.getGcsPath(),
   422  		},
   423  		Content: []byte(existingIgnoredObject.Content),
   424  	})
   425  	if err != nil {
   426  		t.Fatal(err)
   427  	}
   428  	defer stopFakeGcsServer()
   429  
   430  	var (
   431  		psrOption = OptionPubSubReceiver(mpss)
   432  		gcsOption = OptionGoogleCloudStorage(gcsClient)
   433  	)
   434  
   435  	daemon, err := NewDaemon(psrOption, gcsOption)
   436  	if err != nil {
   437  		t.Fatal(err)
   438  	}
   439  	go daemon.Run(ctx) //nolint:errcheck
   440  
   441  	var createReqs = []Request{
   442  		{
   443  			Banner:    testbucket,
   444  			Cluster:   "", // empty for banner-wide objects
   445  			Operation: "CREATE",
   446  			Objects: [][]byte{
   447  				randomChariotYamlObject(),
   448  				randomChariotYamlObject(),
   449  				randomChariotYamlObject(),
   450  				randomChariotYamlObject(),
   451  				randomChariotYamlObject(),
   452  			},
   453  			Owner: "test",
   454  		},
   455  		{
   456  			Banner:    testbucket,
   457  			Cluster:   "cluster1",
   458  			Operation: "CREATE",
   459  			Objects: [][]byte{
   460  				randomChariotYamlObject(),
   461  				randomChariotYamlObject(),
   462  				randomChariotYamlObject(),
   463  				randomChariotYamlObject(),
   464  				randomChariotYamlObject(),
   465  			},
   466  			Owner: "test",
   467  		},
   468  		{
   469  			Banner:    testbucket,
   470  			Cluster:   "", // empty for banner-wide objects
   471  			Operation: "CREATE",
   472  			Objects: [][]byte{
   473  				randomChariotYamlObject(),
   474  				randomChariotYamlObject(),
   475  				randomChariotYamlObject(),
   476  				randomChariotYamlObject(),
   477  				randomChariotYamlObject(),
   478  			},
   479  			Owner: "test",
   480  		},
   481  		{
   482  			Banner:    testbucket,
   483  			Cluster:   "cluster2",
   484  			Operation: "CREATE",
   485  			Objects: [][]byte{
   486  				randomChariotYamlObject(),
   487  				randomChariotYamlObject(),
   488  				randomChariotYamlObject(),
   489  				randomChariotYamlObject(),
   490  				randomChariotYamlObject(),
   491  			},
   492  			Owner: "test",
   493  		},
   494  	}
   495  
   496  	// Send requests over pubsub.
   497  	t.Logf("Sending %d create requests to the Daemon", len(createReqs))
   498  	for i, req := range createReqs {
   499  		var an = newMockPubSubAckNacker()
   500  		defer an.Close()
   501  		data, err := json.Marshal(req)
   502  		if err != nil {
   503  			t.Fatal(err)
   504  		}
   505  		err = mpss.Send(ctx, &PubSubMessage{
   506  			ackNack:     an,
   507  			id:          fmt.Sprintf("%d", i),
   508  			data:        data,
   509  			publishTime: time.Now(),
   510  		})
   511  		if err != nil {
   512  			t.Fatal(err)
   513  		}
   514  		select {
   515  		case <-an.Acked:
   516  			t.Logf("Got ack from message %d", i+1)
   517  		case <-an.Nacked:
   518  			t.Fatalf("Message should not send a nack signal")
   519  		case <-ctx.Done():
   520  			t.Fatal(ctx.Err())
   521  		}
   522  	}
   523  	t.Logf("Got acks from all %d requests", len(createReqs))
   524  
   525  	// Check the bucket for the files we created.
   526  	for _, req := range createReqs {
   527  		so, err := req.StorageObjects()
   528  		if err != nil {
   529  			t.Fatal(err)
   530  		}
   531  		t.Logf("Checking %d objects exist in bucket", len(so))
   532  		err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
   533  		if err != nil {
   534  			t.Fatal(err)
   535  		}
   536  	}
   537  	t.Logf("Created all objects in GCS storage")
   538  
   539  	t.Logf("Now testing the deletion of the created objects in GCS")
   540  	for i, req := range createReqs {
   541  		// Keep half of the objects and delete the other half.
   542  		var keepObjects [][]byte
   543  		var deleteObjects [][]byte
   544  		for j, object := range req.Objects {
   545  			if j%2 == 0 {
   546  				keepObjects = append(keepObjects, object)
   547  			} else {
   548  				deleteObjects = append(deleteObjects, object)
   549  			}
   550  		}
   551  
   552  		// Set the operation to DELETE
   553  		req.Operation = "DELETE"
   554  		req.Objects = deleteObjects
   555  
   556  		var an = newMockPubSubAckNacker()
   557  		defer an.Close()
   558  		data, err := json.Marshal(req)
   559  		if err != nil {
   560  			t.Fatal(err)
   561  		}
   562  		err = mpss.Send(ctx, &PubSubMessage{
   563  			ackNack:     an,
   564  			id:          fmt.Sprintf("%d", i),
   565  			data:        data,
   566  			publishTime: time.Now(),
   567  		})
   568  		if err != nil {
   569  			t.Fatal(err)
   570  		}
   571  		select {
   572  		case <-an.Acked:
   573  			t.Logf("Got ack from message %d", i+1)
   574  		case <-an.Nacked:
   575  			t.Fatalf("Message should not send a nack signal")
   576  		case <-ctx.Done():
   577  			t.Fatal(ctx.Err())
   578  		}
   579  		// Check for the deleted objects.
   580  		so, err := req.StorageObjects()
   581  		if err != nil {
   582  			t.Fatal(err)
   583  		}
   584  		t.Logf("Checking %d objects do not exist in bucket", len(so))
   585  		err = assertStorageObjectsDoNotExistInBucket(ctx, gcsClient, so...)
   586  		if err != nil {
   587  			t.Fatal(err)
   588  		}
   589  		// Attempt to delete the objects again
   590  		t.Logf("Attempting to delete objects again")
   591  		err = mpss.Send(ctx, &PubSubMessage{
   592  			ackNack:     an,
   593  			id:          fmt.Sprintf("%d", i),
   594  			data:        data,
   595  			publishTime: time.Now(),
   596  		})
   597  		if err != nil {
   598  			t.Fatal(err)
   599  		}
   600  		select {
   601  		case <-an.Acked:
   602  			t.Logf("Got ack from message %d", i+1)
   603  		case <-an.Nacked:
   604  			t.Fatalf("Message should not send a nack signal")
   605  		case <-ctx.Done():
   606  			t.Fatal(ctx.Err())
   607  		}
   608  
   609  		// Now check for the objects that should be kept.
   610  		req.Operation = "CREATE"
   611  		req.Objects = keepObjects
   612  		so, err = req.StorageObjects()
   613  		if err != nil {
   614  			t.Fatal(err)
   615  		}
   616  		t.Logf("Checking %d objects exist in bucket", len(so))
   617  		err = assertStorageObjectsExistInBucket(ctx, gcsClient, so...)
   618  		if err != nil {
   619  			t.Fatal(err)
   620  		}
   621  	}
   622  	t.Logf("Verified the correct objects were deleted and not deleted")
   623  }
   624  
   625  func assertStorageObjectsExistInBucket(ctx context.Context, client *storage.Client, objects ...StorageObject) error {
   626  	for _, object := range objects {
   627  		var bucket = object.getGcsBucket()
   628  		var path = object.getGcsPath()
   629  		r, err := client.Bucket(bucket).Object(path).NewReader(ctx)
   630  		if err != nil {
   631  			return err
   632  		}
   633  		b, err := io.ReadAll(r)
   634  		if err != nil {
   635  			return err
   636  		} else if err = r.Close(); err != nil {
   637  			return err
   638  		} else if string(b) != object.Content {
   639  			return fmt.Errorf("Expected object content does not match. Got %q, want %q", string(b), object.Content)
   640  		}
   641  	}
   642  	return nil
   643  }
   644  
   645  func assertStorageObjectsDoNotExistInBucket(ctx context.Context, client *storage.Client, objects ...StorageObject) error {
   646  	for _, object := range objects {
   647  		var bucket = object.getGcsBucket()
   648  		var path = object.getGcsPath()
   649  		r, err := client.Bucket(bucket).Object(path).NewReader(ctx)
   650  		if err != nil && err == storage.ErrObjectNotExist {
   651  			// Yay it does not exist.
   652  			continue
   653  		} else if err != nil {
   654  			return err
   655  		}
   656  		r.Close()
   657  		return fmt.Errorf("Object at location %q should not exist", object.Location)
   658  	}
   659  	return nil
   660  }
   661  

View as plain text