...
1
16
17 package spdy
18
19 import (
20 "bufio"
21 "fmt"
22 "io"
23 "net"
24 "net/http"
25 "strings"
26 "sync/atomic"
27 "time"
28
29 "k8s.io/apimachinery/pkg/util/httpstream"
30 "k8s.io/apimachinery/pkg/util/runtime"
31 )
32
33 const HeaderSpdy31 = "SPDY/3.1"
34
35
36
37 type responseUpgrader struct {
38 pingPeriod time.Duration
39 }
40
41
42
43
44
45
46 type connWrapper struct {
47 net.Conn
48 closed int32
49 bufReader *bufio.Reader
50 }
51
52 func (w *connWrapper) Read(b []byte) (n int, err error) {
53 if atomic.LoadInt32(&w.closed) == 1 {
54 return 0, io.EOF
55 }
56 return w.bufReader.Read(b)
57 }
58
59 func (w *connWrapper) Close() error {
60 err := w.Conn.Close()
61 atomic.StoreInt32(&w.closed, 1)
62 return err
63 }
64
65
66
67
68 func NewResponseUpgrader() httpstream.ResponseUpgrader {
69 return NewResponseUpgraderWithPings(0)
70 }
71
72
73
74
75
76
77
78
79 func NewResponseUpgraderWithPings(pingPeriod time.Duration) httpstream.ResponseUpgrader {
80 return responseUpgrader{pingPeriod: pingPeriod}
81 }
82
83
84
85
86 func (u responseUpgrader) UpgradeResponse(w http.ResponseWriter, req *http.Request, newStreamHandler httpstream.NewStreamHandler) httpstream.Connection {
87 connectionHeader := strings.ToLower(req.Header.Get(httpstream.HeaderConnection))
88 upgradeHeader := strings.ToLower(req.Header.Get(httpstream.HeaderUpgrade))
89 if !strings.Contains(connectionHeader, strings.ToLower(httpstream.HeaderUpgrade)) || !strings.Contains(upgradeHeader, strings.ToLower(HeaderSpdy31)) {
90 errorMsg := fmt.Sprintf("unable to upgrade: missing upgrade headers in request: %#v", req.Header)
91 http.Error(w, errorMsg, http.StatusBadRequest)
92 return nil
93 }
94
95 hijacker, ok := w.(http.Hijacker)
96 if !ok {
97 errorMsg := "unable to upgrade: unable to hijack response"
98 http.Error(w, errorMsg, http.StatusInternalServerError)
99 return nil
100 }
101
102 w.Header().Add(httpstream.HeaderConnection, httpstream.HeaderUpgrade)
103 w.Header().Add(httpstream.HeaderUpgrade, HeaderSpdy31)
104 w.WriteHeader(http.StatusSwitchingProtocols)
105
106 conn, bufrw, err := hijacker.Hijack()
107 if err != nil {
108 runtime.HandleError(fmt.Errorf("unable to upgrade: error hijacking response: %v", err))
109 return nil
110 }
111
112 connWithBuf := &connWrapper{Conn: conn, bufReader: bufrw.Reader}
113 spdyConn, err := NewServerConnectionWithPings(connWithBuf, newStreamHandler, u.pingPeriod)
114 if err != nil {
115 runtime.HandleError(fmt.Errorf("unable to upgrade: error creating SPDY server connection: %v", err))
116 return nil
117 }
118
119 return spdyConn
120 }
121
View as plain text