Skip to content

Commit

Permalink
tests: Add btrfs multidisk volume tests
Browse files Browse the repository at this point in the history
  • Loading branch information
tbzatek committed Jan 27, 2021
1 parent c5e455f commit 76a58b6
Showing 1 changed file with 199 additions and 0 deletions.
199 changes: 199 additions & 0 deletions src/tests/dbus-tests/test_btrfs.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import re
import tempfile
import unittest
import time

from bytesize import bytesize
from collections import namedtuple
Expand Down Expand Up @@ -376,3 +377,201 @@ def test_subvolume_mount(self):
dbus_mounts.assertLen(1) # just one mountpoint
dbus_mnt = self.ay_to_str(dbus_mounts.value[0]) # mountpoints are arrays of bytes
self.assertEqual(dbus_mnt, mnt_path)

def test_multivolume(self):
devs = self._get_devices(4)
for i_dev in devs:
self.addCleanup(self.wipe_fs, i_dev.path)

manager = self.get_object('/Manager')
manager.CreateVolume([devs[0].obj_path],
'test_multivolume', 'single', 'single',
self.no_options,
dbus_interface=self.iface_prefix + '.Manager.BTRFS')

fstype = self.get_property(devs[0].obj, '.Block', 'IdType')
fstype.assertEqual('btrfs')
dbus_devs = self.get_property(devs[0].obj, '.Filesystem.BTRFS', 'num_devices')
dbus_devs.assertEqual(1)

# mount it as a single device
mnt_path = devs[0].obj.Mount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.addCleanup(self.try_unmount, mnt_path)
self.assertIsNotNone(mnt_path)
self.assertTrue(os.path.ismount(mnt_path))
self.assertTrue('test_multivolume' in mnt_path)

# save ID_FS_UUID for later
fs_uuid = self.get_property_raw(devs[0].obj, '.Block', 'IdUUID')
self.assertIsNotNone(fs_uuid)
self.assertGreater(len(fs_uuid), 0)

# verify that additional devices carry no signature
for i_dev in devs[1:]:
i_fstype = self.get_property(i_dev.obj, '.Block', 'IdType')
i_fstype.assertEqual('')
i_fs_uuid = self.get_property(i_dev.obj, '.Block', 'IdUUID')
i_fs_uuid.assertEqual('')
msg = 'No such interface .org.freedesktop.UDisks2.Filesystem.BTRFS'
with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
# no btrfs module interface should be attached
self.get_property_raw(i_dev.obj, '.Filesystem.BTRFS', 'num_devices')

# convert to a multidisk volume
for i_dev in devs[1:]:
devs[0].obj.AddDevice(i_dev.obj_path, self.no_options,
dbus_interface=self.iface_prefix + '.Filesystem.BTRFS')

# verify that properties on all disks are updated
for i_dev in devs:
dbus_devs = self.get_property(i_dev.obj, '.Filesystem.BTRFS', 'num_devices')
dbus_devs.assertEqual(4)
dbus_mounts = self.get_property(i_dev.obj, '.Filesystem', 'MountPoints')
dbus_mounts.assertLen(1) # just one mountpoint
dbus_mnt = self.ay_to_str(dbus_mounts.value[0]) # mountpoints are arrays of bytes
self.assertEqual(dbus_mnt, mnt_path)
i_fstype = self.get_property(i_dev.obj, '.Block', 'IdType')
i_fstype.assertEqual('btrfs')
i_fs_uuid = self.get_property_raw(i_dev.obj, '.Block', 'IdUUID')
self.assertEqual(i_fs_uuid, fs_uuid)
dbus_label = self.get_property(i_dev.obj, '.Filesystem.BTRFS', 'label')
dbus_label.assertEqual('test_multivolume')

# unmount it
devs[0].obj.Unmount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.assertFalse(os.path.ismount(mnt_path))
self.assertFalse(os.path.exists(mnt_path))

# change the label
devs[0].obj.SetLabel('btr12345678fs', self.no_options,
dbus_interface=self.iface_prefix + '.Filesystem.BTRFS')

# verify that mountpoint disappeared from all objects
for i_dev in devs:
dbus_mounts = self.get_property(i_dev.obj, '.Filesystem', 'MountPoints')
dbus_mounts.assertLen(0)

# mount again on a different block device
mnt_path2 = devs[3].obj.Mount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.addCleanup(self.try_unmount, mnt_path2)
self.assertIsNotNone(mnt_path2)
self.assertTrue(os.path.ismount(mnt_path2))
self.assertTrue('btr12345678fs' in mnt_path2)

