Conversation
trigger
Reviewer's GuideThis PR enhances the core utility layer with S3/file helpers and robust HTTP handling, tightens input validation, refactors CLI entrypoints, and extends the Prefect pipeline by adding new taxonomy update deployments with a monthly schedule alias. Sequence diagram for S3 file upload with validationsequenceDiagram
participant U as "User/Flow"
participant Utils as "upload_to_s3()"
participant S3Cmd as "s3cmd subprocess"
U->>Utils: Call upload_to_s3(local_path, s3_path, gz)
Utils->>Utils: is_safe_path(local_path)
Utils->>Utils: is_safe_path(s3_path)
alt gz required
Utils->>Utils: gzip local file
Utils->>S3Cmd: s3cmd put gzipped file to s3_path
S3Cmd-->>Utils: result
Utils->>Utils: remove gzipped file
else no gz
Utils->>S3Cmd: s3cmd put local file to s3_path
S3Cmd-->>Utils: result
end
Utils-->>U: Success or error
Sequence diagram for Prefect taxonomy update pipeline (new deployments)sequenceDiagram
participant Prefect as "Prefect Scheduler"
participant Flow as "update-ena-taxonomy-extra"
participant S3 as "S3 Storage"
Prefect->>Flow: Trigger flow (monthly/triggered)
Flow->>S3: fetch_from_s3(s3_path, output_path) [if append]
Flow->>Flow: read_ncbi_tax_ids(taxdump_path)
Flow->>Flow: add_jsonl_tax_ids(output_path, tax_ids)
Flow->>Flow: get_ena_api_new_taxids(root_taxid, tax_ids)
Flow->>Flow: update_ena_jsonl(new_tax_ids, output_path, append)
Flow->>Flow: sort_jsonl_by_lineage(output_path)
Flow->>S3: upload_to_s3(output_path, s3_path)
Flow->>Prefect: emit_event(update.ena.taxonomy.finished)
Class diagram for new and updated utility functions in utils.pyclassDiagram
class Utils {
+find_http_file(http_path: str, filename: str) str
+find_s3_file(s3_path: list, filename: str) str
+is_safe_path(path: str) bool
+parse_s3_path(s3_path)
+fetch_from_s3(s3_path: str, local_path: str, gz: bool)
+upload_to_s3(local_path: str, s3_path: str, gz: bool)
+last_modified_git_remote(http_path: str) int
+last_modified_http(http_path: str) int
+last_modified_s3(s3_path: str) int
+last_modified(local_path: str) int
+is_local_file_current_http(local_path: str, http_path: str) bool
+generate_md5(file_path)
}
File-Level Changes
Tips and commandsInteracting with Sourcery
Customizing Your ExperienceAccess your dashboard to:
Getting Help
|
There was a problem hiding this comment.
Hey there - I've reviewed your changes - here's some feedback:
Blocking issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected a 'requests' call without a timeout set. By default, 'requests' calls wait until the connection is closed. This means a 'requests' call without a timeout will hang the program if a response is never received. Consider setting a timeout for all 'requests'. (link)
- Detected a 'requests' call without a timeout set. By default, 'requests' calls wait until the connection is closed. This means a 'requests' call without a timeout will hang the program if a response is never received. Consider setting a timeout for all 'requests'. (link)
- Detected a 'requests' call without a timeout set. By default, 'requests' calls wait until the connection is closed. This means a 'requests' call without a timeout will hang the program if a response is never received. Consider setting a timeout for all 'requests'. (link)
- Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
General comments:
- The S3 helpers mix boto3 for downloads and s3cmd calls for uploads and even duplicate
parse_s3_pathdefinitions—consider standardizing on one SDK (e.g. boto3) and extracting shared utilities to avoid duplication. - All of the updater scripts follow the same “check‐up‐to‐date then fetch and emit event” pattern—think about abstracting that common flow into a shared helper or decorator to DRY up the code.
- There are a lot of print statements in Prefect tasks and utils; switching to Prefect’s built‐in logger or Python’s logging module would give you structured logs and better control over log levels.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The S3 helpers mix boto3 for downloads and s3cmd calls for uploads and even duplicate `parse_s3_path` definitions—consider standardizing on one SDK (e.g. boto3) and extracting shared utilities to avoid duplication.
- All of the updater scripts follow the same “check‐up‐to‐date then fetch and emit event” pattern—think about abstracting that common flow into a shared helper or decorator to DRY up the code.
- There are a lot of print statements in Prefect tasks and utils; switching to Prefect’s built‐in logger or Python’s logging module would give you structured logs and better control over log levels.
## Individual Comments
### Comment 1
<location> `flows/lib/utils.py:668-674` </location>
<code_context>
+ gz_path = f"{local_path}.gz"
+ with open(local_path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
+ f_out.write(f_in.read())
+ cmd = [
+ "s3cmd",
+ "put",
+ "setacl",
+ "--acl-public",
+ shlex.quote(gz_path),
+ shlex.quote(s3_path),
+ ]
+ result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**issue (bug_risk):** The s3cmd command construction may be incorrect.
'setacl' should not be used as an argument to 'put'. Please review the s3cmd documentation and update the command to ensure correct upload and ACL application.
</issue_to_address>
### Comment 2
<location> `flows/lib/utils.py:745` </location>
<code_context>
+ return list(reader)
+
+
+def last_modified_git_remote(http_path: str) -> Optional[str]:
+ """
+ Get the last modified date of a file in a git repository.
</code_context>
<issue_to_address>
**issue:** Potential for index errors when parsing GitLab URLs.
Accessing specific indices in the URL without validation may cause IndexError if the format is unexpected. Please add checks or error handling for malformed URLs.
</issue_to_address>
### Comment 3
<location> `flows/lib/utils.py:828` </location>
<code_context>
+ return None
+
+
+def last_modified(local_path: str) -> Optional[str]:
+ """
+ Get the last modified date of a local file.
</code_context>
<issue_to_address>
**suggestion:** Return type annotation does not match returned value type.
The function should be annotated as Optional[int] to accurately reflect its return type and ensure type safety.
Suggested implementation:
```python
def last_modified(local_path: str) -> Optional[int]:
"""
Get the last modified UNIX timestamp of a local file.
Args:
local_path (str): Path to the local file.
Returns:
Optional[int]: Last modified UNIX timestamp of the file, or None if the file does not exist.
"""
if not os.path.exists(local_path):
return None
mtime = os.path.getmtime(local_path)
```
```python
mtime = os.path.getmtime(local_path)
return int(mtime)
```
</issue_to_address>
### Comment 4
<location> `flows/parsers/parse_ncbi_assemblies.py:79` </location>
<code_context>
yield json.loads(line)
+def is_atypical_assembly(report: dict, parsed: dict) -> bool:
+ """
+ Check if an assembly is atypical.
</code_context>
<issue_to_address>
**issue:** Side effect in atypical assembly check may be unexpected.
Deleting entries from the parsed dictionary may cause unintended side effects for callers. Recommend separating mutation from the check or documenting this behavior.
</issue_to_address>
### Comment 5
<location> `flows/parsers/parse_ncbi_assemblies.py:243` </location>
<code_context>
continue
linked_row = parsed[acc]
if accession not in linked_row["linkedAssembly"]:
+ if not isinstance(linked_row["linkedAssembly"], list):
+ linked_row["linkedAssembly"] = []
linked_row["linkedAssembly"].append(accession)
</code_context>
<issue_to_address>
**question:** Type normalization for linkedAssembly may mask upstream data issues.
Instead of converting linkedAssembly to a list when its type is unexpected, consider raising an error or warning to surface potential upstream data issues, unless this behavior is intentional and clearly documented.
</issue_to_address>
### Comment 6
<location> `flows/parsers/parse_ncbi_assemblies.py:377-380` </location>
<code_context>
except Exception as e:
print(
- f"Error processing report for {report.get('accession', 'unknown')}: {e}"
+ (
+ f"Error processing report for "
+ f"{report.get('accession', 'unknown')}: "
+ f"{e} (line {e.__traceback__.tb_lineno})"
+ )
)
</code_context>
<issue_to_address>
**suggestion:** Using e.__traceback__.tb_lineno may not reliably indicate the error line.
e.__traceback__.tb_lineno only reports the innermost frame's line number. For more accurate error locations, use the traceback module.
</issue_to_address>
### Comment 7
<location> `flows/updaters/update_ena_taxonomy_extra.py:74` </location>
<code_context>
+ # Stream the content of the URL
+ column_index = None
+ new_tax_ids = set()
+ with urlopen(url) as response:
+ for line in response:
+ columns = line.decode("utf-8").strip().split("\t")
</code_context>
<issue_to_address>
**issue:** ENA API response parsing assumes column order and format.
Validate the response header to ensure 'tax_id' is in the expected position, and add error handling for unexpected formats.
</issue_to_address>
### Comment 8
<location> `flows/updaters/update_ncbi_taxonomy.py:45` </location>
<code_context>
+ cmd = ["curl", "-sSL", remote_md5_path]
+ print(f"Running command: {' '.join(cmd)}")
+ result = subprocess.run(cmd, check=True, capture_output=True, text=True)
+ remote_md5 = result.stdout.split()[0]
+
+ # Calculate the local MD5 checksum
</code_context>
<issue_to_address>
**issue (bug_risk):** Splitting remote MD5 output may fail if format changes.
The code relies on the checksum being the first token, which may not be reliable if the file format changes. Use output validation or a regex to ensure correct extraction.
</issue_to_address>
### Comment 9
<location> `flows/lib/utils.py:607` </location>
<code_context>
return None
+def fetch_from_s3(s3_path: str, local_path: str, gz: bool = False) -> None:
+ """
+ Fetch a file from S3.
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring to use a single parse_s3_path helper and unify S3 operations with boto3 for clarity and consistency.
```markdown
Consider factoring out the duplicated `parse_s3_path` and switching the upload path to use `boto3` (instead of mixing in `s3cmd`). This keeps all current functionality, removes the inner‐function duplication, and unifies your S3 API usage.
1. At module top, add a single helper:
```python
def parse_s3_path(s3_url: str) -> tuple[str, str]:
"""Split "s3://bucket/key/path" into ("bucket", "key/path")."""
url = s3_url.removeprefix("s3://")
bucket, key = url.split("/", 1)
return bucket, key
```
2. Simplify `fetch_from_s3`:
```python
def fetch_from_s3(s3_path: str, local_path: str, gz: bool = False) -> None:
bucket, key = parse_s3_path(s3_path)
gz = gz or (s3_path.endswith(".gz") and not local_path.endswith(".gz"))
download_target = f"{local_path}.gz" if gz else local_path
boto3.client("s3").download_file(bucket, key, download_target)
if gz:
with gzip.open(download_target, "rb") as f_in, open(local_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(download_target)
```
3. Simplify `upload_to_s3`:
```python
def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None:
bucket, key = parse_s3_path(s3_path)
gz = gz or (s3_path.endswith(".gz") and not local_path.endswith(".gz"))
source = local_path
if gz:
gz_path = f"{local_path}.gz"
with open(local_path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
source = gz_path
boto3.client("s3").upload_file(
Filename=source,
Bucket=bucket,
Key=key,
ExtraArgs={"ACL": "public-read"}
)
if gz:
os.remove(gz_path)
```
Benefits:
- Single `parse_s3_path` at module level.
- All S3 ops use `boto3` (no `subprocess` / `s3cmd`).
- Clear, linear flow in download/upload (no nested `try`/`except` or branching on two clients).
```
</issue_to_address>
### Comment 10
<location> `flows/updaters/update_genomehubs_taxonomy.py:28` </location>
<code_context>
+from flows.lib.utils import fetch_from_s3, upload_to_s3
+
+
+def get_file_paths_from_config(config: dict, file_paths: dict) -> dict:
+ key = config.get("xref_label")
+ input_path = config.get("path")
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring the input config parsing into a single loop to simplify control flow and remove unnecessary function indirection.
Here’s a way to collapse both functions into one straight-forward loop, drop the external mutation and the “sometimes returns `out`” oddity, and keep all the same behavior:
```python
@task(log_prints=True)
def read_input_config(input_path: str) -> dict:
print(f"Reading input config from {input_path}")
try:
config = yaml.safe_load(open(input_path))
except Exception as e:
print(f"Error reading {input_path}: {e}")
exit()
file_paths = {}
# include top‐level config plus any nested taxonomies
for entry in [config] + config.get("taxonomies", []):
label = entry.get("xref_label")
inp = entry.get("path")
if label and inp:
file_paths[label] = {"input": inp}
# capture the 'out' key once, if present
outp = config.get("out")
if outp:
file_paths["out"] = outp
return file_paths
```
Remove `get_file_paths_from_config` entirely, and adjust any downstream code that expects that “sometimes‐returned” `output_path` to pull from `file_paths.get("out")` instead. This preserves all existing behavior but makes the control flow linear and explicit.
</issue_to_address>
### Comment 11
<location> `flows/lib/utils.py:676` </location>
<code_context>
result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 12
<location> `flows/lib/utils.py:687` </location>
<code_context>
result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 13
<location> `flows/lib/utils.py:766` </location>
<code_context>
response = requests.get(api_url)
</code_context>
<issue_to_address>
**security (python.requests.best-practice.use-timeout):** Detected a 'requests' call without a timeout set. By default, 'requests' calls wait until the connection is closed. This means a 'requests' call without a timeout will hang the program if a response is never received. Consider setting a timeout for all 'requests'.
```suggestion
response = requests.get(api_url, timeout=30)
```
*Source: opengrep*
</issue_to_address>
### Comment 14
<location> `flows/lib/utils.py:773` </location>
<code_context>
response = requests.head(http_path, allow_redirects=True)
</code_context>
<issue_to_address>
**security (python.requests.best-practice.use-timeout):** Detected a 'requests' call without a timeout set. By default, 'requests' calls wait until the connection is closed. This means a 'requests' call without a timeout will hang the program if a response is never received. Consider setting a timeout for all 'requests'.
```suggestion
response = requests.head(http_path, allow_redirects=True, timeout=30)
```
*Source: opengrep*
</issue_to_address>
### Comment 15
<location> `flows/lib/utils.py:791` </location>
<code_context>
response = requests.head(http_path, allow_redirects=True)
</code_context>
<issue_to_address>
**security (python.requests.best-practice.use-timeout):** Detected a 'requests' call without a timeout set. By default, 'requests' calls wait until the connection is closed. This means a 'requests' call without a timeout will hang the program if a response is never received. Consider setting a timeout for all 'requests'.
```suggestion
response = requests.head(http_path, allow_redirects=True, timeout=30)
```
*Source: opengrep*
</issue_to_address>
### Comment 16
<location> `flows/updaters/update_genomehubs_taxonomy.py:76-82` </location>
<code_context>
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 17
<location> `flows/updaters/update_ncbi_taxonomy.py:38` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 18
<location> `flows/updaters/update_ncbi_taxonomy.py:44` </location>
<code_context>
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 19
<location> `flows/updaters/update_ncbi_taxonomy.py:58` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 20
<location> `flows/updaters/update_ott_taxonomy.py:41` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 21
<location> `flows/updaters/update_ott_taxonomy.py:46` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 22
<location> `flows/updaters/update_ott_taxonomy.py:113` </location>
<code_context>
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 23
<location> `flows/updaters/update_tolid_prefixes.py:43` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 24
<location> `flows/updaters/update_ott_taxonomy.py:3` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 25
<location> `flows/updaters/update_tolid_prefixes.py:3` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 26
<location> `flows/updaters/update_ena_taxonomy_extra.py:13` </location>
<code_context>
import json
import os
import sys
from os.path import abspath, dirname
from urllib.request import urlopen
from tqdm import tqdm
if __name__ == "__main__" and __package__ is None:
sys.path.insert(0, dirname(dirname(dirname(abspath(__file__)))))
__package__ = "flows"
from flows.lib.conditional_import import emit_event, flow, task
from flows.lib.shared_args import (
APPEND,
OUTPUT_PATH,
ROOT_TAXID,
S3_PATH,
TAXDUMP_PATH,
default,
parse_args,
required,
)
from flows.lib.utils import fetch_from_s3, upload_to_s3
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `\_\_package\_\_` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>
### Comment 27
<location> `flows/updaters/update_genomehubs_taxonomy.py:13` </location>
<code_context>
import os
import subprocess
import sys
from collections import defaultdict
from os.path import abspath, dirname
import yaml
if __name__ == "__main__" and __package__ is None:
sys.path.insert(0, dirname(dirname(dirname(abspath(__file__)))))
__package__ = "flows"
from flows.lib.conditional_import import emit_event, flow, task
from flows.lib.shared_args import (
INPUT_PATH,
OUTPUT_PATH,
ROOT_TAXID,
S3_PATH,
default,
parse_args,
required,
)
from flows.lib.utils import fetch_from_s3, upload_to_s3
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `\_\_package\_\_` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>
### Comment 28
<location> `flows/updaters/update_ncbi_taxonomy.py:10` </location>
<code_context>
import os
import subprocess
import sys
from os.path import abspath, dirname
if __name__ == "__main__" and __package__ is None:
sys.path.insert(0, dirname(dirname(dirname(abspath(__file__)))))
__package__ = "flows"
from flows.lib.conditional_import import emit_event, flow, task
from flows.lib.shared_args import OUTPUT_PATH, parse_args, required
from flows.lib.utils import generate_md5, is_local_file_current_http
</code_context>
<issue_to_address>
**issue (code-quality):** Don't assign to builtin variable `\_\_package\_\_` ([`avoid-builtin-shadow`](https://docs.sourcery.ai/Reference/Default-Rules/comments/avoid-builtin-shadow/))
<br/><details><summary>Explanation</summary>Python has a number of `builtin` variables: functions and constants that
form a part of the language, such as `list`, `getattr`, and `type`
(See https://docs.python.org/3/library/functions.html).
It is valid, in the language, to re-bind such variables:
```python
list = [1, 2, 3]
```
However, this is considered poor practice.
- It will confuse other developers.
- It will confuse syntax highlighters and linters.
- It means you can no longer use that builtin for its original purpose.
How can you solve this?
Rename the variable something more specific, such as `integers`.
In a pinch, `my_list` and similar names are colloquially-recognized
placeholders.</details>
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
@sourcery-ai resolve |
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey there - I've reviewed your changes and found some issues that need to be addressed.
Blocking issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation. (link)
- Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation. (link)
- Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
Prompt for AI Agents
Please address the comments from this code review:
## Individual Comments
### Comment 1
<location> `flows/lib/utils.py:609` </location>
<code_context>
+def is_safe_path(path: str) -> bool:
+ # Only allow alphanumeric, dash, underscore, dot, slash, and colon (for s3)
+ return bool(re.match(r"^[\w\-/.:]+$", path))
+
+
</code_context>
<issue_to_address>
**🚨 issue (security):** The regex in is_safe_path may allow unintended path traversal.
The current regex permits patterns like '../', which can lead to directory traversal vulnerabilities. Please add explicit checks for '..' or strengthen the validation to prevent this security issue.
</issue_to_address>
### Comment 2
<location> `flows/lib/utils.py:680-686` </location>
<code_context>
+ with open(local_path, "rb") as f_in, gzip.open(gz_path, "wb") as f_out:
+ f_out.write(f_in.read())
+ # use s3cmd for uploads due to issues with boto3 and large files
+ cmd = [
+ "s3cmd",
+ "put",
+ "setacl",
+ "--acl-public",
+ gz_path,
+ s3_path,
+ ]
+ result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**issue (bug_risk):** s3cmd invocation includes 'setacl' as a positional argument, which may not be valid.
'setacl' should not be used as a positional argument with 'put'. Please confirm the correct s3cmd syntax to avoid command failure.
</issue_to_address>
### Comment 3
<location> `flows/lib/utils.py:758` </location>
<code_context>
+ return list(reader)
+
+
+def last_modified_git_remote(http_path: str) -> Optional[str]:
+ """
+ Get the last modified date of a file in a git repository.
</code_context>
<issue_to_address>
**issue:** last_modified_git_remote returns inconsistent types (int or str).
Ensure the function consistently returns either an int or a str, or update the documentation to clarify the return type.
</issue_to_address>
### Comment 4
<location> `flows/lib/utils.py:800` </location>
<code_context>
+ return None
+
+
+def last_modified_http(http_path: str) -> Optional[str]:
+ """
+ Get the last modified date of a file.
</code_context>
<issue_to_address>
**issue:** last_modified_http returns int or None, but signature says Optional[str].
Update the return type to Optional[int], or ensure the function always returns a str as specified.
</issue_to_address>
### Comment 5
<location> `flows/lib/utils.py:821` </location>
<code_context>
+ return None
+
+
+def last_modified_s3(s3_path: str) -> Optional[str]:
+ """
+ Get the last modified date of a file on S3.
</code_context>
<issue_to_address>
**issue:** last_modified_s3 returns int or None, but signature says Optional[str].
Update the return type to Optional[int], or ensure the function consistently returns a str if that is required.
</issue_to_address>
### Comment 6
<location> `flows/updaters/update_nhm.py:75` </location>
<code_context>
)
update_nhm(**vars(args))
+ update_nhm(**vars(args))
</code_context>
<issue_to_address>
**issue (bug_risk):** update_nhm is called twice in succession.
Invoking update_nhm twice with identical arguments may cause duplicate processing or unintended side effects.
</issue_to_address>
### Comment 7
<location> `flows/updaters/update_ncbi_taxonomy.py:128` </location>
<code_context>
+ "Fetch NCBI taxdump.",
+ )
+
+ update_ncbi_taxonomy(**vars(args))
+ update_ncbi_taxonomy(**vars(args))
</code_context>
<issue_to_address>
**issue (bug_risk):** update_ncbi_taxonomy is called twice in __main__.
This may cause unnecessary duplication or unintended side effects.
</issue_to_address>
### Comment 8
<location> `flows/lib/utils.py:607` </location>
<code_context>
return None
+def is_safe_path(path: str) -> bool:
+ # Only allow alphanumeric, dash, underscore, dot, slash, and colon (for s3)
+ return bool(re.match(r"^[\w\-/.:]+$", path))
</code_context>
<issue_to_address>
**issue (complexity):** Consider moving S3-related functions and HTTP/Git last-modified logic into separate utility modules to improve code organization and testability.
### Split S3 helpers into their own module
Move all S3‐related functions (`is_safe_path`, `parse_s3_path`, `fetch_from_s3`, `upload_to_s3`, `last_modified_s3`) into a new `s3_utils.py`. This keeps the main file focused and makes each utility easier to test.
```python
# s3_utils.py
import os, re, gzip, shutil
import boto3
from botocore.exceptions import ClientError
import subprocess
def is_safe_path(path: str) -> bool:
return bool(re.match(r"^[\w\-/.:]+$", path))
def parse_s3_path(s3_path: str) -> tuple[str, str]:
bucket, key = s3_path.removeprefix("s3://").split("/", 1)
return bucket, key
def fetch_from_s3(s3_path: str, local_path: str, gz: bool = False) -> None:
s3 = boto3.client("s3")
bucket, key = parse_s3_path(s3_path)
if s3_path.endswith(".gz") and not local_path.endswith(".gz"):
gz = True
try:
target = f"{local_path}.gz" if gz else local_path
s3.download_file(Bucket=bucket, Key=key, Filename=target)
if gz:
with gzip.open(target, "rb") as fin, open(local_path, "wb") as fout:
shutil.copyfileobj(fin, fout)
os.remove(target)
except ClientError as e:
raise RuntimeError(f"Error downloading {s3_path}: {e}")
def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None:
if not is_safe_path(local_path) or not is_safe_path(s3_path):
raise ValueError(f"Unsafe path: {local_path} or {s3_path}")
if s3_path.endswith(".gz") and not local_path.endswith(".gz"):
gz = True
cmd_base = ["s3cmd", "put", "setacl", "--acl-public"]
try:
if gz:
gz_path = f"{local_path}.gz"
with open(local_path, "rb") as fin, gzip.open(gz_path, "wb") as fout:
fout.write(fin.read())
cmd = cmd_base + [gz_path, s3_path]
os.remove(gz_path)
else:
cmd = cmd_base + [local_path, s3_path]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode:
raise RuntimeError(f"s3cmd failed: {result.stderr}")
except Exception as e:
raise RuntimeError(f"Error uploading {local_path}: {e}")
def last_modified_s3(s3_path: str) -> int | None:
s3 = boto3.client("s3")
bucket, key = parse_s3_path(s3_path)
try:
lm = s3.head_object(Bucket=bucket, Key=key)["LastModified"]
return int(lm.timestamp())
except ClientError:
return None
```
Then in your main file:
```python
from genomehubs.s3_utils import (
fetch_from_s3,
upload_to_s3,
last_modified_s3,
)
```
---
### Consolidate HTTP/Git “last modified” logic
Factor out the common HEAD+`Last-Modified` handling, and split GitLab parsing into its own helper:
```python
# http_utils.py
import requests
from dateutil import parser
from typing import Optional
def _get_head_last_modified(url: str, timeout=300) -> int | None:
resp = requests.head(url, allow_redirects=True, timeout=timeout)
if resp.ok and (lm := resp.headers.get("Last-Modified")):
return int(parser.parse(lm).timestamp())
return None
def last_modified_http(http_path: str) -> int | None:
if "gitlab.com" in http_path:
return last_modified_gitlab(http_path)
return _get_head_last_modified(http_path)
def last_modified_gitlab(http_path: str) -> int | None:
# parse URL → project, ref, file
api = build_gitlab_api_url(http_path)
resp = requests.get(api, timeout=300)
if resp.ok:
commits = resp.json()
if commits and (cd := commits[0].get("committed_date")):
return int(parser.isoparse(cd).timestamp())
return _get_head_last_modified(http_path)
```
Now you’ve:
- Moved S3 logic into its own module
- Centralized HTTP/Git “last modified” abstractions
- Eliminated repeated gz and HEAD/`Last-Modified` patterns
This reduces cognitive load in your main script and keeps each utility focused.
</issue_to_address>
### Comment 9
<location> `flows/lib/utils.py:688` </location>
<code_context>
result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 10
<location> `flows/lib/utils.py:700` </location>
<code_context>
result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 11
<location> `flows/updaters/api/api_config.py:171-173` </location>
<code_context>
return requests.get(
sts_url, headers={"Token": token, "Project": "ALL"}, verify=False, timeout=300
)
</code_context>
<issue_to_address>
**security (python.requests.security.disabled-cert-validation):** Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation.
```suggestion
return requests.get(
sts_url, headers={"Token": token, "Project": "ALL"}, verify=True, timeout=300
)
```
*Source: opengrep*
</issue_to_address>
### Comment 12
<location> `flows/updaters/api/api_config.py:189-191` </location>
<code_context>
r = requests.get(
url, headers={"Token": token, "Project": "ALL"}, verify=False, timeout=300
).json()
</code_context>
<issue_to_address>
**security (python.requests.security.disabled-cert-validation):** Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation.
```suggestion
r = requests.get(
url, headers={"Token": token, "Project": "ALL"}, verify=True, timeout=300
).json()
```
*Source: opengrep*
</issue_to_address>
### Comment 13
<location> `flows/updaters/update_genomehubs_taxonomy.py:83-89` </location>
<code_context>
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 14
<location> `flows/updaters/update_ncbi_taxonomy.py:43` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 15
<location> `flows/updaters/update_ncbi_taxonomy.py:49` </location>
<code_context>
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 16
<location> `flows/updaters/update_ncbi_taxonomy.py:63` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 17
<location> `flows/updaters/update_ott_taxonomy.py:45` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 18
<location> `flows/updaters/update_ott_taxonomy.py:50` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 19
<location> `flows/updaters/update_ott_taxonomy.py:117` </location>
<code_context>
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 20
<location> `flows/updaters/update_tolid_prefixes.py:49` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 21
<location> `flows/updaters/update_ena_taxonomy_extra.py:13` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 22
<location> `flows/updaters/update_genomehubs_taxonomy.py:13` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 23
<location> `flows/updaters/update_ncbi_datasets.py:14` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 24
<location> `flows/updaters/update_ncbi_taxonomy.py:10` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 25
<location> `flows/updaters/update_nhm.py:8` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 26
<location> `flows/updaters/update_ott_taxonomy.py:10` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>
### Comment 27
<location> `flows/updaters/update_tolid_prefixes.py:10` </location>
<code_context>
</code_context>
<issue_to_address>
**issue (code-quality):** Unused skip comment ([`unused-skip-comment`](https://docs.sourcery.ai/Reference/Rules-and-In-Line-Suggestions/Python/Default-Rules/unused-skip-comment))
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
|
@sourcery-ai resolve |
|
@sourcery-ai review |
There was a problem hiding this comment.
Hey there - I've reviewed your changes - here's some feedback:
Blocking issues:
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation. (link)
- Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation. (link)
- Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
- Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)
General comments:
- The new is_safe_path regex is too restrictive for URLs (e.g. query parameters with '?' or '&'), so consider splitting path validation for local paths versus HTTP/S3 URLs or expanding the regex to allow valid URL characters.
- Rather than sprinkling timeout=300 everywhere, centralize HTTP client configuration (e.g. a shared requests.Session or wrapper) to keep timeouts consistent and reduce duplication.
- Relying on s3cmd in subprocess for large uploads adds external dependencies and platform constraints—evaluate using boto3’s multipart upload to handle large files natively and simplify the code path.
Prompt for AI Agents
Please address the comments from this code review:
## Overall Comments
- The new is_safe_path regex is too restrictive for URLs (e.g. query parameters with '?' or '&'), so consider splitting path validation for local paths versus HTTP/S3 URLs or expanding the regex to allow valid URL characters.
- Rather than sprinkling timeout=300 everywhere, centralize HTTP client configuration (e.g. a shared requests.Session or wrapper) to keep timeouts consistent and reduce duplication.
- Relying on s3cmd in subprocess for large uploads adds external dependencies and platform constraints—evaluate using boto3’s multipart upload to handle large files natively and simplify the code path.
## Individual Comments
### Comment 1
<location> `flows/lib/utils.py:657-666` </location>
<code_context>
+def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None:
</code_context>
<issue_to_address>
**suggestion (bug_risk):** Subprocess errors are printed and raised, but partial uploads may leave files in inconsistent state.
Use a try/finally block to guarantee temporary files are removed if the subprocess fails after creating a .gz file.
</issue_to_address>
### Comment 2
<location> `flows/lib/utils.py:762-604` </location>
<code_context>
+ return list(reader)
+
+
+def last_modified_git_remote(http_path: str) -> Optional[int]:
+ """
+ Get the last modified date of a file in a git repository.
+
+ Args:
+ http_path (str): Path to the HTTP file.
+
+ Returns:
+ Optional[int]: Last modified date of the file, or None if not found.
+ """
+ try:
+ if not http_path.startswith("https://gitlab.com/"):
+ print(f"Malformed GitLab URL (missing prefix): {http_path}")
+ return None
+ project_path = http_path.removeprefix("https://gitlab.com/")
+ project_path = project_path.removesuffix(".git")
</code_context>
<issue_to_address>
**suggestion:** last_modified_git_remote parses GitLab URLs with strict assumptions about URL structure.
Relying on a fixed number of URL segments may cause failures if the URL structure varies. Please make the parsing logic more flexible or clearly specify the required format.
</issue_to_address>
### Comment 3
<location> `flows/parsers/parse_ncbi_assemblies.py:47` </location>
<code_context>
Yields:
dict: The sequence report data as a JSON object, one line at a time.
"""
+ if not utils.is_safe_path(accession):
+ raise ValueError(f"Unsafe accession: {accession}")
result = subprocess.run(
</code_context>
<issue_to_address>
**question (bug_risk):** is_safe_path is used for accession, but accessions may contain underscores or other valid characters.
Verify that is_safe_path's regex allows all valid NCBI accession characters to prevent rejecting legitimate accessions.
</issue_to_address>
### Comment 4
<location> `flows/parsers/parse_ncbi_assemblies.py:116-118` </location>
<code_context>
Returns:
dict: The updated report dictionary.
"""
+ # Uncomment to filter atypical assemblies
+ # if is_atypical_assembly(report, parsed):
+ # return None
processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}}
if "pairedAccession" in report:
</code_context>
<issue_to_address>
**nitpick:** Commented-out code for filtering atypical assemblies may lead to confusion.
Remove the commented-out filtering code if it's not needed. If toggling is required, use a configuration option instead.
</issue_to_address>
### Comment 5
<location> `flows/feature_parsers/parse_blobtoolkit_assembly.py:66` </location>
<code_context>
"""Run the flow."""
args = parse_args("Parse assembly data from a BlobToolKit BlobDir.")
parse_blobtoolkit_assembly_data(**vars(args))
+ parse_blobtoolkit_assembly_data(**vars(args))
</code_context>
<issue_to_address>
**issue (bug_risk):** parse_blobtoolkit_assembly_data is called twice in plugin().
Please remove the duplicate call unless it is required for correct functionality.
</issue_to_address>
### Comment 6
<location> `flows/updaters/update_nhm.py:65` </location>
<code_context>
)
update_nhm(**vars(args))
+ update_nhm(**vars(args))
</code_context>
<issue_to_address>
**issue (bug_risk):** update_nhm is called twice in the main block.
Remove the duplicate call to prevent unnecessary repeated execution.
</issue_to_address>
### Comment 7
<location> `flows/updaters/update_ncbi_taxonomy.py:122` </location>
<code_context>
+ "Fetch NCBI taxdump.",
+ )
+
+ update_ncbi_taxonomy(**vars(args))
+ update_ncbi_taxonomy(**vars(args))
</code_context>
<issue_to_address>
**issue (bug_risk):** update_ncbi_taxonomy is called twice in the main block.
Remove the duplicate call to avoid running the flow twice unnecessarily.
</issue_to_address>
### Comment 8
<location> `flows/lib/utils.py:622` </location>
<code_context>
+ return bucket, key
+
+
+def fetch_from_s3(s3_path: str, local_path: str, gz: bool = False) -> None:
+ """
+ Fetch a file from S3.
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring S3 and HTTP/GitLab logic into unified helpers using boto3 and urllib, and collapsing last-modified checks into a single function.
Here are a few concrete steps to collapse the S3‐ and HTTP/GitLab‐specific logic into much smaller helpers while preserving all functionality:
1. Replace all of your subprocess/s3cmd upload calls with boto3’s high‐level `upload_file`, so you can drop the `is_safe_path` check entirely.
2. Use `urllib.parse` to split `s3://…` URLs in one line instead of a custom `parse_s3_path`.
3. Collapse your `last_modified_*` functions down to a single HEAD + fallback to GitLab API.
---
```python
# 1) Simplify parse_s3_path with urllib
from urllib.parse import urlparse
def parse_s3_path(s3_path: str) -> tuple[str,str]:
p = urlparse(s3_path)
return p.netloc, p.path.lstrip("/")
# 2) Unified fetch and upload using boto3 only
import gzip
from boto3.s3.transfer import TransferConfig
def fetch_from_s3(s3_path: str, local_path: str) -> None:
bucket, key = parse_s3_path(s3_path)
s3 = boto3.client("s3")
tmp = local_path + (".gz" if s3_path.endswith(".gz") and not local_path.endswith(".gz") else "")
s3.download_file(bucket, key, tmp)
if tmp.endswith(".gz"):
with gzip.open(tmp, "rb") as f_in, open(local_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
os.remove(tmp)
def upload_to_s3(local_path: str, s3_path: str, gz: bool=False) -> None:
bucket, key = parse_s3_path(s3_path)
s3 = boto3.client("s3")
upload_file = local_path
if gz or s3_path.endswith(".gz") and not local_path.endswith(".gz"):
gz_path = local_path + ".gz"
with open(local_path,"rb") as src, gzip.open(gz_path,"wb") as dst:
dst.write(src.read())
upload_file = gz_path
# bump multipart_threshold if you have very large files
config = TransferConfig(multipart_threshold=100*1024*1024)
s3.upload_file(upload_file, bucket, key,
ExtraArgs={"ACL":"public"},
Config=config)
if upload_file.endswith(".gz") and upload_file != local_path:
os.remove(upload_file)
```
```python
# 3) Collapse all last‐modified logic into one helper
from datetime import datetime
from dateutil import parser
def last_modified_http(url: str) -> Optional[int]:
try:
r = requests.head(url, allow_redirects=True, timeout=10)
r.raise_for_status()
lm = r.headers.get("Last-Modified")
if lm:
return int(parser.parse(lm).timestamp())
except Exception:
return None
def last_modified_gitlab(url: str) -> Optional[int]:
# e.g. https://gitlab.com/group/project/-/blob/BRANCH/path/to/file
m = re.match(r"https://gitlab\.com/(?P<proj>[^/]+/[^/]+)/-/blob/(?P<ref>[^/]+)/(?P<path>.+)$", url)
if not m:
return None
proj = m.group("proj").replace("/", "%2F")
api = (f"https://gitlab.com/api/v4/projects/{proj}/repository/commits"
f"?ref_name={m.group('ref')}&path={m.group('path')}&per_page=1")
try:
resp = requests.get(api, timeout=10)
resp.raise_for_status()
commits = resp.json()
if commits:
return int(parser.isoparse(commits[0]["committed_date"]).timestamp())
except Exception:
pass
# fallback to standard HEAD
return last_modified_http(url)
def last_modified(url: str) -> Optional[int]:
if url.startswith("s3://"):
bucket, key = parse_s3_path(url)
try:
r = boto3.client("s3").head_object(Bucket=bucket, Key=key)
return int(r["LastModified"].timestamp())
except ClientError:
return None
return (last_modified_gitlab(url) if "gitlab.com" in url
else last_modified_http(url))
```
These changes:
- Remove the custom `is_safe_path` and all subprocess/s3cmd code.
- Unify parse logic with `urllib.parse`.
- Merge your three `last_modified_*` functions into one small dispatcher + two minimal backends.
This collapses several hundred lines of nested logic, yet keeps all existing behavior intact.
</issue_to_address>
### Comment 9
<location> `flows/updaters/update_ena_taxonomy_extra.py:54` </location>
<code_context>
+
+
+@task(log_prints=True)
+def get_ena_api_new_taxids(root_taxid: str, existing_tax_ids: set[str]) -> set[str]:
+ print(f"Fetching new taxids for tax_tree({root_taxid}) from ENA API")
+
</code_context>
<issue_to_address>
**issue (complexity):** Consider refactoring repeated HTTP fetch and file update logic into helper functions to reduce boilerplate and clarify task intent.
Here’s one way to cut down on ~50 lines of boilerplate by factoring out your common HTTP-fetch/JSON/file patterns into two small helpers:
```python
# flows/lib/utils.py
from urllib.request import urlopen
import json
import sys
def fetch_lines(url: str):
"""Yields decoded lines from a URL or raises."""
try:
with urlopen(url) as resp:
for raw in resp:
yield raw.decode("utf-8").strip()
except Exception as e:
print(f"Error fetching {url}: {e}", file=sys.stderr)
raise
def replace_file(path: str, lines):
"""
Write iterable of strings to a temp file, then os.replace it over path.
Keeps your edit→swap pattern DRY.
"""
import os, tempfile
dirn = os.path.dirname(path)
os.makedirs(dirn, exist_ok=True)
fd, tmp = tempfile.mkstemp(dir=dirn)
with os.fdopen(fd, "w") as f:
for line in lines:
f.write(line + "\n")
os.replace(tmp, path)
```
Then your tasks collapse a lot. For example:
```python
@task
def get_ena_api_new_taxids(root: str, seen: set[str]) -> set[str]:
url = (f"https://www.ebi.ac.uk/ena/portal/api/search?"
f"result=taxon&query=tax_tree({root})&limit=10000000")
it = fetch_lines(url)
columns = next(it).split("\t")
idx = 0 if columns[0]=="tax_id" else 1
return {c.split("\t")[idx] for c in it if c.split("\t")[idx] not in seen}
```
```python
@task
def update_ena_jsonl(new_ids: set[str], output_path: str, append: bool):
mode = "a" if append else "w"
def gen():
for taxid in tqdm(new_ids, desc="Fetch ENA"):
yield from fetch_lines(f"https://www.ebi.ac.uk/ena/taxonomy/rest/tax-id/{taxid}")
if append:
with open(output_path, mode) as f:
for line in gen():
f.write(line+"\n")
else:
replace_file(output_path, gen())
```
You can apply the same pattern to `add_jsonl_tax_ids` and `sort_jsonl_by_lineage` by using `replace_file(...)` with a generator, and drop all the manual `try/except/open/tmp/replace` noise. This keeps each task focused on just _what_ to read or write, not _how_ to do the boilerplate.
</issue_to_address>
### Comment 10
<location> `flows/lib/utils.py:692` </location>
<code_context>
result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 11
<location> `flows/lib/utils.py:704` </location>
<code_context>
result = subprocess.run(cmd, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 12
<location> `flows/updaters/api/api_config.py:171-173` </location>
<code_context>
return requests.get(
sts_url, headers={"Token": token, "Project": "ALL"}, verify=False, timeout=300
)
</code_context>
<issue_to_address>
**security (python.requests.security.disabled-cert-validation):** Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation.
```suggestion
return requests.get(
sts_url, headers={"Token": token, "Project": "ALL"}, verify=True, timeout=300
)
```
*Source: opengrep*
</issue_to_address>
### Comment 13
<location> `flows/updaters/api/api_config.py:189-191` </location>
<code_context>
r = requests.get(
url, headers={"Token": token, "Project": "ALL"}, verify=False, timeout=300
).json()
</code_context>
<issue_to_address>
**security (python.requests.security.disabled-cert-validation):** Certificate verification has been explicitly disabled. This permits insecure connections to insecure servers. Re-enable certification validation.
```suggestion
r = requests.get(
url, headers={"Token": token, "Project": "ALL"}, verify=True, timeout=300
).json()
```
*Source: opengrep*
</issue_to_address>
### Comment 14
<location> `flows/updaters/update_genomehubs_taxonomy.py:75-81` </location>
<code_context>
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'Popen' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 15
<location> `flows/updaters/update_ncbi_taxonomy.py:35` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 16
<location> `flows/updaters/update_ncbi_taxonomy.py:42` </location>
<code_context>
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 17
<location> `flows/updaters/update_ncbi_taxonomy.py:57` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 18
<location> `flows/updaters/update_ott_taxonomy.py:36` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 19
<location> `flows/updaters/update_ott_taxonomy.py:42` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 20
<location> `flows/updaters/update_ott_taxonomy.py:110` </location>
<code_context>
result = subprocess.run(cmd, check=True, capture_output=True, text=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>
### Comment 21
<location> `flows/updaters/update_tolid_prefixes.py:41` </location>
<code_context>
subprocess.run(cmd, check=True)
</code_context>
<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.
*Source: opengrep*
</issue_to_address>Help me be more useful! Please click 👍 or 👎 on each comment and I'll use the feedback to improve your reviews.
| def upload_to_s3(local_path: str, s3_path: str, gz: bool = False) -> None: | ||
| """ | ||
| Upload a file to S3. | ||
|
|
||
| Args: | ||
| local_path (str): Path to the local file. | ||
| s3_path (str): Path to the remote file on s3. | ||
| gz (bool): Whether to gzip the file before uploading. Defaults to False. | ||
|
|
||
| Returns: |
There was a problem hiding this comment.
suggestion (bug_risk): Subprocess errors are printed and raised, but partial uploads may leave files in inconsistent state.
Use a try/finally block to guarantee temporary files are removed if the subprocess fails after creating a .gz file.
| @@ -596,6 +604,114 @@ def find_s3_file(s3_path: list, filename: str) -> str: | |||
| return None | |||
There was a problem hiding this comment.
suggestion: last_modified_git_remote parses GitLab URLs with strict assumptions about URL structure.
Relying on a fixed number of URL segments may cause failures if the URL structure varies. Please make the parsing logic more flexible or clearly specify the required format.
| Yields: | ||
| dict: The sequence report data as a JSON object, one line at a time. | ||
| """ | ||
| if not utils.is_safe_path(accession): |
There was a problem hiding this comment.
question (bug_risk): is_safe_path is used for accession, but accessions may contain underscores or other valid characters.
Verify that is_safe_path's regex allows all valid NCBI accession characters to prevent rejecting legitimate accessions.
| # if is_atypical_assembly(report, parsed): | ||
| # return None | ||
| processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}} |
There was a problem hiding this comment.
nitpick: Commented-out code for filtering atypical assemblies may lead to confusion.
Remove the commented-out filtering code if it's not needed. If toggling is required, use a configuration option instead.
| @@ -76,3 +64,4 @@ def plugin(): | |||
| """Run the flow.""" | |||
| args = parse_args("Parse assembly data from a BlobToolKit BlobDir.") | |||
| parse_blobtoolkit_assembly_data(**vars(args)) | |||
There was a problem hiding this comment.
issue (bug_risk): parse_blobtoolkit_assembly_data is called twice in plugin().
Please remove the duplicate call unless it is required for correct functionality.
| @@ -72,3 +63,4 @@ def update_nhm(output_path: str, min_records: int) -> None: | |||
| ) | |||
|
|
|||
| update_nhm(**vars(args)) | |||
There was a problem hiding this comment.
issue (bug_risk): update_nhm is called twice in the main block.
Remove the duplicate call to prevent unnecessary repeated execution.
| "Fetch NCBI taxdump.", | ||
| ) | ||
|
|
||
| update_ncbi_taxonomy(**vars(args)) |
There was a problem hiding this comment.
issue (bug_risk): update_ncbi_taxonomy is called twice in the main block.
Remove the duplicate call to avoid running the flow twice unnecessarily.
Summary by Sourcery
Implement safe-path validation, S3 utilities, and file currency checks in utility library, enforce HTTP timeouts, and expand the Prefect pipeline with new taxonomy update flows
New Features:
Bug Fixes:
Enhancements:
Documentation: