1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "fmt"
11 "sync/atomic"
12 "time"
13
14 "github.com/syndtr/goleveldb/leveldb/journal"
15 "github.com/syndtr/goleveldb/leveldb/storage"
16 )
17
18
19
// dropper carries what Drop needs to report dropped journal data: the
// session whose log receives the message and the descriptor of the file the
// dropped data belongs to.
// NOTE(review): presumably handed to a journal reader as its drop callback —
// confirm at the call sites.
type dropper struct {
	// s is the session whose logf method is used for reporting.
	s *session
	// fd identifies the file (type and number) named in the log message.
	fd storage.FileDesc
}
24
25 func (d dropper) Drop(err error) {
26 if e, ok := err.(*journal.ErrCorrupted); ok {
27 d.s.logf("journal@drop %s-%d S·%s %q", d.fd.Type, d.fd.Num, shortenb(int64(e.Size)), e.Reason)
28 } else {
29 d.s.logf("journal@drop %s-%d %q", d.fd.Type, d.fd.Num, err)
30 }
31 }
32
33 func (s *session) log(v ...interface{}) { s.stor.Log(fmt.Sprint(v...)) }
34 func (s *session) logf(format string, v ...interface{}) { s.stor.Log(fmt.Sprintf(format, v...)) }
35
36
37
38 func (s *session) newTemp() storage.FileDesc {
39 num := atomic.AddInt64(&s.stTempFileNum, 1) - 1
40 return storage.FileDesc{Type: storage.TypeTemp, Num: num}
41 }
42
43
44
// Limits used by refLoop when deciding how long a pending version reference
// may stay unprocessed.
const (
	// maxCachedNumber bounds the distance between the newest version id seen
	// and the next version id to process: refLoop only keeps deferring the
	// next pending version while that distance is below this value.
	maxCachedNumber = 256

	// maxCachedTime bounds the age of a pending version task: refLoop only
	// keeps deferring it while less than this much time has passed since the
	// task's creation time. It is also the interval refLoop's timer is reset
	// to before each processing pass.
	maxCachedTime = 5 * time.Minute
)
54
55
56
// vDelta describes the table files added and deleted relative to the version
// identified by vid. setVersion builds one from a session record and sends
// it on deltaCh; refLoop applies it by adding one reference per added file
// and dropping one reference per deleted file.
type vDelta struct {
	// vid is the id of the version the delta is attached to.
	vid int64
	// added holds the file numbers of the tables that were added.
	added []int64
	// deleted holds the file numbers of the tables that were deleted.
	deleted []int64
}
62
63
// vTask is the message refLoop receives on refCh and relCh: it names a
// version and the table files that version holds.
type vTask struct {
	// vid is the id of the version the task refers to.
	vid int64
	// files holds the version's table files; refLoop walks every entry of
	// every element when counting or releasing references.
	files []tFiles
	// created is compared against maxCachedTime by refLoop to decide whether
	// a pending task has been held long enough.
	created time.Time
}
69
// refLoop is the session's table-file reference bookkeeping loop. It owns a
// per-file reference count and updates it in response to messages received
// on the session's channels:
//
//   - refCh:     a version asks for its files to be referenced,
//   - deltaCh:   a delta (added/deleted files) is attached to a version,
//   - relCh:     a version releases its files,
//   - abandon:   a version id will never be referenced and must be skipped,
//   - fileRefCh: a caller asks for a copy of the current reference counts,
//   - closeC:    the loop must terminate.
//
// Reference requests are not applied immediately. They are cached and
// applied in version-id order once the cache holds maxCachedNumber or more
// versions or the oldest pending request is at least maxCachedTime old. A
// version released while its request is still cached never has its files
// counted; only its delta (if any) is applied. When a file's count drops to
// zero through a delete or a release, the table is removed via s.tops.remove.
//
// The loop panics on protocol violations: a duplicate reference request, a
// delta or release for an unknown version, or a negative reference count.
// It returns, after calling s.closeW.Done(), when closeC becomes readable.
func (s *session) refLoop() {
	var (
		// fileRef maps a table file number to its current reference count.
		fileRef = make(map[int64]int)
		// ref maps a version id to its reference task that has been received
		// but not yet counted into fileRef.
		ref = make(map[int64]*vTask)
		// deltas maps a version id to the delta received while that version
		// is still pending in ref.
		deltas = make(map[int64]*vDelta)
		// referenced is the set of version ids whose files have been counted
		// into fileRef and not yet released.
		referenced = make(map[int64]struct{})
		// released maps a version id to its delta (possibly nil) for versions
		// that were released while still pending in ref.
		released = make(map[int64]*vDelta)
		// abandoned is the set of version ids received on s.abandon that
		// were not below next at the time they arrived.
		abandoned = make(map[int64]struct{})
		// next is the next version id to process; last is the highest
		// version id seen on s.refCh.
		next, last int64
	)

	// addFileRef adds ref (which may be negative) to the count of file fnum
	// and returns the new count. A zero count removes the entry; a negative
	// count is a bookkeeping error and panics.
	addFileRef := func(fnum int64, ref int) int {
		ref += fileRef[fnum]
		if ref > 0 {
			fileRef[fnum] = ref
		} else if ref == 0 {
			delete(fileRef, fnum)
		} else {
			panic(fmt.Sprintf("negative ref: %v", fnum))
		}
		return ref
	}

	// skipAbandoned reports whether version id next has been abandoned,
	// forgetting it if so. The caller is responsible for advancing next.
	skipAbandoned := func() bool {
		if _, exist := abandoned[next]; exist {
			delete(abandoned, next)
			return true
		}
		return false
	}

	// applyDelta references every added file and dereferences every deleted
	// file, removing a table once its count reaches zero.
	applyDelta := func(d *vDelta) {
		for _, t := range d.added {
			addFileRef(t, 1)
		}
		for _, t := range d.deleted {
			if addFileRef(t, -1) == 0 {
				s.tops.remove(storage.FileDesc{Type: storage.TypeTable, Num: t})
			}
		}
	}

	// Create the timer already expired and drain it, so that timer.C is
	// empty until processTasks arms it with Reset.
	timer := time.NewTimer(0)
	<-timer.C
	defer timer.Stop()

	// processTasks advances next as far as the current state allows. It runs
	// before every select so each received message is acted upon promptly.
	processTasks := func() {
		// NOTE(review): Reset is called without a preceding Stop/drain; with
		// Go releases before 1.23 a stale tick may remain in timer.C. Here
		// that would only cause one extra wake-up, because the timer case in
		// the select below does no work of its own.
		timer.Reset(maxCachedTime)

		// Phase 1: count pending reference requests into fileRef, in
		// version-id order.
		for {
			// An abandoned id will never be referenced; step over it.
			if skipAbandoned() {
				next++
				continue
			}
			// A version released while pending is handled by phase 2.
			if _, exist := released[next]; exist {
				break
			}
			// The request for this version has not arrived yet.
			if _, exist := ref[next]; !exist {
				break
			}
			// Keep the request cached while both limits are respected: fewer
			// than maxCachedNumber versions outstanding and the request is
			// younger than maxCachedTime.
			if last-next < maxCachedNumber && time.Since(ref[next].created) < maxCachedTime {
				break
			}

			// Take one reference on every table file held by the version.
			for _, tt := range ref[next].files {
				for _, t := range tt {
					addFileRef(t.fd.Num, 1)
				}
			}

			// Apply the delta attached to this version, if one was received
			// while it was pending. This runs after the files above were
			// referenced, so a deleted file only loses the reference on top
			// of the one just taken.
			if d := deltas[next]; d != nil {
				applyDelta(d)
			}
			referenced[next] = struct{}{}
			delete(ref, next)
			delete(deltas, next)
			next++
		}

		// Phase 2: consume consecutive versions that were released before
		// their files were ever counted; only their deltas need applying.
		for {
			if skipAbandoned() {
				next++
				continue
			}
			if d, exist := released[next]; exist {
				if d != nil {
					applyDelta(d)
				}
				delete(released, next)
				next++
				continue
			}
			return
		}
	}

	for {
		processTasks()

		select {
		case t := <-s.refCh:
			// Cache the reference request; it is counted later by
			// processTasks. A second request for the same pending version
			// is a protocol violation.
			if _, exist := ref[t.vid]; exist {
				panic("duplicate reference request")
			}
			ref[t.vid] = t
			if t.vid > last {
				last = t.vid
			}

		case d := <-s.deltaCh:
			if _, exist := ref[d.vid]; !exist {
				// Not pending: the version must already be counted,
				// otherwise the delta refers to an unknown version.
				if _, exist2 := referenced[d.vid]; !exist2 {
					panic("invalid release request")
				}

				// The version's files are already counted, so the delta can
				// be applied right away.
				applyDelta(d)
				continue
			}
			// Still pending: keep the delta until the version is processed
			// or released.
			deltas[d.vid] = d

		case t := <-s.relCh:
			if _, exist := referenced[t.vid]; exist {
				// Files were counted: drop one reference per file and remove
				// tables nobody references any more. The inner t shadows the
				// task and denotes a single table file.
				for _, tt := range t.files {
					for _, t := range tt {
						if addFileRef(t.fd.Num, -1) == 0 {
							s.tops.remove(t.fd)
						}
					}
				}
				delete(referenced, t.vid)
				continue
			}
			if _, exist := ref[t.vid]; !exist {
				panic("invalid release request")
			}
			// Released while still pending: its files are never counted.
			// Move its delta (nil if none was received) to released so that
			// processTasks applies it when next reaches this id.
			released[t.vid] = deltas[t.vid]
			delete(deltas, t.vid)
			delete(ref, t.vid)

		case id := <-s.abandon:
			// Ids below next have already been passed and need no marker.
			if id >= next {
				abandoned[id] = struct{}{}
			}

		case <-timer.C:
			// Nothing to do here: looping around runs processTasks, which
			// re-evaluates the age of the oldest pending request.

		case r := <-s.fileRefCh:
			// Reply with a copy so the receiver cannot observe or modify the
			// loop's own map. This local ref shadows the pending-task map
			// within this case only.
			ref := make(map[int64]int)
			for f, c := range fileRef {
				ref[f] = c
			}
			r <- ref

		case <-s.closeC:
			s.closeW.Done()
			return
		}
	}
}
250
251
252
253 func (s *session) version() *version {
254 s.vmu.Lock()
255 defer s.vmu.Unlock()
256 s.stVersion.incref()
257 return s.stVersion
258 }
259
260 func (s *session) tLen(level int) int {
261 s.vmu.Lock()
262 defer s.vmu.Unlock()
263 return s.stVersion.tLen(level)
264 }
265
266
267 func (s *session) setVersion(r *sessionRecord, v *version) {
268 s.vmu.Lock()
269 defer s.vmu.Unlock()
270
271
272 v.incref()
273 if s.stVersion != nil {
274 if r != nil {
275 var (
276 added = make([]int64, 0, len(r.addedTables))
277 deleted = make([]int64, 0, len(r.deletedTables))
278 )
279 for _, t := range r.addedTables {
280 added = append(added, t.num)
281 }
282 for _, t := range r.deletedTables {
283 deleted = append(deleted, t.num)
284 }
285 select {
286 case s.deltaCh <- &vDelta{vid: s.stVersion.id, added: added, deleted: deleted}:
287 case <-v.s.closeC:
288 s.log("reference loop already exist")
289 }
290 }
291
292 s.stVersion.releaseNB()
293 }
294 s.stVersion = v
295 }
296
297
298 func (s *session) nextFileNum() int64 {
299 return atomic.LoadInt64(&s.stNextFileNum)
300 }
301
302
303 func (s *session) setNextFileNum(num int64) {
304 atomic.StoreInt64(&s.stNextFileNum, num)
305 }
306
307
308 func (s *session) markFileNum(num int64) {
309 nextFileNum := num + 1
310 for {
311 old, x := atomic.LoadInt64(&s.stNextFileNum), nextFileNum
312 if old > x {
313 x = old
314 }
315 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
316 break
317 }
318 }
319 }
320
321
322 func (s *session) allocFileNum() int64 {
323 return atomic.AddInt64(&s.stNextFileNum, 1) - 1
324 }
325
326
327 func (s *session) reuseFileNum(num int64) {
328 for {
329 old, x := atomic.LoadInt64(&s.stNextFileNum), num
330 if old != x+1 {
331 x = old
332 }
333 if atomic.CompareAndSwapInt64(&s.stNextFileNum, old, x) {
334 break
335 }
336 }
337 }
338
339
340 func (s *session) setCompPtr(level int, ik internalKey) {
341 if level >= len(s.stCompPtrs) {
342 newCompPtrs := make([]internalKey, level+1)
343 copy(newCompPtrs, s.stCompPtrs)
344 s.stCompPtrs = newCompPtrs
345 }
346 s.stCompPtrs[level] = append(internalKey{}, ik...)
347 }
348
349
350 func (s *session) getCompPtr(level int) internalKey {
351 if level >= len(s.stCompPtrs) {
352 return nil
353 }
354 return s.stCompPtrs[level]
355 }
356
357
358
359
360
361 func (s *session) fillRecord(r *sessionRecord, snapshot bool) {
362 r.setNextFileNum(s.nextFileNum())
363
364 if snapshot {
365 if !r.has(recJournalNum) {
366 r.setJournalNum(s.stJournalNum)
367 }
368
369 if !r.has(recSeqNum) {
370 r.setSeqNum(s.stSeqNum)
371 }
372
373 for level, ik := range s.stCompPtrs {
374 if ik != nil {
375 r.addCompPtr(level, ik)
376 }
377 }
378
379 r.setComparer(s.icmp.uName())
380 }
381 }
382
383
384
385 func (s *session) recordCommited(rec *sessionRecord) {
386 if rec.has(recJournalNum) {
387 s.stJournalNum = rec.journalNum
388 }
389
390 if rec.has(recPrevJournalNum) {
391 s.stPrevJournalNum = rec.prevJournalNum
392 }
393
394 if rec.has(recSeqNum) {
395 s.stSeqNum = rec.seqNum
396 }
397
398 for _, r := range rec.compPtrs {
399 s.setCompPtr(r.level, r.ikey)
400 }
401 }
402
403
404 func (s *session) newManifest(rec *sessionRecord, v *version) (err error) {
405 fd := storage.FileDesc{Type: storage.TypeManifest, Num: s.allocFileNum()}
406 writer, err := s.stor.Create(fd)
407 if err != nil {
408 return
409 }
410 jw := journal.NewWriter(writer)
411
412 if v == nil {
413 v = s.version()
414 defer v.release()
415 }
416 if rec == nil {
417 rec = &sessionRecord{}
418 }
419 s.fillRecord(rec, true)
420 v.fillRecord(rec)
421
422 defer func() {
423 if err == nil {
424 s.recordCommited(rec)
425 if s.manifest != nil {
426 s.manifest.Close()
427 }
428 if s.manifestWriter != nil {
429 s.manifestWriter.Close()
430 }
431 if !s.manifestFd.Zero() {
432 err = s.stor.Remove(s.manifestFd)
433 }
434 s.manifestFd = fd
435 s.manifestWriter = writer
436 s.manifest = jw
437 } else {
438 writer.Close()
439 if rerr := s.stor.Remove(fd); err != nil {
440 err = fmt.Errorf("newManifest error: %v, cleanup error (%v)", err, rerr)
441 }
442 s.reuseFileNum(fd.Num)
443 }
444 }()
445
446 w, err := jw.Next()
447 if err != nil {
448 return
449 }
450 err = rec.encode(w)
451 if err != nil {
452 return
453 }
454 err = jw.Flush()
455 if err != nil {
456 return
457 }
458 if !s.o.GetNoSync() {
459 err = writer.Sync()
460 if err != nil {
461 return
462 }
463 }
464 err = s.stor.SetMeta(fd)
465 return
466 }
467
468
469 func (s *session) flushManifest(rec *sessionRecord) (err error) {
470 s.fillRecord(rec, false)
471 w, err := s.manifest.Next()
472 if err != nil {
473 return
474 }
475 err = rec.encode(w)
476 if err != nil {
477 return
478 }
479 err = s.manifest.Flush()
480 if err != nil {
481 return
482 }
483 if !s.o.GetNoSync() {
484 err = s.manifestWriter.Sync()
485 if err != nil {
486 return
487 }
488 }
489 s.recordCommited(rec)
490 return
491 }
492
View as plain text