diff --git a/.gitignore b/.gitignore index 818e20a..c000fa2 100644 --- a/.gitignore +++ b/.gitignore @@ -119,6 +119,9 @@ ipython_config.py # https://pdm.fming.dev/#use-with-ide .pdm.toml +# uv +.python-version + # PEP 582; used by e.g. github.com/David-OConnor/pyflow and github.com/pdm-project/pdm __pypackages__/ diff --git a/.python-version b/.python-version deleted file mode 100644 index 24ee5b1..0000000 --- a/.python-version +++ /dev/null @@ -1 +0,0 @@ -3.13 diff --git a/pyproject.toml b/pyproject.toml index 2d89315..7d4654e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,9 +3,10 @@ name = "kanta-lab-preprocessing" version = "0.1.0" description = "Add your description here" readme = "README.md" -requires-python = ">=3.13" +requires-python = ">=3.12" dependencies = [ "pandas>=3.0.2", + "polars>=1.40.0", ] [dependency-groups] @@ -14,3 +15,10 @@ dev = [ "pytest>=9.0.3", "ruff>=0.15.10", ] + +[build-system] +requires = ["uv_build>=0.11.14,<0.12.0"] +build-backend = "uv_build" + +[tool.uv.build-backend] +module-name = "kanta" diff --git a/src/kanta/intake/__init__.py b/src/kanta/intake/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/kanta/intake/__main__.py b/src/kanta/intake/__main__.py new file mode 100644 index 0000000..6bd162d --- /dev/null +++ b/src/kanta/intake/__main__.py @@ -0,0 +1,65 @@ +if __name__ == "__main__": + from argparse import ArgumentParser + from datetime import date + from pathlib import Path + + from kanta.intake import assemble + from kanta.intake import tidyup + + parser = ArgumentParser() + + parser.add_argument( + "--source-list-file", + required=True, + type=Path, + help="File containing pair of paths to main & freetext data, one pair per line (TSV without header).", + ) + parser.add_argument( + "--phenotype-file", + help="Path to phenotype file with FINNGENID and SEX columns (.txt.gz)", + required=True, + type=Path, + ) + parser.add_argument( + "--output-dir", + help="Path to write the output files", + required=True, + type=Path, + ) + parser.add_argument( + "--partition-n-buckets", + help="How many buckets to partition the data into to spread the sort+dedup computations.", + required=False, + type=int, + default=24, + ) + parser.add_argument( + "--debug", + help="Increase verbosity and keep intermediate files", + required=False, + action="store_true", + ) + + args = parser.parse_args() + + # Assemble stage + output_file_assemble_stage = ( + args.output_dir + / f"finngen_R14_kanta_laboratory_responses.assemble-stage.{date.today()}.parquet" + ) + post_assemble_file = assemble.main( + args.source_list_file, output_file_assemble_stage + ) + + # Tidy-up stage + output_file_tidyup_stage = ( + args.output_dir + / f"finngen_R14_kanta_laboratory_responses_internal_1.0_{date.today()}.parquet" + ) + tidyup.main( + output_file_assemble_stage, + args.phenotype_file, + output_file_tidyup_stage, + partition_n_buckets=args.partition_n_buckets, + keep_intermediate_files=args.debug, + ) diff --git a/src/kanta/intake/assemble.py b/src/kanta/intake/assemble.py new file mode 100644 index 0000000..65ff63e --- /dev/null +++ b/src/kanta/intake/assemble.py @@ -0,0 +1,261 @@ +""" +Merges the incoming Kanta Lab data from THL into one coherent file. + + +Differences from the WDL implementation +======================================= +- Uses CSV-aware parsing, robust to edge cases like new-line character inside + CSV values. +""" + +import gzip +from argparse import ArgumentParser +from itertools import zip_longest +from pathlib import Path + +import polars as pl + + +EXPECTED_COLUMNS_MAIN = [ + "FINNGENID", + "EVENT_AGE", + "APPROX_EVENT_DAY", + "TIME", + "asiakirjaoid_pseudo", + "merkintaoid_pseudo", + "entryoid_pseudo", + "load_id_pseudo", + "file_name_pseudo", + "laboratoriotutkimusoid", + "laboratoriotutkimusnimike", + "paikallinentutkimusnimike_koodi", + "paikallinentutkimusnimike_selite", + "tutkimuskoodistonjarjestelma", + "tiedonlahde", + "tutkimusvastauksentila", + "tutkimustulosarvo", + "tutkimustulosyksikko", + "tutkimuksennaytelaatu", + "tutkimuksentekotapa", + "tuloksenpoikkeavuus", + "viitearvoryhma", + "viitevalialkuarvo", + "viitevalialkuyksikko", + "viitevaliloppuarvo", + "viitevaliloppuyksikko", +] + +EXPECTED_COLUMNS_FREETEXT = [ + "FINNGENID", + "EVENT_AGE", + "APPROX_EVENT_DAY", + "TIME", + "asiakirjaoid_pseudo", + "merkintaoid_pseudo", + "entryoid_pseudo", + "load_id_pseudo", + "file_name_pseudo", + "tutkimustulosteksti", +] + +COL_PREFIX_MAIN = "main." +COL_PREFIX_FREETEXT = "freetext." + + +def main(source_list_file: Path, output_file: Path) -> Path: + print() + print("=== ASSEMBLE STAGE ===") + pairs = validate_input_pairs(source_list_file) + + print("# Merge by pair") + merge_by_pair(pairs, output_file) + + print("# Checking merge consistency") + is_consistent = check_merge_consistency(output_file) + print("All good." if is_consistent else "!!! Inconsitent merge !!!") + + +def validate_input_pairs( + source_list_file: Path, *, separator="\t" +) -> list[tuple[Path, Path]]: + pairs = [] + with open(source_list_file) as fp: + for line in fp: + values = line.split(separator, maxsplit=2) + + main = validate_tsv_gz(values[0], source_list_file.parent) + freetext = validate_tsv_gz(values[1], source_list_file.parent) + + pairs.append((main, freetext)) + + for main, freetext in pairs: + check_columns(main, EXPECTED_COLUMNS_MAIN, "main") + check_columns(freetext, EXPECTED_COLUMNS_FREETEXT, "freetext") + + return pairs + + +def merge_by_pair(pairs: list[tuple[Path, Path]], parquet_output: str | Path) -> None: + to_concat = [] + for path_main, path_freetext in pairs: + print(f"Adding horizontal merge: {path_main} & {path_freetext}") + + df_main = ( + pl.scan_csv( + path_main, + infer_schema=False, + separator="\t", + row_index_name="_rowid", + row_index_offset=1, + ) + .with_columns(pl.lit(path_main.name).alias("_filename")) + .select(pl.all().name.prefix(COL_PREFIX_MAIN)) + ) + + df_freetext = ( + pl.scan_csv( + path_freetext, + infer_schema=False, + separator="\t", + row_index_name="_rowid", + row_index_offset=1, + ) + .with_columns(pl.lit(path_freetext.name).alias("_filename")) + .select(pl.all().name.prefix(COL_PREFIX_FREETEXT)) + ) + + df_merged = pl.concat([df_main, df_freetext], how="horizontal") + + to_concat.append(df_merged) + + ( + pl.concat(to_concat) + .with_row_index(name="_rowid_source", offset=1) + .pipe(reorder_columns) + .sink_parquet(parquet_output) + ) + + +def reorder_columns(frame: pl.LazyFame | pl.DataFrame) -> pl.LazyFrame | pl.DataFrame: + column_order = ( + ["_rowid_source"] + # Columns for main + + [COL_PREFIX_MAIN + "_rowid", COL_PREFIX_MAIN + "_filename"] + + [COL_PREFIX_MAIN + cc for cc in EXPECTED_COLUMNS_MAIN] + # Columns for freetext + + [COL_PREFIX_FREETEXT + "_rowid", COL_PREFIX_FREETEXT + "_filename"] + + [COL_PREFIX_FREETEXT + cc for cc in EXPECTED_COLUMNS_FREETEXT] + ) + return frame.select(column_order) + + +def check_merge_consistency(data_path: str | Path) -> bool: + # First check: all shared columns have the same values + shared_cols = set(EXPECTED_COLUMNS_MAIN).intersection(EXPECTED_COLUMNS_FREETEXT) + + check_shared_columns_same_values = ( + pl.scan_parquet(data_path) + .select( + pl.all_horizontal( + pl.col(COL_PREFIX_MAIN + cc) == pl.col(COL_PREFIX_FREETEXT + cc) + for cc in shared_cols + ).all() + ) + .collect(engine="streaming") + .item() + ) + + assert check_shared_columns_same_values + + # Second check: main and freetext have same height. + # This is done by checking the absence of null in _rowid, which happens iif + # the main and freetext data are of different height. + check_same_height = ( + pl.scan_parquet(data_path) + .select( + pl.all_horizontal(pl.selectors.ends_with("._rowid").is_not_null().all()) + ) + .collect(engine="streaming") + .item() + ) + + assert check_same_height + + return check_shared_columns_same_values and check_same_height + + +def validate_tsv_gz(filename: str, in_dir: Path) -> Path: + """Check if path exists and is a proper TSV & gz""" + full_path = (in_dir / filename.strip()).resolve() + + if not full_path.exists(): + raise FileNotFoundError(f"File does not exist: {full_path}") + + # Check it's readable as a gzip file + try: + with gzip.open(full_path, "rt", encoding="utf-8") as ff: + first_line = ff.readline() + except OSError as ee: + raise ValueError(f"File is not a valid gzip: {full_path}") from ee + + # Check it's actual TSV + if "\t" not in first_line: + raise ValueError( + f"File does not appear to be TSV (no \\t on first line): {full_path}" + ) + + return full_path + + +def check_columns(file_path: Path, expected_columns: list[str], label: str) -> None: + actual_columns = get_columns(file_path) + + if actual_columns != expected_columns: + if len(actual_columns) == 0: + raise Exception(f"No columns in {file_path}") + + if len(expected_columns) == 0: + raise Exception( + f"Misconfigured expected columns ({label}): no columns listed" + ) + + if set(actual_columns) != set(expected_columns): + message = f"Columns differ for {label}:\n" + message += f"Only in expected columns: {list(set(expected_columns) - set(actual_columns))}\n" + message += f"Only in actual columns: {list(set(actual_columns) - set(expected_columns))}" + raise Exception(message) + + # Else it's the same columns but in different order + message = "Column order differ:\n" + for col_expected, col_actual in zip_longest(expected_columns, actual_columns): + comp = "==" if col_expected == col_actual else "=!=/!\\=!=" + message += f"{col_expected} {comp} {col_actual}\n" + raise Exception(message) + + +def get_columns(input_path: Path) -> list[str]: + # We checked that the file is a proper TSV gz beforehand, so we now explicitely specify the separator + df = pl.read_csv( + input_path, has_header=True, separator="\t", infer_schema=False, n_rows=0 + ) + return df.columns + + +if __name__ == "__main__": + parser = ArgumentParser() + parser.add_argument( + "--source-list-file", + required=True, + type=Path, + help="File containing pair of paths to main & freetext data, one pair per line (TSV without header).", + ) + parser.add_argument( + "--output-file", + required=True, + type=Path, + help="Path to output the intermediary file from this stage.", + ) + + args = parser.parse_args() + + main(args.source_list_file, args.output_file) diff --git a/src/kanta/intake/tidyup.py b/src/kanta/intake/tidyup.py new file mode 100644 index 0000000..2f7ca0d --- /dev/null +++ b/src/kanta/intake/tidyup.py @@ -0,0 +1,271 @@ +""" +Tidy-up the raw data into a subset of necessary column, and apply sorting +and deduplication. + + +Differences from the WDL implementation +======================================= +- No logging of duplicates/err lines. +- Outputs to a single parquet file, no .txt.gz, as this is very slow. + + +VM choice and performance +========================= +Best config: 32 CPUs / 32 GB RAM and use 24 buckets. Runs in 2-3 min. + +For lower specs, run with 16 or 8 CPUs and allocate 2 GB RAM per CPU, use 24 +buckets. Runs in 5-8 min. + +Lowest tested working spec: 8 CPUs / 8 GB RAM, 32 buckets. Runs in 6-12 min. + +If failing due to OOM in the sort+dedup stage, try increasing the bucket count. + +The GCP VM type appears to matter. N2D is about 2x faster than E2. +""" + +import tempfile +import shutil +from argparse import ArgumentParser +from pathlib import Path + +import polars as pl + + +COLUMNS_UNIQUENESS_SORT = [ + "FINNGENID", + "APPROX_EVENT_DAY", + "TIME", + "laboratoriotutkimusnimike", + "paikallinentutkimusnimike_koodi", + "tutkimusvastauksentila", + "tutkimustulosarvo", + "tutkimustulosyksikko", +] + + +def main( + assembled_file: Path, + phenotype_file: Path, + output_file: Path, + *, + partition_n_buckets: int, + keep_intermediate_files: bool, +): + # Set up output file and temporary directory for intermediate files + tmp_dir = Path(tempfile.mkdtemp()) + + print() + print("=== TIDY-UP STAGE ===") + print("# Run info") + print(f"- Partition into N buckets: {partition_n_buckets}") + print(f"- Directory for intermediate files: {tmp_dir}") + print(f"- Output file: {output_file}") + + tmp_file_consolidate = tmp_dir / "consolidated.parquet" + + tmp_dir_partition = tmp_dir / "partition" + tmp_dir_partition.mkdir() + + tmp_dir_sort_dedup = tmp_dir / "sort_dedup" + tmp_dir_sort_dedup.mkdir() + + print("# Consolidate") + consolidated_file = consolidate_columns(assembled_file, tmp_file_consolidate) + + print("# Partition") + partition(consolidated_file, tmp_dir_partition, partition_n_buckets) + + print("# Sort + Dedup") + for bucket_file in tmp_dir_partition.glob("bucket_id__*.parquet"): + ( + pl.scan_parquet(bucket_file) + .pipe(sort_dedup) + .sink_parquet(tmp_dir_sort_dedup / bucket_file.name) + ) + + print("# Concatenate + join SEX") + bucket_files = [] + for bucket_id in range(partition_n_buckets): + bucket_files.append(tmp_dir_sort_dedup / f"bucket_id__{bucket_id}.parquet") + + df_pheno = pl.scan_csv( + phenotype_file, + infer_schema=False, + separator="\t", + ).select("FINNGENID", "SEX") + + df_concat = ( + pl.scan_parquet(bucket_files) + # Join SEX + .join( + df_pheno, + left_on="FINNGENID", + right_on="FINNGENID", + how="left", + maintain_order="left", + ) + .with_row_index(name="_rowid", offset=1) + ) + + print("# Sanitize text fields") + # Unicode "SYMBOL FOR NEWLINE", displayed as: ␤ + unicode_newline = "\u2424" + # Unicode "SYMBOL FOR HORIZONTAL TABULATION", displayed as: ␉ + unicode_tab = "\u2409" + trusted_columns = [ + "FINNGENID", + "EVENT_AGE", + "APPROX_EVENT_DAY", + "TIME", + "asiakirjaoid_pseudo", + "merkintaoid_pseudo", + "entryoid_pseudo", + "load_id_pseudo", + "file_name_pseudo", + "laboratoriotutkimusoid", + "_rowid", + "_rowid_source", + "SEX", + ] + ( + df_concat.with_columns( + pl.selectors.exclude(*trusted_columns) + .str.replace_all(pattern="\r\n|\r|\n", value=unicode_newline) + .str.replace_all(pattern="\t", value=unicode_tab, literal=True) + ) + # Re-order column to be somewhat backward compatible with previous implementation + .select( + "_rowid", + "_rowid_source", + "FINNGENID", + "EVENT_AGE", + "APPROX_EVENT_DAY", + "TIME", + "laboratoriotutkimusnimike", + "paikallinentutkimusnimike_koodi", + "paikallinentutkimusnimike_selite", + "tutkimuskoodistonjarjestelma", + "tutkimusvastauksentila", + "tutkimustulosarvo", + "tutkimustulosyksikko", + "tuloksenpoikkeavuus", + "viitearvoryhma", + "viitevalialkuarvo", + "viitevalialkuyksikko", + "viitevaliloppuarvo", + "viitevaliloppuyksikko", + "tutkimustulosteksti", + "SEX", + ) + .sink_parquet(output_file) + ) + + if not keep_intermediate_files: + shutil.rmtree(tmp_dir) + + +def init_cli(): + parser = ArgumentParser() + parser.add_argument( + "--assembled-file", + help="Path to assembled file from the intake.assemble step (Parquet)", + required=True, + type=Path, + ) + parser.add_argument( + "--phenotype-file", + help="Path to phenotype file with FINNGENID and SEX columns (.txt.gz)", + required=True, + type=Path, + ) + parser.add_argument( + "--output-file", + help="Path to write the output file", + required=True, + type=Path, + ) + parser.add_argument( + "--partition-n-buckets", + help="How many buckets to partition the data into to spread the sort+dedup computations.", + required=False, + type=int, + default=24, + ) + parser.add_argument( + "--keep-intermediate-files", + help="Keep intermediate files, useful for debugging.", + action="store_true", + ) + args = parser.parse_args() + + return args + + +def consolidate_columns(assembled_file: Path, output_file: Path) -> Path: + """Remove unecessary columns form the assembled file and rename the ones we will keep.""" + rename_columns = { + "main.FINNGENID": "FINNGENID", + "main.EVENT_AGE": "EVENT_AGE", + "main.APPROX_EVENT_DAY": "APPROX_EVENT_DAY", + "main.TIME": "TIME", + "main.laboratoriotutkimusnimike": "laboratoriotutkimusnimike", + "main.paikallinentutkimusnimike_koodi": "paikallinentutkimusnimike_koodi", + "main.paikallinentutkimusnimike_selite": "paikallinentutkimusnimike_selite", + "main.tutkimuskoodistonjarjestelma": "tutkimuskoodistonjarjestelma", + "main.tutkimusvastauksentila": "tutkimusvastauksentila", + "main.tutkimustulosarvo": "tutkimustulosarvo", + "main.tutkimustulosyksikko": "tutkimustulosyksikko", + "main.tuloksenpoikkeavuus": "tuloksenpoikkeavuus", + "main.viitearvoryhma": "viitearvoryhma", + "main.viitevalialkuarvo": "viitevalialkuarvo", + "main.viitevalialkuyksikko": "viitevalialkuyksikko", + "main.viitevaliloppuarvo": "viitevaliloppuarvo", + "main.viitevaliloppuyksikko": "viitevaliloppuyksikko", + "freetext.tutkimustulosteksti": "tutkimustulosteksti", + } + + out_columns = list(rename_columns.keys()) + ["_rowid_source"] + + ( + pl.scan_parquet(assembled_file) + .select(pl.col(out_columns)) + .rename(rename_columns) + .sink_parquet(output_file) + ) + + return output_file + + +def partition(assembled_file: Path, tmp_dir: Path, n_buckets): + for bucket_id in range(n_buckets): + ( + pl.scan_parquet(assembled_file) + .filter(pl.col("FINNGENID").hash() % n_buckets == bucket_id) + .sink_parquet(tmp_dir / f"bucket_id__{bucket_id}.parquet") + ) + + +def sort_dedup(frame: pl.LazyFrame | pl.DataFrame): + all_columns = frame.collect_schema().names() + sort_subset_columns = set(COLUMNS_UNIQUENESS_SORT) + other_columns = [] + for cc in all_columns: + if cc not in sort_subset_columns: + other_columns.append(cc) + + sort_full_columns = COLUMNS_UNIQUENESS_SORT + other_columns + + return frame.sort(by=sort_full_columns).unique( + subset=COLUMNS_UNIQUENESS_SORT, keep="first", maintain_order=True + ) + + +if __name__ == "__main__": + args = init_cli() + main( + args.assembled_file, + args.phenotype_file, + args.output_file, + partition_n_buckets=args.partition_n_buckets, + keep_intermediate_files=args.keep_intermediate_files, + ) diff --git a/uv.lock b/uv.lock index 14d81d0..209cda9 100644 --- a/uv.lock +++ b/uv.lock @@ -1,6 +1,6 @@ version = 1 revision = 3 -requires-python = ">=3.13" +requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14' and sys_platform == 'win32'", "python_full_version >= '3.14' and sys_platform == 'emscripten'", @@ -103,9 +103,10 @@ wheels = [ [[package]] name = "kanta-lab-preprocessing" version = "0.1.0" -source = { virtual = "." } +source = { editable = "." } dependencies = [ { name = "pandas" }, + { name = "polars" }, ] [package.dev-dependencies] @@ -116,7 +117,10 @@ dev = [ ] [package.metadata] -requires-dist = [{ name = "pandas", specifier = ">=3.0.2" }] +requires-dist = [ + { name = "pandas", specifier = ">=3.0.2" }, + { name = "polars", specifier = ">=1.40.0" }, +] [package.metadata.requires-dev] dev = [ @@ -143,6 +147,17 @@ version = "2.4.4" source = { registry = "https://pypi.org/simple" } sdist = { url = "https://files.pythonhosted.org/packages/d7/9f/b8cef5bffa569759033adda9481211426f12f53299629b410340795c2514/numpy-2.4.4.tar.gz", hash = "sha256:2d390634c5182175533585cc89f3608a4682ccb173cc9bb940b2881c8d6f8fa0", size = 20731587, upload-time = "2026-03-29T13:22:01.298Z" } wheels = [ + { url = "https://files.pythonhosted.org/packages/28/05/32396bec30fb2263770ee910142f49c1476d08e8ad41abf8403806b520ce/numpy-2.4.4-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:15716cfef24d3a9762e3acdf87e27f58dc823d1348f765bbea6bef8c639bfa1b", size = 16689272, upload-time = "2026-03-29T13:18:49.223Z" }, + { url = "https://files.pythonhosted.org/packages/c5/f3/a983d28637bfcd763a9c7aafdb6d5c0ebf3d487d1e1459ffdb57e2f01117/numpy-2.4.4-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:23cbfd4c17357c81021f21540da84ee282b9c8fba38a03b7b9d09ba6b951421e", size = 14699573, upload-time = "2026-03-29T13:18:52.629Z" }, + { url = "https://files.pythonhosted.org/packages/9b/fd/e5ecca1e78c05106d98028114f5c00d3eddb41207686b2b7de3e477b0e22/numpy-2.4.4-cp312-cp312-macosx_14_0_arm64.whl", hash = "sha256:8b3b60bb7cba2c8c81837661c488637eee696f59a877788a396d33150c35d842", size = 5204782, upload-time = "2026-03-29T13:18:55.579Z" }, + { url = "https://files.pythonhosted.org/packages/de/2f/702a4594413c1a8632092beae8aba00f1d67947389369b3777aed783fdca/numpy-2.4.4-cp312-cp312-macosx_14_0_x86_64.whl", hash = "sha256:e4a010c27ff6f210ff4c6ef34394cd61470d01014439b192ec22552ee867f2a8", size = 6552038, upload-time = "2026-03-29T13:18:57.769Z" }, + { url = "https://files.pythonhosted.org/packages/7f/37/eed308a8f56cba4d1fdf467a4fc67ef4ff4bf1c888f5fc980481890104b1/numpy-2.4.4-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f9e75681b59ddaa5e659898085ae0eaea229d054f2ac0c7e563a62205a700121", size = 15670666, upload-time = "2026-03-29T13:19:00.341Z" }, + { url = "https://files.pythonhosted.org/packages/0a/0d/0e3ecece05b7a7e87ab9fb587855548da437a061326fff64a223b6dcb78a/numpy-2.4.4-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:81f4a14bee47aec54f883e0cad2d73986640c1590eb9bfaaba7ad17394481e6e", size = 16645480, upload-time = "2026-03-29T13:19:03.63Z" }, + { url = "https://files.pythonhosted.org/packages/34/49/f2312c154b82a286758ee2f1743336d50651f8b5195db18cdb63675ff649/numpy-2.4.4-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:62d6b0f03b694173f9fcb1fb317f7222fd0b0b103e784c6549f5e53a27718c44", size = 17020036, upload-time = "2026-03-29T13:19:07.428Z" }, + { url = "https://files.pythonhosted.org/packages/7b/e9/736d17bd77f1b0ec4f9901aaec129c00d59f5d84d5e79bba540ef12c2330/numpy-2.4.4-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:fbc356aae7adf9e6336d336b9c8111d390a05df88f1805573ebb0807bd06fd1d", size = 18368643, upload-time = "2026-03-29T13:19:10.775Z" }, + { url = "https://files.pythonhosted.org/packages/63/f6/d417977c5f519b17c8a5c3bc9e8304b0908b0e21136fe43bf628a1343914/numpy-2.4.4-cp312-cp312-win32.whl", hash = "sha256:0d35aea54ad1d420c812bfa0385c71cd7cc5bcf7c65fed95fc2cd02fe8c79827", size = 5961117, upload-time = "2026-03-29T13:19:13.464Z" }, + { url = "https://files.pythonhosted.org/packages/2d/5b/e1deebf88ff431b01b7406ca3583ab2bbb90972bbe1c568732e49c844f7e/numpy-2.4.4-cp312-cp312-win_amd64.whl", hash = "sha256:b5f0362dc928a6ecd9db58868fca5e48485205e3855957bdedea308f8672ea4a", size = 12320584, upload-time = "2026-03-29T13:19:16.155Z" }, + { url = "https://files.pythonhosted.org/packages/58/89/e4e856ac82a68c3ed64486a544977d0e7bdd18b8da75b78a577ca31c4395/numpy-2.4.4-cp312-cp312-win_arm64.whl", hash = "sha256:846300f379b5b12cc769334464656bc882e0735d27d9726568bc932fdc49d5ec", size = 10221450, upload-time = "2026-03-29T13:19:18.994Z" }, { url = "https://files.pythonhosted.org/packages/14/1d/d0a583ce4fefcc3308806a749a536c201ed6b5ad6e1322e227ee4848979d/numpy-2.4.4-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:08f2e31ed5e6f04b118e49821397f12767934cfdd12a1ce86a058f91e004ee50", size = 16684933, upload-time = "2026-03-29T13:19:22.47Z" }, { url = "https://files.pythonhosted.org/packages/c1/62/2b7a48fbb745d344742c0277f01286dead15f3f68e4f359fbfcf7b48f70f/numpy-2.4.4-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:e823b8b6edc81e747526f70f71a9c0a07ac4e7ad13020aa736bb7c9d67196115", size = 14694532, upload-time = "2026-03-29T13:19:25.581Z" }, { url = "https://files.pythonhosted.org/packages/e5/87/499737bfba066b4a3bebff24a8f1c5b2dee410b209bc6668c9be692580f0/numpy-2.4.4-cp313-cp313-macosx_14_0_arm64.whl", hash = "sha256:4a19d9dba1a76618dd86b164d608566f393f8ec6ac7c44f0cc879011c45e65af", size = 5199661, upload-time = "2026-03-29T13:19:28.31Z" }, @@ -207,6 +222,14 @@ dependencies = [ ] sdist = { url = "https://files.pythonhosted.org/packages/da/99/b342345300f13440fe9fe385c3c481e2d9a595ee3bab4d3219247ac94e9a/pandas-3.0.2.tar.gz", hash = "sha256:f4753e73e34c8d83221ba58f232433fca2748be8b18dbca02d242ed153945043", size = 4645855, upload-time = "2026-03-31T06:48:30.816Z" } wheels = [ + { url = "https://files.pythonhosted.org/packages/f3/b0/c20bd4d6d3f736e6bd6b55794e9cd0a617b858eaad27c8f410ea05d953b7/pandas-3.0.2-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:232a70ebb568c0c4d2db4584f338c1577d81e3af63292208d615907b698a0f18", size = 10347921, upload-time = "2026-03-31T06:46:33.36Z" }, + { url = "https://files.pythonhosted.org/packages/35/d0/4831af68ce30cc2d03c697bea8450e3225a835ef497d0d70f31b8cdde965/pandas-3.0.2-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:970762605cff1ca0d3f71ed4f3a769ea8f85fc8e6348f6e110b8fea7e6eb5a14", size = 9888127, upload-time = "2026-03-31T06:46:36.253Z" }, + { url = "https://files.pythonhosted.org/packages/61/a9/16ea9346e1fc4a96e2896242d9bc674764fb9049b0044c0132502f7a771e/pandas-3.0.2-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:aff4e6f4d722e0652707d7bcb190c445fe58428500c6d16005b02401764b1b3d", size = 10399577, upload-time = "2026-03-31T06:46:39.224Z" }, + { url = "https://files.pythonhosted.org/packages/c4/a8/3a61a721472959ab0ce865ef05d10b0d6bfe27ce8801c99f33d4fa996e65/pandas-3.0.2-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:ef8b27695c3d3dc78403c9a7d5e59a62d5464a7e1123b4e0042763f7104dc74f", size = 10880030, upload-time = "2026-03-31T06:46:42.412Z" }, + { url = "https://files.pythonhosted.org/packages/da/65/7225c0ea4d6ce9cb2160a7fb7f39804871049f016e74782e5dade4d14109/pandas-3.0.2-cp312-cp312-musllinux_1_2_aarch64.whl", hash = "sha256:f8d68083e49e16b84734eb1a4dcae4259a75c90fb6e2251ab9a00b61120c06ab", size = 11409468, upload-time = "2026-03-31T06:46:45.2Z" }, + { url = "https://files.pythonhosted.org/packages/fa/5b/46e7c76032639f2132359b5cf4c785dd8cf9aea5ea64699eac752f02b9db/pandas-3.0.2-cp312-cp312-musllinux_1_2_x86_64.whl", hash = "sha256:32cc41f310ebd4a296d93515fcac312216adfedb1894e879303987b8f1e2b97d", size = 11936381, upload-time = "2026-03-31T06:46:48.293Z" }, + { url = "https://files.pythonhosted.org/packages/7b/8b/721a9cff6fa6a91b162eb51019c6243b82b3226c71bb6c8ef4a9bd65cbc6/pandas-3.0.2-cp312-cp312-win_amd64.whl", hash = "sha256:a4785e1d6547d8427c5208b748ae2efb64659a21bd82bf440d4262d02bfa02a4", size = 9744993, upload-time = "2026-03-31T06:46:51.488Z" }, + { url = "https://files.pythonhosted.org/packages/d5/18/7f0bd34ae27b28159aa80f2a6799f47fda34f7fb938a76e20c7b7fe3b200/pandas-3.0.2-cp312-cp312-win_arm64.whl", hash = "sha256:08504503f7101300107ecdc8df73658e4347586db5cfdadabc1592e9d7e7a0fd", size = 9056118, upload-time = "2026-03-31T06:46:54.548Z" }, { url = "https://files.pythonhosted.org/packages/bf/ca/3e639a1ea6fcd0617ca4e8ca45f62a74de33a56ae6cd552735470b22c8d3/pandas-3.0.2-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:b5918ba197c951dec132b0c5929a00c0bf05d5942f590d3c10a807f6e15a57d3", size = 10321105, upload-time = "2026-03-31T06:46:57.327Z" }, { url = "https://files.pythonhosted.org/packages/0b/77/dbc82ff2fb0e63c6564356682bf201edff0ba16c98630d21a1fb312a8182/pandas-3.0.2-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:d606a041c89c0a474a4702d532ab7e73a14fe35c8d427b972a625c8e46373668", size = 9864088, upload-time = "2026-03-31T06:46:59.935Z" }, { url = "https://files.pythonhosted.org/packages/5c/2b/341f1b04bbca2e17e13cd3f08c215b70ef2c60c5356ef1e8c6857449edc7/pandas-3.0.2-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:710246ba0616e86891b58ab95f2495143bb2bc83ab6b06747c74216f583a6ac9", size = 10369066, upload-time = "2026-03-31T06:47:02.792Z" }, @@ -270,6 +293,34 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, ] +[[package]] +name = "polars" +version = "1.40.1" +source = { registry = "https://pypi.org/simple" } +dependencies = [ + { name = "polars-runtime-32" }, +] +sdist = { url = "https://files.pythonhosted.org/packages/b3/8c/bc9bc948058348ed43117cecc3007cd608f395915dae8a00974579a5dab1/polars-1.40.1.tar.gz", hash = "sha256:ab2694134b137596b5a59bfd7b4c54ebbc9b59f9403127f18e32d363777552e8", size = 733574, upload-time = "2026-04-22T19:15:55.507Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/ea/91/74fc60d94488685a92ac9d49d7ec55f3e91fe9b77942a6235a5fa7f249c3/polars-1.40.1-py3-none-any.whl", hash = "sha256:c0f861219d1319cdea45c4ce4d30355a47176b8f98dcedf95ea8269f131b8abd", size = 828723, upload-time = "2026-04-22T19:14:25.452Z" }, +] + +[[package]] +name = "polars-runtime-32" +version = "1.40.1" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/54/ba/26d40f039be9f552b5fd7365a621bdfc0f8e912ef77094ae4693491b0bae/polars_runtime_32-1.40.1.tar.gz", hash = "sha256:37f3065615d1bf90d03b5326222df4c5c1f8a5d33e50470aa588e3465e6eb814", size = 2935843, upload-time = "2026-04-22T19:15:57.26Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/7d/46/22c8af5eed68ac2eeb556e0fa3ca8a7b798e984ceff4450888f3b5ac61fd/polars_runtime_32-1.40.1-cp310-abi3-macosx_10_12_x86_64.whl", hash = "sha256:b748ef652270cc49e9e69f99a035e0eb4d5f856d42bcd6ac4d9d80a40142aa1e", size = 52098755, upload-time = "2026-04-22T19:14:28.555Z" }, + { url = "https://files.pythonhosted.org/packages/c6/3e/48599a38009ca60ff82a6f38c8a621ce3c0286aa7397c7d79e741bd9060e/polars_runtime_32-1.40.1-cp310-abi3-macosx_11_0_arm64.whl", hash = "sha256:d249b3743e05986060cec0a7aaa542d020df6c6b876e556023a310efd581f9be", size = 46367542, upload-time = "2026-04-22T19:14:32.433Z" }, + { url = "https://files.pythonhosted.org/packages/43/e9/384bc069367a1a36ee31c13782c178dbd039b2b873b772d4a0fc23a2373d/polars_runtime_32-1.40.1-cp310-abi3-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5987b30e7aa1059d069498496e8dda35afd592b0ac3d46ed87e3ff8df1ad652c", size = 50252104, upload-time = "2026-04-22T19:14:35.945Z" }, + { url = "https://files.pythonhosted.org/packages/15/ef/7d57ceb0651af74194e97ed6583e148d352f03d696090221b8059cdfc90b/polars_runtime_32-1.40.1-cp310-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:8d7f42a8b3f16fc66002cc0f6516f7dd7653396886ae0ed362ab95c0b3408b59", size = 56250788, upload-time = "2026-04-22T19:14:39.743Z" }, + { url = "https://files.pythonhosted.org/packages/10/0f/e4b3ffc748827a14a474ec9c42e45c066050e440fec57e914091d9adda75/polars_runtime_32-1.40.1-cp310-abi3-musllinux_1_2_aarch64.whl", hash = "sha256:e5f7becc237a7ec9d9a10878dc8e54b73bbf4e2d94a2991c37d7a0b38590d8f9", size = 50432590, upload-time = "2026-04-22T19:14:43.388Z" }, + { url = "https://files.pythonhosted.org/packages/d9/0b/b8d95fbed869fa4caabe9c400e4210374913b376e925e96fdcfa9be6416b/polars_runtime_32-1.40.1-cp310-abi3-musllinux_1_2_x86_64.whl", hash = "sha256:992d14cf191dde043d36fbdbc98a65e43fbc7e9a5024cecd45f838ac4988c1ee", size = 54155564, upload-time = "2026-04-22T19:14:47.239Z" }, + { url = "https://files.pythonhosted.org/packages/06/d9/d091d8fb5cbed5e9536adfed955c4c89987a4cc3b8e73ae4532402b91c74/polars_runtime_32-1.40.1-cp310-abi3-win_amd64.whl", hash = "sha256:f78bb2abd00101cbb23cc0cb068f7e36e081057a15d2ec2dde3dda280709f030", size = 51829755, upload-time = "2026-04-22T19:14:50.85Z" }, + { url = "https://files.pythonhosted.org/packages/65/ad/b33c3022a394f3eb55c3310597cec615412a8a33880055eee191d154a628/polars_runtime_32-1.40.1-cp310-abi3-win_arm64.whl", hash = "sha256:b5cbfaf6b085b420b4bfcbe24e8f665076d1cccfdb80c0484c02a023ce205537", size = 45822104, upload-time = "2026-04-22T19:14:54.192Z" }, +] + [[package]] name = "prompt-toolkit" version = "3.0.52" diff --git a/wdl/pre-merge.json b/wdl/pre-merge.json deleted file mode 100644 index 6ea88a5..0000000 --- a/wdl/pre-merge.json +++ /dev/null @@ -1,7 +0,0 @@ -{ - "pre_merge.test":false, - "pre_merge.kanta_list": "gs://fg-3/kanta_v3/inputs/kanta_file_list.txt", - "pre_merge.prefix": "finngen_R14_kanta_laboratory_responses_internal_VERSION.txt.gz", - "pre_merge.version": "1.0" - -} diff --git a/wdl/pre-merge.wdl b/wdl/pre-merge.wdl deleted file mode 100644 index a161165..0000000 --- a/wdl/pre-merge.wdl +++ /dev/null @@ -1,114 +0,0 @@ -version 1.0 - -workflow pre_merge { - input { - Boolean test - File kanta_list - String prefix - String version - - } - String docker = "eu.gcr.io/finngen-sandbox-v3-containers/bioinformatics:1.0.1" - - # Remove quotation marks (and test if needed) - scatter (year_files in read_tsv(kanta_list)) { - # THIS STEP REMOVES QUOTATION BLOCKS - call process_file as process_responses {input :input_file= year_files[0],test = test,docker=docker} - call process_file as process_ft { input :input_file= year_files[1],test = test ,docker=docker} - call merge_ft {input: responses_file = process_responses.cleaned_file,ft_file = process_ft.cleaned_file,docker=docker} - } - - call merge_files {input:rr_files = merge_ft.merged_year,out_file = sub(prefix,"VERSION",if test then version +"_test" else version),docker=docker } - output { - File merged_kanta =merge_files.merged_file - } -} - -task merge_ft { - input { - File responses_file - File ft_file - String docker - } - - String out_file = sub(basename(responses_file),'.txt.gz','_merged.txt.gz') - command <<< - set -euo pipefail - F1="~{responses_file}" - F2="~{ft_file}" - OUT="~{out_file}" - # 1. Get headers safely. - # 'head -1' often causes 'zcat' to return exit code 141 (SIGPIPE). - # '|| true' ensures H1/H2 assignments don't trigger 'set -e'. - H1=$(zcat -f "$F1" | head -1 || true) - H2=$(zcat -f "$F2" | head -1 || true) - -# Check if we actually got headers before proceeding - if [[ -z "$H1" || -z "$H2" ]]; then - echo "Error: Could not read headers from input files." >&2 - exit 1 - fi - # Get headers and find indices for columns in F2 not in F1 - OFF=$(echo "$H1" | tr '\t' '\n' | wc -l) - - # Join files and process in one AWK pass - paste <(zcat -f "$F1") <(zcat -f "$F2") | awk -F'\t' -v OFS='\t' -v h1="$H1" -v h2="$H2" -v off="$OFF" ' - BEGIN { - split(h1, a1); split(h2, a2) - for(i in a1) map[a1[i]] = i - for(i in a2) if(a2[i] in map) pairs[map[a2[i]]] = i + off; else new[++n] = i + off - } - { - for(p in pairs) if($p != $pairs[p]) { print "Err line "NR": "$p" != "$pairs[p] > "/dev/stderr"; exit 1 } - res = $1; for(i=2; i<=off; i++) res = res OFS $i - for(i=1; i<=n; i++) res = res OFS $(new[i]) - print res - if(NR%50000==0) printf "\rRow %d", NR > "/dev/stderr" - }' | gzip > "$OUT" - - - >>> - runtime { - disks: "local-disk ~{ceil(size(responses_file,'GB')*3) + 10} HDD" - docker : "~{docker}" - } - output { - File merged_year = out_file - } -} - -task process_file { - input { - File input_file - Boolean test - String docker - } - String base = sub(basename(input_file),'.txt.gz','_cleaned.txt.gz') - command <<< - zcat -f ~{input_file} | sed 's/\(^\|\t\)"/\1/g; s/"\(\t\|$\)/\1/g' | tr -d '\r' | awk -F'\t' '/^FG/{if(NR>1)print ""; printf "%s",$0; next} {printf " %s",$0} END{print ""}' | awk -F'\t' 'BEGIN{OFS="\t"} NR==1{cols=NF} {if(NF ~{base} - >>> - runtime { - disks: "local-disk ~{ceil(size(input_file,'GB')*3) + 10} HDD" - docker:"~{docker}" - } - output {File cleaned_file = base} - -} - -task merge_files { - input { - Array[File] rr_files - String out_file - String docker - } - command <<< - zcat ~{rr_files[0]} | head -n1 | bgzip -c > ~{out_file} - while read f; do echo $f && zcat $f | sed -E 1d | bgzip -c >> ~{out_file}; done < ~{write_lines(rr_files)} - zcat ~{out_file} | wc -l - >>> - runtime { - disks: "local-disk ~{ceil(size(rr_files,'GB'))*3 + 10} HDD" - docker:"~{docker}" - } - output { File merged_file = out_file} -} diff --git a/wdl/sort_dup.json b/wdl/sort_dup.json deleted file mode 100644 index 906d162..0000000 --- a/wdl/sort_dup.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "kanta_sort_dup.test":false, - "kanta_sort_dup.kanta_data":"gs://fg-3/kanta_v3/finngen_R14_kanta_laboratory_responses_internal_1.0.txt.gz", - "kanta_sort_dup.sex_map.min_pheno": "gs://finngen-production-library-red/finngen_R13/phenotype_1.0/data/finngen_R13_minimum_1.0.txt.gz", - "kanta_sort_dup.kanta_docker": "eu.gcr.io/finngen-sandbox-v3-containers/kanta:dev", - "kanta_sort_dup.split.n_chunks": 32, - -} diff --git a/wdl/sort_dup.wdl b/wdl/sort_dup.wdl deleted file mode 100644 index d5b6596..0000000 --- a/wdl/sort_dup.wdl +++ /dev/null @@ -1,249 +0,0 @@ -version 1.0 - -workflow kanta_sort_dup{ - input { - # works with 100k lines - Boolean test - File kanta_data - String kanta_docker - } - # this has python 3.6, needed in the merge step. - String base_docker = "eu.gcr.io/finngen-sandbox-v3-containers/kanta:v3_base" - call get_cols {input:docker=kanta_docker} - # split input in chunks - # s_cols (names) --> sort_cols (indices) - call split { - input: - test = test, - kanta_data = kanta_data, - cols = get_cols.cols, - s_cols = get_cols.s_cols, - docker=base_docker - } - - # builds sex dictionary mapping from pheno file - call sex_map {input: docker=base_docker} - - # extract columns sort and extract duplicates/errs - scatter (i in range(length(split.chunks))) { - call sort { - input : - index = i, - chunk = split.chunks[i], - sort_cols = split.sort_cols, - sex_map = sex_map.sex_map, - docker=base_docker - } - } - # merge chunks (unique/dup/err) - String prefix = basename(kanta_data,'.txt.gz') - call merge { - input : - sorted_chunks = sort.sorted_chunk, - sort_cols = split.sort_cols, - header = split.header, - docker= base_docker, - prefix = if test then prefix+ "_test" else prefix - } -} - -task merge { - input { - File header - Array[File] sorted_chunks - Array[String] sort_cols - String prefix - String docker - } - - Int chunk_size = ceil(size(sorted_chunks,"GB")) - command <<< - # CONCAT PRE-SORTED FILES - echo "SORT FILES" - for col in ~{sep=' ' sort_cols}; do echo "${col},${col}" >> sort_keys.tmp; done - SORT_ARGS=$(cat sort_keys.tmp | xargs -I {} echo "-k {}" | tr '\n' ' ') - /usr/bin/time -v sort -t $'\t' -m $SORT_ARGS ~{sep=" " sorted_chunks} > sorted.txt - #/usr/bin/time -v sort -t $'\t' -m -k ~{sep=" -k " sort_cols} ~{sep=" " sorted_chunks} > sorted.txt - # REMOVE DUPS - python3 <>> - runtime { - disks: "local-disk ~{chunk_size*4+10} HDD" - docker : "~{docker}" - } - output { - Array[File] kanta_files = glob("~{prefix}*gz") - } -} - -task sort { - input { - File chunk - Array[String] sort_cols - Int index - String docker - File sex_map - } - String out_file = "kanta_sorted_" + index - - command <<< - for col in ~{sep=' ' sort_cols}; do echo "${col},${col}" >> sort_keys.tmp; done - SORT_ARGS=$(cat sort_keys.tmp | xargs -I {} echo "-k {}" | tr '\n' ' ') - zcat ~{chunk} | sort -t $'\t' $SORT_ARGS > tmp.txt - #zcat ~{chunk} | sort -t $'\t' -k ~{sep=" -k " sort_cols} > ~{out_file} - - #add sex - awk -F'\t' 'BEGIN {OFS="\t"} NR==FNR {sex[$1]=$2; next} NR==1 {print $0, "SEX"; next} {print $0, (sex[$1] ? sex[$1] : "NA")}' \ - ~{sex_map} tmp.txt > ~{out_file} - - # check file size - count_tmp=$(wc -l < tmp.txt) - count_out=$(wc -l < ~{out_file}) - - # Perform the assertion - if [[ "$count_tmp" -ne "$count_out" ]]; then - echo "❌ Assertion Failed: Line counts do not match!" >&2 - echo "tmp.txt has $count_tmp lines." >&2 - echo "~{out_file} has $count_out lines." >&2 - exit 1 # Exit with a non-zero status to signal an error - else - echo "✅ Assertion Passed: Both files have $count_tmp lines." - fi - >>> - - runtime { - disks: "local-disk ~{ceil(size(chunk,'GB'))*3 + 10} HDD" - docker: "~{docker}" - } - - output { - File sorted_chunk = out_file - } -} - -task get_cols { - input { - String docker - } - - command <<< - # get required columns to cut from git repository - cp /finngen_qc/magic_config.py ./config.py - python3 -c "import config;o= open('./columns.txt','wt') ;o.write('\n'.join(list(config.config['rename_cols'].keys())) + '\n');o.write('\n'.join(config.config['other_cols'])+ '\n')" - python3 -c "import config;o= open('./sort_columns.txt','wt') ;o.write('\n'.join(config.config['sort_cols'])+ '\n')" - >>> - runtime { - disks: "local-disk 10 HDD" - docker : "~{docker}" - } - output { - Array[String] cols = read_lines("columns.txt") - Array[String] s_cols = read_lines("sort_columns.txt") - } -} - -task split { - input { - Boolean test - File kanta_data - Int n_chunks - Array[String] cols - Array[String] s_cols - String docker - } - - Int disk_size = ceil(size(kanta_data,"GB"))*10*n_chunks - - command <<< - echo "SORT KANTA" - cat ~{write_lines(cols)} > columns.txt - cat ~{write_lines(s_cols)} > sort_columns.txt - COLS=$(zcat ~{kanta_data} | head -n1 | tr '\t' '\n' | grep -wnf columns.txt | cut -f 1 -d ':' | tr '\n' ',' | rev | cut -c2- | rev) - echo $COLS - - # uncompress and split new header from body - zcat ~{kanta_data} | cut -f $COLS | head -n1 > header.txt - zcat ~{kanta_data} | cut -f $COLS | sed -E 1d ~{if test then " | head -n 10000 " else ""}> tmp.tsv - - # GET SORT COLS AND KEEP ORDER - echo "COLS" - while read f; - do - cat header.txt | head -n1 | tr '\t' '\n'| grep -wn $f | cut -f 1 -d ':' >> sort_cols.txt - done < sort_columns.txt - cat sort_cols.txt - - # SPLIT INTO N FILES - split tmp.tsv -n l/~{n_chunks} -d kanta_chunk --filter='gzip > $FILE.gz' - >>> - - runtime { - disks: "local-disk ~{disk_size} HDD" - docker : "~{docker}" - } - - output { - Array[File] chunks = glob("./kanta_chunk*gz") - File header = "header.txt" - Array[String] sort_cols = read_lines("sort_cols.txt") - } -} - -task sex_map { - input { - File min_pheno - String docker - } - String sex_file = "sex_map.txt" - command <<< - # get sex col - sexcol=$(awk '{for(i=1;i<=NF;i++){if($i=="SEX"){print i; exit}}}' <(zcat ~{min_pheno} | head -n1)) - # extract sex only and sort - zcat ~{min_pheno} | cut -f 1,$sexcol | (sed -u 1q ; sort )>> ~{sex_file} - >>> - runtime { - disks: "local-disk ~{ceil(size(min_pheno,'GB')) * 3} HDD" - docker : "~{docker}" - } - output {File sex_map = sex_file} -}