Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .buildkite/test.rules.test.txt
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ release/release_tests.yaml: tools
ci/lint/lint.sh: tools
ci/ray_ci/tester.py: tools
ci/ci.sh: tools
# Windows CI driver scripts trigger the Windows test steps, not just tools.
ci/ray_ci/windows/build_base.sh: windows tools
ci/ray_ci/windows/install_tools.sh: windows tools

# === Build Scripts ===

Expand Down
11 changes: 10 additions & 1 deletion .buildkite/test.rules.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
! python cpp core_cpp java workflow cgraphs_direct_transport dashboard
! ray_client runtime_env_container
! data dask serve ml tune train train_gpu llm rllib rllib_gpu rllib_directly
! linux_wheels macos_wheels docker doc python_dependencies min_build tools
! linux_wheels macos_wheels docker doc python_dependencies min_build tools windows
! release_tests spark_on_ray

# NOTE: dstrodtman is leading effort to rework content in RST files, and wanted to shift
Expand Down Expand Up @@ -250,6 +250,15 @@ ci/ray_ci/macos/macos_ci_build.sh
@ macos_wheels
;

# Windows CI driver scripts (base image build, tool install, cleanup) feed the
# windowsbuild image that every Windows test consumes. Emit 'windows' so the
# Windows test steps (tagged 'windows') run, not just the image rebuild. Must
# precede the general ci/ray_ci/ rule below, which would otherwise swallow
# these into 'tools' only.
ci/ray_ci/windows/
@ windows tools
;

ci/pipeline/
ci/build/
ci/ray_ci/
Expand Down
1 change: 1 addition & 0 deletions .buildkite/windows.rayci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ steps:
if: build.env("BUILDKITE_PIPELINE_ID") == "0189942e-0876-4b8f-80a4-617f988ec59b" || build.env("BUILDKITE_PIPELINE_ID") == "018f4f1e-1b73-4906-9802-92422e3badaa"
tags:
- core_cpp
- windows
job_env: WINDOWS
instance_type: windows
commands:
Expand Down
4 changes: 2 additions & 2 deletions ci/ray_ci/windows/build_base.sh
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ conda init
# Conda 26.3.1 crashes on Windows when it tries to clean up a locked exe during
# a build-variant swap. Preventing self-update avoids that code path.
conda config --set auto_update_conda false
conda install -q -y python="${PYTHON_FULL_VERSION}" requests=2.32.3
conda install -q -y python="${PYTHON_FULL_VERSION}" requests=2.32.3 pyopenssl=23.2.0
# Force CA trust stack to the newest versions available at build time.
conda update -c conda-forge -q -y ca-certificates certifi
conda update --freeze-installed -c conda-forge -q -y ca-certificates certifi

# Install torch first, as some dependencies (e.g. torch-spline-conv) need torch to be
# installed for their own install.
Expand Down
34 changes: 30 additions & 4 deletions python/ray/_private/object_ref_generator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from typing import TYPE_CHECKING, Deque, Iterator, Optional

import ray
from ray.exceptions import ObjectRefStreamEndOfStreamError
from ray.exceptions import GetTimeoutError, ObjectRefStreamEndOfStreamError
from ray.util.annotations import DeveloperAPI, PublicAPI

if TYPE_CHECKING:
Expand Down Expand Up @@ -227,7 +227,21 @@ def _next_sync(self, timeout_s: Optional[int | float] = None) -> "ray.ObjectRef"
# The generator ref contains an exception
# if there's any failure. It contains nothing otherwise.
# In that case, it should raise StopIteration.
ray.get(self._generator_ref)
#
# Bound this get by the caller's timeout: the return object
# can be remote — or lost to a failed node and pending
# reconstruction — and an unbounded get would block the
# caller until it is restored (e.g. the Ray Data scheduling
# thread; a saturated cluster can then deadlock, since the
# blocked consumer is what releases backpressured CPUs).
# Per this method's contract, a timeout is reported as "no
# object ready yet" (nil ref) so the caller retries.
ray.get(
self._generator_ref,
timeout=(None if timeout_s is None or timeout_s < 0 else timeout_s),
)
except GetTimeoutError:
return ray.ObjectRef.nil()
except Exception:
self._generator_task_raised = True
return self._generator_ref
Expand Down Expand Up @@ -270,8 +284,20 @@ async def _next_async(self, timeout_s: Optional[int | float] = None):
try:
# The generator ref contains an exception
# if there's any failure. It contains nothing otherwise.
# In that case, it should raise StopSyncIteration.
await self._generator_ref
# In that case, it should raise StopAsyncIteration.
#
# Bound this await by the caller's timeout, mirroring
# _next_sync: the return object can be remote — or lost to a
# failed node and pending reconstruction — and an unbounded
# await would block the caller until it is restored. Per this
# method's contract, a timeout is reported as "no object
# ready yet" (nil ref) so the caller retries.
if timeout_s is None or timeout_s < 0:
await self._generator_ref
else:
await asyncio.wait_for(self._generator_ref, timeout=timeout_s)
except asyncio.TimeoutError:
return ray.ObjectRef.nil()
except Exception:
self._generator_task_raised = True
return self._generator_ref
Expand Down
114 changes: 110 additions & 4 deletions python/ray/data/_internal/planner/_obstore_download.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import inspect
import logging
import os
import re
import threading
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional
from typing import TYPE_CHECKING, Any, Dict, Iterator, List, Optional, Tuple
from urllib.parse import urlparse

if TYPE_CHECKING:
import s3fs # noqa: F401
Expand Down Expand Up @@ -98,6 +100,45 @@ def _log_fallback_warning() -> None:
)


# Path-style AWS S3 host: ``s3.amazonaws.com`` (legacy global) or
# ``s3.<region>.amazonaws.com`` (regional). Virtual-host hosts like
# ``<bucket>.s3.<region>.amazonaws.com`` already carry the bucket in the
# netloc and route correctly through obstore without rewriting.
_AWS_PATH_STYLE_HOST_RE = re.compile(
    r"^s3(?:\.[a-z0-9-]+)?\.amazonaws\.com$", re.IGNORECASE
)

# Well-known port for each scheme we rewrite. An explicit default port
# addresses the same endpoint as no port at all.
_AWS_PATH_STYLE_DEFAULT_PORTS = {"http": 80, "https": 443}


def _is_aws_path_style_endpoint(parsed) -> bool:
    """Return True if *parsed* targets a path-style AWS S3 endpoint.

    *parsed* is the result of :func:`urllib.parse.urlparse`. The host is
    matched on ``hostname`` rather than the raw ``netloc`` so that an
    explicit default port (``:443`` for https, ``:80`` for http) does not
    hide an otherwise path-style URL. URLs carrying userinfo, a non-default
    port, or a malformed port are not treated as AWS path-style endpoints.
    """
    default_port = _AWS_PATH_STYLE_DEFAULT_PORTS.get(parsed.scheme)
    if default_port is None:
        return False
    # Userinfo never matched the original netloc-based check; keep passing
    # such URLs through untouched instead of silently dropping credentials.
    if "@" in parsed.netloc:
        return False
    try:
        port = parsed.port
    except ValueError:
        # Non-numeric or out-of-range port: leave the URI to _split_uri.
        return False
    if port is not None and port != default_port:
        return False
    host = parsed.hostname
    return bool(host) and _AWS_PATH_STYLE_HOST_RE.match(host) is not None


def _split_obstore_uri(uri: str) -> Tuple[str, str]:
    """Split a URI into ``(store_url, path)`` for ``obstore.store.from_url``.

    Wraps :func:`_split_uri` to rewrite AWS path-style HTTP(S) URLs of the
    form ``https://s3.<region>.amazonaws.com/<bucket>/<key>`` into
    ``store_url='s3://<bucket>'`` / ``path='<key>'``. Without this, obstore
    constructs an ``S3Store`` keyed by the regional host endpoint, which
    pins region from the URL and bails with ``BareRedirect`` whenever the
    bucket actually lives in a different region (AWS's PermanentRedirect
    omits the ``Location`` header, so generic redirect-following fails).
    Rewriting to ``s3://<bucket>`` routes the request through the
    ``s3://`` branch of :meth:`StoreRegistry.get`, which looks up the
    bucket's real region and passes it to the store explicitly.

    Pre-signed URLs (any URI with a query string) are passed through
    unchanged because their signature is bound to host + path + query, so
    rewriting would invalidate the signature.

    Args:
        uri: The object URI to split.

    Returns:
        A ``(store_url, path)`` tuple. For path-style AWS URLs this is
        ``("s3://<bucket>", "<key>")`` (the key may be empty); for every
        other URI it is whatever :func:`_split_uri` returns.
    """
    # allow_fragments=False keeps a literal '#' as part of the object key.
    parsed = urlparse(uri, allow_fragments=False)
    if not parsed.query and _is_aws_path_style_endpoint(parsed):
        raw_path = parsed.path[1:] if parsed.path.startswith("/") else parsed.path
        bucket, _, key = raw_path.partition("/")
        if bucket:
            return f"s3://{bucket}", key
    return _split_uri(uri)


