1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30 package gax
31
32 import (
33 "bytes"
34 "errors"
35 "io"
36 "io/ioutil"
37 "testing"
38 "time"
39
40 "github.com/google/go-cmp/cmp"
41 serviceconfigpb "google.golang.org/genproto/googleapis/api/serviceconfig"
42 "google.golang.org/genproto/googleapis/rpc/code"
43 "google.golang.org/genproto/googleapis/rpc/status"
44 "google.golang.org/protobuf/encoding/protojson"
45 "google.golang.org/protobuf/proto"
46 "google.golang.org/protobuf/reflect/protoreflect"
47 "google.golang.org/protobuf/types/known/anypb"
48 "google.golang.org/protobuf/types/known/durationpb"
49 )
50
51 func TestRecv(t *testing.T) {
52 locations := []proto.Message{
53 &serviceconfigpb.Property{
54 Name: "property1",
55 Type: serviceconfigpb.Property_STRING,
56 Description: "Property 1",
57 },
58 &serviceconfigpb.Property{
59 Name: "property2",
60 Type: serviceconfigpb.Property_STRING,
61 Description: "Property 2",
62 },
63 &serviceconfigpb.Property{
64 Name: "property3",
65 Type: serviceconfigpb.Property_STRING,
66 Description: "Property 3",
67 },
68 }
69
70 durations := []proto.Message{
71 durationpb.New(time.Second),
72 durationpb.New(time.Minute),
73 durationpb.New(time.Hour),
74 }
75
76 detail, err := anypb.New(locations[0])
77 if err != nil {
78 t.Fatal(err)
79 }
80 nested := []proto.Message{
81 &status.Status{
82 Code: int32(code.Code_INTERNAL),
83 Message: "oops",
84 Details: []*anypb.Any{
85 detail,
86 },
87 },
88 }
89
90 for _, tst := range []struct {
91 name string
92 want []proto.Message
93 typ protoreflect.MessageType
94 }{
95 {
96 name: "empty",
97 },
98 {
99 name: "simple_locations",
100 want: locations,
101 typ: locations[0].ProtoReflect().Type(),
102 },
103 {
104
105 name: "message_as_primitive",
106 want: durations,
107 typ: durations[0].ProtoReflect().Type(),
108 },
109 {
110 name: "nested",
111 want: nested,
112 typ: nested[0].ProtoReflect().Type(),
113 },
114 } {
115 s, err := prepareStream(tst.want)
116 if err != nil {
117 t.Errorf("%s: %v", tst.name, err)
118 continue
119 }
120 stream := NewProtoJSONStreamReader(s, tst.typ)
121 defer stream.Close()
122
123 got, err := stream.Recv()
124 for ndx := 0; err == nil; ndx++ {
125 if diff := cmp.Diff(got, tst.want[ndx], cmp.Comparer(proto.Equal)); diff != "" {
126 t.Errorf("%s: got(-),want(+):\n%s", tst.name, diff)
127 }
128 got, err = stream.Recv()
129 }
130 if !errors.Is(err, io.EOF) {
131 t.Errorf("%s: expected %v but got %v", tst.name, io.EOF, err)
132 }
133 }
134 }
135
136 func TestRecvAfterClose(t *testing.T) {
137 empty := ioutil.NopCloser(bytes.NewReader([]byte("[]")))
138 s := NewProtoJSONStreamReader(empty, nil)
139 if _, err := s.Recv(); !errors.Is(err, io.EOF) {
140 t.Errorf("Expected %v but got %v", io.EOF, err)
141 }
142
143
144 s.Close()
145 if _, err := s.Recv(); !errors.Is(err, io.EOF) {
146 t.Errorf("Expected %v after close but got %v", io.EOF, err)
147 }
148
149 }
150
151 func TestRecvError(t *testing.T) {
152 noOpening := ioutil.NopCloser(bytes.NewReader([]byte{'{'}))
153 s := NewProtoJSONStreamReader(noOpening, nil)
154 if _, err := s.Recv(); !errors.Is(err, errBadOpening) {
155 t.Errorf("Expected %v but got %v", errBadOpening, err)
156 }
157 }
158
159 func prepareStream(messages []proto.Message) (io.ReadCloser, error) {
160 if len(messages) == 0 {
161 return ioutil.NopCloser(bytes.NewReader([]byte("[]"))), nil
162 }
163
164 data := []byte("[")
165 mo := protojson.MarshalOptions{AllowPartial: true, UseEnumNumbers: true}
166 for _, m := range messages {
167 d, err := mo.Marshal(m)
168 if err != nil {
169 return nil, err
170 }
171 data = append(data, d...)
172 data = append(data, ',')
173 }
174
175 data[len(data)-1] = ']'
176 return ioutil.NopCloser(bytes.NewReader(data)), nil
177 }
178
View as plain text