1
2
3
18
19 package fifo
20
21 import (
22 "context"
23 "fmt"
24 "io"
25 "os"
26 "runtime"
27 "sync"
28 "syscall"
29
30 "golang.org/x/sys/unix"
31 )
32
33 type fifo struct {
34 flag int
35 opened chan struct{}
36 closed chan struct{}
37 closing chan struct{}
38 err error
39 file *os.File
40 closingOnce sync.Once
41 closedOnce sync.Once
42 handle *handle
43 }
44
45 var leakCheckWg *sync.WaitGroup
46
47
48 func OpenFifoDup2(ctx context.Context, fn string, flag int, perm os.FileMode, fd int) (io.ReadWriteCloser, error) {
49 f, err := openFifo(ctx, fn, flag, perm)
50 if err != nil {
51 return nil, fmt.Errorf("fifo error: %w", err)
52 }
53
54 if err := unix.Dup2(int(f.file.Fd()), fd); err != nil {
55 _ = f.Close()
56 return nil, fmt.Errorf("dup2 error: %w", err)
57 }
58
59 return f, nil
60 }
61
62
63
64
65
66
67
68
69
70
71
72 func OpenFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (io.ReadWriteCloser, error) {
73 fifo, err := openFifo(ctx, fn, flag, perm)
74 if fifo == nil {
75
76
77 return nil, err
78 }
79 return fifo, err
80 }
81
82 func openFifo(ctx context.Context, fn string, flag int, perm os.FileMode) (*fifo, error) {
83 if _, err := os.Stat(fn); err != nil {
84 if os.IsNotExist(err) && flag&syscall.O_CREAT != 0 {
85 if err := syscall.Mkfifo(fn, uint32(perm&os.ModePerm)); err != nil && !os.IsExist(err) {
86 return nil, fmt.Errorf("error creating fifo %v: %w", fn, err)
87 }
88 } else {
89 return nil, err
90 }
91 }
92
93 block := flag&syscall.O_NONBLOCK == 0 || flag&syscall.O_RDWR != 0
94
95 flag &= ^syscall.O_CREAT
96 flag &= ^syscall.O_NONBLOCK
97
98 h, err := getHandle(fn)
99 if err != nil {
100 return nil, err
101 }
102
103 f := &fifo{
104 handle: h,
105 flag: flag,
106 opened: make(chan struct{}),
107 closed: make(chan struct{}),
108 closing: make(chan struct{}),
109 }
110
111 wg := leakCheckWg
112 if wg != nil {
113 wg.Add(2)
114 }
115
116 go func() {
117 if wg != nil {
118 defer wg.Done()
119 }
120 select {
121 case <-ctx.Done():
122 select {
123 case <-f.opened:
124 default:
125 f.Close()
126 }
127 case <-f.opened:
128 case <-f.closed:
129 }
130 }()
131 go func() {
132 if wg != nil {
133 defer wg.Done()
134 }
135 var file *os.File
136 fn, err := h.Path()
137 if err == nil {
138 file, err = os.OpenFile(fn, flag, 0)
139 }
140 select {
141 case <-f.closing:
142 if err == nil {
143 select {
144 case <-ctx.Done():
145 err = ctx.Err()
146 default:
147 err = fmt.Errorf("fifo %v was closed before opening", h.Name())
148 }
149 if file != nil {
150 file.Close()
151 }
152 }
153 default:
154 }
155 if err != nil {
156 f.closedOnce.Do(func() {
157 f.err = err
158 close(f.closed)
159 })
160 return
161 }
162 f.file = file
163 close(f.opened)
164 }()
165 if block {
166 select {
167 case <-f.opened:
168 case <-f.closed:
169 return nil, f.err
170 }
171 }
172 return f, nil
173 }
174
175
176 func (f *fifo) Read(b []byte) (int, error) {
177 if f.flag&syscall.O_WRONLY > 0 {
178 return 0, ErrRdFrmWRONLY
179 }
180 select {
181 case <-f.opened:
182 return f.file.Read(b)
183 default:
184 }
185 select {
186 case <-f.opened:
187 return f.file.Read(b)
188 case <-f.closed:
189 return 0, ErrReadClosed
190 }
191 }
192
193
194 func (f *fifo) Write(b []byte) (int, error) {
195 if f.flag&(syscall.O_WRONLY|syscall.O_RDWR) == 0 {
196 return 0, ErrWrToRDONLY
197 }
198 select {
199 case <-f.opened:
200 return f.file.Write(b)
201 default:
202 }
203 select {
204 case <-f.opened:
205 return f.file.Write(b)
206 case <-f.closed:
207 return 0, ErrWriteClosed
208 }
209 }
210
211
212
213 func (f *fifo) Close() (retErr error) {
214 for {
215 if f == nil {
216 return
217 }
218
219 select {
220 case <-f.closed:
221 f.handle.Close()
222 return
223 default:
224 select {
225 case <-f.opened:
226 f.closedOnce.Do(func() {
227 retErr = f.file.Close()
228 f.err = retErr
229 close(f.closed)
230 })
231 default:
232 if f.flag&syscall.O_RDWR != 0 {
233 runtime.Gosched()
234 break
235 }
236 f.closingOnce.Do(func() {
237 close(f.closing)
238 })
239 reverseMode := syscall.O_WRONLY
240 if f.flag&syscall.O_WRONLY > 0 {
241 reverseMode = syscall.O_RDONLY
242 }
243 fn, err := f.handle.Path()
244
245
246 select {
247 case <-f.closed:
248 default:
249 if err != nil {
250
251
252 f.closedOnce.Do(func() {
253 f.err = err
254 close(f.closed)
255 })
256 <-f.closed
257 break
258 }
259 f, err := os.OpenFile(fn, reverseMode|syscall.O_NONBLOCK, 0)
260 if err == nil {
261 f.Close()
262 }
263 runtime.Gosched()
264 }
265 }
266 }
267 }
268 }
269
View as plain text