summaryrefslogtreecommitdiff
path: root/vendor/github.com/fsouza/go-dockerclient/event.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/fsouza/go-dockerclient/event.go')
-rw-r--r--vendor/github.com/fsouza/go-dockerclient/event.go410
1 files changed, 410 insertions, 0 deletions
diff --git a/vendor/github.com/fsouza/go-dockerclient/event.go b/vendor/github.com/fsouza/go-dockerclient/event.go
new file mode 100644
index 000000000..18ae5d5a6
--- /dev/null
+++ b/vendor/github.com/fsouza/go-dockerclient/event.go
@@ -0,0 +1,410 @@
+// Copyright 2014 go-dockerclient authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package docker
+
+import (
+ "encoding/json"
+ "errors"
+ "fmt"
+ "io"
+ "math"
+ "net"
+ "net/http"
+ "net/http/httputil"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// APIEvents represents events coming from the Docker API
+// The fields in the Docker API changed in API version 1.22, and
+// events for more than images and containers are now fired off.
+// To maintain forward and backward compatibility, go-dockerclient
+// replicates the event in both the new and old format as faithfully as possible.
+//
+// For events that only exist in 1.22 in later, `Status` is filled in as
+// `"Type:Action"` instead of just `Action` to allow for older clients to
+// differentiate and not break if they rely on the pre-1.22 Status types.
+//
+// The transformEvent method can be consulted for more information about how
+// events are translated from new/old API formats
+type APIEvents struct {
+ // New API Fields in 1.22
+ Action string `json:"action,omitempty"`
+ Type string `json:"type,omitempty"`
+ Actor APIActor `json:"actor,omitempty"`
+
+ // Old API fields for < 1.22
+ Status string `json:"status,omitempty"`
+ ID string `json:"id,omitempty"`
+ From string `json:"from,omitempty"`
+
+ // Fields in both
+ Time int64 `json:"time,omitempty"`
+ TimeNano int64 `json:"timeNano,omitempty"`
+}
+
+// APIActor represents an actor that accomplishes something for an event
+type APIActor struct {
+ ID string `json:"id,omitempty"`
+ Attributes map[string]string `json:"attributes,omitempty"`
+}
+
+type eventMonitoringState struct {
+ // `sync/atomic` expects the first word in an allocated struct to be 64-bit
+ // aligned on both ARM and x86-32. See https://goo.gl/zW7dgq for more details.
+ lastSeen int64
+ sync.RWMutex
+ sync.WaitGroup
+ enabled bool
+ C chan *APIEvents
+ errC chan error
+ listeners []chan<- *APIEvents
+}
+
+const (
+ maxMonitorConnRetries = 5
+ retryInitialWaitTime = 10.
+)
+
+var (
+ // ErrNoListeners is the error returned when no listeners are available
+ // to receive an event.
+ ErrNoListeners = errors.New("no listeners present to receive event")
+
+ // ErrListenerAlreadyExists is the error returned when the listerner already
+ // exists.
+ ErrListenerAlreadyExists = errors.New("listener already exists for docker events")
+
+ // ErrTLSNotSupported is the error returned when the client does not support
+ // TLS (this applies to the Windows named pipe client).
+ ErrTLSNotSupported = errors.New("tls not supported by this client")
+
+ // EOFEvent is sent when the event listener receives an EOF error.
+ EOFEvent = &APIEvents{
+ Type: "EOF",
+ Status: "EOF",
+ }
+)
+
+// AddEventListener adds a new listener to container events in the Docker API.
+//
+// The parameter is a channel through which events will be sent.
+func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
+ var err error
+ if !c.eventMonitor.isEnabled() {
+ err = c.eventMonitor.enableEventMonitoring(c)
+ if err != nil {
+ return err
+ }
+ }
+ return c.eventMonitor.addListener(listener)
+}
+
+// RemoveEventListener removes a listener from the monitor.
+func (c *Client) RemoveEventListener(listener chan *APIEvents) error {
+ err := c.eventMonitor.removeListener(listener)
+ if err != nil {
+ return err
+ }
+ if c.eventMonitor.listernersCount() == 0 {
+ c.eventMonitor.disableEventMonitoring()
+ }
+ return nil
+}
+
+func (eventState *eventMonitoringState) addListener(listener chan<- *APIEvents) error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if listenerExists(listener, &eventState.listeners) {
+ return ErrListenerAlreadyExists
+ }
+ eventState.Add(1)
+ eventState.listeners = append(eventState.listeners, listener)
+ return nil
+}
+
+func (eventState *eventMonitoringState) removeListener(listener chan<- *APIEvents) error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if listenerExists(listener, &eventState.listeners) {
+ var newListeners []chan<- *APIEvents
+ for _, l := range eventState.listeners {
+ if l != listener {
+ newListeners = append(newListeners, l)
+ }
+ }
+ eventState.listeners = newListeners
+ eventState.Add(-1)
+ }
+ return nil
+}
+
+func (eventState *eventMonitoringState) closeListeners() {
+ for _, l := range eventState.listeners {
+ close(l)
+ eventState.Add(-1)
+ }
+ eventState.listeners = nil
+}
+
+func (eventState *eventMonitoringState) listernersCount() int {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return len(eventState.listeners)
+}
+
+func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
+ for _, b := range *list {
+ if b == a {
+ return true
+ }
+ }
+ return false
+}
+
+func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if !eventState.enabled {
+ eventState.enabled = true
+ atomic.StoreInt64(&eventState.lastSeen, 0)
+ eventState.C = make(chan *APIEvents, 100)
+ eventState.errC = make(chan error, 1)
+ go eventState.monitorEvents(c)
+ }
+ return nil
+}
+
+func (eventState *eventMonitoringState) disableEventMonitoring() error {
+ eventState.Lock()
+ defer eventState.Unlock()
+
+ eventState.closeListeners()
+
+ eventState.Wait()
+
+ if eventState.enabled {
+ eventState.enabled = false
+ close(eventState.C)
+ close(eventState.errC)
+ }
+ return nil
+}
+
+func (eventState *eventMonitoringState) monitorEvents(c *Client) {
+ const (
+ noListenersTimeout = 5 * time.Second
+ noListenersInterval = 10 * time.Millisecond
+ noListenersMaxTries = noListenersTimeout / noListenersInterval
+ )
+
+ var err error
+ for i := time.Duration(0); i < noListenersMaxTries && eventState.noListeners(); i++ {
+ time.Sleep(10 * time.Millisecond)
+ }
+
+ if eventState.noListeners() {
+ // terminate if no listener is available after 5 seconds.
+ // Prevents goroutine leak when RemoveEventListener is called
+ // right after AddEventListener.
+ eventState.disableEventMonitoring()
+ return
+ }
+
+ if err = eventState.connectWithRetry(c); err != nil {
+ // terminate if connect failed
+ eventState.disableEventMonitoring()
+ return
+ }
+ for eventState.isEnabled() {
+ timeout := time.After(100 * time.Millisecond)
+ select {
+ case ev, ok := <-eventState.C:
+ if !ok {
+ return
+ }
+ if ev == EOFEvent {
+ eventState.disableEventMonitoring()
+ return
+ }
+ eventState.updateLastSeen(ev)
+ eventState.sendEvent(ev)
+ case err = <-eventState.errC:
+ if err == ErrNoListeners {
+ eventState.disableEventMonitoring()
+ return
+ } else if err != nil {
+ defer func() { go eventState.monitorEvents(c) }()
+ return
+ }
+ case <-timeout:
+ continue
+ }
+ }
+}
+
+func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
+ var retries int
+ eventState.RLock()
+ eventChan := eventState.C
+ errChan := eventState.errC
+ eventState.RUnlock()
+ err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
+ for ; err != nil && retries < maxMonitorConnRetries; retries++ {
+ waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
+ time.Sleep(time.Duration(waitTime) * time.Millisecond)
+ eventState.RLock()
+ eventChan = eventState.C
+ errChan = eventState.errC
+ eventState.RUnlock()
+ err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
+ }
+ return err
+}
+
+func (eventState *eventMonitoringState) noListeners() bool {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return len(eventState.listeners) == 0
+}
+
+func (eventState *eventMonitoringState) isEnabled() bool {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ return eventState.enabled
+}
+
+func (eventState *eventMonitoringState) sendEvent(event *APIEvents) {
+ eventState.RLock()
+ defer eventState.RUnlock()
+ eventState.Add(1)
+ defer eventState.Done()
+ if eventState.enabled {
+ if len(eventState.listeners) == 0 {
+ eventState.errC <- ErrNoListeners
+ return
+ }
+
+ for _, listener := range eventState.listeners {
+ select {
+ case listener <- event:
+ default:
+ }
+ }
+ }
+}
+
+func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
+ eventState.Lock()
+ defer eventState.Unlock()
+ if atomic.LoadInt64(&eventState.lastSeen) < e.Time {
+ atomic.StoreInt64(&eventState.lastSeen, e.Time)
+ }
+}
+
+func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
+ uri := "/events"
+ if startTime != 0 {
+ uri += fmt.Sprintf("?since=%d", startTime)
+ }
+ protocol := c.endpointURL.Scheme
+ address := c.endpointURL.Path
+ if protocol != "unix" && protocol != "npipe" {
+ protocol = "tcp"
+ address = c.endpointURL.Host
+ }
+ var dial net.Conn
+ var err error
+ if c.TLSConfig == nil {
+ dial, err = c.Dialer.Dial(protocol, address)
+ } else {
+ netDialer, ok := c.Dialer.(*net.Dialer)
+ if !ok {
+ return ErrTLSNotSupported
+ }
+ dial, err = tlsDialWithDialer(netDialer, protocol, address, c.TLSConfig)
+ }
+ if err != nil {
+ return err
+ }
+ conn := httputil.NewClientConn(dial, nil)
+ req, err := http.NewRequest("GET", uri, nil)
+ if err != nil {
+ return err
+ }
+ res, err := conn.Do(req)
+ if err != nil {
+ return err
+ }
+ go func(res *http.Response, conn *httputil.ClientConn) {
+ defer conn.Close()
+ defer res.Body.Close()
+ decoder := json.NewDecoder(res.Body)
+ for {
+ var event APIEvents
+ if err = decoder.Decode(&event); err != nil {
+ if err == io.EOF || err == io.ErrUnexpectedEOF {
+ c.eventMonitor.RLock()
+ if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
+ // Signal that we're exiting.
+ eventChan <- EOFEvent
+ }
+ c.eventMonitor.RUnlock()
+ break
+ }
+ errChan <- err
+ }
+ if event.Time == 0 {
+ continue
+ }
+ transformEvent(&event)
+ c.eventMonitor.RLock()
+ if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
+ eventChan <- &event
+ }
+ c.eventMonitor.RUnlock()
+ }
+ }(res, conn)
+ return nil
+}
+
+// transformEvent takes an event and determines what version it is from
+// then populates both versions of the event
+func transformEvent(event *APIEvents) {
+ // if event version is <= 1.21 there will be no Action and no Type
+ if event.Action == "" && event.Type == "" {
+ event.Action = event.Status
+ event.Actor.ID = event.ID
+ event.Actor.Attributes = map[string]string{}
+ switch event.Status {
+ case "delete", "import", "pull", "push", "tag", "untag":
+ event.Type = "image"
+ default:
+ event.Type = "container"
+ if event.From != "" {
+ event.Actor.Attributes["image"] = event.From
+ }
+ }
+ } else {
+ if event.Status == "" {
+ if event.Type == "image" || event.Type == "container" {
+ event.Status = event.Action
+ } else {
+ // Because just the Status has been overloaded with different Types
+ // if an event is not for an image or a container, we prepend the type
+ // to avoid problems for people relying on actions being only for
+ // images and containers
+ event.Status = event.Type + ":" + event.Action
+ }
+ }
+ if event.ID == "" {
+ event.ID = event.Actor.ID
+ }
+ if event.From == "" {
+ event.From = event.Actor.Attributes["image"]
+ }
+ }
+}