From 676f0b5fa2d81fa11d5adf148ea3b61714621071 Mon Sep 17 00:00:00 2001 From: Goutam Date: Thu, 11 Jun 2026 12:26:11 -0700 Subject: [PATCH 1/5] [Data][3/n] Fix cross-region S3 bucket access in obstore download path (#63890) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit obstore's S3Store defaults region to us-east-1 and does not follow AWS PermanentRedirect responses, so any obstore-routed S3 request against a bucket in a different region fails non-retryably with BareRedirect. - `_split_obstore_uri` rewrites `https://s3..amazonaws.com//` to s3:// + so StoreRegistry can apply region discovery. - `_discover_aws_bucket_region` resolves a bucket's region via `pyarrow.fs.resolve_s3_region` (already a required Ray Data dependency), cached per bucket. PyArrow issues the `x-amz-bucket-region` HEAD probe and handles the legacy global endpoint / IMDS edge cases; we additionally cache negative results so unresolvable buckets are probed at most once. The probe runs outside the cache lock, and the write-back never lets a `None` result overwrite a region a concurrent thread already cached (a real region always wins), so racing first-time lookups can't intermittently disable region injection. - `StoreRegistry.get` injects the discovered region for `s3://, s3a://` URLs, skipping injection when the caller already supplied a region or a custom endpoint (MinIO/R2/etc.). - All obstore call sites — the HEAD size probe (`_resolve_size`), the actor HEAD path (`_head_one`), ranged downloads (`_fetch_ranged`), and whole-file GET (`_fetch`) — go through `_split_obstore_uri`, so a path-style cross-region URL no longer slips past the rewrite (which previously left the size probe on the regional HTTPS store, returning size 0 and wrongly skipping ranged downloads). GCS and Azure are unaffected: neither encodes region in the endpoint (GCS uses a global endpoint addressed by bucket name; Azure is keyed by storage account), so they have no cross-region redirect failure mode. --------- Signed-off-by: Goutam Co-authored-by: Claude Opus 4.8 (1M context) --- .../_internal/planner/_obstore_download.py | 114 +++++++- .../planner/download_partition_actor.py | 5 +- .../data/tests/unit/test_obstore_download.py | 254 ++++++++++++++++++ 3 files changed, 367 insertions(+), 6 deletions(-) diff --git a/python/ray/data/_internal/planner/_obstore_download.py b/python/ray/data/_internal/planner/_obstore_download.py index fe725aa534f8..2310df765e98 100644 --- a/python/ray/data/_internal/planner/_obstore_download.py +++ b/python/ray/data/_internal/planner/_obstore_download.py @@ -2,9 +2,11 @@ import inspect import logging import os +import re import threading from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple +from urllib.parse import urlparse if TYPE_CHECKING: import s3fs # noqa: F401 @@ -98,6 +100,45 @@ def _log_fallback_warning() -> None: ) +# Path-style AWS S3 host: ``s3.amazonaws.com`` (legacy global) or +# ``s3..amazonaws.com`` (regional). Virtual-host hosts like +# ``.s3..amazonaws.com`` already carry the bucket in the +# netloc and route correctly through obstore without rewriting. +_AWS_PATH_STYLE_HOST_RE = re.compile( + r"^s3(?:\.[a-z0-9-]+)?\.amazonaws\.com$", re.IGNORECASE +) + + +def _split_obstore_uri(uri: str) -> Tuple[str, str]: + """Split a URI into ``(store_url, path)`` for ``obstore.store.from_url``. + + Wraps :func:`_split_uri` to rewrite AWS path-style HTTPS URLs of the + form ``https://s3..amazonaws.com//`` into + ``store_url='s3://'`` / ``path=''``. Without this, obstore + constructs an ``S3Store`` keyed by the regional host endpoint, which + pins region from the URL and bails with ``BareRedirect`` whenever the + bucket actually lives in a different region (AWS's PermanentRedirect + omits the ``Location`` header, so generic redirect-following fails). + Rewriting to ``s3://`` lets ``S3Store`` discover the bucket's + real region once and cache it for the store's lifetime. + + Pre-signed HTTPS URLs (any URI with a query string) are passed through + unchanged because their signature is bound to host + path + query, so + rewriting would invalidate the signature. + """ + parsed = urlparse(uri, allow_fragments=False) + if ( + parsed.scheme in ("http", "https") + and not parsed.query + and _AWS_PATH_STYLE_HOST_RE.match(parsed.netloc) + ): + raw_path = parsed.path[1:] if parsed.path.startswith("/") else parsed.path + bucket, _, key = raw_path.partition("/") + if bucket: + return f"s3://{bucket}", key + return _split_uri(uri) + + def _is_obstore_supported_url(path: str) -> bool: """Check if *path* is a URL that obstore can handle. @@ -484,6 +525,54 @@ def _plan_obstore_routing( return True, fs_kwargs +# Per-process cache: AWS bucket name -> region (``None`` if discovery failed). +# Buckets do not change region, so caching for the process lifetime is safe. +_BUCKET_REGION_CACHE: Dict[str, Optional[str]] = {} +_BUCKET_REGION_CACHE_LOCK = threading.Lock() + + +def _discover_aws_bucket_region(bucket: str) -> Optional[str]: + """Discover an AWS S3 bucket's region, cached per-process. + + Delegates to :func:`pyarrow.fs.resolve_s3_region`, which issues a HEAD + probe and reads ``x-amz-bucket-region`` (AWS returns it for both 200 OK + and 301 PermanentRedirect responses). PyArrow is already a required Ray + Data dependency and caches region lookups in its C++ S3 layer; the extra + per-process dict here also caches negative results, so a bucket that can't + be resolved (network error, non-AWS endpoint, or a PyArrow build without + S3 support) is probed at most once. + + Returns ``None`` when the region cannot be determined. Callers fall back + to obstore's default region handling, which preserves prior behavior for + non-AWS S3-compatible backends (MinIO, R2, etc.). + """ + with _BUCKET_REGION_CACHE_LOCK: + if bucket in _BUCKET_REGION_CACHE: + return _BUCKET_REGION_CACHE[bucket] + + # Probe outside the lock so concurrent first-time lookups don't serialize + # on a network round-trip. + region: Optional[str] = None + try: + region = pyarrow.fs.resolve_s3_region(bucket) + except Exception as e: + # Includes AttributeError on PyArrow builds compiled without S3 + # support, where ``resolve_s3_region`` is absent. + logger.debug("Failed to discover region for bucket %r: %s", bucket, e) + + with _BUCKET_REGION_CACHE_LOCK: + # Another thread may have resolved the same bucket while we probed. + # A real region must win over a ``None`` result: never let a failed + # probe overwrite a region a concurrent thread already cached, or + # later ``StoreRegistry.get`` calls would skip region injection and + # cross-region downloads could fail intermittently. + cached = _BUCKET_REGION_CACHE.get(bucket) + if cached is not None: + return cached + _BUCKET_REGION_CACHE[bucket] = region + return region + + class StoreRegistry: """Cache of store_url -> ObjectStore instances. @@ -506,6 +595,23 @@ def __init__( def get(self, store_url: str) -> Any: if store_url not in self._cache: kwargs = dict(self._filesystem_kwargs) + # obstore's S3Store defaults ``region`` to ``us-east-1`` and does + # not auto-discover the bucket's real region, so cross-region + # buckets fail with ``BareRedirect`` (AWS's PermanentRedirect + # omits the ``Location`` header, defeating generic redirect + # following). Probe ``x-amz-bucket-region`` once per bucket and + # supply the discovered region explicitly. Skip when the caller + # already provided a region or a custom endpoint (MinIO/R2/etc.). + if ( + store_url.startswith(("s3://", "s3a://")) + and "region" not in kwargs + and "endpoint" not in kwargs + ): + bucket = store_url.split("://", 1)[1].split("/", 1)[0] + if bucket: + region = _discover_aws_bucket_region(bucket) + if region: + kwargs["region"] = region if store_url.startswith("http://"): # obstore's reqwest client rejects http:// by default. Auto-enable it # to maintain parity with PyArrow (which accepts http:// via fsspec), @@ -800,7 +906,7 @@ async def _resolve_size( import obstore as obs try: - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) store = registry.get(store_url) async with semaphore: meta = await obs.head_async(store, path) @@ -851,7 +957,7 @@ async def _fetch_ranged( pipeline never loses a file due to a transient range error. """ try: - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) store = registry.get(store_url) result = bytearray(size) @@ -914,6 +1020,6 @@ async def _fetch(uri: str, registry: StoreRegistry) -> bytes: """Download a single URI as a whole-file GET and return raw bytes.""" import obstore as obs - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) result = await obs.get_async(registry.get(store_url), path) return bytes(await result.bytes_async()) diff --git a/python/ray/data/_internal/planner/download_partition_actor.py b/python/ray/data/_internal/planner/download_partition_actor.py index 2f0dc0ead34a..282d4758f81c 100644 --- a/python/ray/data/_internal/planner/download_partition_actor.py +++ b/python/ray/data/_internal/planner/download_partition_actor.py @@ -14,11 +14,12 @@ StoreRegistry, _extract_credentials_from_filesystem, _is_obstore_supported_url, + _split_obstore_uri, ) from ray.data._internal.util import RetryingPyFileSystem, _arrow_batcher from ray.data.block import BlockAccessor from ray.data.context import DataContext -from ray.data.datasource.path_util import _resolve_paths_and_filesystem, _split_uri +from ray.data.datasource.path_util import _resolve_paths_and_filesystem logger = logging.getLogger(__name__) @@ -268,7 +269,7 @@ def _sample_sizes(self, uris: List[str]) -> List[int]: async def _head_one(uri: str) -> int: try: - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) store = self._registry.get(store_url) async with sem: meta = await obs.head_async(store, path) diff --git a/python/ray/data/tests/unit/test_obstore_download.py b/python/ray/data/tests/unit/test_obstore_download.py index 9173a5444fb1..01e70e76c094 100644 --- a/python/ray/data/tests/unit/test_obstore_download.py +++ b/python/ray/data/tests/unit/test_obstore_download.py @@ -10,13 +10,18 @@ import pytest from ray.data._internal.planner._obstore_download import ( + _BUCKET_REGION_CACHE, _FILE_SIZE_COLUMN_PREFIX, + StoreRegistry, + _discover_aws_bucket_region, _download_uris_with_obstore, _extract_credentials_from_filesystem, _is_obstore_supported_url, _obstore_filesystem_requires_threaded_download, _plan_obstore_routing, + _resolve_size, _S3FSSessionCredentialProvider, + _split_obstore_uri, download_bytes_async, ) from ray.data._internal.planner.plan_download_op import ( @@ -58,6 +63,255 @@ def test_split_uri(uri, expected_store_url, expected_path): assert path == expected_path +# TestSplitObstoreUri — AWS path-style HTTPS URLs must be rewritten to s3:// +# so obstore's S3Store keys off the bucket (with region discovery) instead of +# off the regional host endpoint (which pins region and fails with +# BareRedirect on cross-region buckets). +@pytest.mark.parametrize( + "uri, expected_store_url, expected_path", + [ + # Path-style regional: must rewrite to s3://. + ( + "https://s3.us-east-1.amazonaws.com/ray-example-data/imagenet/foo.jpg", + "s3://ray-example-data", + "imagenet/foo.jpg", + ), + # Path-style legacy global endpoint: also rewrite. + ( + "https://s3.amazonaws.com/my-bucket/key.bin", + "s3://my-bucket", + "key.bin", + ), + # Virtual-host style: bucket already in netloc, leave alone. + ( + "https://my-bucket.s3.us-west-2.amazonaws.com/key.bin", + "https://my-bucket.s3.us-west-2.amazonaws.com", + "key.bin", + ), + # Native s3:// URIs: no change. + ( + "s3://my-bucket/prefix/key.jpg", + "s3://my-bucket", + "prefix/key.jpg", + ), + # Pre-signed URLs (any query string): signature is bound to host + + # path + query, so rewriting would invalidate it. Pass through. + ( + "https://s3.us-east-1.amazonaws.com/bucket/key?X-Amz-Signature=abc", + "https://s3.us-east-1.amazonaws.com", + "bucket/key?X-Amz-Signature=abc", + ), + # Non-AWS HTTPS (MinIO, R2 custom domain, generic file server): + # leave alone — only AWS S3 needs the region-discovery rewrite. + ( + "https://files.example.com/bucket/key.bin", + "https://files.example.com", + "bucket/key.bin", + ), + # Other clouds: untouched. + ("gs://bucket/path", "gs://bucket", "path"), + # Path-style with nested key. + ( + "https://s3.eu-central-1.amazonaws.com/bucket/a/b/c.parquet", + "s3://bucket", + "a/b/c.parquet", + ), + # Path-style with only a bucket (no key): bucket is preserved, key + # is empty. Edge case; downstream obstore call would fail but that's + # the caller's problem. + ( + "https://s3.amazonaws.com/just-a-bucket", + "s3://just-a-bucket", + "", + ), + ], +) +def test_split_obstore_uri(uri, expected_store_url, expected_path): + store_url, path = _split_obstore_uri(uri) + assert store_url == expected_store_url + assert path == expected_path + + +# TestBucketRegionDiscovery — exercise the region-discovery path used by +# StoreRegistry to find the real region of cross-region S3 buckets. Region +# resolution is delegated to ``pyarrow.fs.resolve_s3_region``; these tests +# patch it to avoid real network calls. +class TestBucketRegionDiscovery: + @staticmethod + def _clear_cache(): + _BUCKET_REGION_CACHE.clear() + + def test_resolved_region_is_returned(self): + # Whether the bucket is same-region (200 OK) or cross-region (301 + # PermanentRedirect), PyArrow reads x-amz-bucket-region and returns it. + self._clear_cache() + with patch("pyarrow.fs.resolve_s3_region", return_value="us-west-2"): + assert _discover_aws_bucket_region("cross-region-bucket") == "us-west-2" + + def test_cache_hit_avoids_second_probe(self): + # First call probes; second call must not invoke resolve_s3_region. + self._clear_cache() + with patch("pyarrow.fs.resolve_s3_region", return_value="eu-west-1") as resolve: + _discover_aws_bucket_region("euro-bucket") + _discover_aws_bucket_region("euro-bucket") + assert resolve.call_count == 1 + + def test_failure_returns_none_and_caches(self): + # On resolution failure (network error, non-AWS endpoint) return None + # so StoreRegistry leaves obstore's defaults intact for MinIO/R2/etc. + # Cache the negative result so we don't keep probing. + self._clear_cache() + with patch( + "pyarrow.fs.resolve_s3_region", side_effect=OSError("nope") + ) as resolve: + assert _discover_aws_bucket_region("unreachable") is None + assert _discover_aws_bucket_region("unreachable") is None + assert resolve.call_count == 1 + + def test_missing_resolve_s3_region_returns_none(self): + # PyArrow builds compiled without S3 support lack resolve_s3_region, + # so the attribute access raises AttributeError; discovery must degrade + # to None rather than propagate it. + self._clear_cache() + with patch("pyarrow.fs.resolve_s3_region", side_effect=AttributeError("no S3")): + assert _discover_aws_bucket_region("no-s3-support") is None + + def test_none_result_does_not_overwrite_cached_region(self): + # Race: this thread probes (outside the lock) and gets None, while a + # concurrent thread resolves and caches the real region. The failed + # probe must not clobber the cached region, otherwise later + # StoreRegistry.get calls skip region injection and cross-region + # downloads fail intermittently. + self._clear_cache() + + def resolve_then_simulate_concurrent_win(bucket): + # Stand in for another thread caching the real region mid-probe. + _BUCKET_REGION_CACHE[bucket] = "us-west-2" + return None + + with patch( + "pyarrow.fs.resolve_s3_region", + side_effect=resolve_then_simulate_concurrent_win, + ): + assert _discover_aws_bucket_region("racy-bucket") == "us-west-2" + assert _BUCKET_REGION_CACHE["racy-bucket"] == "us-west-2" + + +# TestStoreRegistryRegionInjection — verify region discovery happens at +# store-construction time for s3:// URLs without an explicit region, and is +# correctly skipped when the caller has already configured one. +class TestStoreRegistryRegionInjection: + @staticmethod + def _make_registry_capturing_kwargs(): + _BUCKET_REGION_CACHE.clear() + captured = {} + + def fake_from_url(url, **kwargs): + captured["store_url"] = url + captured["kwargs"] = kwargs + return MagicMock(name="fake-store") + + reg = StoreRegistry() + reg._from_url = fake_from_url + return reg, captured + + def test_s3_url_injects_discovered_region(self): + reg, captured = self._make_registry_capturing_kwargs() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region", + return_value="us-west-2", + ): + reg.get("s3://ray-example-data") + assert captured["kwargs"].get("region") == "us-west-2" + + def test_s3a_url_also_injects_region(self): + reg, captured = self._make_registry_capturing_kwargs() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region", + return_value="us-west-2", + ): + reg.get("s3a://some-bucket") + assert captured["kwargs"].get("region") == "us-west-2" + + def test_explicit_region_not_overridden(self): + # Caller (e.g., creds extracted from PyArrow S3FileSystem) already + # supplied region; do not probe and do not override. + reg = StoreRegistry(region="eu-central-1") + captured = {} + + def fake_from_url(url, **kwargs): + captured["kwargs"] = kwargs + return MagicMock() + + reg._from_url = fake_from_url + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region" + ) as probe: + reg.get("s3://bucket") + probe.assert_not_called() + assert captured["kwargs"]["region"] == "eu-central-1" + + def test_custom_endpoint_skips_probe(self): + # MinIO / R2 / localstack: caller configured a custom endpoint, so + # the AWS region probe would be both wrong and wasteful. + reg = StoreRegistry(endpoint="http://localhost:9000") + reg._from_url = lambda *a, **k: MagicMock() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region" + ) as probe: + reg.get("s3://bucket") + probe.assert_not_called() + + def test_https_url_does_not_probe(self): + # Virtual-hosted-style or presigned HTTPS URLs reach get() unchanged + # (after _split_obstore_uri). obstore parses the region from the + # netloc directly, so probing would be redundant. + reg, captured = self._make_registry_capturing_kwargs() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region" + ) as probe: + reg.get("https://bucket.s3.us-east-1.amazonaws.com") + probe.assert_not_called() + + +# TestResolveSizeRewrite — the HEAD size probe must apply the same path-style +# -> s3:// rewrite as the download paths. Otherwise a cross-region path-style +# URL keeps the regional HTTPS store, hits BareRedirect, returns size 0, and +# wrongly skips ranged downloads even when a whole-file GET would succeed. +class TestResolveSizeRewrite: + def test_resolve_size_rewrites_path_style_uri(self): + _BUCKET_REGION_CACHE.clear() + captured = {} + + def fake_from_url(url, **kwargs): + captured["store_url"] = url + return MagicMock(name="store") + + registry = StoreRegistry() + registry._from_url = fake_from_url + + async def fake_head_async(store, path): + captured["path"] = path + return {"size": 4096} + + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region", + return_value="us-west-2", + ), patch("obstore.head_async", side_effect=fake_head_async): + size = asyncio.run( + _resolve_size( + "https://s3.us-east-1.amazonaws.com/cross-region-bucket/key.bin", + registry, + asyncio.Semaphore(), + ) + ) + + # Rewritten to s3:// so region discovery applies; HEAD succeeds. + assert size == 4096 + assert captured["store_url"] == "s3://cross-region-bucket" + assert captured["path"] == "key.bin" + + # TestDownloadHelpers class TestDownloadHelpers: """Unit tests for credential extraction and URL-scheme detection.""" From 84c29ec38c92ccfc5bf65bb11920cd90c76369a8 Mon Sep 17 00:00:00 2001 From: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Date: Thu, 11 Jun 2026 12:58:08 -0700 Subject: [PATCH 2/5] [Core] ObjectRefGenerator._next_sync: honor timeout_s in the end-of-stream ray.get (#64014) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `_next_sync` documents *"if an object is not available within the given timeout, it returns a nil object reference"*, but its end-of-stream handling calls `ray.get(generator_ref)` with **no timeout** (to distinguish a normal end of the stream from a task failure). The bug: after all yielded refs are consumed, the next `_next_sync(timeout_s=...)` call reaches that get. The generator's return object normally resolves locally, but if it lives in plasma and its node died, the get blocks the calling thread until lineage reconstruction re-runs the task — which needs a free CPU. On a saturated cluster this can deadlock: the blocked caller (e.g. Ray Data's scheduling thread) is what consumes outputs and releases the CPUs held by output-backpressured tasks, so reconstruction can never start. Serve's `to_object_ref(timeout_s=...)` similarly blocks past the user's requested timeout when a replica node dies. - Apply the caller's `timeout_s` to the end-of-stream get; report a timeout as a nil ref (retry), per the documented contract. - `timeout_s=None` (and `-1`) keep the blocking behavior, so `__next__` and other timeout-less callers are unchanged. - Regression test: stream exhausted + return object lost with its node → `_next_sync(timeout_s=0)` returns nil instead of blocking (hangs forever without the fix), and the stream terminates normally once the node is restored. 🤖 Generated with [Claude Code](https://claude.com/claude-code) --------- Signed-off-by: xgui Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com> Co-authored-by: Claude Fable 5 Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> --- python/ray/_private/object_ref_generator.py | 34 +++++++++-- python/ray/tests/test_streaming_generator.py | 60 ++++++++++++++++++++ 2 files changed, 90 insertions(+), 4 deletions(-) diff --git a/python/ray/_private/object_ref_generator.py b/python/ray/_private/object_ref_generator.py index 2b60fb537a6f..d2cf4e715560 100644 --- a/python/ray/_private/object_ref_generator.py +++ b/python/ray/_private/object_ref_generator.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Deque, Iterator, Optional import ray -from ray.exceptions import ObjectRefStreamEndOfStreamError +from ray.exceptions import GetTimeoutError, ObjectRefStreamEndOfStreamError from ray.util.annotations import DeveloperAPI, PublicAPI if TYPE_CHECKING: @@ -227,7 +227,21 @@ def _next_sync(self, timeout_s: Optional[int | float] = None) -> "ray.ObjectRef" # The generator ref contains an exception # if there's any failure. It contains nothing otherwise. # In that case, it should raise StopIteration. - ray.get(self._generator_ref) + # + # Bound this get by the caller's timeout: the return object + # can be remote — or lost to a failed node and pending + # reconstruction — and an unbounded get would block the + # caller until it is restored (e.g. the Ray Data scheduling + # thread; a saturated cluster can then deadlock, since the + # blocked consumer is what releases backpressured CPUs). + # Per this method's contract, a timeout is reported as "no + # object ready yet" (nil ref) so the caller retries. + ray.get( + self._generator_ref, + timeout=(None if timeout_s is None or timeout_s < 0 else timeout_s), + ) + except GetTimeoutError: + return ray.ObjectRef.nil() except Exception: self._generator_task_raised = True return self._generator_ref @@ -270,8 +284,20 @@ async def _next_async(self, timeout_s: Optional[int | float] = None): try: # The generator ref contains an exception # if there's any failure. It contains nothing otherwise. - # In that case, it should raise StopSyncIteration. - await self._generator_ref + # In that case, it should raise StopAsyncIteration. + # + # Bound this await by the caller's timeout, mirroring + # _next_sync: the return object can be remote — or lost to a + # failed node and pending reconstruction — and an unbounded + # await would block the caller until it is restored. Per this + # method's contract, a timeout is reported as "no object + # ready yet" (nil ref) so the caller retries. + if timeout_s is None or timeout_s < 0: + await self._generator_ref + else: + await asyncio.wait_for(self._generator_ref, timeout=timeout_s) + except asyncio.TimeoutError: + return ray.ObjectRef.nil() except Exception: self._generator_task_raised = True return self._generator_ref diff --git a/python/ray/tests/test_streaming_generator.py b/python/ray/tests/test_streaming_generator.py index 39006025eb44..666efe6b7a40 100644 --- a/python/ray/tests/test_streaming_generator.py +++ b/python/ray/tests/test_streaming_generator.py @@ -575,5 +575,65 @@ async def async_f(self): ray.get(next(g)) +def test_next_sync_timeout_when_generator_ref_unavailable( + monkeypatch, ray_start_cluster +): + """_next_sync(timeout_s) must not block in its end-of-stream handling. + + After all yielded refs are consumed, ``_next_sync`` calls + ``ray.get(generator_ref)`` to distinguish a normal end of the stream + from a task failure. If that object is unavailable — e.g. it lived in + the plasma store of a node that died — the get must be bounded by the + caller's timeout (reporting "not ready yet" with a nil ref) instead of + blocking the caller until the object is reconstructed. A blocked caller + can deadlock: reconstruction needs a CPU, and the blocked caller may be + what releases one (see ray-project/ray#63701). + """ + # Force the generator task's return object into plasma (instead of + # being inlined with the owner) so that it is lost when its node dies. + monkeypatch.setenv("RAY_max_direct_call_object_size", "0") + cluster = ray_start_cluster + cluster.add_node(num_cpus=0) + ray.init(address=cluster.address) + worker_node = cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() + + @ray.remote(num_returns="streaming") + def gen(): + yield 1 + + g = gen.remote() + # Consume the single yielded ref so that the stream is exhausted. + first = g._next_sync(timeout_s=30) + assert not first.is_nil() + + # Lose the generator's return object together with its node. + cluster.remove_node(worker_node) + + # End-of-stream handling must honor the timeout: a nil ref, not a block. + start = time.monotonic() + assert g._next_sync(timeout_s=0).is_nil() + assert time.monotonic() - start < 10 + + # The async counterpart must honor the timeout the same way. + start = time.monotonic() + assert asyncio.run(g._next_async(timeout_s=0)).is_nil() + assert time.monotonic() - start < 10 + + # Once a node is back, lineage reconstruction restores the return object + # and the stream terminates normally. + cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() + deadline = time.monotonic() + 60 + while time.monotonic() < deadline: + try: + ref = g._next_sync(timeout_s=1) + except StopIteration: + break + assert ref.is_nil() + else: + pytest.fail("Generator did not finish after the node was restored.") + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__])) From 9ba119183ea3d9fcaace319d20a366a008012924 Mon Sep 17 00:00:00 2001 From: Elliot Barnwell Date: Thu, 11 Jun 2026 16:11:04 -0700 Subject: [PATCH 3/5] [ci] windows: pin pyopenssl in conda base to fix cryptography skew (#64034) The Windows base image build (ci/ray_ci/windows/build_base.sh) crashes when running `conda update -c conda-forge ca-certificates certifi`: AttributeError: module 'lib' has no attribute 'X509_V_FLAG_CB_ISSUER_CHECK' Upgrading the conda base env to python 3.10 (`conda install python=...`) pulls cryptography>=38, which removed `_lib.X509_V_FLAG_CB_ISSUER_CHECK`. pyopenssl is not part of that transaction, so the stale py3.8-era pyopenssl is left behind and still references the removed attribute at import. The next conda invocation imports requests -> urllib3.contrib.pyopenssl -> OpenSSL.crypto and detonates before conda can run, failing the base image build. Co-resolve pyopenssl 23.2.0 in the same conda install transaction so it stays compatible with the cryptography 38.x that gets installed. --------- Signed-off-by: elliot-barn Co-authored-by: Claude Opus 4.8 (1M context) --- .buildkite/test.rules.test.txt | 3 +++ .buildkite/test.rules.txt | 11 ++++++++++- .buildkite/windows.rayci.yml | 1 + ci/ray_ci/windows/build_base.sh | 4 ++-- 4 files changed, 16 insertions(+), 3 deletions(-) diff --git a/.buildkite/test.rules.test.txt b/.buildkite/test.rules.test.txt index a7fffb20236a..e5b041f45b3d 100644 --- a/.buildkite/test.rules.test.txt +++ b/.buildkite/test.rules.test.txt @@ -101,6 +101,9 @@ release/release_tests.yaml: tools ci/lint/lint.sh: tools ci/ray_ci/tester.py: tools ci/ci.sh: tools +# Windows CI driver scripts trigger the Windows test steps, not just tools. +ci/ray_ci/windows/build_base.sh: windows tools +ci/ray_ci/windows/install_tools.sh: windows tools # === Build Scripts === diff --git a/.buildkite/test.rules.txt b/.buildkite/test.rules.txt index 0028b24414b3..58bce054e633 100644 --- a/.buildkite/test.rules.txt +++ b/.buildkite/test.rules.txt @@ -17,7 +17,7 @@ ! python cpp core_cpp java workflow cgraphs_direct_transport dashboard ! ray_client runtime_env_container ! data dask serve ml tune train train_gpu llm rllib rllib_gpu rllib_directly -! linux_wheels macos_wheels docker doc python_dependencies min_build tools +! linux_wheels macos_wheels docker doc python_dependencies min_build tools windows ! release_tests spark_on_ray # NOTE: dstrodtman is leading effort to rework content in RST files, and wanted to shift @@ -250,6 +250,15 @@ ci/ray_ci/macos/macos_ci_build.sh @ macos_wheels ; +# Windows CI driver scripts (base image build, tool install, cleanup) feed the +# windowsbuild image that every Windows test consumes. Emit 'windows' so the +# Windows test steps (tagged 'windows') run, not just the image rebuild. Must +# precede the general ci/ray_ci/ rule below, which would otherwise swallow +# these into 'tools' only. +ci/ray_ci/windows/ +@ windows tools +; + ci/pipeline/ ci/build/ ci/ray_ci/ diff --git a/.buildkite/windows.rayci.yml b/.buildkite/windows.rayci.yml index d6621a220db3..572fccba7dda 100644 --- a/.buildkite/windows.rayci.yml +++ b/.buildkite/windows.rayci.yml @@ -9,6 +9,7 @@ steps: if: build.env("BUILDKITE_PIPELINE_ID") == "0189942e-0876-4b8f-80a4-617f988ec59b" || build.env("BUILDKITE_PIPELINE_ID") == "018f4f1e-1b73-4906-9802-92422e3badaa" tags: - core_cpp + - windows job_env: WINDOWS instance_type: windows commands: diff --git a/ci/ray_ci/windows/build_base.sh b/ci/ray_ci/windows/build_base.sh index 2398179d5c83..080cfc179dce 100644 --- a/ci/ray_ci/windows/build_base.sh +++ b/ci/ray_ci/windows/build_base.sh @@ -16,9 +16,9 @@ conda init # Conda 26.3.1 crashes on Windows when it tries to clean up a locked exe during # a build-variant swap. Preventing self-update avoids that code path. conda config --set auto_update_conda false -conda install -q -y python="${PYTHON_FULL_VERSION}" requests=2.32.3 +conda install -q -y python="${PYTHON_FULL_VERSION}" requests=2.32.3 pyopenssl=23.2.0 # Force CA trust stack to the newest versions available at build time. -conda update -c conda-forge -q -y ca-certificates certifi +conda update --freeze-installed -c conda-forge -q -y ca-certificates certifi # Install torch first, as some dependencies (e.g. torch-spline-conv) need torch to be # installed for their own install. From 0b3944d89b3d18bb9459946ac7853e1e74777c0a Mon Sep 17 00:00:00 2001 From: kourosh hakhamaneshi <31483498+kouroshHakha@users.noreply.github.com> Date: Thu, 11 Jun 2026 17:47:11 -0700 Subject: [PATCH 4/5] [serve.llm] Don't clobber an explicitly-set request_id with the Serve id (#64044) Signed-off-by: Kourosh Hakhamaneshi Co-authored-by: Claude Fable 5 --- .../_internal/serve/core/server/llm_server.py | 12 ++++- .../cpu/deployments/llm/test_llm_server.py | 54 +++++++++++++++++++ 2 files changed, 65 insertions(+), 1 deletion(-) diff --git a/python/ray/llm/_internal/serve/core/server/llm_server.py b/python/ray/llm/_internal/serve/core/server/llm_server.py index 05621d8fc8e4..ee7e49a84201 100644 --- a/python/ray/llm/_internal/serve/core/server/llm_server.py +++ b/python/ray/llm/_internal/serve/core/server/llm_server.py @@ -297,7 +297,17 @@ async def _maybe_add_request_id_to_request( "TranscriptionRequest", ], ): - """Add the request id to the request.""" + """Stamp the Serve request id, unless the caller set request_id explicitly. + + request_id defaults to a random uuid (never None), so use model_fields_set + to avoid clobbering an id a caller deliberately set (e.g. a P/D connector's + coordination id). Some request types (tokenize/detokenize) have no + request_id field at all -- skip those. + """ + if not hasattr(request, "request_id"): + return + if "request_id" in request.model_fields_set: + return request_id = get_serve_request_id() if request_id: request.request_id = request_id diff --git a/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py b/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py index ef5d13514e96..345cab4da4ce 100644 --- a/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py +++ b/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py @@ -14,6 +14,7 @@ LoraConfig, ModelLoadingConfig, ) +from ray.llm._internal.serve.core.configs.openai_api_models import CompletionRequest from ray.llm._internal.serve.core.protocol import RawRequestInfo from ray.llm._internal.serve.core.server.llm_server import LLMServer from ray.llm._internal.serve.engines.vllm.vllm_engine import ( @@ -731,5 +732,58 @@ def test_noop_when_request_id_unset(self): ) +class TestMaybeAddRequestId: + """``_maybe_add_request_id_to_request`` fills the Serve request id for a + defaulted request_id but never clobbers one the caller set explicitly.""" + + def _set_ctx(self, request_id): + serve.context._serve_request_context.set( + serve.context._RequestContext(request_id=request_id) + ) + + @pytest.mark.asyncio + async def test_defaulted_request_id_is_overwritten_with_serve_id(self): + server = LLMServer.__new__(LLMServer) + req = CompletionRequest(model="m", prompt="hi") # request_id defaulted + assert "request_id" not in req.model_fields_set + self._set_ctx("serve-ctx-id") + try: + await server._maybe_add_request_id_to_request(req) + finally: + serve.context._serve_request_context.set(serve.context._RequestContext()) + assert req.request_id == "serve-ctx-id" + + @pytest.mark.asyncio + async def test_explicit_request_id_is_preserved(self): + server = LLMServer.__new__(LLMServer) + req = CompletionRequest(model="m", prompt="hi", request_id="caller-set-id") + assert "request_id" in req.model_fields_set + self._set_ctx("serve-ctx-id") + try: + await server._maybe_add_request_id_to_request(req) + finally: + serve.context._serve_request_context.set(serve.context._RequestContext()) + # Caller's id wins; the Serve context id does not clobber it. + assert req.request_id == "caller-set-id" + + @pytest.mark.asyncio + async def test_request_without_request_id_field_is_skipped(self): + """Request types without a request_id field (e.g. tokenize/detokenize) + must be handled gracefully, not raise.""" + from pydantic import BaseModel + + class _NoRequestId(BaseModel): + pass + + server = LLMServer.__new__(LLMServer) + req = _NoRequestId() + self._set_ctx("serve-ctx-id") + try: + await server._maybe_add_request_id_to_request(req) + finally: + serve.context._serve_request_context.set(serve.context._RequestContext()) + assert not hasattr(req, "request_id") + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) From d379709c779d2b5724edc85f853ea510287fd929 Mon Sep 17 00:00:00 2001 From: harshit-anyscale Date: Fri, 12 Jun 2026 06:30:29 +0530 Subject: [PATCH 5/5] [serve] Default RAY_SERVE_PORT_QUARANTINE_S to hard-stop-after + margin (#64021) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## Why are these changes needed? `RAY_SERVE_PORT_QUARANTINE_S` holds a released direct-ingress replica port out of the allocation pool so that stale routing state pointing at the old replica drains before another replica can inherit the port. It currently defaults to **10 seconds**. The consumers that hold stale routing state the longest are soft-stopped (reloaded-out) HAProxy worker processes: they run no health checks (see [haproxy#3330](https://github.com/haproxy/haproxy/issues/3330)) and keep routing to their frozen server list until `hard-stop-after` fires — **120s by default** (`RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S`), commonly configured higher. With the current defaults the quarantine is 12x shorter than the window it exists to outlive: a freed port can be handed to a *different app's* replica at +10s while old workers keep sending it the previous app's traffic for up to +120s. Observed in sustained load testing: a just-freed direct-ingress port was recycled into another app's replica inside the stale-worker window, and a soft-stopped worker routed the old app's traffic to it — surfacing as unretried wrong-app 404s at the client. Health checks cannot catch this (they validate the address is serving, not which app is serving). ## What does this change do? Derives the default quarantine from the hard-stop window instead of a fixed 10s: ```python RAY_SERVE_PORT_QUARANTINE_S = get_env_float_non_negative( "RAY_SERVE_PORT_QUARANTINE_S", float(RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S + 30), ) ``` The `+30s` margin covers the broadcast/coalesce/reload latency that elapses before an old worker's hard-stop clock starts (the clock runs from the worker's *orphaning* at reload, which can lag the port release). An explicit `RAY_SERVE_PORT_QUARANTINE_S` still overrides, and `0` still disables quarantining entirely. Sizing rule this encodes (must hold for correctness, now holds by default): ``` port quarantine >= hard-stop-after + reload propagation lag ``` Signed-off-by: harshit Co-authored-by: Seiji Eicher <58963096+eicherseiji@users.noreply.github.com> --- python/ray/serve/_private/constants.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 4dec001c8747..07eab9475b8a 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -936,8 +936,15 @@ # Hold released replica ports out of the pool this long so proxies can # drop their stale slot before a new replica grabs the same port. 0 disables. +# Defaults to hard-stop-after plus a margin: soft-stopped (reloaded-out) +# HAProxy workers run no health checks and keep routing to their frozen +# server list until hard-stop-after fires, so a freed port must stay out +# of the pool at least that long or another app's replica can inherit the +# old app's traffic. The margin covers the broadcast/reload lag before an +# old worker's hard-stop clock starts. RAY_SERVE_PORT_QUARANTINE_S = get_env_float_non_negative( - "RAY_SERVE_PORT_QUARANTINE_S", 10.0 + "RAY_SERVE_PORT_QUARANTINE_S", + float(RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S + 30), ) # The minimum drain period for a HTTP proxy. # If RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS is set to 1,