1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "log"
22 "runtime"
23 "strings"
24 "sync"
25 "time"
26
27 "cloud.google.com/go/iam"
28 "cloud.google.com/go/internal/optional"
29 ipubsub "cloud.google.com/go/internal/pubsub"
30 vkit "cloud.google.com/go/pubsub/apiv1"
31 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
32 "cloud.google.com/go/pubsub/internal/scheduler"
33 gax "github.com/googleapis/gax-go/v2"
34 "go.opencensus.io/stats"
35 "go.opencensus.io/tag"
36 "google.golang.org/api/support/bundler"
37 "google.golang.org/grpc"
38 "google.golang.org/grpc/codes"
39 "google.golang.org/grpc/encoding/gzip"
40 "google.golang.org/grpc/status"
41 "google.golang.org/protobuf/proto"
42 "google.golang.org/protobuf/types/known/durationpb"
43 fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"
44 )
45
const (
	// MaxPublishRequestCount is the cap applied to the publish scheduler's
	// BundleCountThreshold: initBundler clamps a larger
	// PublishSettings.CountThreshold down to this value.
	MaxPublishRequestCount = 1000

	// MaxPublishRequestBytes is the byte budget from which initBundler derives
	// the scheduler's BundleByteLimit (after subtracting the encoded size of
	// the topic name and a small fixed overhead). It is also the base of the
	// default PublishSettings.BufferedByteLimit.
	//
	// Note that this is an untyped floating-point constant (1e7).
	MaxPublishRequestBytes = 1e7
)
55
const (
	// intSize is the width of int/uint in bits. ^uint(0)>>63 evaluates to 1
	// when uint is 64 bits wide and to 0 when it is 32 bits wide, so the
	// result is 32<<1 (64) or 32<<0 (32).
	intSize = 32 << (^uint(0) >> 63)
	// maxInt is the largest value a signed integer of intSize bits can hold.
	maxInt = 1<<(intSize-1) - 1
)
62
63
// ErrOversizedMessage is the same error value as bundler.ErrOversizedItem,
// re-exported under a pubsub-specific name.
// NOTE(review): presumably reported when a single message is too large for a
// publish bundle — confirm against the scheduler/bundler implementation.
var ErrOversizedMessage = bundler.ErrOversizedItem
65
66
67
68
// Topic is a handle to a Pub/Sub topic. It carries the client used for RPCs,
// the topic's resource name, and the publishing state (scheduler and flow
// controller) that is created lazily by initBundler on the first Publish.
type Topic struct {
	// c is the client whose publisher API (c.pubc) serves every RPC issued
	// through this handle.
	c *Client
	// name is the topic resource name sent in requests. Handles built by
	// TopicInProject use the form "projects/<project>/topics/<id>".
	name string

	// PublishSettings configures batching, flow control and compression.
	// initBundler reads it once, when the scheduler is first created; the
	// compression settings are additionally consulted for every bundle.
	PublishSettings PublishSettings

	// mu guards stopped and scheduler (see Stop, initBundler, ResumePublish).
	mu sync.RWMutex
	// stopped is set by Stop; once true, Publish fails with ErrTopicStopped.
	stopped bool
	// scheduler batches and dispatches messages; nil until initBundler runs.
	scheduler *scheduler.PublishScheduler

	// flowController is assigned by initBundler and bounds outstanding
	// messages/bytes for Publish.
	flowController

	// EnableMessageOrdering must be true for Publish to accept messages that
	// carry a non-empty OrderingKey.
	EnableMessageOrdering bool
}
87
88
// PublishSettings controls how Topic.Publish bundles and sends messages.
// The values are applied to the publish scheduler and flow controller by
// Topic.initBundler.
type PublishSettings struct {
	// DelayThreshold is copied to the scheduler's DelayThreshold.
	// NOTE(review): presumably the maximum time a non-empty bundle waits
	// before it is sent — confirm against the scheduler.
	DelayThreshold time.Duration

	// CountThreshold is copied to the scheduler's BundleCountThreshold.
	// Values above MaxPublishRequestCount are clamped to that maximum.
	CountThreshold int

	// ByteThreshold is copied to the scheduler's BundleByteThreshold.
	ByteThreshold int

	// NumGoroutines is the worker count handed to the publish scheduler.
	// When zero, 25 * runtime.GOMAXPROCS(0) is used instead.
	NumGoroutines int

	// Timeout bounds each bundle's publish call via context.WithTimeout.
	// A zero value means no timeout is applied.
	Timeout time.Duration

	// BufferedByteLimit is copied to the scheduler's BufferedByteLimit.
	// A non-positive value selects DefaultPublishSettings.BufferedByteLimit.
	// When FlowControlSettings.MaxOutstandingBytes is positive, initBundler
	// overwrites this field with the maximum int value.
	BufferedByteLimit int

	// FlowControlSettings configures the topic's flow controller.
	// MaxOutstandingBytes and MaxOutstandingMessages are only honored when
	// positive; otherwise the values from DefaultPublishSettings are used.
	// LimitExceededBehavior is always taken from this field.
	FlowControlSettings FlowControlSettings

	// EnableCompression turns on gzip compression for publish requests whose
	// batch size exceeds CompressionBytesThreshold.
	EnableCompression bool

	// CompressionBytesThreshold is the batch size, in bytes, that a bundle
	// must strictly exceed before it is compressed.
	CompressionBytesThreshold int
}
129
130 func (ps *PublishSettings) shouldCompress(batchSize int) bool {
131 return ps.EnableCompression && batchSize > ps.CompressionBytesThreshold
132 }
133
134
// DefaultPublishSettings holds the PublishSettings given to every new Topic
// handle (see newTopic). initBundler also falls back to its BufferedByteLimit
// and FlowControlSettings when a topic's own values are not positive.
var DefaultPublishSettings = PublishSettings{
	DelayThreshold: 10 * time.Millisecond,
	CountThreshold: 100,
	ByteThreshold:  1e6,
	Timeout:        60 * time.Second,
	// Ten times the per-request byte budget.
	BufferedByteLimit: 10 * MaxPublishRequestBytes,
	FlowControlSettings: FlowControlSettings{
		MaxOutstandingMessages: 1000,
		MaxOutstandingBytes:    -1,
		LimitExceededBehavior:  FlowControlIgnore,
	},
	// Compression is off by default; the threshold only matters once
	// EnableCompression is set.
	EnableCompression:         false,
	CompressionBytesThreshold: 240,
}
154
155
156
157
158
159
160
161
162
163
164 func (c *Client) CreateTopic(ctx context.Context, topicID string) (*Topic, error) {
165 t := c.Topic(topicID)
166 _, err := c.pubc.CreateTopic(ctx, &pb.Topic{Name: t.name})
167 if err != nil {
168 return nil, err
169 }
170 return t, nil
171 }
172
173
174
175
176
177
178
179
180
181
182 func (c *Client) CreateTopicWithConfig(ctx context.Context, topicID string, tc *TopicConfig) (*Topic, error) {
183 t := c.Topic(topicID)
184 topic := tc.toProto()
185 topic.Name = t.name
186 _, err := c.pubc.CreateTopic(ctx, topic)
187 if err != nil {
188 return nil, err
189 }
190 return t, nil
191 }
192
193
194
195
196
197
198
199 func (c *Client) Topic(id string) *Topic {
200 return c.TopicInProject(id, c.projectID)
201 }
202
203
204
205
206
207
208
209 func (c *Client) TopicInProject(id, projectID string) *Topic {
210 return newTopic(c, fmt.Sprintf("projects/%s/topics/%s", projectID, id))
211 }
212
213 func newTopic(c *Client, name string) *Topic {
214 return &Topic{
215 c: c,
216 name: name,
217 PublishSettings: DefaultPublishSettings,
218 }
219 }
220
221
// TopicState is the state of a topic as reported by the service.
// protoToTopicConfig converts the proto enum value to this type numerically.
type TopicState int

