Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ Important:
2. Create secret `longport_token_paper` for paper / `longport_token_hk` for HK / `longport_token_sg` for SG (or your custom `LONGPORT_SECRET_NAME`) in Secret Manager and add your LongPort access token as the first version.
3. Set the required env vars above on the Cloud Run service.
4. Deploy the app to Cloud Run (e.g. `gcloud run deploy` from repo root with Dockerfile or buildpack).
5. Create a Cloud Scheduler job that POSTs to the Cloud Run URL. Choose the cron from the strategy-layer cadence in `UsEquityStrategies`; this platform repo only owns the runtime trigger wiring.
5. Create two Cloud Scheduler jobs that POST to the Cloud Run URL. Use `"/precheck"` after the open window and `"/"` near the close window. Choose both crons from the strategy-layer cadence in `UsEquityStrategies`; this platform repo only owns the runtime trigger wiring.

IAM: the Cloud Run service account needs **Secret Manager Admin** (or Secret Accessor for the configured `LONGPORT_SECRET_NAME`, `LONGPORT_APP_KEY_SECRET_NAME`, and `LONGPORT_APP_SECRET_SECRET_NAME`, such as `longport_token_paper`, `longport-app-key-paper`, `longport-app-secret-paper`) and **Logs Writer**. Build/deploy typically uses a separate account with Artifact Registry Writer, Cloud Run Admin, Service Account User.

