From a80dc737c3954d3e4cf3a93af0beda138deca1c1 Mon Sep 17 00:00:00 2001
From: Mohamad Karim
Date: Tue, 5 May 2026 22:58:26 +0330
Subject: [PATCH] feat: add exit node health checks and auto-failover
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Monitors all configured exit node URLs in the background and automatically
switches to the next healthy URL when one goes down, then switches back to
primary when it recovers.

- Add _exit_node_health_loop: background task that pings all exit node URLs
  every health_check_interval seconds (default 30s)
- Add _ping_exit_node: lightweight GET to detect reachability
- Add _record_exit_node_failure / _record_exit_node_success: track consecutive
  failures per URL with cooldown (2x interval)
- Add _try_exit_node_failover: switches _exit_node_url to next alive URL;
  clears it to "" when all are down so traffic silently falls back to Apps Script
- Add _record_exit_node_success recovery: restores _exit_node_url from all-down
  state and switches back to primary when it recovers
- Support urls[] list in exit_node config for fallback URLs
- Promote first entry of urls[] if url field is empty
- Bump version to 1.2.0

Config additions (all optional, backward-compatible):
  exit_node.urls — fallback URL list
  exit_node.health_check_interval — default 30s (min 10s)
  exit_node.health_check_failures_before_failover — default 3
---
 .claude/settings.json       |   7 ++
 .claude/settings.local.json |   7 ++
 .gitignore                  |   1 +
 README.md                   |  23 +++++
 config.example.json         |   5 +-
 src/core/constants.py       |   2 +-
 src/relay/domain_fronter.py | 199 +++++++++++++++++++++++++++++++++++-
 7 files changed, 240 insertions(+), 4 deletions(-)
 create mode 100644 .claude/settings.json
 create mode 100644 .claude/settings.local.json

diff --git a/.claude/settings.json b/.claude/settings.json
new file mode 100644
index 0000000..993d248
--- /dev/null
+++ b/.claude/settings.json
@@ -0,0 +1,7 @@
+{
+  "permissions": {
+    "allow": [
+      "Bash(python -c \"import sys; sys.path.insert\\(0,'src'\\); import relay.domain_fronter; import proxy.proxy_server; print\\('Full import OK'\\)\")"
+    ]
+  }
+}
diff --git a/.claude/settings.local.json b/.claude/settings.local.json
new file mode 100644
index 0000000..c3b3be6
--- /dev/null
+++ b/.claude/settings.local.json
@@ -0,0 +1,7 @@
+{
+  "permissions": {
+    "allow": [
+      "Bash(python -c \"import sys; sys.path.insert\\(0,'src'\\); from relay.domain_fronter import DomainFronter; print\\('OK'\\)\")"
+    ]
+  }
+}
diff --git a/.gitignore b/.gitignore
index 22162c8..05a49d6 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,3 +37,4 @@ domainfront_certs_*/
 
 # Local scripts (excluded from git pushes)
 scripts/
+ideas.md
diff --git a/README.md b/README.md
index 8778806..c723407 100644
--- a/README.md
+++ b/README.md
@@ -216,6 +216,29 @@ Notes:
 - `mode: "selective"` = only domains in `hosts` go through exit node.
 - `psk` must exactly match your deployed exit node secret.
 
+**Failover (multiple exit nodes):** Add a `urls` list to configure fallback exit nodes.
+The proxy monitors all of them with periodic health checks and automatically switches to the next available URL when one goes down:
+```json
+"exit_node": {
+  "enabled": true,
+  "provider": "cloudflare",
+  "url": "https://PRIMARY-WORKER.YOUR-SUBDOMAIN.workers.dev",
+  "urls": [
+    "https://PRIMARY-WORKER.YOUR-SUBDOMAIN.workers.dev",
+    "https://FALLBACK-WORKER.YOUR-SUBDOMAIN.workers.dev"
+  ],
+  "psk": "CHANGE_ME_TO_A_STRONG_SECRET",
+  "mode": "full",
+  "health_check_interval": 30,
+  "health_check_failures_before_failover": 3
+}
+```
+
+- `urls` — ordered list of exit node URLs; primary first, fallbacks after.
+- `health_check_interval` — seconds between health checks (default: `30`).
+- `health_check_failures_before_failover` — consecutive failures before a URL is marked dead and the next one is tried (default: `3`).
+- When the primary URL recovers, the proxy automatically switches back to it.
+
 Production recommendation:
 - Keep `verify_ssl: true`
 - Keep `listen_host: 127.0.0.1` unless LAN sharing is explicitly needed
