Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 82 additions & 4 deletions src/agent_term/matrix_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from agent_term.config import load_config
from agent_term.dispatch_cli import build_pipeline
from agent_term.events import AgentTermEvent
from agent_term.matrix_service import MatrixServiceAdapter, build_matrix_service_backend
from agent_term.matrix_service import normalize_sync_payload
from agent_term.matrix_state import DEFAULT_STATE_PATH, MatrixStateStore, resolve_matrix_room
from agent_term.matrix_state import rooms_from_sync_payload
Expand Down Expand Up @@ -48,6 +49,13 @@ def build_parser() -> argparse.ArgumentParser:
sync.add_argument("--sender", default="@agent-term")
sync.add_argument("--channel", default="!matrix-sync")

live_sync = subparsers.add_parser("sync", help="Run incremental Matrix sync through the configured backend.")
live_sync.add_argument("--persist", action="store_true", help="Persist normalized events into EventStore.")
live_sync.add_argument("--save-state", action="store_true", default=True, help="Persist next_batch and room IDs into Matrix state.")
live_sync.add_argument("--no-save-state", dest="save_state", action="store_false")
live_sync.add_argument("--timeout-ms", type=int, default=0)
live_sync.add_argument("--full-state", action="store_true")

state = subparsers.add_parser("state", help="Show durable Matrix sync state.")
state.add_argument("--json", action="store_true", help="Print state as JSON.")

Expand Down Expand Up @@ -118,12 +126,77 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int:
payload = _load_json_payload(args.payload)
batch = normalize_sync_payload(payload)

print_sync_batch(batch, db_path=db_path, state_path=Path(args.state), persist=args.persist, save_state=args.save_state, payload=payload)
return 0


def cmd_incremental_sync(args: argparse.Namespace) -> int:
config = load_config(args.config)
db_path = Path(args.db or config.event_store.path or DEFAULT_DB_PATH)
state_store = MatrixStateStore(args.state)
state = state_store.load()
event = AgentTermEvent(
channel="!matrix-sync",
sender="@agent-term",
kind="matrix_sync",
source="matrix-service",
body="Run Matrix incremental sync.",
metadata={
"since": state.next_batch,
"timeout_ms": args.timeout_ms,
"full_state": args.full_state,
},
)
result = MatrixServiceAdapter(build_matrix_service_backend(config)).handle(event)
if not result.ok:
print("matrix_sync_status=blocked")
if result.metadata.get("deny_reason"):
print(f"blocked_reason={result.metadata['deny_reason']}")
return 1

events_metadata = result.metadata.get("matrix_events")
events = []
if isinstance(events_metadata, list):
for metadata in events_metadata:
if not isinstance(metadata, dict):
continue
events.append(
AgentTermEvent(
channel=str(metadata.get("matrix_room_alias") or metadata.get("matrix_room_id") or "!matrix-sync"),
sender=str(metadata.get("matrix_sender_mxid") or "@agent-term"),
kind="matrix_room_event",
source="matrix",
body="",
thread_id=_optional_str(metadata.get("matrix_thread_root_event_id")),
metadata=metadata,
)
)

print(f"matrix_sync_events={len(events)}")
next_batch = _optional_str(result.metadata.get("matrix_next_batch"))
if next_batch:
print(f"matrix_next_batch={next_batch}")
if args.persist:
store = EventStore(db_path)
try:
for event in events:
store.append(event)
finally:
store.close()
print(f"persisted_events={len(events)}")
if args.save_state:
state = state_store.update_next_batch(next_batch)
print(f"matrix_state_next_batch={state.next_batch}")
return 0


def print_sync_batch(batch, *, db_path: Path, state_path: Path, persist: bool, save_state: bool, payload: dict[str, Any]) -> None:
print(f"matrix_sync_events={len(batch.events)}")
if batch.next_batch:
print(f"matrix_next_batch={batch.next_batch}")

events = [matrix_event.to_agentterm_event() for matrix_event in batch.events]
if args.persist:
if persist:
store = EventStore(db_path)
try:
for event in events:
Expand All @@ -132,16 +205,15 @@ def cmd_normalize_sync(args: argparse.Namespace) -> int:
store.close()
print(f"persisted_events={len(events)}")

