...

Source file src/cloud.google.com/go/pubsub/schema_test.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //     https://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  
    14  package pubsub
    15  
    16  import (
    17  	"context"
    18  	"fmt"
    19  	"testing"
    20  
    21  	"cloud.google.com/go/internal/testutil"
    22  	"github.com/google/go-cmp/cmp"
    23  	"github.com/google/go-cmp/cmp/cmpopts"
    24  	"google.golang.org/api/iterator"
    25  	"google.golang.org/api/option"
    26  	"google.golang.org/grpc"
    27  	"google.golang.org/grpc/codes"
    28  	"google.golang.org/grpc/status"
    29  
    30  	"cloud.google.com/go/pubsub/pstest"
    31  )
    32  
    33  func newSchemaFake(t *testing.T) (*SchemaClient, *pstest.Server) {
    34  	ctx := context.Background()
    35  	srv := pstest.NewServer()
    36  
    37  	schema, err := NewSchemaClient(ctx, "my-proj", option.WithEndpoint(srv.Addr), option.WithoutAuthentication(), option.WithGRPCDialOption(grpc.WithInsecure()))
    38  	if err != nil {
    39  		t.Fatal(err)
    40  	}
    41  	return schema, srv
    42  }
    43  
    44  func TestSchemaBasicCreateGetDelete(t *testing.T) {
    45  	ctx := context.Background()
    46  
    47  	// Inputs
    48  	schemaID := "my-schema"
    49  	schemaPath := fmt.Sprintf("projects/my-proj/schemas/%s", schemaID)
    50  	schemaConfig := SchemaConfig{
    51  		Name:       schemaPath,
    52  		Type:       SchemaAvro,
    53  		Definition: "some-definition",
    54  	}
    55  
    56  	admin, _ := newSchemaFake(t)
    57  	defer admin.Close()
    58  
    59  	gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig)
    60  	if err != nil {
    61  		t.Fatalf("CreateSchema() got err: %v", err)
    62  	}
    63  	// Don't compare revisionID / create time since that isn't known
    64  	// until after it is created.
    65  	if diff := cmp.Diff(*gotConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
    66  		t.Errorf("CreateSchema() -want, +got: %v", diff)
    67  	}
    68  
    69  	gotConfig, err = admin.Schema(ctx, schemaID, SchemaViewFull)
    70  	if err != nil {
    71  		t.Errorf("Schema() got err: %v", err)
    72  	}
    73  	if diff := testutil.Diff(*gotConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
    74  		t.Errorf("Schema() -got, +want:\n%v", diff)
    75  	}
    76  
    77  	if err := admin.DeleteSchema(ctx, schemaID); err != nil {
    78  		t.Errorf("DeleteSchema() got err: %v", err)
    79  	}
    80  
    81  	if err := admin.DeleteSchema(ctx, "fake-schema"); err == nil {
    82  		t.Error("DeleteSchema() got nil, expected NotFound err")
    83  	}
    84  }
    85  
    86  func TestSchemaListSchemas(t *testing.T) {
    87  	ctx := context.Background()
    88  	admin, _ := newSchemaFake(t)
    89  	defer admin.Close()
    90  
    91  	// Inputs
    92  	schemaConfig1 := SchemaConfig{
    93  		Name:       "projects/my-proj/schemas/schema-1",
    94  		Type:       SchemaAvro,
    95  		Definition: "some schema definition",
    96  	}
    97  	schemaConfig2 := SchemaConfig{
    98  		Name:       "projects/my-proj/schemas/schema-2",
    99  		Type:       SchemaProtocolBuffer,
   100  		Definition: "some other schema definition",
   101  	}
   102  
   103  	mustCreateSchema(t, admin, "schema-1", schemaConfig1)
   104  	mustCreateSchema(t, admin, "schema-2", schemaConfig2)
   105  
   106  	var gotSchemaConfigs []SchemaConfig
   107  	it := admin.Schemas(ctx, SchemaViewFull)
   108  	for {
   109  		schema, err := it.Next()
   110  		if err == iterator.Done {
   111  			break
   112  		}
   113  		if err != nil {
   114  			t.Fatalf("SchemaIterator.Next() got err: %v", err)
   115  		}
   116  		gotSchemaConfigs = append(gotSchemaConfigs, *schema)
   117  	}
   118  
   119  	got := len(gotSchemaConfigs)
   120  	want := 2
   121  	if got != want {
   122  		t.Errorf("Schemas() want: %d schemas, got: %d", want, got)
   123  	}
   124  }
   125  
   126  func TestSchema_SchemaRevisions(t *testing.T) {
   127  	ctx := context.Background()
   128  	admin, _ := newSchemaFake(t)
   129  	defer admin.Close()
   130  
   131  	// Inputs
   132  	schemaID := "my-schema"
   133  	schemaPath := fmt.Sprintf("projects/my-proj/schemas/%s", schemaID)
   134  	schemaConfig := SchemaConfig{
   135  		Name:       schemaPath,
   136  		Type:       SchemaAvro,
   137  		Definition: "def1",
   138  	}
   139  
   140  	gotConfig, err := admin.CreateSchema(ctx, schemaID, schemaConfig)
   141  	if err != nil {
   142  		t.Fatalf("CreateSchema() got err: %v", err)
   143  	}
   144  	if diff := cmp.Diff(*gotConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
   145  		t.Fatalf("CreateSchema() -want, +got: %v", diff)
   146  	}
   147  
   148  	schemaConfig.Definition = "def2"
   149  	revConfig, err := admin.CommitSchema(ctx, schemaID, schemaConfig)
   150  	if err != nil {
   151  		t.Fatalf("CommitSchema() got err: %v", err)
   152  	}
   153  	if diff := cmp.Diff(*revConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
   154  		t.Fatalf("CommitSchema() -want, +got: %v", diff)
   155  	}
   156  
   157  	rbConfig, err := admin.RollbackSchema(ctx, schemaID, gotConfig.RevisionID)
   158  	if err != nil {
   159  		t.Fatalf("RollbackSchema() got err: %v", err)
   160  	}
   161  	schemaConfig.Definition = "def1"
   162  	if diff := cmp.Diff(*rbConfig, schemaConfig, cmpopts.IgnoreFields(SchemaConfig{}, "RevisionID", "RevisionCreateTime")); diff != "" {
   163  		t.Fatalf("RollbackSchema() -want, +got: %v", diff)
   164  	}
   165  
   166  	if _, err := admin.DeleteSchemaRevision(ctx, schemaID, gotConfig.RevisionID); err != nil {
   167  		t.Fatalf("DeleteSchemaRevision() got err: %v", err)
   168  	}
   169  
   170  	var got []*SchemaConfig
   171  	it := admin.ListSchemaRevisions(ctx, schemaID, SchemaViewFull)
   172  	for {
   173  		sc, err := it.Next()
   174  		if err == iterator.Done {
   175  			break
   176  		}
   177  		if err != nil {
   178  			t.Fatalf("SchemaIterator.Next() got err: %v", err)
   179  		}
   180  		got = append(got, sc)
   181  	}
   182  	if gotLen, wantLen := len(got), 2; gotLen != wantLen {
   183  		t.Errorf("ListSchemaRevisions() got %d revisions, want: %d", gotLen, wantLen)
   184  	}
   185  }
   186  
   187  func TestSchemaRollbackSchema(t *testing.T) {
   188  	admin, _ := newSchemaFake(t)
   189  	defer admin.Close()
   190  }
   191  
   192  func TestSchemaDeleteSchemaRevision(t *testing.T) {
   193  	admin, _ := newSchemaFake(t)
   194  	defer admin.Close()
   195  }
   196  
   197  func TestSchemaValidateSchema(t *testing.T) {
   198  	ctx := context.Background()
   199  	admin, _ := newSchemaFake(t)
   200  	defer admin.Close()
   201  
   202  	for _, tc := range []struct {
   203  		desc    string
   204  		schema  SchemaConfig
   205  		wantErr error
   206  	}{
   207  		{
   208  			desc: "valid avro schema",
   209  			schema: SchemaConfig{
   210  				Name:       "schema-1",
   211  				Type:       SchemaAvro,
   212  				Definition: "{name:some-avro-schema}",
   213  			},
   214  			wantErr: nil,
   215  		},
   216  		{
   217  			desc: "valid proto schema",
   218  			schema: SchemaConfig{
   219  				Name:       "schema-1",
   220  				Type:       SchemaProtocolBuffer,
   221  				Definition: "some proto buf schema definition",
   222  			},
   223  			wantErr: nil,
   224  		},
   225  		{
   226  			desc: "empty invalid schema",
   227  			schema: SchemaConfig{
   228  				Name:       "schema-3",
   229  				Type:       SchemaProtocolBuffer,
   230  				Definition: "",
   231  			},
   232  			wantErr: status.Error(codes.InvalidArgument, "schema definition cannot be empty"),
   233  		},
   234  	} {
   235  		t.Run(tc.desc, func(t *testing.T) {
   236  			_, gotErr := admin.ValidateSchema(ctx, tc.schema)
   237  			if status.Code(gotErr) != status.Code(tc.wantErr) {
   238  				t.Fatalf("got err: %v\nwant err: %v", gotErr, tc.wantErr)
   239  			}
   240  		})
   241  	}
   242  }
   243  
   244  func mustCreateSchema(t *testing.T, c *SchemaClient, id string, sc SchemaConfig) *SchemaConfig {
   245  	schema, err := c.CreateSchema(context.Background(), id, sc)
   246  	if err != nil {
   247  		t.Fatal(err)
   248  	}
   249  	return schema
   250  }
   251  

View as plain text