1 package watcherx
2
3 import (
4 "context"
5 "fmt"
6 "net"
7 "net/url"
8 "strings"
9
10 "github.com/gorilla/websocket"
11 "github.com/pkg/errors"
12 )
13
14 func WatchWebsocket(ctx context.Context, u *url.URL, c EventChannel) (Watcher, error) {
15 conn, _, err := websocket.DefaultDialer.Dial(u.String(), nil)
16 if err != nil {
17 return nil, errors.WithStack(err)
18 }
19
20 wsClosed := make(chan struct{})
21 go cleanupOnDone(ctx, conn, c, wsClosed)
22
23 d := newDispatcher()
24
25 go forwardWebsocketEvents(conn, c, u, wsClosed, d.done)
26
27 go forwardDispatchNow(ctx, conn, c, d.trigger, u.String())
28
29 return d, nil
30 }
31
32 func cleanupOnDone(ctx context.Context, conn *websocket.Conn, c EventChannel, wsClosed <-chan struct{}) {
33
34 select {
35 case <-ctx.Done():
36 case <-wsClosed:
37 }
38
39
40 close(c)
41
42
43 _ = conn.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, "context canceled by server"))
44 _ = conn.Close()
45 }
46
47 func forwardWebsocketEvents(ws *websocket.Conn, c EventChannel, u *url.URL, wsClosed chan<- struct{}, sendNowDone chan<- int) {
48 serverURL := source(u.String())
49
50 defer func() {
51
52 close(wsClosed)
53 }()
54
55 for {
56
57 _, msg, err := ws.ReadMessage()
58 if err != nil {
59 if closeErr, ok := err.(*websocket.CloseError); ok && closeErr.Code == websocket.CloseNormalClosure {
60 return
61 }
62
63 if opErr, ok := err.(*net.OpError); ok && opErr.Op == "read" && strings.Contains(opErr.Err.Error(), "closed") {
64 return
65 }
66 c <- &ErrorEvent{
67 error: errors.WithStack(err),
68 source: serverURL,
69 }
70 return
71 }
72
73 var eventsSend int
74 _, err = fmt.Sscanf(string(msg), messageSendNowDone, &eventsSend)
75 if err == nil {
76 sendNowDone <- eventsSend
77 continue
78 }
79
80 e, err := unmarshalEvent(msg)
81 if err != nil {
82 c <- &ErrorEvent{
83 error: err,
84 source: serverURL,
85 }
86 continue
87 }
88 localURL := *u
89 localURL.Path = e.Source()
90 e.setSource(localURL.String())
91 c <- e
92 }
93 }
94
95 func forwardDispatchNow(ctx context.Context, ws *websocket.Conn, c EventChannel, sendNow <-chan struct{}, serverURL string) {
96 for {
97 select {
98 case <-ctx.Done():
99 return
100 case _, ok := <-sendNow:
101 if !ok {
102 return
103 }
104
105 if err := ws.WriteMessage(websocket.TextMessage, []byte(messageSendNow)); err != nil {
106 c <- &ErrorEvent{
107 source: source(serverURL),
108 error: err,
109 }
110 }
111 }
112 }
113 }
114
View as plain text