// Copyright 2016 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package pubsub

import (
	"context"
	"errors"
	"fmt"
	"log"
	"runtime"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/iam"
	"cloud.google.com/go/internal/optional"
	ipubsub "cloud.google.com/go/internal/pubsub"
	vkit "cloud.google.com/go/pubsub/apiv1"
	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
	"cloud.google.com/go/pubsub/internal/scheduler"
	gax "github.com/googleapis/gax-go/v2"
	"go.opencensus.io/stats"
	"go.opencensus.io/tag"
	"google.golang.org/api/support/bundler"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/encoding/gzip"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
	"google.golang.org/protobuf/types/known/durationpb"
	fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
)

const (
	// MaxPublishRequestCount is the maximum number of messages that can be in
	// a single publish request, as defined by the PubSub service.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the maximum size of a single publish request
	// in bytes, as defined by the PubSub service.
	MaxPublishRequestBytes = 1e7
)

const (
	// TODO: math.MaxInt was added in Go 1.17. We should use that once 1.17
	// becomes the minimum supported version of Go.
	//
	// intSize is the width of int/uint in bits: ^uint(0)>>63 is 1 only when
	// uint is 64 bits wide, so the shift yields 64 there and 32 otherwise.
	intSize = 32 << (^uint(0) >> 63)
	// maxInt is the largest value representable by int on this platform.
	maxInt = 1<<(intSize-1) - 1
)

// ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
var ErrOversizedMessage = bundler.ErrOversizedItem // Topic is a reference to a PubSub topic. // // The methods of Topic are safe for use by multiple goroutines. type Topic struct { c *Client // The fully qualified identifier for the topic, in the format "projects//topics/" name string // Settings for publishing messages. All changes must be made before the // first call to Publish. The default is DefaultPublishSettings. PublishSettings PublishSettings mu sync.RWMutex stopped bool scheduler *scheduler.PublishScheduler flowController // EnableMessageOrdering enables delivery of ordered keys. EnableMessageOrdering bool } // PublishSettings control the bundling of published messages. type PublishSettings struct { // Publish a non-empty batch after this delay has passed. DelayThreshold time.Duration // Publish a batch when it has this many messages. The maximum is // MaxPublishRequestCount. CountThreshold int // Publish a batch when its size in bytes reaches this value. ByteThreshold int // The number of goroutines used in each of the data structures that are // involved along the the Publish path. Adjusting this value adjusts // concurrency along the publish path. // // Defaults to a multiple of GOMAXPROCS. NumGoroutines int // The maximum time that the client will attempt to publish a bundle of messages. Timeout time.Duration // The maximum number of bytes that the Bundler will keep in memory before // returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes. // If MaxOutstandingBytes is set, that value will override BufferedByteLimit. // // Defaults to DefaultPublishSettings.BufferedByteLimit. // Deprecated: Set `Topic.PublishSettings.FlowControlSettings.MaxOutstandingBytes` instead. BufferedByteLimit int // FlowControlSettings defines publisher flow control settings. 
FlowControlSettings FlowControlSettings // EnableCompression enables transport compression for Publish operations EnableCompression bool // CompressionBytesThreshold defines the threshold (in bytes) above which messages // are compressed for transport. Only takes effect if EnableCompression is true. CompressionBytesThreshold int } func (ps *PublishSettings) shouldCompress(batchSize int) bool { return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold } // DefaultPublishSettings holds the default values for topics' PublishSettings. var DefaultPublishSettings = PublishSettings{ DelayThreshold: 10 * time.Millisecond, CountThreshold: 100, ByteThreshold: 1e6, Timeout: 60 * time.Second, // By default, limit the bundler to 10 times the max message size. The number 10 is // chosen as a reasonable amount of messages in the worst case whilst still // capping the number to a low enough value to not OOM users. BufferedByteLimit: 10 * MaxPublishRequestBytes, FlowControlSettings: FlowControlSettings{ MaxOutstandingMessages: 1000, MaxOutstandingBytes: -1, LimitExceededBehavior: FlowControlIgnore, }, // Publisher compression defaults matches Java's defaults // https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718 EnableCompression: false, CompressionBytesThreshold: 240, } // CreateTopic creates a new topic. // // The specified topic ID must start with a letter, and contain only letters // ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), // tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 // characters in length, and must not start with "goog". For more information, // see: https://cloud.google.com/pubsub/docs/admin#resource_names // // If the topic already exists an error will be returned. 
func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error) { t := c.Topic(topicID) _, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name}) if err != nil { return nil, err } return t, nil } // CreateTopicWithConfig creates a topic from TopicConfig. // // The specified topic ID must start with a letter, and contain only letters // ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.), // tildes (~), plus (+) or percent signs (%). It must be between 3 and 255 // characters in length, and must not start with "goog". For more information, // see: https://cloud.google.com/pubsub/docs/admin#resource_names. // // If the topic already exists, an error will be returned. func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) { t := c.Topic(topicID) topic := tc.toProto() topic.Name = t.name _, err := c.pubc.CreateTopic(ctx, topic) if err != nil { return nil, err } return t, nil } // Topic creates a reference to a topic in the client's project. // // If a Topic's Publish method is called, it has background goroutines // associated with it. Clean them up by calling Topic.Stop. // // Avoid creating many Topic instances if you use them to publish. func (c *Client) Topic(id string) *Topic { return c.TopicInProject(id, c.projectID) } // TopicInProject creates a reference to a topic in the given project. // // If a Topic's Publish method is called, it has background goroutines // associated with it. Clean them up by calling Topic.Stop. // // Avoid creating many Topic instances if you use them to publish. func (c *Client) TopicInProject(id, projectID string) *Topic { return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id)) } func newTopic(c *Client, name string) *Topic { return &Topic{ c: c, name: name, PublishSettings: DefaultPublishSettings, } } // TopicState denotes the possible states for a topic. type TopicState int const ( // TopicStateUnspecified is the default value. 
This value is unused. TopicStateUnspecified = iota // TopicStateActive means the topic does not have any persistent errors. TopicStateActive // TopicStateIngestionResourceError means ingestion from the data source // has encountered a permanent error. // See the more detailed error state in the corresponding ingestion // source configuration. TopicStateIngestionResourceError ) // TopicConfig describes the configuration of a topic. type TopicConfig struct { // The fully qualified identifier for the topic, in the format "projects//topics/" name string // The set of labels for the topic. Labels map[string]string // The topic's message storage policy. MessageStoragePolicy MessageStoragePolicy // The name of the Cloud KMS key to be used to protect access to messages // published to this topic, in the format // "projects/P/locations/L/keyRings/R/cryptoKeys/K". KMSKeyName string // Schema defines the schema settings upon topic creation. SchemaSettings *SchemaSettings // RetentionDuration configures the minimum duration to retain a message // after it is published to the topic. If this field is set, messages published // to the topic in the last `RetentionDuration` are always available to subscribers. // For instance, it allows any attached subscription to [seek to a // timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) // that is up to `RetentionDuration` in the past. If this field is // not set, message retention is controlled by settings on individual // subscriptions. Cannot be more than 31 days or less than 10 minutes. // // For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention. RetentionDuration optional.Duration // State is an output-only field indicating the state of the topic. State TopicState // IngestionDataSourceSettings are settings for ingestion from a // data source into this topic. 
IngestionDataSourceSettings *IngestionDataSourceSettings } // String returns the printable globally unique name for the topic config. // This method only works when the topic config is returned from the server, // such as when calling `client.Topic` or `client.Topics`. // Otherwise, this will return an empty string. func (t *TopicConfig) String() string { return t.name } // ID returns the unique identifier of the topic within its project. // This method only works when the topic config is returned from the server, // such as when calling `client.Topic` or `client.Topics`. // Otherwise, this will return an empty string. func (t *TopicConfig) ID() string { slash := strings.LastIndex(t.name, "/") if slash == -1 { return "" } return t.name[slash+1:] } func (tc *TopicConfig) toProto() *pb.Topic { var retDur *durationpb.Duration if tc.RetentionDuration != nil { retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration)) } pbt := &pb.Topic{ Labels: tc.Labels, MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy), KmsKeyName: tc.KMSKeyName, SchemaSettings: schemaSettingsToProto(tc.SchemaSettings), MessageRetentionDuration: retDur, IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(), } return pbt } // TopicConfigToUpdate describes how to update a topic. type TopicConfigToUpdate struct { // If non-nil, the current set of labels is completely // replaced by the new set. Labels map[string]string // If non-nil, the existing policy (containing the list of regions) // is completely replaced by the new policy. // // Use the zero value &MessageStoragePolicy{} to reset the topic back to // using the organization's Resource Location Restriction policy. // // If nil, the policy remains unchanged. // // This field has beta status. It is not subject to the stability guarantee // and may change. MessageStoragePolicy *MessageStoragePolicy // If set to a positive duration between 10 minutes and 31 days, RetentionDuration is changed. 
// If set to a negative value, this clears RetentionDuration from the topic. // If nil, the retention duration remains unchanged. RetentionDuration optional.Duration // Schema defines the schema settings upon topic creation. // // Use the zero value &SchemaSettings{} to remove the schema from the topic. SchemaSettings *SchemaSettings // IngestionDataSourceSettings are settings for ingestion from a // data source into this topic. // // Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic. IngestionDataSourceSettings *IngestionDataSourceSettings } func protoToTopicConfig(pbt *pb.Topic) TopicConfig { tc := TopicConfig{ name: pbt.Name, Labels: pbt.Labels, MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy), KMSKeyName: pbt.KmsKeyName, SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings), State: TopicState(pbt.State), IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings), } if pbt.GetMessageRetentionDuration() != nil { tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration() } return tc } // DetachSubscriptionResult is the response for the DetachSubscription method. // Reserved for future use. type DetachSubscriptionResult struct{} // DetachSubscription detaches a subscription from its topic. All messages // retained in the subscription are dropped. Subsequent `Pull` and `StreamingPull` // requests will return FAILED_PRECONDITION. If the subscription is a push // subscription, pushes to the endpoint will stop. func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error) { _, err := c.pubc.DetachSubscription(ctx, &pb.DetachSubscriptionRequest{ Subscription: sub, }) if err != nil { return nil, err } return &DetachSubscriptionResult{}, nil } // MessageStoragePolicy constrains how messages published to the topic may be stored. 
It // is determined when the topic is created based on the policy configured at // the project level. type MessageStoragePolicy struct { // AllowedPersistenceRegions is the list of GCP regions where messages that are published // to the topic may be persisted in storage. Messages published by publishers running in // non-allowed GCP regions (or running outside of GCP altogether) will be // routed for storage in one of the allowed regions. // // If empty, it indicates a misconfiguration at the project or organization level, which // will result in all Publish operations failing. This field cannot be empty in updates. // // If nil, then the policy is not defined on a topic level. When used in updates, it resets // the regions back to the organization level Resource Location Restriction policy. // // For more information, see // https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations. AllowedPersistenceRegions []string } func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy { if msp == nil { return MessageStoragePolicy{} } return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions} } func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy { if msp == nil || msp.AllowedPersistenceRegions == nil { return nil } return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions} } // IngestionDataSourceSettings enables ingestion from a data source into this topic. type IngestionDataSourceSettings struct { Source IngestionDataSource } // IngestionDataSource is the kind of ingestion source to be used. type IngestionDataSource interface { isIngestionDataSource() bool } // AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams. type AWSKinesisState int const ( // AWSKinesisStateUnspecified is the default value. This value is unused. 
AWSKinesisStateUnspecified = iota // AWSKinesisStateActive means ingestion is active. AWSKinesisStateActive // AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis. // This can happen if: // - The provided `aws_role_arn` does not exist or does not have the // appropriate permissions attached. // - The provided `aws_role_arn` is not set up properly for Identity // Federation using `gcp_service_account`. // - The Pub/Sub SA is not granted the // `iam.serviceAccounts.getOpenIdToken` permission on // `gcp_service_account`. AWSKinesisStatePermissionDenied // AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic. // This can happen due to Pub/Sub SA has not been granted the appropriate publish // permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher AWSKinesisStatePublishPermissionDenied // AWSKinesisStateStreamNotFound means the Kinesis stream does not exist. AWSKinesisStateStreamNotFound // AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist. AWSKinesisStateConsumerNotFound ) // IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams. type IngestionDataSourceAWSKinesis struct { // State is an output-only field indicating the state of the kinesis connection. State AWSKinesisState // StreamARN is the Kinesis stream ARN to ingest data from. StreamARN string // ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced // Fan-Out mode. The consumer must be already created and ready to be used. ConsumerARN string // AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication // with Kinesis. Check the Pub/Sub docs for how to set up this role and the // required permissions that need to be attached to it. 
AWSRoleARN string // GCPServiceAccount is the GCP service account to be used for Federated Identity // authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for // the provided role). The `aws_role_arn` must be set up with // `accounts.google.com:sub` equals to this service account number. GCPServiceAccount string } var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil) func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool { return true } func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings { if pbs == nil { return nil } s := &IngestionDataSourceSettings{} if k := pbs.GetAwsKinesis(); k != nil { s.Source = &IngestionDataSourceAWSKinesis{ State: AWSKinesisState(k.State), StreamARN: k.GetStreamArn(), ConsumerARN: k.GetConsumerArn(), AWSRoleARN: k.GetAwsRoleArn(), GCPServiceAccount: k.GetGcpServiceAccount(), } } return s } func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings { if i == nil { return nil } // An empty/zero-valued config is treated the same as nil and clearing this setting. if (IngestionDataSourceSettings{}) == *i { return nil } pbs := &pb.IngestionDataSourceSettings{} if out := i.Source; out != nil { if k, ok := out.(*IngestionDataSourceAWSKinesis); ok { pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{ AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{ State: pb.IngestionDataSourceSettings_AwsKinesis_State(k.State), StreamArn: k.StreamARN, ConsumerArn: k.ConsumerARN, AwsRoleArn: k.AWSRoleARN, GcpServiceAccount: k.GCPServiceAccount, }, } } } return pbs } // Config returns the TopicConfig for the topic. func (t *Topic) Config(ctx context.Context) (TopicConfig, error) { pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) if err != nil { return TopicConfig{}, err } return protoToTopicConfig(pbt), nil } // Update changes an existing topic according to the fields set in cfg. It returns // the new TopicConfig. 
func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) { req := t.updateRequest(cfg) if len(req.UpdateMask.Paths) == 0 { return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update") } rpt, err := t.c.pubc.UpdateTopic(ctx, req) if err != nil { return TopicConfig{}, err } return protoToTopicConfig(rpt), nil } func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest { pt := &pb.Topic{Name: t.name} var paths []string if cfg.Labels != nil { pt.Labels = cfg.Labels paths = append(paths, "labels") } if cfg.MessageStoragePolicy != nil { pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy) paths = append(paths, "message_storage_policy") } if cfg.RetentionDuration != nil { r := optional.ToDuration(cfg.RetentionDuration) pt.MessageRetentionDuration = durationpb.New(r) if r < 0 { // Clear MessageRetentionDuration if sentinel value is read. pt.MessageRetentionDuration = nil } paths = append(paths, "message_retention_duration") } // Updating SchemaSettings' field masks are more complicated here // since each field should be able to be independently edited, while // preserving the current values for everything else. We also denote // the zero value SchemaSetting to mean clearing or removing schema // from the topic. 
if cfg.SchemaSettings != nil { pt.SchemaSettings = schemaSettingsToProto(cfg.SchemaSettings) clearSchema := true if pt.SchemaSettings.Schema != "" { paths = append(paths, "schema_settings.schema") clearSchema = false } if pt.SchemaSettings.Encoding != pb.Encoding_ENCODING_UNSPECIFIED { paths = append(paths, "schema_settings.encoding") clearSchema = false } if pt.SchemaSettings.FirstRevisionId != "" { paths = append(paths, "schema_settings.first_revision_id") clearSchema = false } if pt.SchemaSettings.LastRevisionId != "" { paths = append(paths, "schema_settings.last_revision_id") clearSchema = false } // Clear the schema if all of its values are equal to the zero value. if clearSchema { paths = append(paths, "schema_settings") pt.SchemaSettings = nil } } if cfg.IngestionDataSourceSettings != nil { pt.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto() paths = append(paths, "ingestion_data_source_settings") } return &pb.UpdateTopicRequest{ Topic: pt, UpdateMask: &fmpb.FieldMask{Paths: paths}, } } // Topics returns an iterator which returns all of the topics for the client's project. func (c *Client) Topics(ctx context.Context) *TopicIterator { it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()}) return &TopicIterator{ c: c, it: it, next: func() (string, error) { topic, err := it.Next() if err != nil { return "", err } return topic.Name, nil }, } } // TopicIterator is an iterator that returns a series of topics. type TopicIterator struct { c *Client it *vkit.TopicIterator next func() (string, error) } // Next returns the next topic. If there are no more topics, iterator.Done will be returned. func (tps *TopicIterator) Next() (*Topic, error) { topicName, err := tps.next() if err != nil { return nil, err } return newTopic(tps.c, topicName), nil } // NextConfig returns the next topic config. If there are no more topics, // iterator.Done will be returned. 
// This call shares the underlying iterator with calls to `TopicIterator.Next`. // If you wish to use mix calls, create separate iterator instances for both. func (t *TopicIterator) NextConfig() (*TopicConfig, error) { tpb, err := t.it.Next() if err != nil { return nil, err } cfg := protoToTopicConfig(tpb) return &cfg, nil } // ID returns the unique identifier of the topic within its project. func (t *Topic) ID() string { slash := strings.LastIndex(t.name, "/") if slash == -1 { // name is not a fully-qualified name. panic("bad topic name") } return t.name[slash+1:] } // String returns the printable globally unique name for the topic. func (t *Topic) String() string { return t.name } // Delete deletes the topic. func (t *Topic) Delete(ctx context.Context) error { return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name}) } // Exists reports whether the topic exists on the server. func (t *Topic) Exists(ctx context.Context) (bool, error) { if t.name == "_deleted-topic_" { return false, nil } _, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name}) if err == nil { return true, nil } if status.Code(err) == codes.NotFound { return false, nil } return false, err } // IAM returns the topic's IAM handle. func (t *Topic) IAM() *iam.Handle { return iam.InternalNewHandle(t.c.pubc.Connection(), t.name) } // Subscriptions returns an iterator which returns the subscriptions for this topic. // // Some of the returned subscriptions may belong to a project other than t. func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator { it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{ Topic: t.name, }) return &SubscriptionIterator{ c: t.c, next: it.Next, } } // ErrTopicStopped indicates that topic has been stopped and further publishing will fail. var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic") // A PublishResult holds the result from a call to Publish. 
// // Call Get to obtain the result of the Publish call. Example: // // // Get blocks until Publish completes or ctx is done. // id, err := r.Get(ctx) // if err != nil { // // TODO: Handle error. // } type PublishResult = ipubsub.PublishResult var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering") // Publish publishes msg to the topic asynchronously. Messages are batched and // sent according to the topic's PublishSettings. Publish never blocks. // // Publish returns a non-nil PublishResult which will be ready when the // message has been sent (or has failed to be sent) to the server. // // Publish creates goroutines for batching and sending messages. These goroutines // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish // will immediately return a PublishResult with an error. func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult { ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) if err != nil { log.Printf("pubsub: cannot create context with tag in Publish: %v", err) } r := ipubsub.NewPublishResult() if !t.EnableMessageOrdering && msg.OrderingKey != "" { ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled) return r } // Calculate the size of the encoded proto message by accounting // for the length of an individual PubSubMessage and Data/Attributes field. 
msgSize := proto.Size(&pb.PubsubMessage{ Data: msg.Data, Attributes: msg.Attributes, OrderingKey: msg.OrderingKey, }) t.initBundler() t.mu.RLock() defer t.mu.RUnlock() if t.stopped { ipubsub.SetPublishResult(r, "", ErrTopicStopped) return r } if err := t.flowController.acquire(ctx, msgSize); err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) return r } err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize) if err != nil { t.scheduler.Pause(msg.OrderingKey) ipubsub.SetPublishResult(r, "", err) } return r } // Stop sends all remaining published messages and stop goroutines created for handling // publishing. Returns once all outstanding messages have been sent or have // failed to be sent. func (t *Topic) Stop() { t.mu.Lock() noop := t.stopped || t.scheduler == nil t.stopped = true t.mu.Unlock() if noop { return } t.scheduler.FlushAndStop() } // Flush blocks until all remaining messages are sent. func (t *Topic) Flush() { if t.stopped || t.scheduler == nil { return } t.scheduler.Flush() } type bundledMessage struct { msg *Message res *PublishResult size int } func (t *Topic) initBundler() { t.mu.RLock() noop := t.stopped || t.scheduler != nil t.mu.RUnlock() if noop { return } t.mu.Lock() defer t.mu.Unlock() // Must re-check, since we released the lock. if t.stopped || t.scheduler != nil { return } timeout := t.PublishSettings.Timeout workers := t.PublishSettings.NumGoroutines // Unless overridden, allow many goroutines per CPU to call the Publish RPC // concurrently. The default value was determined via extensive load // testing (see the loadtest subdirectory). if t.PublishSettings.NumGoroutines == 0 { workers = 25 * runtime.GOMAXPROCS(0) } t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) { // TODO(jba): use a context detached from the one passed to NewClient. 
ctx := context.TODO() if timeout != 0 { var cancel func() ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() } t.publishMessageBundle(ctx, bundle.([]*bundledMessage)) }) t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold if t.scheduler.BundleCountThreshold > MaxPublishRequestCount { t.scheduler.BundleCountThreshold = MaxPublishRequestCount } t.scheduler.BundleByteThreshold = t.PublishSettings.ByteThreshold fcs := DefaultPublishSettings.FlowControlSettings fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 { b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes fcs.MaxOutstandingBytes = b // If MaxOutstandingBytes is set, disable BufferedByteLimit by setting it to maxint. // This is because there's no way to set "unlimited" for BufferedByteLimit, // and simply setting it to MaxOutstandingBytes occasionally leads to issues where // BufferedByteLimit is reached even though there are resources available. t.PublishSettings.BufferedByteLimit = maxInt } if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 { fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages } t.flowController = newTopicFlowController(fcs) bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit if t.PublishSettings.BufferedByteLimit > 0 { bufferedByteLimit = t.PublishSettings.BufferedByteLimit } t.scheduler.BufferedByteLimit = bufferedByteLimit // Calculate the max limit of a single bundle. 5 comes from the number of bytes // needed to be reserved for encoding the PubsubMessage repeated field. t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5 } // ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key. 
type ErrPublishingPaused struct { OrderingKey string } func (e ErrPublishingPaused) Error() string { return fmt.Sprintf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", e.OrderingKey) } func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) { ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name)) if err != nil { log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err) } pbMsgs := make([]*pb.PubsubMessage, len(bms)) var orderingKey string batchSize := 0 for i, bm := range bms { orderingKey = bm.msg.OrderingKey pbMsgs[i] = &pb.PubsubMessage{ Data: bm.msg.Data, Attributes: bm.msg.Attributes, OrderingKey: bm.msg.OrderingKey, } batchSize = batchSize + proto.Size(pbMsgs[i]) bm.msg = nil // release bm.msg for GC } var res *pb.PublishResponse start := time.Now() if orderingKey != "" && t.scheduler.IsPaused(orderingKey) { err = ErrPublishingPaused{OrderingKey: orderingKey} } else { // Apply custom publish retryer on top of user specified retryer and // default retryer. opts := t.c.pubc.CallOptions.Publish var settings gax.CallSettings for _, opt := range opts { opt.Resolve(&settings) } r := &publishRetryer{defaultRetryer: settings.Retry()} gaxOpts := []gax.CallOption{ gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)), gax.WithRetry(func() gax.Retryer { return r }), } if t.PublishSettings.shouldCompress(batchSize) { gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name))) } res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{ Topic: t.name, Messages: pbMsgs, }, gaxOpts...) } end := time.Now() if err != nil { t.scheduler.Pause(orderingKey) // Update context with error tag for OpenCensus, // using same stats.Record() call as success case. 
ctx, _ = tag.New(ctx, tag.Upsert(keyStatus, "ERROR"), tag.Upsert(keyError, err.Error())) } stats.Record(ctx, PublishLatency.M(float64(end.Sub(start)/time.Millisecond)), PublishedMessages.M(int64(len(bms)))) for i, bm := range bms { t.flowController.release(ctx, bm.size) if err != nil { ipubsub.SetPublishResult(bm.res, "", err) } else { ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil) } } } // ResumePublish resumes accepting messages for the provided ordering key. // Publishing using an ordering key might be paused if an error is // encountered while publishing, to prevent messages from being published // out of order. func (t *Topic) ResumePublish(orderingKey string) { t.mu.RLock() noop := t.scheduler == nil t.mu.RUnlock() if noop { return } t.scheduler.Resume(orderingKey) }