...

Source file src/cloud.google.com/go/pubsub/schema.go

Documentation: cloud.google.com/go/pubsub

     1  // Copyright 2021 Google LLC
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //      http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package pubsub
    16  
    17  import (
    18  	"context"
    19  	"fmt"
    20  	"time"
    21  
    22  	"google.golang.org/api/option"
    23  
    24  	vkit "cloud.google.com/go/pubsub/apiv1"
    25  	pb "cloud.google.com/go/pubsub/apiv1/pubsubpb"
    26  )
    27  
// SchemaClient is a Pub/Sub schema client scoped to a single project.
// It pairs the generated schema service client with the project ID that the
// methods of this type use when formatting resource names.
type SchemaClient struct {
	// sc is the underlying generated schema service client; all RPCs go
	// through it.
	sc        *vkit.SchemaClient
	// projectID is substituted into the "projects/%s" resource paths built
	// by the methods of this type.
	projectID string
}
    33  
    34  // Close closes the schema client and frees up resources.
    35  func (s *SchemaClient) Close() error {
    36  	return s.sc.Close()
    37  }
    38  
    39  // NewSchemaClient creates a new Pub/Sub Schema client.
    40  func NewSchemaClient(ctx context.Context, projectID string, opts ...option.ClientOption) (*SchemaClient, error) {
    41  	sc, err := vkit.NewSchemaClient(ctx, opts...)
    42  	if err != nil {
    43  		return nil, err
    44  	}
    45  	return &SchemaClient{sc: sc, projectID: projectID}, nil
    46  }
    47  
// SchemaConfig is a reference to a PubSub schema.
//
// Name, Type and Definition are the fields copied into outgoing requests;
// RevisionID and RevisionCreateTime are only populated from responses.
type SchemaConfig struct {
	// Name of the schema.
	// Format is `projects/{project}/schemas/{schema}`
	Name string

	// The type of the schema definition.
	Type SchemaType

	// The definition of the schema. This should contain a string representing
	// the full definition of the schema, and it must be a valid schema
	// definition of the type specified in the Type field.
	Definition string

	// RevisionID is the revision ID of the schema.
	// This field is output only.
	RevisionID string

	// RevisionCreateTime is the timestamp at which the revision was created.
	// This field is output only.
	RevisionCreateTime time.Time
}
    70  
    71  // SchemaType is the possible schema definition types.
    72  type SchemaType pb.Schema_Type
    73  
    74  const (
    75  	// SchemaTypeUnspecified is the unused default value.
    76  	SchemaTypeUnspecified SchemaType = 0
    77  	// SchemaProtocolBuffer is a protobuf schema definition.
    78  	SchemaProtocolBuffer SchemaType = 1
    79  	// SchemaAvro is an Avro schema definition.
    80  	SchemaAvro SchemaType = 2
    81  )
    82  
    83  // SchemaView is a view of Schema object fields to be returned
    84  // by GetSchema and ListSchemas.
    85  type SchemaView pb.SchemaView
    86  
    87  const (
    88  	// SchemaViewUnspecified is the default/unset value.
    89  	SchemaViewUnspecified SchemaView = 0
    90  	// SchemaViewBasic includes the name and type of the schema, but not the definition.
    91  	SchemaViewBasic SchemaView = 1
    92  	// SchemaViewFull includes all Schema object fields.
    93  	SchemaViewFull SchemaView = 2
    94  )
    95  
// SchemaSettings are settings for validating messages
// published against a schema.
type SchemaSettings struct {
	// The name of the schema that published messages should be
	// validated against. Format is `projects/{project}/schemas/{schema}`
	Schema string

	// The encoding of messages validated against the schema.
	Encoding SchemaEncoding

	// The minimum (inclusive) revision allowed for validating messages. If empty
	// or not present, allow any revision to be validated against LastRevisionID or
	// any revision created before.
	FirstRevisionID string

	// The maximum (inclusive) revision allowed for validating messages. If empty
	// or not present, allow any revision to be validated against FirstRevisionID
	// or any revision created after.
	LastRevisionID string
}
   116  
   117  func schemaSettingsToProto(schema *SchemaSettings) *pb.SchemaSettings {
   118  	if schema == nil {
   119  		return nil
   120  	}
   121  	return &pb.SchemaSettings{
   122  		Schema:          schema.Schema,
   123  		Encoding:        pb.Encoding(schema.Encoding),
   124  		FirstRevisionId: schema.FirstRevisionID,
   125  		LastRevisionId:  schema.LastRevisionID,
   126  	}
   127  }
   128  
   129  func protoToSchemaSettings(pbs *pb.SchemaSettings) *SchemaSettings {
   130  	if pbs == nil {
   131  		return nil
   132  	}
   133  	return &SchemaSettings{
   134  		Schema:          pbs.Schema,
   135  		Encoding:        SchemaEncoding(pbs.Encoding),
   136  		FirstRevisionID: pbs.FirstRevisionId,
   137  		LastRevisionID:  pbs.LastRevisionId,
   138  	}
   139  }
   140  
   141  // SchemaEncoding is the encoding expected for messages.
   142  type SchemaEncoding pb.Encoding
   143  
   144  const (
   145  	// EncodingUnspecified is the default unused value.
   146  	EncodingUnspecified SchemaEncoding = 0
   147  	// EncodingJSON is the JSON encoding type for a message.
   148  	EncodingJSON SchemaEncoding = 1
   149  	// EncodingBinary is the binary encoding type for a message.
   150  	// For some schema types, binary encoding may not be available.
   151  	EncodingBinary SchemaEncoding = 2
   152  )
   153  
   154  func (s *SchemaConfig) toProto() *pb.Schema {
   155  	pbs := &pb.Schema{
   156  		Name:       s.Name,
   157  		Type:       pb.Schema_Type(s.Type),
   158  		Definition: s.Definition,
   159  	}
   160  	return pbs
   161  }
   162  
   163  func protoToSchemaConfig(pbs *pb.Schema) *SchemaConfig {
   164  	return &SchemaConfig{
   165  		Name:               pbs.Name,
   166  		Type:               SchemaType(pbs.Type),
   167  		Definition:         pbs.Definition,
   168  		RevisionID:         pbs.RevisionId,
   169  		RevisionCreateTime: pbs.RevisionCreateTime.AsTime(),
   170  	}
   171  }
   172  
   173  // CreateSchema creates a new schema with the given schemaID
   174  // and config. Schemas cannot be updated after creation.
   175  func (c *SchemaClient) CreateSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) {
   176  	req := &pb.CreateSchemaRequest{
   177  		Parent:   fmt.Sprintf("projects/%s", c.projectID),
   178  		Schema:   s.toProto(),
   179  		SchemaId: schemaID,
   180  	}
   181  	pbs, err := c.sc.CreateSchema(ctx, req)
   182  	if err != nil {
   183  		return nil, err
   184  	}
   185  	return protoToSchemaConfig(pbs), nil
   186  }
   187  
   188  // Schema retrieves the configuration of a schema given a schemaID and a view.
   189  func (c *SchemaClient) Schema(ctx context.Context, schemaID string, view SchemaView) (*SchemaConfig, error) {
   190  	schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
   191  	req := &pb.GetSchemaRequest{
   192  		Name: schemaPath,
   193  		View: pb.SchemaView(view),
   194  	}
   195  	s, err := c.sc.GetSchema(ctx, req)
   196  	if err != nil {
   197  		return nil, err
   198  	}
   199  	return protoToSchemaConfig(s), nil
   200  }
   201  
   202  // Schemas returns an iterator which returns all of the schemas for the client's project.
   203  func (c *SchemaClient) Schemas(ctx context.Context, view SchemaView) *SchemaIterator {
   204  	return &SchemaIterator{
   205  		it: c.sc.ListSchemas(ctx, &pb.ListSchemasRequest{
   206  			Parent: fmt.Sprintf("projects/%s", c.projectID),
   207  			View:   pb.SchemaView(view),
   208  		}),
   209  	}
   210  }
   211  
