[codex] add adaptive inference rate limiting#139
[codex] add adaptive inference rate limiting#139Hynek Kydlíček (hynky1999) wants to merge 4 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request implements an adaptive request limiting mechanism for inference providers, allowing the system to dynamically adjust concurrency based on HTTP 429 responses and successful request windows. The changes include the AdaptiveRateLimit configuration, the AdaptiveRateLimiter logic, and integration into the generation pipeline with enhanced metrics and documentation. Reviewer feedback identifies opportunities to improve documentation self-containment, eliminate redundant variable updates in the generation loop, and simplify logic in the rate limiter by leveraging class invariants.
|
|
||
| pipeline = mdr.read_jsonl("input.jsonl").map_async( | ||
| mdr.inference.generate( | ||
| fn=summarize, |
There was a problem hiding this comment.
| running_requests += 1 | ||
| try: | ||
| response = await client.generate(request_payload) | ||
| if permit is not None: | ||
| async with permit: | ||
| response = await client.generate(request_payload) | ||
| else: | ||
| response = await client.generate(request_payload) | ||
| except GenerationRateLimitError as err: | ||
| if adaptive_limiter is not None: | ||
| await adaptive_limiter.record_rate_limit(err.retry_after_seconds) | ||
| row.log_throughput("rate_limited_requests", 1, unit="requests") | ||
| row.log_throughput("failed_requests", 1, unit="requests") | ||
| raise | ||
| except Exception: | ||
| row.log_throughput("failed_requests", 1, unit="requests") | ||
| raise | ||
| finally: | ||
| running_requests -= 1 | ||
| request_semaphore.release() | ||
| if request_semaphore is not None: | ||
| request_semaphore.release() |
There was a problem hiding this comment.
The running_requests variable is updated on every request, even when an adaptive_limiter is in use. In the adaptive case, the running_requests metric already uses adaptive_limiter.running as the source of truth, making the updates to the local running_requests variable redundant and potentially confusing. For better clarity and maintainability, these updates should only happen when using the fixed-concurrency request_semaphore.
if adaptive_limiter is None:
running_requests += 1
try:
if permit is not None:
async with permit:
response = await client.generate(request_payload)
else:
response = await client.generate(request_payload)
except GenerationRateLimitError as err:
if adaptive_limiter is not None:
await adaptive_limiter.record_rate_limit(err.retry_after_seconds)
row.log_throughput("rate_limited_requests", 1, unit="requests")
row.log_throughput("failed_requests", 1, unit="requests")
raise
except Exception:
row.log_throughput("failed_requests", 1, unit="requests")
raise
finally:
if adaptive_limiter is None:
running_requests -= 1
if request_semaphore is not None:
request_semaphore.release()| now = time.monotonic() | ||
| if self._should_grow(now): | ||
| next_limit = min( | ||
| self._config.max_concurrency or self._limit, |
There was a problem hiding this comment.
Since __init__ ensures self._config.max_concurrency is not None, the or self._limit part is redundant and can be confusing. Simplifying this expression will improve readability by relying on the class invariant.
| self._config.max_concurrency or self._limit, | |
| self._config.max_concurrency, |
| return ( | ||
| self._successes >= self._config.success_window_requests | ||
| and now - self._last_adjusted_at >= self._config.success_window_seconds | ||
| and self._limit < (self._config.max_concurrency or self._limit) |
There was a problem hiding this comment.
Since __init__ ensures self._config.max_concurrency is not None, the or self._limit part is redundant and can be confusing. Simplifying this expression will improve readability by relying on the class invariant.
| and self._limit < (self._config.max_concurrency or self._limit) | |
| and self._limit < self._config.max_concurrency |
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 439b2e67ff
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| self._condition.wait(), | ||
| timeout=cooldown_remaining, | ||
| ) | ||
| except TimeoutError: |
There was a problem hiding this comment.
Catch asyncio timeout class in cooldown wait
Use asyncio.TimeoutError (or both exception classes) here instead of built-in TimeoutError. On Python 3.10, asyncio.wait_for(...) raises asyncio.TimeoutError, so this except does not match; when cooldown expires, acquire() raises out of the limiter and fails queued inference requests instead of resuming them. Since pyproject.toml declares requires-python = ">=3.10", this is a real runtime regression for supported environments.
Useful? React with 👍 / 👎.
439b2e6 to
0e3998a
Compare
There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 0e3998aca0
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "Codex (@codex) review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "Codex (@codex) address that feedback".
| await adaptive_limiter.record_rate_limit(err.retry_after_seconds) | ||
| row.log_throughput("rate_limited_requests", 1, unit="requests") | ||
| row.log_throughput("failed_requests", 1, unit="requests") | ||
| raise |
There was a problem hiding this comment.
Retry rate-limited calls instead of re-raising immediately
When rate_limit is enabled, this branch updates the adaptive limiter and then immediately re-raises the GenerationRateLimitError. In the current execution path, uncaught async step exceptions propagate out of map_async and fail the shard/worker, so a single HTTP 429 still aborts the job before the reduced concurrency can stabilize subsequent requests. This makes the new adaptive limiter ineffective for normal generate(...) usage unless every caller adds custom exception handling.
Useful? React with 👍 / 👎.
0e3998a to
7416fc0
Compare
7416fc0 to
e0aeab5
Compare
Summary
Makes
mdr.inference.generate(...)rate-limit aware by default, with a singlerate_limitparameter for choosing adaptive or fixed request concurrency.Changes
mdr.inference.AdaptiveRateLimit, used by default whenrate_limitis omitted orNone.mdr.inference.StaticRateLimitfor the old fixed-concurrency behavior.max_concurrent_requestsfromgenerate(...); concurrency caps now live on the rate-limit object.Retry-Afterfrom OpenAI-compatible endpoints through a typedGenerationRateLimitError.Validation
uv run ruff check --force-exclude --fix src/refiner/inference tests/test_inference.py tests/test_inference_rate_limit.py examples/lerobot/sarm_annotation.pyuv run ruff format --force-exclude src/refiner/inference tests/test_inference.py tests/test_inference_rate_limit.py examples/lerobot/sarm_annotation.pyuv run ty checkuv run pytestFull suite:
627 passed.