4 changes: 3 additions & 1 deletion .flake8
@@ -1,4 +1,6 @@
[flake8]
inline-quotes = "
multiline-quotes = """
docstring-quotes = """
ignore = E203, E266, E501, W503
max-line-length = 88
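The three quote options added here are read by the flake8-quotes plugin (assumed to be installed alongside flake8); with these values the linter enforces double quotes throughout. A minimal sketch of code that satisfies the new settings:

```python
# A minimal sketch of code that passes the new flake8-quotes settings:
# double quotes inline, triple double quotes for multiline strings
# and docstrings.
def greet(name: str) -> str:
    """Return a greeting for the given name."""
    template = "Hello, {name}!"
    footer = """
    Sent by goat-data.
    """
    return template.format(name=name) + footer
```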
8 changes: 8 additions & 0 deletions flows/lib/shared_args.py
@@ -41,6 +41,14 @@
},
}

DATA_FREEZE_PATH = {
"flags": ["--data_freeze_path"],
"keys": {
"help": "Path to data freeze list TSV on S3.",
"type": str,
},
}

DATE = {
"flags": ["--date"],
"keys": {
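For orientation, each shared_args entry such as DATA_FREEZE_PATH above pairs a `flags` list with a `keys` dict of argparse options. A sketch of how such an entry could feed `argparse.add_argument` (the `register_arg` helper below is hypothetical, not this repo's actual consumer, which is `shared_args.parse_args`):

```python
import argparse

# The same shape as the DATA_FREEZE_PATH entry added above.
DATA_FREEZE_PATH = {
    "flags": ["--data_freeze_path"],
    "keys": {
        "help": "Path to data freeze list TSV on S3.",
        "type": str,
    },
}


def register_arg(parser: argparse.ArgumentParser, arg: dict) -> None:
    # Splat the flags positionally and the keys as keyword arguments.
    parser.add_argument(*arg["flags"], **arg["keys"])


parser = argparse.ArgumentParser(description="demo")
register_arg(parser, DATA_FREEZE_PATH)
args = parser.parse_args(["--data_freeze_path", "s3://bucket/freeze.tsv"])
print(args.data_freeze_path)  # s3://bucket/freeze.tsv
```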
37 changes: 37 additions & 0 deletions flows/lib/utils.py
@@ -8,6 +8,7 @@
import shlex
import shutil
import subprocess
import tempfile
from argparse import Action
from csv import DictReader, Sniffer
from datetime import datetime
@@ -755,6 +756,42 @@ def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None:
raise e


def parse_s3_file(data_freeze_path: str) -> dict:
"""
Fetch and parse a TSV file from the given S3 path.

Args:
data_freeze_path (str): The S3 path to the TSV file.
Returns:
dict: A dictionary mapping column 1 keys to a list of values, or to a
list of per-column value lists when the file has more than one value column.

"""
# from s3 to temporary file
print(f"Fetching data freeze file from {data_freeze_path}")
with tempfile.NamedTemporaryFile() as tmp_file:
local_path = tmp_file.name
fetch_from_s3(data_freeze_path, local_path)
parsed = {}
with open(local_path, "r") as f:
for line in f:
parts = [p.strip() for p in line.strip().split("\t")]
key = parts[0]
if len(parts) < 2:
parsed[key] = []
continue
# For each value column, split on comma and store as list
value_lists = [
[v.strip() for v in col.split(",") if v.strip()]
for col in parts[1:]
]
parsed[key] = (
value_lists
if len(value_lists) > 1
else (value_lists[0] if value_lists else [])
)
return parsed


def set_index_name(
index_type: str,
hub_name: str,
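To make the return shape of `parse_s3_file` concrete, here is its value-splitting logic applied to in-memory rows (a mirror of the loop above with the S3 fetch omitted; the accessions are made up):

```python
# Mirrors the value-splitting rules of parse_s3_file on in-memory
# rows; the S3 fetch is omitted and the accessions are made up.
rows = [
    "GCA_000001.1\tvgp_phase1",
    "GCA_000002.1\tvgp_phase1,vgp_phase2",
    "GCA_000003.1",  # no value column
]
parsed = {}
for line in rows:
    parts = [p.strip() for p in line.strip().split("\t")]
    key = parts[0]
    if len(parts) < 2:
        parsed[key] = []
        continue
    value_lists = [
        [v.strip() for v in col.split(",") if v.strip()] for col in parts[1:]
    ]
    parsed[key] = value_lists if len(value_lists) > 1 else value_lists[0]

print(parsed)
# {"GCA_000001.1": ["vgp_phase1"],
#  "GCA_000002.1": ["vgp_phase1", "vgp_phase2"],
#  "GCA_000003.1": []}
```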
11 changes: 10 additions & 1 deletion flows/lib/wrapper_fetch_parse_validate.py
@@ -6,6 +6,7 @@
from fetch_previous_file_pair import fetch_previous_file_pair
from shared_args import (
APPEND,
DATA_FREEZE_PATH,
DRY_RUN,
MIN_ASSIGNED,
MIN_VALID,
@@ -37,6 +38,7 @@ def fetch_parse_validate(
yaml_path: str,
s3_path: str,
work_dir: str,
data_freeze_path: Optional[str] = None,
taxdump_path: Optional[str] = None,
append: bool = False,
dry_run: bool = False,
@@ -51,6 +53,7 @@
yaml_path (str): Path to the source YAML file.
s3_path (str): Path to the TSV directory on S3.
work_dir (str): Path to the working directory.
data_freeze_path (str, optional): Path to a data freeze list TSV on S3.
taxdump_path (str, optional): Path to an NCBI format taxdump.
append (bool, optional): Flag to append values to an existing TSV file(s).
dry_run (bool, optional): Flag to run the flow without updating s3/git files.
@@ -65,7 +68,12 @@
append = False
working_yaml = os.path.join(work_dir, os.path.basename(yaml_path))
file_parser = PARSERS.parsers[parser.name]
file_parser.func(working_yaml=working_yaml, work_dir=work_dir, append=append)
file_parser.func(
working_yaml=working_yaml,
work_dir=work_dir,
append=append,
data_freeze_path=data_freeze_path,
)
if dry_run:
# set s3_path = None to skip copying the validated file to S3/git
s3_path = None
@@ -90,6 +98,7 @@ def fetch_parse_validate(
required(YAML_PATH),
required(S3_PATH),
WORK_DIR,
DATA_FREEZE_PATH,
TAXDUMP_PATH,
APPEND,
DRY_RUN,
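The `**kwargs` additions to the other parsers (below) exist because of the call above: `fetch_parse_validate` now passes `data_freeze_path` to whichever parser is selected, so every parser must tolerate the keyword even when it ignores it. A minimal sketch of the pattern, with stand-in parser functions:

```python
# Minimal sketch of the dispatch pattern: the wrapper always passes
# the same keyword set, and **kwargs lets parsers ignore options
# they do not use. Both functions are stand-ins, not the real parsers.
def parse_with_freeze(working_yaml, work_dir, append, data_freeze_path=None, **kwargs):
    print(f"using freeze list: {data_freeze_path}")


def parse_without_freeze(working_yaml, work_dir, append, **kwargs):
    print("freeze list ignored")


for func in (parse_with_freeze, parse_without_freeze):
    func(
        working_yaml="working.yaml",
        work_dir="/tmp/work",
        append=False,
        data_freeze_path="s3://bucket/freeze.tsv",
    )
```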
7 changes: 5 additions & 2 deletions flows/parsers/args.py
@@ -1,10 +1,13 @@
import argparse

from flows.lib.shared_args import APPEND, INPUT_PATH, YAML_PATH
from flows.lib.shared_args import APPEND, DATA_FREEZE_PATH, INPUT_PATH, YAML_PATH
from flows.lib.shared_args import parse_args as _parse_args
from flows.lib.shared_args import required


def parse_args(description: str = "An input file parser") -> argparse.Namespace:
"""Parse command-line arguments."""
return _parse_args([required(INPUT_PATH), required(YAML_PATH), APPEND], description)
return _parse_args(
[required(INPUT_PATH), required(YAML_PATH), APPEND, DATA_FREEZE_PATH],
description,
)
91 changes: 89 additions & 2 deletions flows/parsers/parse_ncbi_assemblies.py
@@ -1,5 +1,6 @@
import json
import os
import re
import subprocess
from collections import defaultdict
from glob import glob
@@ -9,7 +10,7 @@

from flows.lib import utils # noqa: E402
from flows.lib.conditional_import import flow, run_count, task # noqa: E402
from flows.lib.utils import Config, Parser # noqa: E402
from flows.lib.utils import Config, Parser, parse_s3_file # noqa: E402
from flows.parsers.args import parse_args # noqa: E402


@@ -353,6 +354,7 @@ def process_assembly_reports(
"""
for report in parse_assembly_report(jsonl_path=jsonl_path):
try:
print(f"Processing report for {report.get('accession', 'unknown')}")
processed_report = process_assembly_report(
report, previous_report, config, parsed
)
@@ -376,9 +378,83 @@ def process_assembly_reports(
continue


@task(log_prints=True)
def parse_data_freeze_file(data_freeze_path: str) -> dict:
"""
Fetch and parse the two-column data freeze TSV of assembly accessions and
their freeze status from the given S3 path.

Args:
data_freeze_path (str): The S3 path to the data freeze list TSV file.
Returns:
dict: A dictionary mapping assembly accessions to their freeze subsets.

"""
data_freeze = parse_s3_file(data_freeze_path)
print(f"Parsed {len(data_freeze)} entries from data freeze file")
return data_freeze


@task()
def set_data_freeze_default(parsed: dict, data_freeze_name: str) -> None:
"""
Set the default data freeze information for all assemblies.

Args:
parsed (dict): A dictionary containing parsed data.
data_freeze_name (str): The name of the default data freeze.
"""
for line in parsed.values():
line["dataFreeze"] = [data_freeze_name]
line["assemblyID"] = line["genbankAccession"]


@task(log_prints=True)
def process_datafreeze_info(
processed_report: dict, data_freeze: dict, config: Config
) -> None:
"""
Process the data freeze information for a given assembly report and rename
the assembly ID to include the data freeze name.

Args:
processed_report (dict): A dictionary containing processed assembly data.
data_freeze (dict): A dictionary containing data freeze information.
config (Config): Parser configuration; its file name sets the data freeze name.
"""
data_freeze_name = (
re.sub(r"\.tsv(\.gz)?$", "", os.path.basename(config.meta["file_name"]))
if config.meta["file_name"]
else "data_freeze"
)
print(f"Processing data freeze info for {data_freeze_name}")
for line in processed_report.values():
print(
f"Processing data freeze info for {line['refseqAccession']} - "
f"{line['genbankAccession']}"
)
status = data_freeze.get(line["refseqAccession"]) or data_freeze.get(
line["genbankAccession"]
)
if not status:
continue
line["dataFreeze"] = status

accession_name = (
line["refseqAccession"]
if line["refseqAccession"] in data_freeze
else line["genbankAccession"]
)
line["assemblyID"] = f"{accession_name}_{data_freeze_name}"


@flow(log_prints=True)
def parse_ncbi_assemblies(
input_path: str, yaml_path: str, append: bool, feature_file: Optional[str] = None
input_path: str,
yaml_path: str,
append: bool,
feature_file: Optional[str] = None,
data_freeze_path: Optional[str] = None,
**kwargs,
):
"""
Parse NCBI datasets assembly data.
@@ -388,6 +464,8 @@ def parse_ncbi_assemblies(
yaml_path (str): Path to the YAML configuration file.
append (bool): Flag to append values to an existing TSV file(s).
feature_file (str): Path to the feature file.
data_freeze_path (str, optional): Path to data freeze list TSV on S3.
**kwargs: Additional keyword arguments.
"""
config = utils.load_config(
config_file=yaml_path,
@@ -396,11 +474,20 @@
)
if feature_file is not None:
set_up_feature_file(config)

biosamples = {}
parsed = {}
previous_report = {} if append else None
process_assembly_reports(input_path, config, biosamples, parsed, previous_report)
set_representative_assemblies(parsed, biosamples)

if data_freeze_path is None:
set_data_freeze_default(parsed, data_freeze_name="latest")
else:
data_freeze = parse_data_freeze_file(data_freeze_path)
process_datafreeze_info(parsed, data_freeze, config)
write_to_tsv(parsed, config)


5 changes: 4 additions & 1 deletion flows/parsers/parse_refseq_organelles.py
@@ -2,14 +2,17 @@
from flows.parsers.args import parse_args # noqa: E402


def parse_refseq_organelles(working_yaml: str, work_dir: str, append: bool) -> None:
def parse_refseq_organelles(
working_yaml: str, work_dir: str, append: bool, **kwargs
) -> None:
"""
Wrapper function to parse the RefSeq organelles JSONL file.

Args:
working_yaml (str): Path to the working YAML file.
work_dir (str): Path to the working directory.
append (bool): Whether to append to the existing TSV file.
**kwargs: Additional keyword arguments.
"""
print("parsing RefSeq organelles files")

5 changes: 4 additions & 1 deletion flows/parsers/parse_sequencing_status.py
@@ -2,14 +2,17 @@
from flows.parsers.args import parse_args # noqa: E402


def parse_sequencing_status(working_yaml: str, work_dir: str, append: bool) -> None:
def parse_sequencing_status(
working_yaml: str, work_dir: str, append: bool, **kwargs
) -> None:
"""
Wrapper function to parse the sequencing status files.

Args:
working_yaml (str): Path to the working YAML file.
work_dir (str): Path to the working directory.
append (bool): Whether to append to the existing TSV file.
**kwargs: Additional keyword arguments.
"""
print("parsing sequencing status files")

5 changes: 4 additions & 1 deletion flows/parsers/parse_skip_parsing.py
@@ -24,14 +24,17 @@ def check_tsv_file_exists(config: Config, work_dir: str) -> bool:


@flow()
def parse_skip_parsing(working_yaml: str, work_dir: str, append: bool) -> None:
def parse_skip_parsing(
working_yaml: str, work_dir: str, append: bool, **kwargs
) -> None:
"""
Skip parsing.

Args:
working_yaml (str): Path to the working YAML file.
work_dir (str): Path to the working directory.
append (bool): Whether to append to the existing TSV file.
**kwargs: Additional keyword arguments.
"""
# Get the config from the YAML file
config = load_config(working_yaml)
38 changes: 38 additions & 0 deletions flows/prefect.yaml
@@ -74,6 +74,7 @@ deployments:
match:
prefect.resource.type: ncbi.datasets
prefect.resource.matches.previous: "no"
prefect.resource.id": fetch.datasets./home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}/ncbi_datasets_eukaryota.jsonl"
expect:
- update.ncbi.datasets.finished
parameters:
@@ -167,3 +168,40 @@ deployments:
output_path: "/home/ubuntu/tmp/test/genomehubs-taxonomy/eukaryota/nodes.jsonl"
s3_path: "s3://goat/resources/taxonomy/genomehubs/eukaryota/nodes.jsonl.gz"
work_pool: *goat_data_work_pool

- name: update-data-freeze
# This flow updates a data freeze JSONL file
entrypoint: flows/updaters/update_ncbi_datasets.py:update_ncbi_datasets
parameters:
output_path: "/home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}/vgp_phase1.jsonl"
s3_path: "s3://goat/resources/data-freezes/vgp_phase1.jsonl.gz"
data_freeze_path: "s3://goat/resources/data-freezes/vgp_phase1_list_data_freeze.tsv"
work_pool: *goat_data_work_pool

- name: fetch-parse-validate-data-freeze
entrypoint: flows/lib/wrapper_fetch_parse_validate.py:fetch_parse_validate
parameters:
parser: "ParserEnum.NCBI_ASSEMBLIES"
yaml_path: "../goat-data-main/sources/assembly-data/vgp_phase1.types.yaml"
s3_path: "s3://goat/sources/assembly-data/"
work_dir: "/home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}"
data_freeze_path: "s3://goat/resources/data-freezes/vgp_phase1_list_data_freeze.tsv"
append: true
dry_run: true
triggers:
- enabled: true
match:
prefect.resource.type: ncbi.datasets
prefect.resource.matches.previous: "no"
prefect.resource.id": fetch.datasets./home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}/vgp_phase1.jsonl"
expect:
- update.ncbi.datasets.finished
parameters:
parser: "ParserEnum.NCBI_ASSEMBLIES"
yaml_path: "../goat-data-main/sources/assembly-data/vgp_phase1.types.yaml"
s3_path: "s3://goat/sources/assembly-data/"
work_dir: "/home/ubuntu/tmp/test/assembly-data-{{ prefect.variables.date }}"
data_freeze_path: "s3://goat/resources/data-freezes/vgp_phase1_list_data_freeze.tsv"
append: true
dry_run: true
work_pool: *goat_data_work_pool