const ( // ScopePubSub grants permissions to view and manage Pub/Sub // topics and subscriptions. ScopePubSub = "https://www.googleapis.com/auth/pubsub" // ScopeCloudPlatform grants permissions to view and manage your data // across Google Cloud Platform services. ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform" )
const ( // BigQueryConfigStateUnspecified is the default value. This value is unused. BigQueryConfigStateUnspecified = iota // BigQueryConfigActive means the subscription can actively send messages to BigQuery. BigQueryConfigActive // BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors. BigQueryConfigPermissionDenied // BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist. BigQueryConfigNotFound // BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch. BigQueryConfigSchemaMismatch )
const ( // CloudStorageConfigStateUnspecified is the default value. This value is unused. CloudStorageConfigStateUnspecified = iota // CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage. CloudStorageConfigActive // CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud storage bucket because of permission denied errors. CloudStorageConfigPermissionDenied // CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist. CloudStorageConfigNotFound )
const ( // SubscriptionStateUnspecified is the default value. This value is unused. SubscriptionStateUnspecified = iota // SubscriptionStateActive means the subscription can actively send messages to BigQuery. SubscriptionStateActive // SubscriptionStateResourceError means the subscription receive messages because of an // error with the resource to which it pushes messages. // See the more detailed error state in the corresponding configuration. SubscriptionStateResourceError )
const ( // MaxPublishRequestCount is the maximum number of messages that can be in // a single publish request, as defined by the PubSub service. MaxPublishRequestCount = 1000 // MaxPublishRequestBytes is the maximum size of a single publish request // in bytes, as defined by the PubSub service. MaxPublishRequestBytes = 1e7 )
const ( // TopicStateUnspecified is the default value. This value is unused. TopicStateUnspecified = iota // TopicStateActive means the topic does not have any persistent errors. TopicStateActive // TopicStateIngestionResourceError means ingestion from the data source // has encountered a permanent error. // See the more detailed error state in the corresponding ingestion // source configuration. TopicStateIngestionResourceError )
const ( // AWSKinesisStateUnspecified is the default value. This value is unused. AWSKinesisStateUnspecified = iota // AWSKinesisStateActive means ingestion is active. AWSKinesisStateActive // AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis. // This can happen if: // - The provided `aws_role_arn` does not exist or does not have the // appropriate permissions attached. // - The provided `aws_role_arn` is not set up properly for Identity // Federation using `gcp_service_account`. // - The Pub/Sub SA is not granted the // `iam.serviceAccounts.getOpenIdToken` permission on // `gcp_service_account`. AWSKinesisStatePermissionDenied // AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic. // This can happen due to Pub/Sub SA has not been granted the appropriate publish // permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher AWSKinesisStatePublishPermissionDenied // AWSKinesisStateStreamNotFound means the Kinesis stream does not exist. AWSKinesisStateStreamNotFound // AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist. AWSKinesisStateConsumerNotFound )
DetectProjectID is a sentinel value that instructs NewClient to detect the project ID. It is given in place of the projectID argument. NewClient will use the project ID from the given credentials or the default credentials (https://developers.google.com/accounts/docs/application-default-credentials) if no credentials were provided. When providing credentials, not all options will allow NewClient to extract the project ID. Specifically a JWT does not have the project ID encoded.
const DetectProjectID = "*detect-project-id*"
var ( // ErrFlowControllerMaxOutstandingMessages indicates that outstanding messages exceeds MaxOutstandingMessages. ErrFlowControllerMaxOutstandingMessages = errors.New("pubsub: MaxOutstandingMessages flow controller limit exceeded") // ErrFlowControllerMaxOutstandingBytes indicates that outstanding bytes of messages exceeds MaxOutstandingBytes. ErrFlowControllerMaxOutstandingBytes = errors.New("pubsub: MaxOutstandingBytes flow control limit exceeded") )
The following are measures recorded in publish/subscribe flows.
var ( // PublishedMessages is a measure of the number of messages published, which may include errors. // It is EXPERIMENTAL and subject to change or removal without notice. PublishedMessages = stats.Int64(statsPrefix+"published_messages", "Number of PubSub message published", stats.UnitDimensionless) // PublishLatency is a measure of the number of milliseconds it took to publish a bundle, // which may consist of one or more messages. // It is EXPERIMENTAL and subject to change or removal without notice. PublishLatency = stats.Float64(statsPrefix+"publish_roundtrip_latency", "The latency in milliseconds per publish batch", stats.UnitMilliseconds) // PullCount is a measure of the number of messages pulled. // It is EXPERIMENTAL and subject to change or removal without notice. PullCount = stats.Int64(statsPrefix+"pull_count", "Number of PubSub messages pulled", stats.UnitDimensionless) // AckCount is a measure of the number of messages acked. // It is EXPERIMENTAL and subject to change or removal without notice. AckCount = stats.Int64(statsPrefix+"ack_count", "Number of PubSub messages acked", stats.UnitDimensionless) // NackCount is a measure of the number of messages nacked. // It is EXPERIMENTAL and subject to change or removal without notice. NackCount = stats.Int64(statsPrefix+"nack_count", "Number of PubSub messages nacked", stats.UnitDimensionless) // ModAckCount is a measure of the number of messages whose ack-deadline was modified. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckCount = stats.Int64(statsPrefix+"mod_ack_count", "Number of ack-deadlines modified", stats.UnitDimensionless) // ModAckTimeoutCount is a measure of the number ModifyAckDeadline RPCs that timed out. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckTimeoutCount = stats.Int64(statsPrefix+"mod_ack_timeout_count", "Number of ModifyAckDeadline RPCs that timed out", stats.UnitDimensionless) // StreamOpenCount is a measure of the number of times a streaming-pull stream was opened. // It is EXPERIMENTAL and subject to change or removal without notice. StreamOpenCount = stats.Int64(statsPrefix+"stream_open_count", "Number of calls opening a new streaming pull", stats.UnitDimensionless) // StreamRetryCount is a measure of the number of times a streaming-pull operation was retried. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRetryCount = stats.Int64(statsPrefix+"stream_retry_count", "Number of retries of a stream send or receive", stats.UnitDimensionless) // StreamRequestCount is a measure of the number of requests sent on a streaming-pull stream. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRequestCount = stats.Int64(statsPrefix+"stream_request_count", "Number gRPC StreamingPull request messages sent", stats.UnitDimensionless) // StreamResponseCount is a measure of the number of responses received on a streaming-pull stream. // It is EXPERIMENTAL and subject to change or removal without notice. StreamResponseCount = stats.Int64(statsPrefix+"stream_response_count", "Number of gRPC StreamingPull response messages received", stats.UnitDimensionless) // OutstandingMessages is a measure of the number of outstanding messages held by the client before they are processed. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingMessages = stats.Int64(statsPrefix+"outstanding_messages", "Number of outstanding Pub/Sub messages", stats.UnitDimensionless) // OutstandingBytes is a measure of the number of bytes all outstanding messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytes = stats.Int64(statsPrefix+"outstanding_bytes", "Number of outstanding bytes", stats.UnitDimensionless) // PublisherOutstandingMessages is a measure of the number of published outstanding messages held by the client before they are processed. // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingMessages = stats.Int64(statsPrefix+"publisher_outstanding_messages", "Number of outstanding publish messages", stats.UnitDimensionless) // PublisherOutstandingBytes is a measure of the number of bytes all outstanding publish messages held by the client take up. // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingBytes = stats.Int64(statsPrefix+"publisher_outstanding_bytes", "Number of outstanding publish bytes", stats.UnitDimensionless) )
var ( // PublishedMessagesView is a cumulative sum of PublishedMessages. // It is EXPERIMENTAL and subject to change or removal without notice. PublishedMessagesView *view.View // PublishLatencyView is a distribution of PublishLatency. // It is EXPERIMENTAL and subject to change or removal without notice. PublishLatencyView *view.View // PullCountView is a cumulative sum of PullCount. // It is EXPERIMENTAL and subject to change or removal without notice. PullCountView *view.View // AckCountView is a cumulative sum of AckCount. // It is EXPERIMENTAL and subject to change or removal without notice. AckCountView *view.View // NackCountView is a cumulative sum of NackCount. // It is EXPERIMENTAL and subject to change or removal without notice. NackCountView *view.View // ModAckCountView is a cumulative sum of ModAckCount. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckCountView *view.View // ModAckTimeoutCountView is a cumulative sum of ModAckTimeoutCount. // It is EXPERIMENTAL and subject to change or removal without notice. ModAckTimeoutCountView *view.View // StreamOpenCountView is a cumulative sum of StreamOpenCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamOpenCountView *view.View // StreamRetryCountView is a cumulative sum of StreamRetryCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRetryCountView *view.View // StreamRequestCountView is a cumulative sum of StreamRequestCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamRequestCountView *view.View // StreamResponseCountView is a cumulative sum of StreamResponseCount. // It is EXPERIMENTAL and subject to change or removal without notice. StreamResponseCountView *view.View // OutstandingMessagesView is the last value of OutstandingMessages // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingMessagesView *view.View // OutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. OutstandingBytesView *view.View // PublisherOutstandingMessagesView is the last value of OutstandingMessages // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingMessagesView *view.View // PublisherOutstandingBytesView is the last value of OutstandingBytes // It is EXPERIMENTAL and subject to change or removal without notice. PublisherOutstandingBytesView *view.View )
These arrays hold the default OpenCensus views that keep track of publish/subscribe operations. It is EXPERIMENTAL and subject to change or removal without notice.
var ( DefaultPublishViews []*view.View DefaultSubscribeViews []*view.View )
DefaultPublishSettings holds the default values for topics' PublishSettings.
var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, Timeout: 60 * time.Second, BufferedByteLimit: 10 * MaxPublishRequestBytes, FlowControlSettings: FlowControlSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlIgnore, }, EnableCompression: false, CompressionBytesThreshold: 240, }
DefaultReceiveSettings holds the default values for ReceiveSettings.
var DefaultReceiveSettings = ReceiveSettings{ MaxExtension: 60 * time.Minute, MaxExtensionPeriod: 0, MinExtensionPeriod: 0, MaxOutstandingMessages: 1000, MaxOutstandingBytes: 1e9, NumGoroutines: 10, }
ErrEmptyProjectID denotes that the project string passed into NewClient was empty. Please provide a valid project ID or use the DetectProjectID sentinel value to detect project ID from well defined sources.
var ErrEmptyProjectID = errors.New("pubsub: projectID string is empty")
ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
var ErrOversizedMessage = bundler.ErrOversizedItem
ErrTopicStopped indicates that topic has been stopped and further publishing will fail.
var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")
AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams.
type AWSKinesisState int
AckResult holds the result from a call to Ack or Nack.
Call Get to obtain the result of the Ack/NackWithResult call. Example:
// Get blocks until Ack/NackWithResult completes or ctx is done. ackStatus, err := r.Get(ctx) if err != nil { // TODO: Handle error. }
type AckResult = ipubsub.AckResult
AcknowledgeStatus represents the status of an Ack or Nack request.
type AcknowledgeStatus = ipubsub.AcknowledgeStatus
const ( // AcknowledgeStatusSuccess indicates the request was a success. AcknowledgeStatusSuccess AcknowledgeStatus = iota // AcknowledgeStatusPermissionDenied indicates the caller does not have sufficient permissions. AcknowledgeStatusPermissionDenied // AcknowledgeStatusFailedPrecondition indicates the request encountered a FailedPrecondition error. AcknowledgeStatusFailedPrecondition // AcknowledgeStatusInvalidAckID indicates one or more of the ack IDs sent were invalid. AcknowledgeStatusInvalidAckID // AcknowledgeStatusOther indicates another unknown error was returned. AcknowledgeStatusOther )
AuthenticationMethod is used by push subscriptions to verify the source of push requests.
type AuthenticationMethod interface {
// contains filtered or unexported methods
}
BigQueryConfig configures the subscription to deliver to a BigQuery table.
type BigQueryConfig struct { // The name of the table to which to write data, of the form // {projectId}:{datasetId}.{tableId} Table string // When true, use the topic's schema as the columns to write to in BigQuery, // if it exists. UseTopicSchema bool // When true, write the subscription name, message_id, publish_time, // attributes, and ordering_key to additional columns in the table. The // subscription name, message_id, and publish_time fields are put in their own // columns while all other message properties (other than data) are written to // a JSON object in the attributes column. WriteMetadata bool // When true and use_topic_schema is true, any fields that are a part of the // topic schema that are not part of the BigQuery table schema are dropped // when writing to BigQuery. Otherwise, the schemas must be kept in sync and // any messages with extra fields are not written and remain in the // subscription's backlog. DropUnknownFields bool // This is an output-only field that indicates whether or not the subscription can // receive messages. This field is set only in responses from the server; // it is ignored if it is set in any requests. State BigQueryConfigState }
BigQueryConfigState denotes the possible states for a BigQuery Subscription.
type BigQueryConfigState int
Client is a Google Pub/Sub client scoped to a single project.
Clients should be reused rather than being created as needed. A Client may be shared by multiple goroutines.
type Client struct {
// contains filtered or unexported fields
}
func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error)
NewClient creates a new PubSub client. It uses a default configuration.
▹ Example
func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (c *Client, err error)
NewClientWithConfig creates a new PubSub client.
func (c *Client) Close() error
Close releases any resources held by the client, such as memory and goroutines.
If the client is available for the lifetime of the program, then Close need not be called at exit.
func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error)
CreateSubscription creates a new subscription on a topic.
id is the name of the subscription to create. It must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog".
cfg.Topic is the topic from which the subscription should receive messages. It need not belong to the same project as the subscription. This field is required.
cfg.AckDeadline is the maximum time after a subscriber receives a message before the subscriber should acknowledge the message. It must be between 10 and 600 seconds (inclusive), and is rounded down to the nearest second. If the provided ackDeadline is 0, then the default value of 10 seconds is used. Note: messages which are obtained via Subscription.Receive need not be acknowledged within this deadline, as the deadline will be automatically extended.
cfg.PushConfig may be set to configure this subscription for push delivery.
If the subscription already exists an error will be returned.
▹ Example
▹ Example (NeverExpire)
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error)
CreateTopic creates a new topic.
The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". For more information, see: https://cloud.google.com/pubsub/docs/admin#resource_names
If the topic already exists an error will be returned.
▹ Example
func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error)
CreateTopicWithConfig creates a topic from TopicConfig.
The specified topic ID must start with a letter, and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 characters in length, and must not start with "goog". For more information, see: https://cloud.google.com/pubsub/docs/admin#resource_names.
If the topic already exists, an error will be returned.
▹ Example
func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error)
DetachSubscription detaches a subscription from its topic. All messages retained in the subscription are dropped. Subsequent `Pull` and `StreamingPull` requests will return FAILED_PRECONDITION. If the subscription is a push subscription, pushes to the endpoint will stop.
func (c *Client) Project() string
Project returns the project ID or number for this instance of the client, which may have either been explicitly specified or autodetected.
func (c *Client) Snapshot(id string) *Snapshot
Snapshot creates a reference to a snapshot.
func (c *Client) Snapshots(ctx context.Context) *SnapshotConfigIterator
Snapshots returns an iterator which returns snapshots for this project.
▹ Example
func (c *Client) Subscription(id string) *Subscription
Subscription creates a reference to a subscription.
func (c *Client) SubscriptionInProject(id, projectID string) *Subscription
SubscriptionInProject creates a reference to a subscription in a given project.
func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
▹ Example
func (c *Client) Topic(id string) *Topic
Topic creates a reference to a topic in the client's project.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
func (c *Client) TopicInProject(id, projectID string) *Topic
TopicInProject creates a reference to a topic in the given project.
If a Topic's Publish method is called, it has background goroutines associated with it. Clean them up by calling Topic.Stop.
Avoid creating many Topic instances if you use them to publish.
▹ Example
func (c *Client) Topics(ctx context.Context) *TopicIterator
Topics returns an iterator which returns all of the topics for the client's project.
▹ Example
ClientConfig has configurations for the client.
type ClientConfig struct { PublisherCallOptions *vkit.PublisherCallOptions SubscriberCallOptions *vkit.SubscriberCallOptions }
CloudStorageConfig configures the subscription to deliver to Cloud Storage.
type CloudStorageConfig struct { // User-provided name for the Cloud Storage bucket. // The bucket must be created by the user. The bucket name must be without // any prefix like "gs://". See the [bucket naming // requirements] (https://cloud.google.com/storage/docs/buckets#naming). Bucket string // User-provided prefix for Cloud Storage filename. See the [object naming // requirements](https://cloud.google.com/storage/docs/objects#naming). FilenamePrefix string // User-provided suffix for Cloud Storage filename. See the [object naming // requirements](https://cloud.google.com/storage/docs/objects#naming). FilenameSuffix string // Configuration for how to write message data. Options are // CloudStorageOutputFormat_TextConfig and CloudStorageOutputFormat_AvroConfig. // Defaults to text format. OutputFormat isCloudStorageOutputFormat // The maximum duration that can elapse before a new Cloud Storage file is // created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed // the subscription's acknowledgement deadline. MaxDuration optional.Duration // The maximum bytes that can be written to a Cloud Storage file before a new // file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded // in cases where messages are larger than the limit. MaxBytes int64 // Output only. An output-only field that indicates whether or not the // subscription can receive messages. State CloudStorageConfigState }
CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription.
type CloudStorageConfigState int
CloudStorageOutputFormatAvroConfig is the configuration for writing message data in Avro format. Message payloads and metadata will be written to the files as an Avro binary.
type CloudStorageOutputFormatAvroConfig struct { // When true, write the subscription name, message_id, publish_time, // attributes, and ordering_key as additional fields in the output. WriteMetadata bool }
CloudStorageOutputFormatTextConfig is the configuration for writing message data in text format. Message payloads will be written to files as raw text, separated by a newline.
type CloudStorageOutputFormatTextConfig struct{}
DeadLetterPolicy specifies the conditions for dead lettering messages in a subscription.
type DeadLetterPolicy struct { DeadLetterTopic string MaxDeliveryAttempts int }
DetachSubscriptionResult is the response for the DetachSubscription method. Reserved for future use.
type DetachSubscriptionResult struct{}
ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key.
type ErrPublishingPaused struct { OrderingKey string }
func (e ErrPublishingPaused) Error() string
FlowControlSettings controls flow control for messages while publishing or subscribing.
type FlowControlSettings struct { // MaxOutstandingMessages is the maximum number of buffered messages to be published. // If less than or equal to zero, this is disabled. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size of buffered messages to be published. // If less than or equal to zero, this is disabled. MaxOutstandingBytes int // LimitExceededBehavior configures the behavior when trying to publish // additional messages while the flow controller is full. The available options // are Ignore (disable, default), Block, and SignalError (publish // results will return an error). LimitExceededBehavior LimitExceededBehavior }
IngestionDataSource is the kind of ingestion source to be used.
type IngestionDataSource interface {
// contains filtered or unexported methods
}
IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams.
type IngestionDataSourceAWSKinesis struct { // State is an output-only field indicating the state of the kinesis connection. State AWSKinesisState // StreamARN is the Kinesis stream ARN to ingest data from. StreamARN string // ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced // Fan-Out mode. The consumer must be already created and ready to be used. ConsumerARN string // AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication // with Kinesis. Check the Pub/Sub docs for how to set up this role and the // required permissions that need to be attached to it. AWSRoleARN string // GCPServiceAccount is the GCP service account to be used for Federated Identity // authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for // the provided role). The `aws_role_arn` must be set up with // `accounts.google.com:sub` equals to this service account number. GCPServiceAccount string }
IngestionDataSourceSettings enables ingestion from a data source into this topic.
type IngestionDataSourceSettings struct { Source IngestionDataSource }
LimitExceededBehavior configures the behavior that flowController can use in case the flow control limits are exceeded.
type LimitExceededBehavior int
const ( // FlowControlIgnore disables flow control. FlowControlIgnore LimitExceededBehavior = iota // FlowControlBlock signals to wait until the request can be made without exceeding the limit. FlowControlBlock // FlowControlSignalError signals an error to the caller of acquire. FlowControlSignalError )
Message represents a Pub/Sub message.
Message can be passed to Topic.Publish for publishing.
If received in the callback passed to Subscription.Receive, client code must call Message.Ack or Message.Nack when finished processing the Message. Calls to Ack or Nack have no effect after the first call.
Ack indicates successful processing of a Message. If message acknowledgement fails, the Message will be redelivered. Nack indicates that the client will not or cannot process a Message. Nack will result in the Message being redelivered more quickly than if it were allowed to expire.
If using exactly once delivery, you should call Message.AckWithResult and Message.NackWithResult instead. These methods will return an AckResult, which tracks the state of acknowledgement operation. If the AckResult returns successful, the message is guaranteed NOT to be re-delivered. Otherwise, the AckResult will return an error with more details about the failure and the message may be re-delivered.
type Message = ipubsub.Message
MessageStoragePolicy constrains how messages published to the topic may be stored. It is determined when the topic is created based on the policy configured at the project level.
type MessageStoragePolicy struct { // AllowedPersistenceRegions is the list of GCP regions where messages that are published // to the topic may be persisted in storage. Messages published by publishers running in // non-allowed GCP regions (or running outside of GCP altogether) will be // routed for storage in one of the allowed regions. // // If empty, it indicates a misconfiguration at the project or organization level, which // will result in all Publish operations failing. This field cannot be empty in updates. // // If nil, then the policy is not defined on a topic level. When used in updates, it resets // the regions back to the organization level Resource Location Restriction policy. // // For more information, see // https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations. AllowedPersistenceRegions []string }
NoWrapper denotes not wrapping the payload sent to the push endpoint.
type NoWrapper struct { WriteMetadata bool }
OIDCToken allows PushConfigs to be authenticated using the OpenID Connect protocol https://openid.net/connect/
type OIDCToken struct { // Audience to be used when generating OIDC token. The audience claim // identifies the recipients that the JWT is intended for. The audience // value is a single case-sensitive string. Having multiple values (array) // for the audience field is not supported. More info about the OIDC JWT // token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3 // Note: if not specified, the Push endpoint URL will be used. Audience string // The service account email to be used for generating the OpenID Connect token. // The caller of: // * CreateSubscription // * UpdateSubscription // * ModifyPushConfig // calls must have the iam.serviceAccounts.actAs permission for the service account. // See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles. ServiceAccountEmail string }
A PublishResult holds the result from a call to Publish.
Call Get to obtain the result of the Publish call. Example:
// Get blocks until Publish completes or ctx is done. id, err := r.Get(ctx) if err != nil { // TODO: Handle error. }
type PublishResult = ipubsub.PublishResult
PublishSettings control the bundling of published messages.
type PublishSettings struct { // Publish a non-empty batch after this delay has passed. DelayThreshold time.Duration // Publish a batch when it has this many messages. The maximum is // MaxPublishRequestCount. CountThreshold int // Publish a batch when its size in bytes reaches this value. ByteThreshold int // The number of goroutines used in each of the data structures that are // involved along the the Publish path. Adjusting this value adjusts // concurrency along the publish path. // // Defaults to a multiple of GOMAXPROCS. NumGoroutines int // The maximum time that the client will attempt to publish a bundle of messages. Timeout time.Duration // The maximum number of bytes that the Bundler will keep in memory before // returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes. // If MaxOutstandingBytes is set, that value will override BufferedByteLimit. // // Defaults to DefaultPublishSettings.BufferedByteLimit. // Deprecated: Set `Topic.PublishSettings.FlowControlSettings.MaxOutstandingBytes` instead. BufferedByteLimit int // FlowControlSettings defines publisher flow control settings. FlowControlSettings FlowControlSettings // EnableCompression enables transport compression for Publish operations EnableCompression bool // CompressionBytesThreshold defines the threshold (in bytes) above which messages // are compressed for transport. Only takes effect if EnableCompression is true. CompressionBytesThreshold int }
PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON representation of a PubsubMessage (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
type PubsubWrapper struct{}
PushConfig contains configuration for subscriptions that operate in push mode.
type PushConfig struct { // A URL locating the endpoint to which messages should be pushed. Endpoint string // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. Attributes map[string]string // AuthenticationMethod is used by push endpoints to verify the source // of push requests. // It can be used with push endpoints that are private by default to // allow requests only from the Cloud Pub/Sub system, for example. // This field is optional and should be set only by users interested in // authenticated push. AuthenticationMethod AuthenticationMethod // The format of the delivered message to the push endpoint is defined by // the chosen wrapper. When unset, `PubsubWrapper` is used. Wrapper Wrapper }
ReceiveSettings configure the Receive method. A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
type ReceiveSettings struct { // MaxExtension is the maximum period for which the Subscription should // automatically extend the ack deadline for each message. // // The Subscription will automatically extend the ack deadline of all // fetched Messages up to the duration specified. Automatic deadline // extension beyond the initial receipt may be disabled by specifying a // duration less than 0. MaxExtension time.Duration // MaxExtensionPeriod is the maximum duration by which to extend the ack // deadline at a time. The ack deadline will continue to be extended by up // to this duration until MaxExtension is reached. Setting MaxExtensionPeriod // bounds the maximum amount of time before a message redelivery in the // event the subscriber fails to extend the deadline. // // MaxExtensionPeriod must be between 10s and 600s (inclusive). This configuration // can be disabled by specifying a duration less than (or equal to) 0. MaxExtensionPeriod time.Duration // MinExtensionPeriod is the the min duration for a single lease extension attempt. // By default the 99th percentile of ack latency is used to determine lease extension // periods but this value can be set to minimize the number of extraneous RPCs sent. // // MinExtensionPeriod must be between 10s and 600s (inclusive). This configuration // can be disabled by specifying a duration less than (or equal to) 0. // Defaults to off but set to 60 seconds if the subscription has exactly-once delivery enabled, // which will be added in a future release. MinExtensionPeriod time.Duration // MaxOutstandingMessages is the maximum number of unprocessed messages // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. // If the value is negative, then there will be no limit on the number of // unprocessed messages. MaxOutstandingMessages int // MaxOutstandingBytes is the maximum size of unprocessed messages // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If // the value is negative, then there will be no limit on the number of bytes // for unprocessed messages. MaxOutstandingBytes int // UseLegacyFlowControl disables enforcing flow control settings at the Cloud // PubSub server and the less accurate method of only enforcing flow control // at the client side is used. // The default is false. UseLegacyFlowControl bool // NumGoroutines sets the number of StreamingPull streams to pull messages // from the subscription. // // NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines. // // NumGoroutines does not limit the number of messages that can be processed // concurrently. Even with one goroutine, many messages might be processed at // once, because that goroutine may continually receive messages and invoke the // function passed to Receive on them. To limit the number of messages being // processed concurrently, set MaxOutstandingMessages. NumGoroutines int // Synchronous switches the underlying receiving mechanism to unary Pull. // When Synchronous is false, the more performant StreamingPull is used. // StreamingPull also has the benefit of subscriber affinity when using // ordered delivery. // When Synchronous is true, NumGoroutines is set to 1 and only one Pull // RPC will be made to poll messages at a time. // The default is false. // // Deprecated. // Previously, users might use Synchronous mode since StreamingPull had a limitation // where MaxOutstandingMessages was not always respected with large batches of // small messages. With server side flow control, this is no longer an issue // and we recommend switching to the default StreamingPull mode by setting // Synchronous to false. // Synchronous mode does not work with exactly once delivery. Synchronous bool }
RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
Retry delay will be exponential based on provided minimum and maximum backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded events for a given message.
Retry Policy is implemented on a best effort basis. At times, the delay between consecutive deliveries may not match the configuration. That is, delay can be more or less than configured backoff.
type RetryPolicy struct { // MinimumBackoff is the minimum delay between consecutive deliveries of a // given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds. MinimumBackoff optional.Duration // MaximumBackoff is the maximum delay between consecutive deliveries of a // given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds. MaximumBackoff optional.Duration }
SchemaClient is a Pub/Sub schema client scoped to a single project.
type SchemaClient struct {
// contains filtered or unexported fields
}
func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error)
NewSchemaClient creates a new Pub/Sub Schema client.
func (s *SchemaClient) Close() error
Close closes the schema client and frees up resources.
func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)
CommitSchema commits a new schema revision to an existing schema.
func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error)
CreateSchema creates a new schema with the given schemaID and config. Schemas cannot be updated after creation.
func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error
DeleteSchema deletes an existing schema given a schema ID.
func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)
DeleteSchemaRevision deletes a specific schema revision.
func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator
ListSchemaRevisions lists all schema revisions for the named schema.
func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error)
RollbackSchema creates a new schema revision that is a copy of the provided revision.
func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error)
Schema retrieves the configuration of a schema given a schemaID and a view.
func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator
Schemas returns an iterator which returns all of the schemas for the client's project.
func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error)
ValidateMessageWithConfig validates a message against an schema specified by a schema config.
func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error)
ValidateMessageWithID validates a message against an schema specified by the schema ID of an existing schema.
func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error)
ValidateSchema validates a schema config and returns an error if invalid.
SchemaConfig is a reference to a PubSub schema.
type SchemaConfig struct { // Name of the schema. // Format is `projects/{project}/schemas/{schema}` Name string // The type of the schema definition. Type SchemaType // The definition of the schema. This should contain a string representing // the full definition of the schema that is a valid schema definition of // the type specified in `type`. Definition string // RevisionID is the revision ID of the schema. // This field is output only. RevisionID string // RevisionCreateTime is the timestamp that the revision was created. // This field is output only. RevisionCreateTime time.Time }
SchemaEncoding is the encoding expected for messages.
type SchemaEncoding pb.Encoding
const ( // EncodingUnspecified is the default unused value. EncodingUnspecified SchemaEncoding = 0 // EncodingJSON is the JSON encoding type for a message. EncodingJSON SchemaEncoding = 1 // EncodingBinary is the binary encoding type for a message. // For some schema types, binary encoding may not be available. EncodingBinary SchemaEncoding = 2 )
SchemaIterator is a struct used to iterate over schemas.
type SchemaIterator struct {
// contains filtered or unexported fields
}
func (s *SchemaIterator) Next() (*SchemaConfig, error)
Next returns the next schema. If there are no more schemas, iterator.Done will be returned.
SchemaSettings are settings for validating messages published against a schema.
type SchemaSettings struct { // The name of the schema that messages published should be // validated against. Format is `projects/{project}/schemas/{schema}` Schema string // The encoding of messages validated against the schema. Encoding SchemaEncoding // The minimum (inclusive) revision allowed for validating messages. If empty // or not present, allow any revision to be validated against LastRevisionID or // any revision created before. FirstRevisionID string // The maximum (inclusive) revision allowed for validating messages. If empty // or not present, allow any revision to be validated against FirstRevisionID // or any revision created after. LastRevisionID string }
SchemaType is the possible schema definition types.
type SchemaType pb.Schema_Type
const ( // SchemaTypeUnspecified is the unused default value. SchemaTypeUnspecified SchemaType = 0 // SchemaProtocolBuffer is a protobuf schema definition. SchemaProtocolBuffer SchemaType = 1 // SchemaAvro is an Avro schema definition. SchemaAvro SchemaType = 2 )
SchemaView is a view of Schema object fields to be returned by GetSchema and ListSchemas.
type SchemaView pb.SchemaView
const ( // SchemaViewUnspecified is the default/unset value. SchemaViewUnspecified SchemaView = 0 // SchemaViewBasic includes the name and type of the schema, but not the definition. SchemaViewBasic SchemaView = 1 // SchemaViewFull includes all Schema object fields. SchemaViewFull SchemaView = 2 )
Snapshot is a reference to a PubSub snapshot.
type Snapshot struct {
// contains filtered or unexported fields
}
func (s *Snapshot) Delete(ctx context.Context) error
Delete deletes a snapshot.
▹ Example
func (s *Snapshot) ID() string
ID returns the unique identifier of the snapshot within its project.
func (s *Snapshot) SetLabels(ctx context.Context, label map[string]string) (*SnapshotConfig, error)
SetLabels sets or replaces the labels on a given snapshot.
SnapshotConfig contains the details of a Snapshot.
type SnapshotConfig struct { *Snapshot Topic *Topic Expiration time.Time // The set of labels for the snapshot. Labels map[string]string }
SnapshotConfigIterator is an iterator that returns a series of snapshots.
type SnapshotConfigIterator struct {
// contains filtered or unexported fields
}
func (snaps *SnapshotConfigIterator) Next() (*SnapshotConfig, error)
Next returns the next SnapshotConfig. Its second return value is iterator.Done if there are no more results. Once Next returns iterator.Done, all subsequent calls will return iterator.Done.
▹ Example
Subscription is a reference to a PubSub subscription.
type Subscription struct { // Settings for pulling messages. Configure these before calling Receive. ReceiveSettings ReceiveSettings // contains filtered or unexported fields }
func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error)
Config fetches the current configuration for the subscription.
▹ Example
func (s *Subscription) CreateSnapshot(ctx context.Context, name string) (*SnapshotConfig, error)
CreateSnapshot creates a new snapshot from this subscription. The snapshot will be for the topic this subscription is subscribed to. If the name is empty string, a unique name is assigned.
The created snapshot is guaranteed to retain:
(a) The existing backlog on the subscription. More precisely, this is defined as the messages in the subscription's backlog that are unacknowledged when Snapshot returns without error. (b) Any messages published to the subscription's topic following Snapshot returning without error.
▹ Example
func (s *Subscription) Delete(ctx context.Context) error
Delete deletes the subscription.
▹ Example
func (s *Subscription) Exists(ctx context.Context) (bool, error)
Exists reports whether the subscription exists on the server.
▹ Example
func (s *Subscription) IAM() *iam.Handle
IAM returns the subscription's IAM handle.
func (s *Subscription) ID() string
ID returns the unique identifier of the subscription within its project.
func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error
Receive calls f with the outstanding messages from the subscription. It blocks until ctx is done, or the service returns a non-retryable error.
The standard way to terminate a Receive is to cancel its context:
cctx, cancel := context.WithCancel(ctx) err := sub.Receive(cctx, callback) // Call cancel from callback, or another goroutine.
If the service returns a non-retryable error, Receive returns that error after all of the outstanding calls to f have returned. If ctx is done, Receive returns nil after all of the outstanding calls to f have returned and all messages have been acknowledged or have expired.
Receive calls f concurrently from multiple goroutines. It is encouraged to process messages synchronously in f, even if that processing is relatively time-consuming; Receive will spawn new goroutines for incoming messages, limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
The context passed to f will be canceled when ctx is Done or there is a fatal service error.
Receive will send an ack deadline extension on message receipt, then automatically extend the ack deadline of all fetched Messages up to the period specified by s.ReceiveSettings.MaxExtension.
Each Subscription may have only one invocation of Receive active at a time.
▹ Example
▹ Example (MaxExtension)
▹ Example (MaxOutstanding)
func (s *Subscription) SeekToSnapshot(ctx context.Context, snap *Snapshot) error
SeekToSnapshot seeks the subscription to a snapshot.
The snapshot need not be created from this subscription, but it must be for the topic this subscription is subscribed to.
▹ Example
func (s *Subscription) SeekToTime(ctx context.Context, t time.Time) error
SeekToTime seeks the subscription to a point in time.
Messages retained in the subscription that were published before this time are marked as acknowledged, and messages retained in the subscription that were published after this time are marked as unacknowledged. Note that this operation affects only those messages retained in the subscription (configured by SnapshotConfig). For example, if `time` corresponds to a point before the message retention window (or to a point before the system's notion of the subscription creation time), only retained messages will be marked as unacknowledged, and already-expunged messages will not be restored.
▹ Example
func (s *Subscription) String() string
String returns the globally unique printable name of the subscription.
func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error)
Update changes an existing subscription according to the fields set in cfg. It returns the new SubscriptionConfig.
Update returns an error if no fields were modified.
▹ Example
▹ Example (PushConfigAuthenticationMethod)
SubscriptionConfig describes the configuration of a subscription. If none of PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will pull and ack messages using API methods. At most one of these fields may be set.
type SubscriptionConfig struct { // The topic from which this subscription is receiving messages. Topic *Topic // If push delivery is used with this subscription, this field is // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, // or `CloudStorageConfig` can be set. If all are empty, then the // subscriber will pull and ack messages using API methods. PushConfig PushConfig // If delivery to BigQuery is used with this subscription, this field is // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, // or `CloudStorageConfig` can be set. If all are empty, then the // subscriber will pull and ack messages using API methods. BigQueryConfig BigQueryConfig // If delivery to Cloud Storage is used with this subscription, this field is // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, // or `CloudStorageConfig` can be set. If all are empty, then the // subscriber will pull and ack messages using API methods. CloudStorageConfig CloudStorageConfig // The default maximum time after a subscriber receives a message before // the subscriber should acknowledge the message. Note: messages which are // obtained via Subscription.Receive need not be acknowledged within this // deadline, as the deadline will be automatically extended. AckDeadline time.Duration // Whether to retain acknowledged messages. If true, acknowledged messages // will not be expunged until they fall out of the RetentionDuration window. RetainAckedMessages bool // How long to retain messages in backlog, from the time of publish. If // RetainAckedMessages is true, this duration affects the retention of // acknowledged messages, otherwise only unacknowledged messages are retained. // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. RetentionDuration time.Duration // Expiration policy specifies the conditions for a subscription's expiration. // A subscription is considered active as long as any connected subscriber is // successfully consuming messages from the subscription or is issuing // operations on the subscription. If `expiration_policy` is not set, a // *default policy* with `ttl` of 31 days will be used. The minimum allowed // value for `expiration_policy.ttl` is 1 day. // // Use time.Duration(0) to indicate that the subscription should never expire. ExpirationPolicy optional.Duration // The set of labels for the subscription. Labels map[string]string // EnableMessageOrdering enables message ordering on this subscription. // This value is only used for subscription creation and update, and // is not read locally in calls like Subscription.Receive(). // // If set to false, even if messages are published with ordering keys, // messages will not be delivered in order. // // When calling Subscription.Receive(), the client will check this // value with a call to Subscription.Config(), which requires the // roles/viewer or roles/pubsub.viewer role on your service account. // If that call fails, mesages with ordering keys will be delivered in order. EnableMessageOrdering bool // DeadLetterPolicy specifies the conditions for dead lettering messages in // a subscription. If not set, dead lettering is disabled. DeadLetterPolicy *DeadLetterPolicy // Filter is an expression written in the Cloud Pub/Sub filter language. If // non-empty, then only `PubsubMessage`s whose `attributes` field matches the // filter are delivered on this subscription. If empty, then no messages are // filtered out. Cannot be changed after the subscription is created. Filter string // RetryPolicy specifies how Cloud Pub/Sub retries message delivery. RetryPolicy *RetryPolicy // Detached indicates whether the subscription is detached from its topic. // Detached subscriptions don't receive messages from their topic and don't // retain any backlog. `Pull` and `StreamingPull` requests will return // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to // the endpoint will not be made. Detached bool // TopicMessageRetentionDuration indicates the minimum duration for which a message is // retained after it is published to the subscription's topic. If this field is // set, messages published to the subscription's topic in the last // `TopicMessageRetentionDuration` are always available to subscribers. // You can enable both topic and subscription retention for the same topic. // In this situation, the maximum of the retention durations takes effect. // // This is an output only field, meaning it will only appear in responses from the backend // and will be ignored if sent in a request. TopicMessageRetentionDuration time.Duration // EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees // for the delivery of a message with a given MessageID on this subscription: // // The message sent to a subscriber is guaranteed not to be resent // before the message's acknowledgement deadline expires. // An acknowledged message will not be resent to a subscriber. // // Note that subscribers may still receive multiple copies of a message // when `enable_exactly_once_delivery` is true if the message was published // multiple times by a publisher client. These copies are considered distinct // by Pub/Sub and have distinct MessageID values. // // Lastly, to guarantee messages have been acked or nacked properly, you must // call Message.AckWithResult() or Message.NackWithResult(). These return an // AckResult which will be ready if the message has been acked (or failed to be acked). EnableExactlyOnceDelivery bool // State indicates whether or not the subscription can receive messages. // This is an output-only field that indicates whether or not the subscription can // receive messages. This field is set only in responses from the server; // it is ignored if it is set in any requests. State SubscriptionState // contains filtered or unexported fields }
func (s *SubscriptionConfig) ID() string
ID returns the unique identifier of the subscription within its project. This method only works when the subscription config is returned from the server, such as when calling `client.Subscription` or `client.Subscriptions`. Otherwise, this will return an empty string.
func (s *SubscriptionConfig) String() string
String returns the globally unique printable name of the subscription config. This method only works when the subscription config is returned from the server, such as when calling `client.Subscription` or `client.Subscriptions`. Otherwise, this will return an empty string.
SubscriptionConfigToUpdate describes how to update a subscription.
type SubscriptionConfigToUpdate struct { // If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig // can be set. // If currently in push mode, set this value to the zero value to revert to a Pull based subscription. PushConfig *PushConfig // If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig // can be set. // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, BigQueryConfig *BigQueryConfig // If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig // can be set. // If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription, CloudStorageConfig *CloudStorageConfig // If non-zero, the ack deadline is changed. AckDeadline time.Duration // If set, RetainAckedMessages is changed. RetainAckedMessages optional.Bool // If non-zero, RetentionDuration is changed. RetentionDuration time.Duration // If non-zero, Expiration is changed. ExpirationPolicy optional.Duration // If non-nil, DeadLetterPolicy is changed. To remove dead lettering from // a subscription, use the zero value for this struct. DeadLetterPolicy *DeadLetterPolicy // If non-nil, the current set of labels is completely // replaced by the new set. // This field has beta status. It is not subject to the stability guarantee // and may change. Labels map[string]string // If non-nil, RetryPolicy is changed. To remove an existing retry policy // (to redeliver messages as soon as possible) use a pointer to the zero value // for this struct. RetryPolicy *RetryPolicy // If set, EnableExactlyOnce is changed. EnableExactlyOnceDelivery optional.Bool }
SubscriptionIterator is an iterator that returns a series of subscriptions.
type SubscriptionIterator struct {
// contains filtered or unexported fields
}
func (subs *SubscriptionIterator) Next() (*Subscription, error)
Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
▹ Example
func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error)
NextConfig returns the next subscription config. If there are no more subscriptions, iterator.Done will be returned. This call shares the underlying iterator with calls to `SubscriptionIterator.Next`. If you wish to use mix calls, create separate iterator instances for both.
SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int
Topic is a reference to a PubSub topic.
The methods of Topic are safe for use by multiple goroutines.
type Topic struct { // Settings for publishing messages. All changes must be made before the // first call to Publish. The default is DefaultPublishSettings. PublishSettings PublishSettings // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool // contains filtered or unexported fields }
func (t *Topic) Config(ctx context.Context) (TopicConfig, error)
Config returns the TopicConfig for the topic.
func (t *Topic) Delete(ctx context.Context) error
Delete deletes the topic.
▹ Example
func (t *Topic) Exists(ctx context.Context) (bool, error)
Exists reports whether the topic exists on the server.
▹ Example
func (t *Topic) Flush()
Flush blocks until all remaining messages are sent.
func (t *Topic) IAM() *iam.Handle
IAM returns the topic's IAM handle.
func (t *Topic) ID() string
ID returns the unique identifier of the topic within its project.
func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult
Publish publishes msg to the topic asynchronously. Messages are batched and sent according to the topic's PublishSettings. Publish never blocks.
Publish returns a non-nil PublishResult which will be ready when the message has been sent (or has failed to be sent) to the server.
Publish creates goroutines for batching and sending messages. These goroutines need to be stopped by calling t.Stop(). Once stopped, future calls to Publish will immediately return a PublishResult with an error.
▹ Example
func (t *Topic) ResumePublish(orderingKey string)
ResumePublish resumes accepting messages for the provided ordering key. Publishing using an ordering key might be paused if an error is encountered while publishing, to prevent messages from being published out of order.
func (t *Topic) Stop()
Stop sends all remaining published messages and stop goroutines created for handling publishing. Returns once all outstanding messages have been sent or have failed to be sent.
func (t *Topic) String() string
String returns the printable globally unique name for the topic.
func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator
Subscriptions returns an iterator which returns the subscriptions for this topic.
Some of the returned subscriptions may belong to a project other than t.
▹ Example
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error)
Update changes an existing topic according to the fields set in cfg. It returns the new TopicConfig.
▹ Example
▹ Example (ResetMessageStoragePolicy)
TopicConfig describes the configuration of a topic.
type TopicConfig struct { // The set of labels for the topic. Labels map[string]string // The topic's message storage policy. MessageStoragePolicy MessageStoragePolicy // The name of the Cloud KMS key to be used to protect access to messages // published to this topic, in the format // "projects/P/locations/L/keyRings/R/cryptoKeys/K". KMSKeyName string // Schema defines the schema settings upon topic creation. SchemaSettings *SchemaSettings // RetentionDuration configures the minimum duration to retain a message // after it is published to the topic. If this field is set, messages published // to the topic in the last `RetentionDuration` are always available to subscribers. // For instance, it allows any attached subscription to [seek to a // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) // that is up to `RetentionDuration` in the past. If this field is // not set, message retention is controlled by settings on individual // subscriptions. Cannot be more than 31 days or less than 10 minutes. // // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention. RetentionDuration optional.Duration // State is an output-only field indicating the state of the topic. State TopicState // IngestionDataSourceSettings are settings for ingestion from a // data source into this topic. IngestionDataSourceSettings *IngestionDataSourceSettings // contains filtered or unexported fields }
func (t *TopicConfig) ID() string
ID returns the unique identifier of the topic within its project. This method only works when the topic config is returned from the server, such as when calling `client.Topic` or `client.Topics`. Otherwise, this will return an empty string.
func (t *TopicConfig) String() string
String returns the printable globally unique name for the topic config. This method only works when the topic config is returned from the server, such as when calling `client.Topic` or `client.Topics`. Otherwise, this will return an empty string.
TopicConfigToUpdate describes how to update a topic.
type TopicConfigToUpdate struct { // If non-nil, the current set of labels is completely // replaced by the new set. Labels map[string]string // If non-nil, the existing policy (containing the list of regions) // is completely replaced by the new policy. // // Use the zero value &MessageStoragePolicy{} to reset the topic back to // using the organization's Resource Location Restriction policy. // // If nil, the policy remains unchanged. // // This field has beta status. It is not subject to the stability guarantee // and may change. MessageStoragePolicy *MessageStoragePolicy // If set to a positive duration between 10 minutes and 31 days, RetentionDuration is changed. // If set to a negative value, this clears RetentionDuration from the topic. // If nil, the retention duration remains unchanged. RetentionDuration optional.Duration // Schema defines the schema settings upon topic creation. // // Use the zero value &SchemaSettings{} to remove the schema from the topic. SchemaSettings *SchemaSettings // IngestionDataSourceSettings are settings for ingestion from a // data source into this topic. // // Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic. IngestionDataSourceSettings *IngestionDataSourceSettings }
TopicIterator is an iterator that returns a series of topics.
type TopicIterator struct {
// contains filtered or unexported fields
}
func (tps *TopicIterator) Next() (*Topic, error)
Next returns the next topic. If there are no more topics, iterator.Done will be returned.
▹ Example
func (t *TopicIterator) NextConfig() (*TopicConfig, error)
NextConfig returns the next topic config. If there are no more topics, iterator.Done will be returned. This call shares the underlying iterator with calls to `TopicIterator.Next`. If you wish to use mix calls, create separate iterator instances for both.
TopicState denotes the possible states for a topic.
type TopicState int
ValidateMessageResult is the response for the ValidateMessage method. Reserved for future use.
type ValidateMessageResult struct{}
ValidateSchemaResult is the response for the ValidateSchema method. Reserved for future use.
type ValidateSchemaResult struct{}
Wrapper defines the format of message delivered to push endpoints.
type Wrapper interface {
// contains filtered or unexported methods
}
Name | Synopsis |
---|---|
.. | |
aliasshim | Package aliasshim is used to keep the dependency on go-genproto during our go-genproto to google-cloud-go stubs migration window. |
apiv1 | Package pubsub is an auto-generated package for the Cloud Pub/Sub API. |
pubsubpb | |
loadtest | Package loadtest implements load testing for pubsub, following the interface defined in https://github.com/GoogleCloudPlatform/pubsub/tree/master/load-test-framework/ . |
cmd | |
pb | |
pstest | Package pstest provides a fake Cloud PubSub service for testing. |