1
2
3 package gcs
4
5 import (
6 "context"
7 "encoding/binary"
8 "encoding/json"
9 "errors"
10 "io"
11 "reflect"
12 "strings"
13 "testing"
14 "time"
15
16 "github.com/sirupsen/logrus"
17 )
18
// stitched joins a separate read half and write half into one value that
// can be used as an io.ReadWriteCloser. Read and Write are promoted from the
// embedded interfaces; Close is defined explicitly because both embedded
// interfaces provide one.
type stitched struct {
	io.ReadCloser
	io.WriteCloser
}

// Close closes the read half and then the write half. Both halves are always
// closed, even when the first close fails. If the read half reports an error
// it is returned; otherwise the result of closing the write half is returned.
func (s *stitched) Close() error {
	readErr := s.ReadCloser.Close()
	writeErr := s.WriteCloser.Close()
	if readErr != nil {
		return readErr
	}
	return writeErr
}
29
30 func pipeConn() (*stitched, *stitched) {
31 r1, w1 := io.Pipe()
32 r2, w2 := io.Pipe()
33 return &stitched{r1, w2}, &stitched{r2, w1}
34 }
35
36 func sendMessage(t *testing.T, w io.Writer, typ msgType, id int64, msg []byte) {
37 t.Helper()
38 var h [16]byte
39 binary.LittleEndian.PutUint32(h[:], uint32(typ))
40 binary.LittleEndian.PutUint32(h[4:], uint32(len(msg)+16))
41 binary.LittleEndian.PutUint64(h[8:], uint64(id))
42 _, err := w.Write(h[:])
43 if err != nil {
44 t.Error(err)
45 return
46 }
47 _, err = w.Write(msg)
48 if err != nil {
49 t.Error(err)
50 return
51 }
52 }
53
54 func reflector(t *testing.T, rw io.ReadWriteCloser, delay time.Duration) {
55 t.Helper()
56 defer rw.Close()
57 for {
58 id, typ, msg, err := readMessage(rw)
59 if err != nil {
60 if err != io.EOF {
61 t.Error(err)
62 }
63 return
64 }
65 time.Sleep(delay)
66 typ ^= msgTypeResponse ^ msgTypeRequest
67 sendMessage(t, rw, typ, id, msg)
68 }
69 }
70
71 type testReq struct {
72 requestBase
73 X, Y int
74 }
75
76 type testResp struct {
77 responseBase
78 X, Y int
79 }
80
81 func startReflectedBridge(t *testing.T, delay time.Duration) *bridge {
82 t.Helper()
83 s, c := pipeConn()
84 b := newBridge(s, nil, logrus.NewEntry(logrus.StandardLogger()))
85 b.Start()
86 go reflector(t, c, delay)
87 return b
88 }
89
90 func TestBridgeRPC(t *testing.T) {
91 b := startReflectedBridge(t, 0)
92 defer b.Close()
93 req := testReq{X: 5}
94 var resp testResp
95 err := b.RPC(context.Background(), rpcCreate, &req, &resp, false)
96 if err != nil {
97 t.Fatal(err)
98 }
99 if req.X != resp.X || req.Y != resp.Y {
100 t.Fatalf("expected equal: %+v %+v", req, resp)
101 }
102 }
103
104 func TestBridgeRPCResponseTimeout(t *testing.T) {
105 b := startReflectedBridge(t, time.Minute)
106 defer b.Close()
107 b.Timeout = time.Millisecond * 100
108 req := testReq{X: 5}
109 var resp testResp
110 err := b.RPC(context.Background(), rpcCreate, &req, &resp, false)
111 if err == nil || !strings.Contains(err.Error(), "bridge closed") {
112 t.Fatalf("expected bridge disconnection, got %s", err)
113 }
114 }
115
116 func TestBridgeRPCContextDone(t *testing.T) {
117 b := startReflectedBridge(t, time.Minute)
118 defer b.Close()
119 b.Timeout = time.Millisecond * 250
120 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
121 defer cancel()
122 req := testReq{X: 5}
123 var resp testResp
124 err := b.RPC(ctx, rpcCreate, &req, &resp, true)
125 if err != context.DeadlineExceeded {
126 t.Fatalf("expected deadline exceeded, got %s", err)
127 }
128 }
129
130 func TestBridgeRPCContextDoneNoCancel(t *testing.T) {
131 b := startReflectedBridge(t, time.Minute)
132 defer b.Close()
133 b.Timeout = time.Millisecond * 250
134 ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
135 defer cancel()
136 req := testReq{X: 5}
137 var resp testResp
138 err := b.RPC(ctx, rpcCreate, &req, &resp, false)
139 if err == nil || !strings.Contains(err.Error(), "bridge closed") {
140 t.Fatalf("expected bridge disconnection, got %s", err)
141 }
142 }
143
144 func TestBridgeRPCBridgeClosed(t *testing.T) {
145 b := startReflectedBridge(t, 0)
146 eerr := errors.New("forcibly terminated")
147 b.kill(eerr)
148 err := b.RPC(context.Background(), rpcCreate, nil, nil, false)
149 if err != eerr {
150 t.Fatal("unexpected: ", err)
151 }
152 }
153
154 func sendJSON(t *testing.T, w io.Writer, typ msgType, id int64, msg interface{}) error {
155 t.Helper()
156 msgb, err := json.Marshal(msg)
157 if err != nil {
158 return err
159 }
160 sendMessage(t, w, typ, id, msgb)
161 return nil
162 }
163
164 func notifyThroughBridge(t *testing.T, typ msgType, msg interface{}, fn notifyFunc) error {
165 t.Helper()
166 s, c := pipeConn()
167 b := newBridge(s, fn, logrus.NewEntry(logrus.StandardLogger()))
168 b.Start()
169 err := sendJSON(t, c, typ, 0, msg)
170 if err != nil {
171 b.Close()
172 return err
173 }
174 time.Sleep(100 * time.Millisecond)
175 return b.Close()
176 }
177
178 func TestBridgeNotify(t *testing.T) {
179 ntf := &containerNotification{Operation: "testing"}
180 recvd := false
181 err := notifyThroughBridge(t, msgTypeNotify|notifyContainer, ntf, func(nntf *containerNotification) error {
182 if !reflect.DeepEqual(ntf, nntf) {
183 t.Errorf("%+v != %+v", ntf, nntf)
184 }
185 recvd = true
186 return nil
187 })
188 if err != nil {
189 t.Error("notify failed: ", err)
190 }
191 if !recvd {
192 t.Error("did not receive notification")
193 }
194 }
195
196 func TestBridgeNotifyFailure(t *testing.T) {
197 ntf := &containerNotification{Operation: "testing"}
198 errMsg := "notify should have failed"
199 err := notifyThroughBridge(t, msgTypeNotify|notifyContainer, ntf, func(nntf *containerNotification) error {
200 return errors.New(errMsg)
201 })
202 if err == nil || !strings.Contains(err.Error(), errMsg) {
203 t.Error("unexpected result: ", err)
204 }
205 }
206
View as plain text