Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/data_index/defaults/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@

# --- General config ---
REGION = "ap-southeast-2"
BATCH_SIZE = 1_000
OUT_DIR = pathlib.Path(".load/orchestrate-fargate")
THRESHOLD_BYTES = 10 * 1024**2 # 10 MB

# --- Local config ---
BATCH_SIZE = 1_000
MAX_WORKERS = 8 # concurrent batches (limits RAM/CPU pressure)
S5CMD_WORKERS = 8 # s5cmd defaults to 256 — cap it for local runs
TRANSFORM_WORKERS = (
4 # transform threads per batch (total = MAX_WORKERS × TRANSFORM_WORKERS)
)

# --- Live Inventory Source config
_s3_metadata_catalog_config = S3TablesCatalogConfig(
Expand Down Expand Up @@ -59,7 +65,7 @@
)

# --- File fetcher ---
_file_fetcher = S5CMDFetcher(anon=True)
_file_fetcher = S5CMDFetcher(num_workers=S5CMD_WORKERS, anon=True)

# --- Metadata extractor ---
_unstructured_netcdf_extractor = UnstructuedNetCDFExtractor()
Expand Down Expand Up @@ -106,10 +112,12 @@ def run_index_local(
metadata_factory: InMemoryUnstructuredMetadata
| DiskCachedUnstructuredMetadata
| None = None,
transform_max_workers: int | None = None,
transform_max_workers: int | None = TRANSFORM_WORKERS,
):

data_index.orchestrate(
data_index.orchestrate.with_options(
task_runner=prefect.task_runners.ThreadPoolTaskRunner(max_workers=MAX_WORKERS)
)(
inventory_source,
partitioner,
fetcher,
Expand All @@ -122,4 +130,4 @@ def run_index_local(


if __name__ == "__main__":
run_index_local(transform_max_workers=32)
run_index_local()
Loading