1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package runtime
16
17 import (
18 "bytes"
19 "context"
20 "encoding"
21 "encoding/csv"
22 "errors"
23 "fmt"
24 "io"
25 "reflect"
26
27 "golang.org/x/sync/errgroup"
28 )
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48 func CSVConsumer(opts ...CSVOpt) Consumer {
49 o := csvOptsWithDefaults(opts)
50
51 return ConsumerFunc(func(reader io.Reader, data interface{}) error {
52 if reader == nil {
53 return errors.New("CSVConsumer requires a reader")
54 }
55 if data == nil {
56 return errors.New("nil destination for CSVConsumer")
57 }
58
59 csvReader := csv.NewReader(reader)
60 o.applyToReader(csvReader)
61 closer := defaultCloser
62 if o.closeStream {
63 if cl, isReaderCloser := reader.(io.Closer); isReaderCloser {
64 closer = cl.Close
65 }
66 }
67 defer func() {
68 _ = closer()
69 }()
70
71 switch destination := data.(type) {
72 case *csv.Writer:
73 csvWriter := destination
74 o.applyToWriter(csvWriter)
75
76 return pipeCSV(csvWriter, csvReader, o)
77
78 case CSVWriter:
79 csvWriter := destination
80
81
82 return pipeCSV(csvWriter, csvReader, o)
83
84 case io.Writer:
85 csvWriter := csv.NewWriter(destination)
86 o.applyToWriter(csvWriter)
87
88 return pipeCSV(csvWriter, csvReader, o)
89
90 case io.ReaderFrom:
91 var buf bytes.Buffer
92 csvWriter := csv.NewWriter(&buf)
93 o.applyToWriter(csvWriter)
94 if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
95 return err
96 }
97 _, err := destination.ReadFrom(&buf)
98
99 return err
100
101 case encoding.BinaryUnmarshaler:
102 var buf bytes.Buffer
103 csvWriter := csv.NewWriter(&buf)
104 o.applyToWriter(csvWriter)
105 if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
106 return err
107 }
108
109 return destination.UnmarshalBinary(buf.Bytes())
110
111 default:
112
113 if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr {
114 return errors.New("destination must be a pointer")
115 }
116
117 v := reflect.Indirect(reflect.ValueOf(data))
118 t := v.Type()
119
120 switch {
121 case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String:
122 csvWriter := &csvRecordsWriter{}
123
124 if err := pipeCSV(csvWriter, csvReader, o); err != nil {
125 return err
126 }
127
128 v.Grow(len(csvWriter.records))
129 v.SetCap(len(csvWriter.records))
130 v.SetLen(len(csvWriter.records))
131 reflect.Copy(v, reflect.ValueOf(csvWriter.records))
132
133 return nil
134
135 case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
136 var buf bytes.Buffer
137 csvWriter := csv.NewWriter(&buf)
138 o.applyToWriter(csvWriter)
139 if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
140 return err
141 }
142 v.SetBytes(buf.Bytes())
143
144 return nil
145
146 case t.Kind() == reflect.String:
147 var buf bytes.Buffer
148 csvWriter := csv.NewWriter(&buf)
149 o.applyToWriter(csvWriter)
150 if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
151 return err
152 }
153 v.SetString(buf.String())
154
155 return nil
156
157 default:
158 return fmt.Errorf("%v (%T) is not supported by the CSVConsumer, %s",
159 data, data, "can be resolved by supporting CSVWriter/Writer/BinaryUnmarshaler interface",
160 )
161 }
162 }
163 })
164 }
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181 func CSVProducer(opts ...CSVOpt) Producer {
182 o := csvOptsWithDefaults(opts)
183
184 return ProducerFunc(func(writer io.Writer, data interface{}) error {
185 if writer == nil {
186 return errors.New("CSVProducer requires a writer")
187 }
188 if data == nil {
189 return errors.New("nil data for CSVProducer")
190 }
191
192 csvWriter := csv.NewWriter(writer)
193 o.applyToWriter(csvWriter)
194 closer := defaultCloser
195 if o.closeStream {
196 if cl, isWriterCloser := writer.(io.Closer); isWriterCloser {
197 closer = cl.Close
198 }
199 }
200 defer func() {
201 _ = closer()
202 }()
203
204 if rc, isDataCloser := data.(io.ReadCloser); isDataCloser {
205 defer rc.Close()
206 }
207
208 switch origin := data.(type) {
209 case *csv.Reader:
210 csvReader := origin
211 o.applyToReader(csvReader)
212
213 return pipeCSV(csvWriter, csvReader, o)
214
215 case CSVReader:
216 csvReader := origin
217
218
219 return pipeCSV(csvWriter, csvReader, o)
220
221 case io.Reader:
222 csvReader := csv.NewReader(origin)
223 o.applyToReader(csvReader)
224
225 return pipeCSV(csvWriter, csvReader, o)
226
227 case io.WriterTo:
228
229 r, w := io.Pipe()
230 csvReader := csv.NewReader(r)
231 o.applyToReader(csvReader)
232
233 pipe, _ := errgroup.WithContext(context.Background())
234 pipe.Go(func() error {
235 _, err := origin.WriteTo(w)
236 _ = w.Close()
237 return err
238 })
239
240 pipe.Go(func() error {
241 defer func() {
242 _ = r.Close()
243 }()
244
245 return pipeCSV(csvWriter, csvReader, o)
246 })
247
248 return pipe.Wait()
249
250 case encoding.BinaryMarshaler:
251 buf, err := origin.MarshalBinary()
252 if err != nil {
253 return err
254 }
255 rdr := bytes.NewBuffer(buf)
256 csvReader := csv.NewReader(rdr)
257
258 return bufferedCSV(csvWriter, csvReader, o)
259
260 default:
261
262 v := reflect.Indirect(reflect.ValueOf(data))
263 t := v.Type()
264
265 switch {
266 case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String:
267 csvReader := &csvRecordsWriter{
268 records: make([][]string, v.Len()),
269 }
270 reflect.Copy(reflect.ValueOf(csvReader.records), v)
271
272 return pipeCSV(csvWriter, csvReader, o)
273
274 case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
275 buf := bytes.NewBuffer(v.Bytes())
276 csvReader := csv.NewReader(buf)
277 o.applyToReader(csvReader)
278
279 return bufferedCSV(csvWriter, csvReader, o)
280
281 case t.Kind() == reflect.String:
282 buf := bytes.NewBufferString(v.String())
283 csvReader := csv.NewReader(buf)
284 o.applyToReader(csvReader)
285
286 return bufferedCSV(csvWriter, csvReader, o)
287
288 default:
289 return fmt.Errorf("%v (%T) is not supported by the CSVProducer, %s",
290 data, data, "can be resolved by supporting CSVReader/Reader/BinaryMarshaler interface",
291 )
292 }
293 }
294 })
295 }
296
297
298 func pipeCSV(csvWriter CSVWriter, csvReader CSVReader, opts csvOpts) error {
299 for ; opts.skippedLines > 0; opts.skippedLines-- {
300 _, err := csvReader.Read()
301 if err != nil {
302 if errors.Is(err, io.EOF) {
303 return nil
304 }
305
306 return err
307 }
308 }
309
310 for {
311 record, err := csvReader.Read()
312 if err != nil {
313 if errors.Is(err, io.EOF) {
314 break
315 }
316
317 return err
318 }
319
320 if err := csvWriter.Write(record); err != nil {
321 return err
322 }
323 }
324
325 csvWriter.Flush()
326
327 return csvWriter.Error()
328 }
329
330
331
332 func bufferedCSV(csvWriter *csv.Writer, csvReader *csv.Reader, opts csvOpts) error {
333 for ; opts.skippedLines > 0; opts.skippedLines-- {
334 _, err := csvReader.Read()
335 if err != nil {
336 if errors.Is(err, io.EOF) {
337 return nil
338 }
339
340 return err
341 }
342 }
343
344 records, err := csvReader.ReadAll()
345 if err != nil {
346 return err
347 }
348
349 return csvWriter.WriteAll(records)
350 }
351
View as plain text