...
1
2
3
4
5 package websocket
6
7 import (
8 "bytes"
9 "net"
10 "sync"
11 "time"
12 )
13
14
15
16
17
18
19 type PreparedMessage struct {
20 messageType int
21 data []byte
22 mu sync.Mutex
23 frames map[prepareKey]*preparedFrame
24 }
25
26
27 type prepareKey struct {
28 isServer bool
29 compress bool
30 compressionLevel int
31 }
32
33
34 type preparedFrame struct {
35 once sync.Once
36 data []byte
37 }
38
39
40
41
42
43 func NewPreparedMessage(messageType int, data []byte) (*PreparedMessage, error) {
44 pm := &PreparedMessage{
45 messageType: messageType,
46 frames: make(map[prepareKey]*preparedFrame),
47 data: data,
48 }
49
50
51 _, frameData, err := pm.frame(prepareKey{isServer: true, compress: false})
52 if err != nil {
53 return nil, err
54 }
55
56
57
58 pm.data = frameData[len(frameData)-len(data):]
59 return pm, nil
60 }
61
62 func (pm *PreparedMessage) frame(key prepareKey) (int, []byte, error) {
63 pm.mu.Lock()
64 frame, ok := pm.frames[key]
65 if !ok {
66 frame = &preparedFrame{}
67 pm.frames[key] = frame
68 }
69 pm.mu.Unlock()
70
71 var err error
72 frame.once.Do(func() {
73
74
75
76 mu := make(chan struct{}, 1)
77 mu <- struct{}{}
78 var nc prepareConn
79 c := &Conn{
80 conn: &nc,
81 mu: mu,
82 isServer: key.isServer,
83 compressionLevel: key.compressionLevel,
84 enableWriteCompression: true,
85 writeBuf: make([]byte, defaultWriteBufferSize+maxFrameHeaderSize),
86 }
87 if key.compress {
88 c.newCompressionWriter = compressNoContextTakeover
89 }
90 err = c.WriteMessage(pm.messageType, pm.data)
91 frame.data = nc.buf.Bytes()
92 })
93 return pm.messageType, frame.data, err
94 }
95
96 type prepareConn struct {
97 buf bytes.Buffer
98 net.Conn
99 }
100
101 func (pc *prepareConn) Write(p []byte) (int, error) { return pc.buf.Write(p) }
102 func (pc *prepareConn) SetWriteDeadline(t time.Time) error { return nil }
103
View as plain text