diff --git a/Cargo.lock b/Cargo.lock index 1f3080b..2e2ab75 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3362,6 +3362,7 @@ dependencies = [ "async-compression", "async-graphql", "async-graphql-axum", + "async-trait", "aws-config", "aws-sdk-s3", "axum", diff --git a/crates/gitlawb-node/Cargo.toml b/crates/gitlawb-node/Cargo.toml index 881d4bd..29011c2 100644 --- a/crates/gitlawb-node/Cargo.toml +++ b/crates/gitlawb-node/Cargo.toml @@ -32,6 +32,7 @@ sqlx = { version = "0.8", features = ["postgres", "runtime-tokio-rustls", "chron clap = { version = "4", features = ["derive", "env"] } bytes = "1" libc = "0.2" +async-trait = "0.1" cid = { workspace = true } hex = { workspace = true } sha2 = { workspace = true } diff --git a/crates/gitlawb-node/src/api/issues.rs b/crates/gitlawb-node/src/api/issues.rs index 59b647c..c308dde 100644 --- a/crates/gitlawb-node/src/api/issues.rs +++ b/crates/gitlawb-node/src/api/issues.rs @@ -70,10 +70,17 @@ pub async fn create_issue( let create_result = git_issues::create_issue(&disk_path, &issue_id, &json_str); - // Always release the advisory lock — even on error; upload to Tigris only on success. - guard.release(create_result.is_ok()).await; + // Always release the advisory lock — even on error; upload to storage only on success. + let release_result = guard.release(create_result.is_ok()).await; create_result.map_err(|e| AppError::Git(e.to_string()))?; + // The write succeeded locally; surface a durable-upload failure rather than + // acking an issue that never reached storage. + release_result.map_err(|e| { + AppError::Git(format!( + "issue stored locally but durable upload failed: {e}" + )) + })?; // Bump trust score for the issue author — increment current score by 0.05 // (avoids the push_count=0 stuck-at-0.05 bug for agents who only file issues) @@ -229,11 +236,11 @@ pub async fn close_issue( .ok() .and_then(|i| i.author), Ok(None) => { - guard.release(false).await; + let _ = guard.release(false).await; return Err(AppError::NotFound(format!("issue {issue_id} not found"))); } Err(e) => { - guard.release(false).await; + let _ = guard.release(false).await; return Err(AppError::Git(e.to_string())); } }; @@ -242,7 +249,7 @@ pub async fn close_issue( .as_deref() .is_some_and(|a| crate::api::did_matches(&auth.0, a)); if !is_owner && !is_author { - guard.release(false).await; + let _ = guard.release(false).await; return Err(AppError::Forbidden( "only the repo owner or the issue author can close this issue".into(), )); @@ -250,12 +257,17 @@ pub async fn close_issue( let close_result = git_issues::close_issue(&disk_path, &issue_id); - // Always release the advisory lock — even on error; upload to Tigris only on success. - guard.release(close_result.is_ok()).await; + // Always release the advisory lock — even on error; upload to storage only on success. + let release_result = guard.release(close_result.is_ok()).await; let updated = close_result .map_err(|e| AppError::Git(e.to_string()))? .ok_or_else(|| AppError::RepoNotFound(format!("issue {issue_id} not found")))?; + release_result.map_err(|e| { + AppError::Git(format!( + "issue stored locally but durable upload failed: {e}" + )) + })?; let issue: serde_json::Value = serde_json::from_str(&updated) .map_err(|e| AppError::BadRequest(format!("invalid issue data: {e}")))?; diff --git a/crates/gitlawb-node/src/api/pulls.rs b/crates/gitlawb-node/src/api/pulls.rs index 26be610..fb940a3 100644 --- a/crates/gitlawb-node/src/api/pulls.rs +++ b/crates/gitlawb-node/src/api/pulls.rs @@ -224,10 +224,17 @@ pub async fn merge_pr( &pr.title, ); - // Always release the advisory lock — even on error; upload to Tigris only on success. - guard.release(merge_result.is_ok()).await; + // Always release the advisory lock — even on error; upload to storage only on success. + let release_result = guard.release(merge_result.is_ok()).await; let merge_sha = merge_result.map_err(|e| AppError::Git(e.to_string()))?; + // Surface a durable-upload failure rather than acking a merge that never + // reached storage. + release_result.map_err(|e| { + AppError::Git(format!( + "merge applied locally but durable upload failed: {e}" + )) + })?; state.db.merge_pr(&pr.id, &merger_did).await?; let _ = state.db.touch_repo(&record.id).await; diff --git a/crates/gitlawb-node/src/api/repos.rs b/crates/gitlawb-node/src/api/repos.rs index 47f9eaf..14d4c29 100644 --- a/crates/gitlawb-node/src/api/repos.rs +++ b/crates/gitlawb-node/src/api/repos.rs @@ -376,7 +376,7 @@ pub async fn git_info_refs( } } - // For receive-pack (push), download the latest from Tigris so the client + // For receive-pack (push), download the latest from storage so the client // sees the same refs that acquire_write() will operate on. let disk_path = if service == "git-receive-pack" { state @@ -549,9 +549,36 @@ pub async fn git_receive_pack( let receive_result = smart_http::receive_pack(&disk_path, body).await; // Always release the advisory lock — even on error — to prevent stale locks - // from blocking subsequent pushes. Only upload to Tigris when the push + // from blocking subsequent pushes. Only upload to storage when the push // succeeded; uploading a half-applied repo would propagate corruption. - guard.release(receive_result.is_ok()).await; + let push_ok = receive_result.is_ok(); + if push_ok && state.config.async_upload { + // Write-back: ack the client now; the durable upload to object storage + // and the advisory-lock release run in the background. The lock is held + // until the upload finishes, so a concurrent writer on another machine + // can't observe a stale archive. Durability tradeoff: if the node stops + // after this ack but before the upload, the local copy survives but + // STORAGE stays stale until this repo's next successful push re-uploads + // — it is not automatically reconciled. Hence async_upload is opt-in. + let repo_label = name.to_string(); + tokio::spawn(async move { + if let Err(e) = guard.release(true).await { + tracing::error!(repo = %repo_label, err = %e, + "write-back durable upload failed after push was acked"); + } + }); + } else { + // Strict path (or failed push): upload-before-ack. On a successful push + // whose durable upload then fails, surface an error so the client knows + // the push is not durably stored (a failed push returns Ok from release + // and the push error is reported below). + if let Err(e) = guard.release(push_ok).await { + tracing::error!(repo = %name, err = %e, "durable upload failed after push"); + return Err(AppError::Git(format!( + "push applied locally but durable upload to storage failed: {e}" + ))); + } + } let result = receive_result.map_err(|e| { tracing::error!(repo = %name, err = %e, "git receive-pack failed"); @@ -1130,7 +1157,7 @@ pub async fn fork_repo( ))); } - // Ensure source repo is on local disk (downloads from Tigris on cache miss) + // Ensure source repo is on local disk (downloads from storage on cache miss) let source_path = state .repo_store .acquire(&source.owner_did, &source.name) @@ -1157,11 +1184,17 @@ pub async fn fork_repo( ))); } - // Upload fork to Tigris + // Upload fork to storage — fail closed if the durable upload fails rather + // than reporting a fork that only exists on this node's local disk. state .repo_store .release_after_write(&forker_did, &fork_name) - .await; + .await + .map_err(|e| { + AppError::Git(format!( + "fork created locally but durable upload failed: {e}" + )) + })?; let now = Utc::now(); let record = crate::db::RepoRecord { diff --git a/crates/gitlawb-node/src/config.rs b/crates/gitlawb-node/src/config.rs index 929471a..17be97f 100644 --- a/crates/gitlawb-node/src/config.rs +++ b/crates/gitlawb-node/src/config.rs @@ -121,11 +121,46 @@ pub struct Config { #[arg(long, env = "GITLAWB_HEARTBEAT_INTERVAL_HOURS", default_value_t = 20)] pub heartbeat_interval_hours: u64, - /// Tigris (S3-compatible) bucket for repo storage. + /// Tigris (S3-compatible) bucket for repo storage. Legacy alias for + /// `s3_bucket` — still honoured so existing deployments keep working. /// Leave empty to disable Tigris and use local-only storage. #[arg(long, env = "GITLAWB_TIGRIS_BUCKET", default_value = "")] pub tigris_bucket: String, + /// Object-storage backend: `s3` (any S3-compatible service), `fs` (local + /// directory), or `ipfs` (Kubo MFS). Empty = auto-detect: `s3` when a bucket + /// is set, else `fs` when a storage dir is set, else `ipfs` when an IPFS API + /// is set, else local-only. + #[arg(long, env = "GITLAWB_STORAGE_BACKEND", default_value = "")] + pub storage_backend: String, + + /// Bucket for the `s3` backend (Tigris, R2, AWS S3, MinIO, B2). Falls back to + /// `tigris_bucket` when empty. + #[arg(long, env = "GITLAWB_S3_BUCKET", default_value = "")] + pub s3_bucket: String, + + /// Endpoint URL override for the `s3` backend (e.g. R2/MinIO). On Tigris/Fly + /// the endpoint is auto-provided via `AWS_ENDPOINT_URL_S3`, so leave empty. + #[arg(long, env = "GITLAWB_S3_ENDPOINT", default_value = "")] + pub s3_endpoint: String, + + /// Force path-style S3 addressing (required by MinIO and some S3-compatibles). + #[arg(long, env = "GITLAWB_S3_FORCE_PATH_STYLE", default_value_t = false)] + pub s3_force_path_style: bool, + + /// Directory for the `fs` (local filesystem) storage backend. + #[arg(long, env = "GITLAWB_STORAGE_FS_DIR", default_value = "")] + pub storage_fs_dir: String, + + /// Acknowledge a push to the client before the durable upload to object + /// storage finishes (write-back). Lowers push latency, but opens a + /// durability window: if the node stops between the ack and the upload, on + /// restart a stale remote archive can overwrite the newer local copy. Off + /// by default (strict upload-before-ack); opt in only where the latency win + /// is worth that risk. + #[arg(long, env = "GITLAWB_ASYNC_UPLOAD", default_value_t = false)] + pub async_upload: bool, + /// Maximum pack body size for git-receive-pack and git-upload-pack, in bytes. /// Applies only to git smart-HTTP routes — all other API routes keep the 2 MB default. /// Default: 2 GB. Set lower on resource-constrained nodes. diff --git a/crates/gitlawb-node/src/db/mod.rs b/crates/gitlawb-node/src/db/mod.rs index cfb289e..86a3dc8 100644 --- a/crates/gitlawb-node/src/db/mod.rs +++ b/crates/gitlawb-node/src/db/mod.rs @@ -246,11 +246,6 @@ pub struct Db { } impl Db { - /// Access the underlying Postgres connection pool. - pub fn pool(&self) -> &PgPool { - &self.pool - } - #[cfg(test)] pub fn for_testing(pool: PgPool) -> Self { Self { pool } diff --git a/crates/gitlawb-node/src/git/mod.rs b/crates/gitlawb-node/src/git/mod.rs index 49259d5..a1d2687 100644 --- a/crates/gitlawb-node/src/git/mod.rs +++ b/crates/gitlawb-node/src/git/mod.rs @@ -2,5 +2,4 @@ pub mod issues; pub mod repo_store; pub mod smart_http; pub mod store; -pub mod tigris; pub mod visibility_pack; diff --git a/crates/gitlawb-node/src/git/repo_store.rs b/crates/gitlawb-node/src/git/repo_store.rs index a5c367e..6a495a5 100644 --- a/crates/gitlawb-node/src/git/repo_store.rs +++ b/crates/gitlawb-node/src/git/repo_store.rs @@ -1,35 +1,49 @@ -//! Centralized repo storage layer — local disk cache backed by Tigris (S3). +//! Centralized repo storage layer — local disk cache backed by a pluggable +//! object store (S3-compatible / filesystem / IPFS) via [`RepoArchive`]. //! //! Every handler that needs access to a git repo on disk goes through `RepoStore`: //! -//! - `acquire()` — ensures the repo is on local disk (downloads from Tigris on cache miss). -//! - `release_after_write()` — uploads the updated repo to Tigris after a write operation. -//! - `init()` — creates a new bare repo locally and uploads to Tigris. +//! - `acquire()` — ensures the repo is on local disk (downloads on cache miss). +//! - `acquire_write()` — write lock + ensures local matches storage (skips the +//! download when the cached etag already matches — the push-latency win). +//! - `release()` / `release_after_write()` — upload the updated repo to storage. +//! - `init()` — creates a new bare repo locally and uploads to storage. //! -//! When Tigris is disabled (bucket empty), this is a simple passthrough to local disk. +//! When no backend is configured, this is a simple passthrough to local disk. -use std::collections::HashSet; +use std::collections::{HashMap, HashSet}; use std::path::{Path, PathBuf}; use std::sync::Arc; use anyhow::{Context, Result}; -use sqlx::PgPool; +use sqlx::pool::PoolConnection; +use sqlx::{PgPool, Postgres}; use tokio::sync::Mutex; -use tracing::{debug, info, warn}; +use tracing::{debug, warn}; use super::store; -use super::tigris::TigrisClient; +use crate::storage::archive::RepoArchive; -/// Centralized repo storage: local disk cache + optional Tigris backend. +/// Centralized repo storage: local disk cache + optional object-storage backend +/// (S3-compatible / filesystem / IPFS) behind the [`RepoArchive`] layer. #[derive(Clone)] pub struct RepoStore { repos_dir: PathBuf, - tigris: Option, - /// Shared Postgres pool for advisory locks. - pool: PgPool, - /// Tracks repos already confirmed to exist in Tigris — avoids redundant + archive: Option, + /// Bounded pool dedicated to advisory-lock connections — the only DB pool + /// RepoStore needs, as it touches Postgres solely for advisory locks. A push + /// pins one connection for its whole lifetime (the lock is held across both + /// receive-pack and the upload), so a separate budget keeps a burst of + /// concurrent/slow pushes from draining the handler pool and starving every + /// other DB handler. + lock_pool: PgPool, + /// Tracks repos already confirmed to exist in storage — avoids redundant /// HEAD checks and background uploads for repos we've already migrated. migrated: Arc>>, + /// Last-known archive etag per `owner_slug/repo` key. Lets a write skip the + /// pre-write download when our local copy already matches storage (the + /// common case under sticky routing) — the main push-latency win. + versions: Arc>>, } impl RepoStore { @@ -37,80 +51,146 @@ impl RepoStore { pub fn for_testing(repos_dir: PathBuf, pool: PgPool) -> Self { Self { repos_dir, - tigris: None, - pool, - migrated: Arc::new(tokio::sync::Mutex::new(std::collections::HashSet::new())), + archive: None, + lock_pool: pool, + migrated: Arc::new(Mutex::new(HashSet::new())), + versions: Arc::new(Mutex::new(HashMap::new())), } } - pub fn new(repos_dir: PathBuf, tigris: Option, pool: PgPool) -> Self { + /// `lock_pool` is a bounded pool that advisory-lock connections are pinned + /// from, kept separate from the handler pool so push concurrency can't + /// drain it. + pub fn new(repos_dir: PathBuf, archive: Option, lock_pool: PgPool) -> Self { Self { repos_dir, - tigris, - pool, + archive, + lock_pool, migrated: Arc::new(Mutex::new(HashSet::new())), + versions: Arc::new(Mutex::new(HashMap::new())), } } - /// Ensure a repo is available on local disk, downloading from Tigris if needed. - /// If the repo exists locally but not yet in Tigris, a background upload is - /// spawned to lazily migrate it (on-demand migration for pre-Tigris repos). + /// Ensure the local copy matches storage, skipping the download when our + /// cached etag already equals the current archive etag. + /// + /// `require_fresh` selects the failure policy: + /// - `false` (read path, `acquire_fresh`): self-heal — if a storage HEAD or + /// download fails but a valid local copy exists, use it; a later upload + /// re-syncs storage. + /// - `true` (write path, `acquire_write`): fail closed — never fall back to + /// a possibly-stale local copy. The remote etag differs (remote is newer), + /// so uploading our stale copy after the write would clobber it (lost + /// update). Propagate the error so the write is rejected instead. + async fn sync_down_if_stale( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + require_fresh: bool, + ) -> Result<()> { + let Some(ref archive) = self.archive else { + return Ok(()); + }; + let key = format!("{owner_slug}/{repo_name}"); + + let remote_etag = match archive.head_etag(owner_slug, repo_name).await { + Ok(Some(etag)) => etag, + Ok(None) => return Ok(()), // not in storage yet — local is authoritative + Err(e) => { + // HEAD failed. Read path: fall back to a valid local copy if we + // have one. Write path: fail closed (see `require_fresh`). + if !require_fresh && local_path.exists() { + warn!(repo = %repo_name, err = %e, "storage head failed — using local copy"); + return Ok(()); + } + return Err(e).context("storage head before access"); + } + }; + + if local_path.exists() { + let known = self.versions.lock().await.get(&key).cloned(); + if known.as_deref() == Some(remote_etag.as_str()) { + debug!(repo = %repo_name, "local copy current (etag match) — skipping download"); + return Ok(()); + } + } + + match archive.download(owner_slug, repo_name, local_path).await { + Ok(()) => { + self.versions.lock().await.insert(key, remote_etag); + Ok(()) + } + Err(e) => { + // Read path self-heal only: a corrupt/unreadable archive must not + // block access when a valid local copy exists. On the write path + // the remote etag differs (remote is newer), so falling back and + // later uploading our stale copy would clobber it — fail closed. + if !require_fresh && local_path.exists() { + warn!(repo = %repo_name, err = %e, + "archive download failed — falling back to local copy"); + Ok(()) + } else { + Err(e).context("downloading repo archive") + } + } + } + } + + /// Ensure a repo is available on local disk, downloading from storage if needed. + /// If the repo exists locally but not yet in storage, a background upload is + /// spawned to lazily migrate it (on-demand migration for pre-storage repos). /// Returns the local path to the bare repo. pub async fn acquire(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; // Fast path: repo exists locally if local_path.exists() { - // Lazy migration: if Tigris is enabled and we haven't confirmed this - // repo is in Tigris yet, check and upload in the background. - if let Some(ref tigris) = self.tigris { + // Lazy migration: if storage is enabled and we haven't confirmed this + // repo is in storage yet, check and upload in the background. + if self.archive.is_some() { let key = format!("{owner_slug}/{repo_name}"); let already_migrated = self.migrated.lock().await.contains(&key); if !already_migrated { - let tigris = tigris.clone(); + let this = self.clone(); let slug = owner_slug.clone(); let name = repo_name.to_string(); let path = local_path.clone(); - let migrated = Arc::clone(&self.migrated); + let key = key.clone(); tokio::spawn(async move { - // Check if already in Tigris before uploading - match tigris.exists(&slug, &name).await { - Ok(true) => { - debug!(repo = %name, "repo already in tigris — skipping migration"); - } - Ok(false) => { - info!(repo = %name, "migrating local repo to tigris"); - if let Err(e) = tigris.upload(&slug, &name, &path).await { - warn!(repo = %name, err = %e, "lazy migration to tigris failed"); - return; - } - info!(repo = %name, "lazy migration to tigris complete"); + // Upload under the advisory lock (skip if already present) + // so this opportunistic migration can't clobber a + // concurrent locked push by landing a stale snapshot. + match this.upload_under_lock(&slug, &name, &path, true).await { + Ok(()) => { + this.migrated.lock().await.insert(key); + debug!(repo = %name, "lazy migration to storage complete (or already present)"); } Err(e) => { - warn!(repo = %name, err = %e, "tigris existence check failed"); - return; + warn!(repo = %name, err = %e, "lazy migration to storage failed"); } } - migrated.lock().await.insert(format!("{slug}/{name}")); }); } } return Ok(local_path); } - // Try downloading from Tigris - if let Some(ref tigris) = self.tigris { - if tigris.exists(&owner_slug, repo_name).await.unwrap_or(false) { - debug!(repo = %repo_name, "cache miss — downloading from tigris"); - tigris + // Try downloading from storage + if let Some(ref archive) = self.archive { + if let Some(remote_etag) = archive + .head_etag(&owner_slug, repo_name) + .await + .context("checking storage for repo")? + { + debug!(repo = %repo_name, "cache miss — downloading from storage"); + archive .download(&owner_slug, repo_name, &local_path) .await - .context("downloading repo from tigris")?; - // Mark as migrated since we just downloaded it - self.migrated - .lock() - .await - .insert(format!("{owner_slug}/{repo_name}")); + .context("downloading repo from storage")?; + let key = format!("{owner_slug}/{repo_name}"); + self.migrated.lock().await.insert(key.clone()); + self.versions.lock().await.insert(key, remote_etag); return Ok(local_path); } } @@ -120,34 +200,14 @@ impl RepoStore { Ok(local_path) } - /// Ensure a repo is available on local disk with the **latest** Tigris state. + /// Ensure a repo is available on local disk with the **latest** storage state. /// Use this for operations that precede a write (e.g. `info/refs` for /// `git-receive-pack`) so the client sees the same refs that `acquire_write()` /// will operate on. pub async fn acquire_fresh(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; - - if let Some(ref tigris) = self.tigris { - if tigris.exists(&owner_slug, repo_name).await.unwrap_or(false) { - debug!(repo = %repo_name, "acquire_fresh: downloading latest from tigris"); - if let Err(e) = tigris.download(&owner_slug, repo_name, &local_path).await { - // The Tigris archive is present (HEAD ok) but unreadable — a - // corrupt/partial upload, or a transient GET failure. If we have a - // valid local copy, proceed with it rather than blocking the write; - // the post-write upload re-syncs (self-heals) Tigris. Only hard-fail - // when there is no local copy to fall back to. - if local_path.exists() { - warn!(repo = %repo_name, err = %e, - "acquire_fresh: tigris download failed — falling back to local copy"); - return Ok(local_path); - } - return Err(e).context("downloading repo from tigris (fresh)"); - } - return Ok(local_path); - } - } - - // Tigris disabled or repo not in Tigris — fall back to local + self.sync_down_if_stale(&owner_slug, repo_name, &local_path, false) + .await?; Ok(local_path) } @@ -157,13 +217,25 @@ impl RepoStore { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; let lock_key = advisory_lock_key(&owner_slug, repo_name); - // Acquire Postgres advisory lock with retry using pg_try_advisory_lock - // to avoid blocking indefinitely on stale locks from crashed connections. + // Postgres advisory locks are SESSION-scoped: they bind to one backend + // connection and only release on that same connection. With a pool, + // acquiring and releasing on different checked-out connections means the + // unlock silently no-ops while the lock lingers on the original. So we + // pin ONE connection for the whole lock lifetime — acquire, release on + // sync error, and the final release in the guard all run on it. + let mut conn = self + .lock_pool + .acquire() + .await + .context("acquiring db connection for advisory lock")?; + + // Acquire with retry using pg_try_advisory_lock to avoid blocking + // indefinitely on stale locks from crashed connections. let mut acquired = false; for attempt in 0..60 { let row: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)") .bind(lock_key) - .fetch_one(&self.pool) + .fetch_one(&mut *conn) .await .context("trying advisory lock")?; if row.0 { @@ -178,23 +250,25 @@ impl RepoStore { anyhow::bail!("could not acquire advisory lock after 60s — possible stale lock for {owner_slug}/{repo_name}"); } - // Always download the latest from Tigris before writing. - // Local disk may be stale if another machine pushed since our last access. - if let Some(ref tigris) = self.tigris { - if tigris.exists(&owner_slug, repo_name).await.unwrap_or(false) { - debug!(repo = %repo_name, "write acquire: downloading latest from tigris"); - if let Err(e) = tigris.download(&owner_slug, repo_name, &local_path).await { - // Same self-healing fallback as acquire_fresh: a corrupt/unreadable - // Tigris archive must not block a write when a valid local copy - // exists — release(success) will re-upload a good archive. - if local_path.exists() { - warn!(repo = %repo_name, err = %e, - "write acquire: tigris download failed — falling back to local copy"); - } else { - return Err(e).context("downloading repo from tigris for write"); - } - } + // Ensure local matches the latest in storage before writing. The etag + // cache skips the full download when our copy is already current (the + // common single-machine case under sticky routing); a stale copy (another + // machine pushed since) still triggers a download. The advisory lock above + // serializes this so the post-write upload can't race a concurrent writer. + if let Err(e) = self + .sync_down_if_stale(&owner_slug, repo_name, &local_path, true) + .await + { + // Release the lock on the SAME connection before bailing. + if let Err(unlock_err) = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(lock_key) + .execute(&mut *conn) + .await + { + warn!(repo = %repo_name, err = %unlock_err, + "failed to release advisory lock after sync error"); } + return Err(e); } Ok(RepoWriteGuard { @@ -202,47 +276,122 @@ impl RepoStore { repo_name: repo_name.to_string(), local_path, lock_key, - pool: self.pool.clone(), - tigris: self.tigris.clone(), + conn, + archive: self.archive.clone(), + versions: Arc::clone(&self.versions), }) } - /// Initialize a new bare repo on local disk and upload to Tigris. + /// Initialize a new bare repo on local disk and upload to storage. pub async fn init(&self, owner_did: &str, repo_name: &str) -> Result { let (owner_slug, local_path) = self.local_path(owner_did, repo_name)?; store::init_bare(&local_path).context("initializing bare repo")?; - // Upload to Tigris in background - if let Some(ref tigris) = self.tigris { - let tigris = tigris.clone(); - let owner_slug = owner_slug.clone(); - let repo_name = repo_name.to_string(); - let path = local_path.clone(); - tokio::spawn(async move { - if let Err(e) = tigris.upload(&owner_slug, &repo_name, &path).await { - warn!(repo = %repo_name, err = %e, "failed to upload new repo to tigris"); - } - }); - } + // Upload the new repo synchronously under the advisory lock: a background + // upload could land the empty repo *after* a racing first push and clobber + // it, and a silent failure would leave the repo absent from storage. Fail + // closed instead — surface upload errors to the caller. + self.upload_under_lock(&owner_slug, repo_name, &local_path, false) + .await + .context("uploading new repo to storage")?; Ok(local_path) } - /// Upload a repo to Tigris after a write operation (push, merge, fork, etc.). - /// Call this after any operation that modifies the git repo on disk. - pub async fn release_after_write(&self, owner_did: &str, repo_name: &str) { - if let Some(ref tigris) = self.tigris { - let (owner_slug, local_path) = match self.local_path(owner_did, repo_name) { - Ok(p) => p, - Err(e) => { - warn!(repo = %repo_name, err = %e, "rejected unsafe path in release_after_write"); - return; - } + /// Upload a repo to storage after a write operation (merge, fork, etc.). + /// Call this after any operation that modifies the git repo on disk. Returns + /// `Err` if the durable upload fails so the caller can surface it rather than + /// acking a write that never reached storage. + pub async fn release_after_write(&self, owner_did: &str, repo_name: &str) -> Result<()> { + let Some(ref archive) = self.archive else { + return Ok(()); + }; + let (owner_slug, local_path) = self + .local_path(owner_did, repo_name) + .context("rejected unsafe path in release_after_write")?; + let key = format!("{owner_slug}/{repo_name}"); + match archive.upload(&owner_slug, repo_name, &local_path).await { + Ok(Some(etag)) => { + self.versions.lock().await.insert(key, etag); + Ok(()) + } + Ok(None) => Ok(()), + Err(e) => { + // Invalidate so the next access re-downloads rather than trusting + // a local copy we failed to persist. + self.versions.lock().await.remove(&key); + Err(e).context("uploading repo to storage after write") + } + } + } + + /// Upload `local_path` to storage while holding the per-repo advisory lock, + /// so a background or init-time upload can't clobber a concurrent locked + /// write by landing an older snapshot after it. With `skip_if_exists`, skips + /// the upload when the archive is already present (used by lazy migration). + async fn upload_under_lock( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + skip_if_exists: bool, + ) -> Result<()> { + let Some(ref archive) = self.archive else { + return Ok(()); + }; + let lock_key = advisory_lock_key(owner_slug, repo_name); + let mut conn = self + .lock_pool + .acquire() + .await + .context("acquiring db connection for upload lock")?; + + let mut acquired = false; + for attempt in 0..30 { + let row: (bool,) = sqlx::query_as("SELECT pg_try_advisory_lock($1)") + .bind(lock_key) + .fetch_one(&mut *conn) + .await + .context("trying advisory lock")?; + if row.0 { + acquired = true; + break; + } + if attempt < 29 { + tokio::time::sleep(std::time::Duration::from_secs(1)).await; + } + } + if !acquired { + anyhow::bail!("could not acquire advisory lock for upload of {owner_slug}/{repo_name}"); + } + + let outcome: Result> = + if skip_if_exists && archive.exists(owner_slug, repo_name).await.unwrap_or(false) { + Ok(None) // already present — nothing to upload + } else { + archive.upload(owner_slug, repo_name, local_path).await }; - if let Err(e) = tigris.upload(&owner_slug, repo_name, &local_path).await { - warn!(repo = %repo_name, err = %e, "failed to upload repo to tigris after write"); + + // Release the lock on the same connection regardless of outcome. + if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(lock_key) + .execute(&mut *conn) + .await + { + warn!(repo = %repo_name, err = %e, "failed to release upload lock"); + } + + match outcome { + Ok(Some(etag)) => { + self.versions + .lock() + .await + .insert(format!("{owner_slug}/{repo_name}"), etag); + Ok(()) } + Ok(None) => Ok(()), + Err(e) => Err(e).context("uploading repo to storage under lock"), } } @@ -348,14 +497,24 @@ fn validate_repo_name(repo_name: &str) -> Result<()> { } /// Guard returned by `acquire_write()`. Holds the Postgres advisory lock and -/// uploads to Tigris + releases the lock on `release()`. +/// uploads to storage + releases the lock on `release()`. +/// +/// `#[must_use]`: dropping the guard without calling `release()` returns its +/// connection to the pool with the *session* advisory lock still held, which +/// then lingers until that backend connection is evicted — so the lock must be +/// released explicitly. +#[must_use = "call release() — dropping the guard leaks the advisory lock onto the pooled connection"] pub struct RepoWriteGuard { owner_slug: String, repo_name: String, pub local_path: PathBuf, lock_key: i64, - pool: PgPool, - tigris: Option, + /// The connection the session-scoped advisory lock was taken on. The lock + /// must be released on this same connection, so it's held for the guard's + /// lifetime and dropped (returned to the pool) only after `release()`. + conn: PoolConnection, + archive: Option, + versions: Arc>>, } impl RepoWriteGuard { @@ -364,31 +523,66 @@ impl RepoWriteGuard { &self.local_path } - /// Upload to Tigris (only when the write succeeded) and release the advisory + /// Upload to storage (only when the write succeeded) and release the advisory /// lock. Pass `success = false` when the write operation failed — uploading a /// half-applied or otherwise inconsistent repo would propagate corruption to - /// Tigris (and to every node that later downloads it). The lock is always + /// storage (and to every node that later downloads it). The lock is always /// released regardless, to avoid stale locks blocking future writes. - pub async fn release(self, success: bool) { - // Upload to Tigris only on success. - if success { - if let Some(ref tigris) = self.tigris { - if let Err(e) = tigris + /// + /// IMPORTANT: the advisory lock is held until the upload finishes, so a + /// concurrent writer on another machine cannot read a stale archive. When + /// callers want a fast client ack, they spawn this future as a background + /// task (write-back) — the lock + etag-cache update still complete in order. + pub async fn release(mut self, success: bool) -> Result<()> { + let key = format!("{}/{}", self.owner_slug, self.repo_name); + + // Upload to storage only on success. Capture the outcome so we can both + // release the lock unconditionally and propagate a durable-upload + // failure to the caller (a synchronous caller turns it into a client + // error; a write-back caller logs it). + let upload_result: Result<()> = if success { + if let Some(ref archive) = self.archive { + match archive .upload(&self.owner_slug, &self.repo_name, &self.local_path) .await { - warn!(repo = %self.repo_name, err = %e, "failed to upload repo to tigris after write"); + Ok(Some(etag)) => { + self.versions.lock().await.insert(key.clone(), etag); + Ok(()) + } + Ok(None) => Ok(()), + Err(e) => { + // The durable copy is now stale. Drop the cached etag so + // the next write re-downloads and reconciles rather than + // trusting a local copy we failed to persist. + self.versions.lock().await.remove(&key); + Err(e).context("uploading repo to storage after write") + } } + } else { + Ok(()) } } else { - warn!(repo = %self.repo_name, "write failed — skipping tigris upload to avoid propagating an inconsistent repo"); + // Write failed: skip the upload (a half-applied repo must not reach + // storage) and invalidate the cached etag — the local copy may be + // dirty, so the next write must re-download instead of skipping on a + // now-misleading etag match. + warn!(repo = %self.repo_name, "write failed — skipping storage upload and invalidating etag cache"); + self.versions.lock().await.remove(&key); + Ok(()) + }; + + // Release the advisory lock on the same connection it was taken on + // regardless of the upload outcome, then drop the connection. + if let Err(e) = sqlx::query("SELECT pg_advisory_unlock($1)") + .bind(self.lock_key) + .execute(&mut *self.conn) + .await + { + warn!(repo = %self.repo_name, err = %e, "failed to release advisory lock"); } - // Release advisory lock - let _ = sqlx::query("SELECT pg_advisory_unlock($1)") - .bind(self.lock_key) - .execute(&self.pool) - .await; + upload_result } } @@ -562,4 +756,102 @@ mod tests { ); } } + + // ── sync_down_if_stale (fs-backed archive, lazy pool) ────────────────── + + /// A RepoStore over an fs-backed archive. `sync_down_if_stale` never touches + /// the pool, so a lazy (never-connected) pool is fine. + fn store_with_fs_archive(repos_dir: PathBuf, store_root: &Path) -> RepoStore { + let blob: Arc = + Arc::new(crate::storage::fs::FsBlobStore::new(store_root).unwrap()); + let archive = crate::storage::archive::RepoArchive::new(blob); + let pool = sqlx::PgPool::connect_lazy("postgres://invalid").unwrap(); + RepoStore::new(repos_dir, Some(archive), pool) + } + + #[tokio::test] + async fn sync_down_if_stale_downloads_then_skips_on_etag_match() { + let store_root = tempfile::tempdir().unwrap(); + let repos_dir = tempfile::tempdir().unwrap(); + let store = store_with_fs_archive(repos_dir.path().to_path_buf(), store_root.path()); + + // Seed the archive with a repo. + let seed = tempfile::tempdir().unwrap(); + std::fs::write(seed.path().join("HEAD"), b"v1\n").unwrap(); + store + .archive + .as_ref() + .unwrap() + .upload("owner", "repo", seed.path()) + .await + .unwrap(); + + let local = repos_dir.path().join("owner").join("repo.git"); + + // First call downloads. + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .unwrap(); + assert_eq!(std::fs::read(local.join("HEAD")).unwrap(), b"v1\n"); + + // Locally mutate, then sync again: the cached etag still matches the + // remote, so the download is skipped and our local edit survives. + std::fs::write(local.join("HEAD"), b"LOCAL-EDIT\n").unwrap(); + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .unwrap(); + assert_eq!( + std::fs::read(local.join("HEAD")).unwrap(), + b"LOCAL-EDIT\n", + "etag match must skip the download (local copy preserved)" + ); + } + + #[tokio::test] + async fn sync_down_if_stale_require_fresh_fails_closed_on_bad_remote() { + let store_root = tempfile::tempdir().unwrap(); + let repos_dir = tempfile::tempdir().unwrap(); + let store = store_with_fs_archive(repos_dir.path().to_path_buf(), store_root.path()); + + let seed = tempfile::tempdir().unwrap(); + std::fs::write(seed.path().join("HEAD"), b"v1\n").unwrap(); + store + .archive + .as_ref() + .unwrap() + .upload("owner", "repo", seed.path()) + .await + .unwrap(); + + let local = repos_dir.path().join("owner").join("repo.git"); + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .unwrap(); + + // Corrupt the stored archive: HEAD now succeeds with a *new* etag (so the + // cache no longer matches and a download is forced), but the download + // decompresses garbage and fails. + let blob_path = store_root.path().join("repos/v1/owner/repo.tar.zst"); + std::fs::write(&blob_path, b"corrupted not-a-tar-zst").unwrap(); + + // Write path: must fail closed rather than fall back to the stale local + // copy (which a later upload would use to clobber the newer remote). + assert!( + store + .sync_down_if_stale("owner", "repo", &local, true) + .await + .is_err(), + "require_fresh=true must propagate the download error" + ); + + // Read path: self-heals — falls back to the valid local copy. + store + .sync_down_if_stale("owner", "repo", &local, false) + .await + .expect("require_fresh=false must fall back to the local copy"); + assert_eq!(std::fs::read(local.join("HEAD")).unwrap(), b"v1\n"); + } } diff --git a/crates/gitlawb-node/src/git/tigris.rs b/crates/gitlawb-node/src/git/tigris.rs deleted file mode 100644 index ad26ddc..0000000 --- a/crates/gitlawb-node/src/git/tigris.rs +++ /dev/null @@ -1,226 +0,0 @@ -//! Tigris (S3-compatible) storage client for git bare repos. -//! -//! Repos are stored as `repos/v1/{owner_slug}/{repo_name}.tar.zst` — a -//! zstd-compressed tar archive of the bare repo directory. - -use std::collections::HashMap; -use std::path::{Path, PathBuf}; -use std::sync::{Arc, Mutex, OnceLock}; - -use anyhow::{Context, Result}; -use aws_sdk_s3::Client as S3Client; -use tracing::{debug, info}; - -/// Wrapper around the S3 client with the configured bucket. -#[derive(Clone)] -pub struct TigrisClient { - s3: S3Client, - bucket: String, -} - -impl TigrisClient { - /// Create a new client. Uses AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, and - /// AWS_ENDPOINT_URL_S3 env vars — all set automatically by Fly for Tigris buckets. - pub async fn new(bucket: &str) -> Result { - let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; - let s3 = S3Client::new(&config); - info!(bucket = %bucket, "tigris storage client initialized"); - Ok(Self { - s3, - bucket: bucket.to_string(), - }) - } - - /// S3 key for a given repo: `repos/v1/{owner_slug}/{repo_name}.tar.zst` - fn repo_key(owner_slug: &str, repo_name: &str) -> String { - format!("repos/v1/{owner_slug}/{repo_name}.tar.zst") - } - - /// Check if a repo archive exists in Tigris. - pub async fn exists(&self, owner_slug: &str, repo_name: &str) -> Result { - let key = Self::repo_key(owner_slug, repo_name); - match self - .s3 - .head_object() - .bucket(&self.bucket) - .key(&key) - .send() - .await - { - Ok(_) => Ok(true), - Err(e) => { - if e.as_service_error().is_some_and(|e| e.is_not_found()) { - Ok(false) - } else { - Err(anyhow::anyhow!("tigris HEAD {key}: {e}")) - } - } - } - } - - /// Upload a local bare repo directory to Tigris as a tar.zst archive. - pub async fn upload(&self, owner_slug: &str, repo_name: &str, local_path: &Path) -> Result<()> { - let key = Self::repo_key(owner_slug, repo_name); - debug!(key = %key, path = %local_path.display(), "uploading repo to tigris"); - - // Create tar.zst in memory - let archive_bytes = tokio::task::spawn_blocking({ - let local_path = local_path.to_path_buf(); - move || compress_repo(&local_path) - }) - .await - .context("tar task panicked")? - .context("compressing repo")?; - - let body = aws_sdk_s3::primitives::ByteStream::from(archive_bytes); - - self.s3 - .put_object() - .bucket(&self.bucket) - .key(&key) - .body(body) - .content_type("application/zstd") - .send() - .await - .context(format!("tigris PUT {key}"))?; - - info!(key = %key, "uploaded repo to tigris"); - Ok(()) - } - - /// Download a repo archive from Tigris and extract to local disk. - pub async fn download( - &self, - owner_slug: &str, - repo_name: &str, - local_path: &Path, - ) -> Result<()> { - let key = Self::repo_key(owner_slug, repo_name); - debug!(key = %key, path = %local_path.display(), "downloading repo from tigris"); - - let resp = self - .s3 - .get_object() - .bucket(&self.bucket) - .key(&key) - .send() - .await - .context(format!("tigris GET {key}"))?; - - let data = resp - .body - .collect() - .await - .context("reading tigris response body")? - .into_bytes(); - - // Extract tar.zst to local path - tokio::task::spawn_blocking({ - let local_path = local_path.to_path_buf(); - move || decompress_repo(&data, &local_path) - }) - .await - .context("extract task panicked")? - .context("extracting repo")?; - - info!(key = %key, path = %local_path.display(), "downloaded repo from tigris"); - Ok(()) - } - - /// Delete a repo archive from Tigris. - #[allow(dead_code)] - pub async fn delete(&self, owner_slug: &str, repo_name: &str) -> Result<()> { - let key = Self::repo_key(owner_slug, repo_name); - self.s3 - .delete_object() - .bucket(&self.bucket) - .key(&key) - .send() - .await - .context(format!("tigris DELETE {key}"))?; - Ok(()) - } -} - -/// Compress a bare repo directory into a tar.zst byte vector. -fn compress_repo(repo_path: &Path) -> Result> { - let buf = Vec::new(); - let encoder = zstd::stream::Encoder::new(buf, 3)?; // level 3 = fast + decent ratio - let mut tar = tar::Builder::new(encoder); - - // Append the bare repo directory contents (not the directory itself) - tar.append_dir_all(".", repo_path) - .context("building tar archive")?; - - let encoder = tar.into_inner().context("finishing tar")?; - let compressed = encoder.finish().context("finishing zstd")?; - Ok(compressed) -} - -/// Per-repo-path lock serializing the publish (swap-into-place) step of -/// `decompress_repo`. Concurrent extractions unpack into isolated temp dirs in -/// parallel, but the final `remove_dir_all` + `rename` must not interleave for -/// the same `local_path`, or they race to a nondeterministic overwrite/failure. -fn publish_lock(local_path: &Path) -> Arc> { - // KNOWN LIMITATION: this map is never evicted — one (PathBuf, Arc) - // entry accrues per distinct repo path for the process lifetime. Bounded by - // the number of repos a node hosts, so it's negligible for normal use, but - // high-volume/churning deployments may want LRU or weak-ref eviction here. - static LOCKS: OnceLock>>>> = OnceLock::new(); - let locks = LOCKS.get_or_init(|| Mutex::new(HashMap::new())); - let mut map = locks.lock().expect("publish lock map poisoned"); - map.entry(local_path.to_path_buf()) - .or_insert_with(|| Arc::new(Mutex::new(()))) - .clone() -} - -/// Decompress a tar.zst byte vector into a local directory. -/// -/// Extraction is atomic with respect to `local_path`: the archive is unpacked -/// into a sibling temp directory first, and only swapped into place once it -/// fully succeeds. A corrupt or truncated archive therefore can never clobber a -/// good existing copy at `local_path` — on failure we discard the temp dir and -/// leave `local_path` exactly as it was. -fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { - let parent = local_path.parent().context("repo path has no parent")?; - std::fs::create_dir_all(parent).context("creating parent dir")?; - - let file_name = local_path - .file_name() - .context("repo path has no file name")? - .to_string_lossy(); - // Unique per-extraction temp dir: a fixed name would let two concurrent - // extractions of the same repo share one dir and clobber each other's - // in-progress unpack. A fresh UUID also means it can't collide with a - // leftover dir from a previously-interrupted run. - let tmp_dir = parent.join(format!(".{file_name}.tmp-extract.{}", uuid::Uuid::new_v4())); - - std::fs::create_dir_all(&tmp_dir).context("creating temp extract dir")?; - - // Unpack into the temp dir; on any failure, clean up and bail without - // touching local_path. - let unpack = (|| -> Result<()> { - let decoder = zstd::stream::Decoder::new(data)?; - let mut archive = tar::Archive::new(decoder); - archive.unpack(&tmp_dir).context("unpacking tar.zst")?; - Ok(()) - })(); - if let Err(e) = unpack { - let _ = std::fs::remove_dir_all(&tmp_dir); - return Err(e); - } - - // Swap the freshly-extracted repo into place. rename within the same parent - // is effectively atomic, but most platforms refuse to rename onto a - // non-empty dir, so remove the old copy first. Serialize this per repo path: - // concurrent extractions unpack into isolated temp dirs, but their swaps - // must not interleave or they race to a nondeterministic overwrite/failure. - let lock = publish_lock(local_path); - let _publish = lock.lock().expect("publish lock poisoned"); - if local_path.exists() { - std::fs::remove_dir_all(local_path).context("removing stale repo dir")?; - } - std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place")?; - - Ok(()) -} diff --git a/crates/gitlawb-node/src/main.rs b/crates/gitlawb-node/src/main.rs index c881634..ab4ec7d 100644 --- a/crates/gitlawb-node/src/main.rs +++ b/crates/gitlawb-node/src/main.rs @@ -17,6 +17,7 @@ mod pinata; mod rate_limit; mod server; mod state; +mod storage; mod sync; #[cfg(test)] mod test_support; @@ -169,25 +170,28 @@ async fn main() -> Result<()> { info!(" fly machine: {mid}"); } - // Initialize Tigris S3 client if bucket is configured - let tigris = if !config.tigris_bucket.is_empty() { - match git::tigris::TigrisClient::new(&config.tigris_bucket).await { - Ok(client) => { - info!(bucket = %config.tigris_bucket, "tigris storage enabled"); - Some(client) - } - Err(e) => { - tracing::warn!(err = %e, "failed to initialize Tigris client — using local-only storage"); - None - } - } - } else { - info!("tigris storage disabled (no bucket configured)"); - None - }; + // Initialize the storage-agnostic blob backend (S3-compatible / filesystem / + // IPFS), then wrap it in the repo-archive layer. `None` = local-only mode. + // Fail closed: a configured-but-unreachable backend aborts boot rather than + // silently running local-only and dropping durability. + let blob_store = storage::build(&config) + .await + .context("initializing object storage backend")?; + let archive = blob_store.map(storage::archive::RepoArchive::new); + + // Dedicated, bounded pool for advisory-lock connections. A push pins one + // connection for its whole lifetime (lock held across receive-pack + + // upload), so giving locks their own budget keeps a burst of concurrent or + // slow pushes from draining the main pool and stalling every other DB + // handler node-wide. Sized independently of the handler pool. + const ADVISORY_LOCK_POOL_SIZE: u32 = 16; + let lock_pool = sqlx::postgres::PgPoolOptions::new() + .max_connections(ADVISORY_LOCK_POOL_SIZE) + .connect(&config.database_url) + .await + .context("creating advisory-lock connection pool")?; - let repo_store = - git::repo_store::RepoStore::new(config.repos_dir.clone(), tigris, db.pool().clone()); + let repo_store = git::repo_store::RepoStore::new(config.repos_dir.clone(), archive, lock_pool); let rate_limiter = rate_limit::RateLimiter::new(10, std::time::Duration::from_secs(3600)); diff --git a/crates/gitlawb-node/src/state.rs b/crates/gitlawb-node/src/state.rs index 85ce0a8..0fa099b 100644 --- a/crates/gitlawb-node/src/state.rs +++ b/crates/gitlawb-node/src/state.rs @@ -47,7 +47,7 @@ pub struct AppState { pub graphql_schema: Arc, /// Fly.io machine ID — used for fly-replay routing in multi-machine deployments pub machine_id: Option, - /// Centralized repo storage: local disk cache + optional Tigris backend + /// Centralized repo storage: local disk cache + optional object-store backend pub repo_store: RepoStore, /// Per-DID rate limiter for creation endpoints (repos, issues, PRs) pub rate_limiter: RateLimiter, diff --git a/crates/gitlawb-node/src/storage/archive.rs b/crates/gitlawb-node/src/storage/archive.rs new file mode 100644 index 0000000..d5828a4 --- /dev/null +++ b/crates/gitlawb-node/src/storage/archive.rs @@ -0,0 +1,307 @@ +//! Repo-archive layer: stores a bare git repo as a single +//! `repos/v1/{owner_slug}/{repo_name}.tar.zst` object on top of any +//! [`BlobStore`] backend. Backend-agnostic replacement for the old +//! single-backend (Tigris-only) client. + +use std::collections::HashMap; +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex, OnceLock}; + +use anyhow::{Context, Result}; +use bytes::Bytes; +use tracing::{debug, info}; + +use super::BlobStore; + +#[derive(Clone)] +pub struct RepoArchive { + store: Arc, +} + +impl RepoArchive { + pub fn new(store: Arc) -> Self { + Self { store } + } + + /// Object key for a repo archive. + fn key(owner_slug: &str, repo_name: &str) -> String { + format!("repos/v1/{owner_slug}/{repo_name}.tar.zst") + } + + /// Current archive etag, or `None` if the repo isn't in storage yet. + pub async fn head_etag(&self, owner_slug: &str, repo_name: &str) -> Result> { + let key = Self::key(owner_slug, repo_name); + Ok(self + .store + .head(&key) + .await? + .map(|m| m.etag.unwrap_or_else(|| format!("size:{}", m.size)))) + } + + /// Whether the repo archive exists in storage. + pub async fn exists(&self, owner_slug: &str, repo_name: &str) -> Result { + Ok(self + .store + .head(&Self::key(owner_slug, repo_name)) + .await? + .is_some()) + } + + /// Compress the bare repo and upload it. Returns the new etag (for the + /// skip-redundant-download cache). + pub async fn upload( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + ) -> Result> { + let key = Self::key(owner_slug, repo_name); + let archive_bytes = tokio::task::spawn_blocking({ + let local_path = local_path.to_path_buf(); + move || compress_repo(&local_path) + }) + .await + .context("tar task panicked")? + .context("compressing repo")?; + + let meta = self + .store + .put(&key, Bytes::from(archive_bytes)) + .await + .context("uploading repo archive")?; + info!(key = %key, backend = self.store.backend_name(), "uploaded repo archive"); + Ok(meta.etag.or_else(|| Some(format!("size:{}", meta.size)))) + } + + /// Download the repo archive and extract it to `local_path` (atomic swap). + pub async fn download( + &self, + owner_slug: &str, + repo_name: &str, + local_path: &Path, + ) -> Result<()> { + let key = Self::key(owner_slug, repo_name); + debug!(key = %key, "downloading repo archive"); + let data = self + .store + .get(&key) + .await + .context("fetching repo archive")? + .ok_or_else(|| anyhow::anyhow!("repo archive missing: {key}"))?; + + tokio::task::spawn_blocking({ + let local_path = local_path.to_path_buf(); + move || decompress_repo(&data, &local_path) + }) + .await + .context("extract task panicked")? + .context("extracting repo")?; + info!(key = %key, path = %local_path.display(), "downloaded repo archive"); + Ok(()) + } + + /// Delete a repo archive. + #[allow(dead_code)] + pub async fn delete(&self, owner_slug: &str, repo_name: &str) -> Result<()> { + self.store.delete(&Self::key(owner_slug, repo_name)).await + } +} + +/// Compress a bare repo directory into a tar.zst byte vector. +fn compress_repo(repo_path: &Path) -> Result> { + let buf = Vec::new(); + let encoder = zstd::stream::Encoder::new(buf, 3)?; // level 3 = fast + decent ratio + let mut tar = tar::Builder::new(encoder); + tar.append_dir_all(".", repo_path) + .context("building tar archive")?; + let encoder = tar.into_inner().context("finishing tar")?; + let compressed = encoder.finish().context("finishing zstd")?; + Ok(compressed) +} + +/// Per-repo-path lock serializing the publish (swap-into-place) step of +/// `decompress_repo`. Concurrent extractions unpack into isolated temp dirs in +/// parallel, but the final `remove_dir_all` + `rename` must not interleave for +/// the same `local_path`, or they race to a nondeterministic overwrite/failure. +fn publish_lock(local_path: &Path) -> Arc> { + // KNOWN LIMITATION: this map is never evicted — one (PathBuf, Arc) + // entry accrues per distinct repo path for the process lifetime. Bounded by + // the number of repos a node hosts, so it's negligible for normal use, but + // high-volume/churning deployments may want LRU or weak-ref eviction here. + static LOCKS: OnceLock>>>> = OnceLock::new(); + let locks = LOCKS.get_or_init(|| Mutex::new(HashMap::new())); + let mut map = locks.lock().expect("publish lock map poisoned"); + map.entry(local_path.to_path_buf()) + .or_insert_with(|| Arc::new(Mutex::new(()))) + .clone() +} + +/// Decompress a tar.zst byte vector into a local directory. +/// +/// Extraction is atomic with respect to `local_path`: the archive is unpacked +/// into a sibling temp directory first, and only swapped into place once it +/// fully succeeds. A corrupt or truncated archive therefore can never clobber a +/// good existing copy at `local_path` — on failure we discard the temp dir and +/// leave `local_path` exactly as it was. +fn decompress_repo(data: &[u8], local_path: &Path) -> Result<()> { + let parent = local_path.parent().context("repo path has no parent")?; + std::fs::create_dir_all(parent).context("creating parent dir")?; + + let file_name = local_path + .file_name() + .context("repo path has no file name")? + .to_string_lossy(); + // Unique per-extraction temp dir: a fixed name would let two concurrent + // extractions of the same repo share one dir and clobber each other's + // in-progress unpack. A fresh UUID also means it can't collide with a + // leftover dir from a previously-interrupted run. + let tmp_dir = parent.join(format!(".{file_name}.tmp-extract.{}", uuid::Uuid::new_v4())); + + std::fs::create_dir_all(&tmp_dir).context("creating temp extract dir")?; + + let unpack = (|| -> Result<()> { + let decoder = zstd::stream::Decoder::new(data)?; + let mut archive = tar::Archive::new(decoder); + archive.unpack(&tmp_dir).context("unpacking tar.zst")?; + Ok(()) + })(); + if let Err(e) = unpack { + let _ = std::fs::remove_dir_all(&tmp_dir); + return Err(e); + } + + // Swap the freshly-extracted repo into place. rename within the same parent + // is effectively atomic, but most platforms refuse to rename onto a + // non-empty dir, so remove the old copy first. Serialize this per repo path: + // concurrent extractions unpack into isolated temp dirs, but their swaps + // must not interleave or they race to a nondeterministic overwrite/failure. + let lock = publish_lock(local_path); + let _publish = lock.lock().expect("publish lock poisoned"); + let swap = (|| -> Result<()> { + // Move any existing repo aside to a backup first, rather than deleting + // it up front: if the rename of the new copy then fails, we restore the + // backup so `local_path` is never left without a valid repo. (Most + // platforms refuse to rename onto a non-empty dir, hence the move-aside.) + let backup = if local_path.exists() { + let b = parent.join(format!(".{file_name}.bak-{}", uuid::Uuid::new_v4())); + std::fs::rename(local_path, &b).context("moving existing repo to backup")?; + Some(b) + } else { + None + }; + match std::fs::rename(&tmp_dir, local_path).context("swapping extracted repo into place") { + Ok(()) => { + if let Some(b) = backup { + let _ = std::fs::remove_dir_all(&b); + } + Ok(()) + } + Err(e) => { + // Restore the previous copy so the repo isn't left missing. + if let Some(b) = backup { + let _ = std::fs::rename(&b, local_path); + } + Err(e) + } + } + })(); + if swap.is_err() { + // Don't leak the extracted temp dir if the swap failed. + let _ = std::fs::remove_dir_all(&tmp_dir); + } + swap +} + +#[cfg(test)] +mod tests { + use super::*; + use std::fs; + + fn seed_repo(dir: &std::path::Path) { + fs::create_dir_all(dir.join("refs/heads")).unwrap(); + fs::write(dir.join("HEAD"), b"ref: refs/heads/main\n").unwrap(); + fs::write(dir.join("refs/heads/main"), b"abc123\n").unwrap(); + fs::write(dir.join("config"), b"[core]\n\tbare = true\n").unwrap(); + } + + #[test] + fn compress_decompress_round_trip_preserves_files() { + let src = tempfile::tempdir().unwrap(); + seed_repo(src.path()); + + let bytes = compress_repo(src.path()).unwrap(); + assert!(!bytes.is_empty()); + + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("restored.git"); + decompress_repo(&bytes, &out).unwrap(); + + assert_eq!( + fs::read(out.join("HEAD")).unwrap(), + b"ref: refs/heads/main\n" + ); + assert_eq!(fs::read(out.join("refs/heads/main")).unwrap(), b"abc123\n"); + assert_eq!( + fs::read(out.join("config")).unwrap(), + b"[core]\n\tbare = true\n" + ); + } + + #[test] + fn decompress_swap_replaces_existing_dir_atomically() { + let src = tempfile::tempdir().unwrap(); + fs::write(src.path().join("HEAD"), b"new\n").unwrap(); + let bytes = compress_repo(src.path()).unwrap(); + + // Pre-existing copy with stale junk that the swap must fully replace. + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("repo.git"); + fs::create_dir_all(&out).unwrap(); + fs::write(out.join("STALE"), b"old\n").unwrap(); + + decompress_repo(&bytes, &out).unwrap(); + assert_eq!(fs::read(out.join("HEAD")).unwrap(), b"new\n"); + assert!( + !out.join("STALE").exists(), + "stale content must be gone after the swap" + ); + } + + #[test] + fn decompress_corrupt_archive_leaves_existing_copy_untouched() { + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("repo.git"); + fs::create_dir_all(&out).unwrap(); + fs::write(out.join("HEAD"), b"good\n").unwrap(); + + // Garbage is not a valid tar.zst: unpack fails before the swap, so the + // existing copy is preserved (atomicity claim). + assert!(decompress_repo(b"not a real archive", &out).is_err()); + assert_eq!(fs::read(out.join("HEAD")).unwrap(), b"good\n"); + } + + #[tokio::test] + async fn upload_download_round_trip_over_fs_backend() { + let store_dir = tempfile::tempdir().unwrap(); + let store: Arc = + Arc::new(crate::storage::fs::FsBlobStore::new(store_dir.path()).unwrap()); + let archive = RepoArchive::new(store); + + let src = tempfile::tempdir().unwrap(); + seed_repo(src.path()); + + assert!(!archive.exists("owner", "repo").await.unwrap()); + let etag = archive.upload("owner", "repo", src.path()).await.unwrap(); + assert!(etag.is_some()); + assert!(archive.exists("owner", "repo").await.unwrap()); + + let out_parent = tempfile::tempdir().unwrap(); + let out = out_parent.path().join("repo.git"); + archive.download("owner", "repo", &out).await.unwrap(); + assert_eq!( + fs::read(out.join("HEAD")).unwrap(), + b"ref: refs/heads/main\n" + ); + assert_eq!(fs::read(out.join("refs/heads/main")).unwrap(), b"abc123\n"); + } +} diff --git a/crates/gitlawb-node/src/storage/fs.rs b/crates/gitlawb-node/src/storage/fs.rs new file mode 100644 index 0000000..9ba48d0 --- /dev/null +++ b/crates/gitlawb-node/src/storage/fs.rs @@ -0,0 +1,234 @@ +//! Local filesystem blob backend. +//! +//! Stores each object as a file under a configured root directory, using the +//! object key as a relative path. For self-hosters without S3 and for tests of +//! the storage abstraction. The etag is a `size-mtime` fingerprint so the +//! skip-redundant-download optimization works against it. + +use std::path::{Path, PathBuf}; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; + +use super::{validate_key, BlobStore, ObjectMeta}; + +#[derive(Clone)] +pub struct FsBlobStore { + root: PathBuf, +} + +impl FsBlobStore { + pub fn new(root: impl AsRef) -> Result { + let root = root.as_ref().to_path_buf(); + std::fs::create_dir_all(&root) + .with_context(|| format!("creating storage dir {}", root.display()))?; + Ok(Self { root }) + } + + fn path_for(&self, key: &str) -> Result { + validate_key(key)?; + let path = self.root.join(key); + // Defence in depth: the resolved path must stay under root. + if !path.starts_with(&self.root) { + anyhow::bail!("blob key escaped storage root: {key}"); + } + Ok(path) + } + + fn meta_of(path: &Path) -> Result { + let md = std::fs::metadata(path).context("stat blob")?; + let mtime = md + .modified() + .ok() + .and_then(|t| t.duration_since(std::time::UNIX_EPOCH).ok()) + .map(|d| d.as_nanos()) + .unwrap_or(0); + Ok(ObjectMeta { + size: md.len(), + etag: Some(format!("{}-{}", md.len(), mtime)), + }) + } +} + +#[async_trait] +impl BlobStore for FsBlobStore { + fn backend_name(&self) -> &'static str { + "fs" + } + + async fn get(&self, key: &str) -> Result> { + let path = self.path_for(key)?; + match tokio::fs::read(&path).await { + Ok(data) => Ok(Some(Bytes::from(data))), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(format!("reading {}", path.display())), + } + } + + async fn put(&self, key: &str, body: Bytes) -> Result { + let path = self.path_for(key)?; + let parent = path + .parent() + .context("blob path has no parent")? + .to_path_buf(); + // Unique temp name per write: a fixed suffix would let concurrent puts + // to the same key overwrite each other's temp file and corrupt the blob. + let tmp = path.with_extension(format!("{}.tmp-put", uuid::Uuid::new_v4())); + let path2 = path.clone(); + // Atomic write: temp file in the same dir, then rename into place. On any + // failure, remove the temp file so a failed write can't leak it. + tokio::task::spawn_blocking(move || -> Result<()> { + std::fs::create_dir_all(&parent).context("creating blob parent dir")?; + let write_and_swap = (|| -> Result<()> { + std::fs::write(&tmp, &body).context("writing temp blob")?; + std::fs::rename(&tmp, &path2).context("renaming blob into place")?; + Ok(()) + })(); + if write_and_swap.is_err() { + let _ = std::fs::remove_file(&tmp); + } + write_and_swap + }) + .await + .context("fs put task panicked")??; + Self::meta_of(&path) + } + + async fn head(&self, key: &str) -> Result> { + let path = self.path_for(key)?; + // Probe existence by io error kind, not path.exists(): a permission/IO + // error must surface, not be silently reported as "not found". + match std::fs::metadata(&path) { + Ok(_) => Self::meta_of(&path).map(Some), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(e).context(format!("stat {}", path.display())), + } + } + + async fn delete(&self, key: &str) -> Result<()> { + let path = self.path_for(key)?; + match tokio::fs::remove_file(&path).await { + Ok(()) => Ok(()), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(()), + Err(e) => Err(e).context(format!("deleting {}", path.display())), + } + } + + async fn list(&self, prefix: &str) -> Result> { + let root = self.root.clone(); + let prefix = prefix.to_string(); + tokio::task::spawn_blocking(move || -> Result> { + let mut keys = Vec::new(); + let mut stack = vec![root.clone()]; + while let Some(dir) = stack.pop() { + // Propagate read errors rather than skipping: a partial listing + // reported as success would mislead GC/admin/migration callers. + let rd = std::fs::read_dir(&dir) + .with_context(|| format!("listing {}", dir.display()))?; + for entry in rd { + // Propagate per-entry errors rather than dropping them via + // flatten(): a partial listing must not look like success. + let entry = + entry.with_context(|| format!("reading entry under {}", dir.display()))?; + let path = entry.path(); + if path.is_dir() { + stack.push(path); + } else if let Ok(rel) = path.strip_prefix(&root) { + let key = rel.to_string_lossy().replace('\\', "/"); + if key.starts_with(&prefix) { + keys.push(key); + } + } + } + } + Ok(keys) + }) + .await + .context("fs list task panicked")? + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn put_get_head_delete_list_roundtrip() { + let dir = tempfile::tempdir().unwrap(); + let store = FsBlobStore::new(dir.path()).unwrap(); + + // Absent key + assert!(store.get("repos/v1/a/x.tar.zst").await.unwrap().is_none()); + assert!(store.head("repos/v1/a/x.tar.zst").await.unwrap().is_none()); + + // Put then get + let body = Bytes::from_static(b"hello blob"); + let meta = store + .put("repos/v1/a/x.tar.zst", body.clone()) + .await + .unwrap(); + assert_eq!(meta.size, body.len() as u64); + assert!(meta.etag.is_some()); + let got = store.get("repos/v1/a/x.tar.zst").await.unwrap().unwrap(); + assert_eq!(got, body); + + // Head returns matching etag (stable across reads) + let h = store.head("repos/v1/a/x.tar.zst").await.unwrap().unwrap(); + assert_eq!(h.etag, meta.etag); + + // List by prefix + store + .put("repos/v1/b/y.tar.zst", Bytes::from_static(b"y")) + .await + .unwrap(); + let mut keys = store.list("repos/v1/").await.unwrap(); + keys.sort(); + assert_eq!(keys, vec!["repos/v1/a/x.tar.zst", "repos/v1/b/y.tar.zst"]); + + // Delete is idempotent + store.delete("repos/v1/a/x.tar.zst").await.unwrap(); + store.delete("repos/v1/a/x.tar.zst").await.unwrap(); + assert!(store.get("repos/v1/a/x.tar.zst").await.unwrap().is_none()); + } + + #[tokio::test] + async fn rejects_key_traversal() { + let dir = tempfile::tempdir().unwrap(); + let store = FsBlobStore::new(dir.path()).unwrap(); + assert!(store.get("../escape").await.is_err()); + assert!(store.put("a/../../etc/passwd", Bytes::new()).await.is_err()); + } + + #[tokio::test] + async fn concurrent_puts_same_key_do_not_corrupt_or_leak_temps() { + let dir = tempfile::tempdir().unwrap(); + let store = FsBlobStore::new(dir.path()).unwrap(); + let key = "repos/v1/a/x.tar.zst"; + let body = Bytes::from_static(b"the-one-true-blob"); + + // Many concurrent writers of the same key: with a fixed temp name they + // would clobber each other's temp file mid-write and corrupt the result. + let mut handles = Vec::new(); + for _ in 0..16 { + let store = store.clone(); + let body = body.clone(); + handles.push(tokio::spawn(async move { store.put(key, body).await })); + } + for h in handles { + h.await.unwrap().unwrap(); + } + + // Final content is intact... + assert_eq!(store.get(key).await.unwrap().unwrap(), body); + // ...and no unique-suffixed temp files were left behind. + let leftovers: Vec = store + .list("repos/v1/") + .await + .unwrap() + .into_iter() + .filter(|k| k.contains("tmp-put")) + .collect(); + assert!(leftovers.is_empty(), "leaked temp files: {leftovers:?}"); + } +} diff --git a/crates/gitlawb-node/src/storage/ipfs.rs b/crates/gitlawb-node/src/storage/ipfs.rs new file mode 100644 index 0000000..46bc92d --- /dev/null +++ b/crates/gitlawb-node/src/storage/ipfs.rs @@ -0,0 +1,223 @@ +//! IPFS (content-addressed) blob backend over a Kubo node's Mutable File System. +//! +//! Kubo's MFS (`/api/v0/files/*`) provides a path-addressed, mutable namespace +//! backed by content-addressed IPFS objects — a natural fit for a key→blob store. +//! Each object's etag is its IPFS CID (from `files/stat`), giving true +//! content-addressing for the skip-redundant-download optimization. +//! +//! Requires a reachable Kubo HTTP API (`GITLAWB_IPFS_API`, e.g. +//! `http://127.0.0.1:5001`). Objects written here are also retrievable by CID +//! from the wider IPFS network once pinned/announced by the node. + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; + +use super::{validate_key, BlobStore, ObjectMeta}; + +#[derive(Clone)] +pub struct IpfsBlobStore { + api: String, + client: reqwest::Client, +} + +impl IpfsBlobStore { + pub fn new(api: &str) -> Result { + // Bound requests so an unresponsive Kubo API can't hang push/write flows + // indefinitely. connect_timeout guards the dial; the generous total + // timeout still allows large repo-archive transfers. + let client = reqwest::Client::builder() + .connect_timeout(std::time::Duration::from_secs(5)) + .timeout(std::time::Duration::from_secs(300)) + .build() + .context("building IPFS HTTP client")?; + Ok(Self { + api: api.trim_end_matches('/').to_string(), + client, + }) + } + + /// MFS path for a key: `/gitlawb/` (namespaced to avoid clobbering + /// other MFS users on a shared node). + fn mfs_path(key: &str) -> String { + format!("/gitlawb/{key}") + } +} + +#[async_trait] +impl BlobStore for IpfsBlobStore { + fn backend_name(&self) -> &'static str { + "ipfs" + } + + async fn get(&self, key: &str) -> Result> { + validate_key(key)?; + let url = format!("{}/api/v0/files/read", self.api); + let resp = self + .client + .post(&url) + .query(&[("arg", Self::mfs_path(key).as_str())]) + .send() + .await + .context("IPFS files/read")?; + if resp.status().is_success() { + Ok(Some(resp.bytes().await.context("reading IPFS body")?)) + } else { + // Kubo returns 500 with a JSON message when the path is absent. + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + Ok(None) + } else { + anyhow::bail!("IPFS files/read {key}: {body}") + } + } + } + + async fn put(&self, key: &str, body: Bytes) -> Result { + validate_key(key)?; + let size = body.len() as u64; + let url = format!("{}/api/v0/files/write", self.api); + // Stream the body instead of copying it via to_vec — avoids doubling + // peak memory for large archives. Length is known, so set it explicitly. + let part = reqwest::multipart::Part::stream_with_length(reqwest::Body::from(body), size) + .file_name("blob"); + let form = reqwest::multipart::Form::new().part("data", part); + let resp = self + .client + .post(&url) + .query(&[ + ("arg", Self::mfs_path(key).as_str()), + ("create", "true"), + ("parents", "true"), + ("truncate", "true"), + ]) + .multipart(form) + .send() + .await + .context("IPFS files/write")?; + if !resp.status().is_success() { + let status = resp.status(); + let b = resp.text().await.unwrap_or_default(); + anyhow::bail!("IPFS files/write {key} returned {status}: {b}"); + } + // etag = CID from stat. The write already succeeded, so a failed stat + // must not fail the put — just return without an etag (callers treat a + // missing etag as "always re-check", never as a lost write). + let etag = match self.head(key).await { + Ok(m) => m.and_then(|m| m.etag), + Err(e) => { + tracing::warn!(key = %key, err = %e, "IPFS stat after write failed — returning no etag"); + None + } + }; + Ok(ObjectMeta { size, etag }) + } + + async fn head(&self, key: &str) -> Result> { + validate_key(key)?; + let url = format!("{}/api/v0/files/stat", self.api); + let resp = self + .client + .post(&url) + .query(&[("arg", Self::mfs_path(key).as_str())]) + .send() + .await + .context("IPFS files/stat")?; + if resp.status().is_success() { + let v: serde_json::Value = resp.json().await.context("parsing files/stat")?; + Ok(Some(ObjectMeta { + size: v.get("Size").and_then(|s| s.as_u64()).unwrap_or(0), + etag: v + .get("Hash") + .and_then(|h| h.as_str()) + .map(|s| s.to_string()), + })) + } else { + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + Ok(None) + } else { + anyhow::bail!("IPFS files/stat {key}: {body}") + } + } + } + + async fn delete(&self, key: &str) -> Result<()> { + validate_key(key)?; + let url = format!("{}/api/v0/files/rm", self.api); + let resp = self + .client + .post(&url) + .query(&[("arg", Self::mfs_path(key).as_str()), ("force", "true")]) + .send() + .await + .context("IPFS files/rm")?; + if resp.status().is_success() { + Ok(()) + } else { + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + Ok(()) + } else { + anyhow::bail!("IPFS files/rm {key}: {body}") + } + } + } + + async fn list(&self, prefix: &str) -> Result> { + // Reject traversal in the prefix before it's spliced into an MFS path + // (matches get/put/head/delete, which validate via validate_key). + if prefix.split('/').any(|seg| seg == ".." || seg == ".") { + anyhow::bail!("list prefix contains traversal segment: {prefix}"); + } + // Best-effort recursive walk of the MFS subtree under `prefix`. + let mut keys = Vec::new(); + let mut stack = vec![prefix.trim_end_matches('/').to_string()]; + while let Some(rel) = stack.pop() { + let url = format!("{}/api/v0/files/ls", self.api); + let mfs = if rel.is_empty() { + "/gitlawb".to_string() + } else { + Self::mfs_path(&rel) + }; + let resp = self + .client + .post(&url) + .query(&[("arg", mfs.as_str()), ("long", "true")]) + .send() + .await + .context("IPFS files/ls")?; + if !resp.status().is_success() { + // Distinguish "this subtree doesn't exist" (fine — nothing to + // list) from real auth/network/server errors, which must surface + // rather than masquerade as an empty listing. + let body = resp.text().await.unwrap_or_default(); + if body.contains("does not exist") || body.contains("no link named") { + continue; + } + anyhow::bail!("IPFS files/ls {mfs}: {body}"); + } + let v: serde_json::Value = resp.json().await.context("parsing files/ls")?; + if let Some(entries) = v.get("Entries").and_then(|e| e.as_array()) { + for entry in entries { + let name = entry.get("Name").and_then(|n| n.as_str()).unwrap_or(""); + if name.is_empty() { + continue; + } + let child = if rel.is_empty() { + name.to_string() + } else { + format!("{rel}/{name}") + }; + // Type 1 = directory in Kubo's MFS ls. + if entry.get("Type").and_then(|t| t.as_u64()) == Some(1) { + stack.push(child); + } else { + keys.push(child); + } + } + } + } + Ok(keys) + } +} diff --git a/crates/gitlawb-node/src/storage/mod.rs b/crates/gitlawb-node/src/storage/mod.rs new file mode 100644 index 0000000..0a27242 --- /dev/null +++ b/crates/gitlawb-node/src/storage/mod.rs @@ -0,0 +1,155 @@ +//! Storage-agnostic blob layer. +//! +//! Repos are persisted to a pluggable object store behind the [`BlobStore`] +//! trait. Backends: +//! - [`s3::S3BlobStore`] — any S3-compatible service (Tigris, Cloudflare R2, +//! AWS S3, MinIO, Backblaze B2). Selected by default when a bucket is set. +//! - [`fs::FsBlobStore`] — a local/mounted directory; for self-hosters & tests. +//! - [`ipfs::IpfsBlobStore`] — content-addressed storage over a Kubo (IPFS) node +//! using its Mutable File System (MFS) for a key→blob namespace. +//! +//! Higher layers ([`archive::RepoArchive`]) compose a bare repo into a single +//! `repos/v1/{slug}/{repo}.tar.zst` object on top of whichever backend is active, +//! so the repo-storage semantics are identical regardless of backend. + +use std::sync::Arc; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use bytes::Bytes; +use tracing::info; + +use crate::config::Config; + +pub mod archive; +pub mod fs; +pub mod ipfs; +pub mod s3; + +/// Metadata about a stored object. `etag` is an opaque change-detection token +/// (S3 ETag, IPFS CID, or a size/mtime fingerprint for the filesystem backend). +#[derive(Debug, Clone)] +pub struct ObjectMeta { + pub size: u64, + pub etag: Option, +} + +/// A backend-agnostic key→bytes object store. +/// +/// Keys are forward-slash-delimited paths (e.g. `repos/v1/slug/repo.tar.zst`). +/// Implementations must reject `..` traversal in keys. +#[async_trait] +pub trait BlobStore: Send + Sync { + /// Short backend name, for logs. + fn backend_name(&self) -> &'static str; + + /// Fetch an object. Returns `None` if the key does not exist. + async fn get(&self, key: &str) -> Result>; + + /// Store an object, returning its metadata (including the new etag). + async fn put(&self, key: &str, body: Bytes) -> Result; + + /// Fetch object metadata without the body. Returns `None` if absent. + async fn head(&self, key: &str) -> Result>; + + /// Delete an object. Succeeds (no-op) if the key does not exist. + async fn delete(&self, key: &str) -> Result<()>; + + /// List object keys under a prefix. Part of the backend interface (and + /// implemented by every backend) for future admin/GC/migration use; not yet + /// wired to a caller, hence the allow. + #[allow(dead_code)] + async fn list(&self, prefix: &str) -> Result>; +} + +/// Build the configured blob store. +/// +/// Returns `Ok(None)` only when no backend is configured at all (local-only +/// passthrough mode). A misconfigured backend (missing required setting, or a +/// client that fails to construct) returns `Err`: we fail closed rather than +/// silently degrading to local-only, which would accept writes without the +/// intended durable backend and risk cross-node persistence drift. +/// +/// Note: this validates configuration and client construction, not live +/// connectivity — e.g. the S3 client builds successfully against an unreachable +/// or wrong bucket, and that surfaces as an error on the first real request. +/// +/// Selection order: +/// 1. Explicit `GITLAWB_STORAGE_BACKEND` (`s3` | `fs` | `ipfs`). +/// 2. Auto: `s3` if a bucket is configured (incl. legacy `GITLAWB_TIGRIS_BUCKET`), +/// else `fs` if `GITLAWB_STORAGE_FS_DIR` is set, +/// else `ipfs` if `GITLAWB_IPFS_API` is set, +/// else local-only. +pub async fn build(config: &Config) -> Result>> { + let bucket = if !config.s3_bucket.is_empty() { + config.s3_bucket.clone() + } else { + config.tigris_bucket.clone() + }; + + let backend = if !config.storage_backend.is_empty() { + config.storage_backend.to_ascii_lowercase() + } else if !bucket.is_empty() { + "s3".to_string() + } else if !config.storage_fs_dir.is_empty() { + "fs".to_string() + } else if !config.ipfs_api.is_empty() { + "ipfs".to_string() + } else { + info!("object storage disabled (no backend configured) — local-only mode"); + return Ok(None); + }; + + // A backend was selected (explicitly or by auto-detection); fail closed from + // here — a missing required setting or an init failure is a hard error. + match backend.as_str() { + "s3" => { + if bucket.is_empty() { + anyhow::bail!( + "storage backend=s3 but no bucket configured (set GITLAWB_S3_BUCKET)" + ); + } + let endpoint = (!config.s3_endpoint.is_empty()).then(|| config.s3_endpoint.clone()); + let s = s3::S3BlobStore::new(&bucket, endpoint, config.s3_force_path_style) + .await + .context("initializing S3 storage")?; + info!(bucket = %bucket, backend = "s3", "object storage enabled"); + Ok(Some(Arc::new(s) as Arc)) + } + "fs" => { + if config.storage_fs_dir.is_empty() { + anyhow::bail!("storage backend=fs but GITLAWB_STORAGE_FS_DIR is empty"); + } + let s = fs::FsBlobStore::new(&config.storage_fs_dir) + .context("initializing filesystem storage")?; + info!(dir = %config.storage_fs_dir, backend = "fs", "object storage enabled"); + Ok(Some(Arc::new(s) as Arc)) + } + "ipfs" => { + if config.ipfs_api.is_empty() { + anyhow::bail!("storage backend=ipfs but GITLAWB_IPFS_API is empty"); + } + let s = + ipfs::IpfsBlobStore::new(&config.ipfs_api).context("initializing IPFS storage")?; + info!(api = %config.ipfs_api, backend = "ipfs", "object storage enabled"); + Ok(Some(Arc::new(s) as Arc)) + } + other => { + anyhow::bail!("unknown GITLAWB_STORAGE_BACKEND: {other}"); + } + } +} + +/// Reject keys that could escape the namespace (`..`) or are absolute. +pub(crate) fn validate_key(key: &str) -> Result<()> { + if key.is_empty() { + anyhow::bail!("blob key is empty"); + } + if key.split('/').any(|seg| seg == ".." || seg == ".") { + anyhow::bail!("blob key contains traversal segment: {key}"); + } + if key.starts_with('/') || key.contains('\0') { + anyhow::bail!("blob key is absolute or contains null byte: {key}"); + } + Ok(()) +} diff --git a/crates/gitlawb-node/src/storage/s3.rs b/crates/gitlawb-node/src/storage/s3.rs new file mode 100644 index 0000000..1c07b47 --- /dev/null +++ b/crates/gitlawb-node/src/storage/s3.rs @@ -0,0 +1,176 @@ +//! S3-compatible blob backend. +//! +//! Works with any S3 API implementation: Tigris, Cloudflare R2, AWS S3, MinIO, +//! Backblaze B2. Credentials and (for Tigris on Fly) the endpoint are read from +//! the standard AWS env vars (`AWS_ACCESS_KEY_ID`, `AWS_SECRET_ACCESS_KEY`, +//! `AWS_ENDPOINT_URL_S3`, `AWS_REGION`). `endpoint`/`force_path_style` override +//! those for self-hosted services like MinIO. + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use aws_sdk_s3::Client as S3Client; +use bytes::Bytes; +use tracing::debug; + +use super::{validate_key, BlobStore, ObjectMeta}; + +#[derive(Clone)] +pub struct S3BlobStore { + s3: S3Client, + bucket: String, +} + +impl S3BlobStore { + /// Build a client. `endpoint` overrides `AWS_ENDPOINT_URL_S3` (for R2/MinIO); + /// `force_path_style` is required by MinIO and some S3-compatibles. + pub async fn new( + bucket: &str, + endpoint: Option, + force_path_style: bool, + ) -> Result { + let shared = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await; + let mut builder = aws_sdk_s3::config::Builder::from(&shared); + if let Some(ep) = endpoint { + builder = builder.endpoint_url(ep); + } + if force_path_style { + builder = builder.force_path_style(true); + } + // Bound requests so a hung endpoint can't block the calling task forever + // (under async_upload it would hold the advisory lock and stall every + // later push). attempt_timeout bounds a single try; operation_timeout + // bounds the whole call incl. retries — generous enough for large + // archive transfers. + let timeouts = aws_sdk_s3::config::timeout::TimeoutConfig::builder() + .operation_attempt_timeout(std::time::Duration::from_secs(60)) + .operation_timeout(std::time::Duration::from_secs(300)) + .build(); + builder = builder.timeout_config(timeouts); + let s3 = S3Client::from_conf(builder.build()); + Ok(Self { + s3, + bucket: bucket.to_string(), + }) + } +} + +#[async_trait] +impl BlobStore for S3BlobStore { + fn backend_name(&self) -> &'static str { + "s3" + } + + async fn get(&self, key: &str) -> Result> { + validate_key(key)?; + match self + .s3 + .get_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + { + Ok(resp) => { + let data = resp + .body + .collect() + .await + .context("reading S3 response body")? + .into_bytes(); + Ok(Some(data)) + } + Err(e) => { + if e.as_service_error().is_some_and(|e| e.is_no_such_key()) { + Ok(None) + } else { + Err(anyhow::anyhow!("S3 GET {key}: {e}")) + } + } + } + } + + async fn put(&self, key: &str, body: Bytes) -> Result { + validate_key(key)?; + let size = body.len() as u64; + let resp = self + .s3 + .put_object() + .bucket(&self.bucket) + .key(key) + .body(aws_sdk_s3::primitives::ByteStream::from(body)) + .send() + .await + .context(format!("S3 PUT {key}"))?; + debug!(key = %key, size, "s3 put"); + Ok(ObjectMeta { + size, + etag: resp.e_tag().map(|s| s.to_string()), + }) + } + + async fn head(&self, key: &str) -> Result> { + validate_key(key)?; + match self + .s3 + .head_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + { + Ok(resp) => Ok(Some(ObjectMeta { + size: resp.content_length().unwrap_or(0).max(0) as u64, + etag: resp.e_tag().map(|s| s.to_string()), + })), + Err(e) => { + if e.as_service_error().is_some_and(|e| e.is_not_found()) { + Ok(None) + } else { + Err(anyhow::anyhow!("S3 HEAD {key}: {e}")) + } + } + } + } + + async fn delete(&self, key: &str) -> Result<()> { + validate_key(key)?; + self.s3 + .delete_object() + .bucket(&self.bucket) + .key(key) + .send() + .await + .context(format!("S3 DELETE {key}"))?; + Ok(()) + } + + async fn list(&self, prefix: &str) -> Result> { + let mut keys = Vec::new(); + let mut continuation: Option = None; + loop { + let mut req = self + .s3 + .list_objects_v2() + .bucket(&self.bucket) + .prefix(prefix); + if let Some(token) = continuation.take() { + req = req.continuation_token(token); + } + let resp = req.send().await.context(format!("S3 LIST {prefix}"))?; + for obj in resp.contents() { + if let Some(k) = obj.key() { + keys.push(k.to_string()); + } + } + if resp.is_truncated().unwrap_or(false) { + continuation = resp.next_continuation_token().map(|s| s.to_string()); + if continuation.is_none() { + break; + } + } else { + break; + } + } + Ok(keys) + } +}