...
1
2
3
4
5 package http2
6
7 import (
8 "errors"
9 "io"
10 "sync"
11 )
12
13
14
15
16 type pipe struct {
17 mu sync.Mutex
18 c sync.Cond
19 b pipeBuffer
20 unread int
21 err error
22 breakErr error
23 donec chan struct{}
24 readFn func()
25 }
26
27 type pipeBuffer interface {
28 Len() int
29 io.Writer
30 io.Reader
31 }
32
33
34
35 func (p *pipe) setBuffer(b pipeBuffer) {
36 p.mu.Lock()
37 defer p.mu.Unlock()
38 if p.err != nil || p.breakErr != nil {
39 return
40 }
41 p.b = b
42 }
43
44 func (p *pipe) Len() int {
45 p.mu.Lock()
46 defer p.mu.Unlock()
47 if p.b == nil {
48 return p.unread
49 }
50 return p.b.Len()
51 }
52
53
54
55 func (p *pipe) Read(d []byte) (n int, err error) {
56 p.mu.Lock()
57 defer p.mu.Unlock()
58 if p.c.L == nil {
59 p.c.L = &p.mu
60 }
61 for {
62 if p.breakErr != nil {
63 return 0, p.breakErr
64 }
65 if p.b != nil && p.b.Len() > 0 {
66 return p.b.Read(d)
67 }
68 if p.err != nil {
69 if p.readFn != nil {
70 p.readFn()
71 p.readFn = nil
72 }
73 p.b = nil
74 return 0, p.err
75 }
76 p.c.Wait()
77 }
78 }
79
80 var (
81 errClosedPipeWrite = errors.New("write on closed buffer")
82 errUninitializedPipeWrite = errors.New("write on uninitialized buffer")
83 )
84
85
86
87 func (p *pipe) Write(d []byte) (n int, err error) {
88 p.mu.Lock()
89 defer p.mu.Unlock()
90 if p.c.L == nil {
91 p.c.L = &p.mu
92 }
93 defer p.c.Signal()
94 if p.err != nil || p.breakErr != nil {
95 return 0, errClosedPipeWrite
96 }
97
98
99
100 if p.b == nil {
101 return 0, errUninitializedPipeWrite
102 }
103 return p.b.Write(d)
104 }
105
106
107
108
109
110
111 func (p *pipe) CloseWithError(err error) { p.closeWithError(&p.err, err, nil) }
112
113
114
115
116 func (p *pipe) BreakWithError(err error) { p.closeWithError(&p.breakErr, err, nil) }
117
118
119
120 func (p *pipe) closeWithErrorAndCode(err error, fn func()) { p.closeWithError(&p.err, err, fn) }
121
122 func (p *pipe) closeWithError(dst *error, err error, fn func()) {
123 if err == nil {
124 panic("err must be non-nil")
125 }
126 p.mu.Lock()
127 defer p.mu.Unlock()
128 if p.c.L == nil {
129 p.c.L = &p.mu
130 }
131 defer p.c.Signal()
132 if *dst != nil {
133
134 return
135 }
136 p.readFn = fn
137 if dst == &p.breakErr {
138 if p.b != nil {
139 p.unread += p.b.Len()
140 }
141 p.b = nil
142 }
143 *dst = err
144 p.closeDoneLocked()
145 }
146
147
148 func (p *pipe) closeDoneLocked() {
149 if p.donec == nil {
150 return
151 }
152
153
154 select {
155 case <-p.donec:
156 default:
157 close(p.donec)
158 }
159 }
160
161
162 func (p *pipe) Err() error {
163 p.mu.Lock()
164 defer p.mu.Unlock()
165 if p.breakErr != nil {
166 return p.breakErr
167 }
168 return p.err
169 }
170
171
172
173 func (p *pipe) Done() <-chan struct{} {
174 p.mu.Lock()
175 defer p.mu.Unlock()
176 if p.donec == nil {
177 p.donec = make(chan struct{})
178 if p.err != nil || p.breakErr != nil {
179
180 p.closeDoneLocked()
181 }
182 }
183 return p.donec
184 }
185
View as plain text