...

Source file src/cloud.google.com/go/pubsub/topic.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"log"
    22  	"runtime"
    23  	"strings"
    24  	"sync"
    25  	"time"
    26  
    27  	"cloud.google.com/go/iam"
    28  	"cloud.google.com/go/internal/optional"
    29  	ipubsub "cloud.google.com/go/internal/pubsub"
    30  	vkit "cloud.google.com/go/pubsub/apiv1"
    31  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    32  	"cloud.google.com/go/pubsub/internal/scheduler"
    33  	gax "github.com/googleapis/gax-go/v2"
    34  	"go.opencensus.io/stats"
    35  	"go.opencensus.io/tag"
    36  	"google.golang.org/api/support/bundler"
    37  	"google.golang.org/grpc"
    38  	"google.golang.org/grpc/codes"
    39  	"google.golang.org/grpc/encoding/gzip"
    40  	"google.golang.org/grpc/status"
    41  	"google.golang.org/protobuf/proto"
    42  	"google.golang.org/protobuf/types/known/durationpb"
    43  	fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
    44  )
    45  
    46  const (
    47  	// MaxPublishRequestCount is the maximum number of messages that can be in
    48  	// a single publish request, as defined by the PubSub service.
    49  	MaxPublishRequestCount = 1000
    50  
    51  	// MaxPublishRequestBytes is the maximum size of a single publish request
    52  	// in bytes, as defined by the PubSub service.
    53  	MaxPublishRequestBytes = 1e7
    54  )
    55  
    56  const (
    57  	// TODO: math.MaxInt was added in Go 1.17. We should use that once 1.17
    58  	// becomes the minimum supported version of Go.
    59  	intSize = 32 << (^uint(0) >> 63)
    60  	maxInt  = 1<<(intSize-1) - 1
    61  )
    62  
    63  // ErrOversizedMessage indicates that a message's size exceeds MaxPublishRequestBytes.
    64  var ErrOversizedMessage = bundler.ErrOversizedItem
    65  
    66  // Topic is a reference to a PubSub topic.
    67  //
    68  // The methods of Topic are safe for use by multiple goroutines.
    69  type Topic struct {
    70  	c *Client
    71  	// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
    72  	name string
    73  
    74  	// Settings for publishing messages. All changes must be made before the
    75  	// first call to Publish. The default is DefaultPublishSettings.
    76  	PublishSettings PublishSettings
    77  
    78  	mu        sync.RWMutex
    79  	stopped   bool
    80  	scheduler *scheduler.PublishScheduler
    81  
    82  	flowController
    83  
    84  	// EnableMessageOrdering enables delivery of ordered keys.
    85  	EnableMessageOrdering bool
    86  }
    87  
    88  // PublishSettings control the bundling of published messages.
    89  type PublishSettings struct {
    90  
    91  	// Publish a non-empty batch after this delay has passed.
    92  	DelayThreshold time.Duration
    93  
    94  	// Publish a batch when it has this many messages. The maximum is
    95  	// MaxPublishRequestCount.
    96  	CountThreshold int
    97  
    98  	// Publish a batch when its size in bytes reaches this value.
    99  	ByteThreshold int
   100  
   101  	// The number of goroutines used in each of the data structures that are
   102  	// involved along the the Publish path. Adjusting this value adjusts
   103  	// concurrency along the publish path.
   104  	//
   105  	// Defaults to a multiple of GOMAXPROCS.
   106  	NumGoroutines int
   107  
   108  	// The maximum time that the client will attempt to publish a bundle of messages.
   109  	Timeout time.Duration
   110  
   111  	// The maximum number of bytes that the Bundler will keep in memory before
   112  	// returning ErrOverflow. This is now superseded by FlowControlSettings.MaxOutstandingBytes.
   113  	// If MaxOutstandingBytes is set, that value will override BufferedByteLimit.
   114  	//
   115  	// Defaults to DefaultPublishSettings.BufferedByteLimit.
   116  	// Deprecated: Set `Topic.PublishSettings.FlowControlSettings.MaxOutstandingBytes` instead.
   117  	BufferedByteLimit int
   118  
   119  	// FlowControlSettings defines publisher flow control settings.
   120  	FlowControlSettings FlowControlSettings
   121  
   122  	// EnableCompression enables transport compression for Publish operations
   123  	EnableCompression bool
   124  
   125  	// CompressionBytesThreshold defines the threshold (in bytes) above which messages
   126  	// are compressed for transport. Only takes effect if EnableCompression is true.
   127  	CompressionBytesThreshold int
   128  }
   129  
   130  func (ps *PublishSettings) shouldCompress(batchSize int) bool {
   131  	return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold
   132  }
   133  
   134  // DefaultPublishSettings holds the default values for topics' PublishSettings.
   135  var DefaultPublishSettings = PublishSettings{
   136  	DelayThreshold: 10 * time.Millisecond,
   137  	CountThreshold: 100,
   138  	ByteThreshold:  1e6,
   139  	Timeout:        60 * time.Second,
   140  	// By default, limit the bundler to 10 times the max message size. The number 10 is
   141  	// chosen as a reasonable amount of messages in the worst case whilst still
   142  	// capping the number to a low enough value to not OOM users.
   143  	BufferedByteLimit: 10 * MaxPublishRequestBytes,
   144  	FlowControlSettings: FlowControlSettings{
   145  		MaxOutstandingMessages: 1000,
   146  		MaxOutstandingBytes:    -1,
   147  		LimitExceededBehavior:  FlowControlIgnore,
   148  	},
   149  	// Publisher compression defaults matches Java's defaults
   150  	// https://github.com/googleapis/java-pubsub/blob/7d33e7891db1b2e32fd523d7655b6c11ea140a8b/google-cloud-pubsub/src/main/java/com/google/cloud/pubsub/v1/Publisher.java#L717-L718
   151  	EnableCompression:         false,
   152  	CompressionBytesThreshold: 240,
   153  }
   154  
   155  // CreateTopic creates a new topic.
   156  //
   157  // The specified topic ID must start with a letter, and contain only letters
   158  // ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
   159  // tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
   160  // characters in length, and must not start with "goog". For more information,
   161  // see: https://cloud.google.com/pubsub/docs/admin#resource_names
   162  //
   163  // If the topic already exists an error will be returned.
   164  func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error) {
   165  	t := c.Topic(topicID)
   166  	_, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name})
   167  	if err != nil {
   168  		return nil, err
   169  	}
   170  	return t, nil
   171  }
   172  
   173  // CreateTopicWithConfig creates a topic from TopicConfig.
   174  //
   175  // The specified topic ID must start with a letter, and contain only letters
   176  // ([A-Za-z]), numbers ([0-9]), dashes (-), underscores (_), periods (.),
   177  // tildes (~), plus (+) or percent signs (%). It must be between 3 and 255
   178  // characters in length, and must not start with "goog". For more information,
   179  // see: https://cloud.google.com/pubsub/docs/admin#resource_names.
   180  //
   181  // If the topic already exists, an error will be returned.
   182  func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
   183  	t := c.Topic(topicID)
   184  	topic := tc.toProto()
   185  	topic.Name = t.name
   186  	_, err := c.pubc.CreateTopic(ctx, topic)
   187  	if err != nil {
   188  		return nil, err
   189  	}
   190  	return t, nil
   191  }
   192  
   193  // Topic creates a reference to a topic in the client's project.
   194  //
   195  // If a Topic's Publish method is called, it has background goroutines
   196  // associated with it. Clean them up by calling Topic.Stop.
   197  //
   198  // Avoid creating many Topic instances if you use them to publish.
   199  func (c *Client) Topic(id string) *Topic {
   200  	return c.TopicInProject(id, c.projectID)
   201  }
   202  
   203  // TopicInProject creates a reference to a topic in the given project.
   204  //
   205  // If a Topic's Publish method is called, it has background goroutines
   206  // associated with it. Clean them up by calling Topic.Stop.
   207  //
   208  // Avoid creating many Topic instances if you use them to publish.
   209  func (c *Client) TopicInProject(id, projectID string) *Topic {
   210  	return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id))
   211  }
   212  
   213  func newTopic(c *Client, name string) *Topic {
   214  	return &Topic{
   215  		c:               c,
   216  		name:            name,
   217  		PublishSettings: DefaultPublishSettings,
   218  	}
   219  }
   220  
   221  // TopicState denotes the possible states for a topic.
   222  type TopicState int
   223  
   224  const (
   225  	// TopicStateUnspecified is the default value. This value is unused.
   226  	TopicStateUnspecified = iota
   227  
   228  	// TopicStateActive means the topic does not have any persistent errors.
   229  	TopicStateActive
   230  
   231  	// TopicStateIngestionResourceError means ingestion from the data source
   232  	// has encountered a permanent error.
   233  	// See the more detailed error state in the corresponding ingestion
   234  	// source configuration.
   235  	TopicStateIngestionResourceError
   236  )
   237  
   238  // TopicConfig describes the configuration of a topic.
   239  type TopicConfig struct {
   240  	// The fully qualified identifier for the topic, in the format "projects/<projid>/topics/<name>"
   241  	name string
   242  
   243  	// The set of labels for the topic.
   244  	Labels map[string]string
   245  
   246  	// The topic's message storage policy.
   247  	MessageStoragePolicy MessageStoragePolicy
   248  
   249  	// The name of the Cloud KMS key to be used to protect access to messages
   250  	// published to this topic, in the format
   251  	// "projects/P/locations/L/keyRings/R/cryptoKeys/K".
   252  	KMSKeyName string
   253  
   254  	// Schema defines the schema settings upon topic creation.
   255  	SchemaSettings *SchemaSettings
   256  
   257  	// RetentionDuration configures the minimum duration to retain a message
   258  	// after it is published to the topic. If this field is set, messages published
   259  	// to the topic in the last `RetentionDuration` are always available to subscribers.
   260  	// For instance, it allows any attached subscription to [seek to a
   261  	// timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
   262  	// that is up to `RetentionDuration` in the past. If this field is
   263  	// not set, message retention is controlled by settings on individual
   264  	// subscriptions. Cannot be more than 31 days or less than 10 minutes.
   265  	//
   266  	// For more information, see https://cloud.google.com/pubsub/docs/replay-overview#topic_message_retention.
   267  	RetentionDuration optional.Duration
   268  
   269  	// State is an output-only field indicating the state of the topic.
   270  	State TopicState
   271  
   272  	// IngestionDataSourceSettings are settings for ingestion from a
   273  	// data source into this topic.
   274  	IngestionDataSourceSettings *IngestionDataSourceSettings
   275  }
   276  
   277  // String returns the printable globally unique name for the topic config.
   278  // This method only works when the topic config is returned from the server,
   279  // such as when calling `client.Topic` or `client.Topics`.
   280  // Otherwise, this will return an empty string.
   281  func (t *TopicConfig) String() string {
   282  	return t.name
   283  }
   284  
   285  // ID returns the unique identifier of the topic within its project.
   286  // This method only works when the topic config is returned from the server,
   287  // such as when calling `client.Topic` or `client.Topics`.
   288  // Otherwise, this will return an empty string.
   289  func (t *TopicConfig) ID() string {
   290  	slash := strings.LastIndex(t.name, "/")
   291  	if slash == -1 {
   292  		return ""
   293  	}
   294  	return t.name[slash+1:]
   295  }
   296  
   297  func (tc *TopicConfig) toProto() *pb.Topic {
   298  	var retDur *durationpb.Duration
   299  	if tc.RetentionDuration != nil {
   300  		retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
   301  	}
   302  	pbt := &pb.Topic{
   303  		Labels:                      tc.Labels,
   304  		MessageStoragePolicy:        messageStoragePolicyToProto(&tc.MessageStoragePolicy),
   305  		KmsKeyName:                  tc.KMSKeyName,
   306  		SchemaSettings:              schemaSettingsToProto(tc.SchemaSettings),
   307  		MessageRetentionDuration:    retDur,
   308  		IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(),
   309  	}
   310  	return pbt
   311  }
   312  
   313  // TopicConfigToUpdate describes how to update a topic.
   314  type TopicConfigToUpdate struct {
   315  	// If non-nil, the current set of labels is completely
   316  	// replaced by the new set.
   317  	Labels map[string]string
   318  
   319  	// If non-nil, the existing policy (containing the list of regions)
   320  	// is completely replaced by the new policy.
   321  	//
   322  	// Use the zero value &MessageStoragePolicy{} to reset the topic back to
   323  	// using the organization's Resource Location Restriction policy.
   324  	//
   325  	// If nil, the policy remains unchanged.
   326  	//
   327  	// This field has beta status. It is not subject to the stability guarantee
   328  	// and may change.
   329  	MessageStoragePolicy *MessageStoragePolicy
   330  
   331  	// If set to a positive duration between 10 minutes and 31 days, RetentionDuration is changed.
   332  	// If set to a negative value, this clears RetentionDuration from the topic.
   333  	// If nil, the retention duration remains unchanged.
   334  	RetentionDuration optional.Duration
   335  
   336  	// Schema defines the schema settings upon topic creation.
   337  	//
   338  	// Use the zero value &SchemaSettings{} to remove the schema from the topic.
   339  	SchemaSettings *SchemaSettings
   340  
   341  	// IngestionDataSourceSettings are settings for ingestion from a
   342  	// data source into this topic.
   343  	//
   344  	// Use the zero value &IngestionDataSourceSettings{} to remove the ingestion settings from the topic.
   345  	IngestionDataSourceSettings *IngestionDataSourceSettings
   346  }
   347  
   348  func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
   349  	tc := TopicConfig{
   350  		name:                        pbt.Name,
   351  		Labels:                      pbt.Labels,
   352  		MessageStoragePolicy:        protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
   353  		KMSKeyName:                  pbt.KmsKeyName,
   354  		SchemaSettings:              protoToSchemaSettings(pbt.SchemaSettings),
   355  		State:                       TopicState(pbt.State),
   356  		IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings),
   357  	}
   358  	if pbt.GetMessageRetentionDuration() != nil {
   359  		tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration()
   360  	}
   361  	return tc
   362  }
   363  
   364  // DetachSubscriptionResult is the response for the DetachSubscription method.
   365  // Reserved for future use.
   366  type DetachSubscriptionResult struct{}
   367  
   368  // DetachSubscription detaches a subscription from its topic. All messages
   369  // retained in the subscription are dropped. Subsequent `Pull` and `StreamingPull`
   370  // requests will return FAILED_PRECONDITION. If the subscription is a push
   371  // subscription, pushes to the endpoint will stop.
   372  func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error) {
   373  	_, err := c.pubc.DetachSubscription(ctx, &pb.DetachSubscriptionRequest{
   374  		Subscription: sub,
   375  	})
   376  	if err != nil {
   377  		return nil, err
   378  	}
   379  	return &DetachSubscriptionResult{}, nil
   380  }
   381  
   382  // MessageStoragePolicy constrains how messages published to the topic may be stored. It
   383  // is determined when the topic is created based on the policy configured at
   384  // the project level.
   385  type MessageStoragePolicy struct {
   386  	// AllowedPersistenceRegions is the list of GCP regions where messages that are published
   387  	// to the topic may be persisted in storage. Messages published by publishers running in
   388  	// non-allowed GCP regions (or running outside of GCP altogether) will be
   389  	// routed for storage in one of the allowed regions.
   390  	//
   391  	// If empty, it indicates a misconfiguration at the project or organization level, which
   392  	// will result in all Publish operations failing. This field cannot be empty in updates.
   393  	//
   394  	// If nil, then the policy is not defined on a topic level. When used in updates, it resets
   395  	// the regions back to the organization level Resource Location Restriction policy.
   396  	//
   397  	// For more information, see
   398  	// https://cloud.google.com/pubsub/docs/resource-location-restriction#pubsub-storage-locations.
   399  	AllowedPersistenceRegions []string
   400  }
   401  
   402  func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy {
   403  	if msp == nil {
   404  		return MessageStoragePolicy{}
   405  	}
   406  	return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
   407  }
   408  
   409  func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy {
   410  	if msp == nil || msp.AllowedPersistenceRegions == nil {
   411  		return nil
   412  	}
   413  	return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
   414  }
   415  
   416  // IngestionDataSourceSettings enables ingestion from a data source into this topic.
   417  type IngestionDataSourceSettings struct {
   418  	Source IngestionDataSource
   419  }
   420  
   421  // IngestionDataSource is the kind of ingestion source to be used.
   422  type IngestionDataSource interface {
   423  	isIngestionDataSource() bool
   424  }
   425  
   426  // AWSKinesisState denotes the possible states for ingestion from Amazon Kinesis Data Streams.
   427  type AWSKinesisState int
   428  
   429  const (
   430  	// AWSKinesisStateUnspecified is the default value. This value is unused.
   431  	AWSKinesisStateUnspecified = iota
   432  
   433  	// AWSKinesisStateActive means ingestion is active.
   434  	AWSKinesisStateActive
   435  
   436  	// AWSKinesisStatePermissionDenied means encountering an error while consumign data from Kinesis.
   437  	// This can happen if:
   438  	//   - The provided `aws_role_arn` does not exist or does not have the
   439  	//     appropriate permissions attached.
   440  	//   - The provided `aws_role_arn` is not set up properly for Identity
   441  	//     Federation using `gcp_service_account`.
   442  	//   - The Pub/Sub SA is not granted the
   443  	//     `iam.serviceAccounts.getOpenIdToken` permission on
   444  	//     `gcp_service_account`.
   445  	AWSKinesisStatePermissionDenied
   446  
   447  	// AWSKinesisStatePublishPermissionDenied means permission denied encountered while publishing to the topic.
   448  	// This can happen due to Pub/Sub SA has not been granted the appropriate publish
   449  	// permissions https://cloud.google.com/pubsub/docs/access-control#pubsub.publisher
   450  	AWSKinesisStatePublishPermissionDenied
   451  
   452  	// AWSKinesisStateStreamNotFound means the Kinesis stream does not exist.
   453  	AWSKinesisStateStreamNotFound
   454  
   455  	// AWSKinesisStateConsumerNotFound means the Kinesis consumer does not exist.
   456  	AWSKinesisStateConsumerNotFound
   457  )
   458  
   459  // IngestionDataSourceAWSKinesis are ingestion settings for Amazon Kinesis Data Streams.
   460  type IngestionDataSourceAWSKinesis struct {
   461  	// State is an output-only field indicating the state of the kinesis connection.
   462  	State AWSKinesisState
   463  
   464  	// StreamARN is the Kinesis stream ARN to ingest data from.
   465  	StreamARN string
   466  
   467  	// ConsumerARn is the Kinesis consumer ARN to used for ingestion in Enhanced
   468  	// Fan-Out mode. The consumer must be already created and ready to be used.
   469  	ConsumerARN string
   470  
   471  	// AWSRoleARn is the AWS role ARN to be used for Federated Identity authentication
   472  	// with Kinesis. Check the Pub/Sub docs for how to set up this role and the
   473  	// required permissions that need to be attached to it.
   474  	AWSRoleARN string
   475  
   476  	// GCPServiceAccount is the GCP service account to be used for Federated Identity
   477  	// authentication with Kinesis (via a `AssumeRoleWithWebIdentity` call for
   478  	// the provided role). The `aws_role_arn` must be set up with
   479  	// `accounts.google.com:sub` equals to this service account number.
   480  	GCPServiceAccount string
   481  }
   482  
   483  var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil)
   484  
   485  func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
   486  	return true
   487  }
   488  
   489  func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
   490  	if pbs == nil {
   491  		return nil
   492  	}
   493  
   494  	s := &IngestionDataSourceSettings{}
   495  	if k := pbs.GetAwsKinesis(); k != nil {
   496  		s.Source = &IngestionDataSourceAWSKinesis{
   497  			State:             AWSKinesisState(k.State),
   498  			StreamARN:         k.GetStreamArn(),
   499  			ConsumerARN:       k.GetConsumerArn(),
   500  			AWSRoleARN:        k.GetAwsRoleArn(),
   501  			GCPServiceAccount: k.GetGcpServiceAccount(),
   502  		}
   503  	}
   504  	return s
   505  }
   506  
   507  func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings {
   508  	if i == nil {
   509  		return nil
   510  	}
   511  	// An empty/zero-valued config is treated the same as nil and clearing this setting.
   512  	if (IngestionDataSourceSettings{}) == *i {
   513  		return nil
   514  	}
   515  	pbs := &pb.IngestionDataSourceSettings{}
   516  	if out := i.Source; out != nil {
   517  		if k, ok := out.(*IngestionDataSourceAWSKinesis); ok {
   518  			pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{
   519  				AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{
   520  					State:             pb.IngestionDataSourceSettings_AwsKinesis_State(k.State),
   521  					StreamArn:         k.StreamARN,
   522  					ConsumerArn:       k.ConsumerARN,
   523  					AwsRoleArn:        k.AWSRoleARN,
   524  					GcpServiceAccount: k.GCPServiceAccount,
   525  				},
   526  			}
   527  		}
   528  	}
   529  	return pbs
   530  }
   531  
   532  // Config returns the TopicConfig for the topic.
   533  func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
   534  	pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
   535  	if err != nil {
   536  		return TopicConfig{}, err
   537  	}
   538  	return protoToTopicConfig(pbt), nil
   539  }
   540  
   541  // Update changes an existing topic according to the fields set in cfg. It returns
   542  // the new TopicConfig.
   543  func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
   544  	req := t.updateRequest(cfg)
   545  	if len(req.UpdateMask.Paths) == 0 {
   546  		return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update")
   547  	}
   548  	rpt, err := t.c.pubc.UpdateTopic(ctx, req)
   549  	if err != nil {
   550  		return TopicConfig{}, err
   551  	}
   552  	return protoToTopicConfig(rpt), nil
   553  }
   554  
   555  func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
   556  	pt := &pb.Topic{Name: t.name}
   557  	var paths []string
   558  	if cfg.Labels != nil {
   559  		pt.Labels = cfg.Labels
   560  		paths = append(paths, "labels")
   561  	}
   562  	if cfg.MessageStoragePolicy != nil {
   563  		pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
   564  		paths = append(paths, "message_storage_policy")
   565  	}
   566  	if cfg.RetentionDuration != nil {
   567  		r := optional.ToDuration(cfg.RetentionDuration)
   568  		pt.MessageRetentionDuration = durationpb.New(r)
   569  		if r < 0 {
   570  			// Clear MessageRetentionDuration if sentinel value is read.
   571  			pt.MessageRetentionDuration = nil
   572  		}
   573  		paths = append(paths, "message_retention_duration")
   574  	}
   575  	// Updating SchemaSettings' field masks are more complicated here
   576  	// since each field should be able to be independently edited, while
   577  	// preserving the current values for everything else. We also denote
   578  	// the zero value SchemaSetting to mean clearing or removing schema
   579  	// from the topic.
   580  	if cfg.SchemaSettings != nil {
   581  		pt.SchemaSettings = schemaSettingsToProto(cfg.SchemaSettings)
   582  		clearSchema := true
   583  		if pt.SchemaSettings.Schema != "" {
   584  			paths = append(paths, "schema_settings.schema")
   585  			clearSchema = false
   586  		}
   587  		if pt.SchemaSettings.Encoding != pb.Encoding_ENCODING_UNSPECIFIED {
   588  			paths = append(paths, "schema_settings.encoding")
   589  			clearSchema = false
   590  		}
   591  		if pt.SchemaSettings.FirstRevisionId != "" {
   592  			paths = append(paths, "schema_settings.first_revision_id")
   593  			clearSchema = false
   594  		}
   595  		if pt.SchemaSettings.LastRevisionId != "" {
   596  			paths = append(paths, "schema_settings.last_revision_id")
   597  			clearSchema = false
   598  		}
   599  		// Clear the schema if all of its values are equal to the zero value.
   600  		if clearSchema {
   601  			paths = append(paths, "schema_settings")
   602  			pt.SchemaSettings = nil
   603  		}
   604  	}
   605  	if cfg.IngestionDataSourceSettings != nil {
   606  		pt.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto()
   607  		paths = append(paths, "ingestion_data_source_settings")
   608  	}
   609  	return &pb.UpdateTopicRequest{
   610  		Topic:      pt,
   611  		UpdateMask: &fmpb.FieldMask{Paths: paths},
   612  	}
   613  }
   614  
   615  // Topics returns an iterator which returns all of the topics for the client's project.
   616  func (c *Client) Topics(ctx context.Context) *TopicIterator {
   617  	it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
   618  	return &TopicIterator{
   619  		c:  c,
   620  		it: it,
   621  		next: func() (string, error) {
   622  			topic, err := it.Next()
   623  			if err != nil {
   624  				return "", err
   625  			}
   626  			return topic.Name, nil
   627  		},
   628  	}
   629  }
   630  
   631  // TopicIterator is an iterator that returns a series of topics.
   632  type TopicIterator struct {
   633  	c    *Client
   634  	it   *vkit.TopicIterator
   635  	next func() (string, error)
   636  }
   637  
   638  // Next returns the next topic. If there are no more topics, iterator.Done will be returned.
   639  func (tps *TopicIterator) Next() (*Topic, error) {
   640  	topicName, err := tps.next()
   641  	if err != nil {
   642  		return nil, err
   643  	}
   644  	return newTopic(tps.c, topicName), nil
   645  }
   646  
   647  // NextConfig returns the next topic config. If there are no more topics,
   648  // iterator.Done will be returned.
   649  // This call shares the underlying iterator with calls to `TopicIterator.Next`.
   650  // If you wish to use mix calls, create separate iterator instances for both.
   651  func (t *TopicIterator) NextConfig() (*TopicConfig, error) {
   652  	tpb, err := t.it.Next()
   653  	if err != nil {
   654  		return nil, err
   655  	}
   656  	cfg := protoToTopicConfig(tpb)
   657  	return &cfg, nil
   658  }
   659  
   660  // ID returns the unique identifier of the topic within its project.
   661  func (t *Topic) ID() string {
   662  	slash := strings.LastIndex(t.name, "/")
   663  	if slash == -1 {
   664  		// name is not a fully-qualified name.
   665  		panic("bad topic name")
   666  	}
   667  	return t.name[slash+1:]
   668  }
   669  
   670  // String returns the printable globally unique name for the topic.
   671  func (t *Topic) String() string {
   672  	return t.name
   673  }
   674  
   675  // Delete deletes the topic.
   676  func (t *Topic) Delete(ctx context.Context) error {
   677  	return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name})
   678  }
   679  
   680  // Exists reports whether the topic exists on the server.
   681  func (t *Topic) Exists(ctx context.Context) (bool, error) {
   682  	if t.name == "_deleted-topic_" {
   683  		return false, nil
   684  	}
   685  	_, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
   686  	if err == nil {
   687  		return true, nil
   688  	}
   689  	if status.Code(err) == codes.NotFound {
   690  		return false, nil
   691  	}
   692  	return false, err
   693  }
   694  
   695  // IAM returns the topic's IAM handle.
   696  func (t *Topic) IAM() *iam.Handle {
   697  	return iam.InternalNewHandle(t.c.pubc.Connection(), t.name)
   698  }
   699  
   700  // Subscriptions returns an iterator which returns the subscriptions for this topic.
   701  //
   702  // Some of the returned subscriptions may belong to a project other than t.
   703  func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
   704  	it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
   705  		Topic: t.name,
   706  	})
   707  	return &SubscriptionIterator{
   708  		c:    t.c,
   709  		next: it.Next,
   710  	}
   711  }
   712  
   713  // ErrTopicStopped indicates that topic has been stopped and further publishing will fail.
   714  var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")
   715  
   716  // A PublishResult holds the result from a call to Publish.
   717  //
   718  // Call Get to obtain the result of the Publish call. Example:
   719  //
   720  //	// Get blocks until Publish completes or ctx is done.
   721  //	id, err := r.Get(ctx)
   722  //	if err != nil {
   723  //	    // TODO: Handle error.
   724  //	}
   725  type PublishResult = ipubsub.PublishResult
   726  
   727  var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")
   728  
   729  // Publish publishes msg to the topic asynchronously. Messages are batched and
   730  // sent according to the topic's PublishSettings. Publish never blocks.
   731  //
   732  // Publish returns a non-nil PublishResult which will be ready when the
   733  // message has been sent (or has failed to be sent) to the server.
   734  //
   735  // Publish creates goroutines for batching and sending messages. These goroutines
   736  // need to be stopped by calling t.Stop(). Once stopped, future calls to Publish
   737  // will immediately return a PublishResult with an error.
   738  func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
   739  	ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
   740  	if err != nil {
   741  		log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
   742  	}
   743  
   744  	r := ipubsub.NewPublishResult()
   745  	if !t.EnableMessageOrdering && msg.OrderingKey != "" {
   746  		ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
   747  		return r
   748  	}
   749  
   750  	// Calculate the size of the encoded proto message by accounting
   751  	// for the length of an individual PubSubMessage and Data/Attributes field.
   752  	msgSize := proto.Size(&pb.PubsubMessage{
   753  		Data:        msg.Data,
   754  		Attributes:  msg.Attributes,
   755  		OrderingKey: msg.OrderingKey,
   756  	})
   757  
   758  	t.initBundler()
   759  	t.mu.RLock()
   760  	defer t.mu.RUnlock()
   761  	if t.stopped {
   762  		ipubsub.SetPublishResult(r, "", ErrTopicStopped)
   763  		return r
   764  	}
   765  
   766  	if err := t.flowController.acquire(ctx, msgSize); err != nil {
   767  		t.scheduler.Pause(msg.OrderingKey)
   768  		ipubsub.SetPublishResult(r, "", err)
   769  		return r
   770  	}
   771  	err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
   772  	if err != nil {
   773  		t.scheduler.Pause(msg.OrderingKey)
   774  		ipubsub.SetPublishResult(r, "", err)
   775  	}
   776  	return r
   777  }
   778  
   779  // Stop sends all remaining published messages and stop goroutines created for handling
   780  // publishing. Returns once all outstanding messages have been sent or have
   781  // failed to be sent.
   782  func (t *Topic) Stop() {
   783  	t.mu.Lock()
   784  	noop := t.stopped || t.scheduler == nil
   785  	t.stopped = true
   786  	t.mu.Unlock()
   787  	if noop {
   788  		return
   789  	}
   790  	t.scheduler.FlushAndStop()
   791  }
   792  
   793  // Flush blocks until all remaining messages are sent.
   794  func (t *Topic) Flush() {
   795  	if t.stopped || t.scheduler == nil {
   796  		return
   797  	}
   798  	t.scheduler.Flush()
   799  }
   800  
