diff --git a/custom_components/robovac/__init__.py b/custom_components/robovac/__init__.py index 2e948829..6b3118c3 100644 --- a/custom_components/robovac/__init__.py +++ b/custom_components/robovac/__init__.py @@ -25,7 +25,7 @@ from .tuyalocaldiscovery import TuyaLocalDiscovery -PLATFORMS = [Platform.VACUUM, Platform.SENSOR] +PLATFORMS = [Platform.VACUUM, Platform.SENSOR, Platform.SELECT, Platform.SWITCH] _LOGGER = logging.getLogger(__name__) diff --git a/custom_components/robovac/proto_decode.py b/custom_components/robovac/proto_decode.py index 660a9f04..10f0a322 100644 --- a/custom_components/robovac/proto_decode.py +++ b/custom_components/robovac/proto_decode.py @@ -17,6 +17,7 @@ """ import base64 +import binascii from typing import Any @@ -125,6 +126,239 @@ def _parse_proto(data: bytes) -> dict[int, Any]: return fields +def _encode_varint(v: int) -> bytes: + """Encode a non-negative integer as protobuf varint bytes.""" + if v < 0: + raise ValueError("varint must be non-negative") + buf = bytearray() + while True: + piece = v & 0x7F + v >>= 7 + if v: + buf.append(piece | 0x80) + else: + buf.append(piece) + break + return bytes(buf) + + +def _serialize_proto(fields: dict[int, Any]) -> bytes: + """Serialize a protobuf message from field_num -> int | bytes | dict (nested submessage). + + Wire types: int → varint (type 0), bytes/dict → length-delimited (type 2). + Dict values are serialized as nested sub-messages. + """ + chunks: list[bytes] = [] + for field_num in sorted(fields.keys()): + val = fields[field_num] + tag = field_num << 3 + if isinstance(val, int): + chunks.append(_encode_varint(tag)) + chunks.append(_encode_varint(val)) + elif isinstance(val, bytes): + chunks.append(_encode_varint(tag | 2)) + chunks.append(_encode_varint(len(val))) + chunks.append(val) + elif isinstance(val, dict): + nested = _serialize_proto(val) + chunks.append(_encode_varint(tag | 2)) + chunks.append(_encode_varint(len(nested))) + chunks.append(nested) + else: + raise TypeError(f"Unsupported protobuf field value type: {type(val)!r}") + return b"".join(chunks) + + +def _clean_param_b64_with_length_prefix(proto_payload: bytes) -> str: + """Wrap payload with single-byte length prefix and return base64 (DPS 154 on-wire).""" + return base64.b64encode(bytes([len(proto_payload)]) + proto_payload).decode("ascii") + + +def _with_varint_length_prefix(proto_payload: bytes) -> str: + """Wrap payload with a protobuf varint length prefix and base64 encode it.""" + return base64.b64encode(_encode_varint(len(proto_payload)) + proto_payload).decode("ascii") + + +def _strip_varint_length_prefix(data: bytes) -> bytes: + """Remove a leading protobuf varint length prefix when it matches payload size.""" + if not data: + return data + try: + length, offset = _parse_varint(data, 0) + except Exception: + return data + if length == len(data) - offset: + return data[offset:] + return data + + +def _field_varint(field_num: int, value: int) -> bytes: + return _encode_varint(field_num << 3) + _encode_varint(value) + + +def _field_bytes(field_num: int, value: bytes) -> bytes: + return _encode_varint((field_num << 3) | 2) + _encode_varint(len(value)) + value + + +def decode_t2320_room_meta(raw_b64: str) -> dict[str, Any]: + """Decode T2320/X9 Pro DPS 165 room metadata. + + Real X9 Pro payload shape: + length_prefix + field_1 bytes: + field_1 uint32 map_id + repeated field_2 bytes: + field_1 uint32 room_id + field_2 string room_label + """ + if not raw_b64: + return {"map_id": None, "rooms": []} + + raw = _strip_varint_length_prefix(base64.b64decode(raw_b64)) + outer = _parse_proto(raw) + inner_raw = outer.get(1) + if isinstance(inner_raw, list): + inner_raw = next((item for item in inner_raw if isinstance(item, bytes)), None) + if not isinstance(inner_raw, bytes): + return {"map_id": None, "rooms": []} + + inner = _parse_proto(inner_raw) + rooms: list[dict[str, Any]] = [] + entries = inner.get(2, []) + if isinstance(entries, bytes): + entries = [entries] + if not isinstance(entries, list): + entries = [] + + for entry_raw in entries: + if not isinstance(entry_raw, bytes): + continue + entry = _parse_proto(entry_raw) + room_id = entry.get(1) + label_raw = entry.get(2) + if not isinstance(room_id, int): + continue + label = str(room_id) + if isinstance(label_raw, bytes): + try: + decoded = label_raw.decode("utf-8").strip() + if decoded: + label = decoded + except UnicodeDecodeError: + pass + rooms.append({"id": room_id, "label": label}) + + return { + "map_id": inner.get(1) if isinstance(inner.get(1), int) else None, + "rooms": rooms, + } + + +def build_t2320_room_clean_mode( + room_ids: list[int], + *, + map_id: int, + clean_times: int = 1, +) -> str: + """Build a DPS 152 ModeCtrlRequest for T2320 selected-room cleaning.""" + if not room_ids: + raise ValueError("room_ids must not be empty") + if map_id <= 0: + raise ValueError("map_id must be positive") + if clean_times <= 0: + raise ValueError("clean_times must be positive") + + select_rooms = b"" + for order, room_id in enumerate(room_ids, start=1): + room = _field_varint(1, int(room_id)) + _field_varint(2, order) + select_rooms += _field_bytes(1, room) + select_rooms += _field_varint(2, int(clean_times)) + select_rooms += _field_varint(3, int(map_id)) + + # ModeCtrlRequest: method=1 START_SELECT_ROOMS_CLEAN, oneof field 4 payload. + request = _field_varint(1, 1) + _field_bytes(4, select_rooms) + return _with_varint_length_prefix(request) + + +def patch_clean_param_dps154( + raw_b64: str, + *, + clean_type: str | None = None, + mop_level: str | None = None, + edge_hugging_mopping: bool | None = None, +) -> str: + """Patch decoded CleanParamResponse params and re-encode for DPS 154 SET. + + Preserves other outer fields and inner fields (carpet, extent, etc.) when possible. + """ + try: + body = base64.b64decode(raw_b64, validate=True) + except (binascii.Error, ValueError) as err: + raise ValueError("invalid clean param base64") from err + if not body: + raise ValueError("empty clean param base64") + # Match decode_clean_param_response / _strip_length_prefix: byte 0 is length, protobuf starts at 1 + payload = body[1:] if len(body) > 1 else b"" + + outer = _parse_proto(payload) + + clean_type_names = ["sweep_only", "mop_only", "sweep_and_mop", "sweep_then_mop"] + mop_level_names = ["low", "middle", "high"] + + def _patch_layer(inner_raw: Any) -> bytes: + if not isinstance(inner_raw, bytes): + inner_raw = b"" + inner: dict[int, Any] = dict(_parse_proto(inner_raw)) + + if clean_type_key is not None: + inner[1] = _serialize_proto({1: clean_type_names.index(clean_type_key)}) + + if mop_level is not None or edge_hugging_mopping is not None: + prev_m = inner.get(4) + midx: int | None = None + edge = 0 + if isinstance(prev_m, bytes): + mf = _parse_proto(prev_m) + midx = _as_varint(mf.get(1)) + edge = int(mf.get(2) or 0) + if mop_level_key is not None: + midx = mop_level_names.index(mop_level_key) + if midx is None: + midx = 0 + if edge_hugging_mopping is not None: + edge = 1 if edge_hugging_mopping else 0 + inner[4] = _serialize_proto({1: midx, 2: edge}) + + return _serialize_proto(inner) + + clean_type_key = None + if clean_type is not None: + key = clean_type.lower().replace(" ", "_").replace("-", "_") + if key not in clean_type_names: + raise ValueError(f"Unknown clean_type {clean_type!r}") + clean_type_key = key + + mop_level_key = None + if mop_level is not None or edge_hugging_mopping is not None: + if mop_level is not None: + mk = mop_level.lower().replace(" ", "_") + if mk not in mop_level_names: + raise ValueError(f"Unknown mop_level {mop_level!r}") + mop_level_key = mk + + new_outer = dict(outer) + fields_to_patch = ( + (1, 4) + if edge_hugging_mopping is not None and mop_level is None + else (1, 3, 4) + ) + for field in fields_to_patch: + if field in outer or field == 1: + new_outer[field] = _patch_layer(outer.get(field)) + new_payload = _serialize_proto(new_outer) + return _clean_param_b64_with_length_prefix(new_payload) + + def _strip_length_prefix(raw_b64: str) -> bytes: """Decode base64 and strip the length prefix byte.""" return base64.b64decode(raw_b64)[1:] @@ -511,8 +745,10 @@ def _decode_param(param_bytes: bytes | None) -> dict[str, Any]: v = _enum_val(f[3]) result["clean_extent"] = EXTENT_NAMES[v] if v < len(EXTENT_NAMES) else f"extent_{v}" if 4 in f: + mop_fields = _parse_proto(f[4]) if isinstance(f[4], bytes) else {} v = _enum_val(f[4]) result["mop_level"] = MOP_LEVEL_NAMES[v] if v < len(MOP_LEVEL_NAMES) else f"mop_{v}" + result["edge_hugging_mopping"] = mop_fields.get(2) == 1 if 6 in f: v = _enum_val(f[6]) result["fan"] = FAN_NAMES[v] if v < len(FAN_NAMES) else f"fan_{v}" @@ -536,6 +772,24 @@ def _decode_param(param_bytes: bytes | None) -> dict[str, Any]: return result +def merge_clean_param_layers(decoded: dict[str, Any]) -> dict[str, Any]: + """Merge clean_param / area_clean_param / running_clean_param for display. + + During an active job, ``running_clean_param`` often includes fan and mop + fields but omits ``clean_type``; that value still lives on the global + ``clean_param``. Later layers override earlier ones for keys present in both. + """ + merged: dict[str, Any] = {} + for layer in ( + decoded.get("clean_param"), + decoded.get("area_clean_param"), + decoded.get("running_clean_param"), + ): + if isinstance(layer, dict): + merged.update(layer) + return merged + + def decode_consumable_response(raw_b64: str) -> dict[str, int]: """Decode DPS 168 (ConsumableResponse) to a dict of {name: hours}. diff --git a/custom_components/robovac/robovac.py b/custom_components/robovac/robovac.py index 5faa4f98..b5b28de8 100644 --- a/custom_components/robovac/robovac.py +++ b/custom_components/robovac/robovac.py @@ -187,6 +187,10 @@ def getDpsCodes(self) -> dict[str, str]: } codes = {} + model_dps_codes = getattr(self.model_details, "dps_codes", {}) + if isinstance(model_dps_codes, dict): + codes.update({str(key): str(value) for key, value in model_dps_codes.items()}) + # Extract codes from commands dictionary for key, value in self.model_details.commands.items(): # Get the DPS name from the mapping, or use the command name if not in mapping diff --git a/custom_components/robovac/select.py b/custom_components/robovac/select.py new file mode 100644 index 00000000..55645b42 --- /dev/null +++ b/custom_components/robovac/select.py @@ -0,0 +1,362 @@ +"""Select entities for RoboVac DPS-backed settings (clean type, mop level, fan speed).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, cast + +from homeassistant.components.select import SelectEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_ID, CONF_MODEL, CONF_NAME, EntityCategory +from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback + +from .const import CONF_VACS, DOMAIN +from .vacuums import ROBOVAC_MODELS +from .vacuums.base import RobovacCommand + +if TYPE_CHECKING: + from .vacuum import RoboVacEntity + +_CLEAN_TYPE_LABELS_ALL = { + "sweep_only": "Sweep only", + "mop_only": "Mop only", + "sweep_and_mop": "Vacuum and mop", + "sweep_then_mop": "Vacuum then mop", +} +_DEFAULT_CLEAN_TYPE_KEYS = tuple(_CLEAN_TYPE_LABELS_ALL.keys()) + +_MOP_LEVEL_TO_OPTION = {"low": "Low", "middle": "Middle", "high": "High"} +_OPTION_TO_MOP_LEVEL = {v: k for k, v in _MOP_LEVEL_TO_OPTION.items()} +_CLEAN_WHOLE_HOUSE_OPTION = "Clean whole house" + + +class _RobovacSelectEntity(SelectEntity): + """Base that bypasses SelectEntity cached_property for options/current_option. + + HA caches the first return value of options/current_option; our values are + filled after the vacuum reports DPS, so we always read _attr_* directly. + """ + + @property + def options(self) -> list[str]: + return self._attr_options + + @property + def current_option(self) -> str | None: + return self._attr_current_option + + +def _vacuum_ready(vacuum_entity: RoboVacEntity | None) -> bool: + """True when the vacuum object exists and can accept setting commands.""" + return bool(vacuum_entity and vacuum_entity.vacuum is not None) + + +def _match_fan_speed_option(current: str, options: list[str]) -> str | None: + """Pick the fan_speed_list entry that matches the vacuum's displayed fan string.""" + cur = str(current).strip() + if not cur: + return None + folded = cur.casefold() + for opt in options: + if opt == cur or str(opt).casefold() == folded: + return opt + return None + + +def _device_info(item: dict) -> DeviceInfo: + return DeviceInfo( + identifiers={(DOMAIN, item[CONF_ID])}, + name=item[CONF_NAME], + ) + + +async def async_setup_entry( + hass: HomeAssistant, + config_entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up RoboVac select entities.""" + vacuums = config_entry.data[CONF_VACS] + entities: list[SelectEntity] = [] + + for key in vacuums: + item = vacuums[key] + model_prefix = (item.get(CONF_MODEL) or "")[:5] + model_class = ROBOVAC_MODELS.get(model_prefix) + if model_class is None: + continue + if not getattr(model_class, "expose_config_entities", False): + continue + commands = getattr(model_class, "commands", {}) + if RobovacCommand.FAN_SPEED in commands: + entities.append(RobovacFanSpeedSelect(item)) + + if RobovacCommand.CLEAN_PARAM in commands: + dps = str(commands[RobovacCommand.CLEAN_PARAM]["code"]) + clean_keys: tuple[str, ...] = getattr( + model_class, "clean_type_select_keys", _DEFAULT_CLEAN_TYPE_KEYS + ) + entities.append(RobovacCleanTypeSelect(item, dps, clean_keys)) + entities.append(RobovacMopLevelSelect(item, dps)) + if getattr(model_class, "expose_room_select", False): + entities.append(RobovacRoomSelect(item)) + + async_add_entities(entities) + + +class RobovacCleanTypeSelect(_RobovacSelectEntity): + """Select clean type (sweep / mop / both) via DPS 154 protobuf patch.""" + + _attr_has_entity_name = True + _attr_should_poll = True + _attr_entity_category = EntityCategory.CONFIG + _attr_icon = "mdi:broom" + + def __init__(self, item: dict, dps_code: str, clean_type_keys: tuple[str, ...]) -> None: + self.robovac_id = item[CONF_ID] + self._dps_code = dps_code + self._snake_to_label = { + k: _CLEAN_TYPE_LABELS_ALL[k] + for k in clean_type_keys + if k in _CLEAN_TYPE_LABELS_ALL + } + self._label_to_snake = {v: k for k, v in self._snake_to_label.items()} + self._attr_unique_id = f"{item[CONF_ID]}_clean_type_select" + self._attr_name = "Clean type" + self._attr_device_info = _device_info(item) + self._attr_options = list(self._snake_to_label.values()) + + async def async_update(self) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not _vacuum_ready(vacuum_entity): + self._attr_available = False + self._attr_current_option = None + return + vacuum_entity = cast("RoboVacEntity", vacuum_entity) + self._attr_available = True + ct = vacuum_entity.clean_type + if ct is None: + self._attr_current_option = None + return + key = str(ct).lower().replace(" ", "_").replace("-", "_") + opt = self._snake_to_label.get(key) + if opt is None and key == "sweep_then_mop" and "sweep_and_mop" in self._snake_to_label: + opt = self._snake_to_label["sweep_and_mop"] + self._attr_current_option = opt + + async def async_select_option(self, option: str) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not vacuum_entity: + return + snake = self._label_to_snake.get(option, option.lower().replace(" ", "_")) + await vacuum_entity.async_set_clean_param(clean_type=snake) + await self.async_update() + self.async_write_ha_state() + + +class RobovacMopLevelSelect(_RobovacSelectEntity): + """Select mop water level via DPS 154 protobuf patch.""" + + _attr_has_entity_name = True + _attr_should_poll = True + _attr_entity_category = EntityCategory.CONFIG + _attr_icon = "mdi:cup-water" + + def __init__(self, item: dict, dps_code: str) -> None: + self.robovac_id = item[CONF_ID] + self._dps_code = dps_code + self._attr_unique_id = f"{item[CONF_ID]}_mop_level_select" + self._attr_name = "Mop level" + self._attr_device_info = _device_info(item) + self._attr_options = list(_MOP_LEVEL_TO_OPTION.values()) + + async def async_update(self) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not _vacuum_ready(vacuum_entity): + self._attr_available = False + self._attr_current_option = None + return + vacuum_entity = cast("RoboVacEntity", vacuum_entity) + self._attr_available = True + ml = vacuum_entity.mop_level + if ml is None: + self._attr_current_option = None + return + key = str(ml).lower().replace(" ", "_") + opt = _MOP_LEVEL_TO_OPTION.get(key) + self._attr_current_option = opt + + async def async_select_option(self, option: str) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not vacuum_entity: + return + snake = _OPTION_TO_MOP_LEVEL.get(option, option.lower()) + await vacuum_entity.async_set_mop_level(snake) + await self.async_update() + self.async_write_ha_state() + + +class RobovacFanSpeedSelect(_RobovacSelectEntity): + """Fan / suction level (same DPS as the vacuum card, exposed under Configuration).""" + + _attr_has_entity_name = True + _attr_should_poll = True + _attr_entity_category = EntityCategory.CONFIG + _attr_icon = "mdi:fan" + + def __init__(self, item: dict) -> None: + self.robovac_id = item[CONF_ID] + self._attr_unique_id = f"{item[CONF_ID]}_fan_speed_select" + self._attr_name = "Fan speed" + self._attr_device_info = _device_info(item) + model_prefix = (item.get(CONF_MODEL) or "")[:5] + model_class = ROBOVAC_MODELS.get(model_prefix) + values: dict[str, str] = {} + if model_class is not None: + command = getattr(model_class, "commands", {}).get(RobovacCommand.FAN_SPEED, {}) + values = command.get("values", {}) if isinstance(command, dict) else {} + self._attr_options = [ + key.replace("_", " ").title() + for key in values + ] + + async def async_update(self) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not _vacuum_ready(vacuum_entity): + self._attr_available = False + self._attr_current_option = None + return + vacuum_entity = cast("RoboVacEntity", vacuum_entity) + opts = list(vacuum_entity.fan_speed_list or self._attr_options) + if not opts: + self._attr_available = False + self._attr_current_option = None + return + self._attr_options = opts + self._attr_available = True + cur = vacuum_entity.fan_speed + if not cur: + self._attr_current_option = None + return + self._attr_current_option = _match_fan_speed_option(str(cur), opts) + + async def async_select_option(self, option: str) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not vacuum_entity: + return + await vacuum_entity.async_set_fan_speed(option) + await self.async_update() + self.async_write_ha_state() + + +class RobovacRoomSelect(SelectEntity): + """Select a known room target and start selected-room cleaning.""" + + _attr_has_entity_name = True + _attr_should_poll = True + _attr_entity_category = EntityCategory.CONFIG + _attr_icon = "mdi:floor-plan" + + def __init__(self, item: dict) -> None: + self.robovac_id = item[CONF_ID] + self._attr_unique_id = f"{item[CONF_ID]}_room_select" + self._attr_name = "Room" + self._attr_device_info = _device_info(item) + self._attr_options = [_CLEAN_WHOLE_HOUSE_OPTION] + self._attr_current_option: str | None = None + self._room_lookup: dict[str, Any] = {} + + @property + def options(self) -> list[str]: + return self._attr_options + + @property + def current_option(self) -> str | None: + return self._attr_current_option + + async def async_update(self) -> None: + vacuum_entity = self._vacuum() + if not _vacuum_ready(vacuum_entity): + self._attr_available = False + return + vacuum_entity = cast("RoboVacEntity", vacuum_entity) + self._attr_available = True + if hasattr(vacuum_entity, "async_get_segments"): + segments = await vacuum_entity.async_get_segments() + else: + segments = [] + options = [_CLEAN_WHOLE_HOUSE_OPTION] + lookup: dict[str, Any] = {} + for segment in segments: + if isinstance(segment, dict): + segment_id = segment.get("id") + segment_name = segment.get("name") + else: + segment_id = getattr(segment, "id", None) + segment_name = getattr(segment, "name", None) + label = str(segment_name or segment_id or "").strip() + if not label: + continue + lookup[label] = segment_id + options.append(label) + self._room_lookup = lookup + self._attr_options = options + + async def async_select_option(self, option: str) -> None: + vacuum_entity = self._vacuum() + if not vacuum_entity: + return + if option == _CLEAN_WHOLE_HOUSE_OPTION: + await vacuum_entity.async_start() + else: + room_id = self._room_lookup.get(option) + if room_id is None: + raise HomeAssistantError(f"Invalid room option: {option}") + await vacuum_entity.async_send_command( + "roomClean", + {"roomIds": [room_id], "count": 1}, + ) + self._attr_current_option = option + await self.async_update() + self.async_write_ha_state() + + def _vacuum(self) -> RoboVacEntity | None: + try: + return cast( + "RoboVacEntity | None", + self.hass.data[DOMAIN][CONF_VACS].get(self.robovac_id), + ) + except KeyError: + return None diff --git a/custom_components/robovac/sensor.py b/custom_components/robovac/sensor.py index cc318640..c3ec4c26 100644 --- a/custom_components/robovac/sensor.py +++ b/custom_components/robovac/sensor.py @@ -15,6 +15,7 @@ from .vacuums import ROBOVAC_MODELS from .proto_decode import ( decode_clean_param_response, + merge_clean_param_layers, decode_clean_record_list, decode_consumable_response, decode_device_info, @@ -33,14 +34,15 @@ # Consumables exposed as individual sensors for proto-based models (DPS 168). # Tuple: (decode_key, display_name, icon) -_PROTO_CONSUMABLES = [ - ("side_brush", "Side Brush", "mdi:brush"), - ("rolling_brush", "Rolling Brush", "mdi:brush-variant"), - ("filter_mesh", "Filter", "mdi:air-filter"), - ("scrape", "Scraper", "mdi:squeegee"), - ("sensor", "Sensor", "mdi:motion-sensor"), - ("dustbag", "Dust Bag", "mdi:trash-can-outline"), -] +_PROTO_CONSUMABLES = { + "side_brush": ("Side Brush", "mdi:brush"), + "rolling_brush": ("Rolling Brush", "mdi:brush-variant"), + "filter_mesh": ("Filter", "mdi:air-filter"), + "scrape": ("Scraper", "mdi:squeegee"), + "sensor": ("Sensor", "mdi:motion-sensor"), + "mop": ("Mop", "mdi:robot-vacuum"), + "dustbag": ("Dust Bag", "mdi:trash-can-outline"), +} def _device_info(item: dict) -> DeviceInfo: @@ -80,17 +82,29 @@ async def async_setup_entry( # Notification sensor — prompt/notification codes (DPS 178) if RobovacCommand.ACTIVE_ERRORS in commands: dps = str(commands[RobovacCommand.ACTIVE_ERRORS]["code"]) - entities.append(RobovacNotificationSensor(item, dps)) + entities.append(RobovacNotificationSensor(item, dps, model_class)) + + # Warning sensor — non-fatal maintenance/station warnings. + warning_dps = getattr(model_class, "warning_dps_code", None) + if warning_dps is not None: + entities.append(RobovacWarningSensor(item, str(warning_dps), model_class)) # Per-consumable sensors — proto models using DPS 168 consumables_cmd = commands.get(RobovacCommand.CONSUMABLES, {}) if isinstance(consumables_cmd, dict) and consumables_cmd.get("code") == 168: dps = str(consumables_cmd["code"]) - for key, label, icon in _PROTO_CONSUMABLES: + keys = getattr(model_class, "consumable_sensor_keys", _PROTO_CONSUMABLES) + for key in keys: + label, icon = _PROTO_CONSUMABLES[key] entities.append(RobovacConsumableSensor(item, dps, key, label, icon)) - # Clean-type sensor — DPS 154 - if RobovacCommand.CLEAN_PARAM in commands: + # Clean-type sensor — DPS 154. Models that expose first-class config + # entities already surface these values as selects/switches and vacuum + # attributes, so avoid creating a duplicate diagnostic sensor. + if ( + RobovacCommand.CLEAN_PARAM in commands + and not getattr(model_class, "expose_config_entities", False) + ): dps = str(commands[RobovacCommand.CLEAN_PARAM]["code"]) entities.append(RobovacCleanTypeSensor(item, dps)) @@ -264,6 +278,8 @@ def __init__(self, item: dict, dps_code: str) -> None: self._attr_unique_id = f"{item[CONF_ID]}_error" self._attr_name = "Error" self._attr_device_info = _device_info(item) + self._attr_native_value = "No error" + self._attr_available = True self._has_had_data = False async def async_update(self) -> None: @@ -276,12 +292,14 @@ async def async_update(self) -> None: return if not tuyastatus: if not self._has_had_data: - self._attr_available = False + self._attr_native_value = "No error" + self._attr_available = True return raw = tuyastatus.get(self._dps_code) if raw is None: if not self._has_had_data: - self._attr_available = False + self._attr_native_value = "No error" + self._attr_available = True return if vacuum_entity.vacuum is not None: decoded = vacuum_entity.vacuum.getRoboVacHumanReadableValue( @@ -289,7 +307,7 @@ async def async_update(self) -> None: ) else: decoded = str(raw) - self._attr_native_value = None if decoded == "no_error" else decoded + self._attr_native_value = "No error" if decoded == "no_error" else decoded self._attr_available = True self._has_had_data = True except Exception as ex: @@ -309,11 +327,13 @@ class RobovacNotificationSensor(SensorEntity): _attr_has_entity_name = True _attr_should_poll = True + _attr_entity_category = EntityCategory.DIAGNOSTIC _attr_icon = "mdi:bell-outline" - def __init__(self, item: dict, dps_code: str) -> None: + def __init__(self, item: dict, dps_code: str, model_class: type | None = None) -> None: self.robovac_id = item[CONF_ID] self._dps_code = dps_code + self._model_class = model_class self._attr_unique_id = f"{item[CONF_ID]}_notification" self._attr_name = "Notification" self._attr_device_info = _device_info(item) @@ -336,7 +356,10 @@ async def async_update(self) -> None: if not self._has_had_data: self._attr_available = False return - value = decode_error_code(raw) + decoder = getattr(self._model_class, "decode_dps", None) + value = decoder(self._dps_code, raw) if decoder else None + if value is None: + value = decode_error_code(raw) self._attr_native_value = None if value == "no_error" else value self._attr_available = True self._has_had_data = True @@ -345,6 +368,59 @@ async def async_update(self) -> None: self._attr_available = False +class RobovacWarningSensor(SensorEntity): + """Non-fatal maintenance/station warnings from model-specific DPS fields.""" + + _attr_has_entity_name = True + _attr_should_poll = True + _attr_entity_category = EntityCategory.DIAGNOSTIC + _attr_icon = "mdi:alert-outline" + + def __init__(self, item: dict, dps_code: str, model_class: type) -> None: + self.robovac_id = item[CONF_ID] + self._dps_code = dps_code + self._model_class = model_class + self._attr_unique_id = f"{item[CONF_ID]}_warning" + self._attr_name = "Warning" + self._attr_device_info = _device_info(item) + self._attr_native_value = "No warning" + self._attr_extra_state_attributes: dict = {"warnings": []} + self._attr_available = True + self._has_had_data = False + + async def async_update(self) -> None: + try: + vacuum_entity, tuyastatus = _vacuum_and_status( + self.hass, DOMAIN, CONF_VACS, self.robovac_id + ) + if vacuum_entity is None: + self._attr_available = False + return + if not tuyastatus: + if not self._has_had_data: + self._attr_native_value = "No warning" + self._attr_extra_state_attributes = {"warnings": []} + self._attr_available = True + return + raw = tuyastatus.get(self._dps_code) + if raw is None: + if not self._has_had_data: + self._attr_native_value = "No warning" + self._attr_extra_state_attributes = {"warnings": []} + self._attr_available = True + return + decoder = getattr(self._model_class, "decode_warning_dps", None) + warnings = decoder(raw) if decoder else [] + messages = [str(warning["message"]) for warning in warnings] + self._attr_native_value = "; ".join(messages) if messages else "No warning" + self._attr_extra_state_attributes = {"warnings": warnings} + self._attr_available = True + self._has_had_data = True + except Exception as ex: + _LOGGER.error("Failed to update warning sensor for %s: %s", self.robovac_id, ex) + self._attr_available = False + + # --------------------------------------------------------------------------- # Consumable sensors (DPS 168) # --------------------------------------------------------------------------- @@ -359,6 +435,7 @@ class RobovacConsumableSensor(SensorEntity): _attr_has_entity_name = True _attr_should_poll = True + _attr_entity_category = EntityCategory.DIAGNOSTIC _attr_native_unit_of_measurement = "h" def __init__( @@ -456,8 +533,7 @@ async def async_update(self) -> None: if not self._has_had_data: self._attr_available = False return - # Prefer running params (active clean); fall back to global defaults - params = d.get("running_clean_param") or d.get("clean_param") or {} + params = merge_clean_param_layers(d) clean_type = params.get("clean_type") if clean_type is None: if not self._has_had_data: @@ -466,7 +542,13 @@ async def async_update(self) -> None: self._attr_native_value = clean_type self._attr_extra_state_attributes = { k: params[k] - for k in ("fan", "clean_extent", "mop_level", "clean_times") + for k in ( + "fan", + "clean_extent", + "mop_level", + "edge_hugging_mopping", + "clean_times", + ) if k in params } self._attr_available = True diff --git a/custom_components/robovac/switch.py b/custom_components/robovac/switch.py new file mode 100644 index 00000000..3ab28916 --- /dev/null +++ b/custom_components/robovac/switch.py @@ -0,0 +1,118 @@ +"""Switch entities for RoboVac DPS-backed toggles (edge-hugging mop).""" + +from __future__ import annotations + +from typing import TYPE_CHECKING, Any, cast + +from homeassistant.components.switch import SwitchEntity +from homeassistant.config_entries import ConfigEntry +from homeassistant.const import CONF_ID, CONF_MODEL, CONF_NAME, EntityCategory +from homeassistant.core import HomeAssistant +from homeassistant.helpers.device_registry import DeviceInfo +from homeassistant.helpers.entity_platform import AddEntitiesCallback +from homeassistant.helpers.event import async_call_later + +from .const import CONF_VACS, DOMAIN +from .vacuums import ROBOVAC_MODELS +from .vacuums.base import RobovacCommand + +if TYPE_CHECKING: + from .vacuum import RoboVacEntity + + +def _device_info(item: dict) -> DeviceInfo: + return DeviceInfo( + identifiers={(DOMAIN, item[CONF_ID])}, + name=item[CONF_NAME], + ) + + +async def async_setup_entry( + hass: HomeAssistant, + config_entry: ConfigEntry, + async_add_entities: AddEntitiesCallback, +) -> None: + """Set up RoboVac switch entities.""" + vacuums = config_entry.data[CONF_VACS] + entities: list[SwitchEntity] = [] + + for key in vacuums: + item = vacuums[key] + model_prefix = (item.get(CONF_MODEL) or "")[:5] + model_class = ROBOVAC_MODELS.get(model_prefix) + if model_class is None: + continue + if not getattr(model_class, "expose_config_entities", False): + continue + commands = getattr(model_class, "commands", {}) + if RobovacCommand.CLEAN_PARAM not in commands: + continue + entities.append(RobovacEdgeHuggingMopSwitch(item)) + + async_add_entities(entities) + + +class RobovacEdgeHuggingMopSwitch(SwitchEntity): + """Toggle edge-hugging mop path (DPS 154 MopMode bit).""" + + _attr_has_entity_name = True + _attr_should_poll = True + _attr_entity_category = EntityCategory.CONFIG + _attr_icon = "mdi:radius-outline" + + @property + def is_on(self) -> bool | None: + """Bypass ToggleEntity.cached_property so _attr_is_on updates are visible.""" + return self._attr_is_on + + def __init__(self, item: dict) -> None: + self.robovac_id = item[CONF_ID] + self._attr_unique_id = f"{item[CONF_ID]}_edge_hugging_mop" + self._attr_name = "Edge mopping" + self._attr_device_info = _device_info(item) + + async def async_update(self) -> None: + try: + vacuum_entity: RoboVacEntity | None = self.hass.data[DOMAIN][CONF_VACS].get( + self.robovac_id + ) + except KeyError: + vacuum_entity = None + if not (vacuum_entity and vacuum_entity.vacuum is not None): + self._attr_available = False + return + self._attr_available = True + val = vacuum_entity.edge_hugging_mopping + self._attr_is_on = val + + async def async_turn_on(self, **kwargs: Any) -> None: + vacuum_entity = self._vacuum() + if vacuum_entity: + await vacuum_entity.async_set_clean_param(edge_hugging_mopping=True) + await self._refresh_after_write(True) + + async def async_turn_off(self, **kwargs: Any) -> None: + vacuum_entity = self._vacuum() + if vacuum_entity: + await vacuum_entity.async_set_clean_param(edge_hugging_mopping=False) + await self._refresh_after_write(False) + + async def _refresh_after_write(self, expected_state: bool) -> None: + self._attr_available = True + self._attr_is_on = expected_state + self.async_write_ha_state() + async_call_later(self.hass, 2, self._delayed_refresh_after_write) + + async def _delayed_refresh_after_write(self, _: Any) -> None: + """Refresh again after the device has had time to echo patched DPS 154.""" + await self.async_update() + self.async_write_ha_state() + + def _vacuum(self) -> RoboVacEntity | None: + try: + return cast( + "RoboVacEntity | None", + self.hass.data[DOMAIN][CONF_VACS].get(self.robovac_id), + ) + except KeyError: + return None diff --git a/custom_components/robovac/vacuum.py b/custom_components/robovac/vacuum.py index 7881687f..4bdb941c 100644 --- a/custom_components/robovac/vacuum.py +++ b/custom_components/robovac/vacuum.py @@ -27,6 +27,7 @@ from typing import Any, cast from homeassistant.components.vacuum import ( + Segment, StateVacuumEntity, VacuumActivity, VacuumEntityFeature, @@ -34,23 +35,39 @@ from homeassistant.config_entries import ConfigEntry from homeassistant.const import ( CONF_ACCESS_TOKEN, + CONF_CLIENT_ID, + CONF_COUNTRY_CODE, CONF_DESCRIPTION, CONF_ID, CONF_IP_ADDRESS, CONF_MAC, CONF_MODEL, CONF_NAME, + CONF_PASSWORD, + CONF_REGION, + CONF_TIME_ZONE, + CONF_USERNAME, ) from homeassistant.core import HomeAssistant +from homeassistant.exceptions import HomeAssistantError from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC from homeassistant.helpers.entity import DeviceInfo from homeassistant.helpers.entity_platform import AddEntitiesCallback from .const import CONF_VACS, DOMAIN, PING_RATE, REFRESH_RATE, TIMEOUT +from .eufywebapi import EufyLogon from .errors import getErrorMessage +from .proto_decode import ( + build_t2320_room_clean_mode, + decode_clean_param_response, + decode_t2320_room_meta, + merge_clean_param_layers, + patch_clean_param_dps154, +) from .vacuums.base import RobovacCommand, RoboVacEntityFeature, TuyaCodes, TUYA_CONSUMABLES_CODES from .robovac import ModelNotSupportedException, RoboVac from .tuyalocalapi import TuyaException +from .tuyawebapi import TuyaAPISession ATTR_BATTERY_ICON = "battery_icon" ATTR_ERROR = "error" @@ -66,6 +83,74 @@ ATTR_BOOST_IQ = "boost_iq" ATTR_CONSUMABLES = "consumables" ATTR_MODE = "mode" +ATTR_CLEAN_TYPE = "clean_type" +ATTR_CLEAN_TYPE_LABEL = "clean_type_label" +ATTR_MOP_LEVEL = "mop_level" +ATTR_EDGE_HUGGING_MOPPING = "edge_hugging_mopping" +ATTR_CLEAN_CARPET = "clean_carpet" +ATTR_ROOM_NAMES = "room_names" +ATTR_ROOMS = "rooms" +ATTR_SEGMENTS = "segments" + +_CLEAN_TYPE_LABELS = { + "sweep_only": "Sweep only", + "mop_only": "Mop only", + "sweep_and_mop": "Vacuum and mop", + "sweep_then_mop": "Vacuum then mop", +} + + +def _clean_type_label(clean_type: str | None) -> str | None: + if not clean_type: + return None + if clean_type in _CLEAN_TYPE_LABELS: + return _CLEAN_TYPE_LABELS[clean_type] + return clean_type.replace("_", " ").title() + + +def _lookup_activity( + mapping: dict[str, VacuumActivity], state: Any +) -> VacuumActivity | None: + """Map Tuya human-readable status to VacuumActivity; keys may differ by case.""" + s = str(state) + if s in mapping: + return mapping[s] + folded = s.casefold() + for key, activity in mapping.items(): + if str(key).casefold() == folded: + return activity + return None + + +def _activity_from_mode(mode: str | None) -> VacuumActivity | None: + """Map decoded mode DPS to VacuumActivity when status DPS is idle/station-only.""" + if not mode: + return None + normalized = str(mode).casefold() + if normalized in {"auto", "cleaning"}: + return VacuumActivity.CLEANING + if normalized in {"pause", "paused"}: + return VacuumActivity.PAUSED + if normalized in {"return", "returning", "docking"}: + return VacuumActivity.RETURNING + if normalized in {"standby", "stop", "idle"}: + return VacuumActivity.IDLE + return None + + +def _activity_from_return_progress(progress: str | None) -> VacuumActivity | None: + """Map decoded return/dock progress to VacuumActivity.""" + if not progress: + return None + normalized = str(progress).casefold() + if normalized in {"docked", "charging"}: + return VacuumActivity.DOCKED + if normalized in {"cleaning", "auto"}: + return VacuumActivity.CLEANING + if normalized in {"returning", "return"}: + return VacuumActivity.RETURNING + return None + _LOGGER = logging.getLogger(__name__) SCAN_INTERVAL = timedelta(seconds=REFRESH_RATE) @@ -84,7 +169,17 @@ async def async_setup_entry( """Initialize my test integration 2 config entry.""" vacuums = config_entry.data[CONF_VACS] for item in vacuums: - item = vacuums[item] + item = dict(vacuums[item]) + for key in ( + CONF_USERNAME, + CONF_PASSWORD, + CONF_CLIENT_ID, + CONF_REGION, + CONF_COUNTRY_CODE, + CONF_TIME_ZONE, + ): + if key in config_entry.data: + item[key] = config_entry.data[key] entity = RoboVacEntity(item) hass.data[DOMAIN][CONF_VACS][item[CONF_ID]] = entity async_add_entities([entity]) @@ -118,6 +213,8 @@ class RoboVacEntity(StateVacuumEntity): _attr_activity_mapping: dict[str, VacuumActivity] | None = None _attr_error_code: int | str | None = None _attr_tuya_state: int | str | None = None + _attr_room_names: dict[str, dict[str, Any]] | None = None + _attr_room_map_id: int | None = None @property def robovac_supported(self) -> int | None: @@ -310,27 +407,76 @@ def activity(self) -> VacuumActivity | None: As of Home Assistant Core 2025.1, this property should be used instead of directly setting the state property. """ - if self._attr_tuya_state is None or self._attr_tuya_state == 0: - # 0 is a default set when we don't have a state - return None - elif ( - self.error_code is not None - and self.error_code not in [0, "no_error", "No error"] + mode_activity = _activity_from_mode(self._attr_mode) + return_progress_activity = self._return_progress_activity() + if ( + return_progress_activity == VacuumActivity.RETURNING + and mode_activity not in (None, VacuumActivity.RETURNING) ): + return_progress_activity = None + if self.error_code is not None and self.error_code not in [0, "no_error", "No error"]: _LOGGER.debug( "State changed to error. Error message: {}".format( getErrorMessage(self.error_code) ) ) return VacuumActivity.ERROR + if return_progress_activity == VacuumActivity.DOCKED: + return return_progress_activity + if self._attr_tuya_state is None or self._attr_tuya_state == 0: + if return_progress_activity is not None: + _LOGGER.debug( + "Using return progress activity %s without status state", + return_progress_activity, + ) + return return_progress_activity + if mode_activity is not None: + _LOGGER.debug("Using mode activity %s without status state", mode_activity) + return mode_activity + fallback_activity = self._fallback_state_from_partial_dps() + if fallback_activity == VacuumActivity.IDLE: + return fallback_activity + # 0 is a default set when we don't have a state + return None elif self._attr_tuya_state in VACUUM_ACTIVITY_VALUES: + if return_progress_activity is not None: + _LOGGER.debug( + "Using return progress activity %s over activity state %s", + return_progress_activity, + self._attr_tuya_state, + ) + return return_progress_activity + if self._attr_tuya_state == VacuumActivity.IDLE and mode_activity not in ( + None, + VacuumActivity.IDLE, + ): + _LOGGER.debug( + "Using mode activity %s over idle activity state", + mode_activity, + ) + return mode_activity # Particularly at system startup, the state may be set to a # VacuumActivity value directly, so we can return it as is. return cast(VacuumActivity, self._attr_tuya_state) elif self.activity_mapping is not None: # Use the activity mapping from the model details - activity = self.activity_mapping.get(str(self._attr_tuya_state)) + activity = _lookup_activity(self.activity_mapping, self._attr_tuya_state) + mode_activity = _activity_from_mode(self._attr_mode) + if return_progress_activity is not None: + _LOGGER.debug( + "Using return progress activity %s over status %s", + return_progress_activity, + self._attr_tuya_state, + ) + return return_progress_activity + if activity == VacuumActivity.IDLE and mode_activity not in (None, VacuumActivity.IDLE): + _LOGGER.debug( + "Using mode activity %s over idle status %s", + mode_activity, + self._attr_tuya_state, + ) + return mode_activity if activity is not None: _LOGGER.debug( "Used activity mapping, changing status %s to activity %s", @@ -359,6 +505,16 @@ def activity(self) -> VacuumActivity | None: ) return VacuumActivity.CLEANING + def _return_progress_activity(self) -> VacuumActivity | None: + """Return activity from models that expose return/dock progress on RETURN_HOME DPS.""" + if self.tuyastatus is None or self.vacuum is None: + return None + raw = self.tuyastatus.get(self.get_dps_code("RETURN_HOME")) + if raw is None or isinstance(raw, bool): + return None + progress = self.vacuum.getRoboVacHumanReadableValue(RobovacCommand.RETURN_HOME, raw) + return _activity_from_return_progress(progress) + @property def extra_state_attributes(self) -> dict[str, Any]: """Return the device-specific state attributes of this vacuum.""" @@ -404,6 +560,29 @@ def extra_state_attributes(self) -> dict[str, Any]: data[ATTR_CONSUMABLES] = self.consumables if self.mode: data[ATTR_MODE] = self.mode + if self._attr_clean_type is not None: + data[ATTR_CLEAN_TYPE] = self._attr_clean_type + if self._attr_clean_type_label is not None: + data[ATTR_CLEAN_TYPE_LABEL] = self._attr_clean_type_label + if self._attr_mop_level is not None: + data[ATTR_MOP_LEVEL] = self._attr_mop_level + if self._attr_edge_hugging_mopping is not None: + data[ATTR_EDGE_HUGGING_MOPPING] = self._attr_edge_hugging_mopping + if self._attr_clean_carpet is not None: + data[ATTR_CLEAN_CARPET] = self._attr_clean_carpet + if self._attr_room_names: + data[ATTR_ROOM_NAMES] = self._attr_room_names + data[ATTR_ROOMS] = { + key: value["label"] + for key, value in self._attr_room_names.items() + if isinstance(value.get("label"), str) + } + data[ATTR_SEGMENTS] = [ + {"id": value.get("id", key), "name": value.get("label", key)} + for key, value in self._attr_room_names.items() + ] + if self._attr_room_map_id is not None: + data["room_map_id"] = self._attr_room_map_id return data def __init__(self, item: dict[str, Any]) -> None: @@ -437,6 +616,14 @@ def __init__(self, item: dict[str, Any]) -> None: self._consumables_codes_cache: list[str] | None = None self._dps_codes_memo: dict[str, str] = {} self._last_consumable_data: str | None = None + self._room_name_registry: dict[str, dict[str, Any]] = {} + self._eufy_username: str | None = item.get(CONF_USERNAME) + self._eufy_password: str | None = item.get(CONF_PASSWORD) + self._eufy_client_id: str | None = item.get(CONF_CLIENT_ID) + self._eufy_region: str | None = item.get(CONF_REGION) + self._eufy_country_code: str | None = item.get(CONF_COUNTRY_CODE) + self._eufy_time_zone: str | None = item.get(CONF_TIME_ZONE) + self._cloud_room_lookup_attempted = False # Initialize the RoboVac connection try: @@ -494,6 +681,13 @@ def __init__(self, item: dict[str, Any]) -> None: # Initialize additional attributes self._attr_mode = None self._attr_consumables = None + self._attr_clean_type: str | None = None + self._attr_clean_type_label: str | None = None + self._attr_mop_level: str | None = None + self._attr_edge_hugging_mopping: bool | None = None + self._attr_clean_carpet: str | None = None + self._attr_room_names = None + self._attr_room_map_id = None # Set up device info for Home Assistant device registry self._attr_device_info = DeviceInfo( @@ -522,6 +716,9 @@ async def async_update(self) -> None: _LOGGER.debug("Skipping update for unsupported model: %s", self._attr_model_code) return + if self._supports_t2320_rooms() and not self._attr_room_names: + await self._async_fetch_t2320_rooms_from_cloud_once() + # Skip update if the IP address is not set if not self.ip_address: _LOGGER.warning("Cannot update vacuum %s: IP address not set", self._attr_name) @@ -604,9 +801,11 @@ def update_entity_values(self) -> None: # Update common attributes for all models self._update_state_and_error() self._update_mode_and_fan_speed() + self._update_clean_param_attributes() # Update model-specific attributes self._update_cleaning_stats() + self._update_room_names_from_device_payload() def get_dps_code(self, code_name: str | TuyaCodes) -> str: """Get the DPS code for a specific function. @@ -722,6 +921,40 @@ def _update_state_and_error(self) -> None: else: self._attr_error_code = 0 + def _update_clean_param_attributes(self) -> None: + """Decode DPS 154 (clean params) for vacuum card / automations.""" + if self.tuyastatus is None or self.vacuum is None: + return + if RobovacCommand.CLEAN_PARAM not in self.vacuum.getSupportedCommands(): + self._attr_clean_type = None + self._attr_clean_type_label = None + self._attr_mop_level = None + self._attr_edge_hugging_mopping = None + self._attr_clean_carpet = None + return + + raw = self.tuyastatus.get(self.get_dps_code("CLEAN_PARAM")) + if raw is None or raw == "": + return + + try: + raw_str = raw if isinstance(raw, str) else str(raw) + decoded = decode_clean_param_response(raw_str) + params = merge_clean_param_layers(decoded) + clean_type = params.get("clean_type") + if clean_type is None: + return + self._attr_clean_type = str(clean_type) + self._attr_clean_type_label = _clean_type_label(str(clean_type)) + if "mop_level" in params: + self._attr_mop_level = str(params["mop_level"]) + if "edge_hugging_mopping" in params: + self._attr_edge_hugging_mopping = bool(params["edge_hugging_mopping"]) + if "clean_carpet" in params: + self._attr_clean_carpet = str(params["clean_carpet"]) + except Exception as ex: + _LOGGER.debug("Clean param decode failed for %s: %s", self.name, ex) + def _fallback_state_from_partial_dps(self) -> VacuumActivity | int: """Infer a usable state from partial model DPS returned after startup.""" if self.tuyastatus is None: @@ -776,7 +1009,9 @@ def _update_mode_and_fan_speed(self) -> None: elif self.fan_speed == "Boost_IQ": self._attr_fan_speed = "Boost IQ" elif self.fan_speed == "Quiet": - self._attr_fan_speed = "Pure" + self._attr_fan_speed = ( + "Pure" if "Pure" in self._attr_fan_speed_list else "Quiet" + ) def _update_cleaning_stats(self) -> None: """Update cleaning statistics and settings attributes. @@ -841,6 +1076,161 @@ def _update_cleaning_stats(self) -> None: except Exception as e: _LOGGER.warning("Failed to decode consumable data: %s", str(e)) + def _supports_t2320_rooms(self) -> bool: + return bool(self.model_code and str(self.model_code).startswith("T2320")) + + def _merge_room_meta(self, meta: dict[str, Any], source: str) -> bool: + """Merge decoded T2320 room metadata into exported attributes.""" + changed = False + map_id = meta.get("map_id") + if isinstance(map_id, int) and map_id != self._attr_room_map_id: + self._attr_room_map_id = map_id + changed = True + + rooms = meta.get("rooms") + if not isinstance(rooms, list): + rooms = [] + for room in rooms: + if not isinstance(room, dict): + continue + room_id = room.get("id") + if room_id is None: + continue + label = str(room.get("label") or room_id) + key = str(room_id) + entry = {"id": room_id, "key": key, "label": label, "source": source} + if self._room_name_registry.get(key) != entry: + self._room_name_registry[key] = entry + changed = True + + if changed: + self._attr_room_names = { + key: self._room_name_registry[key] + for key in sorted(self._room_name_registry, key=lambda item: int(item) if item.isdigit() else item) + } + return changed + + def _update_room_names_from_device_payload(self) -> None: + """Update T2320 room names from local DPS 165 when it is present.""" + if not self._supports_t2320_rooms() or self.tuyastatus is None: + return + raw = self.tuyastatus.get(self.get_dps_code("ROOM_META")) + if not raw: + return + try: + self._merge_room_meta(decode_t2320_room_meta(str(raw)), "device") + except Exception as ex: + _LOGGER.debug("T2320 room metadata decode failed for %s: %s", self.name, ex) + + def _build_tuya_session_sync(self) -> TuyaAPISession | None: + """Authenticate to Tuya cloud using stored Eufy credentials.""" + if not self._eufy_username or not self._eufy_password: + return None + + client_id = self._eufy_client_id + region = self._eufy_region or "EU" + country_code = self._eufy_country_code or "44" + time_zone = self._eufy_time_zone or "Europe/London" + + if not client_id: + eufy_session = EufyLogon(self._eufy_username, self._eufy_password) + response = eufy_session.get_user_info() + if response is None or response.status_code != 200: + return None + user_response = response.json() + if user_response.get("res_code") != 1: + return None + user_info = user_response.get("user_info", {}) + client_id = user_info.get("id") + region = self._eufy_region or region + country_code = user_info.get("phone_code") or country_code + time_zone = user_info.get("timezone") or time_zone + request_host = user_info.get("request_host") + access_token = user_response.get("access_token") + if request_host and client_id and access_token: + settings_response = eufy_session.get_user_settings( + request_host, client_id, access_token + ) + if settings_response is not None and settings_response.status_code == 200: + settings = settings_response.json() + region = ( + settings.get("setting", {}) + .get("home_setting", {}) + .get("tuya_home", {}) + .get("tuya_region_code", region) + ) + + if not client_id: + return None + return TuyaAPISession( + username=f"eh-{client_id}", + region=region, + timezone=time_zone, + phone_code=country_code, + ) + + def _fetch_t2320_dps_from_cloud_sync(self) -> dict[str, Any]: + session = self._build_tuya_session_sync() + if session is None: + return {} + try: + return session._request( + action="tuya.m.device.dp.get", + version="1.0", + data={"devId": str(self.unique_id)}, + ) + except Exception as ex: + _LOGGER.debug("T2320 cloud DPS fetch failed for %s: %s", self.name, ex) + return {} + + @staticmethod + def _cloud_dps_map(response: dict[str, Any]) -> dict[str, Any]: + """Return a Tuya DPS map from either flat or {'dps': {...}} responses.""" + nested = response.get("dps") + return nested if isinstance(nested, dict) else response + + def _fetch_t2320_rooms_from_cloud_sync(self) -> dict[str, Any]: + dps = self._cloud_dps_map(self._fetch_t2320_dps_from_cloud_sync()) + raw = dps.get(self.get_dps_code("ROOM_META")) or dps.get("165") + return decode_t2320_room_meta(str(raw)) if raw else {"map_id": None, "rooms": []} + + async def _async_fetch_t2320_rooms_from_cloud_once(self) -> None: + if self._cloud_room_lookup_attempted or self.hass is None: + return + self._cloud_room_lookup_attempted = True + meta = await self.hass.async_add_executor_job( + self._fetch_t2320_rooms_from_cloud_sync + ) + if self._merge_room_meta(meta, "cloud"): + self.async_write_ha_state() + + def _t2320_room_id_for_label(self, room_label: str) -> int | None: + label = room_label.casefold() + for entry in self._room_name_registry.values(): + if str(entry.get("label", "")).casefold() == label: + room_id = entry.get("id") + room_id_str = str(room_id) + return int(room_id_str) if room_id_str.isdigit() else None + return None + + async def async_get_segments(self) -> list[Segment]: + """Return known T2320 room segments for HA callers that support it.""" + if not self._attr_room_names: + await self._async_fetch_t2320_rooms_from_cloud_once() + if not self._attr_room_names: + return [] + return [ + Segment(id=str(entry["id"]), name=str(entry["label"])) + for entry in self._attr_room_names.values() + ] + + async def async_clean_segments(self, segment_ids: list[str], **kwargs: Any) -> None: + """Clean selected T2320 room segments.""" + await self.async_send_command( + "roomClean", + {"roomIds": segment_ids, "count": kwargs.get("count", 1)}, + ) + async def async_locate(self, **kwargs: Any) -> None: """Locate the vacuum cleaner. @@ -873,6 +1263,10 @@ async def async_return_to_base(self, **kwargs: Any) -> None: self.get_dps_code("RETURN_HOME"): self.vacuum.getRoboVacCommandValue(RobovacCommand.RETURN_HOME, "return") } + mode_return_value = self.vacuum.getRoboVacCommandValue(RobovacCommand.MODE, "return") + if mode_return_value != "return": + payload[self.get_dps_code("MODE")] = mode_return_value + # For models with boolean START_PAUSE (e.g. T2128, T2276), DPS 2 is the # execution trigger — without it, the device ACKs but doesn't physically act. start_value = self.vacuum.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "start") @@ -913,9 +1307,15 @@ async def async_pause(self, **kwargs: Any) -> None: _LOGGER.error("Cannot pause vacuum: vacuum not initialized") return - await self.vacuum.async_set({ + payload: dict[str, Any] = { self.get_dps_code("START_PAUSE"): self.vacuum.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "pause") - }) + } + + mode_pause_value = self.vacuum.getRoboVacCommandValue(RobovacCommand.MODE, "pause") + if mode_pause_value != "pause": + payload[self.get_dps_code("MODE")] = mode_pause_value + + await self.vacuum.async_set(payload) async def async_stop(self, **kwargs: Any) -> None: """Stop the vacuum cleaner. @@ -961,6 +1361,66 @@ async def async_set_fan_speed(self, fan_speed: str, **kwargs: Any) -> None: RobovacCommand.FAN_SPEED, normalized_fan_speed ) }) + self.update_entity_values() + if self.hass: + self.async_write_ha_state() + + @property + def clean_type(self) -> str | None: + """Decoded global clean type from DPS 154 (snake_case), if available.""" + return self._attr_clean_type + + @property + def mop_level(self) -> str | None: + """Decoded mop water level from DPS 154, if available.""" + return self._attr_mop_level + + @property + def edge_hugging_mopping(self) -> bool | None: + """Edge-hugging mop mode from DPS 154, if present in the last decode.""" + return self._attr_edge_hugging_mopping + + async def async_set_clean_param( + self, + *, + clean_type: str | None = None, + mop_level: str | None = None, + edge_hugging_mopping: bool | None = None, + ) -> None: + """Write DPS 154 by patching the current protobuf payload.""" + if self.vacuum is None: + raise HomeAssistantError("Vacuum not initialized") + if RobovacCommand.CLEAN_PARAM not in self.vacuum.getSupportedCommands(): + raise HomeAssistantError("Clean parameters are not supported on this model") + dps = self.get_dps_code("CLEAN_PARAM") + raw = self.tuyastatus.get(dps) if self.tuyastatus else None + if raw is None or raw == "": + raw = getattr(self.vacuum.model_details, "default_clean_param_dps154", None) + if raw is None or raw == "": + raise HomeAssistantError("Clean parameter DPS is empty; wait for the next poll") + raw_str = raw if isinstance(raw, str) else str(raw) + try: + new_b64 = patch_clean_param_dps154( + raw_str, + clean_type=clean_type, + mop_level=mop_level, + edge_hugging_mopping=edge_hugging_mopping, + ) + except ValueError as err: + raise HomeAssistantError(str(err)) from err + await self.vacuum.async_set({dps: new_b64}) + if self.tuyastatus is None: + self.tuyastatus = {} + self.tuyastatus[dps] = new_b64 + if hasattr(self.vacuum, "_dps"): + self.vacuum._dps[dps] = new_b64 + self.update_entity_values() + if self.hass: + self.async_write_ha_state() + + async def async_set_mop_level(self, mop_level: str) -> None: + """Set mop water level (low / middle / high) via DPS 154.""" + await self.async_set_clean_param(mop_level=mop_level) async def async_send_command( self, @@ -991,6 +1451,10 @@ async def async_send_command( } if command in mode_commands: + if command == "smallRoomClean" and self._supports_t2320_rooms(): + raise HomeAssistantError( + "T2320 selected-room cleaning requires the roomClean command" + ) command_data = self._get_mode_command_data(mode_commands[command]) if command_data: await self.vacuum.async_set(command_data) @@ -1012,25 +1476,75 @@ async def async_send_command( await self.vacuum.async_set({ self.get_dps_code("BOOST_IQ"): new_value }) - elif command in ("roomClean", "room_clean") and params is not None: - # HA may pass params as a list of single-key dicts. + elif command in ("roomClean", "room_clean", "app_segment_clean") and params is not None: if isinstance(params, list): - merged: dict[str, Any] = {} - for item in params: - if isinstance(item, dict): + # HA may pass params as a list of single-key dicts. + if all(isinstance(item, dict) for item in params): + merged: dict[str, Any] = {} + for item in params: merged.update(item) - params = merged - if not isinstance(params, dict): + params = merged + room_ids = params.get("roomIds") or params.get("room_ids", [1]) + count = params.get("count", 1) + else: + room_ids = params + count = 1 + elif isinstance(params, dict): + room_ids = params.get("roomIds") or params.get("room_ids", [1]) + count = params.get("count", 1) + else: _LOGGER.error("roomClean: unexpected params type %s", type(params).__name__) return + if self._supports_t2320_rooms(): + if not self._attr_room_names: + await self._async_fetch_t2320_rooms_from_cloud_once() + normalized_room_ids: list[int] = [] + for room_id in room_ids: + if isinstance(room_id, str) and not room_id.isdigit(): + resolved = self._t2320_room_id_for_label(room_id) + if resolved is None: + raise HomeAssistantError(f"Unknown room {room_id!r}") + normalized_room_ids.append(resolved) + else: + normalized_room_ids.append(int(room_id)) + try: + clean_times = max(1, int(count)) + except (TypeError, ValueError): + clean_times = 1 + map_id = self._attr_room_map_id + if map_id is None: + if self.hass is None: + raise HomeAssistantError("T2320 room map ID is unavailable") + dps = self._cloud_dps_map( + await self.hass.async_add_executor_job( + self._fetch_t2320_dps_from_cloud_sync + ) + ) + raw = dps.get(self.get_dps_code("ROOM_META")) or dps.get("165") + if raw: + self._merge_room_meta(decode_t2320_room_meta(str(raw)), "cloud") + map_id = self._attr_room_map_id + if map_id is None: + raise HomeAssistantError("T2320 room map ID is unavailable") + payload = build_t2320_room_clean_mode( + normalized_room_ids, + map_id=map_id, + clean_times=clean_times, + ) + _LOGGER.info( + "T2320 roomClean: rooms=%s map_id=%s payload=%s", + normalized_room_ids, + map_id, + payload, + ) + await self.vacuum.async_set({self.get_dps_code("MODE"): payload}) + return - room_ids = params.get("roomIds") or params.get("room_ids", [1]) - count = params.get("count", 1) mode_dps = self.get_dps_code("MODE") auto_val = self.vacuum.getRoboVacCommandValue(RobovacCommand.MODE, "auto") # Protobuf models (e.g. T2278) encode room IDs directly in a - # ModeCtrlRequest on the MODE DPS code. Legacy models use a + # ModeCtrlRequest on the MODE DPS code. Legacy models use a # JSON payload on DPS 124 followed by a start command on DPS 2. if auto_val not in ("auto", "Auto") and mode_dps != TuyaCodes.ROOM_CLEAN: proto_cmd = self._build_protobuf_room_clean(room_ids, count) @@ -1047,6 +1561,9 @@ async def async_send_command( base64_str = base64.b64encode(json_str.encode("utf8")).decode("utf8") _LOGGER.debug("roomClean JSON: %s", json_str) await self.vacuum.async_set({TuyaCodes.ROOM_CLEAN: base64_str}) + # Wait for the vacuum to ACK DPS 124 before sending the start command. + # Without this delay, DPS 2 arrives before the room selection is processed + # and the vacuum ignores the start command. await asyncio.sleep(1) await self.vacuum.async_set({TuyaCodes.START_PAUSE: True}) diff --git a/custom_components/robovac/vacuums/T2320.py b/custom_components/robovac/vacuums/T2320.py index df5c6e71..646c702a 100644 --- a/custom_components/robovac/vacuums/T2320.py +++ b/custom_components/robovac/vacuums/T2320.py @@ -1,9 +1,31 @@ -"""Eufy Robot Vacuum and Mop X9 Pro with Auto-Clean Station (T2320)""" -from homeassistant.components.vacuum import VacuumEntityFeature +"""Eufy Robot Vacuum and Mop X9 Pro with Auto-Clean Station (T2320). + +Model-specific DPS codes, activity mapping for station states, and decode_dps +for base64/protobuf payloads on this hardware variant. +""" +import base64 +from typing import Any + +from homeassistant.components.vacuum import VacuumActivity, VacuumEntityFeature from .base import RoboVacEntityFeature, RobovacCommand, RobovacModelDetails class T2320(RobovacModelDetails): + # X9 Pro firmware maps unsupported "sweep_then_mop" to "sweep_and_mop"; expose only real modes. + expose_config_entities = True + clean_type_select_keys = ("sweep_only", "mop_only", "sweep_and_mop") + default_clean_param_dps154 = "JgoOCgIIAhIAGgAiAggCKgASABoAIhAKAggCGgAiAggCKgAyAggB" + warning_dps_code = 177 + expose_room_select = True + consumable_sensor_keys = ( + "side_brush", + "rolling_brush", + "filter_mesh", + "scrape", + "sensor", + "mop", + ) + homeassistant_features = ( VacuumEntityFeature.FAN_SPEED | VacuumEntityFeature.LOCATE @@ -18,22 +40,44 @@ class T2320(RobovacModelDetails): RoboVacEntityFeature.DO_NOT_DISTURB | RoboVacEntityFeature.BOOST_IQ ) + + # ── Activity mapping for HA vacuum state ────────────────────────── + activity_mapping = { + "standby": VacuumActivity.IDLE, + "idle": VacuumActivity.IDLE, + "auto": VacuumActivity.CLEANING, + "cleaning": VacuumActivity.CLEANING, + "pause": VacuumActivity.PAUSED, + "paused": VacuumActivity.PAUSED, + "return": VacuumActivity.RETURNING, + "returning": VacuumActivity.RETURNING, + "docking": VacuumActivity.RETURNING, + "charging": VacuumActivity.DOCKED, + "docked": VacuumActivity.DOCKED, + "washing": VacuumActivity.DOCKED, + "drying": VacuumActivity.DOCKED, + "removing scale": VacuumActivity.DOCKED, + "error": VacuumActivity.ERROR, + } + + # ── Command definitions ─────────────────────────────────────────── commands = { RobovacCommand.START_PAUSE: { "code": 2, "values": { "start": True, - "pause": False + "pause": False, }, }, RobovacCommand.MODE: { "code": 152, "values": { - "auto": "auto", - "return": "return", - "pause": "pause", - "small_room": "small_room", - "single_room": "single_room" + "auto": "BBoCCAE=", + "return": "AggG", + "pause": "AggN", + "standby": "AA==", + "stop": "AggM", + "resume": "AggO", }, }, RobovacCommand.STATUS: { @@ -42,28 +86,372 @@ class T2320(RobovacModelDetails): RobovacCommand.RETURN_HOME: { "code": 153, "values": { - "return_home": True - } + "return_home": True, + "return": True, + }, }, - RobovacCommand.FAN_SPEED: { + # DPS 154 — CleanParamResponse (sweep/mop type, mop level, etc.). Separate from + # FAN_SPEED on 158; enables RobovacCleanTypeSensor on the sensor platform. + RobovacCommand.CLEAN_PARAM: { "code": 154, + }, + RobovacCommand.FAN_SPEED: { + "code": 158, "values": { - "Standard": "standard", - "Boost IQ": "boost_iq", - "Max": "max", - "Quiet": "Quiet", + "standard": "Standard", + "turbo": "Turbo", + "max": "Max", + "quiet": "Quiet", }, }, RobovacCommand.LOCATE: { - "code": 153, + "code": 160, "values": { - "locate": True - } + "locate": True, + }, }, RobovacCommand.BATTERY: { - "code": 172, + "code": 163, + }, + RobovacCommand.CONSUMABLES: { + "code": 168, }, RobovacCommand.ERROR: { - "code": 169, + "code": 177, + }, + RobovacCommand.ACTIVE_ERRORS: { + "code": 178, }, } + dps_codes = { + "ROOM_META": "165", + } + + # ── DPS 152 base64 mode detection ───────────────────────────────── + _MODE_BASE64 = { + "AA==": "standby", + "AggN": "pause", + "AggM": "stop", + "AggG": "return", + "BBoCCAE=": "auto", + "AggO": "auto", # resume + } + + # ── DPS 173 station status detection ────────────────────────────── + _STATION_KEYWORDS = { + "WASHING": "washing", + "DRYING": "drying", + "REMOVING_SCALE": "removing scale", + } + _STATION_CODES = { + 45: "washing", + 76: "drying", + } + + # ── DPS 177 error/warning codes ─────────────────────────────────── + # T2320 ErrorCodeList/PromptCodeList labels adapted from + # `error_code_list_t2320.proto` in martijnpoppen/eufy-clean, + # copyright (c) Martijn Poppen: + # https://github.com/martijnpoppen/eufy-clean + # + # The same file was reviewed through the jeppesens/eufy-clean fork + # history and the GijsKruize/eufy-clean renamed path: + # custom_components/robovac_mqtt/proto/cloud/error_code_list_t2320.proto + # + # Eufy-Clean License, Version 1.0 - 2024-09-01, permits use, copy, + # modification, merge, publication, distribution, sublicensing, and sale + # with attribution. These enum names/comments were translated into + # robovac's human-readable message style. + _ERROR_CODES = { + 1: "Crash buffer stuck", + 2: "Wheel stuck", + 3: "Side brush stuck", + 4: "Rolling brush stuck", + 5: "Robot trapped, clear surrounding obstacles", + 6: "Robot trapped, move it near the starting point", + 7: "Wheel overhanging", + 8: "Power too low, shutting down", + 13: "Robot tilted", + 14: "Dust box or filter missing", + 17: "Forbidden area detected", + 18: "Laser cover stuck", + 19: "Laser sensor stuck or tangled", + 20: "Laser sensor may be blocked", + 21: "Docking failed", + 26: "Low battery, scheduled cleaning failed", + 31: "Foreign object stuck in suction port", + 32: "Mop holder rotation motor stuck", + 33: "Mop holder lift motor stuck", + 39: "Positioning failed, ending cleaning", + 40: "Mop cloth dislodged", + 41: "Air-drying heater abnormal", + 50: "Robot mistakenly on carpet", + 51: "Camera blocked", + 52: "Unable to leave station", + 55: "Base station exploration failed", + 70: "Clean dust box and filter", + 71: "Wall sensor abnormal", + 72: "Robot water tank low", + 73: "Dirty water tank full", + 74: "Clean water tank low", + 75: "Water tank missing", + 76: "Camera abnormal", + 77: "3D ToF sensor abnormal", + 78: "Ultrasonic sensor abnormal", + 79: "Clean tray not installed", + 80: "Robot and station communication abnormal", + 81: "Sewage tank air leak", + 82: "Clean tray needs cleaning", + 83: "Poor charging contact", + 101: "Battery abnormal", + 102: "Wheel module abnormal", + 103: "Side brush module abnormal", + 104: "Fan abnormal", + 105: "Rolling brush motor abnormal", + 106: "Robot water pump abnormal", + 107: "Laser sensor abnormal", + 111: "Rotation motor abnormal", + 112: "Lift motor abnormal", + 113: "Water spraying device abnormal", + 114: "Water pumping device abnormal", + 117: "Ultrasonic sensor abnormal", + 119: "Wi-Fi or Bluetooth abnormal", + } + _PROMPT_CODES = { + 1: "Start scheduled cleaning", + 3: "Low battery, returning to base station immediately", + 4: "Positioning failed, rebuilding map and starting new cleaning", + 5: "Positioning failed, mission ended, returning to base station", + 6: "Some areas were not cleaned because they are unreachable", + 7: "Path planning failed, cannot reach the designated area", + 9: "Base station exploration failed, robot returned to starting point", + 10: "Positioning successful", + 11: "Task finished, returning to base station", + 12: "Cannot start task while on station", + 13: "Scheduled cleaning failed because robot is working", + 14: "Map data updating, try again later", + 15: "Finished washing mop, resuming task", + 16: "Low battery, charge and try again", + 17: "Mop cleaning completed", + } + + @classmethod + def decode_warning_dps(cls, raw_value: str) -> list[dict[str, int | str]]: + """Decode DPS 177 warning fields into warning code/message pairs.""" + if not raw_value: + return [] + try: + from custom_components.robovac.proto_decode import ( + _decode_packed_varints, + _parse_proto, + _strip_length_prefix, + ) + + fields = _parse_proto(_strip_length_prefix(raw_value)) + codes: set[int] = set() + + def collect(field_value: Any) -> None: + if field_value is None: + return + if isinstance(field_value, list): + for item in field_value: + collect(item) + elif isinstance(field_value, int): + codes.add(field_value) + elif isinstance(field_value, bytes): + codes.update(_decode_packed_varints(field_value)) + + collect(fields.get(3)) + + new_code = fields.get(10) + if isinstance(new_code, bytes): + new_code_fields = _parse_proto(new_code) + collect(new_code_fields.get(2)) + + codes.discard(0) + return [ + { + "code": warning_code, + "message": cls._ERROR_CODES.get( + warning_code, f"warning_{warning_code}" + ), + } + for warning_code in sorted(codes) + ] + except Exception: + return [] + + # ── Custom DPS decoder ──────────────────────────────────────────── + @classmethod + def decode_dps(cls, dps_code: str, raw_value: str) -> str | None: + """Decode base64/protobuf DPS payloads into human-readable strings.""" + if not raw_value: + return None + + code = str(dps_code) + + # DPS 152 — mode/activity (base64 encoded) + if code == "152": + decoded = cls._MODE_BASE64.get(raw_value) + if decoded: + return decoded + try: + base64.b64decode(raw_value, validate=True) + return f"mode:{raw_value}" + except Exception: + return raw_value + + # DPS 153 — return/dock progress. X9 leaves DPS 152 as "return" after it + # reaches the dock, so this payload is needed to distinguish returning + # from already docked. + if code == "153": + try: + from custom_components.robovac.proto_decode import ( + _as_varint, + _parse_proto, + _strip_length_prefix, + ) + + fields = _parse_proto(_strip_length_prefix(raw_value)) + dock_state = fields.get(7) + if isinstance(dock_state, bytes): + dock_fields = _parse_proto(dock_state) + progress = _as_varint(dock_fields.get(2)) + if progress == 1: + if isinstance(fields.get(6), bytes) or isinstance( + fields.get(14), bytes + ): + return "docked" + return "returning" + if progress == 2: + return "docked" + state = _as_varint(fields.get(2)) + if state == 7: + return "returning" + if state == 3: + return "docked" + if state == 5: + return "cleaning" + active_state = fields.get(6) + if isinstance(active_state, bytes) and not active_state: + return "cleaning" + except Exception: + pass + return None + + # DPS 173 — station status + if code == "173": + raw_bytes = b"" + try: + from custom_components.robovac.proto_decode import ( + _parse_proto, + _strip_length_prefix, + ) + + raw_bytes = base64.b64decode(raw_value, validate=True) + upper = raw_bytes.decode("utf-8", errors="ignore").upper() + for keyword, label in cls._STATION_KEYWORDS.items(): + if keyword in upper: + return label + + fields = _parse_proto(_strip_length_prefix(raw_value)) + station_bytes = fields.get(5) + if isinstance(station_bytes, bytes): + station_fields = _parse_proto(station_bytes) + station_code = station_fields.get(1) + station_label = ( + cls._STATION_CODES.get(station_code) + if isinstance(station_code, int) + else None + ) + if station_label: + return station_label + except Exception: + upper = raw_bytes.decode("utf-8", errors="ignore").upper() + for keyword, label in cls._STATION_KEYWORDS.items(): + if keyword in upper: + return label + return "idle" + + # DPS 177 — error/warning protobuf + if code == "177": + try: + from custom_components.robovac.proto_decode import ( + _decode_packed_varints, + _parse_proto, + _strip_length_prefix, + ) + + fields = _parse_proto(_strip_length_prefix(raw_value)) + codes: set[int] = set() + + def collect(field_value: Any) -> None: + if field_value is None: + return + if isinstance(field_value, list): + for item in field_value: + collect(item) + elif isinstance(field_value, int): + codes.add(field_value) + elif isinstance(field_value, bytes): + codes.update(_decode_packed_varints(field_value)) + + # Only field 2 is an active error list. Field 3 carries warnings, + # and on the X9 mop-wash station notifications are warning-only. + collect(fields.get(2)) + + new_code = fields.get(10) + if isinstance(new_code, bytes): + new_code_fields = _parse_proto(new_code) + collect(new_code_fields.get(1)) + + codes.discard(0) + if not codes: + return "no_error" + + return "; ".join( + cls._ERROR_CODES.get(error_code, f"error_{error_code}") + for error_code in sorted(codes) + ) + except Exception: + pass + return raw_value + + # DPS 178 — prompt/notification protobuf + if code == "178": + try: + from custom_components.robovac.proto_decode import ( + _decode_packed_varints as _decode_prompt_packed_varints, + _parse_proto, + _strip_length_prefix, + ) + + fields = _parse_proto(_strip_length_prefix(raw_value)) + prompt_codes: set[int] = set() + + def collect_prompt(field_value: Any) -> None: + if field_value is None: + return + if isinstance(field_value, list): + for item in field_value: + collect_prompt(item) + elif isinstance(field_value, int): + prompt_codes.add(field_value) + elif isinstance(field_value, bytes): + prompt_codes.update(_decode_prompt_packed_varints(field_value)) + + collect_prompt(fields.get(2)) + + prompt_codes.discard(0) + if not prompt_codes: + return "no_error" + + return "; ".join( + cls._PROMPT_CODES.get(prompt_code, f"prompt_{prompt_code}") + for prompt_code in sorted(prompt_codes) + ) + except Exception: + pass + return raw_value + + return None diff --git a/tests/test_vacuum/test_config_entities.py b/tests/test_vacuum/test_config_entities.py new file mode 100644 index 00000000..752dc231 --- /dev/null +++ b/tests/test_vacuum/test_config_entities.py @@ -0,0 +1,97 @@ +"""Tests for RoboVac configuration entities.""" + +from types import SimpleNamespace + +import pytest + +from homeassistant.const import ( + CONF_ACCESS_TOKEN, + CONF_DESCRIPTION, + CONF_ID, + CONF_IP_ADDRESS, + CONF_MAC, + CONF_MODEL, + CONF_NAME, +) + +from custom_components.robovac.const import CONF_VACS +from custom_components.robovac.select import async_setup_entry as async_setup_select_entry +from custom_components.robovac.sensor import async_setup_entry as async_setup_sensor_entry +from custom_components.robovac.switch import async_setup_entry as async_setup_switch_entry + + +def _vacuum_config(model: str) -> dict[str, str]: + return { + CONF_NAME: "Test Vacuum", + CONF_ID: f"test_{model.lower()}", + CONF_MODEL: model, + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: model, + CONF_MAC: "aa:bb:cc:dd:ee:ff", + } + + +@pytest.mark.asyncio +async def test_t2320_exposes_config_entities() -> None: + """T2320 exposes X9 Pro clean-param and fan configuration controls.""" + entry = SimpleNamespace(data={CONF_VACS: {"vacuum": _vacuum_config("T2320")}}) + select_entities = [] + switch_entities = [] + + await async_setup_select_entry(None, entry, select_entities.extend) + await async_setup_switch_entry(None, entry, switch_entities.extend) + + assert [entity.name for entity in select_entities] == [ + "Fan speed", + "Clean type", + "Mop level", + "Room", + ] + assert [entity.name for entity in switch_entities] == ["Edge mopping"] + + +@pytest.mark.asyncio +async def test_other_clean_param_models_do_not_get_t2320_config_entities() -> None: + """Avoid exposing X9-specific config controls on other clean-param models.""" + entry = SimpleNamespace(data={CONF_VACS: {"vacuum": _vacuum_config("T2277")}}) + select_entities = [] + switch_entities = [] + + await async_setup_select_entry(None, entry, select_entities.extend) + await async_setup_switch_entry(None, entry, switch_entities.extend) + + assert select_entities == [] + assert switch_entities == [] + + +@pytest.mark.asyncio +async def test_t2320_does_not_duplicate_clean_type_as_diagnostic_sensor() -> None: + """T2320 exposes clean type as configuration, not a duplicate sensor.""" + entry = SimpleNamespace(data={CONF_VACS: {"vacuum": _vacuum_config("T2320")}}) + sensor_entities = [] + + await async_setup_sensor_entry(None, entry, sensor_entities.extend) + + sensor_names = [entity.name for entity in sensor_entities] + assert "Clean Type" not in sensor_names + assert "Notification" in sensor_names + assert "Warning" in sensor_names + assert "Side Brush" in sensor_names + assert "Rolling Brush" in sensor_names + assert "Filter" in sensor_names + assert "Scraper" in sensor_names + assert "Sensor" in sensor_names + assert "Mop" in sensor_names + assert "Dust Bag" not in sensor_names + + +@pytest.mark.asyncio +async def test_other_clean_param_models_keep_clean_type_diagnostic_sensor() -> None: + """Models without config controls still expose DPS 154 as diagnostics.""" + entry = SimpleNamespace(data={CONF_VACS: {"vacuum": _vacuum_config("T2277")}}) + sensor_entities = [] + + await async_setup_sensor_entry(None, entry, sensor_entities.extend) + + assert "Clean Type" in [entity.name for entity in sensor_entities] diff --git a/tests/test_vacuum/test_dps_command_mapping.py b/tests/test_vacuum/test_dps_command_mapping.py index ce7b79f8..dcd405a6 100644 --- a/tests/test_vacuum/test_dps_command_mapping.py +++ b/tests/test_vacuum/test_dps_command_mapping.py @@ -127,11 +127,13 @@ def test_getDpsCodes_extraction_method() -> None: assert t2320_dps_codes["STATUS"] == "173" # Non-default code assert t2320_dps_codes["STATUS"] != TuyaCodes.STATUS assert "BATTERY_LEVEL" in t2320_dps_codes - assert t2320_dps_codes["BATTERY_LEVEL"] == "172" # Non-default code + assert t2320_dps_codes["BATTERY_LEVEL"] == "163" # Non-default code assert t2320_dps_codes["BATTERY_LEVEL"] != TuyaCodes.BATTERY_LEVEL assert "ERROR_CODE" in t2320_dps_codes - assert t2320_dps_codes["ERROR_CODE"] == "169" # Non-default code + assert t2320_dps_codes["ERROR_CODE"] == "177" # Non-default code assert t2320_dps_codes["ERROR_CODE"] != TuyaCodes.ERROR_CODE + assert t2320_dps_codes.get("CLEAN_PARAM") == "154" + assert t2320_dps_codes.get("FAN_SPEED") == "158" @pytest.mark.asyncio @@ -173,3 +175,32 @@ async def test_vacuum_update_uses_correct_dps_codes() -> None: assert entity._attr_error_code == 0 assert entity._attr_mode == "auto" assert entity._attr_fan_speed == "Standard" + + +@pytest.mark.asyncio +async def test_quiet_fan_speed_display_stays_quiet_when_model_lists_quiet() -> None: + """Test raw Quiet is only renamed to Pure for models that expose Pure.""" + mock_vacuum_data = { + CONF_NAME: "Test Vacuum", + CONF_ID: "test_id", + CONF_MAC: "test_mac", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.1", + CONF_ACCESS_TOKEN: "test_token", + CONF_DESCRIPTION: "Test Description", + } + + mock_robovac = MagicMock() + mock_robovac._dps = {"158": "Quiet"} + mock_robovac.getFanSpeeds.return_value = ["Standard", "Turbo", "Max", "Quiet"] + mock_robovac.getHomeAssistantFeatures.return_value = 0 + mock_robovac.getRoboVacFeatures.return_value = 0 + mock_robovac.getRoboVacActivityMapping.return_value = None + mock_robovac.getDpsCodes.return_value = {"FAN_SPEED": "158"} + mock_robovac.getRoboVacHumanReadableValue.side_effect = lambda command, value: value + + with patch("custom_components.robovac.vacuum.RoboVac", return_value=mock_robovac): + entity = RoboVacEntity(mock_vacuum_data) + entity.update_entity_values() + + assert entity._attr_fan_speed == "Quiet" diff --git a/tests/test_vacuum/test_proto_decode.py b/tests/test_vacuum/test_proto_decode.py index c16b28da..2cb2d2f4 100644 --- a/tests/test_vacuum/test_proto_decode.py +++ b/tests/test_vacuum/test_proto_decode.py @@ -13,6 +13,8 @@ decode_work_status_v2, decode_error_code, decode_clean_param_response, + merge_clean_param_layers, + patch_clean_param_dps154, decode_consumable_response, decode_device_info, decode_unisetting_response, @@ -559,6 +561,74 @@ def test_empty_payload_returns_empty_dict(self) -> None: result = decode_clean_param_response(raw) assert result == {} + def test_merge_clean_param_layers_keeps_clean_type_when_running_omits_it(self) -> None: + """Running job payload may omit clean_type; inherit from global clean_param.""" + decoded = { + "clean_param": {"clean_type": "mop_only", "mop_level": "low"}, + "running_clean_param": {"fan": "standard", "mop_level": "high"}, + } + merged = merge_clean_param_layers(decoded) + assert merged["clean_type"] == "mop_only" + assert merged["fan"] == "standard" + assert merged["mop_level"] == "high" + + def test_dps154_edge_hugging_mopping_on(self) -> None: + """Decode observed X9 edge-hugging mopping enabled payload.""" + result = decode_clean_param_response( + "KgoQCgIIAhIAGgAiBAgCEAEqABIAGgAiEgoCCAIaACIECAIQASoAMgIIAQ==" + ) + + params = result["running_clean_param"] + assert params["clean_type"] == "sweep_and_mop" + assert params["mop_level"] == "high" + assert params["fan"] == "standard" + assert params["edge_hugging_mopping"] is True + + def test_dps154_edge_hugging_mopping_off(self) -> None: + """Decode observed X9 edge-hugging mopping disabled payload.""" + result = decode_clean_param_response( + "JgoOCgIIAhIAGgAiAggCKgASABoAIhAKAggCGgAiAggCKgAyAggB" + ) + + params = result["running_clean_param"] + assert params["clean_type"] == "sweep_and_mop" + assert params["mop_level"] == "high" + assert params["fan"] == "standard" + assert params["edge_hugging_mopping"] is False + + +class TestPatchCleanParamDps154: + """Encode/patch tests for DPS 154.""" + + def test_patch_mop_level_preserves_clean_type(self) -> None: + raw = "DgoKCgAaAggBIgIIARIA" + new = patch_clean_param_dps154(raw, mop_level="high") + cp = decode_clean_param_response(new)["clean_param"] + assert cp["clean_type"] == "sweep_only" + assert cp["mop_level"] == "high" + + def test_patch_clean_type_preserves_extent(self) -> None: + raw = "DgoKCgAaAggBIgIIARIA" + new = patch_clean_param_dps154(raw, clean_type="mop_only") + cp = decode_clean_param_response(new)["clean_param"] + assert cp["clean_type"] == "mop_only" + assert cp["clean_extent"] == "narrow" + + def test_patch_edge_hugging_does_not_create_area_layer(self) -> None: + """Edge-only writes match app payload shape by not inventing area params.""" + raw = "JgoOCgIIAhIAGgAiAggCKgASABoAIhAKAggCGgAiAggCKgAyAggB" + new = patch_clean_param_dps154(raw, edge_hugging_mopping=True) + decoded = decode_clean_param_response(new) + + assert "area_clean_param" not in decoded + assert decoded["clean_param"]["edge_hugging_mopping"] is True + assert decoded["running_clean_param"]["edge_hugging_mopping"] is True + + def test_patch_clean_param_invalid_base64_raises_value_error(self) -> None: + """Malformed DPS 154 payloads surface as ValueError for HA callers.""" + with pytest.raises(ValueError, match="invalid clean param base64"): + patch_clean_param_dps154("not-base64!!", mop_level="high") + # ============================================================================ # Tests for decode_consumable_response (DPS 168) @@ -578,6 +648,19 @@ def test_dps168_sample(self) -> None: assert result.get("filter_mesh") == 48 assert result.get("dustbag") == 1574 + def test_t2320_dps168_sample(self) -> None: + """Decode the observed T2320 DPS 168 consumable fields.""" + payload = "IgogCgIIWRICCFkaAghZIgIIWSoDCLgCMgIIHaAB8fjazwY=" + result = decode_consumable_response(payload) + assert result == { + "side_brush": 89, + "rolling_brush": 89, + "filter_mesh": 89, + "scrape": 89, + "sensor": 312, + "mop": 29, + } + def test_empty_payload_returns_empty_dict(self) -> None: """Empty payload → empty dict.""" import base64 @@ -906,3 +989,40 @@ def test_error_code_with_extended_codes() -> None: result = decode_error_code("AA==") # Should return string or None assert result is None or isinstance(result, str) + + +def test_decode_t2320_room_meta_live_payload_shape() -> None: + """Decode X9 Pro DPS 165 length-prefixed room metadata.""" + from custom_components.robovac.proto_decode import decode_t2320_room_meta + + payload = ( + "fwp9CAsSDAgBEghCYXRocm9vbRIKCAISBlNob3dlchILCAMSB0tpdGNoZW4SCQgEEgVTdHVkeRIQ" + "CAUSDE1haW4gYmVkcm9vbRIOCAYSCkd1ZXN0IHJvb20SDwgIEgtMaXZpbmcgcm9vbRIKCAkSBlRv" + "aWxldBIICAoSBEhhbGw=" + ) + result = decode_t2320_room_meta( + payload + ) + + assert result["map_id"] == 11 + assert result["rooms"] == [ + {"id": 1, "label": "Bathroom"}, + {"id": 2, "label": "Shower"}, + {"id": 3, "label": "Kitchen"}, + {"id": 4, "label": "Study"}, + {"id": 5, "label": "Main bedroom"}, + {"id": 6, "label": "Guest room"}, + {"id": 8, "label": "Living room"}, + {"id": 9, "label": "Toilet"}, + {"id": 10, "label": "Hall"}, + ] + + +def test_build_t2320_room_clean_mode_payload() -> None: + """Build selected-room ModeCtrlRequest for DPS 152.""" + from custom_components.robovac.proto_decode import build_t2320_room_clean_mode + + assert build_t2320_room_clean_mode([3], map_id=11) == "DggBIgoKBAgDEAEQARgL" + assert build_t2320_room_clean_mode([3, 8], map_id=11, clean_times=2) == ( + "FAgBIhAKBAgDEAEKBAgIEAIQAhgL" + ) diff --git a/tests/test_vacuum/test_robovac.py b/tests/test_vacuum/test_robovac.py index eaace981..2ca4ae01 100644 --- a/tests/test_vacuum/test_robovac.py +++ b/tests/test_vacuum/test_robovac.py @@ -123,6 +123,40 @@ def test_get_robovac_features() -> None: assert robovac_l70.getRoboVacFeatures() == expected_l_features +def test_t2320_consumables_dps_code() -> None: + """T2320 exposes DPS 168 as consumables, not room-clean metadata.""" + with patch( + "custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None + ): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + + dps_codes = robovac.getDpsCodes() + assert dps_codes["CONSUMABLES"] == "168" + assert "ROOM_CLEAN" not in dps_codes + + +def test_get_dps_codes_includes_model_extra_codes() -> None: + """Model-level dps_codes are exposed alongside command DPS codes.""" + with patch( + "custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None + ): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + + dps_codes = robovac.getDpsCodes() + assert dps_codes["ROOM_META"] == "165" + assert "ROOM_CLEAN" not in dps_codes + + def test_get_fan_speeds() -> None: """Test getFanSpeeds returns correct fan speeds for different series. diff --git a/tests/test_vacuum/test_sensor.py b/tests/test_vacuum/test_sensor.py index f18bab6e..f5987aa4 100644 --- a/tests/test_vacuum/test_sensor.py +++ b/tests/test_vacuum/test_sensor.py @@ -4,13 +4,14 @@ import pytest from unittest.mock import patch, MagicMock -from homeassistant.const import PERCENTAGE, CONF_ID +from homeassistant.const import PERCENTAGE, CONF_ID, EntityCategory from homeassistant.components.sensor import SensorDeviceClass from custom_components.robovac.sensor import ( RobovacBatterySensor, RobovacErrorSensor, RobovacNotificationSensor, + RobovacWarningSensor, RobovacConsumableSensor, RobovacCleanTypeSensor, RobovacLastCleanRecordSensor, @@ -27,6 +28,7 @@ RobovacChildrenLockSensor, ) from custom_components.robovac.vacuums.base import TuyaCodes +from custom_components.robovac.vacuums.T2320 import T2320 # ============================================================================ # Fixtures for common test scenarios @@ -287,6 +289,17 @@ async def test_notification_sensor_init(mock_vacuum_data: Any) -> None: sensor = RobovacNotificationSensor(mock_vacuum_data, "178") assert sensor is not None assert sensor.robovac_id == mock_vacuum_data[CONF_ID] + assert sensor.entity_category == EntityCategory.DIAGNOSTIC + + +@pytest.mark.asyncio +async def test_warning_sensor_init(mock_vacuum_data: Any) -> None: + """Test warning sensor initialization.""" + + sensor = RobovacWarningSensor(mock_vacuum_data, "177", T2320) + assert sensor is not None + assert sensor.robovac_id == mock_vacuum_data[CONF_ID] + assert sensor.entity_category == EntityCategory.DIAGNOSTIC @pytest.mark.asyncio @@ -300,6 +313,7 @@ async def test_consumable_sensor_init(mock_vacuum_data: Any) -> None: assert sensor.robovac_id == mock_vacuum_data[CONF_ID] assert sensor._attr_name == "Side Brush" assert sensor._attr_icon == "mdi:brush" + assert sensor.entity_category == EntityCategory.DIAGNOSTIC @pytest.mark.asyncio @@ -451,7 +465,8 @@ async def test_error_sensor_update_no_tuyastatus(mock_vacuum_data: Any) -> None: # First update with no data await sensor.async_update() - assert sensor._attr_available is False + assert sensor._attr_available is True + assert sensor._attr_native_value == "No error" @pytest.mark.asyncio @@ -469,7 +484,8 @@ async def test_error_sensor_update_no_dps_code(mock_vacuum_data: Any) -> None: sensor.hass = mock_hass await sensor.async_update() - assert sensor._attr_available is False + assert sensor._attr_available is True + assert sensor._attr_native_value == "No error" @pytest.mark.asyncio @@ -618,7 +634,7 @@ async def test_error_sensor_update_successful(mock_vacuum_data: Any) -> None: await sensor.async_update() assert sensor._attr_available is True - assert sensor._attr_native_value is None # no_error returns None + assert sensor._attr_native_value == "No error" @pytest.mark.asyncio @@ -646,6 +662,29 @@ async def test_notification_sensor_update_successful(mock_vacuum_data: Any) -> N assert sensor._attr_native_value is None # no_error returns None +@pytest.mark.asyncio +async def test_warning_sensor_update_successful(mock_vacuum_data: Any) -> None: + """Test warning sensor decodes non-fatal T2320 warning payloads.""" + + mock_data = {CONF_ID: "test_id", "name": "Test"} + sensor = RobovacWarningSensor(mock_data, "177", T2320) + + mock_vacuum = MagicMock() + mock_vacuum.tuyastatus = {"177": "Dwj22eCIkfFEGgFSIgBSAA=="} + + mock_hass = MagicMock() + mock_hass.data = {"robovac": {"vacuums": {"test_id": mock_vacuum}}} + sensor.hass = mock_hass + + await sensor.async_update() + + assert sensor._attr_available is True + assert sensor._attr_native_value == "Clean tray needs cleaning" + assert sensor._attr_extra_state_attributes == { + "warnings": [{"code": 82, "message": "Clean tray needs cleaning"}] + } + + @pytest.mark.asyncio async def test_wifi_signal_sensor_update_no_data_first_time(mock_vacuum_data: Any) -> None: """Test WiFi signal sensor when no data available on first update.""" @@ -739,7 +778,7 @@ async def test_error_sensor_successful_update_no_error(mock_vacuum_data: Any) -> await sensor.async_update() assert sensor._attr_available is True - assert sensor._attr_native_value is None # no_error becomes None + assert sensor._attr_native_value == "No error" assert sensor._has_had_data is True @@ -762,6 +801,26 @@ async def test_notification_sensor_successful_update(mock_hass_with_valid_vacuum assert sensor._has_had_data is True +@pytest.mark.asyncio +async def test_t2320_notification_sensor_decodes_prompt_code_10(mock_vacuum_data: Any) -> None: + """Test T2320 notification sensor uses the model prompt table.""" + + sensor = RobovacNotificationSensor(mock_vacuum_data, "178", T2320) + + mock_vacuum = MagicMock() + mock_vacuum.tuyastatus = {"178": "CwiY+IOJrO9JEgEK"} + + mock_hass = MagicMock() + mock_hass.data = {"robovac": {"vacuums": {mock_vacuum_data[CONF_ID]: mock_vacuum}}} + sensor.hass = mock_hass + + await sensor.async_update() + + assert sensor._attr_available is True + assert sensor._attr_native_value == "Positioning successful" + assert sensor._attr_native_value != "Prompt 10" + + @pytest.mark.asyncio async def test_battery_sensor_successful_update(mock_hass_with_valid_vacuum: Any) -> None: """Test battery sensor successfully updates with valid data.""" @@ -809,7 +868,11 @@ async def test_sensor_missing_dps_code(sensor_class: Any, dps_code: Any) -> None await sensor.async_update() - assert sensor._attr_available is False + if sensor_class == RobovacErrorSensor: + assert sensor._attr_available is True + assert sensor._attr_native_value == "No error" + else: + assert sensor._attr_available is False # ============================================================================ # Malformed Data Error Handling Tests (Scenario #5.3) diff --git a/tests/test_vacuum/test_t2320_command_mappings.py b/tests/test_vacuum/test_t2320_command_mappings.py index c6813211..24e46ffe 100644 --- a/tests/test_vacuum/test_t2320_command_mappings.py +++ b/tests/test_vacuum/test_t2320_command_mappings.py @@ -2,6 +2,7 @@ import pytest from unittest.mock import patch +import base64 from custom_components.robovac.robovac import RoboVac, RobovacCommand from custom_components.robovac.vacuums.T2320 import T2320 @@ -27,7 +28,7 @@ def test_return_home_command_value(self, t2320_robovac): """Test RETURN_HOME command returns boolean true as seen in debug logs.""" # Debug log shows: "dps": {"153": true} result = t2320_robovac.getRoboVacCommandValue(RobovacCommand.RETURN_HOME, "return_home") - assert result is True or result == "True" or result == "true" + assert result is True def test_start_pause_command_exists(self, t2320_robovac): """Test START_PAUSE command is defined for T2320.""" @@ -39,20 +40,26 @@ def test_start_pause_command_value(self, t2320_robovac): """Test START_PAUSE command returns boolean values.""" # Debug log shows: "dps": {"2": false} pause_result = t2320_robovac.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "pause") - assert pause_result is False or pause_result == "False" or pause_result == "false" + assert pause_result is False start_result = t2320_robovac.getRoboVacCommandValue(RobovacCommand.START_PAUSE, "start") - assert start_result is True or start_result == "True" or start_result == "true" + assert start_result is True def test_mode_command_value(self, t2320_robovac): - """Test MODE command returns plain string values as seen in debug logs.""" - # Debug log shows: "dps": {"152": "auto"} + """Test MODE command returns base64 protobuf values observed on X9.""" result = t2320_robovac.getRoboVacCommandValue(RobovacCommand.MODE, "auto") - assert result == "auto" + assert result == "BBoCCAE=" + + def test_mode_command_does_not_expose_legacy_room_literals(self): + """T2320 room cleaning uses roomClean payloads, not literal DPS 152 modes.""" + values = T2320.commands[RobovacCommand.MODE]["values"] + assert "small_room" not in values + assert "single_room" not in values def test_fan_speed_command_has_multiple_options(self, t2320_robovac): """Test FAN_SPEED command has multiple readable options.""" fan_speeds = t2320_robovac.getFanSpeeds() + assert fan_speeds == ["Standard", "Turbo", "Max", "Quiet"] # Should have more than one option and not contain base64-like strings assert len(fan_speeds) > 1 for speed in fan_speeds: @@ -60,6 +67,13 @@ def test_fan_speed_command_has_multiple_options(self, t2320_robovac): assert not speed.startswith("Ag") assert len(speed) < 20 # Reasonable length for human-readable names + def test_fan_speed_command_values(self, t2320_robovac): + """Test FAN_SPEED command values match X9 suction levels.""" + assert t2320_robovac.getRoboVacCommandValue(RobovacCommand.FAN_SPEED, "quiet") == "Quiet" + assert t2320_robovac.getRoboVacCommandValue(RobovacCommand.FAN_SPEED, "standard") == "Standard" + assert t2320_robovac.getRoboVacCommandValue(RobovacCommand.FAN_SPEED, "turbo") == "Turbo" + assert t2320_robovac.getRoboVacCommandValue(RobovacCommand.FAN_SPEED, "max") == "Max" + def test_dps_codes_mapping(self, t2320_robovac): """Test DPS codes match debug log expectations.""" dps_codes = t2320_robovac.getDpsCodes() @@ -67,14 +81,104 @@ def test_dps_codes_mapping(self, t2320_robovac): assert dps_codes.get("RETURN_HOME") == "153" assert dps_codes.get("START_PAUSE") == "2" assert dps_codes.get("MODE") == "152" - assert dps_codes.get("FAN_SPEED") == "154" + assert dps_codes.get("CLEAN_PARAM") == "154" + assert dps_codes.get("FAN_SPEED") == "158" + assert dps_codes.get("BATTERY_LEVEL") == "163" + assert dps_codes.get("CONSUMABLES") == "168" + assert dps_codes.get("ERROR_CODE") == "177" + assert "ROOM_CLEAN" not in dps_codes def test_status_command_exists(self, t2320_robovac): """Test STATUS command is defined for state polling.""" commands = t2320_robovac.getSupportedCommands() assert RobovacCommand.STATUS in commands + def test_consumables_command_exists(self, t2320_robovac): + """Test DPS 168 is exposed as consumables, not room-clean metadata.""" + commands = t2320_robovac.getSupportedCommands() + assert RobovacCommand.CONSUMABLES in commands + assert t2320_robovac.model_details.commands[RobovacCommand.CONSUMABLES]["code"] == 168 + def test_locate_command_exists(self, t2320_robovac): """Test LOCATE command is defined.""" commands = t2320_robovac.getSupportedCommands() assert RobovacCommand.LOCATE in commands + + def test_clean_param_command_enables_mop_telemetry_sensor(self, t2320_robovac): + """CLEAN_PARAM on 154 is required for RobovacCleanTypeSensor (sweep/mop mode).""" + commands = t2320_robovac.getSupportedCommands() + assert RobovacCommand.CLEAN_PARAM in commands + assert t2320_robovac.model_details.commands[RobovacCommand.CLEAN_PARAM]["code"] == 154 + + def test_decode_station_status_from_base64_payload(self): + """Test station keywords are found after base64 decoding DPS 173.""" + raw = base64.b64encode(b"\x08\x01 station WASHING active").decode() + assert T2320.decode_dps("173", raw) == "washing" + + def test_decode_station_status_from_live_washing_payload(self): + """Test observed X9 station status payload maps to washing.""" + raw = "MgokCgwKBggBGgIIChIAGAESBggBEgIIAjIMCgIIARIGCAEQARgPEgYIARABKAEqAggt" + assert T2320.decode_dps("173", raw) == "washing" + + def test_decode_station_status_from_live_drying_payload(self): + """Test observed X9 station status payload maps to drying.""" + raw = "MAokCgwKBggBGgIIChIAGAESBggBEgIIAjIMCgIIARIGCAEQARgPEgQIARACKgIITA==" + assert T2320.decode_dps("173", raw) == "drying" + + def test_decode_error_without_active_codes_is_no_error(self): + """Test empty/zero DPS 177 protobuf payload does not force an error.""" + assert T2320.decode_dps("177", "AA==") == "no_error" + + def test_decode_warning_only_station_payload_is_no_error(self): + """Test observed X9 warning-only station payload does not force error.""" + assert T2320.decode_dps("177", "Dwj22eCIkfFEGgFSIgBSAA==") == "no_error" + + def test_decode_warning_dps_from_live_payload(self): + """Test observed X9 warning payload exposes non-fatal warnings.""" + assert T2320.decode_warning_dps("Dwj22eCIkfFEGgFSIgBSAA==") == [ + {"code": 82, "message": "Clean tray needs cleaning"} + ] + + def test_decode_t2320_active_error_from_dps177_field_2(self): + """Test active errors still decode from DPS 177 field 2.""" + raw = base64.b64encode(bytes([3, 0x12, 0x01, 52])).decode() + assert T2320.decode_dps("177", raw) == "Unable to leave station" + + def test_decode_t2320_warning_from_dps177_field_3_only(self): + """Test warning-only DPS 177 field 3 does not become an active error.""" + raw = base64.b64encode(bytes([3, 0x1A, 0x01, 79])).decode() + assert T2320.decode_dps("177", raw) == "no_error" + assert T2320.decode_warning_dps(raw) == [ + {"code": 79, "message": "Clean tray not installed"} + ] + + def test_decode_return_progress_payloads(self): + """Test observed X9 return progress payloads distinguish moving vs docked.""" + assert T2320.decode_dps("153", "CBAFGgA6AhAB") == "returning" + assert T2320.decode_dps("153", "CBAHQgByAiIA") == "returning" + assert T2320.decode_dps("153", "DhAFGgA6AhABcgQaACIA") == "docked" + assert T2320.decode_dps("153", "EBAFGgA6AhACcgYaAggBIgA=") == "docked" + assert T2320.decode_dps("153", "FAoAEAUaADICCAE6AhABcgQaACIA") == "docked" + assert T2320.decode_dps("153", "CgoAEAUyAHICIgA=") == "cleaning" + + def test_decode_route_unavailable_prompt(self): + """Test observed X9 room-clean failure prompt from DPS 178.""" + assert T2320.decode_dps("178", "CwjczIPu6YlJEgEH") == ( + "Path planning failed, cannot reach the designated area" + ) + + @pytest.mark.parametrize( + ("raw", "expected"), + [ + ("CwiY+IOJrO9JEgEK", "Positioning successful"), + ("Cwj6iLDX8uxJEgEM", "Cannot start task while on station"), + ], + ) + def test_decode_observed_x9_prompt_codes(self, raw, expected): + """Test observed X9 prompt codes avoid raw prompt_N states.""" + assert T2320.decode_dps("178", raw) == expected + + def test_decode_observed_x9_prompt_17(self): + """Test observed X9 prompt 17 avoids a raw prompt_N state.""" + raw = base64.b64encode(bytes([3, 0x12, 0x01, 17])).decode() + assert T2320.decode_dps("178", raw) == "Mop cleaning completed" diff --git a/tests/test_vacuum/test_vacuum_activity_lookup.py b/tests/test_vacuum/test_vacuum_activity_lookup.py new file mode 100644 index 00000000..2c9517fe --- /dev/null +++ b/tests/test_vacuum/test_vacuum_activity_lookup.py @@ -0,0 +1,12 @@ +"""Tests for vacuum activity mapping helpers.""" + +from homeassistant.components.vacuum import VacuumActivity + +from custom_components.robovac.vacuum import _lookup_activity + + +def test_lookup_activity_case_insensitive() -> None: + mapping = {"Paused": VacuumActivity.PAUSED, "auto": VacuumActivity.CLEANING} + assert _lookup_activity(mapping, "paused") == VacuumActivity.PAUSED + assert _lookup_activity(mapping, "AUTO") == VacuumActivity.CLEANING + assert _lookup_activity(mapping, "unknown") is None diff --git a/tests/test_vacuum/test_vacuum_commands.py b/tests/test_vacuum/test_vacuum_commands.py index 76048678..53a3a1b3 100644 --- a/tests/test_vacuum/test_vacuum_commands.py +++ b/tests/test_vacuum/test_vacuum_commands.py @@ -4,6 +4,17 @@ from typing import Any from unittest.mock import patch, MagicMock, AsyncMock, call +from homeassistant.const import ( + CONF_ACCESS_TOKEN, + CONF_DESCRIPTION, + CONF_ID, + CONF_IP_ADDRESS, + CONF_MAC, + CONF_MODEL, + CONF_NAME, +) + +from custom_components.robovac.proto_decode import decode_clean_param_response from custom_components.robovac.robovac import RoboVac from custom_components.robovac.vacuum import RoboVacEntity @@ -138,6 +149,144 @@ async def test_async_pause_sends_boolean_for_toggle_models( robovac.async_set.assert_called_once_with({"2": False}) +@pytest.mark.asyncio +async def test_async_pause_sends_mode_payload_when_model_requires_it( + mock_vacuum_data, +) -> None: + """Test async_pause sends model-specific mode payloads with boolean pause.""" + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + robovac.async_set = AsyncMock(return_value=True) + + with patch("custom_components.robovac.vacuum.RoboVac", return_value=robovac): + entity = RoboVacEntity(mock_vacuum_data) + await entity.async_pause() + + robovac.async_set.assert_called_once_with({"2": False, "152": "AggN"}) + + +@pytest.mark.asyncio +async def test_async_return_to_base_sends_mode_payload_when_model_requires_it( + mock_vacuum_data, +) -> None: + """Test return-to-base updates mode DPS for models with encoded mode payloads.""" + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + robovac.async_set = AsyncMock(return_value=True) + + with patch("custom_components.robovac.vacuum.RoboVac", return_value=robovac): + entity = RoboVacEntity(mock_vacuum_data) + await entity.async_return_to_base() + + robovac.async_set.assert_called_once_with({"153": True, "152": "AggG", "2": True}) + + +@pytest.mark.asyncio +async def test_async_set_clean_param_uses_model_default_before_dps154_read() -> None: + """Test T2320 clean-param settings can be changed before DPS 154 is read.""" + data = { + CONF_NAME: "Test X9", + CONF_ID: "test_x9_id", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: "X9 Pro", + CONF_MAC: "aa:bb:cc:dd:ee:99", + } + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + entity = RoboVacEntity(data) + assert entity.vacuum is not None + entity.vacuum._dps = {} + entity.tuyastatus = {} + entity.vacuum.async_set = AsyncMock(return_value=True) + + await entity.async_set_clean_param(clean_type="mop_only", mop_level="low") + + entity.vacuum.async_set.assert_called_once() + payload = entity.vacuum.async_set.call_args.args[0] + assert list(payload) == ["154"] + clean_param = decode_clean_param_response(payload["154"])["clean_param"] + assert clean_param["clean_type"] == "mop_only" + assert clean_param["mop_level"] == "low" + assert entity.tuyastatus["154"] == payload["154"] + assert entity.vacuum._dps["154"] == payload["154"] + assert entity.clean_type == "mop_only" + assert entity.mop_level == "low" + + +@pytest.mark.asyncio +async def test_update_entity_values_does_not_display_default_before_dps154_read() -> None: + """Test T2320 clean-param display values wait for a real DPS 154 read.""" + data = { + CONF_NAME: "Test X9", + CONF_ID: "test_x9_id", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: "X9 Pro", + CONF_MAC: "aa:bb:cc:dd:ee:99", + } + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + entity = RoboVacEntity(data) + assert entity.vacuum is not None + entity.vacuum._dps = { + "151": True, + "156": True, + "158": "Standard", + "159": True, + "160": False, + "161": 80, + "163": 29, + } + + entity.update_entity_values() + + assert entity.clean_type is None + assert entity.mop_level is None + assert entity.edge_hugging_mopping is None + + +@pytest.mark.asyncio +async def test_update_entity_values_preserves_clean_params_on_partial_update() -> None: + """Test battery-only updates do not clear the last real DPS 154 decode.""" + data = { + CONF_NAME: "Test X9", + CONF_ID: "test_x9_id", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: "X9 Pro", + CONF_MAC: "aa:bb:cc:dd:ee:99", + } + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + entity = RoboVacEntity(data) + assert entity.vacuum is not None + entity.vacuum._dps = { + "154": "JgoOCgIIAhIAGgAiAggCKgASABoAIhAKAggCGgAiAggCKgAyAggB", + "163": 29, + } + + entity.update_entity_values() + assert entity.clean_type == "sweep_and_mop" + assert entity.edge_hugging_mopping is False + + entity.vacuum._dps = {"163": 30} + entity.update_entity_values() + + assert entity.clean_type == "sweep_and_mop" + assert entity.edge_hugging_mopping is False + + @pytest.mark.asyncio async def test_async_pause(mock_robovac, mock_vacuum_data) -> None: """Test the async_pause method.""" diff --git a/tests/test_vacuum/test_vacuum_entity.py b/tests/test_vacuum/test_vacuum_entity.py index 95b508a3..969b10e8 100644 --- a/tests/test_vacuum/test_vacuum_entity.py +++ b/tests/test_vacuum/test_vacuum_entity.py @@ -1,10 +1,21 @@ """Tests for the RoboVac vacuum entity.""" +import base64 import pytest from typing import Any from unittest.mock import patch, MagicMock from homeassistant.components.vacuum import VacuumActivity +from homeassistant.const import ( + CONF_ACCESS_TOKEN, + CONF_DESCRIPTION, + CONF_ID, + CONF_IP_ADDRESS, + CONF_MAC, + CONF_MODEL, + CONF_NAME, +) +from custom_components.robovac.robovac import RoboVac from custom_components.robovac.vacuum import RoboVacEntity from custom_components.robovac.vacuums.base import TuyaCodes @@ -24,6 +35,40 @@ async def test_activity_property_none(mock_robovac, mock_vacuum_data) -> None: assert result is None +@pytest.mark.asyncio +async def test_activity_property_uses_mode_without_status( + mock_robovac, mock_vacuum_data +) -> None: + """Test mode DPS drives activity when no status DPS has been received.""" + with patch("custom_components.robovac.vacuum.RoboVac", return_value=mock_robovac): + entity = RoboVacEntity(mock_vacuum_data) + entity.tuya_state = None + entity.error_code = "no_error" + + entity._attr_mode = "auto" + assert entity.activity == VacuumActivity.CLEANING + + entity._attr_mode = "pause" + assert entity.activity == VacuumActivity.PAUSED + + entity._attr_mode = "return" + assert entity.activity == VacuumActivity.RETURNING + + +@pytest.mark.asyncio +async def test_activity_property_idle_when_dps_has_no_status_or_mode( + mock_robovac, mock_vacuum_data +) -> None: + """Test a connected vacuum with no active status/mode reports idle.""" + with patch("custom_components.robovac.vacuum.RoboVac", return_value=mock_robovac): + entity = RoboVacEntity(mock_vacuum_data) + entity.tuya_state = None + entity.error_code = "no_error" + entity.tuyastatus = {"158": "Standard", "163": 39} + + assert entity.activity == VacuumActivity.IDLE + + @pytest.mark.asyncio async def test_activity_property_error(mock_robovac, mock_vacuum_data) -> None: """Test activity property returns ERROR when error_code is set.""" @@ -142,6 +187,131 @@ async def test_activity_property_cleaning(mock_robovac, mock_vacuum_data) -> Non assert result == VacuumActivity.CLEANING +@pytest.mark.asyncio +async def test_activity_property_uses_mode_when_status_is_idle( + mock_robovac, mock_vacuum_data +) -> None: + """Test active mode DPS overrides station-idle status for T2320-like models.""" + with patch("custom_components.robovac.vacuum.RoboVac", return_value=mock_robovac): + entity = RoboVacEntity(mock_vacuum_data) + entity._attr_activity_mapping = {"idle": VacuumActivity.IDLE} + entity.tuya_state = "idle" + entity.error_code = "no_error" + + entity._attr_mode = "auto" + assert entity.activity == VacuumActivity.CLEANING + + entity._attr_mode = "pause" + assert entity.activity == VacuumActivity.PAUSED + + entity._attr_mode = "return" + assert entity.activity == VacuumActivity.RETURNING + + +@pytest.mark.asyncio +async def test_activity_property_uses_return_progress_over_stale_return_mode() -> None: + """Test T2320 dock progress overrides stale DPS 152 return mode.""" + data = { + CONF_NAME: "Test X9", + CONF_ID: "test_x9_id", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: "X9 Pro", + CONF_MAC: "aa:bb:cc:dd:ee:99", + } + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + with patch("custom_components.robovac.vacuum.RoboVac", return_value=robovac): + entity = RoboVacEntity(data) + robovac._dps = { + "152": "AggG", + "153": "DhAFGgA6AhABcgQaACIA", + "173": "LgokCgwKBggBGgIIChIAGAESBggBEgIIAjIMCgIIARIGCAEQARgPEgIIASoCCFg=", + } + entity.update_entity_values() + + assert entity.mode == "return" + assert entity.activity == VacuumActivity.DOCKED + + +@pytest.mark.asyncio +async def test_activity_property_error_overrides_docked_return_progress() -> None: + """Test active errors take precedence over stale docked return-progress payloads.""" + data = { + CONF_NAME: "Test X9", + CONF_ID: "test_x9_id", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: "X9 Pro", + CONF_MAC: "aa:bb:cc:dd:ee:99", + } + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + with patch("custom_components.robovac.vacuum.RoboVac", return_value=robovac): + entity = RoboVacEntity(data) + robovac._dps = { + "152": "AggG", + "153": "DhAFGgA6AhABcgQaACIA", + "173": "LgokCgwKBggBGgIIChIAGAESBggBEgIIAjIMCgIIARIGCAEQARgPEgIIASoCCFg=", + "177": base64.b64encode(bytes([3, 0x12, 0x01, 52])).decode(), + } + entity.update_entity_values() + + assert entity._return_progress_activity() == VacuumActivity.DOCKED + assert entity.error_code == "Unable to leave station" + assert entity.activity == VacuumActivity.ERROR + + +@pytest.mark.asyncio +async def test_activity_property_uses_return_progress_cleaning_signal() -> None: + """Test T2320 active cleaning DPS 153 overrides standby/idle status.""" + data = { + CONF_NAME: "Test X9", + CONF_ID: "test_x9_id", + CONF_MODEL: "T2320", + CONF_IP_ADDRESS: "192.168.1.100", + CONF_ACCESS_TOKEN: "test_key", + CONF_DESCRIPTION: "X9 Pro", + CONF_MAC: "aa:bb:cc:dd:ee:99", + } + with patch("custom_components.robovac.robovac.TuyaDevice.__init__", return_value=None): + robovac = RoboVac( + model_code="T2320", + device_id="test_id", + host="192.168.1.100", + local_key="test_key", + ) + with patch("custom_components.robovac.vacuum.RoboVac", return_value=robovac): + entity = RoboVacEntity(data) + robovac._dps = { + "152": "AA==", + "153": "CgoAEAUyAHICIgA=", + "173": "LgokCgwKBggBGgIIChIAGAESBggBEgIIAjIMCgIIARIGCAEQARgPEgIIASoCCFg=", + } + entity.update_entity_values() + + assert entity.mode == "standby" + assert entity.activity == VacuumActivity.CLEANING + + +def test_cloud_dps_map_accepts_flat_and_nested_responses() -> None: + """Tuya cloud may return either a flat DPS map or {'dps': {...}}.""" + assert RoboVacEntity._cloud_dps_map({"165": "flat"}) == {"165": "flat"} + assert RoboVacEntity._cloud_dps_map({"dps": {"165": "nested"}}) == {"165": "nested"} + + @pytest.mark.asyncio async def test_update_entity_values(mock_robovac, mock_vacuum_data) -> None: """Test update_entity_values correctly sets entity attributes.""" @@ -262,7 +432,7 @@ async def test_fan_speed_formatting(mock_robovac, mock_vacuum_data) -> None: test_cases = [ ("No_suction", "No Suction"), ("Boost_IQ", "Boost IQ"), - ("Quiet", "Pure"), + ("Quiet", "Quiet"), ("Standard", "Standard"), # No change ]