diff --git a/config.example.json b/config.example.json
index 3539dec..9a0dbb7 100644
--- a/config.example.json
+++ b/config.example.json
@@ -89,11 +89,14 @@
     "enabled": false,
     "provider": "cloudflare",
     "url": "",
+    "urls": [],
     "psk": "",
     "mode": "full",
     "hosts": [
       "example.com",
       "example.org"
-    ]
+    ],
+    "health_check_interval": 30,
+    "health_check_failures_before_failover": 3
   }
 }
diff --git a/src/core/constants.py b/src/core/constants.py
index 766fde2..b35e635 100644
--- a/src/core/constants.py
+++ b/src/core/constants.py
@@ -8,7 +8,7 @@ from __future__ import annotations
 
 # ── Version ───────────────────────────────────────────────────────────────
 
-__version__ = "1.1.0"
+__version__ = "1.2.0"
 
 
 # ── Size caps ───────────────────────────────────────────────────────────────
diff --git a/src/relay/domain_fronter.py b/src/relay/domain_fronter.py
index ec208ca..7d6b767 100644
--- a/src/relay/domain_fronter.py
+++ b/src/relay/domain_fronter.py
@@ -228,6 +228,27 @@ def __init__(self, config: dict):
             for h in (en_cfg.get("hosts") or [])
             if h
         )
+
+        # Health check & auto-failover state.
+        # _exit_node_urls holds the full ordered list (primary first, then fallbacks).
+        # _exit_node_url always points to the *currently active* URL and is updated
+        # on failover so the rest of the relay code needs no changes.
+        self._exit_node_urls: list[str] = self._build_exit_node_url_list(
+            self._exit_node_url, en_cfg,
+        )
+        # If url was empty but urls[] has entries, promote the first entry.
+        if not self._exit_node_url and self._exit_node_urls:
+            self._exit_node_url = self._exit_node_urls[0]
+        self._exit_node_failures: dict[str, int] = {}      # url → consecutive failures
+        self._exit_node_dead_until: dict[str, float] = {}  # url → cooldown timestamp
+        self._health_check_interval: float = max(
+            10.0, float(en_cfg.get("health_check_interval", 30)),
+        )
+        self._health_check_max_failures: int = max(
+            1, int(en_cfg.get("health_check_failures_before_failover", 3)),
+        )
+        self._exit_node_health_task: asyncio.Task | None = None
+
         if self._exit_node_enabled and self._exit_node_url:
             log.info(
                 "Exit node enabled [mode=%s, provider=%s]: %s",
@@ -235,6 +256,16 @@ def __init__(self, config: dict):
                 self._exit_node_provider,
                 self._exit_node_url,
             )
