1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "errors"
11 "sync"
12 "time"
13
14 "github.com/syndtr/goleveldb/leveldb/iterator"
15 "github.com/syndtr/goleveldb/leveldb/opt"
16 "github.com/syndtr/goleveldb/leveldb/util"
17 )
18
19 var errTransactionDone = errors.New("leveldb: transaction already closed")
20
21
22 type Transaction struct {
23 db *DB
24 lk sync.RWMutex
25 seq uint64
26 mem *memDB
27 tables tFiles
28 ikScratch []byte
29 rec sessionRecord
30 stats cStatStaging
31 closed bool
32 }
33
34
35
36
37
38
39
40 func (tr *Transaction) Get(key []byte, ro *opt.ReadOptions) ([]byte, error) {
41 tr.lk.RLock()
42 defer tr.lk.RUnlock()
43 if tr.closed {
44 return nil, errTransactionDone
45 }
46 return tr.db.get(tr.mem.DB, tr.tables, key, tr.seq, ro)
47 }
48
49
50
51
52 func (tr *Transaction) Has(key []byte, ro *opt.ReadOptions) (bool, error) {
53 tr.lk.RLock()
54 defer tr.lk.RUnlock()
55 if tr.closed {
56 return false, errTransactionDone
57 }
58 return tr.db.has(tr.mem.DB, tr.tables, key, tr.seq, ro)
59 }
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82 func (tr *Transaction) NewIterator(slice *util.Range, ro *opt.ReadOptions) iterator.Iterator {
83 tr.lk.RLock()
84 defer tr.lk.RUnlock()
85 if tr.closed {
86 return iterator.NewEmptyIterator(errTransactionDone)
87 }
88 tr.mem.incref()
89 return tr.db.newIterator(tr.mem, tr.tables, tr.seq, slice, ro)
90 }
91
92 func (tr *Transaction) flush() error {
93
94 if tr.mem.Len() != 0 {
95 tr.stats.startTimer()
96 iter := tr.mem.NewIterator(nil)
97 t, n, err := tr.db.s.tops.createFrom(iter)
98 iter.Release()
99 tr.stats.stopTimer()
100 if err != nil {
101 return err
102 }
103 if tr.mem.getref() == 1 {
104 tr.mem.Reset()
105 } else {
106 tr.mem.decref()
107 tr.mem = tr.db.mpoolGet(0)
108 tr.mem.incref()
109 }
110 tr.tables = append(tr.tables, t)
111 tr.rec.addTableFile(0, t)
112 tr.stats.write += t.size
113 tr.db.logf("transaction@flush created L0@%d N·%d S·%s %q:%q", t.fd.Num, n, shortenb(t.size), t.imin, t.imax)
114 }
115 return nil
116 }
117
118 func (tr *Transaction) put(kt keyType, key, value []byte) error {
119 tr.ikScratch = makeInternalKey(tr.ikScratch, key, tr.seq+1, kt)
120 if tr.mem.Free() < len(tr.ikScratch)+len(value) {
121 if err := tr.flush(); err != nil {
122 return err
123 }
124 }
125 if err := tr.mem.Put(tr.ikScratch, value); err != nil {
126 return err
127 }
128 tr.seq++
129 return nil
130 }
131
132
133
134
135
136
137
138 func (tr *Transaction) Put(key, value []byte, wo *opt.WriteOptions) error {
139 tr.lk.Lock()
140 defer tr.lk.Unlock()
141 if tr.closed {
142 return errTransactionDone
143 }
144 return tr.put(keyTypeVal, key, value)
145 }
146
147
148
149
150
151
152 func (tr *Transaction) Delete(key []byte, wo *opt.WriteOptions) error {
153 tr.lk.Lock()
154 defer tr.lk.Unlock()
155 if tr.closed {
156 return errTransactionDone
157 }
158 return tr.put(keyTypeDel, key, nil)
159 }
160
161
162
163
164
165
166
167 func (tr *Transaction) Write(b *Batch, wo *opt.WriteOptions) error {
168 if b == nil || b.Len() == 0 {
169 return nil
170 }
171
172 tr.lk.Lock()
173 defer tr.lk.Unlock()
174 if tr.closed {
175 return errTransactionDone
176 }
177 return b.replayInternal(func(i int, kt keyType, k, v []byte) error {
178 return tr.put(kt, k, v)
179 })
180 }
181
182 func (tr *Transaction) setDone() {
183 tr.closed = true
184 tr.db.tr = nil
185 tr.mem.decref()
186 <-tr.db.writeLockC
187 }
188
189
190
191
192
193 func (tr *Transaction) Commit() error {
194 if err := tr.db.ok(); err != nil {
195 return err
196 }
197
198 tr.lk.Lock()
199 defer tr.lk.Unlock()
200 if tr.closed {
201 return errTransactionDone
202 }
203 if err := tr.flush(); err != nil {
204
205
206 return err
207 }
208 if len(tr.tables) != 0 {
209
210 tr.rec.setSeqNum(tr.seq)
211 tr.db.compCommitLk.Lock()
212 tr.stats.startTimer()
213 var cerr error
214 for retry := 0; retry < 3; retry++ {
215 cerr = tr.db.s.commit(&tr.rec, false)
216 if cerr != nil {
217 tr.db.logf("transaction@commit error R·%d %q", retry, cerr)
218 select {
219 case <-time.After(time.Second):
220 case <-tr.db.closeC:
221 tr.db.logf("transaction@commit exiting")
222 tr.db.compCommitLk.Unlock()
223 return cerr
224 }
225 } else {
226
227 tr.db.setSeq(tr.seq)
228 break
229 }
230 }
231 tr.stats.stopTimer()
232 if cerr != nil {
233
234
235 return cerr
236 }
237
238
239 tr.db.compStats.addStat(0, &tr.stats)
240
241
242 tr.db.compTrigger(tr.db.tcompCmdC)
243 tr.db.compCommitLk.Unlock()
244
245
246
247 _ = tr.db.waitCompaction()
248 }
249
250 tr.setDone()
251 return nil
252 }
253
254 func (tr *Transaction) discard() {
255
256 for _, t := range tr.tables {
257 tr.db.logf("transaction@discard @%d", t.fd.Num)
258
259 tr.db.s.tops.remove(t.fd)
260 }
261 }
262
263
264
265
266
267
268 func (tr *Transaction) Discard() {
269 tr.lk.Lock()
270 if !tr.closed {
271 tr.discard()
272 tr.setDone()
273 }
274 tr.lk.Unlock()
275 }
276
277 func (db *DB) waitCompaction() error {
278 if db.s.tLen(0) >= db.s.o.GetWriteL0PauseTrigger() {
279 return db.compTriggerWait(db.tcompCmdC)
280 }
281 return nil
282 }
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297 func (db *DB) OpenTransaction() (*Transaction, error) {
298 if err := db.ok(); err != nil {
299 return nil, err
300 }
301
302
303 select {
304 case db.writeLockC <- struct{}{}:
305 case err := <-db.compPerErrC:
306 return nil, err
307 case <-db.closeC:
308 return nil, ErrClosed
309 }
310
311 if db.tr != nil {
312 panic("leveldb: has open transaction")
313 }
314
315
316 if db.mem != nil && db.mem.Len() != 0 {
317 if _, err := db.rotateMem(0, true); err != nil {
318 return nil, err
319 }
320 }
321
322
323 if err := db.waitCompaction(); err != nil {
324 return nil, err
325 }
326
327 tr := &Transaction{
328 db: db,
329 seq: db.seq,
330 mem: db.mpoolGet(0),
331 }
332 tr.mem.incref()
333 db.tr = tr
334 return tr, nil
335 }
336
View as plain text