diff --git a/.gitignore b/.gitignore
index bf52364..15fd32e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,6 @@ build/
.DS_Store
.idea/
.vscode/
+
+# Runtime UI session data
+data/ui_sessions/
diff --git a/apps/ui/src/twfarmbot_ui/app.py b/apps/ui/src/twfarmbot_ui/app.py
index ff46384..7ac51b7 100644
--- a/apps/ui/src/twfarmbot_ui/app.py
+++ b/apps/ui/src/twfarmbot_ui/app.py
@@ -26,6 +26,7 @@
from twfarmbot_core.actions import summarize_action
from twfarmbot_ui.client import ApiClient, ApiResult
+from twfarmbot_ui import history
# ── config ────────────────────────────────────────────────────────────────────
@@ -115,20 +116,20 @@ def _render_tool_call(
return
if name == "analyze_image" and result.get("image_url"):
- st.image(result["image_url"], width=400)
+ st.image(result["image_url"], use_container_width=True)
elif name == "estimate_traversability" and result.get("image_url"):
- st.image(result["image_url"], width=400)
+ st.image(result["image_url"], use_container_width=True)
elif name in {"segment_image", "visualize_image_features"} and result.get(
"image_urls"
):
cols = st.columns(min(len(result["image_urls"]), 3))
for idx, url in enumerate(result["image_urls"]):
- cols[idx % len(cols)].image(url, width=300)
+ cols[idx % len(cols)].image(url, use_container_width=True)
for label_text in result.get("labels", []):
st.caption(label_text)
elif result.get("image_url"):
# Fallback for any other tool that returns a single image (e.g. take_photo).
- st.image(result["image_url"], width=400)
+ st.image(result["image_url"], use_container_width=True)
def _render_proposed_actions_inline(
@@ -147,10 +148,12 @@ def _render_proposed_actions_inline(
results = _execute_proposed_actions(actions, message, wait=True)
message["approved"] = True
message["content"] += "\n\n" + _format_execution_results(results)
+ _persist_session()
st.rerun()
if reject_col.button("✕ Reject", key=f"reject_{idx}", use_container_width=True):
message["rejected"] = True
message["content"] += "\n\n❌ Cancelled."
+ _persist_session()
st.rerun()
@@ -295,6 +298,7 @@ def _refresh_telemetry(client: ApiClient) -> None:
"Camera",
"I/O",
"Assistant",
+ "History",
"Diagnostics",
"Settings",
]
@@ -437,6 +441,18 @@ def _do_pin_pulse(
.card-label { font-size: .66rem; font-weight: 700; letter-spacing: .09em;
text-transform: uppercase; opacity: .5; }
.card-value { font-size: 1.3rem; font-weight: 650; margin-top: .4rem; }
+ .sensor-value {
+ background: var(--secondary-background-color);
+ border: 1px solid rgba(128,128,128,0.10);
+ border-radius: 9px;
+ padding: .55rem .75rem;
+ font-size: 1.25rem;
+ font-weight: 650;
+ min-height: 2.2rem;
+ display: flex;
+ align-items: center;
+ margin-top: .4rem;
+ }
.empty { opacity: .4; font-size: .8rem; }
div[data-testid="stMetric"] {
background: var(--secondary-background-color);
@@ -463,13 +479,15 @@ def _do_pin_pulse(
.st-key-analysis_source img,
.st-key-analysis_processed img {
width: 100% !important;
- height: 320px !important;
+ max-height: 70vh !important;
object-fit: contain !important;
background: var(--secondary-background-color);
border-radius: 9px;
}
- [data-testid="stChatMessage"] img {
- max-width: 480px !important;
+ [data-testid="stChatMessage"] img,
+ [data-testid="stImage"] img {
+ max-width: 100% !important;
+ max-height: 70vh !important;
border-radius: 9px;
}
[data-testid="stChatMessage"] {
@@ -1099,34 +1117,44 @@ def _render_io() -> None:
# ── Sensors ───────────────────────────────────────────────────────────────
sensors = [p for p in named if p.get("kind") == "sensor"]
if sensors:
- st.markdown("## Sensors")
+ st.markdown("### 🔍 Sensors")
cols = st.columns(min(3, len(sensors)))
for i, s in enumerate(sensors):
with cols[i]:
- st.caption(f"{s['label']} · pin {s['pin']} · {s.get('mode', 'analog')}")
- if st.button("Read", key=f"sensor_{i}", use_container_width=True):
- r = client.request(
- "GET",
- f"/pin/{s['pin']}",
- params={"mode": s.get("mode", "analog")},
+ with st.container(border=True, height=170):
+ mode = s.get("mode", "analog")
+ st.markdown(
+ f"**{s['label']}** {mode}",
+ unsafe_allow_html=True,
)
- st.session_state[f"sv_{s['pin']}"] = (
- r.body.get("value") if r.ok else "—"
+ st.caption(f"pin {s['pin']}")
+ if st.button("Read", key=f"sensor_{i}", use_container_width=True):
+ r = client.request(
+ "GET",
+ f"/pin/{s['pin']}",
+ params={"mode": mode},
+ )
+ st.session_state[f"sv_{s['pin']}"] = (
+ r.body.get("value") if r.ok else "—"
+ )
+ sensor_value = st.session_state.get(f"sv_{s['pin']}", "—")
+ st.markdown(
+ f"
{sensor_value}
",
+ unsafe_allow_html=True,
)
- st.metric("Value", st.session_state.get(f"sv_{s['pin']}", "—"))
elif named:
st.info("No sensor pins configured.")
st.divider()
# ── Actuators ─────────────────────────────────────────────────────────────
- st.markdown("## Actuators")
+ st.markdown("### ⚡ Actuators")
a, b = st.columns(2)
with a:
- st.markdown("**Irrigation**")
- with st.form("water"):
- secs = st.number_input("Seconds", 0.1, 300.0, 2.0, 0.5)
- if st.form_submit_button("Water", use_container_width=True):
+ with st.container(border=True, height=320):
+ st.markdown("**💧 Irrigation**")
+ secs = st.number_input("Seconds", 0.1, 300.0, 2.0, 0.5, key="water_secs")
+ if st.button("Water", use_container_width=True, type="primary"):
r = client.request(
"POST",
"/actions",
@@ -1136,43 +1164,62 @@ def _render_io() -> None:
st.success("Queued")
else:
st.error(r.error_message())
+ st.caption("Runs the pump for the selected duration.")
with b:
- st.markdown("**Peripheral control**")
- outputs = [p for p in named if p.get("kind") != "sensor"]
- if not outputs:
- st.info("No output pins configured.")
- else:
- sel = st.selectbox(
- "Output",
- outputs,
- format_func=lambda p: f"{p['label']} · pin {p['pin']}",
- )
- if sel:
- mode = sel.get("mode", "digital")
- high_mode = st.segmented_control(
- "HIGH mode", ["Timed", "Keep on"], default="Timed"
+ with st.container(border=True, height=320):
+ st.markdown("**🔌 Peripheral control**")
+ outputs = [p for p in named if p.get("kind") != "sensor"]
+ if not outputs:
+ st.info("No output pins configured.")
+ else:
+ sel = st.selectbox(
+ "Output",
+ outputs,
+ format_func=lambda p: f"{p['label']} · pin {p['pin']}",
+ label_visibility="collapsed",
)
- if high_mode == "Timed":
- high_secs = st.number_input(
- "Seconds to stay HIGH",
- 0.1,
- 300.0,
- 2.0,
- 0.5,
- key="high_secs",
+ if sel:
+ mode = sel.get("mode", "digital")
+ st.markdown(
+ f"{mode}",
+ unsafe_allow_html=True,
)
- else:
- high_secs = None
-
- off, on = st.columns(2)
- if off.button("Set LOW", use_container_width=True):
- _do_pin_write(client, sel["pin"], 0, mode)
- if on.button("Set HIGH", use_container_width=True):
- if high_mode == "Timed" and high_secs is not None:
- _do_pin_pulse(client, sel["pin"], high_secs, mode)
+
+ if mode == "analog":
+ val_col, btn_col = st.columns([4, 1])
+ with val_col:
+ analog_value = st.slider(
+ "PWM value",
+ min_value=0,
+ max_value=255,
+ value=0,
+ key=f"analog_value_{sel['pin']}",
+ )
+ with btn_col:
+ st.markdown("", unsafe_allow_html=True)
+ if st.button("Apply", use_container_width=True):
+ _do_pin_write(client, sel["pin"], analog_value, mode)
else:
- _do_pin_write(client, sel["pin"], 1, mode)
+ pulse = st.toggle("Timed pulse", value=True)
+ if pulse:
+ pulse_secs = st.number_input(
+ "Seconds",
+ 0.1,
+ 300.0,
+ 2.0,
+ 0.5,
+ key=f"pulse_secs_{sel['pin']}",
+ )
+
+ off, on = st.columns(2)
+ if off.button("⏻ OFF", use_container_width=True):
+ _do_pin_write(client, sel["pin"], 0, mode)
+ if on.button("⏻ ON", use_container_width=True, type="primary"):
+ if pulse and pulse_secs is not None:
+ _do_pin_pulse(client, sel["pin"], pulse_secs, mode)
+ else:
+ _do_pin_write(client, sel["pin"], 1, mode)
def _render_camera() -> None:
@@ -1220,7 +1267,7 @@ def _render_camera() -> None:
img_col, ctrl_col = st.columns([1.6, 1])
with img_col:
- st.image(selected.get("attachment_url"), width=500)
+ st.image(selected.get("attachment_url"), use_container_width=True)
with ctrl_col:
st.markdown("**AI analysis**")
@@ -1371,7 +1418,7 @@ def _render_camera() -> None:
result_cols = st.columns(len(result["paths"]))
for idx, (path, caption) in enumerate(zip(result["paths"], result["captions"])):
with result_cols[idx]:
- st.image(path, caption=caption, width=280)
+ st.image(path, caption=caption, use_container_width=True)
st.markdown("**Raw output**")
if result.get("class_scores"):
@@ -1472,10 +1519,13 @@ def _render_assistant() -> None:
with clear_col:
if st.button("Clear chat", use_container_width=True):
st.session_state["assistant_messages"] = []
+ _persist_session()
st.rerun()
+ _render_session_controls()
selected_model = _render_model_picker()
st.session_state["assistant_selected_model"] = selected_model
_render_chat()
+ _persist_session()
def _render_chat() -> None:
@@ -1556,9 +1606,11 @@ def _render_chat() -> None:
else:
last_assistant["rejected"] = True
last_assistant["content"] += "\n\n❌ Cancelled."
+ _persist_session()
st.rerun()
else:
st.toast("No pending proposal to approve or reject.", icon="⚠️")
+ _persist_session()
st.rerun()
st.session_state["assistant_messages"].append(
@@ -1724,6 +1776,7 @@ def _close_text_segment() -> None:
"images": photo_images,
}
)
+ _persist_session()
st.rerun()
@@ -1833,6 +1886,163 @@ def _format_execution_results(results: list[dict[str, Any]]) -> str:
return "\n".join(lines)
+# ── session persistence ───────────────────────────────────────────────────────
+
+
+def _has_session_state() -> bool:
+ """Return True if any chat/plan state has been initialised."""
+ return (
+ "assistant_messages" in st.session_state
+ or "assistant_plan_response" in st.session_state
+ or "executed_plans" in st.session_state
+ )
+
+
+def _is_session_empty() -> bool:
+ """Return True if the current session has nothing worth saving."""
+ messages = st.session_state.get("assistant_messages") or []
+ plan_response = st.session_state.get("assistant_plan_response")
+ executed = st.session_state.get("executed_plans") or []
+ return not messages and not plan_response and not executed
+
+
+def _restore_session() -> None:
+ """Load the latest or URL-specified session on first app load."""
+ if _has_session_state():
+ return
+
+ session_id = st.query_params.get("session")
+ if isinstance(session_id, list):
+ session_id = session_id[0] if session_id else None
+
+ snapshot: dict[str, Any] | None = None
+ if session_id:
+ snapshot = history.load_session(session_id)
+ if snapshot is None:
+ sessions = history.list_sessions(limit=1)
+ if sessions:
+ snapshot = history.load_session(sessions[0]["session_id"])
+
+ if snapshot is None:
+ snapshot = history.empty_snapshot()
+
+ st.session_state["assistant_session_id"] = snapshot["session_id"]
+ st.session_state["assistant_session_label"] = snapshot.get("label")
+ st.session_state["assistant_messages"] = snapshot.get("assistant_messages", [])
+ st.session_state["assistant_plan_request"] = snapshot.get(
+ "assistant_plan_request", ""
+ )
+ st.session_state["assistant_plan_response"] = snapshot.get(
+ "assistant_plan_response"
+ )
+ st.session_state["assistant_plan_status"] = snapshot.get(
+ "assistant_plan_status"
+ )
+ st.session_state["assistant_selected_model"] = snapshot.get(
+ "assistant_selected_model"
+ )
+ st.session_state["executed_plans"] = snapshot.get("executed_plans", [])
+
+
+def _persist_session() -> None:
+ """Save the current chat/plan state to disk."""
+ if _is_session_empty():
+ return
+ snapshot = history.empty_snapshot(
+ session_id=st.session_state.get("assistant_session_id")
+ )
+ snapshot["label"] = st.session_state.get("assistant_session_label")
+ snapshot["created_at"] = st.session_state.get(
+ "assistant_session_created_at", snapshot["created_at"]
+ )
+ snapshot["assistant_messages"] = st.session_state.get("assistant_messages", [])
+ snapshot["assistant_plan_request"] = st.session_state.get(
+ "assistant_plan_request", ""
+ )
+ snapshot["assistant_plan_response"] = st.session_state.get(
+ "assistant_plan_response"
+ )
+ snapshot["assistant_plan_status"] = st.session_state.get("assistant_plan_status")
+ snapshot["assistant_selected_model"] = st.session_state.get(
+ "assistant_selected_model"
+ )
+ snapshot["executed_plans"] = st.session_state.get("executed_plans", [])
+ history.save_session(snapshot)
+
+
+def _render_session_controls() -> None:
+ """Render session management widgets inside the Assistant tab."""
+ with st.expander("🗂️ Session", expanded=False):
+ current_label = st.text_input(
+ "Session label",
+ value=st.session_state.get("assistant_session_label") or "",
+ key="assistant_session_label_input",
+ placeholder="e.g. watering experiment",
+ )
+ st.session_state["assistant_session_label"] = (
+ current_label.strip() or None
+ )
+
+ new_col, save_col = st.columns([1, 1])
+ if new_col.button("New session", use_container_width=True):
+ _persist_session()
+ new_id = history.new_session_id()
+ st.session_state["assistant_session_id"] = new_id
+ st.session_state["assistant_session_label"] = None
+ st.session_state["assistant_messages"] = []
+ st.session_state["assistant_plan_request"] = ""
+ st.session_state["assistant_plan_response"] = None
+ st.session_state["assistant_plan_status"] = None
+ st.session_state["executed_plans"] = []
+ st.query_params.pop("session", None)
+ st.rerun()
+ if save_col.button("Save now", use_container_width=True):
+ _persist_session()
+ st.toast("Session saved", icon="💾")
+
+ sessions = history.list_sessions(limit=20)
+ if sessions:
+ st.divider()
+ st.markdown("**Previous sessions**")
+ for sess in sessions:
+ if sess["session_id"] == st.session_state.get("assistant_session_id"):
+ continue
+ label = sess["label"] or sess["session_id"]
+ preview = sess["preview"]
+ c1, c2, c3 = st.columns([3, 1, 1])
+ c1.caption(f"{label}" + (f" · {preview}" if preview else ""))
+ if c2.button("Load", key=f"load_sess_{sess['session_id']}", use_container_width=True):
+ snapshot = history.load_session(sess["session_id"])
+ if snapshot is None:
+ st.error("Session not found")
+ continue
+ st.session_state["assistant_session_id"] = snapshot["session_id"]
+ st.session_state["assistant_session_label"] = snapshot.get("label")
+ st.session_state["assistant_messages"] = snapshot.get(
+ "assistant_messages", []
+ )
+ st.session_state["assistant_plan_request"] = snapshot.get(
+ "assistant_plan_request", ""
+ )
+ st.session_state["assistant_plan_response"] = snapshot.get(
+ "assistant_plan_response"
+ )
+ st.session_state["assistant_plan_status"] = snapshot.get(
+ "assistant_plan_status"
+ )
+ st.session_state["assistant_selected_model"] = snapshot.get(
+ "assistant_selected_model"
+ )
+ st.session_state["executed_plans"] = snapshot.get(
+ "executed_plans", []
+ )
+ st.query_params["session"] = snapshot["session_id"]
+ st.rerun()
+ if c3.button("🗑", key=f"del_sess_{sess['session_id']}", use_container_width=True):
+ history.delete_session(sess["session_id"])
+ st.rerun()
+
+
def _render_plan() -> None:
st.caption(
"Describe a task. The LLM builds a step-by-step plan; review it before running."
@@ -1894,6 +2104,7 @@ def _render_plan() -> None:
r.body if r.ok else {"error": r.body}
)
st.session_state["assistant_plan_status"] = r.code
+ _persist_session()
response = st.session_state.get("assistant_plan_response")
status = st.session_state.get("assistant_plan_status")
@@ -1934,17 +2145,26 @@ def _render_plan() -> None:
if clear_col.button("Clear", use_container_width=True):
st.session_state["assistant_plan_response"] = None
st.session_state["assistant_plan_status"] = None
+ _persist_session()
st.rerun()
if run_col.button("Run plan", type="primary", use_container_width=True):
queued = 0
failed = 0
+ action_results: list[dict[str, Any]] = []
for action in actions:
r = client.request(
"POST",
"/actions",
json={"kind": action["kind"], "params": action.get("params", {})},
)
+ action_results.append(
+ {
+ "kind": action["kind"],
+ "ok": r.ok,
+ "detail": r.error_message() if not r.ok else None,
+ }
+ )
if r.ok:
queued += 1
st.toast(f"Queued {action['kind']}", icon="➡️")
@@ -1955,11 +2175,101 @@ def _render_plan() -> None:
st.success(f"Plan queued · {queued} action(s)")
else:
st.warning(f"Plan partially queued · {queued} ok, {failed} failed")
+ executed = st.session_state.get("executed_plans") or []
+ executed.append(
+ {
+ "request": response.get("request", ""),
+ "actions": actions,
+ "results": action_results,
+ "queued_at": datetime.now().isoformat(),
+ "status": "ok" if failed == 0 else ("partial" if queued > 0 else "failed"),
+ }
+ )
+ st.session_state["executed_plans"] = executed
st.session_state["assistant_plan_response"] = None
st.session_state["assistant_plan_status"] = None
+ _persist_session()
st.rerun()
+def _render_history() -> None:
+ st.markdown(
+ 'TWFarmBot · UAS Technikum Wien
',
+ unsafe_allow_html=True,
+ )
+ st.markdown("# History")
+
+ sessions = history.list_sessions(limit=50)
+ if not sessions:
+ st.info("No saved sessions yet. Chat and plans are saved automatically.")
+ return
+
+ st.markdown("## Chat sessions")
+ for sess in sessions:
+ label = sess["label"] or sess["session_id"]
+ updated = sess["updated_at"][:19].replace("T", " ") if sess["updated_at"] else ""
+ c1, c2 = st.columns([4, 1])
+ with c1:
+ st.markdown(f"**{label}**")
+ st.caption(
+ f"Updated {updated}" + (f" · {sess['preview']}" if sess["preview"] else "")
+ )
+ if c2.button("Load", key=f"hist_load_{sess['session_id']}", use_container_width=True):
+ snapshot = history.load_session(sess["session_id"])
+ if snapshot is None:
+ st.error("Session not found")
+ else:
+ st.session_state["assistant_session_id"] = snapshot["session_id"]
+ st.session_state["assistant_session_label"] = snapshot.get("label")
+ st.session_state["assistant_messages"] = snapshot.get(
+ "assistant_messages", []
+ )
+ st.session_state["assistant_plan_request"] = snapshot.get(
+ "assistant_plan_request", ""
+ )
+ st.session_state["assistant_plan_response"] = snapshot.get(
+ "assistant_plan_response"
+ )
+ st.session_state["assistant_plan_status"] = snapshot.get(
+ "assistant_plan_status"
+ )
+ st.session_state["assistant_selected_model"] = snapshot.get(
+ "assistant_selected_model"
+ )
+ st.session_state["executed_plans"] = snapshot.get("executed_plans", [])
+ st.query_params["session"] = snapshot["session_id"]
+ st.rerun()
+
+ executed = st.session_state.get("executed_plans") or []
+ if executed:
+ st.markdown("## Executed plans")
+ for idx, plan in enumerate(reversed(executed), start=1):
+ with st.container(border=True):
+ queued_at = plan.get("queued_at", "")
+ ts = queued_at[:19].replace("T", " ") if queued_at else ""
+ status = plan.get("status", "unknown")
+ status_emoji = {"ok": "✅", "partial": "⚠️", "failed": "❌"}.get(
+ status, "❓"
+ )
+ st.markdown(
+ f"{status_emoji} **Plan {idx}** · {plan.get('request', '')}"
+ )
+ st.caption(f"{ts} · {len(plan.get('actions', []))} action(s) · {status}")
+ with st.expander("Actions"):
+ for action in plan.get("actions", []):
+ st.markdown(f"• {_action_summary(action)}")
+ results = plan.get("results", [])
+ if results:
+ with st.expander("Results"):
+ for res in results:
+ icon = "✅" if res.get("ok") else "❌"
+ detail = res.get("detail")
+ line = f"{icon} {_action_summary({'kind': res['kind'], 'params': {}})}"
+ if detail:
+ line += f" — {detail}"
+ st.markdown(line)
+
+
def _render_diagnostics() -> None:
st.markdown(
'TWFarmBot · UAS Technikum Wien
',
@@ -2080,7 +2390,11 @@ def _render_settings() -> None:
"Camera": _render_camera,
"I/O": _render_io,
"Assistant": _render_assistant,
+ "History": _render_history,
"Diagnostics": _render_diagnostics,
"Settings": _render_settings,
}
+
+# Restore the latest or URL-specified chat/plan session on first load.
+_restore_session()
renderers[tab]()
diff --git a/apps/ui/src/twfarmbot_ui/history.py b/apps/ui/src/twfarmbot_ui/history.py
new file mode 100644
index 0000000..7631737
--- /dev/null
+++ b/apps/ui/src/twfarmbot_ui/history.py
@@ -0,0 +1,124 @@
+"""Persistence for Streamlit UI session state.
+
+Chat history, plan previews, and executed plans are saved as JSON files so
+they survive page reloads. Storage is local and intended for a single-user
+research UI; concurrent writes to the same session file are last-write-wins.
+"""
+
+from __future__ import annotations
+
+import json
+import os
+import secrets
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Any
+
+
+def session_data_dir() -> Path:
+ """Return the directory used to store session JSON files."""
+ path = Path(
+ os.getenv("TWFB_UI_DATA_DIR", Path.cwd() / "data" / "ui_sessions")
+ )
+ path.mkdir(parents=True, exist_ok=True)
+ return path
+
+
+def _utc_now() -> str:
+ return datetime.now(timezone.utc).isoformat()
+
+
+def _session_path(session_id: str) -> Path:
+ return session_data_dir() / f"{session_id}.json"
+
+
+def new_session_id() -> str:
+ """Generate a new session id based on an ISO timestamp plus a random suffix."""
+ stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S")
+ suffix = secrets.token_hex(4)
+ return f"{stamp}-{suffix}"
+
+
+def save_session(snapshot: dict[str, Any]) -> Path:
+ """Write a session snapshot to disk.
+
+ ``snapshot`` must contain a ``session_id`` key. The ``updated_at`` field
+ is refreshed automatically.
+ """
+ session_id = snapshot["session_id"]
+ snapshot["updated_at"] = _utc_now()
+ path = _session_path(session_id)
+ path.write_text(json.dumps(snapshot, indent=2, default=str), encoding="utf-8")
+ return path
+
+
+def load_session(session_id: str) -> dict[str, Any] | None:
+ """Load a session snapshot by id, or return None if it does not exist."""
+ path = _session_path(session_id)
+ if not path.exists():
+ return None
+ try:
+ return json.loads(path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ return None
+
+
+def delete_session(session_id: str) -> bool:
+ """Delete a session file. Returns True if it existed and was removed."""
+ path = _session_path(session_id)
+ if path.exists():
+ path.unlink()
+ return True
+ return False
+
+
+def list_sessions(limit: int = 50) -> list[dict[str, Any]]:
+ """Return metadata for saved sessions, newest first.
+
+ Each item contains ``session_id``, ``label``, ``created_at``,
+ ``updated_at``, and a ``preview`` snippet of the latest user message.
+ """
+ sessions: list[dict[str, Any]] = []
+ data_dir = session_data_dir()
+ for path in data_dir.glob("*.json"):
+ try:
+ snapshot = json.loads(path.read_text(encoding="utf-8"))
+ except (json.JSONDecodeError, OSError):
+ continue
+ session_id = snapshot.get("session_id")
+ if not session_id:
+ continue
+ messages = snapshot.get("assistant_messages") or []
+ preview = ""
+ for msg in reversed(messages):
+ if msg.get("role") == "user":
+ preview = str(msg.get("content", ""))[:80]
+ break
+ sessions.append(
+ {
+ "session_id": session_id,
+ "label": snapshot.get("label") or None,
+ "created_at": snapshot.get("created_at", ""),
+ "updated_at": snapshot.get("updated_at", ""),
+ "preview": preview,
+ }
+ )
+ sessions.sort(key=lambda s: s["updated_at"], reverse=True)
+ return sessions[:limit]
+
+
+def empty_snapshot(session_id: str | None = None) -> dict[str, Any]:
+ """Return a fresh, empty session snapshot."""
+ now = _utc_now()
+ return {
+ "session_id": session_id or new_session_id(),
+ "label": None,
+ "created_at": now,
+ "updated_at": now,
+ "assistant_messages": [],
+ "assistant_plan_request": "",
+ "assistant_plan_response": None,
+ "assistant_plan_status": None,
+ "assistant_selected_model": None,
+ "executed_plans": [],
+ }
diff --git a/configs/dev.yaml b/configs/dev.yaml
index 8f734cb..df11856 100644
--- a/configs/dev.yaml
+++ b/configs/dev.yaml
@@ -7,14 +7,14 @@ log_level: INFO
# the environment, or reference a different env var via api_key_env.
planning:
base_url: https://openrouter.ai/api/v1 # OpenAI-compatible endpoint
- model: openai/gpt-oss-safeguard-20b # any model the endpoint exposes
+ model: openai/gpt-4o-mini # tool-calling model
api_key_env: PLANNING_LLM_API_KEY # name of the env var that holds the secret
timeout_s: 120 # raise this for slower models
temperature: 0.0
- # Provider-specific request body extensions. For DeepSeek V4 reasoning on
- # OpenRouter use: extra_body: {reasoning: {effort: low}}
+ # Provider-specific request body extensions. Only needed for reasoning models.
+ # For DeepSeek V4 on OpenRouter use: extra_body: {reasoning: {effort: low}}
# For native DeepSeek use: extra_body: {thinking: {type: enabled}}
- extra_body: {reasoning: {effort: low}}
+ extra_body: {}
# Optional W&B Weave project for tracing. Set WEAVE_PROJECT or this field
# to enable model/tool-call/token tracing.
weave_project: ""
@@ -46,6 +46,7 @@ watering:
pins:
- {label: "Pump", pin: 7, mode: digital, kind: valve, group: "Water"}
- {label: "Water solenoid", pin: 8, mode: digital, kind: valve, group: "Water"}
+- {label: "Cutting end effector", pin: 4, mode: analog, kind: io, group: "Tools"}
- {label: "Tool servo", pin: 10, mode: digital, kind: servo, group: "Tools"}
- {label: "Soil sensor", pin: 14, mode: analog, kind: sensor, group: "Sensors"}
- {label: "Light sensor", pin: 15, mode: analog, kind: sensor, group: "Sensors"}
diff --git a/tests/test_history.py b/tests/test_history.py
new file mode 100644
index 0000000..d956543
--- /dev/null
+++ b/tests/test_history.py
@@ -0,0 +1,106 @@
+"""Tests for the UI session persistence helper."""
+
+from __future__ import annotations
+
+from typing import Any
+
+import pytest
+
+from twfarmbot_ui import history
+
+
+@pytest.fixture
+def tmp_history_dir(tmp_path: pytest.TempPathFactory, monkeypatch: pytest.MonkeyPatch):
+ """Redirect session storage to a temp directory for each test."""
+ monkeypatch.setattr(history, "session_data_dir", lambda: tmp_path)
+ return tmp_path
+
+
+def test_empty_snapshot_has_required_keys() -> None:
+ snap = history.empty_snapshot("test-id")
+ assert snap["session_id"] == "test-id"
+ assert snap["assistant_messages"] == []
+ assert snap["executed_plans"] == []
+ assert "created_at" in snap
+ assert "updated_at" in snap
+
+
+def test_save_and_load_session_round_trip(tmp_history_dir: Any) -> None:
+ snap = history.empty_snapshot("sess-1")
+ snap["label"] = "my session"
+ snap["assistant_messages"] = [
+ {"role": "user", "content": "hello"},
+ {"role": "assistant", "content": "hi"},
+ ]
+ snap["executed_plans"] = [
+ {
+ "request": "water bed",
+ "actions": [{"kind": "water", "params": {"seconds": 10}}],
+ "results": [{"kind": "water", "ok": True, "detail": None}],
+ "queued_at": "2026-06-24T12:00:00",
+ "status": "ok",
+ }
+ ]
+ history.save_session(snap)
+
+ loaded = history.load_session("sess-1")
+ assert loaded is not None
+ assert loaded["label"] == "my session"
+ assert loaded["assistant_messages"][0]["content"] == "hello"
+ assert len(loaded["executed_plans"]) == 1
+ assert loaded["executed_plans"][0]["status"] == "ok"
+
+
+def test_save_session_updates_updated_at(tmp_history_dir: Any) -> None:
+ snap = history.empty_snapshot("sess-2")
+ original_updated = snap["updated_at"]
+ history.save_session(snap)
+ loaded = history.load_session("sess-2")
+ assert loaded is not None
+ assert loaded["updated_at"] >= original_updated
+
+
+def test_load_missing_session_returns_none(tmp_history_dir: Any) -> None:
+ assert history.load_session("does-not-exist") is None
+
+
+def test_delete_session(tmp_history_dir: Any) -> None:
+ snap = history.empty_snapshot("sess-del")
+ history.save_session(snap)
+ assert history.delete_session("sess-del") is True
+ assert history.load_session("sess-del") is None
+ assert history.delete_session("sess-del") is False
+
+
+def test_list_sessions_orders_by_updated_desc(tmp_history_dir: Any) -> None:
+ old = history.empty_snapshot("old-sess")
+ history.save_session(old)
+
+ new = history.empty_snapshot("new-sess")
+ new["assistant_messages"] = [{"role": "user", "content": "latest"}]
+ history.save_session(new)
+
+ sessions = history.list_sessions()
+ ids = [s["session_id"] for s in sessions]
+ assert ids == ["new-sess", "old-sess"]
+
+
+def test_list_sessions_includes_preview(tmp_history_dir: Any) -> None:
+ snap = history.empty_snapshot("preview-sess")
+ snap["assistant_messages"] = [
+ {"role": "assistant", "content": "hi"},
+ {"role": "user", "content": "water the tomatoes please"},
+ ]
+ history.save_session(snap)
+
+ sessions = history.list_sessions()
+ assert len(sessions) == 1
+ assert sessions[0]["preview"] == "water the tomatoes please"
+
+
+def test_list_sessions_respects_limit(tmp_history_dir: Any) -> None:
+ for i in range(5):
+ snap = history.empty_snapshot(f"sess-{i}")
+ history.save_session(snap)
+ assert len(history.list_sessions(limit=2)) == 2
+ assert len(history.list_sessions(limit=10)) == 5