Skip to content

add lance writer#120

Open
Hynek Kydlíček (hynky1999) wants to merge 1 commit into
mainfrom
codex/add-lance-writer
Open

add lance writer#120
Hynek Kydlíček (hynky1999) wants to merge 1 commit into
mainfrom
codex/add-lance-writer

Conversation

@hynky1999

Copy link
Copy Markdown
Collaborator

Purpose

Add a Lance dataset writer to Refiner pipelines.

Key design decisions

  • Adds write_lance(output, mode="create") with optional pylance dependency.
  • Streams each Refiner shard into a background Lance fragment writer via a bounded queue-backed RecordBatchReader.
  • Writes Refiner sidecar metadata under _refiner_lance_fragments/<job_id>/ and uses a reducer stage to commit only finalized worker fragments.
  • Keeps Lance file sizing and storage defaults owned by Lance.

Test evidence

  • uv run pytest tests/pipeline/test_sinks.py -q -> 13 passed
  • uv run ruff check --force-exclude .
  • uv run ruff format --force-exclude --check .
  • uv run ty check
  • Full uv run pytest was run; it hit a transient Hugging Face 429 in an unrelated LeRobot hub test. The failed test was rerun after the reset window and passed.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces support for writing Lance datasets by adding a new write_lance sink. The implementation follows a distributed write model where individual workers stream rows into uncommitted Lance fragments, followed by a single-worker reducer stage that commits the finalized fragments to the dataset. The changes include documentation updates, new dependencies in pyproject.toml, and the core logic in src/refiner/pipeline/sinks/lance.py. Feedback focuses on improving the robustness of background thread error handling and optimizing the efficiency of the reducer stage by narrowing the scope of file discovery to job-specific prefixes.

Comment thread src/refiner/pipeline/sinks/lance.py Outdated
Comment thread src/refiner/pipeline/sinks/lance.py Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: c9c69a55f6

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

Comment thread src/refiner/pipeline/sinks/lance.py
@hynky1999 Hynek Kydlíček (hynky1999) force-pushed the codex/add-lance-writer branch 2 times, most recently from ba8deea to 73e7b51 Compare April 29, 2026 16:42

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: ba8deea3b4

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

for fragment in fragment_json
]
)
read_version = existing.version

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve base dataset version for append commits

In append mode this reducer reads existing.version right before LanceDataset.commit, but Lance distributed writes expect read_version to be the version captured when fragments were produced. If another writer updates or overwrites the dataset between fragment generation and reducer commit, this code can commit stale fragments against a newer table state instead of triggering the intended conflict path, which can silently mix data from incompatible write windows. Persist the writer-stage base version with shard metadata and use that fixed version at commit time.

Useful? React with 👍 / 👎.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 73e7b5142b

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

lance = _import_lance()
fragment_json: list[str] = []
schema: pa.Schema | None = None
for rel_path in sorted(metadata_paths):

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Deduplicate metadata paths before committing fragments

LanceCommitReducerSink._run_commit iterates metadata_paths directly, but _managed_paths can surface duplicate entries from DataFolder.find on some backends (the repo already accounts for this in file-cleanup tests). If the same metadata file appears twice, this loop re-reads it and appends the same fragment JSON twice, which can duplicate rows in the committed Lance dataset (or trigger commit-time fragment conflicts). Deduplicating metadata_paths before reading prevents duplicate fragment commits.

Useful? React with 👍 / 👎.

@hynky1999 Hynek Kydlíček (hynky1999) force-pushed the codex/add-lance-writer branch 3 times, most recently from 884259b to 9bb3e7d Compare April 29, 2026 17:08

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 9bb3e7d634

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

Comment thread src/refiner/pipeline/sinks/lance.py
Comment thread src/refiner/pipeline/sinks/lance.py
@hynky1999 Hynek Kydlíček (hynky1999) force-pushed the codex/add-lance-writer branch 2 times, most recently from 627e321 to e210d4c Compare April 29, 2026 17:29

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: e210d4c0e4

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

queue = self._queue()
for batch in batches:
self._raise_if_failed()
queue.put(batch)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent shard writer deadlock on failed fragment thread

_StreamingShardWriter.put_batches performs a blocking queue.put(batch) on a bounded queue, but only checks task_future before and after the call. If the Lance fragment thread fails after the pre-check and the queue is full (common with large shards that produce many batches), the producer blocks forever in put and never reaches the post-check, so the worker hangs instead of failing fast. This is triggered by write-time Lance errors (for example transient object-store failures) during active streaming.

Useful? React with 👍 / 👎.

@hynky1999 Hynek Kydlíček (hynky1999) force-pushed the codex/add-lance-writer branch 4 times, most recently from 6314462 to 7b971c2 Compare April 29, 2026 21:17

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 7b971c2458

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

if not self.closed:
self.closed = True
self._raise_if_failed()
self._queue().put(_QUEUE_CLOSED)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Prevent finish from hanging on failed writer thread

Handle the sentinel enqueue in finish() without an unbounded blocking put, because this can deadlock the worker: if the Lance writer thread fails after _raise_if_failed() but before consuming more items, and the queue is already full, self._queue().put(_QUEUE_CLOSED) blocks forever. In that state shard finalization (on_shard_complete) and worker shutdown (close) can hang instead of surfacing the write failure.

Useful? React with 👍 / 👎.

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 3c84489189

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "Codex (@codex) review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".

Comment on lines +195 to +199
fragments = lance.fragment.write_fragments(
reader,
self.dataset_uri,
schema=self.schema,
mode=self.mode,

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Propagate DataFolder credentials into Lance fragment writes

write_lance_dataset accepts DataFolderLike, but _StreamingShardWriter._run passes only self.dataset_uri into lance.fragment.write_fragments and never forwards storage options from the resolved DataFolder. This breaks runs where access is configured via (path, fs) or non-env filesystem options (custom endpoints, temporary creds, etc.): other sinks can write through the configured fsspec filesystem, while Lance writes fall back to Lance defaults/environment and fail to read/write the target URI.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant