...

Package pstest

import "cloud.google.com/go/pubsub/pstest"
Overview
Index
Examples

Overview ▾

Package pstest provides a fake Cloud PubSub service for testing. It implements a simplified form of the service, suitable for unit tests. It may behave differently from the actual service in ways in which the service is non-deterministic or unspecified: timing, delivery order, etc.

This package is EXPERIMENTAL and is subject to change without notice.

See the example for usage.

Index ▾

func ResetMinAckDeadline()
func SetMinAckDeadline(n time.Duration)
func ValidateFilter(filter string) error
type GServer
    func (s *GServer) Acknowledge(_ context.Context, req *pb.AcknowledgeRequest) (*emptypb.Empty, error)
    func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error)
    func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error)
    func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*pb.Subscription, error)
    func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)
    func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error)
    func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error)
    func (s *GServer) DeleteSubscription(_ context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error)
    func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*emptypb.Empty, error)
    func (s *GServer) DetachSubscription(_ context.Context, req *pb.DetachSubscriptionRequest) (*pb.DetachSubscriptionResponse, error)
    func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error)
    func (s *GServer) GetSubscription(_ context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error)
    func (s *GServer) GetTopic(_ context.Context, req *pb.GetTopicRequest) (*pb.Topic, error)
    func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error)
    func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error)
    func (s *GServer) ListSubscriptions(_ context.Context, req *pb.ListSubscriptionsRequest) (*pb.ListSubscriptionsResponse, error)
    func (s *GServer) ListTopicSubscriptions(_ context.Context, req *pb.ListTopicSubscriptionsRequest) (*pb.ListTopicSubscriptionsResponse, error)
    func (s *GServer) ListTopics(_ context.Context, req *pb.ListTopicsRequest) (*pb.ListTopicsResponse, error)
    func (s *GServer) ModifyAckDeadline(_ context.Context, req *pb.ModifyAckDeadlineRequest) (*emptypb.Empty, error)
    func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)
    func (s *GServer) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullResponse, error)
    func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error)
    func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekResponse, error)
    func (s *GServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error
    func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error)
    func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*pb.Topic, error)
    func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequest) (*pb.ValidateMessageResponse, error)
    func (s *GServer) ValidateSchema(_ context.Context, req *pb.ValidateSchemaRequest) (*pb.ValidateSchemaResponse, error)
type Message
type Modack
type Reactor
type ReactorOptions
type Server
    func NewServer(opts ...ServerReactorOption) *Server
    func NewServerWithPort(port int, opts ...ServerReactorOption) *Server
    func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error)
    func (s *Server) ClearMessages()
    func (s *Server) Close() error
    func (s *Server) Message(id string) *Message
    func (s *Server) Messages() []*Message
    func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string
    func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]string, orderingKey string) string
    func (s *Server) ResetPublishResponses(size int)
    func (s *Server) SetAutoPublishResponse(autoPublishResponse bool)
    func (s *Server) SetStreamTimeout(d time.Duration)
    func (s *Server) SetTimeNowFunc(f func() time.Time)
    func (s *Server) Wait()
type ServerReactorOption
    func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReactorOption

Examples

NewServer
NewServerWithPort

Package files

fake.go filtering.go

func ResetMinAckDeadline

func ResetMinAckDeadline()

ResetMinAckDeadline resets the minimum ack deadline to the default.

func SetMinAckDeadline

func SetMinAckDeadline(n time.Duration)

SetMinAckDeadline changes the minimum ack deadline to n, which must be greater than or equal to 1 second. Remember to reset this value to the default after your test changes it. Example usage:

pstest.SetMinAckDeadline(1 * time.Second)
defer pstest.ResetMinAckDeadline()

func ValidateFilter

func ValidateFilter(filter string) error

ValidateFilter checks whether the filter string is parsable and returns an error if it is not.

type GServer

GServer is the underlying service implementor. It is not intended to be used directly.

