Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 205 additions & 0 deletions docs/designs/class-based-sdk/00_core.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
"""The base classes this folder assumes. (POC, does not run.)

The numbered files 01-08 write `class HotelAgent(ag.Application)`, `class
RubricJudge(ag.Evaluator)`, etc. This file is where those bases come from —
implemented CLASS-FIRST, as the native foundation of the class-only proposal.

Compare the three `00_*` foundations across the peer folders:

./00_core.py bases as native classes (this file)
../functional-based-class-sdk/00_core.py the SAME bases on the functional core
../function-based-sdk/00_core.py the function form's foundation (no
base classes — decorators + closures)

In this native version each base registers directly: its `__init_subclass__`
reads the inner Parameters/Inputs/Outputs models and the run/evaluate method off
the subclass and calls the engine. There is no decorator/function layer beneath
— the class is the registration path.

Grounded in the real SDK (sdks/python/agenta/sdk/):
- `decorators/running.py` has the `Workflow` handle and the `workflow`
registrar this delegates to.
- `models/workflows.py::WorkflowFlags` encodes kind as flags: `is_application`,
`is_evaluator`, `has_handler`. Application and Evaluator are the same workflow
with a different flag; Configuration has `has_handler=False`.

`Testset` is intentionally NOT a workflow (no parameters, no handler, no schemas
triple, absent from WorkflowFlags). It gets its own base at the bottom.
"""

from __future__ import annotations

from typing import Any, Optional

from agenta import workflow as _register # the existing `workflow` registrar
from agenta.sdk.models.workflows import WorkflowFlags


# =========================================================================
# Shared base. `__init_subclass__` compiles the inner models + handler method
# into a registered workflow. Subclasses override `_flags()` (kind) and set
# `_handler_name` (which method is the handler — None for config-only).
# =========================================================================


class _WorkflowClass:
    """Shared base that registers each slugged subclass as a workflow.

    Defining a subclass with a truthy `slug` runs `__init_subclass__`, which
    compiles the inner `Parameters`/`Inputs`/`Outputs` models into schemas,
    calls the `workflow` registrar, and stores the result on `cls._handle`.
    Subclasses without a `slug` (the kind bases) are left unregistered.
    """

    # Name of the method used as the handler; None means config-only.
    _handler_name: Optional[str] = "run"

    def __init_subclass__(cls, **kwargs):
        """Register `cls` with the engine when it declares a `slug`.

        Raises:
            TypeError: if the kind names a handler method that `cls` does not
                define.
        """
        super().__init_subclass__(**kwargs)
        if not getattr(cls, "slug", None):
            return  # a base (Application/Evaluator/...), not a user workflow

        handler_name = cls._handler_name
        method = getattr(cls, handler_name, None) if handler_name else None
        if handler_name and method is None:
            # Without this check the class would fall through to the
            # config-only branch while its kind still expects a handler.
            raise TypeError(
                f"{cls.__name__} must define `{handler_name}()` to be registered"
            )

        schemas = _compile_schemas(
            getattr(cls, "Parameters", None),
            getattr(cls, "Inputs", None),
            getattr(cls, "Outputs", None),
        )

        registrar = _register(
            slug=cls.slug,
            name=getattr(cls, "name", None),
            description=getattr(cls, "description", None),
            flags=cls._flags().model_dump(),
            schemas=schemas,
        )

        if method is None:
            cls._handle = registrar  # config-only: schemas, no handler
            return

        # `__init__` binds `type(self)._handle`, so the handle must exist
        # before the resource-holding instance is built. The handler looks up
        # `instance` only when it is invoked, after the assignment below.
        instance = None

        async def handler(**kw):
            return await method(instance, **kw)

        cls._handle = registrar(handler)
        instance = cls()  # holds __init__ resources (clients, indexes)

    @classmethod
    def _flags(cls) -> WorkflowFlags:  # overridden per kind
        """Return the flags describing this workflow kind."""
        raise NotImplementedError

    # Authoring/lifecycle surface — same names the examples call.
    def __init__(self, *, parameters: dict | None = None):
        """Bind this instance to the class handle, pinned when parameters are given.

        Args:
            parameters: values passed to `pin(**parameters)`; when None or
                empty the unpinned class handle is used as-is.
        """
        self._bound = (
            type(self)._handle.pin(**parameters) if parameters else type(self)._handle
        )

    async def __call__(self, **kw):
        """Await the bound handle with the given keyword arguments."""
        return await self._bound(**kw)

    def __getattr__(self, item):
        # Only reached when normal lookup fails, so names defined on the class
        # (including the classmethods below) never arrive here. Everything
        # else, e.g. `invoke`, is forwarded to the bound handle.
        # `object.__getattribute__` avoids recursing into this method when
        # `_bound` itself has not been set yet.
        return getattr(object.__getattribute__(self, "_bound"), item)

    @classmethod
    async def apush(cls):
        """Await `push()` on the class handle."""
        return await cls._handle.push()

    @classmethod
    async def adeploy(cls, **k):
        """Await `deploy(**k)` on the class handle."""
        return await cls._handle.deploy(**k)

    @classmethod
    async def inspect(cls):
        """Await `inspect()` on the class handle."""
        return await cls._handle.inspect()

    @classmethod
    def router(cls, *a, **k):
        """Return `router(*a, **k)` from the class handle."""
        return cls._handle.router(*a, **k)

    @classmethod
    async def afetch_parameters(cls, **k):
        """Await `fetch_parameters(**k)` on the class handle."""
        return await cls._handle.fetch_parameters(**k)

    @classmethod
    async def afrom_registry(cls, **k):
        """Await `from_registry(**k)` on the class handle."""
        return await cls._handle.from_registry(**k)


def _compile_schemas(parameters, inputs, outputs) -> dict:
return {
"parameters": parameters.model_json_schema() if parameters else None,
"inputs": inputs.model_json_schema() if inputs else None,
"outputs": outputs.model_json_schema() if outputs else None,
}


# =========================================================================
# The kinds. Each is two lines: which handler method, which flag.
# =========================================================================


class Application(_WorkflowClass):
    """Workflow kind whose handler is the subclass's `run` method."""

    _handler_name = "run"

    @classmethod
    def _flags(cls):
        """Return flags marking an application that has a handler."""
        kind = WorkflowFlags(has_handler=True, is_application=True)
        return kind


class Evaluator(_WorkflowClass):
    """Workflow kind whose handler is the subclass's `evaluate` method."""

    _handler_name = "evaluate"

    @classmethod
    def _flags(cls):
        """Return flags marking an evaluator that has a handler."""
        kind = WorkflowFlags(has_handler=True, is_evaluator=True)
        return kind


class Configuration(_WorkflowClass):
    """Config-only workflow kind: no handler method is attached."""

    _handler_name = None  # config-only: no runnable

    @classmethod
    def _flags(cls):
        """Return flags marking an application without a handler."""
        kind = WorkflowFlags(has_handler=False, is_application=True)
        return kind

    @classmethod
    async def afetch(cls, **k):
        """Await `fetch(**k)` on the class handle."""
        handle = cls._handle
        return await handle.fetch(**k)

    @classmethod
    def fetch(cls, **k):
        """Return `fetch_sync(**k)` from the class handle."""
        handle = cls._handle
        return handle.fetch_sync(**k)


# =========================================================================
# NOT a workflow. Testset is a typed row collection — inner `Case` + seed
# `cases`, no parameters/handler/schemas-triple, absent from WorkflowFlags. Its
# own base, so the hierarchy matches the data model.
# =========================================================================


class Testset:
    """Base for typed row collections; registers each slugged subclass.

    A subclass with a truthy `slug` is passed to `_register_testset` together
    with its `name`, inner `Case`, and optional seed `cases`; the result is
    kept on `cls._handle` and used by the classmethods below.
    """

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        slug = getattr(cls, "slug", None)
        if slug:
            # Only slugged subclasses are registered; intermediate bases
            # without a slug are left untouched.
            cls._handle = _register_testset(
                slug=slug,
                name=getattr(cls, "name", None),
                case=cls.Case,
                cases=getattr(cls, "cases", None),
            )

    @classmethod
    async def apush(cls) -> Any:
        """Await `push()` on the testset handle."""
        handle = cls._handle
        return await handle.push()

    @classmethod
    async def afetch(cls) -> Any:
        """Await `fetch()` on the testset handle."""
        handle = cls._handle
        return await handle.fetch()

    @classmethod
    async def aadd(cls, **k) -> Any:
        """Await `add(**k)` on the testset handle."""
        handle = cls._handle
        return await handle.add(**k)

    @classmethod
    async def afrom_traces(cls, **k) -> Any:
        """Await `from_traces(**k)` on the testset handle."""
        handle = cls._handle
        return await handle.from_traces(**k)


# In the real SDK this is the testset registrar (column schema = Case), a
# different engine path from `workflow`. Stubbed here as the contract.
def _register_testset(*, slug, name, case, cases): ...
136 changes: 136 additions & 0 deletions docs/designs/class-based-sdk/01_application.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""An application as a class. (POC, does not run.)

