...

Source file src/github.com/jackc/pgx/v4/batch.go

Documentation: github.com/jackc/pgx/v4

     1  package pgx
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"fmt"
     7  
     8  	"github.com/jackc/pgconn"
     9  )
    10  
// batchItem is a single queued query together with the arguments it was queued with.
type batchItem struct {
	query     string        // SQL text or prepared statement name, exactly as passed to Batch.Queue
	arguments []interface{} // arguments passed to Batch.Queue for this query
}
    15  
// Batch is an ordered collection of queued queries. Batch queries are a way of bundling multiple queries together to
// avoid unnecessary network round trips.
type Batch struct {
	items []*batchItem // queued queries, in the order Queue was called
}
    21  
    22  // Queue queues a query to batch b. query can be an SQL query or the name of a prepared statement.
    23  func (b *Batch) Queue(query string, arguments ...interface{}) {
    24  	b.items = append(b.items, &batchItem{
    25  		query:     query,
    26  		arguments: arguments,
    27  	})
    28  }
    29  
    30  // Len returns number of queries that have been queued so far.
    31  func (b *Batch) Len() int {
    32  	return len(b.items)
    33  }
    34  
// BatchResults gives access to the results of the queries in a batch. Each call to Exec, Query, QueryRow or QueryFunc
// reads the results of the next query in the batch.
type BatchResults interface {
	// Exec reads the results from the next query in the batch as if the query has been sent with Conn.Exec.
	Exec() (pgconn.CommandTag, error)

	// Query reads the results from the next query in the batch as if the query has been sent with Conn.Query.
	Query() (Rows, error)

	// QueryRow reads the results from the next query in the batch as if the query has been sent with Conn.QueryRow.
	QueryRow() Row

	// QueryFunc reads the results from the next query in the batch as if the query has been sent with Conn.QueryFunc.
	QueryFunc(scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error)

	// Close closes the batch operation. This must be called before the underlying connection can be used again. Any error
	// that occurred during a batch operation may have made it impossible to resynchronize the connection with the server.
	// In this case the underlying connection will have been closed. Close is safe to call multiple times.
	Close() error
}
    53  
