Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 24 additions & 10 deletions infra/lambda/poller/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -553,20 +553,34 @@ def build_active_plan_ride_index(
item.get("plan_window"), now_et
):
continue
active_plans.append({
"user_id": user_id,
"plan_id": plan_id,
"park_key": item.get("park_key"),
# park_name isn't stored on the plan row, but it's
# derivable from park_key in the handler via the same
# PARK_NAME lookup the notifier uses. Leaving the slot
# here for clarity.
})
# Alert recipients (2026-07-03): the partition owner is always
# implicit, plus any opted-in family members from the row's
# alert_subscribers String Set (ids with USER#<id>/PROFILE
# rows — see set_plan_alert_subscription in the MCP). Absent
# attribute = owner-only, the pre-feature behavior. Each
# recipient gets their own index/active_plans entries; the
# weather path's per-user dedup + per-(user, plan) cooldowns
# already handle the rest.
subscribers = item.get("alert_subscribers") or set()
recipients = [user_id] + sorted(
s for s in subscribers if s and s != user_id
)
for recipient in recipients:
active_plans.append({
"user_id": recipient,
"plan_id": plan_id,
"park_key": item.get("park_key"),
# park_name isn't stored on the plan row, but it's
# derivable from park_key in the handler via the same
# PARK_NAME lookup the notifier uses. Leaving the slot
# here for clarity.
})
for ride in item.get("ride_sequence", []) or []:
ride_id = ride.get("ride_id")
ride_name = ride.get("ride_name")
for key in filter(None, (ride_id, (ride_name or "").lower())):
index.setdefault(key, []).append((user_id, plan_id))
for recipient in recipients:
index.setdefault(key, []).append((recipient, plan_id))
last_evaluated_key = resp.get("LastEvaluatedKey")
if not last_evaluated_key:
break
Expand Down
46 changes: 46 additions & 0 deletions infra/lambda/poller/tests/test_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,3 +444,49 @@ def test_get_user_favorites_reads_all_pages(self):
})
favs = db.get_user_favorites_for_park("megan", "magic_kingdom")
assert favs == {"r1", "r2", "r3"}


# ── Plan alert subscribers fanout (2026-07-03) ──

class TestPlanAlertSubscribersFanout:
"""alert_subscribers on a PLAN# row adds recipients to BOTH fanout
surfaces (per-ride index for DOWN/UP/low-wait, active_plans for
weather). Absent attribute = owner-only — the pre-feature behavior."""

def setup_method(self):
self.stub = _StubTable()
_swap_in_stub(self.stub)

def _put_plan(self, plan_id, subscribers=None):
item = {
"PK": "USER#megan",
"SK": f"PLAN#{plan_id}",
"planned_for_date": "2026-06-23",
"outcome_recorded": False,
"active": True,
"park_key": "magic_kingdom",
"ride_sequence": [{"ride_id": "sm", "ride_name": "Space Mountain"}],
}
if subscribers:
item["alert_subscribers"] = set(subscribers)
self.stub.put_item(Item=item)

def test_absent_attribute_is_owner_only(self):
self._put_plan("p1")
index, active_plans = db.build_active_plan_ride_index("2026-06-23")
assert index["sm"] == [("megan", "p1")]
assert [p["user_id"] for p in active_plans] == ["megan"]

def test_subscribers_expand_both_surfaces(self):
self._put_plan("p1", subscribers={"sub-jim", "sub-sis"})
index, active_plans = db.build_active_plan_ride_index("2026-06-23")
# Ride index: owner + both subscribers, owner first.
assert set(index["sm"]) == {("megan", "p1"), ("sub-jim", "p1"), ("sub-sis", "p1")}
assert index["sm"][0] == ("megan", "p1")
# Weather surface: one active_plans entry per recipient.
assert {p["user_id"] for p in active_plans} == {"megan", "sub-jim", "sub-sis"}

