Skip to content

Commit

Permalink
Add migration for devices
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexxIT committed Mar 29, 2024
1 parent 0d58370 commit 4c05771
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 71 deletions.
1 change: 1 addition & 0 deletions custom_components/xiaomi_gateway3/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ async def async_unload_entry(hass: HomeAssistant, config_entry: ConfigEntry) ->

async def async_migrate_entry(hass: HomeAssistant, config_entry: ConfigEntry):
if config_entry.version == 1:
hass_utils.migrate_legacy_devices_unique_id(hass)
hass_utils.migrate_legacy_entitites_unique_id(hass)
hass_utils.migrate_devices_store()
return True
Expand Down
80 changes: 48 additions & 32 deletions custom_components/xiaomi_gateway3/hass/hass_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -207,18 +207,45 @@ def remove_stats_entities(hass: HomeAssistant, config_entry: ConfigEntry):
registry.async_remove(entity_id)


def migrate_legacy_devices_unique_id(hass: HomeAssistant):
registry = device_registry.async_get(hass)
for device in registry.devices.values():
try:
if not any(
i[1] != migrate_uid(i[1]) for i in device.identifiers if i[0] == DOMAIN
):
continue

new_identifiers = {
(DOMAIN, migrate_uid(i[1])) if i[0] == DOMAIN else i
for i in device.identifiers
}
_LOGGER.info(f"Migrate device {device.identifiers} to {new_identifiers}")
registry.async_update_device(device.id, new_identifiers=new_identifiers)
except Exception as e:
_LOGGER.warning(f"Migration error for {device}: {repr(e)}")


def migrate_legacy_entitites_unique_id(hass: HomeAssistant):
registry = entity_registry.async_get(hass)
for entity in list(registry.entities.values()):
if entity.platform != DOMAIN:
for entry in list(registry.entities.values()):
if entry.platform != DOMAIN:
continue

try:
if new_uid := check_entity_unique_id(entity):
_LOGGER.info(f"Migrate {entity.entity_id} to unique_id: {new_uid}")
registry.async_update_entity(entity.entity_id, new_unique_id=new_uid)
# split mac and attr in unique id
uid, attr = entry.unique_id.split("_", 1)

new_uid = migrate_uid(uid)
new_attr = migrate_attr(attr, entry.original_device_class)
if uid == new_uid and attr == new_attr:
continue

new_unique_id = f"{new_uid}_{new_attr}"
_LOGGER.info(f"Migrate entity '{entry.unique_id}' to '{new_unique_id}'")
registry.async_update_entity(entry.entity_id, new_unique_id=new_unique_id)
except Exception as e:
_LOGGER.warning(f"Migration error for {entity}: {repr(e)}")
_LOGGER.warning(f"Migration error for {entry}: {repr(e)}")


def migrate_devices_store():
Expand All @@ -237,46 +264,35 @@ def migrate_devices_store():
v["last_report_ts"] = v.pop("last_decode_ts")


def check_entity_unique_id(registry_entry: RegistryEntry) -> str | None:
has_update = False

# split mac and attr in unique id
uid, attr = registry_entry.unique_id.split("_", 1)

def migrate_uid(uid: str) -> str:
if uid.startswith("0x"):
# ZIGBEE format should be "0x" + 16 hex lowercase
if len(uid) < 18:
uid = f"0x{uid[2:]:>016s}"
has_update = True
return f"0x{uid[2:]:>016s}"
elif len(uid) == 12:
# GATEWAY, BLE, MESH format should be 12 hex lowercase
if uid.isupper():
uid = uid.lower()
has_update = True
elif not uid.startswith("group"):
return uid.lower()
elif len(uid) == 16:
# GROUP format should be "group" + big int
if registry_entry.original_icon == "mdi:lightbulb-group":
did = int.from_bytes(bytes.fromhex(uid), "big")
uid = f"group{did}"
has_update = True
did = int.from_bytes(bytes.fromhex(uid), "big")
return f"group{did}"
return uid


