diff options
Diffstat (limited to 'test/python/docker/compat/test_containers.py')
-rw-r--r-- | test/python/docker/compat/test_containers.py | 200 |
1 files changed, 200 insertions, 0 deletions
diff --git a/test/python/docker/compat/test_containers.py b/test/python/docker/compat/test_containers.py new file mode 100644 index 000000000..be70efa67 --- /dev/null +++ b/test/python/docker/compat/test_containers.py @@ -0,0 +1,200 @@ +import subprocess +import sys +import time +import unittest + +from docker import DockerClient, errors + +from test.python.docker import Podman +from test.python.docker.compat import common, constant + + +class TestContainers(unittest.TestCase): + podman = None # initialized podman configuration for tests + service = None # podman service instance + topContainerId = "" + + def setUp(self): + super().setUp() + self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15) + TestContainers.podman.restore_image_from_cache(self.client) + TestContainers.topContainerId = common.run_top_container(self.client) + self.assertIsNotNone(TestContainers.topContainerId) + + def tearDown(self): + common.remove_all_containers(self.client) + common.remove_all_images(self.client) + self.client.close() + return super().tearDown() + + @classmethod + def setUpClass(cls): + super().setUpClass() + TestContainers.podman = Podman() + TestContainers.service = TestContainers.podman.open( + "system", "service", "tcp:127.0.0.1:8080", "--time=0" + ) + # give the service some time to be ready... + time.sleep(2) + + rc = TestContainers.service.poll() + if rc is not None: + raise subprocess.CalledProcessError(rc, "podman system service") + + @classmethod + def tearDownClass(cls): + TestContainers.service.terminate() + stdout, stderr = TestContainers.service.communicate(timeout=0.5) + if stdout: + sys.stdout.write("\nContainers Service Stdout:\n" + stdout.decode("utf-8")) + if stderr: + sys.stderr.write("\nContainers Service Stderr:\n" + stderr.decode("utf-8")) + + TestContainers.podman.tear_down() + return super().tearDownClass() + + def test_create_container(self): + # Run a container with detach mode + self.client.containers.create(image="alpine", detach=True) + self.assertEqual(len(self.client.containers.list(all=True)), 2) + + def test_create_network(self): + net = self.client.networks.create("testNetwork", driver="bridge") + ctnr = self.client.containers.create(image="alpine", detach=True) + + # TODO fix when ready + # This test will not work until all connect|disconnect + # code is fixed. + # net.connect(ctnr) + + # nets = self.client.networks.list(greedy=True) + # self.assertGreaterEqual(len(nets), 1) + + # TODO fix endpoint to include containers + # for n in nets: + # if n.id == "testNetwork": + # self.assertEqual(ctnr.id, n.containers) + # self.assertTrue(False, "testNetwork not found") + + def test_start_container(self): + # Podman docs says it should give a 304 but returns with no response + # # Start a already started container should return 304 + # response = self.client.api.start(container=TestContainers.topContainerId) + # self.assertEqual(error.exception.response.status_code, 304) + + # Create a new container and validate the count + self.client.containers.create(image=constant.ALPINE, name="container2") + containers = self.client.containers.list(all=True) + self.assertEqual(len(containers), 2) + + def test_start_container_with_random_port_bind(self): + container = self.client.containers.create( + image=constant.ALPINE, + name="containerWithRandomBind", + ports={"1234/tcp": None}, + ) + containers = self.client.containers.list(all=True) + self.assertTrue(container in containers) + + def test_stop_container(self): + top = self.client.containers.get(TestContainers.topContainerId) + self.assertEqual(top.status, "running") + + # Stop a running container and validate the state + top.stop() + top.reload() + self.assertIn(top.status, ("stopped", "exited")) + + def test_kill_container(self): + top = self.client.containers.get(TestContainers.topContainerId) + self.assertEqual(top.status, "running") + + # Kill a running container and validate the state + top.kill() + top.reload() + self.assertIn(top.status, ("stopped", "exited")) + + def test_restart_container(self): + # Validate the container state + top = self.client.containers.get(TestContainers.topContainerId) + top.stop() + top.reload() + self.assertIn(top.status, ("stopped", "exited")) + + # restart a running container and validate the state + top.restart() + top.reload() + self.assertEqual(top.status, "running") + + def test_remove_container(self): + # Remove container by ID with force + top = self.client.containers.get(TestContainers.topContainerId) + top.remove(force=True) + self.assertEqual(len(self.client.containers.list()), 0) + + def test_remove_container_without_force(self): + # Validate current container count + self.assertTrue(len(self.client.containers.list()), 1) + + # Remove running container should throw error + top = self.client.containers.get(TestContainers.topContainerId) + with self.assertRaises(errors.APIError) as error: + top.remove() + self.assertEqual(error.exception.response.status_code, 500) + + # Remove container by ID without force + top.stop() + top.remove() + self.assertEqual(len(self.client.containers.list()), 0) + + def test_pause_container(self): + # Validate the container state + top = self.client.containers.get(TestContainers.topContainerId) + self.assertEqual(top.status, "running") + + # Pause a running container and validate the state + top.pause() + top.reload() + self.assertEqual(top.status, "paused") + + def test_pause_stopped_container(self): + # Stop the container + top = self.client.containers.get(TestContainers.topContainerId) + top.stop() + + # Pause exited container should throw error + with self.assertRaises(errors.APIError) as error: + top.pause() + self.assertEqual(error.exception.response.status_code, 500) + + def test_unpause_container(self): + top = self.client.containers.get(TestContainers.topContainerId) + + # Validate the container state + top.pause() + top.reload() + self.assertEqual(top.status, "paused") + + # Pause a running container and validate the state + top.unpause() + top.reload() + self.assertEqual(top.status, "running") + + def test_list_container(self): + # Add container and validate the count + self.client.containers.create(image="alpine", detach=True) + containers = self.client.containers.list(all=True) + self.assertEqual(len(containers), 2) + + def test_filters(self): + self.skipTest("TODO Endpoint does not yet support filters") + + # List container with filter by id + filters = {"id": TestContainers.topContainerId} + ctnrs = self.client.containers.list(all=True, filters=filters) + self.assertEqual(len(ctnrs), 1) + + # List container with filter by name + filters = {"name": "top"} + ctnrs = self.client.containers.list(all=True, filters=filters) + self.assertEqual(len(ctnrs), 1) |