1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package clientv3
16
17 import (
18 "context"
19 "fmt"
20 "io"
21
22 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
23 "go.uber.org/zap"
24 "google.golang.org/grpc"
25 )
26
27 type (
28 DefragmentResponse pb.DefragmentResponse
29 AlarmResponse pb.AlarmResponse
30 AlarmMember pb.AlarmMember
31 StatusResponse pb.StatusResponse
32 HashKVResponse pb.HashKVResponse
33 MoveLeaderResponse pb.MoveLeaderResponse
34 )
35
36 type Maintenance interface {
37
38 AlarmList(ctx context.Context) (*AlarmResponse, error)
39
40
41 AlarmDisarm(ctx context.Context, m *AlarmMember) (*AlarmResponse, error)
42
43
44
45
46
47
48
49
50 Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error)
51
52
53 Status(ctx context.Context, endpoint string) (*StatusResponse, error)
54
55
56
57
58 HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error)
59
60
61
62
63 Snapshot(ctx context.Context) (io.ReadCloser, error)
64
65
66
67 MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error)
68 }
69
70 type maintenance struct {
71 lg *zap.Logger
72 dial func(endpoint string) (pb.MaintenanceClient, func(), error)
73 remote pb.MaintenanceClient
74 callOpts []grpc.CallOption
75 }
76
77 func NewMaintenance(c *Client) Maintenance {
78 api := &maintenance{
79 lg: c.lg,
80 dial: func(endpoint string) (pb.MaintenanceClient, func(), error) {
81 conn, err := c.Dial(endpoint)
82 if err != nil {
83 return nil, nil, fmt.Errorf("failed to dial endpoint %s with maintenance client: %v", endpoint, err)
84 }
85
86
87 dctx := c.ctx
88 cancel := func() {}
89 if c.cfg.DialTimeout > 0 {
90 dctx, cancel = context.WithTimeout(c.ctx, c.cfg.DialTimeout)
91 }
92 err = c.getToken(dctx)
93 cancel()
94 if err != nil {
95 conn.Close()
96 return nil, nil, fmt.Errorf("failed to getToken from endpoint %s with maintenance client: %v", endpoint, err)
97 }
98 cancel = func() { conn.Close() }
99 return RetryMaintenanceClient(c, conn), cancel, nil
100 },
101 remote: RetryMaintenanceClient(c, c.conn),
102 }
103 if c != nil {
104 api.callOpts = c.callOpts
105 }
106 return api
107 }
108
109 func NewMaintenanceFromMaintenanceClient(remote pb.MaintenanceClient, c *Client) Maintenance {
110 api := &maintenance{
111 lg: c.lg,
112 dial: func(string) (pb.MaintenanceClient, func(), error) {
113 return remote, func() {}, nil
114 },
115 remote: remote,
116 }
117 if c != nil {
118 api.callOpts = c.callOpts
119 }
120 return api
121 }
122
123 func (m *maintenance) AlarmList(ctx context.Context) (*AlarmResponse, error) {
124 req := &pb.AlarmRequest{
125 Action: pb.AlarmRequest_GET,
126 MemberID: 0,
127 Alarm: pb.AlarmType_NONE,
128 }
129 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
130 if err == nil {
131 return (*AlarmResponse)(resp), nil
132 }
133 return nil, toErr(ctx, err)
134 }
135
136 func (m *maintenance) AlarmDisarm(ctx context.Context, am *AlarmMember) (*AlarmResponse, error) {
137 req := &pb.AlarmRequest{
138 Action: pb.AlarmRequest_DEACTIVATE,
139 MemberID: am.MemberID,
140 Alarm: am.Alarm,
141 }
142
143 if req.MemberID == 0 && req.Alarm == pb.AlarmType_NONE {
144 ar, err := m.AlarmList(ctx)
145 if err != nil {
146 return nil, toErr(ctx, err)
147 }
148 ret := AlarmResponse{}
149 for _, am := range ar.Alarms {
150 dresp, derr := m.AlarmDisarm(ctx, (*AlarmMember)(am))
151 if derr != nil {
152 return nil, toErr(ctx, derr)
153 }
154 ret.Alarms = append(ret.Alarms, dresp.Alarms...)
155 }
156 return &ret, nil
157 }
158
159 resp, err := m.remote.Alarm(ctx, req, m.callOpts...)
160 if err == nil {
161 return (*AlarmResponse)(resp), nil
162 }
163 return nil, toErr(ctx, err)
164 }
165
166 func (m *maintenance) Defragment(ctx context.Context, endpoint string) (*DefragmentResponse, error) {
167 remote, cancel, err := m.dial(endpoint)
168 if err != nil {
169 return nil, toErr(ctx, err)
170 }
171 defer cancel()
172 resp, err := remote.Defragment(ctx, &pb.DefragmentRequest{}, m.callOpts...)
173 if err != nil {
174 return nil, toErr(ctx, err)
175 }
176 return (*DefragmentResponse)(resp), nil
177 }
178
179 func (m *maintenance) Status(ctx context.Context, endpoint string) (*StatusResponse, error) {
180 remote, cancel, err := m.dial(endpoint)
181 if err != nil {
182 return nil, toErr(ctx, err)
183 }
184 defer cancel()
185 resp, err := remote.Status(ctx, &pb.StatusRequest{}, m.callOpts...)
186 if err != nil {
187 return nil, toErr(ctx, err)
188 }
189 return (*StatusResponse)(resp), nil
190 }
191
192 func (m *maintenance) HashKV(ctx context.Context, endpoint string, rev int64) (*HashKVResponse, error) {
193 remote, cancel, err := m.dial(endpoint)
194 if err != nil {
195
196 return nil, toErr(ctx, err)
197 }
198 defer cancel()
199 resp, err := remote.HashKV(ctx, &pb.HashKVRequest{Revision: rev}, m.callOpts...)
200 if err != nil {
201 return nil, toErr(ctx, err)
202 }
203 return (*HashKVResponse)(resp), nil
204 }
205
206 func (m *maintenance) Snapshot(ctx context.Context) (io.ReadCloser, error) {
207 ss, err := m.remote.Snapshot(ctx, &pb.SnapshotRequest{}, append(m.callOpts, withMax(defaultStreamMaxRetries))...)
208 if err != nil {
209 return nil, toErr(ctx, err)
210 }
211
212 m.lg.Info("opened snapshot stream; downloading")
213 pr, pw := io.Pipe()
214 go func() {
215 for {
216 resp, err := ss.Recv()
217 if err != nil {
218 switch err {
219 case io.EOF:
220 m.lg.Info("completed snapshot read; closing")
221 default:
222 m.lg.Warn("failed to receive from snapshot stream; closing", zap.Error(err))
223 }
224 pw.CloseWithError(err)
225 return
226 }
227
228
229
230
231
232
233 if _, werr := pw.Write(resp.Blob); werr != nil {
234 pw.CloseWithError(werr)
235 return
236 }
237 }
238 }()
239 return &snapshotReadCloser{ctx: ctx, ReadCloser: pr}, nil
240 }
241
242 type snapshotReadCloser struct {
243 ctx context.Context
244 io.ReadCloser
245 }
246
247 func (rc *snapshotReadCloser) Read(p []byte) (n int, err error) {
248 n, err = rc.ReadCloser.Read(p)
249 return n, toErr(rc.ctx, err)
250 }
251
252 func (m *maintenance) MoveLeader(ctx context.Context, transfereeID uint64) (*MoveLeaderResponse, error) {
253 resp, err := m.remote.MoveLeader(ctx, &pb.MoveLeaderRequest{TargetID: transfereeID}, m.callOpts...)
254 return (*MoveLeaderResponse)(resp), toErr(ctx, err)
255 }
256
View as plain text