// Topic state values. The constants are declared without an explicit type, so
// they are untyped integer constants that are assignable to TopicState.
const (
	// TopicStateUnspecified is the zero value (0).
	TopicStateUnspecified = iota

	// TopicStateActive has the value 1.
	// NOTE(review): presumably the normal, healthy state — confirm against
	// the Pub/Sub API's Topic.State enum.
	TopicStateActive

	// TopicStateIngestionResourceError has the value 2.
	// NOTE(review): presumably signals a problem with the ingestion data
	// source — confirm against the Pub/Sub API's Topic.State enum.
	TopicStateIngestionResourceError
)
237
238
// TopicConfig describes the configuration of a topic. It is produced from the
// service's Topic proto by protoToTopicConfig and converted back for topic
// creation by toProto.
type TopicConfig struct {
	// name is the topic resource name as returned by the service. It is not
	// sent by toProto; callers set the proto's Name separately.
	name string

	// Labels are the topic's key/value labels.
	Labels map[string]string

	// MessageStoragePolicy is the topic's storage policy. A policy whose
	// AllowedPersistenceRegions is nil is omitted from the create request.
	MessageStoragePolicy MessageStoragePolicy

	// KMSKeyName is sent as the proto's KmsKeyName.
	// NOTE(review): presumably the Cloud KMS key resource name protecting
	// the topic's messages — confirm against the API reference.
	KMSKeyName string

	// SchemaSettings holds the topic's schema settings, converted with
	// schemaSettingsToProto / protoToSchemaSettings.
	SchemaSettings *SchemaSettings

	// RetentionDuration is the message retention duration. A nil value means
	// unset; otherwise it must hold a time.Duration (it is read with
	// optional.ToDuration).
	RetentionDuration optional.Duration

	// State is filled from the service response. toProto does not send it.
	State TopicState

	// IngestionDataSourceSettings holds the ingestion settings; nil or a
	// zero-valued struct is omitted from the create request.
	IngestionDataSourceSettings *IngestionDataSourceSettings
}
276
277
278
279
280
281 func (t *TopicConfig) String() string {
282 return t.name
283 }
284
285
286
287
288
289 func (t *TopicConfig) ID() string {
290 slash := strings.LastIndex(t.name, "/")
291 if slash == -1 {
292 return ""
293 }
294 return t.name[slash+1:]
295 }
296
297 func (tc *TopicConfig) toProto() *pb.Topic {
298 var retDur *durationpb.Duration
299 if tc.RetentionDuration != nil {
300 retDur = durationpb.New(optional.ToDuration(tc.RetentionDuration))
301 }
302 pbt := &pb.Topic{
303 Labels: tc.Labels,
304 MessageStoragePolicy: messageStoragePolicyToProto(&tc.MessageStoragePolicy),
305 KmsKeyName: tc.KMSKeyName,
306 SchemaSettings: schemaSettingsToProto(tc.SchemaSettings),
307 MessageRetentionDuration: retDur,
308 IngestionDataSourceSettings: tc.IngestionDataSourceSettings.toProto(),
309 }
310 return pbt
311 }
312
313
// TopicConfigToUpdate describes the changes to apply with Topic.Update.
// Only fields that are non-nil are included in the update mask.
type TopicConfigToUpdate struct {
	// Labels, when non-nil, replaces the topic's labels (mask path "labels").
	Labels map[string]string

	// MessageStoragePolicy, when non-nil, updates the storage policy (mask
	// path "message_storage_policy"). If its AllowedPersistenceRegions is
	// nil, no policy value is sent along with the mask path.
	MessageStoragePolicy *MessageStoragePolicy

	// RetentionDuration, when non-nil, updates the retention duration (mask
	// path "message_retention_duration"). It must hold a time.Duration. A
	// negative duration sends the mask path with no value.
	RetentionDuration optional.Duration

	// SchemaSettings, when non-nil, updates the individual schema fields that
	// are set (schema, encoding, first/last revision ID). If none of them is
	// set, the whole "schema_settings" path is sent with no value.
	SchemaSettings *SchemaSettings

	// IngestionDataSourceSettings, when non-nil, updates the ingestion
	// settings (mask path "ingestion_data_source_settings"). A zero-valued
	// struct sends the mask path with no value.
	IngestionDataSourceSettings *IngestionDataSourceSettings
}
347
348 func protoToTopicConfig(pbt *pb.Topic) TopicConfig {
349 tc := TopicConfig{
350 name: pbt.Name,
351 Labels: pbt.Labels,
352 MessageStoragePolicy: protoToMessageStoragePolicy(pbt.MessageStoragePolicy),
353 KMSKeyName: pbt.KmsKeyName,
354 SchemaSettings: protoToSchemaSettings(pbt.SchemaSettings),
355 State: TopicState(pbt.State),
356 IngestionDataSourceSettings: protoToIngestionDataSourceSettings(pbt.IngestionDataSourceSettings),
357 }
358 if pbt.GetMessageRetentionDuration() != nil {
359 tc.RetentionDuration = pbt.GetMessageRetentionDuration().AsDuration()
360 }
361 return tc
362 }
363
364
365
// DetachSubscriptionResult is the value returned by Client.DetachSubscription
// on success. It currently has no fields.
type DetachSubscriptionResult struct{}
367
368
369
370
371
372 func (c *Client) DetachSubscription(ctx context.Context, sub string) (*DetachSubscriptionResult, error) {
373 _, err := c.pubc.DetachSubscription(ctx, &pb.DetachSubscriptionRequest{
374 Subscription: sub,
375 })
376 if err != nil {
377 return nil, err
378 }
379 return &DetachSubscriptionResult{}, nil
380 }
381
382
383
384
// MessageStoragePolicy mirrors the service's message storage policy for a
// topic.
type MessageStoragePolicy struct {
	// AllowedPersistenceRegions is copied to and from the proto field of the
	// same name. A nil slice makes messageStoragePolicyToProto return nil,
	// i.e. no policy is sent.
	// NOTE(review): presumably the list of cloud regions where messages may
	// be persisted — confirm against the API reference.
	AllowedPersistenceRegions []string
}
401
402 func protoToMessageStoragePolicy(msp *pb.MessageStoragePolicy) MessageStoragePolicy {
403 if msp == nil {
404 return MessageStoragePolicy{}
405 }
406 return MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
407 }
408
409 func messageStoragePolicyToProto(msp *MessageStoragePolicy) *pb.MessageStoragePolicy {
410 if msp == nil || msp.AllowedPersistenceRegions == nil {
411 return nil
412 }
413 return &pb.MessageStoragePolicy{AllowedPersistenceRegions: msp.AllowedPersistenceRegions}
414 }
415
416
// IngestionDataSourceSettings wraps the ingestion data source configured for
// a topic.
type IngestionDataSourceSettings struct {
	// Source is the concrete data source. The only implementation handled by
	// the conversion functions in this file is *IngestionDataSourceAWSKinesis.
	Source IngestionDataSource
}
420
421
// IngestionDataSource is implemented by the supported ingestion data source
// types. Its single method is unexported, so only types in this package can
// satisfy it.
type IngestionDataSource interface {
	isIngestionDataSource() bool
}
425
426
// AWSKinesisState is the state of an AWS Kinesis ingestion source. It is
// converted numerically to and from the proto's AwsKinesis state enum.
type AWSKinesisState int

