summaryrefslogtreecommitdiff
path: root/pkg/api/handlers/compat/events.go
blob: 3fc8248d6bdcd587dd2a0efe38801bb1e7d614ce (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package compat

import (
	"fmt"
	"net/http"
	"sync"

	"github.com/containers/libpod/v2/libpod"
	"github.com/containers/libpod/v2/libpod/events"
	"github.com/containers/libpod/v2/pkg/api/handlers/utils"
	"github.com/containers/libpod/v2/pkg/domain/entities"
	"github.com/gorilla/schema"
	jsoniter "github.com/json-iterator/go"
	"github.com/pkg/errors"
	"github.com/sirupsen/logrus"
)

// GetEvents is the HTTP handler that reports libpod events to the client as a
// sequence of JSON objects, one per event.
//
// Recognised query parameters:
//   - since, until: passed through to the event reader; when either one is set
//     the reader is asked to start from the beginning of the event log.
//   - filters: map of filter name to values; the first value of every key is
//     forwarded as "name=value". A key with no value is rejected with 400.
//   - stream: defaults to true and is passed through to the event reader.
//
// The handler keeps forwarding events until runtime.Events returns. A reader
// failure that occurs before anything was written is reported as an internal
// server error; once the 200 response has started, the failure is only logged.
func GetEvents(w http.ResponseWriter, r *http.Request) {
	var (
		fromStart bool
		decoder   = r.Context().Value("decoder").(*schema.Decoder)
		runtime   = r.Context().Value("runtime").(*libpod.Runtime)
		json      = jsoniter.ConfigCompatibleWithStandardLibrary // FIXME: this should happen on the package level
	)

	query := struct {
		Since   string              `schema:"since"`
		Until   string              `schema:"until"`
		Filters map[string][]string `schema:"filters"`
		Stream  bool                `schema:"stream"`
	}{
		Stream: true,
	}
	params := r.URL.Query()
	if err := decoder.Decode(&query, params); err != nil {
		utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Wrapf(err, "Failed to parse parameters for %s", r.URL.String()))
		return
	}

	libpodFilters := make([]string, 0, len(query.Filters))
	if _, found := params["filters"]; found {
		for k, v := range query.Filters {
			if len(v) == 0 {
				utils.Error(w, "Failed to parse parameters", http.StatusBadRequest, errors.Errorf("empty value for filter %q", k))
				return
			}
			// Only the first value of each filter key is forwarded.
			libpodFilters = append(libpodFilters, fmt.Sprintf("%s=%s", k, v[0]))
		}
	}

	if len(query.Since) > 0 || len(query.Until) > 0 {
		fromStart = true
	}

	eventChannel := make(chan *events.Event)
	errorChannel := make(chan error)

	// Start reading events. The goroutine ends once runtime.Events returns and
	// its result has been handed over; the loop below always waits for that
	// result before the handler returns.
	go func() {
		readOpts := events.ReadOptions{
			FromStart:    fromStart,
			Stream:       query.Stream,
			Filters:      libpodFilters,
			EventChannel: eventChannel,
			Since:        query.Since,
			Until:        query.Until,
		}
		errorChannel <- runtime.Events(r.Context(), readOpts)
	}()

	// Resolve the optional flusher once instead of on every event.
	flush := func() {}
	if flusher, ok := w.(http.Flusher); ok {
		flush = flusher.Flush
	}

	var (
		coder       *jsoniter.Encoder
		writeHeader sync.Once
		headerSent  bool
	)
	// startResponse sends the 200 header and sets up the encoder exactly once.
	// It is deferred until the first event (or a clean finish) so that an early
	// reader failure can still be reported with an error status.
	startResponse := func() {
		writeHeader.Do(func() {
			w.Header().Set("Content-Type", "application/json")
			w.WriteHeader(http.StatusOK)
			flush()
			coder = json.NewEncoder(w)
			coder.SetEscapeHTML(true)
			headerSent = true
		})
	}

	// Receive-only view of the event channel; it is set to nil once the channel
	// is closed, which disables that select case instead of spinning on it.
	var incoming <-chan *events.Event = eventChannel

	for {
		select {
		case err := <-errorChannel:
			// runtime.Events has returned: this is the only exit of the loop,
			// for streaming and non-streaming requests alike.
			switch {
			case err == nil:
				// Make sure even an empty result is answered as JSON.
				startResponse()
			case headerSent:
				// The status line is already on the wire and cannot be
				// replaced by an error status any more.
				logrus.Errorf("unable to read events: %q", err)
			default:
				utils.InternalServerError(w, err)
			}
			return
		case evt, ok := <-incoming:
			if !ok {
				// Channel closed by the producer; wait for its final result.
				incoming = nil
				continue
			}
			startResponse()

			if evt == nil {
				continue
			}

			e := entities.ConvertToEntitiesEvent(*evt)
			if err := coder.Encode(e); err != nil {
				logrus.Errorf("unable to write json: %q", err)
			}
			flush()
		}
	}
}