Data analysis and shared memory services#898
Open
cappel89 wants to merge 6 commits into
Open
Conversation
1baef84 to
db070a4
Compare
Codecov Report❌ Patch coverage is 📢 Thoughts on this report? Let us know! |
bf68ba2 to
c4b68c7
Compare
Contributor
There was a problem hiding this comment.
Pull request overview
This PR introduces a shared-memory ring buffer implementation in bec_server and wires it into BEC messaging/endpoints so device-server signals (writers) and external analysis services (readers) can exchange large payloads via shared memory while coordinating via Redis events.
Changes:
- Added a POSIX-semaphore–backed shared-memory ring buffer (
RingBuffer/RingBufferView) plus Pydantic descriptors for attaching to buffers. - Implemented a
SharedMemoryManagerservice +SharedMemoryClienthelper and a CLI launch entrypoint for running the manager. - Extended
bec_libmessage/endpoints contracts and added unit/integration-style tests for ring-buffer behavior and Redis event flow.
Reviewed changes
Copilot reviewed 12 out of 14 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| bec_server/bec_server/shared_memory/ring_buffer.py | New shared-memory ring buffer implementation with POSIX semaphore locking. |
| bec_server/bec_server/shared_memory/models.py | New Pydantic models describing payload dtype/shape and ring buffer descriptors. |
| bec_server/bec_server/shared_memory/manager.py | New service that allocates/deallocates ring buffers and publishes allocation info. |
| bec_server/bec_server/shared_memory/client.py | New client that subscribes to allocation info and reads/writes via attached views. |
| bec_server/bec_server/shared_memory/cli/launch.py | CLI launcher for the shared memory manager service. |
| bec_server/bec_server/shared_memory/README.md | Design/usage documentation for layout, locking, and ownership. |
| bec_server/tests/tests_shared_memory/test_ring_buffer.py | Unit tests for descriptor integrity, locking semantics, and resource cleanup. |
| bec_server/tests/tests_shared_memory/test_ring_buffer_event_flow.py | Multiprocess + fakeredis test validating slot-written/slot-processed flow. |
| bec_server/pyproject.toml | Adds posix_ipc dependency for semaphore support. |
| bec_lib/bec_lib/messages.py | Adds shared-memory-related BECMessage types (currently with a problematic dependency). |
| bec_lib/bec_lib/endpoints.py | Adds Redis endpoints for shared-memory allocation/info and slot event streams. |
| bec_lib/tests/test_bec_messages.py | Adds message round-trip + endpoint contract tests (currently imports bec_server). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
Comment on lines
+20
to
+22
| # TODO remove bec_server depencency.. | ||
| from bec_server.shared_memory.models import PayloadDescriptor, SharedMemInfo | ||
|
|
Comment on lines
+8
to
+15
| from bec_lib.endpoints import MessageEndpoints, MessageOp | ||
| from bec_lib.serialization import MsgpackSerialization | ||
| from bec_server.shared_memory.models import ( | ||
| DTypeDescriptor, | ||
| PayloadDescriptor, | ||
| RingBufferDescriptor, | ||
| SharedMemInfo, | ||
| ) |
Comment on lines
+289
to
+304
| def __init__(self, slots: int, payload: PayloadDescriptor, name_suffix: str = ""): | ||
| if not 0 < slots: | ||
| raise ValueError("Ring buffer must contain at least one slot.") | ||
| name = f"bec_psm_{uuid4().hex[:6]}" | ||
| reader_count_name = f"{name}_cnt" | ||
| data_lock_names = tuple(self._semaphore_name(name, f"_d_{index}") for index in range(slots)) | ||
| reader_gate_names = tuple( | ||
| self._semaphore_name(name, f"_g_{index}") for index in range(slots) | ||
| ) | ||
| reader_count_lock_names = tuple( | ||
| self._semaphore_name(name, f"_c_{index}") for index in range(slots) | ||
| ) | ||
| shm = shared_memory.SharedMemory(create=True, size=slots * payload.nbytes, name=name) | ||
| reader_count_shm = shared_memory.SharedMemory( | ||
| create=True, size=slots * READER_COUNT_DTYPE.itemsize, name=reader_count_name | ||
| ) |
Comment on lines
+44
to
+49
| logger.error( | ||
| f"Shared memory object for client {request.client_id} and signal {request.signal} already exists. Overwriting." | ||
| ) | ||
| # TODO should this republish the info? | ||
| self._publish_allocation_info(self._shared_memory_info) | ||
| return |
Comment on lines
+3
to
+16
| from typing import TYPE_CHECKING | ||
|
|
||
| from bec_lib.endpoints import MessageEndpoints | ||
| from bec_lib.logger import bec_logger | ||
| from bec_server.shared_memory.models import PayloadDescriptor | ||
| from bec_server.shared_memory.ring_buffer import RingBufferView | ||
|
|
||
| if TYPE_CHECKING: | ||
| import numpy as np | ||
|
|
||
| from bec_lib.connector import MessageObject | ||
| from bec_lib.messages import SharedMemAllocationInfo | ||
| from bec_lib.redis_connector import RedisConnector | ||
|
|
Comment on lines
+5
to
+18
| from typing import TYPE_CHECKING, Literal, Tuple | ||
|
|
||
| from bec_lib import messages | ||
| from bec_lib.bec_service import BECService | ||
| from bec_lib.endpoints import MessageEndpoints | ||
| from bec_lib.logger import bec_logger | ||
| from bec_server.shared_memory.models import SharedMemInfo | ||
| from bec_server.shared_memory.ring_buffer import RingBuffer | ||
|
|
||
| SUPPORTED_DATATYPES = Literal["str", "float", "byte", "np.array", "list", "dict"] | ||
|
|
||
| if TYPE_CHECKING: | ||
| from bec_lib.redis_connector import MessageObject, RedisConnector | ||
|
|
Comment on lines
+67
to
+72
| return cls( | ||
| nbytes=array.nbytes, | ||
| shape=array.shape, | ||
| dtype=DTypeDescriptor.from_numpy(array.dtype), | ||
| layout="C" if array.flags.c_contiguous else "C", | ||
| ) |
Comment on lines
+47
to
+49
| logger.error( | ||
| f"Ring buffer view for signal {signal} already exists, should not happend. Received info update: {buff_info}" | ||
| ) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Description
PR to introduce a shared memory service, that is responsible to allocate shared memory objects to be used by signals on the device server, and share data with dedicated 2D data analysis services. In order to test, please launch the service while you have REDIS running through shared_memory/cli/launch.py.
In addition, below are 2 scripts running a writer and reader that communicate through REDIS, and allocated memory to exchange data and calculating the sum over the image. To test, please copy the scripts below to your system and run a test.
Writer process.
Reader