Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions crates/gitlawb-node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ hmac = { workspace = true }
http-body-util = "0.1"
tokio-util = { version = "0.7", features = ["io"] }
dirs-next = "2"
# NFC-normalize visibility globs/paths at the matcher seam so a deny rule
# withholds a canonically-equivalent path regardless of NFC/NFD form (#101).
unicode-normalization = "0.1"
reqwest = { workspace = true }
libp2p-core = "0.43"
libp2p-gossipsub = "0.49.4"
Expand Down
193 changes: 136 additions & 57 deletions crates/gitlawb-node/src/api/repos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ async fn replication_withheld_set(
disk_path: std::path::PathBuf,
) -> (bool, Option<std::collections::HashSet<String>>) {
let announce = match &rules {
Some(rules) => visibility_check(rules, is_public, owner_did, None, "/") == Decision::Allow,
Some(rules) => crate::visibility::listable_at_root(rules, is_public, owner_did, None),
None => false,
};
if !announce {
Expand Down Expand Up @@ -93,6 +93,43 @@ async fn replication_withheld_set(
}
}

/// The replicable object set for a full-scan pin fallback, failing closed (#99).
/// The full-scan candidate set includes dangling objects the reachable-only
/// withheld set never classified, so compute the reachable visibility-allowed
/// blob set and the all-blob universe off the async worker and keep only
/// non-blobs plus allowed blobs. Any error in either walk (or a task panic)
/// pins nothing this push, mirroring the degraded-path shape of
/// `replication_withheld_set`.
async fn fail_closed_full_scan_objects(
disk_path: std::path::PathBuf,
rules: Vec<crate::db::VisibilityRule>,
is_public: bool,
owner_did: String,
candidates: Vec<String>,
) -> Vec<String> {
tokio::task::spawn_blocking(move || -> anyhow::Result<Vec<String>> {
let allowed = crate::git::visibility_pack::replicable_blob_set(
&disk_path, &rules, is_public, &owner_did,
)?;
let all_blobs = crate::git::push_delta::all_blob_oids(&disk_path)?;
Ok(crate::git::visibility_pack::replicable_objects_fail_closed(
candidates, &allowed, &all_blobs,
))
})
.await
.map_err(|e| {
tracing::warn!(err = %e, "fail-closed blob walk task panicked; pinning nothing this push")
})
.ok()
.and_then(|r| {
r.map_err(|e| {
tracing::warn!(err = %e, "fail-closed blob walk failed; pinning nothing this push")
})
.ok()
})
.unwrap_or_default()
}

// ── Request / Response types ───────────────────────────────────────────────

#[derive(Debug, Deserialize)]
Expand Down Expand Up @@ -207,50 +244,61 @@ pub struct ListReposQuery {
/// present, returns one page and the `X-Total-Count` response header carries the
/// total matching row count. Without `limit`, falls back to returning every row
/// (kept for backwards compat with peer sync and existing CLI tooling).
///
/// Every returned row passes the per-caller `"/"` visibility gate
/// (`crate::visibility::listable_at_root`), the same decision the per-repo
/// content endpoints make, so neither the page nor `X-Total-Count` leaks a repo
/// (or its mere count) the caller may not read (#97).
pub async fn list_repos(
State(state): State<AppState>,
Query(query): Query<ListReposQuery>,
auth: Option<Extension<AuthenticatedDid>>,
) -> Result<Response> {
use axum::http::HeaderValue;
use axum::response::IntoResponse;

if let Some(raw_limit) = query.limit {
let limit = raw_limit.clamp(1, 200);
let offset = query.offset.unwrap_or(0).max(0);
let (rows, total) = state
.db
.list_all_repos_paged(query.owner.as_deref(), limit, offset)
.await?;
let body: Vec<RepoResponse> = rows
.into_iter()
.map(|(r, stars)| to_response(&r, &state, stars))
.collect();
let mut response = Json(body).into_response();
response.headers_mut().insert(
"X-Total-Count",
HeaderValue::from_str(&total.to_string()).unwrap_or(HeaderValue::from_static("0")),
);
return Ok(response);
}
let caller = auth.as_ref().map(|e| e.0 .0.as_str());

let repos = state.db.list_all_repos_with_stars().await?;
let filtered: Vec<(RepoRecord, i64)> = repos
// Over-fetch the deduped set (did:key-aware DEDUP_CTE collapses mirror rows),
// then apply the per-repo "/" visibility gate in Rust BEFORE pagination so
// neither the page nor X-Total-Count leaks a repo the caller may not read —
// including its mere count. The "/" decision depends on owner short/full-DID
// matching and JSON reader-DID membership, so it cannot be a clean SQL
// predicate without drifting from visibility_check; the count is derived from
// the visible set (#97).
let owner_filtered = state
.db
.list_all_repos_deduped_with_stars(query.owner.as_deref())
.await?;

let ids: Vec<String> = owner_filtered.iter().map(|(r, _)| r.id.clone()).collect();
let rules_by_repo = state.db.list_visibility_rules_for_repos(&ids).await?;
let visible: Vec<(crate::db::RepoRecord, i64)> = owner_filtered
.into_iter()
.filter(|(r, _)| {
if let Some(owner) = &query.owner {
crate::api::did_matches(owner.as_str(), &r.owner_did)
} else {
true
}
let rules = rules_by_repo.get(&r.id).map(Vec::as_slice).unwrap_or(&[]);
crate::visibility::listable_at_root(rules, r.is_public, &r.owner_did, caller)
})
.collect();
let deduped = dedupe_canonical_repos(filtered);
let total = deduped.len() as i64;
let resp: Vec<_> = deduped

let total = visible.len() as i64;

// Paginate in Rust when a limit is set: SQL LIMIT/OFFSET cannot run before
// the visibility filter without returning short pages and a leaked count.
let page: Vec<(crate::db::RepoRecord, i64)> = match query.limit {
Some(raw_limit) => {
let limit = raw_limit.clamp(1, 200) as usize;
let offset = query.offset.unwrap_or(0).max(0) as usize;
visible.into_iter().skip(offset).take(limit).collect()
}
None => visible,
};

let body: Vec<RepoResponse> = page
.into_iter()
.map(|(r, stars)| to_response(&r, &state, stars))
.collect();
let mut response = Json(resp).into_response();
let mut response = Json(body).into_response();
response.headers_mut().insert(
"X-Total-Count",
HeaderValue::from_str(&total.to_string()).unwrap_or(HeaderValue::from_static("0")),
Expand Down Expand Up @@ -882,12 +930,14 @@ pub async fn git_receive_pack(
)
.await;

// Compute the per-push pin candidate set once, off the async worker (git
// subprocess). On the happy path this is the push delta (objects this push
// introduced); on any non-commit tip or git error it fails closed to a full
// repo scan. Only needed when something will actually replicate. All
// degraded paths log inside the helper rather than failing silently.
let candidates: Vec<String> = if withheld.is_some() {
// Resolve the per-push pin candidate set once, off the async worker, then
// filter to what may actually replicate. Delta path: the reachable-only
// `withheld` set suffices (delta objects are reachable). Full-scan path: the
// candidate set can include dangling blobs the withheld set never classified,
// so fail closed — replicate a blob only if it is reachable AND
// visibility-allowed (#99). Only computed when something will actually
// replicate; every degraded path logs rather than failing silently.
let object_list: Vec<String> = if let Some(withheld_set) = withheld.clone() {
let new_tips: Vec<String> = ref_updates
.iter()
.map(|u| u.new_sha.clone())
Expand All @@ -898,16 +948,32 @@ pub async fn git_receive_pack(
.map(|u| u.old_sha.clone())
.filter(|s| s != ZERO_SHA)
.collect();
crate::git::push_delta::resolve_candidates_for_push(disk_path.clone(), new_tips, old_tips)
let pin_set = crate::git::push_delta::resolve_candidates_for_push(
disk_path.clone(),
new_tips,
old_tips,
)
.await;
if pin_set.full_scan {
fail_closed_full_scan_objects(
disk_path.clone(),
rules_opt.clone().unwrap_or_default(),
record.is_public,
record.owner_did.clone(),
pin_set.candidates,
)
.await
} else {
crate::git::visibility_pack::replicable_objects(pin_set.candidates, &withheld_set)
}
} else {
Vec::new()
};

// Pin new git objects to the local IPFS node (no-op if ipfs_api is empty).
// Skipped entirely when the public cannot read the repo (withheld == None).
if let Some(withheld_ipfs) = withheld.clone() {
let candidates_ipfs = candidates.clone();
if withheld.is_some() {
let object_list_ipfs = object_list.clone();
let ipfs_api = state.config.ipfs_api.clone();
let repo_path_clone = disk_path.clone();
let db_clone = state.db.clone();
Expand All @@ -924,9 +990,8 @@ pub async fn git_receive_pack(
let pinned = crate::ipfs_pin::pin_new_objects(
&ipfs_api,
&repo_path_clone,
candidates_ipfs,
object_list_ipfs,
&db_clone,
&withheld_ipfs,
)
.await;
if !pinned.is_empty() {
Expand Down Expand Up @@ -1030,23 +1095,21 @@ pub async fn git_receive_pack(
let owner_did_for_arweave = record.owner_did.clone();
let self_public_url = state.config.public_url.clone();
let node_keypair = Arc::clone(&state.node_keypair);
let withheld_pinata = withheld;
let candidates_pinata = candidates;
let object_list_pinata = object_list;
let do_pinata_replication = withheld.is_some();
tokio::spawn(async move {
let pinned = match &withheld_pinata {
Some(withheld) => {
crate::pinata::pin_new_objects(
&http_client,
&pinata_upload_url,
&pinata_jwt,
&repo_path_clone,
candidates_pinata,
&db_clone,
withheld,
)
.await
}
None => Vec::new(),
let pinned = if do_pinata_replication {
crate::pinata::pin_new_objects(
&http_client,
&pinata_upload_url,
&pinata_jwt,
&repo_path_clone,
object_list_pinata,
&db_clone,
)
.await
} else {
Vec::new()
};

if !pinned.is_empty() {
Expand Down Expand Up @@ -1203,8 +1266,24 @@ pub async fn list_refs(
/// which node hosts it. Results from unreachable peers are silently omitted.
pub async fn list_federated_repos(
State(state): State<AppState>,
auth: Option<Extension<AuthenticatedDid>>,
) -> Result<Json<serde_json::Value>> {
let caller = auth.as_ref().map(|e| e.0 .0.as_str());
let local_repos = dedupe_canonical_repos(state.db.list_all_repos_with_stars().await?);

// Hide local repos the caller may not read at "/" before federating them, so
// the federated surface does not enumerate private repos (#97). Peer repos
// arrive already filtered by each peer's own /api/v1/repos (anonymous view).
let ids: Vec<String> = local_repos.iter().map(|(r, _)| r.id.clone()).collect();
let rules_by_repo = state.db.list_visibility_rules_for_repos(&ids).await?;
let local_repos: Vec<(crate::db::RepoRecord, i64)> = local_repos
.into_iter()
.filter(|(r, _)| {
let rules = rules_by_repo.get(&r.id).map(Vec::as_slice).unwrap_or(&[]);
crate::visibility::listable_at_root(rules, r.is_public, &r.owner_did, caller)
})
.collect();

let local_node_url = state
.config
.public_url
Expand Down
Loading
Loading