1
2
3
4
5
6
7 package leveldb
8
9 import (
10 "fmt"
11 "io"
12 "os"
13 "sync"
14
15 "github.com/syndtr/goleveldb/leveldb/errors"
16 "github.com/syndtr/goleveldb/leveldb/journal"
17 "github.com/syndtr/goleveldb/leveldb/opt"
18 "github.com/syndtr/goleveldb/leveldb/storage"
19 )
20
21
22
// ErrManifestCorrupted records a problem found in a manifest.
type ErrManifestCorrupted struct {
	// Field names the manifest field the problem relates to.
	Field string
	// Reason describes what is wrong with that field.
	Reason string
}

// Error implements the error interface. The message embeds the field name
// (in single quotes) followed by the reason.
func (e *ErrManifestCorrupted) Error() string {
	const format = "leveldb: manifest corrupted (field '%s'): %s"
	return fmt.Sprintf(format, e.Field, e.Reason)
}
31
32 func newErrManifestCorrupted(fd storage.FileDesc, field, reason string) error {
33 return errors.NewErrCorrupted(fd, &ErrManifestCorrupted{field, reason})
34 }
35
36
37 type session struct {
38
39 stNextFileNum int64
40 stJournalNum int64
41 stPrevJournalNum int64
42 stTempFileNum int64
43 stSeqNum uint64
44
45 stor *iStorage
46 storLock storage.Locker
47 o *cachedOptions
48 icmp *iComparer
49 tops *tOps
50
51 manifest *journal.Writer
52 manifestWriter storage.Writer
53 manifestFd storage.FileDesc
54
55 stCompPtrs []internalKey
56 stVersion *version
57 ntVersionID int64
58 refCh chan *vTask
59 relCh chan *vTask
60 deltaCh chan *vDelta
61 abandon chan int64
62 closeC chan struct{}
63 closeW sync.WaitGroup
64 vmu sync.Mutex
65
66
67 fileRefCh chan chan map[int64]int
68 }
69
70
71 func newSession(stor storage.Storage, o *opt.Options) (s *session, err error) {
72 if stor == nil {
73 return nil, os.ErrInvalid
74 }
75 storLock, err := stor.Lock()
76 if err != nil {
77 return
78 }
79 s = &session{
80 stor: newIStorage(stor),
81 storLock: storLock,
82 refCh: make(chan *vTask),
83 relCh: make(chan *vTask),
84 deltaCh: make(chan *vDelta),
85 abandon: make(chan int64),
86 fileRefCh: make(chan chan map[int64]int),
87 closeC: make(chan struct{}),
88 }
89 s.setOptions(o)
90 s.tops = newTableOps(s)
91
92 s.closeW.Add(1)
93 go s.refLoop()
94 s.setVersion(nil, newVersion(s))
95 s.log("log@legend F·NumFile S·FileSize N·Entry C·BadEntry B·BadBlock Ke·KeyError D·DroppedEntry L·Level Q·SeqNum T·TimeElapsed")
96 return
97 }
98
99
100 func (s *session) close() {
101 s.tops.close()
102 if s.manifest != nil {
103 s.manifest.Close()
104 }
105 if s.manifestWriter != nil {
106 s.manifestWriter.Close()
107 }
108 s.manifest = nil
109 s.manifestWriter = nil
110 s.setVersion(nil, &version{s: s, closing: true, id: s.ntVersionID})
111
112
113 close(s.closeC)
114 s.closeW.Wait()
115 }
116
117
118 func (s *session) release() {
119 s.storLock.Unlock()
120 }
121
122
123 func (s *session) create() error {
124
125 return s.newManifest(nil, nil)
126 }
127
128
129 func (s *session) recover() (err error) {
130 defer func() {
131 if os.IsNotExist(err) {
132
133
134 if fds, _ := s.stor.List(storage.TypeAll); len(fds) > 0 {
135 err = &errors.ErrCorrupted{Err: errors.New("database entry point either missing or corrupted")}
136 }
137 }
138 }()
139
140 fd, err := s.stor.GetMeta()
141 if err != nil {
142 return
143 }
144
145 reader, err := s.stor.Open(fd)
146 if err != nil {
147 return
148 }
149 defer reader.Close()
150
151 var (
152
153 strict = s.o.GetStrict(opt.StrictManifest)
154
155 jr = journal.NewReader(reader, dropper{s, fd}, strict, true)
156 rec = &sessionRecord{}
157 staging = s.stVersion.newStaging()
158 )
159 for {
160 var r io.Reader
161 r, err = jr.Next()
162 if err != nil {
163 if err == io.EOF {
164 err = nil
165 break
166 }
167 return errors.SetFd(err, fd)
168 }
169
170 err = rec.decode(r)
171 if err == nil {
172
173 for _, r := range rec.compPtrs {
174 s.setCompPtr(r.level, r.ikey)
175 }
176
177 staging.commit(rec)
178 } else {
179 err = errors.SetFd(err, fd)
180 if strict || !errors.IsCorrupted(err) {
181 return
182 }
183 s.logf("manifest error: %v (skipped)", errors.SetFd(err, fd))
184 }
185 rec.resetCompPtrs()
186 rec.resetAddedTables()
187 rec.resetDeletedTables()
188 }
189
190 switch {
191 case !rec.has(recComparer):
192 return newErrManifestCorrupted(fd, "comparer", "missing")
193 case rec.comparer != s.icmp.uName():
194 return newErrManifestCorrupted(fd, "comparer", fmt.Sprintf("mismatch: want '%s', got '%s'", s.icmp.uName(), rec.comparer))
195 case !rec.has(recNextFileNum):
196 return newErrManifestCorrupted(fd, "next-file-num", "missing")
197 case !rec.has(recJournalNum):
198 return newErrManifestCorrupted(fd, "journal-file-num", "missing")
199 case !rec.has(recSeqNum):
200 return newErrManifestCorrupted(fd, "seq-num", "missing")
201 }
202
203 s.manifestFd = fd
204 s.setVersion(rec, staging.finish(false))
205 s.setNextFileNum(rec.nextFileNum)
206 s.recordCommited(rec)
207 return nil
208 }
209
210
211 func (s *session) commit(r *sessionRecord, trivial bool) (err error) {
212 v := s.version()
213 defer v.release()
214
215
216 nv := v.spawn(r, trivial)
217
218
219 defer func() {
220 if err != nil {
221 s.abandon <- nv.id
222 s.logf("commit@abandon useless vid D%d", nv.id)
223 }
224 }()
225
226 if s.manifest == nil {
227
228 err = s.newManifest(r, nv)
229 } else if s.manifest.Size() >= s.o.GetMaxManifestFileSize() {
230
231 err = s.newManifest(nil, nv)
232 } else {
233 err = s.flushManifest(r)
234 }
235
236
237 if err == nil {
238 s.setVersion(r, nv)
239 }
240
241 return
242 }
243
View as plain text