diff --git a/CHANGELOG.md b/CHANGELOG.md index 20ec7a66b..1dc248da1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ - Remove biome (as prettier was reinstated) - Replace node-based markdownlint with pythonic library +- Attempt to ditch untyped Munch for the existing TypedDicts by leveraging pydantic to type xmltodict XML conversion ## v1.11.2 - Add/update model-data for Jip, Tom and Floor via PR [#842](https://github.com/plugwise/python-plugwise/pull/842) diff --git a/plugwise/__init__.py b/plugwise/__init__.py index 9c4cd84dd..98d24462a 100644 --- a/plugwise/__init__.py +++ b/plugwise/__init__.py @@ -41,6 +41,8 @@ from munch import Munch from packaging.version import Version, parse +from .model import GatewayData, PlugwiseData + class Smile(SmileComm): """The main Plugwise Smile API class.""" @@ -74,18 +76,8 @@ def __init__( self._smile_api: SmileAPI | SmileLegacyAPI self._stretch_v2 = False self._target_smile: str = NONE - self.smile: Munch = Munch() - self.smile.anna_p1 = False - self.smile.hostname = NONE - self.smile.hw_version = None - self.smile.legacy = False - self.smile.mac_address = None - self.smile.model = NONE - self.smile.model_id = None - self.smile.name = NONE - self.smile.type = NONE - self.smile.version = Version("0.0.0") - self.smile.zigbee_mac_address = None + self.data: PlugwiseData + self.smile = GatewayData(hostname="smile") @property def cooling_present(self) -> bool: @@ -117,23 +109,19 @@ def reboot(self) -> bool: async def connect(self) -> Version: """Connect to the Plugwise Gateway and determine its name, type, version, and other data.""" - result = await self._request(DOMAIN_OBJECTS) - # Work-around for Stretch fw 2.7.18 - if not (vendor_names := result.findall("./module/vendor_name")): - result = await self._request(MODULES) - vendor_names = result.findall("./module/vendor_name") - - names: list[str] = [] - for name in vendor_names: - names.append(name.text) - - vendor_models = result.findall("./module/vendor_model") - models: list[str] = [] - for model in vendor_models: - models.append(model.text) + await self._request(DOMAIN_OBJECTS, new=True) - dsmrmain = result.find("./module/protocols/dsmrmain") - if "Plugwise" not in names and dsmrmain is None: # pragma: no cover + # Work-around for Stretch fw 2.7.18 + dsmrmain: bool = False + vendor_names: list = [] + vendor_models: list = [] + for module in self.data.module: + vendor_names.append(module.vendor_name) + vendor_models.append(module.vendor_model) + if "dsmrmain" in module.protocols: + dsmrmain = True + + if "Plugwise" not in vendor_names and dsmrmain is None: # pragma: no cover LOGGER.error( "Connected but expected text not returned, we got %s. Please create" " an issue on http://github.com/plugwise/python-plugwise", @@ -142,14 +130,14 @@ async def connect(self) -> Version: raise ResponseError # Check if Anna is connected to an Adam - if "159.2" in models: + if "159.2" in vendor_models: LOGGER.error( "Your Anna is connected to an Adam, make sure to only add the Adam as integration." ) raise InvalidSetupError # Determine smile specifics - await self._smile_detect(result, dsmrmain) + await self._smile_detect() self._smile_api = ( SmileAPI( @@ -162,6 +150,7 @@ async def connect(self) -> Version: self._request, self._schedule_old_states, self.smile, + self.data, ) if not self.smile.legacy else SmileLegacyAPI( @@ -173,45 +162,57 @@ async def connect(self) -> Version: self._stretch_v2, self._target_smile, self.smile, + self.data, ) ) # Update all endpoints on first connect await self._smile_api.full_xml_update() - return cast(Version, self.smile.version) + return self.smile.firmware_version - async def _smile_detect( - self, result: etree.Element, dsmrmain: etree.Element - ) -> None: + async def _smile_detect(self) -> None: """Helper-function for connect(). Detect which type of Plugwise Gateway is being connected. """ + print(f"HOI14 {self}") + print(f"HOI14 {self.smile}") model: str = "Unknown" - if (gateway := result.find("./gateway")) is not None: - self.smile.version = parse(gateway.find("firmware_version").text) - self.smile.hw_version = gateway.find("hardware_version").text - self.smile.hostname = gateway.find("hostname").text - self.smile.mac_address = gateway.find("mac_address").text - if (vendor_model := gateway.find("vendor_model")) is None: + if self.data.gateway is not None: + if self.data.gateway.vendor_model is None: return # pragma: no cover - model = vendor_model.text - elec_measurement = gateway.find( - "gateway_environment/electricity_consumption_tariff_structure" - ) + self.smile.firmware_version = self.data.gateway.firmware_version + self.smile.hardware_version = self.data.gateway.hardware_version + self.smile.hostname = self.data.gateway.hostname + self.smile.mac_address = self.data.gateway.mac_address + + print(f"HOI11a {self.data.gateway}") + print(f"HOI11b {self.data.gateway.gateway_environment}") + if ( + "electricity_consumption_tariff_structure" + in self.data.gateway.gateway_environment + ): + print( + f"HOI11c {self.data.gateway.gateway_environment.electricity_consumption_tariff_structure}" + ) if ( - elec_measurement is not None - and elec_measurement.text - and model == "smile_thermo" + "electricity_consumption_tariff_structure" + in self.data.gateway.gateway_environment + and self.data.gateway.gateway_environment.electricity_consumption_tariff_structure + and self.smile.vendor_model == "smile_thermo" ): self.smile.anna_p1 = True else: - model = await self._smile_detect_legacy(result, dsmrmain, model) + # TODO + self.smile.vendor_model = await self._smile_detect_legacy( + result, dsmrmain, model + ) - if model == "Unknown" or self.smile.version == Version( - "0.0.0" + if ( + self.data.gateway.vendor_model == "Unknown" + or self.smile.firmware_version == Version("0.0.0") ): # pragma: no cover # Corner case check LOGGER.error( @@ -220,8 +221,8 @@ async def _smile_detect( ) raise UnsupportedDeviceError - version_major = str(self.smile.version.major) - self._target_smile = f"{model}_v{version_major}" + version_major = Version(self.smile.firmware_version).major + self._target_smile = f"{self.data.gateway.vendor_model}_v{version_major}" LOGGER.debug("Plugwise identified as %s", self._target_smile) if self._target_smile not in SMILES: LOGGER.error( @@ -242,7 +243,7 @@ async def _smile_detect( raise UnsupportedDeviceError # pragma: no cover self.smile.model = "Gateway" - self.smile.model_id = model + self.smile.model_id = self.data.gateway.vendor_model self.smile.name = SMILES[self._target_smile].smile_name self.smile.type = SMILES[self._target_smile].smile_type if self.smile.name == "Smile Anna" and self.smile.anna_p1: @@ -251,9 +252,9 @@ async def _smile_detect( if self.smile.type == "stretch": self._stretch_v2 = int(version_major) == 2 - self._process_for_thermostat(result) + self._process_for_thermostat() - def _process_for_thermostat(self, result: etree.Element) -> None: + def _process_for_thermostat(self) -> None: """Extra processing for thermostats.""" if self.smile.type != "thermostat": return @@ -262,18 +263,22 @@ def _process_for_thermostat(self, result: etree.Element) -> None: # For Adam, Anna, determine the system capabilities: # Find the connected heating/cooling device (heater_central), # e.g. heat-pump or gas-fired heater - onoff_boiler = result.find("./module/protocols/onoff_boiler") - open_therm_boiler = result.find("./module/protocols/open_therm_boiler") - self._on_off_device = onoff_boiler is not None - self._opentherm_device = open_therm_boiler is not None + self._on_off_device: bool = ( + True + if "protocols" in self.data.module + and "on_off_boiler" in self.data.module.protocols + else False + ) + self._opentherm_device: bool = ( + True + if "protocols" in self.data.module + and "open_therm_boiler" in self.data.module.protocols + else False + ) # Determine the presence of special features - locator_1 = "./gateway/features/cooling" - locator_2 = "./gateway/features/elga_support" - if result.find(locator_1) is not None: - self._cooling_present = True - if result.find(locator_2) is not None: - self._elga = True + self._cooling_present = "cooling" in self.data.gateway.features + self._elga = "elga_support" in self.data.gateway.features async def _smile_detect_legacy( self, result: etree.Element, dsmrmain: etree.Element, model: str diff --git a/plugwise/common.py b/plugwise/common.py index 0c36be74f..e6d768064 100644 --- a/plugwise/common.py +++ b/plugwise/common.py @@ -16,23 +16,21 @@ SWITCH_GROUP_TYPES, ApplianceType, GwEntityData, - ModuleData, -) -from plugwise.util import ( - check_heater_central, - check_model, - get_vendor_name, - return_valid, + # ModuleData, ) +from plugwise.util import check_heater_central, check_model, return_valid from defusedxml import ElementTree as etree from munch import Munch +from .model import Module, ModuleData + -def get_zigbee_data( - module: etree.Element, module_data: ModuleData, legacy: bool -) -> None: +def get_zigbee_data(module: Module, module_data: ModuleData, legacy: bool) -> None: """Helper-function for _get_module_data().""" + if not module.protocols: + return + if legacy: # Stretches if (router := module.find("./protocols/network_router")) is not None: @@ -40,10 +38,19 @@ def get_zigbee_data( # Also look for the Circle+/Stealth M+ if (coord := module.find("./protocols/network_coordinator")) is not None: module_data["zigbee_mac_address"] = coord.find("mac_address").text + if legacy: + if module.protocols.network_router: + module_data.zigbee_mac_address = module.protocols.network_router.mac_address + if module.protocols.network_coordinator: + module_data.zigbee_mac_address = ( + module.protocols.network_coordinator.mac_address + ) + return # Adam - elif (zb_node := module.find("./protocols/zig_bee_node")) is not None: - module_data["zigbee_mac_address"] = zb_node.find("mac_address").text - module_data["reachable"] = zb_node.find("reachable").text == "true" + if module.protocols.zig_bee_node: + zb = module.protocols.zig_bee_node + module_data.zigbee_mac_address = zb.mac_address + module_data.reachable = zb.reachable class SmileCommon: @@ -103,9 +110,9 @@ def _appl_heater_central_info( # xml_3: self._modules for legacy, self._domain_objects for actual xml_3 = return_valid(xml_3, self._domain_objects) module_data = self._get_module_data(xml_1, locator_1, xml_2=xml_3) - if not module_data["contents"]: + if not module_data["content"]: module_data = self._get_module_data(xml_1, locator_2, xml_2=xml_3) - if not module_data["contents"]: + if not module_data["content"]: self._heater_id = NONE return ( Munch() @@ -120,13 +127,13 @@ def _appl_heater_central_info( return appl def _appl_thermostat_info( - self, appl: Munch, xml_1: etree.Element, xml_2: etree.Element = None + self, appl: Appliance, xml_1: etree.Element, xml_2: etree.Element = None ) -> Munch: """Helper-function for _appliance_info_finder().""" locator = "./logs/point_log[type='thermostat']/thermostat" xml_2 = return_valid(xml_2, self._domain_objects) module_data = self._get_module_data(xml_1, locator, xml_2=xml_2) - if not module_data["contents"]: + if not module_data["content"]: return Munch() # no module-data present means the device has been removed appl.vendor_name = module_data["vendor_name"] @@ -144,21 +151,21 @@ def _appl_thermostat_info( return appl - def _create_gw_entities(self, appl: Munch) -> None: + def _create_gw_entities(self, appl: Appliance) -> None: """Helper-function for creating/updating gw_entities.""" - self.gw_entities[appl.entity_id] = {"dev_class": appl.pwclass} + self.gw_entities[appl.id] = {"dev_class": appl.type} self._count += 1 for key, value in { "available": appl.available, - "firmware": appl.firmware, - "hardware": appl.hardware, + "firmware": appl.firmware_version, + "hardware": appl.hardware_version, "location": appl.location, - "mac_address": appl.mac, + "mac_address": appl.mac_address, "model": appl.model, "model_id": appl.model_id, "name": appl.name, "vendor": appl.vendor_name, - "zigbee_mac_address": appl.zigbee_mac, + "zigbee_mac_address": appl.zigbee_mac_address, }.items(): if value is not None or key == "location": appl_key = cast(ApplianceType, key) @@ -197,36 +204,26 @@ def _get_groups(self) -> None: if self.smile.type == "power" or self.check_name(ANNA): return - for group in self._domain_objects.findall("./group"): - group_id = group.get("id") - if group_id is None: - continue # pragma: no cover - - if not (members := self._collect_members(group)): + for group in self.data.group: + members: list[str] = [] + if not group.appliances: continue - group_name = group.find("name").text - group_type = group.find("type").text - if group_type in GROUP_TYPES: - self.gw_entities[group_id] = { - "dev_class": group_type, + for item in group.appliances.appliance: + # Check if members are not orphaned - stretch + if item.id in self.gw_entities: + members.append(item.id) + + if group.type in GROUP_TYPES and members and group.id: + self.gw_entities[group.id] = { + "dev_class": group.type, "model": "Group", - "name": group_name, + "name": group.name, "members": members, "vendor": "Plugwise", } self._count += 5 - def _collect_members(self, element: etree.Element) -> list[str]: - """Check and collect members.""" - members: list[str] = [] - group_appliances = element.findall("appliances/appliance") - for item in group_appliances: - if (member_id := item.get("id")) in self.gw_entities: - members.append(member_id) - - return members - def _get_lock_state( self, xml: etree.Element, data: GwEntityData, stretch_v2: bool = False ) -> None: @@ -247,31 +244,42 @@ def _get_lock_state( def _get_module_data( self, - xml_1: etree.Element, - locator: str, key: str | None = None, - xml_2: etree.Element | None = None, legacy: bool = False, ) -> ModuleData: """Helper-function for _energy_device_info_finder() and _appliance_info_finder(). Collect requested info from MODULES. """ - module_data: ModuleData = { - "contents": False, - "firmware_version": None, - "hardware_version": None, - "reachable": None, - "vendor_name": None, - "vendor_model": None, - "zigbee_mac_address": None, - } - - for appl_search in xml_1.findall(locator): - link_tag = appl_search.tag - if key is not None and key not in link_tag: + module_data = ModuleData() + if "services" not in self.data.appliance or not self.data.appliance.services: + return module_data + + for service_type, services in self.data.appliance.services.iter_services(): + if key and key not in service_type: continue + # NOW correctly nested + for service in services: + module = self.data.get_module(service.id) + if not module: + continue + + module_data = ModuleData( + content=True, + firmware_version=module.firmware_version, + hardware_version=module.hardware_version, + reachable=module.reachable, + vendor_name=module.vendor_name, + vendor_model=module.vendor_model, + zigbee_mac_address=module.zigbee_mac_address, + ) + get_zigbee_data(module, module_data, legacy) + + return module_data + + # TODO legacy + """ link_id = appl_search.get("id") loc = f".//services/{link_tag}[@id='{link_id}']...." # Not possible to walrus for some reason... @@ -279,7 +287,7 @@ def _get_module_data( search = return_valid(xml_2, self._domain_objects) module = search.find(loc) if module is not None: # pylint: disable=consider-using-assignment-expr - module_data["contents"] = True + module_data["content"] = True get_vendor_name(module, module_data) module_data["vendor_model"] = module.find("vendor_model").text module_data["hardware_version"] = module.find("hardware_version").text @@ -287,5 +295,4 @@ def _get_module_data( get_zigbee_data(module, module_data, legacy) break - - return module_data + """ diff --git a/plugwise/constants.py b/plugwise/constants.py index 267f2cbdb..575fd1e4e 100644 --- a/plugwise/constants.py +++ b/plugwise/constants.py @@ -419,6 +419,7 @@ ) +# TODO recreate or obsolete class ModuleData(TypedDict): """The Module data class.""" diff --git a/plugwise/data.py b/plugwise/data.py index eb244f0f1..0fb2cfddd 100644 --- a/plugwise/data.py +++ b/plugwise/data.py @@ -12,7 +12,6 @@ ANNA, MAX_SETPOINT, MIN_SETPOINT, - NONE, OFF, ActuatorData, GwEntityData, @@ -272,14 +271,14 @@ def _climate_data(self, location_id: str, entity: GwEntityData) -> None: entity["select_schedule"] = None self._count += 2 avail_schedules, sel_schedule = self._schedules(loc_id) - if avail_schedules != [NONE]: + if avail_schedules != [None]: entity["available_schedules"] = avail_schedules entity["select_schedule"] = sel_schedule # Set HA climate HVACMode: auto, heat, heat_cool, cool and off entity["climate_mode"] = "auto" self._count += 1 - if sel_schedule in (NONE, OFF): + if sel_schedule in (None, OFF): entity["climate_mode"] = "heat" if self._cooling_present: entity["climate_mode"] = ( @@ -289,7 +288,7 @@ def _climate_data(self, location_id: str, entity: GwEntityData) -> None: if self.check_reg_mode("off"): entity["climate_mode"] = "off" - if NONE not in avail_schedules: + if None not in avail_schedules: self._get_schedule_states_with_off( loc_id, avail_schedules, sel_schedule, entity ) @@ -320,7 +319,7 @@ def _get_schedule_states_with_off( ) -> None: """Collect schedules with states for each thermostat. - Also, replace NONE by OFF when none of the schedules are active. + Also, replace None by OFF when none of the schedules are active. """ all_off = True self._schedule_old_states[location] = {} diff --git a/plugwise/helper.py b/plugwise/helper.py index 075f5b17d..a96833d25 100644 --- a/plugwise/helper.py +++ b/plugwise/helper.py @@ -26,7 +26,6 @@ LOCATIONS, LOGGER, MODULE_LOCATOR, - NONE, OFF, P1_MEASUREMENTS, TEMP_CELSIUS, @@ -43,6 +42,7 @@ ThermoLoc, ToggleNameType, ) +from plugwise.model import Appliance, ApplianceType, OffsetFunctionality from plugwise.util import ( check_model, collect_power_values, @@ -57,17 +57,6 @@ from packaging import version -def extend_plug_device_class(appl: Munch, appliance: etree.Element) -> None: - """Extend device_class name of Plugs (Plugwise and Aqara) - Pw-Beta Issue #739.""" - - if ( - (search := appliance.find("description")) is not None - and (description := search.text) is not None - and ("ZigBee protocol" in description or "smart plug" in description) - ): - appl.pwclass = f"{appl.pwclass}_plug" - - def search_actuator_functionalities( appliance: etree.Element, actuator: str ) -> etree.Element | None: @@ -90,7 +79,7 @@ def __init__(self) -> None: self._is_thermostat: bool self._loc_data: dict[str, ThermoLoc] self._schedule_old_states: dict[str, dict[str, str]] - self._gateway_id: str = NONE + self._gateway_id: str = None self._zones: dict[str, GwEntityData] self.gw_entities: dict[str, GwEntityData] self.smile: Munch = Munch() @@ -114,49 +103,37 @@ def _get_appliances(self) -> None: self._count = 0 self._get_locations() - for appliance in self._domain_objects.findall("./appliance"): - appl = Munch() - appl.available = None - appl.entity_id = appliance.get("id") - appl.firmware = None - appl.hardware = None - appl.location = None - appl.mac = None - appl.model = None - appl.model_id = None - appl.module_id = None - appl.name = appliance.find("name").text - appl.pwclass = appliance.find("type").text - appl.zigbee_mac = None - appl.vendor_name = None - + for appliance in self.data.appliance: # Don't collect data for the OpenThermGateway appliance, skip thermostat(s) # without actuator_functionalities, should be an orphaned device(s) (Core #81712) - if appl.pwclass == "open_therm_gateway" or ( - appl.pwclass == "thermostat" - and appliance.find("actuator_functionalities/") is None + if appliance.type == ApplianceType.OPENTHERMGW or ( + appliance.type == ApplianceType.THERMOSTAT + and appliance.actuator_functionalities is None ): continue - if (appl_loc := appliance.find("location")) is not None: - appl.location = appl_loc.get("id") + if appliance.location is not None: + appliance.fixed_location = appliance.id # Set location to the _home_loc_id when the appliance-location is not found, # except for thermostat-devices without a location, they are not active - elif appl.pwclass not in THERMOSTAT_CLASSES: - appl.location = self._home_loc_id + elif appliance.type not in THERMOSTAT_CLASSES: + appliance.fixed_location = self._home_loc_id - # Don't show orphaned (no location) thermostat-types - if appl.pwclass in THERMOSTAT_CLASSES and appl.location is None: + # Don't show orphaned thermostat-types + if appliance.type in THERMOSTAT_CLASSES and appliance.location is None: continue - extend_plug_device_class(appl, appliance) + # Extend device_class name of Plugs (Plugwise and Aqara) - Pw-Beta Issue #739 + if appliance.description is not None and ( + "ZigBee protocol" in appliance.description + or "smart plug" in appliance.description + ): + appliance.type = f"{appliance.type}_plug" # Collect appliance info, skip orphaned/removed devices - if not (appl := self._appliance_info_finder(appl, appliance)): + if not self._appliance_info_finder(appliance): continue - self._create_gw_entities(appl) - # A smartmeter is not present as an appliance, add it specifically if self.smile.type == "power" or self.smile.anna_p1: self._get_p1_smartmeter_info() @@ -170,27 +147,26 @@ def _get_p1_smartmeter_info(self) -> None: Note: For P1, the entity_id for the gateway and smartmeter are switched to maintain backward compatibility. For Anna P1, the smartmeter uses the home location_id directly. """ - appl = Munch() locator = MODULE_LOCATOR tag = "electricity" module_data = self._get_module_data(self._home_location, locator, key=tag) # No module-data present means the device has been removed - if not module_data["contents"]: # pragma: no cover + if not module_data["content"]: # pragma: no cover return appl.available = None appl.entity_id = self._home_loc_id if not self.smile.anna_p1: appl.entity_id = self._gateway_id - appl.firmware = module_data["firmware_version"] - appl.hardware = module_data["hardware_version"] + appl.firmware = module_data.firmware_version + appl.hardware = module_data.hardware_version appl.location = self._home_loc_id appl.mac = None - appl.model = module_data["vendor_model"] + appl.model = module_data.vendor_model appl.model_id = None # don't use model_id for SmartMeter appl.name = "P1" appl.pwclass = "smartmeter" - appl.vendor_name = module_data["vendor_name"] + appl.vendor_name = module_data.vendor_name appl.zigbee_mac = None # Replace the entity_id of the gateway by the smartmeter location_id @@ -204,21 +180,24 @@ def _get_locations(self) -> None: """Collect all locations.""" counter = 0 loc = Munch() - locations = self._domain_objects.findall("./location") + print(f"HOI15 {self.data}") + print(f"HOI15 {self.data.location}") + locations = self.data.location if not locations: raise KeyError("No location data present!") for location in locations: - loc.loc_id = location.get("id") - loc.name = location.find("name").text - loc._type = location.find("type").text + loc.loc_id = location.id + loc.name = location.name + loc._type = location.type self._loc_data[loc.loc_id] = {"name": loc.name} # Home location is of type building if loc._type == "building": counter += 1 self._home_loc_id = loc.loc_id - self._home_location = self._domain_objects.find( - f"./location[@id='{loc.loc_id}']" + self._home_location = next( + (l for l in self.data.location if l.id == loc.loc_id), + None, ) if counter == 0: @@ -226,63 +205,57 @@ def _get_locations(self) -> None: "Error, location Home (building) not found!" ) # pragma: no cover - def _appliance_info_finder(self, appl: Munch, appliance: etree.Element) -> Munch: + def _appliance_info_finder(self, appliance: Appliance) -> Appliance | None: """Collect info for all appliances found.""" - match appl.pwclass: + print(f"HOI22 appliance type {appliance.type}!") + match appliance.type: case "gateway": # Collect gateway entity info - return self._appl_gateway_info(appl, appliance) + print("HOI22 gateway!") + return self._appl_gateway_info(appliance) case _ as dev_class if dev_class in THERMOSTAT_CLASSES: # Collect thermostat entity info - return self._appl_thermostat_info(appl, appliance) + return self._appl_thermostat_info(appliance) case "heater_central": # Collect heater_central entity info # 251016: the added guarding below also solves Core Issue #104433 if not ( - appl := self._appl_heater_central_info(appl, appliance, False) + appliance := self._appl_heater_central_info(appliance, False) ): # False means non-legacy entity return Munch() self._dhw_allowed_modes = self._get_appl_actuator_modes( appliance, "domestic_hot_water_mode_control_functionality" ) - return appl + return appliance case _ as s if s.endswith("_plug"): # Collect info from plug-types (Plug, Aqara Smart Plug) locator = MODULE_LOCATOR module_data = self._get_module_data(appliance, locator) # A plug without module-data is orphaned/ no present - if not module_data["contents"]: - return Munch() - - appl.available = module_data["reachable"] - appl.firmware = module_data["firmware_version"] - appl.hardware = module_data["hardware_version"] - appl.model_id = module_data["vendor_model"] - appl.vendor_name = module_data["vendor_name"] - appl.model = check_model(appl.model_id, appl.vendor_name) - appl.zigbee_mac = module_data["zigbee_mac_address"] - return appl + if not module_data.content: + return None + + print(f"HOI24 {module_data}") + appliance.available = module_data.reachable + appliance.firmware_version = module_data.firmware_version + appliance.hardware_version = module_data.hardware_version + appliance.model_id = module_data.vendor_model + appliance.vendor_name = module_data.vendor_name + appliance.model = check_model(appl.model_id, appl.vendor_name) + appliance.zigbee_mac_address = module_data.zigbee_mac_address + return appliance case _: # pragma: no cover - return Munch() + return None - def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch: + def _appl_gateway_info(self, appliance: Appliance) -> Appliance: """Helper-function for _appliance_info_finder().""" - self._gateway_id = appl.entity_id - locator = "./gateway/firmware_version" - appl.firmware = self._domain_objects.find(locator).text - appl.hardware = self.smile.hw_version - appl.mac = self.smile.mac_address - appl.model = self.smile.model - appl.model_id = self.smile.model_id - appl.name = self.smile.name - appl.vendor_name = "Plugwise" + print(f"HOI19 {appliance.id}") + self._gateway_id = appliance.id # Adam: collect the ZigBee MAC address of the Smile - if self.check_name(ADAM): - if ( - found := self._domain_objects.find(".//protocols/zig_bee_coordinator") - ) is not None: - appl.zigbee_mac = found.find("mac_address").text + if ADAM in appliance.name: + if (found := appliance.protocols.zig_bee_coordinator) is not None: + appliance.zigbee_mac = found.mac_address # Also, collect regulation_modes and check for cooling, indicating cooling-mode is present self._reg_allowed_modes = self._get_appl_actuator_modes( @@ -296,7 +269,7 @@ def _appl_gateway_info(self, appl: Munch, appliance: etree.Element) -> Munch: # Limit the possible gateway-modes self._gw_allowed_modes = ["away", "full", "vacation"] - return appl + return appliance def _get_appl_actuator_modes( self, appliance: etree.Element, actuator_type: str @@ -404,9 +377,8 @@ def _collect_appliance_data( measurements: dict[str, DATA | UOM], ) -> etree.Element | None: """Collect initial appliance data.""" - if ( - appliance := self._domain_objects.find(f'./appliance[@id="{entity_id}"]') - ) is not None: + if (appliance := self._domain_objects.get_appliance(entity_id)) is not None: + print(f"HOI9 {appliance}") self._appliance_measurements(appliance, data, measurements) self._get_lock_state(appliance, data) @@ -440,14 +412,24 @@ def _power_data_from_location(self) -> GwEntityData: def _appliance_measurements( self, - appliance: etree.Element, + appliance: Appliance, data: GwEntityData, measurements: dict[str, DATA | UOM], ) -> None: """Helper-function for _get_measurement_data() - collect appliance measurement data.""" for measurement, attrs in measurements.items(): - p_locator = f'.//logs/point_log[type="{measurement}"]/period/measurement' - if (appl_p_loc := appliance.find(p_locator)) is not None: + print(f"HOI10 {appliance}") + print(f"HOI10 {appliance.logs}") + if "point_log" not in appliance.logs: + continue + + print(f"HOI10 {appliance.logs.point_log}") + + if ( + measurement := next( + (m for m in appliance.logs if m.type == "measurement"), None + ) + ) is not None: if skip_obsolete_measurements(appliance, measurement): continue @@ -488,11 +470,11 @@ def _get_toggle_state( def _get_plugwise_notifications(self) -> None: """Collect the Plugwise notifications.""" self._notifications = {} - for notification in self._domain_objects.findall("./notification"): + for notification in self._domain_objects.notification: try: - msg_id = notification.get("id") - msg_type = notification.find("type").text - msg = notification.find("message").text + msg_id = notification.id + msg_type = notification.type + msg = notification.message self._notifications[msg_id] = {msg_type: msg} LOGGER.debug("Plugwise notifications: %s", self._notifications) except AttributeError: # pragma: no cover @@ -621,7 +603,7 @@ def _get_gateway_outdoor_temp(self, entity_id: str, data: GwEntityData) -> None: if self._is_thermostat and entity_id == self._gateway_id: locator = "./logs/point_log[type='outdoor_temperature']/period/measurement" if (found := self._home_location.find(locator)) is not None: - value = format_measure(found.text, NONE) + value = format_measure(found.text, None) data.update({"sensors": {"outdoor_temperature": value}}) self._count += 1 @@ -925,7 +907,7 @@ def _rule_ids_by_name(self, name: str, loc_id: str) -> dict[str, dict[str, str]] } else: schedule_ids[rule.get("id")] = { - "location": NONE, + "location": None, "name": name, "active": active, } @@ -952,7 +934,7 @@ def _rule_ids_by_tag(self, tag: str, loc_id: str) -> dict[str, dict[str, str]]: } else: schedule_ids[rule.get("id")] = { - "location": NONE, + "location": None, "name": name, "active": active, } @@ -965,9 +947,9 @@ def _schedules(self, location: str) -> tuple[list[str], str]: Obtain the available schedules/schedules. Adam: a schedule can be connected to more than one location. NEW: when a location_id is present then the schedule is active. Valid for both Adam and non-legacy Anna. """ - available: list[str] = [NONE] + available: list[str] = [None] rule_ids: dict[str, dict[str, str]] = {} - selected = NONE + selected = None tag = "zone_preset_based_on_time_and_presence_with_override" if not (rule_ids := self._rule_ids_by_tag(tag, location)): return available, selected @@ -987,9 +969,9 @@ def _schedules(self, location: str) -> tuple[list[str], str]: schedules.append(name) if schedules: - available.remove(NONE) + available.remove(None) available.append(OFF) - if selected == NONE: + if selected == None: selected = OFF return available, selected diff --git a/plugwise/model.py b/plugwise/model.py new file mode 100644 index 000000000..fe193893e --- /dev/null +++ b/plugwise/model.py @@ -0,0 +1,439 @@ +"""Plugwise models.""" + +from enum import Enum +from typing import Any + +from packaging.version import Version +from pydantic import BaseModel, ConfigDict, Field + + +class PWBase(BaseModel): + """Base / common Plugwise class.""" + + # Allow additional struct (ignored) + model_config = ConfigDict(extra="ignore") + + +class WithID(PWBase): + """Class for Plugwise ID base XML elements. + + Takes id from the xml definition. + """ + + id: str = Field(alias="@id") + model_config = ConfigDict(extra="allow") + + +# Period and measurements +class Measurement(PWBase): + """Plugwise Measurement.""" + + log_date: str = Field(alias="@log_date") + value: str = Field(alias="#text") + + +class Period(PWBase): + """Plugwise period of time.""" + + start_date: str = Field(alias="@start_date") + end_date: str = Field(alias="@end_date") + interval: str | None = Field(default=None, alias="@interval") + measurement: Measurement | None = None + + +# Notification +class Notification(WithID): + """Plugwise notification. + + Our examples only show single optional notification being present + """ + + type: str + origin: str | None = None + title: str | None = None + message: str | None = None + + created_date: str + modified_date: str | list[str] | None = None + deleted_date: str | None = None + + valid_from: str | list[str] | None = None + valid_to: str | list[str] | None = None + read_date: str | list[str] | None = None + + +# Logging +class BaseLog(WithID): + """Plugwise mapping for point_log and interval_log constructs.""" + + type: str + unit: str | None = None + updated_date: str | None = None + last_consecutive_log_date: str | None = None + interval: str | None = None + period: Period | None = None + + +class PointLog(BaseLog): + """Plugwise class ofr specific point_logs. + + i.e. + """ + + relay: WithID | None = None + thermo_meter: WithID | None = None + thermostat: WithID | None = None + battery_meter: WithID | None = None + temperature_offset: WithID | None = None + weather_descriptor: WithID | None = None + irradiance_meter: WithID | None = None + wind_vector: WithID | None = None + hygro_meter: WithID | None = None + + +class IntervalLog(BaseLog): + """Plugwise class ofr specific interval_logs.""" + + electricity_interval_meter: WithID | None = ( + None # references only, still to type if we need this + ) + + +# Functionality +class BaseFunctionality(WithID): + """Plugwise functionality.""" + + updated_date: str | None = None + + +class RelayFunctionality(BaseFunctionality): + """Relay functionality.""" + + lock: bool | None = None + state: str | None = None + relay: WithID | None = None + + +class ThermostatFunctionality(BaseFunctionality): + """Thermostat functionality.""" + + type: str + lower_bound: float + upper_bound: float + resolution: float + setpoint: float + thermostat: WithID | None = None + + +class OffsetFunctionality(BaseFunctionality): + """Offset functionality.""" + + type: str + offset: float + temperature_offset: WithID | None = None + + +# Services +class ServiceBase(WithID): + """Plugwise Services.""" + + log_type: str | None = Field(default=None, alias="@log_type") + endpoint: str | None = Field(default=None, alias="@endpoint") + functionalities: dict[str, WithID | list[WithID]] | None = ( + None # references only, still to type if we need this + ) + + +# Protocols +class Neighbor(PWBase): + """Neighbor definition.""" + + mac_address: str = Field(alias="@mac_address") + lqi: int | None = None + depth: int | None = None + relationship: str | None = None + + +class ZigBeeNode(WithID): + """ZigBee node definition.""" + + reachable: bool | None = None + mac_address: str + type: str + reachable: bool + power_source: str | None = None + battery_type: str | None = None + zig_bee_coordinator: WithID | None = None + neighbors: list[Neighbor] + last_neighbor_table_received: str | None = None + neighbor_table_support: bool | None = None + + +class NetworkRouter(BaseModel): + """Network router.""" + + mac_address: str | None = None + + +class NetworkCoordinator(BaseModel): + """Network coordinator.""" + + mac_address: str | None = None + + +class Protocols(BaseModel): + """Protocol definition.""" + + network_router: NetworkRouter | None = None + network_coordinator: NetworkCoordinator | None = None + zig_bee_node: ZigBeeNode | None = None + + +# Appliance +class ApplianceType(str, Enum): + """Define application types.""" + + GATEWAY = "gateway" + OPENTHERMGW = "open_therm_gateway" + THERMOSTAT = "thermostat" + CHP = "central_heating_pump" + CD = "computer_desktop" + HC = "heater_central" + HT = "hometheater" + STRETCH = "stretch" + THERMO_RV = "thermostatic_radiator_valve" + VA = "valve_actuator" + VA_plug = "valve_actuator_plug" + WHV = "water_heater_vessel" + ZONETHERMOMETER = "zone_thermometer" + ZONETHERMOSTAT = "zone_thermostat" + + # TODO we still need all the '{}_plug' things here eventually + + +class Appliance(WithID): + """Plugwise Appliance.""" + + available: bool = False + name: str + description: str | None = None + type: ApplianceType + created_date: str + modified_date: str | list[str] | None = None + deleted_date: str | None = None + + location: dict[str, Any] | None = None + groups: dict[str, WithID | list[WithID]] | None = None + logs: dict[str, BaseLog | list[BaseLog]] | None = None + actuator_functionalities: ( + dict[str, BaseFunctionality | list[BaseFunctionality]] | None + ) = None + + # Internal processing + fixed_location: str | None = None + + +# Module +class Module(WithID): + """Plugwise Module.""" + + vendor_name: str | None = None + vendor_model: str | None = None + hardware_version: str | None = None + firmware_version: str | None = None + created_date: str + modified_date: str | list[str] | None = None + deleted_date: str | None = None + + # This is too much :) shorted to Any, but we should still look at this + # services: dict[str, ServiceBase | list[ServiceBase]] | list[dict[str, Any]] | None = None + services: dict[str, Any] | list[Any] | None = None + + # protocols: dict[str, Any] | None = None # ZigBeeNode, WLAN, LAN + protocols: dict[str, Protocols] | list[Protocols] | None = None + + +# Gateway +class GatewayEnvironment(WithID): + """Minimal Gateway Environment.""" + + postal_code: str | None = None + electricity_consumption_tariff_structure: str | None = None + electricity_production_tariff_structure: str | None = None + + +class Gateway(Module): + """Plugwise Gateway.""" + + last_reset_date: str | list[str] | None = None + last_boot_date: str | list[str] | None = None + + project: dict[str, Any] | None = None + gateway_environment: GatewayEnvironment | None = None + features: dict[str, Any] | None = None + + +# Group +class ApplianceRef(WithID): + """Group appliance reference.""" + + pass + + +class AppliancesContainer(PWBase): + """Group container containing appliance IDs.""" + + appliance: list[ApplianceRef] | ApplianceRef + + +class GroupType(str, Enum): + """Define group types.""" + + PUMPING = "pumping" + SWITCHING = "switching" + + +class Group(WithID): + """Group of appliances.""" + + name: str + description: str | None = None + type: GroupType | None = None + + created_date: str + modified_date: str | list[str] | None = None + deleted_date: str | None = None + + logs: dict[str, BaseLog | list[BaseLog]] | list[BaseLog] | None + appliances: AppliancesContainer | None = None + actuator_functionalities: dict[str, BaseFunctionality] | None = None + + +# Location +class Location(WithID): + """Plugwise Location.""" + + name: str + description: str | None = None + type: str + created_date: str + modified_date: str | list[str] | None = None + deleted_date: str | None = None + preset: str | None = None + appliances: list[WithID] + logs: dict[str, BaseLog | list[BaseLog]] | list[BaseLog] | None + appliances: dict[str, WithID | list[WithID]] | None = None + actuator_functionalities: dict[str, BaseFunctionality] | None = None + + +# Root objects +class DomainObjects(PWBase): + """Plugwise Domain Objects.""" + + appliance: list[Appliance] = [] + gateway: Gateway | list[Gateway] | None = None + group: Group | list[Group] | None = None + module: list[Module] = [] + location: list[Location] = [] + notification: Notification | list[Notification] | None = None + rule: list[dict] = [] + template: list[dict] = [] + + # Runtime-only cache + _appliance_index: dict[str, Appliance] = {} + _location_index: dict[str, Location] = {} + _module_index: dict[str, Module] = {} + + def model_post_init(self, __context): + """Build index for referencing by ID. + + Runs after validation. + """ + self._appliance_index = {a.id: a for a in self.appliance} + self._location_index = {a.id: a for a in self.location} + self._module_index = {a.id: a for a in self.module} + + def get_appliance(self, id: str) -> Appliance | None: + """Get Appliance by ID.""" + return self._appliance_index.get(id) + + def get_location(self, id: str) -> Location | None: + """Get Location by ID.""" + return self._location_index.get(id) + + def get_module(self, id: str) -> Module | None: + """Get Module by ID.""" + return self._module_index.get(id) + + +class PlugwiseData(PWBase): + """Main XML definition.""" + + domain_objects: DomainObjects + + +# Mappings + + +class SwitchDeviceType(str, Enum): + """Define switch device types.""" + + TOGGLE = "toggle" + LOCK = "lock" + + +class SwitchFunctionType(str, Enum): + """Define switch function types.""" + + TOGGLE = "toggle_functionality" + LOCK = "lock" + NONE = None + + +class SwitchActuatorType(str, Enum): + """Define switch actuator types.""" + + DHWCM = "domestic_hot_water_comfort_mode" + CE = "cooling_enabled" + + +class Switch(BaseModel): + """Switch/relay definition.""" + + device: SwitchDeviceType = SwitchDeviceType.TOGGLE + func_type: SwitchFunctionType = SwitchFunctionType.TOGGLE + act_type: SwitchActuatorType = SwitchActuatorType.CE + func: SwitchFunctionType = SwitchFunctionType.NONE + + +class GatewayData(BaseModel): + """Base Smile/gateway/hub model.""" + + anna_p1: bool = False + hostname: str + firmware_version: str | None = None + hardware_version: str | None = None + legacy: bool = False + mac_address: str | None = None + model: str | None = None + model_id: str | None = None + name: str | None = None + type: ApplianceType | None = None + version: str = "0.0.0" + zigbee_mac_address: str | None = None + + def model_post_init(self, __context): + """Init arbitrary types.""" + self.version = Version(self.version) + + +class ModuleData(BaseModel): + """Module model.""" + + content: bool = False + firmware_version: str | None = None + hardware_version: str | None = None + reachable: bool | None = None + vendor_name: str | None = None + vendor_model: str | None = None + zigbee_mac_address: str | None = None diff --git a/plugwise/smile.py b/plugwise/smile.py index 6acb4233a..2a7b39f41 100644 --- a/plugwise/smile.py +++ b/plugwise/smile.py @@ -36,8 +36,10 @@ # Dict as class from munch import Munch +from .model import PlugwiseData, Switch -def model_to_switch_items(model: str, state: str, switch: Munch) -> tuple[str, Munch]: + +def model_to_switch_items(model: str, state: str, switch: Switch) -> tuple[str, Switch]: """Translate state and switch attributes based on model name. Helper function for set_switch_state(). @@ -74,6 +76,7 @@ def __init__( _request: Callable[..., Awaitable[Any]], _schedule_old_states: dict[str, dict[str, str]], smile: Munch, + data: PlugwiseData, ) -> None: """Set the constructor for this class.""" super().__init__() @@ -87,6 +90,9 @@ def __init__( self._schedule_old_states = _schedule_old_states self.smile = smile self.therms_with_offset_func: list[str] = [] + self.data = data + + print(f"HOI16 {self.data.location}") @property def cooling_present(self) -> bool: @@ -95,8 +101,11 @@ def cooling_present(self) -> bool: async def full_xml_update(self) -> None: """Perform a first fetch of the Plugwise server XML data.""" - self._domain_objects = await self._request(DOMAIN_OBJECTS) - self._get_plugwise_notifications() + await self._request(DOMAIN_OBJECTS, new=True) + print(f"HOI3a {self.data}") + if "notification" in self.data and self.data.notification is not None: + print(f"HOI3b {self.data.notification}") + self._get_plugwise_notifications() def get_all_gateway_entities(self) -> None: """Collect the Plugwise gateway entities and their data and states from the received raw XML-data. diff --git a/plugwise/smilecomm.py b/plugwise/smilecomm.py index 61617a7e4..8c9c840e6 100644 --- a/plugwise/smilecomm.py +++ b/plugwise/smilecomm.py @@ -5,6 +5,8 @@ from __future__ import annotations +import json # Debugging + from plugwise.constants import LOGGER from plugwise.exceptions import ( ConnectionFailedError, @@ -17,6 +19,9 @@ # This way of importing aiohttp is because of patch/mocking in testing (aiohttp timeouts) from aiohttp import BasicAuth, ClientError, ClientResponse, ClientSession, ClientTimeout from defusedxml import ElementTree as etree +import xmltodict + +from .model import Appliance, PlugwiseData class SmileComm: @@ -45,13 +50,31 @@ def __init__( self._auth = BasicAuth(username, password=password) self._endpoint = f"http://{host}:{str(port)}" # Sensitive + def _parse_xml(self, xml: str) -> dict: + """Map XML to Pydantic class.""" + element = etree.fromstring(xml) + xml_dict = xmltodict.parse(etree.tostring(element)) + print(f"HOI1 {xml_dict.keys()}") + print( + f"HOI2 {json.dumps(xmltodict.parse(xml, process_namespaces=True), indent=2)}" + ) + appliance_in = xml_dict["domain_objects"]["appliance"][0] + print(f"HOI4a1 {json.dumps(appliance_in, indent=2)}") + appliance_in = xml_dict["domain_objects"]["appliance"][5] + print(f"HOI4a1 {json.dumps(appliance_in, indent=2)}") + appliance = Appliance.model_validate(appliance_in) + print(f"HOI4a2 {appliance}") + + return PlugwiseData.model_validate(xml_dict) + async def _request( self, command: str, retry: int = 3, method: str = "get", data: str | None = None, - ) -> etree.Element: + new: bool = False, + ) -> etree.Element | str: """Get/put/delete data from a give URL.""" resp: ClientResponse url = f"{self._endpoint}{command}" @@ -105,11 +128,14 @@ async def _request( raise ConnectionFailedError return await self._request(command, retry - 1) - return await self._request_validate(resp, method) + return await self._request_validate(resp, method, new) async def _request_validate( - self, resp: ClientResponse, method: str - ) -> etree.Element: + self, + resp: ClientResponse, + method: str, + new: bool = False, + ) -> etree.Element | str: """Helper-function for _request(): validate the returned data.""" match resp.status: case 200: @@ -143,6 +169,11 @@ async def _request_validate( LOGGER.warning("Smile returns invalid XML for %s", self._endpoint) raise InvalidXMLError from exc + if new: + domain_objects = result + root = self._parse_xml(domain_objects) + self.data = root.domain_objects + return result return xml async def close_connection(self) -> None: diff --git a/tests/test_init.py b/tests/test_init.py index 1f8f3c803..a39d7b279 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -1034,7 +1034,9 @@ def validate_test_basics( if smile_version: log_msg = f" # Assert version matching '{smile_version}" parent_logger.info(log_msg) - assert api.smile.version == version.parse(smile_version) + assert version.parse(api.smile.firmware_version) == version.parse( + smile_version + ) log_msg = f" # Assert legacy {smile_legacy}" parent_logger.info(log_msg) if smile_legacy: