...

Source file src/cloud.google.com/go/bigquery/load_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2015 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"strings"
    19  	"testing"
    20  	"time"
    21  
    22  	"cloud.google.com/go/internal/testutil"
    23  	"github.com/google/go-cmp/cmp"
    24  	"github.com/google/go-cmp/cmp/cmpopts"
    25  	bq "google.golang.org/api/bigquery/v2"
    26  )
    27  
    28  func defaultLoadJob() *bq.Job {
    29  	return &bq.Job{
    30  		JobReference: &bq.JobReference{JobId: "RANDOM", ProjectId: "client-project-id"},
    31  		Configuration: &bq.JobConfiguration{
    32  			Load: &bq.JobConfigurationLoad{
    33  				DestinationTable: &bq.TableReference{
    34  					ProjectId: "client-project-id",
    35  					DatasetId: "dataset-id",
    36  					TableId:   "table-id",
    37  				},
    38  				SourceUris: []string{"uri"},
    39  			},
    40  		},
    41  	}
    42  }
    43  
    44  func stringFieldSchema() *FieldSchema {
    45  	return &FieldSchema{Name: "fieldname", Type: StringFieldType}
    46  }
    47  
    48  func nestedFieldSchema() *FieldSchema {
    49  	return &FieldSchema{
    50  		Name:   "nested",
    51  		Type:   RecordFieldType,
    52  		Schema: Schema{stringFieldSchema()},
    53  	}
    54  }
    55  
    56  func bqStringFieldSchema() *bq.TableFieldSchema {
    57  	return &bq.TableFieldSchema{
    58  		Name: "fieldname",
    59  		Type: "STRING",
    60  	}
    61  }
    62  
    63  func bqNestedFieldSchema() *bq.TableFieldSchema {
    64  	return &bq.TableFieldSchema{
    65  		Name:   "nested",
    66  		Type:   "RECORD",
    67  		Fields: []*bq.TableFieldSchema{bqStringFieldSchema()},
    68  	}
    69  }
    70  
    71  func TestLoad(t *testing.T) {
    72  	defer fixRandomID("RANDOM")()
    73  	c := &Client{projectID: "client-project-id"}
    74  
    75  	testCases := []struct {
    76  		dst      *Table
    77  		src      LoadSource
    78  		jobID    string
    79  		location string
    80  		config   LoadConfig
    81  		want     *bq.Job
    82  	}{
    83  		{
    84  			dst:  c.Dataset("dataset-id").Table("table-id"),
    85  			src:  NewGCSReference("uri"),
    86  			want: defaultLoadJob(),
    87  		},
    88  		{
    89  			dst:      c.Dataset("dataset-id").Table("table-id"),
    90  			src:      NewGCSReference("uri"),
    91  			location: "loc",
    92  			want: func() *bq.Job {
    93  				j := defaultLoadJob()
    94  				j.JobReference.Location = "loc"
    95  				return j
    96  			}(),
    97  		},
    98  		{
    99  			dst:   c.Dataset("dataset-id").Table("table-id"),
   100  			jobID: "ajob",
   101  			config: LoadConfig{
   102  				CreateDisposition:           CreateNever,
   103  				WriteDisposition:            WriteTruncate,
   104  				Labels:                      map[string]string{"a": "b"},
   105  				TimePartitioning:            &TimePartitioning{Type: MonthPartitioningType, Expiration: 1234 * time.Millisecond},
   106  				Clustering:                  &Clustering{Fields: []string{"cfield1"}},
   107  				DestinationEncryptionConfig: &EncryptionConfig{KMSKeyName: "keyName"},
   108  				SchemaUpdateOptions:         []string{"ALLOW_FIELD_ADDITION"},
   109  			},
   110  			src: NewGCSReference("uri"),
   111  			want: func() *bq.Job {
   112  				j := defaultLoadJob()
   113  				j.Configuration.Labels = map[string]string{"a": "b"}
   114  				j.Configuration.Load.CreateDisposition = "CREATE_NEVER"
   115  				j.Configuration.Load.WriteDisposition = "WRITE_TRUNCATE"
   116  				j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
   117  					Type:         "MONTH",
   118  					ExpirationMs: 1234,
   119  				}
   120  				j.Configuration.Load.Clustering = &bq.Clustering{
   121  					Fields: []string{"cfield1"},
   122  				}
   123  				j.Configuration.Load.DestinationEncryptionConfiguration = &bq.EncryptionConfiguration{KmsKeyName: "keyName"}
   124  				j.JobReference = &bq.JobReference{
   125  					JobId:     "ajob",
   126  					ProjectId: "client-project-id",
   127  				}
   128  				j.Configuration.Load.SchemaUpdateOptions = []string{"ALLOW_FIELD_ADDITION"}
   129  				return j
   130  			}(),
   131  		},
   132  		{
   133  			dst: c.Dataset("dataset-id").Table("table-id"),
   134  			src: func() *GCSReference {
   135  				g := NewGCSReference("uri")
   136  				g.MaxBadRecords = 1
   137  				g.AllowJaggedRows = true
   138  				g.AllowQuotedNewlines = true
   139  				g.IgnoreUnknownValues = true
   140  				return g
   141  			}(),
   142  			config: LoadConfig{
   143  				JobTimeout: 4 * time.Second,
   144  			},
   145  			want: func() *bq.Job {
   146  				j := defaultLoadJob()
   147  				j.Configuration.Load.MaxBadRecords = 1
   148  				j.Configuration.Load.AllowJaggedRows = true
   149  				j.Configuration.Load.AllowQuotedNewlines = true
   150  				j.Configuration.Load.IgnoreUnknownValues = true
   151  				j.Configuration.JobTimeoutMs = 4000
   152  				return j
   153  			}(),
   154  		},
   155  		{
   156  			dst: c.Dataset("dataset-id").Table("table-id"),
   157  			src: func() *GCSReference {
   158  				g := NewGCSReference("uri")
   159  				g.Schema = Schema{
   160  					stringFieldSchema(),
   161  					nestedFieldSchema(),
   162  				}
   163  				return g
   164  			}(),
   165  			want: func() *bq.Job {
   166  				j := defaultLoadJob()
   167  				j.Configuration.Load.Schema = &bq.TableSchema{
   168  					Fields: []*bq.TableFieldSchema{
   169  						bqStringFieldSchema(),
   170  						bqNestedFieldSchema(),
   171  					}}
   172  				return j
   173  			}(),
   174  		},
   175  		{
   176  			dst: c.Dataset("dataset-id").Table("table-id"),
   177  			src: func() *GCSReference {
   178  				g := NewGCSReference("uri")
   179  				g.SkipLeadingRows = 1
   180  				g.SourceFormat = JSON
   181  				g.Encoding = UTF_8
   182  				g.FieldDelimiter = "\t"
   183  				g.Quote = "-"
   184  				return g
   185  			}(),
   186  			want: func() *bq.Job {
   187  				j := defaultLoadJob()
   188  				j.Configuration.Load.SkipLeadingRows = 1
   189  				j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON"
   190  				j.Configuration.Load.Encoding = "UTF-8"
   191  				j.Configuration.Load.FieldDelimiter = "\t"
   192  				hyphen := "-"
   193  				j.Configuration.Load.Quote = &hyphen
   194  				return j
   195  			}(),
   196  		},
   197  		{
   198  			dst: c.Dataset("dataset-id").Table("table-id"),
   199  			src: NewGCSReference("uri"),
   200  			want: func() *bq.Job {
   201  				j := defaultLoadJob()
   202  				// Quote is left unset in GCSReference, so should be nil here.
   203  				j.Configuration.Load.Quote = nil
   204  				return j
   205  			}(),
   206  		},
   207  		{
   208  			dst: c.Dataset("dataset-id").Table("table-id"),
   209  			src: func() *GCSReference {
   210  				g := NewGCSReference("uri")
   211  				g.ForceZeroQuote = true
   212  				return g
   213  			}(),
   214  			want: func() *bq.Job {
   215  				j := defaultLoadJob()
   216  				empty := ""
   217  				j.Configuration.Load.Quote = &empty
   218  				return j
   219  			}(),
   220  		},
   221  		{
   222  			dst: c.Dataset("dataset-id").Table("table-id"),
   223  			src: func() *ReaderSource {
   224  				r := NewReaderSource(strings.NewReader("foo"))
   225  				r.SkipLeadingRows = 1
   226  				r.SourceFormat = JSON
   227  				r.Encoding = UTF_8
   228  				r.FieldDelimiter = "\t"
   229  				r.Quote = "-"
   230  				return r
   231  			}(),
   232  			want: func() *bq.Job {
   233  				j := defaultLoadJob()
   234  				j.Configuration.Load.SourceUris = nil
   235  				j.Configuration.Load.SkipLeadingRows = 1
   236  				j.Configuration.Load.SourceFormat = "NEWLINE_DELIMITED_JSON"
   237  				j.Configuration.Load.Encoding = "UTF-8"
   238  				j.Configuration.Load.FieldDelimiter = "\t"
   239  				hyphen := "-"
   240  				j.Configuration.Load.Quote = &hyphen
   241  				return j
   242  			}(),
   243  		},
   244  		{
   245  			dst: c.Dataset("dataset-id").Table("table-id"),
   246  			src: func() *GCSReference {
   247  				g := NewGCSReference("uri")
   248  				g.SourceFormat = Avro
   249  				return g
   250  			}(),
   251  			config: LoadConfig{
   252  				UseAvroLogicalTypes: true,
   253  			},
   254  			want: func() *bq.Job {
   255  				j := defaultLoadJob()
   256  				j.Configuration.Load.SourceFormat = "AVRO"
   257  				j.Configuration.Load.UseAvroLogicalTypes = true
   258  				return j
   259  			}(),
   260  		},
   261  		{
   262  			dst: c.Dataset("dataset-id").Table("table-id"),
   263  			src: func() *ReaderSource {
   264  				r := NewReaderSource(strings.NewReader("foo"))
   265  				r.SourceFormat = Avro
   266  				return r
   267  			}(),
   268  			config: LoadConfig{
   269  				UseAvroLogicalTypes: true,
   270  			},
   271  			want: func() *bq.Job {
   272  				j := defaultLoadJob()
   273  				j.Configuration.Load.SourceUris = nil
   274  				j.Configuration.Load.SourceFormat = "AVRO"
   275  				j.Configuration.Load.UseAvroLogicalTypes = true
   276  				return j
   277  			}(),
   278  		},
   279  		{
   280  			dst: c.Dataset("dataset-id").Table("table-id"),
   281  			src: func() *ReaderSource {
   282  				r := NewReaderSource(strings.NewReader("foo"))
   283  				return r
   284  			}(),
   285  			config: LoadConfig{
   286  				TimePartitioning: &TimePartitioning{
   287  					Type:  HourPartitioningType,
   288  					Field: "somefield",
   289  				},
   290  			},
   291  			want: func() *bq.Job {
   292  				j := defaultLoadJob()
   293  				j.Configuration.Load.SourceUris = nil
   294  				j.Configuration.Load.TimePartitioning = &bq.TimePartitioning{
   295  					Field: "somefield",
   296  					Type:  "HOUR",
   297  				}
   298  				return j
   299  			}(),
   300  		},
   301  		{
   302  			dst: c.Dataset("dataset-id").Table("table-id"),
   303  			src: func() *ReaderSource {
   304  				r := NewReaderSource(strings.NewReader("foo"))
   305  				return r
   306  			}(),
   307  			config: LoadConfig{
   308  				RangePartitioning: &RangePartitioning{
   309  					Field: "somefield",
   310  					Range: &RangePartitioningRange{
   311  						Start:    1,
   312  						End:      2,
   313  						Interval: 3,
   314  					},
   315  				},
   316  			},
   317  			want: func() *bq.Job {
   318  				j := defaultLoadJob()
   319  				j.Configuration.Load.SourceUris = nil
   320  				j.Configuration.Load.RangePartitioning = &bq.RangePartitioning{
   321  					Field: "somefield",
   322  					Range: &bq.RangePartitioningRange{
   323  						Start:           1,
   324  						End:             2,
   325  						Interval:        3,
   326  						ForceSendFields: []string{"Start", "End", "Interval"},
   327  					},
   328  				}
   329  				return j
   330  			}(),
   331  		},
   332  		{
   333  			dst: c.Dataset("dataset-id").Table("table-id"),
   334  			src: func() *GCSReference {
   335  				g := NewGCSReference("uri")
   336  				g.SourceFormat = DatastoreBackup
   337  				return g
   338  			}(),
   339  			config: LoadConfig{
   340  				ProjectionFields: []string{"foo", "bar", "baz"},
   341  			},
   342  			want: func() *bq.Job {
   343  				j := defaultLoadJob()
   344  				j.Configuration.Load.SourceFormat = "DATASTORE_BACKUP"
   345  				j.Configuration.Load.ProjectionFields = []string{"foo", "bar", "baz"}
   346  				return j
   347  			}(),
   348  		},
   349  		{
   350  			dst: c.Dataset("dataset-id").Table("table-id"),
   351  			src: func() *GCSReference {
   352  				g := NewGCSReference("uri")
   353  				g.SourceFormat = Parquet
   354  				return g
   355  			}(),
   356  			config: LoadConfig{
   357  				HivePartitioningOptions: &HivePartitioningOptions{
   358  					Mode:                   CustomHivePartitioningMode,
   359  					SourceURIPrefix:        "source_uri",
   360  					RequirePartitionFilter: true,
   361  				},
   362  			},
   363  			want: func() *bq.Job {
   364  				j := defaultLoadJob()
   365  				j.Configuration.Load.SourceFormat = "PARQUET"
   366  				j.Configuration.Load.HivePartitioningOptions = &bq.HivePartitioningOptions{
   367  					Mode:                   "CUSTOM",
   368  					RequirePartitionFilter: true,
   369  					SourceUriPrefix:        "source_uri",
   370  				}
   371  				return j
   372  			}(),
   373  		},
   374  		{
   375  			dst: c.Dataset("dataset-id").Table("table-id"),
   376  			src: func() *GCSReference {
   377  				g := NewGCSReference("uri")
   378  				g.SourceFormat = Parquet
   379  				return g
   380  			}(),
   381  			config: LoadConfig{
   382  				DecimalTargetTypes: []DecimalTargetType{BigNumericTargetType, NumericTargetType, StringTargetType},
   383  			},
   384  			want: func() *bq.Job {
   385  				j := defaultLoadJob()
   386  				j.Configuration.Load.SourceFormat = "PARQUET"
   387  				j.Configuration.Load.DecimalTargetTypes = []string{"BIGNUMERIC", "NUMERIC", "STRING"}
   388  				return j
   389  			}(),
   390  		},
   391  		{
   392  			dst: c.Dataset("dataset-id").Table("table-id"),
   393  			src: func() *GCSReference {
   394  				g := NewGCSReference("uri")
   395  				g.SourceFormat = Parquet
   396  				return g
   397  			}(),
   398  			config: LoadConfig{
   399  				ReferenceFileSchemaURI: "schema.parquet",
   400  			},
   401  			want: func() *bq.Job {
   402  				j := defaultLoadJob()
   403  				j.Configuration.Load.SourceFormat = "PARQUET"
   404  				j.Configuration.Load.ReferenceFileSchemaUri = "schema.parquet"
   405  				return j
   406  			}(),
   407  		},
   408  		{
   409  			dst: c.Dataset("dataset-id").Table("table-id"),
   410  			src: func() *GCSReference {
   411  				g := NewGCSReference("uri")
   412  				return g
   413  			}(),
   414  			config: LoadConfig{
   415  				CreateSession: true,
   416  				ConnectionProperties: []*ConnectionProperty{
   417  					{
   418  						Key:   "session_id",
   419  						Value: "session_id_1234567890",
   420  					},
   421  				},
   422  			},
   423  			want: func() *bq.Job {
   424  				j := defaultLoadJob()
   425  				j.Configuration.Load.CreateSession = true
   426  				j.Configuration.Load.ConnectionProperties = []*bq.ConnectionProperty{
   427  					{
   428  						Key:   "session_id",
   429  						Value: "session_id_1234567890",
   430  					},
   431  				}
   432  				return j
   433  			}(),
   434  		},
   435  	}
   436  
   437  	for i, tc := range testCases {
   438  		loader := tc.dst.LoaderFrom(tc.src)
   439  		loader.JobID = tc.jobID
   440  		loader.Location = tc.location
   441  		tc.config.Src = tc.src
   442  		tc.config.Dst = tc.dst
   443  		loader.LoadConfig = tc.config
   444  		got, _ := loader.newJob()
   445  		checkJob(t, i, got, tc.want)
   446  
   447  		jc, err := bqToJobConfig(got.Configuration, c)
   448  		if err != nil {
   449  			t.Fatalf("#%d: %v", i, err)
   450  		}
   451  		diff := testutil.Diff(jc.(*LoadConfig), &loader.LoadConfig,
   452  			cmp.AllowUnexported(Table{}, Client{}),
   453  			cmpopts.IgnoreUnexported(ReaderSource{}))
   454  		if diff != "" {
   455  			t.Errorf("#%d: (got=-, want=+:\n%s", i, diff)
   456  		}
   457  	}
   458  }
   459  

View as plain text