1
2
3
4
5
6 package s2
7
8 import (
9 "crypto/rand"
10 "encoding/binary"
11 "errors"
12 "fmt"
13 "io"
14 "runtime"
15 "sync"
16
17 "github.com/klauspost/compress/internal/race"
18 )
19
20 const (
21 levelUncompressed = iota + 1
22 levelFast
23 levelBetter
24 levelBest
25 )
26
27
28
29
30
31
32
33
34 func NewWriter(w io.Writer, opts ...WriterOption) *Writer {
35 w2 := Writer{
36 blockSize: defaultBlockSize,
37 concurrency: runtime.GOMAXPROCS(0),
38 randSrc: rand.Reader,
39 level: levelFast,
40 }
41 for _, opt := range opts {
42 if err := opt(&w2); err != nil {
43 w2.errState = err
44 return &w2
45 }
46 }
47 w2.obufLen = obufHeaderLen + MaxEncodedLen(w2.blockSize)
48 w2.paramsOK = true
49 w2.ibuf = make([]byte, 0, w2.blockSize)
50 w2.buffers.New = func() interface{} {
51 return make([]byte, w2.obufLen)
52 }
53 w2.Reset(w)
54 return &w2
55 }
56
57
58 type Writer struct {
59 errMu sync.Mutex
60 errState error
61
62
63 ibuf []byte
64
65 blockSize int
66 obufLen int
67 concurrency int
68 written int64
69 uncompWritten int64
70 output chan chan result
71 buffers sync.Pool
72 pad int
73
74 writer io.Writer
75 randSrc io.Reader
76 writerWg sync.WaitGroup
77 index Index
78 customEnc func(dst, src []byte) int
79
80
81 wroteStreamHeader bool
82 paramsOK bool
83 snappy bool
84 flushOnWrite bool
85 appendIndex bool
86 level uint8
87 }
88
89 type result struct {
90 b []byte
91
92 startOffset int64
93 }
94
95
96
97 func (w *Writer) err(err error) error {
98 w.errMu.Lock()
99 errSet := w.errState
100 if errSet == nil && err != nil {
101 w.errState = err
102 errSet = err
103 }
104 w.errMu.Unlock()
105 return errSet
106 }
107
108
109
110 func (w *Writer) Reset(writer io.Writer) {
111 if !w.paramsOK {
112 return
113 }
114
115 if w.output != nil {
116 close(w.output)
117 w.writerWg.Wait()
118 w.output = nil
119 }
120 w.errState = nil
121 w.ibuf = w.ibuf[:0]
122 w.wroteStreamHeader = false
123 w.written = 0
124 w.writer = writer
125 w.uncompWritten = 0
126 w.index.reset(w.blockSize)
127
128
129 if writer == nil {
130 return
131 }
132
133 if w.concurrency == 1 {
134 return
135 }
136
137 toWrite := make(chan chan result, w.concurrency)
138 w.output = toWrite
139 w.writerWg.Add(1)
140
141
142 go func() {
143 defer w.writerWg.Done()
144
145
146 for write := range toWrite {
147
148 input := <-write
149 in := input.b
150 if len(in) > 0 {
151 if w.err(nil) == nil {
152
153 toWrite := in[:len(in):len(in)]
154
155 n, err := writer.Write(toWrite)
156 if err == nil && n != len(toWrite) {
157 err = io.ErrShortBuffer
158 }
159 _ = w.err(err)
160 w.err(w.index.add(w.written, input.startOffset))
161 w.written += int64(n)
162 }
163 }
164 if cap(in) >= w.obufLen {
165 w.buffers.Put(in)
166 }
167
168
169 close(write)
170 }
171 }()
172 }
173
174
175 func (w *Writer) Write(p []byte) (nRet int, errRet error) {
176 if err := w.err(nil); err != nil {
177 return 0, err
178 }
179 if w.flushOnWrite {
180 return w.write(p)
181 }
182
183 for len(p) > (cap(w.ibuf)-len(w.ibuf)) && w.err(nil) == nil {
184 var n int
185 if len(w.ibuf) == 0 {
186
187
188 n, _ = w.write(p)
189 } else {
190 n = copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
191 w.ibuf = w.ibuf[:len(w.ibuf)+n]
192 w.write(w.ibuf)
193 w.ibuf = w.ibuf[:0]
194 }
195 nRet += n
196 p = p[n:]
197 }
198 if err := w.err(nil); err != nil {
199 return nRet, err
200 }
201
202 n := copy(w.ibuf[len(w.ibuf):cap(w.ibuf)], p)
203 w.ibuf = w.ibuf[:len(w.ibuf)+n]
204 nRet += n
205 return nRet, nil
206 }
207
208
209
210
211
212
213 func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
214 if err := w.err(nil); err != nil {
215 return 0, err
216 }
217 if len(w.ibuf) > 0 {
218 err := w.AsyncFlush()
219 if err != nil {
220 return 0, err
221 }
222 }
223 if br, ok := r.(byter); ok {
224 buf := br.Bytes()
225 if err := w.EncodeBuffer(buf); err != nil {
226 return 0, err
227 }
228 return int64(len(buf)), w.AsyncFlush()
229 }
230 for {
231 inbuf := w.buffers.Get().([]byte)[:w.blockSize+obufHeaderLen]
232 n2, err := io.ReadFull(r, inbuf[obufHeaderLen:])
233 if err != nil {
234 if err == io.ErrUnexpectedEOF {
235 err = io.EOF
236 }
237 if err != io.EOF {
238 return n, w.err(err)
239 }
240 }
241 if n2 == 0 {
242 break
243 }
244 n += int64(n2)
245 err2 := w.writeFull(inbuf[:n2+obufHeaderLen])
246 if w.err(err2) != nil {
247 break
248 }
249
250 if err != nil {
251
252 break
253 }
254 }
255
256 return n, w.err(nil)
257 }
258
259
260
261
262 func (w *Writer) AddSkippableBlock(id uint8, data []byte) (err error) {
263 if err := w.err(nil); err != nil {
264 return err
265 }
266 if len(data) == 0 {
267 return nil
268 }
269 if id < 0x80 || id > chunkTypePadding {
270 return fmt.Errorf("invalid skippable block id %x", id)
271 }
272 if len(data) > maxChunkSize {
273 return fmt.Errorf("skippable block excessed maximum size")
274 }
275 var header [4]byte
276 chunkLen := len(data)
277 header[0] = id
278 header[1] = uint8(chunkLen >> 0)
279 header[2] = uint8(chunkLen >> 8)
280 header[3] = uint8(chunkLen >> 16)
281 if w.concurrency == 1 {
282 write := func(b []byte) error {
283 n, err := w.writer.Write(b)
284 if err = w.err(err); err != nil {
285 return err
286 }
287 if n != len(b) {
288 return w.err(io.ErrShortWrite)
289 }
290 w.written += int64(n)
291 return w.err(nil)
292 }
293 if !w.wroteStreamHeader {
294 w.wroteStreamHeader = true
295 if w.snappy {
296 if err := write([]byte(magicChunkSnappy)); err != nil {
297 return err
298 }
299 } else {
300 if err := write([]byte(magicChunk)); err != nil {
301 return err
302 }
303 }
304 }
305 if err := write(header[:]); err != nil {
306 return err
307 }
308 return write(data)
309 }
310
311
312 if !w.wroteStreamHeader {
313 w.wroteStreamHeader = true
314 hWriter := make(chan result)
315 w.output <- hWriter
316 if w.snappy {
317 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
318 } else {
319 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
320 }
321 }
322
323
324 inbuf := w.buffers.Get().([]byte)[:4]
325 copy(inbuf, header[:])
326 inbuf = append(inbuf, data...)
327
328 output := make(chan result, 1)
329
330 w.output <- output
331 output <- result{startOffset: w.uncompWritten, b: inbuf}
332
333 return nil
334 }
335
336
337
338
339
340
341
342
343
344
345
346 func (w *Writer) EncodeBuffer(buf []byte) (err error) {
347 if err := w.err(nil); err != nil {
348 return err
349 }
350
351 if w.flushOnWrite {
352 _, err := w.write(buf)
353 return err
354 }
355
356 if len(w.ibuf) > 0 {
357 err := w.AsyncFlush()
358 if err != nil {
359 return err
360 }
361 }
362 if w.concurrency == 1 {
363 _, err := w.writeSync(buf)
364 return err
365 }
366
367
368 if !w.wroteStreamHeader {
369 w.wroteStreamHeader = true
370 hWriter := make(chan result)
371 w.output <- hWriter
372 if w.snappy {
373 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
374 } else {
375 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
376 }
377 }
378
379 for len(buf) > 0 {
380
381 uncompressed := buf
382 if len(uncompressed) > w.blockSize {
383 uncompressed = uncompressed[:w.blockSize]
384 }
385 buf = buf[len(uncompressed):]
386
387 obuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
388 race.WriteSlice(obuf)
389
390 output := make(chan result)
391
392 w.output <- output
393 res := result{
394 startOffset: w.uncompWritten,
395 }
396 w.uncompWritten += int64(len(uncompressed))
397 go func() {
398 race.ReadSlice(uncompressed)
399
400 checksum := crc(uncompressed)
401
402
403 chunkType := uint8(chunkTypeUncompressedData)
404 chunkLen := 4 + len(uncompressed)
405
406
407 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
408 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
409
410
411 if n2 > 0 {
412 chunkType = uint8(chunkTypeCompressedData)
413 chunkLen = 4 + n + n2
414 obuf = obuf[:obufHeaderLen+n+n2]
415 } else {
416
417 copy(obuf[obufHeaderLen:], uncompressed)
418 }
419
420
421 obuf[0] = chunkType
422 obuf[1] = uint8(chunkLen >> 0)
423 obuf[2] = uint8(chunkLen >> 8)
424 obuf[3] = uint8(chunkLen >> 16)
425 obuf[4] = uint8(checksum >> 0)
426 obuf[5] = uint8(checksum >> 8)
427 obuf[6] = uint8(checksum >> 16)
428 obuf[7] = uint8(checksum >> 24)
429
430
431 res.b = obuf
432 output <- res
433 }()
434 }
435 return nil
436 }
437
438 func (w *Writer) encodeBlock(obuf, uncompressed []byte) int {
439 if w.customEnc != nil {
440 if ret := w.customEnc(obuf, uncompressed); ret >= 0 {
441 return ret
442 }
443 }
444 if w.snappy {
445 switch w.level {
446 case levelFast:
447 return encodeBlockSnappy(obuf, uncompressed)
448 case levelBetter:
449 return encodeBlockBetterSnappy(obuf, uncompressed)
450 case levelBest:
451 return encodeBlockBestSnappy(obuf, uncompressed)
452 }
453 return 0
454 }
455 switch w.level {
456 case levelFast:
457 return encodeBlock(obuf, uncompressed)
458 case levelBetter:
459 return encodeBlockBetter(obuf, uncompressed)
460 case levelBest:
461 return encodeBlockBest(obuf, uncompressed, nil)
462 }
463 return 0
464 }
465
466 func (w *Writer) write(p []byte) (nRet int, errRet error) {
467 if err := w.err(nil); err != nil {
468 return 0, err
469 }
470 if w.concurrency == 1 {
471 return w.writeSync(p)
472 }
473
474
475 for len(p) > 0 {
476 if !w.wroteStreamHeader {
477 w.wroteStreamHeader = true
478 hWriter := make(chan result)
479 w.output <- hWriter
480 if w.snappy {
481 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
482 } else {
483 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
484 }
485 }
486
487 var uncompressed []byte
488 if len(p) > w.blockSize {
489 uncompressed, p = p[:w.blockSize], p[w.blockSize:]
490 } else {
491 uncompressed, p = p, nil
492 }
493
494
495
496 inbuf := w.buffers.Get().([]byte)[:len(uncompressed)+obufHeaderLen]
497 obuf := w.buffers.Get().([]byte)[:w.obufLen]
498 copy(inbuf[obufHeaderLen:], uncompressed)
499 uncompressed = inbuf[obufHeaderLen:]
500
501 output := make(chan result)
502
503 w.output <- output
504 res := result{
505 startOffset: w.uncompWritten,
506 }
507 w.uncompWritten += int64(len(uncompressed))
508
509 go func() {
510 checksum := crc(uncompressed)
511
512
513 chunkType := uint8(chunkTypeUncompressedData)
514 chunkLen := 4 + len(uncompressed)
515
516
517 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
518 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
519
520
521 if n2 > 0 {
522 chunkType = uint8(chunkTypeCompressedData)
523 chunkLen = 4 + n + n2
524 obuf = obuf[:obufHeaderLen+n+n2]
525 } else {
526
527 obuf, inbuf = inbuf, obuf
528 }
529
530
531 obuf[0] = chunkType
532 obuf[1] = uint8(chunkLen >> 0)
533 obuf[2] = uint8(chunkLen >> 8)
534 obuf[3] = uint8(chunkLen >> 16)
535 obuf[4] = uint8(checksum >> 0)
536 obuf[5] = uint8(checksum >> 8)
537 obuf[6] = uint8(checksum >> 16)
538 obuf[7] = uint8(checksum >> 24)
539
540
541 res.b = obuf
542 output <- res
543
544
545 w.buffers.Put(inbuf)
546 }()
547 nRet += len(uncompressed)
548 }
549 return nRet, nil
550 }
551
552
553
554
555
556 func (w *Writer) writeFull(inbuf []byte) (errRet error) {
557 if err := w.err(nil); err != nil {
558 return err
559 }
560
561 if w.concurrency == 1 {
562 _, err := w.writeSync(inbuf[obufHeaderLen:])
563 return err
564 }
565
566
567 if !w.wroteStreamHeader {
568 w.wroteStreamHeader = true
569 hWriter := make(chan result)
570 w.output <- hWriter
571 if w.snappy {
572 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunkSnappy)}
573 } else {
574 hWriter <- result{startOffset: w.uncompWritten, b: []byte(magicChunk)}
575 }
576 }
577
578
579 obuf := w.buffers.Get().([]byte)[:w.obufLen]
580 uncompressed := inbuf[obufHeaderLen:]
581
582 output := make(chan result)
583
584 w.output <- output
585 res := result{
586 startOffset: w.uncompWritten,
587 }
588 w.uncompWritten += int64(len(uncompressed))
589
590 go func() {
591 checksum := crc(uncompressed)
592
593
594 chunkType := uint8(chunkTypeUncompressedData)
595 chunkLen := 4 + len(uncompressed)
596
597
598 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
599 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
600
601
602 if n2 > 0 {
603 chunkType = uint8(chunkTypeCompressedData)
604 chunkLen = 4 + n + n2
605 obuf = obuf[:obufHeaderLen+n+n2]
606 } else {
607
608 obuf, inbuf = inbuf, obuf
609 }
610
611
612 obuf[0] = chunkType
613 obuf[1] = uint8(chunkLen >> 0)
614 obuf[2] = uint8(chunkLen >> 8)
615 obuf[3] = uint8(chunkLen >> 16)
616 obuf[4] = uint8(checksum >> 0)
617 obuf[5] = uint8(checksum >> 8)
618 obuf[6] = uint8(checksum >> 16)
619 obuf[7] = uint8(checksum >> 24)
620
621
622 res.b = obuf
623 output <- res
624
625
626 w.buffers.Put(inbuf)
627 }()
628 return nil
629 }
630
631 func (w *Writer) writeSync(p []byte) (nRet int, errRet error) {
632 if err := w.err(nil); err != nil {
633 return 0, err
634 }
635 if !w.wroteStreamHeader {
636 w.wroteStreamHeader = true
637 var n int
638 var err error
639 if w.snappy {
640 n, err = w.writer.Write([]byte(magicChunkSnappy))
641 } else {
642 n, err = w.writer.Write([]byte(magicChunk))
643 }
644 if err != nil {
645 return 0, w.err(err)
646 }
647 if n != len(magicChunk) {
648 return 0, w.err(io.ErrShortWrite)
649 }
650 w.written += int64(n)
651 }
652
653 for len(p) > 0 {
654 var uncompressed []byte
655 if len(p) > w.blockSize {
656 uncompressed, p = p[:w.blockSize], p[w.blockSize:]
657 } else {
658 uncompressed, p = p, nil
659 }
660
661 obuf := w.buffers.Get().([]byte)[:w.obufLen]
662 checksum := crc(uncompressed)
663
664
665 chunkType := uint8(chunkTypeUncompressedData)
666 chunkLen := 4 + len(uncompressed)
667
668
669 n := binary.PutUvarint(obuf[obufHeaderLen:], uint64(len(uncompressed)))
670 n2 := w.encodeBlock(obuf[obufHeaderLen+n:], uncompressed)
671
672 if n2 > 0 {
673 chunkType = uint8(chunkTypeCompressedData)
674 chunkLen = 4 + n + n2
675 obuf = obuf[:obufHeaderLen+n+n2]
676 } else {
677 obuf = obuf[:8]
678 }
679
680
681 obuf[0] = chunkType
682 obuf[1] = uint8(chunkLen >> 0)
683 obuf[2] = uint8(chunkLen >> 8)
684 obuf[3] = uint8(chunkLen >> 16)
685 obuf[4] = uint8(checksum >> 0)
686 obuf[5] = uint8(checksum >> 8)
687 obuf[6] = uint8(checksum >> 16)
688 obuf[7] = uint8(checksum >> 24)
689
690 n, err := w.writer.Write(obuf)
691 if err != nil {
692 return 0, w.err(err)
693 }
694 if n != len(obuf) {
695 return 0, w.err(io.ErrShortWrite)
696 }
697 w.err(w.index.add(w.written, w.uncompWritten))
698 w.written += int64(n)
699 w.uncompWritten += int64(len(uncompressed))
700
701 if chunkType == chunkTypeUncompressedData {
702
703 n, err := w.writer.Write(uncompressed)
704 if err != nil {
705 return 0, w.err(err)
706 }
707 if n != len(uncompressed) {
708 return 0, w.err(io.ErrShortWrite)
709 }
710 w.written += int64(n)
711 }
712 w.buffers.Put(obuf)
713
714 nRet += len(uncompressed)
715 }
716 return nRet, nil
717 }
718
719
720
721 func (w *Writer) AsyncFlush() error {
722 if err := w.err(nil); err != nil {
723 return err
724 }
725
726
727 if len(w.ibuf) != 0 {
728 if !w.wroteStreamHeader {
729 _, err := w.writeSync(w.ibuf)
730 w.ibuf = w.ibuf[:0]
731 return w.err(err)
732 } else {
733 _, err := w.write(w.ibuf)
734 w.ibuf = w.ibuf[:0]
735 err = w.err(err)
736 if err != nil {
737 return err
738 }
739 }
740 }
741 return w.err(nil)
742 }
743
744
745
746 func (w *Writer) Flush() error {
747 if err := w.AsyncFlush(); err != nil {
748 return err
749 }
750 if w.output == nil {
751 return w.err(nil)
752 }
753
754
755 res := make(chan result)
756 w.output <- res
757
758 res <- result{b: nil, startOffset: w.uncompWritten}
759
760 <-res
761 return w.err(nil)
762 }
763
764
765
766
767 func (w *Writer) Close() error {
768 _, err := w.closeIndex(w.appendIndex)
769 return err
770 }
771
772
773
774 func (w *Writer) CloseIndex() ([]byte, error) {
775 return w.closeIndex(true)
776 }
777
778 func (w *Writer) closeIndex(idx bool) ([]byte, error) {
779 err := w.Flush()
780 if w.output != nil {
781 close(w.output)
782 w.writerWg.Wait()
783 w.output = nil
784 }
785
786 var index []byte
787 if w.err(err) == nil && w.writer != nil {
788
789 if idx {
790 compSize := int64(-1)
791 if w.pad <= 1 {
792 compSize = w.written
793 }
794 index = w.index.appendTo(w.ibuf[:0], w.uncompWritten, compSize)
795
796 if w.appendIndex {
797 w.written += int64(len(index))
798 }
799 }
800
801 if w.pad > 1 {
802 tmp := w.ibuf[:0]
803 if len(index) > 0 {
804
805 tmp = w.buffers.Get().([]byte)[:0]
806 defer w.buffers.Put(tmp)
807 }
808 add := calcSkippableFrame(w.written, int64(w.pad))
809 frame, err := skippableFrame(tmp, add, w.randSrc)
810 if err = w.err(err); err != nil {
811 return nil, err
812 }
813 n, err2 := w.writer.Write(frame)
814 if err2 == nil && n != len(frame) {
815 err2 = io.ErrShortWrite
816 }
817 _ = w.err(err2)
818 }
819 if len(index) > 0 && w.appendIndex {
820 n, err2 := w.writer.Write(index)
821 if err2 == nil && n != len(index) {
822 err2 = io.ErrShortWrite
823 }
824 _ = w.err(err2)
825 }
826 }
827 err = w.err(errClosed)
828 if err == errClosed {
829 return index, nil
830 }
831 return nil, err
832 }
833
834
835
836
837
838 func calcSkippableFrame(written, wantMultiple int64) int {
839 if wantMultiple <= 0 {
840 panic("wantMultiple <= 0")
841 }
842 if written < 0 {
843 panic("written < 0")
844 }
845 leftOver := written % wantMultiple
846 if leftOver == 0 {
847 return 0
848 }
849 toAdd := wantMultiple - leftOver
850 for toAdd < skippableFrameHeader {
851 toAdd += wantMultiple
852 }
853 return int(toAdd)
854 }
855
856
857
858 func skippableFrame(dst []byte, total int, r io.Reader) ([]byte, error) {
859 if total == 0 {
860 return dst, nil
861 }
862 if total < skippableFrameHeader {
863 return dst, fmt.Errorf("s2: requested skippable frame (%d) < 4", total)
864 }
865 if int64(total) >= maxBlockSize+skippableFrameHeader {
866 return dst, fmt.Errorf("s2: requested skippable frame (%d) >= max 1<<24", total)
867 }
868
869 dst = append(dst, chunkTypePadding)
870 f := uint32(total - skippableFrameHeader)
871
872 dst = append(dst, uint8(f), uint8(f>>8), uint8(f>>16))
873
874 start := len(dst)
875 dst = append(dst, make([]byte, f)...)
876 _, err := io.ReadFull(r, dst[start:])
877 return dst, err
878 }
879
880 var errClosed = errors.New("s2: Writer is closed")
881
882
883 type WriterOption func(*Writer) error
884
885
886
887
888
889 func WriterConcurrency(n int) WriterOption {
890 return func(w *Writer) error {
891 if n <= 0 {
892 return errors.New("concurrency must be at least 1")
893 }
894 w.concurrency = n
895 return nil
896 }
897 }
898
899
900
901 func WriterAddIndex() WriterOption {
902 return func(w *Writer) error {
903 w.appendIndex = true
904 return nil
905 }
906 }
907
908
909
910
911 func WriterBetterCompression() WriterOption {
912 return func(w *Writer) error {
913 w.level = levelBetter
914 return nil
915 }
916 }
917
918
919
920
921 func WriterBestCompression() WriterOption {
922 return func(w *Writer) error {
923 w.level = levelBest
924 return nil
925 }
926 }
927
928
929
930
931 func WriterUncompressed() WriterOption {
932 return func(w *Writer) error {
933 w.level = levelUncompressed
934 return nil
935 }
936 }
937
938
939
940
941
942
943
944
945
946
947
948
949 func WriterBlockSize(n int) WriterOption {
950 return func(w *Writer) error {
951 if w.snappy && n > maxSnappyBlockSize || n < minBlockSize {
952 return errors.New("s2: block size too large. Must be <= 64K and >=4KB on for snappy compatible output")
953 }
954 if n > maxBlockSize || n < minBlockSize {
955 return errors.New("s2: block size too large. Must be <= 4MB and >=4KB")
956 }
957 w.blockSize = n
958 return nil
959 }
960 }
961
962
963
964
965
966
967
968 func WriterPadding(n int) WriterOption {
969 return func(w *Writer) error {
970 if n <= 0 {
971 return fmt.Errorf("s2: padding must be at least 1")
972 }
973
974 if n == 1 {
975 w.pad = 0
976 }
977 if n > maxBlockSize {
978 return fmt.Errorf("s2: padding must less than 4MB")
979 }
980 w.pad = n
981 return nil
982 }
983 }
984
985
986
987 func WriterPaddingSrc(reader io.Reader) WriterOption {
988 return func(w *Writer) error {
989 w.randSrc = reader
990 return nil
991 }
992 }
993
994
995
996
997 func WriterSnappyCompat() WriterOption {
998 return func(w *Writer) error {
999 w.snappy = true
1000 if w.blockSize > 64<<10 {
1001
1002
1003 w.blockSize = (64 << 10) - 8
1004 }
1005 return nil
1006 }
1007 }
1008
1009
1010
1011
1012
1013
1014
1015 func WriterFlushOnWrite() WriterOption {
1016 return func(w *Writer) error {
1017 w.flushOnWrite = true
1018 return nil
1019 }
1020 }
1021
1022
1023
1024
1025
1026
1027
1028 func WriterCustomEncoder(fn func(dst, src []byte) int) WriterOption {
1029 return func(w *Writer) error {
1030 w.customEnc = fn
1031 return nil
1032 }
1033 }
1034
View as plain text