def migrate_attr(attr: str, device_class: str) -> str:
if attr == "switch":
# attr for "plug" and "outlet" should be not "switch"
if registry_entry.original_device_class in ("plug", "outlet"):
attr = registry_entry.original_device_class
has_update = True
if device_class in ("plug", "outlet"):
return device_class
elif attr.startswith("channel ") or attr.endswith(" density"):
# spaces in attr was by mistake
attr = attr.replace(" ", "_")
has_update = True
return attr.replace(" ", "_")
elif attr == "pressure_state":
attr = "pressure"
has_update = True
return "pressure"
elif attr == "occupancy_distance":
attr = "distance"
has_update = True

return f"{uid}_{attr}" if has_update else None
return "distance"
return attr


def remove_device(hass: HomeAssistant, device: XDevice):
Expand Down
75 changes: 36 additions & 39 deletions tests/test_migrate.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,44 @@
from homeassistant.helpers.entity_registry import RegistryEntry
from custom_components.xiaomi_gateway3.core.const import DOMAIN
from custom_components.xiaomi_gateway3.hass.hass_utils import migrate_uid, migrate_attr

from custom_components.xiaomi_gateway3.hass.hass_utils import check_entity_unique_id

def test_migrate_uid():
def new_identifiers(identifiers: set) -> set | None:
assert any(i[1] != migrate_uid(i[1]) for i in identifiers if i[0] == DOMAIN)
return {
(DOMAIN, migrate_uid(i[1])) if i[0] == DOMAIN else i for i in identifiers
}

def test_migrate_entity_unique_id():
eid = "sensor.dummy"
p = new_identifiers({(DOMAIN, "0x158d000fffffff")})
assert p == {("xiaomi_gateway3", "0x00158d000fffffff")}

entry = RegistryEntry(
entity_id=eid,
unique_id="0x158d000fffffff_gas density",
platform="",
)
assert check_entity_unique_id(entry) == "0x00158d000fffffff_gas_density"
p = new_identifiers({(DOMAIN, "14ae46fffffff000")})
assert p == {("xiaomi_gateway3", "group1490206592031780864")}

entry = RegistryEntry(
entity_id=eid,
unique_id="0x158d000fffffff_smoke density",
platform="",
p = new_identifiers(
{(DOMAIN, "50EC50FFFFFF"), (DOMAIN, "50ec50ffffff"), ("dummy", "id")}
)
assert check_entity_unique_id(entry) == "0x00158d000fffffff_smoke_density"
assert p == {("dummy", "id"), ("xiaomi_gateway3", "50ec50ffffff")}

entry = RegistryEntry(
entity_id=eid,
unique_id="50EC50FFFFFF_light",
platform="",
)
assert check_entity_unique_id(entry) == "50ec50ffffff_light"

entry = RegistryEntry(
entity_id=eid,
unique_id="0x158d000fffffff_switch",
platform="",
original_device_class="plug",
original_icon="mdi:power-plug",
)
assert check_entity_unique_id(entry) == "0x00158d000fffffff_plug"

entry = RegistryEntry(
entity_id=eid,
unique_id="13e81ad1f34ab000_light",
platform="",
original_device_class=None,
original_icon="mdi:lightbulb-group",
)

assert check_entity_unique_id(entry) == "group1434425970349748224_light"
def test_migrate_entity_unique_id():
def new_unique_id(unique_id: str, original_device_class: str = None):
uid, attr = unique_id.split("_", 1)
new_uid = migrate_uid(uid)
new_attr = migrate_attr(attr, original_device_class)
return f"{new_uid}_{new_attr}"

p = new_unique_id("0x158d000fffffff_gas density")
assert p == "0x00158d000fffffff_gas_density"

p = new_unique_id("0x158d000fffffff_smoke density")
assert p == "0x00158d000fffffff_smoke_density"

p = new_unique_id("50EC50FFFFFF_light")
assert p == "50ec50ffffff_light"

p = new_unique_id("0x158d000fffffff_switch", "plug")
assert p == "0x00158d000fffffff_plug"

p = new_unique_id("13e81ad1f34ab000_light")
assert p == "group1434425970349748224_light"

0 comments on commit 4c05771

Please sign in to comment.