From b36032beb7c35f52d671c43e49598d78b71756e1 Mon Sep 17 00:00:00 2001 From: Mahmoud Mabrouk Date: Wed, 10 Jun 2026 21:53:23 +0200 Subject: [PATCH 1/2] design(sdk): class-oriented authoring API proposal (POC) --- .../designs/class-based-sdk/01_application.py | 132 ++++++++++ docs/designs/class-based-sdk/02_evaluators.py | 130 ++++++++++ docs/designs/class-based-sdk/03_run_evals.py | 65 +++++ .../class-based-sdk/04_config_registry.py | 83 +++++++ docs/designs/class-based-sdk/05_serve.py | 78 ++++++ .../class-based-sdk/06_framework_adapters.py | 225 ++++++++++++++++++ .../designs/class-based-sdk/07_config_only.py | 89 +++++++ docs/designs/class-based-sdk/08_testsets.py | 96 ++++++++ docs/designs/class-based-sdk/README.md | 64 +++++ 9 files changed, 962 insertions(+) create mode 100644 docs/designs/class-based-sdk/01_application.py create mode 100644 docs/designs/class-based-sdk/02_evaluators.py create mode 100644 docs/designs/class-based-sdk/03_run_evals.py create mode 100644 docs/designs/class-based-sdk/04_config_registry.py create mode 100644 docs/designs/class-based-sdk/05_serve.py create mode 100644 docs/designs/class-based-sdk/06_framework_adapters.py create mode 100644 docs/designs/class-based-sdk/07_config_only.py create mode 100644 docs/designs/class-based-sdk/08_testsets.py create mode 100644 docs/designs/class-based-sdk/README.md diff --git a/docs/designs/class-based-sdk/01_application.py b/docs/designs/class-based-sdk/01_application.py new file mode 100644 index 0000000000..2247577de2 --- /dev/null +++ b/docs/designs/class-based-sdk/01_application.py @@ -0,0 +1,132 @@ +"""An application as a class. (POC, does not run.) + +The class is the workflow: +- `Parameters`, `Inputs`, `Outputs` compile to the revision's JSON schemas. +- `run()` is the handler. It is auto-instrumented: one root span per invocation + with inputs, outputs, cost, and token capture. +- `stream()` is optional. Declare it only if the app can stream. 
+- `slug`, `name`, `description` map to the workflow identity on the platform. +""" + +import asyncio +from typing import AsyncIterator + +from openai import AsyncOpenAI +from pydantic import BaseModel, Field + +import agenta as ag + + +class HotelAgent(ag.Application): + slug = "hotel-agent" + name = "Hotel Agent" + description = "Concierge agent that answers questions about the hotel." + + class Parameters(BaseModel): + """Playground-editable configuration. Becomes schemas.parameters.""" + + prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ + ag.Message( + role="system", + content="You are the concierge of {{hotel_name}}. Be brief.", + ), + ag.Message(role="user", content="{{message}}"), + ], + llm_config=ag.ModelConfig(model="gpt-4o-mini", temperature=0.2), + ) + hotel_name: str = "Grand Agenta Hotel" + top_k: int = Field(4, ge=1, le=20, description="Documents to retrieve") + + class Inputs(BaseModel): + """Runtime inputs. Becomes schemas.inputs and the expected testset columns.""" + + message: str + persona: str = "guest" + + class Outputs(BaseModel): + """Becomes schemas.outputs. This is what evaluators receive as `outputs`.""" + + answer: str + sources: list[str] = [] + + def __init__(self, **kwargs): + super().__init__(**kwargs) + # Instance state is for request-independent resources only (clients, + # retrievers, indexes). Parameters arrive per request, not here. + self.client = AsyncOpenAI() + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + documents = await self.retrieve(query=inputs.message, top_k=parameters.top_k) + + prompt = parameters.prompt.format( + hotel_name=parameters.hotel_name, + message=inputs.message, + ) + response = await self.client.chat.completions.create( + **prompt.to_openai_kwargs() + ) + + return self.Outputs( + answer=response.choices[0].message.content, + sources=[doc["id"] for doc in documents], + ) + + # Nested steps use @ag.instrument as today. 
They show up as child spans + # under the invocation span. + @ag.instrument(type="retriever") + async def retrieve(self, *, query: str, top_k: int) -> list[dict]: + return [{"id": "faq-12", "text": "The pool is open 7am to 10pm."}] + + # Optional. When declared, the server can negotiate streaming responses + # (SSE / NDJSON) from the Accept header. Batch callers still get run(). + async def stream( + self, *, inputs: Inputs, parameters: Parameters + ) -> AsyncIterator[str]: + prompt = parameters.prompt.format( + hotel_name=parameters.hotel_name, + message=inputs.message, + ) + stream = await self.client.chat.completions.create( + stream=True, **prompt.to_openai_kwargs() + ) + async for chunk in stream: + yield chunk.choices[0].delta.content or "" + + +async def main(): + ag.init() + + # Local typed call. Traced like any decorated handler today. + agent = HotelAgent() + result = await agent(message="Do you have a pool?") + print(result.answer, result.sources) + + # Pin a configuration. These values become revision.data.parameters + # when the class is pushed. + boutique = HotelAgent(parameters={"hotel_name": "Hotel California", "top_k": 8}) + result = await boutique(message="Can I check out any time I like?") + + # Full pipeline (vault -> resolver -> normalizer), same code path the + # platform runs. Plain kwargs; the SDK builds the wire request internally. + # WorkflowInvokeRequest stays as the HTTP body format for /invoke (it also + # carries references, selector, secrets, credentials), but local callers + # never construct it. + _response = await agent.invoke( + inputs={"message": "Is breakfast included?"}, + parameters={"top_k": 2}, + ) + + # Inspect returns the interface. Schemas come straight from the class, + # no signature inference. + interface = await HotelAgent.inspect() + print(interface.data.schemas.outputs) + + # Push to Agenta: creates or updates the application and returns the + # new revision. Schemas and default parameters travel with it. 
+ revision_id = await HotelAgent.apush() + print(f"pushed revision {revision_id}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/02_evaluators.py b/docs/designs/class-based-sdk/02_evaluators.py new file mode 100644 index 0000000000..963bb6bfed --- /dev/null +++ b/docs/designs/class-based-sdk/02_evaluators.py @@ -0,0 +1,130 @@ +"""Evaluators as classes. (POC, does not run.) + +An evaluator is a workflow with `is_evaluator=True`, so the authoring model is +identical to applications. The differences: +- you implement `evaluate()` instead of `run()` +- `Inputs` are the testcase columns the evaluator consumes (permissive by + default, since batch mode passes every column) +- `evaluate()` also receives `outputs` (the application output under + evaluation) and `trace` (the full execution trace) + +`Parameters` are the evaluator settings users edit in the UI. `Outputs` is the +declared metrics schema, which the UI uses to render score columns. +""" + +import asyncio + +from openai import AsyncOpenAI +from pydantic import BaseModel, ConfigDict, Field + +import agenta as ag + + +class StartsCapitalized(ag.Evaluator): + """The smallest possible evaluator: no settings, one metric.""" + + slug = "starts-capitalized" + name = "Starts Capitalized" + + class Outputs(BaseModel): + score: float + success: bool + + async def evaluate(self, *, outputs, **_) -> Outputs: + text = outputs["answer"] if isinstance(outputs, dict) else str(outputs) + ok = bool(text) and text[0].isupper() + return self.Outputs(score=1.0 if ok else 0.0, success=ok) + + +class RubricJudge(ag.Evaluator): + """LLM-as-a-judge with its own typed settings. + + The settings render in the evaluator config UI from schemas.parameters, + exactly like builtin evaluator settings do today. + """ + + slug = "rubric-judge" + name = "Rubric Judge" + description = "Grades the answer against a rubric with an LLM." 
+ + class Parameters(BaseModel): + judge_model: str = "gpt-4o-mini" + rubric: str = Field( + "The answer is factual, polite, and under 100 words.", + json_schema_extra={"x-ag-type": "text"}, + ) + + class Inputs(BaseModel): + # Evaluators receive all testcase columns. Declare the ones you use, + # allow the rest. + model_config = ConfigDict(extra="allow") + expected_answer: str | None = None + + class Outputs(BaseModel): + score: float = Field(ge=0.0, le=1.0) + verdict: str + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.client = AsyncOpenAI() + + async def evaluate( + self, *, inputs: Inputs, outputs, parameters: Parameters, **_ + ) -> Outputs: + response = await self.client.chat.completions.create( + model=parameters.judge_model, + messages=[ + {"role": "system", "content": f"Grade against: {parameters.rubric}"}, + {"role": "user", "content": str(outputs)}, + ], + ) + verdict = response.choices[0].message.content + return self.Outputs(score=0.9, verdict=verdict) + + +class StaysUnderBudget(ag.Evaluator): + """A trace-based evaluator. `trace` exposes the full invocation: spans, + costs, tokens, tool calls. Same data the custom code evaluator gets today, + but typed access instead of a raw dict. + """ + + slug = "stays-under-budget" + name = "Stays Under Budget" + + class Parameters(BaseModel): + max_cost_usd: float = 0.01 + + class Outputs(BaseModel): + success: bool + cost_usd: float + + async def evaluate( + self, *, trace: ag.Trace, parameters: Parameters, **_ + ) -> Outputs: + cost = trace.metrics.costs.cumulative + return self.Outputs(success=cost <= parameters.max_cost_usd, cost_usd=cost) + + +async def main(): + ag.init() + + # Local typed call against a single output. Useful for unit tests. 
+ judge = RubricJudge(parameters={"judge_model": "gpt-4.1"}) + result = await judge( + outputs={"answer": "The pool is open 7am to 10pm."}, + expected_answer="Pool hours are 7-22.", + ) + print(result.score, result.verdict) + + # Builtins are classes too: typed configurators over the existing + # agenta:builtin:* handlers, instead of settings dicts. + _exact = ag.evaluators.ExactMatch(correct_answer_key="expected_answer") + _similarity = ag.evaluators.SemanticSimilarity(threshold=0.8) + + # Push registers the evaluator on the platform with its compiled schemas. + # It then shows up in the UI next to the builtins. + await RubricJudge.apush() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/03_run_evals.py b/docs/designs/class-based-sdk/03_run_evals.py new file mode 100644 index 0000000000..69cdc272a0 --- /dev/null +++ b/docs/designs/class-based-sdk/03_run_evals.py @@ -0,0 +1,65 @@ +"""Running an evaluation with class applications and evaluators. (POC, does not run.) + +`ag.aevaluate` accepts classes, configured instances, and platform slugs +interchangeably. A class means "use the default Parameters". An instance means +"use these pinned Parameters". A slug means "use what is deployed on Agenta". +""" + +import asyncio + +import agenta as ag + +from application import HotelAgent # 01_application.py +from evaluators import ( + RubricJudge, + StartsCapitalized, + StaysUnderBudget, +) # 02_evaluators.py + + +async def main(): + ag.init() + + result = await ag.aevaluate( + name="hotel-agent-regression", + # A testset slug from Agenta, or inline testcases. Columns map onto + # HotelAgent.Inputs; extra columns (like expected_answer) flow to the + # evaluators' Inputs. 
+ testset="hotel-faq-v2", + application=HotelAgent, + evaluators=[ + StartsCapitalized, + RubricJudge(parameters={"judge_model": "gpt-4.1"}), + StaysUnderBudget(parameters={"max_cost_usd": 0.005}), + ag.evaluators.ExactMatch(correct_answer_key="expected_answer"), + ], + ) + + print(result.url) # link to the evaluation in the Agenta UI + for scenario in result.scenarios: + print(scenario.inputs, scenario.outputs, scenario.metrics) + + # Compare two configurations of the same application in one run. + await ag.aevaluate( + name="prompt-shootout", + testset="hotel-faq-v2", + applications=[ + HotelAgent, # current defaults + HotelAgent(parameters={"top_k": 8}), # candidate config + ], + evaluators=[RubricJudge], + ) + + # Evaluate inline testcases without a stored testset. Handy in CI. + await ag.aevaluate( + application=HotelAgent, + testcases=[ + {"message": "Do you have a pool?", "expected_answer": "Yes, 7am-10pm."}, + {"message": "Is parking free?", "expected_answer": "Yes, for guests."}, + ], + evaluators=[StartsCapitalized], + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/04_config_registry.py b/docs/designs/class-based-sdk/04_config_registry.py new file mode 100644 index 0000000000..13ece8a49b --- /dev/null +++ b/docs/designs/class-based-sdk/04_config_registry.py @@ -0,0 +1,83 @@ +"""Pulling configurations from Agenta and referencing them. (POC, does not run.) + +Three patterns: +1. Fetch deployed parameters as a typed object. +2. Bind an instance to an environment, so traces link back to the deployed + revision automatically. +3. Reference a managed config from inside another application's Parameters. + The resolver middleware fetches the referenced revision at invoke time. +""" + +import asyncio + +from pydantic import BaseModel + +import agenta as ag + +from application import HotelAgent # 01_application.py + + +async def main(): + ag.init() + + # --- 1. 
Fetch deployed parameters, typed and validated ----------------- + # Replaces ConfigManager.get_from_registry + manual dict access. The + # return value is a HotelAgent.Parameters instance, so typos and schema + # drift fail loudly here instead of deep inside the app. + params = await HotelAgent.afetch_parameters(environment="production") + print(params.prompt.messages[0].content) + print(params.hotel_name, params.top_k) + + # Specific variant or revision instead of an environment: + _candidate = await HotelAgent.afetch_parameters(variant="experiment-1") + + # --- 2. Bind an instance to a deployed revision ------------------------- + # Parameters are pulled from the registry, and every trace produced by + # this instance carries application/variant/environment references. No + # manual ag.tracing.store_refs calls. + agent = await HotelAgent.afrom_registry(environment="production") + _result = await agent(message="Do you have a pool?") + + # The invocation link (trace_id + span_id) is available for feedback + # and annotation flows, same as build_invocation_link today. + print(agent.last_invocation.trace_id) + + # --- 3. Reference managed configs from another application ------------- + class RouterAgent(ag.Application): + slug = "router-agent" + + class Parameters(BaseModel): + # An inline default, editable in the playground. + routing_prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ag.Message(role="system", content="Route the request.")], + ) + # A reference instead of a value. The resolver middleware fetches + # the deployed revision of hotel-agent's prompt at invoke time, so + # this app always follows what is live in production. The + # playground renders a revision picker for it. 
+ concierge_prompt: ag.PromptTemplate = ag.Reference( + application="hotel-agent", + environment="production", + key="prompt", + ) + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + route: str + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + # parameters.concierge_prompt is already resolved here. + ... + + # --- Lifecycle: push and deploy ----------------------------------------- + # apush() commits a new revision with the schemas and defaults compiled + # from the class. adeploy() promotes it to an environment. + revision_id = await HotelAgent.apush() + await HotelAgent.adeploy(revision=revision_id, environment="staging") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/05_serve.py b/docs/designs/class-based-sdk/05_serve.py new file mode 100644 index 0000000000..aafc41883b --- /dev/null +++ b/docs/designs/class-based-sdk/05_serve.py @@ -0,0 +1,78 @@ +"""Serving class workflows over HTTP. (POC, does not run.) + +Workflows plug into FastAPI the FastAPI way: each class exposes a standard +APIRouter, and you mount it with app.include_router like any other router. +No custom registration call, no hidden sub-app mounting. + +router() carries two endpoints: +- POST /invoke Typed from the class: the request body model is generated + from Inputs + Parameters, the response model from Outputs. + So /docs shows the real schemas, and clients get validation + errors from FastAPI like on any other endpoint. Streams + (SSE/NDJSON) when the Accept header asks for it and the + class declares stream(). +- POST /inspect Returns the interface: schemas, parameters, identity. + +The playground talks to these endpoints to render the config form and run +the app. 
+""" + +import agenta as ag +import uvicorn +from fastapi import Depends, FastAPI + +from application import HotelAgent # 01_application.py +from evaluators import RubricJudge # 02_evaluators.py + +ag.init() + +app = FastAPI() + + +# Plain FastAPI composition. +app.include_router(HotelAgent.router(), prefix="/hotel", tags=["hotel"]) + +# Evaluators are workflows too, so a custom evaluator can run as its own +# service and be registered on the platform by URL. +app.include_router(RubricJudge.router(), prefix="/evaluators/rubric-judge") + + +# Everything FastAPI gives you keeps working: dependencies, auth, middleware, +# versioned prefixes. The router does not care where it is mounted. +def verify_internal_token(): # noqa: D103 + ... + + +app.include_router( + HotelAgent.router(), + prefix="/internal/hotel", + dependencies=[Depends(verify_internal_token)], +) + +# A configured instance serves with its pinned parameters as defaults. +boutique = HotelAgent(parameters={"hotel_name": "Hotel California"}) +app.include_router(boutique.router(), prefix="/boutique") + + +# Your own routes live next to them, as usual. +@app.get("/healthz") +def healthz(): + return {"ok": True} + + +# No FastAPI app of your own? 
A class is directly servable as an ASGI app: +# +# asgi = HotelAgent.asgi() # then: uvicorn 05_serve:asgi +# +# or from the CLI, without writing a server file at all: +# +# agenta serve application:HotelAgent --port 8000 + + +if __name__ == "__main__": + # Resulting endpoints: + # POST /hotel/invoke POST /hotel/inspect + # POST /evaluators/rubric-judge/invoke POST /evaluators/rubric-judge/inspect + # POST /internal/hotel/invoke (token-protected) + # POST /boutique/invoke (pinned parameters) + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/docs/designs/class-based-sdk/06_framework_adapters.py b/docs/designs/class-based-sdk/06_framework_adapters.py new file mode 100644 index 0000000000..520c9a735a --- /dev/null +++ b/docs/designs/class-based-sdk/06_framework_adapters.py @@ -0,0 +1,225 @@ +"""Using agent frameworks inside Agenta workflows. (POC, does not run.) + +The question: if the hotel agent is built with OpenAI Agents SDK, Pydantic AI, +or LangGraph, do you have to restate everything in the class? No. Three tiers, +from full control to one line: + +1. MANUAL - a plain ag.Application that builds the framework agent inside + run(). You decide what is configurable. Most code, no magic. +2. FACTORY - subclass a framework base and implement build(parameters). + You declare Parameters; the base handles running, streaming, + output schemas, and instrumentation. +3. AUTOMATIC - ag.Application.from_agent(agent). An adapter introspects the + agent object and generates Parameters/Inputs/Outputs for you. + +All three tiers compile to the same workflow revision. The common contract +underneath is the AgentAdapter port at the bottom of this file. Frameworks +already emit OpenTelemetry spans (openai-agents tracing, pydantic-ai, LangChain +instrumentors); the adapter routes them into Agenta's tracer so they nest as +child spans under the invocation span. Costs and tokens aggregate as usual. 
+""" + +import asyncio +from typing import Any, AsyncIterator, Protocol, TypeVar + +from agents import Agent as OpenAIAgent # noqa: E402 (openai-agents SDK, not installed) +from agents import function_tool # noqa: E402 +from langgraph.graph import StateGraph # noqa: E402 +from pydantic import BaseModel +from pydantic_ai import Agent as PydanticAgent # noqa: E402 + +import agenta as ag + + +# ========================================================================= +# Tier 1: MANUAL. Plain ag.Application, Pydantic AI inside run(). +# +# The framework agent is rebuilt per call from parameters. That is the price +# of playground-editable config: the source of truth lives in Agenta, not in +# the agent constructor. Building an Agent object is cheap in every framework. +# ========================================================================= + + +class HotelAgentManual(ag.Application): + slug = "hotel-agent-pydantic-ai" + name = "Hotel Agent (Pydantic AI, manual)" + + class Parameters(BaseModel): + model: str = "openai:gpt-4o-mini" + instructions: str = "You are the concierge of {{hotel_name}}. Be brief." + hotel_name: str = "Grand Agenta Hotel" + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + agent = PydanticAgent( + parameters.model, + system_prompt=ag.render( + parameters.instructions, hotel_name=parameters.hotel_name + ), + ) + result = await agent.run(inputs.message) + return self.Outputs(answer=result.output) + + +# ========================================================================= +# Tier 2: FACTORY (the port). You implement build(), the base does the rest. +# +# Use this when the automatic tier does not expose enough: conditional tools, +# nested config, multi-agent wiring. There is no run() here. 
The framework +# base implements run() and stream() by executing whatever build() returns +# (Runner.run for openai-agents, agent.run for pydantic-ai, graph.ainvoke for +# langgraph), and derives Outputs from the agent's output_type when you do +# not declare one. +# ========================================================================= + + +@function_tool +def search_rooms(date: str, guests: int) -> list[dict]: ... + + +@function_tool +def book_room(room_id: str, date: str) -> dict: ... + + +class BookingAnswer(BaseModel): + answer: str + room_id: str | None = None + + +class HotelAgentFactory(ag.ext.openai_agents.Application): + slug = "hotel-agent-openai" + name = "Hotel Agent (OpenAI Agents, factory)" + + class Parameters(BaseModel): + model: str = "gpt-4o-mini" + instructions: str = "You are the hotel concierge. Use tools to answer." + booking_enabled: bool = True + + class Inputs(BaseModel): + message: str + + # Outputs is derived from build().output_type (BookingAnswer) unless + # declared explicitly. + + def build(self, parameters: Parameters) -> OpenAIAgent: + tools = [search_rooms] + ([book_room] if parameters.booking_enabled else []) + return OpenAIAgent( + name="hotel-concierge", + model=parameters.model, + instructions=parameters.instructions, + tools=tools, + output_type=BookingAnswer, + ) + + +# ========================================================================= +# Tier 3: AUTOMATIC (the adapter). Wrap an existing agent, it becomes one +# of us. The adapter is picked by agent type from a registry. 
+# ========================================================================= + +concierge = OpenAIAgent( + name="concierge", + model="gpt-4o-mini", + instructions="You are the hotel concierge.", + tools=[search_rooms], + output_type=BookingAnswer, +) + +HotelAgentAuto = ag.Application.from_agent(concierge, slug="hotel-agent-auto") +# What the openai-agents adapter extracted: +# Parameters <- model, instructions, model_settings (playground-editable) +# Inputs <- {message: str} (the run input) +# Outputs <- BookingAnswer schema (from output_type) +# handoffs/tools -> listed read-only in metadata, visible in the UI +# Applying edited parameters uses agent.clone(model=..., instructions=...) +# per invocation. The original object is never mutated. + + +# LangGraph maps the cleanest, because graphs already separate the three +# concerns the schema model needs: +# Parameters <- config_schema (the "configurable" section) +# Inputs <- input state schema +# Outputs <- output state schema +# Binding needs no rebuild: graph.ainvoke(inputs, config={"configurable": ...}). + + +class GraphState(BaseModel): + message: str + answer: str | None = None + + +class GraphConfig(BaseModel): + model: str = "gpt-4o-mini" + system_prompt: str = "You are the hotel concierge." + + +builder = StateGraph(GraphState, config_schema=GraphConfig) +# ... add nodes and edges ... +graph = builder.compile() + +HotelGraph = ag.Application.from_agent(graph, slug="hotel-agent-langgraph") + + +async def main(): + ag.init() + + # All three tiers behave identically from here on. 
+ for App in (HotelAgentManual, HotelAgentFactory, HotelAgentAuto, HotelGraph): + await App.apush() + + agent = HotelAgentAuto(parameters={"model": "gpt-4.1"}) + _result = await agent(message="Do you have a pool?") + + await ag.aevaluate( + testset="hotel-faq-v2", + applications=[HotelAgentManual, HotelAgentAuto], + evaluators=["rubric-judge"], + ) + + +# ========================================================================= +# The port: one adapter per framework, registered by agent type. This is +# what tiers 2 and 3 are built on. Third-party frameworks can ship their +# own adapter and register it via entry point. +# ========================================================================= + +AgentT = TypeVar("AgentT") + + +class AgentAdapter(Protocol[AgentT]): + def parameters(self, agent: AgentT) -> tuple[type[BaseModel], dict]: + """The editable config model, and current values read off the agent. + These become schemas.parameters and revision.data.parameters.""" + + def io(self, agent: AgentT) -> tuple[type[BaseModel], type[BaseModel]]: + """Inputs and Outputs models. These become schemas.inputs/outputs.""" + + def bind(self, agent: AgentT, parameters: BaseModel) -> AgentT: + """Apply parameters without mutating the original: clone() for + openai-agents, rebuild for pydantic-ai, configurable for langgraph.""" + + async def invoke(self, agent: AgentT, inputs: BaseModel) -> Any: + """Run once. The result is normalized against Outputs.""" + + def stream(self, agent: AgentT, inputs: BaseModel) -> AsyncIterator[Any]: + """Optional. When the adapter implements it, the wrapped app streams.""" + + +# ag.adapters.register(OpenAIAgent, OpenAIAgentsAdapter()) +# ag.adapters.register(PydanticAgent, PydanticAIAdapter()) +# ag.adapters.register(CompiledStateGraph, LangGraphAdapter()) + +# Honest limits of the automatic tier: the adapter can only expose what the +# agent object carries. Model, instructions, and model settings come for +# free. 
Prompts buried inside tools, retrievers, or graph nodes are invisible +# to introspection. When you need those in the playground, drop to the +# factory tier and lift them into Parameters yourself. + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/07_config_only.py b/docs/designs/class-based-sdk/07_config_only.py new file mode 100644 index 0000000000..b134695221 --- /dev/null +++ b/docs/designs/class-based-sdk/07_config_only.py @@ -0,0 +1,89 @@ +"""Configuration-only workflows: prompt management and beyond. (POC, does not run.) + +Two answers to "I want the config, not the runnable": + +1. Any workflow class already works that way. A pushed class registers its + schemas and parameters on the platform; run() only executes where your + code runs. If you never serve it, the platform side is exactly a managed, + versioned configuration. Just call afetch_parameters() and never invoke. + +2. ag.Configuration makes that intent explicit. It has Parameters and + nothing else: no Inputs, no Outputs, no run(). In the data model it is a + WorkflowRevision with schemas.parameters and parameters, and no handler + URI. The UI renders a config form and version history, no run button. + invoke() and serve() do not exist on it. + +This generalizes prompt management to arbitrary typed config: prompts, +rubrics, routing tables, guardrail lists, feature flags for the LLM layer. +""" + +import asyncio + +from pydantic import BaseModel, Field + +import agenta as ag + + +class ConciergeConfig(ag.Configuration): + slug = "concierge-config" + name = "Concierge Configuration" + description = "Everything the support flow needs, versioned and deployable." + + class Parameters(BaseModel): + prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ + ag.Message( + role="system", content="You are the concierge of {{hotel_name}}." 
+ ), + ], + llm_config=ag.ModelConfig(model="gpt-4o-mini"), + ) + escalation_keywords: list[str] = ["lawyer", "refund", "manager"] + model_by_tier: dict[str, str] = Field( + default={"free": "gpt-4o-mini", "pro": "gpt-4.1"}, + description="Routing table, editable without a deploy.", + ) + + +async def main(): + ag.init() + + # Same lifecycle as any workflow: commit a revision, promote it. + await ConciergeConfig.apush() + await ConciergeConfig.adeploy(environment="production") + + # The consuming code does not need to be an Agenta workflow at all. + # This can run inside any service, cron job, or notebook. + config = await ConciergeConfig.afetch(environment="production") + print(config.escalation_keywords) + _model = config.model_by_tier["pro"] + _prompt = config.prompt.format(hotel_name="Grand Agenta Hotel") + + # Sync variant for non-async codebases, cached with a TTL so it is safe + # to call per request. + config = ConciergeConfig.fetch(environment="production") + + # And it composes with everything else in this POC: an application can + # reference it instead of duplicating the values (see 04_config_registry). + class SupportAgent(ag.Application): + slug = "support-agent" + + class Parameters(BaseModel): + concierge: ConciergeConfig.Parameters = ag.Reference( + configuration="concierge-config", + environment="production", + ) + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + # parameters.concierge is resolved and typed. + ... + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/08_testsets.py b/docs/designs/class-based-sdk/08_testsets.py new file mode 100644 index 0000000000..b112173c58 --- /dev/null +++ b/docs/designs/class-based-sdk/08_testsets.py @@ -0,0 +1,96 @@ +"""Testsets as classes. (POC, does not run.) + +A testset class declares its columns as a Pydantic model. 
That buys: +- the platform validates rows against the schema, on upload and on edit +- typed iteration in code, no dict["colunm_typo"] surprises +- fail-fast compatibility checks: before an evaluation runs anything, the + testset columns are checked against the application's Inputs and every + evaluator's Inputs + +The schema compiles into the same JSON Schema machinery as everything else +in this POC, so the testset UI can render typed cells (numbers, enums, +nested JSON) instead of treating every column as text. +""" + +import asyncio + +from pydantic import BaseModel, Field + +import agenta as ag + +from application import HotelAgent # 01_application.py +from evaluators import RubricJudge # 02_evaluators.py + + +class HotelFAQ(ag.Testset): + slug = "hotel-faq" + name = "Hotel FAQ" + + class Case(BaseModel): + """One row. Becomes the testset's column schema.""" + + message: str + persona: str = "guest" + expected_answer: str + difficulty: int = Field(1, ge=1, le=3) + + # Optional seed data, committed with the class on first push. + cases = [ + Case( + message="Do you have a pool?", + expected_answer="Yes, open 7am to 10pm.", + ), + Case( + message="Can I check in at 6am?", + expected_answer="Early check-in depends on availability.", + difficulty=2, + ), + ] + + +async def main(): + ag.init() + + await HotelFAQ.apush() + + # Typed access. Editors keep working in the UI; this pulls their edits. + testset = await HotelFAQ.afetch() + for case in testset: + print(case.message, case.difficulty) + + # Append programmatically, validated against Case. + await HotelFAQ.aadd( + cases=[HotelFAQ.Case(message="Is parking free?", expected_answer="Yes.")] + ) + + # Curate from production traces: map spans onto typed cases. 
+ await HotelFAQ.afrom_traces( + filter=ag.TraceFilter( + application="hotel-agent", annotations={"thumbs": "down"} + ), + map=lambda span: HotelFAQ.Case( + message=span.inputs["message"], + expected_answer=span.annotations["corrected_answer"], + ), + ) + + # The payoff: compatibility is checked before anything runs. + # HotelFAQ.Case covers HotelAgent.Inputs (message, persona)? yes + # HotelFAQ.Case covers RubricJudge.Inputs (expected_answer)? yes + # A missing or mistyped column raises here, not after 200 LLM calls. + await ag.aevaluate( + testset=HotelFAQ, + application=HotelAgent, + evaluators=[RubricJudge], + ) + + # Mismatches fail loudly and early: + # + # await ag.aevaluate(testset=HotelFAQ, application=FlightAgent, ...) + # IncompatibleTestsetError: FlightAgent.Inputs requires column + # 'origin' (str); testset 'hotel-faq' has columns + # [message, persona, expected_answer, difficulty] + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/class-based-sdk/README.md b/docs/designs/class-based-sdk/README.md new file mode 100644 index 0000000000..04fa649689 --- /dev/null +++ b/docs/designs/class-based-sdk/README.md @@ -0,0 +1,64 @@ +# Class-based SDK (POC) + +**Status: design proposal. None of this code runs.** `ag.Application`, `ag.Evaluator`, +`ag.serve`, and the related methods do not exist yet. This folder shows what the +authoring experience would look like if the SDK were class-oriented. + +## The idea + +Today users author applications and evaluators by decorating plain functions. Schemas +are inferred from function signatures, and the settings/inputs/outputs contracts are +untyped dicts. + +In this design, the class **is** the workflow. You subclass `ag.Application` or +`ag.Evaluator`, declare three inner Pydantic models, and implement one method. 
The +class compiles directly into the existing workflow data models: + +| You write | It becomes | +|---|---| +| `class Parameters(BaseModel)` | `schemas.parameters` (the playground-editable config) | +| `class Inputs(BaseModel)` | `schemas.inputs` (runtime inputs / testset columns) | +| `class Outputs(BaseModel)` | `schemas.outputs` (what evaluators receive) | +| `run()` / `evaluate()` | the registered handler, auto-instrumented | +| `slug`, `name`, `description` | the workflow identity on the platform | +| `MyApp(parameters={...})` | pinned revision parameters | + +Evaluators stay what they already are in the backend: workflows with +`is_evaluator=True`. The class layer is a typed front-end over `WorkflowRevision`, +not a parallel system. Instrumentation, middleware, the handler registry, +`invoke`/`inspect`, serving, and upsert all reuse the existing engine. + +## Files, in reading order + +1. `01_application.py` - an application as a class: typed config, inputs, outputs, + instrumentation with child spans, optional streaming, local calls, push. +2. `02_evaluators.py` - evaluators as classes: a heuristic check, an LLM judge with + its own settings, a trace-based cost check, and a typed builtin. +3. `03_run_evals.py` - running an evaluation with class applications and evaluators. +4. `04_config_registry.py` - pulling deployed configurations from Agenta, binding an + instance to an environment, and referencing managed configs from other apps. +5. `05_serve.py` - serving with plain FastAPI: every class exposes a standard + `APIRouter` (typed `/invoke` and `/inspect` endpoints), mounted with + `app.include_router`. Streaming negotiation, auth via FastAPI dependencies, + and a standalone ASGI/CLI path for the no-boilerplate case. +6. `06_framework_adapters.py` - bringing agents built with OpenAI Agents SDK, + Pydantic AI, or LangGraph into Agenta. 
Three tiers: manual (framework + inside `run()`), factory (implement `build()`, the base does the rest), and + automatic (`ag.Application.from_agent(agent)`). The `AgentAdapter` port at + the bottom is the contract all of it sits on. +7. `07_config_only.py` - configuration-only workflows (`ag.Configuration`): + Parameters without a runnable. Prompt management generalized to any typed, + versioned, deployable config. +8. `08_testsets.py` - testsets as classes: typed columns, validated rows, + curation from traces, and fail-fast compatibility checks against + application and evaluator inputs. + +## What carries over from the current SDK + +- `@ag.instrument()` still exists and works on any method for child spans. The + `run`/`evaluate` methods are instrumented automatically, like decorated handlers + are today. +- The middleware chain (vault, resolver, normalizer) runs unchanged under + `.invoke()`. +- Decorators do not go away. They stay as the quick path for wrapping existing + functions. Both compile to the same engine. 
From 766056f6458f0030afdb08f1960e92ed30b7fe6f Mon Sep 17 00:00:00 2001 From: Juan Pablo Vega Date: Thu, 11 Jun 2026 16:42:35 +0200 Subject: [PATCH 2/2] Add functional SDK extensions --- docs/designs/class-based-sdk/00_core.py | 205 ++++++++++ .../designs/class-based-sdk/01_application.py | 4 + docs/designs/class-based-sdk/02_evaluators.py | 4 + .../class-based-sdk/06_framework_adapters.py | 4 + .../designs/class-based-sdk/07_config_only.py | 5 + docs/designs/class-based-sdk/08_testsets.py | 3 + docs/designs/class-based-sdk/README.md | 21 +- docs/designs/function-based-sdk/00_core.py | 164 ++++++++ .../function-based-sdk/01_application.py | 147 +++++++ .../function-based-sdk/02_evaluators.py | 139 +++++++ .../function-based-sdk/03_run_evals.py | 71 ++++ .../function-based-sdk/04_config_registry.py | 99 +++++ docs/designs/function-based-sdk/05_serve.py | 79 ++++ .../06_framework_adapters.py | 240 ++++++++++++ .../function-based-sdk/07_config_only.py | 102 +++++ .../designs/function-based-sdk/08_testsets.py | 105 +++++ docs/designs/function-based-sdk/README.md | 117 ++++++ .../functional-based-class-sdk/00_core.py | 362 ++++++++++++++++++ .../01_application.py | 149 +++++++ .../02_evaluators.py | 146 +++++++ .../03_run_evals.py | 64 ++++ .../04_config_registry.py | 64 ++++ .../functional-based-class-sdk/05_serve.py | 56 +++ .../06_framework_adapters.py | 73 ++++ .../07_config_only.py | 112 ++++++ .../functional-based-class-sdk/08_testsets.py | 110 ++++++ .../functional-based-class-sdk/README.md | 96 +++++ 27 files changed, 2738 insertions(+), 3 deletions(-) create mode 100644 docs/designs/class-based-sdk/00_core.py create mode 100644 docs/designs/function-based-sdk/00_core.py create mode 100644 docs/designs/function-based-sdk/01_application.py create mode 100644 docs/designs/function-based-sdk/02_evaluators.py create mode 100644 docs/designs/function-based-sdk/03_run_evals.py create mode 100644 docs/designs/function-based-sdk/04_config_registry.py create mode 100644 
docs/designs/function-based-sdk/05_serve.py create mode 100644 docs/designs/function-based-sdk/06_framework_adapters.py create mode 100644 docs/designs/function-based-sdk/07_config_only.py create mode 100644 docs/designs/function-based-sdk/08_testsets.py create mode 100644 docs/designs/function-based-sdk/README.md create mode 100644 docs/designs/functional-based-class-sdk/00_core.py create mode 100644 docs/designs/functional-based-class-sdk/01_application.py create mode 100644 docs/designs/functional-based-class-sdk/02_evaluators.py create mode 100644 docs/designs/functional-based-class-sdk/03_run_evals.py create mode 100644 docs/designs/functional-based-class-sdk/04_config_registry.py create mode 100644 docs/designs/functional-based-class-sdk/05_serve.py create mode 100644 docs/designs/functional-based-class-sdk/06_framework_adapters.py create mode 100644 docs/designs/functional-based-class-sdk/07_config_only.py create mode 100644 docs/designs/functional-based-class-sdk/08_testsets.py create mode 100644 docs/designs/functional-based-class-sdk/README.md diff --git a/docs/designs/class-based-sdk/00_core.py b/docs/designs/class-based-sdk/00_core.py new file mode 100644 index 0000000000..a8df0d691b --- /dev/null +++ b/docs/designs/class-based-sdk/00_core.py @@ -0,0 +1,205 @@ +"""The base classes this folder assumes. (POC, does not run.) + +The numbered files 01-08 write `class HotelAgent(ag.Application)`, `class +RubricJudge(ag.Evaluator)`, etc. This file is where those bases come from — +implemented CLASS-FIRST, as the native foundation of the class-only proposal. 
+ +Compare the three `00_*` foundations across the peer folders: + + ./00_core.py bases as native classes (this file) + ../functional-based-class-sdk/00_core.py the SAME bases on the functional core + ../function-based-sdk/00_core.py the function form's foundation (no + base classes — decorators + closures) + +In this native version each base registers directly: its `__init_subclass__` +reads the inner Parameters/Inputs/Outputs models and the run/evaluate method off +the subclass and calls the engine. There is no decorator/function layer beneath +— the class is the registration path. + +Grounded in the real SDK (sdks/python/agenta/sdk/): +- `decorators/running.py` has the `Workflow` handle and the `workflow` + registrar this delegates to. +- `models/workflows.py::WorkflowFlags` encodes kind as flags: `is_application`, + `is_evaluator`, `has_handler`. Application and Evaluator are the same workflow + with a different flag; Configuration has `has_handler=False`. + +`Testset` is intentionally NOT a workflow (no parameters, no handler, no schemas +triple, absent from WorkflowFlags). It gets its own base at the bottom. +""" + +from __future__ import annotations + +from typing import Any, Optional + +from agenta import workflow as _register # the existing `workflow` registrar +from agenta.sdk.models.workflows import WorkflowFlags + + +# ========================================================================= +# Shared base. `__init_subclass__` compiles the inner models + handler method +# into a registered workflow. Subclasses set `_flags` (kind) and `_handler_name` +# (which method is the handler — None for config-only). 
# =========================================================================


class _WorkflowClass:
    """Shared base that turns a subclass declaration into a registered workflow.

    A subclass with a truthy ``slug`` is registered at class-creation time.
    Subclasses without one (``Application``, ``Evaluator``, ...) are treated as
    abstract kinds and skipped.
    """

    _handler_name: Optional[str] = "run"

    def __init_subclass__(cls, **kwargs):
        """Compile the inner models and handler method, then register them."""
        super().__init_subclass__(**kwargs)
        if not getattr(cls, "slug", None):
            return  # a base (Application/Evaluator/...), not a user workflow

        method = getattr(cls, cls._handler_name, None) if cls._handler_name else None

        schemas = _compile_schemas(
            getattr(cls, "Parameters", None),
            getattr(cls, "Inputs", None),
            getattr(cls, "Outputs", None),
        )

        registrar = _register(
            slug=cls.slug,
            name=getattr(cls, "name", None),
            description=getattr(cls, "description", None),
            flags=cls._flags().model_dump(),
            schemas=schemas,
        )

        if method is None:
            cls._handle = registrar  # config-only: schemas, no handler
            return

        async def handler(**kw):
            # `instance` is bound just below; the closure resolves it at call
            # time, long after registration has finished.
            return await method(instance, **kw)

        cls._handle = registrar(handler)

        # Build the shared instance only AFTER `_handle` exists: `__init__`
        # reads `type(self)._handle`, so instantiating first raised
        # AttributeError for every user workflow.
        instance = cls()  # holds __init__ resources (clients, indexes)

    @classmethod
    def _flags(cls) -> WorkflowFlags:  # overridden per kind
        """Return the flags that encode this workflow's kind."""
        raise NotImplementedError

    # Authoring/lifecycle surface — same names the examples call.
    def __init__(self, *, parameters: dict | None = None):
        """Bind this instance to the class handle, pinned when parameters are given."""
        self._bound = (
            type(self)._handle.pin(**parameters) if parameters else type(self)._handle
        )

    async def __call__(self, **kw):
        """Invoke the (possibly pinned) handle with the given keyword inputs."""
        return await self._bound(**kw)

    def __getattr__(self, item):
        # Only reached when normal lookup fails, so attributes this class does
        # not define are forwarded to the instance's (possibly pinned) handle.
        # NOTE(review): `router` and `inspect` are classmethods below, so on an
        # instance they resolve to the class handle and ignore pinned
        # parameters — confirm whether that is intended.
        return getattr(object.__getattribute__(self, "_bound"), item)

    @classmethod
    async def apush(cls):
        """Delegate to the class handle's ``push()``."""
        return await cls._handle.push()

    @classmethod
    async def adeploy(cls, **k):
        """Delegate to the class handle's ``deploy(**k)``."""
        return await cls._handle.deploy(**k)

    @classmethod
    async def inspect(cls):
        """Delegate to the class handle's ``inspect()``."""
        return await cls._handle.inspect()

    @classmethod
    def router(cls, *a, **k):
        """Delegate to the class handle's ``router(*a, **k)``."""
        return cls._handle.router(*a, **k)

    @classmethod
    async def afetch_parameters(cls, **k):
        """Delegate to the class handle's ``fetch_parameters(**k)``."""
        return await cls._handle.fetch_parameters(**k)

    @classmethod
    async def afrom_registry(cls, **k):
        """Delegate to the class handle's ``from_registry(**k)``."""
        return await cls._handle.from_registry(**k)


def _compile_schemas(parameters, inputs, outputs) -> dict:
    """Return the schemas triple; a missing model maps to ``None``."""
    return {
        "parameters": parameters.model_json_schema() if parameters else None,
        "inputs": inputs.model_json_schema() if inputs else None,
        "outputs": outputs.model_json_schema() if outputs else None,
    }


# =========================================================================
# The kinds. Each is two lines: which handler method, which flag.
# =========================================================================


class Application(_WorkflowClass):
    """Workflow kind whose handler is ``run``."""

    _handler_name = "run"

    @classmethod
    def _flags(cls):
        return WorkflowFlags(is_application=True, has_handler=True)


class Evaluator(_WorkflowClass):
    """Workflow kind whose handler is ``evaluate``."""

    _handler_name = "evaluate"

    @classmethod
    def _flags(cls):
        return WorkflowFlags(is_evaluator=True, has_handler=True)


class Configuration(_WorkflowClass):
    """Config-only workflow kind: schemas are registered, no handler."""

    _handler_name = None  # config-only: no runnable

    @classmethod
    def _flags(cls):
        return WorkflowFlags(is_application=True, has_handler=False)

    @classmethod
    async def afetch(cls, **k):
        """Delegate to the class handle's ``fetch(**k)``."""
        return await cls._handle.fetch(**k)

    @classmethod
    def fetch(cls, **k):
        """Synchronous variant; delegates to the handle's ``fetch_sync(**k)``."""
        return cls._handle.fetch_sync(**k)


# =========================================================================
# NOT a workflow. Testset is a typed row collection — inner `Case` + seed
# `cases`, no parameters/handler/schemas-triple, absent from WorkflowFlags. Its
# own base, so the hierarchy matches the data model.
# =========================================================================


class Testset:
    """Base for typed testsets; subclasses with a ``slug`` are registered."""

    def __init_subclass__(cls, **kwargs):
        """Register the subclass's ``Case`` model and optional seed ``cases``."""
        super().__init_subclass__(**kwargs)
        if not getattr(cls, "slug", None):
            return
        cls._handle = _register_testset(
            slug=cls.slug,
            name=getattr(cls, "name", None),
            case=cls.Case,
            cases=getattr(cls, "cases", None),
        )

    @classmethod
    async def apush(cls) -> Any:
        """Delegate to the testset handle's ``push()``."""
        return await cls._handle.push()

    @classmethod
    async def afetch(cls) -> Any:
        """Delegate to the testset handle's ``fetch()``."""
        return await cls._handle.fetch()

    @classmethod
    async def aadd(cls, **k) -> Any:
        """Delegate to the testset handle's ``add(**k)``."""
        return await cls._handle.add(**k)

    @classmethod
    async def afrom_traces(cls, **k) -> Any:
        """Delegate to the testset handle's ``from_traces(**k)``."""
        return await cls._handle.from_traces(**k)


# In the real SDK this is the testset registrar (column schema = Case), a
# different engine path from `workflow`. Stubbed here as the contract.
def _register_testset(*, slug, name, case, cases): ...
diff --git a/docs/designs/class-based-sdk/01_application.py b/docs/designs/class-based-sdk/01_application.py index 2247577de2..8ce3b8b40f 100644 --- a/docs/designs/class-based-sdk/01_application.py +++ b/docs/designs/class-based-sdk/01_application.py @@ -16,6 +16,10 @@ import agenta as ag +from core import Application # 00_core.py — the native class base + +ag.Application = Application # what the SDK __init__ would export + class HotelAgent(ag.Application): slug = "hotel-agent" diff --git a/docs/designs/class-based-sdk/02_evaluators.py b/docs/designs/class-based-sdk/02_evaluators.py index 963bb6bfed..00f7646c67 100644 --- a/docs/designs/class-based-sdk/02_evaluators.py +++ b/docs/designs/class-based-sdk/02_evaluators.py @@ -19,6 +19,10 @@ import agenta as ag +from core import Evaluator # 00_core.py — the native class base + +ag.Evaluator = Evaluator # what the SDK __init__ would export + class StartsCapitalized(ag.Evaluator): """The smallest possible evaluator: no settings, one metric.""" diff --git a/docs/designs/class-based-sdk/06_framework_adapters.py b/docs/designs/class-based-sdk/06_framework_adapters.py index 520c9a735a..c54804fce6 100644 --- a/docs/designs/class-based-sdk/06_framework_adapters.py +++ b/docs/designs/class-based-sdk/06_framework_adapters.py @@ -30,6 +30,10 @@ import agenta as ag +from core import Application # 00_core.py — the native class base + +ag.Application = Application # what the SDK __init__ would export + # ========================================================================= # Tier 1: MANUAL. Plain ag.Application, Pydantic AI inside run(). 
diff --git a/docs/designs/class-based-sdk/07_config_only.py b/docs/designs/class-based-sdk/07_config_only.py index b134695221..873df781e1 100644 --- a/docs/designs/class-based-sdk/07_config_only.py +++ b/docs/designs/class-based-sdk/07_config_only.py @@ -23,6 +23,11 @@ import agenta as ag +from core import Application, Configuration # 00_core.py — native class bases + +ag.Configuration = Configuration # what the SDK __init__ would export +ag.Application = Application # for the SupportAgent reference in main() + class ConciergeConfig(ag.Configuration): slug = "concierge-config" diff --git a/docs/designs/class-based-sdk/08_testsets.py b/docs/designs/class-based-sdk/08_testsets.py index b112173c58..f20cd555f7 100644 --- a/docs/designs/class-based-sdk/08_testsets.py +++ b/docs/designs/class-based-sdk/08_testsets.py @@ -19,8 +19,11 @@ import agenta as ag from application import HotelAgent # 01_application.py +from core import Testset # 00_core.py — its own base, NOT a Workflow from evaluators import RubricJudge # 02_evaluators.py +ag.Testset = Testset # what the SDK __init__ would export + class HotelFAQ(ag.Testset): slug = "hotel-faq" diff --git a/docs/designs/class-based-sdk/README.md b/docs/designs/class-based-sdk/README.md index 04fa649689..6ff2e3d51b 100644 --- a/docs/designs/class-based-sdk/README.md +++ b/docs/designs/class-based-sdk/README.md @@ -1,8 +1,23 @@ # Class-based SDK (POC) -**Status: design proposal. None of this code runs.** `ag.Application`, `ag.Evaluator`, -`ag.serve`, and the related methods do not exist yet. This folder shows what the -authoring experience would look like if the SDK were class-oriented. +**Status: design proposal. None of this code runs.** This folder shows what the +authoring experience would look like if the SDK were class-oriented. 
The bases +the examples subclass — `ag.Application`, `ag.Evaluator`, `ag.Configuration`, +`ag.Testset` — are defined in [`00_core.py`](00_core.py), implemented natively +as classes (no functional layer underneath). That is this folder's foundation. + +> One of three peer demos under `docs/designs/`, with aligned filenames +> (`00_*` foundation + `01_application.py` … `08_testsets.py`) so they diff 1:1: +> +> | folder | `00_*` foundation | `01`–`08` | +> |---|---|---| +> | [class-based-sdk/](.) (this one) | `00_core.py` — bases as **native classes** | class API | +> | [function-based-sdk/](../function-based-sdk/) | `00_core.py` — no base classes; decorators + closures | function API | +> | [functional-based-class-sdk/](../functional-based-class-sdk/) | `00_core.py` — the **same bases on the functional core** | class API as sugar | +> +> Same `01`–`08` examples everywhere; three different `00_*` foundations. The +> third folder proves the class API is sugar by rebuilding `00_core.py` on top +> of the function front-end. ## The idea diff --git a/docs/designs/function-based-sdk/00_core.py b/docs/designs/function-based-sdk/00_core.py new file mode 100644 index 0000000000..d8eac8e363 --- /dev/null +++ b/docs/designs/function-based-sdk/00_core.py @@ -0,0 +1,164 @@ +"""The foundation the function form's 01-08 sit on. (POC, does not run.) + +Each peer folder has a `00_core.py` holding what its numbered files build on: + + ../class-based-sdk/00_core.py the base CLASSES (ag.Application, ...) + ../functional-based-class-sdk/00_core.py those same bases on the function core + ./00_core.py (this file) the function form has NO base classes + +The function form needs no base class: you decorate, you don't subclass. So this +folder's "core" is not a type hierarchy — it is the conventions the decorator +makes possible. 
The headline one, which the class form cannot match, is closures: +factories that keep Parameters/Inputs/Outputs (and clients, captured config) +PRIVATE to the workflow instead of public attributes on a class. + +That is why this file looks different from the other two `00_core.py`: there is +no `Workflow`/`Application` base to define here. The foundation is the decorator +plus closure scoping. (The numbered files 01-08 still diff 1:1 across folders; +this file has no per-kind counterpart — it is the function form's foundation.) + +== Closures: scoped, private schemas the class cannot match == + +The other files declare Parameters/Inputs/Outputs at module scope. That works, +but it leaks three types into the module namespace. A closure factory keeps them +private to the workflow — captured in the enclosing scope, invisible to the rest +of the module — while the decorator still registers them, because they are +passed to it explicitly. Registration needs the *value*, not an importable name. + +This is the functional analogue of instance state, and on encapsulation it beats +the class outright: + + class: Parameters is HotelAgent.Parameters -> always public, always + reachable, namespace permanently occupied. + closure: Parameters lives in the factory's local scope -> private by + default, exposed only if the factory chooses to return it. + +Three patterns below: fully private, selectively exposed, and a config-bound +factory that captures a deployed revision once and closes over it. +""" + +from __future__ import annotations + +import asyncio + +from openai import AsyncOpenAI +from pydantic import BaseModel, Field + +import agenta as ag + + +# ========================================================================= +# 1. FULLY PRIVATE. Schemas + client live inside the factory. Nothing leaks; +# the module sees only `hotel_agent`, the handle. 
+# ========================================================================= + + +def make_hotel_agent() -> ag.Workflow: + client = AsyncOpenAI() # captured once, like instance state — no module global + + class Parameters(BaseModel): + hotel_name: str = "Grand Agenta Hotel" + top_k: int = Field(4, ge=1, le=20) + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + sources: list[str] = [] + + @ag.application( + slug="hotel-agent", + name="Hotel Agent", + parameters=Parameters, # passed by value: registered without being importable + inputs=Inputs, + outputs=Outputs, + ) + async def run(*, inputs: Inputs, parameters: Parameters) -> Outputs: + prompt = f"You are the concierge of {parameters.hotel_name}. {inputs.message}" + response = await client.chat.completions.create( + model="gpt-4o-mini", + messages=[{"role": "user", "content": prompt}], + ) + return Outputs(answer=response.choices[0].message.content, sources=[]) + + return run # the handle; Parameters/Inputs/Outputs are now unreachable + + +hotel_agent = make_hotel_agent() +# Parameters does not exist out here. There is no `hotel_agent.Parameters` leak, +# no module-level `Parameters` symbol. The schema still registered fine. + + +# ========================================================================= +# 2. SELECTIVELY EXPOSED. Keep them private, but hand back exactly the ones +# callers legitimately need (e.g. Parameters, to build a pinned config), +# while Inputs/Outputs stay sealed. The class forces all-or-nothing: every +# inner model is public the moment the class is. +# ========================================================================= + + +def make_router_agent(): + class Parameters(BaseModel): + routing_prompt: str = "Route the request." 
+ + class Inputs(BaseModel): # stays private + message: str + + class Outputs(BaseModel): # stays private + answer: str + route: str + + @ag.application( + slug="router-agent", parameters=Parameters, inputs=Inputs, outputs=Outputs + ) + async def run(*, inputs: Inputs, parameters: Parameters) -> Outputs: ... + + # Expose only what the caller needs to construct configs; seal the rest. + return run, Parameters + + +router_agent, RouterParameters = make_router_agent() +_pinned = router_agent.pin(routing_prompt="Be terse.") +_typed = RouterParameters(routing_prompt="Be terse.") # usable; Inputs/Outputs are not + + +# ========================================================================= +# 3. CONFIG-BOUND FACTORY. Capture a deployed revision (or any resource) once, +# close over it, and return a handle that never re-fetches. The captured +# config is private state — the closure is the binding. With a class you'd +# reach for __init__ + an instance attribute; the closure needs neither. +# ========================================================================= + + +async def make_bound_agent(environment: str) -> ag.Workflow: + # Fetched once, at factory time, then captured. Every call uses it; no + # per-request lookup, no instance to thread it through. + deployed = await hotel_agent.fetch_parameters(environment=environment) + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + + @ag.application(slug="hotel-agent-bound", inputs=Inputs, outputs=Outputs) + async def run(*, inputs: Inputs) -> Outputs: + # `deployed` is closed over — the binding lives in scope, not in config. 
+ return Outputs(answer=f"[{deployed.hotel_name}] {inputs.message}") + + return run + + +async def main(): + ag.init() + + result = await hotel_agent(message="Do you have a pool?") + print(result.answer) + + bound = await make_bound_agent(environment="production") + await bound(message="Is breakfast included?") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/01_application.py b/docs/designs/function-based-sdk/01_application.py new file mode 100644 index 0000000000..e115ed2e53 --- /dev/null +++ b/docs/designs/function-based-sdk/01_application.py @@ -0,0 +1,147 @@ +"""An application as a function. (POC, does not run.) + +The function is the workflow: +- `Parameters`, `Inputs`, `Outputs` are plain models, passed to the decorator. + They compile to the revision's JSON schemas, declared not inferred. +- The decorated function is the handler. It is auto-instrumented: one root + span per invocation with inputs, outputs, cost, and token capture. +- `@handler.stream` is optional. Declare a second function only if the app + can stream. +- `slug`, `name`, `description` map to the workflow identity on the platform. + +`@ag.application(...)` returns a *handle*, not a bare function: it is callable +(local typed invocation) and also carries `.pin`, `.invoke`, `.inspect`, +`.push`, `.router`, `.from_registry`, `.fetch_parameters`. The handle is the +functional equivalent of a class instance. +""" + +import asyncio +from typing import AsyncIterator + +from openai import AsyncOpenAI +from pydantic import BaseModel, Field + +import agenta as ag + +# Request-independent resources live at module scope. They are the functional +# equivalent of the class's `__init__` (clients, retrievers, indexes). They are +# built once, not per request. Parameters arrive per request, not here. +client = AsyncOpenAI() + + +class Parameters(BaseModel): + """Playground-editable configuration. 
Becomes schemas.parameters.""" + + prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ + ag.Message( + role="system", + content="You are the concierge of {{hotel_name}}. Be brief.", + ), + ag.Message(role="user", content="{{message}}"), + ], + llm_config=ag.ModelConfig(model="gpt-4o-mini", temperature=0.2), + ) + hotel_name: str = "Grand Agenta Hotel" + top_k: int = Field(4, ge=1, le=20, description="Documents to retrieve") + + +class Inputs(BaseModel): + """Runtime inputs. Becomes schemas.inputs and the expected testset columns.""" + + message: str + persona: str = "guest" + + +class Outputs(BaseModel): + """Becomes schemas.outputs. This is what evaluators receive as `outputs`.""" + + answer: str + sources: list[str] = [] + + +# Nested steps use @ag.instrument as today. They show up as child spans under +# the invocation span. Defined before the handler so it can call them. +@ag.instrument(type="retriever") +async def retrieve(*, query: str, top_k: int) -> list[dict]: + return [{"id": "faq-12", "text": "The pool is open 7am to 10pm."}] + + +@ag.application( + slug="hotel-agent", + name="Hotel Agent", + description="Concierge agent that answers questions about the hotel.", + parameters=Parameters, # schemas declared explicitly, not inferred + inputs=Inputs, + outputs=Outputs, +) +async def hotel_agent(*, inputs: Inputs, parameters: Parameters) -> Outputs: + documents = await retrieve(query=inputs.message, top_k=parameters.top_k) + + prompt = parameters.prompt.format( + hotel_name=parameters.hotel_name, + message=inputs.message, + ) + response = await client.chat.completions.create(**prompt.to_openai_kwargs()) + + return Outputs( + answer=response.choices[0].message.content, + sources=[doc["id"] for doc in documents], + ) + + +# Optional. When declared, the server can negotiate streaming responses +# (SSE / NDJSON) from the Accept header. Batch callers still get the handler. 
+# No introspection guessing whether a `stream` method exists: the handle simply +# does, or does not, have a registered streamer. +@hotel_agent.stream +async def hotel_agent_stream( + *, inputs: Inputs, parameters: Parameters +) -> AsyncIterator[str]: + prompt = parameters.prompt.format( + hotel_name=parameters.hotel_name, + message=inputs.message, + ) + stream = await client.chat.completions.create( + stream=True, **prompt.to_openai_kwargs() + ) + async for chunk in stream: + yield chunk.choices[0].delta.content or "" + + +async def main(): + ag.init() + + # Local typed call. Traced like any decorated handler today. + result = await hotel_agent(message="Do you have a pool?") + print(result.answer, result.sources) + + # Pin a configuration. `.pin()` is a functools.partial over parameters: + # it returns a new handle with these values baked in as defaults. They + # become revision.data.parameters when the handle is pushed. This is the + # "constructor builder with partials" — no instance, no __init__. + boutique = hotel_agent.pin(hotel_name="Hotel California", top_k=8) + result = await boutique(message="Can I check out any time I like?") + + # Full pipeline (vault -> resolver -> normalizer), same code path the + # platform runs. Plain kwargs; the SDK builds the wire request internally. + # WorkflowInvokeRequest stays the HTTP body format for /invoke; local + # callers never construct it. + _response = await hotel_agent.invoke( + inputs={"message": "Is breakfast included?"}, + parameters={"top_k": 2}, + ) + + # Inspect returns the interface. Schemas come straight from the models you + # passed to the decorator, no signature inference. + interface = await hotel_agent.inspect() + print(interface.data.schemas.outputs) + + # Push to Agenta: creates or updates the application and returns the new + # revision. Schemas and default parameters travel with it. 
+ revision_id = await hotel_agent.push() + print(f"pushed revision {revision_id}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/02_evaluators.py b/docs/designs/function-based-sdk/02_evaluators.py new file mode 100644 index 0000000000..f68a95edd2 --- /dev/null +++ b/docs/designs/function-based-sdk/02_evaluators.py @@ -0,0 +1,139 @@ +"""Evaluators as functions. (POC, does not run.) + +An evaluator is a workflow with `is_evaluator=True`, so the authoring model is +identical to applications. The differences: +- you decorate with `@ag.evaluator(...)` and your function returns metrics +- `inputs` are the testcase columns the evaluator consumes (permissive by + default, since batch mode passes every column) +- the function also receives `outputs` (the application output under + evaluation) and `trace` (the full execution trace) + +`parameters=` are the evaluator settings users edit in the UI. `outputs=` is +the declared metrics schema, which the UI uses to render score columns. + +Note how the trivial case has no boilerplate: no class body, no inner models +you do not need. The smallest evaluator is one decorated function. +""" + +import asyncio + +from openai import AsyncOpenAI +from pydantic import BaseModel, ConfigDict, Field + +import agenta as ag + +client = AsyncOpenAI() + + +# The smallest possible evaluator: no settings, one metric. A class proposal +# spends a class statement, a docstring, and an inner Outputs model on this. +# Here it is the function plus its return model. 
# Metrics schema for the smallest evaluator: a numeric score plus a pass flag.
class StartsCapitalizedOut(BaseModel):
    score: float
    success: bool


@ag.evaluator(
    slug="starts-capitalized", name="Starts Capitalized", outputs=StartsCapitalizedOut
)
async def starts_capitalized(*, outputs, **_) -> StartsCapitalizedOut:
    """Check that the answer under evaluation starts with an uppercase letter.

    Args:
        outputs: The application output. A dict is read through its
            ``"answer"`` key, an object exposing ``answer`` through that
            attribute; anything else is stringified as a whole.
        **_: Remaining evaluator arguments, unused here.

    Returns:
        ``score=1.0`` / ``success=True`` when the first character is
        uppercase; ``score=0.0`` / ``success=False`` otherwise, including
        for an empty or missing answer.
    """
    if isinstance(outputs, dict):
        # A missing key counts as a failed check instead of raising KeyError
        # in the middle of an evaluation run.
        answer = outputs.get("answer")
    else:
        # Prefer the `answer` attribute when there is one: stringifying the
        # whole object would test its repr rather than the answer text.
        answer = getattr(outputs, "answer", outputs)
    text = "" if answer is None else str(answer)
    ok = bool(text) and text[0].isupper()
    return StartsCapitalizedOut(score=1.0 if ok else 0.0, success=ok)


# LLM-as-a-judge with its own typed settings. The settings render in the
# evaluator config UI from schemas.parameters, exactly like builtin evaluator
# settings do today.
class RubricParams(BaseModel):
    judge_model: str = "gpt-4o-mini"
    rubric: str = Field(
        "The answer is factual, polite, and under 100 words.",
        json_schema_extra={"x-ag-type": "text"},
    )


class RubricInputs(BaseModel):
    # Evaluators receive all testcase columns. Declare the ones you use,
    # allow the rest.
    model_config = ConfigDict(extra="allow")
    expected_answer: str | None = None


class RubricOut(BaseModel):
    score: float = Field(ge=0.0, le=1.0)
    verdict: str


@ag.evaluator(
    slug="rubric-judge",
    name="Rubric Judge",
    description="Grades the answer against a rubric with an LLM.",
    parameters=RubricParams,
    inputs=RubricInputs,
    outputs=RubricOut,
)
async def rubric_judge(
    *, inputs: RubricInputs, outputs, parameters: RubricParams, **_
) -> RubricOut:
    """Ask the judge model to grade ``outputs`` against the configured rubric.

    Args:
        inputs: Testcase columns; not consulted by this implementation.
        outputs: The application output under evaluation, sent to the judge
            as ``str(outputs)``.
        parameters: Judge settings (model name and rubric text).
        **_: Remaining evaluator arguments, unused here.

    Returns:
        The judge's reply as ``verdict`` together with a score.
    """
    response = await client.chat.completions.create(
        model=parameters.judge_model,
        messages=[
            {"role": "system", "content": f"Grade against: {parameters.rubric}"},
            {"role": "user", "content": str(outputs)},
        ],
    )
    # `content` may be None (e.g. a refusal or tool-call reply); RubricOut
    # requires a string, so fall back to an empty verdict instead of failing
    # validation.
    verdict = response.choices[0].message.content or ""
    # NOTE(review): the score is a fixed placeholder in this POC; it is not
    # derived from the judge's reply.
    return RubricOut(score=0.9, verdict=verdict)


# A trace-based evaluator. `trace` exposes the full invocation: spans, costs,
# tokens, tool calls. Same data the custom code evaluator gets today, but with
# typed access instead of a raw dict.
+class BudgetParams(BaseModel): + max_cost_usd: float = 0.01 + + +class BudgetOut(BaseModel): + success: bool + cost_usd: float + + +@ag.evaluator( + slug="stays-under-budget", + name="Stays Under Budget", + parameters=BudgetParams, + outputs=BudgetOut, +) +async def stays_under_budget( + *, trace: ag.Trace, parameters: BudgetParams, **_ +) -> BudgetOut: + cost = trace.metrics.costs.cumulative + return BudgetOut(success=cost <= parameters.max_cost_usd, cost_usd=cost) + + +async def main(): + ag.init() + + # Local typed call against a single output. Useful for unit tests. `.pin()` + # bakes in settings, same partial mechanism as applications. + judge = rubric_judge.pin(judge_model="gpt-4.1") + result = await judge( + outputs={"answer": "The pool is open 7am to 10pm."}, + expected_answer="Pool hours are 7-22.", + ) + print(result.score, result.verdict) + + # Builtins are factory functions: typed configurators over the existing + # agenta:builtin:* handlers, instead of settings dicts. They return handles + # too, so they slot into aevaluate next to your own. + _exact = ag.evaluators.exact_match(correct_answer_key="expected_answer") + _similarity = ag.evaluators.semantic_similarity(threshold=0.8) + + # Push registers the evaluator on the platform with its compiled schemas. + # It then shows up in the UI next to the builtins. + await rubric_judge.push() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/03_run_evals.py b/docs/designs/function-based-sdk/03_run_evals.py new file mode 100644 index 0000000000..7712eaf2e3 --- /dev/null +++ b/docs/designs/function-based-sdk/03_run_evals.py @@ -0,0 +1,71 @@ +"""Running an evaluation with function workflows. (POC, does not run.) + +`ag.aevaluate` accepts handles, pinned handles, and platform slugs +interchangeably. A bare handle means "use the default parameters". A pinned +handle (`.pin(...)`) means "use these baked-in parameters". A slug means "use +what is deployed on Agenta". 
+ +This is identical to the class proposal, with one simplification: there is no +"class vs instance" distinction. There is one kind of thing — a handle — and +`.pin()` produces another handle. Default and pinned are the same type. +""" + +import asyncio + +import agenta as ag + +from application import hotel_agent # 01_application.py +from evaluators import ( # 02_evaluators.py + rubric_judge, + starts_capitalized, + stays_under_budget, +) + + +async def main(): + ag.init() + + result = await ag.aevaluate( + name="hotel-agent-regression", + # A testset slug from Agenta, or inline testcases. Columns map onto + # the application's Inputs; extra columns (like expected_answer) flow + # to the evaluators' inputs. + testset="hotel-faq-v2", + application=hotel_agent, + evaluators=[ + starts_capitalized, + rubric_judge.pin(judge_model="gpt-4.1"), + stays_under_budget.pin(max_cost_usd=0.005), + ag.evaluators.exact_match(correct_answer_key="expected_answer"), + ], + ) + + print(result.url) # link to the evaluation in the Agenta UI + for scenario in result.scenarios: + print(scenario.inputs, scenario.outputs, scenario.metrics) + + # Compare two configurations of the same application in one run. Both are + # handles; the second is just a partial of the first. + await ag.aevaluate( + name="prompt-shootout", + testset="hotel-faq-v2", + applications=[ + hotel_agent, # current defaults + hotel_agent.pin(top_k=8), # candidate config + ], + evaluators=[rubric_judge], + ) + + # Evaluate inline testcases without a stored testset. Handy in CI. 
+ await ag.aevaluate( + application=hotel_agent, + testcases=[ + {"message": "Do you have a pool?", "expected_answer": "Yes, 7am-10pm."}, + {"message": "Is parking free?", "expected_answer": "Yes, for guests."}, + ], + evaluators=[starts_capitalized], + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/04_config_registry.py b/docs/designs/function-based-sdk/04_config_registry.py new file mode 100644 index 0000000000..405fa35f6d --- /dev/null +++ b/docs/designs/function-based-sdk/04_config_registry.py @@ -0,0 +1,99 @@ +"""Pulling configurations from Agenta and referencing them. (POC, does not run.) + +Three patterns, identical to the class proposal: +1. Fetch deployed parameters as a typed object. +2. Bind a handle to an environment, so traces link back to the deployed + revision automatically. +3. Reference a managed config from inside another application's Parameters. + The resolver middleware fetches the referenced revision at invoke time. + +The only difference from the class version: "bind to environment" is the same +`.pin`-family operation as everything else — `.from_registry()` returns a +handle pinned to deployed parameters, instead of a freshly constructed +instance. One mechanism, not two. +""" + +import asyncio + +from pydantic import BaseModel + +import agenta as ag + +from application import Parameters as HotelParameters # 01_application.py +from application import hotel_agent # 01_application.py + + +async def main(): + ag.init() + + # --- 1. Fetch deployed parameters, typed and validated ----------------- + # Replaces ConfigManager.get_from_registry + manual dict access. The + # return value is a HotelParameters instance, so typos and schema drift + # fail loudly here instead of deep inside the app. 
+ params = await hotel_agent.fetch_parameters(environment="production") + print(params.prompt.messages[0].content) + print(params.hotel_name, params.top_k) + + # The Parameters model is just a module-level type. Referencing it as a + # type needs no `Class.Parameters` namespacing — you import it. + assert isinstance(params, HotelParameters) + + # Specific variant or revision instead of an environment: + _candidate = await hotel_agent.fetch_parameters(variant="experiment-1") + + # --- 2. Bind a handle to a deployed revision --------------------------- + # Parameters are pulled from the registry and baked into the returned + # handle (a partial, like .pin). Every trace it produces carries + # application/variant/environment references. No manual + # ag.tracing.store_refs calls. + agent = await hotel_agent.from_registry(environment="production") + _result = await agent(message="Do you have a pool?") + + # The invocation link (trace_id + span_id) is available for feedback and + # annotation flows, same as build_invocation_link today. + print(agent.last_invocation.trace_id) + + # --- 3. Reference managed configs from another application ------------- + class RouterParams(BaseModel): + # An inline default, editable in the playground. + routing_prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ag.Message(role="system", content="Route the request.")], + ) + # A reference instead of a value. The resolver middleware fetches the + # deployed revision of hotel-agent's prompt at invoke time, so this app + # always follows what is live in production. The playground renders a + # revision picker for it. 
+ concierge_prompt: ag.PromptTemplate = ag.Reference( + application="hotel-agent", + environment="production", + key="prompt", + ) + + class RouterInputs(BaseModel): + message: str + + class RouterOutputs(BaseModel): + answer: str + route: str + + @ag.application( + slug="router-agent", + parameters=RouterParams, + inputs=RouterInputs, + outputs=RouterOutputs, + ) + async def router_agent( + *, inputs: RouterInputs, parameters: RouterParams + ) -> RouterOutputs: + # parameters.concierge_prompt is already resolved here. + ... + + # --- Lifecycle: push and deploy ----------------------------------------- + # push() commits a new revision with the schemas and defaults compiled from + # the decorator. deploy() promotes it to an environment. + revision_id = await hotel_agent.push() + await hotel_agent.deploy(revision=revision_id, environment="staging") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/05_serve.py b/docs/designs/function-based-sdk/05_serve.py new file mode 100644 index 0000000000..365fce5e19 --- /dev/null +++ b/docs/designs/function-based-sdk/05_serve.py @@ -0,0 +1,79 @@ +"""Serving function workflows over HTTP. (POC, does not run.) + +Workflows plug into FastAPI the FastAPI way: each handle exposes a standard +APIRouter via `.router()`, and you mount it with app.include_router like any +other router. No custom registration call, no hidden sub-app mounting. + +router() carries two endpoints: +- POST /invoke Typed from the handle: the request body model is generated + from inputs + parameters, the response model from outputs. + So /docs shows the real schemas, and clients get validation + errors from FastAPI like on any other endpoint. Streams + (SSE/NDJSON) when the Accept header asks for it and the + handle has a registered streamer. +- POST /inspect Returns the interface: schemas, parameters, identity. + +The playground talks to these endpoints to render the config form and run the +app. 
This file is line-for-line equivalent to the class version, except a +"configured instance" is a pinned handle, not a constructed object. +""" + +import agenta as ag +import uvicorn +from fastapi import Depends, FastAPI + +from application import hotel_agent # 01_application.py +from evaluators import rubric_judge # 02_evaluators.py + +ag.init() + +app = FastAPI() + + +# Plain FastAPI composition. +app.include_router(hotel_agent.router(), prefix="/hotel", tags=["hotel"]) + +# Evaluators are workflows too, so a custom evaluator can run as its own +# service and be registered on the platform by URL. +app.include_router(rubric_judge.router(), prefix="/evaluators/rubric-judge") + + +# Everything FastAPI gives you keeps working: dependencies, auth, middleware, +# versioned prefixes. The router does not care where it is mounted. +def verify_internal_token(): # noqa: D103 + ... + + +app.include_router( + hotel_agent.router(), + prefix="/internal/hotel", + dependencies=[Depends(verify_internal_token)], +) + +# A pinned handle serves with its baked-in parameters as defaults. +boutique = hotel_agent.pin(hotel_name="Hotel California") +app.include_router(boutique.router(), prefix="/boutique") + + +# Your own routes live next to them, as usual. +@app.get("/healthz") +def healthz(): + return {"ok": True} + + +# No FastAPI app of your own? 
A handle is directly servable as an ASGI app: +# +# asgi = hotel_agent.asgi() # then: uvicorn 05_serve:asgi +# +# or from the CLI, without writing a server file at all: +# +# agenta serve application:hotel_agent --port 8000 + + +if __name__ == "__main__": + # Resulting endpoints: + # POST /hotel/invoke POST /hotel/inspect + # POST /evaluators/rubric-judge/invoke POST /evaluators/rubric-judge/inspect + # POST /internal/hotel/invoke (token-protected) + # POST /boutique/invoke (pinned parameters) + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/docs/designs/function-based-sdk/06_framework_adapters.py b/docs/designs/function-based-sdk/06_framework_adapters.py new file mode 100644 index 0000000000..d9927c7d09 --- /dev/null +++ b/docs/designs/function-based-sdk/06_framework_adapters.py @@ -0,0 +1,240 @@ +"""Using agent frameworks inside Agenta workflows. (POC, does not run.) + +The question: if the hotel agent is built with OpenAI Agents SDK, Pydantic AI, +or LangGraph, do you have to restate everything? No. Three tiers, from full +control to one line: + +1. MANUAL - a plain @ag.application function that builds the framework agent + inside it. You decide what is configurable. Most code, no magic. +2. FACTORY - decorate a `build(parameters)` function with the framework's + application decorator. You declare Parameters; the decorator + handles running, streaming, output schemas, and instrumentation. +3. AUTOMATIC - ag.from_agent(agent). An adapter introspects the agent object + and generates Parameters/Inputs/Outputs for you. + +All three tiers compile to the same workflow revision. The common contract +underneath is the AgentAdapter port at the bottom — unchanged from the class +proposal, because it was already function/protocol-shaped there. + +The factory tier is where functions read *better* than classes: the class +proposal hides run()/stream() inside a base class you inherit. Here, `build` +is plainly a function the decorator drives. 
Nothing is hidden in a superclass. +""" + +import asyncio +from typing import Any, AsyncIterator, Protocol, TypeVar + +from agents import Agent as OpenAIAgent # noqa: E402 (openai-agents SDK, not installed) +from agents import function_tool # noqa: E402 +from langgraph.graph import StateGraph # noqa: E402 +from pydantic import BaseModel +from pydantic_ai import Agent as PydanticAgent # noqa: E402 + +import agenta as ag + + +# ========================================================================= +# Tier 1: MANUAL. Plain @ag.application, Pydantic AI inside the function. +# +# The framework agent is rebuilt per call from parameters. That is the price +# of playground-editable config: the source of truth lives in Agenta, not in +# the agent constructor. Building an Agent object is cheap in every framework. +# ========================================================================= + + +class ManualParams(BaseModel): + model: str = "openai:gpt-4o-mini" + instructions: str = "You are the concierge of {{hotel_name}}. Be brief." + hotel_name: str = "Grand Agenta Hotel" + + +class ManualInputs(BaseModel): + message: str + + +class ManualOutputs(BaseModel): + answer: str + + +@ag.application( + slug="hotel-agent-pydantic-ai", + name="Hotel Agent (Pydantic AI, manual)", + parameters=ManualParams, + inputs=ManualInputs, + outputs=ManualOutputs, +) +async def hotel_agent_manual( + *, inputs: ManualInputs, parameters: ManualParams +) -> ManualOutputs: + agent = PydanticAgent( + parameters.model, + system_prompt=ag.render( + parameters.instructions, hotel_name=parameters.hotel_name + ), + ) + result = await agent.run(inputs.message) + return ManualOutputs(answer=result.output) + + +# ========================================================================= +# Tier 2: FACTORY (the port). You write build(), the decorator does the rest. +# +# Use this when the automatic tier does not expose enough: conditional tools, +# nested config, multi-agent wiring. 
# There is no run() to write. The framework
# decorator builds run()/stream() around whatever build() returns (Runner.run
# for openai-agents, agent.run for pydantic-ai, graph.ainvoke for langgraph),
# and derives Outputs from the agent's output_type when you do not declare one.
# =========================================================================


# Tool stub: room search by date and party size (body elided in this POC).
@function_tool
def search_rooms(date: str, guests: int) -> list[dict]:
    ...


# Tool stub: booking of one room for one date (body elided in this POC).
@function_tool
def book_room(room_id: str, date: str) -> dict:
    ...


# Structured answer type handed to the agent as its output_type.
class BookingAnswer(BaseModel):
    answer: str
    room_id: str | None = None


# Configuration consumed by build_hotel_agent.
class FactoryParams(BaseModel):
    model: str = "gpt-4o-mini"
    instructions: str = "You are the hotel concierge. Use tools to answer."
    booking_enabled: bool = True


# Outputs is derived from build()'s output_type (BookingAnswer) unless an
# outputs= model is passed to the decorator.
@ag.ext.openai_agents.application(
    slug="hotel-agent-openai",
    name="Hotel Agent (OpenAI Agents, factory)",
    parameters=FactoryParams,
    inputs=ManualInputs,
)
def build_hotel_agent(parameters: FactoryParams) -> OpenAIAgent:
    """Assemble the concierge agent from ``parameters``.

    The search tool is always attached; the booking tool is added only when
    ``parameters.booking_enabled`` is true.
    """
    enabled_tools = [search_rooms]
    if parameters.booking_enabled:
        enabled_tools.append(book_room)

    return OpenAIAgent(
        name="hotel-concierge",
        model=parameters.model,
        instructions=parameters.instructions,
        tools=enabled_tools,
        output_type=BookingAnswer,
    )


# =========================================================================
# Tier 3: AUTOMATIC (the adapter). Wrap an existing agent, it becomes one of
# us. The adapter is picked by agent type from a registry.
+# ========================================================================= + +concierge = OpenAIAgent( + name="concierge", + model="gpt-4o-mini", + instructions="You are the hotel concierge.", + tools=[search_rooms], + output_type=BookingAnswer, +) + +hotel_agent_auto = ag.from_agent(concierge, slug="hotel-agent-auto") +# What the openai-agents adapter extracted: +# parameters <- model, instructions, model_settings (playground-editable) +# inputs <- {message: str} (the run input) +# outputs <- BookingAnswer schema (from output_type) +# handoffs/tools -> listed read-only in metadata, visible in the UI +# Applying edited parameters uses agent.clone(model=..., instructions=...) per +# invocation. The original object is never mutated. + + +# LangGraph maps the cleanest, because graphs already separate the three +# concerns the schema model needs: +# parameters <- config_schema (the "configurable" section) +# inputs <- input state schema +# outputs <- output state schema +# Binding needs no rebuild: graph.ainvoke(inputs, config={"configurable": ...}). + + +class GraphState(BaseModel): + message: str + answer: str | None = None + + +class GraphConfig(BaseModel): + model: str = "gpt-4o-mini" + system_prompt: str = "You are the hotel concierge." + + +builder = StateGraph(GraphState, config_schema=GraphConfig) +# ... add nodes and edges ... +graph = builder.compile() + +hotel_graph = ag.from_agent(graph, slug="hotel-agent-langgraph") + + +async def main(): + ag.init() + + # All four handles behave identically from here on. 
+ for handle in ( + hotel_agent_manual, + build_hotel_agent, + hotel_agent_auto, + hotel_graph, + ): + await handle.push() + + agent = hotel_agent_auto.pin(model="gpt-4.1") + _result = await agent(message="Do you have a pool?") + + await ag.aevaluate( + testset="hotel-faq-v2", + applications=[hotel_agent_manual, hotel_agent_auto], + evaluators=["rubric-judge"], + ) + + +# ========================================================================= +# The port: one adapter per framework, registered by agent type. This is what +# tiers 2 and 3 are built on. Third-party frameworks can ship their own adapter +# and register it via entry point. Unchanged from the class proposal — it was +# already a Protocol of functions there, and there is nothing to make +# functional. +# ========================================================================= + +AgentT = TypeVar("AgentT") + + +class AgentAdapter(Protocol[AgentT]): + def parameters(self, agent: AgentT) -> tuple[type[BaseModel], dict]: + """The editable config model, and current values read off the agent. + These become schemas.parameters and revision.data.parameters.""" + + def io(self, agent: AgentT) -> tuple[type[BaseModel], type[BaseModel]]: + """Inputs and Outputs models. These become schemas.inputs/outputs.""" + + def bind(self, agent: AgentT, parameters: BaseModel) -> AgentT: + """Apply parameters without mutating the original: clone() for + openai-agents, rebuild for pydantic-ai, configurable for langgraph.""" + + async def invoke(self, agent: AgentT, inputs: BaseModel) -> Any: + """Run once. The result is normalized against Outputs.""" + + def stream(self, agent: AgentT, inputs: BaseModel) -> AsyncIterator[Any]: + """Optional. 
When the adapter implements it, the wrapped app streams.""" + + +# ag.adapters.register(OpenAIAgent, OpenAIAgentsAdapter()) +# ag.adapters.register(PydanticAgent, PydanticAIAdapter()) +# ag.adapters.register(CompiledStateGraph, LangGraphAdapter()) + +# Honest limits of the automatic tier: the adapter can only expose what the +# agent object carries. Model, instructions, and model settings come for free. +# Prompts buried inside tools, retrievers, or graph nodes are invisible to +# introspection. When you need those in the playground, drop to the factory +# tier and lift them into Parameters yourself. + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/07_config_only.py b/docs/designs/function-based-sdk/07_config_only.py new file mode 100644 index 0000000000..5dd27df2ff --- /dev/null +++ b/docs/designs/function-based-sdk/07_config_only.py @@ -0,0 +1,102 @@ +"""Configuration-only workflows: prompt management and beyond. (POC, does not run.) + +Two answers to "I want the config, not the runnable": + +1. Any pushed handle already works that way. A pushed workflow registers its + schemas and parameters on the platform; the handler only executes where your + code runs. If you never serve it, the platform side is exactly a managed, + versioned configuration. Just call fetch_parameters() and never invoke. + +2. ag.configuration() makes that intent explicit. It takes Parameters and + nothing else: no inputs, no outputs, no handler. In the data model it is a + WorkflowRevision with schemas.parameters and parameters, and no handler URI. + The UI renders a config form and version history, no run button. There is no + invoke()/serve() on the returned handle. + +This is where dropping the class is the clearest win: the class version is an +empty class body with three methods conspicuously *not* implemented. Here it is +a single call with one argument — there is no runnable, so there is nothing to +leave blank. 
+ +This generalizes prompt management to arbitrary typed config: prompts, rubrics, +routing tables, guardrail lists, feature flags for the LLM layer. +""" + +import asyncio + +from pydantic import BaseModel, Field + +import agenta as ag + + +class ConciergeConfigParams(BaseModel): + prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ + ag.Message( + role="system", content="You are the concierge of {{hotel_name}}." + ), + ], + llm_config=ag.ModelConfig(model="gpt-4o-mini"), + ) + escalation_keywords: list[str] = ["lawyer", "refund", "manager"] + model_by_tier: dict[str, str] = Field( + default={"free": "gpt-4o-mini", "pro": "gpt-4.1"}, + description="Routing table, editable without a deploy.", + ) + + +concierge_config = ag.configuration( + slug="concierge-config", + name="Concierge Configuration", + description="Everything the support flow needs, versioned and deployable.", + parameters=ConciergeConfigParams, +) + + +async def main(): + ag.init() + + # Same lifecycle as any workflow: commit a revision, promote it. + await concierge_config.push() + await concierge_config.deploy(environment="production") + + # The consuming code does not need to be an Agenta workflow at all. This + # can run inside any service, cron job, or notebook. + config = await concierge_config.fetch(environment="production") + print(config.escalation_keywords) + _model = config.model_by_tier["pro"] + _prompt = config.prompt.format(hotel_name="Grand Agenta Hotel") + + # Sync variant for non-async codebases, cached with a TTL so it is safe to + # call per request. + config = concierge_config.fetch_sync(environment="production") + + # And it composes with everything else in this POC: an application can + # reference it instead of duplicating the values (see 04_config_registry). 
+ class SupportParams(BaseModel): + concierge: ConciergeConfigParams = ag.Reference( + configuration="concierge-config", + environment="production", + ) + + class SupportInputs(BaseModel): + message: str + + class SupportOutputs(BaseModel): + answer: str + + @ag.application( + slug="support-agent", + parameters=SupportParams, + inputs=SupportInputs, + outputs=SupportOutputs, + ) + async def support_agent( + *, inputs: SupportInputs, parameters: SupportParams + ) -> SupportOutputs: + # parameters.concierge is resolved and typed. + ... + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/08_testsets.py b/docs/designs/function-based-sdk/08_testsets.py new file mode 100644 index 0000000000..774c423673 --- /dev/null +++ b/docs/designs/function-based-sdk/08_testsets.py @@ -0,0 +1,105 @@ +"""Testsets as functions. (POC, does not run.) + +A testset declares its columns as a Pydantic model, passed to `ag.testset(...)`. +That buys exactly what the class version buys: +- the platform validates rows against the schema, on upload and on edit +- typed iteration in code, no dict["colunm_typo"] surprises +- fail-fast compatibility checks: before an evaluation runs anything, the + testset columns are checked against the application's inputs and every + evaluator's inputs + +The schema compiles into the same JSON Schema machinery as everything else in +this POC, so the testset UI can render typed cells (numbers, enums, nested +JSON) instead of treating every column as text. + +`ag.testset()` returns a handle with `.push`, `.fetch`, `.add`, `.from_traces` +— the same handle shape as applications and evaluators. The row model is a +module-level type you reference directly, so there is no `Class.Case` +namespacing to reach for. 
+""" + +import asyncio +from typing import Optional + +from pydantic import BaseModel, Field + +import agenta as ag + +from application import hotel_agent # 01_application.py +from evaluators import rubric_judge # 02_evaluators.py + + +class HotelFAQCase(BaseModel): + """One row. Becomes the testset's column schema.""" + + message: str + persona: str = "guest" + expected_answer: str + difficulty: Optional[int] = Field(1, ge=1, le=3) + + +hotel_faq = ag.testset( + slug="hotel-faq", + name="Hotel FAQ", + case=HotelFAQCase, + # Optional seed data, committed on first push. + cases=[ + HotelFAQCase( + message="Do you have a pool?", + expected_answer="Yes, open 7am to 10pm.", + ), + HotelFAQCase( + message="Can I check in at 6am?", + expected_answer="Early check-in depends on availability.", + difficulty=2, + ), + ], +) + + +async def main(): + ag.init() + + await hotel_faq.push() + + # Typed access. Editors keep working in the UI; this pulls their edits. + testset = await hotel_faq.fetch() + for case in testset: + print(case.message, case.difficulty) + + # Append programmatically, validated against HotelFAQCase. + await hotel_faq.add( + cases=[HotelFAQCase(message="Is parking free?", expected_answer="Yes.")] + ) + + # Curate from production traces: map spans onto typed cases. + await hotel_faq.from_traces( + filter=ag.TraceFilter( + application="hotel-agent", annotations={"thumbs": "down"} + ), + map=lambda span: HotelFAQCase( + message=span.inputs["message"], + expected_answer=span.annotations["corrected_answer"], + ), + ) + + # The payoff: compatibility is checked before anything runs. + # HotelFAQCase covers hotel_agent's inputs (message, persona)? yes + # HotelFAQCase covers rubric_judge's inputs (expected_answer)? yes + # A missing or mistyped column raises here, not after 200 LLM calls. 
+ await ag.aevaluate( + testset=hotel_faq, + application=hotel_agent, + evaluators=[rubric_judge], + ) + + # Mismatches fail loudly and early: + # + # await ag.aevaluate(testset=hotel_faq, application=flight_agent, ...) + # IncompatibleTestsetError: flight_agent inputs require column + # 'origin' (str); testset 'hotel-faq' has columns + # [message, persona, expected_answer, difficulty] + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/function-based-sdk/README.md b/docs/designs/function-based-sdk/README.md new file mode 100644 index 0000000000..c03a947036 --- /dev/null +++ b/docs/designs/function-based-sdk/README.md @@ -0,0 +1,117 @@ +# Function-based SDK (POC) + +**Status: design proposal. None of this code runs.** This is the functional +counter-proposal to `../class-based-sdk/`. It exposes the *same* surface +(typed `Parameters`/`Inputs`/`Outputs`, lifecycle, serving, framework adapters, +config registry, testsets) without subclassing anything. + +> One of three peer demos under `docs/designs/`, with aligned filenames +> (`00_*` foundation + `01_application.py` … `08_testsets.py`) so they diff 1:1: +> +> | folder | `00_*` foundation | `01`–`08` | +> |---|---|---| +> | [class-based-sdk/](../class-based-sdk/) | `00_core.py` — bases as native classes | class API | +> | [function-based-sdk/](.) (this one) | `00_core.py` — no base classes; decorators + closures | function API | +> | [functional-based-class-sdk/](../functional-based-class-sdk/) | `00_core.py` — the same bases on the functional core | class API as sugar | + +Read `../class-based-sdk/` first. This folder answers one question its author +raised: **does the class earn its keep, or is it style?** The claim here is +style. 
The peer folder [`../functional-based-class-sdk/`](../functional-based-class-sdk/) +proves it by rebuilding the entire class API as a thin shim over this functional +core — one file per base class, each diffable against its functional sibling +here and its class-based original in `../class-based-sdk/`. The class becomes +optional sugar, not a second +system. + +## The idea + +The class-based proposal makes the *class* the workflow. This one keeps the +workflow a **function** — which is what it already is in today's SDK — and moves +the three things the class actually added onto the decorator and its return +value: + +| class-based | function-based | +|--------------------------------------|---------------------------------------------------| +| `class HotelAgent(ag.Application)` | `@ag.application(...)` over a function | +| inner `class Parameters(BaseModel)` | a module-level `Parameters` model, passed in | +| inner `class Inputs` / `class Outputs`| module-level models, passed in | +| `run()` method | the decorated function itself | +| `stream()` method | a second function, `@handler.stream` | +| `__init__` for clients/retrievers | module singletons, or a `setup=` hook | +| `HotelAgent(parameters={...})` | `handler.pin(**overrides)` — a `functools.partial` | +| `HotelAgent.apush()` / `.router()` | `handler.push()` / `handler.router()` | +| `HotelAgent.Parameters` as a type | the `Parameters` model you already named | + +The decorator does **not** return a bare function. It returns a **handle**: a +callable object that also carries `.pin()`, `.push()`, `.router()`, +`.inspect()`, `.from_registry()`, `.fetch_parameters()`. That handle is the +functional equivalent of the class instance. `.pin()` is the constructor +builder — a partial over `parameters` — which is exactly the "constructor +builders with partials" intuition that motivated this folder. 
+ +## Why functions, concretely + +- **One authoring model, not two.** The class proposal keeps decorators "as the + quick path" *alongside* classes — two ways to author one engine. Here the + decorator scales from "wrap my existing function" to "declare full typed + schemas + lifecycle" by adding keyword arguments. No inheritance, no `self`, + no "did they define a `stream` method?" introspection. +- **Schemas are declared, not inferred.** This is the real upgrade in the class + proposal, and it has nothing to do with classes. Passing `parameters=`, + `inputs=`, `outputs=` to the decorator gives the same explicit, validated + schemas — the win survives the demotion from class to function. +- **No empty shells.** A config-only workflow (`07`) is `ag.configuration(...)`, + not a class with three methods left unimplemented. A trivial evaluator (`02`) + is four lines, no class body. +- **Partials all the way down.** Pinning config, binding to an environment, and + serving a configured instance are all the same operation: a partial over + `parameters`. The class spells this three different ways (`__init__`, + `from_registry`, pinned-instance `router`); here it is one verb, `.pin()`. + +## Files, in reading order + +Files `01`–`08` are aligned 1:1 with the other two folders so they diff +directly. Each folder's `00_core.py` is its foundation; this folder's has no +base classes (the function form doesn't subclass), so it documents the +decorator + closure conventions instead. + +0. `00_core.py` — the function form's foundation. No base classes to define; + the foundation is the decorator plus closures: factories that keep + `Parameters`/`Inputs`/`Outputs` private (scoped, not leaked to the module), + selective exposure, and config-bound factories. The one part with no 1:1 + sibling in the other folders. +1. `01_application.py` — an application as a decorated function. 
Typed config, + inputs, outputs; instrumentation with child spans; optional streaming; + local calls; pinning; push. +2. `02_evaluators.py` — evaluators as functions: a heuristic check, an LLM + judge with its own settings, a trace-based cost check, and typed builtins. +3. `03_run_evals.py` — running an evaluation with function workflows and + pinned handles. +4. `04_config_registry.py` — fetching deployed parameters as typed objects, + binding a handle to an environment, referencing managed configs. +5. `05_serve.py` — serving with plain FastAPI: each handle exposes a standard + `APIRouter`, mounted with `app.include_router`. +6. `06_framework_adapters.py` — OpenAI Agents SDK, Pydantic AI, LangGraph. + Three tiers: manual (framework inside the function), factory (a `build` + function the decorator drives), automatic (`ag.from_agent(agent)`). +7. `07_config_only.py` — configuration-only workflows (`ag.configuration`): + Parameters, no runnable. +8. `08_testsets.py` — testsets as functions: typed columns, validated rows, + curation from traces, fail-fast compatibility checks. + +**The punchline lives in a peer folder:** +[`../functional-based-class-sdk/`](../functional-based-class-sdk/) rebuilds the +class-based API of `../class-based-sdk/` as a thin shim over this functional +core — one base, two front-ends (`WorkflowFunction` decorator path + +`ClassFrontEnd` subclass path), split one file per kind so each diffs against +its sibling here and its class original. Both proposals, one engine, the class +shown to be cosmetics. + +## What carries over from the current SDK + +- `@ag.instrument()` still wraps any function for child spans. The decorated + handler is instrumented automatically. +- The middleware chain (vault, resolver, normalizer) runs unchanged under + `.invoke()`. +- Today's plain `@ag.application` / `@ag.evaluator` decorators are the *same* + decorators, just taught to accept explicit schema models.
Existing decorated + functions keep working with inferred schemas. diff --git a/docs/designs/functional-based-class-sdk/00_core.py b/docs/designs/functional-based-class-sdk/00_core.py new file mode 100644 index 0000000000..f56224e480 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/00_core.py @@ -0,0 +1,362 @@ +"""The core: ONE Workflow base, TWO authoring front-ends. (POC, does not run.) + +This folder is demo #3 of three peers under docs/designs/: + + ../class-based-sdk/ #1 class-only (class API, native) + ../function-based-sdk/ #2 functional-only (function API, native) + ./ (functional-based-class) #3 class API rebuilt on the functional core + +The point of #3: the class API of #1 is *sugar*. There is one workflow base and +two ways to author it — a decorator (function) front-end and a subclass (class) +front-end. The class front-end does not register anything itself; it gathers the +same data a decorator would and forwards into the function front-end. So: + + class authoring -> function authoring -> one Workflow base + +If the class path built workflows itself, there would be two engines. It does +not, so there is one. That chain is the whole argument. + +Grounded in the real SDK (sdks/python/agenta/sdk/): +- `decorators/running.py` already has `class Workflow` wrapping a function + (`self._fn`) with `.invoke()`/`.inspect()`. The handle exists, and it already + wraps a function — the function is the substrate today. +- `models/workflows.py::WorkflowFlags` already has `is_application`, + `is_evaluator`, `has_handler`. A workflow's *kind is a flag*, not a type: + Application and Evaluator are the same Workflow with a different flag; + Configuration is a Workflow with has_handler=False. + +`Testset` is intentionally NOT a Workflow (no parameters, no handler, no schemas +triple). It gets its own base at the bottom of this file. 
+""" + +from __future__ import annotations + +from typing import Any, Callable, Optional + +# In the real SDK: agenta.sdk.decorators.running (the `workflow` handle) and +# agenta.sdk.models.workflows (WorkflowFlags). Shown as the contract we sit on. +from agenta import workflow as _workflow_decorator +from agenta.sdk.models.workflows import WorkflowFlags +from pydantic import BaseModel + + +# ========================================================================= +# THE ONE BASE. Both front-ends below produce an instance of this. It is the +# SDK's existing `Workflow` handle given the lifecycle surface the proposal +# needs (.pin/.push/.router/...). Nothing here is kind-specific: "application +# vs evaluator" is a flag, passed in. +# ========================================================================= + + +class Workflow: + def __init__( + self, + *, + handler: Optional[Callable[..., Any]], + slug: str, + name: Optional[str] = None, + description: Optional[str] = None, + flags: WorkflowFlags, + parameters: Optional[type[BaseModel]] = None, + inputs: Optional[type[BaseModel]] = None, + outputs: Optional[type[BaseModel]] = None, + streamer: Optional[Callable[..., Any]] = None, + pinned: Optional[dict] = None, + ): + # Compile the three models to JSON Schema and register via the existing + # `workflow` decorator. flags carries the kind, as data. 
+ self._decorator = _workflow_decorator( + slug=slug, + name=name, + description=description, + flags=flags.model_dump(), + schemas=_compile_schemas(parameters, inputs, outputs), + parameters=(pinned or {}), + ) + self._handle = self._decorator(handler) if handler else self._decorator + self._streamer = streamer + self._pinned = pinned or {} + self._spec = dict( + slug=slug, + name=name, + description=description, + flags=flags, + parameters=parameters, + inputs=inputs, + outputs=outputs, + ) + + async def __call__(self, **kw): + return await self._handle(**{**self._pinned, **kw}) + + async def invoke(self, **kw): + return await self._handle.invoke(request=kw) + + async def inspect(self): + return await self._handle.inspect() + + def pin(self, **overrides) -> "Workflow": + # functools.partial over parameters, expressed as a new handle. + return Workflow( + handler=self._handle._fn, + streamer=self._streamer, + pinned={**self._pinned, **overrides}, + **self._spec, + ) + + async def push(self) -> Any: ... + async def deploy(self, **k) -> Any: ... + def router(self, *a, **k): ... + async def fetch_parameters(self, **k): ... + async def from_registry(self, **k): ... + + +def _compile_schemas(parameters, inputs, outputs) -> dict: + return { + "parameters": parameters.model_json_schema() if parameters else None, + "inputs": inputs.model_json_schema() if inputs else None, + "outputs": outputs.model_json_schema() if outputs else None, + } + + +# ========================================================================= +# FRONT-END 1: WorkflowFunction — the DECORATOR path. `@ag.application(...)` over +# a function -> a Workflow. This is exactly what #2 (../function-based-sdk/) +# calls `ag.application`. It gathers (handler, models, flags) and hands them to +# Workflow. Nothing more. 
+# ========================================================================= + + +class WorkflowFunction: + @staticmethod + def make( + handler, + *, + flags, + slug, + name, + description, + parameters, + inputs, + outputs, + streamer=None, + ): + return Workflow( + handler=handler, + flags=flags, + slug=slug, + name=name, + description=description, + parameters=parameters, + inputs=inputs, + outputs=outputs, + streamer=streamer, + ) + + @classmethod + def application( + cls, + *, + slug, + name=None, + description=None, + parameters=None, + inputs=None, + outputs=None, + ): + def deco(handler): + return cls.make( + handler, + flags=WorkflowFlags(is_application=True, has_handler=True), + slug=slug, + name=name, + description=description, + parameters=parameters, + inputs=inputs, + outputs=outputs, + ) + + return deco + + @classmethod + def evaluator( + cls, + *, + slug, + name=None, + description=None, + parameters=None, + inputs=None, + outputs=None, + ): + def deco(handler): + return cls.make( + handler, + flags=WorkflowFlags(is_evaluator=True, has_handler=True), + slug=slug, + name=name, + description=description, + parameters=parameters, + inputs=inputs, + outputs=outputs, + ) + + return deco + + @classmethod + def configuration(cls, *, slug, name=None, description=None, parameters=None): + # No handler: same Workflow, has_handler=False. + return Workflow( + handler=None, + flags=WorkflowFlags(is_application=True, has_handler=False), + slug=slug, + name=name, + description=description, + parameters=parameters, + ) + + +# ========================================================================= +# FRONT-END 2: WorkflowClass / ClassFrontEnd — the SUBCLASS path. Same base. +# +# `class X(ag.Application)` -> on __init_subclass__, read the inner models + +# handler method off the class and call the SAME WorkflowFunction path. The +# class gathers exactly what the decorator gathered, just from attributes +# instead of arguments. 
THIS is the "class is sugar" mechanism, in one place. +# +# The per-kind bases below set _handler_name + flags; the shim files (01_*, +# 02_*, 07_*) run ../class-based-sdk examples verbatim on top of them. +# ========================================================================= + + +class ClassFrontEnd: + _handler_name: Optional[str] = "run" + + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + if not getattr(cls, "slug", None): + return # an Application/Evaluator base itself, not a user workflow + + instance = cls() # holds __init__ resources (clients, indexes) + method = getattr(cls, cls._handler_name, None) if cls._handler_name else None + + async def handler(**kw): + return await method(instance, **kw) + + streamer_method = getattr(cls, "stream", None) + streamer = None + if streamer_method is not None: + + async def streamer(**kw): # noqa: F811 + async for chunk in streamer_method(instance, **kw): + yield chunk + + # ---- the punchline: the class forwards into the function path ---- + cls._handle = WorkflowFunction.make( + handler if method else None, + flags=cls._flags(), + slug=cls.slug, + name=getattr(cls, "name", None), + description=getattr(cls, "description", None), + parameters=getattr(cls, "Parameters", None), + inputs=getattr(cls, "Inputs", None), + outputs=getattr(cls, "Outputs", None), + streamer=streamer, + ) + + @classmethod + def _flags(cls) -> WorkflowFlags: # overridden per kind + raise NotImplementedError + + # instance(parameters=...) -> pin; calls/lifecycle delegate to the handle. + def __init__(self, *, parameters: dict | None = None): + self._bound = ( + type(self)._handle.pin(**parameters) if parameters else type(self)._handle + ) + + async def __call__(self, **kw): + return await self._bound(**kw) + + # A configured instance delegates everything else (router, invoke, inspect, + # push, ...) to its *pinned* handle, so it serves/runs with its baked-in + # defaults.
Class-level lifecycle uses the classmethods below. + def __getattr__(self, item): + return getattr(object.__getattribute__(self, "_bound"), item) + + @classmethod + async def apush(cls): + return await cls._handle.push() + + @classmethod + async def adeploy(cls, **k): + return await cls._handle.deploy(**k) + + @classmethod + async def inspect(cls): + return await cls._handle.inspect() + + @classmethod + def router(cls, *a, **k): + return cls._handle.router(*a, **k) + + @classmethod + async def afetch_parameters(cls, **k): + return await cls._handle.fetch_parameters(**k) + + @classmethod + async def afrom_registry(cls, **k): + return await cls._handle.from_registry(**k) + + +# Per-kind bases. Each is two lines: which handler method, which flag. +class Application(ClassFrontEnd): + _handler_name = "run" + + @classmethod + def _flags(cls): + return WorkflowFlags(is_application=True, has_handler=True) + + +class Evaluator(ClassFrontEnd): + _handler_name = "evaluate" + + @classmethod + def _flags(cls): + return WorkflowFlags(is_evaluator=True, has_handler=True) + + +class Configuration(ClassFrontEnd): + _handler_name = None # no runnable + + @classmethod + def _flags(cls): + return WorkflowFlags(is_application=True, has_handler=False) + + +# ========================================================================= +# NOT a Workflow. Testset is a typed row collection: inner `Case` model + seed +# `cases`, no parameters/handler/schemas-triple, absent from WorkflowFlags. Its +# own tiny base, deliberately not under `Workflow`, so the hierarchy matches the +# data model instead of papering over it. +# ========================================================================= + + +class Testset: + def __init_subclass__(cls, **kwargs): + super().__init_subclass__(**kwargs) + if not getattr(cls, "slug", None): + return + cls._case = cls.Case + cls._seed = getattr(cls, "cases", None) + + @classmethod + async def apush(cls): ... + + @classmethod + async def afetch(cls): ... 
+ + @classmethod + async def aadd(cls, **k): ... + + @classmethod + async def afrom_traces(cls, **k): ... diff --git a/docs/designs/functional-based-class-sdk/01_application.py b/docs/designs/functional-based-class-sdk/01_application.py new file mode 100644 index 0000000000..e4ea038799 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/01_application.py @@ -0,0 +1,149 @@ +"""ag.Application as sugar over the functional core. (POC, does not run.) + +Diff this against ../function-based-sdk/01_application.py (the functional +original) and ../class-based-sdk/01_application.py (the class proposal it +reproduces). + +PART A is nearly empty — ag.Application is just the class front-end from +00_core.py, bound onto `ag`. PART B is ../class-based-sdk/01_application.py +running VERBATIM on top of it. Nothing in PART B knows it is sugar; `apush()`, +`HotelAgent(parameters={...})`, `.invoke()`, and `.inspect()` all work because +each delegates to the functional handle. +""" + +from __future__ import annotations + +import asyncio +from typing import AsyncIterator + +from openai import AsyncOpenAI +from pydantic import BaseModel, Field + +import agenta as ag + +from core import Application # 00_core.py — class front-end over the one base + +# ========================================================================= +# PART A — there is no base to define here. ag.Application is the class +# front-end from 00_core.py. Binding it onto `ag` is what the real SDK +# __init__ does; the example body below is unchanged. +# ========================================================================= + +ag.Application = Application # type: ignore[attr-defined] + + +# ========================================================================= +# PART B — ../class-based-sdk/01_application.py, VERBATIM, on the shim. 
+# ========================================================================= + + +class HotelAgent(ag.Application): + slug = "hotel-agent" + name = "Hotel Agent" + description = "Concierge agent that answers questions about the hotel." + + class Parameters(BaseModel): + """Playground-editable configuration. Becomes schemas.parameters.""" + + prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ + ag.Message( + role="system", + content="You are the concierge of {{hotel_name}}. Be brief.", + ), + ag.Message(role="user", content="{{message}}"), + ], + llm_config=ag.ModelConfig(model="gpt-4o-mini", temperature=0.2), + ) + hotel_name: str = "Grand Agenta Hotel" + top_k: int = Field(4, ge=1, le=20, description="Documents to retrieve") + + class Inputs(BaseModel): + """Runtime inputs. Becomes schemas.inputs and the expected testset columns.""" + + message: str + persona: str = "guest" + + class Outputs(BaseModel): + """Becomes schemas.outputs. This is what evaluators receive as `outputs`.""" + + answer: str + sources: list[str] = [] + + def __init__(self, **kwargs): + super().__init__(**kwargs) + # Instance state is for request-independent resources only (clients, + # retrievers, indexes). Parameters arrive per request, not here. + self.client = AsyncOpenAI() + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + documents = await self.retrieve(query=inputs.message, top_k=parameters.top_k) + + prompt = parameters.prompt.format( + hotel_name=parameters.hotel_name, + message=inputs.message, + ) + response = await self.client.chat.completions.create( + **prompt.to_openai_kwargs() + ) + + return self.Outputs( + answer=response.choices[0].message.content, + sources=[doc["id"] for doc in documents], + ) + + # Nested steps use @ag.instrument as today. They show up as child spans + # under the invocation span. 
+ @ag.instrument(type="retriever") + async def retrieve(self, *, query: str, top_k: int) -> list[dict]: + return [{"id": "faq-12", "text": "The pool is open 7am to 10pm."}] + + # Optional. When declared, the server can negotiate streaming responses + # (SSE / NDJSON) from the Accept header. Batch callers still get run(). + async def stream( + self, *, inputs: Inputs, parameters: Parameters + ) -> AsyncIterator[str]: + prompt = parameters.prompt.format( + hotel_name=parameters.hotel_name, + message=inputs.message, + ) + stream = await self.client.chat.completions.create( + stream=True, **prompt.to_openai_kwargs() + ) + async for chunk in stream: + yield chunk.choices[0].delta.content or "" + + +async def main(): + ag.init() + + # Local typed call. Traced like any decorated handler today. + agent = HotelAgent() + result = await agent(message="Do you have a pool?") + print(result.answer, result.sources) + + # Pin a configuration. These values become revision.data.parameters + # when the class is pushed. + boutique = HotelAgent(parameters={"hotel_name": "Hotel California", "top_k": 8}) + result = await boutique(message="Can I check out any time I like?") + + # Full pipeline (vault -> resolver -> normalizer), same code path the + # platform runs. Plain kwargs; the SDK builds the wire request internally. + _response = await agent.invoke( + inputs={"message": "Is breakfast included?"}, + parameters={"top_k": 2}, + ) + + # Inspect returns the interface. Schemas come straight from the class, + # no signature inference. + interface = await HotelAgent.inspect() + print(interface.data.schemas.outputs) + + # Push to Agenta: creates or updates the application and returns the + # new revision. Schemas and default parameters travel with it. 
+ revision_id = await HotelAgent.apush() + print(f"pushed revision {revision_id}") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/02_evaluators.py b/docs/designs/functional-based-class-sdk/02_evaluators.py new file mode 100644 index 0000000000..d7820bb43a --- /dev/null +++ b/docs/designs/functional-based-class-sdk/02_evaluators.py @@ -0,0 +1,146 @@ +"""ag.Evaluator as sugar over the functional core. (POC, does not run.) + +Diff this against ../function-based-sdk/02_evaluators.py (functional original) +and ../class-based-sdk/02_evaluators.py (class proposal). + +In 00_core.py, the Evaluator front-end differs from Application by exactly one +word — `_handler_name = "evaluate"` — and one flag (`is_evaluator` vs +`is_application`). Everything else — schemas, pinning, push, router — is the +shared `Workflow` base. That one-word difference is the entire "evaluators are a +different base class" story. + +PART B is ../class-based-sdk/02_evaluators.py running verbatim on the shim. +""" + +from __future__ import annotations + +import asyncio + +from openai import AsyncOpenAI +from pydantic import BaseModel, ConfigDict, Field + +import agenta as ag + +from core import Evaluator # 00_core.py — WorkflowClass path, is_evaluator flag + +# ========================================================================= +# PART A — no base to define. ag.Evaluator is the class front-end from +# 00_core.py, bound onto `ag`. +# ========================================================================= + +ag.Evaluator = Evaluator # type: ignore[attr-defined] + + +# ========================================================================= +# PART B — ../class-based-sdk/02_evaluators.py, VERBATIM, on the shim. 
+# ========================================================================= + + +class StartsCapitalized(ag.Evaluator): + """The smallest possible evaluator: no settings, one metric.""" + + slug = "starts-capitalized" + name = "Starts Capitalized" + + class Outputs(BaseModel): + score: float + success: bool + + async def evaluate(self, *, outputs, **_) -> Outputs: + text = outputs["answer"] if isinstance(outputs, dict) else str(outputs) + ok = bool(text) and text[0].isupper() + return self.Outputs(score=1.0 if ok else 0.0, success=ok) + + +class RubricJudge(ag.Evaluator): + """LLM-as-a-judge with its own typed settings. + + The settings render in the evaluator config UI from schemas.parameters, + exactly like builtin evaluator settings do today. + """ + + slug = "rubric-judge" + name = "Rubric Judge" + description = "Grades the answer against a rubric with an LLM." + + class Parameters(BaseModel): + judge_model: str = "gpt-4o-mini" + rubric: str = Field( + "The answer is factual, polite, and under 100 words.", + json_schema_extra={"x-ag-type": "text"}, + ) + + class Inputs(BaseModel): + # Evaluators receive all testcase columns. Declare the ones you use, + # allow the rest. + model_config = ConfigDict(extra="allow") + expected_answer: str | None = None + + class Outputs(BaseModel): + score: float = Field(ge=0.0, le=1.0) + verdict: str + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.client = AsyncOpenAI() + + async def evaluate( + self, *, inputs: Inputs, outputs, parameters: Parameters, **_ + ) -> Outputs: + response = await self.client.chat.completions.create( + model=parameters.judge_model, + messages=[ + {"role": "system", "content": f"Grade against: {parameters.rubric}"}, + {"role": "user", "content": str(outputs)}, + ], + ) + verdict = response.choices[0].message.content + return self.Outputs(score=0.9, verdict=verdict) + + +class StaysUnderBudget(ag.Evaluator): + """A trace-based evaluator. 
`trace` exposes the full invocation: spans, + costs, tokens, tool calls. Same data the custom code evaluator gets today, + but typed access instead of a raw dict. + """ + + slug = "stays-under-budget" + name = "Stays Under Budget" + + class Parameters(BaseModel): + max_cost_usd: float = 0.01 + + class Outputs(BaseModel): + success: bool + cost_usd: float + + async def evaluate( + self, *, trace: ag.Trace, parameters: Parameters, **_ + ) -> Outputs: + cost = trace.metrics.costs.cumulative + return self.Outputs(success=cost <= parameters.max_cost_usd, cost_usd=cost) + + +async def main(): + ag.init() + + # Local typed call against a single output. Useful for unit tests. + judge = RubricJudge(parameters={"judge_model": "gpt-4.1"}) + result = await judge( + outputs={"answer": "The pool is open 7am to 10pm."}, + expected_answer="Pool hours are 7-22.", + ) + print(result.score, result.verdict) + + # Builtins are classes too: typed configurators over the existing + # agenta:builtin:* handlers, instead of settings dicts. + _exact = ag.evaluators.ExactMatch(correct_answer_key="expected_answer") + _similarity = ag.evaluators.SemanticSimilarity(threshold=0.8) + + # Push registers the evaluator on the platform with its compiled schemas. + # It then shows up in the UI next to the builtins. + await RubricJudge.apush() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/03_run_evals.py b/docs/designs/functional-based-class-sdk/03_run_evals.py new file mode 100644 index 0000000000..21afdbbdc4 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/03_run_evals.py @@ -0,0 +1,64 @@ +"""Running evals — identical to the class version, because consuming a handle +is the same whether it came from a class or a function. (POC, does not run.) + +Diff this against ../function-based-sdk/03_run_evals.py and +../class-based-sdk/03_run_evals.py. 
This file defines no base — it imports the +class workflows from 01/02 (which are sugar over the functional core) and uses +them. There is nothing kind-specific here; the three folders converge. +""" + +import asyncio + +import agenta as ag + +from application import HotelAgent # 01_application.py (class over functional core) +from evaluators import ( # 02_evaluators.py + RubricJudge, + StartsCapitalized, + StaysUnderBudget, +) + + +async def main(): + ag.init() + + result = await ag.aevaluate( + name="hotel-agent-regression", + testset="hotel-faq-v2", + application=HotelAgent, + evaluators=[ + StartsCapitalized, + RubricJudge(parameters={"judge_model": "gpt-4.1"}), + StaysUnderBudget(parameters={"max_cost_usd": 0.005}), + ag.evaluators.ExactMatch(correct_answer_key="expected_answer"), + ], + ) + + print(result.url) + for scenario in result.scenarios: + print(scenario.inputs, scenario.outputs, scenario.metrics) + + # Compare two configurations of the same application in one run. + await ag.aevaluate( + name="prompt-shootout", + testset="hotel-faq-v2", + applications=[ + HotelAgent, # current defaults + HotelAgent(parameters={"top_k": 8}), # candidate config (pin) + ], + evaluators=[RubricJudge], + ) + + # Evaluate inline testcases without a stored testset. Handy in CI. + await ag.aevaluate( + application=HotelAgent, + testcases=[ + {"message": "Do you have a pool?", "expected_answer": "Yes, 7am-10pm."}, + {"message": "Is parking free?", "expected_answer": "Yes, for guests."}, + ], + evaluators=[StartsCapitalized], + ) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/04_config_registry.py b/docs/designs/functional-based-class-sdk/04_config_registry.py new file mode 100644 index 0000000000..0a1b8ab727 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/04_config_registry.py @@ -0,0 +1,64 @@ +"""Config registry — class-based example on the functional core. (POC, does not run.) 
+ +Diff against ../function-based-sdk/04_config_registry.py and +../class-based-sdk/04_config_registry.py. Importing HotelAgent from 01 binds +ag.Application (the class front-end from 00_core), so the inline RouterAgent +subclass below resolves. Lifecycle calls delegate to the functional handle. +""" + +import asyncio + +from pydantic import BaseModel + +import agenta as ag + +from application import HotelAgent # 01_application.py — also binds ag.Application + + +async def main(): + ag.init() + + # 1. Fetch deployed parameters, typed and validated. + params = await HotelAgent.afetch_parameters(environment="production") + print(params.prompt.messages[0].content) + print(params.hotel_name, params.top_k) + + _candidate = await HotelAgent.afetch_parameters(variant="experiment-1") + + # 2. Bind to a deployed revision (afrom_registry -> pinned handle). + agent = await HotelAgent.afrom_registry(environment="production") + _result = await agent(message="Do you have a pool?") + print(agent.last_invocation.trace_id) + + # 3. Reference managed configs from another application. + class RouterAgent(ag.Application): + slug = "router-agent" + + class Parameters(BaseModel): + routing_prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ag.Message(role="system", content="Route the request.")], + ) + concierge_prompt: ag.PromptTemplate = ag.Reference( + application="hotel-agent", + environment="production", + key="prompt", + ) + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + route: str + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + # parameters.concierge_prompt is already resolved here. + ... + + # Lifecycle: push and deploy. 
+ revision_id = await HotelAgent.apush() + await HotelAgent.adeploy(revision=revision_id, environment="staging") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/05_serve.py b/docs/designs/functional-based-class-sdk/05_serve.py new file mode 100644 index 0000000000..eccc44b1b6 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/05_serve.py @@ -0,0 +1,56 @@ +"""Serving — class-based example on the functional core. (POC, does not run.) + +Diff against ../function-based-sdk/05_serve.py and ../class-based-sdk/05_serve.py. +Identical to the class version: HotelAgent/RubricJudge are the shim classes from +01/02, and .router() delegates to the functional handle. Consuming a router is +the same regardless of how the workflow was authored. +""" + +import agenta as ag +import uvicorn +from fastapi import Depends, FastAPI + +from application import HotelAgent # 01_application.py +from evaluators import RubricJudge # 02_evaluators.py + +ag.init() + +app = FastAPI() + + +# Plain FastAPI composition. +app.include_router(HotelAgent.router(), prefix="/hotel", tags=["hotel"]) + +# Evaluators are workflows too. +app.include_router(RubricJudge.router(), prefix="/evaluators/rubric-judge") + + +# Dependencies, auth, middleware, versioned prefixes all keep working. +def verify_internal_token(): # noqa: D103 + ... + + +app.include_router( + HotelAgent.router(), + prefix="/internal/hotel", + dependencies=[Depends(verify_internal_token)], +) + +# A pinned instance serves with its baked-in parameters as defaults. +boutique = HotelAgent(parameters={"hotel_name": "Hotel California"}) +app.include_router(boutique.router(), prefix="/boutique") + + +@app.get("/healthz") +def healthz(): + return {"ok": True} + + +# No FastAPI app of your own? 
A class is directly servable as an ASGI app: +# asgi = HotelAgent.asgi() # then: uvicorn 05_serve:asgi +# or from the CLI: +# agenta serve application:HotelAgent --port 8000 + + +if __name__ == "__main__": + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/docs/designs/functional-based-class-sdk/06_framework_adapters.py b/docs/designs/functional-based-class-sdk/06_framework_adapters.py new file mode 100644 index 0000000000..e206afc45e --- /dev/null +++ b/docs/designs/functional-based-class-sdk/06_framework_adapters.py @@ -0,0 +1,73 @@ +"""Framework adapters on the functional core. (POC, does not run.) + +Diff against ../function-based-sdk/06_framework_adapters.py and +../class-based-sdk/06_framework_adapters.py. + +The manual tier (a plain ag.Application subclass that builds the framework agent +inside run()) works on the shim unchanged — shown below. The factory tier +(ag.ext.openai_agents.Application) and automatic tier (ag.Application.from_agent) +are framework-specific bases the SDK ships; on this core they are additional +WorkflowFunction.make(...) front-ends that set the right flags and adapter. They +all compile to the same Workflow, so they are noted here rather than restated. +""" + +import asyncio + +from pydantic_ai import Agent as PydanticAgent # noqa: E402 (not installed) +from pydantic import BaseModel + +import agenta as ag + +from application import HotelAgent # noqa: F401 — binds ag.Application from 00_core + + +# Tier 1: MANUAL. Plain ag.Application, framework inside run(). The agent is +# rebuilt per call from parameters, so the source of truth stays in Agenta. +class HotelAgentManual(ag.Application): + slug = "hotel-agent-pydantic-ai" + name = "Hotel Agent (Pydantic AI, manual)" + + class Parameters(BaseModel): + model: str = "openai:gpt-4o-mini" + instructions: str = "You are the concierge of {{hotel_name}}. Be brief." 
+ hotel_name: str = "Grand Agenta Hotel" + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + agent = PydanticAgent( + parameters.model, + system_prompt=ag.render( + parameters.instructions, hotel_name=parameters.hotel_name + ), + ) + result = await agent.run(inputs.message) + return self.Outputs(answer=result.output) + + +# Tier 2 (FACTORY) and Tier 3 (AUTOMATIC) are framework-specific front-ends that +# the SDK ships on top of the same Workflow base: +# +# ag.ext.openai_agents.application(build=...) -> WorkflowFunction.make( +# handler=, flags=is_application, ...) +# ag.from_agent(agent, slug=...) -> introspect agent, then the +# same WorkflowFunction.make(...) +# +# Both yield a Workflow identical in kind to the manual tier. See the function +# folder's 06 for the decorator spelling; the class folder's 06 for the subclass +# spelling. They converge here. + + +async def main(): + ag.init() + await HotelAgentManual.apush() + agent = HotelAgentManual(parameters={"model": "openai:gpt-4.1"}) + await agent(message="Do you have a pool?") + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/07_config_only.py b/docs/designs/functional-based-class-sdk/07_config_only.py new file mode 100644 index 0000000000..9448c9abb5 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/07_config_only.py @@ -0,0 +1,112 @@ +"""ag.Configuration as sugar over the functional core. (POC, does not run.) + +Diff this against ../function-based-sdk/07_config_only.py (functional original) +and ../class-based-sdk/07_config_only.py (class proposal). + +A configuration has Parameters and nothing else: no Inputs, no Outputs, no +runnable. In 00_core.py that is just `_handler_name = None` and +`has_handler=False` — the Workflow base registers schemas + parameters and skips +the handler entirely. 
No empty `run()`/`evaluate()` to leave conspicuously +unimplemented. + +The fetch helpers (`afetch`, `fetch`) are config-only conveniences, added here +as a thin subclass rather than baked into the shared base. + +PART B is ../class-based-sdk/07_config_only.py running verbatim on the shim. +""" + +from __future__ import annotations + +import asyncio + +from pydantic import BaseModel, Field + +import agenta as ag + +from core import Application # 00_core.py — for the inline SupportAgent in main() +from core import Configuration as _Configuration # 00_core.py, has_handler=False + +# ========================================================================= +# PART A — config-only fetch helpers over the core Configuration front-end, +# plus binding ag.Application for the SupportAgent reference in PART B. +# ========================================================================= + + +class Configuration(_Configuration): + @classmethod + async def afetch(cls, **k): + return await cls._handle.fetch(**k) + + @classmethod + def fetch(cls, **k): + return cls._handle.fetch_sync(**k) + + +ag.Configuration = Configuration # type: ignore[attr-defined] +ag.Application = Application # type: ignore[attr-defined] + + +# ========================================================================= +# PART B — ../class-based-sdk/07_config_only.py, verbatim, on the shim. +# ========================================================================= + + +class ConciergeConfig(ag.Configuration): + slug = "concierge-config" + name = "Concierge Configuration" + description = "Everything the support flow needs, versioned and deployable." + + class Parameters(BaseModel): + prompt: ag.PromptTemplate = ag.PromptTemplate( + messages=[ + ag.Message( + role="system", content="You are the concierge of {{hotel_name}}." 
+ ), + ], + llm_config=ag.ModelConfig(model="gpt-4o-mini"), + ) + escalation_keywords: list[str] = ["lawyer", "refund", "manager"] + model_by_tier: dict[str, str] = Field( + default={"free": "gpt-4o-mini", "pro": "gpt-4.1"}, + description="Routing table, editable without a deploy.", + ) + + +async def main(): + ag.init() + + await ConciergeConfig.apush() + await ConciergeConfig.adeploy(environment="production") + + config = await ConciergeConfig.afetch(environment="production") + print(config.escalation_keywords) + _model = config.model_by_tier["pro"] + _prompt = config.prompt.format(hotel_name="Grand Agenta Hotel") + + config = ConciergeConfig.fetch(environment="production") + + # An application can reference the config instead of duplicating values. + # ConciergeConfig.Parameters is reachable because the shim keeps the inner + # class as a plain attribute — same as the native class proposal. + class SupportAgent(ag.Application): + slug = "support-agent" + + class Parameters(BaseModel): + concierge: ConciergeConfig.Parameters = ag.Reference( + configuration="concierge-config", + environment="production", + ) + + class Inputs(BaseModel): + message: str + + class Outputs(BaseModel): + answer: str + + async def run(self, *, inputs: Inputs, parameters: Parameters) -> Outputs: + # parameters.concierge is resolved and typed. + ... + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/08_testsets.py b/docs/designs/functional-based-class-sdk/08_testsets.py new file mode 100644 index 0000000000..ebd2db7790 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/08_testsets.py @@ -0,0 +1,110 @@ +"""ag.Testset — NOT a Workflow. (POC, does not run.) + +Diff this against ../function-based-sdk/08_testsets.py (functional original) and +../class-based-sdk/08_testsets.py (class proposal). 
+ +A testset is shaped differently from the runnable workflows: an inner `Case` +model (the row schema) + optional `cases` seed data, no Parameters and no +handler. It is absent from `WorkflowFlags` and does NOT subclass `Workflow`. In +00_core.py it gets its own tiny base, separate from the Application/Evaluator/ +Configuration front-ends. That separation is the point: the type hierarchy +matches the data model instead of forcing a non-workflow under `Workflow`. + +PART B is ../class-based-sdk/08_testsets.py running verbatim on the shim. +""" + +from __future__ import annotations + +import asyncio + +from pydantic import BaseModel, Field + +import agenta as ag + +from core import Testset # 00_core.py — its own base, NOT a Workflow + +from application import HotelAgent # 01_application.py (the compatibility check) +from evaluators import RubricJudge # 02_evaluators.py + +# ========================================================================= +# PART A — no base to define. ag.Testset is the standalone Testset base from +# 00_core.py (deliberately not under Workflow), bound onto `ag`. +# ========================================================================= + +ag.Testset = Testset # type: ignore[attr-defined] + + +# ========================================================================= +# PART B — ../class-based-sdk/08_testsets.py, verbatim, on the shim. +# ========================================================================= + + +class HotelFAQ(ag.Testset): + slug = "hotel-faq" + name = "Hotel FAQ" + + class Case(BaseModel): + """One row. 
Becomes the testset's column schema.""" + + message: str + persona: str = "guest" + expected_answer: str + difficulty: int = Field(1, ge=1, le=3) + + cases = [ + Case( + message="Do you have a pool?", + expected_answer="Yes, open 7am to 10pm.", + ), + Case( + message="Can I check in at 6am?", + expected_answer="Early check-in depends on availability.", + difficulty=2, + ), + ] + + +async def main(): + ag.init() + + await HotelFAQ.apush() + + testset = await HotelFAQ.afetch() + for case in testset: + print(case.message, case.difficulty) + + await HotelFAQ.aadd( + cases=[HotelFAQ.Case(message="Is parking free?", expected_answer="Yes.")] + ) + + # Curate from production traces: map spans onto typed cases. + await HotelFAQ.afrom_traces( + filter=ag.TraceFilter( + application="hotel-agent", annotations={"thumbs": "down"} + ), + map=lambda span: HotelFAQ.Case( + message=span.inputs["message"], + expected_answer=span.annotations["corrected_answer"], + ), + ) + + # The payoff: compatibility is checked before anything runs. + # HotelFAQ.Case covers HotelAgent.Inputs (message, persona)? yes + # HotelFAQ.Case covers RubricJudge.Inputs (expected_answer)? yes + # A missing or mistyped column raises here, not after 200 LLM calls. + await ag.aevaluate( + testset=HotelFAQ, + application=HotelAgent, + evaluators=[RubricJudge], + ) + + # Mismatches fail loudly and early: + # + # await ag.aevaluate(testset=HotelFAQ, application=FlightAgent, ...) 
+ # IncompatibleTestsetError: FlightAgent.Inputs requires column + # 'origin' (str); testset 'hotel-faq' has columns + # [message, persona, expected_answer, difficulty] + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/docs/designs/functional-based-class-sdk/README.md b/docs/designs/functional-based-class-sdk/README.md new file mode 100644 index 0000000000..b0fba536f5 --- /dev/null +++ b/docs/designs/functional-based-class-sdk/README.md @@ -0,0 +1,96 @@ +# functional-based-class SDK (POC) — demo #3 of three + +One of three peer demos under `docs/designs/`, with aligned filenames (`00_*` +foundation + `01_application.py` … `08_testsets.py`) so they diff 1:1: + +| folder | `00_*` foundation | `01`–`08` | +|---|---|---| +| [class-based-sdk/](../class-based-sdk/) | `00_core.py` — bases as native classes | **#1** class API, native | +| [function-based-sdk/](../function-based-sdk/) | `00_core.py` — no base classes; decorators + closures | **#2** function API, native | +| this folder | `00_core.py` — the **same bases on the functional core** | **#3** class API as sugar | + +Same `01`–`08` examples in all three; three different `00_*` foundations. This +folder's `00_core.py` rebuilds #1's bases on #2's function front-end, proving the +class is sugar. + +## The claim + +There is **one** workflow base and **two** authoring front-ends over it: + +```text + Workflow (the one base — wraps a handler, + │ holds flags + compiled schemas) + ┌──────────────┴──────────────┐ + WorkflowFunction ClassFrontEnd + (decorator authoring) (subclass authoring) + │ │ + ag.application ag.Application is_application flag + ag.evaluator ag.Evaluator is_evaluator flag + ag.configuration ag.Configuration has_handler = False +``` + +The class front-end does not register anything itself. Its `__init_subclass__` +reads the inner models + handler method off your class and **calls +`WorkflowFunction`**, which gathers exactly what a decorator gathers and builds +the one `Workflow`. 
So: **class authoring → function authoring → one base.** +That chain — all in [`00_core.py`](00_core.py) — is the whole argument. + +Grounded in the real SDK: `decorators/running.py` already has a `Workflow` that +wraps a function; `models/workflows.py::WorkflowFlags` already encodes kind +(`is_application`, `is_evaluator`, `has_handler`) as flags. So this isn't +invented structure — it's the structure the SDK already has, surfaced. + +## Layout + +Filenames are aligned 1:1 with the other two folders (`01_application.py` … +`08_testsets.py`), so each file diffs directly against its siblings. The shared +machinery that has no counterpart lives in `00_core.py`. + +| File | Shows | 1:1 sibling in #1 and #2 | +|---|---|---| +| `00_core.py` | `Workflow` + `WorkflowFunction` + `ClassFrontEnd` + `Testset` (the engine) | — (no counterpart; the "extra") | +| `01_application.py` | `ag.Application` | `01_application.py` | +| `02_evaluators.py` | `ag.Evaluator` | `02_evaluators.py` | +| `03_run_evals.py` | using handles in evals | `03_run_evals.py` | +| `04_config_registry.py` | fetch/bind/reference | `04_config_registry.py` | +| `05_serve.py` | routers | `05_serve.py` | +| `06_framework_adapters.py` | manual tier on the shim | `06_framework_adapters.py` | +| `07_config_only.py` | `ag.Configuration` | `07_config_only.py` | +| `08_testsets.py` | `ag.Testset` (not a Workflow) | `08_testsets.py` | + +Each file that binds a front-end (`01`, `02`, `07`, `08`) has two parts: + +- **PART A** — binds the front-end from `00_core.py` onto `ag`. Nearly empty, + because the work lives in the shared core, not per kind. (`03`/`05` need no + bind — they only consume handles imported from `01`/`02`.) +- **PART B** — the class-based example from `../class-based-sdk/NN_*.py`, pasted + **verbatim**, running on PART A. It does not know it is sugar. 
+ +## What the structure makes visible + +- **`01` vs `02`**: in `00_core.py`, `Evaluator` differs from `Application` by + one word (`_handler_name = "evaluate"`) and one flag (`is_evaluator`). That is + the entire "evaluators are a different class" story. +- **`07`**: `Configuration` is `_handler_name = None` + `has_handler=False`. No + runnable, nothing to leave unimplemented — why the functional + `ag.configuration(...)` is cleaner than an empty class body. +- **`08`**: `Testset` is **not** a `Workflow` and does not go through + `WorkflowFunction` at all. It has no parameters/handler/schemas-triple and is + absent from `WorkflowFlags`, so it gets its own small base. The hierarchy + matches the data model instead of papering over it. + +## Files 03, 04, 05, 06 have no shim + +They define **no** base class — they only *use* handles (`aevaluate`, `router`, +`from_registry`, `from_agent`). Compare those folder-to-folder directly: +`../function-based-sdk/03_run_evals.py` ↔ `../class-based-sdk/03_run_evals.py`, +and so on. Consuming a handle is the same whether it came from a decorator or a +class. + +## The conclusion + +The class front-end is an `__init_subclass__` that forwards into the function +front-end, which builds the one `Workflow`. It adds an authoring style — a +`class` statement, `self`, inner-model indentation, method-vs-function lookup — +and nothing else: no engine, no capability, no type. Ship the function +front-end; offer the class front-end if users want it. Both are the same base.