1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package pubsub
16
17 import (
18 "context"
19 "fmt"
20 "time"
21
22 "google.golang.org/api/option"
23
24 vkit "cloud.google.com/go/pubsub/apiv1"
25 pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
26 )
27
28
// SchemaClient is a client for schema operations. It wraps the generated
// vkit.SchemaClient and remembers the project ID that its methods use to
// build "projects/<projectID>/..." resource names.
type SchemaClient struct {
	// sc is the underlying generated client that performs the RPCs.
	sc *vkit.SchemaClient
	// projectID is interpolated into the parent and resource names of requests.
	projectID string
}
33
34
35 func (s *SchemaClient) Close() error {
36 return s.sc.Close()
37 }
38
39
40 func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error) {
41 sc, err := vkit.NewSchemaClient(ctx, opts...)
42 if err != nil {
43 return nil, err
44 }
45 return &SchemaClient{sc: sc, projectID: projectID}, nil
46 }
47
48
// SchemaConfig is the client-side representation of a schema resource
// (pb.Schema).
type SchemaConfig struct {
	// Name is the schema's name, copied to and from pb.Schema.Name.
	// NOTE(review): presumably the full "projects/<project>/schemas/<schema>"
	// resource name — confirm against the API documentation.
	Name string

	// Type is the kind of schema definition held in Definition.
	Type SchemaType

	// Definition is the schema definition text, copied to and from
	// pb.Schema.Definition. Its syntax presumably depends on Type.
	Definition string

	// RevisionID is the revision identifier reported by the service. It is
	// filled in when converting from a pb.Schema and is not sent when this
	// config is converted to a proto by toProto.
	RevisionID string

	// RevisionCreateTime is the revision creation timestamp reported by the
	// service. Like RevisionID, it is not sent by toProto.
	RevisionCreateTime time.Time
}
70
71
72 type SchemaType pb.Schema_Type
73
74 const (
75
76 SchemaTypeUnspecified SchemaType = 0
77
78 SchemaProtocolBuffer SchemaType = 1
79
80 SchemaAvro SchemaType = 2
81 )
82
83
84
85 type SchemaView pb.SchemaView
86
87 const (
88
89 SchemaViewUnspecified SchemaView = 0
90
91 SchemaViewBasic SchemaView = 1
92
93 SchemaViewFull SchemaView = 2
94 )
95
96
97
// SchemaSettings is the client-side representation of pb.SchemaSettings.
// NOTE(review): presumably attached to a topic to enforce a schema on its
// messages — the usage is not visible in this file.
type SchemaSettings struct {
	// Schema identifies the schema; copied to and from
	// pb.SchemaSettings.Schema.
	// NOTE(review): presumably a full schema resource name — confirm.
	Schema string

	// Encoding is the message encoding; converted to and from pb.Encoding.
	Encoding SchemaEncoding

	// FirstRevisionID is copied to and from pb.SchemaSettings.FirstRevisionId.
	// NOTE(review): presumably the oldest revision allowed for validation —
	// confirm against the API documentation.
	FirstRevisionID string

	// LastRevisionID is copied to and from pb.SchemaSettings.LastRevisionId.
	// NOTE(review): presumably the newest revision allowed for validation —
	// confirm against the API documentation.
	LastRevisionID string
}
116
117 func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings {
118 if schema == nil {
119 return nil
120 }
121 return &pb.SchemaSettings{
122 Schema: schema.Schema,
123 Encoding: pb.Encoding(schema.Encoding),
124 FirstRevisionId: schema.FirstRevisionID,
125 LastRevisionId: schema.LastRevisionID,
126 }
127 }
128
129 func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings {
130 if pbs == nil {
131 return nil
132 }
133 return &SchemaSettings{
134 Schema: pbs.Schema,
135 Encoding: SchemaEncoding(pbs.Encoding),
136 FirstRevisionID: pbs.FirstRevisionId,
137 LastRevisionID: pbs.LastRevisionId,
138 }
139 }
140
141
142 type SchemaEncoding pb.Encoding
143
144 const (
145
146 EncodingUnspecified SchemaEncoding = 0
147
148 EncodingJSON SchemaEncoding = 1
149
150
151 EncodingBinary SchemaEncoding = 2
152 )
153
154 func (s *SchemaConfig) toProto() *pb.Schema {
155 pbs := &pb.Schema{
156 Name: s.Name,
157 Type: pb.Schema_Type(s.Type),
158 Definition: s.Definition,
159 }
160 return pbs
161 }
162
163 func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig {
164 return &SchemaConfig{
165 Name: pbs.Name,
166 Type: SchemaType(pbs.Type),
167 Definition: pbs.Definition,
168 RevisionID: pbs.RevisionId,
169 RevisionCreateTime: pbs.RevisionCreateTime.AsTime(),
170 }
171 }
172
173
174
175 func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) {
176 req := &pb.CreateSchemaRequest{
177 Parent: fmt.Sprintf("projects/%s", c.projectID),
178 Schema: s.toProto(),
179 SchemaId: schemaID,
180 }
181 pbs, err := c.sc.CreateSchema(ctx, req)
182 if err != nil {
183 return nil, err
184 }
185 return protoToSchemaConfig(pbs), nil
186 }
187
188
189 func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error) {
190 schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
191 req := &pb.GetSchemaRequest{
192 Name: schemaPath,
193 View: pb.SchemaView(view),
194 }
195 s, err := c.sc.GetSchema(ctx, req)
196 if err != nil {
197 return nil, err
198 }
199 return protoToSchemaConfig(s), nil
200 }
201
202
203 func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator {
204 return &SchemaIterator{
205 it: c.sc.ListSchemas(ctx, &pb.ListSchemasRequest{
206 Parent: fmt.Sprintf("projects/%s", c.projectID),
207 View: pb.SchemaView(view),
208 }),
209 }
210 }
211
212
// SchemaIterator iterates over schemas, wrapping the generated
// vkit.SchemaIterator and converting each element to a SchemaConfig.
type SchemaIterator struct {
	// it is the underlying generated iterator.
	it *vkit.SchemaIterator
	// err, when non-nil, is returned by Next without advancing the iterator.
	// NOTE(review): nothing visible in this file assigns it.
	err error
}
217
218
219 func (s *SchemaIterator) Next() (*SchemaConfig, error) {
220 if s.err != nil {
221 return nil, s.err
222 }
223 pbs, err := s.it.Next()
224 if err != nil {
225 return nil, err
226 }
227 return protoToSchemaConfig(pbs), nil
228 }
229
230
231 func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator {
232 return &SchemaIterator{
233 it: c.sc.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{
234 Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
235 View: pb.SchemaView(view),
236 }),
237 }
238 }
239
240
241 func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) {
242 req := &pb.CommitSchemaRequest{
243 Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
244 Schema: s.toProto(),
245 }
246 pbs, err := c.sc.CommitSchema(ctx, req)
247 if err != nil {
248 return nil, err
249 }
250 return protoToSchemaConfig(pbs), nil
251 }
252
253
254 func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
255 req := &pb.RollbackSchemaRequest{
256 Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
257 RevisionId: revisionID,
258 }
259 pbs, err := c.sc.RollbackSchema(ctx, req)
260 if err != nil {
261 return nil, err
262 }
263 return protoToSchemaConfig(pbs), nil
264 }
265
266
267 func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
268 schemaPath := fmt.Sprintf("projects/%s/schemas/%s@%s", c.projectID, schemaID, revisionID)
269 schema, err := c.sc.DeleteSchemaRevision(ctx, &pb.DeleteSchemaRevisionRequest{
270 Name: schemaPath,
271 })
272 if err != nil {
273 return nil, err
274 }
275 return protoToSchemaConfig(schema), nil
276 }
277
278
279 func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error {
280 schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
281 return c.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
282 Name: schemaPath,
283 })
284 }
285
286
287
// ValidateSchemaResult is the value returned by ValidateSchema on success.
// It currently carries no fields; a non-nil result with a nil error means
// the validation call succeeded.
type ValidateSchemaResult struct{}
289
290
291 func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) {
292 req := &pb.ValidateSchemaRequest{
293 Parent: fmt.Sprintf("projects/%s", c.projectID),
294 Schema: schema.toProto(),
295 }
296 _, err := c.sc.ValidateSchema(ctx, req)
297 if err != nil {
298 return nil, err
299 }
300 return &ValidateSchemaResult{}, nil
301 }
302
303
304
// ValidateMessageResult is the value returned by the ValidateMessage*
// methods on success. It currently carries no fields; a non-nil result with
// a nil error means the validation call succeeded.
type ValidateMessageResult struct{}
306
307
308
309 func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) {
310 req := &pb.ValidateMessageRequest{
311 Parent: fmt.Sprintf("projects/%s", c.projectID),
312 SchemaSpec: &pb.ValidateMessageRequest_Schema{
313 Schema: config.toProto(),
314 },
315 Message: msg,
316 Encoding: pb.Encoding(encoding),
317 }
318 _, err := c.sc.ValidateMessage(ctx, req)
319 if err != nil {
320 return nil, err
321 }
322 return &ValidateMessageResult{}, nil
323 }
324
325
326
327 func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) {
328 req := &pb.ValidateMessageRequest{
329 Parent: fmt.Sprintf("projects/%s", c.projectID),
330 SchemaSpec: &pb.ValidateMessageRequest_Name{
331 Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
332 },
333 Message: msg,
334 Encoding: pb.Encoding(encoding),
335 }
336 _, err := c.sc.ValidateMessage(ctx, req)
337 if err != nil {
338 return nil, err
339 }
340 return &ValidateMessageResult{}, nil
341 }
342
View as plain text