...

Source file src/edge-infra.dev/test/f2/examples/pstest/pstest_test.go

Documentation: edge-infra.dev/test/f2/examples/pstest

     1  package pstest
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"os"
     7  	"testing"
     8  	"time"
     9  
    10  	"cloud.google.com/go/pubsub"
    11  	"google.golang.org/api/option"
    12  	"gotest.tools/v3/assert"
    13  	"gotest.tools/v3/poll"
    14  
    15  	"edge-infra.dev/test/f2"
    16  	"edge-infra.dev/test/f2/x/pstest"
    17  )
    18  
    19  const (
    20  	delay   = 10 * time.Millisecond
    21  	timeout = 500 * time.Millisecond
    22  )
    23  
// f is the package-level f2 framework instance. It is assigned once in
// TestMain and used by TestPstest to run the feature under test.
var f f2.Framework
    25  
    26  func TestMain(m *testing.M) {
    27  	f = f2.New(context.Background(), f2.WithExtensions(pstest.New(pstest.WithProjectID("test_id")))).
    28  		WithLabel("foo", "bar").
    29  		Flaky().
    30  		Setup().
    31  		Teardown()
    32  	os.Exit(f.Run(m))
    33  }
    34  
    35  func TestPstest(t *testing.T) {
    36  	psEmu := f2.NewFeature("PubSub Emulator").
    37  		Setup("Create Topic", pstest.WithNewTopic("topic")).
    38  		Setup("Create Subscription to Topic",
    39  			pstest.WithNewSubscription("sub", "topic",
    40  				// commented out due to failure panic: pstest: bad filter: undeclared identifier 'filter'
    41  				// pstest.WithFilter("filter"),
    42  				pstest.WithAckDeadline(time.Minute),
    43  				pstest.WithExpirationPolicy(time.Duration(0)))).
    44  		Test("Successfully receive a published message", func(ctx f2.Context, t *testing.T) f2.Context {
    45  			topicID, subID := "topic", "sub"
    46  
    47  			// Get fresh PubSub client from context
    48  			ps := pstest.FromContextT(ctx, t)
    49  
    50  			// Subscribe
    51  			messageReceived := false
    52  			var receivedMessage *pubsub.Message
    53  			subCtx, cancelfunc := context.WithCancel(ctx)
    54  			go func() {
    55  				err := ps.Subscribe(subCtx, subID, func(ctx context.Context, msg *pubsub.Message) {
    56  					messageReceived = true
    57  					receivedMessage = msg
    58  					msg.Ack()
    59  				})
    60  				if err != nil {
    61  					assert.ErrorIs(t, err, context.Canceled)
    62  				}
    63  			}()
    64  
    65  			// Publish
    66  			data, attr := "hello world", map[string]string{"attr": "value"}
    67  			msgID, err := ps.Publish(ctx, topicID, data, attr)
    68  			assert.NilError(t, err)
    69  
    70  			// Assertions
    71  			poll.WaitOn(
    72  				t,
    73  				func(t poll.LogT) poll.Result {
    74  					if messageReceived {
    75  						return poll.Success()
    76  					}
    77  					return poll.Continue("waiting...")
    78  				},
    79  				poll.WithDelay(delay),
    80  				poll.WithTimeout(timeout),
    81  			)
    82  
    83  			assert.Equal(t, data, string(receivedMessage.Data))
    84  			assert.DeepEqual(t, attr, receivedMessage.Attributes)
    85  			assert.Equal(t, msgID, receivedMessage.ID)
    86  
    87  			// Ensure subcription subcontexts are given time to fully cancel
    88  			cancelfunc()
    89  			poll.WaitOn(
    90  				t,
    91  				func(t poll.LogT) poll.Result {
    92  					if errors.Is(subCtx.Err(), context.Canceled) {
    93  						return poll.Success()
    94  					}
    95  					return poll.Continue("waiting...")
    96  				},
    97  				poll.WithDelay(delay),
    98  				poll.WithTimeout(timeout),
    99  			)
   100  
   101  			return ctx
   102  		}).
   103  		Test("Use external client", func(ctx f2.Context, t *testing.T) f2.Context {
   104  			ps := pstest.FromContextT(ctx, t)
   105  
   106  			// You can connect your own client to the mock server
   107  			client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(ps.Conn))
   108  			assert.NilError(t, err)
   109  
   110  			// Create topics and subscriptions using your client
   111  			top, err := client.CreateTopic(ctx, "different-topic")
   112  			assert.NilError(t, err)
   113  			sub, err := client.CreateSubscription(ctx, "different-sub", pubsub.SubscriptionConfig{Topic: top})
   114  			assert.NilError(t, err)
   115  
   116  			// All messages will be sent and received through the mock server
   117  			messageReceived := false
   118  			subCtx, cancelfunc := context.WithCancel(ctx)
   119  			go func() {
   120  				err = sub.Receive(subCtx, func(ctx context.Context, msg *pubsub.Message) {
   121  					messageReceived = true
   122  					msg.Ack()
   123  				})
   124  				if err != nil {
   125  					assert.ErrorIs(t, err, context.Canceled)
   126  				}
   127  			}()
   128  			top.Publish(ctx, &pubsub.Message{Data: []byte("hello world")})
   129  
   130  			poll.WaitOn(
   131  				t,
   132  				func(t poll.LogT) poll.Result {
   133  					if messageReceived {
   134  						return poll.Success()
   135  					}
   136  					return poll.Continue("waiting...")
   137  				},
   138  				poll.WithDelay(delay),
   139  				poll.WithTimeout(timeout),
   140  			)
   141  
   142  			cancelfunc()
   143  			poll.WaitOn(
   144  				t,
   145  				func(t poll.LogT) poll.Result {
   146  					if errors.Is(subCtx.Err(), context.Canceled) {
   147  						return poll.Success()
   148  					}
   149  					return poll.Continue("waiting...")
   150  				},
   151  				poll.WithDelay(delay),
   152  				poll.WithTimeout(timeout),
   153  			)
   154  
   155  			return ctx
   156  		}).
   157  		Feature()
   158  	f.Test(t, psEmu)
   159  }
   160  

View as plain text