diff --git a/configure.ac b/configure.ac index 089c4beef..4267c45ac 100644 --- a/configure.ac +++ b/configure.ac @@ -68,10 +68,12 @@ AC_ARG_ENABLE([sync-daemon-mpi], [enable_sync_daemon_mpi="$enableval" user_set_sync_daemon_mpi="yes_$enableval"], [enable_sync_daemon_mpi="yes" user_set_sync_daemon_mpi="no"]) AC_SUBST([user_set_sync_daemon_mpi]) +AC_LANG_PUSH([C++]) AS_IF([test "x$enable_sync_daemon_mpi" != xno], [AX_MPI([enable_sync_daemon_mpi=yes], [enable_sync_daemon_mpi=no])]) AS_IF([test "x$enable_sync_daemon_mpi" != xno], [AX_MPI_SESSION([enable_sync_daemon_mpi=yes], [enable_sync_daemon_mpi=no])]) +AC_LANG_POP([C++]) AS_IF([test "x$user_set_sync_daemon_mpi" = "xyes_yes" && test "x$enable_sync_daemon_mpi" = "xno"], [AC_MSG_ERROR([sync-daemon with MPI requested but MPI Sessions are not supported!])]) AS_IF([test "x$user_set_sync_daemon_mpi" != "xyes_no" && test "x$enable_sync_daemon_mpi" = "xno"], diff --git a/integration_tests/light_iprof_only_sync.rb b/integration_tests/light_iprof_only_sync.rb new file mode 100755 index 000000000..f06c48c7a --- /dev/null +++ b/integration_tests/light_iprof_only_sync.rb @@ -0,0 +1,42 @@ +#!/usr/bin/env ruby + +# Stand-in for the iprof Ruby driver: spawn the configured sync daemon, +# drive the socketpair protocol manually, run the user command in between. +# Used by integration tests to exercise daemon binaries without the rest +# of iprof. + +require 'socket' +require 'io/nonblock' + +daemon_kind = ENV.fetch('THAPI_SYNC_DAEMON') +daemon = "sync_daemon_#{daemon_kind}" + +MSG_INIT = 'INIT' +MSG_LOCAL_BARRIER = 'LOCAL_BARRIER' +MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER' +MSG_FINISH = 'FINISH' +MSG_READY = 'READY' + +parent, child = UNIXSocket.pair(:SEQPACKET) +# Ruby sets O_NONBLOCK on sockets by default; the daemon uses blocking +# read, so clear it on the inherited fd. 
+child.nonblock = false +pid = Process.spawn(daemon, child.fileno.to_s, child.fileno => child.fileno) +child.close + +send_and_wait = lambda do |msg| + parent.sendmsg(msg) + reply, = parent.recvmsg(64) + raise "expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY +end + +send_and_wait.call(MSG_INIT) +send_and_wait.call(MSG_LOCAL_BARRIER) + +system(*ARGV) || exit($?.exitstatus || 1) + +send_and_wait.call(MSG_LOCAL_BARRIER) +send_and_wait.call(MSG_GLOBAL_BARRIER) +send_and_wait.call(MSG_FINISH) +parent.close +Process.wait(pid) diff --git a/integration_tests/light_iprof_only_sync.sh b/integration_tests/light_iprof_only_sync.sh deleted file mode 100755 index a0588eeb8..000000000 --- a/integration_tests/light_iprof_only_sync.sh +++ /dev/null @@ -1,64 +0,0 @@ -#!/bin/bash -set -euo pipefail - -# For loging and Daemon to send signal to us -PARENT_PID=$$ - -# Get base real-time signal number -SIGRTMIN=$(kill -l SIGRTMIN) - -# Set signals as defined in MPI daemon code -RT_SIGNAL_READY=$((SIGRTMIN + 0)) -RT_SIGNAL_GLOBAL_BARRIER=$((SIGRTMIN + 1)) -RT_SIGNAL_LOCAL_BARRIER=$((SIGRTMIN + 2)) -RT_SIGNAL_FINISH=$((SIGRTMIN + 3)) - -# Signal handler for capturing signals -handle_signal() { - echo "$PARENT_PID $(date) | Received signal $1 from sync_daemon" - if [ "$1" == "RT_SIGNAL_READY" ]; then - SIGNAL_RECEIVED="true" - fi -} - -# Setup trap for ready signal sent from signal daemon -trap 'handle_signal RT_SIGNAL_READY' $RT_SIGNAL_READY - -# Function to wait for RT_SIGNAL_READY -wait_for_signal() { - while [[ "$SIGNAL_RECEIVED" == "false" ]]; do - sleep 0.1 # Small sleep to prevent busy looping - done -} - -# To avoid race condition, `SIGNAL_RECEIVED` need to be set -# before spawning or signaling the daemon -spawn_daemon_blocking() { - SIGNAL_RECEIVED="false" - sync_daemon_"${THAPI_SYNC_DAEMON}" $PARENT_PID & - DAEMON_PID=$! 
- wait_for_signal -} - -send_signal_blocking() { - SIGNAL_RECEIVED="false" - kill -"$1" $DAEMON_PID - wait_for_signal -} - -echo "$PARENT_PID $(date) | Spawn Daemon" -spawn_daemon_blocking -echo "$PARENT_PID $(date) | Send Local Barrier signal" -send_signal_blocking $RT_SIGNAL_LOCAL_BARRIER -# Run test program -"$@" - -# Final synchronization after mpi_hello_world execution -echo "$PARENT_PID $(date) | Send Local Barrier signal" -send_signal_blocking $RT_SIGNAL_LOCAL_BARRIER -echo "$PARENT_PID $(date) | Send Global Barrier signal" -send_signal_blocking $RT_SIGNAL_GLOBAL_BARRIER -echo "$PARENT_PID $(date) | Send Termination signal" -kill -"$RT_SIGNAL_FINISH" $DAEMON_PID -echo "$PARENT_PID $(date) | Wait for daemon to quit" -wait $DAEMON_PID diff --git a/integration_tests/parallel_execution.bats b/integration_tests/parallel_execution.bats index 74c79b95f..35d413f6d 100644 --- a/integration_tests/parallel_execution.bats +++ b/integration_tests/parallel_execution.bats @@ -8,7 +8,7 @@ launch_mpi() { # THAPI_SYNC_DAEMON=fs Tests @test "sync_daemon_fs" { - THAPI_SYNC_DAEMON=fs launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.sh clinfo + THAPI_SYNC_DAEMON=fs launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.rb clinfo } @test "iprof_fs" { @@ -27,7 +27,7 @@ launch_mpi() { # bats test_tags=mpi_sync_daemon @test "sync_daemon_mpi" { - THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.sh clinfo + THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 ./integration_tests/light_iprof_only_sync.rb clinfo } # bats test_tags=mpi_sync_daemon @@ -45,6 +45,11 @@ launch_mpi() { THAPI_SYNC_DAEMON_MPI_NO_FINALIZE=1 THAPI_SYNC_DAEMON=mpi launch_mpi -n 2 iprof ./mpi_helloworld } +# Non-MPI runs should ignore THAPI_SYNC_DAEMON entirely (no validation, no spawn). 
+@test "sync_daemon_ignored_without_mpi" { + THAPI_SYNC_DAEMON=whatever-not-in-list iprof -- true +} + # Test Traced Rank @test "iprof_mpi+traced_ranks" { diff --git a/sampling/Makefile.am b/sampling/Makefile.am index 8a8ee1202..fc378cfe5 100644 --- a/sampling/Makefile.am +++ b/sampling/Makefile.am @@ -4,6 +4,9 @@ else WERROR = endif +AM_CFLAGS = -Wall -Wextra $(WERROR) +AM_CXXFLAGS = -std=c++17 -Wall -Wextra $(WERROR) + TRACEPOINT_GEN = \ $(srcdir)/sampling_events.yaml @@ -29,7 +32,7 @@ nodist_libsamplingtracepoints_la_SOURCES = \ $(SAMPLING_STATIC_PROBES_SRC) libsamplingtracepoints_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -libsamplingtracepoints_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter -Wno-type-limits -Wno-sign-compare $(WERROR) $(LTTNG_UST_CFLAGS) +libsamplingtracepoints_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter -Wno-type-limits -Wno-sign-compare $(LTTNG_UST_CFLAGS) libsamplingtracepoints_la_LDFLAGS = $(LTTNG_UST_LIBS) @@ -50,7 +53,7 @@ libThapiSampling_la_SOURCES = \ thapi_sampling.c libThapiSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include -libThapiSampling_la_CFLAGS = -Wall -Wextra $(WERROR) +libThapiSampling_la_CFLAGS = $(AM_CFLAGS) libThapiSampling_la_LDFLAGS = -lpthread -version-info 1:0:0 $(LTTNG_UST_LIBS) bin_PROGRAMS = thapi_sampling_daemon @@ -60,7 +63,7 @@ thapi_sampling_daemon_SOURCES = \ thapi_sampling_daemon.cpp thapi_sampling_daemon_CPPFLAGS = -I$(top_srcdir)/utils/include -thapi_sampling_daemon_CXXFLAGS = -Wall -Wextra $(WERROR) +thapi_sampling_daemon_CXXFLAGS = $(AM_CXXFLAGS) libHeartbeatSampling_la_SOURCES = heartbeat_sampling_plugin.c @@ -68,7 +71,7 @@ nodist_libHeartbeatSampling_la_SOURCES = \ $(SAMPLING_STATIC_PROBES_INCL) libHeartbeatSampling_la_CPPFLAGS = -I$(top_srcdir)/utils/include -libHeartbeatSampling_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) $(LTTNG_UST_CFLAGS) +libHeartbeatSampling_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter 
$(LTTNG_UST_CFLAGS)
 libHeartbeatSampling_la_LDFLAGS = -avoid-version -module
 libHeartbeatSampling_la_LIBADD = libThapiSampling.la libsamplingtracepoints.la -ldl $(LTTNG_UST_LIBS)

