Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 60 additions & 1 deletion custom_components/lock_code_manager/domain/credentials.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

from __future__ import annotations

from collections.abc import Mapping
from collections.abc import Iterable, Mapping
from dataclasses import dataclass, field
from enum import StrEnum
from types import MappingProxyType
Expand Down Expand Up @@ -241,6 +241,13 @@ class CredentialTypeCapability:
``supports_learn`` is True when the lock can enroll the credential at the
device (for example a fingerprint learn flow) rather than being told the
value.

Length convention shared by every provider: a non-positive ``max_length``
means "no advertised maximum / unknown" -- never a literal zero-length
limit, which would be meaningless -- so providers map an absent or
unreadable maximum to ``0`` (Matter's ``max_pin_length or 0`` idiom). A
non-positive ``min_length`` means "no minimum". ``length_bounds`` applies
this normalization; do not emit a literal ``0`` to express a real limit.
"""

num_slots: int
Expand Down Expand Up @@ -287,6 +294,58 @@ def supports(self, credential_type: CredentialType) -> bool:
"""Return True when the lock advertises ``credential_type``."""
return credential_type in self.credential_types

def length_bounds(
self, credential_type: CredentialType
) -> tuple[int, int | None] | None:
"""
Return the effective ``(min, max)`` value length for a credential type.

``None`` when the type is unsupported. A non-positive advertised
bound means "unbounded" rather than a literal limit: Matter reports
``max_pin_length`` as ``... or 0``, where ``0`` is "unknown", so it
normalizes to no upper bound (``max`` of ``None``). A non-positive
minimum normalizes to ``0`` (no minimum).
"""
cap = self.capability_for(credential_type)
if cap is None:
return None
return (
max(cap.min_length, 0),
cap.max_length if cap.max_length > 0 else None,
)


def aggregate_length_bounds(
capabilities: Iterable[LockCapabilities | None],
credential_type: CredentialType,
) -> tuple[int | None, int | None]:
"""
Fold many locks' length limits into one tightest-common ``(min, max)``.

Each bound is ``None`` when nothing constrains it. Locks with no
capabilities (``None``) or that do not support ``credential_type``
contribute nothing, so an all-unknown set yields ``(None, None)``.

The result is the tightest range every lock can satisfy: the largest
minimum and the smallest maximum. A returned ``min`` greater than ``max``
signals an unsatisfiable intersection across locks; the caller decides how
to present it. User-interface defaults deliberately live in the caller, not
here, so non-interface callers can reuse this unchanged.
"""
mins: list[int] = []
maxes: list[int] = []
for caps in capabilities:
if caps is None:
continue
bounds = caps.length_bounds(credential_type)
if bounds is None:
continue
lo, hi = bounds
mins.append(lo)
if hi is not None:
maxes.append(hi)
return (max(mins) if mins else None, min(maxes) if maxes else None)


def credential_from_slot(slot: int, state: SlotCredential) -> Credential:
"""
Expand Down
53 changes: 53 additions & 0 deletions custom_components/lock_code_manager/domain/slot_coordinator.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
HomeAssistant,
callback,
)
from homeassistant.exceptions import ServiceValidationError
from homeassistant.helpers.event import async_track_state_change_event
from homeassistant.helpers.issue_registry import (
IssueSeverity,
Expand All @@ -42,6 +43,7 @@

from ..const import ATTR_IN_SYNC, DOMAIN, EVENT_PIN_USED
from .config import EntryConfig
from .credentials import CredentialType
from .queries import get_entry_config

if TYPE_CHECKING:
Expand Down Expand Up @@ -214,10 +216,21 @@
Normalizing whitespace and the empty-PIN side effect (disabling
the slot on an active slot whose PIN was cleared) live here so
entities do not have to coordinate sibling state themselves.

A non-empty PIN is validated against every bound lock's advertised
length range before it is written; an empty PIN clears the slot and
is exempt. This is the authoritative *minimum* gate: the text entity
keeps ``native_min`` permissive so Home Assistant's ``text.set_value``
service neither rejects the empty clear nor pre-empts the per-lock
error built here. The maximum is additionally surfaced as the entity's
``native_max`` ceiling, which Home Assistant does enforce.
"""
if not value.strip():
value = ""

if value:
self._validate_credential_length(value, CredentialType.PIN)

updates: dict[str, Any] = {CONF_PIN: value}
if not value and self.is_enabled:
_LOGGER.debug(
Expand All @@ -228,6 +241,46 @@

self._write_config_fields(updates)

def _validate_credential_length(
self, value: str, credential_type: CredentialType
) -> None:
"""
Reject ``value`` if it violates any bound lock's length range.

Authoritative gate for credential length. Iterates every bound lock so
the error names each offending lock with its required range. The lock
set is the entry-wide ``runtime_data.locks`` -- the same set the text
entity mirrors in ``self.locks`` to size its surfaced bounds, since LCM
binds every lock to every slot; a future per-slot binding must update
both sites together. Locks whose capabilities are not cached
(disconnected or not yet probed) and locks that do not advertise
``credential_type`` are skipped -- the write proceeds rather than
blocking on unknown limits, and the sync layer surfaces any later
device rejection.
"""
length = len(value)
violations: list[str] = []
for lock in self._config_entry.runtime_data.locks.values():
caps = lock.cached_capabilities
if caps is None:
continue
bounds = caps.length_bounds(credential_type)
if bounds is None:
continue

Check warning on line 269 in custom_components/lock_code_manager/domain/slot_coordinator.py

View check run for this annotation

Codecov / codecov/patch

custom_components/lock_code_manager/domain/slot_coordinator.py#L269

Added line #L269 was not covered by tests
lo, hi = bounds
if length < lo or (hi is not None and length > hi):
required = (
f"at least {lo} characters"
if hi is None
else f"{lo}-{hi} characters"
)
violations.append(f"{required} for {lock.display_name}")
if violations:
raise ServiceValidationError(
f"{credential_type.value.upper()} length {length} is not accepted "
f"by all locks: {'; '.join(violations)}"
)

async def async_request_active_toggle(self, enabled: bool) -> None:
"""
Apply an enabled/disabled toggle requested by the switch entity.
Expand Down
13 changes: 13 additions & 0 deletions custom_components/lock_code_manager/providers/_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -1163,6 +1163,19 @@ async def async_get_usercodes(self) -> dict[int, SlotCredential]:
"""
return await self._project_users_to_slots(CredentialType.PIN)

@final
@property
def cached_capabilities(self) -> LockCapabilities | None:
"""
Return the already-probed capabilities, or ``None``. Never performs I/O.

Synchronous read of the same cache ``_get_cached_capabilities``
populates. Lets synchronous callers (e.g. the PIN text entity sizing
its length bounds) consult capabilities without awaiting; an unprobed
or disconnected lock reads ``None`` and contributes no constraint.
"""
return self._capabilities_cache

@final
async def _get_cached_capabilities(self) -> LockCapabilities:
"""
Expand Down
118 changes: 116 additions & 2 deletions custom_components/lock_code_manager/text.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,34 @@

from __future__ import annotations

from collections.abc import Mapping
import logging
from typing import TYPE_CHECKING

from homeassistant.components.text import TextEntity, TextMode
from homeassistant.const import CONF_NAME, CONF_PIN
from homeassistant.core import HomeAssistant, callback
from homeassistant.helpers import entity_registry as er
from homeassistant.helpers.entity_platform import AddEntitiesCallback

from .domain.credentials import CredentialType, aggregate_length_bounds
from .domain.exceptions import LockCodeManagerProviderError
from .domain.models import LockCodeManagerConfigEntry
from .entity import BaseLockCodeManagerEntity

if TYPE_CHECKING:
from .providers import BaseLock

_LOGGER = logging.getLogger(__name__)

# The single credential-type-specific knob. A text entity's ``key`` maps to a
# credential type when its value length is governed by a lock capability;
# keys absent here (for example the slot name) carry no length constraint.
# Adding password support later is a one-line entry once a password entity exists.
CREDENTIAL_TYPE_BY_CONF_KEY: Mapping[str, CredentialType] = {
CONF_PIN: CredentialType.PIN,
}


async def async_setup_entry(
hass: HomeAssistant,
Expand Down Expand Up @@ -46,8 +61,10 @@ def add_standard_text_entities(slot_num: int, ent_reg: er.EntityRegistry) -> Non
class LockCodeManagerText(BaseLockCodeManagerEntity, TextEntity):
"""Text entity for lock code manager."""

_attr_native_min = 0
_attr_native_max = 9999
# Defaults for keys with no length constraint (the slot name) and the
# fallback when bound locks advertise nothing or an unsatisfiable range.
_DEFAULT_MIN = 0
_DEFAULT_MAX = 9999

def __init__(
self,
Expand All @@ -64,6 +81,103 @@ def __init__(
)
self._attr_mode = text_mode

@property
def native_min(self) -> int:
"""
Return the minimum value length -- always the permissive default.

The advertised per-lock minimum is deliberately NOT surfaced here.
Home Assistant's ``text.set_value`` service rejects
``len(value) < native_min`` before the value reaches the coordinator,
which would block the empty string that clears a slot and would replace
the coordinator's per-lock error with a generic one. The coordinator
(``SlotEntityCoordinator._validate_credential_length``) is the
authoritative minimum gate; an empty PIN is exempt because it clears
the slot.
"""
return self._DEFAULT_MIN

@property
def native_max(self) -> int:
"""
Return the maximum value length advertised by the bound locks.

Unlike the minimum, the maximum is surfaced as a hard ceiling: no bound
lock accepts a longer value, so Home Assistant enforcing it at the
widget is correct rather than a bypass of the coordinator gate. It also
gives the frontend a ``maxlength`` so over-long input is prevented
rather than merely rejected.
"""
return self._max_bound()

def _max_bound(self) -> int:
"""
Compute the live tightest-common maximum length across the bound locks.

Reads each lock's synchronously cached capabilities (uncached or
disconnected locks contribute nothing). Non-credential keys and an
all-unknown set fall back to the default ceiling.
"""
credential_type = CREDENTIAL_TYPE_BY_CONF_KEY.get(self.key)
if credential_type is None:
return self._DEFAULT_MAX
# ``self.locks`` mirrors the entry-wide ``runtime_data.locks`` that the
# coordinator's length gate iterates -- LCM binds every lock to every
# slot. If per-slot lock binding is ever added, this surface and the
# coordinator gate must both switch to the slot's subset together.
_lo, hi = aggregate_length_bounds(
(lock.cached_capabilities for lock in self.locks), credential_type
)
hi = self._DEFAULT_MAX if hi is None else hi
# Home Assistant validates the stored value against this ceiling when it
# renders state and raises if the value is longer, so the ceiling must
# admit a PIN written before a (now tighter) lock advertised its limit.
# New out-of-range input is rejected by the coordinator gate, not here.
value = self.native_value
if value is not None:
hi = max(hi, len(value))
return hi

@callback
def _handle_add_locks(self, locks: list[BaseLock]) -> None:
"""Refresh advertised bounds when locks are added to the slot."""
super()._handle_add_locks(locks)
self._write_bounds_update()
# A freshly added lock has not been probed, so its advertised maximum is
# unknown and contributes nothing yet. Probe in the background and
# re-push once known, otherwise native_max stays at the default ceiling
# until some later state write happens to re-read the warmed cache.
if self.hass is not None:
self.config_entry.async_create_background_task(
self.hass,
self._probe_and_refresh_bounds(locks),
f"{self.entity_id} refresh length bounds",
)

@callback
def _handle_remove_lock(self, lock_entity_id: str) -> None:
"""Refresh advertised bounds when a lock is removed from the slot."""
super()._handle_remove_lock(lock_entity_id)
self._write_bounds_update()

async def _probe_and_refresh_bounds(self, locks: list[BaseLock]) -> None:
"""Populate newly added locks' capabilities, then re-push the bounds."""
for lock in locks:
try:
await lock._get_cached_capabilities()
except LockCodeManagerProviderError:
# A lock that cannot be probed (disconnected, operation failed,
# or no capability support) advertises no ceiling; bounds stay
# at the default until a later write re-reads a warmed cache.
continue
self._write_bounds_update()

@callback
def _write_bounds_update(self) -> None:
"""Re-push state so the frontend re-reads the length bounds."""
if self.hass is not None and self.entity_id:
self.async_write_ha_state()

@property
def native_value(self) -> str | None:
"""Return native value."""
Expand Down
38 changes: 38 additions & 0 deletions tests/providers/test_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,11 @@
DOMAIN,
EVENT_LOCK_STATE_CHANGED,
)
from custom_components.lock_code_manager.domain.credentials import (
CredentialType,
CredentialTypeCapability,
LockCapabilities,
)
from custom_components.lock_code_manager.domain.exceptions import (
DuplicateCodeError,
LockDisconnected,
Expand Down Expand Up @@ -56,6 +61,39 @@ def teardown_push_subscription(self) -> None:
self.unsubscribe_calls += 1


class _CapsLock(MockLCMLock):
"""Mock lock that advertises PIN capabilities."""

async def async_get_capabilities(self) -> LockCapabilities:
"""Report a single PIN credential type with a 4-8 length range."""
return LockCapabilities(
supports_user_management=True,
max_users=30,
credential_types={
CredentialType.PIN: CredentialTypeCapability(
num_slots=30, min_length=4, max_length=8, supports_learn=False
)
},
)


async def test_cached_capabilities_exposes_warmed_cache(hass: HomeAssistant):
"""cached_capabilities is None until probed, then returns the cached snapshot."""
entity_reg = er.async_get(hass)
config_entry = MockConfigEntry(domain=DOMAIN)
config_entry.add_to_hass(hass)
lock_entity = entity_reg.async_get_or_create(
"lock", "test", "caps_lock", config_entry=config_entry
)
lock = _CapsLock(hass, dr.async_get(hass), entity_reg, config_entry, lock_entity)

assert lock.cached_capabilities is None

caps = await lock._get_cached_capabilities()
assert lock.cached_capabilities is caps
assert lock.cached_capabilities.length_bounds(CredentialType.PIN) == (4, 8)


async def test_base(hass: HomeAssistant):
"""Test base class."""
entity_reg = er.async_get(hass)
Expand Down
Loading
Loading