1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "encoding/binary"
11 "fmt"
12 "io"
13
14 "github.com/syndtr/goleveldb/leveldb/errors"
15 "github.com/syndtr/goleveldb/leveldb/memdb"
16 "github.com/syndtr/goleveldb/leveldb/storage"
17 )
18
19
20
21 type ErrBatchCorrupted struct {
22 Reason string
23 }
24
25 func (e *ErrBatchCorrupted) Error() string {
26 return fmt.Sprintf("leveldb: batch corrupted: %s", e.Reason)
27 }
28
29 func newErrBatchCorrupted(reason string) error {
30 return errors.NewErrCorrupted(storage.FileDesc{}, &ErrBatchCorrupted{reason})
31 }
32
33 const (
34 batchHeaderLen = 8 + 4
35 batchGrowLimit = 3000
36 )
37
38
39 type BatchReplay interface {
40 Put(key, value []byte)
41 Delete(key []byte)
42 }
43
44 type batchIndex struct {
45 keyType keyType
46 keyPos, keyLen int
47 valuePos, valueLen int
48 }
49
50 func (index batchIndex) k(data []byte) []byte {
51 return data[index.keyPos : index.keyPos+index.keyLen]
52 }
53
54 func (index batchIndex) v(data []byte) []byte {
55 if index.valueLen != 0 {
56 return data[index.valuePos : index.valuePos+index.valueLen]
57 }
58 return nil
59 }
60
61
62 type Batch struct {
63 data []byte
64 index []batchIndex
65
66
67 internalLen int
68
69
70
71
72
73 growLimit int
74 }
75
76 func (b *Batch) grow(n int) {
77 o := len(b.data)
78 if cap(b.data)-o < n {
79 limit := batchGrowLimit
80 if b.growLimit > 0 {
81 limit = b.growLimit
82 }
83 div := 1
84 if len(b.index) > limit {
85 div = len(b.index) / limit
86 }
87 ndata := make([]byte, o, o+n+o/div)
88 copy(ndata, b.data)
89 b.data = ndata
90 }
91 }
92
93 func (b *Batch) appendRec(kt keyType, key, value []byte) {
94 n := 1 + binary.MaxVarintLen32 + len(key)
95 if kt == keyTypeVal {
96 n += binary.MaxVarintLen32 + len(value)
97 }
98 b.grow(n)
99 index := batchIndex{keyType: kt}
100 o := len(b.data)
101 data := b.data[:o+n]
102 data[o] = byte(kt)
103 o++
104 o += binary.PutUvarint(data[o:], uint64(len(key)))
105 index.keyPos = o
106 index.keyLen = len(key)
107 o += copy(data[o:], key)
108 if kt == keyTypeVal {
109 o += binary.PutUvarint(data[o:], uint64(len(value)))
110 index.valuePos = o
111 index.valueLen = len(value)
112 o += copy(data[o:], value)
113 }
114 b.data = data[:o]
115 b.index = append(b.index, index)
116 b.internalLen += index.keyLen + index.valueLen + 8
117 }
118
119
120
121
122 func (b *Batch) Put(key, value []byte) {
123 b.appendRec(keyTypeVal, key, value)
124 }
125
126
127
128
129 func (b *Batch) Delete(key []byte) {
130 b.appendRec(keyTypeDel, key, nil)
131 }
132
133
134
135
136
137 func (b *Batch) Dump() []byte {
138 return b.data
139 }
140
141
142
143
144
145 func (b *Batch) Load(data []byte) error {
146 return b.decode(data, -1)
147 }
148
149
150 func (b *Batch) Replay(r BatchReplay) error {
151 for _, index := range b.index {
152 switch index.keyType {
153 case keyTypeVal:
154 r.Put(index.k(b.data), index.v(b.data))
155 case keyTypeDel:
156 r.Delete(index.k(b.data))
157 }
158 }
159 return nil
160 }
161
162
163 func (b *Batch) Len() int {
164 return len(b.index)
165 }
166
167
168 func (b *Batch) Reset() {
169 b.data = b.data[:0]
170 b.index = b.index[:0]
171 b.internalLen = 0
172 }
173
174 func (b *Batch) replayInternal(fn func(i int, kt keyType, k, v []byte) error) error {
175 for i, index := range b.index {
176 if err := fn(i, index.keyType, index.k(b.data), index.v(b.data)); err != nil {
177 return err
178 }
179 }
180 return nil
181 }
182
183 func (b *Batch) append(p *Batch) {
184 ob := len(b.data)
185 oi := len(b.index)
186 b.data = append(b.data, p.data...)
187 b.index = append(b.index, p.index...)
188 b.internalLen += p.internalLen
189
190
191 if ob != 0 {
192 for ; oi < len(b.index); oi++ {
193 index := &b.index[oi]
194 index.keyPos += ob
195 if index.valueLen != 0 {
196 index.valuePos += ob
197 }
198 }
199 }
200 }
201
202 func (b *Batch) decode(data []byte, expectedLen int) error {
203 b.data = data
204 b.index = b.index[:0]
205 b.internalLen = 0
206 err := decodeBatch(data, func(i int, index batchIndex) error {
207 b.index = append(b.index, index)
208 b.internalLen += index.keyLen + index.valueLen + 8
209 return nil
210 })
211 if err != nil {
212 return err
213 }
214 if expectedLen >= 0 && len(b.index) != expectedLen {
215 return newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", expectedLen, len(b.index)))
216 }
217 return nil
218 }
219
220 func (b *Batch) putMem(seq uint64, mdb *memdb.DB) error {
221 var ik []byte
222 for i, index := range b.index {
223 ik = makeInternalKey(ik, index.k(b.data), seq+uint64(i), index.keyType)
224 if err := mdb.Put(ik, index.v(b.data)); err != nil {
225 return err
226 }
227 }
228 return nil
229 }
230
231 func newBatch() interface{} {
232 return &Batch{}
233 }
234
235
236 func MakeBatch(n int) *Batch {
237 return &Batch{data: make([]byte, 0, n)}
238 }
239
240
241 type BatchConfig struct {
242
243
244
245 InitialCapacity int
246
247
248
249
250
251
252
253
254
255
256
257
258
259 GrowLimit int
260 }
261
262
263 func MakeBatchWithConfig(config *BatchConfig) *Batch {
264 var batch = new(Batch)
265 if config != nil {
266 if config.InitialCapacity > 0 {
267 batch.data = make([]byte, 0, config.InitialCapacity)
268 }
269 if config.GrowLimit > 0 {
270 batch.growLimit = config.GrowLimit
271 }
272 }
273 return batch
274 }
275
276 func decodeBatch(data []byte, fn func(i int, index batchIndex) error) error {
277 var index batchIndex
278 for i, o := 0, 0; o < len(data); i++ {
279
280 index.keyType = keyType(data[o])
281 if index.keyType > keyTypeVal {
282 return newErrBatchCorrupted(fmt.Sprintf("bad record: invalid type %#x", uint(index.keyType)))
283 }
284 o++
285
286
287 x, n := binary.Uvarint(data[o:])
288 o += n
289 if n <= 0 || o+int(x) > len(data) {
290 return newErrBatchCorrupted("bad record: invalid key length")
291 }
292 index.keyPos = o
293 index.keyLen = int(x)
294 o += index.keyLen
295
296
297 if index.keyType == keyTypeVal {
298 x, n = binary.Uvarint(data[o:])
299 o += n
300 if n <= 0 || o+int(x) > len(data) {
301 return newErrBatchCorrupted("bad record: invalid value length")
302 }
303 index.valuePos = o
304 index.valueLen = int(x)
305 o += index.valueLen
306 } else {
307 index.valuePos = 0
308 index.valueLen = 0
309 }
310
311 if err := fn(i, index); err != nil {
312 return err
313 }
314 }
315 return nil
316 }
317
318 func decodeBatchToMem(data []byte, expectSeq uint64, mdb *memdb.DB) (seq uint64, batchLen int, err error) {
319 seq, batchLen, err = decodeBatchHeader(data)
320 if err != nil {
321 return 0, 0, err
322 }
323 if seq < expectSeq {
324 return 0, 0, newErrBatchCorrupted("invalid sequence number")
325 }
326 data = data[batchHeaderLen:]
327 var ik []byte
328 var decodedLen int
329 err = decodeBatch(data, func(i int, index batchIndex) error {
330 if i >= batchLen {
331 return newErrBatchCorrupted("invalid records length")
332 }
333 ik = makeInternalKey(ik, index.k(data), seq+uint64(i), index.keyType)
334 if err := mdb.Put(ik, index.v(data)); err != nil {
335 return err
336 }
337 decodedLen++
338 return nil
339 })
340 if err == nil && decodedLen != batchLen {
341 err = newErrBatchCorrupted(fmt.Sprintf("invalid records length: %d vs %d", batchLen, decodedLen))
342 }
343 return
344 }
345
346 func encodeBatchHeader(dst []byte, seq uint64, batchLen int) []byte {
347 dst = ensureBuffer(dst, batchHeaderLen)
348 binary.LittleEndian.PutUint64(dst, seq)
349 binary.LittleEndian.PutUint32(dst[8:], uint32(batchLen))
350 return dst
351 }
352
353 func decodeBatchHeader(data []byte) (seq uint64, batchLen int, err error) {
354 if len(data) < batchHeaderLen {
355 return 0, 0, newErrBatchCorrupted("too short")
356 }
357
358 seq = binary.LittleEndian.Uint64(data)
359 batchLen = int(binary.LittleEndian.Uint32(data[8:]))
360 if batchLen < 0 {
361 return 0, 0, newErrBatchCorrupted("invalid records length")
362 }
363 return
364 }
365
366 func batchesLen(batches []*Batch) int {
367 batchLen := 0
368 for _, batch := range batches {
369 batchLen += batch.Len()
370 }
371 return batchLen
372 }
373
374 func writeBatchesWithHeader(wr io.Writer, batches []*Batch, seq uint64) error {
375 if _, err := wr.Write(encodeBatchHeader(nil, seq, batchesLen(batches))); err != nil {
376 return err
377 }
378 for _, batch := range batches {
379 if _, err := wr.Write(batch.data); err != nil {
380 return err
381 }
382 }
383 return nil
384 }
385
View as plain text