type GServer struct {
    pb.UnimplementedPublisherServer
    pb.UnimplementedSubscriberServer
    pb.UnimplementedSchemaServiceServer
    // contains filtered or unexported fields
}

func (*GServer) Acknowledge

func (s *GServer) Acknowledge(_ context.Context, req *pb.AcknowledgeRequest) (*emptypb.Empty, error)

func (*GServer) CommitSchema

func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error)

func (*GServer) CreateSchema

func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error)

func (*GServer) CreateSubscription

func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*pb.Subscription, error)

func (*GServer) CreateTopic

func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)

func (*GServer) DeleteSchema

func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error)

func (*GServer) DeleteSchemaRevision

func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error)

func (*GServer) DeleteSubscription

func (s *GServer) DeleteSubscription(_ context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error)

func (*GServer) DeleteTopic

func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*emptypb.Empty, error)

func (*GServer) DetachSubscription

func (s *GServer) DetachSubscription(_ context.Context, req *pb.DetachSubscriptionRequest) (*pb.DetachSubscriptionResponse, error)

func (*GServer) GetSchema

func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error)

func (*GServer) GetSubscription

func (s *GServer) GetSubscription(_ context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error)

func (*GServer) GetTopic

func (s *GServer) GetTopic(_ context.Context, req *pb.GetTopicRequest) (*pb.Topic, error)

func (*GServer) ListSchemaRevisions

func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error)

func (*GServer) ListSchemas

func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error)

func (*GServer) ListSubscriptions

func (s *GServer) ListSubscriptions(_ context.Context, req *pb.ListSubscriptionsRequest) (*pb.ListSubscriptionsResponse, error)

func (*GServer) ListTopicSubscriptions

func (s *GServer) ListTopicSubscriptions(_ context.Context, req *pb.ListTopicSubscriptionsRequest) (*pb.ListTopicSubscriptionsResponse, error)

func (*GServer) ListTopics

func (s *GServer) ListTopics(_ context.Context, req *pb.ListTopicsRequest) (*pb.ListTopicsResponse, error)

func (*GServer) ModifyAckDeadline

func (s *GServer) ModifyAckDeadline(_ context.Context, req *pb.ModifyAckDeadlineRequest) (*emptypb.Empty, error)

func (*GServer) Publish

func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)

func (*GServer) Pull

func (s *GServer) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullResponse, error)

func (*GServer) RollbackSchema

func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error)

RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.

func (*GServer) Seek

func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekResponse, error)

func (*GServer) StreamingPull

func (s *GServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error

func (*GServer) UpdateSubscription

func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error)

func (*GServer) UpdateTopic

func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*pb.Topic, error)

func (*GServer) ValidateMessage

func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequest) (*pb.ValidateMessageResponse, error)

ValidateMessage mocks the ValidateMessage call but only checks that the schema definition to validate the message against is not empty.

func (*GServer) ValidateSchema

func (s *GServer) ValidateSchema(_ context.Context, req *pb.ValidateSchemaRequest) (*pb.ValidateSchemaResponse, error)

ValidateSchema mocks the ValidateSchema call but only checks that the schema definition is not empty.

type Message

A Message is a message that was published to the server.

type Message struct {
    ID          string
    Data        []byte
    Attributes  map[string]string
    PublishTime time.Time
    Deliveries  int      // number of times delivery of the message was attempted
    Acks        int      // number of acks received from clients
    Modacks     []Modack // modacks received by server for this message
    OrderingKey string
    // contains filtered or unexported fields
}

type Modack

Modack represents a modack sent to the server.

type Modack struct {
    AckID       string
    AckDeadline int32
    ReceivedAt  time.Time
}

type Reactor

Reactor is an interface that allows a custom reaction function to be run for a certain call.

type Reactor interface {
    // React handles the message types and returns results.  If "handled" is false,
    // then the test server will ignore the results and continue to the next reactor
    // or the original handler.
    React(_ interface{}) (handled bool, ret interface{}, err error)
}

type ReactorOptions

ReactorOptions is a map that Server uses to look up reactors. The key is the function name; the value is the slice of reactors for that function.

