summaryrefslogtreecommitdiff
path: root/test/python/docker/test_containers.py
diff options
context:
space:
mode:
authorJhon Honce <jhonce@redhat.com>2021-03-01 10:55:20 -0700
committerJhon Honce <jhonce@redhat.com>2021-03-01 13:15:59 -0700
commit7927fe01f165bb4a3f381601d847036a3a130182 (patch)
tree1ed8c48b6187a80b3822cf41431e92972fb0ad20 /test/python/docker/test_containers.py
parent73044b28172fd0df23836052a9061dc41f51f39a (diff)
downloadpodman-7927fe01f165bb4a3f381601d847036a3a130182.tar.gz
podman-7927fe01f165bb4a3f381601d847036a3a130182.tar.bz2
podman-7927fe01f165bb4a3f381601d847036a3a130182.zip
Refactor python tests to run against python3.9
* Introduce sub-package compat to meet packaging and import requirements * Update documenation for running tests * Add requirements.txt to improve IDE support Signed-off-by: Jhon Honce <jhonce@redhat.com>
Diffstat (limited to 'test/python/docker/test_containers.py')
-rw-r--r--test/python/docker/test_containers.py197
1 files changed, 0 insertions, 197 deletions
diff --git a/test/python/docker/test_containers.py b/test/python/docker/test_containers.py
deleted file mode 100644
index 337cacd5c..000000000
--- a/test/python/docker/test_containers.py
+++ /dev/null
@@ -1,197 +0,0 @@
-import subprocess
-import sys
-import time
-import unittest
-
-from docker import DockerClient, errors
-
-from test.python.docker import Podman, common, constant
-
-
-class TestContainers(unittest.TestCase):
- podman = None # initialized podman configuration for tests
- service = None # podman service instance
- topContainerId = ""
-
- def setUp(self):
- super().setUp()
- self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15)
- TestContainers.podman.restore_image_from_cache(self.client)
- TestContainers.topContainerId = common.run_top_container(self.client)
- self.assertIsNotNone(TestContainers.topContainerId)
-
- def tearDown(self):
- common.remove_all_containers(self.client)
- common.remove_all_images(self.client)
- self.client.close()
- return super().tearDown()
-
- @classmethod
- def setUpClass(cls):
- super().setUpClass()
- TestContainers.podman = Podman()
- TestContainers.service = TestContainers.podman.open(
- "system", "service", "tcp:127.0.0.1:8080", "--time=0"
- )
- # give the service some time to be ready...
- time.sleep(2)
-
- rc = TestContainers.service.poll()
- if rc is not None:
- raise subprocess.CalledProcessError(rc, "podman system service")
-
- @classmethod
- def tearDownClass(cls):
- TestContainers.service.terminate()
- stdout, stderr = TestContainers.service.communicate(timeout=0.5)
- if stdout:
- sys.stdout.write("\nContainers Service Stdout:\n" + stdout.decode("utf-8"))
- if stderr:
- sys.stderr.write("\nContainers Service Stderr:\n" + stderr.decode("utf-8"))
-
- TestContainers.podman.tear_down()
- return super().tearDownClass()
-
- def test_create_container(self):
- # Run a container with detach mode
- self.client.containers.create(image="alpine", detach=True)
- self.assertEqual(len(self.client.containers.list(all=True)), 2)
-
- def test_create_network(self):
- net = self.client.networks.create("testNetwork", driver="bridge")
- ctnr = self.client.containers.create(image="alpine", detach=True)
-
- # TODO fix when ready
- # This test will not work until all connect|disconnect
- # code is fixed.
- # net.connect(ctnr)
-
- # nets = self.client.networks.list(greedy=True)
- # self.assertGreaterEqual(len(nets), 1)
-
- # TODO fix endpoint to include containers
- # for n in nets:
- # if n.id == "testNetwork":
- # self.assertEqual(ctnr.id, n.containers)
- # self.assertTrue(False, "testNetwork not found")
-
- def test_start_container(self):
- # Podman docs says it should give a 304 but returns with no response
- # # Start a already started container should return 304
- # response = self.client.api.start(container=TestContainers.topContainerId)
- # self.assertEqual(error.exception.response.status_code, 304)
-
- # Create a new container and validate the count
- self.client.containers.create(image=constant.ALPINE, name="container2")
- containers = self.client.containers.list(all=True)
- self.assertEqual(len(containers), 2)
-
- def test_start_container_with_random_port_bind(self):
- container = self.client.containers.create(image=constant.ALPINE,
- name="containerWithRandomBind",
- ports={'1234/tcp': None})
- containers = self.client.containers.list(all=True)
- self.assertTrue(container in containers)
-
- def test_stop_container(self):
- top = self.client.containers.get(TestContainers.topContainerId)
- self.assertEqual(top.status, "running")
-
- # Stop a running container and validate the state
- top.stop()
- top.reload()
- self.assertIn(top.status, ("stopped", "exited"))
-
- def test_kill_container(self):
- top = self.client.containers.get(TestContainers.topContainerId)
- self.assertEqual(top.status, "running")
-
- # Kill a running container and validate the state
- top.kill()
- top.reload()
- self.assertIn(top.status, ("stopped", "exited"))
-
- def test_restart_container(self):
- # Validate the container state
- top = self.client.containers.get(TestContainers.topContainerId)
- top.stop()
- top.reload()
- self.assertIn(top.status, ("stopped", "exited"))
-
- # restart a running container and validate the state
- top.restart()
- top.reload()
- self.assertEqual(top.status, "running")
-
- def test_remove_container(self):
- # Remove container by ID with force
- top = self.client.containers.get(TestContainers.topContainerId)
- top.remove(force=True)
- self.assertEqual(len(self.client.containers.list()), 0)
-
- def test_remove_container_without_force(self):
- # Validate current container count
- self.assertTrue(len(self.client.containers.list()), 1)
-
- # Remove running container should throw error
- top = self.client.containers.get(TestContainers.topContainerId)
- with self.assertRaises(errors.APIError) as error:
- top.remove()
- self.assertEqual(error.exception.response.status_code, 500)
-
- # Remove container by ID without force
- top.stop()
- top.remove()
- self.assertEqual(len(self.client.containers.list()), 0)
-
- def test_pause_container(self):
- # Validate the container state
- top = self.client.containers.get(TestContainers.topContainerId)
- self.assertEqual(top.status, "running")
-
- # Pause a running container and validate the state
- top.pause()
- top.reload()
- self.assertEqual(top.status, "paused")
-
- def test_pause_stopped_container(self):
- # Stop the container
- top = self.client.containers.get(TestContainers.topContainerId)
- top.stop()
-
- # Pause exited container should throw error
- with self.assertRaises(errors.APIError) as error:
- top.pause()
- self.assertEqual(error.exception.response.status_code, 500)
-
- def test_unpause_container(self):
- top = self.client.containers.get(TestContainers.topContainerId)
-
- # Validate the container state
- top.pause()
- top.reload()
- self.assertEqual(top.status, "paused")
-
- # Pause a running container and validate the state
- top.unpause()
- top.reload()
- self.assertEqual(top.status, "running")
-
- def test_list_container(self):
- # Add container and validate the count
- self.client.containers.create(image="alpine", detach=True)
- containers = self.client.containers.list(all=True)
- self.assertEqual(len(containers), 2)
-
- def test_filters(self):
- self.skipTest("TODO Endpoint does not yet support filters")
-
- # List container with filter by id
- filters = {"id": TestContainers.topContainerId}
- ctnrs = self.client.containers.list(all=True, filters=filters)
- self.assertEqual(len(ctnrs), 1)
-
- # List container with filter by name
- filters = {"name": "top"}
- ctnrs = self.client.containers.list(all=True, filters=filters)
- self.assertEqual(len(ctnrs), 1)