Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
183 changes: 147 additions & 36 deletions application/execution_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from __future__ import annotations

import math
import traceback
from collections.abc import Mapping
from dataclasses import dataclass
Expand Down Expand Up @@ -222,10 +223,12 @@ def execute_rebalance_cycle(
quantities = dict(portfolio["quantities"])
sellable_quantities = dict(portfolio["sellable_quantities"])
target_values = dict(allocation["targets"])
cash_sweep_symbol = str(portfolio.get("cash_sweep_symbol") or "").strip().upper()
available_cash = float(portfolio["liquid_cash"])
cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency"))
investable_cash = float(execution["investable_cash"])
current_min_trade = float(execution["current_min_trade"])
dry_run_sale_proceeds = 0.0

def append_order_id_suffix(log_message, order_id):
order_id_text = str(order_id or "").strip()
Expand Down Expand Up @@ -311,6 +314,41 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
print(with_prefix(message), flush=True)
return True

def cash_sweep_sale_quantity_to_fund_buy(max_quantity, candidate_symbols):
if not cash_sweep_symbol or max_quantity <= 0:
return 0
cash_sweep_price = safe_quote_last_price(
f"{cash_sweep_symbol}.US",
market_data_port=market_data_port,
notify_issue=notify_issue,
)
if cash_sweep_price is None or cash_sweep_price <= 0.0:
return 0
base_buying_power = max(0.0, float(investable_cash))
for buy_symbol in candidate_symbols:
underweight_value = target_values[buy_symbol] - market_values[buy_symbol]
if underweight_value <= threshold_value:
continue
buy_price = safe_quote_last_price(
f"{buy_symbol}.US",
market_data_port=market_data_port,
notify_issue=notify_issue,
)
if buy_price is None:
continue
ask = round(buy_price * limit_buy_premium, 2) if buy_symbol in limit_order_symbols else round(buy_price, 2)
max_buy_quantity = int(underweight_value // ask)
if max_buy_quantity <= 0:
continue
required_buying_power = max_buy_quantity * ask
if base_buying_power >= required_buying_power:
return 0
Comment on lines +344 to +345
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Continue scanning buy candidates before skipping sweep sale

The helper exits with return 0 as soon as the first underweight symbol is affordable with current buying power, so it never checks later candidates that may still require extra cash. In a cycle with multiple buy candidates (e.g., a small first deficit and a larger second deficit), this prevents the cash-sweep sell from being triggered even though later buys will be skipped for insufficient investable cash.

Useful? React with 👍 / 👎.

return min(
max_quantity,
max(1, math.ceil((required_buying_power - base_buying_power) / cash_sweep_price)),
)
return 0

for symbol in strategy_assets:
diff = target_values[symbol] - market_values[symbol]
if diff < -threshold_value and abs(diff) > current_min_trade:
Expand Down Expand Up @@ -361,6 +399,8 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
round(price, 2),
order_type="market",
)
if submitted:
dry_run_sale_proceeds += float(quantity) * round(price, 2)
else:
submitted = submit_order_via_port(
f"{symbol}.US",
Expand All @@ -373,11 +413,11 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
if submitted:
action_done = True
sell_submitted = True
elif sellable_quantities[symbol] <= 0 and quantities[symbol] > 0:
record_skip_log(
skip_logs,
translator=translator,
with_prefix=with_prefix,
elif sellable_quantities[symbol] <= 0 and quantities[symbol] > 0:
record_skip_log(
Comment on lines +416 to +417
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Restore sell-skipped branch to quantity check level

The sell_skipped logging path became unreachable after this change because elif sellable_quantities[symbol] <= 0 ... is now attached to if submitted instead of the outer if quantity > 0 block. When a position is unsellable (quantity == 0 from _sell_order_quantity due to no sellable shares), we no longer emit the skip log, which removes the operator-facing signal explaining why a required rebalance sell was skipped.

Useful? React with 👍 / 👎.

skip_logs,
translator=translator,
with_prefix=with_prefix,
kind="sell_skipped",
detail=(
f"Symbol: {symbol}.US Diff: ${abs(diff):.2f} "
Expand All @@ -386,38 +426,109 @@ def record_dry_run(symbol, side, quantity, price, *, order_type):
),
)

if sell_submitted:
previous_investable_cash = investable_cash
refresh_attempts = max(1, int(post_sell_refresh_attempts or 1))
refresh_interval = max(0.0, float(post_sell_refresh_interval_sec or 0.0))
best_refreshed_state = None
best_investable_cash = previous_investable_cash
for attempt in range(refresh_attempts):
if attempt > 0:
sleeper(refresh_interval)
refreshed_state = fetch_replanned_state()
refreshed_execution = refreshed_state[2]
refreshed_investable_cash = float(refreshed_execution["investable_cash"])
if best_refreshed_state is None or refreshed_investable_cash > best_investable_cash:
best_refreshed_state = refreshed_state
best_investable_cash = refreshed_investable_cash
if refreshed_investable_cash > previous_investable_cash:
best_refreshed_state = refreshed_state
break
plan, portfolio, execution, allocation = best_refreshed_state
threshold_value = float(execution["trade_threshold_value"])
limit_order_symbols = set(
allocation.get("risk_symbols", ()) + allocation.get("income_symbols", ())
buy_candidates = [
symbol
for symbol in strategy_assets
if (target_values[symbol] - market_values[symbol]) > threshold_value
and abs(target_values[symbol] - market_values[symbol]) > current_min_trade
]
funding_buy_candidates = [
symbol
for symbol in buy_candidates
if symbol != cash_sweep_symbol
]
if (
not sell_submitted
and funding_buy_candidates
and cash_sweep_symbol
and sellable_quantities.get(cash_sweep_symbol, 0.0) > 0.0
):
sweep_quantity = cash_sweep_sale_quantity_to_fund_buy(
int(sellable_quantities[cash_sweep_symbol]),
funding_buy_candidates,
)
strategy_assets = tuple(allocation["strategy_symbols"])
market_values = dict(portfolio["market_values"])
quantities = dict(portfolio["quantities"])
sellable_quantities = dict(portfolio["sellable_quantities"])
target_values = dict(allocation["targets"])
available_cash = float(portfolio["liquid_cash"])
cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency"))
investable_cash = float(execution["investable_cash"])
current_min_trade = float(execution["current_min_trade"])
if sweep_quantity > 0:
sweep_price = round(
float(
safe_quote_last_price(
f"{cash_sweep_symbol}.US",
market_data_port=market_data_port,
notify_issue=notify_issue,
)
or 0.0
),
2,
)
if dry_run_only:
submitted = record_dry_run(
f"{cash_sweep_symbol}.US",
"sell",
format_quantity(sweep_quantity),
sweep_price,
order_type="market",
)
if submitted:
dry_run_sale_proceeds += float(sweep_quantity) * sweep_price
else:
submitted = submit_order_via_port(
f"{cash_sweep_symbol}.US",
"market",
"sell",
sweep_quantity,
translator(
"market_sell",
symbol=cash_sweep_symbol,
qty=format_quantity(sweep_quantity),
price=sweep_price,
),
)
if submitted:
action_done = True
sell_submitted = True

if sell_submitted:
if dry_run_only and dry_run_sale_proceeds > 0.0:
simulated_cash = float(dry_run_sale_proceeds)
available_cash = max(0.0, available_cash + simulated_cash)
investable_cash = max(0.0, investable_cash + simulated_cash)
validation_message = (
f"🧪 验证回款已入账: cash=${simulated_cash:.2f} "
f"investable=${investable_cash:.2f}"
)
note_logs.append(validation_message)
print(with_prefix(validation_message), flush=True)
else:
previous_investable_cash = investable_cash
refresh_attempts = max(1, int(post_sell_refresh_attempts or 1))
refresh_interval = max(0.0, float(post_sell_refresh_interval_sec or 0.0))
best_refreshed_state = None
best_investable_cash = previous_investable_cash
for attempt in range(refresh_attempts):
if attempt > 0:
sleeper(refresh_interval)
refreshed_state = fetch_replanned_state()
refreshed_execution = refreshed_state[2]
refreshed_investable_cash = float(refreshed_execution["investable_cash"])
if best_refreshed_state is None or refreshed_investable_cash > best_investable_cash:
best_refreshed_state = refreshed_state
best_investable_cash = refreshed_investable_cash
if refreshed_investable_cash > previous_investable_cash:
best_refreshed_state = refreshed_state
break
plan, portfolio, execution, allocation = best_refreshed_state
threshold_value = float(execution["trade_threshold_value"])
limit_order_symbols = set(
allocation.get("risk_symbols", ()) + allocation.get("income_symbols", ())
)
strategy_assets = tuple(allocation["strategy_symbols"])
market_values = dict(portfolio["market_values"])
quantities = dict(portfolio["quantities"])
sellable_quantities = dict(portfolio["sellable_quantities"])
target_values = dict(allocation["targets"])
available_cash = float(portfolio["liquid_cash"])
cash_by_currency = _normalize_cash_by_currency(portfolio.get("cash_by_currency"))
investable_cash = float(execution["investable_cash"])
current_min_trade = float(execution["current_min_trade"])

if (
available_cash <= 0.0
Expand Down
65 changes: 65 additions & 0 deletions application/longbridge_execution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
from __future__ import annotations

import time
from typing import Any

from quant_platform_kit.common.models import ExecutionReport

_qpk_submit_order = None


def _get_qpk_submit_order():
global _qpk_submit_order
if _qpk_submit_order is None:
from quant_platform_kit.longbridge.execution import submit_order as qpk_submit_order

_qpk_submit_order = qpk_submit_order
return _qpk_submit_order


def _get_error_code(exc: Exception) -> str | None:
code = getattr(exc, "code", None)
if code is None:
payload = getattr(exc, "args", None)
if payload:
text = " ".join(str(item) for item in payload)
if "603203" in text:
return "603203"
return str(code).strip() if code is not None else None


def _is_retriable_internal_error(exc: Exception) -> bool:
code = _get_error_code(exc)
if code == "603203":
return True
message = str(exc).lower()
return "internal server error" in message


def submit_order(
t_ctx: Any,
symbol: str,
*,
order_kind: str,
side: str,
quantity: float,
submitted_price: float | None = None,
) -> ExecutionReport:
last_error: Exception | None = None
for attempt in range(2):
try:
return _get_qpk_submit_order()(
t_ctx,
symbol,
order_kind=order_kind,
side=side,
quantity=quantity,
submitted_price=submitted_price,
)
except Exception as exc:
last_error = exc
if attempt == 0 and _is_retriable_internal_error(exc):
time.sleep(0.5)
continue
raise
raise last_error or RuntimeError("LongBridge submit_order failed")
Loading