Move service ingest construction into ingest core#2221

Open
jioffe502 wants to merge 1 commit into NVIDIA:main from jioffe502:codex/root-ingest-service-mode

Conversation

@jioffe502

Collaborator

Summary

  • add nemo_retriever.ingest.service for typed service ingest request/options, service request resolution, service ingestor construction, and execution summary
  • add root retriever ingest --run-mode service with service URL/concurrency/token options
  • make legacy retriever pipeline run --run-mode service bridge through the same service construction path
  • keep CLI-only service flag rejection in adapters/cli/service_flags.py instead of service core

Not moved

  • eval/recall/BEIR and QA sweep logic remain in legacy/eval paths
  • harness-managed Helm workflows remain unchanged
  • local/batch ingest planning and GraphIngestor execution remain in ingest.plan / ingest.execution

Validation

  • commit hooks on amend: trailing whitespace, EOF, large files, AST, debug statements, Black, flake8
  • py_compile on touched Python source files
  • root service dry-run smoke: token redaction and service request resolution
  • pipeline service-mode rejection smoke via shared CLI helper
  • live service smoke on MicroK8s/Helm: retriever ingest --run-mode service ingested jp20, 20 PDFs -> 3352 rows, service job completed 20/20 with 0 failures
  • service BEIR recall against jp20 via /v1/query: 115 queries, 0 empty-hit queries, recall@10=0.9652

Notes

pre-commit was not installed as a standalone command in the shell, but the repository commit hooks ran and passed during git commit --amend. pytest was not available in the local venv, so focused pytest was not run here.

@jioffe502 jioffe502 requested review from a team as code owners June 9, 2026 16:58
@jioffe502 jioffe502 requested a review from edknv June 9, 2026 16:58
@greptile-apps

greptile-apps Bot commented Jun 9, 2026

Contributor

Greptile Summary

This PR extracts the service-mode ingest construction logic out of the legacy pipeline/__main__.py and the root CLI into a dedicated nemo_retriever.ingest.service module, then wires both entry points through it. The new module also introduces typed option dataclasses, a ServiceIngestPlanRequest -> ServiceIngestRequest resolution step, and a ServiceIngestExecutionResult for structured summaries.

  • New ingest/service.py: Typed ServiceIngest*Options dataclasses, resolve_service_ingest_request, build_service_ingestor, execute_service_ingest_request, and helper builders for extract/dedup/caption/chunk/embed/store params; the "auto" input-type now builds a split config from actual file extensions (fixing a silent no-op in the old code for chunking + --input-type auto).
  • Root CLI (adapters/cli/main.py): Adds --run-mode service support alongside --service-url, --service-concurrency, --service-api-token; incompatible-flag rejection is delegated to the new shared service_flags.py helper.
  • Legacy pipeline (pipeline/__main__.py): Removes _build_service_ingestor and _reject_service_incompatible_flags in favour of the shared ingest.service path; the ValueError-on-empty-file-set regression (previously typer.BadParameter) and the missing test coverage for ingest/service.py noted in earlier review threads remain open.

Confidence Score: 4/5

Safe to merge with one outstanding fix: the legacy pipeline's run() wrapper has no except clause, so a ValueError from build_service_ingestor (empty file set) surfaces as a raw traceback rather than a clean error message.

The refactoring correctly consolidates service ingest construction into a single module and both entry points wire through it cleanly. The behavioral fix for --input-type auto + --text-chunk (previously a silent no-op) is correct. The open issue is in pipeline/__main__.py: run() wraps ingest in try/finally only, so the ValueError raised by build_service_ingestor when no files match the glob propagates as an unhandled exception rather than a user-friendly message. The old code raised typer.BadParameter, which Typer caught automatically.

nemo_retriever/src/nemo_retriever/pipeline/__main__.py: the run() function needs an except clause (or the ValueError should be converted to typer.BadParameter) to restore clean error reporting for empty glob patterns in service mode.

Important Files Changed

| Filename | Overview |
|---|---|
| nemo_retriever/src/nemo_retriever/ingest/service.py | New 573-line core module; implements service ingest request resolution, ingestor construction, and execution summary. No unit tests exist for its business logic (previous thread). The public functions carry docstrings and type annotations, but the _build_service_caption_params overrides dict uses artificially prefixed key names for error messages that don't match field names. |
| nemo_retriever/src/nemo_retriever/adapters/cli/main.py | Adds --run-mode service with service-URL/concurrency/token options; wires through ServiceIngestPlanRequest; delegates incompatible-flag rejection to shared helper. IngestorRunMode is a Literal type that Typer handles correctly. Explicit fallback validation present. |
| nemo_retriever/src/nemo_retriever/adapters/cli/service_flags.py | New file; extracts shared service-incompatible-flag rejection logic. SPDX header present. Logic is clean and correctly mirrors the ParameterSource enum-name comparison from the old private helper. |
| nemo_retriever/src/nemo_retriever/adapters/cli/ingest_workflow.py | Adds service_ingest_request_to_dry_run_data and run_service_ingest_workflow. Secret-field redaction works for service_api_token (ends with token). Dry-run early-return prevents KeyError on missing service_url key. |
| nemo_retriever/src/nemo_retriever/pipeline/__main__.py | Removes ~130 lines of now-centralised service helpers. The ValueError from empty file set propagates unhandled through the try/finally (only) wrapper in run() (previous thread). The behavior change for --input-type auto + --text-chunk is a correct fix; the old code silently returned a null split config. |
| nemo_retriever/tests/test_root_cli_workflow.py | Adds three service-mode integration tests (flag rejection, dry-run token redaction, end-to-end wiring). Tests are well-structured and validate the right assertions including token redaction and output message format. |
| nemo_retriever/tests/test_pipeline_helpers.py | Migrates test_build_service_ingestor_wires_extract_embed_and_chunking to use the new public ServiceIngestRequest API. Clean migration with no loss of assertion coverage. |
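
The suffix-based redaction noted for ingest_workflow.py can be pictured roughly as follows. This is a minimal sketch, not the PR's code: the helper name `redact_secrets`, the `"***"` placeholder, the suffix tuple, and the example URL are all assumptions made for illustration. The thread only confirms that a field whose name ends in "token" is redacted.

```python
from typing import Any

# Assumed suffix list; the review only confirms that names ending in "token" match.
SECRET_SUFFIXES = ("token",)


def redact_secrets(data: dict[str, Any], placeholder: str = "***") -> dict[str, Any]:
    """Return a copy of ``data`` with secret-looking values replaced (hypothetical helper)."""
    redacted: dict[str, Any] = {}
    for key, value in data.items():
        if isinstance(value, dict):
            # Recurse so nested option groups are covered as well.
            redacted[key] = redact_secrets(value, placeholder)
        elif key.lower().endswith(SECRET_SUFFIXES) and value is not None:
            redacted[key] = placeholder
        else:
            redacted[key] = value
    return redacted


# Placeholder values only; not taken from the PR.
dry_run = redact_secrets(
    {
        "service_url": "http://example.invalid",
        "service_api_token": "s3cr3t",
        "service_concurrency": 8,
    }
)
```

Matching on the field-name suffix keeps the dry-run output safe by default when new token-like options are added, at the cost of possibly redacting an unrelated field that happens to share the suffix.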

Sequence Diagram

sequenceDiagram
    participant CLI as retriever ingest CLI
    participant SCLI as pipeline run CLI (legacy)
    participant SF as service_flags.py
    participant Main as adapters/cli/main.py
    participant IW as ingest_workflow.py
    participant SVC as ingest/service.py
    participant SI as ServiceIngestor

    CLI->>Main: "ingest_command(run_mode=service, ...)"
    Main->>SF: reject_service_incompatible_flags(ctx, ROOT_SERVICE_INCOMPATIBLE_FLAGS)
    SF-->>Main: raises BadParameter if incompatible flags present
    Main->>SVC: resolve_service_ingest_request(ServiceIngestPlanRequest)
    SVC->>SVC: resolve_service_documents() concrete paths
    SVC-->>Main: ServiceIngestRequest
    Main->>IW: run_service_ingest_workflow(request, dry_run)
    alt "dry_run=True"
        IW->>IW: service_ingest_request_to_dry_run_data()
        IW-->>Main: dry-run dict (token redacted)
    else "dry_run=False"
        IW->>SVC: execute_service_ingest_request(request)
        SVC->>SVC: build_service_ingestor(request)
        SVC->>SVC: expand_service_file_patterns()
        SVC->>SI: ServiceIngestor(url, concurrency, token)
        SVC->>SI: .files().extract().dedup?.caption?.embed().store?
        SI-->>SVC: configured ingestor
        SVC->>SI: .ingest()
        SI-->>SVC: result
        SVC-->>IW: ServiceIngestExecutionResult
        IW-->>Main: to_summary_dict()
    end
    Main-->>CLI: JSON or formatted output

    SCLI->>SF: reject_service_incompatible_flags(ctx, _SERVICE_INCOMPATIBLE_FLAGS)
    SCLI->>SVC: build_service_ingestor(ServiceIngestRequest(file_patterns))
    SVC->>SVC: expand_service_file_patterns(file_patterns)
    SVC->>SI: configure stages
    SI-->>SCLI: ingestor for .ingest()
Prompt To Fix All With AI
Fix the following 2 code review issues. Work through them one at a time, proposing concise fixes.

---

### Issue 1 of 2
nemo_retriever/src/nemo_retriever/ingest/service.py:416-422
The `overrides` dict keys use a `caption_` prefix (e.g. `"caption_context_text_max_chars"`) that matches the CLI *parameter variable* names but not the `ServiceIngestCaptionOptions` field names (`context_text_max_chars`, `temperature`, …). A user who sets `--caption-context-text-max-chars` without `--caption` will see `"Caption options require --caption: caption_context_text_max_chars"` — the extra prefix is confusing compared to the actual option name. Using the `ServiceIngestCaptionOptions` field names (without the prefix) keeps the error message consistent with the dataclass interface.

```suggestion
    overrides = {
        "context_text_max_chars": caption.context_text_max_chars,
        "temperature": caption.temperature,
        "top_p": caption.top_p,
        "max_tokens": caption.max_tokens,
        "caption_infographics": caption.caption_infographics,
    }
```

### Issue 2 of 2
nemo_retriever/src/nemo_retriever/ingest/service.py:229-232
**`ServiceIngestRequest.documents` has dual semantics across call sites**

`build_service_ingestor` calls `expand_service_file_patterns(request.documents)` to expand globs, so it works correctly whether `documents` holds resolved concrete paths (root CLI path, after `resolve_service_documents`) or raw glob patterns (legacy pipeline path, `file_patterns` passed directly). However `service_split_config_for_request` and `ServiceIngestExecutionResult.documents` consume `request.documents` directly without expansion. If `execute_service_ingest_request` were called from the legacy pipeline with unresolved globs, the dry-run split config and the `"documents"` key in `to_summary_dict` would reflect the raw patterns rather than the actual file list. A docstring or comment on `ServiceIngestRequest.documents` clarifying which semantics are expected (and that `build_service_ingestor` normalises either form) would prevent future callers from hitting this silently.
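
The dual semantics described in Issue 2 can be illustrated with a small stand-in. This is a sketch under assumptions: `RequestSketch` and `expand_patterns` are hypothetical names, and the real `ServiceIngestRequest` and `expand_service_file_patterns` may differ in detail. It shows how a documented field plus a single normalising step lets concrete paths and glob patterns resolve to the same file list.

```python
import glob
import os
import tempfile
from dataclasses import dataclass


@dataclass(frozen=True)
class RequestSketch:
    """Hypothetical stand-in for ``ServiceIngestRequest``.

    Attributes:
        documents: Either concrete file paths or glob patterns. Consumers that
            need the real file list should call ``expand_patterns`` first.
    """

    documents: tuple[str, ...]


def expand_patterns(patterns: tuple[str, ...]) -> list[str]:
    """Expand globs; existing concrete paths pass through unchanged."""
    files: list[str] = []
    for pattern in patterns:
        # glob.glob returns [path] for an existing literal path, so both forms normalise.
        files.extend(sorted(glob.glob(pattern, recursive=True)))
    if not files:
        raise ValueError(f"No files matched: {list(patterns)}")
    return files


with tempfile.TemporaryDirectory() as tmp:
    pdf = os.path.join(tmp, "a.pdf")
    open(pdf, "w").close()
    from_glob = expand_patterns(RequestSketch((os.path.join(tmp, "*.pdf"),)).documents)
    from_path = expand_patterns(RequestSketch((pdf,)).documents)
```

Any consumer that reads `documents` without this step, such as a summary dict, would report the raw pattern rather than the matched files, which is the silent mismatch the issue warns about.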

Reviews (2): Last reviewed commit: "Move service ingest construction into in..."

Comment on lines +1326 to 1352
ingestor = ingest_service.build_service_ingestor(
    ingest_service.ServiceIngestRequest(
        documents=file_patterns,
        input_type=input_type,
        extract_params=extract_params,
        embed_params=embed_params,
        text_chunk_params=text_chunk_params,
        enable_text_chunk=enable_text_chunk,
        dedup_params=DedupParams(iou_threshold=dedup_iou_threshold) if enable_dedup else None,
        caption_params=(
            CaptionParams(
                context_text_max_chars=caption_context_text_max_chars,
                temperature=caption_temperature,
                top_p=caption_top_p,
                max_tokens=caption_max_tokens,
            )
            if enable_caption
            else None
        ),
        store_params=StoreParams(storage_uri=store_images_uri) if store_images_uri is not None else None,
        connection=ingest_service.ServiceIngestConnectionOptions(
            service_url=service_url,
            service_concurrency=service_concurrency,
            service_api_token=service_api_token,
        ),
    )
)

Contributor

P1 ValueError from empty file set is unhandled in the legacy pipeline path

The old _build_service_ingestor raised typer.BadParameter when no files matched the glob patterns; the replacement build_service_ingestor raises ValueError instead. The run() function here has only a try/finally (no except), so any ValueError propagates unhandled to Typer and produces a raw Python traceback rather than a clean error message. This regression is reachable when _resolve_file_patterns constructs a valid glob like dir/**/*.pdf but the directory is empty — the upstream helper validates path existence, not that the glob actually matches files.
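
If the ValueError is reachable, one concise remedy is to translate it at the CLI boundary. The sketch below uses only the standard library to show the pattern: `UsageError` is a hypothetical stand-in for `typer.BadParameter`, and `build_ingestor` is a hypothetical stand-in for `build_service_ingestor`. It is not the PR's code.

```python
class UsageError(Exception):
    """Hypothetical stand-in for ``typer.BadParameter``."""


def build_ingestor(files: list[str]) -> list[str]:
    """Hypothetical stand-in for ``build_service_ingestor``."""
    if not files:
        raise ValueError("No files matched the input patterns")
    return files


def run(files: list[str]) -> list[str]:
    """CLI-layer wrapper: keeps the existing finally block and adds an except clause."""
    try:
        return build_ingestor(files)
    except ValueError as exc:
        # Translate the core-layer error into a CLI-layer error so the
        # framework can print a one-line message instead of a traceback.
        raise UsageError(str(exc)) from exc
    finally:
        pass  # any existing cleanup would stay here
```

Keeping the core module on plain `ValueError` and converting only in the adapter preserves the split the PR aims for, where CLI-specific types stay out of the service core.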

Path: nemo_retriever/src/nemo_retriever/pipeline/__main__.py
Line: 1326-1352

Comment on lines +259 to +271
def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
    result = build_service_ingestor(request).ingest()
    result_n_rows = _count_service_result_rows(result)
    return ServiceIngestExecutionResult(
        request=request,
        result=result,
        n_rows=result_n_rows,
        result_n_rows=result_n_rows,
        metadata={
            "service_url": request.connection.service_url,
            "input_type": request.input_type,
        },
    )

Contributor

P1 No unit tests for the new ingest/service.py module

nemo_retriever/ingest/service.py introduces substantial new business logic — profile-based extract defaults, document-type validation, caption/dedup/chunk param builders with their own error conditions, _split_config_for_auto_documents, and ServiceIngestExecutionResult — but no corresponding test_ingest_service.py exists. The new CLI-level tests exercise the wiring end-to-end, but functions like _build_service_caption_params (which raises on caption_context_text_max_chars < 0), _build_service_dedup_params (raises when iou_threshold is set without enabled), and _split_config_for_auto_documents (mixed media type branching) are untested in isolation. Per the test-mirrors-source-structure and test-coverage-new-code rules, a companion test module is required.
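
An isolated test of the kind requested here might look like the sketch below. `build_dedup_params` is a hypothetical stand-in written only to mirror the behaviour the comment describes (raising when `iou_threshold` is set without `enabled`); the real `_build_service_dedup_params` signature, return type, and error text are not shown in this thread.

```python
from typing import Optional


def build_dedup_params(enabled: bool, iou_threshold: Optional[float]) -> Optional[dict]:
    """Hypothetical stand-in mirroring the behaviour the review describes."""
    if not enabled:
        if iou_threshold is not None:
            raise ValueError("Dedup options require dedup to be enabled: iou_threshold")
        return None
    params: dict = {}
    if iou_threshold is not None:
        params["iou_threshold"] = iou_threshold
    return params


def test_threshold_without_enabled_raises() -> None:
    try:
        build_dedup_params(enabled=False, iou_threshold=0.4)
    except ValueError as exc:
        assert "iou_threshold" in str(exc)
    else:
        raise AssertionError("expected ValueError")


def test_enabled_passes_threshold_through() -> None:
    assert build_dedup_params(enabled=True, iou_threshold=0.4) == {"iou_threshold": 0.4}


def test_disabled_returns_none() -> None:
    assert build_dedup_params(enabled=False, iou_threshold=None) is None
```

The test functions use plain assertions, so they run under pytest or when called directly.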

Path: nemo_retriever/src/nemo_retriever/ingest/service.py
Line: 259-271


Comment on lines +134 to +158
@dataclass(frozen=True)
class ServiceIngestExecutionResult:
    request: ServiceIngestRequest
    result: object
    n_rows: int | None
    result_n_rows: int | None
    metadata: dict[str, Any]

    @property
    def documents(self) -> list[str]:
        return list(self.request.documents)

    @property
    def service_url(self) -> str:
        return self.request.connection.service_url

    def to_summary_dict(self) -> dict[str, Any]:
        return {
            "run_mode": "service",
            "documents": self.documents,
            "service_url": self.service_url,
            "result": self.result,
            "n_rows": self.n_rows,
            "result_n_rows": self.result_n_rows,
        }

Contributor

P2 n_rows and result_n_rows are always identical, creating a confusing public API

In execute_service_ingest_request, both fields are set to the same value (_count_service_result_rows(result)) and both are emitted in to_summary_dict. If the intent was to distinguish "rows reported by the service" from "rows counted from the ingest result object", that distinction is not implemented — callers see two keys with the same number and no documented difference. The redundant field should either be removed, or the two fields should be sourced from different places with their distinction documented.
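
One way to keep both keys while making the relationship explicit is to store a single value and expose the second name as a documented alias. The sketch below assumes a trimmed, hypothetical `ResultSketch` class rather than the real `ServiceIngestExecutionResult`, and the row count is just an example value.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ResultSketch:
    """Hypothetical, trimmed stand-in for ``ServiceIngestExecutionResult``."""

    result_n_rows: Optional[int]

    @property
    def n_rows(self) -> Optional[int]:
        """Alias of ``result_n_rows``: only one row count is available to report."""
        return self.result_n_rows

    def to_summary_dict(self) -> dict:
        return {
            "run_mode": "service",
            "n_rows": self.n_rows,
            "result_n_rows": self.result_n_rows,
        }


summary = ResultSketch(result_n_rows=3352).to_summary_dict()
```

With a property there is a single source of truth, so the two keys cannot drift apart, and the docstring records why they match.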

Path: nemo_retriever/src/nemo_retriever/ingest/service.py
Line: 134-158


Comment on lines +200 to +207
run_mode: str = typer.Option(
"inprocess",
"--run-mode",
help="Execution mode for the SDK ingestor. Defaults to inprocess; use batch for Ray Data scale-out.",
help=(
"Execution mode for ingest: inprocess (default), batch for Ray Data scale-out, "
"or service for a remote retriever service."
),
),

Contributor

P2 run_mode changed from IngestRunModeValue to str, so Typer no longer renders valid choices in --help output (was [inprocess|batch], now just TEXT) and no longer performs automatic validation before the function body runs. Adding click.Choice preserves the Typer/Click help-text and validation ergonomics without sacrificing the new service value.

Suggested change (nemo_retriever/src/nemo_retriever/adapters/cli/main.py, lines 200-207):

```suggestion
    run_mode: str = typer.Option(
        "inprocess",
        "--run-mode",
        click_type=click.Choice(["inprocess", "batch", "service"]),
        help=(
            "Execution mode for ingest: inprocess (default), batch for Ray Data scale-out, "
            "or service for a remote retriever service."
        ),
    ),
```

Comment on lines +259 to +260
def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
    result = build_service_ingestor(request).ingest()

Contributor

P2 Public functions execute_service_ingest_request, expand_service_file_patterns, and service_split_config_for_request lack docstrings. Per the docstrings-public-interface rule, all public functions must describe their behaviour, parameters, returns, and any exceptions raised.

Suggested change (nemo_retriever/src/nemo_retriever/ingest/service.py, lines 259-260):

```suggestion
def execute_service_ingest_request(request: ServiceIngestRequest) -> ServiceIngestExecutionResult:
    """Execute a resolved service ingest request and return a structured result.

    Args:
        request: A fully-resolved ``ServiceIngestRequest`` produced by
            ``resolve_service_ingest_request`` or constructed directly.

    Returns:
        A ``ServiceIngestExecutionResult`` containing the raw ingest result,
        the row count (when the result exposes a ``dataframe`` attribute), and
        connection metadata.

    Raises:
        ValueError: If no files matched the input patterns in *request*.
    """
    result = build_service_ingestor(request).ingest()
```


@jioffe502 jioffe502 force-pushed the codex/root-ingest-service-mode branch from 05bc509 to e574193 June 9, 2026 17:39
@jioffe502

Collaborator Author

Greptile triage/update after e574193:

  • The reported legacy pipeline empty-file P1 is not valid as stated. The service branch calls _resolve_file_patterns before build_service_ingestor; that helper checks matched globs and raises typer.BadParameter. I also reproduced an empty-directory service-mode run locally and got a clean Typer Invalid value error, not a traceback.
  • Addressed the root ingest --run-mode help issue by using the existing IngestorRunMode Literal, which renders [inprocess|batch|service] and keeps invalid values as clean Typer errors. I did not use click_type because that produced FUNCTION help and a traceback on this Typer stack.
  • Added public service docstrings and documented why service-mode n_rows aliases result_n_rows: service ingest cannot locally verify the remote VDB row count, so both represent rows counted from the service result.
  • I did not add a broad test_ingest_service.py mirror. Existing pipeline service coverage was CLI-bound plus a construction helper test, and this PR mirrors that with root ingest service CLI coverage plus the shared build_service_ingestor helper. The live jp20 service ingest/recall smoke remains the stronger service-mode signal.
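
The Literal-driven approach mentioned for --run-mode can be approximated with the standard library alone. The sketch below is an argparse analogue, not the Typer code in the PR. The `IngestorRunMode` name comes from this thread and its members follow the quoted help rendering, while the parser, its `prog` name, and the help string are assumptions.

```python
import argparse
from typing import Literal, get_args

# Name taken from the thread; the member list follows the help rendering quoted above.
IngestorRunMode = Literal["inprocess", "batch", "service"]

# Hypothetical parser standing in for the Typer command.
parser = argparse.ArgumentParser(prog="retriever-ingest-sketch")
parser.add_argument(
    "--run-mode",
    choices=get_args(IngestorRunMode),  # single source of truth for valid values
    default="inprocess",
    help="Execution mode for ingest.",
)

args = parser.parse_args(["--run-mode", "service"])
```

Deriving the choices from the Literal keeps the type annotation, the help output, and the validation in step: an unknown value exits with a usage error before any ingest code runs.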
