...
1 package lz4
2
3 import (
4 "io"
5
6 "github.com/pierrec/lz4/v4/internal/lz4block"
7 "github.com/pierrec/lz4/v4/internal/lz4errors"
8 "github.com/pierrec/lz4/v4/internal/lz4stream"
9 )
10
11 var writerStates = []aState{
12 noState: newState,
13 newState: writeState,
14 writeState: closedState,
15 closedState: newState,
16 errorState: newState,
17 }
18
19
20 func NewWriter(w io.Writer) *Writer {
21 zw := &Writer{frame: lz4stream.NewFrame()}
22 zw.state.init(writerStates)
23 _ = zw.Apply(DefaultBlockSizeOption, DefaultChecksumOption, DefaultConcurrency, defaultOnBlockDone)
24 zw.Reset(w)
25 return zw
26 }
27
28
29 type Writer struct {
30 state _State
31 src io.Writer
32 level lz4block.CompressionLevel
33 num int
34 frame *lz4stream.Frame
35 data []byte
36 idx int
37 handler func(int)
38 legacy bool
39 }
40
41 func (*Writer) private() {}
42
43 func (w *Writer) Apply(options ...Option) (err error) {
44 defer w.state.check(&err)
45 switch w.state.state {
46 case newState:
47 case errorState:
48 return w.state.err
49 default:
50 return lz4errors.ErrOptionClosedOrError
51 }
52 w.Reset(w.src)
53 for _, o := range options {
54 if err = o(w); err != nil {
55 return
56 }
57 }
58 return
59 }
60
61 func (w *Writer) isNotConcurrent() bool {
62 return w.num == 1
63 }
64
65
66 func (w *Writer) init() error {
67 w.frame.InitW(w.src, w.num, w.legacy)
68 size := w.frame.Descriptor.Flags.BlockSizeIndex()
69 w.data = size.Get()
70 w.idx = 0
71 return w.frame.Descriptor.Write(w.frame, w.src)
72 }
73
74 func (w *Writer) Write(buf []byte) (n int, err error) {
75 defer w.state.check(&err)
76 switch w.state.state {
77 case writeState:
78 case closedState, errorState:
79 return 0, w.state.err
80 case newState:
81 if err = w.init(); w.state.next(err) {
82 return
83 }
84 default:
85 return 0, w.state.fail()
86 }
87
88 zn := len(w.data)
89 for len(buf) > 0 {
90 if w.isNotConcurrent() && w.idx == 0 && len(buf) >= zn {
91
92 if err = w.write(buf[:zn], false); err != nil {
93 return
94 }
95 n += zn
96 buf = buf[zn:]
97 continue
98 }
99
100 m := copy(w.data[w.idx:], buf)
101 n += m
102 w.idx += m
103 buf = buf[m:]
104
105 if w.idx < len(w.data) {
106
107 return
108 }
109
110
111 if err = w.write(w.data, true); err != nil {
112 return
113 }
114 if !w.isNotConcurrent() {
115 size := w.frame.Descriptor.Flags.BlockSizeIndex()
116 w.data = size.Get()
117 }
118 w.idx = 0
119 }
120 return
121 }
122
123 func (w *Writer) write(data []byte, safe bool) error {
124 if w.isNotConcurrent() {
125 block := w.frame.Blocks.Block
126 err := block.Compress(w.frame, data, w.level).Write(w.frame, w.src)
127 w.handler(len(block.Data))
128 return err
129 }
130 c := make(chan *lz4stream.FrameDataBlock)
131 w.frame.Blocks.Blocks <- c
132 go func(c chan *lz4stream.FrameDataBlock, data []byte, safe bool) {
133 b := lz4stream.NewFrameDataBlock(w.frame)
134 c <- b.Compress(w.frame, data, w.level)
135 <-c
136 w.handler(len(b.Data))
137 b.Close(w.frame)
138 if safe {
139
140 lz4block.Put(data)
141 }
142 }(c, data, safe)
143
144 return nil
145 }
146
147
148 func (w *Writer) Flush() (err error) {
149 switch w.state.state {
150 case writeState:
151 case errorState:
152 return w.state.err
153 case newState:
154 if err = w.init(); w.state.next(err) {
155 return
156 }
157 default:
158 return nil
159 }
160
161 if w.idx > 0 {
162
163 if err = w.write(w.data[:w.idx], false); err != nil {
164 return err
165 }
166 w.idx = 0
167 }
168 return nil
169 }
170
171
172
173 func (w *Writer) Close() error {
174 if err := w.Flush(); err != nil {
175 return err
176 }
177 err := w.frame.CloseW(w.src, w.num)
178
179 if w.data != nil {
180 lz4block.Put(w.data)
181 w.data = nil
182 }
183 return err
184 }
185
186
187
188
189
190
191
192 func (w *Writer) Reset(writer io.Writer) {
193 w.frame.Reset(w.num)
194 w.state.reset()
195 w.src = writer
196 }
197
198
199 func (w *Writer) ReadFrom(r io.Reader) (n int64, err error) {
200 switch w.state.state {
201 case closedState, errorState:
202 return 0, w.state.err
203 case newState:
204 if err = w.init(); w.state.next(err) {
205 return
206 }
207 default:
208 return 0, w.state.fail()
209 }
210 defer w.state.check(&err)
211
212 size := w.frame.Descriptor.Flags.BlockSizeIndex()
213 var done bool
214 var rn int
215 data := size.Get()
216 if w.isNotConcurrent() {
217
218 defer lz4block.Put(data)
219 }
220 for !done {
221 rn, err = io.ReadFull(r, data)
222 switch err {
223 case nil:
224 case io.EOF, io.ErrUnexpectedEOF:
225 done = true
226 default:
227 return
228 }
229 n += int64(rn)
230 err = w.write(data[:rn], true)
231 if err != nil {
232 return
233 }
234 w.handler(rn)
235 if !done && !w.isNotConcurrent() {
236
237
238 data = size.Get()
239 }
240 }
241 return
242 }
243
View as plain text