...

Source file src/cloud.google.com/go/bigquery/inserter.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"errors"
    20  	"fmt"
    21  	"reflect"
    22  
    23  	"cloud.google.com/go/internal/trace"
    24  	bq "google.golang.org/api/bigquery/v2"
    25  )
    26  
// NoDedupeID indicates a streaming insert row wants to opt out of best-effort
// deduplication.
// It is EXPERIMENTAL and subject to change or removal without notice.
const NoDedupeID = "NoDedupeID"
    31  
// An Inserter does streaming inserts into a BigQuery table.
// It is safe for concurrent use.
type Inserter struct {
	// t is the destination table rows are appended to.
	t *Table

	// SkipInvalidRows causes rows containing invalid data to be silently
	// ignored. The default value is false, which causes the entire request to
	// fail if there is an attempt to insert an invalid row.
	SkipInvalidRows bool

	// IgnoreUnknownValues causes values not matching the schema to be ignored.
	// The default value is false, which causes records containing such values
	// to be treated as invalid records.
	IgnoreUnknownValues bool

	// A TableTemplateSuffix allows Inserters to create tables automatically.
	//
	// Experimental: this option is experimental and may be modified or removed in future versions,
	// regardless of any other documented package stability guarantees. In general,
	// the BigQuery team recommends the use of partitioned tables over sharding
	// tables by suffix.
	//
	// When you specify a suffix, the table you upload data to
	// will be used as a template for creating a new table, with the same schema,
	// called <table> + <suffix>.
	//
	// More information is available at
	// https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
	TableTemplateSuffix string
}
    62  
    63  // Inserter returns an Inserter that can be used to append rows to t.
    64  // The returned Inserter may optionally be further configured before its Put method is called.
    65  //
    66  // To stream rows into a date-partitioned table at a particular date, add the
    67  // $yyyymmdd suffix to the table name when constructing the Table.
    68  func (t *Table) Inserter() *Inserter {
    69  	return &Inserter{t: t}
    70  }
    71  
    72  // Uploader calls Inserter.
    73  // Deprecated: use Table.Inserter instead.
    74  func (t *Table) Uploader() *Inserter { return t.Inserter() }
    75  
    76  // Put uploads one or more rows to the BigQuery service.
    77  //
    78  // If src is ValueSaver, then its Save method is called to produce a row for uploading.
    79  //
    80  // If src is a struct or pointer to a struct, then a schema is inferred from it
    81  // and used to create a StructSaver. The InsertID of the StructSaver will be
    82  // empty.
    83  //
    84  // If src is a slice of ValueSavers, structs, or struct pointers, then each
    85  // element of the slice is treated as above, and multiple rows are uploaded.
    86  //
    87  // Put returns a PutMultiError if one or more rows failed to be uploaded.
    88  // The PutMultiError contains a RowInsertionError for each failed row.
    89  //
    90  // Put will retry on temporary errors (see
    91  // https://cloud.google.com/bigquery/troubleshooting-errors). This can result
    92  // in duplicate rows if you do not use insert IDs. Also, if the error persists,
    93  // the call will run indefinitely. Pass a context with a timeout to prevent
    94  // hanging calls.
    95  func (u *Inserter) Put(ctx context.Context, src interface{}) (err error) {
    96  	ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Inserter.Put")
    97  	defer func() { trace.EndSpan(ctx, err) }()
    98  
    99  	savers, err := valueSavers(src)
   100  	if err != nil {
   101  		return err
   102  	}
   103  	return u.putMulti(ctx, savers)
   104  }
   105  
   106  func valueSavers(src interface{}) ([]ValueSaver, error) {
   107  	saver, ok, err := toValueSaver(src)
   108  	if err != nil {
   109  		return nil, err
   110  	}
   111  	if ok {
   112  		return []ValueSaver{saver}, nil
   113  	}
   114  	srcVal := reflect.ValueOf(src)
   115  	if srcVal.Kind() != reflect.Slice {
   116  		return nil, fmt.Errorf("%T is not a ValueSaver, struct, struct pointer, or slice", src)
   117  
   118  	}
   119  	var savers []ValueSaver
   120  	for i := 0; i < srcVal.Len(); i++ {
   121  		s := srcVal.Index(i).Interface()
   122  		saver, ok, err := toValueSaver(s)
   123  		if err != nil {
   124  			return nil, err
   125  		}
   126  		if !ok {
   127  			return nil, fmt.Errorf("src[%d] has type %T, which is not a ValueSaver, struct or struct pointer", i, s)
   128  		}
   129  		savers = append(savers, saver)
   130  	}
   131  	return savers, nil
   132  }
   133  
   134  // Make a ValueSaver from x, which must implement ValueSaver already
   135  // or be a struct or pointer to struct.
   136  func toValueSaver(x interface{}) (ValueSaver, bool, error) {
   137  	if _, ok := x.(StructSaver); ok {
   138  		return nil, false, errors.New("bigquery: use &StructSaver, not StructSaver")
   139  	}
   140  	var insertID string
   141  	// Handle StructSavers specially so we can infer the schema if necessary.
   142  	if ss, ok := x.(*StructSaver); ok && ss.Schema == nil {
   143  		x = ss.Struct
   144  		insertID = ss.InsertID
   145  		// Fall through so we can infer the schema.
   146  	}
   147  	if saver, ok := x.(ValueSaver); ok {
   148  		return saver, ok, nil
   149  	}
   150  	v := reflect.ValueOf(x)
   151  	// Support Put with []interface{}
   152  	if v.Kind() == reflect.Interface {
   153  		v = v.Elem()
   154  	}
   155  	if v.Kind() == reflect.Ptr {
   156  		v = v.Elem()
   157  	}
   158  	if v.Kind() != reflect.Struct {
   159  		return nil, false, nil
   160  	}
   161  	schema, err := inferSchemaReflectCached(v.Type())
   162  	if err != nil {
   163  		return nil, false, err
   164  	}
   165  	return &StructSaver{
   166  		Struct:   x,
   167  		InsertID: insertID,
   168  		Schema:   schema,
   169  	}, true, nil
   170  }
   171  
   172  func (u *Inserter) putMulti(ctx context.Context, src []ValueSaver) error {
   173  	req, err := u.newInsertRequest(src)
   174  	if err != nil {
   175  		return err
   176  	}
   177  	if req == nil {
   178  		return nil
   179  	}
   180  	call := u.t.c.bqs.Tabledata.InsertAll(u.t.ProjectID, u.t.DatasetID, u.t.TableID, req).Context(ctx)
   181  	setClientHeader(call.Header())
   182  	var res *bq.TableDataInsertAllResponse
   183  	err = runWithRetry(ctx, func() (err error) {
   184  		ctx = trace.StartSpan(ctx, "bigquery.tabledata.insertAll")
   185  		res, err = call.Do()
   186  		trace.EndSpan(ctx, err)
   187  		return err
   188  	})
   189  	if err != nil {
   190  		return err
   191  	}
   192  	return handleInsertErrors(res.InsertErrors, req.Rows)
   193  }
   194  
   195  func (u *Inserter) newInsertRequest(savers []ValueSaver) (*bq.TableDataInsertAllRequest, error) {
   196  	if savers == nil { // If there are no rows, do nothing.
   197  		return nil, nil
   198  	}
   199  	req := &bq.TableDataInsertAllRequest{
   200  		TemplateSuffix:      u.TableTemplateSuffix,
   201  		IgnoreUnknownValues: u.IgnoreUnknownValues,
   202  		SkipInvalidRows:     u.SkipInvalidRows,
   203  	}
   204  	for _, saver := range savers {
   205  		row, insertID, err := saver.Save()
   206  		if err != nil {
   207  			return nil, err
   208  		}
   209  		if insertID == NoDedupeID {
   210  			// User wants to opt-out of sending deduplication ID.
   211  			insertID = ""
   212  		} else if insertID == "" {
   213  			insertID = randomIDFn()
   214  		}
   215  		m := make(map[string]bq.JsonValue)
   216  		for k, v := range row {
   217  			m[k] = bq.JsonValue(v)
   218  		}
   219  		req.Rows = append(req.Rows, &bq.TableDataInsertAllRequestRows{
   220  			InsertId: insertID,
   221  			Json:     m,
   222  		})
   223  	}
   224  	return req, nil
   225  }
   226  
   227  func handleInsertErrors(ierrs []*bq.TableDataInsertAllResponseInsertErrors, rows []*bq.TableDataInsertAllRequestRows) error {
   228  	if len(ierrs) == 0 {
   229  		return nil
   230  	}
   231  	var errs PutMultiError
   232  	for _, e := range ierrs {
   233  		if int(e.Index) >= len(rows) {
   234  			return fmt.Errorf("internal error: unexpected row index: %v", e.Index)
   235  		}
   236  		rie := RowInsertionError{
   237  			InsertID: rows[e.Index].InsertId,
   238  			RowIndex: int(e.Index),
   239  		}
   240  		for _, errp := range e.Errors {
   241  			rie.Errors = append(rie.Errors, bqToError(errp))
   242  		}
   243  		errs = append(errs, rie)
   244  	}
   245  	return errs
   246  }
   247  
// Uploader is an obsolete name for Inserter.
//
// Deprecated: use Inserter instead.
type Uploader = Inserter
   250  

View as plain text