1
2
3
4
5
6
7 package table
8
9 import (
10 "encoding/binary"
11 "errors"
12 "fmt"
13 "io"
14
15 "github.com/golang/snappy"
16
17 "github.com/syndtr/goleveldb/leveldb/comparer"
18 "github.com/syndtr/goleveldb/leveldb/filter"
19 "github.com/syndtr/goleveldb/leveldb/opt"
20 "github.com/syndtr/goleveldb/leveldb/util"
21 )
22
// sharedPrefixLen returns the number of leading bytes that a and b have in
// common. It is 0 when either slice is empty.
func sharedPrefixLen(a, b []byte) int {
	// Make a the shorter slice so one range loop covers the whole overlap.
	if len(b) < len(a) {
		a, b = b, a
	}
	for idx, c := range a {
		if b[idx] != c {
			return idx
		}
	}
	return len(a)
}
33
34 type blockWriter struct {
35 restartInterval int
36 buf util.Buffer
37 nEntries int
38 prevKey []byte
39 restarts []uint32
40 scratch []byte
41 }
42
43 func (w *blockWriter) append(key, value []byte) (err error) {
44 nShared := 0
45 if w.nEntries%w.restartInterval == 0 {
46 w.restarts = append(w.restarts, uint32(w.buf.Len()))
47 } else {
48 nShared = sharedPrefixLen(w.prevKey, key)
49 }
50 n := binary.PutUvarint(w.scratch[0:], uint64(nShared))
51 n += binary.PutUvarint(w.scratch[n:], uint64(len(key)-nShared))
52 n += binary.PutUvarint(w.scratch[n:], uint64(len(value)))
53 if _, err = w.buf.Write(w.scratch[:n]); err != nil {
54 return err
55 }
56 if _, err = w.buf.Write(key[nShared:]); err != nil {
57 return err
58 }
59 if _, err = w.buf.Write(value); err != nil {
60 return err
61 }
62 w.prevKey = append(w.prevKey[:0], key...)
63 w.nEntries++
64 return nil
65 }
66
67 func (w *blockWriter) finish() error {
68
69 if w.nEntries == 0 {
70
71 w.restarts = append(w.restarts, 0)
72 }
73 w.restarts = append(w.restarts, uint32(len(w.restarts)))
74 for _, x := range w.restarts {
75 buf4 := w.buf.Alloc(4)
76 binary.LittleEndian.PutUint32(buf4, x)
77 }
78 return nil
79 }
80
81 func (w *blockWriter) reset() {
82 w.buf.Reset()
83 w.nEntries = 0
84 w.restarts = w.restarts[:0]
85 }
86
87 func (w *blockWriter) bytesLen() int {
88 restartsLen := len(w.restarts)
89 if restartsLen == 0 {
90 restartsLen = 1
91 }
92 return w.buf.Len() + 4*restartsLen + 4
93 }
94
95 type filterWriter struct {
96 generator filter.FilterGenerator
97 buf util.Buffer
98 nKeys int
99 offsets []uint32
100 baseLg uint
101 }
102
103 func (w *filterWriter) add(key []byte) {
104 if w.generator == nil {
105 return
106 }
107 w.generator.Add(key)
108 w.nKeys++
109 }
110
111 func (w *filterWriter) flush(offset uint64) {
112 if w.generator == nil {
113 return
114 }
115 for x := int(offset / uint64(1<<w.baseLg)); x > len(w.offsets); {
116 w.generate()
117 }
118 }
119
120 func (w *filterWriter) finish() error {
121 if w.generator == nil {
122 return nil
123 }
124
125
126 if w.nKeys > 0 {
127 w.generate()
128 }
129 w.offsets = append(w.offsets, uint32(w.buf.Len()))
130 for _, x := range w.offsets {
131 buf4 := w.buf.Alloc(4)
132 binary.LittleEndian.PutUint32(buf4, x)
133 }
134 return w.buf.WriteByte(byte(w.baseLg))
135 }
136
137 func (w *filterWriter) generate() {
138
139 w.offsets = append(w.offsets, uint32(w.buf.Len()))
140
141 if w.nKeys > 0 {
142 w.generator.Generate(&w.buf)
143 w.nKeys = 0
144 }
145 }
146
147
148 type Writer struct {
149 writer io.Writer
150 err error
151
152 cmp comparer.Comparer
153 filter filter.Filter
154 compression opt.Compression
155 blockSize int
156
157 bpool *util.BufferPool
158 dataBlock blockWriter
159 indexBlock blockWriter
160 filterBlock filterWriter
161 pendingBH blockHandle
162 offset uint64
163 nEntries int
164
165
166
167 scratch [50]byte
168 comparerScratch []byte
169 compressionScratch []byte
170 }
171
172 func (w *Writer) writeBlock(buf *util.Buffer, compression opt.Compression) (bh blockHandle, err error) {
173
174 var b []byte
175 if compression == opt.SnappyCompression {
176
177 if n := snappy.MaxEncodedLen(buf.Len()) + blockTrailerLen; len(w.compressionScratch) < n {
178 w.compressionScratch = make([]byte, n)
179 }
180 compressed := snappy.Encode(w.compressionScratch, buf.Bytes())
181 n := len(compressed)
182 b = compressed[:n+blockTrailerLen]
183 b[n] = blockTypeSnappyCompression
184 } else {
185 tmp := buf.Alloc(blockTrailerLen)
186 tmp[0] = blockTypeNoCompression
187 b = buf.Bytes()
188 }
189
190
191 n := len(b) - 4
192 checksum := util.NewCRC(b[:n]).Value()
193 binary.LittleEndian.PutUint32(b[n:], checksum)
194
195
196 _, err = w.writer.Write(b)
197 if err != nil {
198 return
199 }
200 bh = blockHandle{w.offset, uint64(len(b) - blockTrailerLen)}
201 w.offset += uint64(len(b))
202 return
203 }
204
205 func (w *Writer) flushPendingBH(key []byte) error {
206 if w.pendingBH.length == 0 {
207 return nil
208 }
209 var separator []byte
210 if len(key) == 0 {
211 separator = w.cmp.Successor(w.comparerScratch[:0], w.dataBlock.prevKey)
212 } else {
213 separator = w.cmp.Separator(w.comparerScratch[:0], w.dataBlock.prevKey, key)
214 }
215 if separator == nil {
216 separator = w.dataBlock.prevKey
217 } else {
218 w.comparerScratch = separator
219 }
220 n := encodeBlockHandle(w.scratch[:20], w.pendingBH)
221
222 if err := w.indexBlock.append(separator, w.scratch[:n]); err != nil {
223 return err
224 }
225
226 w.dataBlock.prevKey = w.dataBlock.prevKey[:0]
227
228 w.pendingBH = blockHandle{}
229 return nil
230 }
231
232 func (w *Writer) finishBlock() error {
233 if err := w.dataBlock.finish(); err != nil {
234 return err
235 }
236 bh, err := w.writeBlock(&w.dataBlock.buf, w.compression)
237 if err != nil {
238 return err
239 }
240 w.pendingBH = bh
241
242 w.dataBlock.reset()
243
244 w.filterBlock.flush(w.offset)
245 return nil
246 }
247
248
249
250
251
252 func (w *Writer) Append(key, value []byte) error {
253 if w.err != nil {
254 return w.err
255 }
256 if w.nEntries > 0 && w.cmp.Compare(w.dataBlock.prevKey, key) >= 0 {
257 w.err = fmt.Errorf("leveldb/table: Writer: keys are not in increasing order: %q, %q", w.dataBlock.prevKey, key)
258 return w.err
259 }
260
261 if err := w.flushPendingBH(key); err != nil {
262 return err
263 }
264
265 if err := w.dataBlock.append(key, value); err != nil {
266 return err
267 }
268
269 w.filterBlock.add(key)
270
271
272 if w.dataBlock.bytesLen() >= w.blockSize {
273 if err := w.finishBlock(); err != nil {
274 w.err = err
275 return w.err
276 }
277 }
278 w.nEntries++
279 return nil
280 }
281
282
283 func (w *Writer) BlocksLen() int {
284 n := w.indexBlock.nEntries
285 if w.pendingBH.length > 0 {
286
287 n++
288 }
289 return n
290 }
291
292
293 func (w *Writer) EntriesLen() int {
294 return w.nEntries
295 }
296
297
298 func (w *Writer) BytesLen() int {
299 return int(w.offset)
300 }
301
302
303
304
305 func (w *Writer) Close() error {
306 defer func() {
307 if w.bpool != nil {
308
309
310
311 w.dataBlock.buf.Reset()
312 w.bpool.Put(w.dataBlock.buf.Bytes())
313 }
314 }()
315
316 if w.err != nil {
317 return w.err
318 }
319
320
321
322 if w.dataBlock.nEntries > 0 || w.nEntries == 0 {
323 if err := w.finishBlock(); err != nil {
324 w.err = err
325 return w.err
326 }
327 }
328 if err := w.flushPendingBH(nil); err != nil {
329 return err
330 }
331
332
333 var filterBH blockHandle
334 if err := w.filterBlock.finish(); err != nil {
335 return err
336 }
337 if buf := &w.filterBlock.buf; buf.Len() > 0 {
338 filterBH, w.err = w.writeBlock(buf, opt.NoCompression)
339 if w.err != nil {
340 return w.err
341 }
342 }
343
344
345 if filterBH.length > 0 {
346 key := []byte("filter." + w.filter.Name())
347 n := encodeBlockHandle(w.scratch[:20], filterBH)
348 if err := w.dataBlock.append(key, w.scratch[:n]); err != nil {
349 return err
350 }
351 }
352 if err := w.dataBlock.finish(); err != nil {
353 return err
354 }
355 metaindexBH, err := w.writeBlock(&w.dataBlock.buf, w.compression)
356 if err != nil {
357 w.err = err
358 return w.err
359 }
360
361
362 if err := w.indexBlock.finish(); err != nil {
363 return err
364 }
365 indexBH, err := w.writeBlock(&w.indexBlock.buf, w.compression)
366 if err != nil {
367 w.err = err
368 return w.err
369 }
370
371
372 footer := w.scratch[:footerLen]
373 for i := range footer {
374 footer[i] = 0
375 }
376 n := encodeBlockHandle(footer, metaindexBH)
377 encodeBlockHandle(footer[n:], indexBH)
378 copy(footer[footerLen-len(magic):], magic)
379 if _, err := w.writer.Write(footer); err != nil {
380 w.err = err
381 return w.err
382 }
383 w.offset += footerLen
384
385 w.err = errors.New("leveldb/table: writer is closed")
386 return nil
387 }
388
389
390
391
392 func NewWriter(f io.Writer, o *opt.Options, pool *util.BufferPool, size int) *Writer {
393 var bufBytes []byte
394 if pool == nil {
395 bufBytes = make([]byte, size)
396 } else {
397 bufBytes = pool.Get(size)
398 }
399 bufBytes = bufBytes[:0]
400
401 w := &Writer{
402 writer: f,
403 cmp: o.GetComparer(),
404 filter: o.GetFilter(),
405 compression: o.GetCompression(),
406 blockSize: o.GetBlockSize(),
407 comparerScratch: make([]byte, 0),
408 bpool: pool,
409 dataBlock: blockWriter{buf: *util.NewBuffer(bufBytes)},
410 }
411
412 w.dataBlock.restartInterval = o.GetBlockRestartInterval()
413
414 w.dataBlock.scratch = w.scratch[20:]
415
416 w.indexBlock.restartInterval = 1
417 w.indexBlock.scratch = w.scratch[20:]
418
419 if w.filter != nil {
420 w.filterBlock.generator = w.filter.NewGenerator()
421 w.filterBlock.baseLg = uint(o.GetFilterBaseLg())
422 w.filterBlock.flush(0)
423 }
424 return w
425 }
426
View as plain text