1
2
3
4
5
6
7 package testutil
8
9 import (
10 "bytes"
11 "fmt"
12 "io"
13 "math/rand"
14 "os"
15 "path/filepath"
16 "runtime"
17 "strings"
18 "sync"
19
20 . "github.com/onsi/gomega"
21
22 "github.com/syndtr/goleveldb/leveldb/storage"
23 )
24
// Package-level settings shared by every Storage built by NewStorage.
var (
	// storageMu serialises access to storageNum.
	storageMu sync.Mutex

	// storageUseFS makes NewStorage back the store with a directory on disk;
	// when false an in-memory storage is used instead.
	storageUseFS = true

	// storageKeepFS, when true, stops Close from deleting the on-disk
	// directory of a file-backed storage.
	storageKeepFS = false

	// storageNum is the sequence number folded into the next storage path.
	storageNum int
)
31
// StorageMode names a storage operation. Every mode occupies its own bit.
type StorageMode int

// One flag per operation that Storage can count, stall or fail.
const (
	ModeOpen StorageMode = 1 << iota
	ModeCreate
	ModeRemove
	ModeRename
	ModeRead
	ModeWrite
	ModeSync
	ModeClose
)
44
// Dense, zero-based indexes for the operation modes. flattenType maps each
// Mode* flag onto the matching index here.
const (
	modeOpen = iota
	modeCreate
	modeRemove
	modeRename
	modeRead
	modeWrite
	modeSync
	modeClose

	// modeCount is the number of mode indexes declared above.
	modeCount
)
57
// Dense, zero-based indexes for the file types. flattenType maps each
// storage.Type* value onto the matching index here.
const (
	typeManifest = iota
	typeJournal
	typeTable
	typeTemp

	// typeCount is the number of file-type indexes declared above.
	typeCount
)
66
67 const flattenCount = modeCount * typeCount
68
69 func flattenType(m StorageMode, t storage.FileType) int {
70 var x int
71 switch m {
72 case ModeOpen:
73 x = modeOpen
74 case ModeCreate:
75 x = modeCreate
76 case ModeRemove:
77 x = modeRemove
78 case ModeRename:
79 x = modeRename
80 case ModeRead:
81 x = modeRead
82 case ModeWrite:
83 x = modeWrite
84 case ModeSync:
85 x = modeSync
86 case ModeClose:
87 x = modeClose
88 default:
89 panic("invalid storage mode")
90 }
91 x *= typeCount
92 switch t {
93 case storage.TypeManifest:
94 return x + typeManifest
95 case storage.TypeJournal:
96 return x + typeJournal
97 case storage.TypeTable:
98 return x + typeTable
99 case storage.TypeTemp:
100 return x + typeTemp
101 default:
102 panic("invalid file type")
103 }
104 }
105
106 func listFlattenType(m StorageMode, t storage.FileType) []int {
107 ret := make([]int, 0, flattenCount)
108 add := func(x int) {
109 x *= typeCount
110 switch {
111 case t&storage.TypeManifest != 0:
112 ret = append(ret, x+typeManifest)
113 case t&storage.TypeJournal != 0:
114 ret = append(ret, x+typeJournal)
115 case t&storage.TypeTable != 0:
116 ret = append(ret, x+typeTable)
117 case t&storage.TypeTemp != 0:
118 ret = append(ret, x+typeTemp)
119 }
120 }
121 switch {
122 case m&ModeOpen != 0:
123 add(modeOpen)
124 case m&ModeCreate != 0:
125 add(modeCreate)
126 case m&ModeRemove != 0:
127 add(modeRemove)
128 case m&ModeRename != 0:
129 add(modeRename)
130 case m&ModeRead != 0:
131 add(modeRead)
132 case m&ModeWrite != 0:
133 add(modeWrite)
134 case m&ModeSync != 0:
135 add(modeSync)
136 case m&ModeClose != 0:
137 add(modeClose)
138 }
139 return ret
140 }
141
142 func packFile(fd storage.FileDesc) uint64 {
143 if fd.Num>>(63-typeCount) != 0 {
144 panic("overflow")
145 }
146 return uint64(fd.Num<<typeCount) | uint64(fd.Type)
147 }
148
149 func unpackFile(x uint64) storage.FileDesc {
150 return storage.FileDesc{
151 Type: storage.FileType(x) & storage.TypeAll,
152 Num: int64(x >> typeCount),
153 }
154 }
155
// emulatedError marks an error as injected by Storage rather than produced by
// the wrapped storage. err is the error supplied by the test.
type emulatedError struct {
	err error
}

