1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "time"
22
23 "cloud.google.com/go/internal"
24 "cloud.google.com/go/internal/trace"
25 gax "github.com/googleapis/gax-go/v2"
26 bq "google.golang.org/api/bigquery/v2"
27 "google.golang.org/api/googleapi"
28 "google.golang.org/api/iterator"
29 )
30
31
32 type Job struct {
33 c *Client
34 projectID string
35 jobID string
36 location string
37 email string
38 config *bq.JobConfiguration
39 lastStatus *JobStatus
40 }
41
42
43
44
45
46
47
48 func (c *Client) JobFromID(ctx context.Context, id string) (*Job, error) {
49 return c.JobFromProject(ctx, c.projectID, id, c.Location)
50 }
51
52
53
54
55 func (c *Client) JobFromIDLocation(ctx context.Context, id, location string) (j *Job, err error) {
56 return c.JobFromProject(ctx, c.projectID, id, location)
57 }
58
59
60
61
62 func (c *Client) JobFromProject(ctx context.Context, projectID, jobID, location string) (j *Job, err error) {
63 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.JobFromProject")
64 defer func() { trace.EndSpan(ctx, err) }()
65
66 bqjob, err := c.getJobInternal(ctx, jobID, location, projectID, "user_email", "configuration", "jobReference", "status", "statistics")
67 if err != nil {
68 return nil, err
69 }
70 return bqToJob(bqjob, c)
71 }
72
73
74 func (j *Job) ProjectID() string {
75 return j.projectID
76 }
77
78
79 func (j *Job) ID() string {
80 return j.jobID
81 }
82
83
84 func (j *Job) Location() string {
85 return j.location
86 }
87
88
89 func (j *Job) Email() string {
90 return j.email
91 }
92
93
94 type State int
95
96 const (
97
98 StateUnspecified State = iota
99
100 Pending
101
102 Running
103
104 Done
105 )
106
107
108 type JobStatus struct {
109 State State
110
111 err error
112
113
114
115 Errors []*Error
116
117
118 Statistics *JobStatistics
119 }
120
121
122
123 type JobConfig interface {
124 isJobConfig()
125 }
126
127 func (*CopyConfig) isJobConfig() {}
128 func (*ExtractConfig) isJobConfig() {}
129 func (*LoadConfig) isJobConfig() {}
130 func (*QueryConfig) isJobConfig() {}
131
132
133 func (j *Job) Config() (JobConfig, error) {
134 return bqToJobConfig(j.config, j.c)
135 }
136
137
138
139
140 func (j *Job) Children(ctx context.Context) *JobIterator {
141 it := j.c.Jobs(ctx)
142 it.ParentJobID = j.ID()
143 return it
144 }
145
146 func bqToJobConfig(q *bq.JobConfiguration, c *Client) (JobConfig, error) {
147 switch {
148 case q == nil:
149 return nil, nil
150 case q.Copy != nil:
151 return bqToCopyConfig(q, c), nil
152 case q.Extract != nil:
153 return bqToExtractConfig(q, c), nil
154 case q.Load != nil:
155 return bqToLoadConfig(q, c), nil
156 case q.Query != nil:
157 return bqToQueryConfig(q, c)
158 default:
159 return nil, nil
160 }
161 }
162
163
164 type JobIDConfig struct {
165
166 JobID string
167
168
169 AddJobIDSuffix bool
170
171
172 Location string
173
174
175 ProjectID string
176 }
177
178
179 func (j *JobIDConfig) createJobRef(c *Client) *bq.JobReference {
180 projectID := j.ProjectID
181 if projectID == "" {
182 projectID = c.projectID
183 }
184 loc := j.Location
185 if loc == "" {
186 loc = c.Location
187 }
188 jr := &bq.JobReference{ProjectId: projectID, Location: loc}
189 if j.JobID == "" {
190 jr.JobId = randomIDFn()
191 } else if j.AddJobIDSuffix {
192 jr.JobId = j.JobID + "-" + randomIDFn()
193 } else {
194 jr.JobId = j.JobID
195 }
196 return jr
197 }
198
199
200
201 func (s *JobStatus) Done() bool {
202 return s.State == Done
203 }
204
205
206 func (s *JobStatus) Err() error {
207 return s.err
208 }
209
210
211 func (j *Job) Status(ctx context.Context) (js *JobStatus, err error) {
212 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Status")
213 defer func() { trace.EndSpan(ctx, err) }()
214
215 bqjob, err := j.c.getJobInternal(ctx, j.jobID, j.location, j.projectID, "status", "statistics")
216 if err != nil {
217 return nil, err
218 }
219 if err := j.setStatus(bqjob.Status); err != nil {
220 return nil, err
221 }
222 j.setStatistics(bqjob.Statistics, j.c)
223 return j.lastStatus, nil
224 }
225
226
227
228
229 func (j *Job) LastStatus() *JobStatus {
230 return j.lastStatus
231 }
232
233
234
235
236 func (j *Job) Cancel(ctx context.Context) error {
237
238
239
240
241
242 call := j.c.bqs.Jobs.Cancel(j.projectID, j.jobID).
243 Location(j.location).
244 Fields().
245 Context(ctx)
246 setClientHeader(call.Header())
247 return runWithRetry(ctx, func() error {
248 sCtx := trace.StartSpan(ctx, "bigquery.jobs.cancel")
249 _, err := call.Do()
250 trace.EndSpan(sCtx, err)
251 return err
252 })
253 }
254
255
256 func (j *Job) Delete(ctx context.Context) (err error) {
257 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Delete")
258 defer func() { trace.EndSpan(ctx, err) }()
259
260 call := j.c.bqs.Jobs.Delete(j.projectID, j.jobID).Context(ctx)
261 if j.location != "" {
262 call = call.Location(j.location)
263 }
264 setClientHeader(call.Header())
265
266 return runWithRetry(ctx, func() (err error) {
267 sCtx := trace.StartSpan(ctx, "bigquery.jobs.delete")
268 err = call.Do()
269 trace.EndSpan(sCtx, err)
270 return err
271 })
272 }
273
274
275
276
277
278
279 func (j *Job) Wait(ctx context.Context) (js *JobStatus, err error) {
280 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Wait")
281 defer func() { trace.EndSpan(ctx, err) }()
282
283 if j.isQuery() {
284
285 if _, _, err := j.waitForQuery(ctx, j.projectID); err != nil {
286 return nil, err
287 }
288
289 js, err := j.Status(ctx)
290 if err != nil {
291 return nil, err
292 }
293 return js, nil
294 }
295
296 err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
297 js, err = j.Status(ctx)
298 if err != nil {
299 return true, err
300 }
301 if js.Done() {
302 return true, nil
303 }
304 return false, nil
305 })
306 if err != nil {
307 return nil, err
308 }
309 return js, nil
310 }
311
312
313
314 func (j *Job) Read(ctx context.Context) (ri *RowIterator, err error) {
315 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Job.Read")
316 defer func() { trace.EndSpan(ctx, err) }()
317
318 return j.read(ctx, j.waitForQuery, fetchPage)
319 }
320
321 func (j *Job) read(ctx context.Context, waitForQuery func(context.Context, string) (Schema, uint64, error), pf pageFetcher) (*RowIterator, error) {
322 if !j.isQuery() {
323 return nil, errors.New("bigquery: cannot read from a non-query job")
324 }
325 schema, totalRows, err := waitForQuery(ctx, j.projectID)
326 if err != nil {
327 return nil, err
328 }
329 var it *RowIterator
330 if j.c.isStorageReadAvailable() {
331 it, err = newStorageRowIteratorFromJob(ctx, j)
332 if err != nil {
333 it = nil
334 }
335 }
336 if it == nil {
337
338 itJob := &Job{
339 c: j.c,
340 projectID: j.projectID,
341 jobID: j.jobID,
342 location: j.location,
343 }
344 it = newRowIterator(ctx, &rowSource{j: itJob}, pf)
345 it.TotalRows = totalRows
346 }
347 it.Schema = schema
348 return it, nil
349 }
350
351
352
353 func (j *Job) waitForQuery(ctx context.Context, projectID string) (Schema, uint64, error) {
354
355 call := j.c.bqs.Jobs.GetQueryResults(projectID, j.jobID).Location(j.location).Context(ctx).MaxResults(0)
356 call = call.FormatOptionsUseInt64Timestamp(true)
357 setClientHeader(call.Header())
358 backoff := gax.Backoff{
359 Initial: 1 * time.Second,
360 Multiplier: 2,
361 Max: 60 * time.Second,
362 }
363 var res *bq.GetQueryResultsResponse
364 err := internal.Retry(ctx, backoff, func() (stop bool, err error) {
365 sCtx := trace.StartSpan(ctx, "bigquery.jobs.getQueryResults")
366 res, err = call.Do()
367 trace.EndSpan(sCtx, err)
368 if err != nil {
369 return !retryableError(err, jobRetryReasons), err
370 }
371 if !res.JobComplete {
372 return false, nil
373 }
374 return true, nil
375 })
376 if err != nil {
377 return nil, 0, err
378 }
379 return bqToSchema(res.Schema), res.TotalRows, nil
380 }
381
382
383 type JobStatistics struct {
384 CreationTime time.Time
385 StartTime time.Time
386 EndTime time.Time
387 TotalBytesProcessed int64
388
389 Details Statistics
390
391
392 NumChildJobs int64
393
394
395 ParentJobID string
396
397
398
399 ScriptStatistics *ScriptStatistics
400
401
402 ReservationUsage []*ReservationUsage
403
404
405 TransactionInfo *TransactionInfo
406
407
408 SessionInfo *SessionInfo
409 }
410
411
412 type Statistics interface {
413 implementsStatistics()
414 }
415
416
417 type ExtractStatistics struct {
418
419
420
421 DestinationURIFileCounts []int64
422 }
423
424
425 type LoadStatistics struct {
426
427 InputFileBytes int64
428
429
430 InputFiles int64
431
432
433
434 OutputBytes int64
435
436
437
438 OutputRows int64
439 }
440
441
442 type QueryStatistics struct {
443
444
445 BIEngineStatistics *BIEngineStatistics
446
447
448 BillingTier int64
449
450
451 CacheHit bool
452
453
454 StatementType string
455
456
457 TotalBytesBilled int64
458
459
460 TotalBytesProcessed int64
461
462
463
464
465
466
467
468 TotalBytesProcessedAccuracy string
469
470
471 QueryPlan []*ExplainQueryStage
472
473
474
475 NumDMLAffectedRows int64
476
477
478
479 DMLStats *DMLStatistics
480
481
482 Timeline []*QueryTimelineSample
483
484
485
486
487 ReferencedTables []*Table
488
489
490
491 Schema Schema
492
493
494 SlotMillis int64
495
496
497
498 UndeclaredQueryParameterNames []string
499
500
501 DDLTargetTable *Table
502
503
504
505 DDLOperationPerformed string
506
507
508 DDLTargetRoutine *Routine
509
510
511 ExportDataStatistics *ExportDataStatistics
512 }
513
514
515
516 type ExportDataStatistics struct {
517
518 FileCount int64
519
520
521 RowCount int64
522 }
523
524 func bqToExportDataStatistics(in *bq.ExportDataStatistics) *ExportDataStatistics {
525 if in == nil {
526 return nil
527 }
528 stats := &ExportDataStatistics{
529 FileCount: in.FileCount,
530 RowCount: in.RowCount,
531 }
532 return stats
533 }
534
535
536 type BIEngineStatistics struct {
537
538 BIEngineMode string
539
540
541
542
543
544 BIEngineReasons []*BIEngineReason
545 }
546
547 func bqToBIEngineStatistics(in *bq.BiEngineStatistics) *BIEngineStatistics {
548 if in == nil {
549 return nil
550 }
551 stats := &BIEngineStatistics{
552 BIEngineMode: in.BiEngineMode,
553 }
554 for _, v := range in.BiEngineReasons {
555 stats.BIEngineReasons = append(stats.BIEngineReasons, bqToBIEngineReason(v))
556 }
557 return stats
558 }
559
560
561
562 type BIEngineReason struct {
563
564 Code string
565
566
567 Message string
568 }
569
570 func bqToBIEngineReason(in *bq.BiEngineReason) *BIEngineReason {
571 if in == nil {
572 return nil
573 }
574 return &BIEngineReason{
575 Code: in.Code,
576 Message: in.Message,
577 }
578 }
579
580
581 type ExplainQueryStage struct {
582
583 CompletedParallelInputs int64
584
585
586 ComputeAvg time.Duration
587
588
589 ComputeMax time.Duration
590
591
592 ComputeRatioAvg float64
593
594
595 ComputeRatioMax float64
596
597
598 EndTime time.Time
599
600
601 ID int64
602
603
604 InputStages []int64
605
606
607 Name string
608
609
610 ParallelInputs int64
611
612
613 ReadAvg time.Duration
614
615
616 ReadMax time.Duration
617
618
619 ReadRatioAvg float64
620
621
622 ReadRatioMax float64
623
624
625 RecordsRead int64
626
627
628 RecordsWritten int64
629
630
631 ShuffleOutputBytes int64
632
633
634
635 ShuffleOutputBytesSpilled int64
636
637
638 StartTime time.Time
639
640
641 Status string
642
643
644
645 Steps []*ExplainQueryStep
646
647
648 WaitAvg time.Duration
649
650
651 WaitMax time.Duration
652
653
654 WaitRatioAvg float64
655
656
657 WaitRatioMax float64
658
659
660 WriteAvg time.Duration
661
662
663 WriteMax time.Duration
664
665
666 WriteRatioAvg float64
667
668
669 WriteRatioMax float64
670 }
671
672
673 type ExplainQueryStep struct {
674
675 Kind string
676
677
678 Substeps []string
679 }
680
681
682 type QueryTimelineSample struct {
683
684
685 ActiveUnits int64
686
687
688 CompletedUnits int64
689
690
691 Elapsed time.Duration
692
693
694 PendingUnits int64
695
696
697 SlotMillis int64
698 }
699
700
701 type ReservationUsage struct {
702
703 SlotMillis int64
704
705 Name string
706 }
707
708 func bqToReservationUsage(ru []*bq.JobStatisticsReservationUsage) []*ReservationUsage {
709 var usage []*ReservationUsage
710 for _, in := range ru {
711 usage = append(usage, &ReservationUsage{
712 SlotMillis: in.SlotMs,
713 Name: in.Name,
714 })
715 }
716 return usage
717 }
718
719
720 type ScriptStatistics struct {
721 EvaluationKind string
722 StackFrames []*ScriptStackFrame
723 }
724
725 func bqToScriptStatistics(bs *bq.ScriptStatistics) *ScriptStatistics {
726 if bs == nil {
727 return nil
728 }
729 ss := &ScriptStatistics{
730 EvaluationKind: bs.EvaluationKind,
731 }
732 for _, f := range bs.StackFrames {
733 ss.StackFrames = append(ss.StackFrames, bqToScriptStackFrame(f))
734 }
735 return ss
736 }
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754 type ScriptStackFrame struct {
755 StartLine int64
756 StartColumn int64
757 EndLine int64
758 EndColumn int64
759
760 ProcedureID string
761
762 Text string
763 }
764
765 func bqToScriptStackFrame(bsf *bq.ScriptStackFrame) *ScriptStackFrame {
766 if bsf == nil {
767 return nil
768 }
769 return &ScriptStackFrame{
770 StartLine: bsf.StartLine,
771 StartColumn: bsf.StartColumn,
772 EndLine: bsf.EndLine,
773 EndColumn: bsf.EndColumn,
774 ProcedureID: bsf.ProcedureId,
775 Text: bsf.Text,
776 }
777 }
778
779
780 type DMLStatistics struct {
781
782 InsertedRowCount int64
783
784 DeletedRowCount int64
785
786 UpdatedRowCount int64
787 }
788
789 func bqToDMLStatistics(q *bq.DmlStatistics) *DMLStatistics {
790 if q == nil {
791 return nil
792 }
793 return &DMLStatistics{
794 InsertedRowCount: q.InsertedRowCount,
795 DeletedRowCount: q.DeletedRowCount,
796 UpdatedRowCount: q.UpdatedRowCount,
797 }
798 }
799
800 func (*ExtractStatistics) implementsStatistics() {}
801 func (*LoadStatistics) implementsStatistics() {}
802 func (*QueryStatistics) implementsStatistics() {}
803
804
805 func (c *Client) Jobs(ctx context.Context) *JobIterator {
806 it := &JobIterator{
807 ctx: ctx,
808 c: c,
809 ProjectID: c.projectID,
810 }
811 it.pageInfo, it.nextFunc = iterator.NewPageInfo(
812 it.fetch,
813 func() int { return len(it.items) },
814 func() interface{} { b := it.items; it.items = nil; return b })
815 return it
816 }
817
818
819 type JobIterator struct {
820 ProjectID string
821 AllUsers bool
822 State State
823 MinCreationTime time.Time
824 MaxCreationTime time.Time
825 ParentJobID string
826
827 ctx context.Context
828 c *Client
829 pageInfo *iterator.PageInfo
830 nextFunc func() error
831 items []*Job
832 }
833
834
835 func (it *JobIterator) PageInfo() *iterator.PageInfo { return it.pageInfo }
836
837
838
839
840 func (it *JobIterator) Next() (*Job, error) {
841 if err := it.nextFunc(); err != nil {
842 return nil, err
843 }
844 item := it.items[0]
845 it.items = it.items[1:]
846 return item, nil
847 }
848
849 func (it *JobIterator) fetch(pageSize int, pageToken string) (string, error) {
850 var st string
851 switch it.State {
852 case StateUnspecified:
853 st = ""
854 case Pending:
855 st = "pending"
856 case Running:
857 st = "running"
858 case Done:
859 st = "done"
860 default:
861 return "", fmt.Errorf("bigquery: invalid value for JobIterator.State: %d", it.State)
862 }
863
864 req := it.c.bqs.Jobs.List(it.ProjectID).
865 Context(it.ctx).
866 PageToken(pageToken).
867 Projection("full").
868 AllUsers(it.AllUsers)
869 if st != "" {
870 req.StateFilter(st)
871 }
872 if !it.MinCreationTime.IsZero() {
873 req.MinCreationTime(uint64(it.MinCreationTime.UnixNano() / 1e6))
874 }
875 if !it.MaxCreationTime.IsZero() {
876 req.MaxCreationTime(uint64(it.MaxCreationTime.UnixNano() / 1e6))
877 }
878 setClientHeader(req.Header())
879 if pageSize > 0 {
880 req.MaxResults(int64(pageSize))
881 }
882 if it.ParentJobID != "" {
883 req.ParentJobId(it.ParentJobID)
884 }
885 var res *bq.JobList
886 err := runWithRetry(it.ctx, func() (err error) {
887 sCtx := trace.StartSpan(it.ctx, "bigquery.jobs.list")
888 res, err = req.Do()
889 trace.EndSpan(sCtx, err)
890 return err
891 })
892
893 if err != nil {
894 return "", err
895 }
896 for _, j := range res.Jobs {
897 job, err := convertListedJob(j, it.c)
898 if err != nil {
899 return "", err
900 }
901 it.items = append(it.items, job)
902 }
903 return res.NextPageToken, nil
904 }
905
906 func convertListedJob(j *bq.JobListJobs, c *Client) (*Job, error) {
907 return bqToJob2(j.JobReference, j.Configuration, j.Status, j.Statistics, j.UserEmail, c)
908 }
909
910 func (c *Client) getJobInternal(ctx context.Context, jobID, location, projectID string, fields ...googleapi.Field) (*bq.Job, error) {
911 var job *bq.Job
912 proj := projectID
913 if proj == "" {
914 proj = c.projectID
915 }
916 call := c.bqs.Jobs.Get(proj, jobID).Context(ctx)
917 if location != "" {
918 call = call.Location(location)
919 }
920 if len(fields) > 0 {
921 call = call.Fields(fields...)
922 }
923 setClientHeader(call.Header())
924 err := runWithRetry(ctx, func() (err error) {
925 sCtx := trace.StartSpan(ctx, "bigquery.jobs.get")
926 job, err = call.Do()
927 trace.EndSpan(sCtx, err)
928 return err
929 })
930 if err != nil {
931 return nil, err
932 }
933 return job, nil
934 }
935
936 func bqToJob(q *bq.Job, c *Client) (*Job, error) {
937 return bqToJob2(q.JobReference, q.Configuration, q.Status, q.Statistics, q.UserEmail, c)
938 }
939
940 func bqToJob2(qr *bq.JobReference, qc *bq.JobConfiguration, qs *bq.JobStatus, qt *bq.JobStatistics, email string, c *Client) (*Job, error) {
941 j := &Job{
942 projectID: qr.ProjectId,
943 jobID: qr.JobId,
944 location: qr.Location,
945 c: c,
946 email: email,
947 }
948 j.setConfig(qc)
949 if err := j.setStatus(qs); err != nil {
950 return nil, err
951 }
952 j.setStatistics(qt, c)
953 return j, nil
954 }
955
956 func (j *Job) setConfig(config *bq.JobConfiguration) {
957 if config == nil {
958 return
959 }
960 j.config = config
961 }
962
963 func (j *Job) isQuery() bool {
964 return j.config != nil && j.config.Query != nil
965 }
966
967 func (j *Job) isScript() bool {
968 return j.hasStatementType("SCRIPT")
969 }
970
971 func (j *Job) isSelectQuery() bool {
972 return j.hasStatementType("SELECT")
973 }
974
975 func (j *Job) hasStatementType(statementType string) bool {
976 if !j.isQuery() {
977 return false
978 }
979 if j.lastStatus == nil {
980 return false
981 }
982 queryStats, ok := j.lastStatus.Statistics.Details.(*QueryStatistics)
983 if !ok {
984 return false
985 }
986 return queryStats.StatementType == statementType
987 }
988
989 var stateMap = map[string]State{"PENDING": Pending, "RUNNING": Running, "DONE": Done}
990
991 func (j *Job) setStatus(qs *bq.JobStatus) error {
992 if qs == nil {
993 return nil
994 }
995 state, ok := stateMap[qs.State]
996 if !ok {
997 return fmt.Errorf("unexpected job state: %s", qs.State)
998 }
999 j.lastStatus = &JobStatus{
1000 State: state,
1001 err: nil,
1002 }
1003 if err := bqToError(qs.ErrorResult); state == Done && err != nil {
1004 j.lastStatus.err = err
1005 }
1006 for _, ep := range qs.Errors {
1007 j.lastStatus.Errors = append(j.lastStatus.Errors, bqToError(ep))
1008 }
1009 return nil
1010 }
1011
1012 func (j *Job) setStatistics(s *bq.JobStatistics, c *Client) {
1013 if s == nil || j.lastStatus == nil {
1014 return
1015 }
1016 js := &JobStatistics{
1017 CreationTime: unixMillisToTime(s.CreationTime),
1018 StartTime: unixMillisToTime(s.StartTime),
1019 EndTime: unixMillisToTime(s.EndTime),
1020 TotalBytesProcessed: s.TotalBytesProcessed,
1021 NumChildJobs: s.NumChildJobs,
1022 ParentJobID: s.ParentJobId,
1023 ScriptStatistics: bqToScriptStatistics(s.ScriptStatistics),
1024 ReservationUsage: bqToReservationUsage(s.ReservationUsage),
1025 TransactionInfo: bqToTransactionInfo(s.TransactionInfo),
1026 SessionInfo: bqToSessionInfo(s.SessionInfo),
1027 }
1028 switch {
1029 case s.Extract != nil:
1030 js.Details = &ExtractStatistics{
1031 DestinationURIFileCounts: []int64(s.Extract.DestinationUriFileCounts),
1032 }
1033 case s.Load != nil:
1034 js.Details = &LoadStatistics{
1035 InputFileBytes: s.Load.InputFileBytes,
1036 InputFiles: s.Load.InputFiles,
1037 OutputBytes: s.Load.OutputBytes,
1038 OutputRows: s.Load.OutputRows,
1039 }
1040 case s.Query != nil:
1041 var names []string
1042 for _, qp := range s.Query.UndeclaredQueryParameters {
1043 names = append(names, qp.Name)
1044 }
1045 var tables []*Table
1046 for _, tr := range s.Query.ReferencedTables {
1047 tables = append(tables, bqToTable(tr, c))
1048 }
1049 js.Details = &QueryStatistics{
1050 BIEngineStatistics: bqToBIEngineStatistics(s.Query.BiEngineStatistics),
1051 BillingTier: s.Query.BillingTier,
1052 CacheHit: s.Query.CacheHit,
1053 DDLTargetTable: bqToTable(s.Query.DdlTargetTable, c),
1054 DDLOperationPerformed: s.Query.DdlOperationPerformed,
1055 DDLTargetRoutine: bqToRoutine(s.Query.DdlTargetRoutine, c),
1056 ExportDataStatistics: bqToExportDataStatistics(s.Query.ExportDataStatistics),
1057 StatementType: s.Query.StatementType,
1058 TotalBytesBilled: s.Query.TotalBytesBilled,
1059 TotalBytesProcessed: s.Query.TotalBytesProcessed,
1060 TotalBytesProcessedAccuracy: s.Query.TotalBytesProcessedAccuracy,
1061 NumDMLAffectedRows: s.Query.NumDmlAffectedRows,
1062 DMLStats: bqToDMLStatistics(s.Query.DmlStats),
1063 QueryPlan: queryPlanFromProto(s.Query.QueryPlan),
1064 Schema: bqToSchema(s.Query.Schema),
1065 SlotMillis: s.Query.TotalSlotMs,
1066 Timeline: timelineFromProto(s.Query.Timeline),
1067 ReferencedTables: tables,
1068 UndeclaredQueryParameterNames: names,
1069 }
1070 }
1071 j.lastStatus.Statistics = js
1072 }
1073
1074 func queryPlanFromProto(stages []*bq.ExplainQueryStage) []*ExplainQueryStage {
1075 var res []*ExplainQueryStage
1076 for _, s := range stages {
1077 var steps []*ExplainQueryStep
1078 for _, p := range s.Steps {
1079 steps = append(steps, &ExplainQueryStep{
1080 Kind: p.Kind,
1081 Substeps: p.Substeps,
1082 })
1083 }
1084 res = append(res, &ExplainQueryStage{
1085 CompletedParallelInputs: s.CompletedParallelInputs,
1086 ComputeAvg: time.Duration(s.ComputeMsAvg) * time.Millisecond,
1087 ComputeMax: time.Duration(s.ComputeMsMax) * time.Millisecond,
1088 ComputeRatioAvg: s.ComputeRatioAvg,
1089 ComputeRatioMax: s.ComputeRatioMax,
1090 EndTime: time.Unix(0, s.EndMs*1e6),
1091 ID: s.Id,
1092 InputStages: s.InputStages,
1093 Name: s.Name,
1094 ParallelInputs: s.ParallelInputs,
1095 ReadAvg: time.Duration(s.ReadMsAvg) * time.Millisecond,
1096 ReadMax: time.Duration(s.ReadMsMax) * time.Millisecond,
1097 ReadRatioAvg: s.ReadRatioAvg,
1098 ReadRatioMax: s.ReadRatioMax,
1099 RecordsRead: s.RecordsRead,
1100 RecordsWritten: s.RecordsWritten,
1101 ShuffleOutputBytes: s.ShuffleOutputBytes,
1102 ShuffleOutputBytesSpilled: s.ShuffleOutputBytesSpilled,
1103 StartTime: time.Unix(0, s.StartMs*1e6),
1104 Status: s.Status,
1105 Steps: steps,
1106 WaitAvg: time.Duration(s.WaitMsAvg) * time.Millisecond,
1107 WaitMax: time.Duration(s.WaitMsMax) * time.Millisecond,
1108 WaitRatioAvg: s.WaitRatioAvg,
1109 WaitRatioMax: s.WaitRatioMax,
1110 WriteAvg: time.Duration(s.WriteMsAvg) * time.Millisecond,
1111 WriteMax: time.Duration(s.WriteMsMax) * time.Millisecond,
1112 WriteRatioAvg: s.WriteRatioAvg,
1113 WriteRatioMax: s.WriteRatioMax,
1114 })
1115 }
1116 return res
1117 }
1118
1119 func timelineFromProto(timeline []*bq.QueryTimelineSample) []*QueryTimelineSample {
1120 var res []*QueryTimelineSample
1121 for _, s := range timeline {
1122 res = append(res, &QueryTimelineSample{
1123 ActiveUnits: s.ActiveUnits,
1124 CompletedUnits: s.CompletedUnits,
1125 Elapsed: time.Duration(s.ElapsedMs) * time.Millisecond,
1126 PendingUnits: s.PendingUnits,
1127 SlotMillis: s.TotalSlotMs,
1128 })
1129 }
1130 return res
1131 }
1132
1133
1134 type TransactionInfo struct {
1135
1136 TransactionID string
1137 }
1138
1139 func bqToTransactionInfo(in *bq.TransactionInfo) *TransactionInfo {
1140 if in == nil {
1141 return nil
1142 }
1143 return &TransactionInfo{
1144 TransactionID: in.TransactionId,
1145 }
1146 }
1147
1148
1149 type SessionInfo struct {
1150 SessionID string
1151 }
1152
1153 func bqToSessionInfo(in *bq.SessionInfo) *SessionInfo {
1154 if in == nil {
1155 return nil
1156 }
1157 return &SessionInfo{
1158 SessionID: in.SessionId,
1159 }
1160 }
1161
View as plain text