import collections import os import subprocess import sys import time import unittest from docker import DockerClient, errors from test.python.docker import Podman, common, constant class TestImages(unittest.TestCase): podman = None # initialized podman configuration for tests service = None # podman service instance def setUp(self): super().setUp() self.client = DockerClient(base_url="tcp://127.0.0.1:8080", timeout=15) TestImages.podman.restore_image_from_cache(self.client) def tearDown(self): common.remove_all_images(self.client) self.client.close() return super().tearDown() @classmethod def setUpClass(cls): super().setUpClass() TestImages.podman = Podman() TestImages.service = TestImages.podman.open( "system", "service", "tcp:127.0.0.1:8080", "--time=0" ) # give the service some time to be ready... time.sleep(2) returncode = TestImages.service.poll() if returncode is not None: raise subprocess.CalledProcessError(returncode, "podman system service") @classmethod def tearDownClass(cls): TestImages.service.terminate() stdout, stderr = TestImages.service.communicate(timeout=0.5) if stdout: sys.stdout.write("\nImages Service Stdout:\n" + stdout.decode("utf-8")) if stderr: sys.stderr.write("\nImAges Service Stderr:\n" + stderr.decode("utf-8")) TestImages.podman.tear_down() return super().tearDownClass() def test_tag_valid_image(self): """Validates if the image is tagged successfully""" alpine = self.client.images.get(constant.ALPINE) self.assertTrue(alpine.tag("demo", constant.ALPINE_SHORTNAME)) alpine = self.client.images.get(constant.ALPINE) for t in alpine.tags: self.assertIn("alpine", t) # @unittest.skip("doesn't work now") def test_retag_valid_image(self): """Validates if name updates when the image is retagged""" alpine = self.client.images.get(constant.ALPINE) self.assertTrue(alpine.tag("demo", "rename")) alpine = self.client.images.get(constant.ALPINE) self.assertNotIn("demo:test", alpine.tags) def test_list_images(self): """List images""" self.assertEqual(len(self.client.images.list()), 1) # Add more images self.client.images.pull(constant.BB) self.assertEqual(len(self.client.images.list()), 2) # List images with filter self.assertEqual(len(self.client.images.list(filters={"reference": "alpine"})), 1) def test_search_image(self): """Search for image""" for r in self.client.images.search("libpod/alpine"): self.assertIn("quay.io/libpod/alpine", r["Name"]) def test_remove_image(self): """Remove image""" # Check for error with wrong image name with self.assertRaises(errors.NotFound): self.client.images.remove("dummy") self.assertEqual(len(self.client.images.list()), 1) self.client.images.remove(constant.ALPINE) self.assertEqual(len(self.client.images.list()), 0) def test_image_history(self): """Image history""" img = self.client.images.get(constant.ALPINE) history = img.history() image_id = img.id[7:] if img.id.startswith("sha256:") else img.id found = False for change in history: found |= image_id in change.values() self.assertTrue(found, f"image id {image_id} not found in history") def test_get_image_exists_not(self): """Negative test for get image""" with self.assertRaises(errors.NotFound): response = self.client.images.get("image_does_not_exists") collections.deque(response) def test_save_image(self): """Export Image""" image = self.client.images.pull(constant.BB) file = os.path.join(TestImages.podman.image_cache, "busybox.tar") with open(file, mode="wb") as tarball: for frame in image.save(named=True): tarball.write(frame) sz = os.path.getsize(file) self.assertGreater(sz, 0) def test_load_image(self): """Import|Load Image""" self.assertEqual(len(self.client.images.list()), 1) image = self.client.images.pull(constant.BB) file = os.path.join(TestImages.podman.image_cache, "busybox.tar") with open(file, mode="wb") as tarball: for frame in image.save(): tarball.write(frame) with open(file, mode="rb") as saved: _ = self.client.images.load(saved) self.assertEqual(len(self.client.images.list()), 2) if __name__ == "__main__": # Setup temporary space unittest.main()