From 540f95293dfa805b099ce2c2892028ee0225546f Mon Sep 17 00:00:00 2001 From: Kaylee <65376239+KayleeWilliams@users.noreply.github.com> Date: Thu, 2 Jul 2026 10:18:13 +0100 Subject: [PATCH 1/3] Make generation safe under concurrent invocation against a shared outDir MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Parallel task graphs (lint/typecheck/build each depending on docs generation) race on the shared output directory, causing intermittent partial reads, ENOENT on just-replaced files, and half-written outputs. - Write every generated artifact to a temp sibling and atomically rename into place (new internal writeFileAtomic/copyFileAtomic), so concurrent readers see old or new content, never a truncated file. - Close delete-then-recreate windows: agent-skills and mounted markdown mirrors now write new files first and prune stale entries after, instead of rm -rf'ing a live directory before rebuilding it. - Serialize `leadtype generate` runs per outDir with a cross-process mkdir lock under os.tmpdir() (mtime keepalive, 10min stale reclaim, 5min wait timeout that fails loudly). LEADTYPE_NO_LOCK=1 opts out. - Document the concurrency semantics on the generate pipeline page. Fixes #117 πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .changeset/concurrent-generate-safety.md | 25 +++ docs/pipeline/generate-static-artifacts.mdx | 11 ++ packages/leadtype/src/cli/generate.ts | 45 +++++- .../src/convert/convert-concurrency.test.ts | 95 +++++++++++ packages/leadtype/src/convert/convert.ts | 7 +- packages/leadtype/src/feed/index.ts | 5 +- .../leadtype/src/internal/atomic-fs.test.ts | 98 ++++++++++++ packages/leadtype/src/internal/atomic-fs.ts | 64 ++++++++ .../src/internal/generate-lock.test.ts | 122 ++++++++++++++ .../leadtype/src/internal/generate-lock.ts | 151 ++++++++++++++++++ packages/leadtype/src/llm/llm.ts | 53 +++--- packages/leadtype/src/llm/skills.ts | 37 +++-- packages/leadtype/src/mcp/card.ts | 9 +- packages/leadtype/src/nlweb/artifacts.ts | 7 +- packages/leadtype/src/search/node.ts | 7 +- packages/leadtype/src/sync/sync.ts | 5 +- 16 files changed, 678 insertions(+), 63 deletions(-) create mode 100644 .changeset/concurrent-generate-safety.md create mode 100644 packages/leadtype/src/convert/convert-concurrency.test.ts create mode 100644 packages/leadtype/src/internal/atomic-fs.test.ts create mode 100644 packages/leadtype/src/internal/atomic-fs.ts create mode 100644 packages/leadtype/src/internal/generate-lock.test.ts create mode 100644 packages/leadtype/src/internal/generate-lock.ts diff --git a/.changeset/concurrent-generate-safety.md b/.changeset/concurrent-generate-safety.md new file mode 100644 index 00000000..8669c976 --- /dev/null +++ b/.changeset/concurrent-generate-safety.md @@ -0,0 +1,25 @@ +--- +"leadtype": patch +--- + +Make generation safe to invoke concurrently against a shared `outDir`. + +Parallel task graphs (lint, typecheck, and build each depending on "docs are +generated") used to race on the shared output directory, causing intermittent +partial reads, ENOENT on files another run had just replaced, and half-written +artifacts. + +- Every generated artifact (converted `docs/*.md`, `llms.txt`, `llms-full.txt`, + search index, sitemaps, robots, feeds, MCP card, NLWeb, skills, sync + manifests) is now written to a temp sibling and atomically renamed into + place, so concurrent readers see the old content or the new content β€” never + a truncated file. +- Delete-then-recreate windows are gone: the agent-skills surface and mounted + markdown mirrors now write the new files first and prune stale ones after, + instead of `rm -rf`-ing a live directory before rebuilding it. +- `leadtype generate` runs are single-flight per output directory via a + cross-process lock stored under the system temp dir (keyed by the resolved + `--out` path). Concurrent invocations wait for the in-flight run; locks + abandoned by crashed runs are reclaimed after 10 minutes, and waiting runs + fail loudly after 5 minutes instead of hanging CI. Set `LEADTYPE_NO_LOCK=1` + to opt out. diff --git a/docs/pipeline/generate-static-artifacts.mdx b/docs/pipeline/generate-static-artifacts.mdx index aa90ce47..3f427334 100644 --- a/docs/pipeline/generate-static-artifacts.mdx +++ b/docs/pipeline/generate-static-artifacts.mdx @@ -158,6 +158,17 @@ npx leadtype lint docs --error-unknown npx leadtype generate --src . --out public --base-url https://docs.example.com ``` +## Concurrent invocation + +Parallel task graphs often invoke generation more than once at the same time β€” lint, typecheck, and build each declaring "docs are generated" as a prerequisite. `leadtype generate` is safe under that pattern: + +- **Runs are single-flight per output directory.** A cross-process lock (keyed by the resolved `--out` path, stored under the system temp dir β€” never inside the output directory) serializes concurrent runs. Later invocations wait for the in-flight run, then regenerate. Locks abandoned by crashed runs are reclaimed automatically after 10 minutes; waiting runs fail loudly after 5 minutes rather than hanging CI. +- **Every artifact is replaced atomically.** Files are written to a temp sibling and renamed into place, so a sibling build step reading the output directory (`tsc`, `next build`, a dev server watching `public/`) sees the old artifact or the new one β€” never a truncated file, and never a missing file for pages that still exist. + +Set `LEADTYPE_NO_LOCK=1` to skip the lock β€” for example on network filesystems where directory-based locking is unreliable, or when your task runner already serializes generation. + +Even though concurrent runs are safe, they are redundant: each waits its turn and regenerates the same output. If your task graph supports it, prefer a single generation task that lint, typecheck, and build all depend on. + ## Use library APIs for custom pipelines The CLI is the happy path. Use the library APIs directly when you need custom plugin order, filters, or generated JSON paths. Keep conversion first β€” LLM files, search, and Agent Readability artifacts read the generated markdown: diff --git a/packages/leadtype/src/cli/generate.ts b/packages/leadtype/src/cli/generate.ts index 8b29170a..2d302e68 100644 --- a/packages/leadtype/src/cli/generate.ts +++ b/packages/leadtype/src/cli/generate.ts @@ -1,5 +1,5 @@ import { existsSync } from "node:fs"; -import { cp, mkdir, mkdtemp, readFile, rm, writeFile } from "node:fs/promises"; +import { cp, mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { pathToFileURL } from "node:url"; @@ -8,6 +8,7 @@ import type { Pluggable, PluggableList } from "unified"; import { convertAllMdx } from "../convert"; import { type DocsFeedConfig, generateFeedArtifacts } from "../feed"; import { type DocsI18nManifest, normalizeDocsI18nConfig } from "../i18n"; +import { copyFileAtomic, writeFileAtomic } from "../internal/atomic-fs"; import { type DocsPathMount, normalizeBaseUrl, @@ -15,6 +16,10 @@ import { normalizeUrlPrefix, } from "../internal/docs-url"; import { parseFrontmatter } from "../internal/frontmatter"; +import { + acquireGenerateLock, + type GenerateLock, +} from "../internal/generate-lock"; import { logger, setLogFormat, @@ -2142,7 +2147,6 @@ async function copyMountedMarkdownMirrors( `Mounted URL prefix "${urlPrefix}" must resolve inside the output directory.` ); } - await rm(targetDir, { force: true, recursive: true }); const files = await fg("**/*.md", { absolute: false, cwd: sourceDir, @@ -2153,9 +2157,24 @@ async function copyMountedMarkdownMirrors( const sourcePath = path.join(sourceDir, file); const targetPath = path.join(targetDir, file); await mkdir(path.dirname(targetPath), { recursive: true }); - await cp(sourcePath, targetPath); + await copyFileAtomic(sourcePath, targetPath); }) ); + // Prune mirror files whose source pages no longer exist. Pruning after + // the copy (instead of rm -rf on the whole mirror before it) keeps the + // mirror readable throughout β€” a concurrent reader never sees the + // directory disappear mid-generation. + const currentFiles = new Set(files); + const mirroredFiles = await fg("**/*.md", { + absolute: false, + cwd: targetDir, + onlyFiles: true, + }); + await Promise.all( + mirroredFiles + .filter((file) => !currentFiles.has(file)) + .map((file) => rm(path.join(targetDir, file), { force: true })) + ); }) ); } @@ -2203,7 +2222,7 @@ async function copyDefaultLocaleMarkdownAliases( const sourcePath = path.join(defaultLocaleDir, file); const targetPath = path.join(docsDir, file); await mkdir(path.dirname(targetPath), { recursive: true }); - await cp(sourcePath, targetPath); + await copyFileAtomic(sourcePath, targetPath); }) ); await rm(defaultLocaleDir, { force: true, recursive: true }); @@ -2248,7 +2267,7 @@ async function writeI18nManifest( } const outputPath = path.join(outDir, DEFAULT_DOCS_DIR, "i18n-manifest.json"); await mkdir(path.dirname(outputPath), { recursive: true }); - await writeFile(outputPath, `${JSON.stringify(manifest, null, 2)}\n`); + await writeFileAtomic(outputPath, `${JSON.stringify(manifest, null, 2)}\n`); return outputPath; } @@ -2512,6 +2531,21 @@ export async function runGenerateCommand( return 1; } + // Serialize concurrent generate runs targeting the same outDir (parallel CI + // task graphs commonly fan out lint/typecheck/build, each regenerating docs). + // Atomic per-file writes keep individual artifacts readable at all times; + // the lock keeps whole runs from interleaving their read-back phases. + let generateLock: GenerateLock | undefined; + if (process.env.LEADTYPE_NO_LOCK !== "1") { + try { + generateLock = await acquireGenerateLock(outDir); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + reportFailure(message); + return 1; + } + } + let sourceMirror: SourceMirror | undefined; try { const metadata = await resolveGenerateMetadata( @@ -2944,6 +2978,7 @@ export async function runGenerateCommand( return 1; } finally { await sourceMirror?.cleanup(); + await generateLock?.release(); } return 0; } diff --git a/packages/leadtype/src/convert/convert-concurrency.test.ts b/packages/leadtype/src/convert/convert-concurrency.test.ts new file mode 100644 index 00000000..89058be9 --- /dev/null +++ b/packages/leadtype/src/convert/convert-concurrency.test.ts @@ -0,0 +1,95 @@ +import { + mkdir, + mkdtemp, + readdir, + readFile, + rm, + writeFile, +} from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { convertAllMdx } from "./convert"; + +const tempDirs: string[] = []; + +async function createTempProject(): Promise { + const dir = await mkdtemp(path.join(tmpdir(), "leadtype-convert-race-")); + tempDirs.push(dir); + return dir; +} + +afterEach(async () => { + await Promise.all( + tempDirs.splice(0).map((dir) => rm(dir, { force: true, recursive: true })) + ); +}); + +describe("convertAllMdx concurrency", () => { + it("keeps every output complete while concurrent runs share an outDir", async () => { + const dir = await createTempProject(); + const srcDir = path.join(dir, "docs"); + const outDir = path.join(dir, "public", "docs"); + const fileCount = 24; + const concurrentRuns = 3; + // Large enough that a truncating write would be observable mid-flight. + const filler = + "Some paragraph text that pads the document body.\n\n".repeat(200); + + await mkdir(srcDir, { recursive: true }); + await Promise.all( + Array.from({ length: fileCount }, (_, index) => + writeFile( + path.join(srcDir, `doc-${index}.mdx`), + `---\ntitle: "Doc ${index}"\n---\n\n# Doc ${index}\n\n${filler}\nEND-OF-DOC-${index}\n` + ) + ) + ); + + let runsSettled = false; + const runs = Promise.all( + Array.from({ length: concurrentRuns }, () => + convertAllMdx({ srcDir, outDir }) + ) + ).finally(() => { + runsSettled = true; + }); + + // Concurrent reader modeling a sibling build step (tsc, next build) + // reading the shared output directory while generation is in flight: + // every successfully read file must be complete, and a file must never + // disappear once it has been observed. + const seen = new Set(); + const reader = (async () => { + while (!runsSettled) { + for (let index = 0; index < fileCount; index++) { + const outputPath = path.join(outDir, `doc-${index}.md`); + try { + const content = await readFile(outputPath, "utf8"); + seen.add(index); + expect(content.startsWith("---")).toBe(true); + expect(content).toContain(`END-OF-DOC-${index}`); + } catch (error) { + expect((error as NodeJS.ErrnoException).code).toBe("ENOENT"); + expect(seen.has(index)).toBe(false); + } + } + } + })(); + + await Promise.all([runs, reader]); + + // Final state: every output present and complete, no temp files leaked. + const entries = await readdir(outDir); + expect(entries.sort()).toEqual( + Array.from({ length: fileCount }, (_, index) => `doc-${index}.md`).sort() + ); + for (let index = 0; index < fileCount; index++) { + const content = await readFile( + path.join(outDir, `doc-${index}.md`), + "utf8" + ); + expect(content).toContain(`END-OF-DOC-${index}`); + } + }); +}); diff --git a/packages/leadtype/src/convert/convert.ts b/packages/leadtype/src/convert/convert.ts index acd8e4f0..60424f04 100644 --- a/packages/leadtype/src/convert/convert.ts +++ b/packages/leadtype/src/convert/convert.ts @@ -1,6 +1,6 @@ import { execFile } from "node:child_process"; import { existsSync } from "node:fs"; -import { mkdir, readFile, realpath, writeFile } from "node:fs/promises"; +import { mkdir, readFile, realpath } from "node:fs/promises"; import { cpus } from "node:os"; import { basename, dirname, join, relative, resolve, sep } from "node:path"; import { performance } from "node:perf_hooks"; @@ -9,6 +9,7 @@ import type { Root } from "mdast"; import { mdxToMdast } from "satteri"; import { glob as fg } from "tinyglobby"; import type { PluggableList } from "unified"; +import { writeFileAtomic } from "../internal/atomic-fs"; import { deriveDocContext, resolvePlaceholderStrings, @@ -1117,7 +1118,7 @@ async function processMdxFile( } await mkdir(dirname(outputPath), { recursive: true }); - await writeFile(outputPath, markdown); + await writeFileAtomic(outputPath, markdown); if (!writeToStdout) { const ms = Date.now() - startedAt; @@ -1264,7 +1265,7 @@ export async function convertAllMdx( } ); const outputPath = deriveOutputPath(mdxFilePath, srcDir, outDir); - await writeFile(outputPath, markdown); + await writeFileAtomic(outputPath, markdown); logger.debug({ human: { message: `convert ${mdxFilePath} β†’ ${outputPath} (${Date.now() - fileStartedAt}ms)`, diff --git a/packages/leadtype/src/feed/index.ts b/packages/leadtype/src/feed/index.ts index 2ddd15d6..af1e5421 100644 --- a/packages/leadtype/src/feed/index.ts +++ b/packages/leadtype/src/feed/index.ts @@ -1,8 +1,9 @@ import { existsSync } from "node:fs"; -import { mkdir, readFile, stat, writeFile } from "node:fs/promises"; +import { mkdir, readFile, stat } from "node:fs/promises"; import path from "node:path"; import { glob as fg } from "tinyglobby"; import { type DocsI18nConfig, normalizeDocsI18nConfig } from "../i18n"; +import { writeFileAtomic } from "../internal/atomic-fs"; import { type DocsPathMount, normalizeBaseUrl, @@ -429,7 +430,7 @@ export async function generateFeedArtifacts( entries, }); await mkdir(path.dirname(outputPath), { recursive: true }); - await writeFile(outputPath, rendered); + await writeFileAtomic(outputPath, rendered); feedFiles[format] = outputPath; } files[feed.id] = feedFiles; diff --git a/packages/leadtype/src/internal/atomic-fs.test.ts b/packages/leadtype/src/internal/atomic-fs.test.ts new file mode 100644 index 00000000..0ddb108a --- /dev/null +++ b/packages/leadtype/src/internal/atomic-fs.test.ts @@ -0,0 +1,98 @@ +import { mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { copyFileAtomic, writeFileAtomic } from "./atomic-fs"; + +const tempDirs: string[] = []; + +async function createTempDir(): Promise { + const dir = await mkdtemp(path.join(tmpdir(), "leadtype-atomic-fs-")); + tempDirs.push(dir); + return dir; +} + +afterEach(async () => { + await Promise.all( + tempDirs.splice(0).map((dir) => rm(dir, { force: true, recursive: true })) + ); +}); + +describe("writeFileAtomic", () => { + it("writes new files and replaces existing ones", async () => { + const dir = await createTempDir(); + const target = path.join(dir, "artifact.txt"); + + await writeFileAtomic(target, "first"); + expect(await readFile(target, "utf8")).toBe("first"); + + await writeFileAtomic(target, "second"); + expect(await readFile(target, "utf8")).toBe("second"); + }); + + it("leaves no temp files behind after writing", async () => { + const dir = await createTempDir(); + await writeFileAtomic(path.join(dir, "artifact.txt"), "content"); + const entries = await readdir(dir); + expect(entries).toEqual(["artifact.txt"]); + }); + + it("throws without creating the target when the directory is missing", async () => { + const dir = await createTempDir(); + const target = path.join(dir, "missing", "artifact.txt"); + await expect(writeFileAtomic(target, "content")).rejects.toThrow(); + }); + + it("never exposes partial content to concurrent readers", async () => { + const dir = await createTempDir(); + const target = path.join(dir, "artifact.txt"); + // Large enough that a non-atomic write would be observable mid-flight. + const sizeBytes = 4 * 1024 * 1024; + const payloadA = "a".repeat(sizeBytes); + const payloadB = "b".repeat(sizeBytes); + await writeFileAtomic(target, payloadA); + + const rounds = 25; + const writers = (async () => { + for (let round = 0; round < rounds; round++) { + await Promise.all([ + writeFileAtomic(target, payloadA), + writeFileAtomic(target, payloadB), + ]); + } + })(); + const readers = (async () => { + for (let round = 0; round < rounds * 4; round++) { + const seen = await readFile(target, "utf8"); + expect(seen === payloadA || seen === payloadB).toBe(true); + } + })(); + + await Promise.all([writers, readers]); + }); +}); + +describe("copyFileAtomic", () => { + it("replaces the target with a copy of the source", async () => { + const dir = await createTempDir(); + const source = path.join(dir, "source.txt"); + const target = path.join(dir, "target.txt"); + await writeFile(source, "copied"); + await writeFile(target, "stale"); + + await copyFileAtomic(source, target); + + expect(await readFile(target, "utf8")).toBe("copied"); + const entries = await readdir(dir); + expect(entries.sort()).toEqual(["source.txt", "target.txt"]); + }); + + it("cleans up its temp file when the source is missing", async () => { + const dir = await createTempDir(); + const target = path.join(dir, "target.txt"); + await expect( + copyFileAtomic(path.join(dir, "missing.txt"), target) + ).rejects.toThrow(); + expect(await readdir(dir)).toEqual([]); + }); +}); diff --git a/packages/leadtype/src/internal/atomic-fs.ts b/packages/leadtype/src/internal/atomic-fs.ts new file mode 100644 index 00000000..12406f10 --- /dev/null +++ b/packages/leadtype/src/internal/atomic-fs.ts @@ -0,0 +1,64 @@ +import { randomBytes } from "node:crypto"; +import { copyFile, rename, rm, writeFile } from "node:fs/promises"; +import { basename, dirname, join } from "node:path"; + +/** + * Generated artifacts are read while they are being (re)generated β€” by + * concurrent `leadtype generate` runs sharing an outDir, and by sibling build + * steps (tsc, next build) that read the output directory. A plain `writeFile` + * truncates the destination first, so those readers can observe empty or + * half-written files. Writing to a sibling temp file and renaming into place + * makes every artifact replacement atomic: readers see the old content or the + * new content, never a partial file. + * + * The temp file lives in the same directory as the destination so the rename + * never crosses a filesystem boundary (rename is only atomic within one). + * No fsync: these are reproducible build outputs, so crash durability is not + * worth the per-file cost β€” concurrent-reader atomicity is the goal. + */ + +function tempPathFor(filePath: string): string { + const suffix = randomBytes(6).toString("hex"); + return join( + dirname(filePath), + `.${basename(filePath)}.${process.pid}-${suffix}.tmp` + ); +} + +async function commitTempFile( + tempPath: string, + filePath: string, + write: () => Promise +): Promise { + try { + await write(); + await rename(tempPath, filePath); + } catch (error) { + try { + await rm(tempPath, { force: true }); + } catch { + // Best-effort cleanup; surface the original failure instead. + } + throw error; + } +} + +/** Atomically replace `filePath` with `data` (write temp sibling + rename). */ +export async function writeFileAtomic( + filePath: string, + data: string | Uint8Array +): Promise { + const tempPath = tempPathFor(filePath); + await commitTempFile(tempPath, filePath, () => writeFile(tempPath, data)); +} + +/** Atomically replace `targetPath` with a copy of `sourcePath`. */ +export async function copyFileAtomic( + sourcePath: string, + targetPath: string +): Promise { + const tempPath = tempPathFor(targetPath); + await commitTempFile(tempPath, targetPath, () => + copyFile(sourcePath, tempPath) + ); +} diff --git a/packages/leadtype/src/internal/generate-lock.test.ts b/packages/leadtype/src/internal/generate-lock.test.ts new file mode 100644 index 00000000..7283740b --- /dev/null +++ b/packages/leadtype/src/internal/generate-lock.test.ts @@ -0,0 +1,122 @@ +import { existsSync } from "node:fs"; +import { mkdir, mkdtemp, rm, utimes } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; +import { afterEach, describe, expect, it } from "vitest"; +import { acquireGenerateLock, generateLockPath } from "./generate-lock"; + +const tempDirs: string[] = []; +const lockPaths: string[] = []; + +async function createTempOutDir(): Promise { + const dir = await mkdtemp(path.join(tmpdir(), "leadtype-lock-out-")); + tempDirs.push(dir); + lockPaths.push(generateLockPath(dir)); + return dir; +} + +afterEach(async () => { + await Promise.all( + [...tempDirs.splice(0), ...lockPaths.splice(0)].map((dir) => + rm(dir, { force: true, recursive: true }) + ) + ); +}); + +describe("acquireGenerateLock", () => { + it("acquires, holds, and releases", async () => { + const outDir = await createTempOutDir(); + const lock = await acquireGenerateLock(outDir); + expect(existsSync(lock.lockPath)).toBe(true); + + await lock.release(); + expect(existsSync(lock.lockPath)).toBe(false); + // Release is idempotent. + await lock.release(); + }); + + it("keys the lock by outDir so unrelated runs do not contend", async () => { + const outDirA = await createTempOutDir(); + const outDirB = await createTempOutDir(); + expect(generateLockPath(outDirA)).not.toBe(generateLockPath(outDirB)); + + const lockA = await acquireGenerateLock(outDirA); + const lockB = await acquireGenerateLock(outDirB, { waitTimeoutMs: 500 }); + await Promise.all([lockA.release(), lockB.release()]); + }); + + it("waits for the holder to release before acquiring", async () => { + const outDir = await createTempOutDir(); + const first = await acquireGenerateLock(outDir); + + let firstReleased = false; + const second = acquireGenerateLock(outDir, { + pollIntervalMs: 20, + waitTimeoutMs: 5000, + }).then(async (lock) => { + expect(firstReleased).toBe(true); + await lock.release(); + }); + + await new Promise((resolve) => setTimeout(resolve, 100)); + firstReleased = true; + await first.release(); + await second; + }); + + it("reclaims a stale lock left by a crashed run", async () => { + const outDir = await createTempOutDir(); + const lockPath = generateLockPath(outDir); + await mkdir(lockPath); + const stale = new Date(Date.now() - 60_000); + await utimes(lockPath, stale, stale); + + const lock = await acquireGenerateLock(outDir, { + pollIntervalMs: 20, + staleMs: 1000, + waitTimeoutMs: 2000, + }); + expect(existsSync(lock.lockPath)).toBe(true); + await lock.release(); + }); + + it("fails loudly when the holder never releases", async () => { + const outDir = await createTempOutDir(); + const holder = await acquireGenerateLock(outDir); + try { + await expect( + acquireGenerateLock(outDir, { + pollIntervalMs: 20, + waitTimeoutMs: 200, + }) + ).rejects.toThrow(/timed out .*leadtype generate/); + } finally { + await holder.release(); + } + }); + + it("serializes concurrent acquisitions", async () => { + const outDir = await createTempOutDir(); + let inCriticalSection = 0; + let maxInCriticalSection = 0; + + await Promise.all( + Array.from({ length: 5 }, async () => { + const lock = await acquireGenerateLock(outDir, { + pollIntervalMs: 10, + waitTimeoutMs: 10_000, + }); + inCriticalSection += 1; + maxInCriticalSection = Math.max( + maxInCriticalSection, + inCriticalSection + ); + await new Promise((resolve) => setTimeout(resolve, 20)); + inCriticalSection -= 1; + await lock.release(); + }) + ); + + expect(maxInCriticalSection).toBe(1); + }); +}); diff --git a/packages/leadtype/src/internal/generate-lock.ts b/packages/leadtype/src/internal/generate-lock.ts new file mode 100644 index 00000000..5e6c22ed --- /dev/null +++ b/packages/leadtype/src/internal/generate-lock.ts @@ -0,0 +1,151 @@ +import { createHash } from "node:crypto"; +import { mkdir, rm, stat, utimes, writeFile } from "node:fs/promises"; +import { tmpdir } from "node:os"; +import path from "node:path"; + +/** + * Cross-process single-flight lock for `leadtype generate`. + * + * Parallel task graphs (lint/typecheck/build each depending on "docs are + * generated") commonly invoke generation concurrently against the same + * outDir. Atomic per-file writes keep every individual artifact readable, but + * whole runs still interleave: one run's read-back of `outDir/docs` can see a + * mix of its own and another run's output, and both runs burn the same work. + * This lock serializes runs per outDir so concurrent invocations wait for the + * in-flight run instead of racing it. + * + * The lock lives under os.tmpdir(), keyed by a hash of the resolved outDir β€” + * never inside the output directory itself, which is typically deployed + * verbatim (e.g. `public/`) and must not accumulate lock droppings from + * crashed runs. `mkdir` without `recursive` is the atomic acquire: exactly + * one process creates the directory, everyone else gets EEXIST. + * + * Stale handling: the holder refreshes the lock directory's mtime on an + * interval; a lock whose mtime is older than `staleMs` belongs to a crashed + * run and is reclaimed. Concurrent reclaimers race on the next mkdir and + * exactly one wins. + */ + +const DEFAULT_STALE_MS = 10 * 60 * 1000; +const DEFAULT_WAIT_TIMEOUT_MS = 5 * 60 * 1000; +const DEFAULT_POLL_INTERVAL_MS = 150; +const KEEPALIVE_DIVISOR = 4; +const MIN_KEEPALIVE_MS = 1000; +const LOCK_KEY_LENGTH = 16; + +export type GenerateLockOptions = { + /** Age after which an unrefreshed lock is considered abandoned. */ + staleMs?: number; + /** How long to wait for the holder before failing loudly. */ + waitTimeoutMs?: number; + /** Delay between acquisition attempts while waiting. */ + pollIntervalMs?: number; +}; + +export type GenerateLock = { + lockPath: string; + release: () => Promise; +}; + +export function generateLockPath(outDir: string): string { + const key = createHash("sha256") + .update(path.resolve(outDir)) + .digest("hex") + .slice(0, LOCK_KEY_LENGTH); + return path.join(tmpdir(), `leadtype-generate-${key}.lock`); +} + +function sleep(ms: number): Promise { + return new Promise((resolve) => setTimeout(resolve, ms)); +} + +async function isStale(lockPath: string, staleMs: number): Promise { + try { + const info = await stat(lockPath); + return Date.now() - info.mtimeMs > staleMs; + } catch { + // Lock vanished between EEXIST and stat β€” the holder released; retry. + return false; + } +} + +async function tryAcquire(lockPath: string): Promise { + try { + await mkdir(lockPath); + return true; + } catch (error) { + if ((error as NodeJS.ErrnoException).code === "EEXIST") { + return false; + } + throw error; + } +} + +async function writeOwnerMetadata(lockPath: string): Promise { + try { + await writeFile( + path.join(lockPath, "owner.json"), + `${JSON.stringify({ + pid: process.pid, + acquiredAt: new Date().toISOString(), + })}\n` + ); + } catch { + // Diagnostics only β€” the lock is the directory itself. + } +} + +export async function acquireGenerateLock( + outDir: string, + options: GenerateLockOptions = {} +): Promise { + const staleMs = options.staleMs ?? DEFAULT_STALE_MS; + const waitTimeoutMs = options.waitTimeoutMs ?? DEFAULT_WAIT_TIMEOUT_MS; + const pollIntervalMs = options.pollIntervalMs ?? DEFAULT_POLL_INTERVAL_MS; + const lockPath = generateLockPath(outDir); + const startedAt = Date.now(); + + while (!(await tryAcquire(lockPath))) { + if (await isStale(lockPath, staleMs)) { + await rm(lockPath, { force: true, recursive: true }); + continue; + } + if (Date.now() - startedAt >= waitTimeoutMs) { + throw new Error( + `timed out after ${Math.round(waitTimeoutMs / 1000)}s waiting for another \`leadtype generate\` run writing to "${outDir}" ` + + `(lock: ${lockPath}). If no other run is active, delete the lock directory, ` + + "or set LEADTYPE_NO_LOCK=1 to skip locking entirely." + ); + } + await sleep(pollIntervalMs); + } + + await writeOwnerMetadata(lockPath); + + // Refresh the lock's mtime so runs longer than staleMs are not reclaimed + // from under us. unref keeps the interval from holding the process open. + const keepaliveMs = Math.max( + MIN_KEEPALIVE_MS, + Math.floor(staleMs / KEEPALIVE_DIVISOR) + ); + const keepalive = setInterval(() => { + const now = new Date(); + utimes(lockPath, now, now).catch(() => { + // Best-effort refresh; a missed touch only matters past staleMs. + }); + }, keepaliveMs); + keepalive.unref(); + + let released = false; + return { + lockPath, + release: async () => { + if (released) { + return; + } + released = true; + clearInterval(keepalive); + await rm(lockPath, { force: true, recursive: true }); + }, + }; +} diff --git a/packages/leadtype/src/llm/llm.ts b/packages/leadtype/src/llm/llm.ts index 4946baa2..cc261e2b 100644 --- a/packages/leadtype/src/llm/llm.ts +++ b/packages/leadtype/src/llm/llm.ts @@ -1,12 +1,5 @@ import { existsSync } from "node:fs"; -import { - mkdir, - readdir, - readFile, - rm, - stat, - writeFile, -} from "node:fs/promises"; +import { mkdir, readdir, readFile, rm, stat } from "node:fs/promises"; import path from "node:path"; import type { PluggableList } from "unified"; import type { DocsFeedConfig } from "../feed"; @@ -20,6 +13,7 @@ import { outputRelativePathForLocale, toLocalizedDocsUrlPath, } from "../i18n"; +import { writeFileAtomic } from "../internal/atomic-fs"; import { slugifyDocsHeading } from "../internal/docs-heading"; import { type DocsPathMount, @@ -2274,13 +2268,13 @@ export async function generateLlmsTxt(config: LlmsTxtConfig): Promise { (transformer, value, context) => transformer.beforeLlmsTxt?.(value, context) ); - await writeFile(outputPath, artifact.content); + await writeFileAtomic(outputPath, artifact.content); // Publish a discovery copy at the well-known location so agents can find // llms.txt without guessing the root path. Served statically from the // output (public) dir; no route handler needed. const wellKnownPath = path.join(outDir, ".well-known", "llms.txt"); await mkdir(path.dirname(wellKnownPath), { recursive: true }); - await writeFile(wellKnownPath, artifact.content); + await writeFileAtomic(wellKnownPath, artifact.content); } if (hasNav || resolved.length > 0) { @@ -2330,7 +2324,7 @@ export async function generateLlmsTxt(config: LlmsTxtConfig): Promise { (transformer, value, context) => transformer.beforeLlmsTxt?.(value, context) ); - await writeFile(docsLlmsPath, artifact.content); + await writeFileAtomic(docsLlmsPath, artifact.content); } } @@ -2392,11 +2386,11 @@ export async function generateLLMFullContextFiles( (transformer, value, context) => transformer.beforeLlmsFull?.(value, context) ); - await writeFile(outputPath, artifact.content); + await writeFileAtomic(outputPath, artifact.content); // Discovery copy at the well-known location, alongside .well-known/llms.txt. const wellKnownFull = path.join(outDir, ".well-known", "llms-full.txt"); await mkdir(path.dirname(wellKnownFull), { recursive: true }); - await writeFile(wellKnownFull, artifact.content); + await writeFileAtomic(wellKnownFull, artifact.content); return; } @@ -2419,7 +2413,7 @@ export async function generateLLMFullContextFiles( (transformer, value, context) => transformer.beforeLlmsFull?.(value, context) ); - await writeFile(localeFullPath, artifact.content); + await writeFileAtomic(localeFullPath, artifact.content); } function toAgentReadabilityPage( @@ -2569,10 +2563,10 @@ export async function generateAgentReadabilityArtifacts( if (shouldEmitRootCrawlerFiles) { await mkdir(outDir, { recursive: true }); await mkdir(path.dirname(files.apiCatalog), { recursive: true }); - await writeFile(files.apiCatalog, renderApiCatalog({ manifest })); - await writeFile(files.sitemapXml, sitemapXml); - await writeFile(files.sitemapMd, sitemapMd); - await writeFile( + await writeFileAtomic(files.apiCatalog, renderApiCatalog({ manifest })); + await writeFileAtomic(files.sitemapXml, sitemapXml); + await writeFileAtomic(files.sitemapMd, sitemapMd); + await writeFileAtomic( files.robotsTxt, renderRobotsTxt({ baseUrl, @@ -2583,7 +2577,10 @@ export async function generateAgentReadabilityArtifacts( }) ); } - await writeFile(files.manifest, `${JSON.stringify(manifest, null, 2)}\n`); + await writeFileAtomic( + files.manifest, + `${JSON.stringify(manifest, null, 2)}\n` + ); return { files: { @@ -2862,7 +2859,7 @@ export async function generateAgentArtifacts( ...markdownUrlPath.slice(1).split("/") ); await mkdir(path.dirname(filePath), { recursive: true }); - await writeFile(filePath, renderAgentPageMirror(doc)); + await writeFileAtomic(filePath, renderAgentPageMirror(doc)); return filePath; }) ); @@ -2870,10 +2867,10 @@ export async function generateAgentArtifacts( await mkdir(outDir, { recursive: true }); const llmsTxt = renderAgentPagesLlmsTxt(inputs.product, docs, resolved); const llmsTxtPath = path.join(outDir, "llms.txt"); - await writeFile(llmsTxtPath, llmsTxt); + await writeFileAtomic(llmsTxtPath, llmsTxt); const wellKnownLlmsTxtPath = path.join(outDir, ".well-known", "llms.txt"); await mkdir(path.dirname(wellKnownLlmsTxtPath), { recursive: true }); - await writeFile(wellKnownLlmsTxtPath, llmsTxt); + await writeFileAtomic(wellKnownLlmsTxtPath, llmsTxt); const manifest: AgentReadabilityManifest = { version: 1, @@ -2892,7 +2889,7 @@ export async function generateAgentArtifacts( ...(config.agents?.seo ? { seo: config.agents.seo } : {}), }; const manifestPath = path.join(outDir, AGENT_READABILITY_MANIFEST_FILE); - await writeFile(manifestPath, `${JSON.stringify(manifest, null, 2)}\n`); + await writeFileAtomic(manifestPath, `${JSON.stringify(manifest, null, 2)}\n`); if (!(config.emitRootCrawlerFiles ?? true)) { return { @@ -2911,13 +2908,13 @@ export async function generateAgentArtifacts( const robotsTxtPath = path.join(outDir, ROBOTS_FILE); const apiCatalogPath = path.join(outDir, API_CATALOG_FILE); await mkdir(path.dirname(apiCatalogPath), { recursive: true }); - await writeFile(apiCatalogPath, renderApiCatalog({ manifest })); - await writeFile(sitemapXmlPath, renderSitemapXml(pages)); - await writeFile( + await writeFileAtomic(apiCatalogPath, renderApiCatalog({ manifest })); + await writeFileAtomic(sitemapXmlPath, renderSitemapXml(pages)); + await writeFileAtomic( sitemapMdPath, renderSitemapMarkdown({ product: inputs.product, navigation, pages }) ); - await writeFile( + await writeFileAtomic( robotsTxtPath, renderRobotsTxt({ baseUrl, @@ -3210,7 +3207,7 @@ export async function generateAgentsMd( (transformer, value, context) => transformer.beforeAgentsMd?.(value, context) ); - await writeFile(outputPath, artifact.content); + await writeFileAtomic(outputPath, artifact.content); return { outputPath }; } diff --git a/packages/leadtype/src/llm/skills.ts b/packages/leadtype/src/llm/skills.ts index b1dff2c4..c24fd173 100644 --- a/packages/leadtype/src/llm/skills.ts +++ b/packages/leadtype/src/llm/skills.ts @@ -1,6 +1,7 @@ import { createHash } from "node:crypto"; -import { mkdir, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdir, readdir, readFile, rm } from "node:fs/promises"; import path from "node:path"; +import { writeFileAtomic } from "../internal/atomic-fs"; import type { DocsSkillSpec } from "./llm"; const NON_SLUG_PATTERN = /[^a-z0-9]+/g; @@ -228,15 +229,15 @@ export async function generateSkillArtifacts( } const body = await resolveBody(docsSkill, config.srcDir); const skillPath = path.join(outDir, "SKILL.md"); - await writeFile(skillPath, renderSkillMd(docsSkill, body)); + await writeFileAtomic(skillPath, renderSkillMd(docsSkill, body)); return { files: [skillPath], skills: [docsSkill.name] }; } const skillsRoot = path.join(outDir, WELL_KNOWN_DIR, SKILLS_DIR); const cardPath = path.join(outDir, WELL_KNOWN_DIR, "agent-card.json"); - // Validate every skill before touching the existing surface β€” a throw after - // the rm below would erase the last good output and leave a partial rewrite. + // Validate every skill before touching the existing surface β€” a throw + // mid-write would otherwise leave a partially updated discovery surface. for (const skill of skills) { if (!SKILL_NAME_PATTERN.test(skill.name)) { throw new Error( @@ -252,13 +253,9 @@ export async function generateSkillArtifacts( } } - // Clear generated artifacts first on every run, so skills/cards the config no - // longer emits (renamed, removed, or fully disabled) don't linger and keep - // getting discovered by clients. - await rm(skillsRoot, { recursive: true, force: true }); - if (skills.length === 0) { - // Whole surface disabled β€” also drop a stale agent card from a prior run. + // Whole surface disabled β€” drop the generated artifacts from prior runs. + await rm(skillsRoot, { recursive: true, force: true }); await rm(cardPath, { force: true }); return { files: [], skills: [] }; } @@ -281,7 +278,7 @@ export async function generateSkillArtifacts( const dir = path.join(skillsRoot, skill.name); await mkdir(dir, { recursive: true }); const skillPath = path.join(dir, "SKILL.md"); - await writeFile(skillPath, content); + await writeFileAtomic(skillPath, content); files.push(skillPath); manifestEntries.push({ name: skill.name, @@ -296,7 +293,7 @@ export async function generateSkillArtifacts( } const indexPath = path.join(skillsRoot, "index.json"); - await writeFile( + await writeFileAtomic( indexPath, `${JSON.stringify( { @@ -309,11 +306,25 @@ export async function generateSkillArtifacts( ); files.push(indexPath); + // Prune skills the config no longer emits (renamed or removed) so they don't + // keep getting discovered. Pruning runs after the new surface is fully in + // place β€” deleting first would leave a window where concurrent readers see + // no skills at all. + const keep = new Set([...skills.map((skill) => skill.name), "index.json"]); + const existingEntries = await readdir(skillsRoot); + await Promise.all( + existingEntries + .filter((entry) => !keep.has(entry)) + .map((entry) => + rm(path.join(skillsRoot, entry), { force: true, recursive: true }) + ) + ); + if (config.skills?.agentCard === false) { // Card disabled but skills exist β€” remove any stale card from a prior run. await rm(cardPath, { force: true }); } else { - await writeFile( + await writeFileAtomic( cardPath, `${JSON.stringify(buildAgentCard(config, skills), null, 2)}\n` ); diff --git a/packages/leadtype/src/mcp/card.ts b/packages/leadtype/src/mcp/card.ts index 83c1f6ea..4c416810 100644 --- a/packages/leadtype/src/mcp/card.ts +++ b/packages/leadtype/src/mcp/card.ts @@ -1,5 +1,6 @@ -import { mkdir, writeFile } from "node:fs/promises"; +import { mkdir } from "node:fs/promises"; import path from "node:path"; +import { writeFileAtomic } from "../internal/atomic-fs.js"; import { normalizeBaseUrl } from "../internal/docs-url.js"; import type { LlmsProductInfo } from "../llm/llm.js"; import { @@ -229,8 +230,8 @@ export async function generateMcpServerCard( const card = createMcpServerCard(options); const json = `${JSON.stringify(card, null, 2)}\n`; await mkdir(path.dirname(outputPath), { recursive: true }); - await writeFile(outputPath, json); - await writeFile(rootPath, json); - await writeFile(wellKnownPath, json); + await writeFileAtomic(outputPath, json); + await writeFileAtomic(rootPath, json); + await writeFileAtomic(wellKnownPath, json); return { outputPath, rootPath, wellKnownPath, card }; } diff --git a/packages/leadtype/src/nlweb/artifacts.ts b/packages/leadtype/src/nlweb/artifacts.ts index d1b62e9d..05258b03 100644 --- a/packages/leadtype/src/nlweb/artifacts.ts +++ b/packages/leadtype/src/nlweb/artifacts.ts @@ -1,5 +1,6 @@ -import { mkdir, writeFile } from "node:fs/promises"; +import { mkdir } from "node:fs/promises"; import path from "node:path"; +import { writeFileAtomic } from "../internal/atomic-fs.js"; import { normalizeBaseUrl } from "../internal/docs-url.js"; import type { LlmsProductInfo } from "../llm/llm.js"; import type { AgentReadabilityPage } from "../llm/readability.js"; @@ -92,8 +93,8 @@ export async function generateNlwebArtifacts( toSchemaFeedLine(page, config.product, baseUrl) ); await mkdir(path.dirname(schemaFeed), { recursive: true }); - await writeFile(schemaFeed, `${lines.join("\n")}\n`); - await writeFile( + await writeFileAtomic(schemaFeed, `${lines.join("\n")}\n`); + await writeFileAtomic( schemaMap, renderSchemaMapXml(`${baseUrl}/${NLWEB_SCHEMA_FEED_PATH}`) ); diff --git a/packages/leadtype/src/search/node.ts b/packages/leadtype/src/search/node.ts index 265739a7..032bc535 100644 --- a/packages/leadtype/src/search/node.ts +++ b/packages/leadtype/src/search/node.ts @@ -1,5 +1,5 @@ import { existsSync } from "node:fs"; -import { mkdir, readdir, readFile, writeFile } from "node:fs/promises"; +import { mkdir, readdir, readFile } from "node:fs/promises"; import path from "node:path"; import { type DocsI18nConfig, @@ -9,6 +9,7 @@ import { outputRelativePathForLocale, toLocalizedDocsUrlPath, } from "../i18n"; +import { writeFileAtomic } from "../internal/atomic-fs"; import { type DocsPathMount, GENERIC_DOC_TITLES, @@ -332,10 +333,10 @@ export async function generateDocsSearchFiles( const serializedContent = `${JSON.stringify(content)}\n`; await mkdir(path.dirname(outputPath), { recursive: true }); - await writeFile(outputPath, serialized); + await writeFileAtomic(outputPath, serialized); if (contentOutputPath) { await mkdir(path.dirname(contentOutputPath), { recursive: true }); - await writeFile(contentOutputPath, serializedContent); + await writeFileAtomic(contentOutputPath, serializedContent); } const indexBytes = Buffer.byteLength(serialized, "utf-8"); diff --git a/packages/leadtype/src/sync/sync.ts b/packages/leadtype/src/sync/sync.ts index ab7b1502..20bac81c 100644 --- a/packages/leadtype/src/sync/sync.ts +++ b/packages/leadtype/src/sync/sync.ts @@ -1,7 +1,8 @@ import { spawn } from "node:child_process"; import { existsSync } from "node:fs"; -import { mkdir, readFile, rm, writeFile } from "node:fs/promises"; +import { mkdir, readFile, rm } from "node:fs/promises"; import path from "node:path"; +import { writeFileAtomic } from "../internal/atomic-fs"; import { normalizeUrlPrefix } from "../internal/docs-url"; import type { DocsCollection } from "../llm"; @@ -212,7 +213,7 @@ export async function writeSyncManifest( manifest: SyncManifest ): Promise { await mkdir(cacheDir, { recursive: true }); - await writeFile( + await writeFileAtomic( path.join(cacheDir, SYNC_MANIFEST_FILE), `${JSON.stringify(manifest, null, 2)}\n` ); From 153e845d79225e465bf662c4066c7c1c90a02e30 Mon Sep 17 00:00:00 2001 From: Kaylee <65376239+KayleeWilliams@users.noreply.github.com> Date: Thu, 2 Jul 2026 10:20:37 +0100 Subject: [PATCH 2/3] Add 0.4 changelog entry for concurrent generation safety MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- docs/changelog/0-4.mdx | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/docs/changelog/0-4.mdx b/docs/changelog/0-4.mdx index f5cb7353..68ca2372 100644 --- a/docs/changelog/0-4.mdx +++ b/docs/changelog/0-4.mdx @@ -14,3 +14,11 @@ Leadtype 0.4 is the next minor release after the 0.3 line. These notes are being - Batched Git frontmatter enrichment during `convertAllMdx`. When `enrichFrontmatterFromGit` is enabled, conversion now reads Git history once for the docs tree and maps results back to each converted file instead of spawning `git log` per file. In a 120-file synthetic docs benchmark, the Git metadata read dropped from ~2.36s of per-file process spawning to ~12ms for the batched read; full conversion with enrichment added ~27ms over no enrichment. - Preserved best-effort Git behavior for shallow clones, missing Git, and untracked files. `lastModified` still comes from the latest file commit, while `lastAuthor` now falls back to the latest non-bot author when the newest commit was authored by automation. - Cached repeated `` and `` resolution within a conversion run. Pages that reuse the same partial now share one raw file read and one parsed markdown AST, while section anchors such as `file.mdx#setup` still extract from cloned ASTs. The current Leadtype docs and c15t fixture do not contain repeated real include nodes, but a synthetic 200-page repeated-include benchmark cut include expansion from ~400ms to ~68ms. + +## Concurrent generation safety + +- Made generation safe to invoke concurrently against a shared `outDir`. Parallel task graphs where lint, typecheck, and build each depend on docs generation used to race on the output directory, causing intermittent partial reads, ENOENT on files another run had just replaced, and half-written artifacts. +- Every generated artifact β€” converted `docs/*.md`, `llms.txt`, `llms-full.txt`, the search index, sitemaps, robots, feeds, the MCP server card, NLWeb artifacts, skills, and sync manifests β€” is now written to a temp sibling and atomically renamed into place, so concurrent readers (including a sibling `tsc` or framework build reading `public/`) see the old content or the new content, never a truncated file. +- Removed delete-then-recreate windows: the agent-skills surface and mounted markdown mirrors write the new files first and prune stale entries after, instead of `rm -rf`-ing a live directory before rebuilding it. +- `leadtype generate` runs are now single-flight per output directory via a cross-process lock stored under the system temp dir, keyed by the resolved `--out` path. Concurrent invocations wait for the in-flight run; locks abandoned by crashed runs are reclaimed after 10 minutes, and waiting runs fail loudly after 5 minutes instead of hanging CI. Set `LEADTYPE_NO_LOCK=1` to opt out. +- Overhead is negligible: the atomic write adds one rename per file (~0.1–0.2ms) and the lock a fixed ~8ms per run β€” about 1–2% end to end on a 300-page site. From 6b4dcc336cc8b01647cf998d67e9dc6070c39f88 Mon Sep 17 00:00:00 2001 From: Kaylee <65376239+KayleeWilliams@users.noreply.github.com> Date: Thu, 2 Jul 2026 10:57:07 +0100 Subject: [PATCH 3/3] Address PR review: harden lock recovery and mirror pruning MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Reclaim abandoned locks via atomic rename-then-verify instead of a plain rm, so a competing waiter can never delete a fresh lock acquired between its staleness check and the removal (Codex review). - Release the lock on SIGINT/SIGTERM and re-raise, so an interrupted run never stalls the next one; probe the owner.json pid so a hard-killed holder is reclaimed immediately instead of after the stale window (Pullfrog review). - Raise the default wait timeout to 15 minutes (above the 10-minute stale window) so waiters outlive crashed holders and never fail a healthy long run; LEADTYPE_LOCK_TIMEOUT_MS overrides (Pullfrog review). - Sweep temp files leaked by hard-killed runs at the start of the next locked run so they never linger in a deployed public/ (Pullfrog review). - Exclude a nested mirror target from its own source glob so a mount whose urlPrefix resolves inside its source subtree never re-mirrors its previous output (Codex review). - Prune directories left empty after removing stale mirror files (Pullfrog review). πŸ€– Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- .changeset/concurrent-generate-safety.md | 11 +- docs/changelog/0-4.mdx | 2 +- docs/pipeline/generate-static-artifacts.mdx | 4 +- packages/leadtype/src/cli/generate.ts | 71 +++++++++- .../leadtype/src/internal/atomic-fs.test.ts | 60 +++++++- packages/leadtype/src/internal/atomic-fs.ts | 38 ++++- .../src/internal/generate-lock.test.ts | 36 ++++- .../leadtype/src/internal/generate-lock.ts | 131 ++++++++++++++++-- 8 files changed, 322 insertions(+), 31 deletions(-) diff --git a/.changeset/concurrent-generate-safety.md b/.changeset/concurrent-generate-safety.md index 8669c976..790c7d90 100644 --- a/.changeset/concurrent-generate-safety.md +++ b/.changeset/concurrent-generate-safety.md @@ -19,7 +19,10 @@ artifacts. instead of `rm -rf`-ing a live directory before rebuilding it. - `leadtype generate` runs are single-flight per output directory via a cross-process lock stored under the system temp dir (keyed by the resolved - `--out` path). Concurrent invocations wait for the in-flight run; locks - abandoned by crashed runs are reclaimed after 10 minutes, and waiting runs - fail loudly after 5 minutes instead of hanging CI. Set `LEADTYPE_NO_LOCK=1` - to opt out. + `--out` path). Concurrent invocations wait for the in-flight run. Abandoned + locks recover fast: interrupted runs (SIGINT/SIGTERM) release on the way + out, hard-killed runs are reclaimed as soon as their recorded pid is gone, + and unidentifiable locks are reclaimed after 10 minutes. Waiting runs fail + loudly after 15 minutes instead of hanging CI (`LEADTYPE_LOCK_TIMEOUT_MS` + overrides). Set `LEADTYPE_NO_LOCK=1` to opt out. Temp files leaked by a + hard-killed run are swept at the start of the next locked run. diff --git a/docs/changelog/0-4.mdx b/docs/changelog/0-4.mdx index 68ca2372..1e7399ce 100644 --- a/docs/changelog/0-4.mdx +++ b/docs/changelog/0-4.mdx @@ -20,5 +20,5 @@ Leadtype 0.4 is the next minor release after the 0.3 line. These notes are being - Made generation safe to invoke concurrently against a shared `outDir`. Parallel task graphs where lint, typecheck, and build each depend on docs generation used to race on the output directory, causing intermittent partial reads, ENOENT on files another run had just replaced, and half-written artifacts. - Every generated artifact β€” converted `docs/*.md`, `llms.txt`, `llms-full.txt`, the search index, sitemaps, robots, feeds, the MCP server card, NLWeb artifacts, skills, and sync manifests β€” is now written to a temp sibling and atomically renamed into place, so concurrent readers (including a sibling `tsc` or framework build reading `public/`) see the old content or the new content, never a truncated file. - Removed delete-then-recreate windows: the agent-skills surface and mounted markdown mirrors write the new files first and prune stale entries after, instead of `rm -rf`-ing a live directory before rebuilding it. -- `leadtype generate` runs are now single-flight per output directory via a cross-process lock stored under the system temp dir, keyed by the resolved `--out` path. Concurrent invocations wait for the in-flight run; locks abandoned by crashed runs are reclaimed after 10 minutes, and waiting runs fail loudly after 5 minutes instead of hanging CI. Set `LEADTYPE_NO_LOCK=1` to opt out. +- `leadtype generate` runs are now single-flight per output directory via a cross-process lock stored under the system temp dir, keyed by the resolved `--out` path. Concurrent invocations wait for the in-flight run. Abandoned locks recover fast: interrupted runs release on the way out, hard-killed runs are reclaimed as soon as their recorded process is gone, and unidentifiable locks are reclaimed after 10 minutes. Waiting runs fail loudly after 15 minutes instead of hanging CI (`LEADTYPE_LOCK_TIMEOUT_MS` overrides). Set `LEADTYPE_NO_LOCK=1` to opt out. - Overhead is negligible: the atomic write adds one rename per file (~0.1–0.2ms) and the lock a fixed ~8ms per run β€” about 1–2% end to end on a 300-page site. diff --git a/docs/pipeline/generate-static-artifacts.mdx b/docs/pipeline/generate-static-artifacts.mdx index 3f427334..b4e0050d 100644 --- a/docs/pipeline/generate-static-artifacts.mdx +++ b/docs/pipeline/generate-static-artifacts.mdx @@ -162,8 +162,8 @@ npx leadtype generate --src . --out public --base-url https://docs.example.com Parallel task graphs often invoke generation more than once at the same time β€” lint, typecheck, and build each declaring "docs are generated" as a prerequisite. `leadtype generate` is safe under that pattern: -- **Runs are single-flight per output directory.** A cross-process lock (keyed by the resolved `--out` path, stored under the system temp dir β€” never inside the output directory) serializes concurrent runs. Later invocations wait for the in-flight run, then regenerate. Locks abandoned by crashed runs are reclaimed automatically after 10 minutes; waiting runs fail loudly after 5 minutes rather than hanging CI. -- **Every artifact is replaced atomically.** Files are written to a temp sibling and renamed into place, so a sibling build step reading the output directory (`tsc`, `next build`, a dev server watching `public/`) sees the old artifact or the new one β€” never a truncated file, and never a missing file for pages that still exist. +- **Runs are single-flight per output directory.** A cross-process lock (keyed by the resolved `--out` path, stored under the system temp dir β€” never inside the output directory) serializes concurrent runs. Later invocations wait for the in-flight run, then regenerate. Abandoned locks recover fast: an interrupted run (Ctrl-C, `SIGTERM`) releases its lock on the way out, a hard-killed run's lock is reclaimed as soon as its recorded process is gone, and an unidentifiable lock is reclaimed after 10 minutes without refresh. Waiting runs fail loudly after 15 minutes rather than hanging CI (`LEADTYPE_LOCK_TIMEOUT_MS` overrides this for very large sites). +- **Every artifact is replaced atomically.** Files are written to a temp sibling and renamed into place, so a sibling build step reading the output directory (`tsc`, `next build`, a dev server watching `public/`) sees the old artifact or the new one β€” never a truncated file, and never a missing file for pages that still exist. Temp files leaked by a hard-killed run are swept at the start of the next locked run, so they never linger in a deployed `public/`. Set `LEADTYPE_NO_LOCK=1` to skip the lock β€” for example on network filesystems where directory-based locking is unreliable, or when your task runner already serializes generation. diff --git a/packages/leadtype/src/cli/generate.ts b/packages/leadtype/src/cli/generate.ts index 2d302e68..2fbef3cd 100644 --- a/packages/leadtype/src/cli/generate.ts +++ b/packages/leadtype/src/cli/generate.ts @@ -1,5 +1,5 @@ import { existsSync } from "node:fs"; -import { cp, mkdir, mkdtemp, readFile, rm } from "node:fs/promises"; +import { cp, mkdir, mkdtemp, readFile, rm, rmdir } from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { pathToFileURL } from "node:url"; @@ -8,7 +8,11 @@ import type { Pluggable, PluggableList } from "unified"; import { convertAllMdx } from "../convert"; import { type DocsFeedConfig, generateFeedArtifacts } from "../feed"; import { type DocsI18nManifest, normalizeDocsI18nConfig } from "../i18n"; -import { copyFileAtomic, writeFileAtomic } from "../internal/atomic-fs"; +import { + copyFileAtomic, + sweepLeakedTempFiles, + writeFileAtomic, +} from "../internal/atomic-fs"; import { type DocsPathMount, normalizeBaseUrl, @@ -2147,9 +2151,21 @@ async function copyMountedMarkdownMirrors( `Mounted URL prefix "${urlPrefix}" must resolve inside the output directory.` ); } + // A mount whose urlPrefix resolves inside its own source subtree (e.g. + // pathPrefix "guides" with urlPrefix "/docs/guides/public") nests + // targetDir under sourceDir. Exclude the mirror from the source glob so + // a previous run's mirror output is never re-mirrored into itself. + const targetRelativeToSource = path.relative(sourceDir, targetDir); + const targetInsideSource = + targetRelativeToSource.length > 0 && + !targetRelativeToSource.startsWith("..") && + !path.isAbsolute(targetRelativeToSource); const files = await fg("**/*.md", { absolute: false, cwd: sourceDir, + ignore: targetInsideSource + ? [`${normalizeDocsPath(targetRelativeToSource)}/**`] + : [], onlyFiles: true, }); await Promise.all( @@ -2170,15 +2186,47 @@ async function copyMountedMarkdownMirrors( cwd: targetDir, onlyFiles: true, }); + const staleFiles = mirroredFiles.filter( + (file) => !currentFiles.has(file) + ); await Promise.all( - mirroredFiles - .filter((file) => !currentFiles.has(file)) - .map((file) => rm(path.join(targetDir, file), { force: true })) + staleFiles.map((file) => + rm(path.join(targetDir, file), { force: true }) + ) ); + await removeEmptyMirrorDirs(targetDir, staleFiles); }) ); } +/** + * Remove directories left empty after pruning stale mirror files, walking + * each pruned file's parent chain up to (but never including) the mirror + * root. A non-empty directory stops the walk β€” everything above it is + * non-empty too. + */ +async function removeEmptyMirrorDirs( + targetDir: string, + prunedFiles: string[] +): Promise { + const parents = new Set( + prunedFiles.map((file) => path.dirname(path.join(targetDir, file))) + ); + for (const parent of [...parents].sort( + (left, right) => right.length - left.length + )) { + let current = parent; + while (current.startsWith(`${targetDir}${path.sep}`)) { + try { + await rmdir(current); + } catch { + break; + } + current = path.dirname(current); + } + } +} + async function hasMarkdownFiles(dir: string): Promise { if (!existsSync(dir)) { return false; @@ -2538,7 +2586,13 @@ export async function runGenerateCommand( let generateLock: GenerateLock | undefined; if (process.env.LEADTYPE_NO_LOCK !== "1") { try { - generateLock = await acquireGenerateLock(outDir); + const waitTimeoutMs = Number(process.env.LEADTYPE_LOCK_TIMEOUT_MS); + generateLock = await acquireGenerateLock( + outDir, + Number.isFinite(waitTimeoutMs) && waitTimeoutMs > 0 + ? { waitTimeoutMs } + : {} + ); } catch (error) { const message = error instanceof Error ? error.message : String(error); reportFailure(message); @@ -2548,6 +2602,11 @@ export async function runGenerateCommand( let sourceMirror: SourceMirror | undefined; try { + if (generateLock) { + // With the lock held, no other run is in flight β€” safe to sweep temp + // files leaked into the output tree by a previous hard-killed run. + await sweepLeakedTempFiles(outDir); + } const metadata = await resolveGenerateMetadata( srcDir, loadedConfig, diff --git a/packages/leadtype/src/internal/atomic-fs.test.ts b/packages/leadtype/src/internal/atomic-fs.test.ts index 0ddb108a..3cfb9520 100644 --- a/packages/leadtype/src/internal/atomic-fs.test.ts +++ b/packages/leadtype/src/internal/atomic-fs.test.ts @@ -1,8 +1,20 @@ -import { mkdtemp, readdir, readFile, rm, writeFile } from "node:fs/promises"; +import { + mkdir, + mkdtemp, + readdir, + readFile, + rm, + writeFile, +} from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; -import { copyFileAtomic, writeFileAtomic } from "./atomic-fs"; +import { + copyFileAtomic, + isAtomicTempFileName, + sweepLeakedTempFiles, + writeFileAtomic, +} from "./atomic-fs"; const tempDirs: string[] = []; @@ -96,3 +108,47 @@ describe("copyFileAtomic", () => { expect(await readdir(dir)).toEqual([]); }); }); + +describe("isAtomicTempFileName", () => { + it("matches only the temp-sibling naming scheme", () => { + expect(isAtomicTempFileName(".guide.md.12345-a1b2c3d4e5f6.tmp")).toBe(true); + // User dotfiles and near-misses must never match β€” the sweep deletes. + expect(isAtomicTempFileName(".gitkeep")).toBe(false); + expect(isAtomicTempFileName(".env.tmp")).toBe(false); + expect(isAtomicTempFileName("guide.md.12345-a1b2c3d4e5f6.tmp")).toBe(false); + expect(isAtomicTempFileName(".guide.md.12345-short.tmp")).toBe(false); + }); +}); + +describe("sweepLeakedTempFiles", () => { + it("removes leaked temp siblings but leaves everything else", async () => { + const dir = await createTempDir(); + await mkdir(path.join(dir, "docs", "guides"), { recursive: true }); + const leaked = path.join( + dir, + "docs", + "guides", + ".page.md.99999-abcdef012345.tmp" + ); + const artifact = path.join(dir, "docs", "guides", "page.md"); + const dotfile = path.join(dir, "docs", ".gitkeep"); + await writeFile(leaked, "partial"); + await writeFile(artifact, "content"); + await writeFile(dotfile, ""); + + await sweepLeakedTempFiles(dir); + + expect(await readdir(path.join(dir, "docs", "guides"))).toEqual([ + "page.md", + ]); + expect(await readFile(artifact, "utf8")).toBe("content"); + expect(await readdir(path.join(dir, "docs"))).toContain(".gitkeep"); + }); + + it("is a no-op for a missing directory", async () => { + const dir = await createTempDir(); + await expect( + sweepLeakedTempFiles(path.join(dir, "missing")) + ).resolves.toBeUndefined(); + }); +}); diff --git a/packages/leadtype/src/internal/atomic-fs.ts b/packages/leadtype/src/internal/atomic-fs.ts index 12406f10..cdfeff0e 100644 --- a/packages/leadtype/src/internal/atomic-fs.ts +++ b/packages/leadtype/src/internal/atomic-fs.ts @@ -1,6 +1,8 @@ import { randomBytes } from "node:crypto"; +import { existsSync } from "node:fs"; import { copyFile, rename, rm, writeFile } from "node:fs/promises"; import { basename, dirname, join } from "node:path"; +import { glob as fg } from "tinyglobby"; /** * Generated artifacts are read while they are being (re)generated β€” by @@ -17,14 +19,48 @@ import { basename, dirname, join } from "node:path"; * worth the per-file cost β€” concurrent-reader atomicity is the goal. */ +const TEMP_SUFFIX_BYTES = 6; +// Mirrors tempPathFor: `.{name}.{pid}-{12 hex chars}.tmp`. +const TEMP_FILE_NAME_PATTERN = /^\..+\.\d+-[0-9a-f]{12}\.tmp$/; + function tempPathFor(filePath: string): string { - const suffix = randomBytes(6).toString("hex"); + const suffix = randomBytes(TEMP_SUFFIX_BYTES).toString("hex"); return join( dirname(filePath), `.${basename(filePath)}.${process.pid}-${suffix}.tmp` ); } +/** True when `fileName` matches the temp-sibling naming scheme used here. */ +export function isAtomicTempFileName(fileName: string): boolean { + return TEMP_FILE_NAME_PATTERN.test(fileName); +} + +/** + * Remove temp siblings leaked by a hard-killed run (SIGKILL, OOM kill). + * Normal and error paths clean up after themselves; this sweep exists so a + * crashed run cannot leave `.name.pid-hex.tmp` droppings in a directory that + * is deployed verbatim (e.g. `public/`). Callers must hold the generate lock + * (or otherwise know no run is in flight) so an active run's in-progress + * temp files are never swept. + */ +export async function sweepLeakedTempFiles(dir: string): Promise { + if (!existsSync(dir)) { + return; + } + const candidates = await fg("**/.*.tmp", { + absolute: true, + cwd: dir, + dot: true, + onlyFiles: true, + }); + await Promise.all( + candidates + .filter((candidate) => isAtomicTempFileName(basename(candidate))) + .map((candidate) => rm(candidate, { force: true })) + ); +} + async function commitTempFile( tempPath: string, filePath: string, diff --git a/packages/leadtype/src/internal/generate-lock.test.ts b/packages/leadtype/src/internal/generate-lock.test.ts index 7283740b..e18f3804 100644 --- a/packages/leadtype/src/internal/generate-lock.test.ts +++ b/packages/leadtype/src/internal/generate-lock.test.ts @@ -1,5 +1,5 @@ import { existsSync } from "node:fs"; -import { mkdir, mkdtemp, rm, utimes } from "node:fs/promises"; +import { mkdir, mkdtemp, rm, utimes, writeFile } from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; import { afterEach, describe, expect, it } from "vitest"; @@ -64,6 +64,40 @@ describe("acquireGenerateLock", () => { await second; }); + it("reclaims a lock whose recorded holder process is dead", async () => { + const outDir = await createTempOutDir(); + const lockPath = generateLockPath(outDir); + await mkdir(lockPath); + // Fresh mtime, but the recorded pid cannot exist (beyond pid_max on + // Linux and macOS) β€” models a SIGKILLed/OOM-killed holder. + await writeFile( + path.join(lockPath, "owner.json"), + `${JSON.stringify({ pid: 2 ** 30, acquiredAt: "2026-01-01T00:00:00Z" })}\n` + ); + + const lock = await acquireGenerateLock(outDir, { + pollIntervalMs: 20, + staleMs: 60_000, + waitTimeoutMs: 2000, + }); + expect(existsSync(lock.lockPath)).toBe(true); + await lock.release(); + }); + + it("registers signal handlers while held and removes them on release", async () => { + const outDir = await createTempOutDir(); + const sigintBefore = process.listenerCount("SIGINT"); + const sigtermBefore = process.listenerCount("SIGTERM"); + + const lock = await acquireGenerateLock(outDir); + expect(process.listenerCount("SIGINT")).toBe(sigintBefore + 1); + expect(process.listenerCount("SIGTERM")).toBe(sigtermBefore + 1); + + await lock.release(); + expect(process.listenerCount("SIGINT")).toBe(sigintBefore); + expect(process.listenerCount("SIGTERM")).toBe(sigtermBefore); + }); + it("reclaims a stale lock left by a crashed run", async () => { const outDir = await createTempOutDir(); const lockPath = generateLockPath(outDir); diff --git a/packages/leadtype/src/internal/generate-lock.ts b/packages/leadtype/src/internal/generate-lock.ts index 5e6c22ed..92033730 100644 --- a/packages/leadtype/src/internal/generate-lock.ts +++ b/packages/leadtype/src/internal/generate-lock.ts @@ -1,5 +1,14 @@ -import { createHash } from "node:crypto"; -import { mkdir, rm, stat, utimes, writeFile } from "node:fs/promises"; +import { createHash, randomBytes } from "node:crypto"; +import { rmSync } from "node:fs"; +import { + mkdir, + readFile, + rename, + rm, + stat, + utimes, + writeFile, +} from "node:fs/promises"; import { tmpdir } from "node:os"; import path from "node:path"; @@ -20,18 +29,27 @@ import path from "node:path"; * crashed runs. `mkdir` without `recursive` is the atomic acquire: exactly * one process creates the directory, everyone else gets EEXIST. * - * Stale handling: the holder refreshes the lock directory's mtime on an - * interval; a lock whose mtime is older than `staleMs` belongs to a crashed - * run and is reclaimed. Concurrent reclaimers race on the next mkdir and - * exactly one wins. + * Abandoned-lock recovery, ordered by how fast each path fires: + * 1. SIGINT/SIGTERM: the holder removes its lock on the way out, so an + * interrupted run never stalls the next one. + * 2. Dead holder (SIGKILL, OOM kill): waiters probe the pid recorded in + * `owner.json` β€” the lock lives in the local tmpdir, so the holder is + * always a same-machine process β€” and reclaim as soon as it is gone. + * 3. Stale mtime: the holder refreshes the lock's mtime on an interval; a + * lock older than `staleMs` is reclaimed even when no pid is readable. */ const DEFAULT_STALE_MS = 10 * 60 * 1000; -const DEFAULT_WAIT_TIMEOUT_MS = 5 * 60 * 1000; +// Longer than the stale window so a waiter always outlives a crashed holder +// (the lock goes stale and is reclaimed before any waiter gives up), while a +// healthy long-running holder β€” whose keepalive keeps the lock fresh β€” is +// waited on rather than failed. +const DEFAULT_WAIT_TIMEOUT_MS = 15 * 60 * 1000; const DEFAULT_POLL_INTERVAL_MS = 150; const KEEPALIVE_DIVISOR = 4; const MIN_KEEPALIVE_MS = 1000; const LOCK_KEY_LENGTH = 16; +const RECLAIM_SUFFIX_BYTES = 4; export type GenerateLockOptions = { /** Age after which an unrefreshed lock is considered abandoned. */ @@ -59,14 +77,77 @@ function sleep(ms: number): Promise { return new Promise((resolve) => setTimeout(resolve, ms)); } -async function isStale(lockPath: string, staleMs: number): Promise { +async function readOwnerPid(lockPath: string): Promise { + try { + const raw = await readFile(path.join(lockPath, "owner.json"), "utf8"); + const pid = (JSON.parse(raw) as { pid?: unknown }).pid; + return typeof pid === "number" ? pid : undefined; + } catch { + // Holder mid-acquire or metadata unreadable β€” fall back to the mtime window. + return; + } +} + +function isProcessAlive(pid: number): boolean { + try { + process.kill(pid, 0); + return true; + } catch (error) { + // EPERM means the pid exists but belongs to another user. + return (error as NodeJS.ErrnoException).code === "EPERM"; + } +} + +async function isAbandoned( + lockPath: string, + staleMs: number +): Promise { try { const info = await stat(lockPath); - return Date.now() - info.mtimeMs > staleMs; + if (Date.now() - info.mtimeMs > staleMs) { + return true; + } } catch { // Lock vanished between EEXIST and stat β€” the holder released; retry. return false; } + const pid = await readOwnerPid(lockPath); + return pid !== undefined && !isProcessAlive(pid); +} + +/** + * Remove an abandoned lock without racing competing acquirers: rename it to a + * unique trash path first (rename is atomic, so exactly one reclaimer wins), + * then re-check that what we grabbed really was the abandoned lock. A plain + * `rm` here could delete a fresh lock that a competing process created + * between our abandonment check and the removal. + */ +async function reclaimAbandonedLock( + lockPath: string, + staleMs: number +): Promise { + const suffix = randomBytes(RECLAIM_SUFFIX_BYTES).toString("hex"); + const trashPath = `${lockPath}.reclaim-${process.pid}-${suffix}`; + try { + await rename(lockPath, trashPath); + } catch { + // Another reclaimer won, or the holder released β€” go retry the acquire. + return; + } + if (await isAbandoned(trashPath, staleMs)) { + await rm(trashPath, { force: true, recursive: true }); + return; + } + // We grabbed a live lock created in the checkβ†’rename window; hand it back. + try { + await rename(trashPath, lockPath); + } catch { + // A third process acquired lockPath in the same window. Dropping the + // displaced lock is the least-bad option: the window is microseconds + // wide, and the displaced run's artifacts remain safe under the atomic + // per-file writes. + await rm(trashPath, { force: true, recursive: true }); + } } async function tryAcquire(lockPath: string): Promise { @@ -91,7 +172,7 @@ async function writeOwnerMetadata(lockPath: string): Promise { })}\n` ); } catch { - // Diagnostics only β€” the lock is the directory itself. + // Waiters fall back to the mtime stale window without a readable pid. } } @@ -106,20 +187,41 @@ export async function acquireGenerateLock( const startedAt = Date.now(); while (!(await tryAcquire(lockPath))) { - if (await isStale(lockPath, staleMs)) { - await rm(lockPath, { force: true, recursive: true }); + if (await isAbandoned(lockPath, staleMs)) { + await reclaimAbandonedLock(lockPath, staleMs); continue; } if (Date.now() - startedAt >= waitTimeoutMs) { throw new Error( `timed out after ${Math.round(waitTimeoutMs / 1000)}s waiting for another \`leadtype generate\` run writing to "${outDir}" ` + - `(lock: ${lockPath}). If no other run is active, delete the lock directory, ` + - "or set LEADTYPE_NO_LOCK=1 to skip locking entirely." + `(lock: ${lockPath}). If no other run is active, delete the lock directory. ` + + "Set LEADTYPE_LOCK_TIMEOUT_MS to wait longer, or LEADTYPE_NO_LOCK=1 to skip locking entirely." ); } await sleep(pollIntervalMs); } + // An interrupted holder must not stall the next run until a slower + // recovery path fires, so release the lock on the way out and re-raise to + // preserve the default terminate behavior and exit code. + const onSigint = (): void => releaseOnSignal("SIGINT"); + const onSigterm = (): void => releaseOnSignal("SIGTERM"); + const removeSignalHandlers = (): void => { + process.removeListener("SIGINT", onSigint); + process.removeListener("SIGTERM", onSigterm); + }; + const releaseOnSignal = (signal: NodeJS.Signals): void => { + try { + rmSync(lockPath, { force: true, recursive: true }); + } catch { + // Best effort β€” the dead-pid reclaim covers whatever we couldn't remove. + } + removeSignalHandlers(); + process.kill(process.pid, signal); + }; + process.once("SIGINT", onSigint); + process.once("SIGTERM", onSigterm); + await writeOwnerMetadata(lockPath); // Refresh the lock's mtime so runs longer than staleMs are not reclaimed @@ -145,6 +247,7 @@ export async function acquireGenerateLock( } released = true; clearInterval(keepalive); + removeSignalHandlers(); await rm(lockPath, { force: true, recursive: true }); }, };