Skip to content

Data analysis and shared memory services#898

Open
cappel89 wants to merge 6 commits into
mainfrom
data-analysis-and-shared-memory-services
Open

Data analysis and shared memory services#898
cappel89 wants to merge 6 commits into
mainfrom
data-analysis-and-shared-memory-services

Conversation

@cappel89
Copy link
Copy Markdown
Member

@cappel89 cappel89 commented May 19, 2026

Description

This PR introduces a shared memory service that is responsible for allocating shared memory objects used by signals on the device server, and for sharing data with dedicated 2D data analysis services. To test it, launch the service through shared_memory/cli/launch.py while REDIS is running.

In addition, below are two scripts, a writer and a reader, that communicate through REDIS and use the allocated shared memory to exchange data; the reader calculates the sum over each image. To test, copy the scripts below to your system and run them.

Writer process.

#!/usr/bin/env python3
"""Small manual writer for the shared-memory manager demo.

Run the manager first:

    python -m bec_server.shared_memory.cli.launch --bec-server localhost:6379

Then start ``scripts/shared_memory_reader.py`` and this writer script.
"""

from __future__ import annotations

import time

import numpy as np

from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
from bec_server.shared_memory.models import PayloadDescriptor
from bec_server.shared_memory.ring_buffer import RingBufferView

# Redis endpoint handed to RedisConnector.
REDIS_BOOTSTRAP = "localhost:6379"
# Client id and signal name under which the allocation is looked up in the
# published shared-memory info.
CLIENT_ID = "shared-mem-demo"
SIGNAL = "demo.image"
# Shape of every float64 image written by the demo.
IMAGE_SHAPE = (4, 4)
# Number of images the writer sends before finishing.
IMAGE_COUNT = 10
# Number of ring-buffer slots requested from the manager.
SLOTS = 2
# Upper bound, in seconds, for each wait (allocation info, processed event).
WAIT_TIMEOUT = 10.0
# Sleep, in seconds, between two polls of the allocation info.
POLL_INTERVAL = 0.1


def wait_for_view(connector: RedisConnector) -> RingBufferView:
    """Poll the shared-memory info until the demo allocation shows up.

    Args:
        connector: Connector used to fetch the published allocation info.

    Returns:
        A ``RingBufferView`` built from the buffer descriptor stored under
        ``CLIENT_ID`` / ``SIGNAL``.

    Raises:
        TimeoutError: If no matching entry appears within ``WAIT_TIMEOUT`` seconds.
    """
    give_up_at = time.monotonic() + WAIT_TIMEOUT
    while time.monotonic() < give_up_at:
        allocation_msg = connector.get(MessageEndpoints.shared_memory_info())
        entry = None
        if allocation_msg is not None:
            # Info is nested as {client_id: {signal: entry}}.
            entry = allocation_msg.info.get(CLIENT_ID, {}).get(SIGNAL)
        if entry is not None:
            return RingBufferView(entry.buffer_desc)
        time.sleep(POLL_INTERVAL)
    raise TimeoutError(
        f"Timed out waiting for shared memory allocation {CLIENT_ID}/{SIGNAL}."
    )


def wait_for_result(
    connector: RedisConnector, slot_index: int
) -> messages.SharedMemSlotProcessed:
    """Wait for the reader's "processed" event for one slot.

    Args:
        connector: Connector used to read the slot-processed stream.
        slot_index: Slot whose processed event is awaited.

    Returns:
        The first ``SharedMemSlotProcessed`` event whose client id, signal and
        slot index match this demo and ``slot_index``.

    Raises:
        TimeoutError: If no matching event arrives within ``WAIT_TIMEOUT`` seconds.
    """

    def _matches(candidate: object) -> bool:
        # Only events of the right type for this client/signal/slot count.
        return (
            isinstance(candidate, messages.SharedMemSlotProcessed)
            and candidate.client_id == CLIENT_ID
            and candidate.signal == SIGNAL
            and candidate.slot_index == slot_index
        )

    stop_at = time.monotonic() + WAIT_TIMEOUT
    while time.monotonic() < stop_at:
        # NOTE(review): block=100 is presumably a blocking wait in milliseconds
        # - confirm against RedisConnector.xread.
        batch = connector.xread(
            MessageEndpoints.shared_memory_slot_processed(), block=100, count=1
        )
        if not batch:
            continue
        for entry in batch:
            payload = entry.get("data")
            if _matches(payload):
                return payload
    raise TimeoutError(f"Timed out waiting for processed event for slot {slot_index}.")


