summaryrefslogtreecommitdiff
path: root/pkg/api
diff options
context:
space:
mode:
Diffstat (limited to 'pkg/api')
-rw-r--r--pkg/api/handlers/compat/events.go92
1 files changed, 54 insertions, 38 deletions
diff --git a/pkg/api/handlers/compat/events.go b/pkg/api/handlers/compat/events.go
index 5acc94153..3fc8248d6 100644
--- a/pkg/api/handlers/compat/events.go
+++ b/pkg/api/handlers/compat/events.go
@@ -1,9 +1,9 @@
package compat
import (
- "context"
"fmt"
"net/http"
+ "sync"
"github.com/containers/libpod/v2/libpod"
"github.com/containers/libpod/v2/libpod/events"
@@ -17,10 +17,10 @@ import (
func GetEvents(w http.ResponseWriter, r *http.Request) {
var (
- fromStart bool
- eventsError error
- decoder = r.Context().Value("decoder").(*schema.Decoder)
- runtime = r.Context().Value("runtime").(*libpod.Runtime)
+ fromStart bool
+ decoder = r.Context().Value("decoder").(*schema.Decoder)
+ runtime = r.Context().Value("runtime").(*libpod.Runtime)
+ json = jsoniter.ConfigCompatibleWithStandardLibrary // FIXME: this should happen on the package level
)
query := struct {
@@ -33,11 +33,16 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
+ return
}
var libpodFilters = []string{}
if _, found := r.URL.Query()["filters"]; found {
for k, v := range query.Filters {
+ if len(v) == 0 {
+ utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Errorf("empty value for filter %q", k))
+ return
+ }
libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", k, v[0]))
}
}
@@ -46,46 +51,57 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
fromStart = true
}
- eventCtx, eventCancel := context.WithCancel(r.Context())
eventChannel := make(chan *events.Event)
+ errorChannel := make(chan error)
+
+ // Start reading events.
go func() {
- readOpts := events.ReadOptions{FromStart: fromStart, Stream: query.Stream, Filters: libpodFilters, EventChannel: eventChannel, Since: query.Since, Until: query.Until}
- eventsError = runtime.Events(eventCtx, readOpts)
+ readOpts := events.ReadOptions{
+ FromStart: fromStart,
+ Stream: query.Stream,
+ Filters: libpodFilters,
+ EventChannel: eventChannel,
+ Since: query.Since,
+ Until: query.Until,
+ }
+ errorChannel <- runtime.Events(r.Context(), readOpts)
}()
- if eventsError != nil {
- utils.InternalServerError(w, eventsError)
- eventCancel()
- close(eventChannel)
- return
- }
- // If client disappears we need to stop listening for events
- go func(done <-chan struct{}) {
- <-done
- eventCancel()
- if _, ok := <-eventChannel; ok {
- close(eventChannel)
- }
- }(r.Context().Done())
+ var coder *jsoniter.Encoder
+ var writeHeader sync.Once
- // Headers need to be written out before turning Writer() over to json encoder
- w.Header().Set("Content-Type", "application/json")
- w.WriteHeader(http.StatusOK)
- if flusher, ok := w.(http.Flusher); ok {
- flusher.Flush()
- }
+ for stream := true; stream; stream = query.Stream {
+ select {
+ case err := <-errorChannel:
+ if err != nil {
+ utils.InternalServerError(w, err)
+ return
+ }
+ case evt := <-eventChannel:
+ writeHeader.Do(func() {
+ // Use a sync.Once so that we write the header
+ // only once.
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(http.StatusOK)
+ if flusher, ok := w.(http.Flusher); ok {
+ flusher.Flush()
+ }
+ coder = json.NewEncoder(w)
+ coder.SetEscapeHTML(true)
+ })
- json := jsoniter.ConfigCompatibleWithStandardLibrary
- coder := json.NewEncoder(w)
- coder.SetEscapeHTML(true)
+ if evt == nil {
+ continue
+ }
- for event := range eventChannel {
- e := entities.ConvertToEntitiesEvent(*event)
- if err := coder.Encode(e); err != nil {
- logrus.Errorf("unable to write json: %q", err)
- }
- if flusher, ok := w.(http.Flusher); ok {
- flusher.Flush()
+ e := entities.ConvertToEntitiesEvent(*evt)
+ if err := coder.Encode(e); err != nil {
+ logrus.Errorf("unable to write json: %q", err)
+ }
+ if flusher, ok := w.(http.Flusher); ok {
+ flusher.Flush()
+ }
}
+
}
}