...

Source file src/github.com/jackc/pgx/v5/large_objects.go

Documentation: github.com/jackc/pgx/v5

     1  package pgx
     2  
     3  import (
     4  	"context"
     5  	"errors"
     6  	"io"
     7  )
     8  
// The PostgreSQL wire protocol has a limit of 1 GB - 1 per message. See definition of
// PQ_LARGE_MESSAGE_LIMIT in the PostgreSQL source code. To allow for the other data
// in the message, maxLargeObjectMessageLength should be no larger than 1 GB - 1 KB.
//
// Write and Read use this value as the upper bound on the number of bytes sent or
// requested in a single query.
var maxLargeObjectMessageLength = 1024*1024*1024 - 1024
    13  
// LargeObjects is a structure used to access the large objects API. It is only valid within the transaction where it
// was created.
//
// For more details see: https://www.postgresql.org/docs/current/largeobjects.html
type LargeObjects struct {
	// tx is the transaction every large object query is executed on.
	tx Tx
}
    21  
// LargeObjectMode is the access mode passed to lo_open when a large object is opened with LargeObjects.Open.
type LargeObjectMode int32

// Access modes accepted by LargeObjects.Open. The numeric values correspond to PostgreSQL's INV_WRITE and INV_READ
// flags.
const (
	// LargeObjectModeWrite opens the large object for writing.
	LargeObjectModeWrite LargeObjectMode = 0x20000
	// LargeObjectModeRead opens the large object for reading.
	LargeObjectModeRead LargeObjectMode = 0x40000
)
    28  
    29  // Create creates a new large object. If oid is zero, the server assigns an unused OID.
    30  func (o *LargeObjects) Create(ctx context.Context, oid uint32) (uint32, error) {
    31  	err := o.tx.QueryRow(ctx, "select lo_create($1)", oid).Scan(&oid)
    32  	return oid, err
    33  }
    34  
    35  // Open opens an existing large object with the given mode. ctx will also be used for all operations on the opened large
    36  // object.
    37  func (o *LargeObjects) Open(ctx context.Context, oid uint32, mode LargeObjectMode) (*LargeObject, error) {
    38  	var fd int32
    39  	err := o.tx.QueryRow(ctx, "select lo_open($1, $2)", oid, mode).Scan(&fd)
    40  	if err != nil {
    41  		return nil, err
    42  	}
    43  	return &LargeObject{fd: fd, tx: o.tx, ctx: ctx}, nil
    44  }
    45  
    46  // Unlink removes a large object from the database.
    47  func (o *LargeObjects) Unlink(ctx context.Context, oid uint32) error {
    48  	var result int32
    49  	err := o.tx.QueryRow(ctx, "select lo_unlink($1)", oid).Scan(&result)
    50  	if err != nil {
    51  		return err
    52  	}
    53  
    54  	if result != 1 {
    55  		return errors.New("failed to remove large object")
    56  	}
    57  
    58  	return nil
    59  }
    60  
// A LargeObject is a large object stored on the server. It is only valid within the transaction that it was initialized
// in. It uses the context it was initialized with for all operations. It implements these interfaces:
//
//	io.Writer
//	io.Reader
//	io.Seeker
//	io.Closer
type LargeObject struct {
	// ctx is the context given to LargeObjects.Open; it is passed to every query issued for this object.
	ctx context.Context
	// tx is the transaction the object was opened in.
	tx Tx
	// fd is the descriptor returned by lo_open.
	fd int32
}
    73  
    74  // Write writes p to the large object and returns the number of bytes written and an error if not all of p was written.
    75  func (o *LargeObject) Write(p []byte) (int, error) {
    76  	nTotal := 0
    77  	for {
    78  		expected := len(p) - nTotal
    79  		if expected == 0 {
    80  			break
    81  		} else if expected > maxLargeObjectMessageLength {
    82  			expected = maxLargeObjectMessageLength
    83  		}
    84  
    85  		var n int
    86  		err := o.tx.QueryRow(o.ctx, "select lowrite($1, $2)", o.fd, p[nTotal:nTotal+expected]).Scan(&n)
    87  		if err != nil {
    88  			return nTotal, err
    89  		}
    90  
    91  		if n < 0 {
    92  			return nTotal, errors.New("failed to write to large object")
    93  		}
    94  
    95  		nTotal += n
    96  
    97  		if n < expected {
    98  			return nTotal, errors.New("short write to large object")
    99  		} else if n > expected {
   100  			return nTotal, errors.New("invalid write to large object")
   101  		}
   102  	}
   103  
   104  	return nTotal, nil
   105  }
   106  
   107  // Read reads up to len(p) bytes into p returning the number of bytes read.
   108  func (o *LargeObject) Read(p []byte) (int, error) {
   109  	nTotal := 0
   110  	for {
   111  		expected := len(p) - nTotal
   112  		if expected == 0 {
   113  			break
   114  		} else if expected > maxLargeObjectMessageLength {
   115  			expected = maxLargeObjectMessageLength
   116  		}
   117  
   118  		var res []byte
   119  		err := o.tx.QueryRow(o.ctx, "select loread($1, $2)", o.fd, expected).Scan(&res)
   120  		copy(p[nTotal:], res)
   121  		nTotal += len(res)
   122  		if err != nil {
   123  			return nTotal, err
   124  		}
   125  
   126  		if len(res) < expected {
   127  			return nTotal, io.EOF
   128  		} else if len(res) > expected {
   129  			return nTotal, errors.New("invalid read of large object")
   130  		}
   131  	}
   132  
   133  	return nTotal, nil
   134  }
   135  
   136  // Seek moves the current location pointer to the new location specified by offset.
   137  func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) {
   138  	err = o.tx.QueryRow(o.ctx, "select lo_lseek64($1, $2, $3)", o.fd, offset, whence).Scan(&n)
   139  	return n, err
   140  }
   141  
   142  // Tell returns the current read or write location of the large object descriptor.
   143  func (o *LargeObject) Tell() (n int64, err error) {
   144  	err = o.tx.QueryRow(o.ctx, "select lo_tell64($1)", o.fd).Scan(&n)
   145  	return n, err
   146  }
   147  
   148  // Truncate the large object to size.
   149  func (o *LargeObject) Truncate(size int64) (err error) {
   150  	_, err = o.tx.Exec(o.ctx, "select lo_truncate64($1, $2)", o.fd, size)
   151  	return err
   152  }
   153  
   154  // Close the large object descriptor.
   155  func (o *LargeObject) Close() error {
   156  	_, err := o.tx.Exec(o.ctx, "select lo_close($1)", o.fd)
   157  	return err
   158  }
   159  

View as plain text