// batchResults implements BatchResults by reading successive results from a pgconn.MultiResultReader.
type batchResults struct {
	ctx    context.Context           // passed to conn.log and conn.getRows
	conn   *Conn                     // used for logging and for obtaining the rows object in Query
	mrr    *pgconn.MultiResultReader // source of the per-query results
	err    error                     // when non-nil, returned by Exec, Query and Close without reading from mrr
	b      *Batch                    // queued queries, used for the sql/args in log entries; nextQueryAndArgs tolerates nil
	ix     int                       // index into b.items of the next query handed out by nextQueryAndArgs
	closed bool                      // set by Close; afterwards Exec, Query and QueryFunc fail with "batch already closed"
}
    63  
    64  // Exec reads the results from the next query in the batch as if the query has been sent with Exec.
    65  func (br *batchResults) Exec() (pgconn.CommandTag, error) {
    66  	if br.err != nil {
    67  		return nil, br.err
    68  	}
    69  	if br.closed {
    70  		return nil, fmt.Errorf("batch already closed")
    71  	}
    72  
    73  	query, arguments, _ := br.nextQueryAndArgs()
    74  
    75  	if !br.mrr.NextResult() {
    76  		err := br.mrr.Close()
    77  		if err == nil {
    78  			err = errors.New("no result")
    79  		}
    80  		if br.conn.shouldLog(LogLevelError) {
    81  			br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{
    82  				"sql":  query,
    83  				"args": logQueryArgs(arguments),
    84  				"err":  err,
    85  			})
    86  		}
    87  		return nil, err
    88  	}
    89  
    90  	commandTag, err := br.mrr.ResultReader().Close()
    91  
    92  	if err != nil {
    93  		if br.conn.shouldLog(LogLevelError) {
    94  			br.conn.log(br.ctx, LogLevelError, "BatchResult.Exec", map[string]interface{}{
    95  				"sql":  query,
    96  				"args": logQueryArgs(arguments),
    97  				"err":  err,
    98  			})
    99  		}
   100  	} else if br.conn.shouldLog(LogLevelInfo) {
   101  		br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Exec", map[string]interface{}{
   102  			"sql":        query,
   103  			"args":       logQueryArgs(arguments),
   104  			"commandTag": commandTag,
   105  		})
   106  	}
   107  
   108  	return commandTag, err
   109  }
   110  
   111  // Query reads the results from the next query in the batch as if the query has been sent with Query.
   112  func (br *batchResults) Query() (Rows, error) {
   113  	query, arguments, ok := br.nextQueryAndArgs()
   114  	if !ok {
   115  		query = "batch query"
   116  	}
   117  
   118  	if br.err != nil {
   119  		return &connRows{err: br.err, closed: true}, br.err
   120  	}
   121  
   122  	if br.closed {
   123  		alreadyClosedErr := fmt.Errorf("batch already closed")
   124  		return &connRows{err: alreadyClosedErr, closed: true}, alreadyClosedErr
   125  	}
   126  
   127  	rows := br.conn.getRows(br.ctx, query, arguments)
   128  
   129  	if !br.mrr.NextResult() {
   130  		rows.err = br.mrr.Close()
   131  		if rows.err == nil {
   132  			rows.err = errors.New("no result")
   133  		}
   134  		rows.closed = true
   135  
   136  		if br.conn.shouldLog(LogLevelError) {
   137  			br.conn.log(br.ctx, LogLevelError, "BatchResult.Query", map[string]interface{}{
   138  				"sql":  query,
   139  				"args": logQueryArgs(arguments),
   140  				"err":  rows.err,
   141  			})
   142  		}
   143  
   144  		return rows, rows.err
   145  	}
   146  
   147  	rows.resultReader = br.mrr.ResultReader()
   148  	return rows, nil
   149  }
   150  
   151  // QueryFunc reads the results from the next query in the batch as if the query has been sent with Conn.QueryFunc.
   152  func (br *batchResults) QueryFunc(scans []interface{}, f func(QueryFuncRow) error) (pgconn.CommandTag, error) {
   153  	if br.closed {
   154  		return nil, fmt.Errorf("batch already closed")
   155  	}
   156  
   157  	rows, err := br.Query()
   158  	if err != nil {
   159  		return nil, err
   160  	}
   161  	defer rows.Close()
   162  
   163  	for rows.Next() {
   164  		err = rows.Scan(scans...)
   165  		if err != nil {
   166  			return nil, err
   167  		}
   168  
   169  		err = f(rows)
   170  		if err != nil {
   171  			return nil, err
   172  		}
   173  	}
   174  
   175  	if err := rows.Err(); err != nil {
   176  		return nil, err
   177  	}
   178  
   179  	return rows.CommandTag(), nil
   180  }
   181  
   182  // QueryRow reads the results from the next query in the batch as if the query has been sent with QueryRow.
   183  func (br *batchResults) QueryRow() Row {
   184  	rows, _ := br.Query()
   185  	return (*connRow)(rows.(*connRows))
   186  
   187  }
   188  
   189  // Close closes the batch operation. Any error that occurred during a batch operation may have made it impossible to
   190  // resyncronize the connection with the server. In this case the underlying connection will have been closed.
   191  func (br *batchResults) Close() error {
   192  	if br.err != nil {
   193  		return br.err
   194  	}
   195  
   196  	if br.closed {
   197  		return nil
   198  	}
   199  	br.closed = true
   200  
   201  	// log any queries that haven't yet been logged by Exec or Query
   202  	for {
   203  		query, args, ok := br.nextQueryAndArgs()
   204  		if !ok {
   205  			break
   206  		}
   207  
   208  		if br.conn.shouldLog(LogLevelInfo) {
   209  			br.conn.log(br.ctx, LogLevelInfo, "BatchResult.Close", map[string]interface{}{
   210  				"sql":  query,
   211  				"args": logQueryArgs(args),
   212  			})
   213  		}
   214  	}
   215  
   216  	return br.mrr.Close()
   217  }
   218  
   219  func (br *batchResults) nextQueryAndArgs() (query string, args []interface{}, ok bool) {
   220  	if br.b != nil && br.ix < len(br.b.items) {
   221  		bi := br.b.items[br.ix]
   222  		query = bi.query
   223  		args = bi.arguments
   224  		ok = true
   225  		br.ix++
   226  	}
   227  	return
   228  }
   229  

View as plain text