Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 67 additions & 7 deletions src/bcli/batch/ledger.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
``list_runs`` and ``batch state`` both call this on read so the operator
always sees the truthful state, not a stale stamp.

Schema (version 1)
Schema (version 2)
------------------
::

Expand All @@ -63,13 +63,21 @@
status TEXT, -- "committed" | "failed" | "rollback_skipped" | "rolled_back" | "unknown"
bc_correlation_id TEXT,
error_message TEXT,
rollback_url TEXT
rollback_url TEXT,
idempotency_key TEXT -- v2 (AIP §Phase 4d)
);
CREATE TABLE schema_version (version INTEGER);

The step ``status`` column is intentionally not constrained — rollback
introduces transient states (e.g. ``rollback_skipped``) we don't want to
keep adding to a CHECK enum.

Migrations
----------
``_ensure_schema`` inspects ``schema_version`` on every connect. v1
ledgers get an ``ALTER TABLE step ADD COLUMN idempotency_key TEXT`` and
a version bump — non-destructive, preserves all existing rows. New
ledgers get the column inline.
"""

from __future__ import annotations
Expand All @@ -80,7 +88,7 @@
from pathlib import Path
from typing import Any

SCHEMA_VERSION = 1
SCHEMA_VERSION = 2

# Allowed values for ``run.state``. Mirrors the contract doc §Phase 3
# enum. Step-level statuses are deliberately *not* enforced — they
Expand Down Expand Up @@ -232,7 +240,8 @@ def _ensure_schema(conn: sqlite3.Connection) -> None:
status TEXT,
bc_correlation_id TEXT,
error_message TEXT,
rollback_url TEXT
rollback_url TEXT,
idempotency_key TEXT
)
""",
)
Expand All @@ -245,6 +254,21 @@ def _ensure_schema(conn: sqlite3.Connection) -> None:
conn.execute(
"INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,)
)
else:
# Migration: v1 → v2 adds the idempotency_key column. Inspect
# the live table (rather than just the version row) because a
# rolled-out v2 client may have created the table inline
# already — additive-only ALTER is the safe path either way.
existing_version = int(existing[0])
if existing_version < 2:
cols = {row[1] for row in conn.execute("PRAGMA table_info(step)")}
if "idempotency_key" not in cols:
conn.execute(
"ALTER TABLE step ADD COLUMN idempotency_key TEXT"
)
conn.execute(
"UPDATE schema_version SET version = ?", (SCHEMA_VERSION,)
)

# ── Lifecycle ───────────────────────────────────────────────────

