diff --git a/apps/predbat/components.py b/apps/predbat/components.py index 3236c1845..b94faf780 100644 --- a/apps/predbat/components.py +++ b/apps/predbat/components.py @@ -44,6 +44,7 @@ HAS_GATEWAY = False GatewayMQTT = None from load_ml_component import LoadMLComponent +from lattice_component import LatticeComponent from datetime import datetime, timezone, timedelta import asyncio import os @@ -51,6 +52,7 @@ COMPONENT_LIST = { "storage": {"class": StorageComponent, "name": "Storage", "args": {}, "can_restart": True, "phase": 0}, + "lattice": {"class": LatticeComponent, "name": "Lattice Projection", "args": {}, "can_restart": True, "phase": 2}, "db": { "class": DatabaseManager, "name": "Database Manager", diff --git a/apps/predbat/config.py b/apps/predbat/config.py index 0a1bbdb2f..533428687 100644 --- a/apps/predbat/config.py +++ b/apps/predbat/config.py @@ -44,6 +44,12 @@ "type": "switch", "default": False, }, + { + "name": "lattice_projection_enable", + "friendly_name": "Lattice Projection (experimental)", + "type": "switch", + "default": False, + }, { "name": "active", "friendly_name": "Predbat Active", diff --git a/apps/predbat/lattice.py b/apps/predbat/lattice.py new file mode 100644 index 000000000..a3ae843d9 --- /dev/null +++ b/apps/predbat/lattice.py @@ -0,0 +1,315 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice projection core +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +"""Pure, dependency-free Lattice model + merge + resolve. + +Mirrors the gateway C++ descriptor engine on the cloud side: typed fragments +are merged by identity into one site graph, and reads/controls resolve over +ranked access paths (prefer local gateway, fall back to vendor cloud). No +PredBat or Home Assistant dependencies so it can be unit-tested standalone. +""" +from dataclasses import dataclass, field +from typing import Optional + + +@dataclass +class AccessPath: + """A way to reach a node (a provider/transport with a ranked preference).""" + + id: str + provider: str + locality: str = "local" + transport: str = "" + preference: int = 0 + + @staticmethod + def from_dict(d): + """Build an AccessPath from its wire dict.""" + return AccessPath( + id=d["id"], + provider=d.get("provider", ""), + locality=d.get("locality", "local"), + transport=d.get("transport", ""), + preference=int(d.get("preference", 0)), + ) + + +@dataclass +class Capability: + """A read/control affordance on a node, served on a named access path.""" + + capability: str + unit: str = "" + read: bool = False + control: bool = False + access_path: str = "" + constraints: dict = field(default_factory=dict) + + @staticmethod + def from_dict(d): + """Build a Capability from its wire dict.""" + return Capability( + capability=d["capability"], + unit=d.get("unit", ""), + read=bool(d.get("read", False)), + control=bool(d.get("control", False)), + access_path=d.get("accessPath", ""), + constraints=dict(d.get("constraints", {})), + ) + + +@dataclass +class Node: + """A device in the graph, identified by id (serial), with access paths + capabilities.""" + + id: str + kind: str + device_type: str + access_paths: list = field(default_factory=list) + capabilities: list = field(default_factory=list) + + def capability(self, name) -> Optional[Capability]: + """Return the first Capability matching name, or None.""" + for c in self.capabilities: + if c.capability == name: + return c + return None + + @staticmethod + def from_dict(d): + """Build a Node from its wire dict.""" + return Node( + id=d["id"], + kind=d.get("kind", ""), + device_type=d.get("deviceType", ""), + access_paths=[AccessPath.from_dict(a) for a in d.get("accessPaths", [])], + capabilities=[Capability.from_dict(c) for c in d.get("capabilities", [])], + ) + + +@dataclass +class Fragment: + """A producer's slice of the topology: nodes + relationships + its provider id.""" + + provider: str + nodes: list = field(default_factory=list) + relationships: list = field(default_factory=list) + name: str = "" + version: str = "0.1.0" + + @staticmethod + def from_dict(d): + """Build a Fragment from a producer's wire dict.""" + prod = d.get("producer", {}) + return Fragment( + provider=prod.get("provider", ""), + name=prod.get("name", ""), + version=d.get("topologyVersion", "0.1.0"), + nodes=[Node.from_dict(n) for n in d.get("nodes", [])], + relationships=list(d.get("relationships", [])), + ) + + +@dataclass +class SiteGraph: + """The merged site: one node per physical device, carrying all producers' access paths.""" + + nodes: list = field(default_factory=list) + relationships: list = field(default_factory=list) + + def node(self, node_id) -> Optional[Node]: + """Return the node with this id, or None.""" + for n in self.nodes: + if n.id == node_id: + return n + return None + + +def merge_fragments(fragments) -> SiteGraph: + """Merge producer fragments into one site graph, keyed by node id (serial). + + Same id from multiple producers becomes one node carrying every producer's + access paths (ranked by preference desc) and the union of its capabilities. + Distinct ids become sibling nodes. Relationships are combined. + """ + by_id = {} + order = [] + relationships = [] + for frag in fragments: + for n in frag.nodes: + if n.id not in by_id: + by_id[n.id] = Node(id=n.id, kind=n.kind, device_type=n.device_type, access_paths=list(n.access_paths), capabilities=list(n.capabilities)) + order.append(n.id) + else: + existing = by_id[n.id] + seen_ap = {ap.id for ap in existing.access_paths} + existing.access_paths.extend(ap for ap in n.access_paths if ap.id not in seen_ap) + seen_cap = {(c.capability, c.access_path) for c in existing.capabilities} + existing.capabilities.extend(c for c in n.capabilities if (c.capability, c.access_path) not in seen_cap) + relationships.extend(frag.relationships) + for n in by_id.values(): + n.access_paths.sort(key=lambda ap: ap.preference, reverse=True) + return SiteGraph(nodes=[by_id[i] for i in order], relationships=relationships) + + +@dataclass +class ResolveResult: + """Outcome of a resolve: which provider/access path, the (clamped) value, and ok flag.""" + + ok: bool = False + provider: str = "" + access_path: str = "" + value: Optional[int] = None + reason: str = "" + + +def _pick_path(node, cap, available, need_control): + """Pick the highest-preference access path that is available and serves cap (read/control).""" + for ap in node.access_paths: # already preference-desc from merge + served = next((x for x in node.capabilities if x.capability == cap.capability and x.access_path == ap.id and (x.control if need_control else x.read)), None) + if served is not None and ap.provider in available: + return ap, served + return None, None + + +def _clamp(value, constraints): + """Clamp value to constraints' min/max when present; tolerate malformed (non-comparable) bounds.""" + if value is None: + return None + low = constraints.get("min") + high = constraints.get("max") + try: + if low is not None and value < low: + value = low + if high is not None and value > high: + value = high + except TypeError: + return value # malformed constraints (e.g. string bounds) -> leave unclamped rather than raise + return value + + +def resolve_control(site, capability, node_id, value, available): + """Resolve a control intent: pick the best available access path and clamp the value. + + `available` is the set of provider ids currently reachable (liveness). Returns a + ResolveResult; ok is False when no available provider can control the capability. + """ + node = site.node(node_id) + if node is None: + return ResolveResult(reason="no such node") + cap = node.capability(capability) + if cap is None: + return ResolveResult(reason="capability not offered") + ap, served = _pick_path(node, cap, available, need_control=True) + if ap is None: + return ResolveResult(reason="no available control path") + return ResolveResult(ok=True, provider=ap.provider, access_path=ap.id, value=_clamp(value, served.constraints)) + + +def resolve_read(site, capability, node_id, available): + """Resolve a read: pick the best available access path that can read the capability.""" + node = site.node(node_id) + if node is None: + return ResolveResult(reason="no such node") + cap = node.capability(capability) + if cap is None: + return ResolveResult(reason="capability not offered") + ap, served = _pick_path(node, cap, available, need_control=False) + if ap is None: + return ResolveResult(reason="no available read path") + return ResolveResult(ok=True, provider=ap.provider, access_path=ap.id) + + +def control_candidates(site, capability, node_id, value, available): + """Return ranked (provider, access_path_id, clamped_value) candidates for a control intent. + + Highest-preference first, one entry per available access path that can control the + capability, with `value` clamped to that path's own constraints. Used by callers that + want to try providers in order and fall back on execution failure (not just availability). + """ + node = site.node(node_id) + if node is None: + return [] + cap = node.capability(capability) + if cap is None: + return [] + candidates = [] + for ap in node.access_paths: # preference-desc from merge + served = next((x for x in node.capabilities if x.capability == capability and x.access_path == ap.id and x.control), None) + if served is not None and ap.provider in available: + candidates.append((ap.provider, ap.id, _clamp(value, served.constraints))) + return candidates + + +# The capabilities a battery inverter offers, as data. Each spec: name, read/control, unit, and +# how to bound it ("rated" => 0..rated_w for power; a fixed (min,max) for percentages). +INVERTER_CAPS = ( + {"name": "charge_rate", "read": True, "control": True, "unit": "W", "max": "rated"}, + {"name": "discharge_rate", "read": True, "control": True, "unit": "W", "max": "rated"}, + {"name": "target_soc", "read": True, "control": True, "unit": "%", "min": 0, "max": 100}, + {"name": "reserve_soc", "read": True, "control": True, "unit": "%", "min": 0, "max": 100}, + {"name": "soc", "read": True, "control": False, "unit": "%"}, +) + + +def _cap_constraints(spec, rated): + """Resolve a capability spec's constraints against an inverter's rated power.""" + constraints = {} + low = spec.get("min") + if low is not None: + constraints["min"] = low + high = spec.get("max") + if high == "rated": + constraints["min"] = constraints.get("min", 0) + if rated > 0: + constraints["max"] = rated + elif isinstance(high, int): + constraints["max"] = high + return constraints + + +def inverter_fragment(inverters, provider, name, transport, preference, locality, cap_specs=INVERTER_CAPS, controllable=None): + """Build a producer fragment from plain inverter data. + + Each inverter dict needs a `serial` (skipped if missing) and may carry `device_type` + and `rated_w` (used for "rated" power ceilings). Every inverter becomes a node offering + the given capability specs on one access path. Pure — no PredBat deps. + + `controllable` is the set of capability names this provider can ACTUALLY execute via its + `lattice_control`. A capability is marked `control` only if its spec allows control AND it + is in `controllable` — so the fragment never over-promises (a provider with no executor must + pass `()` to stay read-only). `None` (default) keeps every control-capable spec controllable. + """ + nodes = [] + for inv in inverters: + serial = inv.get("serial") + if not serial: + continue + rated = int(inv.get("rated_w", 0) or 0) + caps = [] + for spec in cap_specs: + can_control = bool(spec.get("control", False)) + if controllable is not None: + can_control = can_control and spec["name"] in controllable + caps.append( + { + "capability": spec["name"], + "unit": spec.get("unit", ""), + "read": bool(spec.get("read", False)), + "control": can_control, + "accessPath": provider, + "constraints": _cap_constraints(spec, rated), + } + ) + nodes.append( + { + "id": serial, + "kind": "inverter", + "deviceType": str(inv.get("device_type", "")).lower(), + "accessPaths": [{"id": provider, "provider": provider, "locality": locality, "transport": transport, "preference": preference}], + "capabilities": caps, + } + ) + return {"topologyVersion": "0.1.0", "scope": "fragment", "producer": {"name": name, "provider": provider}, "nodes": nodes, "relationships": []} diff --git a/apps/predbat/lattice_component.py b/apps/predbat/lattice_component.py new file mode 100644 index 000000000..39cfecc5f --- /dev/null +++ b/apps/predbat/lattice_component.py @@ -0,0 +1,37 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice projection component +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +"""Runs the Lattice projection inside the live PredBat system. + +When `lattice_projection_enable` is on, this component periodically rebuilds the merged site +graph from every producer component and logs what it sees — which devices, their ranked access +paths, and which providers are currently reachable. This is observability only: it does NOT +route or execute control (the inverter hot-path wiring is a separate, reviewed step). When the +flag is off the component is a no-op, so it is safe to register unconditionally. +""" +from component_base import ComponentBase +from lattice_projection import LatticeProjection + + +class LatticeComponent(ComponentBase): + """Live host for the Lattice projection (shadow/observability for now).""" + + def initialize(self, **kwargs): + """Create the projection over the PredBat base.""" + self.projection = LatticeProjection(self.base) + self.run_timeout = 60 + + async def run(self, seconds, first): + """Rebuild + log the merged site graph when enabled; no-op when disabled.""" + if not self.projection.enabled(): + return True + site = self.projection.refresh() + live = self.projection.live_providers() + self.log("Lattice: merged site graph has {} node(s); live providers: {}".format(len(site.nodes), sorted(live))) + for node in site.nodes: + providers = [ap.provider for ap in node.access_paths] + reachable = [p for p in providers if p in live] + self.log("Lattice: node {} ({}) access paths {} -> reachable {}".format(node.id, node.device_type, providers, reachable)) + return True diff --git a/apps/predbat/lattice_projection.py b/apps/predbat/lattice_projection.py new file mode 100644 index 000000000..3e95246a5 --- /dev/null +++ b/apps/predbat/lattice_projection.py @@ -0,0 +1,166 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice projection table + glue +# Copyright Trefor Southwell 2026 - All Rights Reserved +# This application maybe used for personal use only and not for commercial use +# ----------------------------------------------------------------------------- +"""Curated projection of Lattice capabilities onto existing predbat.* entities. + +Only the capabilities listed here are routed through the Lattice resolver; every +other entity behaves exactly as today (incremental, reversible adoption). +""" +from dataclasses import dataclass + +from lattice import merge_fragments, resolve_control, resolve_read, control_candidates, Fragment, ResolveResult + + +@dataclass(frozen=True) +class ProjectionEntry: + """One curated mapping: (capability, scope) to a predbat.* entity plus direction.""" + + capability: str + scope: str + entity: str + read: bool + write: bool + + +# Curated capabilities routed through the resolver. Charge/discharge rate is the first slice +# (largest provider-fallback win); target/reserve SOC follow. Each is per-device (battery-system) +# scope so it resolves to a single node's access path. Plant-scope reads (e.g. aggregate soc) +# need plant aggregation in the resolver — deferred. +PROJECTION_TABLE = ( + ProjectionEntry("charge_rate", "battery-system", "predbat.charge_rate", read=True, write=True), + ProjectionEntry("discharge_rate", "battery-system", "predbat.discharge_rate", read=True, write=True), + ProjectionEntry("target_soc", "battery-system", "predbat.target_soc", read=True, write=True), + ProjectionEntry("reserve_soc", "battery-system", "predbat.reserve_soc", read=True, write=True), +) + + +def projection_entries(): + """Return all curated projection entries.""" + return PROJECTION_TABLE + + +def entity_for(capability, scope): + """Return the ProjectionEntry for (capability, scope), or None if not projected.""" + for entry in PROJECTION_TABLE: + if entry.capability == capability and entry.scope == scope: + return entry + return None + + +class LatticeProjection: + """Collects producer fragments, merges them, and routes projected reads/writes. + + Feature-flagged: callers only consult it for capabilities in PROJECTION_TABLE + and only when enabled; everything else stays on today's code paths. + """ + + def __init__(self, base): + """Hold the PredBat base for component access and logging.""" + self.base = base + self.site = None + self.providers = {} # provider id -> producing component (built at refresh) + + def enabled(self): + """True when the Lattice projection is switched on (default off).""" + return bool(self.base.get_arg("lattice_projection_enable", False)) + + def _producers(self): + """Yield (name, component) for every registered component that publishes a fragment. + + Discovery is data-driven: ANY component implementing lattice_fragment is a producer, + so a new integration (Fox, Solax, Solis, ...) is picked up here with no change. + """ + registry = getattr(self.base, "components", None) + if registry is None: + return + for name in registry.get_all(): + comp = registry.get_component(name) + if comp is not None and hasattr(comp, "lattice_fragment"): + yield name, comp + + def refresh(self): + """Re-collect fragments from all producers and rebuild the merged site graph. + + Each producer declares its own provider id in the fragment, so the provider->component + map is built from the data — there is no hardcoded brand list. + """ + fragments = [] + self.providers = {} + for name, comp in self._producers(): + try: + fragment = Fragment.from_dict(comp.lattice_fragment()) + except Exception as exc: # a bad producer must not break the others + self.base.log("Warn: lattice: producer {} failed: {}".format(name, exc)) + continue + if fragment.provider: + self.providers[fragment.provider] = comp + fragments.append(fragment) + self.site = merge_fragments(fragments) + return self.site + + def live_providers(self): + """Provider ids currently reachable (a producing component that reports alive).""" + live = set() + for provider, comp in self.providers.items(): + try: + if comp.is_alive(): + live.add(provider) + except Exception: # treat an unhealthy producer as unavailable + continue + return live + + def write(self, capability, scope, node_id, value, available=None): + """Resolve a projected control write; returns a ResolveResult naming the chosen provider.""" + if entity_for(capability, scope) is None or self.site is None: + return ResolveResult(reason="not projected") + avail = available if available is not None else self.live_providers() + return resolve_control(self.site, capability, node_id, value, avail) + + def read(self, capability, scope, node_id, available=None): + """Resolve a projected read; returns a ResolveResult naming the chosen provider.""" + if entity_for(capability, scope) is None or self.site is None: + return ResolveResult(reason="not projected") + avail = available if available is not None else self.live_providers() + return resolve_read(self.site, capability, node_id, avail) + + def _component_for_provider(self, provider): + """Return the producing component that backs a provider id, or None.""" + return self.providers.get(provider) + + def would_handle(self, capability, scope, node_id): + """True if enabled, the capability is projected, and a provider can currently control it. + + A cheap synchronous gate for the live write path: if this returns False the caller must + fall back to its normal write so control is never lost. + """ + if not self.enabled() or entity_for(capability, scope) is None or self.site is None: + return False + return bool(control_candidates(self.site, capability, node_id, 0, self.live_providers())) + + async def apply(self, capability, scope, node_id, value, available=None): + """Resolve AND execute a projected control write, trying providers in preference order. + + Iterates the ranked control candidates; calls each provider's async lattice_control + and returns on the first success. Falls back to the next provider not only when one is + unavailable but when its write FAILS — the real gateway->cloud resilience. + """ + if entity_for(capability, scope) is None or self.site is None: + return ResolveResult(reason="not projected") + avail = available if available is not None else self.live_providers() + candidates = control_candidates(self.site, capability, node_id, value, avail) + if not candidates: + return ResolveResult(reason="no available control path") + for provider, access_path, clamped in candidates: + comp = self._component_for_provider(provider) + if comp is None or not hasattr(comp, "lattice_control"): + continue + try: + ok = await comp.lattice_control(node_id, capability, clamped) + except Exception as exc: # a failing provider must not stop the fallback + self.base.log("Warn: lattice: {} control raised: {}".format(provider, exc)) + ok = False + if ok: + return ResolveResult(ok=True, provider=provider, access_path=access_path, value=clamped) + return ResolveResult(reason="all providers failed") diff --git a/apps/predbat/test_lattice.py b/apps/predbat/test_lattice.py new file mode 100644 index 000000000..7f259282c --- /dev/null +++ b/apps/predbat/test_lattice.py @@ -0,0 +1,536 @@ +# ----------------------------------------------------------------------------- +# Predbat Home Battery System - Lattice projection core unit tests +# ----------------------------------------------------------------------------- +"""Unit tests for the pure Lattice model/merge/resolve core. + +These run standalone (no PredBat / Home Assistant), like test_oo_utils.py. +""" +import unittest + +from lattice import Fragment, Node, Capability, AccessPath, merge_fragments, resolve_control, resolve_read, inverter_fragment, control_candidates +from lattice_projection import entity_for, projection_entries, LatticeProjection + + +class TestModel(unittest.TestCase): + """Fragment parses from and round-trips to the wire dict shape.""" + + def test_fragment_from_dict(self): + """A producer dict becomes a typed Fragment with nodes/capabilities/access paths.""" + d = { + "topologyVersion": "0.1.0", + "scope": "fragment", + "producer": {"name": "Local gateway", "provider": "local-gateway"}, + "nodes": [ + { + "id": "INV-1", + "kind": "inverter", + "deviceType": "ge-aio", + "accessPaths": [{"id": "gw-local", "provider": "local-gateway", "locality": "local", "transport": "modbus", "preference": 10}], + "capabilities": [{"capability": "charge_rate", "unit": "W", "read": True, "control": True, "accessPath": "gw-local", "constraints": {"min": 0, "max": 6000}}], + } + ], + "relationships": [], + } + f = Fragment.from_dict(d) + self.assertEqual(f.provider, "local-gateway") + self.assertEqual(len(f.nodes), 1) + n = f.nodes[0] + self.assertEqual(n.id, "INV-1") + self.assertEqual(n.access_paths[0].preference, 10) + cap = n.capability("charge_rate") + self.assertIsNotNone(cap) + self.assertTrue(cap.control) + self.assertEqual(cap.constraints["max"], 6000) + + def test_node_capability_missing(self): + """capability() returns None for an unknown capability.""" + n = Node(id="X", kind="meter", device_type="m", access_paths=[], capabilities=[]) + self.assertIsNone(n.capability("charge_rate")) + + def test_access_path_defaults(self): + """AccessPath.from_dict tolerates a minimal dict.""" + ap = AccessPath.from_dict({"id": "x"}) + self.assertEqual(ap.id, "x") + self.assertEqual(ap.preference, 0) + + def test_capability_defaults(self): + """Capability.from_dict defaults read/control to False and constraints to {}.""" + c = Capability.from_dict({"capability": "soc"}) + self.assertFalse(c.read) + self.assertFalse(c.control) + self.assertEqual(c.constraints, {}) + + +class TestMerge(unittest.TestCase): + """Same serial from two producers becomes one node with both access paths, ranked.""" + + def _frag(self, provider, pref, transport): + """Build a one-node fragment for INV-1 with a single access path at the given preference.""" + return Fragment.from_dict( + { + "producer": {"name": provider, "provider": provider}, + "nodes": [ + { + "id": "INV-1", + "kind": "inverter", + "deviceType": "ge-aio", + "accessPaths": [{"id": provider, "provider": provider, "locality": "local" if pref >= 10 else "cloud", "transport": transport, "preference": pref}], + "capabilities": [{"capability": "charge_rate", "control": True, "accessPath": provider}], + } + ], + "relationships": [], + } + ) + + def test_merge_by_serial(self): + """One physical device seen via gateway + cloud => one node, 2 access paths (gateway first).""" + merged = merge_fragments([self._frag("local-gateway", 10, "modbus"), self._frag("ge-cloud", 1, "https")]) + self.assertEqual(len(merged.nodes), 1) + node = merged.nodes[0] + self.assertEqual(node.id, "INV-1") + self.assertEqual([ap.provider for ap in node.access_paths], ["local-gateway", "ge-cloud"]) # preference desc + + def test_distinct_serials_are_siblings(self): + """Different serials remain separate nodes.""" + a = self._frag("local-gateway", 10, "modbus") + b = self._frag("ge-cloud", 1, "https") + b.nodes[0].id = "INV-2" + merged = merge_fragments([a, b]) + self.assertEqual(len(merged.nodes), 2) + + +class TestResolve(unittest.TestCase): + """Resolution picks the highest-preference AVAILABLE access path, clamps, and falls back.""" + + def _site(self): + """A site where INV-1 is reachable via gateway (preferred) and GE-Cloud (fallback).""" + return merge_fragments( + [ + Fragment.from_dict( + { + "producer": {"provider": "local-gateway"}, + "nodes": [ + { + "id": "INV-1", + "kind": "inverter", + "deviceType": "ge-aio", + "accessPaths": [{"id": "gw-local", "provider": "local-gateway", "preference": 10}], + "capabilities": [{"capability": "charge_rate", "read": True, "control": True, "accessPath": "gw-local", "constraints": {"min": 0, "max": 6000}}], + } + ], + } + ), + Fragment.from_dict( + { + "producer": {"provider": "ge-cloud"}, + "nodes": [ + { + "id": "INV-1", + "kind": "inverter", + "deviceType": "ge-aio", + "accessPaths": [{"id": "ge-cloud", "provider": "ge-cloud", "preference": 1}], + "capabilities": [{"capability": "charge_rate", "read": True, "control": True, "accessPath": "ge-cloud", "constraints": {"min": 0, "max": 6000}}], + } + ], + } + ), + ] + ) + + def test_control_prefers_gateway(self): + """With both available, control routes to the local gateway.""" + r = resolve_control(self._site(), "charge_rate", "INV-1", 3000, available={"local-gateway", "ge-cloud"}) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "local-gateway") + self.assertEqual(r.value, 3000) + + def test_control_falls_back_to_cloud(self): + """Gateway unavailable (weak link) => control falls back to GE-Cloud — the Phil scenario.""" + r = resolve_control(self._site(), "charge_rate", "INV-1", 3000, available={"ge-cloud"}) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "ge-cloud") + + def test_control_clamps_to_constraints(self): + """A value above max clamps to the capability ceiling.""" + r = resolve_control(self._site(), "charge_rate", "INV-1", 9999, available={"local-gateway"}) + self.assertEqual(r.value, 6000) + + def test_control_none_available(self): + """No available provider => not ok (caller leaves the entity as-is).""" + r = resolve_control(self._site(), "charge_rate", "INV-1", 3000, available=set()) + self.assertFalse(r.ok) + + def test_read_prefers_gateway(self): + """Read resolves to the highest-preference available provider.""" + r = resolve_read(self._site(), "charge_rate", "INV-1", available={"local-gateway", "ge-cloud"}) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "local-gateway") + + +class TestProducerFragment(unittest.TestCase): + """inverter_fragment builds a producer fragment from plain inverter data.""" + + def test_gateway_local_fragment(self): + """A local producer emits a high-preference modbus access path with a rated ceiling.""" + d = inverter_fragment([{"serial": "CH-1", "device_type": "GIVENERGY_AIO", "rated_w": 6000}], provider="local-gateway", name="Local gateway", transport="modbus", preference=10, locality="local") + f = Fragment.from_dict(d) + self.assertEqual(f.provider, "local-gateway") + n = f.nodes[0] + self.assertEqual(n.id, "CH-1") + self.assertEqual(n.access_paths[0].preference, 10) + self.assertTrue(n.capability("charge_rate").control) + self.assertEqual(n.capability("charge_rate").constraints["max"], 6000) + + def test_cloud_fragment_low_pref_open_max(self): + """A cloud producer emits a low-preference https path; with no rating the max is open.""" + d = inverter_fragment([{"serial": "CH-1", "device_type": "ge-aio"}], provider="ge-cloud", name="GivEnergy Cloud", transport="https", preference=1, locality="cloud") + f = Fragment.from_dict(d) + self.assertEqual(f.nodes[0].access_paths[0].preference, 1) + self.assertEqual(f.nodes[0].capability("charge_rate").constraints, {"min": 0}) + + def test_skips_inverters_without_serial(self): + """Inverters with no serial are skipped (cannot be identity-keyed).""" + d = inverter_fragment([{"device_type": "x"}], provider="p", name="n", transport="t", preference=1, locality="local") + self.assertEqual(d["nodes"], []) + + def test_default_caps_include_soc_and_targets(self): + """The default inverter fragment exposes soc(read-only) + target_soc/reserve_soc(percent control).""" + f = Fragment.from_dict(inverter_fragment([{"serial": "CH-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local")) + n = f.nodes[0] + self.assertTrue(n.capability("soc").read) + self.assertFalse(n.capability("soc").control) + self.assertEqual(n.capability("target_soc").constraints, {"min": 0, "max": 100}) + self.assertTrue(n.capability("reserve_soc").control) + self.assertEqual(n.capability("charge_rate").constraints, {"min": 0, "max": 6000}) # rate still W/rated + + def test_controllable_restricts_control_flags(self): + """A provider declares control=true ONLY for capabilities it can actually execute.""" + # read-only producer (no lattice_control): nothing controllable + ro = Fragment.from_dict(inverter_fragment([{"serial": "X1", "rated_w": 6000}], provider="fox-cloud", name="Fox", transport="https", preference=1, locality="cloud", controllable=())) + n = ro.nodes[0] + self.assertFalse(n.capability("charge_rate").control) + self.assertTrue(n.capability("charge_rate").read) # still readable + # partial: only charge_rate executable + partial = Fragment.from_dict(inverter_fragment([{"serial": "X1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local", controllable=("charge_rate", "discharge_rate"))) + p = partial.nodes[0] + self.assertTrue(p.capability("charge_rate").control) + self.assertFalse(p.capability("target_soc").control) # not executable here + + def test_gateway_and_cloud_merge_to_one_node(self): + """The two producers' fragments merge by serial into one node with both paths (gateway first).""" + gw = Fragment.from_dict(inverter_fragment([{"serial": "CH-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local")) + cloud = Fragment.from_dict(inverter_fragment([{"serial": "CH-1"}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud")) + merged = merge_fragments([gw, cloud]) + self.assertEqual(len(merged.nodes), 1) + self.assertEqual([ap.provider for ap in merged.nodes[0].access_paths], ["local-gateway", "ge-cloud"]) + + +class TestControlCandidates(unittest.TestCase): + """control_candidates returns ranked, per-path-clamped (provider, access_path, value) tuples.""" + + def _site(self): + """INV-1 via gateway (pref 10, max 6000) and GE-Cloud (pref 1, open max).""" + return merge_fragments( + [ + Fragment.from_dict(inverter_fragment([{"serial": "INV-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local")), + Fragment.from_dict(inverter_fragment([{"serial": "INV-1"}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud")), + ] + ) + + def test_ranked_and_clamped(self): + """Both providers returned gateway-first; the gateway candidate clamps to its 6000 max.""" + candidates = control_candidates(self._site(), "charge_rate", "INV-1", 9999, {"local-gateway", "ge-cloud"}) + self.assertEqual([c[0] for c in candidates], ["local-gateway", "ge-cloud"]) + self.assertEqual(candidates[0][2], 6000) + + def test_only_available(self): + """Unavailable providers are excluded.""" + candidates = control_candidates(self._site(), "charge_rate", "INV-1", 3000, {"ge-cloud"}) + self.assertEqual([c[0] for c in candidates], ["ge-cloud"]) + + def test_unknown_node_or_capability(self): + """A missing node or capability yields no candidates.""" + self.assertEqual(control_candidates(self._site(), "charge_rate", "NOPE", 1, {"local-gateway"}), []) + self.assertEqual(control_candidates(self._site(), "nope", "INV-1", 1, {"local-gateway"}), []) + + def test_malformed_constraints_do_not_crash(self): + """A non-numeric constraint bound (malformed fragment) must not raise — value left unclamped.""" + site = merge_fragments( + [ + Fragment.from_dict( + { + "producer": {"provider": "p"}, + "nodes": [ + { + "id": "N", + "kind": "inverter", + "deviceType": "x", + "accessPaths": [{"id": "p", "provider": "p", "preference": 5}], + "capabilities": [{"capability": "charge_rate", "control": True, "accessPath": "p", "constraints": {"min": 0, "max": "6000"}}], + } + ], + } + ) + ] + ) + candidates = control_candidates(site, "charge_rate", "N", 9999, {"p"}) + self.assertEqual([c[0] for c in candidates], ["p"]) # resolved without raising + + +class TestProjection(unittest.TestCase): + """The curated table maps (capability, scope) to a predbat.* entity and direction.""" + + def test_charge_rate_entry(self): + """charge_rate (battery-system) maps to predbat.charge_rate, read+write.""" + ent = entity_for("charge_rate", "battery-system") + self.assertEqual(ent.entity, "predbat.charge_rate") + self.assertTrue(ent.write) + + def test_unknown_capability_not_projected(self): + """A capability not in the table returns None (behaves as today).""" + self.assertIsNone(entity_for("does_not_exist", "plant")) + + def test_entries_are_unique(self): + """No duplicate (capability, scope) keys in the curated table.""" + keys = [(e.capability, e.scope) for e in projection_entries()] + self.assertEqual(len(keys), len(set(keys))) + + def test_target_and_reserve_entries(self): + """target_soc and reserve_soc are projected controls.""" + self.assertEqual(entity_for("target_soc", "battery-system").entity, "predbat.target_soc") + self.assertTrue(entity_for("reserve_soc", "battery-system").write) + + +class _FakeComp: + """A stand-in producer component exposing lattice_fragment() and is_alive().""" + + def __init__(self, fragment, alive=True): + """Hold a pre-built fragment dict and a liveness flag.""" + self._fragment = fragment + self._alive = alive + + def lattice_fragment(self): + """Return the canned fragment dict.""" + return self._fragment + + def is_alive(self): + """Return whether this producer is currently reachable.""" + return self._alive + + +class _FakeComponents: + """A stand-in component registry mapping name to component.""" + + def __init__(self, mapping): + """Hold the name -> component mapping.""" + self._mapping = mapping + + def get_component(self, name): + """Return the component for name, or None.""" + return self._mapping.get(name) + + def get_all(self): + """Return all registered component names.""" + return list(self._mapping.keys()) + + +class _FakeBase: + """A minimal PredBat base for dependency-injected projection tests.""" + + def __init__(self, components, args=None): + """Hold a components registry and an args dict (+ ComponentBase init attrs).""" + self.components = components + self._args = args or {} + self.args = self._args + self.logs = [] + self.local_tz = None + self.prefix = "predbat" + + def get_arg(self, name, default=None): + """Return a config arg from the fake args dict.""" + return self._args.get(name, default) + + def log(self, message): + """Capture a log line.""" + self.logs.append(message) + + +class TestLatticeProjection(unittest.TestCase): + """End-to-end: collect producers, merge, and route charge_rate with gateway->cloud fallback.""" + + def _proj(self, gw_alive=True, cloud_alive=True, args=None): + """Build a LatticeProjection over fake gateway + gecloud producers and refresh it.""" + gw = inverter_fragment([{"serial": "CH-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local") + cloud = inverter_fragment([{"serial": "CH-1"}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud") + comps = _FakeComponents({"gateway": _FakeComp(gw, gw_alive), "gecloud": _FakeComp(cloud, cloud_alive)}) + proj = LatticeProjection(_FakeBase(comps, args)) + proj.refresh() + return proj + + def test_refresh_merges_producers(self): + """refresh() collects both producers and merges them by serial into one node.""" + proj = self._proj() + self.assertEqual(len(proj.site.nodes), 1) + self.assertEqual(len(proj.site.nodes[0].access_paths), 2) + + def test_write_prefers_gateway(self): + """With both producers alive, charge_rate write resolves to the gateway.""" + r = self._proj().write("charge_rate", "battery-system", "CH-1", 3000) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "local-gateway") + self.assertEqual(r.value, 3000) + + def test_write_falls_back_when_gateway_down(self): + """Gateway component not alive => write falls back to GE-Cloud (the Phil scenario).""" + r = self._proj(gw_alive=False).write("charge_rate", "battery-system", "CH-1", 3000) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "ge-cloud") + + def test_write_capability_not_in_table(self): + """A capability not in the projection table is not routed.""" + r = self._proj().write("does_not_exist", "plant", "CH-1", 1) + self.assertFalse(r.ok) + + def test_enabled_default_off(self): + """The projection is switched off unless lattice_projection_enable is set.""" + self.assertFalse(self._proj().enabled()) + self.assertTrue(self._proj(args={"lattice_projection_enable": True}).enabled()) + + def test_would_handle(self): + """would_handle gates the live write path: enabled + projected + an available provider.""" + off = self._proj() # flag off by default + self.assertFalse(off.would_handle("charge_rate", "battery-system", "CH-1")) + on = self._proj(args={"lattice_projection_enable": True}) + self.assertTrue(on.would_handle("charge_rate", "battery-system", "CH-1")) + self.assertFalse(on.would_handle("does_not_exist", "plant", "CH-1")) + self.assertFalse(on.would_handle("charge_rate", "battery-system", "NOPE")) # no such node + + def test_would_handle_false_when_no_provider_live(self): + """No reachable provider => would_handle is False (caller keeps the normal write).""" + on = self._proj(gw_alive=False, cloud_alive=False, args={"lattice_projection_enable": True}) + self.assertFalse(on.would_handle("charge_rate", "battery-system", "CH-1")) + + def test_discovers_any_brand_producer(self): + """A third brand (Fox) that publishes a fragment is auto-discovered — no projection change.""" + gw = inverter_fragment([{"serial": "GE-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local") + fox = inverter_fragment([{"serial": "FOX-1", "rated_w": 3600}], provider="fox-cloud", name="Fox", transport="https", preference=1, locality="cloud") + comps = _FakeComponents({"gateway": _FakeComp(gw), "fox": _FakeComp(fox)}) + proj = LatticeProjection(_FakeBase(comps)) + proj.refresh() + self.assertEqual({n.id for n in proj.site.nodes}, {"GE-1", "FOX-1"}) + # the Fox-only node resolves to the fox-cloud provider with no central registration + r = proj.write("charge_rate", "battery-system", "FOX-1", 2000) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "fox-cloud") + + +class _FakeAsyncComp: + """A producer whose lattice_control records calls and can simulate failure/liveness.""" + + def __init__(self, fragment, alive=True, succeed=True): + """Hold a fragment, a liveness flag, and whether writes succeed.""" + self._fragment = fragment + self._alive = alive + self._succeed = succeed + self.calls = [] + + def lattice_fragment(self): + """Return the canned fragment dict.""" + return self._fragment + + def is_alive(self): + """Return whether this producer is reachable.""" + return self._alive + + async def lattice_control(self, node_id, capability, value): + """Record the control call and return the configured success flag.""" + self.calls.append((node_id, capability, value)) + return self._succeed + + +class TestLatticeApply(unittest.IsolatedAsyncioTestCase): + """apply() resolves AND executes, trying providers in order and falling back on failure.""" + + def _build(self, gw_alive=True, gw_ok=True, cloud_alive=True, cloud_ok=True): + """Wire a projection over fake async gateway + gecloud producers for INV-1.""" + gw_frag = inverter_fragment([{"serial": "INV-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local") + cloud_frag = inverter_fragment([{"serial": "INV-1"}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud") + gw = _FakeAsyncComp(gw_frag, gw_alive, gw_ok) + cloud = _FakeAsyncComp(cloud_frag, cloud_alive, cloud_ok) + proj = LatticeProjection(_FakeBase(_FakeComponents({"gateway": gw, "gecloud": cloud}))) + proj.refresh() + return proj, gw, cloud + + async def test_executes_on_gateway(self): + """Both up + gateway write succeeds => executed on the gateway with the clamped value.""" + proj, gw, cloud = self._build() + r = await proj.apply("charge_rate", "battery-system", "INV-1", 3000) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "local-gateway") + self.assertEqual(gw.calls, [("INV-1", "charge_rate", 3000)]) + self.assertEqual(cloud.calls, []) + + async def test_falls_back_when_gateway_write_fails(self): + """Gateway write returns False => fall back and execute on GE-Cloud.""" + proj, gw, cloud = self._build(gw_ok=False) + r = await proj.apply("charge_rate", "battery-system", "INV-1", 3000) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "ge-cloud") + self.assertEqual(len(gw.calls), 1) # tried first + self.assertEqual(len(cloud.calls), 1) # then fell back + + async def test_gateway_down_uses_cloud(self): + """Gateway not alive => only GE-Cloud is a candidate.""" + proj, gw, cloud = self._build(gw_alive=False) + r = await proj.apply("charge_rate", "battery-system", "INV-1", 3000) + self.assertTrue(r.ok) + self.assertEqual(r.provider, "ge-cloud") + self.assertEqual(gw.calls, []) + + async def test_all_providers_fail(self): + """Every provider's write fails => not ok.""" + proj, gw, cloud = self._build(gw_ok=False, cloud_ok=False) + r = await proj.apply("charge_rate", "battery-system", "INV-1", 3000) + self.assertFalse(r.ok) + + async def test_capability_not_in_table_not_applied(self): + """A capability not in the table is not executed.""" + proj, gw, cloud = self._build() + r = await proj.apply("does_not_exist", "plant", "INV-1", 1) + self.assertFalse(r.ok) + self.assertEqual(gw.calls, []) + + +class TestLatticeComponent(unittest.IsolatedAsyncioTestCase): + """The live component refreshes + logs the merged graph when enabled, and is a no-op when off.""" + + def _base(self, enabled): + """A fake base with a gateway + gecloud producer; flag on/off.""" + gw = inverter_fragment([{"serial": "INV-1", "rated_w": 6000}], provider="local-gateway", name="GW", transport="modbus", preference=10, locality="local") + cloud = inverter_fragment([{"serial": "INV-1"}], provider="ge-cloud", name="Cloud", transport="https", preference=1, locality="cloud") + comps = _FakeComponents({"gateway": _FakeComp(gw), "gecloud": _FakeComp(cloud)}) + return _FakeBase(comps, args={"lattice_projection_enable": True} if enabled else None) + + async def test_run_noop_when_disabled(self): + """Flag off: run() returns True, does not refresh, and logs nothing.""" + from lattice_component import LatticeComponent + + comp = LatticeComponent(self._base(enabled=False)) + ok = await comp.run(0, True) + self.assertTrue(ok) + self.assertIsNone(comp.projection.site) + self.assertEqual(comp.base.logs, []) + + async def test_run_logs_graph_when_enabled(self): + """Flag on: run() merges producers and logs the graph.""" + from lattice_component import LatticeComponent + + comp = LatticeComponent(self._base(enabled=True)) + ok = await comp.run(0, True) + self.assertTrue(ok) + self.assertEqual(len(comp.projection.site.nodes), 1) # merged by serial + self.assertTrue(any("merged site graph" in m for m in comp.base.logs)) + + +if __name__ == "__main__": + unittest.main()