Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions cloud_pipelines_backend/orchestrator_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ def __init__(
default_task_annotations: dict[str, Any] | None = None,
sleep_seconds_between_queue_sweeps: float = 1.0,
output_data_purge_duration: datetime.timedelta = None,
*,
# Internal/experimental:
_max_queue_batch_size: int = 1,
_max_queue_batch_duration: datetime.timedelta = datetime.timedelta(),
):
self._session_factory = session_factory
self._launcher = launcher
Expand All @@ -62,6 +66,9 @@ def __init__(
self._running_executions_queue_idle = False
self._output_data_purge_duration = output_data_purge_duration

self._max_queue_batch_size = _max_queue_batch_size
self._max_queue_batch_duration = _max_queue_batch_duration

def run_loop(self):
while True:
try:
Expand All @@ -77,12 +84,32 @@ def process_each_queue_once(self):
self.internal_process_running_executions_queue,
]
for queue_handler in queue_handlers:
try:
with self._session_factory() as session:
queue_handler(session=session)
except Exception as exc:
_logger.exception(f"Error while executing {queue_handler=}")
bugsnag_instrumentation.notify(exception=exc)
# Optimization: Process the same queue multiple times in a row so that multiple fast operations can be batched together.
# Before: Queued (fast), Running (slow), Queued (fast), Running (slow), Queued (fast), Running (slow)
# After: Queued (fast), Queued (fast), Queued (fast), Running (slow)
# After: Queued (fast), Queued (slow), Running (slow)
# Process the same queue repeatedly:
# * For up to X milliseconds (`_max_queue_batch_duration`, e.g. 200ms)
# * Up to Y times (`_max_queue_batch_size`; reduces the blast radius of time-related bugs)
# * Only while the queue is not empty
# Using a monotonic timer to avoid bugs when the system clock changes.
start_time_seconds = time.monotonic()
for _ in range(self._max_queue_batch_size):
try:
with self._session_factory() as session:
queue_had_items = queue_handler(session=session)
if not queue_had_items:
break
except Exception as exc:
_logger.exception(f"Error while executing {queue_handler=}")
bugsnag_instrumentation.notify(exception=exc)
if (
datetime.timedelta(
milliseconds=(time.monotonic() - start_time_seconds) * 1000
)
> self._max_queue_batch_duration
):
break

def internal_process_queued_executions_queue(self, session: orm.Session):
query = (
Expand Down
Loading