package pstest import ( "context" "fmt" "testing" _ "edge-infra.dev/test/f2/integration" // Required for registering integration test flag "cloud.google.com/go/pubsub" "cloud.google.com/go/pubsub/pstest" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" "edge-infra.dev/test/f2" "edge-infra.dev/test/f2/fctx" ) type PubSub struct { client *pubsub.Client // Conn provides the following: // // 1. Test subscriptions losing connection by calling // ps.Conn.Close() // // 2. Create pubsub clients in different projects by calling // pubsub.NewClient(ctx, "projectID", option.WithGRPCConn(ps.Conn)) Conn *grpc.ClientConn options *pubsuboptions } // Create a new mock PubSub instance. func New(opts ...PubSubOption) (ps *PubSub) { o := makePubSubOptions(opts...) return &PubSub{ options: o, } } // FromContext attempts to fetch an instance of PubSub from the test context and // returns an error if it is not discovered. func FromContext(ctx context.Context) (*PubSub, error) { v := fctx.ValueFrom[PubSub](ctx) if v == nil { return nil, fmt.Errorf("%w: pstest.PubSub extension", fctx.ErrNotFound) } return v, nil } // FromContextT is a testing variant of FromContext that immediately fails the // test if PubSub is not present in the testing context. func FromContextT(ctx fctx.Context, t *testing.T) *PubSub { return fctx.ValueFromT[PubSub](ctx, t) } // IntoContext stores the framework extension in the test context. func (ps *PubSub) IntoContext(ctx f2.Context) f2.Context { return fctx.ValueInto(ctx, ps) } // RegisterFns is called by the framework after binding and parsing test flags (not currently implemented here). // Ensures that every test has a fresh instance of PubSub. func (ps *PubSub) RegisterFns(f f2.Framework) { var server *pstest.Server f.BeforeEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) { server = pstest.NewServerWithPort(ps.options.port) t.Setenv("PUBSUB_EMULATOR_HOST", server.Addr) var err error ps.Conn, err = grpc.NewClient(server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return ctx, err } ps.client, err = pubsub.NewClient(ctx, ps.options.projectID, option.WithGRPCConn(ps.Conn)) if err != nil { return ctx, err } return ctx, nil }).AfterEachTest(func(ctx fctx.Context, _ *testing.T) (fctx.Context, error) { ps.client.Close() ps.Conn.Close() server.Close() return ctx, nil }) } // Creates a new topic in the PubSub instance. func (ps *PubSub) CreateTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) { return ps.client.CreateTopic(ctx, topicID) } // Creates a new subscription to a given topic in the Pubsub instance. func (ps *PubSub) CreateSubscription(ctx context.Context, topicID, subID string, opts ...SubOption) (*pubsub.Subscription, error) { o := makeSubOptions(opts...) top := ps.client.Topic(topicID) return ps.client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{ Topic: top, AckDeadline: o.ackDeadline, ExpirationPolicy: o.expirationPolicy, Filter: o.filter, }) } // Publishes a pubsub message to all subscribers of a given topic func (ps *PubSub) Publish(ctx context.Context, topicID, data string, attributes map[string]string) (msgID string, err error) { top := ps.client.Topic(topicID) res := top.Publish(ctx, &pubsub.Message{ Data: []byte(data), Attributes: attributes, }) return res.Get(ctx) } // Starts an active subscription with a given receive function func (ps *PubSub) Subscribe(ctx context.Context, subID string, receiveFunc func(context.Context, *pubsub.Message)) error { sub := ps.client.Subscription(subID) return sub.Receive(ctx, receiveFunc) } // StepFns func WithNewTopic(topicID string) func(ctx f2.Context, t *testing.T) f2.Context { return func(ctx f2.Context, t *testing.T) f2.Context { ps := FromContextT(ctx, t) _, err := ps.CreateTopic(ctx, topicID) if err != nil { t.Fatal(err) } return ctx } } func WithNewSubscription( subID, topicID string, opts ...SubOption, ) func(ctx f2.Context, t *testing.T) f2.Context { return func(ctx f2.Context, t *testing.T) f2.Context { ps := FromContextT(ctx, t) _, err := ps.CreateSubscription(ctx, topicID, subID, opts...) if err != nil { t.Fatal(err) } return ctx } }