type ReactorOptions map[string][]Reactor

type Server

Server is a fake Pub/Sub server.

type Server struct {
    Addr    string  // The address that the server is listening on.
    GServer GServer // Not intended to be used directly.
    // contains filtered or unexported fields
}

func NewServer

func NewServer(opts ...ServerReactorOption) *Server

NewServer creates a new fake server running in the current process.

Example

Code:

ctx := context.Background()
// Start a fake server running locally.
srv := pstest.NewServer()
defer srv.Close()
// Connect to the server without using TLS.
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
if err != nil {
    // TODO: Handle error.
}
defer conn.Close()
// Use the connection when creating a pubsub client.
client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
if err != nil {
    // TODO: Handle error.
}
defer client.Close()
_ = client // TODO: Use the client.

func NewServerWithPort

func NewServerWithPort(port int, opts ...ServerReactorOption) *Server

NewServerWithPort creates a new fake server running in the current process at the specified port.

Example

Code:

ctx := context.Background()
// Start a fake server running locally at 9001.
srv := pstest.NewServerWithPort(9001)
defer srv.Close()
// Connect to the server without using TLS.
conn, err := grpc.Dial(srv.Addr, grpc.WithInsecure())
if err != nil {
    // TODO: Handle error.
}
defer conn.Close()
// Use the connection when creating a pubsub client.
client, err := pubsub.NewClient(ctx, "project", option.WithGRPCConn(conn))
if err != nil {
    // TODO: Handle error.
}
defer client.Close()
_ = client // TODO: Use the client.

func (*Server) AddPublishResponse

func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error)

AddPublishResponse adds a new publish response to the channel used for responding to publish requests.

func (*Server) ClearMessages

func (s *Server) ClearMessages()

ClearMessages removes all published messages from internal containers.

func (*Server) Close

func (s *Server) Close() error

Close shuts down the server and releases all resources.

func (*Server) Message

func (s *Server) Message(id string) *Message

Message returns the message with the given ID, or nil if no message with that ID was published.

func (*Server) Messages

func (s *Server) Messages() []*Message

Messages returns information about all messages ever published.

func (*Server) Publish

func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string

Publish behaves as if the Publish RPC was called with a message with the given data and attrs. It returns the ID of the message. The topic will be created if it doesn't exist.

Publish panics if there is an error, which is appropriate for testing.

func (*Server) PublishOrdered

func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]string, orderingKey string) string

PublishOrdered behaves as if the Publish RPC was called with a message with the given data, attrs and ordering key. It returns the ID of the message. The topic will be created if it doesn't exist.

PublishOrdered panics if there is an error, which is appropriate for testing.

func (*Server) ResetPublishResponses

func (s *Server) ResetPublishResponses(size int)

ResetPublishResponses resets the buffered publishResponses channel with a new buffered channel with the given size.

func (*Server) SetAutoPublishResponse

func (s *Server) SetAutoPublishResponse(autoPublishResponse bool)

SetAutoPublishResponse controls whether to automatically respond to messages published or to use user-added responses from the publishResponses channel.

func (*Server) SetStreamTimeout

func (s *Server) SetStreamTimeout(d time.Duration)

SetStreamTimeout sets the amount of time a stream will be active before it shuts itself down. This mimics the real service's behavior of closing streams after 30 minutes. If SetStreamTimeout is never called or is passed zero, streams never shut down.

func (*Server) SetTimeNowFunc

func (s *Server) SetTimeNowFunc(f func() time.Time)

SetTimeNowFunc registers f as a function to be used instead of time.Now for this server.

func (*Server) Wait

func (s *Server) Wait()

Wait blocks until all server activity has completed.

type ServerReactorOption

ServerReactorOption is an option passed to the server for reactor creation.

type ServerReactorOption struct {
    FuncName string
    Reactor  Reactor
}

func WithErrorInjection

func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReactorOption

WithErrorInjection creates a ServerReactorOption that injects an error with the given status code and message for a certain function.