diff options
author | OpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com> | 2021-11-02 22:28:54 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-02 22:28:54 +0100 |
commit | 33643f4b093db36b73f1cbb6f6206be431869c3c (patch) | |
tree | d2920d76e16de16d950c34c8285b3fd3ae0ebeab | |
parent | 82dba9754745f7598619050af4212a20b21821ad (diff) | |
parent | 449cc7a5c217dbc1296f2f3fcc63d0b9b9826bec (diff) | |
download | podman-33643f4b093db36b73f1cbb6f6206be431869c3c.tar.gz podman-33643f4b093db36b73f1cbb6f6206be431869c3c.tar.bz2 podman-33643f4b093db36b73f1cbb6f6206be431869c3c.zip |
Merge pull request #12159 from jwhonce/issues/12115
Implement top streaming for containers and pods
-rw-r--r-- | pkg/api/handlers/compat/containers_top.go | 81 | ||||
-rw-r--r-- | pkg/api/handlers/libpod/pods.go | 82 | ||||
-rw-r--r-- | pkg/api/server/register_containers.go | 17 | ||||
-rw-r--r-- | pkg/api/server/register_pods.go | 15 | ||||
-rw-r--r-- | test/apiv2/40-pods.at | 4 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_container.py | 103 |
6 files changed, 237 insertions, 65 deletions
diff --git a/pkg/api/handlers/compat/containers_top.go b/pkg/api/handlers/compat/containers_top.go index b5debd37d..545320ad9 100644 --- a/pkg/api/handlers/compat/containers_top.go +++ b/pkg/api/handlers/compat/containers_top.go @@ -1,8 +1,11 @@ package compat import ( + "encoding/json" + "fmt" "net/http" "strings" + "time" "github.com/containers/podman/v3/libpod" "github.com/containers/podman/v3/pkg/api/handlers" @@ -10,20 +13,24 @@ import ( api "github.com/containers/podman/v3/pkg/api/types" "github.com/gorilla/schema" "github.com/pkg/errors" + "github.com/sirupsen/logrus" ) func TopContainer(w http.ResponseWriter, r *http.Request) { runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime) decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder) - defaultValue := "-ef" + psArgs := "-ef" if utils.IsLibpodRequest(r) { - defaultValue = "" + psArgs = "" } query := struct { + Delay int `schema:"delay"` PsArgs string `schema:"ps_args"` + Stream bool `schema:"stream"` }{ - PsArgs: defaultValue, + Delay: 5, + PsArgs: psArgs, } if err := decoder.Decode(&query, r.URL.Query()); err != nil { utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, @@ -31,6 +38,12 @@ func TopContainer(w http.ResponseWriter, r *http.Request) { return } + if query.Delay < 1 { + utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, + fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay)) + return + } + name := utils.GetName(r) c, err := runtime.LookupContainer(name) if err != nil { @@ -38,26 +51,56 @@ func TopContainer(w http.ResponseWriter, r *http.Request) { return } - output, err := c.Top([]string{query.PsArgs}) - if err != nil { - utils.InternalServerError(w, err) - return + // We are committed now - all errors logged but not reported to client, ship has sailed + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + if f, ok := w.(http.Flusher); ok { + f.Flush() } - var body = handlers.ContainerTopOKBody{} - if len(output) > 0 { - body.Titles = strings.Split(output[0], "\t") - for i := range body.Titles { - body.Titles[i] = strings.TrimSpace(body.Titles[i]) - } + encoder := json.NewEncoder(w) + +loop: // break out of for/select infinite` loop + for { + select { + case <-r.Context().Done(): + break loop + default: + output, err := c.Top([]string{query.PsArgs}) + if err != nil { + logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err) + break loop + } + + if len(output) > 0 { + body := handlers.ContainerTopOKBody{} + body.Titles = strings.Split(output[0], "\t") + for i := range body.Titles { + body.Titles[i] = strings.TrimSpace(body.Titles[i]) + } + + for _, line := range output[1:] { + process := strings.Split(line, "\t") + for i := range process { + process[i] = strings.TrimSpace(process[i]) + } + body.Processes = append(body.Processes, process) + } + + if err := encoder.Encode(body); err != nil { + logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err) + break loop + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + } - for _, line := range output[1:] { - process := strings.Split(line, "\t") - for i := range process { - process[i] = strings.TrimSpace(process[i]) + if query.Stream { + time.Sleep(time.Duration(query.Delay) * time.Second) + } else { + break loop } - body.Processes = append(body.Processes, process) } } - utils.WriteJSON(w, http.StatusOK, body) } diff --git a/pkg/api/handlers/libpod/pods.go b/pkg/api/handlers/libpod/pods.go index 1e64de0ee..2ba292579 100644 --- a/pkg/api/handlers/libpod/pods.go +++ b/pkg/api/handlers/libpod/pods.go @@ -5,6 +5,7 @@ import ( "fmt" "net/http" "strings" + "time" "github.com/containers/common/pkg/config" "github.com/containers/podman/v3/libpod" @@ -363,10 +364,17 @@ func PodTop(w http.ResponseWriter, r *http.Request) { runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime) decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder) + psArgs := "-ef" + if utils.IsLibpodRequest(r) { + psArgs = "" + } query := struct { + Delay int `schema:"delay"` PsArgs string `schema:"ps_args"` + Stream bool `schema:"stream"` }{ - PsArgs: "", + Delay: 5, + PsArgs: psArgs, } if err := decoder.Decode(&query, r.URL.Query()); err != nil { utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, @@ -374,31 +382,71 @@ func PodTop(w http.ResponseWriter, r *http.Request) { return } - name := utils.GetName(r) - pod, err := runtime.LookupPod(name) - if err != nil { - utils.PodNotFound(w, name, err) + if query.Delay < 1 { + utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest, + fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay)) return } - args := []string{} - if query.PsArgs != "" { - args = append(args, query.PsArgs) - } - output, err := pod.GetPodPidInformation(args) + name := utils.GetName(r) + pod, err := runtime.LookupPod(name) if err != nil { - utils.InternalServerError(w, err) + utils.PodNotFound(w, name, err) return } - var body = handlers.PodTopOKBody{} - if len(output) > 0 { - body.Titles = strings.Split(output[0], "\t") - for _, line := range output[1:] { - body.Processes = append(body.Processes, strings.Split(line, "\t")) + // We are committed now - all errors logged but not reported to client, ship has sailed + w.WriteHeader(http.StatusOK) + w.Header().Set("Content-Type", "application/json") + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + + encoder := json.NewEncoder(w) + +loop: // break out of for/select infinite` loop + for { + select { + case <-r.Context().Done(): + break loop + default: + output, err := pod.GetPodPidInformation([]string{query.PsArgs}) + if err != nil { + logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err) + break loop + } + + if len(output) > 0 { + var body = handlers.PodTopOKBody{} + body.Titles = strings.Split(output[0], "\t") + for i := range body.Titles { + body.Titles[i] = strings.TrimSpace(body.Titles[i]) + } + + for _, line := range output[1:] { + process := strings.Split(line, "\t") + for i := range process { + process[i] = strings.TrimSpace(process[i]) + } + body.Processes = append(body.Processes, process) + } + + if err := encoder.Encode(body); err != nil { + logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err) + break loop + } + if f, ok := w.(http.Flusher); ok { + f.Flush() + } + } + + if query.Stream { + time.Sleep(time.Duration(query.Delay) * time.Second) + } else { + break loop + } } } - utils.WriteJSON(w, http.StatusOK, body) } func PodKill(w http.ResponseWriter, r *http.Request) { diff --git a/pkg/api/server/register_containers.go b/pkg/api/server/register_containers.go index 8dcea1301..c4919182b 100644 --- a/pkg/api/server/register_containers.go +++ b/pkg/api/server/register_containers.go @@ -442,6 +442,7 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error { // - in: query // name: ps_args // type: string + // default: -ef // description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. // produces: // - application/json @@ -1142,19 +1143,23 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error { // name: name // type: string // required: true - // description: | - // Name of container to query for processes - // (As of version 1.xx) + // description: Name of container to query for processes (As of version 1.xx) // - in: query // name: stream // type: boolean - // default: true - // description: Stream the output + // description: when true, repeatedly stream the latest output (As of version 4.0) + // - in: query + // name: delay + // type: integer + // description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0) + // default: 5 // - in: query // name: ps_args // type: string // default: -ef - // description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. + // description: | + // arguments to pass to ps such as aux. + // Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. // produces: // - application/json // responses: diff --git a/pkg/api/server/register_pods.go b/pkg/api/server/register_pods.go index de3669a0a..16a7bbb4c 100644 --- a/pkg/api/server/register_pods.go +++ b/pkg/api/server/register_pods.go @@ -296,18 +296,23 @@ func (s *APIServer) registerPodsHandlers(r *mux.Router) error { // name: name // type: string // required: true - // description: | - // Name of pod to query for processes + // description: Name of pod to query for processes // - in: query // name: stream // type: boolean - // default: true - // description: Stream the output + // description: when true, repeatedly stream the latest output (As of version 4.0) + // - in: query + // name: delay + // type: integer + // description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0) + // default: 5 // - in: query // name: ps_args // type: string // default: -ef - // description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. + // description: | + // arguments to pass to ps such as aux. + // Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used. // responses: // 200: // $ref: "#/responses/DocsPodTopResponse" diff --git a/test/apiv2/40-pods.at b/test/apiv2/40-pods.at index 985b26411..f45e85f61 100644 --- a/test/apiv2/40-pods.at +++ b/test/apiv2/40-pods.at @@ -110,11 +110,11 @@ t GET libpod/pods/fakename/top 404 \ .cause="no such pod" t GET libpod/pods/foo/top 200 \ - .Processes[0][-1]="/pause " \ + .Processes[0][-1]="/pause" \ .Titles[-1]="COMMAND" t GET libpod/pods/foo/top?ps_args=args,pid 200 \ - .Processes[0][0]="/pause " \ + .Processes[0][0]="/pause" \ .Processes[0][1]="1" \ .Titles[0]="COMMAND" \ .Titles[1]="PID" \ diff --git a/test/apiv2/python/rest_api/test_v2_0_0_container.py b/test/apiv2/python/rest_api/test_v2_0_0_container.py index 853e9da88..101044bbb 100644 --- a/test/apiv2/python/rest_api/test_v2_0_0_container.py +++ b/test/apiv2/python/rest_api/test_v2_0_0_container.py @@ -1,8 +1,11 @@ +import multiprocessing +import queue import random +import threading import unittest -import json import requests +import time from dateutil.parser import parse from .fixtures import APITestCase @@ -16,7 +19,10 @@ class ContainerTestCase(APITestCase): self.assertEqual(len(obj), 1) def test_list_filters(self): - r = requests.get(self.podman_url + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D") + r = requests.get( + self.podman_url + + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D" + ) self.assertEqual(r.status_code, 200, r.text) payload = r.json() containerAmnt = len(payload) @@ -33,18 +39,18 @@ class ContainerTestCase(APITestCase): self.assertId(r.content) _ = parse(r.json()["Created"]) - r = requests.post( self.podman_url + "/v1.40/containers/create?name=topcontainer", - json={"Cmd": ["top"], - "Image": "alpine:latest", - "Healthcheck": { - "Test": ["CMD", "pidof", "top"], - "Interval": 5000000000, - "Timeout": 2000000000, - "Retries": 3, - "StartPeriod": 5000000000 - } + json={ + "Cmd": ["top"], + "Image": "alpine:latest", + "Healthcheck": { + "Test": ["CMD", "pidof", "top"], + "Interval": 5000000000, + "Timeout": 2000000000, + "Retries": 3, + "StartPeriod": 5000000000, + }, }, ) self.assertEqual(r.status_code, 201, r.text) @@ -67,7 +73,7 @@ class ContainerTestCase(APITestCase): self.assertEqual(r.status_code, 200, r.text) self.assertId(r.content) out = r.json() - hc = out["Config"]["Healthcheck"]["Test"] + hc = out["Config"]["Healthcheck"]["Test"] self.assertListEqual(["CMD", "pidof", "top"], hc) r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start") @@ -84,7 +90,9 @@ class ContainerTestCase(APITestCase): self.assertIn(r.status_code, (200, 409), r.text) if r.status_code == 200: self.assertId(r.content) - r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true"))) + r = requests.get( + self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true")) + ) self.assertIn(r.status_code, (200, 409), r.text) if r.status_code == 200: self.assertId(r.content) @@ -136,9 +144,15 @@ class ContainerTestCase(APITestCase): payload = r.json() container_id = payload["Id"] self.assertIsNotNone(container_id) - r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0") + r = requests.get( + self.podman_url + + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0" + ) self.assertEqual(r.status_code, 200, r.text) - r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1") + r = requests.get( + self.podman_url + + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1" + ) self.assertEqual(r.status_code, 200, r.text) def test_commit(self): @@ -257,6 +271,63 @@ class ContainerTestCase(APITestCase): r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}") self.assertEqual(r.status_code, 204, r.text) + def test_top_no_stream(self): + uri = self.uri(self.resolve_container("/containers/{}/top")) + q = queue.Queue() + + def _impl(fifo): + fifo.put(requests.get(uri, params={"stream": False}, timeout=2)) + + top = threading.Thread(target=_impl, args=(q,)) + top.start() + time.sleep(2) + self.assertFalse(top.is_alive(), f"GET {uri} failed to return in 2s") + + qr = q.get(False) + self.assertEqual(qr.status_code, 200, qr.text) + + qr.close() + top.join() + + def test_top_stream(self): + uri = self.uri(self.resolve_container("/containers/{}/top")) + q = queue.Queue() + + stop_thread = False + + def _impl(fifo, stop): + try: + with requests.get(uri, params={"stream": True, "delay": 1}, stream=True) as r: + r.raise_for_status() + fifo.put(r) + for buf in r.iter_lines(chunk_size=None): + if stop(): + break + fifo.put(buf) + except Exception: + pass + + top = threading.Thread(target=_impl, args=(q, (lambda: stop_thread))) + top.start() + time.sleep(4) + self.assertTrue(top.is_alive(), f"GET {uri} exited too soon") + stop_thread = True + + for _ in range(10): + try: + qr = q.get_nowait() + if qr is not None: + self.assertEqual(qr.status_code, 200) + qr.close() + break + except queue.Empty: + pass + finally: + time.sleep(1) + else: + self.fail("Server failed to respond in 10s") + top.join() + if __name__ == "__main__": unittest.main() |