// AWS Kinesis state values, numbered 0 through 5 by iota. The constants are
// declared without an explicit type, so they are untyped integer constants.
// NOTE(review): the meanings below are inferred from the identifiers only —
// confirm against the Pub/Sub API's AwsKinesis.State enum.
const (
	// AWSKinesisStateUnspecified is the zero value (0).
	AWSKinesisStateUnspecified = iota

	// AWSKinesisStateActive has the value 1.
	AWSKinesisStateActive

	// AWSKinesisStatePermissionDenied has the value 2.
	AWSKinesisStatePermissionDenied

	// AWSKinesisStatePublishPermissionDenied has the value 3.
	AWSKinesisStatePublishPermissionDenied

	// AWSKinesisStateStreamNotFound has the value 4.
	AWSKinesisStateStreamNotFound

	// AWSKinesisStateConsumerNotFound has the value 5.
	AWSKinesisStateConsumerNotFound
)
458
459
// IngestionDataSourceAWSKinesis describes an AWS Kinesis ingestion source.
// Each field is copied to and from the identically named field of the
// proto's AwsKinesis message.
type IngestionDataSourceAWSKinesis struct {
	// State is the source's state, converted numerically from the proto enum.
	State AWSKinesisState

	// StreamARN maps to the proto's StreamArn.
	StreamARN string

	// ConsumerARN maps to the proto's ConsumerArn.
	ConsumerARN string

	// AWSRoleARN maps to the proto's AwsRoleArn.
	AWSRoleARN string

	// GCPServiceAccount maps to the proto's GcpServiceAccount.
	GCPServiceAccount string
}
482
// Compile-time check that *IngestionDataSourceAWSKinesis implements
// IngestionDataSource.
var _ IngestionDataSource = (*IngestionDataSourceAWSKinesis)(nil)

