1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "fmt"
20 "testing"
21
22 "cloud.google.com/go/bigquery/connection/apiv1/connectionpb"
23 "cloud.google.com/go/internal"
24 "cloud.google.com/go/internal/testutil"
25 gax "github.com/googleapis/gax-go/v2"
26 "google.golang.org/api/iterator"
27 )
28
29 func TestIntegration_RoutineScalarUDF(t *testing.T) {
30 if client == nil {
31 t.Skip("Integration tests skipped")
32 }
33 ctx := context.Background()
34
35
36 routineID := routineIDs.New()
37 routine := dataset.Routine(routineID)
38 err := routine.Create(ctx, &RoutineMetadata{
39 Type: "SCALAR_FUNCTION",
40 Language: "SQL",
41 Body: "x * 3",
42 Arguments: []*RoutineArgument{
43 {
44 Name: "x",
45 DataType: &StandardSQLDataType{
46 TypeKind: "INT64",
47 },
48 },
49 },
50 })
51 if err != nil {
52 t.Fatalf("Create: %v", err)
53 }
54 }
55
56 func TestIntegration_RoutineRangeType(t *testing.T) {
57 if client == nil {
58 t.Skip("Integration tests skipped")
59 }
60 ctx := context.Background()
61
62 routineID := routineIDs.New()
63 routine := dataset.Routine(routineID)
64 err := routine.Create(ctx, &RoutineMetadata{
65 Type: "SCALAR_FUNCTION",
66 Language: "SQL",
67 Body: "RANGE_CONTAINS(r1,r2)",
68 Arguments: []*RoutineArgument{
69 {
70 Name: "r1",
71 DataType: &StandardSQLDataType{
72 TypeKind: "RANGE",
73 RangeElementType: &StandardSQLDataType{
74 TypeKind: "TIMESTAMP",
75 },
76 },
77 },
78 {
79 Name: "r2",
80 DataType: &StandardSQLDataType{
81 TypeKind: "RANGE",
82 RangeElementType: &StandardSQLDataType{
83 TypeKind: "TIMESTAMP",
84 },
85 },
86 },
87 },
88 })
89 if err != nil {
90 t.Fatalf("Create: %v", err)
91 }
92 }
93
94 func TestIntegration_RoutineDataGovernance(t *testing.T) {
95 if client == nil {
96 t.Skip("Integration tests skipped")
97 }
98 ctx := context.Background()
99
100
101 routineID := routineIDs.New()
102 routine := dataset.Routine(routineID)
103 err := routine.Create(ctx, &RoutineMetadata{
104 Type: "SCALAR_FUNCTION",
105 Language: "SQL",
106 Body: "x",
107 Arguments: []*RoutineArgument{
108 {
109 Name: "x",
110 DataType: &StandardSQLDataType{
111 TypeKind: "INT64",
112 },
113 },
114 },
115 ReturnType: &StandardSQLDataType{TypeKind: "INT64"},
116 DataGovernanceType: "DATA_MASKING",
117 })
118 if err != nil {
119 t.Fatalf("Create: %v", err)
120 }
121 }
122
123 func TestIntegration_RoutineJSUDF(t *testing.T) {
124 if client == nil {
125 t.Skip("Integration tests skipped")
126 }
127 ctx := context.Background()
128
129
130 routineID := routineIDs.New()
131 routine := dataset.Routine(routineID)
132 meta := &RoutineMetadata{
133 Language: "JAVASCRIPT", Type: "SCALAR_FUNCTION",
134 Description: "capitalizes using javascript",
135 DeterminismLevel: Deterministic,
136 Arguments: []*RoutineArgument{
137 {Name: "instr", Kind: "FIXED_TYPE", DataType: &StandardSQLDataType{TypeKind: "STRING"}},
138 },
139 ReturnType: &StandardSQLDataType{TypeKind: "STRING"},
140 Body: "return instr.toUpperCase();",
141 }
142 if err := routine.Create(ctx, meta); err != nil {
143 t.Fatalf("Create: %v", err)
144 }
145
146 newMeta := &RoutineMetadataToUpdate{
147 Language: meta.Language,
148 Body: meta.Body,
149 Arguments: meta.Arguments,
150 Description: meta.Description,
151 ReturnType: meta.ReturnType,
152 Type: meta.Type,
153
154 DeterminismLevel: NotDeterministic,
155 }
156 if _, err := routine.Update(ctx, newMeta, ""); err != nil {
157 t.Fatalf("Update: %v", err)
158 }
159 }
160
161 func TestIntegration_RoutineRemoteUDF(t *testing.T) {
162 if client == nil {
163 t.Skip("Integration tests skipped")
164 }
165 ctx := context.Background()
166
167 routineID := routineIDs.New()
168 routine := dataset.Routine(routineID)
169 uri := "https://aaabbbccc-uc.a.run.app"
170
171 connectionLocation := fmt.Sprintf("projects/%s/locations/%s", dataset.ProjectID, "us")
172 connectionName := fmt.Sprintf("udf_conn%s", routineID)
173 cleanupConnection, connectionID, err := createConnection(ctx, t, connectionLocation, connectionName)
174 if err != nil {
175 t.Fatal(err)
176 }
177 defer cleanupConnection()
178
179 remoteOpts := &RemoteFunctionOptions{
180 Endpoint: uri,
181 Connection: connectionID,
182 MaxBatchingRows: 50,
183 UserDefinedContext: map[string]string{"foo": "bar"},
184 }
185 meta := &RoutineMetadata{
186 RemoteFunctionOptions: remoteOpts,
187 Description: "defines a remote function",
188 Type: ScalarFunctionRoutine,
189 ReturnType: &StandardSQLDataType{
190 TypeKind: "STRING",
191 },
192 }
193
194 err = internal.Retry(ctx, gax.Backoff{}, func() (stop bool, err error) {
195 if err := routine.Create(ctx, meta); err != nil {
196 return false, err
197 }
198 return true, nil
199 })
200 if err != nil {
201 t.Fatalf("routine.Create: %v", err)
202 }
203
204 gotMeta, err := routine.Metadata(ctx)
205 if err != nil {
206 t.Fatalf("routine.Metadata: %v", err)
207 }
208
209 if diff := testutil.Diff(gotMeta.RemoteFunctionOptions, remoteOpts); diff != "" {
210 t.Fatalf("RemoteFunctionOptions: -got, +want:\n%s", diff)
211 }
212 }
213
214 func createConnection(ctx context.Context, t *testing.T, parent, name string) (cleanup func(), connectionID string, err error) {
215 fullname := fmt.Sprintf("%s/connections/%s", parent, name)
216 conn, err := connectionsClient.CreateConnection(ctx, &connectionpb.CreateConnectionRequest{
217 Parent: parent,
218 ConnectionId: name,
219 Connection: &connectionpb.Connection{
220 FriendlyName: name,
221 Properties: &connectionpb.Connection_CloudResource{
222 CloudResource: &connectionpb.CloudResourceProperties{},
223 },
224 },
225 })
226 if err != nil {
227 return
228 }
229 conn, err = connectionsClient.GetConnection(ctx, &connectionpb.GetConnectionRequest{
230 Name: fullname,
231 })
232 if err != nil {
233 return
234 }
235 cleanup = func() {
236 err := connectionsClient.DeleteConnection(ctx, &connectionpb.DeleteConnectionRequest{
237 Name: fullname,
238 })
239 if err != nil {
240 t.Logf("could not delete connection: %s", fullname)
241 }
242 }
243 connectionID = conn.Name
244 return
245 }
246
247 func TestIntegration_RoutineComplexTypes(t *testing.T) {
248 if client == nil {
249 t.Skip("Integration tests skipped")
250 }
251 ctx := context.Background()
252
253 routineID := routineIDs.New()
254 routine := dataset.Routine(routineID)
255 routineSQLID, _ := routine.Identifier(StandardSQLID)
256 sql := fmt.Sprintf(`
257 CREATE FUNCTION %s(
258 arr ARRAY<STRUCT<name STRING, val INT64>>
259 ) AS (
260 (SELECT SUM(IF(elem.name = "foo",elem.val,null)) FROM UNNEST(arr) AS elem)
261 )`,
262 routineSQLID)
263 if _, _, err := runQuerySQL(ctx, sql); err != nil {
264 t.Fatal(err)
265 }
266 defer routine.Delete(ctx)
267
268 meta, err := routine.Metadata(ctx)
269 if err != nil {
270 t.Fatalf("Metadata: %v", err)
271 }
272 if meta.Type != "SCALAR_FUNCTION" {
273 t.Fatalf("routine type mismatch, got %s want SCALAR_FUNCTION", meta.Type)
274 }
275 if meta.Language != "SQL" {
276 t.Fatalf("language type mismatch, got %s want SQL", meta.Language)
277 }
278 want := []*RoutineArgument{
279 {
280 Name: "arr",
281 DataType: &StandardSQLDataType{
282 TypeKind: "ARRAY",
283 ArrayElementType: &StandardSQLDataType{
284 TypeKind: "STRUCT",
285 StructType: &StandardSQLStructType{
286 Fields: []*StandardSQLField{
287 {
288 Name: "name",
289 Type: &StandardSQLDataType{
290 TypeKind: "STRING",
291 },
292 },
293 {
294 Name: "val",
295 Type: &StandardSQLDataType{
296 TypeKind: "INT64",
297 },
298 },
299 },
300 },
301 },
302 },
303 },
304 }
305 if diff := testutil.Diff(meta.Arguments, want); diff != "" {
306 t.Fatalf("%+v: -got, +want:\n%s", meta.Arguments, diff)
307 }
308 }
309
310 func TestIntegration_RoutineLifecycle(t *testing.T) {
311 if client == nil {
312 t.Skip("Integration tests skipped")
313 }
314 ctx := context.Background()
315
316
317 routineID := routineIDs.New()
318 routine := dataset.Routine(routineID)
319 routineSQLID, _ := routine.Identifier(StandardSQLID)
320
321 sql := fmt.Sprintf(`
322 CREATE FUNCTION %s(x INT64) AS (x * 3);`,
323 routineSQLID)
324 if _, _, err := runQuerySQL(ctx, sql); err != nil {
325 t.Fatal(err)
326 }
327 defer routine.Delete(ctx)
328
329
330 curMeta, err := routine.Metadata(ctx)
331 if err != nil {
332 t.Fatalf("couldn't get metadata: %v", err)
333 }
334
335 want := "SCALAR_FUNCTION"
336 if curMeta.Type != want {
337 t.Errorf("Routine type mismatch. got %s want %s", curMeta.Type, want)
338 }
339
340 want = "SQL"
341 if curMeta.Language != want {
342 t.Errorf("Language mismatch. got %s want %s", curMeta.Language, want)
343 }
344
345
346 want = "x * 4"
347 wantDescription := "an updated description"
348
349 newMeta, err := routine.Update(ctx, &RoutineMetadataToUpdate{
350 Body: want,
351 Arguments: curMeta.Arguments,
352 Description: wantDescription,
353 ReturnType: curMeta.ReturnType,
354 Type: curMeta.Type,
355 }, curMeta.ETag)
356 if err != nil {
357 t.Fatalf("Update: %v", err)
358 }
359 if newMeta.Body != want {
360 t.Fatalf("Update body failed. want %s got %s", want, newMeta.Body)
361 }
362 if newMeta.Description != wantDescription {
363 t.Fatalf("Update description failed. want %s got %s", wantDescription, newMeta.Description)
364 }
365
366
367 it := dataset.Routines(ctx)
368 seen := false
369 for {
370 r, err := it.Next()
371 if err == iterator.Done {
372 break
373 }
374 if err != nil {
375 t.Fatal(err)
376 }
377 if r.RoutineID == routineID {
378 seen = true
379 }
380 }
381 if !seen {
382 t.Fatal("routine not listed in dataset")
383 }
384
385
386 if err := routine.Delete(ctx); err != nil {
387 t.Fatalf("failed to delete routine: %v", err)
388 }
389 }
390
View as plain text