diff --git a/src/areas/generate/components/WorkflowPanel.tsx b/src/areas/generate/components/WorkflowPanel.tsx index e2d920b..5d6c14f 100644 --- a/src/areas/generate/components/WorkflowPanel.tsx +++ b/src/areas/generate/components/WorkflowPanel.tsx @@ -10,6 +10,7 @@ import { useAppStore } from '@shared/stores/appStore' import { useExtensionsStore } from '@shared/stores/extensionsStore' import { useNavStore } from '@shared/stores/navStore' import { useWorkflowRunStore } from '@areas/workflows/workflowRunStore' +import { useWaitButton } from '@areas/workflows/useWaitButton' import { buildAllWorkflowExtensions, getWorkflowExtension } from '@areas/workflows/mockExtensions' import { validateWorkflowPreflight } from '@areas/workflows/preflight' import type { WorkflowExtension } from '@areas/workflows/mockExtensions' @@ -335,10 +336,7 @@ function TextParamRow({ nodeId, nodes, onPatch }: { nodeId: string; nodes: FlowN } function WaitParamRow({ nodeId }: { nodeId: string }) { - const status = useWorkflowRunStore((s) => s.runState.status) - const activeNodeId = useWorkflowRunStore((s) => s.activeNodeId) - const continueRun = useWorkflowRunStore((s) => s.continueRun) - const isPaused = status === 'paused' && activeNodeId === nodeId + const { waitState, canContinue, isRunning, label, buttonClass, onContinue } = useWaitButton(nodeId) return (
@@ -348,15 +346,29 @@ function WaitParamRow({ nodeId }: { nodeId: string }) { Wait
- {isPaused ? ( + {waitState ? ( ) : (

diff --git a/src/areas/workflows/nodeBehaviors.ts b/src/areas/workflows/nodeBehaviors.ts new file mode 100644 index 0000000..ad3e746 --- /dev/null +++ b/src/areas/workflows/nodeBehaviors.ts @@ -0,0 +1,101 @@ +import type { WFNode, WFEdge } from '@shared/types/electron.d' + +// ─── Node behaviors registry ────────────────────────────────────────────────── +// +// Add a new entry here when you introduce a node type that needs to participate +// in the runner's control-flow logic. The predicates and helpers below derive +// everything they need from this table — no hardcoded type checks anywhere else. +// +// • passthrough — data flows through this node unchanged. Resolvers (inputs, +// preflight typing) walk past it to find the real source. +// • branchStarter — splits the run into a user-driven sub-DAG. The runner +// pauses on these nodes and exposes a Continue/Retry button. +// • sceneOutput — terminal sink that gets pushed to the 3D viewer. Used by +// the immediate mesh-push logic during execution. + +export interface NodeBehavior { + passthrough?: boolean + branchStarter?: boolean + sceneOutput?: boolean +} + +const BEHAVIORS: Record = { + waitNode: { passthrough: true, branchStarter: true }, + outputNode: { sceneOutput: true }, +} + +export const isPassthrough = (type: string | undefined): boolean => !!type && !!BEHAVIORS[type]?.passthrough +export const isBranchStarter = (type: string | undefined): boolean => !!type && !!BEHAVIORS[type]?.branchStarter +export const isSceneOutput = (type: string | undefined): boolean => !!type && !!BEHAVIORS[type]?.sceneOutput + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +/** + * Walks `sourceId` backwards through passthrough nodes (following each one's + * incoming edge) until it hits a non-passthrough node. Returns its id, or + * undefined if a passthrough has no incoming edge. + */ +export function resolveDataSource( + sourceId: string, + edges: WFEdge[], + nodeMap: Map, +): string | undefined { + let cur = sourceId + const seen = new Set() + while (isPassthrough(nodeMap.get(cur)?.type) && !seen.has(cur)) { + seen.add(cur) + const parent = edges.find((e) => e.target === cur) + if (!parent) return undefined + cur = parent.source + } + return cur +} + +/** + * Walks backwards from `nodeId` and returns the set of nearest upstream + * branch-starter (Wait) nodes — the first Wait found on each incoming path, + * without traversing past it. Empty = no upstream Wait. Size > 1 = the node + * merges two distinct branches. + */ +export function nearestUpstreamWaits( + nodeId: string, + edges: WFEdge[], + nodeMap: Map, +): Set { + const result = new Set() + const seen = new Set() + const stack = edges.filter((e) => e.target === nodeId).map((e) => e.source) + while (stack.length > 0) { + const id = stack.pop()! + if (seen.has(id)) continue + seen.add(id) + if (isBranchStarter(nodeMap.get(id)?.type)) { result.add(id); continue } + for (const e of edges) if (e.target === id) stack.push(e.source) + } + return result +} + +/** + * True if any forward path from `sourceId` reaches a sceneOutput node, walking + * through passthrough nodes along the way. + */ +export function reachesSceneOutput( + sourceId: string, + edges: WFEdge[], + nodeMap: Map, +): boolean { + const stack = [sourceId] + const seen = new Set() + while (stack.length > 0) { + const id = stack.pop()! + if (seen.has(id)) continue + seen.add(id) + for (const e of edges) { + if (e.source !== id) continue + const tType = nodeMap.get(e.target)?.type + if (isSceneOutput(tType)) return true + if (isPassthrough(tType)) stack.push(e.target) + } + } + return false +} diff --git a/src/areas/workflows/nodes/WaitNode.tsx b/src/areas/workflows/nodes/WaitNode.tsx index 8e30302..3f3dff7 100644 --- a/src/areas/workflows/nodes/WaitNode.tsx +++ b/src/areas/workflows/nodes/WaitNode.tsx @@ -1,15 +1,38 @@ import { Handle, Position } from '@xyflow/react' import type { WFNodeData } from '@shared/types/electron.d' -import { useWorkflowRunStore } from '../workflowRunStore' +import { useWaitButton } from '../useWaitButton' import BaseNode from './BaseNode' const HANDLE_STYLE = { background: '#71717a', width: 14, height: 14, border: '2.5px solid #18181b' } export default function WaitNode({ id, data, selected }: { id: string; data: WFNodeData; selected?: boolean }) { - const status = useWorkflowRunStore((s) => s.runState.status) - const activeNodeId = useWorkflowRunStore((s) => s.activeNodeId) - const continueRun = useWorkflowRunStore((s) => s.continueRun) - const isPaused = status === 'paused' && activeNodeId === id + const { waitState, canContinue, isRunning, label, buttonClass, statusText, onContinue } = useWaitButton(id) + + const subheader = waitState ? ( + + ) : undefined return ( } - subheader={isPaused ? ( - - ) : undefined} + subheader={subheader} handles={ <> @@ -43,9 +56,7 @@ export default function WaitNode({ id, data, selected }: { id: string; data: WFN } >

-

- {isPaused ? 'Workflow paused — click Continue to resume.' : 'Pauses the workflow until you click Continue.'} -

+

{statusText}

) diff --git a/src/areas/workflows/preflight.ts b/src/areas/workflows/preflight.ts index 9ee9813..755ce5d 100644 --- a/src/areas/workflows/preflight.ts +++ b/src/areas/workflows/preflight.ts @@ -1,5 +1,6 @@ import type { Workflow, WFNode } from '@shared/types/electron.d' import { getWorkflowExtension, type WorkflowExtension } from './mockExtensions' +import { isPassthrough, resolveDataSource, nearestUpstreamWaits } from './nodeBehaviors' type DataType = 'image' | 'text' | 'mesh' @@ -55,11 +56,17 @@ export function validateWorkflowPreflight( ): WorkflowPreflightIssue[] { const issues: WorkflowPreflightIssue[] = [] const nodeMap = new Map(workflow.nodes.map((node) => [node.id, node])) - const outputTypes = new Map() + const outputTypes = new Map() for (const node of workflow.nodes) { outputTypes.set(node.id, getNodeOutputType(node, allExtensions)) } + // Passthrough nodes inherit their resolved upstream source's type. + for (const node of workflow.nodes) { + if (!isPassthrough(node.type)) continue + const realSourceId = resolveDataSource(node.id, workflow.edges, nodeMap) + if (realSourceId && realSourceId !== node.id) outputTypes.set(node.id, outputTypes.get(realSourceId)) + } for (const node of workflow.nodes) { if (node.type === 'meshNode' && node.data.params?.source === 'current' && !options?.currentMeshUrl) { @@ -70,6 +77,19 @@ export function validateWorkflowPreflight( }) } + // A node fed by two different Wait branches can't be scheduled into a single + // branch — it would run before either branch produces its mesh. + if ( + (node.type === 'extensionNode' || node.type === 'outputNode') && + nearestUpstreamWaits(node.id, workflow.edges, nodeMap).size > 1 + ) { + pushIssue(issues, { + key: `${node.id}:wait-merge`, + nodeId: node.id, + message: `${nodeLabel(node, allExtensions)} merges two Wait branches, which isn't supported. Route it through a single Wait.`, + }) + } + if (node.type !== 'extensionNode') continue const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions) diff --git a/src/areas/workflows/useWaitButton.ts b/src/areas/workflows/useWaitButton.ts new file mode 100644 index 0000000..ac94df1 --- /dev/null +++ b/src/areas/workflows/useWaitButton.ts @@ -0,0 +1,47 @@ +import { useWorkflowRunStore } from './workflowRunStore' +import type { WaitState } from './workflowRunStore' + +export interface WaitButtonModel { + waitState: WaitState | undefined + canContinue: boolean + isRunning: boolean + label: 'Retry' | 'Continue' + buttonClass: string + statusText: string + onContinue: () => void +} + +// Shared derivation for the Wait Continue/Retry control, used by both the +// canvas node (WaitNode) and the params panel (WaitParamRow). Keeps the two +// renderings in sync — they differ only in markup/sizing, not in logic. +export function useWaitButton(nodeId: string): WaitButtonModel { + const waitState = useWorkflowRunStore((s) => s.waitStates[nodeId]) + const runningBranchId = useWorkflowRunStore((s) => s.runningBranchId) + const status = useWorkflowRunStore((s) => s.runState.status) + const continueRun = useWorkflowRunStore((s) => s.continueRun) + + const otherBranchRunning = runningBranchId !== null && runningBranchId !== nodeId + // Pre-phase: shared nodes (e.g. Generate Mesh) still running before any branch hands off. + const inPrePhase = status === 'running' && runningBranchId === null + const isRunning = waitState === 'running' + const canContinue = (waitState === 'pending' || waitState === 'done' || waitState === 'error') && !otherBranchRunning && !inPrePhase + const label: 'Retry' | 'Continue' = waitState === 'done' || waitState === 'error' ? 'Retry' : 'Continue' + + const buttonClass = waitState === 'error' + ? 'bg-red-500/15 border-red-500/30 text-red-400 hover:bg-red-500/25' + : waitState === 'done' + ? 'bg-emerald-500/15 border-emerald-500/30 text-emerald-400 hover:bg-emerald-500/25' + : 'bg-amber-500/15 border-amber-500/30 text-amber-400 hover:bg-amber-500/25' + + const statusText = + waitState === 'blocked' ? 'Waiting for the previous Wait to finish…' : + waitState === 'running' ? 'Branch in progress…' : + waitState === 'done' ? 'Branch finished — Retry to re-run.' : + waitState === 'error' ? 'Branch failed — Retry to re-run.' : + waitState === 'pending' && inPrePhase ? 'Waiting for upstream nodes…' : + waitState === 'pending' && otherBranchRunning ? 'Another branch is running…' : + waitState === 'pending' ? 'Workflow paused — click Continue to run this branch.' : + 'Pauses the workflow until you click Continue.' + + return { waitState, canContinue, isRunning, label, buttonClass, statusText, onContinue: () => continueRun(nodeId) } +} diff --git a/src/areas/workflows/useWorkflowRunner.ts b/src/areas/workflows/useWorkflowRunner.ts deleted file mode 100644 index 091ea16..0000000 --- a/src/areas/workflows/useWorkflowRunner.ts +++ /dev/null @@ -1,321 +0,0 @@ -import { useState, useCallback, useRef } from 'react' -import axios from 'axios' -import { useAppStore } from '@shared/stores/appStore' -import type { Workflow, WFNode, WFEdge } from '@shared/types/electron.d' -import { getWorkflowExtension } from './mockExtensions' -import type { WorkflowExtension } from './mockExtensions' - -// ─── Types ──────────────────────────────────────────────────────────────────── - -export interface WorkflowRunState { - status: 'idle' | 'running' | 'done' | 'error' - blockIndex: number - blockTotal: number - blockProgress: number - blockStep: string - outputUrl?: string - outputPath?: string - error?: string -} - -const IDLE: WorkflowRunState = { - status: 'idle', blockIndex: 0, blockTotal: 0, blockProgress: 0, blockStep: '', -} - -// ─── Topological sort (Kahn's algorithm) ───────────────────────────────────── - -function topoSort(nodes: WFNode[], edges: WFEdge[]): WFNode[] { - const nodeMap = new Map(nodes.map((n) => [n.id, n])) - const inDegree = new Map(nodes.map((n) => [n.id, 0])) - const adj = new Map(nodes.map((n) => [n.id, [] as string[]])) - - for (const e of edges) { - if (!nodeMap.has(e.source) || !nodeMap.has(e.target)) continue - adj.get(e.source)!.push(e.target) - inDegree.set(e.target, (inDegree.get(e.target) ?? 0) + 1) - } - - const queue = nodes.filter((n) => (inDegree.get(n.id) ?? 0) === 0) - const result: WFNode[] = [] - - while (queue.length > 0) { - const node = queue.shift()! - result.push(node) - for (const neighbor of adj.get(node.id) ?? []) { - const deg = (inDegree.get(neighbor) ?? 0) - 1 - inDegree.set(neighbor, deg) - if (deg === 0) queue.push(nodeMap.get(neighbor)!) - } - } - - return result -} - -// ─── Hook ───────────────────────────────────────────────────────────────────── - -export function useWorkflowRunner(allExtensions: WorkflowExtension[]) { - const apiUrl = useAppStore((s) => s.apiUrl) - const [runState, setRunState] = useState(IDLE) - const cancelRef = useRef(false) - const activeJobId = useRef(null) - - const run = useCallback(async ( - workflow: Workflow, - imagePath: string, - imageData?: string, - currentMeshUrl?: string, // outputUrl of the mesh currently in the scene (before job is replaced) - ) => { - cancelRef.current = false - - const ordered = topoSort(workflow.nodes, workflow.edges) - const execNodes = ordered.filter((n) => n.type === 'extensionNode' && n.data.enabled) - - setRunState({ - status: 'running', blockIndex: 0, blockTotal: execNodes.length, - blockProgress: 0, blockStep: 'Starting…', - }) - - try { - const client = axios.create({ baseURL: apiUrl }) - const settings = await window.electron.settings.get() - const workspaceDir = settings.workspaceDir.replace(/\\/g, '/') - - // Track outputs per node so branches each get the correct predecessor output. - // outputType distinguishes image files from mesh files for multi-input routing. - const nodeOutputs = new Map() - - // Pre-populate source nodes - for (const node of ordered) { - if (node.type === 'imageNode') nodeOutputs.set(node.id, { filePath: imagePath, outputType: 'image' }) - if (node.type === 'textNode') nodeOutputs.set(node.id, { text: node.data.params?.text as string | undefined }) - if (node.type === 'meshNode') { - const source = node.data.params?.source as 'file' | 'current' | undefined - if (source === 'current') { - if (currentMeshUrl) { - let meshFilePath: string - if (currentMeshUrl.includes('serve-file?path=')) { - const encoded = currentMeshUrl.split('serve-file?path=')[1] - meshFilePath = decodeURIComponent(encoded).replace(/\\/g, '/') - } else { - const rel = currentMeshUrl.replace(/^\/workspace\//, '') - meshFilePath = `${workspaceDir}/${rel}` - } - nodeOutputs.set(node.id, { filePath: meshFilePath, outputType: 'mesh' }) - } - } else { - const fp = node.data.params?.filePath as string | undefined - if (fp) nodeOutputs.set(node.id, { filePath: fp, outputType: 'mesh' }) - } - } - } - - for (let i = 0; i < execNodes.length; i++) { - if (cancelRef.current) { setRunState(IDLE); return } - - const node = execNodes[i] - const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions) - - // Resolve this node's inputs from its actual predecessors in the graph. - // For multi-input nodes, route by outputType (image vs mesh). - let nodeInputPath: string | undefined - let nodeInputText: string | undefined - let nodeInputMeshPath: string | undefined // for multi-input: the mesh wire - - const incomingEdges = workflow.edges.filter((e) => e.target === node.id) - if (ext?.inputs && ext.inputs.length > 1) { - // Multi-input: route each edge by the source node's outputType - for (const edge of incomingEdges) { - const src = nodeOutputs.get(edge.source) - if (!src) continue - if (src.outputType === 'mesh') nodeInputMeshPath = src.filePath - else if (src.outputType === 'image') nodeInputPath = src.filePath - else if (src.filePath !== undefined) nodeInputPath = src.filePath - if (src.text !== undefined) nodeInputText = src.text - } - } else { - // Single-input: original behaviour - for (const edge of incomingEdges) { - const src = nodeOutputs.get(edge.source) - if (src?.filePath !== undefined) nodeInputPath = src.filePath - if (src?.text !== undefined) nodeInputText = src.text - } - } - - setRunState((s) => ({ ...s, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' })) - - // Model extensions always go through the HTTP API (job queue, progress, GPU). - // Process extensions always go through IPC runProcess (CPU, synchronous). - const isGeneratorNode = ext?.type === 'model' - - if (isGeneratorNode) { - // ── Generator: call Python FastAPI ────────────────────────────────── - // For multi-input nodes, use the resolved image path (not the global imagePath) - const activeImagePath = nodeInputPath ?? imagePath - const base64 = imageData && nodeInputPath === undefined - ? imageData - : await window.electron.fs.readFileBase64(activeImagePath) - const bytes = Uint8Array.from(atob(base64), (c) => c.charCodeAt(0)) - const blob = new Blob([bytes], { type: 'image/png' }) - const fname = activeImagePath.split(/[\\/]/).pop() ?? 'image.png' - - // For multi-input nodes: inject the mesh input as params.mesh_path - const extraParams: Record = {} - if (nodeInputMeshPath) { - const norm = nodeInputMeshPath.replace(/\\/g, '/') - extraParams.mesh_path = norm.startsWith(workspaceDir) - ? norm.slice(workspaceDir.length).replace(/^\//, '') - : norm - } - - // Merge schema defaults (with per-variant paramDefaults already applied) - // under user overrides so Python receives the effective values, not an - // empty dict that falls back to hardcoded defaults in the generator. - const schemaDefaults = Object.fromEntries( - (ext.params ?? []).map((p) => [p.id, p.default]), - ) - const effectiveParams = { ...schemaDefaults, ...(node.data.params ?? {}) } - - const fd = new FormData() - fd.append('image', blob, fname) - fd.append('model_id', node.data.extensionId ?? '') - fd.append('collection', 'Workflows') - fd.append('remesh', 'none') - fd.append('enable_texture', 'false') - fd.append('texture_resolution', '1024') - fd.append('params', JSON.stringify({ ...effectiveParams, ...extraParams })) - - setRunState((s) => ({ ...s, blockProgress: 5, blockStep: 'Submitting to model…' })) - - const { data } = await client.post<{ job_id: string }>( - '/generate/from-image', fd, - { headers: { 'Content-Type': 'multipart/form-data' } }, - ) - const jobId = data.job_id - activeJobId.current = jobId - - while (true) { - if (cancelRef.current) { - await client.post(`/generate/cancel/${jobId}`).catch(() => {}) - activeJobId.current = null - setRunState(IDLE) - return - } - await new Promise((r) => setTimeout(r, 1200)) - - const { data: st } = await client.get<{ - status: string; progress?: number; step?: string - output_url?: string; error?: string - }>(`/generate/status/${jobId}`) - - if (st.status === 'done' && st.output_url) { - const rel = st.output_url.replace(/^\/workspace\//, '') - nodeInputPath = `${workspaceDir}/${rel}` - activeJobId.current = null - setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Generation complete' })) - break - } - if (st.status === 'error') throw new Error(st.error ?? 'Generation failed') - - setRunState((s) => ({ - ...s, - blockProgress: st.progress ?? s.blockProgress, - blockStep: st.step ?? 'Generating…', - })) - } - - } else { - // ── Process extension ──────────────────────────────────────────────── - if (ext?.input === 'mesh' && !nodeInputPath) { - throw new Error(`${ext.name} needs an incoming mesh connection`) - } - if (ext?.input === 'image' && !nodeInputPath) { - throw new Error(`${ext.name} needs an incoming image connection`) - } - if (ext?.input === 'text' && !nodeInputText) { - throw new Error(`${ext.name} needs an incoming text connection`) - } - const parts = (node.data.extensionId ?? '').split('/') - const extId = parts[0] - const nodeId = parts[1] ?? '' - const result = await window.electron.extensions.runProcess( - extId, - { filePath: nodeInputPath, text: nodeInputText, nodeId }, - node.data.params as Record, - ) - if (!result.success) throw new Error(result.error ?? 'Process extension failed') - nodeInputPath = result.result?.filePath ?? nodeInputPath - nodeInputText = result.result?.text ?? nodeInputText - setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Done' })) - } - - // Store this node's output so downstream nodes (including other branches) can read it. - // Tag with outputType so multi-input nodes can route by type. - const outputType = ext?.output ?? (nodeInputPath ? 'mesh' : undefined) - nodeOutputs.set(node.id, { filePath: nodeInputPath, text: nodeInputText, outputType }) - } - - // Determine outputUrl: prefer what feeds the outputNode (Add to Scene) - let outputUrl: string | undefined - let outputPath: string | undefined - - const outputNodeDef = ordered.find((n) => n.type === 'outputNode') - if (outputNodeDef) { - for (const edge of workflow.edges.filter((e) => e.target === outputNodeDef.id)) { - const src = nodeOutputs.get(edge.source) - if (src?.filePath) { - const norm = src.filePath.replace(/\\/g, '/') - if (norm.startsWith(workspaceDir)) { - const rel = norm.slice(workspaceDir.length).replace(/^\//, '') - outputUrl = `/workspace/${rel}` - } - } - } - } - - // Fallback: scan all nodes for a workspace file (last one wins) - if (!outputUrl) { - for (const node of execNodes) { - const out = nodeOutputs.get(node.id) - if (out?.filePath) { - const norm = out.filePath.replace(/\\/g, '/') - if (norm.startsWith(workspaceDir)) { - const rel = norm.slice(workspaceDir.length).replace(/^\//, '') - outputUrl = `/workspace/${rel}` - } else { - outputPath = out.filePath - } - } - } - } - - setRunState({ - status: 'done', - blockIndex: execNodes.length - 1, - blockTotal: execNodes.length, - blockProgress: 100, - blockStep: 'Done', - outputUrl, - outputPath, - }) - - } catch (err) { - if (!cancelRef.current) { - setRunState((s) => ({ ...s, status: 'error', error: String(err) })) - } - } - }, [apiUrl, allExtensions]) - - const cancel = useCallback(() => { - cancelRef.current = true - if (activeJobId.current) { - const client = axios.create({ baseURL: apiUrl }) - client.post(`/generate/cancel/${activeJobId.current}`).catch(() => {}) - activeJobId.current = null - } - setRunState(IDLE) - }, [apiUrl]) - - const reset = useCallback(() => setRunState(IDLE), []) - - return { runState, run, cancel, reset } -} diff --git a/src/areas/workflows/workflowRunStore.ts b/src/areas/workflows/workflowRunStore.ts index d58ac48..f996b34 100644 --- a/src/areas/workflows/workflowRunStore.ts +++ b/src/areas/workflows/workflowRunStore.ts @@ -1,9 +1,10 @@ import { create } from 'zustand' -import axios from 'axios' +import axios, { AxiosInstance } from 'axios' import { useAppStore } from '@shared/stores/appStore' import { getWorkflowExtension } from './mockExtensions' import type { WorkflowExtension } from './mockExtensions' import type { Workflow, WFNode, WFEdge } from '@shared/types/electron.d' +import { isBranchStarter, isSceneOutput, resolveDataSource, reachesSceneOutput, nearestUpstreamWaits } from './nodeBehaviors' // ─── Types ──────────────────────────────────────────────────────────────────── @@ -18,401 +19,570 @@ export interface WorkflowRunState { error?: string } +export type WaitState = 'blocked' | 'pending' | 'running' | 'done' | 'error' + const IDLE: WorkflowRunState = { status: 'idle', blockIndex: 0, blockTotal: 0, blockProgress: 0, blockStep: '', } -// Module-level refs — survive component unmounts / navigation +// ─── Module-level run context (survives between run() and continueRun(id)) ─── + const _cancel = { current: false } const _activeJobId = { current: null as string | null } -const _resume = { current: null as (() => void) | null } -function flushResume(): void { - const fn = _resume.current - if (!fn) return - _resume.current = null - fn() +interface NodeOutput { filePath?: string; text?: string; outputType?: string } + +interface RunContext { + workflow: Workflow + allExtensions: WorkflowExtension[] + client: AxiosInstance + workspaceDir: string + selectedImagePath: string + selectedImageData?: string + overrideImageData?: string + nodeOutputs: Map + nodeMap: Map + /** nodes in execution (topological) order */ + ordered: WFNode[] + branches: Map + waitIds: string[] + /** waitId → nearest upstream waitId (null = top-level, runnable from the start) */ + parentWait: Map + /** workspace URL of the most recently pushed scene mesh (last branch the user ran wins) */ + lastSceneMesh?: string } +const _ctx = { current: null as RunContext | null } -// ─── Topological sort ───────────────────────────────────────────────────────── +// ─── Topological sort (DFS preorder, branch-first) ─────────────────────────── function topoSort(nodes: WFNode[], edges: WFEdge[]): WFNode[] { - const nodeMap = new Map(nodes.map((n) => [n.id, n])) - const inDegree = new Map(nodes.map((n) => [n.id, 0])) - const adj = new Map(nodes.map((n) => [n.id, [] as string[]])) + const nodeMap = new Map(nodes.map((n) => [n.id, n])) + const adj = new Map(nodes.map((n) => [n.id, [] as string[]])) + const inDeg = new Map(nodes.map((n) => [n.id, 0])) for (const e of edges) { if (!nodeMap.has(e.source) || !nodeMap.has(e.target)) continue adj.get(e.source)!.push(e.target) - inDegree.set(e.target, (inDegree.get(e.target) ?? 0) + 1) + inDeg.set(e.target, (inDeg.get(e.target) ?? 0) + 1) } - const queue = nodes.filter((n) => (inDegree.get(n.id) ?? 0) === 0) + + const visited = new Set() const result: WFNode[] = [] - while (queue.length > 0) { - const node = queue.shift()! - result.push(node) - for (const neighbor of adj.get(node.id) ?? []) { - const deg = (inDegree.get(neighbor) ?? 0) - 1 - inDegree.set(neighbor, deg) - if (deg === 0) queue.push(nodeMap.get(neighbor)!) + + const visit = (id: string): void => { + if (visited.has(id)) return + for (const e of edges) { + if (e.target === id && !visited.has(e.source) && nodeMap.has(e.source)) return } + const node = nodeMap.get(id) + if (!node) return + visited.add(id) + result.push(node) + for (const childId of adj.get(id) ?? []) visit(childId) } + + for (const node of nodes) if ((inDeg.get(node.id) ?? 0) === 0) visit(node.id) + for (const node of nodes) if (!visited.has(node.id)) visit(node.id) return result } +// ─── Branch identification ──────────────────────────────────────────────────── +// A node belongs to Wait W's branch if its single nearest upstream Wait is W +// (dominance). Nodes with no upstream Wait — or with multiple (merges) — execute +// in the pre-phase before any user pause. + +function identifyBranches(workflow: Workflow): { + preExecExtNodes: WFNode[] + branches: Map + waitIds: string[] + parentWait: Map + ordered: WFNode[] +} { + const ordered = topoSort(workflow.nodes, workflow.edges) + const nodeMap = new Map(workflow.nodes.map((n) => [n.id, n])) + const waitIds = ordered.filter((n) => isBranchStarter(n.type)).map((n) => n.id) + + // A node is owned by its single nearest upstream Wait (dominance). This lets + // Wait → … → Wait chains nest: nodes after the 2nd Wait belong to it, not the 1st. + const branchOwner = new Map() + for (const node of workflow.nodes) { + if (isBranchStarter(node.type)) continue + const nearest = nearestUpstreamWaits(node.id, workflow.edges, nodeMap) + if (nearest.size === 1) branchOwner.set(node.id, [...nearest][0]) + } + + // Each Wait's parent = its own nearest upstream Wait (null if top-level). + const parentWait = new Map() + for (const w of waitIds) { + const nearest = nearestUpstreamWaits(w, workflow.edges, nodeMap) + parentWait.set(w, nearest.size === 1 ? [...nearest][0] : null) + } + + const branches = new Map() + for (const w of waitIds) branches.set(w, []) + const preExecExtNodes: WFNode[] = [] + for (const node of ordered) { + if (node.type !== 'extensionNode' || !node.data.enabled) continue + const owner = branchOwner.get(node.id) + if (owner) branches.get(owner)!.push(node) + else preExecExtNodes.push(node) + } + + return { preExecExtNodes, branches, waitIds, parentWait, ordered } +} + +// ─── Per-node execution ────────────────────────────────────────────────────── +// Resolves inputs (walking through Wait passthroughs), runs the extension +// (model or process), updates nodeOutputs, and pushes the mesh to the scene +// if it feeds an Add-to-Scene through Waits. + +async function executeExtensionNode( + node: WFNode, + ctx: RunContext, + setRunState: (updater: (s: WorkflowRunState) => WorkflowRunState) => void, +): Promise { + const { workflow, allExtensions, client, workspaceDir, nodeOutputs, nodeMap, + selectedImagePath, selectedImageData } = ctx + + const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions) + + const resolveSource = (sourceId: string): NodeOutput | undefined => { + const realId = resolveDataSource(sourceId, workflow.edges, nodeMap) + return realId ? nodeOutputs.get(realId) : undefined + } + + let nodeInputPath: string | undefined + let nodeInputText: string | undefined + let nodeInputMeshPath: string | undefined + + const incomingEdges = workflow.edges.filter((e) => e.target === node.id) + + if (ext?.inputs && ext.inputs.length > 1) { + for (const edge of incomingEdges) { + const src = resolveSource(edge.source) + if (!src) continue + if (src.outputType === 'mesh') nodeInputMeshPath = src.filePath + else if (src.outputType === 'image') nodeInputPath = src.filePath + else if (src.filePath !== undefined) nodeInputPath = src.filePath + if (src.text !== undefined) nodeInputText = src.text + } + } else { + for (const edge of incomingEdges) { + const src = resolveSource(edge.source) + if (src?.filePath !== undefined) nodeInputPath = src.filePath + if (src?.text !== undefined) nodeInputText = src.text + } + } + + const isModelNode = ext?.type === 'model' + + if (isModelNode) { + const activeImagePath = nodeInputPath ?? selectedImagePath + const base64 = selectedImageData && nodeInputPath === undefined + ? selectedImageData + : await window.electron.fs.readFileBase64(activeImagePath) + const bytes = Uint8Array.from(atob(base64), (c) => c.charCodeAt(0)) + const blob = new Blob([bytes], { type: 'image/png' }) + const fname = activeImagePath.split(/[\\/]/).pop() ?? 'image.png' + + const extraParams: Record = {} + if (nodeInputMeshPath) { + const norm = nodeInputMeshPath.replace(/\\/g, '/') + extraParams.mesh_path = norm.startsWith(workspaceDir) + ? norm.slice(workspaceDir.length).replace(/^\//, '') + : norm + } + + const schemaDefaults = Object.fromEntries( + (ext.params ?? []).map((p) => [p.id, p.default]), + ) + const effectiveParams = { ...schemaDefaults, ...(node.data.params ?? {}) } + + const fd = new FormData() + fd.append('image', blob, fname) + fd.append('model_id', node.data.extensionId ?? '') + fd.append('collection', 'Workflows') + fd.append('remesh', 'none') + fd.append('enable_texture', 'false') + fd.append('texture_resolution', '1024') + fd.append('params', JSON.stringify({ ...effectiveParams, ...extraParams })) + + setRunState((s) => ({ ...s, blockProgress: 5, blockStep: 'Submitting to model…' })) + + const { data } = await client.post<{ job_id: string }>( + '/generate/from-image', fd, + { headers: { 'Content-Type': 'multipart/form-data' } }, + ) + _activeJobId.current = data.job_id + + while (true) { + if (_cancel.current) { + await client.post(`/generate/cancel/${_activeJobId.current}`).catch(() => {}) + _activeJobId.current = null + throw new Error('Cancelled') + } + await new Promise((r) => setTimeout(r, 1200)) + + const { data: st } = await client.get<{ + status: string; progress?: number; step?: string; output_url?: string; error?: string + }>(`/generate/status/${_activeJobId.current}`) + + if (st.status === 'done' && st.output_url) { + const rel = st.output_url.replace(/^\/workspace\//, '') + nodeInputPath = `${workspaceDir}/${rel}` + _activeJobId.current = null + setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Generation complete' })) + break + } + if (st.status === 'error') throw new Error(st.error ?? 'Generation failed') + + setRunState((s) => ({ ...s, blockProgress: st.progress ?? s.blockProgress, blockStep: st.step ?? 'Generating…' })) + useAppStore.getState().updateCurrentJob({ status: 'generating', progress: st.progress, step: st.step }) + } + } else { + if (ext?.input === 'mesh' && !nodeInputPath) throw new Error(`${ext.name} needs an incoming mesh connection`) + if (ext?.input === 'image' && !nodeInputPath) throw new Error(`${ext.name} needs an incoming image connection`) + if (ext?.input === 'text' && !nodeInputText) throw new Error(`${ext.name} needs an incoming text connection`) + + const parts = (node.data.extensionId ?? '').split('/') + const extId = parts[0] + const nid = parts[1] ?? '' + const result = await window.electron.extensions.runProcess( + extId, + { filePath: nodeInputPath, text: nodeInputText, nodeId: nid }, + node.data.params as Record, + ) + if (!result.success) throw new Error(result.error ?? 'Process extension failed') + nodeInputPath = result.result?.filePath ?? nodeInputPath + nodeInputText = result.result?.text ?? nodeInputText + setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Done' })) + } + + const outputType = ext?.output ?? (nodeInputPath ? 'mesh' : undefined) + nodeOutputs.set(node.id, { filePath: nodeInputPath, text: nodeInputText, outputType }) + + const norm = nodeInputPath?.replace(/\\/g, '/') + if (norm?.startsWith(workspaceDir) && reachesSceneOutput(node.id, workflow.edges, nodeMap)) { + const url = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}` + ctx.lastSceneMesh = url // remember it so finalize() keeps the last-run branch in view + useAppStore.getState().updateCurrentJob({ status: 'done', progress: 100, outputUrl: url }) + } +} + +// ─── Wait dependency helpers ─────────────────────────────────────────────────── + +/** All Waits nested (transitively) under `rootId`, via the parentWait chain. */ +function descendantWaits(rootId: string, ctx: RunContext): Set { + const out = new Set() + let frontier = new Set([rootId]) + while (frontier.size > 0) { + const next = new Set() + for (const w of ctx.waitIds) { + const parent = ctx.parentWait.get(w) + if (parent && frontier.has(parent) && !out.has(w)) { out.add(w); next.add(w) } + } + frontier = next + } + return out +} + // ─── Store ──────────────────────────────────────────────────────────────────── interface WorkflowRunStore { runState: WorkflowRunState activeNodeId: string | null activeWorkflowId: string | null - /** nodeId → workspace URL for image outputs (populated after each run) */ nodeImageOutputs: Record + waitStates: Record + runningBranchId: string | null run: (workflow: Workflow, allExtensions: WorkflowExtension[], overrideImageData?: string) => Promise cancel: () => void reset: () => void - continueRun: () => void + continueRun: (waitId: string) => Promise } -export const useWorkflowRunStore = create((set) => ({ - runState: IDLE, - activeNodeId: null, - activeWorkflowId: null, - nodeImageOutputs: {}, - - async run(workflow, allExtensions, overrideImageData?) { - _cancel.current = false - - const appState = useAppStore.getState() - const apiUrl = appState.apiUrl - const ordered = topoSort(workflow.nodes, workflow.edges) - const execNodes = ordered.filter((n) => - (n.type === 'extensionNode' || n.type === 'waitNode') && n.data.enabled, - ) +export const useWorkflowRunStore = create((set, get) => { + const setRunState = (updater: (s: WorkflowRunState) => WorkflowRunState): void => { + set((s) => ({ runState: updater(s.runState) })) + } - const selectedImagePath = appState.selectedImagePath ?? '' - const selectedImageData = overrideImageData ?? appState.selectedImageData ?? undefined - const currentMeshUrl = appState.currentJob?.outputUrl - - set({ - activeWorkflowId: workflow.id, - nodeImageOutputs: {}, - runState: { status: 'running', blockIndex: 0, blockTotal: execNodes.length, blockProgress: 0, blockStep: 'Starting…' }, - }) - - appState.setCurrentJob({ - id: crypto.randomUUID(), - imageFile: selectedImagePath, - status: 'generating', - progress: 0, - createdAt: Date.now(), - }) - - try { - const client = axios.create({ baseURL: apiUrl }) - const settings = await window.electron.settings.get() - const workspaceDir = settings.workspaceDir.replace(/\\/g, '/') - - // Clean up tmp folder from previous run - const tmpAbsPath = settings.workspaceDir.replace(/[\\/]+$/, '') + '/tmp' - window.electron.fs.deleteDirectory(tmpAbsPath).catch(() => {}) - - // nodeId → { filePath, text, outputType } - const nodeOutputs = new Map() - const outputNodeIds = new Set(ordered.filter((n) => n.type === 'outputNode').map((n) => n.id)) - - // Pre-populate source nodes - for (const node of ordered) { - if (node.type === 'imageNode') { - const fp = node.data.params?.filePath as string | undefined - // When the agent provides an override image, ignore any hardcoded filePath so the - // model node falls through to selectedImageData (= overrideImageData). - const resolvedPath = overrideImageData ? undefined : (fp ?? selectedImagePath ?? undefined) - nodeOutputs.set(node.id, { filePath: resolvedPath, outputType: 'image' }) + const collectImageOutputs = (ctx: RunContext): Record => { + const out: Record = {} + for (const [nodeId, o] of ctx.nodeOutputs) { + if (o.outputType === 'image' && o.filePath) { + const norm = o.filePath.replace(/\\/g, '/') + if (norm.startsWith(ctx.workspaceDir)) { + out[nodeId] = `/workspace/${norm.slice(ctx.workspaceDir.length).replace(/^\//, '')}` } - if (node.type === 'textNode') { - nodeOutputs.set(node.id, { text: node.data.params?.text as string | undefined }) + } + } + return out + } + + const finalize = (ctx: RunContext, finalWaitStates?: Record): void => { + // Prefer the mesh of the last branch the user actually ran — it's already in the + // viewer, and topo order must not override the user's last action. + let outputUrl: string | undefined = ctx.lastSceneMesh + let outputPath: string | undefined + + const lastOutputNode = outputUrl ? undefined : [...ctx.ordered].reverse().find((n) => isSceneOutput(n.type)) + if (lastOutputNode) { + for (const edge of ctx.workflow.edges.filter((e) => e.target === lastOutputNode.id)) { + const src = ctx.nodeOutputs.get(edge.source) + if (src?.filePath) { + const norm = src.filePath.replace(/\\/g, '/') + if (norm.startsWith(ctx.workspaceDir)) { + outputUrl = `/workspace/${norm.slice(ctx.workspaceDir.length).replace(/^\//, '')}` + } } - if (node.type === 'meshNode') { - const source = node.data.params?.source as 'file' | 'current' | undefined - if (source === 'current' && currentMeshUrl) { - let meshFilePath: string - if (currentMeshUrl.includes('serve-file?path=')) { - // URL like /optimize/serve-file?path=D%3A%5C... → extract and decode the real path - const encoded = currentMeshUrl.split('serve-file?path=')[1] - meshFilePath = decodeURIComponent(encoded).replace(/\\/g, '/') - } else { - // URL like /workspace/Workflows/file.glb → resolve to absolute path - const rel = currentMeshUrl.replace(/^\/workspace\//, '') - meshFilePath = `${workspaceDir}/${rel}` - } - nodeOutputs.set(node.id, { filePath: meshFilePath, outputType: 'mesh' }) + } + } + if (!outputUrl) { + for (const [, o] of ctx.nodeOutputs) { + if (o.filePath) { + const norm = o.filePath.replace(/\\/g, '/') + if (norm.startsWith(ctx.workspaceDir)) { + outputUrl = `/workspace/${norm.slice(ctx.workspaceDir.length).replace(/^\//, '')}` } else { - const fp = node.data.params?.filePath as string | undefined - if (fp) nodeOutputs.set(node.id, { filePath: fp, outputType: 'mesh' }) + outputPath = o.filePath } } } + } - for (let i = 0; i < execNodes.length; i++) { - if (_cancel.current) { set({ runState: IDLE, activeNodeId: null }); return } + set((s) => ({ + activeNodeId: null, + runningBranchId: null, + waitStates: finalWaitStates ?? s.waitStates, + nodeImageOutputs: collectImageOutputs(ctx), + runState: { + status: 'done', + blockIndex: 0, + blockTotal: 0, + blockProgress: 100, + blockStep: 'Done', + outputUrl, + outputPath, + }, + })) + useAppStore.getState().updateCurrentJob({ status: 'done', progress: 100, outputUrl }) + } - const node = execNodes[i] - const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions) + return { + runState: IDLE, + activeNodeId: null, + activeWorkflowId: null, + nodeImageOutputs: {}, + waitStates: {}, + runningBranchId: null, - // ── Resolve inputs ──────────────────────────────────────────────── - let nodeInputPath: string | undefined - let nodeInputText: string | undefined - let nodeInputMeshPath: string | undefined + async run(workflow, allExtensions, overrideImageData?) { + _cancel.current = false - const incomingEdges = workflow.edges.filter((e) => e.target === node.id) + const appState = useAppStore.getState() + const apiUrl = appState.apiUrl - if (ext?.inputs && ext.inputs.length > 1) { - // Multi-input: route each incoming edge by the source node's outputType - for (const edge of incomingEdges) { - const src = nodeOutputs.get(edge.source) - if (!src) continue - if (src.outputType === 'mesh') nodeInputMeshPath = src.filePath - else if (src.outputType === 'image') nodeInputPath = src.filePath - else if (src.filePath !== undefined) nodeInputPath = src.filePath - if (src.text !== undefined) nodeInputText = src.text - } - } else { - // Single-input - for (const edge of incomingEdges) { - const src = nodeOutputs.get(edge.source) - if (src?.filePath !== undefined) nodeInputPath = src.filePath - if (src?.text !== undefined) nodeInputText = src.text - } - } + const { preExecExtNodes, branches, waitIds, parentWait, ordered } = identifyBranches(workflow) + const branchSteps = waitIds.reduce((acc, w) => acc + (branches.get(w)?.length ?? 0), 0) + const totalSteps = preExecExtNodes.length + branchSteps - set((s) => ({ - activeNodeId: node.id, - runState: { ...s.runState, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' }, - })) - - // ── Wait node → pause until continueRun(), then passthrough ─────── - if (node.type === 'waitNode') { - set((s) => ({ runState: { ...s.runState, status: 'paused', blockStep: 'Paused — click Continue' } })) - await new Promise((resolve) => { _resume.current = resolve }) - if (_cancel.current) { set({ runState: IDLE, activeNodeId: null }); return } - - nodeOutputs.set(node.id, { - filePath: nodeInputPath, - text: nodeInputText, - outputType: incomingEdges[0] ? nodeOutputs.get(incomingEdges[0].source)?.outputType : undefined, - }) - set((s) => ({ runState: { ...s.runState, status: 'running' } })) - continue - } + const selectedImagePath = appState.selectedImagePath ?? '' + const selectedImageData = overrideImageData ?? appState.selectedImageData ?? undefined + const currentMeshUrl = appState.currentJob?.outputUrl - // ── Model extensions → HTTP API ─────────────────────────────────── - // Process extensions → IPC runProcess - const isModelNode = ext?.type === 'model' - - if (isModelNode) { - const activeImagePath = nodeInputPath ?? selectedImagePath - const base64 = selectedImageData && nodeInputPath === undefined - ? selectedImageData - : await window.electron.fs.readFileBase64(activeImagePath) - const bytes = Uint8Array.from(atob(base64), (c) => c.charCodeAt(0)) - const blob = new Blob([bytes], { type: 'image/png' }) - const fname = activeImagePath.split(/[\\/]/).pop() ?? 'image.png' - - // For multi-input nodes: inject mesh path as params.mesh_path - const extraParams: Record = {} - if (nodeInputMeshPath) { - const norm = nodeInputMeshPath.replace(/\\/g, '/') - extraParams.mesh_path = norm.startsWith(workspaceDir) - ? norm.slice(workspaceDir.length).replace(/^\//, '') - : norm - } + set({ + activeWorkflowId: workflow.id, + nodeImageOutputs: {}, + // Top-level Waits are pending; nested Waits start blocked until their parent finishes. + waitStates: Object.fromEntries(waitIds.map((id) => [id, parentWait.get(id) ? 'blocked' as WaitState : 'pending' as WaitState])), + runningBranchId: null, + runState: { + status: 'running', blockIndex: 0, blockTotal: totalSteps, + blockProgress: 0, blockStep: 'Starting…', + }, + }) - // Merge schema defaults (with per-variant paramDefaults already applied) - // under user overrides so Python receives the effective values, not an - // empty dict that falls back to hardcoded defaults in the generator. - const schemaDefaults = Object.fromEntries( - (ext.params ?? []).map((p) => [p.id, p.default]), - ) - const effectiveParams = { ...schemaDefaults, ...(node.data.params ?? {}) } - - const fd = new FormData() - fd.append('image', blob, fname) - fd.append('model_id', node.data.extensionId ?? '') - fd.append('collection', 'Workflows') - fd.append('remesh', 'none') - fd.append('enable_texture', 'false') - fd.append('texture_resolution', '1024') - fd.append('params', JSON.stringify({ ...effectiveParams, ...extraParams })) - - set((s) => ({ runState: { ...s.runState, blockProgress: 5, blockStep: 'Submitting to model…' } })) - - const { data } = await client.post<{ job_id: string }>( - '/generate/from-image', fd, - { headers: { 'Content-Type': 'multipart/form-data' } }, - ) - _activeJobId.current = data.job_id - - while (true) { - if (_cancel.current) { - await client.post(`/generate/cancel/${_activeJobId.current}`).catch(() => {}) - _activeJobId.current = null - set({ runState: IDLE, activeNodeId: null }) - return - } - await new Promise((r) => setTimeout(r, 1200)) - - const { data: st } = await client.get<{ - status: string; progress?: number; step?: string; output_url?: string; error?: string - }>(`/generate/status/${_activeJobId.current}`) - - if (st.status === 'done' && st.output_url) { - const rel = st.output_url.replace(/^\/workspace\//, '') - nodeInputPath = `${workspaceDir}/${rel}` - _activeJobId.current = null - set((s) => ({ runState: { ...s.runState, blockProgress: 100, blockStep: 'Generation complete' } })) - break - } - if (st.status === 'error') throw new Error(st.error ?? 'Generation failed') - - const total = execNodes.length - const overall = total > 0 - ? Math.round((i / total) * 100 + (st.progress ?? 0) / total) - : st.progress ?? 0 - set((s) => ({ - runState: { ...s.runState, blockProgress: st.progress ?? s.runState.blockProgress, blockStep: st.step ?? 'Generating…' }, - })) - useAppStore.getState().updateCurrentJob({ status: 'generating', progress: overall, step: st.step }) - } + appState.setCurrentJob({ + id: crypto.randomUUID(), + imageFile: selectedImagePath, + status: 'generating', + progress: 0, + createdAt: Date.now(), + }) - } else { - // ── Process extension → IPC ───────────────────────────────────── - if (ext?.input === 'mesh' && !nodeInputPath) { - throw new Error(`${ext.name} needs an incoming mesh connection`) + try { + const client = axios.create({ baseURL: apiUrl }) + const settings = await window.electron.settings.get() + const workspaceDir = settings.workspaceDir.replace(/\\/g, '/') + + const tmpAbsPath = settings.workspaceDir.replace(/[\\/]+$/, '') + '/tmp' + window.electron.fs.deleteDirectory(tmpAbsPath).catch(() => {}) + + const nodeOutputs = new Map() + const nodeMap = new Map(workflow.nodes.map((n) => [n.id, n])) + + // Pre-populate source nodes + for (const node of ordered) { + if (node.type === 'imageNode') { + const fp = node.data.params?.filePath as string | undefined + const resolvedPath = overrideImageData ? undefined : (fp ?? selectedImagePath ?? undefined) + nodeOutputs.set(node.id, { filePath: resolvedPath, outputType: 'image' }) } - if (ext?.input === 'image' && !nodeInputPath) { - throw new Error(`${ext.name} needs an incoming image connection`) + if (node.type === 'textNode') { + nodeOutputs.set(node.id, { text: node.data.params?.text as string | undefined }) } - if (ext?.input === 'text' && !nodeInputText) { - throw new Error(`${ext.name} needs an incoming text connection`) + if (node.type === 'meshNode') { + const source = node.data.params?.source as 'file' | 'current' | undefined + if (source === 'current' && currentMeshUrl) { + let meshFilePath: string + if (currentMeshUrl.includes('serve-file?path=')) { + const encoded = currentMeshUrl.split('serve-file?path=')[1] + meshFilePath = decodeURIComponent(encoded).replace(/\\/g, '/') + } else { + const rel = currentMeshUrl.replace(/^\/workspace\//, '') + meshFilePath = `${workspaceDir}/${rel}` + } + nodeOutputs.set(node.id, { filePath: meshFilePath, outputType: 'mesh' }) + } else { + const fp = node.data.params?.filePath as string | undefined + if (fp) nodeOutputs.set(node.id, { filePath: fp, outputType: 'mesh' }) + } } - const parts = (node.data.extensionId ?? '').split('/') - const extId = parts[0] - const nodeId = parts[1] ?? '' - const result = await window.electron.extensions.runProcess( - extId, - { filePath: nodeInputPath, text: nodeInputText, nodeId }, - node.data.params as Record, - ) - if (!result.success) throw new Error(result.error ?? 'Process extension failed') - nodeInputPath = result.result?.filePath ?? nodeInputPath - nodeInputText = result.result?.text ?? nodeInputText - set((s) => ({ runState: { ...s.runState, blockProgress: 100, blockStep: 'Done' } })) } - // Store output with type for downstream routing - const outputType = ext?.output ?? (nodeInputPath ? 'mesh' : undefined) - nodeOutputs.set(node.id, { filePath: nodeInputPath, text: nodeInputText, outputType }) - - // If this node feeds an Add-to-Scene, push the mesh to currentJob - // immediately so the 3D viewer loads it without waiting for the rest of the run. - const norm = nodeInputPath?.replace(/\\/g, '/') - if ( - norm?.startsWith(workspaceDir) && - workflow.edges.some((e) => e.source === node.id && outputNodeIds.has(e.target)) - ) { - useAppStore.getState().updateCurrentJob({ - status: 'done', - progress: 100, - outputUrl: `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}`, - }) + const ctx: RunContext = { + workflow, allExtensions, client, workspaceDir, selectedImagePath, selectedImageData, + overrideImageData, nodeOutputs, nodeMap, ordered, branches, waitIds, parentWait, + } + _ctx.current = ctx + + // Pre-phase: nodes that don't belong to any single branch (sources + merges). + for (let i = 0; i < preExecExtNodes.length; i++) { + if (_cancel.current) { _ctx.current = null; set({ runState: IDLE, activeNodeId: null }); return } + const node = preExecExtNodes[i] + set((s) => ({ + activeNodeId: node.id, + runState: { ...s.runState, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' }, + })) + await executeExtensionNode(node, ctx, setRunState) } - } - // ── Collect image outputs for preview nodes ─────────────────────── - const imageOutputs: Record = {} - for (const [nodeId, out] of nodeOutputs) { - if (out.outputType === 'image' && out.filePath) { - const norm = out.filePath.replace(/\\/g, '/') - if (norm.startsWith(workspaceDir)) { - imageOutputs[nodeId] = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}` - } + if (waitIds.length > 0) { + // Hand off to the user — branches run on demand via continueRun(id). + set((s) => ({ + activeNodeId: null, + runState: { ...s.runState, status: 'paused', blockStep: 'Pick a branch and click Continue' }, + })) + return + } + + finalize(ctx) + } catch (err) { + if (!_cancel.current) { + set((s) => ({ runState: { ...s.runState, status: 'error', error: String(err) }, activeNodeId: null })) + useAppStore.getState().updateCurrentJob({ status: 'error', error: String(err) }) } } + }, + + async continueRun(waitId) { + const state = get() + if (state.runningBranchId !== null) return + // Only runnable Waits: blocked (parent not done) and running are not. + const ws = state.waitStates[waitId] + if (ws !== 'pending' && ws !== 'done' && ws !== 'error') return + const ctx = _ctx.current + if (!ctx) return + + const branch = ctx.branches.get(waitId) ?? [] + // Re-running a Wait invalidates everything downstream: descendant branches + // were computed against the old output, so drop their outputs and reset + // them to blocked until this branch produces a fresh result. + const descendants = descendantWaits(waitId, ctx) + + _cancel.current = false + + // Reset outputs for this branch's nodes so Retry re-executes cleanly. + for (const node of branch) ctx.nodeOutputs.delete(node.id) + for (const d of descendants) for (const node of ctx.branches.get(d) ?? []) ctx.nodeOutputs.delete(node.id) + + set((s) => { + const waitStates = { ...s.waitStates, [waitId]: 'running' as WaitState } + for (const d of descendants) waitStates[d] = 'blocked' + return { + runningBranchId: waitId, + waitStates, + runState: { ...s.runState, status: 'running', blockIndex: 0, blockTotal: branch.length, blockProgress: 0, blockStep: branch.length === 0 ? 'Done' : 'Starting…' }, + } + }) - // ── Resolve final output URL ────────────────────────────────────── - let outputUrl: string | undefined - let outputPath: string | undefined - - // Use the last AddToScene in topo order — its predecessor is the final scene mesh. - const outputNodeDef = [...ordered].reverse().find((n) => n.type === 'outputNode') - if (outputNodeDef) { - for (const edge of workflow.edges.filter((e) => e.target === outputNodeDef.id)) { - const src = nodeOutputs.get(edge.source) - if (src?.filePath) { - const norm = src.filePath.replace(/\\/g, '/') - if (norm.startsWith(workspaceDir)) { - outputUrl = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}` - } + const finishBranch = (next: WaitState, err?: string): void => { + if (_cancel.current) return + const newWaitStates = { ...get().waitStates, [waitId]: next } + // Unblock nested Waits whose parent branch just finished. + if (next === 'done') { + for (const w of ctx.waitIds) { + if (ctx.parentWait.get(w) === waitId && newWaitStates[w] === 'blocked') newWaitStates[w] = 'pending' } } - } - if (!outputUrl) { - for (const node of execNodes) { - const out = nodeOutputs.get(node.id) - if (out?.filePath) { - const norm = out.filePath.replace(/\\/g, '/') - if (norm.startsWith(workspaceDir)) { - outputUrl = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}` - } else { - outputPath = out.filePath - } + // A failed branch can never feed its descendants — surface them as error + // too, otherwise they stay 'blocked' and the run hangs on 'paused' forever. + if (next === 'error') { + for (const d of descendantWaits(waitId, ctx)) { + if (newWaitStates[d] === 'blocked') newWaitStates[d] = 'error' } } - } + const allFinished = ctx.waitIds.every((id) => newWaitStates[id] === 'done' || newWaitStates[id] === 'error') + const anyError = ctx.waitIds.some((id) => newWaitStates[id] === 'error') - set({ - activeNodeId: null, - nodeImageOutputs: imageOutputs, - runState: { - status: 'done', - blockIndex: execNodes.length > 0 ? execNodes.length - 1 : 0, - blockTotal: execNodes.length, - blockProgress: 100, - blockStep: 'Done', - outputUrl, - outputPath, - }, - }) - useAppStore.getState().updateCurrentJob({ status: 'done', progress: 100, outputUrl }) + if (allFinished && !anyError) { + finalize(ctx, newWaitStates) + } else { + set((s) => ({ + activeNodeId: null, + runningBranchId: null, + waitStates: newWaitStates, + runState: { + ...s.runState, + status: allFinished ? 'error' : 'paused', + error: err ?? s.runState.error, + blockStep: err ? `Branch failed: ${err}` : 'Pick a branch and click Continue', + }, + })) + } + } - } catch (err) { - if (!_cancel.current) { - set((s) => ({ runState: { ...s.runState, status: 'error', error: String(err) }, activeNodeId: null })) - useAppStore.getState().updateCurrentJob({ status: 'error', error: String(err) }) + try { + for (let i = 0; i < branch.length; i++) { + if (_cancel.current) return + const node = branch[i] + set((s) => ({ + activeNodeId: node.id, + runState: { ...s.runState, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' }, + })) + await executeExtensionNode(node, ctx, setRunState) + } + finishBranch('done') + } catch (err) { + finishBranch('error', String(err)) } - } - }, - - cancel() { - _cancel.current = true - flushResume() - if (_activeJobId.current) { - const apiUrl = useAppStore.getState().apiUrl - axios.create({ baseURL: apiUrl }).post(`/generate/cancel/${_activeJobId.current}`).catch(() => {}) - _activeJobId.current = null - } - set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {} }) - // Clear the generation HUD so it doesn't show stale progress after cancel. - // The backend's subprocess hard-kill is asynchronous; the UI shouldn't wait. - useAppStore.getState().setCurrentJob(null) - }, - - reset() { - set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {} }) - }, - - continueRun() { - flushResume() - }, -})) + }, + + cancel() { + _cancel.current = true + if (_activeJobId.current) { + const apiUrl = useAppStore.getState().apiUrl + axios.create({ baseURL: apiUrl }).post(`/generate/cancel/${_activeJobId.current}`).catch(() => {}) + _activeJobId.current = null + } + _ctx.current = null + set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {}, waitStates: {}, runningBranchId: null }) + useAppStore.getState().setCurrentJob(null) + }, + + reset() { + _ctx.current = null + set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {}, waitStates: {}, runningBranchId: null }) + }, + } +})