Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion MIGRATION.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
# Migration Guide

## 1.4.x -> 1.5.0 (REST client)
## 1.4.x -> 1.5.0 (REST / Stream clients)

- `x10.perpetual.trading_client.PerpetualTradingClient` has been replaced with
`x10.clients.rest.RestApiClient` (client has the same interface but new name reflects its purpose better).
- Leftover models were migrated to `x10.models.*`.
- Most of the dataclasses are immutable now.
- `markets_info` module has been merged into `info` module.
- `x10.perpetual.stream_client.PerpetualStreamClient` has been replaced with
`x10.clients.stream.StreamClient` (same interface, renamed to match the `RestApiClient` naming convention).

---

Expand Down
4 changes: 2 additions & 2 deletions examples/cases/advanced/load_testing.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@

from examples.utils import BTC_USD_MARKET, create_rest_client
from x10.clients.rest import RestApiClient
from x10.clients.stream import StreamClient
from x10.models.market import MarketModel
from x10.models.order import OrderSide
from x10.perpetual.order_object import create_order_object
from x10.perpetual.stream_client.stream_client import PerpetualStreamClient

LOGGER = logging.getLogger()
MARKET_NAME = BTC_USD_MARKET
Expand Down Expand Up @@ -58,7 +58,7 @@ async def create_orders_loop(*, rest_client: RestApiClient, market: MarketModel,


async def order_confirmation_loop(*, stream_url: str, api_key: str):
stream_client = PerpetualStreamClient(api_url=stream_url)
stream_client = StreamClient(api_url=stream_url)

async with stream_client.subscribe_to_account_updates(api_key) as account_stream:
while not stop_event.is_set():
Expand Down
4 changes: 2 additions & 2 deletions examples/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@
from dotenv import load_dotenv

from x10.clients.rest import RestApiClient
from x10.clients.stream import StreamClient
from x10.config import TESTNET_CONFIG, Config
from x10.core.stark_account import StarkPerpetualAccount
from x10.models.market import TradingConfigModel
from x10.perpetual.simple_client.simple_trading_client import BlockingTradingClient
from x10.perpetual.stream_client import PerpetualStreamClient
from x10.utils.string import is_hex_string

BTC_USD_MARKET = "BTC-USD"
Expand Down Expand Up @@ -88,7 +88,7 @@ def create_blocking_client(config: Config = TESTNET_CONFIG):


def create_stream_client(config: Config = TESTNET_CONFIG):
return PerpetualStreamClient(api_url=config.endpoints.stream_url)
return StreamClient(api_url=config.endpoints.stream_url)


def get_adjust_price_by_pct(config: TradingConfigModel):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@ async def _serve_message(websocket):

@pytest.mark.asyncio
async def test_orderbook_stream(create_orderbook_message):
from x10.perpetual.stream_client import PerpetualStreamClient
from x10.clients.stream import StreamClient

message_model = create_orderbook_message()

async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server:
stream_client = PerpetualStreamClient(api_url=get_url_from_server(server))
stream_client = StreamClient(api_url=get_url_from_server(server))
stream = await stream_client.subscribe_to_orderbooks()
msg = await stream.recv()
await stream.close()
Expand All @@ -48,13 +48,13 @@ async def test_orderbook_stream(create_orderbook_message):

@pytest.mark.asyncio
async def test_account_update_trade_stream(create_account_update_trade_message):
from x10.perpetual.stream_client import PerpetualStreamClient
from x10.clients.stream import StreamClient

api_key = "dummy_api_key"
message_model = create_account_update_trade_message()

async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server:
stream_client = PerpetualStreamClient(api_url=get_url_from_server(server))
stream_client = StreamClient(api_url=get_url_from_server(server))
stream = await stream_client.subscribe_to_account_updates(api_key)
msg = await stream.recv()
await stream.close()
Expand Down Expand Up @@ -95,13 +95,13 @@ async def test_account_update_trade_stream(create_account_update_trade_message):

@pytest.mark.asyncio
async def test_account_update_stream_with_unexpected_type(create_account_update_unknown_message):
from x10.perpetual.stream_client import PerpetualStreamClient
from x10.clients.stream import StreamClient

api_key = "dummy_api_key"
message_model = create_account_update_unknown_message()

async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server:
stream_client = PerpetualStreamClient(api_url=get_url_from_server(server))
stream_client = StreamClient(api_url=get_url_from_server(server))
stream = await stream_client.subscribe_to_account_updates(api_key)
msg = await stream.recv()
await stream.close()
Expand All @@ -123,12 +123,12 @@ async def test_account_update_stream_with_unexpected_type(create_account_update_
@pytest.mark.asyncio
async def test_candle_stream():
from tests.fixtures.candle import create_candle_stream_message
from x10.perpetual.stream_client import PerpetualStreamClient
from x10.clients.stream import StreamClient

message_model = create_candle_stream_message()

async with websockets.serve(serve_message(message_model.model_dump_json()), "127.0.0.1", 0) as server:
stream_client = PerpetualStreamClient(api_url=get_url_from_server(server))
stream_client = StreamClient(api_url=get_url_from_server(server))
stream = await stream_client.subscribe_to_candles("ETH-USD", "trades", "PT1M")
msg = await stream.recv()
await stream.close()
Expand Down
2 changes: 1 addition & 1 deletion x10/clients/rest/rest_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@

class RestApiClient:
"""
X10 REST API Client.
Extended REST API Client.
"""

__markets: Dict[str, MarketModel] | None
Expand Down
2 changes: 2 additions & 0 deletions x10/clients/stream/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from x10.clients.stream.stream_client import StreamClient # noqa: F401
from x10.clients.stream.stream_connection import StreamConnection # noqa: F401
Original file line number Diff line number Diff line change
@@ -1,21 +1,18 @@
from typing import Dict, List, Optional, Type

from x10.clients.stream.stream_connection import StreamConnection, StreamMsgResponseType
from x10.models.account import AccountStreamDataModel
from x10.models.candle import CandleInterval, CandleModel, CandleType
from x10.models.funding_rate import FundingRateModel
from x10.models.http import WrappedStreamResponseModel
from x10.models.orderbook import OrderbookUpdateModel
from x10.models.trade import PublicTradeModel
from x10.perpetual.stream_client.perpetual_stream_connection import (
PerpetualStreamConnection,
StreamMsgResponseType,
)
from x10.utils.http import UrlQueryParam, get_url


class PerpetualStreamClient:
class StreamClient:
"""
X10 Perpetual Stream Client for the X10 WebSocket v1.
Extended Stream (WebSocket) Client.
"""

__api_url: str
Expand Down Expand Up @@ -80,5 +77,5 @@ def __connect(
stream_url: str,
msg_model_class: Type[StreamMsgResponseType],
api_key: Optional[str] = None,
) -> PerpetualStreamConnection[StreamMsgResponseType]:
return PerpetualStreamConnection(stream_url, msg_model_class, api_key)
) -> StreamConnection[StreamMsgResponseType]:
return StreamConnection(stream_url, msg_model_class, api_key)
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,10 @@
StreamMsgResponseType = TypeVar("StreamMsgResponseType", bound=X10BaseModel)


class PerpetualStreamConnection(Generic[StreamMsgResponseType]):
class StreamConnection(Generic[StreamMsgResponseType]):
__stream_url: str
__msg_model_class: Type[StreamMsgResponseType]
__api_key: Optional[str]
__msgs_count: int
__websocket: Optional[WebSocketClientProtocol]

def __init__(
Expand All @@ -31,7 +30,6 @@ def __init__(
self.__stream_url = stream_url
self.__msg_model_class = msg_model_class
self.__api_key = api_key
self.__msgs_count = 0
self.__websocket = None

async def send(self, data):
Expand All @@ -44,13 +42,11 @@ async def recv(self) -> StreamMsgResponseType:

async def close(self):
assert self.__websocket is not None

if not self.__websocket.closed:
await self.__websocket.close()
LOGGER.debug("Stream closed: %s", self.__stream_url)

@property
def msgs_count(self):
return self.__msgs_count
LOGGER.debug("Stream closed: %s", self.__stream_url)

@property
def closed(self):
Expand All @@ -66,6 +62,7 @@ async def __anext__(self) -> StreamMsgResponseType:

if self.__websocket.closed:
raise StopAsyncIteration

try:
return await self.__receive()
except websockets.ConnectionClosed:
Expand All @@ -75,7 +72,6 @@ async def __receive(self) -> StreamMsgResponseType:
assert self.__websocket is not None

data = await self.__websocket.recv()
self.__msgs_count += 1

return self.__msg_model_class.model_validate_json(data)

Expand All @@ -95,6 +91,10 @@ async def __aexit__(
await self.close()

async def __await_impl__(self):
await self.__connect()
return self

async def __connect(self):
extra_headers: dict[str, str] = {
RequestHeader.USER_AGENT: USER_AGENT,
}
Expand All @@ -105,5 +105,3 @@ async def __await_impl__(self):
self.__websocket = await websockets.connect(self.__stream_url, extra_headers=extra_headers)

LOGGER.debug("Connected to stream: %s", self.__stream_url)

return self
36 changes: 17 additions & 19 deletions x10/perpetual/orderbook.py
Original file line number Diff line number Diff line change
@@ -1,30 +1,30 @@
import asyncio
import decimal
from collections.abc import Awaitable
from dataclasses import dataclass
from decimal import Decimal
from typing import Callable, Iterable, Tuple

from sortedcontainers import SortedDict

from x10.clients.stream import StreamClient
from x10.config import Config
from x10.models.http import StreamDataType
from x10.models.orderbook import OrderbookUpdateModel
from x10.perpetual.stream_client.stream_client import PerpetualStreamClient


@dataclass
class OrderBookEntry:
price: decimal.Decimal
amount: decimal.Decimal
price: Decimal
amount: Decimal

def __repr__(self) -> str:
return f"OrderBookEntry(price={self.price}, amount={self.amount})"


@dataclass(frozen=True)
class ImpactDetails:
price: decimal.Decimal
amount: decimal.Decimal
price: Decimal
amount: Decimal


class OrderBook:
Expand All @@ -50,11 +50,11 @@ def __init__(
best_bid_change_callback: Callable[[OrderBookEntry | None], Awaitable[None]] | None = None,
depth: int | None = None,
) -> None:
self.__stream_client = PerpetualStreamClient(api_url=config.endpoints.stream_url)
self.__stream_client = StreamClient(api_url=config.endpoints.stream_url)
self.__market_name = market_name
self.__task: asyncio.Task | None = None
self._bid_prices: "SortedDict[decimal.Decimal, OrderBookEntry]" = SortedDict() # type: ignore
self._ask_prices: "SortedDict[decimal.Decimal, OrderBookEntry]" = SortedDict() # type: ignore
self._bid_prices: SortedDict = SortedDict()
self._ask_prices: SortedDict = SortedDict()
self.best_ask_change_callback = best_ask_change_callback
self.best_bid_change_callback = best_bid_change_callback
self.depth = depth
Expand Down Expand Up @@ -159,12 +159,10 @@ def best_ask(self) -> OrderBookEntry | None:
except IndexError:
return None

def __price_impact_notional(
self, notional: decimal.Decimal, levels: Iterable[Tuple[decimal.Decimal, OrderBookEntry]]
):
def __price_impact_notional(self, notional: Decimal, levels: Iterable[Tuple[Decimal, OrderBookEntry]]):
remaining_to_spend = notional
total_amount = decimal.Decimal(0)
weighted_sum = decimal.Decimal(0)
total_amount = Decimal(0)
weighted_sum = Decimal(0)
for price, entry in levels:
available_at_price = entry.amount
amount_to_purchase = min(remaining_to_spend / price, available_at_price)
Expand All @@ -183,10 +181,10 @@ def __price_impact_notional(
average_price = weighted_sum / total_amount
return ImpactDetails(price=average_price, amount=total_amount)

def __price_impact_qty(self, qty: decimal.Decimal, levels: Iterable[Tuple[decimal.Decimal, OrderBookEntry]]):
def __price_impact_qty(self, qty: Decimal, levels: Iterable[Tuple[Decimal, OrderBookEntry]]):
remaining_qty = qty
total_amount = decimal.Decimal(0)
total_spent = decimal.Decimal(0)
total_amount = Decimal(0)
total_spent = Decimal(0)
for price, entry in levels:
available_at_price = entry.amount
take = min(remaining_qty, available_at_price)
Expand All @@ -203,7 +201,7 @@ def __price_impact_qty(self, qty: decimal.Decimal, levels: Iterable[Tuple[decima
average_price = total_spent / total_amount
return ImpactDetails(price=average_price, amount=total_amount)

def calculate_price_impact_notional(self, notional: decimal.Decimal, side: str) -> ImpactDetails | None:
def calculate_price_impact_notional(self, notional: Decimal, side: str) -> ImpactDetails | None:
if notional <= 0:
return None
if side == "SELL":
Expand All @@ -216,7 +214,7 @@ def calculate_price_impact_notional(self, notional: decimal.Decimal, side: str)
return self.__price_impact_notional(notional, self._ask_prices.items())
return None

def calculate_price_impact_qty(self, qty: decimal.Decimal, side: str) -> ImpactDetails | None:
def calculate_price_impact_qty(self, qty: Decimal, side: str) -> ImpactDetails | None:
if qty <= 0:
return None
if side == "SELL":
Expand Down
22 changes: 8 additions & 14 deletions x10/perpetual/simple_client/simple_trading_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@
import time
from dataclasses import dataclass
from decimal import Decimal
from typing import Awaitable, Dict, Union, cast
from typing import Awaitable, Dict, Optional, cast

from x10.clients.rest.modules.info_module import InfoModule
from x10.clients.rest.modules.order_management_module import OrderManagementModule
from x10.clients.stream import StreamClient, StreamConnection
from x10.config import Config
from x10.core.stark_account import StarkPerpetualAccount
from x10.errors import SdkError, ValidationError
Expand All @@ -21,10 +22,6 @@
TimeInForce,
)
from x10.perpetual.order_object import create_order_object
from x10.perpetual.stream_client.perpetual_stream_connection import (
PerpetualStreamConnection,
)
from x10.perpetual.stream_client.stream_client import PerpetualStreamClient


def condition_to_awaitable(condition: asyncio.Condition) -> Awaitable:
Expand Down Expand Up @@ -85,15 +82,12 @@ def __init__(self, config: Config, account: StarkPerpetualAccount):
self.__account = account
self.__info_module = InfoModule(config, api_key=account.api_key)
self.__orders_module = OrderManagementModule(config, api_key=account.api_key)
self.__markets: Union[None, Dict[str, MarketModel]] = None
self.__stream_client: PerpetualStreamClient = PerpetualStreamClient(api_url=config.endpoints.stream_url)
self.__account_stream: Union[
None,
PerpetualStreamConnection[WrappedStreamResponseModel[AccountStreamDataModel]],
] = None
self.__markets: Optional[Dict[str, MarketModel]] = None
self.__stream_client: StreamClient = StreamClient(api_url=config.endpoints.stream_url)
self.__account_stream: Optional[StreamConnection[WrappedStreamResponseModel[AccountStreamDataModel]]] = None
self.__order_waiters: Dict[str, OrderWaiter] = {}
self.__cancel_waiters: Dict[str, CancelWaiter] = {}
self.__stream_task = asyncio.create_task(self.___order_stream())
self.__stream_task = asyncio.create_task(self.__order_stream())

@staticmethod
async def create(config: Config, account: StarkPerpetualAccount) -> "BlockingTradingClient":
Expand Down Expand Up @@ -130,15 +124,15 @@ async def __handle_order(self, order: OpenOrderModel):
else:
await self.__handle_update(order)

async def ___order_stream(self):
async def __order_stream(self):
self.__account_stream = await self.__stream_client.subscribe_to_account_updates(self.__account.api_key)
async for event in self.__account_stream:
if not (event.data and event.data.orders):
continue
for order in event.data.orders:
await self.__handle_order(order)
print("Order stream closed, reconnecting...")
await self.___order_stream()
await self.__order_stream()

async def cancel_order(self, order_external_id: str) -> TimedCancel:
awaitable: Awaitable
Expand Down
Loading
Loading