VYPR
Low severityNVD Advisory· Published May 8, 2014· Updated May 6, 2026

CVE-2014-0134

CVE-2014-0134

Description

The instance rescue mode in OpenStack Compute (Nova) 2013.2 before 2013.2.3 and Icehouse before 2014.1, when using libvirt to spawn images and use_cow_images is set to false, allows remote authenticated users to read certain compute host files by overwriting an instance disk with a crafted image.

Affected packages

Versions sourced from the GitHub Security Advisory.

PackageAffected versionsPatched versions
novaPyPI
< 12.0.0a012.0.0a0

Affected products

3
  • OpenStack/Compute3 versions
    cpe:2.3:a:openstack:compute:2013.2:*:*:*:*:*:*:*+ 2 more
    • cpe:2.3:a:openstack:compute:2013.2:*:*:*:*:*:*:*
    • cpe:2.3:a:openstack:compute:2013.2.1:*:*:*:*:*:*:*
    • cpe:2.3:a:openstack:compute:2013.2.2:*:*:*:*:*:*:*

Patches

3
d416f4310bb9

Avoid the possibility of truncating disk info file

https://github.com/openstack/novaNikola DipanovApr 9, 2014via ghsa
4 files changed · +13 48
  • etc/nova/rootwrap.d/compute.filters+0 1 modified
    @@ -45,7 +45,6 @@ mkdir: CommandFilter, mkdir, root
     # nova/virt/libvirt/connection.py: 'chown', os.getuid( console_log
     # nova/virt/libvirt/connection.py: 'chown', os.getuid( console_log
     # nova/virt/libvirt/connection.py: 'chown', 'root', basepath('disk')
    -# nova/utils.py: 'chown', owner_uid, path
     chown: CommandFilter, chown, root
     
     # nova/virt/disk/vfs/localfs.py: 'chmod'
    
  • nova/tests/virt/libvirt/test_imagebackend.py+0 21 modified
    @@ -28,7 +28,6 @@
     from nova import test
     from nova.tests import fake_processutils
     from nova.tests.virt.libvirt import fake_libvirt_utils
    -from nova import utils
     from nova.virt.libvirt import imagebackend
     
     CONF = cfg.CONF
    @@ -67,10 +66,6 @@ def setUp(self):
                 'nova.virt.libvirt.imagebackend.libvirt_utils',
                 fake_libvirt_utils))
     
    -        def fake_chown(path, owner_uid=None):
    -            return None
    -        self.stubs.Set(utils, 'chown', fake_chown)
    -
         def tearDown(self):
             super(_ImageTestCase, self).tearDown()
             shutil.rmtree(self.INSTANCES_PATH)
    @@ -127,10 +122,6 @@ def setUp(self):
             super(RawTestCase, self).setUp()
             self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
     
    -        def fake_chown(path, owner_uid=None):
    -            return None
    -        self.stubs.Set(utils, 'chown', fake_chown)
    -
         def prepare_mocks(self):
             fn = self.mox.CreateMockAnything()
             self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    @@ -243,10 +234,6 @@ def test_correct_format(self):
             self.mox.StubOutWithMock(os.path, 'exists')
             self.mox.StubOutWithMock(imagebackend.images, 'qemu_img_info')
     
    -        def fake_chown(path, owner_uid=None):
    -            return None
    -        self.stubs.Set(utils, 'chown', fake_chown)
    -
             os.path.exists(self.PATH).AndReturn(True)
             os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
             info = self.mox.CreateMockAnything()
    @@ -275,10 +262,6 @@ def setUp(self):
             self.QCOW2_BASE = (self.TEMPLATE_PATH +
                                '_%d' % (self.SIZE / units.Gi))
     
    -        def fake_chown(path, owner_uid=None):
    -            return None
    -        self.stubs.Set(utils, 'chown', fake_chown)
    -
         def prepare_mocks(self):
             fn = self.mox.CreateMockAnything()
             self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    @@ -874,10 +857,6 @@ class BackendTestCase(test.NoDBTestCase):
         def setUp(self):
             super(BackendTestCase, self).setUp()
     
    -        def fake_chown(path, owner_uid=None):
    -            return None
    -        self.stubs.Set(utils, 'chown', fake_chown)
    -
         def get_image(self, use_cow, image_type):
             return imagebackend.Backend(use_cow).image(self.INSTANCE,
                                                        self.NAME,
    
  • nova/utils.py+0 14 modified
    @@ -761,20 +761,6 @@ def temporary_chown(path, owner_uid=None):
                 execute('chown', orig_uid, path, run_as_root=True)
     
     
    -def chown(path, owner_uid=None):
    -    """chown a path.
    -
    -    :param owner_uid: UID of owner (defaults to current user)
    -    """
    -    if owner_uid is None:
    -        owner_uid = os.getuid()
    -
    -    orig_uid = os.stat(path).st_uid
    -
    -    if orig_uid != owner_uid:
    -        execute('chown', owner_uid, path, run_as_root=True)
    -
    -
     @contextlib.contextmanager
     def tempdir(**kwargs):
         argdict = kwargs.copy()
    
  • nova/virt/libvirt/imagebackend.py+13 12 modified
    @@ -272,20 +272,21 @@ def _dict_from_line(line):
                                 lock_path=self.lock_path)
             def write_to_disk_info_file():
                 # Use os.open to create it without group or world write permission.
    -            fd = os.open(self.disk_info_path, os.O_RDWR | os.O_CREAT, 0o644)
    -            with os.fdopen(fd, "r+") as disk_info_file:
    +            fd = os.open(self.disk_info_path, os.O_RDONLY | os.O_CREAT, 0o644)
    +            with os.fdopen(fd, "r") as disk_info_file:
                     line = disk_info_file.read().rstrip()
                     dct = _dict_from_line(line)
    -                if self.path in dct:
    -                    msg = _("Attempted overwrite of an existing value.")
    -                    raise exception.InvalidDiskInfo(reason=msg)
    -                dct.update({self.path: driver_format})
    -                disk_info_file.seek(0)
    -                disk_info_file.truncate()
    -                disk_info_file.write('%s\n' % jsonutils.dumps(dct))
    -            # Ensure the file is always owned by the nova user so qemu can't
    -            # write it.
    -            utils.chown(self.disk_info_path, owner_uid=os.getuid())
    +
    +            if self.path in dct:
    +                msg = _("Attempted overwrite of an existing value.")
    +                raise exception.InvalidDiskInfo(reason=msg)
    +            dct.update({self.path: driver_format})
    +
    +            tmp_path = self.disk_info_path + ".tmp"
    +            fd = os.open(tmp_path, os.O_WRONLY | os.O_CREAT, 0o644)
    +            with os.fdopen(fd, "w") as tmp_file:
    +                tmp_file.write('%s\n' % jsonutils.dumps(dct))
    +            os.rename(tmp_path, self.disk_info_path)
     
             try:
                 if (self.disk_info_path is not None and
    
25e761acd56d

Persist image format to a file, to prevent attacks based on changing it

https://github.com/openstack/novaDavid RiptonJan 28, 2014via ghsa
6 files changed · +403 64
  • nova/exception.py+8 0 modified
    @@ -449,6 +449,14 @@ class InvalidDiskFormat(Invalid):
         msg_fmt = _("Disk format %(disk_format)s is not acceptable")
     
     
    +class InvalidDiskInfo(Invalid):
    +    msg_fmt = _("Disk info file is invalid: %(reason)s")
    +
    +
    +class DiskInfoReadWriteFail(Invalid):
    +    msg_fmt = _("Failed to read or write disk info file: %(reason)s")
    +
    +
     class ImageUnacceptable(Invalid):
         msg_fmt = _("Image %(image_id)s is unacceptable: %(reason)s")
     
    
  • nova/tests/virt/libvirt/test_imagebackend.py+297 62 modified
    @@ -16,6 +16,8 @@
     #    under the License.
     
     import os
    +import shutil
    +import tempfile
     
     import fixtures
     from oslo.config import cfg
    @@ -27,13 +29,13 @@
     from nova import test
     from nova.tests import fake_processutils
     from nova.tests.virt.libvirt import fake_libvirt_utils
    +from nova import utils
     from nova.virt.libvirt import imagebackend
     
     CONF = cfg.CONF
     
     
     class _ImageTestCase(object):
    -    INSTANCES_PATH = '/instances_path'
     
         def mock_create_image(self, image):
             def create_image(fn, base, size, *args, **kwargs):
    @@ -42,10 +44,13 @@ def create_image(fn, base, size, *args, **kwargs):
     
         def setUp(self):
             super(_ImageTestCase, self).setUp()
    +        self.INSTANCES_PATH = tempfile.mkdtemp(suffix='instances')
             self.flags(disable_process_locking=True,
                        instances_path=self.INSTANCES_PATH)
             self.INSTANCE = {'name': 'instance',
                              'uuid': uuidutils.generate_uuid()}
    +        self.DISK_INFO_PATH = os.path.join(self.INSTANCES_PATH,
    +                                           self.INSTANCE['uuid'], 'disk.info')
             self.NAME = 'fake.vm'
             self.TEMPLATE = 'template'
     
    @@ -63,6 +68,78 @@ def setUp(self):
                 'nova.virt.libvirt.imagebackend.libvirt_utils',
                 fake_libvirt_utils))
     
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
    +    def tearDown(self):
    +        super(_ImageTestCase, self).tearDown()
    +        shutil.rmtree(self.INSTANCES_PATH)
    +
    +    def test_prealloc_image(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: True)
    +
    +        # Call twice to verify testing fallocate is only called once.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(),
    +            ['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
    +
    +    def test_prealloc_image_without_write_access(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(image, 'check_image_exists', lambda: True)
    +        self.stubs.Set(image, '_can_fallocate', lambda: True)
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: False)
    +
    +        # Testing fallocate is only called when user has write access.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(), [])
    +
    +
    +class RawTestCase(_ImageTestCase, test.NoDBTestCase):
    +
    +    SIZE = 1024
    +
    +    def setUp(self):
    +        self.image_class = imagebackend.Raw
    +        super(RawTestCase, self).setUp()
    +        self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
    +
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
    +    def prepare_mocks(self):
    +        fn = self.mox.CreateMockAnything()
    +        self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    +                                 '__call__')
    +        self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
    +        self.mox.StubOutWithMock(imagebackend.disk, 'extend')
    +        return fn
    +
         def test_cache(self):
             self.mox.StubOutWithMock(os.path, 'exists')
             if self.OLD_STYLE_INSTANCE_PATH:
    @@ -130,66 +207,6 @@ def test_cache_template_exists(self):
     
             self.mox.VerifyAll()
     
    -    def test_prealloc_image(self):
    -        CONF.set_override('preallocate_images', 'space')
    -
    -        fake_processutils.fake_execute_clear_log()
    -        fake_processutils.stub_out_processutils_execute(self.stubs)
    -        image = self.image_class(self.INSTANCE, self.NAME)
    -
    -        def fake_fetch(target, *args, **kwargs):
    -            return
    -
    -        self.stubs.Set(os.path, 'exists', lambda _: True)
    -        self.stubs.Set(os, 'access', lambda p, w: True)
    -
    -        # Call twice to verify testing fallocate is only called once.
    -        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    -        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    -
    -        self.assertEqual(fake_processutils.fake_execute_get_log(),
    -            ['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
    -             'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
    -             'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
    -
    -    def test_prealloc_image_without_write_access(self):
    -        CONF.set_override('preallocate_images', 'space')
    -
    -        fake_processutils.fake_execute_clear_log()
    -        fake_processutils.stub_out_processutils_execute(self.stubs)
    -        image = self.image_class(self.INSTANCE, self.NAME)
    -
    -        def fake_fetch(target, *args, **kwargs):
    -            return
    -
    -        self.stubs.Set(image, 'check_image_exists', lambda: True)
    -        self.stubs.Set(image, '_can_fallocate', lambda: True)
    -        self.stubs.Set(os.path, 'exists', lambda _: True)
    -        self.stubs.Set(os, 'access', lambda p, w: False)
    -
    -        # Testing fallocate is only called when user has write access.
    -        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    -
    -        self.assertEqual(fake_processutils.fake_execute_get_log(), [])
    -
    -
    -class RawTestCase(_ImageTestCase, test.NoDBTestCase):
    -
    -    SIZE = 1024
    -
    -    def setUp(self):
    -        self.image_class = imagebackend.Raw
    -        super(RawTestCase, self).setUp()
    -        self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
    -
    -    def prepare_mocks(self):
    -        fn = self.mox.CreateMockAnything()
    -        self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    -                                 '__call__')
    -        self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
    -        self.mox.StubOutWithMock(imagebackend.disk, 'extend')
    -        return fn
    -
         def test_create_image(self):
             fn = self.prepare_mocks()
             fn(target=self.TEMPLATE_PATH, max_size=None, image_id=None)
    @@ -224,23 +241,33 @@ def test_create_image_extend(self):
             self.mox.VerifyAll()
     
         def test_correct_format(self):
    -        info = self.mox.CreateMockAnything()
             self.stubs.UnsetAll()
     
             self.mox.StubOutWithMock(os.path, 'exists')
             self.mox.StubOutWithMock(imagebackend.images, 'qemu_img_info')
     
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
             os.path.exists(self.PATH).AndReturn(True)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
             info = self.mox.CreateMockAnything()
             info.file_format = 'foo'
             imagebackend.images.qemu_img_info(self.PATH).AndReturn(info)
    +        os.path.exists(CONF.instances_path).AndReturn(True)
             self.mox.ReplayAll()
     
             image = self.image_class(self.INSTANCE, self.NAME, path=self.PATH)
             self.assertEqual(image.driver_format, 'foo')
     
             self.mox.VerifyAll()
     
    +    def test_resolve_driver_format(self):
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        driver_format = image.resolve_driver_format()
    +        self.assertEqual(driver_format, 'raw')
    +
     
     class Qcow2TestCase(_ImageTestCase, test.NoDBTestCase):
         SIZE = 1024 * 1024 * 1024
    @@ -251,6 +278,10 @@ def setUp(self):
             self.QCOW2_BASE = (self.TEMPLATE_PATH +
                                '_%d' % (self.SIZE / (1024 * 1024 * 1024)))
     
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
         def prepare_mocks(self):
             fn = self.mox.CreateMockAnything()
             self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    @@ -261,6 +292,80 @@ def prepare_mocks(self):
             self.mox.StubOutWithMock(imagebackend.disk, 'extend')
             return fn
     
    +    def test_cache(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(CONF.instances_path).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_image_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        image.cache(None, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_base_dir_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_template_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
         def test_create_image(self):
             fn = self.prepare_mocks()
             fn(max_size=None, target=self.TEMPLATE_PATH)
    @@ -279,6 +384,8 @@ def test_create_image_with_size(self):
             self.mox.StubOutWithMock(os.path, 'exists')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
             os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(False)
    @@ -298,6 +405,8 @@ def test_create_image_too_small(self):
             self.mox.StubOutWithMock(imagebackend.disk, 'get_disk_size')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
             os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
             imagebackend.disk.get_disk_size(self.TEMPLATE_PATH
                                            ).AndReturn(self.SIZE)
    @@ -316,6 +425,8 @@ def test_generate_resized_backing_files(self):
                                      'get_disk_backing_file')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(CONF.instances_path).AndReturn(True)
             os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(True)
     
    @@ -342,6 +453,9 @@ def test_qcow2_exists_and_has_no_backing_file(self):
                                      'get_disk_backing_file')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +
             os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(True)
     
    @@ -355,6 +469,53 @@ def test_qcow2_exists_and_has_no_backing_file(self):
     
             self.mox.VerifyAll()
     
    +    def test_resolve_driver_format(self):
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        driver_format = image.resolve_driver_format()
    +        self.assertEqual(driver_format, 'qcow2')
    +
    +    def test_prealloc_image(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: True)
    +
    +        # Call twice to verify testing fallocate is only called once.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(),
    +            ['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
    +
    +    def test_prealloc_image_without_write_access(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(image, 'check_image_exists', lambda: True)
    +        self.stubs.Set(image, '_can_fallocate', lambda: True)
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: False)
    +
    +        # Testing fallocate is only called when user has write access.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(), [])
    +
     
     class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
         VG = 'FakeVG'
    @@ -431,6 +592,58 @@ def _create_image_resize(self, sparse):
     
             self.mox.VerifyAll()
     
    +    def test_cache(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
    +
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
    +        imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_image_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        image.cache(None, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_base_dir_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
         def test_create_image(self):
             self._create_image(False)
     
    @@ -596,6 +809,21 @@ def test_cache_template_exists(self):
     
             self.mox.VerifyAll()
     
    +    def test_cache_base_dir_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
         def test_create_image(self):
             fn = self.prepare_mocks()
             fn(max_size=None, rbd=self.rbd, target=self.TEMPLATE_PATH)
    @@ -642,6 +870,13 @@ class BackendTestCase(test.NoDBTestCase):
                     'uuid': uuidutils.generate_uuid()}
         NAME = 'fake-name.suffix'
     
    +    def setUp(self):
    +        super(BackendTestCase, self).setUp()
    +
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
         def get_image(self, use_cow, image_type):
             return imagebackend.Backend(use_cow).image(self.INSTANCE,
                                                        self.NAME,
    
  • nova/tests/virt/libvirt/test_libvirt.py+3 0 modified
    @@ -390,6 +390,9 @@ def fake_extend(image, size, use_cow=False):
     
             self.stubs.Set(libvirt_driver.disk, 'extend', fake_extend)
     
    +        self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
    +                       imagebackend.Image._get_driver_format)
    +
             class FakeConn():
                 def getCapabilities(self):
                     """Ensure standard capabilities being returned."""
    
  • nova/tests/virt/test_virt_drivers.py+7 0 modified
    @@ -30,6 +30,7 @@
     from nova.tests.virt.libvirt import fake_libvirt_utils
     from nova.virt import event as virtevent
     from nova.virt import fake
    +from nova.virt.libvirt import imagebackend
     
     LOG = logging.getLogger(__name__)
     
    @@ -201,6 +202,12 @@ def setUp(self):
                                                         fake.FakeVirtAPI())
             self.ctxt = test_utils.get_test_admin_context()
             self.image_service = fake_image.FakeImageService()
    +        # NOTE(dripton): resolve_driver_format does some file reading and
    +        # writing and chowning that complicate testing too much by requiring
    +        # using real directories with proper permissions.  Just stub it out
    +        # here; we test it in test_imagebackend.py
    +        self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
    +                       imagebackend.Image._get_driver_format)
     
         def _get_running_instance(self):
             instance_ref = test_utils.get_test_instance()
    
  • nova/utils.py+14 0 modified
    @@ -924,6 +924,20 @@ def temporary_chown(path, owner_uid=None):
                 execute('chown', orig_uid, path, run_as_root=True)
     
     
    +def chown(path, owner_uid=None):
    +    """chown a path.
    +
    +    :param owner_uid: UID of owner (defaults to current user)
    +    """
    +    if owner_uid is None:
    +        owner_uid = os.getuid()
    +
    +    orig_uid = os.stat(path).st_uid
    +
    +    if orig_uid != owner_uid:
    +        execute('chown', owner_uid, path, run_as_root=True)
    +
    +
     @contextlib.contextmanager
     def tempdir(**kwargs):
         argdict = kwargs.copy()
    
  • nova/virt/libvirt/imagebackend.py+74 2 modified
    @@ -88,6 +88,11 @@ def __init__(self, source_type, driver_format, is_block_dev=False):
             self.is_block_dev = is_block_dev
             self.preallocate = False
     
    +        # NOTE(dripton): We store lines of json (path, disk_format) in this
    +        # file, for some image types, to prevent attacks based on changing the
    +        # disk_format.
    +        self.disk_info_path = None
    +
             # NOTE(mikal): We need a lock directory which is shared along with
             # instance files, to cover the scenario where multiple compute nodes
             # are trying to create a base file at the same time
    @@ -232,6 +237,65 @@ def snapshot_extract(self, target, out_format):
         def snapshot_delete(self):
             raise NotImplementedError()
     
    +    def _get_driver_format(self):
    +        return self.driver_format
    +
    +    def resolve_driver_format(self):
    +        """Return the driver format for self.path.
    +
    +        First checks self.disk_info_path for an entry.
    +        If it's not there, calls self._get_driver_format(), and then
    +        stores the result in self.disk_info_path
    +
    +        See https://bugs.launchpad.net/nova/+bug/1221190
    +        """
    +        def _dict_from_line(line):
    +            if not line:
    +                return {}
    +            try:
    +                return jsonutils.loads(line)
    +            except (TypeError, ValueError) as e:
    +                msg = (_("Could not load line %(line)s, got error "
    +                        "%(error)s") %
    +                        {'line': line, 'error': unicode(e)})
    +                raise exception.InvalidDiskInfo(reason=msg)
    +
    +        @utils.synchronized(self.disk_info_path, external=False,
    +                            lock_path=self.lock_path)
    +        def write_to_disk_info_file():
    +            # Use os.open to create it without group or world write permission.
    +            fd = os.open(self.disk_info_path, os.O_RDWR | os.O_CREAT, 0o644)
    +            with os.fdopen(fd, "r+") as disk_info_file:
    +                line = disk_info_file.read().rstrip()
    +                dct = _dict_from_line(line)
    +                if self.path in dct:
    +                    msg = _("Attempted overwrite of an existing value.")
    +                    raise exception.InvalidDiskInfo(reason=msg)
    +                dct.update({self.path: driver_format})
    +                disk_info_file.seek(0)
    +                disk_info_file.truncate()
    +                disk_info_file.write('%s\n' % jsonutils.dumps(dct))
    +            # Ensure the file is always owned by the nova user so qemu can't
    +            # write it.
    +            utils.chown(self.disk_info_path, owner_uid=os.getuid())
    +
    +        try:
    +            if (self.disk_info_path is not None and
    +                        os.path.exists(self.disk_info_path)):
    +                with open(self.disk_info_path) as disk_info_file:
    +                    line = disk_info_file.read().rstrip()
    +                    dct = _dict_from_line(line)
    +                    for path, driver_format in dct.iteritems():
    +                        if path == self.path:
    +                            return driver_format
    +            driver_format = self._get_driver_format()
    +            if self.disk_info_path is not None:
    +                fileutils.ensure_tree(os.path.dirname(self.disk_info_path))
    +                write_to_disk_info_file()
    +        except OSError as e:
    +            raise exception.DiskInfoReadWriteFail(reason=unicode(e))
    +        return driver_format
    +
     
     class Raw(Image):
         def __init__(self, instance=None, disk_name=None, path=None,
    @@ -243,12 +307,17 @@ def __init__(self, instance=None, disk_name=None, path=None,
                                       disk_name))
             self.snapshot_name = snapshot_name
             self.preallocate = CONF.preallocate_images != 'none'
    +        self.disk_info_path = os.path.join(os.path.dirname(self.path),
    +                                           'disk.info')
             self.correct_format()
     
    +    def _get_driver_format(self):
    +        data = images.qemu_img_info(self.path)
    +        return data.file_format or 'raw'
    +
         def correct_format(self):
             if os.path.exists(self.path):
    -            data = images.qemu_img_info(self.path)
    -            self.driver_format = data.file_format or 'raw'
    +            self.driver_format = self.resolve_driver_format()
     
         def create_image(self, prepare_template, base, size, *args, **kwargs):
             @utils.synchronized(base, external=True, lock_path=self.lock_path)
    @@ -291,6 +360,9 @@ def __init__(self, instance=None, disk_name=None, path=None,
                                       disk_name))
             self.snapshot_name = snapshot_name
             self.preallocate = CONF.preallocate_images != 'none'
    +        self.disk_info_path = os.path.join(os.path.dirname(self.path),
    +                                           'disk.info')
    +        self.resolve_driver_format()
     
         def create_image(self, prepare_template, base, size, *args, **kwargs):
             @utils.synchronized(base, external=True, lock_path=self.lock_path)
    
