summaryrefslogtreecommitdiff
path: root/test/python/docker/compat/test_containers.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/python/docker/compat/test_containers.py')
-rw-r--r--test/python/docker/compat/test_containers.py200
1 files changed, 200 insertions, 0 deletions
diff --git a/test/python/docker/compat/test_containers.py b/test/python/docker/compat/test_containers.py
new file mode 100644
index 000000000..be70efa67
--- /dev/null
+++ b/test/python/docker/compat/test_containers.py
@@ -0,0 +1,200 @@
+import subprocess
+import sys
+import time
+import unittest
+
+from docker import DockerClient, errors
+
+from test.python.docker import Podman
+from test.python.docker.compat import common, constant
+
+
+class TestContainers(unittest.TestCase):
+ podman = None # initialized podman configuration for tests
+ service = None # podman service instance
+ topContainerId = ""
+
+ def setUp(self):
+ super().setUp()
+ self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
+ TestContainers.podman.restore_image_from_cache(self.client)
+ TestContainers.topContainerId = common.run_top_container(self.client)
+ self.assertIsNotNone(TestContainers.topContainerId)
+
+ def tearDown(self):
+ common.remove_all_containers(self.client)
+ common.remove_all_images(self.client)
+ self.client.close()
+ return super().tearDown()
+
+ @classmethod
+ def setUpClass(cls):
+ super().setUpClass()
+ TestContainers.podman = Podman()
+ TestContainers.service = TestContainers.podman.open(
+ "system", "service", "tcp:127.0.0.1:8080", "--time=0"
+ )
+ # give the service some time to be ready...
+ time.sleep(2)
+
+ rc = TestContainers.service.poll()
+ if rc is not None:
+ raise subprocess.CalledProcessError(rc, "podman system service")
+
+ @classmethod
+ def tearDownClass(cls):
+ TestContainers.service.terminate()
+ stdout, stderr = TestContainers.service.communicate(timeout=0.5)
+ if stdout:
+ sys.stdout.write("\nContainers Service Stdout:\n" + stdout.decode("utf-8"))
+ if stderr:
+ sys.stderr.write("\nContainers Service Stderr:\n" + stderr.decode("utf-8"))
+
+ TestContainers.podman.tear_down()
+ return super().tearDownClass()
+
+ def test_create_container(self):
+ # Run a container with detach mode
+ self.client.containers.create(image="alpine", detach=True)
+ self.assertEqual(len(self.client.containers.list(all=True)), 2)
+
+ def test_create_network(self):
+ net = self.client.networks.create("testNetwork", driver="bridge")
+ ctnr = self.client.containers.create(image="alpine", detach=True)
+
+ # TODO fix when ready
+ # This test will not work until all connect|disconnect
+ # code is fixed.
+ # net.connect(ctnr)
+
+ # nets = self.client.networks.list(greedy=True)
+ # self.assertGreaterEqual(len(nets), 1)
+
+ # TODO fix endpoint to include containers
+ # for n in nets:
+ # if n.id == "testNetwork":
+ # self.assertEqual(ctnr.id, n.containers)
+ # self.assertTrue(False, "testNetwork not found")
+
+ def test_start_container(self):
+ # Podman docs says it should give a 304 but returns with no response
+ # # Start a already started container should return 304
+ # response = self.client.api.start(container=TestContainers.topContainerId)
+ # self.assertEqual(error.exception.response.status_code, 304)
+
+ # Create a new container and validate the count
+ self.client.containers.create(image=constant.ALPINE, name="container2")
+ containers = self.client.containers.list(all=True)
+ self.assertEqual(len(containers), 2)
+
+ def test_start_container_with_random_port_bind(self):
+ container = self.client.containers.create(
+ image=constant.ALPINE,
+ name="containerWithRandomBind",
+ ports={"1234/tcp": None},
+ )
+ containers = self.client.containers.list(all=True)
+ self.assertTrue(container in containers)
+
+ def test_stop_container(self):
+ top = self.client.containers.get(TestContainers.topContainerId)
+ self.assertEqual(top.status, "running")
+
+ # Stop a running container and validate the state
+ top.stop()
+ top.reload()
+ self.assertIn(top.status, ("stopped", "exited"))
+
+ def test_kill_container(self):
+ top = self.client.containers.get(TestContainers.topContainerId)
+ self.assertEqual(top.status, "running")
+
+ # Kill a running container and validate the state
+ top.kill()
+ top.reload()
+ self.assertIn(top.status, ("stopped", "exited"))
+
+ def test_restart_container(self):
+ # Validate the container state
+ top = self.client.containers.get(TestContainers.topContainerId)
+ top.stop()
+ top.reload()
+ self.assertIn(top.status, ("stopped", "exited"))
+
+ # restart a running container and validate the state
+ top.restart()
+ top.reload()
+ self.assertEqual(top.status, "running")
+
+ def test_remove_container(self):
+ # Remove container by ID with force
+ top = self.client.containers.get(TestContainers.topContainerId)
+ top.remove(force=True)
+ self.assertEqual(len(self.client.containers.list()), 0)
+
+ def test_remove_container_without_force(self):
+ # Validate current container count
+ self.assertTrue(len(self.client.containers.list()), 1)
+
+ # Remove running container should throw error
+ top = self.client.containers.get(TestContainers.topContainerId)
+ with self.assertRaises(errors.APIError) as error:
+ top.remove()
+ self.assertEqual(error.exception.response.status_code, 500)
+
+ # Remove container by ID without force
+ top.stop()
+ top.remove()
+ self.assertEqual(len(self.client.containers.list()), 0)
+
+ def test_pause_container(self):
+ # Validate the container state
+ top = self.client.containers.get(TestContainers.topContainerId)
+ self.assertEqual(top.status, "running")
+
+ # Pause a running container and validate the state
+ top.pause()
+ top.reload()
+ self.assertEqual(top.status, "paused")
+
+ def test_pause_stopped_container(self):
+ # Stop the container
+ top = self.client.containers.get(TestContainers.topContainerId)
+ top.stop()
+
+ # Pause exited container should throw error
+ with self.assertRaises(errors.APIError) as error:
+ top.pause()
+ self.assertEqual(error.exception.response.status_code, 500)
+
+ def test_unpause_container(self):
+ top = self.client.containers.get(TestContainers.topContainerId)
+
+ # Validate the container state
+ top.pause()
+ top.reload()
+ self.assertEqual(top.status, "paused")
+
+ # Pause a running container and validate the state
+ top.unpause()
+ top.reload()
+ self.assertEqual(top.status, "running")
+
+ def test_list_container(self):
+ # Add container and validate the count
+ self.client.containers.create(image="alpine", detach=True)
+ containers = self.client.containers.list(all=True)
+ self.assertEqual(len(containers), 2)
+
+ def test_filters(self):
+ self.skipTest("TODO Endpoint does not yet support filters")
+
+ # List container with filter by id
+ filters = {"id": TestContainers.topContainerId}
+ ctnrs = self.client.containers.list(all=True, filters=filters)
+ self.assertEqual(len(ctnrs), 1)
+
+ # List container with filter by name
+ filters = {"name": "top"}
+ ctnrs = self.client.containers.list(all=True, filters=filters)
+ self.assertEqual(len(ctnrs), 1)