diff --git a/README.md b/README.md
index e245989c..e0477c41 100644
--- a/README.md
+++ b/README.md
@@ -17,6 +17,8 @@ This repository is devoted to **QligFEP**, an automated workflow for small molec
- [Compiling Q for local use (non-MPI)](#compiling-q-for-local-use-non-mpi)
- [Setting up HPC configurations](#setting-up-hpc-configurations)
- [⌨️ Command line interface (CLI)](#️-command-line-interface-cli)
+- [Tutorials](#tutorials)
+ - [Non-equilibrium FEP (NEQ²)](#non-equilibrium-fep-neq2)
- [📊 Benchmarking](#-benchmarking)
- [📚 Citations](#-citations)
- [⏩ Q-GPU](#-q-gpu)
@@ -142,30 +144,48 @@ Now you're set with the qligfep package. This includes the command-linde-interfa
5. `qlomap`: wraps `Lomap` to generate the `.json` perturbation mapping;
6. `qmapfep`: in-house developed method to generate the `.json` perturbation mapping, interactively visualize and add or remove edges.
7. `qligfep`: main CLI for running QligFEP simulations.
-8. `setupFEP`: sets up all the the QligFEP files for a simulation, including protein and water systems.
+8. `setupFEP`: sets up all the QligFEP files for a simulation, including protein and water systems. Pass `--neq` to set up the non-equilibrium (NEQ²) workflow instead of the windowed one.
9. `qligfep_analyze`: CLI to analyze the results of a QligFEP simulation.
10. `ligalign`: aligns a set of ligands to a reference ligand based on their maximum common substructure (MCS).
+11. `qligfep_neq_analyze`: CLI to analyze the results of a non-equilibrium (NEQ²) QligFEP simulation.
## Tutorials
-We are working on the documentation and tutorials for QligFEP. In the meantime, please refer to the Tyk2 case study available in the [tutorials directory](/tutorials/Tyk2/README.md). In addition to that, you can check the [benchmarking section](#-benchmarking) below, which contains the link to our benchmarking repository with scripts to reproduce the results.
+We are working on the documentation and tutorials for QligFEP. In the meantime, please refer to the Tyk2 case study available in the [tutorials directory](/tutorials/Tyk2/README.md). A dedicated [non-equilibrium (NEQ²) tutorial](/tutorials/Tyk2/neq2/README.md) walks through the NEQ² workflow end to end. In addition to that, you can check the [benchmarking section](#-benchmarking) below, which contains the link to our benchmarking repository with scripts to reproduce the results.
-# 📊 Benchmarking
+
-To check and reproduce QligFEP performance results, please refer to our [benchmarking repository](https://github.com/qusers/qligfepv2-BenchmarkExperiments).
+### Non-equilibrium FEP (NEQ²)
-For the preprint describing the benchmarking results, see:
+Alongside the standard windowed (equilibrium) protocol, QligFEP supports a **non-equilibrium** alchemical workflow, referred to as **NEQ²**. Rather than sampling many fixed-λ windows, NEQ² drives λ continuously from one end state to the other over many short, independent switching trajectories, and recovers ΔΔG from the Bennett Acceptance Ratio (BAR) over the resulting forward and reverse work distributions. Because the switching trajectories are independent, they parallelize trivially across a cluster.
-> Alencar Araripe D, Díaz Holguín A, Poso A, van Westen GJP, Åqvist J, Gutiérrez-de-Terán H, et al. Doing More with Less: Accurate and Scalable Ligand Free Energy Calculations by Focusing on the Binding Site. ChemRxiv. 2025; [doi:10.26434/chemrxiv-2025-x3r3z](https://doi.org/10.26434/chemrxiv-2025-x3r3z-v3)
+Set up a non-equilibrium calculation by passing `--neq` to `setupFEP`, and analyze the accumulated switching work with the `qligfep_neq_analyze` CLI. The non-equilibrium engine (`qdyn_neq`) is built together with the other Q binaries by `make all` in `src/q6`. See the [Tyk2 NEQ² tutorial](/tutorials/Tyk2/neq2/README.md) for an end-to-end walkthrough.
-# 📚 Citations
-Q6: https://doi.org/10.1016/j.softx.2017.12.001
+# 📊 Benchmarking
-Q https://doi.org/10.1016/S1093-3263(98)80006-5
+To check and reproduce QligFEP performance results, please refer to our [benchmarking repository](https://github.com/qusers/qligfepv2-BenchmarkExperiments).
-QligFEP: https://doi.org/10.1186/s13321-019-0348-5
+# 📚 Citations
+
+To cite the latest version of QligFEP, cite:
+```bibtex
+@article{araripe2026qligfepv2,
+ author = {Alencar Araripe, David and Díaz-Holguín, Alejandro and Poso, Antti and van Westen, Gerard J. P. and Åqvist, Johan and Gutiérrez-de-Terán, Hugo and Jespers, Willem},
+ title = {Doing More with Less: Accurate and Scalable Ligand Free Energy Calculations by Focusing on the Binding Site},
+ journal = {Journal of Chemical Information and Modeling},
+ year = {2026},
+ volume = {66},
+ number = {6},
+ pages = {3164--3172},
+ doi = {10.1021/acs.jcim.5c02932},
+ url = {https://doi.org/10.1021/acs.jcim.5c02932},
+}
+```
+**Other relevant references:**
-QresFEP: https://doi.org/10.1021/acs.jctc.9b00538
+- Q: https://doi.org/10.1016/S1093-3263(98)80006-5
+- QligFEP: https://doi.org/10.1186/s13321-019-0348-5
+- QresFEP: https://doi.org/10.1021/acs.jctc.9b00538
# ⏩ Q-GPU
diff --git a/pyproject.toml b/pyproject.toml
index 5bc58c8c..9ae4ad51 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -32,14 +32,24 @@ dependencies = [
"tqdm",
"py3Dmol",
"numpy",
+ "pandas",
"matplotlib",
"rdkit",
"scikit-learn",
"loguru",
- "cinnabar",
"MolClusterkit @ git+https://github.com/David-Araripe/MolClusterkit.git#master",
]
+[project.optional-dependencies]
+test = ["pytest"]
+
+[tool.pytest.ini_options]
+testpaths = ["test"]
+filterwarnings = [
+ # third-party (gufe/pydantic) deprecation surfaced when importing the package
+ "ignore:.*pydantic.config.Extra.*:DeprecationWarning",
+]
+
[tool.setuptools]
include-package-data = true
@@ -106,4 +116,5 @@ qmapfep = "QligFEP.CLI.qmapfep_cli:main_exe"
qligfep = "QligFEP.CLI.qligfep_cli:main_exe"
setupFEP = "QligFEP.CLI.setupFEP:main_exe"
qligfep_analyze = "QligFEP.analyze_FEP:main_exe"
+qligfep_neq_analyze = "QligFEP.analyze_neq:main_exe"
ligalign = "QligFEP.CLI.lig_align_cli:main_exe"
\ No newline at end of file
diff --git a/src/QligFEP/CLI/lomap_wrap_cli.py b/src/QligFEP/CLI/lomap_wrap_cli.py
index cdcbc8a1..5fac38ff 100644
--- a/src/QligFEP/CLI/lomap_wrap_cli.py
+++ b/src/QligFEP/CLI/lomap_wrap_cli.py
@@ -19,9 +19,18 @@
class LomapWrap:
"""Class to wrap the lomap package for the QligFEP CLI."""
- def __init__(self, inp: str, out: Optional[str] = None, time=30, verbose="info", **kwargs):
+ def __init__(
+ self,
+ inp: str,
+ out: Optional[str] = None,
+ time=30,
+ verbose="info",
+ exp_key: Optional[str] = None,
+ **kwargs,
+ ):
self.nodes = {}
self.inp = inp
+ self.exp_key = exp_key
self.out = self._parse_output(out)
self.cores = self._setup_cores()
self.lomap_args = {
@@ -158,17 +167,24 @@ def run_lomap(self) -> None:
db_mol = lomap.DBMolecules(**self.lomap_args)
rundir = db_mol.options["directory"]
- extra_data_numerical = []
for lomap_mol in db_mol._list:
mol = lomap_mol.getMolecule()
molpath = Path(rundir) / lomap_mol.getName()
extra_data = self.extract_user_defined_properties(molpath)
- extra_data_numerical.extend([k for k, v in extra_data.items() if isinstance(v, (int, float))])
name = molpath.stem # remove the file extension
smiles = Chem.MolToSmiles(mol)
formal_charge = Chem.GetFormalCharge(mol)
self.nodes.update({name: {"smiles": smiles, "formal_charge": formal_charge, **extra_data}})
- extra_data_numerical = list(set(extra_data_numerical)) # keep unique
+
+ # Rename experimental key to standardized dg_value on nodes
+ if self.exp_key:
+ for name, node in self.nodes.items():
+ if self.exp_key not in node:
+ raise KeyError(
+ f"exp_key '{self.exp_key}' not found in node '{name}'. "
+ f"Available keys: {', '.join(k for k in node if k not in ('smiles', 'formal_charge'))}"
+ )
+ node["dg_value"] = node.pop(self.exp_key)
# Calculate the similarity matrices
strict, loose = db_mol.build_matrices()
@@ -180,25 +196,11 @@ def run_lomap(self) -> None:
for edge in result_dict["edges"]:
_from = edge["from"]
_to = edge["to"]
- # check for the same formal charge
same = self.nodes[_to]["formal_charge"] == self.nodes[_from]["formal_charge"]
- for key in extra_data_numerical:
- try:
- delta = self.nodes[_from][key] - self.nodes[_to][key]
- edge.update({f"delta_{key}": delta})
- except TypeError:
- logger.warning(
- f"The {key} property is not numerical for one the ligands: {_from} | {_to}"
- )
- edge.update({f"delta_{key}": None})
- except KeyError:
- logger.info(
- f"The {key} property is not present for one the ligands: {_from} | {_to} "
- "Information won't be kept in edge..."
- )
+ if self.exp_key:
+ edge["ddg_value"] = self.nodes[_to]["dg_value"] - self.nodes[_from]["dg_value"]
edge.update({"same_charge": same})
same_charges.append(same)
- # update the potential ddG value
if all(same_charges):
logger.info("All ligands have the same formal charge.")
else:
@@ -234,12 +236,19 @@ def parse_arguments() -> argparse.Namespace:
"--time", "-t", type=int, default=30, help="Maximum time in seconds used to perform the MCS search."
)
parser.add_argument("--verbose", "-v", type=str, default="info", help="Verbosity level.")
+ parser.add_argument(
+ "--exp-key",
+ "-exp",
+ type=str,
+ default=None,
+ help="SDF property containing experimental dG. Stored as dg_value on nodes, ddg_value on edges.",
+ )
return parser.parse_args()
def main(args):
# TODO: could implement other lomap arguemnts here
- lomap = LomapWrap(args.input, args.output, args.time, args.verbose)
+ lomap = LomapWrap(args.input, args.output, args.time, args.verbose, exp_key=args.exp_key)
lomap.run_lomap()
diff --git a/src/QligFEP/CLI/parser_base.py b/src/QligFEP/CLI/parser_base.py
index 94461b33..d840e672 100644
--- a/src/QligFEP/CLI/parser_base.py
+++ b/src/QligFEP/CLI/parser_base.py
@@ -227,6 +227,88 @@ def parse_arguments(program: str) -> argparse.Namespace:
"Reproducible random state for the random FEP seed generator. Defaults to None (random FEP seeds)."
),
)
+ parser.add_argument(
+ "-neq",
+ "--neq",
+ dest="neq",
+ action="store_true",
+ help=(
+ "Set up a non-equilibrium (NEQ) FEP instead of the windowed equilibrium approach. "
+ "Instead of many fixed-lambda windows, NEQ runs the `qdyn_neq` engine to drive lambda "
+ "from one endpoint to the other over a single simulation, accumulating the switching "
+ "work. Free energies are obtained from BAR over the forward/reverse work distributions "
+ "(see `qligfep_neq_analyze`). When set, the windowed parameters `--windows` and "
+ "`--sampling` are not used."
+ ),
+ )
+ parser.add_argument(
+ "-neqr",
+ "--neq-reps",
+ dest="neq_reps",
+ type=int,
+ default=5,
+ help=(
+ "NEQ only: number of forward/reverse switching pairs run per replicate. Each replicate "
+ "keeps a continuous endpoint equilibration and fires a forward and reverse switch from "
+ "successive snapshots to decorrelate the work samples. Defaults to 5."
+ ),
+ )
+ parser.add_argument(
+ "-neqs",
+ "--neq-steps",
+ dest="neq_steps",
+ type=int,
+ default=50000,
+ help=(
+ "NEQ only: length of each lambda-switching simulation in MD steps. Recommended >16000. "
+ "Defaults to 50000."
+ ),
+ )
+ parser.add_argument(
+ "-neqes",
+ "--neq-eq-steps",
+ dest="neq_eq_steps",
+ type=int,
+ default=1000,
+ help=(
+ "NEQ only: number of endpoint equilibration steps between successive switches "
+ "(spacing). Recommended >250. Defaults to 1000."
+ ),
+ )
+ parser.add_argument(
+ "-neqrs",
+ "--neq-relax-steps",
+ dest="neq_relax_steps",
+ type=int,
+ default=5000,
+ help=(
+ "NEQ only: length (MD steps) of the one-time endpoint relaxation run at lambda=0 "
+ "and lambda=1 before the first switch, to settle the nearly-decoupled ligand at each "
+ "endpoint. Applied to the first switching iteration; later iterations use the shorter "
+ "--neq-eq-steps (tEQ) spacing. ~10 ps at 2 fs. Defaults to 5000."
+ ),
+ )
+ parser.add_argument(
+ "-L",
+ "--neq-steepness",
+ dest="neq_L",
+ type=float,
+ default=8.0,
+ help=(
+ "NEQ only: steepness `L` of the sigmoidal lambda schedule l(t) = 1/[1+e^(L(t-0.5))]. "
+ "Higher L spends more time near lambda=0 and lambda=1; lower L approaches a linear "
+ "schedule. Recommended between 4 and 16. Defaults to 8. Ignored when "
+ "`--neq-schedule linear` is used."
+ ),
+ )
+ parser.add_argument(
+ "-neqsched",
+ "--neq-schedule",
+ dest="neq_schedule",
+ default="sigmoidal",
+ choices=["sigmoidal", "linear"],
+ help="NEQ only: lambda switching schedule. Defaults to `sigmoidal`.",
+ )
parser.add_argument(
"-log",
"--log-level",
diff --git a/src/QligFEP/CLI/qligfep_cli.py b/src/QligFEP/CLI/qligfep_cli.py
index 7421a966..04baa9c2 100644
--- a/src/QligFEP/CLI/qligfep_cli.py
+++ b/src/QligFEP/CLI/qligfep_cli.py
@@ -41,6 +41,13 @@ def main(args: Optional[argparse.Namespace] = None, **kwargs) -> None:
"dr_force": args.dr_force,
"random_state": args.random_state,
"wath_ligand_only": args.wath_ligand_only,
+ "neq": args.neq,
+ "neq_reps": args.neq_reps,
+ "neq_steps": args.neq_steps,
+ "neq_eq_steps": args.neq_eq_steps,
+ "neq_relax_steps": args.neq_relax_steps,
+ "neq_L": args.neq_L,
+ "neq_schedule": args.neq_schedule,
}
else:
param_dict = {}
@@ -65,6 +72,13 @@ def main(args: Optional[argparse.Namespace] = None, **kwargs) -> None:
command_str += f" --{k}".replace("_", "-")
elif k == "dr_force":
command_str += f" --{k} {v}".replace("dr_force", "distance_restraint_force")
+ elif k == "neq":
+ if v:
+ command_str += " --neq"
+ elif k == "neq_L":
+ command_str += f" --neq-steepness {v}"
+ elif k in ("neq_reps", "neq_steps", "neq_eq_steps", "neq_relax_steps", "neq_schedule"):
+ command_str += f" --{k.replace('_', '-')} {v}"
else:
command_str += f" --{k} {v}"
command_str += f" --restraint_method {args.restraint_method}"
@@ -94,8 +108,9 @@ def main(args: Optional[argparse.Namespace] = None, **kwargs) -> None:
run.write_water_pdb(inputdir)
- logger.debug("Getting the lambdas")
- lambdas = run.get_lambdas(args.windows, args.sampling)
+ if not run.neq:
+ logger.debug("Getting the lambdas")
+ lambdas = run.get_lambdas(args.windows, args.sampling)
logger.debug("Writing atom mapping for distance restraints")
run.avoid_water_protein_clashes(
@@ -109,6 +124,15 @@ def main(args: Optional[argparse.Namespace] = None, **kwargs) -> None:
run.write_FEP_file(change_charges, change_vdw, FEP_vdw, inputdir, lig_size1, lig_size2)
overlapping_atoms = run.set_restraints(writedir, args.restraint_method, strict_check=True)
+ if run.neq:
+ logger.debug("Writing the non-equilibrium MD files")
+ file_list = run.write_MD_neq(inputdir, lig_size1, lig_size2, overlapping_atoms)
+ run.write_neq_runfile(inputdir, file_list)
+ logger.debug(f"Generated files: {file_list}")
+ logger.debug("Writing the submit files")
+ run.write_submitfile(writedir)
+ return
+
# Handling the correct offset here
logger.debug("Writing the MD files")
if args.start == "0.5":
diff --git a/src/QligFEP/CLI/setupFEP.py b/src/QligFEP/CLI/setupFEP.py
index 9dc29f44..41996afc 100644
--- a/src/QligFEP/CLI/setupFEP.py
+++ b/src/QligFEP/CLI/setupFEP.py
@@ -43,6 +43,12 @@ def create_call(**kwargs):
template += " -wath {water_thresh}"
if "wath_ligand_only" in kwargs and kwargs["wath_ligand_only"]:
template += " -wath-ligo"
+ if kwargs.get("neq"):
+ template += (
+ " --neq --neq-reps {neq_reps} --neq-steps {neq_steps} "
+ "--neq-eq-steps {neq_eq_steps} --neq-relax-steps {neq_relax_steps} "
+ "-L {neq_L} --neq-schedule {neq_schedule}"
+ )
return template.format(**kwargs)
@@ -120,6 +126,13 @@ def main(args: Optional[argparse.Namespace] = None, **kwargs) -> None:
water_thresh=args.water_thresh,
log=args.log,
wath_ligand_only=args.wath_ligand_only,
+ neq=args.neq,
+ neq_reps=args.neq_reps,
+ neq_steps=args.neq_steps,
+ neq_eq_steps=args.neq_eq_steps,
+ neq_relax_steps=args.neq_relax_steps,
+ neq_L=args.neq_L,
+ neq_schedule=args.neq_schedule,
)
logger.info(f"Submitting the command:\n{command}")
dst = sys_dir / f"FEP_{lig1}_{lig2}"
diff --git a/src/QligFEP/INPUTS/neq_endpoint.inp b/src/QligFEP/INPUTS/neq_endpoint.inp
new file mode 100644
index 00000000..c208367d
--- /dev/null
+++ b/src/QligFEP/INPUTS/neq_endpoint.inp
@@ -0,0 +1,49 @@
+[MD]
+steps STEPS_VAR
+stepsize STEPSIZE
+temperature T_VAR
+bath_coupling 10.0
+shake_hydrogens STEPTOGGLE
+shake_solute STEPTOGGLE
+shake_solvent on
+lrf on
+separate_scaling on
+
+[cut-offs]
+solute_solvent 10
+solute_solute 10
+solvent_solvent 10
+q_atom 99
+lrf 99
+
+[sphere]
+shell_force 10.0
+shell_radius SPHERE
+
+[solvent]
+radial_force 60.0
+polarisation on
+polarisation_force 20.0
+
+[intervals]
+output OUTPUT_VAR
+trajectory 100000000
+non_bond 25
+
+[files]
+topology dualtop.top
+trajectory neq.dcd
+restart RESTARTFILE
+final FINALFILE
+fep FEP_VAR
+
+[trajectory_atoms]
+not excluded
+
+[lambdas]
+EQ_LAMBDA
+
+[sequence_restraints]
+WATER_RESTRAINT
+
+[distance_restraints]
diff --git a/src/QligFEP/INPUTS/run_neq.sh b/src/QligFEP/INPUTS/run_neq.sh
new file mode 100644
index 00000000..61b1829d
--- /dev/null
+++ b/src/QligFEP/INPUTS/run_neq.sh
@@ -0,0 +1,147 @@
+#!/bin/bash
+#
+#SBATCH --nodes=NODES
+#SBATCH --ntasks-per-node=NTASKS
+#SBATCH --mem-per-cpu=1700 # stay at/below the per-core memory so billing matches the core count
+#SBATCH -A ACCOUNT
+# d-hh:mm:ss
+#SBATCH --time=TIME
+#SBATCH -J JOBNAME
+#SBATCH --array=1-TOTAL_JOBS
+#SBATCH -o slurm.run%a.%N.%j.out
+
+# Number of forward/reverse switching pairs run per replicate.
+neq_reps=NEQ_REPS
+
+# Define your parameters
+temperatures=(TEMP_VAR)
+seeds=(RANDOM_SEEDS)
+runs=${#seeds[@]}
+workdir="$( cd -P "$( dirname "$SOURCE" )" && pwd )"
+inputfiles=$workdir/inputfiles
+fepfile=FEPS
+ncores=$SLURM_NTASKS
+
+# Debug prints
+echo "Number of temperatures: ${#temperatures[@]}"
+echo "Number of runs: $runs"
+echo "Array task ID: $SLURM_ARRAY_TASK_ID"
+echo "Cores available: $ncores"
+
+# Validate inputs
+if [ -z "$runs" ] || [ "$runs" -eq 0 ]; then
+ echo "Error: 'runs' variable is not set or is zero"
+ exit 1
+fi
+if [ ${#temperatures[@]} -eq 0 ]; then
+ echo "Error: No temperatures specified"
+ exit 1
+fi
+
+# Calculate which temperature and replicate this array task corresponds to
+TID=$((SLURM_ARRAY_TASK_ID - 1)) # 0-based
+temp_idx=$((TID / runs))
+run_num=$((TID % runs + 1))
+temperature=${temperatures[$temp_idx]}
+seed=${seeds[$run_num-1]}
+
+## Load modules
+MODULES
+
+## define the MPI equilibration engine (qdynp) and serial switching engine (qdyn_neq)
+QDYN
+QDYN_NEQ
+
+starttime=$(date +%s)
+starttime_readable=$(date)
+
+rundir=$workdir/$temperature/$run_num
+mkdir -p $rundir
+cd $rundir || exit
+
+echo "Running NEQ in $rundir"
+echo "Parameters T=$temperature, replicate=$run_num, seed=$seed, neq_reps=$neq_reps"
+echo
+
+cp $inputfiles/eq*.inp .
+cp $inputfiles/relax_*.inp .
+cp $inputfiles/neq_*.inp .
+cp $inputfiles/*.top .
+cp $inputfiles/$fepfile .
+
+# Custom random seed, temperature and FEP file for this realization
+sed -i "s/SEED_VAR/$seed/" eq1.inp
+sed -i "s/T_VAR/$temperature/" *.inp
+sed -i "s/FEP_VAR/$fepfile/" *.inp
+
+# 1) Equilibration eq1 -> eq5 with the MPI engine across all cores (fixed lambda)
+for i in 1 2 3 4 5; do
+ time mpirun -np $ncores --bind-to core $qdyn eq$i.inp > eq$i.log
+done
+
+# 2) Endpoint equilibration chain (MPI): one continuous trajectory per endpoint,
+# saving a checkpoint per replicate to decorrelate the switch starting points.
+# The first iteration is a longer one-time relaxation (relax_${s}.inp) that settles the
+# nearly-decoupled ligand at each endpoint before any switching; later iterations use the
+# shorter tEQ spacing (eq6_${s}.inp). State 0 sits at lambda (0 1), state 1 at (1 0).
+cp eq5.re eq6_0_prev.re
+cp eq5.re eq6_1_prev.re
+for rep in $(seq 0 $((neq_reps - 1))); do
+ for s in 0 1; do
+ if [ "$rep" -eq 0 ]; then eqsrc=relax_${s}.inp; else eqsrc=eq6_${s}.inp; fi
+ sed "s|RESTARTFILE|eq6_${s}_prev.re|; s|FINALFILE|eq6_${s}_${rep}.re|" \
+ "$eqsrc" > eq6_${s}_run${rep}.inp
+ time mpirun -np $ncores --bind-to core $qdyn eq6_${s}_run${rep}.inp > eq6_${s}_${rep}.log
+ cp eq6_${s}_${rep}.re eq6_${s}_prev.re
+ done
+done
+
+# 3) Lambda switches with the serial engine, one per core. Each switch only needs
+# its own eq6 checkpoint, so they are independent and run concurrently: mpirun
+# launches one rank per switch, --bind-to core pins each to its own core, and the
+# launcher routes each rank to its (input, log) pair. State 1 (1->0) is the forward
+# work logged as neq_1; state 0 (0->1) is the reverse work logged as neq_0.
+: > switch_list.txt
+for rep in $(seq 0 $((neq_reps - 1))); do
+ for s in 0 1; do
+ sed "s|RESTARTFILE|eq6_${s}_${rep}.re|; s|FINALFILE|neq_${s}_${rep}.re|" \
+ neq_${s}.inp > neq_${s}_run${rep}.inp
+ echo "neq_${s}_run${rep}.inp neq_${s}_${rep}.log" >> switch_list.txt
+ done
+done
+
+cat > neq_launch.sh < "\$2"
+EOF
+chmod +x neq_launch.sh
+
+nsw=$(wc -l < switch_list.txt)
+for (( off=0; off dict:
+ """Point estimate and bootstrap confidence interval for a comparison statistic.
+
+ Resamples the paired ``(y_true, y_pred)`` values with replacement ``nbootstrap``
+ times and reports the statistic computed on the full data (``mle``) together with
+ the ``ci`` confidence-interval bounds (``low``/``high``). Supported statistics are
+ "RMSE", "MUE" and "KTAU".
+ """
+ if rng is None:
+ rng = np.random.default_rng(12345)
+ compute = {
+ "RMSE": lambda a, b: float(np.sqrt(np.mean((a - b) ** 2))),
+ "MUE": lambda a, b: float(np.mean(np.abs(a - b))),
+ "KTAU": lambda a, b: float(kendalltau(a, b)[0]),
+ }[statistic]
+
+ n = len(y_true)
+ s_n = np.empty(nbootstrap)
+ for replicate in range(nbootstrap):
+ idx = rng.choice(n, size=n, replace=True)
+ s_n[replicate] = compute(y_true[idx], y_pred[idx])
+ s_n.sort()
+
+ low_frac = (1.0 - ci) / 2.0
+ low_idx = int(np.floor(nbootstrap * low_frac))
+ high_idx = min(int(np.ceil(nbootstrap * (1.0 - low_frac))), nbootstrap - 1)
+ return {
+ "mle": compute(y_true, y_pred),
+ "low": float(s_n[low_idx]),
+ "high": float(s_n[high_idx]),
+ }
+
+
+def create_ddG_plot(
+ results_df: pd.DataFrame,
+ margin: float = 1.0,
+ xylims: tuple | None = None,
+ output_path: str | None = None,
+ target_name: str | None = None,
+ savefig: bool = False,
+ font: str | None = None,
+):
+ """Creates the ddG plot for the FEP that has already been analyzed. The plot will
+ show the experimental (X axis) vs mean predicted values (Y axis), with error bars
+ representing the standard error of the mean (SEM). Points are colored based on
+ their deviation from experimental values (blue = 0 deviation, red = 3+ kcal/mol).
+
+ Args:
+ reuslts_df: pd.DataFrame with the results from the FEP, output from `prepare_df`.
+ margin: margin value to be added/subtracted to the max/min values obtained. Defaults to 1.0.
+ xylims: if values are passed, x&y min will be xylims[0] and max will be [1]. Defaults to None.
+ output_path: path to save the plot. If None, the plot will not be saved. Defaults to None.
+ target_name: name of the target protein to be added in the plot. Defaults to None.
+ savefig: if True, will save the plot to the output_path. Defaults to False.
+
+ Returns:
+ the matplotlib figure and axis objects (fig, ax).
+ """
+ fep_names = results_df["fep_name"].values
+ avg_values = results_df["Q_ddG_avg"].values
+ sem_values = results_df["Q_ddG_sem"].values
+ exp_values = results_df["ddg_value"].values
+ nan_val_idxs = np.where(np.isnan(avg_values))[0]
+ if len(nan_val_idxs) > 0:
+ logger.warning(f"Dropping FEPs with nan values: {fep_names[nan_val_idxs]}")
+ mask = ~np.isin(np.arange(len(avg_values)), nan_val_idxs)
+ avg_values = avg_values[mask]
+ sem_values = sem_values[mask]
+ exp_values = exp_values[mask]
+
+ # Calculate absolute deviations for coloring
+ deviations = np.abs(avg_values - exp_values)
+
+ # Create colormap from blue to red, with max at 3 kcal/mol
+ norm = mcolors.Normalize(vmin=0, vmax=4)
+ cmap = cm.RdBu_r
+
+ ## CALCULATE STATISTICS
+ def result_to_latex(res, latexify_each=False): # TODO: move this out of this method?
+ """Round the statistic's output to one decimal case and return a LaTeX string."""
+ mle = round(res["mle"], 2)
+ low = round(res["low"], 2)
+ high = round(res["high"], 2)
+
+ if latexify_each:
+ return f"${mle:.2f}_{{{low}}}^{{{high}}}$"
+ else:
+ return f"{mle:.2f}_{{{low}}}^{{{high}}}"
+
+ statistics = ["RMSE", "MUE", "KTAU"]
+ stats_dict = {}
+ for stat in statistics:
+ boot = bootstrap_statistic(avg_values, exp_values, statistic=stat)
+ stats_dict[stat] = result_to_latex(boot)
+
+ if xylims is not None:
+ assert len(xylims) == 2, "xylims must be a tuple with 2 elements."
+ assert xylims[0] < xylims[1], "xylims[0] must be smaller than xylims[1]."
+ min_val = xylims[0]
+ max_val = xylims[1]
+ else:
+ all_values = np.concatenate([avg_values, exp_values])
+ min_val = min(all_values) - margin
+ max_val = max(all_values) + margin
+
+ fig, ax = plt.subplots(figsize=(7, 4.5))
+
+ # Plot colored points with error bars
+ scatter = plt.errorbar(
+ exp_values,
+ avg_values,
+ fmt="o",
+ yerr=sem_values,
+ ecolor="gray",
+ elinewidth=1.5,
+ capsize=2,
+ zorder=4,
+ linestyle="None",
+ markersize=8,
+ )
+
+ # Remove the default markers and add colored ones
+ scatter[0].remove()
+
+ # Add the colored scatter points
+ scatter_points = plt.scatter(
+ exp_values,
+ avg_values,
+ c=deviations,
+ cmap=cmap,
+ norm=norm,
+ s=45,
+ zorder=5,
+ edgecolors="black",
+ linewidths=0.5,
+ alpha=0.8,
+ )
+
+ plt.plot([min_val, max_val], [min_val, max_val], "k-", linewidth=1.5, zorder=3) # Black identity line
+
+ # Highlight predictions within 1 and 2 kcal/mol of the experimental affinity
+ ax.fill_between(
+ [min_val, max_val],
+ [min_val - 1, max_val - 1],
+ [min_val + 1, max_val + 1],
+ color="darkgray",
+ alpha=0.3,
+ zorder=2,
+ )
+ ax.fill_between(
+ [min_val, max_val],
+ [min_val - 2, max_val - 2],
+ [min_val + 2, max_val + 2],
+ color="lightgray",
+ alpha=0.3,
+ zorder=1,
+ )
+
+ # Add colorbar
+ cbar = plt.colorbar(scatter_points, ax=ax, shrink=0.40, aspect=10, anchor=(0.0, 0.85), pad=0.05)
+
+ cbar.set_label("|Deviation| (kcal/mol)", rotation=270, labelpad=20)
+ cbar.ax.tick_params(labelsize=10)
+ cbar.set_ticks([0, 1, 2, 3, 4])
+ cbar.set_ticklabels(["0", "1", "2", "3", "≥4"])
+ cbar.ax.tick_params(labelsize=10)
+
+ # set labels, make it square and add legend
+ plt.title(
+ f"{(target_name + ' ' if target_name is not None else '')}"
+ r"$\Delta\Delta \text{G}_{\text{BAR}}$ ($\mathrm{N}="
+ f"{len(exp_values)}$)"
+ )
+ plt.xlabel(r"$\Delta\Delta G_{exp} (kcal/mol)$")
+ plt.ylabel(r"$\Delta\Delta G_{calc} (kcal/mol)$")
+ plt.xlim(min_val, max_val)
+ plt.ylim(min_val, max_val)
+ ax.set_aspect("equal", adjustable="box")
+
+ # add statistics to the plot
+ unit = r"\frac{kcal}{mol}"
+ text_body = (
+ f"$\\tau = {stats_dict['KTAU']}$",
+ f"RMSE = ${stats_dict['RMSE']} {unit}$",
+ f"MUE = ${stats_dict['MUE']} {unit}$",
+ )
+ logger.info(f"Stats: {' '.join(text_body)}")
+ hori_height = 0.35
+ spacing = 0.085
+ txt_positions = (
+ (1.04, hori_height),
+ (1.04, hori_height - spacing),
+ (1.04, hori_height - spacing * 2),
+ )
+ for txt_position, body in zip(txt_positions, text_body):
+ plt.text(
+ *txt_position,
+ body,
+ fontsize=12,
+ verticalalignment="bottom",
+ horizontalalignment="left",
+ transform=ax.transAxes,
+ fontproperties=font,
+ )
+
+ legend_elements = [
+ Line2D([0], [0], color="k", linestyle="-", label="Identity line"),
+ Patch(facecolor="darkgray", alpha=0.3, label="Within 1 kcal/mol"),
+ Patch(facecolor="lightgray", alpha=0.3, label="Within 2 kcal/mol"),
+ ]
+
+ ax.legend(
+ handles=legend_elements,
+ bbox_to_anchor=(1.04, 0),
+ loc="lower left",
+ borderaxespad=0,
+ frameon=False,
+ )
+ # Remove top and right spines using matplotlib
+ ax.spines["top"].set_visible(False)
+ ax.spines["right"].set_visible(False)
+ ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.5)
+ ax.set_axisbelow(True)
+
+ if savefig:
+ if output_path is None:
+ output_path = Path().cwd()
+ logger.info("Using default name to save the plot at the current working directory...")
+ fig.savefig(f"{target_name}_ddG_plot.png", dpi=300, bbox_inches="tight")
+ return fig, ax
+ if isinstance(output_path, str):
+ output_path = Path(output_path)
+ assert isinstance(output_path, Path), "output_path must be a string or a Path object."
+ if output_path.is_dir():
+ output_path = output_path / f"{target_name}_ddG_plot.png"
+ logger.info(f"Using default name to save the plot at {output_path}")
+ elif output_path.exists():
+ logger.warning(f"File {output_path} already exists. Overwriting...")
+ fig.savefig(output_path, dpi=300, bbox_inches="tight")
+ return fig, ax
diff --git a/src/QligFEP/analyze_FEP.py b/src/QligFEP/analyze_FEP.py
index b9c945ad..af51df5e 100644
--- a/src/QligFEP/analyze_FEP.py
+++ b/src/QligFEP/analyze_FEP.py
@@ -7,15 +7,10 @@
from pathlib import Path
from typing import Optional
-import matplotlib.cm as cm
-import matplotlib.colors as mcolors
-import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
-from cinnabar import stats as cstats
-from matplotlib.lines import Line2D
-from matplotlib.patches import Patch
+from .analysis_plotting import create_ddG_plot, prepare_df
from .IO import read_qfep, read_qfep_verbose, run_command
from .logger import logger, setup_logger
from .settings.settings import Q_PATHS
@@ -489,234 +484,6 @@ def populate_mapping_dictionary(self, method, output_file: str | Path | None = N
with output_file.open("w") as f:
json.dump(self.mapping_json, f, indent=4)
- @staticmethod
- def prepare_df(json_dict, experimental_data: bool = True):
- pref = "dg" if "dg_error" in json_dict["edges"][0] else "ddg"
- df = pd.DataFrame(json_dict["edges"])
- if experimental_data:
- df = (
- df.assign(
- ddg_value=lambda x: x[pref + "_value"],
- residual=lambda x: x[pref + "_value"] - x["Q_ddG_avg"],
- residual_abs=lambda x: x["residual"].abs(),
- )
- .sort_values("residual_abs", ascending=False)
- .drop(columns="residual_abs")
- )
- df = df.assign(
- fep_name=lambda x: "FEP_" + x["from"] + "_" + x["to"],
- )
- return df
-
- @staticmethod
- def create_ddG_plot(
- results_df: pd.DataFrame,
- margin: float = 1.0,
- xylims: tuple | None = None,
- output_path: str | None = None,
- target_name: str | None = None,
- savefig: bool = False,
- font: str | None = None,
- ):
- """Creates the ddG plot for the FEP that has already been analyzed. The plot will
- show the experimental (X axis) vs mean predicted values (Y axis), with error bars
- representing the standard error of the mean (SEM). Points are colored based on
- their deviation from experimental values (blue = 0 deviation, red = 3+ kcal/mol).
-
- Args:
- reuslts_df: pd.DataFrame with the results from the FEP, output from `prepare_df`.
- margin: margin value to be added/subtracted to the max/min values obtained. Defaults to 1.0.
- xylims: if values are passed, x&y min will be xylims[0] and max will be [1]. Defaults to None.
- output_path: path to save the plot. If None, the plot will not be saved. Defaults to None.
- target_name: name of the target protein to be added in the plot. Defaults to None.
- savefig: if True, will save the plot to the output_path. Defaults to False.
-
- Returns:
- the matplotlib figure and axis objects (fig, ax).
- """
- fep_names = results_df["fep_name"].values
- avg_values = results_df["Q_ddG_avg"].values
- sem_values = results_df["Q_ddG_sem"].values
- exp_values = results_df["ddg_value"].values
- nan_val_idxs = np.where(np.isnan(avg_values))[0]
- if len(nan_val_idxs) > 0:
- logger.warning(f"Dropping FEPs with nan values: {fep_names[nan_val_idxs]}")
- mask = ~np.isin(np.arange(len(avg_values)), nan_val_idxs)
- avg_values = avg_values[mask]
- sem_values = sem_values[mask]
- exp_values = exp_values[mask]
-
- # Calculate absolute deviations for coloring
- deviations = np.abs(avg_values - exp_values)
-
- # Create colormap from blue to red, with max at 3 kcal/mol
- norm = mcolors.Normalize(vmin=0, vmax=4)
- cmap = cm.RdBu_r
-
- ## CALCULATE STATISTICS
- def result_to_latex(res, latexify_each=False): # TODO: move this out of this method?
- """Round cinnabar's output to one decimal case and return a LaTeX string."""
- mle = round(res["mle"], 2)
- low = round(res["low"], 2)
- high = round(res["high"], 2)
-
- if latexify_each:
- return f"${mle:.2f}_{{{low}}}^{{{high}}}$"
- else:
- return f"{mle:.2f}_{{{low}}}^{{{high}}}"
-
- statistics = ["RMSE", "MUE", "KTAU"]
- stats_dict = {}
- for stat in statistics:
- cinnabar_stats = cstats.bootstrap_statistic(avg_values, exp_values, statistic=stat)
- stats_dict[stat] = result_to_latex(cinnabar_stats)
-
- if xylims is not None:
- assert len(xylims) == 2, "xylims must be a tuple with 2 elements."
- assert xylims[0] < xylims[1], "xylims[0] must be smaller than xylims[1]."
- min_val = xylims[0]
- max_val = xylims[1]
- else:
- all_values = avg_values + exp_values
- min_val = min(all_values) - margin
- max_val = max(all_values) + margin
-
- fig, ax = plt.subplots(figsize=(7, 4.5))
-
- # Plot colored points with error bars
- scatter = plt.errorbar(
- exp_values,
- avg_values,
- fmt="o",
- yerr=sem_values,
- ecolor="gray",
- elinewidth=1.5,
- capsize=2,
- zorder=4,
- linestyle="None",
- markersize=8,
- )
-
- # Remove the default markers and add colored ones
- scatter[0].remove()
-
- # Add the colored scatter points
- scatter_points = plt.scatter(
- exp_values,
- avg_values,
- c=deviations,
- cmap=cmap,
- norm=norm,
- s=45,
- zorder=5,
- edgecolors="black",
- linewidths=0.5,
- alpha=0.8,
- )
-
- plt.plot([min_val, max_val], [min_val, max_val], "k-", linewidth=1.5, zorder=3) # Black identity line
-
- # Highlight predictions within 1 and 2 kcal/mol of the experimental affinity
- ax.fill_between(
- [min_val, max_val],
- [min_val - 1, max_val - 1],
- [min_val + 1, max_val + 1],
- color="darkgray",
- alpha=0.3,
- zorder=2,
- )
- ax.fill_between(
- [min_val, max_val],
- [min_val - 2, max_val - 2],
- [min_val + 2, max_val + 2],
- color="lightgray",
- alpha=0.3,
- zorder=1,
- )
-
- # Add colorbar
- cbar = plt.colorbar(scatter_points, ax=ax, shrink=0.40, aspect=10, anchor=(0.0, 0.85), pad=0.05)
-
- cbar.set_label("|Deviation| (kcal/mol)", rotation=270, labelpad=20)
- cbar.ax.tick_params(labelsize=10)
- cbar.set_ticks([0, 1, 2, 3, 4])
- cbar.set_ticklabels(["0", "1", "2", "3", "≥4"])
- cbar.ax.tick_params(labelsize=10)
-
- # set labels, make it square and add legend
- plt.title(
- f"{(target_name + ' ' if target_name is not None else '')}"
- r"$\Delta\Delta \text{G}_{\text{BAR}}$ ($\mathrm{N}="
- f"{len(exp_values)}$)"
- )
- plt.xlabel("$\Delta\Delta G_{exp} (kcal/mol)$") # noqa: W605
- plt.ylabel("$\Delta\Delta G_{calc} (kcal/mol)$") # noqa: W605
- plt.xlim(min_val, max_val)
- plt.ylim(min_val, max_val)
- ax.set_aspect("equal", adjustable="box")
-
- # add statistics to the plot
- unit = r"\frac{kcal}{mol}"
- text_body = (
- f"$\\tau = {stats_dict['KTAU']}$",
- f"RMSE = ${stats_dict['RMSE']} {unit}$",
- f"MUE = ${stats_dict['MUE']} {unit}$",
- )
- logger.info(f"Stats: {' '.join(text_body)}")
- hori_height = 0.35
- spacing = 0.085
- txt_positions = (
- (1.04, hori_height),
- (1.04, hori_height - spacing),
- (1.04, hori_height - spacing * 2),
- )
- for txt_position, body in zip(txt_positions, text_body):
- plt.text(
- *txt_position,
- body,
- fontsize=12,
- verticalalignment="bottom",
- horizontalalignment="left",
- transform=ax.transAxes,
- fontproperties=font,
- )
-
- legend_elements = [
- Line2D([0], [0], color="k", linestyle="-", label="Identity line"),
- Patch(facecolor="darkgray", alpha=0.3, label="Within 1 kcal/mol"),
- Patch(facecolor="lightgray", alpha=0.3, label="Within 2 kcal/mol"),
- ]
-
- ax.legend(
- handles=legend_elements,
- bbox_to_anchor=(1.04, 0),
- loc="lower left",
- borderaxespad=0,
- frameon=False,
- )
- # Remove top and right spines using matplotlib
- ax.spines["top"].set_visible(False)
- ax.spines["right"].set_visible(False)
- ax.grid(True, linestyle="--", linewidth=0.5, alpha=0.5)
- ax.set_axisbelow(True)
-
- if savefig:
- if output_path is None:
- output_path = Path().cwd()
- logger.info("Using default name to save the plot at the current working directory...")
- fig.savefig(f"{target_name}_ddG_plot.png", dpi=300, bbox_inches="tight")
- return fig, ax
- if isinstance(output_path, str):
- output_path = Path(output_path)
- assert isinstance(output_path, Path), "output_path must be a string or a Path object."
- if output_path.isdir():
- output_path = output_path / f"{target_name}_ddG_plot.png"
- logger.info(f"Using default name to save the plot at {output_path}")
- elif output_path.exists(): # Fixed typo: exits() -> exists()
- logger.warning(f"File {output_path} already exists. Overwriting...")
- fig.savefig(output_path, dpi=300, bbox_inches="tight")
- return fig, ax
-
def save_json_data(self, out_path: str | Path | None = None):
"""Save the data dictionary to a json file.
@@ -763,7 +530,7 @@ def parse_arguments() -> argparse.Namespace:
parser.add_argument(
"-norun",
- "--no_run_data",
+ "--no-run-data",
dest="no_run_data",
action="store_true",
help=(
@@ -884,14 +651,14 @@ def main(args: argparse.Namespace):
if args.experimental_key is not None:
fep_reader.load_experimental_data(exp_key=args.experimental_key)
results_json = json.loads((Path.cwd() / results_file).read_text())
- results_df = fep_reader.prepare_df(results_json)
+ results_df = prepare_df(results_json)
if fep_reader.ignored_edges:
results_df = results_df.query("~fep_name.isin(@fep_reader.ignored_edges)").reset_index(drop=True)
- fig, ax = fep_reader.create_ddG_plot(results_df=results_df)
+ fig, ax = create_ddG_plot(results_df=results_df)
fig.savefig(f"{args.target}_ddG_plot.png", dpi=300, bbox_inches="tight")
else:
results_json = json.loads((Path.cwd() / results_file).read_text())
- results_df = fep_reader.prepare_df(results_json, experimental_data=False)
+ results_df = prepare_df(results_json, experimental_data=False)
if args.save_verbose:
verbose_qEnergies = pd.concat(fep_reader.verbose_qEnergies)
verbose_dgBar = pd.concat(fep_reader.verbose_dgBar)
diff --git a/src/QligFEP/analyze_neq.py b/src/QligFEP/analyze_neq.py
new file mode 100644
index 00000000..62011c55
--- /dev/null
+++ b/src/QligFEP/analyze_neq.py
@@ -0,0 +1,547 @@
+"""Module containing the analysis for non-equilibrium (NEQ) FEP calculations.
+
+The NEQ run script (run_neq.sh, generated by QligFEP with ``--neq``) drives a series of
+forward and reverse lambda-switching simulations with the ``qdyn_neq`` engine. Each switch
+writes its accumulated work to the run log (lines containing ``work accumulated``). This
+module reads those work values and estimates the free energy difference with the Bennett
+Acceptance Ratio (BAR), bootstrapping for uncertainty, and combines the protein and water
+legs into the relative binding free energy ``ddG = dF_protein - dF_water``.
+
+Logs are read from the directory layout produced by ``run_neq.sh``: forward switches are
+written to ``neq_1_*.log`` and reverse switches to ``neq_0_*.log`` (the labels follow the
+original implementation: state 1 sweeps lambda 1->0, state 0 sweeps lambda 0->1).
+
+When a mapping JSON and experimental key are supplied, the per-edge ddG is compared to
+experiment (correlation metrics and the shared ddG plot). Switch-completion counts and the
+per-replicate slurm run status are reported as run diagnostics.
+"""
+
+import argparse
+import glob
+import json
+import os
+from pathlib import Path
+from typing import Optional
+
+import numpy as np
+import pandas as pd
+from scipy.optimize import brentq
+from scipy.stats import gaussian_kde, kendalltau, pearsonr, spearmanr
+
+from .analysis_plotting import create_ddG_plot
+from .logger import logger, setup_logger
+
+# Boltzmann constant in the two energy units Q can report work in.
+KB_KCAL = 0.0019872041 # kcal/(mol*K)
+KB_KJ = 0.0083144621 # kJ/(mol*K)
+
+# The switching work written by qdyn_neq is accumulated from Q energy components in kcal/mol.
+# BAR needs beta*W to be dimensionless, so with work in kcal/mol the beta factor is
+# 1/(k_B*T) (~1.68 mol/kcal at 300 K); the "kcal" units apply this and are the default. The
+# "kT" units instead use beta = 1, treating the work as already in units of k_B*T. The BAR
+DEFAULT_WORK_UNITS = "kcal"
+
+
+def beta_from_units(work_units: str, temperature: float) -> float:
+ """Return the BAR beta factor for the requested work units (see the module note)."""
+ if work_units == "kT":
+ return 1.0
+ if work_units == "kcal":
+ return 1.0 / (KB_KCAL * temperature)
+ if work_units == "kJ":
+ return 1.0 / (KB_KJ * temperature)
+ raise ValueError(f"Unknown work units: {work_units!r}")
+
+
+def dF_to_kcal(dF: float, work_units: str, temperature: float) -> float:
+ """Convert a BAR free energy expressed in the work units back to kcal/mol."""
+ if work_units == "kT":
+ return dF * KB_KCAL * temperature
+ if work_units == "kcal":
+ return dF
+ if work_units == "kJ":
+ return dF / (KB_KJ / KB_KCAL)
+ raise ValueError(f"Unknown work units: {work_units!r}")
+
+
+def read_final_work(log_path: str) -> Optional[float]:
+ """Read the final accumulated switching work from a qdyn_neq log.
+
+ Each switch prints ``At step N, work accumulated was ...`` every output interval;
+ the last such line holds the total switching work. Returns None if the log does not
+ exist or did not terminate normally.
+ """
+ if not os.path.exists(log_path):
+ return None
+ with open(log_path) as handle:
+ lines = handle.readlines()
+ if not lines or "terminated normally" not in "".join(lines[-15:]):
+ return None
+ for line in reversed(lines):
+ if "work accumulated" in line:
+ try:
+ return float(line.split()[6])
+ except (IndexError, ValueError):
+ logger.warning(f"Could not parse the work value in {log_path}")
+ return None
+ return None
+
+
+def collect_works_with_counts(folder: str) -> tuple[list[float], list[float], dict]:
+ """Collect switch works and report how many switches were attempted vs completed.
+
+ A switch log exists once qdyn_neq starts; ``read_final_work`` returns None when the switch
+ did not finish (SHAKE failure, time limit, or a high-energy nearly-decoupled-ligand crash).
+ The attempted-minus-completed difference is the count of failed switches, the instability
+ the manuscript attributes BAR work-overlap problems to. Forward switches are neq_1_*.log,
+ reverse are neq_0_*.log, gathered recursively below ``folder``.
+ """
+ forward, reverse = [], []
+ counts = {
+ "forward_attempted": 0,
+ "forward_completed": 0,
+ "reverse_attempted": 0,
+ "reverse_completed": 0,
+ }
+ for log_path in glob.glob(os.path.join(folder, "**", "neq_1_*.log"), recursive=True):
+ counts["forward_attempted"] += 1
+ work = read_final_work(log_path)
+ if work is not None:
+ forward.append(work)
+ counts["forward_completed"] += 1
+ for log_path in glob.glob(os.path.join(folder, "**", "neq_0_*.log"), recursive=True):
+ counts["reverse_attempted"] += 1
+ work = read_final_work(log_path)
+ if work is not None:
+ reverse.append(work)
+ counts["reverse_completed"] += 1
+ counts["failed"] = (counts["forward_attempted"] - counts["forward_completed"]) + (
+ counts["reverse_attempted"] - counts["reverse_completed"]
+ )
+ return forward, reverse, counts
+
+
+def collect_works(folder: str) -> tuple[list[float], list[float]]:
+ """Collect the final work of every completed switch under a leg directory.
+
+ Returns (forward_works, reverse_works) gathered from neq_1_*.log and neq_0_*.log found
+ recursively below ``folder``.
+ """
+ forward, reverse, _ = collect_works_with_counts(folder)
+ return forward, reverse
+
+
+def _bracket_root(func, x0: float = 0.0, step: float = 1.0, max_expand: int = 1000):
+ """Bracket the root of a monotonically increasing function around x0."""
+ f0 = func(x0)
+ if f0 == 0.0:
+ return x0, x0
+ direction = -1.0 if f0 > 0 else 1.0 # increasing func: f(x0)>0 means the root is below
+ x_prev, f_prev = x0, f0
+ expand = step
+ for _ in range(max_expand):
+ x_new = x0 + direction * expand
+ f_new = func(x_new)
+ if f_new == 0.0:
+ return x_new, x_new
+ if (f_prev < 0) != (f_new < 0):
+ return (min(x_prev, x_new), max(x_prev, x_new))
+ x_prev, f_prev = x_new, f_new
+ expand *= 1.5
+ raise RuntimeError("Could not bracket the BAR root; check the work distributions")
+
+
+def bar_delta_f(work_forward, work_reverse, beta: float = 1.0) -> float:
+ """Bennett Acceptance Ratio estimate of the free energy difference.
+
+ Solves sum_i 1/(1+exp(beta*(Wf_i - dF))) = sum_j 1/(1+exp(beta*(Wr_j + dF))) for dF.
+ The work arrays are the switching works of the forward and reverse processes, in the
+ energy units matching ``beta`` (see ``beta_from_units``).
+ """
+ wf = np.asarray(work_forward, dtype=float)
+ wr = np.asarray(work_reverse, dtype=float)
+ if wf.size == 0 or wr.size == 0:
+ raise ValueError("BAR needs at least one forward and one reverse work value")
+
+ def objective(dF):
+ a = 1.0 / (1.0 + np.exp(beta * (wf - dF)))
+ b = 1.0 / (1.0 + np.exp(beta * (wr + dF)))
+ return a.sum() - b.sum()
+
+ lo, hi = _bracket_root(objective, x0=float((wf.mean() - wr.mean()) / 2.0))
+ if lo == hi:
+ return lo
+ return brentq(objective, lo, hi)
+
+
+def work_overlap(work_forward, work_reverse) -> float:
+ """Overlap of the forward and negated-reverse work distributions (0 = none, 1 = full)."""
+ wf = np.asarray(work_forward, dtype=float)
+ wr_neg = -np.asarray(work_reverse, dtype=float)
+ if wf.size < 2 or wr_neg.size < 2 or np.ptp(wf) == 0 or np.ptp(wr_neg) == 0:
+ return float("nan")
+ kde_f = gaussian_kde(wf)
+ kde_r = gaussian_kde(wr_neg)
+ grid = np.linspace(min(wf.min(), wr_neg.min()) - 1, max(wf.max(), wr_neg.max()) + 1, 2000)
+ # np.trapz was renamed to np.trapezoid in NumPy 2.0; fall back on older NumPy.
+ trapezoid = np.trapezoid if hasattr(np, "trapezoid") else np.trapz # noqa: NPY201
+ return float(trapezoid(np.minimum(kde_f(grid), kde_r(grid)), grid))
+
+
+def bar_with_uncertainty(work_forward, work_reverse, beta=1.0, n_bootstrap=1000, rng=None):
+ """BAR free energy with a bootstrap standard error and the work-distribution overlap.
+
+ Returns (dF, dF_err, overlap).
+ """
+ if rng is None:
+ rng = np.random.default_rng(12345)
+ wf = np.asarray(work_forward, dtype=float)
+ wr = np.asarray(work_reverse, dtype=float)
+ dF = bar_delta_f(wf, wr, beta)
+
+ boot = []
+ for _ in range(n_bootstrap):
+ wf_bs = wf[rng.integers(0, wf.size, wf.size)]
+ wr_bs = wr[rng.integers(0, wr.size, wr.size)]
+ try:
+ boot.append(bar_delta_f(wf_bs, wr_bs, beta))
+ except (RuntimeError, ValueError):
+ continue
+ dF_err = float(np.std(boot, ddof=1)) if len(boot) > 1 else float("nan")
+ return dF, dF_err, work_overlap(wf, wr)
+
+
+def analyze_leg(folder: str, beta: float, n_bootstrap: int, rng) -> dict:
+ """Collect works for a single leg (protein or water) and run BAR."""
+ forward, reverse, counts = collect_works_with_counts(folder)
+ result = {"n_forward": len(forward), "n_reverse": len(reverse), "n_failed": counts["failed"]}
+ if not forward or not reverse:
+ logger.warning(f"No complete forward/reverse work values found in {folder}")
+ result.update({"dF": float("nan"), "dF_err": float("nan"), "overlap": float("nan")})
+ return result
+ dF, dF_err, overlap = bar_with_uncertainty(forward, reverse, beta, n_bootstrap, rng)
+ result.update({"dF": dF, "dF_err": dF_err, "overlap": overlap})
+ return result
+
+
+def analyze_edge(name, protein_dir, water_dir, beta, work_units, temperature, n_bootstrap, rng):
+ """Compute the relative binding free energy for one perturbation edge."""
+ protein = analyze_leg(str(protein_dir), beta, n_bootstrap, rng)
+ water = analyze_leg(str(water_dir), beta, n_bootstrap, rng)
+ dF_p = dF_to_kcal(protein["dF"], work_units, temperature)
+ dF_w = dF_to_kcal(water["dF"], work_units, temperature)
+ err_p = dF_to_kcal(protein["dF_err"], work_units, temperature)
+ err_w = dF_to_kcal(water["dF_err"], work_units, temperature)
+ return {
+ "edge": name,
+ "ddG_kcal": dF_p - dF_w,
+ "ddG_err_kcal": float(np.sqrt(err_p**2 + err_w**2)),
+ "dF_protein_kcal": dF_p,
+ "dF_water_kcal": dF_w,
+ "overlap_protein": protein["overlap"],
+ "overlap_water": water["overlap"],
+ "n_forward_protein": protein["n_forward"],
+ "n_reverse_protein": protein["n_reverse"],
+ "n_failed_protein": protein["n_failed"],
+ "n_forward_water": water["n_forward"],
+ "n_reverse_water": water["n_reverse"],
+ "n_failed_water": water["n_failed"],
+ }
+
+
+# SLURM failure markers (shared with the equilibrium analyzer's status detection).
+RUN_FAILURE_KEYWORDS = {
+ "DUE TO TIME LIMIT": "TIMEOUT",
+ "CANCELLED": "CANCELLED",
+ "Out Of Memory": "OOM",
+ "abnormally": "CRASHED",
+}
+
+
+def parse_run_diagnostics(edge_dir: str) -> list[dict]:
+ """Read per-replicate run metadata from the NEQ slurm*.out files in an edge directory.
+
+ run_neq.sh writes ``# Runtime:``, ``# Random seed:`` and ``# Replicate Number:``
+ footers. Status is SUCCESS unless a known SLURM failure marker is present. Returns one dict
+ per slurm*.out found (empty list if none), so missing logs degrade gracefully.
+ """
+ diagnostics = []
+ for slurm_out in sorted(glob.glob(os.path.join(edge_dir, "slurm*.out"))):
+ with open(slurm_out) as handle:
+ text = handle.read()
+ runtime, seed, replicate, status = "", None, None, "SUCCESS"
+ for line in text.splitlines():
+ if line.startswith("# Runtime:"):
+ runtime = line.split()[-1].strip()
+ elif line.startswith("# Random seed:"):
+ seed = line.split()[-1].strip()
+ elif line.startswith("# Replicate Number:"):
+ replicate = line.split()[-1].strip()
+ for keyword, label in RUN_FAILURE_KEYWORDS.items():
+ if keyword in text:
+ status = label
+ break
+ diagnostics.append({"replicate": replicate, "runtime": runtime, "seed": seed, "status": status})
+ return diagnostics
+
+
+def load_experimental_ddG(mapping_json: str, exp_key: str) -> dict:
+ """Map each edge's ``FEP__`` name to its experimental ddG from the mapping JSON
+ (the same file used by setupFEP / qlomap). Edges lacking ``exp_key`` are skipped.
+ """
+ with open(mapping_json) as handle:
+ mapping = json.load(handle)
+ experimental = {}
+ for edge in mapping.get("edges", []):
+ if edge.get(exp_key) is not None:
+ experimental[f"FEP_{edge['from']}_{edge['to']}"] = edge[exp_key]
+ return experimental
+
+
+def correlation_metrics(predicted, experimental) -> dict:
+ """Agreement metrics between predicted and experimental ddG, over their finite pairs.
+
+ R2 is the squared Pearson correlation (as reported for the manuscript comparison).
+ """
+ pred = np.asarray(predicted, dtype=float)
+ exp = np.asarray(experimental, dtype=float)
+ mask = ~(np.isnan(pred) | np.isnan(exp))
+ pred, exp = pred[mask], exp[mask]
+ n = int(pred.size)
+ metrics = {key: float("nan") for key in ("r2", "pearson", "spearman", "kendall", "rmse", "mae")}
+ metrics["n"] = n
+ if n >= 1:
+ metrics["rmse"] = float(np.sqrt(np.mean((pred - exp) ** 2)))
+ metrics["mae"] = float(np.mean(np.abs(pred - exp)))
+ if n >= 2:
+ pearson = float(pearsonr(pred, exp)[0])
+ metrics["pearson"] = pearson
+ metrics["r2"] = pearson**2
+ metrics["spearman"] = float(spearmanr(pred, exp)[0])
+ metrics["kendall"] = float(kendalltau(pred, exp)[0])
+ return metrics
+
+
+def find_edges(protein_root: Path, water_root: Path):
+ """Find perturbation edges shared by the protein and water setup directories.
+
+ Expects the setupFEP layout (e.g. 2.protein/FEP_l1_l2 and 1.water/FEP_l1_l2). Returns a
+ sorted list of (name, protein_dir, water_dir).
+ """
+ protein_edges = {p.name: p for p in protein_root.glob("FEP_*") if p.is_dir()}
+ water_edges = {p.name: p for p in water_root.glob("FEP_*") if p.is_dir()}
+ shared = sorted(set(protein_edges) & set(water_edges))
+ return [(name, protein_edges[name], water_edges[name]) for name in shared]
+
+
+def parse_arguments() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(
+ prog="qligfep_neq_analyze",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ description="BAR analysis of non-equilibrium (NEQ) FEP switching work.",
+ )
+ parser.add_argument(
+ "-p",
+ "--protein-dir",
+ dest="protein_dir",
+ default="2.protein",
+ help=(
+ "Path to the directory containing the protein-leg FEP_* edges. "
+ "Defaults to `2.protein` in the current working directory."
+ ),
+ )
+ parser.add_argument(
+ "-w",
+ "--water-dir",
+ dest="water_dir",
+ default="1.water",
+ help=(
+ "Path to the directory containing the water-leg FEP_* edges. "
+ "Defaults to `1.water` in the current working directory."
+ ),
+ )
+ parser.add_argument(
+ "-pe",
+ "--protein-edge",
+ dest="protein_edge",
+ default=None,
+ help="Analyze a single edge: the protein-leg directory (used with --water-edge).",
+ )
+ parser.add_argument(
+ "-we",
+ "--water-edge",
+ dest="water_edge",
+ default=None,
+ help="Analyze a single edge: the water-leg directory (used with --protein-edge).",
+ )
+ parser.add_argument(
+ "-T",
+ "--temperature",
+ dest="temperature",
+ type=float,
+ default=298.0,
+ help="Temperature (K) used for the kcal/mol conversion (and beta when not kT). Defaults to 298.",
+ )
+ parser.add_argument(
+ "-u",
+ "--work-units",
+ dest="work_units",
+ default=DEFAULT_WORK_UNITS,
+ choices=["kT", "kcal", "kJ"],
+ help=(
+ "Units the switching work is assumed to be in for BAR. Defaults to `kcal` "
+ "(beta=1/kT, the physically consistent factor). `kT` uses beta=1, treating the "
+ "work as already in units of k_B*T."
+ ),
+ )
+ parser.add_argument(
+ "-nb",
+ "--n-bootstrap",
+ dest="n_bootstrap",
+ type=int,
+ default=1000,
+ help="Number of bootstrap resamples for the uncertainty estimate. Defaults to 1000.",
+ )
+ parser.add_argument(
+ "-o",
+ "--output",
+ dest="output",
+ default="neq_results.csv",
+ help="Output CSV file for the per-edge results. Defaults to `neq_results.csv`.",
+ )
+ parser.add_argument(
+ "-rs",
+ "--random_state",
+ dest="random_state",
+ type=int,
+ default=None,
+ help="Random state for the reproducible bootstrap. Defaults to None.",
+ )
+ parser.add_argument(
+ "-j",
+ "--json-file",
+ dest="json_file",
+ default=None,
+ help=(
+ "Mapping JSON used to run setupFEP (qlomap output). When given with --experimental-key, "
+ "the per-edge ddG is compared to experiment (correlation metrics + plot)."
+ ),
+ )
+ parser.add_argument(
+ "-exp",
+ "--experimental-key",
+ dest="experimental_key",
+ default=None,
+ help="Key in the mapping JSON edges holding the experimental ddG (e.g. `ddg_value`).",
+ )
+ parser.add_argument(
+ "-t",
+ "--target",
+ dest="target",
+ default="neq",
+ help="Target name used in the plot title and output file names. Defaults to `neq`.",
+ )
+ parser.add_argument(
+ "-norun",
+ "--no-run-data",
+ dest="no_run_data",
+ action="store_true",
+ help="Skip parsing slurm*.out run diagnostics (use when those files are absent).",
+ )
+ parser.add_argument(
+ "-log",
+ "--log-level",
+ dest="log",
+ default="info",
+ help="Set the log level for the logger. Defaults to `info`.",
+ choices=["trace", "debug", "info", "warning", "error", "critical"],
+ )
+ return parser.parse_args()
+
+
+def main(args: argparse.Namespace) -> pd.DataFrame:
+ setup_logger(level=args.log.upper())
+ beta = beta_from_units(args.work_units, args.temperature)
+ rng = np.random.default_rng(args.random_state)
+
+ if args.protein_edge and args.water_edge:
+ edges = [(Path(args.water_edge).name, Path(args.protein_edge), Path(args.water_edge))]
+ else:
+ edges = find_edges(Path(args.protein_dir), Path(args.water_dir))
+ if not edges:
+ raise FileNotFoundError(f"No shared FEP_* edges found in {args.protein_dir} and {args.water_dir}")
+
+ rows = [
+ analyze_edge(name, pdir, wdir, beta, args.work_units, args.temperature, args.n_bootstrap, rng)
+ for name, pdir, wdir in edges
+ ]
+ df = pd.DataFrame(rows)
+
+ if args.json_file and args.experimental_key:
+ df = compare_to_experiment(df, args.json_file, args.experimental_key, args.target)
+
+ df.to_csv(args.output, index=False)
+ logger.info(f"Wrote {len(df)} edge(s) to {args.output}\n{df.to_string(index=False)}")
+
+ if not args.no_run_data:
+ write_run_diagnostics(edges, args.output)
+ return df
+
+
+def compare_to_experiment(df: pd.DataFrame, mapping_json: str, exp_key: str, target: str) -> pd.DataFrame:
+ """Attach experimental ddG to the results, log correlation metrics, and save the ddG plot.
+
+ Returns the results frame with a ``ddG_exp`` column added.
+ """
+ experimental = load_experimental_ddG(mapping_json, exp_key)
+ df = df.assign(ddG_exp=df["edge"].map(experimental))
+ matched = df.dropna(subset=["ddG_exp"])
+ if matched.empty:
+ logger.warning("No analyzed edges matched the experimental mapping; skipping comparison.")
+ return df
+
+ metrics = correlation_metrics(matched["ddG_kcal"], matched["ddG_exp"])
+ logger.info(
+ f"Experimental comparison (n={metrics['n']}): R2={metrics['r2']:.3f} "
+ f"Pearson={metrics['pearson']:.3f} Spearman={metrics['spearman']:.3f} "
+ f"Kendall={metrics['kendall']:.3f} RMSE={metrics['rmse']:.3f} MAE={metrics['mae']:.3f}"
+ )
+
+ plot_df = pd.DataFrame(
+ {
+ "fep_name": matched["edge"].to_numpy(),
+ "Q_ddG_avg": matched["ddG_kcal"].to_numpy(),
+ "Q_ddG_sem": matched["ddG_err_kcal"].to_numpy(),
+ "ddg_value": matched["ddG_exp"].to_numpy(),
+ }
+ )
+ fig, _ = create_ddG_plot(plot_df, target_name=target)
+ plot_path = f"{target}_neq_ddG_plot.png"
+ fig.savefig(plot_path, dpi=300, bbox_inches="tight")
+ logger.info(f"Saved ddG plot to {plot_path}")
+ return df
+
+
+def write_run_diagnostics(edges, output: str) -> None:
+ """Collect per-replicate slurm run diagnostics for every edge/leg and write them next to
+ the results CSV (``