1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package runtime
16
17 import (
18 "bytes"
19 "encoding/csv"
20 "errors"
21 "io"
22 "net/http/httptest"
23 "strings"
24 "testing"
25
26 "github.com/stretchr/testify/assert"
27 "github.com/stretchr/testify/require"
28 )
29
// csvFixture is a well-formed CSV document: a header line followed by two
// records of three fields each.
const csvFixture = "name,country,age\n" +
	"John,US,19\n" +
	"Mike,US,20\n"

// badCSVFixture is csvFixture with the last field of its final line removed,
// so that line holds two fields where the others hold three.
const badCSVFixture = "name,country,age\n" +
	"John,US,19\n" +
	"Mike,US\n"

// commentedCSVFixture holds the same header and records as csvFixture, with a
// line starting with '#' placed before the header and before each record.
const commentedCSVFixture = "# heading line\n" +
	"name,country,age\n" +
	"#John's record\n" +
	"John,US,19\n" +
	"#Mike's record\n" +
	"Mike,US,20\n"
47
// testCSVRecords holds the content of csvFixture split into fields: the header
// row followed by the two data records.
var testCSVRecords = [][]string{
	{"name", "country", "age"},
	{"John", "US", "19"},
	{"Mike", "US", "20"},
}
53
54 func TestCSVConsumer(t *testing.T) {
55 consumer := CSVConsumer()
56
57 t.Run("can consume as a *csv.Writer", func(t *testing.T) {
58 reader := bytes.NewBufferString(csvFixture)
59 var buf bytes.Buffer
60 dest := csv.NewWriter(&buf)
61
62 err := consumer.Consume(reader, dest)
63 require.NoError(t, err)
64 assert.Equal(t, csvFixture, buf.String())
65 })
66
67 t.Run("can consume as a CSVReader", func(t *testing.T) {
68 reader := bytes.NewBufferString(csvFixture)
69 var dest csvRecordsWriter
70
71 err := consumer.Consume(reader, &dest)
72 require.NoError(t, err)
73 assertCSVRecords(t, dest.records)
74 })
75
76 t.Run("can consume as a Writer", func(t *testing.T) {
77 reader := bytes.NewBufferString(csvFixture)
78 var dest closingWriter
79
80 err := consumer.Consume(reader, &dest)
81 require.NoError(t, err)
82 assert.Equal(t, csvFixture, dest.b.String())
83 })
84
85 t.Run("can consume as a ReaderFrom", func(t *testing.T) {
86 reader := bytes.NewBufferString(csvFixture)
87 var dest readerFromDummy
88
89 err := consumer.Consume(reader, &dest)
90 require.NoError(t, err)
91 assert.Equal(t, csvFixture, dest.b.String())
92 })
93
94 t.Run("can consume as a BinaryUnmarshaler", func(t *testing.T) {
95 reader := bytes.NewBufferString(csvFixture)
96 var dest binaryUnmarshalDummy
97
98 err := consumer.Consume(reader, &dest)
99 require.NoError(t, err)
100 assert.Equal(t, csvFixture, dest.str)
101 })
102
103 t.Run("can consume as a *[][]string", func(t *testing.T) {
104 reader := bytes.NewBufferString(csvFixture)
105 dest := [][]string{}
106
107 err := consumer.Consume(reader, &dest)
108 require.NoError(t, err)
109 assertCSVRecords(t, dest)
110 })
111
112 t.Run("can consume as an alias to *[][]string", func(t *testing.T) {
113 reader := bytes.NewBufferString(csvFixture)
114 type records [][]string
115 var dest records
116
117 err := consumer.Consume(reader, &dest)
118 require.NoError(t, err)
119 assertCSVRecords(t, dest)
120 })
121
122 t.Run("can consume as a *[]byte", func(t *testing.T) {
123 reader := bytes.NewBufferString(csvFixture)
124 var dest []byte
125
126 err := consumer.Consume(reader, &dest)
127 require.NoError(t, err)
128 assert.Equal(t, csvFixture, string(dest))
129 })
130
131 t.Run("can consume as an alias to *[]byte", func(t *testing.T) {
132 reader := bytes.NewBufferString(csvFixture)
133 type buffer []byte
134 var dest buffer
135
136 err := consumer.Consume(reader, &dest)
137 require.NoError(t, err)
138 assert.Equal(t, csvFixture, string(dest))
139 })
140
141 t.Run("can consume as a *string", func(t *testing.T) {
142 reader := bytes.NewBufferString(csvFixture)
143 var dest string
144
145 err := consumer.Consume(reader, &dest)
146 require.NoError(t, err)
147 assert.Equal(t, csvFixture, dest)
148 })
149
150 t.Run("can consume as an alias to *string", func(t *testing.T) {
151 reader := bytes.NewBufferString(csvFixture)
152 type buffer string
153 var dest buffer
154
155 err := consumer.Consume(reader, &dest)
156 require.NoError(t, err)
157 assert.Equal(t, csvFixture, string(dest))
158 })
159
160 t.Run("can consume from an empty reader", func(t *testing.T) {
161 reader := &csvEmptyReader{}
162 var dest bytes.Buffer
163
164 err := consumer.Consume(reader, &dest)
165 require.NoError(t, err)
166 assert.Empty(t, dest.String())
167 })
168
169 t.Run("error cases", func(t *testing.T) {
170 t.Run("nil data is never accepted", func(t *testing.T) {
171 var rdr bytes.Buffer
172
173 require.Error(t, consumer.Consume(&rdr, nil))
174 })
175
176 t.Run("nil readers should also never be acccepted", func(t *testing.T) {
177 var buf bytes.Buffer
178
179 err := consumer.Consume(nil, &buf)
180 require.Error(t, err)
181 })
182
183 t.Run("data must be a pointer", func(t *testing.T) {
184 var rdr bytes.Buffer
185 var dest []byte
186
187 err := consumer.Consume(&rdr, dest)
188 require.Error(t, err)
189 })
190
191 t.Run("unsupported type", func(t *testing.T) {
192 var rdr bytes.Buffer
193 var dest struct{}
194
195 err := consumer.Consume(&rdr, &dest)
196 require.Error(t, err)
197 })
198
199 t.Run("should propagate CSV error (buffered)", func(t *testing.T) {
200 reader := bytes.NewBufferString(badCSVFixture)
201 var dest []byte
202
203 err := consumer.Consume(reader, &dest)
204 require.Error(t, err)
205 require.EqualError(t, err, "record on line 3: wrong number of fields")
206 })
207
208 t.Run("should propagate CSV error (buffered, string)", func(t *testing.T) {
209 reader := bytes.NewBufferString(badCSVFixture)
210 var dest string
211
212 err := consumer.Consume(reader, &dest)
213 require.Error(t, err)
214 require.EqualError(t, err, "record on line 3: wrong number of fields")
215 })
216
217 t.Run("should propagate CSV error (buffered, ReaderFrom)", func(t *testing.T) {
218 reader := bytes.NewBufferString(badCSVFixture)
219 var dest readerFromDummy
220
221 err := consumer.Consume(reader, &dest)
222 require.Error(t, err)
223 require.EqualError(t, err, "record on line 3: wrong number of fields")
224 })
225
226 t.Run("should propagate CSV error (buffered, BinaryUnmarshaler)", func(t *testing.T) {
227 reader := bytes.NewBufferString(badCSVFixture)
228 var dest binaryUnmarshalDummy
229
230 err := consumer.Consume(reader, &dest)
231 require.Error(t, err)
232 require.EqualError(t, err, "record on line 3: wrong number of fields")
233 })
234
235 t.Run("should propagate CSV error (streaming)", func(t *testing.T) {
236 reader := bytes.NewBufferString(badCSVFixture)
237 var dest bytes.Buffer
238
239 err := consumer.Consume(reader, &dest)
240 require.Error(t, err)
241 require.EqualError(t, err, "record on line 3: wrong number of fields")
242 })
243
244 t.Run("should propagate CSV error (streaming, write error)", func(t *testing.T) {
245 reader := bytes.NewBufferString(csvFixture)
246 var buf bytes.Buffer
247 dest := csvWriterDummy{err: errors.New("test error"), Writer: csv.NewWriter(&buf)}
248
249 err := consumer.Consume(reader, &dest)
250 require.Error(t, err)
251 require.EqualError(t, err, "test error")
252 })
253
254 t.Run("should propagate ReaderFrom error", func(t *testing.T) {
255 reader := bytes.NewBufferString(csvFixture)
256 dest := readerFromDummy{err: errors.New("test error")}
257
258 err := consumer.Consume(reader, &dest)
259 require.Error(t, err)
260 require.EqualError(t, err, "test error")
261 })
262
263 t.Run("should propagate BinaryUnmarshaler error", func(t *testing.T) {
264 reader := bytes.NewBufferString(csvFixture)
265 dest := binaryUnmarshalDummy{err: errors.New("test error")}
266
267 err := consumer.Consume(reader, &dest)
268 require.Error(t, err)
269 require.EqualError(t, err, "test error")
270 })
271 })
272 }
273
274 func TestCSVConsumerWithOptions(t *testing.T) {
275 semiColonFixture := strings.ReplaceAll(csvFixture, ",", ";")
276
277 t.Run("with CSV reader Comma", func(t *testing.T) {
278 consumer := CSVConsumer(WithCSVReaderOpts(csv.Reader{Comma: ';', FieldsPerRecord: 3}))
279
280 t.Run("should not read comma-separated input", func(t *testing.T) {
281 reader := bytes.NewBufferString(csvFixture)
282 var dest bytes.Buffer
283
284 err := consumer.Consume(reader, &dest)
285 require.Error(t, err)
286 require.EqualError(t, err, "record on line 1: wrong number of fields")
287 })
288
289 t.Run("should read semicolon-separated input and convert it to colon-separated", func(t *testing.T) {
290 reader := bytes.NewBufferString(semiColonFixture)
291 var dest bytes.Buffer
292
293 err := consumer.Consume(reader, &dest)
294 require.NoError(t, err)
295 assert.Equal(t, csvFixture, dest.String())
296 })
297 })
298
299 t.Run("with CSV reader Comment", func(t *testing.T) {
300 consumer := CSVConsumer(WithCSVReaderOpts(csv.Reader{Comment: '#'}))
301
302 t.Run("should read input and skip commented lines", func(t *testing.T) {
303 reader := bytes.NewBufferString(commentedCSVFixture)
304 var dest [][]string
305
306 err := consumer.Consume(reader, &dest)
307 require.NoError(t, err)
308 assertCSVRecords(t, dest)
309 })
310 })
311
312 t.Run("with CSV writer Comma", func(t *testing.T) {
313 consumer := CSVConsumer(WithCSVWriterOpts(csv.Writer{Comma: ';'}))
314
315 t.Run("should read comma-separated input and convert it to semicolon-separated", func(t *testing.T) {
316 reader := bytes.NewBufferString(csvFixture)
317 var dest bytes.Buffer
318
319 err := consumer.Consume(reader, &dest)
320 require.NoError(t, err)
321 assert.Equal(t, semiColonFixture, dest.String())
322 })
323 })
324
325 t.Run("with SkipLines (streaming)", func(t *testing.T) {
326 consumer := CSVConsumer(WithCSVSkipLines(1))
327 reader := bytes.NewBufferString(csvFixture)
328 var dest [][]string
329
330 err := consumer.Consume(reader, &dest)
331 require.NoError(t, err)
332
333 expected := testCSVRecords[1:]
334 assert.Equalf(t, expected, dest, "expected output to skip header")
335 })
336
337 t.Run("with SkipLines (buffered)", func(t *testing.T) {
338 consumer := CSVConsumer(WithCSVSkipLines(1))
339 reader := bytes.NewBufferString(csvFixture)
340 var dest []byte
341
342 err := consumer.Consume(reader, &dest)
343 require.NoError(t, err)
344
345 r := csv.NewReader(bytes.NewReader(dest))
346 consumed, err := r.ReadAll()
347 require.NoError(t, err)
348 expected := testCSVRecords[1:]
349 assert.Equalf(t, expected, consumed, "expected output to skip header")
350 })
351
352 t.Run("should detect errors on skipped lines (streaming)", func(t *testing.T) {
353 consumer := CSVConsumer(WithCSVSkipLines(1))
354 reader := bytes.NewBufferString(strings.ReplaceAll(csvFixture, ",age", `,"age`))
355 var dest [][]string
356
357 err := consumer.Consume(reader, &dest)
358 require.Error(t, err)
359 require.ErrorContains(t, err, "record on line 1; parse error")
360 })
361
362 t.Run("should detect errors on skipped lines (buffered)", func(t *testing.T) {
363 consumer := CSVConsumer(WithCSVSkipLines(1))
364 reader := bytes.NewBufferString(strings.ReplaceAll(csvFixture, ",age", `,"age`))
365 var dest []byte
366
367 err := consumer.Consume(reader, &dest)
368 require.Error(t, err)
369 require.ErrorContains(t, err, "record on line 1; parse error")
370 })
371
372 t.Run("with SkipLines greater than the total number of lines (streaming)", func(t *testing.T) {
373 consumer := CSVConsumer(WithCSVSkipLines(4))
374 reader := bytes.NewBufferString(csvFixture)
375 var dest [][]string
376
377 err := consumer.Consume(reader, &dest)
378 require.NoError(t, err)
379
380 assert.Empty(t, dest)
381 })
382
383 t.Run("with SkipLines greater than the total number of lines (buffered)", func(t *testing.T) {
384 consumer := CSVConsumer(WithCSVSkipLines(4))
385 reader := bytes.NewBufferString(csvFixture)
386 var dest []byte
387
388 err := consumer.Consume(reader, &dest)
389 require.NoError(t, err)
390
391 assert.Empty(t, dest)
392 })
393
394 t.Run("with CloseStream", func(t *testing.T) {
395 t.Run("wants to close stream", func(t *testing.T) {
396 closingConsumer := CSVConsumer(WithCSVClosesStream())
397 var dest bytes.Buffer
398 r := &closingReader{b: bytes.NewBufferString(csvFixture)}
399
400 require.NoError(t, closingConsumer.Consume(r, &dest))
401 assert.Equal(t, csvFixture, dest.String())
402 assert.EqualValues(t, 1, r.calledClose)
403 })
404
405 t.Run("don't want to close stream", func(t *testing.T) {
406 nonClosingConsumer := CSVConsumer()
407 var dest bytes.Buffer
408 r := &closingReader{b: bytes.NewBufferString(csvFixture)}
409
410 require.NoError(t, nonClosingConsumer.Consume(r, &dest))
411 assert.Equal(t, csvFixture, dest.String())
412 assert.EqualValues(t, 0, r.calledClose)
413 })
414 })
415 }
416
417 func TestCSVProducer(t *testing.T) {
418 producer := CSVProducer()
419
420 t.Run("can produce CSV from *csv.Reader", func(t *testing.T) {
421 writer := new(bytes.Buffer)
422 buf := bytes.NewBufferString(csvFixture)
423 data := csv.NewReader(buf)
424
425 err := producer.Produce(writer, data)
426 require.NoError(t, err)
427 assert.Equal(t, csvFixture, writer.String())
428 })
429
430 t.Run("can produce CSV from CSVReader", func(t *testing.T) {
431 writer := new(bytes.Buffer)
432 data := &csvRecordsWriter{
433 records: testCSVRecords,
434 }
435
436 err := producer.Produce(writer, data)
437 require.NoError(t, err)
438 assert.Equal(t, csvFixture, writer.String())
439 })
440
441 t.Run("can produce CSV from Reader", func(t *testing.T) {
442 writer := new(bytes.Buffer)
443 data := bytes.NewReader([]byte(csvFixture))
444
445 err := producer.Produce(writer, data)
446 require.NoError(t, err)
447 assert.Equal(t, csvFixture, writer.String())
448 })
449
450 t.Run("can produce CSV from WriterTo", func(t *testing.T) {
451 writer := new(bytes.Buffer)
452 buf := bytes.NewBufferString(csvFixture)
453 data := &writerToDummy{
454 b: *buf,
455 }
456
457 err := producer.Produce(writer, data)
458 require.NoError(t, err)
459 assert.Equal(t, csvFixture, writer.String())
460 })
461
462 t.Run("can produce CSV from BinaryMarshaler", func(t *testing.T) {
463 writer := new(bytes.Buffer)
464 data := &binaryMarshalDummy{str: csvFixture}
465
466 err := producer.Produce(writer, data)
467 require.NoError(t, err)
468 assert.Equal(t, csvFixture, writer.String())
469 })
470
471 t.Run("can produce CSV from [][]string", func(t *testing.T) {
472 writer := new(bytes.Buffer)
473 data := testCSVRecords
474
475 err := producer.Produce(writer, data)
476 require.NoError(t, err)
477 assert.Equal(t, csvFixture, writer.String())
478 })
479
480 t.Run("can produce CSV from alias to [][]string", func(t *testing.T) {
481 writer := new(bytes.Buffer)
482 type records [][]string
483 data := records(testCSVRecords)
484
485 err := producer.Produce(writer, data)
486 require.NoError(t, err)
487 assert.Equal(t, csvFixture, writer.String())
488 })
489
490 t.Run("can produce CSV from []byte", func(t *testing.T) {
491 writer := httptest.NewRecorder()
492 data := []byte(csvFixture)
493
494 err := producer.Produce(writer, data)
495 require.NoError(t, err)
496 assert.Equal(t, csvFixture, writer.Body.String())
497 })
498
499 t.Run("can produce CSV from alias to []byte", func(t *testing.T) {
500 writer := httptest.NewRecorder()
501 type buffer []byte
502 data := buffer(csvFixture)
503
504 err := producer.Produce(writer, data)
505 require.NoError(t, err)
506 assert.Equal(t, csvFixture, writer.Body.String())
507 })
508
509 t.Run("can produce CSV from string", func(t *testing.T) {
510 writer := httptest.NewRecorder()
511 data := csvFixture
512
513 err := producer.Produce(writer, data)
514 require.NoError(t, err)
515 assert.Equal(t, csvFixture, writer.Body.String())
516 })
517
518 t.Run("can produce CSV from alias to string", func(t *testing.T) {
519 writer := httptest.NewRecorder()
520 type buffer string
521 data := buffer(csvFixture)
522
523 err := producer.Produce(writer, data)
524 require.NoError(t, err)
525 assert.Equal(t, csvFixture, writer.Body.String())
526 })
527
528 t.Run("always close data reader whenever possible", func(t *testing.T) {
529 nonClosingProducer := CSVProducer()
530 r := &closingWriter{}
531 data := &closingReader{b: bytes.NewBufferString(csvFixture)}
532
533 require.NoError(t, nonClosingProducer.Produce(r, data))
534 assert.Equal(t, csvFixture, r.String())
535 assert.EqualValuesf(t, 0, r.calledClose, "expected the input reader NOT to be closed")
536 assert.EqualValuesf(t, 1, data.calledClose, "expected the data reader to be closed")
537 })
538
539 t.Run("error cases", func(t *testing.T) {
540 t.Run("unsupported type", func(t *testing.T) {
541 writer := httptest.NewRecorder()
542 var data struct{}
543
544 err := producer.Produce(writer, data)
545 require.Error(t, err)
546 })
547
548 t.Run("data cannot be nil", func(t *testing.T) {
549 writer := httptest.NewRecorder()
550
551 err := producer.Produce(writer, nil)
552 require.Error(t, err)
553 })
554
555 t.Run("writer cannot be nil", func(t *testing.T) {
556 data := []byte(csvFixture)
557
558 err := producer.Produce(nil, data)
559 require.Error(t, err)
560 })
561
562 t.Run("should propagate error from BinaryMarshaler", func(t *testing.T) {
563 var rdr bytes.Buffer
564 data := new(binaryMarshalDummy)
565
566 err := producer.Produce(&rdr, data)
567 require.Error(t, err)
568 require.ErrorContains(t, err, "no text set")
569 })
570 })
571 }
572
573 func TestCSVProducerWithOptions(t *testing.T) {
574 t.Run("with CloseStream", func(t *testing.T) {
575 t.Run("wants to close stream", func(t *testing.T) {
576 closingProducer := CSVProducer(WithCSVClosesStream())
577 r := &closingWriter{}
578 data := bytes.NewBufferString(csvFixture)
579
580 require.NoError(t, closingProducer.Produce(r, data))
581 assert.Equal(t, csvFixture, r.String())
582 assert.EqualValues(t, 1, r.calledClose)
583 })
584
585 t.Run("don't want to close stream", func(t *testing.T) {
586 nonClosingProducer := CSVProducer()
587 r := &closingWriter{}
588 data := bytes.NewBufferString(csvFixture)
589
590 require.NoError(t, nonClosingProducer.Produce(r, data))
591 assert.Equal(t, csvFixture, r.String())
592 assert.EqualValues(t, 0, r.calledClose)
593 })
594 })
595 }
596
597 func assertCSVRecords(t testing.TB, dest [][]string) {
598 assert.Len(t, dest, 3)
599 for i, record := range dest {
600 assert.Equal(t, testCSVRecords[i], record)
601 }
602 }
603
// csvEmptyReader is a reader with no content: every Read reports end of input.
type csvEmptyReader struct{}

