diff --git a/.flake8 b/.flake8 new file mode 100644 index 0000000..9747547 --- /dev/null +++ b/.flake8 @@ -0,0 +1,4 @@ +[flake8] +inline-quotes = " +multiline-quotes = """ +docstring-quotes = """ \ No newline at end of file diff --git a/.github/workflows/flake8.yml b/.github/workflows/flake8.yml index 528987c..8454a22 100644 --- a/.github/workflows/flake8.yml +++ b/.github/workflows/flake8.yml @@ -20,6 +20,6 @@ jobs: with: ignore: E203,E701,W503,W504,BLK100 max_line_length: 88 - path: src + path: flows plugins: flake8-black flake8-isort flake8-quotes error_classes: E,H,I00,Q00 diff --git a/.vscode/settings.json b/.vscode/settings.json index a656443..292308e 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -12,5 +12,8 @@ "yaml.schemas": { "swaggerviewer:openapi": "file:///Users/rchallis/projects/genomehubs/genomehubs/src/genomehubs-api/src/api-v2.yaml" }, - "restructuredtext.pythonRecommendation.disabled": true + "restructuredtext.pythonRecommendation.disabled": true, + "python-envs.defaultEnvManager": "ms-python.python:conda", + "python-envs.defaultPackageManager": "ms-python.python:conda", + "python-envs.pythonProjects": [] } diff --git a/flows/README.md b/flows/README.md index e8a4baf..de7354c 100644 --- a/flows/README.md +++ b/flows/README.md @@ -2,7 +2,6 @@ A collection of prefect flows for processing and importing data into a GenomeHubs index. - # Initial setup ## Install prefect @@ -85,7 +84,7 @@ prefect --no-prompt deploy --prefect-file flows/prefect.yaml --all # Local development -All example commands below assume dependencies have been installed as described in [install dependencies](#install-dependencies) above. For local development, the [install prefect](#install-prefect) and [deploy flows](#deploy-flows) steps are not needed. All flows can be run with a `SKIP_PREFECT` environment variable to run without a Prefect API connection. +All example commands below assume dependencies have been installed as described in [install dependencies](#install-dependencies) above. For local development, the [install prefect](#install-prefect) and [deploy flows](#deploy-flows) steps are not needed. All flows can be run with a `SKIP_PREFECT` environment variable to run without a Prefect API connection. When writing new flows and tasks, please follow the established conventions, referring to existing files as examples. The import sections need to handle running with and without prefect so will typically make use of `flows/lib/conditional_import.py` and command line arguments should be standardised across all flows by importing argument definitions from `flows/lib/shared_args.py`. @@ -100,7 +99,7 @@ Updaters are flows used to update the local copy of data from a remote resource, The `update_ncbi_datasets.py` updater runs the [ncbi datasets] tool for a given root taxon ID to return a JSONL file with one line per assembly. It will optionally compare the number of lines in the fetched file to a previous version (stored in an s3 bucket) to determine whether there are additional records available. The flow emits an event on completion that can be used to trigger related flows. 
``` -SKIP_PREFECT=true python3 flows/updaters/update_ncbi_datasets.py -r 9608 -o /tmp/assembly-data/ncbi_datasets_canidae.jsonl -s s3://goat/resources/assembly-data/ncbi_datasets_canidae.jsonl +SKIP_PREFECT=true python3 -m flows.updaters.update_ncbi_datasets -r 9608 -o /tmp/assembly-data/ncbi_datasets_canidae.jsonl -s s3://goat/resources/assembly-data/ncbi_datasets_canidae.jsonl ``` ## Fetch parse validate @@ -116,7 +115,7 @@ The flow at `flows/lib/fetch_previous_file_pair.py` is used to fetch a YAML/TSV This example command assumes the [genomehubs/goat-data](https://github.com/genomehubs/goat-data) repository is available in a sibling directory. ``` -SKIP_PREFECT=true python3 flows/lib/fetch_previous_file_pair.py -y ../goat-data/sources/assembly-data/ncbi_datasets_eukaryota.types.yaml -s s3://goat/sources/assembly-data -w /tmp/assembly-data +SKIP_PREFECT=true python3 -m flows.lib.fetch_previous_file_pair -y ../goat-data/sources/assembly-data/ncbi_datasets_eukaryota.types.yaml -s s3://goat/sources/assembly-data -w /tmp/assembly-data ``` ### `flows/parsers` @@ -130,7 +129,7 @@ The `parse_ncbi_assemblies.py` parser takes an NCBI datasets JSONL file as input This example command assumes the [genomehubs/goat-data](https://github.com/genomehubs/goat-data) repository is available in a sibling directory. ``` -SKIP_PREFECT=true python3 flows/parsers/parse_ncbi_assemblies.py -i /tmp/assembly-data/ncbi_datasets_canidae.jsonl -y /tmp/assembly-data/ncbi_datasets_eukaryota.types.yaml -a +SKIP_PREFECT=true python3 -m flows.parsers.parse_ncbi_assemblies -i /tmp/assembly-data/ncbi_datasets_canidae.jsonl -y /tmp/assembly-data/ncbi_datasets_eukaryota.types.yaml -a ``` ### Validate @@ -142,7 +141,7 @@ The `blobtk validate` command is still experimental and has not been tested on t #### Example ``` -SKIP_PREFECT=true python3 flows/lib/validate_file_pair.py -y /tmp/assembly-data/ncbi_datasets_eukaryota.types.yaml -w /tmp/assembly-data +SKIP_PREFECT=true python3 -m flows.lib.validate_file_pair -y /tmp/assembly-data/ncbi_datasets_eukaryota.types.yaml -w /tmp/assembly-data ``` diff --git a/flows/feature_parsers/__init__.py b/flows/feature_parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/feature_parsers/args.py b/flows/feature_parsers/args.py index aff55a5..659b5c0 100644 --- a/flows/feature_parsers/args.py +++ b/flows/feature_parsers/args.py @@ -1,13 +1,4 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import argparse -import sys -from os.path import abspath, dirname - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" from flows.lib.shared_args import INPUT_PATH, YAML_PATH from flows.lib.shared_args import parse_args as _parse_args diff --git a/flows/feature_parsers/parse_blobtoolkit_assembly.py b/flows/feature_parsers/parse_blobtoolkit_assembly.py index dfd3335..eb5dd66 100644 --- a/flows/feature_parsers/parse_blobtoolkit_assembly.py +++ b/flows/feature_parsers/parse_blobtoolkit_assembly.py @@ -1,18 +1,6 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import os -import sys from glob import glob -from os.path import abspath, dirname - -# from genomehubs import utils as gh_utils - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" -# from flows.lib import utils # noqa: E402 from flows.lib.conditional_import import flow # noqa: E402 from flows.lib.utils import 
Parser # noqa: E402 from flows.parsers.args import parse_args # noqa: E402 diff --git a/flows/feature_parsers/parse_busco_features.py b/flows/feature_parsers/parse_busco_features.py index 8d5a04c..342862a 100644 --- a/flows/feature_parsers/parse_busco_features.py +++ b/flows/feature_parsers/parse_busco_features.py @@ -1,18 +1,6 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import os -import sys from glob import glob -from os.path import abspath, dirname - -# from genomehubs import utils as gh_utils - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" -# from flows.lib import utils # noqa: E402 from flows.lib.conditional_import import flow # noqa: E402 from flows.lib.utils import Parser # noqa: E402 from flows.parsers.args import parse_args # noqa: E402 diff --git a/flows/feature_parsers/register.py b/flows/feature_parsers/register.py index e3c421b..beb9332 100644 --- a/flows/feature_parsers/register.py +++ b/flows/feature_parsers/register.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import importlib.util import os from enum import Enum, auto diff --git a/flows/lib/__init__.py b/flows/lib/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/lib/conditional_import.py b/flows/lib/conditional_import.py index 82dc6b1..ada08c4 100644 --- a/flows/lib/conditional_import.py +++ b/flows/lib/conditional_import.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import os diff --git a/flows/lib/fetch_genomehubs_target_list.py b/flows/lib/fetch_genomehubs_target_list.py index f0872c9..c12d978 100644 --- a/flows/lib/fetch_genomehubs_target_list.py +++ b/flows/lib/fetch_genomehubs_target_list.py @@ -1,9 +1,6 @@ -#!/usr/bin/env python3 - import os from urllib.parse import urlencode -import requests from conditional_import import emit_event, flow, task from shared_args import ( API_URL, @@ -15,6 +12,8 @@ parse_args, ) +from flows.lib.utils import safe_get + @task() def fetch_genomehubs_list_file( @@ -52,7 +51,7 @@ def fetch_genomehubs_list_file( print(f"Fetching records from {url}") # Fetch the list of target records - response = requests.get(url, headers={"Accept": "text/tab-separated-values"}) + response = safe_get(url, headers={"Accept": "text/tab-separated-values"}) response.raise_for_status() records = response.text # write records to file diff --git a/flows/lib/fetch_previous_file_pair.py b/flows/lib/fetch_previous_file_pair.py index 91e7794..db48dbf 100644 --- a/flows/lib/fetch_previous_file_pair.py +++ b/flows/lib/fetch_previous_file_pair.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import gzip import os import shutil diff --git a/flows/lib/for_each_record.py b/flows/lib/for_each_record.py index e25a6f2..fc03aa7 100644 --- a/flows/lib/for_each_record.py +++ b/flows/lib/for_each_record.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - from typing import Generator from conditional_import import flow @@ -7,7 +5,6 @@ ID_COLUMN, INPUT_PATH, S3_PATH, - SSH_PATH, WORK_DIR, multi, parse_args, @@ -48,7 +45,6 @@ def for_each_record( required(ID_COLUMN), WORK_DIR, multi(S3_PATH), - # multi(SSH_PATH), ], "Run a flow for each record in an input file.", ) diff --git a/flows/lib/index_assembly_features.py b/flows/lib/index_assembly_features.py index 886fc08..1c0951b 100644 --- a/flows/lib/index_assembly_features.py +++
b/flows/lib/index_assembly_features.py @@ -1,9 +1,6 @@ -#!/usr/bin/env python3 - import os from urllib.parse import urlencode -import requests from conditional_import import flow, task from shared_args import ( ASSEMBLY_ID, @@ -15,7 +12,7 @@ parse_args, required, ) -from utils import find_http_file, find_s3_file, get_genomehubs_attribute_value +from utils import find_http_file, find_s3_file, get_genomehubs_attribute_value, safe_get @task() @@ -56,7 +53,7 @@ def list_busco_lineages(assembly_id: str, work_dir: str) -> list: ) url = f"{goat_api}/search?{queryString}" # Fetch the list of BUSCO lineages - response = requests.get(url) + response = safe_get(url) response.raise_for_status() return [ get_genomehubs_attribute_value(result, "odb10_lineage") @@ -75,9 +72,9 @@ def find_busco_files(assembly_id, busco_lineages, work_dir, http_path): for path in busco_http_path: if busco_file := find_http_file(path, f"{lineage}/full_table.tsv"): local_file = f"{busco_work_dir}/{lineage}_full_table.tsv" - requests.get(busco_file).content + safe_get(busco_file).content with open(local_file, "wb") as file: - file.write(requests.get(busco_file).content) + file.write(safe_get(busco_file).content) busco_files.append(local_file) break return busco_files @@ -87,7 +84,7 @@ def find_busco_files(assembly_id, busco_lineages, work_dir, http_path): def find_blobtoolkit_files(assembly_id, work_dir, http_path): blobtoolkit_api_url = "https://blobtoolkit.genomehubs.org/api/v1" blobtoolkit_search_url = f"{blobtoolkit_api_url}/search/{assembly_id}" - response = requests.get(blobtoolkit_search_url) + response = safe_get(blobtoolkit_search_url) response.raise_for_status() results = response.json() if not results: @@ -107,7 +104,7 @@ def find_blobtoolkit_files(assembly_id, work_dir, http_path): return [] # fetch the full dataset metadata blobtoolkit_metadata_url = f"{blobtoolkit_api_url}/dataset/id/{dataset_id}" - response = requests.get(blobtoolkit_metadata_url) + response = safe_get(blobtoolkit_metadata_url) response.raise_for_status() metadata = response.json() print(metadata) @@ -132,8 +129,8 @@ def index_assembly_features( # if snapshot_exists(s3_path, assembly_id, "feature"): # return taxon_id # find_files(assembly_id, work_dir, s3_path) - busco_lineages = list_busco_lineages(assembly_id, work_dir) - busco_files = find_busco_files(assembly_id, busco_lineages, work_dir, http_path) + # busco_lineages = list_busco_lineages(assembly_id, work_dir) + # busco_files = find_busco_files(assembly_id, busco_lineages, work_dir, http_path) blobtoolkit_files = find_blobtoolkit_files(assembly_id, work_dir, http_path) print(blobtoolkit_files) diff --git a/flows/lib/process_features.py b/flows/lib/process_features.py index 4f2e142..1ed2239 100644 --- a/flows/lib/process_features.py +++ b/flows/lib/process_features.py @@ -1,14 +1,8 @@ -import sys from enum import Enum -from os.path import abspath, dirname from shared_args import WORK_DIR, YAML_PATH, parse_args, required from utils import enum_action -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.feature_parsers.register import register_plugins # noqa: E402 PARSERS = register_plugins() diff --git a/flows/lib/shared_args.py b/flows/lib/shared_args.py index 426952f..f7d4489 100644 --- a/flows/lib/shared_args.py +++ b/flows/lib/shared_args.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python3 """ Arguments shared between scripts. 
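The `flows/lib/shared_args.py` module above supplies the argument definitions that each flow composes with `required`/`default` and `parse_args` in its `__main__` block, as the updater flows later in this diff do. A minimal sketch of that convention, assuming the helpers behave as they are used in those updaters; the `update_example` flow name and its description string are hypothetical:

```python
from flows.lib.conditional_import import flow
from flows.lib.shared_args import (
    OUTPUT_PATH,
    ROOT_TAXID,
    S3_PATH,
    default,
    parse_args,
    required,
)


@flow()
def update_example(root_taxid: str, output_path: str, s3_path: str) -> None:
    # Hypothetical flow body; real flows fetch, parse, or index data here
    print(f"root_taxid={root_taxid}, output_path={output_path}, s3_path={s3_path}")


if __name__ == "__main__":
    # Compose the CLI from shared definitions: one argument with a default,
    # one required, and one optional, mirroring the updater flows below.
    args = parse_args(
        [
            default(ROOT_TAXID, "2759"),
            required(OUTPUT_PATH),
            S3_PATH,
        ],
        "Hypothetical flow illustrating the shared_args convention.",
    )

    update_example(**vars(args))
```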
diff --git a/flows/lib/shared_tasks.py b/flows/lib/shared_tasks.py index af0e4bb..f0f0b60 100644 --- a/flows/lib/shared_tasks.py +++ b/flows/lib/shared_tasks.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import os from conditional_import import NO_CACHE, task diff --git a/flows/lib/utils.py b/flows/lib/utils.py index 9149fc3..1f40f8f 100644 --- a/flows/lib/utils.py +++ b/flows/lib/utils.py @@ -1,6 +1,13 @@ #!/usr/bin/python3 import contextlib +import gzip +import hashlib +import os +import re +import shlex +import shutil +import subprocess from argparse import Action from csv import DictReader, Sniffer from datetime import datetime @@ -9,6 +16,8 @@ import boto3 import requests +from botocore.exceptions import ClientError +from dateutil import parser from genomehubs import utils as gh_utils @@ -540,6 +549,15 @@ def __call__(self, parser, namespace, values, option_string=None): return EnumAction +def safe_get(*args, method="GET", timeout=300, **kwargs): + if method == "GET": + return requests.get(*args, timeout=timeout, **kwargs) + elif method == "POST": + return requests.post(*args, timeout=timeout, **kwargs) + elif method == "HEAD": + return requests.head(*args, timeout=timeout, **kwargs) + + def find_http_file(http_path: str, filename: str) -> str: """ Find files for the record ID. @@ -551,7 +569,7 @@ def find_http_file(http_path: str, filename: str) -> str: Returns: str: Path to the file. """ - response = requests.get(f"{http_path}/{filename}") + response = safe_get(f"{http_path}/{filename}") return f"{http_path}/{filename}" if response.status_code == 200 else None @@ -596,6 +614,147 @@ def find_s3_file(s3_path: list, filename: str) -> str: return None +def is_safe_path(path: str) -> bool: + # Only allow alphanumeric, dash, underscore, dot, slash, colon (for s3), tilde, + # and absolute paths. + # Tilde (~) and absolute paths are allowed because this function is only used + # with trusted internal input. + # Directory traversal ('..') is blocked. + # Allow URLs (e.g., http://, https://, s3://) and URL-safe characters + # URL-safe: alphanumeric, dash, underscore, dot, slash, colon, tilde, percent, + # question, ampersand, equals + url_pattern = r"^[\w]+://" + url_safe_pattern = r"^[\w\-/.:~%?&=]+$" + if re.match(url_pattern, path): + return ".." not in path and re.match(url_safe_pattern, path) + return ".." not in path if re.match(url_safe_pattern, path) else False + + +def run_quoted(cmd, **kwargs): + quoted_cmd = [shlex.quote(str(arg)) for arg in cmd] + return subprocess.run(quoted_cmd, **kwargs) + + +def popen_quoted(cmd, **kwargs): + quoted_cmd = [shlex.quote(str(arg)) for arg in cmd] + return subprocess.Popen(quoted_cmd, **kwargs) + + +def parse_s3_path(s3_path): + # Extract bucket name and key from the S3 path + bucket, key = s3_path.removeprefix("s3://").split("/", 1) + return bucket, key + + +def fetch_from_s3(s3_path: str, local_path: str, gz: bool = False) -> None: + """ + Fetch a file from S3. + + Args: + s3_path (str): Path to the remote file on s3. + local_path (str): Path to the local file. + gz (bool): Whether to gunzip the file after downloading. Defaults to False. + + Returns: + None: This function downloads the file from S3 to the local path. 
+ """ + s3 = boto3.client("s3") + + bucket, key = parse_s3_path(s3_path) + + if s3_path.endswith(".gz") and not local_path.endswith(".gz"): + gz = True + + # Download the file from S3 to the local path + try: + if gz: + gz_path = f"{local_path}.gz" + s3.download_file(Bucket=bucket, Key=key, Filename=gz_path) + # Unzip gz_path to local_path, then remove gz_path + with gzip.open(gz_path, "rb") as f_in, open(local_path, "wb") as f_out: + shutil.copyfileobj(f_in, f_out) + os.remove(gz_path) + else: + s3.download_file(Bucket=bucket, Key=key, Filename=local_path) + except ClientError as e: + print(f"Error downloading {s3_path} to {local_path}: {e}") + raise e + + +def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None: + """ + Upload a file to S3. + + Args: + local_path (str): Path to the local file. + s3_path (str): Path to the remote file on s3. + gz (bool): Whether to gzip the file before uploading. Defaults to False. + + Returns: + None: This function uploads the local file to S3. + """ + + if not is_safe_path(local_path): + raise ValueError(f"Unsafe local path: {local_path}") + if not is_safe_path(s3_path): + raise ValueError(f"Unsafe s3 path: {s3_path}") + + if s3_path.endswith(".gz") and not local_path.endswith(".gz"): + gz = True + + try: + if gz: + gz_path = f"{local_path}.gz" + with open(local_path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out: + f_out.write(f_in.read()) + try: + # use s3cmd for uploads due to issues with boto3 and large files + cmd = [ + "s3cmd", + "put", + "--acl-public", + gz_path, + s3_path, + ] + result = run_quoted( + cmd, + capture_output=True, + text=True, + ) + if result.returncode != 0: + print( + ( + f"Error uploading {local_path} to {s3_path} " + f"with s3cmd: {result.stderr}" + ) + ) + raise RuntimeError(f"s3cmd upload failed: {result.stderr}") + finally: + if os.path.exists(gz_path): + os.remove(gz_path) + else: + # use s3cmd for uploads due to issues with boto3 and large files + cmd = [ + "s3cmd", + "put", + "--acl-public", + local_path, + s3_path, + ] + result = run_quoted(cmd, capture_output=True, text=True) + if result.returncode != 0: + print( + ( + f"Error uploading {local_path} to {s3_path} " + f"with s3cmd: {result.stderr}" + ) + ) + raise RuntimeError(f"s3cmd upload failed: {result.stderr}") + except Exception as e: + print(f"Error uploading {local_path} to {s3_path}: {e}") + raise e + + def set_index_name( index_type: str, hub_name: str, @@ -640,4 +799,137 @@ def parse_tsv(text: str) -> List[Dict[str, str]]: sniffer = Sniffer() dialect = sniffer.sniff(text) reader = DictReader(StringIO(text), dialect=dialect) - return [row for row in reader] + return list(reader) + + +def last_modified_git_remote(http_path: str) -> Optional[int]: + """ + Get the last modified date of a file in a git repository. + + Args: + http_path (str): Path to the HTTP file. + + Returns: + Optional[int]: Last modified date of the file, or None if not found. 
+ """ + try: + if not http_path.startswith("https://gitlab.com/"): + print(f"Malformed GitLab URL (missing prefix): {http_path}") + return None + project_path = http_path.removeprefix("https://gitlab.com/") + project_path = project_path.removesuffix(".git") + parts = project_path.split("/") + if len(parts) < 6: + print(f"Malformed GitLab URL (not enough parts): {http_path}") + return None + project = "%2F".join(parts[:2]) + ref = parts[4] + file = "%2F".join(parts[5:]).split("?")[0] + api_url = ( + f"https://gitlab.com/api/v4/projects/{project}/repository/commits" + f"?ref_name={ref}&path={file}&per_page=1" + ) + response = safe_get(api_url) + if response.status_code == 200: + commits = response.json() + if commits and commits[0].get("committed_date"): + dt = parser.isoparse(commits[0]["committed_date"]) + return int(dt.timestamp()) + else: + response = safe_get(http_path, method="HEAD", allow_redirects=True) + if response.status_code == 200: + if last_modified := response.headers.get("Last-Modified", None): + dt = parser.parse(last_modified) + return int(dt.timestamp()) + return None + except Exception as e: + print(f"Error parsing GitLab URL or fetching commit info: {e}") + return None + + +def last_modified_http(http_path: str) -> Optional[int]: + """ + Get the last modified date of a file. + + Args: + http_path (str): Path to the HTTP file. + + Returns: + Optional[int]: Last modified date of the file, or None if not found. + """ + if "gitlab.com" in http_path: + return last_modified_git_remote(http_path) + response = safe_get(http_path, method="HEAD", allow_redirects=True) + if response.status_code == 200: + if last_modified := response.headers.get("Last-Modified", None): + dt = parser.parse(last_modified) + return int(dt.timestamp()) + return None + return None + + +def last_modified_s3(s3_path: str) -> Optional[int]: + """ + Get the last modified date of a file on S3. + + Args: + s3_path (str): Path to the remote file on s3. + + Returns: + Optional[int]: Last modified date of the file, or None if not found. + """ + s3 = boto3.client("s3") + + bucket, key = parse_s3_path(s3_path) + + # Return None if the remote file does not exist + try: + response = s3.head_object(Bucket=bucket, Key=key) + last_modified = response.get("LastModified", None) + return int(last_modified.timestamp()) if last_modified is not None else None + except ClientError: + return None + + +def last_modified(local_path: str) -> Optional[int]: + """ + Get the last modified date of a local file. + + Args: + local_path (str): Path to the local file. + + Returns: + Optional[int]: Last modified date of the file as a Unix timestamp, or None if + the file does not exist. + """ + if not os.path.exists(local_path): + return None + mtime = os.path.getmtime(local_path) + return int(mtime) + + +def is_local_file_current_http(local_path: str, http_path: str) -> bool: + """ + Compare the last modified date of a local file with a remote file on HTTP. + + Args: + local_path (str): Path to the local file. + http_path (str): Path to the HTTP directory. + + Returns: + bool: True if the local file is up-to-date, False otherwise. 
+ """ + local_date = last_modified(local_path) + remote_date = last_modified_http(http_path) + print(f"Local date: {local_date}, Remote date: {remote_date}") + if local_date is None or remote_date is None: + return False + return local_date >= remote_date + + +def generate_md5(file_path): + hash_md5 = hashlib.md5() + with open(file_path, "rb") as f: + for chunk in iter(lambda: f.read(4096), b""): + hash_md5.update(chunk) + return hash_md5.hexdigest() diff --git a/flows/lib/validate_file_pair.py b/flows/lib/validate_file_pair.py index 1ada5f3..23c7031 100644 --- a/flows/lib/validate_file_pair.py +++ b/flows/lib/validate_file_pair.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import json import os import shutil @@ -43,9 +41,14 @@ def validate_yaml_file( Returns: bool: True if the YAML file is valid, False otherwise. """ + if not utils.is_safe_path(yaml_path): + raise ValueError(f"Unsafe YAML path: {yaml_path}") + # Validate the YAML file using blobtk validate cmd = ["blobtk", "validate", "-g", yaml_path] if taxdump_path is not None: + if not utils.is_safe_path(taxdump_path): + raise ValueError(f"Unsafe taxdump path: {taxdump_path}") cmd.extend( [ "-t", @@ -56,7 +59,7 @@ def validate_yaml_file( ) # Run the command with subprocess run and capture stdout - result = subprocess.run(cmd, stdout=subprocess.PIPE, text=True) + result = utils.run_quoted(cmd, stdout=subprocess.PIPE, text=True) status = result.returncode == 0 output = result.stdout sys.stdout.write(output) diff --git a/flows/lib/wrapper_fetch_parse_validate.py b/flows/lib/wrapper_fetch_parse_validate.py index 68253d1..522f23a 100644 --- a/flows/lib/wrapper_fetch_parse_validate.py +++ b/flows/lib/wrapper_fetch_parse_validate.py @@ -1,10 +1,5 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import os -import sys from enum import Enum -from os.path import abspath, dirname from typing import Optional from conditional_import import flow @@ -24,10 +19,6 @@ from utils import enum_action from validate_file_pair import validate_file_pair -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.parsers.register import register_plugins # noqa: E402 PARSERS = register_plugins() diff --git a/flows/parsers/__init__.py b/flows/parsers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/parsers/args.py b/flows/parsers/args.py index fec9845..a848e9b 100644 --- a/flows/parsers/args.py +++ b/flows/parsers/args.py @@ -1,13 +1,4 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import argparse -import sys -from os.path import abspath, dirname - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" from flows.lib.shared_args import APPEND, INPUT_PATH, YAML_PATH from flows.lib.shared_args import parse_args as _parse_args diff --git a/flows/parsers/parse_ncbi_assemblies.py b/flows/parsers/parse_ncbi_assemblies.py index f9814f1..3efffab 100644 --- a/flows/parsers/parse_ncbi_assemblies.py +++ b/flows/parsers/parse_ncbi_assemblies.py @@ -1,21 +1,12 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import json import os import subprocess -import sys from collections import defaultdict from glob import glob -from os.path import abspath, dirname from typing import Generator, Optional from genomehubs import utils as gh_utils -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, 
dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.lib import utils # noqa: E402 from flows.lib.conditional_import import flow, run_count, task # noqa: E402 from flows.lib.utils import Config, Parser # noqa: E402 @@ -53,7 +44,9 @@ def fetch_ncbi_datasets_sequences( Yields: dict: The sequence report data as a JSON object, one line at a time. """ - result = subprocess.run( + if not utils.is_safe_path(accession): + raise ValueError(f"Unsafe accession: {accession}") + result = utils.run_quoted( [ "datasets", "summary", @@ -76,6 +69,28 @@ def fetch_ncbi_datasets_sequences( yield json.loads(line) +def is_atypical_assembly(report: dict, parsed: dict) -> bool: + """ + Check if an assembly is atypical. + + Args: + report (dict): A dictionary containing the assembly information. + parsed (dict): A dictionary containing parsed data. + + Returns: + bool: True if the assembly is atypical, False otherwise. + """ + if "assemblyInfo" not in report: + return True + if report["assemblyInfo"].get("atypical", {}).get("isAtypical", False): + # delete from parsed if present + accession = report["accession"] + if accession in parsed: + del parsed[accession] + return True + return False + + def process_assembly_report( report: dict, previous_report: Optional[dict], config: Config, parsed: dict ) -> dict: @@ -98,6 +113,9 @@ def process_assembly_report( Returns: dict: The updated report dictionary. """ + # Uncomment to filter atypical assemblies + # if is_atypical_assembly(report, parsed): + # return None processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}} if "pairedAccession" in report: if processed_report["pairedAccession"].startswith("GCF_"): @@ -215,6 +233,8 @@ def add_report_to_parsed_reports( continue linked_row = parsed[acc] if accession not in linked_row["linkedAssembly"]: + if not isinstance(linked_row["linkedAssembly"], list): + linked_row["linkedAssembly"] = [] linked_row["linkedAssembly"].append(accession) if acc not in row["linkedAssembly"]: row["linkedAssembly"].append(acc) @@ -336,7 +356,9 @@ def process_assembly_reports( processed_report = process_assembly_report( report, previous_report, config, parsed ) - if use_previous_report(processed_report, parsed, config): + if processed_report is None or use_previous_report( + processed_report, parsed, config + ): continue fetch_and_parse_sequence_report(processed_report) append_features(processed_report, config) @@ -345,7 +367,11 @@ def process_assembly_reports( previous_report = processed_report except Exception as e: print( - f"Error processing report for {report.get('accession', 'unknown')}: {e}" + ( + f"Error processing report for " + f"{report.get('accession', 'unknown')}: " + f"{e} (line {e.__traceback__.tb_lineno})" + ) ) continue diff --git a/flows/parsers/parse_refseq_organelles.py b/flows/parsers/parse_refseq_organelles.py index d496801..09b7d37 100644 --- a/flows/parsers/parse_refseq_organelles.py +++ b/flows/parsers/parse_refseq_organelles.py @@ -1,13 +1,3 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow -import sys -from os.path import abspath, dirname - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.lib.utils import Parser # noqa: E402 from flows.parsers.args import parse_args # noqa: E402 diff --git a/flows/parsers/parse_sequencing_status.py b/flows/parsers/parse_sequencing_status.py index 74df853..3be87c8 100644 --- 
a/flows/parsers/parse_sequencing_status.py +++ b/flows/parsers/parse_sequencing_status.py @@ -1,13 +1,3 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow -import sys -from os.path import abspath, dirname - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.lib.utils import Parser # noqa: E402 from flows.parsers.args import parse_args # noqa: E402 diff --git a/flows/parsers/parse_skip_parsing.py b/flows/parsers/parse_skip_parsing.py index 9c92443..11e21c2 100644 --- a/flows/parsers/parse_skip_parsing.py +++ b/flows/parsers/parse_skip_parsing.py @@ -1,13 +1,4 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import os -import sys -from os.path import abspath, dirname - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" from flows.lib.conditional_import import flow, task # noqa: E402 from flows.lib.shared_tasks import get_filenames # noqa: E402 diff --git a/flows/parsers/register.py b/flows/parsers/register.py index 2a714ab..940b8d4 100644 --- a/flows/parsers/register.py +++ b/flows/parsers/register.py @@ -1,5 +1,3 @@ -#!/usr/bin/env python3 - import importlib.util import os from enum import Enum, auto diff --git a/flows/prefect.yaml b/flows/prefect.yaml index 96d98f9..9a45e6d 100644 --- a/flows/prefect.yaml +++ b/flows/prefect.yaml @@ -30,6 +30,8 @@ definitions: cron: "5 0 * * 1-6" weekly: &weekly # Runs every Monday at 00:05 UTC cron: "5 0 * * 1" + monthly: &monthly # Runs on the 1st of every month at 00:05 UTC + cron: "5 0 1 * *" deployments: - name: update-ncbi-datasets @@ -82,3 +84,86 @@ deployments: append: true dry_run: true work_pool: *goat_data_work_pool + + - name: update-tolid-prefixes + # This flow updates the TOLID prefixes file + entrypoint: flows/updaters/update_tolid_prefixes.py:update_tolid_prefixes + parameters: + # Local directory path to save the TOLID prefixes TSV file + output_path: "/home/ubuntu/tmp/test/tolid-prefixes" + schedules: + - *weekly + work_pool: *goat_data_work_pool + + - name: update-ott-taxonomy + # This flow updates the OTT taxonomy file + entrypoint: flows/updaters/update_ott_taxonomy.py:update_ott_taxonomy + parameters: + # Local directory path to save the OTT taxonomy file + output_path: "/home/ubuntu/tmp/test/ott-taxonomy" + schedules: + - *monthly + work_pool: *goat_data_work_pool + + - name: update-ncbi-taxonomy + # This flow updates the NCBI taxonomy dump + entrypoint: flows/updaters/update_ncbi_taxonomy.py:update_ncbi_taxonomy + parameters: + # Local path to save the NCBI taxonomy dump files + output_path: "/home/ubuntu/tmp/test/ncbi-taxonomy" + schedules: + - *daily + work_pool: *goat_data_work_pool + + - name: update-ena-taxonomy-extra + # This flow updates the ENA taxonomy dump + entrypoint: flows/updaters/update_ena_taxonomy_extra.py:update_ena_taxonomy_extra + parameters: + # Root taxon Id to use. 
2759 is the taxon Id for Eukaryota + root_taxid: "2759" + # Local path to the NCBI taxonomy dump files + taxdump_path: "/home/ubuntu/tmp/test/ncbi-taxonomy" + # Local path to save the ENA taxonomy JSONL file + output_path: "/home/ubuntu/tmp/test/ena-taxonomy/ena-taxonomy-extra.jsonl" + # The S3 path to save the ENA taxonomy JSONL file + s3_path: "s3://goat/resources/taxonomy/ena/ena-taxonomy-extra.jsonl.gz" + # A flag to indicate if the flow should append to the existing JSONL file + append: true + triggers: + - enabled: true + match: + prefect.resource.type: ncbi.taxonomy + expect: + - update.ncbi.taxonomy.finished + parameters: + root_taxid: "2759" + taxdump_path: "/home/ubuntu/tmp/test/ncbi-taxonomy" + output_path: "/home/ubuntu/tmp/test/ena-taxonomy/ena-taxonomy-extra.jsonl" + s3_path: "s3://goat/resources/taxonomy/ena/ena-taxonomy-extra.jsonl.gz" + append: true + work_pool: *goat_data_work_pool + + - name: update-genomehubs-taxonomy + # This flow updates the GenomeHubs taxonomy file + entrypoint: flows/updaters/update_genomehubs_taxonomy.py:update_genomehubs_taxonomy + parameters: + # The root taxon Id to use. 2759 is the taxon Id for Eukaryota + root_taxid: "2759" + # Local path to the input configuration YAML file + input_path: "/home/ubuntu/tmp/test/genomehubs-taxonomy/eukaryota-taxonomy-input.yaml" + # Local path to save the GenomeHubs taxonomy JSONL file + output_path: "/home/ubuntu/tmp/test/genomehubs-taxonomy/eukaryota/nodes.jsonl" + # The S3 path to save the GenomeHubs taxonomy JSONL file + s3_path: "s3://goat/resources/taxonomy/genomehubs/eukaryota/nodes.jsonl.gz" + triggers: + - enabled: true + match: + prefect.resource.type: ena.taxonomy + expect: + - update.ena.taxonomy.finished + parameters: + root_taxid: "2759" + input_path: "/home/ubuntu/tmp/test/genomehubs-taxonomy/eukaryota-taxonomy-input.yaml" + output_path: "/home/ubuntu/tmp/test/genomehubs-taxonomy/eukaryota/nodes.jsonl" + s3_path: "s3://goat/resources/taxonomy/genomehubs/eukaryota/nodes.jsonl.gz" + work_pool: *goat_data_work_pool diff --git a/flows/updaters/__init__.py b/flows/updaters/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/updaters/api/__init__.py b/flows/updaters/api/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/flows/updaters/api/api_config.py b/flows/updaters/api/api_config.py index 4409306..333bcea 100644 --- a/flows/updaters/api/api_config.py +++ b/flows/updaters/api/api_config.py @@ -1,8 +1,9 @@ import json -import requests import yaml +from flows.lib.utils import safe_get + ##################################################################### # VGL ##################################################################### @@ -24,7 +25,7 @@ def vgl_url_opener(**kwargs): "https://raw.githubusercontent.com/vgl-hub/genome-portal/" "master/_data/table_tracker.yml" ) - return requests.get(vgl_url, stream=True) + return safe_get(vgl_url, stream=True) def vgl_hub_count_handler(r_text): @@ -81,7 +82,7 @@ def vgl_row_handler(r_text, fieldnames, **kwargs): def nhm_url_opener(**kwargs): - return requests.post(nhm_url, headers=nhm_headers, json=nhm_post_data) + return safe_get(nhm_url, method="POST", headers=nhm_headers, json=nhm_post_data) def nhm_api_count_handler(r_text): @@ -93,7 +94,9 @@ def nhm_row_handler(fieldnames, **kwargs): nhm_post_data_after = nhm_post_data result = [] while True: - response = requests.post(nhm_url, headers=nhm_headers, json=nhm_post_data_after) + response = safe_get( + nhm_url, method="POST", headers=nhm_headers, 
json=nhm_post_data_after + ) r = response.json() dl = r["result"]["records"] for species in dl: @@ -166,9 +169,7 @@ def nhm_row_handler(fieldnames, **kwargs): def sts_url_opener(token): - return requests.get( - sts_url, headers={"Token": token, "Project": "ALL"}, verify=False - ) + return safe_get(sts_url, headers={"Token": token, "Project": "ALL"}, verify=False) def sts_api_count_handler(r_text): @@ -184,7 +185,7 @@ def sts_row_handler(result_count, fieldnames, token, **kwargs): print(page) url = f"{sts_url}?page={page}&page_size={page_size}" - r = requests.get( + r = safe_get( url, headers={"Token": token, "Project": "ALL"}, verify=False ).json() dl = r["data"]["list"] diff --git a/flows/updaters/update_boat_config.py b/flows/updaters/update_boat_config.py index a369013..71c571c 100644 --- a/flows/updaters/update_boat_config.py +++ b/flows/updaters/update_boat_config.py @@ -1,21 +1,9 @@ -#!/usr/bin/env python3 - -# sourcery skip: avoid-builtin-shadow import hashlib import os -import subprocess -import sys -from os.path import abspath, dirname import boto3 -import requests from botocore.exceptions import ClientError -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - -# from flows.lib.conditional_import import emit_event, flow, task from flows.lib.conditional_import import flow, task from flows.lib.shared_args import ( APPEND, @@ -26,10 +14,16 @@ parse_args, required, ) -from flows.lib.utils import parse_tsv +from flows.lib.utils import is_safe_path, parse_tsv, run_quoted, safe_get def taxon_id_to_ssh_path(ssh_host, taxon_id, assembly_name): + + if not is_safe_path(ssh_host): + raise ValueError(f"Unsafe ssh host: {ssh_host}") + if not is_safe_path(taxon_id): + raise ValueError(f"Unsafe taxon_id: {taxon_id}") + command = [ "ssh", ssh_host, @@ -40,7 +34,7 @@ def taxon_id_to_ssh_path(ssh_host, taxon_id, assembly_name): f"speciesops getdir --taxon_id {taxon_id}'" ), ] - result = subprocess.run(command, capture_output=True, text=True) + result = run_quoted(command, capture_output=True, text=True) if result.returncode != 0: print( ( @@ -66,6 +60,10 @@ def taxon_id_to_ssh_path(ssh_host, taxon_id, assembly_name): def lookup_buscos(ssh_host, file_path): if "lustre" in file_path: + if not is_safe_path(ssh_host): + raise ValueError(f"Unsafe ssh host: {ssh_host}") + if not is_safe_path(file_path): + raise ValueError(f"Unsafe file path: {file_path}") command = [ "ssh", @@ -74,7 +72,7 @@ def lookup_buscos(ssh_host, file_path): "-c", (f"'ls -d {file_path}/*_odb*/'"), ] - result = subprocess.run(command, capture_output=True, text=True) + result = run_quoted(command, capture_output=True, text=True) if result.returncode != 0: return [] busco_dirs = [ @@ -90,6 +88,12 @@ def assembly_id_to_busco_sets(alt_host, assembly_id): Fetch the alternative path for an assembly ID from the alt host. This function uses SSH to run a command on the alt host to get the path. 
""" + + if not is_safe_path(alt_host): + raise ValueError(f"Unsafe alt host: {alt_host}") + if not is_safe_path(assembly_id): + raise ValueError(f"Unsafe assembly_id: {assembly_id}") + # find file on alt_host command = [ "ssh", @@ -98,7 +102,7 @@ def assembly_id_to_busco_sets(alt_host, assembly_id): "-c", f"'ls /volumes/data/by_accession/{assembly_id}'", ] - result = subprocess.run(command, capture_output=True, text=True) + result = run_quoted(command, capture_output=True, text=True) if result.returncode == 0: return f"/volumes/data/by_accession/{assembly_id}", result.stdout.splitlines() @@ -115,7 +119,7 @@ def assembly_id_to_busco_sets(alt_host, assembly_id): busco_url = ( f"https://busco.cog.sanger.ac.uk/{assembly_id}/{lineage}/full_table.tsv" ) - response = requests.get(busco_url) + response = safe_get(busco_url) if response.status_code == 200: busco_sets.append(lineage) return f"https://busco.cog.sanger.ac.uk/{assembly_id}", busco_sets @@ -186,7 +190,7 @@ def fetch_goat_results(root_taxid): # fetch query_url with accept header tsv. use python module requests headers = {"Accept": "text/tab-separated-values"} - response = requests.get(query_url, headers=headers) + response = safe_get(query_url, headers=headers) if response.status_code != 200: raise RuntimeError( f"Error fetching BoaT config info: {response.status_code} {response.text}" diff --git a/flows/updaters/update_ena_taxonomy_extra.py b/flows/updaters/update_ena_taxonomy_extra.py new file mode 100644 index 0000000..90e680c --- /dev/null +++ b/flows/updaters/update_ena_taxonomy_extra.py @@ -0,0 +1,195 @@ +import json +import os +from urllib.request import urlopen + +from tqdm import tqdm + +from flows.lib.conditional_import import emit_event, flow, task +from flows.lib.shared_args import ( + APPEND, + OUTPUT_PATH, + ROOT_TAXID, + S3_PATH, + TAXDUMP_PATH, + default, + parse_args, + required, +) +from flows.lib.utils import fetch_from_s3, upload_to_s3 + + +@task(log_prints=True) +def read_ncbi_tax_ids(taxdump_path: str) -> set[str]: + """Read NCBI tax IDs from the taxdump nodes file.""" + print(f"Reading NCBI taxids from {taxdump_path}") + tax_ids = set() + nodes_file = os.path.join(taxdump_path, "nodes.dmp") + with open(nodes_file, "r") as f: + for line in f: + fields = line.strip().split("\t") + if len(fields) > 1: + tax_ids.add(fields[0]) + return tax_ids + + +@task(log_prints=True) +def add_jsonl_tax_ids(jsonl_path: str, tax_ids: set[str]) -> None: + print(f"Reading previously fetched ENA taxids from {jsonl_path}") + filtered_path = f"{jsonl_path}.filtered" + try: + with open(jsonl_path, "r") as f, open(filtered_path, "w") as f_out: + for line in f: + data = json.loads(line) + tax_id = data["taxId"] + if tax_id not in tax_ids: + f_out.write(line) + tax_ids.add(tax_id) + os.replace(filtered_path, jsonl_path) + except Exception as e: + print(f"Error reading {jsonl_path}: {e}") + exit() + + +@task(log_prints=True) +def get_ena_api_new_taxids(root_taxid: str, existing_tax_ids: set[str]) -> set[str]: + print(f"Fetching new taxids for tax_tree({root_taxid}) from ENA API") + + limit = 10000000 + url = ( + f"https://www.ebi.ac.uk/ena/portal/api/search?result=taxon" + f"&query=tax_tree({root_taxid})&limit={limit}" + ) + + # Stream the content of the URL + column_index = None + new_tax_ids = set() + with urlopen(url) as response: + for line in response: + columns = line.decode("utf-8").strip().split("\t") + if column_index is None: + column_index = 0 if columns[0] == "tax_id" else 1 + else: + tax_id = columns[column_index] + if tax_id not 
in existing_tax_ids: + new_tax_ids.add(tax_id) + return new_tax_ids + + +@task(log_prints=True) +def fetch_ena_jsonl(tax_id, f_out): + print("Fetching new tax_ids from ENA API") + url = "https://www.ebi.ac.uk/ena/taxonomy/rest/tax-id/" + with urlopen(url + tax_id) as response: + for line in response: + f_out.write(line.decode("utf-8").strip()) + f_out.write("\n") + + +@task(log_prints=True) +def update_ena_jsonl(new_tax_ids: set[str], output_path: str, append: bool) -> None: + print(f"Updating ENA JSONL file at {output_path} with new tax IDs") + url = "https://www.ebi.ac.uk/ena/taxonomy/rest/tax-id/" + try: + os.makedirs(os.path.dirname(output_path), exist_ok=True) + with open(output_path, "a" if append else "w") as f: + for tax_id in tqdm(new_tax_ids, desc="Fetching ENA tax IDs"): + try: + with urlopen(url + tax_id) as response: + for line in response: + f.write(line.decode("utf-8").strip()) + f.write("\n") + except Exception as e: + print(f"Error fetching {tax_id}: {e}") + except Exception as e: + print(f"Error updating {output_path}: {e}") + + +@task(log_prints=True) +def sort_jsonl_by_lineage(jsonl_path: str) -> None: + print(f"Sorting JSONL file by lineage at {jsonl_path}") + sorted_path = f"{jsonl_path}.sorted" + try: + with open(jsonl_path, "r") as f: + data = [json.loads(line) for line in f] + data.sort(key=lambda x: x.get("lineage", "")) + with open(sorted_path, "w") as f_out: + for entry in data: + f_out.write(json.dumps(entry) + "\n") + os.replace(sorted_path, jsonl_path) + except Exception as e: + print(f"Error sorting {jsonl_path}: {e}") + exit() + + +@task(log_prints=True) +def fetch_s3_jsonl(s3_path: str, local_path: str) -> None: + print(f"Fetching existing ENA JSONL file from {s3_path} to {local_path}") + fetch_from_s3(s3_path, local_path) + + +@task(log_prints=True) +def upload_s3_jsonl(local_path: str, s3_path: str) -> None: + print(f"Uploading updated ENA JSONL file from {local_path} to {s3_path}") + upload_to_s3(local_path, s3_path) + + +@flow() +def update_ena_taxonomy_extra( + root_taxid: str, taxdump_path: str, output_path: str, s3_path: str, append: bool +) -> None: + """Update the ENA taxonomy JSONL file. + + Args: + root_taxid (str): Root taxon ID to filter by. + taxdump_path (str): Path to the NCBI taxdump files. + output_path (str): Path to save the taxonomy dump. + s3_path (str): S3 path to upload the taxonomy dump. + append (bool): Flag to append entries to an existing JSONL file. + """ + + # 1. read IDs from ncbi nodes file + existing_tax_ids = read_ncbi_tax_ids(taxdump_path) + if append: + # 2. fetch jsonl file from s3 if s3_path is provided + if s3_path: + fetch_s3_jsonl(s3_path, output_path) + # 3. read existing IDs from local JSONL file + add_jsonl_tax_ids(output_path, existing_tax_ids) + # 4. fetch list of new IDs from ENA API + new_tax_ids = get_ena_api_new_taxids(root_taxid, existing_tax_ids) + # 5. fetch details for new IDs from ENA API and save to JSONL file + update_ena_jsonl(new_tax_ids, output_path, append) + # 6. sort the JSONL file by lineage + sort_jsonl_by_lineage(output_path) + # 7. 
upload updated JSONL file to s3 if s3_path is provided + if s3_path: + upload_s3_jsonl(output_path, s3_path) + + status = len(new_tax_ids) == 0 + + emit_event( + event="update.ena.taxonomy.finished", + resource={ + "prefect.resource.id": f"fetch.taxonomy.{output_path}", + "prefect.resource.type": "ena.taxonomy", + "prefect.resource.matches.previous": ("yes" if status else "no"), + }, + payload={"matches_previous": status}, + ) + return status + + +if __name__ == "__main__": + """Run the flow.""" + args = parse_args( + [ + default(ROOT_TAXID, "2759"), + required(TAXDUMP_PATH), + required(OUTPUT_PATH), + S3_PATH, + APPEND, + ], + "Fetch extra taxa from ENA taxonomy API and optionally filter by root taxon.", + ) + + update_ena_taxonomy_extra(**vars(args)) diff --git a/flows/updaters/update_genomehubs_taxonomy.py b/flows/updaters/update_genomehubs_taxonomy.py new file mode 100644 index 0000000..d4eb7e9 --- /dev/null +++ b/flows/updaters/update_genomehubs_taxonomy.py @@ -0,0 +1,172 @@ +import os +import subprocess +from collections import defaultdict + +import yaml + +from flows.lib.conditional_import import emit_event, flow, task +from flows.lib.shared_args import ( + INPUT_PATH, + OUTPUT_PATH, + ROOT_TAXID, + S3_PATH, + default, + parse_args, + required, +) +from flows.lib.utils import fetch_from_s3, is_safe_path, popen_quoted, upload_to_s3 + + +def get_file_paths_from_config(config: dict, file_paths: dict) -> dict: + key = config.get("xref_label") + input_path = config.get("path") + output_path = config.get("out") + if key is not None and input_path is not None: + file_paths[key] = { + "input": input_path, + } + return output_path + + +@task(log_prints=True) +def read_input_config(input_path: str) -> dict: + print(f"Reading input config from {input_path}") + file_paths = defaultdict(dict) + try: + with open(input_path, "r") as f: + config = yaml.safe_load(f) + except Exception as e: + print(f"Error reading {input_path}: {e}") + exit() + try: + output_path = get_file_paths_from_config(config, file_paths) + if output_path is not None: + file_paths["out"] = output_path + for taxonomy in config.get("taxonomies", []): + get_file_paths_from_config(taxonomy, file_paths) + except Exception as e: + print(f"Error parsing {input_path}: {e}") + exit() + return file_paths + + +@task(log_prints=True) +def run_blobtk_taxonomy(root_taxid: str, input_path: str, output_path: str) -> None: + print(f"Running blobtk taxonomy with root taxid {root_taxid}") + if not is_safe_path(root_taxid): + raise ValueError(f"Unsafe root taxid: {root_taxid}") + if not is_safe_path(input_path): + raise ValueError(f"Unsafe input path: {input_path}") + if not is_safe_path(output_path): + raise ValueError(f"Unsafe output path: {output_path}") + cmd = [ + "blobtk", + "taxonomy", + "-c", + input_path, + "-O", + output_path, + "-r", + root_taxid, + ] + print(f"Running command: {' '.join(cmd)}") + try: + # Inputs have been validated by is_safe_path; safe to use in subprocess + process = popen_quoted( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + for line in process.stdout: + print(line, end="") + process.wait() + if process.returncode != 0: + print(f"Command failed with exit code {process.returncode}") + exit() + except Exception as e: + print(f"Error running blobtk taxonomy: {e}") + exit() + + +@task(log_prints=True) +def fetch_s3_file(s3_path: str, local_path: str) -> None: + print(f"Fetching file from {s3_path} to {local_path}") + fetch_from_s3(s3_path, local_path) + + 
+@task(log_prints=True) +def upload_s3_file(local_path: str, s3_path: str) -> None: + print(f"Uploading file from {local_path} to {s3_path}") + upload_to_s3(local_path, s3_path) + + +@flow() +def update_genomehubs_taxonomy( + root_taxid: str, input_path: str, output_path: str, s3_path: str +) -> None: + """Update the GenomeHubs taxonomy JSONL file. + + Args: + root_taxid (str): Root taxon ID to filter by. + input_path (str): Path to the input config file. + output_path (str): Path to save the taxonomy dump. + s3_path (str): S3 path to upload the taxonomy dump. + """ + + # 1. parse input config yaml + file_paths = read_input_config(input_path) + + # 2. check files exist locally (file or directory) + + for key, paths in file_paths.items(): + if "input" in paths: + taxonomy_path = paths["input"] + if not os.path.exists(taxonomy_path): + print(f"Error: {taxonomy_path} not found") + exit() + if not (os.path.isfile(taxonomy_path) or os.path.isdir(taxonomy_path)): + print(f"Error: {taxonomy_path} is not a file or directory") + exit() + # 3. run blobtk to collate and filter taxonomies + run_blobtk_taxonomy(root_taxid, input_path, output_path) + + # 4. upload updated JSONL file to s3 if s3_path is provided + if s3_path: + upload_s3_file(f"{output_path}", s3_path) + + # 5. count lines in output file + line_count = 0 + try: + with open(f"{output_path}", "r") as f: + line_count = sum(1 for _ in f) + print(f"Output file has {line_count} lines") + except Exception as e: + print(f"Error reading {output_path}: {e}") + exit() + + emit_event( + event="update.genomehubs.taxonomy.finished", + resource={ + "prefect.resource.id": f"fetch.taxonomy.{output_path}", + "prefect.resource.type": "genomehubs.taxonomy", + # "prefect.resource.matches.previous": ("yes" if status else "no"), + }, + payload={"line_count": line_count}, + ) + + +if __name__ == "__main__": + """Run the flow.""" + args = parse_args( + [ + default(ROOT_TAXID, "2759"), + required(INPUT_PATH), + required(OUTPUT_PATH), + S3_PATH, + ], + "Collate source taxonomies and names into GenomeHubs JSONL taxonomy format.", + ) + + update_genomehubs_taxonomy(**vars(args)) diff --git a/flows/updaters/update_ncbi_datasets.py b/flows/updaters/update_ncbi_datasets.py index b864b6a..ec7b476 100644 --- a/flows/updaters/update_ncbi_datasets.py +++ b/flows/updaters/update_ncbi_datasets.py @@ -1,18 +1,9 @@ -#!/usr/bin/env python3 - import hashlib import os -import subprocess -import sys -from os.path import abspath, dirname import boto3 from botocore.exceptions import ClientError -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.lib.conditional_import import emit_event, flow, task from flows.lib.shared_args import ( OUTPUT_PATH, @@ -22,6 +13,7 @@ parse_args, required, ) +from flows.lib.utils import run_quoted @task(retries=2, retry_delay_seconds=2, log_prints=True) @@ -76,6 +68,8 @@ def fetch_ncbi_datasets_summary( "42452", ] for taxid in taxids: + if not taxid.isdigit(): + raise ValueError(f"Invalid taxid: {taxid}") # datasets summary for the root taxID command = [ "datasets", @@ -85,7 +79,7 @@ def fetch_ncbi_datasets_summary( taxid, "--as-json-lines", ] - result = subprocess.run(command, capture_output=True, text=True) + result = run_quoted(command, capture_output=True, text=True) if result.returncode != 0: if ( "V2reportsRankType" in result.stderr diff --git a/flows/updaters/update_ncbi_taxonomy.py b/flows/updaters/update_ncbi_taxonomy.py new file mode 100644 
index 0000000..890d11a --- /dev/null +++ b/flows/updaters/update_ncbi_taxonomy.py @@ -0,0 +1,123 @@ +import os + +from flows.lib.conditional_import import emit_event, flow, task +from flows.lib.shared_args import OUTPUT_PATH, parse_args, required +from flows.lib.utils import ( + generate_md5, + is_local_file_current_http, + is_safe_path, + run_quoted, +) + + +@task(retries=2, retry_delay_seconds=2, log_prints=True) +def fetch_ncbi_taxonomy( + local_path: str, + http_path: str = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump.tar.gz", +) -> bool: + """ + Fetch the NCBI taxonomy dump. + + Args: + http_path (str): URL to fetch the taxonomy dump from. + local_path (str): Path to save the taxonomy dump. + + Returns: + bool: True if the fetched file matches the remote version, False otherwise. + """ + # create local_path if it doesn't exist + if not is_safe_path(local_path): + raise ValueError(f"Unsafe local path: {local_path}") + if not is_safe_path(http_path): + raise ValueError(f"Unsafe HTTP path: {http_path}") + os.makedirs(local_path, exist_ok=True) + local_gz_file = f"{local_path}/taxdump.tar.gz" + # Fetch the remote file + cmd = ["curl", "-sSL", http_path, "-o", local_gz_file] + print(f"Running command: {' '.join(cmd)}") + run_quoted(cmd, check=True) + + remote_md5_path = f"{http_path}.md5" + # Fetch the remote MD5 checksum + cmd = ["curl", "-sSL", remote_md5_path] + print(f"Running command: {' '.join(cmd)}") + result = run_quoted(cmd, check=True, capture_output=True, text=True) + remote_md5 = result.stdout.split()[0] + + # Calculate the local MD5 checksum + local_md5 = generate_md5(local_gz_file) + print(f"Local MD5: {local_md5}, Remote MD5: {remote_md5}") + + if local_md5 != remote_md5: + print("MD5 checksums do not match. The file may be corrupted.") + return False + + # extract the tar.gz file + cmd = ["tar", "-xzf", local_gz_file, "-C", local_path] + print(f"Running command: {' '.join(cmd)}") + run_quoted(cmd, check=True) + + # set the timestamp of extracted files to match the tar.gz file + gz_mtime = os.path.getmtime(local_gz_file) + for fname in os.listdir(local_path): + fpath = os.path.join(local_path, fname) + if os.path.isfile(fpath): + os.utime(fpath, (gz_mtime, gz_mtime)) + + # remove the tar.gz file + os.remove(local_gz_file) + + return True + + +@task(log_prints=True) +def taxonomy_is_up_to_date(local_path: str, http_path: str) -> bool: + """ + Check if the local NCBI taxonomy file is up-to-date with the remote file. + + Args: + local_path (str): Path to the local file. + http_path (str): Path to the remote file on HTTP. + + Returns: + bool: True if the local file is up-to-date, False otherwise. + """ + return is_local_file_current_http(f"{local_path}/nodes.dmp", http_path) + + +@flow() +def update_ncbi_taxonomy(output_path: str) -> None: + """Fetch and the NCBI taxonomy dump. + + Args: + output_path (str): Path to save the taxonomy dump. 
+ """ + http_path = "https://ftp.ncbi.nlm.nih.gov/pub/taxonomy/taxdump.tar.gz" + status = None + if taxonomy_is_up_to_date(output_path, http_path): + status = True + else: + status = False + fetch_ncbi_taxonomy(local_path=output_path, http_path=http_path) + print(f"Taxonomy update status: {status}") + + emit_event( + event="update.ncbi.taxonomy.finished", + resource={ + "prefect.resource.id": f"fetch.taxonomy.{output_path}", + "prefect.resource.type": "ncbi.taxonomy", + "prefect.resource.matches.previous": "yes" if status else "no", + }, + payload={"matches_previous": status}, + ) + return status + + +if __name__ == "__main__": + """Run the flow.""" + args = parse_args( + [required(OUTPUT_PATH)], + "Fetch NCBI taxdump.", + ) + + update_ncbi_taxonomy(**vars(args)) diff --git a/flows/updaters/update_nhm.py b/flows/updaters/update_nhm.py index 4f5a7f4..b82d679 100644 --- a/flows/updaters/update_nhm.py +++ b/flows/updaters/update_nhm.py @@ -1,12 +1,3 @@ -#!/usr/bin/env python3 - -import sys -from os.path import abspath, dirname - -if __name__ == "__main__" and __package__ is None: - sys.path.insert(0, dirname(dirname(dirname(abspath(__file__))))) - __package__ = "flows" - from flows.lib.conditional_import import emit_event, flow, task from flows.lib.shared_args import MIN_RECORDS, OUTPUT_PATH, parse_args, required from flows.updaters.api import api_config as cfg diff --git a/flows/updaters/update_ott_taxonomy.py b/flows/updaters/update_ott_taxonomy.py new file mode 100644 index 0000000..7280a14 --- /dev/null +++ b/flows/updaters/update_ott_taxonomy.py @@ -0,0 +1,165 @@ +import json +import os + +from flows.lib.conditional_import import emit_event, flow, task +from flows.lib.shared_args import OUTPUT_PATH, parse_args, required +from flows.lib.utils import is_local_file_current_http, is_safe_path, run_quoted + + +@task(retries=2, retry_delay_seconds=2, log_prints=True) +def fetch_ott_taxonomy( + local_path: str, + http_path: str, +) -> bool: + """ + Fetch the OTT taxonomy and filter by root taxon if specified. + + Args: + http_path (str): URL to fetch the taxonomy from. + local_path (str): Path to save the taxonomy. + + Returns: + bool: True if the fetched file matches the remote version, False otherwise. 
+ """ + if not is_safe_path(local_path): + raise ValueError(f"Unsafe local path: {local_path}") + if not is_safe_path(http_path): + raise ValueError(f"Unsafe HTTP path: {http_path}") + # create local_path if it doesn't exist + os.makedirs(local_path, exist_ok=True) + local_gz_file = f"{local_path}/ott.tar.gz" + # Fetch the remote file + cmd = ["curl", "-sSL", http_path, "-o", local_gz_file] + print(f"Running command: {' '.join(cmd)}") + # Inputs have been validated by is_safe_path; safe to use in subprocess + run_quoted(cmd, check=True) + + # extract the tar.gz file + cmd = ["tar", "-xzf", local_gz_file, "-C", local_path] + print(f"Running command: {' '.join(cmd)}") + # Inputs have been validated by is_safe_path; safe to use in subprocess + run_quoted(cmd, check=True) + + # Find the extracted subdirectory (should start with 'ott') + extracted_dirs = [ + d + for d in os.listdir(local_path) + if os.path.isdir(os.path.join(local_path, d)) and d.startswith("ott") + ] + if not extracted_dirs: + raise RuntimeError("No extracted ott directory found.") + ott_dir = os.path.join(local_path, extracted_dirs[0]) + + # Move all files from the ott subdirectory to local_path + for fname in os.listdir(ott_dir): + src = os.path.join(ott_dir, fname) + dst = os.path.join(local_path, fname) + if os.path.exists(dst): + os.remove(dst) + os.rename(src, dst) + + # Remove the now-empty ott subdirectory + os.rmdir(ott_dir) + + # set the timestamp of extracted files to match the tar.gz file + gz_mtime = os.path.getmtime(local_gz_file) + for fname in os.listdir(local_path): + fpath = os.path.join(local_path, fname) + if os.path.isfile(fpath): + os.utime(fpath, (gz_mtime, gz_mtime)) + + # remove the tar.gz file + os.remove(local_gz_file) + + return True + + +@task(log_prints=True) +def ott_taxonomy_is_up_to_date(local_path: str, http_path: str) -> bool: + """ + Check if the local OTT taxonomy file is up-to-date with the remote file. + + Args: + local_path (str): Path to the local file. + http_path (str): Path to the remote file on HTTP. + + Returns: + bool: True if the local file is up-to-date, False otherwise. + """ + return is_local_file_current_http(f"{local_path}/taxonomy.tsv", http_path) + + +def set_ott_url() -> str: + """Set the OTT URL. + + Returns: + str: The OTT URL. + """ + + # Fetch OTT taxonomy info as JSON + cmd = [ + "curl", + "-X", + "POST", + "-s", + "https://api.opentreeoflife.org/v3/taxonomy/about", + ] + print(f"Running command: {' '.join(cmd)}") + # Input is generated from static strings; safe to use in subprocess + result = run_quoted(cmd, check=True, capture_output=True, text=True) + ott_json = json.loads(result.stdout) + + # Extract required fields + source = ott_json.get("source", "") + name = ott_json.get("name", "") + version = ott_json.get("version", "") + + # Replace "draft" with "." in source to get OTT_VERSION + ott_version = source.replace("draft", ".") + ott_major_version = f"{name}{version}" + + return ( + f"https://files.opentreeoflife.org/ott/" + f"{ott_major_version}/{ott_version}.tgz" + ) + + +@flow() +def update_ott_taxonomy(output_path: str) -> None: + """Fetch the OTT taxonomy file. + + Args: + output_path (str): Path to save the taxonomy dump. 
+ """ + http_path = set_ott_url() + status = None + complete = False + if ott_taxonomy_is_up_to_date(output_path, http_path): + status = True + complete = True + else: + status = False + complete = fetch_ott_taxonomy(local_path=output_path, http_path=http_path) + print(f"OTT taxonomy file matches previous: {status}") + + if complete: + emit_event( + event="update.ott.taxonomy.finished", + resource={ + "prefect.resource.id": f"fetch.ott.taxonomy.{output_path}", + "prefect.resource.type": "ott.taxonomy", + "prefect.resource.matches.previous": "yes" if status else "no", + }, + payload={"matches_previous": status}, + ) + return status + + +if __name__ == "__main__": + """Run the flow.""" + args = parse_args( + [required(OUTPUT_PATH)], + "Fetch OTT taxonomy file.", + ) + + update_ott_taxonomy(**vars(args)) diff --git a/flows/updaters/update_tolid_prefixes.py b/flows/updaters/update_tolid_prefixes.py new file mode 100644 index 0000000..0f46b44 --- /dev/null +++ b/flows/updaters/update_tolid_prefixes.py @@ -0,0 +1,112 @@ +import os + +from flows.lib.conditional_import import emit_event, flow, task +from flows.lib.shared_args import OUTPUT_PATH, parse_args, required +from flows.lib.utils import is_local_file_current_http, is_safe_path, run_quoted + + +@task(retries=2, retry_delay_seconds=2, log_prints=True) +def fetch_tolid_prefixes( + local_path: str, + http_path: str = ( + "https://gitlab.com/wtsi-grit/darwin-tree-of-life-sample-naming/" + "-/raw/master/tolids.txt?ref_type=heads" + ), + min_lines: int = 400000, +) -> bool: + """ + Fetch the ToLID prefix file and filter by root taxon if specified. + + Args: + http_path (str): URL to fetch the ToLID prefix file from. + local_path (str): Path to save the ToLID prefix file. + + Returns: + bool: True if the fetched file matches the remote version, False otherwise. + """ + + if not is_safe_path(local_path): + raise ValueError(f"Unsafe local path: {local_path}") + if not is_safe_path(http_path): + raise ValueError(f"Unsafe HTTP path: {http_path}") + + # create local_path if it doesn't exist + os.makedirs(local_path, exist_ok=True) + local_file = f"{local_path}/tolids.txt" + # Fetch the remote file + cmd = ["curl", "-sSL", http_path, "-o", local_file] + print(f"Running command: {' '.join(cmd)}") + run_quoted(cmd, check=True) + + # check the number of lines in the file + with open(local_file, "r") as f: + num_lines = sum(1 for _ in f) + if num_lines < min_lines: + print(f"File has too few lines: {num_lines} < {min_lines}") + return False, num_lines + + return True, num_lines + + +@task(log_prints=True) +def tolid_file_is_up_to_date(local_path: str, http_path: str) -> bool: + """ + Check if the local ToLID prefixes file is up-to-date with the remote file. + + Args: + local_path (str): Path to the local file. + http_path (str): Path to the remote file on HTTP. + + Returns: + bool: True if the local file is up-to-date, False otherwise. + """ + return is_local_file_current_http(f"{local_path}/tolids.txt", http_path) + + +@flow() +def update_tolid_prefixes(output_path: str) -> None: + """Fetch the ToLID prefixes file. + + Args: + output_path (str): Path to save the taxonomy dump. 
+ """ + http_path = ( + "https://gitlab.com/wtsi-grit/darwin-tree-of-life-sample-naming/" + "-/raw/master/tolids.txt?ref_type=heads" + ) + status = None + complete = False + if tolid_file_is_up_to_date(output_path, http_path): + status = True + complete = True + line_count = 0 + with open(f"{output_path}/tolids.txt", "r") as f: + line_count = sum(1 for _ in f) + else: + status = False + complete, line_count = fetch_tolid_prefixes( + local_path=output_path, http_path=http_path + ) + print(f"TolID file matches previous: {status}") + + if complete: + emit_event( + event="update.tolid.prefixes.finished", + resource={ + "prefect.resource.id": f"fetch.tolid.prefixes.{output_path}", + "prefect.resource.type": "tolid.prefixes", + "prefect.resource.matches.previous": "yes" if status else "no", + }, + payload={"matches_previous": status, "line_count": line_count}, + ) + return status + + +if __name__ == "__main__": + """Run the flow.""" + args = parse_args( + [required(OUTPUT_PATH)], + "Fetch ToLID prefixes.", + ) + + update_tolid_prefixes(**vars(args)) diff --git a/pyproject.toml b/pyproject.toml index 11de11b..3d40f11 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,4 +10,16 @@ exclude = ''' | build | dist )/ -''' \ No newline at end of file +''' + +[tool.isort] +profile = "black" +line_length = 88 # Match black's default +multi_line_output = 3 +include_trailing_comma = true +force_grid_wrap = 0 +use_parentheses = true +ensure_newline_before_comments = true + +[tool.flake8] +max-line-length = 88 \ No newline at end of file diff --git a/scripts/bulk_goat_lookup.py b/scripts/bulk_goat_lookup.py index 1d7a109..f763cfc 100755 --- a/scripts/bulk_goat_lookup.py +++ b/scripts/bulk_goat_lookup.py @@ -91,7 +91,7 @@ def main(): f"&ranks={args.ranks.replace(',', '%2C')}" ) response = requests.get( - api_url, headers={"Accept": "text/tab-separated-values"} + api_url, headers={"Accept": "text/tab-separated-values"}, timeout=300 ) if response.status_code == 200: parse_results(