From b606727820046544cb75968e8f25aea18abefdc0 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 16:31:52 -0500 Subject: [PATCH 01/12] Replace signal-based daemon IPC with UNIX socketpair The Ruby driver coordinated with its child daemons (sync_daemon_mpi, sync_daemon_fs, thapi_sampling_daemon) via POSIX real-time signals (SIGRTMIN+0..3) and a busy-spin on a Signal.trap-toggled @@ready flag. That flag was shared across SyncDaemon and SamplingDaemon, which was a latent race. Switch to one SOCK_SEQPACKET UNIX socketpair per daemon. The parent sends short ASCII tokens (INIT, LOCAL_BARRIER, GLOBAL_BARRIER, FINISH) and blocks on recvmsg for the daemon's READY reply. No signal handlers, no busy waits, no shared state. The argv contract for the daemon binaries changes: they now take a single argument (parent_pid is gone). These are internal binaries shipped under BINDIR, but worth noting. sync_daemon_mpi.c is renamed to .cpp so MPI session setup and the message loop share the same C++ idioms (string_view dispatch); the build switches to MPICXX, and configure.ac wraps AX_MPI/AX_MPI_SESSION in AC_LANG_PUSH([C++]) so MPICXX is set. Co-Authored-By: Claude Opus 4.7 --- configure.ac | 2 + sampling/thapi_sampling_daemon.cpp | 84 ++++++++----- xprof/Makefile.am | 6 +- xprof/sync_daemon_fs | 67 ++++++----- ...{sync_daemon_mpi.c => sync_daemon_mpi.cpp} | 96 ++++++++------- xprof/xprof.rb.in | 113 ++++++++++-------- 6 files changed, 214 insertions(+), 154 deletions(-) rename xprof/{sync_daemon_mpi.c => sync_daemon_mpi.cpp} (68%) diff --git a/configure.ac b/configure.ac index 089c4beef..4267c45ac 100644 --- a/configure.ac +++ b/configure.ac @@ -68,10 +68,12 @@ AC_ARG_ENABLE([sync-daemon-mpi], [enable_sync_daemon_mpi="$enableval" user_set_sync_daemon_mpi="yes_$enableval"], [enable_sync_daemon_mpi="yes" user_set_sync_daemon_mpi="no"]) AC_SUBST([user_set_sync_daemon_mpi]) +AC_LANG_PUSH([C++]) AS_IF([test "x$enable_sync_daemon_mpi" != xno], [AX_MPI([enable_sync_daemon_mpi=yes], [enable_sync_daemon_mpi=no])]) AS_IF([test "x$enable_sync_daemon_mpi" != xno], [AX_MPI_SESSION([enable_sync_daemon_mpi=yes], [enable_sync_daemon_mpi=no])]) +AC_LANG_POP([C++]) AS_IF([test "x$user_set_sync_daemon_mpi" = "xyes_yes" && test "x$enable_sync_daemon_mpi" = "xno"], [AC_MSG_ERROR([sync-daemon with MPI requested but MPI Sessions are not supported!])]) AS_IF([test "x$user_set_sync_daemon_mpi" != "xyes_no" && test "x$enable_sync_daemon_mpi" = "xno"], diff --git a/sampling/thapi_sampling_daemon.cpp b/sampling/thapi_sampling_daemon.cpp index 77f2dec9a..2154ed9f3 100644 --- a/sampling/thapi_sampling_daemon.cpp +++ b/sampling/thapi_sampling_daemon.cpp @@ -1,22 +1,53 @@ -#include #include #include #include +#include +#include +#include #include -#define RT_SIGNAL_READY (SIGRTMIN) -#define RT_SIGNAL_FINISH (SIGRTMIN + 3) -typedef void (*plugin_initialize_func)(void); -typedef void (*plugin_finalize_func)(void); +using namespace std::string_view_literals; -int main(int argc, char **argv) { +constexpr auto MSG_INIT = "INIT"sv; +constexpr auto MSG_FINISH = "FINISH"sv; +constexpr auto MSG_READY = "READY"sv; + +using plugin_initialize_func = void (*)(); +using plugin_finalize_func = void (*)(); - // Setup signaling, to exit the sampling loop - int parent_pid = std::atoi(argv[1]); - sigset_t signal_set; - sigemptyset(&signal_set); - sigaddset(&signal_set, RT_SIGNAL_FINISH); - sigprocmask(SIG_BLOCK, &signal_set, NULL); +static int recv_expect(const int fd, const std::string_view want) { + char buf[64]; + const ssize_t n = read(fd, buf, sizeof(buf)); + if (n < 0) { + std::perror("thapi_sampling_daemon: read"); + return -1; + } + if (n == 0) { + std::cerr << "thapi_sampling_daemon: parent closed socket unexpectedly" << std::endl; + return -1; + } + const std::string_view got(buf, n); + if (got != want) { + std::cerr << "thapi_sampling_daemon: expected " << want << ", got " << got << std::endl; + return -1; + } + return 0; +} + +static int send_msg(const int fd, const std::string_view msg) { + if (write(fd, msg.data(), msg.size()) < 0) { + std::perror("thapi_sampling_daemon: write"); + return -1; + } + return 0; +} + +int main(int argc, char **argv) { + if (argc < 2) { + std::cerr << "usage: " << argv[0] << " [plugin.so ...]" << std::endl; + return 1; + } + const int fd = std::atoi(argv[1]); // DL Open struct Plugin { @@ -33,30 +64,23 @@ int main(int argc, char **argv) { std::cerr << "Failed to load " << argv[i] << ": " << dlerror() << std::endl; continue; } - plugin_initialize_func init_func = - reinterpret_cast(dlsym(handle, "thapi_initialize_sampling_plugin")); - - plugin_finalize_func fini_func = - reinterpret_cast(dlsym(handle, "thapi_finalize_sampling_plugin")); - + auto init_func = reinterpret_cast( + dlsym(handle, "thapi_initialize_sampling_plugin")); + auto fini_func = reinterpret_cast( + dlsym(handle, "thapi_finalize_sampling_plugin")); plugins.push_back({handle, init_func, fini_func}); } // User pluging - for (const auto &plugin : plugins) { + for (const auto &plugin : plugins) plugin.initialize(); - } - // Signal Ready to manager - kill(parent_pid, RT_SIGNAL_READY); + // Handshake: parent → INIT, daemon → READY + if (recv_expect(fd, MSG_INIT) < 0) return 1; + if (send_msg(fd, MSG_READY) < 0) return 1; - // Wait for to finish - while (true) { - int signum; - sigwait(&signal_set, &signum); - if (signum == RT_SIGNAL_FINISH) - break; - } + // Wait for shutdown: parent → FINISH + if (recv_expect(fd, MSG_FINISH) < 0) return 1; // Finalization for (const auto &plugin : plugins) { @@ -65,6 +89,8 @@ int main(int argc, char **argv) { dlclose(plugin.handle); } + if (send_msg(fd, MSG_READY) < 0) return 1; + close(fd); // Will call the destructor, who will finalize all the not unregistered plugin return 0; } diff --git a/xprof/Makefile.am b/xprof/Makefile.am index d50dcec89..bc9af57f9 100644 --- a/xprof/Makefile.am +++ b/xprof/Makefile.am @@ -14,8 +14,8 @@ if FOUND_MPI bin_SCRIPTS += sync_daemon_mpi endif -sync_daemon_mpi: $(srcdir)/sync_daemon_mpi.c - $(MPICC) $(CFLAGS) $< -o $@ +sync_daemon_mpi: $(srcdir)/sync_daemon_mpi.cpp + $(MPICXX) $(CXXFLAGS) -std=c++17 $(WERROR) -Wall -Wextra $< -o $@ iprof: xprof.rb cp xprof.rb $@ @@ -220,7 +220,7 @@ EXTRA_DIST = \ $(TRACE_COMMON)\ $(TRACE_OUT) \ perfetto_pruned.proto \ - sync_daemon_mpi.c \ + sync_daemon_mpi.cpp \ sync_daemon_fs CLEANFILES = \ diff --git a/xprof/sync_daemon_fs b/xprof/sync_daemon_fs index 51e080c43..9932e7b6d 100755 --- a/xprof/sync_daemon_fs +++ b/xprof/sync_daemon_fs @@ -1,13 +1,14 @@ #!/usr/bin/env ruby # Cannot use require_relative as iprof is not a rb file -# Load MPITopo and RT_SIGNAL_* +# Load MPITopo and related helpers load(File.join(__dir__, 'iprof')) require 'open3' require 'fileutils' require 'securerandom' require 'etc' +require 'socket' FOLDER_JOBID = File.join('.thapi_lock', MPITopo.job_id) SHARED_LOCAL_FILESYSTEM = File.join('/', 'dev', 'shm', Etc.getlogin, FOLDER_JOBID) @@ -50,37 +51,43 @@ def local_barrier(name) busy_wait_until { count_file(folder) == MPITopo.local_size } end -global_handle = nil -parent_pid = nil - -# Set trap -Signal.trap(SyncDaemon::RT_SIGNAL_GLOBAL_BARRIER) do - global_barrier(global_handle) - Process.kill(SyncDaemon::RT_SIGNAL_READY, parent_pid) -end +fd = ARGV[0].to_i +sock = UNIXSocket.for_fd(fd) +send_ready = -> { sock.sendmsg(SyncDaemon::MSG_READY) } +global_handle = init_global_barrier local_barrier_count = 0 -Signal.trap(SyncDaemon::RT_SIGNAL_LOCAL_BARRIER) do - local_barrier(local_barrier_count.to_s) - local_barrier_count += 1 - Process.kill(SyncDaemon::RT_SIGNAL_READY, parent_pid) -end -Signal.trap(SyncDaemon::RT_SIGNAL_FINISH) do - # We cannot delete SHARED_LOCAL_FILESYSTEM - # Some rank can exit the `global_barrier` (hence calling this function) - # when others ranks are still in the `local_barrier` - # If we delete SHARED_LOCAL_FILESYSTEM, it will deadlock - # - # One possibility to be abble to remove `SHARED_LOCAL_FILESYSTEM`, - # is to make all ranks busy_wait_until in the `global_barrier`. - # This will ensure that every-one exited the `local_barrier`. - # but given the poor performance of our FS, we will avoid that for now... - exit +loop do + msg, = sock.recvmsg(64) + raise 'sync_daemon_fs: parent closed socket without sending FINISH' if msg.empty? + + case msg + when SyncDaemon::MSG_INIT + # Initial handshake; no work to do. + when SyncDaemon::MSG_LOCAL_BARRIER + local_barrier(local_barrier_count.to_s) + local_barrier_count += 1 + when SyncDaemon::MSG_GLOBAL_BARRIER + global_barrier(global_handle) + when SyncDaemon::MSG_FINISH + # We cannot delete SHARED_LOCAL_FILESYSTEM + # Some rank can exit the `global_barrier` (hence calling this function) + # when others ranks are still in the `local_barrier` + # If we delete SHARED_LOCAL_FILESYSTEM, it will deadlock + # + # One possibility to be abble to remove `SHARED_LOCAL_FILESYSTEM`, + # is to make all ranks busy_wait_until in the `global_barrier`. + # This will ensure that every-one exited the `local_barrier`. + # but given the poor performance of our FS, we will avoid that for now... + send_ready.call + break + else + warn("sync_daemon_fs: unknown message '#{msg}'") + exit(1) + end + + send_ready.call end -# Init global barrier -global_handle = init_global_barrier -parent_pid = ARGV[0].to_i -Process.kill(SyncDaemon::RT_SIGNAL_READY, parent_pid) -sleep +sock.close diff --git a/xprof/sync_daemon_mpi.c b/xprof/sync_daemon_mpi.cpp similarity index 68% rename from xprof/sync_daemon_mpi.c rename to xprof/sync_daemon_mpi.cpp index fd69e1a20..ca3dbd3d4 100644 --- a/xprof/sync_daemon_mpi.c +++ b/xprof/sync_daemon_mpi.cpp @@ -1,15 +1,18 @@ #include "mpi.h" -#include -#include -#include -#include -#include - -// Define real-time signals -#define RT_SIGNAL_READY SIGRTMIN -#define RT_SIGNAL_GLOBAL_BARRIER SIGRTMIN + 1 -#define RT_SIGNAL_LOCAL_BARRIER SIGRTMIN + 2 -#define RT_SIGNAL_FINISH SIGRTMIN + 3 +#include +#include +#include +#include +#include +#include + +using namespace std::string_view_literals; + +constexpr auto MSG_INIT = "INIT"sv; +constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; +constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv; +constexpr auto MSG_FINISH = "FINISH"sv; +constexpr auto MSG_READY = "READY"sv; #define CHECK_MPI(x) \ do { \ @@ -79,57 +82,70 @@ int MPIX_Init_Session(MPI_Session *lib_shandle, MPI_Comm *lib_comm) { return ret; } -int signal_loop(int parent_pid, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_NODE) { - // Initialize signal set and add signals - sigset_t signal_set; - sigemptyset(&signal_set); - sigaddset(&signal_set, RT_SIGNAL_GLOBAL_BARRIER); - sigaddset(&signal_set, RT_SIGNAL_LOCAL_BARRIER); - sigaddset(&signal_set, RT_SIGNAL_FINISH); - sigprocmask(SIG_BLOCK, &signal_set, NULL); - kill(parent_pid, RT_SIGNAL_READY); - - // Processing loop: - // Should be only exited when receiving RT_SIGNAL_FINISH +static int send_msg(const int fd, const std::string_view msg) { + if (write(fd, msg.data(), msg.size()) < 0) { + perror("sync_daemon_mpi: write"); + return -1; + } + return 0; +} + +int message_loop(const int fd, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_NODE) { + char buf[64]; + + // Processing loop: should only be exited when receiving MSG_FINISH while (true) { - int signum; - sigwait(&signal_set, &signum); - if (signum == RT_SIGNAL_FINISH) { - // We don't signal ready when cleaning up - // The master will just `wait` for us to finish - return 0; - } else if (signum == RT_SIGNAL_LOCAL_BARRIER) { + const ssize_t n = read(fd, buf, sizeof(buf)); + if (n < 0) { + perror("sync_daemon_mpi: read"); + return 1; + } + if (n == 0) { + fprintf(stderr, "sync_daemon_mpi: parent closed socket unexpectedly\n"); + return 1; + } + const std::string_view msg(buf, n); + + if (msg == MSG_FINISH) { + return send_msg(fd, MSG_READY); + } else if (msg == MSG_LOCAL_BARRIER) { MPI_Barrier(MPI_COMM_NODE); - } else if (signum == RT_SIGNAL_GLOBAL_BARRIER) { + } else if (msg == MSG_GLOBAL_BARRIER) { MPI_Barrier(MPI_COMM_WORLD_THAPI); + } else if (msg == MSG_INIT) { + // Initial handshake; no work to do. } else { - fprintf(stderr, "Wrong signal received %d. Exiting", signum); + fprintf(stderr, "sync_daemon_mpi: unknown message '%.*s'\n", static_cast(msg.size()), + msg.data()); return 1; } - kill(parent_pid, RT_SIGNAL_READY); - } - // Unreachable - fprintf(stderr, "Wrong signal_loop exit"); - return 1; + if (send_msg(fd, MSG_READY) < 0) + return 1; + } } int main(int argc, char **argv) { // Initialization int ret = 0; - int parent_pid = 0; // World Session and Communicator MPI_Session lib_shandle = MPI_SESSION_NULL; MPI_Comm MPI_COMM_WORLD_THAPI = MPI_COMM_NULL; MPI_Comm MPI_COMM_NODE = MPI_COMM_NULL; + if (argc < 2) { + fprintf(stderr, "usage: %s \n", argv[0]); + return 1; + } + CHECK_MPI(MPIX_Init_Session(&lib_shandle, &MPI_COMM_WORLD_THAPI)); CHECK_MPI(MPI_Comm_split_type(MPI_COMM_WORLD_THAPI, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &MPI_COMM_NODE)); - parent_pid = atoi(argv[1]); - ret = signal_loop(parent_pid, MPI_COMM_WORLD_THAPI, MPI_COMM_NODE); + const int fd = atoi(argv[1]); + ret = message_loop(fd, MPI_COMM_WORLD_THAPI, MPI_COMM_NODE); + close(fd); fn_exit: if (MPI_COMM_NODE != MPI_COMM_NULL) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index 7983bd4f5..f8003fcdb 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -344,56 +344,29 @@ def thapi_trace_dir_root end # _ -# | \ _. _ ._ _ _ ._ -# |_/ (_| (/_ | | | (_) | | +# |_) _. ._ ._ o _ ._ +# |_) (_| | | | (/_ | # - -class SpawnDaemon - SIGRTMIN = 34 - RT_SIGNAL_READY = SIGRTMIN - RT_SIGNAL_FINISH = SIGRTMIN + 3 - - # This belongs to the global ruby process! - # This Object is NOT thread-safe, please do not spawn user thread - @@ready = false - Signal.trap(RT_SIGNAL_READY) { @@ready = true } - - # Any blocking implementation - # (`IO.pipe`, `throw/catch`+sleep` deadlock for an unknown reason) - # This is why we spin-lock - def _lazy_exec(_cond, &block) - block.call - busy_wait_until { @@ready } - @@ready = false - end - - def lazy_exec(message = nil, cond, &block) +class SyncDaemon + MSG_INIT = 'INIT' + MSG_LOCAL_BARRIER = 'LOCAL_BARRIER' + MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER' + MSG_FINISH = 'FINISH' + MSG_READY = 'READY' + + # block triggers the daemon; lazy_exec blocks on its READY reply. + def lazy_exec_mpi(message = nil, &block) LOGGER.with_info_block(message) do - unless cond + unless MPITopo.active? LOGGER.debug("#{message}: No-op") return end - # Don't inline, Trap doesn't work with logger - # https://bugs.ruby-lang.org/issues/7917 - _lazy_exec(cond, &block) + block.call + wait_ready end end -end - -# _ -# |_) _. ._ ._ o _ ._ -# |_) (_| | | | (/_ | -# -class SyncDaemon < SpawnDaemon - RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1 - RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2 - - def lazy_exec_mpi(message = nil, &block) - lazy_exec(message, MPITopo.active?, &block) - end def initialize - super daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') daemon = case daemon_type when nil @@ -414,9 +387,12 @@ class SyncDaemon < SpawnDaemon raise("Error: THAPI_SYNC_DAEMON value (#{daemon_type}) is not supported. Allowed: [mpi,fs] ") end - LOGGER.debug { "spawn(#{daemon} #{Process.pid})" } + LOGGER.debug { "spawn(#{daemon})" } lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do - @pid = spawn("#{daemon} #{Process.pid}") + @parent, child = UNIXSocket.pair(:SEQPACKET) + @pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno) + child.close + @parent.sendmsg(MSG_INIT) end end @@ -424,19 +400,21 @@ class SyncDaemon < SpawnDaemon LOGGER.debug { 'Finalize SyncDaemon' } return unless MPITopo.active? - Process.kill(RT_SIGNAL_FINISH, @pid) + @parent.sendmsg(MSG_FINISH) + wait_ready Process.wait(@pid) + @parent.close end def local_barrier(name) lazy_exec_mpi("Local_barrier #{name}") do - Process.kill(RT_SIGNAL_LOCAL_BARRIER, @pid) + @parent.sendmsg(MSG_LOCAL_BARRIER) end end def global_barrier lazy_exec_mpi('Global_barrier') do - Process.kill(RT_SIGNAL_GLOBAL_BARRIER, @pid) + @parent.sendmsg(MSG_GLOBAL_BARRIER) end end @@ -453,6 +431,13 @@ class SyncDaemon < SpawnDaemon syncd.finalize end end + + private + + def wait_ready + reply, = @parent.recvmsg(64) + raise "SyncDaemon: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY + end end # __ @@ -464,20 +449,35 @@ def sampling? OPTIONS[:sample] && MPITopo.local_master? end -class SamplingDaemon < SpawnDaemon +class SamplingDaemon + MSG_INIT = 'INIT' + MSG_FINISH = 'FINISH' + MSG_READY = 'READY' + + # block triggers the daemon; lazy_exec blocks on its READY reply. def lazy_exec_sampling(message = nil, &block) - lazy_exec(message, sampling?, &block) + LOGGER.with_info_block(message) do + unless sampling? + LOGGER.debug("#{message}: No-op") + return + end + block.call + wait_ready + end end def initialize(h = {}) - super() daemon_path = "#{BINDIR}/thapi_sampling_daemon" raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').gsub(':', ' ') - LOGGER.debug { "spawn(sampling_daemon) #{Process.pid} #{path_so}" } + LOGGER.debug { "spawn(sampling_daemon) #{path_so}" } lazy_exec_sampling('Initialize SamplingDaemon') do - @pid = spawn(h, "#{daemon_path} #{Process.pid} #{path_so}") + @parent, child = UNIXSocket.pair(:SEQPACKET) + @pid = Process.spawn(h, "#{daemon_path} #{child.fileno} #{path_so}", + child.fileno => child.fileno) + child.close + @parent.sendmsg(MSG_INIT) end end @@ -485,8 +485,17 @@ class SamplingDaemon < SpawnDaemon LOGGER.debug { 'Finalize SamplingDaemon' } return unless sampling? - Process.kill(RT_SIGNAL_FINISH, @pid) + @parent.sendmsg(MSG_FINISH) + wait_ready Process.wait(@pid) + @parent.close + end + + private + + def wait_ready + reply, = @parent.recvmsg(64) + raise "SamplingDaemon: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY end end From b1f173754813ccb88914dd0f46f2b973c2866e79 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 16:41:05 -0500 Subject: [PATCH 02/12] Fix clang-format-18 violations in daemon sources Remove manual = alignment on constexpr/using declarations, collapse the dlsym reinterpret_cast lines to fit clang-format's continuation indent, and break the if-then-return chains onto two lines (LLVM default disallows short ifs on one line). Co-Authored-By: Claude Opus 4.7 --- sampling/thapi_sampling_daemon.cpp | 26 +++++++++++++++----------- xprof/sync_daemon_mpi.cpp | 8 ++++---- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/sampling/thapi_sampling_daemon.cpp b/sampling/thapi_sampling_daemon.cpp index 2154ed9f3..c55fbfd3f 100644 --- a/sampling/thapi_sampling_daemon.cpp +++ b/sampling/thapi_sampling_daemon.cpp @@ -8,12 +8,12 @@ using namespace std::string_view_literals; -constexpr auto MSG_INIT = "INIT"sv; +constexpr auto MSG_INIT = "INIT"sv; constexpr auto MSG_FINISH = "FINISH"sv; -constexpr auto MSG_READY = "READY"sv; +constexpr auto MSG_READY = "READY"sv; using plugin_initialize_func = void (*)(); -using plugin_finalize_func = void (*)(); +using plugin_finalize_func = void (*)(); static int recv_expect(const int fd, const std::string_view want) { char buf[64]; @@ -64,10 +64,10 @@ int main(int argc, char **argv) { std::cerr << "Failed to load " << argv[i] << ": " << dlerror() << std::endl; continue; } - auto init_func = reinterpret_cast( - dlsym(handle, "thapi_initialize_sampling_plugin")); - auto fini_func = reinterpret_cast( - dlsym(handle, "thapi_finalize_sampling_plugin")); + auto init_func = + reinterpret_cast(dlsym(handle, "thapi_initialize_sampling_plugin")); + auto fini_func = + reinterpret_cast(dlsym(handle, "thapi_finalize_sampling_plugin")); plugins.push_back({handle, init_func, fini_func}); } @@ -76,11 +76,14 @@ int main(int argc, char **argv) { plugin.initialize(); // Handshake: parent → INIT, daemon → READY - if (recv_expect(fd, MSG_INIT) < 0) return 1; - if (send_msg(fd, MSG_READY) < 0) return 1; + if (recv_expect(fd, MSG_INIT) < 0) + return 1; + if (send_msg(fd, MSG_READY) < 0) + return 1; // Wait for shutdown: parent → FINISH - if (recv_expect(fd, MSG_FINISH) < 0) return 1; + if (recv_expect(fd, MSG_FINISH) < 0) + return 1; // Finalization for (const auto &plugin : plugins) { @@ -89,7 +92,8 @@ int main(int argc, char **argv) { dlclose(plugin.handle); } - if (send_msg(fd, MSG_READY) < 0) return 1; + if (send_msg(fd, MSG_READY) < 0) + return 1; close(fd); // Will call the destructor, who will finalize all the not unregistered plugin return 0; diff --git a/xprof/sync_daemon_mpi.cpp b/xprof/sync_daemon_mpi.cpp index ca3dbd3d4..38bc8d1da 100644 --- a/xprof/sync_daemon_mpi.cpp +++ b/xprof/sync_daemon_mpi.cpp @@ -8,11 +8,11 @@ using namespace std::string_view_literals; -constexpr auto MSG_INIT = "INIT"sv; -constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; +constexpr auto MSG_INIT = "INIT"sv; +constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv; -constexpr auto MSG_FINISH = "FINISH"sv; -constexpr auto MSG_READY = "READY"sv; +constexpr auto MSG_FINISH = "FINISH"sv; +constexpr auto MSG_READY = "READY"sv; #define CHECK_MPI(x) \ do { \ From b5504410cfe8138ea670d062c4733ce85e434b06 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 16:46:53 -0500 Subject: [PATCH 03/12] Move fd init before CHECK_MPI to avoid goto-crosses-init error C++ forbids a goto from jumping over a variable initialization. CHECK_MPI expands to goto fn_exit, so const int fd must be declared before the first CHECK_MPI call. Co-Authored-By: Claude Opus 4.7 --- xprof/sync_daemon_mpi.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xprof/sync_daemon_mpi.cpp b/xprof/sync_daemon_mpi.cpp index 38bc8d1da..96998e7af 100644 --- a/xprof/sync_daemon_mpi.cpp +++ b/xprof/sync_daemon_mpi.cpp @@ -138,12 +138,12 @@ int main(int argc, char **argv) { fprintf(stderr, "usage: %s \n", argv[0]); return 1; } + const int fd = atoi(argv[1]); CHECK_MPI(MPIX_Init_Session(&lib_shandle, &MPI_COMM_WORLD_THAPI)); CHECK_MPI(MPI_Comm_split_type(MPI_COMM_WORLD_THAPI, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, &MPI_COMM_NODE)); - const int fd = atoi(argv[1]); ret = message_loop(fd, MPI_COMM_WORLD_THAPI, MPI_COMM_NODE); close(fd); From dc2bbbde2aac5258ad1d26672a6691dc2987a12d Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 17:03:33 -0500 Subject: [PATCH 04/12] Fix daemon socketpair: clear O_NONBLOCK and port shell test driver Ruby creates UNIX sockets with O_NONBLOCK on Linux by default. The inherited fd carried that flag into the daemon, so its blocking read() returned EAGAIN ("Resource temporarily unavailable") and the daemon exited before processing any message. Clear the flag on the child end before spawn in both SyncDaemon and SamplingDaemon. Replace integration_tests/light_iprof_only_sync.sh with a Ruby driver that speaks the new socketpair protocol. The old shell driver spawned the daemon directly using the signal protocol (passing PARENT_PID as argv) and is incompatible with the new argv contract. Co-Authored-By: Claude Opus 4.7 --- integration_tests/light_iprof_only_sync.rb | 41 ++++++++++++++ integration_tests/light_iprof_only_sync.sh | 64 ---------------------- integration_tests/parallel_execution.bats | 4 +- xprof/xprof.rb.in | 6 ++ 4 files changed, 49 insertions(+), 66 deletions(-) create mode 100755 integration_tests/light_iprof_only_sync.rb delete mode 100755 integration_tests/light_iprof_only_sync.sh diff --git a/integration_tests/light_iprof_only_sync.rb b/integration_tests/light_iprof_only_sync.rb new file mode 100755 index 000000000..16fd682b8 --- /dev/null +++ b/integration_tests/light_iprof_only_sync.rb @@ -0,0 +1,41 @@ +#!/usr/bin/env ruby + +# Stand-in for the iprof Ruby driver: spawn the configured sync daemon, +# drive the socketpair protocol manually, run the user command in between. +# Used by integration tests to exercise daemon binaries without the rest +# of iprof. + +require 'socket' + +daemon_kind = ENV.fetch('THAPI_SYNC_DAEMON') +daemon = "sync_daemon_#{daemon_kind}" + +MSG_INIT = 'INIT' +MSG_LOCAL_BARRIER = 'LOCAL_BARRIER' +MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER' +MSG_FINISH = 'FINISH' +MSG_READY = 'READY' + +parent, child = UNIXSocket.pair(:SEQPACKET) +# Ruby sets O_NONBLOCK on sockets by default; the daemon uses blocking +# read, so clear it on the inherited fd. +child.nonblock = false +pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno) +child.close + +send_and_wait = lambda do |msg| + parent.sendmsg(msg) + reply, = parent.recvmsg(64) + raise "expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY +end + +send_and_wait.call(MSG_INIT) +send_and_wait.call(MSG_LOCAL_BARRIER) + +system(*ARGV) || exit($?.exitstatus || 1) + +send_and_wait.call(MSG_LOCAL_BARRIER) +send_and_wait.call(MSG_GLOBAL_BARRIER) +send_and_wait.call(MSG_FINISH) +parent.close +Process.wait(pid) diff --git a/integration_tests/light_iprof_only_sync.sh b/integration_tests/light_iprof_only_sync.sh deleted file mode 100755 index a0588eeb8..000000000 --- a/integration_tests/light_iprof_only_sync.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/bash -set -euo pipefail - -# For loging and Daemon to send signal to us -PARENT_PID=$$ - -# Get base real-time signal number -SIGRTMIN=$(kill -l SIGRTMIN) - -# Set signals as defined in MPI daemon code -RT_SIGNAL_READY=$((SIGRTMIN + 0)) -RT_SIGNAL_GLOBAL_BARRIER=$((SIGRTMIN + 1)) -RT_SIGNAL_LOCAL_BARRIER=$((SIGRTMIN + 2)) -RT_SIGNAL_FINISH=$((SIGRTMIN + 3)) - -# Signal handler for capturing signals -handle_signal() { - echo "$PARENT_PID $(date) | Received signal $1 from sync_daemon" - if [ "$1" == "RT_SIGNAL_READY" ]; then - SIGNAL_RECEIVED="true" - fi -} - -# Setup trap for ready signal sent from signal daemon -trap 'handle_signal RT_SIGNAL_READY' $RT_SIGNAL_READY - -# Function to wait for RT_SIGNAL_READY -wait_for_signal() { - while [[ "$SIGNAL_RECEIVED" == "false" ]]; do - sleep 0.1 # Small sleep to prevent busy looping - done -} - -# To avoid race condition, `SIGNAL_RECEIVED` need to be set -# before spawning or signaling the daemon -spawn_daemon_blocking() { - SIGNAL_RECEIVED="false" - sync_daemon_"${THAPI_SYNC_DAEMON}" $PARENT_PID & - DAEMON_PID=$! - wait_for_signal -} - -send_signal_blocking() { - SIGNAL_RECEIVED="false" - kill -"$1" $DAEMON_PID - wait_for_signal -} - -echo "$PARENT_PID $(date) | Spawn Daemon" -spawn_daemon_blocking -echo "$PARENT_PID $(date) | Send Local Barrier signal" -send_signal_blocking $RT_SIGNAL_LOCAL_BARRIER -# Run test program -"$@" - -# Final synchronization after mpi_hello_world execution -echo "$PARENT_PID $(date) | Send Local Barrier signal" -send_signal_blocking $RT_SIGNAL_LOCAL_BARRIER -echo "$PARENT_PID $(date) | Send Global Barrier signal" -send_signal_blocking $RT_SIGNAL_GLOBAL_BARRIER -echo "$PARENT_PID $(date) | Send Termination signal" -kill -"$RT_SIGNAL_FINISH" $DAEMON_PID -echo "$PARENT_PID $(date) | Wait for daemon to quit" -wait $DAEMON_PID diff --git a/integration_tests/parallel_execution.bats b/integration_tests/parallel_execution.bats index 74c79b95f..aea05874c 100644 --- a/integration_tests/parallel_execution.bats +++ b/integration_tests/parallel_execution.bats @@ -8,7 +8,7 @@ launch_mpi() { # THAPI_SYNC_DAEMON=fs Tests @test "sync_daemon_fs" { - THAPI_SYNC_DAEMON=fs launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.sh clinfo + THAPI_SYNC_DAEMON=fs launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.rb clinfo } @test "iprof_fs" { @@ -27,7 +27,7 @@ launch_mpi() { # bats test_tags=mpi_sync_daemon @test "sync_daemon_mpi" { - THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.sh clinfo + THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.rb clinfo } # bats test_tags=mpi_sync_daemon diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index f8003fcdb..b0f1ed836 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -390,6 +390,9 @@ class SyncDaemon LOGGER.debug { "spawn(#{daemon})" } lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do @parent, child = UNIXSocket.pair(:SEQPACKET) + # Ruby sets O_NONBLOCK on sockets by default; the C/Ruby daemon uses + # blocking read, so clear it on the inherited fd. + child.nonblock = false @pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno) child.close @parent.sendmsg(MSG_INIT) @@ -474,6 +477,9 @@ class SamplingDaemon LOGGER.debug { "spawn(sampling_daemon) #{path_so}" } lazy_exec_sampling('Initialize SamplingDaemon') do @parent, child = UNIXSocket.pair(:SEQPACKET) + # Ruby sets O_NONBLOCK on sockets by default; the C daemon uses + # blocking read, so clear it on the inherited fd. + child.nonblock = false @pid = Process.spawn(h, "#{daemon_path} #{child.fileno} #{path_so}", child.fileno => child.fileno) child.close From 32c466985c3059ee5f25ea391164751e7ba03ee2 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 17:15:28 -0500 Subject: [PATCH 05/12] Require 'io/nonblock' for child.nonblock= setter The setter is not loaded by default in Ruby 3+; CI hit NoMethodError. Reproduced locally and confirmed the require resolves it. Co-Authored-By: Claude Opus 4.7 --- integration_tests/light_iprof_only_sync.rb | 1 + xprof/xprof.rb.in | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/integration_tests/light_iprof_only_sync.rb b/integration_tests/light_iprof_only_sync.rb index 16fd682b8..f06c48c7a 100755 --- a/integration_tests/light_iprof_only_sync.rb +++ b/integration_tests/light_iprof_only_sync.rb @@ -6,6 +6,7 @@ # of iprof. require 'socket' +require 'io/nonblock' daemon_kind = ENV.fetch('THAPI_SYNC_DAEMON') daemon = "sync_daemon_#{daemon_kind}" diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index b0f1ed836..6261f4bdc 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -34,6 +34,7 @@ require 'optparse_thapi' require 'pty' require 'digest/md5' require 'socket' +require 'io/nonblock' require 'logger' require 'set' require 'securerandom' @@ -390,7 +391,7 @@ class SyncDaemon LOGGER.debug { "spawn(#{daemon})" } lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do @parent, child = UNIXSocket.pair(:SEQPACKET) - # Ruby sets O_NONBLOCK on sockets by default; the C/Ruby daemon uses + # Ruby sets O_NONBLOCK on sockets by default; the daemon uses # blocking read, so clear it on the inherited fd. child.nonblock = false @pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno) From d41d5f813b21f972ff2e2a670ca008f8c892ac27 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 17:31:32 -0500 Subject: [PATCH 06/12] Deduplicate daemon protocol code Extract the socketpair protocol shared by iprof and its daemons: - Ruby side: new DaemonClient module holds MSG_INIT/FINISH/READY, lazy_exec, finalize, wait_ready, and the spawn boilerplate (incl. the nonblock= dance). SyncDaemon and SamplingDaemon each include it and supply only their own gating predicate and per-instance state. - C++ side: new utils/include/daemon_proto.hpp holds the MSG_* string_view constants, send_msg, and recv_expect. Both sync_daemon_mpi and thapi_sampling_daemon use it. - Also: shrink the SyncDaemon daemon-path case statement, drop the send_ready lambda in sync_daemon_fs, and add the new header to utils/Makefile.am noinst_HEADERS. Net: ~80 fewer lines across the daemon code (header included). Co-Authored-By: Claude Opus 4.7 --- sampling/thapi_sampling_daemon.cpp | 45 ++------ utils/Makefile.am | 1 + utils/include/daemon_proto.hpp | 53 ++++++++++ xprof/Makefile.am | 2 +- xprof/sync_daemon_fs | 23 ++-- xprof/sync_daemon_mpi.cpp | 31 ++---- xprof/xprof.rb.in | 162 ++++++++++++----------------- 7 files changed, 146 insertions(+), 171 deletions(-) create mode 100644 utils/include/daemon_proto.hpp diff --git a/sampling/thapi_sampling_daemon.cpp b/sampling/thapi_sampling_daemon.cpp index c55fbfd3f..338c6fea6 100644 --- a/sampling/thapi_sampling_daemon.cpp +++ b/sampling/thapi_sampling_daemon.cpp @@ -1,46 +1,15 @@ +#include "daemon_proto.hpp" #include #include #include -#include -#include -#include #include -using namespace std::string_view_literals; - -constexpr auto MSG_INIT = "INIT"sv; -constexpr auto MSG_FINISH = "FINISH"sv; -constexpr auto MSG_READY = "READY"sv; +using namespace daemon_proto; using plugin_initialize_func = void (*)(); using plugin_finalize_func = void (*)(); -static int recv_expect(const int fd, const std::string_view want) { - char buf[64]; - const ssize_t n = read(fd, buf, sizeof(buf)); - if (n < 0) { - std::perror("thapi_sampling_daemon: read"); - return -1; - } - if (n == 0) { - std::cerr << "thapi_sampling_daemon: parent closed socket unexpectedly" << std::endl; - return -1; - } - const std::string_view got(buf, n); - if (got != want) { - std::cerr << "thapi_sampling_daemon: expected " << want << ", got " << got << std::endl; - return -1; - } - return 0; -} - -static int send_msg(const int fd, const std::string_view msg) { - if (write(fd, msg.data(), msg.size()) < 0) { - std::perror("thapi_sampling_daemon: write"); - return -1; - } - return 0; -} +constexpr auto WHO = "thapi_sampling_daemon"; int main(int argc, char **argv) { if (argc < 2) { @@ -76,13 +45,13 @@ int main(int argc, char **argv) { plugin.initialize(); // Handshake: parent → INIT, daemon → READY - if (recv_expect(fd, MSG_INIT) < 0) + if (recv_expect(WHO, fd, MSG_INIT) < 0) return 1; - if (send_msg(fd, MSG_READY) < 0) + if (send_msg(WHO, fd, MSG_READY) < 0) return 1; // Wait for shutdown: parent → FINISH - if (recv_expect(fd, MSG_FINISH) < 0) + if (recv_expect(WHO, fd, MSG_FINISH) < 0) return 1; // Finalization @@ -92,7 +61,7 @@ int main(int argc, char **argv) { dlclose(plugin.handle); } - if (send_msg(fd, MSG_READY) < 0) + if (send_msg(WHO, fd, MSG_READY) < 0) return 1; close(fd); // Will call the destructor, who will finalize all the not unregistered plugin diff --git a/utils/Makefile.am b/utils/Makefile.am index 5de4a68dd..c3b96593a 100644 --- a/utils/Makefile.am +++ b/utils/Makefile.am @@ -10,6 +10,7 @@ noinst_HEADERS = \ include/utlist.h \ include/json.hpp \ include/magic_enum.hpp \ + include/daemon_proto.hpp \ xprof_utils.hpp nodist_noinst_HEADERS = lttng/tracepoint_gen.h diff --git a/utils/include/daemon_proto.hpp b/utils/include/daemon_proto.hpp new file mode 100644 index 000000000..f635affdd --- /dev/null +++ b/utils/include/daemon_proto.hpp @@ -0,0 +1,53 @@ +// Wire protocol shared between iprof (parent) and the C/C++ daemons +// (sync_daemon_mpi, thapi_sampling_daemon). Messages are short ASCII +// tokens exchanged over a SOCK_SEQPACKET socketpair; each parent +// command is acknowledged by the daemon with MSG_READY. + +#pragma once + +#include +#include +#include +#include +#include + +namespace daemon_proto { + +using namespace std::string_view_literals; + +constexpr auto MSG_INIT = "INIT"sv; +constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; +constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv; +constexpr auto MSG_FINISH = "FINISH"sv; +constexpr auto MSG_READY = "READY"sv; + +inline int send_msg(const char *who, int fd, std::string_view msg) { + if (write(fd, msg.data(), msg.size()) < 0) { + std::perror(who); + return -1; + } + return 0; +} + +// Read one message from fd and verify it matches `want`. Returns 0 on +// match, -1 on syscall failure, EOF, or mismatch. +inline int recv_expect(const char *who, int fd, std::string_view want) { + char buf[64]; + const ssize_t n = read(fd, buf, sizeof(buf)); + if (n < 0) { + std::perror(who); + return -1; + } + if (n == 0) { + std::cerr << who << ": parent closed socket unexpectedly" << std::endl; + return -1; + } + const std::string_view got(buf, n); + if (got != want) { + std::cerr << who << ": expected " << want << ", got " << got << std::endl; + return -1; + } + return 0; +} + +} // namespace daemon_proto diff --git a/xprof/Makefile.am b/xprof/Makefile.am index bc9af57f9..f2177d402 100644 --- a/xprof/Makefile.am +++ b/xprof/Makefile.am @@ -15,7 +15,7 @@ if FOUND_MPI endif sync_daemon_mpi: $(srcdir)/sync_daemon_mpi.cpp - $(MPICXX) $(CXXFLAGS) -std=c++17 $(WERROR) -Wall -Wextra $< -o $@ + $(MPICXX) $(CXXFLAGS) -std=c++17 $(WERROR) -Wall -Wextra -I$(top_srcdir)/utils/include $< -o $@ iprof: xprof.rb cp xprof.rb $@ diff --git a/xprof/sync_daemon_fs b/xprof/sync_daemon_fs index 9932e7b6d..efadb59ad 100755 --- a/xprof/sync_daemon_fs +++ b/xprof/sync_daemon_fs @@ -51,10 +51,7 @@ def local_barrier(name) busy_wait_until { count_file(folder) == MPITopo.local_size } end -fd = ARGV[0].to_i -sock = UNIXSocket.for_fd(fd) -send_ready = -> { sock.sendmsg(SyncDaemon::MSG_READY) } - +sock = UNIXSocket.for_fd(ARGV[0].to_i) global_handle = init_global_barrier local_barrier_count = 0 @@ -71,23 +68,19 @@ loop do when SyncDaemon::MSG_GLOBAL_BARRIER global_barrier(global_handle) when SyncDaemon::MSG_FINISH - # We cannot delete SHARED_LOCAL_FILESYSTEM - # Some rank can exit the `global_barrier` (hence calling this function) - # when others ranks are still in the `local_barrier` - # If we delete SHARED_LOCAL_FILESYSTEM, it will deadlock - # - # One possibility to be abble to remove `SHARED_LOCAL_FILESYSTEM`, - # is to make all ranks busy_wait_until in the `global_barrier`. - # This will ensure that every-one exited the `local_barrier`. - # but given the poor performance of our FS, we will avoid that for now... - send_ready.call + # We cannot delete SHARED_LOCAL_FILESYSTEM here: some rank can exit + # global_barrier (hence reach FINISH) while others are still in + # local_barrier. Removing SHARED_LOCAL_FILESYSTEM would deadlock them. + # One fix: have all ranks busy_wait_until in global_barrier so we + # know local_barrier is done — but given FS performance, we skip it. + sock.sendmsg(SyncDaemon::MSG_READY) break else warn("sync_daemon_fs: unknown message '#{msg}'") exit(1) end - send_ready.call + sock.sendmsg(SyncDaemon::MSG_READY) end sock.close diff --git a/xprof/sync_daemon_mpi.cpp b/xprof/sync_daemon_mpi.cpp index 96998e7af..05a330608 100644 --- a/xprof/sync_daemon_mpi.cpp +++ b/xprof/sync_daemon_mpi.cpp @@ -1,18 +1,11 @@ +#include "daemon_proto.hpp" #include "mpi.h" -#include #include #include -#include -#include -#include -using namespace std::string_view_literals; +using namespace daemon_proto; -constexpr auto MSG_INIT = "INIT"sv; -constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; -constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv; -constexpr auto MSG_FINISH = "FINISH"sv; -constexpr auto MSG_READY = "READY"sv; +constexpr auto WHO = "sync_daemon_mpi"; #define CHECK_MPI(x) \ do { \ @@ -82,14 +75,6 @@ int MPIX_Init_Session(MPI_Session *lib_shandle, MPI_Comm *lib_comm) { return ret; } -static int send_msg(const int fd, const std::string_view msg) { - if (write(fd, msg.data(), msg.size()) < 0) { - perror("sync_daemon_mpi: write"); - return -1; - } - return 0; -} - int message_loop(const int fd, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_NODE) { char buf[64]; @@ -97,17 +82,17 @@ int message_loop(const int fd, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_ while (true) { const ssize_t n = read(fd, buf, sizeof(buf)); if (n < 0) { - perror("sync_daemon_mpi: read"); + perror(WHO); return 1; } if (n == 0) { - fprintf(stderr, "sync_daemon_mpi: parent closed socket unexpectedly\n"); + fprintf(stderr, "%s: parent closed socket unexpectedly\n", WHO); return 1; } const std::string_view msg(buf, n); if (msg == MSG_FINISH) { - return send_msg(fd, MSG_READY); + return send_msg(WHO, fd, MSG_READY); } else if (msg == MSG_LOCAL_BARRIER) { MPI_Barrier(MPI_COMM_NODE); } else if (msg == MSG_GLOBAL_BARRIER) { @@ -115,12 +100,12 @@ int message_loop(const int fd, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_ } else if (msg == MSG_INIT) { // Initial handshake; no work to do. } else { - fprintf(stderr, "sync_daemon_mpi: unknown message '%.*s'\n", static_cast(msg.size()), + fprintf(stderr, "%s: unknown message '%.*s'\n", WHO, static_cast(msg.size()), msg.data()); return 1; } - if (send_msg(fd, MSG_READY) < 0) + if (send_msg(WHO, fd, MSG_READY) < 0) return 1; } } diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index 6261f4bdc..491dd8b3f 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -344,65 +344,29 @@ def thapi_trace_dir_root end end -# _ -# |_) _. ._ ._ o _ ._ -# |_) (_| | | | (/_ | -# -class SyncDaemon - MSG_INIT = 'INIT' - MSG_LOCAL_BARRIER = 'LOCAL_BARRIER' - MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER' - MSG_FINISH = 'FINISH' - MSG_READY = 'READY' +# Shared socketpair protocol between iprof and its sync/sampling daemons. +# Subclasses define `active?` (gating predicate) and call spawn_daemon +# from their initialize. +module DaemonClient + MSG_INIT = 'INIT' + MSG_FINISH = 'FINISH' + MSG_READY = 'READY' # block triggers the daemon; lazy_exec blocks on its READY reply. - def lazy_exec_mpi(message = nil, &block) + def lazy_exec(message = nil) LOGGER.with_info_block(message) do - unless MPITopo.active? + unless active? LOGGER.debug("#{message}: No-op") return end - block.call + yield wait_ready end end - def initialize - daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') - daemon = case daemon_type - when nil - # Default is MPI - if File.exist?("#{BINDIR}/sync_daemon_mpi") - "#{BINDIR}/sync_daemon_mpi" - else - LOGGER.warn("No #{BINDIR}/sync_daemon_mpi binary. Fallback to #{BINDIR}/sync_daemon_f") - "#{BINDIR}/sync_daemon_fs" - end - when 'mpi' - raise("No #{BINDIR}/sync_daemon_mpi binary") unless File.exist?("#{BINDIR}/sync_daemon_mpi") - - "#{BINDIR}/sync_daemon_mpi" - when 'fs' - "#{BINDIR}/sync_daemon_fs" - else - raise("Error: THAPI_SYNC_DAEMON value (#{daemon_type}) is not supported. Allowed: [mpi,fs] ") - end - - LOGGER.debug { "spawn(#{daemon})" } - lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do - @parent, child = UNIXSocket.pair(:SEQPACKET) - # Ruby sets O_NONBLOCK on sockets by default; the daemon uses - # blocking read, so clear it on the inherited fd. - child.nonblock = false - @pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno) - child.close - @parent.sendmsg(MSG_INIT) - end - end - def finalize - LOGGER.debug { 'Finalize SyncDaemon' } - return unless MPITopo.active? + LOGGER.debug { "Finalize #{self.class}" } + return unless active? @parent.sendmsg(MSG_FINISH) wait_ready @@ -410,16 +374,59 @@ class SyncDaemon @parent.close end - def local_barrier(name) - lazy_exec_mpi("Local_barrier #{name}") do - @parent.sendmsg(MSG_LOCAL_BARRIER) + private + + def spawn_daemon + @parent, child = UNIXSocket.pair(:SEQPACKET) + # Ruby sets O_NONBLOCK on sockets by default; the daemon uses + # blocking read, so clear it on the inherited fd. + child.nonblock = false + @pid = yield child.fileno + child.close + @parent.sendmsg(MSG_INIT) + end + + def wait_ready + reply, = @parent.recvmsg(64) + raise "#{self.class}: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY + end +end + +# _ +# |_) _. ._ ._ o _ ._ +# |_) (_| | | | (/_ | +# +class SyncDaemon + include DaemonClient + + MSG_LOCAL_BARRIER = 'LOCAL_BARRIER' + MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER' + + def initialize + daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') + raise("Error: THAPI_SYNC_DAEMON=#{daemon_type} is not supported. Allowed: [mpi,fs]") \ + unless [nil, 'mpi', 'fs'].include?(daemon_type) + + mpi_path = "#{BINDIR}/sync_daemon_mpi" + if daemon_type == 'mpi' && !File.exist?(mpi_path) + raise("No #{mpi_path} binary") + elsif daemon_type.nil? && !File.exist?(mpi_path) + LOGGER.warn("No #{mpi_path} binary. Fallback to #{BINDIR}/sync_daemon_fs") + end + daemon = (daemon_type != 'fs' && File.exist?(mpi_path)) ? mpi_path : "#{BINDIR}/sync_daemon_fs" + + LOGGER.debug { "spawn(#{daemon})" } + lazy_exec("Initialize SyncDaemon #{daemon_type}") do + spawn_daemon { |fd| Process.spawn(daemon, fd.to_s, fd => fd) } end end + def local_barrier(name) + lazy_exec("Local_barrier #{name}") { @parent.sendmsg(MSG_LOCAL_BARRIER) } + end + def global_barrier - lazy_exec_mpi('Global_barrier') do - @parent.sendmsg(MSG_GLOBAL_BARRIER) - end + lazy_exec('Global_barrier') { @parent.sendmsg(MSG_GLOBAL_BARRIER) } end # The context manager ensure that when the block yield is exited @@ -438,9 +445,8 @@ class SyncDaemon private - def wait_ready - reply, = @parent.recvmsg(64) - raise "SyncDaemon: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY + def active? + MPITopo.active? end end @@ -454,21 +460,7 @@ def sampling? end class SamplingDaemon - MSG_INIT = 'INIT' - MSG_FINISH = 'FINISH' - MSG_READY = 'READY' - - # block triggers the daemon; lazy_exec blocks on its READY reply. - def lazy_exec_sampling(message = nil, &block) - LOGGER.with_info_block(message) do - unless sampling? - LOGGER.debug("#{message}: No-op") - return - end - block.call - wait_ready - end - end + include DaemonClient def initialize(h = {}) daemon_path = "#{BINDIR}/thapi_sampling_daemon" @@ -476,33 +468,15 @@ class SamplingDaemon path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').gsub(':', ' ') LOGGER.debug { "spawn(sampling_daemon) #{path_so}" } - lazy_exec_sampling('Initialize SamplingDaemon') do - @parent, child = UNIXSocket.pair(:SEQPACKET) - # Ruby sets O_NONBLOCK on sockets by default; the C daemon uses - # blocking read, so clear it on the inherited fd. - child.nonblock = false - @pid = Process.spawn(h, "#{daemon_path} #{child.fileno} #{path_so}", - child.fileno => child.fileno) - child.close - @parent.sendmsg(MSG_INIT) + lazy_exec('Initialize SamplingDaemon') do + spawn_daemon { |fd| Process.spawn(h, "#{daemon_path} #{fd} #{path_so}", fd => fd) } end end - def finalize - LOGGER.debug { 'Finalize SamplingDaemon' } - return unless sampling? - - @parent.sendmsg(MSG_FINISH) - wait_ready - Process.wait(@pid) - @parent.close - end - private - def wait_ready - reply, = @parent.recvmsg(64) - raise "SamplingDaemon: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY + def active? + sampling? end end From a0dfbcf925b9ecc84c67787084eecc9738f379f1 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Tue, 19 May 2026 17:43:30 -0500 Subject: [PATCH 07/12] Drop redundant daemon-spawn debug logs These LOGGER.debug { "spawn(...)" } lines predated the socketpair refactor when they included Process.pid (needed for kill-based IPC). Now that spawn is gated by lazy_exec, the wrapping LOGGER.with_info_block("Initialize ...Daemon ...") already logs entry and exit; the inner debug is dead. Co-Authored-By: Claude Opus 4.7 --- xprof/xprof.rb.in | 2 -- 1 file changed, 2 deletions(-) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index 491dd8b3f..87c0b908a 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -415,7 +415,6 @@ class SyncDaemon end daemon = (daemon_type != 'fs' && File.exist?(mpi_path)) ? mpi_path : "#{BINDIR}/sync_daemon_fs" - LOGGER.debug { "spawn(#{daemon})" } lazy_exec("Initialize SyncDaemon #{daemon_type}") do spawn_daemon { |fd| Process.spawn(daemon, fd.to_s, fd => fd) } end @@ -467,7 +466,6 @@ class SamplingDaemon raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').gsub(':', ' ') - LOGGER.debug { "spawn(sampling_daemon) #{path_so}" } lazy_exec('Initialize SamplingDaemon') do spawn_daemon { |fd| Process.spawn(h, "#{daemon_path} #{fd} #{path_so}", fd => fd) } end From ff65ac5bae8687f9b7457277899333faf58ce2c2 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Wed, 20 May 2026 17:12:47 +0000 Subject: [PATCH 08/12] Refactor daemon classes: self-gating, unified spawn(), no-op when inactive - Lift daemon spawn boilerplate into DaemonClient#spawn (mirrors Process.spawn signature + log:); fd insertion and `fd => fd` cloexec clearing now hidden from callers. - Both SyncDaemon and SamplingDaemon early-return on `unless active?`, so bogus THAPI_SYNC_DAEMON values and missing binaries no longer raise on runs that don't actually need a daemon. - Replace top-level `sampling?` with `SamplingDaemon.active?` (class method); add SyncDaemon.active? for symmetry; hoist the instance `active?` delegate into DaemonClient. - SyncDaemon#initialize daemon-path selection rewritten as case/when, with a better error pointing at --disable-sync-daemon-mpi. - Fix SAMPLING_DAEMON_PATH copy-paste bug (was pointing at sync_daemon_mpi); switch sampling spawn to argv-form to drop the shell middleman. - LocalMaster.setup guards SamplingDaemon against bt_analysis raising before teardown can run. - Add integration test: bogus THAPI_SYNC_DAEMON is ignored when not under MPI. Co-Authored-By: Claude Opus 4.7 (1M context) --- integration_tests/parallel_execution.bats | 5 + xprof/xprof.rb.in | 139 +++++++++++++--------- 2 files changed, 86 insertions(+), 58 deletions(-) diff --git a/integration_tests/parallel_execution.bats b/integration_tests/parallel_execution.bats index aea05874c..35d413f6d 100644 --- a/integration_tests/parallel_execution.bats +++ b/integration_tests/parallel_execution.bats @@ -45,6 +45,11 @@ launch_mpi() { THAPI_SYNC_DAEMON_MPI_NO_FINALIZE=1 THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 iprof ./mpi_helloworld } +# Non-MPI runs should ignore THAPI_SYNC_DAEMON entirely (no validation, no spawn). +@test "sync_daemon_ignored_without_mpi" { + THAPI_SYNC_DAEMON=whatever-not-in-list iprof -- true +} + # Test Traced Rank @test "iprof_mpi+traced_ranks" { diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index 87c0b908a..3226787de 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -344,9 +344,12 @@ def thapi_trace_dir_root end end -# Shared socketpair protocol between iprof and its sync/sampling daemons. -# Subclasses define `active?` (gating predicate) and call spawn_daemon -# from their initialize. +# _ +# | \ _. _ ._ _ _ ._ +# |_/ (_| (/_ | | | (_) | | +# +# Shared socketpair protocol between iprof and its (sync/sampling) daemons. +# Subclasses define `active?` (gating predicate) and call `spawn` from their initialize. module DaemonClient MSG_INIT = 'INIT' MSG_FINISH = 'FINISH' @@ -374,18 +377,32 @@ module DaemonClient @parent.close end - private + # Instance delegate so DaemonClient#lazy_exec / #finalize can call `active?` + # on the instance. Subclasses define the class-level `self.active?`. + def active? + self.class.active? + end - def spawn_daemon - @parent, child = UNIXSocket.pair(:SEQPACKET) - # Ruby sets O_NONBLOCK on sockets by default; the daemon uses - # blocking read, so clear it on the inherited fd. - child.nonblock = false - @pid = yield child.fileno - child.close - @parent.sendmsg(MSG_INIT) + # Mirrors `Process.spawn(*cmd)` but adds: + # - `log:` for the lazy_exec message + # - implicit socketpair: the daemon's end is inserted as the first program + # argument (after optional env hash) and kept open across exec via `fd => fd`. + def spawn(*cmd, log:) + lazy_exec(log) do + @parent, child = UNIXSocket.pair(:SEQPACKET) + # Ruby sets O_NONBLOCK on sockets by default; the daemon uses + # blocking read, so clear it on the inherited fd. + child.nonblock = false + fd = child.fileno + insert_at = cmd.first.is_a?(Hash) ? 2 : 1 + @pid = Process.spawn(*cmd.dup.insert(insert_at, fd.to_s), fd => fd) + child.close + @parent.sendmsg(MSG_INIT) + end end + private + def wait_ready reply, = @parent.recvmsg(64) raise "#{self.class}: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY @@ -402,22 +419,37 @@ class SyncDaemon MSG_LOCAL_BARRIER = 'LOCAL_BARRIER' MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER' + SYNC_DAEMON_MPI_PATH = "#{BINDIR}/sync_daemon_mpi" + SYNC_DAEMON_FS_PATH = "#{BINDIR}/sync_daemon_fs" + + def self.active? + MPITopo.active? + end + def initialize - daemon_type = env_fetch_first('THAPI_SYNC_DAEMON') - raise("Error: THAPI_SYNC_DAEMON=#{daemon_type} is not supported. Allowed: [mpi,fs]") \ - unless [nil, 'mpi', 'fs'].include?(daemon_type) - - mpi_path = "#{BINDIR}/sync_daemon_mpi" - if daemon_type == 'mpi' && !File.exist?(mpi_path) - raise("No #{mpi_path} binary") - elsif daemon_type.nil? && !File.exist?(mpi_path) - LOGGER.warn("No #{mpi_path} binary. Fallback to #{BINDIR}/sync_daemon_fs") - end - daemon = (daemon_type != 'fs' && File.exist?(mpi_path)) ? mpi_path : "#{BINDIR}/sync_daemon_fs" + return unless active? - lazy_exec("Initialize SyncDaemon #{daemon_type}") do - spawn_daemon { |fd| Process.spawn(daemon, fd.to_s, fd => fd) } - end + daemon_path = case (daemon_type = env_fetch_first('THAPI_SYNC_DAEMON')) + when 'mpi' + unless File.exist?(SYNC_DAEMON_MPI_PATH) + raise "No #{SYNC_DAEMON_MPI_PATH} binary. Did you compile with `--disable-sync-daemon-mpi`?" + end + + SYNC_DAEMON_MPI_PATH + when 'fs' + SYNC_DAEMON_FS_PATH + when nil + if File.exist?(SYNC_DAEMON_MPI_PATH) + SYNC_DAEMON_MPI_PATH + else + LOGGER.warn("No #{SYNC_DAEMON_MPI_PATH} binary. Fallback to #{SYNC_DAEMON_FS_PATH}") + SYNC_DAEMON_FS_PATH + end + else + raise "Error: THAPI_SYNC_DAEMON=#{daemon_type} is not supported. Allowed: [mpi,fs]" + end + + spawn(daemon_path, log: "Initialize SyncDaemon #{daemon_type}") end def local_barrier(name) @@ -442,11 +474,6 @@ class SyncDaemon end end - private - - def active? - MPITopo.active? - end end # __ @@ -454,27 +481,21 @@ end # __) (_| | | | |_) | | | | (_| # | _| -def sampling? - OPTIONS[:sample] && MPITopo.local_master? -end - class SamplingDaemon include DaemonClient - def initialize(h = {}) - daemon_path = "#{BINDIR}/thapi_sampling_daemon" - raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path) + SAMPLING_DAEMON_PATH = "#{BINDIR}/thapi_sampling_daemon" - path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').gsub(':', ' ') - lazy_exec('Initialize SamplingDaemon') do - spawn_daemon { |fd| Process.spawn(h, "#{daemon_path} #{fd} #{path_so}", fd => fd) } - end + def self.active? + OPTIONS[:sample] && MPITopo.local_master? end - private + def initialize(h = {}) + return unless active? + raise "No sampling_daemon binary found at #{SAMPLING_DAEMON_PATH}" unless File.exist?(SAMPLING_DAEMON_PATH) - def active? - sampling? + path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').split(':') + spawn(h, SAMPLING_DAEMON_PATH, *path_so, log: 'Initialize SamplingDaemon') end end @@ -644,18 +665,20 @@ module LocalMaster raise unless MPITopo.local_master? lttng_setup(backends) - - sampling_daemon = SamplingDaemon.new(env) if sampling? - pids = bt_analysis(backends) if OPTIONS[:archive] - - [sampling_daemon, pids] + sampling_daemon = SamplingDaemon.new(env) + begin + archive_pids = bt_analysis(backends) if OPTIONS[:archive] + rescue StandardError + sampling_daemon.finalize + raise + end + [sampling_daemon, archive_pids] end - def teardown_daemon(sampling_daemon, archive_pids) + def teardown(sampling_daemon, archive_pids) raise unless MPITopo.local_master? - sampling_daemon.finalize if sampling? - + sampling_daemon.finalize lttng_teardown_session bt_wait_for_archive_processes(archive_pids) if OPTIONS[:archive] @@ -726,7 +749,7 @@ module LocalMaster profiling: OPTIONS[:profile]) end - if sampling? + if SamplingDaemon.active? channel_name = 'non-blocking-channel' exec("lttng enable-channel --userspace --session=#{lttng_session_uuid} #{channel_name}") exec("lttng add-context --userspace --session=#{lttng_session_uuid} --channel=#{channel_name} -t vpid -t vtid") @@ -900,7 +923,7 @@ def all_env_tracers(usr_binary) # Customization if OPTIONS[:'backend-names'].include?('ze') h['LTTNG_UST_ZE_PARANOID_DRIFT'] = 1 if OPTIONS[:profile] - if sampling? + if SamplingDaemon.active? # The current only reliable way to use zes api # is to call zesInit and set ZES_ENABLE_SYSMAN to 0 h['ZES_ENABLE_SYSMAN'] = 0 @@ -916,7 +939,7 @@ def all_env_tracers(usr_binary) if OPTIONS[:'backend-names'].include?('cxi') backends << 'cxi' - if sampling? + if SamplingDaemon.active? h['LTTNG_UST_CXI_SAMPLING_CXI'] = 1 h['THAPI_SAMPLING_LIBRARIES'] << File.join(PKGLIBDIR, 'cxi', 'libCXISampling.so') end @@ -928,7 +951,7 @@ def all_env_tracers(usr_binary) end # Sample - if sampling? + if SamplingDaemon.active? LOGGER.debug('Sampling Enabled') h['LTTNG_UST_SAMPLING'] = 1 h[%w[LD_LIBRARY_PATH prepend]] << File.join(PKGLIBDIR, 'sampling') @@ -961,7 +984,7 @@ def all_trace_and_processing(usr_argv) launch_usr_bin(env, usr_argv) ensure syncd.local_barrier('waiting_for_application_ending') - LocalMaster.teardown_daemon(sampling_daemon, archive_pids) if MPITopo.local_master? + LocalMaster.teardown(sampling_daemon, archive_pids) if MPITopo.local_master? end LocalMaster.bt_analysis(backends) if MPITopo.local_master? && !OPTIONS[:archive] From efd411ffe12955f006d3a5222ad52fea66d8a5c6 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Wed, 20 May 2026 17:57:20 +0000 Subject: [PATCH 09/12] cleaning fs --- xprof/sync_daemon_fs | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/xprof/sync_daemon_fs b/xprof/sync_daemon_fs index efadb59ad..3547e25c7 100755 --- a/xprof/sync_daemon_fs +++ b/xprof/sync_daemon_fs @@ -51,30 +51,27 @@ def local_barrier(name) busy_wait_until { count_file(folder) == MPITopo.local_size } end +msg = nil sock = UNIXSocket.for_fd(ARGV[0].to_i) global_handle = init_global_barrier local_barrier_count = 0 -loop do +# We cannot delete SHARED_LOCAL_FILESYSTEM on FINISH: some rank can exit +# global_barrier (hence reach FINISH) while others are still in +# local_barrier. Removing SHARED_LOCAL_FILESYSTEM would deadlock them. +# One fix: have all ranks busy_wait_until in global_barrier so we +# know local_barrier is done — but given FS performance, we skip it. +until msg == SyncDaemon::MSG_FINISH msg, = sock.recvmsg(64) raise 'sync_daemon_fs: parent closed socket without sending FINISH' if msg.empty? case msg - when SyncDaemon::MSG_INIT - # Initial handshake; no work to do. + when SyncDaemon::MSG_INIT then nil when SyncDaemon::MSG_LOCAL_BARRIER local_barrier(local_barrier_count.to_s) local_barrier_count += 1 - when SyncDaemon::MSG_GLOBAL_BARRIER - global_barrier(global_handle) - when SyncDaemon::MSG_FINISH - # We cannot delete SHARED_LOCAL_FILESYSTEM here: some rank can exit - # global_barrier (hence reach FINISH) while others are still in - # local_barrier. Removing SHARED_LOCAL_FILESYSTEM would deadlock them. - # One fix: have all ranks busy_wait_until in global_barrier so we - # know local_barrier is done — but given FS performance, we skip it. - sock.sendmsg(SyncDaemon::MSG_READY) - break + when SyncDaemon::MSG_GLOBAL_BARRIER then global_barrier(global_handle) + when SyncDaemon::MSG_FINISH then nil else warn("sync_daemon_fs: unknown message '#{msg}'") exit(1) From 8af5e4c32ad72fee95674d402e3b354b67075a26 Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Wed, 20 May 2026 18:54:13 +0000 Subject: [PATCH 10/12] Surface daemon non-zero exit in DaemonClient#finalize MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Was discarding $? from Process.wait. If the daemon crashes during teardown (e.g., MPI_Finalize segfault — see THAPI_SYNC_DAEMON_MPI_NO_FINALIZE workaround in parallel_execution.bats), the failure was silent. Switch to wait2 and warn on non-success. Co-Authored-By: Claude Opus 4.7 --- xprof/xprof.rb.in | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in index 3226787de..9b4804dd6 100755 --- a/xprof/xprof.rb.in +++ b/xprof/xprof.rb.in @@ -373,8 +373,9 @@ module DaemonClient @parent.sendmsg(MSG_FINISH) wait_ready - Process.wait(@pid) + _, status = Process.wait2(@pid) @parent.close + LOGGER.warn("#{self.class} exited #{status}") unless status.success? end # Instance delegate so DaemonClient#lazy_exec / #finalize can call `active?` @@ -473,7 +474,6 @@ class SyncDaemon syncd.finalize end end - end # __ From 4416be36cb6ecc743d3c0bd87b5fddcb422de2dc Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Wed, 20 May 2026 18:54:25 +0000 Subject: [PATCH 11/12] Factor warning flags into AM_C{,XX}FLAGS Pull '-Wall -Wextra $(WERROR)' (plus '-std=c++17' for C++) out of each per-target line into AM_CFLAGS / AM_CXXFLAGS. Per-target rules that already set _CFLAGS / _CXXFLAGS now reference $(AM_*FLAGS) explicitly, since automake target-specific flags replace rather than extend AM_*. Side fix: sampling/thapi_sampling_daemon was missing -std=c++17 entirely. It now picks it up via AM_CXXFLAGS. Co-Authored-By: Claude Opus 4.7 --- sampling/Makefile.am | 11 +++++++---- xprof/Makefile.am | 25 ++++++++++++++----------- 2 files changed, 21 insertions(+), 15 deletions(-) diff --git a/sampling/Makefile.am b/sampling/Makefile.am index 8a8ee1202..fc378cfe5 100644 --- a/sampling/Makefile.am +++ b/sampling/Makefile.am @@ -4,6 +4,9 @@ else WERROR = endif +AM_CFLAGS = -Wall -Wextra $(WERROR) +AM_CXXFLAGS = -std=c++17 -Wall -Wextra $(WERROR) + TRACEPOINT_GEN = \ $(srcdir)/sampling_events.yaml @@ -29,7 +32,7 @@ nodist_libsamplingtracepoints_la_SOURCES = \ $(SAMPLING_STATIC_PROBES_SRC) libsamplingtracepoints_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -libsamplingtracepoints_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter -Wno-type-limits -Wno-sign-compare $(WERROR) $(LTTNG_UST_CFLAGS) +libsamplingtracepoints_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter -Wno-type-limits -Wno-sign-compare $(LTTNG_UST_CFLAGS) libsamplingtracepoints_la_LDFLAGS = $(LTTNG_UST_LIBS) @@ -50,7 +53,7 @@ libThapiSampling_la_SOURCES = \ thapi_sampling.c libThapiSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include -libThapiSampling_la_CFLAGS = -Wall -Wextra $(WERROR) +libThapiSampling_la_CFLAGS = $(AM_CFLAGS) libThapiSampling_la_LDFLAGS = -lpthread -version-info 1:0:0 $(LTTNG_UST_LIBS) bin_PROGRAMS = thapi_sampling_daemon @@ -60,7 +63,7 @@ thapi_sampling_daemon_SOURCES = \ thapi_sampling_daemon.cpp thapi_sampling_daemon_CPPFLAGS = -I$(top_srcdir)/utils/include -thapi_sampling_daemon_CXXFLAGS = -Wall -Wextra $(WERROR) +thapi_sampling_daemon_CXXFLAGS = $(AM_CXXFLAGS) libHeartbeatSampling_la_SOURCES = heartbeat_sampling_plugin.c @@ -68,7 +71,7 @@ nodist_libHeartbeatSampling_la_SOURCES = \ $(SAMPLING_STATIC_PROBES_INCL) libHeartbeatSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include -libHeartbeatSampling_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) $(LTTNG_UST_CFLAGS) +libHeartbeatSampling_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter $(LTTNG_UST_CFLAGS) libHeartbeatSampling_la_LDFLAGS = -avoid-version -module libHeartbeatSampling_la_LIBADD = libThapiSampling.la libsamplingtracepoints.la -ldl $(LTTNG_UST_LIBS) diff --git a/xprof/Makefile.am b/xprof/Makefile.am index f2177d402..affe3316c 100644 --- a/xprof/Makefile.am +++ b/xprof/Makefile.am @@ -6,6 +6,9 @@ else WERROR = endif +AM_CFLAGS = -Wall -Wextra $(WERROR) +AM_CXXFLAGS = -std=c++17 -Wall -Wextra $(WERROR) + bin_SCRIPTS = \ iprof \ sync_daemon_fs @@ -15,7 +18,7 @@ if FOUND_MPI endif sync_daemon_mpi: $(srcdir)/sync_daemon_mpi.cpp - $(MPICXX) $(CXXFLAGS) -std=c++17 $(WERROR) -Wall -Wextra -I$(top_srcdir)/utils/include $< -o $@ + $(MPICXX) $(CXXFLAGS) $(AM_CXXFLAGS) -I$(top_srcdir)/utils/include $< -o $@ iprof: xprof.rb cp xprof.rb $@ @@ -130,23 +133,23 @@ bt2_LTLIBRARIES = libXTally.la libXTimeline.la libXAggreg.la libXStripper.la # Compiler flags libXTally_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_sink_tally -libXTally_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXTally_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) +libXTally_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXTally_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) libXTally_la_LDFLAGS = $(BABELTRACE2_LIBS) -avoid-version -module libXTimeline_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_sink_timeline -libXTimeline_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXTimeline_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) $(PROTOBUF_CFLAGS) +libXTimeline_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXTimeline_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) $(PROTOBUF_CFLAGS) libXTimeline_la_LDFLAGS = $(BABELTRACE2_LIBS) $(PROTOBUF_LIBS) -avoid-version -module libXAggreg_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_filter_aggreg -libXAggreg_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXAggreg_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) +libXAggreg_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXAggreg_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) libXAggreg_la_LDFLAGS = $(BABELTRACE2_LIBS) -avoid-version -module libXStripper_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_filter_stripper -libXStripper_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXStripper_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) +libXStripper_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXStripper_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) libXStripper_la_LDFLAGS = $(BABELTRACE2_LIBS) -avoid-version -module # Cannot use check_LTLIBRARIES because we need the shared version of those @@ -172,7 +175,7 @@ $(BTX_INTERVAL_GENERATED_SOURCE_TEST) &: $(srcdir)/btx_interval_model.yaml noinst_LTLIBRARIES = libtestintervalsource.la nodist_libtestintervalsource_la_SOURCES = $(BTX_INTERVAL_GENERATED_SOURCE_TEST) libtestintervalsource_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./btx_source_interval_test/ -libtestintervalsource_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter $(WERROR) $(BABELTRACE2_CFLAGS) +libtestintervalsource_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter $(BABELTRACE2_CFLAGS) BTX_AGGREG_GENERATED_SOURCE_TEST = \ btx_source_aggreg_test/metababel/metababel.h \ @@ -188,7 +191,7 @@ $(BTX_AGGREG_GENERATED_SOURCE_TEST) &: $(srcdir)/btx_aggreg_model.yaml noinst_LTLIBRARIES += libtestaggregsource.la nodist_libtestaggregsource_la_SOURCES = $(BTX_AGGREG_GENERATED_SOURCE_TEST) libtestaggregsource_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./btx_source_aggreg_test/ -libtestaggregsource_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter $(WERROR) $(BABELTRACE2_CFLAGS) +libtestaggregsource_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter $(BABELTRACE2_CFLAGS) TRACE_COMMON = \ From 7e4ae123e013bd0fd50b2a400629bd4b88f1bd6a Mon Sep 17 00:00:00 2001 From: Thomas Applencourt Date: Wed, 20 May 2026 18:54:34 +0000 Subject: [PATCH 12/12] Apply clang-format-18 and minor whitespace tidy clang-format-18 on daemon_proto.hpp (CI runs the same version) and strip an over-aligned 'then nil' in sync_daemon_fs's case arm. Co-Authored-By: Claude Opus 4.7 --- utils/include/daemon_proto.hpp | 8 ++++---- xprof/sync_daemon_fs | 2 +- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/utils/include/daemon_proto.hpp b/utils/include/daemon_proto.hpp index f635affdd..6ce08d55d 100644 --- a/utils/include/daemon_proto.hpp +++ b/utils/include/daemon_proto.hpp @@ -15,11 +15,11 @@ namespace daemon_proto { using namespace std::string_view_literals; -constexpr auto MSG_INIT = "INIT"sv; -constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; +constexpr auto MSG_INIT = "INIT"sv; +constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv; constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv; -constexpr auto MSG_FINISH = "FINISH"sv; -constexpr auto MSG_READY = "READY"sv; +constexpr auto MSG_FINISH = "FINISH"sv; +constexpr auto MSG_READY = "READY"sv; inline int send_msg(const char *who, int fd, std::string_view msg) { if (write(fd, msg.data(), msg.size()) < 0) { diff --git a/xprof/sync_daemon_fs b/xprof/sync_daemon_fs index 3547e25c7..1166dbfd7 100755 --- a/xprof/sync_daemon_fs +++ b/xprof/sync_daemon_fs @@ -66,7 +66,7 @@ until msg == SyncDaemon::MSG_FINISH raise 'sync_daemon_fs: parent closed socket without sending FINISH' if msg.empty? case msg - when SyncDaemon::MSG_INIT then nil + when SyncDaemon::MSG_INIT then nil when SyncDaemon::MSG_LOCAL_BARRIER local_barrier(local_barrier_count.to_s) local_barrier_count += 1