...

Source file src/cloud.google.com/go/pubsub/apiv1/publisher_client.go

Documentation: cloud.google.com/go/pubsub/apiv1

     1  // Copyright 2024 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  // Code generated by protoc-gen-go_gapic. DO NOT EDIT.
    16  
    17  package pubsub
    18  
    19  import (
    20  	"bytes"
    21  	"context"
    22  	"fmt"
    23  	"io"
    24  	"math"
    25  	"net/http"
    26  	"net/url"
    27  	"time"
    28  
    29  	iampb "cloud.google.com/go/iam/apiv1/iampb"
    30  	pubsubpb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    31  	gax "github.com/googleapis/gax-go/v2"
    32  	"google.golang.org/api/googleapi"
    33  	"google.golang.org/api/iterator"
    34  	"google.golang.org/api/option"
    35  	"google.golang.org/api/option/internaloption"
    36  	gtransport "google.golang.org/api/transport/grpc"
    37  	httptransport "google.golang.org/api/transport/http"
    38  	"google.golang.org/grpc"
    39  	"google.golang.org/grpc/codes"
    40  	"google.golang.org/protobuf/encoding/protojson"
    41  	"google.golang.org/protobuf/proto"
    42  )
    43  
// newPublisherClientHook, when non-nil, is called by NewPublisherClient and the
// client options it returns are appended after the gRPC defaults and before
// the caller-supplied options.
var newPublisherClientHook clientHook
    45  
// PublisherCallOptions contains the retry settings for each method of PublisherClient.
//
// Each field holds the default gax call options for the PublisherClient method
// of the same name. The gRPC transport methods in this file place these
// defaults first and append any options supplied at call time after them.
type PublisherCallOptions struct {
	CreateTopic            []gax.CallOption
	UpdateTopic            []gax.CallOption
	Publish                []gax.CallOption
	GetTopic               []gax.CallOption
	ListTopics             []gax.CallOption
	ListTopicSubscriptions []gax.CallOption
	ListTopicSnapshots     []gax.CallOption
	DeleteTopic            []gax.CallOption
	DetachSubscription     []gax.CallOption
	GetIamPolicy           []gax.CallOption
	SetIamPolicy           []gax.CallOption
	TestIamPermissions     []gax.CallOption
}
    61  
    62  func defaultPublisherGRPCClientOptions() []option.ClientOption {
    63  	return []option.ClientOption{
    64  		internaloption.WithDefaultEndpoint("pubsub.googleapis.com:443"),
    65  		internaloption.WithDefaultEndpointTemplate("pubsub.UNIVERSE_DOMAIN:443"),
    66  		internaloption.WithDefaultMTLSEndpoint("pubsub.mtls.googleapis.com:443"),
    67  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
    68  		internaloption.WithDefaultAudience("https://pubsub.googleapis.com/"),
    69  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
    70  		internaloption.EnableJwtWithScope(),
    71  		option.WithGRPCDialOption(grpc.WithDefaultCallOptions(
    72  			grpc.MaxCallRecvMsgSize(math.MaxInt32))),
    73  	}
    74  }
    75  
    76  func defaultPublisherCallOptions() *PublisherCallOptions {
    77  	return &PublisherCallOptions{
    78  		CreateTopic: []gax.CallOption{
    79  			gax.WithTimeout(60000 * time.Millisecond),
    80  			gax.WithRetry(func() gax.Retryer {
    81  				return gax.OnCodes([]codes.Code{
    82  					codes.Unavailable,
    83  				}, gax.Backoff{
    84  					Initial:    100 * time.Millisecond,
    85  					Max:        60000 * time.Millisecond,
    86  					Multiplier: 1.30,
    87  				})
    88  			}),
    89  		},
    90  		UpdateTopic: []gax.CallOption{
    91  			gax.WithTimeout(60000 * time.Millisecond),
    92  			gax.WithRetry(func() gax.Retryer {
    93  				return gax.OnCodes([]codes.Code{
    94  					codes.Unavailable,
    95  				}, gax.Backoff{
    96  					Initial:    100 * time.Millisecond,
    97  					Max:        60000 * time.Millisecond,
    98  					Multiplier: 1.30,
    99  				})
   100  			}),
   101  		},
   102  		Publish: []gax.CallOption{
   103  			gax.WithTimeout(60000 * time.Millisecond),
   104  			gax.WithRetry(func() gax.Retryer {
   105  				return gax.OnCodes([]codes.Code{
   106  					codes.Aborted,
   107  					codes.Canceled,
   108  					codes.Internal,
   109  					codes.ResourceExhausted,
   110  					codes.Unknown,
   111  					codes.Unavailable,
   112  					codes.DeadlineExceeded,
   113  				}, gax.Backoff{
   114  					Initial:    100 * time.Millisecond,
   115  					Max:        60000 * time.Millisecond,
   116  					Multiplier: 4.00,
   117  				})
   118  			}),
   119  		},
   120  		GetTopic: []gax.CallOption{
   121  			gax.WithTimeout(60000 * time.Millisecond),
   122  			gax.WithRetry(func() gax.Retryer {
   123  				return gax.OnCodes([]codes.Code{
   124  					codes.Unknown,
   125  					codes.Aborted,
   126  					codes.Unavailable,
   127  				}, gax.Backoff{
   128  					Initial:    100 * time.Millisecond,
   129  					Max:        60000 * time.Millisecond,
   130  					Multiplier: 1.30,
   131  				})
   132  			}),
   133  		},
   134  		ListTopics: []gax.CallOption{
   135  			gax.WithTimeout(60000 * time.Millisecond),
   136  			gax.WithRetry(func() gax.Retryer {
   137  				return gax.OnCodes([]codes.Code{
   138  					codes.Unknown,
   139  					codes.Aborted,
   140  					codes.Unavailable,
   141  				}, gax.Backoff{
   142  					Initial:    100 * time.Millisecond,
   143  					Max:        60000 * time.Millisecond,
   144  					Multiplier: 1.30,
   145  				})
   146  			}),
   147  		},
   148  		ListTopicSubscriptions: []gax.CallOption{
   149  			gax.WithTimeout(60000 * time.Millisecond),
   150  			gax.WithRetry(func() gax.Retryer {
   151  				return gax.OnCodes([]codes.Code{
   152  					codes.Unknown,
   153  					codes.Aborted,
   154  					codes.Unavailable,
   155  				}, gax.Backoff{
   156  					Initial:    100 * time.Millisecond,
   157  					Max:        60000 * time.Millisecond,
   158  					Multiplier: 1.30,
   159  				})
   160  			}),
   161  		},
   162  		ListTopicSnapshots: []gax.CallOption{
   163  			gax.WithTimeout(60000 * time.Millisecond),
   164  			gax.WithRetry(func() gax.Retryer {
   165  				return gax.OnCodes([]codes.Code{
   166  					codes.Unknown,
   167  					codes.Aborted,
   168  					codes.Unavailable,
   169  				}, gax.Backoff{
   170  					Initial:    100 * time.Millisecond,
   171  					Max:        60000 * time.Millisecond,
   172  					Multiplier: 1.30,
   173  				})
   174  			}),
   175  		},
   176  		DeleteTopic: []gax.CallOption{
   177  			gax.WithTimeout(60000 * time.Millisecond),
   178  			gax.WithRetry(func() gax.Retryer {
   179  				return gax.OnCodes([]codes.Code{
   180  					codes.Unavailable,
   181  				}, gax.Backoff{
   182  					Initial:    100 * time.Millisecond,
   183  					Max:        60000 * time.Millisecond,
   184  					Multiplier: 1.30,
   185  				})
   186  			}),
   187  		},
   188  		DetachSubscription: []gax.CallOption{
   189  			gax.WithTimeout(60000 * time.Millisecond),
   190  			gax.WithRetry(func() gax.Retryer {
   191  				return gax.OnCodes([]codes.Code{
   192  					codes.Unavailable,
   193  				}, gax.Backoff{
   194  					Initial:    100 * time.Millisecond,
   195  					Max:        60000 * time.Millisecond,
   196  					Multiplier: 1.30,
   197  				})
   198  			}),
   199  		},
   200  		GetIamPolicy:       []gax.CallOption{},
   201  		SetIamPolicy:       []gax.CallOption{},
   202  		TestIamPermissions: []gax.CallOption{},
   203  	}
   204  }
   205  
   206  func defaultPublisherRESTCallOptions() *PublisherCallOptions {
   207  	return &PublisherCallOptions{
   208  		CreateTopic: []gax.CallOption{
   209  			gax.WithTimeout(60000 * time.Millisecond),
   210  			gax.WithRetry(func() gax.Retryer {
   211  				return gax.OnHTTPCodes(gax.Backoff{
   212  					Initial:    100 * time.Millisecond,
   213  					Max:        60000 * time.Millisecond,
   214  					Multiplier: 1.30,
   215  				},
   216  					http.StatusServiceUnavailable)
   217  			}),
   218  		},
   219  		UpdateTopic: []gax.CallOption{
   220  			gax.WithTimeout(60000 * time.Millisecond),
   221  			gax.WithRetry(func() gax.Retryer {
   222  				return gax.OnHTTPCodes(gax.Backoff{
   223  					Initial:    100 * time.Millisecond,
   224  					Max:        60000 * time.Millisecond,
   225  					Multiplier: 1.30,
   226  				},
   227  					http.StatusServiceUnavailable)
   228  			}),
   229  		},
   230  		Publish: []gax.CallOption{
   231  			gax.WithTimeout(60000 * time.Millisecond),
   232  			gax.WithRetry(func() gax.Retryer {
   233  				return gax.OnHTTPCodes(gax.Backoff{
   234  					Initial:    100 * time.Millisecond,
   235  					Max:        60000 * time.Millisecond,
   236  					Multiplier: 4.00,
   237  				},
   238  					http.StatusConflict,
   239  					499,
   240  					http.StatusInternalServerError,
   241  					http.StatusTooManyRequests,
   242  					http.StatusInternalServerError,
   243  					http.StatusServiceUnavailable,
   244  					http.StatusGatewayTimeout)
   245  			}),
   246  		},
   247  		GetTopic: []gax.CallOption{
   248  			gax.WithTimeout(60000 * time.Millisecond),
   249  			gax.WithRetry(func() gax.Retryer {
   250  				return gax.OnHTTPCodes(gax.Backoff{
   251  					Initial:    100 * time.Millisecond,
   252  					Max:        60000 * time.Millisecond,
   253  					Multiplier: 1.30,
   254  				},
   255  					http.StatusInternalServerError,
   256  					http.StatusConflict,
   257  					http.StatusServiceUnavailable)
   258  			}),
   259  		},
   260  		ListTopics: []gax.CallOption{
   261  			gax.WithTimeout(60000 * time.Millisecond),
   262  			gax.WithRetry(func() gax.Retryer {
   263  				return gax.OnHTTPCodes(gax.Backoff{
   264  					Initial:    100 * time.Millisecond,
   265  					Max:        60000 * time.Millisecond,
   266  					Multiplier: 1.30,
   267  				},
   268  					http.StatusInternalServerError,
   269  					http.StatusConflict,
   270  					http.StatusServiceUnavailable)
   271  			}),
   272  		},
   273  		ListTopicSubscriptions: []gax.CallOption{
   274  			gax.WithTimeout(60000 * time.Millisecond),
   275  			gax.WithRetry(func() gax.Retryer {
   276  				return gax.OnHTTPCodes(gax.Backoff{
   277  					Initial:    100 * time.Millisecond,
   278  					Max:        60000 * time.Millisecond,
   279  					Multiplier: 1.30,
   280  				},
   281  					http.StatusInternalServerError,
   282  					http.StatusConflict,
   283  					http.StatusServiceUnavailable)
   284  			}),
   285  		},
   286  		ListTopicSnapshots: []gax.CallOption{
   287  			gax.WithTimeout(60000 * time.Millisecond),
   288  			gax.WithRetry(func() gax.Retryer {
   289  				return gax.OnHTTPCodes(gax.Backoff{
   290  					Initial:    100 * time.Millisecond,
   291  					Max:        60000 * time.Millisecond,
   292  					Multiplier: 1.30,
   293  				},
   294  					http.StatusInternalServerError,
   295  					http.StatusConflict,
   296  					http.StatusServiceUnavailable)
   297  			}),
   298  		},
   299  		DeleteTopic: []gax.CallOption{
   300  			gax.WithTimeout(60000 * time.Millisecond),
   301  			gax.WithRetry(func() gax.Retryer {
   302  				return gax.OnHTTPCodes(gax.Backoff{
   303  					Initial:    100 * time.Millisecond,
   304  					Max:        60000 * time.Millisecond,
   305  					Multiplier: 1.30,
   306  				},
   307  					http.StatusServiceUnavailable)
   308  			}),
   309  		},
   310  		DetachSubscription: []gax.CallOption{
   311  			gax.WithTimeout(60000 * time.Millisecond),
   312  			gax.WithRetry(func() gax.Retryer {
   313  				return gax.OnHTTPCodes(gax.Backoff{
   314  					Initial:    100 * time.Millisecond,
   315  					Max:        60000 * time.Millisecond,
   316  					Multiplier: 1.30,
   317  				},
   318  					http.StatusServiceUnavailable)
   319  			}),
   320  		},
   321  		GetIamPolicy:       []gax.CallOption{},
   322  		SetIamPolicy:       []gax.CallOption{},
   323  		TestIamPermissions: []gax.CallOption{},
   324  	}
   325  }
   326  