def main() -> None:
    """Run the writer side of the shared-memory demo.

    Clears the slot-written and slot-processed streams, requests a
    ``SLOTS``-slot buffer for ``CLIENT_ID`` / ``SIGNAL``, and waits for the
    allocation to be published. It then writes ``IMAGE_COUNT`` constant-valued
    images. After each write it emits a ``SharedMemSlotWritten`` event and waits
    for the matching ``SharedMemSlotProcessed`` event before continuing.

    Cleanup steps are registered on an ``ExitStack`` as soon as they become
    necessary. They run in reverse order (close view, request deallocation,
    shut down connector), and each one runs even if an earlier one raises, so
    the connector is always shut down.

    Raises:
        TimeoutError: If the allocation or a processed event does not arrive
            in time (propagated from the wait helpers).
    """
    from contextlib import ExitStack

    connector = RedisConnector(
        REDIS_BOOTSTRAP, name="SharedMemoryDemoWriter RedisConnector"
    )

    def request_deallocation() -> None:
        # Ask the manager to release the buffer requested by this run.
        print("writer: requesting deallocation", flush=True)
        connector.xadd(
            MessageEndpoints.shared_memory_deallocate(),
            {"client_id": CLIENT_ID, "signal": SIGNAL},
            max_size=1000,
        )

    with ExitStack() as cleanup:
        # Registered first so it runs last and cannot be skipped.
        cleanup.callback(connector.shutdown, per_thread_timeout_s=1)

        payload = PayloadDescriptor.from_numpy(np.zeros(IMAGE_SHAPE, dtype=np.float64))

        # Start from empty event streams so stale events are not picked up.
        connector.delete(MessageEndpoints.shared_memory_slot_written())
        connector.delete(MessageEndpoints.shared_memory_slot_processed())

        print(
            f"writer: requesting {SLOTS}-slot buffer for {CLIENT_ID}/{SIGNAL}",
            flush=True,
        )
        connector.xadd(
            MessageEndpoints.shared_memory_allocate(),
            {
                "client_id": CLIENT_ID,
                "slots": SLOTS,
                "payload_desc": payload,
                "signal": SIGNAL,
            },
            max_size=1000,
        )
        # Only request deallocation once the allocation request was sent.
        cleanup.callback(request_deallocation)

        view = wait_for_view(connector)
        cleanup.callback(view.close)
        print(f"writer: attached to shared memory {view.name}", flush=True)

        for image_index in range(IMAGE_COUNT):
            # Each image is filled with its own index.
            image = np.full(IMAGE_SHAPE, image_index, dtype=np.float64)
            slot_index = view.write_data(image)
            print(
                f"writer: image {image_index + 1}/{IMAGE_COUNT} -> slot {slot_index}",
                flush=True,
            )

            connector.xadd(
                MessageEndpoints.shared_memory_slot_written(),
                {
                    "data": messages.SharedMemSlotWritten(
                        client_id=CLIENT_ID,
                        signal=SIGNAL,
                        slot_index=slot_index,
                    )
                },
            )

            # Wait for the reader before writing the next image.
            result = wait_for_result(connector, slot_index)
            print(
                f"writer: reader processed slot {slot_index}, result={result.result}",
                flush=True,
            )

        print("writer: done", flush=True)


# Run the writer demo only when this file is executed as a script.
if __name__ == "__main__":
    main()

Reader

#!/usr/bin/env python3
"""Small manual reader for the shared-memory manager demo."""

from __future__ import annotations

import time

import numpy as np

from bec_lib import messages
from bec_lib.endpoints import MessageEndpoints
from bec_lib.redis_connector import RedisConnector
from bec_server.shared_memory.ring_buffer import RingBufferView

# Redis endpoint handed to RedisConnector.
REDIS_BOOTSTRAP = "localhost:6379"
# Client id and signal name under which the allocation is looked up and by
# which incoming slot-written events are filtered.
CLIENT_ID = "shared-mem-demo"
SIGNAL = "demo.image"
# Number of matching slot-written events to process before exiting.
EVENT_COUNT = 10
# Sleep, in seconds, applied per event before the result is published.
PROCESSING_DELAY = 1.0
# Upper bound, in seconds, for waiting on the shared-memory allocation.
WAIT_TIMEOUT = 30.0
# Sleep, in seconds, between two polls of the allocation info.
POLL_INTERVAL = 0.1


