1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package cindex
16
17 import (
18 "encoding/binary"
19 "sync"
20 "sync/atomic"
21
22 "go.etcd.io/etcd/server/v3/mvcc/backend"
23 "go.etcd.io/etcd/server/v3/mvcc/buckets"
24 )
25
26 type Backend interface {
27 BatchTx() backend.BatchTx
28 ReadTx() backend.ReadTx
29 }
30
31
32 type ConsistentIndexer interface {
33
34
35 ConsistentIndex() uint64
36
37
38 ConsistentApplyingIndex() (uint64, uint64)
39
40
41 UnsafeConsistentIndex() uint64
42
43
44 SetConsistentIndex(v uint64, term uint64)
45
46
47 SetConsistentApplyingIndex(v uint64, term uint64)
48
49
50
51 UnsafeSave(tx backend.BatchTx)
52
53
54 SetBackend(be Backend)
55 }
56
57
// consistentIndex implements ConsistentIndexer by caching the index and term
// in memory and loading them from be when the cached index is zero.
type consistentIndex struct {
	// consistentIndex is the cached index, read and written with sync/atomic.
	// A value of 0 is treated by ConsistentIndex and UnsafeConsistentIndex as
	// "not cached" and triggers a read from the backend.
	// NOTE(review): this is the first field, presumably so the 64-bit atomic
	// operations stay aligned on 32-bit platforms — confirm before reordering.
	consistentIndex uint64

	// term is the term stored alongside consistentIndex; it is also accessed
	// with sync/atomic.
	term uint64

	// applyingIndex and applyingTerm are the pair exposed through
	// ConsistentApplyingIndex and SetConsistentApplyingIndex, accessed with
	// sync/atomic. UnsafeSave does not write them.
	applyingIndex uint64
	applyingTerm  uint64

	// be is the backend the index is loaded from.
	be Backend
	// mutex is held by ConsistentIndex while it loads from be and by
	// SetBackend while it replaces be.
	mutex sync.Mutex
}
79
80
81
82 func NewConsistentIndex(be Backend) ConsistentIndexer {
83 return &consistentIndex{be: be}
84 }
85
86 func (ci *consistentIndex) ConsistentIndex() uint64 {
87 if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
88 return index
89 }
90 ci.mutex.Lock()
91 defer ci.mutex.Unlock()
92
93 v, term := ReadConsistentIndex(ci.be.ReadTx())
94 ci.SetConsistentIndex(v, term)
95 return v
96 }
97
98 func (ci *consistentIndex) UnsafeConsistentIndex() uint64 {
99 if index := atomic.LoadUint64(&ci.consistentIndex); index > 0 {
100 return index
101 }
102
103 v, term := unsafeReadConsistentIndex(ci.be.ReadTx())
104 ci.SetConsistentIndex(v, term)
105 return v
106 }
107
108 func (ci *consistentIndex) SetConsistentIndex(v uint64, term uint64) {
109 atomic.StoreUint64(&ci.consistentIndex, v)
110 atomic.StoreUint64(&ci.term, term)
111 }
112
113 func (ci *consistentIndex) UnsafeSave(tx backend.BatchTx) {
114 index := atomic.LoadUint64(&ci.consistentIndex)
115 term := atomic.LoadUint64(&ci.term)
116 UnsafeUpdateConsistentIndex(tx, index, term)
117 }
118
119 func (ci *consistentIndex) SetBackend(be Backend) {
120 ci.mutex.Lock()
121 defer ci.mutex.Unlock()
122 ci.be = be
123
124 ci.SetConsistentIndex(0, 0)
125 }
126
127 func (ci *consistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
128 return atomic.LoadUint64(&ci.applyingIndex), atomic.LoadUint64(&ci.applyingTerm)
129 }
130
131 func (ci *consistentIndex) SetConsistentApplyingIndex(v uint64, term uint64) {
132 atomic.StoreUint64(&ci.applyingIndex, v)
133 atomic.StoreUint64(&ci.applyingTerm, term)
134 }
135
136 func NewFakeConsistentIndex(index uint64) ConsistentIndexer {
137 return &fakeConsistentIndex{index: index}
138 }
139
// fakeConsistentIndex is an in-memory ConsistentIndexer with no backend.
type fakeConsistentIndex struct {
	// index and term are accessed with sync/atomic. The same pair backs both
	// the consistent-index and the applying-index accessors.
	index, term uint64
}
144
145 func (f *fakeConsistentIndex) ConsistentIndex() uint64 {
146 return atomic.LoadUint64(&f.index)
147 }
148 func (f *fakeConsistentIndex) ConsistentApplyingIndex() (uint64, uint64) {
149 return atomic.LoadUint64(&f.index), atomic.LoadUint64(&f.term)
150 }
151 func (f *fakeConsistentIndex) UnsafeConsistentIndex() uint64 {
152 return atomic.LoadUint64(&f.index)
153 }
154
155 func (f *fakeConsistentIndex) SetConsistentIndex(index uint64, term uint64) {
156 atomic.StoreUint64(&f.index, index)
157 atomic.StoreUint64(&f.term, term)
158 }
159 func (f *fakeConsistentIndex) SetConsistentApplyingIndex(index uint64, term uint64) {
160 atomic.StoreUint64(&f.index, index)
161 atomic.StoreUint64(&f.term, term)
162 }
163
164 func (f *fakeConsistentIndex) UnsafeSave(_ backend.BatchTx) {}
165 func (f *fakeConsistentIndex) SetBackend(_ Backend) {}
166
167
168 func UnsafeCreateMetaBucket(tx backend.BatchTx) {
169 tx.UnsafeCreateBucket(buckets.Meta)
170 }
171
172
173 func CreateMetaBucket(tx backend.BatchTx) {
174 tx.LockOutsideApply()
175 defer tx.Unlock()
176 tx.UnsafeCreateBucket(buckets.Meta)
177 }
178
179
180
181
182 func unsafeReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
183 _, vs := tx.UnsafeRange(buckets.Meta, buckets.MetaConsistentIndexKeyName, nil, 0)
184 if len(vs) == 0 {
185 return 0, 0
186 }
187 v := binary.BigEndian.Uint64(vs[0])
188 _, ts := tx.UnsafeRange(buckets.Meta, buckets.MetaTermKeyName, nil, 0)
189 if len(ts) == 0 {
190 return v, 0
191 }
192 t := binary.BigEndian.Uint64(ts[0])
193 return v, t
194 }
195
196
197
198 func ReadConsistentIndex(tx backend.ReadTx) (uint64, uint64) {
199 tx.Lock()
200 defer tx.Unlock()
201 return unsafeReadConsistentIndex(tx)
202 }
203
204 func UnsafeUpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
205 if index == 0 {
206
207 return
208 }
209
210 bs1 := make([]byte, 8)
211 binary.BigEndian.PutUint64(bs1, index)
212
213
214 tx.UnsafePut(buckets.Meta, buckets.MetaConsistentIndexKeyName, bs1)
215 if term > 0 {
216 bs2 := make([]byte, 8)
217 binary.BigEndian.PutUint64(bs2, term)
218 tx.UnsafePut(buckets.Meta, buckets.MetaTermKeyName, bs2)
219 }
220 }
221
222 func UpdateConsistentIndex(tx backend.BatchTx, index uint64, term uint64) {
223 tx.LockOutsideApply()
224 defer tx.Unlock()
225 UnsafeUpdateConsistentIndex(tx, index, term)
226 }
227
View as plain text