...

Source file src/cloud.google.com/go/pubsub/subscription.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2016 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"io"
    22  	"strings"
    23  	"sync"
    24  	"time"
    25  
    26  	"cloud.google.com/go/iam"
    27  	"cloud.google.com/go/internal/optional"
    28  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    29  	"cloud.google.com/go/pubsub/internal/scheduler"
    30  	gax "github.com/googleapis/gax-go/v2"
    31  	"golang.org/x/sync/errgroup"
    32  	"google.golang.org/grpc/codes"
    33  	"google.golang.org/grpc/status"
    34  	"google.golang.org/protobuf/types/known/durationpb"
    35  	durpb "google.golang.org/protobuf/types/known/durationpb"
    36  	fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
    37  
    38  	vkit "cloud.google.com/go/pubsub/apiv1"
    39  )
    40  
    41  // Subscription is a reference to a PubSub subscription.
    42  type Subscription struct {
    43  	c *Client
    44  
    45  	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
    46  	name string
    47  
    48  	// Settings for pulling messages. Configure these before calling Receive.
    49  	ReceiveSettings ReceiveSettings
    50  
    51  	mu            sync.Mutex
    52  	receiveActive bool
    53  }
    54  
    55  // Subscription creates a reference to a subscription.
    56  func (c *Client) Subscription(id string) *Subscription {
    57  	return c.SubscriptionInProject(id, c.projectID)
    58  }
    59  
    60  // SubscriptionInProject creates a reference to a subscription in a given project.
    61  func (c *Client) SubscriptionInProject(id, projectID string) *Subscription {
    62  	return &Subscription{
    63  		c:    c,
    64  		name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id),
    65  	}
    66  }
    67  
    68  // String returns the globally unique printable name of the subscription.
    69  func (s *Subscription) String() string {
    70  	return s.name
    71  }
    72  
    73  // ID returns the unique identifier of the subscription within its project.
    74  func (s *Subscription) ID() string {
    75  	slash := strings.LastIndex(s.name, "/")
    76  	if slash == -1 {
    77  		// name is not a fully-qualified name.
    78  		panic("bad subscription name")
    79  	}
    80  	return s.name[slash+1:]
    81  }
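
// Illustrative sketch (not part of the original source): obtaining subscription
// references and reading back their IDs. The client, subscription ID, and
// project ID below are placeholders.
func exampleSubscriptionRefs(c *Client) {
	sub := c.Subscription("my-sub")                              // same project as the client
	other := c.SubscriptionInProject("my-sub", "other-project")  // a different project
	fmt.Println(sub.String(), sub.ID(), other.ID())
}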
    82  
    83  // Subscriptions returns an iterator which returns all of the subscriptions for the client's project.
    84  func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator {
    85  	it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{
    86  		Project: c.fullyQualifiedProjectName(),
    87  	})
    88  	return &SubscriptionIterator{
    89  		c:  c,
    90  		it: it,
    91  		next: func() (string, error) {
    92  			sub, err := it.Next()
    93  			if err != nil {
    94  				return "", err
    95  			}
    96  			return sub.Name, nil
    97  		},
    98  	}
    99  }
   100  
   101  // SubscriptionIterator is an iterator that returns a series of subscriptions.
   102  type SubscriptionIterator struct {
   103  	c    *Client
   104  	it   *vkit.SubscriptionIterator
   105  	next func() (string, error)
   106  }
   107  
   108  // Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned.
   109  func (subs *SubscriptionIterator) Next() (*Subscription, error) {
   110  	subName, err := subs.next()
   111  	if err != nil {
   112  		return nil, err
   113  	}
   114  	return &Subscription{c: subs.c, name: subName}, nil
   115  }
   116  
   117  // NextConfig returns the next subscription config. If there are no more subscriptions,
   118  // iterator.Done will be returned.
   119  // This call shares the underlying iterator with calls to `SubscriptionIterator.Next`.
   120  // If you wish to mix calls, create separate iterator instances for both.
   121  func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error) {
   122  	spb, err := subs.it.Next()
   123  	if err != nil {
   124  		return nil, err
   125  	}
   126  	cfg, err := protoToSubscriptionConfig(spb, subs.c)
   127  	if err != nil {
   128  		return nil, err
   129  	}
   130  	return &cfg, nil
   131  }
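
// Illustrative sketch (not part of the original source): listing the project's
// subscriptions. The iterator's Next returns iterator.Done (from
// google.golang.org/api/iterator) once the list is exhausted.
func exampleListSubscriptions(ctx context.Context, c *Client) {
	it := c.Subscriptions(ctx)
	for {
		sub, err := it.Next()
		if err != nil {
			// err is iterator.Done at the end of the list; any other
			// error should be treated as a failure.
			break
		}
		fmt.Println(sub.ID())
	}
}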
   132  
   133  // PushConfig contains configuration for subscriptions that operate in push mode.
   134  type PushConfig struct {
   135  	// A URL locating the endpoint to which messages should be pushed.
   136  	Endpoint string
   137  
   138  	// Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details.
   139  	Attributes map[string]string
   140  
   141  	// AuthenticationMethod is used by push endpoints to verify the source
   142  	// of push requests.
   143  	// It can be used with push endpoints that are private by default to
   144  	// allow requests only from the Cloud Pub/Sub system, for example.
   145  	// This field is optional and should be set only by users interested in
   146  	// authenticated push.
   147  	AuthenticationMethod AuthenticationMethod
   148  
   149  	// The format of the delivered message to the push endpoint is defined by
   150  	// the chosen wrapper. When unset, `PubsubWrapper` is used.
   151  	Wrapper Wrapper
   152  }
   153  
   154  func (pc *PushConfig) toProto() *pb.PushConfig {
   155  	if pc == nil {
   156  		return nil
   157  	}
   158  	pbCfg := &pb.PushConfig{
   159  		Attributes:   pc.Attributes,
   160  		PushEndpoint: pc.Endpoint,
   161  	}
   162  	if authMethod := pc.AuthenticationMethod; authMethod != nil {
   163  		switch am := authMethod.(type) {
   164  		case *OIDCToken:
   165  			pbCfg.AuthenticationMethod = am.toProto()
   166  		default: // TODO: add others here when GAIC adds more definitions.
   167  		}
   168  	}
   169  	if w := pc.Wrapper; w != nil {
   170  		switch wt := w.(type) {
   171  		case *PubsubWrapper:
   172  			pbCfg.Wrapper = wt.toProto()
   173  		case *NoWrapper:
   174  			pbCfg.Wrapper = wt.toProto()
   175  		default:
   176  		}
   177  	}
   178  	return pbCfg
   179  }
   180  
   181  // AuthenticationMethod is used by push subscriptions to verify the source of push requests.
   182  type AuthenticationMethod interface {
   183  	isAuthMethod() bool
   184  }
   185  
   186  // OIDCToken allows PushConfigs to be authenticated using
   187  // the OpenID Connect protocol https://openid.net/connect/
   188  type OIDCToken struct {
   189  	// Audience to be used when generating OIDC token. The audience claim
   190  	// identifies the recipients that the JWT is intended for. The audience
   191  	// value is a single case-sensitive string. Having multiple values (array)
   192  	// for the audience field is not supported. More info about the OIDC JWT
   193  	// token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3
   194  	// Note: if not specified, the Push endpoint URL will be used.
   195  	Audience string
   196  
   197  	// The service account email to be used for generating the OpenID Connect token.
   198  	// The caller of:
   199  	//  * CreateSubscription
   200  	//  * UpdateSubscription
   201  	//  * ModifyPushConfig
   202  	// must have the iam.serviceAccounts.actAs permission for the service account.
   203  	// See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles.
   204  	ServiceAccountEmail string
   205  }
   206  
   207  var _ AuthenticationMethod = (*OIDCToken)(nil)
   208  
   209  func (oidcToken *OIDCToken) isAuthMethod() bool { return true }
   210  
   211  func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ {
   212  	if oidcToken == nil {
   213  		return nil
   214  	}
   215  	return &pb.PushConfig_OidcToken_{
   216  		OidcToken: &pb.PushConfig_OidcToken{
   217  			Audience:            oidcToken.Audience,
   218  			ServiceAccountEmail: oidcToken.ServiceAccountEmail,
   219  		},
   220  	}
   221  }
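
// Illustrative sketch (not part of the original source): a PushConfig that
// authenticates pushes with an OIDC token and delivers unwrapped payloads.
// The endpoint and service account email are placeholders.
func examplePushConfig() PushConfig {
	return PushConfig{
		Endpoint: "https://example.com/push",
		AuthenticationMethod: &OIDCToken{
			// Audience is optional; the push endpoint URL is used when empty.
			ServiceAccountEmail: "push-sa@my-project.iam.gserviceaccount.com",
		},
		Wrapper: &NoWrapper{WriteMetadata: true},
	}
}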
   222  
   223  // Wrapper defines the format of message delivered to push endpoints.
   224  type Wrapper interface {
   225  	isWrapper() bool
   226  }
   227  
   228  // PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON
   229  // representation of a PubsubMessage
   230  // (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage).
   231  type PubsubWrapper struct{}
   232  
   233  var _ Wrapper = (*PubsubWrapper)(nil)
   234  
   235  func (p *PubsubWrapper) isWrapper() bool { return true }
   236  
   237  func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ {
   238  	if p == nil {
   239  		return nil
   240  	}
   241  	return &pb.PushConfig_PubsubWrapper_{
   242  		PubsubWrapper: &pb.PushConfig_PubsubWrapper{},
   243  	}
   244  }
   245  
   246  // NoWrapper denotes not wrapping the payload sent to the push endpoint.
   247  type NoWrapper struct {
   248  	WriteMetadata bool
   249  }
   250  
   251  var _ Wrapper = (*NoWrapper)(nil)
   252  
   253  func (n *NoWrapper) isWrapper() bool { return true }
   254  
   255  func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ {
   256  	if n == nil {
   257  		return nil
   258  	}
   259  	return &pb.PushConfig_NoWrapper_{
   260  		NoWrapper: &pb.PushConfig_NoWrapper{
   261  			WriteMetadata: n.WriteMetadata,
   262  		},
   263  	}
   264  }
   265  
   266  // BigQueryConfigState denotes the possible states for a BigQuery Subscription.
   267  type BigQueryConfigState int
   268  
   269  const (
   270  	// BigQueryConfigStateUnspecified is the default value. This value is unused.
   271  	BigQueryConfigStateUnspecified = iota
   272  
   273  	// BigQueryConfigActive means the subscription can actively send messages to BigQuery.
   274  	BigQueryConfigActive
   275  
   276  	// BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors.
   277  	BigQueryConfigPermissionDenied
   278  
   279  	// BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist.
   280  	BigQueryConfigNotFound
   281  
   282  	// BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch.
   283  	BigQueryConfigSchemaMismatch
   284  )
   285  
   286  // BigQueryConfig configures the subscription to deliver to a BigQuery table.
   287  type BigQueryConfig struct {
   288  	// The name of the table to which to write data, of the form
   289  	// {projectId}:{datasetId}.{tableId}
   290  	Table string
   291  
   292  	// When true, use the topic's schema as the columns to write to in BigQuery,
   293  	// if it exists.
   294  	UseTopicSchema bool
   295  
   296  	// When true, write the subscription name, message_id, publish_time,
   297  	// attributes, and ordering_key to additional columns in the table. The
   298  	// subscription name, message_id, and publish_time fields are put in their own
   299  	// columns while all other message properties (other than data) are written to
   300  	// a JSON object in the attributes column.
   301  	WriteMetadata bool
   302  
   303  	// When true and use_topic_schema is true, any fields that are a part of the
   304  	// topic schema that are not part of the BigQuery table schema are dropped
   305  	// when writing to BigQuery. Otherwise, the schemas must be kept in sync and
   306  	// any messages with extra fields are not written and remain in the
   307  	// subscription's backlog.
   308  	DropUnknownFields bool
   309  
   310  	// This is an output-only field that indicates whether or not the subscription can
   311  	// receive messages. This field is set only in responses from the server;
   312  	// it is ignored if it is set in any requests.
   313  	State BigQueryConfigState
   314  }
   315  
   316  func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
   317  	if bc == nil {
   318  		return nil
   319  	}
   320  	// If the config is zero valued, this is the sentinel for
   321  	// clearing the BigQuery config and switching back to pull.
   322  	if *bc == (BigQueryConfig{}) {
   323  		return nil
   324  	}
   325  	pbCfg := &pb.BigQueryConfig{
   326  		Table:             bc.Table,
   327  		UseTopicSchema:    bc.UseTopicSchema,
   328  		WriteMetadata:     bc.WriteMetadata,
   329  		DropUnknownFields: bc.DropUnknownFields,
   330  		State:             pb.BigQueryConfig_State(bc.State),
   331  	}
   332  	return pbCfg
   333  }
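
// Illustrative sketch (not part of the original source): creating a
// subscription that delivers messages to a BigQuery table. The subscription
// ID and table name are placeholders; the table must already exist.
func exampleCreateBigQuerySubscription(ctx context.Context, c *Client, topic *Topic) error {
	_, err := c.CreateSubscription(ctx, "bq-sub", SubscriptionConfig{
		Topic: topic,
		BigQueryConfig: BigQueryConfig{
			Table:         "my-project:my_dataset.my_table",
			WriteMetadata: true,
		},
	})
	return err
}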
   334  
   335  // CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription.
   336  type CloudStorageConfigState int
   337  
   338  const (
   339  	// CloudStorageConfigStateUnspecified is the default value. This value is unused.
   340  	CloudStorageConfigStateUnspecified = iota
   341  
   342  	// CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage.
   343  	CloudStorageConfigActive
   344  
   345  	// CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud Storage bucket because of permission denied errors.
   346  	CloudStorageConfigPermissionDenied
   347  
   348  	// CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist.
   349  	CloudStorageConfigNotFound
   350  )
   351  
   352  // Configuration options for how to write the message data to Cloud Storage.
   353  type isCloudStorageOutputFormat interface {
   354  	isCloudStorageOutputFormat()
   355  }
   356  
   357  // CloudStorageOutputFormatTextConfig is the configuration for writing
   358  // message data in text format. Message payloads will be written to files
   359  // as raw text, separated by a newline.
   360  type CloudStorageOutputFormatTextConfig struct{}
   361  
   362  // CloudStorageOutputFormatAvroConfig is the configuration for writing
   363  // message data in Avro format. Message payloads and metadata will be written
   364  // to the files as an Avro binary.
   365  type CloudStorageOutputFormatAvroConfig struct {
   366  	// When true, write the subscription name, message_id, publish_time,
   367  	// attributes, and ordering_key as additional fields in the output.
   368  	WriteMetadata bool
   369  }
   370  
   371  func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {}
   372  
   373  func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {}
   374  
   375  // CloudStorageConfig configures the subscription to deliver to Cloud Storage.
   376  type CloudStorageConfig struct {
   377  	// User-provided name for the Cloud Storage bucket.
   378  	// The bucket must be created by the user. The bucket name must be without
   379  	// any prefix like "gs://". See the [bucket naming
   380  	// requirements](https://cloud.google.com/storage/docs/buckets#naming).
   381  	Bucket string
   382  
   383  	// User-provided prefix for Cloud Storage filename. See the [object naming
   384  	// requirements](https://cloud.google.com/storage/docs/objects#naming).
   385  	FilenamePrefix string
   386  
   387  	// User-provided suffix for Cloud Storage filename. See the [object naming
   388  	// requirements](https://cloud.google.com/storage/docs/objects#naming).
   389  	FilenameSuffix string
   390  
   391  	// Configuration for how to write message data. Options are
   392  	// CloudStorageOutputFormatTextConfig and CloudStorageOutputFormatAvroConfig.
   393  	// Defaults to text format.
   394  	OutputFormat isCloudStorageOutputFormat
   395  
   396  	// The maximum duration that can elapse before a new Cloud Storage file is
   397  	// created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed
   398  	// the subscription's acknowledgement deadline.
   399  	MaxDuration optional.Duration
   400  
   401  	// The maximum bytes that can be written to a Cloud Storage file before a new
   402  	// file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded
   403  	// in cases where messages are larger than the limit.
   404  	MaxBytes int64
   405  
   406  	// Output only. Indicates whether or not the subscription can
   407  	// receive messages.
   408  	State CloudStorageConfigState
   409  }
   410  
   411  func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig {
   412  	if cs == nil {
   413  		return nil
   414  	}
   415  	// For the purposes of the live service, an empty/zero-valued config
   416  	// is treated the same as nil and clearing this setting.
   417  	if (CloudStorageConfig{}) == *cs {
   418  		return nil
   419  	}
   420  	var dur *durationpb.Duration
   421  	if cs.MaxDuration != nil {
   422  		dur = durationpb.New(optional.ToDuration(cs.MaxDuration))
   423  	}
   424  	pbCfg := &pb.CloudStorageConfig{
   425  		Bucket:         cs.Bucket,
   426  		FilenamePrefix: cs.FilenamePrefix,
   427  		FilenameSuffix: cs.FilenameSuffix,
   428  		MaxDuration:    dur,
   429  		MaxBytes:       cs.MaxBytes,
   430  		State:          pb.CloudStorageConfig_State(cs.State),
   431  	}
   432  	if out := cs.OutputFormat; out != nil {
   433  		if _, ok := out.(*CloudStorageOutputFormatTextConfig); ok {
   434  			pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{}
   435  		} else if cfg, ok := out.(*CloudStorageOutputFormatAvroConfig); ok {
   436  			pbCfg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{
   437  				AvroConfig: &pb.CloudStorageConfig_AvroConfig{
   438  					WriteMetadata: cfg.WriteMetadata,
   439  				},
   440  			}
   441  		}
   442  	}
   443  	return pbCfg
   444  }
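
// Illustrative sketch (not part of the original source): creating a
// subscription that writes messages to a Cloud Storage bucket as Avro files,
// rolling files every five minutes. The bucket and IDs are placeholders.
func exampleCreateCloudStorageSubscription(ctx context.Context, c *Client, topic *Topic) error {
	_, err := c.CreateSubscription(ctx, "gcs-sub", SubscriptionConfig{
		Topic: topic,
		CloudStorageConfig: CloudStorageConfig{
			Bucket:         "my-bucket",
			FilenamePrefix: "pubsub/",
			FilenameSuffix: ".avro",
			OutputFormat:   &CloudStorageOutputFormatAvroConfig{WriteMetadata: true},
			MaxDuration:    5 * time.Minute,
		},
	})
	return err
}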
   445  
   446  // SubscriptionState denotes the possible states for a Subscription.
   447  type SubscriptionState int
   448  
   449  const (
   450  	// SubscriptionStateUnspecified is the default value. This value is unused.
   451  	SubscriptionStateUnspecified = iota
   452  
   453  	// SubscriptionStateActive means the subscription can actively receive messages.
   454  	SubscriptionStateActive
   455  
   456  	// SubscriptionStateResourceError means the subscription cannot receive messages
   457  	// because of an error with the resource to which it pushes messages.
   458  	// See the more detailed error state in the corresponding configuration.
   459  	SubscriptionStateResourceError
   460  )
   461  
   462  // SubscriptionConfig describes the configuration of a subscription. If none of
   463  // PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will
   464  // pull and ack messages using API methods. At most one of these fields may be set.
   465  type SubscriptionConfig struct {
   466  	// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
   467  	name string
   468  
   469  	// The topic from which this subscription is receiving messages.
   470  	Topic *Topic
   471  
   472  	// If push delivery is used with this subscription, this field is
   473  	// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
   474  	// or `CloudStorageConfig` can be set. If all are empty, then the
   475  	// subscriber will pull and ack messages using API methods.
   476  	PushConfig PushConfig
   477  
   478  	// If delivery to BigQuery is used with this subscription, this field is
   479  	// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
   480  	// or `CloudStorageConfig` can be set. If all are empty, then the
   481  	// subscriber will pull and ack messages using API methods.
   482  	BigQueryConfig BigQueryConfig
   483  
   484  	// If delivery to Cloud Storage is used with this subscription, this field is
   485  	// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
   486  	// or `CloudStorageConfig` can be set. If all are empty, then the
   487  	// subscriber will pull and ack messages using API methods.
   488  	CloudStorageConfig CloudStorageConfig
   489  
   490  	// The default maximum time after a subscriber receives a message before
   491  	// the subscriber should acknowledge the message. Note: messages which are
   492  	// obtained via Subscription.Receive need not be acknowledged within this
   493  	// deadline, as the deadline will be automatically extended.
   494  	AckDeadline time.Duration
   495  
   496  	// Whether to retain acknowledged messages. If true, acknowledged messages
   497  	// will not be expunged until they fall out of the RetentionDuration window.
   498  	RetainAckedMessages bool
   499  
   500  	// How long to retain messages in backlog, from the time of publish. If
   501  	// RetainAckedMessages is true, this duration affects the retention of
   502  	// acknowledged messages, otherwise only unacknowledged messages are retained.
   503  	// Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes.
   504  	RetentionDuration time.Duration
   505  
   506  	// Expiration policy specifies the conditions for a subscription's expiration.
   507  	// A subscription is considered active as long as any connected subscriber is
   508  	// successfully consuming messages from the subscription or is issuing
   509  	// operations on the subscription. If `expiration_policy` is not set, a
   510  	// *default policy* with `ttl` of 31 days will be used. The minimum allowed
   511  	// value for `expiration_policy.ttl` is 1 day.
   512  	//
   513  	// Use time.Duration(0) to indicate that the subscription should never expire.
   514  	ExpirationPolicy optional.Duration
   515  
   516  	// The set of labels for the subscription.
   517  	Labels map[string]string
   518  
   519  	// EnableMessageOrdering enables message ordering on this subscription.
   520  	// This value is only used for subscription creation and update, and
   521  	// is not read locally in calls like Subscription.Receive().
   522  	//
   523  	// If set to false, even if messages are published with ordering keys,
   524  	// messages will not be delivered in order.
   525  	//
   526  	// When calling Subscription.Receive(), the client will check this
   527  	// value with a call to Subscription.Config(), which requires the
   528  	// roles/viewer or roles/pubsub.viewer role on your service account.
   529  	// If that call fails, messages with ordering keys will be delivered in order.
   530  	EnableMessageOrdering bool
   531  
   532  	// DeadLetterPolicy specifies the conditions for dead lettering messages in
   533  	// a subscription. If not set, dead lettering is disabled.
   534  	DeadLetterPolicy *DeadLetterPolicy
   535  
   536  	// Filter is an expression written in the Cloud Pub/Sub filter language. If
   537  	// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
   538  	// filter are delivered on this subscription. If empty, then no messages are
   539  	// filtered out. Cannot be changed after the subscription is created.
   540  	Filter string
   541  
   542  	// RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
   543  	RetryPolicy *RetryPolicy
   544  
   545  	// Detached indicates whether the subscription is detached from its topic.
   546  	// Detached subscriptions don't receive messages from their topic and don't
   547  	// retain any backlog. `Pull` and `StreamingPull` requests will return
   548  	// FAILED_PRECONDITION. If the subscription is a push subscription, pushes to
   549  	// the endpoint will not be made.
   550  	Detached bool
   551  
   552  	// TopicMessageRetentionDuration indicates the minimum duration for which a message is
   553  	// retained after it is published to the subscription's topic. If this field is
   554  	// set, messages published to the subscription's topic in the last
   555  	// `TopicMessageRetentionDuration` are always available to subscribers.
   556  	// You can enable both topic and subscription retention for the same topic.
   557  	// In this situation, the maximum of the retention durations takes effect.
   558  	//
   559  	// This is an output only field, meaning it will only appear in responses from the backend
   560  	// and will be ignored if sent in a request.
   561  	TopicMessageRetentionDuration time.Duration
   562  
   563  	// EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees
   564  	// for the delivery of a message with a given MessageID on this subscription:
   565  	//
   566  	// The message sent to a subscriber is guaranteed not to be resent
   567  	// before the message's acknowledgement deadline expires.
   568  	// An acknowledged message will not be resent to a subscriber.
   569  	//
   570  	// Note that subscribers may still receive multiple copies of a message
   571  	// when `enable_exactly_once_delivery` is true if the message was published
   572  	// multiple times by a publisher client. These copies are considered distinct
   573  	// by Pub/Sub and have distinct MessageID values.
   574  	//
   575  	// Lastly, to guarantee messages have been acked or nacked properly, you must
   576  	// call Message.AckWithResult() or Message.NackWithResult(). These return an
   577  	// AckResult which will be ready if the message has been acked (or failed to be acked).
   578  	EnableExactlyOnceDelivery bool
   579  
   580  	// State indicates whether or not the subscription can receive messages.
   581  	// This is an output-only field. It is set only in responses
   582  	// from the server;
   583  	// it is ignored if it is set in any requests.
   584  	State SubscriptionState
   585  }
   586  
   587  // String returns the globally unique printable name of the subscription config.
   588  // This method only works when the subscription config is returned from the server,
   589  // such as when calling `client.Subscription` or `client.Subscriptions`.
   590  // Otherwise, this will return an empty string.
   591  func (s *SubscriptionConfig) String() string {
   592  	return s.name
   593  }
   594  
   595  // ID returns the unique identifier of the subscription within its project.
   596  // This method only works when the subscription config is returned from the server,
   597  // such as when calling `client.Subscription` or `client.Subscriptions`.
   598  // Otherwise, this will return an empty string.
   599  func (s *SubscriptionConfig) ID() string {
   600  	slash := strings.LastIndex(s.name, "/")
   601  	if slash == -1 {
   602  		return ""
   603  	}
   604  	return s.name[slash+1:]
   605  }
   606  
   607  func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
   608  	var pbPushConfig *pb.PushConfig
   609  	if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
   610  		pbPushConfig = cfg.PushConfig.toProto()
   611  	}
   612  	pbBigQueryConfig := cfg.BigQueryConfig.toProto()
   613  	pbCloudStorageConfig := cfg.CloudStorageConfig.toProto()
   614  	var retentionDuration *durpb.Duration
   615  	if cfg.RetentionDuration != 0 {
   616  		retentionDuration = durpb.New(cfg.RetentionDuration)
   617  	}
   618  	var pbDeadLetter *pb.DeadLetterPolicy
   619  	if cfg.DeadLetterPolicy != nil {
   620  		pbDeadLetter = cfg.DeadLetterPolicy.toProto()
   621  	}
   622  	var pbRetryPolicy *pb.RetryPolicy
   623  	if cfg.RetryPolicy != nil {
   624  		pbRetryPolicy = cfg.RetryPolicy.toProto()
   625  	}
   626  	return &pb.Subscription{
   627  		Name:                      name,
   628  		Topic:                     cfg.Topic.name,
   629  		PushConfig:                pbPushConfig,
   630  		BigqueryConfig:            pbBigQueryConfig,
   631  		CloudStorageConfig:        pbCloudStorageConfig,
   632  		AckDeadlineSeconds:        trunc32(int64(cfg.AckDeadline.Seconds())),
   633  		RetainAckedMessages:       cfg.RetainAckedMessages,
   634  		MessageRetentionDuration:  retentionDuration,
   635  		Labels:                    cfg.Labels,
   636  		ExpirationPolicy:          expirationPolicyToProto(cfg.ExpirationPolicy),
   637  		EnableMessageOrdering:     cfg.EnableMessageOrdering,
   638  		DeadLetterPolicy:          pbDeadLetter,
   639  		Filter:                    cfg.Filter,
   640  		RetryPolicy:               pbRetryPolicy,
   641  		Detached:                  cfg.Detached,
   642  		EnableExactlyOnceDelivery: cfg.EnableExactlyOnceDelivery,
   643  	}
   644  }
   645  
   646  func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) {
   647  	rd := time.Hour * 24 * 7
   648  	if pbSub.MessageRetentionDuration != nil {
   649  		rd = pbSub.MessageRetentionDuration.AsDuration()
   650  	}
   651  	var expirationPolicy time.Duration
   652  	if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil {
   653  		expirationPolicy = ttl.AsDuration()
   654  	}
   655  	dlp := protoToDLP(pbSub.DeadLetterPolicy)
   656  	rp := protoToRetryPolicy(pbSub.RetryPolicy)
   657  	subC := SubscriptionConfig{
   658  		name:                          pbSub.Name,
   659  		Topic:                         newTopic(c, pbSub.Topic),
   660  		AckDeadline:                   time.Second * time.Duration(pbSub.AckDeadlineSeconds),
   661  		RetainAckedMessages:           pbSub.RetainAckedMessages,
   662  		RetentionDuration:             rd,
   663  		Labels:                        pbSub.Labels,
   664  		ExpirationPolicy:              expirationPolicy,
   665  		EnableMessageOrdering:         pbSub.EnableMessageOrdering,
   666  		DeadLetterPolicy:              dlp,
   667  		Filter:                        pbSub.Filter,
   668  		RetryPolicy:                   rp,
   669  		Detached:                      pbSub.Detached,
   670  		TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(),
   671  		EnableExactlyOnceDelivery:     pbSub.EnableExactlyOnceDelivery,
   672  		State:                         SubscriptionState(pbSub.State),
   673  	}
   674  	if pc := protoToPushConfig(pbSub.PushConfig); pc != nil {
   675  		subC.PushConfig = *pc
   676  	}
   677  	if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
   678  		subC.BigQueryConfig = *bq
   679  	}
   680  	if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil {
   681  		subC.CloudStorageConfig = *cs
   682  	}
   683  	return subC, nil
   684  }
   685  
   686  func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig {
   687  	if pbPc == nil {
   688  		return nil
   689  	}
   690  	pc := &PushConfig{
   691  		Endpoint:   pbPc.PushEndpoint,
   692  		Attributes: pbPc.Attributes,
   693  	}
   694  	if am := pbPc.AuthenticationMethod; am != nil {
   695  		if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil {
   696  			pc.AuthenticationMethod = &OIDCToken{
   697  				Audience:            oidcToken.OidcToken.GetAudience(),
   698  				ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(),
   699  			}
   700  		}
   701  	}
   702  	if w := pbPc.Wrapper; w != nil {
   703  		switch wt := w.(type) {
   704  		case *pb.PushConfig_PubsubWrapper_:
   705  			pc.Wrapper = &PubsubWrapper{}
   706  		case *pb.PushConfig_NoWrapper_:
   707  			pc.Wrapper = &NoWrapper{
   708  				WriteMetadata: wt.NoWrapper.WriteMetadata,
   709  			}
   710  		}
   711  	}
   712  	return pc
   713  }
   714  
   715  func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
   716  	if pbBQ == nil {
   717  		return nil
   718  	}
   719  	bq := &BigQueryConfig{
   720  		Table:             pbBQ.GetTable(),
   721  		UseTopicSchema:    pbBQ.GetUseTopicSchema(),
   722  		DropUnknownFields: pbBQ.GetDropUnknownFields(),
   723  		WriteMetadata:     pbBQ.GetWriteMetadata(),
   724  		State:             BigQueryConfigState(pbBQ.State),
   725  	}
   726  	return bq
   727  }
   728  
   729  func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig {
   730  	if pbCSC == nil {
   731  		return nil
   732  	}
   733  
   734  	csc := &CloudStorageConfig{
   735  		Bucket:         pbCSC.GetBucket(),
   736  		FilenamePrefix: pbCSC.GetFilenamePrefix(),
   737  		FilenameSuffix: pbCSC.GetFilenameSuffix(),
   738  		MaxBytes:       pbCSC.GetMaxBytes(),
   739  		State:          CloudStorageConfigState(pbCSC.GetState()),
   740  	}
   741  	if dur := pbCSC.GetMaxDuration().AsDuration(); dur != 0 {
   742  		csc.MaxDuration = dur
   743  	}
   744  	if out := pbCSC.OutputFormat; out != nil {
   745  		if _, ok := out.(*pb.CloudStorageConfig_TextConfig_); ok {
   746  			csc.OutputFormat = &CloudStorageOutputFormatTextConfig{}
   747  		} else if cfg, ok := out.(*pb.CloudStorageConfig_AvroConfig_); ok {
   748  			csc.OutputFormat = &CloudStorageOutputFormatAvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()}
   749  		}
   750  	}
   751  	return csc
   752  }
   753  
   754  // DeadLetterPolicy specifies the conditions for dead lettering messages in
   755  // a subscription.
   756  type DeadLetterPolicy struct {
   757  	DeadLetterTopic     string
   758  	MaxDeliveryAttempts int
   759  }
   760  
   761  func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy {
   762  	if dlp == nil || dlp.DeadLetterTopic == "" {
   763  		return nil
   764  	}
   765  	return &pb.DeadLetterPolicy{
   766  		DeadLetterTopic:     dlp.DeadLetterTopic,
   767  		MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts),
   768  	}
   769  }
   770  func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy {
   771  	if pbDLP == nil {
   772  		return nil
   773  	}
   774  	return &DeadLetterPolicy{
   775  		DeadLetterTopic:     pbDLP.GetDeadLetterTopic(),
   776  		MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts),
   777  	}
   778  }
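
// Illustrative sketch (not part of the original source): enabling dead
// lettering on an existing subscription. The dead-letter topic name is a
// placeholder and must refer to an existing topic.
func exampleEnableDeadLetter(ctx context.Context, sub *Subscription) error {
	_, err := sub.Update(ctx, SubscriptionConfigToUpdate{
		DeadLetterPolicy: &DeadLetterPolicy{
			DeadLetterTopic:     "projects/my-project/topics/dead-letter",
			MaxDeliveryAttempts: 10,
		},
	})
	return err
}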
   779  
   780  // RetryPolicy specifies how Cloud Pub/Sub retries message delivery.
   781  //
   782  // Retry delay will be exponential based on provided minimum and maximum
   783  // backoffs. https://en.wikipedia.org/wiki/Exponential_backoff.
   784  //
   785  // RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded
   786  // events for a given message.
   787  //
   788  // Retry Policy is implemented on a best effort basis. At times, the delay
   789  // between consecutive deliveries may not match the configuration. That is,
   790  // delay can be more or less than configured backoff.
   791  type RetryPolicy struct {
   792  	// MinimumBackoff is the minimum delay between consecutive deliveries of a
   793  	// given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds.
   794  	MinimumBackoff optional.Duration
   795  	// MaximumBackoff is the maximum delay between consecutive deliveries of a
   796  	// given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds.
   797  	MaximumBackoff optional.Duration
   798  }
   799  
   800  func (rp *RetryPolicy) toProto() *pb.RetryPolicy {
   801  	if rp == nil {
   802  		return nil
   803  	}
   804  	// If RetryPolicy is the empty struct, take this as an instruction
   805  	// to remove RetryPolicy from the subscription.
   806  	if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil {
   807  		return nil
   808  	}
   809  
   810  	// Initialize minDur and maxDur to be negative, such that if the conversion from an
   811  	// optional fails, RetryPolicy won't be updated in the proto as it will remain nil.
   812  	var minDur time.Duration = -1
   813  	var maxDur time.Duration = -1
   814  	if rp.MinimumBackoff != nil {
   815  		minDur = optional.ToDuration(rp.MinimumBackoff)
   816  	}
   817  	if rp.MaximumBackoff != nil {
   818  		maxDur = optional.ToDuration(rp.MaximumBackoff)
   819  	}
   820  
   821  	var minDurPB, maxDurPB *durpb.Duration
   822  	if minDur > 0 {
   823  		minDurPB = durpb.New(minDur)
   824  	}
   825  	if maxDur > 0 {
   826  		maxDurPB = durpb.New(maxDur)
   827  	}
   828  
   829  	return &pb.RetryPolicy{
   830  		MinimumBackoff: minDurPB,
   831  		MaximumBackoff: maxDurPB,
   832  	}
   833  }
   834  
   835  func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy {
   836  	if rp == nil {
   837  		return nil
   838  	}
   839  	var minBackoff, maxBackoff time.Duration
   840  	if rp.MinimumBackoff != nil {
   841  		minBackoff = rp.MinimumBackoff.AsDuration()
   842  	}
   843  	if rp.MaximumBackoff != nil {
   844  		maxBackoff = rp.MaximumBackoff.AsDuration()
   845  	}
   846  
   847  	retryPolicy := &RetryPolicy{
   848  		MinimumBackoff: minBackoff,
   849  		MaximumBackoff: maxBackoff,
   850  	}
   851  	return retryPolicy
   852  }
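
// Illustrative sketch (not part of the original source): bounding the
// exponential redelivery backoff for a subscription. Updating with a pointer
// to the zero-value RetryPolicy would instead clear an existing policy.
func exampleSetRetryPolicy(ctx context.Context, sub *Subscription) error {
	_, err := sub.Update(ctx, SubscriptionConfigToUpdate{
		RetryPolicy: &RetryPolicy{
			MinimumBackoff: 10 * time.Second,
			MaximumBackoff: 5 * time.Minute,
		},
	})
	return err
}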
   853  
   854  // ReceiveSettings configure the Receive method.
   855  // A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings.
   856  type ReceiveSettings struct {
   857  	// MaxExtension is the maximum period for which the Subscription should
   858  	// automatically extend the ack deadline for each message.
   859  	//
   860  	// The Subscription will automatically extend the ack deadline of all
   861  	// fetched Messages up to the duration specified. Automatic deadline
   862  	// extension beyond the initial receipt may be disabled by specifying a
   863  	// duration less than 0.
   864  	MaxExtension time.Duration
   865  
   866  	// MaxExtensionPeriod is the maximum duration by which to extend the ack
   867  	// deadline at a time. The ack deadline will continue to be extended by up
   868  	// to this duration until MaxExtension is reached. Setting MaxExtensionPeriod
   869  	// bounds the maximum amount of time before a message redelivery in the
   870  	// event the subscriber fails to extend the deadline.
   871  	//
   872  	// MaxExtensionPeriod must be between 10s and 600s (inclusive). This configuration
   873  	// can be disabled by specifying a duration less than (or equal to) 0.
   874  	MaxExtensionPeriod time.Duration
   875  
   876  	// MinExtensionPeriod is the minimum duration for a single lease extension attempt.
   877  	// By default the 99th percentile of ack latency is used to determine lease extension
   878  	// periods but this value can be set to minimize the number of extraneous RPCs sent.
   879  	//
   880  	// MinExtensionPeriod must be between 10s and 600s (inclusive). This configuration
   881  	// can be disabled by specifying a duration less than (or equal to) 0.
   882  	// Defaults to off. In a future release, it will default to 60 seconds
   883  	// when the subscription has exactly-once delivery enabled.
   884  	MinExtensionPeriod time.Duration
   885  
   886  	// MaxOutstandingMessages is the maximum number of unprocessed messages
   887  	// (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it
   888  	// will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages.
   889  	// If the value is negative, then there will be no limit on the number of
   890  	// unprocessed messages.
   891  	MaxOutstandingMessages int
   892  
   893  	// MaxOutstandingBytes is the maximum size of unprocessed messages
   894  	// (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will
   895  	// be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If
   896  	// the value is negative, then there will be no limit on the number of bytes
   897  	// for unprocessed messages.
   898  	MaxOutstandingBytes int
   899  
   900  	// UseLegacyFlowControl disables enforcing flow control settings at the Cloud
   901  	// Pub/Sub server, falling back to the less accurate method of enforcing
   902  	// flow control only at the client side.
   903  	// The default is false.
   904  	UseLegacyFlowControl bool
   905  
   906  	// NumGoroutines sets the number of StreamingPull streams to pull messages
   907  	// from the subscription.
   908  	//
   909  	// NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines.
   910  	//
   911  	// NumGoroutines does not limit the number of messages that can be processed
   912  	// concurrently. Even with one goroutine, many messages might be processed at
   913  	// once, because that goroutine may continually receive messages and invoke the
   914  	// function passed to Receive on them. To limit the number of messages being
   915  	// processed concurrently, set MaxOutstandingMessages.
   916  	NumGoroutines int
   917  
   918  	// Synchronous switches the underlying receiving mechanism to unary Pull.
   919  	// When Synchronous is false, the more performant StreamingPull is used.
   920  	// StreamingPull also has the benefit of subscriber affinity when using
   921  	// ordered delivery.
   922  	// When Synchronous is true, NumGoroutines is set to 1 and only one Pull
   923  	// RPC will be made to poll messages at a time.
   924  	// The default is false.
   925  	//
   926  	// Deprecated.
   927  	// Previously, users might use Synchronous mode since StreamingPull had a limitation
   928  	// where MaxOutstandingMessages was not always respected with large batches of
   929  	// small messages. With server side flow control, this is no longer an issue
   930  	// and we recommend switching to the default StreamingPull mode by setting
   931  	// Synchronous to false.
   932  	// Synchronous mode does not work with exactly once delivery.
   933  	Synchronous bool
   934  }
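
// Illustrative sketch (not part of the original source): tuning flow control
// before calling Receive. The limits shown are arbitrary examples.
func exampleTuneReceiveSettings(sub *Subscription) {
	sub.ReceiveSettings.MaxOutstandingMessages = 100    // at most 100 unacked messages
	sub.ReceiveSettings.MaxOutstandingBytes = 100 << 20 // at most ~100 MiB outstanding
	sub.ReceiveSettings.NumGoroutines = 2               // two StreamingPull streams
	sub.ReceiveSettings.MaxExtension = 30 * time.Minute // stop extending deadlines after 30m
}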
   935  
   936  // For synchronous receive, the time to wait if we are already processing
   937  // MaxOutstandingMessages. There is no point calling Pull and asking for zero
   938  // messages, so we pause to allow some message-processing callbacks to finish.
   939  //
   940  // The wait time is large enough to avoid consuming significant CPU, but
   941  // small enough to provide decent throughput. Users who want better
   942  // throughput should not be using synchronous mode.
   943  //
   944  // Waiting might seem like polling, so it's natural to think we could do better by
   945  // noticing when a callback is finished and immediately calling Pull. But if
   946  // callbacks finish in quick succession, this will result in frequent Pull RPCs that
   947  // request a single message, which wastes network bandwidth. Better to wait for a few
   948  // callbacks to finish, so we make fewer RPCs fetching more messages.
   949  //
   950  // This value is unexported so the user doesn't have another knob to think about. Note that
   951  // it is the same value as the one used for nackTicker, so it matches this client's
   952  // idea of a duration that is short, but not so short that we perform excessive RPCs.
   953  const synchronousWaitTime = 100 * time.Millisecond
   954  
   955  // DefaultReceiveSettings holds the default values for ReceiveSettings.
   956  var DefaultReceiveSettings = ReceiveSettings{
   957  	MaxExtension:           60 * time.Minute,
   958  	MaxExtensionPeriod:     0,
   959  	MinExtensionPeriod:     0,
   960  	MaxOutstandingMessages: 1000,
   961  	MaxOutstandingBytes:    1e9, // 1G
   962  	NumGoroutines:          10,
   963  }
   964  
   965  // Delete deletes the subscription.
   966  func (s *Subscription) Delete(ctx context.Context) error {
   967  	return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name})
   968  }
   969  
   970  // Exists reports whether the subscription exists on the server.
   971  func (s *Subscription) Exists(ctx context.Context) (bool, error) {
   972  	_, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
   973  	if err == nil {
   974  		return true, nil
   975  	}
   976  	if status.Code(err) == codes.NotFound {
   977  		return false, nil
   978  	}
   979  	return false, err
   980  }
   981  
   982  // Config fetches the current configuration for the subscription.
   983  func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {
   984  	pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name})
   985  	if err != nil {
   986  		return SubscriptionConfig{}, err
   987  	}
   988  	cfg, err := protoToSubscriptionConfig(pbSub, s.c)
   989  	if err != nil {
   990  		return SubscriptionConfig{}, err
   991  	}
   992  	return cfg, nil
   993  }
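
// Illustrative sketch (not part of the original source): confirming that a
// subscription exists before reading its server-side configuration.
func exampleInspectSubscription(ctx context.Context, sub *Subscription) error {
	ok, err := sub.Exists(ctx)
	if err != nil {
		return err
	}
	if !ok {
		return fmt.Errorf("subscription %s does not exist", sub)
	}
	cfg, err := sub.Config(ctx)
	if err != nil {
		return err
	}
	fmt.Println(cfg.Topic, cfg.AckDeadline, cfg.RetentionDuration)
	return nil
}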
   994  
   995  // SubscriptionConfigToUpdate describes how to update a subscription.
   996  type SubscriptionConfigToUpdate struct {
   997  	// If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
   998  	// can be set.
   999  	// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
  1000  	PushConfig *PushConfig
  1001  
  1002  	// If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
  1003  	// can be set.
  1004  	// If currently in BigQuery mode, set this value to the zero value to revert to a Pull based subscription.
  1005  	BigQueryConfig *BigQueryConfig
  1006  
  1007  	// If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
  1008  	// can be set.
  1009  	// If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription,
  1010  	CloudStorageConfig *CloudStorageConfig
  1011  
  1012  	// If non-zero, the ack deadline is changed.
  1013  	AckDeadline time.Duration
  1014  
  1015  	// If set, RetainAckedMessages is changed.
  1016  	RetainAckedMessages optional.Bool
  1017  
  1018  	// If non-zero, RetentionDuration is changed.
  1019  	RetentionDuration time.Duration
  1020  
  1021  	// If non-zero, Expiration is changed.
  1022  	ExpirationPolicy optional.Duration
  1023  
  1024  	// If non-nil, DeadLetterPolicy is changed. To remove dead lettering from
  1025  	// a subscription, use the zero value for this struct.
  1026  	DeadLetterPolicy *DeadLetterPolicy
  1027  
  1028  	// If non-nil, the current set of labels is completely
  1029  	// replaced by the new set.
  1030  	// This field has beta status. It is not subject to the stability guarantee
  1031  	// and may change.
  1032  	Labels map[string]string
  1033  
  1034  	// If non-nil, RetryPolicy is changed. To remove an existing retry policy
  1035  	// (to redeliver messages as soon as possible) use a pointer to the zero value
  1036  	// for this struct.
  1037  	RetryPolicy *RetryPolicy
  1038  
  1039  	// If set, EnableExactlyOnceDelivery is changed.
  1040  	EnableExactlyOnceDelivery optional.Bool
  1041  }
  1042  
  1043  // Update changes an existing subscription according to the fields set in cfg.
  1044  // It returns the new SubscriptionConfig.
  1045  //
  1046  // Update returns an error if no fields were modified.
  1047  func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) {
  1048  	req := s.updateRequest(&cfg)
  1049  	if err := cfg.validate(); err != nil {
  1050  		return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %w", err)
  1051  	}
  1052  	if len(req.UpdateMask.Paths) == 0 {
  1053  		return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update")
  1054  	}
  1055  	rpsub, err := s.c.subc.UpdateSubscription(ctx, req)
  1056  	if err != nil {
  1057  		return SubscriptionConfig{}, err
  1058  	}
  1059  	return protoToSubscriptionConfig(rpsub, s.c)
  1060  }
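
// Illustrative sketch (not part of the original source): updating several
// fields in one call. Only the fields set here end up in the request's
// update mask; everything else is left unchanged on the server.
func exampleUpdateSubscription(ctx context.Context, sub *Subscription) error {
	_, err := sub.Update(ctx, SubscriptionConfigToUpdate{
		AckDeadline:       30 * time.Second,
		RetentionDuration: 24 * time.Hour,
		Labels:            map[string]string{"team": "ingest"},
	})
	return err
}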
  1061  
  1062  func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest {
  1063  	psub := &pb.Subscription{Name: s.name}
  1064  	var paths []string
  1065  	if cfg.PushConfig != nil {
  1066  		psub.PushConfig = cfg.PushConfig.toProto()
  1067  		paths = append(paths, "push_config")
  1068  	}
  1069  	if cfg.BigQueryConfig != nil {
  1070  		psub.BigqueryConfig = cfg.BigQueryConfig.toProto()
  1071  		paths = append(paths, "bigquery_config")
  1072  	}
  1073  	if cfg.CloudStorageConfig != nil {
  1074  		psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto()
  1075  		paths = append(paths, "cloud_storage_config")
  1076  	}
  1077  	if cfg.AckDeadline != 0 {
  1078  		psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
  1079  		paths = append(paths, "ack_deadline_seconds")
  1080  	}
  1081  	if cfg.RetainAckedMessages != nil {
  1082  		psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages)
  1083  		paths = append(paths, "retain_acked_messages")
  1084  	}
  1085  	if cfg.RetentionDuration != 0 {
  1086  		psub.MessageRetentionDuration = durpb.New(cfg.RetentionDuration)
  1087  		paths = append(paths, "message_retention_duration")
  1088  	}
  1089  	if cfg.ExpirationPolicy != nil {
  1090  		psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy)
  1091  		paths = append(paths, "expiration_policy")
  1092  	}
  1093  	if cfg.DeadLetterPolicy != nil {
  1094  		psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto()
  1095  		paths = append(paths, "dead_letter_policy")
  1096  	}
  1097  	if cfg.Labels != nil {
  1098  		psub.Labels = cfg.Labels
  1099  		paths = append(paths, "labels")
  1100  	}
  1101  	if cfg.RetryPolicy != nil {
  1102  		psub.RetryPolicy = cfg.RetryPolicy.toProto()
  1103  		paths = append(paths, "retry_policy")
  1104  	}
  1105  	if cfg.EnableExactlyOnceDelivery != nil {
  1106  		psub.EnableExactlyOnceDelivery = optional.ToBool(cfg.EnableExactlyOnceDelivery)
  1107  		paths = append(paths, "enable_exactly_once_delivery")
  1108  	}
  1109  	return &pb.UpdateSubscriptionRequest{
  1110  		Subscription: psub,
  1111  		UpdateMask:   &fmpb.FieldMask{Paths: paths},
  1112  	}
  1113  }
  1114  
  1115  const (
  1116  	// The minimum expiration policy duration is 1 day as per:
  1117  	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607
  1118  	minExpirationPolicy = 24 * time.Hour
  1119  
  1120  	// If an expiration policy is not specified, the default of 31 days is used as per:
  1121  	//    https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606
  1122  	defaultExpirationPolicy = 31 * 24 * time.Hour
  1123  )
  1124  
  1125  func (cfg *SubscriptionConfigToUpdate) validate() error {
  1126  	if cfg == nil || cfg.ExpirationPolicy == nil {
  1127  		return nil
  1128  	}
  1129  	expPolicy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy
  1130  	if expPolicy != 0 && expPolicy < min {
  1131  		return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", expPolicy, min)
  1132  	}
  1133  	return nil
  1134  }
  1135  
  1136  func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy {
  1137  	if expirationPolicy == nil {
  1138  		return nil
  1139  	}
  1140  
  1141  	dur := optional.ToDuration(expirationPolicy)
  1142  	var ttl *durpb.Duration
  1143  	// As per:
  1144  	//    https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl
  1145  	// if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire.
  1146  	if dur != 0 {
  1147  		ttl = durpb.New(dur)
  1148  	}
  1149  	return &pb.ExpirationPolicy{
  1150  		Ttl: ttl,
  1151  	}
  1152  }
  1153  
  1154  // IAM returns the subscription's IAM handle.
  1155  func (s *Subscription) IAM() *iam.Handle {
  1156  	return iam.InternalNewHandle(s.c.subc.Connection(), s.name)
  1157  }
  1158  
  1159  // CreateSubscription creates a new subscription on a topic.
  1160  //
  1161  // id is the name of the subscription to create. It must start with a letter,
  1162  // and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-),
  1163  // underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It
  1164  // must be between 3 and 255 characters in length, and must not start with
  1165  // "goog".
  1166  //
  1167  // cfg.Topic is the topic from which the subscription should receive messages. It
  1168  // need not belong to the same project as the subscription. This field is required.
  1169  //
  1170  // cfg.AckDeadline is the maximum time after a subscriber receives a message before
  1171  // the subscriber should acknowledge the message. It must be between 10 and 600
  1172  // seconds (inclusive), and is rounded down to the nearest second. If the
  1173  // provided ackDeadline is 0, then the default value of 10 seconds is used.
  1174  // Note: messages which are obtained via Subscription.Receive need not be
  1175  // acknowledged within this deadline, as the deadline will be automatically
  1176  // extended.
  1177  //
  1178  // cfg.PushConfig may be set to configure this subscription for push delivery.
  1179  //
  1180  // If the subscription already exists an error will be returned.
  1181  func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) {
  1182  	if cfg.Topic == nil {
  1183  		return nil, errors.New("pubsub: require non-nil Topic")
  1184  	}
  1185  	if cfg.AckDeadline == 0 {
  1186  		cfg.AckDeadline = 10 * time.Second
  1187  	}
  1188  	if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second {
  1189  		return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d)
  1190  	}
  1191  
  1192  	sub := c.Subscription(id)
  1193  	_, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name))
  1194  	if err != nil {
  1195  		return nil, err
  1196  	}
  1197  	return sub, nil
  1198  }
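
// Illustrative sketch (not part of the original source): creating a plain
// pull subscription and receiving from it until the context is canceled.
// The subscription ID and ack handling are placeholders.
func exampleCreateAndReceive(ctx context.Context, c *Client, topic *Topic) error {
	sub, err := c.CreateSubscription(ctx, "my-sub", SubscriptionConfig{
		Topic:       topic,
		AckDeadline: 20 * time.Second,
	})
	if err != nil {
		return err
	}
	cctx, cancel := context.WithCancel(ctx)
	defer cancel()
	return sub.Receive(cctx, func(_ context.Context, m *Message) {
		fmt.Println(string(m.Data))
		m.Ack()
	})
}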
  1199  
  1200  var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription")
  1201  
  1202  // Receive calls f with the outstanding messages from the subscription.
  1203  // It blocks until ctx is done, or the service returns a non-retryable error.
  1204  //
  1205  // The standard way to terminate a Receive is to cancel its context:
  1206  //
  1207  //	cctx, cancel := context.WithCancel(ctx)
  1208  //	err := sub.Receive(cctx, callback)
  1209  //	// Call cancel from callback, or another goroutine.
  1210  //
  1211  // If the service returns a non-retryable error, Receive returns that error after
  1212  // all of the outstanding calls to f have returned. If ctx is done, Receive
  1213  // returns nil after all of the outstanding calls to f have returned and
  1214  // all messages have been acknowledged or have expired.
  1215  //
  1216  // Receive calls f concurrently from multiple goroutines. It is encouraged to
  1217  // process messages synchronously in f, even if that processing is relatively
  1218  // time-consuming; Receive will spawn new goroutines for incoming messages,
  1219  // limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings.
  1220  //
  1221  // The context passed to f will be canceled when ctx is Done or there is a
  1222  // fatal service error.
  1223  //
  1224  // Receive will send an ack deadline extension on message receipt, then
  1225  // automatically extend the ack deadline of all fetched Messages up to the
  1226  // period specified by s.ReceiveSettings.MaxExtension.
  1227  //
  1228  // Each Subscription may have only one invocation of Receive active at a time.
  1229  func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error {
  1230  	s.mu.Lock()
  1231  	if s.receiveActive {
  1232  		s.mu.Unlock()
  1233  		return errReceiveInProgress
  1234  	}
  1235  	s.receiveActive = true
  1236  	s.mu.Unlock()
  1237  	defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }()
  1238  
  1239  	// TODO(hongalex): move settings check to a helper function to make it more testable
  1240  	maxCount := s.ReceiveSettings.MaxOutstandingMessages
  1241  	if maxCount == 0 {
  1242  		maxCount = DefaultReceiveSettings.MaxOutstandingMessages
  1243  	}
  1244  	maxBytes := s.ReceiveSettings.MaxOutstandingBytes
  1245  	if maxBytes == 0 {
  1246  		maxBytes = DefaultReceiveSettings.MaxOutstandingBytes
  1247  	}
  1248  	maxExt := s.ReceiveSettings.MaxExtension
  1249  	if maxExt == 0 {
  1250  		maxExt = DefaultReceiveSettings.MaxExtension
  1251  	} else if maxExt < 0 {
  1252  		// If MaxExtension is negative, disable automatic extension.
  1253  		maxExt = 0
  1254  	}
  1255  	maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod
  1256  	if maxExtPeriod < 0 {
  1257  		maxExtPeriod = DefaultReceiveSettings.MaxExtensionPeriod
  1258  	}
  1259  	minExtPeriod := s.ReceiveSettings.MinExtensionPeriod
  1260  	if minExtPeriod < 0 {
  1261  		minExtPeriod = DefaultReceiveSettings.MinExtensionPeriod
  1262  	}
  1263  
  1264  	var numGoroutines int
  1265  	switch {
  1266  	case s.ReceiveSettings.Synchronous:
  1267  		numGoroutines = 1
  1268  	case s.ReceiveSettings.NumGoroutines >= 1:
  1269  		numGoroutines = s.ReceiveSettings.NumGoroutines
  1270  	default:
  1271  		numGoroutines = DefaultReceiveSettings.NumGoroutines
  1272  	}
  1273  	// TODO(jba): add tests that verify that ReceiveSettings are correctly processed.
  1274  	po := &pullOptions{
  1275  		maxExtension:           maxExt,
  1276  		maxExtensionPeriod:     maxExtPeriod,
  1277  		minExtensionPeriod:     minExtPeriod,
  1278  		maxPrefetch:            trunc32(int64(maxCount)),
  1279  		synchronous:            s.ReceiveSettings.Synchronous,
  1280  		maxOutstandingMessages: maxCount,
  1281  		maxOutstandingBytes:    maxBytes,
  1282  		useLegacyFlowControl:   s.ReceiveSettings.UseLegacyFlowControl,
  1283  	}
  1284  	fc := newSubscriptionFlowController(FlowControlSettings{
  1285  		MaxOutstandingMessages: maxCount,
  1286  		MaxOutstandingBytes:    maxBytes,
  1287  		LimitExceededBehavior:  FlowControlBlock,
  1288  	})
  1289  
  1290  	sched := scheduler.NewReceiveScheduler(maxCount)
  1291  
  1292  	// Wait for all goroutines started by Receive to return, so instead of an
  1293  	// obscure goroutine leak we have an obvious blocked call to Receive.
  1294  	group, gctx := errgroup.WithContext(ctx)
  1295  
  1296  	type closeablePair struct {
  1297  		wg   *sync.WaitGroup
  1298  		iter *messageIterator
  1299  	}
  1300  
  1301  	var pairs []closeablePair
  1302  
  1303  	// Create a cancellable sub-context; canceling it (when a single receiver
  1304  	// finishes, or when Receive's ctx is done) kicks off the context-aware callbacks
  1305  	// and the goroutine below, which stops all receivers, iterators, and the scheduler.
  1306  	ctx2, cancel2 := context.WithCancel(gctx)
  1307  	defer cancel2()
  1308  
  1309  	for i := 0; i < numGoroutines; i++ {
  1310  		// The iterator does not use the context passed to Receive. If it did,
  1311  		// canceling that context would immediately stop the iterator without
  1312  		// waiting for unacked messages.
  1313  		iter := newMessageIterator(s.c.subc, s.name, po)
  1314  
  1315  		// We cannot use errgroup from Receive here. Receive might already be
  1316  		// calling group.Wait, and group.Wait cannot be called concurrently with
  1317  		// group.Go. We give each receive() its own WaitGroup instead.
  1318  		//
  1319  		// Since wg.Add is only called from the main goroutine, wg.Wait is
  1320  		// guaranteed to be called after all Adds.
  1321  		var wg sync.WaitGroup
  1322  		wg.Add(1)
  1323  		pairs = append(pairs, closeablePair{wg: &wg, iter: iter})
  1324  
  1325  		group.Go(func() error {
  1326  			defer wg.Wait()
  1327  			defer cancel2()
  1328  			for {
  1329  				var maxToPull int32 // maximum number of messages to pull
  1330  				if po.synchronous {
  1331  					if po.maxPrefetch < 0 {
  1332  						// If there is no limit on the number of messages to
  1333  						// pull, use a reasonable default.
  1334  						maxToPull = 1000
  1335  					} else {
  1336  						// Limit the number of messages in memory to MaxOutstandingMessages
  1337  						// (here, po.maxPrefetch). For each message currently in memory, we have
  1338  						// called fc.acquire but not fc.release: this is fc.count(). The next
  1339  						// call to Pull should fetch no more than the difference between these
  1340  						// values.
  1341  						maxToPull = po.maxPrefetch - int32(fc.count())
  1342  						if maxToPull <= 0 {
  1343  							// Wait for some callbacks to finish.
  1344  							if err := gax.Sleep(ctx, synchronousWaitTime); err != nil {
  1345  								// Return nil if the context is done, not err.
  1346  								return nil
  1347  							}
  1348  							continue
  1349  						}
  1350  					}
  1351  				}
  1352  				// If the context is done, don't pull more messages.
  1353  				select {
  1354  				case <-ctx.Done():
  1355  					return nil
  1356  				default:
  1357  				}
  1358  				msgs, err := iter.receive(maxToPull)
  1359  				if err == io.EOF {
  1360  					return nil
  1361  				}
  1362  				if err != nil {
  1363  					return err
  1364  				}
  1365  				// If context is done and messages have been pulled,
  1366  				// nack them.
  1367  				select {
  1368  				case <-ctx.Done():
  1369  					for _, m := range msgs {
  1370  						m.Nack()
  1371  					}
  1372  					return nil
  1373  				default:
  1374  				}
  1375  				for i, msg := range msgs {
  1376  					msg := msg
  1377  					// TODO(jba): call acquire closer to when the message is allocated.
  1378  					if err := fc.acquire(ctx, len(msg.Data)); err != nil {
  1379  						// TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done.
  1380  						for _, m := range msgs[i:] {
  1381  							m.Nack()
  1382  						}
  1383  						// Return nil if the context is done, not err.
  1384  						return nil
  1385  					}
  1386  					iter.eoMu.RLock()
  1387  					msgAckHandler(msg, iter.enableExactlyOnceDelivery)
  1388  					iter.eoMu.RUnlock()
  1389  
  1390  					wg.Add(1)
  1391  					// Only schedule messages in order if an ordering key is present and the subscriber client
  1392  					// received the ordering flag from a Streaming Pull response.
  1393  					var key string
  1394  					iter.orderingMu.RLock()
  1395  					if iter.enableOrdering {
  1396  						key = msg.OrderingKey
  1397  					}
  1398  					iter.orderingMu.RUnlock()
  1399  					msgLen := len(msg.Data)
  1400  					if err := sched.Add(key, msg, func(msg interface{}) {
  1401  						defer wg.Done()
  1402  						defer fc.release(ctx, msgLen)
  1403  						f(ctx2, msg.(*Message))
  1404  					}); err != nil {
  1405  						wg.Done()
  1406  						// If there are any errors with scheduling messages,
  1407  						// nack them so they can be redelivered.
  1408  						msg.Nack()
  1409  						// Currently, only this error is returned by the receive scheduler.
  1410  						if errors.Is(err, scheduler.ErrReceiveDraining) {
  1411  							return nil
  1412  						}
  1413  						return err
  1414  					}
  1415  				}
  1416  			}
  1417  		})
  1418  	}
  1419  
  1420  	go func() {
  1421  		<-ctx2.Done()
  1422  
  1423  		// Wait for all iterators to stop.
  1424  		for _, p := range pairs {
  1425  			p.iter.stop()
  1426  			p.wg.Done()
  1427  		}
  1428  
  1429  		// This _must_ happen after every iterator has stopped, or some
  1430  		// iterator will still have undelivered messages but the scheduler will
  1431  		// already be shut down.
  1432  		sched.Shutdown()
  1433  	}()
  1434  
  1435  	return group.Wait()
  1436  }
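
        // Editorial sketch (not part of the original source): one plausible end-to-end
        // use of Receive, combining the cancellation pattern from the doc comment above
        // with a couple of ReceiveSettings fields. The subscription ID, limits, and
        // timeout are placeholders; the snippet assumes "context", "fmt", "time", and
        // "cloud.google.com/go/pubsub" are imported by the caller.
        //
        //	func receiveSketch(ctx context.Context, client *pubsub.Client) error {
        //		sub := client.Subscription("my-sub")
        //
        //		// Bound how many messages are outstanding at once; zero values fall
        //		// back to DefaultReceiveSettings, as the checks in Receive show.
        //		sub.ReceiveSettings.MaxOutstandingMessages = 100
        //		sub.ReceiveSettings.MaxOutstandingBytes = 10 * 1024 * 1024
        //
        //		// Stop receiving after a while; cancel could equally be called from
        //		// inside the callback or from another goroutine.
        //		cctx, cancel := context.WithTimeout(ctx, 30*time.Second)
        //		defer cancel()
        //
        //		return sub.Receive(cctx, func(ctx context.Context, m *pubsub.Message) {
        //			fmt.Printf("got message: %s\n", m.Data)
        //			m.Ack() // or m.Nack() to request redelivery
        //		})
        //	}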
  1437  
  1438  type pullOptions struct {
  1439  	maxExtension       time.Duration // the maximum time to extend a message's ack deadline in total
  1440  	maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc
  1441  	minExtensionPeriod time.Duration // the minimum time to extend a message's ack deadline per modack rpc
  1442  	maxPrefetch        int32         // the max number of outstanding messages, used to calculate maxToPull
  1443  	// If true, use unary Pull instead of StreamingPull, and never pull more
  1444  	// than maxPrefetch messages.
  1445  	synchronous            bool
  1446  	maxOutstandingMessages int
  1447  	maxOutstandingBytes    int
  1448  	useLegacyFlowControl   bool
  1449  }
  1450  
