Skip to content

Latest commit

 

History

History
715 lines (588 loc) · 29.4 KB

File metadata and controls

715 lines (588 loc) · 29.4 KB

Delphi Agent API Docs

DOCS.md 是 API / SDK / runtime 配置的入口。技术架构图见 ARCHITECHE.MD,项目运行说明见 README.md,数据存储结构见 DATA.md

兼容入口:DOC.md

目录

API Reference

API Surface

方法 路径 说明
POST /api/chat/stream 主入口:发送 prompt、continue 或 command,统一返回 SSE。
GET /api/chat/events SSE replay 入口,基于 Redis Stream ID 补读并接 live。
GET /api/catalog/models?provider=<provider> 查询模型列表。
GET /api/catalog/skills?namespace=<ns> 查询 namespace 可见 skills。
GET /api/catalog/skills/{name}?namespace=<ns> 查询单个 skill。
GET /api/catalog/prompts 查询 prompt catalog。
GET /api/catalog/resources 查询 resource catalog。
POST /api/catalog/reload?namespace=<ns> 重扫 catalog,并清理 skill cache,跨节点广播失效。
GET /api/audit?namespace=<ns>&from=&to=&limit= 查询审计日志(默认近 7 天)。
GET /api/usage?namespace=<ns>&from=&to= 查询用量统计(默认近 30 天)。
GET /api/usage/today?namespace=<ns> 查询今日用量。
GET /actuator/health Redis、node drain、cluster runtime 健康。
GET /actuator/info nodeId、hostname、version、startTime、draining。
GET /actuator/metrics / /actuator/prometheus 运行指标。

Required Headers

Header 说明
Content-Type: application/json POST 请求必须使用 JSON。
X-Tenant-Id 租户标识,必须等于请求体中的 namespace
X-User-Id 当前用户标识,用于审计和计量。

RuntimeIdentityResolver 会校验 header 与 namespace 的一致性,不一致时返回 400 / TENANT_MISMATCH

POST /api/chat/stream

请求分两类:

  1. command 为空:普通 prompt,服务端创建或加载 session,进入 AgentRunRuntime
  2. command 非空:执行 command;continue 会作为标准 run 进入 AgentRunRuntime,其他 command 返回 ack SSE。

关键字段:

字段 说明
namespace 必填,必须与 X-Tenant-Id 一致。
sessionId 可选;为空时创建新 session。
projectKey 可选,用于 session/project workspace 关联。
prompt prompt 文本;steer/continue 按命令语义使用。
provider / modelId 可选;创建新 session 时未传则使用默认模型(pi.defaults)。
systemPrompt 可选,新 session 的 system prompt。
queueMode INTERRUPT / FOLLOWUP / STEER / DROP / REJECT;普通 prompt 未传默认 REJECT
command abort / steer / compact / fork / navigate / continue,大小写会归一化。
commandArgs 命令参数,例如 {"keep": 20}{"entryId": "..."}{"newSessionName": "..."}

Drain 期间的入口语义

节点进入 draining 后:

请求类型 行为
新 prompt 拒绝,返回 503 / NODE_DRAINING
continue 拒绝,返回 503 / NODE_DRAINING
compact / fork / navigate 拒绝,返回 503 / NODE_DRAINING
abort / steer 允许,用于结束或引导已有 active run。

Queue 决策

AgentRunRuntime.dispatchByQueuePolicy 通过 RunQueueManager.decide() 计算决策:

决策 行为
RUN_NOW 无 active run,直接调度。
INTERRUPT 有 active run,先 abort 旧 run,新 run 入队等待。
ENQUEUE 有 active run,新 run 入队等待。
STEER 有 active run,将 prompt 注入当前 run,发出 queue_updated 事件。
DROP 有 active run,丢弃新请求,发出 queue_updated 事件。
REJECT 拒绝,发 quota_rejected 事件。

TenantRuntimeGuard.ensureQueueCapacity 在入队前会检查租户队列容量。

Command Consistency

