diff --git a/src/areas/generate/components/WorkflowPanel.tsx b/src/areas/generate/components/WorkflowPanel.tsx
index e2d920b..5d6c14f 100644
--- a/src/areas/generate/components/WorkflowPanel.tsx
+++ b/src/areas/generate/components/WorkflowPanel.tsx
@@ -10,6 +10,7 @@ import { useAppStore } from '@shared/stores/appStore'
import { useExtensionsStore } from '@shared/stores/extensionsStore'
import { useNavStore } from '@shared/stores/navStore'
import { useWorkflowRunStore } from '@areas/workflows/workflowRunStore'
+import { useWaitButton } from '@areas/workflows/useWaitButton'
import { buildAllWorkflowExtensions, getWorkflowExtension } from '@areas/workflows/mockExtensions'
import { validateWorkflowPreflight } from '@areas/workflows/preflight'
import type { WorkflowExtension } from '@areas/workflows/mockExtensions'
@@ -335,10 +336,7 @@ function TextParamRow({ nodeId, nodes, onPatch }: { nodeId: string; nodes: FlowN
}
function WaitParamRow({ nodeId }: { nodeId: string }) {
- const status = useWorkflowRunStore((s) => s.runState.status)
- const activeNodeId = useWorkflowRunStore((s) => s.activeNodeId)
- const continueRun = useWorkflowRunStore((s) => s.continueRun)
- const isPaused = status === 'paused' && activeNodeId === nodeId
+ const { waitState, canContinue, isRunning, label, buttonClass, onContinue } = useWaitButton(nodeId)
return (
@@ -348,15 +346,29 @@ function WaitParamRow({ nodeId }: { nodeId: string }) {
Wait
- {isPaused ? (
+ {waitState ? (
) : (
diff --git a/src/areas/workflows/nodeBehaviors.ts b/src/areas/workflows/nodeBehaviors.ts
new file mode 100644
index 0000000..ad3e746
--- /dev/null
+++ b/src/areas/workflows/nodeBehaviors.ts
@@ -0,0 +1,101 @@
+import type { WFNode, WFEdge } from '@shared/types/electron.d'
+
+// ─── Node behaviors registry ──────────────────────────────────────────────────
+//
+// Add a new entry here when you introduce a node type that needs to participate
+// in the runner's control-flow logic. The predicates and helpers below derive
+// everything they need from this table — no hardcoded type checks anywhere else.
+//
+// • passthrough — data flows through this node unchanged. Resolvers (inputs,
+// preflight typing) walk past it to find the real source.
+// • branchStarter — splits the run into a user-driven sub-DAG. The runner
+// pauses on these nodes and exposes a Continue/Retry button.
+// • sceneOutput — terminal sink that gets pushed to the 3D viewer. Used by
+// the immediate mesh-push logic during execution.
+
+export interface NodeBehavior {
+ passthrough?: boolean
+ branchStarter?: boolean
+ sceneOutput?: boolean
+}
+
+const BEHAVIORS: Record = {
+ waitNode: { passthrough: true, branchStarter: true },
+ outputNode: { sceneOutput: true },
+}
+
+export const isPassthrough = (type: string | undefined): boolean => !!type && !!BEHAVIORS[type]?.passthrough
+export const isBranchStarter = (type: string | undefined): boolean => !!type && !!BEHAVIORS[type]?.branchStarter
+export const isSceneOutput = (type: string | undefined): boolean => !!type && !!BEHAVIORS[type]?.sceneOutput
+
+// ─── Helpers ──────────────────────────────────────────────────────────────────
+
+/**
+ * Walks `sourceId` backwards through passthrough nodes (following each one's
+ * incoming edge) until it hits a non-passthrough node. Returns its id, or
+ * undefined if a passthrough has no incoming edge.
+ */
+export function resolveDataSource(
+ sourceId: string,
+ edges: WFEdge[],
+ nodeMap: Map,
+): string | undefined {
+ let cur = sourceId
+ const seen = new Set()
+ while (isPassthrough(nodeMap.get(cur)?.type) && !seen.has(cur)) {
+ seen.add(cur)
+ const parent = edges.find((e) => e.target === cur)
+ if (!parent) return undefined
+ cur = parent.source
+ }
+ return cur
+}
+
+/**
+ * Walks backwards from `nodeId` and returns the set of nearest upstream
+ * branch-starter (Wait) nodes — the first Wait found on each incoming path,
+ * without traversing past it. Empty = no upstream Wait. Size > 1 = the node
+ * merges two distinct branches.
+ */
+export function nearestUpstreamWaits(
+ nodeId: string,
+ edges: WFEdge[],
+ nodeMap: Map,
+): Set {
+ const result = new Set()
+ const seen = new Set()
+ const stack = edges.filter((e) => e.target === nodeId).map((e) => e.source)
+ while (stack.length > 0) {
+ const id = stack.pop()!
+ if (seen.has(id)) continue
+ seen.add(id)
+ if (isBranchStarter(nodeMap.get(id)?.type)) { result.add(id); continue }
+ for (const e of edges) if (e.target === id) stack.push(e.source)
+ }
+ return result
+}
+
+/**
+ * True if any forward path from `sourceId` reaches a sceneOutput node, walking
+ * through passthrough nodes along the way.
+ */
+export function reachesSceneOutput(
+ sourceId: string,
+ edges: WFEdge[],
+ nodeMap: Map,
+): boolean {
+ const stack = [sourceId]
+ const seen = new Set()
+ while (stack.length > 0) {
+ const id = stack.pop()!
+ if (seen.has(id)) continue
+ seen.add(id)
+ for (const e of edges) {
+ if (e.source !== id) continue
+ const tType = nodeMap.get(e.target)?.type
+ if (isSceneOutput(tType)) return true
+ if (isPassthrough(tType)) stack.push(e.target)
+ }
+ }
+ return false
+}
diff --git a/src/areas/workflows/nodes/WaitNode.tsx b/src/areas/workflows/nodes/WaitNode.tsx
index 8e30302..3f3dff7 100644
--- a/src/areas/workflows/nodes/WaitNode.tsx
+++ b/src/areas/workflows/nodes/WaitNode.tsx
@@ -1,15 +1,38 @@
import { Handle, Position } from '@xyflow/react'
import type { WFNodeData } from '@shared/types/electron.d'
-import { useWorkflowRunStore } from '../workflowRunStore'
+import { useWaitButton } from '../useWaitButton'
import BaseNode from './BaseNode'
const HANDLE_STYLE = { background: '#71717a', width: 14, height: 14, border: '2.5px solid #18181b' }
export default function WaitNode({ id, data, selected }: { id: string; data: WFNodeData; selected?: boolean }) {
- const status = useWorkflowRunStore((s) => s.runState.status)
- const activeNodeId = useWorkflowRunStore((s) => s.activeNodeId)
- const continueRun = useWorkflowRunStore((s) => s.continueRun)
- const isPaused = status === 'paused' && activeNodeId === id
+ const { waitState, canContinue, isRunning, label, buttonClass, statusText, onContinue } = useWaitButton(id)
+
+ const subheader = waitState ? (
+
+ ) : undefined
return (
}
- subheader={isPaused ? (
-
- ) : undefined}
+ subheader={subheader}
handles={
<>
@@ -43,9 +56,7 @@ export default function WaitNode({ id, data, selected }: { id: string; data: WFN
}
>
-
- {isPaused ? 'Workflow paused — click Continue to resume.' : 'Pauses the workflow until you click Continue.'}
-
+
{statusText}
)
diff --git a/src/areas/workflows/preflight.ts b/src/areas/workflows/preflight.ts
index 9ee9813..755ce5d 100644
--- a/src/areas/workflows/preflight.ts
+++ b/src/areas/workflows/preflight.ts
@@ -1,5 +1,6 @@
import type { Workflow, WFNode } from '@shared/types/electron.d'
import { getWorkflowExtension, type WorkflowExtension } from './mockExtensions'
+import { isPassthrough, resolveDataSource, nearestUpstreamWaits } from './nodeBehaviors'
type DataType = 'image' | 'text' | 'mesh'
@@ -55,11 +56,17 @@ export function validateWorkflowPreflight(
): WorkflowPreflightIssue[] {
const issues: WorkflowPreflightIssue[] = []
const nodeMap = new Map(workflow.nodes.map((node) => [node.id, node]))
- const outputTypes = new Map()
+ const outputTypes = new Map()
for (const node of workflow.nodes) {
outputTypes.set(node.id, getNodeOutputType(node, allExtensions))
}
+ // Passthrough nodes inherit their resolved upstream source's type.
+ for (const node of workflow.nodes) {
+ if (!isPassthrough(node.type)) continue
+ const realSourceId = resolveDataSource(node.id, workflow.edges, nodeMap)
+ if (realSourceId && realSourceId !== node.id) outputTypes.set(node.id, outputTypes.get(realSourceId))
+ }
for (const node of workflow.nodes) {
if (node.type === 'meshNode' && node.data.params?.source === 'current' && !options?.currentMeshUrl) {
@@ -70,6 +77,19 @@ export function validateWorkflowPreflight(
})
}
+ // A node fed by two different Wait branches can't be scheduled into a single
+ // branch — it would run before either branch produces its mesh.
+ if (
+ (node.type === 'extensionNode' || node.type === 'outputNode') &&
+ nearestUpstreamWaits(node.id, workflow.edges, nodeMap).size > 1
+ ) {
+ pushIssue(issues, {
+ key: `${node.id}:wait-merge`,
+ nodeId: node.id,
+ message: `${nodeLabel(node, allExtensions)} merges two Wait branches, which isn't supported. Route it through a single Wait.`,
+ })
+ }
+
if (node.type !== 'extensionNode') continue
const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions)
diff --git a/src/areas/workflows/useWaitButton.ts b/src/areas/workflows/useWaitButton.ts
new file mode 100644
index 0000000..ac94df1
--- /dev/null
+++ b/src/areas/workflows/useWaitButton.ts
@@ -0,0 +1,47 @@
+import { useWorkflowRunStore } from './workflowRunStore'
+import type { WaitState } from './workflowRunStore'
+
+export interface WaitButtonModel {
+ waitState: WaitState | undefined
+ canContinue: boolean
+ isRunning: boolean
+ label: 'Retry' | 'Continue'
+ buttonClass: string
+ statusText: string
+ onContinue: () => void
+}
+
+// Shared derivation for the Wait Continue/Retry control, used by both the
+// canvas node (WaitNode) and the params panel (WaitParamRow). Keeps the two
+// renderings in sync — they differ only in markup/sizing, not in logic.
+export function useWaitButton(nodeId: string): WaitButtonModel {
+ const waitState = useWorkflowRunStore((s) => s.waitStates[nodeId])
+ const runningBranchId = useWorkflowRunStore((s) => s.runningBranchId)
+ const status = useWorkflowRunStore((s) => s.runState.status)
+ const continueRun = useWorkflowRunStore((s) => s.continueRun)
+
+ const otherBranchRunning = runningBranchId !== null && runningBranchId !== nodeId
+ // Pre-phase: shared nodes (e.g. Generate Mesh) still running before any branch hands off.
+ const inPrePhase = status === 'running' && runningBranchId === null
+ const isRunning = waitState === 'running'
+ const canContinue = (waitState === 'pending' || waitState === 'done' || waitState === 'error') && !otherBranchRunning && !inPrePhase
+ const label: 'Retry' | 'Continue' = waitState === 'done' || waitState === 'error' ? 'Retry' : 'Continue'
+
+ const buttonClass = waitState === 'error'
+ ? 'bg-red-500/15 border-red-500/30 text-red-400 hover:bg-red-500/25'
+ : waitState === 'done'
+ ? 'bg-emerald-500/15 border-emerald-500/30 text-emerald-400 hover:bg-emerald-500/25'
+ : 'bg-amber-500/15 border-amber-500/30 text-amber-400 hover:bg-amber-500/25'
+
+ const statusText =
+ waitState === 'blocked' ? 'Waiting for the previous Wait to finish…' :
+ waitState === 'running' ? 'Branch in progress…' :
+ waitState === 'done' ? 'Branch finished — Retry to re-run.' :
+ waitState === 'error' ? 'Branch failed — Retry to re-run.' :
+ waitState === 'pending' && inPrePhase ? 'Waiting for upstream nodes…' :
+ waitState === 'pending' && otherBranchRunning ? 'Another branch is running…' :
+ waitState === 'pending' ? 'Workflow paused — click Continue to run this branch.' :
+ 'Pauses the workflow until you click Continue.'
+
+ return { waitState, canContinue, isRunning, label, buttonClass, statusText, onContinue: () => continueRun(nodeId) }
+}
diff --git a/src/areas/workflows/useWorkflowRunner.ts b/src/areas/workflows/useWorkflowRunner.ts
deleted file mode 100644
index 091ea16..0000000
--- a/src/areas/workflows/useWorkflowRunner.ts
+++ /dev/null
@@ -1,321 +0,0 @@
-import { useState, useCallback, useRef } from 'react'
-import axios from 'axios'
-import { useAppStore } from '@shared/stores/appStore'
-import type { Workflow, WFNode, WFEdge } from '@shared/types/electron.d'
-import { getWorkflowExtension } from './mockExtensions'
-import type { WorkflowExtension } from './mockExtensions'
-
-// ─── Types ────────────────────────────────────────────────────────────────────
-
-export interface WorkflowRunState {
- status: 'idle' | 'running' | 'done' | 'error'
- blockIndex: number
- blockTotal: number
- blockProgress: number
- blockStep: string
- outputUrl?: string
- outputPath?: string
- error?: string
-}
-
-const IDLE: WorkflowRunState = {
- status: 'idle', blockIndex: 0, blockTotal: 0, blockProgress: 0, blockStep: '',
-}
-
-// ─── Topological sort (Kahn's algorithm) ─────────────────────────────────────
-
-function topoSort(nodes: WFNode[], edges: WFEdge[]): WFNode[] {
- const nodeMap = new Map(nodes.map((n) => [n.id, n]))
- const inDegree = new Map(nodes.map((n) => [n.id, 0]))
- const adj = new Map(nodes.map((n) => [n.id, [] as string[]]))
-
- for (const e of edges) {
- if (!nodeMap.has(e.source) || !nodeMap.has(e.target)) continue
- adj.get(e.source)!.push(e.target)
- inDegree.set(e.target, (inDegree.get(e.target) ?? 0) + 1)
- }
-
- const queue = nodes.filter((n) => (inDegree.get(n.id) ?? 0) === 0)
- const result: WFNode[] = []
-
- while (queue.length > 0) {
- const node = queue.shift()!
- result.push(node)
- for (const neighbor of adj.get(node.id) ?? []) {
- const deg = (inDegree.get(neighbor) ?? 0) - 1
- inDegree.set(neighbor, deg)
- if (deg === 0) queue.push(nodeMap.get(neighbor)!)
- }
- }
-
- return result
-}
-
-// ─── Hook ─────────────────────────────────────────────────────────────────────
-
-export function useWorkflowRunner(allExtensions: WorkflowExtension[]) {
- const apiUrl = useAppStore((s) => s.apiUrl)
- const [runState, setRunState] = useState(IDLE)
- const cancelRef = useRef(false)
- const activeJobId = useRef(null)
-
- const run = useCallback(async (
- workflow: Workflow,
- imagePath: string,
- imageData?: string,
- currentMeshUrl?: string, // outputUrl of the mesh currently in the scene (before job is replaced)
- ) => {
- cancelRef.current = false
-
- const ordered = topoSort(workflow.nodes, workflow.edges)
- const execNodes = ordered.filter((n) => n.type === 'extensionNode' && n.data.enabled)
-
- setRunState({
- status: 'running', blockIndex: 0, blockTotal: execNodes.length,
- blockProgress: 0, blockStep: 'Starting…',
- })
-
- try {
- const client = axios.create({ baseURL: apiUrl })
- const settings = await window.electron.settings.get()
- const workspaceDir = settings.workspaceDir.replace(/\\/g, '/')
-
- // Track outputs per node so branches each get the correct predecessor output.
- // outputType distinguishes image files from mesh files for multi-input routing.
- const nodeOutputs = new Map()
-
- // Pre-populate source nodes
- for (const node of ordered) {
- if (node.type === 'imageNode') nodeOutputs.set(node.id, { filePath: imagePath, outputType: 'image' })
- if (node.type === 'textNode') nodeOutputs.set(node.id, { text: node.data.params?.text as string | undefined })
- if (node.type === 'meshNode') {
- const source = node.data.params?.source as 'file' | 'current' | undefined
- if (source === 'current') {
- if (currentMeshUrl) {
- let meshFilePath: string
- if (currentMeshUrl.includes('serve-file?path=')) {
- const encoded = currentMeshUrl.split('serve-file?path=')[1]
- meshFilePath = decodeURIComponent(encoded).replace(/\\/g, '/')
- } else {
- const rel = currentMeshUrl.replace(/^\/workspace\//, '')
- meshFilePath = `${workspaceDir}/${rel}`
- }
- nodeOutputs.set(node.id, { filePath: meshFilePath, outputType: 'mesh' })
- }
- } else {
- const fp = node.data.params?.filePath as string | undefined
- if (fp) nodeOutputs.set(node.id, { filePath: fp, outputType: 'mesh' })
- }
- }
- }
-
- for (let i = 0; i < execNodes.length; i++) {
- if (cancelRef.current) { setRunState(IDLE); return }
-
- const node = execNodes[i]
- const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions)
-
- // Resolve this node's inputs from its actual predecessors in the graph.
- // For multi-input nodes, route by outputType (image vs mesh).
- let nodeInputPath: string | undefined
- let nodeInputText: string | undefined
- let nodeInputMeshPath: string | undefined // for multi-input: the mesh wire
-
- const incomingEdges = workflow.edges.filter((e) => e.target === node.id)
- if (ext?.inputs && ext.inputs.length > 1) {
- // Multi-input: route each edge by the source node's outputType
- for (const edge of incomingEdges) {
- const src = nodeOutputs.get(edge.source)
- if (!src) continue
- if (src.outputType === 'mesh') nodeInputMeshPath = src.filePath
- else if (src.outputType === 'image') nodeInputPath = src.filePath
- else if (src.filePath !== undefined) nodeInputPath = src.filePath
- if (src.text !== undefined) nodeInputText = src.text
- }
- } else {
- // Single-input: original behaviour
- for (const edge of incomingEdges) {
- const src = nodeOutputs.get(edge.source)
- if (src?.filePath !== undefined) nodeInputPath = src.filePath
- if (src?.text !== undefined) nodeInputText = src.text
- }
- }
-
- setRunState((s) => ({ ...s, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' }))
-
- // Model extensions always go through the HTTP API (job queue, progress, GPU).
- // Process extensions always go through IPC runProcess (CPU, synchronous).
- const isGeneratorNode = ext?.type === 'model'
-
- if (isGeneratorNode) {
- // ── Generator: call Python FastAPI ──────────────────────────────────
- // For multi-input nodes, use the resolved image path (not the global imagePath)
- const activeImagePath = nodeInputPath ?? imagePath
- const base64 = imageData && nodeInputPath === undefined
- ? imageData
- : await window.electron.fs.readFileBase64(activeImagePath)
- const bytes = Uint8Array.from(atob(base64), (c) => c.charCodeAt(0))
- const blob = new Blob([bytes], { type: 'image/png' })
- const fname = activeImagePath.split(/[\\/]/).pop() ?? 'image.png'
-
- // For multi-input nodes: inject the mesh input as params.mesh_path
- const extraParams: Record = {}
- if (nodeInputMeshPath) {
- const norm = nodeInputMeshPath.replace(/\\/g, '/')
- extraParams.mesh_path = norm.startsWith(workspaceDir)
- ? norm.slice(workspaceDir.length).replace(/^\//, '')
- : norm
- }
-
- // Merge schema defaults (with per-variant paramDefaults already applied)
- // under user overrides so Python receives the effective values, not an
- // empty dict that falls back to hardcoded defaults in the generator.
- const schemaDefaults = Object.fromEntries(
- (ext.params ?? []).map((p) => [p.id, p.default]),
- )
- const effectiveParams = { ...schemaDefaults, ...(node.data.params ?? {}) }
-
- const fd = new FormData()
- fd.append('image', blob, fname)
- fd.append('model_id', node.data.extensionId ?? '')
- fd.append('collection', 'Workflows')
- fd.append('remesh', 'none')
- fd.append('enable_texture', 'false')
- fd.append('texture_resolution', '1024')
- fd.append('params', JSON.stringify({ ...effectiveParams, ...extraParams }))
-
- setRunState((s) => ({ ...s, blockProgress: 5, blockStep: 'Submitting to model…' }))
-
- const { data } = await client.post<{ job_id: string }>(
- '/generate/from-image', fd,
- { headers: { 'Content-Type': 'multipart/form-data' } },
- )
- const jobId = data.job_id
- activeJobId.current = jobId
-
- while (true) {
- if (cancelRef.current) {
- await client.post(`/generate/cancel/${jobId}`).catch(() => {})
- activeJobId.current = null
- setRunState(IDLE)
- return
- }
- await new Promise((r) => setTimeout(r, 1200))
-
- const { data: st } = await client.get<{
- status: string; progress?: number; step?: string
- output_url?: string; error?: string
- }>(`/generate/status/${jobId}`)
-
- if (st.status === 'done' && st.output_url) {
- const rel = st.output_url.replace(/^\/workspace\//, '')
- nodeInputPath = `${workspaceDir}/${rel}`
- activeJobId.current = null
- setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Generation complete' }))
- break
- }
- if (st.status === 'error') throw new Error(st.error ?? 'Generation failed')
-
- setRunState((s) => ({
- ...s,
- blockProgress: st.progress ?? s.blockProgress,
- blockStep: st.step ?? 'Generating…',
- }))
- }
-
- } else {
- // ── Process extension ────────────────────────────────────────────────
- if (ext?.input === 'mesh' && !nodeInputPath) {
- throw new Error(`${ext.name} needs an incoming mesh connection`)
- }
- if (ext?.input === 'image' && !nodeInputPath) {
- throw new Error(`${ext.name} needs an incoming image connection`)
- }
- if (ext?.input === 'text' && !nodeInputText) {
- throw new Error(`${ext.name} needs an incoming text connection`)
- }
- const parts = (node.data.extensionId ?? '').split('/')
- const extId = parts[0]
- const nodeId = parts[1] ?? ''
- const result = await window.electron.extensions.runProcess(
- extId,
- { filePath: nodeInputPath, text: nodeInputText, nodeId },
- node.data.params as Record,
- )
- if (!result.success) throw new Error(result.error ?? 'Process extension failed')
- nodeInputPath = result.result?.filePath ?? nodeInputPath
- nodeInputText = result.result?.text ?? nodeInputText
- setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Done' }))
- }
-
- // Store this node's output so downstream nodes (including other branches) can read it.
- // Tag with outputType so multi-input nodes can route by type.
- const outputType = ext?.output ?? (nodeInputPath ? 'mesh' : undefined)
- nodeOutputs.set(node.id, { filePath: nodeInputPath, text: nodeInputText, outputType })
- }
-
- // Determine outputUrl: prefer what feeds the outputNode (Add to Scene)
- let outputUrl: string | undefined
- let outputPath: string | undefined
-
- const outputNodeDef = ordered.find((n) => n.type === 'outputNode')
- if (outputNodeDef) {
- for (const edge of workflow.edges.filter((e) => e.target === outputNodeDef.id)) {
- const src = nodeOutputs.get(edge.source)
- if (src?.filePath) {
- const norm = src.filePath.replace(/\\/g, '/')
- if (norm.startsWith(workspaceDir)) {
- const rel = norm.slice(workspaceDir.length).replace(/^\//, '')
- outputUrl = `/workspace/${rel}`
- }
- }
- }
- }
-
- // Fallback: scan all nodes for a workspace file (last one wins)
- if (!outputUrl) {
- for (const node of execNodes) {
- const out = nodeOutputs.get(node.id)
- if (out?.filePath) {
- const norm = out.filePath.replace(/\\/g, '/')
- if (norm.startsWith(workspaceDir)) {
- const rel = norm.slice(workspaceDir.length).replace(/^\//, '')
- outputUrl = `/workspace/${rel}`
- } else {
- outputPath = out.filePath
- }
- }
- }
- }
-
- setRunState({
- status: 'done',
- blockIndex: execNodes.length - 1,
- blockTotal: execNodes.length,
- blockProgress: 100,
- blockStep: 'Done',
- outputUrl,
- outputPath,
- })
-
- } catch (err) {
- if (!cancelRef.current) {
- setRunState((s) => ({ ...s, status: 'error', error: String(err) }))
- }
- }
- }, [apiUrl, allExtensions])
-
- const cancel = useCallback(() => {
- cancelRef.current = true
- if (activeJobId.current) {
- const client = axios.create({ baseURL: apiUrl })
- client.post(`/generate/cancel/${activeJobId.current}`).catch(() => {})
- activeJobId.current = null
- }
- setRunState(IDLE)
- }, [apiUrl])
-
- const reset = useCallback(() => setRunState(IDLE), [])
-
- return { runState, run, cancel, reset }
-}
diff --git a/src/areas/workflows/workflowRunStore.ts b/src/areas/workflows/workflowRunStore.ts
index d58ac48..f996b34 100644
--- a/src/areas/workflows/workflowRunStore.ts
+++ b/src/areas/workflows/workflowRunStore.ts
@@ -1,9 +1,10 @@
import { create } from 'zustand'
-import axios from 'axios'
+import axios, { AxiosInstance } from 'axios'
import { useAppStore } from '@shared/stores/appStore'
import { getWorkflowExtension } from './mockExtensions'
import type { WorkflowExtension } from './mockExtensions'
import type { Workflow, WFNode, WFEdge } from '@shared/types/electron.d'
+import { isBranchStarter, isSceneOutput, resolveDataSource, reachesSceneOutput, nearestUpstreamWaits } from './nodeBehaviors'
// ─── Types ────────────────────────────────────────────────────────────────────
@@ -18,401 +19,570 @@ export interface WorkflowRunState {
error?: string
}
+export type WaitState = 'blocked' | 'pending' | 'running' | 'done' | 'error'
+
const IDLE: WorkflowRunState = {
status: 'idle', blockIndex: 0, blockTotal: 0, blockProgress: 0, blockStep: '',
}
-// Module-level refs — survive component unmounts / navigation
+// ─── Module-level run context (survives between run() and continueRun(id)) ───
+
const _cancel = { current: false }
const _activeJobId = { current: null as string | null }
-const _resume = { current: null as (() => void) | null }
-function flushResume(): void {
- const fn = _resume.current
- if (!fn) return
- _resume.current = null
- fn()
+interface NodeOutput { filePath?: string; text?: string; outputType?: string }
+
+interface RunContext {
+ workflow: Workflow
+ allExtensions: WorkflowExtension[]
+ client: AxiosInstance
+ workspaceDir: string
+ selectedImagePath: string
+ selectedImageData?: string
+ overrideImageData?: string
+ nodeOutputs: Map
+ nodeMap: Map
+ /** nodes in execution (topological) order */
+ ordered: WFNode[]
+ branches: Map
+ waitIds: string[]
+ /** waitId → nearest upstream waitId (null = top-level, runnable from the start) */
+ parentWait: Map
+ /** workspace URL of the most recently pushed scene mesh (last branch the user ran wins) */
+ lastSceneMesh?: string
}
+const _ctx = { current: null as RunContext | null }
-// ─── Topological sort ─────────────────────────────────────────────────────────
+// ─── Topological sort (DFS preorder, branch-first) ───────────────────────────
function topoSort(nodes: WFNode[], edges: WFEdge[]): WFNode[] {
- const nodeMap = new Map(nodes.map((n) => [n.id, n]))
- const inDegree = new Map(nodes.map((n) => [n.id, 0]))
- const adj = new Map(nodes.map((n) => [n.id, [] as string[]]))
+ const nodeMap = new Map(nodes.map((n) => [n.id, n]))
+ const adj = new Map(nodes.map((n) => [n.id, [] as string[]]))
+ const inDeg = new Map(nodes.map((n) => [n.id, 0]))
for (const e of edges) {
if (!nodeMap.has(e.source) || !nodeMap.has(e.target)) continue
adj.get(e.source)!.push(e.target)
- inDegree.set(e.target, (inDegree.get(e.target) ?? 0) + 1)
+ inDeg.set(e.target, (inDeg.get(e.target) ?? 0) + 1)
}
- const queue = nodes.filter((n) => (inDegree.get(n.id) ?? 0) === 0)
+
+ const visited = new Set()
const result: WFNode[] = []
- while (queue.length > 0) {
- const node = queue.shift()!
- result.push(node)
- for (const neighbor of adj.get(node.id) ?? []) {
- const deg = (inDegree.get(neighbor) ?? 0) - 1
- inDegree.set(neighbor, deg)
- if (deg === 0) queue.push(nodeMap.get(neighbor)!)
+
+ const visit = (id: string): void => {
+ if (visited.has(id)) return
+ for (const e of edges) {
+ if (e.target === id && !visited.has(e.source) && nodeMap.has(e.source)) return
}
+ const node = nodeMap.get(id)
+ if (!node) return
+ visited.add(id)
+ result.push(node)
+ for (const childId of adj.get(id) ?? []) visit(childId)
}
+
+ for (const node of nodes) if ((inDeg.get(node.id) ?? 0) === 0) visit(node.id)
+ for (const node of nodes) if (!visited.has(node.id)) visit(node.id)
return result
}
+// ─── Branch identification ────────────────────────────────────────────────────
+// A node belongs to Wait W's branch if its single nearest upstream Wait is W
+// (dominance). Nodes with no upstream Wait — or with multiple (merges) — execute
+// in the pre-phase before any user pause.
+
+function identifyBranches(workflow: Workflow): {
+ preExecExtNodes: WFNode[]
+ branches: Map
+ waitIds: string[]
+ parentWait: Map
+ ordered: WFNode[]
+} {
+ const ordered = topoSort(workflow.nodes, workflow.edges)
+ const nodeMap = new Map(workflow.nodes.map((n) => [n.id, n]))
+ const waitIds = ordered.filter((n) => isBranchStarter(n.type)).map((n) => n.id)
+
+ // A node is owned by its single nearest upstream Wait (dominance). This lets
+ // Wait → … → Wait chains nest: nodes after the 2nd Wait belong to it, not the 1st.
+ const branchOwner = new Map()
+ for (const node of workflow.nodes) {
+ if (isBranchStarter(node.type)) continue
+ const nearest = nearestUpstreamWaits(node.id, workflow.edges, nodeMap)
+ if (nearest.size === 1) branchOwner.set(node.id, [...nearest][0])
+ }
+
+ // Each Wait's parent = its own nearest upstream Wait (null if top-level).
+ const parentWait = new Map()
+ for (const w of waitIds) {
+ const nearest = nearestUpstreamWaits(w, workflow.edges, nodeMap)
+ parentWait.set(w, nearest.size === 1 ? [...nearest][0] : null)
+ }
+
+ const branches = new Map()
+ for (const w of waitIds) branches.set(w, [])
+ const preExecExtNodes: WFNode[] = []
+ for (const node of ordered) {
+ if (node.type !== 'extensionNode' || !node.data.enabled) continue
+ const owner = branchOwner.get(node.id)
+ if (owner) branches.get(owner)!.push(node)
+ else preExecExtNodes.push(node)
+ }
+
+ return { preExecExtNodes, branches, waitIds, parentWait, ordered }
+}
+
+// ─── Per-node execution ──────────────────────────────────────────────────────
+// Resolves inputs (walking through Wait passthroughs), runs the extension
+// (model or process), updates nodeOutputs, and pushes the mesh to the scene
+// if it feeds an Add-to-Scene through Waits.
+
+async function executeExtensionNode(
+ node: WFNode,
+ ctx: RunContext,
+ setRunState: (updater: (s: WorkflowRunState) => WorkflowRunState) => void,
+): Promise {
+ const { workflow, allExtensions, client, workspaceDir, nodeOutputs, nodeMap,
+ selectedImagePath, selectedImageData } = ctx
+
+ const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions)
+
+ const resolveSource = (sourceId: string): NodeOutput | undefined => {
+ const realId = resolveDataSource(sourceId, workflow.edges, nodeMap)
+ return realId ? nodeOutputs.get(realId) : undefined
+ }
+
+ let nodeInputPath: string | undefined
+ let nodeInputText: string | undefined
+ let nodeInputMeshPath: string | undefined
+
+ const incomingEdges = workflow.edges.filter((e) => e.target === node.id)
+
+ if (ext?.inputs && ext.inputs.length > 1) {
+ for (const edge of incomingEdges) {
+ const src = resolveSource(edge.source)
+ if (!src) continue
+ if (src.outputType === 'mesh') nodeInputMeshPath = src.filePath
+ else if (src.outputType === 'image') nodeInputPath = src.filePath
+ else if (src.filePath !== undefined) nodeInputPath = src.filePath
+ if (src.text !== undefined) nodeInputText = src.text
+ }
+ } else {
+ for (const edge of incomingEdges) {
+ const src = resolveSource(edge.source)
+ if (src?.filePath !== undefined) nodeInputPath = src.filePath
+ if (src?.text !== undefined) nodeInputText = src.text
+ }
+ }
+
+ const isModelNode = ext?.type === 'model'
+
+ if (isModelNode) {
+ const activeImagePath = nodeInputPath ?? selectedImagePath
+ const base64 = selectedImageData && nodeInputPath === undefined
+ ? selectedImageData
+ : await window.electron.fs.readFileBase64(activeImagePath)
+ const bytes = Uint8Array.from(atob(base64), (c) => c.charCodeAt(0))
+ const blob = new Blob([bytes], { type: 'image/png' })
+ const fname = activeImagePath.split(/[\\/]/).pop() ?? 'image.png'
+
+ const extraParams: Record = {}
+ if (nodeInputMeshPath) {
+ const norm = nodeInputMeshPath.replace(/\\/g, '/')
+ extraParams.mesh_path = norm.startsWith(workspaceDir)
+ ? norm.slice(workspaceDir.length).replace(/^\//, '')
+ : norm
+ }
+
+ const schemaDefaults = Object.fromEntries(
+ (ext.params ?? []).map((p) => [p.id, p.default]),
+ )
+ const effectiveParams = { ...schemaDefaults, ...(node.data.params ?? {}) }
+
+ const fd = new FormData()
+ fd.append('image', blob, fname)
+ fd.append('model_id', node.data.extensionId ?? '')
+ fd.append('collection', 'Workflows')
+ fd.append('remesh', 'none')
+ fd.append('enable_texture', 'false')
+ fd.append('texture_resolution', '1024')
+ fd.append('params', JSON.stringify({ ...effectiveParams, ...extraParams }))
+
+ setRunState((s) => ({ ...s, blockProgress: 5, blockStep: 'Submitting to model…' }))
+
+ const { data } = await client.post<{ job_id: string }>(
+ '/generate/from-image', fd,
+ { headers: { 'Content-Type': 'multipart/form-data' } },
+ )
+ _activeJobId.current = data.job_id
+
+ while (true) {
+ if (_cancel.current) {
+ await client.post(`/generate/cancel/${_activeJobId.current}`).catch(() => {})
+ _activeJobId.current = null
+ throw new Error('Cancelled')
+ }
+ await new Promise((r) => setTimeout(r, 1200))
+
+ const { data: st } = await client.get<{
+ status: string; progress?: number; step?: string; output_url?: string; error?: string
+ }>(`/generate/status/${_activeJobId.current}`)
+
+ if (st.status === 'done' && st.output_url) {
+ const rel = st.output_url.replace(/^\/workspace\//, '')
+ nodeInputPath = `${workspaceDir}/${rel}`
+ _activeJobId.current = null
+ setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Generation complete' }))
+ break
+ }
+ if (st.status === 'error') throw new Error(st.error ?? 'Generation failed')
+
+ setRunState((s) => ({ ...s, blockProgress: st.progress ?? s.blockProgress, blockStep: st.step ?? 'Generating…' }))
+ useAppStore.getState().updateCurrentJob({ status: 'generating', progress: st.progress, step: st.step })
+ }
+ } else {
+ if (ext?.input === 'mesh' && !nodeInputPath) throw new Error(`${ext.name} needs an incoming mesh connection`)
+ if (ext?.input === 'image' && !nodeInputPath) throw new Error(`${ext.name} needs an incoming image connection`)
+ if (ext?.input === 'text' && !nodeInputText) throw new Error(`${ext.name} needs an incoming text connection`)
+
+ const parts = (node.data.extensionId ?? '').split('/')
+ const extId = parts[0]
+ const nid = parts[1] ?? ''
+ const result = await window.electron.extensions.runProcess(
+ extId,
+ { filePath: nodeInputPath, text: nodeInputText, nodeId: nid },
+ node.data.params as Record,
+ )
+ if (!result.success) throw new Error(result.error ?? 'Process extension failed')
+ nodeInputPath = result.result?.filePath ?? nodeInputPath
+ nodeInputText = result.result?.text ?? nodeInputText
+ setRunState((s) => ({ ...s, blockProgress: 100, blockStep: 'Done' }))
+ }
+
+ const outputType = ext?.output ?? (nodeInputPath ? 'mesh' : undefined)
+ nodeOutputs.set(node.id, { filePath: nodeInputPath, text: nodeInputText, outputType })
+
+ const norm = nodeInputPath?.replace(/\\/g, '/')
+ if (norm?.startsWith(workspaceDir) && reachesSceneOutput(node.id, workflow.edges, nodeMap)) {
+ const url = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}`
+ ctx.lastSceneMesh = url // remember it so finalize() keeps the last-run branch in view
+ useAppStore.getState().updateCurrentJob({ status: 'done', progress: 100, outputUrl: url })
+ }
+}
+
+// ─── Wait dependency helpers ───────────────────────────────────────────────────
+
+/** All Waits nested (transitively) under `rootId`, via the parentWait chain. */
+function descendantWaits(rootId: string, ctx: RunContext): Set {
+ const out = new Set()
+ let frontier = new Set([rootId])
+ while (frontier.size > 0) {
+ const next = new Set()
+ for (const w of ctx.waitIds) {
+ const parent = ctx.parentWait.get(w)
+ if (parent && frontier.has(parent) && !out.has(w)) { out.add(w); next.add(w) }
+ }
+ frontier = next
+ }
+ return out
+}
+
// ─── Store ────────────────────────────────────────────────────────────────────
interface WorkflowRunStore {
runState: WorkflowRunState
activeNodeId: string | null
activeWorkflowId: string | null
- /** nodeId → workspace URL for image outputs (populated after each run) */
nodeImageOutputs: Record
+ waitStates: Record
+ runningBranchId: string | null
run: (workflow: Workflow, allExtensions: WorkflowExtension[], overrideImageData?: string) => Promise
cancel: () => void
reset: () => void
- continueRun: () => void
+ continueRun: (waitId: string) => Promise
}
-export const useWorkflowRunStore = create((set) => ({
- runState: IDLE,
- activeNodeId: null,
- activeWorkflowId: null,
- nodeImageOutputs: {},
-
- async run(workflow, allExtensions, overrideImageData?) {
- _cancel.current = false
-
- const appState = useAppStore.getState()
- const apiUrl = appState.apiUrl
- const ordered = topoSort(workflow.nodes, workflow.edges)
- const execNodes = ordered.filter((n) =>
- (n.type === 'extensionNode' || n.type === 'waitNode') && n.data.enabled,
- )
+export const useWorkflowRunStore = create((set, get) => {
+ const setRunState = (updater: (s: WorkflowRunState) => WorkflowRunState): void => {
+ set((s) => ({ runState: updater(s.runState) }))
+ }
- const selectedImagePath = appState.selectedImagePath ?? ''
- const selectedImageData = overrideImageData ?? appState.selectedImageData ?? undefined
- const currentMeshUrl = appState.currentJob?.outputUrl
-
- set({
- activeWorkflowId: workflow.id,
- nodeImageOutputs: {},
- runState: { status: 'running', blockIndex: 0, blockTotal: execNodes.length, blockProgress: 0, blockStep: 'Starting…' },
- })
-
- appState.setCurrentJob({
- id: crypto.randomUUID(),
- imageFile: selectedImagePath,
- status: 'generating',
- progress: 0,
- createdAt: Date.now(),
- })
-
- try {
- const client = axios.create({ baseURL: apiUrl })
- const settings = await window.electron.settings.get()
- const workspaceDir = settings.workspaceDir.replace(/\\/g, '/')
-
- // Clean up tmp folder from previous run
- const tmpAbsPath = settings.workspaceDir.replace(/[\\/]+$/, '') + '/tmp'
- window.electron.fs.deleteDirectory(tmpAbsPath).catch(() => {})
-
- // nodeId → { filePath, text, outputType }
- const nodeOutputs = new Map()
- const outputNodeIds = new Set(ordered.filter((n) => n.type === 'outputNode').map((n) => n.id))
-
- // Pre-populate source nodes
- for (const node of ordered) {
- if (node.type === 'imageNode') {
- const fp = node.data.params?.filePath as string | undefined
- // When the agent provides an override image, ignore any hardcoded filePath so the
- // model node falls through to selectedImageData (= overrideImageData).
- const resolvedPath = overrideImageData ? undefined : (fp ?? selectedImagePath ?? undefined)
- nodeOutputs.set(node.id, { filePath: resolvedPath, outputType: 'image' })
+ const collectImageOutputs = (ctx: RunContext): Record => {
+ const out: Record = {}
+ for (const [nodeId, o] of ctx.nodeOutputs) {
+ if (o.outputType === 'image' && o.filePath) {
+ const norm = o.filePath.replace(/\\/g, '/')
+ if (norm.startsWith(ctx.workspaceDir)) {
+ out[nodeId] = `/workspace/${norm.slice(ctx.workspaceDir.length).replace(/^\//, '')}`
}
- if (node.type === 'textNode') {
- nodeOutputs.set(node.id, { text: node.data.params?.text as string | undefined })
+ }
+ }
+ return out
+ }
+
+ const finalize = (ctx: RunContext, finalWaitStates?: Record): void => {
+ // Prefer the mesh of the last branch the user actually ran — it's already in the
+ // viewer, and topo order must not override the user's last action.
+ let outputUrl: string | undefined = ctx.lastSceneMesh
+ let outputPath: string | undefined
+
+ const lastOutputNode = outputUrl ? undefined : [...ctx.ordered].reverse().find((n) => isSceneOutput(n.type))
+ if (lastOutputNode) {
+ for (const edge of ctx.workflow.edges.filter((e) => e.target === lastOutputNode.id)) {
+ const src = ctx.nodeOutputs.get(edge.source)
+ if (src?.filePath) {
+ const norm = src.filePath.replace(/\\/g, '/')
+ if (norm.startsWith(ctx.workspaceDir)) {
+ outputUrl = `/workspace/${norm.slice(ctx.workspaceDir.length).replace(/^\//, '')}`
+ }
}
- if (node.type === 'meshNode') {
- const source = node.data.params?.source as 'file' | 'current' | undefined
- if (source === 'current' && currentMeshUrl) {
- let meshFilePath: string
- if (currentMeshUrl.includes('serve-file?path=')) {
- // URL like /optimize/serve-file?path=D%3A%5C... → extract and decode the real path
- const encoded = currentMeshUrl.split('serve-file?path=')[1]
- meshFilePath = decodeURIComponent(encoded).replace(/\\/g, '/')
- } else {
- // URL like /workspace/Workflows/file.glb → resolve to absolute path
- const rel = currentMeshUrl.replace(/^\/workspace\//, '')
- meshFilePath = `${workspaceDir}/${rel}`
- }
- nodeOutputs.set(node.id, { filePath: meshFilePath, outputType: 'mesh' })
+ }
+ }
+ if (!outputUrl) {
+ for (const [, o] of ctx.nodeOutputs) {
+ if (o.filePath) {
+ const norm = o.filePath.replace(/\\/g, '/')
+ if (norm.startsWith(ctx.workspaceDir)) {
+ outputUrl = `/workspace/${norm.slice(ctx.workspaceDir.length).replace(/^\//, '')}`
} else {
- const fp = node.data.params?.filePath as string | undefined
- if (fp) nodeOutputs.set(node.id, { filePath: fp, outputType: 'mesh' })
+ outputPath = o.filePath
}
}
}
+ }
- for (let i = 0; i < execNodes.length; i++) {
- if (_cancel.current) { set({ runState: IDLE, activeNodeId: null }); return }
+ set((s) => ({
+ activeNodeId: null,
+ runningBranchId: null,
+ waitStates: finalWaitStates ?? s.waitStates,
+ nodeImageOutputs: collectImageOutputs(ctx),
+ runState: {
+ status: 'done',
+ blockIndex: 0,
+ blockTotal: 0,
+ blockProgress: 100,
+ blockStep: 'Done',
+ outputUrl,
+ outputPath,
+ },
+ }))
+ useAppStore.getState().updateCurrentJob({ status: 'done', progress: 100, outputUrl })
+ }
- const node = execNodes[i]
- const ext = getWorkflowExtension(node.data.extensionId ?? '', allExtensions)
+ return {
+ runState: IDLE,
+ activeNodeId: null,
+ activeWorkflowId: null,
+ nodeImageOutputs: {},
+ waitStates: {},
+ runningBranchId: null,
- // ── Resolve inputs ────────────────────────────────────────────────
- let nodeInputPath: string | undefined
- let nodeInputText: string | undefined
- let nodeInputMeshPath: string | undefined
+ async run(workflow, allExtensions, overrideImageData?) {
+ _cancel.current = false
- const incomingEdges = workflow.edges.filter((e) => e.target === node.id)
+ const appState = useAppStore.getState()
+ const apiUrl = appState.apiUrl
- if (ext?.inputs && ext.inputs.length > 1) {
- // Multi-input: route each incoming edge by the source node's outputType
- for (const edge of incomingEdges) {
- const src = nodeOutputs.get(edge.source)
- if (!src) continue
- if (src.outputType === 'mesh') nodeInputMeshPath = src.filePath
- else if (src.outputType === 'image') nodeInputPath = src.filePath
- else if (src.filePath !== undefined) nodeInputPath = src.filePath
- if (src.text !== undefined) nodeInputText = src.text
- }
- } else {
- // Single-input
- for (const edge of incomingEdges) {
- const src = nodeOutputs.get(edge.source)
- if (src?.filePath !== undefined) nodeInputPath = src.filePath
- if (src?.text !== undefined) nodeInputText = src.text
- }
- }
+ const { preExecExtNodes, branches, waitIds, parentWait, ordered } = identifyBranches(workflow)
+ const branchSteps = waitIds.reduce((acc, w) => acc + (branches.get(w)?.length ?? 0), 0)
+ const totalSteps = preExecExtNodes.length + branchSteps
- set((s) => ({
- activeNodeId: node.id,
- runState: { ...s.runState, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' },
- }))
-
- // ── Wait node → pause until continueRun(), then passthrough ───────
- if (node.type === 'waitNode') {
- set((s) => ({ runState: { ...s.runState, status: 'paused', blockStep: 'Paused — click Continue' } }))
- await new Promise((resolve) => { _resume.current = resolve })
- if (_cancel.current) { set({ runState: IDLE, activeNodeId: null }); return }
-
- nodeOutputs.set(node.id, {
- filePath: nodeInputPath,
- text: nodeInputText,
- outputType: incomingEdges[0] ? nodeOutputs.get(incomingEdges[0].source)?.outputType : undefined,
- })
- set((s) => ({ runState: { ...s.runState, status: 'running' } }))
- continue
- }
+ const selectedImagePath = appState.selectedImagePath ?? ''
+ const selectedImageData = overrideImageData ?? appState.selectedImageData ?? undefined
+ const currentMeshUrl = appState.currentJob?.outputUrl
- // ── Model extensions → HTTP API ───────────────────────────────────
- // Process extensions → IPC runProcess
- const isModelNode = ext?.type === 'model'
-
- if (isModelNode) {
- const activeImagePath = nodeInputPath ?? selectedImagePath
- const base64 = selectedImageData && nodeInputPath === undefined
- ? selectedImageData
- : await window.electron.fs.readFileBase64(activeImagePath)
- const bytes = Uint8Array.from(atob(base64), (c) => c.charCodeAt(0))
- const blob = new Blob([bytes], { type: 'image/png' })
- const fname = activeImagePath.split(/[\\/]/).pop() ?? 'image.png'
-
- // For multi-input nodes: inject mesh path as params.mesh_path
- const extraParams: Record = {}
- if (nodeInputMeshPath) {
- const norm = nodeInputMeshPath.replace(/\\/g, '/')
- extraParams.mesh_path = norm.startsWith(workspaceDir)
- ? norm.slice(workspaceDir.length).replace(/^\//, '')
- : norm
- }
+ set({
+ activeWorkflowId: workflow.id,
+ nodeImageOutputs: {},
+ // Top-level Waits are pending; nested Waits start blocked until their parent finishes.
+ waitStates: Object.fromEntries(waitIds.map((id) => [id, parentWait.get(id) ? 'blocked' as WaitState : 'pending' as WaitState])),
+ runningBranchId: null,
+ runState: {
+ status: 'running', blockIndex: 0, blockTotal: totalSteps,
+ blockProgress: 0, blockStep: 'Starting…',
+ },
+ })
- // Merge schema defaults (with per-variant paramDefaults already applied)
- // under user overrides so Python receives the effective values, not an
- // empty dict that falls back to hardcoded defaults in the generator.
- const schemaDefaults = Object.fromEntries(
- (ext.params ?? []).map((p) => [p.id, p.default]),
- )
- const effectiveParams = { ...schemaDefaults, ...(node.data.params ?? {}) }
-
- const fd = new FormData()
- fd.append('image', blob, fname)
- fd.append('model_id', node.data.extensionId ?? '')
- fd.append('collection', 'Workflows')
- fd.append('remesh', 'none')
- fd.append('enable_texture', 'false')
- fd.append('texture_resolution', '1024')
- fd.append('params', JSON.stringify({ ...effectiveParams, ...extraParams }))
-
- set((s) => ({ runState: { ...s.runState, blockProgress: 5, blockStep: 'Submitting to model…' } }))
-
- const { data } = await client.post<{ job_id: string }>(
- '/generate/from-image', fd,
- { headers: { 'Content-Type': 'multipart/form-data' } },
- )
- _activeJobId.current = data.job_id
-
- while (true) {
- if (_cancel.current) {
- await client.post(`/generate/cancel/${_activeJobId.current}`).catch(() => {})
- _activeJobId.current = null
- set({ runState: IDLE, activeNodeId: null })
- return
- }
- await new Promise((r) => setTimeout(r, 1200))
-
- const { data: st } = await client.get<{
- status: string; progress?: number; step?: string; output_url?: string; error?: string
- }>(`/generate/status/${_activeJobId.current}`)
-
- if (st.status === 'done' && st.output_url) {
- const rel = st.output_url.replace(/^\/workspace\//, '')
- nodeInputPath = `${workspaceDir}/${rel}`
- _activeJobId.current = null
- set((s) => ({ runState: { ...s.runState, blockProgress: 100, blockStep: 'Generation complete' } }))
- break
- }
- if (st.status === 'error') throw new Error(st.error ?? 'Generation failed')
-
- const total = execNodes.length
- const overall = total > 0
- ? Math.round((i / total) * 100 + (st.progress ?? 0) / total)
- : st.progress ?? 0
- set((s) => ({
- runState: { ...s.runState, blockProgress: st.progress ?? s.runState.blockProgress, blockStep: st.step ?? 'Generating…' },
- }))
- useAppStore.getState().updateCurrentJob({ status: 'generating', progress: overall, step: st.step })
- }
+ appState.setCurrentJob({
+ id: crypto.randomUUID(),
+ imageFile: selectedImagePath,
+ status: 'generating',
+ progress: 0,
+ createdAt: Date.now(),
+ })
- } else {
- // ── Process extension → IPC ─────────────────────────────────────
- if (ext?.input === 'mesh' && !nodeInputPath) {
- throw new Error(`${ext.name} needs an incoming mesh connection`)
+ try {
+ const client = axios.create({ baseURL: apiUrl })
+ const settings = await window.electron.settings.get()
+ const workspaceDir = settings.workspaceDir.replace(/\\/g, '/')
+
+ const tmpAbsPath = settings.workspaceDir.replace(/[\\/]+$/, '') + '/tmp'
+ window.electron.fs.deleteDirectory(tmpAbsPath).catch(() => {})
+
+ const nodeOutputs = new Map()
+ const nodeMap = new Map(workflow.nodes.map((n) => [n.id, n]))
+
+ // Pre-populate source nodes
+ for (const node of ordered) {
+ if (node.type === 'imageNode') {
+ const fp = node.data.params?.filePath as string | undefined
+ const resolvedPath = overrideImageData ? undefined : (fp ?? selectedImagePath ?? undefined)
+ nodeOutputs.set(node.id, { filePath: resolvedPath, outputType: 'image' })
}
- if (ext?.input === 'image' && !nodeInputPath) {
- throw new Error(`${ext.name} needs an incoming image connection`)
+ if (node.type === 'textNode') {
+ nodeOutputs.set(node.id, { text: node.data.params?.text as string | undefined })
}
- if (ext?.input === 'text' && !nodeInputText) {
- throw new Error(`${ext.name} needs an incoming text connection`)
+ if (node.type === 'meshNode') {
+ const source = node.data.params?.source as 'file' | 'current' | undefined
+ if (source === 'current' && currentMeshUrl) {
+ let meshFilePath: string
+ if (currentMeshUrl.includes('serve-file?path=')) {
+ const encoded = currentMeshUrl.split('serve-file?path=')[1]
+ meshFilePath = decodeURIComponent(encoded).replace(/\\/g, '/')
+ } else {
+ const rel = currentMeshUrl.replace(/^\/workspace\//, '')
+ meshFilePath = `${workspaceDir}/${rel}`
+ }
+ nodeOutputs.set(node.id, { filePath: meshFilePath, outputType: 'mesh' })
+ } else {
+ const fp = node.data.params?.filePath as string | undefined
+ if (fp) nodeOutputs.set(node.id, { filePath: fp, outputType: 'mesh' })
+ }
}
- const parts = (node.data.extensionId ?? '').split('/')
- const extId = parts[0]
- const nodeId = parts[1] ?? ''
- const result = await window.electron.extensions.runProcess(
- extId,
- { filePath: nodeInputPath, text: nodeInputText, nodeId },
- node.data.params as Record,
- )
- if (!result.success) throw new Error(result.error ?? 'Process extension failed')
- nodeInputPath = result.result?.filePath ?? nodeInputPath
- nodeInputText = result.result?.text ?? nodeInputText
- set((s) => ({ runState: { ...s.runState, blockProgress: 100, blockStep: 'Done' } }))
}
- // Store output with type for downstream routing
- const outputType = ext?.output ?? (nodeInputPath ? 'mesh' : undefined)
- nodeOutputs.set(node.id, { filePath: nodeInputPath, text: nodeInputText, outputType })
-
- // If this node feeds an Add-to-Scene, push the mesh to currentJob
- // immediately so the 3D viewer loads it without waiting for the rest of the run.
- const norm = nodeInputPath?.replace(/\\/g, '/')
- if (
- norm?.startsWith(workspaceDir) &&
- workflow.edges.some((e) => e.source === node.id && outputNodeIds.has(e.target))
- ) {
- useAppStore.getState().updateCurrentJob({
- status: 'done',
- progress: 100,
- outputUrl: `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}`,
- })
+ const ctx: RunContext = {
+ workflow, allExtensions, client, workspaceDir, selectedImagePath, selectedImageData,
+ overrideImageData, nodeOutputs, nodeMap, ordered, branches, waitIds, parentWait,
+ }
+ _ctx.current = ctx
+
+ // Pre-phase: nodes that don't belong to any single branch (sources + merges).
+ for (let i = 0; i < preExecExtNodes.length; i++) {
+ if (_cancel.current) { _ctx.current = null; set({ runState: IDLE, activeNodeId: null }); return }
+ const node = preExecExtNodes[i]
+ set((s) => ({
+ activeNodeId: node.id,
+ runState: { ...s.runState, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' },
+ }))
+ await executeExtensionNode(node, ctx, setRunState)
}
- }
- // ── Collect image outputs for preview nodes ───────────────────────
- const imageOutputs: Record = {}
- for (const [nodeId, out] of nodeOutputs) {
- if (out.outputType === 'image' && out.filePath) {
- const norm = out.filePath.replace(/\\/g, '/')
- if (norm.startsWith(workspaceDir)) {
- imageOutputs[nodeId] = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}`
- }
+ if (waitIds.length > 0) {
+ // Hand off to the user — branches run on demand via continueRun(id).
+ set((s) => ({
+ activeNodeId: null,
+ runState: { ...s.runState, status: 'paused', blockStep: 'Pick a branch and click Continue' },
+ }))
+ return
+ }
+
+ finalize(ctx)
+ } catch (err) {
+ if (!_cancel.current) {
+ set((s) => ({ runState: { ...s.runState, status: 'error', error: String(err) }, activeNodeId: null }))
+ useAppStore.getState().updateCurrentJob({ status: 'error', error: String(err) })
}
}
+ },
+
+ async continueRun(waitId) {
+ const state = get()
+ if (state.runningBranchId !== null) return
+ // Only runnable Waits: blocked (parent not done) and running are not.
+ const ws = state.waitStates[waitId]
+ if (ws !== 'pending' && ws !== 'done' && ws !== 'error') return
+ const ctx = _ctx.current
+ if (!ctx) return
+
+ const branch = ctx.branches.get(waitId) ?? []
+ // Re-running a Wait invalidates everything downstream: descendant branches
+ // were computed against the old output, so drop their outputs and reset
+ // them to blocked until this branch produces a fresh result.
+ const descendants = descendantWaits(waitId, ctx)
+
+ _cancel.current = false
+
+ // Reset outputs for this branch's nodes so Retry re-executes cleanly.
+ for (const node of branch) ctx.nodeOutputs.delete(node.id)
+ for (const d of descendants) for (const node of ctx.branches.get(d) ?? []) ctx.nodeOutputs.delete(node.id)
+
+ set((s) => {
+ const waitStates = { ...s.waitStates, [waitId]: 'running' as WaitState }
+ for (const d of descendants) waitStates[d] = 'blocked'
+ return {
+ runningBranchId: waitId,
+ waitStates,
+ runState: { ...s.runState, status: 'running', blockIndex: 0, blockTotal: branch.length, blockProgress: 0, blockStep: branch.length === 0 ? 'Done' : 'Starting…' },
+ }
+ })
- // ── Resolve final output URL ──────────────────────────────────────
- let outputUrl: string | undefined
- let outputPath: string | undefined
-
- // Use the last AddToScene in topo order — its predecessor is the final scene mesh.
- const outputNodeDef = [...ordered].reverse().find((n) => n.type === 'outputNode')
- if (outputNodeDef) {
- for (const edge of workflow.edges.filter((e) => e.target === outputNodeDef.id)) {
- const src = nodeOutputs.get(edge.source)
- if (src?.filePath) {
- const norm = src.filePath.replace(/\\/g, '/')
- if (norm.startsWith(workspaceDir)) {
- outputUrl = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}`
- }
+ const finishBranch = (next: WaitState, err?: string): void => {
+ if (_cancel.current) return
+ const newWaitStates = { ...get().waitStates, [waitId]: next }
+ // Unblock nested Waits whose parent branch just finished.
+ if (next === 'done') {
+ for (const w of ctx.waitIds) {
+ if (ctx.parentWait.get(w) === waitId && newWaitStates[w] === 'blocked') newWaitStates[w] = 'pending'
}
}
- }
- if (!outputUrl) {
- for (const node of execNodes) {
- const out = nodeOutputs.get(node.id)
- if (out?.filePath) {
- const norm = out.filePath.replace(/\\/g, '/')
- if (norm.startsWith(workspaceDir)) {
- outputUrl = `/workspace/${norm.slice(workspaceDir.length).replace(/^\//, '')}`
- } else {
- outputPath = out.filePath
- }
+ // A failed branch can never feed its descendants — surface them as error
+ // too, otherwise they stay 'blocked' and the run hangs on 'paused' forever.
+ if (next === 'error') {
+ for (const d of descendantWaits(waitId, ctx)) {
+ if (newWaitStates[d] === 'blocked') newWaitStates[d] = 'error'
}
}
- }
+ const allFinished = ctx.waitIds.every((id) => newWaitStates[id] === 'done' || newWaitStates[id] === 'error')
+ const anyError = ctx.waitIds.some((id) => newWaitStates[id] === 'error')
- set({
- activeNodeId: null,
- nodeImageOutputs: imageOutputs,
- runState: {
- status: 'done',
- blockIndex: execNodes.length > 0 ? execNodes.length - 1 : 0,
- blockTotal: execNodes.length,
- blockProgress: 100,
- blockStep: 'Done',
- outputUrl,
- outputPath,
- },
- })
- useAppStore.getState().updateCurrentJob({ status: 'done', progress: 100, outputUrl })
+ if (allFinished && !anyError) {
+ finalize(ctx, newWaitStates)
+ } else {
+ set((s) => ({
+ activeNodeId: null,
+ runningBranchId: null,
+ waitStates: newWaitStates,
+ runState: {
+ ...s.runState,
+ status: allFinished ? 'error' : 'paused',
+ error: err ?? s.runState.error,
+ blockStep: err ? `Branch failed: ${err}` : 'Pick a branch and click Continue',
+ },
+ }))
+ }
+ }
- } catch (err) {
- if (!_cancel.current) {
- set((s) => ({ runState: { ...s.runState, status: 'error', error: String(err) }, activeNodeId: null }))
- useAppStore.getState().updateCurrentJob({ status: 'error', error: String(err) })
+ try {
+ for (let i = 0; i < branch.length; i++) {
+ if (_cancel.current) return
+ const node = branch[i]
+ set((s) => ({
+ activeNodeId: node.id,
+ runState: { ...s.runState, blockIndex: i, blockProgress: 0, blockStep: 'Starting…' },
+ }))
+ await executeExtensionNode(node, ctx, setRunState)
+ }
+ finishBranch('done')
+ } catch (err) {
+ finishBranch('error', String(err))
}
- }
- },
-
- cancel() {
- _cancel.current = true
- flushResume()
- if (_activeJobId.current) {
- const apiUrl = useAppStore.getState().apiUrl
- axios.create({ baseURL: apiUrl }).post(`/generate/cancel/${_activeJobId.current}`).catch(() => {})
- _activeJobId.current = null
- }
- set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {} })
- // Clear the generation HUD so it doesn't show stale progress after cancel.
- // The backend's subprocess hard-kill is asynchronous; the UI shouldn't wait.
- useAppStore.getState().setCurrentJob(null)
- },
-
- reset() {
- set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {} })
- },
-
- continueRun() {
- flushResume()
- },
-}))
+ },
+
+ cancel() {
+ _cancel.current = true
+ if (_activeJobId.current) {
+ const apiUrl = useAppStore.getState().apiUrl
+ axios.create({ baseURL: apiUrl }).post(`/generate/cancel/${_activeJobId.current}`).catch(() => {})
+ _activeJobId.current = null
+ }
+ _ctx.current = null
+ set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {}, waitStates: {}, runningBranchId: null })
+ useAppStore.getState().setCurrentJob(null)
+ },
+
+ reset() {
+ _ctx.current = null
+ set({ runState: IDLE, activeNodeId: null, activeWorkflowId: null, nodeImageOutputs: {}, waitStates: {}, runningBranchId: null })
+ },
+ }
+})