aboutsummaryrefslogtreecommitdiff
path: root/pkg
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2020-05-27 15:07:32 -0400
committerGitHub <noreply@github.com>2020-05-27 15:07:32 -0400
commitc64abd0b037e5918d6d24e326005ae65131dd738 (patch)
tree81a3647c98522436b35a1520d44cc2f64d707775 /pkg
parentab3a620f74f2f1458054822f2113200863b60d40 (diff)
parent8438fa4fec01142ad9f6943f6e0c429c64d951b7 (diff)
downloadpodman-c64abd0b037e5918d6d24e326005ae65131dd738.tar.gz
podman-c64abd0b037e5918d6d24e326005ae65131dd738.tar.bz2
podman-c64abd0b037e5918d6d24e326005ae65131dd738.zip
Merge pull request #6407 from baude/v2eventsstream
Add streaming ability to endpoint
Diffstat (limited to 'pkg')
-rw-r--r--pkg/api/handlers/compat/events.go12
-rw-r--r--pkg/api/server/register_events.go5
-rw-r--r--pkg/bindings/system/system.go29
-rw-r--r--pkg/bindings/test/system_test.go4
-rw-r--r--pkg/domain/infra/abi/events.go5
-rw-r--r--pkg/domain/infra/tunnel/events.go3
6 files changed, 38 insertions, 20 deletions
diff --git a/pkg/api/handlers/compat/events.go b/pkg/api/handlers/compat/events.go
index 7ebfb0d1e..577ddd0a1 100644
--- a/pkg/api/handlers/compat/events.go
+++ b/pkg/api/handlers/compat/events.go
@@ -26,7 +26,10 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
Since string `schema:"since"`
Until string `schema:"until"`
Filters map[string][]string `schema:"filters"`
- }{}
+ Stream bool `schema:"stream"`
+ }{
+ Stream: true,
+ }
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
}
@@ -41,9 +44,10 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
if len(query.Since) > 0 || len(query.Until) > 0 {
fromStart = true
}
+
eventChannel := make(chan *events.Event)
go func() {
- readOpts := events.ReadOptions{FromStart: fromStart, Stream: true, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
+ readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
eventsError = runtime.Events(readOpts)
}()
if eventsError != nil {
@@ -55,7 +59,9 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
// If client disappears we need to stop listening for events
go func(done <-chan struct{}) {
<-done
- close(eventChannel)
+ if _, ok := <-eventChannel; ok {
+ close(eventChannel)
+ }
}(r.Context().Done())
// Headers need to be written out before turning Writer() over to json encoder
diff --git a/pkg/api/server/register_events.go b/pkg/api/server/register_events.go
index e909303da..2b85eb169 100644
--- a/pkg/api/server/register_events.go
+++ b/pkg/api/server/register_events.go
@@ -58,6 +58,11 @@ func (s *APIServer) registerEventsHandlers(r *mux.Router) error {
// type: string
// in: query
// description: JSON encoded map[string][]string of constraints
+ // - name: stream
+ // type: boolean
+ // in: query
+ // default: true
+ // description: when false, do not follow events
// responses:
// 200:
// description: returns a string of json data describing an event
diff --git a/pkg/bindings/system/system.go b/pkg/bindings/system/system.go
index 5348d0cfb..2cd894228 100644
--- a/pkg/bindings/system/system.go
+++ b/pkg/bindings/system/system.go
@@ -20,7 +20,7 @@ import (
// Events allows you to monitor libdpod related events like container creation and
// removal. The events are then passed to the eventChan provided. The optional cancelChan
// can be used to cancel the read of events and close down the HTTP connection.
-func Events(ctx context.Context, eventChan chan (entities.Event), cancelChan chan bool, since, until *string, filters map[string][]string) error {
+func Events(ctx context.Context, eventChan chan entities.Event, cancelChan chan bool, since, until *string, filters map[string][]string, stream *bool) error {
conn, err := bindings.GetClient(ctx)
if err != nil {
return err
@@ -32,6 +32,9 @@ func Events(ctx context.Context, eventChan chan (entities.Event), cancelChan cha
if until != nil {
params.Set("until", *until)
}
+ if stream != nil {
+ params.Set("stream", strconv.FormatBool(*stream))
+ }
if filters != nil {
filterString, err := bindings.FiltersToString(filters)
if err != nil {
@@ -50,18 +53,24 @@ func Events(ctx context.Context, eventChan chan (entities.Event), cancelChan cha
logrus.Error(errors.Wrap(err, "unable to close event response body"))
}()
}
+
dec := json.NewDecoder(response.Body)
- for {
- e := entities.Event{}
- if err := dec.Decode(&e); err != nil {
- if err == io.EOF {
- break
- }
- return errors.Wrap(err, "unable to decode event response")
+ for err = (error)(nil); err == nil; {
+ var e = entities.Event{}
+ err = dec.Decode(&e)
+ if err == nil {
+ eventChan <- e
}
- eventChan <- e
}
- return nil
+ close(eventChan)
+ switch {
+ case err == nil:
+ return nil
+ case errors.Is(err, io.EOF):
+ return nil
+ default:
+ return errors.Wrap(err, "unable to decode event response")
+ }
}
// Prune removes all unused system data.
diff --git a/pkg/bindings/test/system_test.go b/pkg/bindings/test/system_test.go
index 27ab2f555..dd3778754 100644
--- a/pkg/bindings/test/system_test.go
+++ b/pkg/bindings/test/system_test.go
@@ -47,13 +47,13 @@ var _ = Describe("Podman system", func() {
}
}()
go func() {
- system.Events(bt.conn, eChan, cancelChan, nil, nil, nil)
+ system.Events(bt.conn, eChan, cancelChan, nil, nil, nil, bindings.PFalse)
}()
_, err := bt.RunTopContainer(nil, nil, nil)
Expect(err).To(BeNil())
cancelChan <- true
- Expect(len(messages)).To(BeNumerically("==", 3))
+ Expect(len(messages)).To(BeNumerically("==", 5))
})
It("podman system prune - pod,container stopped", func() {
diff --git a/pkg/domain/infra/abi/events.go b/pkg/domain/infra/abi/events.go
index 20773cdce..7ec9db369 100644
--- a/pkg/domain/infra/abi/events.go
+++ b/pkg/domain/infra/abi/events.go
@@ -5,12 +5,9 @@ import (
"github.com/containers/libpod/libpod/events"
"github.com/containers/libpod/pkg/domain/entities"
- "github.com/sirupsen/logrus"
)
func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptions) error {
readOpts := events.ReadOptions{FromStart: opts.FromStart, Stream: opts.Stream, Filters: opts.Filter, EventChannel: opts.EventChan, Since: opts.Since, Until: opts.Until}
- err := ic.Libpod.Events(readOpts)
- logrus.Error(err)
- return err
+ return ic.Libpod.Events(readOpts)
}
diff --git a/pkg/domain/infra/tunnel/events.go b/pkg/domain/infra/tunnel/events.go
index 93da3aeb4..6a08a1f85 100644
--- a/pkg/domain/infra/tunnel/events.go
+++ b/pkg/domain/infra/tunnel/events.go
@@ -25,6 +25,7 @@ func (ic *ContainerEngine) Events(ctx context.Context, opts entities.EventsOptio
for e := range binChan {
opts.EventChan <- entities.ConvertToLibpodEvent(e)
}
+ close(opts.EventChan)
}()
- return system.Events(ic.ClientCxt, binChan, nil, &opts.Since, &opts.Until, filters)
+ return system.Events(ic.ClientCxt, binChan, nil, &opts.Since, &opts.Until, filters, &opts.Stream)
}