diff --git a/sampling/thapi_sampling_daemon.cpp b/sampling/thapi_sampling_daemon.cpp
index 77f2dec9a..338c6fea6 100644
--- a/sampling/thapi_sampling_daemon.cpp
+++ b/sampling/thapi_sampling_daemon.cpp
@@ -1,22 +1,22 @@
-#include <csignal>
+#include "daemon_proto.hpp"
 #include <cstdlib>
 #include <dlfcn.h>
 #include <iostream>
 #include <vector>

-#define RT_SIGNAL_READY (SIGRTMIN)
-#define RT_SIGNAL_FINISH (SIGRTMIN + 3)
-typedef void (*plugin_initialize_func)(void);
-typedef void (*plugin_finalize_func)(void);
+using namespace daemon_proto;

-int main(int argc, char **argv) {
+using plugin_initialize_func = void (*)();
+using plugin_finalize_func = void (*)();
-
-  // Setup signaling, to exit the sampling loop
-  int parent_pid = std::atoi(argv[1]);
-  sigset_t signal_set;
-  sigemptyset(&signal_set);
-  sigaddset(&signal_set, RT_SIGNAL_FINISH);
-  sigprocmask(SIG_BLOCK, &signal_set, NULL);
+constexpr auto WHO = "thapi_sampling_daemon";
+
+int main(int argc, char **argv) {
+  if (argc < 2) {
+    std::cerr << "usage: " << argv[0] << " <socket_fd> [plugin.so ...]" << std::endl;
+    return 1;
+  }
+  const int fd = std::atoi(argv[1]);

   // DL Open
   struct Plugin {
@@ -33,30 +33,26 @@ int main(int argc, char **argv) {
       std::cerr << "Failed to load " << argv[i] << ": " << dlerror() << std::endl;
       continue;
     }
-    plugin_initialize_func init_func =
+    auto init_func =
         reinterpret_cast<plugin_initialize_func>(dlsym(handle, "thapi_initialize_sampling_plugin"));
-
-    plugin_finalize_func fini_func =
+    auto fini_func =
         reinterpret_cast<plugin_finalize_func>(dlsym(handle, "thapi_finalize_sampling_plugin"));
-
     plugins.push_back({handle, init_func, fini_func});
   }

   // User pluging
-  for (const auto &plugin : plugins) {
+  for (const auto &plugin : plugins)
     plugin.initialize();
-  }

-  // Signal Ready to manager
-  kill(parent_pid, RT_SIGNAL_READY);
+  // Handshake: parent → INIT, daemon → READY
+  if (recv_expect(WHO, fd, MSG_INIT) < 0)
+    return 1;
+  if (send_msg(WHO, fd, MSG_READY) < 0)
+    return 1;

-  // Wait for to finish
-  while (true) {
-    int signum;
-    sigwait(&signal_set, &signum);
-    if (signum == RT_SIGNAL_FINISH)
-      break;
-  }
+  // Wait for shutdown: parent → FINISH
+  if (recv_expect(WHO, fd, MSG_FINISH) < 0)
+    return 1;

   // Finalization
   for (const auto &plugin : plugins) {
@@ -65,6 +61,9 @@ int main(int argc, char **argv) {
     dlclose(plugin.handle);
   }

+  if (send_msg(WHO, fd, MSG_READY) < 0)
+    return 1;
+  close(fd);
   // Will call the destructor, who will finalize all the not unregistered plugin
   return 0;
 }
diff --git a/utils/Makefile.am b/utils/Makefile.am
index 5de4a68dd..c3b96593a 100644
--- a/utils/Makefile.am
+++ b/utils/Makefile.am
@@ -10,6 +10,7 @@ noinst_HEADERS = \
 	include/utlist.h \
 	include/json.hpp \
 	include/magic_enum.hpp \
+	include/daemon_proto.hpp \
 	xprof_utils.hpp

 nodist_noinst_HEADERS = lttng/tracepoint_gen.h
diff --git a/utils/include/daemon_proto.hpp b/utils/include/daemon_proto.hpp
new file mode 100644
index 000000000..6ce08d55d
--- /dev/null
+++ b/utils/include/daemon_proto.hpp
@@ -0,0 +1,55 @@
+// Wire protocol shared between iprof (parent) and the C/C++ daemons
+// (sync_daemon_mpi, thapi_sampling_daemon). Messages are short ASCII
+// tokens exchanged over a SOCK_SEQPACKET socketpair; each parent
+// command is acknowledged by the daemon with MSG_READY.
+
+#pragma once
+
+#include <cstdio>
+#include <iostream>
+#include <string_view>
+#include <sys/types.h>
+#include <unistd.h>
+
+namespace daemon_proto {
+
+using namespace std::string_view_literals;
+
+constexpr auto MSG_INIT = "INIT"sv;
+constexpr auto MSG_LOCAL_BARRIER = "LOCAL_BARRIER"sv;
+constexpr auto MSG_GLOBAL_BARRIER = "GLOBAL_BARRIER"sv;
+constexpr auto MSG_FINISH = "FINISH"sv;
+constexpr auto MSG_READY = "READY"sv;
+
+// Send `msg` on fd with a single write(). Returns 0 on success; when write()
+// fails, reports the errno text via perror(who) and returns -1.
+inline int send_msg(const char *who, int fd, std::string_view msg) {
+  if (write(fd, msg.data(), msg.size()) < 0) {
+    std::perror(who);
+    return -1;
+  }
+  return 0;
+}
+
+// Read one message from fd and verify it matches `want`. Returns 0 on
+// match, -1 on syscall failure, EOF, or mismatch.
+inline int recv_expect(const char *who, int fd, std::string_view want) {
+  char buf[64];
+  const ssize_t n = read(fd, buf, sizeof(buf));
+  if (n < 0) {
+    std::perror(who);
+    return -1;
+  }
+  if (n == 0) {
+    std::cerr << who << ": parent closed socket unexpectedly" << std::endl;
+    return -1;
+  }
+  const std::string_view got(buf, n);
+  if (got != want) {
+    std::cerr << who << ": expected " << want << ", got " << got << std::endl;
+    return -1;
+  }
+  return 0;
+}
+
+} // namespace daemon_proto
diff --git a/xprof/Makefile.am b/xprof/Makefile.am
index d50dcec89..affe3316c 100644
--- a/xprof/Makefile.am
+++ b/xprof/Makefile.am
@@ -6,6 +6,9 @@ else
 WERROR =
 endif

+AM_CFLAGS = -Wall -Wextra $(WERROR)
+AM_CXXFLAGS = -std=c++17 -Wall -Wextra $(WERROR)
+
 bin_SCRIPTS = \
 	iprof \
 	sync_daemon_fs
@@ -14,8 +17,8 @@ if FOUND_MPI
 bin_SCRIPTS += sync_daemon_mpi
 endif

-sync_daemon_mpi: $(srcdir)/sync_daemon_mpi.c
-	$(MPICC) $(CFLAGS) $< -o $@
+sync_daemon_mpi: $(srcdir)/sync_daemon_mpi.cpp
+	$(MPICXX) $(AM_CXXFLAGS) $(CXXFLAGS) -I$(top_srcdir)/utils/include $< -o $@

 iprof: xprof.rb
 	cp xprof.rb $@
@@ -130,23 +133,23 @@ bt2_LTLIBRARIES = libXTally.la libXTimeline.la libXAggreg.la libXStripper.la

 # Compiler flags
 libXTally_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_sink_tally
-libXTally_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS)
-libXTally_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS)
+libXTally_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS)
+libXTally_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS)
 libXTally_la_LDFLAGS = $(BABELTRACE2_LIBS) -avoid-version -module

 libXTimeline_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_sink_timeline
