diff --git a/.buildkite/test.rules.test.txt b/.buildkite/test.rules.test.txt index a7fffb20236a..e5b041f45b3d 100644 --- a/.buildkite/test.rules.test.txt +++ b/.buildkite/test.rules.test.txt @@ -101,6 +101,9 @@ release/release_tests.yaml: tools ci/lint/lint.sh: tools ci/ray_ci/tester.py: tools ci/ci.sh: tools +# Windows CI driver scripts trigger the Windows test steps, not just tools. +ci/ray_ci/windows/build_base.sh: windows tools +ci/ray_ci/windows/install_tools.sh: windows tools # === Build Scripts === diff --git a/.buildkite/test.rules.txt b/.buildkite/test.rules.txt index 0028b24414b3..58bce054e633 100644 --- a/.buildkite/test.rules.txt +++ b/.buildkite/test.rules.txt @@ -17,7 +17,7 @@ ! python cpp core_cpp java workflow cgraphs_direct_transport dashboard ! ray_client runtime_env_container ! data dask serve ml tune train train_gpu llm rllib rllib_gpu rllib_directly -! linux_wheels macos_wheels docker doc python_dependencies min_build tools +! linux_wheels macos_wheels docker doc python_dependencies min_build tools windows ! release_tests spark_on_ray # NOTE: dstrodtman is leading effort to rework content in RST files, and wanted to shift @@ -250,6 +250,15 @@ ci/ray_ci/macos/macos_ci_build.sh @ macos_wheels ; +# Windows CI driver scripts (base image build, tool install, cleanup) feed the +# windowsbuild image that every Windows test consumes. Emit 'windows' so the +# Windows test steps (tagged 'windows') run, not just the image rebuild. Must +# precede the general ci/ray_ci/ rule below, which would otherwise swallow +# these into 'tools' only. 
+ci/ray_ci/windows/ +@ windows tools +; + ci/pipeline/ ci/build/ ci/ray_ci/ diff --git a/.buildkite/windows.rayci.yml b/.buildkite/windows.rayci.yml index d6621a220db3..572fccba7dda 100644 --- a/.buildkite/windows.rayci.yml +++ b/.buildkite/windows.rayci.yml @@ -9,6 +9,7 @@ steps: if: build.env("BUILDKITE_PIPELINE_ID") == "0189942e-0876-4b8f-80a4-617f988ec59b" || build.env("BUILDKITE_PIPELINE_ID") == "018f4f1e-1b73-4906-9802-92422e3badaa" tags: - core_cpp + - windows job_env: WINDOWS instance_type: windows commands: diff --git a/ci/ray_ci/windows/build_base.sh b/ci/ray_ci/windows/build_base.sh index 2398179d5c83..080cfc179dce 100644 --- a/ci/ray_ci/windows/build_base.sh +++ b/ci/ray_ci/windows/build_base.sh @@ -16,9 +16,9 @@ conda init # Conda 26.3.1 crashes on Windows when it tries to clean up a locked exe during # a build-variant swap. Preventing self-update avoids that code path. conda config --set auto_update_conda false -conda install -q -y python="${PYTHON_FULL_VERSION}" requests=2.32.3 +conda install -q -y python="${PYTHON_FULL_VERSION}" requests=2.32.3 pyopenssl=23.2.0 # Force CA trust stack to the newest versions available at build time. -conda update -c conda-forge -q -y ca-certificates certifi +conda update --freeze-installed -c conda-forge -q -y ca-certificates certifi # Install torch first, as some dependencies (e.g. torch-spline-conv) need torch to be # installed for their own install. 
diff --git a/python/ray/_private/object_ref_generator.py b/python/ray/_private/object_ref_generator.py index 2b60fb537a6f..d2cf4e715560 100644 --- a/python/ray/_private/object_ref_generator.py +++ b/python/ray/_private/object_ref_generator.py @@ -5,7 +5,7 @@ from typing import TYPE_CHECKING, Deque, Iterator, Optional import ray -from ray.exceptions import ObjectRefStreamEndOfStreamError +from ray.exceptions import GetTimeoutError, ObjectRefStreamEndOfStreamError from ray.util.annotations import DeveloperAPI, PublicAPI if TYPE_CHECKING: @@ -227,7 +227,21 @@ def _next_sync(self, timeout_s: Optional[int | float] = None) -> "ray.ObjectRef" # The generator ref contains an exception # if there's any failure. It contains nothing otherwise. # In that case, it should raise StopIteration. - ray.get(self._generator_ref) + # + # Bound this get by the caller's timeout: the return object + # can be remote — or lost to a failed node and pending + # reconstruction — and an unbounded get would block the + # caller until it is restored (e.g. the Ray Data scheduling + # thread; a saturated cluster can then deadlock, since the + # blocked consumer is what releases backpressured CPUs). + # Per this method's contract, a timeout is reported as "no + # object ready yet" (nil ref) so the caller retries. + ray.get( + self._generator_ref, + timeout=(None if timeout_s is None or timeout_s < 0 else timeout_s), + ) + except GetTimeoutError: + return ray.ObjectRef.nil() except Exception: self._generator_task_raised = True return self._generator_ref @@ -270,8 +284,20 @@ async def _next_async(self, timeout_s: Optional[int | float] = None): try: # The generator ref contains an exception # if there's any failure. It contains nothing otherwise. - # In that case, it should raise StopSyncIteration. - await self._generator_ref + # In that case, it should raise StopAsyncIteration. 
+ # + # Bound this await by the caller's timeout, mirroring + # _next_sync: the return object can be remote — or lost to a + # failed node and pending reconstruction — and an unbounded + # await would block the caller until it is restored. Per this + # method's contract, a timeout is reported as "no object + # ready yet" (nil ref) so the caller retries. + if timeout_s is None or timeout_s < 0: + await self._generator_ref + else: + await asyncio.wait_for(self._generator_ref, timeout=timeout_s) + except asyncio.TimeoutError: + return ray.ObjectRef.nil() except Exception: self._generator_task_raised = True return self._generator_ref diff --git a/python/ray/data/_internal/planner/_obstore_download.py b/python/ray/data/_internal/planner/_obstore_download.py index fe725aa534f8..2310df765e98 100644 --- a/python/ray/data/_internal/planner/_obstore_download.py +++ b/python/ray/data/_internal/planner/_obstore_download.py @@ -2,9 +2,11 @@ import inspect import logging import os +import re import threading from datetime import datetime, timedelta, timezone -from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional +from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple +from urllib.parse import urlparse if TYPE_CHECKING: import s3fs # noqa: F401 @@ -98,6 +100,45 @@ def _log_fallback_warning() -> None: ) +# Path-style AWS S3 host: ``s3.amazonaws.com`` (legacy global) or +# ``s3.<region>.amazonaws.com`` (regional). Virtual-host hosts like +# ``<bucket>.s3.<region>.amazonaws.com`` already carry the bucket in the +# netloc and route correctly through obstore without rewriting. +_AWS_PATH_STYLE_HOST_RE = re.compile( + r"^s3(?:\.[a-z0-9-]+)?\.amazonaws\.com$", re.IGNORECASE +) + + +def _split_obstore_uri(uri: str) -> Tuple[str, str]: + """Split a URI into ``(store_url, path)`` for ``obstore.store.from_url``. + + Wraps :func:`_split_uri` to rewrite AWS path-style HTTPS URLs of the + form ``https://s3.<region>.amazonaws.com/<bucket>/<key>`` into + ``store_url='s3://<bucket>'`` / ``path='<key>'``. 
Without this, obstore + constructs an ``S3Store`` keyed by the regional host endpoint, which + pins region from the URL and bails with ``BareRedirect`` whenever the + bucket actually lives in a different region (AWS's PermanentRedirect + omits the ``Location`` header, so generic redirect-following fails). + Rewriting to ``s3://<bucket>`` lets ``S3Store`` discover the bucket's + real region once and cache it for the store's lifetime. + + Pre-signed HTTPS URLs (any URI with a query string) are passed through + unchanged because their signature is bound to host + path + query, so + rewriting would invalidate the signature. + """ + parsed = urlparse(uri, allow_fragments=False) + if ( + parsed.scheme in ("http", "https") + and not parsed.query + and _AWS_PATH_STYLE_HOST_RE.match(parsed.netloc) + ): + raw_path = parsed.path[1:] if parsed.path.startswith("/") else parsed.path + bucket, _, key = raw_path.partition("/") + if bucket: + return f"s3://{bucket}", key + return _split_uri(uri) + + def _is_obstore_supported_url(path: str) -> bool: """Check if *path* is a URL that obstore can handle. @@ -484,6 +525,54 @@ def _plan_obstore_routing( return True, fs_kwargs +# Per-process cache: AWS bucket name -> region (``None`` if discovery failed). +# Buckets do not change region, so caching for the process lifetime is safe. +_BUCKET_REGION_CACHE: Dict[str, Optional[str]] = {} +_BUCKET_REGION_CACHE_LOCK = threading.Lock() + + +def _discover_aws_bucket_region(bucket: str) -> Optional[str]: + """Discover an AWS S3 bucket's region, cached per-process. + + Delegates to :func:`pyarrow.fs.resolve_s3_region`, which issues a HEAD + probe and reads ``x-amz-bucket-region`` (AWS returns it for both 200 OK + and 301 PermanentRedirect responses). 
PyArrow is already a required Ray + Data dependency and caches region lookups in its C++ S3 layer; the extra + per-process dict here also caches negative results, so a bucket that can't + be resolved (network error, non-AWS endpoint, or a PyArrow build without + S3 support) is probed at most once. + + Returns ``None`` when the region cannot be determined. Callers fall back + to obstore's default region handling, which preserves prior behavior for + non-AWS S3-compatible backends (MinIO, R2, etc.). + """ + with _BUCKET_REGION_CACHE_LOCK: + if bucket in _BUCKET_REGION_CACHE: + return _BUCKET_REGION_CACHE[bucket] + + # Probe outside the lock so concurrent first-time lookups don't serialize + # on a network round-trip. + region: Optional[str] = None + try: + region = pyarrow.fs.resolve_s3_region(bucket) + except Exception as e: + # Includes AttributeError on PyArrow builds compiled without S3 + # support, where ``resolve_s3_region`` is absent. + logger.debug("Failed to discover region for bucket %r: %s", bucket, e) + + with _BUCKET_REGION_CACHE_LOCK: + # Another thread may have resolved the same bucket while we probed. + # A real region must win over a ``None`` result: never let a failed + # probe overwrite a region a concurrent thread already cached, or + # later ``StoreRegistry.get`` calls would skip region injection and + # cross-region downloads could fail intermittently. + cached = _BUCKET_REGION_CACHE.get(bucket) + if cached is not None: + return cached + _BUCKET_REGION_CACHE[bucket] = region + return region + + class StoreRegistry: """Cache of store_url -> ObjectStore instances. 
@@ -506,6 +595,23 @@ def __init__( def get(self, store_url: str) -> Any: if store_url not in self._cache: kwargs = dict(self._filesystem_kwargs) + # obstore's S3Store defaults ``region`` to ``us-east-1`` and does + # not auto-discover the bucket's real region, so cross-region + # buckets fail with ``BareRedirect`` (AWS's PermanentRedirect + # omits the ``Location`` header, defeating generic redirect + # following). Probe ``x-amz-bucket-region`` once per bucket and + # supply the discovered region explicitly. Skip when the caller + # already provided a region or a custom endpoint (MinIO/R2/etc.). + if ( + store_url.startswith(("s3://", "s3a://")) + and "region" not in kwargs + and "endpoint" not in kwargs + ): + bucket = store_url.split("://", 1)[1].split("/", 1)[0] + if bucket: + region = _discover_aws_bucket_region(bucket) + if region: + kwargs["region"] = region if store_url.startswith("http://"): # obstore's reqwest client rejects http:// by default. Auto-enable it # to maintain parity with PyArrow (which accepts http:// via fsspec), @@ -800,7 +906,7 @@ async def _resolve_size( import obstore as obs try: - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) store = registry.get(store_url) async with semaphore: meta = await obs.head_async(store, path) @@ -851,7 +957,7 @@ async def _fetch_ranged( pipeline never loses a file due to a transient range error. 
""" try: - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) store = registry.get(store_url) result = bytearray(size) @@ -914,6 +1020,6 @@ async def _fetch(uri: str, registry: StoreRegistry) -> bytes: """Download a single URI as a whole-file GET and return raw bytes.""" import obstore as obs - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) result = await obs.get_async(registry.get(store_url), path) return bytes(await result.bytes_async()) diff --git a/python/ray/data/_internal/planner/download_partition_actor.py b/python/ray/data/_internal/planner/download_partition_actor.py index 2f0dc0ead34a..282d4758f81c 100644 --- a/python/ray/data/_internal/planner/download_partition_actor.py +++ b/python/ray/data/_internal/planner/download_partition_actor.py @@ -14,11 +14,12 @@ StoreRegistry, _extract_credentials_from_filesystem, _is_obstore_supported_url, + _split_obstore_uri, ) from ray.data._internal.util import RetryingPyFileSystem, _arrow_batcher from ray.data.block import BlockAccessor from ray.data.context import DataContext -from ray.data.datasource.path_util import _resolve_paths_and_filesystem, _split_uri +from ray.data.datasource.path_util import _resolve_paths_and_filesystem logger = logging.getLogger(__name__) @@ -268,7 +269,7 @@ def _sample_sizes(self, uris: List[str]) -> List[int]: async def _head_one(uri: str) -> int: try: - store_url, path = _split_uri(uri) + store_url, path = _split_obstore_uri(uri) store = self._registry.get(store_url) async with sem: meta = await obs.head_async(store, path) diff --git a/python/ray/data/tests/unit/test_obstore_download.py b/python/ray/data/tests/unit/test_obstore_download.py index 9173a5444fb1..01e70e76c094 100644 --- a/python/ray/data/tests/unit/test_obstore_download.py +++ b/python/ray/data/tests/unit/test_obstore_download.py @@ -10,13 +10,18 @@ import pytest from ray.data._internal.planner._obstore_download import ( + _BUCKET_REGION_CACHE, 
_FILE_SIZE_COLUMN_PREFIX, + StoreRegistry, + _discover_aws_bucket_region, _download_uris_with_obstore, _extract_credentials_from_filesystem, _is_obstore_supported_url, _obstore_filesystem_requires_threaded_download, _plan_obstore_routing, + _resolve_size, _S3FSSessionCredentialProvider, + _split_obstore_uri, download_bytes_async, ) from ray.data._internal.planner.plan_download_op import ( @@ -58,6 +63,255 @@ def test_split_uri(uri, expected_store_url, expected_path): assert path == expected_path +# TestSplitObstoreUri — AWS path-style HTTPS URLs must be rewritten to s3:// +# so obstore's S3Store keys off the bucket (with region discovery) instead of +# off the regional host endpoint (which pins region and fails with +# BareRedirect on cross-region buckets). +@pytest.mark.parametrize( + "uri, expected_store_url, expected_path", + [ + # Path-style regional: must rewrite to s3://. + ( + "https://s3.us-east-1.amazonaws.com/ray-example-data/imagenet/foo.jpg", + "s3://ray-example-data", + "imagenet/foo.jpg", + ), + # Path-style legacy global endpoint: also rewrite. + ( + "https://s3.amazonaws.com/my-bucket/key.bin", + "s3://my-bucket", + "key.bin", + ), + # Virtual-host style: bucket already in netloc, leave alone. + ( + "https://my-bucket.s3.us-west-2.amazonaws.com/key.bin", + "https://my-bucket.s3.us-west-2.amazonaws.com", + "key.bin", + ), + # Native s3:// URIs: no change. + ( + "s3://my-bucket/prefix/key.jpg", + "s3://my-bucket", + "prefix/key.jpg", + ), + # Pre-signed URLs (any query string): signature is bound to host + + # path + query, so rewriting would invalidate it. Pass through. + ( + "https://s3.us-east-1.amazonaws.com/bucket/key?X-Amz-Signature=abc", + "https://s3.us-east-1.amazonaws.com", + "bucket/key?X-Amz-Signature=abc", + ), + # Non-AWS HTTPS (MinIO, R2 custom domain, generic file server): + # leave alone — only AWS S3 needs the region-discovery rewrite. 
+ ( + "https://files.example.com/bucket/key.bin", + "https://files.example.com", + "bucket/key.bin", + ), + # Other clouds: untouched. + ("gs://bucket/path", "gs://bucket", "path"), + # Path-style with nested key. + ( + "https://s3.eu-central-1.amazonaws.com/bucket/a/b/c.parquet", + "s3://bucket", + "a/b/c.parquet", + ), + # Path-style with only a bucket (no key): bucket is preserved, key + # is empty. Edge case; downstream obstore call would fail but that's + # the caller's problem. + ( + "https://s3.amazonaws.com/just-a-bucket", + "s3://just-a-bucket", + "", + ), + ], +) +def test_split_obstore_uri(uri, expected_store_url, expected_path): + store_url, path = _split_obstore_uri(uri) + assert store_url == expected_store_url + assert path == expected_path + + +# TestBucketRegionDiscovery — exercise the region-discovery path used by +# StoreRegistry to find the real region of cross-region S3 buckets. Region +# resolution is delegated to ``pyarrow.fs.resolve_s3_region``; these tests +# patch it to avoid real network calls. +class TestBucketRegionDiscovery: + @staticmethod + def _clear_cache(): + _BUCKET_REGION_CACHE.clear() + + def test_resolved_region_is_returned(self): + # Whether the bucket is same-region (200 OK) or cross-region (301 + # PermanentRedirect), PyArrow reads x-amz-bucket-region and returns it. + self._clear_cache() + with patch("pyarrow.fs.resolve_s3_region", return_value="us-west-2"): + assert _discover_aws_bucket_region("cross-region-bucket") == "us-west-2" + + def test_cache_hit_avoids_second_probe(self): + # First call probes; second call must not invoke resolve_s3_region. 
+ self._clear_cache() + with patch("pyarrow.fs.resolve_s3_region", return_value="eu-west-1") as resolve: + _discover_aws_bucket_region("euro-bucket") + _discover_aws_bucket_region("euro-bucket") + assert resolve.call_count == 1 + + def test_failure_returns_none_and_caches(self): + # On resolution failure (network error, non-AWS endpoint) return None + # so StoreRegistry leaves obstore's defaults intact for MinIO/R2/etc. + # Cache the negative result so we don't keep probing. + self._clear_cache() + with patch( + "pyarrow.fs.resolve_s3_region", side_effect=OSError("nope") + ) as resolve: + assert _discover_aws_bucket_region("unreachable") is None + assert _discover_aws_bucket_region("unreachable") is None + assert resolve.call_count == 1 + + def test_missing_resolve_s3_region_returns_none(self): + # PyArrow builds compiled without S3 support lack resolve_s3_region, + # so the attribute access raises AttributeError; discovery must degrade + # to None rather than propagate it. + self._clear_cache() + with patch("pyarrow.fs.resolve_s3_region", side_effect=AttributeError("no S3")): + assert _discover_aws_bucket_region("no-s3-support") is None + + def test_none_result_does_not_overwrite_cached_region(self): + # Race: this thread probes (outside the lock) and gets None, while a + # concurrent thread resolves and caches the real region. The failed + # probe must not clobber the cached region, otherwise later + # StoreRegistry.get calls skip region injection and cross-region + # downloads fail intermittently. + self._clear_cache() + + def resolve_then_simulate_concurrent_win(bucket): + # Stand in for another thread caching the real region mid-probe. 
+ _BUCKET_REGION_CACHE[bucket] = "us-west-2" + return None + + with patch( + "pyarrow.fs.resolve_s3_region", + side_effect=resolve_then_simulate_concurrent_win, + ): + assert _discover_aws_bucket_region("racy-bucket") == "us-west-2" + assert _BUCKET_REGION_CACHE["racy-bucket"] == "us-west-2" + + +# TestStoreRegistryRegionInjection — verify region discovery happens at +# store-construction time for s3:// URLs without an explicit region, and is +# correctly skipped when the caller has already configured one. +class TestStoreRegistryRegionInjection: + @staticmethod + def _make_registry_capturing_kwargs(): + _BUCKET_REGION_CACHE.clear() + captured = {} + + def fake_from_url(url, **kwargs): + captured["store_url"] = url + captured["kwargs"] = kwargs + return MagicMock(name="fake-store") + + reg = StoreRegistry() + reg._from_url = fake_from_url + return reg, captured + + def test_s3_url_injects_discovered_region(self): + reg, captured = self._make_registry_capturing_kwargs() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region", + return_value="us-west-2", + ): + reg.get("s3://ray-example-data") + assert captured["kwargs"].get("region") == "us-west-2" + + def test_s3a_url_also_injects_region(self): + reg, captured = self._make_registry_capturing_kwargs() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region", + return_value="us-west-2", + ): + reg.get("s3a://some-bucket") + assert captured["kwargs"].get("region") == "us-west-2" + + def test_explicit_region_not_overridden(self): + # Caller (e.g., creds extracted from PyArrow S3FileSystem) already + # supplied region; do not probe and do not override. 
+ reg = StoreRegistry(region="eu-central-1") + captured = {} + + def fake_from_url(url, **kwargs): + captured["kwargs"] = kwargs + return MagicMock() + + reg._from_url = fake_from_url + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region" + ) as probe: + reg.get("s3://bucket") + probe.assert_not_called() + assert captured["kwargs"]["region"] == "eu-central-1" + + def test_custom_endpoint_skips_probe(self): + # MinIO / R2 / localstack: caller configured a custom endpoint, so + # the AWS region probe would be both wrong and wasteful. + reg = StoreRegistry(endpoint="http://localhost:9000") + reg._from_url = lambda *a, **k: MagicMock() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region" + ) as probe: + reg.get("s3://bucket") + probe.assert_not_called() + + def test_https_url_does_not_probe(self): + # Virtual-hosted-style or presigned HTTPS URLs reach get() unchanged + # (after _split_obstore_uri). obstore parses the region from the + # netloc directly, so probing would be redundant. + reg, captured = self._make_registry_capturing_kwargs() + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region" + ) as probe: + reg.get("https://bucket.s3.us-east-1.amazonaws.com") + probe.assert_not_called() + + +# TestResolveSizeRewrite — the HEAD size probe must apply the same path-style +# -> s3:// rewrite as the download paths. Otherwise a cross-region path-style +# URL keeps the regional HTTPS store, hits BareRedirect, returns size 0, and +# wrongly skips ranged downloads even when a whole-file GET would succeed. 
+class TestResolveSizeRewrite: + def test_resolve_size_rewrites_path_style_uri(self): + _BUCKET_REGION_CACHE.clear() + captured = {} + + def fake_from_url(url, **kwargs): + captured["store_url"] = url + return MagicMock(name="store") + + registry = StoreRegistry() + registry._from_url = fake_from_url + + async def fake_head_async(store, path): + captured["path"] = path + return {"size": 4096} + + with patch( + "ray.data._internal.planner._obstore_download._discover_aws_bucket_region", + return_value="us-west-2", + ), patch("obstore.head_async", side_effect=fake_head_async): + size = asyncio.run( + _resolve_size( + "https://s3.us-east-1.amazonaws.com/cross-region-bucket/key.bin", + registry, + asyncio.Semaphore(), + ) + ) + + # Rewritten to s3:// so region discovery applies; HEAD succeeds. + assert size == 4096 + assert captured["store_url"] == "s3://cross-region-bucket" + assert captured["path"] == "key.bin" + + # TestDownloadHelpers class TestDownloadHelpers: """Unit tests for credential extraction and URL-scheme detection.""" diff --git a/python/ray/llm/_internal/serve/core/server/llm_server.py b/python/ray/llm/_internal/serve/core/server/llm_server.py index 05621d8fc8e4..ee7e49a84201 100644 --- a/python/ray/llm/_internal/serve/core/server/llm_server.py +++ b/python/ray/llm/_internal/serve/core/server/llm_server.py @@ -297,7 +297,17 @@ async def _maybe_add_request_id_to_request( "TranscriptionRequest", ], ): - """Add the request id to the request.""" + """Stamp the Serve request id, unless the caller set request_id explicitly. + + request_id defaults to a random uuid (never None), so use model_fields_set + to avoid clobbering an id a caller deliberately set (e.g. a P/D connector's + coordination id). Some request types (tokenize/detokenize) have no + request_id field at all -- skip those. 
+ """ + if not hasattr(request, "request_id"): + return + if "request_id" in request.model_fields_set: + return request_id = get_serve_request_id() if request_id: request.request_id = request_id diff --git a/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py b/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py index ef5d13514e96..345cab4da4ce 100644 --- a/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py +++ b/python/ray/llm/tests/serve/cpu/deployments/llm/test_llm_server.py @@ -14,6 +14,7 @@ LoraConfig, ModelLoadingConfig, ) +from ray.llm._internal.serve.core.configs.openai_api_models import CompletionRequest from ray.llm._internal.serve.core.protocol import RawRequestInfo from ray.llm._internal.serve.core.server.llm_server import LLMServer from ray.llm._internal.serve.engines.vllm.vllm_engine import ( @@ -731,5 +732,58 @@ def test_noop_when_request_id_unset(self): ) +class TestMaybeAddRequestId: + """``_maybe_add_request_id_to_request`` fills the Serve request id for a + defaulted request_id but never clobbers one the caller set explicitly.""" + + def _set_ctx(self, request_id): + serve.context._serve_request_context.set( + serve.context._RequestContext(request_id=request_id) + ) + + @pytest.mark.asyncio + async def test_defaulted_request_id_is_overwritten_with_serve_id(self): + server = LLMServer.__new__(LLMServer) + req = CompletionRequest(model="m", prompt="hi") # request_id defaulted + assert "request_id" not in req.model_fields_set + self._set_ctx("serve-ctx-id") + try: + await server._maybe_add_request_id_to_request(req) + finally: + serve.context._serve_request_context.set(serve.context._RequestContext()) + assert req.request_id == "serve-ctx-id" + + @pytest.mark.asyncio + async def test_explicit_request_id_is_preserved(self): + server = LLMServer.__new__(LLMServer) + req = CompletionRequest(model="m", prompt="hi", request_id="caller-set-id") + assert "request_id" in req.model_fields_set + 
self._set_ctx("serve-ctx-id") + try: + await server._maybe_add_request_id_to_request(req) + finally: + serve.context._serve_request_context.set(serve.context._RequestContext()) + # Caller's id wins; the Serve context id does not clobber it. + assert req.request_id == "caller-set-id" + + @pytest.mark.asyncio + async def test_request_without_request_id_field_is_skipped(self): + """Request types without a request_id field (e.g. tokenize/detokenize) + must be handled gracefully, not raise.""" + from pydantic import BaseModel + + class _NoRequestId(BaseModel): + pass + + server = LLMServer.__new__(LLMServer) + req = _NoRequestId() + self._set_ctx("serve-ctx-id") + try: + await server._maybe_add_request_id_to_request(req) + finally: + serve.context._serve_request_context.set(serve.context._RequestContext()) + assert not hasattr(req, "request_id") + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/serve/_private/constants.py b/python/ray/serve/_private/constants.py index 4dec001c8747..07eab9475b8a 100644 --- a/python/ray/serve/_private/constants.py +++ b/python/ray/serve/_private/constants.py @@ -936,8 +936,15 @@ # Hold released replica ports out of the pool this long so proxies can # drop their stale slot before a new replica grabs the same port. 0 disables. +# Defaults to hard-stop-after plus a margin: soft-stopped (reloaded-out) +# HAProxy workers run no health checks and keep routing to their frozen +# server list until hard-stop-after fires, so a freed port must stay out +# of the pool at least that long or another app's replica can inherit the +# old app's traffic. The margin covers the broadcast/reload lag before an +# old worker's hard-stop clock starts. RAY_SERVE_PORT_QUARANTINE_S = get_env_float_non_negative( - "RAY_SERVE_PORT_QUARANTINE_S", 10.0 + "RAY_SERVE_PORT_QUARANTINE_S", + float(RAY_SERVE_HAPROXY_HARD_STOP_AFTER_S + 30), ) # The minimum drain period for a HTTP proxy. 
# If RAY_SERVE_FORCE_STOP_UNHEALTHY_REPLICAS is set to 1, diff --git a/python/ray/tests/test_streaming_generator.py b/python/ray/tests/test_streaming_generator.py index 39006025eb44..666efe6b7a40 100644 --- a/python/ray/tests/test_streaming_generator.py +++ b/python/ray/tests/test_streaming_generator.py @@ -575,5 +575,65 @@ async def async_f(self): ray.get(next(g)) +def test_next_sync_timeout_when_generator_ref_unavailable( + monkeypatch, ray_start_cluster +): + """_next_sync(timeout_s) must not block in its end-of-stream handling. + + After all yielded refs are consumed, ``_next_sync`` calls + ``ray.get(generator_ref)`` to distinguish a normal end of the stream + from a task failure. If that object is unavailable — e.g. it lived in + the plasma store of a node that died — the get must be bounded by the + caller's timeout (reporting "not ready yet" with a nil ref) instead of + blocking the caller until the object is reconstructed. A blocked caller + can deadlock: reconstruction needs a CPU, and the blocked caller may be + what releases one (see ray-project/ray#63701). + """ + # Force the generator task's return object into plasma (instead of + # being inlined with the owner) so that it is lost when its node dies. + monkeypatch.setenv("RAY_max_direct_call_object_size", "0") + cluster = ray_start_cluster + cluster.add_node(num_cpus=0) + ray.init(address=cluster.address) + worker_node = cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() + + @ray.remote(num_returns="streaming") + def gen(): + yield 1 + + g = gen.remote() + # Consume the single yielded ref so that the stream is exhausted. + first = g._next_sync(timeout_s=30) + assert not first.is_nil() + + # Lose the generator's return object together with its node. + cluster.remove_node(worker_node) + + # End-of-stream handling must honor the timeout: a nil ref, not a block. 
+ start = time.monotonic() + assert g._next_sync(timeout_s=0).is_nil() + assert time.monotonic() - start < 10 + + # The async counterpart must honor the timeout the same way. + start = time.monotonic() + assert asyncio.run(g._next_async(timeout_s=0)).is_nil() + assert time.monotonic() - start < 10 + + # Once a node is back, lineage reconstruction restores the return object + # and the stream terminates normally. + cluster.add_node(num_cpus=1) + cluster.wait_for_nodes() + deadline = time.monotonic() + 60 + while time.monotonic() < deadline: + try: + ref = g._next_sync(timeout_s=1) + except StopIteration: + break + assert ref.is_nil() + else: + pytest.fail("Generator did not finish after the node was restored.") + + if __name__ == "__main__": sys.exit(pytest.main(["-sv", __file__]))