...
1
12 package gbytes
13
14 import (
15 "errors"
16 "fmt"
17 "io"
18 "regexp"
19 "sync"
20 "time"
21 )
22
23
28 type Buffer struct {
29 contents []byte
30 readCursor uint64
31 lock *sync.Mutex
32 detectCloser chan interface{}
33 closed bool
34 }
35
36
39 func NewBuffer() *Buffer {
40 return &Buffer{
41 lock: &sync.Mutex{},
42 }
43 }
44
45
48 func BufferWithBytes(bytes []byte) *Buffer {
49 return &Buffer{
50 lock: &sync.Mutex{},
51 contents: bytes,
52 }
53 }
54
55
59 func BufferReader(reader io.Reader) *Buffer {
60 b := &Buffer{
61 lock: &sync.Mutex{},
62 }
63
64 go func() {
65 io.Copy(b, reader)
66 b.Close()
67 }()
68
69 return b
70 }
71
72
75 func (b *Buffer) Write(p []byte) (n int, err error) {
76 b.lock.Lock()
77 defer b.lock.Unlock()
78
79 if b.closed {
80 return 0, errors.New("attempt to write to closed buffer")
81 }
82
83 b.contents = append(b.contents, p...)
84 return len(p), nil
85 }
86
87
91 func (b *Buffer) Read(d []byte) (int, error) {
92 b.lock.Lock()
93 defer b.lock.Unlock()
94
95 if uint64(len(b.contents)) <= b.readCursor {
96 return 0, io.EOF
97 }
98
99 n := copy(d, b.contents[b.readCursor:])
100 b.readCursor += uint64(n)
101
102 return n, nil
103 }
104
105
108 func (b *Buffer) Clear() error {
109 b.lock.Lock()
110 defer b.lock.Unlock()
111
112 if b.closed {
113 return errors.New("attempt to clear closed buffer")
114 }
115
116 b.contents = []byte{}
117 b.readCursor = 0
118 return nil
119 }
120
121
124 func (b *Buffer) Close() error {
125 b.lock.Lock()
126 defer b.lock.Unlock()
127
128 b.closed = true
129
130 return nil
131 }
132
133
136 func (b *Buffer) Closed() bool {
137 b.lock.Lock()
138 defer b.lock.Unlock()
139
140 return b.closed
141 }
142
143
146 func (b *Buffer) Contents() []byte {
147 b.lock.Lock()
148 defer b.lock.Unlock()
149
150 contents := make([]byte, len(b.contents))
151 copy(contents, b.contents)
152 return contents
153 }
154
155
182 func (b *Buffer) Detect(desired string, args ...interface{}) chan bool {
183 formattedRegexp := desired
184 if len(args) > 0 {
185 formattedRegexp = fmt.Sprintf(desired, args...)
186 }
187 re := regexp.MustCompile(formattedRegexp)
188
189 b.lock.Lock()
190 defer b.lock.Unlock()
191
192 if b.detectCloser == nil {
193 b.detectCloser = make(chan interface{})
194 }
195
196 closer := b.detectCloser
197 response := make(chan bool)
198 go func() {
199 ticker := time.NewTicker(10 * time.Millisecond)
200 defer ticker.Stop()
201 defer close(response)
202 for {
203 select {
204 case <-ticker.C:
205 b.lock.Lock()
206 data, cursor := b.contents[b.readCursor:], b.readCursor
207 loc := re.FindIndex(data)
208 b.lock.Unlock()
209
210 if loc != nil {
211 response <- true
212 b.lock.Lock()
213 newCursorPosition := cursor + uint64(loc[1])
214 if newCursorPosition >= b.readCursor {
215 b.readCursor = newCursorPosition
216 }
217 b.lock.Unlock()
218 return
219 }
220 case <-closer:
221 return
222 }
223 }
224 }()
225
226 return response
227 }
228
229
232 func (b *Buffer) CancelDetects() {
233 b.lock.Lock()
234 defer b.lock.Unlock()
235
236 close(b.detectCloser)
237 b.detectCloser = nil
238 }
239
240 func (b *Buffer) didSay(re *regexp.Regexp) (bool, []byte) {
241 b.lock.Lock()
242 defer b.lock.Unlock()
243
244 unreadBytes := b.contents[b.readCursor:]
245 copyOfUnreadBytes := make([]byte, len(unreadBytes))
246 copy(copyOfUnreadBytes, unreadBytes)
247
248 loc := re.FindIndex(unreadBytes)
249
250 if loc != nil {
251 b.readCursor += uint64(loc[1])
252 return true, copyOfUnreadBytes
253 }
254 return false, copyOfUnreadBytes
255 }
256
View as plain text