Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
PlaceOS is a building automation platform.

## Working on this project

This crystal lang library is the control and logic layer, it manages the launching of client defined drivers and logic modules.

Use `crystal tool format` and `./bin/ameba` to format and lint code.
Run specs using `./test` make sure to run specs using a subagent. You can also run individual spec files or use `focus: true` to isolate the spec you're working on. Tests can take some time to run.

Make sure to write thorough tests. Reference existing models in the lib folder if you want to see ORM internals. There are some extentions to the ORM in this project in ./lib/placeos-models/base/* if you need to understand why something isn't working.

Make sure to create and maintain a new plan file for each task to keep track of progress.

## 1. Plan Node Default
- Enter plan mode for ANY non-trivial task (3+ steps or architectural decisions)
- If something goes sideways, STOP and re-plan immediately, don’t keep pushing
- Use plan mode for verification steps, not just building
- Write detailed specs upfront to reduce ambiguity

## 2. Subagent Strategy
- Use subagents liberally to keep main context window clean
- Offload research, exploration, and parallel analysis to subagents
- For complex problems, throw more compute at it via subagents
- One task per subagent for focused execution

## 3. Self-Improvement Loop
- After ANY correction from the user, update `tasks/lessons.md` with the pattern
- Write rules for yourself that prevent the same mistake
- Ruthlessly iterate on these lessons until mistake rate drops
- Review lessons at session start for relevant project

## 4. Verification Before Done
- Never mark a task complete without proving it works
- Diff behavior between main and your changes when relevant
- Ask yourself: "Would a staff engineer approve this?"
- Run tests, check logs, demonstrate correctness

## 5. Demand Elegance (Balanced)
- For non-trivial changes, pause and ask: "Is there a more elegant way?"
- If a fix feels hacky: "Knowing everything I know now, implement the elegant solution"
- Skip this for simple, obvious fixes, don’t over-engineer
- Challenge your own work before presenting it

## 6. Autonomous Bug Fixing
- When given a bug report, don’t ask for hand-holding
- Don’t start by trying to fix it. Instead, start by writing a test that reproduces the bug. Then, have subagents try to fix the bug and prove it by passing that test.
- Point at logs, errors, failing tests, then resolve them
- Zero context switching required from the user

---

## Task Management

1. **Plan First**: Write plan to `tasks/todo.md` with checkable items
2. **Verify Plan**: Check in before starting implementation
3. **Track Progress**: Mark items complete as you go
4. **Explain Changes**: High-level summary at each step
5. **Document Results**: Add review section to `tasks/todo.md`
6. **Capture Lessons**: Update `tasks/lessons.md` after corrections

---

## Core Principles

- **Simplicity First**: Make every change as simple as possible. Impact minimal code.
- **No Laziness**: Find root causes. No temporary fixes. Senior developer standards.
14 changes: 12 additions & 2 deletions spec/driver_manager/driver_cleanup_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,19 @@ module PlaceOS::Core
tracker = DriverCleanup::StaleProcessTracker.new(DriverStore::BINARY_PATH, REDIS_CLIENT)
stale_list = tracker.update_and_find_stale(ENV["STALE_THRESHOLD_DAYS"]?.try &.to_i || 30)
stale_list.size.should eq(0)
driver_file = Path[DriverStore::BINARY_PATH, "drivers_place_private_helper_cce023a_#{Core::ARCH}"].to_s
value = REDIS_CLIENT.hgetall(driver_file)
# `driver_path` is derived from the actual driver's file_name + commit, not
# a hardcoded string (which goes stale as private-drivers' master moves on).
value = REDIS_CLIENT.hgetall(driver_path)
value["last_executed_at"].to_i64.should be > 0
ensure
# Without these stops the DriverResource feed (a Resource<Model::Driver> change-feed
# listener) keeps running across later spec files. When a subsequent spec calls
# `clear_tables`, the feed sees the `:deleted` events and removes the shared driver
# binaries from disk, causing later tests that launch drivers to hang on `start_process`.
if (mm = module_manager) && (m = mod)
mm.unload_module(m) rescue nil
end
resource_manager.try &.stop rescue nil
end
end
end
93 changes: 93 additions & 0 deletions spec/driver_manager/driver_store_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
require "../helper"

module PlaceOS::Core
describe DriverStore, tags: "driver_store" do
describe ".compiled?" do
# Regression: previously, multiple modules sharing the same driver
# (`file_name + commit`) would each take their own `compiled?` path,
# all see `File.exists?` as false, and each call the build service and
# `fetch_binary` to the same on-disk path — racing the write and hammering
# the build service for the same binary. `compiled?` now coordinates
# in-flight fetches keyed by binary path: one fiber fetches, the rest wait
# on a channel and observe the binary once it's on disk.
it "dedupes concurrent fetches of the same binary" do
_, driver, _ = setup

store = DriverStore.new
path = store.driver_binary_path(driver.file_name, driver.commit)

# Force a first-time fetch by removing any binary left on disk from a
# prior spec, and zero out the slow-path counter.
File.delete(path.to_s) rescue nil
DriverStore.reset_compiled_attempts

repo = driver.repository!
concurrency = 5
results = Channel(Bool).new(concurrency)

concurrency.times do
spawn do
results.send store.compiled?(driver.file_name, driver.commit, repo.branch, repo.uri)
end
end

concurrency.times { results.receive.should be_true }

# All `concurrency` callers should resolve through a single fetch:
# one fiber became the loader, the rest waited on the channel and then
# found the binary already on disk.
DriverStore.compiled_attempts.should eq 1
File.exists?(path.to_s).should be_true
end

it "shares a single failed fetch across concurrent callers" do
_, driver, _ = setup

store = DriverStore.new
repo = driver.repository!

# A commit that the build service does not (and cannot) produce — every
# `compiled?` against it must come back `false`. With dedup, all
# concurrent callers observe the *same* failed fetch (one round-trip);
# without it, each caller would pay its own round-trip to the build
# service.
bogus_commit = "deadbee"
path = store.driver_binary_path(driver.file_name, bogus_commit).to_s
File.delete(path) rescue nil
DriverStore.reset_compiled_attempts

concurrency = 5
results = Channel(Bool).new(concurrency)
concurrency.times do
spawn do
results.send store.compiled?(driver.file_name, bogus_commit, repo.branch, repo.uri)
end
end

concurrency.times { results.receive.should be_false }

DriverStore.compiled_attempts.should eq 1
File.exists?(path).should be_false
end

it "lets a later call short-circuit without a fetch when the binary is already present" do
_, driver, _ = setup

store = DriverStore.new
repo = driver.repository!
path = store.driver_binary_path(driver.file_name, driver.commit).to_s

# Force the warming call to go through the slow path.
File.delete(path) rescue nil
DriverStore.reset_compiled_attempts
store.compiled?(driver.file_name, driver.commit, repo.branch, repo.uri).should be_true
DriverStore.compiled_attempts.should eq 1

# A subsequent call should hit the fast path — no new fetch.
DriverStore.reset_compiled_attempts
store.compiled?(driver.file_name, driver.commit, repo.branch, repo.uri).should be_true
DriverStore.compiled_attempts.should eq 0
end
end
end
end
46 changes: 24 additions & 22 deletions spec/processes/local_spec.cr
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,28 @@ module PlaceOS::Core::ProcessManager
code.should eq 200
end

# Drain debug messages from the channel until we see the expected echo
# or hit the deadline. The driver emits startup status messages
# (e.g. `proxy_in_use`, `connected`) after `start` runs, and those land
# on the debug channel ahead of the echo response — collecting a fixed
# count of 2 messages is racy and skips past `[1,"hello"]` on slow boots.
drain_for_echo = ->(channel : Channel(String), deadline : Time::Span) {
collected = [] of String
ends_at = Time.monotonic + deadline
loop do
remaining = ends_at - Time.monotonic
break if remaining <= Time::Span.zero
select
when message = channel.receive
collected << message
break if message == %([1,"hello"])
when timeout remaining
break
end
end
collected
}

it "debug" do
pm = Local.new(discovery_mock)
module_id = mod.id.as(String)
Expand All @@ -73,17 +95,7 @@ module PlaceOS::Core::ProcessManager
result.should eq %("hello")
code.should eq 200

messages = [] of String
2.times do
select
when message = message_channel.receive
messages << message
when timeout 2.seconds
break
end
end

messages.should contain %([1,"hello"])
drain_for_echo.call(message_channel, 5.seconds).should contain %([1,"hello"])
end

it "ignore" do
Expand All @@ -103,17 +115,7 @@ module PlaceOS::Core::ProcessManager
result.should eq %("hello")
code.should eq 200

messages = [] of String
2.times do
select
when message = message_channel.receive
messages << message
when timeout 2.seconds
break
end
end

messages.should contain %([1,"hello"])
drain_for_echo.call(message_channel, 5.seconds).should contain %([1,"hello"])

pm.ignore(module_id, &callback)

Expand Down
59 changes: 59 additions & 0 deletions spec/processes/unload_leak_spec.cr
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
require "../helper"

# Regression coverage for a fiber/heap leak in `ProcessManager::Common#unload`.
#
# A `Driver::Protocol::Management` spawns a long-lived `process_events` fiber in
# its constructor (i.e. as soon as a driver is `load`ed, before any process is
# launched). That fiber loops `until terminated?`, and the ONLY thing that flips
# `@terminated` is `Management#terminate`. If `unload` drops the manager from its
# maps without terminating it, the suspended `process_events` fiber keeps the
# whole manager object alive forever -> unbounded fiber + heap growth under the
# repeated load/unload churn produced by lazy modules.
#
# These specs deliberately use a dummy driver_key so they exercise the unload
# lifecycle without depending on a compiled driver binary (the leaked fiber is
# created in the manager constructor, no child process required).
module PlaceOS::Core::ProcessManager
count_process_events_fibers = -> {
count = 0
Fiber.each { |fiber| count += 1 if fiber.name == "process_events" }
count
}

describe Local, tags: "processes" do
it "terminates the driver manager when the last module is unloaded" do
pm = Local.new(discovery_mock)
driver_key = "leak-test-#{UUID.random}"
module_id = "leak-mod"

pm.load(module_id: module_id, driver_key: driver_key)
manager = pm.protocol_manager_by_driver?(driver_key).not_nil!
manager.terminated?.should be_false

pm.unload(module_id)

# allow the :terminate event to be processed by process_events
sleep 200.milliseconds
manager.terminated?.should be_true
end

it "does not leak process_events fibers across repeated load/unload cycles" do
pm = Local.new(discovery_mock)
before = count_process_events_fibers.call

5.times do |i|
driver_key = "leak-cycle-#{UUID.random}"
module_id = "cycle-mod-#{i}"

pm.load(module_id: module_id, driver_key: driver_key)
pm.unload(module_id)
sleep 100.milliseconds
end

# give any pending terminations time to unwind their fibers
sleep 200.milliseconds

(count_process_events_fibers.call - before).should eq 0
end
end
end
4 changes: 2 additions & 2 deletions src/core-app.cr
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ server.cluster(process_count, "-w", "--workers") if cluster

terminate = Proc(Signal, Nil).new do |signal|
puts " > terminating gracefully"
spawn { server.close }
spawn(name: "server-shutdown") { server.close }
signal.ignore
end

Expand All @@ -102,7 +102,7 @@ Signal::TERM.trap &terminate
# Wait for redis and postgres to be ready
PlaceOS::Core.wait_for_resources

spawn do
spawn(name: "start-managers") do
begin
PlaceOS::Core.start_managers
rescue error
Expand Down
2 changes: 1 addition & 1 deletion src/placeos-core.cr
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ module PlaceOS::Core
# Acquire resources on startup
resource_manager.start do
# Start managing modules once relevant resources present
spawn { module_manager.start }
spawn(name: "module-manager-start") { module_manager.start }
Fiber.yield
end
end
Expand Down
26 changes: 24 additions & 2 deletions src/placeos-core/driver_manager.cr
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,30 @@ module PlaceOS::Core

def self.remove_driver(driver : Model::Driver, store : DriverStore)
path = store.driver_binary_path(driver.file_name, driver.commit)
Log.info { {message: "removing driver binary as it got removed from drivers", driver_id: driver.id.as(String), path: path.to_s} }
remove_stale_driver(path, driver.id.as(String))
driver_id = driver.id.as(String)

# A binary is keyed by `file_name + commit` and may be shared by multiple
# `Model::Driver` rows (e.g., the same source compiled for different roles,
# or a row recreated mid-flight). Deleting the file on every :Deleted event
# creates a race where a sibling row that still needs the binary loses it.
# Skip the deletion when another row references the same binary; otherwise
# `DriverIntegrity.sync_drivers` handles cleanup of truly-stale binaries.
still_referenced = begin
Model::Driver.where(file_name: driver.file_name, commit: driver.commit).any? do |other|
other.id != driver_id
end
rescue ex
Log.warn(exception: ex) { {message: "could not check for sibling driver rows; skipping binary removal", driver_id: driver_id, path: path.to_s} }
true
end

if still_referenced
Log.info { {message: "skipping driver binary removal, still referenced by another driver row", driver_id: driver_id, path: path.to_s} }
return
end

Log.info { {message: "removing driver binary as it got removed from drivers", driver_id: driver_id, path: path.to_s} }
remove_stale_driver(path, driver_id)
end

def start_driver_jobs
Expand Down
Loading
Loading