if args.save_state:
state_store = MatrixStateStore(args.state)
if save_state:
state_store = MatrixStateStore(state_path)
state = state_store.update_rooms(rooms_from_sync_payload(payload))
state = state_store.save(state.with_next_batch(batch.next_batch))
print(f"matrix_state_next_batch={state.next_batch}")
print(f"matrix_state_rooms={len(state.rooms)}")

for event in events:
print(f"{event.channel}\t{event.sender}\t{event.kind}\t{event.body}")
return 0


def cmd_state(args: argparse.Namespace) -> int:
Expand All @@ -167,12 +239,18 @@ def _load_json_payload(path: str) -> dict[str, Any]:
return value


def _optional_str(value: object) -> str | None:
return str(value) if value is not None else None


def main(argv: list[str] | None = None) -> int:
args = build_parser().parse_args(argv)
if args.command == "send":
return cmd_send(args)
if args.command == "normalize-sync":
return cmd_normalize_sync(args)
if args.command == "sync":
return cmd_incremental_sync(args)
if args.command == "state":
return cmd_state(args)
raise SystemExit(f"unknown command: {args.command}")
Expand Down
98 changes: 88 additions & 10 deletions src/agent_term/matrix_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,16 @@ def to_metadata(self) -> dict[str, object]:
}


@dataclass(frozen=True)
class MatrixSyncRequest:
"""A Matrix incremental sync request."""

since: str | None = None
timeout_ms: int = 0
full_state: bool = False
metadata: dict[str, object] = field(default_factory=dict)


@dataclass(frozen=True)
class MatrixSyncBatch:
"""Normalized Matrix sync batch."""
Expand All @@ -79,15 +89,20 @@ class MatrixServiceBackend(Protocol):
def send_text(self, request: MatrixSendRequest) -> MatrixSendResult:
"""Send a text event into a Matrix room."""

def sync(self, request: MatrixSyncRequest) -> MatrixSyncBatch:
"""Run an incremental Matrix sync."""

def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch:
"""Normalize a Matrix sync payload into AgentTerm MatrixRoomEvents."""


class InMemoryMatrixServiceBackend:
"""Offline Matrix backend for tests and local development."""

def __init__(self) -> None:
def __init__(self, sync_payloads: list[dict[str, Any]] | None = None) -> None:
self.sent: list[MatrixSendRequest] = []
self.sync_requests: list[MatrixSyncRequest] = []
self._sync_payloads = list(sync_payloads or [])

def send_text(self, request: MatrixSendRequest) -> MatrixSendResult:
self.sent.append(request)
Expand All @@ -100,6 +115,22 @@ def send_text(self, request: MatrixSendRequest) -> MatrixSendResult:
metadata={"local_backend": True, "txn_id": request.txn_id},
)

def sync(self, request: MatrixSyncRequest) -> MatrixSyncBatch:
self.sync_requests.append(request)
if not self._sync_payloads:
return MatrixSyncBatch(
events=(),
next_batch=request.since,
metadata={"local_backend": True, "empty_sync": True},
)
payload = self._sync_payloads.pop(0)
batch = normalize_sync_payload(payload)
return MatrixSyncBatch(
events=batch.events,
next_batch=batch.next_batch,
metadata={**batch.metadata, "local_backend": True, "since": request.since},
)

def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch:
return normalize_sync_payload(payload)

Expand Down Expand Up @@ -127,6 +158,9 @@ def __init__(
def send_text(self, request: MatrixSendRequest) -> MatrixSendResult:
return asyncio.run(self._send_text_async(request))

def sync(self, request: MatrixSyncRequest) -> MatrixSyncBatch:
return asyncio.run(self._sync_async(request))

def normalize_sync(self, payload: dict[str, Any]) -> MatrixSyncBatch:
return normalize_sync_payload(payload)

Expand Down Expand Up @@ -171,6 +205,46 @@ async def _send_text_async(self, request: MatrixSendRequest) -> MatrixSendResult
error=repr(response),
)

