Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 29 additions & 58 deletions src/bub/builtin/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ def tapes(self) -> TapeService:
return TapeService(bub.home / "tapes", tape_store, self.framework.build_tape_context())

@staticmethod
def _events_from_iterable(iterable: Iterable) -> AsyncStreamEvents:
async def generator() -> AsyncIterator:
def _events_from_iterable(iterable: Iterable[StreamEvent]) -> AsyncStreamEvents:
async def generator() -> AsyncIterator[StreamEvent]:
for item in iterable:
yield item

Expand Down Expand Up @@ -181,7 +181,6 @@ async def _agent_loop(
allowed_skills: Collection[str] | None = None,
allowed_tools: Collection[str] | None = None,
) -> AsyncStreamEvents:
next_prompt: str | list[dict] = prompt
display_model = model or self.settings.model
await self.tapes.append_event(
tape.name,
Expand All @@ -196,7 +195,7 @@ async def _agent_loop(
state = StreamState()
iterator = self._stream_events_with_auto_handoff(
tape=tape,
prompt=next_prompt,
prompt=prompt,
state=state,
model=model,
allowed_skills=allowed_skills,
Expand Down Expand Up @@ -232,23 +231,13 @@ async def _stream_events_with_auto_handoff(
async for event in output:
yield event
if event.kind == "error":
elapsed_ms = int((time.monotonic() - start) * 1000)
await self.tapes.append_event(
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "error",
"error": event.data.get("message", ""),
"date": datetime.now(UTC).isoformat(),
},
await self._append_loop_step(
tape, step=step, start=start, status="error", error=event.data.get("message", "")
)
elif event.kind == "final":
should_continue = bool(event.data.get("tool_calls") or event.data.get("tool_results"))
except Exception as exc:
error_message = f"{exc!s}"
elapsed_ms = int((time.monotonic() - start) * 1000)
if auto_handoff_remaining > 0 and _is_context_length_error(error_message):
auto_handoff_remaining -= 1
logger.warning(
Expand All @@ -261,63 +250,45 @@ async def _stream_events_with_auto_handoff(
name="auto_handoff/context_overflow",
state={"reason": "context_length_exceeded", "error": error_message},
)
await self.tapes.append_event(
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "auto_handoff",
"error": error_message,
"date": datetime.now(UTC).isoformat(),
},
await self._append_loop_step(
tape, step=step, start=start, status="auto_handoff", error=error_message
)
next_prompt = prompt
continue

await self.tapes.append_event(
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "error",
"error": error_message,
"date": datetime.now(UTC).isoformat(),
},
)
await self._append_loop_step(tape, step=step, start=start, status="error", error=error_message)
raise

state.error = output.error
state.usage = output.usage
elapsed_ms = int((time.monotonic() - start) * 1000)
if not should_continue:
await self.tapes.append_event(
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "ok",
"date": datetime.now(UTC).isoformat(),
},
)
await self._append_loop_step(tape, step=step, start=start, status="ok")
return

next_prompt = self._continue_prompt(tape)
await self.tapes.append_event(
tape.name,
"loop.step",
{
"step": step,
"elapsed_ms": elapsed_ms,
"status": "continue",
"date": datetime.now(UTC).isoformat(),
},
)
await self._append_loop_step(tape, step=step, start=start, status="continue")

raise RuntimeError(f"max_steps_reached={self.settings.max_steps}")

async def _append_loop_step(
self,
tape: Tape,
*,
step: int,
start: float,
status: str,
error: str | None = None,
) -> None:
payload: dict[str, Any] = {
"step": step,
"elapsed_ms": int((time.monotonic() - start) * 1000),
"status": status,
"date": datetime.now(UTC).isoformat(),
}
if error is not None:
payload["error"] = error
await self.tapes.append_event(tape.name, "loop.step", payload)

def _load_skills_prompt(self, prompt: str, workspace: Path, allowed_skills: set[str] | None = None) -> str:
skill_index = {
skill.name.casefold(): skill
Expand Down
Loading