diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/apiv2/python/__init__.py | 0 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/__init__.py | 0 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/fixtures/__init__.py | 3 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/fixtures/api_testcase.py | 103 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/fixtures/podman.py (renamed from test/apiv2/rest_api/__init__.py) | 6 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_container.py | 192 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_image.py | 165 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_manifest.py | 14 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_network.py | 155 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_pod.py | 65 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_system.py | 88 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/test_v2_0_0_volume.py | 75 | ||||
-rw-r--r-- | test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py (renamed from test/apiv2/rest_api/v1_test_rest_v1_0_0.py) | 2 | ||||
-rw-r--r-- | test/apiv2/rest_api/test_rest_v2_0_0.py | 744 |
14 files changed, 864 insertions, 748 deletions
diff --git a/test/apiv2/python/__init__.py b/test/apiv2/python/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/test/apiv2/python/__init__.py diff --git a/test/apiv2/python/rest_api/__init__.py b/test/apiv2/python/rest_api/__init__.py new file mode 100644 index 000000000..e69de29bb --- /dev/null +++ b/test/apiv2/python/rest_api/__init__.py diff --git a/test/apiv2/python/rest_api/fixtures/__init__.py b/test/apiv2/python/rest_api/fixtures/__init__.py new file mode 100644 index 000000000..5d763e454 --- /dev/null +++ b/test/apiv2/python/rest_api/fixtures/__init__.py @@ -0,0 +1,3 @@ +from .api_testcase import APITestCase + +__all__ = ["APITestCase"] diff --git a/test/apiv2/python/rest_api/fixtures/api_testcase.py b/test/apiv2/python/rest_api/fixtures/api_testcase.py new file mode 100644 index 000000000..8b771774b --- /dev/null +++ b/test/apiv2/python/rest_api/fixtures/api_testcase.py @@ -0,0 +1,103 @@ +import json +import subprocess +import unittest + +import requests +import sys +import time + +from .podman import Podman + + +class APITestCase(unittest.TestCase): + PODMAN_URL = "http://localhost:8080" + podman = None # initialized podman configuration for tests + service = None # podman service instance + + @classmethod + def setUpClass(cls): + super().setUpClass() + + APITestCase.podman = Podman() + APITestCase.service = APITestCase.podman.open( + "system", "service", "tcp:localhost:8080", "--time=0" + ) + # give the service some time to be ready... + time.sleep(2) + + returncode = APITestCase.service.poll() + if returncode is not None: + raise subprocess.CalledProcessError(returncode, "podman system service") + + r = requests.post( + APITestCase.uri("/images/pull?reference=quay.io%2Flibpod%2Falpine%3Alatest") + ) + if r.status_code != 200: + raise subprocess.CalledProcessError( + r.status_code, f"podman images pull quay.io/libpod/alpine:latest {r.text}" + ) + + @classmethod + def tearDownClass(cls): + APITestCase.service.terminate() + stdout, stderr = APITestCase.service.communicate(timeout=0.5) + if stdout: + sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8")) + if stderr: + sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8")) + return super().tearDownClass() + + def setUp(self): + super().setUp() + APITestCase.podman.run("run", "alpine", "/bin/ls", check=True) + + def tearDown(self) -> None: + APITestCase.podman.run("pod", "rm", "--all", "--force", check=True) + APITestCase.podman.run("rm", "--all", "--force", check=True) + super().tearDown() + + @property + def podman_url(self): + return "http://localhost:8080" + + @staticmethod + def uri(path): + return APITestCase.PODMAN_URL + "/v2.0.0/libpod" + path + + def resolve_container(self, path): + """Find 'first' container and return 'Id' formatted into given URI path.""" + + try: + r = requests.get(self.uri("/containers/json?all=true")) + containers = r.json() + except Exception as e: + msg = f"Bad container response: {e}" + if r is not None: + msg += ": " + r.text + raise self.failureException(msg) + return path.format(containers[0]["Id"]) + + def assertContainerExists(self, member, msg=None): # pylint: disable=invalid-name + r = requests.get(self.uri(f"/containers/{member}/exists")) + if r.status_code == 404: + if msg is None: + msg = f"Container '{member}' does not exist." + self.failureException(msg) + + def assertContainerNotExists(self, member, msg=None): # pylint: disable=invalid-name + r = requests.get(self.uri(f"/containers/{member}/exists")) + if r.status_code == 204: + if msg is None: + msg = f"Container '{member}' exists." + self.failureException(msg) + + def assertId(self, content): # pylint: disable=invalid-name + objects = json.loads(content) + try: + if isinstance(objects, dict): + _ = objects["Id"] + else: + for item in objects: + _ = item["Id"] + except KeyError: + self.failureException("Failed in find 'Id' in return value.") diff --git a/test/apiv2/rest_api/__init__.py b/test/apiv2/python/rest_api/fixtures/podman.py index 0ad6b51b3..bae04f87d 100644 --- a/test/apiv2/rest_api/__init__.py +++ b/test/apiv2/python/rest_api/fixtures/podman.py @@ -7,7 +7,7 @@ import sys import tempfile -class Podman(object): +class Podman: """ Instances hold the configuration and setup for running podman commands """ @@ -34,7 +34,7 @@ class Podman(object): p = configparser.ConfigParser() p.read_dict( { - "registries.search": {"registries": "['docker.io']"}, + "registries.search": {"registries": "['quay.io']"}, "registries.insecure": {"registries": "[]"}, "registries.block": {"registries": "[]"}, } @@ -102,7 +102,7 @@ class Podman(object): ) def run(self, command, *args, **kwargs): - """Podman initialized instance to run a given command + """Run given podman command :param self: Podman instance :param command: podman sub-command to run diff --git a/test/apiv2/python/rest_api/test_v2_0_0_container.py b/test/apiv2/python/rest_api/test_v2_0_0_container.py new file mode 100644 index 000000000..70c07d47f --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_container.py @@ -0,0 +1,192 @@ +import random +import unittest + +import requests +from dateutil.parser import parse + +from .fixtures import APITestCase + + +class ContainerTestCase(APITestCase): + def test_list(self): + r = requests.get(self.uri("/containers/json"), timeout=5) + self.assertEqual(r.status_code, 200, r.text) + obj = r.json() + self.assertEqual(len(obj), 0) + + def test_list_all(self): + r = requests.get(self.uri("/containers/json?all=true")) + self.assertEqual(r.status_code, 200, r.text) + self.assertId(r.content) + + def test_inspect(self): + r = requests.get(self.uri(self.resolve_container("/containers/{}/json"))) + self.assertEqual(r.status_code, 200, r.text) + self.assertId(r.content) + _ = parse(r.json()["Created"]) + + def test_stats(self): + r = requests.get(self.uri(self.resolve_container("/containers/{}/stats?stream=false"))) + self.assertIn(r.status_code, (200, 409), r.text) + if r.status_code == 200: + self.assertId(r.content) + + def test_delete(self): + r = requests.delete(self.uri(self.resolve_container("/containers/{}"))) + self.assertEqual(r.status_code, 204, r.text) + + def test_stop(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(self.uri(self.resolve_container("/containers/{}/stop"))) + self.assertIn(r.status_code, (204, 304), r.text) + + def test_start(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/stop"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(self.uri(self.resolve_container("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + def test_restart(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/start"))) + self.assertIn(r.status_code, (204, 304), r.text) + + r = requests.post(self.uri(self.resolve_container("/containers/{}/restart")), timeout=5) + self.assertEqual(r.status_code, 204, r.text) + + def test_resize(self): + r = requests.post(self.uri(self.resolve_container("/containers/{}/resize?h=43&w=80"))) + self.assertIn(r.status_code, (200, 409), r.text) + if r.status_code == 200: + self.assertEqual(r.text, "", r.text) + + def test_attach(self): + self.skipTest("FIXME: Test timeouts") + r = requests.post(self.uri(self.resolve_container("/containers/{}/attach")), timeout=5) + self.assertIn(r.status_code, (101, 500), r.text) + + def test_logs(self): + r = requests.get(self.uri(self.resolve_container("/containers/{}/logs?stdout=true"))) + self.assertEqual(r.status_code, 200, r.text) + + def test_commit(self): + r = requests.post(self.uri(self.resolve_container("/commit?container={}"))) + self.assertEqual(r.status_code, 200, r.text) + self.assertId(r.content) + + obj = r.json() + self.assertIsInstance(obj, dict) + + def test_prune(self): + name = f"Container_{random.getrandbits(160):x}" + + r = requests.post( + self.podman_url + f"/v1.40/containers/create?name={name}", + json={ + "Cmd": ["cp", "/etc/motd", "/motd.size_test"], + "Image": "alpine:latest", + "NetworkDisabled": True, + }, + ) + self.assertEqual(r.status_code, 201, r.text) + create = r.json() + + r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/start") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.post(self.podman_url + f"/v1.40/containers/{create['Id']}/wait") + self.assertEqual(r.status_code, 200, r.text) + wait = r.json() + self.assertEqual(wait["StatusCode"], 0, wait["Error"]["Message"]) + + prune = requests.post(self.podman_url + "/v1.40/containers/prune") + self.assertEqual(prune.status_code, 200, prune.status_code) + prune_payload = prune.json() + self.assertGreater(prune_payload["SpaceReclaimed"], 0) + self.assertIn(create["Id"], prune_payload["ContainersDeleted"]) + + # Delete any orphaned containers + r = requests.get(self.podman_url + "/v1.40/containers/json?all=true") + self.assertEqual(r.status_code, 200, r.text) + for self.resolve_container in r.json(): + requests.delete( + self.podman_url + f"/v1.40/containers/{self.resolve_container['Id']}?force=true" + ) + + # Image prune here tied to containers freeing up + prune = requests.post(self.podman_url + "/v1.40/images/prune") + self.assertEqual(prune.status_code, 200, prune.text) + prune_payload = prune.json() + self.assertGreater(prune_payload["SpaceReclaimed"], 0) + + # FIXME need method to determine which image is going to be "pruned" to fix test + # TODO should handler be recursive when deleting images? + # self.assertIn(img["Id"], prune_payload["ImagesDeleted"][1]["Deleted"]) + + # FIXME (@vrothberg): I commented this line out during the `libimage` migration. + # It doesn't make sense to report anything to be deleted if the reclaimed space + # is zero. I think the test needs some rewrite. + # self.assertIsNotNone(prune_payload["ImagesDeleted"][1]["Deleted"]) + + def test_status(self): + r = requests.post( + self.podman_url + "/v1.40/containers/create?name=topcontainer", + json={"Cmd": ["top"], "Image": "alpine:latest"}, + ) + self.assertEqual(r.status_code, 201, r.text) + payload = r.json() + container_id = payload["Id"] + self.assertIsNotNone(container_id) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertEqual(payload[0]["Status"], "Created") + + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/start") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertTrue(str(payload[0]["Status"]).startswith("Up")) + + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/pause") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertTrue(str(payload[0]["Status"]).startswith("Up")) + self.assertTrue(str(payload[0]["Status"]).endswith("(Paused)")) + + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/unpause") + self.assertEqual(r.status_code, 204, r.text) + r = requests.post(self.podman_url + f"/v1.40/containers/{container_id}/stop") + self.assertEqual(r.status_code, 204, r.text) + + r = requests.get( + self.podman_url + "/v1.40/containers/json", + params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, + ) + self.assertEqual(r.status_code, 200, r.text) + payload = r.json() + self.assertTrue(str(payload[0]["Status"]).startswith("Exited")) + + r = requests.delete(self.podman_url + f"/v1.40/containers/{container_id}") + self.assertEqual(r.status_code, 204, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_image.py b/test/apiv2/python/rest_api/test_v2_0_0_image.py new file mode 100644 index 000000000..99f513608 --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_image.py @@ -0,0 +1,165 @@ +import json +import unittest +from multiprocessing import Process + +import requests +from dateutil.parser import parse +from .fixtures import APITestCase + + +class ImageTestCase(APITestCase): + def test_list(self): + r = requests.get(self.podman_url + "/v1.40/images/json") + self.assertEqual(r.status_code, 200, r.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/ImageList + required_keys = ( + "Id", + "ParentId", + "RepoTags", + "RepoDigests", + "Created", + "Size", + "SharedSize", + "VirtualSize", + "Labels", + "Containers", + ) + images = r.json() + self.assertIsInstance(images, list) + for item in images: + self.assertIsInstance(item, dict) + for k in required_keys: + self.assertIn(k, item) + + def test_inspect(self): + r = requests.get(self.podman_url + "/v1.40/images/alpine/json") + self.assertEqual(r.status_code, 200, r.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/ImageInspect + required_keys = ( + "Id", + "Parent", + "Comment", + "Created", + "Container", + "DockerVersion", + "Author", + "Architecture", + "Os", + "Size", + "VirtualSize", + "GraphDriver", + "RootFS", + "Metadata", + ) + + image = r.json() + self.assertIsInstance(image, dict) + for item in required_keys: + self.assertIn(item, image) + _ = parse(image["Created"]) + + def test_delete(self): + r = requests.delete(self.podman_url + "/v1.40/images/alpine?force=true") + self.assertEqual(r.status_code, 200, r.text) + self.assertIsInstance(r.json(), list) + + def test_pull(self): + r = requests.post(self.uri("/images/pull?reference=alpine"), timeout=15) + self.assertEqual(r.status_code, 200, r.status_code) + text = r.text + keys = { + "error": False, + "id": False, + "images": False, + "stream": False, + } + # Read and record stanza's from pull + for line in str.splitlines(text): + obj = json.loads(line) + key_list = list(obj.keys()) + for k in key_list: + keys[k] = True + + self.assertFalse(keys["error"], "Expected no errors") + self.assertTrue(keys["id"], "Expected to find id stanza") + self.assertTrue(keys["images"], "Expected to find images stanza") + self.assertTrue(keys["stream"], "Expected to find stream progress stanza's") + + def test_search_compat(self): + url = self.podman_url + "/v1.40/images/search" + + # Had issues with this test hanging when repositories not happy + def do_search1(): + payload = {"term": "alpine"} + r = requests.get(url, params=payload, timeout=5) + self.assertEqual(r.status_code, 200, f"#1: {r.text}") + self.assertIsInstance(r.json(), list) + + def do_search2(): + payload = {"term": "alpine", "limit": 1} + r = requests.get(url, params=payload, timeout=5) + self.assertEqual(r.status_code, 200, f"#2: {r.text}") + + results = r.json() + self.assertIsInstance(results, list) + self.assertEqual(len(results), 1) + + def do_search3(): + # FIXME: Research if quay.io supports is-official and which image is "official" + return + payload = {"term": "thanos", "filters": '{"is-official":["true"]}'} + r = requests.get(url, params=payload, timeout=5) + self.assertEqual(r.status_code, 200, f"#3: {r.text}") + + results = r.json() + self.assertIsInstance(results, list) + + # There should be only one official image + self.assertEqual(len(results), 1) + + def do_search4(): + headers = {"X-Registry-Auth": "null"} + payload = {"term": "alpine"} + r = requests.get(url, params=payload, headers=headers, timeout=5) + self.assertEqual(r.status_code, 200, f"#4: {r.text}") + + def do_search5(): + headers = {"X-Registry-Auth": "invalid value"} + payload = {"term": "alpine"} + r = requests.get(url, params=payload, headers=headers, timeout=5) + self.assertEqual(r.status_code, 400, f"#5: {r.text}") + + i = 1 + for fn in [do_search1, do_search2, do_search3, do_search4, do_search5]: + with self.subTest(i=i): + search = Process(target=fn) + search.start() + search.join(timeout=10) + self.assertFalse(search.is_alive(), f"#{i} /images/search took too long") + + # search_methods = [do_search1, do_search2, do_search3, do_search4, do_search5] + # for search_method in search_methods: + # search = Process(target=search_method) + # search.start() + # search.join(timeout=10) + # self.assertFalse(search.is_alive(), "/images/search took too long") + + def test_history(self): + r = requests.get(self.podman_url + "/v1.40/images/alpine/history") + self.assertEqual(r.status_code, 200, r.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/ImageHistory + required_keys = ("Id", "Created", "CreatedBy", "Tags", "Size", "Comment") + + changes = r.json() + self.assertIsInstance(changes, list) + for change in changes: + self.assertIsInstance(change, dict) + for k in required_keys: + self.assertIn(k, change) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_manifest.py b/test/apiv2/python/rest_api/test_v2_0_0_manifest.py new file mode 100644 index 000000000..c28c63bcb --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_manifest.py @@ -0,0 +1,14 @@ +import unittest + +import requests +from .fixtures import APITestCase + + +class ManifestTestCase(APITestCase): + def test_manifest_409(self): + r = requests.post(self.uri("/manifests/create"), params={"name": "ThisIsAnInvalidImage"}) + self.assertEqual(r.status_code, 400, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_network.py b/test/apiv2/python/rest_api/test_v2_0_0_network.py new file mode 100644 index 000000000..3888123fb --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_network.py @@ -0,0 +1,155 @@ +import random +import unittest + +import requests + +from .fixtures import APITestCase + + +class NetworkTestCase(APITestCase): + # TODO Need to support Docker-py order of network/container creates + def test_connect(self): + """Create network and container then connect to network""" + net_default = requests.post( + self.podman_url + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"} + ) + self.assertEqual(net_default.status_code, 201, net_default.text) + + create = requests.post( + self.podman_url + "/v1.40/containers/create?name=postCreateConnect", + json={ + "Cmd": ["top"], + "Image": "alpine:latest", + "NetworkDisabled": False, + # FIXME adding these 2 lines cause: (This is sampled from docker-py) + # "network already exists","message":"container + # 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI + # network \"TestDefaultNetwork\": network already exists" + # "HostConfig": {"NetworkMode": "TestDefaultNetwork"}, + # "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}}, + # FIXME These two lines cause: + # CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container + # 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not + # found" + # "HostConfig": {"NetworkMode": "TestNetwork"}, + # "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}}, + # FIXME no networking defined cause: (note this error is from the container inspect below) + # "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [ + # TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error" + }, + ) + self.assertEqual(create.status_code, 201, create.text) + self.assertId(create.content) + + payload = create.json() + start = requests.post(self.podman_url + f"/v1.40/containers/{payload['Id']}/start") + self.assertEqual(start.status_code, 204, start.text) + + connect = requests.post( + self.podman_url + "/v1.40/networks/TestDefaultNetwork/connect", + json={"Container": payload["Id"]}, + ) + self.assertEqual(connect.status_code, 200, connect.text) + self.assertEqual(connect.text, "OK\n") + + inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json") + self.assertEqual(inspect.status_code, 200, inspect.text) + + payload = inspect.json() + self.assertFalse(payload["Config"].get("NetworkDisabled", False)) + + self.assertEqual( + "TestDefaultNetwork", + payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"], + ) + # TODO restore this to test, when joining multiple networks possible + # self.assertEqual( + # "TestNetwork", + # payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], + # ) + # TODO Need to support network aliases + # self.assertIn( + # "test_post_create", + # payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"], + # ) + + def test_create(self): + """Create network and connect container during create""" + net = requests.post( + self.podman_url + "/v1.40/networks/create", json={"Name": "TestNetwork"} + ) + self.assertEqual(net.status_code, 201, net.text) + + create = requests.post( + self.podman_url + "/v1.40/containers/create?name=postCreate", + json={ + "Cmd": ["date"], + "Image": "alpine:latest", + "NetworkDisabled": False, + "HostConfig": {"NetworkMode": "TestNetwork"}, + }, + ) + self.assertEqual(create.status_code, 201, create.text) + self.assertId(create.content) + + payload = create.json() + inspect = requests.get(f"{self.podman_url}/v1.40/containers/{payload['Id']}/json") + self.assertEqual(inspect.status_code, 200, inspect.text) + + payload = inspect.json() + self.assertFalse(payload["Config"].get("NetworkDisabled", False)) + self.assertEqual( + "TestNetwork", + payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], + ) + + def test_crud(self): + name = f"Network_{random.getrandbits(160):x}" + + # Cannot test for 0 existing networks because default "podman" network always exists + + create = requests.post(self.podman_url + "/v1.40/networks/create", json={"Name": name}) + self.assertEqual(create.status_code, 201, create.text) + self.assertId(create.content) + + net = create.json() + self.assertIsInstance(net, dict) + self.assertNotEqual(net["Id"], name) + ident = net["Id"] + + ls = requests.get(self.podman_url + "/v1.40/networks") + self.assertEqual(ls.status_code, 200, ls.text) + + networks = ls.json() + self.assertIsInstance(networks, list) + + found = False + for net in networks: + if net["Name"] == name: + found = True + break + self.assertTrue(found, f"Network '{name}' not found") + + inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}") + self.assertEqual(inspect.status_code, 200, inspect.text) + self.assertIsInstance(inspect.json(), dict) + + inspect = requests.delete(self.podman_url + f"/v1.40/networks/{ident}") + self.assertEqual(inspect.status_code, 204, inspect.text) + inspect = requests.get(self.podman_url + f"/v1.40/networks/{ident}") + self.assertEqual(inspect.status_code, 404, inspect.text) + + # network prune + prune_name = f"Network_{random.getrandbits(160):x}" + prune_create = requests.post( + self.podman_url + "/v1.40/networks/create", json={"Name": prune_name} + ) + self.assertEqual(create.status_code, 201, prune_create.text) + + prune = requests.post(self.podman_url + "/v1.40/networks/prune") + self.assertEqual(prune.status_code, 200, prune.text) + self.assertTrue(prune_name in prune.json()["NetworksDeleted"]) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_pod.py b/test/apiv2/python/rest_api/test_v2_0_0_pod.py new file mode 100644 index 000000000..9155ad19c --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_pod.py @@ -0,0 +1,65 @@ +import random +import unittest + +import requests +from .fixtures import APITestCase + + +class TestApi(APITestCase): + def test_pod_start_conflict(self): + """Verify issue #8865""" + + pod_name = list() + pod_name.append(f"Pod_{random.getrandbits(160):x}") + pod_name.append(f"Pod_{random.getrandbits(160):x}") + + r = requests.post( + self.uri("/pods/create"), + json={ + "name": pod_name[0], + "no_infra": False, + "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + r = requests.post( + self.uri("/containers/create"), + json={ + "pod": pod_name[0], + "image": "quay.io/libpod/alpine:latest", + "command": ["top"], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + + r = requests.post( + self.uri("/pods/create"), + json={ + "name": pod_name[1], + "no_infra": False, + "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + r = requests.post( + self.uri("/containers/create"), + json={ + "pod": pod_name[1], + "image": "quay.io/libpod/alpine:latest", + "command": ["top"], + }, + ) + self.assertEqual(r.status_code, 201, r.text) + + r = requests.post(self.uri(f"/pods/{pod_name[0]}/start")) + self.assertEqual(r.status_code, 200, r.text) + + r = requests.post(self.uri(f"/pods/{pod_name[1]}/start")) + self.assertEqual(r.status_code, 409, r.text) + + start = r.json() + self.assertGreater(len(start["Errs"]), 0, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_system.py b/test/apiv2/python/rest_api/test_v2_0_0_system.py new file mode 100644 index 000000000..3628b5af1 --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_system.py @@ -0,0 +1,88 @@ +import json +import unittest + +import requests +from .fixtures import APITestCase + + +class SystemTestCase(APITestCase): + def test_info(self): + r = requests.get(self.uri("/info")) + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + _ = r.json() + + r = requests.get(self.podman_url + "/v1.40/info") + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + _ = r.json() + + def test_events(self): + r = requests.get(self.uri("/events?stream=false")) + self.assertEqual(r.status_code, 200, r.text) + self.assertIsNotNone(r.content) + + report = r.text.splitlines() + self.assertGreater(len(report), 0, "No events found!") + for line in report: + obj = json.loads(line) + # Actor.ID is uppercase for compatibility + self.assertIn("ID", obj["Actor"]) + + def test_ping(self): + required_headers = ( + "API-Version", + "Builder-Version", + "Docker-Experimental", + "Cache-Control", + "Pragma", + "Pragma", + ) + + def check_headers(req): + for k in required_headers: + self.assertIn(k, req.headers) + + r = requests.get(self.podman_url + "/_ping") + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "OK") + check_headers(r) + + r = requests.head(self.podman_url + "/_ping") + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "") + check_headers(r) + + r = requests.get(self.uri("/_ping")) + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "OK") + check_headers(r) + + r = requests.head(self.uri("/_ping")) + self.assertEqual(r.status_code, 200, r.text) + self.assertEqual(r.text, "") + check_headers(r) + + def test_version(self): + r = requests.get(self.podman_url + "/v1.40/version") + self.assertEqual(r.status_code, 200, r.text) + + r = requests.get(self.uri("/version")) + self.assertEqual(r.status_code, 200, r.text) + + def test_df(self): + r = requests.get(self.podman_url + "/v1.40/system/df") + self.assertEqual(r.status_code, 200, r.text) + + obj = r.json() + self.assertIn("Images", obj) + self.assertIn("Containers", obj) + self.assertIn("Volumes", obj) + self.assertIn("BuildCache", obj) + + r = requests.get(self.uri("/system/df")) + self.assertEqual(r.status_code, 200, r.text) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/python/rest_api/test_v2_0_0_volume.py b/test/apiv2/python/rest_api/test_v2_0_0_volume.py new file mode 100644 index 000000000..f5231e17c --- /dev/null +++ b/test/apiv2/python/rest_api/test_v2_0_0_volume.py @@ -0,0 +1,75 @@ +import os +import random +import unittest + +import requests +from .fixtures import APITestCase + + +class VolumeTestCase(APITestCase): + def test_volume(self): + name = f"Volume_{random.getrandbits(160):x}" + + ls = requests.get(self.podman_url + "/v1.40/volumes") + self.assertEqual(ls.status_code, 200, ls.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeList + required_keys = ( + "Volumes", + "Warnings", + ) + + volumes = ls.json() + self.assertIsInstance(volumes, dict) + for key in required_keys: + self.assertIn(key, volumes) + + create = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name}) + self.assertEqual(create.status_code, 201, create.text) + + # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeCreate + # and https://docs.docker.com/engine/api/v1.40/#operation/VolumeInspect + required_keys = ( + "Name", + "Driver", + "Mountpoint", + "Labels", + "Scope", + "Options", + ) + + volume = create.json() + self.assertIsInstance(volume, dict) + for k in required_keys: + self.assertIn(k, volume) + self.assertEqual(volume["Name"], name) + + inspect = requests.get(self.podman_url + f"/v1.40/volumes/{name}") + self.assertEqual(inspect.status_code, 200, inspect.text) + + volume = inspect.json() + self.assertIsInstance(volume, dict) + for k in required_keys: + self.assertIn(k, volume) + + rm = requests.delete(self.podman_url + f"/v1.40/volumes/{name}") + self.assertEqual(rm.status_code, 204, rm.text) + + # recreate volume with data and then prune it + r = requests.post(self.podman_url + "/v1.40/volumes/create", json={"Name": name}) + self.assertEqual(create.status_code, 201, create.text) + + create = r.json() + with open(os.path.join(create["Mountpoint"], "test_prune"), "w") as file: + file.writelines(["This is a test\n", "This is a good test\n"]) + + prune = requests.post(self.podman_url + "/v1.40/volumes/prune") + self.assertEqual(prune.status_code, 200, prune.text) + + payload = prune.json() + self.assertIn(name, payload["VolumesDeleted"]) + self.assertGreater(payload["SpaceReclaimed"], 0) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/apiv2/rest_api/v1_test_rest_v1_0_0.py b/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py index 23528a246..905c29683 100644 --- a/test/apiv2/rest_api/v1_test_rest_v1_0_0.py +++ b/test/apiv2/python/rest_api/v1_test_rest_v1_0_0.py @@ -45,7 +45,7 @@ class TestApi(unittest.TestCase): if TestApi.podman.poll() is not None: sys.stderr.write("podman service returned {}", TestApi.podman.returncode) sys.exit(2) - requests.get(_url("/images/create?fromSrc=docker.io%2Falpine%3Alatest")) + requests.get(_url("/images/create?fromSrc=quay.io%2Flibpod%2Falpine%3Alatest")) # calling out to podman is easier than the API for running a container subprocess.run( [podman(), "run", "alpine", "/bin/ls"], diff --git a/test/apiv2/rest_api/test_rest_v2_0_0.py b/test/apiv2/rest_api/test_rest_v2_0_0.py deleted file mode 100644 index f66e2b120..000000000 --- a/test/apiv2/rest_api/test_rest_v2_0_0.py +++ /dev/null @@ -1,744 +0,0 @@ -import json -import os -import random -import string -import subprocess -import sys -import time -import unittest -from multiprocessing import Process - -import requests -from dateutil.parser import parse - -from test.apiv2.rest_api import Podman - -PODMAN_URL = "http://localhost:8080" - - -def _url(path): - return PODMAN_URL + "/v2.0.0/libpod" + path - - -def ctnr(path): - try: - r = requests.get(_url("/containers/json?all=true")) - ctnrs = json.loads(r.text) - except Exception as e: - msg = f"Bad container response: {e}" - if r is not None: - msg = msg + " " + r.text - sys.stderr.write(msg + "\n") - raise - return path.format(ctnrs[0]["Id"]) - - -def validateObjectFields(buffer): - objs = json.loads(buffer) - if not isinstance(objs, dict): - for o in objs: - _ = o["Id"] - else: - _ = objs["Id"] - return objs - - -class TestApi(unittest.TestCase): - podman = None # initialized podman configuration for tests - service = None # podman service instance - - def setUp(self): - super().setUp() - - TestApi.podman.run("run", "alpine", "/bin/ls", check=True) - - def tearDown(self) -> None: - super().tearDown() - - TestApi.podman.run("pod", "rm", "--all", "--force", check=True) - TestApi.podman.run("rm", "--all", "--force", check=True) - - @classmethod - def setUpClass(cls): - super().setUpClass() - - TestApi.podman = Podman() - TestApi.service = TestApi.podman.open("system", "service", "tcp:localhost:8080", "--time=0") - # give the service some time to be ready... - time.sleep(2) - - returncode = TestApi.service.poll() - if returncode is not None: - raise subprocess.CalledProcessError(returncode, "podman system service") - - r = requests.post(_url("/images/pull?reference=docker.io%2Falpine%3Alatest")) - if r.status_code != 200: - raise subprocess.CalledProcessError( - r.status_code, f"podman images pull docker.io/alpine:latest {r.text}" - ) - - @classmethod - def tearDownClass(cls): - TestApi.service.terminate() - stdout, stderr = TestApi.service.communicate(timeout=0.5) - if stdout: - sys.stdout.write("\nService Stdout:\n" + stdout.decode("utf-8")) - if stderr: - sys.stderr.write("\nService Stderr:\n" + stderr.decode("utf-8")) - return super().tearDownClass() - - def test_info(self): - r = requests.get(_url("/info")) - self.assertEqual(r.status_code, 200) - self.assertIsNotNone(r.content) - _ = json.loads(r.text) - - info = requests.get(PODMAN_URL + "/v1.40/info") - self.assertEqual(info.status_code, 200, info.content) - _ = json.loads(info.text) - - def test_events(self): - r = requests.get(_url("/events?stream=false")) - self.assertEqual(r.status_code, 200, r.text) - self.assertIsNotNone(r.content) - - report = r.text.splitlines() - self.assertGreater(len(report), 0, "No events found!") - for line in report: - obj = json.loads(line) - # Actor.ID is uppercase for compatibility - self.assertIn("ID", obj["Actor"]) - - def test_containers(self): - r = requests.get(_url("/containers/json"), timeout=5) - self.assertEqual(r.status_code, 200, r.text) - obj = json.loads(r.text) - self.assertEqual(len(obj), 0) - - def test_containers_all(self): - r = requests.get(_url("/containers/json?all=true")) - self.assertEqual(r.status_code, 200, r.text) - validateObjectFields(r.text) - - def test_inspect_container(self): - r = requests.get(_url(ctnr("/containers/{}/json"))) - self.assertEqual(r.status_code, 200, r.text) - obj = validateObjectFields(r.content) - _ = parse(obj["Created"]) - - def test_stats(self): - r = requests.get(_url(ctnr("/containers/{}/stats?stream=false"))) - self.assertIn(r.status_code, (200, 409), r.text) - if r.status_code == 200: - validateObjectFields(r.text) - - def test_delete_containers(self): - r = requests.delete(_url(ctnr("/containers/{}"))) - self.assertEqual(r.status_code, 204, r.text) - - def test_stop_containers(self): - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/stop"))) - self.assertIn(r.status_code, (204, 304), r.text) - - def test_start_containers(self): - r = requests.post(_url(ctnr("/containers/{}/stop"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - def test_restart_containers(self): - r = requests.post(_url(ctnr("/containers/{}/start"))) - self.assertIn(r.status_code, (204, 304), r.text) - - r = requests.post(_url(ctnr("/containers/{}/restart")), timeout=5) - self.assertEqual(r.status_code, 204, r.text) - - def test_resize(self): - r = requests.post(_url(ctnr("/containers/{}/resize?h=43&w=80"))) - self.assertIn(r.status_code, (200, 409), r.text) - if r.status_code == 200: - self.assertEqual(r.text, "", r.text) - - def test_attach_containers(self): - self.skipTest("FIXME: Test timeouts") - r = requests.post(_url(ctnr("/containers/{}/attach")), timeout=5) - self.assertIn(r.status_code, (101, 500), r.text) - - def test_logs_containers(self): - r = requests.get(_url(ctnr("/containers/{}/logs?stdout=true"))) - self.assertEqual(r.status_code, 200, r.text) - - # TODO Need to support Docker-py order of network/container creates - def test_post_create_compat_connect(self): - """Create network and container then connect to network""" - net_default = requests.post( - PODMAN_URL + "/v1.40/networks/create", json={"Name": "TestDefaultNetwork"} - ) - self.assertEqual(net_default.status_code, 201, net_default.text) - - create = requests.post( - PODMAN_URL + "/v1.40/containers/create?name=postCreateConnect", - json={ - "Cmd": ["top"], - "Image": "alpine:latest", - "NetworkDisabled": False, - # FIXME adding these 2 lines cause: (This is sampled from docker-py) - # "network already exists","message":"container - # 01306e499df5441560d70071a54342611e422a94de20865add50a9565fd79fb9 is already connected to CNI - # network \"TestDefaultNetwork\": network already exists" - # "HostConfig": {"NetworkMode": "TestDefaultNetwork"}, - # "NetworkingConfig": {"EndpointsConfig": {"TestDefaultNetwork": None}}, - # FIXME These two lines cause: - # CNI network \"TestNetwork\" not found","message":"error configuring network namespace for container - # 369ddfa7d3211ebf1fbd5ddbff91bd33fa948858cea2985c133d6b6507546dff: CNI network \"TestNetwork\" not - # found" - # "HostConfig": {"NetworkMode": "TestNetwork"}, - # "NetworkingConfig": {"EndpointsConfig": {"TestNetwork": None}}, - # FIXME no networking defined cause: (note this error is from the container inspect below) - # "internal libpod error","message":"network inspection mismatch: asked to join 2 CNI network(s) [ - # TestDefaultNetwork podman], but have information on 1 network(s): internal libpod error" - }, - ) - self.assertEqual(create.status_code, 201, create.text) - payload = json.loads(create.text) - self.assertIsNotNone(payload["Id"]) - - start = requests.post(PODMAN_URL + f"/v1.40/containers/{payload['Id']}/start") - self.assertEqual(start.status_code, 204, start.text) - - connect = requests.post( - PODMAN_URL + "/v1.40/networks/TestDefaultNetwork/connect", - json={"Container": payload["Id"]}, - ) - self.assertEqual(connect.status_code, 200, connect.text) - self.assertEqual(connect.text, "OK\n") - - inspect = requests.get(f"{PODMAN_URL}/v1.40/containers/{payload['Id']}/json") - self.assertEqual(inspect.status_code, 200, inspect.text) - - payload = json.loads(inspect.text) - self.assertFalse(payload["Config"].get("NetworkDisabled", False)) - - self.assertEqual( - "TestDefaultNetwork", - payload["NetworkSettings"]["Networks"]["TestDefaultNetwork"]["NetworkID"], - ) - # TODO restore this to test, when joining multiple networks possible - # self.assertEqual( - # "TestNetwork", - # payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], - # ) - # TODO Need to support network aliases - # self.assertIn( - # "test_post_create", - # payload["NetworkSettings"]["Networks"]["TestNetwork"]["Aliases"], - # ) - - def test_post_create_compat(self): - """Create network and connect container during create""" - net = requests.post(PODMAN_URL + "/v1.40/networks/create", json={"Name": "TestNetwork"}) - self.assertEqual(net.status_code, 201, net.text) - - create = requests.post( - PODMAN_URL + "/v1.40/containers/create?name=postCreate", - json={ - "Cmd": ["date"], - "Image": "alpine:latest", - "NetworkDisabled": False, - "HostConfig": {"NetworkMode": "TestNetwork"}, - }, - ) - self.assertEqual(create.status_code, 201, create.text) - payload = json.loads(create.text) - self.assertIsNotNone(payload["Id"]) - - inspect = requests.get(f"{PODMAN_URL}/v1.40/containers/{payload['Id']}/json") - self.assertEqual(inspect.status_code, 200, inspect.text) - payload = json.loads(inspect.text) - self.assertFalse(payload["Config"].get("NetworkDisabled", False)) - self.assertEqual( - "TestNetwork", - payload["NetworkSettings"]["Networks"]["TestNetwork"]["NetworkID"], - ) - - def test_commit(self): - r = requests.post(_url(ctnr("/commit?container={}"))) - self.assertEqual(r.status_code, 200, r.text) - - obj = json.loads(r.content) - self.assertIsInstance(obj, dict) - self.assertIn("Id", obj) - - def test_images_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/images/json") - self.assertEqual(r.status_code, 200, r.text) - - # See https://docs.docker.com/engine/api/v1.40/#operation/ImageList - required_keys = ( - "Id", - "ParentId", - "RepoTags", - "RepoDigests", - "Created", - "Size", - "SharedSize", - "VirtualSize", - "Labels", - "Containers", - ) - objs = json.loads(r.content) - self.assertIn(type(objs), (list,)) - for o in objs: - self.assertIsInstance(o, dict) - for k in required_keys: - self.assertIn(k, o) - - def test_inspect_image_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/images/alpine/json") - self.assertEqual(r.status_code, 200, r.text) - - # See https://docs.docker.com/engine/api/v1.40/#operation/ImageInspect - required_keys = ( - "Id", - "Parent", - "Comment", - "Created", - "Container", - "DockerVersion", - "Author", - "Architecture", - "Os", - "Size", - "VirtualSize", - "GraphDriver", - "RootFS", - "Metadata", - ) - - obj = json.loads(r.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - _ = parse(obj["Created"]) - - def test_delete_image_compat(self): - r = requests.delete(PODMAN_URL + "/v1.40/images/alpine?force=true") - self.assertEqual(r.status_code, 200, r.text) - obj = json.loads(r.content) - self.assertIn(type(obj), (list,)) - - def test_pull(self): - r = requests.post(_url("/images/pull?reference=alpine"), timeout=15) - self.assertEqual(r.status_code, 200, r.status_code) - text = r.text - keys = { - "error": False, - "id": False, - "images": False, - "stream": False, - } - # Read and record stanza's from pull - for line in str.splitlines(text): - obj = json.loads(line) - key_list = list(obj.keys()) - for k in key_list: - keys[k] = True - - self.assertFalse(keys["error"], "Expected no errors") - self.assertTrue(keys["id"], "Expected to find id stanza") - self.assertTrue(keys["images"], "Expected to find images stanza") - self.assertTrue(keys["stream"], "Expected to find stream progress stanza's") - - def test_search_compat(self): - url = PODMAN_URL + "/v1.40/images/search" - - # Had issues with this test hanging when repositories not happy - def do_search1(): - payload = {"term": "alpine"} - r = requests.get(url, params=payload, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - objs = json.loads(r.text) - self.assertIn(type(objs), (list,)) - - def do_search2(): - payload = {"term": "alpine", "limit": 1} - r = requests.get(url, params=payload, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - objs = json.loads(r.text) - self.assertIn(type(objs), (list,)) - self.assertEqual(len(objs), 1) - - def do_search3(): - payload = {"term": "alpine", "filters": '{"is-official":["true"]}'} - r = requests.get(url, params=payload, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - objs = json.loads(r.text) - self.assertIn(type(objs), (list,)) - # There should be only one official image - self.assertEqual(len(objs), 1) - - def do_search4(): - headers = {"X-Registry-Auth": "null"} - payload = {"term": "alpine"} - r = requests.get(url, params=payload, headers=headers, timeout=5) - self.assertEqual(r.status_code, 200, r.text) - - def do_search5(): - headers = {"X-Registry-Auth": "invalid value"} - payload = {"term": "alpine"} - r = requests.get(url, params=payload, headers=headers, timeout=5) - self.assertEqual(r.status_code, 400, r.text) - - search_methods = [do_search1, do_search2, do_search3, do_search4, do_search5] - for search_method in search_methods: - search = Process(target=search_method) - search.start() - search.join(timeout=10) - self.assertFalse(search.is_alive(), "/images/search took too long") - - def test_ping(self): - required_headers = ( - "API-Version", - "Builder-Version", - "Docker-Experimental", - "Cache-Control", - "Pragma", - "Pragma", - ) - - def check_headers(req): - for k in required_headers: - self.assertIn(k, req.headers) - - r = requests.get(PODMAN_URL + "/_ping") - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "OK") - check_headers(r) - - r = requests.head(PODMAN_URL + "/_ping") - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "") - check_headers(r) - - r = requests.get(_url("/_ping")) - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "OK") - check_headers(r) - - r = requests.head(_url("/_ping")) - self.assertEqual(r.status_code, 200, r.text) - self.assertEqual(r.text, "") - check_headers(r) - - def test_history_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/images/alpine/history") - self.assertEqual(r.status_code, 200, r.text) - - # See https://docs.docker.com/engine/api/v1.40/#operation/ImageHistory - required_keys = ("Id", "Created", "CreatedBy", "Tags", "Size", "Comment") - - objs = json.loads(r.content) - self.assertIn(type(objs), (list,)) - for o in objs: - self.assertIsInstance(o, dict) - for k in required_keys: - self.assertIn(k, o) - - def test_network_compat(self): - name = "Network_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - - # Cannot test for 0 existing networks because default "podman" network always exists - - create = requests.post(PODMAN_URL + "/v1.40/networks/create", json={"Name": name}) - self.assertEqual(create.status_code, 201, create.content) - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - self.assertIn("Id", obj) - ident = obj["Id"] - self.assertNotEqual(name, ident) - - ls = requests.get(PODMAN_URL + "/v1.40/networks") - self.assertEqual(ls.status_code, 200, ls.content) - objs = json.loads(ls.content) - self.assertIn(type(objs), (list,)) - - found = False - for network in objs: - if network["Name"] == name: - found = True - self.assertTrue(found, f"Network {name} not found") - - inspect = requests.get(PODMAN_URL + f"/v1.40/networks/{ident}") - self.assertEqual(inspect.status_code, 200, inspect.content) - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - - inspect = requests.delete(PODMAN_URL + f"/v1.40/networks/{ident}") - self.assertEqual(inspect.status_code, 204, inspect.content) - inspect = requests.get(PODMAN_URL + f"/v1.40/networks/{ident}") - self.assertEqual(inspect.status_code, 404, inspect.content) - - # network prune - prune_name = "Network_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - prune_create = requests.post( - PODMAN_URL + "/v1.40/networks/create", json={"Name": prune_name} - ) - self.assertEqual(create.status_code, 201, prune_create.content) - - prune = requests.post(PODMAN_URL + "/v1.40/networks/prune") - self.assertEqual(prune.status_code, 200, prune.content) - obj = json.loads(prune.content) - self.assertTrue(prune_name in obj["NetworksDeleted"]) - - def test_volumes_compat(self): - name = "Volume_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - - ls = requests.get(PODMAN_URL + "/v1.40/volumes") - self.assertEqual(ls.status_code, 200, ls.content) - - # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeList - required_keys = ( - "Volumes", - "Warnings", - ) - - obj = json.loads(ls.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - - create = requests.post(PODMAN_URL + "/v1.40/volumes/create", json={"Name": name}) - self.assertEqual(create.status_code, 201, create.content) - - # See https://docs.docker.com/engine/api/v1.40/#operation/VolumeCreate - # and https://docs.docker.com/engine/api/v1.40/#operation/VolumeInspect - required_keys = ( - "Name", - "Driver", - "Mountpoint", - "Labels", - "Scope", - "Options", - ) - - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - self.assertEqual(obj["Name"], name) - - inspect = requests.get(PODMAN_URL + f"/v1.40/volumes/{name}") - self.assertEqual(inspect.status_code, 200, inspect.content) - - obj = json.loads(create.content) - self.assertIn(type(obj), (dict,)) - for k in required_keys: - self.assertIn(k, obj) - - rm = requests.delete(PODMAN_URL + f"/v1.40/volumes/{name}") - self.assertEqual(rm.status_code, 204, rm.content) - - # recreate volume with data and then prune it - r = requests.post(PODMAN_URL + "/v1.40/volumes/create", json={"Name": name}) - self.assertEqual(create.status_code, 201, create.content) - create = json.loads(r.content) - with open(os.path.join(create["Mountpoint"], "test_prune"), "w") as file: - file.writelines(["This is a test\n", "This is a good test\n"]) - - prune = requests.post(PODMAN_URL + "/v1.40/volumes/prune") - self.assertEqual(prune.status_code, 200, prune.content) - payload = json.loads(prune.content) - self.assertIn(name, payload["VolumesDeleted"]) - self.assertGreater(payload["SpaceReclaimed"], 0) - - def test_version(self): - r = requests.get(PODMAN_URL + "/v1.40/version") - self.assertEqual(r.status_code, 200, r.content) - - r = requests.get(_url("/version")) - self.assertEqual(r.status_code, 200, r.content) - - def test_df_compat(self): - r = requests.get(PODMAN_URL + "/v1.40/system/df") - self.assertEqual(r.status_code, 200, r.content) - - obj = json.loads(r.content) - self.assertIn("Images", obj) - self.assertIn("Containers", obj) - self.assertIn("Volumes", obj) - self.assertIn("BuildCache", obj) - - def test_prune_compat(self): - name = "Ctnr_" + "".join(random.choice(string.ascii_letters) for i in range(10)) - - r = requests.post( - PODMAN_URL + f"/v1.40/containers/create?name={name}", - json={ - "Cmd": ["cp", "/etc/motd", "/motd.size_test"], - "Image": "alpine:latest", - "NetworkDisabled": True, - }, - ) - self.assertEqual(r.status_code, 201, r.text) - create = json.loads(r.text) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{create['Id']}/start") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{create['Id']}/wait") - self.assertEqual(r.status_code, 200, r.text) - wait = json.loads(r.text) - self.assertEqual(wait["StatusCode"], 0, wait["Error"]["Message"]) - - prune = requests.post(PODMAN_URL + "/v1.40/containers/prune") - self.assertEqual(prune.status_code, 200, prune.status_code) - prune_payload = json.loads(prune.text) - self.assertGreater(prune_payload["SpaceReclaimed"], 0) - self.assertIn(create["Id"], prune_payload["ContainersDeleted"]) - - # Delete any orphaned containers - r = requests.get(PODMAN_URL + "/v1.40/containers/json?all=true") - self.assertEqual(r.status_code, 200, r.text) - for ctnr in json.loads(r.text): - requests.delete(PODMAN_URL + f"/v1.40/containers/{ctnr['Id']}?force=true") - - prune = requests.post(PODMAN_URL + "/v1.40/images/prune") - self.assertEqual(prune.status_code, 200, prune.text) - prune_payload = json.loads(prune.text) - self.assertGreater(prune_payload["SpaceReclaimed"], 0) - - # FIXME need method to determine which image is going to be "pruned" to fix test - # TODO should handler be recursive when deleting images? - # self.assertIn(img["Id"], prune_payload["ImagesDeleted"][1]["Deleted"]) - - # FIXME (@vrothberg): I commented this line out during the `libimage` migration. - # It doesn't make sense to report anything to be deleted if the reclaimed space - # is zero. I think the test needs some rewrite. - # self.assertIsNotNone(prune_payload["ImagesDeleted"][1]["Deleted"]) - - def test_status_compat(self): - r = requests.post( - PODMAN_URL + "/v1.40/containers/create?name=topcontainer", - json={"Cmd": ["top"], "Image": "alpine:latest"}, - ) - self.assertEqual(r.status_code, 201, r.text) - payload = json.loads(r.text) - container_id = payload["Id"] - self.assertIsNotNone(container_id) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertEqual(payload[0]["Status"], "Created") - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/start") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertTrue(str(payload[0]["Status"]).startswith("Up")) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/pause") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertTrue(str(payload[0]["Status"]).startswith("Up")) - self.assertTrue(str(payload[0]["Status"]).endswith("(Paused)")) - - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/unpause") - self.assertEqual(r.status_code, 204, r.text) - r = requests.post(PODMAN_URL + f"/v1.40/containers/{container_id}/stop") - self.assertEqual(r.status_code, 204, r.text) - - r = requests.get( - PODMAN_URL + "/v1.40/containers/json", - params={"all": "true", "filters": f'{{"id":["{container_id}"]}}'}, - ) - self.assertEqual(r.status_code, 200, r.text) - payload = json.loads(r.text) - self.assertTrue(str(payload[0]["Status"]).startswith("Exited")) - - r = requests.delete(PODMAN_URL + f"/v1.40/containers/{container_id}") - self.assertEqual(r.status_code, 204, r.text) - - def test_pod_start_conflict(self): - """Verify issue #8865""" - - pod_name = list() - pod_name.append("Pod_" + "".join(random.choice(string.ascii_letters) for i in range(10))) - pod_name.append("Pod_" + "".join(random.choice(string.ascii_letters) for i in range(10))) - - r = requests.post( - _url("/pods/create"), - json={ - "name": pod_name[0], - "no_infra": False, - "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - r = requests.post( - _url("/containers/create"), - json={ - "pod": pod_name[0], - "image": "docker.io/alpine:latest", - "command": ["top"], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - - r = requests.post( - _url("/pods/create"), - json={ - "name": pod_name[1], - "no_infra": False, - "portmappings": [{"host_ip": "127.0.0.1", "host_port": 8889, "container_port": 89}], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - r = requests.post( - _url("/containers/create"), - json={ - "pod": pod_name[1], - "image": "docker.io/alpine:latest", - "command": ["top"], - }, - ) - self.assertEqual(r.status_code, 201, r.text) - - r = requests.post(_url(f"/pods/{pod_name[0]}/start")) - self.assertEqual(r.status_code, 200, r.text) - - r = requests.post(_url(f"/pods/{pod_name[1]}/start")) - self.assertEqual(r.status_code, 409, r.text) - - start = json.loads(r.text) - self.assertGreater(len(start["Errs"]), 0, r.text) - - def test_manifest_409(self): - r = requests.post(_url("/manifests/create"), params={"name": "ThisIsAnInvalidImage"}) - self.assertEqual(r.status_code, 400, r.text) - - def test_df(self): - r = requests.get(_url("/system/df")) - self.assertEqual(r.status_code, 200, r.text) - - -if __name__ == "__main__": - unittest.main() |