package pubsub import ( "context" "time" "cloud.google.com/go/pubsub/pstest" "google.golang.org/api/option" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" pubSub "cloud.google.com/go/pubsub" "edge-infra.dev/pkg/lib/gcp/pubsub" ) type MockPubsubServerResponse struct { PubsubServer *pstest.Server Conn *grpc.ClientConn } func SetupMockPubsubServer() (*MockPubsubServerResponse, error) { pubsubsrv := pstest.NewServer() conn, err := grpc.NewClient(pubsubsrv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials())) if err != nil { return nil, err } return &MockPubsubServerResponse{ PubsubServer: pubsubsrv, Conn: conn, }, nil } // NewMockPubsubServer creates a new mock pubsub server and returns a client option. func NewMockPubsubServer() (option.ClientOption, error) { res, err := SetupMockPubsubServer() if err != nil { return nil, err } pubsubGrpcClient := option.WithGRPCConn(res.Conn) return pubsubGrpcClient, nil } // CreateMockTopic creates a mock pubsub topic. func CreateMockTopic(ctx context.Context, projectID, topicID string, opts ...option.ClientOption) (*pubSub.Topic, error) { pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...) if err != nil { return nil, err } return pubSubService.CreateTopic(ctx, topicID) } // SendMessage publishes a pubsub message to the specified topic. func SendMessage(ctx context.Context, projectID, topicID, message string, attributes map[string]string, opts ...option.ClientOption) error { pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...) if err != nil { return err } return pubSubService.Send(ctx, topicID, []byte(message), attributes) } // CreateMockSubscription creates a mock pubsub subscription and receives an assertion function to assert the pubsub message. func CreateMockSubscription(ctx context.Context, done chan<- bool, projectID, topicID, subscriptionID string, deadline time.Duration, filter string, assert func(msg *pubSub.Message), opts ...option.ClientOption) error { pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...) if err != nil { return err } subscription, err := pubSubService.CreateSubscription(ctx, topicID, subscriptionID, deadline, filter) if err != nil { return err } return subscription.Receive(ctx, func(_ context.Context, msg *pubSub.Message) { assert(msg) done <- true msg.Ack() }) }