diff --git a/src/google/adk/artifacts/file_artifact_service.py b/src/google/adk/artifacts/file_artifact_service.py index 9c3870b6e3..af02aae5c0 100644 --- a/src/google/adk/artifacts/file_artifact_service.py +++ b/src/google/adk/artifacts/file_artifact_service.py @@ -85,6 +85,39 @@ def _to_posix_path(path_value: str) -> PurePosixPath: return PurePosixPath(path_value) +def _is_absolute_path(path_value: str) -> bool: + """Returns whether the value is absolute on POSIX or Windows.""" + return ( + PurePosixPath(path_value).is_absolute() + or PureWindowsPath(path_value).is_absolute() + ) + + +def _validate_scope_identifier(identifier_name: str, identifier: str) -> str: + """Validates user/session identifiers before using them in paths.""" + if "\x00" in identifier: + raise InputValidationError( + f"{identifier_name} must not contain null bytes." + ) + if _is_absolute_path(identifier): + raise InputValidationError( + f"{identifier_name} {identifier!r} must be a single path component." + ) + + pure_path = _to_posix_path(identifier) + if pure_path.is_absolute() or len(pure_path.parts) != 1: + raise InputValidationError( + f"{identifier_name} {identifier!r} must be a single path component." + ) + + component = pure_path.parts[0] + if component in {"", ".", ".."}: + raise InputValidationError( + f"{identifier_name} {identifier!r} is not a valid storage scope." + ) + return component + + def _resolve_scoped_artifact_path( scope_root: Path, filename: str ) -> tuple[Path, Path]: @@ -109,7 +142,7 @@ def _resolve_scoped_artifact_path( pure_path = _to_posix_path(stripped) scope_root_resolved = scope_root.resolve(strict=False) - if pure_path.is_absolute(): + if _is_absolute_path(stripped) or pure_path.is_absolute(): raise InputValidationError( f"Absolute artifact filename {filename!r} is not permitted; " "provide a path relative to the storage scope." @@ -138,31 +171,6 @@ def _is_user_scoped(session_id: Optional[str], filename: str) -> bool: return session_id is None or _file_has_user_namespace(filename) -def _validate_path_segment(value: str, field_name: str) -> None: - """Rejects values that could alter the constructed filesystem path. - - Args: - value: The caller-supplied identifier (e.g. user_id or session_id). - field_name: Human-readable name used in the error message. - - Raises: - InputValidationError: If the value contains path separators, traversal - segments, or null bytes. - """ - if not value: - raise InputValidationError(f"{field_name} must not be empty.") - if "\x00" in value: - raise InputValidationError(f"{field_name} must not contain null bytes.") - if "/" in value or "\\" in value: - raise InputValidationError( - f"{field_name} {value!r} must not contain path separators." - ) - if value in (".", "..") or ".." in value.split("/"): - raise InputValidationError( - f"{field_name} {value!r} must not contain traversal segments." - ) - - def _user_artifacts_dir(base_root: Path) -> Path: """Returns the path that stores user-scoped artifacts.""" return base_root / "artifacts" @@ -170,8 +178,12 @@ def _user_artifacts_dir(base_root: Path) -> Path: def _session_artifacts_dir(base_root: Path, session_id: str) -> Path: """Returns the path that stores session-scoped artifacts.""" - _validate_path_segment(session_id, "session_id") - return base_root / "sessions" / session_id / "artifacts" + return ( + base_root + / "sessions" + / _validate_scope_identifier("session_id", session_id) + / "artifacts" + ) def _versions_dir(artifact_dir: Path) -> Path: @@ -246,8 +258,9 @@ def __init__(self, root_dir: Path | str): def _base_root(self, user_id: str, /) -> Path: """Returns the artifacts root directory for a user.""" - _validate_path_segment(user_id, "user_id") - return self.root_dir / "users" / user_id + return ( + self.root_dir / "users" / _validate_scope_identifier("user_id", user_id) + ) def _scope_root( self, diff --git a/tests/unittests/artifacts/test_artifact_service.py b/tests/unittests/artifacts/test_artifact_service.py index 8b82397097..1795ad60b8 100644 --- a/tests/unittests/artifacts/test_artifact_service.py +++ b/tests/unittests/artifacts/test_artifact_service.py @@ -25,8 +25,8 @@ from typing import Union from unittest import mock from unittest.mock import patch -from urllib.parse import unquote from urllib.parse import urlparse +from urllib.request import url2pathname from google.adk.artifacts.base_artifact_service import ArtifactVersion from google.adk.artifacts.base_artifact_service import ensure_part @@ -43,6 +43,11 @@ FIXED_DATETIME = datetime(2025, 1, 1, 12, 0, 0) +def _path_from_file_uri(uri: str) -> Path: + """Converts a file URI to a local filesystem path for assertions.""" + return Path(url2pathname(urlparse(uri).path)) + + class ArtifactServiceType(Enum): FILE = "FILE" IN_MEMORY = "IN_MEMORY" @@ -642,8 +647,7 @@ async def test_file_metadata_camelcase(tmp_path, artifact_service_factory): "version": 0, "customMetadata": {}, } - parsed_canonical = urlparse(metadata["canonicalUri"]) - canonical_path = Path(unquote(parsed_canonical.path)) + canonical_path = _path_from_file_uri(metadata["canonicalUri"]) assert canonical_path.name == "report.txt" assert canonical_path.read_bytes() == b"binary-content" @@ -690,8 +694,7 @@ async def test_file_list_artifact_versions(tmp_path, artifact_service_factory): ).resolve() assert version_meta.canonical_uri == version_payload_path.as_uri() assert version_meta.custom_metadata == custom_metadata - parsed_version_uri = urlparse(version_meta.canonical_uri) - version_uri_path = Path(unquote(parsed_version_uri.path)) + version_uri_path = _path_from_file_uri(version_meta.canonical_uri) assert version_uri_path.read_bytes() == b"binary-content" fetched = await artifact_service.get_artifact_version( @@ -832,6 +835,66 @@ async def test_file_save_artifact_rejects_absolute_path_within_scope(tmp_path): ) +@pytest.mark.asyncio +@pytest.mark.parametrize( + "user_id", + [ + "../escape-user", + "..\\escape-user", + "nested/user", + "nested\\user", + "/absolute-user", + "C:\\absolute-user", + ".", + "..", + ], +) +async def test_file_save_artifact_rejects_invalid_user_scope_identifiers( + tmp_path, user_id +): + """FileArtifactService rejects user IDs that alter the scope path.""" + artifact_service = FileArtifactService(root_dir=tmp_path / "artifacts") + with pytest.raises(InputValidationError): + await artifact_service.save_artifact( + app_name="myapp", + user_id=user_id, + session_id="sess123", + filename="proof.txt", + artifact=types.Part(text="content"), + ) + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "session_id", + [ + "../escape-session", + "..\\escape-session", + "../../../escape-session", + "..\\..\\..\\escape-session", + "nested/session", + "nested\\session", + "/absolute-session", + "C:\\absolute-session", + ".", + "..", + ], +) +async def test_file_save_artifact_rejects_invalid_session_scope_identifiers( + tmp_path, session_id +): + """FileArtifactService rejects session IDs that alter the scope path.""" + artifact_service = FileArtifactService(root_dir=tmp_path / "artifacts") + with pytest.raises(InputValidationError): + await artifact_service.save_artifact( + app_name="myapp", + user_id="user123", + session_id=session_id, + filename="proof.txt", + artifact=types.Part(text="content"), + ) + + class TestEnsurePart: """Tests for the ensure_part normalization helper."""