1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "sync/atomic"
11 "time"
12
13 "github.com/syndtr/goleveldb/leveldb/memdb"
14 "github.com/syndtr/goleveldb/leveldb/opt"
15 "github.com/syndtr/goleveldb/leveldb/util"
16 )
17
18 func (db *DB) writeJournal(batches []*Batch, seq uint64, sync bool) error {
19 wr, err := db.journal.Next()
20 if err != nil {
21 return err
22 }
23 if err := writeBatchesWithHeader(wr, batches, seq); err != nil {
24 return err
25 }
26 if err := db.journal.Flush(); err != nil {
27 return err
28 }
29 if sync {
30 return db.journalWriter.Sync()
31 }
32 return nil
33 }
34
35 func (db *DB) rotateMem(n int, wait bool) (mem *memDB, err error) {
36 retryLimit := 3
37 retry:
38
39 err = db.compTriggerWait(db.mcompCmdC)
40 if err != nil {
41 return
42 }
43 retryLimit--
44
45
46 mem, err = db.newMem(n)
47 if err != nil {
48 if err == errHasFrozenMem {
49 if retryLimit <= 0 {
50 panic("BUG: still has frozen memdb")
51 }
52 goto retry
53 }
54 return
55 }
56
57
58 if wait {
59 err = db.compTriggerWait(db.mcompCmdC)
60 } else {
61 db.compTrigger(db.mcompCmdC)
62 }
63 return
64 }
65
// flush makes room in the effective memdb for an incoming write of n
// bytes, applying level-0 write throttling (slowdown/pause) on the way.
// It returns the memdb with a reference held for the caller, the free
// space remaining in it, and any error encountered.
func (db *DB) flush(n int) (mdb *memDB, mdbFree int, err error) {
	delayed := false
	slowdownTrigger := db.s.o.GetWriteL0SlowdownTrigger()
	pauseTrigger := db.s.o.GetWriteL0PauseTrigger()
	// flush performs one throttling step; it returns true to go around again.
	flush := func() (retry bool) {
		mdb = db.getEffectiveMem()
		if mdb == nil {
			err = ErrClosed
			return false
		}
		defer func() {
			// When retrying, drop our reference; the next iteration will
			// re-acquire the (possibly new) effective memdb.
			if retry {
				mdb.decref()
				mdb = nil
			}
		}()
		tLen := db.s.tLen(0)
		mdbFree = mdb.Free()
		switch {
		case tLen >= slowdownTrigger && !delayed:
			// Soft level-0 limit: delay this write once by 1ms to let
			// table compaction catch up, then retry.
			delayed = true
			time.Sleep(time.Millisecond)
		case mdbFree >= n:
			// Enough room in the current memdb; done.
			return false
		case tLen >= pauseTrigger:
			// Hard level-0 limit: pause the write until a table
			// compaction completes, exposing the pause via inWritePaused.
			delayed = true
			// Set the write paused flag explicitly.
			atomic.StoreInt32(&db.inWritePaused, 1)
			err = db.compTriggerWait(db.tcompCmdC)
			// Unset the write paused flag.
			atomic.StoreInt32(&db.inWritePaused, 0)
			if err != nil {
				return false
			}
		default:
			// Not enough room in the memdb for n bytes.
			if mdb.Len() == 0 {
				// An empty memdb cannot be made roomier by rotating;
				// report n free so the oversized write proceeds anyway.
				mdbFree = n
			} else {
				// Rotate to a fresh memdb and write there instead.
				mdb.decref()
				mdb, err = db.rotateMem(n, false)
				if err == nil {
					mdbFree = mdb.Free()
				} else {
					mdbFree = 0
				}
			}
			return false
		}
		return true
	}
	start := time.Now()
	for flush() {
	}
	if delayed {
		// This write was throttled; accumulate delay statistics.
		db.writeDelay += time.Since(start)
		db.writeDelayN++
	} else if db.writeDelayN > 0 {
		// A delay streak just ended: log it, publish the totals to the
		// atomic counters, and reset the accumulators.
		db.logf("db@write was delayed N·%d T·%v", db.writeDelayN, db.writeDelay)
		atomic.AddInt32(&db.cWriteDelayN, int32(db.writeDelayN))
		atomic.AddInt64(&db.cWriteDelay, int64(db.writeDelay))
		db.writeDelay = 0
		db.writeDelayN = 0
	}
	return
}
132
// writeMerge describes a write submitted to another goroutine's
// in-flight write for merging. Either batch is non-nil, or the single
// record described by keyType/key/value is merged instead.
type writeMerge struct {
	sync       bool    // request a journal sync for the merged write
	batch      *Batch  // pre-built batch, or nil for a single record
	keyType    keyType // record type, used when batch is nil
	key, value []byte  // record payload, used when batch is nil
}
139
140 func (db *DB) unlockWrite(overflow bool, merged int, err error) {
141 for i := 0; i < merged; i++ {
142 db.writeAckC <- err
143 }
144 if overflow {
145
146 db.writeMergedC <- false
147 } else {
148
149 <-db.writeLockC
150 }
151 }
152
153
// writeLocked writes batch (plus any concurrently merged writes) to the
// journal and the memdb. The caller must hold the write lock; it is
// released or handed off via unlockWrite on every return path.
// ourBatch, when non-nil, is a pool-owned batch that is returned to the
// pool on completion; merge enables merging other writers' data in.
func (db *DB) writeLocked(batch, ourBatch *Batch, merge, sync bool) error {
	// Make room in the memdb for this write; this may throttle or pause
	// the write if compaction is behind (see flush).
	mdb, mdbFree, err := db.flush(batch.internalLen)
	if err != nil {
		db.unlockWrite(false, 0, err)
		return err
	}
	defer mdb.decref()

	var (
		overflow bool     // a pending merge exceeded mergeLimit
		merged   int      // number of writers merged into this write
		batches  = []*Batch{batch}
	)

	if merge {
		// Budget for merging other writers' data into this write:
		// 128KiB, or up to 1MiB total when our own batch is large.
		var mergeLimit int
		if batch.internalLen > 128<<10 {
			mergeLimit = (1 << 20) - batch.internalLen
		} else {
			mergeLimit = 128 << 10
		}
		// Never merge more than the memdb can still hold.
		mergeCap := mdbFree - batch.internalLen
		if mergeLimit > mergeCap {
			mergeLimit = mergeCap
		}

	merge:
		for mergeLimit > 0 {
			select {
			case incoming := <-db.writeMergeC:
				if incoming.batch != nil {
					// Merge a whole batch.
					if incoming.batch.internalLen > mergeLimit {
						overflow = true
						break merge
					}
					batches = append(batches, incoming.batch)
					mergeLimit -= incoming.batch.internalLen
				} else {
					// Merge a single put/delete record; 8 matches the
					// per-record internal overhead used elsewhere.
					internalLen := len(incoming.key) + len(incoming.value) + 8
					if internalLen > mergeLimit {
						overflow = true
						break merge
					}
					// Lazily allocate one shared batch for all merged records.
					if ourBatch == nil {
						ourBatch = db.batchPool.Get().(*Batch)
						ourBatch.Reset()
						batches = append(batches, ourBatch)
					}
					ourBatch.appendRec(incoming.keyType, incoming.key, incoming.value)
					mergeLimit -= internalLen
				}
				// A sync request by any merged writer upgrades the whole write.
				sync = sync || incoming.sync
				merged++
				// Signal the merged writer that its data was accepted.
				db.writeMergedC <- true

			default:
				// No more writers waiting to merge.
				break merge
			}
		}
	}

	// Return the pool-owned batch once the write completes.
	if ourBatch != nil {
		defer db.batchPool.Put(ourBatch)
	}

	// Sequence number of the first record in this write.
	seq := db.seq + 1

	// Write to the journal first, so the memdb never holds unjournaled data.
	if err := db.writeJournal(batches, seq, sync); err != nil {
		db.unlockWrite(overflow, merged, err)
		return err
	}

	// Apply the batches to the memdb.
	for _, batch := range batches {
		if err := batch.putMem(seq, mdb.DB); err != nil {
			// The memdb should never reject a record at this point.
			panic(err)
		}
		seq += uint64(batch.Len())
	}

	// Advance the visible sequence number.
	db.addSeq(uint64(batchesLen(batches)))

	// Rotate the memdb if this batch alone filled it.
	if batch.internalLen >= mdbFree {
		if _, err := db.rotateMem(0, false); err != nil {
			db.unlockWrite(overflow, merged, err)
			return err
		}
	}

	db.unlockWrite(overflow, merged, nil)
	return nil
}
258
259
260
261
262
263
264
265
// Write applies the given batch to the DB, possibly merging it with
// other concurrent writes when write merging is enabled by wo and the
// DB options. A nil or empty batch is a no-op. Oversized batches are
// applied through a transaction instead (unless disabled by options).
func (db *DB) Write(batch *Batch, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil || batch == nil || batch.Len() == 0 {
		return err
	}

	// A batch larger than the write buffer would not fit a single memdb;
	// route it through a transaction unless that behavior is disabled.
	if batch.internalLen > db.s.o.GetWriteBuffer() && !db.s.o.GetDisableLargeBatchTransaction() {
		tr, err := db.OpenTransaction()
		if err != nil {
			return err
		}
		if err := tr.Write(batch, wo); err != nil {
			tr.Discard()
			return err
		}
		return tr.Commit()
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire the write lock, or hand this batch to the current lock
	// holder for merging when merging is enabled.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, batch: batch}:
			if <-db.writeMergedC {
				// Our batch was merged into another writer's write;
				// wait for that writer's acknowledgement.
				return <-db.writeAckC
			}
			// Merge was rejected (overflow); the write lock was handed
			// to us by unlockWrite, so fall through to writeLocked.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired directly.
		case err := <-db.compPerErrC:
			// Compaction reported a persistent error.
			return err
		case <-db.closeC:
			// DB is being closed.
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction reported a persistent error.
			return err
		case <-db.closeC:
			// DB is being closed.
			return ErrClosed
		}
	}

	return db.writeLocked(batch, nil, merge, sync)
}
322
// putRec writes a single record of the given type, either by merging it
// into a concurrent write or by acquiring the write lock and writing it
// in a pooled batch of its own.
func (db *DB) putRec(kt keyType, key, value []byte, wo *opt.WriteOptions) error {
	if err := db.ok(); err != nil {
		return err
	}

	merge := !wo.GetNoWriteMerge() && !db.s.o.GetNoWriteMerge()
	sync := wo.GetSync() && !db.s.o.GetNoSync()

	// Acquire the write lock, or hand the record to the current lock
	// holder for merging when merging is enabled.
	if merge {
		select {
		case db.writeMergeC <- writeMerge{sync: sync, keyType: kt, key: key, value: value}:
			if <-db.writeMergedC {
				// Record was merged into another writer's write; wait
				// for that writer's acknowledgement.
				return <-db.writeAckC
			}
			// Merge was rejected (overflow); the write lock was handed
			// to us by unlockWrite, so fall through to writeLocked.
		case db.writeLockC <- struct{}{}:
			// Write lock acquired directly.
		case err := <-db.compPerErrC:
			// Compaction reported a persistent error.
			return err
		case <-db.closeC:
			// DB is being closed.
			return ErrClosed
		}
	} else {
		select {
		case db.writeLockC <- struct{}{}:
			// Write lock acquired.
		case err := <-db.compPerErrC:
			// Compaction reported a persistent error.
			return err
		case <-db.closeC:
			// DB is being closed.
			return ErrClosed
		}
	}

	batch := db.batchPool.Get().(*Batch)
	batch.Reset()
	batch.appendRec(kt, key, value)
	// Pass batch as ourBatch too so writeLocked returns it to the pool.
	return db.writeLocked(batch, batch, merge, sync)
}
367
368
369
370
371
372
373
// Put sets the value for the given key, overwriting any existing value.
// The write may be merged with concurrent writes (see putRec).
func (db *DB) Put(key, value []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeVal, key, value, wo)
}
377
378
379
380
381
382
// Delete deletes the value for the given key by writing a deletion
// record. The write may be merged with concurrent writes (see putRec).
func (db *DB) Delete(key []byte, wo *opt.WriteOptions) error {
	return db.putRec(keyTypeDel, key, nil, wo)
}
386
387 func isMemOverlaps(icmp *iComparer, mem *memdb.DB, min, max []byte) bool {
388 iter := mem.NewIterator(nil)
389 defer iter.Release()
390 return (max == nil || (iter.First() && icmp.uCompare(max, internalKey(iter.Key()).ukey()) >= 0)) &&
391 (min == nil || (iter.Last() && icmp.uCompare(min, internalKey(iter.Key()).ukey()) <= 0))
392 }
393
394
395
396
397
398
399
400
401
402
// CompactRange compacts the underlying DB for the given key range.
// Nil Start/Limit bounds are treated as unbounded on that side. If the
// current memdb overlaps the range, it is rotated out and compacted to
// tables before the table compaction runs.
func (db *DB) CompactRange(r util.Range) error {
	if err := db.ok(); err != nil {
		return err
	}

	// Acquire the write lock so no write races with the memdb rotation.
	select {
	case db.writeLockC <- struct{}{}:
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Check whether the memdb holds keys inside the range.
	mdb := db.getEffectiveMem()
	if mdb == nil {
		// NOTE(review): this path returns without releasing the write
		// lock; presumably acceptable because the DB is closed — confirm.
		return ErrClosed
	}
	defer mdb.decref()
	if isMemOverlaps(db.s.icmp, mdb.DB, r.Start, r.Limit) {
		// Rotate the memdb out, release the write lock, then wait for
		// the memdb compaction so the data reaches the tables first.
		if _, err := db.rotateMem(0, false); err != nil {
			<-db.writeLockC
			return err
		}
		<-db.writeLockC
		if err := db.compTriggerWait(db.mcompCmdC); err != nil {
			return err
		}
	} else {
		<-db.writeLockC
	}

	// Trigger and wait for the table compaction over the range; -1 here
	// is the level argument passed to compTriggerRange.
	return db.compTriggerRange(db.tcompCmdC, -1, r.Start, r.Limit)
}
440
441
// SetReadOnly makes the DB read-only: it takes and keeps the write lock
// and installs ErrReadOnly as the compaction error.
func (db *DB) SetReadOnly() error {
	if err := db.ok(); err != nil {
		return err
	}

	// Acquire the write lock and keep it; compWriteLocking records that
	// it is now held on behalf of the compaction subsystem.
	select {
	case db.writeLockC <- struct{}{}:
		db.compWriteLocking = true
	case err := <-db.compPerErrC:
		return err
	case <-db.closeC:
		return ErrClosed
	}

	// Install ErrReadOnly as the compaction error.
	select {
	case db.compErrSetC <- ErrReadOnly:
	case perr := <-db.compPerErrC:
		return perr
	case <-db.closeC:
		return ErrClosed
	}

	return nil
}
468