Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 96 additions & 0 deletions .github/scripts/seqopt_deap_comparison.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
"""Phase-C comparison: SeqOpt's pure-Python NSGA-II selection core vs the DEAP reference.

Benchmarks the NSGA-II survival selection (fast non-dominated sort + crowding + selNSGA2) of
three implementations across a grid of population size x objective count:

* ours (engine="exact") — pure-Python, RNG-matched to DEAP
* ours (engine="fast") — numpy-vectorized non-dominated sort (identical result)
* DEAP (tools.selNSGA2) — the reference oracle (dev/test-only dependency)

Reports correctness (survivor rank/crowding profile identical to DEAP) plus wall-clock and
peak memory, so the maintainer can make the ship-ours-vs-depend-on-DEAP call from data. The
shipped runtime never imports DEAP; it is needed only to run this script.

Usage: python .github/scripts/seqopt_deap_comparison.py
"""
import time
import tracemalloc
import numpy as np

from aaanalysis.protein_design_pro._backend.seqopt.nsga2 import (
fast_non_dominated_sort, crowding_distance, select_nsga2, select_nsga2_engine)

try:
from deap import base, creator, tools
except ImportError: # pragma: no cover
raise SystemExit("This comparison needs the dev-only 'deap' package: pip install deap")

if not hasattr(creator, "_CmpFitMax"):
creator.create("_CmpFitMax", base.Fitness, weights=(1.0,))
if not hasattr(creator, "_CmpInd"):
creator.create("_CmpInd", list, fitness=creator._CmpFitMax)

GRID = [(50, 2), (100, 2), (200, 2), (200, 3), (500, 3)]
REPEATS = 5


def deap_select(W, mu):
creator._CmpFitMax.weights = (1.0,) * W.shape[1]
inds = []
for i, row in enumerate(W):
ind = creator._CmpInd(row.tolist())
ind.fitness.values = tuple(float(v) for v in row)
ind.idx = i
inds.append(ind)
chosen = tools.selNSGA2(inds, mu)
return {ind.idx for ind in chosen}


def profile(W, idxs):
_, rank = fast_non_dominated_sort(W)
crowd = np.zeros(len(W))
for front in fast_non_dominated_sort(W)[0]:
d = crowding_distance(W, front)
for j, member in enumerate(front):
crowd[member] = d[j]
return sorted((int(rank[i]), round(float(crowd[i]), 9)) for i in idxs)


def timed(fn, repeats):
tracemalloc.start()
t0 = time.perf_counter()
for _ in range(repeats):
fn()
dt = (time.perf_counter() - t0) / repeats
peak = tracemalloc.get_traced_memory()[1]
tracemalloc.stop()
return dt * 1e3, peak / 1024.0 # ms, KiB


def main():
print(f"{'n x m':>10} | {'impl':<12} | {'ms/call':>9} | {'peak KiB':>9} | correct")
print("-" * 60)
for n, m in GRID:
rng = np.random.default_rng(n * 10 + m)
W = rng.random((n, m))
mu = n // 2
ref = profile(W, deap_select(W, mu))
runs = {
"ours-exact": lambda: select_nsga2(W, mu),
"ours-fast": lambda: select_nsga2_engine(W, mu, engine="fast"),
"deap": lambda: deap_select(W, mu),
}
sels = {
"ours-exact": set(select_nsga2(W, mu)[0]),
"ours-fast": set(select_nsga2_engine(W, mu, engine="fast")[0]),
"deap": deap_select(W, mu),
}
for name, fn in runs.items():
ms, kib = timed(fn, REPEATS)
ok = "OK" if profile(W, sels[name]) == ref else "DIFF"
print(f"{f'{n}x{m}':>10} | {name:<12} | {ms:9.3f} | {kib:9.1f} | {ok}")
print("-" * 60)