// isIngestionDataSource marks the type as an IngestionDataSource. It always
// returns true.
func (i *IngestionDataSourceAWSKinesis) isIngestionDataSource() bool {
	return true
}
488
489 func protoToIngestionDataSourceSettings(pbs *pb.IngestionDataSourceSettings) *IngestionDataSourceSettings {
490 if pbs == nil {
491 return nil
492 }
493
494 s := &IngestionDataSourceSettings{}
495 if k := pbs.GetAwsKinesis(); k != nil {
496 s.Source = &IngestionDataSourceAWSKinesis{
497 State: AWSKinesisState(k.State),
498 StreamARN: k.GetStreamArn(),
499 ConsumerARN: k.GetConsumerArn(),
500 AWSRoleARN: k.GetAwsRoleArn(),
501 GCPServiceAccount: k.GetGcpServiceAccount(),
502 }
503 }
504 return s
505 }
506
507 func (i *IngestionDataSourceSettings) toProto() *pb.IngestionDataSourceSettings {
508 if i == nil {
509 return nil
510 }
511
512 if (IngestionDataSourceSettings{}) == *i {
513 return nil
514 }
515 pbs := &pb.IngestionDataSourceSettings{}
516 if out := i.Source; out != nil {
517 if k, ok := out.(*IngestionDataSourceAWSKinesis); ok {
518 pbs.Source = &pb.IngestionDataSourceSettings_AwsKinesis_{
519 AwsKinesis: &pb.IngestionDataSourceSettings_AwsKinesis{
520 State: pb.IngestionDataSourceSettings_AwsKinesis_State(k.State),
521 StreamArn: k.StreamARN,
522 ConsumerArn: k.ConsumerARN,
523 AwsRoleArn: k.AWSRoleARN,
524 GcpServiceAccount: k.GCPServiceAccount,
525 },
526 }
527 }
528 }
529 return pbs
530 }
531
532
533 func (t *Topic) Config(ctx context.Context) (TopicConfig, error) {
534 pbt, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
535 if err != nil {
536 return TopicConfig{}, err
537 }
538 return protoToTopicConfig(pbt), nil
539 }
540
541
542
543 func (t *Topic) Update(ctx context.Context, cfg TopicConfigToUpdate) (TopicConfig, error) {
544 req := t.updateRequest(cfg)
545 if len(req.UpdateMask.Paths) == 0 {
546 return TopicConfig{}, errors.New("pubsub: UpdateTopic call with nothing to update")
547 }
548 rpt, err := t.c.pubc.UpdateTopic(ctx, req)
549 if err != nil {
550 return TopicConfig{}, err
551 }
552 return protoToTopicConfig(rpt), nil
553 }
554
555 func (t *Topic) updateRequest(cfg TopicConfigToUpdate) *pb.UpdateTopicRequest {
556 pt := &pb.Topic{Name: t.name}
557 var paths []string
558 if cfg.Labels != nil {
559 pt.Labels = cfg.Labels
560 paths = append(paths, "labels")
561 }
562 if cfg.MessageStoragePolicy != nil {
563 pt.MessageStoragePolicy = messageStoragePolicyToProto(cfg.MessageStoragePolicy)
564 paths = append(paths, "message_storage_policy")
565 }
566 if cfg.RetentionDuration != nil {
567 r := optional.ToDuration(cfg.RetentionDuration)
568 pt.MessageRetentionDuration = durationpb.New(r)
569 if r < 0 {
570
571 pt.MessageRetentionDuration = nil
572 }
573 paths = append(paths, "message_retention_duration")
574 }
575
576
577
578
579
580 if cfg.SchemaSettings != nil {
581 pt.SchemaSettings = schemaSettingsToProto(cfg.SchemaSettings)
582 clearSchema := true
583 if pt.SchemaSettings.Schema != "" {
584 paths = append(paths, "schema_settings.schema")
585 clearSchema = false
586 }
587 if pt.SchemaSettings.Encoding != pb.Encoding_ENCODING_UNSPECIFIED {
588 paths = append(paths, "schema_settings.encoding")
589 clearSchema = false
590 }
591 if pt.SchemaSettings.FirstRevisionId != "" {
592 paths = append(paths, "schema_settings.first_revision_id")
593 clearSchema = false
594 }
595 if pt.SchemaSettings.LastRevisionId != "" {
596 paths = append(paths, "schema_settings.last_revision_id")
597 clearSchema = false
598 }
599
600 if clearSchema {
601 paths = append(paths, "schema_settings")
602 pt.SchemaSettings = nil
603 }
604 }
605 if cfg.IngestionDataSourceSettings != nil {
606 pt.IngestionDataSourceSettings = cfg.IngestionDataSourceSettings.toProto()
607 paths = append(paths, "ingestion_data_source_settings")
608 }
609 return &pb.UpdateTopicRequest{
610 Topic: pt,
611 UpdateMask: &fmpb.FieldMask{Paths: paths},
612 }
613 }
614
615
616 func (c *Client) Topics(ctx context.Context) *TopicIterator {
617 it := c.pubc.ListTopics(ctx, &pb.ListTopicsRequest{Project: c.fullyQualifiedProjectName()})
618 return &TopicIterator{
619 c: c,
620 it: it,
621 next: func() (string, error) {
622 topic, err := it.Next()
623 if err != nil {
624 return "", err
625 }
626 return topic.Name, nil
627 },
628 }
629 }
630
631
// TopicIterator iterates over the topics of a project. It is created by
// Client.Topics.
type TopicIterator struct {
	// c is the client used to build the Topic handles returned by Next.
	c *Client
	// it is the underlying listing iterator, read directly by NextConfig.
	it *vkit.TopicIterator
	// next returns the resource name of the next topic; used by Next.
	next func() (string, error)
}
637
638
639 func (tps *TopicIterator) Next() (*Topic, error) {
640 topicName, err := tps.next()
641 if err != nil {
642 return nil, err
643 }
644 return newTopic(tps.c, topicName), nil
645 }
646
647
648
649
650
651 func (t *TopicIterator) NextConfig() (*TopicConfig, error) {
652 tpb, err := t.it.Next()
653 if err != nil {
654 return nil, err
655 }
656 cfg := protoToTopicConfig(tpb)
657 return &cfg, nil
658 }
659
660
661 func (t *Topic) ID() string {
662 slash := strings.LastIndex(t.name, "/")
663 if slash == -1 {
664
665 panic("bad topic name")
666 }
667 return t.name[slash+1:]
668 }
669
670
// String returns the topic's resource name, the same string that is sent to
// the service in requests.
func (t *Topic) String() string {
	return t.name
}
674
675
676 func (t *Topic) Delete(ctx context.Context) error {
677 return t.c.pubc.DeleteTopic(ctx, &pb.DeleteTopicRequest{Topic: t.name})
678 }
679
680
681 func (t *Topic) Exists(ctx context.Context) (bool, error) {
682 if t.name == "_deleted-topic_" {
683 return false, nil
684 }
685 _, err := t.c.pubc.GetTopic(ctx, &pb.GetTopicRequest{Topic: t.name})
686 if err == nil {
687 return true, nil
688 }
689 if status.Code(err) == codes.NotFound {
690 return false, nil
691 }
692 return false, err
693 }
694
695
696 func (t *Topic) IAM() *iam.Handle {
697 return iam.InternalNewHandle(t.c.pubc.Connection(), t.name)
698 }
699
700
701
702
703 func (t *Topic) Subscriptions(ctx context.Context) *SubscriptionIterator {
704 it := t.c.pubc.ListTopicSubscriptions(ctx, &pb.ListTopicSubscriptionsRequest{
705 Topic: t.name,
706 })
707 return &SubscriptionIterator{
708 c: t.c,
709 next: it.Next,
710 }
711 }
712
713
// ErrTopicStopped is the error set on the PublishResult when Publish is
// called on a topic whose Stop method has already been called.
var ErrTopicStopped = errors.New("pubsub: Stop has been called for this topic")

