diff --git a/docs/user_guide/byonoy/absorbance_96/hello-world.ipynb b/docs/user_guide/byonoy/absorbance_96/hello-world.ipynb index b1255e4f78b..4a076e63455 100644 --- a/docs/user_guide/byonoy/absorbance_96/hello-world.ipynb +++ b/docs/user_guide/byonoy/absorbance_96/hello-world.ipynb @@ -3,7 +3,7 @@ { "cell_type": "markdown", "id": "n0a5pl74u0o", - "source": "# Byonoy Absorbance 96\n\nThe Absorbance 96 Automate (A96A) is a USB-HID plate reader from Byonoy that measures absorbance across a 96-well plate in a single flash. It supports:\n\n- [Absorbance](../../capabilities/absorbance) (single-wavelength, full-plate)\n\nThe hardware consists of three physical parts: a **base unit** (holds the plate), an **illumination unit** (light source, sits on top during measurement), and an optional **SBS adapter** for standard footprint integration. PLR models all three as resources so a robotic arm can move the illumination unit on and off the base.\n\n| Model | PLR Name | Factory function |\n|---|---|---|\n| Absorbance 96 Automate (full setup) | `ByonoyAbsorbance96` | `byonoy_a96a` |\n| Detection unit only | `ByonoyAbsorbance96` | `byonoy_a96a_detection_unit` |\n| Illumination unit | `Resource` | `byonoy_a96a_illumination_unit` |\n| Parking base (no backend) | `ByonoyAbsorbanceBaseUnit` | `byonoy_a96a_parking_unit` |\n| SBS adapter | `ResourceHolder` | `byonoy_sbs_adapter` |\n\n- **Communication**: USB HID (VID `0x16D0` / PID `0x1199`)\n- **Communication level**: Firmware", + "source": "# Byonoy Absorbance 96\n\nThe Absorbance 96 Automate (A96A) is a USB-HID plate reader from Byonoy that measures absorbance across a 96-well plate in a single flash. It supports:\n\n- [Absorbance](../../capabilities/absorbance) (single-wavelength, full-plate)\n\nThe hardware consists of three physical parts: a **base unit** (holds the plate), an **illumination unit** (light source, sits on top during measurement), and an optional **SBS adapter** for standard footprint integration. PLR models all three as resources so a robotic arm can move the illumination unit on and off the base.\n\n- **Communication**: USB HID (VID `0x16D0` / PID `0x1199`)\n- **Communication level**: Firmware", "metadata": {} }, { diff --git a/docs/user_guide/byonoy/index.md b/docs/user_guide/byonoy/index.md index 3cfedb983f5..53ad0851c69 100644 --- a/docs/user_guide/byonoy/index.md +++ b/docs/user_guide/byonoy/index.md @@ -5,4 +5,26 @@ absorbance_96/hello-world luminescence_96/hello-world +luminescence_96/led_bar ``` + +## Absorbance 96 models + +| Model | PLR resource | Factory function | +|---|---|---| +| A96A full setup | `ByonoyAbsorbance96` + illumination unit | `byonoy_a96a` | +| Detection unit only | `ByonoyAbsorbance96` | `byonoy_a96a_detection_unit` | +| Illumination unit | `Resource` | `byonoy_a96a_illumination_unit` | +| Parking base (no backend) | `ByonoyAbsorbanceBaseUnit` | `byonoy_a96a_parking_unit` | +| SBS adapter | `ResourceHolder` | `byonoy_sbs_adapter` | + +## Luminescence 96 models + +| Model | PLR resource | Factory function | +|---|---|---| +| L96 full setup | `ByonoyLuminescenceBaseUnit` + `ByonoyLuminescence96` | `byonoy_l96` | +| L96A full setup (automate) | `ByonoyLuminescenceBaseUnit` + `ByonoyLuminescence96` | `byonoy_l96a` | +| L96 reader unit only | `ByonoyLuminescence96` | `byonoy_l96_reader_unit` | +| L96A reader unit only | `ByonoyLuminescence96` | `byonoy_l96a_reader_unit` | +| L96 base unit only | `ByonoyLuminescenceBaseUnit` | `byonoy_l96_base_unit` | +| L96A base unit only | `ByonoyLuminescenceBaseUnit` | `byonoy_l96a_base_unit` | diff --git a/docs/user_guide/byonoy/luminescence_96/hello-world.ipynb b/docs/user_guide/byonoy/luminescence_96/hello-world.ipynb index d225b5e7c5d..6c41dc7c144 100644 --- a/docs/user_guide/byonoy/luminescence_96/hello-world.ipynb +++ b/docs/user_guide/byonoy/luminescence_96/hello-world.ipynb @@ -2,79 +2,362 @@ "cells": [ { "cell_type": "markdown", - "id": "ey04kywsg19", - "source": "# Byonoy Luminescence 96\n\nThe Luminescence 96 is a USB-HID plate reader from Byonoy that measures luminescence across a 96-well plate. It supports:\n\n- [Luminescence](../../capabilities/luminescence) (full-plate, configurable integration time)\n\nThe hardware consists of a **base unit** (holds the plate) and a **reader unit** (detector, sits on top during measurement). PLR models both as resources so a robotic arm can move the reader unit on and off the base. Two hardware variants exist: the L96 (manual) and L96A (automate, with a preferred pickup location for robotic handling).\n\n| Model | PLR Name | Factory function |\n|---|---|---|\n| L96 full setup | `ByonoyLuminescenceBaseUnit` + `ByonoyLuminescence96` | `byonoy_l96` |\n| L96A full setup (automate) | `ByonoyLuminescenceBaseUnit` + `ByonoyLuminescence96` | `byonoy_l96a` |\n| L96 reader unit only | `ByonoyLuminescence96` | `byonoy_l96_reader_unit` |\n| L96A reader unit only | `ByonoyLuminescence96` | `byonoy_l96a_reader_unit` |\n| L96 base unit only | `ByonoyLuminescenceBaseUnit` | `byonoy_l96_base_unit` |\n| L96A base unit only | `ByonoyLuminescenceBaseUnit` | `byonoy_l96a_base_unit` |\n\n- **Communication**: USB HID (VID `0x16D0` / PID `0x119B`)\n- **Communication level**: Firmware", - "metadata": {} + "id": "intro", + "metadata": {}, + "source": [ + "# Byonoy Luminescence 96 — lab guide\n", + "\n", + "Run a luminescence assay on the Byonoy L96 from PyLabRobot. Assumes the device is plugged in via USB and you've installed `pylabrobot` (with `hid` and `hidapi`).\n", + "\n", + "The L96 is a 96-well luminescence-only plate reader. It reads emitted light per well, no excitation source. Communicates over USB HID (vid `0x16D0`, pid `0x119B`)." + ] }, { "cell_type": "markdown", - "id": "9y33vci34d6", - "source": "## Setup\n\nUse `byonoy_l96a` (automate) or `byonoy_l96` (manual) to create the full setup (base unit + reader unit). The reader unit is both a `Resource` and a `Device`.", - "metadata": {} + "id": "s1-md", + "metadata": {}, + "source": [ + "## 1. Connect\n", + "\n", + "`base` is a resource (a plate goes here); `reader` is both a resource and a device (the detector unit). After `setup()` the HID handle is open.\n", + "\n", + "> **One process at a time.** macOS / Windows / Linux all give exclusive HID access. If a previous Python session crashed without `stop()`, the next `setup()` will fail with \"device already open\". Replug the USB cable to force-release." + ] }, { "cell_type": "code", - "id": "g7yhpou4ecd", - "source": "from pylabrobot.byonoy import byonoy_l96a\n\nbase, reader = byonoy_l96a(name=\"l96a\")\nawait reader.setup()", - "metadata": {}, "execution_count": null, - "outputs": [] + "id": "s1-code", + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.byonoy import byonoy_l96 # or byonoy_l96a for the automate variant\n", + "\n", + "base, reader = byonoy_l96(name=\"l96\")\n", + "await reader.setup()" + ] }, { "cell_type": "markdown", - "id": "yaf5kv2g5np", - "source": "## Luminescence\n\nThe luminescence capability is exposed as `reader.luminescence`. For the full API, see [Luminescence](../../capabilities/luminescence).\n\nBefore reading, remove the reader unit from the base so a plate can be assigned, then place the reader unit back.", + "id": "05b48622", + "source": "## Resource layout\n\nThe reader unit is both a `Resource` and a `Device`. The base unit owns two child holders, and the interlock lives on the plate holder:\n\n```\nByonoyLuminescenceBaseUnit (base)\n +-- plate_holder (assign plates here)\n +-- reader_unit_holder (reader unit sits here during measurement)\n```\n\nFor the full capability surface (parameters, return types) see [Luminescence](../../capabilities/luminescence).", "metadata": {} }, + { + "cell_type": "markdown", + "id": "s2-md", + "metadata": {}, + "source": [ + "## 2. Load a plate\n", + "\n", + "The reader-on-base interlock prevents you from assigning a plate while the detector is still on the holder — it forces a sane physical sequence.\n", + "\n", + "After running this cell, physically place the plate in the reader and place the detector back on top." + ] + }, { "cell_type": "code", - "id": "ochf7cbgdxi", - "source": "from pylabrobot.resources import Cor_96_wellplate_360ul_Fb\n\n# Remove reader unit so plate can be loaded\nbase.reader_unit_holder.unassign_child_resource(reader)\n\nplate = Cor_96_wellplate_360ul_Fb(name=\"plate\")\nbase.plate_holder.assign_child_resource(plate)", + "execution_count": null, + "id": "s2-code", + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.resources import Cor_96_wellplate_360ul_Fb\n", + "\n", + "base.reader_unit_holder.unassign_child_resource(reader) # take detector off\n", + "plate = Cor_96_wellplate_360ul_Fb(name=\"plate\")\n", + "base.plate_holder.assign_child_resource(plate)" + ] + }, + { + "cell_type": "markdown", + "id": "s3-md", "metadata": {}, + "source": [ + "## 3. Read — the basics\n", + "\n", + "`focal_height` is required by the abstract `Luminescence` capability but **ignored by the L96** — the device has a fixed optical configuration (the detector unit clamps onto the base; geometry is determined by plate + base + detector heights, not user-tunable). Pass `0` by convention.\n", + "\n", + "### Result shape\n", + "\n", + "`data` is plate row-major: `data[0]` = `[A1..A12]`, `data[1]` = `[B1..B12]`, ..., `data[7]` = `[H1..H12]`. So `data[2][5]` is well `C6`. Values are floats in **RLU (Relative Light Units) per integration period** — not per second. Doubling integration time roughly doubles signal *and* dark counts.\n", + "\n", + "### Background\n", + "\n", + "With nothing in the wells (and a dark environment), expect noise around ±50 RLU at SENSITIVE (2 s). Non-zero noise is dark-current spread; **negative values are normal** because the firmware applies a baseline subtraction.\n", + "\n", + "> **Light leakage.** The L96 is designed to be light-tight from above (the detector unit covers the plate) but the bottom housing isn't perfectly sealed. Reading on a *white* surface vs a *black* surface can change empty-well readings from ~50 to ~50,000+ RLU because reflected ambient light leaks in. For real assays use a black mat or a dark cabinet." + ] + }, + { + "cell_type": "code", "execution_count": null, - "outputs": [] + "id": "s3-code", + "metadata": {}, + "outputs": [], + "source": [ + "results = await reader.luminescence.read(plate=plate, focal_height=0)\n", + "data = results[0].data # 8 × 12 list[list[float]]\n", + "timestamp = results[0].timestamp # epoch seconds\n", + "\n", + "print(f\"timestamp={timestamp}\")\n", + "for row in data:\n", + " print(\" \" + \" \".join(f\"{v:8.2f}\" for v in row))" + ] + }, + { + "cell_type": "markdown", + "id": "s4-md", + "metadata": {}, + "source": [ + "## 4. Picking an integration mode\n", + "\n", + "Four modes, mapping to the byonoy_device_library presets:\n", + "\n", + "| Mode | Integration time | Use for |\n", + "|---|---|---|\n", + "| `RAPID` | 100 ms | Saturation checks, quick \"is it bright?\" |\n", + "| `SENSITIVE` | 2 s (default) | Most assays — luciferase, BRET, NanoBiT |\n", + "| `ULTRA_SENSITIVE` | 20 s | Very faint signals; low-copy reporters |\n", + "| `CUSTOM` | user-supplied | Your own duration |" + ] }, { "cell_type": "code", - "id": "h7wn22dkjls", - "source": "results = await reader.luminescence.read_luminescence(plate, focal_height=13.0)", + "execution_count": null, + "id": "s4-code", + "metadata": {}, + "outputs": [], + "source": [ + "from pylabrobot.byonoy import ByonoyLuminescence96Backend\n", + "\n", + "# Preset\n", + "results = await reader.luminescence.read(\n", + " plate=plate,\n", + " focal_height=0,\n", + " backend_params=ByonoyLuminescence96Backend.LuminescenceParams(\n", + " mode=\"ultra_sensitive\",\n", + " ),\n", + ")\n", + "\n", + "# Custom (any duration in seconds) — auto-switches to CUSTOM mode\n", + "results = await reader.luminescence.read(\n", + " plate=plate,\n", + " focal_height=0,\n", + " backend_params=ByonoyLuminescence96Backend.LuminescenceParams(\n", + " integration_time=5.0,\n", + " ),\n", + ")" + ] + }, + { + "cell_type": "markdown", + "id": "s5-md", "metadata": {}, + "source": "## 5. Reading specific wells\n\nPass a `wells=` list to `read()` — only those wells get real values back; everything else comes back as `None`. The result shape is still 8×12 (per the `LuminescenceResult` contract); unmeasured cells are `None` so you can distinguish \"didn't read\" from a legitimate `0.0` reading (baseline subtraction can yield ~0 or negative values).\n\n> **No speed-up.** The firmware always integrates the whole 96-sensor array. Reading one column with `SENSITIVE` takes the same wall-clock as reading the full plate (~28 s in our hardware test). The `wells` filter keeps your downstream tidy — it doesn't save time. If you want fast, use `RAPID` mode." + }, + { + "cell_type": "code", "execution_count": null, - "outputs": [] + "id": "s5-code", + "metadata": {}, + "outputs": [], + "source": "# Only column 1 (A1, B1, ..., H1)\ncol1_wells = [plate.get_well(f\"{r}1\") for r in \"ABCDEFGH\"]\n\nresults = await reader.luminescence.read(\n plate=plate,\n focal_height=0,\n wells=col1_wells,\n)\n# results[0].data[0][0] is A1 (float); results[0].data[0][1] is A2 (None)" }, { "cell_type": "markdown", - "id": "3zuj9ae45as", - "source": "### Custom integration time\n\nUse {class}`~pylabrobot.byonoy.luminescence_96.ByonoyLuminescence96Backend.LuminescenceParams` to set the integration time (in seconds, default 2).", - "metadata": {} + "id": "s6-md", + "metadata": {}, + "source": [ + "## 6. Timed read (delay before reading)\n", + "\n", + "For a substrate-injection assay where you want a fixed delay between adding reagent and reading. `await asyncio.sleep` doesn't block the event loop, and the reader stays connected." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "s6-code", + "metadata": {}, + "outputs": [], + "source": [ + "import asyncio\n", + "\n", + "# ... pipette substrate into the plate ...\n", + "await asyncio.sleep(60) # 60 s incubation\n", + "results = await reader.luminescence.read(plate=plate, focal_height=0)" + ] + }, + { + "cell_type": "markdown", + "id": "s7-md", + "metadata": {}, + "source": [ + "## 7. Kinetic read (time series)\n", + "\n", + "Read the same plate every N seconds, collect a stack of matrices. With `SENSITIVE` (2 s) the wall-clock per read is around 3 s including overhead, so `interval_s` must be ≥ 3. With `ULTRA_SENSITIVE` (20 s) it's around 28 s — plan accordingly." + ] }, { "cell_type": "code", - "id": "49joyxhhy8t", - "source": "from pylabrobot.byonoy import ByonoyLuminescence96Backend\n\nresults = await reader.luminescence.read_luminescence(\n plate,\n focal_height=13.0,\n backend_params=ByonoyLuminescence96Backend.LuminescenceParams(integration_time=5),\n)", + "execution_count": null, + "id": "s7-code", "metadata": {}, + "outputs": [], + "source": [ + "import asyncio, time\n", + "import numpy as np\n", + "\n", + "frames = []\n", + "duration_s = 600 # 10 minutes total\n", + "interval_s = 30 # one read every 30 s\n", + "\n", + "t_start = time.time()\n", + "while time.time() - t_start < duration_s:\n", + " t_read = time.time()\n", + " results = await reader.luminescence.read(plate=plate, focal_height=0)\n", + " frames.append({\n", + " \"t\": t_read - t_start,\n", + " \"data\": results[0].data,\n", + " })\n", + " elapsed = time.time() - t_read\n", + " if elapsed < interval_s:\n", + " await asyncio.sleep(interval_s - elapsed)\n", + "\n", + "matrix_stack = np.array([f[\"data\"] for f in frames]) # (n_frames, 8, 12)\n", + "times = np.array([f[\"t\"] for f in frames])\n", + "print(f\"collected {len(frames)} frames over {duration_s} s\")\n", + "# Trace for well C6:\n", + "trace = matrix_stack[:, 2, 5]" + ] + }, + { + "cell_type": "markdown", + "id": "s8-md", + "metadata": {}, + "source": [ + "## 8. Stopping a long read\n", + "\n", + "If you need to bail out of an `ULTRA_SENSITIVE` read mid-flight (or any read takes longer than expected), `cancel()` raises a flag the read loop checks; bail-out is within ~2 s. After cancel the device stays initialised and immediately accepts new reads." + ] + }, + { + "cell_type": "code", "execution_count": null, - "outputs": [] + "id": "s8-code", + "metadata": {}, + "outputs": [], + "source": [ + "task = asyncio.create_task(\n", + " reader.luminescence.read(plate=plate, focal_height=0,\n", + " backend_params=ByonoyLuminescence96Backend.LuminescenceParams(\n", + " mode=\"ultra_sensitive\",\n", + " ),\n", + " )\n", + ")\n", + "await asyncio.sleep(1.0)\n", + "await reader.driver.cancel()\n", + "try:\n", + " await task\n", + "except asyncio.CancelledError:\n", + " print(\"aborted cleanly\")" + ] }, { "cell_type": "markdown", - "id": "tjfltsit53", - "source": "## Resource layout\n\nThe Luminescence 96 has an interlock: you cannot assign a plate to the base while the reader unit is on top. In an automated workcell, use a robotic arm to move the reader unit off the base before loading the plate.\n\n```\nByonoyLuminescenceBaseUnit (base)\n +-- plate_holder (assign plates here)\n +-- reader_unit_holder (reader unit sits here during measurement)\n```", - "metadata": {} + "id": "s9-md", + "metadata": {}, + "source": [ + "## 9. Device health & identity\n", + "\n", + "Useful at the start of a session, in error messages, or for run logging.\n", + "\n", + "> **`slot_state`**: `OCCUPIED` when a plate is loaded, `UNKNOWN` when nothing is in the reader (the firmware can't tell empty from missing). Don't treat `UNKNOWN` as an error.\n", + ">\n", + "> **`error_code`**: `0` is `NO_ERROR`. The Lum96 firmware doesn't publish a documented table for non-zero values, so non-zero codes surface as `errorCode=0xNN` (matching what Byonoy's own C library returns). For Abs96 / AbsOne backends, names like `ERROR_CALIB` / `AMBIENT_LIGHT` are decoded automatically." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "s9-code", + "metadata": {}, + "outputs": [], + "source": [ + "status = await reader.driver.request_status()\n", + "env = await reader.driver.request_environment()\n", + "info = await reader.driver.request_device_info()\n", + "versions = await reader.driver.request_versions()\n", + "api = await reader.driver.request_api_version()\n", + "supported = await reader.driver.request_supported_reports()\n", + "\n", + "print(f\"{info.device_name} sn={info.serial_no} fw={info.firmware_version}\")\n", + "print(f\" uptime {status.uptime_s} s, T={env.temperature_c:.1f}°C, RH={env.humidity*100:.0f}%\")\n", + "print(f\" slot: {status.slot_state.name}, error: {reader.driver.describe_error_code(status.error_code)}\")\n", + "print(f\" api v{api}, fw production={versions.is_production}\")\n", + "print(f\" supported reports ({len(supported)}): \" + \", \".join(f\"0x{i:04X}\" for i in supported))" + ] }, { "cell_type": "markdown", - "id": "ck97t28eylh", - "source": "## Teardown", - "metadata": {} + "id": "s10-md", + "metadata": {}, + "source": "## 10. Visual feedback (LED bar)\n\nThe L96 has a 20-pixel addressable RGB front bar. Useful in a workcell to flag run state — solid colors for status, firmware-driven animations (`BREATHING`, `CYLON`, etc.) for \"busy\" indicators, or per-pixel control for progress bars and custom animations.\n\nSee the dedicated [LED bar notebook](led_bar.ipynb) for the full surface and recipes (KITT scanner, gradients, etc.). Quick taste below." }, { "cell_type": "code", - "id": "thvjaquzdj", - "source": "await reader.stop()", + "execution_count": null, + "id": "s10-code", + "metadata": {}, + "outputs": [], + "source": "await reader.driver.set_led_color((0, 255, 0)) # solid green: ready\nawait reader.driver.set_led_color((0, 255, 0), \"breathing\", duration_ms=10000) # busy\nawait reader.driver.set_led_colors([(255, 0, 0)] * 10 + [(0, 0, 255)] * 10) # per-pixel split" + }, + { + "cell_type": "markdown", + "id": "s11-md", "metadata": {}, + "source": [ + "## 11. End-point luciferase recipe\n", + "\n", + "End-to-end workflow for a typical end-point luciferase assay." + ] + }, + { + "cell_type": "code", "execution_count": null, - "outputs": [] + "id": "s11-code", + "metadata": {}, + "outputs": [], + "source": "import asyncio, time\nimport numpy as np\nfrom pylabrobot.byonoy import byonoy_l96, ByonoyLuminescence96Backend\nfrom pylabrobot.resources import Cor_96_wellplate_360ul_Fb\n\n# Connect\nbase, reader = byonoy_l96(name=\"assay\")\nawait reader.setup()\nawait reader.driver.set_led_color((255, 150, 0)) # amber: prep\n\n# Sanity check\nstatus = await reader.driver.request_status()\ninfo = await reader.driver.request_device_info()\nprint(f\"{info.device_name} sn={info.serial_no} — {status.slot_state.name}\")\nassert status.error_code == 0\n\n# Load plate\nbase.reader_unit_holder.unassign_child_resource(reader)\nplate = Cor_96_wellplate_360ul_Fb(name=\"assay_plate\")\nbase.plate_holder.assign_child_resource(plate)\n# (operator places plate, places detector back on top)\n\n# Read — green while measuring\nawait reader.driver.set_led_color((0, 255, 0))\nresults = await reader.luminescence.read(\n plate=plate,\n focal_height=0,\n backend_params=ByonoyLuminescence96Backend.LuminescenceParams(\n mode=\"sensitive\",\n ),\n)\ndata = np.array(results[0].data) # 8 × 12\n\n# Save + tidy up\nnp.save(f\"luminescence_{int(time.time())}.npy\", data)\nawait reader.driver.set_led_color((0, 0, 0)) # off\nawait reader.stop()" + }, + { + "cell_type": "markdown", + "id": "s12-md", + "metadata": {}, + "source": [ + "## 12. Troubleshooting\n", + "\n", + "| Symptom | Likely cause | Fix |\n", + "|---|---|---|\n", + "| `setup()` raises \"device already open\" | Previous Python session left HID handle locked | Replug USB cable, or kill stale Python processes |\n", + "| All wells read very high (10⁴–10⁶) with no sample | Light leak through housing bottom | Use a dark mat or close the room |\n", + "| Strong gradient A→H with no sample | Directional light leak (one side leakier) | Same — dark mat |\n", + "| `slot_state=UNKNOWN` | No plate loaded | Expected — firmware cannot detect \"nothing\" definitively |\n", + "| Read takes 28 s for one well in SENSITIVE | Firmware always integrates 96 wells | Use `RAPID` for fast, accept the 28 s for SENSITIVE |\n", + "| `cancel()` returns immediately, read keeps going | No measurement in flight | `cancel()` auto-detects the in-flight trigger; no-op if there isn't one |\n", + "| Negative readings on empty wells | Firmware baseline subtraction | Expected — they should sit around zero |\n", + "| `focal_height` parameter has no effect | L96 has fixed optics | Expected — parameter is ABC contract, ignored on this device |" + ] + }, + { + "cell_type": "markdown", + "id": "s13-md", + "metadata": {}, + "source": [ + "## 13. Reference\n", + "\n", + "- **Hardware protocol**: HID 64-byte frames, vid `0x16D0`, pid `0x119B`. Report IDs decoded from Byonoy's `byonoyusbhid.h`. `routing_info=\\x80\\x40` requests a reply; `\\x00\\x00` is fire-and-forget." + ] } ], "metadata": { diff --git a/docs/user_guide/byonoy/luminescence_96/led_bar.ipynb b/docs/user_guide/byonoy/luminescence_96/led_bar.ipynb new file mode 100644 index 00000000000..5306306c5ea --- /dev/null +++ b/docs/user_guide/byonoy/luminescence_96/led_bar.ipynb @@ -0,0 +1,311 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "intro", + "metadata": {}, + "source": [ + "# Byonoy L96 — LED bar\n", + "\n", + "The L96 has a 20-pixel addressable RGB front bar on the chassis. PyLabRobot exposes two methods to drive it:\n", + "\n", + "| Method | Use for |\n", + "|---|---|\n", + "| `set_led_color(color, effect)` | Single color across all 20 pixels, optionally animated by the firmware (`BREATHING`, `CYLON`, `RAINBOW`, ...) |\n", + "| `set_led_colors(colors)` | Per-pixel control — supply a list of up to 20 RGB triplets. Fast enough for real-time animation. |" + ] + }, + { + "cell_type": "markdown", + "id": "connect-md", + "metadata": {}, + "source": [ + "## Connect\n", + "\n", + "We'll talk to the device's driver directly, so the bar can be driven without setting up a plate read." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "connect-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:03.229447Z", + "iopub.status.busy": "2026-05-17T02:44:03.229245Z", + "iopub.status.idle": "2026-05-17T02:44:03.307144Z", + "shell.execute_reply": "2026-05-17T02:44:03.306740Z" + } + }, + "outputs": [], + "source": [ + "from pylabrobot.byonoy import byonoy_l96\n", + "\n", + "base, reader = byonoy_l96(name=\"l96\")\n", + "await reader.setup()\n", + "drv = reader.driver" + ] + }, + { + "cell_type": "markdown", + "id": "solid-md", + "metadata": {}, + "source": [ + "## Solid colors\n", + "\n", + "The simplest call: one color, `SOLID` effect (the default). The firmware snaps the bar to that color." + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "solid-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:03.308426Z", + "iopub.status.busy": "2026-05-17T02:44:03.308329Z", + "iopub.status.idle": "2026-05-17T02:44:06.321067Z", + "shell.execute_reply": "2026-05-17T02:44:06.319852Z" + } + }, + "outputs": [], + "source": [ + "import asyncio\n", + "\n", + "await drv.set_led_color((255, 0, 0)) # red\n", + "await asyncio.sleep(1)\n", + "await drv.set_led_color((0, 255, 0)) # green\n", + "await asyncio.sleep(1)\n", + "await drv.set_led_color((0, 0, 255)) # blue\n", + "await asyncio.sleep(1)\n", + "await drv.set_led_color((0, 0, 0)) # off" + ] + }, + { + "cell_type": "markdown", + "id": "effects-md", + "metadata": {}, + "source": [ + "## Built-in effects\n", + "\n", + "The firmware can animate the base color for you. Pass `duration_ms` to set how long the effect runs before reverting to firmware control. Use `force=True` to override an unexpired previous duration.\n", + "\n", + "| Effect | Behavior |\n", + "|---|---|\n", + "| `SOLID` | Snap to color (default) |\n", + "| `BREATHING` | Pulse brightness |\n", + "| `BLINKING` | Flash on/off |\n", + "| `CYLON` | Bouncing dot across the bar |\n", + "| `RAINBOW` | Cycle through hues (ignores supplied color) |\n", + "| `PROGRESS` | Fill progressively, driven by `effect_state` (0–255) |" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "effects-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:06.324515Z", + "iopub.status.busy": "2026-05-17T02:44:06.324197Z", + "iopub.status.idle": "2026-05-17T02:44:16.339410Z", + "shell.execute_reply": "2026-05-17T02:44:16.337534Z" + } + }, + "outputs": [], + "source": [ + "await drv.set_led_color((0, 255, 0), \"breathing\", duration_ms=6000)\n", + "await asyncio.sleep(6)\n", + "\n", + "await drv.set_led_color((255, 0, 255), \"cylon\", duration_ms=4000, force=True)\n", + "await asyncio.sleep(4)\n", + "\n", + "await drv.set_led_color((0, 0, 0)) # back to off" + ] + }, + { + "cell_type": "markdown", + "id": "pixels-md", + "metadata": {}, + "source": [ + "## Per-pixel control\n", + "\n", + "`set_led_colors(colors)` takes a list of up to 20 `(r, g, b)` triplets, one per pixel (left to right). Shorter lists are zero-padded; longer ones are truncated." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "pixels-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:16.344037Z", + "iopub.status.busy": "2026-05-17T02:44:16.343628Z", + "iopub.status.idle": "2026-05-17T02:44:23.366202Z", + "shell.execute_reply": "2026-05-17T02:44:23.364702Z" + } + }, + "outputs": [], + "source": [ + "# Left half red, right half blue\n", + "await drv.set_led_colors([(255, 0, 0)] * 10 + [(0, 0, 255)] * 10)\n", + "await asyncio.sleep(2)\n", + "\n", + "# Rainbow gradient across the bar\n", + "import colorsys\n", + "def hue(i, n=20):\n", + " r, g, b = colorsys.hsv_to_rgb(i / n, 1, 1)\n", + " return (int(r * 255), int(g * 255), int(b * 255))\n", + "\n", + "await drv.set_led_colors([hue(i) for i in range(20)])\n", + "await asyncio.sleep(3)\n", + "\n", + "# Every other pixel\n", + "await drv.set_led_colors([(0, 255, 0) if i % 2 == 0 else (0, 0, 0) for i in range(20)])\n", + "await asyncio.sleep(2)\n", + "\n", + "await drv.set_led_colors([(0, 0, 0)] * 20) # off" + ] + }, + { + "cell_type": "markdown", + "id": "scanner-md", + "metadata": {}, + "source": [ + "## Animation recipe: KITT scanner\n", + "\n", + "A back-and-forth scanner with a fading trail — the kind of thing you'd want as a \"robot is busy\" indicator. Frame rate is set by how fast you call `set_led_colors`; ~30 fps is comfortable." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "scanner-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:23.369015Z", + "iopub.status.busy": "2026-05-17T02:44:23.368873Z", + "iopub.status.idle": "2026-05-17T02:44:30.548475Z", + "shell.execute_reply": "2026-05-17T02:44:30.547235Z" + } + }, + "outputs": [], + "source": [ + "import asyncio\n", + "\n", + "N = 20\n", + "trail = 4 # length of the fading trail\n", + "decay = 0.45 # brightness ratio per trail step\n", + "step_s = 0.06 # ~16 fps; lower for faster\n", + "\n", + "def frame(head):\n", + " px = [(0, 0, 0)] * N\n", + " for k in range(trail + 1):\n", + " v = int(255 * (decay ** k))\n", + " if 0 <= head - k < N:\n", + " px[head - k] = (v, 0, 0) # red trail\n", + " return px\n", + "\n", + "positions = list(range(N)) + list(range(N - 2, 0, -1)) # 0..19..1\n", + "for _ in range(3): # 3 ping-pong cycles\n", + " for head in positions:\n", + " await drv.set_led_colors(frame(head))\n", + " await asyncio.sleep(step_s)\n", + "\n", + "await drv.set_led_colors([(0, 0, 0)] * 20)" + ] + }, + { + "cell_type": "markdown", + "id": "walking-md", + "metadata": {}, + "source": [ + "## Animation recipe: walking dot\n", + "\n", + "Simplest possible animation — one bright pixel marches across the bar. Useful as a progress indicator with a known step count." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "walking-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:30.551351Z", + "iopub.status.busy": "2026-05-17T02:44:30.551132Z", + "iopub.status.idle": "2026-05-17T02:44:33.613534Z", + "shell.execute_reply": "2026-05-17T02:44:33.612076Z" + } + }, + "outputs": [], + "source": [ + "for pos in range(20):\n", + " px = [(0, 0, 0)] * 20\n", + " px[pos] = (0, 255, 0)\n", + " await drv.set_led_colors(px)\n", + " await asyncio.sleep(0.15)\n", + "\n", + "await drv.set_led_colors([(0, 0, 0)] * 20)" + ] + }, + { + "cell_type": "markdown", + "id": "teardown-md", + "metadata": {}, + "source": [ + "## Teardown" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "teardown-code", + "metadata": { + "execution": { + "iopub.execute_input": "2026-05-17T02:44:33.616454Z", + "iopub.status.busy": "2026-05-17T02:44:33.616208Z", + "iopub.status.idle": "2026-05-17T02:44:33.621351Z", + "shell.execute_reply": "2026-05-17T02:44:33.620597Z" + } + }, + "outputs": [], + "source": [ + "await drv.set_led_color((0, 0, 0)) # ensure bar is off\n", + "await reader.stop()" + ] + }, + { + "cell_type": "markdown", + "id": "reference", + "metadata": {}, + "source": [ + "## Reference\n", + "\n", + "- `set_led_color(color, effect, *, duration_ms=0, force=False, low_power=False)` — single uniform color, optional firmware-driven effect.\n", + "- `set_led_colors(colors)` — list of up to 20 `(r, g, b)` triplets, one per pixel. Pads with black; truncates if longer.\n", + "- Both methods live on `reader.driver`. Source: `pylabrobot/byonoy/driver.py`." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "env", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.12" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} \ No newline at end of file diff --git a/pylabrobot/byonoy/__init__.py b/pylabrobot/byonoy/__init__.py index c9289dff529..a81b32658e2 100644 --- a/pylabrobot/byonoy/__init__.py +++ b/pylabrobot/byonoy/__init__.py @@ -8,6 +8,20 @@ byonoy_a96a_parking_unit, byonoy_sbs_adapter, ) +from .driver import ( + LUM96_PRESET_S, + Abs1StatusError, + Abs96StatusError, + ByonoyDevice, + ByonoyDeviceInfo, + ByonoyEnvironment, + ByonoySlotState, + ByonoyStatus, + ByonoyVersions, + LedEffect, + Lum96IntegrationMode, + encode_well_bitmask, +) from .luminescence_96 import ( ByonoyLuminescence96, ByonoyLuminescence96Backend, diff --git a/pylabrobot/byonoy/absorbance_96.py b/pylabrobot/byonoy/absorbance_96.py index 1cc7f59c09a..a6fcc9bc812 100644 --- a/pylabrobot/byonoy/absorbance_96.py +++ b/pylabrobot/byonoy/absorbance_96.py @@ -1,8 +1,9 @@ +import asyncio import logging import time from typing import List, Optional, Tuple -from pylabrobot.byonoy.backend import ByonoyBase, ByonoyDevice +from pylabrobot.byonoy.driver import ABS96_ERROR_NAMES, ByonoyDevice, ByonoyDriver from pylabrobot.capabilities.capability import BackendParams from pylabrobot.capabilities.plate_reading.absorbance import ( Absorbance, @@ -26,11 +27,13 @@ # --------------------------------------------------------------------------- -class ByonoyAbsorbance96Backend(ByonoyBase, AbsorbanceBackend): +class ByonoyAbsorbance96Backend(ByonoyDriver, AbsorbanceBackend): """Backend for the Byonoy Absorbance 96 Automate plate reader.""" + _ERROR_NAMES = ABS96_ERROR_NAMES + def __init__(self) -> None: - super().__init__(pid=0x1199, device_type=ByonoyDevice.ABSORBANCE_96) + super().__init__(pid=0x1199, device_type=ByonoyDevice.ABSORBANCE_96, name="Byonoy A96") self.available_wavelengths: List[float] = [] async def setup(self, backend_params: Optional["BackendParams"] = None) -> None: @@ -38,8 +41,8 @@ async def setup(self, backend_params: Optional["BackendParams"] = None) -> None: await self.initialize_measurements() self.available_wavelengths = await self.request_available_absorbance_wavelengths() logger.info( - "[Byonoy A96 pid=0x%04X] ready, available wavelengths: %s nm", - self.io.pid, + "[%s] ready, available wavelengths: %s nm", + self.name, self.available_wavelengths, ) @@ -56,61 +59,96 @@ async def request_available_absorbance_wavelengths(self) -> List[float]: return [w for w in available_wavelengths if w != 0] async def _run_abs_measurement(self, signal_wl: int, reference_wl: int, is_reference: bool): - await self.send_command( - report_id=0x0010, - payload=b"\x00" * 60, - wait_for_response=False, - ) + with self._measurement_in_flight(0x0320): + await self.send_command( + report_id=0x0010, + payload=b"\x00" * 60, + wait_for_response=False, + ) - payload2 = Writer().u16(7).u8(0).raw_bytes(b"\x00" * 52).finish() - await self.send_command( - report_id=0x0200, - payload=payload2, - wait_for_response=False, - ) + payload2 = Writer().u16(7).u8(0).raw_bytes(b"\x00" * 52).finish() + await self.send_command( + report_id=0x0200, + payload=payload2, + wait_for_response=False, + ) - payload3 = Writer().i16(signal_wl).i16(reference_wl).u8(int(is_reference)).u8(0).finish() - await self.send_command( - report_id=0x0320, - payload=payload3, - wait_for_response=False, - routing_info=b"\x00\x40", - ) + payload3 = Writer().i16(signal_wl).i16(reference_wl).u8(int(is_reference)).u8(0).finish() + await self.send_command( + report_id=0x0320, + payload=payload3, + wait_for_response=False, + routing_info=b"\x00\x40", + ) - rows: List[float] = [] - t0 = time.time() - - while True: - if time.time() - t0 > 120: - logger.error( - "[Byonoy A96 pid=0x%04X] measurement timed out after 120s (signal=%d nm, ref=%d nm)", - self.io.pid, - signal_wl, - reference_wl, - ) - raise TimeoutError("Measurement timeout.") - - chunk = await self.io.read(64, timeout=30) - if len(chunk) == 0: - continue - - reader = Reader(chunk) - report_id = reader.u16() - - if report_id == 0x0500: - seq = reader.u8() - seq_len = reader.u8() - _ = reader.i16() # signal_wl_nm - _ = reader.i16() # reference_wl_nm - _ = reader.u32() # duration_ms - row = [reader.f32() for _ in range(12)] - _ = reader.u8() # flags - _ = reader.u8() # progress - - rows.extend(row) - - if seq == seq_len - 1: - break + # Index by seq so out-of-order/dropped chunks surface as None slots + # rather than silently shifting subsequent rows into the wrong wells. + rows_by_seq: List[Optional[List[float]]] = [] + flags_by_seq: List[Optional[int]] = [] + expected_chunks: Optional[int] = None + t0 = time.time() + + while True: + if self._abort_requested: + logger.info("[%s] measurement aborted by cancel()", self.name) + raise asyncio.CancelledError("Absorbance measurement aborted via cancel().") + if time.time() - t0 > 120: + logger.error( + "[%s] measurement timed out after 120s (signal=%d nm, ref=%d nm)", + self.name, + signal_wl, + reference_wl, + ) + raise TimeoutError("Measurement timeout.") + + chunk = await self.io.read(64, timeout=30) + if len(chunk) == 0: + continue + + reader = Reader(chunk) + report_id = reader.u16() + + if report_id == 0x0500: + seq = reader.u8() + seq_len = reader.u8() + _ = reader.i16() # signal_wl_nm + _ = reader.i16() # reference_wl_nm + _ = reader.u32() # duration_ms + row = [reader.f32() for _ in range(12)] + flags = reader.u8() + _ = reader.u8() # progress (0..100 running %); not surfaced + + if seq_len == 0: + raise RuntimeError(f"{self.name} firmware sent chunk with seq_len=0") + if expected_chunks is None: + expected_chunks = seq_len + rows_by_seq = [None] * seq_len + flags_by_seq = [None] * seq_len + elif seq_len != expected_chunks: + raise RuntimeError( + f"{self.name} firmware changed seq_len mid-stream: {expected_chunks} → {seq_len}" + ) + if not 0 <= seq < seq_len: + raise RuntimeError(f"{self.name} firmware sent seq={seq} (seq_len={seq_len})") + rows_by_seq[seq] = row + flags_by_seq[seq] = flags + + if all(r is not None for r in rows_by_seq): + break + + if expected_chunks is None: + raise RuntimeError(f"{self.name} absorbance read produced no chunks") + chunk_flags: List[int] = [f for f in flags_by_seq if f is not None] + rows: List[float] = [v for r in rows_by_seq if r is not None for v in r] + + status = await self.request_status() + if status.error_code != 0: + raise RuntimeError( + f"{self.name} firmware error after measurement (signal={signal_wl} nm, " + f"ref={reference_wl} nm): {self.describe_error_code(status.error_code)} " + f"(chunk flags: {[f'0x{f:02x}' for f in chunk_flags]})" + ) + self._warn_chunk_flags(chunk_flags) return rows @@ -130,13 +168,14 @@ async def read_absorbance( wavelength: int, backend_params: Optional[SerializableMixin] = None, ) -> List[AbsorbanceResult]: - assert wavelength in self.available_wavelengths, ( - f"Wavelength {wavelength} nm not in available wavelengths {self.available_wavelengths}." - ) + if wavelength not in self.available_wavelengths: + raise ValueError( + f"Wavelength {wavelength} nm not in available wavelengths {self.available_wavelengths}." + ) logger.info( - "[Byonoy A96 pid=0x%04X] reading absorbance: plate='%s', wavelength=%d nm, wells=%d/%d", - self.io.pid, + "[%s] reading absorbance: plate='%s', wavelength=%d nm, wells=%d/%d", + self.name, plate.name, wavelength, len(wells), @@ -253,13 +292,13 @@ def assign_child_resource( ) -> None: if isinstance(resource, _ByonoyAbsorbanceReaderPlateHolder): if self.plate_holder._byonoy_base is not None: - raise ValueError("ByonoyBase can only have one plate holder assigned.") + raise ValueError("ByonoyDriver can only have one plate holder assigned.") self.plate_holder._byonoy_base = self super().assign_child_resource(resource, location, reassign) def check_can_drop_resource_here(self, resource: Resource, *, reassign: bool = True) -> None: raise RuntimeError( - "ByonoyBase does not support assigning child resources directly. " + "ByonoyDriver does not support assigning child resources directly. " "Use the plate_holder or illumination_unit_holder to assign plates and the " "illumination unit, respectively." ) diff --git a/pylabrobot/byonoy/backend.py b/pylabrobot/byonoy/backend.py deleted file mode 100644 index 726e7509568..00000000000 --- a/pylabrobot/byonoy/backend.py +++ /dev/null @@ -1,109 +0,0 @@ -import asyncio -import enum -import logging -import threading -import time -from abc import ABCMeta -from typing import Optional - -from pylabrobot.capabilities.capability import BackendParams -from pylabrobot.device import Driver -from pylabrobot.io.binary import Reader, Writer -from pylabrobot.io.hid import HID - -logger = logging.getLogger(__name__) - - -class ByonoyDevice(enum.Enum): - ABSORBANCE_96 = enum.auto() - LUMINESCENCE_96 = enum.auto() - - -class ByonoyBase(Driver, metaclass=ABCMeta): - """Shared HID communication logic for Byonoy plate readers.""" - - def __init__(self, pid: int, device_type: ByonoyDevice) -> None: - super().__init__() - self.io = HID(human_readable_device_name="Byonoy Plate Reader", vid=0x16D0, pid=pid) - self._background_thread: Optional[threading.Thread] = None - self._stop_background = threading.Event() - self._ping_interval = 1.0 - self._sending_pings = False - self._device_type = device_type - - async def setup(self, backend_params: Optional[BackendParams] = None) -> None: - await self.io.setup() - logger.info("[Byonoy %s pid=0x%04X] connected", self._device_type.name, self.io.pid) - self._stop_background.clear() - self._background_thread = threading.Thread(target=self._background_ping_worker, daemon=True) - self._background_thread.start() - - async def stop(self) -> None: - self._stop_background.set() - if self._background_thread and self._background_thread.is_alive(): - self._background_thread.join(timeout=2.0) - await self.io.stop() - logger.info("[Byonoy %s pid=0x%04X] disconnected", self._device_type.name, self.io.pid) - - def _assemble_command(self, report_id: int, payload: bytes, routing_info: bytes) -> bytes: - packet = Writer().u16(report_id).raw_bytes(payload).finish() - packet += b"\x00" * (62 - len(packet)) + routing_info - return packet - - async def send_command( - self, - report_id: int, - payload: bytes, - wait_for_response: bool = True, - routing_info: bytes = b"\x00\x00", - ) -> Optional[bytes]: - command = self._assemble_command(report_id, payload=payload, routing_info=routing_info) - await self.io.write(command) - if not wait_for_response: - return None - - t0 = time.time() - while True: - if time.time() - t0 > 120: - logger.error( - "[Byonoy %s pid=0x%04X] timeout waiting for response to command 0x%04X after 120s", - self._device_type.name, - self.io.pid, - report_id, - ) - raise TimeoutError("Reading data timed out after 2 minutes.") - response = await self.io.read(64, timeout=30) - if len(response) == 0: - continue - response_report_id = Reader(response).u16() - if report_id == response_report_id: - break - return response - - def _background_ping_worker(self) -> None: - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(self._ping_loop()) - except Exception: - logger.error("Background ping worker crashed", exc_info=True) - finally: - loop.close() - - async def _ping_loop(self) -> None: - while not self._stop_background.is_set(): - if self._sending_pings: - payload = Writer().u8(1).finish() - cmd = self._assemble_command( - report_id=0x0040, - payload=payload, - routing_info=b"\x00\x00", - ) - await self.io.write(cmd) - self._stop_background.wait(self._ping_interval) - - def _start_background_pings(self) -> None: - self._sending_pings = True - - def _stop_background_pings(self) -> None: - self._sending_pings = False diff --git a/pylabrobot/byonoy/driver.py b/pylabrobot/byonoy/driver.py new file mode 100644 index 00000000000..be866b370e1 --- /dev/null +++ b/pylabrobot/byonoy/driver.py @@ -0,0 +1,520 @@ +import asyncio +import contextlib +import enum +import logging +import time +from abc import ABCMeta +from dataclasses import dataclass +from typing import Dict, Iterator, List, Literal, Optional, Tuple + +from pylabrobot.capabilities.capability import BackendParams +from pylabrobot.device import Driver +from pylabrobot.io.binary import Reader, Writer +from pylabrobot.io.hid import HID + +logger = logging.getLogger(__name__) + + +class ByonoyDevice(enum.Enum): + ABSORBANCE_96 = enum.auto() + LUMINESCENCE_96 = enum.auto() + + +class ByonoySlotState(enum.IntEnum): + UNKNOWN = 0 + EMPTY = 1 + OCCUPIED = 2 + UNDETERMINED = 3 + + +Lum96IntegrationMode = Literal["rapid", "sensitive", "ultra_sensitive", "custom"] + + +# Preset integration times (matches byonoy_device_library: hidmeasurements.cpp) +LUM96_PRESET_S: Dict[Lum96IntegrationMode, float] = { + "rapid": 0.1, + "sensitive": 2.0, + "ultra_sensitive": 20.0, +} + + +def encode_well_bitmask(selected: List[bool], n: int = 96) -> bytes: + """Pack a length-n bool list into a little-endian bitmask, LSB-first within each byte.""" + if len(selected) != n: + raise ValueError(f"expected {n} bools, got {len(selected)}") + nbytes = (n + 7) // 8 + out = bytearray(nbytes) + for i, b in enumerate(selected): + if b: + out[i // 8] |= 1 << (i % 8) + return bytes(out) + + +@dataclass +class ByonoyStatus: + is_initialized: bool + slot_state: ByonoySlotState + error_code: int + uptime_s: int + is_measuring: bool + boot_completed: bool + + +@dataclass +class ByonoyEnvironment: + temperature_c: float + humidity: float # 0..1 + acceleration_g: Tuple[float, float, float] + + +@dataclass +class ByonoyVersions: + system_version: int + stm_version: int + stm_dev_version: int + esp_version: int + esp_dev_version: int + stm_bootloader_version: int + + @property + def system_version_known(self) -> bool: + return self.system_version != 0 + + @property + def is_production(self) -> bool: + return self.stm_dev_version == 0 and self.esp_dev_version == 0 + + +@dataclass +class ByonoyDeviceInfo: + device_id: str + device_name: str + manufacturer: str + serial_no: str + firmware_version: str + ref_number: str + + +# device_data_field_id (byonoyusbhid.h) +_DD_DEVICE_ID = 0 +_DD_DEVICE_NAME = 1 +_DD_DEVICE_MANUFACTURER = 2 +_DD_SERIAL_NO = 3 +_DD_FIRMWARE_VERSION = 4 +_DD_REF_NUMBER = 8 + +# device_data_field_flags (byonoyusbhid.h) +_FLAG_TYPE_MASK = 0x0F +_FLAG_TYPE_STRING = 0x02 +_FLAG_TYPE_INTEGER = 0x01 +_FLAG_TYPE_FLOAT = 0x04 +_FLAG_TYPE_BOOLEAN = 0x03 +_FLAG_HAS_MORE_DATA = 0x10 + + +LedEffect = Literal["solid", "progress", "cylon", "rainbow", "blinking", "breathing"] + +_LED_EFFECT_CODES: Dict[LedEffect, int] = { + "solid": 0x00, + "progress": 0x01, + "cylon": 0x02, + "rainbow": 0x03, + "blinking": 0x04, + "breathing": 0x05, +} + + +# --- Firmware error codes (per Byonoy hid-reports source) ------------------- +# +# The status_in_t.error_code byte is device-specific. Byonoy's own C library +# defines a Status base class that just stringifies the hex code, with per- +# device subclasses (Abs96Status, Abs1Status) providing named tables. There +# is no documented Lum96 table — Lum96 inherits the generic stringifier. +# +# These mirror the enums in: +# hid-reports/src/hid/report/request/abs96status.cpp +# hid-reports/src/hid/report/request/abs1status.cpp + + +class Abs96StatusError(enum.IntEnum): + NO_ERROR = 0 + ERROR_CALIB = 1 + ERROR_AMBIENT = 2 + ERROR_USB = 3 + ERROR_HARDWARE = 4 + ERROR_TEMPERATURE = 5 + ERROR_NO_MEASUREMENTUNIT = 6 + ERROR_NO_ACK = 10 + + +class Abs1StatusError(enum.IntFlag): + """AbsOne errors are a bit-flag set — multiple can be raised at once.""" + + NO_ERROR = 0 + AMBIENT_LIGHT = 1 + MIN_LIGHT = 2 + USB = 4 + HARDWARE = 8 + EEPROM = 16 + TIMEOUT = 32 + POWER_CALIBRATION = 64 + NOISE_LIMIT = 128 + + +_GENERIC_ERROR_NAMES: Dict[int, str] = {0: "NO_ERROR"} +ABS96_ERROR_NAMES: Dict[int, str] = {e.value: e.name for e in Abs96StatusError} +ABS1_ERROR_NAMES: Dict[int, str] = {e.value: e.name for e in Abs1StatusError} + + +_ACCEL_LSB_PER_G = 16384.0 # 14-bit signed @ ±2 g full scale + + +class ByonoyDriver(Driver, metaclass=ABCMeta): + """Shared HID communication logic for Byonoy plate readers.""" + + # Firmware error-code → name mapping. Default mirrors Byonoy's generic + # Status::firmwareErrorId (only NO_ERROR is documented). Subclasses for + # specific devices (e.g. ByonoyAbsorbance96Backend) override with their + # documented tables. Lum96 has no documented table; inherits the default. + _ERROR_NAMES: Dict[int, str] = _GENERIC_ERROR_NAMES + + def __init__(self, pid: int, device_type: ByonoyDevice, name: str) -> None: + super().__init__() + self.io = HID(human_readable_device_name=name, vid=0x16D0, pid=pid) + self._device_type = device_type + self._abort_requested = False + self._in_flight_trigger: Optional[int] = None + # Serializes write+response-sniff in send_command. Does NOT cover the + # measurement read loops in subclasses (which poll io.read directly so + # cancel() can still send the abort while they run). + self._io_lock = asyncio.Lock() + + @property + def name(self) -> str: + return self.io.human_readable_device_name + + async def setup(self, backend_params: Optional[BackendParams] = None) -> None: + await self.io.setup() + logger.info("[%s] connected", self.name) + + async def stop(self) -> None: + if self._in_flight_trigger is not None: + await self.cancel() + await self.io.stop() + logger.info("[%s] disconnected", self.name) + + def _assemble_command(self, report_id: int, payload: bytes, routing_info: bytes) -> bytes: + packet = Writer().u16(report_id).raw_bytes(payload).finish() + packet += b"\x00" * (62 - len(packet)) + routing_info + return packet + + async def send_command( + self, + report_id: int, + payload: bytes, + wait_for_response: bool = True, + routing_info: bytes = b"\x00\x00", + ) -> Optional[bytes]: + command = self._assemble_command(report_id, payload=payload, routing_info=routing_info) + async with self._io_lock: + await self.io.write(command) + if not wait_for_response: + return None + + t0 = time.time() + while True: + if time.time() - t0 > 120: + logger.error( + "[%s] timeout waiting for response to command 0x%04X after 120s", + self.name, + report_id, + ) + raise TimeoutError("Reading data timed out after 2 minutes.") + response = await self.io.read(64, timeout=30) + if len(response) == 0: + continue + response_report_id = Reader(response).u16() + if report_id == response_report_id: + break + return response + + async def request_status(self) -> ByonoyStatus: + """Read REP_STATUS_IN (0x0300): init/slot/error/uptime/measuring/boot.""" + response = await self.send_command( + report_id=0x0300, payload=b"\x00" * 60, routing_info=b"\x80\x40" + ) + assert response is not None + r = Reader(response[2:]) + return ByonoyStatus( + is_initialized=r.u8() != 0, + slot_state=ByonoySlotState(r.u8()), + error_code=r.u8(), + uptime_s=r.u32(), + is_measuring=r.u8() != 0, + boot_completed=r.u8() != 0, + ) + + def _warn_chunk_flags(self, chunk_flags: List[int]) -> None: + """Log non-zero per-chunk flag bytes from a measurement read loop. + + Vendor bit definitions for the measurement-result `flags` byte aren't + published, so we can't decode them — only surface that *something* was + flagged. Subclasses' read loops call this after the loop completes + (after error_code has been checked and didn't raise). + """ + if any(f != 0 for f in chunk_flags): + logger.warning( + "[%s] non-zero chunk flags during read: %s " + "(vendor bit definitions not published; data may be unreliable)", + self.name, + [f"0x{f:02x}" for f in chunk_flags], + ) + + def describe_error_code(self, code: int) -> str: + """Return a human-readable name for a firmware error_code byte. + + Looks up `code` in this backend's `_ERROR_NAMES` table. Unknown codes + fall back to `"errorCode=0xNN"` matching the C library's generic + Status::firmwareErrorId. The default table only has NO_ERROR (0); + subclasses for documented devices (Abs96, AbsOne) populate richer + tables. Lum96 has no documented table — codes other than 0 will + surface as the hex sentinel, which is the honest answer. + """ + if code in self._ERROR_NAMES: + return self._ERROR_NAMES[code] + return f"errorCode=0x{code:02x}" + + async def request_environment(self) -> ByonoyEnvironment: + """Read REP_ENVIRONMENT_IN (0x0310): temperature, humidity, acceleration.""" + response = await self.send_command( + report_id=0x0310, payload=b"\x00" * 60, routing_info=b"\x80\x40" + ) + assert response is not None + r = Reader(response[2:]) + temp_c = r.i16() / 100.0 + humidity = r.i16() / 1000.0 + ax, ay, az = r.i16(), r.i16(), r.i16() + return ByonoyEnvironment( + temperature_c=temp_c, + humidity=humidity, + acceleration_g=(ax / _ACCEL_LSB_PER_G, ay / _ACCEL_LSB_PER_G, az / _ACCEL_LSB_PER_G), + ) + + async def request_api_version(self) -> int: + """Read REP_API_VERSION_IN (0x0050): a single u32.""" + response = await self.send_command( + report_id=0x0050, payload=b"\x00" * 60, routing_info=b"\x80\x40" + ) + assert response is not None + return Reader(response[2:]).u32() + + async def request_supported_reports(self) -> List[int]: + """Read REP_SUPPORTED_REPORTS_IN (0x0010): list of report IDs the device supports. + + Reply is delivered in seq/seq_len chunks of up to 29 u16 ids; zero-valued + entries are padding. Returns the deduplicated, ordered union. + """ + cmd = self._assemble_command(report_id=0x0010, payload=b"\x00" * 60, routing_info=b"\x80\x40") + await self.io.write(cmd) + + seen: List[int] = [] + t0 = time.time() + while True: + if time.time() - t0 > 30: + raise TimeoutError("Timed out reading supported reports.") + chunk = await self.io.read(64, timeout=10) + if len(chunk) == 0: + continue + r = Reader(chunk) + if r.u16() != 0x0010: + continue + seq = r.u8() + seq_len = r.u8() + ids = [r.u16() for _ in range(29)] + seen.extend(i for i in ids if i != 0) + if seq == seq_len - 1: + break + # Preserve order, drop dupes + out: List[int] = [] + for i in seen: + if i not in out: + out.append(i) + return out + + async def _read_data_field(self, field_index: int) -> object: + """Read a named device-data field via REP_DEVICE_DATA_READ_IN (0x0200). + + Returns the field's value typed per the response flags + (str / int / float / bool / bytes). Truncates if HAS_MORE_DATA is set + (shouldn't happen for the short identity strings; log if it does). + + Private — the only documented caller is `request_device_info`, which knows + the field types ahead of time. Promote to public if you find a use case + that needs the polymorphic-by-flag-byte shape. + """ + payload = Writer().u16(field_index).u8(0).raw_bytes(b"\x00" * 57).finish() + response = await self.send_command(report_id=0x0200, payload=payload, routing_info=b"\x80\x40") + assert response is not None + r = Reader(response[2:]) + _ = r.u16() # echoed field_index + flags = r.u8() + data_type = flags & _FLAG_TYPE_MASK + if flags & _FLAG_HAS_MORE_DATA: + logger.warning( + "[%s] field 0x%04X has more data than fits in one report; truncating", + self.name, + field_index, + ) + raw = r.raw_bytes(52) + if data_type == _FLAG_TYPE_STRING: + return raw.split(b"\x00", 1)[0].decode("utf-8", errors="replace") + if data_type == _FLAG_TYPE_INTEGER: + return int.from_bytes(raw[:4], "little", signed=False) + if data_type == _FLAG_TYPE_FLOAT: + return Reader(raw[:4]).f32() + if data_type == _FLAG_TYPE_BOOLEAN: + return raw[0] != 0 + return raw # TypeBytes + + async def request_device_info(self) -> ByonoyDeviceInfo: + """Read identity strings (matches C lib's byonoy_get_device_information).""" + + async def s(idx: int) -> str: + v = await self._read_data_field(idx) + return v if isinstance(v, str) else str(v) + + return ByonoyDeviceInfo( + device_id=await s(_DD_DEVICE_ID), + device_name=await s(_DD_DEVICE_NAME), + manufacturer=await s(_DD_DEVICE_MANUFACTURER), + serial_no=await s(_DD_SERIAL_NO), + firmware_version=await s(_DD_FIRMWARE_VERSION), + ref_number=await s(_DD_REF_NUMBER), + ) + + @contextlib.contextmanager + def _measurement_in_flight(self, report_id: int) -> Iterator[None]: + """Mark `report_id` as the in-flight measurement trigger for the duration + of the `with` block. Subclasses' read methods wrap their trigger + result + loop in this so `cancel()` can find the right report to abort and so a + concurrent second read raises instead of corrupting the read buffer. + """ + if self._in_flight_trigger is not None: + raise RuntimeError( + f"Byonoy device busy: report 0x{self._in_flight_trigger:04X} already in " + f"flight; call cancel() before starting 0x{report_id:04X}." + ) + # Entry-side reset is load-bearing for correctness; exit-side is hygiene + # so a between-reads inspection doesn't see stale True from a prior cancel. + self._in_flight_trigger = report_id + self._abort_requested = False + try: + yield + finally: + self._in_flight_trigger = None + self._abort_requested = False + + async def cancel(self) -> None: + """Abort the in-flight measurement via REP_ABORT_REPORT_OUT (0x0060). + + Uses the report id tracked by the read method's `_measurement_in_flight` + context. If no measurement is in flight, this is a no-op. + + Empirically the firmware stops emitting result chunks but sends no closing + notification, so we also raise `_abort_requested`; subclasses' read loops + poll the flag and bail out instead of waiting 120 s for the hard timeout. + """ + report_id = self._in_flight_trigger + if report_id is None: + logger.info("[%s] cancel(): no measurement in flight; no-op", self.name) + return + self._abort_requested = True + payload = Writer().u16(report_id).raw_bytes(b"\x00" * 58).finish() + await self.send_command(report_id=0x0060, payload=payload, wait_for_response=False) + logger.info("[%s] sent abort for in-flight report 0x%04X", self.name, report_id) + + async def set_led_color( + self, + color: Tuple[int, int, int], + effect: LedEffect = "solid", + *, + low_power: bool = False, + force: bool = False, + effect_state: int = 0, + duration_ms: int = 0, + ) -> None: + """Set the LED bar to a single color via REP_LED_BAR_EFFECTS_OUT (0x0351). + + Mirrors the vendor's user-facing `set_led_effect(effect, color, modes, ...)` + in byonoy_device_library. The firmware renders `effect` over `color`: + "solid" just shows the color; "breathing"/"cylon"/"blinking"/"rainbow"/ + "progress" animate it. + + Packed layout (vendor byonoyusbhid.h led_bar_effects_out_t): + effect:u8 color:(r,g,b u8) effect_state:u8 flags:u8 duration_ms:u32 + + `force` (FLAG_LED_FORCE=0x10) overrides an unexpired previous + `duration_ms`. `low_power` (FLAG_LED_LOWPOWER=0x01) reduces brightness. + + `effect_state` is a 0..255 parameter used only by the "progress" effect + to indicate fill level (0 = empty bar, 255 = full bar). It is ignored + by every other effect ("solid", "breathing", "blinking", "cylon", + "rainbow"); leave it at 0 unless you're driving progress. + + The PC routing tag (request_info=0x4000) is required — the firmware + silently drops LED writes that arrive with the default LEGACY tag. + """ + flags = (0x01 if low_power else 0) | (0x10 if force else 0) + r_, g, b = color + payload = ( + Writer() + .u8(_LED_EFFECT_CODES[effect]) + .u8(r_ & 0xFF) + .u8(g & 0xFF) + .u8(b & 0xFF) + .u8(effect_state & 0xFF) + .u8(flags) + .u32(int(duration_ms)) + .finish() + ) + await self.send_command( + report_id=0x0351, + payload=payload, + wait_for_response=False, + routing_info=b"\x00\x40", + ) + + async def set_led_colors(self, colors: List[Tuple[int, int, int]]) -> None: + """Set each of the 20 LED bar pixels individually via + REP_LED_BAR_COLOURS_OUT (0x0350). Pads with black if fewer than 20 are + given; truncates if more. Fast enough for real-time animation (~30+ fps). + + Like `set_led_color`, requires the PC routing tag (request_info=0x4000); + the firmware silently drops writes with the default LEGACY tag. + """ + pixels = list(colors[:20]) + [(0, 0, 0)] * max(0, 20 - len(colors)) + w = Writer() + for r_, g, b in pixels: + w.u8(r_ & 0xFF).u8(g & 0xFF).u8(b & 0xFF) + await self.send_command( + report_id=0x0350, + payload=w.finish(), + wait_for_response=False, + routing_info=b"\x00\x40", + ) + + async def request_versions(self) -> ByonoyVersions: + """Read REP_VERSIONS_IN (0x0080): system / STM / ESP / bootloader versions.""" + response = await self.send_command( + report_id=0x0080, payload=b"\x00" * 60, routing_info=b"\x80\x40" + ) + assert response is not None + r = Reader(response[2:]) + return ByonoyVersions( + system_version=r.u32(), + stm_version=r.u32(), + stm_dev_version=r.u32(), + esp_version=r.u32(), + esp_dev_version=r.u32(), + stm_bootloader_version=r.u32(), + ) diff --git a/pylabrobot/byonoy/driver_tests.py b/pylabrobot/byonoy/driver_tests.py new file mode 100644 index 00000000000..4480ddd3ae2 --- /dev/null +++ b/pylabrobot/byonoy/driver_tests.py @@ -0,0 +1,141 @@ +import unittest + +from pylabrobot.byonoy.driver import ( + _GENERIC_ERROR_NAMES, + _LED_EFFECT_CODES, + ABS1_ERROR_NAMES, + ABS96_ERROR_NAMES, + LUM96_PRESET_S, + Abs1StatusError, + Abs96StatusError, + ByonoyDevice, + ByonoyDriver, + encode_well_bitmask, +) + + +class EncodeWellBitmaskTests(unittest.TestCase): + def test_all_false_is_zero_filled(self): + self.assertEqual(encode_well_bitmask([False] * 96), b"\x00" * 12) + + def test_all_true_is_all_ones(self): + self.assertEqual(encode_well_bitmask([True] * 96), b"\xff" * 12) + + def test_a1_only_sets_byte0_bit0(self): + bools = [False] * 96 + bools[0] = True + self.assertEqual(encode_well_bitmask(bools), b"\x01" + b"\x00" * 11) + + def test_h12_only_sets_byte11_bit7(self): + bools = [False] * 96 + bools[95] = True + self.assertEqual(encode_well_bitmask(bools), b"\x00" * 11 + b"\x80") + + def test_bits_7_and_8_cross_byte_boundary(self): + bools = [False] * 96 + bools[7] = True # byte 0, bit 7 → 0x80 + bools[8] = True # byte 1, bit 0 → 0x01 + self.assertEqual(encode_well_bitmask(bools), b"\x80\x01" + b"\x00" * 10) + + def test_first_column_has_8_bits_set(self): + bools = [False] * 96 + for r in range(8): + bools[r * 12] = True + result = encode_well_bitmask(bools) + self.assertEqual(sum(bin(b).count("1") for b in result), 8) + + def test_custom_n_size(self): + # bit 0 + bit 2 → 0x05 + self.assertEqual(encode_well_bitmask([True, False, True], n=3), b"\x05") + + def test_length_mismatch_raises(self): + with self.assertRaises(ValueError): + encode_well_bitmask([True] * 95) + with self.assertRaises(ValueError): + encode_well_bitmask([True] * 96, n=24) + + +class IntegrationModePresetTests(unittest.TestCase): + def test_preset_values_match_vendor(self): + self.assertEqual(LUM96_PRESET_S["rapid"], 0.1) + self.assertEqual(LUM96_PRESET_S["sensitive"], 2.0) + self.assertEqual(LUM96_PRESET_S["ultra_sensitive"], 20.0) + + def test_custom_is_not_a_preset(self): + # "custom" is a Literal value but has no preset — read_luminescence + # requires the caller to set integration_time explicitly. + self.assertNotIn("custom", LUM96_PRESET_S) + + +class ErrorTableTests(unittest.TestCase): + def test_generic_table_has_only_no_error(self): + self.assertEqual(_GENERIC_ERROR_NAMES, {0: "NO_ERROR"}) + + def test_abs96_table_round_trips(self): + self.assertEqual(ABS96_ERROR_NAMES[0], "NO_ERROR") + self.assertEqual(ABS96_ERROR_NAMES[1], "ERROR_CALIB") + self.assertEqual(ABS96_ERROR_NAMES[Abs96StatusError.ERROR_NO_ACK], "ERROR_NO_ACK") + + def test_abs1_flag_bit_values(self): + # AbsOne is a bit-flag enum; verify bit positions match the vendor header. + self.assertEqual(Abs1StatusError.AMBIENT_LIGHT.value, 1) + self.assertEqual(Abs1StatusError.MIN_LIGHT.value, 2) + self.assertEqual(Abs1StatusError.USB.value, 4) + self.assertEqual(Abs1StatusError.HARDWARE.value, 8) + self.assertEqual(Abs1StatusError.NOISE_LIMIT.value, 128) + + def test_abs1_supports_combined_flags(self): + combined = Abs1StatusError.AMBIENT_LIGHT | Abs1StatusError.HARDWARE + self.assertEqual(combined.value, 9) + + def test_abs1_table_includes_all_enum_members(self): + for member in Abs1StatusError: + self.assertIn(member.value, ABS1_ERROR_NAMES) + + +class DescribeErrorCodeTests(unittest.TestCase): + """describe_error_code() is pure — bypass __init__ to avoid HID setup.""" + + def _make_driver(self, error_names): + drv = ByonoyDriver.__new__(ByonoyDriver) + drv._ERROR_NAMES = error_names + return drv + + def test_known_code_returns_name(self): + drv = self._make_driver(ABS96_ERROR_NAMES) + self.assertEqual(drv.describe_error_code(1), "ERROR_CALIB") + self.assertEqual(drv.describe_error_code(0), "NO_ERROR") + + def test_unknown_code_falls_back_to_hex(self): + drv = self._make_driver(ABS96_ERROR_NAMES) + self.assertEqual(drv.describe_error_code(0xAB), "errorCode=0xab") + + def test_generic_table_only_knows_no_error(self): + # Lum96 uses the generic table — anything non-zero is the hex sentinel. + drv = self._make_driver(_GENERIC_ERROR_NAMES) + self.assertEqual(drv.describe_error_code(0), "NO_ERROR") + self.assertEqual(drv.describe_error_code(7), "errorCode=0x07") + self.assertEqual(drv.describe_error_code(255), "errorCode=0xff") + + +class LedEffectCodeTests(unittest.TestCase): + def test_codes_cover_all_effect_literals(self): + self.assertEqual( + set(_LED_EFFECT_CODES), + {"solid", "progress", "cylon", "rainbow", "blinking", "breathing"}, + ) + + def test_solid_is_zero(self): + self.assertEqual(_LED_EFFECT_CODES["solid"], 0x00) + + def test_codes_are_unique(self): + self.assertEqual(len(set(_LED_EFFECT_CODES.values())), len(_LED_EFFECT_CODES)) + + +class ByonoyDeviceEnumTests(unittest.TestCase): + def test_distinct_values(self): + self.assertNotEqual(ByonoyDevice.ABSORBANCE_96, ByonoyDevice.LUMINESCENCE_96) + + +if __name__ == "__main__": + unittest.main() diff --git a/pylabrobot/byonoy/luminescence_96.py b/pylabrobot/byonoy/luminescence_96.py index 6abf9e68c9d..c89027f3717 100644 --- a/pylabrobot/byonoy/luminescence_96.py +++ b/pylabrobot/byonoy/luminescence_96.py @@ -1,9 +1,16 @@ +import asyncio import logging import time from dataclasses import dataclass from typing import List, Optional, Tuple -from pylabrobot.byonoy.backend import ByonoyBase, ByonoyDevice +from pylabrobot.byonoy.driver import ( + LUM96_PRESET_S, + ByonoyDevice, + ByonoyDriver, + Lum96IntegrationMode, + encode_well_bitmask, +) from pylabrobot.capabilities.capability import BackendParams from pylabrobot.capabilities.plate_reading.luminescence import ( Luminescence, @@ -27,21 +34,26 @@ # --------------------------------------------------------------------------- -class ByonoyLuminescence96Backend(ByonoyBase, LuminescenceBackend): +class ByonoyLuminescence96Backend(ByonoyDriver, LuminescenceBackend): """Backend for the Byonoy Luminescence 96 Automate plate reader.""" def __init__(self) -> None: - super().__init__(pid=0x119B, device_type=ByonoyDevice.LUMINESCENCE_96) + super().__init__(pid=0x119B, device_type=ByonoyDevice.LUMINESCENCE_96, name="Byonoy L96") @dataclass class LuminescenceParams(BackendParams): """Byonoy Luminescence 96 parameters for luminescence reads. Args: - integration_time: Integration time in seconds. Default 2. + mode: One of "rapid" (100 ms), "sensitive" (2 s, default), + "ultra_sensitive" (20 s), or "custom". Presets match the + byonoy_device_library mapping. + integration_time: Integration time in seconds. If set, forces "custom" + mode regardless of `mode`. Required when `mode == "custom"`. """ - integration_time: float = 2 + mode: Lum96IntegrationMode = "sensitive" + integration_time: Optional[float] = None async def read_luminescence( self, @@ -55,78 +67,148 @@ async def read_luminescence( Args: plate: The plate being read. wells: Wells to measure. - focal_height: Focal height in mm. + focal_height: Required by the abstract :class:`LuminescenceBackend` + contract but **ignored on the Byonoy L96** — the device has a + fixed optical configuration (the detector unit clamps onto the + base; the optical path is determined by plate + base + detector + geometry, not user-tunable). Passing any value is harmless; + passing 0 is conventional. backend_params: Backend-specific parameters. """ if not isinstance(backend_params, self.LuminescenceParams): backend_params = ByonoyLuminescence96Backend.LuminescenceParams() - integration_time = backend_params.integration_time + # Resolve mode + integration time + if backend_params.integration_time is not None: + mode = "custom" + integration_time = backend_params.integration_time + elif backend_params.mode == "custom": + raise ValueError("'custom' mode requires integration_time to be set.") + else: + mode = backend_params.mode + integration_time = LUM96_PRESET_S[mode] + + # Firmware always scans all 96 wells; this mask only filters which are + # reported (others come back as 0.0). Single source of truth: the wells arg. + well_set = set(id(w) for w in wells) + mask_bools = [id(w) in well_set for w in plate.get_all_items()] + + well_mask = encode_well_bitmask(mask_bools, n=96) logger.info( - "[Byonoy L96 pid=0x%04X] reading luminescence: plate='%s', integration_time=%.1fs, wells=%d/%d", - self.io.pid, + "[%s] reading luminescence: plate='%s', mode=%s, integration_time=%.3fs, wells=%d/96", + self.name, plate.name, + mode, integration_time, - len(wells), - plate.num_items, - ) - - await self.send_command( - report_id=0x0010, - payload=b"\x00" * 60, - wait_for_response=False, - ) - - payload2 = Writer().u16(7).u8(0).raw_bytes(b"\x00" * 52).finish() - await self.send_command( - report_id=0x0200, - payload=payload2, - wait_for_response=False, + sum(mask_bools), ) - payload3 = ( - Writer().i32(int(integration_time * 1000 * 1000)).raw_bytes(b"\xff" * 12).u8(0).u8(0).finish() - ) - await self.send_command( - report_id=0x0340, - payload=payload3, - wait_for_response=False, - ) - - t0 = time.time() - all_rows: List[Optional[float]] = [] - - while True: - if time.time() - t0 > 120: - logger.error("[Byonoy L96 pid=0x%04X] luminescence read timed out after 120s", self.io.pid) - raise TimeoutError("Reading luminescence data timed out after 2 minutes.") - - chunk = await self.io.read(64, timeout=30) - if len(chunk) == 0: - continue - - reader = Reader(chunk) - report_id = reader.u16() + with self._measurement_in_flight(0x0340): + await self.send_command( + report_id=0x0010, + payload=b"\x00" * 60, + wait_for_response=False, + ) - if report_id == 0x0600: - seq = reader.u8() - seq_len = reader.u8() - _ = reader.u32() # integration_time_us - _ = reader.u32() # duration_ms - row = [reader.f32() for _ in range(12)] - _ = reader.u8() # flags - _ = reader.u8() # progress + payload2 = Writer().u16(7).u8(0).raw_bytes(b"\x00" * 52).finish() + await self.send_command( + report_id=0x0200, + payload=payload2, + wait_for_response=False, + ) - all_rows.extend(row) + payload3 = ( + Writer() + .i32(int(integration_time * 1_000_000)) + .raw_bytes(well_mask) + .u8(0) # is_reference_measurement + .u8(0) # flags + .finish() + ) + await self.send_command( + report_id=0x0340, + payload=payload3, + wait_for_response=False, + ) - if seq == seq_len - 1: - break + t0 = time.time() + # Index by seq so an out-of-order or dropped chunk surfaces as a None + # slot instead of silently shifting subsequent rows into the wrong wells. + rows_by_seq: List[Optional[List[float]]] = [] + flags_by_seq: List[Optional[int]] = [] + expected_chunks: Optional[int] = None + + while True: + if self._abort_requested: + logger.info("[%s] read aborted by cancel()", self.name) + raise asyncio.CancelledError("Luminescence read aborted via cancel().") + if time.time() - t0 > 120: + logger.error("[%s] luminescence read timed out after 120s", self.name) + raise TimeoutError("Reading luminescence data timed out after 2 minutes.") + + chunk = await self.io.read(64, timeout=2) + if len(chunk) == 0: + continue + + reader = Reader(chunk) + report_id = reader.u16() + + if report_id == 0x0600: + seq = reader.u8() + seq_len = reader.u8() + _ = reader.u32() # integration_time_us + _ = reader.u32() # duration_ms + row = [reader.f32() for _ in range(12)] + flags = reader.u8() + _ = reader.u8() # progress (0..100 running %); not surfaced + + if seq_len == 0: + raise RuntimeError(f"{self.name} firmware sent chunk with seq_len=0") + if expected_chunks is None: + expected_chunks = seq_len + rows_by_seq = [None] * seq_len + flags_by_seq = [None] * seq_len + elif seq_len != expected_chunks: + raise RuntimeError( + f"{self.name} firmware changed seq_len mid-stream: {expected_chunks} → {seq_len}" + ) + if not 0 <= seq < seq_len: + raise RuntimeError(f"{self.name} firmware sent seq={seq} (seq_len={seq_len})") + rows_by_seq[seq] = row + flags_by_seq[seq] = flags + + if all(r is not None for r in rows_by_seq): + break + + if expected_chunks is None: + raise RuntimeError(f"{self.name} luminescence read produced no chunks") + chunk_flags: List[int] = [f for f in flags_by_seq if f is not None] + all_rows: List[float] = [v for row in rows_by_seq if row is not None for v in row] + + # Check firmware health before trusting the data. error_code is the + # authoritative post-measurement status byte; per-chunk flags are + # undocumented but a non-zero value means the firmware flagged the chunk. + status = await self.request_status() + if status.error_code != 0: + raise RuntimeError( + f"{self.name} firmware error after read: " + f"{self.describe_error_code(status.error_code)} " + f"(chunk flags: {[f'0x{f:02x}' for f in chunk_flags]})" + ) + self._warn_chunk_flags(chunk_flags) + if len(all_rows) != 96: + raise RuntimeError( + f"{self.name} luminescence read produced {len(all_rows)} values (expected 96)" + ) - hybrid_result: List[Optional[float]] = all_rows[96 * 0 : 96 * 1] + # Firmware zero-fills wells outside the mask. Convert those to None per + # the LuminescenceResult contract ("None for unmeasured wells") — 0.0 is + # a legitimate measurement (baseline subtraction can yield ~0 or negative). + masked: List[Optional[float]] = [v if m else None for v, m in zip(all_rows, mask_bools)] return [ LuminescenceResult( - data=reshape_2d(hybrid_result, (8, 12)), + data=reshape_2d(masked, (8, 12)), temperature=None, timestamp=time.time(), ) @@ -220,13 +302,13 @@ def assign_child_resource( ) -> None: if isinstance(resource, _ByonoyLuminescenceReaderPlateHolder): if self.plate_holder._byonoy_base is not None: - raise ValueError("ByonoyBase can only have one plate holder assigned.") + raise ValueError("ByonoyDriver can only have one plate holder assigned.") self.plate_holder._byonoy_base = self super().assign_child_resource(resource, location, reassign) def check_can_drop_resource_here(self, resource: Resource, *, reassign: bool = True) -> None: raise RuntimeError( - "ByonoyBase does not support assigning child resources directly. " + "ByonoyDriver does not support assigning child resources directly. " "Use the plate_holder or reader_unit_holder to assign plates and the reader unit, " "respectively." ) diff --git a/pylabrobot/io/ftdi.py b/pylabrobot/io/ftdi.py index ee967cc8ce2..e52cb286de4 100644 --- a/pylabrobot/io/ftdi.py +++ b/pylabrobot/io/ftdi.py @@ -74,7 +74,7 @@ def __init__( f"Import error: {_PYUSB_ERROR}" ) - self._human_readable_device_name = human_readable_device_name + self.human_readable_device_name = human_readable_device_name self._device_id = device_id self._vid = vid self._pid = pid @@ -86,7 +86,7 @@ def __init__( if get_capture_or_validation_active(): raise RuntimeError( - f"Cannot create a new FTDI object for '{self._human_readable_device_name}' while capture or validation is active" + f"Cannot create a new FTDI object for '{self.human_readable_device_name}' while capture or validation is active" ) @property @@ -198,7 +198,7 @@ async def setup(self): logger.info(f"Successfully opened FTDI device: {self.device_id}") except FtdiError as e: raise RuntimeError( - f"Failed to open FTDI device for '{self._human_readable_device_name}': {e}. " + f"Failed to open FTDI device for '{self.human_readable_device_name}': {e}. " "Is the device connected? Is it in use by another process? " "Try restarting the kernel." ) from e @@ -329,7 +329,7 @@ async def readline(self) -> bytes: # type: ignore # very dumb it's reading from def serialize(self): return { - "human_readable_device_name": self._human_readable_device_name, + "human_readable_device_name": self.human_readable_device_name, "device_id": self._device_id, "vid": self._vid, "pid": self._pid, diff --git a/pylabrobot/io/hid.py b/pylabrobot/io/hid.py index f6b24ff3a51..1b624da6962 100644 --- a/pylabrobot/io/hid.py +++ b/pylabrobot/io/hid.py @@ -32,7 +32,7 @@ class HID(IOBase): def __init__( self, human_readable_device_name: str, vid: int, pid: int, serial_number: Optional[str] = None ): - self._human_readable_device_name = human_readable_device_name + self.human_readable_device_name = human_readable_device_name self.vid = vid self.pid = pid self.serial_number = serial_number @@ -144,7 +144,7 @@ async def write(self, data: bytes, report_id: bytes = b"\x00"): def _write(): if self.device is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") return self.device.write(write_data) if self._executor is None: @@ -161,7 +161,7 @@ async def read(self, size: int, timeout: int) -> bytes: def _read(): if self.device is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") try: return self.device.read(size, timeout=int(timeout)) except HIDException as e: @@ -179,7 +179,7 @@ def _read(): def serialize(self): return { - "human_readable_device_name": self._human_readable_device_name, + "human_readable_device_name": self.human_readable_device_name, "vid": self.vid, "pid": self.pid, "serial_number": self.serial_number, diff --git a/pylabrobot/io/serial.py b/pylabrobot/io/serial.py index b8b5c085071..26ddf801d96 100644 --- a/pylabrobot/io/serial.py +++ b/pylabrobot/io/serial.py @@ -51,7 +51,7 @@ def __init__( dsrdtr: bool = False, xonxoff: bool = False, ): - self._human_readable_device_name = human_readable_device_name + self.human_readable_device_name = human_readable_device_name self._port = port self._vid = vid self._pid = pid @@ -202,7 +202,7 @@ def _open_serial() -> serial.Serial: except serial.SerialException as e: logger.error( - f"Could not connect to device '{self._human_readable_device_name}', is it in use by a different notebook/process?" + f"Could not connect to device '{self.human_readable_device_name}', is it in use by a different notebook/process?" ) if self._executor is not None: self._executor.shutdown(wait=True) @@ -220,7 +220,7 @@ async def stop(self): loop = asyncio.get_running_loop() if self._executor is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") await loop.run_in_executor(self._executor, self._ser.close) if self._executor is not None: @@ -232,7 +232,7 @@ async def write(self, data: bytes): loop = asyncio.get_running_loop() if self._executor is None or self._ser is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") await loop.run_in_executor(self._executor, self._ser.write, data) @@ -246,7 +246,7 @@ async def read(self, num_bytes: int = 1) -> bytes: loop = asyncio.get_running_loop() if self._executor is None or self._ser is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") data = await loop.run_in_executor(self._executor, self._ser.read, num_bytes) @@ -263,7 +263,7 @@ async def readline(self) -> bytes: # type: ignore # very dumb it's reading from loop = asyncio.get_running_loop() if self._executor is None or self._ser is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") data = await loop.run_in_executor(self._executor, self._ser.readline) @@ -280,7 +280,7 @@ async def send_break(self, duration: float): loop = asyncio.get_running_loop() if self._executor is None or self._ser is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") def _send_break(ser, duration: float) -> None: """Send a break condition for the specified duration.""" @@ -294,7 +294,7 @@ def _send_break(ser, duration: float) -> None: async def reset_input_buffer(self): loop = asyncio.get_running_loop() if self._executor is None or self._ser is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") await loop.run_in_executor(self._executor, self._ser.reset_input_buffer) logger.log(LOG_LEVEL_IO, "[%s] reset_input_buffer", self._port) capturer.record(SerialCommand(device_id=self.port, action="reset_input_buffer", data="")) @@ -302,7 +302,7 @@ async def reset_input_buffer(self): async def reset_output_buffer(self): loop = asyncio.get_running_loop() if self._executor is None or self._ser is None: - raise RuntimeError(f"Call setup() first for device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for device '{self.human_readable_device_name}'.") await loop.run_in_executor(self._executor, self._ser.reset_output_buffer) logger.log(LOG_LEVEL_IO, "[%s] reset_output_buffer", self._port) capturer.record(SerialCommand(device_id=self.port, action="reset_output_buffer", data="")) @@ -341,7 +341,7 @@ def rts(self, value: bool): def serialize(self): return { - "human_readable_device_name": self._human_readable_device_name, + "human_readable_device_name": self.human_readable_device_name, "port": self._port, "baudrate": self.baudrate, "bytesize": self.bytesize, diff --git a/pylabrobot/io/socket.py b/pylabrobot/io/socket.py index 575cb4d7c1d..ae465fa3cfa 100644 --- a/pylabrobot/io/socket.py +++ b/pylabrobot/io/socket.py @@ -38,7 +38,7 @@ def __init__( ssl_context: Optional[ssl.SSLContext] = None, server_hostname: Optional[str] = None, ): - self._human_readable_device_name = human_readable_device_name + self.human_readable_device_name = human_readable_device_name self._host = host self._port = port self._reader: Optional[asyncio.StreamReader] = None @@ -91,7 +91,7 @@ async def reconnect(self): def serialize(self): return { - "human_readable_device_name": self._human_readable_device_name, + "human_readable_device_name": self.human_readable_device_name, "host": self._host, "port": self._port, "type": "Socket", @@ -105,7 +105,7 @@ async def write(self, data: bytes, timeout: Optional[float] = None) -> None: """ if self._writer is None: raise RuntimeError( - f"Socket for '{self._human_readable_device_name}' not set up; call setup() first" + f"Socket for '{self.human_readable_device_name}' not set up; call setup() first" ) timeout = self._write_timeout if timeout is None else timeout async with self._write_lock: @@ -142,7 +142,7 @@ async def read(self, num_bytes: int = 128, timeout: Optional[float] = None) -> b """ if self._reader is None: raise RuntimeError( - f"Socket for '{self._human_readable_device_name}' not set up; call setup() first" + f"Socket for '{self.human_readable_device_name}' not set up; call setup() first" ) timeout = self._read_timeout if timeout is None else timeout async with self._read_lock: @@ -165,7 +165,7 @@ async def readline(self, timeout: Optional[float] = None) -> bytes: """Wrapper around StreamReader.readline with lock and io logging.""" if self._reader is None: raise RuntimeError( - f"Socket for '{self._human_readable_device_name}' not set up; call setup() first" + f"Socket for '{self.human_readable_device_name}' not set up; call setup() first" ) timeout = self._read_timeout if timeout is None else timeout async with self._read_lock: @@ -189,7 +189,7 @@ async def readuntil(self, separator: bytes = b"\n", timeout: Optional[float] = N Do not retry on timeouts.""" if self._reader is None: raise RuntimeError( - f"Socket for '{self._human_readable_device_name}' not set up; call setup() first" + f"Socket for '{self.human_readable_device_name}' not set up; call setup() first" ) timeout = self._read_timeout if timeout is None else timeout async with self._read_lock: @@ -227,7 +227,7 @@ async def read_exact(self, num_bytes: int, timeout: Optional[float] = None) -> b """ if self._reader is None: raise RuntimeError( - f"Socket for '{self._human_readable_device_name}' not set up; call setup() first" + f"Socket for '{self.human_readable_device_name}' not set up; call setup() first" ) timeout = self._read_timeout if timeout is None else timeout data = bytearray() @@ -265,7 +265,7 @@ async def read_until_eof(self, chunk_size: int = 1024, timeout: Optional[float] while True: if self._reader is None: raise RuntimeError( - f"Socket for '{self._human_readable_device_name}' not set up; call setup() first" + f"Socket for '{self.human_readable_device_name}' not set up; call setup() first" ) try: chunk = await asyncio.wait_for(self._reader.read(chunk_size), timeout=timeout) diff --git a/pylabrobot/io/usb.py b/pylabrobot/io/usb.py index a0b4bcba101..ab54e8c53de 100644 --- a/pylabrobot/io/usb.py +++ b/pylabrobot/io/usb.py @@ -106,7 +106,7 @@ def __init__( # unique id in the logs self._unique_id = f"[{hex(self._id_vendor)}:{hex(self._id_product)}][{self._serial_number or ''}][{self._device_address or ''}]" - self._human_readable_device_name = human_readable_device_name + self.human_readable_device_name = human_readable_device_name async def write(self, data: bytes, timeout: Optional[float] = None): """Write data to the device. @@ -118,7 +118,7 @@ async def write(self, data: bytes, timeout: Optional[float] = None): """ if self.dev is None or self.read_endpoint is None: - raise RuntimeError(f"USB device for '{self._human_readable_device_name}' is not connected.") + raise RuntimeError(f"USB device for '{self.human_readable_device_name}' is not connected.") if timeout is None: timeout = self.write_timeout @@ -128,7 +128,7 @@ async def write(self, data: bytes, timeout: Optional[float] = None): write_endpoint = self.write_endpoint dev = self.dev if self._executor is None or dev is None or write_endpoint is None: - raise RuntimeError(f"Call setup() first for USB device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for USB device '{self.human_readable_device_name}'.") await loop.run_in_executor( self._executor, lambda: dev.write( @@ -169,7 +169,7 @@ def _read_packet( """ if self.dev is None or self.read_endpoint is None: - raise RuntimeError(f"USB device for '{self._human_readable_device_name}' is not connected.") + raise RuntimeError(f"USB device for '{self.human_readable_device_name}' is not connected.") ep = endpoint if endpoint is not None else self.read_endpoint if ep is None: @@ -221,7 +221,7 @@ async def read(self, timeout: Optional[int] = None, size: Optional[int] = None) """ if self.dev is None or self.read_endpoint is None: - raise RuntimeError(f"USB device for '{self._human_readable_device_name}' is not connected.") + raise RuntimeError(f"USB device for '{self.human_readable_device_name}' is not connected.") if timeout is None: timeout = self.read_timeout @@ -261,12 +261,12 @@ def read_or_timeout(): return resp raise TimeoutError( - f"Timeout while reading from USB device '{self._human_readable_device_name}'." + f"Timeout while reading from USB device '{self.human_readable_device_name}'." ) loop = asyncio.get_running_loop() if self._executor is None or self.dev is None: - raise RuntimeError(f"Call setup() first for USB device '{self._human_readable_device_name}'.") + raise RuntimeError(f"Call setup() first for USB device '{self.human_readable_device_name}'.") return await loop.run_in_executor(self._executor, read_or_timeout) def get_available_devices(self) -> List["usb.core.Device"]: @@ -283,7 +283,7 @@ def get_available_devices(self) -> List["usb.core.Device"]: if self._device_address is not None: if dev.address is None: raise RuntimeError( - f"A device address was specified for '{self._human_readable_device_name}', but the backend used for PyUSB does " + f"A device address was specified for '{self.human_readable_device_name}', but the backend used for PyUSB does " "not support device addresses." ) @@ -293,7 +293,7 @@ def get_available_devices(self) -> List["usb.core.Device"]: if self._serial_number is not None: if dev._serial_number is None: raise RuntimeError( - f"A serial number was specified for '{self._human_readable_device_name}', but the device does not have a serial number." + f"A serial number was specified for '{self.human_readable_device_name}', but the device does not have a serial number." ) if dev.serial_number != self._serial_number: @@ -331,7 +331,7 @@ def ctrl_transfer( timeout: Optional[int] = None, ) -> bytearray: if self.dev is None: - raise RuntimeError(f"USB device for '{self._human_readable_device_name}' is not connected.") + raise RuntimeError(f"USB device for '{self.human_readable_device_name}' is not connected.") if timeout is None: timeout = self.read_timeout @@ -467,7 +467,7 @@ def serialize(self) -> dict: d = { **super().serialize(), - "human_readable_device_name": self._human_readable_device_name, + "human_readable_device_name": self.human_readable_device_name, "id_vendor": self._id_vendor, "id_product": self._id_product, "device_address": self._device_address,