Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 127 additions & 0 deletions docs/docs/extraction/nemo-retriever-api-reference.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,132 @@
# NeMo Retriever API Reference

## Service-mode async ingest jobs

Use `create_ingestor(run_mode="service")` when you want the Python client to submit
documents to a running NeMo Retriever service instead of executing the pipeline in
the local process or Ray cluster. Service mode exposes three ingest surfaces:

- `ingest()` blocks until all submitted documents finish and returns a
`ServiceIngestResult` with per-document completion events, `job_id`,
`document_ids`, `failures`, `job_status`, `elapsed_s`, and, by default, a
combined `dataframe` of completed results.
- `ingest_stream()` is a synchronous generator for callers that want progress
events without managing an event loop.
- `aingest_stream()` is an asynchronous generator for applications that already
run inside an event loop.

### Synchronous streaming example

```python
from nemo_retriever import create_ingestor


ingestor = (
create_ingestor(
run_mode="service",
base_url="http://localhost:7670",
documents=["docs/alpha.pdf", "docs/beta.pdf"],
)
.extract()
.embed()
)

for event in ingestor.ingest_stream():
match event["event"]:
case "job_created":
print(
f"created job {event['job_id']} "
f"for {event['expected_documents']} documents"
)
case "upload_complete":
print(f"uploaded {event['filename']} as {event['document_id']}")
case "upload_failed":
print(f"could not upload {event['filename']}: {event['error']}")
case "document_complete":
if event["status"] == "completed":
print(f"{event['document_id']} completed with {event['result_rows']} rows")
else:
print(f"{event['document_id']} failed: {event.get('error')}")
case "job_progress":
print(f"{event['completed']} completed, {event['failed']} failed")
case "job_finalized" | "job_partial" | "job_failed":
print(f"job {event['job_id']} finished with status {event['status']}")
```

Use this form from scripts, notebooks, CLI commands, or worker processes that are
otherwise synchronous but still need live job and document progress.

### Async streaming example

```python
import asyncio

from nemo_retriever import create_ingestor


async def main() -> None:
ingestor = (
create_ingestor(
run_mode="service",
base_url="http://localhost:7670",
documents=["docs/alpha.pdf", "docs/beta.pdf"],
)
.extract()
.embed()
)

async for event in ingestor.aingest_stream():
if event["event"] == "job_created":
print(f"created job {event['job_id']}")
elif event["event"] == "upload_complete":
print(f"uploaded {event['filename']}")
elif event["event"] == "upload_failed":
print(f"could not upload {event['filename']}: {event['error']}")
elif event["event"] == "document_complete":
print(f"{event['document_id']}: {event['status']}")
elif event["event"] in {"job_finalized", "job_partial", "job_failed"}:
print(f"job {event['job_id']}: {event['status']}")


asyncio.run(main())
```

Use this form from async web services, task runners, or notebooks that need to
keep other async work moving while ingestion is in flight. The trailing
`asyncio.run(main())` drives the coroutine from a standalone script. Inside an
environment that already runs an event loop — a Jupyter notebook, a FastAPI
handler, or another async task — `await main()` (or inline the `async for` loop)
instead, because `asyncio.run()` cannot be nested inside a running loop.

### Event shapes

The streaming APIs yield dictionaries. Check the `event` key first, then read the
fields that apply to that event type. The examples above handle the events most
callers act on; any event you do not explicitly handle (such as `job_started`)
can be safely ignored.

| Event | Meaning | Key fields |
| --- | --- | --- |
| `job_created` | The service created one aggregate job for the submitted document set. | `job_id`, `expected_documents` |
| `upload_complete` | One local file uploaded and was assigned a service document ID. | `job_id`, `filename`, `document_id` |
| `document_complete` | One document reached a terminal document state. | `job_id`, `document_id`, `status`, `result_rows`, `elapsed_s`, `error` |
| `upload_failed` | One local file could not be uploaded. | `job_id`, `filename`, `error` |
| `job_started` | At least one document in the job started processing. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `started_at` |
| `job_progress` | The job reached a progress reporting milestone. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s` |
| `job_finalized` | All documents completed successfully. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s`, `finalized_at` |
| `job_partial` | Some documents completed and some failed. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s`, `finalized_at` |
| `job_failed` | Every document in the job failed. | `job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `elapsed_s`, `finalized_at` |

Comment on lines +101 to +119

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 job_started event documented in the table but absent from both code examples

The event shapes table lists job_started with its full set of key fields (job_id, status, expected_documents, counts, completed, failed, remaining, progress_pct, started_at), but neither the synchronous nor the asynchronous code example handles it. Readers who copy the examples verbatim will silently drop job_started events. A catch-all (case _: ... / else: ...) branch or a brief note clarifying that job_started does not require a handler in typical use would remove the ambiguity.

Prompt To Fix With AI
This is a comment left during a code review.
Path: docs/docs/extraction/nemo-retriever-api-reference.md
Line: 97-113

Comment:
**`job_started` event documented in the table but absent from both code examples**

The event shapes table lists `job_started` with its full set of key fields (`job_id`, `status`, `expected_documents`, `counts`, `completed`, `failed`, `remaining`, `progress_pct`, `started_at`), but neither the synchronous nor the asynchronous code example handles it. Readers who copy the examples verbatim will silently drop `job_started` events. A catch-all (`case _: ...` / `else: ...`) branch or a brief note clarifying that `job_started` does not require a handler in typical use would remove the ambiguity.

How can I resolve this? If you propose a fix, please make it concise.

Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!

`document_complete` uses `status="completed"` or `status="failed"`. Job terminal
events use aggregate statuses: `job_finalized` reports `status="completed"`,
`job_partial` reports `status="partial_success"`, and `job_failed` reports
`status="failed"`.

Use `ingest(return_results=False)` when you only need the final job metadata and
document IDs. The default `ingest()` behavior fetches result rows for each
completed document so it can populate `result.dataframe`; streaming callers can
avoid that materialization and handle each event as it arrives.

## PDF pre-splitting for parallel ingest

Large PDFs are split into page batches before Ray processing so extraction can run in parallel. This happens on the default ingest path; you do not need extra configuration for typical workloads.
Expand Down