DOCS.md 是 API / SDK / runtime 配置的入口。技术架构图见 ARCHITECHE.MD,项目运行说明见 README.md,数据存储结构见 DATA.md。
兼容入口:DOC.md。
- API Reference
- API Surface
- Required Headers
POST /api/chat/stream- Command 一致性
- SSE Events
GET /api/chat/events- Catalog API
- Audit / Usage API
- Runtime Configuration
- Redis Keys
- Mongo Collections
- Failure / Shutdown 不变量
- Actuator
- SDK Reference
| 方法 | 路径 | 说明 |
|---|---|---|
POST |
/api/chat/stream |
主入口:发送 prompt、continue 或 command,统一返回 SSE。 |
GET |
/api/chat/events |
SSE replay 入口,基于 Redis Stream ID 补读并接 live。 |
GET |
/api/catalog/models?provider=<provider> |
查询模型列表。 |
GET |
/api/catalog/skills?namespace=<ns> |
查询 namespace 可见 skills。 |
GET |
/api/catalog/skills/{name}?namespace=<ns> |
查询单个 skill。 |
GET |
/api/catalog/prompts |
查询 prompt catalog。 |
GET |
/api/catalog/resources |
查询 resource catalog。 |
POST |
/api/catalog/reload?namespace=<ns> |
重扫 catalog,并清理 skill cache,跨节点广播失效。 |
GET |
/api/audit?namespace=<ns>&from=&to=&limit= |
查询审计日志(默认近 7 天)。 |
GET |
/api/usage?namespace=<ns>&from=&to= |
查询用量统计(默认近 30 天)。 |
GET |
/api/usage/today?namespace=<ns> |
查询今日用量。 |
GET |
/actuator/health |
Redis、node drain、cluster runtime 健康。 |
GET |
/actuator/info |
nodeId、hostname、version、startTime、draining。 |
GET |
/actuator/metrics / /actuator/prometheus |
运行指标。 |
| Header | 说明 |
|---|---|
Content-Type: application/json |
POST 请求必须使用 JSON。 |
X-Tenant-Id |
租户标识,必须等于请求体中的 namespace。 |
X-User-Id |
当前用户标识,用于审计和计量。 |
RuntimeIdentityResolver 会校验 header 与 namespace 的一致性,不一致时返回 400 / TENANT_MISMATCH。
请求分两类:
command为空:普通 prompt,服务端创建或加载 session,进入AgentRunRuntime。command非空:执行 command;continue会作为标准 run 进入AgentRunRuntime,其他 command 返回ackSSE。
关键字段:
| 字段 | 说明 |
|---|---|
namespace |
必填,必须与 X-Tenant-Id 一致。 |
sessionId |
可选;为空时创建新 session。 |
projectKey |
可选,用于 session/project workspace 关联。 |
prompt |
prompt 文本;steer/continue 按命令语义使用。 |
provider / modelId |
可选;创建新 session 时未传则使用默认模型(pi.defaults)。 |
systemPrompt |
可选,新 session 的 system prompt。 |
queueMode |
INTERRUPT / FOLLOWUP / STEER / DROP / REJECT;普通 prompt 未传默认 REJECT。 |
command |
abort / steer / compact / fork / navigate / continue,大小写会归一化。 |
commandArgs |
命令参数,例如 {"keep": 20}、{"entryId": "..."}、{"newSessionName": "..."}。 |
节点进入 draining 后:
| 请求类型 | 行为 |
|---|---|
| 新 prompt | 拒绝,返回 503 / NODE_DRAINING。 |
continue |
拒绝,返回 503 / NODE_DRAINING。 |
compact / fork / navigate |
拒绝,返回 503 / NODE_DRAINING。 |
abort / steer |
允许,用于结束或引导已有 active run。 |
AgentRunRuntime.dispatchByQueuePolicy 通过 RunQueueManager.decide() 计算决策:
| 决策 | 行为 |
|---|---|
RUN_NOW |
无 active run,直接调度。 |
INTERRUPT |
有 active run,先 abort 旧 run,新 run 入队等待。 |
ENQUEUE |
有 active run,新 run 入队等待。 |
STEER |
有 active run,将 prompt 注入当前 run,发出 queue_updated 事件。 |
DROP |
有 active run,丢弃新请求,发出 queue_updated 事件。 |
REJECT |
拒绝,发 quota_rejected 事件。 |
TenantRuntimeGuard.ensureQueueCapacity 在入队前会检查租户队列容量。
| 命令 | 行为 |
|---|---|
abort |
AgentRunRuntime.abort 查找 active owner;远端 owner 通过 Redis command channel 中止本地 Agent;同时取消子 Agent。 |
steer |
通过 active owner 注入当前 run;如果无 active run,退化为 session steering(写入 steeringQueue)。 |
compact |
走 SessionCommandRuntime,active run 存在时拒绝,空闲时获取 Redis session lock 后执行。 |
fork |
同上,返回 forkSessionId。commandArgs.entryId 指定从哪个 entry fork。 |
navigate |
同上,避免导航和正在运行的 Agent 并发修改 head。commandArgs.entryId 指定目标节点。 |
continue |
作为 AgentRunOperation.CONTINUE 进入 run admission、queue、fencing、SSE 生命周期。 |
会话写入幂等性:
- prompt / continue 携带
runId + fencingToken进入AgentSessionRuntime。 - Mongo entries 使用
sessionId + runId + sequence唯一索引避免重复追加。 SessionDocument.fencingToken记录最后成功提交的 token;低 token 写入会被拒绝。
| 类别 | 事件名称 |
|---|---|
| Run lifecycle | run_started、queue_updated、run_completed、run_failed、quota_rejected |
| Message stream | message_start、message_delta、message_end |
| Tool execution | tool_started、tool_updated、tool_completed |
| Subagent | subagent_started、subagent_completed、subagent_failed |
| Command ack | ack |
每个 runtime event 的 SSE id 是 Redis Stream ID。客户端应保存最后处理成功的 SSE id,用于断线补读。
SessionEventBroker 实现 RuntimeEventSink,本地投递 SSE 的同时通过 RedisSseEventPublisher 在 <prefix>:sse:events 频道 fanout 到其他节点;终端事件触发 SseEmitter.complete()。
用于 replay + live subscribe。
参数:
| 参数 | 说明 |
|---|---|
namespace |
必填。 |
sessionId |
必填。 |
runId |
可选;传入后只接收该 run 的事件。 |
lastEventId |
可选;等价于 Last-Event-ID header。 |
mode |
live 默认;replay-only 表示追到 high-watermark 后 complete。 |
from |
last 默认;run-start 表示从 Redis Stream 起点补读。 |
补读流程(SessionEventReplayController):
- 服务端读取当前 Redis Stream high-watermark。
- 如果历史中已有当前 run 的 terminal event,直接 replay 后 complete。
- 否则注册 live emitter,并标记 live 只接收
> highWatermark的事件。 - 补发
(lastEventId, highWatermark]。 - live 阶段继续接收新事件。
这样客户端可连接任意节点,不依赖原始 owner node 的本地 SseEmitter。
/api/catalog/* 由 CatalogController 提供:
GET /api/catalog/models?provider=<provider>:返回pi.models中已注册的模型,可按 provider 过滤。GET /api/catalog/skills?namespace=<ns>:返回 namespace 可见 Skills(public + namespace 私有);同名 namespace 会覆盖 public。GET /api/catalog/skills/{name}?namespace=<ns>:返回单个 skill 的 metadata 和content。GET /api/catalog/prompts:列出 prompt catalog。GET /api/catalog/resources:列出 resource catalog。POST /api/catalog/reload?namespace=<ns>:重新扫描磁盘,并通过CacheInvalidationPublisher在集群内广播失效,使其他节点也清空 cache。
| 路径 | 说明 |
|---|---|
GET /api/audit?namespace=<ns>&from=&to=&limit= |
查询 audit_logs。from / to 为 ISO-8601 instant;缺省返回近 7 天,最大 limit=500。 |
GET /api/usage?namespace=<ns>&from=&to= |
查询 usage_metrics,按天聚合 token / call / sandbox 用量。缺省近 30 天。 |
GET /api/usage/today?namespace=<ns> |
当天聚合,便于 dashboard 实时显示。 |
正式 runtime 不区分 local / standalone / cluster 配置开关。下面这些依赖和配置始终是正式运行路径的一部分。
| 环境变量 / Property | 默认值 | 说明 |
|---|---|---|
MONGODB_URI |
mongodb://localhost:27017/pi-agent-framework |
MongoDB 连接;单节点可使用 standalone MongoDB。 |
REDIS_HOST / REDIS_PORT / REDIS_PASSWORD |
localhost / 6379 / 空 |
Redis 连接。 |
REDIS_TIMEOUT |
3000ms |
Lettuce 命令超时。 |
REDIS_POOL_MAX_ACTIVE / REDIS_POOL_MAX_IDLE / REDIS_POOL_MIN_IDLE |
16 / 8 / 2 |
连接池。 |
PORT |
8080 |
HTTP 端口。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
PI_NODE_ID |
空 | 节点唯一标识;为空时使用 PI_CLUSTER_NODE_ID / POD_NAME / 进程 UUID。不会默认使用 hostname,避免本机快速重启或同机多实例误判为重复节点。重复 nodeId 在 <prefix>:node:registry 中会被检测并拒绝启动。 |
PI_CLUSTER_REDIS_KEY_PREFIX |
delphi |
Redis key 前缀。 |
PI_CLUSTER_RUN_MAX_TTL_MS |
1800000 |
active-run lease TTL。 |
PI_CLUSTER_LOCK_DEFAULT_TTL_MS |
30000 |
session command lock TTL。 |
PI_CLUSTER_COMMAND_TIMEOUT_MS |
5000 |
abort / steer 跨节点命令等待超时。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
PI_WORKSPACES_ROOT |
workspaces |
本地热缓存根目录。 |
PI_WORKSPACE_STORAGE |
snapshot |
正式 runtime 只允许 snapshot;local 会启动失败。 |
PI_WORKSPACE_BUCKET |
空 | snapshot bucket,必填。 |
PI_WORKSPACE_S3_ENDPOINT |
空 | S3/MinIO endpoint。 |
PI_WORKSPACE_S3_REGION |
us-east-1 |
S3 region。 |
PI_WORKSPACE_S3_AK / PI_WORKSPACE_S3_SK |
空 | S3 凭证。 |
PI_WORKSPACE_S3_PATH_STYLE |
true |
path-style 访问(MinIO 必须)。 |
PI_WORKSPACE_SNAPSHOT_EXCLUDE |
.skills/,node_modules/,.git/,target/,build/,dist/ |
跳过快照的子路径。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
PI_SESSION_MAX |
2000 |
进程内最大 session 缓存数。 |
PI_SESSION_IDLE_TTL_HOURS |
24 |
session idle 后回收时长。 |
PI_SESSION_REAP_MINUTES |
5 |
reap 调度周期。 |
pi.compaction.context-window-ratio |
0.8 |
触发自动 compaction 的窗口占比。 |
pi.compaction.max-messages |
60 |
触发自动 compaction 的消息数阈值。 |
pi.compaction.default-keep-count |
20 |
compaction 默认保留消息数。 |
PI_PLANNING_TIMEOUT |
45 |
TaskPlanning 工具超时。 |
PI_HTTP_ASYNC_REQUEST_TIMEOUT_MS |
0 |
Spring MVC async 超时;0 表示不超时。 |
PI_HTTP_CHAT_STREAM_TIMEOUT_MS |
0 |
SSE 长连接超时;0 表示直到终止事件或客户端断开。 |
PI_HTTP_CHAT_COMMAND_TIMEOUT_MS |
60000 |
command 类请求超时。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
PI_TOOLS_BUILTIN_ENABLED |
true |
是否启用内置工具。 |
pi.tools.builtin.available |
[read,bash,edit,write,grep,find,ls] |
可用内置工具白名单。 |
pi.tools.builtin.default-enabled |
[read,bash,edit,write] |
新 session 默认开启的工具。 |
PI_TOOLS_READ_MAX_LINES / PI_TOOLS_READ_MAX_BYTES |
2000 / 51200 |
read 工具上限。 |
PI_TOOLS_BASH_TIMEOUT_SECONDS |
1800 |
bash 默认超时。 |
PI_TOOLS_BASH_MAX_LINES / PI_TOOLS_BASH_MAX_BYTES |
2000 / 51200 |
bash 输出截断。 |
PI_TOOLS_GREP_DEFAULT_LIMIT / PI_TOOLS_GREP_MAX_LINE_LENGTH / PI_TOOLS_GREP_MAX_BYTES |
100 / 500 / 51200 |
grep 限制。 |
PI_TOOLS_FIND_DEFAULT_LIMIT / PI_TOOLS_FIND_MAX_BYTES |
1000 / 51200 |
find 限制。 |
PI_TOOLS_LS_DEFAULT_LIMIT / PI_TOOLS_LS_MAX_BYTES |
500 / 51200 |
ls 限制。 |
PI_TOOL_POLICY_ORCHESTRATOR_STRICT |
false |
true 时主代理只能 READONLY / INSTRUCTIONAL / ORCHESTRATION。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
PI_AGENT_SKILLS_DIRS |
./skills,${user.home}/.codex/skills |
Skill 扫描目录。 |
PI_AGENT_PROMPTS_DIRS |
./prompts |
Prompt 扫描目录。 |
PI_AGENT_RESOURCES_DIRS |
./resources |
Resource 扫描目录。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
pi.rate-limit.enabled |
true |
是否启用 RPM 限流。 |
pi.rate-limit.default-rpm |
60 |
默认每分钟请求数。 |
pi.quota.enabled |
true |
是否启用配额。 |
pi.quota.defaults.max-concurrent-sessions |
50 |
单租户并发 session。 |
pi.quota.defaults.daily-token-limit |
1000000 |
每日 token 上限。 |
pi.quota.defaults.cpu-quota / memory-limit / pids-limit |
50000 / 256m / 100 |
sandbox 资源限制。 |
pi.quota.overrides.<tenant> |
— | 指定租户覆盖。 |
pi.audit.enabled |
true |
是否启用审计。 |
pi.metering.enabled |
true |
是否启用计量。 |
pi.metering.flush-interval-seconds |
30 |
计量刷写间隔。 |
| 配置 | 默认值 | 说明 |
|---|---|---|
PI_SHUTDOWN_DRAIN_TIMEOUT_SECONDS |
1800 |
等待 active run 的最长时间。 |
PI_SHUTDOWN_ABORT_GRACE_SECONDS |
10 |
timeout 后等待 abort 生效的短延时。 |
SPRING_LIFECYCLE_TIMEOUT_PER_SHUTDOWN_PHASE |
30m |
Spring lifecycle phase 超时。 |
PI_CLUSTER_ENABLED 已废弃,不再控制正式 runtime bean 创建。
| Key / Channel | 类型 | 生命周期语义 |
|---|---|---|
<prefix>:run:active:<runId> |
Hash + TTL | 当前 active run 元数据,包含 owner node 和 fencing token。 |
<prefix>:run:by-session:<namespace>:<sessionId> |
String + TTL | session 到 active run 的短生命周期索引。 |
<prefix>:session:fencing:<namespace>:<sessionId> |
Counter | session fencing token 单调递增源。 |
<prefix>:session:lock:<namespace>:<sessionId> |
String + TTL | compact / fork / navigate 的 session 写锁。 |
<prefix>:queue:session:<namespace>:<sessionId> |
Redis Stream | queued run。 |
<prefix>:queue:drain-lock:<namespace>:<sessionId> |
String + TTL | queue drainer 单 session 扫描锁。 |
<prefix>:run:commands:<nodeId> |
Pub/sub channel | abort / steer 发往 owner node。 |
<prefix>:subagent:commands:<nodeId> |
Pub/sub channel | subagent abort 发往持有节点。 |
<prefix>:sse:events |
Pub/sub channel | 跨节点 SSE 实时广播。 |
<prefix>:catalog:invalidate |
Pub/sub channel | catalog reload 广播。 |
<prefix>:events:session:<namespace>:<sessionId> |
Redis Stream | session event replay。 |
<prefix>:ratelimit:<namespace>:<window> |
String + TTL | 全局限流窗口。 |
<prefix>:usage:<yyyyMMdd>:<namespace>:<metric> |
String | usage buffer。 |
<prefix>:node:registry:<nodeId> |
String + TTL | 节点注册和重复 nodeId 检测。 |
| Collection | 用途 |
|---|---|
sessions |
session 元数据、head entry、模型配置、fencing 信息、auto compaction / retry 开关。 |
session_entries |
conversation entries,使用 sessionId + runId + sequence 幂等。 |
runs |
run lifecycle,含 owner、fencing、终态、failureType、terminalEventId。 |
runtime_event_outbox |
terminal event 投递兜底;outbox 先持久化,Redis terminal append 成功后再把 runs 更新为终态。 |
audit_logs |
审计事件。 |
usage_metrics |
计量指标(token、call、sandbox usage)。 |
subagent_states |
subagent 生命周期与结果。 |
- terminal outbox 先写入单条 Mongo outbox 文档,文档携带目标 run 终态、failureType 和 payload。
- Redis terminal event 写入成功后,
RuntimeEventOutboxWorker才把runs更新为终态并写入terminalEventId。 - owner 只在 Redis terminal event 可见之后释放 active run lease。
- 如果 Redis append 失败,lease 保留,outbox 进入
FAILED_RETRYABLE;后台 worker 成功投递后再释放 owner 本地 lease。 - queued run 只有调度接受后才
XACK + XDEL;调度失败会保留 pending 或显式 requeue,避免复制同一条 run。 - workspace snapshot 必须先于 success terminal event。
- shutdown drain 不迁移 active Agent,只让 owner 节点完成或超时终止。
- failure 分类由
RunFailureClassifier决定failureType:包括SHUTDOWN_TIMEOUT、WORKSPACE_SNAPSHOT_FAILED、WORKSPACE_SNAPSHOT_FAILED_DURING_SHUTDOWN、QUOTA_REJECTED、MODEL_ERROR、TOOL_ERROR等。
| 路径 | 说明 |
|---|---|
/actuator/health |
Redis、node drain、cluster runtime 健康状态。draining 时 readiness 为 OUT_OF_SERVICE,并暴露 localActiveRuns、localActiveEmitters 计数。 |
/actuator/info |
暴露 nodeId、hostname、version、startTime、draining,由 NodeInfoContributor 提供。 |
/actuator/metrics |
Spring Boot 指标。 |
/actuator/prometheus |
Prometheus exposition。 |
management.endpoints.web.exposure.include 默认 health,info,metrics,prometheus。
嵌入式接入(同一 JVM)优先使用 SDK:
@Autowired DelphiCodingAgentSdk sdk;
AgentSessionHandle handle = sdk.createSession(new CreateAgentSessionOptions(
"tenant-a", // namespace
"demo-project", // projectKey
"session-1", // sessionName
null, // provider,使用默认
null, // modelId,使用默认
null // systemPrompt
));
handle.prompt("帮我写一个排序算法").join();
String answer = handle.lastAssistantText();
handle.steer("注意要使用 Java 21 写法");
handle.followUp("再加单元测试");
handle.compact(20);
String forked = sdk.fork(handle.sessionId(), "tenant-a", "<entryId>", "实验分支");
sdk.navigateTree(handle.sessionId(), "tenant-a", "<entryId>");
// 订阅事件
AutoCloseable subscription = handle.subscribeEvents(event -> {
// AgentEvent: AgentStart / TurnStart / MessageStart / MessageUpdate / MessageEnd / ToolExecutionStart / ...
});| 入口 | 说明 |
|---|---|
DelphiCodingAgentSdk |
顶级 SDK,按 sessionId 调用。 |
CreateAgentSessionOptions |
创建 session 的参数。 |
AgentSessionHandle |
session 操作句柄:prompt、cont、steer、followUp、abort、compact、setSteeringMode、setFollowUpMode、setAutoCompaction、setAutoRetry、stats、lastAssistantText、state。 |
详细说明见 docs/agent-sdk.zh-CN.md。
前端通过 POST /api/chat/stream 发送请求,服务端返回 SSE 流。以下示例展示 Vue 和 React 的集成方式。
1. 发送 POST 请求到 /api/chat/stream(body 包含 namespace、sessionId、prompt 等)
2. 读取 SSE 响应流,逐帧解析事件(run_started → message_delta → run_completed)
3. 保存最后一个 SSE id,用于断线补读(GET /api/chat/events?lastEventId=xxx)
4. sessionId 由后端首次返回后前端保存,后续请求复用实现多轮对话
// composables/useDelphiChat.ts
import { ref, reactive } from 'vue'
interface ChatOptions {
baseUrl?: string
namespace: string
userId: string
tenantId?: string
}
export function useDelphiChat(options: ChatOptions) {
const baseUrl = options.baseUrl || ''
const sessionId = ref<string | null>(null)
const messages = reactive<Array<{ role: string; content: string }>>([])
const streaming = ref(false)
const lastEventId = ref<string | null>(null)
function buildHeaders() {
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
headers['X-Tenant-Id'] = options.tenantId || options.namespace
headers['X-User-Id'] = options.userId
return headers
}
async function sendPrompt(text: string, opts?: { queueMode?: string }) {
if (streaming.value) return
streaming.value = true
messages.push({ role: 'user', content: text })
const body: Record<string, any> = {
namespace: options.namespace,
prompt: text,
sessionId: sessionId.value,
queueMode: opts?.queueMode || 'INTERRUPT',
}
let assistantContent = ''
messages.push({ role: 'assistant', content: '' })
const assistantIdx = messages.length - 1
try {
const response = await fetch(`${baseUrl}/api/chat/stream`, {
method: 'POST',
headers: buildHeaders(),
body: JSON.stringify(body),
})
if (!response.ok) throw new Error(`HTTP ${response.status}`)
await readSseStream(response.body!, (name, data) => {
if (data.sessionId && !sessionId.value) {
sessionId.value = data.sessionId
}
if (name === 'message_delta' && data.data?.text) {
assistantContent += data.data.text
messages[assistantIdx].content = assistantContent
}
})
} finally {
streaming.value = false
}
}
async function readSseStream(
body: ReadableStream,
onEvent: (name: string, data: any) => void
) {
const reader = body.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let boundary: number
while ((boundary = buffer.indexOf('\n\n')) !== -1) {
const frame = buffer.substring(0, boundary)
buffer = buffer.substring(boundary + 2)
const parsed = parseSseFrame(frame)
if (parsed) {
if (parsed.id) lastEventId.value = parsed.id
onEvent(parsed.name, parsed.data)
}
}
}
}
function parseSseFrame(frame: string) {
let id: string | null = null
let name: string | null = null
let dataStr = ''
for (const line of frame.split('\n')) {
if (line.startsWith('id:')) id = line.substring(3).trim()
else if (line.startsWith('event:')) name = line.substring(6).trim()
else if (line.startsWith('data:')) dataStr += line.substring(5).trimStart()
}
if (!dataStr) return null
try {
return { id, name: name || 'message', data: JSON.parse(dataStr) }
} catch { return null }
}
async function abort() {
if (!sessionId.value) return
await fetch(`${baseUrl}/api/chat/stream`, {
method: 'POST',
headers: buildHeaders(),
body: JSON.stringify({
namespace: options.namespace,
sessionId: sessionId.value,
command: 'abort',
}),
})
}
return { sessionId, messages, streaming, lastEventId, sendPrompt, abort }
}使用示例:
<script setup lang="ts">
import { useDelphiChat } from '@/composables/useDelphiChat'
const { messages, streaming, sendPrompt, abort } = useDelphiChat({
namespace: 'my-tenant',
userId: 'user-1',
})
const input = ref('')
async function onSend() {
const text = input.value.trim()
if (!text) return
input.value = ''
await sendPrompt(text)
}
</script>
<template>
<div class="chat">
<div v-for="(msg, i) in messages" :key="i" :class="msg.role">
{{ msg.content }}
</div>
<input v-model="input" @keyup.enter="onSend" :disabled="streaming" />
<button @click="onSend" :disabled="streaming">发送</button>
<button v-if="streaming" @click="abort">中止</button>
</div>
</template>// hooks/useDelphiChat.ts
import { useState, useRef, useCallback } from 'react'
interface ChatOptions {
baseUrl?: string
namespace: string
userId: string
tenantId?: string
}
interface Message {
role: 'user' | 'assistant' | 'system'
content: string
}
export function useDelphiChat(options: ChatOptions) {
const baseUrl = options.baseUrl || ''
const [sessionId, setSessionId] = useState<string | null>(null)
const [messages, setMessages] = useState<Message[]>([])
const [streaming, setStreaming] = useState(false)
const lastEventIdRef = useRef<string | null>(null)
function buildHeaders() {
const headers: Record<string, string> = { 'Content-Type': 'application/json' }
headers['X-Tenant-Id'] = options.tenantId || options.namespace
headers['X-User-Id'] = options.userId
return headers
}
const sendPrompt = useCallback(async (text: string, queueMode = 'INTERRUPT') => {
if (streaming) return
setStreaming(true)
setMessages(prev => [...prev, { role: 'user', content: text }])
let assistantContent = ''
setMessages(prev => [...prev, { role: 'assistant', content: '' }])
try {
const response = await fetch(`${baseUrl}/api/chat/stream`, {
method: 'POST',
headers: buildHeaders(),
body: JSON.stringify({
namespace: options.namespace,
prompt: text,
sessionId,
queueMode,
}),
})
if (!response.ok) throw new Error(`HTTP ${response.status}`)
const reader = response.body!.getReader()
const decoder = new TextDecoder()
let buffer = ''
while (true) {
const { done, value } = await reader.read()
if (done) break
buffer += decoder.decode(value, { stream: true })
let boundary: number
while ((boundary = buffer.indexOf('\n\n')) !== -1) {
const frame = buffer.substring(0, boundary)
buffer = buffer.substring(boundary + 2)
const parsed = parseSseFrame(frame)
if (!parsed) continue
if (parsed.id) lastEventIdRef.current = parsed.id
if (parsed.data.sessionId && !sessionId) {
setSessionId(parsed.data.sessionId)
}
if (parsed.name === 'message_delta' && parsed.data.data?.text) {
assistantContent += parsed.data.data.text
setMessages(prev => {
const updated = [...prev]
updated[updated.length - 1] = { role: 'assistant', content: assistantContent }
return updated
})
}
}
}
} finally {
setStreaming(false)
}
}, [streaming, sessionId, options.namespace])
const abort = useCallback(async () => {
if (!sessionId) return
await fetch(`${baseUrl}/api/chat/stream`, {
method: 'POST',
headers: buildHeaders(),
body: JSON.stringify({
namespace: options.namespace,
sessionId,
command: 'abort',
}),
})
}, [sessionId, options.namespace])
return { sessionId, messages, streaming, sendPrompt, abort }
}
function parseSseFrame(frame: string) {
let id: string | null = null
let name: string | null = null
let dataStr = ''
for (const line of frame.split('\n')) {
if (line.startsWith('id:')) id = line.substring(3).trim()
else if (line.startsWith('event:')) name = line.substring(6).trim()
else if (line.startsWith('data:')) dataStr += line.substring(5).trimStart()
}
if (!dataStr) return null
try {
return { id, name: name || 'message', data: JSON.parse(dataStr) }
} catch { return null }
}使用示例:
import { useDelphiChat } from './hooks/useDelphiChat'
function Chat() {
const { messages, streaming, sendPrompt, abort } = useDelphiChat({
namespace: 'my-tenant',
userId: 'user-1',
})
const [input, setInput] = useState('')
const onSend = async () => {
if (!input.trim()) return
const text = input
setInput('')
await sendPrompt(text)
}
return (
<div>
{messages.map((msg, i) => (
<div key={i} className={msg.role}>{msg.content}</div>
))}
<input value={input} onChange={e => setInput(e.target.value)}
onKeyDown={e => e.key === 'Enter' && onSend()}
disabled={streaming} />
<button onClick={onSend} disabled={streaming}>发送</button>
{streaming && <button onClick={abort}>中止</button>}
</div>
)
}当 SSE 连接中断但 run 尚未结束时,使用 /api/chat/events 从上次位置继续接收:
async function reconnect(namespace: string, sessionId: string, runId: string, lastEventId: string) {
const params = new URLSearchParams({ namespace, sessionId, runId, mode: 'live', from: 'last' })
if (lastEventId) params.set('lastEventId', lastEventId)
const response = await fetch(`/api/chat/events?${params.toString()}`, {
headers: lastEventId ? { 'Last-Event-ID': lastEventId } : {},
})
// 使用相同的 SSE 流解析逻辑
}| 项目 | 说明 |
|---|---|
| sessionId 管理 | 首次请求可不传 sessionId,后端自动创建并在事件中返回;前端保存后复用即可实现多轮对话。 |
| 必要 Headers | X-Tenant-Id 必须等于 namespace,X-User-Id 必填,否则返回 400。 |
| queueMode | 推荐 INTERRUPT(默认行为:打断当前 run 重新执行);如果不希望打断,用 FOLLOWUP 排队。 |
| 流式解析 | SSE 帧以 \n\n 分隔,每帧可能包含 id:、event:、data: 多行。 |
| 终态判断 | 收到 run_completed / run_failed / quota_rejected 事件表示 run 结束。 |
| CORS | 开发环境下前端 devserver 需代理 /api 到后端,或后端配置 CORS。 |