From 98955bedbcb3256d63f12716332000586eb6fe31 Mon Sep 17 00:00:00 2001 From: Jhon Honce Date: Mon, 17 May 2021 17:11:50 -0700 Subject: Break up python APIv2 tests * Tests broken up into areas of concern * Introduced fixtures to reduce duplicated code * Introduced new assert methods with APITestCase * General cleanup of code while visiting * Tests now targeting quay.io Known issues: * is-official against quay.io not working Fixes: #9238 Signed-off-by: Jhon Honce --- test/apiv2/python/__init__.py | 0 test/apiv2/python/rest_api/__init__.py | 0 test/apiv2/python/rest_api/fixtures/__init__.py | 3 + .../apiv2/python/rest_api/fixtures/api_testcase.py | 103 +++ test/apiv2/python/rest_api/fixtures/podman.py | 136 ++++ .../apiv2/python/rest_api/test_v2_0_0_container.py | 192 ++++++ test/apiv2/python/rest_api/test_v2_0_0_image.py | 165 +++++ test/apiv2/python/rest_api/test_v2_0_0_manifest.py | 14 + test/apiv2/python/rest_api/test_v2_0_0_network.py | 155 +++++ test/apiv2/python/rest_api/test_v2_0_0_pod.py | 65 ++ test/apiv2/python/rest_api/test_v2_0_0_system.py | 88 +++ test/apiv2/python/rest_api/test_v2_0_0_volume.py | 75 +++ test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py | 238 +++++++ test/apiv2/rest_api/__init__.py | 136 ---- test/apiv2/rest_api/test_rest_v2_0_0.py | 744 --------------------- test/apiv2/rest_api/v1_test_rest_v1_0_0.py | 238 ------- 16 files changed, 1234 insertions(+), 1118 deletions(-) create mode 100644 test/apiv2/python/__init__.py create mode 100644 test/apiv2/python/rest_api/__init__.py create mode 100644 test/apiv2/python/rest_api/fixtures/__init__.py create mode 100644 test/apiv2/python/rest_api/fixtures/api_testcase.py create mode 100644 test/apiv2/python/rest_api/fixtures/podman.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_container.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_image.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_manifest.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_network.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_pod.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_system.py create mode 100644 test/apiv2/python/rest_api/test_v2_0_0_volume.py create mode 100644 test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py delete mode 100644 test/apiv2/rest_api/__init__.py delete mode 100644 test/apiv2/rest_api/test_rest_v2_0_0.py delete mode 100644 test/apiv2/rest_api/v1_test_rest_v1_0_0.py (limited to 'test/apiv2') diff --git a/test/apiv2/python/__init__.py b/test/apiv2/python/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/apiv2/python/rest_api/__init__.py b/test/apiv2/python/rest_api/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/test/apiv2/python/rest_api/fixtures/__init__.py b/test/apiv2/python/rest_api/fixtures/__init__.py new file mode 100644 index 000000000..5d763e454 --- /dev/null +++ b/test/apiv2/python/rest_api/fixtures/__init__.py @@ -0,0 +1,3 @@ +from .api_testcase import APITestCase + +__all__ = ["APITestCase"] diff --git a/test/apiv2/python/rest_api/fixtures/api_testcase.py b/test/apiv2/python/rest_api/fixtures/api_testcase.py new file mode 100644 index 000000000..8b771774b --- /dev/null +++ b/test/apiv2/python/rest_api/fixtures/api_testcase.py @@ -0,0 +1,103 @@ +import json +import subprocess +import unittest + +import requests +import sys +import time + +from .podman import Podman + + +class APITestCase(unittest.TestCase): + PODMAN_URL = "http://localhost:8080" + podman = None # initialized podman configuration for tests + service = None # podman service instance + + @classmethod + def setUpClass(cls): + super().setUpClass() + + APITestCase.podman = Podman() + APITestCase.service = APITestCase.podman.open( + "system", "service", "tcp:localhost:8080", "--time=0" + ) + # give the service some time to be ready... + time.sleep(2) + + returncode = APITestCase.service.poll() + if returncode is not None: + raise subprocess.CalledProcessError(returncode, "podman system service") + + r = requests.post( + APITestCase.uri("/images/pull?reference=quay.io%2Flibpod%2Falpine%3Alatest") + ) + if r.status_code != 200: + raise subprocess.CalledProcessError( + r.status_code, f"podman images pull quay.io/libpod/alpine:latest {r.text}" + ) + + @classmethod + def tearDownClass(cls): + APITestCase.service.terminate() + stdout, stderr = APITestCase.service.communicate(timeout=0.5) + if stdout: + sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8")) + if stderr: + sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8")) + return super().tearDownClass() + + def setUp(self): + super().setUp() + APITestCase.podman.run("run", "alpine", "/bin/ls", check=True) + + def tearDown(self) -> None: + APITestCase.podman.run("pod", "rm", "--all", "--force", check=True) + APITestCase.podman.run("rm", "--all", "--force", check=True) + super().tearDown() + + @property + def podman_url(self): + return "http://localhost:8080" + + @staticmethod + def uri(path): + return APITestCase.PODMAN_URL + "/v2.0.0/libpod" + path + + def resolve_container(self, path): + """Find 'first' container and return 'Id' formatted into given URI path.""" + + try: + r = requests.get(self.uri("/containers/json?all=true")) + containers = r.json() + except Exception as e: + msg = f"Bad container response: {e}" + if r is not None: + msg += ": " + r.text + raise self.failureException(msg) + return path.format(containers[0]["Id"]) + + def assertContainerExists(self, member, msg=None): # pylint: disable=invalid-name + r = requests.get(self.uri(f"/containers/{member}/exists")) + if r.status_code == 404: + if msg is None: + msg = f"Container '{member}' does not exist." + self.failureException(msg) + + def assertContainerNotExists(self, member, msg=None): # pylint: disable=invalid-name + r = requests.get(self.uri(f"/containers/{member}/exists")) + if r.status_code == 204: + if msg is None: + msg = f"Container '{member}' exists." + self.failureException(msg) + + def assertId(self, content): # pylint: disable=invalid-name + objects = json.loads(content) + try: + if isinstance(objects, dict): + _ = objects["Id"] + else: + for item in objects: + _ = item["Id"] + except KeyError: + self.failureException("Failed in find 'Id' in return value.") diff --git a/test/apiv2/python/rest_api/fixtures/podman.py b/test/apiv2/python/rest_api/fixtures/podman.py new file mode 100644 index 000000000..bae04f87d --- /dev/null +++ b/test/apiv2/python/rest_api/fixtures/podman.py @@ -0,0 +1,136 @@ +import configparser +import json +import os +import shutil +import subprocess +import sys +import tempfile + + +class Podman: + """ + Instances hold the configuration and setup for running podman commands + """ + + def __init__(self): + """Initialize a Podman instance with global options""" + binary = os.getenv("PODMAN", "bin/podman") + self.cmd = [binary, "--storage-driver=vfs"] + + cgroupfs = os.getenv("CGROUP_MANAGER", "systemd") + self.cmd.append(f"--cgroup-manager={cgroupfs}") + + if os.getenv("DEBUG"): + self.cmd.append("--log-level=debug") + self.cmd.append("--syslog=true") + + self.anchor_directory = tempfile.mkdtemp(prefix="podman_restapi_") + self.cmd.append("--root=" + os.path.join(self.anchor_directory, "crio")) + self.cmd.append("--runroot=" + os.path.join(self.anchor_directory, "crio-run")) + + os.environ["CONTAINERS_REGISTRIES_CONF"] = os.path.join( + self.anchor_directory, "registry.conf" + ) + p = configparser.ConfigParser() + p.read_dict( + { + "registries.search": {"registries": "['quay.io']"}, + "registries.insecure": {"registries": "[]"}, + "registries.block": {"registries": "[]"}, + } + ) + with open(os.environ["CONTAINERS_REGISTRIES_CONF"], "w") as w: + p.write(w) + + os.environ["CNI_CONFIG_PATH"] = os.path.join(self.anchor_directory, "cni", "net.d") + os.makedirs(os.environ["CNI_CONFIG_PATH"], exist_ok=True) + self.cmd.append("--cni-config-dir=" + os.environ["CNI_CONFIG_PATH"]) + cni_cfg = os.path.join(os.environ["CNI_CONFIG_PATH"], "87-podman-bridge.conflist") + # json decoded and encoded to ensure legal json + buf = json.loads( + """ + { + "cniVersion": "0.3.0", + "name": "podman", + "plugins": [{ + "type": "bridge", + "bridge": "cni0", + "isGateway": true, + "ipMasq": true, + "ipam": { + "type": "host-local", + "subnet": "10.88.0.0/16", + "routes": [{ + "dst": "0.0.0.0/0" + }] + } + }, + { + "type": "portmap", + "capabilities": { + "portMappings": true + } + } + ] + } + """ + ) + with open(cni_cfg, "w") as w: + json.dump(buf, w) + + def open(self, command, *args, **kwargs): + """Podman initialized instance to run a given command + + :param self: Podman instance + :param command: podman sub-command to run + :param args: arguments and options for command + :param kwargs: See subprocess.Popen() for shell keyword + :return: subprocess.Popen() instance configured to run podman instance + """ + cmd = self.cmd.copy() + cmd.append(command) + cmd.extend(args) + + shell = kwargs.get("shell", False) + + return subprocess.Popen( + cmd, + shell=shell, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + def run(self, command, *args, **kwargs): + """Run given podman command + + :param self: Podman instance + :param command: podman sub-command to run + :param args: arguments and options for command + :param kwargs: See subprocess.Popen() for shell and check keywords + :return: subprocess.Popen() instance configured to run podman instance + """ + cmd = self.cmd.copy() + cmd.append(command) + cmd.extend(args) + + check = kwargs.get("check", False) + shell = kwargs.get("shell", False) + + try: + return subprocess.run( + cmd, + shell=shell, + check=check, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + except subprocess.CalledProcessError as e: + if e.stdout: + sys.stdout.write("\nRun Stdout:\n" + e.stdout.decode("utf-8")) + if e.stderr: + sys.stderr.write("\nRun Stderr:\n" + e.stderr.decode("utf-8")) + raise + + def tear_down(self): + shutil.rmtree(self.anchor_directory, ignore_errors=True) diff --git a/test/apiv2/python/rest_api/test_v2_0_0_container.py b/test/apiv2/python/rest_api/test_v2_0_0_container.py new file mode 100644 index 000000000..70c07d47f --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_container.py @@ -0,0 +1,192 @@ +import random +import unittest + +import requests +from dateutil.parser import parse + +from .fixtures import APITestCase + + +class ContainerTestCase(APITestCase): + def test_list(self): + r = requests.get(self.uri("/containers/json"), timeout=5) + self.assertEqual(r.status_code, 200, r.text) + obj = r.json() + self.assertEqual(len(obj), 0) + + def test_list_all(self): + r = requests.get(self.uri("/containers/json?all=true")) + self.assertEqual(r.status_code, 200, r.text) + self.assertId(r.content) + + def test_inspect(self): + r = requests.get(self.uri(self.resolve_container("/containers/{}/json"))) + self.assertEqual(r.status_code, 200, r.text) + self.assertId(r.content) + _ = parse(r.json()["Created"]) + + def test_stats(self): + r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false"))) + self.assertIn(r.status_code, (200, 409), r.text) + if r.status_code == 200: + self.assertId(r.content) + + def test_delete(self): + r = requests.delete(self.uri(self.resolve_container("/containers/{}"))) + self.assertEqual(r.status_code, 204, r.text) + + def test_stop(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(self.uri(self.resolve_container("/containers/{}/stop"))) + self.assertIn(r.status_code, (204, 304), r.text) + + def test_start(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/stop"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(self.uri(self.resolve_container("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + def test_restart(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(self.uri(self.resolve_container("/containers/{}/restart")), timeout=5) + self.assertEqual(r.status_code, 204, r.text) + + def test_resize(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/resize?h=43&w=80"))) + self.assertIn(r.status_code, (200, 409), r.text) + if r.status_code == 200: + self.assertEqual(r.text, "", r.text) + + def test_attach(self): + self.skipTest("FIXME: Test timeouts") + r = requests.post(self.uri(self.resolve_container("/containers/{}/attach")), timeout=5) + self.assertIn(r.status_code, (101, 500), r.text) + + def test_logs(self): + r = requests.get(self.uri(self.resolve_container("/containers/{}/logs?stdout=true"))) + self.assertEqual(r.status_code, 200, r.text) + + def test_commit(self): + r = requests.post(self.uri(self.resolve_container("/commit?container={}"))) + self.assertEqual(r.status_code, 200, r.text) + self.assertId(r.content) + + obj = r.json() + self.assertIsInstance(obj, dict) + + def test_prune(self): + name = f"Container_{random.getrandbits(160):x}" + + r = requests.post( + self.podman_url + f"/v1.40/containers/create?name={name}", + json={ + "Cmd": ["cp", "/etc/motd", "/motd.size_test"], + "Image": "alpine:latest", + "NetworkDisabled": True, + }, + ) + self.assertEqual(r.status_code, 201, r.text) + create = r.json() + + r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/start") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/wait") + self.assertEqual(r.status_code, 200, r.text) + wait = r.json() + self.assertEqual(wait["StatusCode"], 0, wait["Error"]["Message"]) + + prune = requests.post(self.podman_url + "/v1.40/containers/prune") + self.assertEqual(prune.status_code, 200, prune.status_code) + prune_payload = prune.json() + self.assertGreater(prune_payload["SpaceReclaimed"], 0) + self.assertIn(create["Id"], prune_payload["ContainersDeleted"]) + + # Delete any orphaned containers + r = requests.get(self.podman_url + "/v1.40/containers/json?all=true") + self.assertEqual(r.status_code, 200, r.text) + for self.resolve_container in r.json(): + requests.delete( + self.podman_url + f"/v1.40/containers/{self.resolve_container['Id']}?force=true" + ) + + # Image prune here tied to containers freeing up + prune = requests.post(self.podman_url + "/v1.40/images/prune") + self.assertEqual(prune.status_code, 200, prune.text) + prune_payload = prune.json() + self.assertGreater(prune_payload["SpaceReclaimed"], 0) + + # FIXME need method to determine which image is going to be "pruned" to fix test + # TODO should handler be recursive when deleting images? + # self.assertIn(img["Id"], prune_payload["ImagesDeleted"][1]["Deleted"]) + + # FIXME (@vrothberg): I commented this line out during the `libimage` migration. + # It doesn't make sense to report anything to be deleted if the reclaimed space + # is zero. I think the test needs some rewrite. + # self.assertIsNotNone(prune_payload["ImagesDeleted"][1]["Deleted"]) + + def test_status(self): + r = requests.post( + self.podman_url + "/v1.40/containers/create?name=topcontainer", + json={"Cmd": ["top"], "Image": "alpine:latest"}, + ) + self.assertEqual(r.status_code, 201, r.text) + payload = r.json() + container_id = payload["Id"] + self.assertIsNotNone(container_id) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertEqual(payload[0]["Status"], "Created") + + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertTrue(str(payload[0]["Status"]).startswith("Up")) + + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/pause") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertTrue(str(payload[0]["Status"]).startswith("Up")) + self.assertTrue(str(payload[0]["Status"]).endswith("(Paused)")) + + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/unpause") + self.assertEqual(r.status_code, 204, r.text) + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/stop") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertTrue(str(payload[0]["Status"]).startswith("Exited")) + + r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}") + self.assertEqual(r.status_code, 204, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_image.py b/test/apiv2/python/rest_api/test_v2_0_0_image.py new file mode 100644 index 000000000..99f513608 --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_image.py @@ -0,0 +1,165 @@ +import json +import unittest +from multiprocessing import Process + +import requests +from dateutil.parser import parse +from .fixtures import APITestCase + + +class ImageTestCase(APITestCase): + def test_list(self): + r = requests.get(self.podman_url + "/v1.40/images/json") + self.assertEqual(r.status_code, 200, r.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/ImageList + required_keys = ( + "Id", + "ParentId", + "RepoTags", + "RepoDigests", + "Created", + "Size", + "SharedSize", + "VirtualSize", + "Labels", + "Containers", + ) + images = r.json() + self.assertIsInstance(images, list) + for item in images: + self.assertIsInstance(item, dict) + for k in required_keys: + self.assertIn(k, item) + + def test_inspect(self): + r = requests.get(self.podman_url + "/v1.40/images/alpine/json") + self.assertEqual(r.status_code, 200, r.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/ImageInspect + required_keys = ( + "Id", + "Parent", + "Comment", + "Created", + "Container", + "DockerVersion", + "Author", + "Architecture", + "Os", + "Size", + "VirtualSize", + "GraphDriver", + "RootFS", + "Metadata", + ) + + image = r.json() + self.assertIsInstance(image, dict) + for item in required_keys: + self.assertIn(item, image) + _ = parse(image["Created"]) + + def test_delete(self): + r = requests.delete(self.podman_url + "/v1.40/images/alpine?force=true") + self.assertEqual(r.status_code, 200, r.text) + self.assertIsInstance(r.json(), list) + + def test_pull(self): + r = requests.post(self.uri("/images/pull?reference=alpine"), timeout=15) + self.assertEqual(r.status_code, 200, r.status_code) + text = r.text + keys = { + "error": False, + "id": False, + "images": False, + "stream": False, + } + # Read and record stanza's from pull + for line in str.splitlines(text): + obj = json.loads(line) + key_list = list(obj.keys()) + for k in key_list: + keys[k] = True + + self.assertFalse(keys["error"], "Expected no errors") + self.assertTrue(keys["id"], "Expected to find id stanza") + self.assertTrue(keys["images"], "Expected to find images stanza") + self.assertTrue(keys["stream"], "Expected to find stream progress stanza's") + + def test_search_compat(self): + url = self.podman_url + "/v1.40/images/search" + + # Had issues with this test hanging when repositories not happy + def do_search1(): + payload = {"term": "alpine"} + r = requests.get(url, params=payload, timeout=5) + self.assertEqual(r.status_code, 200, f"#1: {r.text}") + self.assertIsInstance(r.json(), list) + + def do_search2(): + payload = {"term": "alpine", "limit": 1} + r = requests.get(url, params=payload, timeout=5) + self.assertEqual(r.status_code, 200, f"#2: {r.text}") + + results = r.json() + self.assertIsInstance(results, list) + self.assertEqual(len(results), 1) + + def do_search3(): + # FIXME: Research if quay.io supports is-official and which image is "official" + return + payload = {"term": "thanos", "filters": '{"is-official":["true"]}'} + r = requests.get(url, params=payload, timeout=5) + self.assertEqual(r.status_code, 200, f"#3: {r.text}") + + results = r.json() + self.assertIsInstance(results, list) + + # There should be only one official image + self.assertEqual(len(results), 1) + + def do_search4(): + headers = {"X-Registry-Auth": "null"} + payload = {"term": "alpine"} + r = requests.get(url, params=payload, headers=headers, timeout=5) + self.assertEqual(r.status_code, 200, f"#4: {r.text}") + + def do_search5(): + headers = {"X-Registry-Auth": "invalid value"} + payload = {"term": "alpine"} + r = requests.get(url, params=payload, headers=headers, timeout=5) + self.assertEqual(r.status_code, 400, f"#5: {r.text}") + + i = 1 + for fn in [do_search1, do_search2, do_search3, do_search4, do_search5]: + with self.subTest(i=i): + search = Process(target=fn) + search.start() + search.join(timeout=10) + self.assertFalse(search.is_alive(), f"#{i} /images/search took too long") + + # search_methods = [do_search1, do_search2, do_search3, do_search4, do_search5] + # for search_method in search_methods: + # search = Process(target=search_method) + # search.start() + # search.join(timeout=10) + # self.assertFalse(search.is_alive(), "/images/search took too long") + + def test_history(self): + r = requests.get(self.podman_url + "/v1.40/images/alpine/history") + self.assertEqual(r.status_code, 200, r.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/ImageHistory + required_keys = ("Id", "Created", "CreatedBy", "Tags", "Size", "Comment") + + changes = r.json() + self.assertIsInstance(changes, list) + for change in changes: + self.assertIsInstance(change, dict) + for k in required_keys: + self.assertIn(k, change) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_manifest.py b/test/apiv2/python/rest_api/test_v2_0_0_manifest.py new file mode 100644 index 000000000..c28c63bcb --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_manifest.py @@ -0,0 +1,14 @@ +import unittest + +import requests +from .fixtures import APITestCase + + +class ManifestTestCase(APITestCase): + def test_manifest_409(self): + r = requests.post(self.uri("/manifests/create"), params={"name": "ThisIsAnInvalidImage"}) + self.assertEqual(r.status_code, 400, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_network.py b/test/apiv2/python/rest_api/test_v2_0_0_network.py new file mode 100644 index 000000000..3888123fb --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_network.py @@ -0,0 +1,155 @@ +import random +import unittest + +import requests + +from .fixtures import APITestCase + + +class NetworkTestCase(APITestCase): + # TODO Need to support Docker-py order of network/container creates + def test_connect(self): + """Create network and container then connect to network""" + net_default = requests.post( + self.podman_url + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"} + ) + self.assertEqual(net_default.status_code, 201, net_default.text) + + create = requests.post( + self.podman_url + "/v1.40/containers/create?name=postCreateConnect", + json={ + "Cmd": ["top"], + "Image": "alpine:latest", + "NetworkDisabled": False, + # FIXME adding these 2 lines cause: (This is sampled from docker-py) + # "network already exists","message":"container + # 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI + # network \"TestDefaultNetwork\": network already exists" + # "HostConfig": {"NetworkMode": "TestDefaultNetwork"}, + # "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}}, + # FIXME These two lines cause: + # CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container + # 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not + # found" + # "HostConfig": {"NetworkMode": "TestNetwork"}, + # "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}}, + # FIXME no networking defined cause: (note this error is from the container inspect below) + # "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [ + # TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error" + }, + ) + self.assertEqual(create.status_code, 201, create.text) + self.assertId(create.content) + + payload = create.json() + start = requests.post(self.podman_url + f"/v1.40/containers/{payload['Id']}/start") + self.assertEqual(start.status_code, 204, start.text) + + connect = requests.post( + self.podman_url + "/v1.40/networks/TestDefaultNetwork/connect", + json={"Container": payload["Id"]}, + ) + self.assertEqual(connect.status_code, 200, connect.text) + self.assertEqual(connect.text, "OK\n") + + inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json") + self.assertEqual(inspect.status_code, 200, inspect.text) + + payload = inspect.json() + self.assertFalse(payload["Config"].get("NetworkDisabled", False)) + + self.assertEqual( + "TestDefaultNetwork", + payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"], + ) + # TODO restore this to test, when joining multiple networks possible + # self.assertEqual( + # "TestNetwork", + # payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], + # ) + # TODO Need to support network aliases + # self.assertIn( + # "test_post_create", + # payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"], + # ) + + def test_create(self): + """Create network and connect container during create""" + net = requests.post( + self.podman_url + "/v1.40/networks/create", json={"Name": "TestNetwork"} + ) + self.assertEqual(net.status_code, 201, net.text) + + create = requests.post( + self.podman_url + "/v1.40/containers/create?name=postCreate", + json={ + "Cmd": ["date"], + "Image": "alpine:latest", + "NetworkDisabled": False, + "HostConfig": {"NetworkMode": "TestNetwork"}, + }, + ) + self.assertEqual(create.status_code, 201, create.text) + self.assertId(create.content) + + payload = create.json() + inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json") + self.assertEqual(inspect.status_code, 200, inspect.text) + + payload = inspect.json() + self.assertFalse(payload["Config"].get("NetworkDisabled", False)) + self.assertEqual( + "TestNetwork", + payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], + ) + + def test_crud(self): + name = f"Network_{random.getrandbits(160):x}" + + # Cannot test for 0 existing networks because default "podman" network always exists + + create = requests.post(self.podman_url + "/v1.40/networks/create", json={"Name": name}) + self.assertEqual(create.status_code, 201, create.text) + self.assertId(create.content) + + net = create.json() + self.assertIsInstance(net, dict) + self.assertNotEqual(net["Id"], name) + ident = net["Id"] + + ls = requests.get(self.podman_url + "/v1.40/networks") + self.assertEqual(ls.status_code, 200, ls.text) + + networks = ls.json() + self.assertIsInstance(networks, list) + + found = False + for net in networks: + if net["Name"] == name: + found = True + break + self.assertTrue(found, f"Network '{name}' not found") + + inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}") + self.assertEqual(inspect.status_code, 200, inspect.text) + self.assertIsInstance(inspect.json(), dict) + + inspect = requests.delete(self.podman_url + f"/v1.40/networks/{ident}") + self.assertEqual(inspect.status_code, 204, inspect.text) + inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}") + self.assertEqual(inspect.status_code, 404, inspect.text) + + # network prune + prune_name = f"Network_{random.getrandbits(160):x}" + prune_create = requests.post( + self.podman_url + "/v1.40/networks/create", json={"Name": prune_name} + ) + self.assertEqual(create.status_code, 201, prune_create.text) + + prune = requests.post(self.podman_url + "/v1.40/networks/prune") + self.assertEqual(prune.status_code, 200, prune.text) + self.assertTrue(prune_name in prune.json()["NetworksDeleted"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_pod.py b/test/apiv2/python/rest_api/test_v2_0_0_pod.py new file mode 100644 index 000000000..9155ad19c --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_pod.py @@ -0,0 +1,65 @@ +import random +import unittest + +import requests +from .fixtures import APITestCase + + +class TestApi(APITestCase): + def test_pod_start_conflict(self): + """Verify issue #8865""" + + pod_name = list() + pod_name.append(f"Pod_{random.getrandbits(160):x}") + pod_name.append(f"Pod_{random.getrandbits(160):x}") + + r = requests.post( + self.uri("/pods/create"), + json={ + "name": pod_name[0], + "no_infra": False, + "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + r = requests.post( + self.uri("/containers/create"), + json={ + "pod": pod_name[0], + "image": "quay.io/libpod/alpine:latest", + "command": ["top"], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + + r = requests.post( + self.uri("/pods/create"), + json={ + "name": pod_name[1], + "no_infra": False, + "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + r = requests.post( + self.uri("/containers/create"), + json={ + "pod": pod_name[1], + "image": "quay.io/libpod/alpine:latest", + "command": ["top"], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + + r = requests.post(self.uri(f"/pods/{pod_name[0]}/start")) + self.assertEqual(r.status_code, 200, r.text) + + r = requests.post(self.uri(f"/pods/{pod_name[1]}/start")) + self.assertEqual(r.status_code, 409, r.text) + + start = r.json() + self.assertGreater(len(start["Errs"]), 0, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_system.py b/test/apiv2/python/rest_api/test_v2_0_0_system.py new file mode 100644 index 000000000..3628b5af1 --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_system.py @@ -0,0 +1,88 @@ +import json +import unittest + +import requests +from .fixtures import APITestCase + + +class SystemTestCase(APITestCase): + def test_info(self): + r = requests.get(self.uri("/info")) + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + _ = r.json() + + r = requests.get(self.podman_url + "/v1.40/info") + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + _ = r.json() + + def test_events(self): + r = requests.get(self.uri("/events?stream=false")) + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + + report = r.text.splitlines() + self.assertGreater(len(report), 0, "No events found!") + for line in report: + obj = json.loads(line) + # Actor.ID is uppercase for compatibility + self.assertIn("ID", obj["Actor"]) + + def test_ping(self): + required_headers = ( + "API-Version", + "Builder-Version", + "Docker-Experimental", + "Cache-Control", + "Pragma", + "Pragma", + ) + + def check_headers(req): + for k in required_headers: + self.assertIn(k, req.headers) + + r = requests.get(self.podman_url + "/_ping") + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "OK") + check_headers(r) + + r = requests.head(self.podman_url + "/_ping") + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "") + check_headers(r) + + r = requests.get(self.uri("/_ping")) + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "OK") + check_headers(r) + + r = requests.head(self.uri("/_ping")) + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "") + check_headers(r) + + def test_version(self): + r = requests.get(self.podman_url + "/v1.40/version") + self.assertEqual(r.status_code, 200, r.text) + + r = requests.get(self.uri("/version")) + self.assertEqual(r.status_code, 200, r.text) + + def test_df(self): + r = requests.get(self.podman_url + "/v1.40/system/df") + self.assertEqual(r.status_code, 200, r.text) + + obj = r.json() + self.assertIn("Images", obj) + self.assertIn("Containers", obj) + self.assertIn("Volumes", obj) + self.assertIn("BuildCache", obj) + + r = requests.get(self.uri("/system/df")) + self.assertEqual(r.status_code, 200, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_volume.py b/test/apiv2/python/rest_api/test_v2_0_0_volume.py new file mode 100644 index 000000000..f5231e17c --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_volume.py @@ -0,0 +1,75 @@ +import os +import random +import unittest + +import requests +from .fixtures import APITestCase + + +class VolumeTestCase(APITestCase): + def test_volume(self): + name = f"Volume_{random.getrandbits(160):x}" + + ls = requests.get(self.podman_url + "/v1.40/volumes") + self.assertEqual(ls.status_code, 200, ls.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeList + required_keys = ( + "Volumes", + "Warnings", + ) + + volumes = ls.json() + self.assertIsInstance(volumes, dict) + for key in required_keys: + self.assertIn(key, volumes) + + create = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name}) + self.assertEqual(create.status_code, 201, create.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeCreate + # and https://docs.docker.com/engine/api/v1.40/#operation/VolumeInspect + required_keys = ( + "Name", + "Driver", + "Mountpoint", + "Labels", + "Scope", + "Options", + ) + + volume = create.json() + self.assertIsInstance(volume, dict) + for k in required_keys: + self.assertIn(k, volume) + self.assertEqual(volume["Name"], name) + + inspect = requests.get(self.podman_url + f"/v1.40/volumes/{name}") + self.assertEqual(inspect.status_code, 200, inspect.text) + + volume = inspect.json() + self.assertIsInstance(volume, dict) + for k in required_keys: + self.assertIn(k, volume) + + rm = requests.delete(self.podman_url + f"/v1.40/volumes/{name}") + self.assertEqual(rm.status_code, 204, rm.text) + + # recreate volume with data and then prune it + r = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name}) + self.assertEqual(create.status_code, 201, create.text) + + create = r.json() + with open(os.path.join(create["Mountpoint"], "test_prune"), "w") as file: + file.writelines(["This is a test\n", "This is a good test\n"]) + + prune = requests.post(self.podman_url + "/v1.40/volumes/prune") + self.assertEqual(prune.status_code, 200, prune.text) + + payload = prune.json() + self.assertIn(name, payload["VolumesDeleted"]) + self.assertGreater(payload["SpaceReclaimed"], 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py b/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py new file mode 100644 index 000000000..905c29683 --- /dev/null +++ b/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py @@ -0,0 +1,238 @@ +import json +import os +import shlex +import signal +import string +import subprocess +import sys +import time +import unittest +from collections.abc import Iterable +from multiprocessing import Process + +import requests +from dateutil.parser import parse + +PODMAN_URL = "http://localhost:8080" + + +def _url(path): + return PODMAN_URL + "/v1.0.0/libpod" + path + + +def podman(): + binary = os.getenv("PODMAN_BINARY") + if binary is None: + binary = "bin/podman" + return binary + + +def ctnr(path): + r = requests.get(_url("/containers/json?all=true")) + try: + ctnrs = json.loads(r.text) + except Exception as e: + sys.stderr.write("Bad container response: {}/{}".format(r.text, e)) + raise e + return path.format(ctnrs[0]["Id"]) + + +class TestApi(unittest.TestCase): + podman = None + + def setUp(self): + super().setUp() + if TestApi.podman.poll() is not None: + sys.stderr.write("podman service returned {}", TestApi.podman.returncode) + sys.exit(2) + requests.get(_url("/images/create?fromSrc=quay.io%2Flibpod%2Falpine%3Alatest")) + # calling out to podman is easier than the API for running a container + subprocess.run( + [podman(), "run", "alpine", "/bin/ls"], + check=True, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + + @classmethod + def setUpClass(cls): + super().setUpClass() + + TestApi.podman = subprocess.Popen( + [ + podman(), + "system", + "service", + "tcp:localhost:8080", + "--log-level=debug", + "--time=0", + ], + shell=False, + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + ) + time.sleep(2) + + @classmethod + def tearDownClass(cls): + TestApi.podman.terminate() + stdout, stderr = TestApi.podman.communicate(timeout=0.5) + if stdout: + print("\nService Stdout:\n" + stdout.decode("utf-8")) + if stderr: + print("\nService Stderr:\n" + stderr.decode("utf-8")) + + if TestApi.podman.returncode > 0: + sys.stderr.write("podman exited with error code {}\n".format(TestApi.podman.returncode)) + sys.exit(2) + + return super().tearDownClass() + + def test_info(self): + r = requests.get(_url("/info")) + self.assertEqual(r.status_code, 200) + self.assertIsNotNone(r.content) + _ = json.loads(r.text) + + def test_events(self): + r = requests.get(_url("/events?stream=false")) + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + for line in r.text.splitlines(): + obj = json.loads(line) + # Actor.ID is uppercase for compatibility + _ = obj["Actor"]["ID"] + + def test_containers(self): + r = requests.get(_url("/containers/json"), timeout=5) + self.assertEqual(r.status_code, 200, r.text) + obj = json.loads(r.text) + self.assertEqual(len(obj), 0) + + def test_containers_all(self): + r = requests.get(_url("/containers/json?all=true")) + self.assertEqual(r.status_code, 200, r.text) + self.validateObjectFields(r.text) + + def test_inspect_container(self): + r = requests.get(_url(ctnr("/containers/{}/json"))) + self.assertEqual(r.status_code, 200, r.text) + obj = self.validateObjectFields(r.content) + _ = parse(obj["Created"]) + + def test_stats(self): + r = requests.get(_url(ctnr("/containers/{}/stats?stream=false"))) + self.assertIn(r.status_code, (200, 409), r.text) + if r.status_code == 200: + self.validateObjectFields(r.text) + + def test_delete_containers(self): + r = requests.delete(_url(ctnr("/containers/{}"))) + self.assertEqual(r.status_code, 204, r.text) + + def test_stop_containers(self): + r = requests.post(_url(ctnr("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(_url(ctnr("/containers/{}/stop"))) + self.assertIn(r.status_code, (204, 304), r.text) + + def test_start_containers(self): + r = requests.post(_url(ctnr("/containers/{}/stop"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(_url(ctnr("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + def test_restart_containers(self): + r = requests.post(_url(ctnr("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5) + self.assertEqual(r.status_code, 204, r.text) + + def test_resize(self): + r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80"))) + self.assertIn(r.status_code, (200, 409), r.text) + if r.status_code == 200: + self.assertIsNone(r.text) + + def test_attach_containers(self): + r = requests.post(_url(ctnr("/containers/{}/attach"))) + self.assertIn(r.status_code, (101, 409), r.text) + + def test_logs_containers(self): + r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true"))) + self.assertEqual(r.status_code, 200, r.text) + + def test_post_create(self): + self.skipTest("TODO: create request body") + r = requests.post(_url("/containers/create?args=True")) + self.assertEqual(r.status_code, 200, r.text) + json.loads(r.text) + + def test_commit(self): + r = requests.post(_url(ctnr("/commit?container={}"))) + self.assertEqual(r.status_code, 200, r.text) + self.validateObjectFields(r.text) + + def test_images(self): + r = requests.get(_url("/images/json")) + self.assertEqual(r.status_code, 200, r.text) + self.validateObjectFields(r.content) + + def test_inspect_image(self): + r = requests.get(_url("/images/alpine/json")) + self.assertEqual(r.status_code, 200, r.text) + obj = self.validateObjectFields(r.content) + _ = parse(obj["Created"]) + + def test_delete_image(self): + r = requests.delete(_url("/images/alpine?force=true")) + self.assertEqual(r.status_code, 200, r.text) + json.loads(r.text) + + def test_pull(self): + r = requests.post(_url("/images/pull?reference=alpine"), timeout=5) + self.assertEqual(r.status_code, 200, r.text) + json.loads(r.text) + + def test_search(self): + # Had issues with this test hanging when repositories not happy + def do_search(): + r = requests.get(_url("/images/search?term=alpine"), timeout=5) + self.assertEqual(r.status_code, 200, r.text) + json.loads(r.text) + + search = Process(target=do_search) + search.start() + search.join(timeout=10) + self.assertFalse(search.is_alive(), "/images/search took too long") + + def test_ping(self): + r = requests.get(PODMAN_URL + "/_ping") + self.assertEqual(r.status_code, 200, r.text) + + r = requests.head(PODMAN_URL + "/_ping") + self.assertEqual(r.status_code, 200, r.text) + + r = requests.get(_url("/_ping")) + self.assertEqual(r.status_code, 200, r.text) + + r = requests.get(_url("/_ping")) + self.assertEqual(r.status_code, 200, r.text) + + +def validateObjectFields(self, buffer): + objs = json.loads(buffer) + if not isinstance(objs, dict): + for o in objs: + _ = o["Id"] + else: + _ = objs["Id"] + return objs + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/rest_api/__init__.py b/test/apiv2/rest_api/__init__.py deleted file mode 100644 index 0ad6b51b3..000000000 --- a/test/apiv2/rest_api/__init__.py +++ /dev/null @@ -1,136 +0,0 @@ -import configparser -import json -import os -import shutil -import subprocess -import sys -import tempfile - - -class Podman(object): - """ - Instances hold the configuration and setup for running podman commands - """ - - def __init__(self): - """Initialize a Podman instance with global options""" - binary = os.getenv("PODMAN", "bin/podman") - self.cmd = [binary, "--storage-driver=vfs"] - - cgroupfs = os.getenv("CGROUP_MANAGER", "systemd") - self.cmd.append(f"--cgroup-manager={cgroupfs}") - - if os.getenv("DEBUG"): - self.cmd.append("--log-level=debug") - self.cmd.append("--syslog=true") - - self.anchor_directory = tempfile.mkdtemp(prefix="podman_restapi_") - self.cmd.append("--root=" + os.path.join(self.anchor_directory, "crio")) - self.cmd.append("--runroot=" + os.path.join(self.anchor_directory, "crio-run")) - - os.environ["CONTAINERS_REGISTRIES_CONF"] = os.path.join( - self.anchor_directory, "registry.conf" - ) - p = configparser.ConfigParser() - p.read_dict( - { - "registries.search": {"registries": "['docker.io']"}, - "registries.insecure": {"registries": "[]"}, - "registries.block": {"registries": "[]"}, - } - ) - with open(os.environ["CONTAINERS_REGISTRIES_CONF"], "w") as w: - p.write(w) - - os.environ["CNI_CONFIG_PATH"] = os.path.join(self.anchor_directory, "cni", "net.d") - os.makedirs(os.environ["CNI_CONFIG_PATH"], exist_ok=True) - self.cmd.append("--cni-config-dir=" + os.environ["CNI_CONFIG_PATH"]) - cni_cfg = os.path.join(os.environ["CNI_CONFIG_PATH"], "87-podman-bridge.conflist") - # json decoded and encoded to ensure legal json - buf = json.loads( - """ - { - "cniVersion": "0.3.0", - "name": "podman", - "plugins": [{ - "type": "bridge", - "bridge": "cni0", - "isGateway": true, - "ipMasq": true, - "ipam": { - "type": "host-local", - "subnet": "10.88.0.0/16", - "routes": [{ - "dst": "0.0.0.0/0" - }] - } - }, - { - "type": "portmap", - "capabilities": { - "portMappings": true - } - } - ] - } - """ - ) - with open(cni_cfg, "w") as w: - json.dump(buf, w) - - def open(self, command, *args, **kwargs): - """Podman initialized instance to run a given command - - :param self: Podman instance - :param command: podman sub-command to run - :param args: arguments and options for command - :param kwargs: See subprocess.Popen() for shell keyword - :return: subprocess.Popen() instance configured to run podman instance - """ - cmd = self.cmd.copy() - cmd.append(command) - cmd.extend(args) - - shell = kwargs.get("shell", False) - - return subprocess.Popen( - cmd, - shell=shell, - stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - def run(self, command, *args, **kwargs): - """Podman initialized instance to run a given command - - :param self: Podman instance - :param command: podman sub-command to run - :param args: arguments and options for command - :param kwargs: See subprocess.Popen() for shell and check keywords - :return: subprocess.Popen() instance configured to run podman instance - """ - cmd = self.cmd.copy() - cmd.append(command) - cmd.extend(args) - - check = kwargs.get("check", False) - shell = kwargs.get("shell", False) - - try: - return subprocess.run( - cmd, - shell=shell, - check=check, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - ) - except subprocess.CalledProcessError as e: - if e.stdout: - sys.stdout.write("\nRun Stdout:\n" + e.stdout.decode("utf-8")) - if e.stderr: - sys.stderr.write("\nRun Stderr:\n" + e.stderr.decode("utf-8")) - raise - - def tear_down(self): - shutil.rmtree(self.anchor_directory, ignore_errors=True) diff --git a/test/apiv2/rest_api/test_rest_v2_0_0.py b/test/apiv2/rest_api/test_rest_v2_0_0.py deleted file mode 100644 index f66e2b120..000000000 --- a/test/apiv2/rest_api/test_rest_v2_0_0.py +++ /dev/null @@ -1,744 +0,0 @@ -import json -import os -import random -import string -import subprocess -import sys -import time -import unittest -from multiprocessing import Process - -import requests -from dateutil.parser import parse - -from test.apiv2.rest_api import Podman - -PODMAN_URL = "http://localhost:8080" - - -def _url(path): - return PODMAN_URL + "/v2.0.0/libpod" + path - - -def ctnr(path): - try: - r = requests.get(_url("/containers/json?all=true")) - ctnrs = json.loads(r.text) - except Exception as e: - msg = f"Bad container response: {e}" - if r is not None: - msg = msg + " " + r.text - sys.stderr.write(msg + "\n") - raise - return path.format(ctnrs[0]["Id"]) - - -def validateObjectFields(buffer): - objs = json.loads(buffer) - if not isinstance(objs, dict): - for o in objs: - _ = o["Id"] - else: - _ = objs["Id"] - return objs - - -class TestApi(unittest.TestCase): - podman = None # initialized podman configuration for tests - service = None # podman service instance - - def setUp(self): - super().setUp() - - TestApi.podman.run("run", "alpine", "/bin/ls", check=True) - - def tearDown(self) -> None: - super().tearDown() - - TestApi.podman.run("pod", "rm", "--all", "--force", check=True) - TestApi.podman.run("rm", "--all", "--force", check=True) - - @classmethod - def setUpClass(cls): - super().setUpClass() - - TestApi.podman = Podman() - TestApi.service = TestApi.podman.open("system", "service", "tcp:localhost:8080", "--time=0") - # give the service some time to be ready... - time.sleep(2) - - returncode = TestApi.service.poll() - if returncode is not None: - raise subprocess.CalledProcessError(returncode, "podman system service") - - r = requests.post(_url("/images/pull?reference=docker.io%2Falpine%3Alatest")) - if r.status_code != 200: - raise subprocess.CalledProcessError( - r.status_code, f"podman images pull docker.io/alpine:latest {r.text}" - ) - - @classmethod - def tearDownClass(cls): - TestApi.service.terminate() - stdout, stderr = TestApi.service.communicate(timeout=0.5) - if stdout: - sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8")) - if stderr: - sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8")) - return super().tearDownClass() - - def test_info(self): - r = requests.get(_url("/info")) - self.assertEqual(r.status_code, 200) - self.assertIsNotNone(r.content) - _ = json.loads(r.text) - - info = requests.get(PODMAN_URL + "/v1.40/info") - self.assertEqual(info.status_code, 200, info.content) - _ = json.loads(info.text) - - def test_events(self): - r = requests.get(_url("/events?stream=false")) - self.assertEqual(r.status_code, 200, r.text) - self.assertIsNotNone(r.content) - - report = r.text.splitlines() - self.assertGreater(len(report), 0, "No events found!") - for line in report: - obj = json.loads(line) - # Actor.ID is uppercase for compatibility - self.assertIn("ID", obj["Actor"]) - - def test_containers(self): - r = requests.get(_url("/containers/json"), timeout=5) - self.assertEqual(r.status_code, 200, r.text) - obj = json.loads(r.text) - self.assertEqual(len(obj), 0) - - def test_containers_all(self): - r = requests.get(_url("/containers/json?all=true")) - self.assertEqual(r.status_code, 200, r.text) - validateObjectFields(r.text) - - def test_inspect_container(self): - r = requests.get(_url(ctnr("/containers/{}/json"))) - self.assertEqual(r.status_code, 200, r.text) - obj = validateObjectFields(r.content) - _ = parse(obj["Created"]) - - def test_stats(self): - r = requests.get(_url(ctnr("/containers/{}/stats?stream=false"))) - self.assertIn(r.status_code, (200, 409), r.text) - if r.status_code == 200: - validateObjectFields(r.text) - - def test_delete_containers(self): - r = requests.delete(_url(ctnr("/containers/{}"))) - self.assertEqual(r.status_code, 204, r.text) - - def test_stop_containers(self): - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/stop"))) - self.assertIn(r.status_code, (204, 304), r.text) - - def test_start_containers(self): - r = requests.post(_url(ctnr("/containers/{}/stop"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - def test_restart_containers(self): - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5) - self.assertEqual(r.status_code, 204, r.text) - - def test_resize(self): - r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80"))) - self.assertIn(r.status_code, (200, 409), r.text) - if r.status_code == 200: - self.assertEqual(r.text, "", r.text) - - def test_attach_containers(self): - self.skipTest("FIXME: Test timeouts") - r = requests.post(_url(ctnr("/containers/{}/attach")), timeout=5) - self.assertIn(r.status_code, (101, 500), r.text) - - def test_logs_containers(self): - r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true"))) - self.assertEqual(r.status_code, 200, r.text) - - # TODO Need to support Docker-py order of network/container creates - def test_post_create_compat_connect(self): - """Create network and container then connect to network""" - net_default = requests.post( - PODMAN_URL + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"} - ) - self.assertEqual(net_default.status_code, 201, net_default.text) - - create = requests.post( - PODMAN_URL + "/v1.40/containers/create?name=postCreateConnect", - json={ - "Cmd": ["top"], - "Image": "alpine:latest", - "NetworkDisabled": False, - # FIXME adding these 2 lines cause: (This is sampled from docker-py) - # "network already exists","message":"container - # 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI - # network \"TestDefaultNetwork\": network already exists" - # "HostConfig": {"NetworkMode": "TestDefaultNetwork"}, - # "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}}, - # FIXME These two lines cause: - # CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container - # 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not - # found" - # "HostConfig": {"NetworkMode": "TestNetwork"}, - # "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}}, - # FIXME no networking defined cause: (note this error is from the container inspect below) - # "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [ - # TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error" - }, - ) - self.assertEqual(create.status_code, 201, create.text) - payload = json.loads(create.text) - self.assertIsNotNone(payload["Id"]) - - start = requests.post(PODMAN_URL + f"/v1.40/containers/{payload['Id']}/start") - self.assertEqual(start.status_code, 204, start.text) - - connect = requests.post( - PODMAN_URL + "/v1.40/networks/TestDefaultNetwork/connect", - json={"Container": payload["Id"]}, - ) - self.assertEqual(connect.status_code, 200, connect.text) - self.assertEqual(connect.text, "OK\n") - - inspect = requests.get(f"{PODMAN_URL}/v1.40/containers/{payload['Id']}/json") - self.assertEqual(inspect.status_code, 200, inspect.text) - - payload = json.loads(inspect.text) - self.assertFalse(payload["Config"].get("NetworkDisabled", False)) - - self.assertEqual( - "TestDefaultNetwork", - payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"], - ) - # TODO restore this to test, when joining multiple networks possible - # self.assertEqual( - # "TestNetwork", - # payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], - # ) - # TODO Need to support network aliases - # self.assertIn( - # "test_post_create", - # payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"], - # ) - - def test_post_create_compat(self): - """Create network and connect container during create""" - net = requests.post(PODMAN_URL + "/v1.40/networks/create", json={"Name": "TestNetwork"}) - self.assertEqual(net.status_code, 201, net.text) - - create = requests.post( - PODMAN_URL + "/v1.40/containers/create?name=postCreate", - json={ - "Cmd": ["date"], - "Image": "alpine:latest", - "NetworkDisabled": False, - "HostConfig": {"NetworkMode": "TestNetwork"}, - }, - ) - self.assertEqual(create.status_code, 201, create.text) - payload = json.loads(create.text) - self.assertIsNotNone(payload["Id"]) - - inspect = requests.get(f"{PODMAN_URL}/v1.40/containers/{payload['Id']}/json") - self.assertEqual(inspect.status_code, 200, inspect.text) - payload = json.loads(inspect.text) - self.assertFalse(payload["Config"].get("NetworkDisabled", False)) - self.assertEqual( - "TestNetwork", - payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], - ) - - def test_commit(self): - r = requests.post(_url(ctnr("/commit?container={}"))) - self.assertEqual(r.status_code, 200, r.text) - - obj = json.loads(r.content) - self.assertIsInstance(obj, dict) - self.assertIn("Id", obj) - - def test_images_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/images/json") - self.assertEqual(r.status_code, 200, r.text) - - # See https://docs.docker.com/engine/api/v1.40/#operation/ImageList - required_keys = ( - "Id", - "ParentId", - "RepoTags", - "RepoDigests", - "Created", - "Size", - "SharedSize", - "VirtualSize", - "Labels", - "Containers", - ) - objs = json.loads(r.content) - self.assertIn(type(objs), (list,)) - for o in objs: - self.assertIsInstance(o, dict) - for k in required_keys: - self.assertIn(k, o) - - def test_inspect_image_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/images/alpine/json") - self.assertEqual(r.status_code, 200, r.text) - - # See https://docs.docker.com/engine/api/v1.40/#operation/ImageInspect - required_keys = ( - "Id", - "Parent", - "Comment", - "Created", - "Container", - "DockerVersion", - "Author", - "Architecture", - "Os", - "Size", - "VirtualSize", - "GraphDriver", - "RootFS", - "Metadata", - ) - - obj = json.loads(r.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - _ = parse(obj["Created"]) - - def test_delete_image_compat(self): - r = requests.delete(PODMAN_URL + "/v1.40/images/alpine?force=true") - self.assertEqual(r.status_code, 200, r.text) - obj = json.loads(r.content) - self.assertIn(type(obj), (list,)) - - def test_pull(self): - r = requests.post(_url("/images/pull?reference=alpine"), timeout=15) - self.assertEqual(r.status_code, 200, r.status_code) - text = r.text - keys = { - "error": False, - "id": False, - "images": False, - "stream": False, - } - # Read and record stanza's from pull - for line in str.splitlines(text): - obj = json.loads(line) - key_list = list(obj.keys()) - for k in key_list: - keys[k] = True - - self.assertFalse(keys["error"], "Expected no errors") - self.assertTrue(keys["id"], "Expected to find id stanza") - self.assertTrue(keys["images"], "Expected to find images stanza") - self.assertTrue(keys["stream"], "Expected to find stream progress stanza's") - - def test_search_compat(self): - url = PODMAN_URL + "/v1.40/images/search" - - # Had issues with this test hanging when repositories not happy - def do_search1(): - payload = {"term": "alpine"} - r = requests.get(url, params=payload, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - objs = json.loads(r.text) - self.assertIn(type(objs), (list,)) - - def do_search2(): - payload = {"term": "alpine", "limit": 1} - r = requests.get(url, params=payload, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - objs = json.loads(r.text) - self.assertIn(type(objs), (list,)) - self.assertEqual(len(objs), 1) - - def do_search3(): - payload = {"term": "alpine", "filters": '{"is-official":["true"]}'} - r = requests.get(url, params=payload, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - objs = json.loads(r.text) - self.assertIn(type(objs), (list,)) - # There should be only one official image - self.assertEqual(len(objs), 1) - - def do_search4(): - headers = {"X-Registry-Auth": "null"} - payload = {"term": "alpine"} - r = requests.get(url, params=payload, headers=headers, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - - def do_search5(): - headers = {"X-Registry-Auth": "invalid value"} - payload = {"term": "alpine"} - r = requests.get(url, params=payload, headers=headers, timeout=5) - self.assertEqual(r.status_code, 400, r.text) - - search_methods = [do_search1, do_search2, do_search3, do_search4, do_search5] - for search_method in search_methods: - search = Process(target=search_method) - search.start() - search.join(timeout=10) - self.assertFalse(search.is_alive(), "/images/search took too long") - - def test_ping(self): - required_headers = ( - "API-Version", - "Builder-Version", - "Docker-Experimental", - "Cache-Control", - "Pragma", - "Pragma", - ) - - def check_headers(req): - for k in required_headers: - self.assertIn(k, req.headers) - - r = requests.get(PODMAN_URL + "/_ping") - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "OK") - check_headers(r) - - r = requests.head(PODMAN_URL + "/_ping") - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "") - check_headers(r) - - r = requests.get(_url("/_ping")) - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "OK") - check_headers(r) - - r = requests.head(_url("/_ping")) - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "") - check_headers(r) - - def test_history_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/images/alpine/history") - self.assertEqual(r.status_code, 200, r.text) - - # See https://docs.docker.com/engine/api/v1.40/#operation/ImageHistory - required_keys = ("Id", "Created", "CreatedBy", "Tags", "Size", "Comment") - - objs = json.loads(r.content) - self.assertIn(type(objs), (list,)) - for o in objs: - self.assertIsInstance(o, dict) - for k in required_keys: - self.assertIn(k, o) - - def test_network_compat(self): - name = "Network_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - - # Cannot test for 0 existing networks because default "podman" network always exists - - create = requests.post(PODMAN_URL + "/v1.40/networks/create", json={"Name": name}) - self.assertEqual(create.status_code, 201, create.content) - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - self.assertIn("Id", obj) - ident = obj["Id"] - self.assertNotEqual(name, ident) - - ls = requests.get(PODMAN_URL + "/v1.40/networks") - self.assertEqual(ls.status_code, 200, ls.content) - objs = json.loads(ls.content) - self.assertIn(type(objs), (list,)) - - found = False - for network in objs: - if network["Name"] == name: - found = True - self.assertTrue(found, f"Network {name} not found") - - inspect = requests.get(PODMAN_URL + f"/v1.40/networks/{ident}") - self.assertEqual(inspect.status_code, 200, inspect.content) - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - - inspect = requests.delete(PODMAN_URL + f"/v1.40/networks/{ident}") - self.assertEqual(inspect.status_code, 204, inspect.content) - inspect = requests.get(PODMAN_URL + f"/v1.40/networks/{ident}") - self.assertEqual(inspect.status_code, 404, inspect.content) - - # network prune - prune_name = "Network_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - prune_create = requests.post( - PODMAN_URL + "/v1.40/networks/create", json={"Name": prune_name} - ) - self.assertEqual(create.status_code, 201, prune_create.content) - - prune = requests.post(PODMAN_URL + "/v1.40/networks/prune") - self.assertEqual(prune.status_code, 200, prune.content) - obj = json.loads(prune.content) - self.assertTrue(prune_name in obj["NetworksDeleted"]) - - def test_volumes_compat(self): - name = "Volume_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - - ls = requests.get(PODMAN_URL + "/v1.40/volumes") - self.assertEqual(ls.status_code, 200, ls.content) - - # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeList - required_keys = ( - "Volumes", - "Warnings", - ) - - obj = json.loads(ls.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - - create = requests.post(PODMAN_URL + "/v1.40/volumes/create", json={"Name": name}) - self.assertEqual(create.status_code, 201, create.content) - - # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeCreate - # and https://docs.docker.com/engine/api/v1.40/#operation/VolumeInspect - required_keys = ( - "Name", - "Driver", - "Mountpoint", - "Labels", - "Scope", - "Options", - ) - - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - self.assertEqual(obj["Name"], name) - - inspect = requests.get(PODMAN_URL + f"/v1.40/volumes/{name}") - self.assertEqual(inspect.status_code, 200, inspect.content) - - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - - rm = requests.delete(PODMAN_URL + f"/v1.40/volumes/{name}") - self.assertEqual(rm.status_code, 204, rm.content) - - # recreate volume with data and then prune it - r = requests.post(PODMAN_URL + "/v1.40/volumes/create", json={"Name": name}) - self.assertEqual(create.status_code, 201, create.content) - create = json.loads(r.content) - with open(os.path.join(create["Mountpoint"], "test_prune"), "w") as file: - file.writelines(["This is a test\n", "This is a good test\n"]) - - prune = requests.post(PODMAN_URL + "/v1.40/volumes/prune") - self.assertEqual(prune.status_code, 200, prune.content) - payload = json.loads(prune.content) - self.assertIn(name, payload["VolumesDeleted"]) - self.assertGreater(payload["SpaceReclaimed"], 0) - - def test_version(self): - r = requests.get(PODMAN_URL + "/v1.40/version") - self.assertEqual(r.status_code, 200, r.content) - - r = requests.get(_url("/version")) - self.assertEqual(r.status_code, 200, r.content) - - def test_df_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/system/df") - self.assertEqual(r.status_code, 200, r.content) - - obj = json.loads(r.content) - self.assertIn("Images", obj) - self.assertIn("Containers", obj) - self.assertIn("Volumes", obj) - self.assertIn("BuildCache", obj) - - def test_prune_compat(self): - name = "Ctnr_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - - r = requests.post( - PODMAN_URL + f"/v1.40/containers/create?name={name}", - json={ - "Cmd": ["cp", "/etc/motd", "/motd.size_test"], - "Image": "alpine:latest", - "NetworkDisabled": True, - }, - ) - self.assertEqual(r.status_code, 201, r.text) - create = json.loads(r.text) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{create['Id']}/start") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{create['Id']}/wait") - self.assertEqual(r.status_code, 200, r.text) - wait = json.loads(r.text) - self.assertEqual(wait["StatusCode"], 0, wait["Error"]["Message"]) - - prune = requests.post(PODMAN_URL + "/v1.40/containers/prune") - self.assertEqual(prune.status_code, 200, prune.status_code) - prune_payload = json.loads(prune.text) - self.assertGreater(prune_payload["SpaceReclaimed"], 0) - self.assertIn(create["Id"], prune_payload["ContainersDeleted"]) - - # Delete any orphaned containers - r = requests.get(PODMAN_URL + "/v1.40/containers/json?all=true") - self.assertEqual(r.status_code, 200, r.text) - for ctnr in json.loads(r.text): - requests.delete(PODMAN_URL + f"/v1.40/containers/{ctnr['Id']}?force=true") - - prune = requests.post(PODMAN_URL + "/v1.40/images/prune") - self.assertEqual(prune.status_code, 200, prune.text) - prune_payload = json.loads(prune.text) - self.assertGreater(prune_payload["SpaceReclaimed"], 0) - - # FIXME need method to determine which image is going to be "pruned" to fix test - # TODO should handler be recursive when deleting images? - # self.assertIn(img["Id"], prune_payload["ImagesDeleted"][1]["Deleted"]) - - # FIXME (@vrothberg): I commented this line out during the `libimage` migration. - # It doesn't make sense to report anything to be deleted if the reclaimed space - # is zero. I think the test needs some rewrite. - # self.assertIsNotNone(prune_payload["ImagesDeleted"][1]["Deleted"]) - - def test_status_compat(self): - r = requests.post( - PODMAN_URL + "/v1.40/containers/create?name=topcontainer", - json={"Cmd": ["top"], "Image": "alpine:latest"}, - ) - self.assertEqual(r.status_code, 201, r.text) - payload = json.loads(r.text) - container_id = payload["Id"] - self.assertIsNotNone(container_id) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertEqual(payload[0]["Status"], "Created") - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/start") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertTrue(str(payload[0]["Status"]).startswith("Up")) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/pause") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertTrue(str(payload[0]["Status"]).startswith("Up")) - self.assertTrue(str(payload[0]["Status"]).endswith("(Paused)")) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/unpause") - self.assertEqual(r.status_code, 204, r.text) - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/stop") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertTrue(str(payload[0]["Status"]).startswith("Exited")) - - r = requests.delete(PODMAN_URL + f"/v1.40/containers/{container_id}") - self.assertEqual(r.status_code, 204, r.text) - - def test_pod_start_conflict(self): - """Verify issue #8865""" - - pod_name = list() - pod_name.append("Pod_" + "".join(random.choice(string.ascii_letters) for i in range(10))) - pod_name.append("Pod_" + "".join(random.choice(string.ascii_letters) for i in range(10))) - - r = requests.post( - _url("/pods/create"), - json={ - "name": pod_name[0], - "no_infra": False, - "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - r = requests.post( - _url("/containers/create"), - json={ - "pod": pod_name[0], - "image": "docker.io/alpine:latest", - "command": ["top"], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - - r = requests.post( - _url("/pods/create"), - json={ - "name": pod_name[1], - "no_infra": False, - "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - r = requests.post( - _url("/containers/create"), - json={ - "pod": pod_name[1], - "image": "docker.io/alpine:latest", - "command": ["top"], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - - r = requests.post(_url(f"/pods/{pod_name[0]}/start")) - self.assertEqual(r.status_code, 200, r.text) - - r = requests.post(_url(f"/pods/{pod_name[1]}/start")) - self.assertEqual(r.status_code, 409, r.text) - - start = json.loads(r.text) - self.assertGreater(len(start["Errs"]), 0, r.text) - - def test_manifest_409(self): - r = requests.post(_url("/manifests/create"), params={"name": "ThisIsAnInvalidImage"}) - self.assertEqual(r.status_code, 400, r.text) - - def test_df(self): - r = requests.get(_url("/system/df")) - self.assertEqual(r.status_code, 200, r.text) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/apiv2/rest_api/v1_test_rest_v1_0_0.py b/test/apiv2/rest_api/v1_test_rest_v1_0_0.py deleted file mode 100644 index 23528a246..000000000 --- a/test/apiv2/rest_api/v1_test_rest_v1_0_0.py +++ /dev/null @@ -1,238 +0,0 @@ -import json -import os -import shlex -import signal -import string -import subprocess -import sys -import time -import unittest -from collections.abc import Iterable -from multiprocessing import Process - -import requests -from dateutil.parser import parse - -PODMAN_URL = "http://localhost:8080" - - -def _url(path): - return PODMAN_URL + "/v1.0.0/libpod" + path - - -def podman(): - binary = os.getenv("PODMAN_BINARY") - if binary is None: - binary = "bin/podman" - return binary - - -def ctnr(path): - r = requests.get(_url("/containers/json?all=true")) - try: - ctnrs = json.loads(r.text) - except Exception as e: - sys.stderr.write("Bad container response: {}/{}".format(r.text, e)) - raise e - return path.format(ctnrs[0]["Id"]) - - -class TestApi(unittest.TestCase): - podman = None - - def setUp(self): - super().setUp() - if TestApi.podman.poll() is not None: - sys.stderr.write("podman service returned {}", TestApi.podman.returncode) - sys.exit(2) - requests.get(_url("/images/create?fromSrc=docker.io%2Falpine%3Alatest")) - # calling out to podman is easier than the API for running a container - subprocess.run( - [podman(), "run", "alpine", "/bin/ls"], - check=True, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - - @classmethod - def setUpClass(cls): - super().setUpClass() - - TestApi.podman = subprocess.Popen( - [ - podman(), - "system", - "service", - "tcp:localhost:8080", - "--log-level=debug", - "--time=0", - ], - shell=False, - stdin=subprocess.DEVNULL, - stdout=subprocess.DEVNULL, - stderr=subprocess.DEVNULL, - ) - time.sleep(2) - - @classmethod - def tearDownClass(cls): - TestApi.podman.terminate() - stdout, stderr = TestApi.podman.communicate(timeout=0.5) - if stdout: - print("\nService Stdout:\n" + stdout.decode("utf-8")) - if stderr: - print("\nService Stderr:\n" + stderr.decode("utf-8")) - - if TestApi.podman.returncode > 0: - sys.stderr.write("podman exited with error code {}\n".format(TestApi.podman.returncode)) - sys.exit(2) - - return super().tearDownClass() - - def test_info(self): - r = requests.get(_url("/info")) - self.assertEqual(r.status_code, 200) - self.assertIsNotNone(r.content) - _ = json.loads(r.text) - - def test_events(self): - r = requests.get(_url("/events?stream=false")) - self.assertEqual(r.status_code, 200, r.text) - self.assertIsNotNone(r.content) - for line in r.text.splitlines(): - obj = json.loads(line) - # Actor.ID is uppercase for compatibility - _ = obj["Actor"]["ID"] - - def test_containers(self): - r = requests.get(_url("/containers/json"), timeout=5) - self.assertEqual(r.status_code, 200, r.text) - obj = json.loads(r.text) - self.assertEqual(len(obj), 0) - - def test_containers_all(self): - r = requests.get(_url("/containers/json?all=true")) - self.assertEqual(r.status_code, 200, r.text) - self.validateObjectFields(r.text) - - def test_inspect_container(self): - r = requests.get(_url(ctnr("/containers/{}/json"))) - self.assertEqual(r.status_code, 200, r.text) - obj = self.validateObjectFields(r.content) - _ = parse(obj["Created"]) - - def test_stats(self): - r = requests.get(_url(ctnr("/containers/{}/stats?stream=false"))) - self.assertIn(r.status_code, (200, 409), r.text) - if r.status_code == 200: - self.validateObjectFields(r.text) - - def test_delete_containers(self): - r = requests.delete(_url(ctnr("/containers/{}"))) - self.assertEqual(r.status_code, 204, r.text) - - def test_stop_containers(self): - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/stop"))) - self.assertIn(r.status_code, (204, 304), r.text) - - def test_start_containers(self): - r = requests.post(_url(ctnr("/containers/{}/stop"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - def test_restart_containers(self): - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5) - self.assertEqual(r.status_code, 204, r.text) - - def test_resize(self): - r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80"))) - self.assertIn(r.status_code, (200, 409), r.text) - if r.status_code == 200: - self.assertIsNone(r.text) - - def test_attach_containers(self): - r = requests.post(_url(ctnr("/containers/{}/attach"))) - self.assertIn(r.status_code, (101, 409), r.text) - - def test_logs_containers(self): - r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true"))) - self.assertEqual(r.status_code, 200, r.text) - - def test_post_create(self): - self.skipTest("TODO: create request body") - r = requests.post(_url("/containers/create?args=True")) - self.assertEqual(r.status_code, 200, r.text) - json.loads(r.text) - - def test_commit(self): - r = requests.post(_url(ctnr("/commit?container={}"))) - self.assertEqual(r.status_code, 200, r.text) - self.validateObjectFields(r.text) - - def test_images(self): - r = requests.get(_url("/images/json")) - self.assertEqual(r.status_code, 200, r.text) - self.validateObjectFields(r.content) - - def test_inspect_image(self): - r = requests.get(_url("/images/alpine/json")) - self.assertEqual(r.status_code, 200, r.text) - obj = self.validateObjectFields(r.content) - _ = parse(obj["Created"]) - - def test_delete_image(self): - r = requests.delete(_url("/images/alpine?force=true")) - self.assertEqual(r.status_code, 200, r.text) - json.loads(r.text) - - def test_pull(self): - r = requests.post(_url("/images/pull?reference=alpine"), timeout=5) - self.assertEqual(r.status_code, 200, r.text) - json.loads(r.text) - - def test_search(self): - # Had issues with this test hanging when repositories not happy - def do_search(): - r = requests.get(_url("/images/search?term=alpine"), timeout=5) - self.assertEqual(r.status_code, 200, r.text) - json.loads(r.text) - - search = Process(target=do_search) - search.start() - search.join(timeout=10) - self.assertFalse(search.is_alive(), "/images/search took too long") - - def test_ping(self): - r = requests.get(PODMAN_URL + "/_ping") - self.assertEqual(r.status_code, 200, r.text) - - r = requests.head(PODMAN_URL + "/_ping") - self.assertEqual(r.status_code, 200, r.text) - - r = requests.get(_url("/_ping")) - self.assertEqual(r.status_code, 200, r.text) - - r = requests.get(_url("/_ping")) - self.assertEqual(r.status_code, 200, r.text) - - -def validateObjectFields(self, buffer): - objs = json.loads(buffer) - if not isinstance(objs, dict): - for o in objs: - _ = o["Id"] - else: - _ = objs["Id"] - return objs - - -if __name__ == "__main__": - unittest.main() -- cgit v1.2.3-54-g00ecf