// PublishResult is an alias for the internal pubsub package's PublishResult.
// Topic.Publish returns one per message; it is resolved with either a server
// message ID or an error once the message's bundle has been processed.
type PublishResult = ipubsub.PublishResult

// errTopicOrderingNotEnabled is set on the PublishResult when a message has
// an OrderingKey but the topic's EnableMessageOrdering is false.
var errTopicOrderingNotEnabled = errors.New("Topic.EnableMessageOrdering=false, but an OrderingKey was set in Message. Please remove the OrderingKey or turn on Topic.EnableMessageOrdering")
728
729
730
731
732
733
734
735
736
737
738 func (t *Topic) Publish(ctx context.Context, msg *Message) *PublishResult {
739 ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
740 if err != nil {
741 log.Printf("pubsub: cannot create context with tag in Publish: %v", err)
742 }
743
744 r := ipubsub.NewPublishResult()
745 if !t.EnableMessageOrdering && msg.OrderingKey != "" {
746 ipubsub.SetPublishResult(r, "", errTopicOrderingNotEnabled)
747 return r
748 }
749
750
751
752 msgSize := proto.Size(&pb.PubsubMessage{
753 Data: msg.Data,
754 Attributes: msg.Attributes,
755 OrderingKey: msg.OrderingKey,
756 })
757
758 t.initBundler()
759 t.mu.RLock()
760 defer t.mu.RUnlock()
761 if t.stopped {
762 ipubsub.SetPublishResult(r, "", ErrTopicStopped)
763 return r
764 }
765
766 if err := t.flowController.acquire(ctx, msgSize); err != nil {
767 t.scheduler.Pause(msg.OrderingKey)
768 ipubsub.SetPublishResult(r, "", err)
769 return r
770 }
771 err = t.scheduler.Add(msg.OrderingKey, &bundledMessage{msg, r, msgSize}, msgSize)
772 if err != nil {
773 t.scheduler.Pause(msg.OrderingKey)
774 ipubsub.SetPublishResult(r, "", err)
775 }
776 return r
777 }
778
779
780
781
782 func (t *Topic) Stop() {
783 t.mu.Lock()
784 noop := t.stopped || t.scheduler == nil
785 t.stopped = true
786 t.mu.Unlock()
787 if noop {
788 return
789 }
790 t.scheduler.FlushAndStop()
791 }
792
793
794 func (t *Topic) Flush() {
795 if t.stopped || t.scheduler == nil {
796 return
797 }
798 t.scheduler.Flush()
799 }
800
// bundledMessage is the unit handed to the publish scheduler: one message
// together with the result to resolve and its accounted size.
type bundledMessage struct {
	// msg is the message to publish. publishMessageBundle sets it to nil once
	// the message has been copied into the request.
	msg *Message
	// res is resolved with the server message ID or an error.
	res *PublishResult
	// size is the encoded message size acquired from the flow controller in
	// Publish and released after the bundle has been processed.
	size int
}
806
807 func (t *Topic) initBundler() {
808 t.mu.RLock()
809 noop := t.stopped || t.scheduler != nil
810 t.mu.RUnlock()
811 if noop {
812 return
813 }
814 t.mu.Lock()
815 defer t.mu.Unlock()
816
817 if t.stopped || t.scheduler != nil {
818 return
819 }
820
821 timeout := t.PublishSettings.Timeout
822
823 workers := t.PublishSettings.NumGoroutines
824
825
826
827 if t.PublishSettings.NumGoroutines == 0 {
828 workers = 25 * runtime.GOMAXPROCS(0)
829 }
830
831 t.scheduler = scheduler.NewPublishScheduler(workers, func(bundle interface{}) {
832
833 ctx := context.TODO()
834 if timeout != 0 {
835 var cancel func()
836 ctx, cancel = context.WithTimeout(ctx, timeout)
837 defer cancel()
838 }
839 t.publishMessageBundle(ctx, bundle.([]*bundledMessage))
840 })
841 t.scheduler.DelayThreshold = t.PublishSettings.DelayThreshold
842 t.scheduler.BundleCountThreshold = t.PublishSettings.CountThreshold
843 if t.scheduler.BundleCountThreshold > MaxPublishRequestCount {
844 t.scheduler.BundleCountThreshold = MaxPublishRequestCount
845 }
846 t.scheduler.BundleByteThreshold = t.PublishSettings.ByteThreshold
847
848 fcs := DefaultPublishSettings.FlowControlSettings
849 fcs.LimitExceededBehavior = t.PublishSettings.FlowControlSettings.LimitExceededBehavior
850 if t.PublishSettings.FlowControlSettings.MaxOutstandingBytes > 0 {
851 b := t.PublishSettings.FlowControlSettings.MaxOutstandingBytes
852 fcs.MaxOutstandingBytes = b
853
854
855
856
857
858 t.PublishSettings.BufferedByteLimit = maxInt
859 }
860 if t.PublishSettings.FlowControlSettings.MaxOutstandingMessages > 0 {
861 fcs.MaxOutstandingMessages = t.PublishSettings.FlowControlSettings.MaxOutstandingMessages
862 }
863
864 t.flowController = newTopicFlowController(fcs)
865
866 bufferedByteLimit := DefaultPublishSettings.BufferedByteLimit
867 if t.PublishSettings.BufferedByteLimit > 0 {
868 bufferedByteLimit = t.PublishSettings.BufferedByteLimit
869 }
870 t.scheduler.BufferedByteLimit = bufferedByteLimit
871
872
873
874 t.scheduler.BundleByteLimit = MaxPublishRequestBytes - calcFieldSizeString(t.name) - 5
875 }
876
877
// ErrPublishingPaused is the error reported for messages whose ordering key
// is currently paused because an earlier publish for that key failed.
type ErrPublishingPaused struct {
	// OrderingKey is the paused ordering key.
	OrderingKey string
}

