174 changes: 174 additions & 0 deletions apps/docs/content/blog/cn/programmable-agent-workflows.mdx
@@ -0,0 +1,174 @@
---
title: "可编程、预算受限的多智能体工作流——从你已有的接缝中生长出来"
description: A3S Code 如何通过组合 AgentExecutor、WorkflowCheckpoint 与 BudgetGuard 这三处接缝,长出一套 Claude-Code 风格的动态工作流运行时——fan-out、phases、loops 以及共享的 token 预算——而不是硬塞进一个全新的子系统。
date: "2026-06-07"
author: A3S Lab
tags: [a3s-code, orchestration, multi-agent, workflow, budget, rust]
---

> 最强大的功能往往是你*没有*去构建的那一个。A3S Code 早已拥有一个沙箱化的脚本
> 运行时、一处与放置位置无关的 executor 接缝、可恢复的 checkpoint,以及一份预算
> 契约。所谓“动态工作流”——那个让你把智能体 fan out、运行 phases、循环到完成,
> 并把整轮运行限制在同一份 token 预算之内的东西——最终只是一个*组合*这四者的
> 薄薄一层,而非第五个子系统。

---

## 智能体 fan out 的两种方式

要运行不止一个智能体,有两种诚实的做法。

第一种是**模型驱动**:你给模型一个 `task` / `parallel_task` 工具,由它在运行时
决定是否以及如何委派。fan-out 的形态完全由模型自己选择。当*是否委派*这个决策
本身就是问题的一部分时,这种方式非常合适。

第二种是**可编程**:由*你*在代码里决定形态——并行运行这三个 reviewer;让每个
候选项流经 explore → verify → review;循环到没有新发现为止;整轮运行在
500k tokens 处停下。这种形态是可复现、可测试、预算受限、可恢复的,并且独立于
模型的选择。当工作的*结构*提前已知时,你会伸手去拿的就是它。

Claude Code 让可编程风格流行起来:一段调用 `agent()` / `parallel()` /
`pipeline()` / `phase()` 的脚本,配上一份共享预算和可恢复的 checkpoint。我们想在
A3S Code 里获得同样的表达力——又不把运行时变成一个工作流引擎。

## 诀窍:一切都是同一处接缝

整个编排层都是围绕单个 trait `AgentExecutor` 编写的:

```rust
#[async_trait]
pub trait AgentExecutor: Send + Sync {
    async fn execute_step(&self, spec: AgentStepSpec, tx: Option<Sender<AgentEvent>>) -> StepOutcome;
    fn concurrency_hint(&self) -> usize { /* advisory */ }
}
```

这处接缝把职责干净地切开。**框架**拥有*语法*——存在哪些 step、它们如何组合,
以及可序列化的契约 `AgentStepSpec` / `StepOutcome`。**宿主**拥有*放置位置*——
传输、调度,以及一个 step 实际在哪里运行。in-box executor 把每个 step 作为子
智能体跑在本地的 tokio runtime 上;集群宿主则替换成自己的 executor,把 step
分布到各个节点上。这些组合子从不观察某个 step 在哪里运行过,因此同一个工作流
无需改动就能从单进程扩展到一整支机群。

因为每个 step 都只是 `execute_step`,组合子都很小巧:

- `execute_steps_parallel` —— 一个 barrier 式的 fan-out(等待全部完成;失败与
panic 都会变成 `success: false`,绝不会丢掉某个兄弟任务)。
- `execute_pipeline` —— 按 item 分级的链式处理,**各阶段之间没有 barrier**,
因此 item A 可以处在 stage 3,而 item B 还在 stage 1。
- `execute_steps_parallel_resumable` —— 同样的 fan-out,但在每个 step 边界处
记录到一个 `WorkflowCheckpoint`。

## “动态”究竟需要什么

把 Claude Code 的那些动词映射到已经存在的东西上,真正缺失的只有寥寥几样:

| 能力 | 已经具备 | 新增 |
| ----------------- | -------------------------------------- | ----- |
| fan-out / pipeline | `execute_steps_parallel` / `execute_pipeline` | — |
| 恢复 | `WorkflowCheckpoint` + `SessionStore` | — |
| 结构化输出 | `AgentStepSpec.output_schema` → `StepOutcome.structured` | — |
| 预算契约 | `BudgetGuard`(按调用的决策点) | — |
| **phases + milestones** | — | `Workflow::phase` + `WorkflowEvent` |
| **循环到完成** | — | `execute_loop` + `LoopDecision` |
| **跨 fan-out 的单一预算** | `BudgetGuard` 此前仅限于单 session | `WorkflowBudget` |

所以这份工作就是三个小而可组合的部件——外加一个把它们接起来的 facade。

## 一个 facade,而非一个引擎

`session.workflow()` 返回一个可廉价克隆的 handle,它预先接好了 session 的
executor、store、事件流,以及一个稳定的、由 session 派生的 id。控制流仍然留在
宿主语言里——你 `await` 一个动词,看看 outcomes,再用普通的 `if` / `for` /
`while` 决定接下来运行什么:

```rust
let wf = session.workflow();

// One step, then a *variable* fan-out computed from its result. This is the
// "dynamic" part — the shape is decided at run time, not declared up front.
let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await;
let specs = derive_specs(&plan); // your code
let done = wf.phase("implement", specs).await; // resumable barrier
let review = wf.phase("review", to_review(&done)).await;
```

每个动词都恰好委派给一个组合子——facade 不拥有任何调度,也不拥有任何 LLM
逻辑。`phase(name, specs)` 是唯一带有新行为的部件:它派生出一个确定性的
checkpoint id(`{root}/{index}:{name}`),在 store 存在时运行可恢复的 barrier,
并在一个你可以 `subscribe()` 的广播上发出
`WorkflowEvent::PhaseStart` / `PhaseEnd`。因为没有内嵌的脚本解释器,也就没有需要
加固的沙箱——“解释器”就是宿主语言,而攻击面只是 Rust 在调用一些带类型的函数。

## 循环,配上一个你忘不掉的 guard

长度未知的工作——循环到枯竭、精炼到满意——需要一个循环。但一个只能让*自己*
停下的 LLM 驱动循环,就是一场等着发生的失控。所以 `execute_loop` 把 guard 设为
强制:

```rust
let outcomes = execute_loop(executor, initial, /* max_iterations */ 5, None, |round| {
    let follow_ups = derive_follow_ups(round);
    if follow_ups.is_empty() { LoopDecision::Stop }
    else { LoopDecision::Continue(follow_ups) }
}).await;
```

`max_iterations` 是一个硬性上限:一旦达到,即使谓词还想继续,循环也会停下。
谓词是*软*条件;上限是*硬*条件。你没法写出不带 guard 的那个版本。

## 跨整个 fan-out 的单一预算——诚实地说

`BudgetGuard` 早已能为每次 LLM 调用裁定成本,但每个子任务各自统计自己的开销。
为了给一个 *fan-out* 设上限,`WorkflowBudget` 包裹了同一个 guard,并把每个子
任务的用量累加进一份共享的原子账本:

```rust
let wf = session.workflow_with_token_budget(Some(500_000));
// ...run phases...
if let Some(b) = wf.budget_snapshot() {
    println!("spent {} / {:?}", b.consumed_tokens, b.limit_tokens);
}
```

它通过那处*未经改动*的接缝安装进去——它*就是*一个 `BudgetGuard`——因此每个子
循环已有的按轮检查都会自动向共享账本喂数据。没有新的强制点。

诚实的部分:用量是在每次调用**之后**记录的,而上限是在调用**之前**检查的。在
宽幅并行 fan-out 之下,少数几个在途的 turn 可能在账本追上之前冲过硬性上限。
所以它是一个**软**天花板,而非按 token 的保证——并且框架绝不会强行杀掉一个
在途的 fan-out。一份耗尽的预算只是拒绝*下一次*调用,这会表现为一个失败的 step,
供宿主作出反应。我们把这个权衡如实写进文档,而不是假装这场竞态不存在;顺序
调用者得到一个清晰的上限,宽幅 fan-out 得到一个柔性的上限。

## 来自 SDK

Node 与 Python SDK 暴露了这些扁平的动词(`parallel`、`pipeline`、
`parallelResumable`)。共享预算作为 `parallel` 上的一个*可选参数*搭车进来,因此
它向后兼容——不给预算,还是你一直拿到的那个数组;给了预算,则是更丰富的结果:

```ts
// No budget → the plain outcomes array (unchanged).
const outcomes = await session.parallel(specs);

// With a budget → { outcomes, budget }; all children share one ledger.
const { outcomes: out, budget } = await session.parallel(specs, 500_000);
console.log(budget.consumedTokens, budget.limitTokens);
```

```python
res = session.parallel(specs, budget_tokens=500_000)
print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"])
```

## 这堂课

一套动态工作流运行时听起来像是个庞大的子系统。它并不是——因为这个运行时早已
围绕正确的接缝做好了拆分。`AgentExecutor` 给了我们与放置位置无关的执行;
`WorkflowCheckpoint` 给了我们恢复能力;`BudgetGuard` 给了我们一份预算契约。
所谓“功能”,就是一个薄薄的 facade 外加两个组合它们的小组合子。当接缝对了,那个
强大的东西就很小。

如果你在构建智能体基础设施,要带走的并不是“抄下这三个类型”,而是:找到那一处
能让你整层都围绕它编写的接缝,让它保持可序列化、与放置位置无关,然后让那些
强大的功能作为组合自然涌现出来。最好的工作流引擎,就是你不必去构建的那一个。
189 changes: 189 additions & 0 deletions apps/docs/content/blog/en/programmable-agent-workflows.mdx
@@ -0,0 +1,189 @@
---
title: "Programmable, Budget-Bounded Multi-Agent Workflows — Built From Seams You Already Have"
description: How A3S Code grows a Claude-Code-style dynamic workflow runtime — fan-out, phases, loops, and a shared token budget — by composing the AgentExecutor, WorkflowCheckpoint, and BudgetGuard seams instead of bolting on a new subsystem.
date: "2026-06-07"
author: A3S Lab
tags: [a3s-code, orchestration, multi-agent, workflow, budget, rust]
---

> The most powerful feature is often the one you *don't* build. A3S Code already
> had a sandboxed script runtime, a placement-agnostic executor seam, resumable
> checkpoints, and a budget contract. A "dynamic workflow" — the thing that lets
> you fan agents out, run phases, loop until done, and cap the whole run against
> one token budget — turned out to be a thin layer that *composes* those four,
> not a fifth subsystem.

---

## Two ways an agent fans out

There are two honest ways to run more than one agent.

The first is **model-driven**: you give the model a `task` / `parallel_task`
tool and it decides, at run time, whether and how to delegate. The shape of the
fan-out is whatever the model chose. This is great when the *decision* to
delegate is itself part of the problem.

The second is **programmable**: *you* decide the shape in code — run these three
reviewers in parallel; flow each candidate through explore → verify → review;
loop until no new findings; stop the whole thing at 500k tokens. The shape is
reproducible, testable, budget-bounded, and resumable, independent of what the
model picks. This is the one you reach for when the *structure* of the work is
known ahead of time.

Claude Code popularized the programmable style: a script that calls
`agent()` / `parallel()` / `pipeline()` / `phase()`, with a shared budget and
resumable checkpoints. We wanted the same expressive power in A3S Code — without
turning the runtime into a workflow engine.

## The trick: everything is one seam

The whole orchestration layer is written against a single trait, `AgentExecutor`:

```rust
#[async_trait]
pub trait AgentExecutor: Send + Sync {
    async fn execute_step(&self, spec: AgentStepSpec, tx: Option<Sender<AgentEvent>>) -> StepOutcome;
    fn concurrency_hint(&self) -> usize { /* advisory */ }
}
```

That seam splits responsibilities cleanly. The **framework** owns the *grammar* —
which steps exist, how they compose, and the serializable contracts
`AgentStepSpec` / `StepOutcome`. The **host** owns *placement* — transport,
scheduling, and where a step actually runs. The in-box executor runs each step
as a child agent on the local tokio runtime; a cluster host substitutes its own
executor to place steps across nodes. The combinators never observe where a step
ran, so the same workflow scales from one process to a fleet without changes.
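
To make the grammar/placement split concrete, here is a minimal synchronous sketch. It is not the A3S Code API: `StepExecutor`, `LocalExecutor`, `RemoteExecutor`, and `run_in_order` are hypothetical names, and the "remote" executor only tags its output with a node name instead of doing real transport.

```rust
/// Hypothetical, synchronous stand-in for the `AgentExecutor` seam.
pub trait StepExecutor {
    fn execute_step(&self, prompt: &str) -> String;
}

/// Runs the step in-process (placeholder for the in-box executor).
pub struct LocalExecutor;

impl StepExecutor for LocalExecutor {
    fn execute_step(&self, prompt: &str) -> String {
        format!("local:{}", prompt)
    }
}

/// Pretends to run the step on a named node (placeholder for a cluster host).
pub struct RemoteExecutor {
    pub node: String,
}

impl StepExecutor for RemoteExecutor {
    fn execute_step(&self, prompt: &str) -> String {
        format!("{}:{}", self.node, prompt)
    }
}

/// A combinator written only against the trait. It never learns where a
/// step ran, so swapping the executor requires no change here.
pub fn run_in_order(executor: &dyn StepExecutor, prompts: &[&str]) -> Vec<String> {
    prompts.iter().map(|p| executor.execute_step(p)).collect()
}
```

Calling `run_in_order(&LocalExecutor, &["plan", "review"])` and then the same call with a `RemoteExecutor` exercises identical combinator code; only the value behind the trait object differs.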

Because every step is just `execute_step`, the combinators are tiny:

- `execute_steps_parallel` — a barrier fan-out (await all; failures and panics
become `success: false`, never a dropped sibling).
- `execute_pipeline` — per-item staged chains with **no barrier between stages**,
so item A can be in stage 3 while item B is still in stage 1.
- `execute_steps_parallel_resumable` — the same fan-out, but journaled to a
`WorkflowCheckpoint` at each step boundary.
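
The barrier semantics in the first bullet can be sketched with plain OS threads. This is an illustration under stated assumptions, not the real `execute_steps_parallel`: `Outcome` and `run_parallel` are hypothetical names, steps are identified by a string, and the real combinator is async rather than thread-based.

```rust
use std::thread;

/// Hypothetical stand-in for the article's `StepOutcome`.
#[derive(Debug, Clone, PartialEq)]
pub struct Outcome {
    pub step_id: String,
    pub success: bool,
    pub output: String,
}

/// Run every step on its own thread and wait for all of them (a barrier).
/// A step that returns `Err` or panics yields `success: false` in its slot;
/// siblings keep running and result order matches input order.
pub fn run_parallel<F>(step_ids: Vec<String>, run: F) -> Vec<Outcome>
where
    F: Fn(&str) -> Result<String, String> + Send + Clone + 'static,
{
    // Spawn everything first so the steps actually overlap.
    let handles: Vec<_> = step_ids
        .into_iter()
        .map(|id| {
            let run = run.clone();
            let worker_id = id.clone();
            (id, thread::spawn(move || run(&worker_id)))
        })
        .collect();

    // Join in input order; `join` reports a panic as `Err`.
    handles
        .into_iter()
        .map(|(id, handle)| match handle.join() {
            Ok(Ok(output)) => Outcome { step_id: id, success: true, output },
            Ok(Err(err)) => Outcome { step_id: id, success: false, output: err },
            Err(_) => Outcome {
                step_id: id,
                success: false,
                output: "step panicked".to_string(),
            },
        })
        .collect()
}
```

The point of the shape is that the caller always gets one outcome per input step, so downstream code can filter on `success` instead of handling a partially populated result.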

## What "dynamic" actually needs

Mapping Claude Code's verbs onto what already existed, only a few things were
genuinely missing:

| Capability | Already there | Added |
| ----------------- | -------------------------------------- | ----- |
| fan-out / pipeline | `execute_steps_parallel` / `execute_pipeline` | — |
| resume | `WorkflowCheckpoint` + `SessionStore` | — |
| structured output | `AgentStepSpec.output_schema` → `StepOutcome.structured` | — |
| a budget contract | `BudgetGuard` (per-call decision points) | — |
| **phases + milestones** | — | `Workflow::phase` + `WorkflowEvent` |
| **loop-until-done** | — | `execute_loop` + `LoopDecision` |
| **one budget across a fan-out** | `BudgetGuard` was per-session only | `WorkflowBudget` |

So the work was three small, composable pieces — and a facade to wire them up.

## A facade, not an engine

`session.workflow()` returns a cheaply clonable handle that pre-wires the
session's executor, store, event stream, and a stable, session-derived id.
Control flow stays in the host language — you `await` a verb, look at the
outcomes, and decide what runs next with ordinary `if` / `for` / `while`:

```rust
let wf = session.workflow();

// One step, then a *variable* fan-out computed from its result. This is the
// "dynamic" part — the shape is decided at run time, not declared up front.
let plan = wf.agent(AgentStepSpec::new("plan", "plan", "plan", goal)).await;
let specs = derive_specs(&plan); // your code
let done = wf.phase("implement", specs).await; // resumable barrier
let review = wf.phase("review", to_review(&done)).await;
```

Each verb delegates to exactly one combinator — the facade owns no scheduling
and no LLM logic. `phase(name, specs)` is the one piece with new behavior: it
derives a deterministic checkpoint id (`{root}/{index}:{name}`), runs the
resumable barrier when a store is present, and emits
`WorkflowEvent::PhaseStart` / `PhaseEnd` on a broadcast you can `subscribe()` to.
Because there is no embedded script interpreter, there is no sandbox to harden —
the "interpreter" is the host language, and the attack surface is just Rust
calling typed functions.
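
Why a deterministic id matters for resume can be shown with a small sketch. Everything here is hypothetical: `Phases` stands in for the workflow handle, a `HashMap` stands in for the session store, outcomes are plain strings, and the index is assumed to count phases in call order starting at 0. The sketch also journals once per phase, whereas the text describes journaling at each step boundary.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a workflow handle plus its checkpoint store.
pub struct Phases {
    root: String,
    next_index: usize,
    store: HashMap<String, Vec<String>>,
}

impl Phases {
    pub fn new(root: &str, store: HashMap<String, Vec<String>>) -> Self {
        Phases { root: root.to_string(), next_index: 0, store }
    }

    /// Derive `{root}/{index}:{name}`, then either replay the journaled
    /// outcomes for that id or run the phase and journal the result.
    pub fn phase<F>(&mut self, name: &str, run: F) -> (String, Vec<String>)
    where
        F: FnOnce() -> Vec<String>,
    {
        // Assumption: the index counts phases in call order, starting at 0.
        let id = format!("{}/{}:{}", self.root, self.next_index, name);
        self.next_index += 1;

        if let Some(saved) = self.store.get(&id).cloned() {
            return (id, saved); // resumed: the phase body is not re-run
        }
        let fresh = run();
        self.store.insert(id.clone(), fresh.clone());
        (id, fresh)
    }

    /// Hand the journal back, e.g. to persist it between runs.
    pub fn into_store(self) -> HashMap<String, Vec<String>> {
        self.store
    }
}
```

Because the id depends only on the root, the call order, and the phase name, a second run of the same host code derives the same ids and finds the earlier journal entries, which is what lets a completed phase be skipped.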

## Looping, with a guard you can't forget

Unknown-length work — loop-until-dry, refine-until-good — needs a loop. But an
LLM-driven loop that can only stop *itself* is a runaway waiting to happen. So
`execute_loop` makes the guard mandatory:

```rust
let outcomes = execute_loop(executor, initial, /* max_iterations */ 5, None, |round| {
    let follow_ups = derive_follow_ups(round);
    if follow_ups.is_empty() { LoopDecision::Stop }
    else { LoopDecision::Continue(follow_ups) }
}).await;
```

`max_iterations` is a hard cap: once reached, the loop stops even if the
predicate would continue. The predicate is the *soft* condition; the cap is the
*hard* one. You can't write the version without the guard.
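
The soft-predicate, hard-cap split can be sketched in a few lines of synchronous Rust. This is not the real `execute_loop`: `run_loop` and its parameter names are hypothetical, the enum is a simplified generic stand-in for `LoopDecision`, and a round is modelled as a plain function from specs to outcomes.

```rust
/// Simplified, generic stand-in for the article's `LoopDecision`.
pub enum LoopDecision<S> {
    Stop,
    Continue(Vec<S>),
}

/// Run rounds until the predicate says `Stop` or `max_iterations` rounds
/// have run, whichever comes first. Returns the outcomes of every round.
pub fn run_loop<S, O>(
    initial: Vec<S>,
    max_iterations: usize,
    mut run_round: impl FnMut(Vec<S>) -> Vec<O>,
    mut decide: impl FnMut(&[O]) -> LoopDecision<S>,
) -> Vec<Vec<O>> {
    let mut rounds = Vec::new();
    let mut next = initial;
    // The cap is the loop bound itself, so it cannot be omitted or bypassed.
    for _ in 0..max_iterations {
        let outcomes = run_round(next);
        let decision = decide(&outcomes);
        rounds.push(outcomes);
        match decision {
            LoopDecision::Stop => break,
            LoopDecision::Continue(follow_ups) => next = follow_ups,
        }
    }
    rounds
}
```

Making the cap a required positional argument, rather than an optional setting, is the design choice the text describes: a predicate that always returns `Continue` still terminates after `max_iterations` rounds.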

## One budget across the whole fan-out — honestly

`BudgetGuard` already made a budget decision on every LLM call, but each child
counted only its own spend. To cap a *fan-out*, `WorkflowBudget` wraps that same
guard and accumulates every child's usage into one shared atomic ledger:

```rust
let wf = session.workflow_with_token_budget(Some(500_000));
// ...run phases...
if let Some(b) = wf.budget_snapshot() {
    println!("spent {} / {:?}", b.consumed_tokens, b.limit_tokens);
}
```

It installs through the *unchanged* seam — it *is* a `BudgetGuard` — so every
child loop's existing per-turn check feeds the shared ledger automatically. No
new enforcement point.

The honest part: usage is recorded **after** each call, while the cap is checked
**before**. Under a wide parallel fan-out, a handful of in-flight turns can race
past the limit before the ledger catches up. So it is a **soft** ceiling, not a
per-token guarantee — and the framework never force-kills an in-flight fan-out.
An exhausted budget simply denies the *next* call, which surfaces as a failed
step the host can react to. We document that tradeoff rather than pretend the
race away; a sequential caller gets a crisp cap, a wide fan-out gets a soft one.
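
The check-before, record-after race is easy to reproduce with a shared atomic counter. The sketch below is an assumption-laden model, not `WorkflowBudget` itself: `SharedBudget`, `allow_next_call`, and `record` are hypothetical names, and the comparison `consumed < limit` is one plausible admission rule.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical shared ledger: one counter for all children of a fan-out.
pub struct SharedBudget {
    limit: Option<u64>,
    consumed: AtomicU64,
}

impl SharedBudget {
    pub fn new(limit: Option<u64>) -> Self {
        SharedBudget { limit, consumed: AtomicU64::new(0) }
    }

    /// Checked BEFORE a call: admit while recorded spend is under the limit.
    /// `None` means unlimited.
    pub fn allow_next_call(&self) -> bool {
        match self.limit {
            None => true,
            Some(limit) => self.consumed.load(Ordering::Relaxed) < limit,
        }
    }

    /// Recorded AFTER a call, once its token usage is known.
    pub fn record(&self, tokens: u64) {
        self.consumed.fetch_add(tokens, Ordering::Relaxed);
    }

    pub fn consumed(&self) -> u64 {
        self.consumed.load(Ordering::Relaxed)
    }
}
```

With a limit of 500, three children that each pass `allow_next_call()` before any of them calls `record(200)` leave the ledger at 600; only the fourth call is denied. In this model the overshoot is bounded by the calls already admitted, which is why a sequential caller overshoots by at most one call while a wide fan-out can overshoot by as many calls as it has in flight.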

## From the SDK

The Node and Python SDKs expose the flat verbs (`parallel`, `pipeline`,
`parallelResumable`). The shared budget rides in as an *optional argument* on
`parallel`, so it's backward compatible — no budget, same array you always got;
with a budget, the richer result:

```ts
// No budget → the plain outcomes array (unchanged).
const outcomes = await session.parallel(specs);

// With a budget → { outcomes, budget }; all children share one ledger.
const { outcomes: out, budget } = await session.parallel(specs, 500_000);
console.log(budget.consumedTokens, budget.limitTokens);
```

```python
res = session.parallel(specs, budget_tokens=500_000)
print(res["budget"]["consumed_tokens"], res["budget"]["limit_tokens"])
```

## The lesson

A dynamic workflow runtime sounds like a big subsystem. It wasn't — because the
runtime was already factored around the right seams. `AgentExecutor` gave us
placement-agnostic execution; `WorkflowCheckpoint` gave us resume;
`BudgetGuard` gave us a budget contract. The "feature" was a thin facade plus
three small pieces (`Workflow::phase`, `execute_loop`, and `WorkflowBudget`) that
compose them. When the seams are right, the powerful thing is small.

If you're building agent infrastructure, the takeaway isn't "copy these three
types." It's: find the one seam your whole layer can be written against, keep it
serializable and placement-agnostic, and let the powerful features fall out as
compositions. The best workflow engine is the one you didn't have to build.