1 // Copyright 2022, Google Inc. 2 // All rights reserved. 3 // 4 // Redistribution and use in source and binary forms, with or without 5 // modification, are permitted provided that the following conditions are 6 // met: 7 // 8 // * Redistributions of source code must retain the above copyright 9 // notice, this list of conditions and the following disclaimer. 10 // * Redistributions in binary form must reproduce the above 11 // copyright notice, this list of conditions and the following disclaimer 12 // in the documentation and/or other materials provided with the 13 // distribution. 14 // * Neither the name of Google Inc. nor the names of its 15 // contributors may be used to endorse or promote products derived from 16 // this software without specific prior written permission. 17 // 18 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 19 // "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 20 // LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 21 // A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 22 // OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 23 // SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 24 // LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 25 // DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 26 // THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 27 // (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 28 // OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 29 30 package gax 31 32 import ( 33 "encoding/json" 34 "errors" 35 "io" 36 37 "google.golang.org/protobuf/encoding/protojson" 38 "google.golang.org/protobuf/proto" 39 "google.golang.org/protobuf/reflect/protoreflect" 40 ) 41 42 var ( 43 arrayOpen = json.Delim('[') 44 arrayClose = json.Delim(']') 45 errBadOpening = errors.New("unexpected opening token, expected '['") 46 ) 47 48 // ProtoJSONStream represents a wrapper for consuming a stream of protobuf 49 // messages encoded using protobuf-JSON format. More information on this format 50 // can be found at https://developers.google.com/protocol-buffers/docs/proto3#json. 51 // The stream must appear as a comma-delimited, JSON array of obbjects with 52 // opening and closing square braces. 53 // 54 // This is for internal use only. 55 type ProtoJSONStream struct { 56 first, closed bool 57 reader io.ReadCloser 58 stream *json.Decoder 59 typ protoreflect.MessageType 60 } 61 62 // NewProtoJSONStreamReader accepts a stream of bytes via an io.ReadCloser that are 63 // protobuf-JSON encoded protobuf messages of the given type. The ProtoJSONStream 64 // must be closed when done. 65 // 66 // This is for internal use only. 67 func NewProtoJSONStreamReader(rc io.ReadCloser, typ protoreflect.MessageType) *ProtoJSONStream { 68 return &ProtoJSONStream{ 69 first: true, 70 reader: rc, 71 stream: json.NewDecoder(rc), 72 typ: typ, 73 } 74 } 75 76 // Recv decodes the next protobuf message in the stream or returns io.EOF if 77 // the stream is done. It is not safe to call Recv on the same stream from 78 // different goroutines, just like it is not safe to do so with a single gRPC 79 // stream. Type-cast the protobuf message returned to the type provided at 80 // ProtoJSONStream creation. 81 // Calls to Recv after calling Close will produce io.EOF. 82 func (s *ProtoJSONStream) Recv() (proto.Message, error) { 83 if s.closed { 84 return nil, io.EOF 85 } 86 if s.first { 87 s.first = false 88 89 // Consume the opening '[' so Decode gets one object at a time. 90 if t, err := s.stream.Token(); err != nil { 91 return nil, err 92 } else if t != arrayOpen { 93 return nil, errBadOpening 94 } 95 } 96 97 // Capture the next block of data for the item (a JSON object) in the stream. 98 var raw json.RawMessage 99 if err := s.stream.Decode(&raw); err != nil { 100 e := err 101 // To avoid checking the first token of each stream, just attempt to 102 // Decode the next blob and if that fails, double check if it is just 103 // the closing token ']'. If it is the closing, return io.EOF. If it 104 // isn't, return the original error. 105 if t, _ := s.stream.Token(); t == arrayClose { 106 e = io.EOF 107 } 108 return nil, e 109 } 110 111 // Initialize a new instance of the protobuf message to unmarshal the 112 // raw data into. 113 m := s.typ.New().Interface() 114 unm := protojson.UnmarshalOptions{AllowPartial: true, DiscardUnknown: true} 115 err := unm.Unmarshal(raw, m) 116 117 return m, err 118 } 119 120 // Close closes the stream so that resources are cleaned up. 121 func (s *ProtoJSONStream) Close() error { 122 // Dereference the *json.Decoder so that the memory is gc'd. 123 s.stream = nil 124 s.closed = true 125 126 return s.reader.Close() 127 } 128