1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "io"
20 "time"
21
22 "cloud.google.com/go/internal/trace"
23 bq "google.golang.org/api/bigquery/v2"
24 "google.golang.org/api/googleapi"
25 )
26
27
// LoadConfig holds the settings for a load job. It is converted to the
// API representation by toBQ and rebuilt from it by bqToLoadConfig.
type LoadConfig struct {
	// Src is the source of the data to load. Its populateLoadConfig method
	// is invoked while building the job and may supply a reader to upload.
	Src LoadSource

	// Dst is the destination table; it is sent as the job's
	// DestinationTable.
	Dst *Table

	// CreateDisposition is sent, as a string, in the job's
	// CreateDisposition field.
	// NOTE(review): server-side default is not visible here — confirm.
	CreateDisposition TableCreateDisposition

	// WriteDisposition is sent, as a string, in the job's
	// WriteDisposition field.
	// NOTE(review): server-side default is not visible here — confirm.
	WriteDisposition TableWriteDisposition

	// Labels are attached to the job configuration as-is.
	Labels map[string]string

	// TimePartitioning is converted with its toBQ method and sent as the
	// job's TimePartitioning setting.
	TimePartitioning *TimePartitioning

	// RangePartitioning is converted with its toBQ method and sent as the
	// job's RangePartitioning setting.
	RangePartitioning *RangePartitioning

	// Clustering is converted with its toBQ method and sent as the job's
	// Clustering setting.
	Clustering *Clustering

	// DestinationEncryptionConfig is sent as the job's
	// DestinationEncryptionConfiguration.
	DestinationEncryptionConfig *EncryptionConfig

	// SchemaUpdateOptions is passed through unchanged to the job's
	// SchemaUpdateOptions list.
	// NOTE(review): the accepted values are defined by the service — confirm.
	SchemaUpdateOptions []string

	// UseAvroLogicalTypes is passed through unchanged to the job's
	// UseAvroLogicalTypes flag.
	// NOTE(review): presumably only meaningful for Avro sources — confirm.
	UseAvroLogicalTypes bool

	// ProjectionFields is passed through unchanged to the job's
	// ProjectionFields list.
	ProjectionFields []string

	// HivePartitioningOptions is converted with its toBQ method and sent
	// as the job's HivePartitioningOptions.
	HivePartitioningOptions *HivePartitioningOptions

	// DecimalTargetTypes lists the decimal target types for the load. Each
	// entry is converted to its string form and appended, in order, to the
	// job's DecimalTargetTypes list.
	DecimalTargetTypes []DecimalTargetType

	// JobTimeout is sent as the job's JobTimeoutMs, expressed in whole
	// milliseconds. When a configuration is read back, the field is only
	// populated if the stored timeout is greater than zero.
	JobTimeout time.Duration

	// ReferenceFileSchemaURI is passed through unchanged to the job's
	// ReferenceFileSchemaUri field.
	ReferenceFileSchemaURI string

	// CreateSession is passed through unchanged to the job's CreateSession
	// flag.
	CreateSession bool

	// ConnectionProperties are converted one by one with toBQ and appended
	// to the job's ConnectionProperties list.
	ConnectionProperties []*ConnectionProperty

	// MediaOptions are forwarded to the job insertion call made by
	// Loader.Run. They are not part of the job configuration itself and are
	// not restored by bqToLoadConfig.
	MediaOptions []googleapi.MediaOption
}
109
110 func (l *LoadConfig) toBQ() (*bq.JobConfiguration, io.Reader) {
111 config := &bq.JobConfiguration{
112 Labels: l.Labels,
113 Load: &bq.JobConfigurationLoad{
114 CreateDisposition: string(l.CreateDisposition),
115 WriteDisposition: string(l.WriteDisposition),
116 DestinationTable: l.Dst.toBQ(),
117 TimePartitioning: l.TimePartitioning.toBQ(),
118 RangePartitioning: l.RangePartitioning.toBQ(),
119 Clustering: l.Clustering.toBQ(),
120 DestinationEncryptionConfiguration: l.DestinationEncryptionConfig.toBQ(),
121 SchemaUpdateOptions: l.SchemaUpdateOptions,
122 UseAvroLogicalTypes: l.UseAvroLogicalTypes,
123 ProjectionFields: l.ProjectionFields,
124 HivePartitioningOptions: l.HivePartitioningOptions.toBQ(),
125 ReferenceFileSchemaUri: l.ReferenceFileSchemaURI,
126 CreateSession: l.CreateSession,
127 },
128 JobTimeoutMs: l.JobTimeout.Milliseconds(),
129 }
130 for _, v := range l.DecimalTargetTypes {
131 config.Load.DecimalTargetTypes = append(config.Load.DecimalTargetTypes, string(v))
132 }
133 for _, v := range l.ConnectionProperties {
134 config.Load.ConnectionProperties = append(config.Load.ConnectionProperties, v.toBQ())
135 }
136 media := l.Src.populateLoadConfig(config.Load)
137 return config, media
138 }
139
140 func bqToLoadConfig(q *bq.JobConfiguration, c *Client) *LoadConfig {
141 lc := &LoadConfig{
142 Labels: q.Labels,
143 CreateDisposition: TableCreateDisposition(q.Load.CreateDisposition),
144 WriteDisposition: TableWriteDisposition(q.Load.WriteDisposition),
145 Dst: bqToTable(q.Load.DestinationTable, c),
146 TimePartitioning: bqToTimePartitioning(q.Load.TimePartitioning),
147 RangePartitioning: bqToRangePartitioning(q.Load.RangePartitioning),
148 Clustering: bqToClustering(q.Load.Clustering),
149 DestinationEncryptionConfig: bqToEncryptionConfig(q.Load.DestinationEncryptionConfiguration),
150 SchemaUpdateOptions: q.Load.SchemaUpdateOptions,
151 UseAvroLogicalTypes: q.Load.UseAvroLogicalTypes,
152 ProjectionFields: q.Load.ProjectionFields,
153 HivePartitioningOptions: bqToHivePartitioningOptions(q.Load.HivePartitioningOptions),
154 ReferenceFileSchemaURI: q.Load.ReferenceFileSchemaUri,
155 CreateSession: q.Load.CreateSession,
156 }
157 if q.JobTimeoutMs > 0 {
158 lc.JobTimeout = time.Duration(q.JobTimeoutMs) * time.Millisecond
159 }
160 for _, v := range q.Load.DecimalTargetTypes {
161 lc.DecimalTargetTypes = append(lc.DecimalTargetTypes, DecimalTargetType(v))
162 }
163 for _, v := range q.Load.ConnectionProperties {
164 lc.ConnectionProperties = append(lc.ConnectionProperties, bqToConnectionProperty(v))
165 }
166 var fc *FileConfig
167 if len(q.Load.SourceUris) == 0 {
168 s := NewReaderSource(nil)
169 fc = &s.FileConfig
170 lc.Src = s
171 } else {
172 s := NewGCSReference(q.Load.SourceUris...)
173 fc = &s.FileConfig
174 lc.Src = s
175 }
176 bqPopulateFileConfig(q.Load, fc)
177 return lc
178 }
179
180
// Loader runs load jobs. It embeds the job ID settings and the load
// configuration, and keeps the client used to submit the job.
type Loader struct {
	JobIDConfig
	LoadConfig
	c *Client // client used to create the job reference and insert the job
}
186
187
188
189
190
191
// LoadSource is a source of data for a load job. The interface has only an
// unexported method, so it cannot be implemented outside this package. In
// this file, the values returned by NewReaderSource and NewGCSReference are
// used as LoadSources.
type LoadSource interface {
	// populateLoadConfig receives the load section of the job configuration
	// and returns a reader that is passed on to the job insertion call.
	// NOTE(review): presumably the reader is nil when there is no data to
	// upload directly — confirm against the implementations.
	populateLoadConfig(*bq.JobConfigurationLoad) io.Reader
}
196
197
198
199
200
201 func (t *Table) LoaderFrom(src LoadSource) *Loader {
202 return &Loader{
203 c: t.c,
204 LoadConfig: LoadConfig{
205 Src: src,
206 Dst: t,
207 },
208 }
209 }
210
211
212 func (l *Loader) Run(ctx context.Context) (j *Job, err error) {
213 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Load.Run")
214 defer func() { trace.EndSpan(ctx, err) }()
215
216 job, media := l.newJob()
217 return l.c.insertJob(ctx, job, media, l.LoadConfig.MediaOptions...)
218 }
219
220 func (l *Loader) newJob() (*bq.Job, io.Reader) {
221 config, media := l.LoadConfig.toBQ()
222 return &bq.Job{
223 JobReference: l.JobIDConfig.createJobRef(l.c),
224 Configuration: config,
225 }, media
226 }
227
228
// DecimalTargetType names a target type for decimal values in a load job.
// Values are sent to the service as plain strings in the job's
// DecimalTargetTypes list.
type DecimalTargetType string
230
// Predefined DecimalTargetType values.
var (
	// NumericTargetType is the "NUMERIC" decimal target type.
	NumericTargetType DecimalTargetType = "NUMERIC"

	// BigNumericTargetType is the "BIGNUMERIC" decimal target type.
	BigNumericTargetType DecimalTargetType = "BIGNUMERIC"

	// StringTargetType is the "STRING" decimal target type.
	StringTargetType DecimalTargetType = "STRING"
)
241
View as plain text