The class is the workflow:
- `Parameters`, `Inputs`, `Outputs` compile to the revision's JSON schemas.
- `run()` is the handler. It is auto-instrumented: one root span per invocation
with inputs, outputs, cost, and token capture.
- `stream()` is optional. Declare it only if the app can stream.
- `slug`, `name`, `description` map to the workflow identity on the platform.
"""

import asyncio
from typing import AsyncIterator

from openai import AsyncOpenAI
from pydantic import BaseModel, Field

import agenta as ag

from core import Application # 00_core.py — the native class base

ag.Application = Application # what the SDK __init__ would export


class HotelAgent(ag.Application):
    """Example application: a hotel concierge backed by a chat completion.

    The inner `Parameters`, `Inputs`, and `Outputs` models declare the
    workflow's schemas; `run()` is the handler and `stream()` is the optional
    streaming variant.
    """

    slug = "hotel-agent"
    name = "Hotel Agent"
    description = "Concierge agent that answers questions about the hotel."

    class Parameters(BaseModel):
        """Playground-editable configuration. Becomes schemas.parameters."""

        prompt: ag.PromptTemplate = ag.PromptTemplate(
            messages=[
                ag.Message(
                    role="system",
                    content="You are the concierge of {{hotel_name}}. Be brief.",
                ),
                ag.Message(role="user", content="{{message}}"),
            ],
            llm_config=ag.ModelConfig(model="gpt-4o-mini", temperature=0.2),
        )
        hotel_name: str = "Grand Agenta Hotel"
        top_k: int = Field(4, ge=1, le=20, description="Documents to retrieve")

    class Inputs(BaseModel):
        """Runtime inputs. Becomes schemas.inputs and the expected testset columns."""

        message: str
        persona: str = "guest"

    class Outputs(BaseModel):
        """Becomes schemas.outputs. This is what evaluators receive as `outputs`."""

        answer: str
        sources: list[str] = []

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        # Instance state is for request-independent resources only (clients,
        # retrievers, indexes). Parameters arrive per request, not here.
        self.client = AsyncOpenAI()

    def _format_prompt(self, *, inputs: Inputs, parameters: Parameters):
        """Fill the prompt template with the hotel name and the user message."""
        return parameters.prompt.format(
            hotel_name=parameters.hotel_name,
            message=inputs.message,
        )

    async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs:
        """Retrieve documents, ask the model, and return the answer with sources."""
        documents = await self.retrieve(query=inputs.message, top_k=parameters.top_k)

        prompt = self._format_prompt(inputs=inputs, parameters=parameters)
        response = await self.client.chat.completions.create(
            **prompt.to_openai_kwargs()
        )

        # The message content is nullable in the API response while `answer`
        # is a required `str`; coalesce the same way `stream()` does.
        return self.Outputs(
            answer=response.choices[0].message.content or "",
            sources=[doc["id"] for doc in documents],
        )

    # Nested steps use @ag.instrument as today. They show up as child spans
    # under the invocation span.
    @ag.instrument(type="retriever")
    async def retrieve(self, *, query: str, top_k: int) -> list[dict]:
        """Return the retrieved documents (a fixed single-item stub here)."""
        return [{"id": "faq-12", "text": "The pool is open 7am to 10pm."}]

    # Optional. When declared, the server can negotiate streaming responses
    # (SSE / NDJSON) from the Accept header. Batch callers still get run().
    async def stream(
        self, *, inputs: Inputs, parameters: Parameters
    ) -> AsyncIterator[str]:
        """Yield the answer text chunk by chunk from a streaming completion."""
        prompt = self._format_prompt(inputs=inputs, parameters=parameters)
        stream = await self.client.chat.completions.create(
            stream=True, **prompt.to_openai_kwargs()
        )
        async for chunk in stream:
            # A chunk may arrive with no choices (e.g. a usage-only chunk);
            # skip it rather than fail on the index.
            if not chunk.choices:
                continue
            yield chunk.choices[0].delta.content or ""


async def main():
    """Walk through the class surface: call, pin, invoke, inspect, push."""
    ag.init()

    # Local typed call. Traced like any decorated handler today.
    concierge = HotelAgent()
    reply = await concierge(message="Do you have a pool?")
    print(reply.answer, reply.sources)

    # Pin a configuration. These values become revision.data.parameters
    # when the class is pushed.
    pinned_parameters = {"hotel_name": "Hotel California", "top_k": 8}
    boutique = HotelAgent(parameters=pinned_parameters)
    reply = await boutique(message="Can I check out any time I like?")

    # Full pipeline (vault -> resolver -> normalizer), same code path the
    # platform runs. Plain kwargs; the SDK builds the wire request internally.
    # WorkflowInvokeRequest stays as the HTTP body format for /invoke (it also
    # carries references, selector, secrets, credentials), but local callers
    # never construct it.
    invoke_kwargs = {
        "inputs": {"message": "Is breakfast included?"},
        "parameters": {"top_k": 2},
    }
    _response = await concierge.invoke(**invoke_kwargs)

    # Inspect returns the interface. Schemas come straight from the class,
    # no signature inference.
    print((await HotelAgent.inspect()).data.schemas.outputs)

    # Push to Agenta: creates or updates the application and returns the
    # new revision. Schemas and default parameters travel with it.
    revision_id = await HotelAgent.apush()
    print(f"pushed revision {revision_id}")


# Script entry point: run the async walkthrough only when executed directly.
if __name__ == "__main__":
    asyncio.run(main())
Loading
Loading