...

Source file src/edge-infra.dev/pkg/f8n/devinfra/gcp/pubsub/pubsub.go

Documentation: edge-infra.dev/pkg/f8n/devinfra/gcp/pubsub

     1  package pubsub
     2  
     3  import (
     4  	"context"
     5  	"sync"
     6  
     7  	"cloud.google.com/go/pubsub"
     8  	"github.com/go-logr/logr"
     9  
    10  	"edge-infra.dev/pkg/lib/logging"
    11  )
    12  
    13  const (
    14  	DefaultTopic        = "changelog-topic"
    15  	DefaultSubscription = "changelog-subscription"
    16  	DefaultProject      = "1037378570607"
    17  )
    18  
    19  type PubSub struct {
    20  	ctx     context.Context
    21  	client  *pubsub.Client
    22  	log     logr.Logger
    23  	topic   string
    24  	sub     string
    25  	project string
    26  }
    27  
    28  type Opts func(*PubSub)
    29  
    30  // New creates a new PubSub client
    31  func New(opts ...Opts) (*PubSub, error) {
    32  	ps := &PubSub{
    33  		ctx:     context.Background(),
    34  		sub:     DefaultSubscription,
    35  		topic:   DefaultTopic,
    36  		project: DefaultProject,
    37  		log:     logging.NewLogger().WithName("pubsub"),
    38  	}
    39  
    40  	for _, opt := range opts {
    41  		opt(ps)
    42  	}
    43  
    44  	if ps.client == nil {
    45  		c, err := pubsub.NewClient(ps.ctx, ps.project)
    46  		if err != nil {
    47  			ps.log.Info("error creating client", "err", err)
    48  		}
    49  		ps.client = c
    50  	}
    51  
    52  	return ps, nil
    53  }
    54  
    55  func (ps *PubSub) Publish(PR []byte) {
    56  	result := ps.client.Topic(ps.topic).Publish(ps.ctx, &pubsub.Message{
    57  		Data: PR,
    58  	})
    59  
    60  	var wg sync.WaitGroup
    61  
    62  	wg.Add(1)
    63  	go func(res *pubsub.PublishResult) {
    64  		defer wg.Done()
    65  
    66  		id, err := res.Get(ps.ctx)
    67  		if err != nil {
    68  			ps.log.Info("Failed to publish", "err", err)
    69  			return
    70  		}
    71  		ps.log.Info("message was published", "ID", id)
    72  	}(result)
    73  
    74  	wg.Wait()
    75  }
    76  
    77  func (ps *PubSub) Subscribe() *pubsub.Subscription {
    78  	return ps.client.Subscription(ps.sub)
    79  }
    80  
    81  func WithTopic(topic string) Opts {
    82  	return func(p *PubSub) {
    83  		p.topic = topic
    84  	}
    85  }
    86  
    87  func WithSubscription(sub string) Opts {
    88  	return func(p *PubSub) {
    89  		p.sub = sub
    90  	}
    91  }
    92  
    93  func WithProject(project string) Opts {
    94  	return func(p *PubSub) {
    95  		p.project = project
    96  	}
    97  }
    98  
    99  func WithContext(ctx context.Context) Opts {
   100  	return func(p *PubSub) {
   101  		p.ctx = ctx
   102  	}
   103  }
   104  

View as plain text