func ResetMinAckDeadline()
ResetMinAckDeadline resets the minack deadline to the default.
func SetMinAckDeadline(n time.Duration)
SetMinAckDeadline changes the minack deadline to n, which must be greater than or equal to 1 second. Remember to reset this value to the default after your test changes it. Example usage:
pstest.SetMinAckDeadline(1 * time.Second)
defer pstest.ResetMinAckDeadline()
func ValidateFilter(filter string) error
ValidateFilter checks whether the filter string can be parsed.
GServer is the underlying service implementor. It is not intended to be used directly.
type GServer struct {
	pb.UnimplementedPublisherServer
	pb.UnimplementedSubscriberServer
	pb.UnimplementedSchemaServiceServer
	// contains filtered or unexported fields
}
func (s *GServer) Acknowledge(_ context.Context, req *pb.AcknowledgeRequest) (*emptypb.Empty, error)
func (s *GServer) CommitSchema(_ context.Context, req *pb.CommitSchemaRequest) (*pb.Schema, error)
func (s *GServer) CreateSchema(_ context.Context, req *pb.CreateSchemaRequest) (*pb.Schema, error)
func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*pb.Subscription, error)
func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error)
func (s *GServer) DeleteSchema(_ context.Context, req *pb.DeleteSchemaRequest) (*emptypb.Empty, error)
func (s *GServer) DeleteSchemaRevision(_ context.Context, req *pb.DeleteSchemaRevisionRequest) (*pb.Schema, error)
func (s *GServer) DeleteSubscription(_ context.Context, req *pb.DeleteSubscriptionRequest) (*emptypb.Empty, error)
func (s *GServer) DeleteTopic(_ context.Context, req *pb.DeleteTopicRequest) (*emptypb.Empty, error)
func (s *GServer) DetachSubscription(_ context.Context, req *pb.DetachSubscriptionRequest) (*pb.DetachSubscriptionResponse, error)
func (s *GServer) GetSchema(_ context.Context, req *pb.GetSchemaRequest) (*pb.Schema, error)
func (s *GServer) GetSubscription(_ context.Context, req *pb.GetSubscriptionRequest) (*pb.Subscription, error)
func (s *GServer) GetTopic(_ context.Context, req *pb.GetTopicRequest) (*pb.Topic, error)
func (s *GServer) ListSchemaRevisions(_ context.Context, req *pb.ListSchemaRevisionsRequest) (*pb.ListSchemaRevisionsResponse, error)
func (s *GServer) ListSchemas(_ context.Context, req *pb.ListSchemasRequest) (*pb.ListSchemasResponse, error)
func (s *GServer) ListSubscriptions(_ context.Context, req *pb.ListSubscriptionsRequest) (*pb.ListSubscriptionsResponse, error)
func (s *GServer) ListTopicSubscriptions(_ context.Context, req *pb.ListTopicSubscriptionsRequest) (*pb.ListTopicSubscriptionsResponse, error)
func (s *GServer) ListTopics(_ context.Context, req *pb.ListTopicsRequest) (*pb.ListTopicsResponse, error)
func (s *GServer) ModifyAckDeadline(_ context.Context, req *pb.ModifyAckDeadlineRequest) (*emptypb.Empty, error)
func (s *GServer) Publish(_ context.Context, req *pb.PublishRequest) (*pb.PublishResponse, error)
func (s *GServer) Pull(ctx context.Context, req *pb.PullRequest) (*pb.PullResponse, error)
func (s *GServer) RollbackSchema(_ context.Context, req *pb.RollbackSchemaRequest) (*pb.Schema, error)
RollbackSchema rolls back the current schema to a previous revision by copying and creating a new revision.
func (s *GServer) Seek(ctx context.Context, req *pb.SeekRequest) (*pb.SeekResponse, error)
func (s *GServer) StreamingPull(sps pb.Subscriber_StreamingPullServer) error
func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscriptionRequest) (*pb.Subscription, error)
func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*pb.Topic, error)
func (s *GServer) ValidateMessage(_ context.Context, req *pb.ValidateMessageRequest) (*pb.ValidateMessageResponse, error)
ValidateMessage mocks the ValidateMessage call but only checks that the schema definition to validate the message against is not empty.
func (s *GServer) ValidateSchema(_ context.Context, req *pb.ValidateSchemaRequest) (*pb.ValidateSchemaResponse, error)
ValidateSchema mocks the ValidateSchema call but only checks that the schema definition is not empty.
A Message is a message that was published to the server.
type Message struct {
	ID          string
	Data        []byte
	Attributes  map[string]string
	PublishTime time.Time
	Deliveries  int      // number of times delivery of the message was attempted
	Acks        int      // number of acks received from clients
	Modacks     []Modack // modacks received by server for this message
	OrderingKey string
	// contains filtered or unexported fields
}
Modack represents a modack sent to the server.
type Modack struct { AckID string AckDeadline int32 ReceivedAt time.Time }
Reactor is an interface that allows a custom reaction function to handle a certain call.
type Reactor interface {
	// React handles the message types and returns results. If "handled" is false,
	// then the test server will ignore the results and continue to the next reactor
	// or the original handler.
	React(_ interface{}) (handled bool, ret interface{}, err error)
}
ReactorOptions is a map that Server uses to look up reactors. The key is the function name; the value is the slice of reactors for that function.
type ReactorOptions map[string][]Reactor
Server is a fake Pub/Sub server.
type Server struct {
	Addr    string  // The address that the server is listening on.
	GServer GServer // Not intended to be used directly.
	// contains filtered or unexported fields
}
func NewServer(opts ...ServerReactorOption) *Server
NewServer creates a new fake server running in the current process.
▹ Example
func NewServerWithPort(port int, opts ...ServerReactorOption) *Server
NewServerWithPort creates a new fake server running in the current process at the specified port.
▹ Example
func (s *Server) AddPublishResponse(pbr *pb.PublishResponse, err error)
AddPublishResponse adds a new publish response to the channel used for responding to publish requests.
func (s *Server) ClearMessages()
ClearMessages removes all published messages from internal containers.
func (s *Server) Close() error
Close shuts down the server and releases all resources.
func (s *Server) Message(id string) *Message
Message returns the message with the given ID, or nil if no message with that ID was published.
func (s *Server) Messages() []*Message
Messages returns information about all messages ever published.
func (s *Server) Publish(topic string, data []byte, attrs map[string]string) string
Publish behaves as if the Publish RPC was called with a message with the given data and attrs. It returns the ID of the message. The topic will be created if it doesn't exist.
Publish panics if there is an error, which is appropriate for testing.
func (s *Server) PublishOrdered(topic string, data []byte, attrs map[string]string, orderingKey string) string
PublishOrdered behaves as if the Publish RPC was called with a message with the given data, attrs and ordering key. It returns the ID of the message. The topic will be created if it doesn't exist.
PublishOrdered panics if there is an error, which is appropriate for testing.
func (s *Server) ResetPublishResponses(size int)
ResetPublishResponses replaces the buffered publishResponses channel with a new buffered channel of the given size.
func (s *Server) SetAutoPublishResponse(autoPublishResponse bool)
SetAutoPublishResponse controls whether to automatically respond to messages published or to use user-added responses from the publishResponses channel.
func (s *Server) SetStreamTimeout(d time.Duration)
SetStreamTimeout sets the amount of time a stream will be active before it shuts itself down. This mimics the real service's behavior of closing streams after 30 minutes. If SetStreamTimeout is never called or is passed zero, streams never shut down.
func (s *Server) SetTimeNowFunc(f func() time.Time)
SetTimeNowFunc registers f as a function to be used instead of time.Now for this server.
func (s *Server) Wait()
Wait blocks until all server activity has completed.
ServerReactorOption is an option passed to the server for reactor creation.
type ServerReactorOption struct { FuncName string Reactor Reactor }
func WithErrorInjection(funcName string, code codes.Code, msg string) ServerReactorOption
WithErrorInjection creates a ServerReactorOption that injects an error with the given status code and message for a certain function.