1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "errors"
20 "fmt"
21 "strings"
22 "time"
23
24 "cloud.google.com/go/internal/optional"
25 "cloud.google.com/go/internal/trace"
26 bq "google.golang.org/api/bigquery/v2"
27 )
28
29
30
31
32 type Routine struct {
33 ProjectID string
34 DatasetID string
35 RoutineID string
36
37 c *Client
38 }
39
40 func (r *Routine) toBQ() *bq.RoutineReference {
41 return &bq.RoutineReference{
42 ProjectId: r.ProjectID,
43 DatasetId: r.DatasetID,
44 RoutineId: r.RoutineID,
45 }
46 }
47
48
49
50
51
52 func (r *Routine) Identifier(f IdentifierFormat) (string, error) {
53 switch f {
54 case StandardSQLID:
55 if strings.Contains(r.ProjectID, "-") {
56 return fmt.Sprintf("`%s`.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID), nil
57 }
58 return fmt.Sprintf("%s.%s.%s", r.ProjectID, r.DatasetID, r.RoutineID), nil
59 default:
60 return "", ErrUnknownIdentifierFormat
61 }
62 }
63
64
65 func (r *Routine) FullyQualifiedName() string {
66 s, _ := r.Identifier(StandardSQLID)
67 return s
68 }
69
70
71
72 func (r *Routine) Create(ctx context.Context, rm *RoutineMetadata) (err error) {
73 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Routine.Create")
74 defer func() { trace.EndSpan(ctx, err) }()
75
76 routine, err := rm.toBQ()
77 if err != nil {
78 return err
79 }
80 routine.RoutineReference = &bq.RoutineReference{
81 ProjectId: r.ProjectID,
82 DatasetId: r.DatasetID,
83 RoutineId: r.RoutineID,
84 }
85 req := r.c.bqs.Routines.Insert(r.ProjectID, r.DatasetID, routine).Context(ctx)
86 setClientHeader(req.Header())
87 _, err = req.Do()
88 return err
89 }
90
91
92 func (r *Routine) Metadata(ctx context.Context) (rm *RoutineMetadata, err error) {
93 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Routine.Metadata")
94 defer func() { trace.EndSpan(ctx, err) }()
95
96 req := r.c.bqs.Routines.Get(r.ProjectID, r.DatasetID, r.RoutineID).Context(ctx)
97 setClientHeader(req.Header())
98 var routine *bq.Routine
99 err = runWithRetry(ctx, func() (err error) {
100 ctx = trace.StartSpan(ctx, "bigquery.routines.get")
101 routine, err = req.Do()
102 trace.EndSpan(ctx, err)
103 return err
104 })
105 if err != nil {
106 return nil, err
107 }
108 return bqToRoutineMetadata(routine)
109 }
110
111
112 func (r *Routine) Update(ctx context.Context, upd *RoutineMetadataToUpdate, etag string) (rm *RoutineMetadata, err error) {
113 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Routine.Update")
114 defer func() { trace.EndSpan(ctx, err) }()
115
116 bqr, err := upd.toBQ()
117 if err != nil {
118 return nil, err
119 }
120
121 bqr.RoutineReference = &bq.RoutineReference{
122 ProjectId: r.ProjectID,
123 DatasetId: r.DatasetID,
124 RoutineId: r.RoutineID,
125 }
126
127 call := r.c.bqs.Routines.Update(r.ProjectID, r.DatasetID, r.RoutineID, bqr).Context(ctx)
128 setClientHeader(call.Header())
129 if etag != "" {
130 call.Header().Set("If-Match", etag)
131 }
132 var res *bq.Routine
133 if err := runWithRetry(ctx, func() (err error) {
134 ctx = trace.StartSpan(ctx, "bigquery.routines.update")
135 res, err = call.Do()
136 trace.EndSpan(ctx, err)
137 return err
138 }); err != nil {
139 return nil, err
140 }
141 return bqToRoutineMetadata(res)
142 }
143
144
145 func (r *Routine) Delete(ctx context.Context) (err error) {
146 ctx = trace.StartSpan(ctx, "cloud.google.com/go/bigquery.Model.Delete")
147 defer func() { trace.EndSpan(ctx, err) }()
148
149 req := r.c.bqs.Routines.Delete(r.ProjectID, r.DatasetID, r.RoutineID).Context(ctx)
150 setClientHeader(req.Header())
151 return req.Do()
152 }
153
154
155
// RoutineDeterminism is the string-typed determinism level of a routine. It is
// the type of RoutineMetadata.DeterminismLevel and is converted to and from
// the API's plain string DeterminismLevel field.
type RoutineDeterminism string

// Determinism levels defined by this package.
const (
	// Deterministic is the "DETERMINISTIC" determinism level.
	Deterministic RoutineDeterminism = "DETERMINISTIC"

	// NotDeterministic is the "NOT_DETERMINISTIC" determinism level.
	NotDeterministic RoutineDeterminism = "NOT_DETERMINISTIC"
)
165
// Routine type names. These are untyped string constants; presumably they are
// the values expected in RoutineMetadata.Type (confirm against the BigQuery
// API reference).
const (
	// ScalarFunctionRoutine is the "SCALAR_FUNCTION" routine type.
	ScalarFunctionRoutine = "SCALAR_FUNCTION"

	// ProcedureRoutine is the "PROCEDURE" routine type.
	ProcedureRoutine = "PROCEDURE"

	// TableValuedFunctionRoutine is the "TABLE_VALUED_FUNCTION" routine type.
	TableValuedFunctionRoutine = "TABLE_VALUED_FUNCTION"
)
174
175
176 type RoutineMetadata struct {
177 ETag string
178
179
180 Type string
181 CreationTime time.Time
182 Description string
183
184 DeterminismLevel RoutineDeterminism
185 LastModifiedTime time.Time
186
187 Language string
188
189 Arguments []*RoutineArgument
190
191
192 RemoteFunctionOptions *RemoteFunctionOptions
193
194 ReturnType *StandardSQLDataType
195
196
197 ReturnTableType *StandardSQLTableType
198
199 ImportedLibraries []string
200
201
202
203
204
205
206
207
208 Body string
209
210
211
212
213 DataGovernanceType string
214 }
215
216
// RemoteFunctionOptions holds the remote-function settings of a routine. Each
// field is copied to and from the identically named field of the API's
// RemoteFunctionOptions message.
type RemoteFunctionOptions struct {
	// Connection maps to the API's Connection field.
	Connection string

	// Endpoint maps to the API's Endpoint field.
	Endpoint string

	// MaxBatchingRows maps to the API's MaxBatchingRows field.
	MaxBatchingRows int64

	// UserDefinedContext is a set of string key/value pairs; the map is
	// copied, not shared, when converting to or from the API type.
	UserDefinedContext map[string]string
}
239
240 func bqToRemoteFunctionOptions(in *bq.RemoteFunctionOptions) (*RemoteFunctionOptions, error) {
241 if in == nil {
242 return nil, nil
243 }
244 rfo := &RemoteFunctionOptions{
245 Connection: in.Connection,
246 Endpoint: in.Endpoint,
247 MaxBatchingRows: in.MaxBatchingRows,
248 }
249 if in.UserDefinedContext != nil {
250 rfo.UserDefinedContext = make(map[string]string)
251 for k, v := range in.UserDefinedContext {
252 rfo.UserDefinedContext[k] = v
253 }
254 }
255 return rfo, nil
256 }
257
258 func (rfo *RemoteFunctionOptions) toBQ() (*bq.RemoteFunctionOptions, error) {
259 if rfo == nil {
260 return nil, nil
261 }
262 r := &bq.RemoteFunctionOptions{
263 Connection: rfo.Connection,
264 Endpoint: rfo.Endpoint,
265 MaxBatchingRows: rfo.MaxBatchingRows,
266 }
267 if rfo.UserDefinedContext != nil {
268 r.UserDefinedContext = make(map[string]string)
269 for k, v := range rfo.UserDefinedContext {
270 r.UserDefinedContext[k] = v
271 }
272 }
273 return r, nil
274 }
275
276 func (rm *RoutineMetadata) toBQ() (*bq.Routine, error) {
277 r := &bq.Routine{}
278 if rm == nil {
279 return r, nil
280 }
281 r.Description = rm.Description
282 r.DeterminismLevel = string(rm.DeterminismLevel)
283 r.Language = rm.Language
284 r.RoutineType = rm.Type
285 r.DefinitionBody = rm.Body
286 r.DataGovernanceType = rm.DataGovernanceType
287 rt, err := rm.ReturnType.toBQ()
288 if err != nil {
289 return nil, err
290 }
291 r.ReturnType = rt
292 if rm.ReturnTableType != nil {
293 tt, err := rm.ReturnTableType.toBQ()
294 if err != nil {
295 return nil, fmt.Errorf("couldn't convert return table type: %w", err)
296 }
297 r.ReturnTableType = tt
298 }
299 var args []*bq.Argument
300 for _, v := range rm.Arguments {
301 bqa, err := v.toBQ()
302 if err != nil {
303 return nil, err
304 }
305 args = append(args, bqa)
306 }
307 r.Arguments = args
308 r.ImportedLibraries = rm.ImportedLibraries
309 if rm.RemoteFunctionOptions != nil {
310 rfo, err := rm.RemoteFunctionOptions.toBQ()
311 if err != nil {
312 return nil, err
313 }
314 r.RemoteFunctionOptions = rfo
315 }
316 if !rm.CreationTime.IsZero() {
317 return nil, errors.New("cannot set CreationTime on create")
318 }
319 if !rm.LastModifiedTime.IsZero() {
320 return nil, errors.New("cannot set LastModifiedTime on create")
321 }
322 if rm.ETag != "" {
323 return nil, errors.New("cannot set ETag on create")
324 }
325 return r, nil
326 }
327
328
329
330 type RoutineArgument struct {
331
332 Name string
333
334
335
336
337
338
339
340 Kind string
341
342
343
344
345
346
347
348
349 Mode string
350
351
352 DataType *StandardSQLDataType
353 }
354
355 func (ra *RoutineArgument) toBQ() (*bq.Argument, error) {
356 if ra == nil {
357 return nil, nil
358 }
359 a := &bq.Argument{
360 Name: ra.Name,
361 ArgumentKind: ra.Kind,
362 Mode: ra.Mode,
363 }
364 if ra.DataType != nil {
365 dt, err := ra.DataType.toBQ()
366 if err != nil {
367 return nil, err
368 }
369 a.DataType = dt
370 }
371 return a, nil
372 }
373
374 func bqToRoutineArgument(bqa *bq.Argument) (*RoutineArgument, error) {
375 arg := &RoutineArgument{
376 Name: bqa.Name,
377 Kind: bqa.ArgumentKind,
378 Mode: bqa.Mode,
379 }
380 dt, err := bqToStandardSQLDataType(bqa.DataType)
381 if err != nil {
382 return nil, err
383 }
384 arg.DataType = dt
385 return arg, nil
386 }
387
388 func bqToArgs(in []*bq.Argument) ([]*RoutineArgument, error) {
389 var out []*RoutineArgument
390 for _, a := range in {
391 arg, err := bqToRoutineArgument(a)
392 if err != nil {
393 return nil, err
394 }
395 out = append(out, arg)
396 }
397 return out, nil
398 }
399
400 func routineArgumentsToBQ(in []*RoutineArgument) ([]*bq.Argument, error) {
401 var out []*bq.Argument
402 for _, inarg := range in {
403 arg, err := inarg.toBQ()
404 if err != nil {
405 return nil, err
406 }
407 out = append(out, arg)
408 }
409 return out, nil
410 }
411
412
413 type RoutineMetadataToUpdate struct {
414 Arguments []*RoutineArgument
415 Description optional.String
416 DeterminismLevel optional.String
417 Type optional.String
418 Language optional.String
419 Body optional.String
420 ImportedLibraries []string
421 ReturnType *StandardSQLDataType
422 ReturnTableType *StandardSQLTableType
423 DataGovernanceType optional.String
424 }
425
426 func (rm *RoutineMetadataToUpdate) toBQ() (*bq.Routine, error) {
427 r := &bq.Routine{}
428 forceSend := func(field string) {
429 r.ForceSendFields = append(r.ForceSendFields, field)
430 }
431 nullField := func(field string) {
432 r.NullFields = append(r.NullFields, field)
433 }
434 if rm.Description != nil {
435 r.Description = optional.ToString(rm.Description)
436 forceSend("Description")
437 }
438 if rm.DeterminismLevel != nil {
439 processed := false
440
441 if x, ok := rm.DeterminismLevel.(RoutineDeterminism); ok {
442 r.DeterminismLevel = string(x)
443 processed = true
444 }
445 if x, ok := rm.DeterminismLevel.(string); ok {
446 r.DeterminismLevel = x
447 processed = true
448 }
449 if !processed {
450 panic(fmt.Sprintf("DeterminismLevel should be either type string or RoutineDetermism in update, got %T", rm.DeterminismLevel))
451 }
452 }
453 if rm.Arguments != nil {
454 if len(rm.Arguments) == 0 {
455 nullField("Arguments")
456 } else {
457 args, err := routineArgumentsToBQ(rm.Arguments)
458 if err != nil {
459 return nil, err
460 }
461 r.Arguments = args
462 forceSend("Arguments")
463 }
464 }
465 if rm.Type != nil {
466 r.RoutineType = optional.ToString(rm.Type)
467 forceSend("RoutineType")
468 }
469 if rm.Language != nil {
470 r.Language = optional.ToString(rm.Language)
471 forceSend("Language")
472 }
473 if rm.Body != nil {
474 r.DefinitionBody = optional.ToString(rm.Body)
475 forceSend("DefinitionBody")
476 }
477 if rm.ImportedLibraries != nil {
478 if len(rm.ImportedLibraries) == 0 {
479 nullField("ImportedLibraries")
480 } else {
481 r.ImportedLibraries = rm.ImportedLibraries
482 forceSend("ImportedLibraries")
483 }
484 }
485 if rm.ReturnType != nil {
486 dt, err := rm.ReturnType.toBQ()
487 if err != nil {
488 return nil, err
489 }
490 r.ReturnType = dt
491 forceSend("ReturnType")
492 }
493 if rm.ReturnTableType != nil {
494 tt, err := rm.ReturnTableType.toBQ()
495 if err != nil {
496 return nil, err
497 }
498 r.ReturnTableType = tt
499 forceSend("ReturnTableType")
500 }
501 if rm.DataGovernanceType != nil {
502 r.DataGovernanceType = optional.ToString(rm.DataGovernanceType)
503 forceSend("DataGovernanceType")
504 }
505 return r, nil
506 }
507
508 func bqToRoutineMetadata(r *bq.Routine) (*RoutineMetadata, error) {
509 meta := &RoutineMetadata{
510 ETag: r.Etag,
511 Type: r.RoutineType,
512 CreationTime: unixMillisToTime(r.CreationTime),
513 Description: r.Description,
514 DeterminismLevel: RoutineDeterminism(r.DeterminismLevel),
515 LastModifiedTime: unixMillisToTime(r.LastModifiedTime),
516 Language: r.Language,
517 ImportedLibraries: r.ImportedLibraries,
518 Body: r.DefinitionBody,
519 DataGovernanceType: r.DataGovernanceType,
520 }
521 args, err := bqToArgs(r.Arguments)
522 if err != nil {
523 return nil, err
524 }
525 meta.Arguments = args
526 ret, err := bqToStandardSQLDataType(r.ReturnType)
527 if err != nil {
528 return nil, err
529 }
530 meta.ReturnType = ret
531 rfo, err := bqToRemoteFunctionOptions(r.RemoteFunctionOptions)
532 if err != nil {
533 return nil, err
534 }
535 meta.RemoteFunctionOptions = rfo
536 tt, err := bqToStandardSQLTableType(r.ReturnTableType)
537 if err != nil {
538 return nil, err
539 }
540 meta.ReturnTableType = tt
541 return meta, nil
542 }
543
View as plain text