diff --git a/builders/test_runner.py b/builders/test_runner.py new file mode 100644 index 0000000..d98c97a --- /dev/null +++ b/builders/test_runner.py @@ -0,0 +1,44 @@ +import subprocess +import sys + +from fastapi import FastAPI, Response +from ray import serve + + +fastapi = FastAPI() + + +@serve.deployment(num_replicas=1) +@serve.ingress(fastapi) +class TestRunner: + def __init__(self) -> None: + subprocess.run( + [sys.executable, "-m", "pip", "install", "pytest", "-q"], + check=True, + ) + + @fastapi.post("/") + def run(self) -> Response: + result = subprocess.run( + [ + sys.executable, + "-m", + "pytest", + "tests/model_snapshots/", + "-v", + "--tb=short", + "--no-header", + "-s", + "--color=no", + ], + capture_output=True, + text=True, + ) + + output = result.stdout + if result.returncode != 0: + output += f"\nSTDERR:\n{result.stderr}" + return Response(content=output, media_type="text/plain") + + +app = TestRunner.bind() # type: ignore[attr-defined] diff --git a/builders/throughput_runner.py b/builders/throughput_runner.py new file mode 100644 index 0000000..976d596 --- /dev/null +++ b/builders/throughput_runner.py @@ -0,0 +1,58 @@ +import subprocess +import sys + +from fastapi import FastAPI, Response +from ray import serve + + +fastapi = FastAPI() + + +@serve.deployment(num_replicas=1) +@serve.ingress(fastapi) +class ThroughputRunner: + def __init__(self) -> None: + subprocess.run( + [sys.executable, "-m", "pip", "install", "pytest", "-q"], + check=True, + ) + + @fastapi.post("/") + def run( + self, + duration_s: float = 300.0, + concurrency: int = 8, + timeout: float = 60.0, + wait_ready: bool = True, + wait_timeout_s: float = 0.0, + wait_interval_s: float = 10.0, + ) -> Response: + cmd = [ + sys.executable, + "tests/perf_throughput.py", + "--duration-s", + str(duration_s), + "--concurrency", + str(concurrency), + "--timeout", + str(timeout), + ] + if wait_ready: + cmd.extend( + [ + "--wait-ready", + "--wait-timeout-s", + str(wait_timeout_s), + "--wait-interval-s", + str(wait_interval_s), + ] + ) + + result = subprocess.run(cmd, capture_output=True, text=True) + output = result.stdout + ( + f"\nSTDERR:\n{result.stderr}" if result.returncode != 0 else "" + ) + return Response(content=output, media_type="text/plain") + + +app = ThroughputRunner.bind() # type: ignore[attr-defined] diff --git a/helm/rayservice/applications/test-runner.yaml b/helm/rayservice/applications/test-runner.yaml new file mode 100644 index 0000000..765c0b4 --- /dev/null +++ b/helm/rayservice/applications/test-runner.yaml @@ -0,0 +1,16 @@ +- name: test-runner + import_path: builders.test_runner:app + route_prefix: /run-tests + runtime_env: + working_dir: https://github.com/RationAI/model-service/archive/refs/heads/main.zip + pip: + - git+https://github.com/RationAI/rationai-sdk-python.git + deployments: + - name: TestRunner + autoscaling_config: + min_replicas: 0 + max_replicas: 1 + target_ongoing_requests: 1 + ray_actor_options: + num_cpus: 1 + memory: 2147483648 diff --git a/helm/rayservice/applications/throughput-test.yaml b/helm/rayservice/applications/throughput-test.yaml new file mode 100644 index 0000000..a056a56 --- /dev/null +++ b/helm/rayservice/applications/throughput-test.yaml @@ -0,0 +1,16 @@ +- name: throughput-runner + import_path: builders.throughput_runner:app + route_prefix: /run-throughput + runtime_env: + working_dir: https://github.com/RationAI/model-service/archive/refs/heads/main.zip + pip: + - git+https://github.com/RationAI/rationai-sdk-python.git + deployments: + - name: ThroughputRunner + autoscaling_config: + min_replicas: 0 + max_replicas: 1 + target_ongoing_requests: 1 + ray_actor_options: + num_cpus: 1 + memory: 2147483648 \ No newline at end of file diff --git a/helm/rayservice/values.yaml b/helm/rayservice/values.yaml index 6e62751..2dbc443 100644 --- a/helm/rayservice/values.yaml +++ b/helm/rayservice/values.yaml @@ -8,3 +8,5 @@ applications: - prostate-classifier-1 - prov-gigapath - virchow2 + - test-runner + - throughput-test diff --git a/helm/rayservice/workers/cpu-workers.yaml b/helm/rayservice/workers/cpu-workers.yaml index eaa044b..0de6e15 100644 --- a/helm/rayservice/workers/cpu-workers.yaml +++ b/helm/rayservice/workers/cpu-workers.yaml @@ -45,6 +45,8 @@ template: mountPath: /mnt/cache - name: huggingface-cache mountPath: /mnt/huggingface_cache + - name: test-refs + mountPath: /mnt/test_refs volumes: - name: data persistentVolumeClaim: @@ -64,3 +66,6 @@ template: - name: huggingface-cache persistentVolumeClaim: claimName: huggingface-cache-pvc + - name: test-refs + persistentVolumeClaim: + claimName: model-test-refs-pvc diff --git a/helm/rayservice/workers/mig20-workers.yaml b/helm/rayservice/workers/mig20-workers.yaml index 77032d9..86571c7 100644 --- a/helm/rayservice/workers/mig20-workers.yaml +++ b/helm/rayservice/workers/mig20-workers.yaml @@ -55,6 +55,8 @@ template: mountPath: /mnt/cache - name: huggingface-cache mountPath: /mnt/huggingface_cache + - name: test-refs + mountPath: /mnt/test_refs volumes: - name: data persistentVolumeClaim: @@ -74,3 +76,6 @@ template: - name: huggingface-cache persistentVolumeClaim: claimName: huggingface-cache-pvc + - name: test-refs + persistentVolumeClaim: + claimName: model-test-refs-pvc diff --git a/pvc/model-test-refs-pvc.yaml b/pvc/model-test-refs-pvc.yaml new file mode 100644 index 0000000..ae08c4e --- /dev/null +++ b/pvc/model-test-refs-pvc.yaml @@ -0,0 +1,12 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: model-test-refs-pvc + namespace: rationai-jobs-ns +spec: + accessModes: + - ReadWriteMany + resources: + requests: + storage: 5Gi + storageClassName: nfs-csi diff --git a/pyproject.toml b/pyproject.toml index 53c4222..5276b78 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -18,5 +18,8 @@ dependencies = [ ] [dependency-groups] -dev = ["mypy>=1.18.2", "ruff>=0.14.6"] +dev = ["mypy>=1.18.2", "ruff>=0.14.6", "pytest>=8.4.2"] docs = ["mkdocs>=1.6.0", "mkdocs-material>=9.6.0", "pymdown-extensions>=10.0"] + +[tool.pytest.ini_options] +pythonpath = ["tests/model_snapshots"] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/model_snapshots/__init__.py b/tests/model_snapshots/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/model_snapshots/_shared.py b/tests/model_snapshots/_shared.py new file mode 100644 index 0000000..ed40070 --- /dev/null +++ b/tests/model_snapshots/_shared.py @@ -0,0 +1,209 @@ +from __future__ import annotations + +import os +from pathlib import Path + +import numpy as np +import pytest +from numpy.typing import NDArray +from rationai import Client +from ratiopath.openslide import OpenSlide + + +def _models_base_url() -> str: + return os.environ.get( + "MODEL_SERVICE_MODELS_BASE_URL", + "http://rayservice-model-tests-serve-svc.rationai-jobs-ns.svc.cluster.local:8000", + ) + + +def test_refs_dir() -> Path: + return Path(os.environ.get("MODEL_SERVICE_TEST_REFS_DIR", "/mnt/test_refs")) + + +def _read_tile_at( + slide_path: str, x: int, y: int, tile_size: int, level: int +) -> NDArray[np.uint8]: + with OpenSlide(slide_path) as slide: + downsample = slide.level_downsamples[level] + x_rel = int(x / downsample) + y_rel = int(y / downsample) + tile = slide.read_region_relative( + (x_rel, y_rel), level, (tile_size, tile_size) + ).convert("RGB") + return np.asarray(tile, dtype=np.uint8) + + +def run_binary_classifier_case( + model_id: str, + slide_path: str, + x: int, + y: int, + expected_score: float, + tile_size: int = 512, + level: int = 0, + timeout_s: float = 600.0, + expected_is_positive: bool | None = None, + threshold: float = 0.5, + atol: float = 1e-6, + rtol: float = 1e-5, + case_name: str | None = None, +) -> None: + tile = _read_tile_at(slide_path, x, y, tile_size, level) + + with Client(models_base_url=_models_base_url(), timeout=timeout_s) as client: + actual_score = float(client.models.classify_image(model=model_id, image=tile)) + + delta = actual_score - expected_score + name = case_name or "case" + + if expected_is_positive is not None: + actual_is_positive = actual_score >= threshold + assert actual_is_positive == expected_is_positive, ( + f"Binary class mismatch: expected_is_positive={expected_is_positive}, " + f"actual_score={actual_score:.6f}, threshold={threshold:.3f}" + ) + + if not np.isclose(actual_score, expected_score, rtol=rtol, atol=atol): + pytest.fail( + f"Binary score mismatch beyond tolerance (atol={atol}, rtol={rtol}, " + f"expected={expected_score:.6f}, actual={actual_score:.6f})" + ) + + print(f"\n/{model_id}") + print( + f"{name} stats: score={actual_score:.6f} expected={expected_score:.6f} " + f"delta={delta:+.6f} threshold={threshold:.3f}" + ) + + +def run_semantic_segmentation_case( + model_id: str, + slide_path: str, + x: int, + y: int, + expected_array_path: Path | str, + tile_size: int = 1024, + level: int = 0, + timeout_s: float = 1200.0, + atol: float = 1e-6, + rtol: float = 1e-5, + epithelium_threshold: float | None = None, + min_epithelium_fraction: float | None = None, + epithelium_channel: int | None = None, + case_name: str | None = None, +) -> None: + expected_array_path = Path(expected_array_path) + if not expected_array_path.exists(): + pytest.fail(f"Reference file does not exist: {expected_array_path}") + + tile = _read_tile_at(slide_path, x, y, tile_size, level) + expected = np.load(expected_array_path) + + with Client(models_base_url=_models_base_url(), timeout=timeout_s) as client: + actual = np.asarray(client.models.segment_image(model=model_id, image=tile)) + + if actual.shape != expected.shape: + pytest.fail(f"Shape mismatch: expected={expected.shape}, actual={actual.shape}") + + max_diff = np.abs(actual.astype(np.float32) - expected.astype(np.float32)).max() + + if actual.ndim == 4: + stats_slice = actual[0, 0] + elif actual.ndim == 3: + stats_slice = actual[0] + else: + stats_slice = actual.squeeze() + + stats_slice = stats_slice.astype(np.float32) + min_val = float(stats_slice.min()) + mean_val = float(stats_slice.mean()) + max_val = float(stats_slice.max()) + frac_05 = float((stats_slice >= 0.5).mean()) + name = case_name or "case" + + close_mask = np.isclose(actual, expected, rtol=rtol, atol=atol) + if not close_mask.all(): + mismatch_fraction = float((~close_mask).mean()) + pytest.fail( + "Output mismatch beyond tolerance " + f"(atol={atol}, rtol={rtol}, max_abs_diff={max_diff}, " + f"mismatch_fraction={mismatch_fraction:.6f})" + ) + + if epithelium_threshold is not None and min_epithelium_fraction is not None: + if actual.ndim == 4: + channel = 0 if epithelium_channel is None else epithelium_channel + epithelium = actual[0, channel] + elif actual.ndim == 3: + channel = 0 if epithelium_channel is None else epithelium_channel + epithelium = actual[channel] + else: + epithelium = actual.squeeze() + + if epithelium.ndim != 2: + pytest.fail( + "Cannot determine epithelium channel; provide epithelium_channel explicitly." + ) + + fraction = float((epithelium >= epithelium_threshold).mean()) + if fraction < min_epithelium_fraction: + pytest.fail( + "Epithelium coverage too low: " + f"fraction={fraction:.6f} < min_fraction={min_epithelium_fraction:.6f}" + ) + + print(f"\n/{model_id}") + print( + f"{name} stats: shape={actual.shape} max_diff={max_diff:.6f} " + f"min={min_val:.6f} mean={mean_val:.6f} max={max_val:.6f} " + f"frac>=0.5={frac_05:.6f}" + ) + + +def run_embed_case( + model_id: str, + slide_path: str, + x: int, + y: int, + expected_array_path: Path | str, + tile_size: int = 224, + level: int = 0, + timeout_s: float = 1200.0, + min_cosine_similarity: float = 0.999, + case_name: str | None = None, +) -> None: + expected_array_path = Path(expected_array_path) + if not expected_array_path.exists(): + pytest.fail(f"Reference file does not exist: {expected_array_path}") + + tile = _read_tile_at(slide_path, x, y, tile_size, level) + expected = np.load(expected_array_path).flatten().astype(np.float32) + + with Client(models_base_url=_models_base_url(), timeout=timeout_s) as client: + actual = ( + np.asarray(client.models.embed_image(model=model_id, image=tile)) + .flatten() + .astype(np.float32) + ) + + if actual.shape != expected.shape: + pytest.fail(f"Shape mismatch: expected={expected.shape}, actual={actual.shape}") + + similarity = float( + np.dot(actual, expected) / (np.linalg.norm(actual) * np.linalg.norm(expected)) + ) + actual_norm = float(np.linalg.norm(actual)) + expected_norm = float(np.linalg.norm(expected)) + name = case_name or "case" + + if similarity < min_cosine_similarity: + pytest.fail( + f"Embedding similarity too low: {similarity:.6f} < {min_cosine_similarity}" + ) + + print(f"\n/{model_id}") + print( + f"{name} stats: shape={actual.shape} cosine_similarity={similarity:.6f} " + f"norm_actual={actual_norm:.6f} norm_expected={expected_norm:.6f}" + ) diff --git a/tests/model_snapshots/generate_references.py b/tests/model_snapshots/generate_references.py new file mode 100644 index 0000000..24c49b5 --- /dev/null +++ b/tests/model_snapshots/generate_references.py @@ -0,0 +1,119 @@ +import json +import os +from typing import TypedDict + +import numpy as np +from rationai import Client + +from tests.model_snapshots._shared import _read_tile_at, test_refs_dir + + +OUT_DIR = test_refs_dir() +MODELS_BASE_URL = os.environ.get( + "MODEL_SERVICE_MODELS_BASE_URL", + "http://rayservice-model-tests-serve-svc.rationai-jobs-ns.svc.cluster.local:8000", +) +BINARY_POSITIVE_THRESHOLD = 0.5 + + +class CaseConfig(TypedDict): + label: str + slide_path: str + model_id: str + type: str + tile_size: int + level: int + x: int + y: int + + +# Keep only one active case here. Store other candidate slides in new_images.txt +# and swap them in when you want to regenerate a different reference. +ACTIVE_CASE: CaseConfig = { + "label": "prov-gigapath", + "slide_path": "/mnt/data/MOU/prostate/tile_level_annotations/P-2016_1367-01-0.mrxs", + "model_id": "prov-gigapath", + "type": "embed", + "tile_size": 224, + "level": 0, + "x": 40000, + "y": 70000, +} + +CASES: list[CaseConfig] = [ACTIVE_CASE] + + +def generate_references() -> None: + OUT_DIR.mkdir(parents=True, exist_ok=True) + print(f"== Generating references to {OUT_DIR} via {MODELS_BASE_URL} ==") + + with Client(models_base_url=MODELS_BASE_URL, timeout=1200) as client: + for case in CASES: + label, model_id, mtype = case["label"], case["model_id"], case["type"] + print(f"\n[{label}] {model_id} ({mtype})") + + try: + tile = _read_tile_at( + case["slide_path"], + case["x"], + case["y"], + case["tile_size"], + case["level"], + ) + except Exception as e: + print(f" -> Failed to read tile: {e}") + continue + + try: + if mtype == "binary": + score = float( + client.models.classify_image( + model=model_id, image=tile, timeout=600 + ) + ) + out_file = OUT_DIR / f"{label}_{model_id}_expected.json" + with out_file.open("w") as f: + json.dump( + { + "label": label, + "model_id": model_id, + "slide_path": case["slide_path"], + "x": case["x"], + "y": case["y"], + "tile_size": case["tile_size"], + "level": case["level"], + "threshold": BINARY_POSITIVE_THRESHOLD, + "expected_is_positive": score + >= BINARY_POSITIVE_THRESHOLD, + "expected_score": score, + }, + f, + indent=2, + ) + print(f" -> Saved {out_file}") + + elif mtype == "semantic": + arr = np.asarray( + client.models.segment_image( + model=model_id, image=tile, timeout=1200 + ) + ) + out_file = OUT_DIR / f"{label}_{model_id}_expected.npy" + np.save(out_file, arr) + print(f" -> Saved {out_file} shape={arr.shape}") + + elif mtype == "embed": + arr = np.asarray( + client.models.embed_image( + model=model_id, image=tile, timeout=1200 + ) + ) + out_file = OUT_DIR / f"{label}_{model_id}_expected.npy" + np.save(out_file, arr) + print(f" -> Saved {out_file} shape={arr.shape}") + except Exception as e: + print(f" -> ERROR: {e}") + + +if __name__ == "__main__": + generate_references() diff --git a/tests/model_snapshots/new_images.txt b/tests/model_snapshots/new_images.txt new file mode 100644 index 0000000..9a74517 --- /dev/null +++ b/tests/model_snapshots/new_images.txt @@ -0,0 +1,7 @@ +# New images and coordinates +# Format: label | slide_path | x | y | tile_size | level | notes +prostate_positive | /mnt/data/MOU/prostate/tile_level_annotations/P-2016_2386-06-1.mrxs | 43390 | 45865 | 512 | 0 | prostate-classifier-1 positive +prostate_negative | /mnt/data/MOU/prostate/tile_level_annotations/P-2016_0845-02-0.mrxs | 32950 | 108990 | 512 | 0 | prostate-classifier-1 negative +prostate | /mnt/data/MOU/prostate/tile_level_annotations/P-2016_1367-01-0.mrxs | 40000 | 70000 | 224 | 0 | virchow2 embed +prov-gigapath | /mnt/data/MOU/prostate/tile_level_annotations/P-2016_1367-01-0.mrxs | 40000 | 70000 | 224 | 0 | prov-gigapath embed +colorectum_kos04 | /mnt/data/MOU/colorectum/tissue_microarray/he/KOS04.mrxs | 46000 | 82400 | 1024 | 0 | episeg-1 semantic diff --git a/tests/model_snapshots/test_binary_classifier_model_snapshot.py b/tests/model_snapshots/test_binary_classifier_model_snapshot.py new file mode 100644 index 0000000..fcbef95 --- /dev/null +++ b/tests/model_snapshots/test_binary_classifier_model_snapshot.py @@ -0,0 +1,53 @@ +import json + +import pytest + +from tests.model_snapshots._shared import run_binary_classifier_case, test_refs_dir + + +BINARY_POSITIVE_THRESHOLD = 0.5 + + +@pytest.mark.parametrize( + "label, slide_path, x, y", + [ + ( + "prostate_positive", + "/mnt/data/MOU/prostate/tile_level_annotations/P-2016_2386-06-1.mrxs", + 43390, + 45865, + ), + ( + "prostate_negative", + "/mnt/data/MOU/prostate/tile_level_annotations/P-2016_0845-02-0.mrxs", + 32950, + 108990, + ), + ], +) +def test_prostate_classifier_snapshot( + label: str, slide_path: str, x: int, y: int +) -> None: + model_id = "prostate-classifier-1" + json_path = test_refs_dir() / f"{label}_{model_id}_expected.json" + + with json_path.open() as f: + reference = json.load(f) + + expected_score = reference["expected_score"] + threshold = reference.get("threshold", BINARY_POSITIVE_THRESHOLD) + expected_is_positive = reference.get("expected_is_positive") + assert expected_is_positive is not None + + run_binary_classifier_case( + model_id=model_id, + slide_path=slide_path, + x=x, + y=y, + expected_score=expected_score, + tile_size=512, + level=0, + expected_is_positive=expected_is_positive, + threshold=threshold, + case_name=label, + ) diff --git a/tests/model_snapshots/test_prov_gigapath_model_snapshot.py b/tests/model_snapshots/test_prov_gigapath_model_snapshot.py new file mode 100644 index 0000000..f206d19 --- /dev/null +++ b/tests/model_snapshots/test_prov_gigapath_model_snapshot.py @@ -0,0 +1,30 @@ +import pytest + +from tests.model_snapshots._shared import run_embed_case, test_refs_dir + + +@pytest.mark.parametrize( + "label, slide_path, x, y", + [ + ( + "prov-gigapath", + "/mnt/data/MOU/prostate/tile_level_annotations/P-2016_1367-01-0.mrxs", + 40000, + 70000, + ), + ], +) +def test_prov_gigapath(label: str, slide_path: str, x: int, y: int) -> None: + model_id = "prov-gigapath" + expected_array_path = test_refs_dir() / f"{label}_{model_id}_expected.npy" + + run_embed_case( + model_id=model_id, + slide_path=slide_path, + x=x, + y=y, + expected_array_path=expected_array_path, + tile_size=224, + level=0, + case_name=label, + ) diff --git a/tests/model_snapshots/test_semantic_segmentation_model_snapshot.py b/tests/model_snapshots/test_semantic_segmentation_model_snapshot.py new file mode 100644 index 0000000..7022871 --- /dev/null +++ b/tests/model_snapshots/test_semantic_segmentation_model_snapshot.py @@ -0,0 +1,32 @@ +import pytest + +from tests.model_snapshots._shared import run_semantic_segmentation_case, test_refs_dir + + +@pytest.mark.parametrize( + "label, slide_path, x, y", + [ + ( + "colorectum_kos04", + "/mnt/data/MOU/colorectum/tissue_microarray/he/KOS04.mrxs", + 46000, + 82400, + ), + ], +) +def test_semantic_episeg(label: str, slide_path: str, x: int, y: int) -> None: + model_id = "episeg-1" + expected_array_path = test_refs_dir() / f"{label}_{model_id}_expected.npy" + + run_semantic_segmentation_case( + model_id=model_id, + slide_path=slide_path, + x=x, + y=y, + expected_array_path=expected_array_path, + tile_size=1024, + level=0, + epithelium_threshold=0.5, + min_epithelium_fraction=0.01, + case_name=label, + ) diff --git a/tests/model_snapshots/test_virchow2_model_snapshot.py b/tests/model_snapshots/test_virchow2_model_snapshot.py new file mode 100644 index 0000000..8fa79d8 --- /dev/null +++ b/tests/model_snapshots/test_virchow2_model_snapshot.py @@ -0,0 +1,30 @@ +import pytest + +from tests.model_snapshots._shared import run_embed_case, test_refs_dir + + +@pytest.mark.parametrize( + "label, slide_path, x, y", + [ + ( + "virchow2", + "/mnt/data/MOU/prostate/tile_level_annotations/P-2016_1367-01-0.mrxs", + 40000, + 70000, + ), + ], +) +def test_virchow2(label: str, slide_path: str, x: int, y: int) -> None: + model_id = "virchow2" + expected_array_path = test_refs_dir() / f"{label}_{model_id}_expected.npy" + + run_embed_case( + model_id=model_id, + slide_path=slide_path, + x=x, + y=y, + expected_array_path=expected_array_path, + tile_size=224, + level=0, + case_name=label, + ) diff --git a/tests/perf_throughput.py b/tests/perf_throughput.py new file mode 100644 index 0000000..2be004d --- /dev/null +++ b/tests/perf_throughput.py @@ -0,0 +1,317 @@ +from __future__ import annotations + +import argparse +import os +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from dataclasses import dataclass, field +from threading import Lock +from typing import TypedDict + +import numpy as np +from rationai import Client + + +DEFAULT_MODELS = [ + ("prostate-classifier-1", "binary", 512), + ("episeg-1", "semantic", 1024), + ("virchow2", "embed", 224), + ("prov-gigapath", "embed", 224), +] +POOL_SIZE_DEFAULT = 64 + + +class ModelResult(TypedDict): + name: str + model_type: str + tile_size: int + elapsed_s: float + ok: int + fail_503: int + fail_other: int + throughput: float + p50: float + p95: float + p99: float + + +@dataclass +class Stats: + ok: int = 0 + fail_503: int = 0 + fail_other: int = 0 + latencies: list[float] = field(default_factory=list) + lock: Lock = field(default_factory=Lock) + + @property + def total(self) -> int: + return self.ok + self.fail_503 + self.fail_other + + def percentile(self, p: float) -> float: + if not self.latencies: + return 0.0 + return float(np.percentile(self.latencies, p)) + + +def _models_base_url() -> str: + return os.environ.get( + "MODEL_SERVICE_MODELS_BASE_URL", + "http://rayservice-model-tests-serve-svc.rationai-jobs-ns.svc.cluster.local:8000", + ) + + +def make_pool(tile_size: int, n: int) -> list[np.ndarray]: + rng = np.random.default_rng(seed=42) + return [ + rng.integers(0, 256, (tile_size, tile_size, 3), dtype=np.uint8) + for _ in range(n) + ] + + +def _call_model( + client: Client, model_id: str, model_type: str, image: np.ndarray +) -> None: + if model_type == "binary": + client.models.classify_image(model=model_id, image=image) + elif model_type == "semantic": + client.models.segment_image(model=model_id, image=image) + elif model_type == "embed": + client.models.embed_image(model=model_id, image=image) + else: + raise ValueError(f"Unknown model type: {model_type}") + + +def wait_for_ready( + model_id: str, + model_type: str, + tile_size: int, + timeout: float, + models_base_url: str, + wait_timeout_s: float, + wait_interval_s: float, +) -> None: + image = make_pool(tile_size, 1)[0] + start = time.perf_counter() + reported = False + + while True: + try: + with Client(models_base_url=models_base_url, timeout=timeout) as client: + _call_model(client, model_id, model_type, image) + if reported: + print(f"{model_id} ready after {time.perf_counter() - start:.1f}s") + return + except Exception as exc: + if isinstance(exc, ValueError): + raise + status_code = getattr(getattr(exc, "response", None), "status_code", None) + if status_code not in (None, 503, 504): + raise + if not reported: + print(f"{model_id} waiting for readiness...") + reported = True + elapsed = time.perf_counter() - start + if wait_timeout_s > 0 and elapsed >= wait_timeout_s: + raise RuntimeError( + f"{model_id} not ready after {wait_timeout_s:.1f}s" + ) from exc + time.sleep(wait_interval_s) + + +def send_loop( + model_id: str, + model_type: str, + pool: list[np.ndarray], + stats: Stats, + end_time: float, + timeout: float, + models_base_url: str, +) -> None: + pool_len = len(pool) + idx = 0 + with Client(models_base_url=models_base_url, timeout=timeout) as client: + while time.perf_counter() < end_time: + image = pool[idx % pool_len] + idx += 1 + t0 = time.perf_counter() + try: + _call_model(client, model_id, model_type, image) + latency = time.perf_counter() - t0 + with stats.lock: + stats.ok += 1 + stats.latencies.append(latency) + except Exception as exc: + status_code = getattr( + getattr(exc, "response", None), "status_code", None + ) + with stats.lock: + if status_code == 503: + stats.fail_503 += 1 + else: + stats.fail_other += 1 + + +def run_model( + name: str, + model_type: str, + tile_size: int, + duration_s: float, + concurrency: int, + timeout: float, + pool_size: int, + models_base_url: str, +) -> ModelResult: + if pool_size <= 0: + raise ValueError("pool_size must be > 0") + + pool = make_pool(tile_size, pool_size) + stats = Stats() + + start = time.perf_counter() + end_time = start + duration_s + with ThreadPoolExecutor(max_workers=concurrency) as executor: + futures = [ + executor.submit( + send_loop, + name, + model_type, + pool, + stats, + end_time, + timeout, + models_base_url, + ) + for _ in range(concurrency) + ] + for future in as_completed(futures): + future.result() + elapsed = time.perf_counter() - start + + throughput = stats.ok / elapsed if elapsed > 0 else 0.0 + return { + "name": name, + "model_type": model_type, + "tile_size": tile_size, + "elapsed_s": elapsed, + "ok": stats.ok, + "fail_503": stats.fail_503, + "fail_other": stats.fail_other, + "throughput": throughput, + "p50": stats.percentile(50), + "p95": stats.percentile(95), + "p99": stats.percentile(99), + } + + +def parse_models(values: list[str]) -> list[tuple[str, str, int]]: + if not values: + return DEFAULT_MODELS + parsed: list[tuple[str, str, int]] = [] + for item in values: + parts = [p.strip() for p in item.split(",")] + if len(parts) != 3: + raise ValueError("--model expects: model_id,model_type,tile_size") + model_id, model_type, tile_size = parts[0], parts[1], int(parts[2]) + parsed.append((model_id, model_type, tile_size)) + return parsed + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Run per-model throughput tests and report img/s via SDK." + ) + parser.add_argument( + "--model", + action="append", + default=[], + help="Model spec: model_id,model_type,tile_size (repeatable)", + ) + parser.add_argument( + "--models-base-url", + default=_models_base_url(), + help="Base URL for the SDK (default: MODEL_SERVICE_MODELS_BASE_URL or http://localhost:8000)", + ) + parser.add_argument("--duration-s", type=float, default=300.0) + parser.add_argument("--concurrency", type=int, default=64) + parser.add_argument("--timeout", type=float, default=60.0) + parser.add_argument("--pool-size", type=int, default=POOL_SIZE_DEFAULT) + parser.add_argument( + "--wait-ready", + action="store_true", + help="Wait for each model to become ready before running the test", + ) + parser.add_argument( + "--wait-timeout-s", + type=float, + default=0.0, + help="Max time to wait for readiness (0 = wait forever)", + ) + parser.add_argument( + "--wait-interval-s", + type=float, + default=10.0, + help="Wait interval between readiness checks", + ) + args = parser.parse_args() + + models = parse_models(args.model) + + print("=" * 72) + print("Throughput Test (img/s) - SDK") + print("=" * 72) + print(f"Models base URL: {args.models_base_url}") + print(f"Duration: {args.duration_s:.0f}s") + print(f"Concurrency: {args.concurrency}") + print(f"Timeout: {args.timeout}s") + print() + + results: list[ModelResult] = [] + for name, model_type, tile_size in models: + if args.wait_ready: + wait_for_ready( + model_id=name, + model_type=model_type, + tile_size=tile_size, + timeout=args.timeout, + models_base_url=args.models_base_url, + wait_timeout_s=args.wait_timeout_s, + wait_interval_s=args.wait_interval_s, + ) + result = run_model( + name, + model_type, + tile_size, + args.duration_s, + args.concurrency, + args.timeout, + args.pool_size, + args.models_base_url, + ) + results.append(result) + print( + f"{name} stats: ok={result['ok']} fail_503={result['fail_503']} " + f"fail_other={result['fail_other']} elapsed={result['elapsed_s']:.2f}s " + f"img/s={result['throughput']:.2f} p50={result['p50']:.3f}s " + f"p95={result['p95']:.3f}s p99={result['p99']:.3f}s" + ) + + print("Summary") + print( + "name".ljust(28), + "img/s".rjust(10), + "p50".rjust(10), + "p95".rjust(10), + "p99".rjust(10), + ) + for r in results: + print( + r["name"].ljust(28), + f"{r['throughput']:.2f}".rjust(10), + f"{r['p50']:.3f}".rjust(10), + f"{r['p95']:.3f}".rjust(10), + f"{r['p99']:.3f}".rjust(10), + ) + + +if __name__ == "__main__": + main() diff --git a/uv.lock b/uv.lock index 3ac8fef..ecad61f 100644 --- a/uv.lock +++ b/uv.lock @@ -1235,6 +1235,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/fa/5e/f8e9a1d23b9c20a551a8a02ea3637b4642e22c2626e3a13a9a29cdea99eb/importlib_metadata-8.7.1-py3-none-any.whl", hash = "sha256:5a1f80bf1daa489495071efbb095d75a634cf28a8bc299581244063b53176151", size = 27865, upload-time = "2025-12-21T10:00:18.329Z" }, ] +[[package]] +name = "iniconfig" +version = "2.3.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/72/34/14ca021ce8e5dfedc35312d08ba8bf51fdd999c576889fc2c24cb97f4f10/iniconfig-2.3.0.tar.gz", hash = "sha256:c76315c77db068650d49c5b56314774a7804df16fee4402c1f19d6d15d8c4730", size = 20503, upload-time = "2025-10-18T21:55:43.219Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/cb/b1/3846dd7f199d53cb17f49cba7e651e9ce294d8497c8c150530ed11865bb8/iniconfig-2.3.0-py3-none-any.whl", hash = "sha256:f631c04d2c48c52b84d0d0549c99ff3859c98df65b3101406327ecc7d53fbf12", size = 7484, upload-time = "2025-10-18T21:55:41.639Z" }, +] + [[package]] name = "itsdangerous" version = "2.2.0" @@ -1777,6 +1786,7 @@ dependencies = [ [package.dev-dependencies] dev = [ { name = "mypy" }, + { name = "pytest" }, { name = "ruff" }, ] docs = [ @@ -1798,6 +1808,7 @@ requires-dist = [ [package.metadata.requires-dev] dev = [ { name = "mypy", specifier = ">=1.18.2" }, + { name = "pytest", specifier = ">=8.4.2" }, { name = "ruff", specifier = ">=0.14.6" }, ] docs = [ @@ -2570,6 +2581,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/75/a6/a0a304dc33b49145b21f4808d763822111e67d1c3a32b524a1baf947b6e1/platformdirs-4.9.6-py3-none-any.whl", hash = "sha256:e61adb1d5e5cb3441b4b7710bea7e4c12250ca49439228cc1021c00dcfac0917", size = 21348, upload-time = "2026-04-09T00:04:09.463Z" }, ] +[[package]] +name = "pluggy" +version = "1.6.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/f9/e2/3e91f31a7d2b083fe6ef3fa267035b518369d9511ffab804f839851d2779/pluggy-1.6.0.tar.gz", hash = "sha256:7dcc130b76258d33b90f61b658791dede3486c3e6bfb003ee5c9bfb396dd22f3", size = 69412, upload-time = "2025-05-15T12:30:07.975Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, +] + [[package]] name = "prometheus-client" version = "0.25.0" @@ -2983,6 +3003,22 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/15/73/a7141a1a0559bf1a7aa42a11c879ceb19f02f5c6c371c6d57fd86cefd4d1/pyproj-3.7.2-cp314-cp314t-win_arm64.whl", hash = "sha256:d9d25bae416a24397e0d85739f84d323b55f6511e45a522dd7d7eae70d10c7e4", size = 6391844, upload-time = "2025-08-14T12:05:40.745Z" }, ] +[[package]] +name = "pytest" +version = "9.0.3" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "colorama", marker = "sys_platform == 'win32'" }, + { name = "iniconfig" }, + { name = "packaging" }, + { name = "pluggy" }, + { name = "pygments" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/7d/0d/549bd94f1a0a402dc8cf64563a117c0f3765662e2e668477624baeec44d5/pytest-9.0.3.tar.gz", hash = "sha256:b86ada508af81d19edeb213c681b1d48246c1a91d304c6c81a427674c17eb91c", size = 1572165, upload-time = "2026-04-07T17:16:18.027Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/d4/24/a372aaf5c9b7208e7112038812994107bc65a84cd00e0354a88c2c77a617/pytest-9.0.3-py3-none-any.whl", hash = "sha256:2c5efc453d45394fdd706ade797c0a81091eccd1d6e4bccfcd476e2b8e0ab5d9", size = 375249, upload-time = "2026-04-07T17:16:16.13Z" }, +] + [[package]] name = "python-dateutil" version = "2.9.0.post0"