...
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15 package wal
16
17 import (
18 "fmt"
19 "os"
20 "path/filepath"
21
22 "go.etcd.io/etcd/client/pkg/v3/fileutil"
23
24 "go.uber.org/zap"
25 )
26
27
28 type filePipeline struct {
29 lg *zap.Logger
30
31
32 dir string
33
34 size int64
35
36 count int
37
38 filec chan *fileutil.LockedFile
39 errc chan error
40 donec chan struct{}
41 }
42
43 func newFilePipeline(lg *zap.Logger, dir string, fileSize int64) *filePipeline {
44 if lg == nil {
45 lg = zap.NewNop()
46 }
47 fp := &filePipeline{
48 lg: lg,
49 dir: dir,
50 size: fileSize,
51 filec: make(chan *fileutil.LockedFile),
52 errc: make(chan error, 1),
53 donec: make(chan struct{}),
54 }
55 go fp.run()
56 return fp
57 }
58
59
60
61 func (fp *filePipeline) Open() (f *fileutil.LockedFile, err error) {
62 select {
63 case f = <-fp.filec:
64 case err = <-fp.errc:
65 }
66 return f, err
67 }
68
69 func (fp *filePipeline) Close() error {
70 close(fp.donec)
71 return <-fp.errc
72 }
73
74 func (fp *filePipeline) alloc() (f *fileutil.LockedFile, err error) {
75
76 fpath := filepath.Join(fp.dir, fmt.Sprintf("%d.tmp", fp.count%2))
77 if f, err = fileutil.LockFile(fpath, os.O_CREATE|os.O_WRONLY, fileutil.PrivateFileMode); err != nil {
78 return nil, err
79 }
80 if err = fileutil.Preallocate(f.File, fp.size, true); err != nil {
81 fp.lg.Error("failed to preallocate space when creating a new WAL", zap.Int64("size", fp.size), zap.Error(err))
82 f.Close()
83 return nil, err
84 }
85 fp.count++
86 return f, nil
87 }
88
89 func (fp *filePipeline) run() {
90 defer close(fp.errc)
91 for {
92 f, err := fp.alloc()
93 if err != nil {
94 fp.errc <- err
95 return
96 }
97 select {
98 case fp.filec <- f:
99 case <-fp.donec:
100 os.Remove(f.Name())
101 f.Close()
102 return
103 }
104 }
105 }
106
View as plain text