diff --git a/README.md b/README.md index 2a8e413..288c9b6 100644 --- a/README.md +++ b/README.md @@ -222,23 +222,27 @@ edit (locks); there is no mid-tool-call interrupt — see [Limitations](#limitat | --- | --- | | `agent-sync init` | Create the database and tables (auto-runs on any command). | | `agent-sync register --name N [--role R]` | Register / update the current agent. | -| `agent-sync heartbeat` | Mark the current agent active now. | -| `agent-sync status [--compact]` | Show agents, tasks, locks, messages, activity. | -| `agent-sync tasks` | List all tasks. | -| `agent-sync create-task "T" [--description D] [--file P ...] [--priority N]` | Create a task. | -| `agent-sync claim-task T` | Claim a task by id or title. | -| `agent-sync claim-next` | Auto-claim the next available task (highest priority first; reclaims tasks abandoned by crashed sessions). | -| `agent-sync complete-task T` | Mark a task done. | +| `agent-sync whoami [--json]` | Show your resolved agent id and how it was determined. | +| `agent-sync heartbeat` | Mark the current agent active now (use during long quiet stretches to avoid going stale). | +| `agent-sync status [--compact] [--json]` | Show agents, tasks, locks, messages, activity (`--json` for machine-readable state). | +| `agent-sync tasks [--json]` | List all tasks. | +| `agent-sync create-task "T" [--description D] [--file P ...] [--priority N] [--depends-on T ...]` | Create a task (optionally blocked on other tasks). | +| `agent-sync claim-task T [--lock] [--force]` | Claim a task by id or title (`--lock` locks its files; `--force` overrides unmet dependencies). | +| `agent-sync claim-next [--lock]` | Auto-claim the next available task (highest priority first; skips dependency-blocked tasks; reclaims tasks abandoned by crashed sessions). | +| `agent-sync complete-task T` | Mark a task done (reports any dependents it unblocks). | | `agent-sync block-task T --reason R` | Mark a task blocked. 
| -| `agent-sync lock FILE [--reason R] [--ttl MIN]` | Lock a file (default TTL 60 min). | -| `agent-sync unlock FILE [--force]` | Release a lock (owner only, unless `--force`). | -| `agent-sync locks [--all]` | List live locks (`--all` includes expired). | -| `agent-sync send --to R --message M` | Send to an id, name, role, or `all`. | -| `agent-sync inbox [--all]` | Show unread (or all) messages addressed to you. | +| `agent-sync lock FILE [--reason R] [--ttl MIN] [--wait[=SEC]]` | Lock a file (default TTL 60 min; `--wait` blocks until free). | +| `agent-sync lock --resource KEY [...]` | Lock an arbitrary named resource instead of a file path. | +| `agent-sync unlock FILE \| --resource KEY [--force]` | Release a lock (owner only, unless `--force`). | +| `agent-sync append FILE [--content T] [--wait[=SEC]] [--no-newline]` | Atomically append to a shared file under a lock (body from `--content` or stdin). | +| `agent-sync locks [--all] [--json]` | List live locks (`--all` includes expired). | +| `agent-sync send --to R --message M [--reply-to ID]` | Send to an id, name, role, or `all` (optionally threaded). | +| `agent-sync inbox [--all] [--json]` | Show unread (or all) messages addressed to you. | | `agent-sync read-message ID` | Show a message and mark it read. | +| `agent-sync ack ID` | Acknowledge a message so its sender knows it was handled. | | `agent-sync decision "..."` | Record a shared decision. | -| `agent-sync log --type T --message M [--file P]` | Append an activity entry. | -| `agent-sync gc` | Re-status stale agents and drop expired locks. | +| `agent-sync log "message" [--type T] [--file P]` | Append an activity entry (message is positional; `--message` still accepted). | +| `agent-sync gc` | Re-status stale agents and drop expired locks (also runs automatically on `SessionStart`). | | `agent-sync console [--interval S] [--name N]` | Live operator console: stream activity and steer agents (needs the `tui` extra). 
| | `agent-sync hook {session-start,user-prompt-submit,pre-tool-use,post-tool-use,stop,session-end}` | Hook entry points (read JSON from stdin). | @@ -252,7 +256,7 @@ into your repo's `.claude/settings.json` (or run an installer with | Event | Matcher | Behaviour | | --- | --- | --- | -| `SessionStart` | (all) | Register/heartbeat the agent; inject compact status into context. With `AGENT_SYNC_AUTO_CLAIM=1`, also hand a free agent its next task (`claim-next`). | +| `SessionStart` | (all) | Garbage-collect stale agents/expired locks; register/heartbeat the agent; inject compact status into context. With `AGENT_SYNC_AUTO_CLAIM=1`, also hand a free agent its next task (`claim-next`). | | `UserPromptSubmit` | (all) | Push any undelivered messages (directed + broadcast) into context for this turn. | | `PreToolUse` | `Edit\|Write\|MultiEdit` | **Block (exit 2)** if the target file is locked by another active agent. | | `PostToolUse` | `Edit\|Write\|MultiEdit` | Log the successful edit to the activity feed. | @@ -273,15 +277,30 @@ environment variables where the hooks run (for example an `"env"` block in - `AGENT_SYNC_AUTO_RELEASE_LOCKS=1` — `SessionEnd` releases the agent's locks immediately instead of leaving them to expire. +**Tuning.** Agent identity, the staleness thresholds and the coordination root can be overridden via the environment: + +- `AGENT_SYNC_ID` — act as a specific agent id (set a distinct value per parallel + subagent so their locks stay mutually exclusive; otherwise identity is + auto-detected from the Claude Code session). `agent-sync whoami` shows the + resolved id and its source. +- `AGENT_SYNC_STALE_MINUTES` / `AGENT_SYNC_OFFLINE_MINUTES` — how long without a + check-in before an agent is considered stale (default 15) / offline (default + 120). Lower them for short-lived sessions, raise them for long quiet ones. +- `AGENT_SYNC_ROOT` — force the coordination root directory (otherwise the repo + root is auto-discovered). 
+ ## Data storage All state lives in **`.claude/coordination/state.sqlite`** inside the target repo, created automatically on first use. Tables: - `agents` — id, name, role, session, cwd, status, current task, timestamps. -- `tasks` + `task_files` — the task board and the files each task touches. -- `locks` — one row per locked path, with owner and `expires_at` (TTL). -- `messages` — sender, recipient (id/name/role/`all`), body, read state. +- `tasks` + `task_files` + `task_deps` — the task board, the files each task + touches, and dependency edges between tasks. +- `locks` — one row per locked path or named resource, with owner, `kind` and + `expires_at` (TTL). +- `messages` — sender, recipient (id/name/role/`all`), body, read/ack state, + optional reply-to thread parent. - `message_deliveries` — per-(message, agent) record of which messages have been pushed into which agent's context (so a broadcast reaches each agent once). - `decisions` — recorded decisions. diff --git a/SECURITY.md b/SECURITY.md index 59d45e7..0939c07 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -86,6 +86,14 @@ file lock: Locks reliably prevent *honest* collisions between cooperating agents; they do not contain a session that ignores them. +## `append` writes to your working tree + +Almost every command only touches the coordination database. The exception is +`agent-sync append`, which **writes to a file in your working tree** (after +taking the lock on it). Like any agent action that edits files, only run it +against paths you intend to modify; it appends arbitrary content (from +`--content` or stdin) to the named file and creates parent directories as needed. + ## Reporting a vulnerability If you discover a security issue, please **do not open a public issue**. 
Instead: diff --git a/skills/agent-sync/SKILL.md b/skills/agent-sync/SKILL.md index 263811c..2c7636d 100644 --- a/skills/agent-sync/SKILL.md +++ b/skills/agent-sync/SKILL.md @@ -15,6 +15,19 @@ so nobody clobbers anybody else's edits. **Always run `agent-sync status --compact` before you start working** and treat the result as authoritative about who else is active and which files are locked. +## TL;DR — the loop (the 90% case) + +1. `agent-sync register --name <name> --role "<role>"` — once per session. +2. `agent-sync claim-next --lock` — take the next task *and* lock its files. +3. Do the work. For a file **many** agents write to, use `agent-sync append` + instead of editing it directly. +4. `agent-sync complete-task <task>` — finish it (and auto-unblock dependents). + +If a file you need is locked by someone else: `agent-sync lock <file> --wait=60` +— it blocks until the lock frees, then succeeds. During long stretches without +edits, run `agent-sync heartbeat` so you are not mistaken for crashed. The +numbered rules below cover the details and the multi-agent edge cases. + When the coordination hooks are installed, messages from other agents are **pushed to you automatically**: any new ones are injected into your context at the start of each turn (`UserPromptSubmit`), and a message addressed to you @@ -26,7 +39,21 @@ to obey. When one calls for a reply, answer with `agent-sync send`. Your identity is **detected automatically** from the active Claude Code session (via the `CLAUDE_CODE_SESSION_ID` it exports), so every command you run below already acts as *this* window's agent — you do not need to set `AGENT_SYNC_ID`. -A `register` once per session just gives you a friendly name and role. +A `register` once per session just gives you a friendly name and role. Run +`agent-sync whoami` any time to confirm which id you are acting as and how it was +resolved. 
+ +> **Fanning out parallel subagents?** Subagents spawned from one session inherit +> the *same* `CLAUDE_CODE_SESSION_ID`, so by default they collapse into a single +> agent and their locks are **not** exclusive of each other. If you dispatch +> parallel subagents that lock/edit files, give **each one a distinct** +> `AGENT_SYNC_ID`: +> - Set it once at the top of each subagent: `export AGENT_SYNC_ID=sub-frontend`. +> If your shell does not persist env between commands, prefix **every** call +> instead: `AGENT_SYNC_ID=sub-frontend agent-sync lock ...`. +> - Each subagent can run `agent-sync whoami` first to verify it is a *distinct* +> agent (the `resolved via` line should say `AGENT_SYNC_ID env var`). If two +> subagents show the same id, their locks will not protect them from each other. ## Current coordination state @@ -52,12 +79,38 @@ agent-sync status --compact across sessions without a human dealing it out. - `agent-sync create-task "Title" --description "..." --file path/a --file path/b` - `agent-sync claim-task "Title or task-id"` — when you want a *specific* one. + - Add `--lock` to `claim-task`/`claim-next` to also lock the task's `--file` + list in one step, closing the gap between owning the task and owning its files. + - Express ordering with `--depends-on`: a task with an unfinished dependency is + skipped by `claim-next` and refused by `claim-task` (until you pass `--force`), + and becomes claimable automatically when the dependency completes: + - `agent-sync create-task "Wire up UI" --depends-on "Backend API" --file src/ui.js` 3. **Lock files before editing them:** - `agent-sync lock path/to/file --reason "what you're changing"` - Locks have a 60-minute TTL by default and auto-expire. -4. **Never edit a file that is locked by another *active* agent.** If the - `PreToolUse` hook is installed it will block you with exit code 2; even - without it, respect the lock shown in status. 
+ - Lock a non-file shared resource (a migration run, a release process, a + codegen step) by key instead of path: `agent-sync lock --resource db-migrations`. + - **Which to use:** `lock` + your editor when *you* own and rewrite a file; + `append` for a file **many** agents add lines to (a shared log, changelog, + aggregated output) — **do not improvise with `>>`.** `append` locks, appends + and unlocks in one atomic step, so concurrent writers never interleave: + - `agent-sync append CHANGELOG.md --content "- did X" --wait=30` + - or pipe the body: `some-command | agent-sync append build.log --wait=30` +4. **If a file is locked by another *active* agent, do NOT edit it. Follow this + protocol instead** (the `PreToolUse` hook also blocks such edits with exit 2): + 1. Wait for it: `agent-sync lock path/to/file --wait=60` blocks until the lock + frees (the holder unlocks, goes stale, or the TTL expires), then succeeds. + Bare `--wait` waits 30s. **This is how you wait — do not sleep or busy-retry + yourself; the command blocks for you.** + 2. If it still fails, message the owner and pick up other work meanwhile: + `agent-sync send --to <owner> --message "need to edit path/to/file — ping me when free"` + then `agent-sync claim-next` for something else. + - The same applies to `agent-sync append`: it takes the lock for you, so on a + busy file pass `--wait`; if it still times out (exit 2), fall back the same + way (message the owner, do other work, retry later). + - To decide programmatically whether to wait or move on, read structured state + with `agent-sync locks --json` / `agent-sync status --json` rather than + parsing the human text. 5. 
**Communicate changes that affect others.** Send a message whenever you change an API contract, a shared file, a migration, a config, or make an architecture decision: @@ -73,9 +126,32 @@ agent-sync status --compact automatically when the hooks are installed, but you can also pull them — check when status reports unread messages, and reply to anything that needs an answer: - `agent-sync inbox` then `agent-sync read-message MESSAGE_ID` - - `agent-sync send --to <sender> --message "..."` to reply. + - `agent-sync send --to <sender> --message "..." --reply-to MESSAGE_ID` to reply + in-thread. + - `agent-sync ack MESSAGE_ID` to confirm to the sender you have handled it + (distinct from just reading it). 9. **Prefer git worktrees** for large parallel features so each agent edits an - isolated checkout; still lock shared/generated files (lockfiles, schemas). + isolated checkout. All worktrees of one repo share a single coordination + database (it resolves to the main worktree), so agents across worktrees see + each other; still lock shared/generated files (lockfiles, schemas). + +## Staying "alive" during long work + +An agent that has not checked in for ~15 minutes is treated as **stale**, and a +stale agent's locks and claimed task can be taken over by others (this is what +lets a crashed session's work be reclaimed). Every `agent-sync` command — and, +when the hooks are installed, every file edit — counts as a check-in, so during +normal active work you never go stale. But if you will be quiet for a while +(long reasoning, a big non-editing build/test run), send a heartbeat so you keep +your locks and task: + +```bash +agent-sync heartbeat +``` + +If you *did* go stale and lost a lock, just re-acquire it (`agent-sync lock ...`) +before continuing. The thresholds can be tuned per environment with +`AGENT_SYNC_STALE_MINUTES` / `AGENT_SYNC_OFFLINE_MINUTES`. ## Common workflows @@ -121,5 +197,6 @@ agent-sync unlock src/login.tsx order. 
- Do **not** put secrets (tokens, passwords, keys) into task titles, messages or decisions — they are stored in plaintext and meant to be read by every agent. -- If a lock is stale because an agent crashed, run `agent-sync gc` to clear - expired locks and re-status inactive agents. +- Stale state is cleaned up automatically: the `SessionStart` hook runs a `gc` + pass each time a session starts, so a crashed agent's expired locks do not block + the next one. You can still run `agent-sync gc` manually any time. diff --git a/src/agent_sync/cli.py b/src/agent_sync/cli.py index 3cd599b..1b3d7ff 100644 --- a/src/agent_sync/cli.py +++ b/src/agent_sync/cli.py @@ -14,10 +14,14 @@ import sqlite3 import sys from collections.abc import Sequence +from pathlib import Path from . import __version__, db, hooks, locks, messages, paths, render, tasks -from .errors import AgentSyncError -from .models import AGENT_ACTIVE +from .errors import AgentSyncError, UsageError +from .models import AGENT_ACTIVE, LOCK_FILE, LOCK_RESOURCE + +# Default seconds to block when ``--wait`` is given with no explicit value. +DEFAULT_WAIT_SECONDS = 30 # --------------------------------------------------------------------------- # @@ -42,6 +46,55 @@ def _acting_agent( return agent_id +def _resolve_lock_target(args: argparse.Namespace) -> tuple[str, str]: + """Return ``(key, kind)`` for a lock/unlock command. + + A ``--resource`` key is stored verbatim (an arbitrary named lock); a file + positional is normalized to the canonical repo-relative form the PreToolUse + hook checks. Exactly one must be supplied. 
+ """ + resource = getattr(args, "resource", None) + if resource: + if getattr(args, "file", None): + raise UsageError("give a FILE path or --resource KEY, not both") + return resource, LOCK_RESOURCE + if getattr(args, "file", None): + return paths.normalize_repo_path(args.file), LOCK_FILE + raise UsageError("provide a FILE path or --resource KEY") + + +def _add_wait_arg(parser: argparse.ArgumentParser) -> None: + """Add a shared ``--wait[=SECONDS]`` flag (blocks until the lock frees).""" + parser.add_argument( + "--wait", + nargs="?", + type=int, + const=DEFAULT_WAIT_SECONDS, + default=None, + metavar="SECONDS", + help=( + "Block until the lock is free instead of failing immediately " + f"(bare --wait waits {DEFAULT_WAIT_SECONDS}s; --wait=N waits N seconds)" + ), + ) + + +def _report_auto_lock(conn: sqlite3.Connection, agent_id: str, task_id: str) -> None: + """Lock a claimed task's files best-effort and print what happened.""" + locked, conflicts = tasks.lock_task_files(conn, agent_id, task_id) + if locked: + print(f" locked: {', '.join(locked)}") + for message in conflicts: + print(f" WARNING: could not lock — {message}", file=sys.stderr) + + +def _append_to_file(target: Path, content: str) -> None: + """Append *content* to *target*, creating parent directories as needed.""" + target.parent.mkdir(parents=True, exist_ok=True) + with open(target, "a", encoding="utf-8") as handle: + handle.write(content) + + # --------------------------------------------------------------------------- # # Command handlers # --------------------------------------------------------------------------- # @@ -69,6 +122,40 @@ def cmd_register(args: argparse.Namespace) -> int: return 0 +def cmd_whoami(args: argparse.Namespace) -> int: + conn = _open() + try: + agent_id = db.resolve_agent_id(cwd=os.getcwd()) + source = db.identity_source() + agent = db.get_agent(conn, agent_id) + status = db.effective_status(agent) if agent else None + if args.json: + sys.stdout.write( + 
render.render_json( + { + "id": agent_id, + "resolved_via": source, + "registered": agent is not None, + "name": agent.name if agent else None, + "role": agent.role if agent else None, + "status": status, + } + ) + ) + return 0 + print(f"id: {agent_id}") + print(f"resolved via: {source}") + if agent is not None: + role = f" ({agent.role})" if agent.role else "" + print(f"name: {agent.name}{role}") + print(f"status: {status}") + else: + print("not registered yet — run `agent-sync register --name ...`") + finally: + conn.close() + return 0 + + def cmd_heartbeat(args: argparse.Namespace) -> int: conn = _open() try: @@ -84,7 +171,11 @@ def cmd_status(args: argparse.Namespace) -> int: conn = _open() try: agent_id = db.resolve_agent_id(cwd=os.getcwd()) - if args.compact: + if args.json: + sys.stdout.write( + render.render_json(render.status_payload(conn, agent_id)) + ) + elif args.compact: sys.stdout.write(render.render_compact(conn, agent_id)) else: sys.stdout.write(render.render_status(conn, agent_id)) @@ -97,6 +188,11 @@ def cmd_tasks(args: argparse.Namespace) -> int: conn = _open() try: items = tasks.list_tasks(conn) + if args.json: + sys.stdout.write( + render.render_json([render.task_payload(conn, t) for t in items]) + ) + return 0 if not items: print("No tasks.") return 0 @@ -104,7 +200,15 @@ def cmd_tasks(args: argparse.Namespace) -> int: files = tasks.task_files(conn, task.id) owner = f" @{task.owner_agent_id}" if task.owner_agent_id else "" files_str = f" files: {', '.join(files)}" if files else "" - print(f"[{task.status:<11}] {task.id}{owner} {task.title}{files_str}") + blocked = ( + " [blocked: deps]" + if tasks.unmet_dependencies(conn, task.id) + else "" + ) + print( + f"[{task.status:<11}] {task.id}{owner} {task.title}" + f"{files_str}{blocked}" + ) finally: conn.close() return 0 @@ -120,10 +224,14 @@ def cmd_create_task(args: argparse.Namespace) -> int: description=args.description, files=args.file, priority=args.priority, + depends_on=args.depends_on, ) 
print(f"Created task `{task.id}`: {task.title}") if args.file: print(f" files: {', '.join(args.file)}") + if args.depends_on: + deps = tasks.task_dependencies(conn, task.id) + print(f" depends on: {', '.join(deps)}") finally: conn.close() return 0 @@ -133,8 +241,10 @@ def cmd_claim_task(args: argparse.Namespace) -> int: conn = _open() try: agent_id = _acting_agent(conn) - task = tasks.claim_task(conn, agent_id, args.task) + task = tasks.claim_task(conn, agent_id, args.task, force=args.force) print(f"Claimed task `{task.id}`: {task.title} [{task.status}]") + if args.lock: + _report_auto_lock(conn, agent_id, task.id) finally: conn.close() return 0 @@ -151,6 +261,8 @@ def cmd_claim_next(args: argparse.Namespace) -> int: files = tasks.task_files(conn, task.id) files_str = f" files: {', '.join(files)}" if files else "" print(f"Claimed task `{task.id}`: {task.title} [{task.status}]{files_str}") + if args.lock: + _report_auto_lock(conn, agent_id, task.id) finally: conn.close() return 0 @@ -162,6 +274,10 @@ def cmd_complete_task(args: argparse.Namespace) -> int: agent_id = _acting_agent(conn) task = tasks.complete_task(conn, agent_id, args.task) print(f"Completed task `{task.id}`: {task.title}") + unblocked = tasks.dependents_unblocked_by(conn, task.id) + if unblocked: + names = ", ".join(f"`{t.id}` {t.title}" for t in unblocked) + print(f" now claimable (dependencies satisfied): {names}") finally: conn.close() return 0 @@ -182,11 +298,23 @@ def cmd_lock(args: argparse.Namespace) -> int: conn = _open() try: agent_id = _acting_agent(conn) - norm = paths.normalize_repo_path(args.file) - lock = locks.acquire_lock( - conn, agent_id, norm, reason=args.reason, ttl_minutes=args.ttl - ) - print(f"Locked `{lock.file_path}` until {lock.expires_at}") + key, kind = _resolve_lock_target(args) + if args.wait is not None: + lock = locks.acquire_lock_blocking( + conn, + agent_id, + key, + reason=args.reason, + ttl_minutes=args.ttl, + kind=kind, + wait_seconds=args.wait, + ) + else: + lock 
= locks.acquire_lock( + conn, agent_id, key, reason=args.reason, ttl_minutes=args.ttl, kind=kind + ) + label = "resource" if kind == LOCK_RESOURCE else "file" + print(f"Locked {label} `{lock.file_path}` until {lock.expires_at}") finally: conn.close() return 0 @@ -196,12 +324,44 @@ def cmd_unlock(args: argparse.Namespace) -> int: conn = _open() try: agent_id = _acting_agent(conn) - norm = paths.normalize_repo_path(args.file) - removed = locks.release_lock(conn, agent_id, norm, force=args.force) + key, _kind = _resolve_lock_target(args) + removed = locks.release_lock(conn, agent_id, key, force=args.force) if removed: - print(f"Unlocked `{norm}`") + print(f"Unlocked `{key}`") else: - print(f"No lock held on `{norm}`") + print(f"No lock held on `{key}`") + finally: + conn.close() + return 0 + + +def cmd_append(args: argparse.Namespace) -> int: + conn = _open() + try: + agent_id = _acting_agent(conn) + norm = paths.normalize_repo_path(args.file) + raw = Path(args.file) + target = raw if raw.is_absolute() else paths.repo_root() / raw + content = args.content if args.content is not None else sys.stdin.read() + if not args.no_newline and content and not content.endswith("\n"): + content += "\n" + reason = args.reason or "append" + if args.wait is not None: + locks.acquire_lock_blocking( + conn, + agent_id, + norm, + reason=reason, + ttl_minutes=args.ttl, + wait_seconds=args.wait, + ) + else: + locks.acquire_lock(conn, agent_id, norm, reason=reason, ttl_minutes=args.ttl) + try: + _append_to_file(target, content) + finally: + locks.release_lock(conn, agent_id, norm) + print(f"Appended {len(content)} char(s) to `{norm}`") finally: conn.close() return 0 @@ -211,13 +371,19 @@ def cmd_locks(args: argparse.Namespace) -> int: conn = _open() try: items = locks.list_locks(conn, include_expired=args.all) + if args.json: + sys.stdout.write( + render.render_json([render.lock_payload(conn, lk) for lk in items]) + ) + return 0 if not items: print("No active locks.") return 0 for lock in 
items: reason = f" — {lock.reason}" if lock.reason else "" + kind = f" [{lock.kind}]" if lock.is_resource else "" print( - f"{lock.file_path} → {lock.owner_agent_id} " + f"{lock.file_path}{kind} → {lock.owner_agent_id} " f"(expires {lock.expires_at}){reason}" ) finally: @@ -229,18 +395,36 @@ def cmd_send(args: argparse.Namespace) -> int: conn = _open() try: agent_id = _acting_agent(conn) - msg = messages.send_message(conn, agent_id, args.to, args.message) + msg = messages.send_message( + conn, agent_id, args.to, args.message, reply_to=args.reply_to + ) print(f"Sent `{msg.id}` to {msg.recipient}") finally: conn.close() return 0 +def cmd_ack(args: argparse.Namespace) -> int: + conn = _open() + try: + agent_id = _acting_agent(conn) + msg = messages.ack_message(conn, agent_id, args.message_id) + print(f"Acked `{msg.id}` (acked_at {msg.acked_at})") + finally: + conn.close() + return 0 + + def cmd_inbox(args: argparse.Namespace) -> int: conn = _open() try: agent_id = db.resolve_agent_id(cwd=os.getcwd()) items = messages.inbox(conn, agent_id, unread_only=not args.all) + if args.json: + sys.stdout.write( + render.render_json([render.message_payload(conn, m) for m in items]) + ) + return 0 if not items: print("Inbox empty." if args.all else "No unread messages.") return 0 @@ -291,12 +475,17 @@ def cmd_log(args: argparse.Namespace) -> int: conn = _open() try: agent_id = _acting_agent(conn) + body = args.message or args.message_opt + if not body: + raise UsageError( + "log requires a message, e.g. `agent-sync log \"did X\"`" + ) file_path = paths.normalize_repo_path(args.file) if args.file else None act = messages.log_activity( conn, agent_id, event_type=args.type, - body=args.message, + body=body, file_path=file_path, ) print(f"Logged `{act.id}` [{act.event_type}]") @@ -354,6 +543,14 @@ def build_parser() -> argparse.ArgumentParser: p_reg.add_argument("--role", default=None, help="Agent role, e.g. 
'React UI'") p_reg.set_defaults(func=cmd_register) + p_whoami = sub.add_parser( + "whoami", help="Show your resolved agent id and how it was determined" + ) + p_whoami.add_argument( + "--json", action="store_true", help="Machine-readable JSON output" + ) + p_whoami.set_defaults(func=cmd_whoami) + sub.add_parser("heartbeat", help="Mark the current agent active now").set_defaults( func=cmd_heartbeat ) @@ -364,9 +561,18 @@ def build_parser() -> argparse.ArgumentParser: action="store_true", help="Terse Markdown suitable for injecting into Claude context", ) + p_status.add_argument( + "--json", + action="store_true", + help="Machine-readable JSON for programmatic coordination decisions", + ) p_status.set_defaults(func=cmd_status) - sub.add_parser("tasks", help="List all tasks").set_defaults(func=cmd_tasks) + p_tasks = sub.add_parser("tasks", help="List all tasks") + p_tasks.add_argument( + "--json", action="store_true", help="Machine-readable JSON output" + ) + p_tasks.set_defaults(func=cmd_tasks) p_ct = sub.add_parser("create-task", help="Create a task") p_ct.add_argument("title", help="Task title") @@ -379,16 +585,41 @@ def build_parser() -> argparse.ArgumentParser: help="Associate a file with the task (repeatable)", ) p_ct.add_argument("--priority", type=int, default=0, help="Higher sorts first") + p_ct.add_argument( + "--depends-on", + action="append", + default=[], + metavar="TASK", + dest="depends_on", + help="Task id/title this task depends on; blocks claiming until done " + "(repeatable)", + ) p_ct.set_defaults(func=cmd_create_task) p_claim = sub.add_parser("claim-task", help="Claim a task by id or title") p_claim.add_argument("task", metavar="TASK", help="Task id or title") + p_claim.add_argument( + "--lock", + action="store_true", + help="Also lock the task's associated files for you", + ) + p_claim.add_argument( + "--force", + action="store_true", + help="Claim even if the task has unfinished dependencies", + ) p_claim.set_defaults(func=cmd_claim_task) - 
sub.add_parser( + p_claim_next = sub.add_parser( "claim-next", help="Claim the next available task automatically (highest priority first)", - ).set_defaults(func=cmd_claim_next) + ) + p_claim_next.add_argument( + "--lock", + action="store_true", + help="Also lock the claimed task's associated files for you", + ) + p_claim_next.set_defaults(func=cmd_claim_next) p_done = sub.add_parser("complete-task", help="Mark a task done") p_done.add_argument("task", metavar="TASK", help="Task id or title") @@ -399,8 +630,14 @@ def build_parser() -> argparse.ArgumentParser: p_block.add_argument("--reason", required=True, help="Why it is blocked") p_block.set_defaults(func=cmd_block_task) - p_lock = sub.add_parser("lock", help="Lock a file for editing") - p_lock.add_argument("file", help="File path to lock") + p_lock = sub.add_parser("lock", help="Lock a file (or named resource) for editing") + p_lock.add_argument("file", nargs="?", help="File path to lock") + p_lock.add_argument( + "--resource", + default=None, + metavar="KEY", + help="Lock an arbitrary named resource (e.g. 
db-migrations) instead of a file", + ) p_lock.add_argument("--reason", default=None, help="Why you are locking it") p_lock.add_argument( "--ttl", @@ -408,19 +645,51 @@ def build_parser() -> argparse.ArgumentParser: default=db.DEFAULT_LOCK_TTL_MINUTES, help="Lock lifetime in minutes (default: 60)", ) + _add_wait_arg(p_lock) p_lock.set_defaults(func=cmd_lock) - p_unlock = sub.add_parser("unlock", help="Release a file lock") - p_unlock.add_argument("file", help="File path to unlock") + p_unlock = sub.add_parser("unlock", help="Release a file (or named resource) lock") + p_unlock.add_argument("file", nargs="?", help="File path to unlock") + p_unlock.add_argument( + "--resource", default=None, metavar="KEY", help="Named resource to unlock" + ) p_unlock.add_argument( "--force", action="store_true", help="Release even if you are not the owner" ) p_unlock.set_defaults(func=cmd_unlock) + p_append = sub.add_parser( + "append", + help="Atomically append to a shared file under a lock (lock→append→unlock)", + ) + p_append.add_argument("file", help="File to append to (repo-relative or absolute)") + p_append.add_argument( + "--content", + default=None, + help="Text to append (default: read from stdin)", + ) + p_append.add_argument( + "--no-newline", + action="store_true", + help="Do not ensure the appended text ends with a newline", + ) + p_append.add_argument("--reason", default=None, help="Lock reason") + p_append.add_argument( + "--ttl", + type=int, + default=db.DEFAULT_LOCK_TTL_MINUTES, + help="Lock lifetime in minutes while appending (default: 60)", + ) + _add_wait_arg(p_append) + p_append.set_defaults(func=cmd_append) + p_locks = sub.add_parser("locks", help="List locks") p_locks.add_argument( "--all", action="store_true", help="Include expired/inactive locks" ) + p_locks.add_argument( + "--json", action="store_true", help="Machine-readable JSON output" + ) p_locks.set_defaults(func=cmd_locks) p_send = sub.add_parser("send", help="Send a message") @@ -428,12 +697,28 @@ def 
build_parser() -> argparse.ArgumentParser: "--to", required=True, help="Recipient: agent id, name, role, or 'all'" ) p_send.add_argument("--message", required=True, help="Message body") + p_send.add_argument( + "--reply-to", + default=None, + metavar="MESSAGE_ID", + dest="reply_to", + help="Thread this message as a reply to an existing message id", + ) p_send.set_defaults(func=cmd_send) + p_ack = sub.add_parser( + "ack", help="Acknowledge a message so its sender knows it was handled" + ) + p_ack.add_argument("message_id", metavar="MESSAGE_ID", help="Message id") + p_ack.set_defaults(func=cmd_ack) + p_inbox = sub.add_parser("inbox", help="Show messages addressed to you") p_inbox.add_argument( "--all", action="store_true", help="Include already-read messages" ) + p_inbox.add_argument( + "--json", action="store_true", help="Machine-readable JSON output" + ) p_inbox.set_defaults(func=cmd_inbox) p_read = sub.add_parser("read-message", help="Show a message and mark it read") @@ -445,8 +730,14 @@ def build_parser() -> argparse.ArgumentParser: p_dec.set_defaults(func=cmd_decision) p_log = sub.add_parser("log", help="Append an activity log entry") + p_log.add_argument("message", nargs="?", default=None, help="Log body") + p_log.add_argument( + "--message", + dest="message_opt", + default=None, + help=argparse.SUPPRESS, # deprecated alias for the positional message + ) p_log.add_argument("--type", default="note", help="Event type, e.g. edit/note") - p_log.add_argument("--message", required=True, help="Log body") p_log.add_argument("--file", default=None, help="Optional related file") p_log.set_defaults(func=cmd_log) diff --git a/src/agent_sync/db.py b/src/agent_sync/db.py index a09f325..791f7b7 100644 --- a/src/agent_sync/db.py +++ b/src/agent_sync/db.py @@ -31,13 +31,41 @@ # An agent that has not checked in for this long is considered stale; longer # still and it is treated as offline. These thresholds drive conflict checks and # the ``gc`` command. 
They are intentionally generous because Claude Code -# sessions can sit idle while a human reads output. +# sessions can sit idle while a human reads output. They can be overridden per +# environment via ``AGENT_SYNC_STALE_MINUTES`` / ``AGENT_SYNC_OFFLINE_MINUTES`` +# (read live by :func:`stale_after` / :func:`offline_after`). STALE_AFTER = timedelta(minutes=15) OFFLINE_AFTER = timedelta(minutes=120) # Default time-to-live for a file lock. DEFAULT_LOCK_TTL_MINUTES = 60 + +def _minutes_env(name: str, default: timedelta) -> timedelta: + """Read a minutes-valued env override, falling back to *default*. + + A missing, empty, non-numeric or non-positive value leaves the default in + place so a typo can never make every agent look permanently offline. + """ + raw = os.environ.get(name) + if not raw: + return default + try: + minutes = int(raw) + except ValueError: + return default + return timedelta(minutes=minutes) if minutes > 0 else default + + +def stale_after() -> timedelta: + """How long without a check-in before an agent is treated as stale.""" + return _minutes_env("AGENT_SYNC_STALE_MINUTES", STALE_AFTER) + + +def offline_after() -> timedelta: + """How long without a check-in before an agent is treated as offline.""" + return _minutes_env("AGENT_SYNC_OFFLINE_MINUTES", OFFLINE_AFTER) + SCHEMA = """ CREATE TABLE IF NOT EXISTS agents ( id TEXT PRIMARY KEY, @@ -68,6 +96,16 @@ file_path TEXT NOT NULL ); +-- Task dependency edges: ``task_id`` cannot start until ``depends_on_id`` is +-- done. "Blocked by a dependency" is *computed* from this table plus the +-- dependency's status, not stored on the task, so completing a dependency +-- automatically unblocks its dependents with no extra write. 
+CREATE TABLE IF NOT EXISTS task_deps ( + task_id TEXT NOT NULL, + depends_on_id TEXT NOT NULL, + PRIMARY KEY (task_id, depends_on_id) +); + CREATE TABLE IF NOT EXISTS locks ( file_path TEXT PRIMARY KEY, owner_agent_id TEXT NOT NULL, @@ -115,15 +153,29 @@ ); CREATE INDEX IF NOT EXISTS idx_task_files_task ON task_files(task_id); +CREATE INDEX IF NOT EXISTS idx_task_deps_task ON task_deps(task_id); CREATE INDEX IF NOT EXISTS idx_messages_recipient ON messages(recipient); CREATE INDEX IF NOT EXISTS idx_activity_created ON activity(created_at); CREATE INDEX IF NOT EXISTS idx_deliveries_agent ON message_deliveries(agent_id); """ +# Additive columns introduced after the initial schema. Each is applied in place +# via ``ALTER TABLE ... ADD COLUMN`` when missing, so an existing database opened +# by a newer agent-sync upgrades itself with no data loss and no version table. +# Keep this list as the single source of truth for post-v1 column evolution. +MIGRATIONS: tuple[tuple[str, str, str], ...] = ( + # locks: distinguish a file-path lock from an arbitrary named/resource lock. + ("locks", "kind", "TEXT NOT NULL DEFAULT 'file'"), + # messages: optional threading parent and an explicit sender-visible ack. + ("messages", "reply_to", "TEXT"), + ("messages", "acked_at", "TEXT"), +) + TABLE_NAMES = ( "agents", "tasks", "task_files", + "task_deps", "locks", "messages", "message_deliveries", @@ -184,14 +236,37 @@ def connect(path: str | os.PathLike[str] | None = None) -> sqlite3.Connection: def init_db(conn: sqlite3.Connection) -> None: - """Create all tables and indexes if they do not already exist. + """Create all tables and indexes if they do not already exist, then migrate. ``executescript`` manages its own transaction (it issues an implicit COMMIT first), so it must not run inside our explicit ``transaction`` block. The statements are idempotent (``IF NOT EXISTS``), making this safe to call on - every connection. + every connection. 
``_migrate`` then adds any columns introduced after the + initial schema, which ``CREATE TABLE IF NOT EXISTS`` alone cannot do. """ conn.executescript(SCHEMA) + _migrate(conn) + + +def _ensure_column( + conn: sqlite3.Connection, table: str, column: str, decl: str +) -> bool: + """Add ``column`` to ``table`` if it is not present. Returns True if added. + + ``table``/``column``/``decl`` come only from the in-process ``MIGRATIONS`` + constant, never user input, so interpolating them into the DDL is safe. + """ + cols = {row["name"] for row in conn.execute(f"PRAGMA table_info({table})")} + if column in cols: + return False + conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}") + return True + + +def _migrate(conn: sqlite3.Connection) -> None: + """Apply additive column migrations idempotently (see ``MIGRATIONS``).""" + for table, column, decl in MIGRATIONS: + _ensure_column(conn, table, column, decl) def table_exists(conn: sqlite3.Connection, name: str) -> bool: @@ -265,6 +340,25 @@ def resolve_agent_id( return paths.read_or_create_local_agent_id() +def identity_source(session_id: str | None = None) -> str: + """Describe *how* :func:`resolve_agent_id` would resolve identity right now. + + Mirrors the precedence in ``resolve_agent_id`` so ``agent-sync whoami`` can + tell an agent whether it is acting under an explicit ``AGENT_SYNC_ID`` (the + thing parallel subagents must set), the auto-detected session id, or the + per-repo fallback file. 
+ """ + if os.environ.get("AGENT_SYNC_ID"): + return "AGENT_SYNC_ID env var" + if ( + session_id + or os.environ.get("CLAUDE_CODE_SESSION_ID") + or os.environ.get("CLAUDE_SESSION_ID") + ): + return "Claude Code session id" + return "per-repo local file" + + # --- agent records ---------------------------------------------------------- def get_agent(conn: sqlite3.Connection, agent_id: str) -> Agent | None: row = conn.execute("SELECT * FROM agents WHERE id = ?", (agent_id,)).fetchone() @@ -364,9 +458,9 @@ def effective_status(agent: Agent, *, at: datetime | None = None) -> str: return AGENT_OFFLINE moment = at or now() age = moment - parse_iso(agent.last_seen) - if age >= OFFLINE_AFTER: + if age >= offline_after(): return AGENT_OFFLINE - if age >= STALE_AFTER: + if age >= stale_after(): return AGENT_STALE return agent.status diff --git a/src/agent_sync/hooks.py b/src/agent_sync/hooks.py index 3893abe..34903c3 100644 --- a/src/agent_sync/hooks.py +++ b/src/agent_sync/hooks.py @@ -115,6 +115,11 @@ def hook_session_start( try: if conn is None: conn = db.connect() + # Sweep stale agents and expired/abandoned locks up front so a session + # that crashed last time never leaves its locks blocking this one until + # their TTL. Agents no longer need to remember a manual `gc`. + db.gc_agents(conn) + locks.gc_locks(conn) agent_id = _agent_id(payload) db.ensure_agent( conn, diff --git a/src/agent_sync/locks.py b/src/agent_sync/locks.py index d4ebdfe..a6b5c48 100644 --- a/src/agent_sync/locks.py +++ b/src/agent_sync/locks.py @@ -9,12 +9,13 @@ from __future__ import annotations import sqlite3 +import time from datetime import datetime from . 
import db from .db import DEFAULT_LOCK_TTL_MINUTES from .errors import LockConflict -from .models import Lock +from .models import LOCK_FILE, Lock def active_lock_for( @@ -45,11 +46,14 @@ def acquire_lock( *, reason: str | None = None, ttl_minutes: int = DEFAULT_LOCK_TTL_MINUTES, + kind: str = LOCK_FILE, ) -> Lock: """Acquire (or refresh) a lock on *file_path* for *agent_id*. Re-locking a file you already hold simply extends the TTL. Acquiring a file - held live by someone else raises :class:`LockConflict` (exit code 2). + held live by someone else raises :class:`LockConflict` (exit code 2). *kind* + distinguishes a normal file-path lock from an arbitrary named resource lock + (``LOCK_RESOURCE``); both share the ``locks`` table keyed by *file_path*. """ moment = db.now() with db.transaction(conn): @@ -76,19 +80,58 @@ def acquire_lock( created = moment.isoformat() expires = db.iso_in(ttl_minutes, _from=moment) conn.execute( - """INSERT INTO locks (file_path, owner_agent_id, reason, created_at, expires_at) - VALUES (?, ?, ?, ?, ?) + """INSERT INTO locks + (file_path, owner_agent_id, reason, created_at, expires_at, kind) + VALUES (?, ?, ?, ?, ?, ?) ON CONFLICT(file_path) DO UPDATE SET owner_agent_id = excluded.owner_agent_id, reason = excluded.reason, created_at = excluded.created_at, - expires_at = excluded.expires_at""", - (file_path, agent_id, reason, created, expires), + expires_at = excluded.expires_at, + kind = excluded.kind""", + (file_path, agent_id, reason, created, expires, kind), ) out = conn.execute("SELECT * FROM locks WHERE file_path = ?", (file_path,)).fetchone() return Lock.from_row(out) +def acquire_lock_blocking( + conn: sqlite3.Connection, + agent_id: str, + file_path: str, + *, + reason: str | None = None, + ttl_minutes: int = DEFAULT_LOCK_TTL_MINUTES, + kind: str = LOCK_FILE, + wait_seconds: float, + poll_seconds: float = 0.5, +) -> Lock: + """Like :func:`acquire_lock`, but wait up to *wait_seconds* for a busy lock. 
+ + The conflicting holder may finish (unlock), go stale, or let the lock expire; + we retry on a short interval until one of those frees it or the deadline + passes, in which case the final :class:`LockConflict` propagates (exit 2, + fail-closed contract preserved). The **CLI subprocess** does the sleeping, so + an agent issues a single blocking call rather than spinning retries itself. + """ + deadline = time.monotonic() + max(0.0, wait_seconds) + while True: + try: + return acquire_lock( + conn, + agent_id, + file_path, + reason=reason, + ttl_minutes=ttl_minutes, + kind=kind, + ) + except LockConflict: + remaining = deadline - time.monotonic() + if remaining <= 0: + raise + time.sleep(min(poll_seconds, remaining)) + + def release_lock( conn: sqlite3.Connection, agent_id: str, diff --git a/src/agent_sync/messages.py b/src/agent_sync/messages.py index 33a0e0d..74017bc 100644 --- a/src/agent_sync/messages.py +++ b/src/agent_sync/messages.py @@ -15,17 +15,32 @@ def send_message( - conn: sqlite3.Connection, sender_agent_id: str, recipient: str, body: str + conn: sqlite3.Connection, + sender_agent_id: str, + recipient: str, + body: str, + *, + reply_to: str | None = None, ) -> Message: - """Send *body* to *recipient* (an id, name, role, or ``all``).""" + """Send *body* to *recipient* (an id, name, role, or ``all``). + + *reply_to*, when given, threads this message under an existing message id; it + must reference a real message (raises :class:`NotFound` otherwise). 
+ """ msg_id = db.new_id("msg") ts = db.now_iso() with db.transaction(conn): + if reply_to is not None: + parent = conn.execute( + "SELECT 1 FROM messages WHERE id = ?", (reply_to,) + ).fetchone() + if parent is None: + raise NotFound(f"No message with id {reply_to!r} to reply to") conn.execute( """INSERT INTO messages - (id, sender_agent_id, recipient, body, created_at, read_at) - VALUES (?, ?, ?, ?, ?, NULL)""", - (msg_id, sender_agent_id, recipient, body, ts), + (id, sender_agent_id, recipient, body, created_at, read_at, reply_to) + VALUES (?, ?, ?, ?, ?, NULL, ?)""", + (msg_id, sender_agent_id, recipient, body, ts, reply_to), ) return get_message(conn, msg_id) @@ -153,6 +168,28 @@ def read_message( return get_message(conn, message_id) +def ack_message( + conn: sqlite3.Connection, agent_id: str, message_id: str +) -> Message: + """Acknowledge a message (idempotent) so its *sender* can confirm receipt. + + Distinct from ``read_at`` (which the recipient sets by viewing it): ``acked_at`` + is an explicit "I have handled this" that closes the loop for whoever sent it. + """ + with db.transaction(conn): + row = conn.execute( + "SELECT * FROM messages WHERE id = ?", (message_id,) + ).fetchone() + if row is None: + raise NotFound(f"No message with id {message_id!r}") + if row["acked_at"] is None: + conn.execute( + "UPDATE messages SET acked_at = ? 
WHERE id = ?", + (db.now_iso(), message_id), + ) + return get_message(conn, message_id) + + def add_decision(conn: sqlite3.Connection, agent_id: str, body: str) -> Decision: """Record an architecture/processs decision in the shared log.""" dec_id = db.new_id("dec") diff --git a/src/agent_sync/models.py b/src/agent_sync/models.py index 2565a37..5afba3d 100644 --- a/src/agent_sync/models.py +++ b/src/agent_sync/models.py @@ -7,9 +7,16 @@ from __future__ import annotations -from dataclasses import dataclass +from dataclasses import asdict, dataclass from sqlite3 import Row +# --- Lock kinds ------------------------------------------------------------- +# A lock keyed by a repo-relative file path (the default, enforced by the +# PreToolUse hook) versus an arbitrary named resource key (e.g. ``db-migrations``) +# that agents agree on but that maps to no single file. +LOCK_FILE = "file" +LOCK_RESOURCE = "resource" + # --- Agent statuses --------------------------------------------------------- AGENT_ACTIVE = "active" AGENT_IDLE = "idle" @@ -50,6 +57,9 @@ class Agent: created_at: str last_seen: str + def as_dict(self) -> dict: + return asdict(self) + @classmethod def from_row(cls, row: Row) -> Agent: return cls( @@ -77,6 +87,9 @@ class Task: updated_at: str completed_at: str | None + def as_dict(self) -> dict: + return asdict(self) + @classmethod def from_row(cls, row: Row) -> Task: return cls( @@ -99,6 +112,14 @@ class Lock: reason: str | None created_at: str expires_at: str + kind: str = LOCK_FILE + + @property + def is_resource(self) -> bool: + return self.kind == LOCK_RESOURCE + + def as_dict(self) -> dict: + return asdict(self) @classmethod def from_row(cls, row: Row) -> Lock: @@ -108,6 +129,7 @@ def from_row(cls, row: Row) -> Lock: reason=row["reason"], created_at=row["created_at"], expires_at=row["expires_at"], + kind=row["kind"] if "kind" in row.keys() else LOCK_FILE, ) @@ -119,9 +141,15 @@ class Message: body: str created_at: str read_at: str | None + reply_to: 
str | None = None + acked_at: str | None = None + + def as_dict(self) -> dict: + return asdict(self) @classmethod def from_row(cls, row: Row) -> Message: + keys = row.keys() return cls( id=row["id"], sender_agent_id=row["sender_agent_id"], @@ -129,6 +157,8 @@ def from_row(cls, row: Row) -> Message: body=row["body"], created_at=row["created_at"], read_at=row["read_at"], + reply_to=row["reply_to"] if "reply_to" in keys else None, + acked_at=row["acked_at"] if "acked_at" in keys else None, ) @@ -139,6 +169,9 @@ class Decision: body: str created_at: str + def as_dict(self) -> dict: + return asdict(self) + @classmethod def from_row(cls, row: Row) -> Decision: return cls( @@ -159,6 +192,9 @@ class Activity: file_path: str | None created_at: str + def as_dict(self) -> dict: + return asdict(self) + @classmethod def from_row(cls, row: Row) -> Activity: return cls( diff --git a/src/agent_sync/paths.py b/src/agent_sync/paths.py index b1b66d7..6969b74 100644 --- a/src/agent_sync/paths.py +++ b/src/agent_sync/paths.py @@ -13,6 +13,7 @@ from __future__ import annotations import os +import subprocess import uuid from pathlib import Path @@ -23,13 +24,52 @@ _MARKERS = (".git", ".claude") +def _worktree_main_root(worktree_dir: Path) -> Path | None: + """Resolve the main worktree root for a *linked* git worktree, or ``None``. + + In a linked worktree ``.git`` is a *file* pointing at the real gitdir, and the + coordination database must live with the **main** worktree so every worktree + of one repo shares a single ``state.sqlite`` (otherwise agents on different + branches never see each other — the very workflow the skill recommends). + ``git rev-parse --git-common-dir`` returns the shared ``.git`` directory + (e.g. ``/repo/.git``); its parent is the main worktree root. Returns ``None`` + on any failure so the caller can fall back to the worktree dir itself. 
+ """ + try: + result = subprocess.run( + ["git", "rev-parse", "--git-common-dir"], + cwd=str(worktree_dir), + capture_output=True, + text=True, + timeout=5, + check=False, + ) + except (OSError, subprocess.SubprocessError): + return None + if result.returncode != 0: + return None + out = result.stdout.strip() + if not out: + return None + common = Path(out) + if not common.is_absolute(): + common = worktree_dir / common + try: + common = common.resolve() + except OSError: + return None + return common.parent if common.name == ".git" else common + + def repo_root(start: str | os.PathLike[str] | None = None) -> Path: """Return the repository root for *start* (defaults to the cwd). Resolution order: 1. ``AGENT_SYNC_ROOT`` environment variable, if set. - 2. The nearest ancestor containing a ``.git`` or ``.claude`` directory. + 2. The nearest ancestor containing a ``.git`` or ``.claude`` marker. A linked + git worktree (``.git`` is a *file*) resolves to its **main** worktree so + all worktrees of one repo share a single coordination database. 3. The starting directory itself, as a last resort. 
""" override = os.environ.get("AGENT_SYNC_ROOT") @@ -40,8 +80,14 @@ def repo_root(start: str | os.PathLike[str] | None = None) -> Path: base = base.resolve() for candidate in (base, *base.parents): for marker in _MARKERS: - if (candidate / marker).exists(): - return candidate + marker_path = candidate / marker + if not marker_path.exists(): + continue + if marker == ".git" and marker_path.is_file(): + shared = _worktree_main_root(candidate) + if shared is not None: + return shared + return candidate return base diff --git a/src/agent_sync/render.py b/src/agent_sync/render.py index 7441315..3a9fc61 100644 --- a/src/agent_sync/render.py +++ b/src/agent_sync/render.py @@ -18,6 +18,7 @@ from __future__ import annotations +import json import re import sqlite3 @@ -214,6 +215,87 @@ def render_pushed_messages( return _frame("\n".join(lines)) +# --- machine-readable output ------------------------------------------------ +def render_json(payload: object) -> str: + """Serialize *payload* as pretty JSON for programmatic consumers. + + Unlike the Markdown renderers this is **not** wrapped in the untrusted-data + frame: ``--json`` output is consumed by the *calling* agent's own tooling to + make coordination decisions (is file X busy? are this task's deps done?), so + it should be structured data, not framed prose. JSON encoding already escapes + values, so an embedded string cannot break out of its field. + """ + return json.dumps(payload, indent=2, default=str) + "\n" + + +def status_payload(conn: sqlite3.Connection, current_agent_id: str) -> dict: + """Structured equivalent of :func:`render_compact` for ``status --json``. + + Lets an agent decide coordination questions from structure instead of parsing + English: which agents are active, which paths/resources are locked, which + tasks are open or blocked by dependencies, and how many messages are unread. + """ + from . 
import locks as locks_mod + + moment = db.now() + agents = _coordinating_agents(conn) + active = [a for a in agents if db.effective_status(a, at=moment) == "active"] + me = db.get_agent(conn, current_agent_id) + live_locks = locks_mod.list_locks(conn, at=moment) + all_tasks = tasks.list_tasks(conn) + open_tasks = [t for t in all_tasks if t.status in TASK_OPEN_STATUSES] + unread = messages.inbox(conn, current_agent_id, unread_only=True) + + return { + "you": { + "id": current_agent_id, + "name": me.name if me else None, + "status": db.effective_status(me, at=moment) if me else None, + "registered": me is not None, + "current_task_id": me.current_task_id if me else None, + }, + "active_agent_count": len(active), + "agents": [ + { + "id": a.id, + "name": a.name, + "role": a.role, + "status": db.effective_status(a, at=moment), + } + for a in agents + ], + "locks": [lock_payload(conn, lk) for lk in live_locks], + "open_task_count": len(open_tasks), + "tasks": [task_payload(conn, t) for t in all_tasks], + "unread": len(unread), + } + + +def lock_payload(conn: sqlite3.Connection, lock) -> dict: + """Structured view of one lock, including the owner's display name.""" + owner = db.get_agent(conn, lock.owner_agent_id) + data = lock.as_dict() + data["owner_name"] = owner.name if owner else lock.owner_agent_id + return data + + +def task_payload(conn: sqlite3.Connection, task: Task) -> dict: + """Structured view of one task, including files, deps and dep-block state.""" + data = task.as_dict() + data["files"] = tasks.task_files(conn, task.id) + data["depends_on"] = tasks.task_dependencies(conn, task.id) + data["blocked_by_deps"] = bool(tasks.unmet_dependencies(conn, task.id)) + return data + + +def message_payload(conn: sqlite3.Connection, msg: Message) -> dict: + """Structured view of one message, including the sender's display name.""" + sender = db.get_agent(conn, msg.sender_agent_id) + data = msg.as_dict() + data["sender_name"] = sender.name if sender else 
msg.sender_agent_id + return data + + def render_compact(conn: sqlite3.Connection, current_agent_id: str) -> str: """Terse Markdown summary for injection into a Claude Code session. diff --git a/src/agent_sync/tasks.py b/src/agent_sync/tasks.py index 717f345..4c8f5ce 100644 --- a/src/agent_sync/tasks.py +++ b/src/agent_sync/tasks.py @@ -11,8 +11,8 @@ import sqlite3 from collections.abc import Sequence -from . import db -from .errors import NotFound, TaskConflict +from . import db, locks, paths +from .errors import LockConflict, NotFound, TaskConflict from .models import ( TASK_BLOCKED, TASK_CANCELLED, @@ -22,6 +22,10 @@ Task, ) +# A dependency no longer blocks once it is done or cancelled (cancelled work will +# never complete, so waiting on it forever would deadlock the dependent). +_DEP_RESOLVED_STATUSES = (TASK_DONE, TASK_CANCELLED) + def create_task( conn: sqlite3.Connection, @@ -30,11 +34,18 @@ def create_task( description: str | None = None, files: Sequence[str] | None = None, priority: int = 0, + depends_on: Sequence[str] | None = None, ) -> Task: - """Create a pending task and associate any *files* with it.""" + """Create a pending task and associate any *files* and dependencies with it. + + *depends_on* entries are task ids or titles, resolved up front so a typo + fails fast (:class:`NotFound`). The task is claimable only once every + dependency is done/cancelled (see :func:`unmet_dependencies`). 
+ """ task_id = db.new_id("task") ts = db.now_iso() with db.transaction(conn): + dep_ids = [find_task(conn, dep).id for dep in depends_on or ()] conn.execute( """INSERT INTO tasks (id, title, description, status, owner_agent_id, priority, @@ -47,6 +58,12 @@ def create_task( "INSERT INTO task_files (task_id, file_path) VALUES (?, ?)", (task_id, path), ) + for dep_id in dep_ids: + conn.execute( + "INSERT OR IGNORE INTO task_deps (task_id, depends_on_id) " + "VALUES (?, ?)", + (task_id, dep_id), + ) return get_task(conn, task_id) @@ -104,11 +121,90 @@ def task_files(conn: sqlite3.Connection, task_id: str) -> list[str]: return [r["file_path"] for r in rows] -def claim_task(conn: sqlite3.Connection, agent_id: str, identifier: str) -> Task: +def task_dependencies(conn: sqlite3.Connection, task_id: str) -> list[str]: + """Ids of the tasks *task_id* depends on (must finish first).""" + rows = conn.execute( + "SELECT depends_on_id FROM task_deps WHERE task_id = ? " + "ORDER BY depends_on_id", + (task_id,), + ).fetchall() + return [r["depends_on_id"] for r in rows] + + +def unmet_dependencies(conn: sqlite3.Connection, task_id: str) -> list[Task]: + """Dependencies of *task_id* that are not yet resolved (done/cancelled). + + A dangling dependency id (the referenced task was deleted) is treated as met + rather than blocking forever. + """ + unmet: list[Task] = [] + for dep_id in task_dependencies(conn, task_id): + row = conn.execute("SELECT * FROM tasks WHERE id = ?", (dep_id,)).fetchone() + if row is None: + continue + dep = Task.from_row(row) + if dep.status not in _DEP_RESOLVED_STATUSES: + unmet.append(dep) + return unmet + + +def dependents_unblocked_by(conn: sqlite3.Connection, task_id: str) -> list[Task]: + """Pending tasks that depend on *task_id* and now have all deps resolved. + + Used after completing a task to surface work that just became claimable; + "blocked by deps" is computed, so no status flip is needed to unblock them. 
+ """ + rows = conn.execute( + "SELECT task_id FROM task_deps WHERE depends_on_id = ?", (task_id,) + ).fetchall() + unblocked: list[Task] = [] + for r in rows: + dependent_id = r["task_id"] + if unmet_dependencies(conn, dependent_id): + continue + row = conn.execute( + "SELECT * FROM tasks WHERE id = ?", (dependent_id,) + ).fetchone() + if row is not None and row["status"] == TASK_PENDING: + unblocked.append(Task.from_row(row)) + return unblocked + + +def lock_task_files( + conn: sqlite3.Connection, agent_id: str, task_id: str +) -> tuple[list[str], list[str]]: + """Best-effort: lock every file associated with *task_id* for *agent_id*. + + Paths are normalized to the same canonical form the PreToolUse hook checks, so + the locks actually guard edits. Returns ``(locked, conflicts)`` where + *conflicts* holds human-readable messages for files another active agent + already holds — the caller keeps the claim and just warns. + """ + locked: list[str] = [] + conflicts: list[str] = [] + for raw in task_files(conn, task_id): + norm = paths.normalize_repo_path(raw) + try: + locks.acquire_lock(conn, agent_id, norm, reason=f"task {task_id}") + locked.append(norm) + except LockConflict as exc: + conflicts.append(exc.message) + return locked, conflicts + + +def claim_task( + conn: sqlite3.Connection, + agent_id: str, + identifier: str, + *, + force: bool = False, +) -> Task: """Claim a task for *agent_id*, moving it to ``in_progress``. Refuses if the task is already owned by a *different active* agent. Claiming a task you already own, or one whose previous owner has gone stale, is fine. + A task with unfinished dependencies is refused unless *force* is set, so work + is not started out of order by accident. 
""" moment = db.now() with db.transaction(conn): @@ -117,6 +213,14 @@ def claim_task(conn: sqlite3.Connection, agent_id: str, identifier: str) -> Task raise TaskConflict( f"Task {task.id} is already {task.status} and cannot be claimed" ) + if not force: + unmet = unmet_dependencies(conn, task.id) + if unmet: + names = ", ".join(f"{t.id} ({t.title!r})" for t in unmet) + raise TaskConflict( + f"Task {task.id} depends on unfinished task(s): {names}. " + f"Use --force to claim anyway." + ) if task.owner_agent_id and task.owner_agent_id != agent_id: owner = db.get_agent(conn, task.owner_agent_id) if db.is_active(owner, at=moment): @@ -153,8 +257,10 @@ def claim_next_task( A task is *available* when it is ``pending`` (nobody owns it) or — when *include_abandoned* is set — ``in_progress`` but its owner is no longer an active agent, so a crashed session's work is automatically redistributed. - ``blocked``, ``done`` and ``cancelled`` tasks are never auto-claimed, and a - task already owned by *agent_id* is skipped (you already have it). + ``blocked``, ``done`` and ``cancelled`` tasks are never auto-claimed, a task + with an unfinished dependency is skipped (and becomes claimable automatically + once the dependency completes), and a task already owned by *agent_id* is + skipped (you already have it). Returns the claimed :class:`Task`, or ``None`` when nothing is available. 
""" @@ -170,6 +276,8 @@ def claim_next_task( task = Task.from_row(row) if task.owner_agent_id == agent_id: continue # already mine — nothing to hand over + if unmet_dependencies(conn, task.id): + continue # blocked by an unfinished dependency if not task.owner_agent_id: chosen = task # unowned pending work: take it break diff --git a/tests/conftest.py b/tests/conftest.py index 2e691f1..569ca46 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -24,6 +24,8 @@ def repo(tmp_path, monkeypatch): monkeypatch.delenv("CLAUDE_SESSION_ID", raising=False) monkeypatch.delenv("AGENT_SYNC_AUTO_RELEASE_LOCKS", raising=False) monkeypatch.delenv("AGENT_SYNC_AUTO_CLAIM", raising=False) + monkeypatch.delenv("AGENT_SYNC_STALE_MINUTES", raising=False) + monkeypatch.delenv("AGENT_SYNC_OFFLINE_MINUTES", raising=False) return tmp_path diff --git a/tests/test_append.py b/tests/test_append.py new file mode 100644 index 0000000..a49f880 --- /dev/null +++ b/tests/test_append.py @@ -0,0 +1,47 @@ +"""Atomic shared-file append (``agent-sync append``).""" + +from __future__ import annotations + +from agent_sync import cli, locks + + +def test_append_creates_and_extends_file(repo, monkeypatch, capsys): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + assert cli.main(["append", "shared.txt", "--content", "line one"]) == 0 + assert cli.main(["append", "shared.txt", "--content", "line two"]) == 0 + target = repo / "shared.txt" + assert target.read_text(encoding="utf-8") == "line one\nline two\n" + + +def test_append_releases_lock_afterwards(repo, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + assert cli.main(["append", "shared.txt", "--content", "x"]) == 0 + # The append-time lock must not linger, or a different agent could not edit. 
+ from agent_sync import db, paths + + conn = db.connect() + try: + norm = paths.normalize_repo_path("shared.txt") + assert locks.active_lock_for(conn, norm) is None + finally: + conn.close() + + +def test_append_no_newline_flag(repo, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + assert cli.main(["append", "shared.txt", "--content", "a", "--no-newline"]) == 0 + assert cli.main(["append", "shared.txt", "--content", "b", "--no-newline"]) == 0 + assert (repo / "shared.txt").read_text(encoding="utf-8") == "ab" + + +def test_append_blocked_by_active_other_agent_lock(repo, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + cli.main(["register", "--name", "a"]) + # agent-a holds the lock; agent-b's append should fail closed (exit 2) + # without waiting because no --wait was given. + assert cli.main(["lock", "shared.txt", "--reason", "busy"]) == 0 + monkeypatch.setenv("AGENT_SYNC_ID", "agent-b") + cli.main(["register", "--name", "b"]) + assert cli.main(["append", "shared.txt", "--content", "nope"]) == 2 + # The file was never written because the lock was never acquired. 
+ assert not (repo / "shared.txt").exists() diff --git a/tests/test_cli_json.py b/tests/test_cli_json.py new file mode 100644 index 0000000..3cbee82 --- /dev/null +++ b/tests/test_cli_json.py @@ -0,0 +1,69 @@ +"""Machine-readable ``--json`` output for status/locks/inbox/tasks.""" + +from __future__ import annotations + +import json + +from agent_sync import cli + + +def _json_out(capsys): + return json.loads(capsys.readouterr().out) + + +def test_status_json_is_structured(repo, monkeypatch, capsys): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + cli.main(["register", "--name", "alice", "--role", "backend"]) + cli.main(["lock", "src/app.py", "--reason", "editing"]) + capsys.readouterr() + + assert cli.main(["status", "--json"]) == 0 + data = _json_out(capsys) + assert data["you"]["name"] == "alice" + assert data["you"]["registered"] is True + assert data["active_agent_count"] >= 1 + assert any(lk["file_path"] == "src/app.py" for lk in data["locks"]) + + +def test_locks_json_distinguishes_resource(repo, monkeypatch, capsys): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + cli.main(["register", "--name", "alice"]) + cli.main(["lock", "src/app.py"]) + cli.main(["lock", "--resource", "db-migrations"]) + capsys.readouterr() + + assert cli.main(["locks", "--json"]) == 0 + data = _json_out(capsys) + kinds = {lk["file_path"]: lk["kind"] for lk in data} + assert kinds["src/app.py"] == "file" + assert kinds["db-migrations"] == "resource" + assert all("owner_name" in lk for lk in data) + + +def test_tasks_json_reports_dependency_block(repo, monkeypatch, capsys): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + cli.main(["create-task", "Backend"]) + cli.main(["create-task", "Frontend", "--depends-on", "Backend"]) + capsys.readouterr() + + assert cli.main(["tasks", "--json"]) == 0 + data = _json_out(capsys) + by_title = {t["title"]: t for t in data} + assert by_title["Frontend"]["blocked_by_deps"] is True + assert by_title["Backend"]["blocked_by_deps"] is False + assert 
by_title["Frontend"]["depends_on"] == [by_title["Backend"]["id"]] + + +def test_inbox_json_lists_messages(repo, monkeypatch, capsys): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + cli.main(["register", "--name", "alice"]) + monkeypatch.setenv("AGENT_SYNC_ID", "agent-b") + cli.main(["register", "--name", "bob"]) + cli.main(["send", "--to", "alice", "--message", "hi alice"]) + monkeypatch.setenv("AGENT_SYNC_ID", "agent-a") + capsys.readouterr() + + assert cli.main(["inbox", "--json"]) == 0 + data = _json_out(capsys) + assert [m["body"] for m in data] == ["hi alice"] + assert data[0]["sender_name"] == "bob" diff --git a/tests/test_cli_status.py b/tests/test_cli_status.py index 1a3b043..9144a51 100644 --- a/tests/test_cli_status.py +++ b/tests/test_cli_status.py @@ -54,3 +54,25 @@ def test_no_command_prints_help(repo, capsys): assert cli.main([]) == 1 out = capsys.readouterr().out assert "usage:" in out.lower() + + +def test_whoami_reports_identity_and_source(repo, monkeypatch, capsys): + monkeypatch.setenv("AGENT_SYNC_ID", "agent-x") + cli.main(["register", "--name", "frontend", "--role", "React UI"]) + capsys.readouterr() + assert cli.main(["whoami"]) == 0 + out = capsys.readouterr().out + assert "agent-x" in out + assert "AGENT_SYNC_ID" in out # tells the agent it is acting under an explicit id + assert "frontend" in out + + +def test_whoami_json_when_unregistered(repo, monkeypatch, capsys): + import json + + monkeypatch.setenv("AGENT_SYNC_ID", "agent-x") + assert cli.main(["whoami", "--json"]) == 0 + data = json.loads(capsys.readouterr().out) + assert data["id"] == "agent-x" + assert data["registered"] is False + assert "AGENT_SYNC_ID" in data["resolved_via"] diff --git a/tests/test_db.py b/tests/test_db.py index e186696..a40a108 100644 --- a/tests/test_db.py +++ b/tests/test_db.py @@ -2,6 +2,8 @@ from __future__ import annotations +import sqlite3 + from agent_sync import db, paths, tasks @@ -67,6 +69,44 @@ def 
test_resolve_agent_id_hook_payload_matches_skill_env(repo, monkeypatch): assert from_env == from_hook +def test_identity_source_reports_explicit_env(repo, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_ID", "x") + assert "AGENT_SYNC_ID" in db.identity_source() + + +def test_identity_source_reports_session(repo, monkeypatch): + monkeypatch.setenv("CLAUDE_CODE_SESSION_ID", "sess") + assert "session" in db.identity_source().lower() + + +def test_identity_source_reports_local_file_fallback(repo): + # repo fixture clears AGENT_SYNC_ID and the session env vars. + assert "local file" in db.identity_source().lower() + + +def test_stale_threshold_is_env_configurable(conn, make_agent, age_agent, monkeypatch): + make_agent("a") + age_agent("a", minutes=5) + assert db.effective_status(db.get_agent(conn, "a")) == "active" # default 15 min + monkeypatch.setenv("AGENT_SYNC_STALE_MINUTES", "3") + assert db.effective_status(db.get_agent(conn, "a")) == "stale" # 5 >= 3 + + +def test_offline_threshold_is_env_configurable(conn, make_agent, age_agent, monkeypatch): + make_agent("a") + age_agent("a", minutes=30) + monkeypatch.setenv("AGENT_SYNC_OFFLINE_MINUTES", "20") + assert db.effective_status(db.get_agent(conn, "a")) == "offline" # 30 >= 20 + + +def test_invalid_threshold_env_falls_back_to_default(conn, make_agent, age_agent, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_STALE_MINUTES", "not-a-number") + make_agent("a") + age_agent("a", minutes=5) + # Garbage value is ignored, so the 15-minute default still applies. 
+ assert db.effective_status(db.get_agent(conn, "a")) == "active" + + def test_timestamps_are_utc_iso_round_trip(): iso = db.now_iso() parsed = db.parse_iso(iso) @@ -77,3 +117,38 @@ def test_timestamps_are_utc_iso_round_trip(): def test_parse_iso_tolerates_z_suffix(): parsed = db.parse_iso("2026-01-01T00:00:00Z") assert parsed.utcoffset().total_seconds() == 0 + + +def test_migration_upgrades_pre_existing_database_in_place(repo): + # Simulate a database created by an older agent-sync: a ``locks`` table + # without the ``kind`` column and no ``task_deps`` table at all. + paths.ensure_coordination_dir() + db_file = paths.db_path() + raw = sqlite3.connect(str(db_file)) + raw.executescript( + "CREATE TABLE locks (file_path TEXT PRIMARY KEY, owner_agent_id TEXT " + "NOT NULL, reason TEXT, created_at TEXT NOT NULL, expires_at TEXT NOT NULL);" + "INSERT INTO locks VALUES ('a.py','x',NULL,'2026-01-01T00:00:00+00:00'," + "'2030-01-01T00:00:00+00:00');" + ) + raw.commit() + raw.close() + + conn = db.connect() # runs init_db -> _migrate + try: + + def cols(table): + return {r["name"] for r in conn.execute(f"PRAGMA table_info({table})")} + + assert "kind" in cols("locks") + assert {"reply_to", "acked_at"} <= cols("messages") + assert db.table_exists(conn, "task_deps") + # Existing data is preserved; the new column takes its default. + row = conn.execute("SELECT kind FROM locks WHERE file_path = 'a.py'").fetchone() + assert row["kind"] == "file" + # Re-running is a no-op (idempotent). 
+ before = cols("locks") + db.init_db(conn) + assert cols("locks") == before + finally: + conn.close() diff --git a/tests/test_hooks_session_start.py b/tests/test_hooks_session_start.py index f08120c..05e4298 100644 --- a/tests/test_hooks_session_start.py +++ b/tests/test_hooks_session_start.py @@ -4,7 +4,7 @@ import io -from agent_sync import db, hooks, tasks +from agent_sync import db, hooks, locks, tasks def _run(payload=None, *, conn): @@ -76,3 +76,14 @@ def test_auto_claim_is_noop_when_no_tasks(conn, monkeypatch): def test_session_start_never_blocks_on_empty_payload(conn): code, _ = _run({}, conn=conn) assert code == 0 + + +def test_session_start_gcs_stale_locks(conn, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_ID", "me") + # A previous (now crashed) session left an expired lock behind. + db.ensure_agent(conn, "ghost") + locks.acquire_lock(conn, "ghost", "src/x.py", ttl_minutes=-1) + code, _ = _run(conn=conn) + assert code == 0 + # gc ran at session start, so the stale lock is gone for the new session. 
+ assert locks.list_locks(conn, include_expired=True) == [] diff --git a/tests/test_locks.py b/tests/test_locks.py index 76a9851..1dcfc59 100644 --- a/tests/test_locks.py +++ b/tests/test_locks.py @@ -6,6 +6,7 @@ from agent_sync import locks from agent_sync.errors import LockConflict +from agent_sync.models import LOCK_FILE, LOCK_RESOURCE def test_lock_file(conn, make_agent): @@ -83,3 +84,58 @@ def test_list_locks_only_shows_live_by_default(conn, make_agent): live = locks.list_locks(conn) assert [lock.file_path for lock in live] == ["live.ts"] assert len(locks.list_locks(conn, include_expired=True)) == 2 + + +# --- named / resource locks ------------------------------------------------- +def test_resource_lock_kind_is_persisted(conn, make_agent): + make_agent("agent-a") + lock = locks.acquire_lock(conn, "agent-a", "db-migrations", kind=LOCK_RESOURCE) + assert lock.kind == LOCK_RESOURCE and lock.is_resource + [stored] = locks.list_locks(conn) + assert stored.kind == LOCK_RESOURCE + + +def test_resource_lock_does_not_block_unrelated_file(conn, make_agent): + make_agent("agent-a") + make_agent("agent-b") + locks.acquire_lock(conn, "agent-a", "db-migrations", kind=LOCK_RESOURCE) + # An unrelated file is still freely lockable by someone else. + other = locks.acquire_lock(conn, "agent-b", "src/app.py") + assert other.kind == LOCK_FILE + # But the same resource key conflicts for a different active agent. 
+ with pytest.raises(LockConflict): + locks.acquire_lock(conn, "agent-b", "db-migrations", kind=LOCK_RESOURCE) + + +# --- blocking acquire (lock --wait) ----------------------------------------- +def test_acquire_blocking_returns_immediately_when_free(conn, make_agent): + make_agent("agent-a") + lock = locks.acquire_lock_blocking(conn, "agent-a", "f.py", wait_seconds=5) + assert lock.owner_agent_id == "agent-a" + + +def test_acquire_blocking_times_out_then_raises(conn, make_agent): + make_agent("agent-a") + make_agent("agent-b") + locks.acquire_lock(conn, "agent-a", "f.py") + with pytest.raises(LockConflict): + locks.acquire_lock_blocking( + conn, "agent-b", "f.py", wait_seconds=0.05, poll_seconds=0.01 + ) + + +def test_acquire_blocking_succeeds_when_holder_releases(conn, make_agent, monkeypatch): + make_agent("agent-a") + make_agent("agent-b") + locks.acquire_lock(conn, "agent-a", "f.py") + + # Simulate the holder releasing while agent-b is waiting: the first poll + # sleep frees the lock, so the next retry acquires it. + def fake_sleep(_seconds): + locks.release_lock(conn, "agent-a", "f.py", force=True) + + monkeypatch.setattr(locks.time, "sleep", fake_sleep) + lock = locks.acquire_lock_blocking( + conn, "agent-b", "f.py", wait_seconds=5, poll_seconds=0.01 + ) + assert lock.owner_agent_id == "agent-b" diff --git a/tests/test_messages.py b/tests/test_messages.py index c6f30e7..e51c9c2 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -2,7 +2,10 @@ from __future__ import annotations +import pytest + from agent_sync import messages +from agent_sync.errors import NotFound def test_send_and_receive_direct_by_name(conn, make_agent): @@ -144,3 +147,42 @@ def test_delivery_is_independent_of_read_state(conn, make_agent): messages.mark_delivered(conn, "agent-b", [msg.id]) # Pushed into context, but not explicitly acknowledged: still "unread". 
assert messages.unread_count(conn, "agent-b") == 1 + + +# --- reply threading & acknowledgement -------------------------------------- +def test_send_with_reply_to_threads_message(conn, make_agent): + make_agent("agent-a", name="alice") + make_agent("agent-b", name="bob") + parent = messages.send_message(conn, "agent-a", "bob", "ping") + reply = messages.send_message(conn, "agent-b", "alice", "pong", reply_to=parent.id) + assert reply.reply_to == parent.id + + +def test_send_reply_to_unknown_message_raises(conn, make_agent): + make_agent("agent-a", name="alice") + with pytest.raises(NotFound): + messages.send_message(conn, "agent-a", "all", "x", reply_to="msg-nope") + + +def test_ack_sets_acked_at(conn, make_agent): + make_agent("agent-a", name="alice") + make_agent("agent-b", name="bob") + msg = messages.send_message(conn, "agent-a", "bob", "hi") + assert msg.acked_at is None + acked = messages.ack_message(conn, "agent-b", msg.id) + assert acked.acked_at is not None + + +def test_ack_is_idempotent(conn, make_agent): + make_agent("agent-a", name="alice") + make_agent("agent-b", name="bob") + msg = messages.send_message(conn, "agent-a", "bob", "hi") + first = messages.ack_message(conn, "agent-b", msg.id) + second = messages.ack_message(conn, "agent-b", msg.id) + assert first.acked_at == second.acked_at + + +def test_ack_unknown_message_raises(conn, make_agent): + make_agent("agent-a", name="alice") + with pytest.raises(NotFound): + messages.ack_message(conn, "agent-a", "msg-nope") diff --git a/tests/test_paths.py b/tests/test_paths.py new file mode 100644 index 0000000..ffabda1 --- /dev/null +++ b/tests/test_paths.py @@ -0,0 +1,51 @@ +"""Repo-root resolution, including the git-worktree shared-DB fix.""" + +from __future__ import annotations + +import shutil +import subprocess + +import pytest + +from agent_sync import paths + +requires_git = pytest.mark.skipif( + shutil.which("git") is None, reason="git is not installed" +) + + +def _git(args, cwd): + 
subprocess.run( + ["git", *args], cwd=str(cwd), check=True, capture_output=True, text=True + ) + + +def test_agent_sync_root_overrides_everything(tmp_path, monkeypatch): + monkeypatch.setenv("AGENT_SYNC_ROOT", str(tmp_path)) + assert paths.repo_root(tmp_path / "deep" / "nested").resolve() == tmp_path.resolve() + + +def test_claude_marker_dir_is_a_root(tmp_path, monkeypatch): + monkeypatch.delenv("AGENT_SYNC_ROOT", raising=False) + (tmp_path / ".claude").mkdir() + assert paths.repo_root(tmp_path).resolve() == tmp_path.resolve() + + +@requires_git +def test_linked_worktree_shares_main_worktree_db(tmp_path, monkeypatch): + monkeypatch.delenv("AGENT_SYNC_ROOT", raising=False) + main = tmp_path / "main" + main.mkdir() + _git(["init"], main) + _git(["config", "user.email", "t@example.com"], main) + _git(["config", "user.name", "tester"], main) + _git(["commit", "--allow-empty", "-m", "init"], main) + + linked = tmp_path / "linked" + _git(["worktree", "add", str(linked)], main) + + # The main worktree resolves to itself; the linked worktree must resolve to + # the main one so both share a single coordination database. 
+ assert paths.repo_root(main).resolve() == main.resolve() + assert paths.repo_root(linked).resolve() == main.resolve() + assert paths.db_path(linked).resolve() == paths.db_path(main).resolve() diff --git a/tests/test_tasks.py b/tests/test_tasks.py index 176cbde..ad17f7e 100644 --- a/tests/test_tasks.py +++ b/tests/test_tasks.py @@ -4,8 +4,8 @@ import pytest -from agent_sync import db, tasks -from agent_sync.errors import TaskConflict +from agent_sync import db, locks, tasks +from agent_sync.errors import NotFound, TaskConflict def test_create_task_with_files(conn): @@ -134,3 +134,83 @@ def test_claim_next_skips_blocked_and_done(conn, make_agent): tasks.claim_task(conn, "agent-a", t2.id) tasks.complete_task(conn, "agent-a", t2.id) assert tasks.claim_next_task(conn, "agent-a") is None + + +# --- dependencies ----------------------------------------------------------- +def test_create_task_records_dependencies(conn): + dep = tasks.create_task(conn, "Dep") + main = tasks.create_task(conn, "Main", depends_on=[dep.id]) + assert tasks.task_dependencies(conn, main.id) == [dep.id] + assert [d.id for d in tasks.unmet_dependencies(conn, main.id)] == [dep.id] + + +def test_create_task_with_unknown_dependency_raises(conn): + with pytest.raises(NotFound): + tasks.create_task(conn, "Main", depends_on=["does-not-exist"]) + + +def test_depends_on_accepts_titles(conn): + tasks.create_task(conn, "Backend API") + main = tasks.create_task(conn, "Frontend", depends_on=["Backend API"]) + assert len(tasks.task_dependencies(conn, main.id)) == 1 + + +def test_claim_next_skips_task_blocked_by_dependency(conn, make_agent): + make_agent("agent-a") + dep = tasks.create_task(conn, "Dep", priority=0) + tasks.create_task(conn, "Blocked", depends_on=[dep.id], priority=10) + # 'Blocked' outranks 'Dep' on priority but is skipped until the dep is done. 
+ claimed = tasks.claim_next_task(conn, "agent-a") + assert claimed is not None and claimed.id == dep.id + + +def test_completing_dependency_unblocks_dependent(conn, make_agent): + make_agent("agent-a") + dep = tasks.create_task(conn, "Dep") + dependent = tasks.create_task(conn, "Dependent", depends_on=[dep.id]) + tasks.claim_task(conn, "agent-a", dep.id) + tasks.complete_task(conn, "agent-a", dep.id) + assert tasks.unmet_dependencies(conn, dependent.id) == [] + assert [t.id for t in tasks.dependents_unblocked_by(conn, dep.id)] == [dependent.id] + # And it is now auto-claimable. + claimed = tasks.claim_next_task(conn, "agent-a") + assert claimed is not None and claimed.id == dependent.id + + +def test_claim_task_refuses_unmet_dependency_without_force(conn, make_agent): + make_agent("agent-a") + dep = tasks.create_task(conn, "Dep") + dependent = tasks.create_task(conn, "Dependent", depends_on=[dep.id]) + with pytest.raises(TaskConflict): + tasks.claim_task(conn, "agent-a", dependent.id) + forced = tasks.claim_task(conn, "agent-a", dependent.id, force=True) + assert forced.status == "in_progress" + + +def test_cancelled_dependency_does_not_block(conn, make_agent): + make_agent("agent-a") + dep = tasks.create_task(conn, "Dep") + dependent = tasks.create_task(conn, "Dependent", depends_on=[dep.id]) + with db.transaction(conn): + conn.execute("UPDATE tasks SET status = 'cancelled' WHERE id = ?", (dep.id,)) + assert tasks.unmet_dependencies(conn, dependent.id) == [] + + +# --- auto-lock task files on claim ------------------------------------------ +def test_lock_task_files_locks_associated_files(conn, make_agent): + make_agent("agent-a") + task = tasks.create_task(conn, "T", files=["src/a.py", "src/b.py"]) + tasks.claim_task(conn, "agent-a", task.id) + locked, conflicts = tasks.lock_task_files(conn, "agent-a", task.id) + assert set(locked) == {"src/a.py", "src/b.py"} + assert conflicts == [] + + +def test_lock_task_files_reports_conflicts_without_failing(conn, 
make_agent): + make_agent("agent-a") + make_agent("agent-b") + locks.acquire_lock(conn, "agent-b", "src/a.py") # b already holds it + task = tasks.create_task(conn, "T", files=["src/a.py"]) + locked, conflicts = tasks.lock_task_files(conn, "agent-a", task.id) + assert locked == [] + assert len(conflicts) == 1