diff options
Diffstat (limited to 'vendor/github.com/fsouza/go-dockerclient/event.go')
-rw-r--r-- | vendor/github.com/fsouza/go-dockerclient/event.go | 79 |
1 files changed, 60 insertions, 19 deletions
diff --git a/vendor/github.com/fsouza/go-dockerclient/event.go b/vendor/github.com/fsouza/go-dockerclient/event.go index 8e362d44e..024b4ecc2 100644 --- a/vendor/github.com/fsouza/go-dockerclient/event.go +++ b/vendor/github.com/fsouza/go-dockerclient/event.go @@ -7,17 +7,48 @@ package docker import ( "encoding/json" "errors" - "fmt" "io" "math" "net" "net/http" "net/http/httputil" + "strconv" "sync" "sync/atomic" "time" ) +// EventsOptions to filter events +// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details. +type EventsOptions struct { + // Show events created since this timestamp then stream new events. + Since string + + // Show events created until this timestamp then stop streaming. + Until string + + // Filter for events. For example: + // map[string][]string{"type": {"container"}, "event": {"start", "die"}} + // will return events when container was started and stopped or killed + // + // Available filters: + // config=<string> config name or ID + // container=<string> container name or ID + // daemon=<string> daemon name or ID + // event=<string> event type + // image=<string> image name or ID + // label=<string> image or container label + // network=<string> network name or ID + // node=<string> node ID + // plugin= plugin name or ID + // scope= local or swarm + // secret=<string> secret name or ID + // service=<string> service name or ID + // type=<string> container, image, volume, network, daemon, plugin, node, service, secret or config + // volume=<string> volume name + Filters map[string][]string +} + // APIEvents represents events coming from the Docker API // The fields in the Docker API changed in API version 1.22, and // events for more than images and containers are now fired off. @@ -93,9 +124,17 @@ var ( // // The parameter is a channel through which events will be sent. func (c *Client) AddEventListener(listener chan<- *APIEvents) error { + return c.AddEventListenerWithOptions(EventsOptions{}, listener) +} + +// AddEventListener adds a new listener to container events in the Docker API. +// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details. +// +// The listener parameter is a channel through which events will be sent. +func (c *Client) AddEventListenerWithOptions(options EventsOptions, listener chan<- *APIEvents) error { var err error if !c.eventMonitor.isEnabled() { - err = c.eventMonitor.enableEventMonitoring(c) + err = c.eventMonitor.enableEventMonitoring(c, options) if err != nil { return err } @@ -165,7 +204,7 @@ func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool { return false } -func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error { +func (eventState *eventMonitoringState) enableEventMonitoring(c *Client, opts EventsOptions) error { eventState.Lock() defer eventState.Unlock() if !eventState.enabled { @@ -173,7 +212,7 @@ func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error { atomic.StoreInt64(&eventState.lastSeen, 0) eventState.C = make(chan *APIEvents, 100) eventState.errC = make(chan error, 1) - go eventState.monitorEvents(c) + go eventState.monitorEvents(c, opts) } return nil } @@ -193,7 +232,7 @@ func (eventState *eventMonitoringState) disableEventMonitoring() { } } -func (eventState *eventMonitoringState) monitorEvents(c *Client) { +func (eventState *eventMonitoringState) monitorEvents(c *Client, opts EventsOptions) { const ( noListenersTimeout = 5 * time.Second noListenersInterval = 10 * time.Millisecond @@ -213,7 +252,7 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) { return } - if err = eventState.connectWithRetry(c); err != nil { + if err = eventState.connectWithRetry(c, opts); err != nil { // terminate if connect failed eventState.disableEventMonitoring() return @@ -232,11 +271,11 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) { eventState.updateLastSeen(ev) eventState.sendEvent(ev) case err = <-eventState.errC: - if err == ErrNoListeners { + if errors.Is(err, ErrNoListeners) { eventState.disableEventMonitoring() return } else if err != nil { - defer func() { go eventState.monitorEvents(c) }() + defer func() { go eventState.monitorEvents(c, opts) }() return } case <-timeout: @@ -245,13 +284,13 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) { } } -func (eventState *eventMonitoringState) connectWithRetry(c *Client) error { +func (eventState *eventMonitoringState) connectWithRetry(c *Client, opts EventsOptions) error { var retries int eventState.RLock() eventChan := eventState.C errChan := eventState.errC eventState.RUnlock() - err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan) + err := c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan) for ; err != nil && retries < maxMonitorConnRetries; retries++ { waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries))) time.Sleep(time.Duration(waitTime) * time.Millisecond) @@ -259,7 +298,7 @@ func (eventState *eventMonitoringState) connectWithRetry(c *Client) error { eventChan = eventState.C errChan = eventState.errC eventState.RUnlock() - err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan) + err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan) } return err } @@ -304,11 +343,12 @@ func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) { } } -func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error { - uri := "/events" +func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) error { + // on reconnect override initial Since with last event seen time if startTime != 0 { - uri += fmt.Sprintf("?since=%d", startTime) + opts.Since = strconv.FormatInt(startTime, 10) } + uri := "/events?" + queryString(opts) protocol := c.endpointURL.Scheme address := c.endpointURL.Path if protocol != "unix" && protocol != "npipe" { @@ -329,16 +369,17 @@ func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan if err != nil { return err } - conn := httputil.NewClientConn(dial, nil) //nolint:staticcheck - req, err := http.NewRequest(http.MethodGet, uri, nil) //nolint:noctx + //lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing. + conn := httputil.NewClientConn(dial, nil) + req, err := http.NewRequest(http.MethodGet, uri, nil) if err != nil { return err } - res, err := conn.Do(req) //nolint:bodyclose + res, err := conn.Do(req) if err != nil { return err } - //nolint:staticcheck + //lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing. go func(res *http.Response, conn *httputil.ClientConn) { defer conn.Close() defer res.Body.Close() @@ -346,7 +387,7 @@ func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan for { var event APIEvents if err = decoder.Decode(&event); err != nil { - if err == io.EOF || err == io.ErrUnexpectedEOF { + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { c.eventMonitor.RLock() if c.eventMonitor.enabled && c.eventMonitor.C == eventChan { // Signal that we're exiting. |