diff --git a/src/arroyopy/block.py b/src/arroyopy/block.py index f4ff53b..e6cce1b 100644 --- a/src/arroyopy/block.py +++ b/src/arroyopy/block.py @@ -6,6 +6,7 @@ - Any number of listeners (sources) that feed the operator - Any number of publishers (sinks) that receive processed messages """ + import asyncio import logging from typing import List, Optional @@ -78,6 +79,7 @@ def __init__( self._running = False self._operator_task: Optional[asyncio.Task] = None self._listener_tasks: List[asyncio.Task] = [] + self._publisher_tasks: List[asyncio.Task] = [] # Wire up publishers to operator (sync operation) for publisher in self._publishers: @@ -127,7 +129,7 @@ async def start(self) -> None: """ Start the block and begin processing. - This starts all listeners and the operator's processing loop. + This starts all publishers, listeners and the operator's processing loop. """ if self._running: logger.warning(f"Block '{self.name}' is already running") @@ -140,6 +142,13 @@ async def start(self) -> None: self._running = True + # Start all publishers as background tasks + for publisher in self._publishers: + if hasattr(publisher, "start"): + task = asyncio.create_task(publisher.start()) + self._publisher_tasks.append(task) + logger.info(f"Started publisher: {publisher.__class__.__name__}") + # Start the operator as a background task (it runs an infinite loop) self._operator_task = asyncio.create_task(self.operator.start()) @@ -157,7 +166,7 @@ async def stop(self) -> None: """ Stop the block and all its components. - This gracefully shuts down listeners and the operator. + This gracefully shuts down listeners, publishers and the operator. """ if not self._running: logger.warning(f"Block '{self.name}' is not running") @@ -176,12 +185,14 @@ async def stop(self) -> None: all_tasks = self._listener_tasks.copy() if self._operator_task is not None: all_tasks.append(self._operator_task) + all_tasks.extend(self._publisher_tasks) for task in all_tasks: if not task.done(): task.cancel() if all_tasks: await asyncio.gather(*all_tasks, return_exceptions=True) self._listener_tasks.clear() + self._publisher_tasks.clear() self._operator_task = None logger.info(f"Block '{self.name}' stopped")