3 changes: 2 additions & 1 deletion fridata.py
@@ -225,7 +225,8 @@ def create_parser():
     add_common_arguments(verify_chains_parser)
 
     create_archive_parser = subparsers.add_parser(
-        "create_archive", help="Create PDB compressed archive"
+        "create_archive",
+        help="Write one PDB .zip per H5 shard under data_path/archives/<dataset>/",
     )
     add_common_arguments(create_archive_parser)
26 changes: 26 additions & 0 deletions tests/test_archive.py
@@ -0,0 +1,26 @@
+from toolbox.scripts.archive import _shard_zip_name
+
+
+def test_shard_zip_name_distinct_batches_same_basename():
+    data = "/data"
+    a = f"{data}/structures/PDB/subset_/fourth_7/0/pdbs.h5"
+    b = f"{data}/structures/PDB/subset_/fourth_7/1/pdbs.h5"
+    assert _shard_zip_name(a, data) != _shard_zip_name(b, data)
+    assert _shard_zip_name(a, data).endswith(".zip")
+    assert "0" in _shard_zip_name(a, data) or "pdbs" in _shard_zip_name(a, data)
+
+
+def test_shard_zip_name_distinct_trees_same_batch_id():
+    data = "/data"
+    a = f"{data}/structures/PDB/subset_/fourth_7/0/pdbs.h5"
+    b = f"{data}/structures/PDB/subset_/other_slug/0/pdbs.h5"
+    assert _shard_zip_name(a, data) != _shard_zip_name(b, data)
+
+
+def test_shard_zip_name_long_path_uses_hash():
+    data = "/data"
+    long_mid = "x" * 300
+    h5 = f"{data}/structures/{long_mid}/0/pdbs.h5"
+    name = _shard_zip_name(h5, data)
+    assert name.endswith(".zip")
+    assert len(name) < 80
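
For context on what these tests pin down: the naming scheme removed in toolbox/scripts/archive.py below kept only the basename, so sibling shards that all store their data as pdbs.h5 collapsed to one archive name. A minimal sketch of that collision, using the same hypothetical /data paths as the tests (old_zip_name is an illustrative stand-in for the removed line):

import os

# Old scheme (the removed line below): basename only, so sibling shards collide.
def old_zip_name(h5_file: str) -> str:
    return os.path.basename(h5_file).replace(".h5", ".zip")

assert old_zip_name("/data/structures/PDB/subset_/fourth_7/0/pdbs.h5") == "pdbs.zip"
assert old_zip_name("/data/structures/PDB/subset_/fourth_7/1/pdbs.h5") == "pdbs.zip"  # same name, silent overwrite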
2 changes: 1 addition & 1 deletion toolbox/models/embedding/embedding.py
@@ -75,7 +75,7 @@ def run(self):
         create_index(self.embeddings_index_path, present_embeddings, self.structures_dataset.config.data_path)
 
         end = time.time()
-        logger.info(f"Total time: {format_time(end - start)}")
+        logger.info(f"Total time: {format_time(end - start)}\n")
 
 
     def missing_ids_to_fasta(self, missing_ids: List[str]) -> Dict[str, str]:
66 changes: 36 additions & 30 deletions toolbox/scripts/archive.py
@@ -1,61 +1,67 @@
-from toolbox.models.manage_dataset.index.handle_index import read_index
-import datetime
-import os
+import hashlib
 import zipfile
 from pathlib import Path
 
 from dask.distributed import as_completed
 from tqdm import tqdm
 from tqdm.contrib.logging import logging_redirect_tqdm
 
+from toolbox.models.manage_dataset.index.handle_index import read_index
 from toolbox.models.manage_dataset.utils import read_all_pdbs_from_h5
 from toolbox.utlis.logging import logger
 
 
-def process_h5_file(h5_file, dataset_path, output_dir):
-    full_h5_file_path = Path(dataset_path) / h5_file
-    prots = read_all_pdbs_from_h5(full_h5_file_path)
-    archive_name = os.path.basename(h5_file).replace(".h5", ".zip")
+def _shard_zip_name(h5_file: str, data_path_str: str) -> str:
+    """Stable unique .zip basename per H5 shard (avoids .../N/pdbs.h5 → pdbs.zip collisions)."""
+    h5_posix = Path(h5_file).as_posix()
+    root = Path(data_path_str).as_posix().rstrip("/")
+    if root and h5_posix.startswith(root + "/"):
+        rel = h5_posix.removeprefix(root + "/")
+    else:
+        rel = h5_posix
+    stem = rel.removesuffix(".h5").replace("/", "__")
+    if len(stem) > 200:
+        stem = hashlib.sha256(rel.encode()).hexdigest()[:24]
+    return f"{stem}.zip"
+
+
+def process_h5_file(h5_file, data_path_str, output_dir):
+    h5_path = Path(h5_file)
+    prots = read_all_pdbs_from_h5(str(h5_path))
+    archive_name = _shard_zip_name(h5_file, data_path_str)
     archive_path = Path(output_dir) / archive_name
 
     with zipfile.ZipFile(archive_path, "w") as zipf:
         for p, pdb_file_content in prots.items():
             code = p.removesuffix(".pdb")
             zipf.writestr(f"{code}.pdb", pdb_file_content)
 
-    os.system(f"tar -czf {str(archive_path)}.tgz {str(archive_path)}")
-
     return str(archive_path)
 
 
 def create_archive(structures_dataset: "StructuresDataset"):
+    data_path = structures_dataset.config.data_path
     dataset_path = structures_dataset.dataset_path()
-    proteins_index = read_index(Path(dataset_path) / "dataset_reversed.idx", structures_dataset.config.data_path)
-    output_dir = Path(dataset_path) / "archives"
-    output_dir.mkdir(exist_ok=True)
+    proteins_index = read_index(
+        Path(dataset_path) / "dataset_reversed.idx", data_path
+    )
+    output_dir = Path(data_path) / "archives" / structures_dataset.dataset_dir_name()
+    output_dir.mkdir(parents=True, exist_ok=True)
 
     client = structures_dataset._client
 
     futures = []
     for h5_file in proteins_index.keys():
-        future = client.submit(process_h5_file, h5_file, dataset_path, output_dir)
+        future = client.submit(
+            process_h5_file, h5_file, data_path, output_dir
+        )
         futures.append(future)
 
     n = len(futures)
-    logger.info("Building combined PDB archive from %s H5 shard(s)", n)
-
-    current_time = datetime.datetime.now().strftime("%Y%m%d%H%M%S")
-    final_archive_name = f"archive_pdb_{structures_dataset.dataset_dir_name()}_{current_time}.zip"
-    final_archive_path = Path.cwd() / final_archive_name
-
-    with zipfile.ZipFile(final_archive_path, "w") as final_zip:
-        with logging_redirect_tqdm():
-            with tqdm(total=n, desc="H5 shards → final zip", unit="h5") as pbar:
-                i = 0
-                for fut in as_completed(futures):
-                    archive_path = fut.result()
-                    with open(archive_path, "rb") as f:
-                        archive_data = f.read()
-                    final_zip.writestr(f"{i}.zip", archive_data)
-                    i += 1
-                    pbar.update(1)
+    logger.info("Writing %s PDB shard zip(s) under %s", n, output_dir)
+
+    with logging_redirect_tqdm():
+        with tqdm(total=n, desc="H5 shards → zip", unit="h5") as pbar:
+            for fut in as_completed(futures):
+                fut.result()
+                pbar.update(1)
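
Traced from _shard_zip_name above, a worked example of the new names, using the same hypothetical /data paths as the tests:

from toolbox.scripts.archive import _shard_zip_name

# The shard path relative to data_path is flattened with "__", keeping siblings distinct:
_shard_zip_name("/data/structures/PDB/subset_/fourth_7/0/pdbs.h5", "/data")
# -> "structures__PDB__subset___fourth_7__0__pdbs.zip"
_shard_zip_name("/data/structures/PDB/subset_/fourth_7/1/pdbs.h5", "/data")
# -> "structures__PDB__subset___fourth_7__1__pdbs.zip"
# Stems longer than 200 characters fall back to the first 24 hex characters of
# sha256(rel), so the basename stays short while remaining stable and unique.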