diff options
Diffstat (limited to 'test/python/docker/test_containers.py')
-rw-r--r-- | test/python/docker/test_containers.py | 197 |
1 files changed, 0 insertions, 197 deletions
diff --git a/test/python/docker/test_containers.py b/test/python/docker/test_containers.py deleted file mode 100644 index 337cacd5c..000000000 --- a/test/python/docker/test_containers.py +++ /dev/null @@ -1,197 +0,0 @@ -import subprocess -import sys -import time -import unittest - -from docker import DockerClient, errors - -from test.python.docker import Podman, common, constant - - -class TestContainers(unittest.TestCase): - podman = None # initialized podman configuration for tests - service = None # podman service instance - topContainerId = "" - - def setUp(self): - super().setUp() - self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15) - TestContainers.podman.restore_image_from_cache(self.client) - TestContainers.topContainerId = common.run_top_container(self.client) - self.assertIsNotNone(TestContainers.topContainerId) - - def tearDown(self): - common.remove_all_containers(self.client) - common.remove_all_images(self.client) - self.client.close() - return super().tearDown() - - @classmethod - def setUpClass(cls): - super().setUpClass() - TestContainers.podman = Podman() - TestContainers.service = TestContainers.podman.open( - "system", "service", "tcp:127.0.0.1:8080", "--time=0" - ) - # give the service some time to be ready... - time.sleep(2) - - rc = TestContainers.service.poll() - if rc is not None: - raise subprocess.CalledProcessError(rc, "podman system service") - - @classmethod - def tearDownClass(cls): - TestContainers.service.terminate() - stdout, stderr = TestContainers.service.communicate(timeout=0.5) - if stdout: - sys.stdout.write("\nContainers Service Stdout:\n" + stdout.decode("utf-8")) - if stderr: - sys.stderr.write("\nContainers Service Stderr:\n" + stderr.decode("utf-8")) - - TestContainers.podman.tear_down() - return super().tearDownClass() - - def test_create_container(self): - # Run a container with detach mode - self.client.containers.create(image="alpine", detach=True) - self.assertEqual(len(self.client.containers.list(all=True)), 2) - - def test_create_network(self): - net = self.client.networks.create("testNetwork", driver="bridge") - ctnr = self.client.containers.create(image="alpine", detach=True) - - # TODO fix when ready - # This test will not work until all connect|disconnect - # code is fixed. - # net.connect(ctnr) - - # nets = self.client.networks.list(greedy=True) - # self.assertGreaterEqual(len(nets), 1) - - # TODO fix endpoint to include containers - # for n in nets: - # if n.id == "testNetwork": - # self.assertEqual(ctnr.id, n.containers) - # self.assertTrue(False, "testNetwork not found") - - def test_start_container(self): - # Podman docs says it should give a 304 but returns with no response - # # Start a already started container should return 304 - # response = self.client.api.start(container=TestContainers.topContainerId) - # self.assertEqual(error.exception.response.status_code, 304) - - # Create a new container and validate the count - self.client.containers.create(image=constant.ALPINE, name="container2") - containers = self.client.containers.list(all=True) - self.assertEqual(len(containers), 2) - - def test_start_container_with_random_port_bind(self): - container = self.client.containers.create(image=constant.ALPINE, - name="containerWithRandomBind", - ports={'1234/tcp': None}) - containers = self.client.containers.list(all=True) - self.assertTrue(container in containers) - - def test_stop_container(self): - top = self.client.containers.get(TestContainers.topContainerId) - self.assertEqual(top.status, "running") - - # Stop a running container and validate the state - top.stop() - top.reload() - self.assertIn(top.status, ("stopped", "exited")) - - def test_kill_container(self): - top = self.client.containers.get(TestContainers.topContainerId) - self.assertEqual(top.status, "running") - - # Kill a running container and validate the state - top.kill() - top.reload() - self.assertIn(top.status, ("stopped", "exited")) - - def test_restart_container(self): - # Validate the container state - top = self.client.containers.get(TestContainers.topContainerId) - top.stop() - top.reload() - self.assertIn(top.status, ("stopped", "exited")) - - # restart a running container and validate the state - top.restart() - top.reload() - self.assertEqual(top.status, "running") - - def test_remove_container(self): - # Remove container by ID with force - top = self.client.containers.get(TestContainers.topContainerId) - top.remove(force=True) - self.assertEqual(len(self.client.containers.list()), 0) - - def test_remove_container_without_force(self): - # Validate current container count - self.assertTrue(len(self.client.containers.list()), 1) - - # Remove running container should throw error - top = self.client.containers.get(TestContainers.topContainerId) - with self.assertRaises(errors.APIError) as error: - top.remove() - self.assertEqual(error.exception.response.status_code, 500) - - # Remove container by ID without force - top.stop() - top.remove() - self.assertEqual(len(self.client.containers.list()), 0) - - def test_pause_container(self): - # Validate the container state - top = self.client.containers.get(TestContainers.topContainerId) - self.assertEqual(top.status, "running") - - # Pause a running container and validate the state - top.pause() - top.reload() - self.assertEqual(top.status, "paused") - - def test_pause_stopped_container(self): - # Stop the container - top = self.client.containers.get(TestContainers.topContainerId) - top.stop() - - # Pause exited container should throw error - with self.assertRaises(errors.APIError) as error: - top.pause() - self.assertEqual(error.exception.response.status_code, 500) - - def test_unpause_container(self): - top = self.client.containers.get(TestContainers.topContainerId) - - # Validate the container state - top.pause() - top.reload() - self.assertEqual(top.status, "paused") - - # Pause a running container and validate the state - top.unpause() - top.reload() - self.assertEqual(top.status, "running") - - def test_list_container(self): - # Add container and validate the count - self.client.containers.create(image="alpine", detach=True) - containers = self.client.containers.list(all=True) - self.assertEqual(len(containers), 2) - - def test_filters(self): - self.skipTest("TODO Endpoint does not yet support filters") - - # List container with filter by id - filters = {"id": TestContainers.topContainerId} - ctnrs = self.client.containers.list(all=True, filters=filters) - self.assertEqual(len(ctnrs), 1) - - # List container with filter by name - filters = {"name": "top"} - ctnrs = self.client.containers.list(all=True, filters=filters) - self.assertEqual(len(ctnrs), 1) |