aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOpenShift Merge Robot <openshift-merge-robot@users.noreply.github.com>2021-11-02 22:28:54 +0100
committerGitHub <noreply@github.com>2021-11-02 22:28:54 +0100
commit33643f4b093db36b73f1cbb6f6206be431869c3c (patch)
treed2920d76e16de16d950c34c8285b3fd3ae0ebeab
parent82dba9754745f7598619050af4212a20b21821ad (diff)
parent449cc7a5c217dbc1296f2f3fcc63d0b9b9826bec (diff)
downloadpodman-33643f4b093db36b73f1cbb6f6206be431869c3c.tar.gz
podman-33643f4b093db36b73f1cbb6f6206be431869c3c.tar.bz2
podman-33643f4b093db36b73f1cbb6f6206be431869c3c.zip
Merge pull request #12159 from jwhonce/issues/12115
Implement top streaming for containers and pods
-rw-r--r--pkg/api/handlers/compat/containers_top.go81
-rw-r--r--pkg/api/handlers/libpod/pods.go82
-rw-r--r--pkg/api/server/register_containers.go17
-rw-r--r--pkg/api/server/register_pods.go15
-rw-r--r--test/apiv2/40-pods.at4
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_container.py103
6 files changed, 237 insertions, 65 deletions
diff --git a/pkg/api/handlers/compat/containers_top.go b/pkg/api/handlers/compat/containers_top.go
index b5debd37d..545320ad9 100644
--- a/pkg/api/handlers/compat/containers_top.go
+++ b/pkg/api/handlers/compat/containers_top.go
@@ -1,8 +1,11 @@
package compat
import (
+ "encoding/json"
+ "fmt"
"net/http"
"strings"
+ "time"
"github.com/containers/podman/v3/libpod"
"github.com/containers/podman/v3/pkg/api/handlers"
@@ -10,20 +13,24 @@ import (
api "github.com/containers/podman/v3/pkg/api/types"
"github.com/gorilla/schema"
"github.com/pkg/errors"
+ "github.com/sirupsen/logrus"
)
func TopContainer(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)
- defaultValue := "-ef"
+ psArgs := "-ef"
if utils.IsLibpodRequest(r) {
- defaultValue = ""
+ psArgs = ""
}
query := struct {
+ Delay int `schema:"delay"`
PsArgs string `schema:"ps_args"`
+ Stream bool `schema:"stream"`
}{
- PsArgs: defaultValue,
+ Delay: 5,
+ PsArgs: psArgs,
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
@@ -31,6 +38,12 @@ func TopContainer(w http.ResponseWriter, r *http.Request) {
return
}
+ if query.Delay < 1 {
+ utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
+ fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay))
+ return
+ }
+
name := utils.GetName(r)
c, err := runtime.LookupContainer(name)
if err != nil {
@@ -38,26 +51,56 @@ func TopContainer(w http.ResponseWriter, r *http.Request) {
return
}
- output, err := c.Top([]string{query.PsArgs})
- if err != nil {
- utils.InternalServerError(w, err)
- return
+ // We are committed now - all errors logged but not reported to client, ship has sailed
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "application/json")
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
}
- var body = handlers.ContainerTopOKBody{}
- if len(output) > 0 {
- body.Titles = strings.Split(output[0], "\t")
- for i := range body.Titles {
- body.Titles[i] = strings.TrimSpace(body.Titles[i])
- }
+ encoder := json.NewEncoder(w)
+
+loop: // break out of for/select infinite` loop
+ for {
+ select {
+ case <-r.Context().Done():
+ break loop
+ default:
+ output, err := c.Top([]string{query.PsArgs})
+ if err != nil {
+ logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
+ break loop
+ }
+
+ if len(output) > 0 {
+ body := handlers.ContainerTopOKBody{}
+ body.Titles = strings.Split(output[0], "\t")
+ for i := range body.Titles {
+ body.Titles[i] = strings.TrimSpace(body.Titles[i])
+ }
+
+ for _, line := range output[1:] {
+ process := strings.Split(line, "\t")
+ for i := range process {
+ process[i] = strings.TrimSpace(process[i])
+ }
+ body.Processes = append(body.Processes, process)
+ }
+
+ if err := encoder.Encode(body); err != nil {
+ logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
+ break loop
+ }
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+ }
- for _, line := range output[1:] {
- process := strings.Split(line, "\t")
- for i := range process {
- process[i] = strings.TrimSpace(process[i])
+ if query.Stream {
+ time.Sleep(time.Duration(query.Delay) * time.Second)
+ } else {
+ break loop
}
- body.Processes = append(body.Processes, process)
}
}
- utils.WriteJSON(w, http.StatusOK, body)
}
diff --git a/pkg/api/handlers/libpod/pods.go b/pkg/api/handlers/libpod/pods.go
index 1e64de0ee..2ba292579 100644
--- a/pkg/api/handlers/libpod/pods.go
+++ b/pkg/api/handlers/libpod/pods.go
@@ -5,6 +5,7 @@ import (
"fmt"
"net/http"
"strings"
+ "time"
"github.com/containers/common/pkg/config"
"github.com/containers/podman/v3/libpod"
@@ -363,10 +364,17 @@ func PodTop(w http.ResponseWriter, r *http.Request) {
runtime := r.Context().Value(api.RuntimeKey).(*libpod.Runtime)
decoder := r.Context().Value(api.DecoderKey).(*schema.Decoder)
+ psArgs := "-ef"
+ if utils.IsLibpodRequest(r) {
+ psArgs = ""
+ }
query := struct {
+ Delay int `schema:"delay"`
PsArgs string `schema:"ps_args"`
+ Stream bool `schema:"stream"`
}{
- PsArgs: "",
+ Delay: 5,
+ PsArgs: psArgs,
}
if err := decoder.Decode(&query, r.URL.Query()); err != nil {
utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
@@ -374,31 +382,71 @@ func PodTop(w http.ResponseWriter, r *http.Request) {
return
}
- name := utils.GetName(r)
- pod, err := runtime.LookupPod(name)
- if err != nil {
- utils.PodNotFound(w, name, err)
+ if query.Delay < 1 {
+ utils.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest,
+ fmt.Errorf("\"delay\" parameter of value %d < 1", query.Delay))
return
}
- args := []string{}
- if query.PsArgs != "" {
- args = append(args, query.PsArgs)
- }
- output, err := pod.GetPodPidInformation(args)
+ name := utils.GetName(r)
+ pod, err := runtime.LookupPod(name)
if err != nil {
- utils.InternalServerError(w, err)
+ utils.PodNotFound(w, name, err)
return
}
- var body = handlers.PodTopOKBody{}
- if len(output) > 0 {
- body.Titles = strings.Split(output[0], "\t")
- for _, line := range output[1:] {
- body.Processes = append(body.Processes, strings.Split(line, "\t"))
+ // We are committed now - all errors logged but not reported to client, ship has sailed
+ w.WriteHeader(http.StatusOK)
+ w.Header().Set("Content-Type", "application/json")
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+
+ encoder := json.NewEncoder(w)
+
+loop: // break out of for/select infinite` loop
+ for {
+ select {
+ case <-r.Context().Done():
+ break loop
+ default:
+ output, err := pod.GetPodPidInformation([]string{query.PsArgs})
+ if err != nil {
+ logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
+ break loop
+ }
+
+ if len(output) > 0 {
+ var body = handlers.PodTopOKBody{}
+ body.Titles = strings.Split(output[0], "\t")
+ for i := range body.Titles {
+ body.Titles[i] = strings.TrimSpace(body.Titles[i])
+ }
+
+ for _, line := range output[1:] {
+ process := strings.Split(line, "\t")
+ for i := range process {
+ process[i] = strings.TrimSpace(process[i])
+ }
+ body.Processes = append(body.Processes, process)
+ }
+
+ if err := encoder.Encode(body); err != nil {
+ logrus.Infof("Error from %s %q : %v", r.Method, r.URL, err)
+ break loop
+ }
+ if f, ok := w.(http.Flusher); ok {
+ f.Flush()
+ }
+ }
+
+ if query.Stream {
+ time.Sleep(time.Duration(query.Delay) * time.Second)
+ } else {
+ break loop
+ }
}
}
- utils.WriteJSON(w, http.StatusOK, body)
}
func PodKill(w http.ResponseWriter, r *http.Request) {
diff --git a/pkg/api/server/register_containers.go b/pkg/api/server/register_containers.go
index 8dcea1301..c4919182b 100644
--- a/pkg/api/server/register_containers.go
+++ b/pkg/api/server/register_containers.go
@@ -442,6 +442,7 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// - in: query
// name: ps_args
// type: string
+ // default: -ef
// description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// produces:
// - application/json
@@ -1142,19 +1143,23 @@ func (s *APIServer) registerContainersHandlers(r *mux.Router) error {
// name: name
// type: string
// required: true
- // description: |
- // Name of container to query for processes
- // (As of version 1.xx)
+ // description: Name of container to query for processes (As of version 1.xx)
// - in: query
// name: stream
// type: boolean
- // default: true
- // description: Stream the output
+ // description: when true, repeatedly stream the latest output (As of version 4.0)
+ // - in: query
+ // name: delay
+ // type: integer
+ // description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0)
+ // default: 5
// - in: query
// name: ps_args
// type: string
// default: -ef
- // description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
+ // description: |
+ // arguments to pass to ps such as aux.
+ // Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// produces:
// - application/json
// responses:
diff --git a/pkg/api/server/register_pods.go b/pkg/api/server/register_pods.go
index de3669a0a..16a7bbb4c 100644
--- a/pkg/api/server/register_pods.go
+++ b/pkg/api/server/register_pods.go
@@ -296,18 +296,23 @@ func (s *APIServer) registerPodsHandlers(r *mux.Router) error {
// name: name
// type: string
// required: true
- // description: |
- // Name of pod to query for processes
+ // description: Name of pod to query for processes
// - in: query
// name: stream
// type: boolean
- // default: true
- // description: Stream the output
+ // description: when true, repeatedly stream the latest output (As of version 4.0)
+ // - in: query
+ // name: delay
+ // type: integer
+ // description: if streaming, delay in seconds between updates. Must be >1. (As of version 4.0)
+ // default: 5
// - in: query
// name: ps_args
// type: string
// default: -ef
- // description: arguments to pass to ps such as aux. Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
+ // description: |
+ // arguments to pass to ps such as aux.
+ // Requires ps(1) to be installed in the container if no ps(1) compatible AIX descriptors are used.
// responses:
// 200:
// $ref: "#/responses/DocsPodTopResponse"
diff --git a/test/apiv2/40-pods.at b/test/apiv2/40-pods.at
index 985b26411..f45e85f61 100644
--- a/test/apiv2/40-pods.at
+++ b/test/apiv2/40-pods.at
@@ -110,11 +110,11 @@ t GET libpod/pods/fakename/top 404 \
.cause="no such pod"
t GET libpod/pods/foo/top 200 \
- .Processes[0][-1]="/pause " \
+ .Processes[0][-1]="/pause" \
.Titles[-1]="COMMAND"
t GET libpod/pods/foo/top?ps_args=args,pid 200 \
- .Processes[0][0]="/pause " \
+ .Processes[0][0]="/pause" \
.Processes[0][1]="1" \
.Titles[0]="COMMAND" \
.Titles[1]="PID" \
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_container.py b/test/apiv2/python/rest_api/test_v2_0_0_container.py
index 853e9da88..101044bbb 100644
--- a/test/apiv2/python/rest_api/test_v2_0_0_container.py
+++ b/test/apiv2/python/rest_api/test_v2_0_0_container.py
@@ -1,8 +1,11 @@
+import multiprocessing
+import queue
import random
+import threading
import unittest
-import json
import requests
+import time
from dateutil.parser import parse
from .fixtures import APITestCase
@@ -16,7 +19,10 @@ class ContainerTestCase(APITestCase):
self.assertEqual(len(obj), 1)
def test_list_filters(self):
- r = requests.get(self.podman_url + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D")
+ r = requests.get(
+ self.podman_url
+ + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D"
+ )
self.assertEqual(r.status_code, 200, r.text)
payload = r.json()
containerAmnt = len(payload)
@@ -33,18 +39,18 @@ class ContainerTestCase(APITestCase):
self.assertId(r.content)
_ = parse(r.json()["Created"])
-
r = requests.post(
self.podman_url + "/v1.40/containers/create?name=topcontainer",
- json={"Cmd": ["top"],
- "Image": "alpine:latest",
- "Healthcheck": {
- "Test": ["CMD", "pidof", "top"],
- "Interval": 5000000000,
- "Timeout": 2000000000,
- "Retries": 3,
- "StartPeriod": 5000000000
- }
+ json={
+ "Cmd": ["top"],
+ "Image": "alpine:latest",
+ "Healthcheck": {
+ "Test": ["CMD", "pidof", "top"],
+ "Interval": 5000000000,
+ "Timeout": 2000000000,
+ "Retries": 3,
+ "StartPeriod": 5000000000,
+ },
},
)
self.assertEqual(r.status_code, 201, r.text)
@@ -67,7 +73,7 @@ class ContainerTestCase(APITestCase):
self.assertEqual(r.status_code, 200, r.text)
self.assertId(r.content)
out = r.json()
- hc = out["Config"]["Healthcheck"]["Test"]
+ hc = out["Config"]["Healthcheck"]["Test"]
self.assertListEqual(["CMD", "pidof", "top"], hc)
r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start")
@@ -84,7 +90,9 @@ class ContainerTestCase(APITestCase):
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.assertId(r.content)
- r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true")))
+ r = requests.get(
+ self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true"))
+ )
self.assertIn(r.status_code, (200, 409), r.text)
if r.status_code == 200:
self.assertId(r.content)
@@ -136,9 +144,15 @@ class ContainerTestCase(APITestCase):
payload = r.json()
container_id = payload["Id"]
self.assertIsNotNone(container_id)
- r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0")
+ r = requests.get(
+ self.podman_url
+ + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0"
+ )
self.assertEqual(r.status_code, 200, r.text)
- r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1")
+ r = requests.get(
+ self.podman_url
+ + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1"
+ )
self.assertEqual(r.status_code, 200, r.text)
def test_commit(self):
@@ -257,6 +271,63 @@ class ContainerTestCase(APITestCase):
r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}")
self.assertEqual(r.status_code, 204, r.text)
+ def test_top_no_stream(self):
+ uri = self.uri(self.resolve_container("/containers/{}/top"))
+ q = queue.Queue()
+
+ def _impl(fifo):
+ fifo.put(requests.get(uri, params={"stream": False}, timeout=2))
+
+ top = threading.Thread(target=_impl, args=(q,))
+ top.start()
+ time.sleep(2)
+ self.assertFalse(top.is_alive(), f"GET {uri} failed to return in 2s")
+
+ qr = q.get(False)
+ self.assertEqual(qr.status_code, 200, qr.text)
+
+ qr.close()
+ top.join()
+
+ def test_top_stream(self):
+ uri = self.uri(self.resolve_container("/containers/{}/top"))
+ q = queue.Queue()
+
+ stop_thread = False
+
+ def _impl(fifo, stop):
+ try:
+ with requests.get(uri, params={"stream": True, "delay": 1}, stream=True) as r:
+ r.raise_for_status()
+ fifo.put(r)
+ for buf in r.iter_lines(chunk_size=None):
+ if stop():
+ break
+ fifo.put(buf)
+ except Exception:
+ pass
+
+ top = threading.Thread(target=_impl, args=(q, (lambda: stop_thread)))
+ top.start()
+ time.sleep(4)
+ self.assertTrue(top.is_alive(), f"GET {uri} exited too soon")
+ stop_thread = True
+
+ for _ in range(10):
+ try:
+ qr = q.get_nowait()
+ if qr is not None:
+ self.assertEqual(qr.status_code, 200)
+ qr.close()
+ break
+ except queue.Empty:
+ pass
+ finally:
+ time.sleep(1)
+ else:
+ self.fail("Server failed to respond in 10s")
+ top.join()
+
if __name__ == "__main__":
unittest.main()