...

Source file src/github.com/googleapis/gax-go/v2/proto_json_stream.go

Documentation: github.com/googleapis/gax-go/v2

     1  // Copyright 2022, Google Inc.
     2  // All rights reserved.
     3  //
     4  // Redistribution and use in source and binary forms, with or without
     5  // modification, are permitted provided that the following conditions are
     6  // met:
     7  //
     8  //     * Redistributions of source code must retain the above copyright
     9  // notice, this list of conditions and the following disclaimer.
    10  //     * Redistributions in binary form must reproduce the above
    11  // copyright notice, this list of conditions and the following disclaimer
    12  // in the documentation and/or other materials provided with the
    13  // distribution.
    14  //     * Neither the name of Google Inc. nor the names of its
    15  // contributors may be used to endorse or promote products derived from
    16  // this software without specific prior written permission.
    17  //
    18  // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    19  // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    20  // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
    21  // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
    22  // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
    23  // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
    24  // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
    25  // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
    26  // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
    27  // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
    28  // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
    29  
    30  package gax
    31  
    32  import (
    33  	"encoding/json"
    34  	"errors"
    35  	"io"
    36  
    37  	"google.golang.org/protobuf/encoding/protojson"
    38  	"google.golang.org/protobuf/proto"
    39  	"google.golang.org/protobuf/reflect/protoreflect"
    40  )
    41  
    42  var (
    43  	arrayOpen     = json.Delim('[')
    44  	arrayClose    = json.Delim(']')
    45  	errBadOpening = errors.New("unexpected opening token, expected '['")
    46  )
    47  
    48  // ProtoJSONStream represents a wrapper for consuming a stream of protobuf
    49  // messages encoded using protobuf-JSON format. More information on this format
    50  // can be found at https://developers.google.com/protocol-buffers/docs/proto3#json.
    51  // The stream must appear as a comma-delimited, JSON array of obbjects with
    52  // opening and closing square braces.
    53  //
    54  // This is for internal use only.
    55  type ProtoJSONStream struct {
    56  	first, closed bool
    57  	reader        io.ReadCloser
    58  	stream        *json.Decoder
    59  	typ           protoreflect.MessageType
    60  }
    61  
    62  // NewProtoJSONStreamReader accepts a stream of bytes via an io.ReadCloser that are
    63  // protobuf-JSON encoded protobuf messages of the given type. The ProtoJSONStream
    64  // must be closed when done.
    65  //
    66  // This is for internal use only.
    67  func NewProtoJSONStreamReader(rc io.ReadCloser, typ protoreflect.MessageType) *ProtoJSONStream {
    68  	return &ProtoJSONStream{
    69  		first:  true,
    70  		reader: rc,
    71  		stream: json.NewDecoder(rc),
    72  		typ:    typ,
    73  	}
    74  }
    75  
    76  // Recv decodes the next protobuf message in the stream or returns io.EOF if
    77  // the stream is done. It is not safe to call Recv on the same stream from
    78  // different goroutines, just like it is not safe to do so with a single gRPC
    79  // stream. Type-cast the protobuf message returned to the type provided at
    80  // ProtoJSONStream creation.
    81  // Calls to Recv after calling Close will produce io.EOF.
    82  func (s *ProtoJSONStream) Recv() (proto.Message, error) {
    83  	if s.closed {
    84  		return nil, io.EOF
    85  	}
    86  	if s.first {
    87  		s.first = false
    88  
    89  		// Consume the opening '[' so Decode gets one object at a time.
    90  		if t, err := s.stream.Token(); err != nil {
    91  			return nil, err
    92  		} else if t != arrayOpen {
    93  			return nil, errBadOpening
    94  		}
    95  	}
    96  
    97  	// Capture the next block of data for the item (a JSON object) in the stream.
    98  	var raw json.RawMessage
    99  	if err := s.stream.Decode(&raw); err != nil {
   100  		e := err
   101  		// To avoid checking the first token of each stream, just attempt to
   102  		// Decode the next blob and if that fails, double check if it is just
   103  		// the closing token ']'. If it is the closing, return io.EOF. If it
   104  		// isn't, return the original error.
   105  		if t, _ := s.stream.Token(); t == arrayClose {
   106  			e = io.EOF
   107  		}
   108  		return nil, e
   109  	}
   110  
   111  	// Initialize a new instance of the protobuf message to unmarshal the
   112  	// raw data into.
   113  	m := s.typ.New().Interface()
   114  	unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true}
   115  	err := unm.Unmarshal(raw, m)
   116  
   117  	return m, err
   118  }
   119  
   120  // Close closes the stream so that resources are cleaned up.
   121  func (s *ProtoJSONStream) Close() error {
   122  	// Dereference the *json.Decoder so that the memory is gc'd.
   123  	s.stream = nil
   124  	s.closed = true
   125  
   126  	return s.reader.Close()
   127  }
   128  

View as plain text