1 // Copyright 2016 Google LLC 2 // 3 // Licensed under the Apache License, Version 2.0 (the "License"); 4 // you may not use this file except in compliance with the License. 5 // You may obtain a copy of the License at 6 // 7 // http://www.apache.org/licenses/LICENSE-2.0 8 // 9 // Unless required by applicable law or agreed to in writing, software 10 // distributed under the License is distributed on an "AS IS" BASIS, 11 // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 12 // See the License for the specific language governing permissions and 13 // limitations under the License. 14 15 /* 16 Package pubsub provides an easy way to publish and receive Google Cloud Pub/Sub 17 messages, hiding the details of the underlying server RPCs. Google Cloud 18 Pub/Sub is a many-to-many, asynchronous messaging system that decouples senders 19 and receivers. 20 21 More information about Google Cloud Pub/Sub is available at 22 https://cloud.google.com/pubsub/docs 23 24 See https://godoc.org/cloud.google.com/go for authentication, timeouts, 25 connection pooling and similar aspects of this package. 26 27 # Publishing 28 29 Google Cloud Pub/Sub messages are published to topics. Topics may be created 30 using the pubsub package like so: 31 32 topic, err := pubsubClient.CreateTopic(context.Background(), "topic-name") 33 34 Messages may then be published to a topic: 35 36 res := topic.Publish(ctx, &pubsub.Message{Data: []byte("payload")}) 37 38 Publish queues the message for publishing and returns immediately. When enough 39 messages have accumulated, or enough time has elapsed, the batch of messages is 40 sent to the Pub/Sub service. 41 42 Publish returns a PublishResult, which behaves like a future: its Get method 43 blocks until the message has been sent to the service. 44 45 The first time you call Publish on a topic, goroutines are started in the 46 background. 
To clean up these goroutines, call Stop: 47 48 topic.Stop() 49 50 # Receiving 51 52 To receive messages published to a topic, clients create subscriptions 53 to the topic. There may be more than one subscription per topic; each message 54 that is published to the topic will be delivered to all of its subscriptions. 55 56 Subscriptions may be created like so: 57 58 sub, err := pubsubClient.CreateSubscription(context.Background(), "sub-name", 59 pubsub.SubscriptionConfig{Topic: topic}) 60 61 Messages are then consumed from a subscription via callback. 62 63 err := sub.Receive(context.Background(), func(ctx context.Context, m *Message) { 64 log.Printf("Got message: %s", m.Data) 65 m.Ack() 66 }) 67 if err != nil { 68 // Handle error. 69 } 70 71 The callback is invoked concurrently by multiple goroutines, maximizing 72 throughput. To terminate a call to Receive, cancel its context. 73 74 Once client code has processed the message, it must call Message.Ack or 75 Message.Nack; otherwise the message will eventually be redelivered. Ack/Nack 76 MUST be called within the Receive handler function, and not from a goroutine. 77 Otherwise, flow control (e.g. ReceiveSettings.MaxOutstandingMessages) will 78 not be respected, and messages can get orphaned when cancelling Receive. 79 80 If the client cannot or doesn't want to process the message, it can call Message.Nack 81 to speed redelivery. For more information and configuration options, see 82 "Ack Deadlines" below. 83 84 Note: It is possible for Messages to be redelivered even if Message.Ack has 85 been called. Client code must be robust to multiple deliveries of messages. 86 87 Note: This uses pubsub's streaming pull feature. This feature has properties that 88 may be surprising. Please take a look at https://cloud.google.com/pubsub/docs/pull#streamingpull 89 for more details on how streaming pull behaves compared to the synchronous 90 pull method. 
91 92 # Streams Management 93 94 The number of StreamingPull connections can be configured by setting sub.ReceiveSettings.NumGoroutines. 95 The default value of 10 means the client library will maintain 10 StreamingPull connections. 96 This is more than sufficient for most use cases, as StreamingPull connections can handle up to 97 10 MB/s (see https://cloud.google.com/pubsub/quotas#resource_limits). In some cases, using too many streams 98 can lead to the client library behaving poorly as the application becomes I/O bound. 99 100 By default, the number of connections in the gRPC conn pool is min(4,GOMAXPROCS). Each connection supports 101 up to 100 streams. Thus, if you have 4 or more CPU cores, the default setting allows a maximum of 400 streams, 102 which is already excessive for most use cases. 103 If you want to change the limits on the number of streams, you can change the number of connections 104 in the gRPC connection pool as shown below: 105 106 opts := []option.ClientOption{ 107 option.WithGRPCConnectionPool(2), 108 } 109 client, err := pubsub.NewClient(ctx, projID, opts...) 110 111 # Ack Deadlines 112 113 The default pubsub deadlines are suitable for most use cases, but may be 114 overridden. This section describes the tradeoffs that should be considered 115 when overriding the defaults. 116 117 Behind the scenes, each message returned by the Pub/Sub server has an 118 associated lease, known as an "ack deadline". Unless a message is 119 acknowledged within the ack deadline, or the client requests that 120 the ack deadline be extended, the message will become eligible for redelivery. 121 122 As a convenience, the pubsub client will automatically extend deadlines until 123 either: 124 - Message.Ack or Message.Nack is called, or 125 - The "MaxExtension" duration elapses from the time the message is fetched from 126 the server. This defaults to 60m. 127 128 Ack deadlines are extended periodically by the client. 
The initial ack 129 deadline given to messages is based on the subscription's AckDeadline property, 130 which defaults to 10s. The period between extensions, as well as the 131 length of the extension, automatically adjusts based on the time it takes the 132 subscriber application to ack messages (based on the 99th percentile of ack latency). 133 By default, this extension period is capped at 10m, but this limit can be configured 134 by the "MaxExtensionPeriod" setting. This has the effect that subscribers that process 135 messages quickly have their message ack deadlines extended for a short amount, whereas 136 subscribers that process messages slowly have their message ack deadlines extended 137 for a large amount. The net effect is fewer RPCs sent from the client library. 138 139 For example, consider a subscriber that takes 3 minutes to process each message. 140 Since the library has already recorded several 3-minute "ack latencies" in a 141 percentile distribution, future message extensions are sent with a value of 3 142 minutes, every 3 minutes. Suppose the application crashes 5 seconds after the 143 library sends such an extension: the Pub/Sub server would wait the remaining 144 2m55s before re-sending the messages out to other subscribers. 145 146 Please note that by default, the client library does not use the subscription's 147 AckDeadline for the MaxExtension value. To enforce the subscription's AckDeadline, 148 set MaxExtension to the subscription's AckDeadline: 149 150 cfg, err := sub.Config(ctx) 151 if err != nil { 152 // TODO: handle err 153 } 154 155 sub.ReceiveSettings.MaxExtension = cfg.AckDeadline 156 157 # Slow Message Processing 158 159 For use cases where message processing exceeds 30 minutes, we recommend using 160 the base client in a pull model, since long-lived streams are periodically killed 161 by firewalls. 
See the example at https://godoc.org/cloud.google.com/go/pubsub/apiv1#example-SubscriberClient-Pull-LengthyClientProcessing 162 163 # Emulator 164 165 To use an emulator with this library, you can set the PUBSUB_EMULATOR_HOST 166 environment variable to the address at which your emulator is running. This will 167 send requests to that address instead of to Cloud Pub/Sub. You can then create 168 and use a client as usual: 169 170 // Set PUBSUB_EMULATOR_HOST environment variable. 171 err := os.Setenv("PUBSUB_EMULATOR_HOST", "localhost:9000") 172 if err != nil { 173 // TODO: Handle error. 174 } 175 // Create client as usual. 176 client, err := pubsub.NewClient(ctx, "my-project-id") 177 if err != nil { 178 // TODO: Handle error. 179 } 180 defer client.Close() 181 */ 182 package pubsub // import "cloud.google.com/go/pubsub" 183