summaryrefslogtreecommitdiff
path: root/pkg/channel
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/channel')
-rw-r--r--pkg/channel/doc.go17
-rw-r--r--pkg/channel/writer.go53
2 files changed, 70 insertions, 0 deletions
diff --git a/pkg/channel/doc.go b/pkg/channel/doc.go
new file mode 100644
index 000000000..656fbddaa
--- /dev/null
+++ b/pkg/channel/doc.go
@@ -0,0 +1,17 @@
+/*
+Package channel provides helper structs/methods/funcs for working with channels
+
+Proxy from an io.Writer to a channel:
+
+ w := channel.NewWriter(make(chan []byte, 10))
+ go func() {
+ w.Write([]byte("Hello, World"))
+ }()
+
+ fmt.Println(string(<-w.Chan()))
+ w.Close()
+
+Use of the constructor is required to initialize the channel.
+Provide a channel of sufficient size to handle messages from writer(s).
+*/
+package channel
diff --git a/pkg/channel/writer.go b/pkg/channel/writer.go
new file mode 100644
index 000000000..dbb38e416
--- /dev/null
+++ b/pkg/channel/writer.go
@@ -0,0 +1,53 @@
+package channel
+
+import (
+ "io"
+ "sync"
+
+ "github.com/pkg/errors"
+)
+
+// WriteCloser is an io.WriteCloser that that proxies Write() calls to a channel
+// The []byte buffer of the Write() is queued on the channel as one message.
+type WriteCloser interface {
+ io.WriteCloser
+ Chan() <-chan []byte
+}
+
+type writeCloser struct {
+ ch chan []byte
+ mux sync.Mutex
+}
+
+// NewWriter initializes a new channel writer
+func NewWriter(c chan []byte) WriteCloser {
+ return &writeCloser{
+ ch: c,
+ }
+}
+
+// Chan returns the R/O channel behind WriteCloser
+func (w *writeCloser) Chan() <-chan []byte {
+ return w.ch
+}
+
+// Write method for WriteCloser
+func (w *writeCloser) Write(b []byte) (int, error) {
+ if w == nil || w.ch == nil {
+ return 0, errors.New("use channel.NewWriter() to initialize a WriteCloser")
+ }
+
+ w.mux.Lock()
+ buf := make([]byte, len(b))
+ copy(buf, b)
+ w.ch <- buf
+ w.mux.Unlock()
+
+ return len(b), nil
+}
+
+// Close method for WriteCloser
+func (w *writeCloser) Close() error {
+ close(w.ch)
+ return nil
+}