diff --git a/contributing/samples/gepa/experiment.py b/contributing/samples/gepa/experiment.py index f3751206a8..2710c3894c 100644 --- a/contributing/samples/gepa/experiment.py +++ b/contributing/samples/gepa/experiment.py @@ -43,7 +43,6 @@ from tau_bench.types import EnvRunResult from tau_bench.types import RunConfig import tau_bench_agent as tau_bench_agent_lib - import utils diff --git a/contributing/samples/gepa/run_experiment.py b/contributing/samples/gepa/run_experiment.py index d857da9635..e31db15788 100644 --- a/contributing/samples/gepa/run_experiment.py +++ b/contributing/samples/gepa/run_experiment.py @@ -25,7 +25,6 @@ from absl import flags import experiment from google.genai import types - import utils _OUTPUT_DIR = flags.DEFINE_string( diff --git a/src/google/adk/skills/_utils.py b/src/google/adk/skills/_utils.py index cab70a8d4b..e3185135e7 100644 --- a/src/google/adk/skills/_utils.py +++ b/src/google/adk/skills/_utils.py @@ -17,6 +17,7 @@ from __future__ import annotations import logging +import os import pathlib from typing import Union @@ -401,7 +402,7 @@ def _load_skill_from_gcs_dir( f" name '{skill_name_expected}'." ) - def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]: + def _load_files_in_dir(subdir: str) -> dict[str, str | bytes]: prefix = f"{skill_dir_prefix}{subdir}/" blobs = bucket.list_blobs(prefix=prefix) result = {} @@ -411,10 +412,26 @@ def _load_files_in_dir(subdir: str) -> Dict[str, Union[str, bytes]]: if not relative_path: continue + # Use PurePosixPath for platform-independent GCS path validation + p = pathlib.PurePosixPath(relative_path) + + # Reject absolute paths and traversal sequences + if p.is_absolute() or ".." in p.parts: + raise ValueError(f"Unsafe path in skill resource: {relative_path!r}") + + normalized = p.as_posix() + + # Prevent silent file overwrites via path aliasing + if normalized in result: + raise ValueError(f"Duplicate normalized path detected: {normalized!r}") + + # NOTE: Final path safety enforced during materialization + # via realpath + commonpath checks in skill_toolset.py try: - result[relative_path] = blob.download_as_text() + result[normalized] = blob.download_as_text() except UnicodeDecodeError: - result[relative_path] = blob.download_as_bytes() + result[normalized] = blob.download_as_bytes() + return result references = _load_files_in_dir("references") diff --git a/src/google/adk/tools/skill_toolset.py b/src/google/adk/tools/skill_toolset.py index a7a2c683ca..9f7fef11e6 100644 --- a/src/google/adk/tools/skill_toolset.py +++ b/src/google/adk/tools/skill_toolset.py @@ -520,6 +520,7 @@ def _build_wrapper_code( ) # Build the boilerplate extract string + code_lines = [ "import os", "import tempfile", @@ -531,8 +532,14 @@ def _build_wrapper_code( "def _materialize_and_run():", " _orig_cwd = os.getcwd()", " with tempfile.TemporaryDirectory() as td:", + " _real_base = os.path.realpath(td)", " for rel_path, content in _files.items():", - " full_path = os.path.join(td, rel_path)", + " if os.path.isabs(rel_path):", + " raise ValueError(f'Absolute path rejected: {rel_path!r}')", + " _safe = os.path.realpath(os.path.join(td, rel_path))", + " if os.path.commonpath([_real_base, _safe]) != _real_base:", + " raise ValueError(f'Path traversal detected: {rel_path!r}')", + " full_path = _safe", " os.makedirs(os.path.dirname(full_path), exist_ok=True)", " mode = 'wb' if isinstance(content, bytes) else 'w'", " with open(full_path, mode) as f:",