Delphi Agent Data Storage

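This document describes every data structure that Delphi Agent keeps in its three external stores, along with its purpose and sample data:

  • MongoDB: the persistent primary store (sessions, entries, runs, outbox, audit, usage, subagents)
  • Redis: the coordination and real-time layer (active run leases, queues, event streams, pub/sub, quota counters)
  • S3 / MinIO: object storage for workspace snapshots

Naming conventions: in Mongo, _id stands for the entity's id field. In Redis keys, <prefix> defaults to delphi and can be changed with PI_CLUSTER_REDIS_KEY_PREFIX. All samples below use delphi.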

MongoDB Collections

| Collection | Primary key / unique index | Main write path | Read path |
|---|---|---|---|
| agent_sessions | _id (sessionId) | session create / compact / fork / setModel | session loading by the SDK and /api/chat/stream |
| agent_session_entries | (sessionId, entryId) unique; (sessionId, runId, sequence) unique sparse | message appends during prompt/continue | session history replay, fork, navigate |
| runs | runId unique | run admission → outbox commit | /api/audit, scheduling recovery |
| runtime_event_outbox | (runId, eventName, eventId) unique | terminal event commit | retried delivery by RuntimeEventOutboxWorker |
| audit_logs | _id | AuditService.record | GET /api/audit |
| usage_metrics | (namespace, date) unique | UsageMeteringService flush | GET /api/usage |
| subagent_states | (namespace, sessionId, id) unique | SubagentRuntime.spawn | subagent status queries |

agent_sessions

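Session metadata, the head pointer, and the fencing token. This document is the root of the session tree.

Fields:

| Field | Type | Description |
|---|---|---|
| _id | String | sessionId (supplied by the caller, or session_<uuid>) |
| version | Long (@Version) | optimistic-lock version |
| namespace | String | tenant namespace (== tenantId) |
| ownerRef | String | identifier of the session creator |
| projectKey | String | project key (used to associate the workspace) |
| sessionName | String | user-visible name |
| modelProvider | String | current provider, e.g. deepseek |
| modelId | String | current model ID |
| thinkingLevel | String | thinking level (OFF / LOW / HIGH) |
| systemPrompt | String | system prompt |
| headEntryId | String | current head entry; the cursor into the session tree |
| persistedMessageCount | int | number of messages persisted so far |
| steeringMode | String | ALL / ONE_AT_A_TIME |
| followUpMode | String | ALL / ONE_AT_A_TIME |
| autoCompactionEnabled | boolean | whether automatic compaction is enabled |
| autoRetryEnabled | boolean | whether automatic retry is enabled |
| fencingToken | long | fencing token of the last successful commit |
| lastCommittedRunId | String | runId of the last successfully completed run |
| workspaceVersion | long | workspace version (incremented on every persist) |
| createdAt / updatedAt | Instant | creation / update time |

Indexes: project_updated_idx, namespace_project_updated_idx, namespace_id_idx

Sample: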

{
  "_id": "session_b9b1c8d4f0a14e7f9d3a",
  "version": 12,
  "namespace": "acme-corp",
  "ownerRef": "user:dave",
  "projectKey": "user-service",
  "sessionName": "dave-backend",
  "modelProvider": "deepseek",
  "modelId": "deepseek-v4-pro",
  "thinkingLevel": "OFF",
  "systemPrompt": "You are a Java backend assistant.",
  "headEntryId": "entry_4a8c1f",
  "persistedMessageCount": 14,
  "steeringMode": "ALL",
  "followUpMode": "ALL",
  "autoCompactionEnabled": true,
  "autoRetryEnabled": false,
  "fencingToken": 42,
  "lastCommittedRunId": "run_2c5e1a3f4b6d",
  "workspaceVersion": 7,
  "createdAt": "2026-05-20T10:11:23.001Z",
  "updatedAt": "2026-05-26T08:42:55.231Z"
}
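The fencing check on this document can be modelled in a few lines. The sketch below is a hypothetical in-memory stand-in; FencedSessionStore and SessionHead are illustrative names, not the project's classes. It assumes a commit is accepted when the run's token is greater than or equal to the stored fencingToken. It also assumes each accepted commit bumps workspaceVersion and records lastCommittedRunId, as in the run-completion step of the data flow. Against Mongo, the same rule would presumably be a conditional update whose filter includes the token comparison.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical in-memory stand-in for the fencing check on agent_sessions.
 * Assumption: a commit is accepted when the run's token is >= the stored token.
 */
final class FencedSessionStore {

    /** Subset of an agent_sessions document. */
    record SessionHead(String sessionId, long fencingToken,
                       String lastCommittedRunId, long workspaceVersion) {}

    private final Map<String, SessionHead> sessions = new ConcurrentHashMap<>();

    void create(String sessionId) {
        sessions.putIfAbsent(sessionId, new SessionHead(sessionId, 0L, null, 0L));
    }

    /**
     * Atomically commits a finished run. Returns false for an unknown session
     * or a stale token (a newer lease has been issued since this run started).
     */
    boolean commitRun(String sessionId, String runId, long runFencingToken) {
        boolean[] accepted = {false};
        // computeIfPresent is atomic per key on ConcurrentHashMap,
        // playing the role of a conditional update on one document.
        sessions.computeIfPresent(sessionId, (id, current) -> {
            if (runFencingToken < current.fencingToken()) {
                return current; // stale writer: leave the document untouched
            }
            accepted[0] = true;
            return new SessionHead(id, runFencingToken, runId,
                    current.workspaceVersion() + 1); // workspaceVersion+1 per commit
        });
        return accepted[0];
    }

    SessionHead get(String sessionId) {
        return sessions.get(sessionId);
    }
}
```

A stale owner holds a lower token, so its commit is rejected without touching the document. One example is a node whose lease expired while it was paused.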

agent_session_entries

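An immutable record of every message, tool call, compaction, or fork summary in a session. Entries form a tree: parentId points to the parent entry, and fork and navigate operate on that tree.

Fields:

| Field | Type | Description |
|---|---|---|
| _id | String | document id (auto-generated) |
| sessionId | String | owning session |
| entryId | String | business entryId (UUID) |
| parentId | String | parent entry; builds the session tree |
| runId | String | owning run |
| sequence | Integer | monotonic sequence number within the run; part of the idempotency key |
| type | String | message / compaction / branch_summary |
| payload | Map<String,Object> | serialized message / tool-call result / summary content |
| timestamp | Instant | write time |

Indexes: session_time_idx, session_entry_idx (unique), session_parent_idx, session_run_seq_idx (unique, sparse). The last one makes the (sessionId, runId, sequence) triple idempotent, so a retried stream cannot append the same entry twice.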
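The effect of session_run_seq_idx can be illustrated with an in-memory set of (sessionId, runId, sequence) keys. This is a sketch, not the project's code: EntryLog is a hypothetical name. It assumes that entries without runId or sequence, such as the compaction sample below, fall outside the idempotency check.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical in-memory model of session_run_seq_idx: the
 * (sessionId, runId, sequence) triple is unique, so replaying an append is a no-op.
 */
final class EntryLog {

    /** Subset of an agent_session_entries document. */
    record Entry(String sessionId, String runId, Integer sequence, String entryId, String type) {}

    private record IdempotencyKey(String sessionId, String runId, Integer sequence) {}

    private final Set<IdempotencyKey> keys = new HashSet<>();
    private final List<Entry> entries = new ArrayList<>();

    /** Returns true if the entry was stored, false if it duplicated an earlier append. */
    synchronized boolean append(Entry e) {
        // Assumption: entries without runId/sequence (e.g. compaction) are not
        // covered by the idempotency key and are always appended.
        if (e.runId() != null && e.sequence() != null) {
            IdempotencyKey key = new IdempotencyKey(e.sessionId(), e.runId(), e.sequence());
            if (!keys.add(key)) {
                // Against Mongo this is the duplicate-key error (E11000),
                // which the writer would treat as "already written".
                return false;
            }
        }
        entries.add(e);
        return true;
    }

    synchronized List<Entry> snapshot() {
        return List.copyOf(entries);
    }
}
```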

Samples:

// user prompt
{
  "_id": "ObjectId(666...)",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "entryId": "entry_4a8c1f",
  "parentId": "entry_3f7b2e",
  "runId": "run_2c5e1a3f4b6d",
  "sequence": 0,
  "type": "message",
  "payload": {
    "role": "user",
    "content": [{"type": "text", "text": "Please check UserService.java"}]
  },
  "timestamp": "2026-05-26T08:40:11.230Z"
}

// tool-call result
{
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "entryId": "entry_5b9d2c",
  "parentId": "entry_4a8c1f",
  "runId": "run_2c5e1a3f4b6d",
  "sequence": 2,
  "type": "message",
  "payload": {
    "role": "tool",
    "toolCallId": "call_7abc",
    "toolName": "read",
    "content": [{"type": "text", "text": "public class UserService {...}"}],
    "isError": false
  },
  "timestamp": "2026-05-26T08:40:13.445Z"
}

// compaction summary
{
  "entryId": "entry_8x9y2z",
  "type": "compaction",
  "payload": {
    "keptCount": 20,
    "summary": "The first 30 messages discussed the design and refactoring of UserService and ended with the decision to use the Builder pattern...",
    "compactedAt": "2026-05-25T14:00:00Z"
  }
}
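Each entry points at its parent, so the active branch of a session can be rebuilt by walking parentId links from headEntryId back to the root. The sketch below assumes all entries of the session are already loaded into a map keyed by entryId. SessionTree is a hypothetical helper. Treating fork and navigate as "move headEntryId to another entry" is also an assumption about how those operations use the tree.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper that rebuilds the active branch of a session tree. */
final class SessionTree {

    /** Subset of an agent_session_entries document. */
    record Entry(String entryId, String parentId, String type) {}

    /**
     * Walks parentId links from the head entry up to the root and returns
     * the branch in root-to-head order (the order used for history replay).
     */
    static List<Entry> branchFrom(String headEntryId, Map<String, Entry> entriesById) {
        List<Entry> path = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        String cursor = headEntryId;
        while (cursor != null) {
            if (!seen.add(cursor)) {
                throw new IllegalStateException("cycle detected at " + cursor);
            }
            Entry entry = entriesById.get(cursor);
            if (entry == null) {
                throw new IllegalStateException("missing entry " + cursor);
            }
            path.add(entry);
            cursor = entry.parentId(); // null at the root ends the walk
        }
        Collections.reverse(path);
        return path;
    }
}
```

Two sibling entries that share a parent yield two branches with a common prefix. This is the shape a fork produces.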

runs

Run lifecycle record. Every run triggered by prompt / continue / fork writes one document.

Fields:

| Field | Type | Description |
|---|---|---|
| _id / runId | String | runId (run_<hex>; unique index) |
| namespace | String | namespace |
| sessionId | String | session |
| tenantId / userId | String | tenant / user |
| operation | String | PROMPT / CONTINUE / STEER |
| queueMode | String | INTERRUPT / FOLLOWUP / STEER / DROP / REJECT |
| status | String | ACCEPTED / QUEUED / RUNNING / COMPLETED / FAILED / ABORTED |
| ownerNodeId | String | node currently holding the lease |
| fencingToken | Long | fencing token used for writes to the session |
| createdAt / queuedAt / startedAt / completedAt | Instant | per-phase timestamps |
| failureType | String | failure category (SHUTDOWN_TIMEOUT / WORKSPACE_SNAPSHOT_FAILED / QUOTA_REJECTED / MODEL_ERROR, etc.) |
| failureMessage | String | failure description |
| terminalEventId | String | ID of the terminal event in the Redis Stream |
| version | Long | optimistic-lock version |

Indexes: run_session_created_idx, run_owner_status_idx

Samples:

// completed normally
{
  "_id": "run_2c5e1a3f4b6d",
  "runId": "run_2c5e1a3f4b6d",
  "namespace": "acme-corp",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "tenantId": "acme-corp",
  "userId": "dave",
  "operation": "PROMPT",
  "queueMode": "REJECT",
  "status": "COMPLETED",
  "ownerNodeId": "node-a",
  "fencingToken": 42,
  "createdAt": "2026-05-26T08:40:10.001Z",
  "queuedAt": null,
  "startedAt": "2026-05-26T08:40:10.150Z",
  "completedAt": "2026-05-26T08:40:18.890Z",
  "failureType": null,
  "failureMessage": null,
  "terminalEventId": "1716712818890-0",
  "version": 5
}

// failed with a shutdown timeout
{
  "runId": "run_9f2e8c1a",
  "status": "FAILED",
  "ownerNodeId": "node-b",
  "failureType": "SHUTDOWN_TIMEOUT",
  "failureMessage": "active run did not complete within drain timeout",
  "terminalEventId": "1716712900123-0"
}

runtime_event_outbox

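Fallback table for terminal-event delivery. A terminal event must first be written to this table inside a Mongo transaction. A worker then delivers it to the Redis Stream. This guarantees two things: terminal events are delivered at least once, and the run's terminal state lands atomically.

Fields:

| Field | Type | Description |
|---|---|---|
| _id | String | document id |
| eventId | String | business eventId |
| runId / namespace / sessionId / tenantId / userId | String | run context |
| operation / queueMode | String | snapshot of run metadata |
| runStatus | String | run status after the terminal event |
| runFailureType / runFailureMessage | String | filled in on failure |
| ownerNodeId | String | owner at terminal time |
| eventName | String | run_completed / run_failed / quota_rejected |
| payload | Map | event payload |
| eventTimestamp | Instant | business time of the event |
| status | String | PENDING / DELIVERED / FAILED_RETRYABLE |
| redisStreamId | String | Stream ID written back after a successful append |
| leaseReleasedAt | Instant | time the lease was released |
| attempts | int | number of delivery attempts so far |
| nextAttemptAt | Instant | time of the next retry |
| createdAt / updatedAt | Instant | timestamps |

Indexes: outbox_event_unique_idx (unique), outbox_retry_idx

Sample: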

{
  "_id": "ObjectId(...)",
  "eventId": "evt_run_completed_2c5e1a3f",
  "runId": "run_2c5e1a3f4b6d",
  "namespace": "acme-corp",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "tenantId": "acme-corp",
  "userId": "dave",
  "operation": "PROMPT",
  "queueMode": "REJECT",
  "runStatus": "COMPLETED",
  "runFailureType": null,
  "runFailureMessage": null,
  "ownerNodeId": "node-a",
  "eventName": "run_completed",
  "payload": {
    "runId": "run_2c5e1a3f4b6d",
    "totalTokens": 1234,
    "durationMs": 8740
  },
  "eventTimestamp": "2026-05-26T08:40:18.890Z",
  "status": "DELIVERED",
  "redisStreamId": "1716712818890-0",
  "leaseReleasedAt": "2026-05-26T08:40:18.920Z",
  "attempts": 1,
  "nextAttemptAt": null,
  "createdAt": "2026-05-26T08:40:18.890Z",
  "updatedAt": "2026-05-26T08:40:18.920Z"
}
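The worker's state transitions on this table can be sketched as follows. The retry policy is an assumption. The document only says that failed deliveries become FAILED_RETRYABLE with attempts and nextAttemptAt. The exponential backoff (1 s, doubling up to 60 s) and the names OutboxWorkerSketch and appendToStream are therefore hypothetical. The sketch covers only the outbox status. Updating runs and releasing the lease, which the data-flow section shows in the same worker pass, are left out.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Function;

/** Hypothetical model of one RuntimeEventOutboxWorker pass over outbox records. */
final class OutboxWorkerSketch {

    enum Status { PENDING, DELIVERED, FAILED_RETRYABLE }

    /** Mutable subset of a runtime_event_outbox document. */
    static final class OutboxRecord {
        final String eventId;
        Status status = Status.PENDING;
        String redisStreamId;
        int attempts;
        Instant nextAttemptAt;

        OutboxRecord(String eventId) {
            this.eventId = eventId;
        }
    }

    private static final Duration BASE_BACKOFF = Duration.ofSeconds(1); // assumed
    private static final Duration MAX_BACKOFF = Duration.ofSeconds(60); // assumed

    /** Due = not yet delivered and either never scheduled or past nextAttemptAt. */
    static boolean isDue(OutboxRecord r, Instant now) {
        return r.status != Status.DELIVERED
                && (r.nextAttemptAt == null || !r.nextAttemptAt.isAfter(now));
    }

    /**
     * appendToStream stands in for the XADD to delphi:events:session:<ns>:<sid>;
     * it returns the stream ID or throws on failure.
     */
    static void deliverDue(List<OutboxRecord> records, Instant now,
                           Function<OutboxRecord, String> appendToStream) {
        for (OutboxRecord r : records) {
            if (!isDue(r, now)) {
                continue;
            }
            r.attempts++;
            try {
                r.redisStreamId = appendToStream.apply(r);
                r.status = Status.DELIVERED;
                r.nextAttemptAt = null;
            } catch (RuntimeException e) {
                r.status = Status.FAILED_RETRYABLE;
                r.nextAttemptAt = now.plus(backoff(r.attempts));
            }
        }
    }

    /** 1 s, 2 s, 4 s, ... capped at 60 s (assumed policy). */
    static Duration backoff(int attempts) {
        int shift = Math.min(Math.max(attempts - 1, 0), 16);
        Duration d = BASE_BACKOFF.multipliedBy(1L << shift);
        return d.compareTo(MAX_BACKOFF) > 0 ? MAX_BACKOFF : d;
    }
}
```

Delivery is at least once. A crash between the XADD and the status update leads to a second append, so stream consumers should deduplicate on eventId.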

audit_logs

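Appended asynchronously by AuditService.record(...). The action values the code actually produces include:

session_create / prompt / continue / abort / compact / session_fork / tool_policy / tool_execution / orchestrated_prompt, among others.

Fields:

| Field | Type | Description |
|---|---|---|
| _id | String | document id |
| namespace | String | namespace |
| userId | String | triggering user (may be null for system calls) |
| sessionId | String | session |
| action | String | operation type |
| timestamp | Instant | write time |
| details | Map<String,Object> | free-form details |

Indexes: namespace_timestamp_idx, session_timestamp_idx

Samples: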

// session creation
{
  "namespace": "acme-corp",
  "userId": null,
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "action": "session_create",
  "timestamp": "2026-05-20T10:11:23.001Z",
  "details": {
    "projectKey": "user-service",
    "modelProvider": "deepseek",
    "modelId": "deepseek-v4-pro"
  }
}

// tool execution audit
{
  "namespace": "acme-corp",
  "userId": "dave",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "action": "tool_execution",
  "timestamp": "2026-05-26T08:40:13.500Z",
  "details": {
    "toolName": "read",
    "category": "READONLY",
    "outcome": "OK",
    "durationMs": 12,
    "args": {"path": "/workspace/UserService.java"}
  }
}

// tool policy denial
{
  "action": "tool_policy",
  "details": {
    "toolName": "bash",
    "category": "EXECUTABLE",
    "decision": "DENY",
    "reason": "ORCHESTRATOR_STRICT mode requires CODER subagent"
  }
}

usage_metrics

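A daily aggregate table with one row per (namespace, date). UsageMeteringService flushes the values periodically: every 30 s by default, configurable via pi.metering.flush-interval-seconds.

Fields:

| Field | Type | Description |
|---|---|---|
| _id | String | document id |
| namespace | String | namespace |
| date | LocalDate | UTC date |
| totalInputTokens | long | cumulative input tokens |
| totalOutputTokens | long | cumulative output tokens |
| totalRequests | long | cumulative requests / runs |
| totalToolCalls | long | cumulative tool calls |

Indexes: namespace_date_idx (unique).

Sample: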

{
  "namespace": "acme-corp",
  "date": "2026-05-26",
  "totalInputTokens": 1024500,
  "totalOutputTokens": 312000,
  "totalRequests": 187,
  "totalToolCalls": 423
}
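A metering buffer that produces these rows might look like the sketch below. It is hypothetical; UsageAccumulatorSketch is not the project's class. It buckets runs by UTC date and accumulates deltas in memory. Each flush swaps the buffer out, so every (namespace, date) delta can be applied as an additive upsert, for example a Mongo update with $inc and upsert enabled. The in-memory applyUpsert method stands in for that database call.

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory metering buffer producing usage_metrics deltas. */
final class UsageAccumulatorSketch {

    /** Row key: one usage_metrics row per (namespace, UTC date). */
    record DayKey(String namespace, LocalDate date) {}

    /** Counters mirroring the usage_metrics totals. */
    static final class Totals {
        long totalInputTokens;
        long totalOutputTokens;
        long totalRequests;
        long totalToolCalls;
    }

    private Map<DayKey, Totals> pending = new HashMap<>();

    /** Adds one finished run to the bucket for its UTC date. */
    synchronized void recordRun(String namespace, Instant at,
                                long inputTokens, long outputTokens, int toolCalls) {
        DayKey key = new DayKey(namespace, LocalDate.ofInstant(at, ZoneOffset.UTC));
        Totals t = pending.computeIfAbsent(key, k -> new Totals());
        t.totalInputTokens += inputTokens;
        t.totalOutputTokens += outputTokens;
        t.totalRequests += 1;
        t.totalToolCalls += toolCalls;
    }

    /** Swaps the buffer out; each drained entry becomes one additive upsert. */
    synchronized Map<DayKey, Totals> drainForFlush() {
        Map<DayKey, Totals> drained = pending;
        pending = new HashMap<>();
        return drained;
    }

    /** In-memory stand-in for an $inc upsert on the (namespace, date) row. */
    static void applyUpsert(Map<DayKey, Totals> table, Map<DayKey, Totals> deltas) {
        deltas.forEach((key, d) -> {
            Totals row = table.computeIfAbsent(key, k -> new Totals());
            row.totalInputTokens += d.totalInputTokens;
            row.totalOutputTokens += d.totalOutputTokens;
            row.totalRequests += d.totalRequests;
            row.totalToolCalls += d.totalToolCalls;
        });
    }
}
```

Additive upserts keep the flush safe when several nodes meter the same namespace on the same day. Each node only adds its own deltas, and no node overwrites another node's totals.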

subagent_states

Lifecycle state of subagents (child agents spawned by a parent agent).

Fields:

| Field | Type | Description |
|---|---|---|
| _id | String | subagentId (sub_<hex>) |
| parentRunId | String | parent run |
| tenantId / namespace / userId / projectKey / sessionId | String | context |
| role | String | ORCHESTRATOR / PLANNER / RESEARCHER / REVIEWER / CODER / TESTER |
| depth | int | nesting depth |
| workspaceScope | String | INHERIT / PRIVATE |
| task | String | task description |
| contextText | String | context injected by the parent agent |
| maxDurationSeconds | int | timeout |
| ownerNodeId | String | owning node |
| status | String | RUNNING / COMPLETED / FAILED / ABORTED |
| summary | String | summary on completion |
| errorMessage | String | failure description |
| startedAt / endedAt | Instant | timestamps |
| details | Map | free-form details |

Indexes: namespace_session_id_idx (unique), namespace_parent_status_idx, namespace_session_status_idx

Sample:

{
  "_id": "sub_3e7c1a8d",
  "parentRunId": "run_2c5e1a3f4b6d",
  "tenantId": "acme-corp",
  "namespace": "acme-corp",
  "userId": "dave",
  "projectKey": "user-service",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "role": "CODER",
  "depth": 1,
  "workspaceScope": "INHERIT",
  "task": "Implement UserService.findByEmail",
  "contextText": "Schema in /workspace/schema.sql ...",
  "maxDurationSeconds": 600,
  "ownerNodeId": "node-a",
  "status": "COMPLETED",
  "summary": "Added findByEmail using parameterized query.",
  "errorMessage": null,
  "startedAt": "2026-05-26T08:40:14.000Z",
  "endedAt": "2026-05-26T08:40:17.500Z",
  "details": {"toolCallCount": 6, "filesChanged": ["UserService.java"]}
}

Redis Keys

All keys are managed centrally by ClusterKeyRegistry; the prefix defaults to delphi.
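A key builder along these lines keeps the layout below in one place. The sketch is hypothetical; RedisKeysSketch is not the ClusterKeyRegistry API. It only reproduces the key shapes documented in this section, and it reads PI_CLUSTER_REDIS_KEY_PREFIX with a fallback to delphi.

```java
import java.util.Objects;

/**
 * Hypothetical key builder reproducing the documented key shapes.
 * Not the real ClusterKeyRegistry API.
 */
final class RedisKeysSketch {

    private final String prefix;

    RedisKeysSketch(String prefix) {
        // Documented default prefix is "delphi".
        this.prefix = (prefix == null || prefix.isBlank()) ? "delphi" : prefix;
    }

    static RedisKeysSketch fromEnv() {
        return new RedisKeysSketch(System.getenv("PI_CLUSTER_REDIS_KEY_PREFIX"));
    }

    /** Joins the prefix and parts with ':'; rejects null parts early. */
    private String key(String... parts) {
        StringBuilder sb = new StringBuilder(prefix);
        for (String part : parts) {
            sb.append(':').append(Objects.requireNonNull(part, "key part"));
        }
        return sb.toString();
    }

    String runActive(String runId) { return key("run", "active", runId); }
    String runBySession(String ns, String sid) { return key("run", "by-session", ns, sid); }
    String sessionFencing(String ns, String sid) { return key("session", "fencing", ns, sid); }
    String sessionLock(String ns, String sid) { return key("session", "lock", ns, sid); }
    String tenantActive(String ns) { return key("run", "tenant-active", ns); }
    String userActive(String ns, String userId) { return key("run", "user-active", ns, userId); }
    String sessionQueue(String ns, String sid) { return key("queue", "session", ns, sid); }
    String sessionEvents(String ns, String sid) { return key("events", "session", ns, sid); }
    String runCommands(String nodeId) { return key("run", "commands", nodeId); }
    String workspaceLock(String ns, String sid) { return key("workspace", "lock", ns, sid); }
    String nodeRegistry(String nodeId) { return key("node", "registry", nodeId); }

    /** Consumer group for the session queue; documented without the key prefix. */
    static String queueConsumerGroup(String ns, String sid) {
        return "session-" + ns + "-" + sid;
    }
}
```

The parts are joined with ':', so a namespace or sessionId that itself contains ':' would produce ambiguous keys. Validating identifiers at the API boundary avoids that.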

1. Active runs / quotas

delphi:run:active:<runId> — Hash

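Active run metadata, written atomically by the RunAdmissionController.acquire Lua script. TTL = PI_CLUSTER_RUN_MAX_TTL_MS (default 1800000 ms, i.e. 30 minutes). A background task renews it every ttl/3.

| Field | Description |
|---|---|
| runId | run id |
| namespace | namespace |
| sessionId | session |
| tenantId | tenant |
| userId | user (may be an empty string) |
| nodeId | node holding the lease |
| fencingToken | monotonically increasing fencing token |
| status | always RUNNING |
| startedAt | start timestamp (milliseconds) |

Sample: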

HGETALL delphi:run:active:run_2c5e1a3f4b6d
1) "runId"        2) "run_2c5e1a3f4b6d"
3) "namespace"    4) "acme-corp"
5) "sessionId"    6) "session_b9b1c8d4f0a14e7f9d3a"
7) "tenantId"     8) "acme-corp"
9) "userId"       10) "dave"
11) "nodeId"      12) "node-a"
13) "fencingToken" 14) "42"
15) "status"      16) "RUNNING"
17) "startedAt"   18) "1716712810150"
TTL → 1782 (seconds)
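The numbers in this sample fit together. With the default TTL of 1800000 ms, renewing every ttl/3 means a renewal every 600000 ms (10 minutes). The lease-expiry score in delphi:run:tenant-active is startedAt + ttl: 1716712810150 + 1800000 = 1716714610150, the score in the tenant-active sample. The scheduling sketch below is hypothetical. What the renew callback actually does is an assumption, for example refreshing the hash TTL and the sorted-set score.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper for the active-run lease timings described above. */
final class LeaseRenewalSketch {

    /** Default of PI_CLUSTER_RUN_MAX_TTL_MS (30 minutes). */
    static final long DEFAULT_TTL_MS = 1_800_000L;

    /** Renewal period of ttl/3, so one missed renewal still leaves time for the next. */
    static long renewIntervalMs(long ttlMs) {
        if (ttlMs < 3) {
            throw new IllegalArgumentException("ttl too small: " + ttlMs);
        }
        return ttlMs / 3;
    }

    /** Lease-expiry score used in the tenant/user active-run sorted sets. */
    static long expiryScore(long nowMs, long ttlMs) {
        return nowMs + ttlMs;
    }

    /** Renews at a fixed rate; the caller cancels the future when the run ends. */
    static ScheduledFuture<?> scheduleRenewal(ScheduledExecutorService scheduler,
                                              long ttlMs, Runnable renew) {
        long interval = renewIntervalMs(ttlMs);
        return scheduler.scheduleAtFixedRate(renew, interval, interval, TimeUnit.MILLISECONDS);
    }
}
```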

delphi:run:by-session:<namespace>:<sessionId> — String

Reverse index from a session to its active runId.

GET delphi:run:by-session:acme-corp:session_b9b1c8d4f0a14e7f9d3a
"run_2c5e1a3f4b6d"

delphi:session:fencing:<namespace>:<sessionId> — Counter

Monotonic source of fencing tokens: each INCR yields a new token.

GET delphi:session:fencing:acme-corp:session_b9b1c8d4f0a14e7f9d3a
"42"

delphi:session:lock:<namespace>:<sessionId> — String + TTL

Session write lock acquired by the compact / fork / navigate commands. TTL is PI_CLUSTER_LOCK_DEFAULT_TTL_MS (default 30000 ms).

GET delphi:session:lock:acme-corp:session_b9b1c8d4f0a14e7f9d3a
"node-a:cmd_compact_8f3e2c1a"
TTL → 28
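The lock value pairs the node with the command (node-a:cmd_compact_8f3e2c1a), which makes an owner check on release possible. The in-memory sketch below models SET key value NX PX ttl plus a compare-and-delete release. SessionLockSketch is a hypothetical name. The release rule is the usual Redis locking pattern and an assumption here, since this document does not say how the lock is released. In real Redis the comparison and the DEL must be atomic, typically through a Lua script.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of SET key value NX PX ttl plus an owner-checked release. */
final class SessionLockSketch {

    private record Held(String owner, long expiresAtMs) {}

    private final Map<String, Held> locks = new HashMap<>();

    /** Lock value format seen in the sample: <nodeId>:<commandId>. */
    static String ownerToken(String nodeId, String commandId) {
        return nodeId + ":" + commandId;
    }

    /** Succeeds only if the key is absent or its TTL has elapsed (NX semantics). */
    synchronized boolean tryAcquire(String key, String owner, long ttlMs, long nowMs) {
        Held h = locks.get(key);
        if (h != null && h.expiresAtMs() > nowMs) {
            return false; // still held by someone
        }
        locks.put(key, new Held(owner, nowMs + ttlMs));
        return true;
    }

    /** Deletes only if the caller still owns the unexpired lock. */
    synchronized boolean release(String key, String owner, long nowMs) {
        Held h = locks.get(key);
        if (h == null || h.expiresAtMs() <= nowMs || !h.owner().equals(owner)) {
            return false; // expired, or taken over by another owner
        }
        locks.remove(key);
        return true;
    }
}
```

Without the owner check, a node whose lock expired mid-command could delete a lock that another node has since acquired.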

delphi:run:tenant-active:<namespace> — Sorted Set

Set of a tenant's active runs. The score is the lease-expiry timestamp in milliseconds, and ZCOUNT over it is used for quota checks.

ZRANGEBYSCORE delphi:run:tenant-active:acme-corp <now> +inf WITHSCORES
1) "run_2c5e1a3f4b6d"  2) "1716714610150"
3) "run_9f2e8c1a..."   4) "1716714650200"
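The quota check counts only leases whose expiry score has not yet passed. Runs on a crashed node therefore stop counting once their lease lapses, even before they are removed from the set. The sketch is hypothetical: TenantQuotaSketch and the maxActiveRuns limit are illustrative. In the real system the count and the ZADD presumably both run inside the acquire Lua script, which is what makes them atomic.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of delphi:run:tenant-active:<namespace>
 * (member = runId, score = lease expiry in epoch millis).
 */
final class TenantQuotaSketch {

    private final Map<String, Long> leaseExpiryByRunId = new HashMap<>();

    /** Like ZCOUNT key <now> +inf: leases that have not expired yet. */
    synchronized long activeCount(long nowMs) {
        return leaseExpiryByRunId.values().stream().filter(score -> score >= nowMs).count();
    }

    /** Admits (ZADD) only while the tenant is under maxActiveRuns; expired members don't count. */
    synchronized boolean tryAdmit(String runId, long nowMs, long ttlMs, int maxActiveRuns) {
        if (activeCount(nowMs) >= maxActiveRuns) {
            return false;
        }
        leaseExpiryByRunId.put(runId, nowMs + ttlMs);
        return true;
    }

    /** Like ZREM when the run releases its lease. */
    synchronized void release(String runId) {
        leaseExpiryByRunId.remove(runId);
    }
}
```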

delphi:run:user-active:<namespace>:<userId> — Sorted Set

Same as above, aggregated per user.

2. Run Queue

delphi:queue:session:<namespace>:<sessionId> — Stream

Queued runs waiting to be scheduled. Consumer group: session-<namespace>-<sessionId>

Fields of each message:

| Field | Description |
|---|---|
| kind | always run |
| context | JSON-serialized AgentRunContext |

Sample:

XRANGE delphi:queue:session:acme-corp:session_xxx - +
1) 1) "1716712820500-0"
   2) 1) "kind"    2) "run"
      3) "context" 4) "{\"runId\":\"run_3a8c1f...\",\"namespace\":\"acme-corp\",\"sessionId\":\"...\",\"tenantId\":\"acme-corp\",\"userId\":\"dave\",\"prompt\":\"...\",\"operation\":\"PROMPT\",\"queueMode\":\"INTERRUPT\"}"

delphi:queue:processing:<namespace>:<sessionId> — String

Indicates the processing state of the session queue.

delphi:queue:recovery:lock — String + TTL

Global lock for recovering stale queue drains, ensuring that only one node runs the recovery scan.

3. Event Stream / SSE

delphi:events:session:<namespace>:<sessionId> — Stream

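Persistent stream of all runtime events; SSE replay reads it directly.

Fields of each message:

| Field | Description |
|---|---|
| eventId | business eventId |
| runId | owning run (may be empty) |
| name | event name (run_started / message_delta / tool_completed / run_completed, etc.) |
| envelope | JSON-serialized RuntimeEvent (including payload) |

Sample: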

XREAD COUNT 2 STREAMS delphi:events:session:acme-corp:session_xxx 0
1) 1) "1716712810150-0"
   2) 1) "eventId"  2) "evt_run_started_2c5e"
      3) "runId"    4) "run_2c5e1a3f4b6d"
      5) "name"     6) "run_started"
      7) "envelope" 8) "{\"name\":\"run_started\",\"runId\":\"run_2c5e1a3f4b6d\",\"namespace\":\"acme-corp\",\"sessionId\":\"...\",\"timestamp\":\"2026-05-26T08:40:10.150Z\",\"payload\":{\"queueMode\":\"REJECT\"}}"
2) 1) "1716712818890-0"
   2) 1) "eventId"  2) "evt_run_completed_2c5e"
      3) "name"     4) "run_completed"
      5) "envelope" 6) "{...\"payload\":{\"totalTokens\":1234,\"durationMs\":8740}}"

delphi:sse:events — Pub/Sub

Cross-node real-time SSE broadcast channel. Subscribers use the (namespace, sessionId, runId) in each message to decide whether to deliver it to a local emitter.

{
  "streamId": "1716712818890-0",
  "namespace": "acme-corp",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "runId": "run_2c5e1a3f4b6d",
  "name": "run_completed",
  "envelope": "{...}",
  "sourceNodeId": "node-a"
}
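The subscriber-side filter can be sketched as a match on (namespace, sessionId, runId). This is a hypothetical model: SseFanoutSketch is an illustrative name. Treating a null runId on a subscription as "all runs in the session" is an assumption.

```java
import java.util.List;

/** Hypothetical subscriber-side filter for delphi:sse:events messages. */
final class SseFanoutSketch {

    /** Fields of a broadcast message, as in the sample above. */
    record Broadcast(String streamId, String namespace, String sessionId, String runId,
                     String name, String envelope, String sourceNodeId) {}

    /** A local emitter's interest; runId == null means every run in the session (assumption). */
    record Subscription(String emitterId, String namespace, String sessionId, String runId) {
        boolean matches(Broadcast b) {
            return namespace.equals(b.namespace())
                    && sessionId.equals(b.sessionId())
                    && (runId == null || runId.equals(b.runId()));
        }
    }

    /** Returns the IDs of local emitters that should receive this broadcast. */
    static List<String> targets(Broadcast b, List<Subscription> localSubscriptions) {
        return localSubscriptions.stream()
                .filter(s -> s.matches(b))
                .map(Subscription::emitterId)
                .toList();
    }
}
```

Pub/Sub is fire-and-forget, so a client that misses a broadcast has to recover from the persistent stream. The streamId carried in each message is the position it would resume from.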

4. Command channels

delphi:run:commands:<nodeId> — Pub/Sub

Cross-node abort / steer commands. Each node subscribes to its own channel.

// abort
{
  "commandId": "cmd_3e7c1a8d",
  "type": "abort",
  "namespace": "acme-corp",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "runId": "run_2c5e1a3f4b6d",
  "payload": "user_request",
  "requestedByNodeId": "node-b"
}

// steer
{
  "commandId": "cmd_5f9d2c4a",
  "type": "steer",
  "namespace": "acme-corp",
  "sessionId": "session_b9b1c8d4f0a14e7f9d3a",
  "runId": "run_2c5e1a3f4b6d",
  "payload": "Please answer in Chinese",
  "requestedByNodeId": "node-b"
}

delphi:subagent:commands:<nodeId> — Pub/Sub

Cross-node subagent abort commands.

{
  "commandId": "cmd_8a2b3c4d",
  "type": "abort",
  "subagentId": "sub_3e7c1a8d",
  "reason": "parent_run_aborted",
  "requestedByNodeId": "node-b"
}

5. Cache invalidation

delphi:cache:invalidate — Pub/Sub

Broadcast after POST /api/catalog/reload so that other nodes clear their local catalog caches.

{
  "scope": "NAMESPACE",
  "namespace": "acme-corp",
  "reason": "catalog_reload",
  "sourceNodeId": "node-a",
  "timestamp": "2026-05-26T08:50:00.000Z"
}

scope is either ALL (clear everything) or NAMESPACE (clear only the given namespace).

6. Workspace lock

delphi:workspace:lock:<namespace>:<sessionId> — String + TTL

Distributed lock held by SnapshotWorkspaceStorage.prepareForRun while it restores a snapshot, so that concurrent restores do not collide. TTL is 5 minutes.

GET delphi:workspace:lock:acme-corp:session_xxx
"node-a:8c1f2e9b"
TTL → 287

7. Node registration

delphi:node:registry:<nodeId> — String + TTL

Registered at startup and used to detect duplicate nodeIds.

GET delphi:node:registry:node-a
"started:1716700000000;hostname:host-a.local;version:0.1.0-SNAPSHOT"

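Keys defined but not yet in use

| Key | Notes |
|---|---|
| delphi:ratelimit:<namespace> | Reserved in ClusterKeyRegistry; rate limiting is currently implemented in memory. |
| delphi:usage:<yyyyMMdd>:<namespace>:<metric> | Reserved; metering currently writes directly to the Mongo usage_metrics collection. |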

S3 / MinIO

Bucket: pi.execution.workspace-storage.snapshot.bucket

Used only for workspace snapshots; no business records are stored here.

Object key template

workspaces/<namespace>/<sessionId>/snapshot.tar.gz

For example:

workspaces/acme-corp/session_b9b1c8d4f0a14e7f9d3a/snapshot.tar.gz

Object contents

A tar.gz archive of every file under the session workspace directory, excluding the sub-paths listed in pi.execution.workspace-storage.snapshot.exclude (default: .skills/, node_modules/, .git/, target/, build/, dist/).
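The object key and the exclusion rule can be expressed as two small functions. The sketch assumes that each exclude entry is matched as a directory prefix relative to the workspace root. The document does not say whether nested matches are excluded too, such as a node_modules/ deep inside src/. SnapshotPathsSketch is a hypothetical name.

```java
import java.nio.file.Path;
import java.util.List;

/** Hypothetical helpers for the snapshot object key and the exclude rule. */
final class SnapshotPathsSketch {

    /** Default value of pi.execution.workspace-storage.snapshot.exclude. */
    static final List<String> DEFAULT_EXCLUDES =
            List.of(".skills/", "node_modules/", ".git/", "target/", "build/", "dist/");

    static String objectKey(String namespace, String sessionId) {
        return "workspaces/" + namespace + "/" + sessionId + "/snapshot.tar.gz";
    }

    /**
     * True if a path relative to the workspace root lies under an excluded
     * sub-path. Assumption: excludes match as prefixes from the workspace root.
     */
    static boolean isExcluded(Path relative, List<String> excludes) {
        String p = relative.normalize().toString().replace('\\', '/');
        for (String ex : excludes) {
            String dir = ex.endsWith("/") ? ex.substring(0, ex.length() - 1) : ex;
            // Match the directory itself or anything below it, but not e.g. "build.gradle".
            if (p.equals(dir) || p.startsWith(dir + "/")) {
                return true;
            }
        }
        return false;
    }
}
```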

Sample archive layout:

snapshot.tar.gz
├── src/
│   └── main/
│       └── java/
│           └── com/example/UserService.java
├── pom.xml
├── README.md
└── .env

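Write and read flow

  • Write (persistAfterRun): after a run completes successfully, the workspace is packed into a temporary file and uploaded with PutObject, overwriting the old snapshot. An empty workspace is not uploaded, so it never overwrites a valid snapshot.
  • Read (prepareForRun): before a run starts, the node checks whether the local workspace is already ready. If not, it acquires the Redis workspace lock, fetches the object with GetObject, and extracts it into the local hot cache. NoSuchKey is treated as a first run, and the restore is skipped.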
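The two bullets above amount to the decision logic below. Everything in it is hypothetical. ObjectStore.get returning an empty Optional stands in for NoSuchKey. The facades are illustrative interfaces, not the S3 SDK. The sketch also fails fast when the workspace lock is busy, whereas the real code may wait for it. After taking the lock it re-checks readiness, in case a concurrent restore finished in between.

```java
import java.util.Optional;

/** Hypothetical sketch of the snapshot read/write decisions. */
final class SnapshotFlowSketch {

    /** Illustrative object-store facade: get() returns empty for NoSuchKey. */
    interface ObjectStore {
        Optional<byte[]> get(String key);
        void put(String key, byte[] body);
    }

    /** Illustrative view of the node-local workspace hot cache. */
    interface LocalWorkspace {
        boolean isReady();
        boolean isEmpty();
        void extract(byte[] archive);
        byte[] pack();
    }

    /** Illustrative stand-in for delphi:workspace:lock:<namespace>:<sessionId>. */
    interface WorkspaceLock {
        boolean tryLock();
        void unlock();
    }

    enum PrepareOutcome { LOCAL_HIT, RESTORED, FIRST_RUN }

    static PrepareOutcome prepareForRun(String key, LocalWorkspace ws,
                                        WorkspaceLock lock, ObjectStore store) {
        if (ws.isReady()) {
            return PrepareOutcome.LOCAL_HIT; // hot cache already usable
        }
        if (!lock.tryLock()) {
            throw new IllegalStateException("workspace lock busy"); // sketch fails fast
        }
        try {
            if (ws.isReady()) {
                return PrepareOutcome.LOCAL_HIT; // restored concurrently before we got the lock
            }
            Optional<byte[]> archive = store.get(key);
            if (archive.isEmpty()) {
                return PrepareOutcome.FIRST_RUN; // NoSuchKey: nothing to restore
            }
            ws.extract(archive.get());
            return PrepareOutcome.RESTORED;
        } finally {
            lock.unlock();
        }
    }

    /** Uploads only non-empty workspaces, so an empty one never replaces a valid snapshot. */
    static boolean persistAfterRun(String key, LocalWorkspace ws, ObjectStore store) {
        if (ws.isEmpty()) {
            return false;
        }
        store.put(key, ws.pack());
        return true;
    }
}
```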

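Metadata

The production runtime writes no custom metadata on S3 objects. All workspace version information is recorded in agent_sessions.workspaceVersion.

Data flow

The sequence below shows the end-to-end data writes for one complete prompt run (on a single node):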

1. POST /api/chat/stream (prompt)
   └─ Mongo: agent_sessions.upsert (create/load)
       └─ audit_logs.insert {action: "session_create"} (new sessions only)

2. AgentRunRuntime.stream
   ├─ Mongo: runs.insert {status: ACCEPTED}
   ├─ Redis Lua acquire:
   │   ├─ HSET delphi:run:active:<runId>
   │   ├─ SET  delphi:run:by-session:<ns>:<sid>
   │   ├─ INCR delphi:session:fencing:<ns>:<sid>
   │   └─ ZADD delphi:run:tenant-active:<ns>
   ├─ Mongo: runs.update {status: RUNNING, fencingToken}
   └─ S3: GET workspaces/<ns>/<sid>/snapshot.tar.gz (extract)

3. RuntimeEvent (run_started)
   ├─ Redis Stream: XADD delphi:events:session:<ns>:<sid>
   └─ Redis Pub/Sub: PUBLISH delphi:sse:events

4. Agent.runLoop (streaming inference + tool calls)
   ├─ Mongo: agent_session_entries.insert (user message, sequence=0)
   │   └─ audit_logs.insert {action: "prompt"}
   ├─ Redis Stream: XADD message_start / message_delta / message_end
   ├─ Mongo: agent_session_entries.insert (assistant, sequence=1)
   ├─ Redis Stream: XADD tool_started
   │   └─ audit_logs.insert {action: "tool_execution"}
   ├─ Mongo: agent_session_entries.insert (tool_result, sequence=2)
   └─ Redis Stream: XADD tool_completed

5. Run completion
   ├─ S3: PUT workspaces/<ns>/<sid>/snapshot.tar.gz
   ├─ Mongo TX:
   │   └─ runtime_event_outbox.insert {status: PENDING, eventName: run_completed}
   ├─ RuntimeEventOutboxWorker:
   │   ├─ Redis Stream: XADD delphi:events:session:... (terminal)
   │   ├─ Mongo: runs.update {status: COMPLETED, terminalEventId}
   │   ├─ Mongo: runtime_event_outbox.update {status: DELIVERED, redisStreamId}
   │   └─ Redis Lua release:
   │       ├─ DEL delphi:run:active:<runId>
   │       ├─ DEL delphi:run:by-session:<ns>:<sid>
   │       └─ ZREM delphi:run:tenant-active:<ns>
   ├─ Mongo: agent_sessions.update {workspaceVersion+1, lastCommittedRunId}
   └─ Mongo: usage_metrics.upsert {namespace, date, totalInputTokens+=, ...}

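Data retention and cleanup

| Data | Retention | Cleanup |
|---|---|---|
| agent_sessions | long-term (business retention) | deleted by the application (deleting a session also removes its entries) |
| agent_session_entries | long-term | same as above |
| runs | long-term (for audit trails) | archived periodically by the business side |
| runtime_event_outbox | until DELIVERED; delivered records can then be cleaned up periodically | custom cleanup job |
| audit_logs | long-term; the query API defaults to a 7-day window | business-side TTL (90 days recommended) |
| usage_metrics | per day | business-side TTL (365 days recommended) |
| subagent_states | medium-term | business-side TTL |
| Redis active / queue / event stream | expire via TTL | setting MAXLEN on some streams is recommended to bound their size |
| Redis pub/sub | transient | not persisted |
| S3 snapshot | long-term; same lifecycle as the session | objects removed by the application when the session is deleted |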