1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "sort"
11 "sync/atomic"
12
13 "github.com/syndtr/goleveldb/leveldb/iterator"
14 "github.com/syndtr/goleveldb/leveldb/memdb"
15 "github.com/syndtr/goleveldb/leveldb/opt"
16 )
17
// Compaction kinds stored in compaction.typ.
const (
	// undefinedCompaction is the zero value of the kind.
	undefinedCompaction = iota
	// level0Compaction is used when the source level is 0.
	level0Compaction
	// nonLevel0Compaction is used when the source level is not 0.
	nonLevel0Compaction
	// seekCompaction is used when the compaction is picked from the
	// version's cSeek pointer rather than from its compaction score.
	seekCompaction
)
24
25 func (s *session) pickMemdbLevel(umin, umax []byte, maxLevel int) int {
26 v := s.version()
27 defer v.release()
28 return v.pickMemdbLevel(umin, umax, maxLevel)
29 }
30
31 func (s *session) flushMemdb(rec *sessionRecord, mdb *memdb.DB, maxLevel int) (int, error) {
32
33 iter := mdb.NewIterator(nil)
34 defer iter.Release()
35 t, n, err := s.tops.createFrom(iter)
36 if err != nil {
37 return 0, err
38 }
39
40
41
42
43
44
45
46
47
48 flushLevel := s.pickMemdbLevel(t.imin.ukey(), t.imax.ukey(), maxLevel)
49 rec.addTableFile(flushLevel, t)
50
51 s.logf("memdb@flush created L%d@%d N·%d S·%s %q:%q", flushLevel, t.fd.Num, n, shortenb(t.size), t.imin, t.imax)
52 return flushLevel, nil
53 }
54
55
56 func (s *session) pickCompaction() *compaction {
57 v := s.version()
58
59 var sourceLevel int
60 var t0 tFiles
61 var typ int
62 if v.cScore >= 1 {
63 sourceLevel = v.cLevel
64 cptr := s.getCompPtr(sourceLevel)
65 tables := v.levels[sourceLevel]
66 if cptr != nil && sourceLevel > 0 {
67 n := len(tables)
68 if i := sort.Search(n, func(i int) bool {
69 return s.icmp.Compare(tables[i].imax, cptr) > 0
70 }); i < n {
71 t0 = append(t0, tables[i])
72 }
73 }
74 if len(t0) == 0 {
75 t0 = append(t0, tables[0])
76 }
77 if sourceLevel == 0 {
78 typ = level0Compaction
79 } else {
80 typ = nonLevel0Compaction
81 }
82 } else {
83 if p := atomic.LoadPointer(&v.cSeek); p != nil {
84 ts := (*tSet)(p)
85 sourceLevel = ts.level
86 t0 = append(t0, ts.table)
87 typ = seekCompaction
88 } else {
89 v.release()
90 return nil
91 }
92 }
93
94 return newCompaction(s, v, sourceLevel, t0, typ)
95 }
96
97
98 func (s *session) getCompactionRange(sourceLevel int, umin, umax []byte, noLimit bool) *compaction {
99 v := s.version()
100
101 if sourceLevel >= len(v.levels) {
102 v.release()
103 return nil
104 }
105
106 t0 := v.levels[sourceLevel].getOverlaps(nil, s.icmp, umin, umax, sourceLevel == 0)
107 if len(t0) == 0 {
108 v.release()
109 return nil
110 }
111
112
113
114
115
116 if !noLimit && sourceLevel > 0 {
117 limit := int64(v.s.o.GetCompactionSourceLimit(sourceLevel))
118 total := int64(0)
119 for i, t := range t0 {
120 total += t.size
121 if total >= limit {
122 s.logf("table@compaction limiting F·%d -> F·%d", len(t0), i+1)
123 t0 = t0[:i+1]
124 break
125 }
126 }
127 }
128
129 typ := level0Compaction
130 if sourceLevel != 0 {
131 typ = nonLevel0Compaction
132 }
133 return newCompaction(s, v, sourceLevel, t0, typ)
134 }
135
136 func newCompaction(s *session, v *version, sourceLevel int, t0 tFiles, typ int) *compaction {
137 c := &compaction{
138 s: s,
139 v: v,
140 typ: typ,
141 sourceLevel: sourceLevel,
142 levels: [2]tFiles{t0, nil},
143 maxGPOverlaps: int64(s.o.GetCompactionGPOverlaps(sourceLevel)),
144 tPtrs: make([]int, len(v.levels)),
145 }
146 c.expand()
147 c.save()
148 return c
149 }
150
151
// compaction holds the inputs and the iteration state of one compaction.
type compaction struct {
	// s is the session the compaction was created for.
	s *session
	// v is the version the inputs were taken from; release drops it.
	v *version

	// typ is one of the *Compaction kind constants.
	typ int
	// sourceLevel is the level the compaction starts from.
	sourceLevel int
	// levels holds the input tables: index 0 for sourceLevel, index 1 for
	// sourceLevel+1 (filled in by expand).
	levels [2]tFiles
	// maxGPOverlaps is the byte threshold used by trivial and
	// shouldStopBefore.
	maxGPOverlaps int64

	// gp is the set of sourceLevel+2 tables selected by expand.
	gp tFiles
	// gpi is the index into gp that shouldStopBefore has advanced to.
	gpi int
	// seenKey is set once shouldStopBefore has been called.
	seenKey bool
	// gpOverlappedBytes accumulates the size of gp tables passed by
	// shouldStopBefore since it last returned true.
	gpOverlappedBytes int64
	// imin and imax are the key range computed by expand for the
	// sourceLevel inputs.
	imin, imax internalKey
	// tPtrs holds, per level, the table index baseLevelForKey has advanced to.
	tPtrs []int
	// released guards against releasing v more than once.
	released bool

	// Copies of the iteration state taken by save and put back by restore.
	snapGPI               int
	snapSeenKey           bool
	snapGPOverlappedBytes int64
	snapTPtrs             []int
}
174
175 func (c *compaction) save() {
176 c.snapGPI = c.gpi
177 c.snapSeenKey = c.seenKey
178 c.snapGPOverlappedBytes = c.gpOverlappedBytes
179 c.snapTPtrs = append(c.snapTPtrs[:0], c.tPtrs...)
180 }
181
182 func (c *compaction) restore() {
183 c.gpi = c.snapGPI
184 c.seenKey = c.snapSeenKey
185 c.gpOverlappedBytes = c.snapGPOverlappedBytes
186 c.tPtrs = append(c.tPtrs[:0], c.snapTPtrs...)
187 }
188
189 func (c *compaction) release() {
190 if !c.released {
191 c.released = true
192 c.v.release()
193 }
194 }
195
196
// expand grows the compaction inputs and derives the key ranges used by the
// rest of the compaction.
//
// Starting from the tables already in c.levels[0] it:
//   - for a level-0 source only, re-queries the source level with the key
//     range of those tables;
//   - selects the sourceLevel+1 tables for that range;
//   - tries a wider sourceLevel selection covering the combined range, and
//     keeps it only if its size together with the sourceLevel+1 tables stays
//     under the expand limit and the number of sourceLevel+1 tables does not
//     change;
//   - selects the sourceLevel+2 tables for the combined range into c.gp.
//
// The results are stored in c.levels, c.imin and c.imax.
func (c *compaction) expand() {
	limit := int64(c.s.o.GetCompactionExpandLimit(c.sourceLevel))
	// vt0/vt1 are all tables of the source level and of the next level; vt1
	// stays empty when the source level is the last one.
	vt0 := c.v.levels[c.sourceLevel]
	vt1 := tFiles{}
	if level := c.sourceLevel + 1; level < len(c.v.levels) {
		vt1 = c.v.levels[level]
	}

	t0, t1 := c.levels[0], c.levels[1]
	imin, imax := t0.getRange(c.s.icmp)

	// Only a level-0 source has its own table set re-queried here.
	if c.sourceLevel == 0 {
		// t0 is passed as the destination argument of getOverlaps.
		// NOTE(review): presumably it is reused as the output buffer — confirm.
		t0 = vt0.getOverlaps(t0, c.s.icmp, imin.ukey(), imax.ukey(), c.sourceLevel == 0)
		// Recompute the range only if the table count changed.
		if len(t0) != len(c.levels[0]) {
			imin, imax = t0.getRange(c.s.icmp)
		}
	}
	t1 = vt1.getOverlaps(t1, c.s.icmp, imin.ukey(), imax.ukey(), false)

	// Key range covered by both input levels together.
	amin, amax := append(t0, t1...).getRange(c.s.icmp)

	// Try to add more source-level tables without changing the number of
	// next-level tables.
	if len(t1) > 0 {
		exp0 := vt0.getOverlaps(nil, c.s.icmp, amin.ukey(), amax.ukey(), c.sourceLevel == 0)
		if len(exp0) > len(t0) && t1.size()+exp0.size() < limit {
			xmin, xmax := exp0.getRange(c.s.icmp)
			exp1 := vt1.getOverlaps(nil, c.s.icmp, xmin.ukey(), xmax.ukey(), false)
			// Accept the wider set only if the next level is unaffected.
			if len(exp1) == len(t1) {
				c.s.logf("table@compaction expanding L%d+L%d (F·%d S·%s)+(F·%d S·%s) -> (F·%d S·%s)+(F·%d S·%s)",
					c.sourceLevel, c.sourceLevel+1, len(t0), shortenb(t0.size()), len(t1), shortenb(t1.size()),
					len(exp0), shortenb(exp0.size()), len(exp1), shortenb(exp1.size()))
				imin, imax = xmin, xmax
				t0, t1 = exp0, exp1
				amin, amax = append(t0, t1...).getRange(c.s.icmp)
			}
		}
	}

	// Select the sourceLevel+2 tables for the combined range; they are
	// consulted by trivial and shouldStopBefore.
	if level := c.sourceLevel + 2; level < len(c.v.levels) {
		c.gp = c.v.levels[level].getOverlaps(c.gp, c.s.icmp, amin.ukey(), amax.ukey(), false)
	}

	c.levels[0], c.levels[1] = t0, t1
	c.imin, c.imax = imin, imax
}
247
248
249 func (c *compaction) trivial() bool {
250 return len(c.levels[0]) == 1 && len(c.levels[1]) == 0 && c.gp.size() <= c.maxGPOverlaps
251 }
252
253 func (c *compaction) baseLevelForKey(ukey []byte) bool {
254 for level := c.sourceLevel + 2; level < len(c.v.levels); level++ {
255 tables := c.v.levels[level]
256 for c.tPtrs[level] < len(tables) {
257 t := tables[c.tPtrs[level]]
258 if c.s.icmp.uCompare(ukey, t.imax.ukey()) <= 0 {
259
260 if c.s.icmp.uCompare(ukey, t.imin.ukey()) >= 0 {
261
262 return false
263 }
264 break
265 }
266 c.tPtrs[level]++
267 }
268 }
269 return true
270 }
271
272 func (c *compaction) shouldStopBefore(ikey internalKey) bool {
273 for ; c.gpi < len(c.gp); c.gpi++ {
274 gp := c.gp[c.gpi]
275 if c.s.icmp.Compare(ikey, gp.imax) <= 0 {
276 break
277 }
278 if c.seenKey {
279 c.gpOverlappedBytes += gp.size
280 }
281 }
282 c.seenKey = true
283
284 if c.gpOverlappedBytes > c.maxGPOverlaps {
285
286 c.gpOverlappedBytes = 0
287 return true
288 }
289 return false
290 }
291
292
293 func (c *compaction) newIterator() iterator.Iterator {
294
295 icap := len(c.levels)
296 if c.sourceLevel == 0 {
297
298 icap = len(c.levels[0]) + 1
299 }
300 its := make([]iterator.Iterator, 0, icap)
301
302
303 ro := &opt.ReadOptions{
304 DontFillCache: true,
305 Strict: opt.StrictOverride,
306 }
307 strict := c.s.o.GetStrict(opt.StrictCompaction)
308 if strict {
309 ro.Strict |= opt.StrictReader
310 }
311
312 for i, tables := range c.levels {
313 if len(tables) == 0 {
314 continue
315 }
316
317
318 if c.sourceLevel+i == 0 {
319 for _, t := range tables {
320 its = append(its, c.s.tops.newIterator(t, nil, ro))
321 }
322 } else {
323 it := iterator.NewIndexedIterator(tables.newIndexIterator(c.s.tops, c.s.icmp, nil, ro), strict)
324 its = append(its, it)
325 }
326 }
327
328 return iterator.NewMergedIterator(its, c.s.icmp, strict)
329 }
330
View as plain text