diff --git a/README.md b/README.md index 0307c63..b95179e 100644 --- a/README.md +++ b/README.md @@ -161,7 +161,7 @@ Important: 2. Create secret `longport_token_paper` for paper / `longport_token_hk` for HK / `longport_token_sg` for SG (or your custom `LONGPORT_SECRET_NAME`) in Secret Manager and add your LongPort access token as the first version. 3. Set the required env vars above on the Cloud Run service. 4. Deploy the app to Cloud Run (e.g. `gcloud run deploy` from repo root with Dockerfile or buildpack). -5. Create a Cloud Scheduler job that POSTs to the Cloud Run URL. Choose the cron from the strategy-layer cadence in `UsEquityStrategies`; this platform repo only owns the runtime trigger wiring. +5. Create two Cloud Scheduler jobs that POST to the Cloud Run URL. Use `"/precheck"` after the open window and `"/"` near the close window. Choose both crons from the strategy-layer cadence in `UsEquityStrategies`; this platform repo only owns the runtime trigger wiring. IAM: the Cloud Run service account needs **Secret Manager Admin** (or Secret Accessor for the configured `LONGPORT_SECRET_NAME`, `LONGPORT_APP_KEY_SECRET_NAME`, and `LONGPORT_APP_SECRET_SECRET_NAME`, such as `longport_token_paper`, `longport-app-key-paper`, `longport-app-secret-paper`) and **Logs Writer**. Build/deploy typically uses a separate account with Artifact Registry Writer, Cloud Run Admin, Service Account User. @@ -320,6 +320,6 @@ Secret Manager 中需存在 `LONGPORT_SECRET_NAME` 指定的密钥(默认: `lo 2. 在 Secret Manager 中为 paper 创建 `longport_token_paper`、为 HK 创建 `longport_token_hk`、为 SG 创建 `longport_token_sg`(或使用你自定义的 `LONGPORT_SECRET_NAME`),并将 LongPort access token 作为第一个版本写入。 3. 在 Cloud Run 服务上配置上述环境变量。 4. 部署至 Cloud Run(如从仓库根目录执行 `gcloud run deploy`)。 -5. 创建 Cloud Scheduler 定时任务,POST 到 Cloud Run URL。cron 频率以 `UsEquityStrategies` 里的策略层 cadence 为准;这个平台仓只维护运行时触发 wiring。 +5. 创建两个 Cloud Scheduler 定时任务,POST 到 Cloud Run URL。开盘后窗口走 `"/precheck"`,临近收盘窗口走 `"/"`。cron 频率以 `UsEquityStrategies` 里的策略层 cadence 为准;这个平台仓只维护运行时触发 wiring。 IAM: Cloud Run 服务账号需要 **Secret Manager Admin**(或当前 `LONGPORT_SECRET_NAME`、`LONGPORT_APP_KEY_SECRET_NAME`、`LONGPORT_APP_SECRET_SECRET_NAME` 对应 secret 的 Secret Accessor,例如 `longport_token_paper`、`longport-app-key-paper`、`longport-app-secret-paper`)和 **Logs Writer** 权限。 diff --git a/application/execution_service.py b/application/execution_service.py index 3873c80..ec7ceba 100644 --- a/application/execution_service.py +++ b/application/execution_service.py @@ -2,11 +2,13 @@ from __future__ import annotations -import math import traceback from collections.abc import Mapping from dataclasses import dataclass +from quant_platform_kit.common.cash_sweep import ( + estimate_cash_sweep_sale_quantity_to_fund_buy, +) from quant_platform_kit.common.quantity import ( floor_to_quantity_step, format_quantity, @@ -166,6 +168,7 @@ def _estimate_buy_quantity_candidate( can_buy_value, estimate_max_purchase_quantity, notify_issue, + dry_run_only=False, ): budget_quantity = floor_to_quantity_step(can_buy_value / ref_price, 0.0001) cash_limit_quantity = estimate_cash_buy_quantity_safe( @@ -178,6 +181,8 @@ def _estimate_buy_quantity_candidate( ) if cash_limit_quantity is None: return None + if dry_run_only and float(cash_limit_quantity or 0.0) <= 0.0: + cash_limit_quantity = budget_quantity candidate_quantity = _normalize_trade_quantity( min(budget_quantity, float(cash_limit_quantity)), allow_fractional=True, @@ -229,6 +234,7 @@ def execute_rebalance_cycle( investable_cash = float(execution["investable_cash"]) current_min_trade = float(execution["current_min_trade"]) dry_run_sale_proceeds = 0.0 + cash_sweep_sold_this_cycle = False def append_order_id_suffix(log_message, order_id): order_id_text = str(order_id or "").strip() @@ -325,6 +331,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): if cash_sweep_price is None or cash_sweep_price <= 0.0: return 0 base_buying_power = max(0.0, float(investable_cash)) + funding_needs = [] for buy_symbol in candidate_symbols: underweight_value = target_values[buy_symbol] - market_values[buy_symbol] if underweight_value <= threshold_value: @@ -336,18 +343,18 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): ) if buy_price is None: continue - ask = round(buy_price * limit_buy_premium, 2) if buy_symbol in limit_order_symbols else round(buy_price, 2) - max_buy_quantity = int(underweight_value // ask) - if max_buy_quantity <= 0: - continue - required_buying_power = max_buy_quantity * ask - if base_buying_power >= required_buying_power: - return 0 - return min( - max_quantity, - max(1, math.ceil((required_buying_power - base_buying_power) / cash_sweep_price)), + ask = ( + round(buy_price * limit_buy_premium, 2) + if buy_symbol in limit_order_symbols + else round(buy_price, 2) ) - return 0 + funding_needs.append((underweight_value, ask)) + return estimate_cash_sweep_sale_quantity_to_fund_buy( + max_quantity, + cash_sweep_price, + base_buying_power, + funding_needs, + ) for symbol in strategy_assets: diff = target_values[symbol] - market_values[symbol] @@ -413,6 +420,8 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): if submitted: action_done = True sell_submitted = True + if symbol == cash_sweep_symbol: + cash_sweep_sold_this_cycle = True elif sellable_quantities[symbol] <= 0 and quantities[symbol] > 0: record_skip_log( skip_logs, @@ -485,6 +494,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): if submitted: action_done = True sell_submitted = True + cash_sweep_sold_this_cycle = True if sell_submitted: if dry_run_only and dry_run_sale_proceeds > 0.0: @@ -576,6 +586,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): can_buy_value=can_buy_value, estimate_max_purchase_quantity=estimate_max_purchase_quantity, notify_issue=notify_issue, + dry_run_only=dry_run_only, ) if limit_candidate is None: continue @@ -602,6 +613,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): can_buy_value=can_buy_value, estimate_max_purchase_quantity=estimate_max_purchase_quantity, notify_issue=notify_issue, + dry_run_only=dry_run_only, ) if market_candidate is not None: market_quantity, _market_budget_quantity, _market_cash_limit_quantity = market_candidate @@ -685,6 +697,52 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols): price=f"{price:.2f}", ) + if ( + not cash_sweep_sold_this_cycle + and cash_sweep_symbol + and cash_sweep_symbol in strategy_assets + ): + cash_sweep_price = safe_quote_last_price( + f"{cash_sweep_symbol}.US", + market_data_port=market_data_port, + notify_issue=notify_issue, + ) + if cash_sweep_price is not None and cash_sweep_price > 0.0 and investable_cash > cash_sweep_price * 2: + quantity = int(investable_cash // cash_sweep_price) + if quantity > 0: + quantity_text = format_quantity(quantity) + if dry_run_only: + submitted = record_dry_run( + f"{cash_sweep_symbol}.US", + "buy", + quantity_text, + round(cash_sweep_price, 2), + order_type="market", + ) + else: + submitted = submit_order_via_port( + f"{cash_sweep_symbol}.US", + "market", + "buy", + quantity, + translator( + "market_buy", + symbol=cash_sweep_symbol, + qty=quantity_text, + price=round(cash_sweep_price, 2), + ), + ) + if submitted: + rebuy_message = translator( + "cash_sweep_rebuy", + symbol=f"{cash_sweep_symbol}.US", + qty=quantity_text, + price=f"{cash_sweep_price:.2f}", + ) + note_logs.append(rebuy_message) + print(with_prefix(rebuy_message), flush=True) + action_done = True + return ExecutionCycleResult( plan=dict(plan or {}), portfolio=dict(portfolio or {}), diff --git a/main.py b/main.py index d18e412..e4b80b1 100644 --- a/main.py +++ b/main.py @@ -193,7 +193,7 @@ def build_composer(*, dry_run_only_override: bool | None = None): ) -def run_strategy(*, force_run: bool = False, validation_only: bool = False): +def run_strategy(*, force_run: bool = False, validation_only: bool = False, validation_label: str = "backfill"): composer = build_composer(dry_run_only_override=True if validation_only else None) reporting_adapters = composer.build_reporting_adapters() log_context, report = reporting_adapters.start_run() @@ -244,11 +244,11 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False): reporting_adapters.log_event( log_context, "market_hours_bypassed", - message="Market hours bypassed for backfill execution", + message=f"Market hours bypassed for {validation_label} execution", ) print( composer.with_prefix( - "Market hours bypassed for backfill verification; validation only, no orders will be submitted." + f"Market hours bypassed for {validation_label} verification; validation only, no orders will be submitted." ), flush=True, ) @@ -305,5 +305,12 @@ def handle_backfill(): return "OK", 200 +@app.route("/precheck", methods=["POST", "GET"]) +def handle_precheck(): + """Pre-market / post-open verification entrypoint for dry-run only execution.""" + run_strategy(force_run=True, validation_only=True, validation_label="precheck") + return "Precheck OK", 200 + + if __name__ == "__main__": app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080))) diff --git a/notifications/telegram.py b/notifications/telegram.py index 71e3e13..ec9d2aa 100644 --- a/notifications/telegram.py +++ b/notifications/telegram.py @@ -61,6 +61,7 @@ "buy_deferred_non_usd_cash": "检测到非 USD 现金({currencies}),但美股策略可用 USD 现金为 ${available}、可投资现金为 ${investable};请先换汇或入金 USD 后再买入", "buy_deferred_small_cash": "{symbol} 目标差额 ${diff},但可投资现金 ${investable} 不足买入 1 股(价格 ${price})", "buy_deferred_cash_limit": "{symbol} 目标差额 ${diff},预算可买 {budget_qty} 股,但券商估算可买数量为 0;可能有未完成挂单、结算或购买力占用", + "cash_sweep_rebuy": "🏦 [尾部回补] 剩余可投资现金回补 {symbol}: {qty}股 @ ${price}", "fractional_limit_buy_fallback_to_market": "{symbol} 限价买入碎股不稳定,已改为市价买入 {qty} 股;限价参考价 ${limit_price},市价参考价 ${market_price}", "limit_buy": "📈 [限价买入] {symbol}: {qty}股 @ ${price}", "market_buy": "📈 [市价买入] {symbol}: {qty}股 @ ${price}", @@ -155,6 +156,7 @@ "buy_deferred_non_usd_cash": "Non-USD cash is present ({currencies}), but this US-equity strategy has USD cash ${available} and investable cash ${investable}; convert or deposit USD before buying", "buy_deferred_small_cash": "{symbol} target gap ${diff}, but investable cash ${investable} is not enough for 1 share at ${price}", "buy_deferred_cash_limit": "{symbol} target gap ${diff}, budget supports {budget_qty} shares, but broker estimate returned 0; an open order, settlement, or buying-power hold may still be blocking funds", + "cash_sweep_rebuy": "🏦 [tail rebuy] residual investable cash rebought {symbol}: {qty} shares @ ${price}", "fractional_limit_buy_fallback_to_market": "{symbol} fractional limit buy is unstable, falling back to market buy for {qty} shares; limit reference ${limit_price}, market reference ${market_price}", "limit_buy": "📈 [Limit buy] {symbol}: {qty} shares @ ${price}", "market_buy": "📈 [Market buy] {symbol}: {qty} shares @ ${price}", diff --git a/tests/test_rebalance_service.py b/tests/test_rebalance_service.py index e65c5a7..e96609b 100644 --- a/tests/test_rebalance_service.py +++ b/tests/test_rebalance_service.py @@ -1106,6 +1106,74 @@ def test_dry_run_cash_sweep_can_simulate_buy_after_sell_settlement(self): self.assertIn("🧪 模拟限价买入 SOXL.US", sent_messages[0]) self.assertNotIn("买入说明", sent_messages[0]) + def test_dry_run_cash_sweep_ignores_zero_estimator_after_sell_settlement(self): + initial_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 1000.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=0.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="15.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%", + available_cash=0.0, + total_strategy_equity=1000.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + + sent_messages, _, _ = self._run_strategy( + initial_plan, + prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0}, + estimate_max_purchase_quantity_value=0, + dry_run_only=True, + ) + + self.assertEqual(len(sent_messages), 1) + self.assertIn("🧪 模拟运行模式", sent_messages[0]) + self.assertIn("🧪 模拟市价卖出 BOXX.US", sent_messages[0]) + self.assertIn("🧪 模拟限价买入 SOXL.US", sent_messages[0]) + self.assertNotIn("券商估算可买数量为 0", sent_messages[0]) + + def test_dry_run_rebuys_cash_sweep_symbol_with_remaining_investable_cash(self): + initial_plan = _build_plan( + strategy_symbols=("SOXL", "SOXX", "BOXX"), + risk_symbols=("SOXL", "SOXX"), + safe_haven_symbols=("BOXX",), + targets={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 0.0}, + market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 0.0}, + sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 0}, + quantities={"SOXL": 0, "SOXX": 0, "BOXX": 0}, + current_min_trade=100.0, + trade_threshold_value=100.0, + investable_cash=500.0, + market_status="🧯 过热降档(SOXX)", + deploy_ratio_text="0.0%", + income_ratio_text="0.0%", + income_locked_ratio_text="0.0%", + signal_message="无其他买单,仅保留现金回补", + available_cash=500.0, + total_strategy_equity=500.0, + portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)), + ) + + sent_messages, _, _ = self._run_strategy( + initial_plan, + prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0}, + estimate_max_purchase_quantity_value=10, + dry_run_only=True, + ) + + self.assertEqual(len(sent_messages), 1) + self.assertIn("🧪 模拟运行模式", sent_messages[0]) + self.assertIn("🧪 模拟市价买入 BOXX.US", sent_messages[0]) + def test_retries_account_refresh_after_sell_until_buying_power_updates(self): initial_plan = _build_plan( strategy_profile="tqqq_growth_income", diff --git a/tests/test_request_handling.py b/tests/test_request_handling.py index c4c9c82..3a8c5e3 100644 --- a/tests/test_request_handling.py +++ b/tests/test_request_handling.py @@ -218,10 +218,10 @@ def fake_run_strategy(): def test_handle_backfill_forces_strategy_run(self): module = load_module() observed = {"force_run": None, "validation_only": None} - - def fake_run_strategy(*, force_run=False, validation_only=False): + def fake_run_strategy(*, force_run=False, validation_only=False, validation_label="backfill"): observed["force_run"] = force_run observed["validation_only"] = validation_only + observed["validation_label"] = validation_label module.run_strategy = fake_run_strategy @@ -232,6 +232,25 @@ def fake_run_strategy(*, force_run=False, validation_only=False): self.assertEqual(body, "OK") self.assertTrue(observed["force_run"]) self.assertTrue(observed["validation_only"]) + def test_handle_precheck_forces_strategy_run(self): + module = load_module() + observed = {"force_run": None, "validation_only": None} + + def fake_run_strategy(*, force_run=False, validation_only=False, validation_label="backfill"): + observed["force_run"] = force_run + observed["validation_only"] = validation_only + observed["validation_label"] = validation_label + + module.run_strategy = fake_run_strategy + + with module.app.test_request_context("/precheck", method="POST"): + body, status = module.handle_precheck() + + self.assertEqual(status, 200) + self.assertEqual(body, "Precheck OK") + self.assertTrue(observed["force_run"]) + self.assertTrue(observed["validation_only"]) + self.assertEqual(observed["validation_label"], "precheck") def test_run_strategy_emits_structured_runtime_events(self): module = load_module()