1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78 package journal
79
80 import (
81 "encoding/binary"
82 "fmt"
83 "io"
84
85 "github.com/syndtr/goleveldb/leveldb/errors"
86 "github.com/syndtr/goleveldb/leveldb/storage"
87 "github.com/syndtr/goleveldb/leveldb/util"
88 )
89
90
// Chunk type tags. One of these is stored in byte 6 of every chunk header
// and tells the reader where the chunk sits within a journal.
const (
	// fullChunkType marks a chunk that is both the first and the last chunk
	// of a journal.
	fullChunkType = iota + 1
	// firstChunkType marks the first chunk of a journal that continues in
	// later chunks.
	firstChunkType
	// middleChunkType marks a chunk that is neither the first nor the last.
	middleChunkType
	// lastChunkType marks the final chunk of a journal that started in an
	// earlier chunk.
	lastChunkType
)
97
const (
	// blockSize is the size in bytes of one journal block; it is also the
	// size of the buffers embedded in Reader and Writer.
	blockSize = 32 << 10
	// headerSize is the size in bytes of a chunk header: a 4-byte checksum,
	// a 2-byte payload length and a 1-byte chunk type.
	headerSize = 4 + 2 + 1
)
102
// flusher is implemented by destinations that can push buffered data
// further down. A Writer checks whether its underlying io.Writer also
// satisfies this interface and, if so, calls Flush from Writer.Flush.
type flusher interface {
	// Flush reports any error encountered while flushing.
	Flush() error
}
106
107
// ErrCorrupted describes a corrupted journal block or chunk.
type ErrCorrupted struct {
	// Size is the number of bytes affected by the corruption.
	Size int
	// Reason is a short human-readable description of what was wrong.
	Reason string
}