-libXTimeline_la_CFLAGS = -Wall
-Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXTimeline_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) $(PROTOBUF_CFLAGS) +libXTimeline_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXTimeline_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) $(PROTOBUF_CFLAGS) libXTimeline_la_LDFLAGS = $(BABELTRACE2_LIBS) $(PROTOBUF_LIBS) -avoid-version -module libXAggreg_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_filter_aggreg -libXAggreg_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXAggreg_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) +libXAggreg_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXAggreg_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) libXAggreg_la_LDFLAGS = $(BABELTRACE2_LIBS) -avoid-version -module libXStripper_la_CPPFLAGS = -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./ -I./btx_filter_stripper -libXStripper_la_CFLAGS = -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) -libXStripper_la_CXXFLAGS = -std=c++17 -Wall -Wextra -Wno-unused-parameter $(WERROR) -fno-fast-math $(BABELTRACE2_CFLAGS) +libXStripper_la_CFLAGS = $(AM_CFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) +libXStripper_la_CXXFLAGS = $(AM_CXXFLAGS) -Wno-unused-parameter -fno-fast-math $(BABELTRACE2_CFLAGS) libXStripper_la_LDFLAGS = $(BABELTRACE2_LIBS) -avoid-version -module # Cannot use check_LTLIBRARIES because we need the shared version of those @@ -172,7 +175,7 @@ $(BTX_INTERVAL_GENERATED_SOURCE_TEST) &: $(srcdir)/btx_interval_model.yaml noinst_LTLIBRARIES = libtestintervalsource.la nodist_libtestintervalsource_la_SOURCES = 
$(BTX_INTERVAL_GENERATED_SOURCE_TEST)
 libtestintervalsource_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./btx_source_interval_test/
-libtestintervalsource_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter $(WERROR) $(BABELTRACE2_CFLAGS)
+libtestintervalsource_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter $(BABELTRACE2_CFLAGS)

 BTX_AGGREG_GENERATED_SOURCE_TEST = \
 	btx_source_aggreg_test/metababel/metababel.h \
@@ -188,7 +191,7 @@ $(BTX_AGGREG_GENERATED_SOURCE_TEST) &: $(srcdir)/btx_aggreg_model.yaml
 noinst_LTLIBRARIES += libtestaggregsource.la
 nodist_libtestaggregsource_la_SOURCES = $(BTX_AGGREG_GENERATED_SOURCE_TEST)
 libtestaggregsource_la_CPPFLAGS = -I$(top_srcdir)/utils -I$(top_srcdir)/utils/include -I$(srcdir)/include -I./btx_source_aggreg_test/
-libtestaggregsource_la_CFLAGS = -fPIC -Wall -Wextra -Wno-unused-parameter $(WERROR) $(BABELTRACE2_CFLAGS)
+libtestaggregsource_la_CFLAGS = $(AM_CFLAGS) -fPIC -Wno-unused-parameter $(BABELTRACE2_CFLAGS)


 TRACE_COMMON = \
@@ -220,7 +223,7 @@ EXTRA_DIST = \
 	$(TRACE_COMMON)\
 	$(TRACE_OUT) \
 	perfetto_pruned.proto \
-	sync_daemon_mpi.c \
+	sync_daemon_mpi.cpp \
 	sync_daemon_fs

 CLEANFILES = \
diff --git a/xprof/sync_daemon_fs b/xprof/sync_daemon_fs
index 51e080c43..1166dbfd7 100755
--- a/xprof/sync_daemon_fs
+++ b/xprof/sync_daemon_fs
@@ -1,13 +1,14 @@
 #!/usr/bin/env ruby

 # Cannot use require_relative as iprof is not a rb file
-# Load MPITopo and RT_SIGNAL_*
+# Load MPITopo and related helpers
 load(File.join(__dir__, 'iprof'))

 require 'open3'
 require 'fileutils'
 require 'securerandom'
 require 'etc'
+require 'socket'

 FOLDER_JOBID = File.join('.thapi_lock', MPITopo.job_id)
 SHARED_LOCAL_FILESYSTEM = File.join('/', 'dev', 'shm', Etc.getlogin, FOLDER_JOBID)
@@ -50,37 +51,37 @@ def local_barrier(name)
   busy_wait_until { count_file(folder) == MPITopo.local_size }
 end

-global_handle = nil
-parent_pid = nil
+msg = nil
+# The parent passes our end of the socketpair as an inherited fd number in
+# ARGV[0]. `to_i` would silently turn a missing or malformed argument into 0
+# (stdin), so parse it strictly and fail with a usage message instead.
+abort("usage: #{File.basename($PROGRAM_NAME)} <socket_fd>") if ARGV.empty?
+sock = UNIXSocket.for_fd(Integer(ARGV[0]))
+global_handle = init_global_barrier
+local_barrier_count = 0

-# Set trap
-Signal.trap(SyncDaemon::RT_SIGNAL_GLOBAL_BARRIER) do
-  global_barrier(global_handle)
-  Process.kill(SyncDaemon::RT_SIGNAL_READY, parent_pid)
-end
+# We cannot delete SHARED_LOCAL_FILESYSTEM on FINISH: some rank can exit
+# global_barrier (hence reach FINISH) while others are still in
+# local_barrier. Removing SHARED_LOCAL_FILESYSTEM would deadlock them.
+# One fix: have all ranks busy_wait_until in global_barrier so we
+# know local_barrier is done — but given FS performance, we skip it.
+until msg == SyncDaemon::MSG_FINISH
+  msg, = sock.recvmsg(64)
+  raise 'sync_daemon_fs: parent closed socket without sending FINISH' if msg.empty?

-local_barrier_count = 0
-Signal.trap(SyncDaemon::RT_SIGNAL_LOCAL_BARRIER) do
-  local_barrier(local_barrier_count.to_s)
-  local_barrier_count += 1
-  Process.kill(SyncDaemon::RT_SIGNAL_READY, parent_pid)
-end
+  case msg
+  when SyncDaemon::MSG_INIT then nil
+  when SyncDaemon::MSG_LOCAL_BARRIER
+    local_barrier(local_barrier_count.to_s)
+    local_barrier_count += 1
+  when SyncDaemon::MSG_GLOBAL_BARRIER then global_barrier(global_handle)
+  when SyncDaemon::MSG_FINISH then nil
+  else
+    warn("sync_daemon_fs: unknown message '#{msg}'")
+    exit(1)
+  end

-Signal.trap(SyncDaemon::RT_SIGNAL_FINISH) do
-  # We cannot delete SHARED_LOCAL_FILESYSTEM
-  # Some rank can exit the `global_barrier` (hence calling this function)
-  # when others ranks are still in the `local_barrier`
-  # If we delete SHARED_LOCAL_FILESYSTEM, it will deadlock
-  #
-  # One possibility to be abble to remove `SHARED_LOCAL_FILESYSTEM`,
-  # is to make all ranks busy_wait_until in the `global_barrier`.
-  # This will ensure that every-one exited the `local_barrier`.
-  # but given the poor performance of our FS, we will avoid that for now...
-  exit
+  sock.sendmsg(SyncDaemon::MSG_READY)
 end

-# Init global barrier
-global_handle = init_global_barrier
-parent_pid = ARGV[0].to_i
-Process.kill(SyncDaemon::RT_SIGNAL_READY, parent_pid)
-sleep
+sock.close
diff --git a/xprof/sync_daemon_mpi.c b/xprof/sync_daemon_mpi.cpp
similarity index 72%
rename from xprof/sync_daemon_mpi.c
rename to xprof/sync_daemon_mpi.cpp
index fd69e1a20..05a330608 100644
--- a/xprof/sync_daemon_mpi.c
+++ b/xprof/sync_daemon_mpi.cpp
@@ -1,15 +1,11 @@
+#include "daemon_proto.hpp"
 #include "mpi.h"
-#include
-#include
-#include
-#include
-#include
+#include <cstdio>
+#include <cstdlib>

