1
16
17 package ttrpc
18
19 import (
20 "context"
21 "io"
22 "testing"
23
24 "github.com/containerd/ttrpc/internal"
25 )
26
27 func TestStreamClient(t *testing.T) {
28 var (
29 ctx = context.Background()
30 server = mustServer(t)(NewServer())
31 addr, listener = newTestListener(t)
32 client, cleanup = newTestClient(t, addr)
33 serviceName = "streamService"
34 )
35
36 defer listener.Close()
37 defer cleanup()
38
39 desc := &ServiceDesc{
40 Methods: map[string]Method{
41 "Echo": func(ctx context.Context, unmarshal func(interface{}) error) (interface{}, error) {
42 var req internal.EchoPayload
43 if err := unmarshal(&req); err != nil {
44 return nil, err
45 }
46 req.Seq++
47 return &req, nil
48 },
49 },
50 Streams: map[string]Stream{
51 "EchoStream": {
52 Handler: func(ctx context.Context, ss StreamServer) (interface{}, error) {
53 for {
54 var req internal.EchoPayload
55 if err := ss.RecvMsg(&req); err != nil {
56 if err == io.EOF {
57 err = nil
58 }
59 return nil, err
60 }
61 req.Seq++
62 if err := ss.SendMsg(&req); err != nil {
63 return nil, err
64 }
65 }
66
67 },
68 StreamingClient: true,
69 StreamingServer: true,
70 },
71 },
72 }
73 server.RegisterService(serviceName, desc)
74
75 go server.Serve(ctx, listener)
76 defer server.Shutdown(ctx)
77
78
79 var req, resp internal.EchoPayload
80 if err := client.Call(ctx, serviceName, "Echo", &req, &resp); err != nil {
81 t.Fatal(err)
82 }
83
84 stream, err := client.NewStream(ctx, &StreamDesc{true, true}, serviceName, "EchoStream", nil)
85 if err != nil {
86 t.Fatal(err)
87 }
88 for i := 1; i <= 100; i++ {
89 req := internal.EchoPayload{
90 Seq: int64(i),
91 Msg: "should be returned",
92 }
93 if err := stream.SendMsg(&req); err != nil {
94 t.Fatalf("%d: %v", i, err)
95 }
96 var resp internal.EchoPayload
97 if err := stream.RecvMsg(&resp); err != nil {
98 t.Fatalf("%d: %v", i, err)
99 }
100 if resp.Seq != int64(i)+1 {
101 t.Fatalf("%d: unexpected sequence value: %d, expected %d", i, resp.Seq, i+1)
102 }
103 if resp.Msg != req.Msg {
104 t.Fatalf("%d: unexpected message: %q, expected %q", i, resp.Msg, req.Msg)
105 }
106 }
107 if err := stream.CloseSend(); err != nil {
108 t.Fatal(err)
109 }
110
111 err = stream.RecvMsg(&resp)
112 if err == nil {
113 t.Fatal("expected io.EOF after close send")
114 }
115 if err != io.EOF {
116 t.Fatalf("expected io.EOF after close send, got %v", err)
117 }
118 }
119
View as plain text