...

Source file src/cloud.google.com/go/pubsub/pubsub.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2014 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub // import "cloud.google.com/go/pubsub"
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"os"
    22  	"reflect"
    23  	"runtime"
    24  	"strings"
    25  	"time"
    26  
    27  	"cloud.google.com/go/internal/detect"
    28  	vkit "cloud.google.com/go/pubsub/apiv1"
    29  	"cloud.google.com/go/pubsub/internal"
    30  	gax "github.com/googleapis/gax-go/v2"
    31  	"google.golang.org/api/option"
    32  	"google.golang.org/api/option/internaloption"
    33  	"google.golang.org/grpc"
    34  	"google.golang.org/grpc/keepalive"
    35  )
    36  
    37  const (
    38  	// ScopePubSub grants permissions to view and manage Pub/Sub
    39  	// topics and subscriptions.
    40  	ScopePubSub = "https://www.googleapis.com/auth/pubsub"
    41  
    42  	// ScopeCloudPlatform grants permissions to view and manage your data
    43  	// across Google Cloud Platform services.
    44  	ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
    45  )
    46  
    47  // Client is a Google Pub/Sub client scoped to a single project.
    48  //
    49  // Clients should be reused rather than being created as needed.
    50  // A Client may be shared by multiple goroutines.
    51  type Client struct {
    52  	projectID string
    53  	pubc      *vkit.PublisherClient
    54  	subc      *vkit.SubscriberClient
    55  }
    56  
    57  // ClientConfig has configurations for the client.
    58  type ClientConfig struct {
    59  	PublisherCallOptions  *vkit.PublisherCallOptions
    60  	SubscriberCallOptions *vkit.SubscriberCallOptions
    61  }
    62  
    63  // mergePublisherCallOptions merges two PublisherCallOptions into one and the first argument has
    64  // a lower order of precedence than the second one. If either is nil, return the other.
    65  func mergePublisherCallOptions(a *vkit.PublisherCallOptions, b *vkit.PublisherCallOptions) *vkit.PublisherCallOptions {
    66  	if a == nil {
    67  		return b
    68  	}
    69  	if b == nil {
    70  		return a
    71  	}
    72  	res := &vkit.PublisherCallOptions{}
    73  	resVal := reflect.ValueOf(res).Elem()
    74  	aVal := reflect.ValueOf(a).Elem()
    75  	bVal := reflect.ValueOf(b).Elem()
    76  
    77  	t := aVal.Type()
    78  
    79  	for i := 0; i < aVal.NumField(); i++ {
    80  		fieldName := t.Field(i).Name
    81  
    82  		aFieldVal := aVal.Field(i).Interface().([]gax.CallOption)
    83  		bFieldVal := bVal.Field(i).Interface().([]gax.CallOption)
    84  
    85  		merged := append(aFieldVal, bFieldVal...)
    86  		resVal.FieldByName(fieldName).Set(reflect.ValueOf(merged))
    87  	}
    88  	return res
    89  }
    90  
    91  // mergeSubscribercallOptions merges two SubscriberCallOptions into one and the first argument has
    92  // a lower order of precedence than the second one. If either is nil, the other is used.
    93  func mergeSubscriberCallOptions(a *vkit.SubscriberCallOptions, b *vkit.SubscriberCallOptions) *vkit.SubscriberCallOptions {
    94  	if a == nil {
    95  		return b
    96  	}
    97  	if b == nil {
    98  		return a
    99  	}
   100  	res := &vkit.SubscriberCallOptions{}
   101  	resVal := reflect.ValueOf(res).Elem()
   102  	aVal := reflect.ValueOf(a).Elem()
   103  	bVal := reflect.ValueOf(b).Elem()
   104  
   105  	t := aVal.Type()
   106  
   107  	for i := 0; i < aVal.NumField(); i++ {
   108  		fieldName := t.Field(i).Name
   109  
   110  		aFieldVal := aVal.Field(i).Interface().([]gax.CallOption)
   111  		bFieldVal := bVal.Field(i).Interface().([]gax.CallOption)
   112  
   113  		merged := append(aFieldVal, bFieldVal...)
   114  		resVal.FieldByName(fieldName).Set(reflect.ValueOf(merged))
   115  	}
   116  	return res
   117  }
   118  
   119  // DetectProjectID is a sentinel value that instructs NewClient to detect the
   120  // project ID. It is given in place of the projectID argument. NewClient will
   121  // use the project ID from the given credentials or the default credentials
   122  // (https://developers.google.com/accounts/docs/application-default-credentials)
   123  // if no credentials were provided. When providing credentials, not all
   124  // options will allow NewClient to extract the project ID. Specifically a JWT
   125  // does not have the project ID encoded.
   126  const DetectProjectID = "*detect-project-id*"
   127  
   128  // ErrEmptyProjectID denotes that the project string passed into NewClient was empty.
   129  // Please provide a valid project ID or use the DetectProjectID sentinel value to detect
   130  // project ID from well defined sources.
   131  var ErrEmptyProjectID = errors.New("pubsub: projectID string is empty")
   132  
   133  // NewClient creates a new PubSub client. It uses a default configuration.
   134  func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
   135  	return NewClientWithConfig(ctx, projectID, nil, opts...)
   136  }
   137  
   138  // NewClientWithConfig creates a new PubSub client.
   139  func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
   140  	if projectID == "" {
   141  		return nil, ErrEmptyProjectID
   142  	}
   143  	var o []option.ClientOption
   144  	// Environment variables for gcloud emulator:
   145  	// https://cloud.google.com/sdk/gcloud/reference/beta/emulators/pubsub/
   146  	if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" {
   147  		emulatorOpts := []option.ClientOption{
   148  			option.WithEndpoint(addr),
   149  			option.WithGRPCDialOption(grpc.WithInsecure()),
   150  			option.WithoutAuthentication(),
   151  			option.WithTelemetryDisabled(),
   152  			internaloption.SkipDialSettingsValidation(),
   153  		}
   154  		opts = append(emulatorOpts, opts...)
   155  	} else {
   156  		numConns := runtime.GOMAXPROCS(0)
   157  		if numConns > 4 {
   158  			numConns = 4
   159  		}
   160  		o = []option.ClientOption{
   161  			// Create multiple connections to increase throughput.
   162  			option.WithGRPCConnectionPool(numConns),
   163  			option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
   164  				Time: 5 * time.Minute,
   165  			})),
   166  		}
   167  	}
   168  	o = append(o, opts...)
   169  	pubc, err := vkit.NewPublisherClient(ctx, o...)
   170  	if err != nil {
   171  		return nil, fmt.Errorf("pubsub(publisher): %w", err)
   172  	}
   173  	subc, err := vkit.NewSubscriberClient(ctx, o...)
   174  	if err != nil {
   175  		return nil, fmt.Errorf("pubsub(subscriber): %w", err)
   176  	}
   177  	if config != nil {
   178  		pubc.CallOptions = mergePublisherCallOptions(pubc.CallOptions, config.PublisherCallOptions)
   179  		subc.CallOptions = mergeSubscriberCallOptions(subc.CallOptions, config.SubscriberCallOptions)
   180  	}
   181  	pubc.SetGoogleClientInfo("gccl", internal.Version)
   182  	subc.SetGoogleClientInfo("gccl", internal.Version)
   183  
   184  	// Handle project autodetection.
   185  	projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
   186  	if err != nil {
   187  		return nil, err
   188  	}
   189  
   190  	return &Client{
   191  		projectID: projectID,
   192  		pubc:      pubc,
   193  		subc:      subc,
   194  	}, nil
   195  }
   196  
   197  // Project returns the project ID or number for this instance of the client, which may have
   198  // either been explicitly specified or autodetected.
   199  func (c *Client) Project() string {
   200  	return c.projectID
   201  }
   202  
   203  // Close releases any resources held by the client,
   204  // such as memory and goroutines.
   205  //
   206  // If the client is available for the lifetime of the program, then Close need not be
   207  // called at exit.
   208  func (c *Client) Close() error {
   209  	pubErr := c.pubc.Close()
   210  	subErr := c.subc.Close()
   211  	if pubErr != nil {
   212  		return fmt.Errorf("pubsub publisher closing error: %w", pubErr)
   213  	}
   214  	if subErr != nil {
   215  		// Suppress client connection closing errors. This will only happen
   216  		// when using the client in conjunction with the Pub/Sub emulator
   217  		// or fake (pstest). Closing both clients separately will never
   218  		// return this error against the live Pub/Sub service.
   219  		if strings.Contains(subErr.Error(), "the client connection is closing") {
   220  			return nil
   221  		}
   222  		return fmt.Errorf("pubsub subscriber closing error: %w", subErr)
   223  	}
   224  	return nil
   225  }
   226  
   227  func (c *Client) fullyQualifiedProjectName() string {
   228  	return fmt.Sprintf("projects/%s", c.projectID)
   229  }
   230  

View as plain text