...

Source file src/cloud.google.com/go/bigquery/routine_integration_test.go

Documentation: cloud.google.com/go/bigquery

     1  // Copyright 2022 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package bigquery
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"testing"
    21  
    22  	"cloud.google.com/go/bigquery/connection/apiv1/connectionpb"
    23  	"cloud.google.com/go/internal"
    24  	"cloud.google.com/go/internal/testutil"
    25  	gax "github.com/googleapis/gax-go/v2"
    26  	"google.golang.org/api/iterator"
    27  )
    28  
    29  func TestIntegration_RoutineScalarUDF(t *testing.T) {
    30  	if client == nil {
    31  		t.Skip("Integration tests skipped")
    32  	}
    33  	ctx := context.Background()
    34  
    35  	// Create a scalar UDF routine via API.
    36  	routineID := routineIDs.New()
    37  	routine := dataset.Routine(routineID)
    38  	err := routine.Create(ctx, &RoutineMetadata{
    39  		Type:     "SCALAR_FUNCTION",
    40  		Language: "SQL",
    41  		Body:     "x * 3",
    42  		Arguments: []*RoutineArgument{
    43  			{
    44  				Name: "x",
    45  				DataType: &StandardSQLDataType{
    46  					TypeKind: "INT64",
    47  				},
    48  			},
    49  		},
    50  	})
    51  	if err != nil {
    52  		t.Fatalf("Create: %v", err)
    53  	}
    54  }
    55  
    56  func TestIntegration_RoutineRangeType(t *testing.T) {
    57  	if client == nil {
    58  		t.Skip("Integration tests skipped")
    59  	}
    60  	ctx := context.Background()
    61  
    62  	routineID := routineIDs.New()
    63  	routine := dataset.Routine(routineID)
    64  	err := routine.Create(ctx, &RoutineMetadata{
    65  		Type:     "SCALAR_FUNCTION",
    66  		Language: "SQL",
    67  		Body:     "RANGE_CONTAINS(r1,r2)",
    68  		Arguments: []*RoutineArgument{
    69  			{
    70  				Name: "r1",
    71  				DataType: &StandardSQLDataType{
    72  					TypeKind: "RANGE",
    73  					RangeElementType: &StandardSQLDataType{
    74  						TypeKind: "TIMESTAMP",
    75  					},
    76  				},
    77  			},
    78  			{
    79  				Name: "r2",
    80  				DataType: &StandardSQLDataType{
    81  					TypeKind: "RANGE",
    82  					RangeElementType: &StandardSQLDataType{
    83  						TypeKind: "TIMESTAMP",
    84  					},
    85  				},
    86  			},
    87  		},
    88  	})
    89  	if err != nil {
    90  		t.Fatalf("Create: %v", err)
    91  	}
    92  }
    93  
    94  func TestIntegration_RoutineDataGovernance(t *testing.T) {
    95  	if client == nil {
    96  		t.Skip("Integration tests skipped")
    97  	}
    98  	ctx := context.Background()
    99  
   100  	// Create a scalar UDF routine via API.
   101  	routineID := routineIDs.New()
   102  	routine := dataset.Routine(routineID)
   103  	err := routine.Create(ctx, &RoutineMetadata{
   104  		Type:     "SCALAR_FUNCTION",
   105  		Language: "SQL",
   106  		Body:     "x",
   107  		Arguments: []*RoutineArgument{
   108  			{
   109  				Name: "x",
   110  				DataType: &StandardSQLDataType{
   111  					TypeKind: "INT64",
   112  				},
   113  			},
   114  		},
   115  		ReturnType:         &StandardSQLDataType{TypeKind: "INT64"},
   116  		DataGovernanceType: "DATA_MASKING",
   117  	})
   118  	if err != nil {
   119  		t.Fatalf("Create: %v", err)
   120  	}
   121  }
   122  
   123  func TestIntegration_RoutineJSUDF(t *testing.T) {
   124  	if client == nil {
   125  		t.Skip("Integration tests skipped")
   126  	}
   127  	ctx := context.Background()
   128  
   129  	// Create a scalar UDF routine via API.
   130  	routineID := routineIDs.New()
   131  	routine := dataset.Routine(routineID)
   132  	meta := &RoutineMetadata{
   133  		Language: "JAVASCRIPT", Type: "SCALAR_FUNCTION",
   134  		Description:      "capitalizes using javascript",
   135  		DeterminismLevel: Deterministic,
   136  		Arguments: []*RoutineArgument{
   137  			{Name: "instr", Kind: "FIXED_TYPE", DataType: &StandardSQLDataType{TypeKind: "STRING"}},
   138  		},
   139  		ReturnType: &StandardSQLDataType{TypeKind: "STRING"},
   140  		Body:       "return instr.toUpperCase();",
   141  	}
   142  	if err := routine.Create(ctx, meta); err != nil {
   143  		t.Fatalf("Create: %v", err)
   144  	}
   145  
   146  	newMeta := &RoutineMetadataToUpdate{
   147  		Language:    meta.Language,
   148  		Body:        meta.Body,
   149  		Arguments:   meta.Arguments,
   150  		Description: meta.Description,
   151  		ReturnType:  meta.ReturnType,
   152  		Type:        meta.Type,
   153  
   154  		DeterminismLevel: NotDeterministic,
   155  	}
   156  	if _, err := routine.Update(ctx, newMeta, ""); err != nil {
   157  		t.Fatalf("Update: %v", err)
   158  	}
   159  }
   160  
   161  func TestIntegration_RoutineRemoteUDF(t *testing.T) {
   162  	if client == nil {
   163  		t.Skip("Integration tests skipped")
   164  	}
   165  	ctx := context.Background()
   166  
   167  	routineID := routineIDs.New()
   168  	routine := dataset.Routine(routineID)
   169  	uri := "https://aaabbbccc-uc.a.run.app"
   170  
   171  	connectionLocation := fmt.Sprintf("projects/%s/locations/%s", dataset.ProjectID, "us")
   172  	connectionName := fmt.Sprintf("udf_conn%s", routineID)
   173  	cleanupConnection, connectionID, err := createConnection(ctx, t, connectionLocation, connectionName)
   174  	if err != nil {
   175  		t.Fatal(err)
   176  	}
   177  	defer cleanupConnection()
   178  
   179  	remoteOpts := &RemoteFunctionOptions{
   180  		Endpoint:           uri,
   181  		Connection:         connectionID,
   182  		MaxBatchingRows:    50,
   183  		UserDefinedContext: map[string]string{"foo": "bar"},
   184  	}
   185  	meta := &RoutineMetadata{
   186  		RemoteFunctionOptions: remoteOpts,
   187  		Description:           "defines a remote function",
   188  		Type:                  ScalarFunctionRoutine,
   189  		ReturnType: &StandardSQLDataType{
   190  			TypeKind: "STRING",
   191  		},
   192  	}
   193  
   194  	err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
   195  		if err := routine.Create(ctx, meta); err != nil {
   196  			return false, err
   197  		}
   198  		return true, nil
   199  	})
   200  	if err != nil {
   201  		t.Fatalf("routine.Create: %v", err)
   202  	}
   203  
   204  	gotMeta, err := routine.Metadata(ctx)
   205  	if err != nil {
   206  		t.Fatalf("routine.Metadata: %v", err)
   207  	}
   208  
   209  	if diff := testutil.Diff(gotMeta.RemoteFunctionOptions, remoteOpts); diff != "" {
   210  		t.Fatalf("RemoteFunctionOptions: -got, +want:\n%s", diff)
   211  	}
   212  }
   213  
   214  func createConnection(ctx context.Context, t *testing.T, parent, name string) (cleanup func(), connectionID string, err error) {
   215  	fullname := fmt.Sprintf("%s/connections/%s", parent, name)
   216  	conn, err := connectionsClient.CreateConnection(ctx, &connectionpb.CreateConnectionRequest{
   217  		Parent:       parent,
   218  		ConnectionId: name,
   219  		Connection: &connectionpb.Connection{
   220  			FriendlyName: name,
   221  			Properties: &connectionpb.Connection_CloudResource{
   222  				CloudResource: &connectionpb.CloudResourceProperties{},
   223  			},
   224  		},
   225  	})
   226  	if err != nil {
   227  		return
   228  	}
   229  	conn, err = connectionsClient.GetConnection(ctx, &connectionpb.GetConnectionRequest{
   230  		Name: fullname,
   231  	})
   232  	if err != nil {
   233  		return
   234  	}
   235  	cleanup = func() {
   236  		err := connectionsClient.DeleteConnection(ctx, &connectionpb.DeleteConnectionRequest{
   237  			Name: fullname,
   238  		})
   239  		if err != nil {
   240  			t.Logf("could not delete connection: %s", fullname)
   241  		}
   242  	}
   243  	connectionID = conn.Name
   244  	return
   245  }
   246  
   247  func TestIntegration_RoutineComplexTypes(t *testing.T) {
   248  	if client == nil {
   249  		t.Skip("Integration tests skipped")
   250  	}
   251  	ctx := context.Background()
   252  
   253  	routineID := routineIDs.New()
   254  	routine := dataset.Routine(routineID)
   255  	routineSQLID, _ := routine.Identifier(StandardSQLID)
   256  	sql := fmt.Sprintf(`
   257  		CREATE FUNCTION %s(
   258  			arr ARRAY<STRUCT<name STRING, val INT64>>
   259  		  ) AS (
   260  			  (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
   261  		  )`,
   262  		routineSQLID)
   263  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
   264  		t.Fatal(err)
   265  	}
   266  	defer routine.Delete(ctx)
   267  
   268  	meta, err := routine.Metadata(ctx)
   269  	if err != nil {
   270  		t.Fatalf("Metadata: %v", err)
   271  	}
   272  	if meta.Type != "SCALAR_FUNCTION" {
   273  		t.Fatalf("routine type mismatch, got %s want SCALAR_FUNCTION", meta.Type)
   274  	}
   275  	if meta.Language != "SQL" {
   276  		t.Fatalf("language type mismatch, got  %s want SQL", meta.Language)
   277  	}
   278  	want := []*RoutineArgument{
   279  		{
   280  			Name: "arr",
   281  			DataType: &StandardSQLDataType{
   282  				TypeKind: "ARRAY",
   283  				ArrayElementType: &StandardSQLDataType{
   284  					TypeKind: "STRUCT",
   285  					StructType: &StandardSQLStructType{
   286  						Fields: []*StandardSQLField{
   287  							{
   288  								Name: "name",
   289  								Type: &StandardSQLDataType{
   290  									TypeKind: "STRING",
   291  								},
   292  							},
   293  							{
   294  								Name: "val",
   295  								Type: &StandardSQLDataType{
   296  									TypeKind: "INT64",
   297  								},
   298  							},
   299  						},
   300  					},
   301  				},
   302  			},
   303  		},
   304  	}
   305  	if diff := testutil.Diff(meta.Arguments, want); diff != "" {
   306  		t.Fatalf("%+v: -got, +want:\n%s", meta.Arguments, diff)
   307  	}
   308  }
   309  
   310  func TestIntegration_RoutineLifecycle(t *testing.T) {
   311  	if client == nil {
   312  		t.Skip("Integration tests skipped")
   313  	}
   314  	ctx := context.Background()
   315  
   316  	// Create a scalar UDF routine via a CREATE FUNCTION query
   317  	routineID := routineIDs.New()
   318  	routine := dataset.Routine(routineID)
   319  	routineSQLID, _ := routine.Identifier(StandardSQLID)
   320  
   321  	sql := fmt.Sprintf(`
   322  		CREATE FUNCTION %s(x INT64) AS (x * 3);`,
   323  		routineSQLID)
   324  	if _, _, err := runQuerySQL(ctx, sql); err != nil {
   325  		t.Fatal(err)
   326  	}
   327  	defer routine.Delete(ctx)
   328  
   329  	// Get the routine metadata.
   330  	curMeta, err := routine.Metadata(ctx)
   331  	if err != nil {
   332  		t.Fatalf("couldn't get metadata: %v", err)
   333  	}
   334  
   335  	want := "SCALAR_FUNCTION"
   336  	if curMeta.Type != want {
   337  		t.Errorf("Routine type mismatch.  got %s want %s", curMeta.Type, want)
   338  	}
   339  
   340  	want = "SQL"
   341  	if curMeta.Language != want {
   342  		t.Errorf("Language mismatch. got %s want %s", curMeta.Language, want)
   343  	}
   344  
   345  	// Perform an update to change the routine body and description.
   346  	want = "x * 4"
   347  	wantDescription := "an updated description"
   348  	// during beta, update doesn't allow partial updates.  Provide all fields.
   349  	newMeta, err := routine.Update(ctx, &RoutineMetadataToUpdate{
   350  		Body:        want,
   351  		Arguments:   curMeta.Arguments,
   352  		Description: wantDescription,
   353  		ReturnType:  curMeta.ReturnType,
   354  		Type:        curMeta.Type,
   355  	}, curMeta.ETag)
   356  	if err != nil {
   357  		t.Fatalf("Update: %v", err)
   358  	}
   359  	if newMeta.Body != want {
   360  		t.Fatalf("Update body failed. want %s got %s", want, newMeta.Body)
   361  	}
   362  	if newMeta.Description != wantDescription {
   363  		t.Fatalf("Update description failed. want %s got %s", wantDescription, newMeta.Description)
   364  	}
   365  
   366  	// Ensure presence when enumerating the model list.
   367  	it := dataset.Routines(ctx)
   368  	seen := false
   369  	for {
   370  		r, err := it.Next()
   371  		if err == iterator.Done {
   372  			break
   373  		}
   374  		if err != nil {
   375  			t.Fatal(err)
   376  		}
   377  		if r.RoutineID == routineID {
   378  			seen = true
   379  		}
   380  	}
   381  	if !seen {
   382  		t.Fatal("routine not listed in dataset")
   383  	}
   384  
   385  	// Delete the model.
   386  	if err := routine.Delete(ctx); err != nil {
   387  		t.Fatalf("failed to delete routine: %v", err)
   388  	}
   389  }
   390  

View as plain text