命令 行为
abort AgentRunRuntime.abort 查找 active owner;远端 owner 通过 Redis command channel 中止本地 Agent;同时取消子 Agent。
steer 通过 active owner 注入当前 run;如果无 active run,退化为 session steering(写入 steeringQueue)。
compact SessionCommandRuntime,active run 存在时拒绝,空闲时获取 Redis session lock 后执行。
fork 同上,返回 forkSessionIdcommandArgs.entryId 指定从哪个 entry fork。
navigate 同上,避免导航和正在运行的 Agent 并发修改 head。commandArgs.entryId 指定目标节点。
continue 作为 AgentRunOperation.CONTINUE 进入 run admission、queue、fencing、SSE 生命周期。

会话写入幂等性:

  • prompt / continue 携带 runId + fencingToken 进入 AgentSessionRuntime
  • Mongo entries 使用 sessionId + runId + sequence 唯一索引避免重复追加。
  • SessionDocument.fencingToken 记录最后成功提交的 token;低 token 写入会被拒绝。

SSE Events

类别 事件名称
Run lifecycle run_startedqueue_updatedrun_completedrun_failedquota_rejected
Message stream message_startmessage_deltamessage_end
Tool execution tool_startedtool_updatedtool_completed
Subagent subagent_startedsubagent_completedsubagent_failed
Command ack ack

每个 runtime event 的 SSE id 是 Redis Stream ID。客户端应保存最后处理成功的 SSE id,用于断线补读。

SessionEventBroker 实现 RuntimeEventSink,本地投递 SSE 的同时通过 RedisSseEventPublisher<prefix>:sse:events 频道 fanout 到其他节点;终端事件触发 SseEmitter.complete()

GET /api/chat/events

用于 replay + live subscribe。

参数:

参数 说明
namespace 必填。
sessionId 必填。
runId 可选;传入后只接收该 run 的事件。
lastEventId 可选;等价于 Last-Event-ID header。
mode live 默认;replay-only 表示追到 high-watermark 后 complete。
from last 默认;run-start 表示从 Redis Stream 起点补读。

补读流程(SessionEventReplayController):

  1. 服务端读取当前 Redis Stream high-watermark。
  2. 如果历史中已有当前 run 的 terminal event,直接 replay 后 complete。
  3. 否则注册 live emitter,并标记 live 只接收 > highWatermark 的事件。
  4. 补发 (lastEventId, highWatermark]
  5. live 阶段继续接收新事件。

这样客户端可连接任意节点,不依赖原始 owner node 的本地 SseEmitter

Catalog API

