diff --git a/custom_components/robovac/vacuum.py b/custom_components/robovac/vacuum.py index 4d720aaf..4cf0be50 100644 --- a/custom_components/robovac/vacuum.py +++ b/custom_components/robovac/vacuum.py @@ -16,9 +16,11 @@ This module provides the vacuum entity integration for Eufy Robovac devices. """ + from __future__ import annotations import asyncio import base64 +import binascii from datetime import timedelta from enum import StrEnum import json @@ -40,6 +42,8 @@ CONF_MAC, CONF_MODEL, CONF_NAME, + CONF_PASSWORD, + CONF_USERNAME, ) from homeassistant.core import HomeAssistant from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC @@ -47,10 +51,17 @@ from homeassistant.helpers.entity_platform import AddEntitiesCallback from .const import CONF_VACS, DOMAIN, PING_RATE, REFRESH_RATE, TIMEOUT +from .eufywebapi import EufyLogon from .errors import getErrorMessage -from .vacuums.base import RobovacCommand, RoboVacEntityFeature, TuyaCodes, TUYA_CONSUMABLES_CODES +from .vacuums.base import ( + RobovacCommand, + RoboVacEntityFeature, + TuyaCodes, + TUYA_CONSUMABLES_CODES, +) from .robovac import ModelNotSupportedException, RoboVac from .tuyalocalapi import TuyaException +from .tuyawebapi import TuyaAPISession ATTR_BATTERY_ICON = "battery_icon" ATTR_ERROR = "error" @@ -71,6 +82,17 @@ SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE) UPDATE_RETRIES = 3 +# Per-model room discovery strategies. +# Add new entries as models expose room metadata in different DPS payload formats. +ROOM_DISCOVERY_STRATEGIES: dict[str, dict[str, Any]] = { + "T2320": { + "local_dps_key": "ROOM_META", + "local_dps_fallback": "165", + "local_decoder": "_decode_t2320_room_meta_payload", + "cloud_fetcher": "_fetch_room_names_from_tuya_cloud", + } +} + # ⚡ Bolt optimization: Pre-calculate valid VacuumActivity values into a set # to avoid O(n) list comprehension on every property getter access VACUUM_ACTIVITY_VALUES = {activity.value for activity in VacuumActivity} @@ -83,8 +105,14 @@ async def async_setup_entry( ) -> None: """Initialize my test integration 2 config entry.""" vacuums = config_entry.data[CONF_VACS] - for item in vacuums: - item = vacuums[item] + username = config_entry.data.get(CONF_USERNAME) + password = config_entry.data.get(CONF_PASSWORD) + for vacuum_id in vacuums: + item = dict(vacuums[vacuum_id]) + if username: + item[CONF_USERNAME] = username + if password: + item[CONF_PASSWORD] = password entity = RoboVacEntity(item) hass.data[DOMAIN][CONF_VACS][item[CONF_ID]] = entity async_add_entities([entity]) @@ -118,6 +146,7 @@ class RoboVacEntity(StateVacuumEntity): _attr_activity_mapping: dict[str, VacuumActivity] | None = None _attr_error_code: int | str | None = None _attr_tuya_state: int | str | None = None + _attr_room_names: dict[str, dict[str, Any]] | None = None @property def robovac_supported(self) -> int | None: @@ -243,7 +272,9 @@ def _get_mode_command_data(self, mode: str) -> dict[str, str | bool] | None: return None return { - self.get_dps_code("MODE"): self.vacuum.getRoboVacCommandValue(RobovacCommand.MODE, mode) + self.get_dps_code("MODE"): self.vacuum.getRoboVacCommandValue( + RobovacCommand.MODE, mode + ) } @property @@ -257,10 +288,11 @@ def activity(self) -> VacuumActivity | None: if self._attr_tuya_state is None or self._attr_tuya_state == 0: # 0 is a default set when we don't have a state return None - elif ( - self.error_code is not None - and self.error_code not in [0, "no_error", "No error"] - ): + elif self.error_code is not None and self.error_code not in [ + 0, + "no_error", + "No error", + ]: _LOGGER.debug( "State changed to error. Error message: {}".format( getErrorMessage(self.error_code) @@ -279,16 +311,20 @@ def activity(self) -> VacuumActivity | None: _LOGGER.debug( "Used activity mapping, changing status %s to activity %s", self._attr_tuya_state, - activity + activity, ) return activity else: _LOGGER.debug( "Activity mapping lookup failed for status %s - no mapping found", - self._attr_tuya_state + self._attr_tuya_state, ) return None - elif self._attr_tuya_state == "Charging" or self._attr_tuya_state == "completed" or self._attr_tuya_state == "Completed": + elif ( + self._attr_tuya_state == "Charging" + or self._attr_tuya_state == "completed" + or self._attr_tuya_state == "Completed" + ): return VacuumActivity.DOCKED elif self._attr_tuya_state == "Recharge": return VacuumActivity.RETURNING @@ -298,8 +334,7 @@ def activity(self) -> VacuumActivity | None: return VacuumActivity.PAUSED else: _LOGGER.debug( - "State changed to cleaning. Raw Tuya state: %s", - self._attr_tuya_state + "State changed to cleaning. Raw Tuya state: %s", self._attr_tuya_state ) return VacuumActivity.CLEANING @@ -308,7 +343,10 @@ def extra_state_attributes(self) -> dict[str, Any]: """Return the device-specific state attributes of this vacuum.""" data: dict[str, Any] = {} - if self._attr_error_code is not None and self._attr_error_code not in [0, "no_error"]: + if self._attr_error_code is not None and self._attr_error_code not in [ + 0, + "no_error", + ]: data[ATTR_ERROR] = getErrorMessage(self._attr_error_code) if ( self.robovac_supported is not None @@ -348,6 +386,8 @@ def extra_state_attributes(self) -> dict[str, Any]: data[ATTR_CONSUMABLES] = self.consumables if self.mode: data[ATTR_MODE] = self.mode + if self._attr_room_names: + data["room_names"] = self._attr_room_names return data def __init__(self, item: dict[str, Any]) -> None: @@ -381,6 +421,10 @@ def __init__(self, item: dict[str, Any]) -> None: self._consumables_codes_cache: list[str] | None = None self._dps_codes_memo: dict[str, str] = {} self._last_consumable_data: str | None = None + self._room_name_registry: dict[str, dict[str, Any]] = {} + self._eufy_username: str | None = item.get(CONF_USERNAME) + self._eufy_password: str | None = item.get(CONF_PASSWORD) + self._cloud_room_lookup_attempted = False # Initialize the RoboVac connection try: @@ -402,13 +446,10 @@ def __init__(self, item: dict[str, Any]) -> None: _LOGGER.debug( "Initialized RoboVac connection for %s (model: %s)", self._attr_name, - self._attr_model_code + self._attr_model_code, ) except ModelNotSupportedException: - _LOGGER.error( - "Model %s is not supported", - self._attr_model_code - ) + _LOGGER.error("Model %s is not supported", self._attr_model_code) self._attr_error_code = "UNSUPPORTED_MODEL" # Set supported features if vacuum was initialized successfully @@ -423,7 +464,7 @@ def __init__(self, item: dict[str, Any]) -> None: _LOGGER.debug( "Vacuum %s supports features: %s", self._attr_name, - self._attr_supported_features + self._attr_supported_features, ) else: # Set default values if vacuum initialization failed @@ -432,7 +473,7 @@ def __init__(self, item: dict[str, Any]) -> None: self._attr_fan_speed_list = [] _LOGGER.warning( "Vacuum %s initialization failed, features not available", - self._attr_name + self._attr_name, ) # Initialize additional attributes @@ -463,12 +504,19 @@ async def async_update(self) -> None: """ # Skip update if the model is not supported if self._attr_error_code == "UNSUPPORTED_MODEL": - _LOGGER.debug("Skipping update for unsupported model: %s", self._attr_model_code) + _LOGGER.debug( + "Skipping update for unsupported model: %s", self._attr_model_code + ) return + if self._supports_room_discovery() and not self._attr_room_names: + await self._async_fetch_room_names_from_cloud_once() + # Skip update if the IP address is not set if not self.ip_address: - _LOGGER.warning("Cannot update vacuum %s: IP address not set", self._attr_name) + _LOGGER.warning( + "Cannot update vacuum %s: IP address not set", self._attr_name + ) self._attr_error_code = "IP_ADDRESS" return @@ -491,7 +539,7 @@ async def async_update(self) -> None: self._attr_name, self.update_failures, UPDATE_RETRIES, - str(e) + str(e), ) # Set error code after maximum retries @@ -499,7 +547,7 @@ async def async_update(self) -> None: self._attr_error_code = "CONNECTION_FAILED" _LOGGER.error( "Maximum update retries reached for vacuum %s. Marking as unavailable", - self._attr_name + self._attr_name, ) async def pushed_update_handler(self) -> None: @@ -532,7 +580,10 @@ def update_entity_values(self) -> None: if self.tuyastatus is None or not self.tuyastatus: current_time = time.time() # Only log warning when state changes or after 5 minutes - if not self._no_data_warning_logged or (current_time - self._last_no_data_warning_time) >= 300: + if ( + not self._no_data_warning_logged + or (current_time - self._last_no_data_warning_time) >= 300 + ): _LOGGER.warning("Cannot update entity values: no data points available") self._last_no_data_warning_time = current_time self._no_data_warning_logged = True @@ -551,6 +602,7 @@ def update_entity_values(self) -> None: # Update model-specific attributes self._update_cleaning_stats() + self._update_room_names_from_device_payload() def get_dps_code(self, code_name: str | TuyaCodes) -> str: """Get the DPS code for a specific function. @@ -588,7 +640,9 @@ def get_dps_code(self, code_name: str | TuyaCodes) -> str: if isinstance(model_dps_codes, dict) and lookup_name in model_dps_codes: result = str(model_dps_codes[lookup_name]) except Exception as ex: - _LOGGER.debug("Error getting model-specific DPS code for %s: %s", lookup_name, ex) + _LOGGER.debug( + "Error getting model-specific DPS code for %s: %s", lookup_name, ex + ) if not result: # Fallback to defaults in TuyaCodes @@ -626,7 +680,9 @@ def _get_consumables_codes(self) -> list[str]: # Model-specific consumables can be a list or comma-separated string consumables = model_dps_codes["CONSUMABLES"] if isinstance(consumables, str): - self._consumables_codes_cache = [code.strip() for code in consumables.split(",")] + self._consumables_codes_cache = [ + code.strip() for code in consumables.split(",") + ] else: self._consumables_codes_cache = list(consumables) return self._consumables_codes_cache @@ -646,22 +702,26 @@ def _update_state_and_error(self) -> None: # Update state attribute if tuya_state is not None and self.vacuum is not None: - self._attr_tuya_state = self.vacuum.getRoboVacHumanReadableValue(RobovacCommand.STATUS, tuya_state) + self._attr_tuya_state = self.vacuum.getRoboVacHumanReadableValue( + RobovacCommand.STATUS, tuya_state + ) _LOGGER.debug( "in _update_state_and_error, tuya_state: %s, self._attr_tuya_state: %s.", tuya_state, - self._attr_tuya_state + self._attr_tuya_state, ) else: self._attr_tuya_state = 0 # Update error code attribute if error_code is not None and self.vacuum is not None: - self._attr_error_code = self.vacuum.getRoboVacHumanReadableValue(RobovacCommand.ERROR, error_code) + self._attr_error_code = self.vacuum.getRoboVacHumanReadableValue( + RobovacCommand.ERROR, error_code + ) _LOGGER.debug( "in _update_state_and_error, error_code: %s, self._attr_error_code: %s.", error_code, - self._attr_error_code + self._attr_error_code, ) else: self._attr_error_code = 0 @@ -677,11 +737,13 @@ def _update_mode_and_fan_speed(self) -> None: # Update mode attribute if mode is not None and self.vacuum is not None: - self._attr_mode = self.vacuum.getRoboVacHumanReadableValue(RobovacCommand.MODE, mode) + self._attr_mode = self.vacuum.getRoboVacHumanReadableValue( + RobovacCommand.MODE, mode + ) _LOGGER.debug( "in _update_mode_and_fan_speed, mode: %s, self._attr_mode: %s.", mode, - self._attr_mode + self._attr_mode, ) else: self._attr_mode = "" @@ -715,10 +777,14 @@ def _update_cleaning_stats(self) -> None: # Update other attributes using model-specific DPS codes auto_return = self.tuyastatus.get(self.get_dps_code("AUTO_RETURN")) - self._attr_auto_return = str(auto_return) if auto_return is not None else None + self._attr_auto_return = ( + str(auto_return) if auto_return is not None else None + ) do_not_disturb = self.tuyastatus.get(self.get_dps_code("DO_NOT_DISTURB")) - self._attr_do_not_disturb = str(do_not_disturb) if do_not_disturb is not None else None + self._attr_do_not_disturb = ( + str(do_not_disturb) if do_not_disturb is not None else None + ) boost_iq = self.tuyastatus.get(self.get_dps_code("BOOST_IQ")) self._attr_boost_iq = str(boost_iq) if boost_iq is not None else None @@ -750,9 +816,308 @@ def _update_cleaning_stats(self) -> None: and isinstance(consumables.get("consumable"), dict) and "duration" in consumables["consumable"] ): - self._attr_consumables = consumables["consumable"]["duration"] + self._attr_consumables = consumables["consumable"][ + "duration" + ] except Exception as e: - _LOGGER.warning("Failed to decode consumable data: %s", str(e)) + _LOGGER.warning( + "Failed to decode consumable data: %s", str(e) + ) + + def _refresh_room_names_attr(self) -> None: + """Refresh room names exported to Home Assistant.""" + if self._room_name_registry: + self._attr_room_names = { + key: { + "id": entry.get("id"), + "key": entry.get("key", key), + "label": entry.get("label"), + "source": entry.get("source", "device"), + } + for key, entry in sorted(self._room_name_registry.items()) + } + else: + self._attr_room_names = None + + def _normalize_room_entry( + self, identifier: Any, label: Any, source: str + ) -> dict[str, Any]: + """Build a normalized room entry payload.""" + key = str(identifier) + room_label = label.strip() if isinstance(label, str) else "" + if not room_label: + room_label = key + return { + "id": identifier, + "key": key, + "label": room_label, + "source": source, + } + + def _merge_room_entries(self, entries: dict[str, dict[str, Any]]) -> None: + """Merge room entries into registry and refresh attributes.""" + changed = False + for key, entry in entries.items(): + if self._room_name_registry.get(key) != entry: + self._room_name_registry[key] = entry + changed = True + if changed: + self._refresh_room_names_attr() + + def _decode_t2320_room_meta_payload( + self, payload: Any + ) -> dict[str, dict[str, Any]]: + """Decode T2320 DP165 room metadata payload.""" + raw: bytes | None = None + if isinstance(payload, (bytes, bytearray, memoryview)): + raw = bytes(payload) + elif isinstance(payload, str): + text = payload.strip() + if not text: + return {} + try: + raw = base64.b64decode(text, validate=True) + except (binascii.Error, ValueError): + try: + raw = base64.b64decode(text) + except (binascii.Error, ValueError): + return {} + + if not raw: + return {} + + try: + top = self._parse_protobuf_message(raw) + except ValueError: + return {} + + parsed: dict[str, dict[str, Any]] = {} + for entry_payload in top.get(2, []): + if not isinstance(entry_payload, (bytes, bytearray, memoryview)): + continue + try: + room_fields = self._parse_protobuf_message(bytes(entry_payload)) + except ValueError: + continue + + room_id = next( + (v for v in room_fields.get(1, []) if isinstance(v, int)), None + ) + if room_id is None: + continue + + label = None + for value in room_fields.get(2, []): + if not isinstance(value, (bytes, bytearray, memoryview)): + continue + try: + candidate = bytes(value).decode("utf-8").strip() + except UnicodeDecodeError: + continue + if candidate: + label = candidate + break + + parsed[str(room_id)] = self._normalize_room_entry(room_id, label, "device") + + return parsed + + def _parse_protobuf_message(self, message: bytes) -> dict[int, list[int | bytes]]: + """Parse a minimal protobuf message into a field map.""" + offset = 0 + result: dict[int, list[int | bytes]] = {} + + while offset < len(message): + tag, offset = self._read_protobuf_varint(message, offset) + field_number = tag >> 3 + wire_type = tag & 0x07 + + if wire_type == 0: + value, offset = self._read_protobuf_varint(message, offset) + elif wire_type == 2: + size, offset = self._read_protobuf_varint(message, offset) + if offset + size > len(message): + raise ValueError("invalid protobuf payload") + value = message[offset : offset + size] + offset += size + elif wire_type == 1: + if offset + 8 > len(message): + raise ValueError("invalid protobuf payload") + value = message[offset : offset + 8] + offset += 8 + elif wire_type == 5: + if offset + 4 > len(message): + raise ValueError("invalid protobuf payload") + value = message[offset : offset + 4] + offset += 4 + else: + raise ValueError("unsupported protobuf wire type") + + result.setdefault(field_number, []).append(value) + + return result + + def _read_protobuf_varint(self, buffer: bytes, offset: int) -> tuple[int, int]: + """Read protobuf varint from *buffer*.""" + result = 0 + shift = 0 + while offset < len(buffer): + byte = buffer[offset] + offset += 1 + result |= (byte & 0x7F) << shift + if not (byte & 0x80): + return result, offset + shift += 7 + if shift >= 64: + break + raise ValueError("invalid varint") + + def _get_room_discovery_strategy(self) -> dict[str, Any] | None: + """Return the room discovery strategy for the current model.""" + if not self.model_code: + return None + + for model_prefix, strategy in ROOM_DISCOVERY_STRATEGIES.items(): + if self.model_code.startswith(model_prefix): + return strategy + return None + + def _supports_room_discovery(self) -> bool: + """Return whether room discovery is configured for this model.""" + return self._get_room_discovery_strategy() is not None + + def _discover_rooms_from_local_dps(self) -> dict[str, dict[str, Any]]: + """Discover room metadata from local DPS values using strategy.""" + strategy = self._get_room_discovery_strategy() + if not strategy or self.tuyastatus is None: + return {} + + decoder_name = strategy.get("local_decoder") + if not isinstance(decoder_name, str): + return {} + decoder = getattr(self, decoder_name, None) + if not callable(decoder): + return {} + + dps_key_name = strategy.get("local_dps_key", "ROOM_META") + room_meta_code = self.get_dps_code(str(dps_key_name)) or str( + strategy.get("local_dps_fallback", "") + ) + if not room_meta_code: + return {} + + parsed = decoder(self.tuyastatus.get(room_meta_code)) + if not isinstance(parsed, dict): + return {} + _LOGGER.debug("Discovered %d local rooms for %s", len(parsed), self._attr_name) + return parsed + + def _update_room_names_from_device_payload(self) -> None: + """Update room metadata from local DPS payloads.""" + if not self._supports_room_discovery(): + return + + parsed = self._discover_rooms_from_local_dps() + if parsed: + self._merge_room_entries(parsed) + + def _fetch_room_names_from_tuya_cloud(self) -> dict[str, dict[str, Any]]: + """Fetch T2320 room names from Tuya cloud DPS values.""" + username = self._eufy_username + password = self._eufy_password + if not username or not password: + return {} + + eufy_session = EufyLogon(username, password) + response = eufy_session.get_user_info() + if response is None or response.status_code != 200: + return {} + + user_response = response.json() + if user_response.get("res_code") != 1: + return {} + + user_info = user_response.get("user_info", {}) + request_host = user_info.get("request_host") + user_id = user_info.get("id") + access_token = user_response.get("access_token") + if not request_host or not user_id or not access_token: + return {} + + settings_response = eufy_session.get_user_settings( + request_host, user_id, access_token + ) + region = "EU" + if settings_response is not None and settings_response.status_code == 200: + settings = settings_response.json() + region = ( + settings.get("setting", {}) + .get("home_setting", {}) + .get("tuya_home", {}) + .get("tuya_region_code", region) + ) + + tuya_session = TuyaAPISession( + username=f"eh-{user_id}", + region=region, + timezone=user_info.get("timezone") or "Europe/London", + phone_code=user_info.get("phone_code") or "44", + ) + + try: + dps = tuya_session._request( + action="tuya.m.device.dp.get", + version="1.0", + data={"devId": str(self.unique_id)}, + ) + except Exception: + return {} + + room_meta_code = self.get_dps_code("ROOM_META") or "165" + parsed = self._decode_t2320_room_meta_payload(dps.get(str(room_meta_code))) + for entry in parsed.values(): + entry["source"] = "cloud" + return parsed + + def _discover_rooms_from_cloud(self) -> dict[str, dict[str, Any]]: + """Discover room metadata from cloud using strategy.""" + strategy = self._get_room_discovery_strategy() + if not strategy: + return {} + + fetcher_name = strategy.get("cloud_fetcher") + if not isinstance(fetcher_name, str): + return {} + fetcher = getattr(self, fetcher_name, None) + if not callable(fetcher): + return {} + + parsed = fetcher() + if not isinstance(parsed, dict): + return {} + _LOGGER.debug("Discovered %d cloud rooms for %s", len(parsed), self._attr_name) + return parsed + + async def _async_fetch_room_names_from_cloud_once(self) -> None: + """Bootstrap room metadata from cloud one time.""" + if not self._supports_room_discovery(): + return + if self._cloud_room_lookup_attempted: + return + self._cloud_room_lookup_attempted = True + if self.hass is None: + return + + try: + cloud_rooms = await self.hass.async_add_executor_job( + self._discover_rooms_from_cloud + ) + except Exception: + return + + if cloud_rooms: + self._merge_room_entries(cloud_rooms) + self.async_write_ha_state() async def async_locate(self, **kwargs: Any) -> None: """Locate the vacuum cleaner. @@ -783,12 +1148,16 @@ async def async_return_to_base(self, **kwargs: Any) -> None: return payload: dict[str, Any] = { - self.get_dps_code("RETURN_HOME"): self.vacuum.getRoboVacCommandValue(RobovacCommand.RETURN_HOME, "return") + self.get_dps_code("RETURN_HOME"): self.vacuum.getRoboVacCommandValue( + RobovacCommand.RETURN_HOME, "return" + ) } # For models with boolean START_PAUSE (e.g. T2128, T2276), DPS 2 is the # execution trigger — without it, the device ACKs but doesn't physically act. - start_value = self.vacuum.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "start") + start_value = self.vacuum.getRoboVacCommandValue( + RobovacCommand.START_PAUSE, "start" + ) if start_value != "start": payload[self.get_dps_code("START_PAUSE")] = start_value @@ -806,11 +1175,15 @@ async def async_start(self, **kwargs: Any) -> None: return payload: dict[str, Any] = { - self.get_dps_code("MODE"): self.vacuum.getRoboVacCommandValue(RobovacCommand.MODE, "auto") + self.get_dps_code("MODE"): self.vacuum.getRoboVacCommandValue( + RobovacCommand.MODE, "auto" + ) } # For models with boolean START_PAUSE (e.g. T2118, T2128), also toggle start - start_value = self.vacuum.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "start") + start_value = self.vacuum.getRoboVacCommandValue( + RobovacCommand.START_PAUSE, "start" + ) if start_value != "start": payload[self.get_dps_code("START_PAUSE")] = start_value @@ -826,9 +1199,13 @@ async def async_pause(self, **kwargs: Any) -> None: _LOGGER.error("Cannot pause vacuum: vacuum not initialized") return - await self.vacuum.async_set({ - self.get_dps_code("START_PAUSE"): self.vacuum.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "pause") - }) + await self.vacuum.async_set( + { + self.get_dps_code("START_PAUSE"): self.vacuum.getRoboVacCommandValue( + RobovacCommand.START_PAUSE, "pause" + ) + } + ) async def async_stop(self, **kwargs: Any) -> None: """Stop the vacuum cleaner. @@ -849,9 +1226,13 @@ async def async_clean_spot(self, **kwargs: Any) -> None: _LOGGER.error("Cannot clean spot: vacuum not initialized") return - await self.vacuum.async_set({ - self.get_dps_code("MODE"): self.vacuum.getRoboVacCommandValue(RobovacCommand.MODE, "spot") - }) + await self.vacuum.async_set( + { + self.get_dps_code("MODE"): self.vacuum.getRoboVacCommandValue( + RobovacCommand.MODE, "spot" + ) + } + ) async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None: """Set fan speed. @@ -869,17 +1250,16 @@ async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None: _LOGGER.debug("Normalized Fan Speed: %s", normalized_fan_speed) - await self.vacuum.async_set({ - self.get_dps_code("FAN_SPEED"): self.vacuum.getRoboVacCommandValue( - RobovacCommand.FAN_SPEED, normalized_fan_speed - ) - }) + await self.vacuum.async_set( + { + self.get_dps_code("FAN_SPEED"): self.vacuum.getRoboVacCommandValue( + RobovacCommand.FAN_SPEED, normalized_fan_speed + ) + } + ) async def async_send_command( - self, - command: str, - params: dict[str, Any] | list | None = None, - **kwargs: Any + self, command: str, params: dict[str, Any] | list | None = None, **kwargs: Any ) -> None: """Send a command to a vacuum cleaner. @@ -897,7 +1277,7 @@ async def async_send_command( mode_commands = { "edgeClean": "edge", "smallRoomClean": "small_room", - "autoClean": "auto" + "autoClean": "auto", } if command in mode_commands: @@ -907,22 +1287,22 @@ async def async_send_command( elif command == "autoReturn": # Toggle the auto return setting new_value = not self._is_value_true(self.auto_return) - await self.vacuum.async_set({ - self.get_dps_code("AUTO_RETURN"): new_value - }) + await self.vacuum.async_set({self.get_dps_code("AUTO_RETURN"): new_value}) elif command == "doNotDisturb": # Toggle the do not disturb setting new_value = not self._is_value_true(self.do_not_disturb) - await self.vacuum.async_set({ - self.get_dps_code("DO_NOT_DISTURB"): new_value - }) + await self.vacuum.async_set( + {self.get_dps_code("DO_NOT_DISTURB"): new_value} + ) elif command == "boostIQ": # Toggle the boost IQ setting new_value = not self._is_value_true(self.boost_iq) - await self.vacuum.async_set({ - self.get_dps_code("BOOST_IQ"): new_value - }) - elif command in ("roomClean", "room_clean") and params is not None and isinstance(params, dict): + await self.vacuum.async_set({self.get_dps_code("BOOST_IQ"): new_value}) + elif ( + command in ("roomClean", "room_clean") + and params is not None + and isinstance(params, dict) + ): room_ids = params.get("roomIds") or params.get("room_ids", [1]) count = params.get("count", 1) clean_request = {"roomIds": room_ids, "cleanTimes": count} @@ -934,7 +1314,8 @@ async def async_send_command( json_str = json.dumps(method_call, separators=(",", ":")) base64_str = base64.b64encode(json_str.encode("utf8")).decode("utf8") _LOGGER.debug("roomClean call %s", json_str) - await self.vacuum.async_set({TuyaCodes.ROOM_CLEAN: base64_str}) + room_clean_code = self.get_dps_code("ROOM_CLEAN") or TuyaCodes.ROOM_CLEAN + await self.vacuum.async_set({room_clean_code: base64_str}) # Wait for the vacuum to ACK DPS 124 before sending the start command. # Without this delay, DPS 2 arrives before the room selection is processed # and the vacuum ignores the start command.