-// Define real-time signals
-#define RT_SIGNAL_READY SIGRTMIN
-#define RT_SIGNAL_GLOBAL_BARRIER SIGRTMIN + 1
-#define RT_SIGNAL_LOCAL_BARRIER SIGRTMIN + 2
-#define RT_SIGNAL_FINISH SIGRTMIN + 3
+using namespace daemon_proto;
+
+constexpr auto WHO = "sync_daemon_mpi";

 #define CHECK_MPI(x) \
   do { \
@@ -79,57 +75,62 @@ int MPIX_Init_Session(MPI_Session *lib_shandle, MPI_Comm *lib_comm) {
   return ret;
 }

-int signal_loop(int parent_pid, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_NODE) {
-  // Initialize signal set and add signals
-  sigset_t signal_set;
-  sigemptyset(&signal_set);
-  sigaddset(&signal_set, RT_SIGNAL_GLOBAL_BARRIER);
-  sigaddset(&signal_set, RT_SIGNAL_LOCAL_BARRIER);
-  sigaddset(&signal_set, RT_SIGNAL_FINISH);
-  sigprocmask(SIG_BLOCK, &signal_set, NULL);
-  kill(parent_pid, RT_SIGNAL_READY);
+int message_loop(const int fd, MPI_Comm MPI_COMM_WORLD_THAPI, MPI_Comm MPI_COMM_NODE) {
+  char buf[64];

-  // Processing loop:
-  // Should be only exited when receiving RT_SIGNAL_FINISH
+  // Processing loop: should only be exited when receiving MSG_FINISH
   while (true) {
-    int signum;
-    sigwait(&signal_set, &signum);
-    if (signum == RT_SIGNAL_FINISH) {
-      // We don't signal ready when cleaning up
-      // The master will just `wait` for us to finish
-      return 0;
-    } else if (signum == RT_SIGNAL_LOCAL_BARRIER) {
+    const ssize_t n = read(fd, buf, sizeof(buf));
+    if (n < 0) {
+
perror(WHO);
+      return 1;
+    }
+    if (n == 0) {
+      fprintf(stderr, "%s: parent closed socket unexpectedly\n", WHO);
+      return 1;
+    }
+    const std::string_view msg(buf, n);
+
+    if (msg == MSG_FINISH) {
+      return send_msg(WHO, fd, MSG_READY) < 0 ? 1 : 0;
+    } else if (msg == MSG_LOCAL_BARRIER) {
       MPI_Barrier(MPI_COMM_NODE);
-    } else if (signum == RT_SIGNAL_GLOBAL_BARRIER) {
+    } else if (msg == MSG_GLOBAL_BARRIER) {
       MPI_Barrier(MPI_COMM_WORLD_THAPI);
+    } else if (msg == MSG_INIT) {
+      // Initial handshake; no work to do.
     } else {
-      fprintf(stderr, "Wrong signal received %d. Exiting", signum);
+      fprintf(stderr, "%s: unknown message '%.*s'\n", WHO, static_cast<int>(msg.size()),
+              msg.data());
       return 1;
     }
-    kill(parent_pid, RT_SIGNAL_READY);
-  }

-  // Unreachable
-  fprintf(stderr, "Wrong signal_loop exit");
-  return 1;
+    if (send_msg(WHO, fd, MSG_READY) < 0)
+      return 1;
+  }
 }

 int main(int argc, char **argv) {
   // Initialization
   int ret = 0;
-  int parent_pid = 0;

   // World Session and Communicator
   MPI_Session lib_shandle = MPI_SESSION_NULL;
   MPI_Comm MPI_COMM_WORLD_THAPI = MPI_COMM_NULL;
   MPI_Comm MPI_COMM_NODE = MPI_COMM_NULL;

+  if (argc < 2) {
+    fprintf(stderr, "usage: %s <socket_fd>\n", argv[0]);
+    return 1;
+  }
+  const int fd = atoi(argv[1]);
+
   CHECK_MPI(MPIX_Init_Session(&lib_shandle, &MPI_COMM_WORLD_THAPI));
   CHECK_MPI(MPI_Comm_split_type(MPI_COMM_WORLD_THAPI, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL,
                                 &MPI_COMM_NODE));

-  parent_pid = atoi(argv[1]);
-  ret = signal_loop(parent_pid, MPI_COMM_WORLD_THAPI, MPI_COMM_NODE);
+  ret = message_loop(fd, MPI_COMM_WORLD_THAPI, MPI_COMM_NODE);
+  close(fd);

 fn_exit:
   if (MPI_COMM_NODE != MPI_COMM_NULL)
diff --git a/xprof/xprof.rb.in b/xprof/xprof.rb.in
index 7983bd4f5..9b4804dd6 100755
--- a/xprof/xprof.rb.in
+++ b/xprof/xprof.rb.in
@@ -34,6 +34,7 @@ require 'optparse_thapi'
 require 'pty'
 require 'digest/md5'
 require 'socket'
+require 'io/nonblock'
 require 'logger'
 require 'set'
 require 'securerandom'
@@ -347,97 +348,125 @@ end
 # | \ _. _ ._
 # |_/ (_| (/_ | | | (_) | |
 #
-
-class SpawnDaemon
-  SIGRTMIN = 34
-  RT_SIGNAL_READY = SIGRTMIN
-  RT_SIGNAL_FINISH = SIGRTMIN + 3
-
-  # This belongs to the global ruby process!
-  # This Object is NOT thread-safe, please do not spawn user thread
-  @@ready = false
-  Signal.trap(RT_SIGNAL_READY) { @@ready = true }
-
-  # Any blocking implementation
-  # (`IO.pipe`, `throw/catch`+sleep` deadlock for an unknown reason)
-  # This is why we spin-lock
-  def _lazy_exec(_cond, &block)
-    block.call
-    busy_wait_until { @@ready }
-    @@ready = false
-  end
-
-  def lazy_exec(message = nil, cond, &block)
+# Shared socketpair protocol between iprof and its (sync/sampling) daemons.
+# Subclasses define `active?` (gating predicate) and call `spawn` from their initialize.
+module DaemonClient
+  MSG_INIT = 'INIT'
+  MSG_FINISH = 'FINISH'
+  MSG_READY = 'READY'
+
+  # block triggers the daemon; lazy_exec blocks on its READY reply.
+  def lazy_exec(message = nil)
     LOGGER.with_info_block(message) do
-      unless cond
+      unless active?
         LOGGER.debug("#{message}: No-op")
         return
       end
-      # Don't inline, Trap doesn't work with logger
-      # https://bugs.ruby-lang.org/issues/7917
-      _lazy_exec(cond, &block)
+      yield
+      wait_ready
+    end
+  end
+
+  # Ask the daemon to shut down, then release the socket and reap the child.
+  # Cleanup runs even when the FINISH handshake raises (e.g. the daemon already
+  # died), so a failure never leaves the socket open or the child unreaped.
+  def finalize
+    LOGGER.debug { "Finalize #{self.class}" }
+    return unless active?
+
+    begin
+      @parent.sendmsg(MSG_FINISH)
+      wait_ready
+    ensure
+      # Close first: a daemon still blocked reading the socket then sees EOF and exits.
+      @parent.close
+      _, status = Process.wait2(@pid)
+      LOGGER.warn("#{self.class} exited #{status}") unless status.success?
+    end
+  end
+
+  # Instance delegate so DaemonClient#lazy_exec / #finalize can call `active?`
+  # on the instance. Subclasses define the class-level `self.active?`.
+  def active?
+    self.class.active?
+  end
+
+  # Mirrors `Process.spawn(*cmd)` but adds:
+  # - `log:` for the lazy_exec message
+  # - implicit socketpair: the daemon's end is inserted as the first program
+  #   argument (after optional env hash) and kept open across exec via `fd => fd`.
+  def spawn(*cmd, log:)
+    lazy_exec(log) do
+      @parent, child = UNIXSocket.pair(:SEQPACKET)
+      # Ruby sets O_NONBLOCK on sockets by default; the daemon uses
+      # blocking read, so clear it on the inherited fd.
+      child.nonblock = false
+      fd = child.fileno
+      insert_at = cmd.first.is_a?(Hash) ? 2 : 1
+      @pid = Process.spawn(*cmd.dup.insert(insert_at, fd.to_s), fd => fd)
+      child.close
+      @parent.sendmsg(MSG_INIT)
     end
   end
+
+  private
+
+  # Block until the daemon acknowledges the last command with MSG_READY.
+  def wait_ready
+    reply, = @parent.recvmsg(64)
+    raise "#{self.class}: expected #{MSG_READY}, got '#{reply}'" unless reply == MSG_READY
+  end
 end

 # _
 # |_) _. ._ ._ o _ ._
 # |_) (_| | | | (/_ |
 #
-class SyncDaemon < SpawnDaemon
-  RT_SIGNAL_GLOBAL_BARRIER = SIGRTMIN + 1
-  RT_SIGNAL_LOCAL_BARRIER = SIGRTMIN + 2
+class SyncDaemon
+  include DaemonClient

-  def lazy_exec_mpi(message = nil, &block)
-    lazy_exec(message, MPITopo.active?, &block)
-  end
+  MSG_LOCAL_BARRIER = 'LOCAL_BARRIER'
+  MSG_GLOBAL_BARRIER = 'GLOBAL_BARRIER'

-  def initialize
-    super
-    daemon_type = env_fetch_first('THAPI_SYNC_DAEMON')
-    daemon = case daemon_type
-             when nil
-               # Default is MPI
-               if File.exist?("#{BINDIR}/sync_daemon_mpi")
-                 "#{BINDIR}/sync_daemon_mpi"
-               else
-                 LOGGER.warn("No #{BINDIR}/sync_daemon_mpi binary. Fallback to #{BINDIR}/sync_daemon_f")
-                 "#{BINDIR}/sync_daemon_fs"
-               end
-             when 'mpi'
-               raise("No #{BINDIR}/sync_daemon_mpi binary") unless File.exist?("#{BINDIR}/sync_daemon_mpi")
-
-               "#{BINDIR}/sync_daemon_mpi"
-             when 'fs'
-               "#{BINDIR}/sync_daemon_fs"
-             else
-               raise("Error: THAPI_SYNC_DAEMON value (#{daemon_type}) is not supported. Allowed: [mpi,fs] ")
-             end
-
-    LOGGER.debug { "spawn(#{daemon} #{Process.pid})" }
-    lazy_exec_mpi("Initialize SyncDaemon #{daemon_type}") do
-      @pid = spawn("#{daemon} #{Process.pid}")
-    end
-  end
+  SYNC_DAEMON_MPI_PATH = "#{BINDIR}/sync_daemon_mpi"
+  SYNC_DAEMON_FS_PATH = "#{BINDIR}/sync_daemon_fs"

-  def finalize
-    LOGGER.debug { 'Finalize SyncDaemon' }
-    return unless MPITopo.active?
+  def self.active?
+    MPITopo.active?
+  end

-    Process.kill(RT_SIGNAL_FINISH, @pid)
-    Process.wait(@pid)
+  def initialize
+    return unless active?
+
+    daemon_path = case (daemon_type = env_fetch_first('THAPI_SYNC_DAEMON'))
+                  when 'mpi'
+                    unless File.exist?(SYNC_DAEMON_MPI_PATH)
+                      raise "No #{SYNC_DAEMON_MPI_PATH} binary. Did you compile with `--disable-sync-daemon-mpi`?"
+                    end
+
+                    SYNC_DAEMON_MPI_PATH
+                  when 'fs'
+                    SYNC_DAEMON_FS_PATH
+                  when nil
+                    if File.exist?(SYNC_DAEMON_MPI_PATH)
+                      SYNC_DAEMON_MPI_PATH
+                    else
+                      LOGGER.warn("No #{SYNC_DAEMON_MPI_PATH} binary. Fallback to #{SYNC_DAEMON_FS_PATH}")
+                      SYNC_DAEMON_FS_PATH
+                    end
+                  else
+                    raise "Error: THAPI_SYNC_DAEMON=#{daemon_type} is not supported. Allowed: [mpi,fs]"
+                  end
+
+    spawn(daemon_path, log: "Initialize SyncDaemon #{File.basename(daemon_path)}")
   end

   def local_barrier(name)
-    lazy_exec_mpi("Local_barrier #{name}") do
-      Process.kill(RT_SIGNAL_LOCAL_BARRIER, @pid)
-    end
+    lazy_exec("Local_barrier #{name}") { @parent.sendmsg(MSG_LOCAL_BARRIER) }
   end

   def global_barrier
-    lazy_exec_mpi('Global_barrier') do
-      Process.kill(RT_SIGNAL_GLOBAL_BARRIER, @pid)
-    end
+    lazy_exec('Global_barrier') { @parent.sendmsg(MSG_GLOBAL_BARRIER) }
   end

   # The context manager ensure that when the block yield is exited
@@ -460,33 +489,21 @@ end
 # __) (_| | | | |_) | | | | (_|
 # | _|

-def sampling?
-  OPTIONS[:sample] && MPITopo.local_master?
-end
+class SamplingDaemon
+  include DaemonClient

-class SamplingDaemon < SpawnDaemon
-  def lazy_exec_sampling(message = nil, &block)
-    lazy_exec(message, sampling?, &block)
-  end
+  SAMPLING_DAEMON_PATH = "#{BINDIR}/thapi_sampling_daemon"

-  def initialize(h = {})
-    super()
-    daemon_path = "#{BINDIR}/thapi_sampling_daemon"
-    raise "No sampling_daemon binary found at #{daemon_path}" unless File.exist?(daemon_path)
-
-    path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').gsub(':', ' ')
-    LOGGER.debug { "spawn(sampling_daemon) #{Process.pid} #{path_so}" }
-    lazy_exec_sampling('Initialize SamplingDaemon') do
-      @pid = spawn(h, "#{daemon_path} #{Process.pid} #{path_so}")
-    end
+  def self.active?
+    OPTIONS[:sample] && MPITopo.local_master?
   end

-  def finalize
-    LOGGER.debug { 'Finalize SamplingDaemon' }
-    return unless sampling?
+  def initialize(h = {})
+    return unless active?
+    raise "No sampling_daemon binary found at #{SAMPLING_DAEMON_PATH}" unless File.exist?(SAMPLING_DAEMON_PATH)

-    Process.kill(RT_SIGNAL_FINISH, @pid)
-    Process.wait(@pid)
+    path_so = h.fetch('THAPI_SAMPLING_LIBRARIES', '').split(':')
+    spawn(h, SAMPLING_DAEMON_PATH, *path_so, log: 'Initialize SamplingDaemon')
   end
 end

@@ -656,18 +673,20 @@ module LocalMaster
     raise unless MPITopo.local_master?

     lttng_setup(backends)
-
-    sampling_daemon = SamplingDaemon.new(env) if sampling?
-    pids = bt_analysis(backends) if OPTIONS[:archive]
-
-    [sampling_daemon, pids]
+    sampling_daemon = SamplingDaemon.new(env)
+    begin
+      archive_pids = bt_analysis(backends) if OPTIONS[:archive]
+    rescue StandardError
+      sampling_daemon.finalize
+      raise
+    end
+    [sampling_daemon, archive_pids]
   end

-  def teardown_daemon(sampling_daemon, archive_pids)
+  def teardown(sampling_daemon, archive_pids)
     raise unless MPITopo.local_master?

-    sampling_daemon.finalize if sampling?
-
+    sampling_daemon.finalize
     lttng_teardown_session
     bt_wait_for_archive_processes(archive_pids) if OPTIONS[:archive]

@@ -738,7 +757,7 @@ module LocalMaster
                        profiling: OPTIONS[:profile])
     end

-    if sampling?
+    if SamplingDaemon.active?
       channel_name = 'non-blocking-channel'
       exec("lttng enable-channel --userspace --session=#{lttng_session_uuid} #{channel_name}")
       exec("lttng add-context --userspace --session=#{lttng_session_uuid} --channel=#{channel_name} -t vpid -t vtid")
@@ -912,7 +931,7 @@ def all_env_tracers(usr_binary)
   # Customization
   if OPTIONS[:'backend-names'].include?('ze')
     h['LTTNG_UST_ZE_PARANOID_DRIFT'] = 1 if OPTIONS[:profile]
-    if sampling?
+    if SamplingDaemon.active?
       # The current only reliable way to use zes api
       # is to call zesInit and set ZES_ENABLE_SYSMAN to 0
       h['ZES_ENABLE_SYSMAN'] = 0
@@ -928,7 +947,7 @@ def all_env_tracers(usr_binary)

   if OPTIONS[:'backend-names'].include?('cxi')
     backends << 'cxi'
-    if sampling?
+    if SamplingDaemon.active?
       h['LTTNG_UST_CXI_SAMPLING_CXI'] = 1
       h['THAPI_SAMPLING_LIBRARIES'] << File.join(PKGLIBDIR, 'cxi', 'libCXISampling.so')
     end
@@ -940,7 +959,7 @@ def all_env_tracers(usr_binary)
   end

   # Sample
-  if sampling?
+  if SamplingDaemon.active?
     LOGGER.debug('Sampling Enabled')
     h['LTTNG_UST_SAMPLING'] = 1
     h[%w[LD_LIBRARY_PATH prepend]] << File.join(PKGLIBDIR, 'sampling')
@@ -973,7 +992,7 @@ def all_trace_and_processing(usr_argv)
       launch_usr_bin(env, usr_argv)
     ensure
       syncd.local_barrier('waiting_for_application_ending')
-      LocalMaster.teardown_daemon(sampling_daemon, archive_pids) if MPITopo.local_master?
+      LocalMaster.teardown(sampling_daemon, archive_pids) if MPITopo.local_master?
     end

     LocalMaster.bt_analysis(backends) if MPITopo.local_master? && !OPTIONS[:archive]