diff --git a/src/scope/cloud/fal_app.py b/src/scope/cloud/fal_app.py
index ea8d4ea64..7e23fe298 100644
--- a/src/scope/cloud/fal_app.py
+++ b/src/scope/cloud/fal_app.py
@@ -177,6 +177,12 @@ def is_running(self) -> bool:
 # Persistent shared directory for sample LoRAs (survives session cleanup)
 SHARED_LORA_DIR = "/data/models/lora"
 
+# Persistent user LoRA directory: stored on the /data volume so LoRAs installed
+# by the user survive fal.ai worker resets between jobs. This is distinct from
+# SHARED_LORA_DIR (which holds pre-bundled sample LoRAs) so the two can be
+# managed and cleaned up independently.
+USER_LORA_DIR = "/data/models/user-loras"
+
 # Gates the "ready" WebSocket message until the previous session's cleanup completes.
 # Initialized lazily to ensure an event loop is available.
 
@@ -286,25 +292,37 @@ def cleanup_session_data():
     This prevents data leakage between users on fal.ai by clearing:
     - Assets directory (uploaded images, videos)
     - Recording files in temp directory
+    - User-installed LoRA directory (persistent volume, cleared per-session for isolation)
     """
 
     from pathlib import Path
 
+    def _rmdir_contents(path: Path, label: str) -> None:
+        """Delete all children of *path* without removing the directory itself."""
+        if not path.exists():
+            return
+        for item in path.iterdir():
+            try:
+                # rmtree() rejects symlinks; unlink those (and all other non-directories).
+                if item.is_dir() and not item.is_symlink():
+                    shutil.rmtree(item)
+                else:
+                    item.unlink()
+            except Exception as e:
+                print(f"Warning: Failed to delete {item}: {e}")
+        print(f"Cleaned up {label}: {path}")
+
     try:
         # Clean assets directory (matches DAYDREAM_SCOPE_ASSETS_DIR set in setup)
-        assets_dir = Path(ASSETS_DIR_PATH).expanduser()
-        if assets_dir.exists():
-            for item in assets_dir.iterdir():
-                try:
-                    if item.is_file():
-                        item.unlink()
-                    elif item.is_dir():
-                        shutil.rmtree(item)
-                except Exception as e:
-                    print(f"Warning: Failed to delete {item}: {e}")
-            print(f"Cleaned up assets directory: {assets_dir}")
+        _rmdir_contents(Path(ASSETS_DIR_PATH).expanduser(), "assets directory")
+    except Exception as e:
+        print(f"Warning: Assets cleanup failed: {e}")
+    try:
+        # Clean user LoRA directory (persistent volume — must be wiped between
+        # sessions to prevent LoRA files from one user leaking to the next).
+        _rmdir_contents(Path(USER_LORA_DIR), "user LoRA directory")
     except Exception as e:
-        print(f"Warning: Session cleanup failed: {e}")
+        print(f"Warning: User LoRA cleanup failed: {e}")
 
 
 async def cleanup_installed_plugins():
@@ -479,7 +497,10 @@ def setup(self):
         # not shared between users
         scope_env["DAYDREAM_SCOPE_LOGS_DIR"] = ASSETS_DIR_PATH + "/logs"
         scope_env["DAYDREAM_SCOPE_ASSETS_DIR"] = ASSETS_DIR_PATH
-        scope_env["DAYDREAM_SCOPE_LORA_DIR"] = ASSETS_DIR_PATH + "/lora"
+        # Store user-installed LoRAs on the persistent /data volume so they
+        # survive fal.ai worker resets between jobs. The tmp-based path was
+        # cleared on every new job, causing pipeline load failures (#923).
+        scope_env["DAYDREAM_SCOPE_LORA_DIR"] = USER_LORA_DIR
         scope_env["DAYDREAM_SCOPE_LORA_SHARED_DIR"] = "/data/models/lora"
         scope_env["UV_CACHE_DIR"] = "/tmp/uv-cache"
 
diff --git a/src/scope/cloud/livepeer_app.py b/src/scope/cloud/livepeer_app.py
index c36840d17..47eb0b22b 100644
--- a/src/scope/cloud/livepeer_app.py
+++ b/src/scope/cloud/livepeer_app.py
@@ -54,6 +54,9 @@ REMOTE_VIDEO_CLOCK_RATE = 90_000
 REMOTE_VIDEO_TIME_BASE = fractions.Fraction(1, REMOTE_VIDEO_CLOCK_RATE)
 
 ASSETS_DIR_PATH = os.getenv("DAYDREAM_SCOPE_ASSETS_DIR", "/tmp/.daydream-scope/assets")
+# User LoRA dir lives on the persistent volume; read from env so it matches the
+# value injected by the outer fal app at worker start (#923).
+USER_LORA_DIR = os.getenv("DAYDREAM_SCOPE_LORA_DIR", "/data/models/user-loras")
 
 
 @asynccontextmanager
@@ -1178,16 +1181,15 @@ async def _cleanup_plugins_via_scope_client() -> dict[str, Any]:
     }
 
 
-def _cleanup_assets_dir() -> dict[str, Any]:
-    """Delete all files and directories inside the configured assets directory."""
-    assets_dir = Path(ASSETS_DIR_PATH).expanduser()
+def _cleanup_dir_contents(dir_path: Path) -> dict[str, Any]:
+    """Delete all files and directories inside *dir_path* without removing it."""
     deleted = 0
     errors: list[dict[str, str]] = []
 
-    if not assets_dir.exists():
-        return {"path": str(assets_dir), "deleted": deleted, "errors": errors}
+    if not dir_path.exists():
+        return {"path": str(dir_path), "deleted": deleted, "errors": errors}
 
-    for item in assets_dir.iterdir():
+    for item in dir_path.iterdir():
         try:
             if item.is_file():
                 item.unlink()
@@ -1198,24 +1200,41 @@ def _cleanup_assets_dir() -> dict[str, Any]:
         except Exception as exc:
             errors.append({"path": str(item), "error": str(exc)})
 
-    return {"path": str(assets_dir), "deleted": deleted, "errors": errors}
+    return {"path": str(dir_path), "deleted": deleted, "errors": errors}
+
+
+def _cleanup_assets_dir() -> dict[str, Any]:
+    """Delete all files and directories inside the configured assets directory."""
+    return _cleanup_dir_contents(Path(ASSETS_DIR_PATH).expanduser())
+
+
+def _cleanup_user_lora_dir() -> dict[str, Any]:
+    """Delete user-installed LoRAs from the persistent volume.
+
+    User LoRAs are stored on /data (not /tmp) so they survive worker resets
+    within a session (#923). They must still be wiped at session end to
+    prevent one user's LoRAs from leaking to the next user on the same worker.
+    """
+    return _cleanup_dir_contents(Path(USER_LORA_DIR).expanduser())
 
 
 @app.post("/internal/cleanup-session")
 async def cleanup_session() -> dict[str, Any]:
-    """Cleanup plugins and assets after the outer fal websocket disconnects."""
+    """Cleanup plugins, assets and user LoRAs after the outer fal websocket disconnects."""
     try:
         plugins = await _cleanup_plugins_via_scope_client()
         assets = _cleanup_assets_dir()
+        loras = _cleanup_user_lora_dir()
     except RuntimeError as exc:
         raise HTTPException(status_code=503, detail=str(exc)) from exc
     except Exception as exc:
         raise HTTPException(status_code=500, detail=str(exc)) from exc
 
     return {
-        "ok": not plugins["failed"] and not assets["errors"],
+        "ok": not plugins["failed"] and not assets["errors"] and not loras["errors"],
         "plugins": plugins,
         "assets": assets,
+        "loras": loras,
     }
 
 
diff --git a/src/scope/cloud/livepeer_fal_app.py b/src/scope/cloud/livepeer_fal_app.py
index 3a48ed49d..74192cfdd 100644
--- a/src/scope/cloud/livepeer_fal_app.py
+++ b/src/scope/cloud/livepeer_fal_app.py
@@ -28,6 +28,10 @@ RUNNER_FAILURE_WINDOW_SECONDS = 60.0
 
 ASSETS_DIR_PATH = "/tmp/.daydream-scope/assets"
 
+# Persistent user LoRA directory (matches fal_app.py). Stored on the /data
+# volume so user-installed LoRAs survive fal.ai worker resets (#923).
+USER_LORA_DIR = "/data/models/user-loras"
+
 # Gates startup cleanup so only one cleanup run executes at a time.
 _cleanup_event: asyncio.Event | None = None
 
@@ -272,7 +276,9 @@ def setup(self):
         runner_env.setdefault("DAYDREAM_SCOPE_MODELS_DIR", "/data/models")
         runner_env.setdefault("DAYDREAM_SCOPE_LORA_SHARED_DIR", "/data/models/lora")
         runner_env.setdefault("DAYDREAM_SCOPE_ASSETS_DIR", ASSETS_DIR_PATH)
-        runner_env.setdefault("DAYDREAM_SCOPE_LORA_DIR", ASSETS_DIR_PATH + "/lora")
+        # Store user-installed LoRAs on the persistent /data volume so they
+        # survive fal.ai worker resets between jobs (#923).
+        runner_env.setdefault("DAYDREAM_SCOPE_LORA_DIR", USER_LORA_DIR)
         runner_env.setdefault("DAYDREAM_SCOPE_LOGS_DIR", ASSETS_DIR_PATH + "/logs")
         runner_env.setdefault(
             "DAYDREAM_SCOPE_PLUGINS_DIR", ASSETS_DIR_PATH + "/plugins"
diff --git a/src/scope/core/pipelines/wan2_1/lora/utils.py b/src/scope/core/pipelines/wan2_1/lora/utils.py
index e34f4b63d..45ff50c1e 100644
--- a/src/scope/core/pipelines/wan2_1/lora/utils.py
+++ b/src/scope/core/pipelines/wan2_1/lora/utils.py
@@ -410,6 +410,29 @@ def parse_lora_weights(
             f"parse_lora_weights: Matched base_key='{base_key}' -> model_key='{model_key}'"
         )
 
+        # Validate LoRA dimensions against the model weight before injecting.
+        # lora_A shape: [rank, in_features] — in_features must match model weight dim 1
+        # lora_B shape: [out_features, rank] — out_features must match model weight dim 0
+        # (model weight shape is [out_features, in_features] for nn.Linear)
+        # Model weights that are not 2-D are skipped: they expose no (out, in) pair to compare.
+        model_weight = model_state.get(model_key)
+        if (
+            model_weight is not None
+            and model_weight.ndim == 2
+            and lora_A.ndim == 2
+            and lora_B.ndim == 2
+        ):
+            lora_in = lora_A.shape[1]  # LoRA expects this input dimension
+            lora_out = lora_B.shape[0]  # LoRA expects this output dimension
+            model_out, model_in = model_weight.shape[0], model_weight.shape[1]
+            if lora_in != model_in or lora_out != model_out:
+                raise ValueError(
+                    f"LoRA dimension mismatch at layer '{base_key}': "
+                    f"LoRA expects ({lora_out}×{lora_in}) but model layer is ({model_out}×{model_in}). "
+                    f"This LoRA was likely trained for a different model size (e.g. Wan2.1-5B vs 1.3B). "
+                    f"Please use a LoRA that matches the loaded model architecture."
+                )
+
         # Extract alpha and rank
         alpha = None
         if alpha_key and alpha_key in lora_state:
diff --git a/tests/test_lora_dimension_validation.py b/tests/test_lora_dimension_validation.py
new file mode 100644
index 000000000..6f4c0e12c
--- /dev/null
+++ b/tests/test_lora_dimension_validation.py
@@ -0,0 +1,87 @@
+"""Tests for LoRA dimension validation in parse_lora_weights.
+
+Regression test for issue #922: a LoRA trained for Wan2.1-5B (in_features=5120)
+was silently loaded into the Wan2.1-1.3B model (in_features=1536) and only
+failed 156 times at inference time with an inscrutable RuntimeError.
+"""
+
+import pytest
+import torch
+
+from scope.core.pipelines.wan2_1.lora.utils import parse_lora_weights
+
+
+def _make_model_state(in_features: int, out_features: int = 256) -> dict:
+    """Minimal model state dict with one linear layer."""
+    return {
+        "blocks.0.self_attn.q.weight": torch.zeros(out_features, in_features),
+    }
+
+
+def _make_lora_state(rank: int, in_features: int, out_features: int = 256) -> dict:
+    """Minimal PEFT-format LoRA state targeting the same layer."""
+    return {
+        "diffusion_model.blocks.0.self_attn.q.lora_A.weight": torch.zeros(rank, in_features),
+        "diffusion_model.blocks.0.self_attn.q.lora_B.weight": torch.zeros(out_features, rank),
+    }
+
+
+class TestLoRADimensionValidation:
+    """Verify parse_lora_weights raises a clear error on dimension mismatch."""
+
+    def test_compatible_lora_loads_successfully(self):
+        """LoRA matching the model's dimensions should parse without error."""
+        model_state = _make_model_state(in_features=1536)
+        lora_state = _make_lora_state(rank=32, in_features=1536)
+
+        mapping = parse_lora_weights(lora_state, model_state)
+
+        assert len(mapping) == 1
+        key = "blocks.0.self_attn.q.weight"
+        assert key in mapping
+        assert mapping[key]["rank"] == 32
+
+    def test_incompatible_lora_raises_value_error(self):
+        """LoRA trained for 5B (in_features=5120) must not silently load into 1.3B (in_features=1536)."""
+        model_state = _make_model_state(in_features=1536)  # 1.3B model
+        lora_state = _make_lora_state(rank=32, in_features=5120)  # 5B LoRA
+
+        with pytest.raises(ValueError, match="LoRA dimension mismatch"):
+            parse_lora_weights(lora_state, model_state)
+
+    def test_error_message_is_user_friendly(self):
+        """The error message should name the layer and the dimension sizes."""
+        model_state = _make_model_state(in_features=1536)
+        lora_state = _make_lora_state(rank=32, in_features=5120)
+
+        with pytest.raises(ValueError) as exc_info:
+            parse_lora_weights(lora_state, model_state)
+
+        msg = str(exc_info.value)
+        assert "blocks.0.self_attn.q" in msg, "Layer name should appear in error"
+        assert "5120" in msg, "LoRA in_features should appear in error"
+        assert "1536" in msg, "Model in_features should appear in error"
+        assert "model size" in msg.lower() or "architecture" in msg.lower(), (
+            "Error should hint at model size mismatch"
+        )
+
+    def test_out_features_mismatch_also_caught(self):
+        """LoRA with wrong output dimension should also be rejected."""
+        model_state = _make_model_state(in_features=1536, out_features=256)
+        # LoRA with matching in_features but wrong out_features
+        lora_state = {
+            "diffusion_model.blocks.0.self_attn.q.lora_A.weight": torch.zeros(32, 1536),
+            "diffusion_model.blocks.0.self_attn.q.lora_B.weight": torch.zeros(512, 32),  # wrong
+        }
+
+        with pytest.raises(ValueError, match="LoRA dimension mismatch"):
+            parse_lora_weights(lora_state, model_state)
+
+    def test_compatible_5b_lora_on_5b_model(self):
+        """LoRA trained for 5B on a 5B model should load fine."""
+        model_state = _make_model_state(in_features=5120, out_features=5120)
+        lora_state = _make_lora_state(rank=32, in_features=5120, out_features=5120)
+
+        mapping = parse_lora_weights(lora_state, model_state)
+
+        assert len(mapping) == 1