1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package namespace
16
17 import (
18 "context"
19
20 pb "go.etcd.io/etcd/api/v3/etcdserverpb"
21 "go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
22 "go.etcd.io/etcd/client/v3"
23 )
24
25 type kvPrefix struct {
26 clientv3.KV
27 pfx string
28 }
29
30
31
32 func NewKV(kv clientv3.KV, prefix string) clientv3.KV {
33 return &kvPrefix{kv, prefix}
34 }
35
36 func (kv *kvPrefix) Put(ctx context.Context, key, val string, opts ...clientv3.OpOption) (*clientv3.PutResponse, error) {
37 if len(key) == 0 {
38 return nil, rpctypes.ErrEmptyKey
39 }
40 op := kv.prefixOp(clientv3.OpPut(key, val, opts...))
41 r, err := kv.KV.Do(ctx, op)
42 if err != nil {
43 return nil, err
44 }
45 put := r.Put()
46 kv.unprefixPutResponse(put)
47 return put, nil
48 }
49
50 func (kv *kvPrefix) Get(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.GetResponse, error) {
51 if len(key) == 0 && !(clientv3.IsOptsWithFromKey(opts) || clientv3.IsOptsWithPrefix(opts)) {
52 return nil, rpctypes.ErrEmptyKey
53 }
54 r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpGet(key, opts...)))
55 if err != nil {
56 return nil, err
57 }
58 get := r.Get()
59 kv.unprefixGetResponse(get)
60 return get, nil
61 }
62
63 func (kv *kvPrefix) Delete(ctx context.Context, key string, opts ...clientv3.OpOption) (*clientv3.DeleteResponse, error) {
64 if len(key) == 0 && !(clientv3.IsOptsWithFromKey(opts) || clientv3.IsOptsWithPrefix(opts)) {
65 return nil, rpctypes.ErrEmptyKey
66 }
67 r, err := kv.KV.Do(ctx, kv.prefixOp(clientv3.OpDelete(key, opts...)))
68 if err != nil {
69 return nil, err
70 }
71 del := r.Del()
72 kv.unprefixDeleteResponse(del)
73 return del, nil
74 }
75
76 func (kv *kvPrefix) Do(ctx context.Context, op clientv3.Op) (clientv3.OpResponse, error) {
77 if len(op.KeyBytes()) == 0 && !op.IsTxn() {
78 return clientv3.OpResponse{}, rpctypes.ErrEmptyKey
79 }
80 r, err := kv.KV.Do(ctx, kv.prefixOp(op))
81 if err != nil {
82 return r, err
83 }
84 switch {
85 case r.Get() != nil:
86 kv.unprefixGetResponse(r.Get())
87 case r.Put() != nil:
88 kv.unprefixPutResponse(r.Put())
89 case r.Del() != nil:
90 kv.unprefixDeleteResponse(r.Del())
91 case r.Txn() != nil:
92 kv.unprefixTxnResponse(r.Txn())
93 }
94 return r, nil
95 }
96
97 type txnPrefix struct {
98 clientv3.Txn
99 kv *kvPrefix
100 }
101
102 func (kv *kvPrefix) Txn(ctx context.Context) clientv3.Txn {
103 return &txnPrefix{kv.KV.Txn(ctx), kv}
104 }
105
106 func (txn *txnPrefix) If(cs ...clientv3.Cmp) clientv3.Txn {
107 txn.Txn = txn.Txn.If(txn.kv.prefixCmps(cs)...)
108 return txn
109 }
110
111 func (txn *txnPrefix) Then(ops ...clientv3.Op) clientv3.Txn {
112 txn.Txn = txn.Txn.Then(txn.kv.prefixOps(ops)...)
113 return txn
114 }
115
116 func (txn *txnPrefix) Else(ops ...clientv3.Op) clientv3.Txn {
117 txn.Txn = txn.Txn.Else(txn.kv.prefixOps(ops)...)
118 return txn
119 }
120
121 func (txn *txnPrefix) Commit() (*clientv3.TxnResponse, error) {
122 resp, err := txn.Txn.Commit()
123 if err != nil {
124 return nil, err
125 }
126 txn.kv.unprefixTxnResponse(resp)
127 return resp, nil
128 }
129
130 func (kv *kvPrefix) prefixOp(op clientv3.Op) clientv3.Op {
131 if !op.IsTxn() {
132 begin, end := kv.prefixInterval(op.KeyBytes(), op.RangeBytes())
133 op.WithKeyBytes(begin)
134 op.WithRangeBytes(end)
135 return op
136 }
137 cmps, thenOps, elseOps := op.Txn()
138 return clientv3.OpTxn(kv.prefixCmps(cmps), kv.prefixOps(thenOps), kv.prefixOps(elseOps))
139 }
140
141 func (kv *kvPrefix) unprefixGetResponse(resp *clientv3.GetResponse) {
142 for i := range resp.Kvs {
143 resp.Kvs[i].Key = resp.Kvs[i].Key[len(kv.pfx):]
144 }
145 }
146
147 func (kv *kvPrefix) unprefixPutResponse(resp *clientv3.PutResponse) {
148 if resp.PrevKv != nil {
149 resp.PrevKv.Key = resp.PrevKv.Key[len(kv.pfx):]
150 }
151 }
152
153 func (kv *kvPrefix) unprefixDeleteResponse(resp *clientv3.DeleteResponse) {
154 for i := range resp.PrevKvs {
155 resp.PrevKvs[i].Key = resp.PrevKvs[i].Key[len(kv.pfx):]
156 }
157 }
158
159 func (kv *kvPrefix) unprefixTxnResponse(resp *clientv3.TxnResponse) {
160 for _, r := range resp.Responses {
161 switch tv := r.Response.(type) {
162 case *pb.ResponseOp_ResponseRange:
163 if tv.ResponseRange != nil {
164 kv.unprefixGetResponse((*clientv3.GetResponse)(tv.ResponseRange))
165 }
166 case *pb.ResponseOp_ResponsePut:
167 if tv.ResponsePut != nil {
168 kv.unprefixPutResponse((*clientv3.PutResponse)(tv.ResponsePut))
169 }
170 case *pb.ResponseOp_ResponseDeleteRange:
171 if tv.ResponseDeleteRange != nil {
172 kv.unprefixDeleteResponse((*clientv3.DeleteResponse)(tv.ResponseDeleteRange))
173 }
174 case *pb.ResponseOp_ResponseTxn:
175 if tv.ResponseTxn != nil {
176 kv.unprefixTxnResponse((*clientv3.TxnResponse)(tv.ResponseTxn))
177 }
178 default:
179 }
180 }
181 }
182
183 func (kv *kvPrefix) prefixInterval(key, end []byte) (pfxKey []byte, pfxEnd []byte) {
184 return prefixInterval(kv.pfx, key, end)
185 }
186
187 func (kv *kvPrefix) prefixCmps(cs []clientv3.Cmp) []clientv3.Cmp {
188 newCmps := make([]clientv3.Cmp, len(cs))
189 for i := range cs {
190 newCmps[i] = cs[i]
191 pfxKey, endKey := kv.prefixInterval(cs[i].KeyBytes(), cs[i].RangeEnd)
192 newCmps[i].WithKeyBytes(pfxKey)
193 if len(cs[i].RangeEnd) != 0 {
194 newCmps[i].RangeEnd = endKey
195 }
196 }
197 return newCmps
198 }
199
200 func (kv *kvPrefix) prefixOps(ops []clientv3.Op) []clientv3.Op {
201 newOps := make([]clientv3.Op, len(ops))
202 for i := range ops {
203 newOps[i] = kv.prefixOp(ops[i])
204 }
205 return newOps
206 }
207
View as plain text