1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "fmt"
19 "io"
20 "net"
21 "net/http"
22 "net/url"
23 "strings"
24 "time"
25
26 "go.etcd.io/etcd/api/v3/version"
27 "go.etcd.io/etcd/client/pkg/v3/transport"
28 "go.etcd.io/etcd/client/pkg/v3/types"
29
30 "github.com/coreos/go-semver/semver"
31 "go.uber.org/zap"
32 )
33
34 var (
35 errMemberRemoved = fmt.Errorf("the member has been permanently removed from the cluster")
36 errMemberNotFound = fmt.Errorf("member not found")
37 )
38
39
40
41 func NewListener(u url.URL, tlsinfo *transport.TLSInfo) (net.Listener, error) {
42 return transport.NewListenerWithOpts(u.Host, u.Scheme, transport.WithTLSInfo(tlsinfo), transport.WithTimeout(ConnReadTimeout, ConnWriteTimeout))
43 }
44
45
46
47 func NewRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
48
49
50
51 return transport.NewTimeoutTransport(tlsInfo, dialTimeout, 0, 0)
52 }
53
54
55
56
57
58
59 func newStreamRoundTripper(tlsInfo transport.TLSInfo, dialTimeout time.Duration) (http.RoundTripper, error) {
60 return transport.NewTimeoutTransport(tlsInfo, dialTimeout, ConnReadTimeout, ConnWriteTimeout)
61 }
62
63
64 func createPostRequest(lg *zap.Logger, u url.URL, path string, body io.Reader, ct string, urls types.URLs, from, cid types.ID) *http.Request {
65 uu := u
66 uu.Path = path
67 req, err := http.NewRequest("POST", uu.String(), body)
68 if err != nil {
69 if lg != nil {
70 lg.Panic("unexpected new request error", zap.Error(err))
71 }
72 }
73 req.Header.Set("Content-Type", ct)
74 req.Header.Set("X-Server-From", from.String())
75 req.Header.Set("X-Server-Version", version.Version)
76 req.Header.Set("X-Min-Cluster-Version", version.MinClusterVersion)
77 req.Header.Set("X-Etcd-Cluster-ID", cid.String())
78 setPeerURLsHeader(req, urls)
79
80 return req
81 }
82
83
84
85 func checkPostResponse(lg *zap.Logger, resp *http.Response, body []byte, req *http.Request, to types.ID) error {
86 switch resp.StatusCode {
87 case http.StatusPreconditionFailed:
88 switch strings.TrimSuffix(string(body), "\n") {
89 case errIncompatibleVersion.Error():
90 if lg != nil {
91 lg.Error(
92 "request sent was ignored by peer",
93 zap.String("remote-peer-id", to.String()),
94 )
95 }
96 return errIncompatibleVersion
97 case ErrClusterIDMismatch.Error():
98 if lg != nil {
99 lg.Error(
100 "request sent was ignored due to cluster ID mismatch",
101 zap.String("remote-peer-id", to.String()),
102 zap.String("remote-peer-cluster-id", resp.Header.Get("X-Etcd-Cluster-ID")),
103 zap.String("local-member-cluster-id", req.Header.Get("X-Etcd-Cluster-ID")),
104 )
105 }
106 return ErrClusterIDMismatch
107 default:
108 return fmt.Errorf("unhandled error %q when precondition failed", string(body))
109 }
110 case http.StatusForbidden:
111 return errMemberRemoved
112 case http.StatusNoContent:
113 return nil
114 default:
115 return fmt.Errorf("unexpected http status %s while posting to %q", http.StatusText(resp.StatusCode), req.URL.String())
116 }
117 }
118
119
120
121
122
123
124 func reportCriticalError(err error, errc chan<- error) {
125 select {
126 case errc <- err:
127 default:
128 }
129 }
130
131
132
133
134 func compareMajorMinorVersion(a, b *semver.Version) int {
135 na := &semver.Version{Major: a.Major, Minor: a.Minor}
136 nb := &semver.Version{Major: b.Major, Minor: b.Minor}
137 switch {
138 case na.LessThan(*nb):
139 return -1
140 case nb.LessThan(*na):
141 return 1
142 default:
143 return 0
144 }
145 }
146
147
148 func serverVersion(h http.Header) *semver.Version {
149 verStr := h.Get("X-Server-Version")
150
151 if verStr == "" {
152 verStr = "2.0.0"
153 }
154 return semver.Must(semver.NewVersion(verStr))
155 }
156
157
158 func minClusterVersion(h http.Header) *semver.Version {
159 verStr := h.Get("X-Min-Cluster-Version")
160
161 if verStr == "" {
162 verStr = "2.0.0"
163 }
164 return semver.Must(semver.NewVersion(verStr))
165 }
166
167
168
169 func checkVersionCompatibility(name string, server, minCluster *semver.Version) (
170 localServer *semver.Version,
171 localMinCluster *semver.Version,
172 err error) {
173 localServer = semver.Must(semver.NewVersion(version.Version))
174 localMinCluster = semver.Must(semver.NewVersion(version.MinClusterVersion))
175 if compareMajorMinorVersion(server, localMinCluster) == -1 {
176 return localServer, localMinCluster, fmt.Errorf("remote version is too low: remote[%s]=%s, local=%s", name, server, localServer)
177 }
178 if compareMajorMinorVersion(minCluster, localServer) == 1 {
179 return localServer, localMinCluster, fmt.Errorf("local version is too low: remote[%s]=%s, local=%s", name, server, localServer)
180 }
181 return localServer, localMinCluster, nil
182 }
183
184
185 func setPeerURLsHeader(req *http.Request, urls types.URLs) {
186 if urls == nil {
187
188 return
189 }
190 peerURLs := make([]string, urls.Len())
191 for i := range urls {
192 peerURLs[i] = urls[i].String()
193 }
194 req.Header.Set("X-PeerURLs", strings.Join(peerURLs, ","))
195 }
196
197
198 func addRemoteFromRequest(tr Transporter, r *http.Request) {
199 if from, err := types.IDFromString(r.Header.Get("X-Server-From")); err == nil {
200 if urls := r.Header.Get("X-PeerURLs"); urls != "" {
201 tr.AddRemote(from, strings.Split(urls, ","))
202 }
203 }
204 }
205
View as plain text