feat(persistence,rest,hfs): FHIR Bulk Data Export ($export) - async kick-off, postgres-s3 multi-instance, Inferno v2.0.0 #108
Merged
Free function exposed at crate root that dispatches per FhirVersion to
the existing helios_fhir::{r4,r4b,r5,r6}::get_compartment_params helpers.
Lets persistence reuse the lookup without depending on helios-rest.
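A minimal sketch of what such a per-version dispatch could look like. The `FhirVersion` variants, the function signature, and the lookup data are hypothetical stand-ins for illustration; the real helpers live in `helios_fhir::{r4,r4b,r5,r6}` and may differ.

```rust
/// Hypothetical stand-in for the crate's FhirVersion enum.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum FhirVersion {
    R4,
    R4B,
    R5,
    R6,
}

// Hypothetical per-version helper with illustrative data only.
mod r4 {
    pub fn get_compartment_params(
        compartment: &str,
        resource_type: &str,
    ) -> &'static [&'static str] {
        match (compartment, resource_type) {
            ("Patient", "Observation") => &["subject", "performer"],
            _ => &[],
        }
    }
}

// For brevity the other versions reuse the R4 table here; in a real crate
// each module would carry its own generated data.
mod r4b {
    pub use super::r4::get_compartment_params;
}
mod r5 {
    pub use super::r4::get_compartment_params;
}
mod r6 {
    pub use super::r4::get_compartment_params;
}

/// Free function at the crate root: one match, no dependency on the REST layer.
pub fn get_compartment_params(
    version: FhirVersion,
    compartment: &str,
    resource_type: &str,
) -> &'static [&'static str] {
    match version {
        FhirVersion::R4 => r4::get_compartment_params(compartment, resource_type),
        FhirVersion::R4B => r4b::get_compartment_params(compartment, resource_type),
        FhirVersion::R5 => r5::get_compartment_params(compartment, resource_type),
        FhirVersion::R6 => r6::get_compartment_params(compartment, resource_type),
    }
}
```

Callers in either the REST or persistence layer would then pass the version they already hold instead of wrapping the lookup themselves.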
…t handler
Drops the private get_compartment_params_for_version wrapper in favor of the new shared dispatch on the helios-fhir crate.
Returned by fenced ExportWorkerStorage methods when a stale worker's mutation is rejected because the job has been reclaimed.
- ExportRequest gains until / elements / include_associated_data / patient_refs
- ExportManifest gains deleted / link (IG-required)
- New StartExportInput bundles kickoff metadata (transaction_time, request_url, owner_subject, fhir_version)
- New RawExportManifest / RawManifestEntry: storage-side manifest carrying ExportPartKey rather than wire URLs
- New ExportJobMetadata, ExportFileMetadata, ExpiredExportRef
- New GroupExportProvider::get_group_members_with_periods (default impl derived from get_group_members) so backends can surface Group.member.period.start for the _since-newly-added filter
- BulkExportStorage gains start_export(StartExportInput) signature, RawExportManifest return, get_export_job_metadata, get_export_file_metadata, count_active_exports, list_expired_exports
ExportPartKey (with embedded fencing_token), ExportPartWriter (line + byte counter over a boxed AsyncWrite), FinalizedPart, DownloadUrl, and the ExportOutputStore trait. Decouples 'where the bytes go' from the job-state backend.
…rker
- WorkerId, ExportJobLease (with fencing_token), LeaseError
- ExportClaimStrategy: claim_next + heartbeat + release
- ExportWorkerStorage: every method fenced by (worker_id, fencing_token) so a stale worker cannot mutate progress, file rows, or terminal status after its lease has been reclaimed
- BulkExportJobStore marker trait (BulkExportStorage + ExportWorkerStorage + ExportClaimStrategy) for bootstrap-time selection of the job store
- DefaultExportWorker drives a claimed job to completion under its lease, applying _typeFilter / _since / _until / _elements, supporting resume from the persisted cursor, and honoring since_newly_added=exclude via Group.member.period.start
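The fencing rule can be sketched with an in-memory store. Everything here (`InMemoryJobStore`, `Lease`, `record_progress`, and the fact that `claim` ignores lease expiry) is a hypothetical simplification, not the PR's trait surface; it only illustrates why a stale worker's write is rejected once the token has been bumped.

```rust
use std::collections::HashMap;

#[derive(Debug, PartialEq, Eq)]
pub enum LeaseError {
    LeaseLost,
    NoSuchJob,
}

/// What a worker holds after a successful claim (hypothetical shape).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Lease {
    pub job_id: String,
    pub worker_id: String,
    pub fencing_token: u64,
}

#[derive(Default)]
struct JobRow {
    worker_id: Option<String>,
    fencing_token: u64,
    progress: u64,
}

#[derive(Default)]
pub struct InMemoryJobStore {
    jobs: HashMap<String, JobRow>,
}

impl InMemoryJobStore {
    pub fn insert_job(&mut self, job_id: &str) {
        self.jobs.insert(job_id.to_string(), JobRow::default());
    }

    /// Claim or reclaim a job. Each claim bumps the fencing token, which is
    /// what invalidates any lease handed out earlier. Expiry checks are
    /// omitted in this sketch.
    pub fn claim(&mut self, job_id: &str, worker_id: &str) -> Result<Lease, LeaseError> {
        let row = self.jobs.get_mut(job_id).ok_or(LeaseError::NoSuchJob)?;
        row.fencing_token += 1;
        row.worker_id = Some(worker_id.to_string());
        Ok(Lease {
            job_id: job_id.to_string(),
            worker_id: worker_id.to_string(),
            fencing_token: row.fencing_token,
        })
    }

    /// A fenced mutation: applied only if (worker_id, fencing_token) still
    /// match the row, mirroring `UPDATE ... WHERE worker_id=? AND fencing_token=?`.
    pub fn record_progress(&mut self, lease: &Lease, progress: u64) -> Result<(), LeaseError> {
        let row = self.jobs.get_mut(&lease.job_id).ok_or(LeaseError::NoSuchJob)?;
        let still_owner = row.worker_id.as_deref() == Some(lease.worker_id.as_str())
            && row.fencing_token == lease.fencing_token;
        if !still_owner {
            return Err(LeaseError::LeaseLost);
        }
        row.progress = progress;
        Ok(())
    }

    pub fn progress(&self, job_id: &str) -> Option<u64> {
        self.jobs.get(job_id).map(|row| row.progress)
    }
}
```

If worker A claims a job, worker B later reclaims it, and A then calls `record_progress` with its old lease, the call returns `LeaseError::LeaseLost` and the row keeps B's state.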
…umns
bulk_export_jobs: worker_id, lease_expiry, fencing_token, heartbeat_at, owner_subject, request_url, fhir_version + idx_export_jobs_claim.
bulk_export_files: part_index, fencing_token + a backfill that assigns 0-based sequential part_index per (job_id, file_type, resource_type) before creating the unique idx_export_files_part.
Includes a test exercising the duplicate-row backfill case.
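The backfill rule can be illustrated in memory. This is a sketch of the numbering logic only, not the migration itself; the `FileRow` shape is hypothetical, and ordering rows by `id` within each group is an assumption (the commit message does not say which order the migration uses).

```rust
use std::collections::HashMap;

/// Hypothetical projection of a bulk_export_files row.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FileRow {
    pub id: u64,
    pub job_id: String,
    pub file_type: String,
    pub resource_type: String,
    pub part_index: Option<u32>,
}

/// Assign 0-based sequential part_index values per
/// (job_id, file_type, resource_type), so that a unique index over
/// those columns plus part_index can be created afterwards.
pub fn backfill_part_index(rows: &mut [FileRow]) {
    // Assumption: rows are numbered in ascending id order within each group.
    rows.sort_by_key(|row| row.id);
    let mut next_index: HashMap<(String, String, String), u32> = HashMap::new();
    for row in rows.iter_mut() {
        let key = (
            row.job_id.clone(),
            row.file_type.clone(),
            row.resource_type.clone(),
        );
        let counter = next_index.entry(key).or_insert(0);
        row.part_index = Some(*counter);
        *counter += 1;
    }
}
```

Rows that were duplicates under the old schema (same job, file type, and resource type) end up with distinct indexes 0, 1, 2, and so on, which is the case the migration test covers.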
- start_export(StartExportInput): persists frozen kickoff metadata
- get_export_manifest -> RawExportManifest assembled from rows
- get_export_job_metadata / get_export_file_metadata
- count_active_exports / list_expired_exports
- ExportClaimStrategy via process-local mutex + INSERT/UPDATE
- ExportWorkerStorage: every mutation fenced by worker_id + fencing_token (UPDATE … WHERE worker_id=? AND fencing_token=? for terminals, WHERE EXISTS-guarded ON CONFLICT upserts for progress + file rows)
- get_group_members_with_periods reads Group.member.period.start
- resolve_group_patient_ids flattens nested Groups with a cycle guard
- Tests: stale-worker fencing, claim/lifecycle, group-cycle, since_newly_added
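Nested-Group flattening with a cycle guard can be sketched as follows. The function name is borrowed from the commit message, but the signature and the in-memory `groups` map (group id to member references) are hypothetical; the real implementation reads Group resources from storage.

```rust
use std::collections::{BTreeSet, HashMap, HashSet};

/// Flatten a Group into the set of Patient ids it contains, following
/// nested `Group/<id>` members. `groups` maps a group id to its member
/// references (for example "Patient/1" or "Group/b").
pub fn resolve_group_patient_ids(
    groups: &HashMap<String, Vec<String>>,
    root: &str,
) -> BTreeSet<String> {
    let mut patients = BTreeSet::new();
    let mut visited: HashSet<String> = HashSet::new();
    let mut stack = vec![root.to_string()];

    while let Some(group_id) = stack.pop() {
        // Cycle guard: a group that was already expanded is skipped, so
        // A -> B -> A terminates instead of looping forever.
        if !visited.insert(group_id.clone()) {
            continue;
        }
        let Some(members) = groups.get(&group_id) else {
            continue; // unknown group: nothing to expand
        };
        for reference in members {
            if let Some(id) = reference.strip_prefix("Patient/") {
                patients.insert(id.to_string());
            } else if let Some(id) = reference.strip_prefix("Group/") {
                stack.push(id.to_string());
            }
        }
    }
    patients
}
```

With group `a` containing `Patient/1` and `Group/b`, and group `b` containing `Patient/2` and `Group/a`, the result is the two patient ids and the traversal stops after visiting each group once.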
ALTER TABLE bulk_export_jobs ADD COLUMN IF NOT EXISTS … for the lease fields, owner_subject, request_url, fhir_version. ALTER bulk_export_files for part_index + fencing_token; ROW_NUMBER() backfill before the unique idx_export_files_part.
PostgresSkipLocked claim strategy (FOR UPDATE SKIP LOCKED inside a transaction), fully-fenced ExportWorkerStorage (every mutation guarded by worker_id + fencing_token), all new BulkExportStorage methods, get_group_members_with_periods + nested-Group flattening with cycle guard. Bind sites use i32 / i64 to match the actual column types on bulk_export_progress / bulk_export_files.
Default impl reports unsupported; AwsS3Client overrides it via PresigningConfig from the AWS SDK. Used by S3OutputStore to mint direct-from-S3 download URLs for the bulk-export manifest.
Reserved for future S3OutputStore integrations; unused now that S3 is output-only and keys live in S3OutputStore::object_key.
S3 is no longer a bulk-export job-state backend; the model is preserved for a future read-modify-write integration.
Reserved for future S3OutputStore integration; unused now that the synchronous BulkExportStorage path has been removed.
S3 is output-only for bulk export — job state lives in SQLite or PostgreSQL. Drops the synchronous start_export / run_export_job path and adds stub PatientExportProvider / GroupExportProvider impls returning UnsupportedCapability so an S3-resource-storage deployment satisfies the trait hierarchy.
ExportOutputStore impl backed by AwsS3Client. open_writer returns a
local scratch tempfile; finalize_part fsyncs + put_object's it to S3
under {tenant}/exports/{job_id}/{file_type}-{rt}-{part}-{token}.ndjson.
download_url either pre-signs (Auto / AlwaysPresigned) or returns an
HFS-served URL (AlwaysToken). delete_job_outputs lists + deletes by
prefix. AccessTokenMode encodes the requires_access_token posture.
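A small sketch of the key layout and the mode switch described above. `PartKey`, `object_key`, `job_prefix`, and `presigns` are hypothetical names for illustration; only the key template and the three mode names come from the commit message.

```rust
/// Hypothetical view of the fields that make up an export part key.
pub struct PartKey<'a> {
    pub tenant: &'a str,
    pub job_id: &'a str,
    pub file_type: &'a str,
    pub resource_type: &'a str,
    pub part_index: u32,
    pub fencing_token: u64,
}

/// Build `{tenant}/exports/{job_id}/{file_type}-{rt}-{part}-{token}.ndjson`.
/// Embedding the fencing token means a reclaimed job writes to new keys
/// instead of overwriting a stale worker's objects.
pub fn object_key(key: &PartKey<'_>) -> String {
    format!(
        "{}/exports/{}/{}-{}-{}-{}.ndjson",
        key.tenant,
        key.job_id,
        key.file_type,
        key.resource_type,
        key.part_index,
        key.fencing_token
    )
}

/// Prefix that a list + delete pass over one job's outputs would use.
pub fn job_prefix(tenant: &str, job_id: &str) -> String {
    format!("{}/exports/{}/", tenant, job_id)
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AccessTokenMode {
    Auto,
    AlwaysPresigned,
    AlwaysToken,
}

impl AccessTokenMode {
    /// True when download_url should mint a pre-signed URL rather than
    /// an HFS-served one.
    pub fn presigns(self) -> bool {
        matches!(self, Self::Auto | Self::AlwaysPresigned)
    }
}
```

Because every object for a job shares the `job_prefix`, deleting by prefix removes all parts regardless of which worker or fencing token produced them.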
bulk_export_start_manifest_and_delete is gone (the impl was removed); bulk_export_invalid_format_and_fetch_batch_cursor is reduced to the fetch_export_batch cursor case which still exercises ExportDataProvider.
postgres_integration_export_claim_skip_locked: claim ordering, fencing token bumps.
postgres_integration_export_stale_worker_fenced_out: LeaseLost on every fenced ExportWorkerStorage call after reclaim.
postgres_integration_export_count_active_and_expire: count + list filtering.
claim_specific helper drains foreign jobs so tests can cope with the shared SHARED_PG container.
…add S3OutputStore round-trip
The lifecycle test now exercises the remaining ExportDataProvider surface. Adds test_minio_s3_output_store_round_trip: write → finalize → pre-signed GET → open_reader → idempotent delete against MinIO.
…ort_batch
S3 is no longer a bulk-export job-state backend; verify the ExportDataProvider data feed instead.
ExportOutputStore impl backed by tokio::fs. open_writer creates a
.tmp under ${HFS_DATA_DIR}/exports/{tenant}/{job_id}/, finalize_part
fsyncs + atomic rename, download_url returns an HFS-served URL with
requires_access_token=true, open_reader serves the file, and
delete_job_outputs is idempotent. Includes a write→finalize→read→delete
round-trip test.
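The write-to-temp, fsync, rename sequence can be sketched with the standard library. Note the swap: this uses blocking `std::fs` rather than `tokio::fs`, and the function names and signatures are hypothetical, so it shows the pattern rather than the store's actual code.

```rust
use std::fs::{self, File};
use std::io::{self, Write};
use std::path::{Path, PathBuf};

/// Write NDJSON lines to `<file_name>.tmp`, fsync, then rename into place.
/// Readers never observe a partially written part because the final name
/// only appears once the rename succeeds.
pub fn write_part_atomically(
    dir: &Path,
    file_name: &str,
    lines: &[&str],
) -> io::Result<PathBuf> {
    fs::create_dir_all(dir)?;
    let tmp_path = dir.join(format!("{file_name}.tmp"));
    let final_path = dir.join(file_name);

    let mut tmp = File::create(&tmp_path)?;
    for line in lines {
        tmp.write_all(line.as_bytes())?;
        tmp.write_all(b"\n")?;
    }
    tmp.sync_all()?; // flush file contents and metadata to disk
    drop(tmp);

    fs::rename(&tmp_path, &final_path)?;
    Ok(final_path)
}

/// Idempotent cleanup: a missing directory counts as already deleted.
pub fn delete_job_outputs(dir: &Path) -> io::Result<()> {
    match fs::remove_dir_all(dir) {
        Ok(()) => Ok(()),
        Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
        Err(err) => Err(err),
    }
}
```

The rename is atomic only when the temp file and the final file sit on the same filesystem, which is why the `.tmp` is created next to its destination.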
ExportDataProvider / PatientExportProvider / GroupExportProvider impls returning UnsupportedCapability so MongoDB can satisfy the trait hierarchy without supporting bulk export as a primary.
CompositeStorage gains an export_provider: Option<DynGroupExportProvider> field set by with_full_primary (with the new GroupExportProvider bound on T). Each trait method delegates to the primary or returns UnsupportedCapability when no primary impl is wired in.
Authorizes the HFS-served (requires_access_token=true) download path
using the helios_auth Principal — checks ownership against
job_owner_subject (or system/* wildcard) plus a system/{ResourceType}.rs
scope. Pre-signed downloads bypass HFS and never reach this trait.
bulk_export_jobs: Arc<dyn BulkExportJobStore>, bulk_export_output: Arc<dyn ExportOutputStore>, bulk_export_file_auth: Arc<dyn ExportFileAuth>, plus an Arc<BulkExportConfig>. New with_bulk_export(...) builder and accessors so handlers can reach the subsystem behind feature toggles without touching the resource-storage S type parameter.
Full configuration surface: enabled, backend (embedded|postgres-s3), output_backend (local-fs|s3), output_dir, s3_bucket, requires_access_token (auto|true|false), file_url_ttl_secs, output_ttl_secs, worker_concurrency, disable_local_worker, max_concurrent_per_tenant, batch_size, lease_duration_secs, heartbeat_interval_secs, cleanup_interval_secs, since_newly_added (include|exclude). validate() rejects local-fs + requires_access_token=false (no pre-signed URL capability).
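The one validation rule named above can be sketched like this. The struct is trimmed to the two fields involved, and the enum names and error type are hypothetical; the real `BulkExportConfig` carries the full surface listed in the commit message.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OutputBackend {
    LocalFs,
    S3,
}

/// Mirrors the auto|true|false setting.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RequiresAccessToken {
    Auto,
    True,
    False,
}

/// Hypothetical, trimmed-down config holding only the fields validate() needs here.
#[derive(Debug, Clone)]
pub struct BulkExportConfig {
    pub output_backend: OutputBackend,
    pub requires_access_token: RequiresAccessToken,
}

impl BulkExportConfig {
    /// Reject local-fs output combined with requires_access_token=false:
    /// the local filesystem store cannot mint pre-signed URLs, so files
    /// would have to be served without any authorization.
    pub fn validate(&self) -> Result<(), String> {
        if self.output_backend == OutputBackend::LocalFs
            && self.requires_access_token == RequiresAccessToken::False
        {
            return Err(
                "output_backend=local-fs cannot be combined with requires_access_token=false"
                    .to_string(),
            );
        }
        Ok(())
    }
}
```

Running this check once at startup turns a misconfiguration into a boot failure rather than a manifest that advertises unauthenticated download links.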
Upgrade astral-tokio-tar from 0.6.1 to 0.6.2 in Cargo.lock to clear RUSTSEC-2026-0145, which is pulled in through testcontainers.
# Conflicts: # .github/workflows/bulk-export-smoke.yml
smunini
requested changes
May 22, 2026
Contributor
smunini
left a comment
Looks better - see my comment here and the pending comment about validate
Restore the SQLite/PostgreSQL Elasticsearch composite rows as full lifecycle smoke coverage, while keeping MongoDB and S3 primary backends explicit as unsupported cases until their bulk-export provider behavior exists.
smunini
reviewed
May 26, 2026
{"backend":"sqlite","bulk_mode":"embedded-local","expectation":"full"},
{"backend":"sqlite","bulk_mode":"postgres-s3","expectation":"full"},
{"backend":"postgres","bulk_mode":"embedded-local","expectation":"full"},
{"backend":"postgres","bulk_mode":"postgres-s3","expectation":"full"},
Contributor
postgres-s3 is not a valid backend config
smunini
reviewed
May 26, 2026
{"backend":"postgres-elasticsearch","bulk_mode":"embedded-local","expectation":"endpoint-unavailable"},
{"backend":"postgres-elasticsearch","bulk_mode":"postgres-s3","expectation":"endpoint-unavailable"},
{"backend":"sqlite-elasticsearch","bulk_mode":"embedded-local","expectation":"full"},
{"backend":"sqlite-elasticsearch","bulk_mode":"postgres-s3","expectation":"full"},
Contributor
postgres-s3 is not a valid backend config
Use export_topology to distinguish the bulk export job/output topology from the persistence backend matrix, and keep unsupported runtime modes as endpoint-unavailable coverage. Configure AWS OIDC for postgres-s3 export rows, validate HFS_S3_EXPORT_BUCKET access, pass the real export bucket into HFS, and clean each job's isolated tenant export prefix after the smoke run.
The bulk-export subsystem previously took a separate `HFS_BULK_EXPORT_BACKEND`
(embedded|postgres-s3) plus its own `HFS_BULK_EXPORT_DATABASE_URL`, and
constructed a fresh SqliteBackend or PostgresBackend internally — even when the
FHIR side already had one configured. This change drops both env vars and has
`build_bulk_export()` accept the job-state store as a parameter, so the FHIR
backend's existing instance (and connection pool) is reused.
Composite startup paths (sqlite-elasticsearch, postgres-elasticsearch) now also
wire bulk export, passing the underlying relational primary as both data
provider and job store while the composite continues to serve the rest of the
API.
CI smoke matrix collapses from a two-dimensional `{backend, export_topology}`
into `{backend, output}` (output: local/s3/none); job state is no longer a
matrix knob because it's derived from the FHIR backend. 15 rows × 3 FHIR
versions. mongodb/s3 rows still depend on backend-side ExportDataProvider /
BulkExportJobStore work that is not in this change.
…on-relational backends
Implements the three bulk-export read-side traits for backends that previously stubbed them out:
- MongoDB: `ExportDataProvider`, `PatientExportProvider`, and `GroupExportProvider` over the resources collection. Keyset pagination on (last_updated, id) using a $or filter, matching the SQLite/Postgres cursor format. Patient compartment lookup uses the search_index collection (subject/patient → Patient/<id>) when the resource type isn't Patient itself.
- S3: completes the `PatientExportProvider` and `GroupExportProvider` stubs. Since S3 has no search index, the compartment scan iterates the resource_type prefix and inspects `subject.reference`/`patient.reference` on each resource's JSON. `get_group_members` reads `Group/<id>/current.json` and parses `member[].entity.reference`.
For job state, MongoDB and S3 lack transactional/atomic-claim semantics suitable for `BulkExportJobStore`, so the mongodb, mongodb-elasticsearch, and s3-elasticsearch startup paths now pair the primary backend with an embedded SQLite sidecar (`./data/bulk_export.db`) via a new `build_embedded_job_store` helper. The principle of "reuse the primary's config" still holds for SQLite/Postgres backends; for the others, the sidecar is HFS-owned local state and requires no extra configuration.
Pure-S3 (no Elasticsearch) remains intentionally unwired: without a search index there's no scalable enumeration, matching the workflow matrix's `endpoint-unavailable` row.
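Keyset pagination on (last_updated, id) can be sketched over an in-memory slice. The `Row` and `Cursor` types and the `fetch_batch` signature are hypothetical; the filter inside mirrors the shape of the `$or` condition, "later timestamp, or same timestamp and greater id".

```rust
/// Hypothetical projection of a stored resource: just the two sort keys.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Row {
    pub last_updated: u64,
    pub id: String,
}

/// Cursor = sort keys of the last row already returned.
pub type Cursor = (u64, String);

/// Return up to `limit` rows strictly after `after`, ordered by
/// (last_updated, id), plus the cursor for the next call.
pub fn fetch_batch(
    rows: &[Row],
    after: Option<&Cursor>,
    limit: usize,
) -> (Vec<Row>, Option<Cursor>) {
    let mut sorted: Vec<&Row> = rows.iter().collect();
    sorted.sort_by(|a, b| (a.last_updated, &a.id).cmp(&(b.last_updated, &b.id)));

    let batch: Vec<Row> = sorted
        .into_iter()
        .filter(|row| match after {
            None => true,
            // Keyset predicate: ts > cursor.ts OR (ts == cursor.ts AND id > cursor.id)
            Some((ts, id)) => {
                row.last_updated > *ts || (row.last_updated == *ts && row.id > *id)
            }
        })
        .take(limit)
        .cloned()
        .collect();

    let next = batch.last().map(|row| (row.last_updated, row.id.clone()));
    (batch, next)
}
```

Unlike offset paging, the cursor stays valid when rows share a timestamp, because the id breaks the tie; a caller stops when a batch comes back empty.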
The previous implementation looked up compartment members via the search_index collection, but mongodb-elasticsearch deployments offload search and leave search_index empty — so patient/group exports of non-Patient resource types returned nothing. Switched to querying the resources collection directly using dot notation on data.subject.reference / data.patient.reference. Works identically whether or not search is offloaded.
When using the embedded SQLite sidecar job store (mongodb/s3 backends),
default the DB path to `${HFS_BULK_EXPORT_OUTPUT_DIR}/bulk_export.db`
instead of `${HFS_DATA_DIR}/bulk_export.db`.
This prevents parallel HFS instances with different output directories
(e.g. CI smoke jobs running with max-parallel: 2) from racing on the
same SQLite file. Output dir is already unique per smoke job via
`RESULTS_DIR/export-output`.
…ured
Adds a third fallback to `build_embedded_job_store`: when neither
`HFS_BULK_EXPORT_OUTPUT_DIR` nor `HFS_DATA_DIR` is set, use
`${TMPDIR}/hfs-bulk-export-{pid}.db` so parallel HFS processes (e.g. CI
smoke jobs on the same runner with `max-parallel: 2`) don't race on a
single `./data/bulk_export.db` file.
Affects mongodb / mongodb-elasticsearch / s3-elasticsearch backends with
`output=s3` only — the other rows already get a unique output_dir.
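The resulting fallback chain can be sketched as a pure function plus a thin environment wrapper. The function names are hypothetical, and the order (output dir, then data dir, then a per-process temp file) is inferred from the two commit messages above rather than taken from the code.

```rust
use std::path::{Path, PathBuf};

/// Pick the sidecar SQLite path:
/// 1. `<output_dir>/bulk_export.db` when an output dir is configured,
/// 2. `<data_dir>/bulk_export.db` when only a data dir is configured,
/// 3. `<tmp_dir>/hfs-bulk-export-<pid>.db` otherwise.
pub fn sidecar_db_path(
    output_dir: Option<&str>,
    data_dir: Option<&str>,
    tmp_dir: &Path,
    pid: u32,
) -> PathBuf {
    if let Some(dir) = output_dir {
        return Path::new(dir).join("bulk_export.db");
    }
    if let Some(dir) = data_dir {
        return Path::new(dir).join("bulk_export.db");
    }
    // The pid suffix keeps parallel processes on one host from sharing a file.
    tmp_dir.join(format!("hfs-bulk-export-{pid}.db"))
}

/// Environment-backed wrapper; unset variables fall through to the next rule.
pub fn sidecar_db_path_from_env() -> PathBuf {
    let output_dir = std::env::var("HFS_BULK_EXPORT_OUTPUT_DIR").ok();
    let data_dir = std::env::var("HFS_DATA_DIR").ok();
    sidecar_db_path(
        output_dir.as_deref(),
        data_dir.as_deref(),
        &std::env::temp_dir(),
        std::process::id(),
    )
}
```

Keeping the decision in a pure function makes each branch testable without mutating process-wide environment variables.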
The mongodb-elasticsearch startup path was the only composite that didn't call start_sync_workers() — leftover from when its row was endpoint-unavailable in CI and full smoke coverage didn't run. With default async sync mode, writes enqueue events that never get drained without a worker, so ES never sees seeded resources. Mirrors the sqlite-elasticsearch / postgres-elasticsearch / s3-elasticsearch startup paths.
…nt config
Two fixes surfaced by full smoke coverage of the composite backends:
1. Patient/group compartment lookup for non-Patient resource types
previously JOINed the search_index table. When search is offloaded to
Elasticsearch (sqlite-elasticsearch / postgres-elasticsearch), that
table is empty, so Observations were never returned. Now both SQLite
(json_extract over the data BLOB) and Postgres (data #>> '{subject,
reference}' over JSONB) read the resource payload directly, which is
correct whether or not search is offloaded — matching the MongoDB
implementation. Added a regression test that force-offloads search.
2. The primary S3 store never read an endpoint env var, so it always
targeted real AWS even against MinIO. start_s3 / start_s3_elasticsearch
now honor HFS_S3_ENDPOINT, HFS_S3_FORCE_PATH_STYLE, and HFS_S3_ALLOW_HTTP,
mirroring the bulk-export output store. The smoke workflow sets these
for the s3 / s3-elasticsearch backends so writes hit MinIO.
s3-elasticsearch points its primary store at MinIO via AWS_* env creds, which collide with the real-AWS creds the s3 output store needs (a single process shares one AWS SDK credential chain). The S3 output path is already validated by the sqlite/postgres rows, so s3-elasticsearch only needs output=local to exercise S3-as-primary bulk export.
…imary store [skip ci]
…n crate READMEs [skip ci]
Move bulk-export env-var docs into helios-rest's README (matching the project convention of documenting config in crate READMEs) and surface $export from the root README's env table, Features list, and Core Components. Drop two stale rows (HFS_BULK_EXPORT_BACKEND, HFS_BULK_EXPORT_DATABASE_URL) from the docker compose guide; neither is read by the code, which reuses HFS_STORAGE_BACKEND/HFS_DATABASE_URL for job state.
smunini
approved these changes
May 28, 2026
FHIR Bulk Data Export
Implements the FHIR Bulk Data Access IG $export family (system / patient / group) end-to-end, per Discussion #104. Embedded single-instance (SQLite job state + local-FS output + in-process worker pool) is the zero-config default; a multi-instance topology (PostgreSQL job state + S3-compatible output with pre-signed download URLs) is selected at startup with no handler changes. Ships with an external smoke workflow that exercises both topologies on every run and an Inferno Bulk Data IG v2.0.0 conformance workflow against the full SMART Backend Services + Keycloak stack.
Why
Bulk export is the API population-health platforms, payer-provider exchanges, registries, and research/AI pipelines converge on. CRUD + search are not enough once a workload needs every Observation for every patient in a cohort — that's a data-engineering problem (long-running work, durable state, fileserver bandwidth, multi-instance fan-out) rather than a request/response one. The IG defines an asynchronous, manifest-based, NDJSON-over-HTTPS pattern; this PR ships it as a first-class HFS subsystem.
Changes
Persistence — new traits + types (helios-persistence)
Persistence — backends
REST (helios-rest)
<S>, …, BulkExportBundle) sharing the inner build_app.
Auth (helios-auth)
helios-fhir
helios-fhirpath
helios-hfs
Ops + docs
Testing
Notes
Implements Discussion #104.