1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package v3rpc
17
18 import (
19 "context"
20
21 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
22 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
23 "go.etcd.io/etcd/pkg/v3/adt"
24 "go.etcd.io/etcd/server/v3/etcdserver"
25 )
26
27 type kvServer struct {
28 hdr header
29 kv etcdserver.RaftKV
30
31
32
33
34 maxTxnOps uint
35 }
36
37 func NewKVServer(s *etcdserver.EtcdServer) pb.KVServer {
38 return &kvServer{hdr: newHeader(s), kv: s, maxTxnOps: s.Cfg.MaxTxnOps}
39 }
40
41 func (s *kvServer) Range(ctx context.Context, r *pb.RangeRequest) (*pb.RangeResponse, error) {
42 if err := checkRangeRequest(r); err != nil {
43 return nil, err
44 }
45
46 resp, err := s.kv.Range(ctx, r)
47 if err != nil {
48 return nil, togRPCError(err)
49 }
50
51 s.hdr.fill(resp.Header)
52 return resp, nil
53 }
54
55 func (s *kvServer) Put(ctx context.Context, r *pb.PutRequest) (*pb.PutResponse, error) {
56 if err := checkPutRequest(r); err != nil {
57 return nil, err
58 }
59
60 resp, err := s.kv.Put(ctx, r)
61 if err != nil {
62 return nil, togRPCError(err)
63 }
64
65 s.hdr.fill(resp.Header)
66 return resp, nil
67 }
68
69 func (s *kvServer) DeleteRange(ctx context.Context, r *pb.DeleteRangeRequest) (*pb.DeleteRangeResponse, error) {
70 if err := checkDeleteRequest(r); err != nil {
71 return nil, err
72 }
73
74 resp, err := s.kv.DeleteRange(ctx, r)
75 if err != nil {
76 return nil, togRPCError(err)
77 }
78
79 s.hdr.fill(resp.Header)
80 return resp, nil
81 }
82
83 func (s *kvServer) Txn(ctx context.Context, r *pb.TxnRequest) (*pb.TxnResponse, error) {
84 if err := checkTxnRequest(r, int(s.maxTxnOps)); err != nil {
85 return nil, err
86 }
87
88 if _, _, err := checkIntervals(r.Success); err != nil {
89 return nil, err
90 }
91 if _, _, err := checkIntervals(r.Failure); err != nil {
92 return nil, err
93 }
94
95 resp, err := s.kv.Txn(ctx, r)
96 if err != nil {
97 return nil, togRPCError(err)
98 }
99
100 s.hdr.fill(resp.Header)
101 return resp, nil
102 }
103
104 func (s *kvServer) Compact(ctx context.Context, r *pb.CompactionRequest) (*pb.CompactionResponse, error) {
105 resp, err := s.kv.Compact(ctx, r)
106 if err != nil {
107 return nil, togRPCError(err)
108 }
109
110 s.hdr.fill(resp.Header)
111 return resp, nil
112 }
113
114 func checkRangeRequest(r *pb.RangeRequest) error {
115 if len(r.Key) == 0 {
116 return rpctypes.ErrGRPCEmptyKey
117 }
118 return nil
119 }
120
121 func checkPutRequest(r *pb.PutRequest) error {
122 if len(r.Key) == 0 {
123 return rpctypes.ErrGRPCEmptyKey
124 }
125 if r.IgnoreValue && len(r.Value) != 0 {
126 return rpctypes.ErrGRPCValueProvided
127 }
128 if r.IgnoreLease && r.Lease != 0 {
129 return rpctypes.ErrGRPCLeaseProvided
130 }
131 return nil
132 }
133
134 func checkDeleteRequest(r *pb.DeleteRangeRequest) error {
135 if len(r.Key) == 0 {
136 return rpctypes.ErrGRPCEmptyKey
137 }
138 return nil
139 }
140
141 func checkTxnRequest(r *pb.TxnRequest, maxTxnOps int) error {
142 opc := len(r.Compare)
143 if opc < len(r.Success) {
144 opc = len(r.Success)
145 }
146 if opc < len(r.Failure) {
147 opc = len(r.Failure)
148 }
149 if opc > maxTxnOps {
150 return rpctypes.ErrGRPCTooManyOps
151 }
152
153 for _, c := range r.Compare {
154 if len(c.Key) == 0 {
155 return rpctypes.ErrGRPCEmptyKey
156 }
157 }
158 for _, u := range r.Success {
159 if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
160 return err
161 }
162 }
163 for _, u := range r.Failure {
164 if err := checkRequestOp(u, maxTxnOps-opc); err != nil {
165 return err
166 }
167 }
168
169 return nil
170 }
171
172
173
174
175 func checkIntervals(reqs []*pb.RequestOp) (map[string]struct{}, adt.IntervalTree, error) {
176 dels := adt.NewIntervalTree()
177
178
179 for _, req := range reqs {
180 tv, ok := req.Request.(*pb.RequestOp_RequestDeleteRange)
181 if !ok {
182 continue
183 }
184 dreq := tv.RequestDeleteRange
185 if dreq == nil {
186 continue
187 }
188 var iv adt.Interval
189 if len(dreq.RangeEnd) != 0 {
190 iv = adt.NewStringAffineInterval(string(dreq.Key), string(dreq.RangeEnd))
191 } else {
192 iv = adt.NewStringAffinePoint(string(dreq.Key))
193 }
194 dels.Insert(iv, struct{}{})
195 }
196
197
198 puts := make(map[string]struct{})
199 for _, req := range reqs {
200 tv, ok := req.Request.(*pb.RequestOp_RequestTxn)
201 if !ok {
202 continue
203 }
204 putsThen, delsThen, err := checkIntervals(tv.RequestTxn.Success)
205 if err != nil {
206 return nil, dels, err
207 }
208 putsElse, delsElse, err := checkIntervals(tv.RequestTxn.Failure)
209 if err != nil {
210 return nil, dels, err
211 }
212 for k := range putsThen {
213 if _, ok := puts[k]; ok {
214 return nil, dels, rpctypes.ErrGRPCDuplicateKey
215 }
216 if dels.Intersects(adt.NewStringAffinePoint(k)) {
217 return nil, dels, rpctypes.ErrGRPCDuplicateKey
218 }
219 puts[k] = struct{}{}
220 }
221 for k := range putsElse {
222 if _, ok := puts[k]; ok {
223
224
225 if _, isSafe := putsThen[k]; !isSafe {
226 return nil, dels, rpctypes.ErrGRPCDuplicateKey
227 }
228 }
229 if dels.Intersects(adt.NewStringAffinePoint(k)) {
230 return nil, dels, rpctypes.ErrGRPCDuplicateKey
231 }
232 puts[k] = struct{}{}
233 }
234 dels.Union(delsThen, adt.NewStringAffineInterval("\x00", ""))
235 dels.Union(delsElse, adt.NewStringAffineInterval("\x00", ""))
236 }
237
238
239 for _, req := range reqs {
240 tv, ok := req.Request.(*pb.RequestOp_RequestPut)
241 if !ok || tv.RequestPut == nil {
242 continue
243 }
244 k := string(tv.RequestPut.Key)
245 if _, ok := puts[k]; ok {
246 return nil, dels, rpctypes.ErrGRPCDuplicateKey
247 }
248 if dels.Intersects(adt.NewStringAffinePoint(k)) {
249 return nil, dels, rpctypes.ErrGRPCDuplicateKey
250 }
251 puts[k] = struct{}{}
252 }
253 return puts, dels, nil
254 }
255
256 func checkRequestOp(u *pb.RequestOp, maxTxnOps int) error {
257
258 switch uv := u.Request.(type) {
259 case *pb.RequestOp_RequestRange:
260 return checkRangeRequest(uv.RequestRange)
261 case *pb.RequestOp_RequestPut:
262 return checkPutRequest(uv.RequestPut)
263 case *pb.RequestOp_RequestDeleteRange:
264 return checkDeleteRequest(uv.RequestDeleteRange)
265 case *pb.RequestOp_RequestTxn:
266 return checkTxnRequest(uv.RequestTxn, maxTxnOps)
267 default:
268
269 return rpctypes.ErrGRPCKeyNotFound
270 }
271 }
272
View as plain text