if __name__ == "__main__":
main()
16 changes: 14 additions & 2 deletions CONTEXT.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,8 @@ _Avoid_: interaction (overloaded with the relational/PPI [[relational / interact
### SeqOpt directed-evolution vocabulary

**SeqOpt**:
The **search/optimization** layer of [[design / engineering]] (**[pro]**, `aaanalysis/protein_design_pro/`), the counterpart to [[SeqMut]]'s scoring: it *searches* over sequence variants of **one wild-type** for those that best satisfy several objectives at once. A `Tool` (`run` → [[Pareto front]] `df_pareto`, `eval` → Pareto-quality metrics), paired with `SeqOptPlot`. Reuses model-bound [[SeqMut]] as the fitness engine and `ShapModel` for residue guidance, so it is `pro` (imports SHAP) even though [[SeqMut]] stays core. Two modes — `"impact"` (SHAP-guided, adaptive) and `"importance"` (feature-importance-guided, greedy) — see [[guidance mode (impact / importance)]]. It realizes the search that [[SeqMut]] and [[combined variant]] defer to.
_Avoid_: optimizer (overloaded with numerical/perf optimization — this is evolutionary *search*); generator, sampler.
The **search/optimization** layer of [[design / engineering]] (**[pro]**, `aaanalysis/protein_design_pro/`), the counterpart to [[SeqMut]]'s scoring: it *searches* over sequence variants of **one wild-type** for those that best satisfy several objectives at once. This is **protein engineering** — machine-learning-guided **directed evolution** of an *existing* sequence (Yang et al. 2019; Wittmann et al. 2021) — explicitly **not** *de novo protein design* (building new proteins from the ground up, e.g. RFdiffusion→ProteinMPNN→AlphaFold; Yang et al. 2026), which is out of scope. A `Tool` (`run` → [[Pareto front]] `df_pareto`, `eval` → Pareto-quality metrics), paired with `SeqOptPlot`. Reuses model-bound [[SeqMut]] as the fitness engine and `ShapModel` for residue guidance, so it is `pro` (imports SHAP) even though [[SeqMut]] stays core. Two modes — `"impact"` (SHAP-guided, adaptive) and `"importance"` (feature-importance-guided, greedy) — see [[guidance mode (impact / importance)]]. It realizes the search that [[SeqMut]] and [[combined variant]] defer to.
_Avoid_: optimizer (overloaded with numerical/perf optimization — this is evolutionary *search*); generator, sampler; **de novo design / protein design** (SeqOpt does protein *engineering* / directed evolution, not generation of new proteins).

**population** / **generation**:
A **population** is the set of candidate **variants** (each a mutation-set on the one wild-type, carrying its [[prediction shift]] fitness and a per-residue importance map) that [[SeqOpt]] evolves; a **generation** (`generation`) is one evolve-score-select round. Standard NSGA-II vocabulary, claimed here from the reservation the [[design / engineering]] entry held for it.
Expand All @@ -155,6 +155,18 @@ _Avoid_: strategy, policy.
How generated variants — which have no ground-truth label — enter the per-generation `ShapModel` refit: each variant's own model **prediction score** in `[0, 1]` becomes its **soft label** (`ShapModel.fit(fuzzy_labeling=True)`), explained against the original balanced 0/1 reference set. This is what lets SHAP attribution track the moving [[population]] (the `mode="impact"` engine). The shipped `ShapModel` fuzzy-labeling path, applied to directed evolution.
_Avoid_: pseudo-labeling (no hard threshold is assigned — the label *is* the continuous score), self-training.

**evolutionary operators** (in [[SeqOpt]]):
The DEAP-mapped algorithm families [[SeqOpt]] re-implements in **pure Python** (DEAP is a dev/test-only parity oracle, never a runtime dependency): **crossover** (uniform / one-point / two-point over mutation-sets), **mutation** (substitution / shift), **variation** (`varAnd` = crossover *and* mutation, `varOr` = one of crossover / mutation / reproduction), **survival** (`mu_plus_lambda` elitist / `mu_comma_lambda` / `ea_simple` generational), **constraints** (feasibility callables penalized DeltaPenalty- or ClosestValidPenalty-style), and the single-objective **Hall of Fame** (`SeqOpt.hall_of_fame_`) alongside the [[Pareto front]] archive. Counterpart to the multi-objective [[non-dominated rank]] / [[crowding distance]] selection core.
_Avoid_: GA primitive (these are the EA layer); DEAP operator (ours is an independent re-implementation).

**engine (exact / fast)** (in [[SeqOpt]]):
The NSGA-II sort/selection kernel. `engine="exact"` is the pure-Python path whose RNG stream and crowding formula (`nobj·span` normalization) are matched to the **DEAP reference**, so on a fixed seed it reproduces DEAP's [[non-dominated rank]] + [[crowding distance]] ordering; `engine="fast"` vectorizes the O(n²) non-dominated sort with numpy and yields a **numerically identical** front (verified faster than DEAP). The orthogonal axis to [[guidance mode (impact / importance)]].
_Avoid_: byte-exact (the bar is equivalence — identical front membership + ordering, values within tolerance — not byte-identical serialization).

**convergence** (`SeqOpt.eval`):
The optional generational-distance metric: the mean range-normalized distance from each [[Pareto front]] point to its nearest point on a user-supplied **reference front** (`ref_front`); lower = closer to the target. Joins `hypervolume` and `spread` in `df_eval` only when a reference is given.
_Avoid_: accuracy (this is a set-to-set distance, not a classification score).

### Multi-class & regression labeling vocabulary

Helpers on `SequenceFeature` that turn a multi-class or continuous target into the
Expand Down
6 changes: 5 additions & 1 deletion aaanalysis/_constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ def _folder_path(super_folder, folder_name):
COL_HYPERVOLUME = "hypervolume" # SeqOpt.eval — objective-space volume dominated by the front
COL_N_FRONT = "n_front" # SeqOpt.eval — number of variants on the first (rank=0) front
COL_SPREAD = "spread" # SeqOpt.eval — objective-space diversity of the front
COL_CONVERGENCE = "convergence" # SeqOpt.eval — generational distance to a reference front
# Fixed lower-bound columns of df_pareto (one column per objective is inserted between
# COL_SEQ_MUT and COL_RANK at run time; COL_RANK defined in the eval block below).
COLS_PARETO_BASE = [COL_ENTRY, COL_VARIANT, COL_N_MUT, COL_SEQ_MUT]
Expand All @@ -304,7 +305,10 @@ def _folder_path(super_folder, folder_name):
LIST_SEQOPT_ALGORITHMS = ["nsga2", "greedy"] # population NSGA-II | importance-ordered greedy walk
LIST_SEQOPT_CROSSOVER = ["uniform", "one_point", "two_point"]
LIST_SEQOPT_MUTATION = ["substitution", "shift"]
LIST_SEQOPT_SURVIVAL = ["mu_plus_lambda", "mu_comma_lambda"]
LIST_SEQOPT_VARIATION = ["and", "or"] # varAnd (crossover AND mutation) | varOr (one of)
LIST_SEQOPT_SURVIVAL = ["mu_plus_lambda", "mu_comma_lambda", "ea_simple"]
LIST_SEQOPT_PENALTY = ["delta", "closest_valid"] # DeltaPenalty | ClosestValidPenalty semantics
LIST_SEQOPT_ENGINE = ["exact", "fast"] # pure-Python RNG-matched | numpy-vectorized
LIST_SEQOPT_INIT = ["random", "suggest"] # random seeding | warm-start from SeqMut.suggest
LIST_OBJECTIVE_GOALS = ["max", "min"]
# Built-in objective sources (a callable(df_variant)->array is also accepted at run time).
Expand Down
18 changes: 18 additions & 0 deletions aaanalysis/protein_design_pro/_backend/seqopt/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,24 @@ def hypervolume(W, ref: Optional[np.ndarray] = None) -> float:
return float(box * dominated.mean())


def convergence(W, ref_front) -> float:
"""Generational distance from a front to a reference front (lower = closer/converged).

Mean over the front of the minimum range-normalized Euclidean distance to a reference
point; ``inf`` when either front is empty.
"""
W = np.asarray(W, dtype=float)
R = np.asarray(ref_front, dtype=float)
if W.size == 0 or R.size == 0:
return float("inf")
both = np.vstack([W, R])
rng = both.max(axis=0) - both.min(axis=0)
rng[rng == 0] = 1.0
Wn, Rn = W / rng, R / rng
dists = np.sqrt(((Wn[:, None, :] - Rn[None, :, :]) ** 2).sum(axis=2))
return float(dists.min(axis=1).mean())


def spread(W) -> float:
"""Objective-space diversity of a front: mean pairwise Euclidean distance (0 if <2 points)."""
W = np.asarray(W, dtype=float)
Expand Down
107 changes: 104 additions & 3 deletions aaanalysis/protein_design_pro/_backend/seqopt/nsga2.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,15 +114,17 @@ def crowding_distance(W, front) -> np.ndarray:
for k in range(m):
order = np.argsort(sub[:, k], kind="mergesort") # stable, deterministic
lo, hi = sub[order[0], k], sub[order[-1], k]
span = hi - lo
# DEAP normalizes each objective by ``nobj * (max - min)`` (assignCrowdingDist), so the
# crowding values are byte-comparable with the reference oracle, not just rank-equal.
norm = m * (hi - lo)
dist[order[0]] = np.inf
dist[order[-1]] = np.inf
if span == 0:
if norm == 0:
continue
for r in range(1, n - 1):
prev_v = sub[order[r - 1], k]
next_v = sub[order[r + 1], k]
dist[order[r]] += (next_v - prev_v) / span
dist[order[r]] += (next_v - prev_v) / norm
return dist


Expand Down Expand Up @@ -206,3 +208,102 @@ def select_nsga2(W, mu) -> Tuple[List[int], np.ndarray, np.ndarray]:
survivors.extend(order[:remaining])
break
return survivors, rank, crowding


# III Fast (vectorized) engine — numerically identical fronts, numpy-vectorized for speed
def _dominance_matrix(W, max_cells=4_000_000):
"""Boolean ``dom[i, j] = i dominates j`` via row-chunks (caps the (block, n, m) transient).

Computing the full ``(n, n, m)`` broadcast at once costs O(n^2 m) transient memory; chunking
the first axis keeps it at O(block * n * m) while the stored result is the lean ``(n, n)``
boolean matrix. ``max_cells`` bounds the transient (block adapts to n * m).
"""
W = np.asarray(W, dtype=float)
n, m = W.shape
dom = np.zeros((n, n), dtype=bool)
block = max(1, min(n, int(max_cells // max(n * m, 1))))
for s in range(0, n, block):
e = min(s + block, n)
sub = W[s:e]
ge = (sub[:, None, :] >= W[None, :, :]).all(axis=2) # (block, n)
gt = (sub[:, None, :] > W[None, :, :]).any(axis=2)
dom[s:e] = ge & gt
return dom


def fast_non_dominated_sort_vec(W) -> Tuple[List[List[int]], np.ndarray]:
"""Vectorized fast non-dominated sort (``engine='fast'``).

Produces the **same** fronts and ranks as :func:`fast_non_dominated_sort` (same dominance
relation, same ascending-index order within a front), computing the pairwise dominance
matrix with numpy in **row-chunks** (memory-bounded) instead of a Python double loop.
"""
W = np.asarray(W, dtype=float)
n = W.shape[0]
if n == 0:
return [], np.zeros(0, dtype=int)
dom = _dominance_matrix(W) # dom[i, j] == True <=> i dominates j
remaining = dom.sum(axis=0) # how many points dominate j
rank = np.zeros(n, dtype=int)
placed = np.zeros(n, dtype=bool)
fronts: List[List[int]] = []
current = np.where(remaining == 0)[0]
f = 0
while current.size:
rank[current] = f
placed[current] = True
fronts.append(sorted(current.tolist()))
remaining = remaining - dom[current].sum(axis=0)
current = np.where((remaining == 0) & (~placed))[0]
f += 1
return fronts, rank


def rank_and_crowding(W, engine="exact") -> Tuple[np.ndarray, np.ndarray]:
"""Assign rank + crowding to every row, via the exact or the fast (vectorized) sort.

Both engines return identical ``rank`` and ``crowding`` (the fast path only vectorizes the
O(n^2) dominance scan); ``engine='fast'`` is a speed option, not a different result.
"""
W = np.asarray(W, dtype=float)
n = W.shape[0]
if engine == ut.LIST_SEQOPT_ENGINE[1]: # "fast"
fronts, rank = fast_non_dominated_sort_vec(W)
else: # "exact"
fronts, rank = fast_non_dominated_sort(W)
crowding = np.zeros(n, dtype=float)
for front in fronts:
d = crowding_distance(W, front)
for idx, member in enumerate(front):
crowding[member] = d[idx]
return rank, crowding


def select_nsga2_engine(W, mu, engine="exact") -> Tuple[List[int], np.ndarray, np.ndarray]:
"""Engine-aware NSGA-II survival selection (exact or vectorized sort, identical result).

The fast path fills survivors **front-by-front in the same order** as :func:`select_nsga2`
(ascending index within a front, partial front by descending crowding) so the survivor
list -- not just its set -- is identical to the exact engine; population order drives the
downstream RNG, so any order difference would otherwise cascade into different offspring.
"""
if engine != ut.LIST_SEQOPT_ENGINE[1]: # "exact"
return select_nsga2(W, mu)
W = np.asarray(W, dtype=float)
n = W.shape[0]
fronts, rank = fast_non_dominated_sort_vec(W)
crowding = np.zeros(n, dtype=float)
for front in fronts:
d = crowding_distance(W, front)
for idx, member in enumerate(front):
crowding[member] = d[idx]
survivors: List[int] = []
for front in fronts:
if len(survivors) + len(front) <= mu:
survivors.extend(front)
else:
remaining = mu - len(survivors)
order = sorted(front, key=lambda i: (-crowding[i], i))
survivors.extend(order[:remaining])
break
return survivors, rank, crowding
66 changes: 66 additions & 0 deletions aaanalysis/protein_design_pro/_backend/seqopt/penalty.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
"""
This is a script for the backend of SeqOpt constraint handling: it penalizes the objective
rows of infeasible variants so the search avoids them, mirroring DEAP's ``DeltaPenalty``
(fixed worst objective for any infeasible individual) and ``ClosestValidPenalty`` (penalty
scaled by how many constraints are violated). Pure-Python / numpy.
"""
from typing import Callable, Dict, List
import numpy as np

import aaanalysis.utils as ut


# I Helper Functions
def constraint_violation(genome: Dict[int, str], constraints: List[Callable]) -> int:
"""Count violated feasibility constraints (0 = feasible). Each callable maps genome->bool."""
return sum(1 for c in constraints if not bool(c(genome)))


# II Main Functions
def apply_penalty(F, genomes, constraints, goals, penalty="delta"):
"""Degrade the objective rows of infeasible variants so they are dominated.

Parameters
----------
F : array-like, shape (n, m)
Raw objective matrix (n variants, m objectives), in each objective's own goal sense.
genomes : list of dict
The variants aligned to ``F`` rows.
constraints : list of callable
Feasibility predicates ``genome -> bool`` (True = feasible).
goals : list of str
Per-objective ``"max"`` / ``"min"``.
penalty : str, default="delta"
``"delta"`` pushes every infeasible variant to a single worst value per objective;
``"closest_valid"`` scales the push by the number of violated constraints (the variant
that is "less infeasible" is penalized less).

Returns
-------
F_pen : np.ndarray, shape (n, m)
Copy of ``F`` with infeasible rows penalized (feasible rows untouched). When every
variant is infeasible the relative ``closest_valid`` ordering is still meaningful.
"""
if not constraints:
return np.asarray(F, dtype=float)
F = np.array(F, dtype=float, copy=True)
n, m = F.shape
violations = np.array([constraint_violation(g, constraints) for g in genomes], dtype=int)
feasible = violations == 0
ref = F[feasible] if feasible.any() else F
for c, goal in enumerate(goals):
col = F[:, c]
span = float(np.ptp(col)) or 1.0
if goal == ut.LIST_OBJECTIVE_GOALS[0]: # "max" — worse is smaller
base = float(ref[:, c].min())
step = span
for r in np.where(~feasible)[0]:
scale = violations[r] if penalty == ut.LIST_SEQOPT_PENALTY[1] else 1
F[r, c] = base - step * scale
else: # "min" — worse is larger
base = float(ref[:, c].max())
step = span
for r in np.where(~feasible)[0]:
scale = violations[r] if penalty == ut.LIST_SEQOPT_PENALTY[1] else 1
F[r, c] = base + step * scale
return F
Loading
Loading