async def _sync_async(self, request: MatrixSyncRequest) -> MatrixSyncBatch:
try:
from nio import AsyncClient, SyncError, SyncResponse
except ImportError as exc:
raise RuntimeError(
"matrix-nio is required for NioMatrixServiceBackend; install agent-term[matrix]"
) from exc

client = AsyncClient(self.homeserver_url, self.user_id, device_id=self.device_name)
client.access_token = self.access_token
try:
response = await client.sync(
timeout=request.timeout_ms,
since=request.since,
full_state=request.full_state,
)
finally:
await client.close()

if isinstance(response, SyncResponse):
payload = getattr(response, "source", None)
if isinstance(payload, dict):
return normalize_sync_payload(payload)
return MatrixSyncBatch(
events=(),
next_batch=response.next_batch,
metadata={"matrix_sync_response": "nio_without_source_payload"},
)
if isinstance(response, SyncError):
return MatrixSyncBatch(
events=(),
next_batch=request.since,
metadata={"matrix_sync_error": response.message},
)
return MatrixSyncBatch(
events=(),
next_batch=request.since,
metadata={"matrix_sync_error": repr(response)},
)


class MatrixServiceAdapter:
"""Adapter that performs Matrix service send/sync operations through a backend."""
Expand Down Expand Up @@ -248,16 +322,19 @@ def _send(self, event: AgentTermEvent) -> AdapterResult:
)

def _sync(self, event: AgentTermEvent) -> AdapterResult:
payload = event.metadata.get("matrix_sync") or event.metadata.get("payload")
if not isinstance(payload, dict):
return AdapterResult(
ok=False,
source=self.key,
body="Matrix service sync blocked: missing sync payload",
kind="matrix_sync",
metadata={"deny_reason": "missing_sync_payload", "fail_closed": True},
if isinstance(event.metadata.get("matrix_sync"), dict) or isinstance(event.metadata.get("payload"), dict):
payload = event.metadata.get("matrix_sync") or event.metadata.get("payload")
batch = self.backend.normalize_sync(payload) # type: ignore[arg-type]
else:
timeout_ms = int(event.metadata.get("timeout_ms") or 0)
batch = self.backend.sync(
MatrixSyncRequest(
since=_optional_str(event.metadata.get("since") or event.metadata.get("next_batch")),
timeout_ms=timeout_ms,
full_state=bool(event.metadata.get("full_state")),
metadata={"request_event_id": event.event_id},
)
)
batch = self.backend.normalize_sync(payload)
return AdapterResult(
ok=True,
source=self.key,
Expand All @@ -269,6 +346,7 @@ def _sync(self, event: AgentTermEvent) -> AdapterResult:
"matrix_sync_event_count": len(batch.events),
"matrix_next_batch": batch.next_batch,
"matrix_events": [matrix_event.to_metadata() for matrix_event in batch.events],
**batch.metadata,
},
)

Expand Down
32 changes: 32 additions & 0 deletions tests/test_matrix_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,38 @@ def test_matrix_cli_normalize_sync_saves_state(tmp_path, capsys):
assert state["rooms"] == {"!room:example.org": "!room:example.org"}


def test_matrix_cli_incremental_sync_uses_stored_next_batch_and_updates_state(tmp_path, capsys):
db_path = tmp_path / "events.sqlite3"
state_path = tmp_path / "matrix-state.json"
state_path.write_text(json.dumps({"next_batch": "batch-1", "rooms": {}}), encoding="utf-8")

exit_code = main(
[
"--db",
str(db_path),
"--state",
str(state_path),
"sync",
"--persist",
]
)

captured = capsys.readouterr()
assert exit_code == 0
assert "matrix_sync_events=0" in captured.out
assert "matrix_next_batch=batch-1" in captured.out
assert "matrix_state_next_batch=batch-1" in captured.out
state = json.loads(state_path.read_text(encoding="utf-8"))
assert state["next_batch"] == "batch-1"

store = EventStore(db_path)
try:
events = store.tail(limit=10)
finally:
store.close()
assert events == []


def test_matrix_cli_state_prints_state(tmp_path, capsys):
state_path = tmp_path / "matrix-state.json"
state_path.write_text(
Expand Down
Loading
Loading