def wait_for_view(connector: RedisConnector) -> RingBufferView:
    """Block until the demo's shared-memory allocation is published.

    Args:
        connector: Connector used to fetch the published allocation info.

    Returns:
        A ``RingBufferView`` created from the buffer descriptor found under
        ``CLIENT_ID`` / ``SIGNAL``.

    Raises:
        TimeoutError: If the entry is still missing after ``WAIT_TIMEOUT`` seconds.
    """
    expires = time.monotonic() + WAIT_TIMEOUT
    while time.monotonic() < expires:
        published = connector.get(MessageEndpoints.shared_memory_info())
        # Look up {client_id: {signal: entry}}; stays None while unpublished.
        found = (
            None
            if published is None
            else published.info.get(CLIENT_ID, {}).get(SIGNAL)
        )
        if found is None:
            time.sleep(POLL_INTERVAL)
            continue
        return RingBufferView(found.buffer_desc)
    raise TimeoutError(
        f"Timed out waiting for shared memory allocation {CLIENT_ID}/{SIGNAL}."
    )


def main() -> None:
    """Run the reader side of the shared-memory demo.

    Waits for the ``CLIENT_ID`` / ``SIGNAL`` allocation and attaches a view.
    It then consumes slot-written events until ``EVENT_COUNT`` matching events
    have been handled. For each matching event the slot data is copied, summed
    after ``PROCESSING_DELAY`` seconds, and the sum is published as a
    ``SharedMemSlotProcessed`` event.

    The stream is read from its start only until the first record has been
    received. Previously ``from_start`` stayed true while nothing had been
    *processed*, so a leading record that was skipped (wrong type, client or
    signal) could be re-read indefinitely.

    Cleanup steps are registered on an ``ExitStack``, so the connector is shut
    down even if closing the view raises.

    Raises:
        TimeoutError: If the allocation is not published in time (propagated
            from ``wait_for_view``).
    """
    from contextlib import ExitStack

    connector = RedisConnector(
        REDIS_BOOTSTRAP, name="SharedMemoryDemoReader RedisConnector"
    )

    with ExitStack() as cleanup:
        # Registered first so it runs last and cannot be skipped.
        cleanup.callback(connector.shutdown, per_thread_timeout_s=1)

        print(f"reader: waiting for {CLIENT_ID}/{SIGNAL}", flush=True)
        view = wait_for_view(connector)
        cleanup.callback(view.close)
        print(f"reader: attached to shared memory {view.name}", flush=True)

        processed = 0
        # NOTE(review): from_start presumably rewinds the stream position -
        # confirm against RedisConnector.xread. It is requested only until the
        # first record has been received.
        read_from_start = True
        while processed < EVENT_COUNT:
            records = connector.xread(
                MessageEndpoints.shared_memory_slot_written(),
                block=1000,
                count=1,
                from_start=read_from_start,
            )
            if not records:
                continue
            read_from_start = False

            event = records[0].get("data")
            if not isinstance(event, messages.SharedMemSlotWritten):
                continue
            # Ignore events for other clients or signals.
            if event.client_id != CLIENT_ID or event.signal != SIGNAL:
                continue

            data = view.copy_data(event.slot_index)
            print(
                f"reader: processing slot {event.slot_index}, shape={data.shape}",
                flush=True,
            )
            # Simulated processing time.
            time.sleep(PROCESSING_DELAY)

            result = {"sum": float(np.sum(data))}
            connector.xadd(
                MessageEndpoints.shared_memory_slot_processed(),
                {
                    "data": messages.SharedMemSlotProcessed(
                        client_id=event.client_id,
                        signal=event.signal,
                        slot_index=event.slot_index,
                        result=result,
                    )
                },
            )

            processed += 1
            print(
                f"reader: processed {processed}/{EVENT_COUNT}, slot={event.slot_index}, result={result}",
                flush=True,
            )

        print("reader: done", flush=True)


# Run the reader demo only when this file is executed as a script.
if __name__ == "__main__":
    main()

@cappel89 cappel89 force-pushed the data-analysis-and-shared-memory-services branch 2 times, most recently from 1baef84 to db070a4 Compare May 27, 2026 13:22
@cappel89 cappel89 force-pushed the data-analysis-and-shared-memory-services branch from bf68ba2 to c4b68c7 Compare June 1, 2026 13:07
@cappel89 cappel89 requested a review from wakonig June 1, 2026 13:12
@cappel89 cappel89 marked this pull request as ready for review June 1, 2026 14:06
Copilot AI review requested due to automatic review settings June 1, 2026 14:06
Copy link
Copy Markdown
Contributor

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR introduces a shared-memory ring buffer implementation in bec_server and wires it into BEC messaging/endpoints so device-server signals (writers) and external analysis services (readers) can exchange large payloads via shared memory while coordinating via Redis events.

Changes:

  • Added a POSIX-semaphore–backed shared-memory ring buffer (RingBuffer/RingBufferView) plus Pydantic descriptors for attaching to buffers.
  • Implemented a SharedMemoryManager service + SharedMemoryClient helper and a CLI launch entrypoint for running the manager.
  • Extended bec_lib message/endpoints contracts and added unit/integration-style tests for ring-buffer behavior and Redis event flow.