// internalPublisherClient is an interface that defines the methods available from Cloud Pub/Sub API.
//
// It is the transport-specific backend that PublisherClient delegates every
// call to; in this file publisherGRPCClient and publisherRESTClient are the
// values stored behind it.
type internalPublisherClient interface {
	// Lifecycle and plumbing.
	Close() error
	setGoogleClientInfo(...string)
	Connection() *grpc.ClientConn
	// Topic operations.
	CreateTopic(context.Context, *pubsubpb.Topic, ...gax.CallOption) (*pubsubpb.Topic, error)
	UpdateTopic(context.Context, *pubsubpb.UpdateTopicRequest, ...gax.CallOption) (*pubsubpb.Topic, error)
	Publish(context.Context, *pubsubpb.PublishRequest, ...gax.CallOption) (*pubsubpb.PublishResponse, error)
	GetTopic(context.Context, *pubsubpb.GetTopicRequest, ...gax.CallOption) (*pubsubpb.Topic, error)
	ListTopics(context.Context, *pubsubpb.ListTopicsRequest, ...gax.CallOption) *TopicIterator
	ListTopicSubscriptions(context.Context, *pubsubpb.ListTopicSubscriptionsRequest, ...gax.CallOption) *StringIterator
	ListTopicSnapshots(context.Context, *pubsubpb.ListTopicSnapshotsRequest, ...gax.CallOption) *StringIterator
	DeleteTopic(context.Context, *pubsubpb.DeleteTopicRequest, ...gax.CallOption) error
	DetachSubscription(context.Context, *pubsubpb.DetachSubscriptionRequest, ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error)
	// IAM operations.
	GetIamPolicy(context.Context, *iampb.GetIamPolicyRequest, ...gax.CallOption) (*iampb.Policy, error)
	SetIamPolicy(context.Context, *iampb.SetIamPolicyRequest, ...gax.CallOption) (*iampb.Policy, error)
	TestIamPermissions(context.Context, *iampb.TestIamPermissionsRequest, ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error)
}
   345  
