diff --git a/.catpaw/rules/python-launcher.md b/.catpaw/rules/python-launcher.md new file mode 100644 index 000000000..e21eaf398 --- /dev/null +++ b/.catpaw/rules/python-launcher.md @@ -0,0 +1,23 @@ +--- +ruleType: Manual +description: 本地环境优先使用 py 命令运行 Python 脚本 +globs: +--- +rule编写规则: https://catpaw.meituan.com/guides/settings/rules + +# 本地 Python 命令约定 + +在 Windows 本地环境中,优先使用 `py` 而非 `python` 来运行 Python 脚本。 + +## 原因 + +- **`py`** 是 Windows 上的 Python Launcher(`py.exe`),安装时写入 `C:\Windows\`,始终在 PATH 中。 +- **`python`** 的可执行目录可能未加入 PATH(安装时未勾选 "Add Python to PATH"),导致命令不可用。 +- `py` 支持多版本选择(如 `py -3.11`、`py -3.12`),更灵活可靠。 + +## 执行规则 + +- 运行 Python 脚本时使用 `py script.py` 而非 `python script.py` +- 安装包时使用 `py -m pip install ` 而非 `python -m pip install ` +- 指定版本时使用 `py -3.x` 格式 +- 运行模块时使用 `py -m ` 格式 diff --git a/TODO.md b/TODO.md new file mode 100644 index 000000000..f589265e2 --- /dev/null +++ b/TODO.md @@ -0,0 +1,12 @@ +# TODO +- [ ] [context.py] 在 build_runtime_system_prompt() 中新增 PLAN 模式感知段落(类似 fast_mode),告知 LLM 当前处于只读分析模式 +- [ ] [runtime.py] 在 refresh_runtime_client() 中调用 build_runtime_system_prompt() 重建 system prompt,使模式切换后 LLM 立即感知新模式 +- [ ] [runtime.py] 验证 refresh_runtime_client() 中 bundle 对象的必要属性(cwd、extra_skill_dirs、extra_plugin_roots、include_project_memory)在调用点可用 +- [ ] [测试] 编写/验证测试用例:进入 plan 模式后 system prompt 包含 plan mode 提示,退出后不再包含 +- [x] [context.py] 在 build_runtime_system_prompt() 中新增 PLAN 模式感知段落 — 已完成 +- [x] [runtime.py] 在 refresh_runtime_client() 中重建 system prompt — 已完成 +- [x] [runtime.py] 可用性验证 — RuntimeBundle 所有必要属性均已存在 — 已完成 +- [x] [测试] 编写/验证测试用例 — 所有 prompt 相关测试通过,权限模式切换逻辑验证通过 +- [x] 创建 clipboard_screenshot_tool.py 工具实现 +- [x] 在 __init__.py 中注册新工具 +- [x] 创建单元测试 test_clipboard_screenshot_tool.py diff --git a/frontend/terminal/src/components/TranscriptPane.tsx b/frontend/terminal/src/components/TranscriptPane.tsx index d7a69e70d..c1965f672 100644 --- a/frontend/terminal/src/components/TranscriptPane.tsx +++ 
b/frontend/terminal/src/components/TranscriptPane.tsx @@ -32,6 +32,8 @@ function labelFor(role: TranscriptItem['role']): string { return 'tool>'; case 'tool_result': return 'tool_result>'; + case 'thinking': + return 'Think:'; default: return `${role}>`; } @@ -41,6 +43,9 @@ function roleColor(role: TranscriptItem['role']): string | undefined { if (role === 'assistant') { return 'green'; } + if (role === 'thinking') { + return 'gray'; + } if (role === 'tool') { return 'cyan'; } diff --git a/frontend/terminal/src/hooks/useBackendSession.ts b/frontend/terminal/src/hooks/useBackendSession.ts index 6fc84b7ed..a59bf589e 100644 --- a/frontend/terminal/src/hooks/useBackendSession.ts +++ b/frontend/terminal/src/hooks/useBackendSession.ts @@ -52,6 +52,9 @@ export function useBackendSession(config: FrontendConfig, onExit: (code?: number const assistantFlushTimerRef = useRef(null); const pendingTranscriptItemsRef = useRef([]); const transcriptFlushTimerRef = useRef(null); + // Thinking content buffer to accumulate chunks into single item + const thinkingBufferRef = useRef(''); + const thinkingActiveRef = useRef(false); const flushAssistantDelta = (): void => { const pending = pendingAssistantDeltaRef.current; @@ -294,11 +297,34 @@ export function useBackendSession(config: FrontendConfig, onExit: (code?: number } return; } + if (event.type === 'thinking_delta') { + const delta = event.message ?? ''; + if (!delta) { + return; + } + // Accumulate thinking content instead of creating separate items + if (!thinkingActiveRef.current) { + thinkingActiveRef.current = true; + thinkingBufferRef.current = delta; + } else { + thinkingBufferRef.current += delta; + } + return; + } if (event.type === 'assistant_delta') { const delta = event.message ?? 
''; if (!delta) { return; } + // Flush accumulated thinking content before first assistant delta + if (thinkingActiveRef.current) { + const thinkingContent = thinkingBufferRef.current.trim(); + if (thinkingContent) { + queueTranscriptItem({role: 'thinking', text: thinkingContent}); + } + thinkingActiveRef.current = false; + thinkingBufferRef.current = ''; + } const isCodexStyle = String(statusRef.current.output_style ?? 'default') === 'codex'; if (isCodexStyle) { // Keep collecting text for assistant_complete fallback, but avoid @@ -325,6 +351,15 @@ export function useBackendSession(config: FrontendConfig, onExit: (code?: number assistantFlushTimerRef.current = null; } flushTranscriptItems(); + // Flush any remaining thinking content before assistant_complete + if (thinkingActiveRef.current) { + const thinkingContent = thinkingBufferRef.current.trim(); + if (thinkingContent) { + pendingTranscriptItemsRef.current.push({role: 'thinking', text: thinkingContent}); + } + thinkingActiveRef.current = false; + thinkingBufferRef.current = ''; + } const isCodexStyle = String(statusRef.current.output_style ?? 
'default') === 'codex'; if (isCodexStyle) { if (pendingAssistantDeltaRef.current) { diff --git a/frontend/terminal/src/types.ts b/frontend/terminal/src/types.ts index 3575f08af..32369034f 100644 --- a/frontend/terminal/src/types.ts +++ b/frontend/terminal/src/types.ts @@ -4,7 +4,7 @@ export type FrontendConfig = { }; export type TranscriptItem = { - role: 'system' | 'user' | 'assistant' | 'tool' | 'tool_result' | 'log' | 'status'; + role: 'system' | 'user' | 'assistant' | 'thinking' | 'tool' | 'tool_result' | 'log' | 'status'; text: string; tool_name?: string; tool_input?: Record; diff --git a/pyproject.toml b/pyproject.toml index 427229db2..a214db476 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,7 @@ dev = [ "pytest-cov>=5.0.0", "ruff>=0.5.0", "mypy>=1.10.0", + "Pillow>=10.0.0", ] [project.scripts] diff --git a/src/openharness/api/client.py b/src/openharness/api/client.py index 26be6a7cf..0829a70d9 100644 --- a/src/openharness/api/client.py +++ b/src/openharness/api/client.py @@ -46,6 +46,7 @@ class ApiMessageRequest: max_tokens: int = 4096 tools: list[dict[str, Any]] = field(default_factory=list) effort: str | None = None + show_thinking: bool = False @dataclass(frozen=True) @@ -74,7 +75,14 @@ class ApiRetryEvent: delay_seconds: float -ApiStreamEvent = ApiTextDeltaEvent | ApiMessageCompleteEvent | ApiRetryEvent +@dataclass(frozen=True) +class ApiThinkingDeltaEvent: + """Incremental thinking/reasoning content from the model.""" + + text: str + + +ApiStreamEvent = ApiTextDeltaEvent | ApiThinkingDeltaEvent | ApiMessageCompleteEvent | ApiRetryEvent class SupportsStreamingMessages(Protocol): diff --git a/src/openharness/api/copilot_client.py b/src/openharness/api/copilot_client.py index db3701354..078d0a277 100644 --- a/src/openharness/api/copilot_client.py +++ b/src/openharness/api/copilot_client.py @@ -125,6 +125,7 @@ async def stream_message(self, request: ApiMessageRequest) -> AsyncIterator[ApiS system_prompt=request.system_prompt, 
max_tokens=request.max_tokens, tools=request.tools, + show_thinking=request.show_thinking, ) async for event in self._inner.stream_message(patched): yield event diff --git a/src/openharness/api/openai_client.py b/src/openharness/api/openai_client.py index 3bac63163..3b1c04718 100644 --- a/src/openharness/api/openai_client.py +++ b/src/openharness/api/openai_client.py @@ -18,6 +18,7 @@ ApiRetryEvent, ApiStreamEvent, ApiTextDeltaEvent, + ApiThinkingDeltaEvent, ) from openharness.api.errors import ( AuthenticationFailure, @@ -334,7 +335,7 @@ async def _stream_once(self, request: ApiMessageRequest) -> AsyncIterator[ApiStr collected_tool_calls: dict[int, dict[str, Any]] = {} finish_reason: str | None = None usage_data: dict[str, int] = {} - # Buffer to strip inline blocks across streaming chunks. + # Buffer to strip inline blocks across streaming chunks. _think_buf = "" stream = await self._client.chat.completions.create(**params) @@ -354,18 +355,32 @@ async def _stream_once(self, request: ApiMessageRequest) -> AsyncIterator[ApiStr if chunk_finish: finish_reason = chunk_finish - # Accumulate reasoning_content from thinking models (not shown to user) + # Accumulate reasoning_content from thinking models reasoning_piece = getattr(delta, "reasoning_content", None) or "" if reasoning_piece: collected_reasoning += reasoning_piece + if request.show_thinking: + yield ApiThinkingDeltaEvent(text=reasoning_piece) - # Stream text content to user, stripping inline blocks + # Stream text content to user if delta.content: _think_buf += delta.content - visible, _think_buf = _strip_think_blocks(_think_buf) - if visible: - collected_content += visible - yield ApiTextDeltaEvent(text=visible) + if request.show_thinking: + # Convert inline blocks into classified segments + segments, _think_buf = _convert_think_blocks_display(_think_buf) + for text, is_thinking in segments: + if not text: + continue + if is_thinking: + yield ApiThinkingDeltaEvent(text=text) + else: + collected_content += 
text + yield ApiTextDeltaEvent(text=text) + else: + visible, _think_buf = _strip_think_blocks(_think_buf) + if visible: + collected_content += visible + yield ApiTextDeltaEvent(text=visible) # Accumulate tool calls if delta.tool_calls: @@ -449,32 +464,79 @@ def _translate_error(exc: Exception) -> OpenHarnessApiError: return RequestFailure(msg) -# Matches complete blocks (DOTALL so newlines are included). -_THINK_RE = re.compile(r".*?", re.DOTALL) +# Matches complete blocks (DOTALL so newlines are included). +_THINK_RE = re.compile(r"(.*?)", re.DOTALL) _THINK_OPEN_TAG = "" +_THINK_CLOSE_TAG = "" def _strip_think_blocks(buf: str) -> tuple[str, str]: - """Strip complete ```` blocks and return ``(visible_text, leftover)``. + """Strip complete ``...`` blocks and return ``(visible_text, leftover)``. - Complete pairs are removed via regex. An unclosed ```` is held in + Complete pairs are removed via regex. An unclosed ```` is held in *leftover* so it can be re-evaluated once the closing tag arrives in the next streaming chunk. """ # Remove fully-closed blocks. cleaned = _THINK_RE.sub("", buf) - # Hold back any unclosed for the next chunk. + # Hold back any unclosed for the next chunk. open_idx = cleaned.find(_THINK_OPEN_TAG) if open_idx != -1: return cleaned[:open_idx], cleaned[open_idx:] # Streaming providers may split the opening tag itself across chunk # boundaries (e.g. ``"..."``). Hold back the longest - # suffix that could still become ```` on the next chunk. + # suffix that could still become ```` on the next chunk. max_prefix = min(len(cleaned), len(_THINK_OPEN_TAG) - 1) for prefix_len in range(max_prefix, 0, -1): if _THINK_OPEN_TAG.startswith(cleaned[-prefix_len:]): return cleaned[:-prefix_len], cleaned[-prefix_len:] return cleaned, "" + + +def _convert_think_blocks_display(buf: str) -> tuple[list[tuple[str, bool]], str]: + """Convert ``...`` blocks into classified segments. 
+ + Instead of stripping thinking content, this extracts it and classifies + each segment as thinking or normal text so the caller can emit the + appropriate event type. + Returns ``(segments, leftover)`` where each segment is + ``(text, is_thinking)`` and *leftover* holds an unclosed `` +`` for the next streaming chunk. + """ + segments: list[tuple[str, bool]] = [] + pos = 0 + while True: + open_idx = buf.find(_THINK_OPEN_TAG, pos) + if open_idx == -1: + # No more opening tags; flush remaining text + remaining = buf[pos:] + # Check if the tail could be a partial opening tag + max_prefix = min(len(remaining), len(_THINK_OPEN_TAG) - 1) + for prefix_len in range(max_prefix, 0, -1): + if _THINK_OPEN_TAG.startswith(remaining[-prefix_len:]): + if remaining[:-prefix_len]: + segments.append((remaining[:-prefix_len], False)) + return segments, remaining[-prefix_len:] + if remaining: + segments.append((remaining, False)) + return segments, "" + + # Text before the opening tag + if open_idx > pos: + segments.append((buf[pos:open_idx], False)) + + close_idx = buf.find(_THINK_CLOSE_TAG, open_idx + len(_THINK_OPEN_TAG)) + if close_idx == -1: + # Unclosed block — hold back from the opening tag + return segments, buf[open_idx:] + + # Extract thinking content + think_content = buf[open_idx + len(_THINK_OPEN_TAG):close_idx].strip() + if think_content: + segments.append((think_content, True)) + pos = close_idx + len(_THINK_CLOSE_TAG) + + return segments, "" diff --git a/src/openharness/autopilot/service.py b/src/openharness/autopilot/service.py index 0cba9ddbc..e73d55efa 100644 --- a/src/openharness/autopilot/service.py +++ b/src/openharness/autopilot/service.py @@ -36,7 +36,7 @@ get_project_repo_journal_path, get_project_verification_policy_path, ) -from openharness.engine.stream_events import AssistantTextDelta, AssistantTurnComplete, ErrorEvent +from openharness.engine.stream_events import AssistantTextDelta, AssistantThinkingDelta, AssistantTurnComplete, ErrorEvent from 
openharness.swarm.worktree import WorktreeManager from openharness.utils.fs import atomic_write_text @@ -2067,7 +2067,9 @@ async def _ask(_question: str) -> str: collected: list[str] = [] try: async for event in bundle.engine.submit_message(prompt): - if isinstance(event, AssistantTextDelta): + if isinstance(event, AssistantThinkingDelta): + pass + elif isinstance(event, AssistantTextDelta): collected.append(event.text) elif isinstance(event, AssistantTurnComplete): text = event.message.text.strip() diff --git a/src/openharness/channels/adapter.py b/src/openharness/channels/adapter.py index ba5569e31..8d0990978 100644 --- a/src/openharness/channels/adapter.py +++ b/src/openharness/channels/adapter.py @@ -18,7 +18,7 @@ from openharness.channels.bus.events import InboundMessage, OutboundMessage from openharness.channels.bus.queue import MessageBus -from openharness.engine.stream_events import AssistantTextDelta, AssistantTurnComplete +from openharness.engine.stream_events import AssistantTextDelta, AssistantThinkingDelta, AssistantTurnComplete if TYPE_CHECKING: from openharness.engine.query_engine import QueryEngine @@ -98,7 +98,10 @@ async def _handle(self, msg: InboundMessage) -> None: reply_parts: list[str] = [] try: async for event in self._engine.submit_message(msg.content): - if isinstance(event, AssistantTextDelta): + if isinstance(event, AssistantThinkingDelta): + # Thinking content is omitted from channel replies + pass + elif isinstance(event, AssistantTextDelta): reply_parts.append(event.text) elif isinstance(event, AssistantTurnComplete): # Turn is done; we'll send the accumulated text below diff --git a/src/openharness/cli.py b/src/openharness/cli.py index 3b50d1d17..4dcb06fcc 100644 --- a/src/openharness/cli.py +++ b/src/openharness/cli.py @@ -2162,6 +2162,12 @@ def main( help="Override verbose mode setting from config", rich_help_panel="Model & Effort", ), + show_thinking: bool = typer.Option( + False, + "--show-thinking", + help="Show model 
thinking/reasoning process in the output", + rich_help_panel="Model & Effort", + ), max_turns: int | None = typer.Option( None, "--max-turns", @@ -2446,6 +2452,7 @@ def main( permission_mode=permission_mode, max_turns=max_turns, effort=effort, + show_thinking=show_thinking or None, ) ) return @@ -2479,5 +2486,6 @@ def main( api_format=api_format, permission_mode=permission_mode, effort=effort, + show_thinking=show_thinking or None, ) ) diff --git a/src/openharness/commands/registry.py b/src/openharness/commands/registry.py index 141614cbb..974653460 100644 --- a/src/openharness/commands/registry.py +++ b/src/openharness/commands/registry.py @@ -1252,6 +1252,26 @@ async def _effort_handler(args: str, context: CommandContext) -> CommandResult: context.app_state.set(effort=value) return CommandResult(message=f"Reasoning effort set to {value}.") + async def _thinking_handler(args: str, context: CommandContext) -> CommandResult: + settings = load_settings() + current = settings.show_thinking + arg = args.strip().lower() + if arg in {"on", "true", "1", "yes"}: + new_val = True + elif arg in {"off", "false", "0", "no"}: + new_val = False + elif arg == "show": + return CommandResult(message=f"Thinking display: {'on' if current else 'off'}") + else: + new_val = not current + settings = settings.model_copy(update={"show_thinking": new_val}) + save_settings(settings) + context.engine._show_thinking = new_val + return CommandResult( + message=f"Thinking display: {'on' if new_val else 'off'}", + refresh_runtime=True, + ) + async def _passes_handler(args: str, context: CommandContext) -> CommandResult: settings = load_settings() current = context.app_state.get().passes if context.app_state is not None else settings.passes @@ -2445,6 +2465,7 @@ async def _ship_handler(args: str, context: CommandContext) -> CommandResult: ) registry.register(SlashCommand("fast", "Show or update fast mode", _fast_handler)) registry.register(SlashCommand("effort", "Show or update reasoning effort", 
_effort_handler)) + registry.register(SlashCommand("thinking", "Toggle thinking/reasoning display", _thinking_handler)) registry.register(SlashCommand("passes", "Show or update reasoning pass count", _passes_handler)) registry.register(SlashCommand("turns", "Show or update maximum agentic turn count", _turns_handler)) registry.register(SlashCommand("continue", "Continue the previous tool loop if it was interrupted", _continue_handler)) diff --git a/src/openharness/config/settings.py b/src/openharness/config/settings.py index 32c543542..ddaf22592 100644 --- a/src/openharness/config/settings.py +++ b/src/openharness/config/settings.py @@ -595,6 +595,7 @@ class Settings(BaseModel): effort: str = "medium" passes: int = 1 verbose: bool = False + show_thinking: bool = False # Vision model (image-to-text fallback) vision: VisionModelConfig = Field(default_factory=VisionModelConfig) @@ -941,6 +942,10 @@ def _apply_env_overrides(settings: Settings) -> Settings: if max_turns: updates["max_turns"] = int(max_turns) + show_thinking = os.environ.get("OPENHARNESS_SHOW_THINKING") + if show_thinking: + updates["show_thinking"] = _parse_bool_env(show_thinking) + context_window_tokens = os.environ.get("OPENHARNESS_CONTEXT_WINDOW_TOKENS") if context_window_tokens: updates["context_window_tokens"] = int(context_window_tokens) diff --git a/src/openharness/engine/query.py b/src/openharness/engine/query.py index bc475152d..2543c731a 100644 --- a/src/openharness/engine/query.py +++ b/src/openharness/engine/query.py @@ -16,6 +16,7 @@ ApiMessageRequest, ApiRetryEvent, ApiTextDeltaEvent, + ApiThinkingDeltaEvent, SupportsStreamingMessages, ) from openharness.api.provider import is_model_multimodal @@ -29,6 +30,7 @@ ) from openharness.engine.stream_events import ( AssistantTextDelta, + AssistantThinkingDelta, AssistantTurnComplete, CompactProgressEvent, ErrorEvent, @@ -153,6 +155,7 @@ class QueryContext: max_turns: int | None = 200 hook_executor: HookExecutor | None = None tool_metadata: 
dict[str, object] | None = None + show_thinking: bool = False def _append_capped_unique(bucket: list[Any], value: Any, *, limit: int) -> None: @@ -733,11 +736,15 @@ async def _progress(event: CompactProgressEvent) -> None: max_tokens=effective_max_tokens, tools=context.tool_registry.to_api_schema(), effort=context.effort, + show_thinking=context.show_thinking, ) ): if isinstance(event, ApiTextDeltaEvent): yield AssistantTextDelta(text=event.text), None continue + if isinstance(event, ApiThinkingDeltaEvent): + yield AssistantThinkingDelta(text=event.text), None + continue if isinstance(event, ApiRetryEvent): yield StatusEvent( message=( diff --git a/src/openharness/engine/query_engine.py b/src/openharness/engine/query_engine.py index aa4acfca6..e36917398 100644 --- a/src/openharness/engine/query_engine.py +++ b/src/openharness/engine/query_engine.py @@ -56,6 +56,7 @@ def __init__( self._hook_executor = hook_executor self._tool_metadata = tool_metadata or {} self._settings = settings + self._show_thinking = settings.show_thinking if settings is not None else False self._messages: list[ConversationMessage] = [] self._cost_tracker = CostTracker() @@ -260,6 +261,7 @@ async def submit_message(self, prompt: str | ConversationMessage) -> AsyncIterat ask_user_prompt=self._ask_user_prompt, hook_executor=self._hook_executor, tool_metadata=self._tool_metadata, + show_thinking=self._show_thinking, ) query_messages = list(self._messages) coordinator_context = self._build_coordinator_context_message() @@ -297,6 +299,7 @@ async def continue_pending(self, *, max_turns: int | None = None) -> AsyncIterat ask_user_prompt=self._ask_user_prompt, hook_executor=self._hook_executor, tool_metadata=self._tool_metadata, + show_thinking=self._show_thinking, ) async for event, usage in run_query(context, self._messages): if usage is not None: diff --git a/src/openharness/engine/stream_events.py b/src/openharness/engine/stream_events.py index ea31b2d40..4e2af9936 100644 --- 
a/src/openharness/engine/stream_events.py +++ b/src/openharness/engine/stream_events.py @@ -16,6 +16,13 @@ class AssistantTextDelta: text: str +@dataclass(frozen=True) +class AssistantThinkingDelta: + """Incremental thinking/reasoning content from the model.""" + + text: str + + @dataclass(frozen=True) class AssistantTurnComplete: """Completed assistant turn.""" @@ -81,6 +88,7 @@ class CompactProgressEvent: StreamEvent = ( AssistantTextDelta + | AssistantThinkingDelta | AssistantTurnComplete | ToolExecutionStarted | ToolExecutionCompleted diff --git a/src/openharness/prompts/context.py b/src/openharness/prompts/context.py index 2b73a33f7..b9eed9328 100644 --- a/src/openharness/prompts/context.py +++ b/src/openharness/prompts/context.py @@ -15,8 +15,8 @@ from openharness.memory import load_memory_prompt from openharness.memory.relevance import format_relevant_memories, select_relevant_memories from openharness.memory.usage import mark_memory_used -from openharness.personalization.rules import load_local_rules from openharness.permissions.modes import PermissionMode +from openharness.personalization.rules import load_local_rules from openharness.prompts.claudemd import load_claude_md_prompt from openharness.prompts.system_prompt import build_system_prompt from openharness.skills.loader import load_skill_registry @@ -124,6 +124,16 @@ def build_runtime_system_prompt( "# Session Mode\nFast mode is enabled. Prefer concise replies, minimal tool use, and quicker progress over exhaustive exploration." ) + if settings.permission.mode == PermissionMode.PLAN: + sections.append( + "# Session Mode\n" + "Plan mode is active. You are in a read-only analysis and design phase. " + "Do NOT call mutating tools (write_file, edit_file, bash, etc.). " + "Instead: read code, explore the codebase, analyze the problem, " + "and present a detailed implementation plan to the user. " + "The user will exit plan mode when they are ready for you to implement." 
+ ) + sections.append( "# Reasoning Settings\n" f"- Effort: {settings.effort}\n" diff --git a/src/openharness/tools/__init__.py b/src/openharness/tools/__init__.py index 0b068472e..bb5a7af82 100644 --- a/src/openharness/tools/__init__.py +++ b/src/openharness/tools/__init__.py @@ -5,6 +5,7 @@ from openharness.tools.bash_tool import BashTool from openharness.tools.base import BaseTool, ToolExecutionContext, ToolRegistry, ToolResult from openharness.tools.brief_tool import BriefTool +from openharness.tools.clipboard_screenshot_tool import ClipboardScreenshotTool from openharness.tools.config_tool import ConfigTool from openharness.tools.cron_create_tool import CronCreateTool from openharness.tools.cron_delete_tool import CronDeleteTool @@ -50,6 +51,7 @@ def create_default_tool_registry(mcp_manager=None) -> ToolRegistry: registry = ToolRegistry() for tool in ( BashTool(), + ClipboardScreenshotTool(), AskUserQuestionTool(), FileReadTool(), FileWriteTool(), diff --git a/src/openharness/tools/clipboard_screenshot_tool.py b/src/openharness/tools/clipboard_screenshot_tool.py new file mode 100644 index 000000000..2a8a72fd3 --- /dev/null +++ b/src/openharness/tools/clipboard_screenshot_tool.py @@ -0,0 +1,592 @@ +"""Clipboard screenshot tool — read images from the system clipboard. + +Supports Windows (PowerShell / CMD), macOS, and Linux with automatic +platform detection and multi-tier fallback. The tool can return the +clipboard image as base64 data, save it to a file, or auto-describe +it via a configured vision model. 
+""" + +from __future__ import annotations + +import base64 +import logging +import os +import shutil +import subprocess +import tempfile +from pathlib import Path +from typing import Any, Literal + +from pydantic import BaseModel, Field + +from openharness.platforms import get_platform +from openharness.tools.base import BaseTool, ToolExecutionContext, ToolResult + +log = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Pydantic input model +# --------------------------------------------------------------------------- + + +class ClipboardScreenshotToolInput(BaseModel): + """Arguments for clipboard screenshot capture.""" + + output_format: Literal["base64", "file", "text"] = Field( + default="base64", + description=( + "Output format: 'base64' returns raw base64-encoded PNG data " + "(pass to image_to_text for analysis), 'file' saves to disk and " + "returns the path, 'text' auto-describes the image via a " + "configured vision model and returns the description text." + ), + ) + save_path: str | None = Field( + default=None, + description=( + "File path when output_format is 'file'. " + "Defaults to 'clipboard_screenshot.png' in the current working directory." + ), + ) + description_prompt: str | None = Field( + default=None, + description="Custom instruction for vision description when output_format is 'text'.", + ) + + +# --------------------------------------------------------------------------- +# Tool implementation +# --------------------------------------------------------------------------- + + +class ClipboardScreenshotTool(BaseTool): + """Read an image from the system clipboard. + + Works regardless of where the screenshot was taken (browser, terminal, + desktop, IDE, etc.) — as long as the image is in the system clipboard. 
+ """ + + name = "clipboard_screenshot" + description = ( + "Read an image from the system clipboard (screenshot or copied image) " + "and return it as base64-encoded PNG data, save it to a file, or " + "auto-describe it via a configured vision model. " + "Use this when the user has taken a screenshot or copied an image and " + "wants the AI to see and process it. The tool works on Windows, macOS, " + "and Linux." + ) + input_model = ClipboardScreenshotToolInput + + # ------------------------------------------------------------------ + # Public API + # ------------------------------------------------------------------ + + def is_read_only(self, arguments: ClipboardScreenshotToolInput) -> bool: + del arguments + return True + + async def execute( + self, arguments: ClipboardScreenshotToolInput, context: ToolExecutionContext + ) -> ToolResult: + # 1. Read raw PNG bytes from the clipboard + image_bytes = await self._read_clipboard_image() + if image_bytes is None: + return ToolResult( + output=( + "clipboard_screenshot: no image found in clipboard. " + "Please take a screenshot (e.g. Win+Shift+S on Windows, " + "Cmd+Ctrl+Shift+4 on macOS) or copy an image first, then " + "try again." + ), + is_error=True, + ) + + size_kb = len(image_bytes) / 1024 + + # 2. Route by output format + if arguments.output_format == "base64": + b64 = base64.b64encode(image_bytes).decode("ascii") + return ToolResult( + output=( + f"[Clipboard image captured: {len(image_bytes)} bytes " + f"({size_kb:.1f} KB), PNG format]\n" + f"The image is available in the metadata. " + f"Use image_to_text with image_data to analyze it." 
+ ), + metadata={ + "image_data": b64, + "media_type": "image/png", + "size_bytes": len(image_bytes), + }, + ) + + if arguments.output_format == "file": + save_path = Path(arguments.save_path or "clipboard_screenshot.png") + if not save_path.is_absolute(): + save_path = (context.cwd / save_path).resolve() + save_path.parent.mkdir(parents=True, exist_ok=True) + save_path.write_bytes(image_bytes) + return ToolResult( + output=( + f"Saved clipboard image to {save_path} " + f"({len(image_bytes)} bytes, {size_kb:.1f} KB)" + ), + metadata={ + "path": str(save_path), + "size_bytes": len(image_bytes), + }, + ) + + if arguments.output_format == "text": + return await self._describe_via_vision(image_bytes, arguments, context) + + return ToolResult(output="Invalid output_format", is_error=True) + + # ------------------------------------------------------------------ + # Clipboard reading — platform dispatch + # ------------------------------------------------------------------ + + async def _read_clipboard_image(self) -> bytes | None: + """Read an image from the clipboard, returning PNG bytes or None.""" + platform = get_platform() + + if platform == "windows": + return self._read_clipboard_windows() + if platform == "macos": + return self._read_clipboard_macos() + if platform in ("linux", "wsl"): + return self._read_clipboard_linux() + + # Unknown platform — try PIL as a last resort + return self._read_clipboard_pil() + + # ------------------------------------------------------------------ + # Windows: multi-tier fallback + # ------------------------------------------------------------------ + + def _read_clipboard_windows(self) -> bytes | None: + """Windows clipboard reading with automatic fallback. 
+ + Tier 1: Pillow ImageGrab (simplest, synchronous) + Tier 2: PowerShell + System.Windows.Forms (always available on Win10+) + """ + # Tier 1 — Pillow + result = self._read_clipboard_pil() + if result is not None: + log.debug("clipboard_screenshot: captured via Pillow ImageGrab") + return result + + # Tier 2 — PowerShell (Windows PowerShell 5.1) + result = self._read_clipboard_powershell() + if result is not None: + log.debug("clipboard_screenshot: captured via PowerShell") + return result + + return None + + @staticmethod + def _read_clipboard_pil() -> bytes | None: + """Read clipboard image via Pillow ImageGrab (cross-platform).""" + try: + from PIL import ImageGrab # type: ignore[import-untyped] + except ImportError: + return None + + try: + img = ImageGrab.grabclipboard() + except Exception: + log.debug("PIL ImageGrab.grabclipboard() raised", exc_info=True) + return None + + if img is None: + return None + + # Convert to PNG bytes in memory + try: + import io + + buf = io.BytesIO() + # Ensure RGBA → RGB if no alpha needed (PNG handles both) + if img.mode not in ("RGB", "RGBA", "L", "P"): + img = img.convert("RGBA") + img.save(buf, format="PNG") + return buf.getvalue() + except Exception: + log.debug("PIL image save to PNG buffer failed", exc_info=True) + return None + + @staticmethod + def _read_clipboard_powershell() -> bytes | None: + """Read clipboard image via Windows PowerShell subprocess. + + Uses Windows PowerShell 5.1 (powershell.exe), not pwsh.exe, + because System.Windows.Forms and System.Drawing are built-in + on every Windows 10/11 installation. 
+ """ + ps_exe = _find_windows_powershell() + if ps_exe is None: + return None + + tmp_path = None + try: + # Create temp file for PowerShell to write into + fd, tmp_path_str = tempfile.mkstemp(suffix=".png", prefix="oh_clip_") + tmp_path = Path(tmp_path_str) + os_close_fd(fd) + + script = ( + f"Add-Type -AssemblyName System.Windows.Forms;" + f"Add-Type -AssemblyName System.Drawing;" + f"$img = [System.Windows.Forms.Clipboard]::GetImage();" + f'if ($img -ne $null) {{' + f' $img.Save("{tmp_path}", [System.Drawing.Imaging.ImageFormat]::Png);' + f' Write-Output "OK"' + f"}} else {{" + f' Write-Output "NO_IMAGE"' + f"}}" + ) + + result = subprocess.run( + [str(ps_exe), "-NoProfile", "-NonInteractive", "-Command", script], + capture_output=True, + text=True, + timeout=15, + ) + + stdout = result.stdout.strip() if result.stdout else "" + if stdout == "OK" and tmp_path.exists() and tmp_path.stat().st_size > 0: + image_data = tmp_path.read_bytes() + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + tmp_path = None # prevent double-unlink in finally + return image_data + + if stdout == "NO_IMAGE": + log.debug("PowerShell: clipboard contains no image") + else: + log.debug( + "PowerShell unexpected output: stdout=%r stderr=%r", + stdout, + result.stderr[:200] if result.stderr else "", + ) + return None + + except FileNotFoundError: + log.debug("PowerShell executable not found") + return None + except subprocess.TimeoutExpired: + log.debug("PowerShell clipboard read timed out") + return None + except Exception: + log.debug("PowerShell clipboard read failed", exc_info=True) + return None + finally: + if tmp_path is not None: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + + # ------------------------------------------------------------------ + # macOS — osascript + PNG clipboard + # ------------------------------------------------------------------ + + def _read_clipboard_macos(self) -> bytes | None: + """Read clipboard image on macOS. 
+ + Tier 1: Pillow ImageGrab + Tier 2: osascript (AppKit) to extract TIFF → PNG + """ + # Tier 1 — Pillow + result = self._read_clipboard_pil() + if result is not None: + return result + + # Tier 2 — osascript + return self._read_clipboard_macos_osascript() + + @staticmethod + def _read_clipboard_macos_osascript() -> bytes | None: + """Use AppleScript/AppKit to extract clipboard image as PNG.""" + osascript = shutil.which("osascript") + if osascript is None: + return None + + tmp_path = None + try: + fd, tmp_path_str = tempfile.mkstemp(suffix=".png", prefix="oh_clip_") + tmp_path = Path(tmp_path_str) + os_close_fd(fd) + + script = ( + 'use framework "AppKit"\n' + 'use scripting additions\n' + "set pb to current application's NSPasteboard's generalPasteboard()\n" + "set imageData to pb's dataForType:(current application's NSPasteboardTypePNG)\n" + "if imageData is missing value then\n" + ' return "NO_IMAGE"\n' + "end if\n" + f"set filePath to \"{tmp_path}\"\n" + "imageData's writeToFile:filePath atomically:true\n" + 'return "OK"\n' + ) + + result = subprocess.run( + [osascript, "-e", script], + capture_output=True, + text=True, + timeout=10, + ) + + stdout = result.stdout.strip() if result.stdout else "" + if stdout == "OK" and tmp_path.exists() and tmp_path.stat().st_size > 0: + return tmp_path.read_bytes() + + return None + except Exception: + log.debug("macOS osascript clipboard read failed", exc_info=True) + return None + finally: + if tmp_path is not None: + try: + tmp_path.unlink(missing_ok=True) + except Exception: + pass + + # ------------------------------------------------------------------ + # Linux — xclip / wl-paste + # ------------------------------------------------------------------ + + def _read_clipboard_linux(self) -> bytes | None: + """Read clipboard image on Linux / WSL. 
+ + Tier 1: Pillow ImageGrab + Tier 2: xclip (X11) or wl-paste (Wayland) + """ + # Tier 1 — Pillow + result = self._read_clipboard_pil() + if result is not None: + return result + + # Tier 2 — detect display server and use appropriate tool + if _is_wayland(): + return self._read_clipboard_wl_paste() + return self._read_clipboard_xclip() + + @staticmethod + def _read_clipboard_xclip() -> bytes | None: + """Read clipboard image via xclip (X11).""" + xclip = shutil.which("xclip") + if xclip is None: + return None + + try: + # xclip -selection clipboard -t image/png -o + result = subprocess.run( + [xclip, "-selection", "clipboard", "-t", "image/png", "-o"], + capture_output=True, + timeout=10, + ) + if result.returncode == 0 and result.stdout: + return result.stdout + # Try common alternative targets + for target in ("image/jpeg", "image/bmp", "image/gif"): + result = subprocess.run( + [xclip, "-selection", "clipboard", "-t", target, "-o"], + capture_output=True, + timeout=5, + ) + if result.returncode == 0 and result.stdout: + # Convert to PNG via Pillow if available + return _convert_to_png(result.stdout) + return None + except Exception: + log.debug("xclip clipboard read failed", exc_info=True) + return None + + @staticmethod + def _read_clipboard_wl_paste() -> bytes | None: + """Read clipboard image via wl-paste (Wayland).""" + wl_paste = shutil.which("wl-paste") + if wl_paste is None: + return None + + try: + result = subprocess.run( + [wl_paste, "-t", "image/png"], + capture_output=True, + timeout=10, + ) + if result.returncode == 0 and result.stdout: + return result.stdout + return None + except Exception: + log.debug("wl-paste clipboard read failed", exc_info=True) + return None + + # ------------------------------------------------------------------ + # Vision model auto-description (for output_format="text") + # ------------------------------------------------------------------ + + async def _describe_via_vision( + self, + image_bytes: bytes, + arguments: 
async def _describe_via_vision(
    self,
    image_bytes: bytes,
    arguments: ClipboardScreenshotToolInput,
    context: ToolExecutionContext,
) -> ToolResult:
    """Describe the clipboard image using a configured vision model.

    The configuration is read from ``context.metadata["vision_model_config"]``.
    Only when that key is absent (``None``) are the application settings
    consulted; an explicitly empty value disables the fallback.

    Args:
        image_bytes: Encoded image to describe.
        arguments: Tool input; ``description_prompt`` overrides the default prompt.
        context: Execution context carrying the optional vision configuration.

    Returns:
        A ``ToolResult`` with the description, or an error result when no
        usable configuration exists or the model call raises.
    """
    vision_config = context.metadata.get("vision_model_config", None)

    # Fallback: load directly from settings when metadata is not injected
    # (key absent from metadata dict, not just empty value)
    if vision_config is None:
        try:
            from openharness.config.settings import load_settings

            settings_vision = load_settings().vision
            if settings_vision.is_configured:
                vision_config = {
                    "model": settings_vision.model,
                    "api_key": settings_vision.api_key,
                    "base_url": settings_vision.base_url or "",
                }
        except Exception:
            # Still best-effort, but leave a trace instead of failing silently.
            log.debug("clipboard_screenshot: could not load vision settings", exc_info=True)

    # A partially populated config (e.g. no "model") is treated as "not
    # configured" rather than escaping as a KeyError.
    model = vision_config.get("model") if vision_config else None
    if not model:
        return ToolResult(
            output=(
                "clipboard_screenshot: vision model is not configured. "
                "Set vision.model and vision.api_key in your settings, "
                "or use output_format='base64' and pipe the result to "
                "image_to_text manually."
            ),
            is_error=True,
        )

    api_key = vision_config.get("api_key") or ""
    base_url = vision_config.get("base_url") or ""

    prompt = arguments.description_prompt or (
        "Describe this screenshot in detail, including any visible text, "
        "UI elements, error messages, code, diagrams, or data. Be precise "
        "so that a text-only AI can fully understand the content."
    )

    try:
        description = await self._call_vision(
            image_bytes=image_bytes,
            prompt=prompt,
            model=model,
            api_key=api_key,
            base_url=base_url,
        )
    except Exception as exc:
        log.exception("clipboard_screenshot: vision model call failed")
        return ToolResult(
            output=f"clipboard_screenshot: vision model error: {exc}",
            is_error=True,
        )

    return ToolResult(output=f"[Clipboard screenshot description via {model}]\n\n{description}")
+ ) + + try: + description = await self._call_vision( + image_bytes=image_bytes, + prompt=prompt, + model=model, + api_key=api_key, + base_url=base_url, + ) + except Exception as exc: + log.exception("clipboard_screenshot: vision model call failed") + return ToolResult( + output=f"clipboard_screenshot: vision model error: {exc}", + is_error=True, + ) + + return ToolResult(output=f"[Clipboard screenshot description via {model}]\n\n{description}") + + @staticmethod + async def _call_vision( + *, + image_bytes: bytes, + prompt: str, + model: str, + api_key: str, + base_url: str, + ) -> str: + """Call a vision-capable model to describe the image.""" + from openharness.api.client import ApiMessageRequest + from openharness.api.openai_client import OpenAICompatibleClient + from openharness.engine.messages import ( + ConversationMessage, + ImageBlock, + TextBlock, + ) + + b64_data = base64.b64encode(image_bytes).decode("ascii") + + user_content: list[Any] = [TextBlock(text=prompt)] + user_content.append(ImageBlock(media_type="image/png", data=b64_data)) + user_message = ConversationMessage(role="user", content=user_content) + + client = OpenAICompatibleClient(api_key=api_key, base_url=base_url or None) + + collected_text = "" + async for event in client.stream_message( + ApiMessageRequest( + model=model, + messages=[user_message], + system_prompt="", + max_tokens=2048, + tools=[], + ) + ): + from openharness.api.client import ApiMessageCompleteEvent, ApiTextDeltaEvent + + if isinstance(event, ApiTextDeltaEvent): + collected_text += event.text + elif isinstance(event, ApiMessageCompleteEvent): + text = event.message.text + if text and text not in collected_text: + collected_text = text + + return collected_text.strip() or "(no description returned)" + + +# --------------------------------------------------------------------------- +# Module-level helpers +# --------------------------------------------------------------------------- + + +def _find_windows_powershell() -> 
Path | None: + """Resolve the path to Windows PowerShell 5.1 (powershell.exe).""" + # Try the canonical system path first + candidates = [ + Path(r"C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe"), + ] + for candidate in candidates: + if candidate.is_file(): + return candidate + + # Fall back to PATH lookup + found = shutil.which("powershell.exe") + if found: + return Path(found) + return None + + +def _is_wayland() -> bool: + """Return True when the active display server is Wayland.""" + return os.environ.get("WAYLAND_DISPLAY", "") != "" or os.environ.get("XDG_SESSION_TYPE", "") == "wayland" + + +def _convert_to_png(raw: bytes) -> bytes | None: + """Convert raw image bytes to PNG using Pillow (if available).""" + try: + from PIL import Image # type: ignore[import-untyped] + import io + + img = Image.open(io.BytesIO(raw)) + buf = io.BytesIO() + img.save(buf, format="PNG") + return buf.getvalue() + except Exception: + return None + + +def os_close_fd(fd: int) -> None: + """Close an open file descriptor; best-effort.""" + import os as _os + + try: + _os.close(fd) + except OSError: + pass \ No newline at end of file diff --git a/src/openharness/ui/app.py b/src/openharness/ui/app.py index c89dc04c9..69b0c8f1c 100644 --- a/src/openharness/ui/app.py +++ b/src/openharness/ui/app.py @@ -53,6 +53,7 @@ async def run_repl( restore_messages: list[dict] | None = None, restore_tool_metadata: dict[str, object] | None = None, permission_mode: str | None = None, + show_thinking: bool | None = None, ) -> None: """Run the default OpenHarness interactive application (React TUI).""" if backend_only: @@ -101,6 +102,7 @@ async def run_task_worker( api_format: str | None = None, api_client: SupportsStreamingMessages | None = None, permission_mode: str | None = None, + show_thinking: bool = False, ) -> None: """Run a stdin-driven headless worker for background agent tasks. 
@@ -119,11 +121,14 @@ async def _print_system(message: str) -> None: print(message, flush=True) async def _render_event(event: StreamEvent) -> None: - from openharness.engine.stream_events import AssistantTextDelta, AssistantTurnComplete, ErrorEvent, StatusEvent + from openharness.engine.stream_events import AssistantTextDelta, AssistantThinkingDelta, AssistantTurnComplete, ErrorEvent, StatusEvent if isinstance(event, AssistantTextDelta): sys.stdout.write(event.text) sys.stdout.flush() + elif isinstance(event, AssistantThinkingDelta): + sys.stdout.write(event.text) + sys.stdout.flush() elif isinstance(event, AssistantTurnComplete): sys.stdout.write("\n") sys.stdout.flush() @@ -149,6 +154,7 @@ async def _clear_output() -> None: ask_user_prompt=_noop_ask, enforce_max_turns=max_turns is not None, permission_mode=permission_mode, + show_thinking=show_thinking, ) await start_runtime(bundle) try: @@ -189,10 +195,12 @@ async def run_print_mode( api_client: SupportsStreamingMessages | None = None, permission_mode: str | None = None, max_turns: int | None = None, + show_thinking: bool | None = None, ) -> None: """Non-interactive mode: submit prompt, stream output, exit.""" from openharness.engine.stream_events import ( AssistantTextDelta, + AssistantThinkingDelta, AssistantTurnComplete, CompactProgressEvent, ErrorEvent, @@ -221,6 +229,7 @@ async def _noop_ask(question: str) -> str: api_client=api_client, permission_prompt=_noop_permission, ask_user_prompt=_noop_ask, + show_thinking=show_thinking, ) await start_runtime(bundle) @@ -248,6 +257,15 @@ async def _render_event(event: StreamEvent) -> None: obj = {"type": "assistant_delta", "text": event.text} print(json.dumps(obj), flush=True) events_list.append(obj) + elif isinstance(event, AssistantThinkingDelta): + collected_text += event.text + if output_format == "text": + sys.stderr.write(event.text) + sys.stderr.flush() + elif output_format == "stream-json": + obj = {"type": "thinking_delta", "text": event.text} + 
print(json.dumps(obj), flush=True) + events_list.append(obj) elif isinstance(event, AssistantTurnComplete): if output_format == "text": sys.stdout.write("\n") diff --git a/src/openharness/ui/backend_host.py b/src/openharness/ui/backend_host.py index c8c20dcd5..6d685078c 100644 --- a/src/openharness/ui/backend_host.py +++ b/src/openharness/ui/backend_host.py @@ -23,6 +23,7 @@ from openharness.themes import list_themes from openharness.engine.stream_events import ( AssistantTextDelta, + AssistantThinkingDelta, AssistantTurnComplete, CompactProgressEvent, ErrorEvent, @@ -272,6 +273,10 @@ async def _print_system(message: str) -> None: ) async def _render_event(event: StreamEvent) -> None: + if isinstance(event, AssistantThinkingDelta): + print(f"[DEBUG] Sending thinking_delta: {event.text[:50]}...", file=sys.stderr) + await self._emit(BackendEvent(type="thinking_delta", message=event.text)) + return if isinstance(event, AssistantTextDelta): await self._emit(BackendEvent(type="assistant_delta", message=event.text)) return diff --git a/src/openharness/ui/output.py b/src/openharness/ui/output.py index e79192c4a..8dd71b4b0 100644 --- a/src/openharness/ui/output.py +++ b/src/openharness/ui/output.py @@ -9,6 +9,7 @@ from openharness.engine.stream_events import ( AssistantTextDelta, + AssistantThinkingDelta, AssistantTurnComplete, CompactProgressEvent, StreamEvent, @@ -27,6 +28,7 @@ def __init__(self, style_name: str = "default") -> None: self._style_name = style_name self._spinner_status = None self._last_tool_input: dict | None = None + self._thinking_active = False def set_style(self, style_name: str) -> None: self._style_name = style_name @@ -44,6 +46,7 @@ def show_thinking(self) -> None: def start_assistant_turn(self) -> None: self._stop_spinner() # Stop the thinking spinner when output starts + self._thinking_active = False if self._assistant_line_open: self.console.print() self._assistant_buffer = "" @@ -55,11 +58,27 @@ def start_assistant_turn(self) -> None: def 
render_event(self, event: StreamEvent) -> None: if isinstance(event, AssistantTextDelta): + # If we were showing thinking, add blank line separator before normal text + if self._thinking_active: + self._thinking_active = False + self.console.print() # End thinking line + self.console.print() # Blank line separator self._assistant_buffer += event.text # Stream raw text for responsiveness self.console.print(event.text, end="", markup=False, highlight=False) return + if isinstance(event, AssistantThinkingDelta): + if not self._thinking_active: + self._thinking_active = True + # First thinking event: print prefix with newline before + if self._style_name != "minimal": + self.console.print("\n[dim]Think: [/dim]", end="") + else: + self.console.print("Think: ", end="", style="dim") + self.console.print(event.text, end="", markup=False, highlight=False, style="dim") + return + if isinstance(event, AssistantTurnComplete): if self._assistant_line_open: self.console.print() diff --git a/src/openharness/ui/protocol.py b/src/openharness/ui/protocol.py index 780bafa62..46efdf3fc 100644 --- a/src/openharness/ui/protocol.py +++ b/src/openharness/ui/protocol.py @@ -60,7 +60,7 @@ class FrontendRequest(BaseModel): class TranscriptItem(BaseModel): """One transcript row rendered by the frontend.""" - role: Literal["system", "user", "assistant", "tool", "tool_result", "log"] + role: Literal["system", "user", "assistant", "thinking", "tool", "tool_result", "log"] text: str tool_name: str | None = None tool_input: dict[str, Any] | None = None @@ -96,6 +96,7 @@ class BackendEvent(BaseModel): "tasks_snapshot", "transcript_item", "compact_progress", + "thinking_delta", "assistant_delta", "assistant_complete", "line_complete", diff --git a/src/openharness/ui/runtime.py b/src/openharness/ui/runtime.py index feda65920..45684283c 100644 --- a/src/openharness/ui/runtime.py +++ b/src/openharness/ui/runtime.py @@ -297,6 +297,7 @@ async def build_runtime( memory_backend: MemoryCommandBackend | None 
= None, include_project_memory: bool = True, autodream_context: dict[str, object] | None = None, + show_thinking: bool | None = None, ) -> RuntimeBundle: """Build the shared runtime for an OpenHarness session.""" settings_overrides: dict[str, Any] = { @@ -309,6 +310,7 @@ async def build_runtime( "api_format": api_format, "active_profile": active_profile, "permission_mode": permission_mode, + "show_thinking": show_thinking, } settings = load_settings().merge_cli_overrides(**settings_overrides) cwd = str(Path(cwd).expanduser().resolve()) if cwd else str(Path.cwd()) diff --git a/src/openharness/ui/textual_app.py b/src/openharness/ui/textual_app.py index a0635d2a5..82b8a8261 100644 --- a/src/openharness/ui/textual_app.py +++ b/src/openharness/ui/textual_app.py @@ -19,6 +19,7 @@ from openharness.coordinator.coordinator_mode import is_coordinator_mode from openharness.engine.stream_events import ( AssistantTextDelta, + AssistantThinkingDelta, AssistantTurnComplete, CompactProgressEvent, ErrorEvent, @@ -325,6 +326,10 @@ async def _print_system(self, message: str) -> None: self._set_current_response("Ready.") async def _render_event(self, event: StreamEvent) -> None: + if isinstance(event, AssistantThinkingDelta): + self._assistant_buffer += event.text + self._set_current_response(f"[dim]\u2500\u2500 Thinking \u2500\u2500[/dim] {self._assistant_buffer}") + return if isinstance(event, AssistantTextDelta): self._assistant_buffer += event.text self._set_current_response(f"[bold]assistant>[/bold] {self._assistant_buffer}") diff --git a/tests/test_tools/test_clipboard_screenshot_tool.py b/tests/test_tools/test_clipboard_screenshot_tool.py new file mode 100644 index 000000000..da5895413 --- /dev/null +++ b/tests/test_tools/test_clipboard_screenshot_tool.py @@ -0,0 +1,421 @@ +"""Tests for the clipboard_screenshot tool.""" + +from __future__ import annotations + +import base64 +import platform +from pathlib import Path +from unittest import mock + +import pytest + +from 
openharness.tools.base import ToolExecutionContext +from openharness.tools.clipboard_screenshot_tool import ( + ClipboardScreenshotTool, + ClipboardScreenshotToolInput, +) + +_is_win = platform.system() == "Windows" +_is_mac = platform.system() == "Darwin" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _fake_png_bytes() -> bytes: + """Return valid minimal PNG bytes for testing. Skip if Pillow not installed.""" + try: + from PIL import Image as _Image # type: ignore[import-untyped] + import io as _io + + buf = _io.BytesIO() + img = _Image.new("RGB", (10, 10), color="red") + img.save(buf, format="PNG") + return buf.getvalue() + except ImportError: + pytest.skip("Pillow not installed") + + +def _make_ctx(cwd: Path | None = None) -> ToolExecutionContext: + return ToolExecutionContext(cwd=cwd or Path.cwd()) + + +# --------------------------------------------------------------------------- +# Basic tool properties +# --------------------------------------------------------------------------- + + +def test_tool_name_and_description(): + tool = ClipboardScreenshotTool() + assert tool.name == "clipboard_screenshot" + assert "system clipboard" in tool.description.lower() + + +def test_input_model_is_pydantic(): + inp = ClipboardScreenshotToolInput() + assert inp.output_format == "base64" + assert inp.save_path is None + assert inp.description_prompt is None + + +def test_is_read_only(): + tool = ClipboardScreenshotTool() + assert tool.is_read_only(ClipboardScreenshotToolInput()) is True + + +def test_api_schema_generation(): + tool = ClipboardScreenshotTool() + schema = tool.to_api_schema() + assert schema["name"] == "clipboard_screenshot" + assert "input_schema" in schema + + +# --------------------------------------------------------------------------- +# output_format = "base64" +# 
@pytest.mark.asyncio
async def test_base64_output(tmp_path: Path):
    """base64 mode reports size and media type and round-trips the image bytes."""
    image = _fake_png_bytes()
    tool = ClipboardScreenshotTool()
    request = ClipboardScreenshotToolInput(output_format="base64")

    with mock.patch.object(tool, "_read_clipboard_image", return_value=image):
        result = await tool.execute(request, _make_ctx(tmp_path))

    assert not result.is_error
    assert "Clipboard image captured" in result.output
    assert result.metadata["size_bytes"] == len(image)
    assert result.metadata["media_type"] == "image/png"
    # The payload must decode back to exactly the clipboard bytes.
    assert base64.b64decode(result.metadata["image_data"]) == image


@pytest.mark.asyncio
async def test_file_output_default_path(tmp_path: Path):
    """file mode without ``save_path`` writes ``clipboard_screenshot.png``."""
    image = _fake_png_bytes()
    tool = ClipboardScreenshotTool()
    request = ClipboardScreenshotToolInput(output_format="file")

    with mock.patch.object(tool, "_read_clipboard_image", return_value=image):
        result = await tool.execute(request, _make_ctx(tmp_path))

    assert not result.is_error
    assert "Saved clipboard image" in result.output
    saved = Path(result.metadata["path"])
    assert saved.name == "clipboard_screenshot.png"
    assert saved.read_bytes() == image


@pytest.mark.asyncio
async def test_file_output_custom_path(tmp_path: Path):
    """file mode honours an explicit ``save_path`` inside a not-yet-existing directory."""
    image = _fake_png_bytes()
    destination = tmp_path / "sub" / "my_screenshot.png"
    tool = ClipboardScreenshotTool()
    request = ClipboardScreenshotToolInput(
        output_format="file",
        save_path=str(destination),
    )

    with mock.patch.object(tool, "_read_clipboard_image", return_value=image):
        result = await tool.execute(request, _make_ctx(tmp_path))

    assert not result.is_error
    assert destination.read_bytes() == image
@pytest.mark.asyncio
async def test_text_output_no_vision_config():
    """text mode fails with a 'vision model' error when the config is empty."""
    image = _fake_png_bytes()
    tool = ClipboardScreenshotTool()
    context = _make_ctx()
    # Key present but empty: the tool must not fall back to settings.
    context.metadata["vision_model_config"] = {}
    request = ClipboardScreenshotToolInput(output_format="text")

    with mock.patch.object(tool, "_read_clipboard_image", return_value=image):
        result = await tool.execute(request, context)

    assert result.is_error
    assert "vision model" in result.output.lower()


@pytest.mark.asyncio
async def test_text_output_with_vision_config():
    """text mode returns the description produced by the (mocked) vision call."""
    expected = "A screenshot of a terminal window showing a Python traceback."
    image = _fake_png_bytes()
    tool = ClipboardScreenshotTool()
    context = _make_ctx()
    context.metadata["vision_model_config"] = {
        "model": "gpt-4o",
        "api_key": "sk-fake",
        "base_url": "",
    }
    request = ClipboardScreenshotToolInput(output_format="text")

    with (
        mock.patch.object(tool, "_read_clipboard_image", return_value=image),
        mock.patch.object(tool, "_call_vision", return_value=expected),
    ):
        result = await tool.execute(request, context)

    assert not result.is_error
    assert expected in result.output
@pytest.mark.asyncio
async def test_no_image_in_clipboard():
    """An empty clipboard yields an error result mentioning 'no image'."""
    tool = ClipboardScreenshotTool()

    with mock.patch.object(tool, "_read_clipboard_image", return_value=None):
        result = await tool.execute(ClipboardScreenshotToolInput(), _make_ctx())

    assert result.is_error
    assert "no image" in result.output.lower()


# ---------------------------------------------------------------------------
# _read_clipboard_pil: unit tests
# ---------------------------------------------------------------------------


def test_read_clipboard_pil_no_pillow():
    """When Pillow is not installed, return None."""
    # A ``None`` entry in sys.modules makes ``import PIL`` raise ImportError.
    with mock.patch.dict("sys.modules", {"PIL": None}):
        result = ClipboardScreenshotTool._read_clipboard_pil()
    assert result is None


def test_read_clipboard_pil_no_image():
    """When clipboard has no image, ImageGrab returns None."""
    # Patching PIL.ImageGrab needs the real module; skip cleanly instead of
    # erroring with ModuleNotFoundError when Pillow is absent.
    pytest.importorskip("PIL.ImageGrab")

    with mock.patch("PIL.ImageGrab.grabclipboard", return_value=None):
        result = ClipboardScreenshotTool._read_clipboard_pil()
    assert result is None


def test_read_clipboard_pil_with_image():
    """When clipboard has an image, return PNG bytes."""
    pytest.importorskip("PIL.ImageGrab")
    fake_png = _fake_png_bytes()

    mock_img = mock.MagicMock()
    mock_img.mode = "RGBA"

    def _mock_save(buf, format=None):
        # Stand in for Image.save(): emit the canned PNG into the buffer.
        buf.write(fake_png)

    mock_img.save = _mock_save

    with mock.patch("PIL.ImageGrab.grabclipboard", return_value=mock_img):
        result = ClipboardScreenshotTool._read_clipboard_pil()

    assert result is not None
    assert result == fake_png
# Dotted path of the PowerShell locator patched by every test below.
_FIND_POWERSHELL = "openharness.tools.clipboard_screenshot_tool._find_windows_powershell"


@pytest.mark.skipif(not _is_win, reason="PowerShell clipboard is Windows-only")
def test_powershell_no_powershell_exe():
    """When powershell.exe is not found, return None."""
    with mock.patch(_FIND_POWERSHELL, return_value=None):
        outcome = ClipboardScreenshotTool._read_clipboard_powershell()
    assert outcome is None


@pytest.mark.skipif(not _is_win, reason="PowerShell clipboard is Windows-only")
def test_powershell_no_image_in_clipboard():
    """When clipboard is empty, PowerShell outputs NO_IMAGE."""
    powershell = Path(r"C:\fake\powershell.exe")
    completed = mock.MagicMock(stdout="NO_IMAGE", stderr="", returncode=0)

    with (
        mock.patch(_FIND_POWERSHELL, return_value=powershell),
        mock.patch("subprocess.run") as run_mock,
    ):
        run_mock.return_value = completed
        outcome = ClipboardScreenshotTool._read_clipboard_powershell()
    assert outcome is None


@pytest.mark.skipif(not _is_win, reason="PowerShell clipboard is Windows-only")
def test_powershell_image_found(tmp_path: Path):
    """When clipboard has an image, PowerShell saves it and we read it."""
    import subprocess as _subprocess

    image = _fake_png_bytes()
    # Pre-create the file the mocked mkstemp points at, as if PowerShell wrote it.
    clip_file = tmp_path / "test_clip.png"
    clip_file.write_bytes(image)

    powershell = Path(r"C:\fake\powershell.exe")
    completed = _subprocess.CompletedProcess(
        args=[], returncode=0, stdout="OK", stderr=""
    )

    with (
        mock.patch(_FIND_POWERSHELL, return_value=powershell),
        mock.patch("tempfile.mkstemp", return_value=(999, str(clip_file))),
        mock.patch("os.close"),
        mock.patch("subprocess.run", return_value=completed),
    ):
        outcome = ClipboardScreenshotTool._read_clipboard_powershell()

    assert outcome == image
@pytest.mark.skipif(not _is_mac, reason="osascript clipboard is macOS-only")
def test_macos_osascript_not_found():
    """Without an osascript binary the reader returns None."""
    with mock.patch("shutil.which", return_value=None):
        result = ClipboardScreenshotTool._read_clipboard_macos_osascript()
    assert result is None


@pytest.mark.skipif(not _is_mac, reason="osascript clipboard is macOS-only")
def test_macos_osascript_no_image(tmp_path: Path):
    """When the script reports NO_IMAGE the reader returns None and cleans up."""
    # Use a per-test file rather than a fixed /tmp path the reader would unlink.
    fake_tmp = tmp_path / "oh_test.png"
    fake_tmp.touch()

    with (
        mock.patch("shutil.which", return_value="/usr/bin/osascript"),
        mock.patch("subprocess.run") as mock_run,
        mock.patch("tempfile.mkstemp", return_value=(888, str(fake_tmp))),
        # fd 888 is fictitious; never let the reader close a real descriptor.
        mock.patch("os.close"),
    ):
        mock_run.return_value = mock.MagicMock(
            stdout="NO_IMAGE", stderr="", returncode=0
        )
        result = ClipboardScreenshotTool._read_clipboard_macos_osascript()

    assert result is None
    assert not fake_tmp.exists()


# ---------------------------------------------------------------------------
# Linux xclip / wl-paste: unit tests
# ---------------------------------------------------------------------------


def test_xclip_not_found():
    """Without an xclip binary the reader returns None."""
    with mock.patch("shutil.which", return_value=None):
        result = ClipboardScreenshotTool._read_clipboard_xclip()
    assert result is None


def test_xclip_image_found():
    """PNG bytes printed by xclip are returned unchanged."""
    png = _fake_png_bytes()

    with (
        mock.patch("shutil.which", return_value="/usr/bin/xclip"),
        mock.patch("subprocess.run") as mock_run,
    ):
        mock_run.return_value = mock.MagicMock(returncode=0, stdout=png)
        result = ClipboardScreenshotTool._read_clipboard_xclip()
    assert result == png


def test_wl_paste_not_found():
    """Without a wl-paste binary the reader returns None."""
    with mock.patch("shutil.which", return_value=None):
        result = ClipboardScreenshotTool._read_clipboard_wl_paste()
    assert result is None
def test_wl_paste_image_found():
    """PNG bytes printed by wl-paste are returned unchanged."""
    image = _fake_png_bytes()
    completed = mock.MagicMock(returncode=0, stdout=image)

    with (
        mock.patch("shutil.which", return_value="/usr/bin/wl-paste"),
        mock.patch("subprocess.run") as run_mock,
    ):
        run_mock.return_value = completed
        outcome = ClipboardScreenshotTool._read_clipboard_wl_paste()
    assert outcome == image


# ---------------------------------------------------------------------------
# Tool registry integration
# ---------------------------------------------------------------------------


def test_registry_includes_clipboard_screenshot():
    """The default registry exposes the tool under its public name."""
    from openharness.tools import create_default_tool_registry

    registered = create_default_tool_registry().get("clipboard_screenshot")
    assert registered is not None
    assert registered.name == "clipboard_screenshot"