diff --git a/examples/notebooks/optimize_anything_api.ipynb b/examples/notebooks/optimize_anything_api.ipynb new file mode 100644 index 00000000..66966253 --- /dev/null +++ b/examples/notebooks/optimize_anything_api.ipynb @@ -0,0 +1,507 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "faae1913", + "metadata": {}, + "source": [ + "# Trace `optimize_anything` API tutorial\n", + "\n", + "[Open in Colab](https://colab.research.google.com/github/doxav/NewTrace/blob/experimental/examples/notebooks/optimize_anything_api.ipynb)\n", + "\n", + "This notebook demonstrates the additive `opto.optimize_anything` compatibility layer and compares it with native Trace. It starts with deterministic offline examples, then runs low-budget `gpt-4o-mini` examples when OpenAI/OpenRouter credentials are configured. The examples are GEPA-style tutorials inspired by public optimize-anything workflows; they do not claim to reproduce any specific published implementation.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 29, + "id": "419a98e1", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:39.768604Z", + "iopub.status.busy": "2026-05-13T05:01:39.767982Z", + "iopub.status.idle": "2026-05-13T05:01:39.778494Z", + "shell.execute_reply": "2026-05-13T05:01:39.777024Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Python 3.12.13 Colab False\n" + ] + } + ], + "source": [ + "import os, sys, json, textwrap\n", + "IN_COLAB = \"google.colab\" in sys.modules\n", + "if IN_COLAB:\n", + " import subprocess\n", + " # Install the branch version because the new opto.optimize_anything API\n", + " # may not exist yet in the published trace-opt package.\n", + " trace_ref = os.getenv(\"TRACE_NOTEBOOK_REF\", \"experimental\")\n", + " subprocess.check_call([\n", + " sys.executable, \"-m\", \"pip\", \"install\", \"-q\",\n", + " f\"git+https://github.com/doxav/NewTrace.git@{trace_ref}\",\n", + " \"datasets\", \"litellm\",\n", + " ])\n", + "print(\"Python\", sys.version.split()[0], \"Colab\", IN_COLAB)\n" + ] + }, + { + "cell_type": "markdown", + "id": "376ed147", + "metadata": {}, + "source": [ + "## Configure OpenRouter or OpenAI\n", + "\n", + "The cell uses Colab secrets if available (`OPENROUTER_API_KEY`, `OPENAI_API_KEY`), then normal environment variables. 
OpenRouter uses LiteLLM's `openrouter/...` model convention and OpenAI defaults to `gpt-4o-mini`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "id": "9ccbd49b", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:39.781114Z", + "iopub.status.busy": "2026-05-13T05:01:39.780921Z", + "iopub.status.idle": "2026-05-13T05:01:39.790450Z", + "shell.execute_reply": "2026-05-13T05:01:39.789579Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'provider': 'OpenAI', 'model': 'gpt-4o-mini'}\n" + ] + } + ], + "source": [ + "def _colab_secret(name):\n", + " try:\n", + " from google.colab import userdata\n", + " return userdata.get(name)\n", + " except Exception:\n", + " return None\n", + "\n", + "openrouter_key = _colab_secret(\"OPENROUTER_API_KEY\") or os.getenv(\"OPENROUTER_API_KEY\")\n", + "openai_key = _colab_secret(\"OPENAI_API_KEY\") or os.getenv(\"OPENAI_API_KEY\")\n", + "os.environ.setdefault(\"TRACE_DEFAULT_LLM_BACKEND\", \"LiteLLM\")\n", + "\n", + "if openrouter_key:\n", + " os.environ[\"OPENROUTER_API_KEY\"] = openrouter_key\n", + " os.environ[\"OPENAI_API_KEY\"] = openrouter_key\n", + " os.environ.setdefault(\"OPENAI_API_BASE\", \"https://openrouter.ai/api/v1\")\n", + " os.environ.setdefault(\"TRACE_LITELLM_MODEL\", os.getenv(\"OPENROUTER_MODEL\", \"openrouter/openai/gpt-4o-mini\"))\n", + " provider = \"OpenRouter\"\n", + "elif openai_key:\n", + " os.environ[\"OPENAI_API_KEY\"] = openai_key\n", + " os.environ[\"TRACE_LITELLM_MODEL\"] = \"gpt-4o-mini\"\n", + " provider = \"OpenAI\"\n", + "else:\n", + " provider = \"offline\"\n", + "\n", + "HAS_LLM = provider != \"offline\"\n", + "print({\"provider\": provider, \"model\": os.getenv(\"TRACE_LITELLM_MODEL\")})\n" + ] + }, + { + "cell_type": "code", + "execution_count": 31, + "id": "23d0bc95", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:39.793503Z", + "iopub.status.busy": "2026-05-13T05:01:39.793092Z", + "iopub.status.idle": "2026-05-13T05:01:42.755389Z", + "shell.execute_reply": "2026-05-13T05:01:42.753731Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "optimize_anything ready EngineConfig(max_metric_calls=3, max_steps=None, higher_is_better=True, cache_evaluation=True, capture_stdio=False, candidate_selection_strategy='best', frontier_type='score', random_seed=0)\n" + ] + } + ], + "source": [ + "import opto.optimize_anything as oa\n", + "from opto.optimize_anything import TraceOptimizerBackend\n", + "from opto.trace import node, bundle, GRAPH\n", + "from opto.optimizers import OptoPrimeV2, OptoPrime, OPROv2, TextGrad\n", + "try:\n", + " from opto.optimizers import OptoPrimeMulti\n", + "except Exception:\n", + " OptoPrimeMulti = None\n", + "print(\"optimize_anything ready\", oa.EngineConfig(max_metric_calls=3))" + ] + }, + { + "cell_type": "markdown", + "id": "5cd3845e", + "metadata": {}, + "source": [ + "## Deterministic GEPA-style prompt optimization\n", + "\n", + "A candidate can be a string, dict, or JSON-like object. 
Evaluators can call `oa.log()`; logs are captured in evaluation records rather than printed during evaluation.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 32, + "id": "de52b3f0", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:42.758577Z", + "iopub.status.busy": "2026-05-13T05:01:42.758381Z", + "iopub.status.idle": "2026-05-13T05:01:42.771195Z", + "shell.execute_reply": "2026-05-13T05:01:42.769564Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "best_score 1.0\n", + "You are a helpful assistant.\n", + "Calculate carefully and answer only with the final answer.\n", + "first_record_logs ['prompt_len 28 question 2 + 2']\n" + ] + } + ], + "source": [ + "train_examples = [\n", + " {\"question\": \"2 + 2\", \"answer\": \"4\"},\n", + " {\"question\": \"3 * 3\", \"answer\": \"9\"},\n", + " {\"question\": \"10 - 7\", \"answer\": \"3\"},\n", + "]\n", + "\n", + "def deterministic_prompt_evaluator(candidate, example, opt_state=None):\n", + " prompt = candidate if isinstance(candidate, str) else candidate.get(\"prompt\", \"\")\n", + " score = 0.2\n", + " if \"calculate\" in prompt.lower() or \"solve\" in prompt.lower(): score += 0.4\n", + " if \"answer only\" in prompt.lower() or \"final answer\" in prompt.lower(): score += 0.4\n", + " oa.log(\"prompt_len\", len(prompt), \"question\", example[\"question\"])\n", + " return min(score, 1.0), {\"scores\": {\"prompt_proxy\": score}}\n", + "\n", + "def deterministic_proposer(candidate, feedback, **kwargs):\n", + " if \"answer only\" not in candidate.lower():\n", + " return candidate + \"\\nCalculate carefully and answer only with the final answer.\"\n", + " return candidate\n", + "\n", + "result = oa.optimize_anything(\n", + " seed_candidate=\"You are a helpful assistant.\",\n", + " evaluator=deterministic_prompt_evaluator,\n", + " dataset=train_examples,\n", + " objective=\"Improve exact-answer arithmetic prompt quality.\",\n", + " config=oa.GEPAConfig(\n", + " engine=oa.EngineConfig(max_metric_calls=12, max_steps=2, capture_stdio=True),\n", + " reflection=oa.ReflectionConfig(custom_candidate_proposer=deterministic_proposer),\n", + " ),\n", + ")\n", + "print(\"best_score\", result.best_score)\n", + "print(result.best_candidate)\n", + "print(\"first_record_logs\", result.history[0].logs)\n" + ] + }, + { + "cell_type": "markdown", + "id": "918d8d0e", + "metadata": {}, + "source": [ + "## Trace optimizer backend\n", + "\n", + "`TraceOptimizerBackend` adapts Trace optimizers (`OptoPrimeV2`, `OptoPrime`, `OptoPrimeMulti`, `OPROv2`, `TextGrad`, or custom protocol-compatible classes) to the proposer interface. 
The live cell is skipped without credentials.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 33, + "id": "ca249d52", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:42.774013Z", + "iopub.status.busy": "2026-05-13T05:01:42.773817Z", + "iopub.status.idle": "2026-05-13T05:01:42.781481Z", + "shell.execute_reply": "2026-05-13T05:01:42.780490Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "<class 'str'> 0.2\n", + "You are a helpful assistant.\n" + ] + } + ], + "source": [ + "if HAS_LLM:\n", + " trace_backend = TraceOptimizerBackend(\n", + " optimizer_cls=\"OptoPrimeV2\",\n", + " optimizer_kwargs={\"memory_size\": 1, \"use_json_object_format\": False},\n", + " )\n", + " llm_result = oa.optimize_anything(\n", + " seed_candidate=\"You are a helpful assistant.\",\n", + " evaluator=deterministic_prompt_evaluator,\n", + " dataset=train_examples[:1],\n", + " objective=\"Make the prompt concise and exact-answer oriented.\",\n", + " config=oa.GEPAConfig(\n", + " engine=oa.EngineConfig(max_metric_calls=2, max_steps=1, capture_stdio=True),\n", + " reflection=oa.ReflectionConfig(custom_candidate_proposer=trace_backend),\n", + " ),\n", + " )\n", + " print(type(llm_result.best_candidate), llm_result.best_score)\n", + " print(str(llm_result.best_candidate)[:300])\n", + "else:\n", + " print(\"Skipping live TraceOptimizerBackend demo: no OPENAI_API_KEY/OPENROUTER_API_KEY configured.\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "c2d2fdeb", + "metadata": {}, + "source": [ + "## Native Trace API comparison\n", + "\n", + "The native API is graph-first: `node -> bundle -> optimizer.backward -> optimizer.step`. The compatibility API is evaluator-first: `candidate -> evaluator -> proposer`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "id": "350d64a3", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:42.783909Z", + "iopub.status.busy": "2026-05-13T05:01:42.783717Z", + "iopub.status.idle": "2026-05-13T05:01:46.978809Z", + "shell.execute_reply": "2026-05-13T05:01:46.977457Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'str:0': '4'}\n" + ] + } + ], + "source": [ + "@bundle()\n", + "def evaluate_prompt_text(prompt, question):\n", + " return prompt + \"\\nQuestion: \" + question\n", + "\n", + "GRAPH.clear()\n", + "prompt = node(\"You are a helpful assistant.\", trainable=True, description=\"Arithmetic answer prompt\")\n", + "output = evaluate_prompt_text(prompt, \"2 + 2\")\n", + "optimizer = OptoPrimeV2([prompt], use_json_object_format=False, memory_size=1, max_tokens=256)\n", + "optimizer.zero_feedback()\n", + "optimizer.backward(output, \"The answer should be concise and answer only with the final number.\")\n", + "\n", + "if HAS_LLM:\n", + " update = optimizer.step(bypassing=True)\n", + " print({k.name: str(v)[:200] for k, v in update.items()})\n", + "else:\n", + " summary = optimizer.summarize()\n", + " system_prompt, user_prompt = optimizer.construct_prompt(summary)\n", + " print(system_prompt.splitlines()[0])\n", + " print(user_prompt[:300])\n" + ] + }, + { + "cell_type": "markdown", + "id": "fca5d509", + "metadata": {}, + "source": [ + "## BBEH / BBH-style task selection\n", + "\n", + "Set `BBEH_HF_DATASET`, and optionally `BBEH_TASK`/`BBEH_SPLIT`, to load a HuggingFace dataset. 
Local mini tasks keep the notebook runnable offline.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "id": "bc8b3b9c", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:46.982113Z", + "iopub.status.busy": "2026-05-13T05:01:46.981723Z", + "iopub.status.idle": "2026-05-13T05:01:46.990766Z", + "shell.execute_reply": "2026-05-13T05:01:46.990020Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "boolean_expressions [{'input': 'not ( true and false )', 'target': 'true'}, {'input': 'true and false', 'target': 'false'}]\n" + ] + } + ], + "source": [ + "FALLBACK_TASKS = {\n", + " \"boolean_expressions\": [\n", + " {\"input\": \"not ( true and false )\", \"target\": \"true\"},\n", + " {\"input\": \"true and false\", \"target\": \"false\"},\n", + " ],\n", + " \"date_understanding\": [\n", + " {\"input\": \"Today is Monday. What day is tomorrow?\", \"target\": \"Tuesday\"},\n", + " {\"input\": \"Yesterday was Friday. What day is today?\", \"target\": \"Saturday\"},\n", + " ],\n", + " \"word_sorting\": [\n", + " {\"input\": \"Sort: zebra apple lemon\", \"target\": \"apple lemon zebra\"},\n", + " {\"input\": \"Sort: beta alpha gamma\", \"target\": \"alpha beta gamma\"},\n", + " ],\n", + "}\n", + "\n", + "def load_bbeh_like_task(task_name=\"boolean_expressions\", n=8):\n", + " dataset_id = os.getenv(\"BBEH_HF_DATASET\")\n", + " if dataset_id:\n", + " try:\n", + " from datasets import load_dataset\n", + " return list(load_dataset(dataset_id, task_name, split=os.getenv(\"BBEH_SPLIT\", \"test\")))[:n]\n", + " except Exception as exc:\n", + " print(\"Falling back to local examples:\", exc)\n", + " return FALLBACK_TASKS.get(task_name, FALLBACK_TASKS[\"boolean_expressions\"])[:n]\n", + "\n", + "task_name = os.getenv(\"BBEH_TASK\", \"boolean_expressions\")\n", + "task_examples = load_bbeh_like_task(task_name)\n", + "print(task_name, task_examples)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 36, + "id": "004fe95f", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:46.994225Z", + "iopub.status.busy": "2026-05-13T05:01:46.993973Z", + "iopub.status.idle": "2026-05-13T05:01:47.003999Z", + "shell.execute_reply": "2026-05-13T05:01:47.002864Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "0.8999999999999999\n", + "Solve the task.\n", + "For boolean_expressions, reason briefly, then provide the final answer only.\n" + ] + } + ], + "source": [ + "def bbeh_style_evaluator(candidate, example):\n", + " prompt = candidate if isinstance(candidate, str) else candidate.get(\"prompt\", \"\")\n", + " lower = prompt.lower()\n", + " score = 0.1\n", + " if \"think\" in lower or \"reason\" in lower: score += 0.35\n", + " if \"answer only\" in lower or \"final answer\" in lower: score += 0.45\n", + " if task_name.replace(\"_\", \" \") in lower: score += 0.10\n", + " return min(score, 1.0), {\"scores\": {\"prompt_proxy\": score}, \"task\": task_name}\n", + "\n", + "def bbeh_tutorial_proposer(candidate, feedback, **kwargs):\n", + " return (candidate + f\"\\nFor {task_name}, reason briefly, then provide the final answer only.\").strip()\n", + "\n", + "bbeh_result = oa.optimize_anything(\n", + " seed_candidate=\"Solve the task.\",\n", + " evaluator=bbeh_style_evaluator,\n", + " dataset=task_examples,\n", + " objective=f\"Improve performance on {task_name}.\",\n", + " config=oa.GEPAConfig(\n", + " engine=oa.EngineConfig(max_metric_calls=8, max_steps=2),\n", + " 
reflection=oa.ReflectionConfig(custom_candidate_proposer=bbeh_tutorial_proposer),\n", + " ),\n", + ")\n", + "print(bbeh_result.best_score)\n", + "print(bbeh_result.best_candidate)\n" + ] + }, + { + "cell_type": "markdown", + "id": "fda9caea", + "metadata": {}, + "source": [ + "## Optional `OptoPrimeMulti`\n", + "\n", + "`OptoPrimeMulti` is available as a multi-candidate backend, but it is not the default backend. The cell uses tiny generation settings and skips without credentials.\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "d6b58176", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T05:01:47.006244Z", + "iopub.status.busy": "2026-05-13T05:01:47.006049Z", + "iopub.status.idle": "2026-05-13T05:01:47.012029Z", + "shell.execute_reply": "2026-05-13T05:01:47.010933Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + " 0.2\n" + ] + } + ], + "source": [ + "if HAS_LLM and OptoPrimeMulti is not None:\n", + " multi_backend = TraceOptimizerBackend(\n", + " optimizer_cls=\"OptoPrimeMulti\",\n", + " optimizer_kwargs={\"num_responses\": 2, \"max_tokens\": 256},\n", + " )\n", + " multi_result = oa.optimize_anything(\n", + " seed_candidate=\"Solve the task.\",\n", + " evaluator=bbeh_style_evaluator,\n", + " dataset=task_examples[:1],\n", + " objective=f\"Improve performance on {task_name}.\",\n", + " config=oa.GEPAConfig(\n", + " engine=oa.EngineConfig(max_metric_calls=2, max_steps=1),\n", + " reflection=oa.ReflectionConfig(custom_candidate_proposer=multi_backend),\n", + " ),\n", + " )\n", + " print(f\"type: {type(multi_result.best_candidate)}, score: {multi_result.best_score}\")\n", + "else:\n", + " print(\"Skipping OptoPrimeMulti backend demo.\")\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "humanllm", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/opto/optimize_anything/__init__.py b/opto/optimize_anything/__init__.py new file mode 100644 index 00000000..c4b91172 --- /dev/null +++ b/opto/optimize_anything/__init__.py @@ -0,0 +1,40 @@ +"""GEPA optimize_anything-compatible API for Trace.""" + +from opto.optimize_anything.api import ( + EngineConfig, + EvaluationRecord, + GEPAConfig, + GEPAResult, + MergeConfig, + OptimizationState, + ReflectionConfig, + RefinerConfig, + TrackingConfig, + get_log_context, + log, + make_litellm_lm, + optimize_anything, + reset_log_context, + set_log_context, +) +from opto.optimize_anything.trace_backend import TraceOptimizerBackend, resolve_optimizer_cls + +__all__ = [ + "EngineConfig", + "EvaluationRecord", + "GEPAConfig", + "GEPAResult", + "MergeConfig", + "OptimizationState", + "ReflectionConfig", + "RefinerConfig", + "TrackingConfig", + "TraceOptimizerBackend", + "get_log_context", + "log", + "make_litellm_lm", + "optimize_anything", + "reset_log_context", + "resolve_optimizer_cls", + "set_log_context", +] diff --git a/opto/optimize_anything/api.py b/opto/optimize_anything/api.py new file mode 100644 index 00000000..1df25027 --- /dev/null +++ b/opto/optimize_anything/api.py @@ -0,0 +1,546 @@ +from __future__ import annotations + +import contextlib +import contextvars +import copy +import inspect +import io +import json +import statistics +from 
dataclasses import asdict, dataclass, field, is_dataclass +from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple + +_LOG_CONTEXT = contextvars.ContextVar("opto_optimize_anything_log_context", default=None) + + +def set_log_context(logs: Optional[List[str]]): + """Set the context-local optimize_anything log sink and return its token.""" + return _LOG_CONTEXT.set(logs) + + +def reset_log_context(token) -> None: + _LOG_CONTEXT.reset(token) + + +def get_log_context() -> Optional[List[str]]: + return _LOG_CONTEXT.get() + + +def log(*values: Any, sep: str = " ", end: str = "\n", flush: bool = False) -> None: + """Append to the current evaluation log sink, or print outside one.""" + text = sep.join(str(v) for v in values) + sink = get_log_context() + if sink is None: + print(text, end=end, flush=flush) + else: + sink.append(text) + + +@dataclass +class EngineConfig: + max_metric_calls: int = 20 + max_steps: Optional[int] = None + higher_is_better: bool = True + cache_evaluation: bool = True + capture_stdio: bool = False + candidate_selection_strategy: str = "best" + frontier_type: str = "score" + random_seed: int = 0 + + +@dataclass +class ReflectionConfig: + custom_candidate_proposer: Optional[Callable[..., Any]] = None + reflection_lm: Optional[Any] = None + reflection_minibatch_size: int = 1 + + +@dataclass +class RefinerConfig: + enabled: bool = False + max_refinements: int = 0 + + +@dataclass +class MergeConfig: + enabled: bool = False + max_merge_candidates: int = 4 + + +@dataclass +class TrackingConfig: + enabled: bool = True + run_name: Optional[str] = None + + +@dataclass +class GEPAConfig: + engine: EngineConfig = field(default_factory=EngineConfig) + reflection: ReflectionConfig = field(default_factory=ReflectionConfig) + refiner: RefinerConfig = field(default_factory=RefinerConfig) + merge: MergeConfig = field(default_factory=MergeConfig) + tracking: TrackingConfig = field(default_factory=TrackingConfig) + + +@dataclass +class OptimizationState: + step: int = 0 + metric_calls: int = 0 + candidate: Any = None + best_candidate: Any = None + best_score: Optional[float] = None + objective: Optional[str] = None + config: GEPAConfig = field(default_factory=GEPAConfig) + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class EvaluationRecord: + candidate: Any + example: Any + score: float + side_info: Any = None + stdout: str = "" + stderr: str = "" + logs: List[str] = field(default_factory=list) + cached: bool = False + step: int = 0 + + def to_dict(self) -> Dict[str, Any]: + return { + "candidate": _json_like(self.candidate), + "example": _json_like(self.example), + "score": self.score, + "side_info": _json_like(self.side_info), + "stdout": self.stdout, + "stderr": self.stderr, + "logs": list(self.logs), + "cached": self.cached, + "step": self.step, + } + + +@dataclass +class GEPAResult: + best_candidate: Any + best_score: Optional[float] + candidate_scores: List[Tuple[Any, Optional[float]]] + history: List[EvaluationRecord] + config: GEPAConfig + total_metric_calls: int + validation_score: Optional[float] = None + validation_records: List[EvaluationRecord] = field(default_factory=list) + metadata: Dict[str, Any] = field(default_factory=dict) + + @property + def candidates(self) -> List[Any]: + return [candidate for candidate, _ in self.candidate_scores] + + @property + def scores(self) -> List[float]: + return [score for _, score in self.candidate_scores if score is not None] + + @property + def metric_calls(self) -> int: + return 
self.total_metric_calls + + @property + def validation_history(self) -> List[EvaluationRecord]: + return self.validation_records + + def to_dict(self) -> Dict[str, Any]: + return { + "best_candidate": _json_like(self.best_candidate), + "best_score": self.best_score, + "candidate_scores": [ + {"candidate": _json_like(candidate), "score": score} + for candidate, score in self.candidate_scores + ], + "candidates": _json_like(self.candidates), + "scores": self.scores, + "history": [r.to_dict() for r in self.history], + "validation_score": self.validation_score, + "validation_records": [r.to_dict() for r in self.validation_records], + "validation_history": [r.to_dict() for r in self.validation_records], + "total_metric_calls": self.total_metric_calls, + "metric_calls": self.total_metric_calls, + "config": _json_like(asdict(self.config)), + "metadata": _json_like(self.metadata), + } + + +def make_litellm_lm(*args: Any, **kwargs: Any) -> Any: + """Return Trace's LiteLLM backend lazily, matching GEPA-style helpers.""" + from opto.utils.llm import LiteLLM + + return LiteLLM(*args, **kwargs) + + + +def _json_like(value: Any) -> Any: + if is_dataclass(value): + return _json_like(asdict(value)) + if isinstance(value, dict): + return {str(k): _json_like(v) for k, v in value.items()} + if isinstance(value, (list, tuple)): + return [_json_like(v) for v in value] + if isinstance(value, set): + return sorted((_json_like(v) for v in value), key=repr) + if isinstance(value, (str, int, float, bool)) or value is None: + return value + return repr(value) + + +def _stable(value: Any) -> Any: + if is_dataclass(value): + return _stable(asdict(value)) + if isinstance(value, dict): + return {str(k): _stable(v) for k, v in sorted(value.items(), key=lambda item: repr(item[0]))} + if isinstance(value, (list, tuple)): + return [_stable(v) for v in value] + if isinstance(value, set): + return sorted((_stable(v) for v in value), key=repr) + try: + json.dumps(value) + return value + except TypeError: + return repr(value) + + +def _copy_config(config: Optional[GEPAConfig]) -> GEPAConfig: + return copy.deepcopy(config) if config is not None else GEPAConfig() + + +def _patch_config_from_kwargs(config: GEPAConfig, kwargs: Dict[str, Any]) -> GEPAConfig: + groups = { + "engine": EngineConfig, + "reflection": ReflectionConfig, + "refiner": RefinerConfig, + "merge": MergeConfig, + "tracking": TrackingConfig, + } + for key in list(kwargs): + for attr, cls in groups.items(): + if key in cls.__dataclass_fields__: + setattr(getattr(config, attr), key, kwargs.pop(key)) + break + return config + + +def _stable_json(value: Any) -> str: + return json.dumps(_stable(value), sort_keys=True, separators=(",", ":"), default=repr) + + +def _cache_key(candidate: Any, example: Any) -> Tuple[str, str]: + return _stable_json(candidate), _stable_json(example) + + +def _examples(dataset: Optional[Iterable[Any]]) -> List[Any]: + if dataset is None: + return [None] + values = dataset if isinstance(dataset, list) else list(dataset) + return values or [None] + + +def _mean(values: Sequence[float]) -> float: + return float(statistics.fmean(values)) if values else float("nan") + + +def _score_from_side_info(side_info: Any) -> Optional[float]: + if not isinstance(side_info, dict) or "scores" not in side_info: + return None + scores = side_info["scores"] + values = scores.values() if isinstance(scores, dict) else scores if isinstance(scores, (list, tuple)) else [] + numeric = [float(v) for v in values if isinstance(v, (int, float, bool))] + return 
_mean(numeric) if numeric else None + + +def _coerce_evaluator_return(value: Any) -> Tuple[float, Any]: + score, side_info = (value if isinstance(value, tuple) and len(value) == 2 else (value, None)) + if isinstance(score, (int, float, bool)): + return float(score), side_info + inferred = _score_from_side_info(side_info) + if inferred is not None: + return inferred, side_info + raise TypeError("Evaluator must return a numeric score, bool, or (score, side_info) with numeric side_info['scores'].") + + +def _positional_capacity(sig: inspect.Signature) -> Tuple[int, bool]: + count = 0 + varargs = False + for p in sig.parameters.values(): + if p.kind == p.VAR_POSITIONAL: + varargs = True + elif p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) and p.default is p.empty: + count += 1 + return count, varargs + + +def _call_flex(fn: Callable[..., Any], ordered: Sequence[Any], **available: Any) -> Any: + """Call a GEPA-style evaluator/proposer with flexible signatures. + + Prefer keyword dispatch only when all required positional-or-keyword + parameters can be satisfied by known names. Otherwise fall back to + positional dispatch using the supplied ordered arguments. + + This avoids a subtle bug for callables such as: + + def evaluator(candidate, e): ... + + where the first parameter name is known but the second one is arbitrary. + """ + try: + sig = inspect.signature(fn) + except (TypeError, ValueError): + return fn(*ordered) + + params = list(sig.parameters.values()) + + required_positional = [ + p + for p in params + if p.kind in (p.POSITIONAL_ONLY, p.POSITIONAL_OR_KEYWORD) + and p.default is p.empty + ] + has_varargs = any(p.kind == p.VAR_POSITIONAL for p in params) + has_varkw = any(p.kind == p.VAR_KEYWORD for p in params) + has_positional_only = any(p.kind == p.POSITIONAL_ONLY for p in params) + + required_names_are_known = all( + p.kind != p.POSITIONAL_ONLY and p.name in available + for p in required_positional + ) + + if required_names_are_known and not has_positional_only: + kwargs = dict(available) if has_varkw else { + k: v + for k, v in available.items() + if k in sig.parameters + and sig.parameters[k].kind + in (sig.parameters[k].POSITIONAL_OR_KEYWORD, sig.parameters[k].KEYWORD_ONLY) + } + return fn(**kwargs) + + required, varargs = _positional_capacity(sig) + n = len(ordered) if (varargs or has_varargs) else min(len(ordered), max(required, 1)) + positional = list(ordered[:n]) + kwargs = { + p.name: available[p.name] + for p in params + if p.kind == p.KEYWORD_ONLY and p.name in available + } + if has_varkw: + kwargs.update({k: v for k, v in available.items() if k not in kwargs}) + return fn(*positional, **kwargs) + + +class EvaluatorWrapper: + def __init__(self, evaluator: Callable[..., Any], config: EngineConfig): + self.evaluator = evaluator + self.config = config + self.cache: Dict[Tuple[str, str], EvaluationRecord] = {} + + def __call__(self, *, candidate: Any, example: Any, opt_state: OptimizationState, count_budget: bool = True) -> EvaluationRecord: + key = _cache_key(candidate, example) + if self.config.cache_evaluation and key in self.cache: + cached = copy.deepcopy(self.cache[key]) + cached.cached = True + cached.step = opt_state.step + return cached + + logs: List[str] = [] + stdout, stderr = io.StringIO(), io.StringIO() + token = set_log_context(logs) + try: + out_cm = contextlib.redirect_stdout(stdout) if self.config.capture_stdio else contextlib.nullcontext() + err_cm = contextlib.redirect_stderr(stderr) if self.config.capture_stdio else contextlib.nullcontext() + with 
out_cm, err_cm: + raw = _call_flex( + self.evaluator, + (candidate, example, opt_state), + candidate=candidate, + example=example, + opt_state=opt_state, + ) + finally: + reset_log_context(token) + + score, side_info = _coerce_evaluator_return(raw) + if count_budget: + opt_state.metric_calls += 1 + record = EvaluationRecord( + candidate=copy.deepcopy(candidate), + example=copy.deepcopy(example), + score=score, + side_info=copy.deepcopy(side_info), + stdout=stdout.getvalue(), + stderr=stderr.getvalue(), + logs=logs, + step=opt_state.step, + ) + if self.config.cache_evaluation: + self.cache[key] = copy.deepcopy(record) + return record + + +def _is_better(score: Optional[float], incumbent: Optional[float], higher_is_better: bool) -> bool: + if score is None: + return False + if incumbent is None: + return True + return score > incumbent if higher_is_better else score < incumbent + + +def _aggregate(records: Sequence[EvaluationRecord]) -> Optional[float]: + return _mean([r.score for r in records]) if records else None + + +def _feedback(candidate: Any, objective: Optional[str], score: Optional[float], records: Sequence[EvaluationRecord]) -> str: + lines = [] + if objective: + lines.append(f"Objective: {objective}") + lines.append(f"Candidate: {candidate!r}") + lines.append(f"Aggregate score: {score}") + for i, r in enumerate(records): + lines.append(f"Example {i}: score={r.score}, side_info={r.side_info!r}") + if r.logs: + lines.append("Logs: " + " | ".join(r.logs)) + if r.stdout: + lines.append("Stdout: " + r.stdout.strip()) + if r.stderr: + lines.append("Stderr: " + r.stderr.strip()) + return "\n".join(lines) + + +def _normalize_proposals(raw: Any) -> List[Any]: + if raw is None: + return [] + if isinstance(raw, list): + return raw + if isinstance(raw, tuple): + return list(raw) + return [raw] + + +def _default_proposer() -> Callable[..., Any]: + from opto.optimize_anything.trace_backend import TraceOptimizerBackend + + return TraceOptimizerBackend() + + +def _call_proposer(proposer: Callable[..., Any], *, candidate: Any, feedback: str, objective: Optional[str], side_info: Any, opt_state: OptimizationState) -> List[Any]: + raw = _call_flex( + proposer, + (candidate, feedback), + candidate=candidate, + feedback=feedback, + objective=objective, + side_info=side_info, + opt_state=opt_state, + ) + return _normalize_proposals(raw) + + +def _evaluate_candidate(wrapper: EvaluatorWrapper, candidate: Any, examples: Sequence[Any], opt_state: OptimizationState, budget: int) -> List[EvaluationRecord]: + records: List[EvaluationRecord] = [] + for example in examples: + key = _cache_key(candidate, example) + if opt_state.metric_calls >= budget and not (wrapper.config.cache_evaluation and key in wrapper.cache): + break + opt_state.candidate = candidate + records.append(wrapper(candidate=candidate, example=example, opt_state=opt_state)) + return records + + +def optimize_anything( + *, + seed_candidate: Any = None, + evaluator: Callable[..., Any], + dataset: Optional[Iterable[Any]] = None, + valset: Optional[Iterable[Any]] = None, + objective: Optional[str] = None, + config: Optional[GEPAConfig] = None, + **direct_config_kwargs: Any, +) -> GEPAResult: + if evaluator is None: + raise ValueError("evaluator is required") + + config = _patch_config_from_kwargs(_copy_config(config), direct_config_kwargs) + if direct_config_kwargs: + raise TypeError("Unknown optimize_anything keyword argument(s): " + ", ".join(sorted(direct_config_kwargs))) + + train_examples = _examples(dataset) + validation_examples = 
_examples(valset) if valset is not None else [] + wrapper = EvaluatorWrapper(evaluator, config.engine) + proposer = config.reflection.custom_candidate_proposer or _default_proposer() + opt_state = OptimizationState(objective=objective, config=config) + + best_candidate = None + best_score: Optional[float] = None + current_candidate = seed_candidate + current_score: Optional[float] = None + current_records: List[EvaluationRecord] = [] + history: List[EvaluationRecord] = [] + candidate_scores: List[Tuple[Any, Optional[float]]] = [] + candidate_records: Dict[str, List[EvaluationRecord]] = {} + + def evaluate(candidate: Any) -> Tuple[Optional[float], List[EvaluationRecord]]: + nonlocal best_candidate, best_score + records = _evaluate_candidate(wrapper, candidate, train_examples, opt_state, config.engine.max_metric_calls) + if not records: + return None, [] + score = _aggregate(records) + history.extend(records) + candidate_scores.append((copy.deepcopy(candidate), score)) + candidate_records[_stable_json(candidate)] = records + if _is_better(score, best_score, config.engine.higher_is_better): + best_candidate = copy.deepcopy(candidate) + best_score = score + opt_state.best_candidate = best_candidate + opt_state.best_score = best_score + return score, records + + current_score, current_records = evaluate(current_candidate) + max_steps = 0 if config.engine.max_steps == 0 else (config.engine.max_steps or config.engine.max_metric_calls) + + for step in range(max_steps): + if opt_state.metric_calls >= config.engine.max_metric_calls: + break + opt_state.step = step + 1 + source = best_candidate if config.engine.candidate_selection_strategy == "best" else current_candidate + source_score = best_score if source == best_candidate else current_score + source_records = candidate_records.get(_stable_json(source), current_records) + proposals = _call_proposer( + proposer, + candidate=source, + feedback=_feedback(source, objective, source_score, source_records), + objective=objective, + side_info=[r.side_info for r in source_records], + opt_state=opt_state, + ) + if not proposals: + break + for proposal in proposals: + if opt_state.metric_calls >= config.engine.max_metric_calls: + break + current_candidate = proposal + current_score, current_records = evaluate(proposal) + + validation_records: List[EvaluationRecord] = [] + validation_score = None + if validation_examples and best_candidate is not None: + opt_state.step += 1 + validation_records = _evaluate_candidate(wrapper, best_candidate, validation_examples, opt_state, config.engine.max_metric_calls) + validation_score = _aggregate(validation_records) + + return GEPAResult( + best_candidate=best_candidate, + best_score=best_score, + candidate_scores=candidate_scores, + history=history, + config=config, + total_metric_calls=opt_state.metric_calls, + validation_score=validation_score, + validation_records=validation_records, + metadata={"objective": objective}, + ) diff --git a/opto/optimize_anything/trace_backend.py b/opto/optimize_anything/trace_backend.py new file mode 100644 index 00000000..e071468f --- /dev/null +++ b/opto/optimize_anything/trace_backend.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import copy +import json +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, Optional, Type, Union + +from opto.trace import bundle, node + + +def resolve_optimizer_cls(optimizer_cls: Optional[Union[str, Type[Any]]] = None) -> Type[Any]: + if optimizer_cls is None: + try: + from opto.optimizers import OptoPrimeV2 
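+ # Prefer OptoPrimeV2 when the installed opto exposes it; any import failure falls back to OptoPrime below.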
+ + return OptoPrimeV2 + except Exception: + from opto.optimizers import OptoPrime + + return OptoPrime + if isinstance(optimizer_cls, str): + import opto.optimizers as optimizers + + try: + return getattr(optimizers, optimizer_cls) + except AttributeError as exc: + available = sorted(name for name in getattr(optimizers, "__all__", dir(optimizers)) if not name.startswith("_")) + raise ValueError(f"Unknown Trace optimizer '{optimizer_cls}'. Available optimizers include: {available}") from exc + if not isinstance(optimizer_cls, type): + raise ValueError(f"optimizer_cls must be a class or string name, got {type(optimizer_cls).__name__}") + return optimizer_cls + + +def _jsonable(candidate: Any) -> bool: + try: + json.dumps(candidate) + return True + except TypeError: + return False + + +def default_candidate_serializer(candidate: Any) -> Any: + if isinstance(candidate, str): + return candidate + if isinstance(candidate, dict) and len(candidate) == 1: + value = next(iter(candidate.values())) + if isinstance(value, (str, int, float, bool)) or value is None: + return value + if _jsonable(candidate): + return json.dumps(candidate, sort_keys=True) + return repr(candidate) + + +def default_candidate_deserializer(original: Any, proposed: Any) -> Any: + if isinstance(original, str): + return str(proposed) + if isinstance(proposed, str): + try: + decoded = json.loads(proposed) + except Exception: + decoded = None + if isinstance(original, dict) and isinstance(decoded, dict): + return decoded + if isinstance(original, list) and isinstance(decoded, list): + return decoded + if isinstance(original, dict) and len(original) == 1 and not isinstance(proposed, dict): + key = next(iter(original)) + return {key: proposed} + return copy.deepcopy(proposed) + + +@bundle( + description="[optimize_anything_candidate] Identity wrapper used to expose an optimize_anything candidate to Trace.", + trainable=False, +) +def _identity_candidate(candidate): + return candidate + + +@dataclass +class TraceOptimizerBackend: + """Adapt Trace optimizers to the optimize_anything candidate-proposer protocol.""" + + optimizer_cls: Optional[Union[str, Type[Any]]] = None + optimizer_kwargs: Dict[str, Any] = field(default_factory=dict) + parameter_name: str = "candidate" + candidate_serializer: Callable[[Any], Any] = default_candidate_serializer + candidate_deserializer: Optional[Callable[[Any, Any], Any]] = None + + def __post_init__(self) -> None: + self.optimizer_cls = resolve_optimizer_cls(self.optimizer_cls) + + def __call__( + self, + *, + candidate: Any, + feedback: str, + objective: Optional[str] = None, + side_info: Optional[Any] = None, + opt_state: Optional[Any] = None, + **kwargs: Any, + ) -> Any: + del side_info, opt_state, kwargs + original = copy.deepcopy(candidate) + parameter = node( + self.candidate_serializer(original), + name=self.parameter_name, + trainable=True, + description="Candidate optimized through the optimize_anything compatibility layer.", + ) + output = _identity_candidate(parameter) + optimizer_kwargs = dict(self.optimizer_kwargs) + optimizer = self._make_optimizer(parameter, optimizer_kwargs, objective) + optimizer.zero_feedback() + optimizer.backward(output, feedback) + updates = self._propose_without_mutating(optimizer) + if not updates or parameter not in updates: + return original + proposed = updates[parameter] + if self.candidate_deserializer is not None: + return self.candidate_deserializer(original, proposed) + return default_candidate_deserializer(original, proposed) + + def 
_make_optimizer(self, parameter: Any, optimizer_kwargs: Dict[str, Any], objective: Optional[str]) -> Any: + if objective is not None and "objective" not in optimizer_kwargs: + try: + return self.optimizer_cls([parameter], objective=objective, **optimizer_kwargs) + except TypeError as exc: + if "objective" not in str(exc): + raise + return self.optimizer_cls([parameter], **optimizer_kwargs) + + @staticmethod + def _propose_without_mutating(optimizer: Any) -> Dict[Any, Any]: + if hasattr(optimizer, "step"): + try: + return optimizer.step(bypassing=True) + except TypeError: + return optimizer.step() + if hasattr(optimizer, "propose"): + return optimizer.propose() + raise TypeError(f"{optimizer.__class__.__name__} does not implement step() or propose()") diff --git a/tests/llm_optimizers_tests/test_optimize_anything_trace_backend.py b/tests/llm_optimizers_tests/test_optimize_anything_trace_backend.py new file mode 100644 index 00000000..bfe131c7 --- /dev/null +++ b/tests/llm_optimizers_tests/test_optimize_anything_trace_backend.py @@ -0,0 +1,176 @@ +import os + +import pytest + +from opto.optimizers.optimizer import Optimizer +from opto.trace import GRAPH +from opto.optimize_anything import GEPAConfig, EngineConfig, ReflectionConfig, optimize_anything +from opto.optimize_anything.trace_backend import TraceOptimizerBackend, resolve_optimizer_cls + + +@pytest.fixture(autouse=True) +def clear_graph(): + GRAPH.clear() + yield + GRAPH.clear() + + +class SuffixOptimizer(Optimizer): + def __init__(self, parameters, suffix="!", **kwargs): + super().__init__(parameters) + self.suffix = suffix + self.seen_kwargs = kwargs + + def _step(self, *args, **kwargs): + return {parameter: f"{parameter.data}{self.suffix}" for parameter in self.parameters} + + +class NoBypassOptimizer(Optimizer): + def _step(self, *args, **kwargs): + return {parameter: f"{parameter.data}*" for parameter in self.parameters} + + def step(self): + update_dict = self.propose() + self.update(update_dict) + return update_dict + + +class ProposeOnlyOptimizer: + def __init__(self, parameters, **kwargs): + self.parameters = parameters + + def zero_feedback(self): + pass + + def backward(self, output, feedback): + self.feedback = feedback + + def propose(self): + return {parameter: f"{parameter.data}?" 
for parameter in self.parameters} + + +def test_trace_backend_updates_string_candidate_with_optimizer_protocol(): + backend = TraceOptimizerBackend(optimizer_cls=SuffixOptimizer, optimizer_kwargs={"suffix": " improved"}) + assert backend(candidate="seed", feedback="make it better", objective="improve candidate") == "seed improved" + + +def test_trace_backend_does_not_mutate_original_candidate(): + original = {"prompt": "seed"} + backend = TraceOptimizerBackend(optimizer_cls=SuffixOptimizer, optimizer_kwargs={"suffix": " v2"}) + updated = backend(candidate=original, feedback="make it better") + assert original == {"prompt": "seed"} + assert updated is not original + + +def test_trace_backend_preserves_single_key_dict_candidate_shape(): + backend = TraceOptimizerBackend(optimizer_cls=SuffixOptimizer, optimizer_kwargs={"suffix": " v2"}) + assert backend(candidate={"prompt": "seed"}, feedback="make it better") == {"prompt": "seed v2"} + + +def test_trace_backend_can_roundtrip_json_dict_candidates_when_configured(): + class JsonOptimizer(Optimizer): + def _step(self, *args, **kwargs): + return {parameter: '{"x": 2, "nested": [1]}' for parameter in self.parameters} + + backend = TraceOptimizerBackend( + optimizer_cls=JsonOptimizer, + candidate_serializer=lambda candidate: '{"x": 1, "nested": []}', + ) + assert backend(candidate={"x": 1, "nested": []}, feedback="increase x") == {"x": 2, "nested": [1]} + + +def test_trace_backend_falls_back_when_step_has_no_bypassing_kwarg(): + backend = TraceOptimizerBackend(optimizer_cls=NoBypassOptimizer) + assert backend(candidate="a", feedback="change") == "a*" + + +def test_trace_backend_can_use_propose_only_optimizer(): + backend = TraceOptimizerBackend(optimizer_cls=ProposeOnlyOptimizer) + assert backend(candidate="a", feedback="change") == "a?" 
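+ + + # Sketch: by symmetry with the JSON dict round-trip test above, the default deserializer + # is assumed to restore list candidates from a JSON-encoded string proposal; ListOptimizer is a stub. + def test_trace_backend_roundtrips_json_list_candidates(): + class ListOptimizer(Optimizer): + def _step(self, *args, **kwargs): + return {parameter: "[1, 2, 3]" for parameter in self.parameters} + + backend = TraceOptimizerBackend(optimizer_cls=ListOptimizer) + assert backend(candidate=[1, 2], feedback="extend the list") == [1, 2, 3]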
+ + +def test_trace_backend_preserves_single_key_dict_for_scalar_non_string_proposal(): + class ScalarOptimizer(Optimizer): + def _step(self, *args, **kwargs): + return {parameter: 2 for parameter in self.parameters} + + backend = TraceOptimizerBackend(optimizer_cls=ScalarOptimizer) + assert backend(candidate={"x": 1}, feedback="increase") == {"x": 2} + + +def test_trace_backend_rejects_non_class_optimizer_cls(): + with pytest.raises(ValueError, match="optimizer_cls must be a class or string name"): + TraceOptimizerBackend(optimizer_cls=object()) + + +def test_trace_backend_inside_optimize_anything_loop(): + backend = TraceOptimizerBackend(optimizer_cls=SuffixOptimizer, optimizer_kwargs={"suffix": "x"}) + result = optimize_anything( + seed_candidate="a", + evaluator=lambda candidate: float(len(candidate)), + objective="make longer", + config=GEPAConfig( + engine=EngineConfig(max_metric_calls=3, max_steps=2), + reflection=ReflectionConfig(custom_candidate_proposer=backend), + ), + ) + assert result.best_candidate == "axx" + assert result.best_score == pytest.approx(3.0) + + +def test_trace_backend_custom_deserializer(): + backend = TraceOptimizerBackend( + optimizer_cls=SuffixOptimizer, + optimizer_kwargs={"suffix": "!"}, + candidate_deserializer=lambda original, proposed: {"old": original, "new": proposed}, + ) + assert backend(candidate="x", feedback="change") == {"old": "x", "new": "x!"} + + +def test_resolve_optimizer_cls_supports_default_and_string_names_for_available_optimizers(): + assert resolve_optimizer_cls(SuffixOptimizer) is SuffixOptimizer + assert resolve_optimizer_cls().__name__ in {"OptoPrimeV2", "OptoPrime"} + for name in ["OptoPrimeV2", "OptoPrime", "OptoPrimeMulti", "OPROv2", "TextGrad"]: + assert resolve_optimizer_cls(name).__name__ == name + + +def test_unknown_optimizer_name_has_clear_error(): + with pytest.raises(ValueError, match="Unknown Trace optimizer"): + resolve_optimizer_cls("DefinitelyMissingOptimizer") + + +requires_openai = pytest.mark.skipif(not os.environ.get("OPENAI_API_KEY"), reason="OPENAI_API_KEY is not available") + + +@requires_openai +def test_live_gpt5_nano_litellm_helper_smoke(): + pytest.importorskip("litellm") + os.environ.setdefault("TRACE_DEFAULT_LLM_BACKEND", "LiteLLM") + os.environ.setdefault("TRACE_LITELLM_MODEL", "gpt-5-nano") + from opto.optimize_anything import make_litellm_lm + + lm = make_litellm_lm(model=os.environ.get("TRACE_LITELLM_MODEL", "gpt-5-nano"), max_retries=1) + assert callable(lm.model) + assert getattr(lm, "model_name", None) == os.environ.get("TRACE_LITELLM_MODEL", "gpt-5-nano") + + +@requires_openai +def test_live_gpt5_nano_trace_backend_protocol_smoke(): + pytest.importorskip("litellm") + os.environ.setdefault("TRACE_DEFAULT_LLM_BACKEND", "LiteLLM") + os.environ.setdefault("TRACE_LITELLM_MODEL", "gpt-5-nano") + backend = TraceOptimizerBackend( + optimizer_cls="OPROv2", + optimizer_kwargs={"max_tokens": 128, "temperature": 0.0, "llm": None}, + ) + result = optimize_anything( + seed_candidate="Answer with one short word.", + evaluator=lambda candidate: 1.0 if isinstance(candidate, str) and candidate else 0.0, + objective="Keep the instruction concise.", + config=GEPAConfig( + engine=EngineConfig(max_metric_calls=2, max_steps=1, capture_stdio=True), + reflection=ReflectionConfig(custom_candidate_proposer=backend), + ), + ) + assert isinstance(result.best_candidate, str) + assert result.total_metric_calls <= 2 diff --git a/tests/unit_tests/test_optimize_anything_api.py b/tests/unit_tests/test_optimize_anything_api.py new 
file mode 100644 index 00000000..a9d86a6c --- /dev/null +++ b/tests/unit_tests/test_optimize_anything_api.py @@ -0,0 +1,278 @@ +import io +import sys +from contextlib import redirect_stdout + +import pytest + +import opto.optimize_anything as oa + + +def test_public_api_exports_expected_symbols(): + for name in [ + "optimize_anything", "EngineConfig", "ReflectionConfig", "RefinerConfig", + "MergeConfig", "TrackingConfig", "GEPAConfig", "OptimizationState", + "GEPAResult", "EvaluationRecord", "log", "get_log_context", + "set_log_context", "make_litellm_lm", "TraceOptimizerBackend", + ]: + assert hasattr(oa, name) + + +def test_log_context_does_not_write_to_stdout(): + captured_stdout = io.StringIO() + captured_logs = [] + token = oa.set_log_context(captured_logs) + try: + with redirect_stdout(captured_stdout): + oa.log("hidden", 1, sep="-") + finally: + oa.reset_log_context(token) + assert captured_stdout.getvalue() == "" + assert captured_logs == ["hidden-1"] + + +def test_log_falls_back_to_print_without_context(capsys): + oa.log("visible", 2) + assert capsys.readouterr().out == "visible 2\n" + + +def test_evaluator_supports_stdout_stderr_oa_log_cache_and_opt_state(): + calls = {"n": 0} + + def evaluator(candidate, example, opt_state): + assert opt_state.candidate == candidate + print(f"stdout:{example}") + sys.stderr.write(f"stderr:{example}\n") + oa.log("structured", example) + calls["n"] += 1 + return candidate["x"] + example, {"scores": {"x": candidate["x"]}} + + def proposer(candidate, feedback, **kwargs): + assert "structured" in feedback + return {"x": candidate["x"] + 1} + + result = oa.optimize_anything( + seed_candidate={"x": 0}, + evaluator=evaluator, + dataset=[1, 1], + objective="increase x", + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=10, max_steps=1, capture_stdio=True, cache_evaluation=True), + reflection=oa.ReflectionConfig(custom_candidate_proposer=proposer), + ), + ) + assert calls["n"] == 2 + assert result.best_candidate == {"x": 1} + assert result.best_score == pytest.approx(2.0) + assert result.total_metric_calls == 2 + assert any("stdout:1" in r.stdout for r in result.history) + assert any("stderr:1" in r.stderr for r in result.history) + assert any("structured" in "\n".join(r.logs) for r in result.history) + + +@pytest.mark.parametrize("returned,expected", [(1, 1.0), (True, 1.0), (0.25, 0.25), ((0.7, {"a": 1}), 0.7), ((None, {"scores": [0.25, 0.75]}), 0.5)]) +def test_evaluator_return_forms(returned, expected): + result = oa.optimize_anything(seed_candidate="seed", evaluator=lambda candidate: returned, max_metric_calls=1) + assert result.best_score == pytest.approx(expected) + + +@pytest.mark.parametrize( + "evaluator", + [ + lambda candidate: float(candidate), + lambda candidate, example: float(candidate + example), + lambda candidate, example, opt_state: float(candidate + example + opt_state.step), + lambda c, e, s: float(c + e + s.step), + lambda candidate, example, *, opt_state: float(candidate + example + opt_state.step), + ], +) +def test_evaluator_signature_variants_and_opt_state_injection(evaluator): + result = oa.optimize_anything(seed_candidate=1, evaluator=evaluator, dataset=[2], max_metric_calls=1) + assert result.best_score >= 1.0 + + +def test_candidate_proposer_can_return_multiple_candidates_and_budget_is_respected(): + def proposer(candidate, **kwargs): + return [candidate + 1, candidate + 2, candidate + 3] + + result = oa.optimize_anything( + seed_candidate=0, + evaluator=lambda candidate: float(candidate), + 
config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=3, max_steps=3), + reflection=oa.ReflectionConfig(custom_candidate_proposer=proposer), + ), + ) + assert result.total_metric_calls == 3 + assert result.best_candidate == 2 + assert [r.candidate for r in result.history] == [0, 1, 2] + + +def test_direct_kwargs_patch_config_for_gepa_style_callsite(): + result = oa.optimize_anything( + seed_candidate=3, + evaluator=lambda candidate: float(candidate), + max_metric_calls=1, + capture_stdio=True, + cache_evaluation=False, + ) + assert result.best_score == pytest.approx(3.0) + assert result.config.engine.capture_stdio is True + assert result.config.engine.cache_evaluation is False + + +def test_stable_cache_handles_unhashable_nested_candidates_and_examples(): + calls = {"n": 0} + + def evaluator(candidate, example): + calls["n"] += 1 + return float(candidate["values"][0] + example["bias"]) + + result = oa.optimize_anything( + seed_candidate={"values": [1, 2]}, + evaluator=evaluator, + dataset=[{"bias": 3}, {"bias": 3}], + config=oa.GEPAConfig(engine=oa.EngineConfig(max_metric_calls=10, max_steps=0, cache_evaluation=True)), + ) + assert calls["n"] == 1 + assert result.best_score == pytest.approx(4.0) + + +def test_lower_is_better_selection(): + result = oa.optimize_anything( + seed_candidate=10, + evaluator=lambda candidate: float(candidate), + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=3, max_steps=2, higher_is_better=False), + reflection=oa.ReflectionConfig(custom_candidate_proposer=lambda candidate, **kwargs: candidate - 1), + ), + ) + assert result.best_candidate == 8 + assert result.best_score == pytest.approx(8.0) + + +def test_result_to_dict_is_json_like_and_has_validation_aliases(): + result = oa.optimize_anything( + seed_candidate="x", + evaluator=lambda candidate, example=None: (1.0, {"scores": {"ok": 1}}), + valset=[{"heldout": True}], + config=oa.GEPAConfig(engine=oa.EngineConfig(max_metric_calls=2, max_steps=0)), + ) + as_dict = result.to_dict() + assert as_dict["best_candidate"] == "x" + assert as_dict["best_score"] == 1.0 + assert as_dict["candidate_scores"] == [{"candidate": "x", "score": 1.0}] + assert as_dict["history"][0]["side_info"] == {"scores": {"ok": 1}} + assert "validation_records" in as_dict + + +def test_to_dict_converts_non_json_objects_and_config_callables_to_repr(): + class CustomObject: + pass + + def proposer(candidate, **kwargs): + return candidate + + obj = CustomObject() + result = oa.optimize_anything( + seed_candidate={"obj": obj}, + evaluator=lambda candidate: (1.0, {"obj": obj}), + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=1), + reflection=oa.ReflectionConfig(custom_candidate_proposer=proposer), + ), + ) + data = result.to_dict() + assert isinstance(data["best_candidate"]["obj"], str) + assert isinstance(data["history"][0]["side_info"]["obj"], str) + assert isinstance(data["config"]["reflection"]["custom_candidate_proposer"], str) + + +def test_cache_key_is_stable_for_sets_and_nested_unhashables(): + calls = {"n": 0} + + def evaluator(candidate, example): + calls["n"] += 1 + return float(len(candidate["items"]) + len(example["items"])) + + result = oa.optimize_anything( + seed_candidate={"items": {3, 1, 2}}, + evaluator=evaluator, + dataset=[{"items": {2, 1}}, {"items": {1, 2}}], + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=10, max_steps=0, cache_evaluation=True), + ), + ) + assert calls["n"] == 1 + assert len(result.history) == 2 + assert result.total_metric_calls == 1 + + +def 
test_cache_hits_do_not_consume_budget_after_budget_is_reached(): + calls = {"n": 0} + + def evaluator(candidate, example): + calls["n"] += 1 + return 1.0 + + result = oa.optimize_anything( + seed_candidate="x", + evaluator=evaluator, + dataset=[{"same": [1, 2]}, {"same": [1, 2]}], + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=1, max_steps=0, cache_evaluation=True), + ), + ) + assert calls["n"] == 1 + assert result.total_metric_calls == 1 + assert len(result.history) == 2 + assert result.history[1].cached is True + + +def test_evaluator_mixed_known_and_unknown_positional_names(): + def evaluator(candidate, e): + return float(candidate + e) + + result = oa.optimize_anything( + seed_candidate=2, + evaluator=evaluator, + dataset=[3], + max_metric_calls=1, + ) + assert result.best_score == pytest.approx(5.0) + + +def test_evaluator_keyword_only_opt_state_with_mixed_positional_names(): + seen_steps = [] + + def evaluator(candidate, e, *, opt_state): + seen_steps.append(opt_state.step) + return float(candidate + e + opt_state.step) + + result = oa.optimize_anything( + seed_candidate=2, + evaluator=evaluator, + dataset=[3], + max_metric_calls=1, + ) + assert result.best_score == pytest.approx(5.0) + assert seen_steps == [0] + + +def test_proposer_mixed_known_and_unknown_positional_names(): + def evaluator(candidate): + return float(candidate) + + def proposer(candidate, fb): + assert "Aggregate score" in fb + return candidate + 1 + + result = oa.optimize_anything( + seed_candidate=1, + evaluator=evaluator, + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=2, max_steps=1), + reflection=oa.ReflectionConfig(custom_candidate_proposer=proposer), + ), + ) + assert result.best_candidate == 2 + assert result.best_score == pytest.approx(2.0)
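+ + + # Sketch: candidate_selection_strategy="current" is assumed to build each proposal on the + # latest evaluated candidate rather than on the best-scoring one. + def test_current_candidate_selection_strategy_follows_latest_proposal(): + result = oa.optimize_anything( + seed_candidate=5, + evaluator=lambda candidate: float(candidate), + config=oa.GEPAConfig( + engine=oa.EngineConfig(max_metric_calls=3, max_steps=2, candidate_selection_strategy="current"), + reflection=oa.ReflectionConfig(custom_candidate_proposer=lambda candidate, **kwargs: candidate - 1), + ), + ) + assert [r.candidate for r in result.history] == [5, 4, 3] + assert result.best_candidate == 5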