...
1
16
17 package portforward
18
19 import (
20 "errors"
21 "fmt"
22 "io"
23 "net"
24 "sync"
25 "time"
26
27 gwebsocket "github.com/gorilla/websocket"
28
29 "k8s.io/klog/v2"
30 )
31
32 var _ net.Conn = &TunnelingConnection{}
33
34
35
36 type TunnelingConnection struct {
37 name string
38 conn *gwebsocket.Conn
39 inProgressMessage io.Reader
40 closeOnce sync.Once
41 }
42
43
44
45 func NewTunnelingConnection(name string, conn *gwebsocket.Conn) *TunnelingConnection {
46 return &TunnelingConnection{
47 name: name,
48 conn: conn,
49 }
50 }
51
52
53
54
55 func (c *TunnelingConnection) Read(p []byte) (int, error) {
56 klog.V(7).Infof("%s: tunneling connection read...", c.name)
57 defer klog.V(7).Infof("%s: tunneling connection read...complete", c.name)
58 for {
59 if c.inProgressMessage == nil {
60 klog.V(8).Infof("%s: tunneling connection read before NextReader()...", c.name)
61 messageType, nextReader, err := c.conn.NextReader()
62 if err != nil {
63 closeError := &gwebsocket.CloseError{}
64 if errors.As(err, &closeError) && closeError.Code == gwebsocket.CloseNormalClosure {
65 return 0, io.EOF
66 }
67 klog.V(4).Infof("%s:tunneling connection NextReader() error: %v", c.name, err)
68 return 0, err
69 }
70 if messageType != gwebsocket.BinaryMessage {
71 return 0, fmt.Errorf("invalid message type received")
72 }
73 c.inProgressMessage = nextReader
74 }
75 klog.V(8).Infof("%s: tunneling connection read in progress message...", c.name)
76 i, err := c.inProgressMessage.Read(p)
77 if i == 0 && err == io.EOF {
78 c.inProgressMessage = nil
79 } else {
80 klog.V(8).Infof("%s: read %d bytes, error=%v, bytes=% X", c.name, i, err, p[:i])
81 return i, err
82 }
83 }
84 }
85
86
87
88
89 func (c *TunnelingConnection) Write(p []byte) (n int, err error) {
90 klog.V(7).Infof("%s: write: %d bytes, bytes=% X", c.name, len(p), p)
91 defer klog.V(7).Infof("%s: tunneling connection write...complete", c.name)
92 w, err := c.conn.NextWriter(gwebsocket.BinaryMessage)
93 if err != nil {
94 return 0, err
95 }
96 defer func() {
97
98 closeErr := w.Close()
99 if closeErr != nil && err == nil {
100
101 err = closeErr
102 }
103 }()
104
105 n, err = w.Write(p)
106 return
107 }
108
109
110
111 func (c *TunnelingConnection) Close() error {
112 var err error
113 c.closeOnce.Do(func() {
114 klog.V(7).Infof("%s: tunneling connection Close()...", c.name)
115
116 normalCloseMsg := gwebsocket.FormatCloseMessage(gwebsocket.CloseNormalClosure, "")
117 writeControlErr := c.conn.WriteControl(gwebsocket.CloseMessage, normalCloseMsg, time.Now().Add(time.Second))
118 closeErr := c.conn.Close()
119 if closeErr != nil {
120 err = closeErr
121 } else if writeControlErr != nil {
122 err = writeControlErr
123 }
124 })
125 return err
126 }
127
128
129
130 func (c *TunnelingConnection) LocalAddr() net.Addr {
131 return c.conn.LocalAddr()
132 }
133
134
135
136 func (c *TunnelingConnection) RemoteAddr() net.Addr {
137 return c.conn.RemoteAddr()
138 }
139
140
141
142 func (c *TunnelingConnection) SetDeadline(t time.Time) error {
143 rerr := c.SetReadDeadline(t)
144 werr := c.SetWriteDeadline(t)
145 return errors.Join(rerr, werr)
146 }
147
148
149
150 func (c *TunnelingConnection) SetReadDeadline(t time.Time) error {
151 return c.conn.SetReadDeadline(t)
152 }
153
154
155
156 func (c *TunnelingConnection) SetWriteDeadline(t time.Time) error {
157 return c.conn.SetWriteDeadline(t)
158 }
159
View as plain text