// Error renders the wrapped error behind an "emulated storage error: " prefix.
func (e emulatedError) Error() string {
	return "emulated storage error: " + fmt.Sprint(e.err)
}
163
164 type storageLock struct {
165 s *Storage
166 l storage.Locker
167 }
168
169 func (l storageLock) Unlock() {
170 l.l.Unlock()
171 l.s.logI("storage lock released")
172 }
173
174 type reader struct {
175 s *Storage
176 fd storage.FileDesc
177 storage.Reader
178 }
179
180 func (r *reader) Read(p []byte) (n int, err error) {
181 err = r.s.emulateError(ModeRead, r.fd.Type)
182 if err == nil {
183 r.s.stall(ModeRead, r.fd.Type)
184 n, err = r.Reader.Read(p)
185 }
186 r.s.count(ModeRead, r.fd.Type, n)
187 if err != nil && err != io.EOF {
188 r.s.logI("read error, fd=%s n=%d err=%v", r.fd, n, err)
189 }
190 return
191 }
192
193 func (r *reader) ReadAt(p []byte, off int64) (n int, err error) {
194 err = r.s.emulateError(ModeRead, r.fd.Type)
195 if err == nil {
196 r.s.stall(ModeRead, r.fd.Type)
197 n, err = r.Reader.ReadAt(p, off)
198 }
199 r.s.count(ModeRead, r.fd.Type, n)
200 if err != nil && err != io.EOF {
201 r.s.logI("readAt error, fd=%s offset=%d n=%d err=%v", r.fd, off, n, err)
202 }
203 return
204 }
205
206 func (r *reader) Close() (err error) {
207 return r.s.fileClose(r.fd, r.Reader)
208 }
209
210 type writer struct {
211 s *Storage
212 fd storage.FileDesc
213 storage.Writer
214 }
215
216 func (w *writer) Write(p []byte) (n int, err error) {
217 err = w.s.emulateError(ModeWrite, w.fd.Type)
218 if err == nil {
219 w.s.stall(ModeWrite, w.fd.Type)
220 n, err = w.Writer.Write(p)
221 }
222 w.s.count(ModeWrite, w.fd.Type, n)
223 if err != nil && err != io.EOF {
224 w.s.logI("write error, fd=%s n=%d err=%v", w.fd, n, err)
225 }
226 return
227 }
228
229 func (w *writer) Sync() (err error) {
230 err = w.s.emulateError(ModeSync, w.fd.Type)
231 if err == nil {
232 w.s.stall(ModeSync, w.fd.Type)
233 err = w.Writer.Sync()
234 }
235 w.s.count(ModeSync, w.fd.Type, 0)
236 if err != nil {
237 w.s.logI("sync error, fd=%s err=%v", w.fd, err)
238 }
239 return
240 }
241
242 func (w *writer) Close() (err error) {
243 return w.s.fileClose(w.fd, w.Writer)
244 }
245
246 type Storage struct {
247 storage.Storage
248 path string
249 onClose func() (preserve bool, err error)
250 onLog func(str string)
251
252 lmu sync.Mutex
253 lb bytes.Buffer
254
255 mu sync.Mutex
256 rand *rand.Rand
257
258 opens map[uint64]bool
259 counters [flattenCount]int
260 bytesCounter [flattenCount]int64
261 emulatedError [flattenCount]error
262 emulatedErrorOnce [flattenCount]bool
263 emulatedRandomError [flattenCount]error
264 emulatedRandomErrorProb [flattenCount]float64
265 stallCond sync.Cond
266 stalled [flattenCount]bool
267 }
268
269 func (s *Storage) log(skip int, str string) {
270 s.lmu.Lock()
271 defer s.lmu.Unlock()
272 _, file, line, ok := runtime.Caller(skip + 2)
273 if ok {
274
275 if index := strings.LastIndex(file, "/"); index >= 0 {
276 file = file[index+1:]
277 } else if index = strings.LastIndex(file, "\\"); index >= 0 {
278 file = file[index+1:]
279 }
280 } else {
281 file = "???"
282 line = 1
283 }
284 fmt.Fprintf(&s.lb, "%s:%d: ", file, line)
285 lines := strings.Split(str, "\n")
286 if l := len(lines); l > 1 && lines[l-1] == "" {
287 lines = lines[:l-1]
288 }
289 for i, line := range lines {
290 if i > 0 {
291 s.lb.WriteString("\n\t")
292 }
293 s.lb.WriteString(line)
294 }
295 if s.onLog != nil {
296 s.onLog(s.lb.String())
297 s.lb.Reset()
298 } else {
299 s.lb.WriteByte('\n')
300 }
301 }
302
303 func (s *Storage) logISkip(skip int, format string, args ...interface{}) {
304 pc, _, _, ok := runtime.Caller(skip + 1)
305 if ok {
306 if f := runtime.FuncForPC(pc); f != nil {
307 fname := f.Name()
308 if index := strings.LastIndex(fname, "."); index >= 0 {
309 fname = fname[index+1:]
310 }
311 format = fname + ": " + format
312 }
313 }
314 s.log(skip+1, fmt.Sprintf(format, args...))
315 }
316
317 func (s *Storage) logI(format string, args ...interface{}) {
318 s.logISkip(1, format, args...)
319 }
320
321 func (s *Storage) OnLog(onLog func(log string)) {
322 s.lmu.Lock()
323 s.onLog = onLog
324 if s.lb.Len() != 0 {
325 log := s.lb.String()
326 s.onLog(log[:len(log)-1])
327 s.lb.Reset()
328 }
329 s.lmu.Unlock()
330 }
331
332 func (s *Storage) Log(str string) {
333 s.log(1, "Log: "+str)
334 s.Storage.Log(str)
335 }
336
337 func (s *Storage) Lock() (l storage.Locker, err error) {
338 l, err = s.Storage.Lock()
339 if err != nil {
340 s.logI("storage locking failed, err=%v", err)
341 } else {
342 s.logI("storage locked")
343 l = storageLock{s, l}
344 }
345 return
346 }
347
348 func (s *Storage) List(t storage.FileType) (fds []storage.FileDesc, err error) {
349 fds, err = s.Storage.List(t)
350 if err != nil {
351 s.logI("list failed, err=%v", err)
352 return
353 }
354 s.logI("list, type=0x%x count=%d", int(t), len(fds))
355 return
356 }
357
358 func (s *Storage) GetMeta() (fd storage.FileDesc, err error) {
359 fd, err = s.Storage.GetMeta()
360 if err != nil {
361 if !os.IsNotExist(err) {
362 s.logI("get meta failed, err=%v", err)
363 }
364 return
365 }
366 s.logI("get meta, fd=%s", fd)
367 return
368 }
369
370 func (s *Storage) SetMeta(fd storage.FileDesc) error {
371 ExpectWithOffset(1, fd.Type).To(Equal(storage.TypeManifest))
372 err := s.Storage.SetMeta(fd)
373 if err != nil {
374 s.logI("set meta failed, fd=%s err=%v", fd, err)
375 } else {
376 s.logI("set meta, fd=%s", fd)
377 }
378 return err
379 }
380
381 func (s *Storage) fileClose(fd storage.FileDesc, closer io.Closer) (err error) {
382 err = s.emulateError(ModeClose, fd.Type)
383 if err == nil {
384 s.stall(ModeClose, fd.Type)
385 }
386 x := packFile(fd)
387 s.mu.Lock()
388 defer s.mu.Unlock()
389 if err == nil {
390 ExpectWithOffset(2, s.opens).To(HaveKey(x), "File closed, fd=%s", fd)
391 err = closer.Close()
392 }
393 s.countNB(ModeClose, fd.Type, 0)
394 writer := s.opens[x]
395 if err != nil {
396 s.logISkip(1, "file close failed, fd=%s writer=%v err=%v", fd, writer, err)
397 } else {
398 s.logISkip(1, "file closed, fd=%s writer=%v", fd, writer)
399 delete(s.opens, x)
400 }
401 return
402 }
403
404 func (s *Storage) assertOpen(fd storage.FileDesc) {
405 x := packFile(fd)
406 ExpectWithOffset(2, s.opens).NotTo(HaveKey(x), "File open, fd=%s writer=%v", fd, s.opens[x])
407 }
408
409 func (s *Storage) Open(fd storage.FileDesc) (r storage.Reader, err error) {
410 err = s.emulateError(ModeOpen, fd.Type)
411 if err == nil {
412 s.stall(ModeOpen, fd.Type)
413 }
414 s.mu.Lock()
415 defer s.mu.Unlock()
416 if err == nil {
417 s.assertOpen(fd)
418 s.countNB(ModeOpen, fd.Type, 0)
419 r, err = s.Storage.Open(fd)
420 }
421 if err != nil {
422 s.logI("file open failed, fd=%s err=%v", fd, err)
423 } else {
424 s.logI("file opened, fd=%s", fd)
425 s.opens[packFile(fd)] = false
426 r = &reader{s, fd, r}
427 }
428 return
429 }
430
431 func (s *Storage) Create(fd storage.FileDesc) (w storage.Writer, err error) {
432 err = s.emulateError(ModeCreate, fd.Type)
433 if err == nil {
434 s.stall(ModeCreate, fd.Type)
435 }
436 s.mu.Lock()
437 defer s.mu.Unlock()
438 if err == nil {
439 s.assertOpen(fd)
440 s.countNB(ModeCreate, fd.Type, 0)
441 w, err = s.Storage.Create(fd)
442 }
443 if err != nil {
444 s.logI("file create failed, fd=%s err=%v", fd, err)
445 } else {
446 s.logI("file created, fd=%s", fd)
447 s.opens[packFile(fd)] = true
448 w = &writer{s, fd, w}
449 }
450 return
451 }
452
453 func (s *Storage) Remove(fd storage.FileDesc) (err error) {
454 err = s.emulateError(ModeRemove, fd.Type)
455 if err == nil {
456 s.stall(ModeRemove, fd.Type)
457 }
458 s.mu.Lock()
459 defer s.mu.Unlock()
460 if err == nil {
461 s.assertOpen(fd)
462 s.countNB(ModeRemove, fd.Type, 0)
463 err = s.Storage.Remove(fd)
464 }
465 if err != nil {
466 s.logI("file remove failed, fd=%s err=%v", fd, err)
467 } else {
468 s.logI("file removed, fd=%s", fd)
469 }
470 return
471 }
472
473 func (s *Storage) ForceRemove(fd storage.FileDesc) (err error) {
474 s.countNB(ModeRemove, fd.Type, 0)
475 if err = s.Storage.Remove(fd); err != nil {
476 s.logI("file remove failed (forced), fd=%s err=%v", fd, err)
477 } else {
478 s.logI("file removed (forced), fd=%s", fd)
479 }
480 return
481 }
482
483 func (s *Storage) Rename(oldfd, newfd storage.FileDesc) (err error) {
484 err = s.emulateError(ModeRename, oldfd.Type)
485 if err == nil {
486 s.stall(ModeRename, oldfd.Type)
487 }
488 s.mu.Lock()
489 defer s.mu.Unlock()
490 if err == nil {
491 s.assertOpen(oldfd)
492 s.assertOpen(newfd)
493 s.countNB(ModeRename, oldfd.Type, 0)
494 err = s.Storage.Rename(oldfd, newfd)
495 }
496 if err != nil {
497 s.logI("file rename failed, oldfd=%s newfd=%s err=%v", oldfd, newfd, err)
498 } else {
499 s.logI("file renamed, oldfd=%s newfd=%s", oldfd, newfd)
500 }
501 return
502 }
503
504 func (s *Storage) ForceRename(oldfd, newfd storage.FileDesc) (err error) {
505 s.countNB(ModeRename, oldfd.Type, 0)
506 if err = s.Storage.Rename(oldfd, newfd); err != nil {
507 s.logI("file rename failed (forced), oldfd=%s newfd=%s err=%v", oldfd, newfd, err)
508 } else {
509 s.logI("file renamed (forced), oldfd=%s newfd=%s", oldfd, newfd)
510 }
511 return
512 }
513
514 func (s *Storage) openFiles() string {
515 out := "Open files:"
516 for x, writer := range s.opens {
517 fd := unpackFile(x)
518 out += fmt.Sprintf("\n ยท fd=%s writer=%v", fd, writer)
519 }
520 return out
521 }
522
523 func (s *Storage) CloseCheck() {
524 s.mu.Lock()
525 defer s.mu.Unlock()
526 ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles())
527 }
528
529 func (s *Storage) OnClose(onClose func() (preserve bool, err error)) {
530 s.mu.Lock()
531 s.onClose = onClose
532 s.mu.Unlock()
533 }
534
535 func (s *Storage) Close() error {
536 s.mu.Lock()
537 defer s.mu.Unlock()
538 ExpectWithOffset(1, s.opens).To(BeEmpty(), s.openFiles())
539 err := s.Storage.Close()
540 if err != nil {
541 s.logI("storage closing failed, err=%v", err)
542 } else {
543 s.logI("storage closed")
544 }
545 var preserve bool
546 if s.onClose != nil {
547 var err0 error
548 if preserve, err0 = s.onClose(); err0 != nil {
549 s.logI("onClose error, err=%v", err0)
550 }
551 }
552 if s.path != "" {
553 if storageKeepFS || preserve {
554 s.logI("storage is preserved, path=%v", s.path)
555 } else {
556 if err1 := os.RemoveAll(s.path); err1 != nil {
557 s.logI("cannot remove storage, err=%v", err1)
558 } else {
559 s.logI("storage has been removed")
560 }
561 }
562 }
563 return err
564 }
565
566 func (s *Storage) countNB(m StorageMode, t storage.FileType, n int) {
567 s.counters[flattenType(m, t)]++
568 s.bytesCounter[flattenType(m, t)] += int64(n)
569 }
570
571 func (s *Storage) count(m StorageMode, t storage.FileType, n int) {
572 s.mu.Lock()
573 defer s.mu.Unlock()
574 s.countNB(m, t, n)
575 }
576
577 func (s *Storage) ResetCounter(m StorageMode, t storage.FileType) {
578 for _, x := range listFlattenType(m, t) {
579 s.counters[x] = 0
580 s.bytesCounter[x] = 0
581 }
582 }
583
584 func (s *Storage) Counter(m StorageMode, t storage.FileType) (count int, bytes int64) {
585 for _, x := range listFlattenType(m, t) {
586 count += s.counters[x]
587 bytes += s.bytesCounter[x]
588 }
589 return
590 }
591
592 func (s *Storage) emulateError(m StorageMode, t storage.FileType) error {
593 s.mu.Lock()
594 defer s.mu.Unlock()
595 x := flattenType(m, t)
596 if err := s.emulatedError[x]; err != nil {
597 if s.emulatedErrorOnce[x] {
598 s.emulatedError[x] = nil
599 }
600 return emulatedError{err}
601 }
602 if err := s.emulatedRandomError[x]; err != nil && s.rand.Float64() < s.emulatedRandomErrorProb[x] {
603 return emulatedError{err}
604 }
605 return nil
606 }
607
608 func (s *Storage) EmulateError(m StorageMode, t storage.FileType, err error) {
609 s.mu.Lock()
610 defer s.mu.Unlock()
611 for _, x := range listFlattenType(m, t) {
612 s.emulatedError[x] = err
613 s.emulatedErrorOnce[x] = false
614 }
615 }
616
617 func (s *Storage) EmulateErrorOnce(m StorageMode, t storage.FileType, err error) {
618 s.mu.Lock()
619 defer s.mu.Unlock()
620 for _, x := range listFlattenType(m, t) {
621 s.emulatedError[x] = err
622 s.emulatedErrorOnce[x] = true
623 }
624 }
625
626 func (s *Storage) EmulateRandomError(m StorageMode, t storage.FileType, prob float64, err error) {
627 s.mu.Lock()
628 defer s.mu.Unlock()
629 for _, x := range listFlattenType(m, t) {
630 s.emulatedRandomError[x] = err
631 s.emulatedRandomErrorProb[x] = prob
632 }
633 }
634
635 func (s *Storage) stall(m StorageMode, t storage.FileType) {
636 x := flattenType(m, t)
637 s.mu.Lock()
638 defer s.mu.Unlock()
639 for s.stalled[x] {
640 s.stallCond.Wait()
641 }
642 }
643
644 func (s *Storage) Stall(m StorageMode, t storage.FileType) {
645 s.mu.Lock()
646 defer s.mu.Unlock()
647 for _, x := range listFlattenType(m, t) {
648 s.stalled[x] = true
649 }
650 }
651
652 func (s *Storage) Release(m StorageMode, t storage.FileType) {
653 s.mu.Lock()
654 defer s.mu.Unlock()
655 for _, x := range listFlattenType(m, t) {
656 s.stalled[x] = false
657 }
658 s.stallCond.Broadcast()
659 }
660
661 func NewStorage() *Storage {
662 var (
663 stor storage.Storage
664 path string
665 )
666 if storageUseFS {
667 for {
668 storageMu.Lock()
669 num := storageNum
670 storageNum++
671 storageMu.Unlock()
672 path = filepath.Join(os.TempDir(), fmt.Sprintf("goleveldb-test%d0%d0%d", os.Getuid(), os.Getpid(), num))
673 if _, err := os.Stat(path); os.IsNotExist(err) {
674 stor, err = storage.OpenFile(path, false)
675 ExpectWithOffset(1, err).NotTo(HaveOccurred(), "creating storage at %s", path)
676 break
677 }
678 }
679 } else {
680 stor = storage.NewMemStorage()
681 }
682 s := &Storage{
683 Storage: stor,
684 path: path,
685 rand: NewRand(),
686 opens: make(map[uint64]bool),
687 }
688 s.stallCond.L = &s.mu
689 if s.path != "" {
690 s.logI("using FS storage")
691 s.logI("storage path: %s", s.path)
692 } else {
693 s.logI("using MEM storage")
694 }
695 return s
696 }
697
View as plain text