def test_owner_in_subscribers_not_duplicated(self):
self._put_plan("p1", subscribers={"megan", "sub-jim"})
index, _ = db.build_active_plan_ride_index("2026-06-23")
assert sorted(index["sm"]) == [("megan", "p1"), ("sub-jim", "p1")]
73 changes: 72 additions & 1 deletion mcp/_tool_impls.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ def _convert_decimals(obj: Any) -> Any:
return {k: _convert_decimals(v) for k, v in obj.items()}
if isinstance(obj, list):
return [_convert_decimals(v) for v in obj]
if isinstance(obj, set):
# DynamoDB String Sets (e.g. alert_subscribers) come back as Python
# sets, which aren't JSON-serializable — a raw row in a tool return
# would crash the MCP runtime. Sorted list keeps output stable.
return sorted(_convert_decimals(v) for v in obj)
return obj


Expand Down Expand Up @@ -2232,6 +2237,7 @@ def _build_plan_item(
active: bool = False,
activated_at: str | None = None,
created_by: str | None = None,
alert_subscribers: set[str] | None = None,
) -> dict[str, Any]:
"""Assemble a PLAN# row. Shared by record_plan (single day, often
same-day + active) and create_trip (one dormant row per trip day).
Expand All @@ -2247,8 +2253,15 @@ def _build_plan_item(
- `created_by`: attribution label (friendly user id). Defaults to
user_id. In the shared-trip model multiple people write to one
partition, so we stamp who recorded each row.
- `alert_subscribers` (2026-07-03): DDB String Set of ADDITIONAL
alert recipients (ids with a USER#<id>/PROFILE row — Cognito subs
for family members). The partition owner is IMPLICIT and always
alerted; absent attribute = owner-only, exactly the pre-feature
behavior (no migration). Omitted when empty (DDB rejects empty
sets). Mutated only via atomic ADD/DELETE (see
set_plan_alert_subscription) so web + MCP edits can't race.
"""
return {
item = {
"PK": f"USER#{user_id}",
"SK": f"PLAN#{plan_ts}",
"park_key": park_key,
Expand All @@ -2271,6 +2284,64 @@ def _build_plan_item(
"outcome_recorded": False,
"ttl": _plan_pending_ttl(planned_for_date),
}
if alert_subscribers:
item["alert_subscribers"] = set(alert_subscribers)
return item


def _resolve_alert_member(
table, member: str, friendly_to_sub: dict[str, str] | None = None
) -> tuple[str | None, bool]:
"""Resolve a member label to the profile id the poller alerts on.

Tries `member` as-given (a Cognito sub, or a legacy friendly id with
its own profile row), then via a friendly-name→sub map (available on
the HTTP transport from MCP_SUB_USER_MAP). The poller looks up
Pushover keys at USER#<id>/PROFILE, so an id only "works" if that row
exists — family members create it by signing into the dashboard and
saving /me once.

Returns (resolved_id, has_pushover_key); (None, False) when no
profile row exists under any candidate id.
"""
candidates = [member]
if friendly_to_sub:
mapped = friendly_to_sub.get(member.strip().lower())
if mapped:
candidates.append(mapped)
for cand in candidates:
row = table.get_item(
Key={"PK": f"USER#{cand}", "SK": "PROFILE"}
).get("Item")
if row:
return cand, bool(row.get("pushover_user_key"))
return None, False


def _apply_alert_subscription(
table, user_id: str, member_id: str, subscribed: bool,
plan_rows: list[dict],
) -> list[str]:
"""Atomically ADD/DELETE `member_id` in each plan row's
alert_subscribers String Set.

Set-level ADD/DELETE (not read-modify-write) so a concurrent MCP plan
edit or web toggle can't lose the change — and it never touches the
attributes the plan-edit tools rewrite. DELETE removing the last
member removes the attribute entirely, which reads back as
owner-only (the default). Returns the affected planned_for_dates.
"""
op = "ADD" if subscribed else "DELETE"
updated: list[str] = []
for r in plan_rows:
table.update_item(
Key={"PK": f"USER#{user_id}", "SK": r["SK"]},
UpdateExpression=f"{op} alert_subscribers :m",
ExpressionAttributeValues={":m": {member_id}},
ConditionExpression="attribute_exists(PK)",
)
updated.append(r.get("planned_for_date") or r["SK"])
return updated


def _bias_confidence(n: int) -> str:
Expand Down
113 changes: 113 additions & 0 deletions mcp/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
_TRIP_BUFFER_DAYS,
_WDW_LAT,
_WDW_LON,
_apply_alert_subscription,
_aws_error_payload,
_bias_confidence,
_build_plan_item,
Expand All @@ -76,6 +77,7 @@
_park_state_rows_via_gsi,
_plan_pending_ttl,
_pop_ride_from_sequence,
_resolve_alert_member,
_today_et_date_iso,
get_planning_context,
)
Expand Down Expand Up @@ -1605,6 +1607,12 @@ def record_plan(
item["plan_window"] = prior.get("plan_window")
if active and prior.get("active") and prior.get("activated_at"):
item["activated_at"] = prior.get("activated_at")
# Same wipe hazard for alert opt-ins: family members subscribed via
# set_plan_alert_subscription (or the web toggle) must survive a
# re-record of the day.
prior_subs = prior.get("alert_subscribers")
if prior_subs:
item["alert_subscribers"] = set(prior_subs)

try:
table.put_item(Item=_floats_to_decimals(item))
Expand Down Expand Up @@ -2490,6 +2498,111 @@ def record_plan_outcome(
}


@mcp.tool()
def set_plan_alert_subscription(
member: str,
subscribed: bool = True,
trip_id: str | None = None,
date: str | None = None,
user_id: str = _DEFAULT_USER_ID,
) -> dict[str, Any]:
"""Opt a family member IN (or out) of the disruption/weather/low-wait
alert pushes for a trip or a single day's plan.

By default only the shared-partition owner receives plan alerts (the
owner is always subscribed and can't be removed). This adds `member`
as an additional recipient on the matching plan day rows — they'll get
the same DOWN / BACK UP / storm / low-wait pushes for those days.

Args:
member: Who to subscribe. Their Cognito sub (preferred), or any id
that has a USER#<id>/PROFILE row. The member must have signed
into the dashboard and saved /me (that's where their Pushover
key lives) — if they haven't, this errors with instructions.
subscribed: True to opt in (default), False to opt out.
trip_id: Apply to EVERY un-recorded day of this trip.
date: Apply to the single plan for this date (YYYY-MM-DD).
Provide trip_id or date (or both to filter to one trip day).
user_id: Shared-partition owner, default "megan".

Returns:
Dict with member (resolved id), subscribed, days_updated (list of
planned_for_dates), and a warning when the member's profile has no
Pushover key yet (subscription stored, but no pushes until they
add one at /me).
"""
if not trip_id and not date:
return {
"error": "Provide trip_id and/or date",
"error_message": "Say which trip (trip_id) or day (date) to apply to.",
}
target_date = None
if date:
try:
target_date = datetime.fromisoformat(date).date().isoformat()
except ValueError:
return {
"error": "Invalid date",
"error_message": f"Could not parse '{date}'. Use YYYY-MM-DD.",
}

try:
table = _ddb_table()
member_id, has_key = _resolve_alert_member(table, member)
if member_id is None:
return {
"error": "Member has no profile",
"error_message": (
f"No USER#<id>/PROFILE row found for {member!r}. They need "
"to sign into the dashboard once and save /me (name + "
"Pushover key) — then subscribe them by their id."
),
}
if member_id == user_id:
return {
"member": member_id,
"subscribed": True,
"days_updated": [],
"note": "The plan owner always receives alerts — nothing to change.",
}
rows = [
r for r in (
_convert_decimals(x)
for x in _query_user_prefix(table, user_id, "PLAN#")
)
if not r.get("outcome_recorded")
and (trip_id is None or r.get("trip_id") == trip_id)
and (target_date is None or r.get("planned_for_date") == target_date)
]
if not rows:
return {
"error": "No matching plans",
"error_message": (
f"No un-recorded plan rows matched trip_id={trip_id!r} "
f"date={target_date!r}."
),
}
days = _apply_alert_subscription(table, user_id, member_id, subscribed, rows)
except Exception as e:
err = _aws_error_payload(e)
return err if err is not None else {
"error": "Subscription update failed",
"error_message": str(e),
}

out: dict[str, Any] = {
"member": member_id,
"subscribed": subscribed,
"days_updated": sorted(days),
}
if subscribed and not has_key:
out["warning"] = (
"Subscription stored, but this member's profile has no Pushover "
"key — they won't receive pushes until they add one at /me."
)
return out


@mcp.tool()
def mark_ride_complete(
plan_id: str,
Expand Down
Loading
Loading