// bundledMessage is the item Publish hands to the scheduler: a message
// together with the result to resolve for it and its encoded size.
type bundledMessage struct {
	// msg is the message to publish. publishMessageBundle sets it to nil
	// after copying its fields so that it can be garbage collected.
	msg *Message
	// res is resolved with the server-assigned message ID, or with an error,
	// once the bundle has been processed.
	res *PublishResult
	// size is the encoded size of the message in bytes as computed in
	// Publish; the same amount is released back to the flow controller.
	size int
}
   806  
   807  func (t *Topic) initBundler() {
   808  	t.mu.RLock()
   809  	noop := t.stopped || t.scheduler != nil
   810  	t.mu.RUnlock()
   811  	if noop {
   812  		return
   813  	}
   814  	t.mu.Lock()
   815  	defer t.mu.Unlock()
   816  	// Must re-check, since we released the lock.
   817  	if t.stopped || t.scheduler != nil {
   818  		return
   819  	}
   820  
   821  	timeout := t.PublishSettings.Timeout
   822  
   823  	workers := t.PublishSettings.NumGoroutines
   824  	// Unless overridden, allow many goroutines per CPU to call the Publish RPC
   825  	// concurrently. The default value was determined via extensive load
   826  	// testing (see the loadtest subdirectory).
   827  	if t.PublishSettings.NumGoroutines == 0 {
   828  		workers = 25 * runtime.GOMAXPROCS(0)
   829  	}
   830  
   831  	t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) {
   832  		// TODO(jba): use a context detached from the one passed to NewClient.
   833  		ctx := context.TODO()
   834  		if timeout != 0 {
   835  			var cancel func()
   836  			ctx, cancel = context.WithTimeout(ctx, timeout)
   837  			defer cancel()
   838  		}
   839  		t.publishMessageBundle(ctx, bundle.([]*bundledMessage))
   840  	})
   841  	t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold
   842  	t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold
   843  	if t.scheduler.BundleCountThreshold > MaxPublishRequestCount {
   844  		t.scheduler.BundleCountThreshold = MaxPublishRequestCount
   845  	}
   846  	t.scheduler.BundleByteThreshold = t.PublishSettings.ByteThreshold
   847  
   848  	fcs := DefaultPublishSettings.FlowControlSettings
   849  	fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior
   850  	if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 {
   851  		b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes
   852  		fcs.MaxOutstandingBytes = b
   853  
   854  		// If MaxOutstandingBytes is set, disable BufferedByteLimit by setting it to maxint.
   855  		// This is because there's no way to set "unlimited" for BufferedByteLimit,
   856  		// and simply setting it to MaxOutstandingBytes occasionally leads to issues where
   857  		// BufferedByteLimit is reached even though there are resources available.
   858  		t.PublishSettings.BufferedByteLimit = maxInt
   859  	}
   860  	if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 {
   861  		fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
   862  	}
   863  
   864  	t.flowController = newTopicFlowController(fcs)
   865  
   866  	bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
   867  	if t.PublishSettings.BufferedByteLimit > 0 {
   868  		bufferedByteLimit = t.PublishSettings.BufferedByteLimit
   869  	}
   870  	t.scheduler.BufferedByteLimit = bufferedByteLimit
   871  
   872  	// Calculate the max limit of a single bundle. 5 comes from the number of bytes
   873  	// needed to be reserved for encoding the PubsubMessage repeated field.
   874  	t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5
   875  }
   876  
   877  // ErrPublishingPaused is a custom error indicating that the publish paused for the specified ordering key.
   878  type ErrPublishingPaused struct {
   879  	OrderingKey string
   880  }
   881  
   882  func (e ErrPublishingPaused) Error() string {
   883  	return fmt.Sprintf("pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing", e.OrderingKey)
   884  
   885  }
   886  
   887  func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
   888  	ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
   889  	if err != nil {
   890  		log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err)
   891  	}
   892  	pbMsgs := make([]*pb.PubsubMessage, len(bms))
   893  	var orderingKey string
   894  	batchSize := 0
   895  	for i, bm := range bms {
   896  		orderingKey = bm.msg.OrderingKey
   897  		pbMsgs[i] = &pb.PubsubMessage{
   898  			Data:        bm.msg.Data,
   899  			Attributes:  bm.msg.Attributes,
   900  			OrderingKey: bm.msg.OrderingKey,
   901  		}
   902  		batchSize = batchSize + proto.Size(pbMsgs[i])
   903  		bm.msg = nil // release bm.msg for GC
   904  	}
   905  	var res *pb.PublishResponse
   906  	start := time.Now()
   907  	if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
   908  		err = ErrPublishingPaused{OrderingKey: orderingKey}
   909  	} else {
   910  		// Apply custom publish retryer on top of user specified retryer and
   911  		// default retryer.
   912  		opts := t.c.pubc.CallOptions.Publish
   913  		var settings gax.CallSettings
   914  		for _, opt := range opts {
   915  			opt.Resolve(&settings)
   916  		}
   917  		r := &publishRetryer{defaultRetryer: settings.Retry()}
   918  		gaxOpts := []gax.CallOption{
   919  			gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
   920  			gax.WithRetry(func() gax.Retryer { return r }),
   921  		}
   922  		if t.PublishSettings.shouldCompress(batchSize) {
   923  			gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
   924  		}
   925  		res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
   926  			Topic:    t.name,
   927  			Messages: pbMsgs,
   928  		}, gaxOpts...)
   929  	}
   930  	end := time.Now()
   931  	if err != nil {
   932  		t.scheduler.Pause(orderingKey)
   933  		// Update context with error tag for OpenCensus,
   934  		// using same stats.Record() call as success case.
   935  		ctx, _ = tag.New(ctx, tag.Upsert(keyStatus, "ERROR"),
   936  			tag.Upsert(keyError, err.Error()))
   937  	}
   938  	stats.Record(ctx,
   939  		PublishLatency.M(float64(end.Sub(start)/time.Millisecond)),
   940  		PublishedMessages.M(int64(len(bms))))
   941  	for i, bm := range bms {
   942  		t.flowController.release(ctx, bm.size)
   943  		if err != nil {
   944  			ipubsub.SetPublishResult(bm.res, "", err)
   945  		} else {
   946  			ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil)
   947  		}
   948  	}
   949  }
   950  
   951  // ResumePublish resumes accepting messages for the provided ordering key.
   952  // Publishing using an ordering key might be paused if an error is
   953  // encountered while publishing, to prevent messages from being published
   954  // out of order.
   955  func (t *Topic) ResumePublish(orderingKey string) {
   956  	t.mu.RLock()
   957  	noop := t.scheduler == nil
   958  	t.mu.RUnlock()
   959  	if noop {
   960  		return
   961  	}
   962  
   963  	t.scheduler.Resume(orderingKey)
   964  }
   965  

View as plain text