From b917f1ebcb5c864df463487c9894861be52fd124 Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 28 May 2026 22:54:00 +1000 Subject: [PATCH 1/3] feat: improve driver management --- CLAUDE.md | 66 ++++++++++++++++++ spec/driver_manager/driver_cleanup_spec.cr | 14 +++- spec/processes/local_spec.cr | 46 +++++++------ spec/processes/unload_leak_spec.cr | 59 ++++++++++++++++ src/core-app.cr | 4 +- src/placeos-core.cr | 2 +- src/placeos-core/driver_manager.cr | 26 ++++++- src/placeos-core/process_check.cr | 2 +- src/placeos-core/process_manager.cr | 2 +- src/placeos-core/process_manager/common.cr | 13 +++- src/placeos-core/process_manager/local.cr | 80 +++++++++++++++------- 11 files changed, 257 insertions(+), 57 deletions(-) create mode 100644 CLAUDE.md create mode 100644 spec/processes/unload_leak_spec.cr diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 00000000..7fbdd4dc --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,66 @@ +PlaceOS is a building automation platform. + +## Working on this project + +This crystal lang library is the control and logic layer, it manages the launching of client defined drivers and logic modules. + +Use `crystal tool format` and `./bin/ameba` to format and lint code. +Run specs using `./test` make sure to run specs using a subagent. You can also run individual spec files or use `focus: true` to isolate the spec you're working on. Tests can take some time to run. + +Make sure to write thorough tests. Reference existing models in the lib folder if you want to see ORM internals. There are some extentions to the ORM in this project in ./lib/placeos-models/base/* if you need to understand why something isn't working. + +Make sure to create and maintain a new plan file for each task to keep track of progress. + +## 1. Plan Node Default +- Enter plan mode for ANY non-trivial task (3+ steps or architectural decisions) +- If something goes sideways, STOP and re-plan immediately, don’t keep pushing +- Use plan mode for verification steps, not just building +- Write detailed specs upfront to reduce ambiguity + +## 2. Subagent Strategy +- Use subagents liberally to keep main context window clean +- Offload research, exploration, and parallel analysis to subagents +- For complex problems, throw more compute at it via subagents +- One task per subagent for focused execution + +## 3. Self-Improvement Loop +- After ANY correction from the user, update `tasks/lessons.md` with the pattern +- Write rules for yourself that prevent the same mistake +- Ruthlessly iterate on these lessons until mistake rate drops +- Review lessons at session start for relevant project + +## 4. Verification Before Done +- Never mark a task complete without proving it works +- Diff behavior between main and your changes when relevant +- Ask yourself: "Would a staff engineer approve this?" +- Run tests, check logs, demonstrate correctness + +## 5. Demand Elegance (Balanced) +- For non-trivial changes, pause and ask: "Is there a more elegant way?" +- If a fix feels hacky: "Knowing everything I know now, implement the elegant solution" +- Skip this for simple, obvious fixes, don’t over-engineer +- Challenge your own work before presenting it + +## 6. Autonomous Bug Fixing +- When given a bug report, don’t ask for hand-holding +- Don’t start by trying to fix it. Instead, start by writing a test that reproduces the bug. Then, have subagents try to fix the bug and prove it by passing that test. +- Point at logs, errors, failing tests, then resolve them +- Zero context switching required from the user + +--- + +## Task Management + +1. **Plan First**: Write plan to `tasks/todo.md` with checkable items +2. **Verify Plan**: Check in before starting implementation +3. **Track Progress**: Mark items complete as you go +4. **Explain Changes**: High-level summary at each step +5. **Document Results**: Add review section to `tasks/todo.md` +6. **Capture Lessons**: Update `tasks/lessons.md` after corrections + +--- + +## Core Principles + +- **Simplicity First**: Make every change as simple as possible. Impact minimal code. +- **No Laziness**: Find root causes. No temporary fixes. Senior developer standards. diff --git a/spec/driver_manager/driver_cleanup_spec.cr b/spec/driver_manager/driver_cleanup_spec.cr index c2ccb768..53ffce23 100644 --- a/spec/driver_manager/driver_cleanup_spec.cr +++ b/spec/driver_manager/driver_cleanup_spec.cr @@ -24,9 +24,19 @@ module PlaceOS::Core tracker = DriverCleanup::StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT) stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || 30) stale_list.size.should eq(0) - driver_file = Path[DriverStore::BINARY_PATH, "drivers_place_private_helper_cce023a_#{Core::ARCH}"].to_s - value = REDIS_CLIENT.hgetall(driver_file) + # `driver_path` is derived from the actual driver's file_name + commit, not + # a hardcoded string (which goes stale as private-drivers' master moves on). + value = REDIS_CLIENT.hgetall(driver_path) value["last_executed_at"].to_i64.should be > 0 + ensure + # Without these stops the DriverResource feed (a Resource change-feed + # listener) keeps running across later spec files. When a subsequent spec calls + # `clear_tables`, the feed sees the `:deleted` events and removes the shared driver + # binaries from disk, causing later tests that launch drivers to hang on `start_process`. + if (mm = module_manager) && (m = mod) + mm.unload_module(m) rescue nil + end + resource_manager.try &.stop rescue nil end end end diff --git a/spec/processes/local_spec.cr b/spec/processes/local_spec.cr index 00fc19e4..b4cee663 100644 --- a/spec/processes/local_spec.cr +++ b/spec/processes/local_spec.cr @@ -58,6 +58,28 @@ module PlaceOS::Core::ProcessManager code.should eq 200 end + # Drain debug messages from the channel until we see the expected echo + # or hit the deadline. The driver emits startup status messages + # (e.g. `proxy_in_use`, `connected`) after `start` runs, and those land + # on the debug channel ahead of the echo response — collecting a fixed + # count of 2 messages is racy and skips past `[1,"hello"]` on slow boots. + drain_for_echo = ->(channel : Channel(String), deadline : Time::Span) { + collected = [] of String + ends_at = Time.monotonic + deadline + loop do + remaining = ends_at - Time.monotonic + break if remaining <= Time::Span.zero + select + when message = channel.receive + collected << message + break if message == %([1,"hello"]) + when timeout remaining + break + end + end + collected + } + it "debug" do pm = Local.new(discovery_mock) module_id = mod.id.as(String) @@ -73,17 +95,7 @@ module PlaceOS::Core::ProcessManager result.should eq %("hello") code.should eq 200 - messages = [] of String - 2.times do - select - when message = message_channel.receive - messages << message - when timeout 2.seconds - break - end - end - - messages.should contain %([1,"hello"]) + drain_for_echo.call(message_channel, 5.seconds).should contain %([1,"hello"]) end it "ignore" do @@ -103,17 +115,7 @@ module PlaceOS::Core::ProcessManager result.should eq %("hello") code.should eq 200 - messages = [] of String - 2.times do - select - when message = message_channel.receive - messages << message - when timeout 2.seconds - break - end - end - - messages.should contain %([1,"hello"]) + drain_for_echo.call(message_channel, 5.seconds).should contain %([1,"hello"]) pm.ignore(module_id, &callback) diff --git a/spec/processes/unload_leak_spec.cr b/spec/processes/unload_leak_spec.cr new file mode 100644 index 00000000..1b277f5f --- /dev/null +++ b/spec/processes/unload_leak_spec.cr @@ -0,0 +1,59 @@ +require "../helper" + +# Regression coverage for a fiber/heap leak in `ProcessManager::Common#unload`. +# +# A `Driver::Protocol::Management` spawns a long-lived `process_events` fiber in +# its constructor (i.e. as soon as a driver is `load`ed, before any process is +# launched). That fiber loops `until terminated?`, and the ONLY thing that flips +# `@terminated` is `Management#terminate`. If `unload` drops the manager from its +# maps without terminating it, the suspended `process_events` fiber keeps the +# whole manager object alive forever -> unbounded fiber + heap growth under the +# repeated load/unload churn produced by lazy modules. +# +# These specs deliberately use a dummy driver_key so they exercise the unload +# lifecycle without depending on a compiled driver binary (the leaked fiber is +# created in the manager constructor, no child process required). +module PlaceOS::Core::ProcessManager + count_process_events_fibers = -> { + count = 0 + Fiber.each { |fiber| count += 1 if fiber.name == "process_events" } + count + } + + describe Local, tags: "processes" do + it "terminates the driver manager when the last module is unloaded" do + pm = Local.new(discovery_mock) + driver_key = "leak-test-#{UUID.random}" + module_id = "leak-mod" + + pm.load(module_id: module_id, driver_key: driver_key) + manager = pm.protocol_manager_by_driver?(driver_key).not_nil! + manager.terminated?.should be_false + + pm.unload(module_id) + + # allow the :terminate event to be processed by process_events + sleep 200.milliseconds + manager.terminated?.should be_true + end + + it "does not leak process_events fibers across repeated load/unload cycles" do + pm = Local.new(discovery_mock) + before = count_process_events_fibers.call + + 5.times do |i| + driver_key = "leak-cycle-#{UUID.random}" + module_id = "cycle-mod-#{i}" + + pm.load(module_id: module_id, driver_key: driver_key) + pm.unload(module_id) + sleep 100.milliseconds + end + + # give any pending terminations time to unwind their fibers + sleep 200.milliseconds + + (count_process_events_fibers.call - before).should eq 0 + end + end +end diff --git a/src/core-app.cr b/src/core-app.cr index a235dd32..5da1fc71 100644 --- a/src/core-app.cr +++ b/src/core-app.cr @@ -90,7 +90,7 @@ server.cluster(process_count, "-w", "--workers") if cluster terminate = Proc(Signal, Nil).new do |signal| puts " > terminating gracefully" - spawn { server.close } + spawn(name: "server-shutdown") { server.close } signal.ignore end @@ -102,7 +102,7 @@ Signal::TERM.trap &terminate # Wait for redis and postgres to be ready PlaceOS::Core.wait_for_resources -spawn do +spawn(name: "start-managers") do begin PlaceOS::Core.start_managers rescue error diff --git a/src/placeos-core.cr b/src/placeos-core.cr index 603b72ca..b54c7f2c 100644 --- a/src/placeos-core.cr +++ b/src/placeos-core.cr @@ -18,7 +18,7 @@ module PlaceOS::Core # Acquire resources on startup resource_manager.start do # Start managing modules once relevant resources present - spawn { module_manager.start } + spawn(name: "module-manager-start") { module_manager.start } Fiber.yield end end diff --git a/src/placeos-core/driver_manager.cr b/src/placeos-core/driver_manager.cr index 03b03154..de53846a 100644 --- a/src/placeos-core/driver_manager.cr +++ b/src/placeos-core/driver_manager.cr @@ -133,8 +133,30 @@ module PlaceOS::Core def self.remove_driver(driver : Model::Driver, store : DriverStore) path = store.driver_binary_path(driver.file_name, driver.commit) - Log.info { {message: "removing driver binary as it got removed from drivers", driver_id: driver.id.as(String), path: path.to_s} } - remove_stale_driver(path, driver.id.as(String)) + driver_id = driver.id.as(String) + + # A binary is keyed by `file_name + commit` and may be shared by multiple + # `Model::Driver` rows (e.g., the same source compiled for different roles, + # or a row recreated mid-flight). Deleting the file on every :Deleted event + # creates a race where a sibling row that still needs the binary loses it. + # Skip the deletion when another row references the same binary; otherwise + # `DriverIntegrity.sync_drivers` handles cleanup of truly-stale binaries. + still_referenced = begin + Model::Driver.where(file_name: driver.file_name, commit: driver.commit).any? do |other| + other.id != driver_id + end + rescue ex + Log.warn(exception: ex) { {message: "could not check for sibling driver rows; skipping binary removal", driver_id: driver_id, path: path.to_s} } + true + end + + if still_referenced + Log.info { {message: "skipping driver binary removal, still referenced by another driver row", driver_id: driver_id, path: path.to_s} } + return + end + + Log.info { {message: "removing driver binary as it got removed from drivers", driver_id: driver_id, path: path.to_s} } + remove_stale_driver(path, driver_id) end def start_driver_jobs diff --git a/src/placeos-core/process_check.cr b/src/placeos-core/process_check.cr index a687d06c..42f44e32 100644 --- a/src/placeos-core/process_check.cr +++ b/src/placeos-core/process_check.cr @@ -47,7 +47,7 @@ module PlaceOS::Core # Asynchronously check if any processes are timing out on comms, and if so, restart them grouped_managers.each do |protocol_manager, module_ids| # Asynchronously check if any processes are timing out on comms, and if so, restart them - spawn do + spawn(name: "liveness-check") do state = begin # If there's an empty response, the modules that were meant to be running are not. # This is taken as a sign that the process is dead. diff --git a/src/placeos-core/process_manager.cr b/src/placeos-core/process_manager.cr index e0b61448..500268fd 100644 --- a/src/placeos-core/process_manager.cr +++ b/src/placeos-core/process_manager.cr @@ -40,7 +40,7 @@ module PlaceOS::Core debug(module_id, &callback) # Asyncronously send debug messages from the module - spawn do + spawn(name: "debug-forward") do while message = channel.receive? socket.send(message) end diff --git a/src/placeos-core/process_manager/common.cr b/src/placeos-core/process_manager/common.cr index 736e75cc..4fa54623 100644 --- a/src/placeos-core/process_manager/common.cr +++ b/src/placeos-core/process_manager/common.cr @@ -9,6 +9,14 @@ module PlaceOS::Core::ProcessManager::Common raise ModuleError.new("No protocol manager for #{module_id}") if manager.nil? + # Fail fast when the driver binary is missing. Without this check, the + # underlying `Management#start_process` would time out internally without + # ever rejecting the `@starting` promise, leaving the caller blocked forever. + driver_path = manager.@driver_path + unless File.exists?(driver_path) + raise ModuleError.new("Driver binary missing for #{module_id} at #{driver_path}") + end + manager.start(module_id, payload) rescue exception raise module_error(module_id, exception) @@ -239,14 +247,17 @@ module PlaceOS::Core::ProcessManager::Common protected def set_driver_protocol_manager(driver_key, manager : Driver::Protocol::Management?) driver_key = ProcessManager.path_to_key(driver_key) Log.trace { {message: "#{manager.nil? ? "removing" : "setting"} driver process manager", driver_key: driver_key} } + removed_man = nil protocol_manager_lock.synchronize do if manager.nil? - @driver_protocol_managers.delete(driver_key) + removed_man = @driver_protocol_managers.delete(driver_key) else @driver_protocol_managers[driver_key] = manager manager end end + removed_man.try(&.terminate) rescue nil + manager end # Error handling diff --git a/src/placeos-core/process_manager/local.cr b/src/placeos-core/process_manager/local.cr index 39f757c9..19f70c6b 100644 --- a/src/placeos-core/process_manager/local.cr +++ b/src/placeos-core/process_manager/local.cr @@ -62,14 +62,21 @@ module PlaceOS::Core def execute(module_id : String, payload : String | IO, user_id : String?, mod : Model::Module? = nil) mod = mod || Model::Module.find?(module_id) - raise ModuleError.new("Could not locate module #{module_id}, no matching database record") unless mod - # Check if this is a lazy module that needs to be loaded - return execute_lazy(mod, payload, user_id) if mod.launch_on_execute - raise ModuleError.new("Could not locate module #{module_id}, it is stopped") unless mod.running + # Lazy modules are launched on demand and require the database record + return execute_lazy(mod, payload, user_id) if mod && mod.launch_on_execute + + # Prefer an already-loaded module: a running process can be executed even + # when the database lookup is unavailable (e.g. a stale/missing record). + manager = protocol_manager_by_module?(module_id) + + if manager.nil? + # Not loaded: the database record is needed to (re)load the module + raise ModuleError.new("Could not locate module #{module_id}, no matching database record") unless mod + raise ModuleError.new("Could not locate module #{module_id}, it is stopped") unless mod.running + manager = ensure_lazy_module_loaded(mod) + end - # the module should be running and have a management module - manager = protocol_manager_by_module?(module_id) || ensure_lazy_module_loaded(mod) request_body = payload.is_a?(IO) ? payload.gets_to_end : payload manager.execute( module_id, @@ -107,35 +114,58 @@ module PlaceOS::Core end end + # Per-module load locks: serialize concurrent `ensure_lazy_module_loaded` calls + # for the same module so only one fiber spawns the driver. Without this, + # concurrent first-time executes race past the `protocol_manager_by_module?` + # check, each create their own `Driver::Protocol::Management` (and a child + # driver process), and only the last to `set_module_protocol_manager` wins — + # the rest leak orphaned managers + zombie driver processes. + private getter lazy_load_locks : Hash(String, Mutex) = {} of String => Mutex + private getter lazy_load_locks_lock : Mutex = Mutex.new + + private def lazy_load_lock_for(module_id : String) : Mutex + lazy_load_locks_lock.synchronize do + lazy_load_locks[module_id] ||= Mutex.new + end + end + # Spawn driver and load module for lazy execution + # + # No fast path here — `load()` registers the manager in `protocol_manager_by_module?` + # *before* `manager.start` returns. A check outside the lock would let a second + # fiber observe the freshly-mapped manager and proceed to `manager.execute` while + # the driver-side module hasn't been registered yet ("driver not available"). + # Every caller goes through the lock so all execs happen strictly after start. private def ensure_lazy_module_loaded(mod : Model::Module) module_id = mod.id.as(String) - # Already loaded? - if manager = protocol_manager_by_module?(module_id) - Log.debug { {message: "lazy module already loaded", module_id: module_id} } - return manager - end + lazy_load_lock_for(module_id).synchronize do + if manager = protocol_manager_by_module?(module_id) + Log.debug { {message: "lazy module already loaded", module_id: module_id} } + return manager + end - driver = mod.driver! - repository = driver.repository! + driver = mod.driver! + repository = driver.repository! - driver_path = store.built?(driver.file_name, driver.commit, repository.branch, repository.uri) - raise ModuleError.new("Driver not compiled for lazy module #{module_id}") if driver_path.nil? + driver_path = store.built?(driver.file_name, driver.commit, repository.branch, repository.uri) + raise ModuleError.new("Driver not compiled for lazy module #{module_id}") if driver_path.nil? - ::Log.with_context(module_id: module_id, driver_key: driver_path) do - # Spawn driver and register module - load(module_id, driver_path.to_s) + ::Log.with_context(module_id: module_id, driver_key: driver_path) do + # Spawn driver and register module + load(module_id, driver_path.to_s) - # Start the module instance - manager = protocol_manager_by_module?(module_id) - raise ModuleError.new("Failed to load lazy module #{module_id}") if manager.nil? + # Start the module instance — blocks until the driver acks the start, + # so the manager is fully ready by the time we release the lock. + manager = protocol_manager_by_module?(module_id) + raise ModuleError.new("Failed to load lazy module #{module_id}") if manager.nil? - manager.start(module_id, ModuleManager.start_payload(mod)) + manager.start(module_id, ModuleManager.start_payload(mod)) - Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} } + Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} } + end + protocol_manager_by_module?(module_id).as(Driver::Protocol::Management) end - manager.as(Driver::Protocol::Management) end # Schedule unload of lazy module after idle timeout @@ -147,7 +177,7 @@ module PlaceOS::Core lazy_unload_scheduled[module_id] = true end - spawn do + spawn(name: "lazy-unload") do sleep ModuleManager.lazy_unload_delay # Check if still no active executions and unload still scheduled From a1a6b8eef3a6339c06f0cec232dcfad0c3d9c9ba Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 28 May 2026 23:05:41 +1000 Subject: [PATCH 2/3] fix(process_manager): improve performance --- src/placeos-core/process_manager/local.cr | 106 +++++++++++++--------- 1 file changed, 65 insertions(+), 41 deletions(-) diff --git a/src/placeos-core/process_manager/local.cr b/src/placeos-core/process_manager/local.cr index 19f70c6b..7fb0537b 100644 --- a/src/placeos-core/process_manager/local.cr +++ b/src/placeos-core/process_manager/local.cr @@ -114,57 +114,81 @@ module PlaceOS::Core end end - # Per-module load locks: serialize concurrent `ensure_lazy_module_loaded` calls - # for the same module so only one fiber spawns the driver. Without this, - # concurrent first-time executes race past the `protocol_manager_by_module?` - # check, each create their own `Driver::Protocol::Management` (and a child - # driver process), and only the last to `set_module_protocol_manager` wins — - # the rest leak orphaned managers + zombie driver processes. - private getter lazy_load_locks : Hash(String, Mutex) = {} of String => Mutex - private getter lazy_load_locks_lock : Mutex = Mutex.new - - private def lazy_load_lock_for(module_id : String) : Mutex - lazy_load_locks_lock.synchronize do - lazy_load_locks[module_id] ||= Mutex.new - end - end + # Per-module load coordination. While a module is being lazy-loaded an entry + # exists in this hash holding a `Channel(Nil)`; concurrent callers see the + # entry and wait on the channel — the loader closes it (in `ensure`) on + # success or failure, waking everyone to re-check `protocol_manager_by_module?`. + # The entry is removed when the load resolves, so the hash size stays bounded + # by the number of modules *currently* loading (typically a handful), not by + # the cumulative count of distinct modules ever seen. One mutex protects the + # whole hash — same approach as `Edge::Client#load_binary`. + private getter lazy_loading : Hash(String, Channel(Nil)) = {} of String => Channel(Nil) + private getter lazy_loading_lock : Mutex = Mutex.new # Spawn driver and load module for lazy execution # - # No fast path here — `load()` registers the manager in `protocol_manager_by_module?` - # *before* `manager.start` returns. A check outside the lock would let a second - # fiber observe the freshly-mapped manager and proceed to `manager.execute` while - # the driver-side module hasn't been registered yet ("driver not available"). - # Every caller goes through the lock so all execs happen strictly after start. + # Serialized so that `load()` registering the manager and `manager.start` + # finishing happen atomically from the perspective of other callers: without + # this a second fiber could observe the freshly-mapped manager (set by `load`) + # and call `manager.execute` while the driver-side module isn't registered + # yet ("driver not available"). private def ensure_lazy_module_loaded(mod : Model::Module) module_id = mod.id.as(String) - lazy_load_lock_for(module_id).synchronize do - if manager = protocol_manager_by_module?(module_id) - Log.debug { {message: "lazy module already loaded", module_id: module_id} } - return manager + loop do + waiter, perform_load = lazy_loading_lock.synchronize do + # An in-flight load entry is the source of truth while loading is + # in progress. `load()` registers the manager in + # `protocol_manager_by_module?` *before* `manager.start` returns, so + # checking that map first would let us observe a half-started manager + # and proceed to `manager.execute` before the driver has acked start. + # Only trust the manager map once the load entry is gone. + if existing = lazy_loading[module_id]? + {existing, false} + elsif manager = protocol_manager_by_module?(module_id) + Log.debug { {message: "lazy module already loaded", module_id: module_id} } + return manager + else + chan = Channel(Nil).new + lazy_loading[module_id] = chan + {chan, true} + end end - driver = mod.driver! - repository = driver.repository! - - driver_path = store.built?(driver.file_name, driver.commit, repository.branch, repository.uri) - raise ModuleError.new("Driver not compiled for lazy module #{module_id}") if driver_path.nil? - - ::Log.with_context(module_id: module_id, driver_key: driver_path) do - # Spawn driver and register module - load(module_id, driver_path.to_s) - - # Start the module instance — blocks until the driver acks the start, - # so the manager is fully ready by the time we release the lock. - manager = protocol_manager_by_module?(module_id) - raise ModuleError.new("Failed to load lazy module #{module_id}") if manager.nil? - - manager.start(module_id, ModuleManager.start_payload(mod)) + unless perform_load + # Another fiber is loading — wait for it to finish, then re-check. + waiter.receive? + next + end - Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} } + begin + driver = mod.driver! + repository = driver.repository! + + driver_path = store.built?(driver.file_name, driver.commit, repository.branch, repository.uri) + raise ModuleError.new("Driver not compiled for lazy module #{module_id}") if driver_path.nil? + + ::Log.with_context(module_id: module_id, driver_key: driver_path) do + # Spawn driver and register module + load(module_id, driver_path.to_s) + + # Start the module instance — blocks until the driver acks the start, + # so the manager is fully ready by the time we signal waiters. + manager = protocol_manager_by_module?(module_id) + raise ModuleError.new("Failed to load lazy module #{module_id}") if manager.nil? + + manager.start(module_id, ModuleManager.start_payload(mod)) + + Log.info { {message: "spawned driver for lazy module execution", module_id: module_id, name: mod.name} } + end + return protocol_manager_by_module?(module_id).as(Driver::Protocol::Management) + ensure + # Clear the in-flight marker and wake any waiters (on success or failure). + # On failure, one of the waiters will re-check, find no manager, and + # become the next loader. + lazy_loading_lock.synchronize { lazy_loading.delete(module_id) } + waiter.close end - protocol_manager_by_module?(module_id).as(Driver::Protocol::Management) end end From bae57a5dc6651dcf19b5230fc1c664b036df993f Mon Sep 17 00:00:00 2001 From: Stephen von Takach Date: Thu, 28 May 2026 23:37:44 +1000 Subject: [PATCH 3/3] ensure single build service hit no concurrent calls or queuing --- spec/driver_manager/driver_store_spec.cr | 93 +++++++++++++++++++ .../driver_manager/driver_store.cr | 90 ++++++++++++++++-- 2 files changed, 173 insertions(+), 10 deletions(-) create mode 100644 spec/driver_manager/driver_store_spec.cr diff --git a/spec/driver_manager/driver_store_spec.cr b/spec/driver_manager/driver_store_spec.cr new file mode 100644 index 00000000..63f9e835 --- /dev/null +++ b/spec/driver_manager/driver_store_spec.cr @@ -0,0 +1,93 @@ +require "../helper" + +module PlaceOS::Core + describe DriverStore, tags: "driver_store" do + describe ".compiled?" do + # Regression: previously, multiple modules sharing the same driver + # (`file_name + commit`) would each take their own `compiled?` path, + # all see `File.exists?` as false, and each call the build service and + # `fetch_binary` to the same on-disk path — racing the write and hammering + # the build service for the same binary. `compiled?` now coordinates + # in-flight fetches keyed by binary path: one fiber fetches, the rest wait + # on a channel and observe the binary once it's on disk. + it "dedupes concurrent fetches of the same binary" do + _, driver, _ = setup + + store = DriverStore.new + path = store.driver_binary_path(driver.file_name, driver.commit) + + # Force a first-time fetch by removing any binary left on disk from a + # prior spec, and zero out the slow-path counter. + File.delete(path.to_s) rescue nil + DriverStore.reset_compiled_attempts + + repo = driver.repository! + concurrency = 5 + results = Channel(Bool).new(concurrency) + + concurrency.times do + spawn do + results.send store.compiled?(driver.file_name, driver.commit, repo.branch, repo.uri) + end + end + + concurrency.times { results.receive.should be_true } + + # All `concurrency` callers should resolve through a single fetch: + # one fiber became the loader, the rest waited on the channel and then + # found the binary already on disk. + DriverStore.compiled_attempts.should eq 1 + File.exists?(path.to_s).should be_true + end + + it "shares a single failed fetch across concurrent callers" do + _, driver, _ = setup + + store = DriverStore.new + repo = driver.repository! + + # A commit that the build service does not (and cannot) produce — every + # `compiled?` against it must come back `false`. With dedup, all + # concurrent callers observe the *same* failed fetch (one round-trip); + # without it, each caller would pay its own round-trip to the build + # service. + bogus_commit = "deadbee" + path = store.driver_binary_path(driver.file_name, bogus_commit).to_s + File.delete(path) rescue nil + DriverStore.reset_compiled_attempts + + concurrency = 5 + results = Channel(Bool).new(concurrency) + concurrency.times do + spawn do + results.send store.compiled?(driver.file_name, bogus_commit, repo.branch, repo.uri) + end + end + + concurrency.times { results.receive.should be_false } + + DriverStore.compiled_attempts.should eq 1 + File.exists?(path).should be_false + end + + it "lets a later call short-circuit without a fetch when the binary is already present" do + _, driver, _ = setup + + store = DriverStore.new + repo = driver.repository! + path = store.driver_binary_path(driver.file_name, driver.commit).to_s + + # Force the warming call to go through the slow path. + File.delete(path) rescue nil + DriverStore.reset_compiled_attempts + store.compiled?(driver.file_name, driver.commit, repo.branch, repo.uri).should be_true + DriverStore.compiled_attempts.should eq 1 + + # A subsequent call should hit the fast path — no new fetch. + DriverStore.reset_compiled_attempts + store.compiled?(driver.file_name, driver.commit, repo.branch, repo.uri).should be_true + DriverStore.compiled_attempts.should eq 0 + end + end + end +end diff --git a/src/placeos-core/driver_manager/driver_store.cr b/src/placeos-core/driver_manager/driver_store.cr index c9fec062..5710c428 100644 --- a/src/placeos-core/driver_manager/driver_store.cr +++ b/src/placeos-core/driver_manager/driver_store.cr @@ -1,5 +1,6 @@ require "uri" require "digest" +require "promise" require "connect-proxy" require "./build_api" @@ -15,24 +16,93 @@ module PlaceOS::Core Dir.mkdir_p binary_path end + # Per-binary fetch coordination. Multiple modules can share a binary (same + # `file_name + commit`); without this, concurrent `compiled?` calls from + # different module loads would each `File.exists?` (false), each call out to + # the build service, and each `fetch_binary` would write to the same path at + # the same time — corrupting the file and hammering the build service for + # the same driver. + # + # While a fetch is in flight an entry exists in this hash holding a + # `Promise(Bool)`. The loader resolves the promise with its result on + # success, or rejects it with the underlying exception on failure; + # concurrent waiters call `promise.get` and observe exactly the same + # outcome (one fetch attempt per wave, success or failure shared). Hash + # entries are removed before the promise resolves, so storage is bounded by + # the number of binaries currently being fetched (typically zero in steady + # state). + @@loading_binaries : Hash(String, Promise::DeferredPromise(Bool)) = {} of String => Promise::DeferredPromise(Bool) + @@loading_binaries_lock : Mutex = Mutex.new + + # Counts traversals of the slow path in `compiled?` — i.e. how many times a + # fiber actually became the loader and called the build service. Exposed for + # tests so specs can assert that N concurrent first-time `compiled?` calls + # result in exactly one fetch attempt, not N. + @@compiled_attempts : Atomic(Int32) = Atomic(Int32).new(0) + + def self.compiled_attempts : Int32 + @@compiled_attempts.get + end + + def self.reset_compiled_attempts : Nil + @@compiled_attempts.set(0) + end + def compiled?(file_name : String, commit : String, branch : String, uri : String) : Bool Log.debug { {message: "Checking whether driver is compiled or not?", driver: file_name, commit: commit, branch: branch, repo: uri} } path = Path[binary_path, executable_name(file_name, commit)] - if File.exists?(path) - # Validate that the local file is a valid executable by running it with -h - if validate_binary(path) - return true + # Fast path — the binary is already on disk and intact. Validating outside + # the lock is safe: a partial write from a concurrent fetch will fail the + # `-h` probe and we'll fall through into the slow path. + return true if File.exists?(path) && validate_binary(path) + + key = path.to_s + promise, perform_fetch = @@loading_binaries_lock.synchronize do + if existing = @@loading_binaries[key]? + {existing, false} else - Log.warn { {message: "Local binary exists but is corrupted, removing and re-downloading", driver_file: file_name, path: path.to_s} } - File.delete(path) rescue nil + # Re-check under the lock — another fiber may have completed a + # fetch between our fast-path check and acquiring the lock. + if File.exists?(path) + if validate_binary(path) + return true + else + Log.warn { {message: "Local binary exists but is corrupted, removing and re-downloading", driver_file: file_name, path: path.to_s} } + File.delete(path) rescue nil + end + end + prom = Promise.new(Bool) + @@loading_binaries[key] = prom + {prom, true} end end - resp = BuildApi.compiled?(file_name, commit, branch, uri) - return false unless resp.success? - ret = fetch_binary(LinkData.from_json(resp.body)) rescue nil - !ret.nil? + # Waiter — share the in-flight loader's outcome (returns its value or + # re-raises its exception). + return promise.get unless perform_fetch + + # Loader — perform the fetch exactly once and broadcast the result. + result = false + error = nil + begin + @@compiled_attempts.add(1) + resp = BuildApi.compiled?(file_name, commit, branch, uri) + if resp.success? + fetched = fetch_binary(LinkData.from_json(resp.body)) rescue nil + result = !fetched.nil? + end + rescue ex + error = ex + end + + @@loading_binaries_lock.synchronize { @@loading_binaries.delete(key) } + if error + promise.reject(error) + raise error + end + promise.resolve(result) + result end def compile(file_name : String, url : String, commit : String, branch : String, force : Bool, username : String? = nil, password : String? = nil, fetch : Bool = true) : Result