Expand Down Expand Up @@ -307,24 +331,60 @@ def write_intent(
method: str,
url: str,
body_hash: str | None,
idempotency_key: str | None = None,
) -> int:
"""Insert the *intent* row for a step and return its ``step_id``.

This is the row that survives a SIGKILL — we know we *tried* the
HTTP call. The matching ``write_outcome`` flips the state once
the response (or an exception) comes back.

``idempotency_key`` (AIP §Phase 4d) is the optional opaque token
the caller wants to associate with this step. Stored verbatim so
a same-run replay can be detected via
:meth:`find_committed_idempotent_step`.
"""
conn = self._connect()
cur = conn.execute(
"""
INSERT INTO step (
run_id, seq, intent_ts, method, url, body_hash
) VALUES (?, ?, ?, ?, ?, ?)
run_id, seq, intent_ts, method, url, body_hash, idempotency_key
) VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(self.run_id, seq, _utc_now_iso(), method, url, body_hash),
(self.run_id, seq, _utc_now_iso(), method, url, body_hash,
idempotency_key),
)
return int(cur.lastrowid)

def find_committed_idempotent_step(
self, idempotency_key: str
) -> dict[str, Any] | None:
"""Return the prior committed step (if any) with this key in this run.

AIP §Phase 4d — same-run replay protection. A new write that
carries an idempotency_key already in the ``committed`` state
should be refused (the agent retried; the prior call landed).

Cross-run collision detection is deliberately out of scope for
v0.1: implementing it requires scanning every ``*.db`` under
``batch/`` on each call. Document deferral in the PR.
"""
if not idempotency_key:
return None
conn = self._connect()
row = conn.execute(
"""
SELECT * FROM step
WHERE run_id = ?
AND idempotency_key = ?
AND status = 'committed'
ORDER BY seq ASC
LIMIT 1
""",
(self.run_id, idempotency_key),
).fetchone()
return dict(row) if row is not None else None

def write_outcome(
self,
*,
Expand Down
30 changes: 25 additions & 5 deletions src/bcli/client/_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,11 +168,20 @@ async def post(
publisher: str | None = None,
group: str | None = None,
version: str | None = None,
idempotency_key: str | None = None,
) -> dict[str, Any]:
"""POST (create) a record."""
"""POST (create) a record.

``idempotency_key`` (AIP §Phase 4d) is forwarded as the IETF
``Idempotency-Key`` HTTP header. BC may not consume it today;
it lets reverse proxies / gateways apply replay protection and
keeps the ledger key consistent with what was wired out.
"""
transport = self._ensure_transport()
url = self._resolve_url(entity_set_name, publisher=publisher, group=group, version=version)
return await transport.post(url, json_body=body)
return await transport.post(
url, json_body=body, idempotency_key=idempotency_key,
)

async def patch(
self,
Expand All @@ -184,14 +193,17 @@ async def patch(
publisher: str | None = None,
group: str | None = None,
version: str | None = None,
idempotency_key: str | None = None,
) -> dict[str, Any]:
"""PATCH (update) a record."""
transport = self._ensure_transport()
url = self._resolve_url(
entity_set_name, record_id=record_id,
publisher=publisher, group=group, version=version,
)
return await transport.patch(url, json_body=body, etag=etag)
return await transport.patch(
url, json_body=body, etag=etag, idempotency_key=idempotency_key,
)

async def delete(
self,
Expand All @@ -202,14 +214,17 @@ async def delete(
publisher: str | None = None,
group: str | None = None,
version: str | None = None,
idempotency_key: str | None = None,
) -> dict[str, Any]:
"""DELETE a record."""
transport = self._ensure_transport()
url = self._resolve_url(
entity_set_name, record_id=record_id,
publisher=publisher, group=group, version=version,
)
return await transport.delete(url, etag=etag)
return await transport.delete(
url, etag=etag, idempotency_key=idempotency_key,
)

async def delete_url(self, url: str, *, etag: str = "*") -> dict[str, Any]:
"""DELETE an already-resolved absolute URL.
Expand All @@ -235,6 +250,7 @@ async def upload_attachment(
group: str | None = None,
version: str | None = None,
force_standard: bool = False,
idempotency_key: str | None = None,
) -> dict[str, Any]:
"""Upload a file as a documentAttachment linked to a parent record (two-phase).

Expand Down Expand Up @@ -300,12 +316,16 @@ async def upload_attachment(
company_id=self._profile.company_id,
entity_set_name="documentAttachments",
)
metadata = await transport.post(post_url, json_body=metadata_body)
metadata = await transport.post(
post_url, json_body=metadata_body,
idempotency_key=idempotency_key,
)
else:
metadata = await self.post(
"documentAttachments",
metadata_body,
publisher=publisher, group=group, version=version,
idempotency_key=idempotency_key,
)
attachment_id = metadata.get("id") or metadata.get("systemId")
if not attachment_id:
Expand Down
46 changes: 40 additions & 6 deletions src/bcli/client/_transport.py
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ async def _request(
content: bytes | None = None,
content_type: str | None = None,
etag: str | None = None,
idempotency_key: str | None = None,
log_context: dict[str, str] | None = None,
) -> dict[str, Any]:
"""Execute an HTTP request with retry and error handling.
Expand Down Expand Up @@ -151,6 +152,13 @@ async def _request(
headers["If-Match"] = etag
if content is not None:
headers["Content-Type"] = content_type or "application/octet-stream"
if idempotency_key is not None:
# AIP §Phase 4d — IETF draft "Idempotency-Key" header.
# BC may not honor it server-side today; any gateway /
# reverse-proxy in front can apply replay protection,
# and we ledger the key so our own retry logic stays
# deterministic regardless of server support.
headers["Idempotency-Key"] = idempotency_key

logger.debug("%s %s (attempt %d)", method, url, attempt + 1)

Expand Down Expand Up @@ -294,13 +302,29 @@ async def get_absolute(self, url: str) -> dict[str, Any]:
assert_bc_origin(url)
return await self._request("GET", url)

async def post(self, url: str, *, json_body: dict[str, Any]) -> dict[str, Any]:
return await self._request("POST", url, json_body=json_body)
async def post(
self,
url: str,
*,
json_body: dict[str, Any],
idempotency_key: str | None = None,
) -> dict[str, Any]:
return await self._request(
"POST", url, json_body=json_body, idempotency_key=idempotency_key,
)

async def patch(
self, url: str, *, json_body: dict[str, Any], etag: str = "*"
self,
url: str,
*,
json_body: dict[str, Any],
etag: str = "*",
idempotency_key: str | None = None,
) -> dict[str, Any]:
return await self._request("PATCH", url, json_body=json_body, etag=etag)
return await self._request(
"PATCH", url, json_body=json_body, etag=etag,
idempotency_key=idempotency_key,
)

async def patch_binary(
self,
Expand All @@ -309,14 +333,24 @@ async def patch_binary(
content: bytes,
content_type: str = "application/octet-stream",
etag: str = "*",
idempotency_key: str | None = None,
) -> dict[str, Any]:
"""PATCH raw binary bytes with a custom Content-Type (e.g. attachments/content)."""
return await self._request(
"PATCH", url, content=content, content_type=content_type, etag=etag,
idempotency_key=idempotency_key,
)

async def delete(self, url: str, *, etag: str = "*") -> dict[str, Any]:
return await self._request("DELETE", url, etag=etag)
async def delete(
self,
url: str,
*,
etag: str = "*",
idempotency_key: str | None = None,
) -> dict[str, Any]:
return await self._request(
"DELETE", url, etag=etag, idempotency_key=idempotency_key,
)


def _parse_bc_error(response: httpx.Response) -> tuple[str | None, str | None]:
Expand Down
22 changes: 22 additions & 0 deletions src/bcli/exit_codes.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,28 @@
EXIT_POLICY = 8


# Public, ordered taxonomy. ``bcli describe`` consumes this exact map so
# AI agents can render meaningful errors from a CLI exit. The label is a
# short human-readable string — keep it stable across minor versions;
# breaking changes here are user-visible.
EXIT_CODES: dict[int, str] = {
EXIT_OK: "success",
EXIT_GENERIC_ERROR: "uncategorised error",
EXIT_USAGE: "usage error",
EXIT_AUTH: "authentication failure",
EXIT_NOT_FOUND: "not found",
EXIT_VALIDATION: "input validation",
EXIT_REMOTE_4XX: "remote 4xx",
EXIT_REMOTE_5XX: "remote 5xx",
EXIT_POLICY: "policy violation",
}


def describe_exit_code(code: int) -> str:
"""Return the short label for an exit code, or ``"unknown"``."""
return EXIT_CODES.get(code, "unknown")


def exit_code_for_status(status_code: int | None) -> int:
"""Map an HTTP status to a CLI exit code.

Expand Down
14 changes: 10 additions & 4 deletions src/bcli_cli/_envelope_wrap.py
Original file line number Diff line number Diff line change
Expand Up @@ -225,14 +225,20 @@ def emit_success(self, *, bc_correlation_id: str | None = None) -> None:
)
_write_if_active(self._cap, env)

def emit_failure(self, exc: BaseException) -> None:
def emit_failure(
self,
exc: BaseException,
*,
exit_code: int | None = None,
) -> None:
if not self.is_active:
return
status_code = getattr(exc, "status_code", None)
correlation_id = getattr(exc, "correlation_id", None)
exit_code = exit_code_for_status(status_code)
if exit_code == EXIT_GENERIC_ERROR and status_code is None:
exit_code = EXIT_GENERIC_ERROR
if exit_code is None:
exit_code = exit_code_for_status(status_code)
if exit_code == EXIT_GENERIC_ERROR and status_code is None:
exit_code = EXIT_GENERIC_ERROR
env = _build_envelope(
self._cap,
status="failed",
Expand Down
Loading
Loading