1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
/*
Package gbytes provides a buffer that supports incrementally detecting input.
You use gbytes.Buffer with the gbytes.Say matcher. When Say finds a match, it fastforwards the buffer's read cursor to the end of that match.
Subsequent matches against the buffer will only operate against data that appears *after* the read cursor.
The read cursor is an opaque implementation detail that you cannot access. You should use the Say matcher to sift through the buffer. You can always
access the entire buffer's contents with Contents().
*/
package gbytes
import (
"errors"
"fmt"
"io"
"regexp"
"sync"
"time"
)
/*
Buffer implements io.Writer (and io.Reader) and can be used with the gbytes.Say matcher.

You should only use a gbytes.Buffer in test code. It stores all writes in an in-memory buffer - behavior that is inappropriate for production code!

A Buffer must be created with NewBuffer, BufferWithBytes or BufferReader: the zero value has a nil lock and its methods would panic.
*/
type Buffer struct {
// contents holds the bytes accumulated by Write (or seeded by BufferWithBytes); Clear replaces it with an empty slice.
contents []byte
// readCursor is the offset into contents of the first unread byte; Read, Detect and didSay move it forward, Clear resets it to 0.
readCursor uint64
// lock guards every other field of the struct.
lock *sync.Mutex
// detectCloser is created lazily by Detect and closed (then reset to nil) by CancelDetects to stop pending Detect goroutines.
detectCloser chan interface{}
// closed is set by Close; once set, Write, Read and Clear return errors.
closed bool
}
/*
NewBuffer returns an empty gbytes.Buffer that is ready to be written to.
*/
func NewBuffer() *Buffer {
	buffer := new(Buffer)
	buffer.lock = new(sync.Mutex)
	return buffer
}
/*
BufferWithBytes returns a new gbytes.Buffer seeded with the passed in bytes.

The bytes are copied, so the buffer never shares a backing array with the caller:
later writes to the buffer cannot spill into the caller's slice, and later changes
to the caller's slice are not reflected in the buffer.
*/
func BufferWithBytes(bytes []byte) *Buffer {
	// Detach from the caller's backing array; Write appends to contents and could
	// otherwise overwrite spare capacity that still belongs to the caller.
	seeded := make([]byte, len(bytes))
	copy(seeded, bytes)

	return &Buffer{
		lock:     &sync.Mutex{},
		contents: seeded,
	}
}
/*
BufferReader returns a new gbytes.Buffer that is fed from the given reader.

A background goroutine streams the reader into the Buffer with io.Copy and closes
the Buffer once the copy returns. Any error reported by io.Copy is discarded.
*/
func BufferReader(reader io.Reader) *Buffer {
	buffer := &Buffer{lock: &sync.Mutex{}}

	pump := func() {
		io.Copy(buffer, reader)
		buffer.Close()
	}
	go pump()

	return buffer
}
/*
Write implements the io.Writer interface by appending p to the buffer.

It returns len(p) and a nil error, unless the buffer has been closed, in which
case nothing is stored and an error is returned.
*/
func (b *Buffer) Write(p []byte) (n int, err error) {
	b.lock.Lock()
	defer b.lock.Unlock()

	if !b.closed {
		b.contents = append(b.contents, p...)
		return len(p), nil
	}
	return 0, errors.New("attempt to write to closed buffer")
}
/*
Read implements the io.Reader interface. It copies unread data into d and
advances the read cursor past whatever was copied.

Returns an error if called after Close, and io.EOF when there is no unread data.
*/
func (b *Buffer) Read(d []byte) (int, error) {
	b.lock.Lock()
	defer b.lock.Unlock()

	switch {
	case b.closed:
		return 0, errors.New("attempt to read from closed buffer")
	case b.readCursor >= uint64(len(b.contents)):
		// Everything written so far has already been consumed.
		return 0, io.EOF
	}

	copied := copy(d, b.contents[b.readCursor:])
	b.readCursor += uint64(copied)
	return copied, nil
}
/*
Clear discards the buffer's contents and rewinds the read cursor to the start.

Returns an error, leaving the buffer untouched, if the buffer has been closed.
*/
func (b *Buffer) Clear() error {
	b.lock.Lock()
	defer b.lock.Unlock()

	if !b.closed {
		b.contents, b.readCursor = []byte{}, 0
		return nil
	}
	return errors.New("attempt to clear closed buffer")
}
/*
Close marks the buffer as closed, signifying that it will no longer be written to.

It always returns nil and may be called more than once.
*/
func (b *Buffer) Close() error {
	b.lock.Lock()
	b.closed = true
	b.lock.Unlock()

	return nil
}
/*
Closed reports whether Close has been called on the buffer.
*/
func (b *Buffer) Closed() bool {
	b.lock.Lock()
	isClosed := b.closed
	b.lock.Unlock()

	return isClosed
}
/*
Contents returns a copy of all the data currently held by the buffer, regardless
of where the read cursor is. The returned slice is never nil and can be modified
freely without affecting the buffer.
*/
func (b *Buffer) Contents() []byte {
	b.lock.Lock()
	defer b.lock.Unlock()

	snapshot := make([]byte, 0, len(b.contents))
	return append(snapshot, b.contents...)
}
/*
Detect takes a regular expression and returns a channel.

The channel will receive true the first time data matching the regular expression is found in the unread portion of the buffer
(the buffer is polled every 10 milliseconds).
The channel is subsequently closed and the buffer's read-cursor is fast-forwarded to just after the matching region.

You typically don't need to use Detect and should use the gbytes.Say matcher instead. Detect is useful, however, in cases where your code must
branch and handle different outputs written to the buffer.

For example, consider a buffer hooked up to the stdout of a client library. You may (or may not, depending on state outside of your control) need to authenticate the client library.

You could do something like:

	select {
	case <-buffer.Detect("You are not logged in"):
		//log in
	case <-buffer.Detect("Success"):
		//carry on
	case <-time.After(time.Second):
		//welp
	}
	buffer.CancelDetects()

You should always call CancelDetects after using Detect. This will close any channels that have not detected and clean up the goroutines that were spawned to support them.

Finally, you can pass detect a format string followed by variadic arguments. This will construct the regexp using fmt.Sprintf.

Detect panics if the resulting pattern is not a valid regular expression.
*/
func (b *Buffer) Detect(desired string, args ...interface{}) chan bool {
	formattedRegexp := desired
	if len(args) > 0 {
		formattedRegexp = fmt.Sprintf(desired, args...)
	}
	re := regexp.MustCompile(formattedRegexp)

	b.lock.Lock()
	defer b.lock.Unlock()

	// All detects started before the next CancelDetects share one cancellation channel.
	if b.detectCloser == nil {
		b.detectCloser = make(chan interface{})
	}

	closer := b.detectCloser
	response := make(chan bool)
	go func() {
		ticker := time.NewTicker(10 * time.Millisecond)
		defer ticker.Stop()
		defer close(response)
		for {
			select {
			case <-ticker.C:
				b.lock.Lock()
				cursor := b.readCursor
				loc := re.FindIndex(b.contents[cursor:])
				b.lock.Unlock()

				if loc == nil {
					continue
				}

				// Deliver the match, but stay cancellable: if nobody is receiving
				// (e.g. another branch of the caller's select won), CancelDetects
				// must still be able to stop this goroutine instead of leaking it.
				select {
				case response <- true:
				case <-closer:
					return
				}

				b.lock.Lock()
				newCursorPosition := cursor + uint64(loc[1])
				// Never move the cursor backwards, and never past the end of the
				// current contents: a Clear may have run since the match was found,
				// and an out-of-range cursor would make later slicing panic.
				if newCursorPosition >= b.readCursor && newCursorPosition <= uint64(len(b.contents)) {
					b.readCursor = newCursorPosition
				}
				b.lock.Unlock()
				return
			case <-closer:
				return
			}
		}
	}()

	return response
}
/*
CancelDetects cancels any pending detects by closing the cancellation channel shared by their goroutines.
You should always call this when you're done with a set of Detect channels.

It is safe to call CancelDetects when no Detect is outstanding, including calling it repeatedly; in that case it does nothing.
*/
func (b *Buffer) CancelDetects() {
	b.lock.Lock()
	defer b.lock.Unlock()

	// The channel only exists between a Detect call and the next CancelDetects;
	// closing a nil channel would panic.
	if b.detectCloser == nil {
		return
	}

	close(b.detectCloser)
	b.detectCloser = nil
}
// didSay matches re against the unread portion of the buffer.
//
// It reports whether a match was found and always returns a copy of the bytes
// that were unread when it was called. On a match the read cursor is advanced
// to just past the end of the matching region.
func (b *Buffer) didSay(re *regexp.Regexp) (bool, []byte) {
	b.lock.Lock()
	defer b.lock.Unlock()

	pending := b.contents[b.readCursor:]
	snapshot := make([]byte, len(pending))
	copy(snapshot, pending)

	match := re.FindIndex(pending)
	if match == nil {
		return false, snapshot
	}

	b.readCursor += uint64(match[1])
	return true, snapshot
}
|