diff --git a/cloud_pipelines_backend/orchestrator_sql.py b/cloud_pipelines_backend/orchestrator_sql.py index 79017b6..cabb11a 100644 --- a/cloud_pipelines_backend/orchestrator_sql.py +++ b/cloud_pipelines_backend/orchestrator_sql.py @@ -50,6 +50,10 @@ def __init__( default_task_annotations: dict[str, Any] | None = None, sleep_seconds_between_queue_sweeps: float = 1.0, output_data_purge_duration: datetime.timedelta = None, + *, + # Internal/experimental: + _max_queue_batch_size: int = 1, + _max_queue_batch_duration: datetime.timedelta = datetime.timedelta(), ): self._session_factory = session_factory self._launcher = launcher @@ -62,6 +66,9 @@ def __init__( self._running_executions_queue_idle = False self._output_data_purge_duration = output_data_purge_duration + self._max_queue_batch_size = _max_queue_batch_size + self._max_queue_batch_duration = _max_queue_batch_duration + def run_loop(self): while True: try: @@ -77,12 +84,32 @@ def process_each_queue_once(self): self.internal_process_running_executions_queue, ] for queue_handler in queue_handlers: - try: - with self._session_factory() as session: - queue_handler(session=session) - except Exception as exc: - _logger.exception(f"Error while executing {queue_handler=}") - bugsnag_instrumentation.notify(exception=exc) + # Optimization: Process same queue multiple times so that multiple fast operations can be batched together. + # Before: Queued (fast), Running (slow), Queued (fast), Running (slow), Queued (fast), Running (slow) + # After: Queued (fast), Queued (fast), Queued (fast), Running (slow) + # After: Queued (fast), Queued (slow), Running (slow) + # Process same queue multiple times + # * For up to X milliseconds (e.g. 200ms) + # * Up to Y times (reduces blast radius of time-related bugs) + # * If the queue is not empty + # Using monotonic timer to avoid bugs when time changes + start_time_seconds = time.monotonic() + for _ in range(self._max_queue_batch_size): + try: + with self._session_factory() as session: + queue_had_items = queue_handler(session=session) + if not queue_had_items: + break + except Exception as exc: + _logger.exception(f"Error while executing {queue_handler=}") + bugsnag_instrumentation.notify(exception=exc) + if ( + datetime.timedelta( + milliseconds=(time.monotonic() - start_time_seconds) * 1000 + ) + > self._max_queue_batch_duration + ): + break def internal_process_queued_executions_queue(self, session: orm.Session): query = (