From a4dd58665297449786691159c1623566ad957193 Mon Sep 17 00:00:00 2001 From: limityan Date: Mon, 29 Jun 2026 12:58:38 +0800 Subject: [PATCH] refactor(core): route session feature ownership through runtime ports Route session workspace resolution and thread-goal model tool management through AgentRuntime/CoreServiceAgentRuntime owner paths. Preserve workspace/remote identity, goal lifecycle behavior, tool response wire shape, and user-facing error policy while keeping runtime ports additive for SDK callers. Move workspace-search preview/result conversion into services-integrations and keep plan/completed docs aligned with the implemented owner migration. --- docs/plans/core-decomposition-completed.md | 4 + docs/plans/core-decomposition-plan.md | 6 +- .../rules/source/forbidden-rules.mjs | 12 +- .../rules/source/required-rules.mjs | 14 +- scripts/core-boundaries/self-test.mjs | 14 +- .../src/agentic/coordination/coordinator.rs | 65 ++++++ .../tools/implementations/cron_tool.rs | 135 ++++++++++-- .../implementations/session_history_tool.rs | 26 +-- .../implementations/session_message_tool.rs | 98 ++++++++- .../implementations/thread_goal_tools.rs | 120 ++++++++--- .../core/src/service_agent_runtime.rs | 105 ++++++--- src/crates/contracts/runtime-ports/src/lib.rs | 80 +++++++ .../execution/agent-runtime/src/runtime.rs | 199 +++++++++++++++++- src/crates/execution/agent-runtime/src/sdk.rs | 13 +- .../src/workspace_search/result_mapping.rs | 49 +++-- .../src/workspace_search/service.rs | 25 +++ 16 files changed, 811 insertions(+), 154 deletions(-) diff --git a/docs/plans/core-decomposition-completed.md b/docs/plans/core-decomposition-completed.md index 13f377851..2feb3f133 100644 --- a/docs/plans/core-decomposition-completed.md +++ b/docs/plans/core-decomposition-completed.md @@ -28,6 +28,10 @@ - `bitfun-core` 的 function-agent AI concrete acquisition 已从旧 `runtime_services` 路径收拢到明确的 core port adapter;Git / AI compatibility re-export 仍保留旧 public path。 - Product Assembly 已承接 `DeliveryProfile`、当前交付形态入口矩阵、`CapabilitySet`、feature group matrix、profile-scoped capability plan、product-full provider plan、service availability report、profile-scoped harness registry 入口与 legacy-route 行为保护,以及 `ProductAssembler` 对 explicit profile input、runtime services、harness registry 和 service requirement 的验证;core 只保留兼容 re-export。ProductFull / Desktop / CLI / ACP 保留完整能力;Server / Remote / Web / MobileWeb 不再 materialize product-full capability packs、feature groups、runtime services、tool groups 或 harness routes。 +- Agent session/workspace owner routing 已继续收敛:`AgentRuntime` 提供 port-backed session workspace resolution entrypoint;Cron、SessionControl、SessionMessage 和 SessionHistory 不再在工具实现中直接解析目标 session workspace,Cron 保留 target session 可见性验证,workspace identity 中的 `workspace_id` / remote connection / remote host 通过 runtime contract 传递。 +- `/goal` model tool management 已继续收敛:`AgentRuntime` 提供 thread-goal management port,`get_goal` / `create_goal` / `update_goal` 经 `CoreServiceAgentRuntime` 路由到 core concrete adapter;goal lifecycle、metadata、tool response wire shape 和错误类别保持等价。 +- `services-integrations` workspace search result mapping 已承接 flashgrep hit conversion 与 preview split owner,保持缺失 `line_text` 时的既有输出语义,并由 focused tests 覆盖有无 preview 两种路径。 + ## 3. 已建立保护 - owner crate 不得依赖回 `bitfun-core`。 diff --git a/docs/plans/core-decomposition-plan.md b/docs/plans/core-decomposition-plan.md index 0bac5ac63..7f2ca5fa8 100644 --- a/docs/plans/core-decomposition-plan.md +++ b/docs/plans/core-decomposition-plan.md @@ -24,7 +24,7 @@ - Runtime Services、Agent Runtime、Tool Contracts、Tool Execution、Harness、Product Domains、Services Core、Services Integrations 等 owner crate 已建立;部分 concrete 生命周期仍由 core concrete manager 或产品命令路径持有。 - Custom agent / mode / skill、Agent lifecycle、tool side-effect、Computer Use、file tool、MiniApp、DeepReview、DeepResearch、remote-connect、workspace search、remote SSH/SFTP/PTY 等多批 provider-neutral 或 concrete owner 已迁出。 - Root boundary scripts 已覆盖核心 owner 防回流、six-layer path 解析、facade-only 文件、custom agent owner / custom subagent wrapper 保护和重点 feature gate。 -- 当前 `main` 的 boundary check 仍暴露未闭环 owner:Agent Runtime 缺少 session workspace resolution entrypoint;CronTool、SessionControl、SessionMessage 仍缺 port-backed target session / workspace resolution routing;`services-integrations` workspace search result mapping 仍缺 shared flashgrep preview/result conversion owner。上述缺口优先纳入 PR-A / PR-B / PR-C 的实际迁移范围。 +- Agent Runtime session workspace resolution、Cron / SessionControl / SessionMessage / SessionHistory 的 target session/workspace owner routing、`/goal` tool management runtime-port routing,以及 `services-integrations` workspace search preview/result conversion 已纳入已完成摘要;后续计划只保留仍需迁移的 feature/kernel、security/control-plane、execution、extension 和 cross-platform adapter 主体工作。 ## 3. 后续大块 PR 节奏 @@ -34,9 +34,9 @@ 目标: -- 建立 feature bundle / capability pack 的判定口径,把 `/goal`、DeepReview、MiniApp、input command、settings、UI panel 等从 Agent Kernel 能力中分离。 +- 建立 feature bundle / capability pack 的判定口径,把 DeepReview、MiniApp、input command、settings、UI panel 等产品特性从 Agent Kernel 能力中分离;`/goal` 已先完成 model tool 到 AgentRuntime thread-goal management port 的 owner routing。 - 识别仍留在 `bitfun-core` 中的 provider-neutral feature mapping、command-to-runtime request、UI-facing DTO 和 long-running task 相关旧路径。 -- 迁移或显著简化至少一组实际 feature 主体路径,优先选择长程任务 / `/goal` 边界或 DeepReview / MiniApp 的 feature assembly 路径。 +- 迁移或显著简化至少一组实际 feature 主体路径;`/goal` 之后优先选择 DeepReview / MiniApp / input command 的 feature assembly 路径。 保护: diff --git a/scripts/core-boundaries/rules/source/forbidden-rules.mjs b/scripts/core-boundaries/rules/source/forbidden-rules.mjs index 8c0cc7168..da13a1662 100644 --- a/scripts/core-boundaries/rules/source/forbidden-rules.mjs +++ b/scripts/core-boundaries/rules/source/forbidden-rules.mjs @@ -215,12 +215,12 @@ export const forbiddenContentRules = [ 'SessionMessage must submit through AgentRuntime dialog lifecycle port, not direct DialogScheduler', }, { - regex: /\bcoordinator\s*\.\s*resolve_session_workspace_path\b/, + regex: /\bcoordinator\s*\.\s*resolve_session_workspace_binding\b/, message: 'SessionMessage target workspace resolution must flow through AgentRuntime session-management port, not direct coordinator access', }, { - regex: /\bresolve_session_workspace_path\s*\(\s*&?target_session_id\b/, + regex: /\bresolve_session_workspace_binding\s*\(\s*&?target_session_id\b/, message: 'SessionMessage target workspace resolution must use AgentSessionWorkspaceRequest, not legacy direct session id calls', }, @@ -245,12 +245,12 @@ export const forbiddenContentRules = [ 'SessionControl requester-aware cancellation must flow through AgentRuntime cancellation port, not direct DialogScheduler', }, { - regex: /\bcoordinator\s*\.\s*resolve_session_workspace_path\b/, + regex: /\bcoordinator\s*\.\s*resolve_session_workspace_binding\b/, message: 'SessionControl workspace resolution must flow through AgentRuntime session-management port, not direct coordinator access', }, { - regex: /\bresolve_session_workspace_path\s*\(\s*session_id\b/, + regex: /\bresolve_session_workspace_binding\s*\(\s*session_id\b/, message: 'SessionControl workspace resolution must use AgentSessionWorkspaceRequest, not legacy direct session id calls', }, @@ -290,12 +290,12 @@ export const forbiddenContentRules = [ path: 'src/crates/assembly/core/src/agentic/tools/implementations/cron_tool.rs', patterns: [ { - regex: /\bcoordinator\s*\.\s*resolve_session_workspace_path\b/, + regex: /\bcoordinator\s*\.\s*resolve_session_workspace_binding\b/, message: 'CronTool target workspace resolution must flow through AgentRuntime session-management port, not direct coordinator access', }, { - regex: /\bresolve_session_workspace_path\s*\(\s*&?session_id\b/, + regex: /\bresolve_session_workspace_binding\s*\(\s*&?session_id\b/, message: 'CronTool target workspace resolution must use AgentSessionWorkspaceRequest, not legacy direct session id calls', }, diff --git a/scripts/core-boundaries/rules/source/required-rules.mjs b/scripts/core-boundaries/rules/source/required-rules.mjs index 434d377b4..4753a5acf 100644 --- a/scripts/core-boundaries/rules/source/required-rules.mjs +++ b/scripts/core-boundaries/rules/source/required-rules.mjs @@ -179,7 +179,7 @@ export const requiredContentRules = [ message: 'missing agent session delete entrypoint', }, { - regex: /\bresolve_session_workspace_path\b/, + regex: /\bresolve_session_workspace_binding\b/, message: 'missing agent session workspace resolution entrypoint', }, { @@ -1914,8 +1914,8 @@ export const requiredContentRules = [ message: 'missing port-backed cron target session list call', }, { - regex: /\bresolve_session_workspace_path\b/, - message: 'missing port-backed cron target workspace resolution call', + regex: /\bresolve_session_workspace_binding\b/, + message: 'missing port-backed cron target workspace binding resolution call', }, ], }, @@ -4746,8 +4746,8 @@ export const requiredContentRules = [ message: 'missing port-backed agent session delete call', }, { - regex: /\bresolve_session_workspace_path\b/, - message: 'missing port-backed session workspace resolution call', + regex: /\bresolve_session_workspace_binding\b/, + message: 'missing port-backed session workspace binding resolution call', }, { regex: /"createdBy"/, @@ -4789,8 +4789,8 @@ export const requiredContentRules = [ message: 'missing port-backed target session list call', }, { - regex: /\bresolve_session_workspace_path\b/, - message: 'missing port-backed target session workspace resolution call', + regex: /\bresolve_session_workspace_binding\b/, + message: 'missing port-backed target session workspace binding resolution call', }, { regex: /"createdBy"/, diff --git a/scripts/core-boundaries/self-test.mjs b/scripts/core-boundaries/self-test.mjs index b61620854..a561b603a 100644 --- a/scripts/core-boundaries/self-test.mjs +++ b/scripts/core-boundaries/self-test.mjs @@ -743,7 +743,7 @@ export function runManifestParserSelfTest({ throw new Error('SessionMessage boundary rule must forbid direct scheduler submit'); } const sessionMessageLegacySessionAccessContracts = [ - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', 'list_sessions', ]; for (const contract of sessionMessageLegacySessionAccessContracts) { @@ -758,7 +758,7 @@ export function runManifestParserSelfTest({ throw new Error('SessionControl boundary rule must forbid direct scheduler cancellation'); } const sessionControlLegacySessionAccessContracts = [ - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', 'list_sessions', 'delete_session', ]; @@ -777,7 +777,7 @@ export function runManifestParserSelfTest({ 'src/crates/assembly/core/src/agentic/tools/implementations/cron_tool.rs', ); const cronToolLegacySessionAccessContracts = [ - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', 'list_sessions', ]; for (const contract of cronToolLegacySessionAccessContracts) { @@ -1035,7 +1035,7 @@ export function runManifestParserSelfTest({ 'MissingSessionManagementPort', 'list_sessions', 'delete_session', - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', 'session_management_delegates_to_registered_port', 'RuntimeServices', 'RuntimeEventEnvelope', @@ -1518,7 +1518,7 @@ export function runManifestParserSelfTest({ 'AgentSessionListRequest', 'AgentSessionWorkspaceRequest', 'list_sessions', - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', ], }, { @@ -1962,7 +1962,7 @@ export function runManifestParserSelfTest({ 'AgentSessionWorkspaceRequest', 'list_sessions', 'delete_session', - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', '"createdBy"', 'AgentTurnCancellationRequest', 'requester_session_id', @@ -1976,7 +1976,7 @@ export function runManifestParserSelfTest({ 'AgentSessionListRequest', 'AgentSessionWorkspaceRequest', 'list_sessions', - 'resolve_session_workspace_path', + 'resolve_session_workspace_binding', '"createdBy"', 'AgentDialogTurnRequest', 'AgentDialogPrependedReminder', diff --git a/src/crates/assembly/core/src/agentic/coordination/coordinator.rs b/src/crates/assembly/core/src/agentic/coordination/coordinator.rs index e483942d7..3f653d751 100644 --- a/src/crates/assembly/core/src/agentic/coordination/coordinator.rs +++ b/src/crates/assembly/core/src/agentic/coordination/coordinator.rs @@ -6099,6 +6099,7 @@ fn runtime_session_summary(session: SessionSummary) -> bitfun_runtime_ports::Age fn runtime_session_workspace_binding(binding: WorkspaceBinding) -> AgentSessionWorkspaceBinding { AgentSessionWorkspaceBinding { + workspace_id: binding.workspace_id.clone(), workspace_path: binding.root_path_string(), remote_connection_id: binding.connection_id().map(ToOwned::to_owned), remote_ssh_host: if binding.is_remote() { @@ -6109,6 +6110,27 @@ fn runtime_session_workspace_binding(binding: WorkspaceBinding) -> AgentSessionW } } +fn runtime_port_error_from_bitfun(error: BitFunError) -> bitfun_runtime_ports::PortError { + let (kind, message) = match error { + BitFunError::Validation(message) => { + (bitfun_runtime_ports::PortErrorKind::InvalidRequest, message) + } + BitFunError::NotFound(message) => (bitfun_runtime_ports::PortErrorKind::NotFound, message), + BitFunError::Cancelled(message) => { + (bitfun_runtime_ports::PortErrorKind::Cancelled, message) + } + BitFunError::Timeout(message) => (bitfun_runtime_ports::PortErrorKind::Timeout, message), + BitFunError::NotImplemented(message) => { + (bitfun_runtime_ports::PortErrorKind::NotAvailable, message) + } + other => ( + bitfun_runtime_ports::PortErrorKind::Backend, + other.to_string(), + ), + }; + bitfun_runtime_ports::PortError::new(kind, message) +} + #[async_trait::async_trait] impl bitfun_runtime_ports::AgentSessionManagementPort for ConversationCoordinator { async fn list_sessions( @@ -6184,6 +6206,49 @@ impl bitfun_runtime_ports::AgentSessionManagementPort for ConversationCoordinato } } +#[async_trait::async_trait] +impl bitfun_runtime_ports::AgentThreadGoalManagementPort for ConversationCoordinator { + async fn get_thread_goal( + &self, + request: bitfun_runtime_ports::AgentThreadGoalGetRequest, + ) -> bitfun_runtime_ports::PortResult> { + self.get_thread_goal( + &request.session_id, + std::path::Path::new(&request.workspace_path), + ) + .await + .map_err(runtime_port_error_from_bitfun) + } + + async fn create_thread_goal( + &self, + request: bitfun_runtime_ports::AgentThreadGoalCreateRequest, + ) -> bitfun_runtime_ports::PortResult { + self.create_thread_goal( + &request.session_id, + std::path::Path::new(&request.workspace_path), + request.objective, + request.token_budget, + ) + .await + .map_err(runtime_port_error_from_bitfun) + } + + async fn update_thread_goal_status( + &self, + request: bitfun_runtime_ports::AgentThreadGoalUpdateStatusRequest, + ) -> bitfun_runtime_ports::PortResult { + self.update_thread_goal_status( + &request.session_id, + std::path::Path::new(&request.workspace_path), + request.status, + request.turn_id.as_deref(), + ) + .await + .map_err(runtime_port_error_from_bitfun) + } +} + #[async_trait::async_trait] impl bitfun_runtime_ports::AgentTurnCancellationPort for ConversationCoordinator { async fn cancel_turn( diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/cron_tool.rs b/src/crates/assembly/core/src/agentic/tools/implementations/cron_tool.rs index 56bbd28c1..6cf5ccc3e 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/cron_tool.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/cron_tool.rs @@ -12,12 +12,17 @@ use crate::service::{ }, get_global_cron_service, }; +use crate::service_agent_runtime::CoreServiceAgentRuntime; use crate::util::errors::{BitFunError, BitFunResult}; use async_trait::async_trait; +use bitfun_agent_runtime::sdk::AgentRuntime; +use bitfun_runtime_ports::{ + AgentSessionListRequest, AgentSessionWorkspaceBinding, AgentSessionWorkspaceRequest, +}; use chrono::{DateTime, Local, SecondsFormat, TimeZone}; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; -use std::path::{Path, PathBuf}; +use std::path::Path; const DEFAULT_JOB_NAME: &str = "Cron job"; @@ -122,23 +127,34 @@ impl CronTool { &self, session_id: &str, context: &ToolUseContext, - ) -> BitFunResult { - if let Some(coordinator) = get_global_coordinator() { - if let Some(resolved) = coordinator - .get_session_manager() - .resolve_session_workspace_binding(session_id) + ) -> BitFunResult { + if let Some(runtime) = Self::agent_runtime()? { + if let Some(binding) = runtime + .resolve_session_workspace_binding(AgentSessionWorkspaceRequest { + session_id: session_id.to_string(), + }) .await + .map_err(|error| { + BitFunError::tool(CoreServiceAgentRuntime::runtime_error_message(error)) + })? { - return Ok(resolved); + let workspace_ref = Self::workspace_ref_from_agent_binding(binding); + Self::ensure_target_session_visible(&runtime, &workspace_ref, session_id).await?; + return Ok(workspace_ref); } } if context.session_id.as_deref() == Some(session_id) { if let Some(binding) = context.workspace.as_ref() { - return Ok(binding.clone()); + return Ok(Self::workspace_ref_from_context_binding(binding)); } let resolved = self.resolve_workspace_from_context(context)?; - return Ok(WorkspaceBinding::new(None, PathBuf::from(resolved))); + return Ok(CronWorkspaceRef { + workspace_id: None, + workspace_path: resolved, + remote_connection_id: None, + remote_ssh_host: None, + }); } Err(BitFunError::tool(format!( @@ -147,6 +163,57 @@ impl CronTool { ))) } + fn agent_runtime() -> BitFunResult> { + let Some(coordinator) = get_global_coordinator() else { + return Ok(None); + }; + CoreServiceAgentRuntime::agent_runtime(coordinator) + .map(Some) + .map_err(BitFunError::tool) + } + + async fn ensure_target_session_visible( + runtime: &AgentRuntime, + workspace_ref: &CronWorkspaceRef, + session_id: &str, + ) -> BitFunResult<()> { + let sessions = runtime + .list_sessions(AgentSessionListRequest { + workspace_path: workspace_ref.workspace_path.clone(), + remote_connection_id: workspace_ref.remote_connection_id.clone(), + remote_ssh_host: workspace_ref.remote_ssh_host.clone(), + }) + .await + .map_err(|error| { + BitFunError::tool(CoreServiceAgentRuntime::runtime_error_message(error)) + })?; + if sessions + .iter() + .any(|session| session.session_id == session_id) + { + return Ok(()); + } + + let resolved_agent_type = runtime + .resolve_session_agent_type(session_id) + .await + .map_err(|error| { + BitFunError::tool(CoreServiceAgentRuntime::runtime_error_message(error)) + })?; + if resolved_agent_type + .as_deref() + .map(|value| !value.trim().is_empty()) + .unwrap_or(false) + { + return Ok(()); + } + + Err(BitFunError::NotFound(format!( + "Session '{}' not found in workspace '{}'", + session_id, workspace_ref.workspace_path + ))) + } + fn resolve_effective_session_id( &self, session_id: Option<&str>, @@ -173,7 +240,7 @@ impl CronTool { } } - fn workspace_ref_from_binding(&self, binding: &WorkspaceBinding) -> CronWorkspaceRef { + fn workspace_ref_from_context_binding(binding: &WorkspaceBinding) -> CronWorkspaceRef { CronWorkspaceRef { workspace_id: binding.workspace_id.clone(), workspace_path: binding.root_path_string(), @@ -187,6 +254,15 @@ impl CronTool { } } + fn workspace_ref_from_agent_binding(binding: AgentSessionWorkspaceBinding) -> CronWorkspaceRef { + CronWorkspaceRef { + workspace_id: binding.workspace_id, + workspace_path: binding.workspace_path, + remote_connection_id: binding.remote_connection_id, + remote_ssh_host: binding.remote_ssh_host, + } + } + fn normalize_optional_name(name: Option) -> BitFunResult> { match name { Some(name) if name.trim().is_empty() => Err(BitFunError::tool( @@ -973,11 +1049,10 @@ Patch schema for "update": .ok_or_else(|| BitFunError::tool("cron service not initialized".to_string()))?; let session_id = self.resolve_effective_session_id(params.session_id.as_deref(), context)?; - let workspace_binding = self + let workspace_ref = self .resolve_effective_workspace_for_session(&session_id, context) .await?; - let workspace = workspace_binding.root_path_string(); - let workspace_ref = self.workspace_ref_from_binding(&workspace_binding); + let workspace = workspace_ref.workspace_path.clone(); let mut jobs = cron_service .list_jobs_filtered( Some(&workspace_ref.workspace_path), @@ -1015,16 +1090,15 @@ Patch schema for "update": .ok_or_else(|| BitFunError::tool("cron service not initialized".to_string()))?; let session_id = self.resolve_effective_session_id(params.session_id.as_deref(), context)?; - let workspace_binding = self + let workspace_ref = self .resolve_effective_workspace_for_session(&session_id, context) .await?; - let workspace = workspace_binding.root_path_string(); + let workspace = workspace_ref.workspace_path.clone(); let job = params .job .ok_or_else(|| BitFunError::tool("job is required for add".to_string()))?; Self::validate_payload(&job.payload, "job")?; - let target_workspace = self.workspace_ref_from_binding(&workspace_binding); let created = cron_service .create_job(CreateCronJobRequest { @@ -1034,7 +1108,7 @@ Patch schema for "update": enabled: job.enabled.unwrap_or(true), target: CronJobTarget::Session { session_id: session_id.clone(), - workspace: target_workspace, + workspace: workspace_ref, }, }) .await?; @@ -1282,16 +1356,16 @@ mod tests { } #[test] - fn workspace_ref_from_binding_uses_remote_context_identity() { - let tool = CronTool::new(); + fn workspace_ref_from_context_binding_uses_remote_context_identity() { let context = remote_context( "/home/wsp/projects/test", Some("remote_workspace_1".to_string()), Some("session-1"), ); - let workspace_ref = - tool.workspace_ref_from_binding(context.workspace.as_ref().expect("workspace binding")); + let workspace_ref = CronTool::workspace_ref_from_context_binding( + context.workspace.as_ref().expect("workspace binding"), + ); assert_eq!( workspace_ref.workspace_id.as_deref(), @@ -1304,4 +1378,23 @@ mod tests { ); assert_eq!(workspace_ref.remote_ssh_host.as_deref(), Some("ssh.dev")); } + + #[test] + fn workspace_ref_from_agent_binding_preserves_full_workspace_identity() { + let workspace_ref = + CronTool::workspace_ref_from_agent_binding(AgentSessionWorkspaceBinding { + workspace_id: Some("workspace-1".to_string()), + workspace_path: "/home/wsp/projects/test".to_string(), + remote_connection_id: Some("conn-1".to_string()), + remote_ssh_host: Some("ssh.dev".to_string()), + }); + + assert_eq!(workspace_ref.workspace_id.as_deref(), Some("workspace-1")); + assert_eq!(workspace_ref.workspace_path, "/home/wsp/projects/test"); + assert_eq!( + workspace_ref.remote_connection_id.as_deref(), + Some("conn-1") + ); + assert_eq!(workspace_ref.remote_ssh_host.as_deref(), Some("ssh.dev")); + } } diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/session_history_tool.rs b/src/crates/assembly/core/src/agentic/tools/implementations/session_history_tool.rs index f1c6a2110..53639fbde 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/session_history_tool.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/session_history_tool.rs @@ -1,10 +1,10 @@ -use crate::agentic::coordination::get_global_coordinator; use crate::agentic::persistence::PersistenceManager; use crate::agentic::tools::framework::{ Tool, ToolExposure, ToolRenderOptions, ToolResult, ToolUseContext, ValidationResult, }; use crate::infrastructure::PathManager; use crate::service::session::SessionTranscriptExportOptions; +use crate::service_agent_runtime::CoreServiceAgentRuntime; use crate::util::errors::{BitFunError, BitFunResult}; use async_trait::async_trait; use serde::Deserialize; @@ -248,20 +248,16 @@ Examples: .map_err(|e| BitFunError::tool(format!("Invalid input: {}", e)))?; let session_id = self.resolve_session_id(¶ms.session_id)?; - let coordinator = get_global_coordinator() - .ok_or_else(|| BitFunError::tool("coordinator not initialized".to_string()))?; - let workspace = coordinator - .get_session_manager() - .resolve_session_workspace_binding(&session_id) - .await - .ok_or_else(|| { - BitFunError::NotFound(format!( - "Workspace for session '{}' could not be resolved", - session_id - )) - })?; - let display_workspace = workspace.root_path_string(); - let session_storage_dir = workspace.session_storage_dir(); + let (display_workspace, session_storage_dir) = + CoreServiceAgentRuntime::resolve_session_workspace_paths(&session_id) + .await + .ok_or_else(|| { + BitFunError::NotFound(format!( + "Workspace for session '{}' could not be resolved", + session_id + )) + })?; + let display_workspace = display_workspace.to_string_lossy().into_owned(); let manager = PersistenceManager::new(Arc::new(PathManager::new()?))?; let transcript = manager .export_session_transcript( diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/session_message_tool.rs b/src/crates/assembly/core/src/agentic/tools/implementations/session_message_tool.rs index cf882cd43..e6adcfb4b 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/session_message_tool.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/session_message_tool.rs @@ -11,7 +11,8 @@ use crate::util::errors::{BitFunError, BitFunResult}; use async_trait::async_trait; use bitfun_runtime_ports::{ AgentDialogPrependedReminder, AgentDialogTurnRequest, AgentSessionCreateRequest, - AgentSessionReplyRoute, AgentSessionWorkspaceBinding, AgentSessionWorkspaceRequest, + AgentSessionListRequest, AgentSessionReplyRoute, AgentSessionSummary, + AgentSessionWorkspaceBinding, AgentSessionWorkspaceRequest, }; use serde::Deserialize; use serde_json::{json, Value}; @@ -210,6 +211,22 @@ impl SessionMessageTool { && left.remote_ssh_host == right.remote_ssh_host } + fn target_agent_type_from_resolution(agent_type: Option) -> Option { + agent_type.filter(|value| !value.trim().is_empty()) + } + + fn target_agent_type_from_sessions( + sessions: &[AgentSessionSummary], + target_session_id: &str, + ) -> Option { + sessions + .iter() + .find(|session| { + session.session_id == target_session_id && !session.agent_type.trim().is_empty() + }) + .map(|session| session.agent_type.clone()) + } + fn format_forwarded_message( &self, message: &str, @@ -551,14 +568,34 @@ Allowed agent types when creating a session: } } - let target_agent_type = runtime - .resolve_session_agent_type(&target_session_id) + let visible_sessions = runtime + .list_sessions(AgentSessionListRequest { + workspace_path: workspace_target.workspace_path.clone(), + remote_connection_id: workspace_target.remote_connection_id.clone(), + remote_ssh_host: workspace_target.remote_ssh_host.clone(), + }) .await .map_err(|error| { BitFunError::tool(CoreServiceAgentRuntime::runtime_error_message(error)) - })? - .filter(|value| !value.trim().is_empty()) - .ok_or_else(|| { + })?; + let listed_agent_type = + Self::target_agent_type_from_sessions(&visible_sessions, &target_session_id); + let resolved_agent_type = if listed_agent_type.is_none() { + Self::target_agent_type_from_resolution( + runtime + .resolve_session_agent_type(&target_session_id) + .await + .map_err(|error| { + BitFunError::tool(CoreServiceAgentRuntime::runtime_error_message( + error, + )) + })?, + ) + } else { + None + }; + let target_agent_type = + listed_agent_type.or(resolved_agent_type).ok_or_else(|| { BitFunError::NotFound(format!("Session '{}' not found", target_session_id)) })?; @@ -764,6 +801,55 @@ mod tests { )); } + #[test] + fn target_agent_type_rejects_empty_agent_type_resolution() { + assert_eq!( + SessionMessageTool::target_agent_type_from_resolution(Some(" ".to_string())), + None + ); + } + + #[test] + fn target_agent_type_uses_resolved_agent_type() { + assert_eq!( + SessionMessageTool::target_agent_type_from_resolution(Some("agentic".to_string())) + .as_deref(), + Some("agentic") + ); + } + + #[test] + fn target_agent_type_uses_matching_session_agent_type() { + let sessions = vec![AgentSessionSummary { + session_id: "worker_1".to_string(), + session_name: "Worker".to_string(), + agent_type: "agentic".to_string(), + created_at_ms: 1, + last_active_at_ms: 2, + }]; + + assert_eq!( + SessionMessageTool::target_agent_type_from_sessions(&sessions, "worker_1").as_deref(), + Some("agentic") + ); + } + + #[test] + fn target_agent_type_rejects_empty_session_agent_type() { + let sessions = vec![AgentSessionSummary { + session_id: "worker_1".to_string(), + session_name: "Worker".to_string(), + agent_type: " ".to_string(), + created_at_ms: 1, + last_active_at_ms: 2, + }]; + + assert_eq!( + SessionMessageTool::target_agent_type_from_sessions(&sessions, "worker_1"), + None + ); + } + #[tokio::test] async fn validate_existing_session_rejects_agent_type_override() { let tool = SessionMessageTool::new(); diff --git a/src/crates/assembly/core/src/agentic/tools/implementations/thread_goal_tools.rs b/src/crates/assembly/core/src/agentic/tools/implementations/thread_goal_tools.rs index 5099b61b0..d52a1a832 100644 --- a/src/crates/assembly/core/src/agentic/tools/implementations/thread_goal_tools.rs +++ b/src/crates/assembly/core/src/agentic/tools/implementations/thread_goal_tools.rs @@ -3,19 +3,24 @@ use crate::agentic::coordination::get_global_coordinator; use crate::agentic::goal_mode::user_facing_thread_goal_error; use crate::agentic::tools::framework::{Tool, ToolResult, ToolUseContext}; +use crate::service_agent_runtime::CoreServiceAgentRuntime; use crate::util::errors::{BitFunError, BitFunResult}; use async_trait::async_trait; +use bitfun_agent_runtime::sdk::RuntimeError; use bitfun_agent_runtime::thread_goal_tools::{ build_goal_tool_result, parse_create_goal_args, parse_update_goal_args, parse_update_goal_status, CREATE_GOAL_TOOL_NAME, GET_GOAL_TOOL_NAME, UPDATE_GOAL_TOOL_NAME, }; -use bitfun_runtime_ports::ThreadGoalStatus; +use bitfun_runtime_ports::{ + AgentThreadGoalCreateRequest, AgentThreadGoalGetRequest, AgentThreadGoalUpdateStatusRequest, + PortError, PortErrorKind, ThreadGoalStatus, +}; use serde_json::{json, Value}; -fn require_coordinator( -) -> BitFunResult> { - get_global_coordinator() - .ok_or_else(|| BitFunError::Validation("coordinator is unavailable".to_string())) +fn require_agent_runtime() -> BitFunResult { + let coordinator = get_global_coordinator() + .ok_or_else(|| BitFunError::Validation("coordinator is unavailable".to_string()))?; + CoreServiceAgentRuntime::agent_runtime(coordinator).map_err(BitFunError::tool) } fn require_session_context(context: &ToolUseContext) -> BitFunResult<(String, std::path::PathBuf)> { @@ -30,6 +35,34 @@ fn require_session_context(context: &ToolUseContext) -> BitFunResult<(String, st Ok((session_id, workspace_path)) } +fn thread_goal_runtime_error(error: RuntimeError) -> BitFunError { + match error { + RuntimeError::Port(port_error) => thread_goal_port_error(port_error), + other => user_facing_thread_goal_error(BitFunError::Tool( + CoreServiceAgentRuntime::runtime_error_message(other), + )), + } +} + +fn thread_goal_port_error(port_error: PortError) -> BitFunError { + match port_error.kind { + PortErrorKind::InvalidRequest => BitFunError::Validation(port_error.message), + PortErrorKind::NotFound => BitFunError::NotFound(port_error.message), + PortErrorKind::Cancelled => { + user_facing_thread_goal_error(BitFunError::Cancelled(port_error.message)) + } + PortErrorKind::Timeout => { + user_facing_thread_goal_error(BitFunError::Timeout(port_error.message)) + } + PortErrorKind::NotAvailable => { + user_facing_thread_goal_error(BitFunError::NotImplemented(port_error.message)) + } + PortErrorKind::PermissionDenied | PortErrorKind::Backend => { + user_facing_thread_goal_error(BitFunError::Tool(port_error.message)) + } + } +} + pub struct GetGoalTool; impl GetGoalTool { @@ -38,6 +71,38 @@ impl GetGoalTool { } } +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn thread_goal_port_error_preserves_user_facing_policy() { + let invalid = thread_goal_port_error(PortError::new( + PortErrorKind::InvalidRequest, + "missing objective", + )); + let not_found = thread_goal_port_error(PortError::new( + PortErrorKind::NotFound, + "thread goal not found", + )); + let timeout = thread_goal_port_error(PortError::new(PortErrorKind::Timeout, "store lag")); + + assert!(matches!( + invalid, + BitFunError::Validation(message) if message == "missing objective" + )); + assert!(matches!( + not_found, + BitFunError::NotFound(message) if message == "thread goal not found" + )); + assert!(matches!( + timeout, + BitFunError::Validation(message) + if message == "Thread goal operation failed. Check session state and try again." + )); + } +} + impl Default for GetGoalTool { fn default() -> Self { Self::new() @@ -75,12 +140,15 @@ impl Tool for GetGoalTool { _input: &Value, context: &ToolUseContext, ) -> BitFunResult> { - let coordinator = require_coordinator()?; + let runtime = require_agent_runtime()?; let (session_id, workspace_path) = require_session_context(context)?; - let goal = coordinator - .get_thread_goal(&session_id, workspace_path.as_path()) + let goal = runtime + .get_thread_goal(AgentThreadGoalGetRequest { + session_id, + workspace_path: workspace_path.to_string_lossy().into_owned(), + }) .await - .map_err(user_facing_thread_goal_error)?; + .map_err(thread_goal_runtime_error)?; let result = build_goal_tool_result(goal, false) .map_err(|error| BitFunError::Validation(error.to_string()))?; Ok(vec![ToolResult::Result { @@ -147,17 +215,17 @@ Set token_budget only when an explicit token budget is requested. Fails if a goa ) -> BitFunResult> { let parsed = parse_create_goal_args(input.clone()) .map_err(|error| BitFunError::Validation(error.to_string()))?; - let coordinator = require_coordinator()?; + let runtime = require_agent_runtime()?; let (session_id, workspace_path) = require_session_context(context)?; - let goal = coordinator - .create_thread_goal( - &session_id, - workspace_path.as_path(), - parsed.objective, - parsed.token_budget, - ) + let goal = runtime + .create_thread_goal(AgentThreadGoalCreateRequest { + session_id, + workspace_path: workspace_path.to_string_lossy().into_owned(), + objective: parsed.objective, + token_budget: parsed.token_budget, + }) .await - .map_err(user_facing_thread_goal_error)?; + .map_err(thread_goal_runtime_error)?; let result = build_goal_tool_result(Some(goal), false) .map_err(|error| BitFunError::Validation(error.to_string()))?; Ok(vec![ToolResult::Result { @@ -226,17 +294,17 @@ You cannot use this tool to pause, resume, budget-limit, or usage-limit a goal." .map_err(|error| BitFunError::Validation(error.to_string()))?; let status = parse_update_goal_status(&parsed.status) .map_err(|error| BitFunError::Validation(error.to_string()))?; - let coordinator = require_coordinator()?; + let runtime = require_agent_runtime()?; let (session_id, workspace_path) = require_session_context(context)?; - let goal = coordinator - .update_thread_goal_status( - &session_id, - workspace_path.as_path(), + let goal = runtime + .update_thread_goal_status(AgentThreadGoalUpdateStatusRequest { + session_id, + workspace_path: workspace_path.to_string_lossy().into_owned(), status, - context.dialog_turn_id.as_deref(), - ) + turn_id: context.dialog_turn_id.clone(), + }) .await - .map_err(user_facing_thread_goal_error)?; + .map_err(thread_goal_runtime_error)?; let include_report = status == ThreadGoalStatus::Complete; let result = build_goal_tool_result(Some(goal), include_report) .map_err(|error| BitFunError::Validation(error.to_string()))?; diff --git a/src/crates/assembly/core/src/service_agent_runtime.rs b/src/crates/assembly/core/src/service_agent_runtime.rs index 780f2a04e..0121f01e3 100644 --- a/src/crates/assembly/core/src/service_agent_runtime.rs +++ b/src/crates/assembly/core/src/service_agent_runtime.rs @@ -9,10 +9,10 @@ use bitfun_agent_runtime::sdk::{AgentRuntime, AgentRuntimeBuilder, RuntimeError} use bitfun_runtime_ports::{ AgentDialogTurnPort, AgentDialogTurnRequest, AgentInputAttachment, AgentLifecycleDeliveryPort, AgentSessionCreateRequest, AgentSessionManagementPort, AgentSubmissionPort, - AgentSubmissionSource, AgentTurnCancellationPort, AgentTurnCancellationRequest, - RemoteControlStatePort, RemoteControlStateRequest, RemoteControlStateSnapshot, - RemoteSessionWorkspaceIdentity, RuntimeServiceCapability, RuntimeServicePort, - SessionStoragePathRequest, SessionStorePort, + AgentSubmissionSource, AgentThreadGoalManagementPort, AgentTurnCancellationPort, + AgentTurnCancellationRequest, RemoteControlStatePort, RemoteControlStateRequest, + RemoteControlStateSnapshot, RemoteSessionWorkspaceIdentity, RuntimeServiceCapability, + RuntimeServicePort, SessionStoragePathRequest, SessionStorePort, }; use bitfun_services_integrations::remote_connect::{ build_remote_chat_messages, build_remote_model_catalog, @@ -44,6 +44,7 @@ use crate::agentic::coordination::{ }; use crate::agentic::image_analysis::ImageContextData; use crate::agentic::session::session_store_port::CoreSessionStorePort; +use crate::agentic::workspace::WorkspaceBinding; use crate::service::remote_connect::remote_server::RemoteExecutionDispatcher; use crate::service::config::types::{AIConfig, GlobalConfig, ModelCapability, ReasoningMode}; @@ -473,6 +474,7 @@ fn agent_input_attachment_from_image_context(context: ImageContextData) -> Agent fn core_agent_runtime_builder( submission: Arc, session_management: Arc, + thread_goal_management: Arc, cancellation: Arc, ) -> AgentRuntimeBuilder { let agent_registry: Arc = @@ -480,6 +482,7 @@ fn core_agent_runtime_builder( AgentRuntimeBuilder::new() .with_submission_port(submission) .with_session_management_port(session_management) + .with_thread_goal_management_port(thread_goal_management) .with_cancellation_port(cancellation) .with_agent_registry(agent_registry) } @@ -499,26 +502,41 @@ impl RemoteImageContextAdapter for ImageContextData { pub(crate) struct CoreServiceAgentRuntime; impl CoreServiceAgentRuntime { - pub(crate) async fn resolve_session_storage_dir( - session_id: &str, - ) -> Option { + async fn resolve_session_workspace_binding(session_id: &str) -> Option { let coordinator = get_global_coordinator()?; coordinator .get_session_manager() .resolve_session_workspace_binding(session_id) .await - .map(|binding| binding.session_storage_dir()) + } + + pub(crate) async fn resolve_session_workspace_paths( + session_id: &str, + ) -> Option<(std::path::PathBuf, std::path::PathBuf)> { + Self::resolve_session_workspace_binding(session_id) + .await + .map(|binding| { + ( + binding.logical_workspace_path().to_path_buf(), + binding.session_storage_dir(), + ) + }) + } + + pub(crate) async fn resolve_session_storage_dir( + session_id: &str, + ) -> Option { + Self::resolve_session_workspace_paths(session_id) + .await + .map(|(_, storage_dir)| storage_dir) } pub(crate) async fn resolve_session_logical_workspace_path( session_id: &str, ) -> Option { - let coordinator = get_global_coordinator()?; - coordinator - .get_session_manager() - .resolve_session_workspace_binding(session_id) + Self::resolve_session_workspace_paths(session_id) .await - .map(|binding| binding.logical_workspace_path().to_path_buf()) + .map(|(workspace_path, _)| workspace_path) } pub(crate) async fn resolve_remote_file_workspace_root( @@ -749,10 +767,16 @@ impl CoreServiceAgentRuntime { ) -> Result { let submission: Arc = coordinator.clone(); let session_management: Arc = coordinator.clone(); + let thread_goal_management: Arc = coordinator.clone(); let cancellation: Arc = coordinator; - core_agent_runtime_builder(submission, session_management, cancellation) - .build() - .map_err(|error| error.to_string()) + core_agent_runtime_builder( + submission, + session_management, + thread_goal_management, + cancellation, + ) + .build() + .map_err(|error| error.to_string()) } pub(crate) fn agent_runtime_with_dialog_turns( @@ -761,14 +785,20 @@ impl CoreServiceAgentRuntime { ) -> Result { let submission: Arc = coordinator.clone(); let session_management: Arc = coordinator.clone(); + let thread_goal_management: Arc = coordinator.clone(); let cancellation: Arc = coordinator; let dialog_turn: Arc = scheduler.clone(); let lifecycle_delivery: Arc = scheduler; - core_agent_runtime_builder(submission, session_management, cancellation) - .with_dialog_turn_port(dialog_turn) - .with_lifecycle_delivery_port(lifecycle_delivery) - .build() - .map_err(|error| error.to_string()) + core_agent_runtime_builder( + submission, + session_management, + thread_goal_management, + cancellation, + ) + .with_dialog_turn_port(dialog_turn) + .with_lifecycle_delivery_port(lifecycle_delivery) + .build() + .map_err(|error| error.to_string()) } pub(crate) fn agent_runtime_with_lifecycle_delivery( @@ -777,12 +807,18 @@ impl CoreServiceAgentRuntime { ) -> Result { let submission: Arc = coordinator.clone(); let session_management: Arc = coordinator.clone(); + let thread_goal_management: Arc = coordinator.clone(); let cancellation: Arc = coordinator; let lifecycle_delivery: Arc = scheduler; - core_agent_runtime_builder(submission, session_management, cancellation) - .with_lifecycle_delivery_port(lifecycle_delivery) - .build() - .map_err(|error| error.to_string()) + core_agent_runtime_builder( + submission, + session_management, + thread_goal_management, + cancellation, + ) + .with_lifecycle_delivery_port(lifecycle_delivery) + .build() + .map_err(|error| error.to_string()) } pub(crate) fn agent_runtime_with_scheduler_ports( @@ -790,15 +826,21 @@ impl CoreServiceAgentRuntime { scheduler: Arc, ) -> Result { let submission: Arc = coordinator.clone(); - let session_management: Arc = coordinator; + let session_management: Arc = coordinator.clone(); + let thread_goal_management: Arc = coordinator; let cancellation: Arc = scheduler.clone(); let dialog_turn: Arc = scheduler.clone(); let lifecycle_delivery: Arc = scheduler; - core_agent_runtime_builder(submission, session_management, cancellation) - .with_dialog_turn_port(dialog_turn) - .with_lifecycle_delivery_port(lifecycle_delivery) - .build() - .map_err(|error| error.to_string()) + core_agent_runtime_builder( + submission, + session_management, + thread_goal_management, + cancellation, + ) + .with_dialog_turn_port(dialog_turn) + .with_lifecycle_delivery_port(lifecycle_delivery) + .build() + .map_err(|error| error.to_string()) } pub(crate) fn global_agent_runtime_with_lifecycle_delivery() -> Result { @@ -1456,6 +1498,7 @@ mod tests { where T: AgentSubmissionPort + AgentSessionManagementPort + + AgentThreadGoalManagementPort + AgentTurnCancellationPort + RemoteControlStatePort + SessionTranscriptReader, diff --git a/src/crates/contracts/runtime-ports/src/lib.rs b/src/crates/contracts/runtime-ports/src/lib.rs index d7fa09836..26a43b373 100644 --- a/src/crates/contracts/runtime-ports/src/lib.rs +++ b/src/crates/contracts/runtime-ports/src/lib.rs @@ -665,6 +665,8 @@ pub struct AgentSessionWorkspaceRequest { #[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AgentSessionWorkspaceBinding { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub workspace_id: Option, pub workspace_path: String, #[serde(default, skip_serializing_if = "Option::is_none")] pub remote_connection_id: Option, @@ -1124,6 +1126,33 @@ pub struct ThreadGoalToolResponse { pub completion_budget_report: Option, } +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentThreadGoalGetRequest { + pub session_id: String, + pub workspace_path: String, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentThreadGoalCreateRequest { + pub session_id: String, + pub workspace_path: String, + pub objective: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub token_budget: Option, +} + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct AgentThreadGoalUpdateStatusRequest { + pub session_id: String, + pub workspace_path: String, + pub status: ThreadGoalStatus, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub turn_id: Option, +} + #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct CompressionContract { #[serde(default, skip_serializing_if = "Vec::is_empty")] @@ -1288,6 +1317,24 @@ pub trait AgentLifecycleDeliveryPort: Send + Sync { async fn deliver_thread_goal(&self, request: AgentThreadGoalDeliveryRequest) -> PortResult<()>; } +#[async_trait::async_trait] +pub trait AgentThreadGoalManagementPort: Send + Sync { + async fn get_thread_goal( + &self, + request: AgentThreadGoalGetRequest, + ) -> PortResult>; + + async fn create_thread_goal( + &self, + request: AgentThreadGoalCreateRequest, + ) -> PortResult; + + async fn update_thread_goal_status( + &self, + request: AgentThreadGoalUpdateStatusRequest, + ) -> PortResult; +} + #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct AgentTurnCancellationRequest { @@ -2091,6 +2138,37 @@ mod tests { assert_eq!(json["goal"]["goalId"], "goal_1"); } + #[test] + fn agent_thread_goal_management_requests_serialize_stable_shape() { + let get_request = AgentThreadGoalGetRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + }; + let create_request = AgentThreadGoalCreateRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + objective: "Ship the refactor".to_string(), + token_budget: Some(1000), + }; + let update_request = AgentThreadGoalUpdateStatusRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + status: ThreadGoalStatus::Complete, + turn_id: Some("turn_1".to_string()), + }; + + let get_json = serde_json::to_value(get_request).expect("serialize get request"); + let create_json = serde_json::to_value(create_request).expect("serialize create request"); + let update_json = serde_json::to_value(update_request).expect("serialize update request"); + + assert_eq!(get_json["sessionId"], "session_1"); + assert_eq!(get_json["workspacePath"], "/workspace/project"); + assert_eq!(create_json["objective"], "Ship the refactor"); + assert_eq!(create_json["tokenBudget"], 1000); + assert_eq!(update_json["status"], "complete"); + assert_eq!(update_json["turnId"], "turn_1"); + } + #[test] fn agent_turn_cancellation_request_serializes_current_contract() { let request = AgentTurnCancellationRequest { @@ -2136,6 +2214,7 @@ mod tests { session_id: "session_1".to_string(), }; let workspace_binding = AgentSessionWorkspaceBinding { + workspace_id: Some("workspace_1".to_string()), workspace_path: "/workspace/project".to_string(), remote_connection_id: Some("conn-1".to_string()), remote_ssh_host: Some("host-1".to_string()), @@ -2159,6 +2238,7 @@ mod tests { assert_eq!(delete_json["remoteConnectionId"], "conn-1"); assert_eq!(delete_json["remoteSshHost"], "host-1"); assert_eq!(workspace_json["sessionId"], "session_1"); + assert_eq!(binding_json["workspaceId"], "workspace_1"); assert_eq!(binding_json["workspacePath"], "/workspace/project"); assert_eq!(binding_json["remoteConnectionId"], "conn-1"); assert_eq!(binding_json["remoteSshHost"], "host-1"); diff --git a/src/crates/execution/agent-runtime/src/runtime.rs b/src/crates/execution/agent-runtime/src/runtime.rs index b6c0913ce..4d7737ccb 100644 --- a/src/crates/execution/agent-runtime/src/runtime.rs +++ b/src/crates/execution/agent-runtime/src/runtime.rs @@ -15,9 +15,10 @@ use bitfun_runtime_ports::{ AgentSessionCreateResult, AgentSessionDeleteRequest, AgentSessionListRequest, AgentSessionManagementPort, AgentSessionSummary, AgentSessionWorkspaceBinding, AgentSessionWorkspaceRequest, AgentSubmissionPort, AgentSubmissionRequest, - AgentSubmissionResult, AgentSubmissionSource, AgentThreadGoalDeliveryRequest, - AgentTurnCancellationPort, AgentTurnCancellationRequest, AgentTurnCancellationResult, - DialogSubmitOutcome, PortError, RuntimeEventEnvelope, + AgentSubmissionResult, AgentSubmissionSource, AgentThreadGoalCreateRequest, + AgentThreadGoalDeliveryRequest, AgentThreadGoalGetRequest, AgentThreadGoalManagementPort, + AgentThreadGoalUpdateStatusRequest, AgentTurnCancellationPort, AgentTurnCancellationRequest, + AgentTurnCancellationResult, DialogSubmitOutcome, PortError, RuntimeEventEnvelope, ThreadGoal, }; use bitfun_runtime_services::RuntimeServices; @@ -39,6 +40,8 @@ pub enum RuntimeError { MissingCancellationPort, #[error("agent session management port is not registered")] MissingSessionManagementPort, + #[error("agent thread goal management port is not registered")] + MissingThreadGoalManagementPort, #[error("runtime event sink is not registered")] MissingEventSink, #[error(transparent)] @@ -97,6 +100,7 @@ pub trait RuntimeAgentRegistry: Send + Sync { pub struct AgentRuntime { submission: Arc, session_management: Option>, + thread_goal_management: Option>, dialog_turn: Option>, lifecycle_delivery: Option>, cancellation: Option>, @@ -119,6 +123,13 @@ impl std::fmt::Debug for AgentRuntime { .as_ref() .map(|_| ""), ) + .field( + "thread_goal_management", + &self + .thread_goal_management + .as_ref() + .map(|_| ""), + ) .field( "dialog_turn", &self @@ -185,6 +196,7 @@ where pub struct AgentRuntimeBuilder { submission: Option>, session_management: Option>, + thread_goal_management: Option>, dialog_turn: Option>, lifecycle_delivery: Option>, cancellation: Option>, @@ -214,6 +226,14 @@ impl AgentRuntimeBuilder { self } + pub fn with_thread_goal_management_port( + mut self, + port: Arc, + ) -> Self { + self.thread_goal_management = Some(port); + self + } + pub fn with_dialog_turn_port(mut self, port: Arc) -> Self { self.dialog_turn = Some(port); self @@ -268,6 +288,7 @@ impl AgentRuntimeBuilder { .submission .ok_or(RuntimeBuildError::MissingSubmissionPort)?, session_management: self.session_management, + thread_goal_management: self.thread_goal_management, dialog_turn: self.dialog_turn, lifecycle_delivery: self.lifecycle_delivery, cancellation: self.cancellation, @@ -511,6 +532,48 @@ impl AgentRuntime { .map_err(RuntimeError::from) } + pub async fn get_thread_goal( + &self, + request: AgentThreadGoalGetRequest, + ) -> Result, RuntimeError> { + let thread_goal_management = self + .thread_goal_management + .as_ref() + .ok_or(RuntimeError::MissingThreadGoalManagementPort)?; + thread_goal_management + .get_thread_goal(request) + .await + .map_err(RuntimeError::from) + } + + pub async fn create_thread_goal( + &self, + request: AgentThreadGoalCreateRequest, + ) -> Result { + let thread_goal_management = self + .thread_goal_management + .as_ref() + .ok_or(RuntimeError::MissingThreadGoalManagementPort)?; + thread_goal_management + .create_thread_goal(request) + .await + .map_err(RuntimeError::from) + } + + pub async fn update_thread_goal_status( + &self, + request: AgentThreadGoalUpdateStatusRequest, + ) -> Result { + let thread_goal_management = self + .thread_goal_management + .as_ref() + .ok_or(RuntimeError::MissingThreadGoalManagementPort)?; + thread_goal_management + .update_thread_goal_status(request) + .await + .map_err(RuntimeError::from) + } + pub async fn resolve_session_agent_type( &self, session_id: &str, @@ -609,10 +672,10 @@ mod tests { AgentSessionCreateResult, AgentSessionDeleteRequest, AgentSessionListRequest, AgentSessionManagementPort, AgentSessionSummary, AgentSessionWorkspaceRequest, AgentSubmissionResult, AgentThreadGoalDeliveryKind, AgentThreadGoalDeliveryRequest, - AgentTurnCancellationResult, ClockPort, DialogQueuePriority, DialogSubmissionPolicy, - DialogSubmitOutcome, FileSystemPort, PermissionPort, PortErrorKind, PortResult, - RuntimeEventSink, RuntimeEventType, RuntimeServiceCapability, SessionStorePort, ThreadGoal, - ThreadGoalStatus, WorkspacePort, + AgentThreadGoalManagementPort, AgentTurnCancellationResult, ClockPort, DialogQueuePriority, + DialogSubmissionPolicy, DialogSubmitOutcome, FileSystemPort, PermissionPort, PortErrorKind, + PortResult, RuntimeEventSink, RuntimeEventType, RuntimeServiceCapability, SessionStorePort, + ThreadGoal, ThreadGoalStatus, WorkspacePort, }; use bitfun_runtime_services::{test_support::FakeRuntimePort, RuntimeServicesBuilder}; @@ -624,9 +687,27 @@ mod tests { listed_sessions: Mutex>, deleted_sessions: Mutex>, workspace_binding_requests: Mutex>, + thread_goal_gets: Mutex>, + thread_goal_creates: Mutex>, + thread_goal_updates: Mutex>, resolved_agent_type: Option, } + fn fake_thread_goal(status: ThreadGoalStatus) -> ThreadGoal { + ThreadGoal { + goal_id: "goal_1".to_string(), + session_id: "session_1".to_string(), + objective: "Ship runtime port".to_string(), + status, + token_budget: Some(1000), + tokens_used: 10, + time_used_seconds: 5, + created_at: 1, + updated_at: 2, + auto_continuation_count: 0, + } + } + #[async_trait::async_trait] impl AgentSessionManagementPort for FakeAgentRuntimePorts { async fn list_sessions( @@ -657,6 +738,7 @@ mod tests { .unwrap() .push(request); Ok(Some(AgentSessionWorkspaceBinding { + workspace_id: Some("workspace_1".to_string()), workspace_path: "/workspace/project".to_string(), remote_connection_id: Some("conn-1".to_string()), remote_ssh_host: Some("host-1".to_string()), @@ -702,6 +784,34 @@ mod tests { } } + #[async_trait::async_trait] + impl AgentThreadGoalManagementPort for FakeAgentRuntimePorts { + async fn get_thread_goal( + &self, + request: AgentThreadGoalGetRequest, + ) -> PortResult> { + self.thread_goal_gets.lock().unwrap().push(request); + Ok(Some(fake_thread_goal(ThreadGoalStatus::Active))) + } + + async fn create_thread_goal( + &self, + request: AgentThreadGoalCreateRequest, + ) -> PortResult { + self.thread_goal_creates.lock().unwrap().push(request); + Ok(fake_thread_goal(ThreadGoalStatus::Active)) + } + + async fn update_thread_goal_status( + &self, + request: AgentThreadGoalUpdateStatusRequest, + ) -> PortResult { + let status = request.status; + self.thread_goal_updates.lock().unwrap().push(request); + Ok(fake_thread_goal(status)) + } + } + #[async_trait::async_trait] impl AgentTurnCancellationPort for FakeAgentRuntimePorts { async fn cancel_turn( @@ -943,6 +1053,10 @@ mod tests { .expect("workspace binding"); assert_eq!(sessions[0].session_id, "session_1"); + assert_eq!( + workspace_binding.workspace_id.as_deref(), + Some("workspace_1") + ); assert_eq!(workspace_binding.workspace_path, "/workspace/project"); assert_eq!( workspace_binding.remote_connection_id.as_deref(), @@ -953,6 +1067,77 @@ mod tests { assert_eq!(ports.workspace_binding_requests.lock().unwrap().len(), 1); } + #[tokio::test] + async fn thread_goal_management_requires_registered_port() { + let ports = Arc::new(FakeAgentRuntimePorts::default()); + let runtime = AgentRuntimeBuilder::new() + .with_submission_port(ports) + .build() + .expect("runtime"); + + let err = runtime + .get_thread_goal(AgentThreadGoalGetRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + }) + .await + .unwrap_err(); + + assert_eq!(err, RuntimeError::MissingThreadGoalManagementPort); + } + + #[tokio::test] + async fn thread_goal_management_delegates_to_registered_port() { + let ports = Arc::new(FakeAgentRuntimePorts::default()); + let runtime = AgentRuntimeBuilder::new() + .with_submission_port(ports.clone()) + .with_thread_goal_management_port(ports.clone()) + .build() + .expect("runtime"); + + let goal = runtime + .get_thread_goal(AgentThreadGoalGetRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + }) + .await + .expect("get goal") + .expect("goal"); + let created = runtime + .create_thread_goal(AgentThreadGoalCreateRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + objective: "Ship runtime port".to_string(), + token_budget: Some(1000), + }) + .await + .expect("create goal"); + let updated = runtime + .update_thread_goal_status(AgentThreadGoalUpdateStatusRequest { + session_id: "session_1".to_string(), + workspace_path: "/workspace/project".to_string(), + status: ThreadGoalStatus::Complete, + turn_id: Some("turn_1".to_string()), + }) + .await + .expect("update goal"); + + assert_eq!(goal.status, ThreadGoalStatus::Active); + assert_eq!(created.objective, "Ship runtime port"); + assert_eq!(updated.status, ThreadGoalStatus::Complete); + assert_eq!(ports.thread_goal_gets.lock().unwrap().len(), 1); + assert_eq!( + ports.thread_goal_creates.lock().unwrap()[0].token_budget, + Some(1000) + ); + assert_eq!( + ports.thread_goal_updates.lock().unwrap()[0] + .turn_id + .as_deref(), + Some("turn_1") + ); + } + #[tokio::test] async fn submit_dialog_turn_requires_registered_dialog_turn_port() { let ports = Arc::new(FakeAgentRuntimePorts::default()); diff --git a/src/crates/execution/agent-runtime/src/sdk.rs b/src/crates/execution/agent-runtime/src/sdk.rs index ed09589bc..94913d708 100644 --- a/src/crates/execution/agent-runtime/src/sdk.rs +++ b/src/crates/execution/agent-runtime/src/sdk.rs @@ -49,17 +49,18 @@ pub use bitfun_runtime_ports::{ AgentSessionCreateRequest, AgentSessionCreateResult, AgentSessionDeleteRequest, AgentSessionListRequest, AgentSessionManagementPort, AgentSessionSummary, AgentSessionWorkspaceRequest, AgentSubmissionPort, AgentSubmissionRequest, - AgentSubmissionResult, AgentSubmissionSource, AgentThreadGoalDeliveryRequest, - AgentTurnCancellationPort, AgentTurnCancellationRequest, AgentTurnCancellationResult, - ClockPort, DialogSubmitOutcome, FileSystemPort, GitPort, McpCatalogPort, NetworkPort, - PermissionDecision, PermissionPort, PermissionRequest, PortError, PortResult, - RemoteAssistantWorkspaceFacts, RemoteCapabilityPort, RemoteConnectionPort, + AgentSubmissionResult, AgentSubmissionSource, AgentThreadGoalCreateRequest, + AgentThreadGoalDeliveryRequest, AgentThreadGoalGetRequest, AgentThreadGoalManagementPort, + AgentThreadGoalUpdateStatusRequest, AgentTurnCancellationPort, AgentTurnCancellationRequest, + AgentTurnCancellationResult, ClockPort, DialogSubmitOutcome, FileSystemPort, GitPort, + McpCatalogPort, NetworkPort, PermissionDecision, PermissionPort, PermissionRequest, PortError, + PortResult, RemoteAssistantWorkspaceFacts, RemoteCapabilityPort, RemoteConnectionPort, RemoteProjectionPort, RemoteRecentWorkspaceFacts, RemoteWorkspaceFacts, RemoteWorkspaceFileRuntimeHost, RemoteWorkspaceKind, RemoteWorkspacePort, RemoteWorkspaceRuntimeHost, RemoteWorkspaceUpdate, RuntimeEventEnvelope, RuntimeEventSink, RuntimeEventType, RuntimeServiceCapability, RuntimeServicePort, SessionStorageKind, SessionStoragePathRequest, SessionStoragePathResolution, SessionStorePort, TerminalPort, - WorkspacePort, + ThreadGoal, ThreadGoalStatus, WorkspacePort, }; pub use bitfun_runtime_services::{ CapabilityAvailability, RuntimeServices, RuntimeServicesBuilder, RuntimeServicesError, diff --git a/src/crates/services/services-integrations/src/workspace_search/result_mapping.rs b/src/crates/services/services-integrations/src/workspace_search/result_mapping.rs index ea22b82cd..4c939f4d9 100644 --- a/src/crates/services/services-integrations/src/workspace_search/result_mapping.rs +++ b/src/crates/services/services-integrations/src/workspace_search/result_mapping.rs @@ -9,7 +9,7 @@ pub(crate) fn convert_search_results( ) -> Vec { match output_mode { ContentSearchOutputMode::Content => { - let line_results = convert_line_matches_to_file_search_results(search_results); + let line_results = convert_hits_to_file_search_results(search_results); if !line_results.is_empty() { return line_results; } @@ -33,33 +33,44 @@ pub(crate) fn convert_search_results( } } -fn convert_line_matches_to_file_search_results( - search_results: &SearchResults, -) -> Vec { +fn convert_hits_to_file_search_results(search_results: &SearchResults) -> Vec { search_results .line_matches .iter() - .map(|matched| FileSearchResult { - path: matched.path.clone(), - name: Path::new(&matched.path) - .file_name() - .and_then(|file_name| file_name.to_str()) - .unwrap_or(&matched.path) - .to_string(), - is_directory: false, - match_type: SearchMatchType::Content, - line_number: Some(matched.line_number), - matched_content: matched + .map(|matched| { + let matched_content = matched .line_text .clone() - .or_else(|| Some(format!("line {}", matched.line_number))), - preview_before: None, - preview_inside: matched.line_text.clone(), - preview_after: None, + .unwrap_or_else(|| format!("line {}", matched.line_number)); + let (preview_before, preview_inside, preview_after) = matched + .line_text + .as_deref() + .map(split_preview) + .unwrap_or((None, None, None)); + + FileSearchResult { + path: matched.path.clone(), + name: Path::new(&matched.path) + .file_name() + .and_then(|file_name| file_name.to_str()) + .unwrap_or(&matched.path) + .to_string(), + is_directory: false, + match_type: SearchMatchType::Content, + line_number: Some(matched.line_number), + matched_content: Some(matched_content), + preview_before, + preview_inside, + preview_after, + } }) .collect() } +fn split_preview(line_text: &str) -> (Option, Option, Option) { + (None, Some(line_text.to_string()), None) +} + fn convert_file_counts_to_search_results(search_results: &SearchResults) -> Vec { search_results .file_counts diff --git a/src/crates/services/services-integrations/src/workspace_search/service.rs b/src/crates/services/services-integrations/src/workspace_search/service.rs index 844a1f7c0..0b954346f 100644 --- a/src/crates/services/services-integrations/src/workspace_search/service.rs +++ b/src/crates/services/services-integrations/src/workspace_search/service.rs @@ -866,4 +866,29 @@ mod tests { assert_eq!(results[0].matched_content.as_deref(), Some("line 42")); assert_eq!(results[0].preview_inside, None); } + + #[test] + fn content_search_converts_line_matches_with_line_text_preview() { + let mut search_results = empty_search_results(); + search_results.line_matches = serde_json::from_value(serde_json::json!([{ + "path": "src/search.rs", + "line_number": 42, + "line_text": "let result = search();" + }])) + .expect("line_matches should decode"); + + let results = convert_search_results(&search_results, ContentSearchOutputMode::Content); + + assert_eq!(results.len(), 1); + assert_eq!( + results[0].matched_content.as_deref(), + Some("let result = search();") + ); + assert_eq!( + results[0].preview_inside.as_deref(), + Some("let result = search();") + ); + assert_eq!(results[0].preview_before, None); + assert_eq!(results[0].preview_after, None); + } }