diff --git a/.gitignore b/.gitignore index 185ad45..12ba88a 100644 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,6 @@ dmypy.json /CSharp/Examples/RailenginePoweredStatusPage/appsettings.Development.json + +**/.railtracks +**/uv.lock diff --git a/Python/README.md b/Python/README.md index 2bc0b66..4bc989a 100644 --- a/Python/README.md +++ b/Python/README.md @@ -1 +1,7 @@ -These samples use the Railengine Python SDK +# Railengine Python Examples + +These samples use the Railengine Python SDK and [Railtracks](https://github.com/RailtownAI/railtracks). + +## Examples + +- **[Customer Support Triage Agent](customer-support/README.md)** — Ingest with `rail-engine-ingest`, search with `rail-engine`, triage with a Railtracks agent (dashboard + ingest UI; setup details are in `customer-support/README.md`). diff --git a/Python/customer-support/.env.example b/Python/customer-support/.env.example new file mode 100644 index 0000000..1b00789 --- /dev/null +++ b/Python/customer-support/.env.example @@ -0,0 +1,15 @@ +# Ingestion (from Project Settings → Engine Token) +ENGINE_TOKEN="[Your Engine Token]" + +# Retrieval / search (PAT + Engine ID) +ENGINE_ID="[Your Engine ID]" +ENGINE_PAT="[Your Engine PAT]" + +# Optional: override API host (must match the stack your ENGINE_TOKEN targets) +# RAILTOWN_API_URL="https://cndr.railtown.ai/api" + +# Railtracks LLM (default: OpenAI — set one provider the Railtracks OpenAI/Anthropic integrations expect) +OPENAI_API_KEY="[Your OpenAI API key]" + +# Optional: Anthropic instead of OpenAI (edit agent.py to use rt.llm.AnthropicLLM) +# ANTHROPIC_API_KEY="[Your Anthropic API key]" diff --git a/Python/customer-support/README.md b/Python/customer-support/README.md new file mode 100644 index 0000000..8d4fd79 --- /dev/null +++ b/Python/customer-support/README.md @@ -0,0 +1,103 @@ +# Customer Support Triage + +Demo stack: ingest support tickets into [Railengine](https://railengine.ai/), search resolved history (keyword index + `VectorStore1`), and triage a case with [Railtracks](https://github.com/RailtownAI/railtracks) structured output (priority, category, summaries, draft reply). + +## Before you start + +- A [Railengine](https://railengine.ai/) account plus a **new engine** configured with the sample schema in [`engine-schema.json`](engine-schema.json). +- Select that schema into your engine creation modal so documents match **`SupportTicket`**. +- Allowed ticket **`status`** values when ingesting vs. validating in-app: **`pending`**, **`open`**, **`in_progress`**, **`resolved`** — update long-lived engine/schema rules if yours differ before re-ingesting fixtures. +- Enable **Index** plus **VectorStore1** on fields such as `subject`, `body`, and `tags` in the Railengine console so search tools get useful hits beyond raw storage scans. + +## Quick start + +From `Python/customer-support/`: + +```bash +cd Python/customer-support +cp .env.example .env # fill ENGINE_TOKEN, ENGINE_ID, ENGINE_PAT, OPENAI_API_KEY +uv sync +uv run streamlit run src/streamlit_app.py +``` + +## First demo flow + +1. Open **Ingest**, load **`fixtures/tickets/ticket_001.json`**, and click **Ingest to Railengine**. +2. (Optional breadth) On **Ingest**, use sidebar **Seed all fixtures** to ingest every `fixtures/tickets/*.json`. +3. On **Ingest**, click **Run triage** on a single ticket, or open **Agent** → **Load queue** → **Triage all** for open and pending tickets. +4. Switch to **Dashboard**, click **Refresh board**, and browse the Kanban. Click a **card subject** to open ticket details in a dialog; change status from the card **dropdown** (**requires `ENGINE_TOKEN`** alongside list credentials). +5. Open **Search**, enter keywords (e.g. `billing invoice`), and press **Enter** or **🔍 Search** to query the keyword index. Select a row (or **📋 Details**) to open the ticket dialog. + +## Debug and visualize the triage agent (optional) + +After you run triage once (**Ingest** or **Agent**), inspect agent runs in the Railtracks UI: + +```bash +cd Python/customer-support +railtracks update +railtracks viz +``` + +(`railtracks[visual]` is included in project dependencies; run `uv sync` if you have not already.) + +Opens the local visualization app so you can debug tool calls, prompts, and structured output from the support triage flow. + +## Environment variables + +| Variable | Used for | Required when | +|----------|-----------|---------------| +| `ENGINE_TOKEN` | Ingest SDK | **Ingest** page · **Kanban status** dropdown | +| `ENGINE_PAT` | Retrieval / list / search | **Dashboard**, **Search**, triage tools | +| `ENGINE_ID` | Engine routing | **Dashboard**, **Search**, triage tools | +| `OPENAI_API_KEY` | Railtracks LLM | **Run triage** | + +A local `.env` next to [`pyproject.toml`](pyproject.toml) is loaded automatically for Streamlit and the webhook receiver. + +
+Optional: PII masking + +If your engine masks sensitive fields after ingest, compare raw fixtures to stored docs in the dashboard to illustrate compliance-aware storage.
+ +## Project layout + +Code is organized in layers so UI, business logic, and Railengine I/O stay separate. Imports use the **`customer_support`** package (`pyproject.toml` maps `src/` to that name). + +```text +pages / controllers → services → repositories → rail-engine / rail-engine-ingest + ↓ + models +agents (tools) → services or repositories +``` + +| Path | Role | Examples | +|------|------|----------| +| [`src/models/`](src/models/) | Pydantic domain types and constants | `SupportTicket`, `TriageAssessment`, `TicketPage` | +| [`src/repositories/`](src/repositories/) | SDK calls only (`Railengine`, `RailengineIngest` with `model=SupportTicket`) | `TicketRepository` — list, search, ingest, JSONPath | +| [`src/services/`](src/services/) | Use-case orchestration; pages call these, not the SDK | `IngestService`, `TicketListService`, `SearchService`, `TriageService` | +| [`src/agents/`](src/agents/) | Railtracks structured agent, chat agent, and tool nodes | `triage_agent.py`, `tools.py` (`search_similar_tickets`, …) | +| [`src/pages/`](src/pages/) | Streamlit screens (thin UI + session state) | `dashboard.py`, `search_page.py`, `ingest_page.py`, `agent_page.py` | +| [`src/controllers/`](src/controllers/) | Optional non-UI entry points | [`webhook.py`](src/controllers/webhook.py) — local publishing smoke test | +| [`src/config/`](src/config/) | Environment bootstrap | `ensure_dotenv_loaded()` loads `.env` next to `pyproject.toml` | +| [`src/streamlit_app.py`](src/streamlit_app.py) | App entry: `st.navigation` for Dashboard / Search / Ingest / Agent | `uv run streamlit run src/streamlit_app.py` | +| [`src/streamlit_common.py`](src/streamlit_common.py) | Shared UI (brand, toolbar, Kanban cards, ticket dialog, pipeline diagram) | Used across pages | + +**Repository root (besides `src/`)** + +- [`engine-schema.json`](engine-schema.json) — paste into the Railengine console when creating the engine +- [`fixtures/tickets/`](fixtures/tickets/) — sample `SupportTicket` JSON for ingest demos +- [`assets/`](assets/) — Streamlit branding (e.g. logo) +- [`.env.example`](.env.example) — required env var template + +## Local only + +**Do not** expose the Streamlit app on the public internet with live credentials unless you add authentication and hardening yourself. + +## Optional: webhook receiver + +Activation / publishing smoke test: + +```bash +uv run python -m customer_support.controllers.webhook --port 8765 +``` + +POST to `http://127.0.0.1:8765/webhook` (tunnel with ngrok if you need a public URL). diff --git a/Python/customer-support/assets/logo-railengine.png b/Python/customer-support/assets/logo-railengine.png new file mode 100644 index 0000000..773b30c Binary files /dev/null and b/Python/customer-support/assets/logo-railengine.png differ diff --git a/Python/customer-support/engine-schema.json b/Python/customer-support/engine-schema.json new file mode 100644 index 0000000..accdbb6 --- /dev/null +++ b/Python/customer-support/engine-schema.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-sample-001", + "subject": "Unable to download invoice — billing portal error", + "body": "I'm logged in as finance@acmecorp.example but the invoice PDF returns 500. Our API key for the sandbox is sk-demo-INVALID-KEY-12345. Please call me at +1-555-0199.", + "status": "open", + "tags": ["billing", "portal"], + "createdAt": "2026-05-01T14:30:00Z", + "customerEmail": "finance@acmecorp.example", + "customerPhone": "+1-555-0199", + "productArea": "billing" +} diff --git a/Python/customer-support/fixtures/tickets/pending_001.json b/Python/customer-support/fixtures/tickets/pending_001.json new file mode 100644 index 0000000..8916519 --- /dev/null +++ b/Python/customer-support/fixtures/tickets/pending_001.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-pending-001", + "subject": "Awaiting billing approval for quota increase", + "body": "Customer requested doubling API quota pending finance sign-off on the purchase order.", + "status": "pending", + "tags": ["billing", "quota"], + "createdAt": "2026-05-21T09:00:00Z", + "customerEmail": "ops-lead@vendor.example", + "customerPhone": "", + "productArea": "billing" +} diff --git a/Python/customer-support/fixtures/tickets/resolved_auth_001.json b/Python/customer-support/fixtures/tickets/resolved_auth_001.json new file mode 100644 index 0000000..39eb707 --- /dev/null +++ b/Python/customer-support/fixtures/tickets/resolved_auth_001.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-resolved-auth-001", + "subject": "Resolved: MFA enrollment loop for new hires", + "body": "Caused by stale device id in mobile app. Support had users reinstall app and re-enroll TOTP. No tenant-wide outage.", + "status": "resolved", + "tags": ["auth", "mfa", "mobile"], + "createdAt": "2026-03-18T14:00:00Z", + "customerEmail": "redacted@example.com", + "customerPhone": "", + "productArea": "authentication" +} diff --git a/Python/customer-support/fixtures/tickets/resolved_billing_001.json b/Python/customer-support/fixtures/tickets/resolved_billing_001.json new file mode 100644 index 0000000..0d6a7e0 --- /dev/null +++ b/Python/customer-support/fixtures/tickets/resolved_billing_001.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-resolved-billing-001", + "subject": "Resolved: VAT ID missing on invoice export", + "body": "Customer could not download VAT-compliant PDFs. Fix was to add VAT ID in Billing → Company profile and clear CDN cache.", + "status": "resolved", + "tags": ["billing", "vat", "invoice"], + "createdAt": "2026-04-10T09:00:00Z", + "customerEmail": "redacted@example.com", + "customerPhone": "", + "productArea": "billing" +} diff --git a/Python/customer-support/fixtures/tickets/resolved_billing_002.json b/Python/customer-support/fixtures/tickets/resolved_billing_002.json new file mode 100644 index 0000000..23422b4 --- /dev/null +++ b/Python/customer-support/fixtures/tickets/resolved_billing_002.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-resolved-billing-002", + "subject": "Resolved: Portal 500 on invoice — stale session cookie", + "body": "User saw 500 on invoice PDF until cookies cleared. Documented workaround and shipped fix in portal 2.3.1.", + "status": "resolved", + "tags": ["billing", "portal", "cache"], + "createdAt": "2026-04-22T11:30:00Z", + "customerEmail": "redacted@example.com", + "customerPhone": "", + "productArea": "billing" +} diff --git a/Python/customer-support/fixtures/tickets/ticket_001.json b/Python/customer-support/fixtures/tickets/ticket_001.json new file mode 100644 index 0000000..54be87f --- /dev/null +++ b/Python/customer-support/fixtures/tickets/ticket_001.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-open-001", + "subject": "Invoice PDF returns 500 in billing portal", + "body": "I'm logged in as finance@acmecorp.example but every invoice download fails with HTTP 500. Our sandbox integration key is sk-demo-INVALID-KEY-12345 — pasted from the dashboard. Please call me at +1-555-0199 if you need to debug live.", + "status": "open", + "tags": ["billing", "portal", "urgent"], + "createdAt": "2026-05-20T15:00:00Z", + "customerEmail": "finance@acmecorp.example", + "customerPhone": "+1-555-0199", + "productArea": "billing" +} diff --git a/Python/customer-support/fixtures/tickets/ticket_002.json b/Python/customer-support/fixtures/tickets/ticket_002.json new file mode 100644 index 0000000..bfc2d1c --- /dev/null +++ b/Python/customer-support/fixtures/tickets/ticket_002.json @@ -0,0 +1,11 @@ +{ + "id": "ticket-open-002", + "subject": "SSO fails after password rotation — error AADSTS50076", + "body": "All users in our eu-west tenant see AADSTS50076 when signing in via SAML. Admin contact: it-lead@contoso.example. Mobile: +44 20 7946 0958. Happened right after we rotated IdP certs.", + "status": "in_progress", + "tags": ["auth", "sso", "azure-ad"], + "createdAt": "2026-05-20T16:20:00Z", + "customerEmail": "it-lead@contoso.example", + "customerPhone": "+44 20 7946 0958", + "productArea": "authentication" +} diff --git a/Python/customer-support/pyproject.toml b/Python/customer-support/pyproject.toml new file mode 100644 index 0000000..1d9dcd5 --- /dev/null +++ b/Python/customer-support/pyproject.toml @@ -0,0 +1,33 @@ +[build-system] +requires = ["setuptools>=61"] +build-backend = "setuptools.build_meta" + +[project] +name = "customer-support-triage" +version = "0.1.0" +description = "Customer support triage demo: Railengine ingest + search + Railtracks agent" +readme = "README.md" +requires-python = ">=3.10" +dependencies = [ + "rail-engine>=0.2.1", + "rail-engine-ingest>=0.2.1", + "railtracks[visual]>=1.3.0", + "pydantic>=2.0", + "python-dotenv>=1.0.0", + "streamlit>=1.57.0", + "watchdog>=6.0.0", +] + +# Modules live directly under ``src/``; setuptools maps that tree to ``customer_support.*``. +[tool.setuptools] +package-dir = { "customer_support" = "src" } +packages = [ + "customer_support", + "customer_support.agents", + "customer_support.config", + "customer_support.controllers", + "customer_support.models", + "customer_support.pages", + "customer_support.repositories", + "customer_support.services", +] diff --git a/Python/customer-support/src/__init__.py b/Python/customer-support/src/__init__.py new file mode 100644 index 0000000..40ca1d5 --- /dev/null +++ b/Python/customer-support/src/__init__.py @@ -0,0 +1,5 @@ +"""Customer support triage example: Railengine + Railtracks.""" + +__all__ = ["__version__"] + +__version__ = "0.1.0" diff --git a/Python/customer-support/src/agents/__init__.py b/Python/customer-support/src/agents/__init__.py new file mode 100644 index 0000000..398bd9e --- /dev/null +++ b/Python/customer-support/src/agents/__init__.py @@ -0,0 +1,5 @@ +"""Railtracks agents.""" + +from customer_support.agents.triage_agent import build_triage_agent + +__all__ = ["build_triage_agent"] diff --git a/Python/customer-support/src/agents/tools.py b/Python/customer-support/src/agents/tools.py new file mode 100644 index 0000000..898c05a --- /dev/null +++ b/Python/customer-support/src/agents/tools.py @@ -0,0 +1,121 @@ +"""Railtracks tool nodes backed by TicketRepository (no global SDK client).""" + +from __future__ import annotations + +import json +from typing import Any, Literal + +import railtracks as rt + +from customer_support.models.ticket import TICKET_STATUSES, SupportTicket +from customer_support.repositories import TicketRepository + + +def _ticket_to_brief(t: SupportTicket) -> dict[str, Any]: + return { + "id": t.id, + "subject": t.subject, + "status": t.status, + "tags": t.tags, + "productArea": t.productArea, + "createdAt": t.createdAt, + "body_excerpt": (t.body[:280] + "…") if len(t.body) > 280 else t.body, + } + + +@rt.function_node +async def search_similar_tickets( + query: str, + mode: Literal["vector", "index", "both"] = "both", + limit: int = 6, +) -> str: + """ + Search historical tickets in Railengine using keyword index and/or semantic vector search. + + Args: + query: Natural language or keywords describing the issue (subject + symptoms). + mode: Use embedding search (vector), keyword index (index), or run both and merge. + limit: Max tickets to return per search mode (capped for latency). + """ + limit = max(1, min(int(limit), 25)) + merged: dict[str, SupportTicket] = {} + repo = TicketRepository() + + if mode in ("index", "both"): + for t in await repo.search_index_hits(query, limit): + merged[t.id] = t + if mode in ("vector", "both"): + for t in await repo.search_vector_hits(query, limit): + merged[t.id] = t + + briefs = [_ticket_to_brief(t) for t in merged.values()] + return json.dumps(briefs, ensure_ascii=False, indent=2) + + +@rt.function_node +async def list_recent_tickets(status: str = "resolved", limit: int = 15) -> str: + """ + List recent tickets from hot storage. Filter client-side by status. + + Args: + status: pending | open | in_progress | resolved — only matching tickets returned. + limit: Max rows to return (JSONPath query when supported; otherwise capped scan). + """ + repo = TicketRepository() + want = status.strip().lower() + valid = set(TICKET_STATUSES) + if want not in valid: + return json.dumps( + {"error": "status must be pending, open, in_progress, or resolved"}, + indent=2, + ) + + cap = max(1, min(int(limit), 50)) + collected: list[SupportTicket] = [] + + for t in await repo.query_jsonpath_tickets(f"$.status:{want}"): + collected.append(t) + if len(collected) >= cap: + break + + if len(collected) < cap: + max_scan = 400 + seen_ids = {t.id for t in collected} + async for t in repo.iter_storage_tickets(page_size=100, max_docs=max_scan): + if t.status != want: + continue + if t.id in seen_ids: + continue + seen_ids.add(t.id) + collected.append(t) + if len(collected) >= cap: + break + + return json.dumps( + [_ticket_to_brief(t) for t in collected], + ensure_ascii=False, + indent=2, + ) + + +@rt.function_node +async def get_ticket_by_id(ticket_id: str) -> str: + """ + Fetch a ticket by its business id (the `id` field inside the ingested JSON document). + + Uses JSONPath storage query when available; falls back to a capped scan of recent pages. + """ + repo = TicketRepository() + tid = ticket_id.strip() + if not tid: + return json.dumps({"error": "ticket_id required"}, indent=2) + + for t in await repo.query_jsonpath_tickets(f"$.id:{tid}"): + if t.id == tid: + return json.dumps(_ticket_to_brief(t), ensure_ascii=False, indent=2) + + async for t in repo.iter_storage_tickets(page_size=100, max_docs=500): + if t.id == tid: + return json.dumps(_ticket_to_brief(t), ensure_ascii=False, indent=2) + + return json.dumps({"error": f"ticket not found: {tid}"}, indent=2) diff --git a/Python/customer-support/src/agents/triage_agent.py b/Python/customer-support/src/agents/triage_agent.py new file mode 100644 index 0000000..2a18639 --- /dev/null +++ b/Python/customer-support/src/agents/triage_agent.py @@ -0,0 +1,78 @@ +"""Railtracks triage agent definition.""" + +from __future__ import annotations + +import os + +import railtracks as rt + +from customer_support.agents.tools import ( + get_ticket_by_id, + list_recent_tickets, + search_similar_tickets, +) +from customer_support.models import TriageAssessment + + +def build_triage_agent(): + """Create agent with structured JSON output and Railengine-backed tools.""" + api_key = os.environ.get("OPENAI_API_KEY", "").strip() + if not api_key: + raise RuntimeError( + "OPENAI_API_KEY is not set. Add it to .env or export it before running triage." + ) + + llm = rt.llm.OpenAILLM("gpt-5.4") + + system = """You are an enterprise support triage lead. You receive a single support ticket (JSON). + +Rules: +- Use the search tools to find similar RESOLVED tickets before writing a reply. +- Never copy API keys, tokens, or passwords into the draft_reply — refer to them only as + "the credential mentioned in the ticket" if needed. +- Prioritize customer impact and whether the issue blocks billing, security, or wide outages. +- Populate similar_ticket_ids with ids you actually saw from tool results (may be empty if none). +- Keep internal_summary factual and concise. +""" + + return rt.agent_node( + "Support Triage Agent", + tool_nodes=( + search_similar_tickets, + list_recent_tickets, + get_ticket_by_id, + ), + llm=llm, + system_message=system, + output_schema=TriageAssessment, + ) + + +def build_triage_chat_agent(): + """Conversational triage lead with the same Railengine tools (no structured output).""" + api_key = os.environ.get("OPENAI_API_KEY", "").strip() + if not api_key: + raise RuntimeError( + "OPENAI_API_KEY is not set. Add it to .env or export it before running triage." + ) + + llm = rt.llm.OpenAILLM("gpt-5.4") + + system = """You are an enterprise support triage lead chatting with a human support manager. + +You help prioritize open and pending tickets, explain customer impact, and suggest next steps. +Use search_similar_tickets, list_recent_tickets, and get_ticket_by_id when you need historical context from Railengine. +Never paste API keys, tokens, or passwords — refer to them only as credentials mentioned in a ticket. +Be concise and actionable. When comparing tickets, state priority rationale clearly (P1–P4 style). +""" + + return rt.agent_node( + "Support Triage Chat Agent", + tool_nodes=( + search_similar_tickets, + list_recent_tickets, + get_ticket_by_id, + ), + llm=llm, + system_message=system, + ) diff --git a/Python/customer-support/src/config/__init__.py b/Python/customer-support/src/config/__init__.py new file mode 100644 index 0000000..24d3f64 --- /dev/null +++ b/Python/customer-support/src/config/__init__.py @@ -0,0 +1,5 @@ +"""Configuration helpers.""" + +from customer_support.config.env import ensure_dotenv_loaded + +__all__ = ["ensure_dotenv_loaded"] diff --git a/Python/customer-support/src/config/env.py b/Python/customer-support/src/config/env.py new file mode 100644 index 0000000..37965af --- /dev/null +++ b/Python/customer-support/src/config/env.py @@ -0,0 +1,17 @@ +"""Load `.env` before reading Railengine / LLM environment variables.""" + +from __future__ import annotations + +from pathlib import Path + +from dotenv import load_dotenv + + +def ensure_dotenv_loaded() -> None: + """ + Load env from `Python/customer-support/.env`, then cwd `.env` (override). + """ + # ``src/config/env.py`` → parents[2] == customer-support project root + project_root = Path(__file__).resolve().parents[2] + load_dotenv(project_root / ".env") + load_dotenv() diff --git a/Python/customer-support/src/controllers/__init__.py b/Python/customer-support/src/controllers/__init__.py new file mode 100644 index 0000000..6f911e8 --- /dev/null +++ b/Python/customer-support/src/controllers/__init__.py @@ -0,0 +1 @@ +"""Optional local controllers (e.g. webhook receiver).""" diff --git a/Python/customer-support/src/controllers/webhook.py b/Python/customer-support/src/controllers/webhook.py new file mode 100644 index 0000000..c09ccaa --- /dev/null +++ b/Python/customer-support/src/controllers/webhook.py @@ -0,0 +1,89 @@ +""" +Optional local webhook receiver for Railengine publishing (activation demo). + +Run:: + + uv run python -m customer_support.controllers.webhook --port 8765 + +Point your engine webhook URL at http://localhost:8765/webhook (use ngrok or similar for a public URL). + +The handler parses `WebhookPublishingPayload` bodies using the same ``SupportTicket`` model as ingest. +""" + +from __future__ import annotations + +import argparse +import json +import sys +from http.server import BaseHTTPRequestHandler, HTTPServer +from typing import Any + +from railtown.engine.ingest import WebhookHandler + +from customer_support.config.env import ensure_dotenv_loaded +from customer_support.models import SupportTicket + + +class Handler(BaseHTTPRequestHandler): + model_handler = WebhookHandler(SupportTicket) + + def log_message(self, format: str, *args: Any) -> None: + print(f"[webhook] {self.address_string()} - {format % args}") + + def do_POST(self) -> None: # noqa: N802 + if self.path.rstrip("/") != "/webhook": + self.send_error(404, "Not Found") + return + length = int(self.headers.get("Content-Length", "0")) + raw_body = self.rfile.read(length).decode("utf-8", errors="replace") + try: + payload: Any = json.loads(raw_body) if raw_body.strip() else {} + except json.JSONDecodeError: + self.send_error(400, "Invalid JSON") + return + + try: + events = self.model_handler.parse(payload) + except Exception as e: + self.send_error(400, f"Parse error: {e}") + return + + for ev in events: + print( + json.dumps( + { + "eventId": ev.EventId, + "ticketId": ev.body.id, + "subject": ev.body.subject, + "status": ev.body.status, + }, + indent=2, + ) + ) + + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(b'{"ok":true}') + + +def main() -> None: + ensure_dotenv_loaded() + parser = argparse.ArgumentParser( + description="Receive Railengine webhook POSTs locally." + ) + parser.add_argument("--host", default="127.0.0.1") + parser.add_argument("--port", type=int, default=8765) + args = parser.parse_args() + httpd = HTTPServer((args.host, args.port), Handler) + print( + f"Listening on http://{args.host}:{args.port}/webhook (POST)", file=sys.stderr + ) + try: + httpd.serve_forever() + except KeyboardInterrupt: + print("\nShutting down.", file=sys.stderr) + + +if __name__ == "__main__": + main() diff --git a/Python/customer-support/src/models/__init__.py b/Python/customer-support/src/models/__init__.py new file mode 100644 index 0000000..8fcaa27 --- /dev/null +++ b/Python/customer-support/src/models/__init__.py @@ -0,0 +1,19 @@ +"""Domain models.""" + +from customer_support.models.ticket import ( + KANBAN_COLUMNS, + TICKET_STATUSES, + TicketStatus, + SupportTicket, +) +from customer_support.models.ticket_page import TicketPage +from customer_support.models.triage import TriageAssessment + +__all__ = [ + "KANBAN_COLUMNS", + "TICKET_STATUSES", + "TicketPage", + "TicketStatus", + "SupportTicket", + "TriageAssessment", +] diff --git a/Python/customer-support/src/models/ticket.py b/Python/customer-support/src/models/ticket.py new file mode 100644 index 0000000..a692d4b --- /dev/null +++ b/Python/customer-support/src/models/ticket.py @@ -0,0 +1,37 @@ +"""Support ticket document (Railengine schema).""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + +TicketStatus = Literal["pending", "open", "in_progress", "resolved"] + +TICKET_STATUSES: tuple[TicketStatus, ...] = ( + "pending", + "open", + "in_progress", + "resolved", +) + +KANBAN_COLUMNS: tuple[tuple[str, TicketStatus], ...] = ( + ("Pending", "pending"), + ("Open", "open"), + ("In Progress", "in_progress"), + ("Resolved", "resolved"), +) + + +class SupportTicket(BaseModel): + """Document shape stored in Railengine (matches engine-schema sample).""" + + id: str = Field(..., description="Stable ticket id / business key") + subject: str + body: str + status: TicketStatus = Field(...) + tags: list[str] = Field(default_factory=list) + createdAt: str + customerEmail: str = "" + customerPhone: str = "" + productArea: str = "" diff --git a/Python/customer-support/src/models/ticket_page.py b/Python/customer-support/src/models/ticket_page.py new file mode 100644 index 0000000..ef85f4c --- /dev/null +++ b/Python/customer-support/src/models/ticket_page.py @@ -0,0 +1,18 @@ +"""Paging DTO for storage list.""" + +from __future__ import annotations + +from dataclasses import dataclass + +from customer_support.models.ticket import SupportTicket + + +@dataclass(frozen=True) +class TicketPage: + """One page of tickets from Railengine storage list.""" + + items: list[SupportTicket] + total_pages: int + total_count: int + page_number: int + page_size: int diff --git a/Python/customer-support/src/models/triage.py b/Python/customer-support/src/models/triage.py new file mode 100644 index 0000000..ad9d93d --- /dev/null +++ b/Python/customer-support/src/models/triage.py @@ -0,0 +1,33 @@ +"""Structured triage output from the Railtracks agent.""" + +from __future__ import annotations + +from typing import Literal + +from pydantic import BaseModel, Field + + +class TriageAssessment(BaseModel): + """Structured triage output from the Railtracks agent.""" + + priority: Literal["p1", "p2", "p3", "p4"] = Field( + ..., + description="p1 critical outage / legal; p2 major customer pain; p3 normal; p4 low", + ) + category: str = Field(..., description="Short category label, e.g. billing, auth") + internal_summary: str = Field( + ..., + description="Internal 2–4 sentence summary for support leads (no raw secrets).", + ) + draft_reply_to_customer: str = Field( + ..., + description="Empathetic draft reply. Do not repeat suspected secrets or API keys.", + ) + similar_ticket_ids: list[str] = Field( + default_factory=list, + description="Ids of similar historical tickets found via search tools", + ) + reasoning: str = Field( + ..., + description="Brief justification for priority and category", + ) diff --git a/Python/customer-support/src/pages/__init__.py b/Python/customer-support/src/pages/__init__.py new file mode 100644 index 0000000..b1956fd --- /dev/null +++ b/Python/customer-support/src/pages/__init__.py @@ -0,0 +1 @@ +"""Streamlit multipage scripts (included in the wheel for paths relative to ``streamlit_app``).""" diff --git a/Python/customer-support/src/pages/agent_page.py b/Python/customer-support/src/pages/agent_page.py new file mode 100644 index 0000000..182f98d --- /dev/null +++ b/Python/customer-support/src/pages/agent_page.py @@ -0,0 +1,239 @@ +"""Streamlit page: Customer Support Agent (chat + structured triage).""" + +from __future__ import annotations + +import asyncio +import traceback + +import streamlit as st + +from customer_support.models import SupportTicket, TriageAssessment +from customer_support.repositories import TicketRepository +from customer_support.services.ticket_list_service import TicketListService +from customer_support.services.triage_service import TriageService +from customer_support.streamlit_common import ( + PRIORITY_ORDER, + TICKET_ID_PATTERN, + env_ok, + render_app_toolbar, + render_chat_message_with_ticket_links, + render_page_brand, + render_ticket_subject_button, + render_triage_assessment, +) + +_QUEUE_KEY = "agent_queue" +_RESULTS_KEY = "agent_results" +_CHAT_KEY = "agent_chat" +_TICKET_CACHE_KEY = "agent_ticket_cache" + +# (button label, prompt sent to the agent) +_EXAMPLE_CHAT_PROMPTS: tuple[tuple[str, str], ...] = ( + ( + "🎯 Which ticket first?", + "Which open or pending ticket should we handle first, and why?", + ), + ( + "💳 Similar billing issues", + "Search for similar resolved tickets related to billing portal or invoice errors.", + ), + ( + "📋 Summarize the queue", + "Summarize all open and pending tickets in the queue by customer impact and urgency.", + ), + ( + "⚡ Highest-impact next steps", + "What are the recommended next steps for the highest-impact open ticket?", + ), +) + + +def _ticket_lookup_base() -> dict[str, SupportTicket]: + """Queue plus any tickets resolved from prior chat replies.""" + lookup: dict[str, SupportTicket] = dict(st.session_state.get(_TICKET_CACHE_KEY, {})) + for ticket in st.session_state.get(_QUEUE_KEY, []): + lookup[ticket.id] = ticket + return lookup + + +async def _resolve_tickets_mentioned_in_text( + content: str, *, list_ready: bool +) -> dict[str, SupportTicket]: + lookup = _ticket_lookup_base() + for tid in dict.fromkeys(TICKET_ID_PATTERN.findall(content)): + if tid in lookup or not list_ready: + continue + repo = TicketRepository() + for ticket in await repo.query_jsonpath_tickets(f"$.id:{tid}"): + if ticket.id == tid: + lookup[tid] = ticket + break + st.session_state[_TICKET_CACHE_KEY] = lookup + return lookup + + +def _submit_chat_turn(prompt: str, queue_snapshot: list[SupportTicket] | None) -> None: + st.session_state[_CHAT_KEY].append({"role": "user", "content": prompt}) + try: + with st.spinner("Agent thinking…"): + reply = asyncio.run( + TriageService().chat( + st.session_state[_CHAT_KEY], + queue=queue_snapshot or None, + ) + ) + st.session_state[_CHAT_KEY].append({"role": "assistant", "content": reply}) + except Exception: + st.session_state[_CHAT_KEY].pop() + st.error(traceback.format_exc()) + else: + st.rerun() + + +render_page_brand() +render_app_toolbar() + +st.title("Customer Support Agent") +st.caption( + "Ask questions about your customer support tickets (queue, priorities, similar resolved " + "cases) or **Load queue** and **Triage all** to run structured triage on open and pending tickets." +) + +env = env_ok() +list_ready = env["ENGINE_PAT"] and env["ENGINE_ID"] +triage_ready = list_ready and env["OPENAI_API_KEY"] + +if _QUEUE_KEY not in st.session_state: + st.session_state[_QUEUE_KEY] = [] +if _RESULTS_KEY not in st.session_state: + st.session_state[_RESULTS_KEY] = {} +if _CHAT_KEY not in st.session_state: + st.session_state[_CHAT_KEY] = [] + +if not list_ready: + st.warning("Set **ENGINE_PAT** and **ENGINE_ID** to load the triage queue.") +if not env["OPENAI_API_KEY"]: + st.warning("Set **OPENAI_API_KEY** to run the triage agent.") + +chat_col, triage_col = st.columns([1, 1], gap="large") + +with chat_col: + st.subheader("Chat with agent") + st.caption("Ask about the queue, priorities, or similar resolved tickets.") + + if st.button("🗑️ Clear chat", type="secondary"): + st.session_state[_CHAT_KEY] = [] + st.rerun() + + queue_snapshot: list[SupportTicket] = st.session_state[_QUEUE_KEY] + + for i, msg in enumerate(st.session_state[_CHAT_KEY]): + with st.chat_message(msg["role"]): + if msg["role"] == "assistant": + tickets_by_id = asyncio.run( + _resolve_tickets_mentioned_in_text( + msg["content"], list_ready=list_ready + ) + ) + render_chat_message_with_ticket_links( + msg["content"], + tickets_by_id, + message_index=i, + ) + else: + st.markdown(msg["content"]) + + st.caption("Example questions") + ex_cols = st.columns(2) + for idx, (label, example_prompt) in enumerate(_EXAMPLE_CHAT_PROMPTS): + with ex_cols[idx % 2]: + if st.button( + label, + key=f"chat_example:{idx}", + disabled=not triage_ready, + use_container_width=True, + ): + _submit_chat_turn(example_prompt, queue_snapshot) + + if prompt := st.chat_input( + "Ask about your support tickets…", + disabled=not triage_ready, + ): + _submit_chat_turn(prompt, queue_snapshot) + +with triage_col: + toolbar = st.columns([2, 2]) + load_queue = toolbar[0].button("📋 Load queue") + + if load_queue and list_ready: + try: + with st.spinner("Loading open and pending tickets…"): + st.session_state[_QUEUE_KEY] = asyncio.run( + TicketListService().fetch_open_and_pending() + ) + except Exception: + st.error(traceback.format_exc()) + else: + n = len(st.session_state[_QUEUE_KEY]) + st.success(f"Queue loaded: **{n}** ticket(s) (open + pending).") + + queue_loaded = len(st.session_state[_QUEUE_KEY]) > 0 + triage_all = toolbar[1].button( + "🤖 Triage all", + disabled=not triage_ready or not queue_loaded, + help="Run structured triage on every ticket in the loaded queue.", + ) + + queue: list[SupportTicket] = st.session_state[_QUEUE_KEY] + results: dict[str, TriageAssessment] = st.session_state[_RESULTS_KEY] + + if triage_all and triage_ready and queue: + try: + progress = st.progress(0.0, text="Running triage agent…") + + async def _triage_all() -> dict[str, TriageAssessment]: + svc = TriageService() + batch: dict[str, TriageAssessment] = {} + total = len(queue) + for i, ticket in enumerate(queue, start=1): + progress.progress( + i / total, text=f"Triage {i}/{total}: {ticket.id}" + ) + batch[ticket.id] = await svc.run(ticket) + return batch + + batch = asyncio.run(_triage_all()) + st.session_state[_RESULTS_KEY] = batch + results = batch + progress.empty() + st.success(f"Triage complete for **{len(batch)}** ticket(s).") + except Exception: + st.error(traceback.format_exc()) + + if queue: + st.subheader("Queue") + st.caption(f"{len(queue)} ticket(s) (open or pending).") + for ticket in queue: + render_ticket_subject_button(ticket, key=f"queue_view:{ticket.id}") + st.caption(f"`{ticket.id}` · **{ticket.status}** · {ticket.productArea}") + else: + st.info("Click **Load queue** to fetch open and pending tickets from storage.") + + if results: + st.subheader("Triage results") + st.caption("Sorted by priority (P1 first).") + + ticket_by_id = {t.id: t for t in queue} + sorted_ids = sorted( + results.keys(), + key=lambda tid: ( + PRIORITY_ORDER.get(results[tid].priority, 99), + tid, + ), + ) + + for tid in sorted_ids: + ticket = ticket_by_id.get(tid) + if ticket is None: + continue + render_triage_assessment(ticket, results[tid]) diff --git a/Python/customer-support/src/pages/dashboard.py b/Python/customer-support/src/pages/dashboard.py new file mode 100644 index 0000000..c6dfc6f --- /dev/null +++ b/Python/customer-support/src/pages/dashboard.py @@ -0,0 +1,101 @@ +"""Dashboard: interactive Kanban from full storage snapshot.""" + +from __future__ import annotations + +import asyncio +import traceback + +import streamlit as st + +from customer_support.models.ticket import KANBAN_COLUMNS, TicketStatus, SupportTicket +from customer_support.services.ingest_service import IngestService +from customer_support.services.ticket_list_service import TicketListService +from customer_support.streamlit_common import ( + env_ok, + group_tickets_by_status, + render_app_toolbar, + render_kanban_ticket_card, + render_page_brand, +) + +_KANBAN_SESSION_KEY = "kanban_tickets" +_KANBAN_INITIAL_LOAD_DONE = "kanban_initial_load_done" + +render_page_brand() +refresh = render_app_toolbar(show_refresh_board=True) + +st.title("Dashboard") +env = env_ok() +list_ready = env["ENGINE_PAT"] and env["ENGINE_ID"] +ingest_ready = env["ENGINE_TOKEN"] + +if _KANBAN_SESSION_KEY not in st.session_state: + st.session_state[_KANBAN_SESSION_KEY] = [] + +if not list_ready: + st.warning( + "Set **ENGINE_PAT** and **ENGINE_ID** to load tickets from Railengine.", + ) + +if not ingest_ready: + st.warning( + "Set **ENGINE_TOKEN** to change status from card dropdowns (updates use ingest upsert)." + ) + +should_load = list_ready and ( + refresh or not st.session_state.get(_KANBAN_INITIAL_LOAD_DONE) +) +if should_load: + try: + with st.spinner("Loading tickets from storage…"): + st.session_state[_KANBAN_SESSION_KEY] = asyncio.run( + TicketListService().fetch_all() + ) + st.session_state[_KANBAN_INITIAL_LOAD_DONE] = True + except Exception: + st.error(traceback.format_exc()) + else: + if refresh: + st.success( + f"Loaded **{len(st.session_state[_KANBAN_SESSION_KEY])}** tickets." + ) + +tickets: list[SupportTicket] = st.session_state[_KANBAN_SESSION_KEY] +buckets = group_tickets_by_status(tickets) + +cols = st.columns(4) + +move_hit: tuple[SupportTicket, TicketStatus] | None = None + +for idx, (_label, stat) in enumerate(KANBAN_COLUMNS): + with cols[idx]: + cnt = len(buckets[stat]) + st.markdown(f"#### {_label} ") + st.caption(f"{cnt} ticket(s)") + for ticket in buckets[stat]: + target = render_kanban_ticket_card( + ticket, + moves_disabled=not ingest_ready, + ) + if target is not None: + move_hit = (ticket, target) + +if move_hit is not None and ingest_ready: + tk, dest = move_hit + svc = IngestService() + try: + with st.spinner("Updating status…"): + status_code = asyncio.run(svc.update_status(tk, dest)) + if list_ready: + st.session_state[_KANBAN_SESSION_KEY] = asyncio.run( + TicketListService().fetch_all() + ) + else: + st.session_state[_KANBAN_SESSION_KEY] = [ + t.model_copy(update={"status": dest}) if t.id == tk.id else t + for t in st.session_state[_KANBAN_SESSION_KEY] + ] + st.toast(f"Moved `{tk.id}` → **{dest}** (HTTP {status_code})", icon="✅") + st.rerun() + except Exception: + st.error(traceback.format_exc()) diff --git a/Python/customer-support/src/pages/ingest_page.py b/Python/customer-support/src/pages/ingest_page.py new file mode 100644 index 0000000..eb2997b --- /dev/null +++ b/Python/customer-support/src/pages/ingest_page.py @@ -0,0 +1,156 @@ +"""Streamlit page: ingest + triage.""" + +from __future__ import annotations + +import asyncio +import json +import traceback + +import streamlit as st +from pydantic import ValidationError + +from customer_support.models import SupportTicket, TriageAssessment +from customer_support.services.ingest_service import IngestService +from customer_support.services.triage_service import TriageService +from customer_support.streamlit_common import ( + FIXTURES_DIR, + env_ok, + fixture_paths, + render_app_toolbar, + render_page_brand, + render_pipeline_stages, +) + +render_page_brand() +render_app_toolbar() + +st.title("Ingest") +st.caption( + "Edit ticket JSON, send to Railengine, or run structured triage with Railtracks. " + "The Railengine is configured to do PII masking, vector embeddings, and full text search." +) + +render_pipeline_stages() + +status = env_ok() + +if "ticket_editor" not in st.session_state: + st.session_state.ticket_editor = "" + +sidebar = st.sidebar +sidebar.subheader("Fixtures") +fixtures = fixture_paths() +fixture_names = [p.name for p in fixtures] + +if fixture_names: + pick = sidebar.selectbox("Pick fixture file", fixture_names) + if sidebar.button("📄 Load into editor", type="secondary"): + text = (FIXTURES_DIR / pick).read_text(encoding="utf-8") + st.session_state.ticket_editor = text + st.rerun() + if sidebar.button( + "🌱 Seed all fixtures", + type="secondary", + disabled=not status["ENGINE_TOKEN"], + help="Ingest every `fixtures/tickets/*.json` (requires ENGINE_TOKEN).", + ): + try: + with st.spinner(f"Ingesting {len(fixtures)} fixture(s)…"): + results = asyncio.run(IngestService().ingest_paths(fixtures)) + st.sidebar.success( + f"Seeded **{len(results)}** file(s): " + + ", ".join(f"`{name}` ({code})" for name, code in results) + ) + except Exception: + st.sidebar.error(traceback.format_exc()) + +sidebar.caption( + f"`fixtures/` path: `{FIXTURES_DIR}`" + if fixtures + else f"No `{FIXTURES_DIR}` — paste JSON manually." +) + +txt = st.text_area( + "Ticket JSON (`SupportTicket` schema)", + height=340, + key="ticket_editor", + width="stretch", +) + +c1, c2 = st.columns(2) + +ingest_ready = status["ENGINE_TOKEN"] + +ticket: SupportTicket | None = None +if txt.strip(): + try: + ticket = SupportTicket.model_validate(json.loads(txt)) + st.success("JSON matches `SupportTicket` schema.") + except json.JSONDecodeError as e: + st.warning(f"Invalid JSON: {e}") + except ValidationError as e: + st.error(f"Does not match `SupportTicket`: {e}") + +with c1: + do_ingest = st.button( + "📥 Ingest to Railengine", + disabled=not (ticket and ingest_ready), + help="Requires ENGINE_TOKEN.", + ) + +triage_ready = ( + ticket and status["ENGINE_PAT"] and status["ENGINE_ID"] and status["OPENAI_API_KEY"] +) + +with c2: + do_triage = st.button( + "🤖 Run triage", + disabled=not triage_ready, + help="Requires ENGINE_PAT, ENGINE_ID, and OPENAI_API_KEY.", + ) + +if do_ingest and ticket: + try: + with st.spinner("Ingesting…"): + http_status = asyncio.run(IngestService().ingest_ticket(ticket)) + st.success(f"Ingest succeeded (HTTP {http_status}).") + except Exception: + st.error(traceback.format_exc()) + +if do_triage and ticket: + try: + with st.spinner("Running Railtracks triage (tools + structured output)…"): + assessment = asyncio.run(TriageService().run(ticket)) + if not isinstance(assessment, TriageAssessment): + assessment = TriageAssessment.model_validate(assessment) + + st.subheader("Triage output") + m1, m2 = st.columns(2) + m1.metric("Priority", assessment.priority.upper()) + m2.metric("Category", assessment.category) + + st.markdown("**Internal summary**") + st.write(assessment.internal_summary) + + st.markdown("**Draft reply**") + st.write(assessment.draft_reply_to_customer) + + st.markdown("**Reasoning**") + st.write(assessment.reasoning) + + sid = assessment.similar_ticket_ids + if sid: + st.markdown("**Similar ticket ids**") + st.code(", ".join(sid)) + + raw = assessment.model_dump() + st.download_button( + label="⬇️ Download TriageAssessment JSON", + file_name=f"triage-{ticket.id}.json", + mime="application/json", + data=json.dumps(raw, indent=2, ensure_ascii=False).encode("utf-8"), + ) + with st.expander("Full JSON"): + st.json(raw) + except Exception: + st.error(traceback.format_exc()) diff --git a/Python/customer-support/src/pages/search_page.py b/Python/customer-support/src/pages/search_page.py new file mode 100644 index 0000000..a8df613 --- /dev/null +++ b/Python/customer-support/src/pages/search_page.py @@ -0,0 +1,155 @@ +"""Streamlit page: Railengine index search.""" + +from __future__ import annotations + +import asyncio +import traceback +from typing import Any + +import streamlit as st + +from customer_support.models import SupportTicket +from customer_support.services.search_service import SearchService +from customer_support.streamlit_common import ( + env_ok, + render_app_toolbar, + render_page_brand, + show_ticket_details_dialog, +) + +_SEARCH_RESULTS_KEY = "search_results" +_SEARCH_DF_KEY = "search_results_df" +_DEFAULT_LIMIT = 50 + + +def _trunc_subject(subject: str, *, max_len: int = 140) -> str: + return subject[:max_len] + ("…" if len(subject) > max_len else "") + + +def _tickets_to_table_rows(tickets: list[SupportTicket]) -> list[dict[str, str]]: + return [ + { + "id": t.id, + "subject": _trunc_subject(t.subject), + "status": t.status, + "tags": ", ".join(t.tags), + "productArea": t.productArea, + "createdAt": t.createdAt, + } + for t in tickets + ] + + +def _selected_row_index(df_key: str) -> int | None: + """Row index from ``st.dataframe`` selection state (dict or widget state object).""" + state: Any = st.session_state.get(df_key) + if state is None: + return None + selection = ( + state.get("selection") + if isinstance(state, dict) + else getattr(state, "selection", None) + ) + if selection is None: + return None + rows = ( + selection.get("rows") + if isinstance(selection, dict) + else getattr(selection, "rows", None) + ) + if not rows: + return None + return int(rows[0]) + + +def _open_details_for_selected_row() -> None: + """``on_select`` callback: open ticket dialog for the newly selected row.""" + idx = _selected_row_index(_SEARCH_DF_KEY) + if idx is None: + return + tickets: list[SupportTicket] = st.session_state.get(_SEARCH_RESULTS_KEY, []) + if 0 <= idx < len(tickets): + show_ticket_details_dialog(tickets[idx]) + + +render_page_brand() +render_app_toolbar() + +st.title("Search") +st.caption( + "Find tickets in Railengine using the **keyword index** (`search_index`). " + "Use subject keywords, product area, or phrases from ticket bodies." +) + +env = env_ok() +search_ready = env["ENGINE_PAT"] and env["ENGINE_ID"] + +if not search_ready: + st.warning("Set **ENGINE_PAT** and **ENGINE_ID** to search Railengine.") + +if _SEARCH_RESULTS_KEY not in st.session_state: + st.session_state[_SEARCH_RESULTS_KEY] = [] + +with st.form("search_form", clear_on_submit=False): + query = st.text_input( + "Search query", + placeholder="e.g. billing portal invoice 500", + disabled=not search_ready, + ) + limit = st.slider( + "Max results", + min_value=5, + max_value=100, + value=_DEFAULT_LIMIT, + step=5, + disabled=not search_ready, + ) + run_search = st.form_submit_button( + "🔍 Search", + disabled=not search_ready, + type="primary", + ) + +if run_search and query.strip(): + try: + with st.spinner("Searching Railengine index…"): + st.session_state[_SEARCH_RESULTS_KEY] = asyncio.run( + SearchService().search_index(query.strip(), limit=limit) + ) + if _SEARCH_DF_KEY in st.session_state: + del st.session_state[_SEARCH_DF_KEY] + except Exception: + st.error(traceback.format_exc()) + else: + n = len(st.session_state[_SEARCH_RESULTS_KEY]) + st.success(f"Found **{n}** ticket(s) for `{query.strip()}`.") + +results: list[SupportTicket] = st.session_state[_SEARCH_RESULTS_KEY] + +if results: + st.subheader("Results") + st.caption( + "Select a row in the table (checkbox on the left) to open **Ticket details**. " + "You can also use **📋 Details** after selecting a row." + ) + st.dataframe( + _tickets_to_table_rows(results), + hide_index=True, + width="stretch", + on_select=_open_details_for_selected_row, + selection_mode="single-row", + key=_SEARCH_DF_KEY, + ) + selected_idx = _selected_row_index(_SEARCH_DF_KEY) + if st.button( + "📋 Details", + disabled=selected_idx is None, + type="secondary", + ): + if selected_idx is not None and 0 <= selected_idx < len(results): + show_ticket_details_dialog(results[selected_idx]) +elif search_ready and not run_search: + st.info( + "Enter a query and press **Enter** or click **🔍 Search** " + "to query the Railengine index." + ) diff --git a/Python/customer-support/src/repositories/__init__.py b/Python/customer-support/src/repositories/__init__.py new file mode 100644 index 0000000..01551f2 --- /dev/null +++ b/Python/customer-support/src/repositories/__init__.py @@ -0,0 +1,8 @@ +"""Repository layer.""" + +from customer_support.repositories.ticket_repository import ( + TicketRepository, + ingest_ticket_with_client, +) + +__all__ = ["TicketRepository", "ingest_ticket_with_client"] diff --git a/Python/customer-support/src/repositories/ticket_repository.py b/Python/customer-support/src/repositories/ticket_repository.py new file mode 100644 index 0000000..9ac703c --- /dev/null +++ b/Python/customer-support/src/repositories/ticket_repository.py @@ -0,0 +1,157 @@ +"""Railengine ingest + retrieval I/O.""" + +from __future__ import annotations + +import json +from collections.abc import AsyncIterator +from pathlib import Path +from typing import Any + +from railtown.engine import Railengine +from railtown.engine.ingest import RailengineIngest + +from customer_support.models import SupportTicket, TicketPage + + +def _as_ticket(item: Any) -> SupportTicket | None: + """Coerce SDK hits to ``SupportTicket`` (model instances or fallback dict parse).""" + if isinstance(item, SupportTicket): + return item + if isinstance(item, dict): + try: + return SupportTicket.model_validate(item) + except Exception: + return None + return None + + +def _collect_tickets(items: list[Any]) -> list[SupportTicket]: + out: list[SupportTicket] = [] + for item in items: + t = _as_ticket(item) + if t is not None: + out.append(t) + return out + + +async def ingest_ticket_with_client( + client: RailengineIngest, ticket: SupportTicket +) -> int: + """Upsert using an existing ingest client.""" + resp = await client.upsert(ticket) + return resp.status_code + + +class TicketRepository: + """SDK-backed ticket persistence and search.""" + + async def upsert(self, ticket: SupportTicket) -> int: + async with RailengineIngest(model=SupportTicket) as client: + return await ingest_ticket_with_client(client, ticket) + + async def ingest_paths(self, paths: list[Path]) -> list[tuple[str, int]]: + """Batch ingest preserving a single ingest session.""" + results: list[tuple[str, int]] = [] + async with RailengineIngest(model=SupportTicket) as client: + for path in paths: + raw = json.loads(path.read_text(encoding="utf-8")) + ticket = SupportTicket.model_validate(raw) + status = await ingest_ticket_with_client(client, ticket) + results.append((path.name, status)) + return results + + async def list_page(self, page_number: int = 1, page_size: int = 100) -> TicketPage: + capped = max(1, min(int(page_size), 100)) + ps = capped + async with Railengine(model=SupportTicket) as client: + page = await client.list_storage_documents( + page_number=page_number, + page_size=ps, + ) + + tickets = _collect_tickets(page.items) + + return TicketPage( + items=tickets, + total_pages=getattr(page, "total_pages", 0) or 0, + total_count=getattr(page, "total_count", len(tickets)), + page_number=getattr(page, "page_number", page_number), + page_size=getattr(page, "page_size", ps), + ) + + async def list_all(self, *, page_size: int = 100) -> list[SupportTicket]: + """Walk every storage page in one Railengine session; return parsed tickets.""" + capped = max(1, min(int(page_size), 100)) + out: list[SupportTicket] = [] + async with Railengine(model=SupportTicket) as client: + pn = 1 + while True: + page = await client.list_storage_documents( + page_number=pn, + page_size=capped, + ) + out.extend(_collect_tickets(page.items)) + total_pages = getattr(page, "total_pages", 0) or 0 + if total_pages < 1 or pn >= total_pages: + break + pn += 1 + return out + + async def search_index_hits(self, query: str, limit: int) -> list[SupportTicket]: + out: list[SupportTicket] = [] + async with Railengine(model=SupportTicket) as client: + result = await client.search_index(query={"search": query}, raw=False) + for item in result.items: + t = _as_ticket(item) + if t is not None: + out.append(t) + if len(out) >= limit: + break + return out + + async def search_vector_hits(self, query: str, limit: int) -> list[SupportTicket]: + out: list[SupportTicket] = [] + async with Railengine(model=SupportTicket) as client: + items = await client.search_vector_store( + vector_store="VectorStore1", + query=query, + top=limit, + ) + for item in items: + t = _as_ticket(item) + if t is not None: + out.append(t) + if len(out) >= limit: + break + return out + + async def query_jsonpath_tickets(self, jq: str) -> list[SupportTicket]: + async with Railengine(model=SupportTicket) as client: + page = await client.query_storage_by_jsonpath(json_path_query=jq) + return _collect_tickets(page.items) + + async def iter_storage_tickets( + self, + *, + page_size: int, + max_docs: int, + ) -> AsyncIterator[SupportTicket]: + """Yield parsed tickets from storage pages until ``max_docs`` or pages exhausted.""" + async with Railengine(model=SupportTicket) as client: + scanned = 0 + pn = 1 + while scanned < max_docs: + page = await client.list_storage_documents( + page_number=pn, + page_size=page_size, + ) + for item in page.items: + t = _as_ticket(item) + if t is not None: + yield t + scanned += 1 + if scanned >= max_docs: + return + if page.total_pages < 1 or pn >= page.total_pages: + return + pn += 1 diff --git a/Python/customer-support/src/services/__init__.py b/Python/customer-support/src/services/__init__.py new file mode 100644 index 0000000..2aa8da7 --- /dev/null +++ b/Python/customer-support/src/services/__init__.py @@ -0,0 +1,8 @@ +"""Application services.""" + +from customer_support.services.ingest_service import IngestService +from customer_support.services.search_service import SearchService +from customer_support.services.ticket_list_service import TicketListService +from customer_support.services.triage_service import TriageService + +__all__ = ["IngestService", "SearchService", "TicketListService", "TriageService"] diff --git a/Python/customer-support/src/services/ingest_service.py b/Python/customer-support/src/services/ingest_service.py new file mode 100644 index 0000000..9e00280 --- /dev/null +++ b/Python/customer-support/src/services/ingest_service.py @@ -0,0 +1,28 @@ +"""Ingest orchestration.""" + +from __future__ import annotations + +from pathlib import Path + +from customer_support.models.ticket import SupportTicket, TicketStatus +from customer_support.repositories import TicketRepository + + +class IngestService: + """Use-case wrapper for fixture → Railengine ingest.""" + + def __init__(self, repository: TicketRepository | None = None) -> None: + self._repo = repository or TicketRepository() + + async def ingest_ticket(self, ticket: SupportTicket) -> int: + """Upsert a single ticket.""" + return await self._repo.upsert(ticket) + + async def update_status(self, ticket: SupportTicket, status: TicketStatus) -> int: + """Persist a Kanban column change via upsert (requires ENGINE_TOKEN).""" + updated = ticket.model_copy(update={"status": status}) + return await self._repo.upsert(updated) + + async def ingest_paths(self, paths: list[Path]) -> list[tuple[str, int]]: + """Batch-ingest fixture files (single ingest session).""" + return await self._repo.ingest_paths(paths) diff --git a/Python/customer-support/src/services/search_service.py b/Python/customer-support/src/services/search_service.py new file mode 100644 index 0000000..955a6c4 --- /dev/null +++ b/Python/customer-support/src/services/search_service.py @@ -0,0 +1,17 @@ +"""Railengine index search orchestration.""" + +from __future__ import annotations + +from customer_support.models import SupportTicket +from customer_support.repositories import TicketRepository + + +class SearchService: + """Expose ``search_index`` for keyword retrieval.""" + + def __init__(self, repository: TicketRepository | None = None) -> None: + self._repo = repository or TicketRepository() + + async def search_index(self, query: str, *, limit: int = 50) -> list[SupportTicket]: + capped = max(1, min(int(limit), 100)) + return await self._repo.search_index_hits(query, capped) diff --git a/Python/customer-support/src/services/ticket_list_service.py b/Python/customer-support/src/services/ticket_list_service.py new file mode 100644 index 0000000..fdd2a4c --- /dev/null +++ b/Python/customer-support/src/services/ticket_list_service.py @@ -0,0 +1,27 @@ +"""Paginated storage list orchestration.""" + +from __future__ import annotations + +from customer_support.models import SupportTicket, TicketPage +from customer_support.repositories import TicketRepository + + +class TicketListService: + """Expose ``list_storage_documents`` as a TicketPage.""" + + def __init__(self, repository: TicketRepository | None = None) -> None: + self._repo = repository or TicketRepository() + + async def fetch_page(self, page_number: int = 1, page_size: int = 50) -> TicketPage: + return await self._repo.list_page(page_number=page_number, page_size=page_size) + + async def fetch_all(self, *, page_size: int = 100) -> list[SupportTicket]: + """Full storage snapshot (paginated internally).""" + return await self._repo.list_all(page_size=page_size) + + async def fetch_open_and_pending( + self, *, page_size: int = 100 + ) -> list[SupportTicket]: + """Tickets in ``open`` or ``pending`` status for the triage queue.""" + tickets = await self.fetch_all(page_size=page_size) + return [t for t in tickets if t.status in ("open", "pending")] diff --git a/Python/customer-support/src/services/triage_service.py b/Python/customer-support/src/services/triage_service.py new file mode 100644 index 0000000..4a5682e --- /dev/null +++ b/Python/customer-support/src/services/triage_service.py @@ -0,0 +1,87 @@ +"""Railtracks triage flow orchestration.""" + +from __future__ import annotations + +import json + +import railtracks as rt +from railtracks.built_nodes.concrete.response import StructuredResponse + +from customer_support.agents.triage_agent import ( + build_triage_agent, + build_triage_chat_agent, +) +from customer_support.models import SupportTicket, TriageAssessment + + +def _flow_reply_text(result: object) -> str: + if isinstance(result, StructuredResponse): + structured = result.structured + if hasattr(structured, "model_dump_json"): + return structured.model_dump_json(indent=2) + return str(structured) + if hasattr(result, "content"): + return str(result.content) + return str(result) + + +class TriageService: + """Run the structured triage agent (tools use TicketRepository internally).""" + + async def run(self, ticket: SupportTicket) -> TriageAssessment: + agent_cls = build_triage_agent() + flow = rt.Flow(name="CustomerSupportTriage", entry_point=agent_cls) + + prompt = f"""Triage this ticket: + +```json +{json.dumps(ticket.model_dump(), indent=2, ensure_ascii=False)} +``` + +1) Call search_similar_tickets with a query built from subject, body, and productArea. +2) Optionally call list_recent_tickets(status="resolved") for extra historical context. +3) Return a TriageAssessment with priority, category, internal_summary, + draft_reply_to_customer, similar_ticket_ids, and reasoning. +""" + result = await flow.ainvoke(prompt) + if isinstance(result, StructuredResponse): + structured = result.structured + if isinstance(structured, TriageAssessment): + return structured + return TriageAssessment.model_validate(structured) + if isinstance(result, TriageAssessment): + return result + if hasattr(result, "model_dump"): + return TriageAssessment.model_validate(result.model_dump()) + raise TypeError(f"Unexpected agent result type: {type(result)}") + + async def run_batch( + self, tickets: list[SupportTicket] + ) -> dict[str, TriageAssessment]: + """Triage each ticket sequentially (one agent run per ticket).""" + results: dict[str, TriageAssessment] = {} + for ticket in tickets: + results[ticket.id] = await self.run(ticket) + return results + + async def chat( + self, + messages: list[dict[str, str]], + *, + queue: list[SupportTicket] | None = None, + ) -> str: + """Free-form chat with the triage agent (same tools, conversational reply).""" + agent_cls = build_triage_chat_agent() + flow = rt.Flow(name="CustomerSupportTriageChat", entry_point=agent_cls) + + parts: list[str] = [] + if queue: + lines = [f"- {t.id} ({t.status}): {t.subject}" for t in queue[:30]] + parts.append("Open/pending queue in Railengine:\n" + "\n".join(lines)) + + transcript = "\n".join(f"{m['role'].upper()}: {m['content']}" for m in messages) + parts.append(f"Conversation:\n{transcript}") + + prompt = "\n\n".join(parts) + "\n\nRespond to the latest USER message." + result = await flow.ainvoke(prompt) + return _flow_reply_text(result) diff --git a/Python/customer-support/src/streamlit_app.py b/Python/customer-support/src/streamlit_app.py new file mode 100644 index 0000000..b650279 --- /dev/null +++ b/Python/customer-support/src/streamlit_app.py @@ -0,0 +1,27 @@ +"""Streamlit multipage entry: Dashboard, Search, Ingest, Agent.""" + +from __future__ import annotations + +import streamlit as st + +from customer_support.config.env import ensure_dotenv_loaded +from customer_support.streamlit_common import use_full_width_layout + + +def main() -> None: + ensure_dotenv_loaded() + st.set_page_config(page_title="Support Triage", layout="wide") + use_full_width_layout() + pg = st.navigation( + [ + st.Page("pages/dashboard.py", title="Dashboard", icon="🗂️", default=True), + st.Page("pages/search_page.py", title="Search", icon="🔍"), + st.Page("pages/ingest_page.py", title="Ingest", icon="📥"), + st.Page("pages/agent_page.py", title="Agent", icon="🤖"), + ] + ) + pg.run() + + +if __name__ == "__main__": + main() diff --git a/Python/customer-support/src/streamlit_common.py b/Python/customer-support/src/streamlit_common.py new file mode 100644 index 0000000..d2bd23d --- /dev/null +++ b/Python/customer-support/src/streamlit_common.py @@ -0,0 +1,370 @@ +"""Shared Streamlit helpers: paths, fixtures, env status.""" + +from __future__ import annotations + +import base64 +import os +import re +from pathlib import Path + +import streamlit as st + +from customer_support.models.ticket import ( + KANBAN_COLUMNS, + TICKET_STATUSES, + TicketStatus, + SupportTicket, +) +from customer_support.models.triage import TriageAssessment + +PROJECT_ROOT = Path(__file__).resolve().parents[1] +FIXTURES_DIR = PROJECT_ROOT / "fixtures" / "tickets" +BRAND_LOGO_PATH = PROJECT_ROOT / "assets" / "logo-railengine.png" + +PRIORITY_ORDER = {"p1": 0, "p2": 1, "p3": 2, "p4": 3} + +TICKET_ID_PATTERN = re.compile(r"\b(ticket-[a-zA-Z0-9-]+)\b") + + +def env_ok() -> dict[str, bool]: + return { + "ENGINE_TOKEN": bool(os.environ.get("ENGINE_TOKEN", "").strip()), + "ENGINE_PAT": bool(os.environ.get("ENGINE_PAT", "").strip()), + "ENGINE_ID": bool(os.environ.get("ENGINE_ID", "").strip()), + "OPENAI_API_KEY": bool(os.environ.get("OPENAI_API_KEY", "").strip()), + } + + +def fixture_paths() -> list[Path]: + if not FIXTURES_DIR.is_dir(): + return [] + return sorted(FIXTURES_DIR.glob("*.json")) + + +_PIPELINE_STAGES_HTML = """ + + +""" + + +_FULL_WIDTH_CSS = """ + +""" + + +def use_full_width_layout() -> None: + """Expand main content to full viewport width (call once from app entry).""" + st.markdown(_FULL_WIDTH_CSS, unsafe_allow_html=True) + + +def render_page_brand() -> None: + """Clickable Railengine logo at the top of the main content area.""" + if not BRAND_LOGO_PATH.is_file(): + return + b64 = base64.b64encode(BRAND_LOGO_PATH.read_bytes()).decode() + st.markdown( + f'' + f'Railengine', + unsafe_allow_html=True, + ) + + +def render_pipeline_stages() -> None: + """Railengine pipeline overview (HTML; works without Mermaid support).""" + st.markdown("#### Pipeline stages") + st.html(_PIPELINE_STAGES_HTML, width="stretch") + st.caption( + "This page sends JSON through **Ingest**. Triage agents search **Embedding** and " + "**Indexing**; the dashboard reads **Hot Storage**; **Search** queries the index." + ) + + +@st.dialog("Config") +def show_env_config_dialog() -> None: + """Modal: which env vars are set (values never shown).""" + st.caption( + "Loaded from `.env` next to `pyproject.toml`. Secret values are not displayed." + ) + status = env_ok() + cols = st.columns(4) + for i, (key, ok) in enumerate(status.items()): + cols[i].metric(label=key, value="set" if ok else "missing") + + +def render_app_toolbar(*, show_refresh_board: bool = False) -> bool: + """Top-right toolbar: optional **Refresh board** + **Config** (same row).""" + refresh_clicked = False + if show_refresh_board: + _, refresh_col, config_col = st.columns([9, 1, 1]) + with refresh_col: + refresh_clicked = st.button( + "🔄 Refresh", + type="secondary", + use_container_width=True, + ) + else: + _, config_col = st.columns([11, 1]) + + with config_col: + if st.button("⚙️ Config", type="secondary", use_container_width=True): + show_env_config_dialog() + + return refresh_clicked + + +def group_tickets_by_status( + tickets: list[SupportTicket], +) -> dict[TicketStatus, list[SupportTicket]]: + buckets: dict[TicketStatus, list[SupportTicket]] = {s: [] for s in TICKET_STATUSES} + for t in tickets: + buckets[t.status].append(t) + return buckets + + +@st.dialog("Ticket details", width="large") +def show_ticket_details_dialog(ticket: SupportTicket) -> None: + """Modal with full ticket fields (read-only).""" + st.markdown(f"### {ticket.subject}") + + m1, m2, m3 = st.columns(3) + status_label = next( + (label for label, stat in KANBAN_COLUMNS if stat == ticket.status), + ticket.status, + ) + m1.metric("Status", status_label) + m2.metric("Product area", ticket.productArea or "—") + m3.metric("Created", ticket.createdAt[:10] if ticket.createdAt else "—") + + st.markdown("**Ticket ID**") + st.code(ticket.id) + + if ticket.tags: + st.markdown("**Tags**") + st.write(", ".join(ticket.tags)) + + if ticket.customerEmail or ticket.customerPhone: + st.markdown("**Customer**") + if ticket.customerEmail: + st.write(f"Email: `{ticket.customerEmail}`") + if ticket.customerPhone: + st.write(f"Phone: `{ticket.customerPhone}`") + + st.markdown("**Body**") + st.text_area( + "Body", + value=ticket.body, + height=220, + disabled=True, + label_visibility="collapsed", + ) + + with st.expander("Raw JSON"): + st.json(ticket.model_dump()) + + +def render_chat_message_with_ticket_links( + content: str, + tickets_by_id: dict[str, SupportTicket], + *, + message_index: int, +) -> None: + """Render assistant text; link ticket ids mentioned in the response.""" + st.markdown(content) + mentioned = list(dict.fromkeys(TICKET_ID_PATTERN.findall(content))) + linked = [tid for tid in mentioned if tid in tickets_by_id] + if not linked: + return + st.caption("Tickets in this reply (click subject for details):") + for tid in linked: + render_ticket_subject_button( + tickets_by_id[tid], + key=f"chat_msg:{message_index}:{tid}", + ) + + +def render_ticket_subject_button(ticket: SupportTicket, *, key: str) -> None: + """Tertiary button on the ticket subject; opens ``show_ticket_details_dialog``.""" + subj = ticket.subject[:100] + ("…" if len(ticket.subject) > 100 else "") + if st.button( + f"🎫 {subj}", + key=key, + width="stretch", + type="tertiary", + ): + show_ticket_details_dialog(ticket) + + +def render_kanban_ticket_card( + ticket: SupportTicket, + *, + moves_disabled: bool, +) -> TicketStatus | None: + """ + One Kanban card with a status dropdown and a dialog trigger on the subject. + Returns the new status when the selection differs from the ticket's current status. + """ + labels = [label for label, _ in KANBAN_COLUMNS] + status_by_label = {label: stat for label, stat in KANBAN_COLUMNS} + index = next(i for i, (_, s) in enumerate(KANBAN_COLUMNS) if s == ticket.status) + + with st.container(border=True): + render_ticket_subject_button(ticket, key=f"view:{ticket.id}") + tags = ", ".join(ticket.tags[:6]) + st.caption(f"`{ticket.id}` · _{ticket.productArea}_ · {ticket.createdAt[:10]}") + if tags: + st.caption(tags) + selected_label = st.selectbox( + "Status", + options=labels, + index=index, + key=f"status:{ticket.id}", + disabled=moves_disabled, + label_visibility="collapsed", + ) + + selected = status_by_label[selected_label] + if selected != ticket.status: + return selected + return None + + +def render_triage_assessment( + ticket: SupportTicket, assessment: TriageAssessment +) -> None: + """Display structured triage output for a single ticket.""" + with st.container(border=True): + render_ticket_subject_button(ticket, key=f"triage_result_view:{ticket.id}") + st.caption(f"`{ticket.id}` · queue status: **{ticket.status}**") + m1, m2 = st.columns(2) + m1.metric("Priority", assessment.priority.upper()) + m2.metric("Category", assessment.category) + st.markdown("**Why work on this**") + st.write(assessment.reasoning) + st.markdown("**Internal summary**") + st.write(assessment.internal_summary) + if assessment.similar_ticket_ids: + st.caption(f"Similar tickets: {', '.join(assessment.similar_ticket_ids)}") + with st.expander("Draft reply & full JSON"): + st.markdown("**Draft reply**") + st.write(assessment.draft_reply_to_customer) + st.json(assessment.model_dump()) diff --git a/README.md b/README.md index 071a5d0..52fb5c8 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ [TypeScript](https://github.com/RailtownAI/railengine-examples/tree/main/TypeScript) -[Python](https://github.com/RailtownAI/railengine-examples/tree/main/Python) +[Python](https://github.com/RailtownAI/railengine-examples/tree/main/Python) — includes [Customer Support Triage Agent](https://github.com/RailtownAI/railengine-examples/tree/main/Python/customer-support) (Railengine + Railtracks) ### AIoT @@ -31,4 +31,6 @@ [Food Diary](https://github.com/RailtownAI/railengine-examples/tree/main/TypeScript/nextjs-food-diary) +[Customer Support Triage (Python)](https://github.com/RailtownAI/railengine-examples/tree/main/Python/customer-support) + [Thermometer](https://github.com/RailtownAI/railengine-examples/tree/main/MicroPython)