Websocket requests including supported entity types#47
Websocket requests including supported entity types#47albaintor wants to merge 44 commits intounfoldedcircle:mainfrom
Conversation
Improved requests signatures Handled unkown entity types
39be2f9 to
df90062
Compare
|
All done and tested : #47 To test it and include it in the integrations : This is what I have done for Kodi and it now works on Remote 2 Thanks @kennymc-c for the work, chatgpt helped me also for the rest :-) |
|
I'm now getting timeouts no matter how high I set the timeout value. Right after the timeout the response is shown in the log. Could something blocking the response? |
|
I have fixed some stuff in the meantime : after authentication the integration is requesting supported entity types. Normally the remote should respond quickly to this request. |
|
@zehnm could advise otherwise |
|
Also with the current version I still get a timeout |
|
The problem comes from the remote core : it cannot accept any requests from the integration if it itself is waiting for a response from the integration after sending a request.
This is the reason why I chose to ignore the timeout in the ucapi and try again later, but the right way would be to accept requests at anytime on the core side. Don't know if this is easy to implement |
|
I'm back at my original non-blocking version that works fine in the same case. I also added a timeout to prevent a loop while waiting for an answer. As the timeout is 30 seconds just like the setup timeout I should get a possible exception before the setup itself times out. |
|
Can you be more specific : in your version if I remember correctly the requests took the control over the websocket server until the response occurs ? In that case no others requests from the remote would be handled. Anyway I don't understand how the result would differ from the new implementation : if you are in the setup flow and the remote expects a response from the integration, I don't get how you could send a request and get a response |
|
Ok I have found the cause : the core is not in cause. The websocket handle of requests is blocked until a response is done. |
|
While it now works during the setup process I think I'm running into a race condition after a driver restart when adding the available entitites after receiving the connect event. The remote is asking for the available entitites but they have not yet all been added. Also the connect event is sent right after the setup again so I don't think it's the right place for this anway. |
This could be due to this code but I don't understand the problem : the supported entity types are extracted right after authentication but before the connected event so before the request for available entities. This should introduce just a request/response but no race condition... do you have logs ? await self._authenticate(websocket, True)
# Request supported entity types from remote
asyncio.create_task(self._update_supported_entity_types(websocket))
self._events.emit(uc.Events.CLIENT_CONNECTED) |
|
According to my log the authentication respone comes after my request: |
|
Normally the driver should be started before the setup flow like this await api.init("driver.json", setup_flow.driver_setup_handler)Anyway, I have moved the call to entity types extraction inside the request for available entities. The problem should not occur anymore hopefully |
|
Is this ready for review? Otherwise please mark this PR as draft if there are more updates or fixes coming. |
We thought it was but after testings it needed some adjustements. I will test the modifications before switching it back to review |
| await self._handle_ws_event_msg(websocket, msg, msg_data) | ||
| elif kind == "resp": | ||
| # Response to a previously sent request | ||
| # Some implementations use "req_id", others use "id" |
There was a problem hiding this comment.
Where did you see an id field in a response message?
Response messages may not contain an id field, but a req_id for the corresponding request message.
zehnm
left a comment
There was a problem hiding this comment.
We need a better solution for the incoming message processing that keeps message ordering.
I can start working on that after the next firmware release. Otherwise let me know if you'd like to implement it.
| if isinstance(message, str): | ||
| # JSON text message | ||
| await self._process_ws_message(websocket, message) | ||
| asyncio.create_task(self._process_ws_message(websocket, message)) |
There was a problem hiding this comment.
After further consideration, creating a dedicated task per received JSON message is not a good solution and can create more issues than it solves.
- Increases concurrency without a bound.
- That makes message ordering non-deterministic. Especially for the press-and-hold key commands and events this could easily introduce hard to trace bugs.
- Multiple tasks are accessing the shared connection state in the class, easily creating more issues.
I suggest a proper solution with a consumer/producer model with queues. One task reads from the socket and enqueues messages, one task writes outbound messages, and worker tasks or inline logic process messages from the queue. This keeps websocket access centralized, gives you explicit backpressure, and makes ordering and shutdown behavior much easier to control.
A simple demo implementation based on the Websockets library documentation https://websockets.readthedocs.io/en/15.0.1/howto/patterns.html
import asyncio
from websockets import serve
from websockets.exceptions import ConnectionClosedOK, ConnectionClosedError
async def consumer(ws, incoming: asyncio.Queue):
try:
async for message in ws:
await incoming.put(message)
finally:
await incoming.put(None) # sentinel
async def producer(ws, outgoing: asyncio.Queue):
try:
while True:
msg = await outgoing.get()
if msg is None:
break
await ws.send(msg)
except (ConnectionClosedOK, ConnectionClosedError):
pass
async def router(incoming: asyncio.Queue, outgoing: asyncio.Queue):
while True:
msg = await incoming.get()
if msg is None:
break
# process message (possibly slow)
reply = await process_message(msg)
# enqueue reply without directly touching ws
await outgoing.put(reply)
async def handler(ws):
incoming = asyncio.Queue()
outgoing = asyncio.Queue()
tasks = [
asyncio.create_task(consumer(ws, incoming)),
asyncio.create_task(producer(ws, outgoing)),
asyncio.create_task(router(incoming, outgoing)),
]
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
for t in pending:
t.cancel()|
Hi, I agree about appending tasks (a network packet shouldn't be handled as a task even if tasks are not really threads). |
See #47 (comment) and PR #49 |
# Conflicts: # ucapi/api.py # ucapi/api_definitions.py # ucapi/media_player.py
…egration-python-library into websocket_requests # Conflicts: # ucapi/api.py
|
I have applied and tested the suggested approach. IA helped me this was much easier this way. I have done a few tests but this should be tested deeper. |
|
Hmm this needs additional work because you cannot request the client inside a request with this code. |
|
This is better now, the queues are not blocked when a request/response occur within a pending request |
This is the PR I have modified from #46