diff --git a/.flake8 b/.flake8
index 9747547..24102ab 100644
--- a/.flake8
+++ b/.flake8
@@ -1,4 +1,6 @@
 [flake8]
 inline-quotes = "
 multiline-quotes = """
-docstring-quotes = """
\ No newline at end of file
+docstring-quotes = """
+ignore = E203, E266, E501, W503
+max-line-length = 88
\ No newline at end of file
diff --git a/flows/lib/shared_args.py b/flows/lib/shared_args.py
index f7d4489..4d5d801 100644
--- a/flows/lib/shared_args.py
+++ b/flows/lib/shared_args.py
@@ -41,6 +41,14 @@
     },
 }
 
+DATA_FREEZE_PATH = {
+    "flags": ["--data_freeze_path"],
+    "keys": {
+        "help": "Path to data freeze list TSV on S3.",
+        "type": str,
+    },
+}
+
 DATE = {
     "flags": ["--date"],
     "keys": {
diff --git a/flows/lib/utils.py b/flows/lib/utils.py
index 1f40f8f..ed8f8bd 100644
--- a/flows/lib/utils.py
+++ b/flows/lib/utils.py
@@ -8,6 +8,7 @@
 import shlex
 import shutil
 import subprocess
+import tempfile
 from argparse import Action
 from csv import DictReader, Sniffer
 from datetime import datetime
@@ -755,6 +756,42 @@ def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None:
         raise e
 
 
+def parse_s3_file(data_freeze_path: str) -> dict:
+    """
+    Fetch and parse a tab-separated key/value file from the given S3 path.
+
+    Args:
+        data_freeze_path (str): The S3 path to the TSV file.
+    Returns:
+        dict: A dictionary mapping first-column keys to lists of comma-split values.
+
+    """
+    # from s3 to temporary file
+    print(f"Fetching data freeze file from {data_freeze_path}")
+    with tempfile.NamedTemporaryFile() as tmp_file:
+        local_path = tmp_file.name
+        fetch_from_s3(data_freeze_path, local_path)
+        parsed = {}
+        with open(local_path, "r") as f:
+            for line in f:
+                parts = [p.strip() for p in line.strip().split("\t")]
+                key = parts[0]
+                if len(parts) < 2:
+                    parsed[key] = []
+                    continue
+                # For each value column, split on comma and store as list
+                value_lists = [
+                    [v.strip() for v in col.split(",") if v.strip()]
+                    for col in parts[1:]
+                ]
+                parsed[key] = (
+                    value_lists
+                    if len(value_lists) > 1
+                    else (value_lists[0] if value_lists else [])
+                )
+    return parsed
+
+
 def set_index_name(
     index_type: str,
     hub_name: str,
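As a sketch of the new parse_s3_file helper above: given a hypothetical two-row data freeze list (the accessions and subset labels below are invented, not taken from the repository), each first-column accession maps to a list built from the remaining comma-separated column.

# Hypothetical input rows (tab-separated):
#   GCA_000000001.1<TAB>vgp_phase1
#   GCA_000000002.1<TAB>vgp_phase1,primary
# parse_s3_file("s3://example-bucket/freeze.tsv") would then return:
expected = {
    "GCA_000000001.1": ["vgp_phase1"],
    "GCA_000000002.1": ["vgp_phase1", "primary"],
}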
diff --git a/flows/lib/wrapper_fetch_parse_validate.py b/flows/lib/wrapper_fetch_parse_validate.py
index 522f23a..310a63d 100644
--- a/flows/lib/wrapper_fetch_parse_validate.py
+++ b/flows/lib/wrapper_fetch_parse_validate.py
@@ -6,6 +6,7 @@
 from fetch_previous_file_pair import fetch_previous_file_pair
 from shared_args import (
     APPEND,
+    DATA_FREEZE_PATH,
     DRY_RUN,
     MIN_ASSIGNED,
     MIN_VALID,
@@ -37,6 +38,7 @@ def fetch_parse_validate(
     yaml_path: str,
     s3_path: str,
     work_dir: str,
+    data_freeze_path: Optional[str] = None,
     taxdump_path: Optional[str] = None,
     append: bool = False,
     dry_run: bool = False,
@@ -51,6 +53,7 @@
         yaml_path (str): Path to the source YAML file.
         s3_path (str): Path to the TSV directory on S3.
         work_dir (str): Path to the working directory.
+        data_freeze_path (str, optional): Path to a data freeze list TSV on S3.
         taxdump_path (str, optional): Path to an NCBI format taxdump.
         append (bool, optional): Flag to append values to an existing TSV file(s).
         dry_run (bool, optional): Flag to run the flow without updating s3/git files.
@@ -65,7 +68,12 @@ def fetch_parse_validate(
         append = False
     working_yaml = os.path.join(work_dir, os.path.basename(yaml_path))
     file_parser = PARSERS.parsers[parser.name]
-    file_parser.func(working_yaml=working_yaml, work_dir=work_dir, append=append)
+    file_parser.func(
+        working_yaml=working_yaml,
+        work_dir=work_dir,
+        append=append,
+        data_freeze_path=data_freeze_path,
+    )
     if dry_run:
         # set s3_path = None to skip copying the validated file to S3/git
         s3_path = None
@@ -90,6 +98,7 @@
         required(YAML_PATH),
         required(S3_PATH),
         WORK_DIR,
+        DATA_FREEZE_PATH,
         TAXDUMP_PATH,
         APPEND,
         DRY_RUN,
diff --git a/flows/parsers/args.py b/flows/parsers/args.py
index a848e9b..4572795 100644
--- a/flows/parsers/args.py
+++ b/flows/parsers/args.py
@@ -1,10 +1,13 @@
 import argparse
 
-from flows.lib.shared_args import APPEND, INPUT_PATH, YAML_PATH
+from flows.lib.shared_args import APPEND, DATA_FREEZE_PATH, INPUT_PATH, YAML_PATH
 from flows.lib.shared_args import parse_args as _parse_args
 from flows.lib.shared_args import required
 
 
 def parse_args(description: str = "An input file parser") -> argparse.Namespace:
     """Parse command-line arguments."""
-    return _parse_args([required(INPUT_PATH), required(YAML_PATH), APPEND], description)
+    return _parse_args(
+        [required(INPUT_PATH), required(YAML_PATH), APPEND, DATA_FREEZE_PATH],
+        description,
+    )
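The wrapper above now always forwards data_freeze_path to the selected parser, so every registered parser has to tolerate the extra keyword; this is why the remaining parsers below gain **kwargs. A minimal sketch of that call contract (example_parser is a hypothetical stand-in, not a project parser):

def example_parser(working_yaml: str, work_dir: str, append: bool, **kwargs) -> None:
    # data_freeze_path arrives via kwargs and is simply ignored by parsers
    # that have no use for it
    print(f"unused keyword arguments: {sorted(kwargs)}")


example_parser(
    working_yaml="example.types.yaml",
    work_dir="/tmp/work",
    append=False,
    data_freeze_path=None,
)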
diff --git a/flows/parsers/parse_ncbi_assemblies.py b/flows/parsers/parse_ncbi_assemblies.py
index 3efffab..c202f4b 100644
--- a/flows/parsers/parse_ncbi_assemblies.py
+++ b/flows/parsers/parse_ncbi_assemblies.py
@@ -1,5 +1,6 @@
 import json
 import os
+import re
 import subprocess
 from collections import defaultdict
 from glob import glob
@@ -9,7 +10,7 @@
 
 from flows.lib import utils  # noqa: E402
 from flows.lib.conditional_import import flow, run_count, task  # noqa: E402
-from flows.lib.utils import Config, Parser  # noqa: E402
+from flows.lib.utils import Config, Parser, parse_s3_file  # noqa: E402
 from flows.parsers.args import parse_args  # noqa: E402
 
 
@@ -353,6 +354,7 @@ def process_assembly_reports(
     """
     for report in parse_assembly_report(jsonl_path=jsonl_path):
         try:
+            print(f"Processing report for {report.get('accession', 'unknown')}")
             processed_report = process_assembly_report(
                 report, previous_report, config, parsed
             )
@@ -376,9 +378,83 @@
             continue
 
 
+@task(log_prints=True)
+def parse_data_freeze_file(data_freeze_path: str) -> dict:
+    """
+    Fetch and parse a 2-column TSV with the data freeze list of assemblies and their
+    respective status from the given S3 path.
+
+    Args:
+        data_freeze_path (str): The S3 path to the data freeze list TSV file.
+    Returns:
+        dict: A dictionary mapping assembly accessions to their freeze subsets.
+
+    """
+    # from s3 to temporary file
+    print(f"Fetching data freeze file from {data_freeze_path}")
+    data_freeze = parse_s3_file(data_freeze_path)
+    print(f"Parsed {len(data_freeze)} entries from data freeze file")
+    return data_freeze
+
+
+@task()
+def set_data_freeze_default(parsed: dict, data_freeze_name: str):
+    """
+    Set the default data freeze information for all assemblies.
+
+    Args:
+        parsed (dict): A dictionary containing parsed data.
+        data_freeze_name (str): The name of the default data freeze.
+    """
+    for line in parsed.values():
+        line["dataFreeze"] = [data_freeze_name]
+        line["assemblyID"] = line["genbankAccession"]
+
+
+@task(log_prints=True)
+def process_datafreeze_info(processed_report: dict, data_freeze: dict, config: Config):
+    """
+    Process the data freeze information for a given assembly report.
+    Rename the assembly ID to include the data freeze name.
+
+    Args:
+        processed_report (dict): A dictionary containing processed assembly data.
+        data_freeze (dict): A dictionary containing data freeze information.
+    """
+    data_freeze_name = (
+        re.sub(r"\.tsv(\.gz)?$", "", os.path.basename(config.meta["file_name"]))
+        if config.meta["file_name"]
+        else "data_freeze"
+    )
+    print(f"Processing data freeze info for {data_freeze_name}")
+    for line in processed_report.values():
+        print(
+            f"Processing data freeze info for {line['refseqAccession']} - "
+            f"{line['genbankAccession']}"
+        )
+        status = data_freeze.get(line["refseqAccession"]) or data_freeze.get(
+            line["genbankAccession"]
+        )
+        if not status:
+            continue
+        line["dataFreeze"] = status
+
+        accession_name = (
+            line["refseqAccession"]
+            if line["refseqAccession"] in data_freeze
+            else line["genbankAccession"]
+        )
+        line["assemblyID"] = f"{accession_name}_{data_freeze_name}"
+
+
 @flow(log_prints=True)
 def parse_ncbi_assemblies(
-    input_path: str, yaml_path: str, append: bool, feature_file: Optional[str] = None
+    input_path: str,
+    yaml_path: str,
+    append: bool,
+    feature_file: Optional[str] = None,
+    data_freeze_path: Optional[str] = None,
+    **kwargs,
 ):
     """
     Parse NCBI datasets assembly data.
@@ -388,6 +464,8 @@
         yaml_path (str): Path to the YAML configuration file.
         append (bool): Flag to append values to an existing TSV file(s).
         feature_file (str): Path to the feature file.
+        data_freeze_path (str): Path to data freeze list TSV on S3.
+        **kwargs: Additional keyword arguments.
     """
     config = utils.load_config(
         config_file=yaml_path,
@@ -396,11 +474,20 @@
     )
     if feature_file is not None:
         set_up_feature_file(config)
+
     biosamples = {}
     parsed = {}
     previous_report = {} if append else None
     process_assembly_reports(input_path, config, biosamples, parsed, previous_report)
     set_representative_assemblies(parsed, biosamples)
+
+    if data_freeze_path is None:
+        set_data_freeze_default(parsed, data_freeze_name="latest")
+    else:
+        data_freeze = parse_data_freeze_file(
+            data_freeze_path
+        )  # This returns the data freeze dictionary
+        process_datafreeze_info(parsed, data_freeze, config)
     write_to_tsv(parsed, config)
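To illustrate the renaming performed by process_datafreeze_info above, assuming an invented accession and a freeze file named vgp_phase1.tsv (so data_freeze_name becomes "vgp_phase1"):

# Invented example values, for illustration only.
data_freeze = {"GCA_000000002.1": ["vgp_phase1"]}
line = {
    "refseqAccession": "GCF_000000002.1",
    "genbankAccession": "GCA_000000002.1",
}
# After process_datafreeze_info, the matching entry would carry:
#   line["dataFreeze"] == ["vgp_phase1"]
#   line["assemblyID"] == "GCA_000000002.1_vgp_phase1"
# Assemblies absent from the freeze list are left untouched.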
""" print("parsing RefSeq organelles files") diff --git a/flows/parsers/parse_sequencing_status.py b/flows/parsers/parse_sequencing_status.py index 3be87c8..37915d7 100644 --- a/flows/parsers/parse_sequencing_status.py +++ b/flows/parsers/parse_sequencing_status.py @@ -2,7 +2,9 @@ from flows.parsers.args import parse_args # noqa: E402 -def parse_sequencing_status(working_yaml: str, work_dir: str, append: bool) -> None: +def parse_sequencing_status( + working_yaml: str, work_dir: str, append: bool, **kwargs +) -> None: """ Wrapper function to parse the sequencing status files. @@ -10,6 +12,7 @@ def parse_sequencing_status(working_yaml: str, work_dir: str, append: bool) -> N working_yaml (str): Path to the working YAML file. work_dir (str): Path to the working directory. append (bool): Whether to append to the existing TSV file. + **kwargs: Additional keyword arguments. """ print("parsing sequencing status files") diff --git a/flows/parsers/parse_skip_parsing.py b/flows/parsers/parse_skip_parsing.py index 11e21c2..7e1dd5b 100644 --- a/flows/parsers/parse_skip_parsing.py +++ b/flows/parsers/parse_skip_parsing.py @@ -24,7 +24,9 @@ def check_tsv_file_exists(config: Config, work_dir: str) -> bool: @flow() -def parse_skip_parsing(working_yaml: str, work_dir: str, append: bool) -> None: +def parse_skip_parsing( + working_yaml: str, work_dir: str, append: bool, **kwargs +) -> None: """ Skip parsing. @@ -32,6 +34,7 @@ def parse_skip_parsing(working_yaml: str, work_dir: str, append: bool) -> None: working_yaml (str): Path to the working YAML file. work_dir (str): Path to the working directory. append (bool): Whether to append to the existing TSV file. + **kwargs: Additional keyword arguments. """ # Get the config from the YAML file config = load_config(working_yaml) diff --git a/flows/prefect.yaml b/flows/prefect.yaml index 9a45e6d..8571667 100644 --- a/flows/prefect.yaml +++ b/flows/prefect.yaml @@ -74,6 +74,7 @@ deployments: match: prefect.resource.type: ncbi.datasets prefect.resource.matches.previous: "no" + prefect.resource.id": fetch.datasets./home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}/ncbi_datasets_eukaryota.jsonl" expect: - update.ncbi.datasets.finished parameters: @@ -167,3 +168,40 @@ deployments: output_path: "/home/ubuntu/tmp/test/genomehubs-taxonomy/eukaryota/nodes.jsonl" s3_path: "s3://goat/resources/taxonomy/genomehubs/eukaryota/nodes.jsonl.gz" work_pool: *goat_data_work_pool + + - name: update-data-freeze + # This flow updates a data freeze JSONL file + entrypoint: flows/updaters/update_ncbi_datasets.py:update_ncbi_datasets + parameters: + output_path: "/home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}/vgp_phase1.jsonl" + s3_path: s3://goat/resources/data-freeezes/vgp_phase1.jsonl.gz + data_freeze_path: "s3://goat/resources/data-freezes/vgp_phase1_list_data_freeze.tsv" + work_pool: *goat_data_work_pool + + - name: fetch-parse-validate-data-freeze + entrypoint: flows/lib/wrapper_fetch_parse_validate.py:fetch_parse_validate + parameters: + parser: "ParserEnum.NCBI_ASSEMBLIES" + yaml_path: "../goat-data-main/sources/assembly-data/vgp_phase1.types.yaml" + s3_path: "s3://goat/sources/assembly-data/" + work_dir: "/home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}" + data_freeze_path: "s3://goat/resources/data-freezes/vgp_phase1_list_data_freeze.tsv" + append: true + dry_run: true + triggers: + - enabled: true + match: + prefect.resource.type: ncbi.datasets + prefect.resource.matches.previous: "no" + prefect.resource.id": 
diff --git a/flows/updaters/update_ncbi_datasets.py b/flows/updaters/update_ncbi_datasets.py
index ec7b476..e9cf53b 100644
--- a/flows/updaters/update_ncbi_datasets.py
+++ b/flows/updaters/update_ncbi_datasets.py
@@ -1,11 +1,13 @@
 import hashlib
 import os
+from typing import Optional
 
 import boto3
 from botocore.exceptions import ClientError
 
 from flows.lib.conditional_import import emit_event, flow, task
 from flows.lib.shared_args import (
+    DATA_FREEZE_PATH,
     OUTPUT_PATH,
     ROOT_TAXID,
     S3_PATH,
@@ -13,35 +15,10 @@
     parse_args,
     required,
 )
-from flows.lib.utils import run_quoted
+from flows.lib.utils import parse_s3_file, run_quoted
 
 
-@task(retries=2, retry_delay_seconds=2, log_prints=True)
-def fetch_ncbi_datasets_summary(
-    root_taxid: str,
-    file_path: str,
-    min_lines: int = 1,
-) -> int:
-    """
-    Fetch NCBI datasets summary for a given root taxID.
-
-    Args:
-        root_taxid (str): Root taxonomic ID for fetching datasets.
-        file_path (str): Path to the output file.
-        min_lines (int): Minimum number of lines in the output file.
-
-    Returns:
-        int: Number of lines written to the output file.
-    """
-
-    # Check if the file already exists and truncate it
-    os.makedirs(os.path.dirname(file_path), exist_ok=True)
-    # open the file for writing and truncate it
-    with open(file_path, "w") as f:
-        f.truncate(0)
-
-    line_count = 0
-
+def fetch_by_root_id(root_taxid, file_path):
     taxids = [root_taxid]
     if root_taxid == "2759":
         taxids = [
@@ -67,6 +44,7 @@
             "554296",
             "42452",
         ]
+    line_count = 0
     for taxid in taxids:
         if not taxid.isdigit():
             raise ValueError(f"Invalid taxid: {taxid}")
@@ -103,6 +81,85 @@
         except Exception as e:
             # Raise an error if writing to the file fails
             raise RuntimeError(f"Error writing datasets summary to file: {e}") from e
+    return line_count
+
+
+def fetch_data_freeze_accessions(data_freeze_path, file_path):
+    data_freeze = parse_s3_file(data_freeze_path)
+    accessions = list(data_freeze.keys())
+    line_count = 0
+    batch_size = 50
+
+    def chunks(lst, n):
+        for i in range(0, len(lst), n):
+            yield lst[i : i + n]
+
+    with open(file_path, "a") as f:
+        for batch in chunks(accessions, batch_size):
+            command = [
+                "datasets",
+                "summary",
+                "genome",
+                "accession",
+                *batch,
+                "--as-json-lines",
+            ]
+            result = run_quoted(command, capture_output=True, text=True)
+            if result.returncode != 0:
+                if "no genome data" in result.stderr:
+                    print(
+                        f"Warning: {result.stderr.strip()}. "
+                        f"Skipping batch {batch} and continuing."
+                    )
+                    continue
+                raise RuntimeError(f"Error fetching datasets summary: {result.stderr}")
+
+            try:
+                print(
+                    f"Writing datasets summary for batch {batch} to file: {file_path}"
+                )
+                for line in result.stdout.splitlines():
+                    f.write(line + "\n")
+                    line_count += 1
+            except Exception as e:
+                raise RuntimeError(
+                    f"Error writing datasets summary to file: {e}"
+                ) from e
+    return line_count
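The accession batching used by fetch_data_freeze_accessions above can be seen in isolation with the sketch below; the accessions are placeholders and the datasets CLI command is only echoed, not executed.

def chunks(lst, n):
    # yield successive n-sized slices of lst
    for i in range(0, len(lst), n):
        yield lst[i : i + n]


accessions = [f"GCA_{i:09d}.1" for i in range(120)]  # placeholder accessions
for batch in chunks(accessions, 50):
    # one "datasets summary genome accession ..." call per batch of 50,
    # emitting one JSON line per assembly with --as-json-lines
    command = ["datasets", "summary", "genome", "accession", *batch, "--as-json-lines"]
    print(f"{len(batch)} accessions -> {' '.join(command[:5])} ...")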
+
+
+@task(retries=2, retry_delay_seconds=2, log_prints=True)
+def fetch_ncbi_datasets_summary(
+    root_taxid: str,
+    file_path: str,
+    data_freeze_path: Optional[str] = None,
+    min_lines: int = 1,
+) -> int:
+    """
+    Fetch NCBI datasets summary for a given root taxID.
+
+    Args:
+        root_taxid (str): Root taxonomic ID for fetching datasets.
+        file_path (str): Path to the output file.
+        data_freeze_path (Optional[str]): Path to the data freeze list TSV on S3.
+        min_lines (int): Minimum number of lines in the output file.
+
+    Returns:
+        int: Number of lines written to the output file.
+    """
+
+    # Check if the file already exists and truncate it
+    os.makedirs(os.path.dirname(file_path), exist_ok=True)
+    # open the file for writing and truncate it
+    with open(file_path, "w") as f:
+        f.truncate(0)
+
+    line_count = 0
+
+    if data_freeze_path is None:
+        line_count = fetch_by_root_id(root_taxid, file_path)
+    else:
+        line_count = fetch_data_freeze_accessions(data_freeze_path, file_path)
 
     # Check if the file has at least min_lines lines
     if line_count < min_lines:
@@ -165,8 +222,15 @@ def generate_md5(file_path):
 
 
 @flow()
-def update_ncbi_datasets(root_taxid: str, output_path: str, s3_path: str) -> None:
-    line_count = fetch_ncbi_datasets_summary(root_taxid, file_path=output_path)
+def update_ncbi_datasets(
+    root_taxid: str,
+    output_path: str,
+    s3_path: str,
+    data_freeze_path: Optional[str] = None,
+) -> None:
+    line_count = fetch_ncbi_datasets_summary(
+        root_taxid, file_path=output_path, data_freeze_path=data_freeze_path
+    )
     if s3_path:
         status = compare_datasets_summary(output_path, s3_path)
         emit_event(
@@ -185,7 +249,12 @@ def update_ncbi_datasets(root_taxid: str, output_path: str, s3_path: str) -> Non
 
 if __name__ == "__main__":
     """Run the flow."""
     args = parse_args(
-        [default(ROOT_TAXID, "taxon"), required(OUTPUT_PATH), S3_PATH],
+        [
+            default(ROOT_TAXID, "taxon"),
+            required(OUTPUT_PATH),
+            DATA_FREEZE_PATH,
+            S3_PATH,
+        ],
        "Fetch assembly metadata from NCBI datasets.",
    )
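Finally, a hedged sketch of driving the updated flow directly with a data freeze list (paths are placeholders; s3_path=None skips the S3 comparison behind the if s3_path: guard, and root_taxid is ignored once data_freeze_path is set):

from flows.updaters.update_ncbi_datasets import update_ncbi_datasets

update_ncbi_datasets(
    root_taxid="2759",  # not used when a data freeze list is supplied
    output_path="/tmp/vgp_phase1.jsonl",
    s3_path=None,
    data_freeze_path="s3://goat/resources/data-freezes/vgp_phase1_list_data_freeze.tsv",
)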