dc8de4260669

Persist image format to a file, to prevent attacks based on changing it

https://github.com/openstack/novaDavid RiptonJan 28, 2014via ghsa
6 files changed · +397 64
  • nova/exception.py+8 0 modified
    @@ -463,6 +463,14 @@ class InvalidDiskFormat(Invalid):
         msg_fmt = _("Disk format %(disk_format)s is not acceptable")
     
     
    +class InvalidDiskInfo(Invalid):
    +    msg_fmt = _("Disk info file is invalid: %(reason)s")
    +
    +
    +class DiskInfoReadWriteFail(Invalid):
    +    msg_fmt = _("Failed to read or write disk info file: %(reason)s")
    +
    +
     class ImageUnacceptable(Invalid):
         msg_fmt = _("Image %(image_id)s is unacceptable: %(reason)s")
     
    
  • nova/tests/virt/libvirt/test_imagebackend.py+291 62 modified
    @@ -14,6 +14,8 @@
     #    under the License.
     
     import os
    +import shutil
    +import tempfile
     
     import fixtures
     from oslo.config import cfg
    @@ -26,13 +28,13 @@
     from nova import test
     from nova.tests import fake_processutils
     from nova.tests.virt.libvirt import fake_libvirt_utils
    +from nova import utils
     from nova.virt.libvirt import imagebackend
     
     CONF = cfg.CONF
     
     
     class _ImageTestCase(object):
    -    INSTANCES_PATH = '/instances_path'
     
         def mock_create_image(self, image):
             def create_image(fn, base, size, *args, **kwargs):
    @@ -41,10 +43,13 @@ def create_image(fn, base, size, *args, **kwargs):
     
         def setUp(self):
             super(_ImageTestCase, self).setUp()
    +        self.INSTANCES_PATH = tempfile.mkdtemp(suffix='instances')
             self.flags(disable_process_locking=True,
                        instances_path=self.INSTANCES_PATH)
             self.INSTANCE = {'name': 'instance',
                              'uuid': uuidutils.generate_uuid()}
    +        self.DISK_INFO_PATH = os.path.join(self.INSTANCES_PATH,
    +                                           self.INSTANCE['uuid'], 'disk.info')
             self.NAME = 'fake.vm'
             self.TEMPLATE = 'template'
     
    @@ -62,6 +67,78 @@ def setUp(self):
                 'nova.virt.libvirt.imagebackend.libvirt_utils',
                 fake_libvirt_utils))
     
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
    +    def tearDown(self):
    +        super(_ImageTestCase, self).tearDown()
    +        shutil.rmtree(self.INSTANCES_PATH)
    +
    +    def test_prealloc_image(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: True)
    +
    +        # Call twice to verify testing fallocate is only called once.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(),
    +            ['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
    +
    +    def test_prealloc_image_without_write_access(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(image, 'check_image_exists', lambda: True)
    +        self.stubs.Set(image, '_can_fallocate', lambda: True)
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: False)
    +
    +        # Testing fallocate is only called when user has write access.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(), [])
    +
    +
    +class RawTestCase(_ImageTestCase, test.NoDBTestCase):
    +
    +    SIZE = 1024
    +
    +    def setUp(self):
    +        self.image_class = imagebackend.Raw
    +        super(RawTestCase, self).setUp()
    +        self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
    +
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
    +    def prepare_mocks(self):
    +        fn = self.mox.CreateMockAnything()
    +        self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    +                                 '__call__')
    +        self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
    +        self.mox.StubOutWithMock(imagebackend.disk, 'extend')
    +        return fn
    +
         def test_cache(self):
             self.mox.StubOutWithMock(os.path, 'exists')
             if self.OLD_STYLE_INSTANCE_PATH:
    @@ -127,66 +204,6 @@ def test_cache_template_exists(self):
     
             self.mox.VerifyAll()
     
    -    def test_prealloc_image(self):
    -        CONF.set_override('preallocate_images', 'space')
    -
    -        fake_processutils.fake_execute_clear_log()
    -        fake_processutils.stub_out_processutils_execute(self.stubs)
    -        image = self.image_class(self.INSTANCE, self.NAME)
    -
    -        def fake_fetch(target, *args, **kwargs):
    -            return
    -
    -        self.stubs.Set(os.path, 'exists', lambda _: True)
    -        self.stubs.Set(os, 'access', lambda p, w: True)
    -
    -        # Call twice to verify testing fallocate is only called once.
    -        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    -        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    -
    -        self.assertEqual(fake_processutils.fake_execute_get_log(),
    -            ['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
    -             'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
    -             'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
    -
    -    def test_prealloc_image_without_write_access(self):
    -        CONF.set_override('preallocate_images', 'space')
    -
    -        fake_processutils.fake_execute_clear_log()
    -        fake_processutils.stub_out_processutils_execute(self.stubs)
    -        image = self.image_class(self.INSTANCE, self.NAME)
    -
    -        def fake_fetch(target, *args, **kwargs):
    -            return
    -
    -        self.stubs.Set(image, 'check_image_exists', lambda: True)
    -        self.stubs.Set(image, '_can_fallocate', lambda: True)
    -        self.stubs.Set(os.path, 'exists', lambda _: True)
    -        self.stubs.Set(os, 'access', lambda p, w: False)
    -
    -        # Testing fallocate is only called when user has write access.
    -        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    -
    -        self.assertEqual(fake_processutils.fake_execute_get_log(), [])
    -
    -
    -class RawTestCase(_ImageTestCase, test.NoDBTestCase):
    -
    -    SIZE = 1024
    -
    -    def setUp(self):
    -        self.image_class = imagebackend.Raw
    -        super(RawTestCase, self).setUp()
    -        self.stubs.Set(imagebackend.Raw, 'correct_format', lambda _: None)
    -
    -    def prepare_mocks(self):
    -        fn = self.mox.CreateMockAnything()
    -        self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    -                                 '__call__')
    -        self.mox.StubOutWithMock(imagebackend.libvirt_utils, 'copy_image')
    -        self.mox.StubOutWithMock(imagebackend.disk, 'extend')
    -        return fn
    -
         def test_create_image(self):
             fn = self.prepare_mocks()
             fn(target=self.TEMPLATE_PATH, max_size=None, image_id=None)
    @@ -221,23 +238,33 @@ def test_create_image_extend(self):
             self.mox.VerifyAll()
     
         def test_correct_format(self):
    -        info = self.mox.CreateMockAnything()
             self.stubs.UnsetAll()
     
             self.mox.StubOutWithMock(os.path, 'exists')
             self.mox.StubOutWithMock(imagebackend.images, 'qemu_img_info')
     
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
             os.path.exists(self.PATH).AndReturn(True)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
             info = self.mox.CreateMockAnything()
             info.file_format = 'foo'
             imagebackend.images.qemu_img_info(self.PATH).AndReturn(info)
    +        os.path.exists(CONF.instances_path).AndReturn(True)
             self.mox.ReplayAll()
     
             image = self.image_class(self.INSTANCE, self.NAME, path=self.PATH)
             self.assertEqual(image.driver_format, 'foo')
     
             self.mox.VerifyAll()
     
    +    def test_resolve_driver_format(self):
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        driver_format = image.resolve_driver_format()
    +        self.assertEqual(driver_format, 'raw')
    +
     
     class Qcow2TestCase(_ImageTestCase, test.NoDBTestCase):
         SIZE = units.Gi
    @@ -248,6 +275,10 @@ def setUp(self):
             self.QCOW2_BASE = (self.TEMPLATE_PATH +
                                '_%d' % (self.SIZE / units.Gi))
     
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
         def prepare_mocks(self):
             fn = self.mox.CreateMockAnything()
             self.mox.StubOutWithMock(imagebackend.utils.synchronized,
    @@ -258,6 +289,77 @@ def prepare_mocks(self):
             self.mox.StubOutWithMock(imagebackend.disk, 'extend')
             return fn
     
    +    def test_cache(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(CONF.instances_path).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_image_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        image.cache(None, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_base_dir_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_template_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
         def test_create_image(self):
             fn = self.prepare_mocks()
             fn(max_size=None, target=self.TEMPLATE_PATH)
    @@ -276,6 +378,8 @@ def test_create_image_with_size(self):
             self.mox.StubOutWithMock(os.path, 'exists')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
             os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(False)
    @@ -295,6 +399,8 @@ def test_create_image_too_small(self):
             self.mox.StubOutWithMock(imagebackend.disk, 'get_disk_size')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
             os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
             imagebackend.disk.get_disk_size(self.TEMPLATE_PATH
                                            ).AndReturn(self.SIZE)
    @@ -313,6 +419,8 @@ def test_generate_resized_backing_files(self):
                                      'get_disk_backing_file')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(CONF.instances_path).AndReturn(True)
             os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(True)
     
    @@ -339,6 +447,9 @@ def test_qcow2_exists_and_has_no_backing_file(self):
                                      'get_disk_backing_file')
             if self.OLD_STYLE_INSTANCE_PATH:
                 os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.DISK_INFO_PATH).AndReturn(False)
    +        os.path.exists(self.INSTANCES_PATH).AndReturn(True)
    +
             os.path.exists(self.TEMPLATE_PATH).AndReturn(False)
             os.path.exists(self.PATH).AndReturn(True)
     
    @@ -352,6 +463,53 @@ def test_qcow2_exists_and_has_no_backing_file(self):
     
             self.mox.VerifyAll()
     
    +    def test_resolve_driver_format(self):
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        driver_format = image.resolve_driver_format()
    +        self.assertEqual(driver_format, 'qcow2')
    +
    +    def test_prealloc_image(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: True)
    +
    +        # Call twice to verify testing fallocate is only called once.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(),
    +            ['fallocate -n -l 1 %s.fallocate_test' % self.PATH,
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH),
    +             'fallocate -n -l %s %s' % (self.SIZE, self.PATH)])
    +
    +    def test_prealloc_image_without_write_access(self):
    +        CONF.set_override('preallocate_images', 'space')
    +
    +        fake_processutils.fake_execute_clear_log()
    +        fake_processutils.stub_out_processutils_execute(self.stubs)
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +
    +        def fake_fetch(target, *args, **kwargs):
    +            return
    +
    +        self.stubs.Set(image, 'check_image_exists', lambda: True)
    +        self.stubs.Set(image, '_can_fallocate', lambda: True)
    +        self.stubs.Set(os.path, 'exists', lambda _: True)
    +        self.stubs.Set(os, 'access', lambda p, w: False)
    +
    +        # Testing fallocate is only called when user has write access.
    +        image.cache(fake_fetch, self.TEMPLATE_PATH, self.SIZE)
    +
    +        self.assertEqual(fake_processutils.fake_execute_get_log(), [])
    +
     
     class LvmTestCase(_ImageTestCase, test.NoDBTestCase):
         VG = 'FakeVG'
    @@ -428,6 +586,56 @@ def _create_image_resize(self, sparse):
     
             self.mox.VerifyAll()
     
    +    def test_cache(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(False)
    +        os.path.exists(self.PATH).AndReturn(False)
    +
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
    +        imagebackend.fileutils.ensure_tree(self.TEMPLATE_DIR)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_image_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(True)
    +        os.path.exists(self.TEMPLATE_PATH).AndReturn(True)
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        image.cache(None, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
    +    def test_cache_base_dir_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        if self.OLD_STYLE_INSTANCE_PATH:
    +            os.path.exists(self.OLD_STYLE_INSTANCE_PATH).AndReturn(False)
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        os.path.exists(self.PATH).AndReturn(False)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
         def test_create_image(self):
             self._create_image(False)
     
    @@ -595,6 +803,20 @@ def test_cache_template_exists(self):
     
             self.mox.VerifyAll()
     
    +    def test_cache_base_dir_exists(self):
    +        self.mox.StubOutWithMock(os.path, 'exists')
    +        os.path.exists(self.TEMPLATE_DIR).AndReturn(True)
    +        fn = self.mox.CreateMockAnything()
    +        fn(target=self.TEMPLATE_PATH)
    +        self.mox.StubOutWithMock(imagebackend.fileutils, 'ensure_tree')
    +        self.mox.ReplayAll()
    +
    +        image = self.image_class(self.INSTANCE, self.NAME)
    +        self.mock_create_image(image)
    +        image.cache(fn, self.TEMPLATE)
    +
    +        self.mox.VerifyAll()
    +
         def test_create_image(self):
             fn = self.prepare_mocks()
             fn(max_size=None, rbd=self.rbd, target=self.TEMPLATE_PATH)
    @@ -663,6 +885,13 @@ class BackendTestCase(test.NoDBTestCase):
                     'uuid': uuidutils.generate_uuid()}
         NAME = 'fake-name.suffix'
     
    +    def setUp(self):
    +        super(BackendTestCase, self).setUp()
    +
    +        def fake_chown(path, owner_uid=None):
    +            return None
    +        self.stubs.Set(utils, 'chown', fake_chown)
    +
         def get_image(self, use_cow, image_type):
             return imagebackend.Backend(use_cow).image(self.INSTANCE,
                                                        self.NAME,
    
  • nova/tests/virt/libvirt/test_libvirt.py+3 0 modified
    @@ -409,6 +409,9 @@ def fake_extend(image, size, use_cow=False):
     
             self.stubs.Set(libvirt_driver.disk, 'extend', fake_extend)
     
    +        self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
    +                       imagebackend.Image._get_driver_format)
    +
             class FakeConn():
                 def baselineCPU(self, cpu, flag):
                     """Add new libvirt API."""
    
  • nova/tests/virt/test_virt_drivers.py+7 0 modified
    @@ -32,6 +32,7 @@
     from nova.tests.virt.libvirt import fake_libvirt_utils
     from nova.virt import event as virtevent
     from nova.virt import fake
    +from nova.virt.libvirt import imagebackend
     
     LOG = logging.getLogger(__name__)
     
    @@ -204,6 +205,12 @@ def setUp(self):
                                                         fake.FakeVirtAPI())
             self.ctxt = test_utils.get_test_admin_context()
             self.image_service = fake_image.FakeImageService()
    +        # NOTE(dripton): resolve_driver_format does some file reading and
    +        # writing and chowning that complicate testing too much by requiring
    +        # using real directories with proper permissions.  Just stub it out
    +        # here; we test it in test_imagebackend.py
    +        self.stubs.Set(imagebackend.Image, 'resolve_driver_format',
    +                       imagebackend.Image._get_driver_format)
     
         def _get_running_instance(self, obj=False):
             instance_ref = test_utils.get_test_instance(obj=obj)
    
  • nova/utils.py+14 0 modified
    @@ -761,6 +761,20 @@ def temporary_chown(path, owner_uid=None):
                 execute('chown', orig_uid, path, run_as_root=True)
     
     
    +def chown(path, owner_uid=None):
    +    """chown a path.
    +
    +    :param owner_uid: UID of owner (defaults to current user)
    +    """
    +    if owner_uid is None:
    +        owner_uid = os.getuid()
    +
    +    orig_uid = os.stat(path).st_uid
    +
    +    if orig_uid != owner_uid:
    +        execute('chown', owner_uid, path, run_as_root=True)
    +
    +
     @contextlib.contextmanager
     def tempdir(**kwargs):
         argdict = kwargs.copy()
    
  • nova/virt/libvirt/imagebackend.py+74 2 modified
    @@ -104,6 +104,11 @@ def __init__(self, source_type, driver_format, is_block_dev=False):
             self.is_block_dev = is_block_dev
             self.preallocate = False
     
    +        # NOTE(dripton): We store lines of json (path, disk_format) in this
    +        # file, for some image types, to prevent attacks based on changing the
    +        # disk_format.
    +        self.disk_info_path = None
    +
             # NOTE(mikal): We need a lock directory which is shared along with
             # instance files, to cover the scenario where multiple compute nodes
             # are trying to create a base file at the same time
    @@ -240,6 +245,65 @@ def verify_base_size(base, size, base_size=0):
         def snapshot_extract(self, target, out_format):
             raise NotImplementedError()
     
    +    def _get_driver_format(self):
    +        return self.driver_format
    +
    +    def resolve_driver_format(self):
    +        """Return the driver format for self.path.
    +
    +        First checks self.disk_info_path for an entry.
    +        If it's not there, calls self._get_driver_format(), and then
    +        stores the result in self.disk_info_path
    +
    +        See https://bugs.launchpad.net/nova/+bug/1221190
    +        """
    +        def _dict_from_line(line):
    +            if not line:
    +                return {}
    +            try:
    +                return jsonutils.loads(line)
    +            except (TypeError, ValueError) as e:
    +                msg = (_("Could not load line %(line)s, got error "
    +                        "%(error)s") %
    +                        {'line': line, 'error': unicode(e)})
    +                raise exception.InvalidDiskInfo(reason=msg)
    +
    +        @utils.synchronized(self.disk_info_path, external=False,
    +                            lock_path=self.lock_path)
    +        def write_to_disk_info_file():
    +            # Use os.open to create it without group or world write permission.
    +            fd = os.open(self.disk_info_path, os.O_RDWR | os.O_CREAT, 0o644)
    +            with os.fdopen(fd, "r+") as disk_info_file:
    +                line = disk_info_file.read().rstrip()
    +                dct = _dict_from_line(line)
    +                if self.path in dct:
    +                    msg = _("Attempted overwrite of an existing value.")
    +                    raise exception.InvalidDiskInfo(reason=msg)
    +                dct.update({self.path: driver_format})
    +                disk_info_file.seek(0)
    +                disk_info_file.truncate()
    +                disk_info_file.write('%s\n' % jsonutils.dumps(dct))
    +            # Ensure the file is always owned by the nova user so qemu can't
    +            # write it.
    +            utils.chown(self.disk_info_path, owner_uid=os.getuid())
    +
    +        try:
    +            if (self.disk_info_path is not None and
    +                        os.path.exists(self.disk_info_path)):
    +                with open(self.disk_info_path) as disk_info_file:
    +                    line = disk_info_file.read().rstrip()
    +                    dct = _dict_from_line(line)
    +                    for path, driver_format in dct.iteritems():
    +                        if path == self.path:
    +                            return driver_format
    +            driver_format = self._get_driver_format()
    +            if self.disk_info_path is not None:
    +                fileutils.ensure_tree(os.path.dirname(self.disk_info_path))
    +                write_to_disk_info_file()
    +        except OSError as e:
    +            raise exception.DiskInfoReadWriteFail(reason=unicode(e))
    +        return driver_format
    +
     
     class Raw(Image):
         def __init__(self, instance=None, disk_name=None, path=None):
    @@ -249,12 +313,17 @@ def __init__(self, instance=None, disk_name=None, path=None):
                          os.path.join(libvirt_utils.get_instance_path(instance),
                                       disk_name))
             self.preallocate = CONF.preallocate_images != 'none'
    +        self.disk_info_path = os.path.join(os.path.dirname(self.path),
    +                                           'disk.info')
             self.correct_format()
     
    +    def _get_driver_format(self):
    +        data = images.qemu_img_info(self.path)
    +        return data.file_format or 'raw'
    +
         def correct_format(self):
             if os.path.exists(self.path):
    -            data = images.qemu_img_info(self.path)
    -            self.driver_format = data.file_format or 'raw'
    +            self.driver_format = self.resolve_driver_format()
     
         def create_image(self, prepare_template, base, size, *args, **kwargs):
             filename = os.path.split(base)[-1]
    @@ -293,6 +362,9 @@ def __init__(self, instance=None, disk_name=None, path=None):
                          os.path.join(libvirt_utils.get_instance_path(instance),
                                       disk_name))
             self.preallocate = CONF.preallocate_images != 'none'
    +        self.disk_info_path = os.path.join(os.path.dirname(self.path),
    +                                           'disk.info')
    +        self.resolve_driver_format()
     
         def create_image(self, prepare_template, base, size, *args, **kwargs):
             filename = os.path.split(base)[-1]
    

Vulnerability mechanics

Generated by null/stub on May 9, 2026. Inputs: CWE entries + fix-commit diffs from this CVE's patches. Citations validated against bundle.

References

9

News mentions

0

No linked articles in our index yet.