/api/catalog/*CatalogController 提供:

  • GET /api/catalog/models?provider=<provider>:返回 pi.models 中已注册的模型,可按 provider 过滤。
  • GET /api/catalog/skills?namespace=<ns>:返回 namespace 可见 Skills(public + namespace 私有);同名 namespace 会覆盖 public。
  • GET /api/catalog/skills/{name}?namespace=<ns>:返回单个 skill 的 metadata 和 content
  • GET /api/catalog/prompts:列出 prompt catalog。
  • GET /api/catalog/resources:列出 resource catalog。
  • POST /api/catalog/reload?namespace=<ns>:重新扫描磁盘,并通过 CacheInvalidationPublisher 在集群内广播失效,使其他节点也清空 cache。

Audit / Usage API

路径 说明
GET /api/audit?namespace=<ns>&from=&to=&limit= 查询 audit_logsfrom / to 为 ISO-8601 instant;缺省返回近 7 天,最大 limit=500。
GET /api/usage?namespace=<ns>&from=&to= 查询 usage_metrics,按天聚合 token / call / sandbox 用量。缺省近 30 天。
GET /api/usage/today?namespace=<ns> 当天聚合,便于 dashboard 实时显示。

Runtime Configuration

正式 runtime 不区分 local / standalone / cluster 配置开关。下面这些依赖和配置始终是正式运行路径的一部分。

基础

环境变量 / Property 默认值 说明
MONGODB_URI mongodb://localhost:27017/pi-agent-framework MongoDB 连接;单节点可使用 standalone MongoDB。
REDIS_HOST / REDIS_PORT / REDIS_PASSWORD localhost / 6379 / 空 Redis 连接。
REDIS_TIMEOUT 3000ms Lettuce 命令超时。
REDIS_POOL_MAX_ACTIVE / REDIS_POOL_MAX_IDLE / REDIS_POOL_MIN_IDLE 16 / 8 / 2 连接池。
PORT 8080 HTTP 端口。

节点与集群

配置 默认值 说明
PI_NODE_ID 节点唯一标识;为空时使用 PI_CLUSTER_NODE_ID / POD_NAME / 进程 UUID。不会默认使用 hostname,避免本机快速重启或同机多实例误判为重复节点。重复 nodeId 在 <prefix>:node:registry 中会被检测并拒绝启动。
PI_CLUSTER_REDIS_KEY_PREFIX delphi Redis key 前缀。
PI_CLUSTER_RUN_MAX_TTL_MS 1800000 active-run lease TTL。
PI_CLUSTER_LOCK_DEFAULT_TTL_MS 30000 session command lock TTL。
PI_CLUSTER_COMMAND_TIMEOUT_MS 5000 abort / steer 跨节点命令等待超时。

Workspace

配置 默认值 说明
PI_WORKSPACES_ROOT workspaces 本地热缓存根目录。
PI_WORKSPACE_STORAGE snapshot 正式 runtime 只允许 snapshotlocal 会启动失败。
PI_WORKSPACE_BUCKET snapshot bucket,必填。
PI_WORKSPACE_S3_ENDPOINT S3/MinIO endpoint。
PI_WORKSPACE_S3_REGION us-east-1 S3 region。
PI_WORKSPACE_S3_AK / PI_WORKSPACE_S3_SK S3 凭证。
PI_WORKSPACE_S3_PATH_STYLE true path-style 访问(MinIO 必须)。
PI_WORKSPACE_SNAPSHOT_EXCLUDE .skills/,node_modules/,.git/,target/,build/,dist/ 跳过快照的子路径。

Session / 计算

配置 默认值 说明
PI_SESSION_MAX 2000 进程内最大 session 缓存数。
PI_SESSION_IDLE_TTL_HOURS 24 session idle 后回收时长。
PI_SESSION_REAP_MINUTES 5 reap 调度周期。
pi.compaction.context-window-ratio 0.8 触发自动 compaction 的窗口占比。
pi.compaction.max-messages 60 触发自动 compaction 的消息数阈值。
pi.compaction.default-keep-count 20 compaction 默认保留消息数。
PI_PLANNING_TIMEOUT 45 TaskPlanning 工具超时。
PI_HTTP_ASYNC_REQUEST_TIMEOUT_MS 0 Spring MVC async 超时;0 表示不超时。
PI_HTTP_CHAT_STREAM_TIMEOUT_MS 0 SSE 长连接超时;0 表示直到终止事件或客户端断开。
PI_HTTP_CHAT_COMMAND_TIMEOUT_MS 60000 command 类请求超时。

工具

配置 默认值 说明
PI_TOOLS_BUILTIN_ENABLED true 是否启用内置工具。
pi.tools.builtin.available [read,bash,edit,write,grep,find,ls] 可用内置工具白名单。
pi.tools.builtin.default-enabled [read,bash,edit,write] 新 session 默认开启的工具。
PI_TOOLS_READ_MAX_LINES / PI_TOOLS_READ_MAX_BYTES 2000 / 51200 read 工具上限。
PI_TOOLS_BASH_TIMEOUT_SECONDS 1800 bash 默认超时。
PI_TOOLS_BASH_MAX_LINES / PI_TOOLS_BASH_MAX_BYTES 2000 / 51200 bash 输出截断。
PI_TOOLS_GREP_DEFAULT_LIMIT / PI_TOOLS_GREP_MAX_LINE_LENGTH / PI_TOOLS_GREP_MAX_BYTES 100 / 500 / 51200 grep 限制。
PI_TOOLS_FIND_DEFAULT_LIMIT / PI_TOOLS_FIND_MAX_BYTES 1000 / 51200 find 限制。
PI_TOOLS_LS_DEFAULT_LIMIT / PI_TOOLS_LS_MAX_BYTES 500 / 51200 ls 限制。
PI_TOOL_POLICY_ORCHESTRATOR_STRICT false true 时主代理只能 READONLY / INSTRUCTIONAL / ORCHESTRATION。

Catalog

配置 默认值 说明
PI_AGENT_SKILLS_DIRS ./skills,${user.home}/.codex/skills Skill 扫描目录。
PI_AGENT_PROMPTS_DIRS ./prompts Prompt 扫描目录。
PI_AGENT_RESOURCES_DIRS ./resources Resource 扫描目录。

治理

配置 默认值 说明
pi.rate-limit.enabled true 是否启用 RPM 限流。
pi.rate-limit.default-rpm 60 默认每分钟请求数。
pi.quota.enabled true 是否启用配额。
pi.quota.defaults.max-concurrent-sessions 50 单租户并发 session。
pi.quota.defaults.daily-token-limit 1000000 每日 token 上限。
pi.quota.defaults.cpu-quota / memory-limit / pids-limit 50000 / 256m / 100 sandbox 资源限制。
pi.quota.overrides.<tenant> 指定租户覆盖。
pi.audit.enabled true 是否启用审计。
pi.metering.enabled true 是否启用计量。
pi.metering.flush-interval-seconds 30 计量刷写间隔。

Shutdown

配置 默认值 说明
PI_SHUTDOWN_DRAIN_TIMEOUT_SECONDS 1800 等待 active run 的最长时间。
PI_SHUTDOWN_ABORT_GRACE_SECONDS 10 timeout 后等待 abort 生效的短延时。
SPRING_LIFECYCLE_TIMEOUT_PER_SHUTDOWN_PHASE 30m Spring lifecycle phase 超时。

PI_CLUSTER_ENABLED 已废弃,不再控制正式 runtime bean 创建。

Redis Keys

Key / Channel 类型 生命周期语义
<prefix>:run:active:<runId> Hash + TTL 当前 active run 元数据,包含 owner node 和 fencing token。
<prefix>:run:by-session:<namespace>:<sessionId> String + TTL session 到 active run 的短生命周期索引。
<prefix>:session:fencing:<namespace>:<sessionId> Counter session fencing token 单调递增源。
<prefix>:session:lock:<namespace>:<sessionId> String + TTL compact / fork / navigate 的 session 写锁。
<prefix>:queue:session:<namespace>:<sessionId> Redis Stream queued run。
<prefix>:queue:drain-lock:<namespace>:<sessionId> String + TTL queue drainer 单 session 扫描锁。
<prefix>:run:commands:<nodeId> Pub/sub channel abort / steer 发往 owner node。
<prefix>:subagent:commands:<nodeId> Pub/sub channel subagent abort 发往持有节点。
<prefix>:sse:events Pub/sub channel 跨节点 SSE 实时广播。
<prefix>:catalog:invalidate Pub/sub channel catalog reload 广播。
<prefix>:events:session:<namespace>:<sessionId> Redis Stream session event replay。
<prefix>:ratelimit:<namespace>:<window> String + TTL 全局限流窗口。
<prefix>:usage:<yyyyMMdd>:<namespace>:<metric> String usage buffer。
<prefix>:node:registry:<nodeId> String + TTL 节点注册和重复 nodeId 检测。

Mongo Collections

Collection 用途
sessions session 元数据、head entry、模型配置、fencing 信息、auto compaction / retry 开关。
session_entries conversation entries,使用 sessionId + runId + sequence 幂等。
runs run lifecycle,含 owner、fencing、终态、failureType、terminalEventId。
runtime_event_outbox terminal event 投递兜底;outbox 先持久化,Redis terminal append 成功后再把 runs 更新为终态。
audit_logs 审计事件。
usage_metrics 计量指标(token、call、sandbox usage)。
subagent_states subagent 生命周期与结果。

Failure / Shutdown 不变量

  • terminal outbox 先写入单条 Mongo outbox 文档,文档携带目标 run 终态、failureType 和 payload。
  • Redis terminal event 写入成功后,RuntimeEventOutboxWorker 才把 runs 更新为终态并写入 terminalEventId
  • owner 只在 Redis terminal event 可见之后释放 active run lease。
  • 如果 Redis append 失败,lease 保留,outbox 进入 FAILED_RETRYABLE;后台 worker 成功投递后再释放 owner 本地 lease。
  • queued run 只有调度接受后才 XACK + XDEL;调度失败会保留 pending 或显式 requeue,避免复制同一条 run。
  • workspace snapshot 必须先于 success terminal event。
  • shutdown drain 不迁移 active Agent,只让 owner 节点完成或超时终止。
  • failure 分类由 RunFailureClassifier 决定 failureType:包括 SHUTDOWN_TIMEOUTWORKSPACE_SNAPSHOT_FAILEDWORKSPACE_SNAPSHOT_FAILED_DURING_SHUTDOWNQUOTA_REJECTEDMODEL_ERRORTOOL_ERROR 等。

Actuator

路径 说明
/actuator/health Redis、node drain、cluster runtime 健康状态。draining 时 readiness 为 OUT_OF_SERVICE,并暴露 localActiveRunslocalActiveEmitters 计数。
/actuator/info 暴露 nodeIdhostnameversionstartTimedraining,由 NodeInfoContributor 提供。
/actuator/metrics Spring Boot 指标。
/actuator/prometheus Prometheus exposition。

management.endpoints.web.exposure.include 默认 health,info,metrics,prometheus

SDK Reference

嵌入式接入(同一 JVM)优先使用 SDK:

@Autowired DelphiCodingAgentSdk sdk;

AgentSessionHandle handle = sdk.createSession(new CreateAgentSessionOptions(
    "tenant-a",            // namespace
    "demo-project",        // projectKey
    "session-1",           // sessionName
    null,                   // provider,使用默认
    null,                   // modelId,使用默认
    null                    // systemPrompt
));

handle.prompt("帮我写一个排序算法").join();
String answer = handle.lastAssistantText();

handle.steer("注意要使用 Java 21 写法");
handle.followUp("再加单元测试");

handle.compact(20);
String forked = sdk.fork(handle.sessionId(), "tenant-a", "<entryId>", "实验分支");
sdk.navigateTree(handle.sessionId(), "tenant-a", "<entryId>");

// 订阅事件
AutoCloseable subscription = handle.subscribeEvents(event -> {
    // AgentEvent: AgentStart / TurnStart / MessageStart / MessageUpdate / MessageEnd / ToolExecutionStart / ...
});
入口 说明
DelphiCodingAgentSdk 顶级 SDK,按 sessionId 调用。
CreateAgentSessionOptions 创建 session 的参数。
AgentSessionHandle session 操作句柄:prompt、cont、steer、followUp、abort、compact、setSteeringMode、setFollowUpMode、setAutoCompaction、setAutoRetry、stats、lastAssistantText、state。

详细说明见 docs/agent-sdk.zh-CN.md

Frontend Integration

前端通过 POST /api/chat/stream 发送请求,服务端返回 SSE 流。以下示例展示 Vue 和 React 的集成方式。

核心流程

1. 发送 POST 请求到 /api/chat/stream(body 包含 namespace、sessionId、prompt 等)
2. 读取 SSE 响应流,逐帧解析事件(run_started → message_delta → run_completed)
3. 保存最后一个 SSE id,用于断线补读(GET /api/chat/events?lastEventId=xxx)
4. sessionId 由后端首次返回后前端保存,后续请求复用实现多轮对话

Vue 3 (Composition API)

// composables/useDelphiChat.ts
import { ref, reactive } from 'vue'

interface ChatOptions {
  baseUrl?: string
  namespace: string
  userId: string
  tenantId?: string
}

export function useDelphiChat(options: ChatOptions) {
  const baseUrl = options.baseUrl || ''
  const sessionId = ref<string | null>(null)
  const messages = reactive<Array<{ role: string; content: string }>>([])
  const streaming = ref(false)
  const lastEventId = ref<string | null>(null)

  function buildHeaders() {
    const headers: Record<string, string> = { 'Content-Type': 'application/json' }
    headers['X-Tenant-Id'] = options.tenantId || options.namespace
    headers['X-User-Id'] = options.userId
    return headers
  }

  async function sendPrompt(text: string, opts?: { queueMode?: string }) {
    if (streaming.value) return
    streaming.value = true
    messages.push({ role: 'user', content: text })

    const body: Record<string, any> = {
      namespace: options.namespace,
      prompt: text,
      sessionId: sessionId.value,
      queueMode: opts?.queueMode || 'INTERRUPT',
    }

    let assistantContent = ''
    messages.push({ role: 'assistant', content: '' })
    const assistantIdx = messages.length - 1

    try {
      const response = await fetch(`${baseUrl}/api/chat/stream`, {
        method: 'POST',
        headers: buildHeaders(),
        body: JSON.stringify(body),
      })
      if (!response.ok) throw new Error(`HTTP ${response.status}`)
      await readSseStream(response.body!, (name, data) => {
        if (data.sessionId && !sessionId.value) {
          sessionId.value = data.sessionId
        }
        if (name === 'message_delta' && data.data?.text) {
          assistantContent += data.data.text
          messages[assistantIdx].content = assistantContent
        }
      })
    } finally {
      streaming.value = false
    }
  }

  async function readSseStream(
    body: ReadableStream,
    onEvent: (name: string, data: any) => void
  ) {
    const reader = body.getReader()
    const decoder = new TextDecoder()
    let buffer = ''
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      buffer += decoder.decode(value, { stream: true })
      let boundary: number
      while ((boundary = buffer.indexOf('\n\n')) !== -1) {
        const frame = buffer.substring(0, boundary)
        buffer = buffer.substring(boundary + 2)
        const parsed = parseSseFrame(frame)
        if (parsed) {
          if (parsed.id) lastEventId.value = parsed.id
          onEvent(parsed.name, parsed.data)
        }
      }
    }
  }

  function parseSseFrame(frame: string) {
    let id: string | null = null
    let name: string | null = null
    let dataStr = ''
    for (const line of frame.split('\n')) {
      if (line.startsWith('id:')) id = line.substring(3).trim()
      else if (line.startsWith('event:')) name = line.substring(6).trim()
      else if (line.startsWith('data:')) dataStr += line.substring(5).trimStart()
    }
    if (!dataStr) return null
    try {
      return { id, name: name || 'message', data: JSON.parse(dataStr) }
    } catch { return null }
  }

  async function abort() {
    if (!sessionId.value) return
    await fetch(`${baseUrl}/api/chat/stream`, {
      method: 'POST',
      headers: buildHeaders(),
      body: JSON.stringify({
        namespace: options.namespace,
        sessionId: sessionId.value,
        command: 'abort',
      }),
    })
  }

  return { sessionId, messages, streaming, lastEventId, sendPrompt, abort }
}

使用示例:

<script setup lang="ts">
import { useDelphiChat } from '@/composables/useDelphiChat'

const { messages, streaming, sendPrompt, abort } = useDelphiChat({
  namespace: 'my-tenant',
  userId: 'user-1',
})

const input = ref('')
async function onSend() {
  const text = input.value.trim()
  if (!text) return
  input.value = ''
  await sendPrompt(text)
}
</script>

<template>
  <div class="chat">
    <div v-for="(msg, i) in messages" :key="i" :class="msg.role">
      {{ msg.content }}
    </div>
    <input v-model="input" @keyup.enter="onSend" :disabled="streaming" />
    <button @click="onSend" :disabled="streaming">发送</button>
    <button v-if="streaming" @click="abort">中止</button>
  </div>
</template>

React (Hooks)

// hooks/useDelphiChat.ts
import { useState, useRef, useCallback } from 'react'

interface ChatOptions {
  baseUrl?: string
  namespace: string
  userId: string
  tenantId?: string
}

interface Message {
  role: 'user' | 'assistant' | 'system'
  content: string
}

export function useDelphiChat(options: ChatOptions) {
  const baseUrl = options.baseUrl || ''
  const [sessionId, setSessionId] = useState<string | null>(null)
  const [messages, setMessages] = useState<Message[]>([])
  const [streaming, setStreaming] = useState(false)
  const lastEventIdRef = useRef<string | null>(null)

  function buildHeaders() {
    const headers: Record<string, string> = { 'Content-Type': 'application/json' }
    headers['X-Tenant-Id'] = options.tenantId || options.namespace
    headers['X-User-Id'] = options.userId
    return headers
  }

  const sendPrompt = useCallback(async (text: string, queueMode = 'INTERRUPT') => {
    if (streaming) return
    setStreaming(true)
    setMessages(prev => [...prev, { role: 'user', content: text }])

    let assistantContent = ''
    setMessages(prev => [...prev, { role: 'assistant', content: '' }])

    try {
      const response = await fetch(`${baseUrl}/api/chat/stream`, {
        method: 'POST',
        headers: buildHeaders(),
        body: JSON.stringify({
          namespace: options.namespace,
          prompt: text,
          sessionId,
          queueMode,
        }),
      })
      if (!response.ok) throw new Error(`HTTP ${response.status}`)

      const reader = response.body!.getReader()
      const decoder = new TextDecoder()
      let buffer = ''

      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        buffer += decoder.decode(value, { stream: true })
        let boundary: number
        while ((boundary = buffer.indexOf('\n\n')) !== -1) {
          const frame = buffer.substring(0, boundary)
          buffer = buffer.substring(boundary + 2)
          const parsed = parseSseFrame(frame)
          if (!parsed) continue
          if (parsed.id) lastEventIdRef.current = parsed.id
          if (parsed.data.sessionId && !sessionId) {
            setSessionId(parsed.data.sessionId)
          }
          if (parsed.name === 'message_delta' && parsed.data.data?.text) {
            assistantContent += parsed.data.data.text
            setMessages(prev => {
              const updated = [...prev]
              updated[updated.length - 1] = { role: 'assistant', content: assistantContent }
              return updated
            })
          }
        }
      }
    } finally {
      setStreaming(false)
    }
  }, [streaming, sessionId, options.namespace])

  const abort = useCallback(async () => {
    if (!sessionId) return
    await fetch(`${baseUrl}/api/chat/stream`, {
      method: 'POST',
      headers: buildHeaders(),
      body: JSON.stringify({
        namespace: options.namespace,
        sessionId,
        command: 'abort',
      }),
    })
  }, [sessionId, options.namespace])

  return { sessionId, messages, streaming, sendPrompt, abort }
}

function parseSseFrame(frame: string) {
  let id: string | null = null
  let name: string | null = null
  let dataStr = ''
  for (const line of frame.split('\n')) {
    if (line.startsWith('id:')) id = line.substring(3).trim()
    else if (line.startsWith('event:')) name = line.substring(6).trim()
    else if (line.startsWith('data:')) dataStr += line.substring(5).trimStart()
  }
  if (!dataStr) return null
  try {
    return { id, name: name || 'message', data: JSON.parse(dataStr) }
  } catch { return null }
}

使用示例:

import { useDelphiChat } from './hooks/useDelphiChat'

function Chat() {
  const { messages, streaming, sendPrompt, abort } = useDelphiChat({
    namespace: 'my-tenant',
    userId: 'user-1',
  })
  const [input, setInput] = useState('')

  const onSend = async () => {
    if (!input.trim()) return
    const text = input
    setInput('')
    await sendPrompt(text)
  }

  return (
    <div>
      {messages.map((msg, i) => (
        <div key={i} className={msg.role}>{msg.content}</div>
      ))}
      <input value={input} onChange={e => setInput(e.target.value)}
             onKeyDown={e => e.key === 'Enter' && onSend()}
             disabled={streaming} />
      <button onClick={onSend} disabled={streaming}>发送</button>
      {streaming && <button onClick={abort}>中止</button>}
    </div>
  )
}

SSE 断线补读

当 SSE 连接中断但 run 尚未结束时,使用 /api/chat/events 从上次位置继续接收:

async function reconnect(namespace: string, sessionId: string, runId: string, lastEventId: string) {
  const params = new URLSearchParams({ namespace, sessionId, runId, mode: 'live', from: 'last' })
  if (lastEventId) params.set('lastEventId', lastEventId)
  const response = await fetch(`/api/chat/events?${params.toString()}`, {
    headers: lastEventId ? { 'Last-Event-ID': lastEventId } : {},
  })
  // 使用相同的 SSE 流解析逻辑
}

关键注意事项

项目 说明
sessionId 管理 首次请求可不传 sessionId,后端自动创建并在事件中返回;前端保存后复用即可实现多轮对话。
必要 Headers X-Tenant-Id 必须等于 namespaceX-User-Id 必填,否则返回 400。
queueMode 推荐 INTERRUPT(默认行为:打断当前 run 重新执行);如果不希望打断,用 FOLLOWUP 排队。
流式解析 SSE 帧以 \n\n 分隔,每帧可能包含 id:event:data: 多行。
终态判断 收到 run_completed / run_failed / quota_rejected 事件表示 run 结束。
CORS 开发环境下前端 devserver 需代理 /api 到后端,或后端配置 CORS。