1
2
3 package streaming
4
5 import (
6 context "context"
7 ttrpc "github.com/containerd/ttrpc"
8 emptypb "google.golang.org/protobuf/types/known/emptypb"
9 )
10
11 type TTRPCStreamingService interface {
12 Echo(context.Context, *EchoPayload) (*EchoPayload, error)
13 EchoStream(context.Context, TTRPCStreaming_EchoStreamServer) error
14 SumStream(context.Context, TTRPCStreaming_SumStreamServer) (*Sum, error)
15 DivideStream(context.Context, *Sum, TTRPCStreaming_DivideStreamServer) error
16 EchoNull(context.Context, TTRPCStreaming_EchoNullServer) (*emptypb.Empty, error)
17 EchoNullStream(context.Context, TTRPCStreaming_EchoNullStreamServer) error
18 }
19
20 type TTRPCStreaming_EchoStreamServer interface {
21 Send(*EchoPayload) error
22 Recv() (*EchoPayload, error)
23 ttrpc.StreamServer
24 }
25
26 type ttrpcstreamingEchoStreamServer struct {
27 ttrpc.StreamServer
28 }
29
30 func (x *ttrpcstreamingEchoStreamServer) Send(m *EchoPayload) error {
31 return x.StreamServer.SendMsg(m)
32 }
33
34 func (x *ttrpcstreamingEchoStreamServer) Recv() (*EchoPayload, error) {
35 m := new(EchoPayload)
36 if err := x.StreamServer.RecvMsg(m); err != nil {
37 return nil, err
38 }
39 return m, nil
40 }
41
42 type TTRPCStreaming_SumStreamServer interface {
43 Recv() (*Part, error)
44 ttrpc.StreamServer
45 }
46
47 type ttrpcstreamingSumStreamServer struct {
48 ttrpc.StreamServer
49 }
50
51 func (x *ttrpcstreamingSumStreamServer) Recv() (*Part, error) {
52 m := new(Part)
53 if err := x.StreamServer.RecvMsg(m); err != nil {
54 return nil, err
55 }
56 return m, nil
57 }
58
59 type TTRPCStreaming_DivideStreamServer interface {
60 Send(*Part) error
61 ttrpc.StreamServer
62 }
63
64 type ttrpcstreamingDivideStreamServer struct {
65 ttrpc.StreamServer
66 }
67
68 func (x *ttrpcstreamingDivideStreamServer) Send(m *Part) error {
69 return x.StreamServer.SendMsg(m)
70 }
71
72 type TTRPCStreaming_EchoNullServer interface {
73 Recv() (*EchoPayload, error)
74 ttrpc.StreamServer
75 }
76
77 type ttrpcstreamingEchoNullServer struct {
78 ttrpc.StreamServer
79 }
80
81 func (x *ttrpcstreamingEchoNullServer) Recv() (*EchoPayload, error) {
82 m := new(EchoPayload)
83 if err := x.StreamServer.RecvMsg(m); err != nil {
84 return nil, err
85 }
86 return m, nil
87 }
88
89 type TTRPCStreaming_EchoNullStreamServer interface {
90 Send(*emptypb.Empty) error
91 Recv() (*EchoPayload, error)
92 ttrpc.StreamServer
93 }
94
95 type ttrpcstreamingEchoNullStreamServer struct {
96 ttrpc.StreamServer
97 }
98
99 func (x *ttrpcstreamingEchoNullStreamServer) Send(m *emptypb.Empty) error {
100 return x.StreamServer.SendMsg(m)
101 }
102
103 func (x *ttrpcstreamingEchoNullStreamServer) Recv() (*EchoPayload, error) {
104 m := new(EchoPayload)
105 if err := x.StreamServer.RecvMsg(m); err != nil {
106 return nil, err
107 }
108 return m, nil
109 }
110
111 func RegisterTTRPCStreamingService(srv *ttrpc.Server, svc TTRPCStreamingService) {
112 srv.RegisterService("ttrpc.integration.streaming.Streaming", &ttrpc.ServiceDesc{
113 Methods: map[string]ttrpc.Method{
114 "Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
115 var req EchoPayload
116 if err := unmarshal(&req); err != nil {
117 return nil, err
118 }
119 return svc.Echo(ctx, &req)
120 },
121 },
122 Streams: map[string]ttrpc.Stream{
123 "EchoStream": {
124 Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
125 return nil, svc.EchoStream(ctx, &ttrpcstreamingEchoStreamServer{stream})
126 },
127 StreamingClient: true,
128 StreamingServer: true,
129 },
130 "SumStream": {
131 Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
132 return svc.SumStream(ctx, &ttrpcstreamingSumStreamServer{stream})
133 },
134 StreamingClient: true,
135 StreamingServer: false,
136 },
137 "DivideStream": {
138 Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
139 m := new(Sum)
140 if err := stream.RecvMsg(m); err != nil {
141 return nil, err
142 }
143 return nil, svc.DivideStream(ctx, m, &ttrpcstreamingDivideStreamServer{stream})
144 },
145 StreamingClient: false,
146 StreamingServer: true,
147 },
148 "EchoNull": {
149 Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
150 return svc.EchoNull(ctx, &ttrpcstreamingEchoNullServer{stream})
151 },
152 StreamingClient: true,
153 StreamingServer: false,
154 },
155 "EchoNullStream": {
156 Handler: func(ctx context.Context, stream ttrpc.StreamServer) (interface{}, error) {
157 return nil, svc.EchoNullStream(ctx, &ttrpcstreamingEchoNullStreamServer{stream})
158 },
159 StreamingClient: true,
160 StreamingServer: true,
161 },
162 },
163 })
164 }
165
166 type TTRPCStreamingClient interface {
167 Echo(context.Context, *EchoPayload) (*EchoPayload, error)
168 EchoStream(context.Context) (TTRPCStreaming_EchoStreamClient, error)
169 SumStream(context.Context) (TTRPCStreaming_SumStreamClient, error)
170 DivideStream(context.Context, *Sum) (TTRPCStreaming_DivideStreamClient, error)
171 EchoNull(context.Context) (TTRPCStreaming_EchoNullClient, error)
172 EchoNullStream(context.Context) (TTRPCStreaming_EchoNullStreamClient, error)
173 }
174
175 type ttrpcstreamingClient struct {
176 client *ttrpc.Client
177 }
178
179 func NewTTRPCStreamingClient(client *ttrpc.Client) TTRPCStreamingClient {
180 return &ttrpcstreamingClient{
181 client: client,
182 }
183 }
184
185 func (c *ttrpcstreamingClient) Echo(ctx context.Context, req *EchoPayload) (*EchoPayload, error) {
186 var resp EchoPayload
187 if err := c.client.Call(ctx, "ttrpc.integration.streaming.Streaming", "Echo", req, &resp); err != nil {
188 return nil, err
189 }
190 return &resp, nil
191 }
192
193 func (c *ttrpcstreamingClient) EchoStream(ctx context.Context) (TTRPCStreaming_EchoStreamClient, error) {
194 stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
195 StreamingClient: true,
196 StreamingServer: true,
197 }, "ttrpc.integration.streaming.Streaming", "EchoStream", nil)
198 if err != nil {
199 return nil, err
200 }
201 x := &ttrpcstreamingEchoStreamClient{stream}
202 return x, nil
203 }
204
205 type TTRPCStreaming_EchoStreamClient interface {
206 Send(*EchoPayload) error
207 Recv() (*EchoPayload, error)
208 ttrpc.ClientStream
209 }
210
211 type ttrpcstreamingEchoStreamClient struct {
212 ttrpc.ClientStream
213 }
214
215 func (x *ttrpcstreamingEchoStreamClient) Send(m *EchoPayload) error {
216 return x.ClientStream.SendMsg(m)
217 }
218
219 func (x *ttrpcstreamingEchoStreamClient) Recv() (*EchoPayload, error) {
220 m := new(EchoPayload)
221 if err := x.ClientStream.RecvMsg(m); err != nil {
222 return nil, err
223 }
224 return m, nil
225 }
226
227 func (c *ttrpcstreamingClient) SumStream(ctx context.Context) (TTRPCStreaming_SumStreamClient, error) {
228 stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
229 StreamingClient: true,
230 StreamingServer: false,
231 }, "ttrpc.integration.streaming.Streaming", "SumStream", nil)
232 if err != nil {
233 return nil, err
234 }
235 x := &ttrpcstreamingSumStreamClient{stream}
236 return x, nil
237 }
238
239 type TTRPCStreaming_SumStreamClient interface {
240 Send(*Part) error
241 CloseAndRecv() (*Sum, error)
242 ttrpc.ClientStream
243 }
244
245 type ttrpcstreamingSumStreamClient struct {
246 ttrpc.ClientStream
247 }
248
249 func (x *ttrpcstreamingSumStreamClient) Send(m *Part) error {
250 return x.ClientStream.SendMsg(m)
251 }
252
253 func (x *ttrpcstreamingSumStreamClient) CloseAndRecv() (*Sum, error) {
254 if err := x.ClientStream.CloseSend(); err != nil {
255 return nil, err
256 }
257 m := new(Sum)
258 if err := x.ClientStream.RecvMsg(m); err != nil {
259 return nil, err
260 }
261 return m, nil
262 }
263
264 func (c *ttrpcstreamingClient) DivideStream(ctx context.Context, req *Sum) (TTRPCStreaming_DivideStreamClient, error) {
265 stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
266 StreamingClient: false,
267 StreamingServer: true,
268 }, "ttrpc.integration.streaming.Streaming", "DivideStream", req)
269 if err != nil {
270 return nil, err
271 }
272 x := &ttrpcstreamingDivideStreamClient{stream}
273 return x, nil
274 }
275
276 type TTRPCStreaming_DivideStreamClient interface {
277 Recv() (*Part, error)
278 ttrpc.ClientStream
279 }
280
281 type ttrpcstreamingDivideStreamClient struct {
282 ttrpc.ClientStream
283 }
284
285 func (x *ttrpcstreamingDivideStreamClient) Recv() (*Part, error) {
286 m := new(Part)
287 if err := x.ClientStream.RecvMsg(m); err != nil {
288 return nil, err
289 }
290 return m, nil
291 }
292
293 func (c *ttrpcstreamingClient) EchoNull(ctx context.Context) (TTRPCStreaming_EchoNullClient, error) {
294 stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
295 StreamingClient: true,
296 StreamingServer: false,
297 }, "ttrpc.integration.streaming.Streaming", "EchoNull", nil)
298 if err != nil {
299 return nil, err
300 }
301 x := &ttrpcstreamingEchoNullClient{stream}
302 return x, nil
303 }
304
305 type TTRPCStreaming_EchoNullClient interface {
306 Send(*EchoPayload) error
307 CloseAndRecv() (*emptypb.Empty, error)
308 ttrpc.ClientStream
309 }
310
311 type ttrpcstreamingEchoNullClient struct {
312 ttrpc.ClientStream
313 }
314
315 func (x *ttrpcstreamingEchoNullClient) Send(m *EchoPayload) error {
316 return x.ClientStream.SendMsg(m)
317 }
318
319 func (x *ttrpcstreamingEchoNullClient) CloseAndRecv() (*emptypb.Empty, error) {
320 if err := x.ClientStream.CloseSend(); err != nil {
321 return nil, err
322 }
323 m := new(emptypb.Empty)
324 if err := x.ClientStream.RecvMsg(m); err != nil {
325 return nil, err
326 }
327 return m, nil
328 }
329
330 func (c *ttrpcstreamingClient) EchoNullStream(ctx context.Context) (TTRPCStreaming_EchoNullStreamClient, error) {
331 stream, err := c.client.NewStream(ctx, &ttrpc.StreamDesc{
332 StreamingClient: true,
333 StreamingServer: true,
334 }, "ttrpc.integration.streaming.Streaming", "EchoNullStream", nil)
335 if err != nil {
336 return nil, err
337 }
338 x := &ttrpcstreamingEchoNullStreamClient{stream}
339 return x, nil
340 }
341
342 type TTRPCStreaming_EchoNullStreamClient interface {
343 Send(*EchoPayload) error
344 Recv() (*emptypb.Empty, error)
345 ttrpc.ClientStream
346 }
347
348 type ttrpcstreamingEchoNullStreamClient struct {
349 ttrpc.ClientStream
350 }
351
352 func (x *ttrpcstreamingEchoNullStreamClient) Send(m *EchoPayload) error {
353 return x.ClientStream.SendMsg(m)
354 }
355
356 func (x *ttrpcstreamingEchoNullStreamClient) Recv() (*emptypb.Empty, error) {
357 m := new(emptypb.Empty)
358 if err := x.ClientStream.RecvMsg(m); err != nil {
359 return nil, err
360 }
361 return m, nil
362 }
363
View as plain text