1 package pubsub
2
3 import (
4 "context"
5 "time"
6
7 "cloud.google.com/go/pubsub/pstest"
8 "google.golang.org/api/option"
9 "google.golang.org/grpc"
10 "google.golang.org/grpc/credentials/insecure"
11
12 pubSub "cloud.google.com/go/pubsub"
13
14 "edge-infra.dev/pkg/lib/gcp/pubsub"
15 )
16
17 type MockPubsubServerResponse struct {
18 PubsubServer *pstest.Server
19 Conn *grpc.ClientConn
20 }
21
22 func SetupMockPubsubServer() (*MockPubsubServerResponse, error) {
23 pubsubsrv := pstest.NewServer()
24 conn, err := grpc.NewClient(pubsubsrv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
25 if err != nil {
26 return nil, err
27 }
28 return &MockPubsubServerResponse{
29 PubsubServer: pubsubsrv,
30 Conn: conn,
31 }, nil
32 }
33
34
35 func NewMockPubsubServer() (option.ClientOption, error) {
36 res, err := SetupMockPubsubServer()
37 if err != nil {
38 return nil, err
39 }
40 pubsubGrpcClient := option.WithGRPCConn(res.Conn)
41 return pubsubGrpcClient, nil
42 }
43
44
45 func CreateMockTopic(ctx context.Context, projectID, topicID string, opts ...option.ClientOption) (*pubSub.Topic, error) {
46 pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
47 if err != nil {
48 return nil, err
49 }
50 return pubSubService.CreateTopic(ctx, topicID)
51 }
52
53
54 func SendMessage(ctx context.Context, projectID, topicID, message string, attributes map[string]string, opts ...option.ClientOption) error {
55 pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
56 if err != nil {
57 return err
58 }
59 return pubSubService.Send(ctx, topicID, []byte(message), attributes)
60 }
61
62
63 func CreateMockSubscription(ctx context.Context, done chan<- bool, projectID, topicID, subscriptionID string, deadline time.Duration, filter string, assert func(msg *pubSub.Message), opts ...option.ClientOption) error {
64 pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
65 if err != nil {
66 return err
67 }
68 subscription, err := pubSubService.CreateSubscription(ctx, topicID, subscriptionID, deadline, filter)
69 if err != nil {
70 return err
71 }
72 return subscription.Receive(ctx, func(_ context.Context, msg *pubSub.Message) {
73 assert(msg)
74 done <- true
75 msg.Ack()
76 })
77 }
78
View as plain text