...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package bigquery
16
17 import (
18 "context"
19 "time"
20
21 bq "google.golang.org/api/bigquery/v2"
22 )
23
24
25
// TableCopyOperationType is the string type of CopyConfig.OperationType.
// Its value is sent verbatim as the OperationType of a table copy job.
type TableCopyOperationType string
27
28 var (
29
30 CopyOperation TableCopyOperationType = "COPY"
31
32
33 SnapshotOperation TableCopyOperationType = "SNAPSHOT"
34
35 RestoreOperation TableCopyOperationType = "RESTORE"
36
37
38 CloneOperation TableCopyOperationType = "CLONE"
39 )
40
41
// CopyConfig holds the settings of a table copy job. toBQ translates it into
// the API's job configuration and bqToCopyConfig performs the reverse mapping.
type CopyConfig struct {
	// Srcs are the source tables; each one is converted and sent as part of
	// the job's SourceTables.
	Srcs []*Table

	// Dst is the destination table, sent as the job's DestinationTable.
	Dst *Table

	// CreateDisposition is sent as the job's CreateDisposition string.
	// NOTE(review): presumably controls whether the destination table may be
	// created by the job; confirm against the API reference.
	CreateDisposition TableCreateDisposition

	// WriteDisposition is sent as the job's WriteDisposition string.
	// NOTE(review): presumably controls how existing data in the destination
	// is treated; confirm against the API reference.
	WriteDisposition TableWriteDisposition

	// Labels are attached to the job configuration as its Labels.
	Labels map[string]string

	// DestinationEncryptionConfig is converted and sent as the job's
	// DestinationEncryptionConfiguration.
	DestinationEncryptionConfig *EncryptionConfig

	// OperationType is sent as the job's OperationType string. See the
	// TableCopyOperationType values declared in this file.
	OperationType TableCopyOperationType

	// JobTimeout is sent to the API as JobTimeoutMs, in whole milliseconds;
	// any sub-millisecond part of the duration is dropped by the conversion.
	// NOTE(review): presumably a server-side deadline for the job; confirm
	// the exact semantics against the API reference.
	JobTimeout time.Duration
}
77
78 func (c *CopyConfig) toBQ() *bq.JobConfiguration {
79 var ts []*bq.TableReference
80 for _, t := range c.Srcs {
81 ts = append(ts, t.toBQ())
82 }
83 return &bq.JobConfiguration{
84 Labels: c.Labels,
85 Copy: &bq.JobConfigurationTableCopy{
86 CreateDisposition: string(c.CreateDisposition),
87 WriteDisposition: string(c.WriteDisposition),
88 DestinationTable: c.Dst.toBQ(),
89 DestinationEncryptionConfiguration: c.DestinationEncryptionConfig.toBQ(),
90 SourceTables: ts,
91 OperationType: string(c.OperationType),
92 },
93 JobTimeoutMs: c.JobTimeout.Milliseconds(),
94 }
95 }
96
97 func bqToCopyConfig(q *bq.JobConfiguration, c *Client) *CopyConfig {
98 cc := &CopyConfig{
99 Labels: q.Labels,
100 CreateDisposition: TableCreateDisposition(q.Copy.CreateDisposition),
101 WriteDisposition: TableWriteDisposition(q.Copy.WriteDisposition),
102 Dst: bqToTable(q.Copy.DestinationTable, c),
103 DestinationEncryptionConfig: bqToEncryptionConfig(q.Copy.DestinationEncryptionConfiguration),
104 OperationType: TableCopyOperationType(q.Copy.OperationType),
105 JobTimeout: time.Duration(q.JobTimeoutMs) * time.Millisecond,
106 }
107 for _, t := range q.Copy.SourceTables {
108 cc.Srcs = append(cc.Srcs, bqToTable(t, c))
109 }
110 return cc
111 }
112
113
// Copier describes a table copy job: the embedded JobIDConfig supplies the
// job reference, the embedded CopyConfig supplies the job configuration, and
// c is the client used to insert the job when Run is called.
type Copier struct {
	JobIDConfig
	CopyConfig
	c *Client
}
119
120
121
122
123 func (t *Table) CopierFrom(srcs ...*Table) *Copier {
124 return &Copier{
125 c: t.c,
126 CopyConfig: CopyConfig{
127 Srcs: srcs,
128 Dst: t,
129 },
130 }
131 }
132
133
134 func (c *Copier) Run(ctx context.Context) (*Job, error) {
135 return c.c.insertJob(ctx, c.newJob(), nil)
136 }
137
138 func (c *Copier) newJob() *bq.Job {
139 return &bq.Job{
140 JobReference: c.JobIDConfig.createJobRef(c.c),
141 Configuration: c.CopyConfig.toBQ(),
142 }
143 }
144
View as plain text