Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions flows/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,59 @@ This example command assumes the [genomehubs/goat-data](https://github.com/genom
SKIP_PREFECT=true python3 -m flows.parsers.parse_ncbi_assemblies -i /tmp/assembly-data/ncbi_datasets_canidae.jsonl -y /tmp/assembly-data/ncbi_datasets_eukaryota.types.yaml -a
```

### Assembly version tracking

Three flows track NCBI assembly versions that become superseded over time, writing the result to `assembly_historical.tsv`:

- `flows/parsers/parse_backfill_historical_versions.py` — **one-time** backfill. Given a JSONL of current assemblies, finds every assembly with version > 1, discovers its earlier versions over NCBI FTP, and parses each one into `assembly_historical.tsv`. Run once before starting the daily incremental flow below.
- `flows/parsers/parse_assembly_versions.py` — **daily incremental**. Given today's assembly JSONL and yesterday's parsed `assembly_current.tsv` (kept alongside it as `assembly_current.tsv.previous`), detects any assembly that became superseded since yesterday and appends it to `assembly_historical.tsv`. No NCBI fetches are required — the row is copied from the previous parse. If a predecessor version isn't found in `assembly_current.tsv.previous` (e.g. it was never present in the historical TSV in the first place), the assembly is written to `missing_versions.json` instead of failing.
- `flows/updaters/update_assembly_versions.py` — updater that consumes `missing_versions.json`, fetches each missing accession's raw NCBI metadata, and writes it to `missing_assembly_versions.jsonl`. Feed that file back into `parse_backfill_historical_versions.py` as its `--input_path` to parse the missing version(s) into `assembly_historical.tsv`.

Only `parse_backfill_historical_versions.py` takes a `--yaml_path`; its output path is resolved relative to the *directory the YAML file lives in*, not `--work_dir`, so place a copy of the YAML in `--work_dir` if you want the TSV written there. `parse_assembly_versions.py` and `update_assembly_versions.py` take no YAML — all paths are derived from `--input_path`/`--work_dir`.

#### Example: minimal end-to-end run

This walkthrough uses the existing test fixture `tests/test_data/assembly_test_sample.jsonl`, which contains three real assemblies, two of them (`GCA_000222935.2` and `GCA_000412225.2`) at version 2 — no extra data needs to be fetched or fabricated to exercise both code paths below.

```
mkdir -p /tmp/assembly-versions
cp configs/assembly_historical.types.yaml /tmp/assembly-versions/
cp tests/test_data/assembly_test_sample.jsonl /tmp/assembly-versions/assembly_data_report.jsonl

# 1. One-time backfill: discovers and parses each assembly's earlier version(s) via NCBI FTP
SKIP_PREFECT=true python3 -m flows.parsers.parse_backfill_historical_versions \
--input_path /tmp/assembly-versions/assembly_data_report.jsonl \
--yaml_path /tmp/assembly-versions/assembly_historical.types.yaml \
--work_dir /tmp/assembly-versions
# -> writes /tmp/assembly-versions/assembly_historical.tsv

# 2. Simulate "yesterday's" parse: a current.tsv that only knows about GCA_000222935.1
printf "accession\tassembly_id\nGCA_000222935.1\tGCA_000222935_1\n" \
> /tmp/assembly-versions/assembly_current.tsv.previous

# 3. Daily incremental: compares today's JSONL against yesterday's TSV
SKIP_PREFECT=true python3 -m flows.parsers.parse_assembly_versions \
--input_path /tmp/assembly-versions/assembly_data_report.jsonl
# -> GCA_000222935.2 supersedes the .1 row above: appended to assembly_historical.tsv
# -> GCA_000412225.2 and GCA_003706615.3 have no earlier version in
# assembly_current.tsv.previous: both written to
# /tmp/assembly-versions/missing_versions.json instead

# 4. Updater: fetch raw metadata for the accessions reported as missing
SKIP_PREFECT=true python3 -m flows.updaters.update_assembly_versions \
--missing_json /tmp/assembly-versions/missing_versions.json \
--work_dir /tmp/assembly-versions
# -> writes /tmp/assembly-versions/missing_assembly_versions.jsonl

# 5. Feed the updater's output back into step 1 to backfill the missing version
SKIP_PREFECT=true python3 -m flows.parsers.parse_backfill_historical_versions \
--input_path /tmp/assembly-versions/missing_assembly_versions.jsonl \
--yaml_path /tmp/assembly-versions/assembly_historical.types.yaml \
--work_dir /tmp/assembly-versions
```

Steps 1, 4 and 5 fetch from NCBI and require the [`datasets`](https://www.ncbi.nlm.nih.gov/datasets/docs/v2/download-and-install/) CLI on `PATH` plus network access (FTP for version discovery, `datasets summary` for metadata); without it they report a clean per-accession fetch failure rather than crashing. Step 3 (`parse_assembly_versions.py`) makes no network calls at all — it only reads the previous TSV and the new JSONL.

### Validate

The flow at `flows/lib/validate_file_pair.py` runs [blobtk validate](https://github.com/genomehubs/blobtk/wiki/blobtk-validate) to validate the entries in a TSV file against the YAML configuration and, optionally, an NCBI taxdump format taxonomy.
Expand Down
228 changes: 228 additions & 0 deletions flows/lib/assembly_versions_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
"""Shared utilities for assembly version discovery and fetching.

Used by both the backfill parser and the incremental updater to discover
assembly versions via NCBI FTP and fetch per-version metadata.
"""

import json
import os
import re
import time

from flows.lib import utils

ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$")


def parse_version(accession: str) -> int:
"""Extract the version number from a dotted accession string.

Args:
accession (str): e.g. GCA_000002035.3

Returns:
int: Version number (defaults to 1 if no dot-suffix).
"""
parts = accession.split(".")
return int(parts[1]) if len(parts) > 1 else 1


def parse_accession(accession: str) -> tuple[str, int]:
"""Split an accession into its base and version components.

Args:
accession (str): e.g. GCA_000002035.3

Returns:
tuple: (base_accession, version_number).
"""
parts = accession.split(".")
return parts[0], int(parts[1]) if len(parts) > 1 else 1


def setup_cache_directories(work_dir: str) -> None:
"""Create cache directory structure under work_dir.

Args:
work_dir (str): Path to the working directory.
"""
for subdir in ("version_discovery", "metadata"):
os.makedirs(
os.path.join(work_dir, "backfill_cache", subdir), exist_ok=True
)


def get_cache_path(work_dir: str, cache_type: str, identifier: str) -> str:
"""Generate a human-readable cache file path.

Args:
work_dir (str): Path to the working directory.
cache_type (str): Cache category (version_discovery or metadata).
identifier (str): Accession string used as the filename stem.

Returns:
str: Path to the JSON cache file.
"""
safe_id = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier)
return os.path.join(work_dir, "backfill_cache", cache_type, f"{safe_id}.json")


def load_from_cache(cache_path: str, max_age_days: int = 30) -> dict:
"""Load data from cache if it exists and is recent enough.

Args:
cache_path (str): Path to the cache JSON file.
max_age_days (int): Maximum acceptable age in days.

Returns:
dict: Cached data, or empty dict on miss/expiry.
"""
try:
if os.path.exists(cache_path):
cache_age = time.time() - os.path.getmtime(cache_path)
if cache_age < (max_age_days * 24 * 3600):
with open(cache_path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f" Warning: Could not load cache from {cache_path}: {e}")
return {}


def save_to_cache(cache_path: str, data: dict) -> None:
"""Save data to a cache file, creating parent dirs as needed.

Args:
cache_path (str): Path to the cache JSON file.
data (dict): Data to persist.
"""
try:
os.makedirs(os.path.dirname(cache_path), exist_ok=True)
with open(cache_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
except Exception as e:
print(f" Warning: Could not save cache to {cache_path}: {e}")


def discover_version_accessions(base_accession: str, work_dir: str) -> list[str]:
"""Discover all versioned accessions for a base assembly via NCBI FTP.

Args:
base_accession (str): Full accession (e.g. GCA_000002035.3).
work_dir (str): Working directory for cache storage.

Returns:
list: Sorted list of versioned accession strings.
"""
import requests

base_match = re.match(r"(GC[AF]_\d+)", base_accession)
if not base_match:
return []

base = base_match.group(1)
setup_cache_directories(work_dir)
cache_path = get_cache_path(work_dir, "version_discovery", base)
cached = load_from_cache(cache_path, max_age_days=7)

if cached and "accessions" in cached:
print(f" Using cached version list for {base}")
return cached["accessions"]

print(f" Discovering versions for {base} via FTP")
ftp_url = (
f"https://ftp.ncbi.nlm.nih.gov/genomes/all/"
f"{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/"
)

try:
response = requests.get(ftp_url, timeout=30)
if response.status_code != 200:
print(f" Warning: FTP query failed for {base}")
return []
except Exception as e:
print(f" Error querying FTP for {base}: {e}")
return []

version_pattern = rf"{re.escape(base)}\.\d+"
accessions = sorted(set(re.findall(version_pattern, response.text)))

save_to_cache(cache_path, {
"accessions": accessions,
"base_accession": base,
"ftp_url": ftp_url,
})
return accessions


def fetch_version_metadata(version_acc: str, work_dir: str) -> dict:
"""Fetch NCBI datasets metadata for a single assembly version.

Uses utils.run_quoted to safely invoke the datasets CLI. Results are
cached for 30 days.

Args:
version_acc (str): Versioned accession (e.g. GCA_000002035.1).
work_dir (str): Working directory for cache storage.

Returns:
dict: Metadata dict, or empty dict on failure.
"""
cache_path = get_cache_path(work_dir, "metadata", version_acc)
cached = load_from_cache(cache_path, max_age_days=30)

if cached and "metadata" in cached:
return cached["metadata"]

if not ACCESSION_PATTERN.match(version_acc):
print(f" Skipping unexpected accession format: {version_acc}")
return {}

cmd = [
"datasets", "summary", "genome", "accession",
version_acc, "--as-json-lines",
]
try:
result = utils.run_quoted(
cmd,
capture_output=True,
text=True,
encoding="utf-8",
errors="ignore",
timeout=60,
)
if result.returncode == 0 and result.stdout and result.stdout.strip():
version_data = json.loads(result.stdout.strip())
save_to_cache(cache_path, {
"metadata": version_data,
"cached_at": time.time(),
})
return version_data

print(f" Warning: No metadata for {version_acc}")
except Exception as e:
print(f" Warning: Error fetching {version_acc}: {e}")

return {}


def find_all_assembly_versions(base_accession: str, work_dir: str) -> list[dict]:
"""Discover all versions and fetch metadata for each.

Delegates to discover_version_accessions for FTP discovery and
fetch_version_metadata for per-version metadata retrieval. Both layers
use independent caches.

Args:
base_accession (str): Full accession (e.g. GCA_000002035.3).
work_dir (str): Working directory for cache storage.

Returns:
list: List of metadata dicts, one per version found.
"""
accessions = discover_version_accessions(base_accession, work_dir)
versions = []
for version_acc in accessions:
metadata = fetch_version_metadata(version_acc, work_dir)
if metadata:
versions.append(metadata)
return versions
8 changes: 8 additions & 0 deletions flows/lib/shared_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,14 @@
},
}

MISSING_JSON = {
"flags": ["-m", "--missing_json"],
"keys": {
"help": "Path to missing_versions.json produced by parse_assembly_versions.",
"type": str,
},
}

WORK_DIR = {
"flags": ["-w", "--work_dir"],
"keys": {
Expand Down
Loading
Loading