// Package pubsub provides a small wrapper around the Google Cloud Pub/Sub
// client, preconfigured with default topic, subscription, and project values.
package pubsub

import (
	"context"
	"fmt"
	"sync"

	"cloud.google.com/go/pubsub"
	"github.com/go-logr/logr"

	"edge-infra.dev/pkg/lib/logging"
)

// Defaults applied by New when no overriding option is supplied.
const (
	DefaultTopic        = "changelog-topic"
	DefaultSubscription = "changelog-subscription"
	DefaultProject      = "1037378570607"
)

// PubSub bundles a Pub/Sub client with the topic, subscription, and project
// it operates on. Construct it with New; methods use pointer receivers and the
// struct must not be copied after first use because it embeds a sync.Once.
type PubSub struct {
	// ctx is used for client creation and for every publish call.
	ctx     context.Context
	client  *pubsub.Client
	log     logr.Logger
	topic   string
	sub     string
	project string

	// publishOnce guards the lazy creation of publisher so that a single
	// topic handle is shared by all Publish calls.
	publishOnce sync.Once
	publisher   *pubsub.Topic
}

// Opts is a functional option that mutates a PubSub during construction.
type Opts func(*PubSub)

// New creates a PubSub configured with the package defaults and then applies
// opts in order. If no option supplied a client, one is created for the
// configured project using the configured context.
//
// It returns a nil *PubSub and a non-nil error when the client cannot be
// created, so callers never receive a wrapper with a nil client.
func New(opts ...Opts) (*PubSub, error) {
	ps := &PubSub{
		ctx:     context.Background(),
		sub:     DefaultSubscription,
		topic:   DefaultTopic,
		project: DefaultProject,
		log:     logging.NewLogger().WithName("pubsub"),
	}
	for _, opt := range opts {
		opt(ps)
	}

	if ps.client == nil {
		c, err := pubsub.NewClient(ps.ctx, ps.project)
		if err != nil {
			return nil, fmt.Errorf("creating pubsub client for project %q: %w", ps.project, err)
		}
		ps.client = c
	}
	return ps, nil
}

// Publish sends pr as the data of a single message to the configured topic
// and blocks until the publish result is available. The outcome is only
// logged: the server-assigned message ID on success, the error on failure.
func (ps *PubSub) Publish(pr []byte) {
	// Reuse one topic handle for every call instead of creating a new one per
	// message; the handle is created lazily so options applied in New are
	// already in effect.
	ps.publishOnce.Do(func() {
		ps.publisher = ps.client.Topic(ps.topic)
	})

	result := ps.publisher.Publish(ps.ctx, &pubsub.Message{Data: pr})

	// Get blocks until the publish completes, so no extra goroutine is needed.
	id, err := result.Get(ps.ctx)
	if err != nil {
		ps.log.Error(err, "Failed to publish")
		return
	}
	ps.log.Info("message was published", "ID", id)
}

// Subscribe returns the client's handle for the configured subscription.
// It only obtains the handle; receiving messages is left to the caller.
func (ps *PubSub) Subscribe() *pubsub.Subscription {
	return ps.client.Subscription(ps.sub)
}

// WithTopic overrides the topic name used by Publish.
func WithTopic(topic string) Opts {
	return func(p *PubSub) {
		p.topic = topic
	}
}

// WithSubscription overrides the subscription name used by Subscribe.
func WithSubscription(sub string) Opts {
	return func(p *PubSub) {
		p.sub = sub
	}
}

// WithProject overrides the project passed to the Pub/Sub client constructor.
func WithProject(project string) Opts {
	return func(p *PubSub) {
		p.project = project
	}
}

// WithContext overrides the context used for client creation and publishing.
func WithContext(ctx context.Context) Opts {
	return func(p *PubSub) {
		p.ctx = ctx
	}
}