1 // Copyright 2016 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 package pubsub 16 17 import ( 18 "context" 19 "errors" 20 "fmt" 21 "io" 22 "strings" 23 "sync" 24 "time" 25 26 "cloud.google.com/go/iam" 27 "cloud.google.com/go/internal/optional" 28 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb" 29 "cloud.google.com/go/pubsub/internal/scheduler" 30 gax "github.com/googleapis/gax-go/v2" 31 "golang.org/x/sync/errgroup" 32 "google.golang.org/grpc/codes" 33 "google.golang.org/grpc/status" 34 "google.golang.org/protobuf/types/known/durationpb" 35 durpb "google.golang.org/protobuf/types/known/durationpb" 36 fmpb "google.golang.org/protobuf/types/known/fieldmaskpb" 37 38 vkit "cloud.google.com/go/pubsub/apiv1" 39 ) 40 41 // Subscription is a reference to a PubSub subscription. 42 type Subscription struct { 43 c *Client 44 45 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 46 name string 47 48 // Settings for pulling messages. Configure these before calling Receive. 49 ReceiveSettings ReceiveSettings 50 51 mu sync.Mutex 52 receiveActive bool 53 } 54 55 // Subscription creates a reference to a subscription. 56 func (c *Client) Subscription(id string) *Subscription { 57 return c.SubscriptionInProject(id, c.projectID) 58 } 59 60 // SubscriptionInProject creates a reference to a subscription in a given project. 61 func (c *Client) SubscriptionInProject(id, projectID string) *Subscription { 62 return &Subscription{ 63 c: c, 64 name: fmt.Sprintf("projects/%s/subscriptions/%s", projectID, id), 65 } 66 } 67 68 // String returns the globally unique printable name of the subscription. 69 func (s *Subscription) String() string { 70 return s.name 71 } 72 73 // ID returns the unique identifier of the subscription within its project. 74 func (s *Subscription) ID() string { 75 slash := strings.LastIndex(s.name, "/") 76 if slash == -1 { 77 // name is not a fully-qualified name. 78 panic("bad subscription name") 79 } 80 return s.name[slash+1:] 81 } 82 83 // Subscriptions returns an iterator which returns all of the subscriptions for the client's project. 84 func (c *Client) Subscriptions(ctx context.Context) *SubscriptionIterator { 85 it := c.subc.ListSubscriptions(ctx, &pb.ListSubscriptionsRequest{ 86 Project: c.fullyQualifiedProjectName(), 87 }) 88 return &SubscriptionIterator{ 89 c: c, 90 it: it, 91 next: func() (string, error) { 92 sub, err := it.Next() 93 if err != nil { 94 return "", err 95 } 96 return sub.Name, nil 97 }, 98 } 99 } 100 101 // SubscriptionIterator is an iterator that returns a series of subscriptions. 102 type SubscriptionIterator struct { 103 c *Client 104 it *vkit.SubscriptionIterator 105 next func() (string, error) 106 } 107 108 // Next returns the next subscription. If there are no more subscriptions, iterator.Done will be returned. 109 func (subs *SubscriptionIterator) Next() (*Subscription, error) { 110 subName, err := subs.next() 111 if err != nil { 112 return nil, err 113 } 114 return &Subscription{c: subs.c, name: subName}, nil 115 } 116 117 // NextConfig returns the next subscription config. If there are no more subscriptions, 118 // iterator.Done will be returned. 119 // This call shares the underlying iterator with calls to `SubscriptionIterator.Next`. 120 // If you wish to use mix calls, create separate iterator instances for both. 121 func (subs *SubscriptionIterator) NextConfig() (*SubscriptionConfig, error) { 122 spb, err := subs.it.Next() 123 if err != nil { 124 return nil, err 125 } 126 cfg, err := protoToSubscriptionConfig(spb, subs.c) 127 if err != nil { 128 return nil, err 129 } 130 return &cfg, nil 131 } 132 133 // PushConfig contains configuration for subscriptions that operate in push mode. 134 type PushConfig struct { 135 // A URL locating the endpoint to which messages should be pushed. 136 Endpoint string 137 138 // Endpoint configuration attributes. See https://cloud.google.com/pubsub/docs/reference/rest/v1/projects.subscriptions#pushconfig for more details. 139 Attributes map[string]string 140 141 // AuthenticationMethod is used by push endpoints to verify the source 142 // of push requests. 143 // It can be used with push endpoints that are private by default to 144 // allow requests only from the Cloud Pub/Sub system, for example. 145 // This field is optional and should be set only by users interested in 146 // authenticated push. 147 AuthenticationMethod AuthenticationMethod 148 149 // The format of the delivered message to the push endpoint is defined by 150 // the chosen wrapper. When unset, `PubsubWrapper` is used. 151 Wrapper Wrapper 152 } 153 154 func (pc *PushConfig) toProto() *pb.PushConfig { 155 if pc == nil { 156 return nil 157 } 158 pbCfg := &pb.PushConfig{ 159 Attributes: pc.Attributes, 160 PushEndpoint: pc.Endpoint, 161 } 162 if authMethod := pc.AuthenticationMethod; authMethod != nil { 163 switch am := authMethod.(type) { 164 case *OIDCToken: 165 pbCfg.AuthenticationMethod = am.toProto() 166 default: // TODO: add others here when GAIC adds more definitions. 167 } 168 } 169 if w := pc.Wrapper; w != nil { 170 switch wt := w.(type) { 171 case *PubsubWrapper: 172 pbCfg.Wrapper = wt.toProto() 173 case *NoWrapper: 174 pbCfg.Wrapper = wt.toProto() 175 default: 176 } 177 } 178 return pbCfg 179 } 180 181 // AuthenticationMethod is used by push subscriptions to verify the source of push requests. 182 type AuthenticationMethod interface { 183 isAuthMethod() bool 184 } 185 186 // OIDCToken allows PushConfigs to be authenticated using 187 // the OpenID Connect protocol https://openid.net/connect/ 188 type OIDCToken struct { 189 // Audience to be used when generating OIDC token. The audience claim 190 // identifies the recipients that the JWT is intended for. The audience 191 // value is a single case-sensitive string. Having multiple values (array) 192 // for the audience field is not supported. More info about the OIDC JWT 193 // token audience here: https://tools.ietf.org/html/rfc7519#section-4.1.3 194 // Note: if not specified, the Push endpoint URL will be used. 195 Audience string 196 197 // The service account email to be used for generating the OpenID Connect token. 198 // The caller of: 199 // * CreateSubscription 200 // * UpdateSubscription 201 // * ModifyPushConfig 202 // calls must have the iam.serviceAccounts.actAs permission for the service account. 203 // See https://cloud.google.com/iam/docs/understanding-roles#service-accounts-roles. 204 ServiceAccountEmail string 205 } 206 207 var _ AuthenticationMethod = (*OIDCToken)(nil) 208 209 func (oidcToken *OIDCToken) isAuthMethod() bool { return true } 210 211 func (oidcToken *OIDCToken) toProto() *pb.PushConfig_OidcToken_ { 212 if oidcToken == nil { 213 return nil 214 } 215 return &pb.PushConfig_OidcToken_{ 216 OidcToken: &pb.PushConfig_OidcToken{ 217 Audience: oidcToken.Audience, 218 ServiceAccountEmail: oidcToken.ServiceAccountEmail, 219 }, 220 } 221 } 222 223 // Wrapper defines the format of message delivered to push endpoints. 224 type Wrapper interface { 225 isWrapper() bool 226 } 227 228 // PubsubWrapper denotes sending the payload to the push endpoint in the form of the JSON 229 // representation of a PubsubMessage 230 // (https://cloud.google.com/pubsub/docs/reference/rpc/google.pubsub.v1#pubsubmessage). 231 type PubsubWrapper struct{} 232 233 var _ Wrapper = (*PubsubWrapper)(nil) 234 235 func (p *PubsubWrapper) isWrapper() bool { return true } 236 237 func (p *PubsubWrapper) toProto() *pb.PushConfig_PubsubWrapper_ { 238 if p == nil { 239 return nil 240 } 241 return &pb.PushConfig_PubsubWrapper_{ 242 PubsubWrapper: &pb.PushConfig_PubsubWrapper{}, 243 } 244 } 245 246 // NoWrapper denotes not wrapping the payload sent to the push endpoint. 247 type NoWrapper struct { 248 WriteMetadata bool 249 } 250 251 var _ Wrapper = (*NoWrapper)(nil) 252 253 func (n *NoWrapper) isWrapper() bool { return true } 254 255 func (n *NoWrapper) toProto() *pb.PushConfig_NoWrapper_ { 256 if n == nil { 257 return nil 258 } 259 return &pb.PushConfig_NoWrapper_{ 260 NoWrapper: &pb.PushConfig_NoWrapper{ 261 WriteMetadata: n.WriteMetadata, 262 }, 263 } 264 } 265 266 // BigQueryConfigState denotes the possible states for a BigQuery Subscription. 267 type BigQueryConfigState int 268 269 const ( 270 // BigQueryConfigStateUnspecified is the default value. This value is unused. 271 BigQueryConfigStateUnspecified = iota 272 273 // BigQueryConfigActive means the subscription can actively send messages to BigQuery. 274 BigQueryConfigActive 275 276 // BigQueryConfigPermissionDenied means the subscription cannot write to the BigQuery table because of permission denied errors. 277 BigQueryConfigPermissionDenied 278 279 // BigQueryConfigNotFound means the subscription cannot write to the BigQuery table because it does not exist. 280 BigQueryConfigNotFound 281 282 // BigQueryConfigSchemaMismatch means the subscription cannot write to the BigQuery table due to a schema mismatch. 283 BigQueryConfigSchemaMismatch 284 ) 285 286 // BigQueryConfig configures the subscription to deliver to a BigQuery table. 287 type BigQueryConfig struct { 288 // The name of the table to which to write data, of the form 289 // {projectId}:{datasetId}.{tableId} 290 Table string 291 292 // When true, use the topic's schema as the columns to write to in BigQuery, 293 // if it exists. 294 UseTopicSchema bool 295 296 // When true, write the subscription name, message_id, publish_time, 297 // attributes, and ordering_key to additional columns in the table. The 298 // subscription name, message_id, and publish_time fields are put in their own 299 // columns while all other message properties (other than data) are written to 300 // a JSON object in the attributes column. 301 WriteMetadata bool 302 303 // When true and use_topic_schema is true, any fields that are a part of the 304 // topic schema that are not part of the BigQuery table schema are dropped 305 // when writing to BigQuery. Otherwise, the schemas must be kept in sync and 306 // any messages with extra fields are not written and remain in the 307 // subscription's backlog. 308 DropUnknownFields bool 309 310 // This is an output-only field that indicates whether or not the subscription can 311 // receive messages. This field is set only in responses from the server; 312 // it is ignored if it is set in any requests. 313 State BigQueryConfigState 314 } 315 316 func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig { 317 if bc == nil { 318 return nil 319 } 320 // If the config is zero valued, this is the sentinel for 321 // clearing bigquery config and switch back to pull. 322 if *bc == (BigQueryConfig{}) { 323 return nil 324 } 325 pbCfg := &pb.BigQueryConfig{ 326 Table: bc.Table, 327 UseTopicSchema: bc.UseTopicSchema, 328 WriteMetadata: bc.WriteMetadata, 329 DropUnknownFields: bc.DropUnknownFields, 330 State: pb.BigQueryConfig_State(bc.State), 331 } 332 return pbCfg 333 } 334 335 // CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription. 336 type CloudStorageConfigState int 337 338 const ( 339 // CloudStorageConfigStateUnspecified is the default value. This value is unused. 340 CloudStorageConfigStateUnspecified = iota 341 342 // CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage. 343 CloudStorageConfigActive 344 345 // CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud storage bucket because of permission denied errors. 346 CloudStorageConfigPermissionDenied 347 348 // CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist. 349 CloudStorageConfigNotFound 350 ) 351 352 // Configuration options for how to write the message data to Cloud Storage. 353 type isCloudStorageOutputFormat interface { 354 isCloudStorageOutputFormat() 355 } 356 357 // CloudStorageOutputFormatTextConfig is the configuration for writing 358 // message data in text format. Message payloads will be written to files 359 // as raw text, separated by a newline. 360 type CloudStorageOutputFormatTextConfig struct{} 361 362 // CloudStorageOutputFormatAvroConfig is the configuration for writing 363 // message data in Avro format. Message payloads and metadata will be written 364 // to the files as an Avro binary. 365 type CloudStorageOutputFormatAvroConfig struct { 366 // When true, write the subscription name, message_id, publish_time, 367 // attributes, and ordering_key as additional fields in the output. 368 WriteMetadata bool 369 } 370 371 func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {} 372 373 func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {} 374 375 // CloudStorageConfig configures the subscription to deliver to Cloud Storage. 376 type CloudStorageConfig struct { 377 // User-provided name for the Cloud Storage bucket. 378 // The bucket must be created by the user. The bucket name must be without 379 // any prefix like "gs://". See the [bucket naming 380 // requirements] (https://cloud.google.com/storage/docs/buckets#naming). 381 Bucket string 382 383 // User-provided prefix for Cloud Storage filename. See the [object naming 384 // requirements](https://cloud.google.com/storage/docs/objects#naming). 385 FilenamePrefix string 386 387 // User-provided suffix for Cloud Storage filename. See the [object naming 388 // requirements](https://cloud.google.com/storage/docs/objects#naming). 389 FilenameSuffix string 390 391 // Configuration for how to write message data. Options are 392 // CloudStorageOutputFormat_TextConfig and CloudStorageOutputFormat_AvroConfig. 393 // Defaults to text format. 394 OutputFormat isCloudStorageOutputFormat 395 396 // The maximum duration that can elapse before a new Cloud Storage file is 397 // created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed 398 // the subscription's acknowledgement deadline. 399 MaxDuration optional.Duration 400 401 // The maximum bytes that can be written to a Cloud Storage file before a new 402 // file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded 403 // in cases where messages are larger than the limit. 404 MaxBytes int64 405 406 // Output only. An output-only field that indicates whether or not the 407 // subscription can receive messages. 408 State CloudStorageConfigState 409 } 410 411 func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig { 412 if cs == nil { 413 return nil 414 } 415 // For the purposes of the live service, an empty/zero-valued config 416 // is treated the same as nil and clearing this setting. 417 if (CloudStorageConfig{}) == *cs { 418 return nil 419 } 420 var dur *durationpb.Duration 421 if cs.MaxDuration != nil { 422 dur = durationpb.New(optional.ToDuration(cs.MaxDuration)) 423 } 424 pbCfg := &pb.CloudStorageConfig{ 425 Bucket: cs.Bucket, 426 FilenamePrefix: cs.FilenamePrefix, 427 FilenameSuffix: cs.FilenameSuffix, 428 MaxDuration: dur, 429 MaxBytes: cs.MaxBytes, 430 State: pb.CloudStorageConfig_State(cs.State), 431 } 432 if out := cs.OutputFormat; out != nil { 433 if _, ok := out.(*CloudStorageOutputFormatTextConfig); ok { 434 pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{} 435 } else if cfg, ok := out.(*CloudStorageOutputFormatAvroConfig); ok { 436 pbCfg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{ 437 AvroConfig: &pb.CloudStorageConfig_AvroConfig{ 438 WriteMetadata: cfg.WriteMetadata, 439 }, 440 } 441 } 442 } 443 return pbCfg 444 } 445 446 // SubscriptionState denotes the possible states for a Subscription. 447 type SubscriptionState int 448 449 const ( 450 // SubscriptionStateUnspecified is the default value. This value is unused. 451 SubscriptionStateUnspecified = iota 452 453 // SubscriptionStateActive means the subscription can actively send messages to BigQuery. 454 SubscriptionStateActive 455 456 // SubscriptionStateResourceError means the subscription receive messages because of an 457 // error with the resource to which it pushes messages. 458 // See the more detailed error state in the corresponding configuration. 459 SubscriptionStateResourceError 460 ) 461 462 // SubscriptionConfig describes the configuration of a subscription. If none of 463 // PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will 464 // pull and ack messages using API methods. At most one of these fields may be set. 465 type SubscriptionConfig struct { 466 // The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>" 467 name string 468 469 // The topic from which this subscription is receiving messages. 470 Topic *Topic 471 472 // If push delivery is used with this subscription, this field is 473 // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, 474 // or `CloudStorageConfig` can be set. If all are empty, then the 475 // subscriber will pull and ack messages using API methods. 476 PushConfig PushConfig 477 478 // If delivery to BigQuery is used with this subscription, this field is 479 // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, 480 // or `CloudStorageConfig` can be set. If all are empty, then the 481 // subscriber will pull and ack messages using API methods. 482 BigQueryConfig BigQueryConfig 483 484 // If delivery to Cloud Storage is used with this subscription, this field is 485 // used to configure it. At most one of `PushConfig`, `BigQueryConfig`, 486 // or `CloudStorageConfig` can be set. If all are empty, then the 487 // subscriber will pull and ack messages using API methods. 488 CloudStorageConfig CloudStorageConfig 489 490 // The default maximum time after a subscriber receives a message before 491 // the subscriber should acknowledge the message. Note: messages which are 492 // obtained via Subscription.Receive need not be acknowledged within this 493 // deadline, as the deadline will be automatically extended. 494 AckDeadline time.Duration 495 496 // Whether to retain acknowledged messages. If true, acknowledged messages 497 // will not be expunged until they fall out of the RetentionDuration window. 498 RetainAckedMessages bool 499 500 // How long to retain messages in backlog, from the time of publish. If 501 // RetainAckedMessages is true, this duration affects the retention of 502 // acknowledged messages, otherwise only unacknowledged messages are retained. 503 // Defaults to 7 days. Cannot be longer than 7 days or shorter than 10 minutes. 504 RetentionDuration time.Duration 505 506 // Expiration policy specifies the conditions for a subscription's expiration. 507 // A subscription is considered active as long as any connected subscriber is 508 // successfully consuming messages from the subscription or is issuing 509 // operations on the subscription. If `expiration_policy` is not set, a 510 // *default policy* with `ttl` of 31 days will be used. The minimum allowed 511 // value for `expiration_policy.ttl` is 1 day. 512 // 513 // Use time.Duration(0) to indicate that the subscription should never expire. 514 ExpirationPolicy optional.Duration 515 516 // The set of labels for the subscription. 517 Labels map[string]string 518 519 // EnableMessageOrdering enables message ordering on this subscription. 520 // This value is only used for subscription creation and update, and 521 // is not read locally in calls like Subscription.Receive(). 522 // 523 // If set to false, even if messages are published with ordering keys, 524 // messages will not be delivered in order. 525 // 526 // When calling Subscription.Receive(), the client will check this 527 // value with a call to Subscription.Config(), which requires the 528 // roles/viewer or roles/pubsub.viewer role on your service account. 529 // If that call fails, mesages with ordering keys will be delivered in order. 530 EnableMessageOrdering bool 531 532 // DeadLetterPolicy specifies the conditions for dead lettering messages in 533 // a subscription. If not set, dead lettering is disabled. 534 DeadLetterPolicy *DeadLetterPolicy 535 536 // Filter is an expression written in the Cloud Pub/Sub filter language. If 537 // non-empty, then only `PubsubMessage`s whose `attributes` field matches the 538 // filter are delivered on this subscription. If empty, then no messages are 539 // filtered out. Cannot be changed after the subscription is created. 540 Filter string 541 542 // RetryPolicy specifies how Cloud Pub/Sub retries message delivery. 543 RetryPolicy *RetryPolicy 544 545 // Detached indicates whether the subscription is detached from its topic. 546 // Detached subscriptions don't receive messages from their topic and don't 547 // retain any backlog. `Pull` and `StreamingPull` requests will return 548 // FAILED_PRECONDITION. If the subscription is a push subscription, pushes to 549 // the endpoint will not be made. 550 Detached bool 551 552 // TopicMessageRetentionDuration indicates the minimum duration for which a message is 553 // retained after it is published to the subscription's topic. If this field is 554 // set, messages published to the subscription's topic in the last 555 // `TopicMessageRetentionDuration` are always available to subscribers. 556 // You can enable both topic and subscription retention for the same topic. 557 // In this situation, the maximum of the retention durations takes effect. 558 // 559 // This is an output only field, meaning it will only appear in responses from the backend 560 // and will be ignored if sent in a request. 561 TopicMessageRetentionDuration time.Duration 562 563 // EnableExactlyOnceDelivery configures Pub/Sub to provide the following guarantees 564 // for the delivery of a message with a given MessageID on this subscription: 565 // 566 // The message sent to a subscriber is guaranteed not to be resent 567 // before the message's acknowledgement deadline expires. 568 // An acknowledged message will not be resent to a subscriber. 569 // 570 // Note that subscribers may still receive multiple copies of a message 571 // when `enable_exactly_once_delivery` is true if the message was published 572 // multiple times by a publisher client. These copies are considered distinct 573 // by Pub/Sub and have distinct MessageID values. 574 // 575 // Lastly, to guarantee messages have been acked or nacked properly, you must 576 // call Message.AckWithResult() or Message.NackWithResult(). These return an 577 // AckResult which will be ready if the message has been acked (or failed to be acked). 578 EnableExactlyOnceDelivery bool 579 580 // State indicates whether or not the subscription can receive messages. 581 // This is an output-only field that indicates whether or not the subscription can 582 // receive messages. This field is set only in responses from the server; 583 // it is ignored if it is set in any requests. 584 State SubscriptionState 585 } 586 587 // String returns the globally unique printable name of the subscription config. 588 // This method only works when the subscription config is returned from the server, 589 // such as when calling `client.Subscription` or `client.Subscriptions`. 590 // Otherwise, this will return an empty string. 591 func (s *SubscriptionConfig) String() string { 592 return s.name 593 } 594 595 // ID returns the unique identifier of the subscription within its project. 596 // This method only works when the subscription config is returned from the server, 597 // such as when calling `client.Subscription` or `client.Subscriptions`. 598 // Otherwise, this will return an empty string. 599 func (s *SubscriptionConfig) ID() string { 600 slash := strings.LastIndex(s.name, "/") 601 if slash == -1 { 602 return "" 603 } 604 return s.name[slash+1:] 605 } 606 607 func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription { 608 var pbPushConfig *pb.PushConfig 609 if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil { 610 pbPushConfig = cfg.PushConfig.toProto() 611 } 612 pbBigQueryConfig := cfg.BigQueryConfig.toProto() 613 pbCloudStorageConfig := cfg.CloudStorageConfig.toProto() 614 var retentionDuration *durpb.Duration 615 if cfg.RetentionDuration != 0 { 616 retentionDuration = durpb.New(cfg.RetentionDuration) 617 } 618 var pbDeadLetter *pb.DeadLetterPolicy 619 if cfg.DeadLetterPolicy != nil { 620 pbDeadLetter = cfg.DeadLetterPolicy.toProto() 621 } 622 var pbRetryPolicy *pb.RetryPolicy 623 if cfg.RetryPolicy != nil { 624 pbRetryPolicy = cfg.RetryPolicy.toProto() 625 } 626 return &pb.Subscription{ 627 Name: name, 628 Topic: cfg.Topic.name, 629 PushConfig: pbPushConfig, 630 BigqueryConfig: pbBigQueryConfig, 631 CloudStorageConfig: pbCloudStorageConfig, 632 AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())), 633 RetainAckedMessages: cfg.RetainAckedMessages, 634 MessageRetentionDuration: retentionDuration, 635 Labels: cfg.Labels, 636 ExpirationPolicy: expirationPolicyToProto(cfg.ExpirationPolicy), 637 EnableMessageOrdering: cfg.EnableMessageOrdering, 638 DeadLetterPolicy: pbDeadLetter, 639 Filter: cfg.Filter, 640 RetryPolicy: pbRetryPolicy, 641 Detached: cfg.Detached, 642 EnableExactlyOnceDelivery: cfg.EnableExactlyOnceDelivery, 643 } 644 } 645 646 func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionConfig, error) { 647 rd := time.Hour * 24 * 7 648 if pbSub.MessageRetentionDuration != nil { 649 rd = pbSub.MessageRetentionDuration.AsDuration() 650 } 651 var expirationPolicy time.Duration 652 if ttl := pbSub.ExpirationPolicy.GetTtl(); ttl != nil { 653 expirationPolicy = ttl.AsDuration() 654 } 655 dlp := protoToDLP(pbSub.DeadLetterPolicy) 656 rp := protoToRetryPolicy(pbSub.RetryPolicy) 657 subC := SubscriptionConfig{ 658 name: pbSub.Name, 659 Topic: newTopic(c, pbSub.Topic), 660 AckDeadline: time.Second * time.Duration(pbSub.AckDeadlineSeconds), 661 RetainAckedMessages: pbSub.RetainAckedMessages, 662 RetentionDuration: rd, 663 Labels: pbSub.Labels, 664 ExpirationPolicy: expirationPolicy, 665 EnableMessageOrdering: pbSub.EnableMessageOrdering, 666 DeadLetterPolicy: dlp, 667 Filter: pbSub.Filter, 668 RetryPolicy: rp, 669 Detached: pbSub.Detached, 670 TopicMessageRetentionDuration: pbSub.TopicMessageRetentionDuration.AsDuration(), 671 EnableExactlyOnceDelivery: pbSub.EnableExactlyOnceDelivery, 672 State: SubscriptionState(pbSub.State), 673 } 674 if pc := protoToPushConfig(pbSub.PushConfig); pc != nil { 675 subC.PushConfig = *pc 676 } 677 if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil { 678 subC.BigQueryConfig = *bq 679 } 680 if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil { 681 subC.CloudStorageConfig = *cs 682 } 683 return subC, nil 684 } 685 686 func protoToPushConfig(pbPc *pb.PushConfig) *PushConfig { 687 if pbPc == nil { 688 return nil 689 } 690 pc := &PushConfig{ 691 Endpoint: pbPc.PushEndpoint, 692 Attributes: pbPc.Attributes, 693 } 694 if am := pbPc.AuthenticationMethod; am != nil { 695 if oidcToken, ok := am.(*pb.PushConfig_OidcToken_); ok && oidcToken != nil && oidcToken.OidcToken != nil { 696 pc.AuthenticationMethod = &OIDCToken{ 697 Audience: oidcToken.OidcToken.GetAudience(), 698 ServiceAccountEmail: oidcToken.OidcToken.GetServiceAccountEmail(), 699 } 700 } 701 } 702 if w := pbPc.Wrapper; w != nil { 703 switch wt := w.(type) { 704 case *pb.PushConfig_PubsubWrapper_: 705 pc.Wrapper = &PubsubWrapper{} 706 case *pb.PushConfig_NoWrapper_: 707 pc.Wrapper = &NoWrapper{ 708 WriteMetadata: wt.NoWrapper.WriteMetadata, 709 } 710 } 711 } 712 return pc 713 } 714 715 func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig { 716 if pbBQ == nil { 717 return nil 718 } 719 bq := &BigQueryConfig{ 720 Table: pbBQ.GetTable(), 721 UseTopicSchema: pbBQ.GetUseTopicSchema(), 722 DropUnknownFields: pbBQ.GetDropUnknownFields(), 723 WriteMetadata: pbBQ.GetWriteMetadata(), 724 State: BigQueryConfigState(pbBQ.State), 725 } 726 return bq 727 } 728 729 func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig { 730 if pbCSC == nil { 731 return nil 732 } 733 734 csc := &CloudStorageConfig{ 735 Bucket: pbCSC.GetBucket(), 736 FilenamePrefix: pbCSC.GetFilenamePrefix(), 737 FilenameSuffix: pbCSC.GetFilenameSuffix(), 738 MaxBytes: pbCSC.GetMaxBytes(), 739 State: CloudStorageConfigState(pbCSC.GetState()), 740 } 741 if dur := pbCSC.GetMaxDuration().AsDuration(); dur != 0 { 742 csc.MaxDuration = dur 743 } 744 if out := pbCSC.OutputFormat; out != nil { 745 if _, ok := out.(*pb.CloudStorageConfig_TextConfig_); ok { 746 csc.OutputFormat = &CloudStorageOutputFormatTextConfig{} 747 } else if cfg, ok := out.(*pb.CloudStorageConfig_AvroConfig_); ok { 748 csc.OutputFormat = &CloudStorageOutputFormatAvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()} 749 } 750 } 751 return csc 752 } 753 754 // DeadLetterPolicy specifies the conditions for dead lettering messages in 755 // a subscription. 756 type DeadLetterPolicy struct { 757 DeadLetterTopic string 758 MaxDeliveryAttempts int 759 } 760 761 func (dlp *DeadLetterPolicy) toProto() *pb.DeadLetterPolicy { 762 if dlp == nil || dlp.DeadLetterTopic == "" { 763 return nil 764 } 765 return &pb.DeadLetterPolicy{ 766 DeadLetterTopic: dlp.DeadLetterTopic, 767 MaxDeliveryAttempts: int32(dlp.MaxDeliveryAttempts), 768 } 769 } 770 func protoToDLP(pbDLP *pb.DeadLetterPolicy) *DeadLetterPolicy { 771 if pbDLP == nil { 772 return nil 773 } 774 return &DeadLetterPolicy{ 775 DeadLetterTopic: pbDLP.GetDeadLetterTopic(), 776 MaxDeliveryAttempts: int(pbDLP.MaxDeliveryAttempts), 777 } 778 } 779 780 // RetryPolicy specifies how Cloud Pub/Sub retries message delivery. 781 // 782 // Retry delay will be exponential based on provided minimum and maximum 783 // backoffs. https://en.wikipedia.org/wiki/Exponential_backoff. 784 // 785 // RetryPolicy will be triggered on NACKs or acknowledgement deadline exceeded 786 // events for a given message. 787 // 788 // Retry Policy is implemented on a best effort basis. At times, the delay 789 // between consecutive deliveries may not match the configuration. That is, 790 // delay can be more or less than configured backoff. 791 type RetryPolicy struct { 792 // MinimumBackoff is the minimum delay between consecutive deliveries of a 793 // given message. Value should be between 0 and 600 seconds. Defaults to 10 seconds. 794 MinimumBackoff optional.Duration 795 // MaximumBackoff is the maximum delay between consecutive deliveries of a 796 // given message. Value should be between 0 and 600 seconds. Defaults to 600 seconds. 797 MaximumBackoff optional.Duration 798 } 799 800 func (rp *RetryPolicy) toProto() *pb.RetryPolicy { 801 if rp == nil { 802 return nil 803 } 804 // If RetryPolicy is the empty struct, take this as an instruction 805 // to remove RetryPolicy from the subscription. 806 if rp.MinimumBackoff == nil && rp.MaximumBackoff == nil { 807 return nil 808 } 809 810 // Initialize minDur and maxDur to be negative, such that if the conversion from an 811 // optional fails, RetryPolicy won't be updated in the proto as it will remain nil. 812 var minDur time.Duration = -1 813 var maxDur time.Duration = -1 814 if rp.MinimumBackoff != nil { 815 minDur = optional.ToDuration(rp.MinimumBackoff) 816 } 817 if rp.MaximumBackoff != nil { 818 maxDur = optional.ToDuration(rp.MaximumBackoff) 819 } 820 821 var minDurPB, maxDurPB *durpb.Duration 822 if minDur > 0 { 823 minDurPB = durpb.New(minDur) 824 } 825 if maxDur > 0 { 826 maxDurPB = durpb.New(maxDur) 827 } 828 829 return &pb.RetryPolicy{ 830 MinimumBackoff: minDurPB, 831 MaximumBackoff: maxDurPB, 832 } 833 } 834 835 func protoToRetryPolicy(rp *pb.RetryPolicy) *RetryPolicy { 836 if rp == nil { 837 return nil 838 } 839 var minBackoff, maxBackoff time.Duration 840 if rp.MinimumBackoff != nil { 841 minBackoff = rp.MinimumBackoff.AsDuration() 842 } 843 if rp.MaximumBackoff != nil { 844 maxBackoff = rp.MaximumBackoff.AsDuration() 845 } 846 847 retryPolicy := &RetryPolicy{ 848 MinimumBackoff: minBackoff, 849 MaximumBackoff: maxBackoff, 850 } 851 return retryPolicy 852 } 853 854 // ReceiveSettings configure the Receive method. 855 // A zero ReceiveSettings will result in values equivalent to DefaultReceiveSettings. 856 type ReceiveSettings struct { 857 // MaxExtension is the maximum period for which the Subscription should 858 // automatically extend the ack deadline for each message. 859 // 860 // The Subscription will automatically extend the ack deadline of all 861 // fetched Messages up to the duration specified. Automatic deadline 862 // extension beyond the initial receipt may be disabled by specifying a 863 // duration less than 0. 864 MaxExtension time.Duration 865 866 // MaxExtensionPeriod is the maximum duration by which to extend the ack 867 // deadline at a time. The ack deadline will continue to be extended by up 868 // to this duration until MaxExtension is reached. Setting MaxExtensionPeriod 869 // bounds the maximum amount of time before a message redelivery in the 870 // event the subscriber fails to extend the deadline. 871 // 872 // MaxExtensionPeriod must be between 10s and 600s (inclusive). This configuration 873 // can be disabled by specifying a duration less than (or equal to) 0. 874 MaxExtensionPeriod time.Duration 875 876 // MinExtensionPeriod is the the min duration for a single lease extension attempt. 877 // By default the 99th percentile of ack latency is used to determine lease extension 878 // periods but this value can be set to minimize the number of extraneous RPCs sent. 879 // 880 // MinExtensionPeriod must be between 10s and 600s (inclusive). This configuration 881 // can be disabled by specifying a duration less than (or equal to) 0. 882 // Defaults to off but set to 60 seconds if the subscription has exactly-once delivery enabled, 883 // which will be added in a future release. 884 MinExtensionPeriod time.Duration 885 886 // MaxOutstandingMessages is the maximum number of unprocessed messages 887 // (unacknowledged but not yet expired). If MaxOutstandingMessages is 0, it 888 // will be treated as if it were DefaultReceiveSettings.MaxOutstandingMessages. 889 // If the value is negative, then there will be no limit on the number of 890 // unprocessed messages. 891 MaxOutstandingMessages int 892 893 // MaxOutstandingBytes is the maximum size of unprocessed messages 894 // (unacknowledged but not yet expired). If MaxOutstandingBytes is 0, it will 895 // be treated as if it were DefaultReceiveSettings.MaxOutstandingBytes. If 896 // the value is negative, then there will be no limit on the number of bytes 897 // for unprocessed messages. 898 MaxOutstandingBytes int 899 900 // UseLegacyFlowControl disables enforcing flow control settings at the Cloud 901 // PubSub server and the less accurate method of only enforcing flow control 902 // at the client side is used. 903 // The default is false. 904 UseLegacyFlowControl bool 905 906 // NumGoroutines sets the number of StreamingPull streams to pull messages 907 // from the subscription. 908 // 909 // NumGoroutines defaults to DefaultReceiveSettings.NumGoroutines. 910 // 911 // NumGoroutines does not limit the number of messages that can be processed 912 // concurrently. Even with one goroutine, many messages might be processed at 913 // once, because that goroutine may continually receive messages and invoke the 914 // function passed to Receive on them. To limit the number of messages being 915 // processed concurrently, set MaxOutstandingMessages. 916 NumGoroutines int 917 918 // Synchronous switches the underlying receiving mechanism to unary Pull. 919 // When Synchronous is false, the more performant StreamingPull is used. 920 // StreamingPull also has the benefit of subscriber affinity when using 921 // ordered delivery. 922 // When Synchronous is true, NumGoroutines is set to 1 and only one Pull 923 // RPC will be made to poll messages at a time. 924 // The default is false. 925 // 926 // Deprecated. 927 // Previously, users might use Synchronous mode since StreamingPull had a limitation 928 // where MaxOutstandingMessages was not always respected with large batches of 929 // small messages. With server side flow control, this is no longer an issue 930 // and we recommend switching to the default StreamingPull mode by setting 931 // Synchronous to false. 932 // Synchronous mode does not work with exactly once delivery. 933 Synchronous bool 934 } 935 936 // For synchronous receive, the time to wait if we are already processing 937 // MaxOutstandingMessages. There is no point calling Pull and asking for zero 938 // messages, so we pause to allow some message-processing callbacks to finish. 939 // 940 // The wait time is large enough to avoid consuming significant CPU, but 941 // small enough to provide decent throughput. Users who want better 942 // throughput should not be using synchronous mode. 943 // 944 // Waiting might seem like polling, so it's natural to think we could do better by 945 // noticing when a callback is finished and immediately calling Pull. But if 946 // callbacks finish in quick succession, this will result in frequent Pull RPCs that 947 // request a single message, which wastes network bandwidth. Better to wait for a few 948 // callbacks to finish, so we make fewer RPCs fetching more messages. 949 // 950 // This value is unexported so the user doesn't have another knob to think about. Note that 951 // it is the same value as the one used for nackTicker, so it matches this client's 952 // idea of a duration that is short, but not so short that we perform excessive RPCs. 953 const synchronousWaitTime = 100 * time.Millisecond 954 955 // DefaultReceiveSettings holds the default values for ReceiveSettings. 956 var DefaultReceiveSettings = ReceiveSettings{ 957 MaxExtension: 60 * time.Minute, 958 MaxExtensionPeriod: 0, 959 MinExtensionPeriod: 0, 960 MaxOutstandingMessages: 1000, 961 MaxOutstandingBytes: 1e9, // 1G 962 NumGoroutines: 10, 963 } 964 965 // Delete deletes the subscription. 966 func (s *Subscription) Delete(ctx context.Context) error { 967 return s.c.subc.DeleteSubscription(ctx, &pb.DeleteSubscriptionRequest{Subscription: s.name}) 968 } 969 970 // Exists reports whether the subscription exists on the server. 971 func (s *Subscription) Exists(ctx context.Context) (bool, error) { 972 _, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 973 if err == nil { 974 return true, nil 975 } 976 if status.Code(err) == codes.NotFound { 977 return false, nil 978 } 979 return false, err 980 } 981 982 // Config fetches the current configuration for the subscription. 983 func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) { 984 pbSub, err := s.c.subc.GetSubscription(ctx, &pb.GetSubscriptionRequest{Subscription: s.name}) 985 if err != nil { 986 return SubscriptionConfig{}, err 987 } 988 cfg, err := protoToSubscriptionConfig(pbSub, s.c) 989 if err != nil { 990 return SubscriptionConfig{}, err 991 } 992 return cfg, nil 993 } 994 995 // SubscriptionConfigToUpdate describes how to update a subscription. 996 type SubscriptionConfigToUpdate struct { 997 // If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig 998 // can be set. 999 // If currently in push mode, set this value to the zero value to revert to a Pull based subscription. 1000 PushConfig *PushConfig 1001 1002 // If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig 1003 // can be set. 1004 // If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription, 1005 BigQueryConfig *BigQueryConfig 1006 1007 // If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig 1008 // can be set. 1009 // If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription, 1010 CloudStorageConfig *CloudStorageConfig 1011 1012 // If non-zero, the ack deadline is changed. 1013 AckDeadline time.Duration 1014 1015 // If set, RetainAckedMessages is changed. 1016 RetainAckedMessages optional.Bool 1017 1018 // If non-zero, RetentionDuration is changed. 1019 RetentionDuration time.Duration 1020 1021 // If non-zero, Expiration is changed. 1022 ExpirationPolicy optional.Duration 1023 1024 // If non-nil, DeadLetterPolicy is changed. To remove dead lettering from 1025 // a subscription, use the zero value for this struct. 1026 DeadLetterPolicy *DeadLetterPolicy 1027 1028 // If non-nil, the current set of labels is completely 1029 // replaced by the new set. 1030 // This field has beta status. It is not subject to the stability guarantee 1031 // and may change. 1032 Labels map[string]string 1033 1034 // If non-nil, RetryPolicy is changed. To remove an existing retry policy 1035 // (to redeliver messages as soon as possible) use a pointer to the zero value 1036 // for this struct. 1037 RetryPolicy *RetryPolicy 1038 1039 // If set, EnableExactlyOnce is changed. 1040 EnableExactlyOnceDelivery optional.Bool 1041 } 1042 1043 // Update changes an existing subscription according to the fields set in cfg. 1044 // It returns the new SubscriptionConfig. 1045 // 1046 // Update returns an error if no fields were modified. 1047 func (s *Subscription) Update(ctx context.Context, cfg SubscriptionConfigToUpdate) (SubscriptionConfig, error) { 1048 req := s.updateRequest(&cfg) 1049 if err := cfg.validate(); err != nil { 1050 return SubscriptionConfig{}, fmt.Errorf("pubsub: UpdateSubscription %w", err) 1051 } 1052 if len(req.UpdateMask.Paths) == 0 { 1053 return SubscriptionConfig{}, errors.New("pubsub: UpdateSubscription call with nothing to update") 1054 } 1055 rpsub, err := s.c.subc.UpdateSubscription(ctx, req) 1056 if err != nil { 1057 return SubscriptionConfig{}, err 1058 } 1059 return protoToSubscriptionConfig(rpsub, s.c) 1060 } 1061 1062 func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.UpdateSubscriptionRequest { 1063 psub := &pb.Subscription{Name: s.name} 1064 var paths []string 1065 if cfg.PushConfig != nil { 1066 psub.PushConfig = cfg.PushConfig.toProto() 1067 paths = append(paths, "push_config") 1068 } 1069 if cfg.BigQueryConfig != nil { 1070 psub.BigqueryConfig = cfg.BigQueryConfig.toProto() 1071 paths = append(paths, "bigquery_config") 1072 } 1073 if cfg.CloudStorageConfig != nil { 1074 psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto() 1075 paths = append(paths, "cloud_storage_config") 1076 } 1077 if cfg.AckDeadline != 0 { 1078 psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds())) 1079 paths = append(paths, "ack_deadline_seconds") 1080 } 1081 if cfg.RetainAckedMessages != nil { 1082 psub.RetainAckedMessages = optional.ToBool(cfg.RetainAckedMessages) 1083 paths = append(paths, "retain_acked_messages") 1084 } 1085 if cfg.RetentionDuration != 0 { 1086 psub.MessageRetentionDuration = durpb.New(cfg.RetentionDuration) 1087 paths = append(paths, "message_retention_duration") 1088 } 1089 if cfg.ExpirationPolicy != nil { 1090 psub.ExpirationPolicy = expirationPolicyToProto(cfg.ExpirationPolicy) 1091 paths = append(paths, "expiration_policy") 1092 } 1093 if cfg.DeadLetterPolicy != nil { 1094 psub.DeadLetterPolicy = cfg.DeadLetterPolicy.toProto() 1095 paths = append(paths, "dead_letter_policy") 1096 } 1097 if cfg.Labels != nil { 1098 psub.Labels = cfg.Labels 1099 paths = append(paths, "labels") 1100 } 1101 if cfg.RetryPolicy != nil { 1102 psub.RetryPolicy = cfg.RetryPolicy.toProto() 1103 paths = append(paths, "retry_policy") 1104 } 1105 if cfg.EnableExactlyOnceDelivery != nil { 1106 psub.EnableExactlyOnceDelivery = optional.ToBool(cfg.EnableExactlyOnceDelivery) 1107 paths = append(paths, "enable_exactly_once_delivery") 1108 } 1109 return &pb.UpdateSubscriptionRequest{ 1110 Subscription: psub, 1111 UpdateMask: &fmpb.FieldMask{Paths: paths}, 1112 } 1113 } 1114 1115 const ( 1116 // The minimum expiration policy duration is 1 day as per: 1117 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L606-L607 1118 minExpirationPolicy = 24 * time.Hour 1119 1120 // If an expiration policy is not specified, the default of 31 days is used as per: 1121 // https://github.com/googleapis/googleapis/blob/51145ff7812d2bb44c1219d0b76dac92a8bd94b2/google/pubsub/v1/pubsub.proto#L605-L606 1122 defaultExpirationPolicy = 31 * 24 * time.Hour 1123 ) 1124 1125 func (cfg *SubscriptionConfigToUpdate) validate() error { 1126 if cfg == nil || cfg.ExpirationPolicy == nil { 1127 return nil 1128 } 1129 expPolicy, min := optional.ToDuration(cfg.ExpirationPolicy), minExpirationPolicy 1130 if expPolicy != 0 && expPolicy < min { 1131 return fmt.Errorf("invalid expiration policy(%q) < minimum(%q)", expPolicy, min) 1132 } 1133 return nil 1134 } 1135 1136 func expirationPolicyToProto(expirationPolicy optional.Duration) *pb.ExpirationPolicy { 1137 if expirationPolicy == nil { 1138 return nil 1139 } 1140 1141 dur := optional.ToDuration(expirationPolicy) 1142 var ttl *durpb.Duration 1143 // As per: 1144 // https://godoc.org/google.golang.org/genproto/googleapis/pubsub/v1#ExpirationPolicy.Ttl 1145 // if ExpirationPolicy.Ttl is set to nil, the expirationPolicy is toggled to NEVER expire. 1146 if dur != 0 { 1147 ttl = durpb.New(dur) 1148 } 1149 return &pb.ExpirationPolicy{ 1150 Ttl: ttl, 1151 } 1152 } 1153 1154 // IAM returns the subscription's IAM handle. 1155 func (s *Subscription) IAM() *iam.Handle { 1156 return iam.InternalNewHandle(s.c.subc.Connection(), s.name) 1157 } 1158 1159 // CreateSubscription creates a new subscription on a topic. 1160 // 1161 // id is the name of the subscription to create. It must start with a letter, 1162 // and contain only letters ([A-Za-z]), numbers ([0-9]), dashes (-), 1163 // underscores (_), periods (.), tildes (~), plus (+) or percent signs (%). It 1164 // must be between 3 and 255 characters in length, and must not start with 1165 // "goog". 1166 // 1167 // cfg.Topic is the topic from which the subscription should receive messages. It 1168 // need not belong to the same project as the subscription. This field is required. 1169 // 1170 // cfg.AckDeadline is the maximum time after a subscriber receives a message before 1171 // the subscriber should acknowledge the message. It must be between 10 and 600 1172 // seconds (inclusive), and is rounded down to the nearest second. If the 1173 // provided ackDeadline is 0, then the default value of 10 seconds is used. 1174 // Note: messages which are obtained via Subscription.Receive need not be 1175 // acknowledged within this deadline, as the deadline will be automatically 1176 // extended. 1177 // 1178 // cfg.PushConfig may be set to configure this subscription for push delivery. 1179 // 1180 // If the subscription already exists an error will be returned. 1181 func (c *Client) CreateSubscription(ctx context.Context, id string, cfg SubscriptionConfig) (*Subscription, error) { 1182 if cfg.Topic == nil { 1183 return nil, errors.New("pubsub: require non-nil Topic") 1184 } 1185 if cfg.AckDeadline == 0 { 1186 cfg.AckDeadline = 10 * time.Second 1187 } 1188 if d := cfg.AckDeadline; d < 10*time.Second || d > 600*time.Second { 1189 return nil, fmt.Errorf("ack deadline must be between 10 and 600 seconds; got: %v", d) 1190 } 1191 1192 sub := c.Subscription(id) 1193 _, err := c.subc.CreateSubscription(ctx, cfg.toProto(sub.name)) 1194 if err != nil { 1195 return nil, err 1196 } 1197 return sub, nil 1198 } 1199 1200 var errReceiveInProgress = errors.New("pubsub: Receive already in progress for this subscription") 1201 1202 // Receive calls f with the outstanding messages from the subscription. 1203 // It blocks until ctx is done, or the service returns a non-retryable error. 1204 // 1205 // The standard way to terminate a Receive is to cancel its context: 1206 // 1207 // cctx, cancel := context.WithCancel(ctx) 1208 // err := sub.Receive(cctx, callback) 1209 // // Call cancel from callback, or another goroutine. 1210 // 1211 // If the service returns a non-retryable error, Receive returns that error after 1212 // all of the outstanding calls to f have returned. If ctx is done, Receive 1213 // returns nil after all of the outstanding calls to f have returned and 1214 // all messages have been acknowledged or have expired. 1215 // 1216 // Receive calls f concurrently from multiple goroutines. It is encouraged to 1217 // process messages synchronously in f, even if that processing is relatively 1218 // time-consuming; Receive will spawn new goroutines for incoming messages, 1219 // limited by MaxOutstandingMessages and MaxOutstandingBytes in ReceiveSettings. 1220 // 1221 // The context passed to f will be canceled when ctx is Done or there is a 1222 // fatal service error. 1223 // 1224 // Receive will send an ack deadline extension on message receipt, then 1225 // automatically extend the ack deadline of all fetched Messages up to the 1226 // period specified by s.ReceiveSettings.MaxExtension. 1227 // 1228 // Each Subscription may have only one invocation of Receive active at a time. 1229 func (s *Subscription) Receive(ctx context.Context, f func(context.Context, *Message)) error { 1230 s.mu.Lock() 1231 if s.receiveActive { 1232 s.mu.Unlock() 1233 return errReceiveInProgress 1234 } 1235 s.receiveActive = true 1236 s.mu.Unlock() 1237 defer func() { s.mu.Lock(); s.receiveActive = false; s.mu.Unlock() }() 1238 1239 // TODO(hongalex): move settings check to a helper function to make it more testable 1240 maxCount := s.ReceiveSettings.MaxOutstandingMessages 1241 if maxCount == 0 { 1242 maxCount = DefaultReceiveSettings.MaxOutstandingMessages 1243 } 1244 maxBytes := s.ReceiveSettings.MaxOutstandingBytes 1245 if maxBytes == 0 { 1246 maxBytes = DefaultReceiveSettings.MaxOutstandingBytes 1247 } 1248 maxExt := s.ReceiveSettings.MaxExtension 1249 if maxExt == 0 { 1250 maxExt = DefaultReceiveSettings.MaxExtension 1251 } else if maxExt < 0 { 1252 // If MaxExtension is negative, disable automatic extension. 1253 maxExt = 0 1254 } 1255 maxExtPeriod := s.ReceiveSettings.MaxExtensionPeriod 1256 if maxExtPeriod < 0 { 1257 maxExtPeriod = DefaultReceiveSettings.MaxExtensionPeriod 1258 } 1259 minExtPeriod := s.ReceiveSettings.MinExtensionPeriod 1260 if minExtPeriod < 0 { 1261 minExtPeriod = DefaultReceiveSettings.MinExtensionPeriod 1262 } 1263 1264 var numGoroutines int 1265 switch { 1266 case s.ReceiveSettings.Synchronous: 1267 numGoroutines = 1 1268 case s.ReceiveSettings.NumGoroutines >= 1: 1269 numGoroutines = s.ReceiveSettings.NumGoroutines 1270 default: 1271 numGoroutines = DefaultReceiveSettings.NumGoroutines 1272 } 1273 // TODO(jba): add tests that verify that ReceiveSettings are correctly processed. 1274 po := &pullOptions{ 1275 maxExtension: maxExt, 1276 maxExtensionPeriod: maxExtPeriod, 1277 minExtensionPeriod: minExtPeriod, 1278 maxPrefetch: trunc32(int64(maxCount)), 1279 synchronous: s.ReceiveSettings.Synchronous, 1280 maxOutstandingMessages: maxCount, 1281 maxOutstandingBytes: maxBytes, 1282 useLegacyFlowControl: s.ReceiveSettings.UseLegacyFlowControl, 1283 } 1284 fc := newSubscriptionFlowController(FlowControlSettings{ 1285 MaxOutstandingMessages: maxCount, 1286 MaxOutstandingBytes: maxBytes, 1287 LimitExceededBehavior: FlowControlBlock, 1288 }) 1289 1290 sched := scheduler.NewReceiveScheduler(maxCount) 1291 1292 // Wait for all goroutines started by Receive to return, so instead of an 1293 // obscure goroutine leak we have an obvious blocked call to Receive. 1294 group, gctx := errgroup.WithContext(ctx) 1295 1296 type closeablePair struct { 1297 wg *sync.WaitGroup 1298 iter *messageIterator 1299 } 1300 1301 var pairs []closeablePair 1302 1303 // Cancel a sub-context which, when we finish a single receiver, will kick 1304 // off the context-aware callbacks and the goroutine below (which stops 1305 // all receivers, iterators, and the scheduler). 1306 ctx2, cancel2 := context.WithCancel(gctx) 1307 defer cancel2() 1308 1309 for i := 0; i < numGoroutines; i++ { 1310 // The iterator does not use the context passed to Receive. If it did, 1311 // canceling that context would immediately stop the iterator without 1312 // waiting for unacked messages. 1313 iter := newMessageIterator(s.c.subc, s.name, po) 1314 1315 // We cannot use errgroup from Receive here. Receive might already be 1316 // calling group.Wait, and group.Wait cannot be called concurrently with 1317 // group.Go. We give each receive() its own WaitGroup instead. 1318 // 1319 // Since wg.Add is only called from the main goroutine, wg.Wait is 1320 // guaranteed to be called after all Adds. 1321 var wg sync.WaitGroup 1322 wg.Add(1) 1323 pairs = append(pairs, closeablePair{wg: &wg, iter: iter}) 1324 1325 group.Go(func() error { 1326 defer wg.Wait() 1327 defer cancel2() 1328 for { 1329 var maxToPull int32 // maximum number of messages to pull 1330 if po.synchronous { 1331 if po.maxPrefetch < 0 { 1332 // If there is no limit on the number of messages to 1333 // pull, use a reasonable default. 1334 maxToPull = 1000 1335 } else { 1336 // Limit the number of messages in memory to MaxOutstandingMessages 1337 // (here, po.maxPrefetch). For each message currently in memory, we have 1338 // called fc.acquire but not fc.release: this is fc.count(). The next 1339 // call to Pull should fetch no more than the difference between these 1340 // values. 1341 maxToPull = po.maxPrefetch - int32(fc.count()) 1342 if maxToPull <= 0 { 1343 // Wait for some callbacks to finish. 1344 if err := gax.Sleep(ctx, synchronousWaitTime); err != nil { 1345 // Return nil if the context is done, not err. 1346 return nil 1347 } 1348 continue 1349 } 1350 } 1351 } 1352 // If the context is done, don't pull more messages. 1353 select { 1354 case <-ctx.Done(): 1355 return nil 1356 default: 1357 } 1358 msgs, err := iter.receive(maxToPull) 1359 if err == io.EOF { 1360 return nil 1361 } 1362 if err != nil { 1363 return err 1364 } 1365 // If context is done and messages have been pulled, 1366 // nack them. 1367 select { 1368 case <-ctx.Done(): 1369 for _, m := range msgs { 1370 m.Nack() 1371 } 1372 return nil 1373 default: 1374 } 1375 for i, msg := range msgs { 1376 msg := msg 1377 // TODO(jba): call acquire closer to when the message is allocated. 1378 if err := fc.acquire(ctx, len(msg.Data)); err != nil { 1379 // TODO(jba): test that these "orphaned" messages are nacked immediately when ctx is done. 1380 for _, m := range msgs[i:] { 1381 m.Nack() 1382 } 1383 // Return nil if the context is done, not err. 1384 return nil 1385 } 1386 iter.eoMu.RLock() 1387 msgAckHandler(msg, iter.enableExactlyOnceDelivery) 1388 iter.eoMu.RUnlock() 1389 1390 wg.Add(1) 1391 // Only schedule messages in order if an ordering key is present and the subscriber client 1392 // received the ordering flag from a Streaming Pull response. 1393 var key string 1394 iter.orderingMu.RLock() 1395 if iter.enableOrdering { 1396 key = msg.OrderingKey 1397 } 1398 iter.orderingMu.RUnlock() 1399 msgLen := len(msg.Data) 1400 if err := sched.Add(key, msg, func(msg interface{}) { 1401 defer wg.Done() 1402 defer fc.release(ctx, msgLen) 1403 f(ctx2, msg.(*Message)) 1404 }); err != nil { 1405 wg.Done() 1406 // If there are any errors with scheduling messages, 1407 // nack them so they can be redelivered. 1408 msg.Nack() 1409 // Currently, only this error is returned by the receive scheduler. 1410 if errors.Is(err, scheduler.ErrReceiveDraining) { 1411 return nil 1412 } 1413 return err 1414 } 1415 } 1416 } 1417 }) 1418 } 1419 1420 go func() { 1421 <-ctx2.Done() 1422 1423 // Wait for all iterators to stop. 1424 for _, p := range pairs { 1425 p.iter.stop() 1426 p.wg.Done() 1427 } 1428 1429 // This _must_ happen after every iterator has stopped, or some 1430 // iterator will still have undelivered messages but the scheduler will 1431 // already be shut down. 1432 sched.Shutdown() 1433 }() 1434 1435 return group.Wait() 1436 } 1437 1438 type pullOptions struct { 1439 maxExtension time.Duration // the maximum time to extend a message's ack deadline in total 1440 maxExtensionPeriod time.Duration // the maximum time to extend a message's ack deadline per modack rpc 1441 minExtensionPeriod time.Duration // the minimum time to extend a message's lease duration per modack 1442 maxPrefetch int32 // the max number of outstanding messages, used to calculate maxToPull 1443 // If true, use unary Pull instead of StreamingPull, and never pull more 1444 // than maxPrefetch messages. 1445 synchronous bool 1446 maxOutstandingMessages int 1447 maxOutstandingBytes int 1448 useLegacyFlowControl bool 1449 } 1450