
historical assembly version backfill (phase 0) #3

Open
fchen13 wants to merge 11 commits into genomehubs:main from fchen13:fang-assemblies

Conversation


@fchen13 fchen13 commented Dec 19, 2025

This PR implements a one-time backfill process to populate historical assembly versions for all existing assemblies in the genomehubs dataset. It enables version-aware milestone tracking by capturing superseded assembly versions that were previously not tracked.

Summary by Sourcery

Introduce a one-time historical backfill pipeline for superseded NCBI assemblies and add version/status handling to assembly parsing.

New Features:

  • Add a standalone backfill script that discovers, caches, and parses all historical NCBI assembly versions into a dedicated TSV using the existing parsing stack.
  • Support marking assemblies with a version status flag to distinguish current from superseded versions in parsed outputs.
  • Add optional data-freeze handling to the NCBI assemblies parser, allowing assemblies to be tagged and renamed according to freeze lists from S3.

Enhancements:

  • Improve robustness and traceability of the NCBI assemblies parsing flow with safer linked-assembly handling, optional atypical-assembly filtering, additional logging, and error trapping.
  • Define a new YAML config describing the schema for historical assembly outputs compatible with the existing genomehubs pipeline.

Documentation:

  • Add markdown documentation describing the backfill process, rationale, and testing workflow for historical assembly versions.

Tests:

  • Introduce a dedicated test suite and sample JSONL data to exercise the historical backfill script, version discovery, caching behavior, and accession parsing.


sourcery-ai bot commented Dec 19, 2025

Reviewer's Guide

Implements a one-time historical assembly version backfill pipeline plus version-aware parsing and data-freeze support for NCBI assemblies, producing a dedicated TSV of superseded assemblies while minimally impacting existing flows.

Sequence diagram for updated NCBI assemblies parsing with data freeze handling

sequenceDiagram
  participant Wrapper as parse_ncbi_assemblies_wrapper
  participant Flow as parse_ncbi_assemblies
  participant Proc as process_assembly_reports
  participant Report as process_assembly_report
  participant Seq as fetch_and_parse_sequence_report
  participant Add as add_report_to_parsed_reports
  participant FreezeDefault as set_data_freeze_default
  participant FreezeFile as parse_data_freeze_file
  participant FreezeProc as process_datafreeze_info
  participant Writer as write_to_tsv

  Wrapper->>Flow: parse_ncbi_assemblies(input_path, yaml_path, append, data_freeze_path)
  Flow->>Proc: process_assembly_reports(jsonl_path, config, biosamples, parsed, previous_report)

  loop for each report
    Proc->>Report: process_assembly_report(report, previous_report, config, parsed, version_status=current)
    alt processed_report is None or use_previous_report is True
      Proc-->>Flow: skip report
    else
      Proc->>Seq: fetch_and_parse_sequence_report(processed_report)
      Seq-->>Proc: update processed_report with sequence stats
      Proc->>Add: add_report_to_parsed_reports(parsed, processed_report, config, biosamples)
      Add-->>Proc: parsed updated
    end
  end

  alt data_freeze_path is None
    Flow->>FreezeDefault: set_data_freeze_default(parsed, data_freeze_name=latest)
    FreezeDefault-->>Flow: parsed assemblies annotated with latest
  else data_freeze_path provided
    Flow->>FreezeFile: parse_data_freeze_file(data_freeze_path)
    FreezeFile-->>Flow: data_freeze dict
    Flow->>FreezeProc: process_datafreeze_info(parsed, data_freeze, config)
    FreezeProc-->>Flow: parsed assemblies annotated with freeze subsets and assemblyID
  end

  Flow->>Writer: write_to_tsv(parsed, config)
  Writer-->>Flow: assembly TSV written

Entity relationship diagram for current vs historical assembly TSV outputs

erDiagram
  assemblies_current {
    string assemblyID PK
    string genbankAccession
    string refseqAccession
    string versionStatus
    string dataFreeze
  }

  assemblies_historical {
    string assemblyID PK
    string genbankAccession
    string refseqAccession
    string versionStatus
  }

  assemblies_current ||--o{ assemblies_historical : has_superseded_versions

Flow diagram for the historical assembly backfill pipeline

graph TD
  A[input_jsonl<br>assembly_data_report.jsonl]
  B[identify_assemblies_needing_backfill<br>version > 1]
  C[setup_cache_directories]
  D[for each assembly<br>base_accession, current_version]
  E[find_all_assembly_versions<br>FTP discovery + cache]
  F[NCBI_FTP]
  G[load_from_cache<br>version_discovery]
  H[save_to_cache<br>version_discovery]
  I[for each version<br>metadata fetch]
  J[load_from_cache<br>metadata]
  K[datasets_CLI<br>datasets summary genome]
  L[save_to_cache<br>metadata]
  M[parse_historical_version<br>version_status = superseded]
  N[process_assembly_report]
  O[fetch_and_parse_sequence_report]
  P[gh_utils.parse_report_values]
  Q[parsed dict<br>keyed by genbankAccession]
  R[periodic checkpoint<br>every 100 assemblies]
  S[gh_utils.write_tsv<br>batch write]
  T[save_checkpoint]
  U[assembly_historical.tsv<br>outputs]

  A --> B
  B --> C
  C --> D
  D --> E

  E --> G
  G -->|cache hit| E_done[use cached versions]
  G -->|cache miss| F
  F --> E
  E --> H
  E --> I

  I --> J
  J -->|cache hit| I_done[use cached metadata]
  J -->|cache miss| K
  K --> I
  I --> L
  I --> M

  M --> N
  M --> O
  M --> P
  P --> Q

  D --> R
  R --> S
  R --> T
  S --> U

  Q -->|final batch| S
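To make the selection rule in the flow diagram concrete, the sketch below shows one way the version-suffix check could be implemented. The helper names mirror those used in the PR's tests, but the record field name (`accession`) and the return shapes are assumptions for illustration, not the actual signatures.

```python
import json
import re


def parse_accession(accession: str):
    """Split an accession such as 'GCA_000001405.39' into (base, version)."""
    match = re.match(r"(GC[AF]_\d+)\.(\d+)$", accession)
    if not match:
        return None, None
    return match.group(1), int(match.group(2))


def identify_assemblies_needing_backfill(jsonl_path: str):
    """Yield (base_accession, current_version) for assemblies with version > 1."""
    with open(jsonl_path, encoding="utf-8") as handle:
        for line in handle:
            if not line.strip():
                continue
            record = json.loads(line)
            # 'accession' is an assumed field name for the JSONL records.
            base, version = parse_accession(record.get("accession", ""))
            if base and version > 1:
                yield base, version
```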

File-Level Changes

Change Details Files
Add a standalone historical assembly backfill script that discovers, fetches, parses, and outputs superseded assembly versions using existing parsing logic plus caching and checkpointing.
  • Introduce backfill_historical_versions.py that scans an input JSONL for assemblies with version>1, discovers all versions via NCBI FTP, and fetches metadata with the NCBI Datasets CLI.
  • Implement two-tier JSON file caching (version discovery and per-accession metadata) under tmp/backfill_cache with age-based expiry.
  • Reuse existing genomehubs parsing pipeline by calling process_assembly_report with version_status='superseded' and fetch_and_parse_sequence_report, then writing rows via gh_utils.write_tsv.
  • Add helpers for accession/version parsing, backfill target identification, and a checkpoint mechanism that resumes processing by tracking processed assemblies.
flows/parsers/backfill_historical_versions.py
Extend the NCBI assemblies parser to track assembly version status and support optional data-freeze driven assembly IDs and subsets.
  • Add an optional version_status argument to process_assembly_report and set processedAssemblyInfo.versionStatus, defaulting to 'current' for backward compatibility.
  • Improve process_assembly_reports with per-report logging, defensive try/except around processing, and skip-on-error behavior.
  • Introduce data-freeze handling in parse_ncbi_assemblies: either assign a default 'latest' freeze to all parsed assemblies or, when provided, load a TSV from S3 and set per-assembly dataFreeze and assemblyID (accession_freezeName).
  • Wire data_freeze_path through parse_ncbi_assemblies and parse_ncbi_assemblies_wrapper and add tasks parse_data_freeze_file, set_data_freeze_default, and process_datafreeze_info.
flows/parsers/parse_ncbi_assemblies.py
Add a stub S3 parsing helper and a dedicated historical assemblies config to support the new backfill flow.
  • Add utils.parse_s3_file as a stub returning an empty dict so parse_ncbi_assemblies can import it without impacting existing callers.
  • Create configs/assembly_historical.yaml defining the TSV schema (headers, paths, identifiers, metadata) for assembly_historical.tsv including a new versionStatus column and assemblyID field tailored for historical versions.
flows/lib/utils.py
configs/assembly_historical.yaml
Introduce tests, fixtures, and documentation for the historical backfill feature.
  • Add tests/test_backfill.py with unit-style tests for accession parsing, backfill target identification, FTP version discovery, and cache behavior, plus a small JSONL fixture in tests/test_data/assembly_test_sample.jsonl.
  • Provide README_test_backfill.md with instructions for installing dependencies, running the backfill tests, and executing a small backfill run for manual verification.
  • Document the Phase 0 historical backfill design, usage, and operational details in Phase_0_PR_SUMMARY.md.
tests/test_backfill.py
tests/test_data/assembly_test_sample.jsonl
tests/README_test_backfill.md
Phase_0_PR_SUMMARY.md
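For reference, a minimal sketch of the backward-compatible `version_status` handling described above; the surrounding parsing logic is elided and the function body is an assumption based on this guide, not the PR's actual implementation.

```python
def process_assembly_report(report, previous_report, config, parsed,
                            version_status="current"):
    """Simplified sketch: tag a processed report with its version status."""
    processed_report = {"processedAssemblyInfo": {}}
    # ... existing field extraction from the raw report would happen here ...
    # Defaulting to "current" keeps existing callers unchanged; the backfill
    # script passes version_status="superseded" for historical versions.
    processed_report["processedAssemblyInfo"]["versionStatus"] = version_status
    return processed_report
```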


@sourcery-ai sourcery-ai bot left a comment


Hey there - I've reviewed your changes and found some issues that need to be addressed.

Blocking issues:

  • Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'. (link)

General comments:

  • The new data-freeze handling path currently relies on utils.parse_s3_file, which is a stub returning {}, so providing data_freeze_path will silently do nothing; consider either wiring this up to a real S3/TSV reader or failing fast when data_freeze_path is set to avoid misleading callers.
  • process_datafreeze_info is a bit misleading in both naming and typing (it actually walks the parsed rows dict, not a single processed_report) and emits a print per row; renaming the parameter, tightening the type hints, and reducing per-row logging would make its intent clearer and avoid very noisy output on large runs.
  • The new is_atypical_assembly helper and its commented-out call in process_assembly_report look like unused/dead code in this version; either wire this filtering behind a config flag or remove it to keep the parser easier to follow.
Prompt for AI Agents
Please address the comments from this code review:

## Overall Comments
- The new data-freeze handling path currently relies on `utils.parse_s3_file`, which is a stub returning `{}`, so providing `data_freeze_path` will silently do nothing; consider either wiring this up to a real S3/TSV reader or failing fast when `data_freeze_path` is set to avoid misleading callers.
- `process_datafreeze_info` is a bit misleading in both naming and typing (it actually walks the `parsed` rows dict, not a single `processed_report`) and emits a `print` per row; renaming the parameter, tightening the type hints, and reducing per-row logging would make its intent clearer and avoid very noisy output on large runs.
- The new `is_atypical_assembly` helper and its commented-out call in `process_assembly_report` look like unused/dead code in this version; either wire this filtering behind a config flag or remove it to keep the parser easier to follow.

## Individual Comments

### Comment 1
<location> `flows/lib/utils.py:194-201` </location>
<code_context>
+The `parse_s3_file()` function in `flows/lib/utils.py` is currently a stub:
+
+```python
+def parse_s3_file(s3_path: str) -> dict:
+    return {}  # Placeholder
+```
</code_context>

<issue_to_address>
**issue (bug_risk):** parse_s3_file stub makes data-freeze path effectively a no-op and breaks the new data_freeze_path flow branch

Because `parse_s3_file` always returns `{}`, `process_datafreeze_info` will treat any provided `data_freeze_path` as an empty mapping, silently skipping updates while also bypassing `set_data_freeze_default`. If this stub is only for a backfill or temporary context, please either implement minimal TSV parsing here (to return the expected 2‑column mapping) or explicitly guard/disable the `data_freeze_path` branch when this stub is in use.
</issue_to_address>
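For illustration only, a minimal sketch of the 2-column mapping described here, assuming an `s3://bucket/key` style path and boto3 access. A later review comment notes a working implementation already exists further down in `utils.py`, so this is not a proposed addition to the repo.

```python
import csv
import io

import boto3


def parse_s3_file(s3_path: str) -> dict:
    """Read a 2-column TSV from S3 and return it as a {col1: col2} mapping."""
    bucket, _, key = s3_path.removeprefix("s3://").partition("/")
    body = boto3.client("s3").get_object(Bucket=bucket, Key=key)["Body"].read()
    reader = csv.reader(io.StringIO(body.decode("utf-8")), delimiter="\t")
    return {row[0]: row[1] for row in reader if len(row) >= 2}
```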

### Comment 2
<location> `tests/test_backfill.py:19-22` </location>
<code_context>
+# Add parent directory to path
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from flows.parsers.backfill_historical_versions import (
+    identify_assemblies_needing_backfill,
+    parse_accession,
+    find_all_assembly_versions
+)
+
</code_context>

<issue_to_address>
**suggestion (testing):** Add tests that exercise the full backfill flow and parse_historical_version behaviour, not just helper discovery functions.

Current tests cover only parsing helpers and cache behaviour, not the logic that guarantees the backfill itself is correct.

Please also cover:
- `parse_historical_version`: verify it calls `process_assembly_report` with `version_status="superseded"`, runs sequence parsing, and produces a row with the expected `assemblyID`.
- `backfill_historical_versions`: an orchestration/integration test that wires together version discovery, parsing, and TSV writing.

For example, you could:
- Mock `find_all_assembly_versions`, `parse_historical_version`, and `gh_utils.write_tsv`, then run `backfill_historical_versions` on a small JSONL fixture and assert the expected versions are requested/batched.
- Unit test `parse_historical_version` with synthetic `version_data`, asserting `versionStatus="superseded"` and the expected `assemblyID` format.

These tests will demonstrate that the end‑to‑end backfill behaves as described, not just that its helpers work in isolation.

Suggested implementation:

```python
import sys
from pathlib import Path

import pytest

# Ensure project root is importable when running this file directly
sys.path.insert(0, str(Path(__file__).parent.parent))

from flows.parsers import backfill_historical_versions as backfill_module
from flows.parsers.backfill_historical_versions import (
    parse_historical_version,
    backfill_historical_versions,
    find_all_assembly_versions,
)
from flows.utils import gh_utils


def test_parse_historical_version_calls_process_with_superseded_and_returns_row(monkeypatch):
    """
    Verify that parse_historical_version:
    - calls process_assembly_report with version_status="superseded"
    - performs sequence parsing (we assert that the sequence helper is invoked)
    - returns a row whose assemblyID is derived from the accession.
    """
    called = {
        "process_args": None,
        "sequence_args": None,
    }

    def fake_process_assembly_report(accession, version_status, *args, **kwargs):
        called["process_args"] = {
            "accession": accession,
            "version_status": version_status,
            "args": args,
            "kwargs": kwargs,
        }
        # Return a minimal row-like structure that parse_historical_version
        # is expected to adapt/augment.
        return {
            "assemblyID": f"{accession}_base",
            "accession": accession,
        }

    def fake_parse_sequences_for_assembly(row, *args, **kwargs):
        called["sequence_args"] = {
            "row": row,
            "args": args,
            "kwargs": kwargs,
        }
        # Simulate sequence parsing enriching the row
        row["hasSequences"] = True
        return row

    monkeypatch.setattr(
        backfill_module,
        "process_assembly_report",
        fake_process_assembly_report,
    )
    # Some codebases name this helper differently; we wire to a generic
    # parse_sequences_for_assembly symbol here.
    monkeypatch.setattr(
        backfill_module,
        "parse_sequences_for_assembly",
        fake_parse_sequences_for_assembly,
    )

    version_data = {
        "genbankAccession": "GCF_000001405.39",
    }

    row = parse_historical_version(version_data)

    # process_assembly_report should be invoked once with version_status="superseded"
    assert called["process_args"] is not None
    assert called["process_args"]["accession"] == "GCF_000001405.39"
    assert called["process_args"]["version_status"] == "superseded"

    # Sequence parsing should have been invoked with the row from process_assembly_report
    assert called["sequence_args"] is not None
    assert called["sequence_args"]["row"]["accession"] == "GCF_000001405.39"
    assert called["sequence_args"]["row"]["hasSequences"] is True

    # The returned row should contain an assemblyID derived from the accession
    assert "assemblyID" in row
    assert "GCF_000001405.39" in row["assemblyID"]


def test_backfill_historical_versions_integration(monkeypatch, tmp_path):
    """
    Orchestration/integration test for backfill_historical_versions:

    - Mocks version discovery (find_all_assembly_versions)
    - Mocks parsing of individual historical versions (parse_historical_version)
    - Mocks TSV writing (gh_utils.write_tsv)

    Then verifies that:
    - All discovered versions are requested
    - Batching/TSV writing is invoked with the expected rows.
    """
    # Input JSONL fixture: one assembly accession to backfill
    input_path = tmp_path / "assemblies.jsonl"
    assembly_record = {
        "assembly_accession": "GCF_000001405.39",
    }
    input_path.write_text(f"{assembly_record}\n", encoding="utf-8")

    # Mock version discovery: two superseded versions for the accession
    discovered_versions = [
        {"genbankAccession": "GCF_000001405.1"},
        {"genbankAccession": "GCF_000001405.2"},
    ]

    def fake_find_all_assembly_versions(accession, *args, **kwargs):
        assert accession == "GCF_000001405.39"
        return discovered_versions

    monkeypatch.setattr(
        backfill_module,
        "find_all_assembly_versions",
        fake_find_all_assembly_versions,
    )

    # Capture which version_data objects are passed to parse_historical_version
    parsed_versions = []

    def fake_parse_historical_version(version_data, *args, **kwargs):
        parsed_versions.append(version_data)
        accession = version_data["genbankAccession"]
        # Return a simple row keyed by accession
        return {
            "assemblyID": f"{accession}_row",
            "accession": accession,
        }

    monkeypatch.setattr(
        backfill_module,
        "parse_historical_version",
        fake_parse_historical_version,
    )

    # Capture TSV writes
    written_tsps = []

    def fake_write_tsv(path, rows, *args, **kwargs):
        written_tsps.append(
            {
                "path": Path(path),
                "rows": list(rows),
            }
        )

    monkeypatch.setattr(gh_utils, "write_tsv", fake_write_tsv)

    # Execute the backfill orchestration. The signature may include additional
    # arguments (such as batch size or output directory); we pass those that
    # are generally useful for orchestration tests.
    output_dir = tmp_path / "out"
    output_dir.mkdir()
    backfill_historical_versions(
        input_jsonl_path=str(input_path),
        output_dir=str(output_dir),
    )

    # All discovered versions should have been parsed
    assert len(parsed_versions) == len(discovered_versions)
    parsed_accessions = {v["genbankAccession"] for v in parsed_versions}
    assert parsed_accessions == {
        "GCF_000001405.1",
        "GCF_000001405.2",
    }

    # At least one TSV write should have occurred containing the returned rows
    assert written_tsps, "Expected gh_utils.write_tsv to be called at least once"
    all_written_rows = []
    for batch in written_tsps:
        all_written_rows.extend(batch["rows"])

    written_accessions = {r["accession"] for r in all_written_rows}
    assert written_accessions == {
        "GCF_000001405.1",
        "GCF_000001405.2",
    }

    # The assemblyID format should match what parse_historical_version emits
    for row in all_written_rows:
        assert row["assemblyID"].endswith("_row")

```

The above tests make a few assumptions that you may need to adapt:

1. **Import paths**
   - The code assumes:
     - `flows.parsers.backfill_historical_versions` is importable as a module.
     - `parse_historical_version`, `backfill_historical_versions`, and `find_all_assembly_versions` are defined in that module.
     - `flows.utils.gh_utils.write_tsv` exists and is the function used by `backfill_historical_versions` to emit TSVs.
   - If your actual module paths or function names differ, update the imports and the `monkeypatch.setattr(...)` targets accordingly.

2. **Helper names inside `parse_historical_version`**
   - The test assumes `parse_historical_version` internally calls:
     - `backfill_historical_versions.process_assembly_report(...)`
     - `backfill_historical_versions.parse_sequences_for_assembly(...)`
   - If your implementation uses different helper names or additional parameters, adjust the `monkeypatch.setattr` targets and the fake helper signatures to match the real code.

3. **Function signatures**
   - `parse_historical_version(version_data)` is called with a single `version_data` argument.
   - `backfill_historical_versions(input_jsonl_path=..., output_dir=...)` is called with keyword arguments `input_jsonl_path` and `output_dir`.
   - If your functions require additional parameters (e.g. cache paths, clients, batch sizes), update the test call sites and pass appropriate dummy values.

4. **Input JSONL structure**
   - The integration test assumes each JSONL line consumed by `backfill_historical_versions` contains an `assembly_accession` field used to drive `find_all_assembly_versions`.
   - If your JSONL schema uses a different field name, adjust `assembly_record` and the assertion in `fake_find_all_assembly_versions`.

These adjustments will align the tests with your actual implementation while keeping the overall intent: end‑to‑end validation that historical versions are discovered, parsed with `version_status="superseded"`, and written out via TSV batches.
</issue_to_address>

### Comment 3
<location> `flows/parsers/backfill_historical_versions.py:59` </location>
<code_context>
+        os.makedirs(cache_dir, exist_ok=True)
+
+
+def get_cache_path(cache_type: str, identifier: str) -> str:
+    """Generate cache file path for given type and identifier."""
+    safe_id = hashlib.md5(identifier.encode()).hexdigest()[:16]
</code_context>

<issue_to_address>
**issue (complexity):** Consider simplifying the cache key format and splitting `find_all_assembly_versions` into smaller helper functions to make caching behavior clearer and the code easier to understand and test.

- The cache key format currently mixes the raw identifier with a truncated hash, which adds indirection without extra safety. If you don’t depend on the raw identifier being visible in the filename, you can simplify cache paths to just use a stable hash (or a sanitized identifier) and keep the same behavior:

  ```python
  def get_cache_path(cache_type: str, identifier: str) -> str:
      # Full hash keeps collisions negligible and filenames short/fixed-length
      safe_id = hashlib.md5(identifier.encode("utf-8")).hexdigest()
      return f"tmp/backfill_cache/{cache_type}/{safe_id}.json"
  ```

  If you like having the raw ID in filenames for debugging, you can still drop the hash slice and keep the structure simpler and more predictable:

  ```python
  def get_cache_path(cache_type: str, identifier: str) -> str:
      safe_identifier = re.sub(r"[^A-Za-z0-9_.-]", "_", identifier)
      return f"tmp/backfill_cache/{cache_type}/{safe_identifier}.json"
  ```

- `find_all_assembly_versions` currently does FTP discovery, metadata fetching, and caching in one function, which makes it harder to test and evolve. You could split it into two small, single‑purpose helpers while keeping the external behavior intact:

  ```python
  def discover_version_accessions(base_accession: str) -> List[str]:
      base_match = re.match(r"(GC[AF]_\d+)", base_accession)
      if not base_match:
          return []
      base = base_match.group(1)

      setup_cache_directories()
      version_cache_path = get_cache_path("version_discovery", base)
      cached_data = load_from_cache(version_cache_path, max_age_days=7)
      if cached_data and "accessions" in cached_data:
          return cached_data["accessions"]

      ftp_url = f"https://ftp.ncbi.nlm.nih.gov/genomes/all/{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/"
      resp = requests.get(ftp_url, timeout=30)
      if resp.status_code != 200:
          return []

      version_pattern = rf"{base}\.\d+"
      accessions = sorted(set(re.findall(version_pattern, resp.text)))
      save_to_cache(version_cache_path, {"accessions": accessions, "ftp_url": ftp_url, "base_accession": base})
      return accessions


  def fetch_version_metadata(version_acc: str) -> Dict:
      metadata_cache_path = get_cache_path("metadata", version_acc)
      cached_metadata = load_from_cache(metadata_cache_path, max_age_days=30)
      if cached_metadata and "metadata" in cached_metadata:
          return cached_metadata["metadata"]

      cmd = ["datasets", "summary", "genome", "accession", version_acc, "--as-json-lines"]
      result = subprocess.run(cmd, capture_output=True, text=True, encoding="utf-8", errors="ignore", timeout=60)
      if result.returncode != 0 or not result.stdout.strip():
          return {}

      version_data = json.loads(result.stdout.strip())
      save_to_cache(metadata_cache_path, {"metadata": version_data, "cached_at": time.time()})
      return version_data


  def find_all_assembly_versions(base_accession: str) -> List[Dict]:
      accessions = discover_version_accessions(base_accession)
      versions: List[Dict] = []
      for version_acc in accessions:
          metadata = fetch_version_metadata(version_acc)
          if metadata:
              versions.append(metadata)
      return versions
  ```

  This keeps the same caching semantics and call site (`find_all_assembly_versions` stays public) but makes version discovery vs. metadata retrieval easier to reason about and test independently.
</issue_to_address>
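The refactor above assumes `load_from_cache`/`save_to_cache` helpers with age-based expiry. A minimal sketch of what such helpers could look like (the JSON layout and argument names are assumptions matching the snippets above, not the PR's actual code):

```python
import json
import os
import time


def save_to_cache(cache_path: str, data: dict) -> None:
    """Write a JSON cache file, creating parent directories as needed."""
    os.makedirs(os.path.dirname(cache_path), exist_ok=True)
    with open(cache_path, "w", encoding="utf-8") as handle:
        json.dump(data, handle)


def load_from_cache(cache_path: str, max_age_days: int = 7):
    """Return cached JSON if the file exists and is younger than max_age_days."""
    if not os.path.exists(cache_path):
        return None
    age_seconds = time.time() - os.path.getmtime(cache_path)
    if age_seconds > max_age_days * 24 * 60 * 60:
        return None
    with open(cache_path, encoding="utf-8") as handle:
        return json.load(handle)
```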

### Comment 4
<location> `flows/parsers/backfill_historical_versions.py:153-160` </location>
<code_context>
                result = subprocess.run(
                    cmd,
                    capture_output=True,
                    text=True,
                    encoding='utf-8',
                    errors='ignore',  # Handle Unicode gracefully
                    timeout=60
                )
</code_context>

<issue_to_address>
**security (python.lang.security.audit.dangerous-subprocess-use-audit):** Detected subprocess function 'run' without a static string. If this data can be controlled by a malicious actor, it may be an instance of command injection. Audit the use of this call to ensure it is not controllable by an external resource. You may consider using 'shlex.escape()'.

*Source: opengrep*
</issue_to_address>
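One way to address this, matching the validation the author adds in a later commit (pattern `GC[AF]_\d{9}\.\d+`), is to reject any accession that does not match the expected format before the command is built. A sketch, with the function name and surrounding logic assumed:

```python
import re
import subprocess

ACCESSION_PATTERN = re.compile(r"^GC[AF]_\d{9}\.\d+$")


def fetch_version_metadata(version_acc: str) -> str:
    """Run the NCBI Datasets CLI only for accessions matching the strict pattern."""
    if not ACCESSION_PATTERN.match(version_acc):
        raise ValueError(f"Refusing to query malformed accession: {version_acc!r}")
    cmd = ["datasets", "summary", "genome", "accession", version_acc,
           "--as-json-lines"]
    result = subprocess.run(cmd, capture_output=True, text=True, timeout=60)
    return result.stdout
```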



- Keep versionStatus field from PR branch (key feature)
- Integrate security improvements from upstream (is_safe_path, run_quoted)
- Maintain backward compatibility with default version_status='current'
Author

fchen13 commented Dec 19, 2025

@richardchallis This PR is ready for review!

Implements Phase 0 of historical assembly version backfill. All tests passing (4/4).

Questions for review:

  1. Is the parse_s3_file() implementation acceptable?
  2. Should historical versions go to a different S3 path?
  3. Any preferences for checkpoint file location?

Contributor

@rjchallis rjchallis left a comment


Hi @fchen13, great to see you've ported your prototype into this repo. It looks like it is working well from a quick run on the test data!

Looking through the code, the logic looks good; I've just commented on a few implementation details to help align it with the rest of the codebase.


### Manual Test (3 assemblies)
```bash
python flows/parsers/backfill_historical_versions.py \
```
Contributor


parsers should run as modules, not scripts - changing the way it is called to python -m flows.parsers.backfill_historical_versions will ensure all relative imports work and that prefect will be able to call it without any issues

Author

@fchen13 fchen13 Dec 19, 2025


Updated all documentation to use module syntax (python -m flows.parsers.backfill_historical_versions) instead of script syntax. Fixed in commit 792f34e.

Changes:

  • Updated docstring in the parser
  • Updated test README with correct usage

- taxon_id
format: tsv
header: true
name: ../outputs/assembly_historical.tsv
Contributor


don't use paths here - just use the file name, it should be written alongside the yaml file. There are helpers to copy the yaml into a working directory as part of the fetch_parse_validate workflow when this gets run in production

Author


Changed from ../outputs/assembly_historical.tsv to just assembly_historical.tsv.

Now the file will be written alongside the YAML when the workflow copies it to the working directory. Fixed in commit 91ab12b.

header: versionStatus
path: processedAssemblyInfo.versionStatus

file:
Contributor


this should have a needs section to pull in full attribute definitions as per ncbi_datasets yaml

Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added needs section referencing ncbi_datasets_eukaryota.types.yaml to inherit the base attribute definitions. Fixed in commit 91ab12b.


```bash
export SKIP_PREFECT=true
python flows/parsers/backfill_historical_versions.py
```
Contributor


run as a module

Author


fixed

# =============================================================================

if __name__ == '__main__':
import argparse
Contributor


don't use argparse directly - use the shared from flows.parsers.args import parse_args as in ncbi_datasets parser. All parsers must use the same args to fit into the fetch_parse_validate workflow. If more args are needed for a parser then all parsers must be updated to handle/ignore the extra arg, but avoiding new args with naming conventions is best

Author


  • Replaced direct argparse usage with the shared argument parser from flows.lib.shared_args
  • Used the standard INPUT_PATH and YAML_PATH arguments that all other parsers use
  • Created a custom CHECKPOINT argument following the shared argument pattern
  • Updated argument names:
    --input → --input_path
    --config → --yaml_path
    --checkpoint remains the same but now has a default value
  • Updated documentation in both the script's docstring and tests/README_test_backfill.md

@rjchallis
Contributor

Sorry, forgot to answer your questions directly:

  1. Is the parse_s3_file() implementation acceptable?

There is a working implementation of this function lower down in the utils file, so don't redefine it.

  2. Should historical versions go to a different S3 path?

It will be easiest to manage the parser args if this uses a similar pattern to the parse ncbi datasets flow to determine the path. Probably easiest if historic parsing lives in the assembly-data directory alongside the main parsed assemblies so they can share ATTR_ yamls.

  3. Any preferences for checkpoint file location?

Putting them in a subdirectory alongside the json/tsv will be easiest to maintain: it keeps related files together and means the location can be determined without extra args.
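A sketch of how that could look, deriving the checkpoint location from the output file so no extra command line argument is needed (names are illustrative, not part of the PR):

```python
from pathlib import Path


def checkpoint_path_for(output_tsv: str) -> Path:
    """Keep checkpoint files in a subdirectory next to the TSV they relate to."""
    tsv = Path(output_tsv)
    checkpoint_dir = tsv.parent / "checkpoints"
    checkpoint_dir.mkdir(parents=True, exist_ok=True)
    return checkpoint_dir / f"{tsv.stem}.checkpoint.json"
```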

Fang Chen and others added 7 commits December 19, 2025 10:23
Use pattern GC[AF]_\d{9}\.\d+ to validate version_acc before subprocess call.
This addresses the security concern raised by sourcery-ai and confirmed by @rjchallis.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
As suggested by @rjchallis, removed the stub function since the real
implementation is already present in the same file at line 769.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
As suggested by @rjchallis, updated all documentation to show running
the parser as a module rather than as a script:

- Changed: python flows/parsers/backfill_historical_versions.py
- To: python -m flows.parsers.backfill_historical_versions

This ensures:
- Relative imports work correctly
- Prefect can call it without issues
- Follows Python best practices

Updated files:
- flows/parsers/backfill_historical_versions.py (docstring)
- tests/README_test_backfill.md (usage examples)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Two changes per @rjchallis feedback:

1. Changed output filename from '../outputs/assembly_historical.tsv'
   to 'assembly_historical.tsv'
   - File should be written alongside the YAML file
   - Workflow helpers copy YAML to working directory

2. Added 'needs' section to pull in base attribute definitions
   - References ncbi_datasets_eukaryota.types.yaml
   - Inherits standard field definitions

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Remove hash generation from cache file paths and use just the
accession for better human readability. Also remove unused hashlib
import.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
Replace direct argparse usage with the shared argument parser system
used by all other parsers. This ensures consistent argument handling
across the workflow and follows project conventions.

Changes:
- Use INPUT_PATH and YAML_PATH from shared_args
- Define custom CHECKPOINT argument following shared pattern
- Update argument names: --input → --input_path, --config → --yaml_path
- Update documentation to reflect new argument names

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
@fchen13 fchen13 closed this Dec 22, 2025
@fchen13 fchen13 deleted the fang-assemblies branch December 22, 2025 13:47
@fchen13 fchen13 reopened this Dec 22, 2025
Contributor

@rjchallis rjchallis left a comment


Hi @fchen13 - This is good, but still a few changes needed.

I just ran it on 4006 assemblies, all of which appeared to process successfully, but the resulting output file contains only 6 assemblies. Re-running from the cached data gave the same output, so there seems to be an error with loading the cached data: the only lines in the file are from after the last checkpoint.

Once that is working, it looks like only minor changes are needed: update the example yaml, make sure the command line args match the other parsers, and catch any flake8 complaints.
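A likely shape of the fix, offered only as a hedged illustration since the resume logic itself is not quoted here: on resume, rows parsed in earlier runs need to be restored into the in-memory `parsed` dict (or the TSV written in append mode) before the final batch write, otherwise only assemblies processed after the last checkpoint reach the output.

```python
def restore_parsed_rows(checkpoint: dict, load_cached_row, parsed: dict) -> dict:
    """Re-populate parsed rows for accessions handled in previous runs."""
    for accession in checkpoint.get("processed", []):
        if accession in parsed:
            continue
        cached_row = load_cached_row(accession)  # e.g. load_from_cache on the metadata cache
        if cached_row:
            parsed[accession] = cached_row
    return parsed
```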

# This config defines the schema for assembly_historical.tsv
# which contains all superseded assembly versions

needs:
Contributor


This should be in the file section and should refer to ATTR_assembly.types.yaml directly.


if __name__ == '__main__':
from flows.lib.shared_args import parse_args as _parse_args, required, default
from flows.lib.shared_args import INPUT_PATH, YAML_PATH
Contributor


imports should be at the top level

from flows.lib.shared_args import INPUT_PATH, YAML_PATH

# Define checkpoint argument
CHECKPOINT = {
Contributor


Arguments must be defined in shared_args and used consistently across all parsers. When called by Prefect, this will run as part of fetch-parse-validate to get the yaml/tsv files from the s3/github repo, so it will need to use identical command line arguments to the other parsers.

If a suitable location can be derived from the yaml/input location, then add a function to set the checkpoint. If it needs to be in a separate folder, consider making this more generic, e.g. tmpdir, workingdir, etc., if that would be more reusable across parsers. If you still need to add an argument, add it to shared_args and ensure all parsers work with the new variable, even if they just ignore it.