# verify that properties on all disks are updated
for i_dev in devs:
dbus_mounts = self.get_property(i_dev.obj, '.Filesystem', 'MountPoints')
dbus_mounts.assertLen(1)
dbus_mnt = self.ay_to_str(dbus_mounts.value[0])
self.assertEqual(dbus_mnt, mnt_path2)
dbus_devs = self.get_property(i_dev.obj, '.Filesystem.BTRFS', 'num_devices')
dbus_devs.assertEqual(4)
dbus_label = self.get_property(i_dev.obj, '.Filesystem.BTRFS', 'label')
dbus_label.assertEqual('btr12345678fs')

# remove the devices
for i_dev in devs[:3]:
devs[3].obj.RemoveDevice(i_dev.obj_path, self.no_options,
dbus_interface=self.iface_prefix + '.Filesystem.BTRFS')

# check number of devices
dbus_devs = self.get_property(devs[3].obj, '.Filesystem.BTRFS', 'num_devices')
dbus_devs.assertEqual(1)

# verify that additional devices carry no signature after removal
for i_dev in devs[:3]:
msg = 'No such interface .org.freedesktop.UDisks2.Filesystem'
with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
self.get_property_raw(i_dev.obj, '.Filesystem', 'MountPoints')
msg = 'No such interface .org.freedesktop.UDisks2.Filesystem.BTRFS'
with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
self.get_property_raw(i_dev.obj, '.Filesystem.BTRFS', 'num_devices')
i_fstype = self.get_property(i_dev.obj, '.Block', 'IdType')
i_fstype.assertEqual('')
i_fs_uuid = self.get_property(i_dev.obj, '.Block', 'IdUUID')
i_fs_uuid.assertEqual('')

# unmount and check that the mountpoint is gone
devs[3].obj.Unmount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.assertFalse(os.path.ismount(mnt_path2))
self.assertFalse(os.path.exists(mnt_path2))

def test_multivolume_missing(self):
devs = self._get_devices(4)
for i_dev in devs:
self.addCleanup(self.wipe_fs, i_dev.path)

manager = self.get_object('/Manager')
manager.CreateVolume([devs[0].obj_path],
'test_multivolume_missing', 'single', 'single',
self.no_options,
dbus_interface=self.iface_prefix + '.Manager.BTRFS')

mnt_path = devs[0].obj.Mount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.addCleanup(self.try_unmount, mnt_path)
self.assertIsNotNone(mnt_path)
self.assertTrue(os.path.ismount(mnt_path))

for i_dev in devs[1:]:
devs[0].obj.AddDevice(i_dev.obj_path, self.no_options,
dbus_interface=self.iface_prefix + '.Filesystem.BTRFS')

# unmount it
devs[0].obj.Unmount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.assertFalse(os.path.ismount(mnt_path))
self.assertFalse(os.path.exists(mnt_path))

# wipe the second block device
self.wipe_fs(devs[1].path)

# try to mount it again with a missing component
msg = 'Error mounting /dev/sd. at /.*: wrong fs type, bad option, bad superblock'
with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
mnt_path = devs[0].obj.Mount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')

def test_multivolume_external(self):
devs = self._get_devices(4)
for i_dev in devs:
self.addCleanup(self.wipe_fs, i_dev.path)

manager = self.get_object('/Manager')
manager.CreateVolume([devs[0].obj_path],
'test_multivolume_external', 'single', 'single',
self.no_options,
dbus_interface=self.iface_prefix + '.Manager.BTRFS')

mnt_path = devs[0].obj.Mount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.addCleanup(self.try_unmount, mnt_path)
self.assertIsNotNone(mnt_path)
self.assertTrue(os.path.ismount(mnt_path))

for i_dev in devs[1:]:
devs[0].obj.AddDevice(i_dev.obj_path, self.no_options,
dbus_interface=self.iface_prefix + '.Filesystem.BTRFS')

# unmount it
devs[0].obj.Unmount(self.no_options, dbus_interface=self.iface_prefix + '.Filesystem')
self.assertFalse(os.path.ismount(mnt_path))
self.assertFalse(os.path.exists(mnt_path))

# verify that mountpoint disappeared from all objects
for i_dev in devs:
dbus_mounts = self.get_property(i_dev.obj, '.Filesystem', 'MountPoints')
dbus_mounts.assertLen(0)

# mount it outside of UDisks
tmp = tempfile.mkdtemp()
self.addCleanup(shutil.rmtree, tmp)
ret, _out = self.run_command('mount %s %s' % (devs[0].path, tmp))
self.addCleanup(self.try_unmount, tmp)
self.assertEqual(ret, 0)

# give UDisks some time to register the event
time.sleep(2)

# verify that properties on all disks are updated
for i_dev in devs:
dbus_mounts = self.get_property(i_dev.obj, '.Filesystem', 'MountPoints')
dbus_mounts.assertLen(1)
dbus_mnt = self.ay_to_str(dbus_mounts.value[0])
self.assertEqual(dbus_mnt, tmp)

0 comments on commit 76a58b6

Please sign in to comment.