From 3e3085b19e04794cb7b22777f92e223955f60264 Mon Sep 17 00:00:00 2001 From: Xiaoya Chong <150726549+xiaoyachong@users.noreply.github.com> Date: Tue, 14 Apr 2026 17:57:20 -0700 Subject: [PATCH 1/2] call publisher start --- src/arroyopy/block.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/src/arroyopy/block.py b/src/arroyopy/block.py index f4ff53b..5fef00a 100644 --- a/src/arroyopy/block.py +++ b/src/arroyopy/block.py @@ -78,6 +78,7 @@ def __init__( self._running = False self._operator_task: Optional[asyncio.Task] = None self._listener_tasks: List[asyncio.Task] = [] + self._publisher_tasks: List[asyncio.Task] = [] # Wire up publishers to operator (sync operation) for publisher in self._publishers: @@ -127,7 +128,7 @@ async def start(self) -> None: """ Start the block and begin processing. - This starts all listeners and the operator's processing loop. + This starts all publishers, listeners and the operator's processing loop. """ if self._running: logger.warning(f"Block '{self.name}' is already running") @@ -140,6 +141,13 @@ async def start(self) -> None: self._running = True + # Start all publishers as background tasks + for publisher in self._publishers: + if hasattr(publisher, 'start'): + task = asyncio.create_task(publisher.start()) + self._publisher_tasks.append(task) + logger.info(f"Started publisher: {publisher.__class__.__name__}") + # Start the operator as a background task (it runs an infinite loop) self._operator_task = asyncio.create_task(self.operator.start()) @@ -157,7 +165,7 @@ async def stop(self) -> None: """ Stop the block and all its components. - This gracefully shuts down listeners and the operator. + This gracefully shuts down listeners, publishers and the operator. """ if not self._running: logger.warning(f"Block '{self.name}' is not running") @@ -176,12 +184,14 @@ async def stop(self) -> None: all_tasks = self._listener_tasks.copy() if self._operator_task is not None: all_tasks.append(self._operator_task) + all_tasks.extend(self._publisher_tasks) for task in all_tasks: if not task.done(): task.cancel() if all_tasks: await asyncio.gather(*all_tasks, return_exceptions=True) self._listener_tasks.clear() + self._publisher_tasks.clear() self._operator_task = None logger.info(f"Block '{self.name}' stopped") From 2de1f1978f1a7214b6b33fb915b6bb8c1748ea98 Mon Sep 17 00:00:00 2001 From: Xiaoya Chong <150726549+xiaoyachong@users.noreply.github.com> Date: Fri, 17 Apr 2026 11:14:48 -0700 Subject: [PATCH 2/2] black fix --- src/arroyopy/block.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/arroyopy/block.py b/src/arroyopy/block.py index 5fef00a..e6cce1b 100644 --- a/src/arroyopy/block.py +++ b/src/arroyopy/block.py @@ -6,6 +6,7 @@ - Any number of listeners (sources) that feed the operator - Any number of publishers (sinks) that receive processed messages """ + import asyncio import logging from typing import List, Optional @@ -143,7 +144,7 @@ async def start(self) -> None: # Start all publishers as background tasks for publisher in self._publishers: - if hasattr(publisher, 'start'): + if hasattr(publisher, "start"): task = asyncio.create_task(publisher.start()) self._publisher_tasks.append(task) logger.info(f"Started publisher: {publisher.__class__.__name__}")