From 021e0e767efa4b34215ce91f2eee8cfc7f089dd3 Mon Sep 17 00:00:00 2001
From: Artur Shiriev
Date: Fri, 26 Jun 2026 23:27:31 +0300
Subject: [PATCH 1/5] docs: plan connection-plan split (Full-lane bundle)

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 .../design.md | 142 +++++++++++++++++
 .../plan.md | 144 ++++++++++++++++++
 2 files changed, 286 insertions(+)
 create mode 100644 planning/changes/2026-06-26.03-connection-plan-split/design.md
 create mode 100644 planning/changes/2026-06-26.03-connection-plan-split/plan.md

diff --git a/planning/changes/2026-06-26.03-connection-plan-split/design.md b/planning/changes/2026-06-26.03-connection-plan-split/design.md
new file mode 100644
index 0000000..45418eb
--- /dev/null
+++ b/planning/changes/2026-06-26.03-connection-plan-split/design.md
@@ -0,0 +1,142 @@
+---
+summary: Split build_connection_factory into a pure build_connection_plan(url) -> ConnectionPlan and a thin failover loop, so host-plan parsing/ordering is unit-tested without mocking asyncpg.connect and the connect call is written once.
+---
+
+# Design: Split the connection factory into plan + failover loop
+
+## Summary
+
+`build_connection_factory` is one 79-line closure that entangles two concerns:
+**pure** host-plan parsing/ordering (parse the URL into asyncpg connect args,
+wrap `target_session_attrs`, pop host/port, zip + shuffle multi-host pairs) and
+**async** connect-with-failover I/O — and it writes the `asyncpg.connect(...)`
+call twice (the bulk attempt and the per-host loop). This change extracts the
+pure half into a `build_connection_plan(url) -> ConnectionPlan` and leaves a thin
+loop that consumes the plan through a single `_connect` helper. Parsing/ordering
+becomes unit-testable with **no `asyncpg.connect` mock**; the connect call is
+written **once**. Behaviour is preserved exactly, including the two-stage
+strategy and its two-lifetime shuffle.
+
+## Motivation
+
+- The pure ordering logic (zip + shuffle + multi/single-host branch) is testable
+  today only by mocking `asyncpg.connect` and inferring order from call
+  arguments — there is no seam at the parse boundary.
+- The `asyncpg.connect(...)` call is duplicated (the bulk attempt and the
+  per-host attempt), so a change to connect kwargs must be made in two places.
+- The genuinely bug-prone part — keeping `host` and `port` **paired** through
+  `zip(strict=True)` and the `itemgetter` split — has no direct, mock-free test.
+
+## Non-goals
+
+- No behaviour change. Same connect kwargs, same two-stage primary→failover
+  strategy, same exceptions (`TimeoutError` reraised for single-host; per-host
+  swallowing of `TimeoutError`/`OSError`/`TargetServerAttributeNotMatched`;
+  `TargetServerAttributeNotMatched` raised when every host fails), same logs.
+- No public-API change: `build_connection_plan` / `ConnectionPlan` are
+  **internal seams** (not added to `__init__.py`'s `__all__`);
+  `build_connection_factory` stays the only public symbol of this capability.
+- No new file: the plan is an internal seam **within** `connections.py`, not a
+  separate module.
+- Not touching the multi-host detection overlap with `dsn.py` (separate concern).
+- Not changing the two-lifetime shuffle (bulk shuffled once at build; failover
+  re-shuffled per connection). If the build-once bulk shuffle later looks like a
+  load-balancing weakness, that is a separate behaviour decision for
+  `deferred.md`, not this refactor.
+
+## Design
+
+### 1. `ConnectionPlan` — the pure plan (internal)
+
+A frozen, slotted, keyword-only dataclass (house style, as `Transaction`):
+
+```python
+@dataclasses.dataclass(kw_only=True, frozen=True, slots=True)
+class ConnectionPlan:
+    connect_args: Mapping[str, Any]  # base kwargs, minus host/port/target_session_attrs
+    target_session_attrs: SessionAttribute | None
+    primary_host: str | list[str]  # bulk attempt: list for multi-host, scalar for single
+    primary_port: int | list[int] | None
+    failover: tuple[tuple[str, int], ...]  # per-host pairs; () for single-host
+```
+
+`build_connection_plan(url: sqlalchemy.URL) -> ConnectionPlan` does the entire
+parse phase, pure: `PGDialect_asyncpg().create_connect_args(url)`, pop + wrap
+`target_session_attrs` into `SessionAttribute`, pop `host`/`port`, and:
+
+- **multi-host** (both lists): `zip(strict=True)` into pairs, `random.shuffle`
+  once, derive `primary_host`/`primary_port` from the shuffled order, and keep
+  the same shuffled pairs as `failover`. So
+  `list(zip(primary_host, primary_port)) == list(failover)` — primary and
+  failover come from one shuffle, host/port paired throughout.
+- **single-host** (scalars): `primary_host`/`primary_port` are the scalars;
+  `failover = ()`.
+
+No timeout (an I/O concern), no logging, no `asyncpg.connect`.
+
+### 2. `build_connection_factory` — the thin loop
+
+```python
+def build_connection_factory(url, timeout):
+    plan = build_connection_plan(url)  # parse-once, at build time
+
+    async def _connection_factory():
+        try:
+            return await _connect(plan, plan.primary_host, plan.primary_port, timeout)
+        except TimeoutError:
+            if not plan.failover:
+                raise
+            logger.warning("Failed to fetch asyncpg connection. Trying host by host.")
+        for host, port in _reshuffled(plan.failover):  # per-call re-shuffle
+            try:
+                return await _connect(plan, host, port, timeout)
+            except (TimeoutError, OSError, asyncpg.TargetServerAttributeNotMatched) as exc:
+                logger.warning("Failed to fetch asyncpg connection from %s, %s", host, exc)
+        raise asyncpg.TargetServerAttributeNotMatched(
+            f"None of the hosts match the target attribute requirement {plan.target_session_attrs}"
+        )
+
+    return _connection_factory
+```
+
+`_connect(plan, host, port, timeout)` is the single `asyncpg.connect(...)` call:
+
+```python
+async def _connect(plan, host, port, timeout):
+    return await asyncpg.connect(
+        **plan.connect_args, host=host, port=port,
+        timeout=timeout, target_session_attrs=plan.target_session_attrs,
+    )
+```
+
+`_reshuffled(failover)` returns a freshly-shuffled copy each call (the per-call
+failover re-shuffle that lives in the loop, not the plan).
+
+## Testing
+
+- **New** mock-free `build_connection_plan` tests in
+  `tests/test_connection_factory.py`:
+  - multi-host: `set(failover) == {(h1,p1),(h2,p2)}`;
+    `list(zip(primary_host, primary_port)) == list(failover)` (pairing preserved);
+    `target_session_attrs` wrapped to `SessionAttribute`;
+    `connect_args` has no `host`/`port`/`target_session_attrs`.
+  - single-host: `failover == ()`; scalar `primary_host`/`primary_port`; tsa.
+  - These assert order-independent properties — no seed, no `asyncpg.connect` mock.
+- **Keep** the four existing loop tests (they mock `asyncpg.connect` and prove
+  the failover orchestration: success, single-host timeout reraise, multi-host
+  all-fail → `TargetServerAttributeNotMatched`, primary-fail-then-failover
+  success). Plan tests and loop tests cover different things (parsing vs
+  orchestration) — complementary, not duplicative.
+- `just test` (Docker Postgres) green; `just lint-ci` clean.
+
+## Risk
+
+- **Low. Behaviour-preserving refactor.** The parse/zip/shuffle/branch logic and
+  the connect kwargs are moved verbatim; the two-stage loop and its exception
+  handling are unchanged. Risk is accidental drift in the connect-arg assembly or
+  the single/multi branch — mitigated by writing the mock-free plan tests first
+  (they pin the pairing and branch) and keeping the four loop tests as a
+  live/mock backstop on orchestration.
+- **Shuffle lifetimes**: the per-call failover re-shuffle must stay in the loop
+  (not the once-built plan), or load-balancing behaviour changes — called out
+  explicitly so the implementer does not fold it into `build_connection_plan`.
diff --git a/planning/changes/2026-06-26.03-connection-plan-split/plan.md b/planning/changes/2026-06-26.03-connection-plan-split/plan.md
new file mode 100644
index 0000000..11543aa
--- /dev/null
+++ b/planning/changes/2026-06-26.03-connection-plan-split/plan.md
@@ -0,0 +1,144 @@
+# connection-plan-split — implementation plan
+
+> **For agentic workers:** REQUIRED SUB-SKILL: Use
+> superpowers:subagent-driven-development (recommended) or
+> superpowers:executing-plans to implement this plan task-by-task. Steps
+> use checkbox (`- [ ]`) syntax for tracking. New behaviour/tests are TDD —
+> failing test first.
+
+**Goal:** Extract a pure `build_connection_plan(url) -> ConnectionPlan` out of
+`build_connection_factory`, leaving a thin failover loop with a single `_connect`
+helper — parsing/ordering testable without mocking `asyncpg.connect`, connect
+call written once, behaviour unchanged.
+
+**Spec:** [`design.md`](./design.md)
+
+**Branch:** `feat/connection-plan-split`
+
+**Commit strategy:** Per-task commits.
+
+---
+
+### Task 1: Pure plan + mock-free tests, TDD
+
+**Files:**
+- Modify: `tests/test_connection_factory.py` (add plan tests)
+- Modify: `db_retry/connections.py` (add `ConnectionPlan` + `build_connection_plan`)
+
+Introduce the pure plan, built test-first. Do NOT yet rewire
+`build_connection_factory` (Task 2) — add the new code alongside the existing
+closure.
+
+- [ ] **Step 1: RED — write the plan tests**
+
+  In `tests/test_connection_factory.py`, add mock-free tests importing
+  `build_connection_plan` (and `ConnectionPlan`) from `db_retry.connections`:
+  - multi-host DSN (`host=host1:5432&host=host2:5432`, with a
+    `target_session_attrs`): assert `set(plan.failover) == {("host1",5432),("host2",5432)}`;
+    `list(zip(plan.primary_host, plan.primary_port)) == list(plan.failover)`
+    (host/port stay paired through the shuffle); `plan.target_session_attrs ==
+    SessionAttribute("read-write")` (or whatever the DSN sets); `"host"`,
+    `"port"`, `"target_session_attrs"` are NOT in `plan.connect_args`.
+  - single-host DSN: `plan.failover == ()`; `plan.primary_host == "host"` (scalar);
+    `plan.primary_port` is the scalar/None; tsa correct.
+  Run `uv run pytest tests/test_connection_factory.py -k plan` → fails
+  (ImportError / AttributeError). No DB needed for these.
+
+- [ ] **Step 2: GREEN — write the plan**
+
+  In `db_retry/connections.py` add the `ConnectionPlan` dataclass
+  (`kw_only=True, frozen=True, slots=True`, `connect_args: Mapping`,
+  `target_session_attrs`, `primary_host`, `primary_port`, `failover`) and the
+  pure `build_connection_plan(url)` per the spec: `create_connect_args`, pop +
+  wrap tsa, pop host/port, multi-host → `zip(strict=True)` + `random.shuffle`
+  once → derive primary + failover from the same shuffled pairs; single-host →
+  scalars + `failover=()`. No timeout, no logging, no `asyncpg.connect`.
+
+  Run `uv run pytest tests/test_connection_factory.py -k plan` → pass. No mock.
+
+- [ ] **Step 3: Commit**
+
+  ```bash
+  git add db_retry/connections.py tests/test_connection_factory.py
+  git commit -m "feat: extract pure build_connection_plan from connection factory"
+  ```
+
+---
+
+### Task 2: Rewire the factory onto the plan + single `_connect`
+
+**Files:**
+- Modify: `db_retry/connections.py`
+
+Replace the factory's inline parse + duplicated connect with the plan + a single
+`_connect` helper. Behaviour-preserving.
+
+- [ ] **Step 1: Edit**
+
+  - Add `async def _connect(plan, host, port, timeout)` — the single
+    `asyncpg.connect(**plan.connect_args, host=host, port=port, timeout=timeout,
+    target_session_attrs=plan.target_session_attrs)` call.
+  - Add `_reshuffled(failover)` — returns a freshly shuffled copy each call
+    (e.g. `random.sample(failover, len(failover))`), preserving the per-call
+    failover re-shuffle.
+  - Rewrite `build_connection_factory`: `plan = build_connection_plan(url)` once;
+    the closure tries `_connect(plan, plan.primary_host, plan.primary_port,
+    timeout)`, on `TimeoutError` reraises if `not plan.failover` else logs and
+    falls through; iterates `_reshuffled(plan.failover)` calling `_connect`,
+    swallowing `(TimeoutError, OSError, asyncpg.TargetServerAttributeNotMatched)`
+    with the per-host warning; raises `TargetServerAttributeNotMatched` naming
+    `plan.target_session_attrs` if all fail.
+  - Remove the now-inlined parse code and the duplicated connect. Drop imports
+    that are now only used inside `build_connection_plan` if they end up unused
+    at module scope (most stay). Keep the warning log strings verbatim.
+  - Keep the per-call re-shuffle in the loop — do NOT move it into
+    `build_connection_plan`.
+
+- [ ] **Step 2: Verify**
+
+  - `just test` (Docker Postgres) → all green, including the four existing loop
+    tests (success / single-host reraise / multi-host all-fail / fail-then-success).
+  - `just lint-ci` → clean.
+
+- [ ] **Step 3: Commit**
+
+  ```bash
+  git add db_retry/connections.py
+  git commit -m "refactor: drive connection factory through ConnectionPlan and one _connect"
+  ```
+
+---
+
+### Task 3: Promote the architecture doc + finalize bundle
+
+**Files:**
+- Modify: `architecture/connections.md`
+- Modify: `planning/changes/2026-06-26.03-connection-plan-split/design.md` (summary)
+
+- [ ] **Step 1: Edit `architecture/connections.md`**
+
+  Document the split: `build_connection_plan` → `ConnectionPlan` (primary +
+  failover, host/port paired through the shuffle), the single `_connect` helper,
+  and that the per-call failover re-shuffle lives in the loop while the bulk
+  shuffle is computed once at build. Keep the two-stage rationale and the
+  signature of `build_connection_factory` (unchanged public surface). No
+  frontmatter.
+
+- [ ] **Step 2: Finalize summary + verify planning**
+
+  Set `design.md` `summary:` to the realized result. `just check-planning` →
+  `planning: OK`.
+
+- [ ] **Step 3: Commit**
+
+  ```bash
+  git add architecture/connections.md planning/
+  git commit -m "docs: promote connection-plan split into architecture/connections.md"
+  ```
+
+---
+
+### Task 4: Ship
+
+- [ ] **Step 1:** `just lint-ci` and `just test` both green.
+- [ ] **Step 2:** Push `feat/connection-plan-split`, open PR, watch CI.
From 011f4ce9dfeaf3248c63a03f08b154080030c9c3 Mon Sep 17 00:00:00 2001
From: Artur Shiriev
Date: Fri, 26 Jun 2026 23:36:37 +0300
Subject: [PATCH 2/5] feat: extract pure build_connection_plan from connection factory

---
 db_retry/connections.py | 40 ++++++++++++++++++++++++++++++++
 tests/test_connection_factory.py | 31 ++++++++++++++++++++++++-
 2 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/db_retry/connections.py b/db_retry/connections.py
index 4198b43..1f4c765 100644
--- a/db_retry/connections.py
+++ b/db_retry/connections.py
@@ -1,3 +1,4 @@
+import dataclasses
 import logging
 import random
 import typing
@@ -16,6 +17,45 @@
 logger = logging.getLogger(__name__)
 
 
+@dataclasses.dataclass(kw_only=True, frozen=True, slots=True)
+class ConnectionPlan:
+    connect_args: typing.Mapping[str, typing.Any]
+    target_session_attrs: SessionAttribute | None
+    primary_host: str | list[str]
+    primary_port: int | list[int] | None
+    failover: tuple[tuple[str, int], ...]
+
+
+def build_connection_plan(url: sqlalchemy.URL) -> ConnectionPlan:
+    connect_args: dict[str, typing.Any] = PGDialect_asyncpg().create_connect_args(url)[1]
+    raw_target_session_attrs: str | None = connect_args.pop("target_session_attrs", None)
+    target_session_attrs: SessionAttribute | None = (
+        SessionAttribute(raw_target_session_attrs) if raw_target_session_attrs else None
+    )
+    raw_hosts: str | list[str] = connect_args.pop("host")
+    raw_ports: int | list[int] | None = connect_args.pop("port", None)
+    primary_host: str | list[str]
+    primary_port: int | list[int] | None
+    failover: tuple[tuple[str, int], ...]
+    if isinstance(raw_hosts, list) and isinstance(raw_ports, list):
+        hosts_and_ports: list[tuple[str, int]] = list(zip(raw_hosts, raw_ports, strict=True))
+        random.shuffle(hosts_and_ports)
+        primary_host = list(map(itemgetter(0), hosts_and_ports))
+        primary_port = list(map(itemgetter(1), hosts_and_ports))
+        failover = tuple(hosts_and_ports)
+    else:
+        primary_host = raw_hosts
+        primary_port = raw_ports
+        failover = ()
+    return ConnectionPlan(
+        connect_args=connect_args,
+        target_session_attrs=target_session_attrs,
+        primary_host=primary_host,
+        primary_port=primary_port,
+        failover=failover,
+    )
+
+
 def build_connection_factory(
     url: sqlalchemy.URL,
     timeout: float,
diff --git a/tests/test_connection_factory.py b/tests/test_connection_factory.py
index 507f062..74b994a 100644
--- a/tests/test_connection_factory.py
+++ b/tests/test_connection_factory.py
@@ -5,9 +5,10 @@
 import asyncpg
 import pytest
 import sqlalchemy
+from asyncpg.connect_utils import SessionAttribute
 from sqlalchemy.ext import asyncio as sa_async
 
-from db_retry.connections import build_connection_factory
+from db_retry.connections import ConnectionPlan, build_connection_factory, build_connection_plan
 
 
 async def test_connection_factory_success() -> None:
@@ -63,3 +64,31 @@ async def test_connection_factory_failure_and_success(monkeypatch: pytest.Monkey
     factory: typing.Final = build_connection_factory(url=url, timeout=1.0)
     result = await factory()
     assert result is mock_connection
+
+
+def test_build_connection_plan_multihost() -> None:
+    url: typing.Final = sqlalchemy.make_url(
+        "postgresql+asyncpg://user:password@/database?host=host1:5432&host=host2:5432&target_session_attrs=read-write"
+    )
+    plan: typing.Final[ConnectionPlan] = build_connection_plan(url)
+    assert set(plan.failover) == {("host1", 5432), ("host2", 5432)}
+    assert isinstance(plan.primary_host, list)
+    assert isinstance(plan.primary_port, list)
+    assert list(zip(plan.primary_host, plan.primary_port, strict=True)) == list(plan.failover)
+    assert plan.target_session_attrs == SessionAttribute("read-write")
+    assert "host" not in plan.connect_args
+    assert "port" not in plan.connect_args
+    assert "target_session_attrs" not in plan.connect_args
+
+
+def test_build_connection_plan_single_host() -> None:
+    port: typing.Final = 5432
+    url: typing.Final = sqlalchemy.make_url(f"postgresql+asyncpg://user:password@host1:{port}/database")
+    plan: typing.Final[ConnectionPlan] = build_connection_plan(url)
+    assert plan.failover == ()
+    assert plan.primary_host == "host1"
+    assert plan.primary_port == port
+    assert plan.target_session_attrs is None
+    assert "host" not in plan.connect_args
+    assert "port" not in plan.connect_args
+    assert "target_session_attrs" not in plan.connect_args
From a224c268ab10b2e4acde0019b17cb20300740ca6 Mon Sep 17 00:00:00 2001
From: Artur Shiriev
Date: Fri, 26 Jun 2026 23:44:18 +0300
Subject: [PATCH 3/5] refactor: drive connection factory through ConnectionPlan and one _connect

---
 db_retry/connections.py | 70 +++++++++++++++--------------------------
 1 file changed, 26 insertions(+), 44 deletions(-)

diff --git a/db_retry/connections.py b/db_retry/connections.py
index 1f4c765..45be158 100644
--- a/db_retry/connections.py
+++ b/db_retry/connections.py
@@ -56,64 +56,46 @@ def build_connection_plan(url: sqlalchemy.URL) -> ConnectionPlan:
     )
 
 
+async def _connect(
+    plan: ConnectionPlan,
+    host: str | list[str],
+    port: int | list[int] | None,
+    timeout: float,  # noqa: ASYNC109
+) -> "ConnectionType":
+    return await asyncpg.connect(
+        **plan.connect_args,
+        host=host,
+        port=port,
+        timeout=timeout,
+        target_session_attrs=plan.target_session_attrs,
+    )
+
+
+def _reshuffled(failover: tuple[tuple[str, int], ...]) -> list[tuple[str, int]]:
+    return random.sample(failover, len(failover))
+
+
 def build_connection_factory(
     url: sqlalchemy.URL,
     timeout: float,
 ) -> typing.Callable[[], typing.Awaitable["ConnectionType"]]:
-    connect_args: typing.Final[dict[str, typing.Any]] = PGDialect_asyncpg().create_connect_args(url)[1]
-    raw_target_session_attrs: typing.Final[str | None] = connect_args.pop("target_session_attrs", None)
-    target_session_attrs: typing.Final[SessionAttribute | None] = (
-        SessionAttribute(raw_target_session_attrs) if raw_target_session_attrs else None
-    )
-
-    raw_hosts: typing.Final[str | list[str]] = connect_args.pop("host")
-    raw_ports: typing.Final[int | list[int] | None] = connect_args.pop("port", None)
-    hosts_and_ports: list[tuple[str, int]]
-    hosts: str | list[str]
-    ports: int | list[int] | None
-    if isinstance(raw_hosts, list) and isinstance(raw_ports, list):
-        hosts_and_ports = list(zip(raw_hosts, raw_ports, strict=True))
-        random.shuffle(hosts_and_ports)
-        hosts = list(map(itemgetter(0), hosts_and_ports))
-        ports = list(map(itemgetter(1), hosts_and_ports))
-    else:
-        hosts_and_ports = []
-        hosts = raw_hosts
-        ports = raw_ports
+    plan: typing.Final = build_connection_plan(url)
 
     async def _connection_factory() -> "ConnectionType":
-        connection: ConnectionType
-        nonlocal hosts_and_ports
         try:
-            connection = await asyncpg.connect(
-                **connect_args,
-                host=hosts,
-                port=ports,
-                timeout=timeout,
-                target_session_attrs=target_session_attrs,
-            )
-            return connection  # noqa: TRY300
+            return await _connect(plan, plan.primary_host, plan.primary_port, timeout)
         except TimeoutError:
-            if not hosts_and_ports:
+            if not plan.failover:
                 raise
             logger.warning("Failed to fetch asyncpg connection. Trying host by host.")
 
-        hosts_and_ports_copy: typing.Final = hosts_and_ports.copy()
-        random.shuffle(hosts_and_ports_copy)
-        for one_host, one_port in hosts_and_ports_copy:
+        for host, port in _reshuffled(plan.failover):
             try:
-                connection = await asyncpg.connect(
-                    **connect_args,
-                    host=one_host,
-                    port=one_port,
-                    timeout=timeout,
-                    target_session_attrs=target_session_attrs,
-                )
-                return connection  # noqa: TRY300
+                return await _connect(plan, host, port, timeout)
             except (TimeoutError, OSError, asyncpg.TargetServerAttributeNotMatched) as exc:
-                logger.warning("Failed to fetch asyncpg connection from %s, %s", one_host, exc)
+                logger.warning("Failed to fetch asyncpg connection from %s, %s", host, exc)
 
-        msg: typing.Final = f"None of the hosts match the target attribute requirement {target_session_attrs}"
+        msg: typing.Final = f"None of the hosts match the target attribute requirement {plan.target_session_attrs}"
         raise asyncpg.TargetServerAttributeNotMatched(msg)
 
     return _connection_factory
From 0ab9a6cf21d6b14c6dcedd78521ef7331754f551 Mon Sep 17 00:00:00 2001
From: Artur Shiriev
Date: Fri, 26 Jun 2026 23:51:47 +0300
Subject: [PATCH 4/5] docs: promote connection-plan split into architecture/connections.md

Co-Authored-By: Claude Sonnet 4.6
---
 architecture/connections.md | 66 +++++++++++++------
 .../design.md | 2 +-
 2 files changed, 47 insertions(+), 21 deletions(-)

diff --git a/architecture/connections.md b/architecture/connections.md
index 0e6cb28..531bd67 100644
--- a/architecture/connections.md
+++ b/architecture/connections.md
@@ -12,37 +12,63 @@ build_connection_factory(url: sqlalchemy.URL, timeout: float)
   -> Callable[[], Awaitable[asyncpg.Connection]]
 ```
 
-The `url` is translated into asyncpg connect args **once**, at factory-build
-time, via `PGDialect_asyncpg().create_connect_args(url)`. `target_session_attrs`
-is popped from those args and, if present, wrapped in asyncpg's
-`SessionAttribute` enum — so a `target_session_attrs` carried on the DSN (e.g.
-`read-write`/`prefer-standby` set by [`build_db_dsn`](dsn.md)) **is honored**,
-not discarded.
+`build_connection_factory` is the only public symbol of this capability;
+`build_connection_plan` and `ConnectionPlan` are **internal seams** (not in
+`__all__`) and not part of the public API.
 
-## Host handling
+## Connection plan (internal)
 
-`host` and `port` are popped from the connect args:
+`build_connection_plan(url)` is the pure parse phase — no I/O, no logging, no
+`asyncpg.connect`. It calls `PGDialect_asyncpg().create_connect_args(url)` once,
+pops `host`/`port`/`target_session_attrs` from the result, and returns a frozen,
+slotted `ConnectionPlan`:
 
-- **Multi-host** (both are lists): they are zipped into `(host, port)` pairs
-  (`strict=True` — lengths must match), the pair list is `random.shuffle`d for
-  load balancing, then split back into parallel `hosts`/`ports` lists. The
-  shuffled pair list is also retained for the failover path.
-- **Single-host** (scalars): used as-is; the retained pair list is empty.
+```python
+class ConnectionPlan:
+    connect_args: Mapping[str, Any]  # base kwargs, minus host/port/target_session_attrs
+    target_session_attrs: SessionAttribute | None
+    primary_host: str | list[str]  # list for multi-host, scalar for single
+    primary_port: int | list[int] | None
+    failover: tuple[tuple[str, int], ...]  # per-host pairs; () for single-host
+```
+
+`target_session_attrs` (e.g. `read-write`/`prefer-standby` set by
+[`build_db_dsn`](dsn.md)) is wrapped in asyncpg's `SessionAttribute` enum so it
+is **honored**, not discarded.
+
+**Multi-host** (both `host` and `port` are lists): the hosts and ports are zipped
+(`strict=True` — lengths must match), the resulting `(host, port)` pairs are
+`random.shuffle`d **once** at build time, then split back into `primary_host` and
+`primary_port` lists and also stored as `failover`. Key invariant:
+`list(zip(primary_host, primary_port)) == list(failover)` — primary and failover
+come from the **same** shuffle, so host/port stay paired throughout.
+
+**Single-host** (scalars): `primary_host`/`primary_port` are the scalars;
+`failover = ()`.
 
 ## Connect and failover
 
-The returned `_connection_factory`:
+`build_connection_factory` calls `build_connection_plan(url)` once at build time,
+then returns `_connection_factory`, a thin async loop that consumes the plan via a
+single `_connect(plan, host, port, timeout)` helper (one `asyncpg.connect` call
+site):
 
-1. Attempts one `asyncpg.connect(...)` against the full host/port set with the
-   given `timeout` and `target_session_attrs`. On success, returns immediately.
-2. On `TimeoutError`, if there is no multi-host pair list it re-raises. With a
-   pair list, it logs a warning and falls through to host-by-host probing.
-3. Re-shuffles a copy of the pairs and tries each `(host, port)` individually,
-   swallowing `TimeoutError`, `OSError`, and `asyncpg.TargetServerAttributeNotMatched`
+1. Attempts `_connect` against `plan.primary_host`/`plan.primary_port` with the
+   given `timeout`. On success, returns immediately.
+2. On `TimeoutError`, if `plan.failover` is empty (single-host), re-raises. With a
+   failover list, logs a warning and falls through.
+3. Calls `_reshuffled(plan.failover)` — a **per-call** shuffle of a copy of the
+   failover pairs — and tries each `(host, port)` individually, swallowing
+   `TimeoutError`, `OSError`, and `asyncpg.TargetServerAttributeNotMatched`
    (logging a warning per failed host) and returning the first that connects.
 4. If every host fails, raises `asyncpg.TargetServerAttributeNotMatched` naming
    the unmet `target_session_attrs`.
 
+**Two shuffle lifetimes**: the bulk/primary order is shuffled **once** when
+`build_connection_plan` runs (at factory-build time); the failover list is
+re-shuffled **per connection** in the loop via `_reshuffled`. These are distinct:
+the plan is immutable; the per-call re-shuffle is ephemeral.
+
 The two-stage design lets the fast path use asyncpg's own multi-host attempt,
 and only pays the per-host cost when the bulk attempt times out — typically when
 `target_session_attrs` (e.g. `read-write`) excludes some hosts.
diff --git a/planning/changes/2026-06-26.03-connection-plan-split/design.md b/planning/changes/2026-06-26.03-connection-plan-split/design.md
index 45418eb..61fb2b0 100644
--- a/planning/changes/2026-06-26.03-connection-plan-split/design.md
+++ b/planning/changes/2026-06-26.03-connection-plan-split/design.md
@@ -1,5 +1,5 @@
 ---
-summary: Split build_connection_factory into a pure build_connection_plan(url) -> ConnectionPlan and a thin failover loop, so host-plan parsing/ordering is unit-tested without mocking asyncpg.connect and the connect call is written once.
+summary: Extracted build_connection_plan(url) -> ConnectionPlan as a pure internal seam and reduced build_connection_factory to a thin failover loop, eliminating the duplicated asyncpg.connect call and enabling mock-free unit tests of host-plan parsing.
 ---
 
 # Design: Split the connection factory into plan + failover loop
From 6b9a3a031d10a63adda6641ef3d1e3272f4c61d0 Mon Sep 17 00:00:00 2001
From: Artur Shiriev
Date: Fri, 26 Jun 2026 23:56:15 +0300
Subject: [PATCH 5/5] docs: describe _reshuffled as random.sample precisely

Co-Authored-By: Claude Opus 4.8 (1M context)
---
 architecture/connections.md | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/architecture/connections.md b/architecture/connections.md
index 531bd67..6c52e59 100644
--- a/architecture/connections.md
+++ b/architecture/connections.md
@@ -57,8 +57,9 @@ site):
    given `timeout`. On success, returns immediately.
 2. On `TimeoutError`, if `plan.failover` is empty (single-host), re-raises. With a
    failover list, logs a warning and falls through.
-3. Calls `_reshuffled(plan.failover)` — a **per-call** shuffle of a copy of the
-   failover pairs — and tries each `(host, port)` individually, swallowing
+3. Calls `_reshuffled(plan.failover)` — `random.sample(...)` returns a
+   **per-call** freshly-shuffled list, leaving the plan's `failover` tuple
+   untouched — and tries each `(host, port)` individually, swallowing
    `TimeoutError`, `OSError`, and `asyncpg.TargetServerAttributeNotMatched`
    (logging a warning per failed host) and returning the first that connects.
 4. If every host fails, raises `asyncpg.TargetServerAttributeNotMatched` naming
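Reviewer note (not part of the patch): the two-stage flow documented in `architecture/connections.md` can be exercised end to end with a fake connector. The sketch below is an assumption-laden illustration, not the project's code: `fake_connect` and `make_factory` are hypothetical names, the fake treats any list-valued host as a timed-out bulk attempt, and `RuntimeError` stands in for `asyncpg.TargetServerAttributeNotMatched` so the block runs without asyncpg installed.

```python
import asyncio
import random


async def fake_connect(host, port):
    """Hypothetical stand-in for asyncpg.connect; only host2:5432 is reachable."""
    if isinstance(host, list):
        raise TimeoutError("bulk attempt timed out")
    if (host, port) != ("host2", 5432):
        raise OSError(f"{host}:{port} unreachable")
    return f"connection to {host}:{port}"


def make_factory(primary_host, primary_port, failover):
    async def factory():
        # Stage 1: one bulk attempt against the full primary host/port set.
        try:
            return await fake_connect(primary_host, primary_port)
        except TimeoutError:
            if not failover:
                raise  # single-host: nothing to fall back to
        # Stage 2: per-call re-shuffle; random.sample returns a new list,
        # so the failover tuple itself is never mutated.
        for host, port in random.sample(failover, len(failover)):
            try:
                return await fake_connect(host, port)
            except (TimeoutError, OSError):
                continue
        # RuntimeError is a placeholder for TargetServerAttributeNotMatched.
        raise RuntimeError("no host accepted the connection")

    return factory


failover = (("host1", 5432), ("host2", 5432), ("host3", 5432))
factory = make_factory(["host1", "host2", "host3"], [5432, 5432, 5432], failover)
result = asyncio.run(factory())
assert result == "connection to host2:5432"
```

Whatever order `random.sample` produces, the loop skips the unreachable hosts and returns the one that connects, and the `failover` tuple is the same object with the same contents after the call.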