def _is_obstore_supported_url(path: str) -> bool:
"""Check if *path* is a URL that obstore can handle.

Expand Down Expand Up @@ -484,6 +525,54 @@ def _plan_obstore_routing(
return True, fs_kwargs


# Process-wide memo of AWS bucket name -> region. A ``None`` value records a
# failed lookup. Entries are never evicted because a bucket's region is fixed.
_BUCKET_REGION_CACHE: Dict[str, Optional[str]] = {}
_BUCKET_REGION_CACHE_LOCK = threading.Lock()


def _discover_aws_bucket_region(bucket: str) -> Optional[str]:
    """Return the AWS region of *bucket*, memoized for the process lifetime.

    The lookup is delegated to :func:`pyarrow.fs.resolve_s3_region`. Both
    successful and failed lookups are stored in ``_BUCKET_REGION_CACHE``, so
    a bucket that cannot be resolved (network error, non-AWS endpoint, or a
    PyArrow build without S3 support) is not probed again by later calls.

    Args:
        bucket: Bucket name, without scheme or key.

    Returns:
        The region name, or ``None`` when it could not be determined. In
        that case callers are expected to fall back to obstore's default
        region handling.
    """
    with _BUCKET_REGION_CACHE_LOCK:
        try:
            # Hit, including a remembered failure stored as ``None``.
            return _BUCKET_REGION_CACHE[bucket]
        except KeyError:
            pass

    # The probe runs without the lock held so that concurrent first-time
    # lookups are not serialized behind a network round-trip.
    try:
        probed: Optional[str] = pyarrow.fs.resolve_s3_region(bucket)
    except Exception as exc:
        # Also covers AttributeError on PyArrow builds compiled without S3
        # support, where ``resolve_s3_region`` is absent.
        logger.debug("Failed to discover region for bucket %r: %s", bucket, exc)
        probed = None

    with _BUCKET_REGION_CACHE_LOCK:
        # A concurrent caller may have cached a result while we were probing.
        # A real region always wins: a failed probe must never replace a
        # region that is already cached, otherwise later
        # ``StoreRegistry.get`` calls would skip region injection.
        winner = _BUCKET_REGION_CACHE.get(bucket)
        if winner is None:
            _BUCKET_REGION_CACHE[bucket] = probed
            winner = probed
    return winner


class StoreRegistry:
"""Cache of store_url -> ObjectStore instances.

Expand All @@ -506,6 +595,23 @@ def __init__(
def get(self, store_url: str) -> Any:
if store_url not in self._cache:
kwargs = dict(self._filesystem_kwargs)
# obstore's S3Store defaults ``region`` to ``us-east-1`` and does
# not auto-discover the bucket's real region, so cross-region
# buckets fail with ``BareRedirect`` (AWS's PermanentRedirect
# omits the ``Location`` header, defeating generic redirect
# following). Probe ``x-amz-bucket-region`` once per bucket and
# supply the discovered region explicitly. Skip when the caller
# already provided a region or a custom endpoint (MinIO/R2/etc.).
if (
store_url.startswith(("s3://", "s3a://"))
and "region" not in kwargs
and "endpoint" not in kwargs
):
bucket = store_url.split("://", 1)[1].split("/", 1)[0]
if bucket:
region = _discover_aws_bucket_region(bucket)
if region:
kwargs["region"] = region
if store_url.startswith("http://"):
# obstore's reqwest client rejects http:// by default. Auto-enable it
# to maintain parity with PyArrow (which accepts http:// via fsspec),
Expand Down Expand Up @@ -800,7 +906,7 @@ async def _resolve_size(
import obstore as obs

try:
store_url, path = _split_uri(uri)
store_url, path = _split_obstore_uri(uri)
store = registry.get(store_url)
async with semaphore:
meta = await obs.head_async(store, path)
Expand Down Expand Up @@ -851,7 +957,7 @@ async def _fetch_ranged(
pipeline never loses a file due to a transient range error.
"""
try:
store_url, path = _split_uri(uri)
store_url, path = _split_obstore_uri(uri)
store = registry.get(store_url)
result = bytearray(size)

Expand Down Expand Up @@ -914,6 +1020,6 @@ async def _fetch(uri: str, registry: StoreRegistry) -> bytes:
"""Download a single URI as a whole-file GET and return raw bytes."""
import obstore as obs

store_url, path = _split_uri(uri)
store_url, path = _split_obstore_uri(uri)
result = await obs.get_async(registry.get(store_url), path)
return bytes(await result.bytes_async())
5 changes: 3 additions & 2 deletions python/ray/data/_internal/planner/download_partition_actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
StoreRegistry,
_extract_credentials_from_filesystem,
_is_obstore_supported_url,
_split_obstore_uri,
)
from ray.data._internal.util import RetryingPyFileSystem, _arrow_batcher
from ray.data.block import BlockAccessor
from ray.data.context import DataContext
from ray.data.datasource.path_util import _resolve_paths_and_filesystem, _split_uri
from ray.data.datasource.path_util import _resolve_paths_and_filesystem

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -268,7 +269,7 @@ def _sample_sizes(self, uris: List[str]) -> List[int]:

async def _head_one(uri: str) -> int:
try:
store_url, path = _split_uri(uri)
store_url, path = _split_obstore_uri(uri)
store = self._registry.get(store_url)
async with sem:
meta = await obs.head_async(store, path)
Expand Down
Loading
Loading