+            if len(self._exit_node_urls) > 1:
+                log.info(
+                    "Exit node failover pool: %d URLs (health-check every %.0fs, "
+                    "failover after %d failures)",
+                    len(self._exit_node_urls),
+                    self._health_check_interval,
+                    self._health_check_max_failures,
+                )
+                for i, u in enumerate(self._exit_node_urls):
+                    log.info("  [%d] %s", i + 1, u)
         elif self._exit_node_enabled:
             log.warning(
                 "Exit node is enabled but no URL is configured for provider '%s'",
@@ -886,6 +917,12 @@ async def _warm_pool(self):
             self._stats_task = self._spawn(self._stats_logger())
         if self._execution_task is None:
             self._execution_task = self._spawn(self._execution_logger())
+        # Exit node health checker — runs for any configured exit node so that
+        # a single dead URL can also recover automatically via the loop.
+        if (self._exit_node_enabled
+                and len(self._exit_node_urls) >= 1
+                and self._exit_node_health_task is None):
+            self._exit_node_health_task = self._spawn(self._exit_node_health_loop())
         # Start H2 connection (runs alongside H1 pool)
         if self._h2:
             self._spawn(self._h2_connect_and_warm())
@@ -932,6 +969,7 @@ async def close(self):
         self._stats_task = None
         self._execution_task = None
         self._keepalive_task = None
+        self._exit_node_health_task = None
 
         await self._flush_pool()
 
@@ -1172,6 +1210,159 @@ def _pick_from(mapping: dict[str, object], *keys: str) -> str:
         # Backward compatibility for older config format.
         return _pick_from(en_cfg, "relay_url")
 
+    @staticmethod
+    def _build_exit_node_url_list(primary: str, en_cfg: dict) -> list[str]:
+        """Return ordered list of exit node URLs: primary first, then fallbacks."""
+        urls: list[str] = []
+        seen: set[str] = set()
+        if primary:
+            urls.append(primary)
+            seen.add(primary)
+        for u in (en_cfg.get("urls") or []):
+            u = str(u).strip().rstrip("/")
+            if u and u not in seen:
+                urls.append(u)
+                seen.add(u)
+        return urls
+
+    def _is_exit_node_dead(self, url: str) -> bool:
+        """True if url is in cooldown after repeated failures."""
+        deadline = self._exit_node_dead_until.get(url, 0.0)
+        if deadline <= time.time():
+            if deadline:
+                self._exit_node_dead_until.pop(url, None)
+                self._exit_node_failures.pop(url, None)
+            return False
+        return True
+
+    def _record_exit_node_success(self, url: str) -> None:
+        was_failed = bool(self._exit_node_failures.get(url, 0)
+                          or self._exit_node_dead_until.get(url, 0.0))
+        self._exit_node_failures.pop(url, None)
+        self._exit_node_dead_until.pop(url, None)
+
+        if was_failed:
+            log.info("Exit node recovered: %s", url.split("//", 1)[-1][:70])
+
+        # Case 1: all nodes were down (_exit_node_url cleared) — restore with
+        # whichever URL responded first.
+        if not self._exit_node_url:
+            self._exit_node_url = url
+            log.info("Exit node restored after all-down: %s", url.split("//", 1)[-1][:60])
+            return
+
+        # Case 2: primary recovered while we were on a fallback — switch back.
+        if (self._exit_node_urls
+                and url == self._exit_node_urls[0]
+                and self._exit_node_url != self._exit_node_urls[0]):
+            self._exit_node_url = self._exit_node_urls[0]
+            log.info(
+                "Switched back to primary exit node: %s",
+                url.split("//", 1)[-1][:60],
+            )
+
+    def _record_exit_node_failure(self, url: str) -> None:
+        # Skip if already in cooldown — avoid inflated counts and log spam.
+        if self._exit_node_dead_until.get(url, 0.0) > time.time():
+            return
+        count = self._exit_node_failures.get(url, 0) + 1
+        self._exit_node_failures[url] = count
+        if count >= self._health_check_max_failures:
+            cooldown = self._health_check_interval * 2
+            self._exit_node_dead_until[url] = time.time() + cooldown
+            log.warning(
+                "Exit node marked dead for %.0fs after %d consecutive failures: %s",
+                cooldown, count, url[:70],
+            )
+            self._try_exit_node_failover(url)
+        else:
+            log.debug(
+                "Exit node failure %d/%d: %s",
+                count, self._health_check_max_failures, url[:70],
+            )
+
+    def _try_exit_node_failover(self, failed_url: str) -> None:
+        """Switch _exit_node_url to the next alive URL in the pool."""
+        if len(self._exit_node_urls) <= 1:
+            # No fallback — clear active URL so _exit_node_matches returns False
+            # until the health loop restores it after recovery.
+            self._exit_node_url = ""
+            log.error(
+                "Exit node %s is down and no fallback URLs are configured. "
+                "Traffic falls back to Apps Script until recovery.",
+                failed_url[:70],
+            )
+            return
+        for url in self._exit_node_urls:
+            if url != failed_url and not self._is_exit_node_dead(url):
+                self._exit_node_url = url
+                log.warning(
+                    "Exit node failover: %s → %s",
+                    failed_url.split("//", 1)[-1][:50],
+                    url.split("//", 1)[-1][:50],
+                )
+                return
+        # All URLs are dead — clear active so traffic bypasses silently.
+        self._exit_node_url = ""
+        log.error(
+            "All %d exit node URLs are down. Traffic falls back to Apps Script "
+            "until the health check restores a URL.",
+            len(self._exit_node_urls),
+        )
+
+    async def _ping_exit_node(self, url: str) -> bool:
+        """Send a lightweight GET to url. Returns True if any HTTP response arrives."""
+        try:
+            parsed = urlparse(url)
+            host = parsed.hostname or ""
+            port = parsed.port or (443 if parsed.scheme == "https" else 80)
+            use_ssl = parsed.scheme == "https"
+            timeout = min(self._health_check_interval / 3, 10.0)
+            ctx = self._ssl_ctx() if use_ssl else None
+            reader, writer = await asyncio.wait_for(
+                asyncio.open_connection(host, port, ssl=ctx),
+                timeout=timeout,
+            )
+            try:
+                writer.write(
+                    f"GET / HTTP/1.1\r\nHost: {host}\r\n"
+                    f"Connection: close\r\nUser-Agent: MasterHttpRelay/healthcheck\r\n\r\n"
+                    .encode()
+                )
+                await asyncio.wait_for(writer.drain(), timeout=timeout)
+                line = await asyncio.wait_for(reader.readline(), timeout=timeout)
+                return line.startswith(b"HTTP/")
+            finally:
+                writer.close()
+                try:
+                    await asyncio.wait_for(writer.wait_closed(), timeout=2.0)
+                except Exception:
+                    pass
+        except Exception as exc:
+            log.debug("Exit node ping failed (%s): %s", url.split("//", 1)[-1][:50], exc)
+            return False
+
+    async def _exit_node_health_loop(self) -> None:
+        """Background task: periodically ping all exit node URLs and failover as needed."""
+        # Initial delay so the first check doesn't race with startup.
+        await asyncio.sleep(self._health_check_interval)
+        while True:
+            try:
+                for url in list(self._exit_node_urls):
+                    alive = await self._ping_exit_node(url)
+                    if alive:
+                        self._record_exit_node_success(url)
+                    else:
+                        self._record_exit_node_failure(url)
+            except asyncio.CancelledError:
+                break
+            except Exception as exc:
+                log.debug("Exit node health loop error: %s", exc)
+            try:
+                await asyncio.sleep(self._health_check_interval)
+            except asyncio.CancelledError:
+                break
+
     def _exit_node_matches(self, url: str) -> bool:
         """Return True if this URL should be routed through the exit node."""
         if not self._exit_node_enabled or not self._exit_node_url:
@@ -1203,6 +1394,7 @@ async def _relay_via_exit_node(self, payload: dict) -> bytes:
         body of the outer Apps Script relay call, so Apps Script POSTs it to
         the exit node URL on our behalf.
         """
+        active_url = self._exit_node_url
         # Build inner payload: what the exit node will execute
         inner = dict(payload)
         inner["k"] = self._exit_node_psk
@@ -1212,7 +1404,7 @@ async def _relay_via_exit_node(self, payload: dict) -> bytes:
         # Apps Script does: UrlFetchApp.fetch(exit_node_url, { method: "POST", payload: inner_json })
         outer = self._build_payload(
             "POST",
-            self._exit_node_url,
+            active_url,
             {"Content-Type": "application/json"},
             inner_json,
         )
@@ -1221,7 +1413,7 @@
 
         log.debug(
             "Exit node chain: Apps Script → %s → %s",
-            self._exit_node_url.split("//", 1)[-1][:50],
+            active_url.split("//", 1)[-1][:50],
             payload.get("u", "")[:60],
         )
@@ -1237,6 +1429,7 @@ async def _relay_via_exit_node(self, payload: dict) -> bytes:
         _, _, apps_script_body = split_raw_response(raw)
         result = parse_relay_response(apps_script_body, self._max_response_body_bytes)
         log.debug("Exit node relay OK: %s", payload.get("u", "")[:80])
+        self._record_exit_node_success(active_url)
         return result
 
     # ── Apps Script relay (apps_script mode) ──────────────────────
@@ -1277,6 +1470,7 @@ async def relay(self, method: str, url: str,
         if self._exit_node_matches(url):
             t0 = time.perf_counter()
             errored = False
+            _active_en_url = self._exit_node_url
             try:
                 return await asyncio.wait_for(
                     self._relay_via_exit_node(payload),
                 )
             except Exception as exc:
                 errored = True
+                self._record_exit_node_failure(_active_en_url)
                 log.warning(
                     "Exit node failed for %s (%s: %s), falling back to Apps Script",
                     url[:60], type(exc).__name__, exc,