Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion custom_components/robovac/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

from .tuyalocaldiscovery import TuyaLocalDiscovery

PLATFORMS = [Platform.VACUUM, Platform.SENSOR]
PLATFORMS = [Platform.VACUUM, Platform.SENSOR, Platform.SELECT, Platform.SWITCH]
_LOGGER = logging.getLogger(__name__)


Expand Down
254 changes: 254 additions & 0 deletions custom_components/robovac/proto_decode.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
"""

import base64
import binascii
from typing import Any


Expand Down Expand Up @@ -125,6 +126,239 @@ def _parse_proto(data: bytes) -> dict[int, Any]:
return fields


def _encode_varint(v: int) -> bytes:
"""Encode a non-negative integer as protobuf varint bytes."""
if v < 0:
raise ValueError("varint must be non-negative")
buf = bytearray()
while True:
piece = v & 0x7F
v >>= 7
if v:
buf.append(piece | 0x80)
else:
buf.append(piece)
break
return bytes(buf)


def _serialize_proto(fields: dict[int, Any]) -> bytes:
"""Serialize a protobuf message from field_num -> int | bytes | dict (nested submessage).

Wire types: int → varint (type 0), bytes/dict → length-delimited (type 2).
Dict values are serialized as nested sub-messages.
"""
chunks: list[bytes] = []
for field_num in sorted(fields.keys()):
val = fields[field_num]
tag = field_num << 3
if isinstance(val, int):
chunks.append(_encode_varint(tag))
chunks.append(_encode_varint(val))
elif isinstance(val, bytes):
chunks.append(_encode_varint(tag | 2))
chunks.append(_encode_varint(len(val)))
chunks.append(val)
elif isinstance(val, dict):
nested = _serialize_proto(val)
chunks.append(_encode_varint(tag | 2))
chunks.append(_encode_varint(len(nested)))
chunks.append(nested)
else:
raise TypeError(f"Unsupported protobuf field value type: {type(val)!r}")
return b"".join(chunks)


def _clean_param_b64_with_length_prefix(proto_payload: bytes) -> str:
"""Wrap payload with single-byte length prefix and return base64 (DPS 154 on-wire)."""
return base64.b64encode(bytes([len(proto_payload)]) + proto_payload).decode("ascii")


def _with_varint_length_prefix(proto_payload: bytes) -> str:
"""Wrap payload with a protobuf varint length prefix and base64 encode it."""
return base64.b64encode(_encode_varint(len(proto_payload)) + proto_payload).decode("ascii")


def _strip_varint_length_prefix(data: bytes) -> bytes:
"""Remove a leading protobuf varint length prefix when it matches payload size."""
if not data:
return data
try:
length, offset = _parse_varint(data, 0)
except Exception:
return data
if length == len(data) - offset:
return data[offset:]
return data


def _field_varint(field_num: int, value: int) -> bytes:
return _encode_varint(field_num << 3) + _encode_varint(value)


def _field_bytes(field_num: int, value: bytes) -> bytes:
return _encode_varint((field_num << 3) | 2) + _encode_varint(len(value)) + value


def decode_t2320_room_meta(raw_b64: str) -> dict[str, Any]:
"""Decode T2320/X9 Pro DPS 165 room metadata.

Real X9 Pro payload shape:
length_prefix
field_1 bytes:
field_1 uint32 map_id
repeated field_2 bytes:
field_1 uint32 room_id
field_2 string room_label
"""
if not raw_b64:
return {"map_id": None, "rooms": []}

raw = _strip_varint_length_prefix(base64.b64decode(raw_b64))
outer = _parse_proto(raw)
inner_raw = outer.get(1)
if isinstance(inner_raw, list):
inner_raw = next((item for item in inner_raw if isinstance(item, bytes)), None)
if not isinstance(inner_raw, bytes):
return {"map_id": None, "rooms": []}

inner = _parse_proto(inner_raw)
rooms: list[dict[str, Any]] = []
entries = inner.get(2, [])
if isinstance(entries, bytes):
entries = [entries]
if not isinstance(entries, list):
entries = []

for entry_raw in entries:
if not isinstance(entry_raw, bytes):
continue
entry = _parse_proto(entry_raw)
room_id = entry.get(1)
label_raw = entry.get(2)
if not isinstance(room_id, int):
continue
label = str(room_id)
if isinstance(label_raw, bytes):
try:
decoded = label_raw.decode("utf-8").strip()
if decoded:
label = decoded
except UnicodeDecodeError:
pass
rooms.append({"id": room_id, "label": label})

return {
"map_id": inner.get(1) if isinstance(inner.get(1), int) else None,
"rooms": rooms,
}


def build_t2320_room_clean_mode(
room_ids: list[int],
*,
map_id: int,
clean_times: int = 1,
) -> str:
"""Build a DPS 152 ModeCtrlRequest for T2320 selected-room cleaning."""
if not room_ids:
raise ValueError("room_ids must not be empty")
if map_id <= 0:
raise ValueError("map_id must be positive")
if clean_times <= 0:
raise ValueError("clean_times must be positive")

select_rooms = b""
for order, room_id in enumerate(room_ids, start=1):
room = _field_varint(1, int(room_id)) + _field_varint(2, order)
select_rooms += _field_bytes(1, room)
select_rooms += _field_varint(2, int(clean_times))
select_rooms += _field_varint(3, int(map_id))

# ModeCtrlRequest: method=1 START_SELECT_ROOMS_CLEAN, oneof field 4 payload.
request = _field_varint(1, 1) + _field_bytes(4, select_rooms)
return _with_varint_length_prefix(request)


def patch_clean_param_dps154(
raw_b64: str,
*,
clean_type: str | None = None,
mop_level: str | None = None,
edge_hugging_mopping: bool | None = None,
) -> str:
"""Patch decoded CleanParamResponse params and re-encode for DPS 154 SET.

Preserves other outer fields and inner fields (carpet, extent, etc.) when possible.
"""
try:
body = base64.b64decode(raw_b64, validate=True)
except (binascii.Error, ValueError) as err:
raise ValueError("invalid clean param base64") from err
if not body:
raise ValueError("empty clean param base64")
# Match decode_clean_param_response / _strip_length_prefix: byte 0 is length, protobuf starts at 1
payload = body[1:] if len(body) > 1 else b""

outer = _parse_proto(payload)

clean_type_names = ["sweep_only", "mop_only", "sweep_and_mop", "sweep_then_mop"]
mop_level_names = ["low", "middle", "high"]

def _patch_layer(inner_raw: Any) -> bytes:
if not isinstance(inner_raw, bytes):
inner_raw = b""
inner: dict[int, Any] = dict(_parse_proto(inner_raw))

if clean_type_key is not None:
inner[1] = _serialize_proto({1: clean_type_names.index(clean_type_key)})

if mop_level is not None or edge_hugging_mopping is not None:
prev_m = inner.get(4)
midx: int | None = None
edge = 0
if isinstance(prev_m, bytes):
mf = _parse_proto(prev_m)
midx = _as_varint(mf.get(1))
edge = int(mf.get(2) or 0)
if mop_level_key is not None:
midx = mop_level_names.index(mop_level_key)
if midx is None:
midx = 0
if edge_hugging_mopping is not None:
edge = 1 if edge_hugging_mopping else 0
inner[4] = _serialize_proto({1: midx, 2: edge})

return _serialize_proto(inner)

clean_type_key = None
if clean_type is not None:
key = clean_type.lower().replace(" ", "_").replace("-", "_")
if key not in clean_type_names:
raise ValueError(f"Unknown clean_type {clean_type!r}")
clean_type_key = key

mop_level_key = None
if mop_level is not None or edge_hugging_mopping is not None:
if mop_level is not None:
mk = mop_level.lower().replace(" ", "_")
if mk not in mop_level_names:
raise ValueError(f"Unknown mop_level {mop_level!r}")
mop_level_key = mk

new_outer = dict(outer)
fields_to_patch = (
(1, 4)
if edge_hugging_mopping is not None and mop_level is None
else (1, 3, 4)
)
for field in fields_to_patch:
if field in outer or field == 1:
new_outer[field] = _patch_layer(outer.get(field))
new_payload = _serialize_proto(new_outer)
return _clean_param_b64_with_length_prefix(new_payload)


def _strip_length_prefix(raw_b64: str) -> bytes:
"""Decode base64 and strip the length prefix byte."""
return base64.b64decode(raw_b64)[1:]
Expand Down Expand Up @@ -511,8 +745,10 @@ def _decode_param(param_bytes: bytes | None) -> dict[str, Any]:
v = _enum_val(f[3])
result["clean_extent"] = EXTENT_NAMES[v] if v < len(EXTENT_NAMES) else f"extent_{v}"
if 4 in f:
mop_fields = _parse_proto(f[4]) if isinstance(f[4], bytes) else {}
v = _enum_val(f[4])
result["mop_level"] = MOP_LEVEL_NAMES[v] if v < len(MOP_LEVEL_NAMES) else f"mop_{v}"
result["edge_hugging_mopping"] = mop_fields.get(2) == 1
if 6 in f:
v = _enum_val(f[6])
result["fan"] = FAN_NAMES[v] if v < len(FAN_NAMES) else f"fan_{v}"
Expand All @@ -536,6 +772,24 @@ def _decode_param(param_bytes: bytes | None) -> dict[str, Any]:
return result


def merge_clean_param_layers(decoded: dict[str, Any]) -> dict[str, Any]:
"""Merge clean_param / area_clean_param / running_clean_param for display.

During an active job, ``running_clean_param`` often includes fan and mop
fields but omits ``clean_type``; that value still lives on the global
``clean_param``. Later layers override earlier ones for keys present in both.
"""
merged: dict[str, Any] = {}
for layer in (
decoded.get("clean_param"),
decoded.get("area_clean_param"),
decoded.get("running_clean_param"),
):
if isinstance(layer, dict):
merged.update(layer)
return merged


def decode_consumable_response(raw_b64: str) -> dict[str, int]:
"""Decode DPS 168 (ConsumableResponse) to a dict of {name: hours}.

Expand Down
4 changes: 4 additions & 0 deletions custom_components/robovac/robovac.py
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,10 @@ def getDpsCodes(self) -> dict[str, str]:
}

codes = {}
model_dps_codes = getattr(self.model_details, "dps_codes", {})
if isinstance(model_dps_codes, dict):
codes.update({str(key): str(value) for key, value in model_dps_codes.items()})

# Extract codes from commands dictionary
for key, value in self.model_details.commands.items():
# Get the DPS name from the mapping, or use the command name if not in mapping
Expand Down
Loading
Loading