1
2
3
4
5
6
7
8
9
10
11
12
13
14 package pubsub
15
16 import (
17 "context"
18 "fmt"
19 "testing"
20
21 "cloud.google.com/go/internal/testutil"
22 "github.com/google/go-cmp/cmp"
23 "github.com/google/go-cmp/cmp/cmpopts"
24 "google.golang.org/api/iterator"
25 "google.golang.org/api/option"
26 "google.golang.org/grpc"
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29
30 "cloud.google.com/go/pubsub/pstest"
31 )
32
33 func newSchemaFake(t *testing.T) (*SchemaClient, *pstest.Server) {
34 ctx := context.Background()
35 srv := pstest.NewServer()
36
37 schema, err := NewSchemaClient(ctx, "my-proj", option.WithEndpoint(srv.Addr), option.WithoutAuthentication(), option.WithGRPCDialOption(grpc.WithInsecure()))
38 if err != nil {
39 t.Fatal(err)
40 }
41 return schema, srv
42 }
43
44 func TestSchemaBasicCreateGetDelete(t *testing.T) {
45 ctx := context.Background()
46
47
48 schemaID := "my-schema"
49 schemaPath := fmt.Sprintf("projects/my-proj/schemas/%s", schemaID)
50 schemaConfig := SchemaConfig{
51 Name: schemaPath,
52 Type: SchemaAvro,
53 Definition: "some-definition",
54 }
55
56 admin, _ := newSchemaFake(t)
57 defer admin.Close()
58
59 gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig)
60 if err != nil {
61 t.Fatalf("CreateSchema() got err: %v", err)
62 }
63
64
65 if diff := cmp.Diff(*gotConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
66 t.Errorf("CreateSchema() -want, +got: %v", diff)
67 }
68
69 gotConfig, err = admin.Schema(ctx, schemaID, SchemaViewFull)
70 if err != nil {
71 t.Errorf("Schema() got err: %v", err)
72 }
73 if diff := testutil.Diff(*gotConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
74 t.Errorf("Schema() -got, +want:\n%v", diff)
75 }
76
77 if err := admin.DeleteSchema(ctx, schemaID); err != nil {
78 t.Errorf("DeleteSchema() got err: %v", err)
79 }
80
81 if err := admin.DeleteSchema(ctx, "fake-schema"); err == nil {
82 t.Error("DeleteSchema() got nil, expected NotFound err")
83 }
84 }
85
86 func TestSchemaListSchemas(t *testing.T) {
87 ctx := context.Background()
88 admin, _ := newSchemaFake(t)
89 defer admin.Close()
90
91
92 schemaConfig1 := SchemaConfig{
93 Name: "projects/my-proj/schemas/schema-1",
94 Type: SchemaAvro,
95 Definition: "some schema definition",
96 }
97 schemaConfig2 := SchemaConfig{
98 Name: "projects/my-proj/schemas/schema-2",
99 Type: SchemaProtocolBuffer,
100 Definition: "some other schema definition",
101 }
102
103 mustCreateSchema(t, admin, "schema-1", schemaConfig1)
104 mustCreateSchema(t, admin, "schema-2", schemaConfig2)
105
106 var gotSchemaConfigs []SchemaConfig
107 it := admin.Schemas(ctx, SchemaViewFull)
108 for {
109 schema, err := it.Next()
110 if err == iterator.Done {
111 break
112 }
113 if err != nil {
114 t.Fatalf("SchemaIterator.Next() got err: %v", err)
115 }
116 gotSchemaConfigs = append(gotSchemaConfigs, *schema)
117 }
118
119 got := len(gotSchemaConfigs)
120 want := 2
121 if got != want {
122 t.Errorf("Schemas() want: %d schemas, got: %d", want, got)
123 }
124 }
125
126 func TestSchema_SchemaRevisions(t *testing.T) {
127 ctx := context.Background()
128 admin, _ := newSchemaFake(t)
129 defer admin.Close()
130
131
132 schemaID := "my-schema"
133 schemaPath := fmt.Sprintf("projects/my-proj/schemas/%s", schemaID)
134 schemaConfig := SchemaConfig{
135 Name: schemaPath,
136 Type: SchemaAvro,
137 Definition: "def1",
138 }
139
140 gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig)
141 if err != nil {
142 t.Fatalf("CreateSchema() got err: %v", err)
143 }
144 if diff := cmp.Diff(*gotConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
145 t.Fatalf("CreateSchema() -want, +got: %v", diff)
146 }
147
148 schemaConfig.Definition = "def2"
149 revConfig, err := admin.CommitSchema(ctx, schemaID, schemaConfig)
150 if err != nil {
151 t.Fatalf("CommitSchema() got err: %v", err)
152 }
153 if diff := cmp.Diff(*revConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
154 t.Fatalf("CommitSchema() -want, +got: %v", diff)
155 }
156
157 rbConfig, err := admin.RollbackSchema(ctx, schemaID, gotConfig.RevisionID)
158 if err != nil {
159 t.Fatalf("RollbackSchema() got err: %v", err)
160 }
161 schemaConfig.Definition = "def1"
162 if diff := cmp.Diff(*rbConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
163 t.Fatalf("RollbackSchema() -want, +got: %v", diff)
164 }
165
166 if _, err := admin.DeleteSchemaRevision(ctx, schemaID, gotConfig.RevisionID); err != nil {
167 t.Fatalf("DeleteSchemaRevision() got err: %v", err)
168 }
169
170 var got []*SchemaConfig
171 it := admin.ListSchemaRevisions(ctx, schemaID, SchemaViewFull)
172 for {
173 sc, err := it.Next()
174 if err == iterator.Done {
175 break
176 }
177 if err != nil {
178 t.Fatalf("SchemaIterator.Next() got err: %v", err)
179 }
180 got = append(got, sc)
181 }
182 if gotLen, wantLen := len(got), 2; gotLen != wantLen {
183 t.Errorf("ListSchemaRevisions() got %d revisions, want: %d", gotLen, wantLen)
184 }
185 }
186
187 func TestSchemaRollbackSchema(t *testing.T) {
188 admin, _ := newSchemaFake(t)
189 defer admin.Close()
190 }
191
192 func TestSchemaDeleteSchemaRevision(t *testing.T) {
193 admin, _ := newSchemaFake(t)
194 defer admin.Close()
195 }
196
197 func TestSchemaValidateSchema(t *testing.T) {
198 ctx := context.Background()
199 admin, _ := newSchemaFake(t)
200 defer admin.Close()
201
202 for _, tc := range []struct {
203 desc string
204 schema SchemaConfig
205 wantErr error
206 }{
207 {
208 desc: "valid avro schema",
209 schema: SchemaConfig{
210 Name: "schema-1",
211 Type: SchemaAvro,
212 Definition: "{name:some-avro-schema}",
213 },
214 wantErr: nil,
215 },
216 {
217 desc: "valid proto schema",
218 schema: SchemaConfig{
219 Name: "schema-1",
220 Type: SchemaProtocolBuffer,
221 Definition: "some proto buf schema definition",
222 },
223 wantErr: nil,
224 },
225 {
226 desc: "empty invalid schema",
227 schema: SchemaConfig{
228 Name: "schema-3",
229 Type: SchemaProtocolBuffer,
230 Definition: "",
231 },
232 wantErr: status.Error(codes.InvalidArgument, "schema definition cannot be empty"),
233 },
234 } {
235 t.Run(tc.desc, func(t *testing.T) {
236 _, gotErr := admin.ValidateSchema(ctx, tc.schema)
237 if status.Code(gotErr) != status.Code(tc.wantErr) {
238 t.Fatalf("got err: %v\nwant err: %v", gotErr, tc.wantErr)
239 }
240 })
241 }
242 }
243
244 func mustCreateSchema(t *testing.T, c *SchemaClient, id string, sc SchemaConfig) *SchemaConfig {
245 schema, err := c.CreateSchema(context.Background(), id, sc)
246 if err != nil {
247 t.Fatal(err)
248 }
249 return schema
250 }
251
View as plain text