diff options
Diffstat (limited to 'pkg/channel')
-rw-r--r-- | pkg/channel/doc.go | 17 | ||||
-rw-r--r-- | pkg/channel/writer.go | 53 |
2 files changed, 70 insertions, 0 deletions
diff --git a/pkg/channel/doc.go b/pkg/channel/doc.go new file mode 100644 index 000000000..656fbddaa --- /dev/null +++ b/pkg/channel/doc.go @@ -0,0 +1,17 @@ +/* +Package channel provides helper structs/methods/funcs for working with channels + +Proxy from an io.Writer to a channel: + + w := channel.NewWriter(make(chan []byte, 10)) + go func() { + w.Write([]byte("Hello, World")) + }() + + fmt.Println(string(<-w.Chan())) + w.Close() + +Use of the constructor is required to initialize the channel. +Provide a channel of sufficient size to handle messages from writer(s). +*/ +package channel diff --git a/pkg/channel/writer.go b/pkg/channel/writer.go new file mode 100644 index 000000000..dbb38e416 --- /dev/null +++ b/pkg/channel/writer.go @@ -0,0 +1,53 @@ +package channel + +import ( + "io" + "sync" + + "github.com/pkg/errors" +) + +// WriteCloser is an io.WriteCloser that that proxies Write() calls to a channel +// The []byte buffer of the Write() is queued on the channel as one message. +type WriteCloser interface { + io.WriteCloser + Chan() <-chan []byte +} + +type writeCloser struct { + ch chan []byte + mux sync.Mutex +} + +// NewWriter initializes a new channel writer +func NewWriter(c chan []byte) WriteCloser { + return &writeCloser{ + ch: c, + } +} + +// Chan returns the R/O channel behind WriteCloser +func (w *writeCloser) Chan() <-chan []byte { + return w.ch +} + +// Write method for WriteCloser +func (w *writeCloser) Write(b []byte) (int, error) { + if w == nil || w.ch == nil { + return 0, errors.New("use channel.NewWriter() to initialize a WriteCloser") + } + + w.mux.Lock() + buf := make([]byte, len(b)) + copy(buf, b) + w.ch <- buf + w.mux.Unlock() + + return len(b), nil +} + +// Close method for WriteCloser +func (w *writeCloser) Close() error { + close(w.ch) + return nil +} |