// PublisherClient is a client for interacting with Cloud Pub/Sub API.
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
//
// The service that an application uses to manipulate topics, and to send
// messages to a topic.
//
// Every exported method forwards to internalClient, which is set by the
// constructor (NewPublisherClient for gRPC, NewPublisherRESTClient for REST).
type PublisherClient struct {
	// The internal transport-dependent client.
	internalClient internalPublisherClient

	// The call options for this service.
	CallOptions *PublisherCallOptions
}
   358  
   359  // Wrapper methods routed to the internal client.
   360  
   361  // Close closes the connection to the API service. The user should invoke this when
   362  // the client is no longer required.
   363  func (c *PublisherClient) Close() error {
   364  	return c.internalClient.Close()
   365  }
   366  
   367  // setGoogleClientInfo sets the name and version of the application in
   368  // the `x-goog-api-client` header passed on each request. Intended for
   369  // use by Google-written clients.
   370  func (c *PublisherClient) setGoogleClientInfo(keyval ...string) {
   371  	c.internalClient.setGoogleClientInfo(keyval...)
   372  }
   373  
   374  // Connection returns a connection to the API service.
   375  //
   376  // Deprecated: Connections are now pooled so this method does not always
   377  // return the same resource.
   378  func (c *PublisherClient) Connection() *grpc.ClientConn {
   379  	return c.internalClient.Connection()
   380  }
   381  
   382  // CreateTopic creates the given topic with the given name. See the [resource name rules]
   383  // (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names (at https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names)).
   384  func (c *PublisherClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   385  	return c.internalClient.CreateTopic(ctx, req, opts...)
   386  }
   387  
   388  // UpdateTopic updates an existing topic by updating the fields specified in the update
   389  // mask. Note that certain properties of a topic are not modifiable.
   390  func (c *PublisherClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   391  	return c.internalClient.UpdateTopic(ctx, req, opts...)
   392  }
   393  
   394  // Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic
   395  // does not exist.
   396  func (c *PublisherClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
   397  	return c.internalClient.Publish(ctx, req, opts...)
   398  }
   399  
   400  // GetTopic gets the configuration of a topic.
   401  func (c *PublisherClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   402  	return c.internalClient.GetTopic(ctx, req, opts...)
   403  }
   404  
   405  // ListTopics lists matching topics.
   406  func (c *PublisherClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
   407  	return c.internalClient.ListTopics(ctx, req, opts...)
   408  }
   409  
   410  // ListTopicSubscriptions lists the names of the attached subscriptions on this topic.
   411  func (c *PublisherClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
   412  	return c.internalClient.ListTopicSubscriptions(ctx, req, opts...)
   413  }
   414  
   415  // ListTopicSnapshots lists the names of the snapshots on this topic. Snapshots are used in
   416  // Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations,
   417  // which allow you to manage message acknowledgments in bulk. That is, you can
   418  // set the acknowledgment state of messages in an existing subscription to the
   419  // state captured by a snapshot.
   420  func (c *PublisherClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
   421  	return c.internalClient.ListTopicSnapshots(ctx, req, opts...)
   422  }
   423  
   424  // DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic
   425  // does not exist. After a topic is deleted, a new topic may be created with
   426  // the same name; this is an entirely new topic with none of the old
   427  // configuration or subscriptions. Existing subscriptions to this topic are
   428  // not deleted, but their topic field is set to _deleted-topic_.
   429  func (c *PublisherClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
   430  	return c.internalClient.DeleteTopic(ctx, req, opts...)
   431  }
   432  
   433  // DetachSubscription detaches a subscription from this topic. All messages retained in the
   434  // subscription are dropped. Subsequent Pull and StreamingPull requests
   435  // will return FAILED_PRECONDITION. If the subscription is a push
   436  // subscription, pushes to the endpoint will stop.
   437  func (c *PublisherClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
   438  	return c.internalClient.DetachSubscription(ctx, req, opts...)
   439  }
   440  
   441  // GetIamPolicy gets the access control policy for a resource. Returns an empty policy
   442  // if the resource exists and does not have a policy set.
   443  func (c *PublisherClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
   444  	return c.internalClient.GetIamPolicy(ctx, req, opts...)
   445  }
   446  
   447  // SetIamPolicy sets the access control policy on the specified resource. Replaces
   448  // any existing policy.
   449  //
   450  // Can return NOT_FOUND, INVALID_ARGUMENT, and PERMISSION_DENIED
   451  // errors.
   452  func (c *PublisherClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
   453  	return c.internalClient.SetIamPolicy(ctx, req, opts...)
   454  }
   455  
   456  // TestIamPermissions returns permissions that a caller has on the specified resource. If the
   457  // resource does not exist, this will return an empty set of
   458  // permissions, not a NOT_FOUND error.
   459  //
   460  // Note: This operation is designed to be used for building
   461  // permission-aware UIs and command-line tools, not for authorization
   462  // checking. This operation may “fail open” without warning.
   463  func (c *PublisherClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error) {
   464  	return c.internalClient.TestIamPermissions(ctx, req, opts...)
   465  }
   466  
// publisherGRPCClient is a client for interacting with Cloud Pub/Sub API over gRPC transport.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
type publisherGRPCClient struct {
	// Connection pool of gRPC connections to the service.
	connPool gtransport.ConnPool

	// Points back to the CallOptions field of the containing PublisherClient
	CallOptions **PublisherCallOptions

	// The gRPC API client.
	publisherClient pubsubpb.PublisherClient

	// The gRPC client for the IAM policy service; NewPublisherClient builds it
	// on the same connection pool as publisherClient.
	iamPolicyClient iampb.IAMPolicyClient

	// The x-goog-* metadata to be sent with each request.
	xGoogHeaders []string
}
   485  
   486  // NewPublisherClient creates a new publisher client based on gRPC.
   487  // The returned client must be Closed when it is done being used to clean up its underlying connections.
   488  //
   489  // The service that an application uses to manipulate topics, and to send
   490  // messages to a topic.
   491  func NewPublisherClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
   492  	clientOpts := defaultPublisherGRPCClientOptions()
   493  	if newPublisherClientHook != nil {
   494  		hookOpts, err := newPublisherClientHook(ctx, clientHookParams{})
   495  		if err != nil {
   496  			return nil, err
   497  		}
   498  		clientOpts = append(clientOpts, hookOpts...)
   499  	}
   500  
   501  	connPool, err := gtransport.DialPool(ctx, append(clientOpts, opts...)...)
   502  	if err != nil {
   503  		return nil, err
   504  	}
   505  	client := PublisherClient{CallOptions: defaultPublisherCallOptions()}
   506  
   507  	c := &publisherGRPCClient{
   508  		connPool:        connPool,
   509  		publisherClient: pubsubpb.NewPublisherClient(connPool),
   510  		CallOptions:     &client.CallOptions,
   511  		iamPolicyClient: iampb.NewIAMPolicyClient(connPool),
   512  	}
   513  	c.setGoogleClientInfo()
   514  
   515  	client.internalClient = c
   516  
   517  	return &client, nil
   518  }
   519  
   520  // Connection returns a connection to the API service.
   521  //
   522  // Deprecated: Connections are now pooled so this method does not always
   523  // return the same resource.
   524  func (c *publisherGRPCClient) Connection() *grpc.ClientConn {
   525  	return c.connPool.Conn()
   526  }
   527  
   528  // setGoogleClientInfo sets the name and version of the application in
   529  // the `x-goog-api-client` header passed on each request. Intended for
   530  // use by Google-written clients.
   531  func (c *publisherGRPCClient) setGoogleClientInfo(keyval ...string) {
   532  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   533  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "grpc", grpc.Version)
   534  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   535  }
   536  
   537  // Close closes the connection to the API service. The user should invoke this when
   538  // the client is no longer required.
   539  func (c *publisherGRPCClient) Close() error {
   540  	return c.connPool.Close()
   541  }
   542  
