...

Source file src/github.com/go-openapi/runtime/csv.go

Documentation: github.com/go-openapi/runtime

     1  // Copyright 2015 go-swagger maintainers
     2  //
     3  // Licensed under the Apache License, Version 2.0 (the "License");
     4  // you may not use this file except in compliance with the License.
     5  // You may obtain a copy of the License at
     6  //
     7  //    http://www.apache.org/licenses/LICENSE-2.0
     8  //
     9  // Unless required by applicable law or agreed to in writing, software
    10  // distributed under the License is distributed on an "AS IS" BASIS,
    11  // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    12  // See the License for the specific language governing permissions and
    13  // limitations under the License.
    14  
    15  package runtime
    16  
    17  import (
    18  	"bytes"
    19  	"context"
    20  	"encoding"
    21  	"encoding/csv"
    22  	"errors"
    23  	"fmt"
    24  	"io"
    25  	"reflect"
    26  
    27  	"golang.org/x/sync/errgroup"
    28  )
    29  
    30  // CSVConsumer creates a new CSV consumer.
    31  //
    32  // The consumer consumes CSV records from a provided reader into the data passed by reference.
    33  //
    34  // CSVOpts options may be specified to alter the default CSV behavior on the reader and the writer side (e.g. separator, skip header, ...).
    35  // The defaults are those of the standard library's csv.Reader and csv.Writer.
    36  //
    37  // Supported output underlying types and interfaces, prioritized in this order:
    38  // - *csv.Writer
    39  // - CSVWriter (writer options are ignored)
    40  // - io.Writer (as raw bytes)
    41  // - io.ReaderFrom (as raw bytes)
    42  // - encoding.BinaryUnmarshaler (as raw bytes)
    43  // - *[][]string (as a collection of records)
    44  // - *[]byte (as raw bytes)
    45  // - *string (a raw bytes)
    46  //
    47  // The consumer prioritizes situations where buffering the input is not required.
    48  func CSVConsumer(opts ...CSVOpt) Consumer {
    49  	o := csvOptsWithDefaults(opts)
    50  
    51  	return ConsumerFunc(func(reader io.Reader, data interface{}) error {
    52  		if reader == nil {
    53  			return errors.New("CSVConsumer requires a reader")
    54  		}
    55  		if data == nil {
    56  			return errors.New("nil destination for CSVConsumer")
    57  		}
    58  
    59  		csvReader := csv.NewReader(reader)
    60  		o.applyToReader(csvReader)
    61  		closer := defaultCloser
    62  		if o.closeStream {
    63  			if cl, isReaderCloser := reader.(io.Closer); isReaderCloser {
    64  				closer = cl.Close
    65  			}
    66  		}
    67  		defer func() {
    68  			_ = closer()
    69  		}()
    70  
    71  		switch destination := data.(type) {
    72  		case *csv.Writer:
    73  			csvWriter := destination
    74  			o.applyToWriter(csvWriter)
    75  
    76  			return pipeCSV(csvWriter, csvReader, o)
    77  
    78  		case CSVWriter:
    79  			csvWriter := destination
    80  			// no writer options available
    81  
    82  			return pipeCSV(csvWriter, csvReader, o)
    83  
    84  		case io.Writer:
    85  			csvWriter := csv.NewWriter(destination)
    86  			o.applyToWriter(csvWriter)
    87  
    88  			return pipeCSV(csvWriter, csvReader, o)
    89  
    90  		case io.ReaderFrom:
    91  			var buf bytes.Buffer
    92  			csvWriter := csv.NewWriter(&buf)
    93  			o.applyToWriter(csvWriter)
    94  			if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
    95  				return err
    96  			}
    97  			_, err := destination.ReadFrom(&buf)
    98  
    99  			return err
   100  
   101  		case encoding.BinaryUnmarshaler:
   102  			var buf bytes.Buffer
   103  			csvWriter := csv.NewWriter(&buf)
   104  			o.applyToWriter(csvWriter)
   105  			if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
   106  				return err
   107  			}
   108  
   109  			return destination.UnmarshalBinary(buf.Bytes())
   110  
   111  		default:
   112  			// support *[][]string, *[]byte, *string
   113  			if ptr := reflect.TypeOf(data); ptr.Kind() != reflect.Ptr {
   114  				return errors.New("destination must be a pointer")
   115  			}
   116  
   117  			v := reflect.Indirect(reflect.ValueOf(data))
   118  			t := v.Type()
   119  
   120  			switch {
   121  			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String:
   122  				csvWriter := &csvRecordsWriter{}
   123  				// writer options are ignored
   124  				if err := pipeCSV(csvWriter, csvReader, o); err != nil {
   125  					return err
   126  				}
   127  
   128  				v.Grow(len(csvWriter.records))
   129  				v.SetCap(len(csvWriter.records)) // in case Grow was unnessary, trim down the capacity
   130  				v.SetLen(len(csvWriter.records))
   131  				reflect.Copy(v, reflect.ValueOf(csvWriter.records))
   132  
   133  				return nil
   134  
   135  			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
   136  				var buf bytes.Buffer
   137  				csvWriter := csv.NewWriter(&buf)
   138  				o.applyToWriter(csvWriter)
   139  				if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
   140  					return err
   141  				}
   142  				v.SetBytes(buf.Bytes())
   143  
   144  				return nil
   145  
   146  			case t.Kind() == reflect.String:
   147  				var buf bytes.Buffer
   148  				csvWriter := csv.NewWriter(&buf)
   149  				o.applyToWriter(csvWriter)
   150  				if err := bufferedCSV(csvWriter, csvReader, o); err != nil {
   151  					return err
   152  				}
   153  				v.SetString(buf.String())
   154  
   155  				return nil
   156  
   157  			default:
   158  				return fmt.Errorf("%v (%T) is not supported by the CSVConsumer, %s",
   159  					data, data, "can be resolved by supporting CSVWriter/Writer/BinaryUnmarshaler interface",
   160  				)
   161  			}
   162  		}
   163  	})
   164  }
   165  
   166  // CSVProducer creates a new CSV producer.
   167  //
   168  // The producer takes input data then writes as CSV to an output writer (essentially as a pipe).
   169  //
   170  // Supported input underlying types and interfaces, prioritized in this order:
   171  // - *csv.Reader
   172  // - CSVReader (reader options are ignored)
   173  // - io.Reader
   174  // - io.WriterTo
   175  // - encoding.BinaryMarshaler
   176  // - [][]string
   177  // - []byte
   178  // - string
   179  //
   180  // The producer prioritizes situations where buffering the input is not required.
   181  func CSVProducer(opts ...CSVOpt) Producer {
   182  	o := csvOptsWithDefaults(opts)
   183  
   184  	return ProducerFunc(func(writer io.Writer, data interface{}) error {
   185  		if writer == nil {
   186  			return errors.New("CSVProducer requires a writer")
   187  		}
   188  		if data == nil {
   189  			return errors.New("nil data for CSVProducer")
   190  		}
   191  
   192  		csvWriter := csv.NewWriter(writer)
   193  		o.applyToWriter(csvWriter)
   194  		closer := defaultCloser
   195  		if o.closeStream {
   196  			if cl, isWriterCloser := writer.(io.Closer); isWriterCloser {
   197  				closer = cl.Close
   198  			}
   199  		}
   200  		defer func() {
   201  			_ = closer()
   202  		}()
   203  
   204  		if rc, isDataCloser := data.(io.ReadCloser); isDataCloser {
   205  			defer rc.Close()
   206  		}
   207  
   208  		switch origin := data.(type) {
   209  		case *csv.Reader:
   210  			csvReader := origin
   211  			o.applyToReader(csvReader)
   212  
   213  			return pipeCSV(csvWriter, csvReader, o)
   214  
   215  		case CSVReader:
   216  			csvReader := origin
   217  			// no reader options available
   218  
   219  			return pipeCSV(csvWriter, csvReader, o)
   220  
   221  		case io.Reader:
   222  			csvReader := csv.NewReader(origin)
   223  			o.applyToReader(csvReader)
   224  
   225  			return pipeCSV(csvWriter, csvReader, o)
   226  
   227  		case io.WriterTo:
   228  			// async piping of the writes performed by WriteTo
   229  			r, w := io.Pipe()
   230  			csvReader := csv.NewReader(r)
   231  			o.applyToReader(csvReader)
   232  
   233  			pipe, _ := errgroup.WithContext(context.Background())
   234  			pipe.Go(func() error {
   235  				_, err := origin.WriteTo(w)
   236  				_ = w.Close()
   237  				return err
   238  			})
   239  
   240  			pipe.Go(func() error {
   241  				defer func() {
   242  					_ = r.Close()
   243  				}()
   244  
   245  				return pipeCSV(csvWriter, csvReader, o)
   246  			})
   247  
   248  			return pipe.Wait()
   249  
   250  		case encoding.BinaryMarshaler:
   251  			buf, err := origin.MarshalBinary()
   252  			if err != nil {
   253  				return err
   254  			}
   255  			rdr := bytes.NewBuffer(buf)
   256  			csvReader := csv.NewReader(rdr)
   257  
   258  			return bufferedCSV(csvWriter, csvReader, o)
   259  
   260  		default:
   261  			// support [][]string, []byte, string (or pointers to those)
   262  			v := reflect.Indirect(reflect.ValueOf(data))
   263  			t := v.Type()
   264  
   265  			switch {
   266  			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Slice && t.Elem().Elem().Kind() == reflect.String:
   267  				csvReader := &csvRecordsWriter{
   268  					records: make([][]string, v.Len()),
   269  				}
   270  				reflect.Copy(reflect.ValueOf(csvReader.records), v)
   271  
   272  				return pipeCSV(csvWriter, csvReader, o)
   273  
   274  			case t.Kind() == reflect.Slice && t.Elem().Kind() == reflect.Uint8:
   275  				buf := bytes.NewBuffer(v.Bytes())
   276  				csvReader := csv.NewReader(buf)
   277  				o.applyToReader(csvReader)
   278  
   279  				return bufferedCSV(csvWriter, csvReader, o)
   280  
   281  			case t.Kind() == reflect.String:
   282  				buf := bytes.NewBufferString(v.String())
   283  				csvReader := csv.NewReader(buf)
   284  				o.applyToReader(csvReader)
   285  
   286  				return bufferedCSV(csvWriter, csvReader, o)
   287  
   288  			default:
   289  				return fmt.Errorf("%v (%T) is not supported by the CSVProducer, %s",
   290  					data, data, "can be resolved by supporting CSVReader/Reader/BinaryMarshaler interface",
   291  				)
   292  			}
   293  		}
   294  	})
   295  }
   296  
   297  // pipeCSV copies CSV records from a CSV reader to a CSV writer
   298  func pipeCSV(csvWriter CSVWriter, csvReader CSVReader, opts csvOpts) error {
   299  	for ; opts.skippedLines > 0; opts.skippedLines-- {
   300  		_, err := csvReader.Read()
   301  		if err != nil {
   302  			if errors.Is(err, io.EOF) {
   303  				return nil
   304  			}
   305  
   306  			return err
   307  		}
   308  	}
   309  
   310  	for {
   311  		record, err := csvReader.Read()
   312  		if err != nil {
   313  			if errors.Is(err, io.EOF) {
   314  				break
   315  			}
   316  
   317  			return err
   318  		}
   319  
   320  		if err := csvWriter.Write(record); err != nil {
   321  			return err
   322  		}
   323  	}
   324  
   325  	csvWriter.Flush()
   326  
   327  	return csvWriter.Error()
   328  }
   329  
   330  // bufferedCSV copies CSV records from a CSV reader to a CSV writer,
   331  // by first reading all records then writing them at once.
   332  func bufferedCSV(csvWriter *csv.Writer, csvReader *csv.Reader, opts csvOpts) error {
   333  	for ; opts.skippedLines > 0; opts.skippedLines-- {
   334  		_, err := csvReader.Read()
   335  		if err != nil {
   336  			if errors.Is(err, io.EOF) {
   337  				return nil
   338  			}
   339  
   340  			return err
   341  		}
   342  	}
   343  
   344  	records, err := csvReader.ReadAll()
   345  	if err != nil {
   346  		return err
   347  	}
   348  
   349  	return csvWriter.WriteAll(records)
   350  }
   351  

View as plain text