From f33a293bca75f635dd7b93bfbeb43478e66574cd Mon Sep 17 00:00:00 2001 From: gkennos Date: Tue, 19 May 2026 22:16:40 +1000 Subject: [PATCH 1/5] improved insert logging and skipping merge if empty --- src/orm_loader/tables/loadable_table.py | 167 ++++++++++++++++++++++-- src/orm_loader/tables/typing.py | 3 + tests/loaders/test_loader_e2e.py | 157 ++++++++++++++++++++++ tests/loaders/test_pg_loader.py | 45 +++++++ 4 files changed, 361 insertions(+), 11 deletions(-) diff --git a/src/orm_loader/tables/loadable_table.py b/src/orm_loader/tables/loadable_table.py index 1ce97fe..d20e0f7 100644 --- a/src/orm_loader/tables/loadable_table.py +++ b/src/orm_loader/tables/loadable_table.py @@ -7,6 +7,7 @@ from typing import Type, ClassVar, Optional, Any, Iterator from pathlib import Path from contextlib import contextmanager +from time import perf_counter from .orm_table import ORMTableBase from .typing import CSVTableProtocol @@ -16,6 +17,11 @@ logger = logging.getLogger(__name__) +def _format_elapsed(seconds: float) -> str: + """Return a compact, human-readable duration for phase logging.""" + return f"{seconds:.2f}s" + + def _require_bind(session: so.Session) -> sa.Engine | sa.Connection: """Return a bound connectable or raise a stable runtime error.""" try: @@ -123,6 +129,7 @@ def manage_indices( """ backend = resolve_backend(session) resolved_index_strategy = backend.resolve_index_strategy(index_strategy) + table_name = cls.__tablename__ indices = list(cls.__table__.indexes) if resolved_index_strategy == "drop_rebuild" else [] inspector = sa.inspect(_require_bind(session)) @@ -133,37 +140,92 @@ def manage_indices( to_drop = [i for i in indices if i.name in existing_in_db] if to_drop: - logger.info(f"Dropping {len(to_drop)} active indices...") + logger.info(f"Table `{table_name}`: Dropping {len(to_drop)} active indices.") + drop_started = perf_counter() for idx in to_drop: session.execute(sa.schema.DropIndex(idx)) + logger.info( + f"Table `{table_name}`: Finished dropping {len(to_drop)} active indices " + f"in {_format_elapsed(perf_counter() - drop_started)}." + ) + logger.info(f"Table `{table_name}`: Committing after index drop.") + commit_started = perf_counter() session.commit() + logger.info( + f"Table `{table_name}`: Commit after index drop completed in " + f"{_format_elapsed(perf_counter() - commit_started)}." + ) + fk_restore_started: float | None = None try: + logger.info(f"Table `{table_name}`: Disabling foreign key checks before merge.") + fk_disable_started = perf_counter() with backend.merge_context(cls, session): - yield - session.commit() + logger.info( + f"Table `{table_name}`: Foreign key checks disabled in " + f"{_format_elapsed(perf_counter() - fk_disable_started)}." + ) + try: + logger.info(f"Table `{table_name}`: Starting merge operation.") + merge_started = perf_counter() + yield + logger.info( + f"Table `{table_name}`: Merge operation SQL completed in " + f"{_format_elapsed(perf_counter() - merge_started)}." + ) + logger.info(f"Table `{table_name}`: Committing merged rows.") + commit_started = perf_counter() + session.commit() + logger.info( + f"Table `{table_name}`: Merge commit completed in " + f"{_format_elapsed(perf_counter() - commit_started)}." + ) + finally: + logger.info(f"Table `{table_name}`: Restoring foreign key checks.") + fk_restore_started = perf_counter() except Exception as e: session.rollback() - logger.error(f"Table `{cls.__tablename__}`: Merge operation failed - {e}") + logger.error(f"Table `{table_name}`: Merge operation failed - {e}") raise finally: + if fk_restore_started is not None: + logger.info( + f"Table `{table_name}`: Foreign key checks restored in " + f"{_format_elapsed(perf_counter() - fk_restore_started)}." + ) if indices: - logger.info(f"Table `{cls.__tablename__}`: Verifying/Rebuilding indices.") + logger.info(f"Table `{table_name}`: Verifying/Rebuilding indices.") + rebuild_started = perf_counter() inspector.clear_cache() # Required to ensure we get the current state of the database after potential changes - existing_idx_names = {idx['name'] for idx in inspector.get_indexes(cls.__tablename__)} + existing_idx_names = {idx['name'] for idx in inspector.get_indexes(table_name)} for idx in indices: if idx.name not in existing_idx_names: try: - logger.info(f"Restoring missing index: {idx.name}") + logger.info(f"Table `{table_name}`: Restoring missing index: {idx.name}") + create_started = perf_counter() session.execute(sa.schema.CreateIndex(idx)) + logger.info( + f"Table `{table_name}`: Restored missing index `{idx.name}` in " + f"{_format_elapsed(perf_counter() - create_started)}." + ) + logger.info(f"Table `{table_name}`: Committing restored index `{idx.name}`.") + commit_started = perf_counter() session.commit() + logger.info( + f"Table `{table_name}`: Commit after restoring index `{idx.name}` " + f"completed in {_format_elapsed(perf_counter() - commit_started)}." + ) except Exception as e: session.rollback() - logger.error(f"Failed to restore {idx.name}: {e}") + logger.error(f"Table `{table_name}`: Failed to restore {idx.name}: {e}") else: - logger.debug(f"Index {idx.name} actually exists on disk. Skipping.") + logger.debug(f"Table `{table_name}`: Index {idx.name} already exists on disk. Skipping.") + logger.info( + f"Table `{table_name}`: Index verification/rebuild completed in " + f"{_format_elapsed(perf_counter() - rebuild_started)}." + ) @classmethod @@ -328,7 +390,8 @@ def load_csv( chunksize Optional chunk size for incremental loading. merge_strategy - Merge strategy to apply (e.g. ``replace`` or ``upsert``). + Merge strategy to apply (e.g. ``replace``, ``upsert``, or + ``insert_if_empty``). quote_mode Quoting mode used by the PostgreSQL fast-path loader. index_strategy @@ -407,6 +470,20 @@ def _merge_insert( backend = resolve_backend(session) backend.merge_insert(cls, session, target, staging) + @classmethod + def _target_has_rows( + cls: Type[CSVTableProtocol], + session: so.Session, + target: str, + ) -> bool: + """ + Return whether the target table currently contains any rows. + """ + row = session.execute( + sa.text(f'SELECT 1 FROM "{target}" LIMIT 1') + ).first() + return row is not None + @classmethod def _merge_upsert( @@ -436,32 +513,100 @@ def merge_from_staging( session An active SQLAlchemy session. merge_strategy - Merge strategy to apply. + Merge strategy to apply (for example ``replace``, + ``upsert``, or ``insert_if_empty``). """ target = cls.__tablename__ staging = cls.staging_tablename() pk_cols = cls.pk_names() _require_bind(session) + if merge_strategy in {"replace", "upsert"}: + logger.info( + f"Table `{target}`: Checking whether target table is empty for merge optimisation." + ) + check_started = perf_counter() + has_rows = cls._target_has_rows( + session=session, + target=target, + ) + logger.info( + f"Table `{target}`: Empty-table optimisation check completed in " + f"{_format_elapsed(perf_counter() - check_started)}." + ) + if not has_rows: + logger.info( + f"Table `{target}`: Target table is empty; routing merge strategy " + f"`{merge_strategy}` to insert-if-empty fast path." + ) + merge_strategy = "insert_if_empty" + if merge_strategy == "replace": + logger.info(f"Table `{target}`: Merge replace delete phase starting.") + delete_started = perf_counter() cls._merge_replace( session=session, target=target, staging=staging, pk_cols=pk_cols, ) + logger.info( + f"Table `{target}`: Merge replace delete phase completed in " + f"{_format_elapsed(perf_counter() - delete_started)}." + ) + logger.info(f"Table `{target}`: Merge insert phase starting.") + insert_started = perf_counter() cls._merge_insert( session=session, target=target, staging=staging, ) + logger.info( + f"Table `{target}`: Merge insert phase completed in " + f"{_format_elapsed(perf_counter() - insert_started)}." + ) elif merge_strategy == "upsert": + logger.info(f"Table `{target}`: Merge upsert phase starting.") + upsert_started = perf_counter() cls._merge_upsert( session=session, target=target, staging=staging, pk_cols=pk_cols, ) + logger.info( + f"Table `{target}`: Merge upsert phase completed in " + f"{_format_elapsed(perf_counter() - upsert_started)}." + ) + elif merge_strategy == "insert_if_empty": + logger.info(f"Table `{target}`: Checking whether target table is empty.") + check_started = perf_counter() + has_rows = cls._target_has_rows( + session=session, + target=target, + ) + logger.info( + f"Table `{target}`: Empty-table check completed in " + f"{_format_elapsed(perf_counter() - check_started)}." + ) + + if has_rows: + raise ValueError( + f"Table `{target}` is not empty; cannot use merge strategy " + f"'insert_if_empty'" + ) + + logger.info(f"Table `{target}`: Merge insert-if-empty phase starting.") + insert_started = perf_counter() + cls._merge_insert( + session=session, + target=target, + staging=staging, + ) + logger.info( + f"Table `{target}`: Merge insert-if-empty phase completed in " + f"{_format_elapsed(perf_counter() - insert_started)}." + ) else: raise ValueError(f"Unknown merge strategy '{merge_strategy}'") diff --git a/src/orm_loader/tables/typing.py b/src/orm_loader/tables/typing.py index bda4751..38322be 100644 --- a/src/orm_loader/tables/typing.py +++ b/src/orm_loader/tables/typing.py @@ -104,6 +104,9 @@ def drop_staging_table(cls, session: so.Session) -> None: ... @classmethod def _merge_insert(cls, session: so.Session, target: str, staging: str) -> None: ... + @classmethod + def _target_has_rows(cls, session: so.Session, target: str) -> bool: ... + @classmethod def _merge_replace(cls, session: so.Session, target: str, staging: str, pk_cols: list[str]) -> None: ... diff --git a/tests/loaders/test_loader_e2e.py b/tests/loaders/test_loader_e2e.py index bb53dd9..e4d7ad3 100644 --- a/tests/loaders/test_loader_e2e.py +++ b/tests/loaders/test_loader_e2e.py @@ -1,5 +1,6 @@ from typing import Type, cast +import logging import numpy as np import pandas as pd import pytest @@ -228,6 +229,100 @@ def test_merge_strategies(session, tmp_path, merge_strategy, expected_rows, expe assert [(r.id, r.name) for r in rows] == expected_rows +def test_insert_if_empty_merge_strategy(session, tmp_path): + csv_path = tmp_path / "test_table.csv" + + pd.DataFrame( + [ + {"id": 1, "name": "alpha"}, + {"id": 2, "name": "beta"}, + ] + ).to_csv(csv_path, index=False, sep="\t") + + loader = PandasLoader() + inserted = _SimpleTable.load_csv( + session, + csv_path, + dedupe=False, + loader=loader, + merge_strategy="insert_if_empty", + ) + session.commit() + + assert inserted == 2 + + rows = session.execute(sa.select(SimpleTable).order_by(SimpleTable.id)).scalars().all() + + assert [(r.id, r.name) for r in rows] == [ + (1, "alpha"), + (2, "beta"), + ] + + +def test_insert_if_empty_raises_on_non_empty_target(session, tmp_path): + csv_path = tmp_path / "test_table.csv" + + pd.DataFrame([{"id": 1, "name": "alpha"}]).to_csv(csv_path, index=False, sep="\t") + + loader = PandasLoader() + _SimpleTable.load_csv(session, csv_path, dedupe=False, loader=loader) + session.commit() + + pd.DataFrame([{"id": 2, "name": "beta"}]).to_csv(csv_path, index=False, sep="\t") + + with pytest.raises(ValueError, match="is not empty; cannot use merge strategy 'insert_if_empty'"): + _SimpleTable.load_csv( + session, + csv_path, + dedupe=False, + loader=loader, + merge_strategy="insert_if_empty", + ) + + +@pytest.mark.parametrize("merge_strategy", ["replace", "upsert"]) +def test_empty_target_routes_merge_to_insert_if_empty(session, tmp_path, caplog, merge_strategy): + csv_path = tmp_path / "test_table.csv" + + pd.DataFrame( + [ + {"id": 1, "name": "alpha"}, + {"id": 2, "name": "beta"}, + ] + ).to_csv(csv_path, index=False, sep="\t") + + caplog.set_level(logging.INFO, logger="orm_loader.tables.loadable_table") + + inserted = _SimpleTable.load_csv( + session, + csv_path, + dedupe=False, + loader=PandasLoader(), + merge_strategy=merge_strategy, + ) + session.commit() + + assert inserted == 2 + + rows = session.execute(sa.select(SimpleTable).order_by(SimpleTable.id)).scalars().all() + assert [(r.id, r.name) for r in rows] == [ + (1, "alpha"), + (2, "beta"), + ] + + messages = [record.getMessage() for record in caplog.records] + + assert any("Checking whether target table is empty for merge optimisation." in message for message in messages) + assert any( + f"Target table is empty; routing merge strategy `{merge_strategy}` to insert-if-empty fast path." + in message + for message in messages + ) + assert any("Merge insert-if-empty phase starting." in message for message in messages) + assert not any("Merge replace delete phase starting." in message for message in messages) + assert not any("Merge upsert phase starting." in message for message in messages) + + def test_staging_table_is_created_and_dropped(session, engine, tmp_path): csv_path = tmp_path / "test_table.csv" @@ -466,6 +561,68 @@ def test_explicit_drop_rebuild_on_sqlite_restores_index(session, engine, tmp_pat assert "ix_test_table_name" in {idx["name"] for idx in inspector.get_indexes("test_table")} +def test_drop_rebuild_logging_shows_merge_phases(session, tmp_path, caplog): + csv_path = tmp_path / "test_table.csv" + + pd.DataFrame( + [ + {"id": 1, "name": "alpha"}, + {"id": 2, "name": "beta"}, + ] + ).to_csv(csv_path, index=False, sep="\t") + + loader = PandasLoader() + _SimpleTable.load_csv(session, csv_path, dedupe=False, loader=loader) + session.commit() + + pd.DataFrame( + [ + {"id": 2, "name": "beta_updated"}, + {"id": 3, "name": "gamma"}, + ] + ).to_csv(csv_path, index=False, sep="\t") + + caplog.set_level(logging.INFO, logger="orm_loader.tables.loadable_table") + + _SimpleTable.load_csv( + session, + csv_path, + dedupe=False, + loader=loader, + merge_strategy="replace", + index_strategy="drop_rebuild", + ) + session.commit() + + messages = [record.getMessage() for record in caplog.records] + + assert any("Dropping 1 active indices." in message for message in messages) + assert any("Finished dropping 1 active indices in " in message for message in messages) + assert any("Committing after index drop." in message for message in messages) + assert any("Commit after index drop completed in " in message for message in messages) + assert any("Disabling foreign key checks before merge." in message for message in messages) + assert any("Foreign key checks disabled in " in message for message in messages) + assert any("Starting merge operation." in message for message in messages) + assert any("Merge replace delete phase starting." in message for message in messages) + assert any("Merge replace delete phase completed in " in message for message in messages) + assert any("Merge insert phase starting." in message for message in messages) + assert any("Merge insert phase completed in " in message for message in messages) + assert any("Merge operation SQL completed in " in message for message in messages) + assert any("Committing merged rows." in message for message in messages) + assert any("Merge commit completed in " in message for message in messages) + assert any("Restoring foreign key checks." in message for message in messages) + assert any("Foreign key checks restored in " in message for message in messages) + assert any("Verifying/Rebuilding indices." in message for message in messages) + assert any("Restoring missing index: ix_test_table_name" in message for message in messages) + assert any("Restored missing index `ix_test_table_name` in " in message for message in messages) + assert any("Committing restored index `ix_test_table_name`." in message for message in messages) + assert any( + "Commit after restoring index `ix_test_table_name` completed in " in message + for message in messages + ) + assert any("Index verification/rebuild completed in " in message for message in messages) + + def test_invalid_index_strategy_raises(session, tmp_path): """An unrecognised strategy value raises ValueError before any DB work.""" csv_path = tmp_path / "test_table.csv" diff --git a/tests/loaders/test_pg_loader.py b/tests/loaders/test_pg_loader.py index 0e278a8..f556354 100644 --- a/tests/loaders/test_pg_loader.py +++ b/tests/loaders/test_pg_loader.py @@ -95,6 +95,51 @@ def test_postgres_upsert_does_not_update(pg_session, tmp_path): assert [(r.id, r.name) for r in rows] == [(1, "alpha")] +@pytest.mark.postgres +def test_postgres_insert_if_empty(pg_session, tmp_path): + csv = tmp_path / "test_table.csv" + + pd.DataFrame( + [ + {"id": 1, "name": "alpha"}, + {"id": 2, "name": "beta"}, + ] + ).to_csv(csv, index=False) + + inserted = SimpleTable.load_csv( + pg_session, + csv, + merge_strategy="insert_if_empty", + ) + pg_session.commit() + + assert inserted == 2 + + rows = pg_session.execute(sa.select(SimpleTable).order_by(SimpleTable.id)).scalars().all() + assert [(r.id, r.name) for r in rows] == [ + (1, "alpha"), + (2, "beta"), + ] + + +@pytest.mark.postgres +def test_postgres_insert_if_empty_raises_on_non_empty_target(pg_session, tmp_path): + csv = tmp_path / "test_table.csv" + + pd.DataFrame([{"id": 1, "name": "alpha"}]).to_csv(csv, index=False) + SimpleTable.load_csv(pg_session, csv) + pg_session.commit() + + pd.DataFrame([{"id": 2, "name": "beta"}]).to_csv(csv, index=False) + + with pytest.raises(ValueError, match="is not empty; cannot use merge strategy 'insert_if_empty'"): + SimpleTable.load_csv( + pg_session, + csv, + merge_strategy="insert_if_empty", + ) + + @pytest.mark.postgres def test_postgres_copy_large_batch(pg_session, tmp_path): csv = tmp_path / "test_table.csv" From 207f98a860381ab63e4d57873fddcf653e0f2d4a Mon Sep 17 00:00:00 2001 From: gkennos Date: Tue, 19 May 2026 22:17:42 +1000 Subject: [PATCH 2/5] improved insert logging and skipping merge if empty --- CHANGELOG.md | 6 +++++- pyproject.toml | 2 +- uv.lock | 6 +++++- 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3580cb4..78358e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -114,4 +114,8 @@ - update to handle psycopg (as opposed to psycopg2) cleanly - overall api cleanup with the goal of being more explicit about selection of specific db backends - general typing cleanup -- removed example notebooks until they can be cleaned up with working use-cases according to updated api \ No newline at end of file +- removed example notebooks until they can be cleaned up with working use-cases according to updated api + +# 0.4.1 +- improved loader logging +- skip merge step if table empty \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index 407130e..419d7e2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "orm-loader" -version = "0.4.0" +version = "0.4.1" description = "Generic base classes to handle ORM functionality for multiple downstream datamodels" readme = "README.md" authors = [ diff --git a/uv.lock b/uv.lock index 87d1bcd..a2ef818 100644 --- a/uv.lock +++ b/uv.lock @@ -164,6 +164,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/f8/0a/a3871375c7b9727edaeeea994bfff7c63ff7804c9829c19309ba2e058807/greenlet-3.3.0-cp312-cp312-macosx_11_0_universal2.whl", hash = "sha256:b01548f6e0b9e9784a2c99c5651e5dc89ffcbe870bc5fb2e5ef864e9cc6b5dcb", size = 276379, upload-time = "2025-12-04T14:23:30.498Z" }, { url = "https://files.pythonhosted.org/packages/43/ab/7ebfe34dce8b87be0d11dae91acbf76f7b8246bf9d6b319c741f99fa59c6/greenlet-3.3.0-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:349345b770dc88f81506c6861d22a6ccd422207829d2c854ae2af8025af303e3", size = 597294, upload-time = "2025-12-04T14:50:06.847Z" }, { url = "https://files.pythonhosted.org/packages/a4/39/f1c8da50024feecd0793dbd5e08f526809b8ab5609224a2da40aad3a7641/greenlet-3.3.0-cp312-cp312-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:e8e18ed6995e9e2c0b4ed264d2cf89260ab3ac7e13555b8032b25a74c6d18655", size = 607742, upload-time = "2025-12-04T14:57:42.349Z" }, + { url = "https://files.pythonhosted.org/packages/77/cb/43692bcd5f7a0da6ec0ec6d58ee7cddb606d055ce94a62ac9b1aa481e969/greenlet-3.3.0-cp312-cp312-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:c024b1e5696626890038e34f76140ed1daf858e37496d33f2af57f06189e70d7", size = 622297, upload-time = "2025-12-04T15:07:13.552Z" }, { url = "https://files.pythonhosted.org/packages/75/b0/6bde0b1011a60782108c01de5913c588cf51a839174538d266de15e4bf4d/greenlet-3.3.0-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:047ab3df20ede6a57c35c14bf5200fcf04039d50f908270d3f9a7a82064f543b", size = 609885, upload-time = "2025-12-04T14:26:02.368Z" }, { url = "https://files.pythonhosted.org/packages/49/0e/49b46ac39f931f59f987b7cd9f34bfec8ef81d2a1e6e00682f55be5de9f4/greenlet-3.3.0-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:2d9ad37fc657b1102ec880e637cccf20191581f75c64087a549e66c57e1ceb53", size = 1567424, upload-time = "2025-12-04T15:04:23.757Z" }, { url = "https://files.pythonhosted.org/packages/05/f5/49a9ac2dff7f10091935def9165c90236d8f175afb27cbed38fb1d61ab6b/greenlet-3.3.0-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:83cd0e36932e0e7f36a64b732a6f60c2fc2df28c351bae79fbaf4f8092fe7614", size = 1636017, upload-time = "2025-12-04T14:27:29.688Z" }, @@ -171,6 +172,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/02/2f/28592176381b9ab2cafa12829ba7b472d177f3acc35d8fbcf3673d966fff/greenlet-3.3.0-cp313-cp313-macosx_11_0_universal2.whl", hash = "sha256:a1e41a81c7e2825822f4e068c48cb2196002362619e2d70b148f20a831c00739", size = 275140, upload-time = "2025-12-04T14:23:01.282Z" }, { url = "https://files.pythonhosted.org/packages/2c/80/fbe937bf81e9fca98c981fe499e59a3f45df2a04da0baa5c2be0dca0d329/greenlet-3.3.0-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:9f515a47d02da4d30caaa85b69474cec77b7929b2e936ff7fb853d42f4bf8808", size = 599219, upload-time = "2025-12-04T14:50:08.309Z" }, { url = "https://files.pythonhosted.org/packages/c2/ff/7c985128f0514271b8268476af89aee6866df5eec04ac17dcfbc676213df/greenlet-3.3.0-cp313-cp313-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:7d2d9fd66bfadf230b385fdc90426fcd6eb64db54b40c495b72ac0feb5766c54", size = 610211, upload-time = "2025-12-04T14:57:43.968Z" }, + { url = "https://files.pythonhosted.org/packages/79/07/c47a82d881319ec18a4510bb30463ed6891f2ad2c1901ed5ec23d3de351f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:30a6e28487a790417d036088b3bcb3f3ac7d8babaa7d0139edbaddebf3af9492", size = 624311, upload-time = "2025-12-04T15:07:14.697Z" }, { url = "https://files.pythonhosted.org/packages/fd/8e/424b8c6e78bd9837d14ff7df01a9829fc883ba2ab4ea787d4f848435f23f/greenlet-3.3.0-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:087ea5e004437321508a8d6f20efc4cfec5e3c30118e1417ea96ed1d93950527", size = 612833, upload-time = "2025-12-04T14:26:03.669Z" }, { url = "https://files.pythonhosted.org/packages/b5/ba/56699ff9b7c76ca12f1cdc27a886d0f81f2189c3455ff9f65246780f713d/greenlet-3.3.0-cp313-cp313-musllinux_1_2_aarch64.whl", hash = "sha256:ab97cf74045343f6c60a39913fa59710e4bd26a536ce7ab2397adf8b27e67c39", size = 1567256, upload-time = "2025-12-04T15:04:25.276Z" }, { url = "https://files.pythonhosted.org/packages/1e/37/f31136132967982d698c71a281a8901daf1a8fbab935dce7c0cf15f942cc/greenlet-3.3.0-cp313-cp313-musllinux_1_2_x86_64.whl", hash = "sha256:5375d2e23184629112ca1ea89a53389dddbffcf417dad40125713d88eb5f96e8", size = 1636483, upload-time = "2025-12-04T14:27:30.804Z" }, @@ -178,6 +180,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d7/7c/f0a6d0ede2c7bf092d00bc83ad5bafb7e6ec9b4aab2fbdfa6f134dc73327/greenlet-3.3.0-cp314-cp314-macosx_11_0_universal2.whl", hash = "sha256:60c2ef0f578afb3c8d92ea07ad327f9a062547137afe91f38408f08aacab667f", size = 275671, upload-time = "2025-12-04T14:23:05.267Z" }, { url = "https://files.pythonhosted.org/packages/44/06/dac639ae1a50f5969d82d2e3dd9767d30d6dbdbab0e1a54010c8fe90263c/greenlet-3.3.0-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0a5d554d0712ba1de0a6c94c640f7aeba3f85b3a6e1f2899c11c2c0428da9365", size = 646360, upload-time = "2025-12-04T14:50:10.026Z" }, { url = "https://files.pythonhosted.org/packages/e0/94/0fb76fe6c5369fba9bf98529ada6f4c3a1adf19e406a47332245ef0eb357/greenlet-3.3.0-cp314-cp314-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:3a898b1e9c5f7307ebbde4102908e6cbfcb9ea16284a3abe15cab996bee8b9b3", size = 658160, upload-time = "2025-12-04T14:57:45.41Z" }, + { url = "https://files.pythonhosted.org/packages/93/79/d2c70cae6e823fac36c3bbc9077962105052b7ef81db2f01ec3b9bf17e2b/greenlet-3.3.0-cp314-cp314-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:dcd2bdbd444ff340e8d6bdf54d2f206ccddbb3ccfdcd3c25bf4afaa7b8f0cf45", size = 671388, upload-time = "2025-12-04T15:07:15.789Z" }, { url = "https://files.pythonhosted.org/packages/b8/14/bab308fc2c1b5228c3224ec2bf928ce2e4d21d8046c161e44a2012b5203e/greenlet-3.3.0-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:5773edda4dc00e173820722711d043799d3adb4f01731f40619e07ea2750b955", size = 660166, upload-time = "2025-12-04T14:26:05.099Z" }, { url = "https://files.pythonhosted.org/packages/4b/d2/91465d39164eaa0085177f61983d80ffe746c5a1860f009811d498e7259c/greenlet-3.3.0-cp314-cp314-musllinux_1_2_aarch64.whl", hash = "sha256:ac0549373982b36d5fd5d30beb8a7a33ee541ff98d2b502714a09f1169f31b55", size = 1615193, upload-time = "2025-12-04T15:04:27.041Z" }, { url = "https://files.pythonhosted.org/packages/42/1b/83d110a37044b92423084d52d5d5a3b3a73cafb51b547e6d7366ff62eff1/greenlet-3.3.0-cp314-cp314-musllinux_1_2_x86_64.whl", hash = "sha256:d198d2d977460358c3b3a4dc844f875d1adb33817f0613f663a656f463764ccc", size = 1683653, upload-time = "2025-12-04T14:27:32.366Z" }, @@ -185,6 +188,7 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/a0/66/bd6317bc5932accf351fc19f177ffba53712a202f9df10587da8df257c7e/greenlet-3.3.0-cp314-cp314t-macosx_11_0_universal2.whl", hash = "sha256:d6ed6f85fae6cdfdb9ce04c9bf7a08d666cfcfb914e7d006f44f840b46741931", size = 282638, upload-time = "2025-12-04T14:25:20.941Z" }, { url = "https://files.pythonhosted.org/packages/30/cf/cc81cb030b40e738d6e69502ccbd0dd1bced0588e958f9e757945de24404/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d9125050fcf24554e69c4cacb086b87b3b55dc395a8b3ebe6487b045b2614388", size = 651145, upload-time = "2025-12-04T14:50:11.039Z" }, { url = "https://files.pythonhosted.org/packages/9c/ea/1020037b5ecfe95ca7df8d8549959baceb8186031da83d5ecceff8b08cd2/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_ppc64le.manylinux_2_28_ppc64le.whl", hash = "sha256:87e63ccfa13c0a0f6234ed0add552af24cc67dd886731f2261e46e241608bee3", size = 654236, upload-time = "2025-12-04T14:57:47.007Z" }, + { url = "https://files.pythonhosted.org/packages/69/cc/1e4bae2e45ca2fa55299f4e85854606a78ecc37fead20d69322f96000504/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_s390x.manylinux_2_28_s390x.whl", hash = "sha256:2662433acbca297c9153a4023fe2161c8dcfdcc91f10433171cf7e7d94ba2221", size = 662506, upload-time = "2025-12-04T15:07:16.906Z" }, { url = "https://files.pythonhosted.org/packages/57/b9/f8025d71a6085c441a7eaff0fd928bbb275a6633773667023d19179fe815/greenlet-3.3.0-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:3c6e9b9c1527a78520357de498b0e709fb9e2f49c3a513afd5a249007261911b", size = 653783, upload-time = "2025-12-04T14:26:06.225Z" }, { url = "https://files.pythonhosted.org/packages/f6/c7/876a8c7a7485d5d6b5c6821201d542ef28be645aa024cfe1145b35c120c1/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_aarch64.whl", hash = "sha256:286d093f95ec98fdd92fcb955003b8a3d054b4e2cab3e2707a5039e7b50520fd", size = 1614857, upload-time = "2025-12-04T15:04:28.484Z" }, { url = "https://files.pythonhosted.org/packages/4f/dc/041be1dff9f23dac5f48a43323cd0789cb798342011c19a248d9c9335536/greenlet-3.3.0-cp314-cp314t-musllinux_1_2_x86_64.whl", hash = "sha256:6c10513330af5b8ae16f023e8ddbfb486ab355d04467c4679c5cfe4659975dd9", size = 1676034, upload-time = "2025-12-04T14:27:33.531Z" }, @@ -614,7 +618,7 @@ wheels = [ [[package]] name = "orm-loader" -version = "0.4.0" +version = "0.4.1" source = { editable = "." } dependencies = [ { name = "chardet" }, From 179d0c1cba929ca868b7d10942b5617eda0a9967 Mon Sep 17 00:00:00 2001 From: gkennos Date: Tue, 19 May 2026 22:31:20 +1000 Subject: [PATCH 3/5] code review suggestions --- src/orm_loader/tables/loadable_table.py | 75 ++++++++++++++----------- 1 file changed, 41 insertions(+), 34 deletions(-) diff --git a/src/orm_loader/tables/loadable_table.py b/src/orm_loader/tables/loadable_table.py index d20e0f7..c8299c0 100644 --- a/src/orm_loader/tables/loadable_table.py +++ b/src/orm_loader/tables/loadable_table.py @@ -189,43 +189,43 @@ def manage_indices( logger.error(f"Table `{table_name}`: Merge operation failed - {e}") raise finally: - if fk_restore_started is not None: - logger.info( + if fk_restore_started is not None: + logger.info( f"Table `{table_name}`: Foreign key checks restored in " f"{_format_elapsed(perf_counter() - fk_restore_started)}." - ) - if indices: - logger.info(f"Table `{table_name}`: Verifying/Rebuilding indices.") - rebuild_started = perf_counter() - inspector.clear_cache() # Required to ensure we get the current state of the database after potential changes - existing_idx_names = {idx['name'] for idx in inspector.get_indexes(table_name)} + ) + if indices: + logger.info(f"Table `{table_name}`: Verifying/Rebuilding indices.") + rebuild_started = perf_counter() + inspector.clear_cache() # Required to ensure we get the current state of the database after potential changes + existing_idx_names = {idx['name'] for idx in inspector.get_indexes(table_name)} - for idx in indices: - if idx.name not in existing_idx_names: - try: - logger.info(f"Table `{table_name}`: Restoring missing index: {idx.name}") - create_started = perf_counter() - session.execute(sa.schema.CreateIndex(idx)) - logger.info( - f"Table `{table_name}`: Restored missing index `{idx.name}` in " - f"{_format_elapsed(perf_counter() - create_started)}." - ) - logger.info(f"Table `{table_name}`: Committing restored index `{idx.name}`.") - commit_started = perf_counter() - session.commit() - logger.info( - f"Table `{table_name}`: Commit after restoring index `{idx.name}` " - f"completed in {_format_elapsed(perf_counter() - commit_started)}." + for idx in indices: + if idx.name not in existing_idx_names: + try: + logger.info(f"Table `{table_name}`: Restoring missing index: {idx.name}") + create_started = perf_counter() + session.execute(sa.schema.CreateIndex(idx)) + logger.info( + f"Table `{table_name}`: Restored missing index `{idx.name}` in " + f"{_format_elapsed(perf_counter() - create_started)}." + ) + logger.info(f"Table `{table_name}`: Committing restored index `{idx.name}`.") + commit_started = perf_counter() + session.commit() + logger.info( + f"Table `{table_name}`: Commit after restoring index `{idx.name}` " + f"completed in {_format_elapsed(perf_counter() - commit_started)}." ) - except Exception as e: - session.rollback() - logger.error(f"Table `{table_name}`: Failed to restore {idx.name}: {e}") - else: - logger.debug(f"Table `{table_name}`: Index {idx.name} already exists on disk. Skipping.") - logger.info( - f"Table `{table_name}`: Index verification/rebuild completed in " - f"{_format_elapsed(perf_counter() - rebuild_started)}." - ) + except Exception as e: + session.rollback() + logger.error(f"Table `{table_name}`: Failed to restore {idx.name}: {e}") + else: + logger.debug(f"Table `{table_name}`: Index {idx.name} already exists on disk. Skipping.") + logger.info( + f"Table `{table_name}`: Index verification/rebuild completed in " + f"{_format_elapsed(perf_counter() - rebuild_started)}." + ) @classmethod @@ -479,8 +479,15 @@ def _target_has_rows( """ Return whether the target table currently contains any rows. """ + table = cls.__table__ + if target not in {table.name, table.fullname}: + table = sa.Table( + target, + sa.MetaData(), + autoload_with=session.get_bind(), + ) row = session.execute( - sa.text(f'SELECT 1 FROM "{target}" LIMIT 1') + sa.select(sa.literal(1)).select_from(table).limit(1) ).first() return row is not None From 7d8eaaff1f40e97b73eb1e1b3e944979c5ea466e Mon Sep 17 00:00:00 2001 From: gkennos Date: Tue, 19 May 2026 22:41:11 +1000 Subject: [PATCH 4/5] code review suggestions --- src/orm_loader/tables/loadable_table.py | 65 ++++++++++++++++--------- tests/loaders/test_loader_e2e.py | 6 ++- 2 files changed, 46 insertions(+), 25 deletions(-) diff --git a/src/orm_loader/tables/loadable_table.py b/src/orm_loader/tables/loadable_table.py index c8299c0..acec0f9 100644 --- a/src/orm_loader/tables/loadable_table.py +++ b/src/orm_loader/tables/loadable_table.py @@ -166,13 +166,7 @@ def manage_indices( f"{_format_elapsed(perf_counter() - fk_disable_started)}." ) try: - logger.info(f"Table `{table_name}`: Starting merge operation.") - merge_started = perf_counter() yield - logger.info( - f"Table `{table_name}`: Merge operation SQL completed in " - f"{_format_elapsed(perf_counter() - merge_started)}." - ) logger.info(f"Table `{table_name}`: Committing merged rows.") commit_started = perf_counter() session.commit() @@ -191,8 +185,8 @@ def manage_indices( finally: if fk_restore_started is not None: logger.info( - f"Table `{table_name}`: Foreign key checks restored in " - f"{_format_elapsed(perf_counter() - fk_restore_started)}." + f"Table `{table_name}`: Foreign key checks restored in " + f"{_format_elapsed(perf_counter() - fk_restore_started)}." ) if indices: logger.info(f"Table `{table_name}`: Verifying/Rebuilding indices.") @@ -410,7 +404,27 @@ def load_csv( raise ValueError( f"CSV filename '{path.name}' does not match table '{cls.__tablename__}'" ) - + + if merge_strategy == "insert_if_empty": + logger.info( + f"Table `{cls.__tablename__}`: Checking whether target table is empty before staging load." + ) + check_started = perf_counter() + has_rows = cls._target_has_rows( + session=session, + target=cls.__tablename__, + ) + logger.info( + f"Table `{cls.__tablename__}`: Pre-load empty-table check completed in " + f"{_format_elapsed(perf_counter() - check_started)}." + ) + + if has_rows: + raise ValueError( + f"Table `{cls.__tablename__}` is not empty; cannot use merge strategy " + f"'insert_if_empty'" + ) + loader_context = LoaderContext( tableclass=cls, session=session, @@ -528,6 +542,7 @@ def merge_from_staging( pk_cols = cls.pk_names() _require_bind(session) + target_empty_confirmed = False if merge_strategy in {"replace", "upsert"}: logger.info( f"Table `{target}`: Checking whether target table is empty for merge optimisation." @@ -546,6 +561,7 @@ def merge_from_staging( f"Table `{target}`: Target table is empty; routing merge strategy " f"`{merge_strategy}` to insert-if-empty fast path." ) + target_empty_confirmed = True merge_strategy = "insert_if_empty" if merge_strategy == "replace": @@ -586,22 +602,23 @@ def merge_from_staging( f"{_format_elapsed(perf_counter() - upsert_started)}." ) elif merge_strategy == "insert_if_empty": - logger.info(f"Table `{target}`: Checking whether target table is empty.") - check_started = perf_counter() - has_rows = cls._target_has_rows( - session=session, - target=target, - ) - logger.info( - f"Table `{target}`: Empty-table check completed in " - f"{_format_elapsed(perf_counter() - check_started)}." - ) - - if has_rows: - raise ValueError( - f"Table `{target}` is not empty; cannot use merge strategy " - f"'insert_if_empty'" + if not target_empty_confirmed: + logger.info(f"Table `{target}`: Checking whether target table is empty.") + check_started = perf_counter() + has_rows = cls._target_has_rows( + session=session, + target=target, ) + logger.info( + f"Table `{target}`: Empty-table check completed in " + f"{_format_elapsed(perf_counter() - check_started)}." + ) + + if has_rows: + raise ValueError( + f"Table `{target}` is not empty; cannot use merge strategy " + f"'insert_if_empty'" + ) logger.info(f"Table `{target}`: Merge insert-if-empty phase starting.") insert_started = perf_counter() diff --git a/tests/loaders/test_loader_e2e.py b/tests/loaders/test_loader_e2e.py index e4d7ad3..5368c8a 100644 --- a/tests/loaders/test_loader_e2e.py +++ b/tests/loaders/test_loader_e2e.py @@ -259,7 +259,7 @@ def test_insert_if_empty_merge_strategy(session, tmp_path): ] -def test_insert_if_empty_raises_on_non_empty_target(session, tmp_path): +def test_insert_if_empty_raises_on_non_empty_target(session, engine, tmp_path): csv_path = tmp_path / "test_table.csv" pd.DataFrame([{"id": 1, "name": "alpha"}]).to_csv(csv_path, index=False, sep="\t") @@ -279,6 +279,9 @@ def test_insert_if_empty_raises_on_non_empty_target(session, tmp_path): merge_strategy="insert_if_empty", ) + inspector = sa.inspect(engine) + assert not inspector.has_table(SimpleTable.staging_tablename()) + @pytest.mark.parametrize("merge_strategy", ["replace", "upsert"]) def test_empty_target_routes_merge_to_insert_if_empty(session, tmp_path, caplog, merge_strategy): @@ -319,6 +322,7 @@ def test_empty_target_routes_merge_to_insert_if_empty(session, tmp_path, caplog, for message in messages ) assert any("Merge insert-if-empty phase starting." in message for message in messages) + assert not any("Checking whether target table is empty." in message for message in messages) assert not any("Merge replace delete phase starting." in message for message in messages) assert not any("Merge upsert phase starting." in message for message in messages) From c3ee6bc9deea5d196efcc795c3973efc4c7286a6 Mon Sep 17 00:00:00 2001 From: gkennos Date: Tue, 19 May 2026 22:54:09 +1000 Subject: [PATCH 5/5] correcting log message in test --- tests/loaders/test_loader_e2e.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/loaders/test_loader_e2e.py b/tests/loaders/test_loader_e2e.py index 5368c8a..09290a7 100644 --- a/tests/loaders/test_loader_e2e.py +++ b/tests/loaders/test_loader_e2e.py @@ -606,12 +606,12 @@ def test_drop_rebuild_logging_shows_merge_phases(session, tmp_path, caplog): assert any("Commit after index drop completed in " in message for message in messages) assert any("Disabling foreign key checks before merge." in message for message in messages) assert any("Foreign key checks disabled in " in message for message in messages) - assert any("Starting merge operation." in message for message in messages) + assert any("Merging staging data into target table" in message for message in messages) assert any("Merge replace delete phase starting." in message for message in messages) assert any("Merge replace delete phase completed in " in message for message in messages) assert any("Merge insert phase starting." in message for message in messages) assert any("Merge insert phase completed in " in message for message in messages) - assert any("Merge operation SQL completed in " in message for message in messages) + assert any("Merging staging data into target table" in message for message in messages) assert any("Committing merged rows." in message for message in messages) assert any("Merge commit completed in " in message for message in messages) assert any("Restoring foreign key checks." in message for message in messages)