1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package rafthttp
16
17 import (
18 "bytes"
19 "context"
20 "io"
21 "io/ioutil"
22 "net/http"
23 "time"
24
25 "go.etcd.io/etcd/client/pkg/v3/types"
26 "go.etcd.io/etcd/pkg/v3/httputil"
27 pioutil "go.etcd.io/etcd/pkg/v3/ioutil"
28 "go.etcd.io/etcd/raft/v3"
29 "go.etcd.io/etcd/server/v3/etcdserver/api/snap"
30
31 "github.com/dustin/go-humanize"
32 "go.uber.org/zap"
33 )
34
35 var (
36
37 snapResponseReadTimeout = 5 * time.Second
38 )
39
40 type snapshotSender struct {
41 from, to types.ID
42 cid types.ID
43
44 tr *Transport
45 picker *urlPicker
46 status *peerStatus
47 r Raft
48 errorc chan error
49
50 stopc chan struct{}
51 }
52
53 func newSnapshotSender(tr *Transport, picker *urlPicker, to types.ID, status *peerStatus) *snapshotSender {
54 return &snapshotSender{
55 from: tr.ID,
56 to: to,
57 cid: tr.ClusterID,
58 tr: tr,
59 picker: picker,
60 status: status,
61 r: tr.Raft,
62 errorc: tr.ErrorC,
63 stopc: make(chan struct{}),
64 }
65 }
66
67 func (s *snapshotSender) stop() { close(s.stopc) }
68
69 func (s *snapshotSender) send(merged snap.Message) {
70 start := time.Now()
71
72 m := merged.Message
73 to := types.ID(m.To).String()
74
75 body := createSnapBody(s.tr.Logger, merged)
76 defer body.Close()
77
78 u := s.picker.pick()
79 req := createPostRequest(s.tr.Logger, u, RaftSnapshotPrefix, body, "application/octet-stream", s.tr.URLs, s.from, s.cid)
80
81 snapshotSizeVal := uint64(merged.TotalSize)
82 snapshotSize := humanize.Bytes(snapshotSizeVal)
83 if s.tr.Logger != nil {
84 s.tr.Logger.Info(
85 "sending database snapshot",
86 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
87 zap.String("remote-peer-id", to),
88 zap.Uint64("bytes", snapshotSizeVal),
89 zap.String("size", snapshotSize),
90 )
91 }
92
93 snapshotSendInflights.WithLabelValues(to).Inc()
94 defer func() {
95 snapshotSendInflights.WithLabelValues(to).Dec()
96 }()
97
98 err := s.post(req)
99 defer merged.CloseWithError(err)
100 if err != nil {
101 if s.tr.Logger != nil {
102 s.tr.Logger.Warn(
103 "failed to send database snapshot",
104 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
105 zap.String("remote-peer-id", to),
106 zap.Uint64("bytes", snapshotSizeVal),
107 zap.String("size", snapshotSize),
108 zap.Error(err),
109 )
110 }
111
112
113
114 if err == errMemberRemoved {
115 reportCriticalError(err, s.errorc)
116 }
117
118 s.picker.unreachable(u)
119 s.status.deactivate(failureType{source: sendSnap, action: "post"}, err.Error())
120 s.r.ReportUnreachable(m.To)
121
122
123
124 s.r.ReportSnapshot(m.To, raft.SnapshotFailure)
125 sentFailures.WithLabelValues(to).Inc()
126 snapshotSendFailures.WithLabelValues(to).Inc()
127 return
128 }
129 s.status.activate()
130 s.r.ReportSnapshot(m.To, raft.SnapshotFinish)
131
132 if s.tr.Logger != nil {
133 s.tr.Logger.Info(
134 "sent database snapshot",
135 zap.Uint64("snapshot-index", m.Snapshot.Metadata.Index),
136 zap.String("remote-peer-id", to),
137 zap.Uint64("bytes", snapshotSizeVal),
138 zap.String("size", snapshotSize),
139 )
140 }
141
142 sentBytes.WithLabelValues(to).Add(float64(merged.TotalSize))
143 snapshotSend.WithLabelValues(to).Inc()
144 snapshotSendSeconds.WithLabelValues(to).Observe(time.Since(start).Seconds())
145 }
146
147
148
149 func (s *snapshotSender) post(req *http.Request) (err error) {
150 ctx, cancel := context.WithCancel(context.Background())
151 req = req.WithContext(ctx)
152 defer cancel()
153
154 type responseAndError struct {
155 resp *http.Response
156 body []byte
157 err error
158 }
159 result := make(chan responseAndError, 1)
160
161 go func() {
162 resp, err := s.tr.pipelineRt.RoundTrip(req)
163 if err != nil {
164 result <- responseAndError{resp, nil, err}
165 return
166 }
167
168
169
170
171 time.AfterFunc(snapResponseReadTimeout, func() { httputil.GracefulClose(resp) })
172 body, err := ioutil.ReadAll(resp.Body)
173 result <- responseAndError{resp, body, err}
174 }()
175
176 select {
177 case <-s.stopc:
178 return errStopped
179 case r := <-result:
180 if r.err != nil {
181 return r.err
182 }
183 return checkPostResponse(s.tr.Logger, r.resp, r.body, req, s.to)
184 }
185 }
186
187 func createSnapBody(lg *zap.Logger, merged snap.Message) io.ReadCloser {
188 buf := new(bytes.Buffer)
189 enc := &messageEncoder{w: buf}
190
191 if err := enc.encode(&merged.Message); err != nil {
192 if lg != nil {
193 lg.Panic("failed to encode message", zap.Error(err))
194 }
195 }
196
197 return &pioutil.ReaderAndCloser{
198 Reader: io.MultiReader(buf, merged.ReadCloser),
199 Closer: merged.ReadCloser,
200 }
201 }
202
View as plain text