1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package v3rpc
16
17 import (
18 "context"
19 "crypto/sha256"
20 "io"
21 "time"
22
23 "github.com/dustin/go-humanize"
24 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
25 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
26 "go.etcd.io/etcd/api/v3/version"
27 "go.etcd.io/etcd/raft/v3"
28 "go.etcd.io/etcd/server/v3/auth"
29 "go.etcd.io/etcd/server/v3/etcdserver"
30 "go.etcd.io/etcd/server/v3/mvcc"
31 "go.etcd.io/etcd/server/v3/mvcc/backend"
32
33 "go.uber.org/zap"
34 )
35
36 type KVGetter interface {
37 KV() mvcc.WatchableKV
38 }
39
40 type BackendGetter interface {
41 Backend() backend.Backend
42 }
43
44 type Alarmer interface {
45
46
47 Alarms() []*pb.AlarmMember
48 Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error)
49 }
50
51 type Downgrader interface {
52 Downgrade(ctx context.Context, dr *pb.DowngradeRequest) (*pb.DowngradeResponse, error)
53 }
54
55 type LeaderTransferrer interface {
56 MoveLeader(ctx context.Context, lead, target uint64) error
57 }
58
59 type AuthGetter interface {
60 AuthInfoFromCtx(ctx context.Context) (*auth.AuthInfo, error)
61 AuthStore() auth.AuthStore
62 }
63
64 type ClusterStatusGetter interface {
65 IsLearner() bool
66 }
67
68 type maintenanceServer struct {
69 lg *zap.Logger
70 rg etcdserver.RaftStatusGetter
71 hasher mvcc.HashStorage
72 kg KVGetter
73 bg BackendGetter
74 a Alarmer
75 lt LeaderTransferrer
76 hdr header
77 cs ClusterStatusGetter
78 d Downgrader
79 }
80
81 func NewMaintenanceServer(s *etcdserver.EtcdServer) pb.MaintenanceServer {
82 srv := &maintenanceServer{lg: s.Cfg.Logger, rg: s, hasher: s.KV().HashStorage(), kg: s, bg: s, a: s, lt: s, hdr: newHeader(s), cs: s, d: s}
83 if srv.lg == nil {
84 srv.lg = zap.NewNop()
85 }
86 return &authMaintenanceServer{srv, s}
87 }
88
89 func (ms *maintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
90 ms.lg.Info("starting defragment")
91 err := ms.bg.Backend().Defrag()
92 if err != nil {
93 ms.lg.Warn("failed to defragment", zap.Error(err))
94 return nil, err
95 }
96 ms.lg.Info("finished defragment")
97 return &pb.DefragmentResponse{}, nil
98 }
99
100
101 const snapshotSendBufferSize = 32 * 1024
102
103 func (ms *maintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
104 snap := ms.bg.Backend().Snapshot()
105 pr, pw := io.Pipe()
106
107 defer pr.Close()
108
109 go func() {
110 snap.WriteTo(pw)
111 if err := snap.Close(); err != nil {
112 ms.lg.Warn("failed to close snapshot", zap.Error(err))
113 }
114 pw.Close()
115 }()
116
117
118
119 h := sha256.New()
120
121 sent := int64(0)
122 total := snap.Size()
123 size := humanize.Bytes(uint64(total))
124
125 start := time.Now()
126 ms.lg.Info("sending database snapshot to client",
127 zap.Int64("total-bytes", total),
128 zap.String("size", size),
129 )
130 for total-sent > 0 {
131
132
133
134
135
136 buf := make([]byte, snapshotSendBufferSize)
137
138 n, err := io.ReadFull(pr, buf)
139 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
140 return togRPCError(err)
141 }
142 sent += int64(n)
143
144
145
146
147
148
149
150
151
152 resp := &pb.SnapshotResponse{
153 RemainingBytes: uint64(total - sent),
154 Blob: buf[:n],
155 }
156 if err = srv.Send(resp); err != nil {
157 return togRPCError(err)
158 }
159 h.Write(buf[:n])
160 }
161
162
163
164 sha := h.Sum(nil)
165
166 ms.lg.Info("sending database sha256 checksum to client",
167 zap.Int64("total-bytes", total),
168 zap.Int("checksum-size", len(sha)),
169 )
170 hresp := &pb.SnapshotResponse{RemainingBytes: 0, Blob: sha}
171 if err := srv.Send(hresp); err != nil {
172 return togRPCError(err)
173 }
174
175 ms.lg.Info("successfully sent database snapshot to client",
176 zap.Int64("total-bytes", total),
177 zap.String("size", size),
178 zap.String("took", humanize.Time(start)),
179 )
180 return nil
181 }
182
183 func (ms *maintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
184 h, rev, err := ms.hasher.Hash()
185 if err != nil {
186 return nil, togRPCError(err)
187 }
188 resp := &pb.HashResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h}
189 ms.hdr.fill(resp.Header)
190 return resp, nil
191 }
192
193 func (ms *maintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
194 h, rev, err := ms.hasher.HashByRev(r.Revision)
195 if err != nil {
196 return nil, togRPCError(err)
197 }
198
199 resp := &pb.HashKVResponse{Header: &pb.ResponseHeader{Revision: rev}, Hash: h.Hash, CompactRevision: h.CompactRevision}
200 ms.hdr.fill(resp.Header)
201 return resp, nil
202 }
203
204 func (ms *maintenanceServer) Alarm(ctx context.Context, ar *pb.AlarmRequest) (*pb.AlarmResponse, error) {
205 resp, err := ms.a.Alarm(ctx, ar)
206 if err != nil {
207 return nil, togRPCError(err)
208 }
209 if resp.Header == nil {
210 resp.Header = &pb.ResponseHeader{}
211 }
212 ms.hdr.fill(resp.Header)
213 return resp, nil
214 }
215
216 func (ms *maintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
217 hdr := &pb.ResponseHeader{}
218 ms.hdr.fill(hdr)
219 resp := &pb.StatusResponse{
220 Header: hdr,
221 Version: version.Version,
222 Leader: uint64(ms.rg.Leader()),
223 RaftIndex: ms.rg.CommittedIndex(),
224 RaftAppliedIndex: ms.rg.AppliedIndex(),
225 RaftTerm: ms.rg.Term(),
226 DbSize: ms.bg.Backend().Size(),
227 DbSizeInUse: ms.bg.Backend().SizeInUse(),
228 IsLearner: ms.cs.IsLearner(),
229 }
230 if resp.Leader == raft.None {
231 resp.Errors = append(resp.Errors, etcdserver.ErrNoLeader.Error())
232 }
233 for _, a := range ms.a.Alarms() {
234 resp.Errors = append(resp.Errors, a.String())
235 }
236 return resp, nil
237 }
238
239 func (ms *maintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
240 if ms.rg.ID() != ms.rg.Leader() {
241 return nil, rpctypes.ErrGRPCNotLeader
242 }
243
244 if err := ms.lt.MoveLeader(ctx, uint64(ms.rg.Leader()), tr.TargetID); err != nil {
245 return nil, togRPCError(err)
246 }
247 return &pb.MoveLeaderResponse{}, nil
248 }
249
250 func (ms *maintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
251 resp, err := ms.d.Downgrade(ctx, r)
252 if err != nil {
253 return nil, togRPCError(err)
254 }
255 resp.Header = &pb.ResponseHeader{}
256 ms.hdr.fill(resp.Header)
257 return resp, nil
258 }
259
260 type authMaintenanceServer struct {
261 *maintenanceServer
262 ag AuthGetter
263 }
264
265 func (ams *authMaintenanceServer) isAuthenticated(ctx context.Context) error {
266 authInfo, err := ams.ag.AuthInfoFromCtx(ctx)
267 if err != nil {
268 return err
269 }
270
271 return ams.ag.AuthStore().IsAdminPermitted(authInfo)
272 }
273
274 func (ams *authMaintenanceServer) Defragment(ctx context.Context, sr *pb.DefragmentRequest) (*pb.DefragmentResponse, error) {
275 if err := ams.isAuthenticated(ctx); err != nil {
276 return nil, err
277 }
278
279 return ams.maintenanceServer.Defragment(ctx, sr)
280 }
281
282 func (ams *authMaintenanceServer) Snapshot(sr *pb.SnapshotRequest, srv pb.Maintenance_SnapshotServer) error {
283 if err := ams.isAuthenticated(srv.Context()); err != nil {
284 return err
285 }
286
287 return ams.maintenanceServer.Snapshot(sr, srv)
288 }
289
290 func (ams *authMaintenanceServer) Hash(ctx context.Context, r *pb.HashRequest) (*pb.HashResponse, error) {
291 if err := ams.isAuthenticated(ctx); err != nil {
292 return nil, err
293 }
294
295 return ams.maintenanceServer.Hash(ctx, r)
296 }
297
298 func (ams *authMaintenanceServer) HashKV(ctx context.Context, r *pb.HashKVRequest) (*pb.HashKVResponse, error) {
299 if err := ams.isAuthenticated(ctx); err != nil {
300 return nil, err
301 }
302 return ams.maintenanceServer.HashKV(ctx, r)
303 }
304
305 func (ams *authMaintenanceServer) Status(ctx context.Context, ar *pb.StatusRequest) (*pb.StatusResponse, error) {
306 return ams.maintenanceServer.Status(ctx, ar)
307 }
308
309 func (ams *authMaintenanceServer) MoveLeader(ctx context.Context, tr *pb.MoveLeaderRequest) (*pb.MoveLeaderResponse, error) {
310 return ams.maintenanceServer.MoveLeader(ctx, tr)
311 }
312
313 func (ams *authMaintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) {
314 return ams.maintenanceServer.Downgrade(ctx, r)
315 }
316
View as plain text