// publisherRESTClient is the HTTP-based counterpart of publisherGRPCClient; it
// is created by NewPublisherRESTClient.
//
// Methods, except Close, may be called concurrently. However, fields must not be modified concurrently with method calls.
type publisherRESTClient struct {
	// The http endpoint to connect to.
	endpoint string

	// The http client.
	httpClient *http.Client

	// The x-goog-* headers to be sent with each request.
	xGoogHeaders []string

	// Points back to the CallOptions field of the containing PublisherClient
	CallOptions **PublisherCallOptions
}
   557  
   558  // NewPublisherRESTClient creates a new publisher rest client.
   559  //
   560  // The service that an application uses to manipulate topics, and to send
   561  // messages to a topic.
   562  func NewPublisherRESTClient(ctx context.Context, opts ...option.ClientOption) (*PublisherClient, error) {
   563  	clientOpts := append(defaultPublisherRESTClientOptions(), opts...)
   564  	httpClient, endpoint, err := httptransport.NewClient(ctx, clientOpts...)
   565  	if err != nil {
   566  		return nil, err
   567  	}
   568  
   569  	callOpts := defaultPublisherRESTCallOptions()
   570  	c := &publisherRESTClient{
   571  		endpoint:    endpoint,
   572  		httpClient:  httpClient,
   573  		CallOptions: &callOpts,
   574  	}
   575  	c.setGoogleClientInfo()
   576  
   577  	return &PublisherClient{internalClient: c, CallOptions: callOpts}, nil
   578  }
   579  
   580  func defaultPublisherRESTClientOptions() []option.ClientOption {
   581  	return []option.ClientOption{
   582  		internaloption.WithDefaultEndpoint("https://pubsub.googleapis.com"),
   583  		internaloption.WithDefaultEndpointTemplate("https://pubsub.UNIVERSE_DOMAIN"),
   584  		internaloption.WithDefaultMTLSEndpoint("https://pubsub.mtls.googleapis.com"),
   585  		internaloption.WithDefaultUniverseDomain("googleapis.com"),
   586  		internaloption.WithDefaultAudience("https://pubsub.googleapis.com/"),
   587  		internaloption.WithDefaultScopes(DefaultAuthScopes()...),
   588  	}
   589  }
   590  
   591  // setGoogleClientInfo sets the name and version of the application in
   592  // the `x-goog-api-client` header passed on each request. Intended for
   593  // use by Google-written clients.
   594  func (c *publisherRESTClient) setGoogleClientInfo(keyval ...string) {
   595  	kv := append([]string{"gl-go", gax.GoVersion}, keyval...)
   596  	kv = append(kv, "gapic", getVersionClient(), "gax", gax.Version, "rest", "UNKNOWN")
   597  	c.xGoogHeaders = []string{"x-goog-api-client", gax.XGoogHeader(kv...)}
   598  }
   599  
   600  // Close closes the connection to the API service. The user should invoke this when
   601  // the client is no longer required.
   602  func (c *publisherRESTClient) Close() error {
   603  	// Replace httpClient with nil to force cleanup.
   604  	c.httpClient = nil
   605  	return nil
   606  }
   607  
   608  // Connection returns a connection to the API service.
   609  //
   610  // Deprecated: This method always returns nil.
   611  func (c *publisherRESTClient) Connection() *grpc.ClientConn {
   612  	return nil
   613  }
   614  func (c *publisherGRPCClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   615  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   616  
   617  	hds = append(c.xGoogHeaders, hds...)
   618  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   619  	opts = append((*c.CallOptions).CreateTopic[0:len((*c.CallOptions).CreateTopic):len((*c.CallOptions).CreateTopic)], opts...)
   620  	var resp *pubsubpb.Topic
   621  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   622  		var err error
   623  		resp, err = c.publisherClient.CreateTopic(ctx, req, settings.GRPC...)
   624  		return err
   625  	}, opts...)
   626  	if err != nil {
   627  		return nil, err
   628  	}
   629  	return resp, nil
   630  }
   631  
   632  func (c *publisherGRPCClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   633  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))}
   634  
   635  	hds = append(c.xGoogHeaders, hds...)
   636  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   637  	opts = append((*c.CallOptions).UpdateTopic[0:len((*c.CallOptions).UpdateTopic):len((*c.CallOptions).UpdateTopic)], opts...)
   638  	var resp *pubsubpb.Topic
   639  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   640  		var err error
   641  		resp, err = c.publisherClient.UpdateTopic(ctx, req, settings.GRPC...)
   642  		return err
   643  	}, opts...)
   644  	if err != nil {
   645  		return nil, err
   646  	}
   647  	return resp, nil
   648  }
   649  
   650  func (c *publisherGRPCClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
   651  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
   652  
   653  	hds = append(c.xGoogHeaders, hds...)
   654  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   655  	opts = append((*c.CallOptions).Publish[0:len((*c.CallOptions).Publish):len((*c.CallOptions).Publish)], opts...)
   656  	var resp *pubsubpb.PublishResponse
   657  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   658  		var err error
   659  		resp, err = c.publisherClient.Publish(ctx, req, settings.GRPC...)
   660  		return err
   661  	}, opts...)
   662  	if err != nil {
   663  		return nil, err
   664  	}
   665  	return resp, nil
   666  }
   667  
   668  func (c *publisherGRPCClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   669  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
   670  
   671  	hds = append(c.xGoogHeaders, hds...)
   672  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   673  	opts = append((*c.CallOptions).GetTopic[0:len((*c.CallOptions).GetTopic):len((*c.CallOptions).GetTopic)], opts...)
   674  	var resp *pubsubpb.Topic
   675  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   676  		var err error
   677  		resp, err = c.publisherClient.GetTopic(ctx, req, settings.GRPC...)
   678  		return err
   679  	}, opts...)
   680  	if err != nil {
   681  		return nil, err
   682  	}
   683  	return resp, nil
   684  }
   685  
   686  func (c *publisherGRPCClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
   687  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "project", url.QueryEscape(req.GetProject()))}
   688  
   689  	hds = append(c.xGoogHeaders, hds...)
   690  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   691  	opts = append((*c.CallOptions).ListTopics[0:len((*c.CallOptions).ListTopics):len((*c.CallOptions).ListTopics)], opts...)
   692  	it := &TopicIterator{}
   693  	req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
   694  	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
   695  		resp := &pubsubpb.ListTopicsResponse{}
   696  		if pageToken != "" {
   697  			req.PageToken = pageToken
   698  		}
   699  		if pageSize > math.MaxInt32 {
   700  			req.PageSize = math.MaxInt32
   701  		} else if pageSize != 0 {
   702  			req.PageSize = int32(pageSize)
   703  		}
   704  		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   705  			var err error
   706  			resp, err = c.publisherClient.ListTopics(ctx, req, settings.GRPC...)
   707  			return err
   708  		}, opts...)
   709  		if err != nil {
   710  			return nil, "", err
   711  		}
   712  
   713  		it.Response = resp
   714  		return resp.GetTopics(), resp.GetNextPageToken(), nil
   715  	}
   716  	fetch := func(pageSize int, pageToken string) (string, error) {
   717  		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
   718  		if err != nil {
   719  			return "", err
   720  		}
   721  		it.items = append(it.items, items...)
   722  		return nextPageToken, nil
   723  	}
   724  
   725  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
   726  	it.pageInfo.MaxSize = int(req.GetPageSize())
   727  	it.pageInfo.Token = req.GetPageToken()
   728  
   729  	return it
   730  }
   731  
   732  func (c *publisherGRPCClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
   733  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
   734  
   735  	hds = append(c.xGoogHeaders, hds...)
   736  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   737  	opts = append((*c.CallOptions).ListTopicSubscriptions[0:len((*c.CallOptions).ListTopicSubscriptions):len((*c.CallOptions).ListTopicSubscriptions)], opts...)
   738  	it := &StringIterator{}
   739  	req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
   740  	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
   741  		resp := &pubsubpb.ListTopicSubscriptionsResponse{}
   742  		if pageToken != "" {
   743  			req.PageToken = pageToken
   744  		}
   745  		if pageSize > math.MaxInt32 {
   746  			req.PageSize = math.MaxInt32
   747  		} else if pageSize != 0 {
   748  			req.PageSize = int32(pageSize)
   749  		}
   750  		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   751  			var err error
   752  			resp, err = c.publisherClient.ListTopicSubscriptions(ctx, req, settings.GRPC...)
   753  			return err
   754  		}, opts...)
   755  		if err != nil {
   756  			return nil, "", err
   757  		}
   758  
   759  		it.Response = resp
   760  		return resp.GetSubscriptions(), resp.GetNextPageToken(), nil
   761  	}
   762  	fetch := func(pageSize int, pageToken string) (string, error) {
   763  		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
   764  		if err != nil {
   765  			return "", err
   766  		}
   767  		it.items = append(it.items, items...)
   768  		return nextPageToken, nil
   769  	}
   770  
   771  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
   772  	it.pageInfo.MaxSize = int(req.GetPageSize())
   773  	it.pageInfo.Token = req.GetPageToken()
   774  
   775  	return it
   776  }
   777  
   778  func (c *publisherGRPCClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
   779  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
   780  
   781  	hds = append(c.xGoogHeaders, hds...)
   782  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   783  	opts = append((*c.CallOptions).ListTopicSnapshots[0:len((*c.CallOptions).ListTopicSnapshots):len((*c.CallOptions).ListTopicSnapshots)], opts...)
   784  	it := &StringIterator{}
   785  	req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest)
   786  	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
   787  		resp := &pubsubpb.ListTopicSnapshotsResponse{}
   788  		if pageToken != "" {
   789  			req.PageToken = pageToken
   790  		}
   791  		if pageSize > math.MaxInt32 {
   792  			req.PageSize = math.MaxInt32
   793  		} else if pageSize != 0 {
   794  			req.PageSize = int32(pageSize)
   795  		}
   796  		err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   797  			var err error
   798  			resp, err = c.publisherClient.ListTopicSnapshots(ctx, req, settings.GRPC...)
   799  			return err
   800  		}, opts...)
   801  		if err != nil {
   802  			return nil, "", err
   803  		}
   804  
   805  		it.Response = resp
   806  		return resp.GetSnapshots(), resp.GetNextPageToken(), nil
   807  	}
   808  	fetch := func(pageSize int, pageToken string) (string, error) {
   809  		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
   810  		if err != nil {
   811  			return "", err
   812  		}
   813  		it.items = append(it.items, items...)
   814  		return nextPageToken, nil
   815  	}
   816  
   817  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
   818  	it.pageInfo.MaxSize = int(req.GetPageSize())
   819  	it.pageInfo.Token = req.GetPageToken()
   820  
   821  	return it
   822  }
   823  
   824  func (c *publisherGRPCClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
   825  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
   826  
   827  	hds = append(c.xGoogHeaders, hds...)
   828  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   829  	opts = append((*c.CallOptions).DeleteTopic[0:len((*c.CallOptions).DeleteTopic):len((*c.CallOptions).DeleteTopic)], opts...)
   830  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   831  		var err error
   832  		_, err = c.publisherClient.DeleteTopic(ctx, req, settings.GRPC...)
   833  		return err
   834  	}, opts...)
   835  	return err
   836  }
   837  
   838  func (c *publisherGRPCClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
   839  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription()))}
   840  
   841  	hds = append(c.xGoogHeaders, hds...)
   842  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   843  	opts = append((*c.CallOptions).DetachSubscription[0:len((*c.CallOptions).DetachSubscription):len((*c.CallOptions).DetachSubscription)], opts...)
   844  	var resp *pubsubpb.DetachSubscriptionResponse
   845  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   846  		var err error
   847  		resp, err = c.publisherClient.DetachSubscription(ctx, req, settings.GRPC...)
   848  		return err
   849  	}, opts...)
   850  	if err != nil {
   851  		return nil, err
   852  	}
   853  	return resp, nil
   854  }
   855  
   856  func (c *publisherGRPCClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
   857  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
   858  
   859  	hds = append(c.xGoogHeaders, hds...)
   860  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   861  	opts = append((*c.CallOptions).GetIamPolicy[0:len((*c.CallOptions).GetIamPolicy):len((*c.CallOptions).GetIamPolicy)], opts...)
   862  	var resp *iampb.Policy
   863  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   864  		var err error
   865  		resp, err = c.iamPolicyClient.GetIamPolicy(ctx, req, settings.GRPC...)
   866  		return err
   867  	}, opts...)
   868  	if err != nil {
   869  		return nil, err
   870  	}
   871  	return resp, nil
   872  }
   873  
   874  func (c *publisherGRPCClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
   875  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
   876  
   877  	hds = append(c.xGoogHeaders, hds...)
   878  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   879  	opts = append((*c.CallOptions).SetIamPolicy[0:len((*c.CallOptions).SetIamPolicy):len((*c.CallOptions).SetIamPolicy)], opts...)
   880  	var resp *iampb.Policy
   881  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   882  		var err error
   883  		resp, err = c.iamPolicyClient.SetIamPolicy(ctx, req, settings.GRPC...)
   884  		return err
   885  	}, opts...)
   886  	if err != nil {
   887  		return nil, err
   888  	}
   889  	return resp, nil
   890  }
   891  
   892  func (c *publisherGRPCClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error) {
   893  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
   894  
   895  	hds = append(c.xGoogHeaders, hds...)
   896  	ctx = gax.InsertMetadataIntoOutgoingContext(ctx, hds...)
   897  	opts = append((*c.CallOptions).TestIamPermissions[0:len((*c.CallOptions).TestIamPermissions):len((*c.CallOptions).TestIamPermissions)], opts...)
   898  	var resp *iampb.TestIamPermissionsResponse
   899  	err := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   900  		var err error
   901  		resp, err = c.iamPolicyClient.TestIamPermissions(ctx, req, settings.GRPC...)
   902  		return err
   903  	}, opts...)
   904  	if err != nil {
   905  		return nil, err
   906  	}
   907  	return resp, nil
   908  }
   909  
   910  // CreateTopic creates the given topic with the given name. See the [resource name rules]
   911  // (https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names (at https://cloud.google.com/pubsub/docs/pubsub-basics#resource_names)).
   912  func (c *publisherRESTClient) CreateTopic(ctx context.Context, req *pubsubpb.Topic, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   913  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
   914  	jsonReq, err := m.Marshal(req)
   915  	if err != nil {
   916  		return nil, err
   917  	}
   918  
   919  	baseUrl, err := url.Parse(c.endpoint)
   920  	if err != nil {
   921  		return nil, err
   922  	}
   923  	baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetName())
   924  
   925  	params := url.Values{}
   926  	params.Add("$alt", "json;enum-encoding=int")
   927  
   928  	baseUrl.RawQuery = params.Encode()
   929  
   930  	// Build HTTP headers from client and context metadata.
   931  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "name", url.QueryEscape(req.GetName()))}
   932  
   933  	hds = append(c.xGoogHeaders, hds...)
   934  	hds = append(hds, "Content-Type", "application/json")
   935  	headers := gax.BuildHeaders(ctx, hds...)
   936  	opts = append((*c.CallOptions).CreateTopic[0:len((*c.CallOptions).CreateTopic):len((*c.CallOptions).CreateTopic)], opts...)
   937  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   938  	resp := &pubsubpb.Topic{}
   939  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
   940  		if settings.Path != "" {
   941  			baseUrl.Path = settings.Path
   942  		}
   943  		httpReq, err := http.NewRequest("PUT", baseUrl.String(), bytes.NewReader(jsonReq))
   944  		if err != nil {
   945  			return err
   946  		}
   947  		httpReq = httpReq.WithContext(ctx)
   948  		httpReq.Header = headers
   949  
   950  		httpRsp, err := c.httpClient.Do(httpReq)
   951  		if err != nil {
   952  			return err
   953  		}
   954  		defer httpRsp.Body.Close()
   955  
   956  		if err = googleapi.CheckResponse(httpRsp); err != nil {
   957  			return err
   958  		}
   959  
   960  		buf, err := io.ReadAll(httpRsp.Body)
   961  		if err != nil {
   962  			return err
   963  		}
   964  
   965  		if err := unm.Unmarshal(buf, resp); err != nil {
   966  			return err
   967  		}
   968  
   969  		return nil
   970  	}, opts...)
   971  	if e != nil {
   972  		return nil, e
   973  	}
   974  	return resp, nil
   975  }
   976  
   977  // UpdateTopic updates an existing topic by updating the fields specified in the update
   978  // mask. Note that certain properties of a topic are not modifiable.
   979  func (c *publisherRESTClient) UpdateTopic(ctx context.Context, req *pubsubpb.UpdateTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
   980  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
   981  	jsonReq, err := m.Marshal(req)
   982  	if err != nil {
   983  		return nil, err
   984  	}
   985  
   986  	baseUrl, err := url.Parse(c.endpoint)
   987  	if err != nil {
   988  		return nil, err
   989  	}
   990  	baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetTopic().GetName())
   991  
   992  	params := url.Values{}
   993  	params.Add("$alt", "json;enum-encoding=int")
   994  
   995  	baseUrl.RawQuery = params.Encode()
   996  
   997  	// Build HTTP headers from client and context metadata.
   998  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic.name", url.QueryEscape(req.GetTopic().GetName()))}
   999  
  1000  	hds = append(c.xGoogHeaders, hds...)
  1001  	hds = append(hds, "Content-Type", "application/json")
  1002  	headers := gax.BuildHeaders(ctx, hds...)
  1003  	opts = append((*c.CallOptions).UpdateTopic[0:len((*c.CallOptions).UpdateTopic):len((*c.CallOptions).UpdateTopic)], opts...)
  1004  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1005  	resp := &pubsubpb.Topic{}
  1006  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1007  		if settings.Path != "" {
  1008  			baseUrl.Path = settings.Path
  1009  		}
  1010  		httpReq, err := http.NewRequest("PATCH", baseUrl.String(), bytes.NewReader(jsonReq))
  1011  		if err != nil {
  1012  			return err
  1013  		}
  1014  		httpReq = httpReq.WithContext(ctx)
  1015  		httpReq.Header = headers
  1016  
  1017  		httpRsp, err := c.httpClient.Do(httpReq)
  1018  		if err != nil {
  1019  			return err
  1020  		}
  1021  		defer httpRsp.Body.Close()
  1022  
  1023  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1024  			return err
  1025  		}
  1026  
  1027  		buf, err := io.ReadAll(httpRsp.Body)
  1028  		if err != nil {
  1029  			return err
  1030  		}
  1031  
  1032  		if err := unm.Unmarshal(buf, resp); err != nil {
  1033  			return err
  1034  		}
  1035  
  1036  		return nil
  1037  	}, opts...)
  1038  	if e != nil {
  1039  		return nil, e
  1040  	}
  1041  	return resp, nil
  1042  }
  1043  
  1044  // Publish adds one or more messages to the topic. Returns NOT_FOUND if the topic
  1045  // does not exist.
  1046  func (c *publisherRESTClient) Publish(ctx context.Context, req *pubsubpb.PublishRequest, opts ...gax.CallOption) (*pubsubpb.PublishResponse, error) {
  1047  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
  1048  	jsonReq, err := m.Marshal(req)
  1049  	if err != nil {
  1050  		return nil, err
  1051  	}
  1052  
  1053  	baseUrl, err := url.Parse(c.endpoint)
  1054  	if err != nil {
  1055  		return nil, err
  1056  	}
  1057  	baseUrl.Path += fmt.Sprintf("/v1/%v:publish", req.GetTopic())
  1058  
  1059  	params := url.Values{}
  1060  	params.Add("$alt", "json;enum-encoding=int")
  1061  
  1062  	baseUrl.RawQuery = params.Encode()
  1063  
  1064  	// Build HTTP headers from client and context metadata.
  1065  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
  1066  
  1067  	hds = append(c.xGoogHeaders, hds...)
  1068  	hds = append(hds, "Content-Type", "application/json")
  1069  	headers := gax.BuildHeaders(ctx, hds...)
  1070  	opts = append((*c.CallOptions).Publish[0:len((*c.CallOptions).Publish):len((*c.CallOptions).Publish)], opts...)
  1071  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1072  	resp := &pubsubpb.PublishResponse{}
  1073  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1074  		if settings.Path != "" {
  1075  			baseUrl.Path = settings.Path
  1076  		}
  1077  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
  1078  		if err != nil {
  1079  			return err
  1080  		}
  1081  		httpReq = httpReq.WithContext(ctx)
  1082  		httpReq.Header = headers
  1083  
  1084  		httpRsp, err := c.httpClient.Do(httpReq)
  1085  		if err != nil {
  1086  			return err
  1087  		}
  1088  		defer httpRsp.Body.Close()
  1089  
  1090  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1091  			return err
  1092  		}
  1093  
  1094  		buf, err := io.ReadAll(httpRsp.Body)
  1095  		if err != nil {
  1096  			return err
  1097  		}
  1098  
  1099  		if err := unm.Unmarshal(buf, resp); err != nil {
  1100  			return err
  1101  		}
  1102  
  1103  		return nil
  1104  	}, opts...)
  1105  	if e != nil {
  1106  		return nil, e
  1107  	}
  1108  	return resp, nil
  1109  }
  1110  
  1111  // GetTopic gets the configuration of a topic.
  1112  func (c *publisherRESTClient) GetTopic(ctx context.Context, req *pubsubpb.GetTopicRequest, opts ...gax.CallOption) (*pubsubpb.Topic, error) {
  1113  	baseUrl, err := url.Parse(c.endpoint)
  1114  	if err != nil {
  1115  		return nil, err
  1116  	}
  1117  	baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetTopic())
  1118  
  1119  	params := url.Values{}
  1120  	params.Add("$alt", "json;enum-encoding=int")
  1121  
  1122  	baseUrl.RawQuery = params.Encode()
  1123  
  1124  	// Build HTTP headers from client and context metadata.
  1125  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
  1126  
  1127  	hds = append(c.xGoogHeaders, hds...)
  1128  	hds = append(hds, "Content-Type", "application/json")
  1129  	headers := gax.BuildHeaders(ctx, hds...)
  1130  	opts = append((*c.CallOptions).GetTopic[0:len((*c.CallOptions).GetTopic):len((*c.CallOptions).GetTopic)], opts...)
  1131  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1132  	resp := &pubsubpb.Topic{}
  1133  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1134  		if settings.Path != "" {
  1135  			baseUrl.Path = settings.Path
  1136  		}
  1137  		httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
  1138  		if err != nil {
  1139  			return err
  1140  		}
  1141  		httpReq = httpReq.WithContext(ctx)
  1142  		httpReq.Header = headers
  1143  
  1144  		httpRsp, err := c.httpClient.Do(httpReq)
  1145  		if err != nil {
  1146  			return err
  1147  		}
  1148  		defer httpRsp.Body.Close()
  1149  
  1150  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1151  			return err
  1152  		}
  1153  
  1154  		buf, err := io.ReadAll(httpRsp.Body)
  1155  		if err != nil {
  1156  			return err
  1157  		}
  1158  
  1159  		if err := unm.Unmarshal(buf, resp); err != nil {
  1160  			return err
  1161  		}
  1162  
  1163  		return nil
  1164  	}, opts...)
  1165  	if e != nil {
  1166  		return nil, e
  1167  	}
  1168  	return resp, nil
  1169  }
  1170  
  1171  // ListTopics lists matching topics.
  1172  func (c *publisherRESTClient) ListTopics(ctx context.Context, req *pubsubpb.ListTopicsRequest, opts ...gax.CallOption) *TopicIterator {
  1173  	it := &TopicIterator{}
  1174  	req = proto.Clone(req).(*pubsubpb.ListTopicsRequest)
  1175  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1176  	it.InternalFetch = func(pageSize int, pageToken string) ([]*pubsubpb.Topic, string, error) {
  1177  		resp := &pubsubpb.ListTopicsResponse{}
  1178  		if pageToken != "" {
  1179  			req.PageToken = pageToken
  1180  		}
  1181  		if pageSize > math.MaxInt32 {
  1182  			req.PageSize = math.MaxInt32
  1183  		} else if pageSize != 0 {
  1184  			req.PageSize = int32(pageSize)
  1185  		}
  1186  		baseUrl, err := url.Parse(c.endpoint)
  1187  		if err != nil {
  1188  			return nil, "", err
  1189  		}
  1190  		baseUrl.Path += fmt.Sprintf("/v1/%v/topics", req.GetProject())
  1191  
  1192  		params := url.Values{}
  1193  		params.Add("$alt", "json;enum-encoding=int")
  1194  		if req.GetPageSize() != 0 {
  1195  			params.Add("pageSize", fmt.Sprintf("%v", req.GetPageSize()))
  1196  		}
  1197  		if req.GetPageToken() != "" {
  1198  			params.Add("pageToken", fmt.Sprintf("%v", req.GetPageToken()))
  1199  		}
  1200  
  1201  		baseUrl.RawQuery = params.Encode()
  1202  
  1203  		// Build HTTP headers from client and context metadata.
  1204  		hds := append(c.xGoogHeaders, "Content-Type", "application/json")
  1205  		headers := gax.BuildHeaders(ctx, hds...)
  1206  		e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1207  			if settings.Path != "" {
  1208  				baseUrl.Path = settings.Path
  1209  			}
  1210  			httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
  1211  			if err != nil {
  1212  				return err
  1213  			}
  1214  			httpReq.Header = headers
  1215  
  1216  			httpRsp, err := c.httpClient.Do(httpReq)
  1217  			if err != nil {
  1218  				return err
  1219  			}
  1220  			defer httpRsp.Body.Close()
  1221  
  1222  			if err = googleapi.CheckResponse(httpRsp); err != nil {
  1223  				return err
  1224  			}
  1225  
  1226  			buf, err := io.ReadAll(httpRsp.Body)
  1227  			if err != nil {
  1228  				return err
  1229  			}
  1230  
  1231  			if err := unm.Unmarshal(buf, resp); err != nil {
  1232  				return err
  1233  			}
  1234  
  1235  			return nil
  1236  		}, opts...)
  1237  		if e != nil {
  1238  			return nil, "", e
  1239  		}
  1240  		it.Response = resp
  1241  		return resp.GetTopics(), resp.GetNextPageToken(), nil
  1242  	}
  1243  
  1244  	fetch := func(pageSize int, pageToken string) (string, error) {
  1245  		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
  1246  		if err != nil {
  1247  			return "", err
  1248  		}
  1249  		it.items = append(it.items, items...)
  1250  		return nextPageToken, nil
  1251  	}
  1252  
  1253  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
  1254  	it.pageInfo.MaxSize = int(req.GetPageSize())
  1255  	it.pageInfo.Token = req.GetPageToken()
  1256  
  1257  	return it
  1258  }
  1259  
  1260  // ListTopicSubscriptions lists the names of the attached subscriptions on this topic.
  1261  func (c *publisherRESTClient) ListTopicSubscriptions(ctx context.Context, req *pubsubpb.ListTopicSubscriptionsRequest, opts ...gax.CallOption) *StringIterator {
  1262  	it := &StringIterator{}
  1263  	req = proto.Clone(req).(*pubsubpb.ListTopicSubscriptionsRequest)
  1264  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1265  	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
  1266  		resp := &pubsubpb.ListTopicSubscriptionsResponse{}
  1267  		if pageToken != "" {
  1268  			req.PageToken = pageToken
  1269  		}
  1270  		if pageSize > math.MaxInt32 {
  1271  			req.PageSize = math.MaxInt32
  1272  		} else if pageSize != 0 {
  1273  			req.PageSize = int32(pageSize)
  1274  		}
  1275  		baseUrl, err := url.Parse(c.endpoint)
  1276  		if err != nil {
  1277  			return nil, "", err
  1278  		}
  1279  		baseUrl.Path += fmt.Sprintf("/v1/%v/subscriptions", req.GetTopic())
  1280  
  1281  		params := url.Values{}
  1282  		params.Add("$alt", "json;enum-encoding=int")
  1283  		if req.GetPageSize() != 0 {
  1284  			params.Add("pageSize", fmt.Sprintf("%v", req.GetPageSize()))
  1285  		}
  1286  		if req.GetPageToken() != "" {
  1287  			params.Add("pageToken", fmt.Sprintf("%v", req.GetPageToken()))
  1288  		}
  1289  
  1290  		baseUrl.RawQuery = params.Encode()
  1291  
  1292  		// Build HTTP headers from client and context metadata.
  1293  		hds := append(c.xGoogHeaders, "Content-Type", "application/json")
  1294  		headers := gax.BuildHeaders(ctx, hds...)
  1295  		e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1296  			if settings.Path != "" {
  1297  				baseUrl.Path = settings.Path
  1298  			}
  1299  			httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
  1300  			if err != nil {
  1301  				return err
  1302  			}
  1303  			httpReq.Header = headers
  1304  
  1305  			httpRsp, err := c.httpClient.Do(httpReq)
  1306  			if err != nil {
  1307  				return err
  1308  			}
  1309  			defer httpRsp.Body.Close()
  1310  
  1311  			if err = googleapi.CheckResponse(httpRsp); err != nil {
  1312  				return err
  1313  			}
  1314  
  1315  			buf, err := io.ReadAll(httpRsp.Body)
  1316  			if err != nil {
  1317  				return err
  1318  			}
  1319  
  1320  			if err := unm.Unmarshal(buf, resp); err != nil {
  1321  				return err
  1322  			}
  1323  
  1324  			return nil
  1325  		}, opts...)
  1326  		if e != nil {
  1327  			return nil, "", e
  1328  		}
  1329  		it.Response = resp
  1330  		return resp.GetSubscriptions(), resp.GetNextPageToken(), nil
  1331  	}
  1332  
  1333  	fetch := func(pageSize int, pageToken string) (string, error) {
  1334  		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
  1335  		if err != nil {
  1336  			return "", err
  1337  		}
  1338  		it.items = append(it.items, items...)
  1339  		return nextPageToken, nil
  1340  	}
  1341  
  1342  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
  1343  	it.pageInfo.MaxSize = int(req.GetPageSize())
  1344  	it.pageInfo.Token = req.GetPageToken()
  1345  
  1346  	return it
  1347  }
  1348  
  1349  // ListTopicSnapshots lists the names of the snapshots on this topic. Snapshots are used in
  1350  // Seek (at https://cloud.google.com/pubsub/docs/replay-overview) operations,
  1351  // which allow you to manage message acknowledgments in bulk. That is, you can
  1352  // set the acknowledgment state of messages in an existing subscription to the
  1353  // state captured by a snapshot.
  1354  func (c *publisherRESTClient) ListTopicSnapshots(ctx context.Context, req *pubsubpb.ListTopicSnapshotsRequest, opts ...gax.CallOption) *StringIterator {
  1355  	it := &StringIterator{}
  1356  	req = proto.Clone(req).(*pubsubpb.ListTopicSnapshotsRequest)
  1357  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1358  	it.InternalFetch = func(pageSize int, pageToken string) ([]string, string, error) {
  1359  		resp := &pubsubpb.ListTopicSnapshotsResponse{}
  1360  		if pageToken != "" {
  1361  			req.PageToken = pageToken
  1362  		}
  1363  		if pageSize > math.MaxInt32 {
  1364  			req.PageSize = math.MaxInt32
  1365  		} else if pageSize != 0 {
  1366  			req.PageSize = int32(pageSize)
  1367  		}
  1368  		baseUrl, err := url.Parse(c.endpoint)
  1369  		if err != nil {
  1370  			return nil, "", err
  1371  		}
  1372  		baseUrl.Path += fmt.Sprintf("/v1/%v/snapshots", req.GetTopic())
  1373  
  1374  		params := url.Values{}
  1375  		params.Add("$alt", "json;enum-encoding=int")
  1376  		if req.GetPageSize() != 0 {
  1377  			params.Add("pageSize", fmt.Sprintf("%v", req.GetPageSize()))
  1378  		}
  1379  		if req.GetPageToken() != "" {
  1380  			params.Add("pageToken", fmt.Sprintf("%v", req.GetPageToken()))
  1381  		}
  1382  
  1383  		baseUrl.RawQuery = params.Encode()
  1384  
  1385  		// Build HTTP headers from client and context metadata.
  1386  		hds := append(c.xGoogHeaders, "Content-Type", "application/json")
  1387  		headers := gax.BuildHeaders(ctx, hds...)
  1388  		e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1389  			if settings.Path != "" {
  1390  				baseUrl.Path = settings.Path
  1391  			}
  1392  			httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
  1393  			if err != nil {
  1394  				return err
  1395  			}
  1396  			httpReq.Header = headers
  1397  
  1398  			httpRsp, err := c.httpClient.Do(httpReq)
  1399  			if err != nil {
  1400  				return err
  1401  			}
  1402  			defer httpRsp.Body.Close()
  1403  
  1404  			if err = googleapi.CheckResponse(httpRsp); err != nil {
  1405  				return err
  1406  			}
  1407  
  1408  			buf, err := io.ReadAll(httpRsp.Body)
  1409  			if err != nil {
  1410  				return err
  1411  			}
  1412  
  1413  			if err := unm.Unmarshal(buf, resp); err != nil {
  1414  				return err
  1415  			}
  1416  
  1417  			return nil
  1418  		}, opts...)
  1419  		if e != nil {
  1420  			return nil, "", e
  1421  		}
  1422  		it.Response = resp
  1423  		return resp.GetSnapshots(), resp.GetNextPageToken(), nil
  1424  	}
  1425  
  1426  	fetch := func(pageSize int, pageToken string) (string, error) {
  1427  		items, nextPageToken, err := it.InternalFetch(pageSize, pageToken)
  1428  		if err != nil {
  1429  			return "", err
  1430  		}
  1431  		it.items = append(it.items, items...)
  1432  		return nextPageToken, nil
  1433  	}
  1434  
  1435  	it.pageInfo, it.nextFunc = iterator.NewPageInfo(fetch, it.bufLen, it.takeBuf)
  1436  	it.pageInfo.MaxSize = int(req.GetPageSize())
  1437  	it.pageInfo.Token = req.GetPageToken()
  1438  
  1439  	return it
  1440  }
  1441  
  1442  // DeleteTopic deletes the topic with the given name. Returns NOT_FOUND if the topic
  1443  // does not exist. After a topic is deleted, a new topic may be created with
  1444  // the same name; this is an entirely new topic with none of the old
  1445  // configuration or subscriptions. Existing subscriptions to this topic are
  1446  // not deleted, but their topic field is set to _deleted-topic_.
  1447  func (c *publisherRESTClient) DeleteTopic(ctx context.Context, req *pubsubpb.DeleteTopicRequest, opts ...gax.CallOption) error {
  1448  	baseUrl, err := url.Parse(c.endpoint)
  1449  	if err != nil {
  1450  		return err
  1451  	}
  1452  	baseUrl.Path += fmt.Sprintf("/v1/%v", req.GetTopic())
  1453  
  1454  	params := url.Values{}
  1455  	params.Add("$alt", "json;enum-encoding=int")
  1456  
  1457  	baseUrl.RawQuery = params.Encode()
  1458  
  1459  	// Build HTTP headers from client and context metadata.
  1460  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "topic", url.QueryEscape(req.GetTopic()))}
  1461  
  1462  	hds = append(c.xGoogHeaders, hds...)
  1463  	hds = append(hds, "Content-Type", "application/json")
  1464  	headers := gax.BuildHeaders(ctx, hds...)
  1465  	return gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1466  		if settings.Path != "" {
  1467  			baseUrl.Path = settings.Path
  1468  		}
  1469  		httpReq, err := http.NewRequest("DELETE", baseUrl.String(), nil)
  1470  		if err != nil {
  1471  			return err
  1472  		}
  1473  		httpReq = httpReq.WithContext(ctx)
  1474  		httpReq.Header = headers
  1475  
  1476  		httpRsp, err := c.httpClient.Do(httpReq)
  1477  		if err != nil {
  1478  			return err
  1479  		}
  1480  		defer httpRsp.Body.Close()
  1481  
  1482  		// Returns nil if there is no error, otherwise wraps
  1483  		// the response code and body into a non-nil error
  1484  		return googleapi.CheckResponse(httpRsp)
  1485  	}, opts...)
  1486  }
  1487  
  1488  // DetachSubscription detaches a subscription from this topic. All messages retained in the
  1489  // subscription are dropped. Subsequent Pull and StreamingPull requests
  1490  // will return FAILED_PRECONDITION. If the subscription is a push
  1491  // subscription, pushes to the endpoint will stop.
  1492  func (c *publisherRESTClient) DetachSubscription(ctx context.Context, req *pubsubpb.DetachSubscriptionRequest, opts ...gax.CallOption) (*pubsubpb.DetachSubscriptionResponse, error) {
  1493  	baseUrl, err := url.Parse(c.endpoint)
  1494  	if err != nil {
  1495  		return nil, err
  1496  	}
  1497  	baseUrl.Path += fmt.Sprintf("/v1/%v:detach", req.GetSubscription())
  1498  
  1499  	params := url.Values{}
  1500  	params.Add("$alt", "json;enum-encoding=int")
  1501  
  1502  	baseUrl.RawQuery = params.Encode()
  1503  
  1504  	// Build HTTP headers from client and context metadata.
  1505  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "subscription", url.QueryEscape(req.GetSubscription()))}
  1506  
  1507  	hds = append(c.xGoogHeaders, hds...)
  1508  	hds = append(hds, "Content-Type", "application/json")
  1509  	headers := gax.BuildHeaders(ctx, hds...)
  1510  	opts = append((*c.CallOptions).DetachSubscription[0:len((*c.CallOptions).DetachSubscription):len((*c.CallOptions).DetachSubscription)], opts...)
  1511  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1512  	resp := &pubsubpb.DetachSubscriptionResponse{}
  1513  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1514  		if settings.Path != "" {
  1515  			baseUrl.Path = settings.Path
  1516  		}
  1517  		httpReq, err := http.NewRequest("POST", baseUrl.String(), nil)
  1518  		if err != nil {
  1519  			return err
  1520  		}
  1521  		httpReq = httpReq.WithContext(ctx)
  1522  		httpReq.Header = headers
  1523  
  1524  		httpRsp, err := c.httpClient.Do(httpReq)
  1525  		if err != nil {
  1526  			return err
  1527  		}
  1528  		defer httpRsp.Body.Close()
  1529  
  1530  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1531  			return err
  1532  		}
  1533  
  1534  		buf, err := io.ReadAll(httpRsp.Body)
  1535  		if err != nil {
  1536  			return err
  1537  		}
  1538  
  1539  		if err := unm.Unmarshal(buf, resp); err != nil {
  1540  			return err
  1541  		}
  1542  
  1543  		return nil
  1544  	}, opts...)
  1545  	if e != nil {
  1546  		return nil, e
  1547  	}
  1548  	return resp, nil
  1549  }
  1550  
  1551  // GetIamPolicy gets the access control policy for a resource. Returns an empty policy
  1552  // if the resource exists and does not have a policy set.
  1553  func (c *publisherRESTClient) GetIamPolicy(ctx context.Context, req *iampb.GetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
  1554  	baseUrl, err := url.Parse(c.endpoint)
  1555  	if err != nil {
  1556  		return nil, err
  1557  	}
  1558  	baseUrl.Path += fmt.Sprintf("/v1/%v:getIamPolicy", req.GetResource())
  1559  
  1560  	params := url.Values{}
  1561  	params.Add("$alt", "json;enum-encoding=int")
  1562  	if req.GetOptions().GetRequestedPolicyVersion() != 0 {
  1563  		params.Add("options.requestedPolicyVersion", fmt.Sprintf("%v", req.GetOptions().GetRequestedPolicyVersion()))
  1564  	}
  1565  
  1566  	baseUrl.RawQuery = params.Encode()
  1567  
  1568  	// Build HTTP headers from client and context metadata.
  1569  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
  1570  
  1571  	hds = append(c.xGoogHeaders, hds...)
  1572  	hds = append(hds, "Content-Type", "application/json")
  1573  	headers := gax.BuildHeaders(ctx, hds...)
  1574  	opts = append((*c.CallOptions).GetIamPolicy[0:len((*c.CallOptions).GetIamPolicy):len((*c.CallOptions).GetIamPolicy)], opts...)
  1575  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1576  	resp := &iampb.Policy{}
  1577  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1578  		if settings.Path != "" {
  1579  			baseUrl.Path = settings.Path
  1580  		}
  1581  		httpReq, err := http.NewRequest("GET", baseUrl.String(), nil)
  1582  		if err != nil {
  1583  			return err
  1584  		}
  1585  		httpReq = httpReq.WithContext(ctx)
  1586  		httpReq.Header = headers
  1587  
  1588  		httpRsp, err := c.httpClient.Do(httpReq)
  1589  		if err != nil {
  1590  			return err
  1591  		}
  1592  		defer httpRsp.Body.Close()
  1593  
  1594  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1595  			return err
  1596  		}
  1597  
  1598  		buf, err := io.ReadAll(httpRsp.Body)
  1599  		if err != nil {
  1600  			return err
  1601  		}
  1602  
  1603  		if err := unm.Unmarshal(buf, resp); err != nil {
  1604  			return err
  1605  		}
  1606  
  1607  		return nil
  1608  	}, opts...)
  1609  	if e != nil {
  1610  		return nil, e
  1611  	}
  1612  	return resp, nil
  1613  }
  1614  
  1615  // SetIamPolicy sets the access control policy on the specified resource. Replaces
  1616  // any existing policy.
  1617  //
  1618  // Can return NOT_FOUND, INVALID_ARGUMENT, and PERMISSION_DENIED
  1619  // errors.
  1620  func (c *publisherRESTClient) SetIamPolicy(ctx context.Context, req *iampb.SetIamPolicyRequest, opts ...gax.CallOption) (*iampb.Policy, error) {
  1621  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
  1622  	jsonReq, err := m.Marshal(req)
  1623  	if err != nil {
  1624  		return nil, err
  1625  	}
  1626  
  1627  	baseUrl, err := url.Parse(c.endpoint)
  1628  	if err != nil {
  1629  		return nil, err
  1630  	}
  1631  	baseUrl.Path += fmt.Sprintf("/v1/%v:setIamPolicy", req.GetResource())
  1632  
  1633  	params := url.Values{}
  1634  	params.Add("$alt", "json;enum-encoding=int")
  1635  
  1636  	baseUrl.RawQuery = params.Encode()
  1637  
  1638  	// Build HTTP headers from client and context metadata.
  1639  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
  1640  
  1641  	hds = append(c.xGoogHeaders, hds...)
  1642  	hds = append(hds, "Content-Type", "application/json")
  1643  	headers := gax.BuildHeaders(ctx, hds...)
  1644  	opts = append((*c.CallOptions).SetIamPolicy[0:len((*c.CallOptions).SetIamPolicy):len((*c.CallOptions).SetIamPolicy)], opts...)
  1645  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1646  	resp := &iampb.Policy{}
  1647  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1648  		if settings.Path != "" {
  1649  			baseUrl.Path = settings.Path
  1650  		}
  1651  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
  1652  		if err != nil {
  1653  			return err
  1654  		}
  1655  		httpReq = httpReq.WithContext(ctx)
  1656  		httpReq.Header = headers
  1657  
  1658  		httpRsp, err := c.httpClient.Do(httpReq)
  1659  		if err != nil {
  1660  			return err
  1661  		}
  1662  		defer httpRsp.Body.Close()
  1663  
  1664  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1665  			return err
  1666  		}
  1667  
  1668  		buf, err := io.ReadAll(httpRsp.Body)
  1669  		if err != nil {
  1670  			return err
  1671  		}
  1672  
  1673  		if err := unm.Unmarshal(buf, resp); err != nil {
  1674  			return err
  1675  		}
  1676  
  1677  		return nil
  1678  	}, opts...)
  1679  	if e != nil {
  1680  		return nil, e
  1681  	}
  1682  	return resp, nil
  1683  }
  1684  
  1685  // TestIamPermissions returns permissions that a caller has on the specified resource. If the
  1686  // resource does not exist, this will return an empty set of
  1687  // permissions, not a NOT_FOUND error.
  1688  //
  1689  // Note: This operation is designed to be used for building
  1690  // permission-aware UIs and command-line tools, not for authorization
  1691  // checking. This operation may “fail open” without warning.
  1692  func (c *publisherRESTClient) TestIamPermissions(ctx context.Context, req *iampb.TestIamPermissionsRequest, opts ...gax.CallOption) (*iampb.TestIamPermissionsResponse, error) {
  1693  	m := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
  1694  	jsonReq, err := m.Marshal(req)
  1695  	if err != nil {
  1696  		return nil, err
  1697  	}
  1698  
  1699  	baseUrl, err := url.Parse(c.endpoint)
  1700  	if err != nil {
  1701  		return nil, err
  1702  	}
  1703  	baseUrl.Path += fmt.Sprintf("/v1/%v:testIamPermissions", req.GetResource())
  1704  
  1705  	params := url.Values{}
  1706  	params.Add("$alt", "json;enum-encoding=int")
  1707  
  1708  	baseUrl.RawQuery = params.Encode()
  1709  
  1710  	// Build HTTP headers from client and context metadata.
  1711  	hds := []string{"x-goog-request-params", fmt.Sprintf("%s=%v", "resource", url.QueryEscape(req.GetResource()))}
  1712  
  1713  	hds = append(c.xGoogHeaders, hds...)
  1714  	hds = append(hds, "Content-Type", "application/json")
  1715  	headers := gax.BuildHeaders(ctx, hds...)
  1716  	opts = append((*c.CallOptions).TestIamPermissions[0:len((*c.CallOptions).TestIamPermissions):len((*c.CallOptions).TestIamPermissions)], opts...)
  1717  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
  1718  	resp := &iampb.TestIamPermissionsResponse{}
  1719  	e := gax.Invoke(ctx, func(ctx context.Context, settings gax.CallSettings) error {
  1720  		if settings.Path != "" {
  1721  			baseUrl.Path = settings.Path
  1722  		}
  1723  		httpReq, err := http.NewRequest("POST", baseUrl.String(), bytes.NewReader(jsonReq))
  1724  		if err != nil {
  1725  			return err
  1726  		}
  1727  		httpReq = httpReq.WithContext(ctx)
  1728  		httpReq.Header = headers
  1729  
  1730  		httpRsp, err := c.httpClient.Do(httpReq)
  1731  		if err != nil {
  1732  			return err
  1733  		}
  1734  		defer httpRsp.Body.Close()
  1735  
  1736  		if err = googleapi.CheckResponse(httpRsp); err != nil {
  1737  			return err
  1738  		}
  1739  
  1740  		buf, err := io.ReadAll(httpRsp.Body)
  1741  		if err != nil {
  1742  			return err
  1743  		}
  1744  
  1745  		if err := unm.Unmarshal(buf, resp); err != nil {
  1746  			return err
  1747  		}
  1748  
  1749  		return nil
  1750  	}, opts...)
  1751  	if e != nil {
  1752  		return nil, e
  1753  	}
  1754  	return resp, nil
  1755  }
  1756  

View as plain text