1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "os"
22 "reflect"
23 "runtime"
24 "strings"
25 "time"
26
27 "cloud.google.com/go/internal/detect"
28 vkit "cloud.google.com/go/pubsub/apiv1"
29 "cloud.google.com/go/pubsub/internal"
30 gax "github.com/googleapis/gax-go/v2"
31 "google.golang.org/api/option"
32 "google.golang.org/api/option/internaloption"
33 "google.golang.org/grpc"
34 "google.golang.org/grpc/keepalive"
35 )
36
37 const (
38
39
40 ScopePubSub = "https://www.googleapis.com/auth/pubsub"
41
42
43
44 ScopeCloudPlatform = "https://www.googleapis.com/auth/cloud-platform"
45 )
46
47
48
49
50
51 type Client struct {
52 projectID string
53 pubc *vkit.PublisherClient
54 subc *vkit.SubscriberClient
55 }
56
57
58 type ClientConfig struct {
59 PublisherCallOptions *vkit.PublisherCallOptions
60 SubscriberCallOptions *vkit.SubscriberCallOptions
61 }
62
63
64
65 func mergePublisherCallOptions(a *vkit.PublisherCallOptions, b *vkit.PublisherCallOptions) *vkit.PublisherCallOptions {
66 if a == nil {
67 return b
68 }
69 if b == nil {
70 return a
71 }
72 res := &vkit.PublisherCallOptions{}
73 resVal := reflect.ValueOf(res).Elem()
74 aVal := reflect.ValueOf(a).Elem()
75 bVal := reflect.ValueOf(b).Elem()
76
77 t := aVal.Type()
78
79 for i := 0; i < aVal.NumField(); i++ {
80 fieldName := t.Field(i).Name
81
82 aFieldVal := aVal.Field(i).Interface().([]gax.CallOption)
83 bFieldVal := bVal.Field(i).Interface().([]gax.CallOption)
84
85 merged := append(aFieldVal, bFieldVal...)
86 resVal.FieldByName(fieldName).Set(reflect.ValueOf(merged))
87 }
88 return res
89 }
90
91
92
93 func mergeSubscriberCallOptions(a *vkit.SubscriberCallOptions, b *vkit.SubscriberCallOptions) *vkit.SubscriberCallOptions {
94 if a == nil {
95 return b
96 }
97 if b == nil {
98 return a
99 }
100 res := &vkit.SubscriberCallOptions{}
101 resVal := reflect.ValueOf(res).Elem()
102 aVal := reflect.ValueOf(a).Elem()
103 bVal := reflect.ValueOf(b).Elem()
104
105 t := aVal.Type()
106
107 for i := 0; i < aVal.NumField(); i++ {
108 fieldName := t.Field(i).Name
109
110 aFieldVal := aVal.Field(i).Interface().([]gax.CallOption)
111 bFieldVal := bVal.Field(i).Interface().([]gax.CallOption)
112
113 merged := append(aFieldVal, bFieldVal...)
114 resVal.FieldByName(fieldName).Set(reflect.ValueOf(merged))
115 }
116 return res
117 }
118
119
120
121
122
123
124
125
126 const DetectProjectID = "*detect-project-id*"
127
128
129
130
131 var ErrEmptyProjectID = errors.New("pubsub: projectID string is empty")
132
133
134 func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (c *Client, err error) {
135 return NewClientWithConfig(ctx, projectID, nil, opts...)
136 }
137
138
139 func NewClientWithConfig(ctx context.Context, projectID string, config *ClientConfig, opts ...option.ClientOption) (c *Client, err error) {
140 if projectID == "" {
141 return nil, ErrEmptyProjectID
142 }
143 var o []option.ClientOption
144
145
146 if addr := os.Getenv("PUBSUB_EMULATOR_HOST"); addr != "" {
147 emulatorOpts := []option.ClientOption{
148 option.WithEndpoint(addr),
149 option.WithGRPCDialOption(grpc.WithInsecure()),
150 option.WithoutAuthentication(),
151 option.WithTelemetryDisabled(),
152 internaloption.SkipDialSettingsValidation(),
153 }
154 opts = append(emulatorOpts, opts...)
155 } else {
156 numConns := runtime.GOMAXPROCS(0)
157 if numConns > 4 {
158 numConns = 4
159 }
160 o = []option.ClientOption{
161
162 option.WithGRPCConnectionPool(numConns),
163 option.WithGRPCDialOption(grpc.WithKeepaliveParams(keepalive.ClientParameters{
164 Time: 5 * time.Minute,
165 })),
166 }
167 }
168 o = append(o, opts...)
169 pubc, err := vkit.NewPublisherClient(ctx, o...)
170 if err != nil {
171 return nil, fmt.Errorf("pubsub(publisher): %w", err)
172 }
173 subc, err := vkit.NewSubscriberClient(ctx, o...)
174 if err != nil {
175 return nil, fmt.Errorf("pubsub(subscriber): %w", err)
176 }
177 if config != nil {
178 pubc.CallOptions = mergePublisherCallOptions(pubc.CallOptions, config.PublisherCallOptions)
179 subc.CallOptions = mergeSubscriberCallOptions(subc.CallOptions, config.SubscriberCallOptions)
180 }
181 pubc.SetGoogleClientInfo("gccl", internal.Version)
182 subc.SetGoogleClientInfo("gccl", internal.Version)
183
184
185 projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
186 if err != nil {
187 return nil, err
188 }
189
190 return &Client{
191 projectID: projectID,
192 pubc: pubc,
193 subc: subc,
194 }, nil
195 }
196
197
198
199 func (c *Client) Project() string {
200 return c.projectID
201 }
202
203
204
205
206
207
208 func (c *Client) Close() error {
209 pubErr := c.pubc.Close()
210 subErr := c.subc.Close()
211 if pubErr != nil {
212 return fmt.Errorf("pubsub publisher closing error: %w", pubErr)
213 }
214 if subErr != nil {
215
216
217
218
219 if strings.Contains(subErr.Error(), "the client connection is closing") {
220 return nil
221 }
222 return fmt.Errorf("pubsub subscriber closing error: %w", subErr)
223 }
224 return nil
225 }
226
227 func (c *Client) fullyQualifiedProjectName() string {
228 return fmt.Sprintf("projects/%s", c.projectID)
229 }
230
View as plain text