1 package pgx
2
3 import (
4 "context"
5 "errors"
6 "io"
7 )
8
9
10
11
12 var maxLargeObjectMessageLength = 1024*1024*1024 - 1024
13
14
15
16
17
18 type LargeObjects struct {
19 tx Tx
20 }
21
22 type LargeObjectMode int32
23
24 const (
25 LargeObjectModeWrite LargeObjectMode = 0x20000
26 LargeObjectModeRead LargeObjectMode = 0x40000
27 )
28
29
30 func (o *LargeObjects) Create(ctx context.Context, oid uint32) (uint32, error) {
31 err := o.tx.QueryRow(ctx, "select lo_create($1)", oid).Scan(&oid)
32 return oid, err
33 }
34
35
36
37 func (o *LargeObjects) Open(ctx context.Context, oid uint32, mode LargeObjectMode) (*LargeObject, error) {
38 var fd int32
39 err := o.tx.QueryRow(ctx, "select lo_open($1, $2)", oid, mode).Scan(&fd)
40 if err != nil {
41 return nil, err
42 }
43 return &LargeObject{fd: fd, tx: o.tx, ctx: ctx}, nil
44 }
45
46
47 func (o *LargeObjects) Unlink(ctx context.Context, oid uint32) error {
48 var result int32
49 err := o.tx.QueryRow(ctx, "select lo_unlink($1)", oid).Scan(&result)
50 if err != nil {
51 return err
52 }
53
54 if result != 1 {
55 return errors.New("failed to remove large object")
56 }
57
58 return nil
59 }
60
61
62
63
64
65
66
67
68 type LargeObject struct {
69 ctx context.Context
70 tx Tx
71 fd int32
72 }
73
74
75 func (o *LargeObject) Write(p []byte) (int, error) {
76 nTotal := 0
77 for {
78 expected := len(p) - nTotal
79 if expected == 0 {
80 break
81 } else if expected > maxLargeObjectMessageLength {
82 expected = maxLargeObjectMessageLength
83 }
84
85 var n int
86 err := o.tx.QueryRow(o.ctx, "select lowrite($1, $2)", o.fd, p[nTotal:nTotal+expected]).Scan(&n)
87 if err != nil {
88 return nTotal, err
89 }
90
91 if n < 0 {
92 return nTotal, errors.New("failed to write to large object")
93 }
94
95 nTotal += n
96
97 if n < expected {
98 return nTotal, errors.New("short write to large object")
99 } else if n > expected {
100 return nTotal, errors.New("invalid write to large object")
101 }
102 }
103
104 return nTotal, nil
105 }
106
107
108 func (o *LargeObject) Read(p []byte) (int, error) {
109 nTotal := 0
110 for {
111 expected := len(p) - nTotal
112 if expected == 0 {
113 break
114 } else if expected > maxLargeObjectMessageLength {
115 expected = maxLargeObjectMessageLength
116 }
117
118 var res []byte
119 err := o.tx.QueryRow(o.ctx, "select loread($1, $2)", o.fd, expected).Scan(&res)
120 copy(p[nTotal:], res)
121 nTotal += len(res)
122 if err != nil {
123 return nTotal, err
124 }
125
126 if len(res) < expected {
127 return nTotal, io.EOF
128 } else if len(res) > expected {
129 return nTotal, errors.New("invalid read of large object")
130 }
131 }
132
133 return nTotal, nil
134 }
135
136
137 func (o *LargeObject) Seek(offset int64, whence int) (n int64, err error) {
138 err = o.tx.QueryRow(o.ctx, "select lo_lseek64($1, $2, $3)", o.fd, offset, whence).Scan(&n)
139 return n, err
140 }
141
142
143 func (o *LargeObject) Tell() (n int64, err error) {
144 err = o.tx.QueryRow(o.ctx, "select lo_tell64($1)", o.fd).Scan(&n)
145 return n, err
146 }
147
148
149 func (o *LargeObject) Truncate(size int64) (err error) {
150 _, err = o.tx.Exec(o.ctx, "select lo_truncate64($1, $2)", o.fd, size)
151 return err
152 }
153
154
155 func (o *LargeObject) Close() error {
156 _, err := o.tx.Exec(o.ctx, "select lo_close($1)", o.fd)
157 return err
158 }
159
View as plain text