diff --git a/src/data_index/defaults/local.py b/src/data_index/defaults/local.py index ba59e42..d1f5fc0 100644 --- a/src/data_index/defaults/local.py +++ b/src/data_index/defaults/local.py @@ -24,10 +24,16 @@ # --- General config --- REGION = "ap-southeast-2" -BATCH_SIZE = 1_000 OUT_DIR = pathlib.Path(".load/orchestrate-fargate") THRESHOLD_BYTES = 10 * 1024**2 # 10 MB +# --- Local config --- +BATCH_SIZE = 1_000 +MAX_WORKERS = 8 # concurrent batches (limits RAM/CPU pressure) +S5CMD_WORKERS = 8 # s5cmd defaults to 256 — cap it for local runs +TRANSFORM_WORKERS = ( + 4 # transform threads per batch (total = MAX_WORKERS × TRANSFORM_WORKERS) +) # --- Live Inventory Source config _s3_metadata_catalog_config = S3TablesCatalogConfig( @@ -59,7 +65,7 @@ ) # --- File fetcher --- -_file_fetcher = S5CMDFetcher(anon=True) +_file_fetcher = S5CMDFetcher(num_workers=S5CMD_WORKERS, anon=True) # --- Metadata extractor --- _unstructured_netcdf_extractor = UnstructuedNetCDFExtractor() @@ -106,10 +112,12 @@ def run_index_local( metadata_factory: InMemoryUnstructuredMetadata | DiskCachedUnstructuredMetadata | None = None, - transform_max_workers: int | None = None, + transform_max_workers: int | None = TRANSFORM_WORKERS, ): - data_index.orchestrate( + data_index.orchestrate.with_options( + task_runner=prefect.task_runners.ThreadPoolTaskRunner(max_workers=MAX_WORKERS) + )( inventory_source, partitioner, fetcher, @@ -122,4 +130,4 @@ def run_index_local( if __name__ == "__main__": - run_index_local(transform_max_workers=32) + run_index_local()