Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
205 changes: 204 additions & 1 deletion src/synth_panel/cli/commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@
persona_system_prompt_from_template,
)
from synth_panel.question_budget import QuestionFailureBudget
from synth_panel.response_coercion import is_typed_schema
from synth_panel.runtime import AgentRuntime
from synth_panel.synthesis import (
STRATEGY_MAP_REDUCE,
Expand Down Expand Up @@ -874,6 +875,133 @@ def _estimate_output_tokens_per_response(question: Any) -> int:
return _DRY_RUN_OUTPUT_TOKENS_TEXT_DEFAULT


def _question_meta_for_save(questions: list[Any]) -> list[dict[str, Any]]:
    """Return the per-question metadata stored with a saved panel result.

    sy-547: ``poll-summary`` / ``analyze`` recover each question's
    ``response_schema`` from the saved result's ``questions`` field in order
    to bucket it as enum/scale. The standard run path used to omit
    ``questions=`` when calling :func:`save_panel_result`, which lost the
    schema and degraded every question to ``kind=text``.

    Each authored question becomes one ``{text, response_schema?}`` entry, in
    input order:

    * a non-dict question is stringified into ``text`` and carries no schema;
    * a dict question uses its rendered prompt as ``text`` and, when it
      declares a non-``None`` ``response_schema``, carries that schema object
      through unchanged so the downstream ``_detect_kind`` is schema-driven
      rather than inferred from response shape.
    """

    def _describe(question: Any) -> dict[str, Any]:
        # Non-dict questions have no schema to preserve; keep their text only.
        if not isinstance(question, dict):
            return {"text": str(question)}
        record: dict[str, Any] = {"text": build_question_prompt(question)}
        schema = question.get("response_schema")
        if schema is not None:
            record["response_schema"] = schema
        return record

    return [_describe(question) for question in questions]


def _blend_dropped_models(ensemble: Any) -> list[str]:
    """List the blend members that yielded no usable response (sy-546).

    A member counts as "dropped" when none of its panelists produced a usable
    primary answer, i.e. it contributed nothing to the blended distributions.
    A response is usable only when all of the following hold:

    * it is a dict and is not flagged ``follow_up``;
    * it has no truthy ``error``;
    * its ``response`` is not ``None``;
    * if ``response`` is a string, it is non-blank and, once stripped, does
      not start with ``"[error:"``.

    A bad slug that 404s on every call lands here, as does a model that died
    mid-run. A member with no panelists or no responses is also reported.
    The returned names follow the order of ``ensemble.model_results``.
    """

    def _is_usable(entry: Any) -> bool:
        # Skip non-dict entries, follow-ups, and responses flagged as errors.
        if not isinstance(entry, dict) or entry.get("follow_up") or entry.get("error"):
            return False
        answer = entry.get("response")
        if answer is None:
            return False
        if isinstance(answer, str):
            text = answer.strip()
            # Blank text and inline "[error: ...]" markers carry no signal.
            return bool(text) and not text.startswith("[error:")
        # Any other non-None value counts as usable.
        return True

    return [
        result.model
        for result in ensemble.model_results
        if not any(
            _is_usable(entry)
            for panelist in result.panelist_results
            for entry in panelist.responses
        )
    ]


def _run_model_preflight(
    models: list[str],
    *,
    args: Any,
    client: Any | None = None,
) -> int | None:
    """Probe *models* for reachability and decide whether the run may proceed.

    sy-546. Returns ``1`` (an exit code) when the pre-flight guard is
    violated, otherwise ``None``. All diagnostics go to stderr.

    Guard selection, read from *args*:

    * ``min_models`` set and ``require_all_models`` falsy: relaxed floor.
      Abort only when fewer than ``min_models`` models are usable, where
      inconclusive probes count as usable so a flaky probe cannot trip the
      floor. Unreachable models that do not break the floor produce a warning.
    * Otherwise (the default, or ``require_all_models``): strict. Any
      unreachable model aborts the run.

    Inconclusive probes are always reported, and never count against either
    guard.
    """
    from synth_panel.preflight import preflight_models

    def _say(message: str) -> None:
        print(message, file=sys.stderr)

    _say(f"Pre-flight: probing {len(set(models))} model(s) for reachability...")
    report = preflight_models(models, client=client)

    # Split the probes in one pass; any other status is neither list.
    reachable: list[str] = []
    unverified: list[Any] = []
    for probe in report.probes:
        if probe.status == "reachable":
            reachable.append(probe.model)
        elif probe.status == "inconclusive":
            unverified.append(probe)

    for probe in unverified:
        _say(
            f"Pre-flight: could not verify {probe.model} (inconclusive: {probe.detail or 'unknown error'}); "
            "proceeding — it will surface at call time if truly broken."
        )

    floor = getattr(args, "min_models", None)
    require_all = getattr(args, "require_all_models", False)
    unreachable = report.unreachable

    # Strict guard (default / --require-all-models): any unreachable slug is fatal.
    if floor is None or require_all:
        if unreachable:
            _say(report.failure_message())
            return 1
        _say(f"Pre-flight: OK — all {len(reachable)} model(s) reachable.")
        return None

    # Relaxed guard: inconclusive probes are treated as potentially usable.
    usable = len(reachable) + len(unverified)
    if usable < floor:
        if unreachable:
            _say(report.failure_message())
        _say(f"Pre-flight failed: only {usable} model(s) reachable, --min-models requires {floor}.")
        return 1

    if unreachable:
        bad = ", ".join(probe.model for probe in unreachable)
        _say(
            f"Pre-flight: WARNING — {len(unreachable)} model(s) unreachable ({bad}); "
            f"proceeding with {len(reachable)} reachable model(s) per --min-models={floor}."
        )
    return None


def _iter_instrument_attachments(instrument: Instrument):
"""Yield every attachment dict the instrument would send to the model.

Expand Down Expand Up @@ -943,6 +1071,11 @@ def _emit_dry_run_preview(
question_count = len(questions)
llm_calls = persona_count * question_count

# sy-547 (d): count questions with an enforceable typed response_schema
# (enum/scale) so the preview can be explicit that these are coerced
# post-hoc, not constrained at generation.
typed_schema_count = sum(1 for q in questions if isinstance(q, dict) and is_typed_schema(q.get("response_schema")))

system_prompt_chars = sum(len(system_prompt_fn(p)) for p in personas)
question_chars = sum(len(build_question_prompt(q)) for q in questions)
follow_up_chars = 0
Expand Down Expand Up @@ -1037,7 +1170,20 @@ def _emit_dry_run_preview(
if vision_warning:
print(f"Validation: WARNING — {vision_warning}", file=sys.stderr)
else:
print("Validation: OK", file=sys.stderr)
print("Validation: OK (instrument spec is structurally valid)", file=sys.stderr)
# sy-547 (d): be explicit that a typed response_schema is NOT
# enforced at generation. The real run coerces free-text answers to
# the nearest enum option / in-range integer post-hoc and flags any
# that can't be mapped, but "Validation: OK" must not read as
# "output is guaranteed to be one of the options".
if typed_schema_count:
print(
f"Note: {typed_schema_count} question(s) declare an enum/scale response_schema. "
"These are NOT constrained at generation — the run coerces each free-text answer "
"to the nearest option / in-range integer post-hoc and flags unmappable answers. "
"Dry-run does not perform that coercion.",
file=sys.stderr,
)
return

preview: dict[str, Any] = {
Expand All @@ -1051,6 +1197,11 @@ def _emit_dry_run_preview(
"estimated_cost_usd": round(cost.total_cost, 6),
"cost_is_estimated": pricing_is_estimated,
"validation": "warning" if vision_warning else "ok",
# sy-547 (d): advertise that typed response_schemas are coerced
# post-hoc, not enforced at generation, so JSON consumers don't
# treat "validation: ok" as "output guaranteed constrained".
"typed_schema_question_count": typed_schema_count,
"typed_schema_enforced": False,
"rounds": [
{
"name": r.name,
Expand Down Expand Up @@ -1533,6 +1684,25 @@ def system_prompt_fn(persona: dict) -> str:
if persona_models:
print(format_assignment_breakdown(persona_models), file=sys.stderr)

# ── sy-546: model reachability pre-flight ────────────────────────────
# For multi-model runs (--models weighted / ensemble / --blend), probe
# every distinct slug with a 1-token call before spending. A bogus slug
# (e.g. a 404'ing OpenRouter id) deterministically fails every call and
# silently shrinks an ensemble/blend; catch it here and fail fast naming
# the bad slug(s). Runs on BOTH --dry-run and real runs so a dry-run's
# "OK" actually means the spec is runnable. Bypassable with
# --skip-preflight. Transient/auth/credential failures are inconclusive
# and never block the run.
preflight_models_list: list[str] = []
if model_spec is not None and len(model_spec) > 1:
preflight_models_list = [m for m, _w in model_spec]
elif persona_models:
preflight_models_list = sorted(set(persona_models.values()))
if len(set(preflight_models_list)) > 1 and not getattr(args, "skip_preflight", False):
rc = _run_model_preflight(preflight_models_list, args=args, client=None)
if rc is not None:
return rc

# ── sp-x8g: --dry-run preview ────────────────────────────────────────
# Short-circuit before any LLM-invoking code (variant expansion,
# ensemble, blend, orchestrator). Shows the user what each question
Expand Down Expand Up @@ -1940,6 +2110,7 @@ def system_prompt_fn(persona: dict) -> str:

# Run all panelists in parallel via the orchestrator
blend_result = None # populated only when --blend is active
blend_drop_warning: str | None = None # sy-546: set when a blend member dropped

# ── sp-hsk3: checkpoint + resume wiring ────────────────────────────
# We snapshot progress every K completed panelists so a crashed or
Expand Down Expand Up @@ -2098,6 +2269,24 @@ def system_prompt_fn(persona: dict) -> str:
blend_weights = {m: w for m, w in model_spec}
blend_result = blend_distributions(ensemble, weights=blend_weights, questions=questions)

# sy-546: detect blend members that contributed ZERO usable responses
# (e.g. a slug that 404'd on every call slipped past pre-flight, or a
# member died mid-run). The blend silently degrades to the survivors;
# emit a loud, top-level warning stating the new N so the operator
# isn't fooled into treating it as a full-strength blend.
dropped_models = _blend_dropped_models(ensemble)
if dropped_models:
surviving = [m for m in ensemble_models if m not in dropped_models]
blend_drop_warning = (
f"BLEND DEGRADED: {len(dropped_models)} of {len(ensemble_models)} model(s) "
f"produced no usable responses ({', '.join(dropped_models)}). "
f"The blend dropped to {len(surviving)} model(s): {', '.join(surviving) or 'none'}. "
"Distributions and synthesis reflect only the surviving model(s). "
"Re-run with corrected --models (or --require-all-models to abort instead)."
)
logger.warning(blend_drop_warning)
print(f"\nWarning: {blend_drop_warning}\n", file=sys.stderr)

# Flatten all panelist results across models for output + synthesis
panelist_results = [pr for mr in ensemble.model_results for pr in mr.panelist_results]
else:
Expand Down Expand Up @@ -2582,6 +2771,12 @@ def _on_complete(pr: PanelistResult) -> None:
persona_count=len(personas),
question_count=len(questions),
instrument_name=inst_name,
# sy-547: persist the authored question defs (text +
# response_schema) so poll-summary / analyze recognize enum/scale
# questions instead of falling back to kind=text. Without this the
# saved result carried only the question text echoed on each
# response, dropping the schema kind.
questions=_question_meta_for_save(questions),
models=all_models,
synthesis=synthesis_dict,
metadata=metadata,
Expand Down Expand Up @@ -2852,6 +3047,14 @@ def _cli_panelist_formatter(pr: PanelistResult, panel_model: str) -> dict[str, A
warnings_list = extra.setdefault("warnings", [])
if isinstance(warnings_list, list):
warnings_list.extend(assignment_warnings)
# sy-546: carry the blend-degraded warning into the JSON envelope so
# CI / MCP consumers detect the dropped member(s) without scraping
# stderr, and expose the count explicitly.
if blend_drop_warning is not None:
warnings_list = extra.setdefault("warnings", [])
if isinstance(warnings_list, list):
warnings_list.append(blend_drop_warning)
extra["blend_degraded"] = True
# sp-g270: surface --personas-merge name-collision drops so JSON
# consumers can assert panel size matches expectations. Always
# present (as []) when --personas-merge was used so downstream
Expand Down
36 changes: 36 additions & 0 deletions src/synth_panel/cli/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,42 @@ def build_parser() -> argparse.ArgumentParser:
"can be passed to 'synthpanel analyze <RESULT_ID>'."
),
)
# sy-546: model reachability pre-flight guards for multi-model runs.
panel_run_parser.add_argument(
"--skip-preflight",
action="store_true",
default=False,
help=(
"Skip the model reachability pre-flight. By default, runs with "
"multiple models (--models, ensemble, or --blend) probe each slug "
"with a 1-token call before spending and abort if any slug is "
"unreachable (e.g. a bad OpenRouter model id that 404s)."
),
)
panel_run_parser.add_argument(
"--require-all-models",
action="store_true",
default=False,
help=(
"Abort the run if ANY model in --models is unreachable in "
"pre-flight, even when the others would still satisfy --min-models. "
"This is the default for multi-model runs; the flag is accepted "
"for explicitness and to override a relaxed --min-models."
),
)
panel_run_parser.add_argument(
"--min-models",
type=int,
default=None,
metavar="N",
help=(
"Minimum number of reachable models required to proceed with a "
"multi-model run. If pre-flight finds fewer than N reachable "
"models, the run aborts. Without this flag (and without "
"--require-all-models), the run still aborts on ANY unreachable "
"slug — set --min-models to deliberately allow a degraded run."
),
)
panel_run_parser.add_argument(
"--dry-run",
action="store_true",
Expand Down
29 changes: 29 additions & 0 deletions src/synth_panel/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from synth_panel.persistence import Session
from synth_panel.prompts import build_question_blocks
from synth_panel.question_budget import QuestionFailureBudget
from synth_panel.response_coercion import coerce_response, is_typed_schema
from synth_panel.routing import route_round
from synth_panel.runtime import AgentRuntime, TurnSummary
from synth_panel.structured.output import StructuredOutputConfig, StructuredOutputEngine
Expand Down Expand Up @@ -1180,6 +1181,34 @@ def _run_panelist(
}
tracker.record_turn(summary.usage)

# sy-547: when the question declares a typed
# ``response_schema`` (enum/scale), coerce the free-text
# answer to the nearest declared option / in-range integer
# and persist BOTH the raw text (``response``, untouched)
# and the typed value (``response_typed``). The schema kind
# is stamped on the response so poll-summary/analyze bucket
# the question as enum/scale instead of falling back to
# ``kind=text``. Unmappable answers set ``schema_unmapped``
# so the caller can tally a run-level failure count and emit
# a per-response warning.
q_schema = question.get("response_schema") if isinstance(question, dict) else None
if is_typed_schema(q_schema):
resp_dict["response_schema"] = q_schema
coerced = coerce_response(q_schema, response_text)
if coerced is not None:
if coerced.mapped:
resp_dict["response_typed"] = coerced.value
else:
resp_dict["schema_unmapped"] = True
logger.warning(
"panelist %s q%d: answer %r could not be mapped to "
"declared %s response_schema; stored as raw text only",
name,
qi,
response_text[:80],
coerced.kind,
)

# Extraction pass: extract structured data from the
# free-text response (--extract-schema).
if extract_engine and extract_config:
Expand Down
Loading
Loading