// Error implements the error interface. The message names the paused ordering
// key and points the caller at Topic.ResumePublish.
func (e ErrPublishingPaused) Error() string {
	const format = "pubsub: Publishing for ordering key, %s, paused due to previous error. Call topic.ResumePublish(orderingKey) before resuming publishing"
	return fmt.Sprintf(format, e.OrderingKey)
}
886
887 func (t *Topic) publishMessageBundle(ctx context.Context, bms []*bundledMessage) {
888 ctx, err := tag.New(ctx, tag.Insert(keyStatus, "OK"), tag.Upsert(keyTopic, t.name))
889 if err != nil {
890 log.Printf("pubsub: cannot create context with tag in publishMessageBundle: %v", err)
891 }
892 pbMsgs := make([]*pb.PubsubMessage, len(bms))
893 var orderingKey string
894 batchSize := 0
895 for i, bm := range bms {
896 orderingKey = bm.msg.OrderingKey
897 pbMsgs[i] = &pb.PubsubMessage{
898 Data: bm.msg.Data,
899 Attributes: bm.msg.Attributes,
900 OrderingKey: bm.msg.OrderingKey,
901 }
902 batchSize = batchSize + proto.Size(pbMsgs[i])
903 bm.msg = nil
904 }
905 var res *pb.PublishResponse
906 start := time.Now()
907 if orderingKey != "" && t.scheduler.IsPaused(orderingKey) {
908 err = ErrPublishingPaused{OrderingKey: orderingKey}
909 } else {
910
911
912 opts := t.c.pubc.CallOptions.Publish
913 var settings gax.CallSettings
914 for _, opt := range opts {
915 opt.Resolve(&settings)
916 }
917 r := &publishRetryer{defaultRetryer: settings.Retry()}
918 gaxOpts := []gax.CallOption{
919 gax.WithGRPCOptions(grpc.MaxCallSendMsgSize(maxSendRecvBytes)),
920 gax.WithRetry(func() gax.Retryer { return r }),
921 }
922 if t.PublishSettings.shouldCompress(batchSize) {
923 gaxOpts = append(gaxOpts, gax.WithGRPCOptions(grpc.UseCompressor(gzip.Name)))
924 }
925 res, err = t.c.pubc.Publish(ctx, &pb.PublishRequest{
926 Topic: t.name,
927 Messages: pbMsgs,
928 }, gaxOpts...)
929 }
930 end := time.Now()
931 if err != nil {
932 t.scheduler.Pause(orderingKey)
933
934
935 ctx, _ = tag.New(ctx, tag.Upsert(keyStatus, "ERROR"),
936 tag.Upsert(keyError, err.Error()))
937 }
938 stats.Record(ctx,
939 PublishLatency.M(float64(end.Sub(start)/time.Millisecond)),
940 PublishedMessages.M(int64(len(bms))))
941 for i, bm := range bms {
942 t.flowController.release(ctx, bm.size)
943 if err != nil {
944 ipubsub.SetPublishResult(bm.res, "", err)
945 } else {
946 ipubsub.SetPublishResult(bm.res, res.MessageIds[i], nil)
947 }
948 }
949 }
950
951
952
953
954
955 func (t *Topic) ResumePublish(orderingKey string) {
956 t.mu.RLock()
957 noop := t.scheduler == nil
958 t.mu.RUnlock()
959 if noop {
960 return
961 }
962
963 t.scheduler.Resume(orderingKey)
964 }
965
View as plain text