1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "reflect"
22
23 "cloud.google.com/go/internal/trace"
24 bq "google.golang.org/api/bigquery/v2"
25 )
26
27
28
29
30 const NoDedupeID = "NoDedupeID"
31
32
33
34 type Inserter struct {
35 t *Table
36
37
38
39
40 SkipInvalidRows bool
41
42
43
44
45 IgnoreUnknownValues bool
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60 TableTemplateSuffix string
61 }
62
63
64
65
66
67
68 func (t *Table) Inserter() *Inserter {
69 return &Inserter{t: t}
70 }
71
72
73
74 func (t *Table) Uploader() *Inserter { return t.Inserter() }
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95 func (u *Inserter) Put(ctx context.Context, src interface{}) (err error) {
96 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Inserter.Put")
97 defer func() { trace.EndSpan(ctx, err) }()
98
99 savers, err := valueSavers(src)
100 if err != nil {
101 return err
102 }
103 return u.putMulti(ctx, savers)
104 }
105
106 func valueSavers(src interface{}) ([]ValueSaver, error) {
107 saver, ok, err := toValueSaver(src)
108 if err != nil {
109 return nil, err
110 }
111 if ok {
112 return []ValueSaver{saver}, nil
113 }
114 srcVal := reflect.ValueOf(src)
115 if srcVal.Kind() != reflect.Slice {
116 return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src)
117
118 }
119 var savers []ValueSaver
120 for i := 0; i < srcVal.Len(); i++ {
121 s := srcVal.Index(i).Interface()
122 saver, ok, err := toValueSaver(s)
123 if err != nil {
124 return nil, err
125 }
126 if !ok {
127 return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s)
128 }
129 savers = append(savers, saver)
130 }
131 return savers, nil
132 }
133
134
135
136 func toValueSaver(x interface{}) (ValueSaver, bool, error) {
137 if _, ok := x.(StructSaver); ok {
138 return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver")
139 }
140 var insertID string
141
142 if ss, ok := x.(*StructSaver); ok && ss.Schema == nil {
143 x = ss.Struct
144 insertID = ss.InsertID
145
146 }
147 if saver, ok := x.(ValueSaver); ok {
148 return saver, ok, nil
149 }
150 v := reflect.ValueOf(x)
151
152 if v.Kind() == reflect.Interface {
153 v = v.Elem()
154 }
155 if v.Kind() == reflect.Ptr {
156 v = v.Elem()
157 }
158 if v.Kind() != reflect.Struct {
159 return nil, false, nil
160 }
161 schema, err := inferSchemaReflectCached(v.Type())
162 if err != nil {
163 return nil, false, err
164 }
165 return &StructSaver{
166 Struct: x,
167 InsertID: insertID,
168 Schema: schema,
169 }, true, nil
170 }
171
172 func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
173 req, err := u.newInsertRequest(src)
174 if err != nil {
175 return err
176 }
177 if req == nil {
178 return nil
179 }
180 call := u.t.c.bqs.Tabledata.InsertAll(u.t.ProjectID, u.t.DatasetID, u.t.TableID, req).Context(ctx)
181 setClientHeader(call.Header())
182 var res *bq.TableDataInsertAllResponse
183 err = runWithRetry(ctx, func() (err error) {
184 ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll")
185 res, err = call.Do()
186 trace.EndSpan(ctx, err)
187 return err
188 })
189 if err != nil {
190 return err
191 }
192 return handleInsertErrors(res.InsertErrors, req.Rows)
193 }
194
195 func (u *Inserter) newInsertRequest(savers []ValueSaver) (*bq.TableDataInsertAllRequest, error) {
196 if savers == nil {
197 return nil, nil
198 }
199 req := &bq.TableDataInsertAllRequest{
200 TemplateSuffix: u.TableTemplateSuffix,
201 IgnoreUnknownValues: u.IgnoreUnknownValues,
202 SkipInvalidRows: u.SkipInvalidRows,
203 }
204 for _, saver := range savers {
205 row, insertID, err := saver.Save()
206 if err != nil {
207 return nil, err
208 }
209 if insertID == NoDedupeID {
210
211 insertID = ""
212 } else if insertID == "" {
213 insertID = randomIDFn()
214 }
215 m := make(map[string]bq.JsonValue)
216 for k, v := range row {
217 m[k] = bq.JsonValue(v)
218 }
219 req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{
220 InsertId: insertID,
221 Json: m,
222 })
223 }
224 return req, nil
225 }
226
227 func handleInsertErrors(ierrs []*bq.TableDataInsertAllResponseInsertErrors, rows []*bq.TableDataInsertAllRequestRows) error {
228 if len(ierrs) == 0 {
229 return nil
230 }
231 var errs PutMultiError
232 for _, e := range ierrs {
233 if int(e.Index) >= len(rows) {
234 return fmt.Errorf("internal error: unexpected row index: %v", e.Index)
235 }
236 rie := RowInsertionError{
237 InsertID: rows[e.Index].InsertId,
238 RowIndex: int(e.Index),
239 }
240 for _, errp := range e.Errors {
241 rie.Errors = append(rie.Errors, bqToError(errp))
242 }
243 errs = append(errs, rie)
244 }
245 return errs
246 }
247
248
249 type Uploader = Inserter
250
View as plain text