Reviewed changes

Copilot reviewed 12 out of 14 changed files in this pull request and generated 8 comments.

Show a summary per file
File Description
bec_server/bec_server/shared_memory/ring_buffer.py New shared-memory ring buffer implementation with POSIX semaphore locking.
bec_server/bec_server/shared_memory/models.py New Pydantic models describing payload dtype/shape and ring buffer descriptors.
bec_server/bec_server/shared_memory/manager.py New service that allocates/deallocates ring buffers and publishes allocation info.
bec_server/bec_server/shared_memory/client.py New client that subscribes to allocation info and reads/writes via attached views.
bec_server/bec_server/shared_memory/cli/launch.py CLI launcher for the shared memory manager service.
bec_server/bec_server/shared_memory/README.md Design/usage documentation for layout, locking, and ownership.
bec_server/tests/tests_shared_memory/test_ring_buffer.py Unit tests for descriptor integrity, locking semantics, and resource cleanup.
bec_server/tests/tests_shared_memory/test_ring_buffer_event_flow.py Multiprocess + fakeredis test validating slot-written/slot-processed flow.
bec_server/pyproject.toml Adds posix_ipc dependency for semaphore support.
bec_lib/bec_lib/messages.py Adds shared-memory-related BECMessage types (currently with a problematic dependency).
bec_lib/bec_lib/endpoints.py Adds Redis endpoints for shared-memory allocation/info and slot event streams.
bec_lib/tests/test_bec_messages.py Adds message round-trip + endpoint contract tests (currently imports bec_server).

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment on lines +20 to +22
# TODO remove bec_server depencency..
from bec_server.shared_memory.models import PayloadDescriptor, SharedMemInfo

Comment on lines +8 to +15
from bec_lib.endpoints import MessageEndpoints, MessageOp
from bec_lib.serialization import MsgpackSerialization
from bec_server.shared_memory.models import (
DTypeDescriptor,
PayloadDescriptor,
RingBufferDescriptor,
SharedMemInfo,
)
Comment on lines +289 to +304
def __init__(self, slots: int, payload: PayloadDescriptor, name_suffix: str = ""):
if not 0 < slots:
raise ValueError("Ring buffer must contain at least one slot.")
name = f"bec_psm_{uuid4().hex[:6]}"
reader_count_name = f"{name}_cnt"
data_lock_names = tuple(self._semaphore_name(name, f"_d_{index}") for index in range(slots))
reader_gate_names = tuple(
self._semaphore_name(name, f"_g_{index}") for index in range(slots)
)
reader_count_lock_names = tuple(
self._semaphore_name(name, f"_c_{index}") for index in range(slots)
)
shm = shared_memory.SharedMemory(create=True, size=slots * payload.nbytes, name=name)
reader_count_shm = shared_memory.SharedMemory(
create=True, size=slots * READER_COUNT_DTYPE.itemsize, name=reader_count_name
)
Comment on lines +44 to +49
logger.error(
f"Shared memory object for client {request.client_id} and signal {request.signal} already exists. Overwriting."
)
# TODO should this republish the info?
self._publish_allocation_info(self._shared_memory_info)
return
Comment on lines +3 to +16
from typing import TYPE_CHECKING

from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_server.shared_memory.models import PayloadDescriptor
from bec_server.shared_memory.ring_buffer import RingBufferView

if TYPE_CHECKING:
import numpy as np

from bec_lib.connector import MessageObject
from bec_lib.messages import SharedMemAllocationInfo
from bec_lib.redis_connector import RedisConnector

Comment on lines +5 to +18
from typing import TYPE_CHECKING, Literal, Tuple

from bec_lib import messages
from bec_lib.bec_service import BECService
from bec_lib.endpoints import MessageEndpoints
from bec_lib.logger import bec_logger
from bec_server.shared_memory.models import SharedMemInfo
from bec_server.shared_memory.ring_buffer import RingBuffer

SUPPORTED_DATATYPES = Literal["str", "float", "byte", "np.array", "list", "dict"]

if TYPE_CHECKING:
from bec_lib.redis_connector import MessageObject, RedisConnector

Comment on lines +67 to +72
return cls(
nbytes=array.nbytes,
shape=array.shape,
dtype=DTypeDescriptor.from_numpy(array.dtype),
layout="C" if array.flags.c_contiguous else "C",
)
Comment on lines +47 to +49
logger.error(
f"Ring buffer view for signal {signal} already exists, should not happend. Received info update: {buff_info}"
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants