1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "time"
22
23 "cloud.google.com/go/internal/trace"
24 "cloud.google.com/go/internal/uid"
25 bq "google.golang.org/api/bigquery/v2"
26 )
27
28
29 type QueryConfig struct {
30
31
32 Dst *Table
33
34
35 Q string
36
37
38
39 DefaultProjectID string
40 DefaultDatasetID string
41
42
43
44
45
46
47 TableDefinitions map[string]ExternalData
48
49
50
51 CreateDisposition TableCreateDisposition
52
53
54
55 WriteDisposition TableWriteDisposition
56
57
58
59
60
61
62 DisableQueryCache bool
63
64
65
66
67
68 DisableFlattenedResults bool
69
70
71
72
73
74 AllowLargeResults bool
75
76
77
78
79 Priority QueryPriority
80
81
82
83
84 MaxBillingTier int
85
86
87
88
89
90
91 MaxBytesBilled int64
92
93
94
95 UseStandardSQL bool
96
97
98 UseLegacySQL bool
99
100
101
102
103
104
105 Parameters []QueryParameter
106
107
108
109 TimePartitioning *TimePartitioning
110
111
112
113 RangePartitioning *RangePartitioning
114
115
116 Clustering *Clustering
117
118
119 Labels map[string]string
120
121
122
123
124
125
126
127
128 DryRun bool
129
130
131 DestinationEncryptionConfig *EncryptionConfig
132
133
134
135 SchemaUpdateOptions []string
136
137
138 CreateSession bool
139
140
141 ConnectionProperties []*ConnectionProperty
142
143
144
145
146
147
148
149
150
151 JobTimeout time.Duration
152
153
154 forceStorageAPI bool
155 }
156
157 func (qc *QueryConfig) toBQ() (*bq.JobConfiguration, error) {
158 qconf := &bq.JobConfigurationQuery{
159 Query: qc.Q,
160 CreateDisposition: string(qc.CreateDisposition),
161 WriteDisposition: string(qc.WriteDisposition),
162 AllowLargeResults: qc.AllowLargeResults,
163 Priority: string(qc.Priority),
164 MaximumBytesBilled: qc.MaxBytesBilled,
165 TimePartitioning: qc.TimePartitioning.toBQ(),
166 RangePartitioning: qc.RangePartitioning.toBQ(),
167 Clustering: qc.Clustering.toBQ(),
168 DestinationEncryptionConfiguration: qc.DestinationEncryptionConfig.toBQ(),
169 SchemaUpdateOptions: qc.SchemaUpdateOptions,
170 CreateSession: qc.CreateSession,
171 }
172 if len(qc.TableDefinitions) > 0 {
173 qconf.TableDefinitions = make(map[string]bq.ExternalDataConfiguration)
174 }
175 for name, data := range qc.TableDefinitions {
176 qconf.TableDefinitions[name] = data.toBQ()
177 }
178 if qc.DefaultProjectID != "" || qc.DefaultDatasetID != "" {
179 qconf.DefaultDataset = &bq.DatasetReference{
180 DatasetId: qc.DefaultDatasetID,
181 ProjectId: qc.DefaultProjectID,
182 }
183 }
184 if tier := int64(qc.MaxBillingTier); tier > 0 {
185 qconf.MaximumBillingTier = &tier
186 }
187 f := false
188 if qc.DisableQueryCache {
189 qconf.UseQueryCache = &f
190 }
191 if qc.DisableFlattenedResults {
192 qconf.FlattenResults = &f
193
194 qconf.AllowLargeResults = true
195 }
196 if qc.UseStandardSQL && qc.UseLegacySQL {
197 return nil, errors.New("bigquery: cannot provide both UseStandardSQL and UseLegacySQL")
198 }
199 if len(qc.Parameters) > 0 && qc.UseLegacySQL {
200 return nil, errors.New("bigquery: cannot provide both Parameters (implying standard SQL) and UseLegacySQL")
201 }
202 ptrue := true
203 pfalse := false
204 if qc.UseLegacySQL {
205 qconf.UseLegacySql = &ptrue
206 } else {
207 qconf.UseLegacySql = &pfalse
208 }
209 if qc.Dst != nil && !qc.Dst.implicitTable() {
210 qconf.DestinationTable = qc.Dst.toBQ()
211 }
212 for _, p := range qc.Parameters {
213 qp, err := p.toBQ()
214 if err != nil {
215 return nil, err
216 }
217 qconf.QueryParameters = append(qconf.QueryParameters, qp)
218 }
219 if len(qc.ConnectionProperties) > 0 {
220 bqcp := make([]*bq.ConnectionProperty, len(qc.ConnectionProperties))
221 for k, v := range qc.ConnectionProperties {
222 bqcp[k] = v.toBQ()
223 }
224 qconf.ConnectionProperties = bqcp
225 }
226 jc := &bq.JobConfiguration{
227 Labels: qc.Labels,
228 DryRun: qc.DryRun,
229 Query: qconf,
230 }
231 if qc.JobTimeout > 0 {
232 jc.JobTimeoutMs = qc.JobTimeout.Milliseconds()
233 }
234 return jc, nil
235 }
236
237 func bqToQueryConfig(q *bq.JobConfiguration, c *Client) (*QueryConfig, error) {
238 qq := q.Query
239 qc := &QueryConfig{
240 Labels: q.Labels,
241 DryRun: q.DryRun,
242 JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
243 Q: qq.Query,
244 CreateDisposition: TableCreateDisposition(qq.CreateDisposition),
245 WriteDisposition: TableWriteDisposition(qq.WriteDisposition),
246 AllowLargeResults: qq.AllowLargeResults,
247 Priority: QueryPriority(qq.Priority),
248 MaxBytesBilled: qq.MaximumBytesBilled,
249 UseLegacySQL: qq.UseLegacySql == nil || *qq.UseLegacySql,
250 TimePartitioning: bqToTimePartitioning(qq.TimePartitioning),
251 RangePartitioning: bqToRangePartitioning(qq.RangePartitioning),
252 Clustering: bqToClustering(qq.Clustering),
253 DestinationEncryptionConfig: bqToEncryptionConfig(qq.DestinationEncryptionConfiguration),
254 SchemaUpdateOptions: qq.SchemaUpdateOptions,
255 CreateSession: qq.CreateSession,
256 }
257 qc.UseStandardSQL = !qc.UseLegacySQL
258
259 if len(qq.TableDefinitions) > 0 {
260 qc.TableDefinitions = make(map[string]ExternalData)
261 }
262 for name, qedc := range qq.TableDefinitions {
263 edc, err := bqToExternalDataConfig(&qedc)
264 if err != nil {
265 return nil, err
266 }
267 qc.TableDefinitions[name] = edc
268 }
269 if qq.DefaultDataset != nil {
270 qc.DefaultProjectID = qq.DefaultDataset.ProjectId
271 qc.DefaultDatasetID = qq.DefaultDataset.DatasetId
272 }
273 if qq.MaximumBillingTier != nil {
274 qc.MaxBillingTier = int(*qq.MaximumBillingTier)
275 }
276 if qq.UseQueryCache != nil && !*qq.UseQueryCache {
277 qc.DisableQueryCache = true
278 }
279 if qq.FlattenResults != nil && !*qq.FlattenResults {
280 qc.DisableFlattenedResults = true
281 }
282 if qq.DestinationTable != nil {
283 qc.Dst = bqToTable(qq.DestinationTable, c)
284 }
285 for _, qp := range qq.QueryParameters {
286 p, err := bqToQueryParameter(qp)
287 if err != nil {
288 return nil, err
289 }
290 qc.Parameters = append(qc.Parameters, p)
291 }
292 if len(qq.ConnectionProperties) > 0 {
293 props := make([]*ConnectionProperty, len(qq.ConnectionProperties))
294 for k, v := range qq.ConnectionProperties {
295 props[k] = bqToConnectionProperty(v)
296 }
297 qc.ConnectionProperties = props
298 }
299 return qc, nil
300 }
301
302
303 type QueryPriority string
304
305 const (
306
307
308
309
310
311
312
313
314
315 BatchPriority QueryPriority = "BATCH"
316
317
318
319
320
321
322
323 InteractivePriority QueryPriority = "INTERACTIVE"
324 )
325
326
327 type Query struct {
328 JobIDConfig
329 QueryConfig
330 client *Client
331 }
332
333
334
335 func (c *Client) Query(q string) *Query {
336 return &Query{
337 client: c,
338 QueryConfig: QueryConfig{Q: q},
339 }
340 }
341
342
343 func (q *Query) Run(ctx context.Context) (j *Job, err error) {
344 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
345 defer func() { trace.EndSpan(ctx, err) }()
346
347 job, err := q.newJob()
348 if err != nil {
349 return nil, err
350 }
351 j, err = q.client.insertJob(ctx, job, nil)
352 if err != nil {
353 return nil, err
354 }
355 return j, nil
356 }
357
358 func (q *Query) newJob() (*bq.Job, error) {
359 config, err := q.QueryConfig.toBQ()
360 if err != nil {
361 return nil, err
362 }
363 return &bq.Job{
364 JobReference: q.JobIDConfig.createJobRef(q.client),
365 Configuration: config,
366 }, nil
367 }
368
369
370
371
372
373 func (q *Query) Read(ctx context.Context) (it *RowIterator, err error) {
374 if q.QueryConfig.DryRun {
375 return nil, errors.New("bigquery: cannot evaluate Query.Read() for dry-run queries")
376 }
377 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Query.Run")
378 defer func() { trace.EndSpan(ctx, err) }()
379 queryRequest, err := q.probeFastPath()
380 if err != nil {
381
382 job, err := q.Run(ctx)
383 if err != nil {
384 return nil, err
385 }
386 return job.Read(ctx)
387 }
388
389
390 resp, err := q.client.runQuery(ctx, queryRequest)
391 if err != nil {
392 return nil, err
393 }
394
395
396 var minimalJob *Job
397 if resp.JobReference != nil {
398 minimalJob = &Job{
399 c: q.client,
400 jobID: resp.JobReference.JobId,
401 location: resp.JobReference.Location,
402 projectID: resp.JobReference.ProjectId,
403 }
404 }
405
406 if resp.JobComplete {
407
408 if resp.PageToken != "" && q.client.isStorageReadAvailable() {
409 it, err = newStorageRowIteratorFromJob(ctx, minimalJob)
410 if err == nil {
411 return it, nil
412 }
413 }
414 rowSource := &rowSource{
415 j: minimalJob,
416 queryID: resp.QueryId,
417
418 cachedRows: resp.Rows,
419 cachedSchema: resp.Schema,
420 cachedNextToken: resp.PageToken,
421 }
422 return newRowIterator(ctx, rowSource, fetchPage), nil
423 }
424
425
426
427
428
429
430 minimalJob.config = &bq.JobConfiguration{
431 Query: &bq.JobConfigurationQuery{},
432 }
433
434 return minimalJob.Read(ctx)
435 }
436
437
438
439
440 func (q *Query) probeFastPath() (*bq.QueryRequest, error) {
441 if q.forceStorageAPI && q.client.isStorageReadAvailable() {
442 return nil, fmt.Errorf("force Storage API usage")
443 }
444
445
446
447 if q.QueryConfig.Dst != nil ||
448 q.QueryConfig.TableDefinitions != nil ||
449 q.QueryConfig.CreateDisposition != "" ||
450 q.QueryConfig.WriteDisposition != "" ||
451 !(q.QueryConfig.Priority == "" || q.QueryConfig.Priority == InteractivePriority) ||
452 q.QueryConfig.UseLegacySQL ||
453 q.QueryConfig.MaxBillingTier != 0 ||
454 q.QueryConfig.TimePartitioning != nil ||
455 q.QueryConfig.RangePartitioning != nil ||
456 q.QueryConfig.Clustering != nil ||
457 q.QueryConfig.DestinationEncryptionConfig != nil ||
458 q.QueryConfig.SchemaUpdateOptions != nil ||
459 q.QueryConfig.JobTimeout != 0 ||
460
461 q.JobIDConfig.JobID != "" {
462 return nil, fmt.Errorf("QueryConfig incompatible with fastPath")
463 }
464 pfalse := false
465 qRequest := &bq.QueryRequest{
466 Query: q.QueryConfig.Q,
467 CreateSession: q.CreateSession,
468 Location: q.Location,
469 UseLegacySql: &pfalse,
470 MaximumBytesBilled: q.QueryConfig.MaxBytesBilled,
471 RequestId: uid.NewSpace("request", nil).New(),
472 Labels: q.Labels,
473 FormatOptions: &bq.DataFormatOptions{
474 UseInt64Timestamp: true,
475 },
476 }
477 if q.QueryConfig.DisableQueryCache {
478 qRequest.UseQueryCache = &pfalse
479 }
480
481 for _, p := range q.QueryConfig.Parameters {
482 qp, err := p.toBQ()
483 if err != nil {
484 return nil, err
485 }
486 qRequest.QueryParameters = append(qRequest.QueryParameters, qp)
487 }
488 if q.QueryConfig.DefaultDatasetID != "" {
489 qRequest.DefaultDataset = &bq.DatasetReference{
490 ProjectId: q.QueryConfig.DefaultProjectID,
491 DatasetId: q.QueryConfig.DefaultDatasetID,
492 }
493 }
494 if q.client.enableQueryPreview {
495 qRequest.JobCreationMode = "JOB_CREATION_OPTIONAL"
496 }
497 return qRequest, nil
498 }
499
500
501 type ConnectionProperty struct {
502
503 Key string
504
505 Value string
506 }
507
508 func (cp *ConnectionProperty) toBQ() *bq.ConnectionProperty {
509 if cp == nil {
510 return nil
511 }
512 return &bq.ConnectionProperty{
513 Key: cp.Key,
514 Value: cp.Value,
515 }
516 }
517
518 func bqToConnectionProperty(in *bq.ConnectionProperty) *ConnectionProperty {
519 if in == nil {
520 return nil
521 }
522 return &ConnectionProperty{
523 Key: in.Key,
524 Value: in.Value,
525 }
526 }
527
View as plain text