1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 package pubsub
18
19 import (
20 "bytes"
21 "context"
22 "fmt"
23 "io"
24 "math"
25 "net/http"
26 "net/url"
27 "time"
28
29 iampb "cloud.google.com/go/iam/apiv1/iampb"
30 pubsubpb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
31 gax "github.com/googleapis/gax-go/v2"
32 "google.golang.org/api/googleapi"
33 "google.golang.org/api/iterator"
34 "google.golang.org/api/option"
35 "google.golang.org/api/option/internaloption"
36 gtransport "google.golang.org/api/transport/grpc"
37 httptransport "google.golang.org/api/transport/http"
38 "google.golang.org/grpc"
39 "google.golang.org/grpc/codes"
40 "google.golang.org/protobuf/encoding/protojson"
41 "google.golang.org/protobuf/proto"
42 )
43
44 var newPublisherClientHook clientHook
45
46
47 type PublisherCallOptions struct {
48 CreateTopic []gax.CallOption
49 UpdateTopic []gax.CallOption
50 Publish []gax.CallOption
51 GetTopic []gax.CallOption
52 ListTopics []gax.CallOption
53 ListTopicSubscriptions []gax.CallOption
54 ListTopicSnapshots []gax.CallOption
55 DeleteTopic []gax.CallOption
56 DetachSubscription []gax.CallOption
57 GetIamPolicy []gax.CallOption
58 SetIamPolicy []gax.CallOption
59 TestIamPermissions []gax.CallOption
60 }
61
62 func defaultPublisherGRPCClientOptions() []option.ClientOption {
63 return []option.ClientOption{
64 internaloption.WithDefaultEndpoint("pubsub.googleapis.com:443"),
65 internaloption.WithDefaultEndpointTemplate("pubsub.UNIVERSE_DOMAIN:443"),
66 internaloption.WithDefaultMTLSEndpoint("pubsub.mtls.googleapis.com:443"),
67 internaloption.WithDefaultUniverseDomain("googleapis.com"),
68 internaloption.WithDefaultAudience("https://pubsub.googleapis.com/"),
69 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
70 internaloption.EnableJwtWithScope(),
71 option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
72 grpc.MaxCallRecvMsgSize(math.MaxInt32))),
73 }
74 }
75
76 func defaultPublisherCallOptions() *PublisherCallOptions {
77 return &PublisherCallOptions{
78 CreateTopic: []gax.CallOption{
79 gax.WithTimeout(60000 * time.Millisecond),
80 gax.WithRetry(func() gax.Retryer {
81 return gax.OnCodes([]codes.Code{
82 codes.Unavailable,
83 }, gax.Backoff{
84 Initial: 100 * time.Millisecond,
85 Max: 60000 * time.Millisecond,
86 Multiplier: 1.30,
87 })
88 }),
89 },
90 UpdateTopic: []gax.CallOption{
91 gax.WithTimeout(60000 * time.Millisecond),
92 gax.WithRetry(func() gax.Retryer {
93 return gax.OnCodes([]codes.Code{
94 codes.Unavailable,
95 }, gax.Backoff{
96 Initial: 100 * time.Millisecond,
97 Max: 60000 * time.Millisecond,
98 Multiplier: 1.30,
99 })
100 }),
101 },
102 Publish: []gax.CallOption{
103 gax.WithTimeout(60000 * time.Millisecond),
104 gax.WithRetry(func() gax.Retryer {
105 return gax.OnCodes([]codes.Code{
106 codes.Aborted,
107 codes.Canceled,
108 codes.Internal,
109 codes.ResourceExhausted,
110 codes.Unknown,
111 codes.Unavailable,
112 codes.DeadlineExceeded,
113 }, gax.Backoff{
114 Initial: 100 * time.Millisecond,
115 Max: 60000 * time.Millisecond,
116 Multiplier: 4.00,
117 })
118 }),
119 },
120 GetTopic: []gax.CallOption{
121 gax.WithTimeout(60000 * time.Millisecond),
122 gax.WithRetry(func() gax.Retryer {
123 return gax.OnCodes([]codes.Code{
124 codes.Unknown,
125 codes.Aborted,
126 codes.Unavailable,
127 }, gax.Backoff{
128 Initial: 100 * time.Millisecond,
129 Max: 60000 * time.Millisecond,
130 Multiplier: 1.30,
131 })
132 }),
133 },
134 ListTopics: []gax.CallOption{
135 gax.WithTimeout(60000 * time.Millisecond),
136 gax.WithRetry(func() gax.Retryer {
137 return gax.OnCodes([]codes.Code{
138 codes.Unknown,
139 codes.Aborted,
140 codes.Unavailable,
141 }, gax.Backoff{
142 Initial: 100 * time.Millisecond,
143 Max: 60000 * time.Millisecond,
144 Multiplier: 1.30,
145 })
146 }),
147 },
148 ListTopicSubscriptions: []gax.CallOption{
149 gax.WithTimeout(60000 * time.Millisecond),
150 gax.WithRetry(func() gax.Retryer {
151 return gax.OnCodes([]codes.Code{
152 codes.Unknown,
153 codes.Aborted,
154 codes.Unavailable,
155 }, gax.Backoff{
156 Initial: 100 * time.Millisecond,
157 Max: 60000 * time.Millisecond,
158 Multiplier: 1.30,
159 })
160 }),
161 },
162 ListTopicSnapshots: []gax.CallOption{
163 gax.WithTimeout(60000 * time.Millisecond),
164 gax.WithRetry(func() gax.Retryer {
165 return gax.OnCodes([]codes.Code{
166 codes.Unknown,
167 codes.Aborted,
168 codes.Unavailable,
169 }, gax.Backoff{
170 Initial: 100 * time.Millisecond,
171 Max: 60000 * time.Millisecond,
172 Multiplier: 1.30,
173 })
174 }),
175 },
176 DeleteTopic: []gax.CallOption{
177 gax.WithTimeout(60000 * time.Millisecond),
178 gax.WithRetry(func() gax.Retryer {
179 return gax.OnCodes([]codes.Code{
180 codes.Unavailable,
181 }, gax.Backoff{
182 Initial: 100 * time.Millisecond,
183 Max: 60000 * time.Millisecond,
184 Multiplier: 1.30,
185 })
186 }),
187 },
188 DetachSubscription: []gax.CallOption{
189 gax.WithTimeout(60000 * time.Millisecond),
190 gax.WithRetry(func() gax.Retryer {
191 return gax.OnCodes([]codes.Code{
192 codes.Unavailable,
193 }, gax.Backoff{
194 Initial: 100 * time.Millisecond,
195 Max: 60000 * time.Millisecond,
196 Multiplier: 1.30,
197 })
198 }),
199 },
200 GetIamPolicy: []gax.CallOption{},
201 SetIamPolicy: []gax.CallOption{},
202 TestIamPermissions: []gax.CallOption{},
203 }
204 }
205
206 func defaultPublisherRESTCallOptions() *PublisherCallOptions {
207 return &PublisherCallOptions{
208 CreateTopic: []gax.CallOption{
209 gax.WithTimeout(60000 * time.Millisecond),
210 gax.WithRetry(func() gax.Retryer {
211 return gax.OnHTTPCodes(gax.Backoff{
212 Initial: 100 * time.Millisecond,
213 Max: 60000 * time.Millisecond,
214 Multiplier: 1.30,
215 },
216 http.StatusServiceUnavailable)
217 }),
218 },
219 UpdateTopic: []gax.CallOption{
220 gax.WithTimeout(60000 * time.Millisecond),
221 gax.WithRetry(func() gax.Retryer {
222 return gax.OnHTTPCodes(gax.Backoff{
223 Initial: 100 * time.Millisecond,
224 Max: 60000 * time.Millisecond,
225 Multiplier: 1.30,
226 },
227 http.StatusServiceUnavailable)
228 }),
229 },
230 Publish: []gax.CallOption{
231 gax.WithTimeout(60000 * time.Millisecond),
232 gax.WithRetry(func() gax.Retryer {
233 return gax.OnHTTPCodes(gax.Backoff{
234 Initial: 100 * time.Millisecond,
235 Max: 60000 * time.Millisecond,
236 Multiplier: 4.00,
237 },
238 http.StatusConflict,
239 499,
240 http.StatusInternalServerError,
241 http.StatusTooManyRequests,
242 http.StatusInternalServerError,
243 http.StatusServiceUnavailable,
244 http.StatusGatewayTimeout)
245 }),
246 },
247 GetTopic: []gax.CallOption{
248 gax.WithTimeout(60000 * time.Millisecond),
249 gax.WithRetry(func() gax.Retryer {
250 return gax.OnHTTPCodes(gax.Backoff{
251 Initial: 100 * time.Millisecond,
252 Max: 60000 * time.Millisecond,
253 Multiplier: 1.30,
254 },
255 http.StatusInternalServerError,
256 http.StatusConflict,
257 http.StatusServiceUnavailable)
258 }),
259 },
260 ListTopics: []gax.CallOption{
261 gax.WithTimeout(60000 * time.Millisecond),
262 gax.WithRetry(func() gax.Retryer {
263 return gax.OnHTTPCodes(gax.Backoff{
264 Initial: 100 * time.Millisecond,
265 Max: 60000 * time.Millisecond,
266 Multiplier: 1.30,
267 },
268 http.StatusInternalServerError,
269 http.StatusConflict,
270 http.StatusServiceUnavailable)
271 }),
272 },
273 ListTopicSubscriptions: []gax.CallOption{
274 gax.WithTimeout(60000 * time.Millisecond),
275 gax.WithRetry(func() gax.Retryer {
276 return gax.OnHTTPCodes(gax.Backoff{
277 Initial: 100 * time.Millisecond,
278 Max: 60000 * time.Millisecond,
279 Multiplier: 1.30,
280 },
281 http.StatusInternalServerError,
282 http.StatusConflict,
283 http.StatusServiceUnavailable)
284 }),
285 },
286 ListTopicSnapshots: []gax.CallOption{
287 gax.WithTimeout(60000 * time.Millisecond),
288 gax.WithRetry(func() gax.Retryer {
289 return gax.OnHTTPCodes(gax.Backoff{
290 Initial: 100 * time.Millisecond,
291 Max: 60000 * time.Millisecond,
292 Multiplier: 1.30,
293 },
294 http.StatusInternalServerError,
295 http.StatusConflict,
296 http.StatusServiceUnavailable)
297 }),
298 },
299 DeleteTopic: []gax.CallOption{
300 gax.WithTimeout(60000 * time.Millisecond),
301 gax.WithRetry(func() gax.Retryer {
302 return gax.OnHTTPCodes(gax.Backoff{
303 Initial: 100 * time.Millisecond,
304 Max: 60000 * time.Millisecond,
305 Multiplier: 1.30,
306 },
307 http.StatusServiceUnavailable)
308 }),
309 },
310 DetachSubscription: []gax.CallOption{
311 gax.WithTimeout(60000 * time.Millisecond),
312 gax.WithRetry(func() gax.Retryer {
313 return gax.OnHTTPCodes(gax.Backoff{
314 Initial: 100 * time.Millisecond,
315 Max: 60000 * time.Millisecond,
316 Multiplier: 1.30,
317 },
318 http.StatusServiceUnavailable)
319 }),
320 },
321 GetIamPolicy: []gax.CallOption{},
322 SetIamPolicy: []gax.CallOption{},
323 TestIamPermissions: []gax.CallOption{},
324 }
325 }
326
327
328 type internalPublisherClient interface {
329 Close() error
330 setGoogleClientInfo(...string)
331 Connection() *grpc.ClientConn
332 CreateTopic(context.Context, *pubsubpb.Topic, ...gax.CallOption) (*pubsubpb.Topic, error)
333 UpdateTopic(context.Context, *pubsubpb.UpdateTopicRequest, ...gax.CallOption) (*pubsubpb.Topic, error)
334 Publish(context.Context, *pubsubpb.PublishRequest, ...gax.CallOption) (*pubsubpb.PublishResponse, error)
335 GetTopic(context.Context, *pubsubpb.GetTopicRequest, ...gax.CallOption) (*pubsubpb.Topic, error)
336 ListTopics(context.Context, *pubsubpb.ListTopicsRequest, ...gax.CallOption) *TopicIterator
337 ListTopicSubscriptions(context.Context, *pubsubpb.ListTopicSubscriptionsRequest, ...gax.CallOption) *StringIterator
338 ListTopicSnapshots(context.Context, *pubsubpb.ListTopicSnapshotsRequest, ...gax.CallOption) *StringIterator
339 DeleteTopic(context.Context, *pubsubpb.DeleteTopicRequest, ...gax.CallOption) error
340 DetachSubscription(context.Context, *pubsubpb.DetachSubscriptionRequest, ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error)
341 GetIamPolicy(context.Context, *iampb.GetIamPolicyRequest, ...gax.CallOption) (*iampb.Policy, error)
342 SetIamPolicy(context.Context, *iampb.SetIamPolicyRequest, ...gax.CallOption) (*iampb.Policy, error)
343 TestIamPermissions(context.Context, *iampb.TestIamPermissionsRequest, ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error)
344 }
345
346
347
348
349
350
351 type PublisherClient struct {
352
353 internalClient internalPublisherClient
354
355
356 CallOptions *PublisherCallOptions
357 }
358
359
360
361
362
363 func (c *PublisherClient) Close() error {
364 return c.internalClient.Close()
365 }
366
367
368
369
370 func (c *PublisherClient) setGoogleClientInfo(keyval ...string) {
371 c.internalClient.setGoogleClientInfo(keyval...)
372 }
373
374
375
376
377
378 func (c *PublisherClient) Connection() *grpc.ClientConn {
379 return c.internalClient.Connection()
380 }
381
382
383
384 func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
385 return c.internalClient.CreateTopic(ctx, req, opts...)
386 }
387
388
389
390 func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
391 return c.internalClient.UpdateTopic(ctx, req, opts...)
392 }
393
394
395
396 func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
397 return c.internalClient.Publish(ctx, req, opts...)
398 }
399
400
401 func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
402 return c.internalClient.GetTopic(ctx, req, opts...)
403 }
404
405
406 func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
407 return c.internalClient.ListTopics(ctx, req, opts...)
408 }
409
410
411 func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
412 return c.internalClient.ListTopicSubscriptions(ctx, req, opts...)
413 }
414
415
416
417
418
419
420 func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
421 return c.internalClient.ListTopicSnapshots(ctx, req, opts...)
422 }
423
424
425
426
427
428
429 func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
430 return c.internalClient.DeleteTopic(ctx, req, opts...)
431 }
432
433
434
435
436
437 func (c *PublisherClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
438 return c.internalClient.DetachSubscription(ctx, req, opts...)
439 }
440
441
442
443 func (c *PublisherClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
444 return c.internalClient.GetIamPolicy(ctx, req, opts...)
445 }
446
447
448
449
450
451
452 func (c *PublisherClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
453 return c.internalClient.SetIamPolicy(ctx, req, opts...)
454 }
455
456
457
458
459
460
461
462
463 func (c *PublisherClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error) {
464 return c.internalClient.TestIamPermissions(ctx, req, opts...)
465 }
466
467
468
469
470 type publisherGRPCClient struct {
471
472 connPool gtransport.ConnPool
473
474
475 CallOptions **PublisherCallOptions
476
477
478 publisherClient pubsubpb.PublisherClient
479
480 iamPolicyClient iampb.IAMPolicyClient
481
482
483 xGoogHeaders []string
484 }
485
486
487
488
489
490
491 func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
492 clientOpts := defaultPublisherGRPCClientOptions()
493 if newPublisherClientHook != nil {
494 hookOpts, err := newPublisherClientHook(ctx, clientHookParams{})
495 if err != nil {
496 return nil, err
497 }
498 clientOpts = append(clientOpts, hookOpts...)
499 }
500
501 connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
502 if err != nil {
503 return nil, err
504 }
505 client := PublisherClient{CallOptions: defaultPublisherCallOptions()}
506
507 c := &publisherGRPCClient{
508 connPool: connPool,
509 publisherClient: pubsubpb.NewPublisherClient(connPool),
510 CallOptions: &client.CallOptions,
511 iamPolicyClient: iampb.NewIAMPolicyClient(connPool),
512 }
513 c.setGoogleClientInfo()
514
515 client.internalClient = c
516
517 return &client, nil
518 }
519
520
521
522
523
524 func (c *publisherGRPCClient) Connection() *grpc.ClientConn {
525 return c.connPool.Conn()
526 }
527
528
529
530
531 func (c *publisherGRPCClient) setGoogleClientInfo(keyval ...string) {
532 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
533 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
534 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
535 }
536
537
538
539 func (c *publisherGRPCClient) Close() error {
540 return c.connPool.Close()
541 }
542
543
544 type publisherRESTClient struct {
545
546 endpoint string
547
548
549 httpClient *http.Client
550
551
552 xGoogHeaders []string
553
554
555 CallOptions **PublisherCallOptions
556 }
557
558
559
560
561
562 func NewPublisherRESTClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
563 clientOpts := append(defaultPublisherRESTClientOptions(), opts...)
564 httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
565 if err != nil {
566 return nil, err
567 }
568
569 callOpts := defaultPublisherRESTCallOptions()
570 c := &publisherRESTClient{
571 endpoint: endpoint,
572 httpClient: httpClient,
573 CallOptions: &callOpts,
574 }
575 c.setGoogleClientInfo()
576
577 return &PublisherClient{internalClient: c, CallOptions: callOpts}, nil
578 }
579
580 func defaultPublisherRESTClientOptions() []option.ClientOption {
581 return []option.ClientOption{
582 internaloption.WithDefaultEndpoint("https://pubsub.googleapis.com"),
583 internaloption.WithDefaultEndpointTemplate("https://pubsub.UNIVERSE_DOMAIN"),
584 internaloption.WithDefaultMTLSEndpoint("https://pubsub.mtls.googleapis.com"),
585 internaloption.WithDefaultUniverseDomain("googleapis.com"),
586 internaloption.WithDefaultAudience("https://pubsub.googleapis.com/"),
587 internaloption.WithDefaultScopes(DefaultAuthScopes()...),
588 }
589 }
590
591
592
593
594 func (c *publisherRESTClient) setGoogleClientInfo(keyval ...string) {
595 kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
596 kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
597 c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
598 }
599
600
601
602 func (c *publisherRESTClient) Close() error {
603
604 c.httpClient = nil
605 return nil
606 }
607
608
609
610
611 func (c *publisherRESTClient) Connection() *grpc.ClientConn {
612 return nil
613 }
614 func (c *publisherGRPCClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
615 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
616
617 hds = append(c.xGoogHeaders, hds...)
618 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
619 opts = append((*c.CallOptions).CreateTopic[0:len((*c.CallOptions).CreateTopic):len((*c.CallOptions).CreateTopic)], opts...)
620 var resp *pubsubpb.Topic
621 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
622 var err error
623 resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...)
624 return err
625 }, opts...)
626 if err != nil {
627 return nil, err
628 }
629 return resp, nil
630 }
631
632 func (c *publisherGRPCClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
633 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))}
634
635 hds = append(c.xGoogHeaders, hds...)
636 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
637 opts = append((*c.CallOptions).UpdateTopic[0:len((*c.CallOptions).UpdateTopic):len((*c.CallOptions).UpdateTopic)], opts...)
638 var resp *pubsubpb.Topic
639 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
640 var err error
641 resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...)
642 return err
643 }, opts...)
644 if err != nil {
645 return nil, err
646 }
647 return resp, nil
648 }
649
650 func (c *publisherGRPCClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
651 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
652
653 hds = append(c.xGoogHeaders, hds...)
654 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
655 opts = append((*c.CallOptions).Publish[0:len((*c.CallOptions).Publish):len((*c.CallOptions).Publish)], opts...)
656 var resp *pubsubpb.PublishResponse
657 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
658 var err error
659 resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...)
660 return err
661 }, opts...)
662 if err != nil {
663 return nil, err
664 }
665 return resp, nil
666 }
667
668 func (c *publisherGRPCClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
669 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
670
671 hds = append(c.xGoogHeaders, hds...)
672 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
673 opts = append((*c.CallOptions).GetTopic[0:len((*c.CallOptions).GetTopic):len((*c.CallOptions).GetTopic)], opts...)
674 var resp *pubsubpb.Topic
675 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
676 var err error
677 resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...)
678 return err
679 }, opts...)
680 if err != nil {
681 return nil, err
682 }
683 return resp, nil
684 }
685
686 func (c *publisherGRPCClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
687 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject()))}
688
689 hds = append(c.xGoogHeaders, hds...)
690 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
691 opts = append((*c.CallOptions).ListTopics[0:len((*c.CallOptions).ListTopics):len((*c.CallOptions).ListTopics)], opts...)
692 it := &TopicIterator{}
693 req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
694 it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
695 resp := &pubsubpb.ListTopicsResponse{}
696 if pageToken != "" {
697 req.PageToken = pageToken
698 }
699 if pageSize > math.MaxInt32 {
700 req.PageSize = math.MaxInt32
701 } else if pageSize != 0 {
702 req.PageSize = int32(pageSize)
703 }
704 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
705 var err error
706 resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...)
707 return err
708 }, opts...)
709 if err != nil {
710 return nil, "", err
711 }
712
713 it.Response = resp
714 return resp.GetTopics(), resp.GetNextPageToken(), nil
715 }
716 fetch := func(pageSize int, pageToken string) (string, error) {
717 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
718 if err != nil {
719 return "", err
720 }
721 it.items = append(it.items, items...)
722 return nextPageToken, nil
723 }
724
725 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
726 it.pageInfo.MaxSize = int(req.GetPageSize())
727 it.pageInfo.Token = req.GetPageToken()
728
729 return it
730 }
731
732 func (c *publisherGRPCClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
733 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
734
735 hds = append(c.xGoogHeaders, hds...)
736 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
737 opts = append((*c.CallOptions).ListTopicSubscriptions[0:len((*c.CallOptions).ListTopicSubscriptions):len((*c.CallOptions).ListTopicSubscriptions)], opts...)
738 it := &StringIterator{}
739 req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
740 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
741 resp := &pubsubpb.ListTopicSubscriptionsResponse{}
742 if pageToken != "" {
743 req.PageToken = pageToken
744 }
745 if pageSize > math.MaxInt32 {
746 req.PageSize = math.MaxInt32
747 } else if pageSize != 0 {
748 req.PageSize = int32(pageSize)
749 }
750 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
751 var err error
752 resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...)
753 return err
754 }, opts...)
755 if err != nil {
756 return nil, "", err
757 }
758
759 it.Response = resp
760 return resp.GetSubscriptions(), resp.GetNextPageToken(), nil
761 }
762 fetch := func(pageSize int, pageToken string) (string, error) {
763 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
764 if err != nil {
765 return "", err
766 }
767 it.items = append(it.items, items...)
768 return nextPageToken, nil
769 }
770
771 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
772 it.pageInfo.MaxSize = int(req.GetPageSize())
773 it.pageInfo.Token = req.GetPageToken()
774
775 return it
776 }
777
778 func (c *publisherGRPCClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
779 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
780
781 hds = append(c.xGoogHeaders, hds...)
782 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
783 opts = append((*c.CallOptions).ListTopicSnapshots[0:len((*c.CallOptions).ListTopicSnapshots):len((*c.CallOptions).ListTopicSnapshots)], opts...)
784 it := &StringIterator{}
785 req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest)
786 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
787 resp := &pubsubpb.ListTopicSnapshotsResponse{}
788 if pageToken != "" {
789 req.PageToken = pageToken
790 }
791 if pageSize > math.MaxInt32 {
792 req.PageSize = math.MaxInt32
793 } else if pageSize != 0 {
794 req.PageSize = int32(pageSize)
795 }
796 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
797 var err error
798 resp, err = c.publisherClient.ListTopicSnapshots(ctx, req, settings.GRPC...)
799 return err
800 }, opts...)
801 if err != nil {
802 return nil, "", err
803 }
804
805 it.Response = resp
806 return resp.GetSnapshots(), resp.GetNextPageToken(), nil
807 }
808 fetch := func(pageSize int, pageToken string) (string, error) {
809 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
810 if err != nil {
811 return "", err
812 }
813 it.items = append(it.items, items...)
814 return nextPageToken, nil
815 }
816
817 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
818 it.pageInfo.MaxSize = int(req.GetPageSize())
819 it.pageInfo.Token = req.GetPageToken()
820
821 return it
822 }
823
824 func (c *publisherGRPCClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
825 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
826
827 hds = append(c.xGoogHeaders, hds...)
828 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
829 opts = append((*c.CallOptions).DeleteTopic[0:len((*c.CallOptions).DeleteTopic):len((*c.CallOptions).DeleteTopic)], opts...)
830 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
831 var err error
832 _, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...)
833 return err
834 }, opts...)
835 return err
836 }
837
838 func (c *publisherGRPCClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
839 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription()))}
840
841 hds = append(c.xGoogHeaders, hds...)
842 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
843 opts = append((*c.CallOptions).DetachSubscription[0:len((*c.CallOptions).DetachSubscription):len((*c.CallOptions).DetachSubscription)], opts...)
844 var resp *pubsubpb.DetachSubscriptionResponse
845 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
846 var err error
847 resp, err = c.publisherClient.DetachSubscription(ctx, req, settings.GRPC...)
848 return err
849 }, opts...)
850 if err != nil {
851 return nil, err
852 }
853 return resp, nil
854 }
855
856 func (c *publisherGRPCClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
857 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
858
859 hds = append(c.xGoogHeaders, hds...)
860 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
861 opts = append((*c.CallOptions).GetIamPolicy[0:len((*c.CallOptions).GetIamPolicy):len((*c.CallOptions).GetIamPolicy)], opts...)
862 var resp *iampb.Policy
863 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
864 var err error
865 resp, err = c.iamPolicyClient.GetIamPolicy(ctx, req, settings.GRPC...)
866 return err
867 }, opts...)
868 if err != nil {
869 return nil, err
870 }
871 return resp, nil
872 }
873
874 func (c *publisherGRPCClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
875 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
876
877 hds = append(c.xGoogHeaders, hds...)
878 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
879 opts = append((*c.CallOptions).SetIamPolicy[0:len((*c.CallOptions).SetIamPolicy):len((*c.CallOptions).SetIamPolicy)], opts...)
880 var resp *iampb.Policy
881 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
882 var err error
883 resp, err = c.iamPolicyClient.SetIamPolicy(ctx, req, settings.GRPC...)
884 return err
885 }, opts...)
886 if err != nil {
887 return nil, err
888 }
889 return resp, nil
890 }
891
892 func (c *publisherGRPCClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error) {
893 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
894
895 hds = append(c.xGoogHeaders, hds...)
896 ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
897 opts = append((*c.CallOptions).TestIamPermissions[0:len((*c.CallOptions).TestIamPermissions):len((*c.CallOptions).TestIamPermissions)], opts...)
898 var resp *iampb.TestIamPermissionsResponse
899 err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
900 var err error
901 resp, err = c.iamPolicyClient.TestIamPermissions(ctx, req, settings.GRPC...)
902 return err
903 }, opts...)
904 if err != nil {
905 return nil, err
906 }
907 return resp, nil
908 }
909
910
911
912 func (c *publisherRESTClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
913 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
914 jsonReq, err := m.Marshal(req)
915 if err != nil {
916 return nil, err
917 }
918
919 baseUrl, err := url.Parse(c.endpoint)
920 if err != nil {
921 return nil, err
922 }
923 baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetName())
924
925 params := url.Values{}
926 params.Add("$alt", "json;enum-encoding=int")
927
928 baseUrl.RawQuery = params.Encode()
929
930
931 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
932
933 hds = append(c.xGoogHeaders, hds...)
934 hds = append(hds, "Content-Type", "application/json")
935 headers := gax.BuildHeaders(ctx, hds...)
936 opts = append((*c.CallOptions).CreateTopic[0:len((*c.CallOptions).CreateTopic):len((*c.CallOptions).CreateTopic)], opts...)
937 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
938 resp := &pubsubpb.Topic{}
939 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
940 if settings.Path != "" {
941 baseUrl.Path = settings.Path
942 }
943 httpReq, err := http.NewRequest("PUT", baseUrl.String(), bytes.NewReader(jsonReq))
944 if err != nil {
945 return err
946 }
947 httpReq = httpReq.WithContext(ctx)
948 httpReq.Header = headers
949
950 httpRsp, err := c.httpClient.Do(httpReq)
951 if err != nil {
952 return err
953 }
954 defer httpRsp.Body.Close()
955
956 if err = googleapi.CheckResponse(httpRsp); err != nil {
957 return err
958 }
959
960 buf, err := io.ReadAll(httpRsp.Body)
961 if err != nil {
962 return err
963 }
964
965 if err := unm.Unmarshal(buf, resp); err != nil {
966 return err
967 }
968
969 return nil
970 }, opts...)
971 if e != nil {
972 return nil, e
973 }
974 return resp, nil
975 }
976
977
978
979 func (c *publisherRESTClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
980 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
981 jsonReq, err := m.Marshal(req)
982 if err != nil {
983 return nil, err
984 }
985
986 baseUrl, err := url.Parse(c.endpoint)
987 if err != nil {
988 return nil, err
989 }
990 baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetTopic().GetName())
991
992 params := url.Values{}
993 params.Add("$alt", "json;enum-encoding=int")
994
995 baseUrl.RawQuery = params.Encode()
996
997
998 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))}
999
1000 hds = append(c.xGoogHeaders, hds...)
1001 hds = append(hds, "Content-Type", "application/json")
1002 headers := gax.BuildHeaders(ctx, hds...)
1003 opts = append((*c.CallOptions).UpdateTopic[0:len((*c.CallOptions).UpdateTopic):len((*c.CallOptions).UpdateTopic)], opts...)
1004 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1005 resp := &pubsubpb.Topic{}
1006 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1007 if settings.Path != "" {
1008 baseUrl.Path = settings.Path
1009 }
1010 httpReq, err := http.NewRequest("PATCH", baseUrl.String(), bytes.NewReader(jsonReq))
1011 if err != nil {
1012 return err
1013 }
1014 httpReq = httpReq.WithContext(ctx)
1015 httpReq.Header = headers
1016
1017 httpRsp, err := c.httpClient.Do(httpReq)
1018 if err != nil {
1019 return err
1020 }
1021 defer httpRsp.Body.Close()
1022
1023 if err = googleapi.CheckResponse(httpRsp); err != nil {
1024 return err
1025 }
1026
1027 buf, err := io.ReadAll(httpRsp.Body)
1028 if err != nil {
1029 return err
1030 }
1031
1032 if err := unm.Unmarshal(buf, resp); err != nil {
1033 return err
1034 }
1035
1036 return nil
1037 }, opts...)
1038 if e != nil {
1039 return nil, e
1040 }
1041 return resp, nil
1042 }
1043
1044
1045
1046 func (c *publisherRESTClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
1047 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
1048 jsonReq, err := m.Marshal(req)
1049 if err != nil {
1050 return nil, err
1051 }
1052
1053 baseUrl, err := url.Parse(c.endpoint)
1054 if err != nil {
1055 return nil, err
1056 }
1057 baseUrl.Path += fmt.Sprintf("/v1/%v:publish", req.GetTopic())
1058
1059 params := url.Values{}
1060 params.Add("$alt", "json;enum-encoding=int")
1061
1062 baseUrl.RawQuery = params.Encode()
1063
1064
1065 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
1066
1067 hds = append(c.xGoogHeaders, hds...)
1068 hds = append(hds, "Content-Type", "application/json")
1069 headers := gax.BuildHeaders(ctx, hds...)
1070 opts = append((*c.CallOptions).Publish[0:len((*c.CallOptions).Publish):len((*c.CallOptions).Publish)], opts...)
1071 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1072 resp := &pubsubpb.PublishResponse{}
1073 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1074 if settings.Path != "" {
1075 baseUrl.Path = settings.Path
1076 }
1077 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
1078 if err != nil {
1079 return err
1080 }
1081 httpReq = httpReq.WithContext(ctx)
1082 httpReq.Header = headers
1083
1084 httpRsp, err := c.httpClient.Do(httpReq)
1085 if err != nil {
1086 return err
1087 }
1088 defer httpRsp.Body.Close()
1089
1090 if err = googleapi.CheckResponse(httpRsp); err != nil {
1091 return err
1092 }
1093
1094 buf, err := io.ReadAll(httpRsp.Body)
1095 if err != nil {
1096 return err
1097 }
1098
1099 if err := unm.Unmarshal(buf, resp); err != nil {
1100 return err
1101 }
1102
1103 return nil
1104 }, opts...)
1105 if e != nil {
1106 return nil, e
1107 }
1108 return resp, nil
1109 }
1110
1111
1112 func (c *publisherRESTClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
1113 baseUrl, err := url.Parse(c.endpoint)
1114 if err != nil {
1115 return nil, err
1116 }
1117 baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetTopic())
1118
1119 params := url.Values{}
1120 params.Add("$alt", "json;enum-encoding=int")
1121
1122 baseUrl.RawQuery = params.Encode()
1123
1124
1125 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
1126
1127 hds = append(c.xGoogHeaders, hds...)
1128 hds = append(hds, "Content-Type", "application/json")
1129 headers := gax.BuildHeaders(ctx, hds...)
1130 opts = append((*c.CallOptions).GetTopic[0:len((*c.CallOptions).GetTopic):len((*c.CallOptions).GetTopic)], opts...)
1131 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1132 resp := &pubsubpb.Topic{}
1133 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1134 if settings.Path != "" {
1135 baseUrl.Path = settings.Path
1136 }
1137 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
1138 if err != nil {
1139 return err
1140 }
1141 httpReq = httpReq.WithContext(ctx)
1142 httpReq.Header = headers
1143
1144 httpRsp, err := c.httpClient.Do(httpReq)
1145 if err != nil {
1146 return err
1147 }
1148 defer httpRsp.Body.Close()
1149
1150 if err = googleapi.CheckResponse(httpRsp); err != nil {
1151 return err
1152 }
1153
1154 buf, err := io.ReadAll(httpRsp.Body)
1155 if err != nil {
1156 return err
1157 }
1158
1159 if err := unm.Unmarshal(buf, resp); err != nil {
1160 return err
1161 }
1162
1163 return nil
1164 }, opts...)
1165 if e != nil {
1166 return nil, e
1167 }
1168 return resp, nil
1169 }
1170
1171
1172 func (c *publisherRESTClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
1173 it := &TopicIterator{}
1174 req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
1175 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1176 it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
1177 resp := &pubsubpb.ListTopicsResponse{}
1178 if pageToken != "" {
1179 req.PageToken = pageToken
1180 }
1181 if pageSize > math.MaxInt32 {
1182 req.PageSize = math.MaxInt32
1183 } else if pageSize != 0 {
1184 req.PageSize = int32(pageSize)
1185 }
1186 baseUrl, err := url.Parse(c.endpoint)
1187 if err != nil {
1188 return nil, "", err
1189 }
1190 baseUrl.Path += fmt.Sprintf("/v1/%v/topics", req.GetProject())
1191
1192 params := url.Values{}
1193 params.Add("$alt", "json;enum-encoding=int")
1194 if req.GetPageSize() != 0 {
1195 params.Add("pageSize", fmt.Sprintf("%v", req.GetPageSize()))
1196 }
1197 if req.GetPageToken() != "" {
1198 params.Add("pageToken", fmt.Sprintf("%v", req.GetPageToken()))
1199 }
1200
1201 baseUrl.RawQuery = params.Encode()
1202
1203
1204 hds := append(c.xGoogHeaders, "Content-Type", "application/json")
1205 headers := gax.BuildHeaders(ctx, hds...)
1206 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1207 if settings.Path != "" {
1208 baseUrl.Path = settings.Path
1209 }
1210 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
1211 if err != nil {
1212 return err
1213 }
1214 httpReq.Header = headers
1215
1216 httpRsp, err := c.httpClient.Do(httpReq)
1217 if err != nil {
1218 return err
1219 }
1220 defer httpRsp.Body.Close()
1221
1222 if err = googleapi.CheckResponse(httpRsp); err != nil {
1223 return err
1224 }
1225
1226 buf, err := io.ReadAll(httpRsp.Body)
1227 if err != nil {
1228 return err
1229 }
1230
1231 if err := unm.Unmarshal(buf, resp); err != nil {
1232 return err
1233 }
1234
1235 return nil
1236 }, opts...)
1237 if e != nil {
1238 return nil, "", e
1239 }
1240 it.Response = resp
1241 return resp.GetTopics(), resp.GetNextPageToken(), nil
1242 }
1243
1244 fetch := func(pageSize int, pageToken string) (string, error) {
1245 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
1246 if err != nil {
1247 return "", err
1248 }
1249 it.items = append(it.items, items...)
1250 return nextPageToken, nil
1251 }
1252
1253 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
1254 it.pageInfo.MaxSize = int(req.GetPageSize())
1255 it.pageInfo.Token = req.GetPageToken()
1256
1257 return it
1258 }
1259
1260
1261 func (c *publisherRESTClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
1262 it := &StringIterator{}
1263 req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
1264 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1265 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
1266 resp := &pubsubpb.ListTopicSubscriptionsResponse{}
1267 if pageToken != "" {
1268 req.PageToken = pageToken
1269 }
1270 if pageSize > math.MaxInt32 {
1271 req.PageSize = math.MaxInt32
1272 } else if pageSize != 0 {
1273 req.PageSize = int32(pageSize)
1274 }
1275 baseUrl, err := url.Parse(c.endpoint)
1276 if err != nil {
1277 return nil, "", err
1278 }
1279 baseUrl.Path += fmt.Sprintf("/v1/%v/subscriptions", req.GetTopic())
1280
1281 params := url.Values{}
1282 params.Add("$alt", "json;enum-encoding=int")
1283 if req.GetPageSize() != 0 {
1284 params.Add("pageSize", fmt.Sprintf("%v", req.GetPageSize()))
1285 }
1286 if req.GetPageToken() != "" {
1287 params.Add("pageToken", fmt.Sprintf("%v", req.GetPageToken()))
1288 }
1289
1290 baseUrl.RawQuery = params.Encode()
1291
1292
1293 hds := append(c.xGoogHeaders, "Content-Type", "application/json")
1294 headers := gax.BuildHeaders(ctx, hds...)
1295 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1296 if settings.Path != "" {
1297 baseUrl.Path = settings.Path
1298 }
1299 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
1300 if err != nil {
1301 return err
1302 }
1303 httpReq.Header = headers
1304
1305 httpRsp, err := c.httpClient.Do(httpReq)
1306 if err != nil {
1307 return err
1308 }
1309 defer httpRsp.Body.Close()
1310
1311 if err = googleapi.CheckResponse(httpRsp); err != nil {
1312 return err
1313 }
1314
1315 buf, err := io.ReadAll(httpRsp.Body)
1316 if err != nil {
1317 return err
1318 }
1319
1320 if err := unm.Unmarshal(buf, resp); err != nil {
1321 return err
1322 }
1323
1324 return nil
1325 }, opts...)
1326 if e != nil {
1327 return nil, "", e
1328 }
1329 it.Response = resp
1330 return resp.GetSubscriptions(), resp.GetNextPageToken(), nil
1331 }
1332
1333 fetch := func(pageSize int, pageToken string) (string, error) {
1334 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
1335 if err != nil {
1336 return "", err
1337 }
1338 it.items = append(it.items, items...)
1339 return nextPageToken, nil
1340 }
1341
1342 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
1343 it.pageInfo.MaxSize = int(req.GetPageSize())
1344 it.pageInfo.Token = req.GetPageToken()
1345
1346 return it
1347 }
1348
1349
1350
1351
1352
1353
1354 func (c *publisherRESTClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
1355 it := &StringIterator{}
1356 req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest)
1357 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1358 it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
1359 resp := &pubsubpb.ListTopicSnapshotsResponse{}
1360 if pageToken != "" {
1361 req.PageToken = pageToken
1362 }
1363 if pageSize > math.MaxInt32 {
1364 req.PageSize = math.MaxInt32
1365 } else if pageSize != 0 {
1366 req.PageSize = int32(pageSize)
1367 }
1368 baseUrl, err := url.Parse(c.endpoint)
1369 if err != nil {
1370 return nil, "", err
1371 }
1372 baseUrl.Path += fmt.Sprintf("/v1/%v/snapshots", req.GetTopic())
1373
1374 params := url.Values{}
1375 params.Add("$alt", "json;enum-encoding=int")
1376 if req.GetPageSize() != 0 {
1377 params.Add("pageSize", fmt.Sprintf("%v", req.GetPageSize()))
1378 }
1379 if req.GetPageToken() != "" {
1380 params.Add("pageToken", fmt.Sprintf("%v", req.GetPageToken()))
1381 }
1382
1383 baseUrl.RawQuery = params.Encode()
1384
1385
1386 hds := append(c.xGoogHeaders, "Content-Type", "application/json")
1387 headers := gax.BuildHeaders(ctx, hds...)
1388 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1389 if settings.Path != "" {
1390 baseUrl.Path = settings.Path
1391 }
1392 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
1393 if err != nil {
1394 return err
1395 }
1396 httpReq.Header = headers
1397
1398 httpRsp, err := c.httpClient.Do(httpReq)
1399 if err != nil {
1400 return err
1401 }
1402 defer httpRsp.Body.Close()
1403
1404 if err = googleapi.CheckResponse(httpRsp); err != nil {
1405 return err
1406 }
1407
1408 buf, err := io.ReadAll(httpRsp.Body)
1409 if err != nil {
1410 return err
1411 }
1412
1413 if err := unm.Unmarshal(buf, resp); err != nil {
1414 return err
1415 }
1416
1417 return nil
1418 }, opts...)
1419 if e != nil {
1420 return nil, "", e
1421 }
1422 it.Response = resp
1423 return resp.GetSnapshots(), resp.GetNextPageToken(), nil
1424 }
1425
1426 fetch := func(pageSize int, pageToken string) (string, error) {
1427 items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
1428 if err != nil {
1429 return "", err
1430 }
1431 it.items = append(it.items, items...)
1432 return nextPageToken, nil
1433 }
1434
1435 it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
1436 it.pageInfo.MaxSize = int(req.GetPageSize())
1437 it.pageInfo.Token = req.GetPageToken()
1438
1439 return it
1440 }
1441
1442
1443
1444
1445
1446
1447 func (c *publisherRESTClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
1448 baseUrl, err := url.Parse(c.endpoint)
1449 if err != nil {
1450 return err
1451 }
1452 baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetTopic())
1453
1454 params := url.Values{}
1455 params.Add("$alt", "json;enum-encoding=int")
1456
1457 baseUrl.RawQuery = params.Encode()
1458
1459
1460 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
1461
1462 hds = append(c.xGoogHeaders, hds...)
1463 hds = append(hds, "Content-Type", "application/json")
1464 headers := gax.BuildHeaders(ctx, hds...)
1465 return gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1466 if settings.Path != "" {
1467 baseUrl.Path = settings.Path
1468 }
1469 httpReq, err := http.NewRequest("DELETE", baseUrl.String(), nil)
1470 if err != nil {
1471 return err
1472 }
1473 httpReq = httpReq.WithContext(ctx)
1474 httpReq.Header = headers
1475
1476 httpRsp, err := c.httpClient.Do(httpReq)
1477 if err != nil {
1478 return err
1479 }
1480 defer httpRsp.Body.Close()
1481
1482
1483
1484 return googleapi.CheckResponse(httpRsp)
1485 }, opts...)
1486 }
1487
1488
1489
1490
1491
1492 func (c *publisherRESTClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
1493 baseUrl, err := url.Parse(c.endpoint)
1494 if err != nil {
1495 return nil, err
1496 }
1497 baseUrl.Path += fmt.Sprintf("/v1/%v:detach", req.GetSubscription())
1498
1499 params := url.Values{}
1500 params.Add("$alt", "json;enum-encoding=int")
1501
1502 baseUrl.RawQuery = params.Encode()
1503
1504
1505 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription()))}
1506
1507 hds = append(c.xGoogHeaders, hds...)
1508 hds = append(hds, "Content-Type", "application/json")
1509 headers := gax.BuildHeaders(ctx, hds...)
1510 opts = append((*c.CallOptions).DetachSubscription[0:len((*c.CallOptions).DetachSubscription):len((*c.CallOptions).DetachSubscription)], opts...)
1511 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1512 resp := &pubsubpb.DetachSubscriptionResponse{}
1513 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1514 if settings.Path != "" {
1515 baseUrl.Path = settings.Path
1516 }
1517 httpReq, err := http.NewRequest("POST", baseUrl.String(), nil)
1518 if err != nil {
1519 return err
1520 }
1521 httpReq = httpReq.WithContext(ctx)
1522 httpReq.Header = headers
1523
1524 httpRsp, err := c.httpClient.Do(httpReq)
1525 if err != nil {
1526 return err
1527 }
1528 defer httpRsp.Body.Close()
1529
1530 if err = googleapi.CheckResponse(httpRsp); err != nil {
1531 return err
1532 }
1533
1534 buf, err := io.ReadAll(httpRsp.Body)
1535 if err != nil {
1536 return err
1537 }
1538
1539 if err := unm.Unmarshal(buf, resp); err != nil {
1540 return err
1541 }
1542
1543 return nil
1544 }, opts...)
1545 if e != nil {
1546 return nil, e
1547 }
1548 return resp, nil
1549 }
1550
1551
1552
1553 func (c *publisherRESTClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
1554 baseUrl, err := url.Parse(c.endpoint)
1555 if err != nil {
1556 return nil, err
1557 }
1558 baseUrl.Path += fmt.Sprintf("/v1/%v:getIamPolicy", req.GetResource())
1559
1560 params := url.Values{}
1561 params.Add("$alt", "json;enum-encoding=int")
1562 if req.GetOptions().GetRequestedPolicyVersion() != 0 {
1563 params.Add("options.requestedPolicyVersion", fmt.Sprintf("%v", req.GetOptions().GetRequestedPolicyVersion()))
1564 }
1565
1566 baseUrl.RawQuery = params.Encode()
1567
1568
1569 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
1570
1571 hds = append(c.xGoogHeaders, hds...)
1572 hds = append(hds, "Content-Type", "application/json")
1573 headers := gax.BuildHeaders(ctx, hds...)
1574 opts = append((*c.CallOptions).GetIamPolicy[0:len((*c.CallOptions).GetIamPolicy):len((*c.CallOptions).GetIamPolicy)], opts...)
1575 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1576 resp := &iampb.Policy{}
1577 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1578 if settings.Path != "" {
1579 baseUrl.Path = settings.Path
1580 }
1581 httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
1582 if err != nil {
1583 return err
1584 }
1585 httpReq = httpReq.WithContext(ctx)
1586 httpReq.Header = headers
1587
1588 httpRsp, err := c.httpClient.Do(httpReq)
1589 if err != nil {
1590 return err
1591 }
1592 defer httpRsp.Body.Close()
1593
1594 if err = googleapi.CheckResponse(httpRsp); err != nil {
1595 return err
1596 }
1597
1598 buf, err := io.ReadAll(httpRsp.Body)
1599 if err != nil {
1600 return err
1601 }
1602
1603 if err := unm.Unmarshal(buf, resp); err != nil {
1604 return err
1605 }
1606
1607 return nil
1608 }, opts...)
1609 if e != nil {
1610 return nil, e
1611 }
1612 return resp, nil
1613 }
1614
1615
1616
1617
1618
1619
1620 func (c *publisherRESTClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
1621 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
1622 jsonReq, err := m.Marshal(req)
1623 if err != nil {
1624 return nil, err
1625 }
1626
1627 baseUrl, err := url.Parse(c.endpoint)
1628 if err != nil {
1629 return nil, err
1630 }
1631 baseUrl.Path += fmt.Sprintf("/v1/%v:setIamPolicy", req.GetResource())
1632
1633 params := url.Values{}
1634 params.Add("$alt", "json;enum-encoding=int")
1635
1636 baseUrl.RawQuery = params.Encode()
1637
1638
1639 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
1640
1641 hds = append(c.xGoogHeaders, hds...)
1642 hds = append(hds, "Content-Type", "application/json")
1643 headers := gax.BuildHeaders(ctx, hds...)
1644 opts = append((*c.CallOptions).SetIamPolicy[0:len((*c.CallOptions).SetIamPolicy):len((*c.CallOptions).SetIamPolicy)], opts...)
1645 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1646 resp := &iampb.Policy{}
1647 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1648 if settings.Path != "" {
1649 baseUrl.Path = settings.Path
1650 }
1651 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
1652 if err != nil {
1653 return err
1654 }
1655 httpReq = httpReq.WithContext(ctx)
1656 httpReq.Header = headers
1657
1658 httpRsp, err := c.httpClient.Do(httpReq)
1659 if err != nil {
1660 return err
1661 }
1662 defer httpRsp.Body.Close()
1663
1664 if err = googleapi.CheckResponse(httpRsp); err != nil {
1665 return err
1666 }
1667
1668 buf, err := io.ReadAll(httpRsp.Body)
1669 if err != nil {
1670 return err
1671 }
1672
1673 if err := unm.Unmarshal(buf, resp); err != nil {
1674 return err
1675 }
1676
1677 return nil
1678 }, opts...)
1679 if e != nil {
1680 return nil, e
1681 }
1682 return resp, nil
1683 }
1684
1685
1686
1687
1688
1689
1690
1691
1692 func (c *publisherRESTClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error) {
1693 m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
1694 jsonReq, err := m.Marshal(req)
1695 if err != nil {
1696 return nil, err
1697 }
1698
1699 baseUrl, err := url.Parse(c.endpoint)
1700 if err != nil {
1701 return nil, err
1702 }
1703 baseUrl.Path += fmt.Sprintf("/v1/%v:testIamPermissions", req.GetResource())
1704
1705 params := url.Values{}
1706 params.Add("$alt", "json;enum-encoding=int")
1707
1708 baseUrl.RawQuery = params.Encode()
1709
1710
1711 hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
1712
1713 hds = append(c.xGoogHeaders, hds...)
1714 hds = append(hds, "Content-Type", "application/json")
1715 headers := gax.BuildHeaders(ctx, hds...)
1716 opts = append((*c.CallOptions).TestIamPermissions[0:len((*c.CallOptions).TestIamPermissions):len((*c.CallOptions).TestIamPermissions)], opts...)
1717 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
1718 resp := &iampb.TestIamPermissionsResponse{}
1719 e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
1720 if settings.Path != "" {
1721 baseUrl.Path = settings.Path
1722 }
1723 httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
1724 if err != nil {
1725 return err
1726 }
1727 httpReq = httpReq.WithContext(ctx)
1728 httpReq.Header = headers
1729
1730 httpRsp, err := c.httpClient.Do(httpReq)
1731 if err != nil {
1732 return err
1733 }
1734 defer httpRsp.Body.Close()
1735
1736 if err = googleapi.CheckResponse(httpRsp); err != nil {
1737 return err
1738 }
1739
1740 buf, err := io.ReadAll(httpRsp.Body)
1741 if err != nil {
1742 return err
1743 }
1744
1745 if err := unm.Unmarshal(buf, resp); err != nil {
1746 return err
1747 }
1748
1749 return nil
1750 }, opts...)
1751 if e != nil {
1752 return nil, e
1753 }
1754 return resp, nil
1755 }
1756
View as plain text