...

Source file src/edge-infra.dev/test/framework/gcp/pubsub/pubsub.go

Documentation: edge-infra.dev/test/framework/gcp/pubsub

     1  package pubsub
     2  
     3  import (
     4  	"context"
     5  	"time"
     6  
     7  	"cloud.google.com/go/pubsub/pstest"
     8  	"google.golang.org/api/option"
     9  	"google.golang.org/grpc"
    10  	"google.golang.org/grpc/credentials/insecure"
    11  
    12  	pubSub "cloud.google.com/go/pubsub"
    13  
    14  	"edge-infra.dev/pkg/lib/gcp/pubsub"
    15  )
    16  
    17  type MockPubsubServerResponse struct {
    18  	PubsubServer *pstest.Server
    19  	Conn         *grpc.ClientConn
    20  }
    21  
    22  func SetupMockPubsubServer() (*MockPubsubServerResponse, error) {
    23  	pubsubsrv := pstest.NewServer()
    24  	conn, err := grpc.NewClient(pubsubsrv.Addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
    25  	if err != nil {
    26  		return nil, err
    27  	}
    28  	return &MockPubsubServerResponse{
    29  		PubsubServer: pubsubsrv,
    30  		Conn:         conn,
    31  	}, nil
    32  }
    33  
    34  // NewMockPubsubServer creates a new mock pubsub server and returns a client option.
    35  func NewMockPubsubServer() (option.ClientOption, error) {
    36  	res, err := SetupMockPubsubServer()
    37  	if err != nil {
    38  		return nil, err
    39  	}
    40  	pubsubGrpcClient := option.WithGRPCConn(res.Conn)
    41  	return pubsubGrpcClient, nil
    42  }
    43  
    44  // CreateMockTopic creates a mock pubsub topic.
    45  func CreateMockTopic(ctx context.Context, projectID, topicID string, opts ...option.ClientOption) (*pubSub.Topic, error) {
    46  	pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
    47  	if err != nil {
    48  		return nil, err
    49  	}
    50  	return pubSubService.CreateTopic(ctx, topicID)
    51  }
    52  
    53  // SendMessage publishes a pubsub message to the specified topic.
    54  func SendMessage(ctx context.Context, projectID, topicID, message string, attributes map[string]string, opts ...option.ClientOption) error {
    55  	pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
    56  	if err != nil {
    57  		return err
    58  	}
    59  	return pubSubService.Send(ctx, topicID, []byte(message), attributes)
    60  }
    61  
    62  // CreateMockSubscription creates a mock pubsub subscription and receives an assertion function to assert the pubsub message.
    63  func CreateMockSubscription(ctx context.Context, done chan<- bool, projectID, topicID, subscriptionID string, deadline time.Duration, filter string, assert func(msg *pubSub.Message), opts ...option.ClientOption) error {
    64  	pubSubService, err := pubsub.NewWithOptions(ctx, projectID, opts...)
    65  	if err != nil {
    66  		return err
    67  	}
    68  	subscription, err := pubSubService.CreateSubscription(ctx, topicID, subscriptionID, deadline, filter)
    69  	if err != nil {
    70  		return err
    71  	}
    72  	return subscription.Receive(ctx, func(_ context.Context, msg *pubSub.Message) {
    73  		assert(msg)
    74  		done <- true
    75  		msg.Ack()
    76  	})
    77  }
    78  

View as plain text