// SchemaIterator is a struct used to iterate over schemas.
// It wraps the generated client's schema iterator and converts each item
// to a SchemaConfig in Next.
type SchemaIterator struct {
	// it is the underlying generated iterator that Next pulls items from.
	it  *vkit.SchemaIterator
	// err, when non-nil, is returned by Next instead of advancing it.
	// NOTE(review): nothing in the visible code assigns this field — confirm
	// whether it is set elsewhere or is vestigial.
	err error
}
   217  
   218  // Next returns the next schema. If there are no more schemas, iterator.Done will be returned.
   219  func (s *SchemaIterator) Next() (*SchemaConfig, error) {
   220  	if s.err != nil {
   221  		return nil, s.err
   222  	}
   223  	pbs, err := s.it.Next()
   224  	if err != nil {
   225  		return nil, err
   226  	}
   227  	return protoToSchemaConfig(pbs), nil
   228  }
   229  
   230  // ListSchemaRevisions lists all schema revisions for the named schema.
   231  func (c *SchemaClient) ListSchemaRevisions(ctx context.Context, schemaID string, view SchemaView) *SchemaIterator {
   232  	return &SchemaIterator{
   233  		it: c.sc.ListSchemaRevisions(ctx, &pb.ListSchemaRevisionsRequest{
   234  			Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
   235  			View: pb.SchemaView(view),
   236  		}),
   237  	}
   238  }
   239  
   240  // CommitSchema commits a new schema revision to an existing schema.
   241  func (c *SchemaClient) CommitSchema(ctx context.Context, schemaID string, s SchemaConfig) (*SchemaConfig, error) {
   242  	req := &pb.CommitSchemaRequest{
   243  		Name:   fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
   244  		Schema: s.toProto(),
   245  	}
   246  	pbs, err := c.sc.CommitSchema(ctx, req)
   247  	if err != nil {
   248  		return nil, err
   249  	}
   250  	return protoToSchemaConfig(pbs), nil
   251  }
   252  
   253  // RollbackSchema creates a new schema revision that is a copy of the provided revision.
   254  func (c *SchemaClient) RollbackSchema(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
   255  	req := &pb.RollbackSchemaRequest{
   256  		Name:       fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
   257  		RevisionId: revisionID,
   258  	}
   259  	pbs, err := c.sc.RollbackSchema(ctx, req)
   260  	if err != nil {
   261  		return nil, err
   262  	}
   263  	return protoToSchemaConfig(pbs), nil
   264  }
   265  
   266  // DeleteSchemaRevision deletes a specific schema revision.
   267  func (c *SchemaClient) DeleteSchemaRevision(ctx context.Context, schemaID, revisionID string) (*SchemaConfig, error) {
   268  	schemaPath := fmt.Sprintf("projects/%s/schemas/%s@%s", c.projectID, schemaID, revisionID)
   269  	schema, err := c.sc.DeleteSchemaRevision(ctx, &pb.DeleteSchemaRevisionRequest{
   270  		Name: schemaPath,
   271  	})
   272  	if err != nil {
   273  		return nil, err
   274  	}
   275  	return protoToSchemaConfig(schema), nil
   276  }
   277  
   278  // DeleteSchema deletes an existing schema given a schema ID.
   279  func (c *SchemaClient) DeleteSchema(ctx context.Context, schemaID string) error {
   280  	schemaPath := fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID)
   281  	return c.sc.DeleteSchema(ctx, &pb.DeleteSchemaRequest{
   282  		Name: schemaPath,
   283  	})
   284  }
   285  
// ValidateSchemaResult is the response for the ValidateSchema method.
// It currently has no fields and is reserved for future use.
type ValidateSchemaResult struct{}
   289  
   290  // ValidateSchema validates a schema config and returns an error if invalid.
   291  func (c *SchemaClient) ValidateSchema(ctx context.Context, schema SchemaConfig) (*ValidateSchemaResult, error) {
   292  	req := &pb.ValidateSchemaRequest{
   293  		Parent: fmt.Sprintf("projects/%s", c.projectID),
   294  		Schema: schema.toProto(),
   295  	}
   296  	_, err := c.sc.ValidateSchema(ctx, req)
   297  	if err != nil {
   298  		return nil, err
   299  	}
   300  	return &ValidateSchemaResult{}, nil
   301  }
   302  
// ValidateMessageResult is the response for the ValidateMessage methods
// (ValidateMessageWithConfig and ValidateMessageWithID).
// It currently has no fields and is reserved for future use.
type ValidateMessageResult struct{}
   306  
   307  // ValidateMessageWithConfig validates a message against an schema specified
   308  // by a schema config.
   309  func (c *SchemaClient) ValidateMessageWithConfig(ctx context.Context, msg []byte, encoding SchemaEncoding, config SchemaConfig) (*ValidateMessageResult, error) {
   310  	req := &pb.ValidateMessageRequest{
   311  		Parent: fmt.Sprintf("projects/%s", c.projectID),
   312  		SchemaSpec: &pb.ValidateMessageRequest_Schema{
   313  			Schema: config.toProto(),
   314  		},
   315  		Message:  msg,
   316  		Encoding: pb.Encoding(encoding),
   317  	}
   318  	_, err := c.sc.ValidateMessage(ctx, req)
   319  	if err != nil {
   320  		return nil, err
   321  	}
   322  	return &ValidateMessageResult{}, nil
   323  }
   324  
   325  // ValidateMessageWithID validates a message against an schema specified
   326  // by the schema ID of an existing schema.
   327  func (c *SchemaClient) ValidateMessageWithID(ctx context.Context, msg []byte, encoding SchemaEncoding, schemaID string) (*ValidateMessageResult, error) {
   328  	req := &pb.ValidateMessageRequest{
   329  		Parent: fmt.Sprintf("projects/%s", c.projectID),
   330  		SchemaSpec: &pb.ValidateMessageRequest_Name{
   331  			Name: fmt.Sprintf("projects/%s/schemas/%s", c.projectID, schemaID),
   332  		},
   333  		Message:  msg,
   334  		Encoding: pb.Encoding(encoding),
   335  	}
   336  	_, err := c.sc.ValidateMessage(ctx, req)
   337  	if err != nil {
   338  		return nil, err
   339  	}
   340  	return &ValidateMessageResult{}, nil
   341  }
   342  

View as plain text