Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,16 @@

from __future__ import annotations

from dataclasses import asdict
from typing import Any

from nemo_retriever.ingest.execution import execute_ingest_plan
from nemo_retriever.ingest.plan import ResolvedIngestPlan
from nemo_retriever.ingest.service import (
ServiceIngestRequest,
execute_service_ingest_request,
service_split_config_for_request,
)
from nemo_retriever.ingest_manifest import format_branch_summary

_DRY_RUN_SECRET_FIELD_PATTERNS = ("api_key", "password", "secret", "credential", "bearer")
Expand Down Expand Up @@ -88,6 +94,23 @@ def _ingest_plan_to_dry_run_data(plan: ResolvedIngestPlan) -> dict[str, Any]:
}


def service_ingest_request_to_dry_run_data(request: ServiceIngestRequest) -> dict[str, Any]:
    """Build the dry-run JSON payload for a service-mode ingest request.

    This is the payload printed by
    ``retriever ingest --run-mode service --dry-run``.

    Args:
        request: The already-resolved service ingest request to describe.

    Returns:
        A dict holding the fixed ``dry_run``/``run_mode`` markers, a list copy
        of the request documents, the input type, the connection settings
        after they have been passed through ``_strip_secret_values``, and one
        entry per parameter group rendered via ``_params_to_dry_run_dict``.
    """
    # Keys are inserted one at a time, in the order they appear in the output.
    payload: dict[str, Any] = {"dry_run": True, "run_mode": "service"}
    payload["documents"] = list(request.documents)
    payload["input_type"] = request.input_type

    # Connection settings are flattened to a plain dict before filtering.
    connection_fields = asdict(request.connection)
    payload["service"] = _strip_secret_values(connection_fields)

    payload["extract"] = _params_to_dry_run_dict(request.extract_params)
    # The split configuration is derived from the request via a helper rather
    # than read directly off an attribute.
    split_config = service_split_config_for_request(request)
    payload["split_config"] = _params_to_dry_run_dict(split_config)
    payload["dedup"] = _params_to_dry_run_dict(request.dedup_params)
    payload["caption"] = _params_to_dry_run_dict(request.caption_params)
    payload["embed"] = _params_to_dry_run_dict(request.embed_params)
    payload["store"] = _params_to_dry_run_dict(request.store_params)
    return payload


def run_ingest_workflow(
plan: ResolvedIngestPlan,
*,
Expand All @@ -98,3 +121,15 @@ def run_ingest_workflow(
return _ingest_plan_to_dry_run_data(plan)

return execute_ingest_plan(plan).to_summary_dict()


def run_service_ingest_workflow(
    request: ServiceIngestRequest,
    *,
    dry_run: bool = False,
) -> dict[str, Any]:
    """Apply root ingest workflow policy to an already-resolved service request.

    Args:
        request: The resolved service ingest request.
        dry_run: When true, the request is only described and not executed.

    Returns:
        The dry-run payload from ``service_ingest_request_to_dry_run_data``
        when ``dry_run`` is set; otherwise the ``to_summary_dict()`` of the
        result returned by ``execute_service_ingest_request``.
    """
    if not dry_run:
        # Real run: execute the request, then reduce the result to its summary.
        outcome = execute_service_ingest_request(request)
        return outcome.to_summary_dict()

    # Dry run: report what would be submitted without executing anything here.
    return service_ingest_request_to_dry_run_data(request)
Loading
Loading