From 08817642e8a70528f29f74a74c2a60b5e99d755f Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Wed, 13 May 2026 07:28:12 +0800 Subject: [PATCH 1/7] Handle LongBridge account and submit failures locally --- application/execution_service.py | 118 ++++++++++++++++++-- application/longbridge_execution.py | 65 +++++++++++ application/longbridge_portfolio.py | 142 +++++++++++++++++++++++++ application/runtime_broker_adapters.py | 7 +- main.py | 9 +- tests/test_longbridge_local_helpers.py | 128 ++++++++++++++++++++++ tests/test_rebalance_service.py | 119 +++++++++++++++++++++ tests/test_runtime_broker_adapters.py | 2 +- 8 files changed, 578 insertions(+), 12 deletions(-) create mode 100644 application/longbridge_execution.py create mode 100644 application/longbridge_portfolio.py create mode 100644 tests/test_longbridge_local_helpers.py diff --git a/application/execution_service.py b/application/execution_service.py index 0239d1e..a644b6c 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -2,6 +2,7 @@ from __future__ import annotations +import math import traceback from collections.abc import Mapping from dataclasses import dataclass @@ -222,6 +223,7 @@ def execute_rebalance_cycle( quantities = dict(portfolio["quantities"]) sellable_quantities = dict(portfolio["sellable_quantities"]) target_values = dict(allocation["targets"]) + cash_sweep_symbol = str(portfolio.get("cash_sweep_symbol") or "").strip().upper() available_cash = float(portfolio["liquid_cash"]) cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency")) investable_cash = float(execution["investable_cash"]) @@ -311,6 +313,41 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): print(with_prefix(message), flush=True) return True + def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): + if not cash_sweep_symbol or max_quantity <= 0: + return 0 + cash_sweep_price = safe_quote_last_price( + f"{cash_sweep_symbol}.US", + market_data_port=market_data_port, + notify_issue=notify_issue, + ) + if cash_sweep_price is None or cash_sweep_price <= 0.0: + return 0 + base_buying_power = max(0.0, float(investable_cash)) + for buy_symbol in candidate_symbols: + underweight_value = target_values[buy_symbol] - market_values[buy_symbol] + if underweight_value <= threshold_value: + continue + buy_price = safe_quote_last_price( + f"{buy_symbol}.US", + market_data_port=market_data_port, + notify_issue=notify_issue, + ) + if buy_price is None: + continue + ask = round(buy_price * limit_buy_premium, 2) if buy_symbol in limit_order_symbols else round(buy_price, 2) + max_buy_quantity = int(underweight_value // ask) + if max_buy_quantity <= 0: + continue + required_buying_power = max_buy_quantity * ask + if base_buying_power >= required_buying_power: + return 0 + return min( + max_quantity, + max(1, math.ceil((required_buying_power - base_buying_power) / cash_sweep_price)), + ) + return 0 + for symbol in strategy_assets: diff = target_values[symbol] - market_values[symbol] if diff < -threshold_value and abs(diff) > current_min_trade: @@ -373,18 +410,85 @@ def record_dry_run(symbol, side, quantity, price, *, order_type): if submitted: action_done = True sell_submitted = True - elif sellable_quantities[symbol] <= 0 and quantities[symbol] > 0: - record_skip_log( - skip_logs, - translator=translator, - with_prefix=with_prefix, + elif sellable_quantities[symbol] <= 0 and quantities[symbol] > 0: + record_skip_log( + skip_logs, + translator=translator, + with_prefix=with_prefix, kind="sell_skipped", detail=( f"Symbol: {symbol}.US Diff: ${abs(diff):.2f} " - f"Held: {quantities[symbol]} Sellable: {sellable_quantities[symbol]} " - f"(no sellable)" + f"Held: {quantities[symbol]} Sellable: {sellable_quantities[symbol]} " + f"(no sellable)" + ), + ) + + buy_candidates = [ + symbol + for symbol in strategy_assets + if (target_values[symbol] - market_values[symbol]) > threshold_value + and abs(target_values[symbol] - market_values[symbol]) > current_min_trade + ] + funding_buy_candidates = [ + symbol + for symbol in buy_candidates + if symbol != cash_sweep_symbol + ] + if ( + not sell_submitted + and funding_buy_candidates + and cash_sweep_symbol + and sellable_quantities.get(cash_sweep_symbol, 0.0) > 0.0 + ): + sweep_quantity = cash_sweep_sale_quantity_to_fund_buy( + int(sellable_quantities[cash_sweep_symbol]), + funding_buy_candidates, + ) + if sweep_quantity > 0: + sweep_price = round( + float( + safe_quote_last_price( + f"{cash_sweep_symbol}.US", + market_data_port=market_data_port, + notify_issue=notify_issue, + ) + or 0.0 + ), + 2, + ) + if dry_run_only: + submitted = record_dry_run( + f"{cash_sweep_symbol}.US", + "sell", + format_quantity(sweep_quantity), + sweep_price, + order_type="market", + ) + else: + submitted = submit_order_via_port( + f"{cash_sweep_symbol}.US", + "market", + "sell", + sweep_quantity, + translator( + "market_sell", + symbol=cash_sweep_symbol, + qty=format_quantity(sweep_quantity), + price=sweep_price, ), ) + if submitted: + action_done = True + sell_submitted = True + cash_sweep_sold_this_cycle = True + if dry_run_only: + dry_run_sale_events.append( + ( + cash_sweep_symbol, + sweep_quantity, + sweep_quantity * sweep_price, + ) + ) if sell_submitted: previous_investable_cash = investable_cash diff --git a/application/longbridge_execution.py b/application/longbridge_execution.py new file mode 100644 index 0000000..4e7193c --- /dev/null +++ b/application/longbridge_execution.py @@ -0,0 +1,65 @@ +from __future__ import annotations + +import time +from typing import Any + +from quant_platform_kit.common.models import ExecutionReport + +_qpk_submit_order = None + + +def _get_qpk_submit_order(): + global _qpk_submit_order + if _qpk_submit_order is None: + from quant_platform_kit.longbridge.execution import submit_order as qpk_submit_order + + _qpk_submit_order = qpk_submit_order + return _qpk_submit_order + + +def _get_error_code(exc: Exception) -> str | None: + code = getattr(exc, "code", None) + if code is None: + payload = getattr(exc, "args", None) + if payload: + text = " ".join(str(item) for item in payload) + if "603203" in text: + return "603203" + return str(code).strip() if code is not None else None + + +def _is_retriable_internal_error(exc: Exception) -> bool: + code = _get_error_code(exc) + if code == "603203": + return True + message = str(exc).lower() + return "internal server error" in message + + +def submit_order( + t_ctx: Any, + symbol: str, + *, + order_kind: str, + side: str, + quantity: float, + submitted_price: float | None = None, +) -> ExecutionReport: + last_error: Exception | None = None + for attempt in range(2): + try: + return _get_qpk_submit_order()( + t_ctx, + symbol, + order_kind=order_kind, + side=side, + quantity=quantity, + submitted_price=submitted_price, + ) + except Exception as exc: + last_error = exc + if attempt == 0 and _is_retriable_internal_error(exc): + time.sleep(0.5) + continue + raise + raise last_error or RuntimeError("LongBridge submit_order failed") diff --git a/application/longbridge_portfolio.py b/application/longbridge_portfolio.py new file mode 100644 index 0000000..7abbb33 --- /dev/null +++ b/application/longbridge_portfolio.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +from collections.abc import Callable, Iterable +from typing import Any + + +def _normalize_symbol(symbol: str) -> str: + return str(symbol or "").strip().upper() + + +def _fetch_last_prices(q_ctx: Any, symbols: list[str]) -> dict[str, float]: + normalized_symbols = [] + for symbol in symbols: + normalized = _normalize_symbol(symbol) + if normalized: + normalized_symbols.append(normalized) + normalized_symbols = list(dict.fromkeys(normalized_symbols)) + if not normalized_symbols: + return {} + + quotes = list(q_ctx.quote(normalized_symbols) or []) + prices: dict[str, float] = {} + for index, quote in enumerate(quotes): + fallback_symbol = normalized_symbols[index] if index < len(normalized_symbols) else "" + quoted_symbol = _normalize_symbol(getattr(quote, "symbol", "") or fallback_symbol) + if not quoted_symbol: + continue + last_done = getattr(quote, "last_done", None) + if last_done is None: + continue + try: + prices[quoted_symbol] = float(last_done) + except (TypeError, ValueError): + continue + return prices + + +def fetch_strategy_account_state( + q_ctx: Any, + t_ctx: Any, + strategy_assets: Iterable[str], + *, + position_log_fn: Callable[[str], None] | None = None, + warning_log_fn: Callable[[str], None] | None = None, +) -> dict[str, Any]: + def warn(message: str) -> None: + if warning_log_fn is not None: + warning_log_fn(message) + + available_cash = 0.0 + cash_by_currency: dict[str, float] = {} + try: + account_balance = t_ctx.account_balance() + except Exception as exc: + warn( + "[longbridge_account_balance_failed] " + f"error_type={type(exc).__name__} error={exc}" + ) + account_balance = () + for account in account_balance: + for cash_info in getattr(account, "cash_infos", []): + currency = str(getattr(cash_info, "currency", "") or "").strip().upper() + if not currency: + continue + cash_amount = float(getattr(cash_info, "available_cash", 0.0)) + cash_by_currency[currency] = cash_by_currency.get(currency, 0.0) + cash_amount + if currency == "USD": + available_cash += cash_amount + + assets = [str(symbol).strip().upper() for symbol in strategy_assets if str(symbol).strip()] + market_values = {symbol: 0.0 for symbol in assets} + quantities = {symbol: 0.0 for symbol in assets} + sellable_quantities = {symbol: 0.0 for symbol in assets} + filter_enabled = bool(assets) + + position_rows: list[tuple[str, str, Any, Any]] = [] + try: + positions_response = t_ctx.stock_positions() + except Exception as exc: + warn( + "[longbridge_stock_positions_failed] " + f"error_type={type(exc).__name__} error={exc}" + ) + positions_response = None + if positions_response and hasattr(positions_response, "channels"): + for channel in positions_response.channels: + for position in getattr(channel, "positions", []): + full_symbol = str(getattr(position, "symbol", "") or "").strip().upper() + if not full_symbol: + continue + root_symbol = full_symbol.split(".")[0].strip().upper() + if filter_enabled and root_symbol not in market_values: + continue + if root_symbol not in market_values: + market_values[root_symbol] = 0.0 + quantities[root_symbol] = 0.0 + sellable_quantities[root_symbol] = 0.0 + + raw_quantity = getattr(position, "quantity", 0) + raw_available_quantity = getattr(position, "available_quantity", raw_quantity) + if raw_quantity is None: + raw_quantity = 0 + if raw_available_quantity is None: + raw_available_quantity = raw_quantity + if position_log_fn is not None: + position_log_fn( + "[position_snapshot] raw " + f"symbol={root_symbol} full_symbol={full_symbol} " + f"quantity={raw_quantity} available_quantity={raw_available_quantity}" + ) + + position_rows.append((root_symbol, full_symbol, raw_quantity, raw_available_quantity)) + + prices = _fetch_last_prices(q_ctx, [full_symbol for _root_symbol, full_symbol, _quantity, _available in position_rows]) + for root_symbol, full_symbol, raw_quantity, raw_available_quantity in position_rows: + last_price = prices.get(full_symbol) + if last_price is None: + continue + + quantity = float(raw_quantity) + available_quantity = float(raw_available_quantity) + market_values[root_symbol] += quantity * last_price + quantities[root_symbol] += quantity + sellable_quantities[root_symbol] += available_quantity + + if position_log_fn is not None: + for symbol in assets or tuple(sorted(quantities)): + position_log_fn( + "[position_snapshot] aggregate " + f"symbol={symbol} quantity={quantities.get(symbol, 0.0)} " + f"sellable_quantity={sellable_quantities.get(symbol, 0.0)} " + f"market_value={market_values.get(symbol, 0.0):.2f}" + ) + + return { + "available_cash": available_cash, + "cash_by_currency": cash_by_currency, + "market_values": market_values, + "quantities": quantities, + "sellable_quantities": sellable_quantities, + "total_strategy_equity": available_cash + sum(market_values.values()), + } diff --git a/application/runtime_broker_adapters.py b/application/runtime_broker_adapters.py index 3c2307e..ee736fc 100644 --- a/application/runtime_broker_adapters.py +++ b/application/runtime_broker_adapters.py @@ -10,6 +10,9 @@ import pandas as pd from quant_platform_kit.common.models import PricePoint, PriceSeries, QuoteSnapshot +from quant_platform_kit.common.runtime_inputs import ( + DEFAULT_SEMICONDUCTOR_ROTATION_HISTORY_LOOKBACK, +) from quant_platform_kit.common.port_adapters import ( CallableExecutionPort, CallableMarketDataPort, @@ -34,7 +37,7 @@ class LongBridgeBrokerAdapters: fetch_strategy_account_state_fn: Callable[[Any, Any], Mapping[str, Any]] submit_order_fn: Callable[..., Any] clock: Callable[[], datetime] = _utcnow - price_history_lookback: int = 260 + price_history_lookback: int = DEFAULT_SEMICONDUCTOR_ROTATION_HISTORY_LOOKBACK def normalize_market_symbol(self, symbol: str) -> str: value = str(symbol or "").strip().upper() @@ -208,7 +211,7 @@ def build_runtime_broker_adapters( fetch_strategy_account_state_fn: Callable[[Any, Any], Mapping[str, Any]], submit_order_fn: Callable[..., Any], clock: Callable[[], datetime] = _utcnow, - price_history_lookback: int = 260, + price_history_lookback: int = DEFAULT_SEMICONDUCTOR_ROTATION_HISTORY_LOOKBACK, ) -> LongBridgeBrokerAdapters: return LongBridgeBrokerAdapters( strategy_symbols=tuple(strategy_symbols), diff --git a/main.py b/main.py index 2564e67..0c0becd 100644 --- a/main.py +++ b/main.py @@ -15,6 +15,8 @@ from application.runtime_composer import build_runtime_composer from application.rebalance_service import run_strategy as run_rebalance_cycle from application.runtime_strategy_adapters import build_runtime_strategy_adapters +from application.longbridge_execution import submit_order +from application.longbridge_portfolio import fetch_strategy_account_state from entrypoints.cloud_run import is_market_open_now from runtime_config_support import load_platform_runtime_settings from notifications.telegram import build_signal_text, build_strategy_display_name, build_translator @@ -37,10 +39,8 @@ estimate_max_purchase_quantity, fetch_last_price, fetch_order_status, - fetch_strategy_account_state, fetch_token_from_secret, refresh_token_if_needed, - submit_order, ) from strategy_runtime import load_strategy_runtime from decision_mapper import map_strategy_decision_to_plan @@ -110,6 +110,10 @@ def log_position_snapshot(message): print(f"[{ACCOUNT_REGION}] {message}", flush=True) +def log_runtime_warning(message): + print(f"[{ACCOUNT_REGION}] [warning] {message}", flush=True) + + BROKER_ADAPTERS = build_runtime_broker_adapters( strategy_symbols=tuple(MANAGED_SYMBOLS), account_hash=ACCOUNT_PREFIX or ACCOUNT_REGION or "longbridge", @@ -123,6 +127,7 @@ def log_position_snapshot(message): if getattr(RUNTIME_SETTINGS, "debug_position_snapshot", False) else None ), + warning_log_fn=log_runtime_warning, ), submit_order_fn=submit_order, ) diff --git a/tests/test_longbridge_local_helpers.py b/tests/test_longbridge_local_helpers.py new file mode 100644 index 0000000..143af10 --- /dev/null +++ b/tests/test_longbridge_local_helpers.py @@ -0,0 +1,128 @@ +import sys +import types +import unittest +from pathlib import Path +from unittest.mock import patch + + +ROOT = Path(__file__).resolve().parents[1] +if str(ROOT) not in sys.path: + sys.path.insert(0, str(ROOT)) +QPK_SRC = ROOT.parent / "QuantPlatformKit" / "src" +if str(QPK_SRC) not in sys.path: + sys.path.insert(0, str(QPK_SRC)) + +from application.longbridge_execution import submit_order +from application.longbridge_portfolio import fetch_strategy_account_state +from quant_platform_kit.common.models import ExecutionReport + + +class FakeQuoteContext: + def __init__(self): + self.quote_calls = [] + + def quote(self, symbols): + self.quote_calls.append(tuple(symbols)) + prices = {"SOXL.US": 50.0, "QQQI.US": 20.0} + return [ + type("Quote", (), {"symbol": symbol, "last_done": prices[symbol]})() + for symbol in symbols + ] + + +class FakePosition: + def __init__(self, symbol, quantity, available_quantity=None): + self.symbol = symbol + self.quantity = quantity + self.available_quantity = available_quantity if available_quantity is not None else quantity + + +class FakeChannel: + def __init__(self, positions): + self.positions = positions + + +class FakePositionsResponse: + def __init__(self): + self.channels = [FakeChannel([FakePosition("SOXL.US", 3), FakePosition("QQQI.US", 2, 1)])] + + +class LongBridgeLocalHelpersTests(unittest.TestCase): + def test_fetch_strategy_account_state_falls_back_when_account_balance_fails(self): + class BalanceFailingTradeContext: + def account_balance(self): + raise RuntimeError("boom") + + def stock_positions(self): + return FakePositionsResponse() + + warnings = [] + state = fetch_strategy_account_state( + FakeQuoteContext(), + BalanceFailingTradeContext(), + ["SOXL", "QQQI", "SPYI"], + warning_log_fn=warnings.append, + ) + + self.assertEqual(state["available_cash"], 0.0) + self.assertEqual(state["cash_by_currency"], {}) + self.assertEqual(state["market_values"]["SOXL"], 150.0) + self.assertEqual(state["quantities"]["QQQI"], 2) + self.assertEqual(state["sellable_quantities"]["QQQI"], 1) + self.assertEqual(state["total_strategy_equity"], 190.0) + self.assertEqual( + warnings, + [ + "[longbridge_account_balance_failed] error_type=RuntimeError error=boom", + ], + ) + + def test_submit_order_retries_once_on_internal_server_error(self): + longport_module = types.ModuleType("longport") + openapi_module = types.ModuleType("longport.openapi") + openapi_module.OrderSide = types.SimpleNamespace(Buy="Buy", Sell="Sell") + openapi_module.OrderType = types.SimpleNamespace(LO="LO", MO="MO") + openapi_module.TimeInForceType = types.SimpleNamespace(Day="Day") + + attempts = {"count": 0} + + def fake_submit_order(*_args, **_kwargs): + attempts["count"] += 1 + if attempts["count"] == 1: + exc = RuntimeError("internal server error") + exc.code = 603203 + raise exc + return ExecutionReport( + symbol="BOXX", + side="sell", + quantity=1.0, + status="submitted", + broker_order_id="OID-1", + raw_payload={}, + ) + + ctx = types.SimpleNamespace(submit_order=fake_submit_order) + with patch.dict( + sys.modules, + {"longport": longport_module, "longport.openapi": openapi_module}, + ): + with patch("application.longbridge_execution.time.sleep", lambda _seconds: None): + with patch( + "application.longbridge_execution._qpk_submit_order", + fake_submit_order, + ): + report = submit_order( + ctx, + "BOXX.US", + order_kind="market", + side="sell", + quantity=4.6177, + ) + + self.assertEqual(report.status, "submitted") + self.assertEqual(report.broker_order_id, "OID-1") + self.assertEqual(attempts["count"], 2) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index 83552cb..433289c 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -108,6 +108,7 @@ def _build_plan( "sellable_quantities": dict(sellable_quantities), "total_equity": float(total_strategy_equity), "liquid_cash": float(available_cash), + "cash_sweep_symbol": (safe_haven_symbols[0] if safe_haven_symbols else None), "cash_by_currency": dict(cash_by_currency or {}), }, "execution": { @@ -952,6 +953,124 @@ def test_refreshes_account_state_after_sell_and_can_place_followup_buy(self): self.assertNotIn("买入跳过", sent_messages[0]) self.assertEqual(len(observed_plan_inputs), 2) + def test_cash_sweep_symbol_can_fund_buy_when_investable_cash_is_zero(self): + initial_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 1000.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=0.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="15.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%", + available_cash=0.0, + total_strategy_equity=1000.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + refreshed_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 500.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 5}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 5}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=500.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="15.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%", + available_cash=500.0, + total_strategy_equity=1000.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + before_sell_snapshot = _build_snapshot(initial_plan, phase="before_cash_sweep") + after_sell_snapshot = _build_snapshot(refreshed_plan, phase="after_cash_sweep") + sent_messages, observed_snapshots, observed_plan_inputs = self._run_strategy( + initial_plan, + refreshed_plan=refreshed_plan, + portfolio_snapshots=[before_sell_snapshot, after_sell_snapshot], + prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0}, + estimate_max_purchase_quantity_value=10, + ) + + self.assertEqual(observed_snapshots, [before_sell_snapshot, after_sell_snapshot]) + self.assertEqual(len(observed_plan_inputs), 2) + self.assertEqual(len(sent_messages), 1) + self.assertIn("BOXX", sent_messages[0]) + self.assertIn("市价卖出", sent_messages[0]) + self.assertIn("限价买入", sent_messages[0]) + self.assertIn("SOXL", sent_messages[0]) + + def test_cash_sweep_symbol_can_fund_buy_when_investable_cash_is_positive_but_short(self): + initial_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 1000.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=200.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="15.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%", + available_cash=200.0, + total_strategy_equity=1000.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + refreshed_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 700.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 7}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 7}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=700.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="15.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%", + available_cash=700.0, + total_strategy_equity=1000.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + before_sell_snapshot = _build_snapshot(initial_plan, phase="before_cash_sweep_short") + after_sell_snapshot = _build_snapshot(refreshed_plan, phase="after_cash_sweep_short") + sent_messages, observed_snapshots, observed_plan_inputs = self._run_strategy( + initial_plan, + refreshed_plan=refreshed_plan, + portfolio_snapshots=[before_sell_snapshot, after_sell_snapshot], + prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0}, + estimate_max_purchase_quantity_value=10, + ) + + self.assertEqual(observed_snapshots, [before_sell_snapshot, after_sell_snapshot]) + self.assertEqual(len(observed_plan_inputs), 2) + self.assertEqual(len(sent_messages), 1) + self.assertIn("BOXX", sent_messages[0]) + self.assertIn("市价卖出", sent_messages[0]) + self.assertIn("限价买入", sent_messages[0]) + self.assertIn("SOXL", sent_messages[0]) + def test_retries_account_refresh_after_sell_until_buying_power_updates(self): initial_plan = _build_plan( strategy_profile="tqqq_growth_income", diff --git a/tests/test_runtime_broker_adapters.py b/tests/test_runtime_broker_adapters.py index ac2edab..a229379 100644 --- a/tests/test_runtime_broker_adapters.py +++ b/tests/test_runtime_broker_adapters.py @@ -58,7 +58,7 @@ def candlesticks(self, symbol, period, lookback, adjust_type): assert quote_a.last_price == 125.67 assert quote_a == quote_b assert observed["quotes"] == ["SOXL.US"] - assert observed["history"] == [("SOXL.US", "day", 260, "forward")] + assert observed["history"] == [("SOXL.US", "day", 420, "forward")] assert series.symbol == "SOXL.US" assert [point.close for point in series.points] == [123.45, 125.67] From 3e780c947f0d4905f00dcbe9d7291f649caa1281 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Wed, 13 May 2026 07:29:45 +0800 Subject: [PATCH 2/7] Fix LongBridge cash sweep ruff issues --- application/execution_service.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/application/execution_service.py b/application/execution_service.py index a644b6c..72a9aa3 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -480,15 +480,6 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): if submitted: action_done = True sell_submitted = True - cash_sweep_sold_this_cycle = True - if dry_run_only: - dry_run_sale_events.append( - ( - cash_sweep_symbol, - sweep_quantity, - sweep_quantity * sweep_price, - ) - ) if sell_submitted: previous_investable_cash = investable_cash From 90f055f1ec378a09b01d390abded10789b482ddf Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Wed, 13 May 2026 07:31:15 +0800 Subject: [PATCH 3/7] Add fallback for legacy semiconductor lookback constant --- application/runtime_broker_adapters.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/application/runtime_broker_adapters.py b/application/runtime_broker_adapters.py index ee736fc..de25b1a 100644 --- a/application/runtime_broker_adapters.py +++ b/application/runtime_broker_adapters.py @@ -10,9 +10,6 @@ import pandas as pd from quant_platform_kit.common.models import PricePoint, PriceSeries, QuoteSnapshot -from quant_platform_kit.common.runtime_inputs import ( - DEFAULT_SEMICONDUCTOR_ROTATION_HISTORY_LOOKBACK, -) from quant_platform_kit.common.port_adapters import ( CallableExecutionPort, CallableMarketDataPort, @@ -24,6 +21,13 @@ build_portfolio_snapshot_from_account_state, ) +try: + from quant_platform_kit.common.runtime_inputs import ( + DEFAULT_SEMICONDUCTOR_ROTATION_HISTORY_LOOKBACK, + ) +except ImportError: # pragma: no cover - compatibility with older packaged wheels + DEFAULT_SEMICONDUCTOR_ROTATION_HISTORY_LOOKBACK = 420 + def _utcnow() -> datetime: return datetime.now(timezone.utc) From 4293fc8b2423ca1bf75b619b89828dbfef88688e Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Wed, 13 May 2026 12:15:45 +0800 Subject: [PATCH 4/7] Add LongBridge backfill route --- main.py | 18 ++++++++-- tests/test_request_handling.py | 48 +++++++++++++++++++++++++-- tests/test_shared_chat_id_fallback.py | 39 ++++++++++++++++++++++ 3 files changed, 100 insertions(+), 5 deletions(-) diff --git a/main.py b/main.py index 0c0becd..5c1bf8b 100644 --- a/main.py +++ b/main.py @@ -192,7 +192,7 @@ def build_composer(): ) -def run_strategy(): +def run_strategy(*, force_run: bool = False): composer = build_composer() reporting_adapters = composer.build_reporting_adapters() log_context, report = reporting_adapters.start_run() @@ -224,7 +224,7 @@ def run_strategy(): error_message=str(error), ) print(composer.with_prefix(f"Market hours check failed: {error}"), flush=True) - if not market_open: + if not market_open and not force_run: reporting_adapters.log_event( log_context, "outside_market_hours", @@ -239,6 +239,13 @@ def run_strategy(): ) print(composer.with_prefix("Outside market hours; skip."), flush=True) return + if force_run and not market_open: + reporting_adapters.log_event( + log_context, + "market_hours_bypassed", + message="Market hours bypassed for backfill execution", + ) + print(composer.with_prefix("Market hours bypassed for backfill execution."), flush=True) run_rebalance_cycle( runtime=composer.build_rebalance_runtime(), config=composer.build_rebalance_config(strategy_plugin_signals=strategy_plugin_signals), @@ -285,5 +292,12 @@ def handle_trigger(): return "OK", 200 +@app.route("/backfill", methods=["POST", "GET"]) +def handle_backfill(): + """Manual backfill entrypoint that bypasses market-hours guards.""" + run_strategy(force_run=True) + return "OK", 200 + + if __name__ == "__main__": app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080))) diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index 888ad78..a540613 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -7,9 +7,6 @@ from pathlib import Path from unittest.mock import patch -from quant_platform_kit.common.runtime_target import build_runtime_target - - ROOT = Path(__file__).resolve().parents[1] if str(ROOT) not in sys.path: sys.path.insert(0, str(ROOT)) @@ -17,6 +14,8 @@ if str(PLATFORM_KIT_SRC) not in sys.path: sys.path.insert(0, str(PLATFORM_KIT_SRC)) +from quant_platform_kit.common.runtime_target import build_runtime_target + @contextmanager def install_stub_modules(): @@ -103,6 +102,9 @@ def run(self, *args, **kwargs): google_module.auth = google_auth_module google_cloud_module.secretmanager_v1 = google_secretmanager_module + pandas_module = types.ModuleType("pandas") + pandas_module.Timestamp = lambda value=None: value + pandas_market_calendars = types.ModuleType("pandas_market_calendars") strategy_runtime_module = types.ModuleType("strategy_runtime") @@ -132,6 +134,11 @@ def run(self, *args, **kwargs): ): setattr(openapi_module, name, type(name, (), {})) + us_equity_strategies_module = types.ModuleType("us_equity_strategies") + us_equity_strategies_module.__path__ = [] + catalog_module = types.ModuleType("us_equity_strategies.catalog") + catalog_module.resolve_canonical_profile = lambda profile: profile + modules = { "flask": flask_module, "requests": requests_module, @@ -142,10 +149,13 @@ def run(self, *args, **kwargs): "google.auth": google_auth_module, "google.cloud": google_cloud_module, "google.cloud.secretmanager_v1": google_secretmanager_module, + "pandas": pandas_module, "pandas_market_calendars": pandas_market_calendars, "strategy_runtime": strategy_runtime_module, "longport": longport_module, "longport.openapi": openapi_module, + "us_equity_strategies": us_equity_strategies_module, + "us_equity_strategies.catalog": catalog_module, } original = {name: sys.modules.get(name) for name in modules} sys.modules.update(modules) @@ -205,6 +215,22 @@ def fake_run_strategy(): self.assertEqual(body, "OK") self.assertTrue(observed["called"]) + def test_handle_backfill_forces_strategy_run(self): + module = load_module() + observed = {"force_run": None} + + def fake_run_strategy(*, force_run=False): + observed["force_run"] = force_run + + module.run_strategy = fake_run_strategy + + with module.app.test_request_context("/backfill", method="POST"): + body, status = module.handle_backfill() + + self.assertEqual(status, 200) + self.assertEqual(body, "OK") + self.assertTrue(observed["force_run"]) + def test_run_strategy_emits_structured_runtime_events(self): module = load_module() observed = [] @@ -222,6 +248,22 @@ def test_run_strategy_emits_structured_runtime_events(self): ) self.assertTrue(all(run_id == "run-001" for run_id, _event, _fields in observed)) + def test_run_strategy_force_runs_when_market_closed(self): + module = load_module() + observed = [] + + module.build_run_id = lambda: "run-001" + module.emit_runtime_log = lambda context, event, **fields: observed.append((context.run_id, event, fields)) + module.is_market_open_now = lambda: False + module.run_rebalance_cycle = lambda **_kwargs: observed.append(("rebalance", "called", {})) + + module.run_strategy(force_run=True) + + events = [event for _run_id, event, _fields in observed] + self.assertIn("market_hours_bypassed", events) + self.assertIn("strategy_cycle_completed", events) + self.assertIn(("rebalance", "called", {}), observed) + def test_run_strategy_persists_machine_readable_report(self): module = load_module() observed_reports = [] diff --git a/tests/test_shared_chat_id_fallback.py b/tests/test_shared_chat_id_fallback.py index 91e985f..fb5ca3f 100644 --- a/tests/test_shared_chat_id_fallback.py +++ b/tests/test_shared_chat_id_fallback.py @@ -15,6 +15,8 @@ if str(PLATFORM_KIT_SRC) not in sys.path: sys.path.insert(0, str(PLATFORM_KIT_SRC)) +from quant_platform_kit.common.runtime_target import build_runtime_target + @contextmanager def install_stub_modules(): @@ -52,6 +54,31 @@ def run(self, *args, **kwargs): cloud_run_module = types.ModuleType("entrypoints.cloud_run") cloud_run_module.is_market_open_now = lambda: True + runtime_config_support_module = types.ModuleType("runtime_config_support") + runtime_config_support_module.load_platform_runtime_settings = lambda **_kwargs: types.SimpleNamespace( + project_id=None, + secret_name="longport_token_hk", + account_prefix="HK", + strategy_profile="soxl_soxx_trend_income", + strategy_display_name="SOXL/SOXX Semiconductor Trend Income", + strategy_domain="us_equity", + account_region="HK", + notify_lang="en", + tg_token=None, + tg_chat_id="shared-chat-id", + dry_run_only=False, + fractional_limit_buy_fallback_to_market=False, + runtime_target=build_runtime_target( + platform_id="longbridge", + strategy_profile="soxl_soxx_trend_income", + dry_run_only=False, + deployment_selector="HK", + account_selector=("HK",), + account_scope="HK", + service_name="longbridge-quant-hk-service", + ), + ) + qpk_longbridge_module = types.ModuleType("quant_platform_kit.longbridge") qpk_longbridge_module.build_contexts = lambda *args, **kwargs: ("quote-context", "trade-context") qpk_longbridge_module.calculate_rotation_indicators = lambda *args, **kwargs: {} @@ -76,6 +103,9 @@ def run(self, *args, **kwargs): google_module.auth = google_auth_module google_cloud_module.secretmanager_v1 = google_secretmanager_module + pandas_module = types.ModuleType("pandas") + pandas_module.Timestamp = lambda value=None: value + pandas_market_calendars = types.ModuleType("pandas_market_calendars") strategy_runtime_module = types.ModuleType("strategy_runtime") @@ -104,19 +134,28 @@ def run(self, *args, **kwargs): ): setattr(openapi_module, name, type(name, (), {})) + us_equity_strategies_module = types.ModuleType("us_equity_strategies") + us_equity_strategies_module.__path__ = [] + catalog_module = types.ModuleType("us_equity_strategies.catalog") + catalog_module.resolve_canonical_profile = lambda profile: profile + modules = { "flask": flask_module, "requests": requests_module, "entrypoints.cloud_run": cloud_run_module, + "runtime_config_support": runtime_config_support_module, "quant_platform_kit.longbridge": qpk_longbridge_module, "google": google_module, "google.auth": google_auth_module, "google.cloud": google_cloud_module, "google.cloud.secretmanager_v1": google_secretmanager_module, + "pandas": pandas_module, "pandas_market_calendars": pandas_market_calendars, "strategy_runtime": strategy_runtime_module, "longport": longport_module, "longport.openapi": openapi_module, + "us_equity_strategies": us_equity_strategies_module, + "us_equity_strategies.catalog": catalog_module, } original = {name: sys.modules.get(name) for name in modules} sys.modules.update(modules) From a3609e90b068012a7abffb067d62d7fb69923097 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Wed, 13 May 2026 12:52:07 +0800 Subject: [PATCH 5/7] Make LongBridge backfill validation-only --- application/runtime_composer.py | 3 ++- main.py | 18 ++++++++----- tests/test_request_handling.py | 46 +++++++++++++++++++++++++++++++-- 3 files changed, 58 insertions(+), 9 deletions(-) diff --git a/application/runtime_composer.py b/application/runtime_composer.py index d2e1855..a353ce6 100644 --- a/application/runtime_composer.py +++ b/application/runtime_composer.py @@ -226,6 +226,7 @@ def build_runtime_composer( order_poll_max_attempts: int, dry_run_only: bool, fractional_limit_buy_fallback_to_market: bool, + dry_run_only_override: bool | None = None, broker_adapters: Any, strategy_adapters: Any, estimate_max_purchase_quantity_fn: Callable[..., float], @@ -265,7 +266,7 @@ def build_runtime_composer( limit_buy_premium=float(limit_buy_premium), order_poll_interval_sec=int(order_poll_interval_sec), order_poll_max_attempts=int(order_poll_max_attempts), - dry_run_only=bool(dry_run_only), + dry_run_only=bool(dry_run_only if dry_run_only_override is None else dry_run_only_override), fractional_limit_buy_fallback_to_market=bool(fractional_limit_buy_fallback_to_market), broker_adapters=broker_adapters, strategy_adapters=strategy_adapters, diff --git a/main.py b/main.py index 5c1bf8b..d18e412 100644 --- a/main.py +++ b/main.py @@ -149,7 +149,7 @@ def log_runtime_warning(message): ) -def build_composer(): +def build_composer(*, dry_run_only_override: bool | None = None): return build_runtime_composer( project_id=PROJECT_ID, secret_name=SECRET_NAME, @@ -172,6 +172,7 @@ def build_composer(): order_poll_interval_sec=ORDER_POLL_INTERVAL_SEC, order_poll_max_attempts=ORDER_POLL_MAX_ATTEMPTS, dry_run_only=RUNTIME_SETTINGS.dry_run_only, + dry_run_only_override=dry_run_only_override, fractional_limit_buy_fallback_to_market=RUNTIME_SETTINGS.fractional_limit_buy_fallback_to_market, broker_adapters=BROKER_ADAPTERS, strategy_adapters=STRATEGY_ADAPTERS, @@ -192,8 +193,8 @@ def build_composer(): ) -def run_strategy(*, force_run: bool = False): - composer = build_composer() +def run_strategy(*, force_run: bool = False, validation_only: bool = False): + composer = build_composer(dry_run_only_override=True if validation_only else None) reporting_adapters = composer.build_reporting_adapters() log_context, report = reporting_adapters.start_run() notification_adapters = composer.build_notification_adapters() @@ -245,7 +246,12 @@ def run_strategy(*, force_run: bool = False): "market_hours_bypassed", message="Market hours bypassed for backfill execution", ) - print(composer.with_prefix("Market hours bypassed for backfill execution."), flush=True) + print( + composer.with_prefix( + "Market hours bypassed for backfill verification; validation only, no orders will be submitted." + ), + flush=True, + ) run_rebalance_cycle( runtime=composer.build_rebalance_runtime(), config=composer.build_rebalance_config(strategy_plugin_signals=strategy_plugin_signals), @@ -294,8 +300,8 @@ def handle_trigger(): @app.route("/backfill", methods=["POST", "GET"]) def handle_backfill(): - """Manual backfill entrypoint that bypasses market-hours guards.""" - run_strategy(force_run=True) + """Manual backfill entrypoint for verification-only execution.""" + run_strategy(force_run=True, validation_only=True) return "OK", 200 diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index a540613..c4c9c82 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -217,10 +217,11 @@ def fake_run_strategy(): def test_handle_backfill_forces_strategy_run(self): module = load_module() - observed = {"force_run": None} + observed = {"force_run": None, "validation_only": None} - def fake_run_strategy(*, force_run=False): + def fake_run_strategy(*, force_run=False, validation_only=False): observed["force_run"] = force_run + observed["validation_only"] = validation_only module.run_strategy = fake_run_strategy @@ -230,6 +231,7 @@ def fake_run_strategy(*, force_run=False): self.assertEqual(status, 200) self.assertEqual(body, "OK") self.assertTrue(observed["force_run"]) + self.assertTrue(observed["validation_only"]) def test_run_strategy_emits_structured_runtime_events(self): module = load_module() @@ -264,6 +266,46 @@ def test_run_strategy_force_runs_when_market_closed(self): self.assertIn("strategy_cycle_completed", events) self.assertIn(("rebalance", "called", {}), observed) + def test_run_strategy_validation_only_uses_dry_run_composer(self): + module = load_module() + observed = {"override": None} + + class FakeComposer: + def build_reporting_adapters(self): + return types.SimpleNamespace( + start_run=lambda: (types.SimpleNamespace(run_id="run-001"), {"status": "pending"}), + log_event=lambda *args, **kwargs: None, + persist_execution_report=lambda report: types.SimpleNamespace(local_path="/tmp/report.json"), + ) + + def build_notification_adapters(self): + return types.SimpleNamespace(publish_cycle_notification=lambda **_kwargs: None) + + def load_strategy_plugin_signals(self, *_args, **_kwargs): + return (), None + + def attach_strategy_plugin_report(self, *_args, **_kwargs): + return None + + def with_prefix(self, message): + return message + + def build_rebalance_runtime(self): + return types.SimpleNamespace() + + def build_rebalance_config(self, *, strategy_plugin_signals=()): + return types.SimpleNamespace() + + module.build_composer = lambda *, dry_run_only_override=None: observed.__setitem__("override", dry_run_only_override) or FakeComposer() + module.is_market_open_now = lambda: False + module.run_rebalance_cycle = lambda **_kwargs: None + module.persist_execution_report = lambda report: types.SimpleNamespace(local_path="/tmp/report.json") + module.build_run_id = lambda: "run-001" + + module.run_strategy(force_run=True, validation_only=True) + + self.assertTrue(observed["override"]) + def test_run_strategy_persists_machine_readable_report(self): module = load_module() observed_reports = [] From 64c516c5154e54dee4ab20531177caf32b5718a7 Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Wed, 13 May 2026 13:28:06 +0800 Subject: [PATCH 6/7] Simulate post-sell buying power in LongBridge backfill --- application/execution_service.py | 86 +++++++++++++++++++------------- tests/test_rebalance_service.py | 35 +++++++++++++ 2 files changed, 86 insertions(+), 35 deletions(-) diff --git a/application/execution_service.py b/application/execution_service.py index 72a9aa3..3873c80 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -228,6 +228,7 @@ def execute_rebalance_cycle( cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency")) investable_cash = float(execution["investable_cash"]) current_min_trade = float(execution["current_min_trade"]) + dry_run_sale_proceeds = 0.0 def append_order_id_suffix(log_message, order_id): order_id_text = str(order_id or "").strip() @@ -398,6 +399,8 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): round(price, 2), order_type="market", ) + if submitted: + dry_run_sale_proceeds += float(quantity) * round(price, 2) else: submitted = submit_order_via_port( f"{symbol}.US", @@ -418,10 +421,10 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): kind="sell_skipped", detail=( f"Symbol: {symbol}.US Diff: ${abs(diff):.2f} " - f"Held: {quantities[symbol]} Sellable: {sellable_quantities[symbol]} " - f"(no sellable)" - ), - ) + f"Held: {quantities[symbol]} Sellable: {sellable_quantities[symbol]} " + f"(no sellable)" + ), + ) buy_candidates = [ symbol @@ -464,6 +467,8 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): sweep_price, order_type="market", ) + if submitted: + dry_run_sale_proceeds += float(sweep_quantity) * sweep_price else: submitted = submit_order_via_port( f"{cash_sweep_symbol}.US", @@ -482,37 +487,48 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): sell_submitted = True if sell_submitted: - previous_investable_cash = investable_cash - refresh_attempts = max(1, int(post_sell_refresh_attempts or 1)) - refresh_interval = max(0.0, float(post_sell_refresh_interval_sec or 0.0)) - best_refreshed_state = None - best_investable_cash = previous_investable_cash - for attempt in range(refresh_attempts): - if attempt > 0: - sleeper(refresh_interval) - refreshed_state = fetch_replanned_state() - refreshed_execution = refreshed_state[2] - refreshed_investable_cash = float(refreshed_execution["investable_cash"]) - if best_refreshed_state is None or refreshed_investable_cash > best_investable_cash: - best_refreshed_state = refreshed_state - best_investable_cash = refreshed_investable_cash - if refreshed_investable_cash > previous_investable_cash: - best_refreshed_state = refreshed_state - break - plan, portfolio, execution, allocation = best_refreshed_state - threshold_value = float(execution["trade_threshold_value"]) - limit_order_symbols = set( - allocation.get("risk_symbols", ()) + allocation.get("income_symbols", ()) - ) - strategy_assets = tuple(allocation["strategy_symbols"]) - market_values = dict(portfolio["market_values"]) - quantities = dict(portfolio["quantities"]) - sellable_quantities = dict(portfolio["sellable_quantities"]) - target_values = dict(allocation["targets"]) - available_cash = float(portfolio["liquid_cash"]) - cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency")) - investable_cash = float(execution["investable_cash"]) - current_min_trade = float(execution["current_min_trade"]) + if dry_run_only and dry_run_sale_proceeds > 0.0: + simulated_cash = float(dry_run_sale_proceeds) + available_cash = max(0.0, available_cash + simulated_cash) + investable_cash = max(0.0, investable_cash + simulated_cash) + validation_message = ( + f"🧪 验证回款已入账: cash=${simulated_cash:.2f} " + f"investable=${investable_cash:.2f}" + ) + note_logs.append(validation_message) + print(with_prefix(validation_message), flush=True) + else: + previous_investable_cash = investable_cash + refresh_attempts = max(1, int(post_sell_refresh_attempts or 1)) + refresh_interval = max(0.0, float(post_sell_refresh_interval_sec or 0.0)) + best_refreshed_state = None + best_investable_cash = previous_investable_cash + for attempt in range(refresh_attempts): + if attempt > 0: + sleeper(refresh_interval) + refreshed_state = fetch_replanned_state() + refreshed_execution = refreshed_state[2] + refreshed_investable_cash = float(refreshed_execution["investable_cash"]) + if best_refreshed_state is None or refreshed_investable_cash > best_investable_cash: + best_refreshed_state = refreshed_state + best_investable_cash = refreshed_investable_cash + if refreshed_investable_cash > previous_investable_cash: + best_refreshed_state = refreshed_state + break + plan, portfolio, execution, allocation = best_refreshed_state + threshold_value = float(execution["trade_threshold_value"]) + limit_order_symbols = set( + allocation.get("risk_symbols", ()) + allocation.get("income_symbols", ()) + ) + strategy_assets = tuple(allocation["strategy_symbols"]) + market_values = dict(portfolio["market_values"]) + quantities = dict(portfolio["quantities"]) + sellable_quantities = dict(portfolio["sellable_quantities"]) + target_values = dict(allocation["targets"]) + available_cash = float(portfolio["liquid_cash"]) + cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency")) + investable_cash = float(execution["investable_cash"]) + current_min_trade = float(execution["current_min_trade"]) if ( available_cash <= 0.0 diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index 433289c..e65c5a7 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -1071,6 +1071,41 @@ def test_cash_sweep_symbol_can_fund_buy_when_investable_cash_is_positive_but_sho self.assertIn("限价买入", sent_messages[0]) self.assertIn("SOXL", sent_messages[0]) + def test_dry_run_cash_sweep_can_simulate_buy_after_sell_settlement(self): + initial_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 1000.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=0.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="15.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%", + available_cash=0.0, + total_strategy_equity=1000.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + + sent_messages, _, _ = self._run_strategy( + initial_plan, + prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0}, + estimate_max_purchase_quantity_value=10, + dry_run_only=True, + ) + + self.assertEqual(len(sent_messages), 1) + self.assertIn("🧪 模拟运行模式", sent_messages[0]) + self.assertIn("🧪 模拟市价卖出 BOXX.US", sent_messages[0]) + self.assertIn("🧪 模拟限价买入 SOXL.US", sent_messages[0]) + self.assertNotIn("买入说明", sent_messages[0]) + def test_retries_account_refresh_after_sell_until_buying_power_updates(self): initial_plan = _build_plan( strategy_profile="tqqq_growth_income", From 4f9f879ab34b41c3b75f4752eb06e0f6f36cd1fa Mon Sep 17 00:00:00 2001 From: Pigbibi <20649888+Pigbibi@users.noreply.github.com> Date: Thu, 14 May 2026 03:54:24 +0800 Subject: [PATCH 7/7] feat: add LongBridge precheck entrypoint --- main.py | 13 ++++++++++--- tests/test_request_handling.py | 23 ++++++++++++++++++++++- 2 files changed, 32 insertions(+), 4 deletions(-) diff --git a/main.py b/main.py index d18e412..e4b80b1 100644 --- a/main.py +++ b/main.py @@ -193,7 +193,7 @@ def build_composer(*, dry_run_only_override: bool | None = None): ) -def run_strategy(*, force_run: bool = False, validation_only: bool = False): +def run_strategy(*, force_run: bool = False, validation_only: bool = False, validation_label: str = "backfill"): composer = build_composer(dry_run_only_override=True if validation_only else None) reporting_adapters = composer.build_reporting_adapters() log_context, report = reporting_adapters.start_run() @@ -244,11 +244,11 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False): reporting_adapters.log_event( log_context, "market_hours_bypassed", - message="Market hours bypassed for backfill execution", + message=f"Market hours bypassed for {validation_label} execution", ) print( composer.with_prefix( - "Market hours bypassed for backfill verification; validation only, no orders will be submitted." + f"Market hours bypassed for {validation_label} verification; validation only, no orders will be submitted." ), flush=True, ) @@ -305,5 +305,12 @@ def handle_backfill(): return "OK", 200 +@app.route("/precheck", methods=["POST", "GET"]) +def handle_precheck(): + """Pre-market / post-open verification entrypoint for dry-run only execution.""" + run_strategy(force_run=True, validation_only=True, validation_label="precheck") + return "Precheck OK", 200 + + if __name__ == "__main__": app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080))) diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index c4c9c82..3c899ec 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -219,9 +219,10 @@ def test_handle_backfill_forces_strategy_run(self): module = load_module() observed = {"force_run": None, "validation_only": None} - def fake_run_strategy(*, force_run=False, validation_only=False): + def fake_run_strategy(*, force_run=False, validation_only=False, validation_label="backfill"): observed["force_run"] = force_run observed["validation_only"] = validation_only + observed["validation_label"] = validation_label module.run_strategy = fake_run_strategy @@ -233,6 +234,26 @@ def fake_run_strategy(*, force_run=False, validation_only=False): self.assertTrue(observed["force_run"]) self.assertTrue(observed["validation_only"]) + def test_handle_precheck_forces_strategy_run(self): + module = load_module() + observed = {"force_run": None, "validation_only": None} + + def fake_run_strategy(*, force_run=False, validation_only=False, validation_label="backfill"): + observed["force_run"] = force_run + observed["validation_only"] = validation_only + observed["validation_label"] = validation_label + + module.run_strategy = fake_run_strategy + + with module.app.test_request_context("/precheck", method="POST"): + body, status = module.handle_precheck() + + self.assertEqual(status, 200) + self.assertEqual(body, "Precheck OK") + self.assertTrue(observed["force_run"]) + self.assertTrue(observed["validation_only"]) + self.assertEqual(observed["validation_label"], "precheck") + def test_run_strategy_emits_structured_runtime_events(self): module = load_module() observed = []