1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "io"
22 "net/http"
23 "net/url"
24 "os"
25 "strings"
26 "time"
27
28 "cloud.google.com/go/bigquery/internal"
29 cloudinternal "cloud.google.com/go/internal"
30 "cloud.google.com/go/internal/detect"
31 "cloud.google.com/go/internal/trace"
32 "cloud.google.com/go/internal/version"
33 gax "github.com/googleapis/gax-go/v2"
34 bq "google.golang.org/api/bigquery/v2"
35 "google.golang.org/api/googleapi"
36 "google.golang.org/api/option"
37 )
38
const (
	// Scope is the OAuth2 scope for the BigQuery API. NewClient passes it
	// to option.WithScopes ahead of any caller-supplied options.
	Scope = "https://www.googleapis.com/auth/bigquery"

	// userAgentPrefix is combined with the library version ("prefix/version")
	// to build the user agent that NewClient configures on the service.
	userAgentPrefix = "gcloud-golang-bigquery"
)
46
47 var xGoogHeader = fmt.Sprintf("gl-go/%s gccl/%s", version.Go(), internal.Version)
48
49 func setClientHeader(headers http.Header) {
50 headers.Set("x-goog-api-client", xGoogHeader)
51 }
52
53
54 type Client struct {
55
56
57
58 Location string
59
60 projectID string
61 bqs *bq.Service
62 rc *readClient
63
64
65 enableQueryPreview bool
66 }
67
68
69
70
71
72
73
74
// DetectProjectID is a placeholder project ID value. NewClient forwards the
// project ID it is given to detect.ProjectID, which presumably recognizes this
// sentinel and resolves the real project from the available credentials
// (confirm against the detect package).
const DetectProjectID = "*detect-project-id*"
76
77
78
79
80
81
82
83
84
85
86
87
88 func NewClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*Client, error) {
89 o := []option.ClientOption{
90 option.WithScopes(Scope),
91 option.WithUserAgent(fmt.Sprintf("%s/%s", userAgentPrefix, internal.Version)),
92 }
93 o = append(o, opts...)
94 bqs, err := bq.NewService(ctx, o...)
95 if err != nil {
96 return nil, fmt.Errorf("bigquery: constructing client: %w", err)
97 }
98
99
100 projectID, err = detect.ProjectID(ctx, projectID, "", opts...)
101 if err != nil {
102 return nil, err
103 }
104
105 var preview bool
106 if v, ok := os.LookupEnv("QUERY_PREVIEW_ENABLED"); ok {
107 if strings.ToUpper(v) == "TRUE" {
108 preview = true
109 }
110 }
111
112 c := &Client{
113 projectID: projectID,
114 bqs: bqs,
115 enableQueryPreview: preview,
116 }
117 return c, nil
118 }
119
120
121
122
123
124
125 func (c *Client) EnableStorageReadClient(ctx context.Context, opts ...option.ClientOption) error {
126 if c.isStorageReadAvailable() {
127 return fmt.Errorf("failed: storage read client already set up")
128 }
129 rc, err := newReadClient(ctx, c.projectID, opts...)
130 if err != nil {
131 return err
132 }
133 c.rc = rc
134 return nil
135 }
136
137 func (c *Client) isStorageReadAvailable() bool {
138 return c.rc != nil
139 }
140
141
142
143 func (c *Client) Project() string {
144 return c.projectID
145 }
146
147
148
149
150 func (c *Client) Close() error {
151 if c.isStorageReadAvailable() {
152 err := c.rc.close()
153 if err != nil {
154 return err
155 }
156 }
157 return nil
158 }
159
160
161 func (c *Client) insertJob(ctx context.Context, job *bq.Job, media io.Reader, mediaOpts ...googleapi.MediaOption) (*Job, error) {
162 call := c.bqs.Jobs.Insert(c.projectID, job).Context(ctx)
163 setClientHeader(call.Header())
164 if media != nil {
165 call.Media(media, mediaOpts...)
166 }
167 var res *bq.Job
168 var err error
169 invoke := func() error {
170 sCtx := trace.StartSpan(ctx, "bigquery.jobs.insert")
171 res, err = call.Do()
172 trace.EndSpan(sCtx, err)
173 return err
174 }
175
176
177
178
179
180 if job.JobReference != nil && media == nil {
181
182 err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
183 } else {
184 err = invoke()
185 }
186 if err != nil {
187 return nil, err
188 }
189 return bqToJob(res, c)
190 }
191
192
193
194
195 func (c *Client) runQuery(ctx context.Context, queryRequest *bq.QueryRequest) (*bq.QueryResponse, error) {
196 call := c.bqs.Jobs.Query(c.projectID, queryRequest).Context(ctx)
197 setClientHeader(call.Header())
198
199 var res *bq.QueryResponse
200 var err error
201 invoke := func() error {
202 sCtx := trace.StartSpan(ctx, "bigquery.jobs.query")
203 res, err = call.Do()
204 trace.EndSpan(sCtx, err)
205 return err
206 }
207
208
209 err = runWithRetryExplicit(ctx, invoke, jobRetryReasons)
210 if err != nil {
211 return nil, err
212 }
213 return res, nil
214 }
215
216
217
218
// unixMillisToTime converts m, a count of milliseconds since the Unix epoch,
// into a time.Time.
//
// An input of zero is treated specially: it yields the zero time.Time rather
// than the start of the epoch.
func unixMillisToTime(m int64) time.Time {
	if m == 0 {
		return time.Time{}
	}
	// Split into whole seconds and a sub-second remainder before scaling to
	// nanoseconds. Multiplying the full millisecond count by 1e6 overflows
	// int64 once |m| exceeds roughly 9.2e12 and silently yields a wrong time.
	return time.Unix(m/1e3, (m%1e3)*1e6)
}
225
226
227
228
229
230 func runWithRetry(ctx context.Context, call func() error) error {
231 return runWithRetryExplicit(ctx, call, defaultRetryReasons)
232 }
233
234 func runWithRetryExplicit(ctx context.Context, call func() error, allowedReasons []string) error {
235
236 backoff := gax.Backoff{
237 Initial: 1 * time.Second,
238 Max: 32 * time.Second,
239 Multiplier: 2,
240 }
241 return cloudinternal.Retry(ctx, backoff, func() (stop bool, err error) {
242 err = call()
243 if err == nil {
244 return true, nil
245 }
246 return !retryableError(err, allowedReasons), err
247 })
248 }
249
var (
	// defaultRetryReasons is the set of error reasons that runWithRetry
	// treats as retryable.
	defaultRetryReasons = []string{"backendError", "rateLimitExceeded"}

	// jobRetryReasons is the wider set of error reasons used by insertJob
	// and runQuery: the default reasons plus "jobRateLimitExceeded" and
	// "internalError".
	jobRetryReasons = []string{"backendError", "rateLimitExceeded", "jobRateLimitExceeded", "internalError"}

	// retry5xxCodes lists the HTTP status codes that make a *googleapi.Error
	// retryable in retryableError, whatever its reason.
	retry5xxCodes = []int{
		http.StatusInternalServerError,
		http.StatusBadGateway,
		http.StatusServiceUnavailable,
		http.StatusGatewayTimeout,
	}
)
264
265
266
267 func retryableError(err error, allowedReasons []string) bool {
268 if err == nil {
269 return false
270 }
271 if err == io.ErrUnexpectedEOF {
272 return true
273 }
274
275
276
277
278
279 if err.Error() == "http2: stream closed" {
280 return true
281 }
282
283 switch e := err.(type) {
284 case *googleapi.Error:
285
286 var reason string
287 if len(e.Errors) > 0 {
288 reason = e.Errors[0].Reason
289 for _, r := range allowedReasons {
290 if reason == r {
291 return true
292 }
293 }
294 }
295 for _, code := range retry5xxCodes {
296 if e.Code == code {
297 return true
298 }
299 }
300 case *url.Error:
301 retryable := []string{"connection refused", "connection reset"}
302 for _, s := range retryable {
303 if strings.Contains(e.Error(), s) {
304 return true
305 }
306 }
307 case interface{ Temporary() bool }:
308 if e.Temporary() {
309 return true
310 }
311 }
312
313 return retryableError(errors.Unwrap(err), allowedReasons)
314 }
315
View as plain text