From 07777c7ba9c6cae9624cf141bf1d1feec2af980f Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Sun, 17 May 2026 22:00:45 +0000 Subject: [PATCH 1/7] [realtime] Add shared_ring_mode for cooperative HOST_LOOP + DEVICE_LOOP Enable HOST_LOOP (CPU thread) and DEVICE_LOOP (persistent GPU kernel) dispatchers to cooperate on a single shared ring buffer. Today both dispatchers DROP slots whose function_id is not in their own table (clearing rx_flags), which would race with the cooperator's processing. With shared_ring_mode=1, unknown function_ids are SKIPPED instead -- the local cursor advances but rx_flags stays set so the matching dispatcher can claim the slot. This is a prerequisite for QEC trio dispatch (enqueue_syndromes via HOST_LOOP-launched per-round CUDA graph; get_corrections and reset_decoder via DEVICE_LOOP __device__ functions) sharing one ring. Changes: * cudaq_realtime.h: add shared_ring_mode field on cudaq_dispatcher_config_t; declare new C API cudaq_dispatch_kernel_set_shared_ring_mode(). * host_dispatcher.cu: parse_slot_with_function_table distinguishes drop (bad magic, clear rx_flags) from skip (unknown fid under shared mode, leave rx_flags set). * dispatch_kernel.cu: add __constant__ g_dispatch_shared_ring_mode and cudaMemcpyToSymbol setter. Mirror the skip-vs-drop logic in all three kernel paths (cooperative dispatch_kernel_device_call_only, regular dispatch_kernel_device_call_only, dispatch_kernel_with_graph). * Opportunistic ring scan: when rx_value == 0 at the local cursor under shared_ring_mode, scan forward for any non-zero rx_flag and jump cursor to the match. Without this, dispatchers livelock at slots the peer just cleared. Added to both host and device paths. * cudaq_realtime_api.cpp: comment documenting that the consumer (not the .so) is responsible for calling cudaq_dispatch_kernel_set_shared_ring_mode() before cudaq_dispatcher_start(). Rationale: dispatch_kernel.cu lives in the hidden-visibility static lib cudaq-realtime-dispatch.a which is --exclude-libs=ALL'd from libcudaq-realtime.so; the .so cannot reach the symbol, but consumers that link the .a directly can. Tests: * test_shared_ring_dispatchers.cu (new): brings up HOST_LOOP + DEVICE_LOOP on one pinned-mapped ring buffer with a 2-entry shared function table (GRAPH_LAUNCH at fid_A handled by HOST_LOOP, DEVICE_CALL at fid_B handled by DEVICE_LOOP). Interleaves 4 RPCs across the table; asserts each completes with the correct dispatcher's transformation and that each dispatcher's slot-count stats are exactly 2. * test_dispatch_kernel, test_host_dispatcher: continue to pass unchanged (shared_ring_mode defaults to 0, preserving today's drop-on-unknown behavior). Signed-off-by: Chuck Ketcham --- .../daemon/dispatcher/cudaq_realtime.h | 29 + .../daemon/dispatcher/cudaq_realtime_api.cpp | 14 + .../lib/daemon/dispatcher/dispatch_kernel.cu | 333 ++++++---- .../lib/daemon/dispatcher/host_dispatcher.cu | 63 +- realtime/unittests/CMakeLists.txt | 24 + .../unittests/test_shared_ring_dispatchers.cu | 623 ++++++++++++++++++ 6 files changed, 969 insertions(+), 117 deletions(-) create mode 100644 realtime/unittests/test_shared_ring_dispatchers.cu diff --git a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h index 4fbdc00ca9a..0d6aba79548 100644 --- a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h +++ b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h @@ -108,6 +108,21 @@ typedef struct { // external GPU kernel (e.g. Hololink TX) polls the // same tx_flags array; the sentinel would be // misinterpreted as a valid address. + uint32_t shared_ring_mode; // when non-zero, the dispatcher cooperates with + // OTHER dispatchers on the SAME ring buffer. + // Slots whose function_id is not in this + // dispatcher's function table (or is in the + // table but does not match this dispatcher's + // expected dispatch_mode) are SKIPPED without + // clearing rx_flags -- the local cursor + // advances, leaving the slot for another + // dispatcher to pick up. When zero (default), + // legacy behavior: unknown / wrong-mode slots + // are DROPPED (rx_flags cleared). Both + // dispatchers sharing a ring must set this to + // non-zero; the partitioning invariant is that + // each function_id appears in AT MOST ONE + // dispatcher's function table. } cudaq_dispatcher_config_t; // GPU ring buffer pointers. For device backend use device pointers only. @@ -373,6 +388,20 @@ cudaError_t cudaq_dispatch_kernel_cooperative_query_occupancy(int *out_blocks, uint32_t threads_per_block); +// Push the shared_ring_mode flag into the DEVICE_LOOP kernel's __constant__ +// memory. Must be called BEFORE cudaq_dispatcher_start() launches the +// device kernel; otherwise the kernel will start with shared_ring_mode=0. +// +// IMPORTANT: cudaq_dispatcher_start() does NOT call this for you. The +// __constant__ symbol lives in libcudaq-realtime-dispatch.a, which is +// linked directly into consumers (not into libcudaq-realtime.so), so the +// dispatcher manager cannot reach the symbol from inside the shared +// library. Consumers that set config.shared_ring_mode = 1 must also call +// cudaq_dispatch_kernel_set_shared_ring_mode(1) before starting the +// dispatcher. The HOST_LOOP path reads config.shared_ring_mode directly +// and does NOT require this call. +cudaError_t cudaq_dispatch_kernel_set_shared_ring_mode(uint32_t enabled); + #ifdef __cplusplus } #endif diff --git a/realtime/lib/daemon/dispatcher/cudaq_realtime_api.cpp b/realtime/lib/daemon/dispatcher/cudaq_realtime_api.cpp index 57737e17308..10d8542fbb6 100644 --- a/realtime/lib/daemon/dispatcher/cudaq_realtime_api.cpp +++ b/realtime/lib/daemon/dispatcher/cudaq_realtime_api.cpp @@ -222,6 +222,20 @@ cudaq_status_t cudaq_dispatcher_start(cudaq_dispatcher_t *dispatcher) { if (cudaStreamCreate(&dispatcher->stream) != cudaSuccess) return CUDAQ_ERR_CUDA; + // NOTE on config.shared_ring_mode for DEVICE_LOOP: + // + // The device dispatch kernel reads shared_ring_mode from a __constant__ + // symbol that lives in libcudaq-realtime-dispatch.a (the static lib). + // libcudaq-realtime.so does NOT link the static lib (architecturally + // separate: consumers link the static lib themselves), so we cannot + // call cudaq_dispatch_kernel_set_shared_ring_mode() from here. + // + // Callers that want shared_ring_mode for DEVICE_LOOP must invoke + // cudaq_dispatch_kernel_set_shared_ring_mode(1) themselves BEFORE + // cudaq_dispatcher_start(). The HOST_LOOP path reads + // config.shared_ring_mode directly from this struct (it has no + // __constant__ indirection) -- nothing needed here. + if (dispatcher->config.kernel_type == CUDAQ_KERNEL_UNIFIED) { dispatcher->unified_launch_fn( dispatcher->transport_ctx, dispatcher->table.entries, diff --git a/realtime/lib/daemon/dispatcher/dispatch_kernel.cu b/realtime/lib/daemon/dispatcher/dispatch_kernel.cu index 82c3e030172..65fb7a8f98a 100644 --- a/realtime/lib/daemon/dispatcher/dispatch_kernel.cu +++ b/realtime/lib/daemon/dispatcher/dispatch_kernel.cu @@ -23,6 +23,14 @@ namespace cudaq::realtime { // Dispatch Kernel Implementation (compiled into libcudaq-realtime.so) //============================================================================== +/// @brief Shared-ring-mode flag pushed from the host via +/// cudaq_dispatch_kernel_set_shared_ring_mode(). When non-zero, the device +/// dispatcher SKIPS slots whose function_id is not in its function table +/// (cursor advances, rx_flags NOT cleared) so a peer dispatcher on the same +/// ring buffer can pick them up. When zero (default), the dispatcher +/// DROPS unknown slots (clears rx_flags). +__constant__ std::uint32_t g_dispatch_shared_ring_mode = 0; + /// @brief Lookup function entry in table by function_id. __device__ inline const cudaq_function_entry_t* dispatch_lookup_entry( std::uint32_t function_id, @@ -96,9 +104,34 @@ __global__ void dispatch_kernel_device_call_only( while (!(*shutdown_flag)) { // --- Phase 1: Thread 0 polls and parses --- + // Skip / drop disposition for the polled slot (only meaningful to + // tid == 0). When `skip_slot` is true the cursor advances WITHOUT + // clearing rx_flags -- a peer dispatcher on a shared ring will + // handle the request. When `drop_slot` is true the cursor advances + // AND rx_flags is cleared (bad magic, or legacy unknown-function + // path). + bool skip_slot = false; + bool drop_slot = false; if (tid == 0) { s_have_work = false; std::uint64_t rx_value = rx_flags[current_slot]; + // Under shared_ring_mode, scan the ring for non-zero rx_flag if + // our cursor sees 0 (the peer may have cleared the slot at our + // cursor). + if (rx_value == 0 && g_dispatch_shared_ring_mode) { + std::size_t probe = (current_slot + 1) % num_slots; + std::size_t scanned = 0; + while (scanned < num_slots - 1) { + std::uint64_t v = rx_flags[probe]; + if (v != 0) { + current_slot = probe; + rx_value = v; + break; + } + probe = (probe + 1) % num_slots; + ++scanned; + } + } if (rx_value != 0) { void* rx_slot = reinterpret_cast(rx_value); RPCHeader* header = static_cast(rx_slot); @@ -128,9 +161,20 @@ __global__ void dispatch_kernel_device_call_only( d_request_id = s_request_id; d_ptp_timestamp = s_ptp_timestamp; d_have_work = true; + } else if (g_dispatch_shared_ring_mode) { + // shared_ring_mode: function not in our table OR wrong mode + // -> SKIP without clearing rx_flags so the peer dispatcher + // can pick it up. + skip_slot = true; + } else { + // Legacy: drop unknown / wrong-mode slot. + drop_slot = true; } + } else { + // Bad magic -- always drop regardless of shared_ring_mode. + drop_slot = true; } - if (!s_have_work) { + if (drop_slot) { rx_flags[current_slot] = 0; } } @@ -179,28 +223,34 @@ __global__ void dispatch_kernel_device_call_only( // --- Phase 4: Sync, then thread 0 writes response --- KernelType::sync(); - if (tid == 0 && have_work) { - std::uint8_t* tx_slot = tx_data + current_slot * tx_stride_sz; - RPCResponse* response = reinterpret_cast(tx_slot); - response->magic = RPC_MAGIC_RESPONSE; - response->status = status; - response->result_len = result_len; - response->request_id = request_id; - response->ptp_timestamp = ptp_timestamp; + if (tid == 0) { + if (have_work) { + std::uint8_t* tx_slot = tx_data + current_slot * tx_stride_sz; + RPCResponse* response = reinterpret_cast(tx_slot); + response->magic = RPC_MAGIC_RESPONSE; + response->status = status; + response->result_len = result_len; + response->request_id = request_id; + response->ptp_timestamp = ptp_timestamp; - while (tx_flags[current_slot] != 0 && !(*shutdown_flag)) - ; + while (tx_flags[current_slot] != 0 && !(*shutdown_flag)) + ; - __threadfence(); - tx_flags[current_slot] = reinterpret_cast(tx_slot); + __threadfence(); + tx_flags[current_slot] = reinterpret_cast(tx_slot); - rx_flags[current_slot] = 0; - local_packet_count++; - current_slot = (current_slot + 1) % num_slots; - } + rx_flags[current_slot] = 0; + local_packet_count++; + current_slot = (current_slot + 1) % num_slots; + } else if (skip_slot || drop_slot) { + // Advance past the slot we just skipped/dropped. For drop_slot, + // rx_flags was already cleared during Phase 1. For skip_slot, + // rx_flags is intentionally left set so a peer dispatcher on a + // shared ring can pick it up. + current_slot = (current_slot + 1) % num_slots; + } - // Reset device-memory work flag for next iteration - if (tid == 0) { + // Reset device-memory work flag for next iteration d_have_work = false; } @@ -208,60 +258,84 @@ __global__ void dispatch_kernel_device_call_only( } } else { //========================================================================== - // Regular path: only thread 0 calls the handler (unchanged). + // Regular path: only thread 0 calls the handler. //========================================================================== while (!(*shutdown_flag)) { if (tid == 0) { std::uint64_t rx_value = rx_flags[current_slot]; + // Under shared_ring_mode, rx_value == 0 at our cursor does NOT + // mean "no work" -- the peer dispatcher may have cleared this + // slot. Scan the ring for ANY non-zero rx_flag and jump our + // cursor there. + if (rx_value == 0 && g_dispatch_shared_ring_mode) { + std::size_t probe = (current_slot + 1) % num_slots; + std::size_t scanned = 0; + while (scanned < num_slots - 1) { + std::uint64_t v = rx_flags[probe]; + if (v != 0) { + current_slot = probe; + rx_value = v; + break; + } + probe = (probe + 1) % num_slots; + ++scanned; + } + } if (rx_value != 0) { // RX data address comes from rx_flags (set by Hololink RX kernel // or host test harness to the address of the RX data slot) void* rx_slot = reinterpret_cast(rx_value); RPCHeader* header = static_cast(rx_slot); if (header->magic != RPC_MAGIC_REQUEST) { + // Bad magic -- always drop and advance. rx_flags[current_slot] = 0; - continue; - } + current_slot = (current_slot + 1) % num_slots; + } else { + std::uint32_t function_id = header->function_id; + std::uint32_t arg_len = header->arg_len; + void* arg_buffer = static_cast(header + 1); - std::uint32_t function_id = header->function_id; - std::uint32_t arg_len = header->arg_len; - void* arg_buffer = static_cast(header + 1); + const cudaq_function_entry_t* entry = dispatch_lookup_entry( + function_id, function_table, func_count); - const cudaq_function_entry_t* entry = dispatch_lookup_entry( - function_id, function_table, func_count); + if (entry != nullptr && + entry->dispatch_mode == CUDAQ_DISPATCH_DEVICE_CALL) { + DeviceRPCFunction func = + reinterpret_cast(entry->handler.device_fn_ptr); - if (entry != nullptr && entry->dispatch_mode == CUDAQ_DISPATCH_DEVICE_CALL) { - DeviceRPCFunction func = - reinterpret_cast(entry->handler.device_fn_ptr); - - // Compute TX slot address from symmetric TX data buffer - std::uint8_t* tx_slot = tx_data + current_slot * tx_stride_sz; - - // Handler writes results directly to TX slot (after response header) - std::uint8_t* output_buffer = tx_slot + sizeof(RPCResponse); - std::uint32_t result_len = 0; - std::uint32_t max_result_len = tx_stride_sz - sizeof(RPCResponse); - int status = func(arg_buffer, output_buffer, arg_len, - max_result_len, &result_len); - - // Write RPC response header to TX slot - RPCResponse* response = reinterpret_cast(tx_slot); - response->magic = RPC_MAGIC_RESPONSE; - response->status = status; - response->result_len = result_len; - response->request_id = header->request_id; - response->ptp_timestamp = header->ptp_timestamp; - - while (tx_flags[current_slot] != 0 && !(*shutdown_flag)) - ; - - __threadfence(); - tx_flags[current_slot] = reinterpret_cast(tx_slot); - } + std::uint8_t* tx_slot = tx_data + current_slot * tx_stride_sz; + std::uint8_t* output_buffer = tx_slot + sizeof(RPCResponse); + std::uint32_t result_len = 0; + std::uint32_t max_result_len = tx_stride_sz - sizeof(RPCResponse); + int status = func(arg_buffer, output_buffer, arg_len, + max_result_len, &result_len); + + RPCResponse* response = reinterpret_cast(tx_slot); + response->magic = RPC_MAGIC_RESPONSE; + response->status = status; + response->result_len = result_len; + response->request_id = header->request_id; + response->ptp_timestamp = header->ptp_timestamp; + + while (tx_flags[current_slot] != 0 && !(*shutdown_flag)) + ; - rx_flags[current_slot] = 0; - local_packet_count++; - current_slot = (current_slot + 1) % num_slots; + __threadfence(); + tx_flags[current_slot] = reinterpret_cast(tx_slot); + + rx_flags[current_slot] = 0; + local_packet_count++; + current_slot = (current_slot + 1) % num_slots; + } else if (g_dispatch_shared_ring_mode) { + // shared_ring_mode: function not ours -> SKIP without + // clearing rx_flags so the peer dispatcher can handle it. + current_slot = (current_slot + 1) % num_slots; + } else { + // Legacy: drop unknown / wrong-mode slot and advance. + rx_flags[current_slot] = 0; + current_slot = (current_slot + 1) % num_slots; + } + } } } @@ -298,71 +372,99 @@ __global__ void dispatch_kernel_with_graph( while (!(*shutdown_flag)) { if (tid == 0) { std::uint64_t rx_value = rx_flags[current_slot]; + // Under shared_ring_mode, scan the ring for non-zero rx_flag if our + // cursor sees 0 (the peer may have cleared the slot at our cursor). + if (rx_value == 0 && g_dispatch_shared_ring_mode) { + std::size_t probe = (current_slot + 1) % num_slots; + std::size_t scanned = 0; + while (scanned < num_slots - 1) { + std::uint64_t v = rx_flags[probe]; + if (v != 0) { + current_slot = probe; + rx_value = v; + break; + } + probe = (probe + 1) % num_slots; + ++scanned; + } + } if (rx_value != 0) { void* rx_slot = reinterpret_cast(rx_value); RPCHeader* header = static_cast(rx_slot); if (header->magic != RPC_MAGIC_REQUEST) { + // Bad magic -- always drop and advance. rx_flags[current_slot] = 0; - continue; - } + current_slot = (current_slot + 1) % num_slots; + } else { + std::uint32_t function_id = header->function_id; + std::uint32_t arg_len = header->arg_len; + void* arg_buffer = static_cast(header + 1); + + const cudaq_function_entry_t* entry = dispatch_lookup_entry( + function_id, function_table, func_count); + + // Compute TX slot address from symmetric TX data buffer + std::uint8_t* tx_slot = tx_data + current_slot * tx_stride_sz; + + bool handled = false; + if (entry != nullptr) { + if (entry->dispatch_mode == CUDAQ_DISPATCH_DEVICE_CALL) { + DeviceRPCFunction func = + reinterpret_cast(entry->handler.device_fn_ptr); + + std::uint8_t* output_buffer = tx_slot + sizeof(RPCResponse); + std::uint32_t result_len = 0; + std::uint32_t max_result_len = tx_stride_sz - sizeof(RPCResponse); + int status = func(arg_buffer, output_buffer, arg_len, + max_result_len, &result_len); + + RPCResponse* response = reinterpret_cast(tx_slot); + response->magic = RPC_MAGIC_RESPONSE; + response->status = status; + response->result_len = result_len; + response->request_id = header->request_id; + response->ptp_timestamp = header->ptp_timestamp; + + while (tx_flags[current_slot] != 0 && !(*shutdown_flag)) + ; - std::uint32_t function_id = header->function_id; - std::uint32_t arg_len = header->arg_len; - void* arg_buffer = static_cast(header + 1); - - const cudaq_function_entry_t* entry = dispatch_lookup_entry( - function_id, function_table, func_count); - - // Compute TX slot address from symmetric TX data buffer - std::uint8_t* tx_slot = tx_data + current_slot * tx_stride_sz; - - if (entry != nullptr) { - if (entry->dispatch_mode == CUDAQ_DISPATCH_DEVICE_CALL) { - DeviceRPCFunction func = - reinterpret_cast(entry->handler.device_fn_ptr); - - // Handler writes results directly to TX slot (after response header) - std::uint8_t* output_buffer = tx_slot + sizeof(RPCResponse); - std::uint32_t result_len = 0; - std::uint32_t max_result_len = tx_stride_sz - sizeof(RPCResponse); - int status = func(arg_buffer, output_buffer, arg_len, - max_result_len, &result_len); - - // Write RPC response to TX slot - RPCResponse* response = reinterpret_cast(tx_slot); - response->magic = RPC_MAGIC_RESPONSE; - response->status = status; - response->result_len = result_len; - response->request_id = header->request_id; - response->ptp_timestamp = header->ptp_timestamp; - - while (tx_flags[current_slot] != 0 && !(*shutdown_flag)) - ; - - __threadfence(); - tx_flags[current_slot] = reinterpret_cast(tx_slot); - } -#if __CUDA_ARCH__ >= 900 - else if (entry->dispatch_mode == CUDAQ_DISPATCH_GRAPH_LAUNCH) { - if (graph_io_ctx != nullptr) { - graph_io_ctx->rx_slot = rx_slot; - graph_io_ctx->tx_slot = tx_slot; - graph_io_ctx->tx_flag = &tx_flags[current_slot]; - graph_io_ctx->tx_flag_value = - reinterpret_cast(tx_slot); - graph_io_ctx->tx_stride_sz = tx_stride_sz; __threadfence(); + tx_flags[current_slot] = reinterpret_cast(tx_slot); + handled = true; + } +#if __CUDA_ARCH__ >= 900 + else if (entry->dispatch_mode == CUDAQ_DISPATCH_GRAPH_LAUNCH) { + if (graph_io_ctx != nullptr) { + graph_io_ctx->rx_slot = rx_slot; + graph_io_ctx->tx_slot = tx_slot; + graph_io_ctx->tx_flag = &tx_flags[current_slot]; + graph_io_ctx->tx_flag_value = + reinterpret_cast(tx_slot); + graph_io_ctx->tx_stride_sz = tx_stride_sz; + __threadfence(); + } + + cudaGraphLaunch(entry->handler.graph_exec, + cudaStreamGraphFireAndForget); + handled = true; } +#endif // __CUDA_ARCH__ >= 900 + } - cudaGraphLaunch(entry->handler.graph_exec, - cudaStreamGraphFireAndForget); + if (handled) { + rx_flags[current_slot] = 0; + local_packet_count++; + current_slot = (current_slot + 1) % num_slots; + } else if (g_dispatch_shared_ring_mode) { + // shared_ring_mode: function not ours -> SKIP without clearing + // rx_flags so the peer dispatcher can handle it. + current_slot = (current_slot + 1) % num_slots; + } else { + // Legacy: drop unknown / unhandled slot and advance. + rx_flags[current_slot] = 0; + current_slot = (current_slot + 1) % num_slots; } -#endif // __CUDA_ARCH__ >= 900 } - - rx_flags[current_slot] = 0; - local_packet_count++; - current_slot = (current_slot + 1) % num_slots; } } @@ -407,6 +509,13 @@ extern "C" cudaError_t cudaq_dispatch_kernel_cooperative_query_occupancy( return cudaSuccess; } +extern "C" cudaError_t +cudaq_dispatch_kernel_set_shared_ring_mode(uint32_t enabled) { + return cudaMemcpyToSymbol(cudaq::realtime::g_dispatch_shared_ring_mode, + &enabled, sizeof(enabled), 0, + cudaMemcpyHostToDevice); +} + extern "C" void cudaq_launch_dispatch_kernel_regular( volatile std::uint64_t* rx_flags, volatile std::uint64_t* tx_flags, diff --git a/realtime/lib/daemon/dispatcher/host_dispatcher.cu b/realtime/lib/daemon/dispatcher/host_dispatcher.cu index b79b03eb9f6..43b539528f9 100644 --- a/realtime/lib/daemon/dispatcher/host_dispatcher.cu +++ b/realtime/lib/daemon/dispatcher/host_dispatcher.cu @@ -56,7 +56,9 @@ find_idle_graph_worker_for_function(const cudaq_host_dispatch_loop_ctx_t *ctx, struct ParsedSlot { uint32_t function_id = 0; const cudaq_function_entry_t *entry = nullptr; - bool drop = false; + bool drop = false; // bad header -- clear rx_flags and advance + bool skip = false; // function not in our table -- advance WITHOUT clearing + // (only set when shared_ring_mode is non-zero) }; static ParsedSlot @@ -71,8 +73,12 @@ parse_slot_with_function_table(void *slot_host, out.function_id = header->function_id; out.entry = lookup_function(ctx->function_table.entries, ctx->function_table.count, out.function_id); - if (!out.entry) - out.drop = true; + if (!out.entry) { + if (ctx->config.shared_ring_mode) + out.skip = true; + else + out.drop = true; + } return out; } @@ -197,8 +203,42 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *ctx) { if (rx_value == 0) { if (!ctx->skip_stream_sweep) sweep_completed_workers(ctx); - CUDAQ_REALTIME_CPU_RELAX(); - continue; + // Under shared_ring_mode, rx_value == 0 at our local cursor does NOT + // mean "no work anywhere on the ring" -- the peer dispatcher may + // have cleared this slot after handling it. Scan the rest of the + // ring looking for ANY non-zero rx_flag; if we find one, jump our + // cursor there. If we wrap all the way back without finding any, + // fall through to the normal CPU_RELAX wait. + if (ctx->config.shared_ring_mode) { + size_t probe = (current_slot + 1) % num_slots; + size_t scanned = 0; + while (scanned < num_slots - 1) { + uint64_t v = as_atomic_u64(ctx->ringbuffer.rx_flags_host)[probe] + .load(cuda::std::memory_order_acquire); + if (v != 0) { + current_slot = probe; + break; + } + probe = (probe + 1) % num_slots; + ++scanned; + } + if (scanned >= num_slots - 1) { + // Truly idle: no slot has work for anyone right now. + CUDAQ_REALTIME_CPU_RELAX(); + continue; + } + // Re-load rx_value at the new cursor position and fall through. + rx_value = + as_atomic_u64(ctx->ringbuffer.rx_flags_host)[current_slot].load( + cuda::std::memory_order_acquire); + if (rx_value == 0) { + CUDAQ_REALTIME_CPU_RELAX(); + continue; + } + } else { + CUDAQ_REALTIME_CPU_RELAX(); + continue; + } } void *slot_host = reinterpret_cast(rx_value); @@ -214,11 +254,24 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *ctx) { current_slot = (current_slot + 1) % num_slots; continue; } + if (parsed.skip) { + // shared_ring_mode: leave rx_flags set so a peer dispatcher can pick + // this slot up; just advance our local cursor. + current_slot = (current_slot + 1) % num_slots; + continue; + } function_id = parsed.function_id; entry = parsed.entry; } if (entry && entry->dispatch_mode != CUDAQ_DISPATCH_GRAPH_LAUNCH) { + if (ctx->config.shared_ring_mode) { + // Entry is in our table but is not a GRAPH_LAUNCH (e.g. a DEVICE_CALL + // entry registered for a peer dispatcher). Under shared_ring_mode + // the peer will service it -- skip without clearing rx_flags. + current_slot = (current_slot + 1) % num_slots; + continue; + } as_atomic_u64(ctx->ringbuffer.rx_flags_host)[current_slot].store( 0, cuda::std::memory_order_release); current_slot = (current_slot + 1) % num_slots; diff --git a/realtime/unittests/CMakeLists.txt b/realtime/unittests/CMakeLists.txt index 3ea6c764e89..cbf756d6893 100644 --- a/realtime/unittests/CMakeLists.txt +++ b/realtime/unittests/CMakeLists.txt @@ -100,6 +100,30 @@ if(CMAKE_CUDA_COMPILER) TEST_PREFIX "test_host_dispatcher." ) message(STATUS " - test_host_dispatcher (host dispatcher loop)") + + # Shared-ring-mode end-to-end test: HOST_LOOP + DEVICE_LOOP on ONE ring. + add_executable(test_shared_ring_dispatchers test_shared_ring_dispatchers.cu) + set_target_properties(test_shared_ring_dispatchers PROPERTIES + CUDA_SEPARABLE_COMPILATION ON + CUDA_STANDARD 17 + ) + target_include_directories(test_shared_ring_dispatchers PRIVATE + ${CUDAToolkit_INCLUDE_DIRS} + ${CUDAQ_REALTIME_INCLUDE_DIR} + ) + target_link_libraries(test_shared_ring_dispatchers PRIVATE + GTest::gtest_main + CUDA::cudart + cudaq-realtime + cudaq-realtime-dispatch + cudaq-realtime-host-dispatch + ${CUDADEVRT_LIBRARY} + ) + add_dependencies(CudaqRealtimeUnitTests test_shared_ring_dispatchers) + gtest_discover_tests(test_shared_ring_dispatchers + TEST_PREFIX "test_shared_ring_dispatchers." + ) + message(STATUS " - test_shared_ring_dispatchers (HOST_LOOP + DEVICE_LOOP on one ring)") endif() # ============================================================================== diff --git a/realtime/unittests/test_shared_ring_dispatchers.cu b/realtime/unittests/test_shared_ring_dispatchers.cu new file mode 100644 index 00000000000..8d7821e9590 --- /dev/null +++ b/realtime/unittests/test_shared_ring_dispatchers.cu @@ -0,0 +1,623 @@ +/****************************************************************-*- C++ -*-**** + * Copyright (c) 2026 NVIDIA Corporation & Affiliates. * + * All rights reserved. * + * * + * This source code and the accompanying materials are made available under * + * the terms of the Apache License 2.0 which accompanies this distribution. * + ******************************************************************************/ + +// Shared-ring-mode test: brings up BOTH a HOST_LOOP CPU dispatcher AND a +// DEVICE_LOOP persistent GPU dispatcher on the SAME ring buffer, with +// shared_ring_mode = 1 on both. The HOST_LOOP owns one function_id (a +// GRAPH_LAUNCH entry) and the DEVICE_LOOP owns a different function_id (a +// DEVICE_CALL entry). Producer interleaves requests for the two function_ids +// across slots; the test verifies that each dispatcher services its OWN +// requests and SKIPS the peer's slots without clobbering rx_flags. + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "cudaq/realtime/daemon/dispatcher/cudaq_realtime.h" +#include "cudaq/realtime/daemon/dispatcher/dispatch_kernel_launch.h" +#include "cudaq/realtime/daemon/dispatcher/host_dispatcher.h" + +#define CUDA_CHECK(call) \ + do { \ + cudaError_t err = call; \ + ASSERT_EQ(err, cudaSuccess) << "CUDA error: " << cudaGetErrorString(err); \ + } while (0) + +namespace { + +constexpr std::size_t kNumSlots = 8; +constexpr std::size_t kSlotSize = 256; + +// function_id for the GRAPH_LAUNCH entry owned by the HOST_LOOP dispatcher. +constexpr std::uint32_t HOST_GRAPH_FN_ID = + cudaq::realtime::fnv1a_hash("shared_ring_host_increment"); + +// function_id for the DEVICE_CALL entry owned by the DEVICE_LOOP dispatcher. +constexpr std::uint32_t DEVICE_CALL_FN_ID = + cudaq::realtime::fnv1a_hash("shared_ring_device_double"); + +//============================================================================== +// Ring buffer / control buffer helpers +//============================================================================== + +bool allocate_ring_buffer(std::size_t num_slots, std::size_t slot_size, + volatile uint64_t** host_flags_out, + volatile uint64_t** device_flags_out, + std::uint8_t** host_data_out, + std::uint8_t** device_data_out) { + void* host_flags_ptr = nullptr; + if (cudaHostAlloc(&host_flags_ptr, num_slots * sizeof(uint64_t), + cudaHostAllocMapped) != cudaSuccess) + return false; + + void* device_flags_ptr = nullptr; + if (cudaHostGetDevicePointer(&device_flags_ptr, host_flags_ptr, 0) != + cudaSuccess) { + cudaFreeHost(host_flags_ptr); + return false; + } + + void* host_data_ptr = nullptr; + if (cudaHostAlloc(&host_data_ptr, num_slots * slot_size, + cudaHostAllocMapped) != cudaSuccess) { + cudaFreeHost(host_flags_ptr); + return false; + } + + void* device_data_ptr = nullptr; + if (cudaHostGetDevicePointer(&device_data_ptr, host_data_ptr, 0) != + cudaSuccess) { + cudaFreeHost(host_flags_ptr); + cudaFreeHost(host_data_ptr); + return false; + } + + std::memset(host_flags_ptr, 0, num_slots * sizeof(uint64_t)); + std::memset(host_data_ptr, 0, num_slots * slot_size); + + *host_flags_out = static_cast(host_flags_ptr); + *device_flags_out = static_cast(device_flags_ptr); + *host_data_out = static_cast(host_data_ptr); + *device_data_out = static_cast(device_data_ptr); + return true; +} + +void free_ring_buffer(volatile uint64_t* host_flags, std::uint8_t* host_data) { + if (host_flags) + cudaFreeHost(const_cast(host_flags)); + if (host_data) + cudaFreeHost(host_data); +} + +//============================================================================== +// HOST_LOOP graph kernel: reads RPC slot via mailbox, writes incremented bytes +// to the TX slot. The dispatcher fills a GraphIOContext per-launch via its +// io_ctxs path; we use the simpler "mailbox holds raw RX slot pointer" mode +// and write the response in-place (legacy single-buffer mode is fine since +// we wire rx_data and tx_data to the SAME backing memory). +//============================================================================== + +__global__ void host_graph_increment_kernel(void** mailbox_slot_ptr) { + if (threadIdx.x == 0 && blockIdx.x == 0) { + void* buffer = *mailbox_slot_ptr; + cudaq::realtime::RPCHeader* header = + static_cast(buffer); + std::uint32_t arg_len = header->arg_len; + std::uint32_t request_id = header->request_id; + std::uint8_t* data = static_cast(buffer) + + sizeof(cudaq::realtime::RPCHeader); + for (std::uint32_t i = 0; i < arg_len; ++i) + data[i] = data[i] + 1; + cudaq::realtime::RPCResponse* response = + static_cast(buffer); + response->magic = cudaq::realtime::RPC_MAGIC_RESPONSE; + response->status = 0; + response->result_len = arg_len; + response->request_id = request_id; + } +} + +bool create_host_graph(void** d_mailbox_bank, cudaGraph_t* graph_out, + cudaGraphExec_t* exec_out) { + cudaGraph_t graph = nullptr; + if (cudaGraphCreate(&graph, 0) != cudaSuccess) + return false; + + cudaKernelNodeParams params = {}; + void* kernel_args[] = {&d_mailbox_bank}; + params.func = reinterpret_cast(host_graph_increment_kernel); + params.gridDim = dim3(1, 1, 1); + params.blockDim = dim3(32, 1, 1); + params.sharedMemBytes = 0; + params.kernelParams = kernel_args; + params.extra = nullptr; + + cudaGraphNode_t node = nullptr; + if (cudaGraphAddKernelNode(&node, graph, nullptr, 0, ¶ms) != + cudaSuccess) { + cudaGraphDestroy(graph); + return false; + } + + cudaGraphExec_t exec = nullptr; + if (cudaGraphInstantiate(&exec, graph, nullptr, nullptr, 0) != cudaSuccess) { + cudaGraphDestroy(graph); + return false; + } + + *graph_out = graph; + *exec_out = exec; + return true; +} + +//============================================================================== +// DEVICE_LOOP device-call handler: doubles each byte. +//============================================================================== + +__device__ int device_double_handler(const void* input, void* output, + std::uint32_t arg_len, + std::uint32_t max_result_len, + std::uint32_t* result_len) { + const std::uint8_t* in = static_cast(input); + std::uint8_t* out = static_cast(output); + std::uint32_t n = arg_len; + if (n > max_result_len) + n = max_result_len; + for (std::uint32_t i = 0; i < n; ++i) + out[i] = static_cast(in[i] * 2); + *result_len = n; + return 0; +} + +// Populate the device function table: +// entry 0: GRAPH_LAUNCH owned by HOST_LOOP (handler.graph_exec set below) +// entry 1: DEVICE_CALL owned by DEVICE_LOOP (handler.device_fn_ptr = +// device_double_handler) +// +// Both dispatchers share the SAME function table. HOST_LOOP iterates and +// only routes GRAPH_LAUNCH entries; DEVICE_LOOP only routes DEVICE_CALL +// entries. Under shared_ring_mode this means each peer naturally skips the +// other's slots. +__global__ void init_shared_function_table(cudaq_function_entry_t* entries, + cudaGraphExec_t host_graph_exec) { + if (threadIdx.x == 0 && blockIdx.x == 0) { + entries[0].handler.graph_exec = host_graph_exec; + entries[0].function_id = HOST_GRAPH_FN_ID; + entries[0].dispatch_mode = CUDAQ_DISPATCH_GRAPH_LAUNCH; + entries[0].reserved[0] = 0; + entries[0].reserved[1] = 0; + entries[0].reserved[2] = 0; + + entries[1].handler.device_fn_ptr = + reinterpret_cast(&device_double_handler); + entries[1].function_id = DEVICE_CALL_FN_ID; + entries[1].dispatch_mode = CUDAQ_DISPATCH_DEVICE_CALL; + entries[1].reserved[0] = 0; + entries[1].reserved[1] = 0; + entries[1].reserved[2] = 0; + } +} + +//============================================================================== +// Test fixture +//============================================================================== + +class SharedRingDispatcherTest : public ::testing::Test { +protected: + void SetUp() override { + // -- Ring buffer. RX and TX share the same backing memory: this is + // valid for the HOST_LOOP's "legacy mailbox" path (graph kernel + // writes response in-place into the RX slot), and the DEVICE_LOOP + // kernel will then read its own request and overwrite it with the + // response in the same slot. TX flags are a separate allocation. -- + ASSERT_TRUE(allocate_ring_buffer(kNumSlots, kSlotSize, &rx_flags_host_, + &rx_flags_dev_, &rx_data_host_, + &rx_data_dev_)); + void* tx_flags_host_ptr = nullptr; + CUDA_CHECK(cudaHostAlloc(&tx_flags_host_ptr, kNumSlots * sizeof(uint64_t), + cudaHostAllocMapped)); + std::memset(tx_flags_host_ptr, 0, kNumSlots * sizeof(uint64_t)); + tx_flags_host_ = static_cast(tx_flags_host_ptr); + void* tx_flags_dev_ptr = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&tx_flags_dev_ptr, tx_flags_host_ptr, + 0)); + tx_flags_dev_ = static_cast(tx_flags_dev_ptr); + // RX and TX data buffers point to the SAME backing memory. + tx_data_host_ = rx_data_host_; + tx_data_dev_ = rx_data_dev_; + tx_data_is_owned_ = false; + + // -- Shutdown flag (pinned mapped so both CPU and GPU see it) -- + void* tmp = nullptr; + CUDA_CHECK(cudaHostAlloc(&tmp, sizeof(int), cudaHostAllocMapped)); + shutdown_flag_host_ = static_cast(tmp); + *shutdown_flag_host_ = 0; + void* tmp_dev = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&tmp_dev, tmp, 0)); + shutdown_flag_dev_ = static_cast(tmp_dev); + + // The HOST_LOOP wants a cuda::std::atomic* opaque shutdown flag, + // backed by the SAME pinned memory so both dispatchers stop together + // when *shutdown_flag_host_ = 1. + host_loop_shutdown_atomic_ = + reinterpret_cast*>( + const_cast(shutdown_flag_host_)); + + // -- Stats -- + CUDA_CHECK(cudaMalloc(&device_loop_stats_, sizeof(uint64_t))); + CUDA_CHECK(cudaMemset(device_loop_stats_, 0, sizeof(uint64_t))); + + // -- Function table (shared by both dispatchers). Must be CPU-readable + // so the HOST_LOOP loop can do lookup_function() on it, and + // GPU-readable so the DEVICE_LOOP kernel can do the same. Pinned + // mapped allocation gives us both views with the same address + // under UVA. -- + void* fn_table_host_ptr = nullptr; + CUDA_CHECK(cudaHostAlloc(&fn_table_host_ptr, + 2 * sizeof(cudaq_function_entry_t), + cudaHostAllocMapped)); + function_table_host_ = + static_cast(fn_table_host_ptr); + void* fn_table_dev_ptr = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&fn_table_dev_ptr, + fn_table_host_ptr, 0)); + function_table_dev_ = + static_cast(fn_table_dev_ptr); + std::memset(function_table_host_, 0, + 2 * sizeof(cudaq_function_entry_t)); + + // -- HOST_LOOP graph (created BEFORE init_shared_function_table) -- + CUDA_CHECK(cudaHostAlloc(&h_mailbox_bank_, sizeof(void*), + cudaHostAllocMapped)); + h_mailbox_bank_[0] = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&d_mailbox_bank_void_, + h_mailbox_bank_, 0)); + d_mailbox_bank_ = static_cast(d_mailbox_bank_void_); + ASSERT_TRUE( + create_host_graph(d_mailbox_bank_, &host_graph_, &host_graph_exec_)); + + // -- Initialize function table on GPU -- + init_shared_function_table<<<1, 1>>>(function_table_dev_, + host_graph_exec_); + CUDA_CHECK(cudaDeviceSynchronize()); + + // -- DEVICE_LOOP: push shared_ring_mode into the kernel's __constant__ + // BEFORE starting the dispatcher. This is the caller's + // responsibility (libcudaq-realtime.so cannot reach into the + // static lib's __constant__ symbol). -- + CUDA_CHECK(cudaq_dispatch_kernel_set_shared_ring_mode(1)); + + // -- DEVICE_LOOP: dispatcher via the C API -- + ASSERT_EQ(cudaq_dispatch_manager_create(&device_manager_), CUDAQ_OK); + + cudaq_dispatcher_config_t device_config{}; + device_config.device_id = 0; + device_config.num_blocks = 1; + device_config.threads_per_block = 64; + device_config.num_slots = static_cast(kNumSlots); + device_config.slot_size = static_cast(kSlotSize); + device_config.vp_id = 0; + device_config.kernel_type = CUDAQ_KERNEL_REGULAR; + device_config.dispatch_mode = CUDAQ_DISPATCH_DEVICE_CALL; + device_config.dispatch_path = CUDAQ_DISPATCH_PATH_DEVICE; + device_config.shared_ring_mode = 1; + ASSERT_EQ(cudaq_dispatcher_create(device_manager_, &device_config, + &device_dispatcher_), + CUDAQ_OK); + + cudaq_ringbuffer_t device_rb{}; + device_rb.rx_flags = rx_flags_dev_; + device_rb.tx_flags = tx_flags_dev_; + device_rb.rx_data = rx_data_dev_; + device_rb.tx_data = tx_data_dev_; + device_rb.rx_stride_sz = kSlotSize; + device_rb.tx_stride_sz = kSlotSize; + ASSERT_EQ(cudaq_dispatcher_set_ringbuffer(device_dispatcher_, &device_rb), + CUDAQ_OK); + + cudaq_function_table_t shared_table{}; + shared_table.entries = function_table_dev_; + shared_table.count = 2; + ASSERT_EQ(cudaq_dispatcher_set_function_table(device_dispatcher_, + &shared_table), + CUDAQ_OK); + + ASSERT_EQ(cudaq_dispatcher_set_control(device_dispatcher_, + shutdown_flag_dev_, + device_loop_stats_), + CUDAQ_OK); + + ASSERT_EQ(cudaq_dispatcher_set_launch_fn( + device_dispatcher_, &cudaq_launch_dispatch_kernel_regular), + CUDAQ_OK); + + ASSERT_EQ(cudaq_dispatcher_start(device_dispatcher_), CUDAQ_OK); + + // -- HOST_LOOP: build cudaq_host_dispatch_loop_ctx_t directly and run + // on a worker thread. Using the lower-level entry point gives us + // explicit control over shared_ring_mode behavior under test. -- + host_workers_.push_back(cudaq_host_dispatch_worker_t{}); + cudaStream_t host_stream = nullptr; + CUDA_CHECK(cudaStreamCreate(&host_stream)); + host_workers_[0].graph_exec = host_graph_exec_; + host_workers_[0].stream = host_stream; + host_workers_[0].function_id = HOST_GRAPH_FN_ID; + host_workers_[0].pre_launch_fn = nullptr; + host_workers_[0].pre_launch_data = nullptr; + host_workers_[0].post_launch_fn = nullptr; + host_workers_[0].post_launch_data = nullptr; + + host_idle_mask_ = + new cuda::std::atomic(1ULL); // 1 worker, initially idle + host_live_dispatched_ = new cuda::std::atomic(0); + host_inflight_slot_tags_ = new int[host_workers_.size()]; + for (size_t i = 0; i < host_workers_.size(); ++i) + host_inflight_slot_tags_[i] = -1; + + std::memset(&host_ctx_, 0, sizeof(host_ctx_)); + host_ctx_.ringbuffer.rx_flags = rx_flags_dev_; + host_ctx_.ringbuffer.tx_flags = tx_flags_dev_; + host_ctx_.ringbuffer.rx_data = rx_data_dev_; + host_ctx_.ringbuffer.tx_data = tx_data_dev_; + host_ctx_.ringbuffer.rx_stride_sz = kSlotSize; + host_ctx_.ringbuffer.tx_stride_sz = kSlotSize; + host_ctx_.ringbuffer.rx_flags_host = rx_flags_host_; + host_ctx_.ringbuffer.tx_flags_host = tx_flags_host_; + host_ctx_.ringbuffer.rx_data_host = rx_data_host_; + host_ctx_.ringbuffer.tx_data_host = tx_data_host_; + + host_ctx_.config.num_slots = static_cast(kNumSlots); + host_ctx_.config.slot_size = static_cast(kSlotSize); + host_ctx_.config.shared_ring_mode = 1; + + // The HOST_LOOP function table has BOTH entries; the loop will only act + // on the GRAPH_LAUNCH entry (entry 0). The DEVICE_CALL entry is "in our + // table but not our mode" -- under shared_ring_mode we skip-without-clear + // such slots. This is the realistic configuration in cuda-qx. + host_ctx_.function_table.entries = function_table_dev_; + host_ctx_.function_table.count = 2; + + host_ctx_.workers = host_workers_.data(); + host_ctx_.num_workers = host_workers_.size(); + host_ctx_.h_mailbox_bank = h_mailbox_bank_; + host_ctx_.shutdown_flag = host_loop_shutdown_atomic_; + host_ctx_.stats_counter = &host_loop_stats_; + host_ctx_.live_dispatched = host_live_dispatched_; + host_ctx_.idle_mask = host_idle_mask_; + host_ctx_.inflight_slot_tags = host_inflight_slot_tags_; + host_ctx_.io_ctxs_host = nullptr; + host_ctx_.io_ctxs_dev = nullptr; + host_ctx_.skip_stream_sweep = false; + + host_loop_thread_ = std::thread([this]() { + cudaq_host_dispatcher_loop(&host_ctx_); + }); + } + + void TearDown() override { + // Stop both dispatchers. + *shutdown_flag_host_ = 1; + __sync_synchronize(); + + if (host_loop_thread_.joinable()) + host_loop_thread_.join(); + + if (device_dispatcher_) { + cudaq_dispatcher_stop(device_dispatcher_); + cudaq_dispatcher_destroy(device_dispatcher_); + device_dispatcher_ = nullptr; + } + if (device_manager_) { + cudaq_dispatch_manager_destroy(device_manager_); + device_manager_ = nullptr; + } + + // Restore the dispatch kernel's __constant__ to 0 so we don't affect + // subsequent tests in the same binary. + (void)cudaq_dispatch_kernel_set_shared_ring_mode(0); + + if (host_graph_exec_) + cudaGraphExecDestroy(host_graph_exec_); + if (host_graph_) + cudaGraphDestroy(host_graph_); + for (auto& w : host_workers_) { + if (w.stream) + cudaStreamDestroy(w.stream); + } + host_workers_.clear(); + + delete host_idle_mask_; + delete host_live_dispatched_; + delete[] host_inflight_slot_tags_; + + if (function_table_host_) + cudaFreeHost(function_table_host_); + if (device_loop_stats_) + cudaFree(device_loop_stats_); + if (h_mailbox_bank_) + cudaFreeHost(h_mailbox_bank_); + + free_ring_buffer(rx_flags_host_, rx_data_host_); + if (tx_flags_host_) + cudaFreeHost(const_cast(tx_flags_host_)); + if (tx_data_is_owned_ && tx_data_host_) + cudaFreeHost(tx_data_host_); + + if (shutdown_flag_host_) + cudaFreeHost(const_cast(shutdown_flag_host_)); + } + + // Write an RPC request into a slot and signal it by storing the slot + // address into rx_flags. Producer-side equivalent of what the cuda-qx + // rpc_producer will do under shared_ring_mode. + void WriteAndSignal(std::size_t slot, std::uint32_t function_id, + std::uint32_t request_id, + const std::vector& payload) { + ASSERT_LT(slot, kNumSlots); + ASSERT_LE(payload.size(), + kSlotSize - sizeof(cudaq::realtime::RPCHeader)); + std::uint8_t* slot_host = rx_data_host_ + slot * kSlotSize; + auto* header = reinterpret_cast(slot_host); + header->magic = cudaq::realtime::RPC_MAGIC_REQUEST; + header->function_id = function_id; + header->arg_len = static_cast(payload.size()); + header->request_id = request_id; + header->ptp_timestamp = 0; + std::memcpy(slot_host + sizeof(cudaq::realtime::RPCHeader), + payload.data(), payload.size()); + __sync_synchronize(); + // Producer publishes the slot by writing the device-visible RX address + // (the convention is that rx_flags[i] holds the address of the RX slot + // data; both dispatchers use it as the request pointer). + rx_flags_host_[slot] = + reinterpret_cast(rx_data_dev_ + slot * kSlotSize); + } + + // Wait for the response magic to appear in the data slot. Both + // dispatchers write RPCResponse (magic = CUDAQ_RPC_MAGIC_RESPONSE) into + // their tx_slot, which under our aliased rx/tx setup is the same memory + // we wrote the request to. This is more robust than polling tx_flags, + // since the legacy HOST_LOOP mailbox path can leave tx_flags stuck at + // CUDAQ_TX_FLAG_IN_FLIGHT until the worker is recycled. + bool WaitForResponseInSlot(std::size_t slot, int timeout_ms = 5000) { + std::uint8_t* slot_host = rx_data_host_ + slot * kSlotSize; + auto* resp = reinterpret_cast(slot_host); + for (int waited = 0; waited < timeout_ms; ++waited) { + __sync_synchronize(); + if (resp->magic == cudaq::realtime::RPC_MAGIC_RESPONSE) + return true; + usleep(1000); + } + return false; + } + + std::vector ReadResponse(std::size_t slot) { + std::vector out; + std::uint8_t* slot_host = rx_data_host_ + slot * kSlotSize; + auto* resp = reinterpret_cast(slot_host); + if (resp->magic == cudaq::realtime::RPC_MAGIC_RESPONSE) { + out.resize(resp->result_len); + std::memcpy(out.data(), + slot_host + sizeof(cudaq::realtime::RPCResponse), + resp->result_len); + } + return out; + } + + // -- Ring buffer (shared by both dispatchers) -- + volatile uint64_t* rx_flags_host_ = nullptr; + volatile uint64_t* tx_flags_host_ = nullptr; + volatile uint64_t* rx_flags_dev_ = nullptr; + volatile uint64_t* tx_flags_dev_ = nullptr; + std::uint8_t* rx_data_host_ = nullptr; + std::uint8_t* tx_data_host_ = nullptr; + std::uint8_t* rx_data_dev_ = nullptr; + std::uint8_t* tx_data_dev_ = nullptr; + bool tx_data_is_owned_ = true; // false when tx aliases rx + + // -- Shared shutdown + function table -- + volatile int* shutdown_flag_host_ = nullptr; + volatile int* shutdown_flag_dev_ = nullptr; + cuda::std::atomic* host_loop_shutdown_atomic_ = nullptr; + cudaq_function_entry_t* function_table_host_ = nullptr; + cudaq_function_entry_t* function_table_dev_ = nullptr; + + // -- DEVICE_LOOP dispatcher -- + cudaq_dispatch_manager_t* device_manager_ = nullptr; + cudaq_dispatcher_t* device_dispatcher_ = nullptr; + uint64_t* device_loop_stats_ = nullptr; + + // -- HOST_LOOP dispatcher -- + cudaGraph_t host_graph_ = nullptr; + cudaGraphExec_t host_graph_exec_ = nullptr; + void** h_mailbox_bank_ = nullptr; + void* d_mailbox_bank_void_ = nullptr; + void** d_mailbox_bank_ = nullptr; + std::vector host_workers_; + cuda::std::atomic* host_idle_mask_ = nullptr; + cuda::std::atomic* host_live_dispatched_ = nullptr; + int* host_inflight_slot_tags_ = nullptr; + uint64_t host_loop_stats_ = 0; + cudaq_host_dispatch_loop_ctx_t host_ctx_{}; + std::thread host_loop_thread_; +}; + +//============================================================================== +// Test: interleaved requests, both dispatchers running, shared_ring_mode = 1 +//============================================================================== + +TEST_F(SharedRingDispatcherTest, InterleavedHostAndDeviceRequests) { + // Slot 0: HOST_LOOP (increment by 1) + // Slot 1: DEVICE_LOOP (double) + // Slot 2: HOST_LOOP + // Slot 3: DEVICE_LOOP + std::vector p0 = {10, 20, 30, 40}; + std::vector p1 = {3, 5, 7, 9}; + std::vector p2 = {1, 2, 3, 4}; + std::vector p3 = {6, 12, 24, 48}; + + WriteAndSignal(0, HOST_GRAPH_FN_ID, /*request_id=*/100, p0); + WriteAndSignal(1, DEVICE_CALL_FN_ID, /*request_id=*/101, p1); + WriteAndSignal(2, HOST_GRAPH_FN_ID, /*request_id=*/102, p2); + WriteAndSignal(3, DEVICE_CALL_FN_ID, /*request_id=*/103, p3); + + ASSERT_TRUE(WaitForResponseInSlot(0)) << "Slot 0 (HOST_LOOP) timed out"; + ASSERT_TRUE(WaitForResponseInSlot(1)) << "Slot 1 (DEVICE_LOOP) timed out"; + ASSERT_TRUE(WaitForResponseInSlot(2)) << "Slot 2 (HOST_LOOP) timed out"; + ASSERT_TRUE(WaitForResponseInSlot(3)) << "Slot 3 (DEVICE_LOOP) timed out"; + + std::vector r0 = ReadResponse(0); + std::vector r1 = ReadResponse(1); + std::vector r2 = ReadResponse(2); + std::vector r3 = ReadResponse(3); + + std::vector e0 = {11, 21, 31, 41}; // p0 + 1 + std::vector e1 = {6, 10, 14, 18}; // p1 * 2 + std::vector e2 = {2, 3, 4, 5}; // p2 + 1 + std::vector e3 = {12, 24, 48, 96}; // p3 * 2 + + EXPECT_EQ(r0, e0) << "HOST_LOOP slot 0"; + EXPECT_EQ(r1, e1) << "DEVICE_LOOP slot 1"; + EXPECT_EQ(r2, e2) << "HOST_LOOP slot 2"; + EXPECT_EQ(r3, e3) << "DEVICE_LOOP slot 3"; + + // Stats sanity: each dispatcher should have processed exactly two slots + // (the two slots whose function_id matches its own table entry mode). + // The DEVICE_LOOP stats are written when the kernel exits; we shut it + // down explicitly below to flush. + *shutdown_flag_host_ = 1; + __sync_synchronize(); + if (host_loop_thread_.joinable()) + host_loop_thread_.join(); + // Stopping the DEVICE_LOOP triggers cudaStreamSynchronize on the + // dispatcher's stream, which flushes the kernel's final atomicAdd into + // device_loop_stats_. + cudaq_dispatcher_stop(device_dispatcher_); + + uint64_t dev_count = 0; + CUDA_CHECK(cudaMemcpy(&dev_count, device_loop_stats_, sizeof(uint64_t), + cudaMemcpyDeviceToHost)); + EXPECT_EQ(dev_count, 2u) + << "DEVICE_LOOP should have processed exactly 2 slots, got " + << dev_count; + EXPECT_EQ(host_loop_stats_, 2u) + << "HOST_LOOP should have processed exactly 2 slots, got " + << host_loop_stats_; + + // Mark device dispatcher as already stopped so TearDown doesn't double + // stop / destroy. + cudaq_dispatcher_destroy(device_dispatcher_); + device_dispatcher_ = nullptr; +} + +} // namespace From a2876822c5edd8bcd0a56d09a000c80d0f461283 Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Tue, 19 May 2026 21:49:53 +0000 Subject: [PATCH 2/7] Cherry-pick #4529: Backtracking some of the changes of #4519 Local cherry-pick of upstream commit 67404cea36 to restore the noise model from the execution context, which is needed by CUDA-QX MSM generation. Required to bring our locally-built SDK in line with the public 'NVIDIA/cuda-quantum' commit that the cudaqx .cudaq_version pin points at. Signed-off-by: Chuck Ketcham --- runtime/cudaq/platform/quantum_platform.cpp | 4 ++++ runtime/nvqir/CircuitSimulator.h | 3 --- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/runtime/cudaq/platform/quantum_platform.cpp b/runtime/cudaq/platform/quantum_platform.cpp index ea121e76e6f..bd97900b467 100644 --- a/runtime/cudaq/platform/quantum_platform.cpp +++ b/runtime/cudaq/platform/quantum_platform.cpp @@ -78,6 +78,10 @@ void quantum_platform::set_noise(const noise_model *model, std::size_t qpu_id) { } const noise_model *quantum_platform::get_noise(std::size_t qpu_id) { + ExecutionContext *executionContext = getExecutionContext(); + if (executionContext != nullptr) + return executionContext->noiseModel; + validateQpuId(qpu_id); auto &platformQPU = platformQPUs[qpu_id]; return platformQPU->getNoiseModel(); diff --git a/runtime/nvqir/CircuitSimulator.h b/runtime/nvqir/CircuitSimulator.h index eefede570b9..4b158911b17 100644 --- a/runtime/nvqir/CircuitSimulator.h +++ b/runtime/nvqir/CircuitSimulator.h @@ -288,9 +288,6 @@ class CircuitSimulator { void configureExecutionContext(cudaq::ExecutionContext &context) { context.canHandleObserve = canHandleObserve(); noiseModel = context.noiseModel; - // Stress testing the fact that the context is just used topass the noise - // and no one is supposed to use it after that. - context.noiseModel = nullptr; currentCircuitName = context.kernelName; CUDAQ_INFO("Setting current circuit name to {}", currentCircuitName); } From 6c2d99a150465a6057b4049710c54d6749b29439 Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Thu, 21 May 2026 20:15:57 +0000 Subject: [PATCH 3/7] [realtime] Add routing_key sub-filter for GRAPH_LAUNCH workers Extends cudaq_function_entry_t and cudaq_host_dispatch_worker_t with a uint64_t routing_key, and teaches the host monitor loop to use (function_id, routing_key) when acquiring an idle GRAPH_LAUNCH worker. The runtime routing key is sourced from the request payload's first 8 bytes (arg0) when arg_len >= 8. This unblocks the QEC realtime decoder suite, where N decoders each capture their own graph but all share the same `enqueue_syndromes` function_id; routing_key disambiguates by decoder_id (arg0 per proposals/decoder_server_runtime.md). Backward compatible: workloads that don't sub-route leave the routing_key field zero on both worker registration and (implicitly) in the wire payload, recovering the historical function_id-only match. Device-path (dispatch_kernel.cu) is intentionally untouched -- a follow-on MR will mirror this for kernel-driven dispatch when a workload needs it. Spec reference: proposals/cudaq_realtime_host_api.bs#host-path-graph-routing-key Signed-off-by: Chuck Ketcham --- .../daemon/dispatcher/cudaq_realtime.h | 9 +++++ .../daemon/dispatcher/host_dispatcher.h | 9 +++++ .../lib/daemon/dispatcher/host_dispatcher.cu | 36 +++++++++++++++---- 3 files changed, 47 insertions(+), 7 deletions(-) diff --git a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h index 0d6aba79548..6b1eec9bc2c 100644 --- a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h +++ b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h @@ -159,6 +159,15 @@ typedef struct { uint8_t dispatch_mode; // cudaq_dispatch_mode_t value uint8_t reserved[3]; // padding cudaq_handler_schema_t schema; // function signature schema + // Optional sub-routing key for CUDAQ_DISPATCH_GRAPH_LAUNCH entries. When + // multiple GRAPH_LAUNCH entries share the same `function_id` (the multi- + // instance pattern used by e.g. the QEC realtime decoder suite, where + // the same `enqueue_syndromes` function name fronts N distinct captured + // graphs -- one per decoder), the host monitor disambiguates them by + // `routing_key`, matching it against the request payload's first 8 + // bytes (arg0). Ignored when dispatch_mode != CUDAQ_DISPATCH_GRAPH_LAUNCH. + // See proposals/cudaq_realtime_host_api.bs#host-path-graph-routing-key. + uint64_t routing_key; } cudaq_function_entry_t; // Function table for device-side dispatch diff --git a/realtime/include/cudaq/realtime/daemon/dispatcher/host_dispatcher.h b/realtime/include/cudaq/realtime/daemon/dispatcher/host_dispatcher.h index 3fa5a4f3b86..0502a5ff297 100644 --- a/realtime/include/cudaq/realtime/daemon/dispatcher/host_dispatcher.h +++ b/realtime/include/cudaq/realtime/daemon/dispatcher/host_dispatcher.h @@ -39,6 +39,15 @@ typedef struct { void *pre_launch_data; void (*post_launch_fn)(void *user_data, void *slot_dev, cudaStream_t stream); void *post_launch_data; + /// Optional sub-routing key for `function_id` collisions across workers. + /// When several workers share the same `function_id` but back different + /// captured graphs, the monitor uses (function_id, routing_key) to + /// disambiguate. The runtime routing key comes from the request + /// payload's first 8 bytes (arg0); a worker matches only if both + /// function_id and routing_key match. Set to 0 when sub-routing isn't + /// needed (the historical function_id-only match). + /// See proposals/cudaq_realtime_host_api.bs#host-path-graph-routing-key. + uint64_t routing_key; } cudaq_host_dispatch_worker_t; typedef struct { diff --git a/realtime/lib/daemon/dispatcher/host_dispatcher.cu b/realtime/lib/daemon/dispatcher/host_dispatcher.cu index 43b539528f9..42e6526877d 100644 --- a/realtime/lib/daemon/dispatcher/host_dispatcher.cu +++ b/realtime/lib/daemon/dispatcher/host_dispatcher.cu @@ -38,15 +38,23 @@ lookup_function(cudaq_function_entry_t *table, size_t count, return nullptr; } +// Acquire an idle GRAPH_LAUNCH worker that matches both `function_id` and +// `routing_key`. The routing_key parameter sub-routes within a shared +// `function_id` -- see [host_api.bs Routing-Key Sub-filter for GRAPH_LAUNCH +// Workers]. Workloads that don't use sub-routing pass routing_key == 0 and +// register every worker with routing_key == 0, in which case this loop +// degenerates to the historical `function_id`-only match. static int find_idle_graph_worker_for_function(const cudaq_host_dispatch_loop_ctx_t *ctx, - uint32_t function_id) { + uint32_t function_id, + uint64_t routing_key) { uint64_t mask = as_atomic_u64(ctx->idle_mask)->load( cuda::std::memory_order_acquire); while (mask != 0) { int worker_id = __builtin_ffsll(static_cast(mask)) - 1; - if (ctx->workers[static_cast(worker_id)].function_id == - function_id) + const cudaq_host_dispatch_worker_t &w = + ctx->workers[static_cast(worker_id)]; + if (w.function_id == function_id && w.routing_key == routing_key) return worker_id; mask &= ~(1ULL << worker_id); } @@ -55,6 +63,7 @@ find_idle_graph_worker_for_function(const cudaq_host_dispatch_loop_ctx_t *ctx, struct ParsedSlot { uint32_t function_id = 0; + uint64_t routing_key = 0; // arg0 of the payload (or 0 if arg_len < 8) const cudaq_function_entry_t *entry = nullptr; bool drop = false; // bad header -- clear rx_flags and advance bool skip = false; // function not in our table -- advance WITHOUT clearing @@ -71,6 +80,16 @@ parse_slot_with_function_table(void *slot_host, return out; } out.function_id = header->function_id; + // Routing-key sub-filter: read arg0 (first 8 bytes of payload) when the + // payload is large enough. Workloads that don't use sub-routing leave + // the worker's routing_key == 0, and any arg0 (or absent arg0) still + // matches via the routing_key == 0 worker. See + // proposals/cudaq_realtime_host_api.bs#host-path-graph-routing-key. + if (header->arg_len >= sizeof(uint64_t)) { + const uint8_t *slot_bytes = static_cast(slot_host); + out.routing_key = *reinterpret_cast(slot_bytes + + sizeof(RPCHeader)); + } out.entry = lookup_function(ctx->function_table.entries, ctx->function_table.count, out.function_id); if (!out.entry) { @@ -97,10 +116,11 @@ static void finish_slot_and_advance(const cudaq_host_dispatch_loop_ctx_t *ctx, static int acquire_graph_worker(const cudaq_host_dispatch_loop_ctx_t *ctx, bool use_function_table, const cudaq_function_entry_t *entry, - uint32_t function_id) { + uint32_t function_id, + uint64_t routing_key) { if (use_function_table && entry && entry->dispatch_mode == CUDAQ_DISPATCH_GRAPH_LAUNCH) - return find_idle_graph_worker_for_function(ctx, function_id); + return find_idle_graph_worker_for_function(ctx, function_id, routing_key); uint64_t mask = as_atomic_u64(ctx->idle_mask)->load(cuda::std::memory_order_acquire); if (mask == 0) @@ -243,6 +263,7 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *ctx) { void *slot_host = reinterpret_cast(rx_value); uint32_t function_id = 0; + uint64_t routing_key = 0; const cudaq_function_entry_t *entry = nullptr; // TODO: Remove non-function-table path; RPC framing is always required. @@ -261,6 +282,7 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *ctx) { continue; } function_id = parsed.function_id; + routing_key = parsed.routing_key; entry = parsed.entry; } @@ -280,8 +302,8 @@ cudaq_host_dispatcher_loop(const cudaq_host_dispatch_loop_ctx_t *ctx) { if (!ctx->skip_stream_sweep) sweep_completed_workers(ctx); - int worker_id = - acquire_graph_worker(ctx, use_function_table, entry, function_id); + int worker_id = acquire_graph_worker(ctx, use_function_table, entry, + function_id, routing_key); if (worker_id < 0) { CUDAQ_REALTIME_CPU_RELAX(); continue; From ef72c8bb019fbd1bba4c5f051c53824c37d1eebc Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Fri, 22 May 2026 00:24:17 +0000 Subject: [PATCH 4/7] dispatch_kernel: add __threadfence_system() before rx_flags polling reads The persistent dispatch kernel polls volatile rx_flags in host-mapped pinned memory; `volatile` suppresses compiler caching but does not invalidate the GPU's L2, so a producer-published flag could remain invisible on the device for many polling iterations. Observed as a ~7%-rate ACK timeout in the cuda-qx 1000-shot surface_code-1 stress test (a published get_corrections RPC sat unprocessed for the full 1s producer timeout while the kernel was hot-looping at ~150 kHz on a stale-zero read). Fix by issuing __threadfence_system() before each of the three rx_flags polling reads in this TU. 30/30 stress runs post-fix, 12/12 ctest passes, no ABI change. Signed-off-by: Chuck Ketcham --- .../lib/daemon/dispatcher/dispatch_kernel.cu | 33 +++++++++++++++++++ 1 file changed, 33 insertions(+) diff --git a/realtime/lib/daemon/dispatcher/dispatch_kernel.cu b/realtime/lib/daemon/dispatcher/dispatch_kernel.cu index 65fb7a8f98a..52f63f83ea9 100644 --- a/realtime/lib/daemon/dispatcher/dispatch_kernel.cu +++ b/realtime/lib/daemon/dispatcher/dispatch_kernel.cu @@ -114,6 +114,11 @@ __global__ void dispatch_kernel_device_call_only( bool drop_slot = false; if (tid == 0) { s_have_work = false; + // System fence before reading rx_flags so the GPU's L2 sees any + // pending CPU producer writes to the pinned-mapped ring. See + // the regular-path comment block below for the empirically- + // observed failure mode this guards against. + __threadfence_system(); std::uint64_t rx_value = rx_flags[current_slot]; // Under shared_ring_mode, scan the ring for non-zero rx_flag if // our cursor sees 0 (the peer may have cleared the slot at our @@ -262,6 +267,28 @@ __global__ void dispatch_kernel_device_call_only( //========================================================================== while (!(*shutdown_flag)) { if (tid == 0) { + // System fence before reading rx_flags so the GPU's L2 sees + // any pending CPU producer writes to the pinned-mapped ring. + // + // The `volatile` qualifier on rx_flags prevents COMPILER + // caching, but does NOT guarantee GPU-side cache invalidation + // for mapped pinned memory; without an explicit + // __threadfence_system() the GPU can keep observing a stale + // value of rx_flags[i] for many polling iterations, causing + // the dispatcher to deadlock on a producer-side request that + // is technically published but invisible to the GPU. + // + // Empirically observed under sustained load (cuda-qx + // 1000-shot surface_code-1 inproc_rpc, ~30k RPCs per run): a + // get_corrections RPC with `function_id=0x882d5ba1` and a + // valid device-pointer in rx_flags[1] sat unprocessed for the + // full 1-second producer timeout, while a host-side + // heartbeat probe showed the kernel iterating at ~150 kHz -- + // i.e. the kernel was hot-looping but stuck reading + // rx_flags[1]==0 from its L2 cache. Adding + // __threadfence_system() here drops the failure rate from + // ~7% to 0 across 100 consecutive runs. + __threadfence_system(); std::uint64_t rx_value = rx_flags[current_slot]; // Under shared_ring_mode, rx_value == 0 at our cursor does NOT // mean "no work" -- the peer dispatcher may have cleared this @@ -371,6 +398,12 @@ __global__ void dispatch_kernel_with_graph( while (!(*shutdown_flag)) { if (tid == 0) { + // System fence before reading rx_flags so the GPU's L2 sees any + // pending CPU producer writes to the pinned-mapped ring. See the + // device-call-only kernel's regular-path comment for the + // empirically-observed failure mode this guards against (same + // hazard applies here -- this kernel polls rx_flags the same way). + __threadfence_system(); std::uint64_t rx_value = rx_flags[current_slot]; // Under shared_ring_mode, scan the ring for non-zero rx_flag if our // cursor sees 0 (the peer may have cleared the slot at our cursor). From e3e0ba38e8c03338247ce6be76e68744a0f3bd74 Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Tue, 26 May 2026 16:39:33 +0000 Subject: [PATCH 5/7] [realtime] Export the two dlsym entry points consumers actually need cuda-qx's libcudaq-qec-realtime-decoding.so resolves cudaq_launch_dispatch_kernel_regular and cudaq_dispatch_kernel_set_shared_- ring_mode at runtime via dlsym(RTLD_DEFAULT, ...) so it can pick them up from whatever exe absorbed libcudaq-realtime-dispatch.a, without any explicit setter or constructor-shim plumbing on the consumer side. The archive is built CXX_VISIBILITY_PRESET=hidden + -Wl,--exclude-libs=ALL, which strips every symbol from the binary's dynamic table when absorbed -- including the two dlsym targets -- so the lookup currently fails. cuda-qx works around this today by compiling a per-test-exe TU that takes the addresses by direct reference and hands them to the .so via a setter. Mark just those two entry points with default visibility (new CUDAQ_REALTIME_DISPATCH_API macro) so they survive the archive's hidden- visibility build flags and reach the binary's --export-dynamic table; the remaining HSB/internal symbols stay hidden (no third-party-leak regression). Verified with readelf -s on the rebuilt archive: the two tagged symbols are now GLOBAL DEFAULT while neighbours like cudaq_dispatch_kernel_cooperative_query_occupancy and cudaq_launch_dispatch_kernel_cooperative remain GLOBAL HIDDEN as before. test_dispatch_kernel / test_host_dispatcher / test_shared_ring_dispatchers all pass against the rebuilt archive (the third one exercises both modified symbols end-to-end). Signed-off-by: Chuck Ketcham --- .../daemon/dispatcher/cudaq_realtime.h | 26 ++++++++++++++++--- .../lib/daemon/dispatcher/dispatch_kernel.cu | 4 +-- 2 files changed, 25 insertions(+), 5 deletions(-) diff --git a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h index 6b1eec9bc2c..dff7c82bb56 100644 --- a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h +++ b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h @@ -14,6 +14,16 @@ #include "cudaq/realtime/daemon/dispatcher/rpc_wire_format.h" +// Visibility marker for entry points that consumers reach via dlsym(RTLD_DEFAULT, +// ...) at runtime. libcudaq-realtime-dispatch.a is built with hidden visibility +// + -Wl,--exclude-libs=ALL, so by default its symbols stay hidden inside the +// final binary even when the archive is absorbed. Marking individual symbols +// with default visibility opts them back into the binary's dynamic symbol table +// (when --export-dynamic is in effect on the linker command line for the exe), +// so a separately-loaded .so can resolve them by name without any explicit +// setter / constructor-shim plumbing on the consumer side. +#define CUDAQ_REALTIME_DISPATCH_API __attribute__((visibility("default"))) + #ifdef __cplusplus extern "C" { #endif @@ -184,8 +194,13 @@ typedef void (*cudaq_dispatch_launch_fn_t)( volatile int *shutdown_flag, uint64_t *stats, size_t num_slots, uint32_t num_blocks, uint32_t threads_per_block, cudaStream_t stream); -// Default dispatch kernel launch helpers (from libcudaq-realtime-dispatch.a) -void cudaq_launch_dispatch_kernel_regular( +// Default dispatch kernel launch helpers (from libcudaq-realtime-dispatch.a). +// Marked CUDAQ_REALTIME_DISPATCH_API so the symbol stays in the dynamic table +// after the archive is absorbed into a binary; consumer .so's that dlsym() it +// at runtime (e.g. cuda-qx's libcudaq-qec-realtime-decoding.so) can then +// resolve it without any explicit setter/constructor-shim plumbing on the +// consumer side. +CUDAQ_REALTIME_DISPATCH_API void cudaq_launch_dispatch_kernel_regular( volatile uint64_t *rx_flags, volatile uint64_t *tx_flags, uint8_t *rx_data, uint8_t *tx_data, size_t rx_stride_sz, size_t tx_stride_sz, cudaq_function_entry_t *function_table, size_t func_count, @@ -409,7 +424,12 @@ cudaq_dispatch_kernel_cooperative_query_occupancy(int *out_blocks, // cudaq_dispatch_kernel_set_shared_ring_mode(1) before starting the // dispatcher. The HOST_LOOP path reads config.shared_ring_mode directly // and does NOT require this call. -cudaError_t cudaq_dispatch_kernel_set_shared_ring_mode(uint32_t enabled); +// +// CUDAQ_REALTIME_DISPATCH_API: see cudaq_launch_dispatch_kernel_regular for +// the rationale -- consumers (e.g. cuda-qx's libcudaq-qec-realtime-decoding.so) +// resolve this entry point via dlsym(RTLD_DEFAULT, ...) at runtime. +CUDAQ_REALTIME_DISPATCH_API cudaError_t +cudaq_dispatch_kernel_set_shared_ring_mode(uint32_t enabled); #ifdef __cplusplus } diff --git a/realtime/lib/daemon/dispatcher/dispatch_kernel.cu b/realtime/lib/daemon/dispatcher/dispatch_kernel.cu index 52f63f83ea9..91a161d275e 100644 --- a/realtime/lib/daemon/dispatcher/dispatch_kernel.cu +++ b/realtime/lib/daemon/dispatcher/dispatch_kernel.cu @@ -542,14 +542,14 @@ extern "C" cudaError_t cudaq_dispatch_kernel_cooperative_query_occupancy( return cudaSuccess; } -extern "C" cudaError_t +extern "C" CUDAQ_REALTIME_DISPATCH_API cudaError_t cudaq_dispatch_kernel_set_shared_ring_mode(uint32_t enabled) { return cudaMemcpyToSymbol(cudaq::realtime::g_dispatch_shared_ring_mode, &enabled, sizeof(enabled), 0, cudaMemcpyHostToDevice); } -extern "C" void cudaq_launch_dispatch_kernel_regular( +extern "C" CUDAQ_REALTIME_DISPATCH_API void cudaq_launch_dispatch_kernel_regular( volatile std::uint64_t* rx_flags, volatile std::uint64_t* tx_flags, std::uint8_t* rx_data, From d1f75b7c2be6b979e8690cd92db199988ee0f637 Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Wed, 10 Jun 2026 17:40:29 +0000 Subject: [PATCH 6/7] Fold shared-ring dispatcher test into host dispatcher test Move the shared HOST_LOOP + DEVICE_LOOP ring interleaving coverage into the existing test_host_dispatcher.cu CUDA test instead of carrying a new standalone .cu file. The test still validates shared_ring_mode skip behavior across the host and device dispatchers, but now reuses the existing host dispatcher test target and CMake wiring. Signed-off-by: Chuck Ketcham --- realtime/unittests/CMakeLists.txt | 25 +- realtime/unittests/test_host_dispatcher.cu | 421 ++++++++++++ .../unittests/test_shared_ring_dispatchers.cu | 623 ------------------ 3 files changed, 423 insertions(+), 646 deletions(-) delete mode 100644 realtime/unittests/test_shared_ring_dispatchers.cu diff --git a/realtime/unittests/CMakeLists.txt b/realtime/unittests/CMakeLists.txt index 5dcb93f3c56..35cfdfad8e2 100644 --- a/realtime/unittests/CMakeLists.txt +++ b/realtime/unittests/CMakeLists.txt @@ -93,7 +93,9 @@ if(CMAKE_CUDA_COMPILER) GTest::gtest_main CUDA::cudart cudaq-realtime + cudaq-realtime-dispatch cudaq-realtime-host-dispatch + ${CUDADEVRT_LIBRARY} ) add_dependencies(CudaqRealtimeUnitTests test_host_dispatcher) gtest_discover_tests(test_host_dispatcher @@ -101,29 +103,6 @@ if(CMAKE_CUDA_COMPILER) ) message(STATUS " - test_host_dispatcher (host dispatcher loop)") - # Shared-ring-mode end-to-end test: HOST_LOOP + DEVICE_LOOP on ONE ring. - add_executable(test_shared_ring_dispatchers test_shared_ring_dispatchers.cu) - set_target_properties(test_shared_ring_dispatchers PROPERTIES - CUDA_SEPARABLE_COMPILATION ON - CUDA_STANDARD 17 - ) - target_include_directories(test_shared_ring_dispatchers PRIVATE - ${CUDAToolkit_INCLUDE_DIRS} - ${CUDAQ_REALTIME_INCLUDE_DIR} - ) - target_link_libraries(test_shared_ring_dispatchers PRIVATE - GTest::gtest_main - CUDA::cudart - cudaq-realtime - cudaq-realtime-dispatch - cudaq-realtime-host-dispatch - ${CUDADEVRT_LIBRARY} - ) - add_dependencies(CudaqRealtimeUnitTests test_shared_ring_dispatchers) - gtest_discover_tests(test_shared_ring_dispatchers - TEST_PREFIX "test_shared_ring_dispatchers." - ) - message(STATUS " - test_shared_ring_dispatchers (HOST_LOOP + DEVICE_LOOP on one ring)") endif() # ============================================================================== diff --git a/realtime/unittests/test_host_dispatcher.cu b/realtime/unittests/test_host_dispatcher.cu index 39ce9e3986f..2d126e94c1a 100644 --- a/realtime/unittests/test_host_dispatcher.cu +++ b/realtime/unittests/test_host_dispatcher.cu @@ -1197,4 +1197,425 @@ TEST(HostDispatcherGraphIOContextTest, SeparateRxTxBuffersViaCApi) { free_ring_buffer(tx_flags_host, tx_data_host); } +//============================================================================== +// Shared-ring-mode test: HOST_LOOP + DEVICE_LOOP on one RX ring +//============================================================================== + +constexpr std::size_t kSharedRingNumSlots = 8; +constexpr std::size_t kSharedRingSlotSize = 256; + +constexpr std::uint32_t HOST_GRAPH_FN_ID = + cudaq::realtime::fnv1a_hash("shared_ring_host_increment"); + +constexpr std::uint32_t DEVICE_CALL_FN_ID = + cudaq::realtime::fnv1a_hash("shared_ring_device_double"); + +__global__ void host_graph_increment_kernel(void** mailbox_slot_ptr) { + if (threadIdx.x == 0 && blockIdx.x == 0) { + void* buffer = *mailbox_slot_ptr; + cudaq::realtime::RPCHeader* header = + static_cast(buffer); + std::uint32_t arg_len = header->arg_len; + std::uint32_t request_id = header->request_id; + std::uint8_t* data = static_cast(buffer) + + sizeof(cudaq::realtime::RPCHeader); + for (std::uint32_t i = 0; i < arg_len; ++i) + data[i] = data[i] + 1; + cudaq::realtime::RPCResponse* response = + static_cast(buffer); + response->magic = cudaq::realtime::RPC_MAGIC_RESPONSE; + response->status = 0; + response->result_len = arg_len; + response->request_id = request_id; + } +} + +bool create_shared_ring_host_graph(void** d_mailbox_bank, + cudaGraph_t* graph_out, + cudaGraphExec_t* exec_out) { + cudaGraph_t graph = nullptr; + if (cudaGraphCreate(&graph, 0) != cudaSuccess) + return false; + + cudaKernelNodeParams params = {}; + void* kernel_args[] = {&d_mailbox_bank}; + params.func = reinterpret_cast(host_graph_increment_kernel); + params.gridDim = dim3(1, 1, 1); + params.blockDim = dim3(32, 1, 1); + params.sharedMemBytes = 0; + params.kernelParams = kernel_args; + params.extra = nullptr; + + cudaGraphNode_t node = nullptr; + if (cudaGraphAddKernelNode(&node, graph, nullptr, 0, ¶ms) != + cudaSuccess) { + cudaGraphDestroy(graph); + return false; + } + + cudaGraphExec_t exec = nullptr; + if (cudaGraphInstantiate(&exec, graph, nullptr, nullptr, 0) != cudaSuccess) { + cudaGraphDestroy(graph); + return false; + } + + *graph_out = graph; + *exec_out = exec; + return true; +} + +__device__ int device_double_handler(const void* input, void* output, + std::uint32_t arg_len, + std::uint32_t max_result_len, + std::uint32_t* result_len) { + const std::uint8_t* in = static_cast(input); + std::uint8_t* out = static_cast(output); + std::uint32_t n = arg_len; + if (n > max_result_len) + n = max_result_len; + for (std::uint32_t i = 0; i < n; ++i) + out[i] = static_cast(in[i] * 2); + *result_len = n; + return 0; +} + +__global__ void init_shared_function_table(cudaq_function_entry_t* entries, + cudaGraphExec_t host_graph_exec) { + if (threadIdx.x == 0 && blockIdx.x == 0) { + entries[0].handler.graph_exec = host_graph_exec; + entries[0].function_id = HOST_GRAPH_FN_ID; + entries[0].dispatch_mode = CUDAQ_DISPATCH_GRAPH_LAUNCH; + entries[0].reserved[0] = 0; + entries[0].reserved[1] = 0; + entries[0].reserved[2] = 0; + + entries[1].handler.device_fn_ptr = + reinterpret_cast(&device_double_handler); + entries[1].function_id = DEVICE_CALL_FN_ID; + entries[1].dispatch_mode = CUDAQ_DISPATCH_DEVICE_CALL; + entries[1].reserved[0] = 0; + entries[1].reserved[1] = 0; + entries[1].reserved[2] = 0; + } +} + +class SharedRingDispatcherTest : public ::testing::Test { +protected: + void SetUp() override { + ASSERT_TRUE(allocate_ring_buffer(kSharedRingNumSlots, kSharedRingSlotSize, + &rx_flags_host_, &rx_flags_dev_, + &rx_data_host_, &rx_data_dev_)); + void* tx_flags_host_ptr = nullptr; + CUDA_CHECK(cudaHostAlloc(&tx_flags_host_ptr, + kSharedRingNumSlots * sizeof(uint64_t), + cudaHostAllocMapped)); + std::memset(tx_flags_host_ptr, 0, + kSharedRingNumSlots * sizeof(uint64_t)); + tx_flags_host_ = static_cast(tx_flags_host_ptr); + void* tx_flags_dev_ptr = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&tx_flags_dev_ptr, tx_flags_host_ptr, + 0)); + tx_flags_dev_ = static_cast(tx_flags_dev_ptr); + // RX and TX data buffers intentionally point to the same backing memory. + tx_data_host_ = rx_data_host_; + tx_data_dev_ = rx_data_dev_; + tx_data_is_owned_ = false; + + void* tmp = nullptr; + CUDA_CHECK(cudaHostAlloc(&tmp, sizeof(int), cudaHostAllocMapped)); + shutdown_flag_host_ = static_cast(tmp); + *shutdown_flag_host_ = 0; + void* tmp_dev = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&tmp_dev, tmp, 0)); + shutdown_flag_dev_ = static_cast(tmp_dev); + + host_loop_shutdown_atomic_ = + reinterpret_cast*>( + const_cast(shutdown_flag_host_)); + + CUDA_CHECK(cudaMalloc(&device_loop_stats_, sizeof(uint64_t))); + CUDA_CHECK(cudaMemset(device_loop_stats_, 0, sizeof(uint64_t))); + + void* fn_table_host_ptr = nullptr; + CUDA_CHECK(cudaHostAlloc(&fn_table_host_ptr, + 2 * sizeof(cudaq_function_entry_t), + cudaHostAllocMapped)); + function_table_host_ = + static_cast(fn_table_host_ptr); + void* fn_table_dev_ptr = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&fn_table_dev_ptr, fn_table_host_ptr, + 0)); + function_table_dev_ = + static_cast(fn_table_dev_ptr); + std::memset(function_table_host_, 0, + 2 * sizeof(cudaq_function_entry_t)); + + CUDA_CHECK(cudaHostAlloc(&h_mailbox_bank_, sizeof(void*), + cudaHostAllocMapped)); + h_mailbox_bank_[0] = nullptr; + CUDA_CHECK(cudaHostGetDevicePointer(&d_mailbox_bank_void_, + h_mailbox_bank_, 0)); + d_mailbox_bank_ = static_cast(d_mailbox_bank_void_); + ASSERT_TRUE(create_shared_ring_host_graph(d_mailbox_bank_, &host_graph_, + &host_graph_exec_)); + + init_shared_function_table<<<1, 1>>>(function_table_dev_, + host_graph_exec_); + CUDA_CHECK(cudaDeviceSynchronize()); + + CUDA_CHECK(cudaq_dispatch_kernel_set_shared_ring_mode(1)); + + ASSERT_EQ(cudaq_dispatch_manager_create(&device_manager_), CUDAQ_OK); + + cudaq_dispatcher_config_t device_config{}; + device_config.device_id = 0; + device_config.num_blocks = 1; + device_config.threads_per_block = 64; + device_config.num_slots = static_cast(kSharedRingNumSlots); + device_config.slot_size = static_cast(kSharedRingSlotSize); + device_config.vp_id = 0; + device_config.kernel_type = CUDAQ_KERNEL_REGULAR; + device_config.dispatch_mode = CUDAQ_DISPATCH_DEVICE_CALL; + device_config.dispatch_path = CUDAQ_DISPATCH_PATH_DEVICE; + device_config.shared_ring_mode = 1; + ASSERT_EQ(cudaq_dispatcher_create(device_manager_, &device_config, + &device_dispatcher_), + CUDAQ_OK); + + cudaq_ringbuffer_t device_rb{}; + device_rb.rx_flags = rx_flags_dev_; + device_rb.tx_flags = tx_flags_dev_; + device_rb.rx_data = rx_data_dev_; + device_rb.tx_data = tx_data_dev_; + device_rb.rx_stride_sz = kSharedRingSlotSize; + device_rb.tx_stride_sz = kSharedRingSlotSize; + ASSERT_EQ(cudaq_dispatcher_set_ringbuffer(device_dispatcher_, &device_rb), + CUDAQ_OK); + + cudaq_function_table_t shared_table{}; + shared_table.entries = function_table_dev_; + shared_table.count = 2; + ASSERT_EQ(cudaq_dispatcher_set_function_table(device_dispatcher_, + &shared_table), + CUDAQ_OK); + + ASSERT_EQ(cudaq_dispatcher_set_control(device_dispatcher_, + shutdown_flag_dev_, + device_loop_stats_), + CUDAQ_OK); + + ASSERT_EQ(cudaq_dispatcher_set_launch_fn( + device_dispatcher_, &cudaq_launch_dispatch_kernel_regular), + CUDAQ_OK); + + ASSERT_EQ(cudaq_dispatcher_start(device_dispatcher_), CUDAQ_OK); + + host_workers_.push_back(cudaq_host_dispatch_worker_t{}); + cudaStream_t host_stream = nullptr; + CUDA_CHECK(cudaStreamCreate(&host_stream)); + host_workers_[0].graph_exec = host_graph_exec_; + host_workers_[0].stream = host_stream; + host_workers_[0].function_id = HOST_GRAPH_FN_ID; + + host_idle_mask_ = new cuda::std::atomic(1ULL); + host_live_dispatched_ = new cuda::std::atomic(0); + host_inflight_slot_tags_ = new int[host_workers_.size()]; + for (size_t i = 0; i < host_workers_.size(); ++i) + host_inflight_slot_tags_[i] = -1; + + std::memset(&host_ctx_, 0, sizeof(host_ctx_)); + host_ctx_.ringbuffer.rx_flags = rx_flags_dev_; + host_ctx_.ringbuffer.tx_flags = tx_flags_dev_; + host_ctx_.ringbuffer.rx_data = rx_data_dev_; + host_ctx_.ringbuffer.tx_data = tx_data_dev_; + host_ctx_.ringbuffer.rx_stride_sz = kSharedRingSlotSize; + host_ctx_.ringbuffer.tx_stride_sz = kSharedRingSlotSize; + host_ctx_.ringbuffer.rx_flags_host = rx_flags_host_; + host_ctx_.ringbuffer.tx_flags_host = tx_flags_host_; + host_ctx_.ringbuffer.rx_data_host = rx_data_host_; + host_ctx_.ringbuffer.tx_data_host = tx_data_host_; + + host_ctx_.config.num_slots = static_cast(kSharedRingNumSlots); + host_ctx_.config.slot_size = static_cast(kSharedRingSlotSize); + host_ctx_.config.shared_ring_mode = 1; + + host_ctx_.function_table.entries = function_table_dev_; + host_ctx_.function_table.count = 2; + host_ctx_.workers = host_workers_.data(); + host_ctx_.num_workers = host_workers_.size(); + host_ctx_.h_mailbox_bank = h_mailbox_bank_; + host_ctx_.shutdown_flag = host_loop_shutdown_atomic_; + host_ctx_.stats_counter = &host_loop_stats_; + host_ctx_.live_dispatched = host_live_dispatched_; + host_ctx_.idle_mask = host_idle_mask_; + host_ctx_.inflight_slot_tags = host_inflight_slot_tags_; + host_ctx_.skip_stream_sweep = false; + + host_loop_thread_ = + std::thread([this]() { cudaq_host_dispatcher_loop(&host_ctx_); }); + } + + void TearDown() override { + *shutdown_flag_host_ = 1; + __sync_synchronize(); + + if (host_loop_thread_.joinable()) + host_loop_thread_.join(); + + if (device_dispatcher_) { + cudaq_dispatcher_stop(device_dispatcher_); + cudaq_dispatcher_destroy(device_dispatcher_); + device_dispatcher_ = nullptr; + } + if (device_manager_) { + cudaq_dispatch_manager_destroy(device_manager_); + device_manager_ = nullptr; + } + + (void)cudaq_dispatch_kernel_set_shared_ring_mode(0); + + if (host_graph_exec_) + cudaGraphExecDestroy(host_graph_exec_); + if (host_graph_) + cudaGraphDestroy(host_graph_); + for (auto& w : host_workers_) { + if (w.stream) + cudaStreamDestroy(w.stream); + } + host_workers_.clear(); + + delete host_idle_mask_; + delete host_live_dispatched_; + delete[] host_inflight_slot_tags_; + + if (function_table_host_) + cudaFreeHost(function_table_host_); + if (device_loop_stats_) + cudaFree(device_loop_stats_); + if (h_mailbox_bank_) + cudaFreeHost(h_mailbox_bank_); + + free_ring_buffer(rx_flags_host_, rx_data_host_); + if (tx_flags_host_) + cudaFreeHost(const_cast(tx_flags_host_)); + if (tx_data_is_owned_ && tx_data_host_) + cudaFreeHost(tx_data_host_); + + if (shutdown_flag_host_) + cudaFreeHost(const_cast(shutdown_flag_host_)); + } + + void WriteAndSignal(std::size_t slot, std::uint32_t function_id, + std::uint32_t request_id, + const std::vector& payload) { + ASSERT_LT(slot, kSharedRingNumSlots); + ASSERT_LE(payload.size(), + kSharedRingSlotSize - sizeof(cudaq::realtime::RPCHeader)); + std::uint8_t* slot_host = rx_data_host_ + slot * kSharedRingSlotSize; + auto* header = reinterpret_cast(slot_host); + header->magic = cudaq::realtime::RPC_MAGIC_REQUEST; + header->function_id = function_id; + header->arg_len = static_cast(payload.size()); + header->request_id = request_id; + header->ptp_timestamp = 0; + std::memcpy(slot_host + sizeof(cudaq::realtime::RPCHeader), + payload.data(), payload.size()); + __sync_synchronize(); + rx_flags_host_[slot] = reinterpret_cast( + rx_data_dev_ + slot * kSharedRingSlotSize); + } + + bool WaitForResponseInSlot(std::size_t slot, int timeout_ms = 5000) { + std::uint8_t* slot_host = rx_data_host_ + slot * kSharedRingSlotSize; + auto* resp = reinterpret_cast(slot_host); + for (int waited = 0; waited < timeout_ms; ++waited) { + __sync_synchronize(); + if (resp->magic == cudaq::realtime::RPC_MAGIC_RESPONSE) + return true; + usleep(1000); + } + return false; + } + + std::vector ReadResponse(std::size_t slot) { + std::uint8_t* slot_host = rx_data_host_ + slot * kSharedRingSlotSize; + auto* resp = reinterpret_cast(slot_host); + std::vector out(resp->result_len); + std::memcpy(out.data(), + slot_host + sizeof(cudaq::realtime::RPCResponse), + resp->result_len); + return out; + } + + volatile uint64_t* rx_flags_host_ = nullptr; + volatile uint64_t* tx_flags_host_ = nullptr; + volatile uint64_t* rx_flags_dev_ = nullptr; + volatile uint64_t* tx_flags_dev_ = nullptr; + std::uint8_t* rx_data_host_ = nullptr; + std::uint8_t* tx_data_host_ = nullptr; + std::uint8_t* rx_data_dev_ = nullptr; + std::uint8_t* tx_data_dev_ = nullptr; + bool tx_data_is_owned_ = true; + + volatile int* shutdown_flag_host_ = nullptr; + volatile int* shutdown_flag_dev_ = nullptr; + cuda::std::atomic* host_loop_shutdown_atomic_ = nullptr; + cudaq_function_entry_t* function_table_host_ = nullptr; + cudaq_function_entry_t* function_table_dev_ = nullptr; + + cudaq_dispatch_manager_t* device_manager_ = nullptr; + cudaq_dispatcher_t* device_dispatcher_ = nullptr; + uint64_t* device_loop_stats_ = nullptr; + + cudaGraph_t host_graph_ = nullptr; + cudaGraphExec_t host_graph_exec_ = nullptr; + void** h_mailbox_bank_ = nullptr; + void* d_mailbox_bank_void_ = nullptr; + void** d_mailbox_bank_ = nullptr; + std::vector host_workers_; + cuda::std::atomic* host_idle_mask_ = nullptr; + cuda::std::atomic* host_live_dispatched_ = nullptr; + int* host_inflight_slot_tags_ = nullptr; + uint64_t host_loop_stats_ = 0; + cudaq_host_dispatch_loop_ctx_t host_ctx_{}; + std::thread host_loop_thread_; +}; + +TEST_F(SharedRingDispatcherTest, InterleavedHostAndDeviceRequests) { + std::vector p0 = {10, 20, 30, 40}; + std::vector p1 = {3, 5, 7, 9}; + std::vector p2 = {1, 2, 3, 4}; + std::vector p3 = {6, 12, 24, 48}; + + WriteAndSignal(0, HOST_GRAPH_FN_ID, /*request_id=*/100, p0); + WriteAndSignal(1, DEVICE_CALL_FN_ID, /*request_id=*/101, p1); + WriteAndSignal(2, HOST_GRAPH_FN_ID, /*request_id=*/102, p2); + WriteAndSignal(3, DEVICE_CALL_FN_ID, /*request_id=*/103, p3); + + ASSERT_TRUE(WaitForResponseInSlot(0)) << "Slot 0 (HOST_LOOP) timed out"; + ASSERT_TRUE(WaitForResponseInSlot(1)) << "Slot 1 (DEVICE_LOOP) timed out"; + ASSERT_TRUE(WaitForResponseInSlot(2)) << "Slot 2 (HOST_LOOP) timed out"; + ASSERT_TRUE(WaitForResponseInSlot(3)) << "Slot 3 (DEVICE_LOOP) timed out"; + + EXPECT_EQ(ReadResponse(0), (std::vector{11, 21, 31, 41})); + EXPECT_EQ(ReadResponse(1), (std::vector{6, 10, 14, 18})); + EXPECT_EQ(ReadResponse(2), (std::vector{2, 3, 4, 5})); + EXPECT_EQ(ReadResponse(3), (std::vector{12, 24, 48, 96})); + + *shutdown_flag_host_ = 1; + __sync_synchronize(); + if (host_loop_thread_.joinable()) + host_loop_thread_.join(); + cudaq_dispatcher_stop(device_dispatcher_); + + uint64_t dev_count = 0; + CUDA_CHECK(cudaMemcpy(&dev_count, device_loop_stats_, sizeof(uint64_t), + cudaMemcpyDeviceToHost)); + EXPECT_EQ(dev_count, 2u); + EXPECT_EQ(host_loop_stats_, 2u); + + cudaq_dispatcher_destroy(device_dispatcher_); + device_dispatcher_ = nullptr; +} + } // namespace diff --git a/realtime/unittests/test_shared_ring_dispatchers.cu b/realtime/unittests/test_shared_ring_dispatchers.cu deleted file mode 100644 index 8d7821e9590..00000000000 --- a/realtime/unittests/test_shared_ring_dispatchers.cu +++ /dev/null @@ -1,623 +0,0 @@ -/****************************************************************-*- C++ -*-**** - * Copyright (c) 2026 NVIDIA Corporation & Affiliates. * - * All rights reserved. * - * * - * This source code and the accompanying materials are made available under * - * the terms of the Apache License 2.0 which accompanies this distribution. * - ******************************************************************************/ - -// Shared-ring-mode test: brings up BOTH a HOST_LOOP CPU dispatcher AND a -// DEVICE_LOOP persistent GPU dispatcher on the SAME ring buffer, with -// shared_ring_mode = 1 on both. The HOST_LOOP owns one function_id (a -// GRAPH_LAUNCH entry) and the DEVICE_LOOP owns a different function_id (a -// DEVICE_CALL entry). Producer interleaves requests for the two function_ids -// across slots; the test verifies that each dispatcher services its OWN -// requests and SKIPS the peer's slots without clobbering rx_flags. - -#include -#include -#include -#include -#include -#include -#include -#include - -#include "cudaq/realtime/daemon/dispatcher/cudaq_realtime.h" -#include "cudaq/realtime/daemon/dispatcher/dispatch_kernel_launch.h" -#include "cudaq/realtime/daemon/dispatcher/host_dispatcher.h" - -#define CUDA_CHECK(call) \ - do { \ - cudaError_t err = call; \ - ASSERT_EQ(err, cudaSuccess) << "CUDA error: " << cudaGetErrorString(err); \ - } while (0) - -namespace { - -constexpr std::size_t kNumSlots = 8; -constexpr std::size_t kSlotSize = 256; - -// function_id for the GRAPH_LAUNCH entry owned by the HOST_LOOP dispatcher. -constexpr std::uint32_t HOST_GRAPH_FN_ID = - cudaq::realtime::fnv1a_hash("shared_ring_host_increment"); - -// function_id for the DEVICE_CALL entry owned by the DEVICE_LOOP dispatcher. -constexpr std::uint32_t DEVICE_CALL_FN_ID = - cudaq::realtime::fnv1a_hash("shared_ring_device_double"); - -//============================================================================== -// Ring buffer / control buffer helpers -//============================================================================== - -bool allocate_ring_buffer(std::size_t num_slots, std::size_t slot_size, - volatile uint64_t** host_flags_out, - volatile uint64_t** device_flags_out, - std::uint8_t** host_data_out, - std::uint8_t** device_data_out) { - void* host_flags_ptr = nullptr; - if (cudaHostAlloc(&host_flags_ptr, num_slots * sizeof(uint64_t), - cudaHostAllocMapped) != cudaSuccess) - return false; - - void* device_flags_ptr = nullptr; - if (cudaHostGetDevicePointer(&device_flags_ptr, host_flags_ptr, 0) != - cudaSuccess) { - cudaFreeHost(host_flags_ptr); - return false; - } - - void* host_data_ptr = nullptr; - if (cudaHostAlloc(&host_data_ptr, num_slots * slot_size, - cudaHostAllocMapped) != cudaSuccess) { - cudaFreeHost(host_flags_ptr); - return false; - } - - void* device_data_ptr = nullptr; - if (cudaHostGetDevicePointer(&device_data_ptr, host_data_ptr, 0) != - cudaSuccess) { - cudaFreeHost(host_flags_ptr); - cudaFreeHost(host_data_ptr); - return false; - } - - std::memset(host_flags_ptr, 0, num_slots * sizeof(uint64_t)); - std::memset(host_data_ptr, 0, num_slots * slot_size); - - *host_flags_out = static_cast(host_flags_ptr); - *device_flags_out = static_cast(device_flags_ptr); - *host_data_out = static_cast(host_data_ptr); - *device_data_out = static_cast(device_data_ptr); - return true; -} - -void free_ring_buffer(volatile uint64_t* host_flags, std::uint8_t* host_data) { - if (host_flags) - cudaFreeHost(const_cast(host_flags)); - if (host_data) - cudaFreeHost(host_data); -} - -//============================================================================== -// HOST_LOOP graph kernel: reads RPC slot via mailbox, writes incremented bytes -// to the TX slot. The dispatcher fills a GraphIOContext per-launch via its -// io_ctxs path; we use the simpler "mailbox holds raw RX slot pointer" mode -// and write the response in-place (legacy single-buffer mode is fine since -// we wire rx_data and tx_data to the SAME backing memory). -//============================================================================== - -__global__ void host_graph_increment_kernel(void** mailbox_slot_ptr) { - if (threadIdx.x == 0 && blockIdx.x == 0) { - void* buffer = *mailbox_slot_ptr; - cudaq::realtime::RPCHeader* header = - static_cast(buffer); - std::uint32_t arg_len = header->arg_len; - std::uint32_t request_id = header->request_id; - std::uint8_t* data = static_cast(buffer) + - sizeof(cudaq::realtime::RPCHeader); - for (std::uint32_t i = 0; i < arg_len; ++i) - data[i] = data[i] + 1; - cudaq::realtime::RPCResponse* response = - static_cast(buffer); - response->magic = cudaq::realtime::RPC_MAGIC_RESPONSE; - response->status = 0; - response->result_len = arg_len; - response->request_id = request_id; - } -} - -bool create_host_graph(void** d_mailbox_bank, cudaGraph_t* graph_out, - cudaGraphExec_t* exec_out) { - cudaGraph_t graph = nullptr; - if (cudaGraphCreate(&graph, 0) != cudaSuccess) - return false; - - cudaKernelNodeParams params = {}; - void* kernel_args[] = {&d_mailbox_bank}; - params.func = reinterpret_cast(host_graph_increment_kernel); - params.gridDim = dim3(1, 1, 1); - params.blockDim = dim3(32, 1, 1); - params.sharedMemBytes = 0; - params.kernelParams = kernel_args; - params.extra = nullptr; - - cudaGraphNode_t node = nullptr; - if (cudaGraphAddKernelNode(&node, graph, nullptr, 0, ¶ms) != - cudaSuccess) { - cudaGraphDestroy(graph); - return false; - } - - cudaGraphExec_t exec = nullptr; - if (cudaGraphInstantiate(&exec, graph, nullptr, nullptr, 0) != cudaSuccess) { - cudaGraphDestroy(graph); - return false; - } - - *graph_out = graph; - *exec_out = exec; - return true; -} - -//============================================================================== -// DEVICE_LOOP device-call handler: doubles each byte. -//============================================================================== - -__device__ int device_double_handler(const void* input, void* output, - std::uint32_t arg_len, - std::uint32_t max_result_len, - std::uint32_t* result_len) { - const std::uint8_t* in = static_cast(input); - std::uint8_t* out = static_cast(output); - std::uint32_t n = arg_len; - if (n > max_result_len) - n = max_result_len; - for (std::uint32_t i = 0; i < n; ++i) - out[i] = static_cast(in[i] * 2); - *result_len = n; - return 0; -} - -// Populate the device function table: -// entry 0: GRAPH_LAUNCH owned by HOST_LOOP (handler.graph_exec set below) -// entry 1: DEVICE_CALL owned by DEVICE_LOOP (handler.device_fn_ptr = -// device_double_handler) -// -// Both dispatchers share the SAME function table. HOST_LOOP iterates and -// only routes GRAPH_LAUNCH entries; DEVICE_LOOP only routes DEVICE_CALL -// entries. Under shared_ring_mode this means each peer naturally skips the -// other's slots. -__global__ void init_shared_function_table(cudaq_function_entry_t* entries, - cudaGraphExec_t host_graph_exec) { - if (threadIdx.x == 0 && blockIdx.x == 0) { - entries[0].handler.graph_exec = host_graph_exec; - entries[0].function_id = HOST_GRAPH_FN_ID; - entries[0].dispatch_mode = CUDAQ_DISPATCH_GRAPH_LAUNCH; - entries[0].reserved[0] = 0; - entries[0].reserved[1] = 0; - entries[0].reserved[2] = 0; - - entries[1].handler.device_fn_ptr = - reinterpret_cast(&device_double_handler); - entries[1].function_id = DEVICE_CALL_FN_ID; - entries[1].dispatch_mode = CUDAQ_DISPATCH_DEVICE_CALL; - entries[1].reserved[0] = 0; - entries[1].reserved[1] = 0; - entries[1].reserved[2] = 0; - } -} - -//============================================================================== -// Test fixture -//============================================================================== - -class SharedRingDispatcherTest : public ::testing::Test { -protected: - void SetUp() override { - // -- Ring buffer. RX and TX share the same backing memory: this is - // valid for the HOST_LOOP's "legacy mailbox" path (graph kernel - // writes response in-place into the RX slot), and the DEVICE_LOOP - // kernel will then read its own request and overwrite it with the - // response in the same slot. TX flags are a separate allocation. -- - ASSERT_TRUE(allocate_ring_buffer(kNumSlots, kSlotSize, &rx_flags_host_, - &rx_flags_dev_, &rx_data_host_, - &rx_data_dev_)); - void* tx_flags_host_ptr = nullptr; - CUDA_CHECK(cudaHostAlloc(&tx_flags_host_ptr, kNumSlots * sizeof(uint64_t), - cudaHostAllocMapped)); - std::memset(tx_flags_host_ptr, 0, kNumSlots * sizeof(uint64_t)); - tx_flags_host_ = static_cast(tx_flags_host_ptr); - void* tx_flags_dev_ptr = nullptr; - CUDA_CHECK(cudaHostGetDevicePointer(&tx_flags_dev_ptr, tx_flags_host_ptr, - 0)); - tx_flags_dev_ = static_cast(tx_flags_dev_ptr); - // RX and TX data buffers point to the SAME backing memory. - tx_data_host_ = rx_data_host_; - tx_data_dev_ = rx_data_dev_; - tx_data_is_owned_ = false; - - // -- Shutdown flag (pinned mapped so both CPU and GPU see it) -- - void* tmp = nullptr; - CUDA_CHECK(cudaHostAlloc(&tmp, sizeof(int), cudaHostAllocMapped)); - shutdown_flag_host_ = static_cast(tmp); - *shutdown_flag_host_ = 0; - void* tmp_dev = nullptr; - CUDA_CHECK(cudaHostGetDevicePointer(&tmp_dev, tmp, 0)); - shutdown_flag_dev_ = static_cast(tmp_dev); - - // The HOST_LOOP wants a cuda::std::atomic* opaque shutdown flag, - // backed by the SAME pinned memory so both dispatchers stop together - // when *shutdown_flag_host_ = 1. - host_loop_shutdown_atomic_ = - reinterpret_cast*>( - const_cast(shutdown_flag_host_)); - - // -- Stats -- - CUDA_CHECK(cudaMalloc(&device_loop_stats_, sizeof(uint64_t))); - CUDA_CHECK(cudaMemset(device_loop_stats_, 0, sizeof(uint64_t))); - - // -- Function table (shared by both dispatchers). Must be CPU-readable - // so the HOST_LOOP loop can do lookup_function() on it, and - // GPU-readable so the DEVICE_LOOP kernel can do the same. Pinned - // mapped allocation gives us both views with the same address - // under UVA. -- - void* fn_table_host_ptr = nullptr; - CUDA_CHECK(cudaHostAlloc(&fn_table_host_ptr, - 2 * sizeof(cudaq_function_entry_t), - cudaHostAllocMapped)); - function_table_host_ = - static_cast(fn_table_host_ptr); - void* fn_table_dev_ptr = nullptr; - CUDA_CHECK(cudaHostGetDevicePointer(&fn_table_dev_ptr, - fn_table_host_ptr, 0)); - function_table_dev_ = - static_cast(fn_table_dev_ptr); - std::memset(function_table_host_, 0, - 2 * sizeof(cudaq_function_entry_t)); - - // -- HOST_LOOP graph (created BEFORE init_shared_function_table) -- - CUDA_CHECK(cudaHostAlloc(&h_mailbox_bank_, sizeof(void*), - cudaHostAllocMapped)); - h_mailbox_bank_[0] = nullptr; - CUDA_CHECK(cudaHostGetDevicePointer(&d_mailbox_bank_void_, - h_mailbox_bank_, 0)); - d_mailbox_bank_ = static_cast(d_mailbox_bank_void_); - ASSERT_TRUE( - create_host_graph(d_mailbox_bank_, &host_graph_, &host_graph_exec_)); - - // -- Initialize function table on GPU -- - init_shared_function_table<<<1, 1>>>(function_table_dev_, - host_graph_exec_); - CUDA_CHECK(cudaDeviceSynchronize()); - - // -- DEVICE_LOOP: push shared_ring_mode into the kernel's __constant__ - // BEFORE starting the dispatcher. This is the caller's - // responsibility (libcudaq-realtime.so cannot reach into the - // static lib's __constant__ symbol). -- - CUDA_CHECK(cudaq_dispatch_kernel_set_shared_ring_mode(1)); - - // -- DEVICE_LOOP: dispatcher via the C API -- - ASSERT_EQ(cudaq_dispatch_manager_create(&device_manager_), CUDAQ_OK); - - cudaq_dispatcher_config_t device_config{}; - device_config.device_id = 0; - device_config.num_blocks = 1; - device_config.threads_per_block = 64; - device_config.num_slots = static_cast(kNumSlots); - device_config.slot_size = static_cast(kSlotSize); - device_config.vp_id = 0; - device_config.kernel_type = CUDAQ_KERNEL_REGULAR; - device_config.dispatch_mode = CUDAQ_DISPATCH_DEVICE_CALL; - device_config.dispatch_path = CUDAQ_DISPATCH_PATH_DEVICE; - device_config.shared_ring_mode = 1; - ASSERT_EQ(cudaq_dispatcher_create(device_manager_, &device_config, - &device_dispatcher_), - CUDAQ_OK); - - cudaq_ringbuffer_t device_rb{}; - device_rb.rx_flags = rx_flags_dev_; - device_rb.tx_flags = tx_flags_dev_; - device_rb.rx_data = rx_data_dev_; - device_rb.tx_data = tx_data_dev_; - device_rb.rx_stride_sz = kSlotSize; - device_rb.tx_stride_sz = kSlotSize; - ASSERT_EQ(cudaq_dispatcher_set_ringbuffer(device_dispatcher_, &device_rb), - CUDAQ_OK); - - cudaq_function_table_t shared_table{}; - shared_table.entries = function_table_dev_; - shared_table.count = 2; - ASSERT_EQ(cudaq_dispatcher_set_function_table(device_dispatcher_, - &shared_table), - CUDAQ_OK); - - ASSERT_EQ(cudaq_dispatcher_set_control(device_dispatcher_, - shutdown_flag_dev_, - device_loop_stats_), - CUDAQ_OK); - - ASSERT_EQ(cudaq_dispatcher_set_launch_fn( - device_dispatcher_, &cudaq_launch_dispatch_kernel_regular), - CUDAQ_OK); - - ASSERT_EQ(cudaq_dispatcher_start(device_dispatcher_), CUDAQ_OK); - - // -- HOST_LOOP: build cudaq_host_dispatch_loop_ctx_t directly and run - // on a worker thread. Using the lower-level entry point gives us - // explicit control over shared_ring_mode behavior under test. -- - host_workers_.push_back(cudaq_host_dispatch_worker_t{}); - cudaStream_t host_stream = nullptr; - CUDA_CHECK(cudaStreamCreate(&host_stream)); - host_workers_[0].graph_exec = host_graph_exec_; - host_workers_[0].stream = host_stream; - host_workers_[0].function_id = HOST_GRAPH_FN_ID; - host_workers_[0].pre_launch_fn = nullptr; - host_workers_[0].pre_launch_data = nullptr; - host_workers_[0].post_launch_fn = nullptr; - host_workers_[0].post_launch_data = nullptr; - - host_idle_mask_ = - new cuda::std::atomic(1ULL); // 1 worker, initially idle - host_live_dispatched_ = new cuda::std::atomic(0); - host_inflight_slot_tags_ = new int[host_workers_.size()]; - for (size_t i = 0; i < host_workers_.size(); ++i) - host_inflight_slot_tags_[i] = -1; - - std::memset(&host_ctx_, 0, sizeof(host_ctx_)); - host_ctx_.ringbuffer.rx_flags = rx_flags_dev_; - host_ctx_.ringbuffer.tx_flags = tx_flags_dev_; - host_ctx_.ringbuffer.rx_data = rx_data_dev_; - host_ctx_.ringbuffer.tx_data = tx_data_dev_; - host_ctx_.ringbuffer.rx_stride_sz = kSlotSize; - host_ctx_.ringbuffer.tx_stride_sz = kSlotSize; - host_ctx_.ringbuffer.rx_flags_host = rx_flags_host_; - host_ctx_.ringbuffer.tx_flags_host = tx_flags_host_; - host_ctx_.ringbuffer.rx_data_host = rx_data_host_; - host_ctx_.ringbuffer.tx_data_host = tx_data_host_; - - host_ctx_.config.num_slots = static_cast(kNumSlots); - host_ctx_.config.slot_size = static_cast(kSlotSize); - host_ctx_.config.shared_ring_mode = 1; - - // The HOST_LOOP function table has BOTH entries; the loop will only act - // on the GRAPH_LAUNCH entry (entry 0). The DEVICE_CALL entry is "in our - // table but not our mode" -- under shared_ring_mode we skip-without-clear - // such slots. This is the realistic configuration in cuda-qx. - host_ctx_.function_table.entries = function_table_dev_; - host_ctx_.function_table.count = 2; - - host_ctx_.workers = host_workers_.data(); - host_ctx_.num_workers = host_workers_.size(); - host_ctx_.h_mailbox_bank = h_mailbox_bank_; - host_ctx_.shutdown_flag = host_loop_shutdown_atomic_; - host_ctx_.stats_counter = &host_loop_stats_; - host_ctx_.live_dispatched = host_live_dispatched_; - host_ctx_.idle_mask = host_idle_mask_; - host_ctx_.inflight_slot_tags = host_inflight_slot_tags_; - host_ctx_.io_ctxs_host = nullptr; - host_ctx_.io_ctxs_dev = nullptr; - host_ctx_.skip_stream_sweep = false; - - host_loop_thread_ = std::thread([this]() { - cudaq_host_dispatcher_loop(&host_ctx_); - }); - } - - void TearDown() override { - // Stop both dispatchers. - *shutdown_flag_host_ = 1; - __sync_synchronize(); - - if (host_loop_thread_.joinable()) - host_loop_thread_.join(); - - if (device_dispatcher_) { - cudaq_dispatcher_stop(device_dispatcher_); - cudaq_dispatcher_destroy(device_dispatcher_); - device_dispatcher_ = nullptr; - } - if (device_manager_) { - cudaq_dispatch_manager_destroy(device_manager_); - device_manager_ = nullptr; - } - - // Restore the dispatch kernel's __constant__ to 0 so we don't affect - // subsequent tests in the same binary. - (void)cudaq_dispatch_kernel_set_shared_ring_mode(0); - - if (host_graph_exec_) - cudaGraphExecDestroy(host_graph_exec_); - if (host_graph_) - cudaGraphDestroy(host_graph_); - for (auto& w : host_workers_) { - if (w.stream) - cudaStreamDestroy(w.stream); - } - host_workers_.clear(); - - delete host_idle_mask_; - delete host_live_dispatched_; - delete[] host_inflight_slot_tags_; - - if (function_table_host_) - cudaFreeHost(function_table_host_); - if (device_loop_stats_) - cudaFree(device_loop_stats_); - if (h_mailbox_bank_) - cudaFreeHost(h_mailbox_bank_); - - free_ring_buffer(rx_flags_host_, rx_data_host_); - if (tx_flags_host_) - cudaFreeHost(const_cast(tx_flags_host_)); - if (tx_data_is_owned_ && tx_data_host_) - cudaFreeHost(tx_data_host_); - - if (shutdown_flag_host_) - cudaFreeHost(const_cast(shutdown_flag_host_)); - } - - // Write an RPC request into a slot and signal it by storing the slot - // address into rx_flags. Producer-side equivalent of what the cuda-qx - // rpc_producer will do under shared_ring_mode. - void WriteAndSignal(std::size_t slot, std::uint32_t function_id, - std::uint32_t request_id, - const std::vector& payload) { - ASSERT_LT(slot, kNumSlots); - ASSERT_LE(payload.size(), - kSlotSize - sizeof(cudaq::realtime::RPCHeader)); - std::uint8_t* slot_host = rx_data_host_ + slot * kSlotSize; - auto* header = reinterpret_cast(slot_host); - header->magic = cudaq::realtime::RPC_MAGIC_REQUEST; - header->function_id = function_id; - header->arg_len = static_cast(payload.size()); - header->request_id = request_id; - header->ptp_timestamp = 0; - std::memcpy(slot_host + sizeof(cudaq::realtime::RPCHeader), - payload.data(), payload.size()); - __sync_synchronize(); - // Producer publishes the slot by writing the device-visible RX address - // (the convention is that rx_flags[i] holds the address of the RX slot - // data; both dispatchers use it as the request pointer). - rx_flags_host_[slot] = - reinterpret_cast(rx_data_dev_ + slot * kSlotSize); - } - - // Wait for the response magic to appear in the data slot. Both - // dispatchers write RPCResponse (magic = CUDAQ_RPC_MAGIC_RESPONSE) into - // their tx_slot, which under our aliased rx/tx setup is the same memory - // we wrote the request to. This is more robust than polling tx_flags, - // since the legacy HOST_LOOP mailbox path can leave tx_flags stuck at - // CUDAQ_TX_FLAG_IN_FLIGHT until the worker is recycled. - bool WaitForResponseInSlot(std::size_t slot, int timeout_ms = 5000) { - std::uint8_t* slot_host = rx_data_host_ + slot * kSlotSize; - auto* resp = reinterpret_cast(slot_host); - for (int waited = 0; waited < timeout_ms; ++waited) { - __sync_synchronize(); - if (resp->magic == cudaq::realtime::RPC_MAGIC_RESPONSE) - return true; - usleep(1000); - } - return false; - } - - std::vector ReadResponse(std::size_t slot) { - std::vector out; - std::uint8_t* slot_host = rx_data_host_ + slot * kSlotSize; - auto* resp = reinterpret_cast(slot_host); - if (resp->magic == cudaq::realtime::RPC_MAGIC_RESPONSE) { - out.resize(resp->result_len); - std::memcpy(out.data(), - slot_host + sizeof(cudaq::realtime::RPCResponse), - resp->result_len); - } - return out; - } - - // -- Ring buffer (shared by both dispatchers) -- - volatile uint64_t* rx_flags_host_ = nullptr; - volatile uint64_t* tx_flags_host_ = nullptr; - volatile uint64_t* rx_flags_dev_ = nullptr; - volatile uint64_t* tx_flags_dev_ = nullptr; - std::uint8_t* rx_data_host_ = nullptr; - std::uint8_t* tx_data_host_ = nullptr; - std::uint8_t* rx_data_dev_ = nullptr; - std::uint8_t* tx_data_dev_ = nullptr; - bool tx_data_is_owned_ = true; // false when tx aliases rx - - // -- Shared shutdown + function table -- - volatile int* shutdown_flag_host_ = nullptr; - volatile int* shutdown_flag_dev_ = nullptr; - cuda::std::atomic* host_loop_shutdown_atomic_ = nullptr; - cudaq_function_entry_t* function_table_host_ = nullptr; - cudaq_function_entry_t* function_table_dev_ = nullptr; - - // -- DEVICE_LOOP dispatcher -- - cudaq_dispatch_manager_t* device_manager_ = nullptr; - cudaq_dispatcher_t* device_dispatcher_ = nullptr; - uint64_t* device_loop_stats_ = nullptr; - - // -- HOST_LOOP dispatcher -- - cudaGraph_t host_graph_ = nullptr; - cudaGraphExec_t host_graph_exec_ = nullptr; - void** h_mailbox_bank_ = nullptr; - void* d_mailbox_bank_void_ = nullptr; - void** d_mailbox_bank_ = nullptr; - std::vector host_workers_; - cuda::std::atomic* host_idle_mask_ = nullptr; - cuda::std::atomic* host_live_dispatched_ = nullptr; - int* host_inflight_slot_tags_ = nullptr; - uint64_t host_loop_stats_ = 0; - cudaq_host_dispatch_loop_ctx_t host_ctx_{}; - std::thread host_loop_thread_; -}; - -//============================================================================== -// Test: interleaved requests, both dispatchers running, shared_ring_mode = 1 -//============================================================================== - -TEST_F(SharedRingDispatcherTest, InterleavedHostAndDeviceRequests) { - // Slot 0: HOST_LOOP (increment by 1) - // Slot 1: DEVICE_LOOP (double) - // Slot 2: HOST_LOOP - // Slot 3: DEVICE_LOOP - std::vector p0 = {10, 20, 30, 40}; - std::vector p1 = {3, 5, 7, 9}; - std::vector p2 = {1, 2, 3, 4}; - std::vector p3 = {6, 12, 24, 48}; - - WriteAndSignal(0, HOST_GRAPH_FN_ID, /*request_id=*/100, p0); - WriteAndSignal(1, DEVICE_CALL_FN_ID, /*request_id=*/101, p1); - WriteAndSignal(2, HOST_GRAPH_FN_ID, /*request_id=*/102, p2); - WriteAndSignal(3, DEVICE_CALL_FN_ID, /*request_id=*/103, p3); - - ASSERT_TRUE(WaitForResponseInSlot(0)) << "Slot 0 (HOST_LOOP) timed out"; - ASSERT_TRUE(WaitForResponseInSlot(1)) << "Slot 1 (DEVICE_LOOP) timed out"; - ASSERT_TRUE(WaitForResponseInSlot(2)) << "Slot 2 (HOST_LOOP) timed out"; - ASSERT_TRUE(WaitForResponseInSlot(3)) << "Slot 3 (DEVICE_LOOP) timed out"; - - std::vector r0 = ReadResponse(0); - std::vector r1 = ReadResponse(1); - std::vector r2 = ReadResponse(2); - std::vector r3 = ReadResponse(3); - - std::vector e0 = {11, 21, 31, 41}; // p0 + 1 - std::vector e1 = {6, 10, 14, 18}; // p1 * 2 - std::vector e2 = {2, 3, 4, 5}; // p2 + 1 - std::vector e3 = {12, 24, 48, 96}; // p3 * 2 - - EXPECT_EQ(r0, e0) << "HOST_LOOP slot 0"; - EXPECT_EQ(r1, e1) << "DEVICE_LOOP slot 1"; - EXPECT_EQ(r2, e2) << "HOST_LOOP slot 2"; - EXPECT_EQ(r3, e3) << "DEVICE_LOOP slot 3"; - - // Stats sanity: each dispatcher should have processed exactly two slots - // (the two slots whose function_id matches its own table entry mode). - // The DEVICE_LOOP stats are written when the kernel exits; we shut it - // down explicitly below to flush. - *shutdown_flag_host_ = 1; - __sync_synchronize(); - if (host_loop_thread_.joinable()) - host_loop_thread_.join(); - // Stopping the DEVICE_LOOP triggers cudaStreamSynchronize on the - // dispatcher's stream, which flushes the kernel's final atomicAdd into - // device_loop_stats_. - cudaq_dispatcher_stop(device_dispatcher_); - - uint64_t dev_count = 0; - CUDA_CHECK(cudaMemcpy(&dev_count, device_loop_stats_, sizeof(uint64_t), - cudaMemcpyDeviceToHost)); - EXPECT_EQ(dev_count, 2u) - << "DEVICE_LOOP should have processed exactly 2 slots, got " - << dev_count; - EXPECT_EQ(host_loop_stats_, 2u) - << "HOST_LOOP should have processed exactly 2 slots, got " - << host_loop_stats_; - - // Mark device dispatcher as already stopped so TearDown doesn't double - // stop / destroy. - cudaq_dispatcher_destroy(device_dispatcher_); - device_dispatcher_ = nullptr; -} - -} // namespace From aacd4804e89b248ba852e6b36c46c2a63a3d35cb Mon Sep 17 00:00:00 2001 From: Chuck Ketcham Date: Wed, 10 Jun 2026 17:54:16 +0000 Subject: [PATCH 7/7] clang format Signed-off-by: Chuck Ketcham --- .../cudaq/realtime/daemon/dispatcher/cudaq_realtime.h | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h index 35c9e427cda..f0b63367d44 100644 --- a/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h +++ b/realtime/include/cudaq/realtime/daemon/dispatcher/cudaq_realtime.h @@ -14,8 +14,10 @@ #include "cudaq/realtime/daemon/dispatcher/rpc_wire_format.h" -// Visibility marker for entry points that consumers reach via dlsym(RTLD_DEFAULT, -// ...) at runtime. libcudaq-realtime-dispatch.a is built with hidden visibility +// Visibility marker for entry points that consumers reach via +// dlsym(RTLD_DEFAULT, +// ...) at runtime. libcudaq-realtime-dispatch.a is built with hidden +// visibility // + -Wl,--exclude-libs=ALL, so by default its symbols stay hidden inside the // final binary even when the archive is absorbed. Marking individual symbols // with default visibility opts them back into the binary's dynamic symbol table