From 7b7e690a025f450e18fa0cf81a81d92cbc1ae44b Mon Sep 17 00:00:00 2001 From: Kevin Codex Date: Mon, 22 Jun 2026 14:36:53 +0800 Subject: [PATCH 1/5] feat(storage): storage-agnostic BlobStore layer + push-latency quick wins Replace the hardcoded Tigris client with a backend-agnostic object store. BlobStore trait (get/put/head/delete/list) with backends: - s3: any S3-compatible service (Tigris, Cloudflare R2, AWS S3, MinIO, B2) - fs: local filesystem (atomic writes; unit-tested) - ipfs: Kubo node via MFS (etag = content CID) RepoArchive composes a bare repo into one repos/v1/{slug}/{repo}.tar.zst object on top of any backend (tar+zstd moved here; git/tigris.rs removed). RepoStore rewired from Option<TigrisClient> to Option<RepoArchive>. Config (backward compatible): GITLAWB_STORAGE_BACKEND, GITLAWB_S3_BUCKET, GITLAWB_S3_ENDPOINT, GITLAWB_S3_FORCE_PATH_STYLE, GITLAWB_STORAGE_FS_DIR, GITLAWB_ASYNC_UPLOAD. GITLAWB_TIGRIS_BUCKET kept as a legacy alias. Push-latency quick wins: - Skip the redundant pre-write download when the cached etag already matches storage (previously up to two full-repo downloads per push). - Write-back ack: ack the client before the durable upload completes; the advisory lock is held until upload finishes so cross-node consistency holds. cargo check clean; storage::fs + repo_store unit tests pass. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- Cargo.lock | 1 + crates/gitlawb-node/Cargo.toml | 1 + crates/gitlawb-node/src/api/repos.rs | 15 +- crates/gitlawb-node/src/config.rs | 36 ++- crates/gitlawb-node/src/git/mod.rs | 1 - crates/gitlawb-node/src/git/repo_store.rs | 282 ++++++++++++------ crates/gitlawb-node/src/main.rs | 23 +- .../src/{git/tigris.rs => storage/archive.rs} | 153 ++++------ crates/gitlawb-node/src/storage/fs.rs | 187 ++++++++++++ crates/gitlawb-node/src/storage/ipfs.rs | 192 ++++++++++++ crates/gitlawb-node/src/storage/mod.rs | 160 ++++++++++ crates/gitlawb-node/src/storage/s3.rs | 166 +++++++++++ 12 files changed, 1003 insertions(+), 214 deletions(-) rename crates/gitlawb-node/src/{git/tigris.rs => storage/archive.rs} (59%) create mode 100644 crates/gitlawb-node/src/storage/fs.rs create mode 100644 crates/gitlawb-node/src/storage/ipfs.rs create mode 100644 crates/gitlawb-node/src/storage/mod.rs create mode 100644 crates/gitlawb-node/src/storage/s3.rs diff --git a/Cargo.lock b/Cargo.lock index 1f3080b..2e2ab75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3362,6 +3362,7 @@ dependencies = [ "async-compression", "async-graphql", "async-graphql-axum", + "async-trait", "aws-config", "aws-sdk-s3", "axum", diff --git a/crates/gitlawb-node/Cargo.toml b/crates/gitlawb-node/Cargo.toml index 881d4bd..29011c2 100644 --- a/crates/gitlawb-node/Cargo.toml +++ b/crates/gitlawb-node/Cargo.toml @@ -32,6 +32,7 @@ sqlx = { version = "0.8", features = ["postgres", "runtime-tokio-rustls", "chron clap = { version = "4", features = ["derive", "env"] } bytes = "1" libc = "0.2" +async-trait = "0.1" cid = { workspace = true } hex = { workspace = true } sha2 = { workspace = true } diff --git a/crates/gitlawb-node/src/api/repos.rs b/crates/gitlawb-node/src/api/repos.rs index 47f9eaf..7bdcfcd 100644 --- a/crates/gitlawb-node/src/api/repos.rs +++ b/crates/gitlawb-node/src/api/repos.rs @@ -549,9 +549,20 @@ pub async fn git_receive_pack( let receive_result = 
smart_http::receive_pack(&disk_path, body).await; // Always release the advisory lock — even on error — to prevent stale locks - // from blocking subsequent pushes. Only upload to Tigris when the push + // from blocking subsequent pushes. Only upload to storage when the push // succeeded; uploading a half-applied repo would propagate corruption. - guard.release(receive_result.is_ok()).await; + let push_ok = receive_result.is_ok(); + if push_ok && state.config.async_upload { + // Write-back: ack the client now; the durable upload to object storage + // and the advisory-lock release run in the background. The lock is held + // until the upload finishes, so a concurrent writer on another machine + // can't observe a stale archive. Trades a small crash-durability window + // (local copy survives; lazy migration re-syncs) for much lower latency. + tokio::spawn(guard.release(true)); + } else { + // Strict path (or failed push): upload-before-ack / prompt lock release. + guard.release(push_ok).await; + } let result = receive_result.map_err(|e| { tracing::error!(repo = %name, err = %e, "git receive-pack failed"); diff --git a/crates/gitlawb-node/src/config.rs b/crates/gitlawb-node/src/config.rs index 929471a..b93526e 100644 --- a/crates/gitlawb-node/src/config.rs +++ b/crates/gitlawb-node/src/config.rs @@ -121,11 +121,45 @@ pub struct Config { #[arg(long, env = "GITLAWB_HEARTBEAT_INTERVAL_HOURS", default_value_t = 20)] pub heartbeat_interval_hours: u64, - /// Tigris (S3-compatible) bucket for repo storage. + /// Tigris (S3-compatible) bucket for repo storage. Legacy alias for + /// `s3_bucket` — still honoured so existing deployments keep working. /// Leave empty to disable Tigris and use local-only storage. #[arg(long, env = "GITLAWB_TIGRIS_BUCKET", default_value = "")] pub tigris_bucket: String, + /// Object-storage backend: `s3` (any S3-compatible service), `fs` (local + /// directory), or `ipfs` (Kubo MFS). 
Empty = auto-detect: `s3` when a bucket + /// is set, else `fs` when a storage dir is set, else `ipfs` when an IPFS API + /// is set, else local-only. + #[arg(long, env = "GITLAWB_STORAGE_BACKEND", default_value = "")] + pub storage_backend: String, + + /// Bucket for the `s3` backend (Tigris, R2, AWS S3, MinIO, B2). Falls back to + /// `tigris_bucket` when empty. + #[arg(long, env = "GITLAWB_S3_BUCKET", default_value = "")] + pub s3_bucket: String, + + /// Endpoint URL override for the `s3` backend (e.g. R2/MinIO). On Tigris/Fly + /// the endpoint is auto-provided via `AWS_ENDPOINT_URL_S3`, so leave empty. + #[arg(long, env = "GITLAWB_S3_ENDPOINT", default_value = "")] + pub s3_endpoint: String, + + /// Force path-style S3 addressing (required by MinIO and some S3-compatibles). + #[arg(long, env = "GITLAWB_S3_FORCE_PATH_STYLE", default_value_t = false)] + pub s3_force_path_style: bool, + + /// Directory for the `fs` (local filesystem) storage backend. + #[arg(long, env = "GITLAWB_STORAGE_FS_DIR", default_value = "")] + pub storage_fs_dir: String, + + /// Acknowledge a push to the client before the durable upload to object + /// storage finishes (write-back). Greatly lowers push latency; the local + /// copy and the advisory lock keep cross-node consistency, at the cost of a + /// small durability window if the node crashes mid-upload. Set false for + /// strict upload-before-ack durability. + #[arg(long, env = "GITLAWB_ASYNC_UPLOAD", default_value_t = true)] + pub async_upload: bool, + /// Maximum pack body size for git-receive-pack and git-upload-pack, in bytes. /// Applies only to git smart-HTTP routes — all other API routes keep the 2 MB default. /// Default: 2 GB. Set lower on resource-constrained nodes. 
diff --git a/crates/gitlawb-node/src/git/mod.rs b/crates/gitlawb-node/src/git/mod.rs index 49259d5..a1d2687 100644 --- a/crates/gitlawb-node/src/git/mod.rs +++ b/crates/gitlawb-node/src/git/mod.rs @@ -2,5 +2,4 @@ pub mod issues; pub mod repo_store; pub mod smart_http; pub mod store; -pub mod tigris; pub mod visibility_pack; diff --git a/crates/gitlawb-node/src/git/repo_store.rs b/crates/gitlawb-node/src/git/repo_store.rs index a5c367e..d0d9d61 100644 --- a/crates/gitlawb-node/src/git/repo_store.rs +++ b/crates/gitlawb-node/src/git/repo_store.rs @@ -1,14 +1,17 @@ -//! Centralized repo storage layer — local disk cache backed by Tigris (S3). +//! Centralized repo storage layer — local disk cache backed by a pluggable +//! object store (S3-compatible / filesystem / IPFS) via [`RepoArchive`]. //! //! Every handler that needs access to a git repo on disk goes through `RepoStore`: //! -//! - `acquire()` — ensures the repo is on local disk (downloads from Tigris on cache miss). -//! - `release_after_write()` — uploads the updated repo to Tigris after a write operation. -//! - `init()` — creates a new bare repo locally and uploads to Tigris. +//! - `acquire()` — ensures the repo is on local disk (downloads on cache miss). +//! - `acquire_write()` — write lock + ensures local matches storage (skips the +//! download when the cached etag already matches — the push-latency win). +//! - `release()` / `release_after_write()` — upload the updated repo to storage. +//! - `init()` — creates a new bare repo locally and uploads to storage. //! -//! When Tigris is disabled (bucket empty), this is a simple passthrough to local disk. +//! When no backend is configured, this is a simple passthrough to local disk. 
-use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::Arc; @@ -18,18 +21,23 @@ use tokio::sync::Mutex; use tracing::{debug, info, warn}; use super::store; -use super::tigris::TigrisClient; +use crate::storage::archive::RepoArchive; -/// Centralized repo storage: local disk cache + optional Tigris backend. +/// Centralized repo storage: local disk cache + optional object-storage backend +/// (S3-compatible / filesystem / IPFS) behind the [`RepoArchive`] layer. #[derive(Clone)] pub struct RepoStore { repos_dir: PathBuf, - tigris: Option, + archive: Option, /// Shared Postgres pool for advisory locks. pool: PgPool, - /// Tracks repos already confirmed to exist in Tigris — avoids redundant + /// Tracks repos already confirmed to exist in storage — avoids redundant /// HEAD checks and background uploads for repos we've already migrated. migrated: Arc>>, + /// Last-known archive etag per `owner_slug/repo` key. Lets a write skip the + /// pre-write download when our local copy already matches storage (the + /// common case under sticky routing) — the main push-latency win. + versions: Arc>>, } impl RepoStore { @@ -37,18 +45,74 @@ impl RepoStore { pub fn for_testing(repos_dir: PathBuf, pool: PgPool) -> Self { Self { repos_dir, - tigris: None, + archive: None, pool, - migrated: Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new())), + migrated: Arc::new(Mutex::new(HashSet::new())), + versions: Arc::new(Mutex::new(HashMap::new())), } } - pub fn new(repos_dir: PathBuf, tigris: Option, pool: PgPool) -> Self { + pub fn new(repos_dir: PathBuf, archive: Option, pool: PgPool) -> Self { Self { repos_dir, - tigris, + archive, pool, migrated: Arc::new(Mutex::new(HashSet::new())), + versions: Arc::new(Mutex::new(HashMap::new())), + } + } + + /// Ensure the local copy matches storage, skipping the download when our + /// cached etag already equals the current archive etag. 
Used by the + /// read-before-write (`acquire_fresh`) and write (`acquire_write`) paths. + async fn sync_down_if_stale( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + ) -> Result<()> { + let Some(ref archive) = self.archive else { + return Ok(()); + }; + let key = format!("{owner_slug}/{repo_name}"); + + let remote_etag = match archive.head_etag(owner_slug, repo_name).await { + Ok(Some(etag)) => etag, + Ok(None) => return Ok(()), // not in storage yet — local is authoritative + Err(e) => { + // HEAD failed — fall back to a valid local copy if we have one. + if local_path.exists() { + warn!(repo = %repo_name, err = %e, "storage head failed — using local copy"); + return Ok(()); + } + return Err(e).context("storage head before access"); + } + }; + + if local_path.exists() { + let known = self.versions.lock().await.get(&key).cloned(); + if known.as_deref() == Some(remote_etag.as_str()) { + debug!(repo = %repo_name, "local copy current (etag match) — skipping download"); + return Ok(()); + } + } + + match archive.download(owner_slug, repo_name, local_path).await { + Ok(()) => { + self.versions.lock().await.insert(key, remote_etag); + Ok(()) + } + Err(e) => { + // Self-heal: a corrupt/unreadable archive must not block access + // when a valid local copy exists; a later upload re-syncs storage. + if local_path.exists() { + warn!(repo = %repo_name, err = %e, + "archive download failed — falling back to local copy"); + Ok(()) + } else { + Err(e).context("downloading repo archive") + } + } } } @@ -61,33 +125,44 @@ impl RepoStore { // Fast path: repo exists locally if local_path.exists() { - // Lazy migration: if Tigris is enabled and we haven't confirmed this - // repo is in Tigris yet, check and upload in the background. - if let Some(ref tigris) = self.tigris { + // Lazy migration: if storage is enabled and we haven't confirmed this + // repo is in storage yet, check and upload in the background. 
+ if let Some(ref archive) = self.archive { let key = format!("{owner_slug}/{repo_name}"); let already_migrated = self.migrated.lock().await.contains(&key); if !already_migrated { - let tigris = tigris.clone(); + let archive = archive.clone(); let slug = owner_slug.clone(); let name = repo_name.to_string(); let path = local_path.clone(); let migrated = Arc::clone(&self.migrated); + let versions = Arc::clone(&self.versions); tokio::spawn(async move { - // Check if already in Tigris before uploading - match tigris.exists(&slug, &name).await { + // Check if already in storage before uploading + match archive.exists(&slug, &name).await { Ok(true) => { - debug!(repo = %name, "repo already in tigris — skipping migration"); + debug!(repo = %name, "repo already in storage — skipping migration"); } Ok(false) => { - info!(repo = %name, "migrating local repo to tigris"); - if let Err(e) = tigris.upload(&slug, &name, &path).await { - warn!(repo = %name, err = %e, "lazy migration to tigris failed"); - return; + info!(repo = %name, "migrating local repo to storage"); + match archive.upload(&slug, &name, &path).await { + Ok(etag) => { + if let Some(etag) = etag { + versions + .lock() + .await + .insert(format!("{slug}/{name}"), etag); + } + info!(repo = %name, "lazy migration to storage complete"); + } + Err(e) => { + warn!(repo = %name, err = %e, "lazy migration to storage failed"); + return; + } } - info!(repo = %name, "lazy migration to tigris complete"); } Err(e) => { - warn!(repo = %name, err = %e, "tigris existence check failed"); + warn!(repo = %name, err = %e, "storage existence check failed"); return; } } @@ -98,19 +173,21 @@ impl RepoStore { return Ok(local_path); } - // Try downloading from Tigris - if let Some(ref tigris) = self.tigris { - if tigris.exists(&owner_slug, repo_name).await.unwrap_or(false) { - debug!(repo = %repo_name, "cache miss — downloading from tigris"); - tigris + // Try downloading from storage + if let Some(ref archive) = self.archive { + if let 
Some(remote_etag) = archive + .head_etag(&owner_slug, repo_name) + .await + .unwrap_or(None) + { + debug!(repo = %repo_name, "cache miss — downloading from storage"); + archive .download(&owner_slug, repo_name, &local_path) .await - .context("downloading repo from tigris")?; - // Mark as migrated since we just downloaded it - self.migrated - .lock() - .await - .insert(format!("{owner_slug}/{repo_name}")); + .context("downloading repo from storage")?; + let key = format!("{owner_slug}/{repo_name}"); + self.migrated.lock().await.insert(key.clone()); + self.versions.lock().await.insert(key, remote_etag); return Ok(local_path); } } @@ -126,28 +203,8 @@ impl RepoStore { /// will operate on. pub async fn acquire_fresh(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; - - if let Some(ref tigris) = self.tigris { - if tigris.exists(&owner_slug, repo_name).await.unwrap_or(false) { - debug!(repo = %repo_name, "acquire_fresh: downloading latest from tigris"); - if let Err(e) = tigris.download(&owner_slug, repo_name, &local_path).await { - // The Tigris archive is present (HEAD ok) but unreadable — a - // corrupt/partial upload, or a transient GET failure. If we have a - // valid local copy, proceed with it rather than blocking the write; - // the post-write upload re-syncs (self-heals) Tigris. Only hard-fail - // when there is no local copy to fall back to. 
- if local_path.exists() { - warn!(repo = %repo_name, err = %e, - "acquire_fresh: tigris download failed — falling back to local copy"); - return Ok(local_path); - } - return Err(e).context("downloading repo from tigris (fresh)"); - } - return Ok(local_path); - } - } - - // Tigris disabled or repo not in Tigris — fall back to local + self.sync_down_if_stale(&owner_slug, repo_name, &local_path) + .await?; Ok(local_path) } @@ -178,23 +235,21 @@ impl RepoStore { anyhow::bail!("could not acquire advisory lock after 60s — possible stale lock for {owner_slug}/{repo_name}"); } - // Always download the latest from Tigris before writing. - // Local disk may be stale if another machine pushed since our last access. - if let Some(ref tigris) = self.tigris { - if tigris.exists(&owner_slug, repo_name).await.unwrap_or(false) { - debug!(repo = %repo_name, "write acquire: downloading latest from tigris"); - if let Err(e) = tigris.download(&owner_slug, repo_name, &local_path).await { - // Same self-healing fallback as acquire_fresh: a corrupt/unreadable - // Tigris archive must not block a write when a valid local copy - // exists — release(success) will re-upload a good archive. - if local_path.exists() { - warn!(repo = %repo_name, err = %e, - "write acquire: tigris download failed — falling back to local copy"); - } else { - return Err(e).context("downloading repo from tigris for write"); - } - } - } + // Ensure local matches the latest in storage before writing. The etag + // cache skips the full download when our copy is already current (the + // common single-machine case under sticky routing); a stale copy (another + // machine pushed since) still triggers a download. The advisory lock above + // serializes this so the post-write upload can't race a concurrent writer. + if let Err(e) = self + .sync_down_if_stale(&owner_slug, repo_name, &local_path) + .await + { + // Release the lock we acquired before bailing, to avoid a stale lock. 
+ let _ = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(lock_key) + .execute(&self.pool) + .await; + return Err(e); } Ok(RepoWriteGuard { @@ -203,7 +258,8 @@ impl RepoStore { local_path, lock_key, pool: self.pool.clone(), - tigris: self.tigris.clone(), + archive: self.archive.clone(), + versions: Arc::clone(&self.versions), }) } @@ -213,15 +269,25 @@ impl RepoStore { store::init_bare(&local_path).context("initializing bare repo")?; - // Upload to Tigris in background - if let Some(ref tigris) = self.tigris { - let tigris = tigris.clone(); + // Upload to storage in background + if let Some(ref archive) = self.archive { + let archive = archive.clone(); let owner_slug = owner_slug.clone(); let repo_name = repo_name.to_string(); let path = local_path.clone(); + let versions = Arc::clone(&self.versions); tokio::spawn(async move { - if let Err(e) = tigris.upload(&owner_slug, &repo_name, &path).await { - warn!(repo = %repo_name, err = %e, "failed to upload new repo to tigris"); + match archive.upload(&owner_slug, &repo_name, &path).await { + Ok(Some(etag)) => { + versions + .lock() + .await + .insert(format!("{owner_slug}/{repo_name}"), etag); + } + Ok(None) => {} + Err(e) => { + warn!(repo = %repo_name, err = %e, "failed to upload new repo to storage"); + } } }); } @@ -229,10 +295,10 @@ impl RepoStore { Ok(local_path) } - /// Upload a repo to Tigris after a write operation (push, merge, fork, etc.). + /// Upload a repo to storage after a write operation (merge, fork, etc.). /// Call this after any operation that modifies the git repo on disk. 
pub async fn release_after_write(&self, owner_did: &str, repo_name: &str) { - if let Some(ref tigris) = self.tigris { + if let Some(ref archive) = self.archive { let (owner_slug, local_path) = match self.local_path(owner_did, repo_name) { Ok(p) => p, Err(e) => { @@ -240,8 +306,17 @@ impl RepoStore { return; } }; - if let Err(e) = tigris.upload(&owner_slug, repo_name, &local_path).await { - warn!(repo = %repo_name, err = %e, "failed to upload repo to tigris after write"); + match archive.upload(&owner_slug, repo_name, &local_path).await { + Ok(Some(etag)) => { + self.versions + .lock() + .await + .insert(format!("{owner_slug}/{repo_name}"), etag); + } + Ok(None) => {} + Err(e) => { + warn!(repo = %repo_name, err = %e, "failed to upload repo to storage after write"); + } } } } @@ -348,14 +423,15 @@ fn validate_repo_name(repo_name: &str) -> Result<()> { } /// Guard returned by `acquire_write()`. Holds the Postgres advisory lock and -/// uploads to Tigris + releases the lock on `release()`. +/// uploads to storage + releases the lock on `release()`. pub struct RepoWriteGuard { owner_slug: String, repo_name: String, pub local_path: PathBuf, lock_key: i64, pool: PgPool, - tigris: Option, + archive: Option, + versions: Arc>>, } impl RepoWriteGuard { @@ -364,24 +440,38 @@ impl RepoWriteGuard { &self.local_path } - /// Upload to Tigris (only when the write succeeded) and release the advisory + /// Upload to storage (only when the write succeeded) and release the advisory /// lock. Pass `success = false` when the write operation failed — uploading a /// half-applied or otherwise inconsistent repo would propagate corruption to - /// Tigris (and to every node that later downloads it). The lock is always + /// storage (and to every node that later downloads it). The lock is always /// released regardless, to avoid stale locks blocking future writes. 
+ /// + /// IMPORTANT: the advisory lock is held until the upload finishes, so a + /// concurrent writer on another machine cannot read a stale archive. When + /// callers want a fast client ack, they spawn this future as a background + /// task (write-back) — the lock + etag-cache update still complete in order. pub async fn release(self, success: bool) { - // Upload to Tigris only on success. + // Upload to storage only on success. if success { - if let Some(ref tigris) = self.tigris { - if let Err(e) = tigris + if let Some(ref archive) = self.archive { + match archive .upload(&self.owner_slug, &self.repo_name, &self.local_path) .await { - warn!(repo = %self.repo_name, err = %e, "failed to upload repo to tigris after write"); + Ok(Some(etag)) => { + self.versions + .lock() + .await + .insert(format!("{}/{}", self.owner_slug, self.repo_name), etag); + } + Ok(None) => {} + Err(e) => { + warn!(repo = %self.repo_name, err = %e, "failed to upload repo to storage after write"); + } } } } else { - warn!(repo = %self.repo_name, "write failed — skipping tigris upload to avoid propagating an inconsistent repo"); + warn!(repo = %self.repo_name, "write failed — skipping storage upload to avoid propagating an inconsistent repo"); } // Release advisory lock diff --git a/crates/gitlawb-node/src/main.rs b/crates/gitlawb-node/src/main.rs index c881634..096a698 100644 --- a/crates/gitlawb-node/src/main.rs +++ b/crates/gitlawb-node/src/main.rs @@ -17,6 +17,7 @@ mod pinata; mod rate_limit; mod server; mod state; +mod storage; mod sync; #[cfg(test)] mod test_support; @@ -169,25 +170,13 @@ async fn main() -> Result<()> { info!(" fly machine: {mid}"); } - // Initialize Tigris S3 client if bucket is configured - let tigris = if !config.tigris_bucket.is_empty() { - match git::tigris::TigrisClient::new(&config.tigris_bucket).await { - Ok(client) => { - info!(bucket = %config.tigris_bucket, "tigris storage enabled"); - Some(client) - } - Err(e) => { - tracing::warn!(err = %e, "failed to 
initialize Tigris client — using local-only storage"); - None - } - } - } else { - info!("tigris storage disabled (no bucket configured)"); - None - }; + // Initialize the storage-agnostic blob backend (S3-compatible / filesystem / + // IPFS), then wrap it in the repo-archive layer. `None` = local-only mode. + let blob_store = storage::build(&config).await; + let archive = blob_store.map(storage::archive::RepoArchive::new); let repo_store = - git::repo_store::RepoStore::new(config.repos_dir.clone(), tigris, db.pool().clone()); + git::repo_store::RepoStore::new(config.repos_dir.clone(), archive, db.pool().clone()); let rate_limiter = rate_limit::RateLimiter::new(10, std::time::Duration::from_secs(3600)); diff --git a/crates/gitlawb-node/src/git/tigris.rs b/crates/gitlawb-node/src/storage/archive.rs similarity index 59% rename from crates/gitlawb-node/src/git/tigris.rs rename to crates/gitlawb-node/src/storage/archive.rs index ad26ddc..dd072ce 100644 --- a/crates/gitlawb-node/src/git/tigris.rs +++ b/crates/gitlawb-node/src/storage/archive.rs @@ -1,69 +1,61 @@ -//! Tigris (S3-compatible) storage client for git bare repos. -//! -//! Repos are stored as `repos/v1/{owner_slug}/{repo_name}.tar.zst` — a -//! zstd-compressed tar archive of the bare repo directory. +//! Repo-archive layer: stores a bare git repo as a single +//! `repos/v1/{owner_slug}/{repo_name}.tar.zst` object on top of any +//! [`BlobStore`] backend. Backend-agnostic replacement for the old +//! Tigris-specific client. use std::collections::HashMap; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex, OnceLock}; use anyhow::{Context, Result}; -use aws_sdk_s3::Client as S3Client; +use bytes::Bytes; use tracing::{debug, info}; -/// Wrapper around the S3 client with the configured bucket. +use super::BlobStore; + #[derive(Clone)] -pub struct TigrisClient { - s3: S3Client, - bucket: String, +pub struct RepoArchive { + store: Arc, } -impl TigrisClient { - /// Create a new client. 
Uses AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and - /// AWS_ENDPOINT_URL_S3 env vars — all set automatically by Fly for Tigris buckets. - pub async fn new(bucket: &str) -> Result { - let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; - let s3 = S3Client::new(&config); - info!(bucket = %bucket, "tigris storage client initialized"); - Ok(Self { - s3, - bucket: bucket.to_string(), - }) +impl RepoArchive { + pub fn new(store: Arc) -> Self { + Self { store } } - /// S3 key for a given repo: `repos/v1/{owner_slug}/{repo_name}.tar.zst` - fn repo_key(owner_slug: &str, repo_name: &str) -> String { + /// Object key for a repo archive. + fn key(owner_slug: &str, repo_name: &str) -> String { format!("repos/v1/{owner_slug}/{repo_name}.tar.zst") } - /// Check if a repo archive exists in Tigris. - pub async fn exists(&self, owner_slug: &str, repo_name: &str) -> Result { - let key = Self::repo_key(owner_slug, repo_name); - match self - .s3 - .head_object() - .bucket(&self.bucket) - .key(&key) - .send() - .await - { - Ok(_) => Ok(true), - Err(e) => { - if e.as_service_error().is_some_and(|e| e.is_not_found()) { - Ok(false) - } else { - Err(anyhow::anyhow!("tigris HEAD {key}: {e}")) - } - } - } + /// Current archive etag, or `None` if the repo isn't in storage yet. + pub async fn head_etag(&self, owner_slug: &str, repo_name: &str) -> Result> { + let key = Self::key(owner_slug, repo_name); + Ok(self + .store + .head(&key) + .await? + .map(|m| m.etag.unwrap_or_else(|| format!("size:{}", m.size)))) } - /// Upload a local bare repo directory to Tigris as a tar.zst archive. - pub async fn upload(&self, owner_slug: &str, repo_name: &str, local_path: &Path) -> Result<()> { - let key = Self::repo_key(owner_slug, repo_name); - debug!(key = %key, path = %local_path.display(), "uploading repo to tigris"); + /// Whether the repo archive exists in storage. 
+ pub async fn exists(&self, owner_slug: &str, repo_name: &str) -> Result { + Ok(self + .store + .head(&Self::key(owner_slug, repo_name)) + .await? + .is_some()) + } - // Create tar.zst in memory + /// Compress the bare repo and upload it. Returns the new etag (for the + /// skip-redundant-download cache). + pub async fn upload( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + ) -> Result> { + let key = Self::key(owner_slug, repo_name); let archive_bytes = tokio::task::spawn_blocking({ let local_path = local_path.to_path_buf(); move || compress_repo(&local_path) @@ -72,49 +64,31 @@ impl TigrisClient { .context("tar task panicked")? .context("compressing repo")?; - let body = aws_sdk_s3::primitives::ByteStream::from(archive_bytes); - - self.s3 - .put_object() - .bucket(&self.bucket) - .key(&key) - .body(body) - .content_type("application/zstd") - .send() + let meta = self + .store + .put(&key, Bytes::from(archive_bytes)) .await - .context(format!("tigris PUT {key}"))?; - - info!(key = %key, "uploaded repo to tigris"); - Ok(()) + .context("uploading repo archive")?; + info!(key = %key, backend = self.store.backend_name(), "uploaded repo archive"); + Ok(meta.etag.or_else(|| Some(format!("size:{}", meta.size)))) } - /// Download a repo archive from Tigris and extract to local disk. + /// Download the repo archive and extract it to `local_path` (atomic swap). pub async fn download( &self, owner_slug: &str, repo_name: &str, local_path: &Path, ) -> Result<()> { - let key = Self::repo_key(owner_slug, repo_name); - debug!(key = %key, path = %local_path.display(), "downloading repo from tigris"); - - let resp = self - .s3 - .get_object() - .bucket(&self.bucket) - .key(&key) - .send() + let key = Self::key(owner_slug, repo_name); + debug!(key = %key, "downloading repo archive"); + let data = self + .store + .get(&key) .await - .context(format!("tigris GET {key}"))?; + .context("fetching repo archive")? 
+ .ok_or_else(|| anyhow::anyhow!("repo archive missing: {key}"))?; - let data = resp - .body - .collect() - .await - .context("reading tigris response body")? - .into_bytes(); - - // Extract tar.zst to local path tokio::task::spawn_blocking({ let local_path = local_path.to_path_buf(); move || decompress_repo(&data, &local_path) @@ -122,23 +96,14 @@ impl TigrisClient { .await .context("extract task panicked")? .context("extracting repo")?; - - info!(key = %key, path = %local_path.display(), "downloaded repo from tigris"); + info!(key = %key, path = %local_path.display(), "downloaded repo archive"); Ok(()) } - /// Delete a repo archive from Tigris. + /// Delete a repo archive. #[allow(dead_code)] pub async fn delete(&self, owner_slug: &str, repo_name: &str) -> Result<()> { - let key = Self::repo_key(owner_slug, repo_name); - self.s3 - .delete_object() - .bucket(&self.bucket) - .key(&key) - .send() - .await - .context(format!("tigris DELETE {key}"))?; - Ok(()) + self.store.delete(&Self::key(owner_slug, repo_name)).await } } @@ -147,11 +112,8 @@ fn compress_repo(repo_path: &Path) -> Result> { let buf = Vec::new(); let encoder = zstd::stream::Encoder::new(buf, 3)?; // level 3 = fast + decent ratio let mut tar = tar::Builder::new(encoder); - - // Append the bare repo directory contents (not the directory itself) tar.append_dir_all(".", repo_path) .context("building tar archive")?; - let encoder = tar.into_inner().context("finishing tar")?; let compressed = encoder.finish().context("finishing zstd")?; Ok(compressed) @@ -197,8 +159,6 @@ fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { std::fs::create_dir_all(&tmp_dir).context("creating temp extract dir")?; - // Unpack into the temp dir; on any failure, clean up and bail without - // touching local_path. 
let unpack = (|| -> Result<()> { let decoder = zstd::stream::Decoder::new(data)?; let mut archive = tar::Archive::new(decoder); @@ -221,6 +181,5 @@ fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { std::fs::remove_dir_all(local_path).context("removing stale repo dir")?; } std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place")?; - Ok(()) } diff --git a/crates/gitlawb-node/src/storage/fs.rs b/crates/gitlawb-node/src/storage/fs.rs new file mode 100644 index 0000000..3dfbb07 --- /dev/null +++ b/crates/gitlawb-node/src/storage/fs.rs @@ -0,0 +1,187 @@ +//! Local filesystem blob backend. +//! +//! Stores each object as a file under a configured root directory, using the +//! object key as a relative path. For self-hosters without S3 and for tests of +//! the storage abstraction. The etag is a `size-mtime` fingerprint so the +//! skip-redundant-download optimization works against it. + +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; + +use super::{validate_key, BlobStore, ObjectMeta}; + +#[derive(Clone)] +pub struct FsBlobStore { + root: PathBuf, +} + +impl FsBlobStore { + pub fn new(root: impl AsRef) -> Result { + let root = root.as_ref().to_path_buf(); + std::fs::create_dir_all(&root) + .with_context(|| format!("creating storage dir {}", root.display()))?; + Ok(Self { root }) + } + + fn path_for(&self, key: &str) -> Result { + validate_key(key)?; + let path = self.root.join(key); + // Defence in depth: the resolved path must stay under root. 
+ if !path.starts_with(&self.root) { + anyhow::bail!("blob key escaped storage root: {key}"); + } + Ok(path) + } + + fn meta_of(path: &Path) -> Result { + let md = std::fs::metadata(path).context("stat blob")?; + let mtime = md + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_nanos()) + .unwrap_or(0); + Ok(ObjectMeta { + size: md.len(), + etag: Some(format!("{}-{}", md.len(), mtime)), + }) + } +} + +#[async_trait] +impl BlobStore for FsBlobStore { + fn backend_name(&self) -> &'static str { + "fs" + } + + async fn get(&self, key: &str) -> Result> { + let path = self.path_for(key)?; + match tokio::fs::read(&path).await { + Ok(data) => Ok(Some(Bytes::from(data))), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(format!("reading {}", path.display())), + } + } + + async fn put(&self, key: &str, body: Bytes) -> Result { + let path = self.path_for(key)?; + let parent = path + .parent() + .context("blob path has no parent")? + .to_path_buf(); + let tmp = path.with_extension("tmp-put"); + let path2 = path.clone(); + // Atomic write: temp file in the same dir, then rename into place. 
+ tokio::task::spawn_blocking(move || -> Result<()> { + std::fs::create_dir_all(&parent).context("creating blob parent dir")?; + std::fs::write(&tmp, &body).context("writing temp blob")?; + std::fs::rename(&tmp, &path2).context("renaming blob into place")?; + Ok(()) + }) + .await + .context("fs put task panicked")??; + Self::meta_of(&path) + } + + async fn head(&self, key: &str) -> Result> { + let path = self.path_for(key)?; + match Self::meta_of(&path) { + Ok(m) => Ok(Some(m)), + Err(_) if !path.exists() => Ok(None), + Err(e) => Err(e), + } + } + + async fn delete(&self, key: &str) -> Result<()> { + let path = self.path_for(key)?; + match tokio::fs::remove_file(&path).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e).context(format!("deleting {}", path.display())), + } + } + + async fn list(&self, prefix: &str) -> Result> { + let root = self.root.clone(); + let prefix = prefix.to_string(); + tokio::task::spawn_blocking(move || -> Result> { + let mut keys = Vec::new(); + let mut stack = vec![root.clone()]; + while let Some(dir) = stack.pop() { + let rd = match std::fs::read_dir(&dir) { + Ok(rd) => rd, + Err(_) => continue, + }; + for entry in rd.flatten() { + let path = entry.path(); + if path.is_dir() { + stack.push(path); + } else if let Ok(rel) = path.strip_prefix(&root) { + let key = rel.to_string_lossy().replace('\\', "/"); + if key.starts_with(&prefix) { + keys.push(key); + } + } + } + } + Ok(keys) + }) + .await + .context("fs list task panicked")? 
+ } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn put_get_head_delete_list_roundtrip() { + let dir = tempfile::tempdir().unwrap(); + let store = FsBlobStore::new(dir.path()).unwrap(); + + // Absent key + assert!(store.get("repos/v1/a/x.tar.zst").await.unwrap().is_none()); + assert!(store.head("repos/v1/a/x.tar.zst").await.unwrap().is_none()); + + // Put then get + let body = Bytes::from_static(b"hello blob"); + let meta = store + .put("repos/v1/a/x.tar.zst", body.clone()) + .await + .unwrap(); + assert_eq!(meta.size, body.len() as u64); + assert!(meta.etag.is_some()); + let got = store.get("repos/v1/a/x.tar.zst").await.unwrap().unwrap(); + assert_eq!(got, body); + + // Head returns matching etag (stable across reads) + let h = store.head("repos/v1/a/x.tar.zst").await.unwrap().unwrap(); + assert_eq!(h.etag, meta.etag); + + // List by prefix + store + .put("repos/v1/b/y.tar.zst", Bytes::from_static(b"y")) + .await + .unwrap(); + let mut keys = store.list("repos/v1/").await.unwrap(); + keys.sort(); + assert_eq!(keys, vec!["repos/v1/a/x.tar.zst", "repos/v1/b/y.tar.zst"]); + + // Delete is idempotent + store.delete("repos/v1/a/x.tar.zst").await.unwrap(); + store.delete("repos/v1/a/x.tar.zst").await.unwrap(); + assert!(store.get("repos/v1/a/x.tar.zst").await.unwrap().is_none()); + } + + #[tokio::test] + async fn rejects_key_traversal() { + let dir = tempfile::tempdir().unwrap(); + let store = FsBlobStore::new(dir.path()).unwrap(); + assert!(store.get("../escape").await.is_err()); + assert!(store.put("a/../../etc/passwd", Bytes::new()).await.is_err()); + } +} diff --git a/crates/gitlawb-node/src/storage/ipfs.rs b/crates/gitlawb-node/src/storage/ipfs.rs new file mode 100644 index 0000000..8fe3d62 --- /dev/null +++ b/crates/gitlawb-node/src/storage/ipfs.rs @@ -0,0 +1,192 @@ +//! IPFS (content-addressed) blob backend over a Kubo node's Mutable File System. +//! +//! 
Kubo's MFS (`/api/v0/files/*`) provides a path-addressed, mutable namespace +//! backed by content-addressed IPFS objects — a natural fit for a key→blob store. +//! Each object's etag is its IPFS CID (from `files/stat`), giving true +//! content-addressing for the skip-redundant-download optimization. +//! +//! Requires a reachable Kubo HTTP API (`GITLAWB_IPFS_API`, e.g. +//! `http://127.0.0.1:5001`). Objects written here are also retrievable by CID +//! from the wider IPFS network once pinned/announced by the node. + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; + +use super::{validate_key, BlobStore, ObjectMeta}; + +#[derive(Clone)] +pub struct IpfsBlobStore { + api: String, + client: reqwest::Client, +} + +impl IpfsBlobStore { + pub fn new(api: &str) -> Result { + Ok(Self { + api: api.trim_end_matches('/').to_string(), + client: reqwest::Client::new(), + }) + } + + /// MFS path for a key: `/gitlawb/` (namespaced to avoid clobbering + /// other MFS users on a shared node). + fn mfs_path(key: &str) -> String { + format!("/gitlawb/{key}") + } +} + +#[async_trait] +impl BlobStore for IpfsBlobStore { + fn backend_name(&self) -> &'static str { + "ipfs" + } + + async fn get(&self, key: &str) -> Result> { + validate_key(key)?; + let url = format!("{}/api/v0/files/read", self.api); + let resp = self + .client + .post(&url) + .query(&[("arg", Self::mfs_path(key).as_str())]) + .send() + .await + .context("IPFS files/read")?; + if resp.status().is_success() { + Ok(Some(resp.bytes().await.context("reading IPFS body")?)) + } else { + // Kubo returns 500 with a JSON message when the path is absent. 
+ let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + Ok(None) + } else { + anyhow::bail!("IPFS files/read {key}: {body}") + } + } + } + + async fn put(&self, key: &str, body: Bytes) -> Result { + validate_key(key)?; + let size = body.len() as u64; + let url = format!("{}/api/v0/files/write", self.api); + let part = reqwest::multipart::Part::bytes(body.to_vec()).file_name("blob"); + let form = reqwest::multipart::Form::new().part("data", part); + let resp = self + .client + .post(&url) + .query(&[ + ("arg", Self::mfs_path(key).as_str()), + ("create", "true"), + ("parents", "true"), + ("truncate", "true"), + ]) + .multipart(form) + .send() + .await + .context("IPFS files/write")?; + if !resp.status().is_success() { + let status = resp.status(); + let b = resp.text().await.unwrap_or_default(); + anyhow::bail!("IPFS files/write {key} returned {status}: {b}"); + } + // etag = CID from stat + let etag = self.head(key).await?.and_then(|m| m.etag); + Ok(ObjectMeta { size, etag }) + } + + async fn head(&self, key: &str) -> Result> { + validate_key(key)?; + let url = format!("{}/api/v0/files/stat", self.api); + let resp = self + .client + .post(&url) + .query(&[("arg", Self::mfs_path(key).as_str())]) + .send() + .await + .context("IPFS files/stat")?; + if resp.status().is_success() { + let v: serde_json::Value = resp.json().await.context("parsing files/stat")?; + Ok(Some(ObjectMeta { + size: v.get("Size").and_then(|s| s.as_u64()).unwrap_or(0), + etag: v + .get("Hash") + .and_then(|h| h.as_str()) + .map(|s| s.to_string()), + })) + } else { + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + Ok(None) + } else { + anyhow::bail!("IPFS files/stat {key}: {body}") + } + } + } + + async fn delete(&self, key: &str) -> Result<()> { + validate_key(key)?; + let url = format!("{}/api/v0/files/rm", self.api); + let resp = self + .client + 
.post(&url) + .query(&[("arg", Self::mfs_path(key).as_str()), ("force", "true")]) + .send() + .await + .context("IPFS files/rm")?; + if resp.status().is_success() { + Ok(()) + } else { + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + Ok(()) + } else { + anyhow::bail!("IPFS files/rm {key}: {body}") + } + } + } + + async fn list(&self, prefix: &str) -> Result> { + // Best-effort recursive walk of the MFS subtree under `prefix`. + let mut keys = Vec::new(); + let mut stack = vec![prefix.trim_end_matches('/').to_string()]; + while let Some(rel) = stack.pop() { + let url = format!("{}/api/v0/files/ls", self.api); + let mfs = if rel.is_empty() { + "/gitlawb".to_string() + } else { + Self::mfs_path(&rel) + }; + let resp = self + .client + .post(&url) + .query(&[("arg", mfs.as_str()), ("long", "true")]) + .send() + .await + .context("IPFS files/ls")?; + if !resp.status().is_success() { + continue; + } + let v: serde_json::Value = resp.json().await.context("parsing files/ls")?; + if let Some(entries) = v.get("Entries").and_then(|e| e.as_array()) { + for entry in entries { + let name = entry.get("Name").and_then(|n| n.as_str()).unwrap_or(""); + if name.is_empty() { + continue; + } + let child = if rel.is_empty() { + name.to_string() + } else { + format!("{rel}/{name}") + }; + // Type 1 = directory in Kubo's MFS ls. + if entry.get("Type").and_then(|t| t.as_u64()) == Some(1) { + stack.push(child); + } else { + keys.push(child); + } + } + } + } + Ok(keys) + } +} diff --git a/crates/gitlawb-node/src/storage/mod.rs b/crates/gitlawb-node/src/storage/mod.rs new file mode 100644 index 0000000..857dd91 --- /dev/null +++ b/crates/gitlawb-node/src/storage/mod.rs @@ -0,0 +1,160 @@ +//! Storage-agnostic blob layer. +//! +//! Repos are persisted to a pluggable object store behind the [`BlobStore`] +//! trait. Backends: +//! - [`s3::S3BlobStore`] — any S3-compatible service (Tigris, Cloudflare R2, +//! 
AWS S3, MinIO, Backblaze B2). Selected by default when a bucket is set. +//! - [`fs::FsBlobStore`] — a local/mounted directory; for self-hosters & tests. +//! - [`ipfs::IpfsBlobStore`] — content-addressed storage over a Kubo (IPFS) node +//! using its Mutable File System (MFS) for a key→blob namespace. +//! +//! Higher layers ([`archive::RepoArchive`]) compose a bare repo into a single +//! `repos/v1/{slug}/{repo}.tar.zst` object on top of whichever backend is active, +//! so the repo-storage semantics are identical regardless of backend. + +use std::sync::Arc; + +use anyhow::Result; +use async_trait::async_trait; +use bytes::Bytes; +use tracing::{info, warn}; + +use crate::config::Config; + +pub mod archive; +pub mod fs; +pub mod ipfs; +pub mod s3; + +/// Metadata about a stored object. `etag` is an opaque change-detection token +/// (S3 ETag, IPFS CID, or a size/mtime fingerprint for the filesystem backend). +#[derive(Debug, Clone)] +pub struct ObjectMeta { + pub size: u64, + pub etag: Option, +} + +/// A backend-agnostic key→bytes object store. +/// +/// Keys are forward-slash-delimited paths (e.g. `repos/v1/slug/repo.tar.zst`). +/// Implementations must reject `..` traversal in keys. +#[async_trait] +pub trait BlobStore: Send + Sync { + /// Short backend name, for logs. + fn backend_name(&self) -> &'static str; + + /// Fetch an object. Returns `None` if the key does not exist. + async fn get(&self, key: &str) -> Result>; + + /// Store an object, returning its metadata (including the new etag). + async fn put(&self, key: &str, body: Bytes) -> Result; + + /// Fetch object metadata without the body. Returns `None` if absent. + async fn head(&self, key: &str) -> Result>; + + /// Delete an object. Succeeds (no-op) if the key does not exist. + async fn delete(&self, key: &str) -> Result<()>; + + /// List object keys under a prefix. 
Part of the backend interface (and + /// implemented by every backend) for future admin/GC/migration use; not yet + /// wired to a caller, hence the allow. + #[allow(dead_code)] + async fn list(&self, prefix: &str) -> Result>; +} + +/// Build the configured blob store, or `None` for local-only (passthrough) mode. +/// +/// Selection order: +/// 1. Explicit `GITLAWB_STORAGE_BACKEND` (`s3` | `fs` | `ipfs`). +/// 2. Auto: `s3` if a bucket is configured (incl. legacy `GITLAWB_TIGRIS_BUCKET`), +/// else `fs` if a storage dir is set, else local-only. +pub async fn build(config: &Config) -> Option> { + let bucket = if !config.s3_bucket.is_empty() { + config.s3_bucket.clone() + } else { + config.tigris_bucket.clone() + }; + + let backend = if !config.storage_backend.is_empty() { + config.storage_backend.to_ascii_lowercase() + } else if !bucket.is_empty() { + "s3".to_string() + } else if !config.storage_fs_dir.is_empty() { + "fs".to_string() + } else if !config.ipfs_api.is_empty() { + "ipfs".to_string() + } else { + info!("object storage disabled (no backend configured) — local-only mode"); + return None; + }; + + match backend.as_str() { + "s3" => { + if bucket.is_empty() { + warn!("storage backend=s3 but no bucket configured — local-only mode"); + return None; + } + let endpoint = (!config.s3_endpoint.is_empty()).then(|| config.s3_endpoint.clone()); + match s3::S3BlobStore::new(&bucket, endpoint, config.s3_force_path_style).await { + Ok(s) => { + info!(bucket = %bucket, backend = "s3", "object storage enabled"); + Some(Arc::new(s) as Arc) + } + Err(e) => { + warn!(err = %e, "failed to init S3 storage — local-only mode"); + None + } + } + } + "fs" => { + if config.storage_fs_dir.is_empty() { + warn!("storage backend=fs but GITLAWB_STORAGE_FS_DIR is empty — local-only mode"); + return None; + } + match fs::FsBlobStore::new(&config.storage_fs_dir) { + Ok(s) => { + info!(dir = %config.storage_fs_dir, backend = "fs", "object storage enabled"); + Some(Arc::new(s) as Arc) + } + 
Err(e) => { + warn!(err = %e, "failed to init filesystem storage — local-only mode"); + None + } + } + } + "ipfs" => { + if config.ipfs_api.is_empty() { + warn!("storage backend=ipfs but GITLAWB_IPFS_API is empty — local-only mode"); + return None; + } + match ipfs::IpfsBlobStore::new(&config.ipfs_api) { + Ok(s) => { + info!(api = %config.ipfs_api, backend = "ipfs", "object storage enabled"); + Some(Arc::new(s) as Arc) + } + Err(e) => { + warn!(err = %e, "failed to init IPFS storage — local-only mode"); + None + } + } + } + other => { + warn!(backend = %other, "unknown GITLAWB_STORAGE_BACKEND — local-only mode"); + None + } + } +} + +/// Reject keys that could escape the namespace (`..`) or are absolute. +pub(crate) fn validate_key(key: &str) -> Result<()> { + if key.is_empty() { + anyhow::bail!("blob key is empty"); + } + if key.split('/').any(|seg| seg == ".." || seg == ".") { + anyhow::bail!("blob key contains traversal segment: {key}"); + } + if key.starts_with('/') || key.contains('\0') { + anyhow::bail!("blob key is absolute or contains null byte: {key}"); + } + Ok(()) +} diff --git a/crates/gitlawb-node/src/storage/s3.rs b/crates/gitlawb-node/src/storage/s3.rs new file mode 100644 index 0000000..2da2911 --- /dev/null +++ b/crates/gitlawb-node/src/storage/s3.rs @@ -0,0 +1,166 @@ +//! S3-compatible blob backend. +//! +//! Works with any S3 API implementation: Tigris, Cloudflare R2, AWS S3, MinIO, +//! Backblaze B2. Credentials and (for Tigris on Fly) the endpoint are read from +//! the standard AWS env vars (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, +//! `AWS_ENDPOINT_URL_S3`, `AWS_REGION`). `endpoint`/`force_path_style` override +//! those for self-hosted services like MinIO. 
+ +use anyhow::{Context, Result}; +use async_trait::async_trait; +use aws_sdk_s3::Client as S3Client; +use bytes::Bytes; +use tracing::debug; + +use super::{validate_key, BlobStore, ObjectMeta}; + +#[derive(Clone)] +pub struct S3BlobStore { + s3: S3Client, + bucket: String, +} + +impl S3BlobStore { + /// Build a client. `endpoint` overrides `AWS_ENDPOINT_URL_S3` (for R2/MinIO); + /// `force_path_style` is required by MinIO and some S3-compatibles. + pub async fn new( + bucket: &str, + endpoint: Option, + force_path_style: bool, + ) -> Result { + let shared = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let mut builder = aws_sdk_s3::config::Builder::from(&shared); + if let Some(ep) = endpoint { + builder = builder.endpoint_url(ep); + } + if force_path_style { + builder = builder.force_path_style(true); + } + let s3 = S3Client::from_conf(builder.build()); + Ok(Self { + s3, + bucket: bucket.to_string(), + }) + } +} + +#[async_trait] +impl BlobStore for S3BlobStore { + fn backend_name(&self) -> &'static str { + "s3" + } + + async fn get(&self, key: &str) -> Result> { + validate_key(key)?; + match self + .s3 + .get_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + { + Ok(resp) => { + let data = resp + .body + .collect() + .await + .context("reading S3 response body")? 
+ .into_bytes(); + Ok(Some(data)) + } + Err(e) => { + if e.as_service_error().is_some_and(|e| e.is_no_such_key()) { + Ok(None) + } else { + Err(anyhow::anyhow!("S3 GET {key}: {e}")) + } + } + } + } + + async fn put(&self, key: &str, body: Bytes) -> Result { + validate_key(key)?; + let size = body.len() as u64; + let resp = self + .s3 + .put_object() + .bucket(&self.bucket) + .key(key) + .body(aws_sdk_s3::primitives::ByteStream::from(body)) + .send() + .await + .context(format!("S3 PUT {key}"))?; + debug!(key = %key, size, "s3 put"); + Ok(ObjectMeta { + size, + etag: resp.e_tag().map(|s| s.to_string()), + }) + } + + async fn head(&self, key: &str) -> Result> { + validate_key(key)?; + match self + .s3 + .head_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + { + Ok(resp) => Ok(Some(ObjectMeta { + size: resp.content_length().unwrap_or(0).max(0) as u64, + etag: resp.e_tag().map(|s| s.to_string()), + })), + Err(e) => { + if e.as_service_error().is_some_and(|e| e.is_not_found()) { + Ok(None) + } else { + Err(anyhow::anyhow!("S3 HEAD {key}: {e}")) + } + } + } + } + + async fn delete(&self, key: &str) -> Result<()> { + validate_key(key)?; + self.s3 + .delete_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + .context(format!("S3 DELETE {key}"))?; + Ok(()) + } + + async fn list(&self, prefix: &str) -> Result> { + let mut keys = Vec::new(); + let mut continuation: Option = None; + loop { + let mut req = self + .s3 + .list_objects_v2() + .bucket(&self.bucket) + .prefix(prefix); + if let Some(token) = continuation.take() { + req = req.continuation_token(token); + } + let resp = req.send().await.context(format!("S3 LIST {prefix}"))?; + for obj in resp.contents() { + if let Some(k) = obj.key() { + keys.push(k.to_string()); + } + } + if resp.is_truncated().unwrap_or(false) { + continuation = resp.next_continuation_token().map(|s| s.to_string()); + if continuation.is_none() { + break; + } + } else { + break; + } + } + Ok(keys) + } +} From 
500593a68a4c4a133049ff5cd27f6bc4d38175de Mon Sep 17 00:00:00 2001 From: Kevin Codex Date: Tue, 23 Jun 2026 19:39:26 +0800 Subject: [PATCH 2/5] fix(storage): address CodeRabbit review (data-integrity + robustness) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Major: - repo_store: add require_fresh to sync_down_if_stale. Write path (acquire_write) fails closed instead of falling back to a stale local copy that a later upload would use to clobber a newer remote archive (lost update); read path (acquire_fresh) keeps self-heal. - fs: unique per-write temp name (uuid) + cleanup on failure, so concurrent puts to the same key can't corrupt each other. - storage::build: return Result and fail closed — a configured-but-unavailable or misconfigured backend now aborts boot instead of silently downgrading to local-only (durability drift). main.rs propagates the error. - ipfs: give the HTTP client connect/total timeouts so an unresponsive Kubo API can't hang push/write flows. Minor/trivial: - repo_store::acquire: propagate storage HEAD errors on cache miss instead of collapsing them into a misleading 404. - repo_store: log advisory-lock release failures instead of discarding them. - archive: clean up the extracted temp dir if the swap step fails. - fs::list / ipfs::list: propagate read/HTTP errors instead of silently returning a partial listing as success. - ipfs::put: stream the body (Part::stream_with_length) instead of to_vec, halving peak upload memory. - storage::build doc: document IPFS auto-detection. Adds a concurrent-put FS test. Deferred (follow-up): integrating the write-back release task with graceful-shutdown drain. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/gitlawb-node/src/git/repo_store.rs | 47 +++++++++---- crates/gitlawb-node/src/main.rs | 6 +- crates/gitlawb-node/src/storage/archive.rs | 15 ++-- crates/gitlawb-node/src/storage/fs.rs | 59 +++++++++++++--- crates/gitlawb-node/src/storage/ipfs.rs | 24 ++++++- crates/gitlawb-node/src/storage/mod.rs | 79 ++++++++++------------ 6 files changed, 155 insertions(+), 75 deletions(-) diff --git a/crates/gitlawb-node/src/git/repo_store.rs b/crates/gitlawb-node/src/git/repo_store.rs index d0d9d61..6353d14 100644 --- a/crates/gitlawb-node/src/git/repo_store.rs +++ b/crates/gitlawb-node/src/git/repo_store.rs @@ -63,13 +63,22 @@ impl RepoStore { } /// Ensure the local copy matches storage, skipping the download when our - /// cached etag already equals the current archive etag. Used by the - /// read-before-write (`acquire_fresh`) and write (`acquire_write`) paths. + /// cached etag already equals the current archive etag. + /// + /// `require_fresh` selects the failure policy: + /// - `false` (read path, `acquire_fresh`): self-heal — if a storage HEAD or + /// download fails but a valid local copy exists, use it; a later upload + /// re-syncs storage. + /// - `true` (write path, `acquire_write`): fail closed — never fall back to + /// a possibly-stale local copy. The remote etag differs (remote is newer), + /// so uploading our stale copy after the write would clobber it (lost + /// update). Propagate the error so the write is rejected instead. async fn sync_down_if_stale( &self, owner_slug: &str, repo_name: &str, local_path: &Path, + require_fresh: bool, ) -> Result<()> { let Some(ref archive) = self.archive else { return Ok(()); @@ -80,8 +89,9 @@ impl RepoStore { Ok(Some(etag)) => etag, Ok(None) => return Ok(()), // not in storage yet — local is authoritative Err(e) => { - // HEAD failed — fall back to a valid local copy if we have one. - if local_path.exists() { + // HEAD failed. 
Read path: fall back to a valid local copy if we + // have one. Write path: fail closed (see `require_fresh`). + if !require_fresh && local_path.exists() { warn!(repo = %repo_name, err = %e, "storage head failed — using local copy"); return Ok(()); } @@ -103,9 +113,11 @@ impl RepoStore { Ok(()) } Err(e) => { - // Self-heal: a corrupt/unreadable archive must not block access - // when a valid local copy exists; a later upload re-syncs storage. - if local_path.exists() { + // Read path self-heal only: a corrupt/unreadable archive must not + // block access when a valid local copy exists. On the write path + // the remote etag differs (remote is newer), so falling back and + // later uploading our stale copy would clobber it — fail closed. + if !require_fresh && local_path.exists() { warn!(repo = %repo_name, err = %e, "archive download failed — falling back to local copy"); Ok(()) @@ -178,7 +190,7 @@ impl RepoStore { if let Some(remote_etag) = archive .head_etag(&owner_slug, repo_name) .await - .unwrap_or(None) + .context("checking storage for repo")? { debug!(repo = %repo_name, "cache miss — downloading from storage"); archive @@ -203,7 +215,7 @@ impl RepoStore { /// will operate on. pub async fn acquire_fresh(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; - self.sync_down_if_stale(&owner_slug, repo_name, &local_path) + self.sync_down_if_stale(&owner_slug, repo_name, &local_path, false) .await?; Ok(local_path) } @@ -241,14 +253,18 @@ impl RepoStore { // machine pushed since) still triggers a download. The advisory lock above // serializes this so the post-write upload can't race a concurrent writer. if let Err(e) = self - .sync_down_if_stale(&owner_slug, repo_name, &local_path) + .sync_down_if_stale(&owner_slug, repo_name, &local_path, true) .await { // Release the lock we acquired before bailing, to avoid a stale lock. 
- let _ = sqlx::query("SELECT pg_advisory_unlock($1)") + if let Err(unlock_err) = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(lock_key) .execute(&self.pool) - .await; + .await + { + warn!(repo = %repo_name, err = %unlock_err, + "failed to release advisory lock after sync error"); + } return Err(e); } @@ -475,10 +491,13 @@ impl RepoWriteGuard { } // Release advisory lock - let _ = sqlx::query("SELECT pg_advisory_unlock($1)") + if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(self.lock_key) .execute(&self.pool) - .await; + .await + { + warn!(repo = %self.repo_name, err = %e, "failed to release advisory lock"); + } } } diff --git a/crates/gitlawb-node/src/main.rs b/crates/gitlawb-node/src/main.rs index 096a698..1378813 100644 --- a/crates/gitlawb-node/src/main.rs +++ b/crates/gitlawb-node/src/main.rs @@ -172,7 +172,11 @@ async fn main() -> Result<()> { // Initialize the storage-agnostic blob backend (S3-compatible / filesystem / // IPFS), then wrap it in the repo-archive layer. `None` = local-only mode. - let blob_store = storage::build(&config).await; + // Fail closed: a configured-but-unreachable backend aborts boot rather than + // silently running local-only and dropping durability. + let blob_store = storage::build(&config) + .await + .context("initializing object storage backend")?; let archive = blob_store.map(storage::archive::RepoArchive::new); let repo_store = diff --git a/crates/gitlawb-node/src/storage/archive.rs b/crates/gitlawb-node/src/storage/archive.rs index dd072ce..80cda91 100644 --- a/crates/gitlawb-node/src/storage/archive.rs +++ b/crates/gitlawb-node/src/storage/archive.rs @@ -177,9 +177,16 @@ fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { // must not interleave or they race to a nondeterministic overwrite/failure. 
let lock = publish_lock(local_path); let _publish = lock.lock().expect("publish lock poisoned"); - if local_path.exists() { - std::fs::remove_dir_all(local_path).context("removing stale repo dir")?; + let swap = (|| -> Result<()> { + if local_path.exists() { + std::fs::remove_dir_all(local_path).context("removing stale repo dir")?; + } + std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place")?; + Ok(()) + })(); + if swap.is_err() { + // Don't leak the extracted temp dir if the swap failed. + let _ = std::fs::remove_dir_all(&tmp_dir); } - std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place")?; - Ok(()) + swap } diff --git a/crates/gitlawb-node/src/storage/fs.rs b/crates/gitlawb-node/src/storage/fs.rs index 3dfbb07..972d2a2 100644 --- a/crates/gitlawb-node/src/storage/fs.rs +++ b/crates/gitlawb-node/src/storage/fs.rs @@ -72,14 +72,23 @@ impl BlobStore for FsBlobStore { .parent() .context("blob path has no parent")? .to_path_buf(); - let tmp = path.with_extension("tmp-put"); + // Unique temp name per write: a fixed suffix would let concurrent puts + // to the same key overwrite each other's temp file and corrupt the blob. + let tmp = path.with_extension(format!("{}.tmp-put", uuid::Uuid::new_v4())); let path2 = path.clone(); - // Atomic write: temp file in the same dir, then rename into place. + // Atomic write: temp file in the same dir, then rename into place. On any + // failure, remove the temp file so a failed write can't leak it. 
tokio::task::spawn_blocking(move || -> Result<()> { std::fs::create_dir_all(&parent).context("creating blob parent dir")?; - std::fs::write(&tmp, &body).context("writing temp blob")?; - std::fs::rename(&tmp, &path2).context("renaming blob into place")?; - Ok(()) + let write_and_swap = (|| -> Result<()> { + std::fs::write(&tmp, &body).context("writing temp blob")?; + std::fs::rename(&tmp, &path2).context("renaming blob into place")?; + Ok(()) + })(); + if write_and_swap.is_err() { + let _ = std::fs::remove_file(&tmp); + } + write_and_swap }) .await .context("fs put task panicked")??; @@ -111,10 +120,10 @@ impl BlobStore for FsBlobStore { let mut keys = Vec::new(); let mut stack = vec![root.clone()]; while let Some(dir) = stack.pop() { - let rd = match std::fs::read_dir(&dir) { - Ok(rd) => rd, - Err(_) => continue, - }; + // Propagate read errors rather than skipping: a partial listing + // reported as success would mislead GC/admin/migration callers. + let rd = std::fs::read_dir(&dir) + .with_context(|| format!("listing {}", dir.display()))?; for entry in rd.flatten() { let path = entry.path(); if path.is_dir() { @@ -184,4 +193,36 @@ mod tests { assert!(store.get("../escape").await.is_err()); assert!(store.put("a/../../etc/passwd", Bytes::new()).await.is_err()); } + + #[tokio::test] + async fn concurrent_puts_same_key_do_not_corrupt_or_leak_temps() { + let dir = tempfile::tempdir().unwrap(); + let store = FsBlobStore::new(dir.path()).unwrap(); + let key = "repos/v1/a/x.tar.zst"; + let body = Bytes::from_static(b"the-one-true-blob"); + + // Many concurrent writers of the same key: with a fixed temp name they + // would clobber each other's temp file mid-write and corrupt the result. + let mut handles = Vec::new(); + for _ in 0..16 { + let store = store.clone(); + let body = body.clone(); + handles.push(tokio::spawn(async move { store.put(key, body).await })); + } + for h in handles { + h.await.unwrap().unwrap(); + } + + // Final content is intact... 
+ assert_eq!(store.get(key).await.unwrap().unwrap(), body); + // ...and no unique-suffixed temp files were left behind. + let leftovers: Vec = store + .list("repos/v1/") + .await + .unwrap() + .into_iter() + .filter(|k| k.contains("tmp-put")) + .collect(); + assert!(leftovers.is_empty(), "leaked temp files: {leftovers:?}"); + } } diff --git a/crates/gitlawb-node/src/storage/ipfs.rs b/crates/gitlawb-node/src/storage/ipfs.rs index 8fe3d62..2a4e749 100644 --- a/crates/gitlawb-node/src/storage/ipfs.rs +++ b/crates/gitlawb-node/src/storage/ipfs.rs @@ -23,9 +23,17 @@ pub struct IpfsBlobStore { impl IpfsBlobStore { pub fn new(api: &str) -> Result { + // Bound requests so an unresponsive Kubo API can't hang push/write flows + // indefinitely. connect_timeout guards the dial; the generous total + // timeout still allows large repo-archive transfers. + let client = reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs(5)) + .timeout(std::time::Duration::from_secs(300)) + .build() + .context("building IPFS HTTP client")?; Ok(Self { api: api.trim_end_matches('/').to_string(), - client: reqwest::Client::new(), + client, }) } @@ -69,7 +77,10 @@ impl BlobStore for IpfsBlobStore { validate_key(key)?; let size = body.len() as u64; let url = format!("{}/api/v0/files/write", self.api); - let part = reqwest::multipart::Part::bytes(body.to_vec()).file_name("blob"); + // Stream the body instead of copying it via to_vec — avoids doubling + // peak memory for large archives. Length is known, so set it explicitly. 
+ let part = reqwest::multipart::Part::stream_with_length(reqwest::Body::from(body), size) + .file_name("blob"); let form = reqwest::multipart::Form::new().part("data", part); let resp = self .client @@ -164,7 +175,14 @@ impl BlobStore for IpfsBlobStore { .await .context("IPFS files/ls")?; if !resp.status().is_success() { - continue; + // Distinguish "this subtree doesn't exist" (fine — nothing to + // list) from real auth/network/server errors, which must surface + // rather than masquerade as an empty listing. + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + continue; + } + anyhow::bail!("IPFS files/ls {mfs}: {body}"); } let v: serde_json::Value = resp.json().await.context("parsing files/ls")?; if let Some(entries) = v.get("Entries").and_then(|e| e.as_array()) { diff --git a/crates/gitlawb-node/src/storage/mod.rs b/crates/gitlawb-node/src/storage/mod.rs index 857dd91..cfe6bab 100644 --- a/crates/gitlawb-node/src/storage/mod.rs +++ b/crates/gitlawb-node/src/storage/mod.rs @@ -14,10 +14,10 @@ use std::sync::Arc; -use anyhow::Result; +use anyhow::{Context, Result}; use async_trait::async_trait; use bytes::Bytes; -use tracing::{info, warn}; +use tracing::info; use crate::config::Config; @@ -62,13 +62,21 @@ pub trait BlobStore: Send + Sync { async fn list(&self, prefix: &str) -> Result>; } -/// Build the configured blob store, or `None` for local-only (passthrough) mode. +/// Build the configured blob store. +/// +/// Returns `Ok(None)` only when no backend is configured at all (local-only +/// passthrough mode). A configured-but-unavailable or misconfigured backend +/// returns `Err`: we fail closed rather than silently degrading to local-only, +/// which would accept writes without the intended durable backend and risk +/// cross-node persistence drift. /// /// Selection order: /// 1. Explicit `GITLAWB_STORAGE_BACKEND` (`s3` | `fs` | `ipfs`). /// 2. 
Auto: `s3` if a bucket is configured (incl. legacy `GITLAWB_TIGRIS_BUCKET`), -/// else `fs` if a storage dir is set, else local-only. -pub async fn build(config: &Config) -> Option> { +/// else `fs` if `GITLAWB_STORAGE_FS_DIR` is set, +/// else `ipfs` if `GITLAWB_IPFS_API` is set, +/// else local-only. +pub async fn build(config: &Config) -> Result>> { let bucket = if !config.s3_bucket.is_empty() { config.s3_bucket.clone() } else { @@ -85,62 +93,45 @@ pub async fn build(config: &Config) -> Option> { "ipfs".to_string() } else { info!("object storage disabled (no backend configured) — local-only mode"); - return None; + return Ok(None); }; + // A backend was selected (explicitly or by auto-detection); fail closed from + // here — a missing required setting or an init failure is a hard error. match backend.as_str() { "s3" => { if bucket.is_empty() { - warn!("storage backend=s3 but no bucket configured — local-only mode"); - return None; + anyhow::bail!( + "storage backend=s3 but no bucket configured (set GITLAWB_S3_BUCKET)" + ); } let endpoint = (!config.s3_endpoint.is_empty()).then(|| config.s3_endpoint.clone()); - match s3::S3BlobStore::new(&bucket, endpoint, config.s3_force_path_style).await { - Ok(s) => { - info!(bucket = %bucket, backend = "s3", "object storage enabled"); - Some(Arc::new(s) as Arc) - } - Err(e) => { - warn!(err = %e, "failed to init S3 storage — local-only mode"); - None - } - } + let s = s3::S3BlobStore::new(&bucket, endpoint, config.s3_force_path_style) + .await + .context("initializing S3 storage")?; + info!(bucket = %bucket, backend = "s3", "object storage enabled"); + Ok(Some(Arc::new(s) as Arc)) } "fs" => { if config.storage_fs_dir.is_empty() { - warn!("storage backend=fs but GITLAWB_STORAGE_FS_DIR is empty — local-only mode"); - return None; - } - match fs::FsBlobStore::new(&config.storage_fs_dir) { - Ok(s) => { - info!(dir = %config.storage_fs_dir, backend = "fs", "object storage enabled"); - Some(Arc::new(s) as Arc) - } - Err(e) => { - 
warn!(err = %e, "failed to init filesystem storage — local-only mode"); - None - } + anyhow::bail!("storage backend=fs but GITLAWB_STORAGE_FS_DIR is empty"); } + let s = fs::FsBlobStore::new(&config.storage_fs_dir) + .context("initializing filesystem storage")?; + info!(dir = %config.storage_fs_dir, backend = "fs", "object storage enabled"); + Ok(Some(Arc::new(s) as Arc)) } "ipfs" => { if config.ipfs_api.is_empty() { - warn!("storage backend=ipfs but GITLAWB_IPFS_API is empty — local-only mode"); - return None; - } - match ipfs::IpfsBlobStore::new(&config.ipfs_api) { - Ok(s) => { - info!(api = %config.ipfs_api, backend = "ipfs", "object storage enabled"); - Some(Arc::new(s) as Arc) - } - Err(e) => { - warn!(err = %e, "failed to init IPFS storage — local-only mode"); - None - } + anyhow::bail!("storage backend=ipfs but GITLAWB_IPFS_API is empty"); } + let s = + ipfs::IpfsBlobStore::new(&config.ipfs_api).context("initializing IPFS storage")?; + info!(api = %config.ipfs_api, backend = "ipfs", "object storage enabled"); + Ok(Some(Arc::new(s) as Arc)) } other => { - warn!(backend = %other, "unknown GITLAWB_STORAGE_BACKEND — local-only mode"); - None + anyhow::bail!("unknown GITLAWB_STORAGE_BACKEND: {other}"); } } } From d35a1c8c31c2b45e9312cd20c7ef894072bd7c04 Mon Sep 17 00:00:00 2001 From: Kevin Codex Date: Wed, 24 Jun 2026 00:33:09 +0800 Subject: [PATCH 3/5] fix(storage): address beardthelion review (lock soundness, durability, tests) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit P1: - config: default GITLAWB_ASYNC_UPLOAD to false. Write-back ack opened a data-loss window (crash between ack and upload → stale remote overwrites newer local on restart); strict upload-before-ack is now the default and the latency optimization is opt-in. P2: - repo_store: hold ONE pooled connection for the advisory lock's lifetime. 
pg_advisory_lock is session-scoped, so acquiring and releasing on different pooled connections let unlock silently no-op while the lock lingered. Acquire, sync-error release, and final release now all run on the pinned connection (RepoWriteGuard owns a PoolConnection instead of the pool). - s3: add TimeoutConfig (60s attempt / 300s operation) so a hung endpoint can't block the task — and, under async_upload, the advisory lock — unbounded. - storage::build: scope the fail-closed doc to config + client construction; note that live connectivity (e.g. unreachable S3 bucket) surfaces on first use. P3: - ipfs: reject `..`/`.` traversal in list() prefixes; make a failed post-write stat non-fatal (the write succeeded — return without an etag). - docs: replace stale generic "Tigris" references with "storage" across repo_store/api/state/archive; keep legitimate Tigris-as-S3-example mentions. Tests: archive compress/decompress round-trip + atomicity (corrupt archive leaves existing copy intact) + fs upload/download; repo_store sync_down_if_stale etag-skip and require_fresh fail-closed-vs-fallback. Skipped with reason: IPFS string-matched error detection — Kubo's HTTP API returns 500 + JSON message with no stable error codes, so substring matching is the pragmatic norm; a structured-error layer is disproportionate here. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/gitlawb-node/src/api/issues.rs | 4 +- crates/gitlawb-node/src/api/pulls.rs | 2 +- crates/gitlawb-node/src/api/repos.rs | 6 +- crates/gitlawb-node/src/config.rs | 11 +- crates/gitlawb-node/src/git/repo_store.rs | 147 ++++++++++++++++++--- crates/gitlawb-node/src/state.rs | 2 +- crates/gitlawb-node/src/storage/archive.rs | 96 +++++++++++++- crates/gitlawb-node/src/storage/ipfs.rs | 17 ++- crates/gitlawb-node/src/storage/mod.rs | 12 +- crates/gitlawb-node/src/storage/s3.rs | 10 ++ 10 files changed, 272 insertions(+), 35 deletions(-) diff --git a/crates/gitlawb-node/src/api/issues.rs b/crates/gitlawb-node/src/api/issues.rs index 59b647c..504d001 100644 --- a/crates/gitlawb-node/src/api/issues.rs +++ b/crates/gitlawb-node/src/api/issues.rs @@ -70,7 +70,7 @@ pub async fn create_issue( let create_result = git_issues::create_issue(&disk_path, &issue_id, &json_str); - // Always release the advisory lock — even on error; upload to Tigris only on success. + // Always release the advisory lock — even on error; upload to storage only on success. guard.release(create_result.is_ok()).await; create_result.map_err(|e| AppError::Git(e.to_string()))?; @@ -250,7 +250,7 @@ pub async fn close_issue( let close_result = git_issues::close_issue(&disk_path, &issue_id); - // Always release the advisory lock — even on error; upload to Tigris only on success. + // Always release the advisory lock — even on error; upload to storage only on success. guard.release(close_result.is_ok()).await; let updated = close_result diff --git a/crates/gitlawb-node/src/api/pulls.rs b/crates/gitlawb-node/src/api/pulls.rs index 26be610..75fe7a8 100644 --- a/crates/gitlawb-node/src/api/pulls.rs +++ b/crates/gitlawb-node/src/api/pulls.rs @@ -224,7 +224,7 @@ pub async fn merge_pr( &pr.title, ); - // Always release the advisory lock — even on error; upload to Tigris only on success. 
+ // Always release the advisory lock — even on error; upload to storage only on success. guard.release(merge_result.is_ok()).await; let merge_sha = merge_result.map_err(|e| AppError::Git(e.to_string()))?; diff --git a/crates/gitlawb-node/src/api/repos.rs b/crates/gitlawb-node/src/api/repos.rs index 7bdcfcd..ea47686 100644 --- a/crates/gitlawb-node/src/api/repos.rs +++ b/crates/gitlawb-node/src/api/repos.rs @@ -376,7 +376,7 @@ pub async fn git_info_refs( } } - // For receive-pack (push), download the latest from Tigris so the client + // For receive-pack (push), download the latest from storage so the client // sees the same refs that acquire_write() will operate on. let disk_path = if service == "git-receive-pack" { state @@ -1141,7 +1141,7 @@ pub async fn fork_repo( ))); } - // Ensure source repo is on local disk (downloads from Tigris on cache miss) + // Ensure source repo is on local disk (downloads from storage on cache miss) let source_path = state .repo_store .acquire(&source.owner_did, &source.name) @@ -1168,7 +1168,7 @@ pub async fn fork_repo( ))); } - // Upload fork to Tigris + // Upload fork to storage state .repo_store .release_after_write(&forker_did, &fork_name) diff --git a/crates/gitlawb-node/src/config.rs b/crates/gitlawb-node/src/config.rs index b93526e..17be97f 100644 --- a/crates/gitlawb-node/src/config.rs +++ b/crates/gitlawb-node/src/config.rs @@ -153,11 +153,12 @@ pub struct Config { pub storage_fs_dir: String, /// Acknowledge a push to the client before the durable upload to object - /// storage finishes (write-back). Greatly lowers push latency; the local - /// copy and the advisory lock keep cross-node consistency, at the cost of a - /// small durability window if the node crashes mid-upload. Set false for - /// strict upload-before-ack durability. - #[arg(long, env = "GITLAWB_ASYNC_UPLOAD", default_value_t = true)] + /// storage finishes (write-back). 
Lowers push latency, but opens a + /// durability window: if the node stops between the ack and the upload, on + /// restart a stale remote archive can overwrite the newer local copy. Off + /// by default (strict upload-before-ack); opt in only where the latency win + /// is worth that risk. + #[arg(long, env = "GITLAWB_ASYNC_UPLOAD", default_value_t = false)] pub async_upload: bool, /// Maximum pack body size for git-receive-pack and git-upload-pack, in bytes. diff --git a/crates/gitlawb-node/src/git/repo_store.rs b/crates/gitlawb-node/src/git/repo_store.rs index 6353d14..0625e74 100644 --- a/crates/gitlawb-node/src/git/repo_store.rs +++ b/crates/gitlawb-node/src/git/repo_store.rs @@ -16,7 +16,8 @@ use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::{Context, Result}; -use sqlx::PgPool; +use sqlx::pool::PoolConnection; +use sqlx::{PgPool, Postgres}; use tokio::sync::Mutex; use tracing::{debug, info, warn}; @@ -128,9 +129,9 @@ impl RepoStore { } } - /// Ensure a repo is available on local disk, downloading from Tigris if needed. - /// If the repo exists locally but not yet in Tigris, a background upload is - /// spawned to lazily migrate it (on-demand migration for pre-Tigris repos). + /// Ensure a repo is available on local disk, downloading from storage if needed. + /// If the repo exists locally but not yet in storage, a background upload is + /// spawned to lazily migrate it (on-demand migration for pre-storage repos). /// Returns the local path to the bare repo. pub async fn acquire(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; @@ -209,7 +210,7 @@ impl RepoStore { Ok(local_path) } - /// Ensure a repo is available on local disk with the **latest** Tigris state. + /// Ensure a repo is available on local disk with the **latest** storage state. /// Use this for operations that precede a write (e.g. 
`info/refs` for /// `git-receive-pack`) so the client sees the same refs that `acquire_write()` /// will operate on. @@ -226,13 +227,25 @@ impl RepoStore { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; let lock_key = advisory_lock_key(&owner_slug, repo_name); - // Acquire Postgres advisory lock with retry using pg_try_advisory_lock - // to avoid blocking indefinitely on stale locks from crashed connections. + // Postgres advisory locks are SESSION-scoped: they bind to one backend + // connection and only release on that same connection. With a pool, + // acquiring and releasing on different checked-out connections means the + // unlock silently no-ops while the lock lingers on the original. So we + // pin ONE connection for the whole lock lifetime — acquire, release on + // sync error, and the final release in the guard all run on it. + let mut conn = self + .pool + .acquire() + .await + .context("acquiring db connection for advisory lock")?; + + // Acquire with retry using pg_try_advisory_lock to avoid blocking + // indefinitely on stale locks from crashed connections. let mut acquired = false; for attempt in 0..60 { let row: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)") .bind(lock_key) - .fetch_one(&self.pool) + .fetch_one(&mut *conn) .await .context("trying advisory lock")?; if row.0 { @@ -256,10 +269,10 @@ impl RepoStore { .sync_down_if_stale(&owner_slug, repo_name, &local_path, true) .await { - // Release the lock we acquired before bailing, to avoid a stale lock. + // Release the lock on the SAME connection before bailing. 
if let Err(unlock_err) = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(lock_key) - .execute(&self.pool) + .execute(&mut *conn) .await { warn!(repo = %repo_name, err = %unlock_err, @@ -273,13 +286,13 @@ impl RepoStore { repo_name: repo_name.to_string(), local_path, lock_key, - pool: self.pool.clone(), + conn, archive: self.archive.clone(), versions: Arc::clone(&self.versions), }) } - /// Initialize a new bare repo on local disk and upload to Tigris. + /// Initialize a new bare repo on local disk and upload to storage. pub async fn init(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; @@ -445,7 +458,10 @@ pub struct RepoWriteGuard { repo_name: String, pub local_path: PathBuf, lock_key: i64, - pool: PgPool, + /// The connection the session-scoped advisory lock was taken on. The lock + /// must be released on this same connection, so it's held for the guard's + /// lifetime and dropped (returned to the pool) only after `release()`. + conn: PoolConnection, archive: Option, versions: Arc>>, } @@ -466,7 +482,7 @@ impl RepoWriteGuard { /// concurrent writer on another machine cannot read a stale archive. When /// callers want a fast client ack, they spawn this future as a background /// task (write-back) — the lock + etag-cache update still complete in order. - pub async fn release(self, success: bool) { + pub async fn release(mut self, success: bool) { // Upload to storage only on success. if success { if let Some(ref archive) = self.archive { @@ -490,10 +506,11 @@ impl RepoWriteGuard { warn!(repo = %self.repo_name, "write failed — skipping storage upload to avoid propagating an inconsistent repo"); } - // Release advisory lock + // Release the advisory lock on the same connection it was taken on, then + // drop the connection (returns it to the pool). 
if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(self.lock_key) - .execute(&self.pool) + .execute(&mut *self.conn) .await { warn!(repo = %self.repo_name, err = %e, "failed to release advisory lock"); @@ -671,4 +688,102 @@ mod tests { ); } } + + // ── sync_down_if_stale (fs-backed archive, lazy pool) ────────────────── + + /// A RepoStore over an fs-backed archive. `sync_down_if_stale` never touches + /// the pool, so a lazy (never-connected) pool is fine. + fn store_with_fs_archive(repos_dir: PathBuf, store_root: &Path) -> RepoStore { + let blob: Arc = + Arc::new(crate::storage::fs::FsBlobStore::new(store_root).unwrap()); + let archive = crate::storage::archive::RepoArchive::new(blob); + let pool = sqlx::PgPool::connect_lazy("postgres://invalid").unwrap(); + RepoStore::new(repos_dir, Some(archive), pool) + } + + #[tokio::test] + async fn sync_down_if_stale_downloads_then_skips_on_etag_match() { + let store_root = tempfile::tempdir().unwrap(); + let repos_dir = tempfile::tempdir().unwrap(); + let store = store_with_fs_archive(repos_dir.path().to_path_buf(), store_root.path()); + + // Seed the archive with a repo. + let seed = tempfile::tempdir().unwrap(); + std::fs::write(seed.path().join("HEAD"), b"v1\n").unwrap(); + store + .archive + .as_ref() + .unwrap() + .upload("owner", "repo", seed.path()) + .await + .unwrap(); + + let local = repos_dir.path().join("owner").join("repo.git"); + + // First call downloads. + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .unwrap(); + assert_eq!(std::fs::read(local.join("HEAD")).unwrap(), b"v1\n"); + + // Locally mutate, then sync again: the cached etag still matches the + // remote, so the download is skipped and our local edit survives. 
+ std::fs::write(local.join("HEAD"), b"LOCAL-EDIT\n").unwrap(); + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .unwrap(); + assert_eq!( + std::fs::read(local.join("HEAD")).unwrap(), + b"LOCAL-EDIT\n", + "etag match must skip the download (local copy preserved)" + ); + } + + #[tokio::test] + async fn sync_down_if_stale_require_fresh_fails_closed_on_bad_remote() { + let store_root = tempfile::tempdir().unwrap(); + let repos_dir = tempfile::tempdir().unwrap(); + let store = store_with_fs_archive(repos_dir.path().to_path_buf(), store_root.path()); + + let seed = tempfile::tempdir().unwrap(); + std::fs::write(seed.path().join("HEAD"), b"v1\n").unwrap(); + store + .archive + .as_ref() + .unwrap() + .upload("owner", "repo", seed.path()) + .await + .unwrap(); + + let local = repos_dir.path().join("owner").join("repo.git"); + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .unwrap(); + + // Corrupt the stored archive: HEAD now succeeds with a *new* etag (so the + // cache no longer matches and a download is forced), but the download + // decompresses garbage and fails. + let blob_path = store_root.path().join("repos/v1/owner/repo.tar.zst"); + std::fs::write(&blob_path, b"corrupted not-a-tar-zst").unwrap(); + + // Write path: must fail closed rather than fall back to the stale local + // copy (which a later upload would use to clobber the newer remote). + assert!( + store + .sync_down_if_stale("owner", "repo", &local, true) + .await + .is_err(), + "require_fresh=true must propagate the download error" + ); + + // Read path: self-heals — falls back to the valid local copy. 
+ store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .expect("require_fresh=false must fall back to the local copy"); + assert_eq!(std::fs::read(local.join("HEAD")).unwrap(), b"v1\n"); + } } diff --git a/crates/gitlawb-node/src/state.rs b/crates/gitlawb-node/src/state.rs index 85ce0a8..0fa099b 100644 --- a/crates/gitlawb-node/src/state.rs +++ b/crates/gitlawb-node/src/state.rs @@ -47,7 +47,7 @@ pub struct AppState { pub graphql_schema: Arc, /// Fly.io machine ID — used for fly-replay routing in multi-machine deployments pub machine_id: Option, - /// Centralized repo storage: local disk cache + optional Tigris backend + /// Centralized repo storage: local disk cache + optional object-store backend pub repo_store: RepoStore, /// Per-DID rate limiter for creation endpoints (repos, issues, PRs) pub rate_limiter: RateLimiter, diff --git a/crates/gitlawb-node/src/storage/archive.rs b/crates/gitlawb-node/src/storage/archive.rs index 80cda91..fd32045 100644 --- a/crates/gitlawb-node/src/storage/archive.rs +++ b/crates/gitlawb-node/src/storage/archive.rs @@ -1,7 +1,7 @@ //! Repo-archive layer: stores a bare git repo as a single //! `repos/v1/{owner_slug}/{repo_name}.tar.zst` object on top of any //! [`BlobStore`] backend. Backend-agnostic replacement for the old -//! Tigris-specific client. +//! single-backend (Tigris-only) client. 
use std::collections::HashMap; use std::path::{Path, PathBuf}; @@ -190,3 +190,97 @@ fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { } swap } + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + fn seed_repo(dir: &std::path::Path) { + fs::create_dir_all(dir.join("refs/heads")).unwrap(); + fs::write(dir.join("HEAD"), b"ref: refs/heads/main\n").unwrap(); + fs::write(dir.join("refs/heads/main"), b"abc123\n").unwrap(); + fs::write(dir.join("config"), b"[core]\n\tbare = true\n").unwrap(); + } + + #[test] + fn compress_decompress_round_trip_preserves_files() { + let src = tempfile::tempdir().unwrap(); + seed_repo(src.path()); + + let bytes = compress_repo(src.path()).unwrap(); + assert!(!bytes.is_empty()); + + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("restored.git"); + decompress_repo(&bytes, &out).unwrap(); + + assert_eq!( + fs::read(out.join("HEAD")).unwrap(), + b"ref: refs/heads/main\n" + ); + assert_eq!(fs::read(out.join("refs/heads/main")).unwrap(), b"abc123\n"); + assert_eq!( + fs::read(out.join("config")).unwrap(), + b"[core]\n\tbare = true\n" + ); + } + + #[test] + fn decompress_swap_replaces_existing_dir_atomically() { + let src = tempfile::tempdir().unwrap(); + fs::write(src.path().join("HEAD"), b"new\n").unwrap(); + let bytes = compress_repo(src.path()).unwrap(); + + // Pre-existing copy with stale junk that the swap must fully replace. 
+ let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("repo.git"); + fs::create_dir_all(&out).unwrap(); + fs::write(out.join("STALE"), b"old\n").unwrap(); + + decompress_repo(&bytes, &out).unwrap(); + assert_eq!(fs::read(out.join("HEAD")).unwrap(), b"new\n"); + assert!( + !out.join("STALE").exists(), + "stale content must be gone after the swap" + ); + } + + #[test] + fn decompress_corrupt_archive_leaves_existing_copy_untouched() { + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("repo.git"); + fs::create_dir_all(&out).unwrap(); + fs::write(out.join("HEAD"), b"good\n").unwrap(); + + // Garbage is not a valid tar.zst: unpack fails before the swap, so the + // existing copy is preserved (atomicity claim). + assert!(decompress_repo(b"not a real archive", &out).is_err()); + assert_eq!(fs::read(out.join("HEAD")).unwrap(), b"good\n"); + } + + #[tokio::test] + async fn upload_download_round_trip_over_fs_backend() { + let store_dir = tempfile::tempdir().unwrap(); + let store: Arc = + Arc::new(crate::storage::fs::FsBlobStore::new(store_dir.path()).unwrap()); + let archive = RepoArchive::new(store); + + let src = tempfile::tempdir().unwrap(); + seed_repo(src.path()); + + assert!(!archive.exists("owner", "repo").await.unwrap()); + let etag = archive.upload("owner", "repo", src.path()).await.unwrap(); + assert!(etag.is_some()); + assert!(archive.exists("owner", "repo").await.unwrap()); + + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("repo.git"); + archive.download("owner", "repo", &out).await.unwrap(); + assert_eq!( + fs::read(out.join("HEAD")).unwrap(), + b"ref: refs/heads/main\n" + ); + assert_eq!(fs::read(out.join("refs/heads/main")).unwrap(), b"abc123\n"); + } +} diff --git a/crates/gitlawb-node/src/storage/ipfs.rs b/crates/gitlawb-node/src/storage/ipfs.rs index 2a4e749..46bc92d 100644 --- a/crates/gitlawb-node/src/storage/ipfs.rs +++ 
b/crates/gitlawb-node/src/storage/ipfs.rs @@ -100,8 +100,16 @@ impl BlobStore for IpfsBlobStore { let b = resp.text().await.unwrap_or_default(); anyhow::bail!("IPFS files/write {key} returned {status}: {b}"); } - // etag = CID from stat - let etag = self.head(key).await?.and_then(|m| m.etag); + // etag = CID from stat. The write already succeeded, so a failed stat + // must not fail the put — just return without an etag (callers treat a + // missing etag as "always re-check", never as a lost write). + let etag = match self.head(key).await { + Ok(m) => m.and_then(|m| m.etag), + Err(e) => { + tracing::warn!(key = %key, err = %e, "IPFS stat after write failed — returning no etag"); + None + } + }; Ok(ObjectMeta { size, etag }) } @@ -157,6 +165,11 @@ impl BlobStore for IpfsBlobStore { } async fn list(&self, prefix: &str) -> Result> { + // Reject traversal in the prefix before it's spliced into an MFS path + // (matches get/put/head/delete, which validate via validate_key). + if prefix.split('/').any(|seg| seg == ".." || seg == ".") { + anyhow::bail!("list prefix contains traversal segment: {prefix}"); + } // Best-effort recursive walk of the MFS subtree under `prefix`. let mut keys = Vec::new(); let mut stack = vec![prefix.trim_end_matches('/').to_string()]; diff --git a/crates/gitlawb-node/src/storage/mod.rs b/crates/gitlawb-node/src/storage/mod.rs index cfe6bab..0a27242 100644 --- a/crates/gitlawb-node/src/storage/mod.rs +++ b/crates/gitlawb-node/src/storage/mod.rs @@ -65,10 +65,14 @@ pub trait BlobStore: Send + Sync { /// Build the configured blob store. /// /// Returns `Ok(None)` only when no backend is configured at all (local-only -/// passthrough mode). A configured-but-unavailable or misconfigured backend -/// returns `Err`: we fail closed rather than silently degrading to local-only, -/// which would accept writes without the intended durable backend and risk -/// cross-node persistence drift. +/// passthrough mode). 
A misconfigured backend (missing required setting, or a +/// client that fails to construct) returns `Err`: we fail closed rather than +/// silently degrading to local-only, which would accept writes without the +/// intended durable backend and risk cross-node persistence drift. +/// +/// Note: this validates configuration and client construction, not live +/// connectivity — e.g. the S3 client builds successfully against an unreachable +/// or wrong bucket, and that surfaces as an error on the first real request. /// /// Selection order: /// 1. Explicit `GITLAWB_STORAGE_BACKEND` (`s3` | `fs` | `ipfs`). diff --git a/crates/gitlawb-node/src/storage/s3.rs b/crates/gitlawb-node/src/storage/s3.rs index 2da2911..1c07b47 100644 --- a/crates/gitlawb-node/src/storage/s3.rs +++ b/crates/gitlawb-node/src/storage/s3.rs @@ -36,6 +36,16 @@ impl S3BlobStore { if force_path_style { builder = builder.force_path_style(true); } + // Bound requests so a hung endpoint can't block the calling task forever + // (under async_upload it would hold the advisory lock and stall every + // later push). attempt_timeout bounds a single try; operation_timeout + // bounds the whole call incl. retries — generous enough for large + // archive transfers. 
+ let timeouts = aws_sdk_s3::config::timeout::TimeoutConfig::builder() + .operation_attempt_timeout(std::time::Duration::from_secs(60)) + .operation_timeout(std::time::Duration::from_secs(300)) + .build(); + builder = builder.timeout_config(timeouts); let s3 = S3Client::from_conf(builder.build()); Ok(Self { s3, From e179daaee991ca48139033512407ad63505eec66 Mon Sep 17 00:00:00 2001 From: Kevin Codex Date: Wed, 24 Jun 2026 10:09:27 +0800 Subject: [PATCH 4/5] fix(storage): isolate advisory-lock pool + #[must_use] write guard MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to the connection-pinning fix (beardthelion review): - Pool exhaustion (P2): pinning a connection per push from the shared pool let a burst of concurrent/slow pushes drain it and block every DB handler node-wide. Give advisory locks a SEPARATE bounded pool (16 conns) created in main; acquire_write pins from it. RepoStore now holds only that lock pool — it touches Postgres solely for locks — so the handler pool is never starved by pushes. Removed the now-unused Db::pool() accessor. - Lock leak via dropped guard (P3): add #[must_use] to RepoWriteGuard. Dropping it without release() returns the connection to the pool with the session advisory lock still held until that backend connection is evicted; the annotation makes an accidental drop a compile-time warning. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/gitlawb-node/src/db/mod.rs | 5 ----- crates/gitlawb-node/src/git/repo_store.rs | 26 +++++++++++++++++------ crates/gitlawb-node/src/main.rs | 15 +++++++++++-- 3 files changed, 33 insertions(+), 13 deletions(-) diff --git a/crates/gitlawb-node/src/db/mod.rs b/crates/gitlawb-node/src/db/mod.rs index cfb289e..86a3dc8 100644 --- a/crates/gitlawb-node/src/db/mod.rs +++ b/crates/gitlawb-node/src/db/mod.rs @@ -246,11 +246,6 @@ pub struct Db { } impl Db { - /// Access the underlying Postgres connection pool. 
- pub fn pool(&self) -> &PgPool { - &self.pool - } - #[cfg(test)] pub fn for_testing(pool: PgPool) -> Self { Self { pool } diff --git a/crates/gitlawb-node/src/git/repo_store.rs b/crates/gitlawb-node/src/git/repo_store.rs index 0625e74..26eecc4 100644 --- a/crates/gitlawb-node/src/git/repo_store.rs +++ b/crates/gitlawb-node/src/git/repo_store.rs @@ -30,8 +30,13 @@ use crate::storage::archive::RepoArchive; pub struct RepoStore { repos_dir: PathBuf, archive: Option<RepoArchive>, - /// Shared Postgres pool for advisory locks. - pool: PgPool, + /// Bounded pool dedicated to advisory-lock connections — the only DB pool + /// RepoStore needs, as it touches Postgres solely for advisory locks. A push + /// pins one connection for its whole lifetime (the lock is held across both + /// receive-pack and the upload), so a separate budget keeps a burst of + /// concurrent/slow pushes from draining the handler pool and starving every + /// other DB handler. + lock_pool: PgPool, /// Tracks repos already confirmed to exist in storage — avoids redundant /// HEAD checks and background uploads for repos we've already migrated. migrated: Arc<Mutex<HashSet<String>>>, @@ -47,17 +52,20 @@ impl RepoStore { Self { repos_dir, archive: None, - pool, + lock_pool: pool, migrated: Arc::new(Mutex::new(HashSet::new())), versions: Arc::new(Mutex::new(HashMap::new())), } } - pub fn new(repos_dir: PathBuf, archive: Option<RepoArchive>, pool: PgPool) -> Self { + /// `lock_pool` is a bounded pool that advisory-lock connections are pinned + /// from, kept separate from the handler pool so push concurrency can't + /// drain it. + pub fn new(repos_dir: PathBuf, archive: Option<RepoArchive>, lock_pool: PgPool) -> Self { Self { repos_dir, archive, - pool, + lock_pool, migrated: Arc::new(Mutex::new(HashSet::new())), versions: Arc::new(Mutex::new(HashMap::new())), } @@ -234,7 +242,7 @@ impl RepoStore { // pin ONE connection for the whole lock lifetime — acquire, release on // sync error, and the final release in the guard all run on it.
let mut conn = self - .pool + .lock_pool .acquire() .await .context("acquiring db connection for advisory lock")?; @@ -453,6 +461,12 @@ fn validate_repo_name(repo_name: &str) -> Result<()> { /// Guard returned by `acquire_write()`. Holds the Postgres advisory lock and /// uploads to storage + releases the lock on `release()`. +/// +/// `#[must_use]`: dropping the guard without calling `release()` returns its +/// connection to the pool with the *session* advisory lock still held, which +/// then lingers until that backend connection is evicted — so the lock must be +/// released explicitly. +#[must_use = "call release() — dropping the guard leaks the advisory lock onto the pooled connection"] pub struct RepoWriteGuard { owner_slug: String, repo_name: String, diff --git a/crates/gitlawb-node/src/main.rs b/crates/gitlawb-node/src/main.rs index 1378813..ab4ec7d 100644 --- a/crates/gitlawb-node/src/main.rs +++ b/crates/gitlawb-node/src/main.rs @@ -179,8 +179,19 @@ async fn main() -> Result<()> { .context("initializing object storage backend")?; let archive = blob_store.map(storage::archive::RepoArchive::new); - let repo_store = - git::repo_store::RepoStore::new(config.repos_dir.clone(), archive, db.pool().clone()); + // Dedicated, bounded pool for advisory-lock connections. A push pins one + // connection for its whole lifetime (lock held across receive-pack + + // upload), so giving locks their own budget keeps a burst of concurrent or + // slow pushes from draining the main pool and stalling every other DB + // handler node-wide. Sized independently of the handler pool. 
+ const ADVISORY_LOCK_POOL_SIZE: u32 = 16; + let lock_pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(ADVISORY_LOCK_POOL_SIZE) + .connect(&config.database_url) + .await + .context("creating advisory-lock connection pool")?; + + let repo_store = git::repo_store::RepoStore::new(config.repos_dir.clone(), archive, lock_pool); let rate_limiter = rate_limit::RateLimiter::new(10, std::time::Duration::from_secs(3600)); From bd7c0261cbb0b0be8ead6df4e6c20a0679a7d9db Mon Sep 17 00:00:00 2001 From: Kevin Codex Date: Wed, 24 Jun 2026 11:18:52 +0800 Subject: [PATCH 5/5] fix(storage): propagate durability errors + serialize background uploads MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit CodeRabbit round (data-integrity/consistency): - release()/release_after_write() now return Result. The push strict path, merge, fork, and issue writes surface a durable-upload failure to the client instead of acking a write that never reached storage; the async write-back path logs it. (findings 3, 4) - Cache invalidation on failure: on a failed write OR a failed upload, drop the cached etag so the next write re-downloads and reconciles rather than trusting un-persisted local state. (finding 5) - Background uploads serialized: new upload_under_lock() takes the per-repo advisory lock. init() uploads synchronously under it (fail closed); lazy migration uploads under it (skip-if-exists). Stops an older snapshot from clobbering a concurrent locked push. (finding 2) - archive swap no longer destroys the old repo up front: move it to a backup, rename the new copy in, delete the backup on success, restore it on rename failure — local_path is never left missing. (finding 6) - fs head() distinguishes NotFound from permission/IO errors via ErrorKind instead of path.exists(). (finding 1) - fs list() propagates per-entry read errors instead of dropping them via flatten(). 
(finding 7) - Reworded the async-upload comment to state the real tradeoff: on a crash after ack, storage stays stale until the repo's next successful push — it is not auto-reconciled. (finding 8) Already in place from earlier commits (verified): IPFS list() traversal guard, IPFS non-fatal post-write stat, S3 client timeouts. Co-Authored-By: Claude Opus 4.8 (1M context) --- crates/gitlawb-node/src/api/issues.rs | 22 +- crates/gitlawb-node/src/api/pulls.rs | 9 +- crates/gitlawb-node/src/api/repos.rs | 36 +++- crates/gitlawb-node/src/git/repo_store.rs | 224 +++++++++++++-------- crates/gitlawb-node/src/storage/archive.rs | 29 ++- crates/gitlawb-node/src/storage/fs.rs | 16 +- 6 files changed, 229 insertions(+), 107 deletions(-) diff --git a/crates/gitlawb-node/src/api/issues.rs b/crates/gitlawb-node/src/api/issues.rs index 504d001..c308dde 100644 --- a/crates/gitlawb-node/src/api/issues.rs +++ b/crates/gitlawb-node/src/api/issues.rs @@ -71,9 +71,16 @@ pub async fn create_issue( let create_result = git_issues::create_issue(&disk_path, &issue_id, &json_str); // Always release the advisory lock — even on error; upload to storage only on success. - guard.release(create_result.is_ok()).await; + let release_result = guard.release(create_result.is_ok()).await; create_result.map_err(|e| AppError::Git(e.to_string()))?; + // The write succeeded locally; surface a durable-upload failure rather than + // acking an issue that never reached storage. 
+ release_result.map_err(|e| { + AppError::Git(format!( + "issue stored locally but durable upload failed: {e}" + )) + })?; // Bump trust score for the issue author — increment current score by 0.05 // (avoids the push_count=0 stuck-at-0.05 bug for agents who only file issues) @@ -229,11 +236,11 @@ pub async fn close_issue( .ok() .and_then(|i| i.author), Ok(None) => { - guard.release(false).await; + let _ = guard.release(false).await; return Err(AppError::NotFound(format!("issue {issue_id} not found"))); } Err(e) => { - guard.release(false).await; + let _ = guard.release(false).await; return Err(AppError::Git(e.to_string())); } }; @@ -242,7 +249,7 @@ pub async fn close_issue( .as_deref() .is_some_and(|a| crate::api::did_matches(&auth.0, a)); if !is_owner && !is_author { - guard.release(false).await; + let _ = guard.release(false).await; return Err(AppError::Forbidden( "only the repo owner or the issue author can close this issue".into(), )); @@ -251,11 +258,16 @@ pub async fn close_issue( let close_result = git_issues::close_issue(&disk_path, &issue_id); // Always release the advisory lock — even on error; upload to storage only on success. - guard.release(close_result.is_ok()).await; + let release_result = guard.release(close_result.is_ok()).await; let updated = close_result .map_err(|e| AppError::Git(e.to_string()))? 
.ok_or_else(|| AppError::RepoNotFound(format!("issue {issue_id} not found")))?; + release_result.map_err(|e| { + AppError::Git(format!( + "issue stored locally but durable upload failed: {e}" + )) + })?; let issue: serde_json::Value = serde_json::from_str(&updated) .map_err(|e| AppError::BadRequest(format!("invalid issue data: {e}")))?; diff --git a/crates/gitlawb-node/src/api/pulls.rs b/crates/gitlawb-node/src/api/pulls.rs index 75fe7a8..fb940a3 100644 --- a/crates/gitlawb-node/src/api/pulls.rs +++ b/crates/gitlawb-node/src/api/pulls.rs @@ -225,9 +225,16 @@ pub async fn merge_pr( ); // Always release the advisory lock — even on error; upload to storage only on success. - guard.release(merge_result.is_ok()).await; + let release_result = guard.release(merge_result.is_ok()).await; let merge_sha = merge_result.map_err(|e| AppError::Git(e.to_string()))?; + // Surface a durable-upload failure rather than acking a merge that never + // reached storage. + release_result.map_err(|e| { + AppError::Git(format!( + "merge applied locally but durable upload failed: {e}" + )) + })?; state.db.merge_pr(&pr.id, &merger_did).await?; let _ = state.db.touch_repo(&record.id).await; diff --git a/crates/gitlawb-node/src/api/repos.rs b/crates/gitlawb-node/src/api/repos.rs index ea47686..14d4c29 100644 --- a/crates/gitlawb-node/src/api/repos.rs +++ b/crates/gitlawb-node/src/api/repos.rs @@ -556,12 +556,28 @@ pub async fn git_receive_pack( // Write-back: ack the client now; the durable upload to object storage // and the advisory-lock release run in the background. The lock is held // until the upload finishes, so a concurrent writer on another machine - // can't observe a stale archive. Trades a small crash-durability window - // (local copy survives; lazy migration re-syncs) for much lower latency. - tokio::spawn(guard.release(true)); + // can't observe a stale archive. 
Durability tradeoff: if the node stops + // after this ack but before the upload, the local copy survives but + // STORAGE stays stale until this repo's next successful push re-uploads + // — it is not automatically reconciled. Hence async_upload is opt-in. + let repo_label = name.to_string(); + tokio::spawn(async move { + if let Err(e) = guard.release(true).await { + tracing::error!(repo = %repo_label, err = %e, + "write-back durable upload failed after push was acked"); + } + }); } else { - // Strict path (or failed push): upload-before-ack / prompt lock release. - guard.release(push_ok).await; + // Strict path (or failed push): upload-before-ack. On a successful push + // whose durable upload then fails, surface an error so the client knows + // the push is not durably stored (a failed push returns Ok from release + // and the push error is reported below). + if let Err(e) = guard.release(push_ok).await { + tracing::error!(repo = %name, err = %e, "durable upload failed after push"); + return Err(AppError::Git(format!( + "push applied locally but durable upload to storage failed: {e}" + ))); + } } let result = receive_result.map_err(|e| { @@ -1168,11 +1184,17 @@ pub async fn fork_repo( ))); } - // Upload fork to storage + // Upload fork to storage — fail closed if the durable upload fails rather + // than reporting a fork that only exists on this node's local disk. 
state .repo_store .release_after_write(&forker_did, &fork_name) - .await; + .await + .map_err(|e| { + AppError::Git(format!( + "fork created locally but durable upload failed: {e}" + )) + })?; let now = Utc::now(); let record = crate::db::RepoRecord { diff --git a/crates/gitlawb-node/src/git/repo_store.rs b/crates/gitlawb-node/src/git/repo_store.rs index 26eecc4..6a495a5 100644 --- a/crates/gitlawb-node/src/git/repo_store.rs +++ b/crates/gitlawb-node/src/git/repo_store.rs @@ -19,7 +19,7 @@ use anyhow::{Context, Result}; use sqlx::pool::PoolConnection; use sqlx::{PgPool, Postgres}; use tokio::sync::Mutex; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use super::store; use crate::storage::archive::RepoArchive; @@ -148,46 +148,28 @@ impl RepoStore { if local_path.exists() { // Lazy migration: if storage is enabled and we haven't confirmed this // repo is in storage yet, check and upload in the background. - if let Some(ref archive) = self.archive { + if self.archive.is_some() { let key = format!("{owner_slug}/{repo_name}"); let already_migrated = self.migrated.lock().await.contains(&key); if !already_migrated { - let archive = archive.clone(); + let this = self.clone(); let slug = owner_slug.clone(); let name = repo_name.to_string(); let path = local_path.clone(); - let migrated = Arc::clone(&self.migrated); - let versions = Arc::clone(&self.versions); + let key = key.clone(); tokio::spawn(async move { - // Check if already in storage before uploading - match archive.exists(&slug, &name).await { - Ok(true) => { - debug!(repo = %name, "repo already in storage — skipping migration"); - } - Ok(false) => { - info!(repo = %name, "migrating local repo to storage"); - match archive.upload(&slug, &name, &path).await { - Ok(etag) => { - if let Some(etag) = etag { - versions - .lock() - .await - .insert(format!("{slug}/{name}"), etag); - } - info!(repo = %name, "lazy migration to storage complete"); - } - Err(e) => { - warn!(repo = %name, err = %e, "lazy 
migration to storage failed"); - return; - } - } + // Upload under the advisory lock (skip if already present) + // so this opportunistic migration can't clobber a + // concurrent locked push by landing a stale snapshot. + match this.upload_under_lock(&slug, &name, &path, true).await { + Ok(()) => { + this.migrated.lock().await.insert(key); + debug!(repo = %name, "lazy migration to storage complete (or already present)"); } Err(e) => { - warn!(repo = %name, err = %e, "storage existence check failed"); - return; + warn!(repo = %name, err = %e, "lazy migration to storage failed"); } } - migrated.lock().await.insert(format!("{slug}/{name}")); }); } } @@ -306,55 +288,110 @@ impl RepoStore { store::init_bare(&local_path).context("initializing bare repo")?; - // Upload to storage in background - if let Some(ref archive) = self.archive { - let archive = archive.clone(); - let owner_slug = owner_slug.clone(); - let repo_name = repo_name.to_string(); - let path = local_path.clone(); - let versions = Arc::clone(&self.versions); - tokio::spawn(async move { - match archive.upload(&owner_slug, &repo_name, &path).await { - Ok(Some(etag)) => { - versions - .lock() - .await - .insert(format!("{owner_slug}/{repo_name}"), etag); - } - Ok(None) => {} - Err(e) => { - warn!(repo = %repo_name, err = %e, "failed to upload new repo to storage"); - } - } - }); - } + // Upload the new repo synchronously under the advisory lock: a background + // upload could land the empty repo *after* a racing first push and clobber + // it, and a silent failure would leave the repo absent from storage. Fail + // closed instead — surface upload errors to the caller. + self.upload_under_lock(&owner_slug, repo_name, &local_path, false) + .await + .context("uploading new repo to storage")?; Ok(local_path) } /// Upload a repo to storage after a write operation (merge, fork, etc.). - /// Call this after any operation that modifies the git repo on disk. 
- pub async fn release_after_write(&self, owner_did: &str, repo_name: &str) { - if let Some(ref archive) = self.archive { - let (owner_slug, local_path) = match self.local_path(owner_did, repo_name) { - Ok(p) => p, - Err(e) => { - warn!(repo = %repo_name, err = %e, "rejected unsafe path in release_after_write"); - return; - } + /// Call this after any operation that modifies the git repo on disk. Returns + /// `Err` if the durable upload fails so the caller can surface it rather than + /// acking a write that never reached storage. + pub async fn release_after_write(&self, owner_did: &str, repo_name: &str) -> Result<()> { + let Some(ref archive) = self.archive else { + return Ok(()); + }; + let (owner_slug, local_path) = self + .local_path(owner_did, repo_name) + .context("rejected unsafe path in release_after_write")?; + let key = format!("{owner_slug}/{repo_name}"); + match archive.upload(&owner_slug, repo_name, &local_path).await { + Ok(Some(etag)) => { + self.versions.lock().await.insert(key, etag); + Ok(()) + } + Ok(None) => Ok(()), + Err(e) => { + // Invalidate so the next access re-downloads rather than trusting + // a local copy we failed to persist. + self.versions.lock().await.remove(&key); + Err(e).context("uploading repo to storage after write") + } + } + } + + /// Upload `local_path` to storage while holding the per-repo advisory lock, + /// so a background or init-time upload can't clobber a concurrent locked + /// write by landing an older snapshot after it. With `skip_if_exists`, skips + /// the upload when the archive is already present (used by lazy migration). 
+ async fn upload_under_lock( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + skip_if_exists: bool, + ) -> Result<()> { + let Some(ref archive) = self.archive else { + return Ok(()); + }; + let lock_key = advisory_lock_key(owner_slug, repo_name); + let mut conn = self + .lock_pool + .acquire() + .await + .context("acquiring db connection for upload lock")?; + + let mut acquired = false; + for attempt in 0..30 { + let row: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)") + .bind(lock_key) + .fetch_one(&mut *conn) + .await + .context("trying advisory lock")?; + if row.0 { + acquired = true; + break; + } + if attempt < 29 { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + if !acquired { + anyhow::bail!("could not acquire advisory lock for upload of {owner_slug}/{repo_name}"); + } + + let outcome: Result> = + if skip_if_exists && archive.exists(owner_slug, repo_name).await.unwrap_or(false) { + Ok(None) // already present — nothing to upload + } else { + archive.upload(owner_slug, repo_name, local_path).await }; - match archive.upload(&owner_slug, repo_name, &local_path).await { - Ok(Some(etag)) => { - self.versions - .lock() - .await - .insert(format!("{owner_slug}/{repo_name}"), etag); - } - Ok(None) => {} - Err(e) => { - warn!(repo = %repo_name, err = %e, "failed to upload repo to storage after write"); - } + + // Release the lock on the same connection regardless of outcome. + if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(lock_key) + .execute(&mut *conn) + .await + { + warn!(repo = %repo_name, err = %e, "failed to release upload lock"); + } + + match outcome { + Ok(Some(etag)) => { + self.versions + .lock() + .await + .insert(format!("{owner_slug}/{repo_name}"), etag); + Ok(()) } + Ok(None) => Ok(()), + Err(e) => Err(e).context("uploading repo to storage under lock"), } } @@ -496,32 +533,47 @@ impl RepoWriteGuard { /// concurrent writer on another machine cannot read a stale archive. 
When /// callers want a fast client ack, they spawn this future as a background /// task (write-back) — the lock + etag-cache update still complete in order. - pub async fn release(mut self, success: bool) { - // Upload to storage only on success. - if success { + pub async fn release(mut self, success: bool) -> Result<()> { + let key = format!("{}/{}", self.owner_slug, self.repo_name); + + // Upload to storage only on success. Capture the outcome so we can both + // release the lock unconditionally and propagate a durable-upload + // failure to the caller (a synchronous caller turns it into a client + // error; a write-back caller logs it). + let upload_result: Result<()> = if success { if let Some(ref archive) = self.archive { match archive .upload(&self.owner_slug, &self.repo_name, &self.local_path) .await { Ok(Some(etag)) => { - self.versions - .lock() - .await - .insert(format!("{}/{}", self.owner_slug, self.repo_name), etag); + self.versions.lock().await.insert(key.clone(), etag); + Ok(()) } - Ok(None) => {} + Ok(None) => Ok(()), Err(e) => { - warn!(repo = %self.repo_name, err = %e, "failed to upload repo to storage after write"); + // The durable copy is now stale. Drop the cached etag so + // the next write re-downloads and reconciles rather than + // trusting a local copy we failed to persist. + self.versions.lock().await.remove(&key); + Err(e).context("uploading repo to storage after write") } } + } else { + Ok(()) } } else { - warn!(repo = %self.repo_name, "write failed — skipping storage upload to avoid propagating an inconsistent repo"); - } + // Write failed: skip the upload (a half-applied repo must not reach + // storage) and invalidate the cached etag — the local copy may be + // dirty, so the next write must re-download instead of skipping on a + // now-misleading etag match. 
+ warn!(repo = %self.repo_name, "write failed — skipping storage upload and invalidating etag cache"); + self.versions.lock().await.remove(&key); + Ok(()) + }; - // Release the advisory lock on the same connection it was taken on, then - // drop the connection (returns it to the pool). + // Release the advisory lock on the same connection it was taken on + // regardless of the upload outcome, then drop the connection. if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") .bind(self.lock_key) .execute(&mut *self.conn) @@ -529,6 +581,8 @@ impl RepoWriteGuard { { warn!(repo = %self.repo_name, err = %e, "failed to release advisory lock"); } + + upload_result } } diff --git a/crates/gitlawb-node/src/storage/archive.rs b/crates/gitlawb-node/src/storage/archive.rs index fd32045..d5828a4 100644 --- a/crates/gitlawb-node/src/storage/archive.rs +++ b/crates/gitlawb-node/src/storage/archive.rs @@ -178,11 +178,32 @@ fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { let lock = publish_lock(local_path); let _publish = lock.lock().expect("publish lock poisoned"); let swap = (|| -> Result<()> { - if local_path.exists() { - std::fs::remove_dir_all(local_path).context("removing stale repo dir")?; + // Move any existing repo aside to a backup first, rather than deleting + // it up front: if the rename of the new copy then fails, we restore the + // backup so `local_path` is never left without a valid repo. (Most + // platforms refuse to rename onto a non-empty dir, hence the move-aside.) 
+ let backup = if local_path.exists() { + let b = parent.join(format!(".{file_name}.bak-{}", uuid::Uuid::new_v4())); + std::fs::rename(local_path, &b).context("moving existing repo to backup")?; + Some(b) + } else { + None + }; + match std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place") { + Ok(()) => { + if let Some(b) = backup { + let _ = std::fs::remove_dir_all(&b); + } + Ok(()) + } + Err(e) => { + // Restore the previous copy so the repo isn't left missing. + if let Some(b) = backup { + let _ = std::fs::rename(&b, local_path); + } + Err(e) + } } - std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place")?; - Ok(()) })(); if swap.is_err() { // Don't leak the extracted temp dir if the swap failed. diff --git a/crates/gitlawb-node/src/storage/fs.rs b/crates/gitlawb-node/src/storage/fs.rs index 972d2a2..9ba48d0 100644 --- a/crates/gitlawb-node/src/storage/fs.rs +++ b/crates/gitlawb-node/src/storage/fs.rs @@ -97,10 +97,12 @@ impl BlobStore for FsBlobStore { async fn head(&self, key: &str) -> Result> { let path = self.path_for(key)?; - match Self::meta_of(&path) { - Ok(m) => Ok(Some(m)), - Err(_) if !path.exists() => Ok(None), - Err(e) => Err(e), + // Probe existence by io error kind, not path.exists(): a permission/IO + // error must surface, not be silently reported as "not found". + match std::fs::metadata(&path) { + Ok(_) => Self::meta_of(&path).map(Some), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(format!("stat {}", path.display())), } } @@ -124,7 +126,11 @@ impl BlobStore for FsBlobStore { // reported as success would mislead GC/admin/migration callers. let rd = std::fs::read_dir(&dir) .with_context(|| format!("listing {}", dir.display()))?; - for entry in rd.flatten() { + for entry in rd { + // Propagate per-entry errors rather than dropping them via + // flatten(): a partial listing must not look like success. 
+ let entry = + entry.with_context(|| format!("reading entry under {}", dir.display()))?; let path = entry.path(); if path.is_dir() { stack.push(path);