1 package pstest
2
3 import (
4 "context"
5 "fmt"
6 "testing"
7
8 _ "edge-infra.dev/test/f2/integration"
9
10 "cloud.google.com/go/pubsub"
11 "cloud.google.com/go/pubsub/pstest"
12 "google.golang.org/api/option"
13 "google.golang.org/grpc"
14 "google.golang.org/grpc/credentials/insecure"
15
16 "edge-infra.dev/test/f2"
17 "edge-infra.dev/test/f2/fctx"
18 )
19
20 type PubSub struct {
21 client *pubsub.Client
22
23
24
25
26
27
28
29
30 Conn *grpc.ClientConn
31
32 options *pubsuboptions
33 }
34
35
36 func New(opts ...PubSubOption) (ps *PubSub) {
37 o := makePubSubOptions(opts...)
38 return &PubSub{
39 options: o,
40 }
41 }
42
43
44
45 func FromContext(ctx context.Context) (*PubSub, error) {
46 v := fctx.ValueFrom[PubSub](ctx)
47 if v == nil {
48 return nil, fmt.Errorf("%w: pstest.PubSub extension", fctx.ErrNotFound)
49 }
50 return v, nil
51 }
52
53
54
55 func FromContextT(ctx fctx.Context, t *testing.T) *PubSub {
56 return fctx.ValueFromT[PubSub](ctx, t)
57 }
58
59
60 func (ps *PubSub) IntoContext(ctx f2.Context) f2.Context {
61 return fctx.ValueInto(ctx, ps)
62 }
63
64
65
66 func (ps *PubSub) RegisterFns(f f2.Framework) {
67 var server *pstest.Server
68 f.BeforeEachTest(func(ctx fctx.Context, t *testing.T) (fctx.Context, error) {
69 server = pstest.NewServerWithPort(ps.options.port)
70 t.Setenv("PUBSUB_EMULATOR_HOST", server.Addr)
71
72 var err error
73 ps.Conn, err = grpc.NewClient(server.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
74 if err != nil {
75 return ctx, err
76 }
77
78 ps.client, err = pubsub.NewClient(ctx, ps.options.projectID, option.WithGRPCConn(ps.Conn))
79 if err != nil {
80 return ctx, err
81 }
82
83 return ctx, nil
84 }).AfterEachTest(func(ctx fctx.Context, _ *testing.T) (fctx.Context, error) {
85 ps.client.Close()
86 ps.Conn.Close()
87 server.Close()
88 return ctx, nil
89 })
90 }
91
92
93 func (ps *PubSub) CreateTopic(ctx context.Context, topicID string) (*pubsub.Topic, error) {
94 return ps.client.CreateTopic(ctx, topicID)
95 }
96
97
98 func (ps *PubSub) CreateSubscription(ctx context.Context, topicID, subID string, opts ...SubOption) (*pubsub.Subscription, error) {
99 o := makeSubOptions(opts...)
100 top := ps.client.Topic(topicID)
101 return ps.client.CreateSubscription(ctx, subID, pubsub.SubscriptionConfig{
102 Topic: top,
103 AckDeadline: o.ackDeadline,
104 ExpirationPolicy: o.expirationPolicy,
105 Filter: o.filter,
106 })
107 }
108
109
110 func (ps *PubSub) Publish(ctx context.Context, topicID, data string, attributes map[string]string) (msgID string, err error) {
111 top := ps.client.Topic(topicID)
112 res := top.Publish(ctx, &pubsub.Message{
113 Data: []byte(data),
114 Attributes: attributes,
115 })
116 return res.Get(ctx)
117 }
118
119
120 func (ps *PubSub) Subscribe(ctx context.Context, subID string, receiveFunc func(context.Context, *pubsub.Message)) error {
121 sub := ps.client.Subscription(subID)
122 return sub.Receive(ctx, receiveFunc)
123 }
124
125
126
127 func WithNewTopic(topicID string) func(ctx f2.Context, t *testing.T) f2.Context {
128 return func(ctx f2.Context, t *testing.T) f2.Context {
129 ps := FromContextT(ctx, t)
130 _, err := ps.CreateTopic(ctx, topicID)
131 if err != nil {
132 t.Fatal(err)
133 }
134 return ctx
135 }
136 }
137
138 func WithNewSubscription(
139 subID, topicID string,
140 opts ...SubOption,
141 ) func(ctx f2.Context, t *testing.T) f2.Context {
142 return func(ctx f2.Context, t *testing.T) f2.Context {
143 ps := FromContextT(ctx, t)
144 _, err := ps.CreateSubscription(ctx, topicID, subID, opts...)
145 if err != nil {
146 t.Fatal(err)
147 }
148 return ctx
149 }
150 }
151
View as plain text