From 449cc7a5c217dbc1296f2f3fcc63d0b9b9826bec Mon Sep 17 00:00:00 2001 From: Jhon Honce Date: Mon, 1 Nov 2021 14:17:30 -0700 Subject: Implement top streaming for containers and pods * Implement API query parameter stream and delay for containers and pods top endpoints * Update swagger with breaking changes * Add python API tests for endpoints Fixes #12115 Signed-off-by: Jhon Honce --- .../apiv2/python/rest_api/test_v2_0_0_container.py | 103 +++++++++++++++++---- 1 file changed, 87 insertions(+), 16 deletions(-) (limited to 'test/apiv2/python') diff --git a/test/apiv2/python/rest_api/test_v2_0_0_container.py b/test/apiv2/python/rest_api/test_v2_0_0_container.py index 853e9da88..101044bbb 100644 --- a/test/apiv2/python/rest_api/test_v2_0_0_container.py +++ b/test/apiv2/python/rest_api/test_v2_0_0_container.py @@ -1,8 +1,11 @@ +import multiprocessing +import queue import random +import threading import unittest -import json import requests +import time from dateutil.parser import parse from .fixtures import APITestCase @@ -16,7 +19,10 @@ class ContainerTestCase(APITestCase): self.assertEqual(len(obj), 1) def test_list_filters(self): - r = requests.get(self.podman_url + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D") + r = requests.get( + self.podman_url + + "/v1.40/containers/json?filters%3D%7B%22status%22%3A%5B%22running%22%5D%7D" + ) self.assertEqual(r.status_code, 200, r.text) payload = r.json() containerAmnt = len(payload) @@ -33,18 +39,18 @@ class ContainerTestCase(APITestCase): self.assertId(r.content) _ = parse(r.json()["Created"]) - r = requests.post( self.podman_url + "/v1.40/containers/create?name=topcontainer", - json={"Cmd": ["top"], - "Image": "alpine:latest", - "Healthcheck": { - "Test": ["CMD", "pidof", "top"], - "Interval": 5000000000, - "Timeout": 2000000000, - "Retries": 3, - "StartPeriod": 5000000000 - } + json={ + "Cmd": ["top"], + "Image": "alpine:latest", + "Healthcheck": { + "Test": ["CMD", "pidof", "top"], + "Interval": 5000000000, + "Timeout": 2000000000, + "Retries": 3, + "StartPeriod": 5000000000, + }, }, ) self.assertEqual(r.status_code, 201, r.text) @@ -67,7 +73,7 @@ class ContainerTestCase(APITestCase): self.assertEqual(r.status_code, 200, r.text) self.assertId(r.content) out = r.json() - hc = out["Config"]["Healthcheck"]["Test"] + hc = out["Config"]["Healthcheck"]["Test"] self.assertListEqual(["CMD", "pidof", "top"], hc) r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start") @@ -84,7 +90,9 @@ class ContainerTestCase(APITestCase): self.assertIn(r.status_code, (200, 409), r.text) if r.status_code == 200: self.assertId(r.content) - r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true"))) + r = requests.get( + self.uri(self.resolve_container("/containers/{}/stats?stream=false&one-shot=true")) + ) self.assertIn(r.status_code, (200, 409), r.text) if r.status_code == 200: self.assertId(r.content) @@ -136,9 +144,15 @@ class ContainerTestCase(APITestCase): payload = r.json() container_id = payload["Id"] self.assertIsNotNone(container_id) - r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0") + r = requests.get( + self.podman_url + + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=0" + ) self.assertEqual(r.status_code, 200, r.text) - r = requests.get(self.podman_url + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1") + r = requests.get( + self.podman_url + + f"/v1.40/containers/{payload['Id']}/logs?follow=false&stdout=true&until=1" + ) self.assertEqual(r.status_code, 200, r.text) def test_commit(self): @@ -257,6 +271,63 @@ class ContainerTestCase(APITestCase): r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}") self.assertEqual(r.status_code, 204, r.text) + def test_top_no_stream(self): + uri = self.uri(self.resolve_container("/containers/{}/top")) + q = queue.Queue() + + def _impl(fifo): + fifo.put(requests.get(uri, params={"stream": False}, timeout=2)) + + top = threading.Thread(target=_impl, args=(q,)) + top.start() + time.sleep(2) + self.assertFalse(top.is_alive(), f"GET {uri} failed to return in 2s") + + qr = q.get(False) + self.assertEqual(qr.status_code, 200, qr.text) + + qr.close() + top.join() + + def test_top_stream(self): + uri = self.uri(self.resolve_container("/containers/{}/top")) + q = queue.Queue() + + stop_thread = False + + def _impl(fifo, stop): + try: + with requests.get(uri, params={"stream": True, "delay": 1}, stream=True) as r: + r.raise_for_status() + fifo.put(r) + for buf in r.iter_lines(chunk_size=None): + if stop(): + break + fifo.put(buf) + except Exception: + pass + + top = threading.Thread(target=_impl, args=(q, (lambda: stop_thread))) + top.start() + time.sleep(4) + self.assertTrue(top.is_alive(), f"GET {uri} exited too soon") + stop_thread = True + + for _ in range(10): + try: + qr = q.get_nowait() + if qr is not None: + self.assertEqual(qr.status_code, 200) + qr.close() + break + except queue.Empty: + pass + finally: + time.sleep(1) + else: + self.fail("Server failed to respond in 10s") + top.join() + if __name__ == "__main__": unittest.main() -- cgit v1.2.3-54-g00ecf