...
1
16
17 package ttrpc
18
19 import (
20 "bytes"
21 "errors"
22 "io"
23 "net"
24 "reflect"
25 "testing"
26
27 "google.golang.org/grpc/codes"
28 "google.golang.org/grpc/status"
29 )
30
31 func TestReadWriteMessage(t *testing.T) {
32 var (
33 w, r = net.Pipe()
34 ch = newChannel(w)
35 rch = newChannel(r)
36 messages = [][]byte{
37 []byte("hello"),
38 []byte("this is a test"),
39 []byte("of message framing"),
40 }
41 received [][]byte
42 errs = make(chan error, 1)
43 )
44
45 go func() {
46 for i, msg := range messages {
47 if err := ch.send(uint32(i), 1, 0, msg); err != nil {
48 errs <- err
49 return
50 }
51 }
52
53 w.Close()
54 }()
55
56 for {
57 _, p, err := rch.recv()
58 if err != nil {
59 if !errors.Is(err, io.EOF) {
60 t.Fatal(err)
61 }
62
63 break
64 }
65 received = append(received, p)
66
67
68 select {
69 case err := <-errs:
70 if err != nil {
71 t.Fatal(err)
72 }
73 default:
74 }
75 }
76
77 if !reflect.DeepEqual(received, messages) {
78 t.Fatalf("didn't received expected set of messages: %v != %v", received, messages)
79 }
80
81 select {
82 case err := <-errs:
83 if err != nil {
84 t.Fatal(err)
85 }
86 default:
87 }
88 }
89
90 func TestMessageOversize(t *testing.T) {
91 var (
92 w, r = net.Pipe()
93 wch, rch = newChannel(w), newChannel(r)
94 msg = bytes.Repeat([]byte("a message of massive length"), 512<<10)
95 errs = make(chan error, 1)
96 )
97
98 go func() {
99 if err := wch.send(1, 1, 0, msg); err != nil {
100 errs <- err
101 }
102 }()
103
104 _, _, err := rch.recv()
105 if err == nil {
106 t.Fatalf("error expected reading with small buffer")
107 }
108
109 status, ok := status.FromError(err)
110 if !ok {
111 t.Fatalf("expected grpc status error: %v", err)
112 }
113
114 if status.Code() != codes.ResourceExhausted {
115 t.Fatalf("expected grpc status code: %v != %v", status.Code(), codes.ResourceExhausted)
116 }
117
118 select {
119 case err := <-errs:
120 if err != nil {
121 t.Fatal(err)
122 }
123 default:
124 }
125 }
126
View as plain text