// Error implements the error interface, reporting the reason followed by
// the affected byte count.
func (e *ErrCorrupted) Error() string {
	const format = "leveldb/journal: block/chunk corrupted: %s (%d bytes)"
	return fmt.Sprintf(format, e.Reason, e.Size)
}
116
117
118
// Dropper receives a notification each time a Reader discards data.
// The Reader calls Drop with an *ErrCorrupted describing how many bytes
// were dropped and why.
type Dropper interface {
	// Drop is invoked with the error describing the dropped data.
	Drop(err error)
}
122
123
124 type Reader struct {
125
126 r io.Reader
127
128 dropper Dropper
129
130 strict bool
131
132 checksum bool
133
134 seq int
135
136
137 i, j int
138
139
140 n int
141
142 last bool
143
144 err error
145
146 buf [blockSize]byte
147 }
148
149
150
151
152 func NewReader(r io.Reader, dropper Dropper, strict, checksum bool) *Reader {
153 return &Reader{
154 r: r,
155 dropper: dropper,
156 strict: strict,
157 checksum: checksum,
158 last: true,
159 }
160 }
161
// errSkip is the internal sentinel returned by Reader.corrupt when the
// corrupted region is skipped rather than reported as a hard failure.
// Reader.Next retries on it; the journal readers translate it into
// io.ErrUnexpectedEOF.
var (
	errSkip = errors.New("leveldb/journal: skipped")
)
163
164 func (r *Reader) corrupt(n int, reason string, skip bool) error {
165 if r.dropper != nil {
166 r.dropper.Drop(&ErrCorrupted{n, reason})
167 }
168 if r.strict && !skip {
169 r.err = errors.NewErrCorrupted(storage.FileDesc{}, &ErrCorrupted{n, reason})
170 return r.err
171 }
172 return errSkip
173 }
174
175
176
177 func (r *Reader) nextChunk(first bool) error {
178 for {
179 if r.j+headerSize <= r.n {
180 checksum := binary.LittleEndian.Uint32(r.buf[r.j+0 : r.j+4])
181 length := binary.LittleEndian.Uint16(r.buf[r.j+4 : r.j+6])
182 chunkType := r.buf[r.j+6]
183 unprocBlock := r.n - r.j
184 if checksum == 0 && length == 0 && chunkType == 0 {
185
186 r.i = r.n
187 r.j = r.n
188 return r.corrupt(unprocBlock, "zero header", false)
189 }
190 if chunkType < fullChunkType || chunkType > lastChunkType {
191
192 r.i = r.n
193 r.j = r.n
194 return r.corrupt(unprocBlock, fmt.Sprintf("invalid chunk type %#x", chunkType), false)
195 }
196 r.i = r.j + headerSize
197 r.j = r.j + headerSize + int(length)
198 if r.j > r.n {
199
200 r.i = r.n
201 r.j = r.n
202 return r.corrupt(unprocBlock, "chunk length overflows block", false)
203 } else if r.checksum && checksum != util.NewCRC(r.buf[r.i-1:r.j]).Value() {
204
205 r.i = r.n
206 r.j = r.n
207 return r.corrupt(unprocBlock, "checksum mismatch", false)
208 }
209 if first && chunkType != fullChunkType && chunkType != firstChunkType {
210 chunkLength := (r.j - r.i) + headerSize
211 r.i = r.j
212
213 return r.corrupt(chunkLength, "orphan chunk", true)
214 }
215 r.last = chunkType == fullChunkType || chunkType == lastChunkType
216 return nil
217 }
218
219
220 if r.n < blockSize && r.n > 0 {
221 if !first {
222 return r.corrupt(0, "missing chunk part", false)
223 }
224 r.err = io.EOF
225 return r.err
226 }
227
228
229 n, err := io.ReadFull(r.r, r.buf[:])
230 if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF {
231 return err
232 }
233 if n == 0 {
234 if !first {
235 return r.corrupt(0, "missing chunk part", false)
236 }
237 r.err = io.EOF
238 return r.err
239 }
240 r.i, r.j, r.n = 0, 0, n
241 }
242 }
243
244
245
246
247
248 func (r *Reader) Next() (io.Reader, error) {
249 r.seq++
250 if r.err != nil {
251 return nil, r.err
252 }
253 r.i = r.j
254 for {
255 if err := r.nextChunk(true); err == nil {
256 break
257 } else if err != errSkip {
258 return nil, err
259 }
260 }
261 return &singleReader{r, r.seq, nil}, nil
262 }
263
264
265
266 func (r *Reader) Reset(reader io.Reader, dropper Dropper, strict, checksum bool) error {
267 r.seq++
268 err := r.err
269 r.r = reader
270 r.dropper = dropper
271 r.strict = strict
272 r.checksum = checksum
273 r.i = 0
274 r.j = 0
275 r.n = 0
276 r.last = true
277 r.err = nil
278 return err
279 }
280
281 type singleReader struct {
282 r *Reader
283 seq int
284 err error
285 }
286
287 func (x *singleReader) Read(p []byte) (int, error) {
288 r := x.r
289 if r.seq != x.seq {
290 return 0, errors.New("leveldb/journal: stale reader")
291 }
292 if x.err != nil {
293 return 0, x.err
294 }
295 if r.err != nil {
296 return 0, r.err
297 }
298 for r.i == r.j {
299 if r.last {
300 return 0, io.EOF
301 }
302 x.err = r.nextChunk(false)
303 if x.err != nil {
304 if x.err == errSkip {
305 x.err = io.ErrUnexpectedEOF
306 }
307 return 0, x.err
308 }
309 }
310 n := copy(p, r.buf[r.i:r.j])
311 r.i += n
312 return n, nil
313 }
314
315 func (x *singleReader) ReadByte() (byte, error) {
316 r := x.r
317 if r.seq != x.seq {
318 return 0, errors.New("leveldb/journal: stale reader")
319 }
320 if x.err != nil {
321 return 0, x.err
322 }
323 if r.err != nil {
324 return 0, r.err
325 }
326 for r.i == r.j {
327 if r.last {
328 return 0, io.EOF
329 }
330 x.err = r.nextChunk(false)
331 if x.err != nil {
332 if x.err == errSkip {
333 x.err = io.ErrUnexpectedEOF
334 }
335 return 0, x.err
336 }
337 }
338 c := r.buf[r.i]
339 r.i++
340 return c, nil
341 }
342
343
344 type Writer struct {
345
346 w io.Writer
347
348 seq int
349
350 f flusher
351
352
353 i, j int
354
355
356 written int
357
358 blockNumber int64
359
360 first bool
361
362 pending bool
363
364 err error
365
366 buf [blockSize]byte
367 }
368
369
370 func NewWriter(w io.Writer) *Writer {
371 f, _ := w.(flusher)
372 return &Writer{
373 w: w,
374 f: f,
375 }
376 }
377
378
379 func (w *Writer) fillHeader(last bool) {
380 if w.i+headerSize > w.j || w.j > blockSize {
381 panic("leveldb/journal: bad writer state")
382 }
383 if last {
384 if w.first {
385 w.buf[w.i+6] = fullChunkType
386 } else {
387 w.buf[w.i+6] = lastChunkType
388 }
389 } else {
390 if w.first {
391 w.buf[w.i+6] = firstChunkType
392 } else {
393 w.buf[w.i+6] = middleChunkType
394 }
395 }
396 binary.LittleEndian.PutUint32(w.buf[w.i+0:w.i+4], util.NewCRC(w.buf[w.i+6:w.j]).Value())
397 binary.LittleEndian.PutUint16(w.buf[w.i+4:w.i+6], uint16(w.j-w.i-headerSize))
398 }
399
400
401
402 func (w *Writer) writeBlock() {
403 _, w.err = w.w.Write(w.buf[w.written:])
404 w.i = 0
405 w.j = headerSize
406 w.written = 0
407 w.blockNumber++
408 }
409
410
411
412 func (w *Writer) writePending() {
413 if w.err != nil {
414 return
415 }
416 if w.pending {
417 w.fillHeader(true)
418 w.pending = false
419 }
420 _, w.err = w.w.Write(w.buf[w.written:w.j])
421 w.written = w.j
422 }
423
424
425 func (w *Writer) Close() error {
426 w.seq++
427 w.writePending()
428 if w.err != nil {
429 return w.err
430 }
431 w.err = errors.New("leveldb/journal: closed Writer")
432 return nil
433 }
434
435
436
437 func (w *Writer) Flush() error {
438 w.seq++
439 w.writePending()
440 if w.err != nil {
441 return w.err
442 }
443 if w.f != nil {
444 w.err = w.f.Flush()
445 return w.err
446 }
447 return nil
448 }
449
450
451
452 func (w *Writer) Reset(writer io.Writer) (err error) {
453 w.seq++
454 if w.err == nil {
455 w.writePending()
456 err = w.err
457 }
458 w.w = writer
459 w.f, _ = writer.(flusher)
460 w.i = 0
461 w.j = 0
462 w.written = 0
463 w.blockNumber = 0
464 w.first = false
465 w.pending = false
466 w.err = nil
467 return
468 }
469
470
471
472 func (w *Writer) Next() (io.Writer, error) {
473 w.seq++
474 if w.err != nil {
475 return nil, w.err
476 }
477 if w.pending {
478 w.fillHeader(true)
479 }
480 w.i = w.j
481 w.j += headerSize
482
483 if w.j > blockSize {
484
485 for k := w.i; k < blockSize; k++ {
486 w.buf[k] = 0
487 }
488 w.writeBlock()
489 if w.err != nil {
490 return nil, w.err
491 }
492 }
493 w.first = true
494 w.pending = true
495 return singleWriter{w, w.seq}, nil
496 }
497
498
499 func (w *Writer) Size() int64 {
500 if w == nil {
501 return 0
502 }
503 return w.blockNumber*blockSize + int64(w.j)
504 }
505
506 type singleWriter struct {
507 w *Writer
508 seq int
509 }
510
511 func (x singleWriter) Write(p []byte) (int, error) {
512 w := x.w
513 if w.seq != x.seq {
514 return 0, errors.New("leveldb/journal: stale writer")
515 }
516 if w.err != nil {
517 return 0, w.err
518 }
519 n0 := len(p)
520 for len(p) > 0 {
521
522 if w.j == blockSize {
523 w.fillHeader(false)
524 w.writeBlock()
525 if w.err != nil {
526 return 0, w.err
527 }
528 w.first = false
529 }
530
531 n := copy(w.buf[w.j:], p)
532 w.j += n
533 p = p[n:]
534 }
535 return n0, nil
536 }
537
View as plain text