...
1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "errors"
11 "sync/atomic"
12 "time"
13
14 "github.com/syndtr/goleveldb/leveldb/journal"
15 "github.com/syndtr/goleveldb/leveldb/memdb"
16 "github.com/syndtr/goleveldb/leveldb/storage"
17 )
18
19 var (
20 errHasFrozenMem = errors.New("has frozen mem")
21 )
22
23 type memDB struct {
24 db *DB
25 *memdb.DB
26 ref int32
27 }
28
29 func (m *memDB) getref() int32 {
30 return atomic.LoadInt32(&m.ref)
31 }
32
33 func (m *memDB) incref() {
34 atomic.AddInt32(&m.ref, 1)
35 }
36
37 func (m *memDB) decref() {
38 if ref := atomic.AddInt32(&m.ref, -1); ref == 0 {
39
40 if m.Capacity() == m.db.s.o.GetWriteBuffer() {
41 m.Reset()
42 m.db.mpoolPut(m.DB)
43 }
44 m.db = nil
45 m.DB = nil
46 } else if ref < 0 {
47 panic("negative memdb ref")
48 }
49 }
50
51
52 func (db *DB) getSeq() uint64 {
53 return atomic.LoadUint64(&db.seq)
54 }
55
56
57 func (db *DB) addSeq(delta uint64) {
58 atomic.AddUint64(&db.seq, delta)
59 }
60
61 func (db *DB) setSeq(seq uint64) {
62 atomic.StoreUint64(&db.seq, seq)
63 }
64
65 func (db *DB) sampleSeek(ikey internalKey) {
66 v := db.s.version()
67 if v.sampleSeek(ikey) {
68
69 db.compTrigger(db.tcompCmdC)
70 }
71 v.release()
72 }
73
74 func (db *DB) mpoolPut(mem *memdb.DB) {
75 if !db.isClosed() {
76 select {
77 case db.memPool <- mem:
78 default:
79 }
80 }
81 }
82
83 func (db *DB) mpoolGet(n int) *memDB {
84 var mdb *memdb.DB
85 select {
86 case mdb = <-db.memPool:
87 default:
88 }
89 if mdb == nil || mdb.Capacity() < n {
90 mdb = memdb.New(db.s.icmp, maxInt(db.s.o.GetWriteBuffer(), n))
91 }
92 return &memDB{
93 db: db,
94 DB: mdb,
95 }
96 }
97
98 func (db *DB) mpoolDrain() {
99 ticker := time.NewTicker(30 * time.Second)
100 for {
101 select {
102 case <-ticker.C:
103 select {
104 case <-db.memPool:
105 default:
106 }
107 case <-db.closeC:
108 ticker.Stop()
109
110 select {
111 case <-db.memPool:
112 case <-time.After(time.Second):
113 }
114 close(db.memPool)
115 return
116 }
117 }
118 }
119
120
121
122 func (db *DB) newMem(n int) (mem *memDB, err error) {
123 fd := storage.FileDesc{Type: storage.TypeJournal, Num: db.s.allocFileNum()}
124 w, err := db.s.stor.Create(fd)
125 if err != nil {
126 db.s.reuseFileNum(fd.Num)
127 return
128 }
129
130 db.memMu.Lock()
131 defer db.memMu.Unlock()
132
133 if db.frozenMem != nil {
134 return nil, errHasFrozenMem
135 }
136
137 if db.journal == nil {
138 db.journal = journal.NewWriter(w)
139 } else {
140 if err := db.journal.Reset(w); err != nil {
141 return nil, err
142 }
143 if err := db.journalWriter.Close(); err != nil {
144 return nil, err
145 }
146 db.frozenJournalFd = db.journalFd
147 }
148 db.journalWriter = w
149 db.journalFd = fd
150 db.frozenMem = db.mem
151 mem = db.mpoolGet(n)
152 mem.incref()
153 mem.incref()
154 db.mem = mem
155
156
157 db.frozenSeq = db.seq
158 return
159 }
160
161
162 func (db *DB) getMems() (e, f *memDB) {
163 db.memMu.RLock()
164 defer db.memMu.RUnlock()
165 if db.mem != nil {
166 db.mem.incref()
167 } else if !db.isClosed() {
168 panic("nil effective mem")
169 }
170 if db.frozenMem != nil {
171 db.frozenMem.incref()
172 }
173 return db.mem, db.frozenMem
174 }
175
176
177 func (db *DB) getEffectiveMem() *memDB {
178 db.memMu.RLock()
179 defer db.memMu.RUnlock()
180 if db.mem != nil {
181 db.mem.incref()
182 } else if !db.isClosed() {
183 panic("nil effective mem")
184 }
185 return db.mem
186 }
187
188
189 func (db *DB) getFrozenMem() *memDB {
190 db.memMu.RLock()
191 defer db.memMu.RUnlock()
192 if db.frozenMem != nil {
193 db.frozenMem.incref()
194 }
195 return db.frozenMem
196 }
197
198
199 func (db *DB) dropFrozenMem() {
200 db.memMu.Lock()
201 if err := db.s.stor.Remove(db.frozenJournalFd); err != nil {
202 db.logf("journal@remove removing @%d %q", db.frozenJournalFd.Num, err)
203 } else {
204 db.logf("journal@remove removed @%d", db.frozenJournalFd.Num)
205 }
206 db.frozenJournalFd = storage.FileDesc{}
207 db.frozenMem.decref()
208 db.frozenMem = nil
209 db.memMu.Unlock()
210 }
211
212
213 func (db *DB) clearMems() {
214 db.memMu.Lock()
215 db.mem = nil
216 db.frozenMem = nil
217 db.memMu.Unlock()
218 }
219
220
221 func (db *DB) setClosed() bool {
222 return atomic.CompareAndSwapUint32(&db.closed, 0, 1)
223 }
224
225
226 func (db *DB) isClosed() bool {
227 return atomic.LoadUint32(&db.closed) != 0
228 }
229
230
231 func (db *DB) ok() error {
232 if db.isClosed() {
233 return ErrClosed
234 }
235 return nil
236 }
237
View as plain text