Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -114,4 +114,8 @@
- update to handle psycopg (as opposed to psycopg2) cleanly
- overall api cleanup with the goal of being more explicit about selection of specific db backends
- general typing cleanup
- removed example notebooks until they can be cleaned up with working use-cases according to updated api
- removed example notebooks until they can be cleaned up with working use-cases according to updated api

# 0.4.1
- improved loader logging
- skip merge step if table empty
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "orm-loader"
version = "0.4.0"
version = "0.4.1"
description = "Generic base classes to handle ORM functionality for multiple downstream datamodels"
readme = "README.md"
authors = [
Expand Down
213 changes: 191 additions & 22 deletions src/orm_loader/tables/loadable_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import Type, ClassVar, Optional, Any, Iterator
from pathlib import Path
from contextlib import contextmanager
from time import perf_counter

from .orm_table import ORMTableBase
from .typing import CSVTableProtocol
Expand All @@ -16,6 +17,11 @@
logger = logging.getLogger(__name__)


def _format_elapsed(seconds: float) -> str:
"""Return a compact, human-readable duration for phase logging."""
return f"{seconds:.2f}s"


def _require_bind(session: so.Session) -> sa.Engine | sa.Connection:
"""Return a bound connectable or raise a stable runtime error."""
try:
Expand Down Expand Up @@ -123,6 +129,7 @@ def manage_indices(
"""
backend = resolve_backend(session)
resolved_index_strategy = backend.resolve_index_strategy(index_strategy)
table_name = cls.__tablename__

indices = list(cls.__table__.indexes) if resolved_index_strategy == "drop_rebuild" else []
inspector = sa.inspect(_require_bind(session))
Expand All @@ -133,37 +140,86 @@ def manage_indices(
to_drop = [i for i in indices if i.name in existing_in_db]

if to_drop:
logger.info(f"Dropping {len(to_drop)} active indices...")
logger.info(f"Table `{table_name}`: Dropping {len(to_drop)} active indices.")
drop_started = perf_counter()
for idx in to_drop:
session.execute(sa.schema.DropIndex(idx))
logger.info(
f"Table `{table_name}`: Finished dropping {len(to_drop)} active indices "
f"in {_format_elapsed(perf_counter() - drop_started)}."
)
logger.info(f"Table `{table_name}`: Committing after index drop.")
commit_started = perf_counter()
session.commit()
logger.info(
f"Table `{table_name}`: Commit after index drop completed in "
f"{_format_elapsed(perf_counter() - commit_started)}."
)

fk_restore_started: float | None = None
try:
logger.info(f"Table `{table_name}`: Disabling foreign key checks before merge.")
fk_disable_started = perf_counter()
with backend.merge_context(cls, session):
yield
session.commit()
logger.info(
f"Table `{table_name}`: Foreign key checks disabled in "
f"{_format_elapsed(perf_counter() - fk_disable_started)}."
)
try:
yield
logger.info(f"Table `{table_name}`: Committing merged rows.")
commit_started = perf_counter()
session.commit()
logger.info(
f"Table `{table_name}`: Merge commit completed in "
f"{_format_elapsed(perf_counter() - commit_started)}."
)
finally:
logger.info(f"Table `{table_name}`: Restoring foreign key checks.")
fk_restore_started = perf_counter()

except Exception as e:
session.rollback()
logger.error(f"Table `{cls.__tablename__}`: Merge operation failed - {e}")
logger.error(f"Table `{table_name}`: Merge operation failed - {e}")
raise
finally:
if indices:
logger.info(f"Table `{cls.__tablename__}`: Verifying/Rebuilding indices.")
inspector.clear_cache() # Required to ensure we get the current state of the database after potential changes
existing_idx_names = {idx['name'] for idx in inspector.get_indexes(cls.__tablename__)}
if fk_restore_started is not None:
logger.info(
f"Table `{table_name}`: Foreign key checks restored in "
f"{_format_elapsed(perf_counter() - fk_restore_started)}."
)
Comment thread
gkennos marked this conversation as resolved.
if indices:
logger.info(f"Table `{table_name}`: Verifying/Rebuilding indices.")
rebuild_started = perf_counter()
inspector.clear_cache() # Required to ensure we get the current state of the database after potential changes
existing_idx_names = {idx['name'] for idx in inspector.get_indexes(table_name)}

for idx in indices:
if idx.name not in existing_idx_names:
try:
logger.info(f"Restoring missing index: {idx.name}")
session.execute(sa.schema.CreateIndex(idx))
session.commit()
except Exception as e:
session.rollback()
logger.error(f"Failed to restore {idx.name}: {e}")
else:
logger.debug(f"Index {idx.name} actually exists on disk. Skipping.")
for idx in indices:
if idx.name not in existing_idx_names:
try:
logger.info(f"Table `{table_name}`: Restoring missing index: {idx.name}")
create_started = perf_counter()
session.execute(sa.schema.CreateIndex(idx))
logger.info(
f"Table `{table_name}`: Restored missing index `{idx.name}` in "
f"{_format_elapsed(perf_counter() - create_started)}."
)
logger.info(f"Table `{table_name}`: Committing restored index `{idx.name}`.")
commit_started = perf_counter()
session.commit()
logger.info(
f"Table `{table_name}`: Commit after restoring index `{idx.name}` "
f"completed in {_format_elapsed(perf_counter() - commit_started)}."
)
except Exception as e:
session.rollback()
logger.error(f"Table `{table_name}`: Failed to restore {idx.name}: {e}")
else:
logger.debug(f"Table `{table_name}`: Index {idx.name} already exists on disk. Skipping.")
logger.info(
f"Table `{table_name}`: Index verification/rebuild completed in "
f"{_format_elapsed(perf_counter() - rebuild_started)}."
)


@classmethod
Expand Down Expand Up @@ -328,7 +384,8 @@ def load_csv(
chunksize
Optional chunk size for incremental loading.
merge_strategy
Merge strategy to apply (e.g. ``replace`` or ``upsert``).
Merge strategy to apply (e.g. ``replace``, ``upsert``, or
``insert_if_empty``).
quote_mode
Quoting mode used by the PostgreSQL fast-path loader.
index_strategy
Expand All @@ -347,7 +404,27 @@ def load_csv(
raise ValueError(
f"CSV filename '{path.name}' does not match table '{cls.__tablename__}'"
)


if merge_strategy == "insert_if_empty":
logger.info(
f"Table `{cls.__tablename__}`: Checking whether target table is empty before staging load."
)
check_started = perf_counter()
has_rows = cls._target_has_rows(
session=session,
target=cls.__tablename__,
)
logger.info(
f"Table `{cls.__tablename__}`: Pre-load empty-table check completed in "
f"{_format_elapsed(perf_counter() - check_started)}."
)

if has_rows:
raise ValueError(
f"Table `{cls.__tablename__}` is not empty; cannot use merge strategy "
f"'insert_if_empty'"
)

loader_context = LoaderContext(
tableclass=cls,
session=session,
Expand Down Expand Up @@ -407,6 +484,27 @@ def _merge_insert(
backend = resolve_backend(session)
backend.merge_insert(cls, session, target, staging)

@classmethod
def _target_has_rows(
cls: Type[CSVTableProtocol],
session: so.Session,
target: str,
) -> bool:
"""
Return whether the target table currently contains any rows.
"""
table = cls.__table__
if target not in {table.name, table.fullname}:
table = sa.Table(
target,
sa.MetaData(),
autoload_with=session.get_bind(),
)
row = session.execute(
sa.select(sa.literal(1)).select_from(table).limit(1)
).first()
return row is not None


@classmethod
def _merge_upsert(
Expand Down Expand Up @@ -436,32 +534,103 @@ def merge_from_staging(
session
An active SQLAlchemy session.
merge_strategy
Merge strategy to apply.
Merge strategy to apply (for example ``replace``,
``upsert``, or ``insert_if_empty``).
"""
target = cls.__tablename__
staging = cls.staging_tablename()
pk_cols = cls.pk_names()

_require_bind(session)
target_empty_confirmed = False
if merge_strategy in {"replace", "upsert"}:
logger.info(
f"Table `{target}`: Checking whether target table is empty for merge optimisation."
)
check_started = perf_counter()
has_rows = cls._target_has_rows(
session=session,
target=target,
)
logger.info(
f"Table `{target}`: Empty-table optimisation check completed in "
f"{_format_elapsed(perf_counter() - check_started)}."
)
if not has_rows:
logger.info(
f"Table `{target}`: Target table is empty; routing merge strategy "
f"`{merge_strategy}` to insert-if-empty fast path."
)
target_empty_confirmed = True
merge_strategy = "insert_if_empty"
Comment thread
gkennos marked this conversation as resolved.
Comment thread
gkennos marked this conversation as resolved.

if merge_strategy == "replace":
logger.info(f"Table `{target}`: Merge replace delete phase starting.")
delete_started = perf_counter()
cls._merge_replace(
session=session,
target=target,
staging=staging,
pk_cols=pk_cols,
)
logger.info(
f"Table `{target}`: Merge replace delete phase completed in "
f"{_format_elapsed(perf_counter() - delete_started)}."
)
logger.info(f"Table `{target}`: Merge insert phase starting.")
insert_started = perf_counter()
cls._merge_insert(
session=session,
target=target,
staging=staging,
)
logger.info(
f"Table `{target}`: Merge insert phase completed in "
f"{_format_elapsed(perf_counter() - insert_started)}."
)
elif merge_strategy == "upsert":
logger.info(f"Table `{target}`: Merge upsert phase starting.")
upsert_started = perf_counter()
cls._merge_upsert(
session=session,
target=target,
staging=staging,
pk_cols=pk_cols,
)
logger.info(
f"Table `{target}`: Merge upsert phase completed in "
f"{_format_elapsed(perf_counter() - upsert_started)}."
)
elif merge_strategy == "insert_if_empty":
if not target_empty_confirmed:
logger.info(f"Table `{target}`: Checking whether target table is empty.")
check_started = perf_counter()
has_rows = cls._target_has_rows(
session=session,
target=target,
)
Comment thread
gkennos marked this conversation as resolved.
logger.info(
f"Table `{target}`: Empty-table check completed in "
f"{_format_elapsed(perf_counter() - check_started)}."
)

if has_rows:
raise ValueError(
f"Table `{target}` is not empty; cannot use merge strategy "
f"'insert_if_empty'"
)

logger.info(f"Table `{target}`: Merge insert-if-empty phase starting.")
insert_started = perf_counter()
cls._merge_insert(
session=session,
target=target,
staging=staging,
)
logger.info(
f"Table `{target}`: Merge insert-if-empty phase completed in "
f"{_format_elapsed(perf_counter() - insert_started)}."
)
else:
raise ValueError(f"Unknown merge strategy '{merge_strategy}'")

Expand Down
3 changes: 3 additions & 0 deletions src/orm_loader/tables/typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ def drop_staging_table(cls, session: so.Session) -> None: ...
@classmethod
def _merge_insert(cls, session: so.Session, target: str, staging: str) -> None: ...

@classmethod
def _target_has_rows(cls, session: so.Session, target: str) -> bool: ...

@classmethod
def _merge_replace(cls, session: so.Session, target: str, staging: str, pk_cols: list[str]) -> None: ...

Expand Down
Loading
Loading