From 8b9e5fb08591a9477d0173344a8624623b958d86 Mon Sep 17 00:00:00 2001 From: "Jason T. Greene" Date: Thu, 28 Jul 2022 21:55:16 -0500 Subject: Use 8k buffer to help clients w/ broken parsing Signed-off-by: Jason T. Greene --- pkg/api/server/handler_api.go | 95 ++++++++++++++++++++++++++++------- pkg/api/server/register_containers.go | 8 +-- pkg/api/server/register_events.go | 4 +- pkg/api/server/register_images.go | 4 +- 4 files changed, 84 insertions(+), 27 deletions(-) diff --git a/pkg/api/server/handler_api.go b/pkg/api/server/handler_api.go index a3aa681d0..57cafa5f6 100644 --- a/pkg/api/server/handler_api.go +++ b/pkg/api/server/handler_api.go @@ -1,7 +1,10 @@ package server import ( + "bufio" + "errors" "fmt" + "net" "net/http" "runtime" @@ -9,34 +12,54 @@ import ( "github.com/sirupsen/logrus" ) +type BufferedResponseWriter struct { + b *bufio.Writer + w http.ResponseWriter +} + // APIHandler is a wrapper to enhance HandlerFunc's and remove redundant code func (s *APIServer) APIHandler(h http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { // Wrapper to hide some boilerplate - fn := func(w http.ResponseWriter, r *http.Request) { - if err := r.ParseForm(); err != nil { - logrus.WithFields(logrus.Fields{ - "X-Reference-Id": r.Header.Get("X-Reference-Id"), - }).Info("Failed Request: unable to parse form: " + err.Error()) - } + s.apiWrapper(h, w, r, false) + } +} + +// An API Handler to help historical clients with broken parsing that expect +// streaming JSON payloads to be reliably messaged framed (full JSON record +// always fits in each read()) +func (s *APIServer) StreamBufferedAPIHandler(h http.HandlerFunc) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // Wrapper to hide some boilerplate + s.apiWrapper(h, w, r, true) + } +} - cv := version.APIVersion[version.Compat][version.CurrentAPI] - w.Header().Set("API-Version", fmt.Sprintf("%d.%d", cv.Major, cv.Minor)) +func (s *APIServer) apiWrapper(h http.HandlerFunc, w http.ResponseWriter, r *http.Request, buffer bool) { + if err := r.ParseForm(); err != nil { + logrus.WithFields(logrus.Fields{ + "X-Reference-Id": r.Header.Get("X-Reference-Id"), + }).Info("Failed Request: unable to parse form: " + err.Error()) + } - lv := version.APIVersion[version.Libpod][version.CurrentAPI].String() - w.Header().Set("Libpod-API-Version", lv) - w.Header().Set("Server", "Libpod/"+lv+" ("+runtime.GOOS+")") + cv := version.APIVersion[version.Compat][version.CurrentAPI] + w.Header().Set("API-Version", fmt.Sprintf("%d.%d", cv.Major, cv.Minor)) - if s.CorsHeaders != "" { - w.Header().Set("Access-Control-Allow-Origin", s.CorsHeaders) - w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, X-Registry-Auth, Connection, Upgrade, X-Registry-Config") - w.Header().Set("Access-Control-Allow-Methods", "HEAD, GET, POST, DELETE, PUT, OPTIONS") - } + lv := version.APIVersion[version.Libpod][version.CurrentAPI].String() + w.Header().Set("Libpod-API-Version", lv) + w.Header().Set("Server", "Libpod/"+lv+" ("+runtime.GOOS+")") - h(w, r) - } - fn(w, r) + if s.CorsHeaders != "" { + w.Header().Set("Access-Control-Allow-Origin", s.CorsHeaders) + w.Header().Set("Access-Control-Allow-Headers", "Origin, X-Requested-With, Content-Type, Accept, X-Registry-Auth, Connection, Upgrade, X-Registry-Config") + w.Header().Set("Access-Control-Allow-Methods", "HEAD, GET, POST, DELETE, PUT, OPTIONS") } + + if buffer { + w = newBufferedResponseWriter(w) + } + + h(w, r) } // VersionedPath prepends the version parsing code @@ -44,3 +67,37 @@ func (s *APIServer) APIHandler(h http.HandlerFunc) http.HandlerFunc { func VersionedPath(p string) string { return "/v{version:[0-9][0-9A-Za-z.-]*}" + p } + +func (w *BufferedResponseWriter) Header() http.Header { + return w.w.Header() +} + +func (w *BufferedResponseWriter) Hijack() (net.Conn, *bufio.ReadWriter, error) { + _ = w.b.Flush() + if wrapped, ok := w.w.(http.Hijacker); ok { + return wrapped.Hijack() + } + + return nil, nil, errors.New("ResponseWriter does not support hijacking") +} + +func (w *BufferedResponseWriter) Write(b []byte) (int, error) { + return w.b.Write(b) +} + +func (w *BufferedResponseWriter) WriteHeader(statusCode int) { + w.w.WriteHeader(statusCode) +} + +func (w *BufferedResponseWriter) Flush() { + _ = w.b.Flush() + if wrapped, ok := w.w.(http.Flusher); ok { + wrapped.Flush() + } +} +func newBufferedResponseWriter(rw http.ResponseWriter) *BufferedResponseWriter { + return &BufferedResponseWriter{ + bufio.NewWriterSize(rw, 8192), + rw, + } +} diff --git a/pkg/api/server/register_containers.go b/pkg/api/server/register_containers.go index e2ecdb6af..b319fc14a 100644 --- a/pkg/api/server/register_containers.go +++ b/pkg/api/server/register_containers.go @@ -397,9 +397,9 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error { // $ref: "#/responses/containerNotFound" // 500: // $ref: "#/responses/internalError" - r.HandleFunc(VersionedPath("/containers/{name}/stats"), s.APIHandler(compat.StatsContainer)).Methods(http.MethodGet) + r.HandleFunc(VersionedPath("/containers/{name}/stats"), s.StreamBufferedAPIHandler(compat.StatsContainer)).Methods(http.MethodGet) // Added non version path to URI to support docker non versioned paths - r.HandleFunc("/containers/{name}/stats", s.APIHandler(compat.StatsContainer)).Methods(http.MethodGet) + r.HandleFunc("/containers/{name}/stats", s.StreamBufferedAPIHandler(compat.StatsContainer)).Methods(http.MethodGet) // swagger:operation POST /containers/{name}/stop compat ContainerStop // --- // tags: @@ -455,9 +455,9 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error { // $ref: "#/responses/containerNotFound" // 500: // $ref: "#/responses/internalError" - r.HandleFunc(VersionedPath("/containers/{name}/top"), s.APIHandler(compat.TopContainer)).Methods(http.MethodGet) + r.HandleFunc(VersionedPath("/containers/{name}/top"), s.StreamBufferedAPIHandler(compat.TopContainer)).Methods(http.MethodGet) // Added non version path to URI to support docker non versioned paths - r.HandleFunc("/containers/{name}/top", s.APIHandler(compat.TopContainer)).Methods(http.MethodGet) + r.HandleFunc("/containers/{name}/top", s.StreamBufferedAPIHandler(compat.TopContainer)).Methods(http.MethodGet) // swagger:operation POST /containers/{name}/unpause compat ContainerUnpause // --- // tags: diff --git a/pkg/api/server/register_events.go b/pkg/api/server/register_events.go index 76f9ec619..3442b3eab 100644 --- a/pkg/api/server/register_events.go +++ b/pkg/api/server/register_events.go @@ -34,9 +34,9 @@ func (s *APIServer) registerEventsHandlers(r *mux.Router) error { // description: returns a string of json data describing an event // 500: // "$ref": "#/responses/internalError" - r.Handle(VersionedPath("/events"), s.APIHandler(compat.GetEvents)).Methods(http.MethodGet) + r.Handle(VersionedPath("/events"), s.StreamBufferedAPIHandler(compat.GetEvents)).Methods(http.MethodGet) // Added non version path to URI to support docker non versioned paths - r.Handle("/events", s.APIHandler(compat.GetEvents)).Methods(http.MethodGet) + r.Handle("/events", s.StreamBufferedAPIHandler(compat.GetEvents)).Methods(http.MethodGet) // swagger:operation GET /libpod/events system SystemEventsLibpod // --- // tags: diff --git a/pkg/api/server/register_images.go b/pkg/api/server/register_images.go index 1bfedd77e..d71e0d470 100644 --- a/pkg/api/server/register_images.go +++ b/pkg/api/server/register_images.go @@ -702,9 +702,9 @@ func (s *APIServer) registerImagesHandlers(r *mux.Router) error { // $ref: "#/responses/badParamError" // 500: // $ref: "#/responses/internalError" - r.Handle(VersionedPath("/build"), s.APIHandler(compat.BuildImage)).Methods(http.MethodPost) + r.Handle(VersionedPath("/build"), s.StreamBufferedAPIHandler(compat.BuildImage)).Methods(http.MethodPost) // Added non version path to URI to support docker non versioned paths - r.Handle("/build", s.APIHandler(compat.BuildImage)).Methods(http.MethodPost) + r.Handle("/build", s.StreamBufferedAPIHandler(compat.BuildImage)).Methods(http.MethodPost) /* libpod endpoints */ -- cgit v1.2.3-54-g00ecf