1 package watcherx
2
3 import (
4 "context"
5 "fmt"
6 "io/ioutil"
7 "net/http/httptest"
8 "os"
9 "path/filepath"
10 "strings"
11 "testing"
12 "time"
13
14 "github.com/ory/x/logrusx"
15
16 "github.com/sirupsen/logrus/hooks/test"
17 "github.com/stretchr/testify/assert"
18 "github.com/stretchr/testify/require"
19
20 "github.com/ory/herodot"
21 "github.com/ory/x/urlx"
22 )
23
24 func TestWatchWebsocket(t *testing.T) {
25 t.Run("case=forwards events", func(t *testing.T) {
26 ctx, c, dir, cancel := setup(t)
27 defer cancel()
28
29 hook := &test.Hook{}
30 l := logrusx.New("", "", logrusx.WithHook(hook))
31
32 fn := filepath.Join(dir, "some.file")
33 f, err := os.Create(fn)
34 require.NoError(t, err)
35
36 url, err := urlx.Parse("file://" + fn)
37 require.NoError(t, err)
38 t.Log(url)
39 handler, err := WatchAndServeWS(ctx, url, herodot.NewJSONWriter(l))
40 require.NoError(t, err)
41 s := httptest.NewServer(handler)
42
43 u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
44 _, err = WatchWebsocket(ctx, u, c)
45 require.NoError(t, err)
46
47 _, err = fmt.Fprint(f, "content here")
48 require.NoError(t, err)
49 require.NoError(t, f.Close())
50 assertChange(t, <-c, "content here", u.String()+fn)
51
52 require.NoError(t, os.Remove(fn))
53 assertRemove(t, <-c, u.String()+fn)
54
55 assert.Len(t, hook.Entries, 0, "%+v", hook.Entries)
56 })
57
58 t.Run("case=client closes itself on context cancel", func(t *testing.T) {
59 ctx1, c, dir, cancel1 := setup(t)
60 defer cancel1()
61
62 hook := &test.Hook{}
63 l := logrusx.New("", "", logrusx.WithHook(hook))
64
65 fn := filepath.Join(dir, "some.file")
66
67 handler, err := WatchAndServeWS(ctx1, urlx.ParseOrPanic("file://"+fn), herodot.NewJSONWriter(l))
68 require.NoError(t, err)
69 s := httptest.NewServer(handler)
70
71 ctx2, cancel2 := context.WithCancel(context.Background())
72 u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
73 _, err = WatchWebsocket(ctx2, u, c)
74 require.NoError(t, err)
75
76 cancel2()
77
78 e, ok := <-c
79 assert.False(t, ok, "%#v", e)
80
81 assert.Len(t, hook.Entries, 0, "%+v", hook.Entries)
82 })
83
84 t.Run("case=quits client watcher when server connection is closed", func(t *testing.T) {
85 ctxClient, c, dir, cancel := setup(t)
86 defer cancel()
87
88 hook := &test.Hook{}
89 l := logrusx.New("", "", logrusx.WithHook(hook))
90
91 fn := filepath.Join(dir, "some.file")
92
93 ctxServe, cancelServe := context.WithCancel(context.Background())
94 handler, err := WatchAndServeWS(ctxServe, urlx.ParseOrPanic("file://"+fn), herodot.NewJSONWriter(l))
95 require.NoError(t, err)
96 s := httptest.NewServer(handler)
97
98 u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
99 _, err = WatchWebsocket(ctxClient, u, c)
100 require.NoError(t, err)
101
102 cancelServe()
103
104 e, ok := <-c
105 assert.False(t, ok, "%#v", e)
106
107 assert.Len(t, hook.Entries, 0, "%+v", hook.Entries)
108 })
109
110 t.Run("case=successive watching works after client connection is closed", func(t *testing.T) {
111 ctxServer, c, dir, cancel := setup(t)
112 defer cancel()
113
114 hook := &test.Hook{}
115 l := logrusx.New("", "", logrusx.WithHook(hook))
116
117 fn := filepath.Join(dir, "some.file")
118
119 handler, err := WatchAndServeWS(ctxServer, urlx.ParseOrPanic("file://"+fn), herodot.NewJSONWriter(l))
120 require.NoError(t, err)
121 s := httptest.NewServer(handler)
122
123 ctxClient1, cancelClient1 := context.WithCancel(context.Background())
124 u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
125 _, err = WatchWebsocket(ctxClient1, u, c)
126 require.NoError(t, err)
127
128 cancelClient1()
129
130 _, ok := <-c
131 assert.False(t, ok)
132
133 ctxClient2, cancelClient2 := context.WithCancel(context.Background())
134 defer cancelClient2()
135 c2 := make(EventChannel)
136 _, err = WatchWebsocket(ctxClient2, u, c2)
137 require.NoError(t, err)
138
139 f, err := os.Create(fn)
140 require.NoError(t, err)
141 require.NoError(t, f.Close())
142
143 assertChange(t, <-c2, "", u.String()+fn)
144
145 assert.Len(t, hook.Entries, 0, "%+v", hook.Entries)
146 })
147
148 t.Run("case=broadcasts to multiple client connections", func(t *testing.T) {
149 ctxServer, c1, dir, cancel := setup(t)
150 defer cancel()
151
152 hook := &test.Hook{}
153 l := logrusx.New("", "", logrusx.WithHook(hook))
154
155 fn := filepath.Join(dir, "some.file")
156
157 handler, err := WatchAndServeWS(ctxServer, urlx.ParseOrPanic("file://"+fn), herodot.NewJSONWriter(l))
158 require.NoError(t, err)
159 s := httptest.NewServer(handler)
160
161 ctxClient1, cancelClient1 := context.WithCancel(context.Background())
162 defer cancelClient1()
163
164 u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
165 _, err = WatchWebsocket(ctxClient1, u, c1)
166 require.NoError(t, err)
167
168 ctxClient2, cancelClient2 := context.WithCancel(context.Background())
169 defer cancelClient2()
170 c2 := make(EventChannel)
171 _, err = WatchWebsocket(ctxClient2, u, c2)
172 require.NoError(t, err)
173
174 f, err := os.Create(fn)
175 require.NoError(t, err)
176 require.NoError(t, f.Close())
177
178 assertChange(t, <-c1, "", u.String()+fn)
179 assertChange(t, <-c2, "", u.String()+fn)
180
181 assert.Len(t, hook.Entries, 0, "%+v", hook.Entries)
182 })
183
184 t.Run("case=sends event when requested", func(t *testing.T) {
185 ctxServer, c, dir, cancel := setup(t)
186 defer cancel()
187
188
189 c = make(EventChannel, 1)
190
191 hook := &test.Hook{}
192 l := logrusx.New("", "", logrusx.WithHook(hook))
193
194 fn := filepath.Join(dir, "some.file")
195 initialContent := "initial content"
196 require.NoError(t, ioutil.WriteFile(fn, []byte(initialContent), 0600))
197
198 handler, err := WatchAndServeWS(ctxServer, urlx.ParseOrPanic("file://"+fn), herodot.NewJSONWriter(l))
199 require.NoError(t, err)
200 s := httptest.NewServer(handler)
201
202 ctxClient, cancelClient := context.WithCancel(context.Background())
203 defer cancelClient()
204
205 u := urlx.ParseOrPanic("ws" + strings.TrimLeft(s.URL, "http"))
206 d, err := WatchWebsocket(ctxClient, u, c)
207 require.NoError(t, err)
208 done, err := d.DispatchNow()
209 require.NoError(t, err)
210
211
212 select {
213 case <-time.After(time.Second):
214 t.Logf("Waiting for done timed out. %+v", <-c)
215 t.FailNow()
216 case eventsSend := <-done:
217 assert.Equal(t, 1, eventsSend)
218 }
219
220 assertChange(t, <-c, initialContent, u.String()+fn)
221
222 assert.Len(t, hook.Entries, 0, "%+v", hook.Entries)
223 })
224 }
225
View as plain text