...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "time"
20
21 "cloud.google.com/go/internal/trace"
22 bq "google.golang.org/api/bigquery/v2"
23 )
24
25
26 type ExtractConfig struct {
27
28
29 Src *Table
30
31
32
33 SrcModel *Model
34
35
36 Dst *GCSReference
37
38
39 DisableHeader bool
40
41
42 Labels map[string]string
43
44
45
46
47
48
49 UseAvroLogicalTypes bool
50
51
52
53
54
55
56
57
58
59 JobTimeout time.Duration
60 }
61
62 func (e *ExtractConfig) toBQ() *bq.JobConfiguration {
63 var printHeader *bool
64 if e.DisableHeader {
65 f := false
66 printHeader = &f
67 }
68 cfg := &bq.JobConfiguration{
69 Labels: e.Labels,
70 Extract: &bq.JobConfigurationExtract{
71 DestinationUris: append([]string{}, e.Dst.URIs...),
72 Compression: string(e.Dst.Compression),
73 DestinationFormat: string(e.Dst.DestinationFormat),
74 FieldDelimiter: e.Dst.FieldDelimiter,
75
76 PrintHeader: printHeader,
77 UseAvroLogicalTypes: e.UseAvroLogicalTypes,
78 },
79 JobTimeoutMs: e.JobTimeout.Milliseconds(),
80 }
81 if e.Src != nil {
82 cfg.Extract.SourceTable = e.Src.toBQ()
83 }
84 if e.SrcModel != nil {
85 cfg.Extract.SourceModel = e.SrcModel.toBQ()
86 }
87 return cfg
88 }
89
90 func bqToExtractConfig(q *bq.JobConfiguration, c *Client) *ExtractConfig {
91 qe := q.Extract
92 return &ExtractConfig{
93 Labels: q.Labels,
94 Dst: &GCSReference{
95 URIs: qe.DestinationUris,
96 Compression: Compression(qe.Compression),
97 DestinationFormat: DataFormat(qe.DestinationFormat),
98 FileConfig: FileConfig{
99 CSVOptions: CSVOptions{
100 FieldDelimiter: qe.FieldDelimiter,
101 },
102 },
103 },
104 DisableHeader: qe.PrintHeader != nil && !*qe.PrintHeader,
105 Src: bqToTable(qe.SourceTable, c),
106 SrcModel: bqToModel(qe.SourceModel, c),
107 UseAvroLogicalTypes: qe.UseAvroLogicalTypes,
108 JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
109 }
110 }
111
112
113 type Extractor struct {
114 JobIDConfig
115 ExtractConfig
116 c *Client
117 }
118
119
120
121
122 func (t *Table) ExtractorTo(dst *GCSReference) *Extractor {
123 return &Extractor{
124 c: t.c,
125 ExtractConfig: ExtractConfig{
126 Src: t,
127 Dst: dst,
128 },
129 }
130 }
131
132
133
134
135 func (m *Model) ExtractorTo(dst *GCSReference) *Extractor {
136 return &Extractor{
137 c: m.c,
138 ExtractConfig: ExtractConfig{
139 SrcModel: m,
140 Dst: dst,
141 },
142 }
143 }
144
145
146 func (e *Extractor) Run(ctx context.Context) (j *Job, err error) {
147 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Extractor.Run")
148 defer func() { trace.EndSpan(ctx, err) }()
149
150 return e.c.insertJob(ctx, e.newJob(), nil)
151 }
152
153 func (e *Extractor) newJob() *bq.Job {
154 return &bq.Job{
155 JobReference: e.JobIDConfig.createJobRef(e.c),
156 Configuration: e.ExtractConfig.toBQ(),
157 }
158 }
159
View as plain text