67 changes: 47 additions & 20 deletions architecture/connections.md
@@ -12,37 +12,64 @@ build_connection_factory(url: sqlalchemy.URL, timeout: float)
-> Callable[[], Awaitable[asyncpg.Connection]]
```

The `url` is translated into asyncpg connect args **once**, at factory-build
time, via `PGDialect_asyncpg().create_connect_args(url)`. `target_session_attrs`
is popped from those args and, if present, wrapped in asyncpg's
`SessionAttribute` enum — so a `target_session_attrs` carried on the DSN (e.g.
`read-write`/`prefer-standby` set by [`build_db_dsn`](dsn.md)) **is honored**,
not discarded.
`build_connection_factory` is the only public symbol of this capability;
`build_connection_plan` and `ConnectionPlan` are **internal seams** (not in
`__all__`) and not part of the public API.

## Host handling
## Connection plan (internal)

`host` and `port` are popped from the connect args:
`build_connection_plan(url)` is the pure parse phase — no I/O, no logging, no
`asyncpg.connect`. It calls `PGDialect_asyncpg().create_connect_args(url)` once,
pops `host`/`port`/`target_session_attrs` from the result, and returns a frozen,
slotted `ConnectionPlan`:

- **Multi-host** (both are lists): they are zipped into `(host, port)` pairs
(`strict=True` — lengths must match), the pair list is `random.shuffle`d for
load balancing, then split back into parallel `hosts`/`ports` lists. The
shuffled pair list is also retained for the failover path.
- **Single-host** (scalars): used as-is; the retained pair list is empty.
```python
class ConnectionPlan:
    connect_args: Mapping[str, Any]        # base kwargs, minus host/port/target_session_attrs
    target_session_attrs: SessionAttribute | None
    primary_host: str | list[str]          # list for multi-host, scalar for single
    primary_port: int | list[int] | None
    failover: tuple[tuple[str, int], ...]  # per-host pairs; () for single-host
```

`target_session_attrs` (e.g. `read-write`/`prefer-standby` set by
[`build_db_dsn`](dsn.md)) is wrapped in asyncpg's `SessionAttribute` enum so it
is **honored**, not discarded.

**Multi-host** (both `host` and `port` are lists): the hosts and ports are zipped
(`strict=True` — lengths must match), the resulting `(host, port)` pairs are
`random.shuffle`d **once** at build time, then split back into `primary_host` and
`primary_port` lists and also stored as `failover`. Key invariant:
`list(zip(primary_host, primary_port)) == list(failover)` — primary and failover
come from the **same** shuffle, so host/port stay paired throughout.

**Single-host** (scalars): `primary_host`/`primary_port` are the scalars;
`failover = ()`.

## Connect and failover

The returned `_connection_factory`:
`build_connection_factory` calls `build_connection_plan(url)` once at build time,
then returns `_connection_factory`, a thin async loop that consumes the plan via a
single `_connect(plan, host, port, timeout)` helper (one `asyncpg.connect` call
site):

1. Attempts one `asyncpg.connect(...)` against the full host/port set with the
given `timeout` and `target_session_attrs`. On success, returns immediately.
2. On `TimeoutError`, if there is no multi-host pair list it re-raises. With a
pair list, it logs a warning and falls through to host-by-host probing.
3. Re-shuffles a copy of the pairs and tries each `(host, port)` individually,
swallowing `TimeoutError`, `OSError`, and `asyncpg.TargetServerAttributeNotMatched`
1. Attempts `_connect` against `plan.primary_host`/`plan.primary_port` with the
given `timeout`. On success, returns immediately.
2. On `TimeoutError`, if `plan.failover` is empty (single-host), re-raises. With a
failover list, logs a warning and falls through.
3. Calls `_reshuffled(plan.failover)` — `random.sample(...)` returns a
**per-call** freshly-shuffled list, leaving the plan's `failover` tuple
untouched — and tries each `(host, port)` individually, swallowing
`TimeoutError`, `OSError`, and `asyncpg.TargetServerAttributeNotMatched`
(logging a warning per failed host) and returning the first that connects.
4. If every host fails, raises `asyncpg.TargetServerAttributeNotMatched` naming
the unmet `target_session_attrs`.

**Two shuffle lifetimes**: the bulk/primary order is shuffled **once** when
`build_connection_plan` runs (at factory-build time); the failover list is
re-shuffled **per connection** in the loop via `_reshuffled`. These are distinct:
the plan is immutable; the per-call re-shuffle is ephemeral.

The two-stage design lets the fast path use asyncpg's own multi-host attempt,
and only pays the per-host cost when the bulk attempt times out — typically when
`target_session_attrs` (e.g. `read-write`) excludes some hosts.
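
The control flow of the two stages can be exercised without a database. The following is a minimal sketch under stated assumptions, not the module's code: `fake_connect`, `connect_with_failover`, `AttrsNotMatched` and the host names are hypothetical stand-ins, and the bulk attempt is simply modelled as always timing out so that the per-host path runs.

```python
import asyncio
import random


class AttrsNotMatched(Exception):
    """Hypothetical stand-in for asyncpg.TargetServerAttributeNotMatched."""


async def fake_connect(host, port, healthy):
    """Hypothetical connector: a list of hosts models a bulk attempt that times out."""
    if isinstance(host, list):
        raise TimeoutError("bulk attempt timed out")
    if (host, port) not in healthy:
        raise OSError(f"{host}:{port} unreachable")
    return f"connection to {host}:{port}"


async def connect_with_failover(primary_host, primary_port, failover, healthy):
    # Stage 1: one attempt against the full host/port set.
    try:
        return await fake_connect(primary_host, primary_port, healthy)
    except TimeoutError:
        if not failover:  # single-host: nothing to fall back to
            raise
    # Stage 2: probe each pair in a fresh random order; first success wins.
    for host, port in random.sample(failover, len(failover)):
        try:
            return await fake_connect(host, port, healthy)
        except (TimeoutError, OSError, AttrsNotMatched):
            continue
    raise AttrsNotMatched("no host satisfied the requirement")


failover = (("db-a", 5432), ("db-b", 5433))
result = asyncio.run(
    connect_with_failover(["db-a", "db-b"], [5432, 5433], failover, healthy={("db-b", 5433)})
)
assert result == "connection to db-b:5433"
```

Whatever order the re-shuffle produces, the only healthy pair is the one returned, which is why the assertion needs no random seed.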
108 changes: 65 additions & 43 deletions db_retry/connections.py
@@ -1,3 +1,4 @@
import dataclasses
import logging
import random
import typing
@@ -16,64 +17,85 @@
logger = logging.getLogger(__name__)


def build_connection_factory(
    url: sqlalchemy.URL,
    timeout: float,
) -> typing.Callable[[], typing.Awaitable["ConnectionType"]]:
    connect_args: typing.Final[dict[str, typing.Any]] = PGDialect_asyncpg().create_connect_args(url)[1]
    raw_target_session_attrs: typing.Final[str | None] = connect_args.pop("target_session_attrs", None)
    target_session_attrs: typing.Final[SessionAttribute | None] = (
@dataclasses.dataclass(kw_only=True, frozen=True, slots=True)
class ConnectionPlan:
    connect_args: typing.Mapping[str, typing.Any]
    target_session_attrs: SessionAttribute | None
    primary_host: str | list[str]
    primary_port: int | list[int] | None
    failover: tuple[tuple[str, int], ...]


def build_connection_plan(url: sqlalchemy.URL) -> ConnectionPlan:
    connect_args: dict[str, typing.Any] = PGDialect_asyncpg().create_connect_args(url)[1]
    raw_target_session_attrs: str | None = connect_args.pop("target_session_attrs", None)
    target_session_attrs: SessionAttribute | None = (
        SessionAttribute(raw_target_session_attrs) if raw_target_session_attrs else None
    )

    raw_hosts: typing.Final[str | list[str]] = connect_args.pop("host")
    raw_ports: typing.Final[int | list[int] | None] = connect_args.pop("port", None)
    hosts_and_ports: list[tuple[str, int]]
    hosts: str | list[str]
    ports: int | list[int] | None
    raw_hosts: str | list[str] = connect_args.pop("host")
    raw_ports: int | list[int] | None = connect_args.pop("port", None)
    primary_host: str | list[str]
    primary_port: int | list[int] | None
    failover: tuple[tuple[str, int], ...]
    if isinstance(raw_hosts, list) and isinstance(raw_ports, list):
        hosts_and_ports = list(zip(raw_hosts, raw_ports, strict=True))
        hosts_and_ports: list[tuple[str, int]] = list(zip(raw_hosts, raw_ports, strict=True))
        random.shuffle(hosts_and_ports)
        hosts = list(map(itemgetter(0), hosts_and_ports))
        ports = list(map(itemgetter(1), hosts_and_ports))
        primary_host = list(map(itemgetter(0), hosts_and_ports))
        primary_port = list(map(itemgetter(1), hosts_and_ports))
        failover = tuple(hosts_and_ports)
    else:
        hosts_and_ports = []
        hosts = raw_hosts
        ports = raw_ports
        primary_host = raw_hosts
        primary_port = raw_ports
        failover = ()
    return ConnectionPlan(
        connect_args=connect_args,
        target_session_attrs=target_session_attrs,
        primary_host=primary_host,
        primary_port=primary_port,
        failover=failover,
    )


async def _connect(
    plan: ConnectionPlan,
    host: str | list[str],
    port: int | list[int] | None,
    timeout: float,  # noqa: ASYNC109
) -> "ConnectionType":
    return await asyncpg.connect(
        **plan.connect_args,
        host=host,
        port=port,
        timeout=timeout,
        target_session_attrs=plan.target_session_attrs,
    )


def _reshuffled(failover: tuple[tuple[str, int], ...]) -> list[tuple[str, int]]:
    return random.sample(failover, len(failover))


def build_connection_factory(
    url: sqlalchemy.URL,
    timeout: float,
) -> typing.Callable[[], typing.Awaitable["ConnectionType"]]:
    plan: typing.Final = build_connection_plan(url)

    async def _connection_factory() -> "ConnectionType":
        connection: ConnectionType
        nonlocal hosts_and_ports
        try:
            connection = await asyncpg.connect(
                **connect_args,
                host=hosts,
                port=ports,
                timeout=timeout,
                target_session_attrs=target_session_attrs,
            )
            return connection  # noqa: TRY300
            return await _connect(plan, plan.primary_host, plan.primary_port, timeout)
        except TimeoutError:
            if not hosts_and_ports:
            if not plan.failover:
                raise

            logger.warning("Failed to fetch asyncpg connection. Trying host by host.")

        hosts_and_ports_copy: typing.Final = hosts_and_ports.copy()
        random.shuffle(hosts_and_ports_copy)
        for one_host, one_port in hosts_and_ports_copy:
        for host, port in _reshuffled(plan.failover):
            try:
                connection = await asyncpg.connect(
                    **connect_args,
                    host=one_host,
                    port=one_port,
                    timeout=timeout,
                    target_session_attrs=target_session_attrs,
                )
                return connection  # noqa: TRY300
                return await _connect(plan, host, port, timeout)
            except (TimeoutError, OSError, asyncpg.TargetServerAttributeNotMatched) as exc:
                logger.warning("Failed to fetch asyncpg connection from %s, %s", one_host, exc)
        msg: typing.Final = f"None of the hosts match the target attribute requirement {target_session_attrs}"
                logger.warning("Failed to fetch asyncpg connection from %s, %s", host, exc)
        msg: typing.Final = f"None of the hosts match the target attribute requirement {plan.target_session_attrs}"
        raise asyncpg.TargetServerAttributeNotMatched(msg)

    return _connection_factory
142 changes: 142 additions & 0 deletions planning/changes/2026-06-26.03-connection-plan-split/design.md
@@ -0,0 +1,142 @@
---
summary: Extracted build_connection_plan(url) -> ConnectionPlan as a pure internal seam and reduced build_connection_factory to a thin failover loop, eliminating the duplicated asyncpg.connect call and enabling mock-free unit tests of host-plan parsing.
---

# Design: Split the connection factory into plan + failover loop

## Summary

`build_connection_factory` is one 79-line closure that entangles two concerns:
**pure** host-plan parsing/ordering (parse the URL into asyncpg connect args,
wrap `target_session_attrs`, pop host/port, zip + shuffle multi-host pairs) and
**async** connect-with-failover I/O — and it writes the `asyncpg.connect(...)`
call twice (the bulk attempt and the per-host loop). This change extracts the
pure half into a `build_connection_plan(url) -> ConnectionPlan` and leaves a thin
loop that consumes the plan through a single `_connect` helper. Parsing/ordering
becomes unit-testable with **no `asyncpg.connect` mock**; the connect call is
written **once**. Behaviour is preserved exactly, including the two-stage
strategy and its two-lifetime shuffle.

## Motivation

- The pure ordering logic (zip + shuffle + multi/single-host branch) is testable
today only by mocking `asyncpg.connect` and inferring order from call
arguments — there is no seam at the parse boundary.
- The `asyncpg.connect(...)` call is duplicated (the bulk attempt and the
per-host attempt), so a change to connect kwargs must be made in two places.
- The genuinely bug-prone part — keeping `host` and `port` **paired** through
`zip(strict=True)` and the `itemgetter` split — has no direct, mock-free test.

## Non-goals

- No behaviour change. Same connect kwargs, same two-stage primary→failover
strategy, same exceptions (`TimeoutError` reraised for single-host; per-host
swallowing of `TimeoutError`/`OSError`/`TargetServerAttributeNotMatched`;
`TargetServerAttributeNotMatched` raised when every host fails), same logs.
- No public-API change: `build_connection_plan` / `ConnectionPlan` are
**internal seams** (not added to `__init__.py`'s `__all__`);
`build_connection_factory` stays the only public symbol of this capability.
- No new file: the plan is an internal seam **within** `connections.py`, not a
separate module.
- Not touching the multi-host detection overlap with `dsn.py` (separate concern).
- Not changing the two-lifetime shuffle (bulk shuffled once at build; failover
re-shuffled per connection). If the build-once bulk shuffle later looks like a
load-balancing weakness, that is a separate behaviour decision for
`deferred.md`, not this refactor.

## Design

### 1. `ConnectionPlan` — the pure plan (internal)

A frozen, slotted, keyword-only dataclass (house style, as `Transaction`):

```python
@dataclasses.dataclass(kw_only=True, frozen=True, slots=True)
class ConnectionPlan:
    connect_args: Mapping[str, Any]        # base kwargs, minus host/port/target_session_attrs
    target_session_attrs: SessionAttribute | None
    primary_host: str | list[str]          # bulk attempt: list for multi-host, scalar for single
    primary_port: int | list[int] | None
    failover: tuple[tuple[str, int], ...]  # per-host pairs; () for single-host
```

`build_connection_plan(url: sqlalchemy.URL) -> ConnectionPlan` does the entire
parse phase, pure: `PGDialect_asyncpg().create_connect_args(url)`, pop + wrap
`target_session_attrs` into `SessionAttribute`, pop `host`/`port`, and:

- **multi-host** (both lists): `zip(strict=True)` into pairs, `random.shuffle`
once, derive `primary_host`/`primary_port` from the shuffled order, and keep
the same shuffled pairs as `failover`. So
`list(zip(primary_host, primary_port)) == list(failover)` — primary and
failover come from one shuffle, host/port paired throughout.
- **single-host** (scalars): `primary_host`/`primary_port` are the scalars;
`failover = ()`.

No timeout (an I/O concern), no logging, no `asyncpg.connect`.

### 2. `build_connection_factory` — the thin loop

```python
def build_connection_factory(url, timeout):
    plan = build_connection_plan(url)          # parse-once, at build time

    async def _connection_factory():
        try:
            return await _connect(plan, plan.primary_host, plan.primary_port, timeout)
        except TimeoutError:
            if not plan.failover:
                raise
            logger.warning("Failed to fetch asyncpg connection. Trying host by host.")
        for host, port in _reshuffled(plan.failover):   # per-call re-shuffle
            try:
                return await _connect(plan, host, port, timeout)
            except (TimeoutError, OSError, asyncpg.TargetServerAttributeNotMatched) as exc:
                logger.warning("Failed to fetch asyncpg connection from %s, %s", host, exc)
        raise asyncpg.TargetServerAttributeNotMatched(
            f"None of the hosts match the target attribute requirement {plan.target_session_attrs}"
        )

    return _connection_factory
```

`_connect(plan, host, port, timeout)` is the single `asyncpg.connect(...)` call:

```python
async def _connect(plan, host, port, timeout):
    return await asyncpg.connect(
        **plan.connect_args, host=host, port=port,
        timeout=timeout, target_session_attrs=plan.target_session_attrs,
    )
```

`_reshuffled(failover)` returns a freshly-shuffled copy each call (the per-call
failover re-shuffle that lives in the loop, not the plan).
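
A small standalone sketch of the per-call re-shuffle, using only the standard library. The helper name `reshuffled` and the sample pairs are illustrative; the point is that `random.sample` with `k == len(population)` returns a new, randomly ordered list and does not modify its input.

```python
import random


def reshuffled(failover: tuple[tuple[str, int], ...]) -> list[tuple[str, int]]:
    # Sampling every element without replacement yields a random permutation
    # as a new list; the input tuple is left as it was.
    return random.sample(failover, len(failover))


plan_failover = (("db-a", 5432), ("db-b", 5433), ("db-c", 5434))
first_order = reshuffled(plan_failover)
second_order = reshuffled(plan_failover)

# Each call returns the same pairs (possibly in a different order)...
assert sorted(first_order) == sorted(plan_failover)
assert sorted(second_order) == sorted(plan_failover)
# ...and the stored tuple keeps its original order.
assert plan_failover == (("db-a", 5432), ("db-b", 5433), ("db-c", 5434))
```

`random.shuffle` works in place and so cannot be applied to a tuple, which makes `random.sample` a convenient fit for an immutable plan.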

## Testing

- **New** mock-free `build_connection_plan` tests in
`tests/test_connection_factory.py`:
  - multi-host: `set(failover) == {(h1,p1),(h2,p2)}`;
    `list(zip(primary_host, primary_port)) == list(failover)` (pairing preserved);
    `target_session_attrs` wrapped to `SessionAttribute`;
    `connect_args` has no `host`/`port`/`target_session_attrs`.
  - single-host: `failover == ()`; scalar `primary_host`/`primary_port`; tsa.
  - These assert order-independent properties: no seed, no `asyncpg.connect` mock.
- **Keep** the four existing loop tests (they mock `asyncpg.connect` and prove
the failover orchestration: success, single-host timeout reraise, multi-host
all-fail → `TargetServerAttributeNotMatched`, primary-fail-then-failover
success). Plan tests and loop tests cover different things (parsing vs
orchestration) — complementary, not duplicative.
- `just test` (Docker Postgres) green; `just lint-ci` clean.

## Risk

- **Low. Behaviour-preserving refactor.** The parse/zip/shuffle/branch logic and
the connect kwargs are moved verbatim; the two-stage loop and its exception
handling are unchanged. Risk is accidental drift in the connect-arg assembly or
the single/multi branch — mitigated by writing the mock-free plan tests first
(they pin the pairing and branch) and keeping the four loop tests as a
live/mock backstop on orchestration.
- **Shuffle lifetimes**: the per-call failover re-shuffle must stay in the loop
(not the once-built plan), or load-balancing behaviour changes — called out
explicitly so the implementer does not fold it into `build_connection_plan`.