From 2d09d10583de8a06ebe59ba3f0c5ae5afeaee108 Mon Sep 17 00:00:00 2001 From: doxav <> Date: Wed, 13 May 2026 08:24:03 +0200 Subject: [PATCH] Implement OptoPrimeMultiV2 optimizer with multi-candidate support compatible with OptoPrimeV2 A notebook demonstrate its usafe - Added OptoPrimeMultiMixin for shared functionality across multi-candidate optimizers. - Created OptoPrimeMultiV2 class extending OptoPrimeMultiMixin and OptoPrimeV2 for multi-candidate optimization. - Developed candidate generation strategies including temperature variation, self-refinement, iterative alternatives, and multi-experts. - Implemented selection techniques such as best_of_n, majority, and mixture_of_agents. - Added unit and integration tests for OptoPrimeMultiV2 to ensure functionality and correctness. - Included support for JSON object response format in candidate generation. --- examples/notebooks/optoprimemultiv2.ipynb | 637 ++++++++++++++++ opto/optimizers/__init__.py | 3 +- opto/optimizers/optoprimemulti.py | 688 +----------------- opto/optimizers/optoprimemulti_base.py | 576 +++++++++++++++ opto/optimizers/optoprimemulti_v2.py | 31 + .../test_optoprimemultiv2.py | 49 ++ tests/unit_tests/test_optoprimemultiv2.py | 332 +++++++++ 7 files changed, 1631 insertions(+), 685 deletions(-) create mode 100644 examples/notebooks/optoprimemultiv2.ipynb create mode 100644 opto/optimizers/optoprimemulti_base.py create mode 100644 opto/optimizers/optoprimemulti_v2.py create mode 100644 tests/llm_optimizers_tests/test_optoprimemultiv2.py create mode 100644 tests/unit_tests/test_optoprimemultiv2.py diff --git a/examples/notebooks/optoprimemultiv2.ipynb b/examples/notebooks/optoprimemultiv2.ipynb new file mode 100644 index 00000000..3a4773fb --- /dev/null +++ b/examples/notebooks/optoprimemultiv2.ipynb @@ -0,0 +1,637 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "intro", + "metadata": {}, + "source": [ + "# OptoPrimeMultiV2 on real BBEH tasks\n", + "\n", + "[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/doxav/NewTrace/blob/experimental/examples/notebooks/optoprimemultiv2.ipynb)\n", + "\n", + "This notebook compares **OptoPrimeV2** with **OptoPrimeMultiV2** on a selected real BBEH task. It uses a real LLM only: OpenAI is preferred when `OPENAI_API_KEY` is available, otherwise OpenRouter is used when `OPENROUTER_API_KEY` is available.\n", + "\n", + "Configure the run with environment variables before executing the notebook:\n", + "\n", + "- `BBEH_TASK_NAME`, default `bbeh_boolean_expressions`\n", + "- `BBEH_N_TRAIN`, default `3`\n", + "- `BBEH_N_EVAL`, default `3`\n", + "- `BBEH_SEED`, default `7`\n" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "setup", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T06:17:20.506372Z", + "iopub.status.busy": "2026-05-13T06:17:20.506190Z", + "iopub.status.idle": "2026-05-13T06:17:20.512364Z", + "shell.execute_reply": "2026-05-13T06:17:20.511704Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "IN_COLAB=False\n" + ] + } + ], + "source": [ + "import os\n", + "import sys\n", + "\n", + "try:\n", + " import google.colab # type: ignore\n", + " IN_COLAB = True\n", + "except Exception:\n", + " IN_COLAB = False\n", + "\n", + "if IN_COLAB:\n", + " !git clone https://github.com/doxav/NewTrace.git Trace\n", + " %cd Trace\n", + " !git checkout experimental\n", + " !pip install -e . -q\n", + "else:\n", + " repo_root = os.getcwd()\n", + " if repo_root.endswith(os.path.join(\"examples\", \"notebooks\")):\n", + " repo_root = os.path.abspath(os.path.join(repo_root, \"..\", \"..\"))\n", + " if repo_root not in sys.path:\n", + " sys.path.insert(0, repo_root)\n", + "\n", + "print(f\"IN_COLAB={IN_COLAB}\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "provider-md", + "metadata": {}, + "source": [ + "## Provider setup\n", + "\n", + "The notebook requires a real LLM key. It reads keys from environment variables and, in Colab, from `google.colab.userdata`.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "provider-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T06:17:20.513511Z", + "iopub.status.busy": "2026-05-13T06:17:20.513378Z", + "iopub.status.idle": "2026-05-13T06:17:20.517001Z", + "shell.execute_reply": "2026-05-13T06:17:20.516494Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Provider: openai\n", + "Model: gpt-4o-mini\n", + "OpenAI key available: True\n", + "OpenRouter key available: False\n" + ] + } + ], + "source": [ + "def read_secret_or_env(name: str):\n", + " value = os.environ.get(name)\n", + " if value:\n", + " return value\n", + " if IN_COLAB:\n", + " try:\n", + " from google.colab import userdata # type: ignore\n", + " value = userdata.get(name)\n", + " if value:\n", + " os.environ[name] = value\n", + " return value\n", + " except Exception:\n", + " pass\n", + " return None\n", + "\n", + "openai_key = read_secret_or_env(\"OPENAI_API_KEY\")\n", + "openrouter_key = read_secret_or_env(\"OPENROUTER_API_KEY\")\n", + "\n", + "if openai_key:\n", + " PROVIDER = \"openai\"\n", + " MODEL_NAME = os.environ.get(\"TRACE_OPENAI_NOTEBOOK_MODEL\", \"gpt-4o-mini\")\n", + "elif openrouter_key:\n", + " PROVIDER = \"openrouter\"\n", + " MODEL_NAME = os.environ.get(\"TRACE_OPENROUTER_NOTEBOOK_MODEL\", \"openrouter/openai/gpt-4o-mini\")\n", + "else:\n", + " raise RuntimeError(\"Set OPENAI_API_KEY or OPENROUTER_API_KEY before running this notebook.\")\n", + "\n", + "os.environ.setdefault(\"TRACE_DEFAULT_LLM_BACKEND\", \"LiteLLM\")\n", + "os.environ[\"TRACE_LITELLM_MODEL\"] = MODEL_NAME\n", + "\n", + "print(f\"Provider: {PROVIDER}\")\n", + "print(f\"Model: {MODEL_NAME}\")\n", + "print(f\"OpenAI key available: {bool(openai_key)}\")\n", + "print(f\"OpenRouter key available: {bool(openrouter_key)}\")\n" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "imports", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T06:17:20.518201Z", + "iopub.status.busy": "2026-05-13T06:17:20.518122Z", + "iopub.status.idle": "2026-05-13T06:17:21.921502Z", + "shell.execute_reply": "2026-05-13T06:17:21.920665Z" + } + }, + "outputs": [], + "source": [ + "import json\n", + "import random\n", + "import re\n", + "import string\n", + "import subprocess\n", + "from pathlib import Path\n", + "\n", + "try:\n", + " import pandas as pd\n", + "except Exception:\n", + " pd = None\n", + "\n", + "from opto import trace\n", + "from opto.optimizers import OptoPrimeV2, OptoPrimeMultiV2\n", + "from opto.trace.nodes import GRAPH\n", + "from opto.utils.llm import LLM\n", + "\n", + "random.seed(7)\n" + ] + }, + { + "cell_type": "markdown", + "id": "data-md", + "metadata": {}, + "source": [ + "## Real BBEH data\n", + "\n", + "This is the compact replacement for `load_bbeh_like`. It loads examples from the selected BBEH `task.json`, cloning the public BBEH repo to `/tmp/bbeh` only when task files are not already available.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "data-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T06:17:21.922951Z", + "iopub.status.busy": "2026-05-13T06:17:21.922786Z", + "iopub.status.idle": "2026-05-13T06:17:21.930065Z", + "shell.execute_reply": "2026-05-13T06:17:21.929642Z" + } + }, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Loaded 6 examples from /tmp/bbeh/bbeh/benchmark_tasks/bbeh_disambiguation_qa/task.json\n", + "Task: bbeh_disambiguation_qa\n", + "Train targets: ['(C)', '(C)', '(B)']\n", + "Eval targets: ['(D)', '(B)', '(E)']\n" + ] + } + ], + "source": [ + "def canonical_bbeh_task_name(task_name: str) -> str:\n", + " name = str(task_name).strip().replace(\"-\", \"_\")\n", + " return name if name.startswith(\"bbeh_\") else f\"bbeh_{name}\"\n", + "\n", + "\n", + "def repo_root() -> Path:\n", + " root = Path.cwd().resolve()\n", + " return root.parent.parent if root.match(\"*/examples/notebooks\") else root\n", + "\n", + "\n", + "def find_bbeh_tasks_dir() -> Path | None:\n", + " bases = [Path(os.environ[p]).expanduser() for p in [\"TRACE_BBEH_TASKS_DIR\"] if os.environ.get(p)]\n", + " bases += [repo_root() / \"bbeh\", Path(\"/tmp/bbeh\"), Path(\"bbeh\"), repo_root()]\n", + " for base in bases:\n", + " for candidate in [base / \"benchmark_tasks\", base / \"bbeh\" / \"benchmark_tasks\"]:\n", + " if candidate.exists():\n", + " return candidate.resolve()\n", + " return None\n", + "\n", + "\n", + "def ensure_bbeh_tasks_dir() -> Path:\n", + " tasks_dir = find_bbeh_tasks_dir()\n", + " if tasks_dir:\n", + " return tasks_dir\n", + " clone_root = Path(os.environ.get(\"TRACE_BBEH_REPO\", \"/tmp/bbeh\")).expanduser()\n", + " subprocess.run(\n", + " [\"git\", \"clone\", \"--depth\", \"1\", \"https://github.com/google-deepmind/bbeh.git\", str(clone_root)],\n", + " check=True,\n", + " stdout=subprocess.PIPE,\n", + " stderr=subprocess.PIPE,\n", + " text=True,\n", + " )\n", + " tasks_dir = find_bbeh_tasks_dir()\n", + " if not tasks_dir:\n", + " raise FileNotFoundError(\"Could not locate BBEH benchmark_tasks after cloning google-deepmind/bbeh.\")\n", + " return tasks_dir\n", + "\n", + "\n", + "def load_bbeh_examples(task_name: str, n_train: int, n_eval: int, seed: int):\n", + " task_name = canonical_bbeh_task_name(task_name)\n", + " task_path = ensure_bbeh_tasks_dir() / task_name / \"task.json\"\n", + " if not task_path.exists():\n", + " raise FileNotFoundError(f\"BBEH task not found: {task_path}\")\n", + "\n", + " examples = json.loads(task_path.read_text()).get(\"examples\", [])\n", + " rows = [{\"input\": ex[\"input\"], \"target\": str(ex[\"target\"]).strip(), \"task\": task_name} for ex in examples]\n", + " random.Random(seed).shuffle(rows)\n", + " if len(rows) < n_train + n_eval:\n", + " raise ValueError(f\"Need {n_train + n_eval} examples for {task_name}; found {len(rows)}\")\n", + "\n", + " allowed = {row[\"target\"] for row in rows if re.fullmatch(r\"\\([A-Z]\\)\", row[\"target\"])}\n", + " allowed = allowed if len(allowed) > 1 else None\n", + " if allowed:\n", + " suffix = \"\\n\\nFinal answer must be one of: \" + \", \".join(sorted(allowed))\n", + " rows = [{**row, \"input\": row[\"input\"] + suffix} for row in rows]\n", + "\n", + " print(f\"Loaded {n_train + n_eval} examples from {task_path}\")\n", + " return rows[:n_train], rows[n_train:n_train + n_eval], allowed\n", + "\n", + "\n", + "BBEH_TASK_NAME = os.environ.get(\"BBEH_TASK_NAME\", \"bbeh_boolean_expressions\")\n", + "BBEH_N_TRAIN = int(os.environ.get(\"BBEH_N_TRAIN\", \"3\"))\n", + "BBEH_N_EVAL = int(os.environ.get(\"BBEH_N_EVAL\", \"3\"))\n", + "BBEH_SEED = int(os.environ.get(\"BBEH_SEED\", \"7\"))\n", + "\n", + "train_examples, eval_examples, allowed_answers = load_bbeh_examples(\n", + " BBEH_TASK_NAME, BBEH_N_TRAIN, BBEH_N_EVAL, BBEH_SEED\n", + ")\n", + "print(\"Task:\", canonical_bbeh_task_name(BBEH_TASK_NAME))\n", + "print(\"Train targets:\", [ex[\"target\"] for ex in train_examples])\n", + "print(\"Eval targets:\", [ex[\"target\"] for ex in eval_examples])\n" + ] + }, + { + "cell_type": "markdown", + "id": "agent-md", + "metadata": {}, + "source": [ + "## Agent and BBEH guide\n", + "\n", + "Every optimizer starts from the same neutral prompt and the same train/eval split. Agent calls use temperature 0 so measured differences come from the optimizer update, not sampling noise.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "agent-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T06:17:21.931460Z", + "iopub.status.busy": "2026-05-13T06:17:21.931359Z", + "iopub.status.idle": "2026-05-13T06:17:21.941584Z", + "shell.execute_reply": "2026-05-13T06:17:21.941034Z" + } + }, + "outputs": [], + "source": [ + "ANSWER_RE = re.compile(r\"(?im)^answer\\s*:\\s*(.+)$\")\n", + "OPTION_RE = re.compile(r\"\\([A-Z]\\)\")\n", + "\n", + "\n", + "def extract_answer(text: str) -> str:\n", + " text = str(text).strip()\n", + " match = ANSWER_RE.search(text)\n", + " answer = match.group(1).strip() if match else text.splitlines()[-1].strip()\n", + " options = OPTION_RE.findall(answer)\n", + " return options[-1] if options else answer\n", + "\n", + "\n", + "def normalize_answer(ans) -> str:\n", + " ans = \"\" if ans is None else str(ans).strip().lower()\n", + " return ans.translate(str.maketrans(\"\", \"\", string.punctuation + string.whitespace))\n", + "\n", + "\n", + "class BBEHGuide:\n", + " def __init__(self, allowed=None):\n", + " self.allowed = set(allowed or [])\n", + "\n", + " def feedback(self, prediction, target):\n", + " ok = normalize_answer(prediction) == normalize_answer(target)\n", + " if ok:\n", + " return 1.0, f\"SUCCESS: final answer {prediction!r} matches {target!r}.\"\n", + " allowed = f\" Final answer must be one of {sorted(self.allowed)}.\" if self.allowed else \"\"\n", + " return 0.0, (\n", + " f\"FAILED: final answer {prediction!r} does not match expected {target!r}.\"\n", + " f\"{allowed} Improve the trainable prompt so future BBEH examples produce the exact final answer.\"\n", + " )\n", + "\n", + "\n", + "AGENT_MAX_TOKENS = int(os.environ.get(\"BBEH_AGENT_MAX_TOKENS\", \"800\"))\n", + "\n", + "\n", + "@trace.model\n", + "class BBEHAgent:\n", + " def __init__(self, llm):\n", + " self.llm = llm\n", + " self.system_prompt = trace.node(\n", + " \"You are a careful reasoning assistant.\",\n", + " name=\"system_prompt\",\n", + " trainable=True,\n", + " description=\"General strategy for solving BBEH examples.\",\n", + " )\n", + " self.answer_instruction = trace.node(\n", + " \"End with exactly one line formatted as `Answer: `. For multiple-choice tasks, the final answer must be just the option label, e.g. `(A)`.\",\n", + " name=\"answer_instruction\",\n", + " trainable=True,\n", + " description=\"Final answer formatting instruction.\",\n", + " )\n", + "\n", + " @trace.bundle()\n", + " def call_llm(self, system_prompt, answer_instruction, question):\n", + " response = self.llm(\n", + " messages=[\n", + " {\"role\": \"system\", \"content\": f\"{system_prompt}\\n\\n{answer_instruction}\"},\n", + " {\"role\": \"user\", \"content\": question},\n", + " ],\n", + " max_tokens=AGENT_MAX_TOKENS,\n", + " temperature=0,\n", + " )\n", + " return response.choices[0].message.content\n", + "\n", + " def forward(self, question):\n", + " return self.call_llm(self.system_prompt, self.answer_instruction, question)\n" + ] + }, + { + "cell_type": "markdown", + "id": "compare-md", + "metadata": {}, + "source": [ + "## Compare optimizers\n", + "\n", + "Each optimizer starts from the same initial agent and the same train/eval split. The update step uses the same hard-example rule for every optimizer: among the 3 training examples, only examples currently answered incorrectly generate feedback. Identical live LLM calls are memoized so repeated pre-update evaluations are exactly comparable without introducing synthetic responses.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "compare-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-13T06:17:21.942869Z", + "iopub.status.busy": "2026-05-13T06:17:21.942774Z", + "iopub.status.idle": "2026-05-13T06:18:26.648365Z", + "shell.execute_reply": "2026-05-13T06:18:26.647973Z" + } + }, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
experimentbefore_accuracyafter_accuracyaccuracy_gainnum_updatescandidate_count
0OptoPrimeV20.3333330.3333330.00000000
1OptoPrimeMultiV2 | temperature_variation + bes...0.3333330.6666670.33333323
2OptoPrimeMultiV2 | multi_experts + moa0.3333330.3333330.00000023
\n", + "
" + ], + "text/plain": [ + " experiment before_accuracy \\\n", + "0 OptoPrimeV2 0.333333 \n", + "1 OptoPrimeMultiV2 | temperature_variation + bes... 0.333333 \n", + "2 OptoPrimeMultiV2 | multi_experts + moa 0.333333 \n", + "\n", + " after_accuracy accuracy_gain num_updates candidate_count \n", + "0 0.333333 0.000000 0 0 \n", + "1 0.666667 0.333333 2 3 \n", + "2 0.333333 0.000000 2 3 " + ] + }, + "metadata": {}, + "output_type": "display_data" + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Baseline eval predictions: [('(D)', '(D)', 1.0), ('(D)', '(B)', 0.0), ('(A)', '(E)', 0.0)]\n", + "Best pre-declared MultiV2 eval predictions: [('D', '(D)', 1.0), ('(D)', '(B)', 0.0), ('E', '(E)', 1.0)]\n", + "Best pre-declared MultiV2 selected candidate: {'raw': \"```\\n\\n1. The instruction requires modifying the values of the variables `system_prompt0` and `answer_instruction0` to enhance the outputs based on the feedback provided, which indicates that the final answers generated by the model do not match the expected results for the questions.\\n2. The feedback highlights specific instances where the model's answers were incorrect, suggesting that the current prompts may not effectively guide the model in discerning the correct antecedents of the pronouns. This indicates a need for clearer and more directive prompts to improve the model's reasoning capabilities.\\n3. To address these issues, I suggest changing `system_prompt0` to emphasize the importance of accurately identifying pronoun antecedents and resolving ambiguities in sentences. Additionally, I recommend revising `answer_instruction0` to stress that the final answers should closely align with the provided options and reflect the most accurate interpretation based on the context. These changes are expected to improve the model's performance in generating the correct answers.\\n\\n\\nsystem_prompt0\\n\\nYou are a careful reasoning assistant focused on accurately identifying pronoun antecedents and resolving ambiguities in sentences.\\n\\n\\n\\nanswer_instruction0\\n\\nEnd with exactly one line formatted as `Answer: `. Ensure that the final answer corresponds to the most accurate interpretation based on the context and is one of the provided options only.\\n\\n\\n```\", 'parsed': {'reasoning': \"1. The instruction requires modifying the values of the variables `system_prompt0` and `answer_instruction0` to enhance the outputs based on the feedback provided, which indicates that the final answers generated by the model do not match the expected results for the questions.\\n2. The feedback highlights specific instances where the model's answers were incorrect, suggesting that the current prompts may not effectively guide the model in discerning the correct antecedents of the pronouns. This indicates a need for clearer and more directive prompts to improve the model's reasoning capabilities.\\n3. To address these issues, I suggest changing `system_prompt0` to emphasize the importance of accurately identifying pronoun antecedents and resolving ambiguities in sentences. Additionally, I recommend revising `answer_instruction0` to stress that the final answers should closely align with the provided options and reflect the most accurate interpretation based on the context. These changes are expected to improve the model's performance in generating the correct answers.\", 'variables': {'system_prompt0': 'You are a careful reasoning assistant focused on accurately identifying pronoun antecedents and resolving ambiguities in sentences.', 'answer_instruction0': 'End with exactly one line formatted as `Answer: `. Ensure that the final answer corresponds to the most accurate interpretation based on the context and is one of the provided options only.'}}, 'variables': {'system_prompt0': 'You are a careful reasoning assistant focused on accurately identifying pronoun antecedents and resolving ambiguities in sentences.', 'answer_instruction0': 'End with exactly one line formatted as `Answer: `. Ensure that the final answer corresponds to the most accurate interpretation based on the context and is one of the provided options only.'}, 'reasoning': \"1. The instruction requires modifying the values of the variables `system_prompt0` and `answer_instruction0` to enhance the outputs based on the feedback provided, which indicates that the final answers generated by the model do not match the expected results for the questions.\\n2. The feedback highlights specific instances where the model's answers were incorrect, suggesting that the current prompts may not effectively guide the model in discerning the correct antecedents of the pronouns. This indicates a need for clearer and more directive prompts to improve the model's reasoning capabilities.\\n3. To address these issues, I suggest changing `system_prompt0` to emphasize the importance of accurately identifying pronoun antecedents and resolving ambiguities in sentences. Additionally, I recommend revising `answer_instruction0` to stress that the final answers should closely align with the provided options and reflect the most accurate interpretation based on the context. These changes are expected to improve the model's performance in generating the correct answers.\", 'valid': True, 'terminate': False}\n", + "PASS: A pre-declared OptoPrimeMultiV2 strategy beat OptoPrimeV2 on held-out BBEH accuracy.\n" + ] + } + ], + "source": [ + "class CachedLLM:\n", + " def __init__(self, llm):\n", + " self.llm = llm\n", + " self.cache = {}\n", + "\n", + " def __call__(self, *args, **kwargs):\n", + " key = json.dumps({\"args\": args, \"kwargs\": kwargs}, sort_keys=True, default=str)\n", + " if key not in self.cache:\n", + " self.cache[key] = self.llm(*args, **kwargs)\n", + " return self.cache[key]\n", + "\n", + "\n", + "shared_llm = CachedLLM(LLM())\n", + "\n", + "\n", + "def evaluate_agent(agent, examples, guide):\n", + " rows = []\n", + " for ex in examples:\n", + " response = agent(ex[\"input\"])\n", + " prediction = extract_answer(response.data)\n", + " score, feedback = guide.feedback(prediction, ex[\"target\"])\n", + " rows.append({\"target\": ex[\"target\"], \"prediction\": prediction, \"score\": score, \"feedback\": feedback})\n", + " return rows, sum(row[\"score\"] for row in rows) / max(1, len(rows))\n", + "\n", + "\n", + "def optimize_on_examples(agent, optimizer, examples, guide):\n", + " optimizer.zero_feedback()\n", + " traces, train_rows, feedback_lines = [], [], []\n", + " for i, ex in enumerate(examples, start=1):\n", + " response = agent(ex[\"input\"])\n", + " prediction = extract_answer(response.data)\n", + " score, feedback = guide.feedback(prediction, ex[\"target\"])\n", + " train_rows.append({\"target\": ex[\"target\"], \"prediction\": prediction, \"score\": score})\n", + " if score < 1.0:\n", + " traces.append(response)\n", + " feedback_lines.append(f\"Example {i}: {feedback}\")\n", + "\n", + " if not traces:\n", + " return {}, train_rows\n", + " shared_feedback = \"\\n\".join(feedback_lines)\n", + " for response in traces:\n", + " optimizer.backward(response, shared_feedback)\n", + " return optimizer.step(bypassing=False), train_rows\n", + "\n", + "\n", + "def run_experiment(name, optimizer_factory):\n", + " GRAPH.clear()\n", + " guide = BBEHGuide(allowed_answers)\n", + " agent = BBEHAgent(shared_llm)\n", + " optimizer = optimizer_factory(agent)\n", + "\n", + " before_rows, before_acc = evaluate_agent(agent, eval_examples, guide)\n", + " updates, train_rows = optimize_on_examples(agent, optimizer, train_examples, guide)\n", + " after_rows, after_acc = evaluate_agent(agent, eval_examples, guide)\n", + "\n", + " return {\n", + " \"experiment\": name,\n", + " \"before_accuracy\": before_acc,\n", + " \"after_accuracy\": after_acc,\n", + " \"accuracy_gain\": after_acc - before_acc,\n", + " \"num_updates\": len(updates),\n", + " \"candidate_count\": len(getattr(optimizer, \"candidates\", []) or []),\n", + " \"updated_parameters\": {param.py_name: value for param, value in updates.items()},\n", + " \"train_rows\": train_rows,\n", + " \"before_rows\": before_rows,\n", + " \"after_rows\": after_rows,\n", + " \"selected_candidate_details\": getattr(optimizer, \"selected_candidate_details\", None),\n", + " }\n", + "\n", + "\n", + "experiments = [\n", + " (\"OptoPrimeV2\", lambda agent: OptoPrimeV2(agent.parameters(), llm=agent.llm, max_tokens=700)),\n", + " (\n", + " \"OptoPrimeMultiV2 | temperature_variation + best_of_n\",\n", + " lambda agent: OptoPrimeMultiV2(\n", + " agent.parameters(),\n", + " llm=agent.llm,\n", + " num_responses=3,\n", + " generation_technique=\"temperature_variation\",\n", + " selection_technique=\"best_of_n\",\n", + " temperature_min_max=[0.2, 1.0],\n", + " max_tokens=700,\n", + " ),\n", + " ),\n", + " (\n", + " \"OptoPrimeMultiV2 | multi_experts + moa\",\n", + " lambda agent: OptoPrimeMultiV2(\n", + " agent.parameters(),\n", + " llm=agent.llm,\n", + " num_responses=3,\n", + " generation_technique=\"multi_experts\",\n", + " selection_technique=\"moa\",\n", + " experts_list=[\"BBEH Solver\", \"Prompt Engineer\", \"Critical Reviewer\"],\n", + " max_tokens=700,\n", + " ),\n", + " ),\n", + "]\n", + "\n", + "results = [run_experiment(name, factory) for name, factory in experiments]\n", + "summary = [\n", + " {k: v for k, v in result.items() if k not in {\"updated_parameters\", \"train_rows\", \"before_rows\", \"after_rows\", \"selected_candidate_details\"}}\n", + " for result in results\n", + "]\n", + "if pd:\n", + " display(pd.DataFrame(summary))\n", + "else:\n", + " print(summary)\n", + "\n", + "baseline, *multi_results = results\n", + "best_multi = max(multi_results, key=lambda result: (result[\"after_accuracy\"], result[\"accuracy_gain\"]))\n", + "print(\"Baseline eval predictions:\", [(r[\"prediction\"], r[\"target\"], r[\"score\"]) for r in baseline[\"after_rows\"]])\n", + "print(\"Best pre-declared MultiV2 eval predictions:\", [(r[\"prediction\"], r[\"target\"], r[\"score\"]) for r in best_multi[\"after_rows\"]])\n", + "print(\"Best pre-declared MultiV2 selected candidate:\", best_multi[\"selected_candidate_details\"])\n", + "\n", + "assert all(result[\"candidate_count\"] > 1 for result in multi_results), \"A MultiV2 run did not generate multiple candidates.\"\n", + "assert best_multi[\"after_accuracy\"] > baseline[\"after_accuracy\"], \"No pre-declared MultiV2 strategy beat OptoPrimeV2 on held-out accuracy.\"\n", + "print(\"PASS: A pre-declared OptoPrimeMultiV2 strategy beat OptoPrimeV2 on held-out BBEH accuracy.\")\n" + ] + }, + { + "cell_type": "markdown", + "id": "notes", + "metadata": {}, + "source": [ + "## Notes\n", + "\n", + "- The notebook does not include dummy LLMs or embedded BBEH task examples.\n", + "- Use `BBEH_TASK_NAME` to choose another task, for example `bbeh_geometric_shapes` or `bbeh_boardgame_qa`.\n", + "- Use `BBEH_N_TRAIN` and `BBEH_N_EVAL` to change the number of examples.\n" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/opto/optimizers/__init__.py b/opto/optimizers/__init__.py index 482b1b2d..8771713e 100644 --- a/opto/optimizers/__init__.py +++ b/opto/optimizers/__init__.py @@ -1,5 +1,6 @@ from opto.optimizers.optoprime import OptoPrime as OptoPrimeV1 from opto.optimizers.optoprimemulti import OptoPrimeMulti +from opto.optimizers.optoprimemulti_v2 import OptoPrimeMultiV2 from opto.optimizers.opro import OPRO from opto.optimizers.opro_v2 import OPROv2 from opto.optimizers.textgrad import TextGrad @@ -7,4 +8,4 @@ OptoPrime = OptoPrimeV1 -__all__ = ["OPRO", "OptoPrime", "OptoPrimeMulti", "TextGrad", "OptoPrimeV2", "OptoPrimeV1", "OPROv2"] \ No newline at end of file +__all__ = ["OPRO", "OptoPrime", "OptoPrimeMulti", "OptoPrimeMultiV2", "TextGrad", "OptoPrimeV2", "OptoPrimeV1", "OPROv2"] diff --git a/opto/optimizers/optoprimemulti.py b/opto/optimizers/optoprimemulti.py index aba40ee0..8d154af0 100644 --- a/opto/optimizers/optoprimemulti.py +++ b/opto/optimizers/optoprimemulti.py @@ -1,687 +1,7 @@ -from typing import Any, List, Dict, Union, Optional -import json -from typing import List, Dict - -from opto.trace.propagators import GraphPropagator from opto.optimizers.optoprime import OptoPrime -from opto.utils.llm import LLMFactory - -from concurrent.futures import ThreadPoolExecutor, as_completed - -class OptoPrimeMulti(OptoPrime): - """Multi-response variant of OptoPrime optimizer with advanced candidate generation and selection. - - Extends OptoPrime to generate multiple candidate solutions using various techniques, - then select the best one through sophisticated selection methods. Supports multiple - LLM profiles, diverse generation strategies, and parallel processing for improved - optimization performance. - - Parameters - ---------- - *args - Arguments passed to parent OptoPrime constructor. - num_responses : int, default=3 - Number of candidate responses to generate per optimization step. - temperature_min_max : List[float], optional - [min, max] temperature range for response generation. Defaults to [0.0, 1.0]. - selector : callable, optional - Custom function for selecting the best candidate from generated responses. - If None, uses built-in selection techniques. - generation_technique : str, default="temperature_variation" - Technique for generating diverse candidates: - - "temperature_variation": Use varying temperatures across responses - - "self_refinement": Each solution refines the previous one iteratively - - "iterative_alternatives": Generate alternatives informed by previous solutions - - "multi_experts": Use different expert personas for diverse perspectives - - "multi_llm": Use different LLM profiles for generation diversity - selection_technique : str, default="best_of_n" - Method for selecting the best candidate: - - "best_of_n": Choose most promising candidate via LLM evaluation - - "moa"/"mixture_of_agents": Synthesize best elements from all candidates - - "majority": Find consensus solution using clustering - - "last_of_n": Simply return the last generated candidate - experts_list : List[str], optional - List of expert personas for multi_experts generation technique. - If None, experts are automatically generated based on the problem. - llm_profiles : List[str], optional - List of LLM profile names to use for diverse generation. - Enables multi-model optimization approaches. - llm_weights : List[float], optional - Weights for each LLM profile when using weighted selection. - Defaults to equal weights if not specified. - **kwargs - Additional keyword arguments passed to parent OptoPrime constructor. - - Attributes - ---------- - candidates : List[str] - All candidate solutions generated in the current optimization step. - selected_candidate : str or Dict - The candidate solution selected for the current step. - num_responses : int - Number of responses to generate per step. - temperature_min_max : List[float] - Temperature range for generation diversity. - generation_technique : str - Current technique used for candidate generation. - selection_technique : str - Current technique used for candidate selection. - - Methods - ------- - generate_candidates(summary, system_prompt, user_prompt, **kwargs) - Generate multiple candidate solutions using the specified technique. - select_candidate(candidates, selection_technique, problem_summary) - Select the best candidate from generated responses. - _step(verbose, mask, **kwargs) - Perform one optimization step with multi-candidate approach. - - Notes - ----- - OptoPrimeMulti enhances optimization through several mechanisms: - - 1. **Diverse Generation**: Multiple techniques ensure candidate diversity, - preventing local optima and exploring the solution space more thoroughly. - - 2. **Parallel Processing**: Concurrent LLM calls reduce optimization time - while maintaining result quality and deterministic ordering. - - 3. **Advanced Selection**: Sophisticated selection methods choose optimal - solutions by analyzing candidate strengths and synthesizing improvements. - - 4. **Multi-Model Support**: Different LLM profiles provide diverse - perspectives and capabilities for complex optimization problems. - - The optimizer is particularly effective for: - - Complex optimization problems requiring creative solutions - - Scenarios where single-shot optimization may get stuck in local optima - - Applications benefiting from ensemble approaches and diverse perspectives - - See Also - -------- - OptoPrime : Base single-response optimizer - OptoPrimeV2 : Enhanced version with XML-based memory representation - """ - - def __init__( - self, - *args, - num_responses: int = 3, - temperature_min_max: Optional[List[float]] = None, - selector: Optional[callable] = None, - generation_technique: str = "temperature_variation", - selection_technique: str = "best_of_n", - experts_list: Optional[List[str]] = None, - llm_profiles: Optional[List[str]] = None, # List of LLM profiles to use - llm_weights: Optional[List[float]] = None, # Weights for each LLM (for weighted selection) - **kwargs, - ): - super().__init__(*args, **kwargs) - self.temperature_min_max = temperature_min_max if temperature_min_max is not None else [0.0, 1.0] - self.candidates = [] # Store all candidate solutions - self.selected_candidate = None # Store the selected candidate solution - self.num_responses = num_responses - self.selector = selector - self.generation_technique = generation_technique - self.selection_technique = selection_technique - self.experts_list = experts_list - - # NEW: Multiple LLM support - self.llm_profiles = llm_profiles - self.llm_weights = llm_weights or [1.0] * len(llm_profiles) if llm_profiles else None - self._llm_instances = {} # Cache for LLM instances - - def _get_llm_for_profile(self, profile: str = None): - """Get LLM instance for a profile, with caching.""" - if profile is None: - return self.llm # Use default LLM - - if profile not in self._llm_instances: - try: - from opto.utils.llm import LLMFactory - self._llm_instances[profile] = LLMFactory.get_llm(profile) - except Exception as e: - # Fallback to default LLM if profile creation fails - import warnings - warnings.warn(f"Failed to create LLM for profile '{profile}': {e}. Using default LLM.") - return self.llm - - return self._llm_instances[profile] - - def _get_llms_for_generation(self, num_responses: int): - """Get list of LLMs to use for generation.""" - if self.llm_profiles is None or len(self.llm_profiles) == 0: - # Fallback to single LLM (existing behavior) - return [self.llm] * num_responses - - # Distribute responses across multiple LLMs - llms = [] - for i in range(num_responses): - profile_idx = i % len(self.llm_profiles) - profile = self.llm_profiles[profile_idx] - llm = self._get_llm_for_profile(profile) - llms.append(llm) - - return llms - - def call_llm( - self, - system_prompt: str, - user_prompt: str, - verbose: Union[bool, str] = False, - max_tokens: int = 4096, - num_responses: int = 1, - temperature: float = 0.0, - llm = None, # NEW: Optional specific LLM to use - ) -> List[str]: - """Given a prompt, returns multiple candidate answers.""" - # if verbose not in (False, "output"): - # print("Prompt\n", system_prompt + user_prompt) - - # Use provided LLM or fall back to default - active_llm = llm or self.llm - - messages = [ - {"role": "system", "content": system_prompt}, - {"role": "user", "content": user_prompt}, - ] - - try: - if hasattr(active_llm, "create"): - # Standard OpenAI/LangChain style - response = active_llm.create( - messages=messages, - response_format={"type": "json_object"}, - max_tokens=max_tokens, - n=num_responses, - temperature=temperature, - ) - else: - # Fallback for LiteLLM (callable) or other interfaces - # e.g., LiteLLM(messages, max_tokens=…, n=…, temperature=…) - response = active_llm( - messages, - max_tokens=max_tokens, - n=num_responses, - temperature=temperature, - response_format={"type": "json_object"}, - ) - except Exception as e: - if verbose: - print(f"ERROR {e}") - return [] # or re-raise if you prefer - - responses = [choice.message.content for choice in response.choices] - # if verbose: - # print("LLM responses:\n", responses) - - return responses - - # ---------------------------------------------------------------------+ - # Small helper that runs *many* call_llm invocations in parallel | - # while preserving the original order of the results. | - # ---------------------------------------------------------------------+ - def _parallel_call_llm(self, arg_dicts: List[Dict[str, Any]]) -> List[str]: - """ - Run several `self.call_llm(**kwargs)` invocations concurrently. - - * **arg_dicts** – a list where each element is the kwargs you would - normally pass to `self.call_llm`. - * The function returns **one flat list** with the first - message of every response, **in the same order** as `arg_dicts`. - """ - # Pre-allocate result slots so that order is deterministic - out: List[Optional[str]] = [None] * len(arg_dicts) - - # Use threads (cheap, works even if the OpenAI client is sync only) - with ThreadPoolExecutor(max_workers=len(arg_dicts)) as pool: - future_to_idx = { - pool.submit(self.call_llm, **kw): i - for i, kw in enumerate(arg_dicts) - } - - for fut in as_completed(future_to_idx): - idx = future_to_idx[fut] - try: - resp = fut.result() # ← original API returns List[str] - if resp: - out[idx] = resp[0] # keep only the first message - except Exception as e: - if arg_dicts[idx].get("verbose"): - print(f"[async-call-llm] worker {idx} failed: {e}") - out[idx] = None - - # Filter-out failed/empty slots while preserving order - return [x for x in out if x is not None] - - def generate_candidates( - self, - summary, - system_prompt: str, - user_prompt: str, - verbose: Union[bool, str] = False, - mask=None, - max_tokens: int = None, - num_responses: int = 3, - generation_technique: str = "temperature_variation", - temperature_min_max: Optional[List[float]] = None, - experts_list: Optional[List[str]] = None, - ) -> List[str]: - """ - Generate multiple candidates using various techniques. - Args: - summary: The summarized problem instance. - system_prompt (str): The system-level prompt. - user_prompt (str): The user-level prompt. - verbose (bool): Whether to print debug information. - mask: Mask for the problem instance. - max_tokens (int, optional): Maximum token limit for the LLM responses. - num_responses (int): Number of responses to request. - generation_technique (str): Technique to use for generation: - - "temperature_variation": Use varying temperatures - - "self_refinement": Each solution refines the previous one - - "iterative_alternatives": Generate diverse alternatives - - "multi_experts": Use different expert personas - temperature_min_max (List[float], optional): [min, max] temperature range. - experts_list (List[str], optional): List of expert personas to use for multi_experts technique. - Returns: - List[str]: List of LLM responses as strings. - """ - import re # Add explicit import for regex - - num_responses = num_responses if num_responses is not None else self.num_responses - max_tokens = max_tokens or self.max_tokens - temperature_min_max = temperature_min_max if temperature_min_max is not None else self.temperature_min_max - candidates = [] - - # Ensure temperature_min_max has at least 2 elements - if not isinstance(temperature_min_max, list) or len(temperature_min_max) < 2: - temp_min, temp_max = 0.0, 1.0 # Default values - else: - temp_min, temp_max = temperature_min_max[0], temperature_min_max[1] - - generation_technique = generation_technique.lower() - - if self.llm_profiles is not None and len(self.llm_profiles) > 0 and generation_technique == "multi_llm": - llms = self._get_llms_for_generation(num_responses) - - # Prepare arguments for parallel execution - arg_dicts = [] - for i, llm in enumerate(llms): - profile_name = self.llm_profiles[i % len(self.llm_profiles)] if self.llm_profiles else "default" - modified_system_prompt = f"{system_prompt}\n\n[Using {profile_name} model for diverse perspective]" - - arg_dicts.append(dict( - system_prompt=modified_system_prompt, - user_prompt=user_prompt, - verbose=verbose, - max_tokens=max_tokens, - num_responses=1, - temperature=temp_min, - llm=llm # Use specific LLM - )) - - # Execute in parallel - try: - parallel_results = self._parallel_call_llm(arg_dicts) - candidates.extend(parallel_results) - except Exception as e: - if verbose: - print(f"Error in multi_llm mode: {e} – falling back to temperature variation") - generation_technique = "temperature_variation" - candidates = [] - - if generation_technique == "self_refinement": - # Generate solutions by refining previous ones - for i in range(num_responses): - if not candidates: - meta_prompt = system_prompt - else: - meta_prompt = f"{system_prompt}\nRefine the previous solution to given problem in order to answer with a much better answer & suggestion to the problem (use the same JSON format / suggest only trainable codes/variables to modify, never inputs), PREVIOUS SOLUTION:<<<\n{candidates[-1]}\n>>>" - - response = self.call_llm( - system_prompt=meta_prompt, - user_prompt=user_prompt, - verbose=verbose, - max_tokens=max_tokens, - num_responses=1, - temperature=temp_min, - ) - - if response and len(response) > 0: - candidates.append(response[0]) - - elif generation_technique == "iterative_alternatives": - # Generate alternatives informed by previous solutions - for i in range(num_responses): - meta_prompt = system_prompt - if i > 0 and candidates: - # Generate a new alternative based on all previous - previous_solutions = "\n".join( - f"CANDIDATE {idx + 1}: <<<\n{cand}\n>>>" - for idx, cand in enumerate(candidates) - ) - meta_prompt = f"{system_prompt}\nGiven the following prior CANDIDATE solutions, answer with a very different new CANDIDATE optimal solution to user's prompt using their same JSON format (suggest only trainable codes/variables to modify, never inputs):\n{previous_solutions}\n" - - response = self.call_llm( - system_prompt=meta_prompt, - user_prompt=user_prompt, - verbose=verbose, - max_tokens=max_tokens, - num_responses=1, - temperature=temp_min, - ) - - if response and len(response) > 0: - candidates.append(response[0]) - - elif generation_technique == "multi_experts": - # 1. Determine expert list (either passed in or generated) - experts = [] - if isinstance(experts_list, list) and all(isinstance(e, str) for e in experts_list): - while len(experts) < num_responses: - experts.append(experts_list[len(experts) % len(experts_list)]) - - else: - # ask LLM to output a JSON array of expert persona strings - expert_json = self.call_llm( - system_prompt="Generate a list of complementaty experts to optimize a problem as a JSON string array (example: [\"AI Engineer\", \"Compiler Specialist\", ...]).", - user_prompt=( - f"NUMBER OF EXPERTS TO GENERATE: {num_responses}\n" - f"PROBLEM SUBMITED TO EXPERTS:\n<<<\n{system_prompt}\n>>>\n" - f"JSON ARRAY LIST OF EXPERTS:" - ), - num_responses=1, - temperature=0.0, - verbose=verbose, - ) - # Handle case where no response is returned - if not expert_json or len(expert_json) == 0: - if verbose: print("Failed to generate expert list, using default experts") - else: - try: - experts = json.loads(expert_json[0]) - except json.JSONDecodeError: - print(f"Failed to parse expert JSON: {expert_json}") - experts = [] - if not isinstance(experts, list): - if isinstance(experts, dict) and len(experts) == 1 and isinstance(next(iter(experts.values())), list): - experts = next(iter(experts.values())) - else: - if verbose: print(f"Expected JSON array for experts, got {experts} type {type(experts).__name__} => using default experts") - experts = [] - - # if experts is empty or does not contain the expected number of experts, use default - if not experts or len(experts) <= num_responses: - default_experts = ["Algorithm Expert", "Performance Optimizer", "Out of the box problem solver", "AI Engineer", "Compiler Specialist"] - while len(experts) < num_responses: - experts.append(default_experts[len(experts) % len(default_experts)]) - print(f"Generated experts: {experts}") - - # 2. For each expert, prepare a system prompt + user prompt - # Build kwargs once … - arg_dicts = [] - for expert in experts[:num_responses]: - meta_prompt = ( - f"You are a `{expert}`\nProvide your most optimized " - f"solution for the problem below.\n{self.output_format_prompt}" - ) - arg_dicts.append(dict( system_prompt=meta_prompt, user_prompt=f"PROBLEM:\n\n{user_prompt}", verbose=verbose, max_tokens=max_tokens, num_responses=1, temperature=0.0,)) - # … and fire them off in parallel, with proper exception handling - try: - parallel_results = self._parallel_call_llm(arg_dicts) - for raw in parallel_results: - sol = raw.strip().removeprefix("<<<").removesuffix(">>>").strip() - candidates.append(sol) - except Exception as e: - if verbose: - print(f"Error in multi_experts mode: {e} – falling back to temperature variation") - generation_technique = "temperature_variation" - candidates = [] - - # Default to temperature variation - if not candidates or generation_technique == "temperature_variation": - if generation_technique != "temperature_variation": - print(f"Unknown generation technique: {generation_technique}, defaulting to temperature_variation") - # Use progressive temperature variation to generate diverse candidates - temperatures = [temp_max - i * (temp_max - temp_min) / max(1, num_responses - 1) for i in range(num_responses)] - - if verbose: - print(f"Temperatures for responses: {temperatures}") - - # Prepare one kwargs-dict per temperature … - arg_dicts = [ dict( system_prompt=system_prompt, user_prompt=user_prompt, verbose=verbose, max_tokens=max_tokens, num_responses=1, temperature=t,) for t in temperatures] - # Thenm call them concurrently - candidates.extend(self._parallel_call_llm(arg_dicts)) - - if not candidates and verbose: - print("Warning: Failed to generate any candidates") - - if self.log is not None: - self.log.append({"system_prompt": system_prompt, "user_prompt": user_prompt, "response": candidates, "generation_technique": generation_technique, "llm_profiles": self.llm_profiles}) - # only build a problem instance if we actually have one - pi = self.problem_instance(summary) if summary is not None else {} - self.summary_log.append({"problem_instance": pi, "summary": summary}) - return candidates - - def select_candidate(self, candidates: List, selection_technique="moa", problem_summary="") -> Dict: - """ - Select the best response based on the candidates using various techniques. - - Args: - candidates (List): List of candidate responses from generate_candidates. - selection_technique (str): Technique to select the best response: - - "moa" or "mixture_of_agents": Use LLM to mix the best elements of each response - - "majority": Use LLM to choose the most frequent candidate - - "lastofn" or "last_of_n" (choose also if selection technique is unknown): Simply return the last candidate - - Returns: - Dict: The selected candidate or an empty dictionary if no candidates exist. - """ - if not candidates: - return {} - elif len(candidates) <= 1: - return candidates[0] if candidates else {} - - # Normalize selection technique name for case-insensitive comparison - selection_technique = selection_technique.lower() - - # Extract text from candidates for analysis - candidate_texts = [] - for candidate in candidates: - if isinstance(candidate, dict): - # For _step, candidates are dicts with various fields - text = candidate.get("text", "") - if not text and "suggestion" in candidate: - text = str(candidate["suggestion"]) - else: - # In case we're passed raw strings - text = str(candidate) - candidate_texts.append(text) - - # Handle different selection techniques - if selection_technique in ["moa", "mixture_of_agents"]: - return self._select_moa(candidates, candidate_texts, problem_summary) - elif selection_technique in ["bestofn", "best_of_n"]: - return self._select_bestofn(candidates, candidate_texts, problem_summary) - elif selection_technique in ["majority"]: - return self._select_majority(candidates, candidate_texts, problem_summary) - else: # default to lastofn/last_of_n - return candidates[-1] - - def _select_moa(self, candidates, candidate_texts, summary=None): - """Mixture of Agents selection - combines best elements from all candidates""" - # Construct the prompt for mixture of agents - meta_prompt = ( - "You are an expert at synthesizing multiple solutions into a single optimal solution." - "Given the following responses to a problem, provide an optimal response " - "that mixes the best elements of each (suggest only trainable codes/variables to modify, never inputs)" - f"{self.output_format_prompt}" - ) - - user_prompt = f"Problem:\n{summary}\n\n" if summary else "" - # Add all candidate responses - for i, text in enumerate(candidate_texts): - user_prompt += f"Response {i + 1}:\n{text}\n\n" - - # Call LLM to synthesize a response - system_prompt = meta_prompt - response = self.call_llm( - system_prompt=system_prompt, - user_prompt=user_prompt, - num_responses=1, - temperature=0.0 - ) - - return response[0] if (response and response[0]) else candidates[-1] - - def _select_bestofn(self, candidates, candidate_texts, summary=None): - """Best of N selection - chooses the most promising candidate""" - user_prompt = f"Problem:\n{summary}\n\n" if summary else "" - - # Add all candidate responses - for i, text in enumerate(candidate_texts): - user_prompt += f"Candidate {i + 1}:\n{text}\n\n" - - meta_prompt = ( - "You are an expert at evaluating solutions and selecting the most promising one." - f"Given the following candidate solutions to a problem" - "First, reason by analyzing each candidate's answer/suggestion strengths and weaknesses, then identify the reply with the most promising candidate. " - f"{self.output_format_prompt}" - ) - - # Call LLM to select the best candidate - response = self.call_llm( - system_prompt=meta_prompt, - user_prompt=user_prompt, - num_responses=1, - temperature=0.0 - ) - - return response[0] if (response and response[0]) else candidates[-1] - - def _select_majority(self, candidates, candidate_texts, summary=None): - """Majority selection - finds the consensus solution among candidates""" - if len(candidate_texts) <= 1: - return candidates[0] if candidates else {} - - # Check if we can use clustering approach - try: - import numpy as np - from difflib import SequenceMatcher - from sklearn.cluster import AgglomerativeClustering - from collections import Counter - - # Build distance matrix based on text similarity - n = len(candidate_texts) - D = np.zeros((n, n)) - for i in range(n): - for j in range(i + 1, n): - sim = SequenceMatcher(None, candidate_texts[i], candidate_texts[j]).ratio() - D[i, j] = D[j, i] = 1 - sim # Convert similarity to distance - - # Cluster the responses using hierarchical clustering - try: - clu = AgglomerativeClustering( n_clusters=None, affinity="precomputed", linkage="complete", distance_threshold=0.2).fit(D) # old sklearn version - except TypeError: - clu = AgglomerativeClustering( n_clusters=None, metric="precomputed", linkage="complete", distance_threshold=0.2).fit(D) # new sklearn version >= 1.4 - - # Find the largest cluster - labels = clu.labels_ - if len(set(labels)) == 1: # All in one cluster - return candidates[-1] - - # Get the most common label (largest cluster) - top_label = Counter(labels).most_common(1)[0][0] - - # Find indices of candidates in the largest cluster - cluster_indices = [i for i, lab in enumerate(labels) if lab == top_label] - - # Find the medoid of the cluster (most central member) - sub_distances = D[np.ix_(cluster_indices, cluster_indices)] - medoid_idx_in_cluster = int(np.argmin(sub_distances.sum(axis=1))) - medoid_idx = cluster_indices[medoid_idx_in_cluster] - - return candidates[medoid_idx] - - except (ImportError, Exception) as e: - print(f"Error in majority selection: {str(e)} – falling back to last candidate") - # Fallback to last candidate - return candidates[-1] - - def _step( - self, - verbose=False, - mask=None, - num_responses: Optional[int] = None, - temperature_min_max: Optional[List[float]] = None, - selector: callable = None, - generation_technique: str = None, - selection_technique: str = None, - experts_list: Optional[List[str]] = None, - *args, - **kwargs, - ) -> Dict: - """ - Perform a single optimization step, storing responses in self.responses and allowing selection. - Args: - verbose (bool): Whether to print debug information. - mask (list, optional): Mask for the problem instance. - num_responses (int): Number of responses to request from the LLM. - temperature (float): Sampling temperature for the LLM. - selector (callable, optional): Function to select the best response. - Returns: - Dict: The update dictionary based on the selected response. - """ - num_responses = num_responses or self.num_responses - temperature_min_max = temperature_min_max or self.temperature_min_max - selector = selector or self.selector - generation_technique = generation_technique or self.generation_technique - selection_technique = selection_technique or self.selection_technique - experts_list = experts_list or self.experts_list - - assert isinstance(self.propagator, GraphPropagator) - summary = self.summarize() - system_prompt, user_prompt = self.construct_prompt(summary, mask=mask) - - system_prompt = self.replace_symbols(system_prompt, self.prompt_symbols) - user_prompt = self.replace_symbols(user_prompt, self.prompt_symbols) - - # Generate candidates - self.candidates = self.generate_candidates( - summary, - system_prompt, - user_prompt, - verbose=verbose, - mask=mask, - num_responses=num_responses, - temperature_min_max=temperature_min_max, - generation_technique=generation_technique, - experts_list=experts_list, - ) - - if verbose: - print(f"OptoPrimeMulti > Generated candidates (self.candidates): {self.candidates}") - - if "TERMINATE" in self.candidates: return {} - - # Select the response using the selector or the default select_candidate method - if selector and callable(selector): # Ensure the selector is callable - self.selected_candidate = selector(self.candidates) - else: - self.selected_candidate = self.select_candidate(candidates=self.candidates, selection_technique=selection_technique, problem_summary=system_prompt) - - if verbose: print(f"OptoPrimeMulti > Selected candidate (self.selected_candidate): {self.selected_candidate}") +from opto.optimizers.optoprimemulti_base import OptoPrimeMultiMixin +from opto.utils.llm import LLMFactory # Backwards-compatible import location used by existing tests/users. - suggestion = self.extract_llm_suggestion(self.selected_candidate) - if not suggestion: - # Last-ditch: maybe caller already gave us the mapping - if isinstance(self.selected_candidate, dict): - if verbose: print("OptoPrimeMulti > No suggestion found, but candidate is a dict. Using it as suggestion.") - suggestion = self.selected_candidate - - if verbose: print(f"OptoPrimeMulti > Extracted suggestion: {suggestion}") - update_dict = self.construct_update_dict(suggestion) - if verbose: print(f"OptoPrimeMulti > Constructed update_dict: {update_dict}") - return update_dict +class OptoPrimeMulti(OptoPrimeMultiMixin, OptoPrime): + """Multi-candidate OptoPrime optimizer built on the V1 OptoPrime contract.""" diff --git a/opto/optimizers/optoprimemulti_base.py b/opto/optimizers/optoprimemulti_base.py new file mode 100644 index 00000000..b2cf571b --- /dev/null +++ b/opto/optimizers/optoprimemulti_base.py @@ -0,0 +1,576 @@ +from __future__ import annotations + +import json +import warnings +from collections import Counter +from concurrent.futures import ThreadPoolExecutor, as_completed +from difflib import SequenceMatcher +from typing import Any, Callable, Dict, List, Optional, Union + + +class OptoPrimeMultiMixin: + """Shared multi-candidate machinery for OptoPrime-style optimizers. + + The concrete optimizer decides the prompt/extraction contract through its + normal OptoPrime base class. This mixin only fans out LLM calls, selects a + candidate, and feeds parsed variable updates back into + ``construct_update_dict``. + """ + + def __init__( + self, + *args, + num_responses: int = 3, + temperature_min_max: Optional[List[float]] = None, + selector: Optional[Callable[[List[Any]], Any]] = None, + generation_technique: str = "temperature_variation", + selection_technique: str = "best_of_n", + experts_list: Optional[List[str]] = None, + llm_profiles: Optional[List[str]] = None, + llm_weights: Optional[List[float]] = None, + **kwargs, + ): + super().__init__(*args, **kwargs) + self.temperature_min_max = list(temperature_min_max or [0.0, 1.0]) + self.candidates: List[Any] = [] + self.candidate_details: List[Dict[str, Any]] = [] + self.selected_candidate: Any = None + self.selected_candidate_details: Optional[Dict[str, Any]] = None + self.num_responses = num_responses + self.selector = selector + self.generation_technique = generation_technique + self.selection_technique = selection_technique + self.experts_list = experts_list + self.llm_profiles = llm_profiles + self.llm_weights = llm_weights or ([1.0] * len(llm_profiles) if llm_profiles else None) + self._llm_instances: Dict[str, Any] = {} + + def _get_llm_for_profile(self, profile: Optional[str] = None): + if profile is None: + return self.llm + if profile not in self._llm_instances: + try: + import sys + import opto.utils.llm as llm_module + + owner_module = sys.modules.get(self.__class__.__module__) + owner_factory = getattr(owner_module, "LLMFactory", None) + if owner_factory is not None and getattr(owner_factory, "__module__", None) != "opto.utils.llm": + factory = owner_factory + else: + factory = llm_module.LLMFactory + self._llm_instances[profile] = factory.get_llm(profile) + except Exception as exc: + warnings.warn( + f"Failed to create LLM for profile '{profile}': {exc}. Falling back to default LLM.", + stacklevel=2, + ) + return self.llm + return self._llm_instances[profile] + + def _get_llms_for_generation(self, num_responses: int) -> List[Any]: + if not self.llm_profiles: + return [self.llm] * num_responses + return [self._get_llm_for_profile(self.llm_profiles[i % len(self.llm_profiles)]) for i in range(num_responses)] + + def _llm_response_format(self) -> Optional[Dict[str, str]]: + return {"type": "json_object"} if getattr(self, "use_json_object_format", False) else None + + def _extract_contents(self, response: Any) -> List[str]: + if response is None: + return [] + if isinstance(response, str): + return [response] + if isinstance(response, list): + contents: List[str] = [] + for item in response: + contents.extend(self._extract_contents(item)) + return contents + choices = getattr(response, "choices", None) + if choices is None and isinstance(response, dict): + choices = response.get("choices") + if choices is None: + return [str(response)] + + contents = [] + for choice in choices: + message = getattr(choice, "message", None) + if message is not None and hasattr(message, "content"): + contents.append(message.content) + elif isinstance(choice, dict): + message = choice.get("message", {}) + contents.append(message.get("content", str(choice)) if isinstance(message, dict) else str(choice)) + else: + contents.append(str(choice)) + return [content for content in contents if isinstance(content, str)] + + def _call_llm_responses( + self, + system_prompt: str, + user_prompt: str, + verbose: Union[bool, str] = False, + max_tokens: int = 4096, + num_responses: int = 1, + temperature: float = 0.0, + llm: Any = None, + ) -> List[str]: + if verbose not in (False, "output"): + print("Prompt\n", system_prompt + user_prompt) + + active_llm = llm or self.llm + messages = [{"role": "system", "content": system_prompt}, {"role": "user", "content": user_prompt}] + response_format = self._llm_response_format() + + def invoke(include_response_format: bool): + kwargs = { + "messages": messages, + "max_tokens": max_tokens, + "n": num_responses, + "temperature": temperature, + } + if include_response_format and response_format is not None: + kwargs["response_format"] = response_format + if hasattr(active_llm, "create"): + return active_llm.create(**kwargs) + return active_llm(**kwargs) + + try: + response = invoke(include_response_format=True) + except Exception as exc: + if response_format is None: + if verbose: + print(f"ERROR {exc}") + return [] + try: + response = invoke(include_response_format=False) + except Exception as inner_exc: + if verbose: + print(f"ERROR {inner_exc}") + return [] + + contents = self._extract_contents(response) + if verbose: + print("LLM responses:\n", contents) + return contents + + def call_llm( + self, + system_prompt: str, + user_prompt: str, + verbose: Union[bool, str] = False, + max_tokens: int = 4096, + num_responses: int = 1, + temperature: float = 0.0, + llm: Any = None, + ) -> List[str]: + """V1-compatible multi-response LLM helper.""" + return self._call_llm_responses( + system_prompt=system_prompt, + user_prompt=user_prompt, + verbose=verbose, + max_tokens=max_tokens, + num_responses=num_responses, + temperature=temperature, + llm=llm, + ) + + def _parallel_call_llm(self, arg_dicts: List[Dict[str, Any]]) -> List[str]: + if not arg_dicts: + return [] + outputs: List[Optional[str]] = [None] * len(arg_dicts) + with ThreadPoolExecutor(max_workers=min(8, len(arg_dicts))) as pool: + futures = {pool.submit(self._call_llm_responses, **kwargs): idx for idx, kwargs in enumerate(arg_dicts)} + for future in as_completed(futures): + idx = futures[future] + try: + responses = future.result() + if responses: + outputs[idx] = responses[0] + except Exception as exc: + if arg_dicts[idx].get("verbose"): + print(f"[parallel-llm] worker {idx} failed: {exc}") + return [output for output in outputs if output is not None] + + def _candidate_text(self, candidate: Any) -> str: + if candidate is None: + return "" + if isinstance(candidate, str): + return candidate.strip() + if isinstance(candidate, dict): + if "raw" in candidate or "text" in candidate: + return str(candidate.get("raw", candidate.get("text", ""))).strip() + return str(candidate).strip() + return str(candidate).strip() + + def _candidate_variables_from_extraction(self, extracted: Any) -> Dict[str, Any]: + if not isinstance(extracted, dict): + return {} + if isinstance(extracted.get("variables"), dict): + return extracted["variables"] + if isinstance(extracted.get("suggestion"), dict): + return extracted["suggestion"] + return { + key: value + for key, value in extracted.items() + if key not in {"reasoning", "answer", "raw", "text", "parsed", "valid", "terminate"} + } + + def _parse_candidate(self, candidate: Any) -> Dict[str, Any]: + if isinstance(candidate, dict) and any(key in candidate for key in ("variables", "parsed", "terminate", "valid")): + variables = self._candidate_variables_from_extraction(candidate) + return { + "raw": candidate.get("raw", self._candidate_text(candidate)), + "parsed": candidate.get("parsed", candidate), + "variables": variables, + "reasoning": candidate.get("reasoning", ""), + "valid": bool(candidate.get("valid", variables)), + "terminate": bool(candidate.get("terminate", False)), + } + if isinstance(candidate, dict): + variables = self._candidate_variables_from_extraction(candidate) + return { + "raw": candidate, + "parsed": candidate, + "variables": variables, + "reasoning": candidate.get("reasoning", ""), + "valid": bool(variables), + "terminate": False, + } + + raw_text = self._candidate_text(candidate) + if raw_text.upper() == "TERMINATE": + return {"raw": raw_text, "parsed": {}, "variables": {}, "reasoning": "", "valid": False, "terminate": True} + try: + extracted = self.extract_llm_suggestion(raw_text) + except Exception: + extracted = {} + variables = self._candidate_variables_from_extraction(extracted) + return { + "raw": raw_text, + "parsed": extracted, + "variables": variables, + "reasoning": extracted.get("reasoning", "") if isinstance(extracted, dict) else "", + "valid": bool(variables), + "terminate": False, + } + + def _resolve_candidate_details(self, candidate: Any, candidates: List[Any], verbose: bool = False) -> Dict[str, Any]: + details = self._parse_candidate(candidate) + if details["valid"]: + return details + for fallback in reversed(candidates): + fallback_details = self._parse_candidate(fallback) + if fallback_details["valid"]: + if verbose: + print("Falling back to the last parseable candidate.") + return fallback_details + return details + + def _build_update_dict_from_candidate(self, candidate: Any, candidates: List[Any], verbose: bool = False): + details = self._resolve_candidate_details(candidate, candidates, verbose=verbose) + if not details["valid"]: + return {}, details + return self.construct_update_dict(details["variables"]), details + + def generate_candidates( + self, + summary: Any, + system_prompt: str, + user_prompt: str, + verbose: Union[bool, str] = False, + mask: Any = None, + max_tokens: Optional[int] = None, + num_responses: int = 3, + generation_technique: str = "temperature_variation", + temperature_min_max: Optional[List[float]] = None, + experts_list: Optional[List[str]] = None, + ) -> List[str]: + del mask + num_responses = num_responses or self.num_responses + max_tokens = max_tokens or self.max_tokens + temp_range = list(temperature_min_max or self.temperature_min_max or [0.0, 1.0]) + temp_min, temp_max = (temp_range[0], temp_range[1]) if len(temp_range) >= 2 else (0.0, 1.0) + generation_technique = (generation_technique or "temperature_variation").lower() + candidates: List[str] = [] + + if generation_technique == "multi_llm" and self.llm_profiles: + llms = self._get_llms_for_generation(num_responses) + arg_dicts = [ + { + "system_prompt": f"{system_prompt}\n\n[LLM profile: {self.llm_profiles[idx % len(self.llm_profiles)]}]", + "user_prompt": user_prompt, + "verbose": verbose, + "max_tokens": max_tokens, + "num_responses": 1, + "temperature": temp_min, + "llm": llm, + } + for idx, llm in enumerate(llms) + ] + candidates.extend(self._parallel_call_llm(arg_dicts)) + elif generation_technique == "self_refinement": + for _ in range(num_responses): + meta_prompt = system_prompt + if candidates: + meta_prompt = ( + f"{system_prompt}\n\nRefine the previous candidate for the same problem. " + "Preserve the exact output format specified above. Suggest changes only for " + "trainable variables or trainable code. Never modify fixed inputs.\n" + f"PREVIOUS_CANDIDATE:\n<<<\n{candidates[-1]}\n>>>" + ) + response = self._call_llm_responses( + system_prompt=meta_prompt, + user_prompt=user_prompt, + verbose=verbose, + max_tokens=max_tokens, + num_responses=1, + temperature=temp_min, + ) + if response: + candidates.append(response[0]) + elif generation_technique == "iterative_alternatives": + for _ in range(num_responses): + meta_prompt = system_prompt + if candidates: + previous = "\n".join(f"CANDIDATE {idx + 1}:\n<<<\n{candidate}\n>>>" for idx, candidate in enumerate(candidates)) + meta_prompt = ( + f"{system_prompt}\n\nGenerate a materially different new candidate for the same problem. " + "Preserve the exact output format specified above. Suggest changes only for " + "trainable variables or trainable code. Never modify fixed inputs.\n" + f"{previous}" + ) + response = self._call_llm_responses( + system_prompt=meta_prompt, + user_prompt=user_prompt, + verbose=verbose, + max_tokens=max_tokens, + num_responses=1, + temperature=temp_min, + ) + if response: + candidates.append(response[0]) + elif generation_technique == "multi_experts": + experts = self._expert_list(system_prompt, user_prompt, num_responses, verbose, max_tokens, experts_list) + arg_dicts = [ + { + "system_prompt": ( + f"You are a `{expert}`. Provide your strongest candidate solution for the problem below. " + f"Follow the exact output format specified below.\n{self.output_format_prompt}" + ), + "user_prompt": f"PROBLEM:\n\n{user_prompt}", + "verbose": verbose, + "max_tokens": max_tokens, + "num_responses": 1, + "temperature": 0.0, + } + for expert in experts[:num_responses] + ] + candidates.extend(self._parallel_call_llm(arg_dicts)) + + if not candidates or generation_technique == "temperature_variation": + if generation_technique not in {"temperature_variation", "multi_llm", "self_refinement", "iterative_alternatives", "multi_experts"} and verbose: + print(f"Unknown generation technique: {generation_technique}; defaulting to temperature_variation") + temperatures = [temp_max - i * (temp_max - temp_min) / max(1, num_responses - 1) for i in range(num_responses)] + arg_dicts = [ + { + "system_prompt": system_prompt, + "user_prompt": user_prompt, + "verbose": verbose, + "max_tokens": max_tokens, + "num_responses": 1, + "temperature": temperature, + } + for temperature in temperatures + ] + candidates.extend(self._parallel_call_llm(arg_dicts)) + + if self.log is not None: + self.log.append( + { + "system_prompt": system_prompt, + "user_prompt": user_prompt, + "response": candidates, + "generation_technique": generation_technique, + "llm_profiles": self.llm_profiles, + } + ) + self.summary_log.append({"problem_instance": self.problem_instance(summary) if summary is not None else {}, "summary": summary}) + return candidates + + def _expert_list( + self, + system_prompt: str, + user_prompt: str, + num_responses: int, + verbose: Union[bool, str], + max_tokens: int, + experts_list: Optional[List[str]], + ) -> List[str]: + experts: List[str] = [] + if isinstance(experts_list, list) and all(isinstance(expert, str) for expert in experts_list): + experts = list(experts_list) + else: + expert_json = self._call_llm_responses( + system_prompt=( + "Generate a JSON object with key `experts`, where `experts` is an array " + "of complementary expert persona strings that would help optimize the problem." + ), + user_prompt=f"NUMBER_OF_EXPERTS={num_responses}\nPROBLEM:\n<<<\n{system_prompt}\n{user_prompt}\n>>>", + verbose=verbose, + max_tokens=max_tokens, + num_responses=1, + temperature=0.0, + ) + if expert_json: + try: + parsed = json.loads(expert_json[0]) + if isinstance(parsed, dict): + if isinstance(parsed.get("experts"), list): + parsed = parsed["experts"] + elif len(parsed) == 1 and isinstance(next(iter(parsed.values())), list): + parsed = next(iter(parsed.values())) + else: + parsed = [] + if isinstance(parsed, list): + experts = [str(expert) for expert in parsed] + except Exception: + experts = [] + default_experts = ["Algorithm Expert", "Performance Optimizer", "Prompt Engineer", "Compiler Specialist", "Critical Reviewer"] + while len(experts) < num_responses: + experts.append(default_experts[len(experts) % len(default_experts)]) + return experts + + def select_candidate(self, candidates: List[Any], selection_technique: str = "best_of_n", problem_summary: str = "") -> Any: + if not candidates: + return {} + if len(candidates) == 1: + return candidates[0] + selection_technique = (selection_technique or "last_of_n").lower() + candidate_texts = [self._candidate_text(candidate) for candidate in candidates] + if selection_technique in {"moa", "mixture_of_agents"}: + return self._select_moa(candidates, candidate_texts, problem_summary) + if selection_technique in {"bestofn", "best_of_n"}: + return self._select_bestofn(candidates, candidate_texts, problem_summary) + if selection_technique == "majority": + return self._select_majority(candidates, candidate_texts) + return candidates[-1] + + def _select_moa(self, candidates: List[Any], candidate_texts: List[str], summary: Optional[str] = None) -> Any: + system_prompt = ( + "You are an expert at synthesizing multiple candidate updates into one stronger candidate. " + f"Follow the exact output format specified below.\n{self.output_format_prompt}" + ) + prefix = f"PROBLEM:\n{summary}\n\n" if summary else "" + user_prompt = prefix + "".join(f"CANDIDATE {idx}:\n{text}\n\n" for idx, text in enumerate(candidate_texts, start=1)) + response = self._call_llm_responses(system_prompt=system_prompt, user_prompt=user_prompt, num_responses=1, temperature=0.0) + return response[0] if response else candidates[-1] + + def _select_bestofn(self, candidates: List[Any], candidate_texts: List[str], summary: Optional[str] = None) -> Any: + system_prompt = ( + "You are an expert evaluator of candidate optimizer updates. Choose or synthesize the most promising candidate update. " + f"Follow the exact output format specified below.\n{self.output_format_prompt}" + ) + prefix = f"PROBLEM:\n{summary}\n\n" if summary else "" + user_prompt = prefix + "".join(f"CANDIDATE {idx}:\n{text}\n\n" for idx, text in enumerate(candidate_texts, start=1)) + response = self._call_llm_responses(system_prompt=system_prompt, user_prompt=user_prompt, num_responses=1, temperature=0.0) + return response[0] if response else candidates[-1] + + def _select_majority(self, candidates: List[Any], candidate_texts: List[str]) -> Any: + if len(candidate_texts) <= 1: + return candidates[0] + most_common_text, count = Counter(candidate_texts).most_common(1)[0] + if count > 1: + return candidates[candidate_texts.index(most_common_text)] + try: + import numpy as np + from sklearn.cluster import AgglomerativeClustering + except Exception: + return candidates[-1] + + n = len(candidate_texts) + distance = np.zeros((n, n)) + for i in range(n): + for j in range(i + 1, n): + similarity = SequenceMatcher(None, candidate_texts[i], candidate_texts[j]).ratio() + distance[i, j] = distance[j, i] = 1 - similarity + try: + clusterer = AgglomerativeClustering(n_clusters=None, metric="precomputed", linkage="complete", distance_threshold=0.2) + except TypeError: + clusterer = AgglomerativeClustering(n_clusters=None, affinity="precomputed", linkage="complete", distance_threshold=0.2) + labels = clusterer.fit(distance).labels_ + if len(set(labels)) == 1: + return candidates[-1] + top_label = Counter(labels).most_common(1)[0][0] + indices = [idx for idx, label in enumerate(labels) if label == top_label] + sub_distances = distance[np.ix_(indices, indices)] + medoid_idx = indices[int(sub_distances.sum(axis=1).argmin())] + return candidates[medoid_idx] + + def _step( + self, + verbose: Union[bool, str] = False, + mask: Any = None, + num_responses: Optional[int] = None, + temperature_min_max: Optional[List[float]] = None, + selector: Optional[Callable[[List[Any]], Any]] = None, + generation_technique: Optional[str] = None, + selection_technique: Optional[str] = None, + experts_list: Optional[List[str]] = None, + *args, + **kwargs, + ) -> Dict[Any, Any]: + del args, kwargs + summary = self.summarize() + system_prompt, user_prompt = self.construct_prompt(summary, mask=mask) + if hasattr(self, "replace_symbols") and hasattr(self, "prompt_symbols"): + system_prompt = self.replace_symbols(system_prompt, self.prompt_symbols) + user_prompt = self.replace_symbols(user_prompt, self.prompt_symbols) + + num_responses = num_responses or self.num_responses + selector = selector or self.selector + generation_technique = generation_technique or self.generation_technique + selection_technique = selection_technique or self.selection_technique + experts_list = experts_list or self.experts_list + problem_summary = f"{system_prompt}\n\n{user_prompt}" + + self.candidates = self.generate_candidates( + summary=summary, + system_prompt=system_prompt, + user_prompt=user_prompt, + verbose=verbose, + mask=mask, + num_responses=num_responses, + temperature_min_max=temperature_min_max or self.temperature_min_max, + generation_technique=generation_technique, + experts_list=experts_list, + ) + self.candidate_details = [self._parse_candidate(candidate) for candidate in self.candidates] + + if not self.candidates: + self.selected_candidate = None + self.selected_candidate_details = None + return {} + if all(details["terminate"] for details in self.candidate_details): + self.selected_candidate = "TERMINATE" + self.selected_candidate_details = {"raw": "TERMINATE", "parsed": {}, "variables": {}, "reasoning": "", "valid": False, "terminate": True} + return {} + + self.selected_candidate = selector(self.candidates) if selector and callable(selector) else self.select_candidate( + candidates=self.candidates, + selection_technique=selection_technique, + problem_summary=problem_summary, + ) + update_dict, details = self._build_update_dict_from_candidate(self.selected_candidate, self.candidates, verbose=bool(verbose)) + self.selected_candidate_details = details + if verbose: + print( + "Generated candidates:", + self.candidates, + "\nSelected candidate:", + self.selected_candidate, + "\nSelected candidate details:", + details, + "\nUpdate dict:", + update_dict, + ) + return update_dict diff --git a/opto/optimizers/optoprimemulti_v2.py b/opto/optimizers/optoprimemulti_v2.py new file mode 100644 index 00000000..ae163cdc --- /dev/null +++ b/opto/optimizers/optoprimemulti_v2.py @@ -0,0 +1,31 @@ +from typing import Union + +from opto.optimizers.optoprime_v2 import OptoPrimeV2 +from opto.optimizers.optoprimemulti_base import OptoPrimeMultiMixin + + +class OptoPrimeMultiV2(OptoPrimeMultiMixin, OptoPrimeV2): + """Multi-candidate OptoPrime optimizer built on the V2 OptoPrime contract.""" + + def call_llm( + self, + system_prompt: str, + user_prompt: str, + verbose: Union[bool, str] = False, + max_tokens: int = 4096, + ) -> str: + """V2-compatible public LLM helper. + + Multi-candidate internals call ``_call_llm_responses`` directly. This + public method intentionally keeps OptoPrimeV2's single-string return + contract. + """ + responses = self._call_llm_responses( + system_prompt=system_prompt, + user_prompt=user_prompt, + verbose=verbose, + max_tokens=max_tokens, + num_responses=1, + temperature=0.0, + ) + return responses[0] if responses else "" diff --git a/tests/llm_optimizers_tests/test_optoprimemultiv2.py b/tests/llm_optimizers_tests/test_optoprimemultiv2.py new file mode 100644 index 00000000..32643966 --- /dev/null +++ b/tests/llm_optimizers_tests/test_optoprimemultiv2.py @@ -0,0 +1,49 @@ +import os + +import pytest + +from opto.optimizers import OptoPrimeMultiV2 +from opto.optimizers.optoprime_v2 import OptimizerPromptSymbolSetJSON +from opto.trace.nodes import GRAPH, node +from opto.utils.llm import LLM + + +def _test_model_name(): + if os.getenv("OPENAI_API_KEY"): + return os.getenv("TRACE_OPENAI_TEST_MODEL", "gpt-4o-mini") + if os.getenv("OPENROUTER_API_KEY"): + return os.getenv("TRACE_OPENROUTER_TEST_MODEL", "openrouter/openai/gpt-4o-mini") + return None + + +@pytest.fixture(autouse=True) +def clear_graph(): + GRAPH.clear() + yield + GRAPH.clear() + + +@pytest.mark.llm +def test_optoprimemultiv2_real_llm_json_smoke(): + model_name = _test_model_name() + if model_name is None: + pytest.skip("Set OPENAI_API_KEY or OPENROUTER_API_KEY to run this smoke test.") + os.environ["TRACE_LITELLM_MODEL"] = model_name + + prompt = node("alpha", name="prompt", trainable=True, description="The optimized value should become the exact string BETA_TOKEN.") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=LLM(), + optimizer_prompt_symbol_set=OptimizerPromptSymbolSetJSON(), + num_responses=2, + generation_technique="temperature_variation", + selection_technique="best_of_n", + max_tokens=512, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Please update the single trainable variable so its new value is exactly BETA_TOKEN.") + updates = optimizer.step(bypassing=True) + + assert prompt in updates + assert "BETA_TOKEN" in str(updates[prompt]).upper() diff --git a/tests/unit_tests/test_optoprimemultiv2.py b/tests/unit_tests/test_optoprimemultiv2.py new file mode 100644 index 00000000..81859058 --- /dev/null +++ b/tests/unit_tests/test_optoprimemultiv2.py @@ -0,0 +1,332 @@ +import json + +import pytest + +from opto.optimizers import OptoPrimeMultiV2 +from opto.optimizers.optoprime_v2 import OptimizerPromptSymbolSetJSON +from opto.trace.nodes import GRAPH, node +from opto.utils.llm import DummyLLM, LLMFactory + + +def make_xml_candidate(var_name: str, value: str, reasoning: str = "update"): + return f"{reasoning}\n{var_name}{value}" + + +def make_json_candidate(var_name: str, value: str, reasoning: str = "update"): + return json.dumps({"reasoning": reasoning, "suggestion": {var_name: value}}) + + +class ScriptedResponder: + def __init__(self, var_name: str, value: str, output_mode: str = "xml"): + self.var_name = var_name + self.value = value + self.output_mode = output_mode + self.calls = [] + + def candidate(self, value: str | None = None): + if self.output_mode == "json": + return make_json_candidate(self.var_name, value or self.value) + return make_xml_candidate(self.var_name, value or self.value) + + def __call__(self, *args, **kwargs): + self.calls.append({"args": args, "kwargs": kwargs}) + messages = kwargs.get("messages", []) + system_prompt = messages[0]["content"] if messages else "" + if "expert persona" in system_prompt: + return json.dumps(["Algorithm Expert", "Prompt Engineer", "Reviewer"]) + if "LLM profile:" in system_prompt: + profile = system_prompt.split("LLM profile:", 1)[1].split("]", 1)[0].strip() + return self.candidate(value=f"{profile}_value") + if "Choose or synthesize the most promising candidate update" in system_prompt: + return self.candidate(value="selected_by_best_of_n") + if "synthesizing multiple candidate updates" in system_prompt: + return self.candidate(value="selected_by_moa") + if "PREVIOUS_CANDIDATE" in system_prompt: + return self.candidate(value="self_refined_value") + if "Generate a materially different new candidate" in system_prompt: + return self.candidate(value="iterative_value") + if "Provide your strongest candidate solution" in system_prompt: + return self.candidate(value="expert_value") + return self.candidate() + + +@pytest.fixture(autouse=True) +def clear_graph(): + GRAPH.clear() + yield + GRAPH.clear() + + +def test_optoprimemultiv2_default_xml_mode_does_not_force_json_object(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(responder), + num_responses=2, + generation_technique="temperature_variation", + selection_technique="last_of_n", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Change the trainable variable to beta.") + updates = optimizer.step(bypassing=True) + + assert updates[prompt] == "beta" + assert len(optimizer.candidates) == 2 + assert optimizer.selected_candidate_details["variables"][prompt.py_name] == "beta" + assert all("response_format" not in call["kwargs"] for call in responder.calls) + + +def test_optoprimemultiv2_json_symbol_set_uses_json_object_response_format(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="json") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(responder), + optimizer_prompt_symbol_set=OptimizerPromptSymbolSetJSON(), + num_responses=1, + generation_technique="temperature_variation", + selection_technique="last_of_n", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Change the trainable variable to beta.") + updates = optimizer.step(bypassing=True) + + assert updates[prompt] == "beta" + assert any(call["kwargs"].get("response_format") == {"type": "json_object"} for call in responder.calls) + + +@pytest.mark.parametrize( + "generation_technique, expected_value", + [ + ("temperature_variation", "beta"), + ("self_refinement", "self_refined_value"), + ("iterative_alternatives", "iterative_value"), + ("multi_experts", "expert_value"), + ], +) +def test_optoprimemultiv2_generation_strategies(generation_technique, expected_value): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(responder), + num_responses=3, + generation_technique=generation_technique, + selection_technique="last_of_n", + experts_list=["Algorithm Expert", "Prompt Engineer", "Reviewer"], + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, f"Change the trainable variable to {expected_value}.") + updates = optimizer.step(bypassing=True) + + assert updates[prompt] == expected_value + assert optimizer.selected_candidate_details["valid"] is True + + +def test_optoprimemultiv2_falls_back_to_last_valid_candidate(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(responder), + num_responses=2, + generation_technique="temperature_variation", + selection_technique="last_of_n", + selector=lambda candidates: "not a parseable candidate", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Change the trainable variable to beta.") + updates = optimizer.step(bypassing=True) + + assert updates[prompt] == "beta" + assert optimizer.selected_candidate_details["valid"] is True + + +def test_optoprimemultiv2_selected_terminate_falls_back_to_last_valid_candidate(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(responder), + num_responses=2, + generation_technique="temperature_variation", + selection_technique="last_of_n", + selector=lambda candidates: "TERMINATE", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Change the trainable variable to beta.") + updates = optimizer.step(bypassing=True) + + assert updates[prompt] == "beta" + assert optimizer.selected_candidate_details["valid"] is True + assert optimizer.selected_candidate_details["terminate"] is False + + +def test_optoprimemultiv2_plain_dict_suggestion_candidate_is_parsed(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(lambda *args, **kwargs: "TERMINATE"), + max_tokens=256, + ) + + parsed = optimizer._parse_candidate( + { + "reasoning": "use the explicit suggestion mapping", + "suggestion": {prompt.py_name: "beta"}, + } + ) + + assert parsed["valid"] is True + assert parsed["variables"] == {prompt.py_name: "beta"} + + +def test_optoprimemultiv2_public_call_llm_returns_string_like_v2(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2([prompt], llm=DummyLLM(responder), max_tokens=256) + + response = optimizer.call_llm("system", "user", max_tokens=32) + + assert isinstance(response, str) + assert "" in response + + +def test_optoprimemultiv2_all_terminate_returns_no_update(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(lambda *args, **kwargs: "TERMINATE"), + num_responses=2, + generation_technique="temperature_variation", + selection_technique="last_of_n", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "No useful update exists.") + updates = optimizer.step(bypassing=True) + + assert updates == {} + assert optimizer.selected_candidate == "TERMINATE" + + +def test_optoprimemultiv2_selection_techniques(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2([prompt], llm=DummyLLM(responder), max_tokens=256) + candidate_a = make_xml_candidate(prompt.py_name, "value_a") + candidate_b = make_xml_candidate(prompt.py_name, "value_b") + + assert optimizer._parse_candidate( + optimizer.select_candidate([candidate_a, candidate_b], selection_technique="last_of_n", problem_summary="problem") + )["variables"][prompt.py_name] == "value_b" + assert optimizer._parse_candidate( + optimizer.select_candidate([candidate_a, candidate_a, candidate_b], selection_technique="majority", problem_summary="problem") + )["variables"][prompt.py_name] == "value_a" + assert optimizer._parse_candidate( + optimizer.select_candidate([candidate_a, candidate_b], selection_technique="best_of_n", problem_summary="problem") + )["variables"][prompt.py_name] == "selected_by_best_of_n" + assert optimizer._parse_candidate( + optimizer.select_candidate([candidate_a, candidate_b], selection_technique="moa", problem_summary="problem") + )["variables"][prompt.py_name] == "selected_by_moa" + + +def test_optoprimemultiv2_majority_duplicate_consensus_without_sklearn(monkeypatch): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + responder = ScriptedResponder(prompt.py_name, "beta", output_mode="xml") + optimizer = OptoPrimeMultiV2([prompt], llm=DummyLLM(responder), max_tokens=256) + + candidate_a = make_xml_candidate(prompt.py_name, "value_a") + candidate_b = make_xml_candidate(prompt.py_name, "value_b") + + import builtins + + original_import = builtins.__import__ + + def blocked_import(name, *args, **kwargs): + if name in {"numpy", "sklearn"} or name.startswith("sklearn"): + raise ImportError(name) + return original_import(name, *args, **kwargs) + + monkeypatch.setattr(builtins, "__import__", blocked_import) + + selected = optimizer.select_candidate( + [candidate_a, candidate_a, candidate_b], + selection_technique="majority", + problem_summary="problem", + ) + assert optimizer._parse_candidate(selected)["variables"][prompt.py_name] == "value_a" + + +def test_optoprimemultiv2_multi_experts_json_mode_accepts_experts_object(): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + + class ExpertObjectResponder(ScriptedResponder): + def __call__(self, *args, **kwargs): + self.calls.append({"args": args, "kwargs": kwargs}) + messages = kwargs.get("messages", []) + system_prompt = messages[0]["content"] if messages else "" + if "expert persona" in system_prompt: + return json.dumps({"experts": ["Algorithm Expert", "Reviewer"]}) + if "Provide your strongest candidate solution" in system_prompt: + return make_json_candidate(self.var_name, "expert_value") + return make_json_candidate(self.var_name, "fallback") + + responder = ExpertObjectResponder(prompt.py_name, "beta", output_mode="json") + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(responder), + optimizer_prompt_symbol_set=OptimizerPromptSymbolSetJSON(), + num_responses=2, + generation_technique="multi_experts", + selection_technique="last_of_n", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Change the trainable variable.") + updates = optimizer.step(bypassing=True) + + assert updates[prompt] == "expert_value" + assert any(call["kwargs"].get("response_format") == {"type": "json_object"} for call in responder.calls) + + +def test_optoprimemultiv2_multi_llm_generation_uses_profiles(monkeypatch): + prompt = node("alpha", name="prompt", trainable=True, description="simple prompt") + default_responder = ScriptedResponder(prompt.py_name, "default_value", output_mode="xml") + profile_calls = [] + + def fake_get_llm(profile): + profile_calls.append(profile) + return DummyLLM(lambda *args, **kwargs: make_xml_candidate(prompt.py_name, f"{profile}_value")) + + monkeypatch.setattr(LLMFactory, "get_llm", fake_get_llm) + optimizer = OptoPrimeMultiV2( + [prompt], + llm=DummyLLM(default_responder), + llm_profiles=["cheap", "premium"], + num_responses=3, + generation_technique="multi_llm", + selection_technique="last_of_n", + max_tokens=256, + ) + + optimizer.zero_feedback() + optimizer.backward(prompt, "Change the trainable variable.") + updates = optimizer.step(bypassing=True) + + assert profile_calls == ["cheap", "premium"] + assert len(optimizer.candidates) == 3 + assert updates[prompt] == "cheap_value"