...

Source file src/edge-infra.dev/test/f2/x/pstest/pstest.go

Documentation: edge-infra.dev/test/f2/x/pstest

     1  package pstest
     2  
     3  import (
     4  	"context"
     5  	"fmt"
     6  	"testing"
     7  
     8  	_ "edge-infra.dev/test/f2/integration" // Required for registering integration test flag
     9  
    10  	"cloud.google.com/go/pubsub"
    11  	"cloud.google.com/go/pubsub/pstest"
    12  	"google.golang.org/api/option"
    13  	"google.golang.org/grpc"
    14  	"google.golang.org/grpc/credentials/insecure"
    15  
    16  	"edge-infra.dev/test/f2"
    17  	"edge-infra.dev/test/f2/fctx"
    18  )
    19  
    20  type PubSub struct {
    21  	client *pubsub.Client
    22  
    23  	// Conn provides the following:
    24  	//
    25  	// 1. Test subscriptions losing connection by calling
    26  	//      ps.Conn.Close()
    27  	//
    28  	// 2. Create pubsub clients in different projects by calling
    29  	//      pubsub.NewClient(ctx, "projectID", option.WithGRPCConn(ps.Conn))
    30  	Conn *grpc.ClientConn
    31  
    32  	options *pubsuboptions
    33  }
    34  
    35  // Create a new mock PubSub instance.
    36  func New(opts ...PubSubOption) (ps *PubSub) {
    37  	o := makePubSubOptions(opts...)
    38  	return &PubSub{
    39  		options: o,
    40  	}
    41  }
    42  
    43  // FromContext attempts to fetch an instance of PubSub from the test context and
    44  // returns an error if it is not discovered.
    45  func FromContext(ctx context.Context) (*PubSub, error) {
    46  	v := fctx.ValueFrom[PubSub](ctx)
    47  	if v == nil {
    48  		return nil, fmt.Errorf("%w: pstest.PubSub extension", fctx.ErrNotFound)
    49  	}
    50  	return v, nil
    51  }
    52  
    53  // FromContextT is a testing variant of FromContext that immediately fails the
    54  // test if PubSub is not present in the testing context.
    55  func FromContextT(ctx fctx.Context, t *testing.T) *PubSub {
    56  	return fctx.ValueFromT[PubSub](ctx, t)
    57  }
    58  
    59  // IntoContext stores the framework extension in the test context.
    60  func (ps *PubSub) IntoContext(ctx f2.Context) f2.Context {
    61  	return fctx.ValueInto(ctx, ps)
    62  }
    63  
    64  // RegisterFns is called by the framework after binding and parsing test flags (not currently implemented here).
    65  // Ensures that every test has a fresh instance of PubSub.
    66  func (ps *PubSub) RegisterFns(f f2.Framework) {
    67  	var server *pstest.Server
    68  	f.BeforeEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) {
    69  		server = pstest.NewServerWithPort(ps.options.port)
    70  		t.Setenv("PUBSUB_EMULATOR_HOST", server.Addr)
    71  
    72  		var err error
    73  		ps.Conn, err = grpc.NewClient(server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    74  		if err != nil {
    75  			return ctx, err
    76  		}
    77  
    78  		ps.client, err = pubsub.NewClient(ctx, ps.options.projectID, option.WithGRPCConn(ps.Conn))
    79  		if err != nil {
    80  			return ctx, err
    81  		}
    82  
    83  		return ctx, nil
    84  	}).AfterEachTest(func(ctx fctx.Context, _ *testing.T) (fctx.Context, error) {
    85  		ps.client.Close()
    86  		ps.Conn.Close()
    87  		server.Close()
    88  		return ctx, nil
    89  	})
    90  }
    91  
    92  // Creates a new topic in the PubSub instance.
    93  func (ps *PubSub) CreateTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) {
    94  	return ps.client.CreateTopic(ctx, topicID)
    95  }
    96  
    97  // Creates a new subscription to a given topic in the Pubsub instance.
    98  func (ps *PubSub) CreateSubscription(ctx context.Context, topicID, subID string, opts ...SubOption) (*pubsub.Subscription, error) {
    99  	o := makeSubOptions(opts...)
   100  	top := ps.client.Topic(topicID)
   101  	return ps.client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
   102  		Topic:            top,
   103  		AckDeadline:      o.ackDeadline,
   104  		ExpirationPolicy: o.expirationPolicy,
   105  		Filter:           o.filter,
   106  	})
   107  }
   108  
   109  // Publishes a pubsub message to all subscribers of a given topic
   110  func (ps *PubSub) Publish(ctx context.Context, topicID, data string, attributes map[string]string) (msgID string, err error) {
   111  	top := ps.client.Topic(topicID)
   112  	res := top.Publish(ctx, &pubsub.Message{
   113  		Data:       []byte(data),
   114  		Attributes: attributes,
   115  	})
   116  	return res.Get(ctx)
   117  }
   118  
   119  // Starts an active subscription with a given receive function
   120  func (ps *PubSub) Subscribe(ctx context.Context, subID string, receiveFunc func(context.Context, *pubsub.Message)) error {
   121  	sub := ps.client.Subscription(subID)
   122  	return sub.Receive(ctx, receiveFunc)
   123  }
   124  
   125  // StepFns
   126  
   127  func WithNewTopic(topicID string) func(ctx f2.Context, t *testing.T) f2.Context {
   128  	return func(ctx f2.Context, t *testing.T) f2.Context {
   129  		ps := FromContextT(ctx, t)
   130  		_, err := ps.CreateTopic(ctx, topicID)
   131  		if err != nil {
   132  			t.Fatal(err)
   133  		}
   134  		return ctx
   135  	}
   136  }
   137  
   138  func WithNewSubscription(
   139  	subID, topicID string,
   140  	opts ...SubOption,
   141  ) func(ctx f2.Context, t *testing.T) f2.Context {
   142  	return func(ctx f2.Context, t *testing.T) f2.Context {
   143  		ps := FromContextT(ctx, t)
   144  		_, err := ps.CreateSubscription(ctx, topicID, subID, opts...)
   145  		if err != nil {
   146  			t.Fatal(err)
   147  		}
   148  		return ctx
   149  	}
   150  }
   151  

View as plain text