本文档梳理 Delphi Agent 在三类外部存储中的所有数据结构、用途与样例数据:
- MongoDB — 持久化主存储(session、entries、runs、outbox、audit、usage、subagent)
- Redis — 协调与实时层(active run lease、queue、event stream、pub/sub、配额计数)
- S3 / MinIO — Workspace snapshot 对象存储
命名约定:Mongo
_id即id字段;Redis key 中<prefix>默认是delphi,可通过PI_CLUSTER_REDIS_KEY_PREFIX修改;下文样例统一用delphi。
| Collection | 主键 / 唯一索引 | 主要写入路径 | 读取路径 |
|---|---|---|---|
agent_sessions |
_id(sessionId) |
session 创建 / compact / fork / setModel | SDK / /api/chat/stream 加载 session |
agent_session_entries |
(sessionId, entryId) 唯一;(sessionId, runId, sequence) 唯一 sparse |
prompt/continue 期间消息追加 | session 历史回放、fork、navigate |
runs |
runId 唯一 |
run admission → outbox commit | /api/audit、调度恢复 |
runtime_event_outbox |
(runId, eventName, eventId) 唯一 |
terminal event 提交 | RuntimeEventOutboxWorker 重试投递 |
audit_logs |
_id |
AuditService.record |
GET /api/audit |
usage_metrics |
(namespace, date) 唯一 |
UsageMeteringService flush |
GET /api/usage |
subagent_states |
(namespace, sessionId, id) 唯一 |
SubagentRuntime.spawn |
subagent 状态查询 |
会话元数据 + head 指针 + fencing token。是 session 树的根节点。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id |
String |
sessionId(业务方传入或 session_<uuid>) |
version |
Long (@Version) |
乐观锁版本号 |
namespace |
String |
租户命名空间(== tenantId) |
ownerRef |
String |
session 创建者标识 |
projectKey |
String |
项目 key(用于 workspace 关联) |
sessionName |
String |
用户可见名 |
modelProvider |
String |
当前 provider,例如 deepseek |
modelId |
String |
当前 modelId |
thinkingLevel |
String |
thinking 等级(OFF / LOW / HIGH) |
systemPrompt |
String |
system prompt |
headEntryId |
String |
当前 head entry,会话树游标 |
persistedMessageCount |
int |
已落库的消息数 |
steeringMode |
String |
ALL / ONE_AT_A_TIME |
followUpMode |
String |
ALL / ONE_AT_A_TIME |
autoCompactionEnabled |
boolean |
是否启用自动压缩 |
autoRetryEnabled |
boolean |
是否启用自动重试 |
fencingToken |
long |
最后成功提交的 fencing token |
lastCommittedRunId |
String |
最后成功完成的 runId |
workspaceVersion |
long |
workspace 版本(每次 persist 自增) |
createdAt / updatedAt |
Instant |
创建 / 更新时间 |
索引: project_updated_idx、namespace_project_updated_idx、namespace_id_idx。
样例:
{
"_id": "session_b9b1c8d4f0a14e7f9d3a",
"version": 12,
"namespace": "acme-corp",
"ownerRef": "user:dave",
"projectKey": "user-service",
"sessionName": "dave-backend",
"modelProvider": "deepseek",
"modelId": "deepseek-v4-pro",
"thinkingLevel": "OFF",
"systemPrompt": "You are a Java backend assistant.",
"headEntryId": "entry_4a8c1f",
"persistedMessageCount": 14,
"steeringMode": "ALL",
"followUpMode": "ALL",
"autoCompactionEnabled": true,
"autoRetryEnabled": false,
"fencingToken": 42,
"lastCommittedRunId": "run_2c5e1a3f4b6d",
"workspaceVersion": 7,
"createdAt": "2026-05-20T10:11:23.001Z",
"updatedAt": "2026-05-26T08:42:55.231Z"
}会话内每一条消息 / 工具调用 / compaction / fork summary 的不可变记录。entryId 形成树形结构(parentId 指向父节点),用于 fork 与 navigate。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id |
String |
文档 id(自动生成) |
sessionId |
String |
所属 session |
entryId |
String |
业务 entryId(UUID) |
parentId |
String |
父 entry,构造会话树 |
runId |
String |
所属 run |
sequence |
Integer |
run 内单调序号,幂等键的一部分 |
type |
String |
message / compaction / branch_summary |
payload |
Map<String,Object> |
序列化后的 message / 工具调用结果 / 摘要内容 |
timestamp |
Instant |
写入时刻 |
索引: session_time_idx、session_entry_idx(unique)、session_parent_idx、session_run_seq_idx(unique sparse) — 后者保证 (sessionId, runId, sequence) 三元组幂等,防止 stream 重试导致的重复追加。
样例:
// 用户 prompt
{
"_id": "ObjectId(666...)",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"entryId": "entry_4a8c1f",
"parentId": "entry_3f7b2e",
"runId": "run_2c5e1a3f4b6d",
"sequence": 0,
"type": "message",
"payload": {
"role": "user",
"content": [{"type": "text", "text": "请检查 UserService.java"}]
},
"timestamp": "2026-05-26T08:40:11.230Z"
}
// 工具调用结果
{
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"entryId": "entry_5b9d2c",
"parentId": "entry_4a8c1f",
"runId": "run_2c5e1a3f4b6d",
"sequence": 2,
"type": "message",
"payload": {
"role": "tool",
"toolCallId": "call_7abc",
"toolName": "read",
"content": [{"type": "text", "text": "public class UserService {...}"}],
"isError": false
},
"timestamp": "2026-05-26T08:40:13.445Z"
}
// compaction 摘要
{
"entryId": "entry_8x9y2z",
"type": "compaction",
"payload": {
"keptCount": 20,
"summary": "前 30 条消息讨论了 UserService 的设计与重构,最终决定使用 Builder 模式...",
"compactedAt": "2026-05-25T14:00:00Z"
}
}Run 生命周期记录。每次 prompt / continue / fork 触发的 run 都会写入一条。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id / runId |
String |
runId(run_<hex>,唯一索引) |
namespace |
String |
命名空间 |
sessionId |
String |
session |
tenantId / userId |
String |
租户 / 用户 |
operation |
String |
PROMPT / CONTINUE / STEER 等 |
queueMode |
String |
INTERRUPT / FOLLOWUP / STEER / DROP / REJECT |
status |
String |
ACCEPTED / QUEUED / RUNNING / COMPLETED / FAILED / ABORTED |
ownerNodeId |
String |
当前持有 lease 的节点 |
fencingToken |
Long |
写入会话的 fencing token |
createdAt / queuedAt / startedAt / completedAt |
Instant |
各阶段时间戳 |
failureType |
String |
失败分类(SHUTDOWN_TIMEOUT / WORKSPACE_SNAPSHOT_FAILED / QUOTA_REJECTED / MODEL_ERROR 等) |
failureMessage |
String |
失败描述 |
terminalEventId |
String |
terminal event 在 Redis Stream 中的 id |
version |
Long |
乐观锁版本号 |
索引: run_session_created_idx、run_owner_status_idx。
样例:
// 正常完成
{
"_id": "run_2c5e1a3f4b6d",
"runId": "run_2c5e1a3f4b6d",
"namespace": "acme-corp",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"tenantId": "acme-corp",
"userId": "dave",
"operation": "PROMPT",
"queueMode": "REJECT",
"status": "COMPLETED",
"ownerNodeId": "node-a",
"fencingToken": 42,
"createdAt": "2026-05-26T08:40:10.001Z",
"queuedAt": null,
"startedAt": "2026-05-26T08:40:10.150Z",
"completedAt": "2026-05-26T08:40:18.890Z",
"failureType": null,
"failureMessage": null,
"terminalEventId": "1716712818890-0",
"version": 5
}
// shutdown timeout 失败
{
"runId": "run_9f2e8c1a",
"status": "FAILED",
"ownerNodeId": "node-b",
"failureType": "SHUTDOWN_TIMEOUT",
"failureMessage": "active run did not complete within drain timeout",
"terminalEventId": "1716712900123-0"
}Terminal event 投递兜底表。terminal event 必须先在 Mongo 事务里写入此表,再由 worker 投递到 Redis Stream,从而保证「terminal event 至少一次送达」与「runs 终态原子落地」。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id |
String |
文档 id |
eventId |
String |
业务 eventId |
runId / namespace / sessionId / tenantId / userId |
String |
run 上下文 |
operation / queueMode |
String |
run 元数据快照 |
runStatus |
String |
terminal 后的 run 状态 |
runFailureType / runFailureMessage |
String |
失败时填充 |
ownerNodeId |
String |
terminal 时的 owner |
eventName |
String |
run_completed / run_failed / quota_rejected |
payload |
Map |
event 负载 |
eventTimestamp |
Instant |
event 业务时间 |
status |
String |
PENDING / DELIVERED / FAILED_RETRYABLE |
redisStreamId |
String |
append 成功后回填的 Stream ID |
leaseReleasedAt |
Instant |
释放 lease 时刻 |
attempts |
int |
已尝试次数 |
nextAttemptAt |
Instant |
下次重试时刻 |
createdAt / updatedAt |
Instant |
时间戳 |
索引: outbox_event_unique_idx(unique)、outbox_retry_idx。
样例:
{
"_id": "ObjectId(...)",
"eventId": "evt_run_completed_2c5e1a3f",
"runId": "run_2c5e1a3f4b6d",
"namespace": "acme-corp",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"tenantId": "acme-corp",
"userId": "dave",
"operation": "PROMPT",
"queueMode": "REJECT",
"runStatus": "COMPLETED",
"runFailureType": null,
"runFailureMessage": null,
"ownerNodeId": "node-a",
"eventName": "run_completed",
"payload": {
"runId": "run_2c5e1a3f4b6d",
"totalTokens": 1234,
"durationMs": 8740
},
"eventTimestamp": "2026-05-26T08:40:18.890Z",
"status": "DELIVERED",
"redisStreamId": "1716712818890-0",
"leaseReleasedAt": "2026-05-26T08:40:18.920Z",
"attempts": 1,
"nextAttemptAt": null,
"createdAt": "2026-05-26T08:40:18.890Z",
"updatedAt": "2026-05-26T08:40:18.920Z"
}由 AuditService.record(...) 异步追加。代码中实际产生的 action 取值:
session_create / prompt / continue / abort / compact / session_fork / tool_policy / tool_execution / orchestrated_prompt 等。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id |
String |
文档 id |
namespace |
String |
命名空间 |
userId |
String |
触发用户(系统调用可空) |
sessionId |
String |
session |
action |
String |
操作类型 |
timestamp |
Instant |
写入时刻 |
details |
Map<String,Object> |
自由结构详情 |
索引: namespace_timestamp_idx、session_timestamp_idx。
样例:
// session 创建
{
"namespace": "acme-corp",
"userId": null,
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"action": "session_create",
"timestamp": "2026-05-20T10:11:23.001Z",
"details": {
"projectKey": "user-service",
"modelProvider": "deepseek",
"modelId": "deepseek-v4-pro"
}
}
// 工具执行审计
{
"namespace": "acme-corp",
"userId": "dave",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"action": "tool_execution",
"timestamp": "2026-05-26T08:40:13.500Z",
"details": {
"toolName": "read",
"category": "READONLY",
"outcome": "OK",
"durationMs": 12,
"args": {"path": "/workspace/UserService.java"}
}
}
// 工具策略拒绝
{
"action": "tool_policy",
"details": {
"toolName": "bash",
"category": "EXECUTABLE",
"decision": "DENY",
"reason": "ORCHESTRATOR_STRICT mode requires CODER subagent"
}
}按 (namespace, date) 一行的日聚合表。计量值由 UsageMeteringService 周期性 flush(默认 30s,可配置 pi.metering.flush-interval-seconds)。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id |
String |
文档 id |
namespace |
String |
命名空间 |
date |
LocalDate |
UTC 日期 |
totalInputTokens |
long |
累计输入 token |
totalOutputTokens |
long |
累计输出 token |
totalRequests |
long |
累计请求 / run 数 |
totalToolCalls |
long |
累计工具调用次数 |
索引: namespace_date_idx(unique)。
样例:
{
"namespace": "acme-corp",
"date": "2026-05-26",
"totalInputTokens": 1024500,
"totalOutputTokens": 312000,
"totalRequests": 187,
"totalToolCalls": 423
}Subagent(spawn 出来的子 Agent)的生命周期状态。
字段定义:
| 字段 | 类型 | 说明 |
|---|---|---|
_id |
String |
subagentId(sub_<hex>) |
parentRunId |
String |
父 run |
tenantId / namespace / userId / projectKey / sessionId |
String |
上下文 |
role |
String |
ORCHESTRATOR / PLANNER / RESEARCHER / REVIEWER / CODER / TESTER |
depth |
int |
嵌套深度 |
workspaceScope |
String |
INHERIT / PRIVATE |
task |
String |
任务描述 |
contextText |
String |
父 Agent 注入的上下文 |
maxDurationSeconds |
int |
超时 |
ownerNodeId |
String |
持有节点 |
status |
String |
RUNNING / COMPLETED / FAILED / ABORTED |
summary |
String |
完成时的摘要 |
errorMessage |
String |
失败描述 |
startedAt / endedAt |
Instant |
时间戳 |
details |
Map |
自由结构 |
索引: namespace_session_id_idx(unique)、namespace_parent_status_idx、namespace_session_status_idx。
样例:
{
"_id": "sub_3e7c1a8d",
"parentRunId": "run_2c5e1a3f4b6d",
"tenantId": "acme-corp",
"namespace": "acme-corp",
"userId": "dave",
"projectKey": "user-service",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"role": "CODER",
"depth": 1,
"workspaceScope": "INHERIT",
"task": "实现 UserService.findByEmail",
"contextText": "Schema in /workspace/schema.sql ...",
"maxDurationSeconds": 600,
"ownerNodeId": "node-a",
"status": "COMPLETED",
"summary": "Added findByEmail using parameterized query.",
"errorMessage": null,
"startedAt": "2026-05-26T08:40:14.000Z",
"endedAt": "2026-05-26T08:40:17.500Z",
"details": {"toolCallCount": 6, "filesChanged": ["UserService.java"]}
}所有 key 由 ClusterKeyRegistry 统一管理,prefix 默认 delphi。
active run 元数据,由 RunAdmissionController.acquire Lua 脚本原子写入;TTL = PI_CLUSTER_RUN_MAX_TTL_MS(默认 1800000ms);后台每 ttl/3 续期一次。
| field | 说明 |
|---|---|
runId |
run id |
namespace |
命名空间 |
sessionId |
session |
tenantId |
租户 |
userId |
用户(可空字符串) |
nodeId |
持有 lease 的节点 |
fencingToken |
单调递增的 fencing token |
status |
固定 RUNNING |
startedAt |
启动时间戳(毫秒) |
样例:
HGETALL delphi:run:active:run_2c5e1a3f4b6d
1) "runId" 2) "run_2c5e1a3f4b6d"
3) "namespace" 4) "acme-corp"
5) "sessionId" 6) "session_b9b1c8d4f0a14e7f9d3a"
7) "tenantId" 8) "acme-corp"
9) "userId" 10) "dave"
11) "nodeId" 12) "node-a"
13) "fencingToken" 14) "42"
15) "status" 16) "RUNNING"
17) "startedAt" 18) "1716712810150"
TTL → 1782 (秒)
session 到 active runId 的反向索引。
GET delphi:run:by-session:acme-corp:session_b9b1c8d4f0a14e7f9d3a
"run_2c5e1a3f4b6d"
fencing token 单调递增源。INCR 产生新 token。
GET delphi:session:fencing:acme-corp:session_b9b1c8d4f0a14e7f9d3a
"42"
compact / fork / navigate 命令获取的 session 写锁,TTL PI_CLUSTER_LOCK_DEFAULT_TTL_MS(默认 30000ms)。
GET delphi:session:lock:acme-corp:session_b9b1c8d4f0a14e7f9d3a
"node-a:cmd_compact_8f3e2c1a"
TTL → 28
租户活跃 run 集合,分数为 lease 过期毫秒时间戳。ZCOUNT 用于配额校验。
ZRANGEBYSCORE delphi:run:tenant-active:acme-corp <now> +inf WITHSCORES
1) "run_2c5e1a3f4b6d" 2) "1716714610150"
3) "run_9f2e8c1a..." 4) "1716714650200"
同上,按用户聚合。
queued run 等待调度。消费者组:session-<namespace>-<sessionId>。
每条消息字段:
| field | 说明 |
|---|---|
kind |
固定 run |
context |
JSON 序列化的 AgentRunContext |
样例:
XRANGE delphi:queue:session:acme-corp:session_xxx - +
1) 1) "1716712820500-0"
2) 1) "kind" 2) "run"
3) "context" 4) "{\"runId\":\"run_3a8c1f...\",\"namespace\":\"acme-corp\",\"sessionId\":\"...\",\"tenantId\":\"acme-corp\",\"userId\":\"dave\",\"prompt\":\"...\",\"operation\":\"PROMPT\",\"queueMode\":\"INTERRUPT\"}"
queue 处理状态指示。
queue drain stale recovery 的全局锁,确保只有一个节点执行恢复扫描。
所有 runtime event 的持久化 stream。SSE replay 直接读它。
每条消息字段:
| field | 说明 |
|---|---|
eventId |
业务 eventId |
runId |
所属 run(可空) |
name |
事件名(run_started / message_delta / tool_completed / run_completed 等) |
envelope |
JSON 序列化的 RuntimeEvent(含 payload) |
样例:
XREAD COUNT 2 STREAMS delphi:events:session:acme-corp:session_xxx 0
1) 1) "1716712810150-0"
2) 1) "eventId" 2) "evt_run_started_2c5e"
3) "runId" 4) "run_2c5e1a3f4b6d"
5) "name" 6) "run_started"
7) "envelope" 8) "{\"name\":\"run_started\",\"runId\":\"run_2c5e1a3f4b6d\",\"namespace\":\"acme-corp\",\"sessionId\":\"...\",\"timestamp\":\"2026-05-26T08:40:10.150Z\",\"payload\":{\"queueMode\":\"REJECT\"}}"
2) 1) "1716712818890-0"
2) 1) "eventId" 2) "evt_run_completed_2c5e"
3) "name" 4) "run_completed"
5) "envelope" 6) "{...\"payload\":{\"totalTokens\":1234,\"durationMs\":8740}}"
跨节点 SSE 实时广播频道。订阅者根据 envelope 的 (namespace, sessionId, runId) 决定是否投递给本地 emitter。
{
"streamId": "1716712818890-0",
"namespace": "acme-corp",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"runId": "run_2c5e1a3f4b6d",
"name": "run_completed",
"envelope": "{...}",
"sourceNodeId": "node-a"
}abort / steer 跨节点命令。每个节点订阅自己的 channel。
// abort
{
"commandId": "cmd_3e7c1a8d",
"type": "abort",
"namespace": "acme-corp",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"runId": "run_2c5e1a3f4b6d",
"payload": "user_request",
"requestedByNodeId": "node-b"
}
// steer
{
"commandId": "cmd_5f9d2c4a",
"type": "steer",
"namespace": "acme-corp",
"sessionId": "session_b9b1c8d4f0a14e7f9d3a",
"runId": "run_2c5e1a3f4b6d",
"payload": "请用中文回答",
"requestedByNodeId": "node-b"
}subagent abort 跨节点命令。
{
"commandId": "cmd_8a2b3c4d",
"type": "abort",
"subagentId": "sub_3e7c1a8d",
"reason": "parent_run_aborted",
"requestedByNodeId": "node-b"
}POST /api/catalog/reload 后广播,使其他节点清空本地 catalog 缓存。
{
"scope": "NAMESPACE",
"namespace": "acme-corp",
"reason": "catalog_reload",
"sourceNodeId": "node-a",
"timestamp": "2026-05-26T08:50:00.000Z"
}scope 可为 ALL(清空全部)或 NAMESPACE(仅清空指定 namespace)。
SnapshotWorkspaceStorage.prepareForRun 在并发恢复 snapshot 时持有的分布式锁,TTL 5 分钟。
GET delphi:workspace:lock:acme-corp:session_xxx
"node-a:8c1f2e9b"
TTL → 287
启动时注册,重复 nodeId 检测。
GET delphi:node:registry:node-a
"started:1716700000000;hostname:host-a.local;version:0.1.0-SNAPSHOT"
| Key | 说明 |
|---|---|
delphi:ratelimit:<namespace> |
在 ClusterKeyRegistry 中预留,限流当前在内存实现。 |
delphi:usage:<yyyyMMdd>:<namespace>:<metric> |
预留,计量当前直接落 Mongo usage_metrics。 |
只用于存储 workspace snapshot,不存放任何业务记录。
workspaces/<namespace>/<sessionId>/snapshot.tar.gz
例如:
workspaces/acme-corp/session_b9b1c8d4f0a14e7f9d3a/snapshot.tar.gz
tar.gz 归档,包含 session workspace 目录下的所有文件,排除 pi.execution.workspace-storage.snapshot.exclude 列出的子路径(默认 .skills/、node_modules/、.git/、target/、build/、dist/)。
样例归档结构:
snapshot.tar.gz
├── src/
│ └── main/
│ └── java/
│ └── com/example/UserService.java
├── pom.xml
├── README.md
└── .env
- 写入 (
persistAfterRun):run 成功完成后,先打包临时文件 →PutObject上传,覆盖旧 snapshot。空 workspace 跳过上传,避免覆盖有效 snapshot。 - 读取 (
prepareForRun):run 开始前,先检查本地 workspace 是否已就绪;否则获取 Redis workspace lock →GetObject拉取 → 解压到本地热缓存。NoSuchKey视为首次运行,跳过恢复。
正式 runtime 不在 S3 对象上写自定义元数据,所有 workspace 版本信息记录在 agent_sessions.workspaceVersion 中。
下图展示一个完整 prompt run 的端到端数据写入序列(同节点):
1. POST /api/chat/stream (prompt)
└─ Mongo: agent_sessions.upsert (创建/加载)
└─ audit_logs.insert {action: "session_create"} (仅新建)
2. AgentRunRuntime.stream
├─ Mongo: runs.insert {status: ACCEPTED}
├─ Redis Lua acquire:
│ ├─ HSET delphi:run:active:<runId>
│ ├─ SET delphi:run:by-session:<ns>:<sid>
│ ├─ INCR delphi:session:fencing:<ns>:<sid>
│ └─ ZADD delphi:run:tenant-active:<ns>
├─ Mongo: runs.update {status: RUNNING, fencingToken}
└─ S3: GET workspaces/<ns>/<sid>/snapshot.tar.gz (解压)
3. RuntimeEvent (run_started)
├─ Redis Stream: XADD delphi:events:session:<ns>:<sid>
└─ Redis Pub/Sub: PUBLISH delphi:sse:events
4. Agent.runLoop (流式推理 + 工具调用)
├─ Mongo: agent_session_entries.insert (用户消息, sequence=0)
│ └─ audit_logs.insert {action: "prompt"}
├─ Redis Stream: XADD message_start / message_delta / message_end
├─ Mongo: agent_session_entries.insert (assistant, sequence=1)
├─ Redis Stream: XADD tool_started
│ └─ audit_logs.insert {action: "tool_execution"}
├─ Mongo: agent_session_entries.insert (tool_result, sequence=2)
└─ Redis Stream: XADD tool_completed
5. Run 完成
├─ S3: PUT workspaces/<ns>/<sid>/snapshot.tar.gz
├─ Mongo TX:
│ └─ runtime_event_outbox.insert {status: PENDING, eventName: run_completed}
├─ RuntimeEventOutboxWorker:
│ ├─ Redis Stream: XADD delphi:events:session:... (terminal)
│ ├─ Mongo: runs.update {status: COMPLETED, terminalEventId}
│ ├─ Mongo: runtime_event_outbox.update {status: DELIVERED, redisStreamId}
│ └─ Redis Lua release:
│ ├─ DEL delphi:run:active:<runId>
│ ├─ DEL delphi:run:by-session:<ns>:<sid>
│ └─ ZREM delphi:run:tenant-active:<ns>
├─ Mongo: agent_sessions.update {workspaceVersion+1, lastCommittedRunId}
└─ Mongo: usage_metrics.upsert {namespace, date, totalInputTokens+=, ...}
| 数据 | 保留策略 | 清理方式 |
|---|---|---|
agent_sessions |
长期(业务保留) | 应用层删除(删除 session 同时清理 entries) |
agent_session_entries |
长期 | 同上 |
runs |
长期(用于审计回溯) | 业务侧定期归档 |
runtime_event_outbox |
直到 DELIVERED 后可定期清理已成功记录 |
自定义清理 job |
audit_logs |
长期;查询接口默认窗口 7 天 | 业务侧 TTL(建议 90 天) |
usage_metrics |
按天 | 业务侧 TTL(建议 365 天) |
subagent_states |
中期 | 业务侧 TTL |
| Redis active / queue / event stream | TTL 自动过期 | 部分 stream 建议设置 MAXLEN 控制大小 |
| Redis pub/sub | 即时 | 不持久化 |
| S3 snapshot | 长期;与 session 同生命周期 | session 删除时通过应用层清理对象 |