Expand Down Expand Up @@ -320,6 +320,6 @@ Secret Manager 中需存在 `LONGPORT_SECRET_NAME` 指定的密钥(默认: `lo
2. 在 Secret Manager 中为 paper 创建 `longport_token_paper`、为 HK 创建 `longport_token_hk`、为 SG 创建 `longport_token_sg`(或使用你自定义的 `LONGPORT_SECRET_NAME`),并将 LongPort access token 作为第一个版本写入。
3. 在 Cloud Run 服务上配置上述环境变量。
4. 部署至 Cloud Run(如从仓库根目录执行 `gcloud run deploy`)。
5. 创建 Cloud Scheduler 定时任务,POST 到 Cloud Run URL。cron 频率以 `UsEquityStrategies` 里的策略层 cadence 为准;这个平台仓只维护运行时触发 wiring。
5. 创建两个 Cloud Scheduler 定时任务,POST 到 Cloud Run URL。开盘后窗口走 `"/precheck"`,临近收盘窗口走 `"/"`。cron 频率以 `UsEquityStrategies` 里的策略层 cadence 为准;这个平台仓只维护运行时触发 wiring。

IAM: Cloud Run 服务账号需要 **Secret Manager Admin**(或当前 `LONGPORT_SECRET_NAME`、`LONGPORT_APP_KEY_SECRET_NAME`、`LONGPORT_APP_SECRET_SECRET_NAME` 对应 secret 的 Secret Accessor,例如 `longport_token_paper`、`longport-app-key-paper`、`longport-app-secret-paper`)和 **Logs Writer** 权限。
82 changes: 70 additions & 12 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@

from __future__ import annotations

import math
import traceback
from collections.abc import Mapping
from dataclasses import dataclass

from quant_platform_kit.common.cash_sweep import (
estimate_cash_sweep_sale_quantity_to_fund_buy,
)
from quant_platform_kit.common.quantity import (
floor_to_quantity_step,
format_quantity,
Expand Down Expand Up @@ -166,6 +168,7 @@ def _estimate_buy_quantity_candidate(
can_buy_value,
estimate_max_purchase_quantity,
notify_issue,
dry_run_only=False,
):
budget_quantity = floor_to_quantity_step(can_buy_value / ref_price, 0.0001)
cash_limit_quantity = estimate_cash_buy_quantity_safe(
Expand All @@ -178,6 +181,8 @@ def _estimate_buy_quantity_candidate(
)
if cash_limit_quantity is None:
return None
if dry_run_only and float(cash_limit_quantity or 0.0) <= 0.0:
cash_limit_quantity = budget_quantity
candidate_quantity = _normalize_trade_quantity(
min(budget_quantity, float(cash_limit_quantity)),
allow_fractional=True,
Expand Down Expand Up @@ -229,6 +234,7 @@ def execute_rebalance_cycle(
investable_cash = float(execution["investable_cash"])
current_min_trade = float(execution["current_min_trade"])
dry_run_sale_proceeds = 0.0
cash_sweep_sold_this_cycle = False

def append_order_id_suffix(log_message, order_id):
order_id_text = str(order_id or "").strip()
Expand Down Expand Up @@ -325,6 +331,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
if cash_sweep_price is None or cash_sweep_price <= 0.0:
return 0
base_buying_power = max(0.0, float(investable_cash))
funding_needs = []
for buy_symbol in candidate_symbols:
underweight_value = target_values[buy_symbol] - market_values[buy_symbol]
if underweight_value <= threshold_value:
Expand All @@ -336,18 +343,18 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
)
if buy_price is None:
continue
ask = round(buy_price * limit_buy_premium, 2) if buy_symbol in limit_order_symbols else round(buy_price, 2)
max_buy_quantity = int(underweight_value // ask)
if max_buy_quantity <= 0:
continue
required_buying_power = max_buy_quantity * ask
if base_buying_power >= required_buying_power:
return 0
return min(
max_quantity,
max(1, math.ceil((required_buying_power - base_buying_power) / cash_sweep_price)),
ask = (
round(buy_price * limit_buy_premium, 2)
if buy_symbol in limit_order_symbols
else round(buy_price, 2)
)
return 0
funding_needs.append((underweight_value, ask))
return estimate_cash_sweep_sale_quantity_to_fund_buy(
max_quantity,
cash_sweep_price,
base_buying_power,
funding_needs,
)

for symbol in strategy_assets:
diff = target_values[symbol] - market_values[symbol]
Expand Down Expand Up @@ -413,6 +420,8 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
if submitted:
action_done = True
sell_submitted = True
if symbol == cash_sweep_symbol:
cash_sweep_sold_this_cycle = True
elif sellable_quantities[symbol] <= 0 and quantities[symbol] > 0:
record_skip_log(
skip_logs,
Expand Down Expand Up @@ -485,6 +494,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
if submitted:
action_done = True
sell_submitted = True
cash_sweep_sold_this_cycle = True

if sell_submitted:
if dry_run_only and dry_run_sale_proceeds > 0.0:
Expand Down Expand Up @@ -576,6 +586,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
can_buy_value=can_buy_value,
estimate_max_purchase_quantity=estimate_max_purchase_quantity,
notify_issue=notify_issue,
dry_run_only=dry_run_only,
)
if limit_candidate is None:
continue
Expand All @@ -602,6 +613,7 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
can_buy_value=can_buy_value,
estimate_max_purchase_quantity=estimate_max_purchase_quantity,
notify_issue=notify_issue,
dry_run_only=dry_run_only,
)
if market_candidate is not None:
market_quantity, _market_budget_quantity, _market_cash_limit_quantity = market_candidate
Expand Down Expand Up @@ -685,6 +697,52 @@ def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
price=f"{price:.2f}",
)

if (
not cash_sweep_sold_this_cycle
and cash_sweep_symbol
and cash_sweep_symbol in strategy_assets
):
cash_sweep_price = safe_quote_last_price(
f"{cash_sweep_symbol}.US",
market_data_port=market_data_port,
notify_issue=notify_issue,
)
if cash_sweep_price is not None and cash_sweep_price > 0.0 and investable_cash > cash_sweep_price * 2:
quantity = int(investable_cash // cash_sweep_price)
if quantity > 0:
quantity_text = format_quantity(quantity)
if dry_run_only:
submitted = record_dry_run(
f"{cash_sweep_symbol}.US",
"buy",
quantity_text,
round(cash_sweep_price, 2),
order_type="market",
)
else:
submitted = submit_order_via_port(
f"{cash_sweep_symbol}.US",
"market",
"buy",
quantity,
translator(
"market_buy",
symbol=cash_sweep_symbol,
qty=quantity_text,
price=round(cash_sweep_price, 2),
),
)
if submitted:
rebuy_message = translator(
"cash_sweep_rebuy",
symbol=f"{cash_sweep_symbol}.US",
qty=quantity_text,
price=f"{cash_sweep_price:.2f}",
)
note_logs.append(rebuy_message)
print(with_prefix(rebuy_message), flush=True)
action_done = True

return ExecutionCycleResult(
plan=dict(plan or {}),
portfolio=dict(portfolio or {}),
Expand Down
13 changes: 10 additions & 3 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def build_composer(*, dry_run_only_override: bool | None = None):
)


def run_strategy(*, force_run: bool = False, validation_only: bool = False):
def run_strategy(*, force_run: bool = False, validation_only: bool = False, validation_label: str = "backfill"):
composer = build_composer(dry_run_only_override=True if validation_only else None)
reporting_adapters = composer.build_reporting_adapters()
log_context, report = reporting_adapters.start_run()
Expand Down Expand Up @@ -244,11 +244,11 @@ def run_strategy(*, force_run: bool = False, validation_only: bool = False):
reporting_adapters.log_event(
log_context,
"market_hours_bypassed",
message="Market hours bypassed for backfill execution",
message=f"Market hours bypassed for {validation_label} execution",
)
print(
composer.with_prefix(
"Market hours bypassed for backfill verification; validation only, no orders will be submitted."
f"Market hours bypassed for {validation_label} verification; validation only, no orders will be submitted."
),
flush=True,
)
Expand Down Expand Up @@ -305,5 +305,12 @@ def handle_backfill():
return "OK", 200


@app.route("/precheck", methods=["POST", "GET"])
def handle_precheck():
"""Pre-market / post-open verification entrypoint for dry-run only execution."""
run_strategy(force_run=True, validation_only=True, validation_label="precheck")
return "Precheck OK", 200


if __name__ == "__main__":
app.run(host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
2 changes: 2 additions & 0 deletions notifications/telegram.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
"buy_deferred_non_usd_cash": "检测到非 USD 现金({currencies}),但美股策略可用 USD 现金为 ${available}、可投资现金为 ${investable};请先换汇或入金 USD 后再买入",
"buy_deferred_small_cash": "{symbol} 目标差额 ${diff},但可投资现金 ${investable} 不足买入 1 股(价格 ${price})",
"buy_deferred_cash_limit": "{symbol} 目标差额 ${diff},预算可买 {budget_qty} 股,但券商估算可买数量为 0;可能有未完成挂单、结算或购买力占用",
"cash_sweep_rebuy": "🏦 [尾部回补] 剩余可投资现金回补 {symbol}: {qty}股 @ ${price}",
"fractional_limit_buy_fallback_to_market": "{symbol} 限价买入碎股不稳定,已改为市价买入 {qty} 股;限价参考价 ${limit_price},市价参考价 ${market_price}",
"limit_buy": "📈 [限价买入] {symbol}: {qty}股 @ ${price}",
"market_buy": "📈 [市价买入] {symbol}: {qty}股 @ ${price}",
Expand Down Expand Up @@ -155,6 +156,7 @@
"buy_deferred_non_usd_cash": "Non-USD cash is present ({currencies}), but this US-equity strategy has USD cash ${available} and investable cash ${investable}; convert or deposit USD before buying",
"buy_deferred_small_cash": "{symbol} target gap ${diff}, but investable cash ${investable} is not enough for 1 share at ${price}",
"buy_deferred_cash_limit": "{symbol} target gap ${diff}, budget supports {budget_qty} shares, but broker estimate returned 0; an open order, settlement, or buying-power hold may still be blocking funds",
"cash_sweep_rebuy": "🏦 [tail rebuy] residual investable cash rebought {symbol}: {qty} shares @ ${price}",
"fractional_limit_buy_fallback_to_market": "{symbol} fractional limit buy is unstable, falling back to market buy for {qty} shares; limit reference ${limit_price}, market reference ${market_price}",
"limit_buy": "📈 [Limit buy] {symbol}: {qty} shares @ ${price}",
"market_buy": "📈 [Market buy] {symbol}: {qty} shares @ ${price}",
Expand Down
68 changes: 68 additions & 0 deletions tests/test_rebalance_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1106,6 +1106,74 @@ def test_dry_run_cash_sweep_can_simulate_buy_after_sell_settlement(self):
self.assertIn("🧪 模拟限价买入 SOXL.US", sent_messages[0])
self.assertNotIn("买入说明", sent_messages[0])

def test_dry_run_cash_sweep_ignores_zero_estimator_after_sell_settlement(self):
initial_plan = _build_plan(
strategy_symbols=("SOXL", "SOXX", "BOXX"),
risk_symbols=("SOXL", "SOXX"),
safe_haven_symbols=("BOXX",),
targets={"SOXL": 500.0, "SOXX": 0.0, "BOXX": 1000.0},
market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 1000.0},
sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10},
quantities={"SOXL": 0, "SOXX": 0, "BOXX": 10},
current_min_trade=100.0,
trade_threshold_value=100.0,
investable_cash=0.0,
market_status="🧯 过热降档(SOXX)",
deploy_ratio_text="15.0%",
income_ratio_text="0.0%",
income_locked_ratio_text="0.0%",
signal_message="SOXX 仍在 140 日门槛线上方,但触发过热降档,目标仓位 SOXL 15.0%",
available_cash=0.0,
total_strategy_equity=1000.0,
portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)),
)

sent_messages, _, _ = self._run_strategy(
initial_plan,
prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0},
estimate_max_purchase_quantity_value=0,
dry_run_only=True,
)

self.assertEqual(len(sent_messages), 1)
self.assertIn("🧪 模拟运行模式", sent_messages[0])
self.assertIn("🧪 模拟市价卖出 BOXX.US", sent_messages[0])
self.assertIn("🧪 模拟限价买入 SOXL.US", sent_messages[0])
self.assertNotIn("券商估算可买数量为 0", sent_messages[0])

def test_dry_run_rebuys_cash_sweep_symbol_with_remaining_investable_cash(self):
initial_plan = _build_plan(
strategy_symbols=("SOXL", "SOXX", "BOXX"),
risk_symbols=("SOXL", "SOXX"),
safe_haven_symbols=("BOXX",),
targets={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 0.0},
market_values={"SOXL": 0.0, "SOXX": 0.0, "BOXX": 0.0},
sellable_quantities={"SOXL": 0, "SOXX": 0, "BOXX": 0},
quantities={"SOXL": 0, "SOXX": 0, "BOXX": 0},
current_min_trade=100.0,
trade_threshold_value=100.0,
investable_cash=500.0,
market_status="🧯 过热降档(SOXX)",
deploy_ratio_text="0.0%",
income_ratio_text="0.0%",
income_locked_ratio_text="0.0%",
signal_message="无其他买单,仅保留现金回补",
available_cash=500.0,
total_strategy_equity=500.0,
portfolio_rows=(("SOXL", "SOXX"), ("BOXX",)),
)

sent_messages, _, _ = self._run_strategy(
initial_plan,
prices={"SOXL.US": 100.0, "SOXX.US": 200.0, "BOXX.US": 100.0},
estimate_max_purchase_quantity_value=10,
dry_run_only=True,
)

self.assertEqual(len(sent_messages), 1)
self.assertIn("🧪 模拟运行模式", sent_messages[0])
self.assertIn("🧪 模拟市价买入 BOXX.US", sent_messages[0])

def test_retries_account_refresh_after_sell_until_buying_power_updates(self):
initial_plan = _build_plan(
strategy_profile="tqqq_growth_income",
Expand Down
23 changes: 21 additions & 2 deletions tests/test_request_handling.py
Original file line number Diff line number Diff line change
Expand Up @@ -218,10 +218,10 @@ def fake_run_strategy():
def test_handle_backfill_forces_strategy_run(self):
module = load_module()
observed = {"force_run": None, "validation_only": None}

def fake_run_strategy(*, force_run=False, validation_only=False):
def fake_run_strategy(*, force_run=False, validation_only=False, validation_label="backfill"):
observed["force_run"] = force_run
observed["validation_only"] = validation_only
observed["validation_label"] = validation_label

module.run_strategy = fake_run_strategy

Expand All @@ -232,6 +232,25 @@ def fake_run_strategy(*, force_run=False, validation_only=False):
self.assertEqual(body, "OK")
self.assertTrue(observed["force_run"])
self.assertTrue(observed["validation_only"])
def test_handle_precheck_forces_strategy_run(self):
module = load_module()
observed = {"force_run": None, "validation_only": None}

def fake_run_strategy(*, force_run=False, validation_only=False, validation_label="backfill"):
observed["force_run"] = force_run
observed["validation_only"] = validation_only
observed["validation_label"] = validation_label

module.run_strategy = fake_run_strategy

with module.app.test_request_context("/precheck", method="POST"):
body, status = module.handle_precheck()

self.assertEqual(status, 200)
self.assertEqual(body, "Precheck OK")
self.assertTrue(observed["force_run"])
self.assertTrue(observed["validation_only"])
self.assertEqual(observed["validation_label"], "precheck")

def test_run_strategy_emits_structured_runtime_events(self):
module = load_module()
Expand Down
Loading