From 2c8e49485c11d33dfe41342410c9e26ede6ea077 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sun, 7 Jun 2026 14:55:56 +0800 Subject: [PATCH 1/2] docs(code): document the shared budget, execute_loop, and Workflow facade MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Update the orchestration reference, examples, and tutorial (en + cn) for the new capabilities: - parallel(specs, budgetTokens?) — a shared token ledger across a fan-out, with the soft-cap caveat and the union/dict return shape. - execute_loop / LoopDecision — bounded loop-until-done with a mandatory max_iterations hard cap. - the Rust Workflow facade — session.workflow(), named resumable phases, WorkflowEvent milestones, and budget_snapshot(). --- .../docs/cn/code/examples/orchestration.mdx | 59 +++++++++- .../content/docs/cn/code/orchestration.mdx | 101 ++++++++++++++++ .../docs/en/code/examples/orchestration.mdx | 61 +++++++++- .../content/docs/en/code/orchestration.mdx | 108 ++++++++++++++++++ .../content/tutorials/cn/orchestration.mdx | 20 ++++ .../content/tutorials/en/orchestration.mdx | 22 ++++ 6 files changed, 369 insertions(+), 2 deletions(-) diff --git a/apps/docs/content/docs/cn/code/examples/orchestration.mdx b/apps/docs/content/docs/cn/code/examples/orchestration.mdx index edf303cf..1347501f 100644 --- a/apps/docs/content/docs/cn/code/examples/orchestration.mdx +++ b/apps/docs/content/docs/cn/code/examples/orchestration.mdx @@ -7,7 +7,7 @@ import { Tab, Tabs } from 'fumadocs-ui/components/tabs'; # 编排 -本页展示 A3S Code v3.4.0 中的三个可编程编排原语:用于扇出的 `session.parallel`、用于按条目执行多阶段链的 `session.pipeline`,以及用于在崩溃后仍可恢复的带日志运行的 `session.parallelResumable`。当你有多个相互独立的子代理任务时使用 parallel;当你需要让每个输入流经一组有序阶段时使用 pipeline。 +本页展示 A3S Code 中的可编程编排原语:用于扇出的 `session.parallel`、用于按条目执行多阶段链的 `session.pipeline`,以及用于在崩溃后仍可恢复的带日志运行的 `session.parallelResumable`。`parallel` 还接受一个可选的 token 预算,让整个扇出共享同一个账本。当你有多个相互独立的子代理任务时使用 parallel;当你需要让每个输入流经一组有序阶段时使用 pipeline。 关于这些原语背后的概念模型,请参阅[编排](/cn/docs/code/orchestration)。 @@ -227,11 +227,68 @@ session.close() +## 用 `session.parallel` 做预算受限的扇出 + +把 token 预算作为第二个参数传入,所有子代理就会汇入**同一个账本**。传入预算时, +`parallel` 解析为 `{ outcomes, budget }`(账本快照)而非原来的结果数组;一旦达到 +上限,之后启动的 step 会被拒绝(`success: false`)。它是软上限——宽扇出可能冲过 +上限几个在飞回合;在飞工作绝不会被强杀。 + + + + +```ts +import { Agent } from '@a3s-lab/code'; + +const agent = await Agent.create('agent.acl'); +const session = agent.session('.', {}); + +const specs = [ + { taskId: 'a', agent: 'general', description: 'q1', prompt: 'Reply with one word: ready.', maxSteps: 2 }, + { taskId: 'b', agent: 'general', description: 'q2', prompt: 'Reply with one word: go.', maxSteps: 2 }, +]; + +// 传入预算时,parallel() 解析为 { outcomes, budget } —— 所有子代理共享一个账本。 +// (不传时,parallel(specs) 返回原来的数组。) +const { outcomes, budget } = await session.parallel(specs, 50_000); +for (const o of outcomes) console.log(`[budget] ${o.taskId}: success=${o.success}`); +console.log(`spent ${budget.consumedTokens} / ${budget.limitTokens} tokens`); + +await session.close(); +``` + + + + +```python +from a3s_code import Agent, SessionOptions + +agent = Agent.create(open("agent.acl").read()) +session = agent.session(".", SessionOptions()) + +specs = [ + {"task_id": "a", "agent": "general", "description": "q1", "prompt": "Reply with one word: ready.", "max_steps": 2}, + {"task_id": "b", "agent": "general", "description": "q2", "prompt": "Reply with one word: go.", "max_steps": 2}, +] + +# 传入预算时,parallel() 返回 {"outcomes", "budget"}(共享账本)。 +res = session.parallel(specs, budget_tokens=50_000) +for o in res["outcomes"]: + print(f"[budget] {o['task_id']}: success={o['success']}") +print(f"spent {res['budget']['consumed_tokens']} / {res['budget']['limit_tokens']} tokens") + +session.close() +``` + + + + 说明: - 三个原语都返回按输入顺序对齐的结果:`{ taskId, success, output, error?, structured? }`(Node 对象)/ `{ "task_id", "success", "output", "error"?, "structured"? }`(Python 字典)。 - 在 spec 上设置 `outputSchema` / `output_schema` 可在 `structured` 中拿到解析后的结果。 - `maxSteps` / `max_steps` 限制每个子代理的步数;会话选项 `maxParallelTasks` / `max_parallel_tasks` 限制扇出并发量。 +- 给 `parallel` 传入 token 预算(第 2 个参数 `budgetTokens` / `budget_tokens=`)即可让整个扇出对同一个账本计数;此时返回形状变为 `{ outcomes, budget }`。它是软上限(用量在每次调用之后记账)。 - Node 的 pipeline 阶段回调绝不能抛出异常——出错时返回 `null`。Python 的阶段可以抛出(抛出的阶段会被捕获并视为 `None`)。 可运行版本见 `crates/code/sdk/node/examples/orchestration/parallel-pipeline.mjs` 和 `crates/code/sdk/python/examples/orchestration_workflow.py`。 diff --git a/apps/docs/content/docs/cn/code/orchestration.mdx b/apps/docs/content/docs/cn/code/orchestration.mdx index e82bcda9..71d0fa60 100644 --- a/apps/docs/content/docs/cn/code/orchestration.mdx +++ b/apps/docs/content/docs/cn/code/orchestration.mdx @@ -245,6 +245,107 @@ outcomes = session.parallel_resumable(specs, "release-batch-42") outcomes = session.parallel_resumable(specs, "release-batch-42") ``` +## 跨 fan-out 的共享预算 [#shared-budget] + +默认情况下,每个子代理各自统计自己的 LLM 成本。给 `parallel` 传入一个 token +预算后,所有子代理改为汇入**同一个账本**——为整个 fan-out 设一个统一上限。它映射 +到核心的 `WorkflowBudget`:一个安装到每个子运行上的、聚合型 `BudgetGuard`。 + +预算是一个**可选参数**,因此向后兼容: + +- 不传预算时,`parallel(specs)` 返回原来的结果数组,与之前完全一致。 +- 传入预算时,`parallel(specs, budgetTokens)` 解析为 `{ outcomes, budget }`, + 其中 `budget` 是账本快照(`consumedTokens` / `limitTokens`)。 + +一旦达到上限,之后*启动*的 step 会被拒绝——其结果为 `success: false`,并带有预算 +耗尽信息。它是一个**软上限**:由于用量是在每次 LLM 调用*之后*记账的,宽 fan-out +可能在账本追上之前冲过上限几个在飞回合。框架**绝不**强杀在飞的 fan-out;预算耗尽 +只是拒绝*下一次* LLM 调用。 + +```ts +// 不传预算 → 原样返回结果数组(行为不变)。 +const outcomes = await session.parallel(specs); + +// 传入预算 → { outcomes, budget }。所有子代理共享一个账本。 +const { outcomes: out, budget } = await session.parallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); // 例如 48213, 500000 +// TS 返回类型是联合类型:Array | { outcomes, budget }。 +``` + +```python +# 不传预算 → 原样返回列表(行为不变)。 +outcomes = session.parallel(specs) + +# 传入预算 → {"outcomes": [...], "budget": {"consumed_tokens", "limit_tokens"}}。 +res = session.parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) +``` + +## 循环直到完成(`execute_loop`)[#looping] + +对于长度未知、需要迭代至收敛的工作(loop-until-dry、反复打磨直到满意),核心语法 +提供了 `execute_loop`。每一轮都是一个屏障(`execute_steps_parallel`);宿主提供的 +谓词看到本轮的结果,返回 `LoopDecision::Continue(next_specs)` 或 +`LoopDecision::Stop`。必填的 `max_iterations` 是一个**硬上限**——一旦达到,即使谓词 +还想继续也会停止,从而让 LLM 驱动的循环永远不会失控。 + +```rust +use a3s_code_core::orchestration::{execute_loop, AgentStepSpec, LoopDecision}; + +let outcomes = execute_loop(executor, initial_specs, /* max_iterations */ 5, None, |round| { + // 本轮没有新发现就停止;否则扇出后续 step。 + let follow_ups = derive_follow_ups(round); + if follow_ups.is_empty() { + LoopDecision::Stop + } else { + LoopDecision::Continue(follow_ups) + } +}) +.await; +``` + +> 从宿主 SDK 你并不需要专门的 `loop` 动词——直接用你自己语言里的 `while`/`for` +> 围绕 `parallel` 写循环,根据结果决定下一轮即可。`execute_loop` 是为 Rust 语法 +> 而存在,并为循环提供一个单一、强制的终止守卫。 + +## Workflow facade(Rust / 嵌入)[#workflow-facade] + +`session.workflow()` 返回一个可廉价克隆的 `Workflow`,它预先接好了会话的 +executor、持久化 store、逐 step 事件流,以及一个稳定的、由会话派生的 root id。它是 +把以上能力打包起来的可编程句柄;控制流就是普通 Rust——`await` 一个动词、查看结果、 +决定下一步运行什么。 + +- **动词**——`agent`(单步)、`parallel`(屏障式 fan-out)、`phase`(*命名*的、 + 可恢复的屏障,并发出里程碑)、`pipeline`(按条目的链),以及不会失败的 `log`。 + 每个动词都只委派给一个 combinator。 +- **Phase 与事件**——`phase(name, specs)` 派生确定性 checkpoint id + (`{root}/{index}:{name}`),在配置了 store 时走可恢复屏障,并在一个广播上发出 + `WorkflowEvent::PhaseStart` / `PhaseEnd`,你可用 `subscribe()` 读取。`log()` + 发出 `WorkflowEvent::Log`。 +- **预算**——`session.workflow_with_token_budget(Some(limit))` 安装一个共享的 + `WorkflowBudget`;`budget_snapshot()` 读取账本,达到上限时会触发 + `WorkflowEvent::BudgetExhausted`。 + +```rust +let wf = session.workflow(); // 或 session.workflow_with_token_budget(Some(500_000)) +let mut events = wf.subscribe(); + +// 先跑一步,再根据其结果计算出一个*可变*数量的 fan-out——这正是“动态”所在: +// 形状在运行时决定,而非提前声明。 +let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await; +let specs = derive_specs(&plan); // 你的代码 +let done = wf.phase("implement", specs).await; // 可恢复屏障 + 里程碑 +let reviews = wf.phase("review", to_review(&done)).await; // 预算在各 phase 间共享 + +if let Some(b) = wf.budget_snapshot() { + println!("spent {} / {:?} tokens", b.consumed_tokens, b.limit_tokens); +} +``` + +SDK 暴露的是扁平的 `parallel` / `pipeline` / `parallelResumable` 动词(以及上面 +`parallel` 的预算重载);完整的 `Workflow` 句柄——phases、事件订阅、loop +combinator——属于 Rust / 嵌入层 API。 + ## 强制 schema 的 step 输出 [#schema-forced-step-output] 携带 `output_schema`(Node 中为 `outputSchema`)的 spec 会强制 step 返回符合该 JSON diff --git a/apps/docs/content/docs/en/code/examples/orchestration.mdx b/apps/docs/content/docs/en/code/examples/orchestration.mdx index 9326a742..690bb970 100644 --- a/apps/docs/content/docs/en/code/examples/orchestration.mdx +++ b/apps/docs/content/docs/en/code/examples/orchestration.mdx @@ -7,7 +7,7 @@ import { Tab, Tabs } from 'fumadocs-ui/components/tabs'; # Orchestration -This page shows the three programmable orchestration primitives in A3S Code v3.4.0: `session.parallel` for fan-out, `session.pipeline` for per-item multi-stage chains, and `session.parallelResumable` for journaled runs that survive a crash. Use orchestration when you have several independent subagent tasks (parallel), or one transformation that flows through ordered stages per input (pipeline). +This page shows the programmable orchestration primitives in A3S Code: `session.parallel` for fan-out, `session.pipeline` for per-item multi-stage chains, and `session.parallelResumable` for journaled runs that survive a crash. `parallel` also takes an optional token budget so a whole fan-out shares one ledger. Use orchestration when you have several independent subagent tasks (parallel), or one transformation that flows through ordered stages per input (pipeline). For the conceptual model behind these primitives, see [Orchestration](/docs/code/orchestration). @@ -227,11 +227,70 @@ session.close() +## Budgeted fan-out with `session.parallel` + +Pass a token budget as the second argument and every child agent feeds **one +shared ledger**. With a budget, `parallel` resolves to `{ outcomes, budget }` +(the ledger snapshot) instead of the plain outcomes array; once the cap is hit, a +step that starts afterwards is denied (`success: false`). It is a soft cap — a +wide fan-out can race a few in-flight turns past it; the in-flight work is never +force-killed. + + + + +```ts +import { Agent } from '@a3s-lab/code'; + +const agent = await Agent.create('agent.acl'); +const session = agent.session('.', {}); + +const specs = [ + { taskId: 'a', agent: 'general', description: 'q1', prompt: 'Reply with one word: ready.', maxSteps: 2 }, + { taskId: 'b', agent: 'general', description: 'q2', prompt: 'Reply with one word: go.', maxSteps: 2 }, +]; + +// With a budget, parallel() resolves to { outcomes, budget } — all children +// share one ledger. (Without it, parallel(specs) returns the plain array.) +const { outcomes, budget } = await session.parallel(specs, 50_000); +for (const o of outcomes) console.log(`[budget] ${o.taskId}: success=${o.success}`); +console.log(`spent ${budget.consumedTokens} / ${budget.limitTokens} tokens`); + +await session.close(); +``` + + + + +```python +from a3s_code import Agent, SessionOptions + +agent = Agent.create(open("agent.acl").read()) +session = agent.session(".", SessionOptions()) + +specs = [ + {"task_id": "a", "agent": "general", "description": "q1", "prompt": "Reply with one word: ready.", "max_steps": 2}, + {"task_id": "b", "agent": "general", "description": "q2", "prompt": "Reply with one word: go.", "max_steps": 2}, +] + +# With a budget, parallel() returns {"outcomes", "budget"} (a shared ledger). +res = session.parallel(specs, budget_tokens=50_000) +for o in res["outcomes"]: + print(f"[budget] {o['task_id']}: success={o['success']}") +print(f"spent {res['budget']['consumed_tokens']} / {res['budget']['limit_tokens']} tokens") + +session.close() +``` + + + + Notes: - All three primitives return outcomes aligned to input order: `{ taskId, success, output, error?, structured? }` (Node objects) / `{ "task_id", "success", "output", "error"?, "structured"? }` (Python dicts). - Set `outputSchema` / `output_schema` on a spec to get a parsed result back in `structured`. - `maxSteps` / `max_steps` caps the steps per subagent; `maxParallelTasks` / `max_parallel_tasks` (session option) caps fan-out concurrency. +- Pass a token budget to `parallel` (2nd arg `budgetTokens` / `budget_tokens=`) to cap a whole fan-out against one shared ledger; the return shape then becomes `{ outcomes, budget }`. It is a soft cap (usage is recorded after each call). - Node pipeline stage callbacks must never throw — return `null` on error. Python stages may raise (a raised stage is caught and treated as `None`). A runnable version ships at `crates/code/sdk/node/examples/orchestration/parallel-pipeline.mjs` and `crates/code/sdk/python/examples/orchestration_workflow.py`. diff --git a/apps/docs/content/docs/en/code/orchestration.mdx b/apps/docs/content/docs/en/code/orchestration.mdx index 473232d7..2ed598a9 100644 --- a/apps/docs/content/docs/en/code/orchestration.mdx +++ b/apps/docs/content/docs/en/code/orchestration.mdx @@ -267,6 +267,114 @@ outcomes = session.parallel_resumable(specs, "release-batch-42") outcomes = session.parallel_resumable(specs, "release-batch-42") ``` +## Shared budget across a fan-out + +By default each child agent counts its own LLM cost. Pass a token budget to +`parallel` and every child instead feeds **one shared ledger** — a single cap +for the whole fan-out. It maps to the core `WorkflowBudget`, an aggregating +`BudgetGuard` installed onto each child run. + +The budget is an **optional argument**, so it is backward compatible: + +- Without a budget, `parallel(specs)` returns the plain outcomes array, exactly + as before. +- With a budget, `parallel(specs, budgetTokens)` resolves to `{ outcomes, budget }`, + where `budget` is the ledger snapshot (`consumedTokens` / `limitTokens`). + +Once the cap is reached, a step that *starts* afterwards is denied — its outcome +is `success: false` with a budget-exhausted message. It is a **soft cap**: +because usage is recorded *after* each LLM call, a wide fan-out can race a few +in-flight turns past the cap before the ledger catches up. The framework never +force-kills an in-flight fan-out; an exhausted budget simply denies the *next* +LLM call. + +```ts +// No budget → plain outcomes array (unchanged). +const outcomes = await session.parallel(specs); + +// With a budget → { outcomes, budget }. All children share one ledger. +const { outcomes: out, budget } = await session.parallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); // e.g. 48213, 500000 +// TS return type is a union: Array | { outcomes, budget }. +``` + +```python +# No budget → plain list (unchanged). +outcomes = session.parallel(specs) + +# With a budget → {"outcomes": [...], "budget": {"consumed_tokens", "limit_tokens"}}. +res = session.parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) +``` + +## Looping until done (`execute_loop`) + +For unknown-length, iterate-until-converge work (loop-until-dry, refine-until-good), +the core grammar adds `execute_loop`. Each round is a barrier (`execute_steps_parallel`); +a host-supplied predicate sees the round's outcomes and returns +`LoopDecision::Continue(next_specs)` or `LoopDecision::Stop`. A required +`max_iterations` is a **hard cap** — once reached the loop stops even if the +predicate would continue, so an LLM-driven loop can never run away. + +```rust +use a3s_code_core::orchestration::{execute_loop, AgentStepSpec, LoopDecision}; + +let outcomes = execute_loop(executor, initial_specs, /* max_iterations */ 5, None, |round| { + // Stop when the round produced no new findings; otherwise fan out follow-ups. + let follow_ups = derive_follow_ups(round); + if follow_ups.is_empty() { + LoopDecision::Stop + } else { + LoopDecision::Continue(follow_ups) + } +}) +.await; +``` + +> From the host SDKs you don't need a dedicated `loop` verb — write the loop in +> your own language (`while`/`for`) around `parallel`, deciding the next round +> from the outcomes. `execute_loop` exists for the Rust grammar and to give the +> loop a single, enforced termination guard. + +## The Workflow facade (Rust / embedding) + +`session.workflow()` returns a cheaply-clonable `Workflow` that pre-wires the +session's executor, persistence store, per-step event stream, and a stable, +session-derived root id. It is the programmable handle that bundles everything +above; control flow is ordinary Rust — `await` a verb, inspect the outcomes, +decide what runs next. + +- **Verbs** — `agent` (one step), `parallel` (barrier fan-out), `phase` (a + *named*, resumable barrier that emits milestones), `pipeline` (per-item + chains), and the non-failing `log`. Each delegates to exactly one combinator. +- **Phases & events** — `phase(name, specs)` derives a deterministic checkpoint + id (`{root}/{index}:{name}`), runs the resumable barrier when a store is + present, and emits `WorkflowEvent::PhaseStart` / `PhaseEnd` on a broadcast you + read with `subscribe()`. `log()` emits `WorkflowEvent::Log`. +- **Budget** — `session.workflow_with_token_budget(Some(limit))` installs a + shared `WorkflowBudget`; `budget_snapshot()` reads the ledger and a + `WorkflowEvent::BudgetExhausted` fires once the cap is hit. + +```rust +let wf = session.workflow(); // or session.workflow_with_token_budget(Some(500_000)) +let mut events = wf.subscribe(); + +// One step, then a *variable* fan-out computed from its result — the "dynamic" +// part: the shape is decided at run time, not declared up front. +let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await; +let specs = derive_specs(&plan); // your code +let done = wf.phase("implement", specs).await; // resumable barrier + milestones +let reviews = wf.phase("review", to_review(&done)).await; // budget shared across phases + +if let Some(b) = wf.budget_snapshot() { + println!("spent {} / {:?} tokens", b.consumed_tokens, b.limit_tokens); +} +``` + +The SDKs expose the flat `parallel` / `pipeline` / `parallelResumable` verbs (and +the `parallel` budget overload above); the full `Workflow` handle — phases, +event subscription, the loop combinator — is a Rust/embedding API. + ## Schema-forced step output A spec carrying `output_schema` (`outputSchema` in Node) forces the step to diff --git a/apps/docs/content/tutorials/cn/orchestration.mdx b/apps/docs/content/tutorials/cn/orchestration.mdx index b9a4476f..2d795935 100644 --- a/apps/docs/content/tutorials/cn/orchestration.mdx +++ b/apps/docs/content/tutorials/cn/orchestration.mdx @@ -313,3 +313,23 @@ specs = [ outcomes = session.parallel_resumable(specs, "audit-2026-05") ``` + +## 为扇出设预算 + +给 `parallel` 传入 token 预算,整个扇出就会共享**同一个账本**。不传预算时拿到的是 +原来的结果;传入后,`parallel` 返回 `{ outcomes, budget }`,且一旦达到上限,之后 +*启动*的 step 会被拒绝。它是软上限——在飞的扇出绝不会被强杀。 + +```ts +const { outcomes, budget } = await session.parallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); +``` + +```python +res = session.parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) +``` + +如需 Rust 中完整的可编程句柄——带命名、可恢复 `phase` 的 `session.workflow()`、 +`WorkflowEvent` 里程碑、`execute_loop` combinator 以及 `budget_snapshot()`—— +参见 [编排](/docs/code/orchestration)。 diff --git a/apps/docs/content/tutorials/en/orchestration.mdx b/apps/docs/content/tutorials/en/orchestration.mdx index b8142203..7f9e1e27 100644 --- a/apps/docs/content/tutorials/en/orchestration.mdx +++ b/apps/docs/content/tutorials/en/orchestration.mdx @@ -321,3 +321,25 @@ specs = [ outcomes = session.parallel_resumable(specs, "audit-2026-05") ``` + +## Budgeting a Fan-out + +Pass a token budget to `parallel` so an entire fan-out shares **one ledger**. +Without a budget you get the plain outcomes; with one, `parallel` returns +`{ outcomes, budget }` and, once the cap is reached, a step that *starts* +afterward is denied. It is a soft cap — the in-flight fan-out is never +force-killed. + +```ts +const { outcomes, budget } = await session.parallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); +``` + +```python +res = session.parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) +``` + +For the full programmable handle in Rust — `session.workflow()` with named, +resumable `phase`s, `WorkflowEvent` milestones, the `execute_loop` combinator, +and `budget_snapshot()` — see [Orchestration](/docs/code/orchestration). From 041514f8382645b953d17efae6f52d1e2fcf3670 Mon Sep 17 00:00:00 2001 From: Roy Lin Date: Sun, 7 Jun 2026 14:56:07 +0800 Subject: [PATCH 2/2] docs(blog): add 'Programmable, Budget-Bounded Multi-Agent Workflows' (en + cn) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit A technical deep-dive on how A3S Code composes a Claude-Code-style dynamic workflow runtime — fan-out, phases, loops, and a shared token budget — from the existing AgentExecutor, WorkflowCheckpoint, and BudgetGuard seams rather than a new subsystem. Includes the honest soft-cap budget caveat. --- .../blog/cn/programmable-agent-workflows.mdx | 174 ++++++++++++++++ .../blog/en/programmable-agent-workflows.mdx | 189 ++++++++++++++++++ 2 files changed, 363 insertions(+) create mode 100644 apps/docs/content/blog/cn/programmable-agent-workflows.mdx create mode 100644 apps/docs/content/blog/en/programmable-agent-workflows.mdx diff --git a/apps/docs/content/blog/cn/programmable-agent-workflows.mdx b/apps/docs/content/blog/cn/programmable-agent-workflows.mdx new file mode 100644 index 00000000..9299148d --- /dev/null +++ b/apps/docs/content/blog/cn/programmable-agent-workflows.mdx @@ -0,0 +1,174 @@ +--- +title: "可编程、预算受限的多智能体工作流——从你已有的接缝中生长出来" +description: A3S Code 如何通过组合 AgentExecutor、WorkflowCheckpoint 与 BudgetGuard 这三处接缝,长出一套 Claude-Code 风格的动态工作流运行时——fan-out、phases、loops 以及共享的 token 预算——而不是硬塞进一个全新的子系统。 +date: "2026-06-07" +author: A3S Lab +tags: [a3s-code, orchestration, multi-agent, workflow, budget, rust] +--- + +> 最强大的功能往往是你*没有*去构建的那一个。A3S Code 早已拥有一个沙箱化的脚本 +> 运行时、一处与放置位置无关的 executor 接缝、可恢复的 checkpoint,以及一份预算 +> 契约。所谓“动态工作流”——那个让你把智能体 fan out、运行 phases、循环到完成, +> 并把整轮运行限制在同一份 token 预算之内的东西——最终只是一个*组合*这四者的 +> 薄薄一层,而非第五个子系统。 + +--- + +## 智能体 fan out 的两种方式 + +要运行不止一个智能体,有两种诚实的做法。 + +第一种是**模型驱动**:你给模型一个 `task` / `parallel_task` 工具,由它在运行时 +决定是否以及如何委派。fan-out 的形态完全由模型自己选择。当*是否委派*这个决策 +本身就是问题的一部分时,这种方式非常合适。 + +第二种是**可编程**:由*你*在代码里决定形态——并行运行这三个 reviewer;让每个 +候选项流经 explore → verify → review;循环到没有新发现为止;整轮运行在 +500k tokens 处停下。这种形态是可复现、可测试、预算受限、可恢复的,并且独立于 +模型的选择。当工作的*结构*提前已知时,你会伸手去拿的就是它。 + +Claude Code 让可编程风格流行起来:一段调用 `agent()` / `parallel()` / +`pipeline()` / `phase()` 的脚本,配上一份共享预算和可恢复的 checkpoint。我们想在 +A3S Code 里获得同样的表达力——又不把运行时变成一个工作流引擎。 + +## 诀窍:一切都是同一处接缝 + +整个编排层都是围绕单个 trait `AgentExecutor` 编写的: + +```rust +#[async_trait] +pub trait AgentExecutor: Send + Sync { + async fn execute_step(&self, spec: AgentStepSpec, tx: Option>) -> StepOutcome; + fn concurrency_hint(&self) -> usize { /* advisory */ } +} +``` + +这处接缝把职责干净地切开。**框架**拥有*语法*——存在哪些 step、它们如何组合, +以及可序列化的契约 `AgentStepSpec` / `StepOutcome`。**宿主**拥有*放置位置*—— +传输、调度,以及一个 step 实际在哪里运行。in-box executor 把每个 step 作为子 +智能体跑在本地的 tokio runtime 上;集群宿主则替换成自己的 executor,把 step +分布到各个节点上。这些组合子从不观察某个 step 在哪里运行过,因此同一个工作流 +无需改动就能从单进程扩展到一整支机群。 + +因为每个 step 都只是 `execute_step`,组合子都很小巧: + +- `execute_steps_parallel` —— 一个 barrier 式的 fan-out(等待全部完成;失败与 + panic 都会变成 `success: false`,绝不会丢掉某个兄弟任务)。 +- `execute_pipeline` —— 按 item 分级的链式处理,**各阶段之间没有 barrier**, + 因此 item A 可以处在 stage 3,而 item B 还在 stage 1。 +- `execute_steps_parallel_resumable` —— 同样的 fan-out,但在每个 step 边界处 + 记录到一个 `WorkflowCheckpoint`。 + +## “动态”究竟需要什么 + +把 Claude Code 的那些动词映射到已经存在的东西上,真正缺失的只有寥寥几样: + +| 能力 | 已经具备 | 新增 | +| ----------------- | -------------------------------------- | ----- | +| fan-out / pipeline | `execute_steps_parallel` / `execute_pipeline` | — | +| 恢复 | `WorkflowCheckpoint` + `SessionStore` | — | +| 结构化输出 | `AgentStepSpec.output_schema` → `StepOutcome.structured` | — | +| 预算契约 | `BudgetGuard`(按调用的决策点) | — | +| **phases + milestones** | — | `Workflow::phase` + `WorkflowEvent` | +| **循环到完成** | — | `execute_loop` + `LoopDecision` | +| **跨 fan-out 的单一预算** | `BudgetGuard` 此前仅限于单 session | `WorkflowBudget` | + +所以这份工作就是三个小而可组合的部件——外加一个把它们接起来的 facade。 + +## 一个 facade,而非一个引擎 + +`session.workflow()` 返回一个可廉价克隆的 handle,它预先接好了 session 的 +executor、store、事件流,以及一个稳定的、由 session 派生的 id。控制流仍然留在 +宿主语言里——你 `await` 一个动词,看看 outcomes,再用普通的 `if` / `for` / +`while` 决定接下来运行什么: + +```rust +let wf = session.workflow(); + +// One step, then a *variable* fan-out computed from its result. This is the +// "dynamic" part — the shape is decided at run time, not declared up front. +let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await; +let specs = derive_specs(&plan); // your code +let done = wf.phase("implement", specs).await; // resumable barrier +let review = wf.phase("review", to_review(&done)).await; +``` + +每个动词都恰好委派给一个组合子——facade 不拥有任何调度,也不拥有任何 LLM +逻辑。`phase(name, specs)` 是唯一带有新行为的部件:它派生出一个确定性的 +checkpoint id(`{root}/{index}:{name}`),在 store 存在时运行可恢复的 barrier, +并在一个你可以 `subscribe()` 的广播上发出 +`WorkflowEvent::PhaseStart` / `PhaseEnd`。因为没有内嵌的脚本解释器,也就没有需要 +加固的沙箱——“解释器”就是宿主语言,而攻击面只是 Rust 在调用一些带类型的函数。 + +## 循环,配上一个你忘不掉的 guard + +长度未知的工作——循环到枯竭、精炼到满意——需要一个循环。但一个只能让*自己* +停下的 LLM 驱动循环,就是一场等着发生的失控。所以 `execute_loop` 把 guard 设为 +强制: + +```rust +let outcomes = execute_loop(executor, initial, /* max_iterations */ 5, None, |round| { + let follow_ups = derive_follow_ups(round); + if follow_ups.is_empty() { LoopDecision::Stop } + else { LoopDecision::Continue(follow_ups) } +}).await; +``` + +`max_iterations` 是一个硬性上限:一旦达到,即使谓词还想继续,循环也会停下。 +谓词是*软*条件;上限是*硬*条件。你没法写出不带 guard 的那个版本。 + +## 跨整个 fan-out 的单一预算——诚实地说 + +`BudgetGuard` 早已能为每次 LLM 调用裁定成本,但每个子任务各自统计自己的开销。 +为了给一个 *fan-out* 设上限,`WorkflowBudget` 包裹了同一个 guard,并把每个子 +任务的用量累加进一份共享的原子账本: + +```rust +let wf = session.workflow_with_token_budget(Some(500_000)); +// ...run phases... +if let Some(b) = wf.budget_snapshot() { + println!("spent {} / {:?}", b.consumed_tokens, b.limit_tokens); +} +``` + +它通过那处*未经改动*的接缝安装进去——它*就是*一个 `BudgetGuard`——因此每个子 +循环已有的按轮检查都会自动向共享账本喂数据。没有新的强制点。 + +诚实的部分:用量是在每次调用**之后**记录的,而上限是在调用**之前**检查的。在 +宽幅并行 fan-out 之下,少数几个在途的 turn 可能在账本追上之前冲过硬性上限。 +所以它是一个**软**天花板,而非按 token 的保证——并且框架绝不会强行杀掉一个 +在途的 fan-out。一份耗尽的预算只是拒绝*下一次*调用,这会表现为一个失败的 step, +供宿主作出反应。我们把这个权衡如实写进文档,而不是假装这场竞态不存在;顺序 +调用者得到一个清晰的上限,宽幅 fan-out 得到一个柔性的上限。 + +## 来自 SDK + +Node 与 Python SDK 暴露了这些扁平的动词(`parallel`、`pipeline`、 +`parallelResumable`)。共享预算作为 `parallel` 上的一个*可选参数*搭车进来,因此 +它向后兼容——不给预算,还是你一直拿到的那个数组;给了预算,则是更丰富的结果: + +```ts +// No budget → the plain outcomes array (unchanged). +const outcomes = await session.parallel(specs); + +// With a budget → { outcomes, budget }; all children share one ledger. +const { outcomes: out, budget } = await session.parallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); +``` + +```python +res = session.parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) +``` + +## 这堂课 + +一套动态工作流运行时听起来像是个庞大的子系统。它并不是——因为这个运行时早已 +围绕正确的接缝做好了拆分。`AgentExecutor` 给了我们与放置位置无关的执行; +`WorkflowCheckpoint` 给了我们恢复能力;`BudgetGuard` 给了我们一份预算契约。 +所谓“功能”,就是一个薄薄的 facade 外加两个组合它们的小组合子。当接缝对了,那个 +强大的东西就很小。 + +如果你在构建智能体基础设施,要带走的并不是“抄下这三个类型”,而是:找到那一处 +能让你整层都围绕它编写的接缝,让它保持可序列化、与放置位置无关,然后让那些 +强大的功能作为组合自然涌现出来。最好的工作流引擎,就是你不必去构建的那一个。 diff --git a/apps/docs/content/blog/en/programmable-agent-workflows.mdx b/apps/docs/content/blog/en/programmable-agent-workflows.mdx new file mode 100644 index 00000000..3996aa3f --- /dev/null +++ b/apps/docs/content/blog/en/programmable-agent-workflows.mdx @@ -0,0 +1,189 @@ +--- +title: "Programmable, Budget-Bounded Multi-Agent Workflows — Built From Seams You Already Have" +description: How A3S Code grows a Claude-Code-style dynamic workflow runtime — fan-out, phases, loops, and a shared token budget — by composing the AgentExecutor, WorkflowCheckpoint, and BudgetGuard seams instead of bolting on a new subsystem. +date: "2026-06-07" +author: A3S Lab +tags: [a3s-code, orchestration, multi-agent, workflow, budget, rust] +--- + +> The most powerful feature is often the one you *don't* build. A3S Code already +> had a sandboxed script runtime, a placement-agnostic executor seam, resumable +> checkpoints, and a budget contract. A "dynamic workflow" — the thing that lets +> you fan agents out, run phases, loop until done, and cap the whole run against +> one token budget — turned out to be a thin layer that *composes* those four, +> not a fifth subsystem. + +--- + +## Two ways an agent fans out + +There are two honest ways to run more than one agent. + +The first is **model-driven**: you give the model a `task` / `parallel_task` +tool and it decides, at run time, whether and how to delegate. The shape of the +fan-out is whatever the model chose. This is great when the *decision* to +delegate is itself part of the problem. + +The second is **programmable**: *you* decide the shape in code — run these three +reviewers in parallel; flow each candidate through explore → verify → review; +loop until no new findings; stop the whole thing at 500k tokens. The shape is +reproducible, testable, budget-bounded, and resumable, independent of what the +model picks. This is the one you reach for when the *structure* of the work is +known ahead of time. + +Claude Code popularized the programmable style: a script that calls +`agent()` / `parallel()` / `pipeline()` / `phase()`, with a shared budget and +resumable checkpoints. We wanted the same expressive power in A3S Code — without +turning the runtime into a workflow engine. + +## The trick: everything is one seam + +The whole orchestration layer is written against a single trait, `AgentExecutor`: + +```rust +#[async_trait] +pub trait AgentExecutor: Send + Sync { + async fn execute_step(&self, spec: AgentStepSpec, tx: Option>) -> StepOutcome; + fn concurrency_hint(&self) -> usize { /* advisory */ } +} +``` + +That seam splits responsibilities cleanly. The **framework** owns the *grammar* — +which steps exist, how they compose, and the serializable contracts +`AgentStepSpec` / `StepOutcome`. The **host** owns *placement* — transport, +scheduling, and where a step actually runs. The in-box executor runs each step +as a child agent on the local tokio runtime; a cluster host substitutes its own +executor to place steps across nodes. The combinators never observe where a step +ran, so the same workflow scales from one process to a fleet without changes. + +Because every step is just `execute_step`, the combinators are tiny: + +- `execute_steps_parallel` — a barrier fan-out (await all; failures and panics + become `success: false`, never a dropped sibling). +- `execute_pipeline` — per-item staged chains with **no barrier between stages**, + so item A can be in stage 3 while item B is still in stage 1. +- `execute_steps_parallel_resumable` — the same fan-out, but journaled to a + `WorkflowCheckpoint` at each step boundary. + +## What "dynamic" actually needs + +Mapping Claude Code's verbs onto what already existed, only a few things were +genuinely missing: + +| Capability | Already there | Added | +| ----------------- | -------------------------------------- | ----- | +| fan-out / pipeline | `execute_steps_parallel` / `execute_pipeline` | — | +| resume | `WorkflowCheckpoint` + `SessionStore` | — | +| structured output | `AgentStepSpec.output_schema` → `StepOutcome.structured` | — | +| a budget contract | `BudgetGuard` (per-call decision points) | — | +| **phases + milestones** | — | `Workflow::phase` + `WorkflowEvent` | +| **loop-until-done** | — | `execute_loop` + `LoopDecision` | +| **one budget across a fan-out** | `BudgetGuard` was per-session only | `WorkflowBudget` | + +So the work was three small, composable pieces — and a facade to wire them up. + +## A facade, not an engine + +`session.workflow()` returns a cheaply-clonable handle that pre-wires the +session's executor, store, event stream, and a stable, session-derived id. +Control flow stays in the host language — you `await` a verb, look at the +outcomes, and decide what runs next with ordinary `if` / `for` / `while`: + +```rust +let wf = session.workflow(); + +// One step, then a *variable* fan-out computed from its result. This is the +// "dynamic" part — the shape is decided at run time, not declared up front. +let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await; +let specs = derive_specs(&plan); // your code +let done = wf.phase("implement", specs).await; // resumable barrier +let review = wf.phase("review", to_review(&done)).await; +``` + +Each verb delegates to exactly one combinator — the facade owns no scheduling +and no LLM logic. `phase(name, specs)` is the one piece with new behavior: it +derives a deterministic checkpoint id (`{root}/{index}:{name}`), runs the +resumable barrier when a store is present, and emits +`WorkflowEvent::PhaseStart` / `PhaseEnd` on a broadcast you can `subscribe()` to. +Because there is no embedded script interpreter, there is no sandbox to harden — +the "interpreter" is the host language, and the attack surface is just Rust +calling typed functions. + +## Looping, with a guard you can't forget + +Unknown-length work — loop-until-dry, refine-until-good — needs a loop. But an +LLM-driven loop that can only stop *itself* is a runaway waiting to happen. So +`execute_loop` makes the guard mandatory: + +```rust +let outcomes = execute_loop(executor, initial, /* max_iterations */ 5, None, |round| { + let follow_ups = derive_follow_ups(round); + if follow_ups.is_empty() { LoopDecision::Stop } + else { LoopDecision::Continue(follow_ups) } +}).await; +``` + +`max_iterations` is a hard cap: once reached, the loop stops even if the +predicate would continue. The predicate is the *soft* condition; the cap is the +*hard* one. You can't write the version without the guard. + +## One budget across the whole fan-out — honestly + +`BudgetGuard` already decided cost per LLM call, but each child counted its own +spend. To cap a *fan-out*, `WorkflowBudget` wraps that same guard and +accumulates every child's usage into one shared atomic ledger: + +```rust +let wf = session.workflow_with_token_budget(Some(500_000)); +// ...run phases... +if let Some(b) = wf.budget_snapshot() { + println!("spent {} / {:?}", b.consumed_tokens, b.limit_tokens); +} +``` + +It installs through the *unchanged* seam — it *is* a `BudgetGuard` — so every +child loop's existing per-turn check feeds the shared ledger automatically. No +new enforcement point. + +The honest part: usage is recorded **after** each call, while the cap is checked +**before**. Under a wide parallel fan-out, a handful of in-flight turns can race +past a hard cap before the ledger catches up. So it is a **soft** ceiling, not a +per-token guarantee — and the framework never force-kills an in-flight fan-out. +An exhausted budget simply denies the *next* call, which surfaces as a failed +step the host can react to. We document that tradeoff rather than pretend the +race away; a sequential caller gets a crisp cap, a wide fan-out gets a soft one. + +## From the SDK + +The Node and Python SDKs expose the flat verbs (`parallel`, `pipeline`, +`parallelResumable`). The shared budget rides in as an *optional argument* on +`parallel`, so it's backward compatible — no budget, same array you always got; +with a budget, the richer result: + +```ts +// No budget → the plain outcomes array (unchanged). +const outcomes = await session.parallel(specs); + +// With a budget → { outcomes, budget }; all children share one ledger. +const { outcomes: out, budget } = await session.parallel(specs, 500_000); +console.log(budget.consumedTokens, budget.limitTokens); +``` + +```python +res = session.parallel(specs, budget_tokens=500_000) +print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"]) +``` + +## The lesson + +A dynamic workflow runtime sounds like a big subsystem. It wasn't — because the +runtime was already factored around the right seams. `AgentExecutor` gave us +placement-agnostic execution; `WorkflowCheckpoint` gave us resume; +`BudgetGuard` gave us a budget contract. The "feature" was a thin facade plus +two small combinators that compose them. When the seams are right, the powerful +thing is small. + +If you're building agent infrastructure, the takeaway isn't "copy these three +types." It's: find the one seam your whole layer can be written against, keep it +serializable and placement-agnostic, and let the powerful features fall out as +compositions. The best workflow engine is the one you didn't have to build.