summaryrefslogtreecommitdiff
path: root/vendor/github.com/fsouza/go-dockerclient/event.go
diff options
context:
space:
mode:
Diffstat (limited to 'vendor/github.com/fsouza/go-dockerclient/event.go')
-rw-r--r--vendor/github.com/fsouza/go-dockerclient/event.go79
1 files changed, 60 insertions, 19 deletions
diff --git a/vendor/github.com/fsouza/go-dockerclient/event.go b/vendor/github.com/fsouza/go-dockerclient/event.go
index 8e362d44e..024b4ecc2 100644
--- a/vendor/github.com/fsouza/go-dockerclient/event.go
+++ b/vendor/github.com/fsouza/go-dockerclient/event.go
@@ -7,17 +7,48 @@ package docker
import (
"encoding/json"
"errors"
- "fmt"
"io"
"math"
"net"
"net/http"
"net/http/httputil"
+ "strconv"
"sync"
"sync/atomic"
"time"
)
+// EventsOptions to filter events
+// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
+type EventsOptions struct {
+ // Show events created since this timestamp then stream new events.
+ Since string
+
+ // Show events created until this timestamp then stop streaming.
+ Until string
+
+ // Filter for events. For example:
+ // map[string][]string{"type": {"container"}, "event": {"start", "die"}}
+ // will return events when container was started and stopped or killed
+ //
+ // Available filters:
+ // config=<string> config name or ID
+ // container=<string> container name or ID
+ // daemon=<string> daemon name or ID
+ // event=<string> event type
+ // image=<string> image name or ID
+ // label=<string> image or container label
+ // network=<string> network name or ID
+ // node=<string> node ID
+ // plugin= plugin name or ID
+ // scope= local or swarm
+ // secret=<string> secret name or ID
+ // service=<string> service name or ID
+ // type=<string> container, image, volume, network, daemon, plugin, node, service, secret or config
+ // volume=<string> volume name
+ Filters map[string][]string
+}
+
// APIEvents represents events coming from the Docker API
// The fields in the Docker API changed in API version 1.22, and
// events for more than images and containers are now fired off.
@@ -93,9 +124,17 @@ var (
//
// The parameter is a channel through which events will be sent.
func (c *Client) AddEventListener(listener chan<- *APIEvents) error {
+ return c.AddEventListenerWithOptions(EventsOptions{}, listener)
+}
+
+// AddEventListener adds a new listener to container events in the Docker API.
+// See https://docs.docker.com/engine/api/v1.41/#operation/SystemEvents for more details.
+//
+// The listener parameter is a channel through which events will be sent.
+func (c *Client) AddEventListenerWithOptions(options EventsOptions, listener chan<- *APIEvents) error {
var err error
if !c.eventMonitor.isEnabled() {
- err = c.eventMonitor.enableEventMonitoring(c)
+ err = c.eventMonitor.enableEventMonitoring(c, options)
if err != nil {
return err
}
@@ -165,7 +204,7 @@ func listenerExists(a chan<- *APIEvents, list *[]chan<- *APIEvents) bool {
return false
}
-func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
+func (eventState *eventMonitoringState) enableEventMonitoring(c *Client, opts EventsOptions) error {
eventState.Lock()
defer eventState.Unlock()
if !eventState.enabled {
@@ -173,7 +212,7 @@ func (eventState *eventMonitoringState) enableEventMonitoring(c *Client) error {
atomic.StoreInt64(&eventState.lastSeen, 0)
eventState.C = make(chan *APIEvents, 100)
eventState.errC = make(chan error, 1)
- go eventState.monitorEvents(c)
+ go eventState.monitorEvents(c, opts)
}
return nil
}
@@ -193,7 +232,7 @@ func (eventState *eventMonitoringState) disableEventMonitoring() {
}
}
-func (eventState *eventMonitoringState) monitorEvents(c *Client) {
+func (eventState *eventMonitoringState) monitorEvents(c *Client, opts EventsOptions) {
const (
noListenersTimeout = 5 * time.Second
noListenersInterval = 10 * time.Millisecond
@@ -213,7 +252,7 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) {
return
}
- if err = eventState.connectWithRetry(c); err != nil {
+ if err = eventState.connectWithRetry(c, opts); err != nil {
// terminate if connect failed
eventState.disableEventMonitoring()
return
@@ -232,11 +271,11 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) {
eventState.updateLastSeen(ev)
eventState.sendEvent(ev)
case err = <-eventState.errC:
- if err == ErrNoListeners {
+ if errors.Is(err, ErrNoListeners) {
eventState.disableEventMonitoring()
return
} else if err != nil {
- defer func() { go eventState.monitorEvents(c) }()
+ defer func() { go eventState.monitorEvents(c, opts) }()
return
}
case <-timeout:
@@ -245,13 +284,13 @@ func (eventState *eventMonitoringState) monitorEvents(c *Client) {
}
}
-func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
+func (eventState *eventMonitoringState) connectWithRetry(c *Client, opts EventsOptions) error {
var retries int
eventState.RLock()
eventChan := eventState.C
errChan := eventState.errC
eventState.RUnlock()
- err := c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
+ err := c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
for ; err != nil && retries < maxMonitorConnRetries; retries++ {
waitTime := int64(retryInitialWaitTime * math.Pow(2, float64(retries)))
time.Sleep(time.Duration(waitTime) * time.Millisecond)
@@ -259,7 +298,7 @@ func (eventState *eventMonitoringState) connectWithRetry(c *Client) error {
eventChan = eventState.C
errChan = eventState.errC
eventState.RUnlock()
- err = c.eventHijack(atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
+ err = c.eventHijack(opts, atomic.LoadInt64(&eventState.lastSeen), eventChan, errChan)
}
return err
}
@@ -304,11 +343,12 @@ func (eventState *eventMonitoringState) updateLastSeen(e *APIEvents) {
}
}
-func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan chan error) error {
- uri := "/events"
+func (c *Client) eventHijack(opts EventsOptions, startTime int64, eventChan chan *APIEvents, errChan chan error) error {
+ // on reconnect override initial Since with last event seen time
if startTime != 0 {
- uri += fmt.Sprintf("?since=%d", startTime)
+ opts.Since = strconv.FormatInt(startTime, 10)
}
+ uri := "/events?" + queryString(opts)
protocol := c.endpointURL.Scheme
address := c.endpointURL.Path
if protocol != "unix" && protocol != "npipe" {
@@ -329,16 +369,17 @@ func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan
if err != nil {
return err
}
- conn := httputil.NewClientConn(dial, nil) //nolint:staticcheck
- req, err := http.NewRequest(http.MethodGet, uri, nil) //nolint:noctx
+ //lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
+ conn := httputil.NewClientConn(dial, nil)
+ req, err := http.NewRequest(http.MethodGet, uri, nil)
if err != nil {
return err
}
- res, err := conn.Do(req) //nolint:bodyclose
+ res, err := conn.Do(req)
if err != nil {
return err
}
- //nolint:staticcheck
+ //lint:ignore SA1019 the alternative doesn't quite work, so keep using the deprecated thing.
go func(res *http.Response, conn *httputil.ClientConn) {
defer conn.Close()
defer res.Body.Close()
@@ -346,7 +387,7 @@ func (c *Client) eventHijack(startTime int64, eventChan chan *APIEvents, errChan
for {
var event APIEvents
if err = decoder.Decode(&event); err != nil {
- if err == io.EOF || err == io.ErrUnexpectedEOF {
+ if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
c.eventMonitor.RLock()
if c.eventMonitor.enabled && c.eventMonitor.C == eventChan {
// Signal that we're exiting.