summaryrefslogtreecommitdiff
path: root/test/apiv2/python/rest_api
diff options
context:
space:
mode:
Diffstat (limited to 'test/apiv2/python/rest_api')
-rw-r--r--test/apiv2/python/rest_api/__init__.py0
-rw-r--r--test/apiv2/python/rest_api/fixtures/__init__.py3
-rw-r--r--test/apiv2/python/rest_api/fixtures/api_testcase.py103
-rw-r--r--test/apiv2/python/rest_api/fixtures/podman.py136
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_container.py192
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_image.py165
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_manifest.py14
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_network.py155
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_pod.py65
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_system.py88
-rw-r--r--test/apiv2/python/rest_api/test_v2_0_0_volume.py75
-rw-r--r--test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py238
12 files changed, 1234 insertions, 0 deletions
diff --git a/test/apiv2/python/rest_api/__init__.py b/test/apiv2/python/rest_api/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/test/apiv2/python/rest_api/__init__.py
diff --git a/test/apiv2/python/rest_api/fixtures/__init__.py b/test/apiv2/python/rest_api/fixtures/__init__.py
new file mode 100644
index 000000000..5d763e454
--- /dev/null
+++ b/test/apiv2/python/rest_api/fixtures/__init__.py
@@ -0,0 +1,3 @@
+from .api_testcase import APITestCase
+
+__all__ = ["APITestCase"]
diff --git a/test/apiv2/python/rest_api/fixtures/api_testcase.py b/test/apiv2/python/rest_api/fixtures/api_testcase.py
new file mode 100644
index 000000000..8b771774b
--- /dev/null
+++ b/test/apiv2/python/rest_api/fixtures/api_testcase.py
@@ -0,0 +1,103 @@
+import json
+import subprocess
+import unittest
+
+import requests
+import sys
+import time
+
+from .podman import Podman
+
+
+class APITestCase(unittest.TestCase):
+ PODMAN_URL = "http://localhost:8080"
+ podman = None # initialized podman configuration for tests
+ service = None # podman service instance
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ APITestCase.podman = Podman()
+ APITestCase.service = APITestCase.podman.open(
+ "system", "service", "tcp:localhost:8080", "--time=0"
+ )
+ # give the service some time to be ready...
+ time.sleep(2)
+
+ returncode = APITestCase.service.poll()
+ if returncode is not None:
+ raise subprocess.CalledProcessError(returncode, "podman system service")
+
+ r = requests.post(
+ APITestCase.uri("/images/pull?reference=quay.io%2Flibpod%2Falpine%3Alatest")
+ )
+ if r.status_code != 200:
+ raise subprocess.CalledProcessError(
+ r.status_code, f"podman images pull quay.io/libpod/alpine:latest {r.text}"
+ )
+
+ @classmethod
+ def tearDownClass(cls):
+ APITestCase.service.terminate()
+ stdout, stderr = APITestCase.service.communicate(timeout=0.5)
+ if stdout:
+ sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8"))
+ if stderr:
+ sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8"))
+ return super().tearDownClass()
+
+ def setUp(self):
+ super().setUp()
+ APITestCase.podman.run("run", "alpine", "/bin/ls", check=True)
+
+ def tearDown(self) -> None:
+ APITestCase.podman.run("pod", "rm", "--all", "--force", check=True)
+ APITestCase.podman.run("rm", "--all", "--force", check=True)
+ super().tearDown()
+
+ @property
+ def podman_url(self):
+ return "http://localhost:8080"
+
+ @staticmethod
+ def uri(path):
+ return APITestCase.PODMAN_URL + "/v2.0.0/libpod" + path
+
+ def resolve_container(self, path):
+ """Find 'first' container and return 'Id' formatted into given URI path."""
+
+ try:
+ r = requests.get(self.uri("/containers/json?all=true"))
+ containers = r.json()
+ except Exception as e:
+ msg = f"Bad container response: {e}"
+ if r is not None:
+ msg += ": " + r.text
+ raise self.failureException(msg)
+ return path.format(containers[0]["Id"])
+
+ def assertContainerExists(self, member, msg=None): # pylint: disable=invalid-name
+ r = requests.get(self.uri(f"/containers/{member}/exists"))
+ if r.status_code == 404:
+ if msg is None:
+ msg = f"Container '{member}' does not exist."
+ self.failureException(msg)
+
+ def assertContainerNotExists(self, member, msg=None): # pylint: disable=invalid-name
+ r = requests.get(self.uri(f"/containers/{member}/exists"))
+ if r.status_code == 204:
+ if msg is None:
+ msg = f"Container '{member}' exists."
+ self.failureException(msg)
+
+ def assertId(self, content): # pylint: disable=invalid-name
+ objects = json.loads(content)
+ try:
+ if isinstance(objects, dict):
+ _ = objects["Id"]
+ else:
+ for item in objects:
+ _ = item["Id"]
+ except KeyError:
+ self.failureException("Failed in find 'Id' in return value.")
diff --git a/test/apiv2/python/rest_api/fixtures/podman.py b/test/apiv2/python/rest_api/fixtures/podman.py
new file mode 100644
index 000000000..bae04f87d
--- /dev/null
+++ b/test/apiv2/python/rest_api/fixtures/podman.py
@@ -0,0 +1,136 @@
+import configparser
+import json
+import os
+import shutil
+import subprocess
+import sys
+import tempfile
+
+
+class Podman:
+ """
+ Instances hold the configuration and setup for running podman commands
+ """
+
+ def __init__(self):
+ """Initialize a Podman instance with global options"""
+ binary = os.getenv("PODMAN", "bin/podman")
+ self.cmd = [binary, "--storage-driver=vfs"]
+
+ cgroupfs = os.getenv("CGROUP_MANAGER", "systemd")
+ self.cmd.append(f"--cgroup-manager={cgroupfs}")
+
+ if os.getenv("DEBUG"):
+ self.cmd.append("--log-level=debug")
+ self.cmd.append("--syslog=true")
+
+ self.anchor_directory = tempfile.mkdtemp(prefix="podman_restapi_")
+ self.cmd.append("--root=" + os.path.join(self.anchor_directory, "crio"))
+ self.cmd.append("--runroot=" + os.path.join(self.anchor_directory, "crio-run"))
+
+ os.environ["CONTAINERS_REGISTRIES_CONF"] = os.path.join(
+ self.anchor_directory, "registry.conf"
+ )
+ p = configparser.ConfigParser()
+ p.read_dict(
+ {
+ "registries.search": {"registries": "['quay.io']"},
+ "registries.insecure": {"registries": "[]"},
+ "registries.block": {"registries": "[]"},
+ }
+ )
+ with open(os.environ["CONTAINERS_REGISTRIES_CONF"], "w") as w:
+ p.write(w)
+
+ os.environ["CNI_CONFIG_PATH"] = os.path.join(self.anchor_directory, "cni", "net.d")
+ os.makedirs(os.environ["CNI_CONFIG_PATH"], exist_ok=True)
+ self.cmd.append("--cni-config-dir=" + os.environ["CNI_CONFIG_PATH"])
+ cni_cfg = os.path.join(os.environ["CNI_CONFIG_PATH"], "87-podman-bridge.conflist")
+ # json decoded and encoded to ensure legal json
+ buf = json.loads(
+ """
+ {
+ "cniVersion": "0.3.0",
+ "name": "podman",
+ "plugins": [{
+ "type": "bridge",
+ "bridge": "cni0",
+ "isGateway": true,
+ "ipMasq": true,
+ "ipam": {
+ "type": "host-local",
+ "subnet": "10.88.0.0/16",
+ "routes": [{
+ "dst": "0.0.0.0/0"
+ }]
+ }
+ },
+ {
+ "type": "portmap",
+ "capabilities": {
+ "portMappings": true
+ }
+ }
+ ]
+ }
+ """
+ )
+ with open(cni_cfg, "w") as w:
+ json.dump(buf, w)
+
+ def open(self, command, *args, **kwargs):
+ """Podman initialized instance to run a given command
+
+ :param self: Podman instance
+ :param command: podman sub-command to run
+ :param args: arguments and options for command
+ :param kwargs: See subprocess.Popen() for shell keyword
+ :return: subprocess.Popen() instance configured to run podman instance
+ """
+ cmd = self.cmd.copy()
+ cmd.append(command)
+ cmd.extend(args)
+
+ shell = kwargs.get("shell", False)
+
+ return subprocess.Popen(
+ cmd,
+ shell=shell,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ def run(self, command, *args, **kwargs):
+ """Run given podman command
+
+ :param self: Podman instance
+ :param command: podman sub-command to run
+ :param args: arguments and options for command
+ :param kwargs: See subprocess.Popen() for shell and check keywords
+ :return: subprocess.Popen() instance configured to run podman instance
+ """
+ cmd = self.cmd.copy()
+ cmd.append(command)
+ cmd.extend(args)
+
+ check = kwargs.get("check", False)
+ shell = kwargs.get("shell", False)
+
+ try:
+ return subprocess.run(
+ cmd,
+ shell=shell,
+ check=check,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE,
+ )
+ except subprocess.CalledProcessError as e:
+ if e.stdout:
+ sys.stdout.write("\nRun Stdout:\n" + e.stdout.decode("utf-8"))
+ if e.stderr:
+ sys.stderr.write("\nRun Stderr:\n" + e.stderr.decode("utf-8"))
+ raise
+
+ def tear_down(self):
+ shutil.rmtree(self.anchor_directory, ignore_errors=True)
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_container.py b/test/apiv2/python/rest_api/test_v2_0_0_container.py
new file mode 100644
index 000000000..70c07d47f
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_container.py
@@ -0,0 +1,192 @@
+import random
+import unittest
+
+import requests
+from dateutil.parser import parse
+
+from .fixtures import APITestCase
+
+
+class ContainerTestCase(APITestCase):
+ def test_list(self):
+ r = requests.get(self.uri("/containers/json"), timeout=5)
+ self.assertEqual(r.status_code, 200, r.text)
+ obj = r.json()
+ self.assertEqual(len(obj), 0)
+
+ def test_list_all(self):
+ r = requests.get(self.uri("/containers/json?all=true"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertId(r.content)
+
+ def test_inspect(self):
+ r = requests.get(self.uri(self.resolve_container("/containers/{}/json")))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertId(r.content)
+ _ = parse(r.json()["Created"])
+
+ def test_stats(self):
+ r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false")))
+ self.assertIn(r.status_code, (200, 409), r.text)
+ if r.status_code == 200:
+ self.assertId(r.content)
+
+ def test_delete(self):
+ r = requests.delete(self.uri(self.resolve_container("/containers/{}")))
+ self.assertEqual(r.status_code, 204, r.text)
+
+ def test_stop(self):
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/start")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/stop")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ def test_start(self):
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/stop")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/start")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ def test_restart(self):
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/start")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/restart")), timeout=5)
+ self.assertEqual(r.status_code, 204, r.text)
+
+ def test_resize(self):
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/resize?h=43&w=80")))
+ self.assertIn(r.status_code, (200, 409), r.text)
+ if r.status_code == 200:
+ self.assertEqual(r.text, "", r.text)
+
+ def test_attach(self):
+ self.skipTest("FIXME: Test timeouts")
+ r = requests.post(self.uri(self.resolve_container("/containers/{}/attach")), timeout=5)
+ self.assertIn(r.status_code, (101, 500), r.text)
+
+ def test_logs(self):
+ r = requests.get(self.uri(self.resolve_container("/containers/{}/logs?stdout=true")))
+ self.assertEqual(r.status_code, 200, r.text)
+
+ def test_commit(self):
+ r = requests.post(self.uri(self.resolve_container("/commit?container={}")))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertId(r.content)
+
+ obj = r.json()
+ self.assertIsInstance(obj, dict)
+
+ def test_prune(self):
+ name = f"Container_{random.getrandbits(160):x}"
+
+ r = requests.post(
+ self.podman_url + f"/v1.40/containers/create?name={name}",
+ json={
+ "Cmd": ["cp", "/etc/motd", "/motd.size_test"],
+ "Image": "alpine:latest",
+ "NetworkDisabled": True,
+ },
+ )
+ self.assertEqual(r.status_code, 201, r.text)
+ create = r.json()
+
+ r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/start")
+ self.assertEqual(r.status_code, 204, r.text)
+
+ r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/wait")
+ self.assertEqual(r.status_code, 200, r.text)
+ wait = r.json()
+ self.assertEqual(wait["StatusCode"], 0, wait["Error"]["Message"])
+
+ prune = requests.post(self.podman_url + "/v1.40/containers/prune")
+ self.assertEqual(prune.status_code, 200, prune.status_code)
+ prune_payload = prune.json()
+ self.assertGreater(prune_payload["SpaceReclaimed"], 0)
+ self.assertIn(create["Id"], prune_payload["ContainersDeleted"])
+
+ # Delete any orphaned containers
+ r = requests.get(self.podman_url + "/v1.40/containers/json?all=true")
+ self.assertEqual(r.status_code, 200, r.text)
+ for self.resolve_container in r.json():
+ requests.delete(
+ self.podman_url + f"/v1.40/containers/{self.resolve_container['Id']}?force=true"
+ )
+
+ # Image prune here tied to containers freeing up
+ prune = requests.post(self.podman_url + "/v1.40/images/prune")
+ self.assertEqual(prune.status_code, 200, prune.text)
+ prune_payload = prune.json()
+ self.assertGreater(prune_payload["SpaceReclaimed"], 0)
+
+ # FIXME need method to determine which image is going to be "pruned" to fix test
+ # TODO should handler be recursive when deleting images?
+ # self.assertIn(img["Id"], prune_payload["ImagesDeleted"][1]["Deleted"])
+
+ # FIXME (@vrothberg): I commented this line out during the `libimage` migration.
+ # It doesn't make sense to report anything to be deleted if the reclaimed space
+ # is zero. I think the test needs some rewrite.
+ # self.assertIsNotNone(prune_payload["ImagesDeleted"][1]["Deleted"])
+
+ def test_status(self):
+ r = requests.post(
+ self.podman_url + "/v1.40/containers/create?name=topcontainer",
+ json={"Cmd": ["top"], "Image": "alpine:latest"},
+ )
+ self.assertEqual(r.status_code, 201, r.text)
+ payload = r.json()
+ container_id = payload["Id"]
+ self.assertIsNotNone(container_id)
+
+ r = requests.get(
+ self.podman_url + "/v1.40/containers/json",
+ params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
+ )
+ self.assertEqual(r.status_code, 200, r.text)
+ payload = r.json()
+ self.assertEqual(payload[0]["Status"], "Created")
+
+ r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start")
+ self.assertEqual(r.status_code, 204, r.text)
+
+ r = requests.get(
+ self.podman_url + "/v1.40/containers/json",
+ params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
+ )
+ self.assertEqual(r.status_code, 200, r.text)
+ payload = r.json()
+ self.assertTrue(str(payload[0]["Status"]).startswith("Up"))
+
+ r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/pause")
+ self.assertEqual(r.status_code, 204, r.text)
+
+ r = requests.get(
+ self.podman_url + "/v1.40/containers/json",
+ params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
+ )
+ self.assertEqual(r.status_code, 200, r.text)
+ payload = r.json()
+ self.assertTrue(str(payload[0]["Status"]).startswith("Up"))
+ self.assertTrue(str(payload[0]["Status"]).endswith("(Paused)"))
+
+ r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/unpause")
+ self.assertEqual(r.status_code, 204, r.text)
+ r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/stop")
+ self.assertEqual(r.status_code, 204, r.text)
+
+ r = requests.get(
+ self.podman_url + "/v1.40/containers/json",
+ params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'},
+ )
+ self.assertEqual(r.status_code, 200, r.text)
+ payload = r.json()
+ self.assertTrue(str(payload[0]["Status"]).startswith("Exited"))
+
+ r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}")
+ self.assertEqual(r.status_code, 204, r.text)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_image.py b/test/apiv2/python/rest_api/test_v2_0_0_image.py
new file mode 100644
index 000000000..99f513608
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_image.py
@@ -0,0 +1,165 @@
+import json
+import unittest
+from multiprocessing import Process
+
+import requests
+from dateutil.parser import parse
+from .fixtures import APITestCase
+
+
+class ImageTestCase(APITestCase):
+ def test_list(self):
+ r = requests.get(self.podman_url + "/v1.40/images/json")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ # See https://docs.docker.com/engine/api/v1.40/#operation/ImageList
+ required_keys = (
+ "Id",
+ "ParentId",
+ "RepoTags",
+ "RepoDigests",
+ "Created",
+ "Size",
+ "SharedSize",
+ "VirtualSize",
+ "Labels",
+ "Containers",
+ )
+ images = r.json()
+ self.assertIsInstance(images, list)
+ for item in images:
+ self.assertIsInstance(item, dict)
+ for k in required_keys:
+ self.assertIn(k, item)
+
+ def test_inspect(self):
+ r = requests.get(self.podman_url + "/v1.40/images/alpine/json")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ # See https://docs.docker.com/engine/api/v1.40/#operation/ImageInspect
+ required_keys = (
+ "Id",
+ "Parent",
+ "Comment",
+ "Created",
+ "Container",
+ "DockerVersion",
+ "Author",
+ "Architecture",
+ "Os",
+ "Size",
+ "VirtualSize",
+ "GraphDriver",
+ "RootFS",
+ "Metadata",
+ )
+
+ image = r.json()
+ self.assertIsInstance(image, dict)
+ for item in required_keys:
+ self.assertIn(item, image)
+ _ = parse(image["Created"])
+
+ def test_delete(self):
+ r = requests.delete(self.podman_url + "/v1.40/images/alpine?force=true")
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertIsInstance(r.json(), list)
+
+ def test_pull(self):
+ r = requests.post(self.uri("/images/pull?reference=alpine"), timeout=15)
+ self.assertEqual(r.status_code, 200, r.status_code)
+ text = r.text
+ keys = {
+ "error": False,
+ "id": False,
+ "images": False,
+ "stream": False,
+ }
+ # Read and record stanza's from pull
+ for line in str.splitlines(text):
+ obj = json.loads(line)
+ key_list = list(obj.keys())
+ for k in key_list:
+ keys[k] = True
+
+ self.assertFalse(keys["error"], "Expected no errors")
+ self.assertTrue(keys["id"], "Expected to find id stanza")
+ self.assertTrue(keys["images"], "Expected to find images stanza")
+ self.assertTrue(keys["stream"], "Expected to find stream progress stanza's")
+
+ def test_search_compat(self):
+ url = self.podman_url + "/v1.40/images/search"
+
+ # Had issues with this test hanging when repositories not happy
+ def do_search1():
+ payload = {"term": "alpine"}
+ r = requests.get(url, params=payload, timeout=5)
+ self.assertEqual(r.status_code, 200, f"#1: {r.text}")
+ self.assertIsInstance(r.json(), list)
+
+ def do_search2():
+ payload = {"term": "alpine", "limit": 1}
+ r = requests.get(url, params=payload, timeout=5)
+ self.assertEqual(r.status_code, 200, f"#2: {r.text}")
+
+ results = r.json()
+ self.assertIsInstance(results, list)
+ self.assertEqual(len(results), 1)
+
+ def do_search3():
+ # FIXME: Research if quay.io supports is-official and which image is "official"
+ return
+ payload = {"term": "thanos", "filters": '{"is-official":["true"]}'}
+ r = requests.get(url, params=payload, timeout=5)
+ self.assertEqual(r.status_code, 200, f"#3: {r.text}")
+
+ results = r.json()
+ self.assertIsInstance(results, list)
+
+ # There should be only one official image
+ self.assertEqual(len(results), 1)
+
+ def do_search4():
+ headers = {"X-Registry-Auth": "null"}
+ payload = {"term": "alpine"}
+ r = requests.get(url, params=payload, headers=headers, timeout=5)
+ self.assertEqual(r.status_code, 200, f"#4: {r.text}")
+
+ def do_search5():
+ headers = {"X-Registry-Auth": "invalid value"}
+ payload = {"term": "alpine"}
+ r = requests.get(url, params=payload, headers=headers, timeout=5)
+ self.assertEqual(r.status_code, 400, f"#5: {r.text}")
+
+ i = 1
+ for fn in [do_search1, do_search2, do_search3, do_search4, do_search5]:
+ with self.subTest(i=i):
+ search = Process(target=fn)
+ search.start()
+ search.join(timeout=10)
+ self.assertFalse(search.is_alive(), f"#{i} /images/search took too long")
+
+ # search_methods = [do_search1, do_search2, do_search3, do_search4, do_search5]
+ # for search_method in search_methods:
+ # search = Process(target=search_method)
+ # search.start()
+ # search.join(timeout=10)
+ # self.assertFalse(search.is_alive(), "/images/search took too long")
+
+ def test_history(self):
+ r = requests.get(self.podman_url + "/v1.40/images/alpine/history")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ # See https://docs.docker.com/engine/api/v1.40/#operation/ImageHistory
+ required_keys = ("Id", "Created", "CreatedBy", "Tags", "Size", "Comment")
+
+ changes = r.json()
+ self.assertIsInstance(changes, list)
+ for change in changes:
+ self.assertIsInstance(change, dict)
+ for k in required_keys:
+ self.assertIn(k, change)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_manifest.py b/test/apiv2/python/rest_api/test_v2_0_0_manifest.py
new file mode 100644
index 000000000..c28c63bcb
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_manifest.py
@@ -0,0 +1,14 @@
+import unittest
+
+import requests
+from .fixtures import APITestCase
+
+
+class ManifestTestCase(APITestCase):
+ def test_manifest_409(self):
+ r = requests.post(self.uri("/manifests/create"), params={"name": "ThisIsAnInvalidImage"})
+ self.assertEqual(r.status_code, 400, r.text)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_network.py b/test/apiv2/python/rest_api/test_v2_0_0_network.py
new file mode 100644
index 000000000..3888123fb
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_network.py
@@ -0,0 +1,155 @@
+import random
+import unittest
+
+import requests
+
+from .fixtures import APITestCase
+
+
+class NetworkTestCase(APITestCase):
+ # TODO Need to support Docker-py order of network/container creates
+ def test_connect(self):
+ """Create network and container then connect to network"""
+ net_default = requests.post(
+ self.podman_url + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"}
+ )
+ self.assertEqual(net_default.status_code, 201, net_default.text)
+
+ create = requests.post(
+ self.podman_url + "/v1.40/containers/create?name=postCreateConnect",
+ json={
+ "Cmd": ["top"],
+ "Image": "alpine:latest",
+ "NetworkDisabled": False,
+ # FIXME adding these 2 lines cause: (This is sampled from docker-py)
+ # "network already exists","message":"container
+ # 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI
+ # network \"TestDefaultNetwork\": network already exists"
+ # "HostConfig": {"NetworkMode": "TestDefaultNetwork"},
+ # "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}},
+ # FIXME These two lines cause:
+ # CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container
+ # 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not
+ # found"
+ # "HostConfig": {"NetworkMode": "TestNetwork"},
+ # "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}},
+ # FIXME no networking defined cause: (note this error is from the container inspect below)
+ # "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [
+ # TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error"
+ },
+ )
+ self.assertEqual(create.status_code, 201, create.text)
+ self.assertId(create.content)
+
+ payload = create.json()
+ start = requests.post(self.podman_url + f"/v1.40/containers/{payload['Id']}/start")
+ self.assertEqual(start.status_code, 204, start.text)
+
+ connect = requests.post(
+ self.podman_url + "/v1.40/networks/TestDefaultNetwork/connect",
+ json={"Container": payload["Id"]},
+ )
+ self.assertEqual(connect.status_code, 200, connect.text)
+ self.assertEqual(connect.text, "OK\n")
+
+ inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json")
+ self.assertEqual(inspect.status_code, 200, inspect.text)
+
+ payload = inspect.json()
+ self.assertFalse(payload["Config"].get("NetworkDisabled", False))
+
+ self.assertEqual(
+ "TestDefaultNetwork",
+ payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"],
+ )
+ # TODO restore this to test, when joining multiple networks possible
+ # self.assertEqual(
+ # "TestNetwork",
+ # payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"],
+ # )
+ # TODO Need to support network aliases
+ # self.assertIn(
+ # "test_post_create",
+ # payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"],
+ # )
+
+ def test_create(self):
+ """Create network and connect container during create"""
+ net = requests.post(
+ self.podman_url + "/v1.40/networks/create", json={"Name": "TestNetwork"}
+ )
+ self.assertEqual(net.status_code, 201, net.text)
+
+ create = requests.post(
+ self.podman_url + "/v1.40/containers/create?name=postCreate",
+ json={
+ "Cmd": ["date"],
+ "Image": "alpine:latest",
+ "NetworkDisabled": False,
+ "HostConfig": {"NetworkMode": "TestNetwork"},
+ },
+ )
+ self.assertEqual(create.status_code, 201, create.text)
+ self.assertId(create.content)
+
+ payload = create.json()
+ inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json")
+ self.assertEqual(inspect.status_code, 200, inspect.text)
+
+ payload = inspect.json()
+ self.assertFalse(payload["Config"].get("NetworkDisabled", False))
+ self.assertEqual(
+ "TestNetwork",
+ payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"],
+ )
+
+ def test_crud(self):
+ name = f"Network_{random.getrandbits(160):x}"
+
+ # Cannot test for 0 existing networks because default "podman" network always exists
+
+ create = requests.post(self.podman_url + "/v1.40/networks/create", json={"Name": name})
+ self.assertEqual(create.status_code, 201, create.text)
+ self.assertId(create.content)
+
+ net = create.json()
+ self.assertIsInstance(net, dict)
+ self.assertNotEqual(net["Id"], name)
+ ident = net["Id"]
+
+ ls = requests.get(self.podman_url + "/v1.40/networks")
+ self.assertEqual(ls.status_code, 200, ls.text)
+
+ networks = ls.json()
+ self.assertIsInstance(networks, list)
+
+ found = False
+ for net in networks:
+ if net["Name"] == name:
+ found = True
+ break
+ self.assertTrue(found, f"Network '{name}' not found")
+
+ inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}")
+ self.assertEqual(inspect.status_code, 200, inspect.text)
+ self.assertIsInstance(inspect.json(), dict)
+
+ inspect = requests.delete(self.podman_url + f"/v1.40/networks/{ident}")
+ self.assertEqual(inspect.status_code, 204, inspect.text)
+ inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}")
+ self.assertEqual(inspect.status_code, 404, inspect.text)
+
+ # network prune
+ prune_name = f"Network_{random.getrandbits(160):x}"
+ prune_create = requests.post(
+ self.podman_url + "/v1.40/networks/create", json={"Name": prune_name}
+ )
+ self.assertEqual(create.status_code, 201, prune_create.text)
+
+ prune = requests.post(self.podman_url + "/v1.40/networks/prune")
+ self.assertEqual(prune.status_code, 200, prune.text)
+ self.assertTrue(prune_name in prune.json()["NetworksDeleted"])
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_pod.py b/test/apiv2/python/rest_api/test_v2_0_0_pod.py
new file mode 100644
index 000000000..9155ad19c
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_pod.py
@@ -0,0 +1,65 @@
+import random
+import unittest
+
+import requests
+from .fixtures import APITestCase
+
+
+class TestApi(APITestCase):
+ def test_pod_start_conflict(self):
+ """Verify issue #8865"""
+
+ pod_name = list()
+ pod_name.append(f"Pod_{random.getrandbits(160):x}")
+ pod_name.append(f"Pod_{random.getrandbits(160):x}")
+
+ r = requests.post(
+ self.uri("/pods/create"),
+ json={
+ "name": pod_name[0],
+ "no_infra": False,
+ "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}],
+ },
+ )
+ self.assertEqual(r.status_code, 201, r.text)
+ r = requests.post(
+ self.uri("/containers/create"),
+ json={
+ "pod": pod_name[0],
+ "image": "quay.io/libpod/alpine:latest",
+ "command": ["top"],
+ },
+ )
+ self.assertEqual(r.status_code, 201, r.text)
+
+ r = requests.post(
+ self.uri("/pods/create"),
+ json={
+ "name": pod_name[1],
+ "no_infra": False,
+ "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}],
+ },
+ )
+ self.assertEqual(r.status_code, 201, r.text)
+ r = requests.post(
+ self.uri("/containers/create"),
+ json={
+ "pod": pod_name[1],
+ "image": "quay.io/libpod/alpine:latest",
+ "command": ["top"],
+ },
+ )
+ self.assertEqual(r.status_code, 201, r.text)
+
+ r = requests.post(self.uri(f"/pods/{pod_name[0]}/start"))
+ self.assertEqual(r.status_code, 200, r.text)
+
+ r = requests.post(self.uri(f"/pods/{pod_name[1]}/start"))
+ self.assertEqual(r.status_code, 409, r.text)
+
+ start = r.json()
+ self.assertGreater(len(start["Errs"]), 0, r.text)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_system.py b/test/apiv2/python/rest_api/test_v2_0_0_system.py
new file mode 100644
index 000000000..3628b5af1
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_system.py
@@ -0,0 +1,88 @@
+import json
+import unittest
+
+import requests
+from .fixtures import APITestCase
+
+
+class SystemTestCase(APITestCase):
+ def test_info(self):
+ r = requests.get(self.uri("/info"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertIsNotNone(r.content)
+ _ = r.json()
+
+ r = requests.get(self.podman_url + "/v1.40/info")
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertIsNotNone(r.content)
+ _ = r.json()
+
+ def test_events(self):
+ r = requests.get(self.uri("/events?stream=false"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertIsNotNone(r.content)
+
+ report = r.text.splitlines()
+ self.assertGreater(len(report), 0, "No events found!")
+ for line in report:
+ obj = json.loads(line)
+ # Actor.ID is uppercase for compatibility
+ self.assertIn("ID", obj["Actor"])
+
+ def test_ping(self):
+ required_headers = (
+ "API-Version",
+ "Builder-Version",
+ "Docker-Experimental",
+ "Cache-Control",
+ "Pragma",
+ "Pragma",
+ )
+
+ def check_headers(req):
+ for k in required_headers:
+ self.assertIn(k, req.headers)
+
+ r = requests.get(self.podman_url + "/_ping")
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertEqual(r.text, "OK")
+ check_headers(r)
+
+ r = requests.head(self.podman_url + "/_ping")
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertEqual(r.text, "")
+ check_headers(r)
+
+ r = requests.get(self.uri("/_ping"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertEqual(r.text, "OK")
+ check_headers(r)
+
+ r = requests.head(self.uri("/_ping"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertEqual(r.text, "")
+ check_headers(r)
+
+ def test_version(self):
+ r = requests.get(self.podman_url + "/v1.40/version")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ r = requests.get(self.uri("/version"))
+ self.assertEqual(r.status_code, 200, r.text)
+
+ def test_df(self):
+ r = requests.get(self.podman_url + "/v1.40/system/df")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ obj = r.json()
+ self.assertIn("Images", obj)
+ self.assertIn("Containers", obj)
+ self.assertIn("Volumes", obj)
+ self.assertIn("BuildCache", obj)
+
+ r = requests.get(self.uri("/system/df"))
+ self.assertEqual(r.status_code, 200, r.text)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/test_v2_0_0_volume.py b/test/apiv2/python/rest_api/test_v2_0_0_volume.py
new file mode 100644
index 000000000..f5231e17c
--- /dev/null
+++ b/test/apiv2/python/rest_api/test_v2_0_0_volume.py
@@ -0,0 +1,75 @@
+import os
+import random
+import unittest
+
+import requests
+from .fixtures import APITestCase
+
+
+class VolumeTestCase(APITestCase):
+ def test_volume(self):
+ name = f"Volume_{random.getrandbits(160):x}"
+
+ ls = requests.get(self.podman_url + "/v1.40/volumes")
+ self.assertEqual(ls.status_code, 200, ls.text)
+
+ # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeList
+ required_keys = (
+ "Volumes",
+ "Warnings",
+ )
+
+ volumes = ls.json()
+ self.assertIsInstance(volumes, dict)
+ for key in required_keys:
+ self.assertIn(key, volumes)
+
+ create = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name})
+ self.assertEqual(create.status_code, 201, create.text)
+
+ # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeCreate
+ # and https://docs.docker.com/engine/api/v1.40/#operation/VolumeInspect
+ required_keys = (
+ "Name",
+ "Driver",
+ "Mountpoint",
+ "Labels",
+ "Scope",
+ "Options",
+ )
+
+ volume = create.json()
+ self.assertIsInstance(volume, dict)
+ for k in required_keys:
+ self.assertIn(k, volume)
+ self.assertEqual(volume["Name"], name)
+
+ inspect = requests.get(self.podman_url + f"/v1.40/volumes/{name}")
+ self.assertEqual(inspect.status_code, 200, inspect.text)
+
+ volume = inspect.json()
+ self.assertIsInstance(volume, dict)
+ for k in required_keys:
+ self.assertIn(k, volume)
+
+ rm = requests.delete(self.podman_url + f"/v1.40/volumes/{name}")
+ self.assertEqual(rm.status_code, 204, rm.text)
+
+ # recreate volume with data and then prune it
+ r = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name})
+ self.assertEqual(create.status_code, 201, create.text)
+
+ create = r.json()
+ with open(os.path.join(create["Mountpoint"], "test_prune"), "w") as file:
+ file.writelines(["This is a test\n", "This is a good test\n"])
+
+ prune = requests.post(self.podman_url + "/v1.40/volumes/prune")
+ self.assertEqual(prune.status_code, 200, prune.text)
+
+ payload = prune.json()
+ self.assertIn(name, payload["VolumesDeleted"])
+ self.assertGreater(payload["SpaceReclaimed"], 0)
+
+
+if __name__ == "__main__":
+ unittest.main()
diff --git a/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py b/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py
new file mode 100644
index 000000000..905c29683
--- /dev/null
+++ b/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py
@@ -0,0 +1,238 @@
+import json
+import os
+import shlex
+import signal
+import string
+import subprocess
+import sys
+import time
+import unittest
+from collections.abc import Iterable
+from multiprocessing import Process
+
+import requests
+from dateutil.parser import parse
+
+PODMAN_URL = "http://localhost:8080"
+
+
+def _url(path):
+ return PODMAN_URL + "/v1.0.0/libpod" + path
+
+
+def podman():
+ binary = os.getenv("PODMAN_BINARY")
+ if binary is None:
+ binary = "bin/podman"
+ return binary
+
+
+def ctnr(path):
+ r = requests.get(_url("/containers/json?all=true"))
+ try:
+ ctnrs = json.loads(r.text)
+ except Exception as e:
+ sys.stderr.write("Bad container response: {}/{}".format(r.text, e))
+ raise e
+ return path.format(ctnrs[0]["Id"])
+
+
+class TestApi(unittest.TestCase):
+ podman = None
+
+ def setUp(self):
+ super().setUp()
+ if TestApi.podman.poll() is not None:
+ sys.stderr.write("podman service returned {}", TestApi.podman.returncode)
+ sys.exit(2)
+ requests.get(_url("/images/create?fromSrc=quay.io%2Flibpod%2Falpine%3Alatest"))
+ # calling out to podman is easier than the API for running a container
+ subprocess.run(
+ [podman(), "run", "alpine", "/bin/ls"],
+ check=True,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+
+ TestApi.podman = subprocess.Popen(
+ [
+ podman(),
+ "system",
+ "service",
+ "tcp:localhost:8080",
+ "--log-level=debug",
+ "--time=0",
+ ],
+ shell=False,
+ stdin=subprocess.DEVNULL,
+ stdout=subprocess.DEVNULL,
+ stderr=subprocess.DEVNULL,
+ )
+ time.sleep(2)
+
+ @classmethod
+ def tearDownClass(cls):
+ TestApi.podman.terminate()
+ stdout, stderr = TestApi.podman.communicate(timeout=0.5)
+ if stdout:
+ print("\nService Stdout:\n" + stdout.decode("utf-8"))
+ if stderr:
+ print("\nService Stderr:\n" + stderr.decode("utf-8"))
+
+ if TestApi.podman.returncode > 0:
+ sys.stderr.write("podman exited with error code {}\n".format(TestApi.podman.returncode))
+ sys.exit(2)
+
+ return super().tearDownClass()
+
+ def test_info(self):
+ r = requests.get(_url("/info"))
+ self.assertEqual(r.status_code, 200)
+ self.assertIsNotNone(r.content)
+ _ = json.loads(r.text)
+
+ def test_events(self):
+ r = requests.get(_url("/events?stream=false"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.assertIsNotNone(r.content)
+ for line in r.text.splitlines():
+ obj = json.loads(line)
+ # Actor.ID is uppercase for compatibility
+ _ = obj["Actor"]["ID"]
+
+ def test_containers(self):
+ r = requests.get(_url("/containers/json"), timeout=5)
+ self.assertEqual(r.status_code, 200, r.text)
+ obj = json.loads(r.text)
+ self.assertEqual(len(obj), 0)
+
+ def test_containers_all(self):
+ r = requests.get(_url("/containers/json?all=true"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.validateObjectFields(r.text)
+
+ def test_inspect_container(self):
+ r = requests.get(_url(ctnr("/containers/{}/json")))
+ self.assertEqual(r.status_code, 200, r.text)
+ obj = self.validateObjectFields(r.content)
+ _ = parse(obj["Created"])
+
+ def test_stats(self):
+ r = requests.get(_url(ctnr("/containers/{}/stats?stream=false")))
+ self.assertIn(r.status_code, (200, 409), r.text)
+ if r.status_code == 200:
+ self.validateObjectFields(r.text)
+
+ def test_delete_containers(self):
+ r = requests.delete(_url(ctnr("/containers/{}")))
+ self.assertEqual(r.status_code, 204, r.text)
+
+ def test_stop_containers(self):
+ r = requests.post(_url(ctnr("/containers/{}/start")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ r = requests.post(_url(ctnr("/containers/{}/stop")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ def test_start_containers(self):
+ r = requests.post(_url(ctnr("/containers/{}/stop")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ r = requests.post(_url(ctnr("/containers/{}/start")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ def test_restart_containers(self):
+ r = requests.post(_url(ctnr("/containers/{}/start")))
+ self.assertIn(r.status_code, (204, 304), r.text)
+
+ r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5)
+ self.assertEqual(r.status_code, 204, r.text)
+
+ def test_resize(self):
+ r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80")))
+ self.assertIn(r.status_code, (200, 409), r.text)
+ if r.status_code == 200:
+ self.assertIsNone(r.text)
+
+ def test_attach_containers(self):
+ r = requests.post(_url(ctnr("/containers/{}/attach")))
+ self.assertIn(r.status_code, (101, 409), r.text)
+
+ def test_logs_containers(self):
+ r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true")))
+ self.assertEqual(r.status_code, 200, r.text)
+
+ def test_post_create(self):
+ self.skipTest("TODO: create request body")
+ r = requests.post(_url("/containers/create?args=True"))
+ self.assertEqual(r.status_code, 200, r.text)
+ json.loads(r.text)
+
+ def test_commit(self):
+ r = requests.post(_url(ctnr("/commit?container={}")))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.validateObjectFields(r.text)
+
+ def test_images(self):
+ r = requests.get(_url("/images/json"))
+ self.assertEqual(r.status_code, 200, r.text)
+ self.validateObjectFields(r.content)
+
+ def test_inspect_image(self):
+ r = requests.get(_url("/images/alpine/json"))
+ self.assertEqual(r.status_code, 200, r.text)
+ obj = self.validateObjectFields(r.content)
+ _ = parse(obj["Created"])
+
+ def test_delete_image(self):
+ r = requests.delete(_url("/images/alpine?force=true"))
+ self.assertEqual(r.status_code, 200, r.text)
+ json.loads(r.text)
+
+ def test_pull(self):
+ r = requests.post(_url("/images/pull?reference=alpine"), timeout=5)
+ self.assertEqual(r.status_code, 200, r.text)
+ json.loads(r.text)
+
+ def test_search(self):
+ # Had issues with this test hanging when repositories not happy
+ def do_search():
+ r = requests.get(_url("/images/search?term=alpine"), timeout=5)
+ self.assertEqual(r.status_code, 200, r.text)
+ json.loads(r.text)
+
+ search = Process(target=do_search)
+ search.start()
+ search.join(timeout=10)
+ self.assertFalse(search.is_alive(), "/images/search took too long")
+
+ def test_ping(self):
+ r = requests.get(PODMAN_URL + "/_ping")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ r = requests.head(PODMAN_URL + "/_ping")
+ self.assertEqual(r.status_code, 200, r.text)
+
+ r = requests.get(_url("/_ping"))
+ self.assertEqual(r.status_code, 200, r.text)
+
+ r = requests.get(_url("/_ping"))
+ self.assertEqual(r.status_code, 200, r.text)
+
+
+def validateObjectFields(self, buffer):
+ objs = json.loads(buffer)
+ if not isinstance(objs, dict):
+ for o in objs:
+ _ = o["Id"]
+ else:
+ _ = objs["Id"]
+ return objs
+
+
+if __name__ == "__main__":
+ unittest.main()