From 3cf8fd10d688d7326b9768586f91707d9e7f9449 Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 10:38:04 +0200 Subject: [PATCH 01/15] Adding Running Stats Normalizer for states/observations fopr all algorithms. Signed-off-by: Thorsten Kurth --- CMakeLists.txt | 1 + .../include/internal/rl/off_policy/ddpg.h | 4 ++ src/csrc/include/internal/rl/off_policy/sac.h | 4 ++ src/csrc/include/internal/rl/off_policy/td3.h | 4 ++ src/csrc/include/internal/rl/on_policy/ppo.h | 4 ++ src/csrc/rl/off_policy/ddpg.cpp | 38 ++++++++++++++++++- src/csrc/rl/off_policy/sac.cpp | 36 +++++++++++++++++- src/csrc/rl/off_policy/td3.cpp | 35 ++++++++++++++++- src/csrc/rl/on_policy/ppo.cpp | 36 +++++++++++++++++- tests/rl/CMakeLists.txt | 7 ++++ 10 files changed, 164 insertions(+), 5 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index db2fb27..f6ed1ba 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -164,6 +164,7 @@ target_sources(${PROJECT_NAME} ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/models/rl/common_models.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/models/rl/sac_model.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/rl/policy.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/rl/running_normalizer.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/rl/utils.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/rl/off_policy/interface.cpp ${CMAKE_CURRENT_SOURCE_DIR}/src/csrc/rl/off_policy/ddpg.cpp diff --git a/src/csrc/include/internal/rl/off_policy/ddpg.h b/src/csrc/include/internal/rl/off_policy/ddpg.h index 2ac10b6..710cf67 100644 --- a/src/csrc/include/internal/rl/off_policy/ddpg.h +++ b/src/csrc/include/internal/rl/off_policy/ddpg.h @@ -32,6 +32,7 @@ #include "internal/rl/noise_actor.h" #include "internal/rl/off_policy.h" #include "internal/rl/replay_buffer.h" +#include "internal/rl/running_normalizer.h" #include "internal/rl/utils.h" namespace torchfort { @@ -305,6 +306,9 @@ class DDPGSystem : public RLOffPolicySystem, public std::enable_shared_from_this std::shared_ptr noise_actor_train_; std::shared_ptr noise_actor_exploration_; + // state normalizer (optional, null if disabled) + std::unique_ptr state_normalizer_; + // some parameters int batch_size_; int num_critics_; diff --git a/src/csrc/include/internal/rl/off_policy/sac.h b/src/csrc/include/internal/rl/off_policy/sac.h index 9f913c2..5215358 100644 --- a/src/csrc/include/internal/rl/off_policy/sac.h +++ b/src/csrc/include/internal/rl/off_policy/sac.h @@ -32,6 +32,7 @@ #include "internal/rl/off_policy.h" #include "internal/rl/policy.h" #include "internal/rl/replay_buffer.h" +#include "internal/rl/running_normalizer.h" #include "internal/rl/utils.h" namespace torchfort { @@ -428,6 +429,9 @@ class SACSystem : public RLOffPolicySystem, public std::enable_shared_from_this< // system comm std::shared_ptr system_comm_; + // state normalizer (optional, null if disabled) + std::unique_ptr state_normalizer_; + // some parameters int batch_size_; int num_critics_; diff --git a/src/csrc/include/internal/rl/off_policy/td3.h b/src/csrc/include/internal/rl/off_policy/td3.h index 4600105..49491df 100644 --- a/src/csrc/include/internal/rl/off_policy/td3.h +++ b/src/csrc/include/internal/rl/off_policy/td3.h @@ -32,6 +32,7 @@ #include "internal/rl/noise_actor.h" #include "internal/rl/off_policy.h" #include "internal/rl/replay_buffer.h" +#include "internal/rl/running_normalizer.h" #include "internal/rl/utils.h" namespace torchfort { @@ -338,6 +339,9 @@ class TD3System : public RLOffPolicySystem, public std::enable_shared_from_this< std::shared_ptr noise_actor_train_; std::shared_ptr noise_actor_exploration_; + // state normalizer (optional, null if disabled) + std::unique_ptr state_normalizer_; + // some parameters int batch_size_; int num_critics_; diff --git a/src/csrc/include/internal/rl/on_policy/ppo.h b/src/csrc/include/internal/rl/on_policy/ppo.h index 5a16d76..67f8a78 100644 --- a/src/csrc/include/internal/rl/on_policy/ppo.h +++ b/src/csrc/include/internal/rl/on_policy/ppo.h @@ -32,6 +32,7 @@ #include "internal/rl/on_policy.h" #include "internal/rl/policy.h" #include "internal/rl/rollout_buffer.h" +#include "internal/rl/running_normalizer.h" #include "internal/rl/utils.h" namespace torchfort { @@ -317,6 +318,9 @@ class PPOSystem : public RLOnPolicySystem, public std::enable_shared_from_this system_comm_; + // state normalizer (optional, null if disabled) + std::unique_ptr state_normalizer_; + // some parameters int batch_size_; float epsilon_, clip_q_; diff --git a/src/csrc/rl/off_policy/ddpg.cpp b/src/csrc/rl/off_policy/ddpg.cpp index 5a6fdd3..746ad7f 100644 --- a/src/csrc/rl/off_policy/ddpg.cpp +++ b/src/csrc/rl/off_policy/ddpg.cpp @@ -35,12 +35,16 @@ DDPGSystem::DDPGSystem(const char* name, const YAML::Node& system_node, int mode auto algo_node = system_node["algorithm"]; if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); - std::set supported_params{"batch_size", "nstep", "nstep_reward_reduction", "gamma", "rho"}; + std::set supported_params{"batch_size", "nstep", "nstep_reward_reduction", "gamma", "rho", + "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; gamma_ = params.get_param("gamma")[0]; rho_ = params.get_param("rho")[0]; nstep_ = params.get_param("nstep", 1)[0]; + if (params.get_param("normalize_state", false)[0]) { + state_normalizer_ = std::make_unique(); + } auto redmode = params.get_param("nstep_reward_reduction", "sum")[0]; if (redmode == "sum") { nstep_reward_reduction_ = RewardReductionMode::Sum; @@ -324,6 +328,12 @@ void DDPGSystem::saveCheckpoint(const std::string& checkpoint_dir) const { system_state_->save(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + state_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -356,6 +366,19 @@ void DDPGSystem::loadCheckpoint(const std::string& checkpoint_dir) { system_state_->load(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print( + "DDPG: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + state_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -369,6 +392,7 @@ void DDPGSystem::loadCheckpoint(const std::string& checkpoint_dir) { // we should pass a tuple (s, a, s', r, d) void DDPGSystem::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tensor sp, torch::Tensor r, torch::Tensor d) { + if (state_normalizer_) state_normalizer_->update(s); replay_buffer_->update(s, a, sp, r, d); } @@ -395,7 +419,7 @@ torch::Tensor DDPGSystem::predictWithNoiseTrain_(torch::Tensor state) { // no grad guard torch::NoGradGuard no_grad; - // prepare inputs + // prepare inputs (state is already on model_device_ and already normalized by trainStep) p_model_target_.model->to(model_device_); p_model_target_.model->eval(); state = state.to(model_device_); @@ -428,6 +452,7 @@ torch::Tensor DDPGSystem::predict(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass auto action = (p_model_.model)->forward(std::vector{state})[0]; @@ -446,6 +471,7 @@ torch::Tensor DDPGSystem::predictExplore(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass auto action = (*noise_actor_exploration_)(p_model_, state); @@ -465,6 +491,7 @@ torch::Tensor DDPGSystem::evaluate(torch::Tensor state, torch::Tensor action) { q_model_.model->eval(); state = state.to(model_device_); action = action.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor reward = (q_model_.model)->forward(std::vector{state, action})[0]; @@ -494,6 +521,13 @@ void DDPGSystem::trainStep(float& p_loss_val, float& q_loss_val) { r = r.to(model_device_); d = d.to(model_device_); + // sync and apply state normalization + if (state_normalizer_) { + state_normalizer_->sync(p_model_.comm); + s = state_normalizer_->normalize(s); + sp = state_normalizer_->normalize(sp); + } + // get a new action by predicting one with target network ap = predictWithNoiseTrain_(sp); } diff --git a/src/csrc/rl/off_policy/sac.cpp b/src/csrc/rl/off_policy/sac.cpp index 39ff249..0cd353d 100644 --- a/src/csrc/rl/off_policy/sac.cpp +++ b/src/csrc/rl/off_policy/sac.cpp @@ -63,12 +63,16 @@ SACSystem::SACSystem(const char* name, const YAML::Node& system_node, int model_ if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); std::set supported_params{"batch_size", "num_critics", "nstep", "nstep_reward_reduction", - "gamma", "rho", "alpha", "target_entropy"}; + "gamma", "rho", "alpha", "target_entropy", + "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; gamma_ = params.get_param("gamma")[0]; rho_ = params.get_param("rho")[0]; + if (params.get_param("normalize_state", false)[0]) { + state_normalizer_ = std::make_unique(); + } // alpha needs special care AlphaModel am; am.setup(params.get_param("alpha", 0.)[0]); @@ -433,6 +437,12 @@ void SACSystem::saveCheckpoint(const std::string& checkpoint_dir) const { system_state_->save(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + state_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -525,6 +535,19 @@ void SACSystem::loadCheckpoint(const std::string& checkpoint_dir) { system_state_->load(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print( + "SAC: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + state_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -538,6 +561,7 @@ void SACSystem::loadCheckpoint(const std::string& checkpoint_dir) { // we should pass a tuple (s, a, s', r, d) void SACSystem::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tensor sp, torch::Tensor r, torch::Tensor d) { + if (state_normalizer_) state_normalizer_->update(s); // note that we have to rescale the action: [a_low, a_high] -> [-1, 1], // but the replay buffer only stores scaled actions! replay_buffer_->update(s, a, sp, r, d); @@ -582,6 +606,7 @@ torch::Tensor SACSystem::predict(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass auto action = (p_model_.model)->forwardDeterministic(state); @@ -600,6 +625,7 @@ torch::Tensor SACSystem::predictExplore(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action, log_probs; @@ -620,6 +646,7 @@ torch::Tensor SACSystem::evaluate(torch::Tensor state, torch::Tensor action) { q_models_[0].model->eval(); state = state.to(model_device_); action = action.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor reward = (q_models_[0].model)->forward(std::vector{state, action})[0]; @@ -648,6 +675,13 @@ void SACSystem::trainStep(float& p_loss_val, float& q_loss_val) { sp = sp.to(model_device_); r = r.to(model_device_); d = d.to(model_device_); + + // sync and apply state normalization + if (state_normalizer_) { + state_normalizer_->sync(p_model_.comm); + s = state_normalizer_->normalize(s); + sp = state_normalizer_->normalize(sp); + } } // train step diff --git a/src/csrc/rl/off_policy/td3.cpp b/src/csrc/rl/off_policy/td3.cpp index cc3c5ec..f704d33 100644 --- a/src/csrc/rl/off_policy/td3.cpp +++ b/src/csrc/rl/off_policy/td3.cpp @@ -35,7 +35,7 @@ TD3System::TD3System(const char* name, const YAML::Node& system_node, int model_ if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); std::set supported_params{"batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", - "gamma", "rho"}; + "gamma", "rho", "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; @@ -43,6 +43,9 @@ TD3System::TD3System(const char* name, const YAML::Node& system_node, int model_ gamma_ = params.get_param("gamma")[0]; rho_ = params.get_param("rho")[0]; nstep_ = params.get_param("nstep", 1)[0]; + if (params.get_param("normalize_state", false)[0]) { + state_normalizer_ = std::make_unique(); + } auto redmode = params.get_param("nstep_reward_reduction", "sum")[0]; if (redmode == "sum") { nstep_reward_reduction_ = RewardReductionMode::Sum; @@ -387,6 +390,12 @@ void TD3System::saveCheckpoint(const std::string& checkpoint_dir) const { system_state_->save(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + state_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -428,6 +437,19 @@ void TD3System::loadCheckpoint(const std::string& checkpoint_dir) { system_state_->load(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print( + "TD3: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + state_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -441,6 +463,7 @@ void TD3System::loadCheckpoint(const std::string& checkpoint_dir) { // we should pass a tuple (s, a, s', r, d) void TD3System::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tensor sp, torch::Tensor r, torch::Tensor d) { + if (state_normalizer_) state_normalizer_->update(s); replay_buffer_->update(s, a, sp, r, d); } @@ -500,6 +523,7 @@ torch::Tensor TD3System::predict(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass auto action = (p_model_.model)->forward(std::vector{state})[0]; @@ -518,6 +542,7 @@ torch::Tensor TD3System::predictExplore(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass auto action = (*noise_actor_exploration_)(p_model_, state); @@ -537,6 +562,7 @@ torch::Tensor TD3System::evaluate(torch::Tensor state, torch::Tensor action) { q_models_[0].model->eval(); state = state.to(model_device_); action = action.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor reward = (q_models_[0].model)->forward(std::vector{state, action})[0]; @@ -568,6 +594,13 @@ void TD3System::trainStep(float& p_loss_val, float& q_loss_val) { r = r.to(model_device_); d = d.to(model_device_); + // sync and apply state normalization + if (state_normalizer_) { + state_normalizer_->sync(p_model_.comm); + s = state_normalizer_->normalize(s); + sp = state_normalizer_->normalize(sp); + } + // get a new action by predicting one with target network ap = predictWithNoiseTrain_(sp); } diff --git a/src/csrc/rl/on_policy/ppo.cpp b/src/csrc/rl/on_policy/ppo.cpp index d40f82b..36bb890 100644 --- a/src/csrc/rl/on_policy/ppo.cpp +++ b/src/csrc/rl/on_policy/ppo.cpp @@ -43,9 +43,13 @@ PPOSystem::PPOSystem(const char* name, const YAML::Node& system_node, int model_ "target_kl_divergence", "entropy_loss_coefficient", "value_loss_coefficient", - "normalize_advantage"}; + "normalize_advantage", + "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; + if (params.get_param("normalize_state", false)[0]) { + state_normalizer_ = std::make_unique(); + } gamma_ = params.get_param("gamma")[0]; gae_lambda_ = params.get_param("gae_lambda")[0]; target_kl_divergence_ = params.get_param("target_kl_divergence")[0]; @@ -250,6 +254,12 @@ void PPOSystem::saveCheckpoint(const std::string& checkpoint_dir) const { system_state_->save(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + state_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "rollout_buffer"; @@ -300,6 +310,19 @@ void PPOSystem::loadCheckpoint(const std::string& checkpoint_dir) { system_state_->load(state_path.native()); } + // state normalizer + if (state_normalizer_) { + auto normalizer_path = root_dir / "state_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print( + "PPO: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + state_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the rollout buffer: { auto buffer_path = root_dir / "rollout_buffer"; @@ -341,8 +364,10 @@ void PPOSystem::updateRolloutBuffer(torch::Tensor stens, torch::Tensor atens, to } // compute q: + if (state_normalizer_) state_normalizer_->update(stens); torch::Tensor ad = as.to(model_device_); torch::Tensor sd = stens.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) sd = state_normalizer_->normalize(sd); torch::Tensor log_p_tensor, entropy_tensor, value; std::tie(log_p_tensor, entropy_tensor, value) = (pq_model_.model)->evaluateAction(sd, ad); @@ -380,6 +405,7 @@ torch::Tensor PPOSystem::predict(torch::Tensor state) { pq_model_.model->to(model_device_); pq_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action, value; @@ -406,6 +432,7 @@ torch::Tensor PPOSystem::predictExplore(torch::Tensor state) { pq_model_.model->to(model_device_); pq_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action, log_probs, value; @@ -432,6 +459,7 @@ torch::Tensor PPOSystem::evaluate(torch::Tensor state, torch::Tensor action) { pq_model_.model->to(model_device_); pq_model_.model->eval(); state = state.to(model_device_); + if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action_tmp, value; @@ -459,6 +487,12 @@ void PPOSystem::trainStep(float& p_loss_val, float& q_loss_val) { logp = logp.to(model_device_); adv = adv.to(model_device_); ret = ret.to(model_device_); + + // sync and apply state normalization + if (state_normalizer_) { + state_normalizer_->sync(pq_model_.comm); + s = state_normalizer_->normalize(s); + } } // train step diff --git a/tests/rl/CMakeLists.txt b/tests/rl/CMakeLists.txt index 25df135..d9c51fb 100644 --- a/tests/rl/CMakeLists.txt +++ b/tests/rl/CMakeLists.txt @@ -4,6 +4,7 @@ set(test_targets test_replay_buffer test_rollout_buffer test_distributions + test_running_normalizer test_interface test_off_policy test_on_policy @@ -27,6 +28,12 @@ target_sources(test_distributions test_distributions.cpp ) +add_executable(test_running_normalizer) +target_sources(test_running_normalizer + PRIVATE + test_running_normalizer.cpp + ) + add_executable(test_interface) target_sources(test_interface PRIVATE From b043d87f4574a81ac9755d376b045ea9a006e47c Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 11:02:18 +0200 Subject: [PATCH 02/15] fixing advantage buffer normalization by calling once for the whole history instead of calling it per each batch (which was technically wrong) Signed-off-by: Thorsten Kurth --- src/csrc/include/internal/rl/on_policy/ppo.h | 37 +--------------- src/csrc/include/internal/rl/rollout_buffer.h | 42 +++++++++++++++++++ src/csrc/rl/on_policy/ppo.cpp | 15 ++++++- 3 files changed, 57 insertions(+), 37 deletions(-) diff --git a/src/csrc/include/internal/rl/on_policy/ppo.h b/src/csrc/include/internal/rl/on_policy/ppo.h index 67f8a78..de37918 100644 --- a/src/csrc/include/internal/rl/on_policy/ppo.h +++ b/src/csrc/include/internal/rl/on_policy/ppo.h @@ -46,7 +46,7 @@ template void train_ppo(const ACPolicyPack& pq_model, torch::Tensor state_tensor, torch::Tensor action_tensor, torch::Tensor q_tensor, torch::Tensor log_p_tensor, torch::Tensor adv_tensor, torch::Tensor ret_tensor, const T& epsilon, const T& clip_q, const T& entropy_loss_coeff, const T& q_loss_coeff, - const T& target_kl_divergence, bool normalize_advantage, T& p_loss_val, T& q_loss_val, T& kl_divergence, + const T& target_kl_divergence, T& p_loss_val, T& q_loss_val, T& kl_divergence, T& clip_fraction, T& explained_var) { // nvtx marker @@ -66,40 +66,6 @@ void train_ppo(const ACPolicyPack& pq_model, torch::Tensor state_tensor, torch:: assert(adv_tensor.dim() == 1); assert(ret_tensor.dim() == 1); - // normalize advantages if requested - if (normalize_advantage && (batch_size > 1)) { - // make sure we are not going to compute gradients - torch::NoGradGuard no_grad; - - // compute mean - torch::Tensor adv_mean = torch::sum(adv_tensor); - auto options = torch::TensorOptions().dtype(torch::kLong).device(adv_mean.device()); - torch::Tensor adv_count = torch::tensor({torch::numel(adv_tensor)}, options); - - // average mean across all nodes - if (pq_model.comm) { - std::vector means = {adv_mean, adv_count}; - pq_model.comm->allreduce(means, false); - adv_mean = means[0]; - adv_count = means[1]; - } - adv_mean = adv_mean / adv_count; - - // compute std - torch::Tensor adv_std = torch::sum(torch::square(adv_tensor - adv_mean)); - - // average std across all nodes - if (pq_model.comm) { - std::vector stds = {adv_std}; - pq_model.comm->allreduce(stds, false); - adv_std = stds[0]; - } - adv_std = torch::sqrt(adv_std / (adv_count - 1)); - - // update advantage tensor - adv_tensor = (adv_tensor - adv_mean) / (adv_std + 1.e-8); - } - // set models to train pq_model.model->train(); @@ -330,6 +296,7 @@ class PPOSystem : public RLOnPolicySystem, public std::enable_shared_from_this #include "internal/defines.h" +#include "internal/distributed.h" #include "internal/rl/rl.h" namespace torchfort { @@ -179,6 +180,47 @@ class GAELambdaRolloutBuffer : public RolloutBuffer, public std::enable_shared_f return; } + // Normalize all stored advantages to zero mean and unit variance over the full rollout. + // In distributed mode, statistics are combined across ranks via allreduce so that all + // ranks use the same normalization. Call this once after finalize() and before sampling. + void normalizeAdvantages(std::shared_ptr comm) { + if (!finalized_) { + throw std::runtime_error( + "GAELambdaRolloutBuffer::normalizeAdvantages: buffer must be finalized before normalizing advantages."); + } + + torch::NoGradGuard no_grad; + + // stack all per-step advantages into [size_, n_envs_] and flatten to 1D + auto all_adv = torch::stack(advantages_, 0).flatten().to(torch::kFloat32); + + // compute global sum and count for the mean + auto adv_sum = torch::sum(all_adv); + auto count_tensor = torch::tensor({static_cast(all_adv.numel())}).to(all_adv.device()); + + if (comm) { + std::vector stats = {adv_sum, count_tensor}; + comm->allreduce(stats, false); + adv_sum = stats[0]; + count_tensor = stats[1]; + } + auto adv_mean = adv_sum / count_tensor; + + // compute global sum of squared deviations for the std + auto adv_sq = torch::sum(torch::square(all_adv - adv_mean)); + if (comm) { + std::vector sq_stats = {adv_sq}; + comm->allreduce(sq_stats, false); + adv_sq = sq_stats[0]; + } + auto adv_std = torch::sqrt(adv_sq / (count_tensor - 1.) + 1e-8); + + // normalize all stored advantages in-place + for (auto& adv : advantages_) { + adv = (adv - adv_mean) / adv_std; + } + } + std::tuple sample(int batch_size) { diff --git a/src/csrc/rl/on_policy/ppo.cpp b/src/csrc/rl/on_policy/ppo.cpp index 36bb890..ee40b17 100644 --- a/src/csrc/rl/on_policy/ppo.cpp +++ b/src/csrc/rl/on_policy/ppo.cpp @@ -63,6 +63,7 @@ PPOSystem::PPOSystem(const char* name, const YAML::Node& system_node, int model_ entropy_loss_coeff_ = params.get_param("entropy_loss_coefficient", 0.0)[0]; value_loss_coeff_ = params.get_param("value_loss_coefficient", 0.5)[0]; normalize_advantage_ = params.get_param("normalize_advantage", true)[0]; + advantage_normalized_ = false; } else { THROW_INVALID_USAGE("Missing parameters section in algorithm section in configuration file."); } @@ -373,9 +374,19 @@ void PPOSystem::updateRolloutBuffer(torch::Tensor stens, torch::Tensor atens, to // the replay buffer only stores scaled actions! rollout_buffer_->update(stens, as, rtens, value, log_p_tensor, etens); + + // normalize advantages once over the full rollout as soon as it is finalized, + // before any mini-batch sampling starts + if (normalize_advantage_ && rollout_buffer_->isReady() && !advantage_normalized_) { + rollout_buffer_->normalizeAdvantages(pq_model_.comm); + advantage_normalized_ = true; + } } -void PPOSystem::resetRolloutBuffer() { rollout_buffer_->reset(); } +void PPOSystem::resetRolloutBuffer() { + rollout_buffer_->reset(); + advantage_normalized_ = false; +} void PPOSystem::setSeed(unsigned int seed) { rollout_buffer_->setSeed(seed); } @@ -497,7 +508,7 @@ void PPOSystem::trainStep(float& p_loss_val, float& q_loss_val) { // train step train_ppo(pq_model_, s, a, q, logp, adv, ret, epsilon_, clip_q_, entropy_loss_coeff_, value_loss_coeff_, - target_kl_divergence_, normalize_advantage_, p_loss_val, q_loss_val, current_kl_divergence_, clip_fraction_, + target_kl_divergence_, p_loss_val, q_loss_val, current_kl_divergence_, clip_fraction_, explained_variance_); // system logging From c054a6796e9adce2267668c8bc88efeaf103ab59 Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 11:07:26 +0200 Subject: [PATCH 03/15] fixing formatting Signed-off-by: Thorsten Kurth --- src/csrc/include/internal/rl/on_policy/ppo.h | 4 ++-- src/csrc/rl/off_policy/ddpg.cpp | 23 ++++++++++-------- src/csrc/rl/off_policy/sac.cpp | 23 ++++++++++-------- src/csrc/rl/off_policy/td3.cpp | 24 +++++++++++-------- src/csrc/rl/on_policy/ppo.cpp | 25 +++++++++++--------- 5 files changed, 56 insertions(+), 43 deletions(-) diff --git a/src/csrc/include/internal/rl/on_policy/ppo.h b/src/csrc/include/internal/rl/on_policy/ppo.h index de37918..434be49 100644 --- a/src/csrc/include/internal/rl/on_policy/ppo.h +++ b/src/csrc/include/internal/rl/on_policy/ppo.h @@ -46,8 +46,8 @@ template void train_ppo(const ACPolicyPack& pq_model, torch::Tensor state_tensor, torch::Tensor action_tensor, torch::Tensor q_tensor, torch::Tensor log_p_tensor, torch::Tensor adv_tensor, torch::Tensor ret_tensor, const T& epsilon, const T& clip_q, const T& entropy_loss_coeff, const T& q_loss_coeff, - const T& target_kl_divergence, T& p_loss_val, T& q_loss_val, T& kl_divergence, - T& clip_fraction, T& explained_var) { + const T& target_kl_divergence, T& p_loss_val, T& q_loss_val, T& kl_divergence, T& clip_fraction, + T& explained_var) { // nvtx marker torchfort::nvtx::rangePush("torchfort_train_ppo"); diff --git a/src/csrc/rl/off_policy/ddpg.cpp b/src/csrc/rl/off_policy/ddpg.cpp index 746ad7f..ed001c5 100644 --- a/src/csrc/rl/off_policy/ddpg.cpp +++ b/src/csrc/rl/off_policy/ddpg.cpp @@ -35,8 +35,8 @@ DDPGSystem::DDPGSystem(const char* name, const YAML::Node& system_node, int mode auto algo_node = system_node["algorithm"]; if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); - std::set supported_params{"batch_size", "nstep", "nstep_reward_reduction", "gamma", "rho", - "normalize_state"}; + std::set supported_params{"batch_size", "nstep", "nstep_reward_reduction", + "gamma", "rho", "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; gamma_ = params.get_param("gamma")[0]; @@ -370,10 +370,9 @@ void DDPGSystem::loadCheckpoint(const std::string& checkpoint_dir) { if (state_normalizer_) { auto normalizer_path = root_dir / "state_normalizer.pt"; if (!std::filesystem::exists(normalizer_path)) { - torchfort::logging::print( - "DDPG: state normalizer is enabled but no saved state was found in the checkpoint. " - "Starting with empty statistics.", - torchfort::logging::warn); + torchfort::logging::print("DDPG: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); } else { state_normalizer_->load(normalizer_path.native()); } @@ -392,7 +391,8 @@ void DDPGSystem::loadCheckpoint(const std::string& checkpoint_dir) { // we should pass a tuple (s, a, s', r, d) void DDPGSystem::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tensor sp, torch::Tensor r, torch::Tensor d) { - if (state_normalizer_) state_normalizer_->update(s); + if (state_normalizer_) + state_normalizer_->update(s); replay_buffer_->update(s, a, sp, r, d); } @@ -452,7 +452,8 @@ torch::Tensor DDPGSystem::predict(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass auto action = (p_model_.model)->forward(std::vector{state})[0]; @@ -471,7 +472,8 @@ torch::Tensor DDPGSystem::predictExplore(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass auto action = (*noise_actor_exploration_)(p_model_, state); @@ -491,7 +493,8 @@ torch::Tensor DDPGSystem::evaluate(torch::Tensor state, torch::Tensor action) { q_model_.model->eval(); state = state.to(model_device_); action = action.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor reward = (q_model_.model)->forward(std::vector{state, action})[0]; diff --git a/src/csrc/rl/off_policy/sac.cpp b/src/csrc/rl/off_policy/sac.cpp index 0cd353d..76f342c 100644 --- a/src/csrc/rl/off_policy/sac.cpp +++ b/src/csrc/rl/off_policy/sac.cpp @@ -62,8 +62,8 @@ SACSystem::SACSystem(const char* name, const YAML::Node& system_node, int model_ auto algo_node = system_node["algorithm"]; if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); - std::set supported_params{"batch_size", "num_critics", "nstep", "nstep_reward_reduction", - "gamma", "rho", "alpha", "target_entropy", + std::set supported_params{"batch_size", "num_critics", "nstep", "nstep_reward_reduction", + "gamma", "rho", "alpha", "target_entropy", "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; @@ -539,10 +539,9 @@ void SACSystem::loadCheckpoint(const std::string& checkpoint_dir) { if (state_normalizer_) { auto normalizer_path = root_dir / "state_normalizer.pt"; if (!std::filesystem::exists(normalizer_path)) { - torchfort::logging::print( - "SAC: state normalizer is enabled but no saved state was found in the checkpoint. " - "Starting with empty statistics.", - torchfort::logging::warn); + torchfort::logging::print("SAC: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); } else { state_normalizer_->load(normalizer_path.native()); } @@ -561,7 +560,8 @@ void SACSystem::loadCheckpoint(const std::string& checkpoint_dir) { // we should pass a tuple (s, a, s', r, d) void SACSystem::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tensor sp, torch::Tensor r, torch::Tensor d) { - if (state_normalizer_) state_normalizer_->update(s); + if (state_normalizer_) + state_normalizer_->update(s); // note that we have to rescale the action: [a_low, a_high] -> [-1, 1], // but the replay buffer only stores scaled actions! replay_buffer_->update(s, a, sp, r, d); @@ -606,7 +606,8 @@ torch::Tensor SACSystem::predict(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass auto action = (p_model_.model)->forwardDeterministic(state); @@ -625,7 +626,8 @@ torch::Tensor SACSystem::predictExplore(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action, log_probs; @@ -646,7 +648,8 @@ torch::Tensor SACSystem::evaluate(torch::Tensor state, torch::Tensor action) { q_models_[0].model->eval(); state = state.to(model_device_); action = action.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor reward = (q_models_[0].model)->forward(std::vector{state, action})[0]; diff --git a/src/csrc/rl/off_policy/td3.cpp b/src/csrc/rl/off_policy/td3.cpp index f704d33..73af389 100644 --- a/src/csrc/rl/off_policy/td3.cpp +++ b/src/csrc/rl/off_policy/td3.cpp @@ -34,8 +34,9 @@ TD3System::TD3System(const char* name, const YAML::Node& system_node, int model_ auto algo_node = system_node["algorithm"]; if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); - std::set supported_params{"batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", - "gamma", "rho", "normalize_state"}; + std::set supported_params{ + "batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", + "gamma", "rho", "normalize_state"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; @@ -441,10 +442,9 @@ void TD3System::loadCheckpoint(const std::string& checkpoint_dir) { if (state_normalizer_) { auto normalizer_path = root_dir / "state_normalizer.pt"; if (!std::filesystem::exists(normalizer_path)) { - torchfort::logging::print( - "TD3: state normalizer is enabled but no saved state was found in the checkpoint. " - "Starting with empty statistics.", - torchfort::logging::warn); + torchfort::logging::print("TD3: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); } else { state_normalizer_->load(normalizer_path.native()); } @@ -463,7 +463,8 @@ void TD3System::loadCheckpoint(const std::string& checkpoint_dir) { // we should pass a tuple (s, a, s', r, d) void TD3System::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tensor sp, torch::Tensor r, torch::Tensor d) { - if (state_normalizer_) state_normalizer_->update(s); + if (state_normalizer_) + state_normalizer_->update(s); replay_buffer_->update(s, a, sp, r, d); } @@ -523,7 +524,8 @@ torch::Tensor TD3System::predict(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass auto action = (p_model_.model)->forward(std::vector{state})[0]; @@ -542,7 +544,8 @@ torch::Tensor TD3System::predictExplore(torch::Tensor state) { p_model_.model->to(model_device_); p_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass auto action = (*noise_actor_exploration_)(p_model_, state); @@ -562,7 +565,8 @@ torch::Tensor TD3System::evaluate(torch::Tensor state, torch::Tensor action) { q_models_[0].model->eval(); state = state.to(model_device_); action = action.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor reward = (q_models_[0].model)->forward(std::vector{state, action})[0]; diff --git a/src/csrc/rl/on_policy/ppo.cpp b/src/csrc/rl/on_policy/ppo.cpp index ee40b17..8ba5bac 100644 --- a/src/csrc/rl/on_policy/ppo.cpp +++ b/src/csrc/rl/on_policy/ppo.cpp @@ -315,10 +315,9 @@ void PPOSystem::loadCheckpoint(const std::string& checkpoint_dir) { if (state_normalizer_) { auto normalizer_path = root_dir / "state_normalizer.pt"; if (!std::filesystem::exists(normalizer_path)) { - torchfort::logging::print( - "PPO: state normalizer is enabled but no saved state was found in the checkpoint. " - "Starting with empty statistics.", - torchfort::logging::warn); + torchfort::logging::print("PPO: state normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); } else { state_normalizer_->load(normalizer_path.native()); } @@ -365,10 +364,12 @@ void PPOSystem::updateRolloutBuffer(torch::Tensor stens, torch::Tensor atens, to } // compute q: - if (state_normalizer_) state_normalizer_->update(stens); + if (state_normalizer_) + state_normalizer_->update(stens); torch::Tensor ad = as.to(model_device_); torch::Tensor sd = stens.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) sd = state_normalizer_->normalize(sd); + if (state_normalizer_ && state_normalizer_->isInitialized()) + sd = state_normalizer_->normalize(sd); torch::Tensor log_p_tensor, entropy_tensor, value; std::tie(log_p_tensor, entropy_tensor, value) = (pq_model_.model)->evaluateAction(sd, ad); @@ -416,7 +417,8 @@ torch::Tensor PPOSystem::predict(torch::Tensor state) { pq_model_.model->to(model_device_); pq_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action, value; @@ -443,7 +445,8 @@ torch::Tensor PPOSystem::predictExplore(torch::Tensor state) { pq_model_.model->to(model_device_); pq_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action, log_probs, value; @@ -470,7 +473,8 @@ torch::Tensor PPOSystem::evaluate(torch::Tensor state, torch::Tensor action) { pq_model_.model->to(model_device_); pq_model_.model->eval(); state = state.to(model_device_); - if (state_normalizer_ && state_normalizer_->isInitialized()) state = state_normalizer_->normalize(state); + if (state_normalizer_ && state_normalizer_->isInitialized()) + state = state_normalizer_->normalize(state); // do fwd pass torch::Tensor action_tmp, value; @@ -508,8 +512,7 @@ void PPOSystem::trainStep(float& p_loss_val, float& q_loss_val) { // train step train_ppo(pq_model_, s, a, q, logp, adv, ret, epsilon_, clip_q_, entropy_loss_coeff_, value_loss_coeff_, - target_kl_divergence_, p_loss_val, q_loss_val, current_kl_divergence_, clip_fraction_, - explained_variance_); + target_kl_divergence_, p_loss_val, q_loss_val, current_kl_divergence_, clip_fraction_, explained_variance_); // system logging if ((system_state_->report_frequency > 0) && (train_step_count_ % system_state_->report_frequency == 0)) { From 68f7aabf08faf19e61e4d22ae39f5ae8605080bc Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 11:27:17 +0200 Subject: [PATCH 04/15] Adding missing files for normalizer Signed-off-by: Thorsten Kurth --- .../include/internal/rl/running_normalizer.h | 87 +++++ src/csrc/rl/running_normalizer.cpp | 127 +++++++ tests/rl/test_running_normalizer.cpp | 328 ++++++++++++++++++ 3 files changed, 542 insertions(+) create mode 100644 src/csrc/include/internal/rl/running_normalizer.h create mode 100644 src/csrc/rl/running_normalizer.cpp create mode 100644 tests/rl/test_running_normalizer.cpp diff --git a/src/csrc/include/internal/rl/running_normalizer.h b/src/csrc/include/internal/rl/running_normalizer.h new file mode 100644 index 0000000..d9dcee8 --- /dev/null +++ b/src/csrc/include/internal/rl/running_normalizer.h @@ -0,0 +1,87 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2023-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +#include "internal/distributed.h" + +namespace torchfort { + +namespace rl { + +// Online per-feature normalizer using Welford's parallel algorithm. +// +// Running statistics (mean, M2, count) are stored on CPU. normalize() moves them +// to the input tensor's device on-the-fly so the normalization arithmetic runs on +// GPU when called with device tensors. +// +// Two normalization modes are supported via the scale_only constructor flag: +// +// scale_only = false (default): x_norm = (x - mean) / sqrt(var + eps) +// Use for observations/states where zero-centering is desirable. +// +// scale_only = true: x_norm = x / sqrt(var + eps) +// Use for returns, where the mean must be preserved so the value function +// can learn the correct absolute level. The mean is still tracked internally +// (for distributed sync via Chan's algorithm) but not subtracted during normalization. +// +// Distributed sync: call sync() once per training step to combine per-rank running +// statistics across MPI ranks using Chan's parallel algorithm via two allreduce calls: +// 1. allreduce(count, weighted_mean) -> global count and mean +// 2. allreduce(local M2 contribution) -> global M2 +class RunningNormalizer { +public: + explicit RunningNormalizer(float eps = 1e-8f, bool scale_only = false) + : count_(0), eps_(eps), scale_only_(scale_only) {} + + // Update running statistics with a batch of samples. + // x shape: [batch, feature...]. Statistics are tracked per feature element. + // x may be on any device; statistics are always kept on CPU. + void update(torch::Tensor x); + + // Normalize x using current running statistics. + // Returns x unchanged if fewer than 2 samples have been seen. + // Statistics are moved to x.device() for the computation. + // In scale_only mode, only divides by std without subtracting the mean. + torch::Tensor normalize(torch::Tensor x) const; + + // Combine running statistics across MPI ranks using Chan's parallel algorithm. + // No-op if comm is null or count_ == 0. + void sync(std::shared_ptr comm); + + // Checkpoint support. + void save(const std::string& path) const; + void load(const std::string& path); + + bool isInitialized() const { return count_ > 0; } + +private: + torch::Tensor mean_; // per-feature mean, CPU float32 + torch::Tensor M2_; // per-feature sum of squared deviations, CPU float32 + int64_t count_; + float eps_; + bool scale_only_; +}; + +} // namespace rl + +} // namespace torchfort diff --git a/src/csrc/rl/running_normalizer.cpp b/src/csrc/rl/running_normalizer.cpp new file mode 100644 index 0000000..baa532c --- /dev/null +++ b/src/csrc/rl/running_normalizer.cpp @@ -0,0 +1,127 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2023-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "internal/rl/running_normalizer.h" + +namespace torchfort { + +namespace rl { + +void RunningNormalizer::update(torch::Tensor x) { + torch::NoGradGuard no_grad; + + // move to CPU float32, flatten to [batch, features] + int64_t batch_size = x.size(0); + auto x_flat = x.reshape({batch_size, -1}).to(torch::kFloat32).cpu(); + + // batch statistics + auto batch_mean = x_flat.mean(0); + auto batch_M2 = torch::sum(torch::square(x_flat - batch_mean.unsqueeze(0)), 0); + + if (count_ == 0) { + mean_ = batch_mean; + M2_ = batch_M2; + count_ = batch_size; + } else { + // Chan's parallel algorithm: combine (count_, mean_, M2_) with (batch_size, batch_mean, batch_M2) + int64_t new_count = count_ + batch_size; + auto delta = batch_mean - mean_; + auto new_mean = mean_ + delta * (static_cast(batch_size) / static_cast(new_count)); + auto new_M2 = M2_ + batch_M2 + + torch::square(delta) * + (static_cast(count_) * static_cast(batch_size) / static_cast(new_count)); + count_ = new_count; + mean_ = new_mean; + M2_ = new_M2; + } +} + +torch::Tensor RunningNormalizer::normalize(torch::Tensor x) const { + if (count_ < 2) return x; + + torch::NoGradGuard no_grad; + + auto orig_shape = x.sizes().vec(); + int64_t batch_size = x.size(0); + + // flatten to [batch, features], normalize, restore shape + auto x_flat = x.reshape({batch_size, -1}).to(torch::kFloat32); + + auto var = M2_ / static_cast(count_ - 1); + auto std = torch::sqrt(var + eps_).to(x.device()); + + if (scale_only_) { + // preserve the mean: divide by std only (used for return normalization) + return (x_flat / std).reshape(orig_shape); + } else { + auto mean = mean_.to(x.device()); + return ((x_flat - mean) / std).reshape(orig_shape); + } +} + +void RunningNormalizer::sync(std::shared_ptr comm) { + if (!comm || count_ == 0) return; + + torch::NoGradGuard no_grad; + + // Step 1: compute global count and global mean via allreduce of (count, count*mean). + // Using false (sum, not average) so we get the global sums directly. + auto count_tensor = torch::tensor({static_cast(count_)}); + auto weighted_mean = mean_ * static_cast(count_); + + std::vector step1 = {count_tensor, weighted_mean}; + comm->allreduce(step1, false); + + float global_count = step1[0].item(); + auto global_mean = step1[1] / global_count; + + // Step 2: combine M2 across ranks using Chan's formula. + // Each rank contributes: M2_i + n_i * (mean_i - global_mean)^2 + auto local_contribution = M2_ + static_cast(count_) * torch::square(mean_ - global_mean); + std::vector step2 = {local_contribution}; + comm->allreduce(step2, false); + + count_ = static_cast(global_count); + mean_ = global_mean; + M2_ = step2[0]; +} + +void RunningNormalizer::save(const std::string& path) const { + torch::serialize::OutputArchive archive; + archive.write("mean", mean_.defined() ? mean_ : torch::zeros({1})); + archive.write("M2", M2_.defined() ? M2_ : torch::zeros({1})); + archive.write("count", torch::tensor({count_})); + archive.write("scale_only", torch::tensor({static_cast(scale_only_)})); + archive.save_to(path); +} + +void RunningNormalizer::load(const std::string& path) { + torch::serialize::InputArchive archive; + archive.load_from(path); + archive.read("mean", mean_); + archive.read("M2", M2_); + torch::Tensor count_tensor; + archive.read("count", count_tensor); + count_ = count_tensor.item(); + torch::Tensor scale_only_tensor; + archive.read("scale_only", scale_only_tensor); + scale_only_ = static_cast(scale_only_tensor.item()); +} + +} // namespace rl + +} // namespace torchfort diff --git a/tests/rl/test_running_normalizer.cpp b/tests/rl/test_running_normalizer.cpp new file mode 100644 index 0000000..c877984 --- /dev/null +++ b/tests/rl/test_running_normalizer.cpp @@ -0,0 +1,328 @@ +/* + * SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-License-Identifier: Apache-2.0 + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "internal/rl/running_normalizer.h" +#include +#include + +using namespace torchfort; + +// Ground truth distribution parameters for 4 independent features. +// Each feature has a distinct nonzero mean and non-unit std so that a buggy +// normalizer (e.g. one that ignores the mean or gets variance wrong) is +// reliably caught. +static const int N_FEATURES = 4; +static const std::vector TRUE_MEAN = {3.0f, -2.0f, 0.5f, 10.0f}; +static const std::vector TRUE_STD = {1.5f, 0.5f, 3.0f, 0.2f}; + +// Helper: build a [batch_size, N_FEATURES] tensor sampled from the ground-truth distribution. +static torch::Tensor make_batch(int batch_size) { + auto mean = torch::tensor(TRUE_MEAN); + auto std = torch::tensor(TRUE_STD); + return torch::randn({batch_size, N_FEATURES}) * std.unsqueeze(0) + mean.unsqueeze(0); +} + +// ---- Test 1: statistics accuracy ----------------------------------------- +// Feed N batches from a known distribution, then verify that the normalizer's +// running mean and std converge to the true values within a tight tolerance. +// With 50000 total samples the estimation error should be well below 1%. +TEST(RunningNormalizer, StatsAccuracy) { + torch::manual_seed(42); + torch::NoGradGuard no_grad; + + rl::RunningNormalizer normalizer; + + const int batch_size = 100; + const int n_batches = 500; // 50000 samples total + + for (int i = 0; i < n_batches; ++i) { + normalizer.update(make_batch(batch_size)); + } + + ASSERT_TRUE(normalizer.isInitialized()); + + // Access internal state via a fresh normalize pass on a zero tensor to + // extract mean and std indirectly, OR test via normalized output. + // We instead check the running statistics directly by normalizing a tensor + // whose value we control and inspecting the result. + // + // Strategy: normalize(true_mean_tensor) should yield ~0, and + // normalize(true_mean_tensor + true_std_tensor) should yield ~1. + auto mean_tensor = torch::tensor(TRUE_MEAN).unsqueeze(0); // [1, 4] + auto std_tensor = torch::tensor(TRUE_STD).unsqueeze(0); + + auto normalized_mean = normalizer.normalize(mean_tensor); + auto normalized_mean_plus_std = normalizer.normalize(mean_tensor + std_tensor); + + // normalized(true_mean) should be ~0 for each feature + for (int f = 0; f < N_FEATURES; ++f) { + EXPECT_NEAR(normalized_mean[0][f].item(), 0.0f, 0.05f) + << "Feature " << f << ": normalized mean should be ~0"; + } + + // normalized(true_mean + true_std) should be ~1 for each feature + for (int f = 0; f < N_FEATURES; ++f) { + EXPECT_NEAR(normalized_mean_plus_std[0][f].item(), 1.0f, 0.05f) + << "Feature " << f << ": normalized(mean + std) should be ~1"; + } +} + +// ---- Test 2: normalized output has zero mean and unit variance ----------- +// After training the normalizer, normalize a large fresh batch drawn from +// the same distribution and verify the output is approximately N(0,1). +TEST(RunningNormalizer, NormalizedOutputDistribution) { + torch::manual_seed(123); + torch::NoGradGuard no_grad; + + rl::RunningNormalizer normalizer; + + // Warm up the normalizer with 50000 samples + const int warmup_batches = 500; + const int batch_size = 100; + for (int i = 0; i < warmup_batches; ++i) { + normalizer.update(make_batch(batch_size)); + } + + // Normalize a fresh large batch (10000 samples) and measure output stats + const int test_size = 10000; + auto test_batch = make_batch(test_size); + auto normalized = normalizer.normalize(test_batch); + + // Per-feature mean should be ~0 + auto out_mean = normalized.mean(0); // [N_FEATURES] + for (int f = 0; f < N_FEATURES; ++f) { + EXPECT_NEAR(out_mean[f].item(), 0.0f, 0.05f) + << "Feature " << f << ": normalized output mean should be ~0"; + } + + // Per-feature std should be ~1 + auto out_std = normalized.std(0); // [N_FEATURES], unbiased + for (int f = 0; f < N_FEATURES; ++f) { + EXPECT_NEAR(out_std[f].item(), 1.0f, 0.05f) + << "Feature " << f << ": normalized output std should be ~1"; + } +} + +// ---- Test 3: incremental vs. single-batch equivalence -------------------- +// Verify that many small batch updates give the same running statistics as +// one large batch update. This validates the Chan parallel algorithm. +TEST(RunningNormalizer, IncrementalVsBatch) { + torch::manual_seed(7); + torch::NoGradGuard no_grad; + + // Build a fixed dataset once + const int total_samples = 10000; + const int small_batch = 10; + auto full_data = make_batch(total_samples); // [10000, 4] + + // Normalizer A: one large update + rl::RunningNormalizer norm_batch; + norm_batch.update(full_data); + + // Normalizer B: many small updates + rl::RunningNormalizer norm_incremental; + for (int i = 0; i < total_samples / small_batch; ++i) { + norm_incremental.update(full_data.slice(0, i * small_batch, (i + 1) * small_batch)); + } + + // Both should produce identical normalized output for the same input + auto probe = make_batch(32); + auto out_batch = norm_batch.normalize(probe); + auto out_incremental = norm_incremental.normalize(probe); + + // Element-wise match to float32 precision + EXPECT_TRUE(torch::allclose(out_batch, out_incremental, /*rtol=*/1e-4, /*atol=*/1e-5)) + << "Batch and incremental normalizers should produce identical output"; +} + +// ---- Test 4: early return when not enough data --------------------------- +// normalize() should return the input unchanged until at least 2 samples +// have been seen (no valid variance estimate before that). +TEST(RunningNormalizer, EarlyReturnBeforeInitialized) { + torch::NoGradGuard no_grad; + + rl::RunningNormalizer normalizer; + EXPECT_FALSE(normalizer.isInitialized()); + + auto input = make_batch(4); + auto output = normalizer.normalize(input); + + // Should be the exact same tensor (no-op) + EXPECT_TRUE(torch::equal(input, output)) + << "normalize() should return input unchanged before stats are initialized"; +} + +// ---- Test 5: checkpoint round-trip --------------------------------------- +// Save and load the normalizer state, then verify the loaded normalizer +// produces the same normalized output as the original. +TEST(RunningNormalizer, CheckpointRoundTrip) { + torch::manual_seed(99); + torch::NoGradGuard no_grad; + + rl::RunningNormalizer normalizer; + for (int i = 0; i < 200; ++i) { + normalizer.update(make_batch(50)); + } + + const std::string path = "/tmp/test_running_normalizer.pt"; + normalizer.save(path); + + rl::RunningNormalizer loaded; + loaded.load(path); + + ASSERT_TRUE(loaded.isInitialized()); + + auto probe = make_batch(16); + auto out_original = normalizer.normalize(probe); + auto out_loaded = loaded.normalize(probe); + + EXPECT_TRUE(torch::allclose(out_original, out_loaded, /*rtol=*/1e-5, /*atol=*/1e-6)) + << "Loaded normalizer should produce identical output to original"; +} + +// ========================================================================= +// scale_only mode tests (return normalization) +// ========================================================================= + +// ---- Test 6: scale_only preserves the mean -------------------------------- +// The defining property of scale_only mode: the mean of the input distribution +// is NOT removed. After normalization the output mean should be ~(true_mean / true_std), +// not ~0. +TEST(RunningNormalizerScaleOnly, MeanPreserved) { + torch::manual_seed(200); + torch::NoGradGuard no_grad; + + rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); + + const int batch_size = 100; + const int n_batches = 500; // 50000 samples + + for (int i = 0; i < n_batches; ++i) { + normalizer.update(make_batch(batch_size)); + } + + // Normalize a fresh large batch and check output statistics + const int test_size = 10000; + auto test_batch = make_batch(test_size); + auto normalized = normalizer.normalize(test_batch); + + auto out_mean = normalized.mean(0); // [N_FEATURES] + auto out_std = normalized.std(0); + + for (int f = 0; f < N_FEATURES; ++f) { + float expected_mean = TRUE_MEAN[f] / TRUE_STD[f]; + // output mean should be ~true_mean / true_std (NOT ~0) + EXPECT_NEAR(out_mean[f].item(), expected_mean, 0.05f) + << "Feature " << f << ": scale_only output mean should be ~true_mean/true_std, not 0"; + + // output std should still be ~1 (variance is still normalized) + EXPECT_NEAR(out_std[f].item(), 1.0f, 0.05f) + << "Feature " << f << ": scale_only output std should be ~1"; + } +} + +// ---- Test 7: scale_only vs full — same std, different mean --------------- +// Both modes should produce unit output std. Only the mean differs. +// This test makes the contrast explicit with the same data and seed. +TEST(RunningNormalizerScaleOnly, SameStdDifferentMean) { + torch::manual_seed(201); + torch::NoGradGuard no_grad; + + rl::RunningNormalizer full_norm(1e-8f, /* scale_only = */ false); + rl::RunningNormalizer scale_norm(1e-8f, /* scale_only = */ true); + + const int batch_size = 100; + const int n_batches = 500; + + for (int i = 0; i < n_batches; ++i) { + auto batch = make_batch(batch_size); + full_norm.update(batch); + scale_norm.update(batch); + } + + const int test_size = 10000; + // use the same seed so both see identical test data + torch::manual_seed(9999); + auto test_batch = make_batch(test_size); + + auto out_full = full_norm.normalize(test_batch); + auto out_scale = scale_norm.normalize(test_batch); + + auto full_mean = out_full.mean(0); + auto scale_mean = out_scale.mean(0); + auto full_std = out_full.std(0); + auto scale_std = out_scale.std(0); + + for (int f = 0; f < N_FEATURES; ++f) { + // full mode: mean ~0 + EXPECT_NEAR(full_mean[f].item(), 0.0f, 0.05f) + << "Feature " << f << ": full mode mean should be ~0"; + + // scale_only mode: mean nonzero (only zero if true mean happens to be 0) + // Here all TRUE_MEAN values are nonzero, so the output mean must differ from 0 + EXPECT_GT(std::abs(scale_mean[f].item()), 0.1f) + << "Feature " << f << ": scale_only mode mean should be nonzero"; + + // both modes: std ~1 + EXPECT_NEAR(full_std[f].item(), 1.0f, 0.05f) + << "Feature " << f << ": full mode std should be ~1"; + EXPECT_NEAR(scale_std[f].item(), 1.0f, 0.05f) + << "Feature " << f << ": scale_only mode std should be ~1"; + } +} + +// ---- Test 8: scale_only checkpoint round-trip preserves mode ------------- +// Saving and loading a scale_only normalizer should restore the flag so that +// the loaded normalizer still does not subtract the mean. +TEST(RunningNormalizerScaleOnly, CheckpointPreservesMode) { + torch::manual_seed(202); + torch::NoGradGuard no_grad; + + rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); + for (int i = 0; i < 200; ++i) { + normalizer.update(make_batch(50)); + } + + const std::string path = "/tmp/test_running_normalizer_scale_only.pt"; + normalizer.save(path); + + // Load into a default (scale_only=false) instance — the saved flag should override + rl::RunningNormalizer loaded; + loaded.load(path); + + auto probe = make_batch(16); + auto out_original = normalizer.normalize(probe); + auto out_loaded = loaded.normalize(probe); + + // Outputs must match exactly (scale_only mode was restored from checkpoint) + EXPECT_TRUE(torch::allclose(out_original, out_loaded, /*rtol=*/1e-5, /*atol=*/1e-6)) + << "Loaded scale_only normalizer should produce identical output to original"; + + // Verify the loaded normalizer does NOT zero-center: its output mean should be nonzero + auto large_probe = make_batch(5000); + auto out_large = loaded.normalize(large_probe); + auto out_mean = out_large.mean(0); + for (int f = 0; f < N_FEATURES; ++f) { + EXPECT_GT(std::abs(out_mean[f].item()), 0.1f) + << "Feature " << f << ": loaded scale_only normalizer must not zero-center output"; + } +} + +int main(int argc, char* argv[]) { + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} From cd6a45854e3325d516b84cf2eabf49930d01f93f Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 11:30:51 +0200 Subject: [PATCH 05/15] updating license header in test Signed-off-by: Thorsten Kurth --- tests/rl/test_running_normalizer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/rl/test_running_normalizer.cpp b/tests/rl/test_running_normalizer.cpp index c877984..ef5a4ad 100644 --- a/tests/rl/test_running_normalizer.cpp +++ b/tests/rl/test_running_normalizer.cpp @@ -1,5 +1,5 @@ /* - * SPDX-FileCopyrightText: Copyright (c) 2023-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved. + * SPDX-FileCopyrightText: Copyright (c) 2023-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. * SPDX-License-Identifier: Apache-2.0 * * Licensed under the Apache License, Version 2.0 (the "License"); From 0e3684df22e0f14b5d0fe431bff5fd7b232ad4ff Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 11:31:35 +0200 Subject: [PATCH 06/15] fixing code formatting Signed-off-by: Thorsten Kurth --- src/csrc/rl/running_normalizer.cpp | 6 ++- tests/rl/test_running_normalizer.cpp | 57 ++++++++++++---------------- 2 files changed, 29 insertions(+), 34 deletions(-) diff --git a/src/csrc/rl/running_normalizer.cpp b/src/csrc/rl/running_normalizer.cpp index baa532c..444d07a 100644 --- a/src/csrc/rl/running_normalizer.cpp +++ b/src/csrc/rl/running_normalizer.cpp @@ -51,7 +51,8 @@ void RunningNormalizer::update(torch::Tensor x) { } torch::Tensor RunningNormalizer::normalize(torch::Tensor x) const { - if (count_ < 2) return x; + if (count_ < 2) + return x; torch::NoGradGuard no_grad; @@ -74,7 +75,8 @@ torch::Tensor RunningNormalizer::normalize(torch::Tensor x) const { } void RunningNormalizer::sync(std::shared_ptr comm) { - if (!comm || count_ == 0) return; + if (!comm || count_ == 0) + return; torch::NoGradGuard no_grad; diff --git a/tests/rl/test_running_normalizer.cpp b/tests/rl/test_running_normalizer.cpp index ef5a4ad..8a67070 100644 --- a/tests/rl/test_running_normalizer.cpp +++ b/tests/rl/test_running_normalizer.cpp @@ -27,12 +27,12 @@ using namespace torchfort; // reliably caught. static const int N_FEATURES = 4; static const std::vector TRUE_MEAN = {3.0f, -2.0f, 0.5f, 10.0f}; -static const std::vector TRUE_STD = {1.5f, 0.5f, 3.0f, 0.2f}; +static const std::vector TRUE_STD = {1.5f, 0.5f, 3.0f, 0.2f}; // Helper: build a [batch_size, N_FEATURES] tensor sampled from the ground-truth distribution. static torch::Tensor make_batch(int batch_size) { auto mean = torch::tensor(TRUE_MEAN); - auto std = torch::tensor(TRUE_STD); + auto std = torch::tensor(TRUE_STD); return torch::randn({batch_size, N_FEATURES}) * std.unsqueeze(0) + mean.unsqueeze(0); } @@ -47,7 +47,7 @@ TEST(RunningNormalizer, StatsAccuracy) { rl::RunningNormalizer normalizer; const int batch_size = 100; - const int n_batches = 500; // 50000 samples total + const int n_batches = 500; // 50000 samples total for (int i = 0; i < n_batches; ++i) { normalizer.update(make_batch(batch_size)); @@ -63,7 +63,7 @@ TEST(RunningNormalizer, StatsAccuracy) { // Strategy: normalize(true_mean_tensor) should yield ~0, and // normalize(true_mean_tensor + true_std_tensor) should yield ~1. auto mean_tensor = torch::tensor(TRUE_MEAN).unsqueeze(0); // [1, 4] - auto std_tensor = torch::tensor(TRUE_STD).unsqueeze(0); + auto std_tensor = torch::tensor(TRUE_STD).unsqueeze(0); auto normalized_mean = normalizer.normalize(mean_tensor); auto normalized_mean_plus_std = normalizer.normalize(mean_tensor + std_tensor); @@ -92,7 +92,7 @@ TEST(RunningNormalizer, NormalizedOutputDistribution) { // Warm up the normalizer with 50000 samples const int warmup_batches = 500; - const int batch_size = 100; + const int batch_size = 100; for (int i = 0; i < warmup_batches; ++i) { normalizer.update(make_batch(batch_size)); } @@ -100,20 +100,18 @@ TEST(RunningNormalizer, NormalizedOutputDistribution) { // Normalize a fresh large batch (10000 samples) and measure output stats const int test_size = 10000; auto test_batch = make_batch(test_size); - auto normalized = normalizer.normalize(test_batch); + auto normalized = normalizer.normalize(test_batch); // Per-feature mean should be ~0 auto out_mean = normalized.mean(0); // [N_FEATURES] for (int f = 0; f < N_FEATURES; ++f) { - EXPECT_NEAR(out_mean[f].item(), 0.0f, 0.05f) - << "Feature " << f << ": normalized output mean should be ~0"; + EXPECT_NEAR(out_mean[f].item(), 0.0f, 0.05f) << "Feature " << f << ": normalized output mean should be ~0"; } // Per-feature std should be ~1 auto out_std = normalized.std(0); // [N_FEATURES], unbiased for (int f = 0; f < N_FEATURES; ++f) { - EXPECT_NEAR(out_std[f].item(), 1.0f, 0.05f) - << "Feature " << f << ": normalized output std should be ~1"; + EXPECT_NEAR(out_std[f].item(), 1.0f, 0.05f) << "Feature " << f << ": normalized output std should be ~1"; } } @@ -126,7 +124,7 @@ TEST(RunningNormalizer, IncrementalVsBatch) { // Build a fixed dataset once const int total_samples = 10000; - const int small_batch = 10; + const int small_batch = 10; auto full_data = make_batch(total_samples); // [10000, 4] // Normalizer A: one large update @@ -141,7 +139,7 @@ TEST(RunningNormalizer, IncrementalVsBatch) { // Both should produce identical normalized output for the same input auto probe = make_batch(32); - auto out_batch = norm_batch.normalize(probe); + auto out_batch = norm_batch.normalize(probe); auto out_incremental = norm_incremental.normalize(probe); // Element-wise match to float32 precision @@ -162,8 +160,7 @@ TEST(RunningNormalizer, EarlyReturnBeforeInitialized) { auto output = normalizer.normalize(input); // Should be the exact same tensor (no-op) - EXPECT_TRUE(torch::equal(input, output)) - << "normalize() should return input unchanged before stats are initialized"; + EXPECT_TRUE(torch::equal(input, output)) << "normalize() should return input unchanged before stats are initialized"; } // ---- Test 5: checkpoint round-trip --------------------------------------- @@ -188,7 +185,7 @@ TEST(RunningNormalizer, CheckpointRoundTrip) { auto probe = make_batch(16); auto out_original = normalizer.normalize(probe); - auto out_loaded = loaded.normalize(probe); + auto out_loaded = loaded.normalize(probe); EXPECT_TRUE(torch::allclose(out_original, out_loaded, /*rtol=*/1e-5, /*atol=*/1e-6)) << "Loaded normalizer should produce identical output to original"; @@ -209,7 +206,7 @@ TEST(RunningNormalizerScaleOnly, MeanPreserved) { rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); const int batch_size = 100; - const int n_batches = 500; // 50000 samples + const int n_batches = 500; // 50000 samples for (int i = 0; i < n_batches; ++i) { normalizer.update(make_batch(batch_size)); @@ -218,10 +215,10 @@ TEST(RunningNormalizerScaleOnly, MeanPreserved) { // Normalize a fresh large batch and check output statistics const int test_size = 10000; auto test_batch = make_batch(test_size); - auto normalized = normalizer.normalize(test_batch); + auto normalized = normalizer.normalize(test_batch); auto out_mean = normalized.mean(0); // [N_FEATURES] - auto out_std = normalized.std(0); + auto out_std = normalized.std(0); for (int f = 0; f < N_FEATURES; ++f) { float expected_mean = TRUE_MEAN[f] / TRUE_STD[f]; @@ -230,8 +227,7 @@ TEST(RunningNormalizerScaleOnly, MeanPreserved) { << "Feature " << f << ": scale_only output mean should be ~true_mean/true_std, not 0"; // output std should still be ~1 (variance is still normalized) - EXPECT_NEAR(out_std[f].item(), 1.0f, 0.05f) - << "Feature " << f << ": scale_only output std should be ~1"; + EXPECT_NEAR(out_std[f].item(), 1.0f, 0.05f) << "Feature " << f << ": scale_only output std should be ~1"; } } @@ -246,7 +242,7 @@ TEST(RunningNormalizerScaleOnly, SameStdDifferentMean) { rl::RunningNormalizer scale_norm(1e-8f, /* scale_only = */ true); const int batch_size = 100; - const int n_batches = 500; + const int n_batches = 500; for (int i = 0; i < n_batches; ++i) { auto batch = make_batch(batch_size); @@ -259,18 +255,17 @@ TEST(RunningNormalizerScaleOnly, SameStdDifferentMean) { torch::manual_seed(9999); auto test_batch = make_batch(test_size); - auto out_full = full_norm.normalize(test_batch); + auto out_full = full_norm.normalize(test_batch); auto out_scale = scale_norm.normalize(test_batch); - auto full_mean = out_full.mean(0); + auto full_mean = out_full.mean(0); auto scale_mean = out_scale.mean(0); - auto full_std = out_full.std(0); - auto scale_std = out_scale.std(0); + auto full_std = out_full.std(0); + auto scale_std = out_scale.std(0); for (int f = 0; f < N_FEATURES; ++f) { // full mode: mean ~0 - EXPECT_NEAR(full_mean[f].item(), 0.0f, 0.05f) - << "Feature " << f << ": full mode mean should be ~0"; + EXPECT_NEAR(full_mean[f].item(), 0.0f, 0.05f) << "Feature " << f << ": full mode mean should be ~0"; // scale_only mode: mean nonzero (only zero if true mean happens to be 0) // Here all TRUE_MEAN values are nonzero, so the output mean must differ from 0 @@ -278,10 +273,8 @@ TEST(RunningNormalizerScaleOnly, SameStdDifferentMean) { << "Feature " << f << ": scale_only mode mean should be nonzero"; // both modes: std ~1 - EXPECT_NEAR(full_std[f].item(), 1.0f, 0.05f) - << "Feature " << f << ": full mode std should be ~1"; - EXPECT_NEAR(scale_std[f].item(), 1.0f, 0.05f) - << "Feature " << f << ": scale_only mode std should be ~1"; + EXPECT_NEAR(full_std[f].item(), 1.0f, 0.05f) << "Feature " << f << ": full mode std should be ~1"; + EXPECT_NEAR(scale_std[f].item(), 1.0f, 0.05f) << "Feature " << f << ": scale_only mode std should be ~1"; } } @@ -306,7 +299,7 @@ TEST(RunningNormalizerScaleOnly, CheckpointPreservesMode) { auto probe = make_batch(16); auto out_original = normalizer.normalize(probe); - auto out_loaded = loaded.normalize(probe); + auto out_loaded = loaded.normalize(probe); // Outputs must match exactly (scale_only mode was restored from checkpoint) EXPECT_TRUE(torch::allclose(out_original, out_loaded, /*rtol=*/1e-5, /*atol=*/1e-6)) From 4e31cbe7135d4102eefe87b9a9f1972440fbdbb1 Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 12:00:03 +0200 Subject: [PATCH 07/15] adding rewards and return normalization Signed-off-by: Thorsten Kurth --- .../include/internal/rl/off_policy/ddpg.h | 3 ++ src/csrc/include/internal/rl/off_policy/sac.h | 3 ++ src/csrc/include/internal/rl/off_policy/td3.h | 3 ++ src/csrc/include/internal/rl/on_policy/ppo.h | 7 ++- src/csrc/include/internal/rl/rollout_buffer.h | 37 ++++++++++++++ src/csrc/rl/off_policy/ddpg.cpp | 33 ++++++++++++- src/csrc/rl/off_policy/sac.cpp | 49 +++++++++++++++++-- src/csrc/rl/off_policy/td3.cpp | 35 +++++++++++-- src/csrc/rl/on_policy/ppo.cpp | 48 +++++++++++++++--- 9 files changed, 201 insertions(+), 17 deletions(-) diff --git a/src/csrc/include/internal/rl/off_policy/ddpg.h b/src/csrc/include/internal/rl/off_policy/ddpg.h index 710cf67..4c0190a 100644 --- a/src/csrc/include/internal/rl/off_policy/ddpg.h +++ b/src/csrc/include/internal/rl/off_policy/ddpg.h @@ -309,6 +309,9 @@ class DDPGSystem : public RLOffPolicySystem, public std::enable_shared_from_this // state normalizer (optional, null if disabled) std::unique_ptr state_normalizer_; + // reward normalizer (optional, null if disabled); scale_only=true so mean is preserved + std::unique_ptr reward_normalizer_; + // some parameters int batch_size_; int num_critics_; diff --git a/src/csrc/include/internal/rl/off_policy/sac.h b/src/csrc/include/internal/rl/off_policy/sac.h index 5215358..677059f 100644 --- a/src/csrc/include/internal/rl/off_policy/sac.h +++ b/src/csrc/include/internal/rl/off_policy/sac.h @@ -432,6 +432,9 @@ class SACSystem : public RLOffPolicySystem, public std::enable_shared_from_this< // state normalizer (optional, null if disabled) std::unique_ptr state_normalizer_; + // reward normalizer (optional, null if disabled); scale_only=true so mean is preserved + std::unique_ptr reward_normalizer_; + // some parameters int batch_size_; int num_critics_; diff --git a/src/csrc/include/internal/rl/off_policy/td3.h b/src/csrc/include/internal/rl/off_policy/td3.h index 49491df..1d26702 100644 --- a/src/csrc/include/internal/rl/off_policy/td3.h +++ b/src/csrc/include/internal/rl/off_policy/td3.h @@ -342,6 +342,9 @@ class TD3System : public RLOffPolicySystem, public std::enable_shared_from_this< // state normalizer (optional, null if disabled) std::unique_ptr state_normalizer_; + // reward normalizer (optional, null if disabled); scale_only=true so mean is preserved + std::unique_ptr reward_normalizer_; + // some parameters int batch_size_; int num_critics_; diff --git a/src/csrc/include/internal/rl/on_policy/ppo.h b/src/csrc/include/internal/rl/on_policy/ppo.h index 434be49..e0b410b 100644 --- a/src/csrc/include/internal/rl/on_policy/ppo.h +++ b/src/csrc/include/internal/rl/on_policy/ppo.h @@ -287,6 +287,9 @@ class PPOSystem : public RLOnPolicySystem, public std::enable_shared_from_this state_normalizer_; + // return normalizer (optional, null if disabled); scale_only=true so mean is preserved + std::unique_ptr return_normalizer_; + // some parameters int batch_size_; float epsilon_, clip_q_; @@ -296,7 +299,9 @@ class PPOSystem : public RLOnPolicySystem, public std::enable_shared_from_this comm, RunningNormalizer& return_normalizer) { + if (!finalized_) { + throw std::runtime_error( + "GAELambdaRolloutBuffer::normalizeReturns: buffer must be finalized before normalizing returns."); + } + + torch::NoGradGuard no_grad; + + // flatten all returns to [size_ * n_envs_, 1]: single scalar feature per sample + auto all_ret = torch::stack(returns_, 0).reshape({-1, 1}).to(torch::kFloat32); + + // update running variance with this rollout's returns, then sync across ranks + return_normalizer.update(all_ret); + return_normalizer.sync(comm); + + // apply scale-only normalization: R_norm = R / std(R) + // the same std is applied to advantages: A_scaled = A / std(R), + // preserving the relationship A = R - V when both are on the same scale + auto all_ret_norm = return_normalizer.normalize(all_ret); + auto all_adv = torch::stack(advantages_, 0).reshape({-1, 1}).to(torch::kFloat32); + auto all_adv_scaled = return_normalizer.normalize(all_adv); + + // write normalized values back to per-step tensors + auto ret_reshaped = all_ret_norm.reshape({static_cast(size_), static_cast(n_envs_)}); + auto adv_reshaped = all_adv_scaled.reshape({static_cast(size_), static_cast(n_envs_)}); + for (size_t step = 0; step < size_; ++step) { + returns_[step] = ret_reshaped[step]; + advantages_[step] = adv_reshaped[step]; + } + } + std::tuple sample(int batch_size) { diff --git a/src/csrc/rl/off_policy/ddpg.cpp b/src/csrc/rl/off_policy/ddpg.cpp index ed001c5..d0e7b81 100644 --- a/src/csrc/rl/off_policy/ddpg.cpp +++ b/src/csrc/rl/off_policy/ddpg.cpp @@ -36,15 +36,18 @@ DDPGSystem::DDPGSystem(const char* name, const YAML::Node& system_node, int mode if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); std::set supported_params{"batch_size", "nstep", "nstep_reward_reduction", - "gamma", "rho", "normalize_state"}; + "gamma", "rho", "normalize_states", "normalize_rewards"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; gamma_ = params.get_param("gamma")[0]; rho_ = params.get_param("rho")[0]; nstep_ = params.get_param("nstep", 1)[0]; - if (params.get_param("normalize_state", false)[0]) { + if (params.get_param("normalize_states", false)[0]) { state_normalizer_ = std::make_unique(); } + if (params.get_param("normalize_rewards", false)[0]) { + reward_normalizer_ = std::make_unique(1e-8f, /* scale_only = */ true); + } auto redmode = params.get_param("nstep_reward_reduction", "sum")[0]; if (redmode == "sum") { nstep_reward_reduction_ = RewardReductionMode::Sum; @@ -334,6 +337,12 @@ void DDPGSystem::saveCheckpoint(const std::string& checkpoint_dir) const { state_normalizer_->save(normalizer_path.native()); } + // reward normalizer + if (reward_normalizer_) { + auto normalizer_path = root_dir / "reward_normalizer.pt"; + reward_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -378,6 +387,18 @@ void DDPGSystem::loadCheckpoint(const std::string& checkpoint_dir) { } } + // reward normalizer + if (reward_normalizer_) { + auto normalizer_path = root_dir / "reward_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print("DDPG: reward normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + reward_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -393,6 +414,8 @@ void DDPGSystem::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Ten torch::Tensor d) { if (state_normalizer_) state_normalizer_->update(s); + if (reward_normalizer_) + reward_normalizer_->update(r.unsqueeze(1)); replay_buffer_->update(s, a, sp, r, d); } @@ -531,6 +554,12 @@ void DDPGSystem::trainStep(float& p_loss_val, float& q_loss_val) { sp = state_normalizer_->normalize(sp); } + // sync and apply reward normalization + if (reward_normalizer_) { + reward_normalizer_->sync(p_model_.comm); + r = reward_normalizer_->normalize(r.unsqueeze(1)).squeeze(1); + } + // get a new action by predicting one with target network ap = predictWithNoiseTrain_(sp); } diff --git a/src/csrc/rl/off_policy/sac.cpp b/src/csrc/rl/off_policy/sac.cpp index 76f342c..4649fb1 100644 --- a/src/csrc/rl/off_policy/sac.cpp +++ b/src/csrc/rl/off_policy/sac.cpp @@ -62,17 +62,20 @@ SACSystem::SACSystem(const char* name, const YAML::Node& system_node, int model_ auto algo_node = system_node["algorithm"]; if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); - std::set supported_params{"batch_size", "num_critics", "nstep", "nstep_reward_reduction", - "gamma", "rho", "alpha", "target_entropy", - "normalize_state"}; + std::set supported_params{"batch_size", "num_critics", "nstep", "nstep_reward_reduction", + "gamma", "rho", "alpha", "target_entropy", + "normalize_states", "normalize_rewards"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; gamma_ = params.get_param("gamma")[0]; rho_ = params.get_param("rho")[0]; - if (params.get_param("normalize_state", false)[0]) { + if (params.get_param("normalize_states", false)[0]) { state_normalizer_ = std::make_unique(); } + if (params.get_param("normalize_rewards", false)[0]) { + reward_normalizer_ = std::make_unique(1e-8f, /* scale_only = */ true); + } // alpha needs special care AlphaModel am; am.setup(params.get_param("alpha", 0.)[0]); @@ -215,6 +218,18 @@ SACSystem::SACSystem(const char* name, const YAML::Node& system_node, int model_ } // in this case we want to optimize the entropy coefficient + // NOTE on normalization interactions: + // The automatic alpha tuning adjusts alpha so that the policy entropy matches H_target = -action_dim. + // This balance depends on Q-values being on a consistent scale, since the policy gradient mixes + // Q(s,a) with alpha * log_pi. Therefore: + // - normalize_rewards is strongly recommended when using alpha_optimizer: it keeps Q-values + // on a consistent scale regardless of reward magnitude, making the default H_target heuristic + // robust across tasks. Without it, tasks with large rewards require a proportionally large alpha + // to have any effect, and vice versa. + // - normalize_states interacts more mildly, but loading a checkpoint where the state normalizer + // has no saved statistics (e.g. enabling normalization mid-training) will cause the policy + // entropy over normalized states to differ significantly from the pre-training value, forcing + // alpha to re-adapt. This transient disruption is more severe in SAC than in DDPG/TD3. if (system_node["alpha_optimizer"]) { // register alpha as a new parameter alpha_optimizer_ = get_optimizer(system_node["alpha_optimizer"], alpha_model_->parameters()); @@ -443,6 +458,12 @@ void SACSystem::saveCheckpoint(const std::string& checkpoint_dir) const { state_normalizer_->save(normalizer_path.native()); } + // reward normalizer + if (reward_normalizer_) { + auto normalizer_path = root_dir / "reward_normalizer.pt"; + reward_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -547,6 +568,18 @@ void SACSystem::loadCheckpoint(const std::string& checkpoint_dir) { } } + // reward normalizer + if (reward_normalizer_) { + auto normalizer_path = root_dir / "reward_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print("SAC: reward normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + reward_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -562,6 +595,8 @@ void SACSystem::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tens torch::Tensor d) { if (state_normalizer_) state_normalizer_->update(s); + if (reward_normalizer_) + reward_normalizer_->update(r.unsqueeze(1)); // note that we have to rescale the action: [a_low, a_high] -> [-1, 1], // but the replay buffer only stores scaled actions! replay_buffer_->update(s, a, sp, r, d); @@ -685,6 +720,12 @@ void SACSystem::trainStep(float& p_loss_val, float& q_loss_val) { s = state_normalizer_->normalize(s); sp = state_normalizer_->normalize(sp); } + + // sync and apply reward normalization + if (reward_normalizer_) { + reward_normalizer_->sync(p_model_.comm); + r = reward_normalizer_->normalize(r.unsqueeze(1)).squeeze(1); + } } // train step diff --git a/src/csrc/rl/off_policy/td3.cpp b/src/csrc/rl/off_policy/td3.cpp index 73af389..139b0af 100644 --- a/src/csrc/rl/off_policy/td3.cpp +++ b/src/csrc/rl/off_policy/td3.cpp @@ -35,8 +35,8 @@ TD3System::TD3System(const char* name, const YAML::Node& system_node, int model_ if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); std::set supported_params{ - "batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", - "gamma", "rho", "normalize_state"}; + "batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", + "gamma", "rho", "normalize_states", "normalize_rewards"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; @@ -44,9 +44,12 @@ TD3System::TD3System(const char* name, const YAML::Node& system_node, int model_ gamma_ = params.get_param("gamma")[0]; rho_ = params.get_param("rho")[0]; nstep_ = params.get_param("nstep", 1)[0]; - if (params.get_param("normalize_state", false)[0]) { + if (params.get_param("normalize_states", false)[0]) { state_normalizer_ = std::make_unique(); } + if (params.get_param("normalize_rewards", false)[0]) { + reward_normalizer_ = std::make_unique(1e-8f, /* scale_only = */ true); + } auto redmode = params.get_param("nstep_reward_reduction", "sum")[0]; if (redmode == "sum") { nstep_reward_reduction_ = RewardReductionMode::Sum; @@ -397,6 +400,12 @@ void TD3System::saveCheckpoint(const std::string& checkpoint_dir) const { state_normalizer_->save(normalizer_path.native()); } + // reward normalizer + if (reward_normalizer_) { + auto normalizer_path = root_dir / "reward_normalizer.pt"; + reward_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -450,6 +459,18 @@ void TD3System::loadCheckpoint(const std::string& checkpoint_dir) { } } + // reward normalizer + if (reward_normalizer_) { + auto normalizer_path = root_dir / "reward_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print("TD3: reward normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + reward_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the replay buffer: { auto buffer_path = root_dir / "replay_buffer"; @@ -465,6 +486,8 @@ void TD3System::updateReplayBuffer(torch::Tensor s, torch::Tensor a, torch::Tens torch::Tensor d) { if (state_normalizer_) state_normalizer_->update(s); + if (reward_normalizer_) + reward_normalizer_->update(r.unsqueeze(1)); replay_buffer_->update(s, a, sp, r, d); } @@ -605,6 +628,12 @@ void TD3System::trainStep(float& p_loss_val, float& q_loss_val) { sp = state_normalizer_->normalize(sp); } + // sync and apply reward normalization + if (reward_normalizer_) { + reward_normalizer_->sync(p_model_.comm); + r = reward_normalizer_->normalize(r.unsqueeze(1)).squeeze(1); + } + // get a new action by predicting one with target network ap = predictWithNoiseTrain_(sp); } diff --git a/src/csrc/rl/on_policy/ppo.cpp b/src/csrc/rl/on_policy/ppo.cpp index 8ba5bac..28ea049 100644 --- a/src/csrc/rl/on_policy/ppo.cpp +++ b/src/csrc/rl/on_policy/ppo.cpp @@ -44,10 +44,11 @@ PPOSystem::PPOSystem(const char* name, const YAML::Node& system_node, int model_ "entropy_loss_coefficient", "value_loss_coefficient", "normalize_advantage", - "normalize_state"}; + "normalize_returns", + "normalize_states"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; - if (params.get_param("normalize_state", false)[0]) { + if (params.get_param("normalize_states", false)[0]) { state_normalizer_ = std::make_unique(); } gamma_ = params.get_param("gamma")[0]; @@ -63,7 +64,12 @@ PPOSystem::PPOSystem(const char* name, const YAML::Node& system_node, int model_ entropy_loss_coeff_ = params.get_param("entropy_loss_coefficient", 0.0)[0]; value_loss_coeff_ = params.get_param("value_loss_coefficient", 0.5)[0]; normalize_advantage_ = params.get_param("normalize_advantage", true)[0]; + normalize_returns_ = params.get_param("normalize_returns", false)[0]; + if (normalize_returns_) { + return_normalizer_ = std::make_unique(1e-8f, /* scale_only = */ true); + } advantage_normalized_ = false; + returns_normalized_ = false; } else { THROW_INVALID_USAGE("Missing parameters section in algorithm section in configuration file."); } @@ -261,6 +267,12 @@ void PPOSystem::saveCheckpoint(const std::string& checkpoint_dir) const { state_normalizer_->save(normalizer_path.native()); } + // return normalizer + if (return_normalizer_) { + auto normalizer_path = root_dir / "return_normalizer.pt"; + return_normalizer_->save(normalizer_path.native()); + } + // lastly, save the replay buffer: { auto buffer_path = root_dir / "rollout_buffer"; @@ -323,6 +335,18 @@ void PPOSystem::loadCheckpoint(const std::string& checkpoint_dir) { } } + // return normalizer + if (return_normalizer_) { + auto normalizer_path = root_dir / "return_normalizer.pt"; + if (!std::filesystem::exists(normalizer_path)) { + torchfort::logging::print("PPO: return normalizer is enabled but no saved state was found in the checkpoint. " + "Starting with empty statistics.", + torchfort::logging::warn); + } else { + return_normalizer_->load(normalizer_path.native()); + } + } + // lastly, load the rollout buffer: { auto buffer_path = root_dir / "rollout_buffer"; @@ -376,16 +400,26 @@ void PPOSystem::updateRolloutBuffer(torch::Tensor stens, torch::Tensor atens, to // the replay buffer only stores scaled actions! rollout_buffer_->update(stens, as, rtens, value, log_p_tensor, etens); - // normalize advantages once over the full rollout as soon as it is finalized, - // before any mini-batch sampling starts - if (normalize_advantage_ && rollout_buffer_->isReady() && !advantage_normalized_) { - rollout_buffer_->normalizeAdvantages(pq_model_.comm); - advantage_normalized_ = true; + // once per rollout, after finalization and before any mini-batch sampling: + // 1. normalize returns (scale R and A by running return std, preserving mean) + // 2. normalize advantages (zero-center and unit-std A on top of the return scale) + // order matters: return normalization must happen first so advantage normalization + // operates on the already-return-scaled advantages + if (rollout_buffer_->isReady()) { + if (return_normalizer_ && !returns_normalized_) { + rollout_buffer_->normalizeReturns(pq_model_.comm, *return_normalizer_); + returns_normalized_ = true; + } + if (normalize_advantage_ && !advantage_normalized_) { + rollout_buffer_->normalizeAdvantages(pq_model_.comm); + advantage_normalized_ = true; + } } } void PPOSystem::resetRolloutBuffer() { rollout_buffer_->reset(); + returns_normalized_ = false; advantage_normalized_ = false; } From 7f26036e85776af00d9c64d3b40a30d16bc73a20 Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 12:05:53 +0200 Subject: [PATCH 08/15] adding tests for returns and rewards normalization Signed-off-by: Thorsten Kurth --- tests/rl/test_replay_buffer.cpp | 140 ++++++++++++++++++++++++++ tests/rl/test_rollout_buffer.cpp | 141 +++++++++++++++++++++++++++ tests/rl/test_running_normalizer.cpp | 75 ++++++++++++++ 3 files changed, 356 insertions(+) diff --git a/tests/rl/test_replay_buffer.cpp b/tests/rl/test_replay_buffer.cpp index 24e854f..cf2bc37 100644 --- a/tests/rl/test_replay_buffer.cpp +++ b/tests/rl/test_replay_buffer.cpp @@ -16,6 +16,7 @@ */ #include "internal/rl/replay_buffer.h" +#include "internal/rl/running_normalizer.h" #include #include @@ -296,6 +297,145 @@ TEST_P(ReplayBuffer, SaveRestore) { INSTANTIATE_TEST_SUITE_P(MultiEnv, ReplayBuffer, testing::Range(1, 3), testing::PrintToStringParamName()); +// ========================================================================= +// Reward normalization tests +// Simulate the workflow used by DDPG/TD3/SAC: update the reward normalizer +// with each incoming reward batch (as in updateReplayBuffer), then normalize +// rewards sampled from the buffer (as in trainStep). +// ========================================================================= + +// ---- RewardNormalization: unit std, nonzero mean ------------------------- +// After the normalizer has seen enough rewards, normalizing a sampled reward +// batch should yield unit std but preserve the mean (scale_only=true). +TEST(RewardNormalization, UnitStdPreservedMean) { + torch::manual_seed(300); + torch::NoGradGuard no_grad; + + // Rewards ~ N(mean=5, std=2): a typical dense-reward task distribution + const float true_mean = 5.0f; + const float true_std = 2.0f; + const int n_envs = 1; + const int buffer_size = 512; + + auto rbuff = std::make_shared( + buffer_size, buffer_size, n_envs, 0.99f, 1, rl::RewardReductionMode::Sum, -1); + + rl::RunningNormalizer reward_normalizer(1e-8f, /* scale_only = */ true); + + // Fill the buffer, updating the normalizer with each reward batch exactly + // as DDPGSystem::updateReplayBuffer does + torch::Tensor state = torch::zeros({n_envs, 4}, torch::kFloat32); + for (int i = 0; i < buffer_size; ++i) { + auto action = torch::zeros({n_envs, 2}, torch::kFloat32); + auto next_state = state + 0.01f; + auto reward = torch::randn({n_envs}, torch::kFloat32) * true_std + true_mean; + auto done = torch::zeros({n_envs}, torch::kFloat32); + + // mirror the system's updateReplayBuffer call + reward_normalizer.update(reward.unsqueeze(1)); + rbuff->update(state, action, next_state, reward, done); + state = next_state; + } + + // Sample a batch and normalize rewards as trainStep does + const int batch_size = 256; + torch::Tensor s, a, sp, r, d; + std::tie(s, a, sp, r, d) = rbuff->sample(batch_size); + + // mirror the system's trainStep normalization + auto r_norm = reward_normalizer.normalize(r.unsqueeze(1)).squeeze(1); + + // std of normalized rewards should be ~1 + EXPECT_NEAR(r_norm.std().item(), 1.0f, 0.15f) + << "Normalized rewards should have std ~1"; + + // mean should be ~true_mean / true_std = 2.5 (not ~0) + float expected_mean = true_mean / true_std; + EXPECT_NEAR(r_norm.mean().item(), expected_mean, 0.3f) + << "Normalized rewards should preserve mean as ~true_mean/true_std"; +} + +// ---- RewardNormalization: sign preservation ------------------------------ +// Rewards from a strictly positive distribution must remain positive after +// normalization. This is the key correctness requirement for scale_only mode. +TEST(RewardNormalization, SignPreservation) { + torch::manual_seed(301); + torch::NoGradGuard no_grad; + + // Rewards ~ Uniform(1, 5): always positive + const int n_envs = 1; + const int buffer_size = 256; + + auto rbuff = std::make_shared( + buffer_size, buffer_size, n_envs, 0.99f, 1, rl::RewardReductionMode::Sum, -1); + + rl::RunningNormalizer reward_normalizer(1e-8f, /* scale_only = */ true); + + torch::Tensor state = torch::zeros({n_envs, 4}, torch::kFloat32); + for (int i = 0; i < buffer_size; ++i) { + auto action = torch::zeros({n_envs, 2}, torch::kFloat32); + auto next_state = state + 0.01f; + // strictly positive rewards in [1, 5] + auto reward = torch::rand({n_envs}, torch::kFloat32) * 4.0f + 1.0f; + auto done = torch::zeros({n_envs}, torch::kFloat32); + + reward_normalizer.update(reward.unsqueeze(1)); + rbuff->update(state, action, next_state, reward, done); + state = next_state; + } + + torch::Tensor s, a, sp, r, d; + std::tie(s, a, sp, r, d) = rbuff->sample(buffer_size); + auto r_norm = reward_normalizer.normalize(r.unsqueeze(1)).squeeze(1); + + // all normalized rewards must remain positive + EXPECT_TRUE((r_norm > 0).all().item()) + << "All positive rewards must remain positive after scale_only normalization"; +} + +// ---- RewardNormalization: large-scale rewards normalized to unit range ---- +// Rewards with large magnitude (e.g. N(100, 20)) should be brought to unit +// std. Without normalization these would dominate the Bellman target. +TEST(RewardNormalization, LargeScaleNormalizedToUnitStd) { + torch::manual_seed(302); + torch::NoGradGuard no_grad; + + const float true_mean = 100.0f; + const float true_std = 20.0f; + const int n_envs = 1; + const int buffer_size = 512; + + auto rbuff = std::make_shared( + buffer_size, buffer_size, n_envs, 0.99f, 1, rl::RewardReductionMode::Sum, -1); + + rl::RunningNormalizer reward_normalizer(1e-8f, /* scale_only = */ true); + + torch::Tensor state = torch::zeros({n_envs, 4}, torch::kFloat32); + for (int i = 0; i < buffer_size; ++i) { + auto action = torch::zeros({n_envs, 2}, torch::kFloat32); + auto next_state = state + 0.01f; + auto reward = torch::randn({n_envs}, torch::kFloat32) * true_std + true_mean; + auto done = torch::zeros({n_envs}, torch::kFloat32); + + reward_normalizer.update(reward.unsqueeze(1)); + rbuff->update(state, action, next_state, reward, done); + state = next_state; + } + + torch::Tensor s, a, sp, r, d; + std::tie(s, a, sp, r, d) = rbuff->sample(buffer_size); + auto r_norm = reward_normalizer.normalize(r.unsqueeze(1)).squeeze(1); + + // std should be close to 1 regardless of the original reward scale + EXPECT_NEAR(r_norm.std().item(), 1.0f, 0.15f) + << "Large-scale rewards must be normalized to unit std"; + + // mean should be ~true_mean/true_std = 5, not 0 and not 100 + float expected_mean = true_mean / true_std; + EXPECT_NEAR(r_norm.mean().item(), expected_mean, 0.5f) + << "Mean should be preserved as ~true_mean/true_std, not removed"; +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); diff --git a/tests/rl/test_rollout_buffer.cpp b/tests/rl/test_rollout_buffer.cpp index 469b7ae..9e8cc9f 100644 --- a/tests/rl/test_rollout_buffer.cpp +++ b/tests/rl/test_rollout_buffer.cpp @@ -16,6 +16,7 @@ */ #include "internal/rl/rollout_buffer.h" +#include "internal/rl/running_normalizer.h" #include #include @@ -288,6 +289,146 @@ TEST_P(RolloutBuffer, SaveRestore) { INSTANTIATE_TEST_SUITE_P(MultiEnv, RolloutBuffer, testing::Range(1, 3), testing::PrintToStringParamName()); +// ========================================================================= +// normalizeReturns tests +// These tests use n_env=1 for simplicity; the multi-env path is covered by +// the parameterized suite above for the base buffer operations. +// ========================================================================= + +// ---- NormalizeReturns: A = R - V relationship is preserved --------------- +// normalizeReturns scales both returns and advantages by the same factor, so +// the relationship A = R - V must still hold exactly after normalization. +TEST(NormalizeReturns, MaintainsAdvantageReturnRelationship) { + torch::manual_seed(42); + torch::NoGradGuard no_grad; + + const int buffer_size = 16; + const int n_env = 1; + + std::shared_ptr rbuff; + torch::Tensor last_val, last_done; + std::tie(rbuff, last_val, last_done) = getTestRolloutBuffer(buffer_size, n_env); + + // apply return normalization + rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); + rbuff->normalizeReturns(nullptr, normalizer); + + // verify A = R - V still holds for every entry + float max_violation = 0.f; + int n_steps = buffer_size / n_env; + for (int i = 0; i < n_steps; ++i) { + torch::Tensor s, a, r, q, log_p, adv, ret, d; + std::tie(s, a, r, q, log_p, adv, ret, d) = rbuff->getFull(i); + // ret = adv + q => adv - (ret - q) should be ~0 + float violation = torch::sum(torch::abs(adv - (ret - q))).item(); + max_violation = std::max(max_violation, violation); + } + + EXPECT_NEAR(max_violation, 0.f, 1e-5f) + << "A = R - V must hold after normalizeReturns (both scaled by same factor)"; +} + +// ---- NormalizeReturns: unit std, nonzero mean ---------------------------- +// After normalization the collection of all returns should have std ~1 but +// mean should NOT be zero (scale_only preserves the mean). +TEST(NormalizeReturns, UnitStdPreservedMean) { + torch::manual_seed(43); + torch::NoGradGuard no_grad; + + // Use a larger buffer to get a stable std estimate + const int buffer_size = 128; + const int n_env = 1; + + // Warm up the normalizer over several rollouts so it has stable stats + rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); + for (int rollout = 0; rollout < 20; ++rollout) { + std::shared_ptr rbuff; + torch::Tensor last_val, last_done; + std::tie(rbuff, last_val, last_done) = getTestRolloutBuffer(buffer_size, n_env); + rbuff->normalizeReturns(nullptr, normalizer); + } + + // Final rollout: check statistics of normalized returns + std::shared_ptr rbuff; + torch::Tensor last_val, last_done; + std::tie(rbuff, last_val, last_done) = getTestRolloutBuffer(buffer_size, n_env); + rbuff->normalizeReturns(nullptr, normalizer); + + // collect all normalized returns + std::vector ret_vec; + int n_steps = buffer_size / n_env; + for (int i = 0; i < n_steps; ++i) { + torch::Tensor s, a, r, q, log_p, adv, ret, d; + std::tie(s, a, r, q, log_p, adv, ret, d) = rbuff->getFull(i); + ret_vec.push_back(ret); + } + auto all_ret = torch::cat(ret_vec, 0).flatten().to(torch::kFloat32); + + // std should be ~1 (scale normalization) + float out_std = all_ret.std().item(); + EXPECT_NEAR(out_std, 1.0f, 0.2f) + << "Normalized returns should have std ~1"; + + // mean should NOT be zero (scale_only: mean is preserved) + // The test buffer uses positive rewards (dist uniform in [1,5]) so returns > 0 + float out_mean = all_ret.mean().item(); + EXPECT_GT(out_mean, 0.1f) + << "Normalized returns should have nonzero mean (scale_only preserves mean)"; +} + +// ---- NormalizeReturns + NormalizeAdvantages: correct combined effect ------ +// When both are applied in order (returns first, advantages second), the +// end state should be: returns have unit std + nonzero mean, advantages +// have unit std + zero mean. +TEST(NormalizeReturns, OrderWithAdvantageNormalization) { + torch::manual_seed(44); + torch::NoGradGuard no_grad; + + const int buffer_size = 64; + const int n_env = 1; + + // Warm up the return normalizer + rl::RunningNormalizer ret_normalizer(1e-8f, /* scale_only = */ true); + for (int rollout = 0; rollout < 10; ++rollout) { + std::shared_ptr rbuff; + torch::Tensor last_val, last_done; + std::tie(rbuff, last_val, last_done) = getTestRolloutBuffer(buffer_size, n_env); + rbuff->normalizeReturns(nullptr, ret_normalizer); + } + + // Final rollout: apply both normalizations in the correct order + std::shared_ptr rbuff; + torch::Tensor last_val, last_done; + std::tie(rbuff, last_val, last_done) = getTestRolloutBuffer(buffer_size, n_env); + + rbuff->normalizeReturns(nullptr, ret_normalizer); // step 1: scale R and A by return std + rbuff->normalizeAdvantages(nullptr); // step 2: zero-center and unit-std A + + // collect normalized returns and advantages + std::vector ret_vec, adv_vec; + int n_steps = buffer_size / n_env; + for (int i = 0; i < n_steps; ++i) { + torch::Tensor s, a, r, q, log_p, adv, ret, d; + std::tie(s, a, r, q, log_p, adv, ret, d) = rbuff->getFull(i); + ret_vec.push_back(ret); + adv_vec.push_back(adv); + } + auto all_ret = torch::cat(ret_vec, 0).flatten().to(torch::kFloat32); + auto all_adv = torch::cat(adv_vec, 0).flatten().to(torch::kFloat32); + + // returns: unit std, nonzero mean + EXPECT_NEAR(all_ret.std().item(), 1.0f, 0.2f) + << "Returns should have std ~1 after normalizeReturns"; + EXPECT_GT(all_ret.mean().item(), 0.1f) + << "Returns should have nonzero mean after normalizeReturns (scale_only)"; + + // advantages: unit std, zero mean + EXPECT_NEAR(all_adv.std().item(), 1.0f, 0.1f) + << "Advantages should have std ~1 after normalizeAdvantages"; + EXPECT_NEAR(all_adv.mean().item(), 0.0f, 0.1f) + << "Advantages should have zero mean after normalizeAdvantages"; +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); diff --git a/tests/rl/test_running_normalizer.cpp b/tests/rl/test_running_normalizer.cpp index 8a67070..54e7bc0 100644 --- a/tests/rl/test_running_normalizer.cpp +++ b/tests/rl/test_running_normalizer.cpp @@ -315,6 +315,81 @@ TEST(RunningNormalizerScaleOnly, CheckpointPreservesMode) { } } +// ---- Test 9: sign preservation ------------------------------------------- +// For reward normalization, dividing by std must never flip the sign of a +// reward. Positive rewards must stay positive and negative rewards must stay +// negative after normalization. This is the key property that distinguishes +// scale_only from full normalization for the reward use case. +TEST(RunningNormalizerScaleOnly, SignPreservation) { + torch::manual_seed(203); + torch::NoGradGuard no_grad; + + // Rewards are strictly positive (e.g. sparse +1 reward task) + rl::RunningNormalizer pos_normalizer(1e-8f, /* scale_only = */ true); + for (int i = 0; i < 300; ++i) { + // uniform in [0.5, 2.0]: always positive + auto rewards = torch::rand({100, 1}) * 1.5f + 0.5f; + pos_normalizer.update(rewards); + } + auto pos_probe = torch::rand({1000, 1}) * 1.5f + 0.5f; + auto pos_normalized = pos_normalizer.normalize(pos_probe); + EXPECT_TRUE((pos_normalized > 0).all().item()) + << "scale_only: all positive rewards must remain positive after normalization"; + + // Rewards with mixed signs: positive and negative values + rl::RunningNormalizer mixed_normalizer(1e-8f, /* scale_only = */ true); + for (int i = 0; i < 300; ++i) { + auto rewards = torch::randn({100, 1}) * 2.0f; // mean=0, some positive, some negative + mixed_normalizer.update(rewards); + } + // A clearly positive value must normalize to a positive value + auto clearly_positive = torch::ones({1, 1}) * 5.0f; + auto clearly_negative = torch::ones({1, 1}) * -5.0f; + EXPECT_GT(mixed_normalizer.normalize(clearly_positive)[0][0].item(), 0.0f) + << "scale_only: clearly positive reward must normalize to positive value"; + EXPECT_LT(mixed_normalizer.normalize(clearly_negative)[0][0].item(), 0.0f) + << "scale_only: clearly negative reward must normalize to negative value"; +} + +// ---- Test 10: large-scale reward normalization ---------------------------- +// Simulate a task with large reward magnitudes (e.g. a control task where +// rewards are in the hundreds). The normalizer should scale them to unit std +// while preserving the mean, making the scale task-agnostic. +TEST(RunningNormalizerScaleOnly, LargeScaleRewards) { + torch::manual_seed(204); + torch::NoGradGuard no_grad; + + // Rewards ~ N(mean=100, std=20): large positive values typical of dense reward tasks + const float reward_mean = 100.0f; + const float reward_std = 20.0f; + + rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); + for (int i = 0; i < 500; ++i) { + auto rewards = torch::randn({100, 1}) * reward_std + reward_mean; + normalizer.update(rewards); + } + + // Normalize a large fresh batch + const int test_size = 10000; + auto test_rewards = torch::randn({test_size, 1}) * reward_std + reward_mean; + auto normalized = normalizer.normalize(test_rewards); + + // std should be ~1 (scale normalization worked) + float out_std = normalized.std().item(); + EXPECT_NEAR(out_std, 1.0f, 0.05f) + << "Large-scale rewards should be scaled to unit std"; + + // mean should be ~reward_mean / reward_std = 5.0 (mean is preserved, not removed) + float out_mean = normalized.mean().item(); + float expected_mean = reward_mean / reward_std; + EXPECT_NEAR(out_mean, expected_mean, 0.1f) + << "Large-scale rewards: mean should be preserved as ~mean/std after scale normalization"; + + // all values should still be positive (since mean >> std, all rewards are positive) + EXPECT_TRUE((normalized > 0).all().item()) + << "All rewards should remain positive after scale normalization"; +} + int main(int argc, char* argv[]) { ::testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); From 917fbdc325641c3578c114e1fe68e1f884a4488b Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 12:14:13 +0200 Subject: [PATCH 09/15] fixing indentation Signed-off-by: Thorsten Kurth --- src/csrc/include/internal/rl/on_policy/ppo.h | 4 +-- src/csrc/rl/off_policy/ddpg.cpp | 4 +-- src/csrc/rl/off_policy/sac.cpp | 6 ++-- src/csrc/rl/off_policy/td3.cpp | 4 +-- tests/rl/test_replay_buffer.cpp | 38 ++++++++++---------- tests/rl/test_rollout_buffer.cpp | 22 +++++------- tests/rl/test_running_normalizer.cpp | 8 ++--- 7 files changed, 38 insertions(+), 48 deletions(-) diff --git a/src/csrc/include/internal/rl/on_policy/ppo.h b/src/csrc/include/internal/rl/on_policy/ppo.h index e0b410b..ea74992 100644 --- a/src/csrc/include/internal/rl/on_policy/ppo.h +++ b/src/csrc/include/internal/rl/on_policy/ppo.h @@ -300,8 +300,8 @@ class PPOSystem : public RLOnPolicySystem, public std::enable_shared_from_this supported_params{"batch_size", "nstep", "nstep_reward_reduction", - "gamma", "rho", "normalize_states", "normalize_rewards"}; + std::set supported_params{ + "batch_size", "nstep", "nstep_reward_reduction", "gamma", "rho", "normalize_states", "normalize_rewards"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; gamma_ = params.get_param("gamma")[0]; diff --git a/src/csrc/rl/off_policy/sac.cpp b/src/csrc/rl/off_policy/sac.cpp index 4649fb1..99e7ac1 100644 --- a/src/csrc/rl/off_policy/sac.cpp +++ b/src/csrc/rl/off_policy/sac.cpp @@ -62,9 +62,9 @@ SACSystem::SACSystem(const char* name, const YAML::Node& system_node, int model_ auto algo_node = system_node["algorithm"]; if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); - std::set supported_params{"batch_size", "num_critics", "nstep", "nstep_reward_reduction", - "gamma", "rho", "alpha", "target_entropy", - "normalize_states", "normalize_rewards"}; + std::set supported_params{ + "batch_size", "num_critics", "nstep", "nstep_reward_reduction", "gamma", + "rho", "alpha", "target_entropy", "normalize_states", "normalize_rewards"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; diff --git a/src/csrc/rl/off_policy/td3.cpp b/src/csrc/rl/off_policy/td3.cpp index 139b0af..8ad9bee 100644 --- a/src/csrc/rl/off_policy/td3.cpp +++ b/src/csrc/rl/off_policy/td3.cpp @@ -35,8 +35,8 @@ TD3System::TD3System(const char* name, const YAML::Node& system_node, int model_ if (algo_node["parameters"]) { auto params = get_params(algo_node["parameters"]); std::set supported_params{ - "batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", - "gamma", "rho", "normalize_states", "normalize_rewards"}; + "batch_size", "num_critics", "policy_lag", "nstep", "nstep_reward_reduction", "gamma", + "rho", "normalize_states", "normalize_rewards"}; check_params(supported_params, params.keys()); batch_size_ = params.get_param("batch_size")[0]; num_critics_ = params.get_param("num_critics", 2)[0]; diff --git a/tests/rl/test_replay_buffer.cpp b/tests/rl/test_replay_buffer.cpp index cf2bc37..2e60e47 100644 --- a/tests/rl/test_replay_buffer.cpp +++ b/tests/rl/test_replay_buffer.cpp @@ -313,12 +313,12 @@ TEST(RewardNormalization, UnitStdPreservedMean) { // Rewards ~ N(mean=5, std=2): a typical dense-reward task distribution const float true_mean = 5.0f; - const float true_std = 2.0f; - const int n_envs = 1; - const int buffer_size = 512; + const float true_std = 2.0f; + const int n_envs = 1; + const int buffer_size = 512; - auto rbuff = std::make_shared( - buffer_size, buffer_size, n_envs, 0.99f, 1, rl::RewardReductionMode::Sum, -1); + auto rbuff = std::make_shared(buffer_size, buffer_size, n_envs, 0.99f, 1, + rl::RewardReductionMode::Sum, -1); rl::RunningNormalizer reward_normalizer(1e-8f, /* scale_only = */ true); @@ -329,7 +329,7 @@ TEST(RewardNormalization, UnitStdPreservedMean) { auto action = torch::zeros({n_envs, 2}, torch::kFloat32); auto next_state = state + 0.01f; auto reward = torch::randn({n_envs}, torch::kFloat32) * true_std + true_mean; - auto done = torch::zeros({n_envs}, torch::kFloat32); + auto done = torch::zeros({n_envs}, torch::kFloat32); // mirror the system's updateReplayBuffer call reward_normalizer.update(reward.unsqueeze(1)); @@ -346,8 +346,7 @@ TEST(RewardNormalization, UnitStdPreservedMean) { auto r_norm = reward_normalizer.normalize(r.unsqueeze(1)).squeeze(1); // std of normalized rewards should be ~1 - EXPECT_NEAR(r_norm.std().item(), 1.0f, 0.15f) - << "Normalized rewards should have std ~1"; + EXPECT_NEAR(r_norm.std().item(), 1.0f, 0.15f) << "Normalized rewards should have std ~1"; // mean should be ~true_mean / true_std = 2.5 (not ~0) float expected_mean = true_mean / true_std; @@ -363,11 +362,11 @@ TEST(RewardNormalization, SignPreservation) { torch::NoGradGuard no_grad; // Rewards ~ Uniform(1, 5): always positive - const int n_envs = 1; + const int n_envs = 1; const int buffer_size = 256; - auto rbuff = std::make_shared( - buffer_size, buffer_size, n_envs, 0.99f, 1, rl::RewardReductionMode::Sum, -1); + auto rbuff = std::make_shared(buffer_size, buffer_size, n_envs, 0.99f, 1, + rl::RewardReductionMode::Sum, -1); rl::RunningNormalizer reward_normalizer(1e-8f, /* scale_only = */ true); @@ -377,7 +376,7 @@ TEST(RewardNormalization, SignPreservation) { auto next_state = state + 0.01f; // strictly positive rewards in [1, 5] auto reward = torch::rand({n_envs}, torch::kFloat32) * 4.0f + 1.0f; - auto done = torch::zeros({n_envs}, torch::kFloat32); + auto done = torch::zeros({n_envs}, torch::kFloat32); reward_normalizer.update(reward.unsqueeze(1)); rbuff->update(state, action, next_state, reward, done); @@ -401,12 +400,12 @@ TEST(RewardNormalization, LargeScaleNormalizedToUnitStd) { torch::NoGradGuard no_grad; const float true_mean = 100.0f; - const float true_std = 20.0f; - const int n_envs = 1; - const int buffer_size = 512; + const float true_std = 20.0f; + const int n_envs = 1; + const int buffer_size = 512; - auto rbuff = std::make_shared( - buffer_size, buffer_size, n_envs, 0.99f, 1, rl::RewardReductionMode::Sum, -1); + auto rbuff = std::make_shared(buffer_size, buffer_size, n_envs, 0.99f, 1, + rl::RewardReductionMode::Sum, -1); rl::RunningNormalizer reward_normalizer(1e-8f, /* scale_only = */ true); @@ -415,7 +414,7 @@ TEST(RewardNormalization, LargeScaleNormalizedToUnitStd) { auto action = torch::zeros({n_envs, 2}, torch::kFloat32); auto next_state = state + 0.01f; auto reward = torch::randn({n_envs}, torch::kFloat32) * true_std + true_mean; - auto done = torch::zeros({n_envs}, torch::kFloat32); + auto done = torch::zeros({n_envs}, torch::kFloat32); reward_normalizer.update(reward.unsqueeze(1)); rbuff->update(state, action, next_state, reward, done); @@ -427,8 +426,7 @@ TEST(RewardNormalization, LargeScaleNormalizedToUnitStd) { auto r_norm = reward_normalizer.normalize(r.unsqueeze(1)).squeeze(1); // std should be close to 1 regardless of the original reward scale - EXPECT_NEAR(r_norm.std().item(), 1.0f, 0.15f) - << "Large-scale rewards must be normalized to unit std"; + EXPECT_NEAR(r_norm.std().item(), 1.0f, 0.15f) << "Large-scale rewards must be normalized to unit std"; // mean should be ~true_mean/true_std = 5, not 0 and not 100 float expected_mean = true_mean / true_std; diff --git a/tests/rl/test_rollout_buffer.cpp b/tests/rl/test_rollout_buffer.cpp index 9e8cc9f..2399ec2 100644 --- a/tests/rl/test_rollout_buffer.cpp +++ b/tests/rl/test_rollout_buffer.cpp @@ -324,8 +324,7 @@ TEST(NormalizeReturns, MaintainsAdvantageReturnRelationship) { max_violation = std::max(max_violation, violation); } - EXPECT_NEAR(max_violation, 0.f, 1e-5f) - << "A = R - V must hold after normalizeReturns (both scaled by same factor)"; + EXPECT_NEAR(max_violation, 0.f, 1e-5f) << "A = R - V must hold after normalizeReturns (both scaled by same factor)"; } // ---- NormalizeReturns: unit std, nonzero mean ---------------------------- @@ -366,14 +365,12 @@ TEST(NormalizeReturns, UnitStdPreservedMean) { // std should be ~1 (scale normalization) float out_std = all_ret.std().item(); - EXPECT_NEAR(out_std, 1.0f, 0.2f) - << "Normalized returns should have std ~1"; + EXPECT_NEAR(out_std, 1.0f, 0.2f) << "Normalized returns should have std ~1"; // mean should NOT be zero (scale_only: mean is preserved) // The test buffer uses positive rewards (dist uniform in [1,5]) so returns > 0 float out_mean = all_ret.mean().item(); - EXPECT_GT(out_mean, 0.1f) - << "Normalized returns should have nonzero mean (scale_only preserves mean)"; + EXPECT_GT(out_mean, 0.1f) << "Normalized returns should have nonzero mean (scale_only preserves mean)"; } // ---- NormalizeReturns + NormalizeAdvantages: correct combined effect ------ @@ -401,8 +398,8 @@ TEST(NormalizeReturns, OrderWithAdvantageNormalization) { torch::Tensor last_val, last_done; std::tie(rbuff, last_val, last_done) = getTestRolloutBuffer(buffer_size, n_env); - rbuff->normalizeReturns(nullptr, ret_normalizer); // step 1: scale R and A by return std - rbuff->normalizeAdvantages(nullptr); // step 2: zero-center and unit-std A + rbuff->normalizeReturns(nullptr, ret_normalizer); // step 1: scale R and A by return std + rbuff->normalizeAdvantages(nullptr); // step 2: zero-center and unit-std A // collect normalized returns and advantages std::vector ret_vec, adv_vec; @@ -417,16 +414,13 @@ TEST(NormalizeReturns, OrderWithAdvantageNormalization) { auto all_adv = torch::cat(adv_vec, 0).flatten().to(torch::kFloat32); // returns: unit std, nonzero mean - EXPECT_NEAR(all_ret.std().item(), 1.0f, 0.2f) - << "Returns should have std ~1 after normalizeReturns"; + EXPECT_NEAR(all_ret.std().item(), 1.0f, 0.2f) << "Returns should have std ~1 after normalizeReturns"; EXPECT_GT(all_ret.mean().item(), 0.1f) << "Returns should have nonzero mean after normalizeReturns (scale_only)"; // advantages: unit std, zero mean - EXPECT_NEAR(all_adv.std().item(), 1.0f, 0.1f) - << "Advantages should have std ~1 after normalizeAdvantages"; - EXPECT_NEAR(all_adv.mean().item(), 0.0f, 0.1f) - << "Advantages should have zero mean after normalizeAdvantages"; + EXPECT_NEAR(all_adv.std().item(), 1.0f, 0.1f) << "Advantages should have std ~1 after normalizeAdvantages"; + EXPECT_NEAR(all_adv.mean().item(), 0.0f, 0.1f) << "Advantages should have zero mean after normalizeAdvantages"; } int main(int argc, char* argv[]) { diff --git a/tests/rl/test_running_normalizer.cpp b/tests/rl/test_running_normalizer.cpp index 54e7bc0..c4fa7dd 100644 --- a/tests/rl/test_running_normalizer.cpp +++ b/tests/rl/test_running_normalizer.cpp @@ -361,7 +361,7 @@ TEST(RunningNormalizerScaleOnly, LargeScaleRewards) { // Rewards ~ N(mean=100, std=20): large positive values typical of dense reward tasks const float reward_mean = 100.0f; - const float reward_std = 20.0f; + const float reward_std = 20.0f; rl::RunningNormalizer normalizer(1e-8f, /* scale_only = */ true); for (int i = 0; i < 500; ++i) { @@ -376,8 +376,7 @@ TEST(RunningNormalizerScaleOnly, LargeScaleRewards) { // std should be ~1 (scale normalization worked) float out_std = normalized.std().item(); - EXPECT_NEAR(out_std, 1.0f, 0.05f) - << "Large-scale rewards should be scaled to unit std"; + EXPECT_NEAR(out_std, 1.0f, 0.05f) << "Large-scale rewards should be scaled to unit std"; // mean should be ~reward_mean / reward_std = 5.0 (mean is preserved, not removed) float out_mean = normalized.mean().item(); @@ -386,8 +385,7 @@ TEST(RunningNormalizerScaleOnly, LargeScaleRewards) { << "Large-scale rewards: mean should be preserved as ~mean/std after scale normalization"; // all values should still be positive (since mean >> std, all rewards are positive) - EXPECT_TRUE((normalized > 0).all().item()) - << "All rewards should remain positive after scale normalization"; + EXPECT_TRUE((normalized > 0).all().item()) << "All rewards should remain positive after scale normalization"; } int main(int argc, char* argv[]) { From 42e2b3137b075f6cc93d93a1fb9763b605539b47 Mon Sep 17 00:00:00 2001 From: Thorsten Kurth Date: Mon, 20 Apr 2026 12:23:30 +0200 Subject: [PATCH 10/15] updating documentation Signed-off-by: Thorsten Kurth --- docs/api/config.rst | 282 +++++++++++++++++++++++++++----------------- 1 file changed, 174 insertions(+), 108 deletions(-) diff --git a/docs/api/config.rst b/docs/api/config.rst index 72539d1..a2992fc 100644 --- a/docs/api/config.rst +++ b/docs/api/config.rst @@ -286,65 +286,95 @@ The following table lists the available algorithm types: The following table lists the available options by algorithm type: -+----------------+-------------+------------------------------+------------+-------------------------------------------------------------------------------------------+ -| Algorithm Name | Kind | Option | Data Type | Description | -+================+=============+==============================+============+===========================================================================================+ -| ``ddpg`` | off policy | ``batch_size`` | integer | batch size used in training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``nstep`` | integer | number of steps for N-step training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``nstep_reward_reduction`` | string | reduction mode for N-step training (see below) | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``gamma`` | float | discount factor | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``rho`` | boolean | weight average factor for target weights (in some frameworks called rho = 1-tau) | -+----------------+-------------+------------------------------+------------+-------------------------------------------------------------------------------------------+ -| ``td3`` | off policy | ``batch_size`` | integer | batch size used in training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``nstep`` | integer | number of steps for N-step training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``nstep_reward_reduction`` | string | reduction mode for N-step training (see below) | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``gamma`` | float | discount factor | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``rho`` | float | weight average factor for target weights (in some frameworks called rho = 1-tau) | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``num_critics`` | integer | number of critic networks used | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``policy_lag`` | integer | update frequency for the policy in units of critic updates | -+----------------+-------------+------------------------------+------------+-------------------------------------------------------------------------------------------+ -| ``sac`` | off policy | ``batch_size`` | integer | batch size used in training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``nstep`` | integer | number of steps for N-step training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``nstep_reward_reduction`` | string | reduction mode for N-step training (see below) | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``gamma`` | float | discount factor | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``alpha`` | float | entropy regularization coefficient | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``rho`` | boolean | weight average factor for target weights (in some frameworks called rho = 1-tau) | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``policy_lag`` | integer | update frequency for the policy in units of value updates | -+----------------+-------------+------------------------------+------------+-------------------------------------------------------------------------------------------+ -| ``ppo`` | on policy | ``batch_size`` | integer | batch size used in training | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``gae_lambda`` | float | discount factor for General Advantage Estimator | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``epsilon`` | float | clip ratio, policy discrepancy regularization | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``gamma`` | float | discount factor | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``clip_q`` | float | clip range for value function estimate (denoted by `clip_vf` in Stable Baselines) | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``target_kl_divergence`` | float | target KL divergence for KL regularization | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``entropy_loss_coefficient`` | float | entropy loss coefficient: weight for entropy component of the loss function | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``value_loss_coefficient`` | float | value loss coefficient: weight for value estimate component of the loss function | -+ + +------------------------------+------------+-------------------------------------------------------------------------------------------+ -| | | ``normalize_advantage`` | boolean | if set to true, advantage values are normalized over all buffer entries | -+----------------+-------------+------------------------------+------------+-------------------------------------------------------------------------------------------+ ++----------------+-------------+------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| Algorithm Name | Kind | Option | Data Type | Description | ++================+=============+==============================+============+==================================================================================================+ +| ``ddpg`` | off policy | ``batch_size`` | integer | batch size used in training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``nstep`` | integer | number of steps for N-step training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``nstep_reward_reduction`` | string | reduction mode for N-step training (see below) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``gamma`` | float | discount factor | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``rho`` | float | weight average factor for target weights (in some frameworks called rho = 1-tau) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_states`` | boolean | enable online per-feature normalization of observations to zero mean and unit variance | +| | | | | using a running Welford estimator (default = ``false``) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_rewards`` | boolean | enable running std normalization of rewards (scale only, mean preserved) (default = ``false``) | ++----------------+-------------+------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| ``td3`` | off policy | ``batch_size`` | integer | batch size used in training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``nstep`` | integer | number of steps for N-step training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``nstep_reward_reduction`` | string | reduction mode for N-step training (see below) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``gamma`` | float | discount factor | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``rho`` | float | weight average factor for target weights (in some frameworks called rho = 1-tau) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``num_critics`` | integer | number of critic networks used | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``policy_lag`` | integer | update frequency for the policy in units of critic updates | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_states`` | boolean | enable online per-feature normalization of observations to zero mean and unit variance | +| | | | | using a running Welford estimator (default = ``false``) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_rewards`` | boolean | enable running std normalization of rewards (scale only, mean preserved) (default = ``false``) | ++----------------+-------------+------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| ``sac`` | off policy | ``batch_size`` | integer | batch size used in training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``nstep`` | integer | number of steps for N-step training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``nstep_reward_reduction`` | string | reduction mode for N-step training. Note: only ``sum``, ``mean``, and ``weighted_mean`` are | +| | | | | supported for SAC; the ``_no_skip`` variants are not available | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``gamma`` | float | discount factor | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``rho`` | float | weight average factor for target weights (in some frameworks called rho = 1-tau) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``num_critics`` | integer | number of critic networks used (default = ``2``) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``alpha`` | float | initial entropy regularization coefficient (default = ``0.0``, i.e. disabled) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``target_entropy`` | float | target entropy for automatic alpha tuning; positive values trigger the heuristic | +| | | | | ``-action_dim`` (default = ``1.0``, i.e. use heuristic) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_states`` | boolean | enable online per-feature normalization of observations to zero mean and unit variance | +| | | | | using a running Welford estimator (default = ``false``) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_rewards`` | boolean | enable running std normalization of rewards (scale only, mean preserved) (default = ``false``). | +| | | | | **Strongly recommended** when using ``alpha_optimizer``: reward normalization keeps Q-values | +| | | | | on a consistent scale, making the automatic entropy tuning robust across tasks with different | +| | | | | reward magnitudes. | ++----------------+-------------+------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| ``ppo`` | on policy | ``batch_size`` | integer | batch size used in training | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``gae_lambda`` | float | discount factor for General Advantage Estimator | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``epsilon`` | float | clip ratio, policy discrepancy regularization | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``gamma`` | float | discount factor | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``clip_q`` | float | clip range for value function estimate (denoted by ``clip_vf`` in Stable Baselines) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``target_kl_divergence`` | float | target KL divergence for early stopping of gradient steps | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``entropy_loss_coefficient`` | float | entropy loss coefficient: weight for entropy component of the loss function | +| | | | | (default = ``0.0``; a value of ``0.01`` is a common starting point for discrete action spaces) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``value_loss_coefficient`` | float | value loss coefficient: weight for value estimate component of the loss function | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_advantage`` | boolean | normalize advantage values over the full rollout before mini-batch training (default = ``true``) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_states`` | boolean | enable online per-feature normalization of observations to zero mean and unit variance | +| | | | | using a running Welford estimator (default = ``false``) | ++ + +------------------------------+------------+--------------------------------------------------------------------------------------------------+ +| | | ``normalize_returns`` | boolean | enable running std normalization of GAE returns (scale only, mean preserved). Also scales | +| | | | | advantages by the same factor for consistency. Applied before ``normalize_advantage`` | +| | | | | (default = ``false``) | ++----------------+-------------+------------------------------+------------+--------------------------------------------------------------------------------------------------+ The parameter ``nstep_reward_reduction`` defines how the reward is accumulated over N-step rollouts. The options are summarized in a table below (:math:`N` is the value from parameter ``nstep`` described above): @@ -426,71 +456,83 @@ The block in the configuration file defining actor properties takes the followin The following table lists the available options for every action type for ``ddpg`` and ``td3`` algorithms: -+----------------------------------------------+-------------------+------------+-------------------------------------------------------------------+ -| Actor Type | Option | Data Type | Description | -+==============================================+===================+============+===================================================================+ -| ``space_noise`` or ``parameter_noise`` | ``a_low`` | float | lower bound for action value | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``a_high`` | float | upper bound for action value | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``clip`` | float | clip value for training noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``sigma_train`` | float | standard deviation for gaussian training noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``sigma_explore`` | float | standard deviation for gaussian exploration noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``adaptive`` | bool | flag to specify whether the standard deviation should be adaptive | -+----------------------------------------------+-------------------+------------+-------------------------------------------------------------------+ -| ``space_noise_ou`` or ``parameter_noise_ou`` | ``a_low`` | float | lower bound for action value | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``a_high`` | float | upper bound for action value | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``clip`` | float | clip value for training noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``sigma_train`` | float | standard deviation for Ornstein-Uhlenbeck training noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``sigma_explore`` | float | standard deviation for Ornstein-Uhlenbeck exploration noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``xi`` | float | mean reversion parameter for Ornstein-Uhlenbeck noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``dt`` | float | time-step parameter for Ornstein-Uhlenbeck noise | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``adaptive`` | bool | flag to specify whether the standard deviation should be adaptive | -+----------------------------------------------+-------------------+------------+-------------------------------------------------------------------+ -| ``gaussian_ac`` | ``a_low`` | float | lower bound for action value | -+ +-------------------+------------+-------------------------------------------------------------------+ -| | ``a_high`` | float | upper bound for action value | -+----------------------------------------------+-------------------+------------+-------------------------------------------------------------------+ - -The meaning for most of these parameters should be evident from looking at the details of the implementations for the various RL algorithms linked above. ++----------------------------------------------+-------------------+------------+------------------------------------------------------------------------------------------------------+ +| Actor Type | Option | Data Type | Description | ++==============================================+===================+============+======================================================================================================+ +| ``space_noise`` or ``parameter_noise`` | ``a_low`` | float | lower bound for action value | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``a_high`` | float | upper bound for action value | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``clip`` | float | clip magnitude for target policy smoothing noise, i.e. :math:`\varepsilon \sim | +| | | | \mathrm{clip}(\mathcal{N}(0,\sigma_\mathrm{train}), -\mathrm{clip}, \mathrm{clip})`. | +| | | | TD3 paper recommends ``0.5`` | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``sigma_train`` | float | standard deviation for **target policy smoothing** noise (TD3 only): noise added to the target | +| | | | actor when computing Bellman targets, not during rollout collection. TD3 paper recommends ``0.2``. | +| | | | For DDPG, this parameter is unused as DDPG does not use target policy smoothing. | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``sigma_explore`` | float | standard deviation for exploration noise added to the live policy during rollout collection. | +| | | | TD3 paper recommends ``0.1`` | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``adaptive`` | bool | flag to specify whether the standard deviation should be adaptive | ++----------------------------------------------+-------------------+------------+------------------------------------------------------------------------------------------------------+ +| ``space_noise_ou`` or ``parameter_noise_ou`` | ``a_low`` | float | lower bound for action value | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``a_high`` | float | upper bound for action value | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``clip`` | float | clip magnitude for target policy smoothing noise (see above) | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``sigma_train`` | float | standard deviation for Ornstein-Uhlenbeck target policy smoothing noise (see above). | +| | | | **Warning (TD3 only):** OU noise is temporally correlated and violates the i.i.d. assumption | +| | | | required by TD3 target policy smoothing. Prefer ``space_noise`` for this purpose. | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``sigma_explore`` | float | standard deviation for Ornstein-Uhlenbeck exploration noise during rollout collection | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``xi`` | float | mean reversion parameter for Ornstein-Uhlenbeck noise | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``dt`` | float | time-step parameter for Ornstein-Uhlenbeck noise | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``adaptive`` | bool | flag to specify whether the standard deviation should be adaptive | ++----------------------------------------------+-------------------+------------+------------------------------------------------------------------------------------------------------+ +| ``gaussian_ac`` or ``squashed_gaussian_ac`` | ``a_low`` | float | lower bound for action value | ++ +-------------------+------------+------------------------------------------------------------------------------------------------------+ +| | ``a_high`` | float | upper bound for action value | ++----------------------------------------------+-------------------+------------+------------------------------------------------------------------------------------------------------+ + +The meaning for most of these parameters should be evident from looking at the details of the implementations for the various RL algorithms linked above. However, some parameters require a more detailed explanation: in general, the suffix ``_ou`` refers to stateful noise of Ornstein-Uhlenbeck type with zero drift. This noise type is often used if correlation between time steps is desired and thus popular in reinforcement learning. Check out the `wikipedia page `_ for details. -The prefix ``space`` refers to applying the noise to the predicted ation directly. For example, if :math:`p` is our (deterministic) policy function, an exploration action using space noise type is obtained by computing +The prefix ``space`` refers to applying the noise to the predicted action directly. For example, if :math:`p` is our (deterministic) policy function, an exploration action using space noise type is obtained by computing .. math:: - \tilde{a} = \mathrm{clip}(p(\theta, s) + \mathcal{N}(0,\sigma_\mathrm{explore}), a_\mathrm{low}, a_\mathrm{high}) - -for any input state :math:`s` and policy weights :math:`\theta`. In case of parameter noise, the noise will be applied to each weight of :math:`p` instead. Hence, the noised action is computed via + \tilde{a} = \mathrm{clip}(p(\theta, s) + \mathcal{N}(0,\sigma_\mathrm{explore}), a_\mathrm{low}, a_\mathrm{high}) + +for any input state :math:`s` and policy weights :math:`\theta`. In case of parameter noise, the noise will be applied to each weight of :math:`p` instead. Hence, the noised action is computed via .. math:: - \tilde{a} = \mathrm{clip}(p(\theta + \mathcal{N}(0,\sigma_\mathrm{explore}), s), a_\mathrm{low}, a_\mathrm{high}) - + \tilde{a} = \mathrm{clip}(p(\theta + \mathcal{N}(0,\sigma_\mathrm{explore}), s), a_\mathrm{low}, a_\mathrm{high}) + The parameter ``adaptive`` specifies whether the noise variance :math:`\sigma` should be taken relative to the magnitude of the action magnitudes or weight magnitudes for space and parameter noise respectively. In terms of the former, this would mean that .. math:: - + a &= p(\theta, s) - - \tilde{a} &= \mathrm{clip}(a + \sigma_\mathrm{explore}\,\mathcal{N}(0,\|a\|), a_\mathrm{low}, a_\mathrm{high}) + + \tilde{a} &= \mathrm{clip}(a + \sigma_\mathrm{explore}\,\mathcal{N}(0,\|a\|), a_\mathrm{low}, a_\mathrm{high}) and analogous for parameter noise. Whichever noise type and parameters are the best highly depends on the behavior of the environment and therefore we cannot give a general recommendation. -For algorithm type ``sac``, only action bounds are supported as the noise is built into the algorithm and cannot be customized. -For algorithm type ``ppo``, ``gaussian_ac`` is the only supported actor type. +.. note:: + + **TD3 target policy smoothing:** ``sigma_train`` and ``clip`` control the noise added to the *target* actor when computing Bellman targets — this is TD3's target policy smoothing regularization, not noise applied during rollout collection. These two roles (target smoothing vs. exploration) are intentionally separate and should be tuned independently. For DDPG, ``sigma_train`` has no effect as DDPG does not use target policy smoothing. + +For algorithm type ``sac``, only action bounds are required as the stochastic policy with squashed Gaussian noise is built into the algorithm. The actor type for SAC is always ``gaussian`` (squashed Gaussian policy) and cannot be customized. + +For algorithm type ``ppo``, two actor types are supported: ``gaussian_ac`` uses a standard Gaussian policy with action clipping, while ``squashed_gaussian_ac`` uses a squashed (tanh-bounded) Gaussian policy with action scaling — the latter is recommended when the action space requires strict bounds. Policy and Critic Properties ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ @@ -605,7 +647,31 @@ The block configuration for DDPG and TD3 looks as follows: parameters: