diff --git a/.gitignore b/.gitignore index bf52364..15fd32e 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,6 @@ build/ .DS_Store .idea/ .vscode/ + +# Runtime UI session data +data/ui_sessions/ diff --git a/apps/ui/src/twfarmbot_ui/app.py b/apps/ui/src/twfarmbot_ui/app.py index ff46384..7ac51b7 100644 --- a/apps/ui/src/twfarmbot_ui/app.py +++ b/apps/ui/src/twfarmbot_ui/app.py @@ -26,6 +26,7 @@ from twfarmbot_core.actions import summarize_action from twfarmbot_ui.client import ApiClient, ApiResult +from twfarmbot_ui import history # ── config ──────────────────────────────────────────────────────────────────── @@ -115,20 +116,20 @@ def _render_tool_call( return if name == "analyze_image" and result.get("image_url"): - st.image(result["image_url"], width=400) + st.image(result["image_url"], use_container_width=True) elif name == "estimate_traversability" and result.get("image_url"): - st.image(result["image_url"], width=400) + st.image(result["image_url"], use_container_width=True) elif name in {"segment_image", "visualize_image_features"} and result.get( "image_urls" ): cols = st.columns(min(len(result["image_urls"]), 3)) for idx, url in enumerate(result["image_urls"]): - cols[idx % len(cols)].image(url, width=300) + cols[idx % len(cols)].image(url, use_container_width=True) for label_text in result.get("labels", []): st.caption(label_text) elif result.get("image_url"): # Fallback for any other tool that returns a single image (e.g. take_photo). - st.image(result["image_url"], width=400) + st.image(result["image_url"], use_container_width=True) def _render_proposed_actions_inline( @@ -147,10 +148,12 @@ def _render_proposed_actions_inline( results = _execute_proposed_actions(actions, message, wait=True) message["approved"] = True message["content"] += "\n\n" + _format_execution_results(results) + _persist_session() st.rerun() if reject_col.button("✕ Reject", key=f"reject_{idx}", use_container_width=True): message["rejected"] = True message["content"] += "\n\n❌ Cancelled." + _persist_session() st.rerun() @@ -295,6 +298,7 @@ def _refresh_telemetry(client: ApiClient) -> None: "Camera", "I/O", "Assistant", + "History", "Diagnostics", "Settings", ] @@ -437,6 +441,18 @@ def _do_pin_pulse( .card-label { font-size: .66rem; font-weight: 700; letter-spacing: .09em; text-transform: uppercase; opacity: .5; } .card-value { font-size: 1.3rem; font-weight: 650; margin-top: .4rem; } + .sensor-value { + background: var(--secondary-background-color); + border: 1px solid rgba(128,128,128,0.10); + border-radius: 9px; + padding: .55rem .75rem; + font-size: 1.25rem; + font-weight: 650; + min-height: 2.2rem; + display: flex; + align-items: center; + margin-top: .4rem; + } .empty { opacity: .4; font-size: .8rem; } div[data-testid="stMetric"] { background: var(--secondary-background-color); @@ -463,13 +479,15 @@ def _do_pin_pulse( .st-key-analysis_source img, .st-key-analysis_processed img { width: 100% !important; - height: 320px !important; + max-height: 70vh !important; object-fit: contain !important; background: var(--secondary-background-color); border-radius: 9px; } - [data-testid="stChatMessage"] img { - max-width: 480px !important; + [data-testid="stChatMessage"] img, + [data-testid="stImage"] img { + max-width: 100% !important; + max-height: 70vh !important; border-radius: 9px; } [data-testid="stChatMessage"] { @@ -1099,34 +1117,44 @@ def _render_io() -> None: # ── Sensors ─────────────────────────────────────────────────────────────── sensors = [p for p in named if p.get("kind") == "sensor"] if sensors: - st.markdown("## Sensors") + st.markdown("### 🔍 Sensors") cols = st.columns(min(3, len(sensors))) for i, s in enumerate(sensors): with cols[i]: - st.caption(f"{s['label']} · pin {s['pin']} · {s.get('mode', 'analog')}") - if st.button("Read", key=f"sensor_{i}", use_container_width=True): - r = client.request( - "GET", - f"/pin/{s['pin']}", - params={"mode": s.get("mode", "analog")}, + with st.container(border=True, height=170): + mode = s.get("mode", "analog") + st.markdown( + f"**{s['label']}** {mode}", + unsafe_allow_html=True, ) - st.session_state[f"sv_{s['pin']}"] = ( - r.body.get("value") if r.ok else "—" + st.caption(f"pin {s['pin']}") + if st.button("Read", key=f"sensor_{i}", use_container_width=True): + r = client.request( + "GET", + f"/pin/{s['pin']}", + params={"mode": mode}, + ) + st.session_state[f"sv_{s['pin']}"] = ( + r.body.get("value") if r.ok else "—" + ) + sensor_value = st.session_state.get(f"sv_{s['pin']}", "—") + st.markdown( + f"
{sensor_value}
", + unsafe_allow_html=True, ) - st.metric("Value", st.session_state.get(f"sv_{s['pin']}", "—")) elif named: st.info("No sensor pins configured.") st.divider() # ── Actuators ───────────────────────────────────────────────────────────── - st.markdown("## Actuators") + st.markdown("### ⚡ Actuators") a, b = st.columns(2) with a: - st.markdown("**Irrigation**") - with st.form("water"): - secs = st.number_input("Seconds", 0.1, 300.0, 2.0, 0.5) - if st.form_submit_button("Water", use_container_width=True): + with st.container(border=True, height=320): + st.markdown("**💧 Irrigation**") + secs = st.number_input("Seconds", 0.1, 300.0, 2.0, 0.5, key="water_secs") + if st.button("Water", use_container_width=True, type="primary"): r = client.request( "POST", "/actions", @@ -1136,43 +1164,62 @@ def _render_io() -> None: st.success("Queued") else: st.error(r.error_message()) + st.caption("Runs the pump for the selected duration.") with b: - st.markdown("**Peripheral control**") - outputs = [p for p in named if p.get("kind") != "sensor"] - if not outputs: - st.info("No output pins configured.") - else: - sel = st.selectbox( - "Output", - outputs, - format_func=lambda p: f"{p['label']} · pin {p['pin']}", - ) - if sel: - mode = sel.get("mode", "digital") - high_mode = st.segmented_control( - "HIGH mode", ["Timed", "Keep on"], default="Timed" + with st.container(border=True, height=320): + st.markdown("**🔌 Peripheral control**") + outputs = [p for p in named if p.get("kind") != "sensor"] + if not outputs: + st.info("No output pins configured.") + else: + sel = st.selectbox( + "Output", + outputs, + format_func=lambda p: f"{p['label']} · pin {p['pin']}", + label_visibility="collapsed", ) - if high_mode == "Timed": - high_secs = st.number_input( - "Seconds to stay HIGH", - 0.1, - 300.0, - 2.0, - 0.5, - key="high_secs", + if sel: + mode = sel.get("mode", "digital") + st.markdown( + f"{mode}", + unsafe_allow_html=True, ) - else: - high_secs = None - - off, on = st.columns(2) - if off.button("Set LOW", use_container_width=True): - _do_pin_write(client, sel["pin"], 0, mode) - if on.button("Set HIGH", use_container_width=True): - if high_mode == "Timed" and high_secs is not None: - _do_pin_pulse(client, sel["pin"], high_secs, mode) + + if mode == "analog": + val_col, btn_col = st.columns([4, 1]) + with val_col: + analog_value = st.slider( + "PWM value", + min_value=0, + max_value=255, + value=0, + key=f"analog_value_{sel['pin']}", + ) + with btn_col: + st.markdown("
", unsafe_allow_html=True) + if st.button("Apply", use_container_width=True): + _do_pin_write(client, sel["pin"], analog_value, mode) else: - _do_pin_write(client, sel["pin"], 1, mode) + pulse = st.toggle("Timed pulse", value=True) + if pulse: + pulse_secs = st.number_input( + "Seconds", + 0.1, + 300.0, + 2.0, + 0.5, + key=f"pulse_secs_{sel['pin']}", + ) + + off, on = st.columns(2) + if off.button("⏻ OFF", use_container_width=True): + _do_pin_write(client, sel["pin"], 0, mode) + if on.button("⏻ ON", use_container_width=True, type="primary"): + if pulse and pulse_secs is not None: + _do_pin_pulse(client, sel["pin"], pulse_secs, mode) + else: + _do_pin_write(client, sel["pin"], 1, mode) def _render_camera() -> None: @@ -1220,7 +1267,7 @@ def _render_camera() -> None: img_col, ctrl_col = st.columns([1.6, 1]) with img_col: - st.image(selected.get("attachment_url"), width=500) + st.image(selected.get("attachment_url"), use_container_width=True) with ctrl_col: st.markdown("**AI analysis**") @@ -1371,7 +1418,7 @@ def _render_camera() -> None: result_cols = st.columns(len(result["paths"])) for idx, (path, caption) in enumerate(zip(result["paths"], result["captions"])): with result_cols[idx]: - st.image(path, caption=caption, width=280) + st.image(path, caption=caption, use_container_width=True) st.markdown("**Raw output**") if result.get("class_scores"): @@ -1472,10 +1519,13 @@ def _render_assistant() -> None: with clear_col: if st.button("Clear chat", use_container_width=True): st.session_state["assistant_messages"] = [] + _persist_session() st.rerun() + _render_session_controls() selected_model = _render_model_picker() st.session_state["assistant_selected_model"] = selected_model _render_chat() + _persist_session() def _render_chat() -> None: @@ -1556,9 +1606,11 @@ def _render_chat() -> None: else: last_assistant["rejected"] = True last_assistant["content"] += "\n\n❌ Cancelled." + _persist_session() st.rerun() else: st.toast("No pending proposal to approve or reject.", icon="⚠️") + _persist_session() st.rerun() st.session_state["assistant_messages"].append( @@ -1724,6 +1776,7 @@ def _close_text_segment() -> None: "images": photo_images, } ) + _persist_session() st.rerun() @@ -1833,6 +1886,163 @@ def _format_execution_results(results: list[dict[str, Any]]) -> str: return "\n".join(lines) +# ── session persistence ─────────────────────────────────────────────────────── + + +def _has_session_state() -> bool: + """Return True if any chat/plan state has been initialised.""" + return ( + "assistant_messages" in st.session_state + or "assistant_plan_response" in st.session_state + or "executed_plans" in st.session_state + ) + + +def _is_session_empty() -> bool: + """Return True if the current session has nothing worth saving.""" + messages = st.session_state.get("assistant_messages") or [] + plan_response = st.session_state.get("assistant_plan_response") + executed = st.session_state.get("executed_plans") or [] + return not messages and not plan_response and not executed + + +def _restore_session() -> None: + """Load the latest or URL-specified session on first app load.""" + if _has_session_state(): + return + + session_id = st.query_params.get("session") + if isinstance(session_id, list): + session_id = session_id[0] if session_id else None + + snapshot: dict[str, Any] | None = None + if session_id: + snapshot = history.load_session(session_id) + if snapshot is None: + sessions = history.list_sessions(limit=1) + if sessions: + snapshot = history.load_session(sessions[0]["session_id"]) + + if snapshot is None: + snapshot = history.empty_snapshot() + + st.session_state["assistant_session_id"] = snapshot["session_id"] + st.session_state["assistant_session_label"] = snapshot.get("label") + st.session_state["assistant_messages"] = snapshot.get("assistant_messages", []) + st.session_state["assistant_plan_request"] = snapshot.get( + "assistant_plan_request", "" + ) + st.session_state["assistant_plan_response"] = snapshot.get( + "assistant_plan_response" + ) + st.session_state["assistant_plan_status"] = snapshot.get( + "assistant_plan_status" + ) + st.session_state["assistant_selected_model"] = snapshot.get( + "assistant_selected_model" + ) + st.session_state["executed_plans"] = snapshot.get("executed_plans", []) + + +def _persist_session() -> None: + """Save the current chat/plan state to disk.""" + if _is_session_empty(): + return + snapshot = history.empty_snapshot( + session_id=st.session_state.get("assistant_session_id") + ) + snapshot["label"] = st.session_state.get("assistant_session_label") + snapshot["created_at"] = st.session_state.get( + "assistant_session_created_at", snapshot["created_at"] + ) + snapshot["assistant_messages"] = st.session_state.get("assistant_messages", []) + snapshot["assistant_plan_request"] = st.session_state.get( + "assistant_plan_request", "" + ) + snapshot["assistant_plan_response"] = st.session_state.get( + "assistant_plan_response" + ) + snapshot["assistant_plan_status"] = st.session_state.get("assistant_plan_status") + snapshot["assistant_selected_model"] = st.session_state.get( + "assistant_selected_model" + ) + snapshot["executed_plans"] = st.session_state.get("executed_plans", []) + history.save_session(snapshot) + + +def _render_session_controls() -> None: + """Render session management widgets inside the Assistant tab.""" + with st.expander("🗂️ Session", expanded=False): + current_label = st.text_input( + "Session label", + value=st.session_state.get("assistant_session_label") or "", + key="assistant_session_label_input", + placeholder="e.g. watering experiment", + ) + st.session_state["assistant_session_label"] = ( + current_label.strip() or None + ) + + new_col, save_col = st.columns([1, 1]) + if new_col.button("New session", use_container_width=True): + _persist_session() + new_id = history.new_session_id() + st.session_state["assistant_session_id"] = new_id + st.session_state["assistant_session_label"] = None + st.session_state["assistant_messages"] = [] + st.session_state["assistant_plan_request"] = "" + st.session_state["assistant_plan_response"] = None + st.session_state["assistant_plan_status"] = None + st.session_state["executed_plans"] = [] + st.query_params.pop("session", None) + st.rerun() + if save_col.button("Save now", use_container_width=True): + _persist_session() + st.toast("Session saved", icon="💾") + + sessions = history.list_sessions(limit=20) + if sessions: + st.divider() + st.markdown("**Previous sessions**") + for sess in sessions: + if sess["session_id"] == st.session_state.get("assistant_session_id"): + continue + label = sess["label"] or sess["session_id"] + preview = sess["preview"] + c1, c2, c3 = st.columns([3, 1, 1]) + c1.caption(f"{label}" + (f" · {preview}" if preview else "")) + if c2.button("Load", key=f"load_sess_{sess['session_id']}", use_container_width=True): + snapshot = history.load_session(sess["session_id"]) + if snapshot is None: + st.error("Session not found") + continue + st.session_state["assistant_session_id"] = snapshot["session_id"] + st.session_state["assistant_session_label"] = snapshot.get("label") + st.session_state["assistant_messages"] = snapshot.get( + "assistant_messages", [] + ) + st.session_state["assistant_plan_request"] = snapshot.get( + "assistant_plan_request", "" + ) + st.session_state["assistant_plan_response"] = snapshot.get( + "assistant_plan_response" + ) + st.session_state["assistant_plan_status"] = snapshot.get( + "assistant_plan_status" + ) + st.session_state["assistant_selected_model"] = snapshot.get( + "assistant_selected_model" + ) + st.session_state["executed_plans"] = snapshot.get( + "executed_plans", [] + ) + st.query_params["session"] = snapshot["session_id"] + st.rerun() + if c3.button("🗑", key=f"del_sess_{sess['session_id']}", use_container_width=True): + history.delete_session(sess["session_id"]) + st.rerun() + + def _render_plan() -> None: st.caption( "Describe a task. The LLM builds a step-by-step plan; review it before running." @@ -1894,6 +2104,7 @@ def _render_plan() -> None: r.body if r.ok else {"error": r.body} ) st.session_state["assistant_plan_status"] = r.code + _persist_session() response = st.session_state.get("assistant_plan_response") status = st.session_state.get("assistant_plan_status") @@ -1934,17 +2145,26 @@ def _render_plan() -> None: if clear_col.button("Clear", use_container_width=True): st.session_state["assistant_plan_response"] = None st.session_state["assistant_plan_status"] = None + _persist_session() st.rerun() if run_col.button("Run plan", type="primary", use_container_width=True): queued = 0 failed = 0 + action_results: list[dict[str, Any]] = [] for action in actions: r = client.request( "POST", "/actions", json={"kind": action["kind"], "params": action.get("params", {})}, ) + action_results.append( + { + "kind": action["kind"], + "ok": r.ok, + "detail": r.error_message() if not r.ok else None, + } + ) if r.ok: queued += 1 st.toast(f"Queued {action['kind']}", icon="➡️") @@ -1955,11 +2175,101 @@ def _render_plan() -> None: st.success(f"Plan queued · {queued} action(s)") else: st.warning(f"Plan partially queued · {queued} ok, {failed} failed") + executed = st.session_state.get("executed_plans") or [] + executed.append( + { + "request": response.get("request", ""), + "actions": actions, + "results": action_results, + "queued_at": datetime.now().isoformat(), + "status": "ok" if failed == 0 else ("partial" if queued > 0 else "failed"), + } + ) + st.session_state["executed_plans"] = executed st.session_state["assistant_plan_response"] = None st.session_state["assistant_plan_status"] = None + _persist_session() st.rerun() +def _render_history() -> None: + st.markdown( + '
TWFarmBot · UAS Technikum Wien
', + unsafe_allow_html=True, + ) + st.markdown("# History") + + sessions = history.list_sessions(limit=50) + if not sessions: + st.info("No saved sessions yet. Chat and plans are saved automatically.") + return + + st.markdown("## Chat sessions") + for sess in sessions: + label = sess["label"] or sess["session_id"] + updated = sess["updated_at"][:19].replace("T", " ") if sess["updated_at"] else "" + c1, c2 = st.columns([4, 1]) + with c1: + st.markdown(f"**{label}**") + st.caption( + f"Updated {updated}" + (f" · {sess['preview']}" if sess["preview"] else "") + ) + if c2.button("Load", key=f"hist_load_{sess['session_id']}", use_container_width=True): + snapshot = history.load_session(sess["session_id"]) + if snapshot is None: + st.error("Session not found") + else: + st.session_state["assistant_session_id"] = snapshot["session_id"] + st.session_state["assistant_session_label"] = snapshot.get("label") + st.session_state["assistant_messages"] = snapshot.get( + "assistant_messages", [] + ) + st.session_state["assistant_plan_request"] = snapshot.get( + "assistant_plan_request", "" + ) + st.session_state["assistant_plan_response"] = snapshot.get( + "assistant_plan_response" + ) + st.session_state["assistant_plan_status"] = snapshot.get( + "assistant_plan_status" + ) + st.session_state["assistant_selected_model"] = snapshot.get( + "assistant_selected_model" + ) + st.session_state["executed_plans"] = snapshot.get("executed_plans", []) + st.query_params["session"] = snapshot["session_id"] + st.rerun() + + executed = st.session_state.get("executed_plans") or [] + if executed: + st.markdown("## Executed plans") + for idx, plan in enumerate(reversed(executed), start=1): + with st.container(border=True): + queued_at = plan.get("queued_at", "") + ts = queued_at[:19].replace("T", " ") if queued_at else "" + status = plan.get("status", "unknown") + status_emoji = {"ok": "✅", "partial": "⚠️", "failed": "❌"}.get( + status, "❓" + ) + st.markdown( + f"{status_emoji} **Plan {idx}** · {plan.get('request', '')}" + ) + st.caption(f"{ts} · {len(plan.get('actions', []))} action(s) · {status}") + with st.expander("Actions"): + for action in plan.get("actions", []): + st.markdown(f"• {_action_summary(action)}") + results = plan.get("results", []) + if results: + with st.expander("Results"): + for res in results: + icon = "✅" if res.get("ok") else "❌" + detail = res.get("detail") + line = f"{icon} {_action_summary({'kind': res['kind'], 'params': {}})}" + if detail: + line += f" — {detail}" + st.markdown(line) + + def _render_diagnostics() -> None: st.markdown( '
TWFarmBot · UAS Technikum Wien
', @@ -2080,7 +2390,11 @@ def _render_settings() -> None: "Camera": _render_camera, "I/O": _render_io, "Assistant": _render_assistant, + "History": _render_history, "Diagnostics": _render_diagnostics, "Settings": _render_settings, } + +# Restore the latest or URL-specified chat/plan session on first load. +_restore_session() renderers[tab]() diff --git a/apps/ui/src/twfarmbot_ui/history.py b/apps/ui/src/twfarmbot_ui/history.py new file mode 100644 index 0000000..7631737 --- /dev/null +++ b/apps/ui/src/twfarmbot_ui/history.py @@ -0,0 +1,124 @@ +"""Persistence for Streamlit UI session state. + +Chat history, plan previews, and executed plans are saved as JSON files so +they survive page reloads. Storage is local and intended for a single-user +research UI; concurrent writes to the same session file are last-write-wins. +""" + +from __future__ import annotations + +import json +import os +import secrets +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + + +def session_data_dir() -> Path: + """Return the directory used to store session JSON files.""" + path = Path( + os.getenv("TWFB_UI_DATA_DIR", Path.cwd() / "data" / "ui_sessions") + ) + path.mkdir(parents=True, exist_ok=True) + return path + + +def _utc_now() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _session_path(session_id: str) -> Path: + return session_data_dir() / f"{session_id}.json" + + +def new_session_id() -> str: + """Generate a new session id based on an ISO timestamp plus a random suffix.""" + stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H-%M-%S") + suffix = secrets.token_hex(4) + return f"{stamp}-{suffix}" + + +def save_session(snapshot: dict[str, Any]) -> Path: + """Write a session snapshot to disk. + + ``snapshot`` must contain a ``session_id`` key. The ``updated_at`` field + is refreshed automatically. + """ + session_id = snapshot["session_id"] + snapshot["updated_at"] = _utc_now() + path = _session_path(session_id) + path.write_text(json.dumps(snapshot, indent=2, default=str), encoding="utf-8") + return path + + +def load_session(session_id: str) -> dict[str, Any] | None: + """Load a session snapshot by id, or return None if it does not exist.""" + path = _session_path(session_id) + if not path.exists(): + return None + try: + return json.loads(path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + return None + + +def delete_session(session_id: str) -> bool: + """Delete a session file. Returns True if it existed and was removed.""" + path = _session_path(session_id) + if path.exists(): + path.unlink() + return True + return False + + +def list_sessions(limit: int = 50) -> list[dict[str, Any]]: + """Return metadata for saved sessions, newest first. + + Each item contains ``session_id``, ``label``, ``created_at``, + ``updated_at``, and a ``preview`` snippet of the latest user message. + """ + sessions: list[dict[str, Any]] = [] + data_dir = session_data_dir() + for path in data_dir.glob("*.json"): + try: + snapshot = json.loads(path.read_text(encoding="utf-8")) + except (json.JSONDecodeError, OSError): + continue + session_id = snapshot.get("session_id") + if not session_id: + continue + messages = snapshot.get("assistant_messages") or [] + preview = "" + for msg in reversed(messages): + if msg.get("role") == "user": + preview = str(msg.get("content", ""))[:80] + break + sessions.append( + { + "session_id": session_id, + "label": snapshot.get("label") or None, + "created_at": snapshot.get("created_at", ""), + "updated_at": snapshot.get("updated_at", ""), + "preview": preview, + } + ) + sessions.sort(key=lambda s: s["updated_at"], reverse=True) + return sessions[:limit] + + +def empty_snapshot(session_id: str | None = None) -> dict[str, Any]: + """Return a fresh, empty session snapshot.""" + now = _utc_now() + return { + "session_id": session_id or new_session_id(), + "label": None, + "created_at": now, + "updated_at": now, + "assistant_messages": [], + "assistant_plan_request": "", + "assistant_plan_response": None, + "assistant_plan_status": None, + "assistant_selected_model": None, + "executed_plans": [], + } diff --git a/configs/dev.yaml b/configs/dev.yaml index 8f734cb..df11856 100644 --- a/configs/dev.yaml +++ b/configs/dev.yaml @@ -7,14 +7,14 @@ log_level: INFO # the environment, or reference a different env var via api_key_env. planning: base_url: https://openrouter.ai/api/v1 # OpenAI-compatible endpoint - model: openai/gpt-oss-safeguard-20b # any model the endpoint exposes + model: openai/gpt-4o-mini # tool-calling model api_key_env: PLANNING_LLM_API_KEY # name of the env var that holds the secret timeout_s: 120 # raise this for slower models temperature: 0.0 - # Provider-specific request body extensions. For DeepSeek V4 reasoning on - # OpenRouter use: extra_body: {reasoning: {effort: low}} + # Provider-specific request body extensions. Only needed for reasoning models. + # For DeepSeek V4 on OpenRouter use: extra_body: {reasoning: {effort: low}} # For native DeepSeek use: extra_body: {thinking: {type: enabled}} - extra_body: {reasoning: {effort: low}} + extra_body: {} # Optional W&B Weave project for tracing. Set WEAVE_PROJECT or this field # to enable model/tool-call/token tracing. weave_project: "" @@ -46,6 +46,7 @@ watering: pins: - {label: "Pump", pin: 7, mode: digital, kind: valve, group: "Water"} - {label: "Water solenoid", pin: 8, mode: digital, kind: valve, group: "Water"} +- {label: "Cutting end effector", pin: 4, mode: analog, kind: io, group: "Tools"} - {label: "Tool servo", pin: 10, mode: digital, kind: servo, group: "Tools"} - {label: "Soil sensor", pin: 14, mode: analog, kind: sensor, group: "Sensors"} - {label: "Light sensor", pin: 15, mode: analog, kind: sensor, group: "Sensors"} diff --git a/tests/test_history.py b/tests/test_history.py new file mode 100644 index 0000000..d956543 --- /dev/null +++ b/tests/test_history.py @@ -0,0 +1,106 @@ +"""Tests for the UI session persistence helper.""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from twfarmbot_ui import history + + +@pytest.fixture +def tmp_history_dir(tmp_path: pytest.TempPathFactory, monkeypatch: pytest.MonkeyPatch): + """Redirect session storage to a temp directory for each test.""" + monkeypatch.setattr(history, "session_data_dir", lambda: tmp_path) + return tmp_path + + +def test_empty_snapshot_has_required_keys() -> None: + snap = history.empty_snapshot("test-id") + assert snap["session_id"] == "test-id" + assert snap["assistant_messages"] == [] + assert snap["executed_plans"] == [] + assert "created_at" in snap + assert "updated_at" in snap + + +def test_save_and_load_session_round_trip(tmp_history_dir: Any) -> None: + snap = history.empty_snapshot("sess-1") + snap["label"] = "my session" + snap["assistant_messages"] = [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "hi"}, + ] + snap["executed_plans"] = [ + { + "request": "water bed", + "actions": [{"kind": "water", "params": {"seconds": 10}}], + "results": [{"kind": "water", "ok": True, "detail": None}], + "queued_at": "2026-06-24T12:00:00", + "status": "ok", + } + ] + history.save_session(snap) + + loaded = history.load_session("sess-1") + assert loaded is not None + assert loaded["label"] == "my session" + assert loaded["assistant_messages"][0]["content"] == "hello" + assert len(loaded["executed_plans"]) == 1 + assert loaded["executed_plans"][0]["status"] == "ok" + + +def test_save_session_updates_updated_at(tmp_history_dir: Any) -> None: + snap = history.empty_snapshot("sess-2") + original_updated = snap["updated_at"] + history.save_session(snap) + loaded = history.load_session("sess-2") + assert loaded is not None + assert loaded["updated_at"] >= original_updated + + +def test_load_missing_session_returns_none(tmp_history_dir: Any) -> None: + assert history.load_session("does-not-exist") is None + + +def test_delete_session(tmp_history_dir: Any) -> None: + snap = history.empty_snapshot("sess-del") + history.save_session(snap) + assert history.delete_session("sess-del") is True + assert history.load_session("sess-del") is None + assert history.delete_session("sess-del") is False + + +def test_list_sessions_orders_by_updated_desc(tmp_history_dir: Any) -> None: + old = history.empty_snapshot("old-sess") + history.save_session(old) + + new = history.empty_snapshot("new-sess") + new["assistant_messages"] = [{"role": "user", "content": "latest"}] + history.save_session(new) + + sessions = history.list_sessions() + ids = [s["session_id"] for s in sessions] + assert ids == ["new-sess", "old-sess"] + + +def test_list_sessions_includes_preview(tmp_history_dir: Any) -> None: + snap = history.empty_snapshot("preview-sess") + snap["assistant_messages"] = [ + {"role": "assistant", "content": "hi"}, + {"role": "user", "content": "water the tomatoes please"}, + ] + history.save_session(snap) + + sessions = history.list_sessions() + assert len(sessions) == 1 + assert sessions[0]["preview"] == "water the tomatoes please" + + +def test_list_sessions_respects_limit(tmp_history_dir: Any) -> None: + for i in range(5): + snap = history.empty_snapshot(f"sess-{i}") + history.save_session(snap) + assert len(history.list_sessions(limit=2)) == 2 + assert len(history.list_sessions(limit=10)) == 5