// Read ignores the supplied buffer and returns zero bytes together with io.EOF.
func (*csvEmptyReader) Read([]byte) (n int, err error) {
	return 0, io.EOF
}
609
// readerFromDummy is a ReadFrom destination backed by an in-memory buffer. When
// err is set, ReadFrom returns it instead of reading anything.
type readerFromDummy struct {
	err error
	b   bytes.Buffer
}

// ReadFrom copies everything from rdr into the internal buffer and returns what
// the buffer reports. When an error has been configured, rdr is left untouched
// and the result is zero bytes plus that error.
func (r *readerFromDummy) ReadFrom(rdr io.Reader) (int64, error) {
	if r.err == nil {
		return r.b.ReadFrom(rdr)
	}

	return 0, r.err
}
622
// writerToDummy is a WriteTo source backed by an in-memory buffer.
type writerToDummy struct {
	b bytes.Buffer
}

// WriteTo hands the call over to the internal buffer's WriteTo and returns its
// byte count and error unchanged.
func (w *writerToDummy) WriteTo(writer io.Writer) (int64, error) {
	written, err := w.b.WriteTo(writer)

	return written, err
}
630
// csvWriterDummy embeds a *csv.Writer and overrides Write and Error: when err is
// set, both return it instead of calling the embedded writer.
type csvWriterDummy struct {
	err error
	*csv.Writer
}

// Write returns the configured error when there is one; otherwise it writes
// record through the embedded writer.
func (w *csvWriterDummy) Write(record []string) error {
	if w.err == nil {
		return w.Writer.Write(record)
	}

	return w.err
}

// Error returns the configured error when there is one; otherwise it returns
// what the embedded writer's Error reports.
func (w *csvWriterDummy) Error() error {
	if configured := w.err; configured != nil {
		return configured
	}

	return w.Writer.Error()
}
651
View as plain text