diff --git a/Phase_0_PR_SUMMARY.md b/Phase_0_PR_SUMMARY.md new file mode 100644 index 0000000..dfdd1dd --- /dev/null +++ b/Phase_0_PR_SUMMARY.md @@ -0,0 +1,141 @@ +# Pull Request: Historical Assembly Version Backfill (Phase 0) + +## Summary + +This PR implements a one-time backfill process to populate historical assembly versions for all existing assemblies in the genomehubs dataset. It enables version-aware milestone tracking by capturing superseded assembly versions that were previously not tracked. + +## Solution Overview + +Implements a **one-time backfill script** that: +1. Identifies assemblies with version > 1 (indicating historical versions exist) +2. Discovers all historical versions via NCBI FTP +3. Fetches metadata for each historical version using NCBI Datasets CLI +4. Outputs historical versions to a separate TSV file with `versionStatus = "superseded"` +5. Uses smart caching to avoid re-fetching data on reruns + +## Key Features + +### 1. FTP-Based Version Discovery +- Queries NCBI FTP to find all versions of each assembly +- More reliable than API for historical data +- Based on proven DToL prototype implementation + +### 2. Reuses existing Parser +- Calls `parse_ncbi_assemblies.process_assembly_report()` with `version_status="superseded"` +- Fetches sequence reports and computes metrics identically to current assemblies +- Uses same YAML config system +- Generates identical TSV schema (plus `versionStatus` field) + +### 3. Smart 2-Tier Caching +- **Version discovery cache** (7-day expiry): Stores which versions exist +- **Metadata cache** (30-day expiry): Stores assembly metadata from Datasets CLI +- Dramatically reduces runtime on reruns (from hours to minutes) + +### 4. Checkpoint System +- Saves progress every 100 assemblies +- Allows resuming after interruptions +- Critical for processing ~3,694 assemblies + +## Files Modified/Created + +### New Files +- `flows/parsers/backfill_historical_versions.py` - Main backfill script +- `configs/assembly_historical.yaml` - Output schema configuration +- `tests/test_backfill.py` - Automated test suite +- `tests/test_data/assembly_test_sample.jsonl` - Test dataset (3 assemblies) +- `tests/README_test_backfill.md` - Testing documentation + +### Modified Files +- `flows/parsers/parse_ncbi_assemblies.py` - Added `version_status` parameter +- `flows/lib/utils.py` - Added `parse_s3_file` stub function + +## Schema Changes + +### New Field in Output +- Same schema as current assemblies, plus `versionStatus` column +- `versionStatus` - Indicates if assembly is "current" or "superseded" + +### Expected Results +- **Input**: ~3,694 assemblies with version > 1 +- **Output**: ~8,500 historical version records +- **Runtime**: ~10-15 hours (first run), ~15-30 minutes (with cache) + +## Testing + +### Automated Tests (4/4 passing) +```bash +python tests/test_backfill.py +``` + +Tests verify: +1. ✅ Accession parsing +2. ✅ Assembly identification +3. ✅ FTP version discovery +4. 
✅ Cache functionality
+
+### Manual Test (3 assemblies)
+```bash
+python -m flows.parsers.backfill_historical_versions \
+    --input_path tests/test_data/assembly_test_sample.jsonl \
+    --yaml_path configs/assembly_historical.yaml \
+    --checkpoint tests/test_data/test_checkpoint.json
+```
+
+## Usage
+
+### One-Time Backfill
+```bash
+python -m flows.parsers.backfill_historical_versions \
+    --input_path flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \
+    --yaml_path configs/assembly_historical.yaml \
+    --checkpoint backfill_checkpoint.json
+```
+On first run:
+- Takes ~10-15 hours (fetches all historical versions)
+- Creates the tmp/backfill_cache/ directory
+- Checkpoints every 100 assemblies
+- Safe to interrupt (Ctrl+C) and resume
+
+On subsequent runs (after an input update):
+- Takes ~15-30 minutes (uses the cache for existing assemblies)
+- Only fetches NEW assemblies
+- Cache expiry: version discovery (7 days), metadata (30 days)
+
+Output:
+- outputs/assembly_historical.tsv (all superseded versions)
+- Each row has `versionStatus = "superseded"`
+- Includes all sequence reports and metrics
+
+## Important Notes
+
+### 1. Stub Function Warning
+The `parse_s3_file()` function in `flows/lib/utils.py` is currently a stub:
+
+```python
+def parse_s3_file(s3_path: str) -> dict:
+    return {}  # Placeholder
+```
+
+**Action needed**: If Rich has a real implementation, replace this stub. The function is imported by `parse_ncbi_assemblies.py` but not used by the backfill script.
+
+### 2. One-Time Process
+This backfill is designed to run **once** to populate historical data. Future updates should be handled by the incremental daily pipeline.
+
+### 3. Cache Directory
+The cache directory `tmp/backfill_cache/` can be safely deleted after successful completion (see the layout sketch at the end of this summary). It contains:
+- Version discovery data (FTP queries)
+- Assembly metadata (Datasets CLI responses)
+
+## Next Steps (After PR Merge)
+
+1. **Run full backfill** on production dataset (~10-15 hours)
+2. **Upload output** to appropriate S3 location
+3. **Implement Phase 1**: Incremental daily updates for new historical versions
+4. **Implement Phase 2**: Version-aware milestone tracking
+
+## Questions for Reviewer
+
+1. Is the `parse_s3_file()` stub acceptable, or do you have a real implementation to use?
+2. Should historical versions go to a different S3 path than current assemblies?
+3. Any preferences for checkpoint file location/naming?
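+
+For reference while reviewing, the cache files created by `get_cache_path()` follow the layout below (the accessions shown are illustrative):
+
+```text
+tmp/backfill_cache/
+├── version_discovery/
+│   └── GCA_000002035.json       # versions found via FTP; refreshed after 7 days
+└── metadata/
+    ├── GCA_000002035.1.json     # per-version datasets CLI output; refreshed after 30 days
+    └── GCA_000002035.2.json
+```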
+ diff --git a/configs/assembly_historical.yaml b/configs/assembly_historical.yaml new file mode 100644 index 0000000..85557b1 --- /dev/null +++ b/configs/assembly_historical.yaml @@ -0,0 +1,202 @@ +# Configuration for historical assembly version backfill +# This config defines the schema for assembly_historical.tsv +# which contains all superseded assembly versions + +needs: + - ncbi_datasets_eukaryota.types.yaml + +attributes: + assembly_level: + header: assemblyLevel + path: assemblyInfo.assemblyLevel + assembly_span: + header: totalSequenceLength + path: assemblyStats.totalSequenceLength + assigned_percent: + header: assignedProportion + path: processedAssemblyStats.assignedProportion + assembly_status: + header: primaryValue + path: processedAssemblyInfo.primaryValue + translate: + "1": primary + assembly_type: + header: assemblyType + path: assemblyInfo.assemblyType + bioproject: + header: bioProjectAccession + path: assemblyInfo.bioprojectLineage.bioprojects.accession + separator: + - "," + biosample: + header: biosampleAccession + path: assemblyInfo.biosample.accession + separator: + - "," + chromosome_count: + header: totalNumberOfChromosomes + path: assemblyStats.totalNumberOfChromosomes + contig_count: + header: numberOfContigs + path: assemblyStats.numberOfContigs + contig_l50: + header: contigL50 + path: assemblyStats.contigL50 + contig_n50: + header: contigN50 + path: assemblyStats.contigN50 + ebp_standard_criteria: + header: ebpStandardCriteria + path: processedAssemblyStats.ebpStandardCriteria + separator: + - "," + ebp_standard_date: + header: ebpStandardDate + path: processedAssemblyStats.ebpStandardDate + gc_percent: + header: gcPercent + path: assemblyStats.gcPercent + gene_count: + header: geneCountTotal + path: annotationInfo.stats.geneCounts.total + gene_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + isolate: + header: isolate + path: assemblyInfo.biosample.attributes.name==isolate.value + last_updated: + header: releaseDate + path: assemblyInfo.releaseDate + mitochondrion_accession: + header: mitochondrionAccession + path: processedOrganelleInfo.mitochondrion.accession + separator: + - ; + mitochondrion_assembly_span: + header: mitochondrionAssemblySpan + path: processedOrganelleInfo.mitochondrion.assemblySpan + mitochondrion_gc_percent: + header: mitochondrionGcPercent + path: processedOrganelleInfo.mitochondrion.gcPercent + mitochondrion_scaffolds: + header: mitochondrionScaffolds + path: processedOrganelleInfo.mitochondrion.scaffolds + separator: + - ; + noncoding_gene_count: + header: geneCountNoncoding + path: annotationInfo.stats.geneCounts.nonCoding + noncoding_gene_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + plastid_accession: + header: plastidAccession + path: processedOrganelleInfo.plastid.accession + separator: + - ; + plastid_assembly_span: + header: plastidAssemblySpan + path: processedOrganelleInfo.plastid.assemblySpan + plastid_gc_percent: + header: plastidGcPercent + path: processedOrganelleInfo.plastid.gcPercent + plastid_scaffolds: + header: plastidScaffolds + path: processedOrganelleInfo.plastid.scaffolds + separator: + - ; + protein_count: + header: geneCountProteincoding + path: annotationInfo.stats.geneCounts.proteinCoding + protein_count.source.date: + header: annotationReleaseDate + path: annotationInfo.releaseDate + pseudogene_count: + header: geneCountPseudogene + path: annotationInfo.stats.geneCounts.pseudogene + pseudogene.source.date: + header: 
annotationReleaseDate + path: annotationInfo.releaseDate + refseq_category: + header: refseqCategory + path: assemblyInfo.refseqCategory + sample_sex: + header: sex + path: assemblyInfo.biosample.attributes.name==sex.value + scaffold_count: + header: numberOfScaffolds + path: assemblyStats.numberOfScaffolds + scaffold_l50: + header: scaffoldL50 + path: assemblyStats.scaffoldL50 + scaffold_n50: + header: scaffoldN50 + path: assemblyStats.scaffoldN50 + sequence_count: + header: numberOfComponentSequences + path: assemblyStats.numberOfComponentSequences + submitter: + header: submitter + path: assemblyInfo.submitter + ungapped_span: + header: totalUngappedLength + path: assemblyStats.totalUngappedLength + # NEW: Version-specific field to indicate this is a historical/superseded version + version_status: + header: versionStatus + path: processedAssemblyInfo.versionStatus + +file: + display_group: general + exclusions: + attributes: + - bioproject + - biosample + identifiers: + - assembly_id + taxonomy: + - taxon_id + format: tsv + header: true + name: assembly_historical.tsv + source: INSDC + source_url_stub: https://www.ncbi.nlm.nih.gov/assembly/ + +identifiers: + assembly_id: + header: assemblyID + path: processedAssemblyInfo.assemblyID + assembly_name: + header: assemblyName + path: assemblyInfo.assemblyName + genbank_accession: + header: genbankAccession + path: processedAssemblyInfo.genbankAccession + refseq_accession: + header: refseqAccession + path: processedAssemblyInfo.refseqAccession + wgs_accession: + header: wgsProjectAccession + path: wgsInfo.wgsProjectAccession + +metadata: + is_primary_value: + header: primaryValue + path: processedAssemblyInfo.primaryValue + source_slug: + header: genbankAccession + path: processedAssemblyInfo.genbankAccession + +names: + common_name: + header: commonName + path: organism.commonName + +taxonomy: + taxon: + header: organismName + path: organism.organismName + taxon_id: + header: taxId + path: organism.taxId diff --git a/flows/parsers/backfill_historical_versions.py b/flows/parsers/backfill_historical_versions.py new file mode 100644 index 0000000..9ad4d1e --- /dev/null +++ b/flows/parsers/backfill_historical_versions.py @@ -0,0 +1,474 @@ +#!/usr/bin/env python3 +""" +One-time historical backfill process for assembly versions. + +This script discovers and parses ALL superseded versions from NCBI for assemblies +that currently have version > 1. It should be run ONCE before starting the daily +incremental pipeline. + +Process: +1. Scan input JSONL for assemblies with version > 1 +2. Discover all versions via NCBI FTP directory listing (with caching) +3. Fetch metadata for each version via datasets command (with caching) +4. Parse using Rich's existing parser (includes sequence reports + metrics) +5. 
Write to assembly_historical.tsv with version_status="superseded" + +Caching: +- Version discovery cached 7 days (FTP queries) +- Individual metadata cached 30 days (datasets queries) +- On re-run: only fetches NEW assemblies + +Usage: + python -m flows.parsers.backfill_historical_versions \\ + --input_path flows/parsers/eukaryota/ncbi_dataset/data/assembly_data_report.jsonl \\ + --yaml_path configs/assembly_historical.yaml \\ + --checkpoint tmp/backfill_checkpoint.json +""" + +import json +import os +import re +import subprocess +import time +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Tuple + +import requests +from genomehubs import utils as gh_utils + +from flows.lib import utils +from flows.lib.conditional_import import task + + +# ============================================================================= +# Cache Management (from DToL prototype - proven to work) +# ============================================================================= + +def setup_cache_directories(): + """Create cache directory structure.""" + cache_dirs = [ + "tmp/backfill_cache/version_discovery", + "tmp/backfill_cache/metadata" + ] + for cache_dir in cache_dirs: + os.makedirs(cache_dir, exist_ok=True) + + +def get_cache_path(cache_type: str, identifier: str) -> str: + """Generate cache file path for given type and identifier.""" + return f"tmp/backfill_cache/{cache_type}/{identifier}.json" + + +def load_from_cache(cache_path: str, max_age_days: int = 30) -> Dict: + """Load data from cache if it exists and is recent enough.""" + try: + if os.path.exists(cache_path): + cache_age = time.time() - os.path.getmtime(cache_path) + if cache_age < (max_age_days * 24 * 3600): + with open(cache_path, 'r', encoding='utf-8') as f: + return json.load(f) + except Exception as e: + print(f" Warning: Could not load cache from {cache_path}: {e}") + return {} + + +def save_to_cache(cache_path: str, data: Dict): + """Save data to cache file.""" + try: + os.makedirs(os.path.dirname(cache_path), exist_ok=True) + with open(cache_path, 'w', encoding='utf-8') as f: + json.dump(data, f, indent=2, ensure_ascii=False) + except Exception as e: + print(f" Warning: Could not save cache to {cache_path}: {e}") + + +# ============================================================================= +# Version Discovery via FTP (KEY DIFFERENCE from Rich's parser) +# ============================================================================= + +def find_all_assembly_versions(base_accession: str) -> List[Dict]: + """ + Find all versions of an assembly by examining NCBI FTP structure. 
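+
+    For example, for base accession GCA_000002035 the listing at
+    https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/002/035/ is expected to
+    contain one entry per released version (GCA_000002035.1, GCA_000002035.2,
+    ...), each of which is then fetched with the NCBI datasets CLI.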
+ + This is the KEY difference from Rich's parser: + - Rich's parser: Gets latest versions from input JSONL + - This function: Discovers ALL versions (including historical) via FTP + + Args: + base_accession: Full accession (e.g., GCA_000002035.3) + + Returns: + List of dicts with full NCBI metadata for each version + """ + # Extract base (e.g., GCA_000002035 from GCA_000002035.3) + base_match = re.match(r'(GC[AF]_\d+)', base_accession) + if not base_match: + return [] + + base = base_match.group(1) + + # Check version discovery cache first + setup_cache_directories() + version_cache_path = get_cache_path("version_discovery", base) + cached_data = load_from_cache(version_cache_path, max_age_days=7) + + if cached_data and 'versions' in cached_data: + print(f" Using cached version data for {base}") + return cached_data['versions'] + + print(f" Discovering versions for {base} via FTP") + versions = [] + + try: + # Construct FTP URL + # Example: https://ftp.ncbi.nlm.nih.gov/genomes/all/GCA/000/002/035/ + ftp_url = f"https://ftp.ncbi.nlm.nih.gov/genomes/all/{base[:3]}/{base[4:7]}/{base[7:10]}/{base[10:13]}/" + + # Get directory listing + response = requests.get(ftp_url, timeout=30) + if response.status_code != 200: + print(f" Warning: FTP query failed for {base}") + return [] + + # Parse HTML for version directories (e.g., GCA_000002035.1, GCA_000002035.2, etc.) + version_pattern = rf"{base}\.\d+" + found_versions = re.findall(version_pattern, response.text) + unique_versions = sorted(list(set(found_versions))) + + # Fetch metadata for each version + for version_acc in unique_versions: + metadata_cache_path = get_cache_path("metadata", version_acc) + cached_metadata = load_from_cache(metadata_cache_path, max_age_days=30) + + if cached_metadata and 'metadata' in cached_metadata: + versions.append(cached_metadata['metadata']) + continue + + # Fetch from NCBI datasets + try: + # Validate accession format to prevent command injection + # Pattern: GC[AF]_9digits.version (e.g., GCA_000001405.39) + version_pattern_strict = r"^GC[AF]_\d{9}\.\d+$" + if not re.match(version_pattern_strict, version_acc): + print(f" Skipping unexpected accession format: {version_acc}") + continue + + cmd = ["datasets", "summary", "genome", "accession", version_acc, "--as-json-lines"] + result = subprocess.run( + cmd, + capture_output=True, + text=True, + encoding='utf-8', + errors='ignore', # Handle Unicode gracefully + timeout=60 + ) + + if result.returncode == 0 and result.stdout and result.stdout.strip(): + version_data = json.loads(result.stdout.strip()) + versions.append(version_data) + save_to_cache(metadata_cache_path, {'metadata': version_data, 'cached_at': time.time()}) + else: + print(f" Warning: No metadata for {version_acc}") + + except Exception as e: + print(f" Warning: Error fetching {version_acc}: {e}") + continue + + # Cache the complete version discovery + cache_data = { + 'versions': versions, + 'base_accession': base, + 'discovered_at': time.time(), + 'ftp_url': ftp_url + } + save_to_cache(version_cache_path, cache_data) + + return versions + + except Exception as e: + print(f" Error discovering versions for {base}: {e}") + return [] + + +# ============================================================================= +# Parsing with Rich's Existing Functions +# ============================================================================= + +def parse_historical_version( + version_data: Dict, + config: utils.Config, + base_accession: str, + version_num: int, + current_accession: str +) -> Dict: + """ + 
Parse historical version using Rich's EXACT parser logic. + + This ensures consistency with current assemblies by: + - Using process_assembly_report() with version_status="superseded" + - Fetching sequence reports via fetch_and_parse_sequence_report() + - Computing all metrics identically to current parser + + Args: + version_data: Raw NCBI metadata from datasets command + config: Config object from YAML + base_accession: Base accession (e.g., GCA_000002035) + version_num: Version number (1, 2, 3, etc.) + current_accession: Latest version that superseded this one + + Returns: + Parsed assembly dict ready for TSV output + """ + from flows.parsers.parse_ncbi_assemblies import ( + fetch_and_parse_sequence_report, + process_assembly_report + ) + + # Convert keys to camelCase (Rich's standard) + version_data = utils.convert_keys_to_camel_case(version_data) + + # Process with Rich's parser (version_status="superseded") + processed_report = process_assembly_report( + report=version_data, + previous_report=None, + config=config, + parsed={}, + version_status="superseded" + ) + + # Fetch sequence reports (chromosomes, organelles, etc.) - Rich's critical step + fetch_and_parse_sequence_report(processed_report) + + # Set assemblyID in standard format (e.g., GCA_000222935_1) + # The versionStatus field already indicates this is "superseded" + processed_report["processedAssemblyInfo"]["assemblyID"] = f"{base_accession}_{version_num}" + + # Parse into TSV row format using Rich's parse functions + row = gh_utils.parse_report_values(config.parse_fns, processed_report) + + return row + + +def parse_version(accession: str) -> int: + """Extract version number from accession.""" + parts = accession.split('.') + return int(parts[1]) if len(parts) > 1 else 1 + + +def parse_accession(accession: str) -> Tuple[str, int]: + """Parse accession into base and version number.""" + parts = accession.split('.') + base = parts[0] + version = int(parts[1]) if len(parts) > 1 else 1 + return base, version + + +# ============================================================================= +# Checkpoint Management +# ============================================================================= + +def load_checkpoint(checkpoint_file: str) -> Dict: + """Load checkpoint data if exists.""" + if Path(checkpoint_file).exists(): + with open(checkpoint_file) as f: + return json.load(f) + return {} + + +def save_checkpoint(checkpoint_file: str, processed_count: int): + """Save checkpoint data.""" + Path(checkpoint_file).parent.mkdir(parents=True, exist_ok=True) + with open(checkpoint_file, 'w') as f: + json.dump({ + 'processed_count': processed_count, + 'timestamp': datetime.now().isoformat() + }, f, indent=2) + + +# ============================================================================= +# Main Backfill Logic +# ============================================================================= + +def identify_assemblies_needing_backfill(input_jsonl: str) -> List[Dict]: + """ + Identify assemblies with version > 1 that need historical backfill. 
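+
+    For example, an input record with accession "GCA_000002035.3" is returned
+    as:
+        {"base_accession": "GCA_000002035", "current_version": 3,
+         "current_accession": "GCA_000002035.3",
+         "historical_versions_needed": [1, 2]}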
+ + Args: + input_jsonl: Path to assembly_data_report.jsonl + + Returns: + List of assembly info dicts needing backfill + """ + assemblies_needing_backfill = [] + + with open(input_jsonl) as f: + for line in f: + assembly = json.loads(line) + accession = assembly['accession'] + base_acc, version = parse_accession(accession) + + if version > 1: + assemblies_needing_backfill.append({ + 'base_accession': base_acc, + 'current_version': version, + 'current_accession': accession, + 'historical_versions_needed': list(range(1, version)) + }) + + return assemblies_needing_backfill + + +@task(log_prints=True) +def backfill_historical_versions( + input_jsonl: str, + config_yaml: str, + checkpoint_file: str = 'tmp/backfill_checkpoint.json' +): + """ + One-time backfill of all historical assembly versions. + + Process: + 1. Identify assemblies with version > 1 + 2. Discover all versions via FTP (cached) + 3. Fetch metadata via datasets (cached) + 4. Parse with Rich's parser + 5. Write to assembly_historical.tsv + + Args: + input_jsonl: Path to assembly_data_report.jsonl + config_yaml: Path to assembly_historical.yaml + checkpoint_file: Path for checkpoint data + """ + # Setup + setup_cache_directories() + config = utils.load_config(config_file=config_yaml) + + # Identify assemblies needing backfill + print("Scanning for assemblies needing historical backfill...") + assemblies_needing_backfill = identify_assemblies_needing_backfill(input_jsonl) + + if not assemblies_needing_backfill: + print("No assemblies with version > 1 found. Nothing to backfill.") + return + + # Load checkpoint + checkpoint = load_checkpoint(checkpoint_file) + start_index = checkpoint.get('processed_count', 0) + + total_assemblies = len(assemblies_needing_backfill) + total_versions = sum(len(a['historical_versions_needed']) for a in assemblies_needing_backfill) + + print(f"\n{'='*80}") + print("ONE-TIME HISTORICAL BACKFILL") + print(f"{'='*80}") + print(f" Assemblies to process: {total_assemblies}") + print(f" Total historical versions: {total_versions}") + + if start_index > 0: + print(f" Resuming from checkpoint: {start_index}/{total_assemblies}") + + print(f"{'='*80}\n") + + parsed = {} + processed = start_index + + for assembly_info in assemblies_needing_backfill[start_index:]: + base_acc = assembly_info['base_accession'] + current_version = assembly_info['current_version'] + current_accession = assembly_info['current_accession'] + + print(f"[{processed+1}/{total_assemblies}] {base_acc} (current: v{current_version})") + + # Discover all versions via FTP (uses cache) + all_versions = find_all_assembly_versions(current_accession) + + if not all_versions: + print(f" Warning: No versions found via FTP") + processed += 1 + continue + + # Parse each historical version (skip current) + for version_data in all_versions: + version_acc = version_data.get('accession', '') + version_num = parse_version(version_acc) + + # Only process historical versions + if version_num >= current_version: + continue + + try: + print(f" Parsing v{version_num}...", end=' ', flush=True) + + # Parse using Rich's parser + row = parse_historical_version( + version_data=version_data, + config=config, + base_accession=base_acc, + version_num=version_num, + current_accession=current_accession + ) + + # Add to parsed dict (keyed by genbank accession) + genbank_acc = row.get('genbankAccession', version_acc) + parsed[genbank_acc] = row + + print("✓") + + except Exception as e: + print(f"✗ ({e})") + continue + + processed += 1 + + # Checkpoint every 100 assemblies + 
if processed % 100 == 0: + print(f"\n→ Checkpoint: Writing batch to disk...") + gh_utils.write_tsv(parsed, config.headers, config.meta) + save_checkpoint(checkpoint_file, processed) + parsed = {} + print(f"→ Progress: {processed}/{total_assemblies} ({processed/total_assemblies*100:.1f}%)\n") + + # Final write + if parsed: + print(f"\n→ Writing final batch...") + gh_utils.write_tsv(parsed, config.headers, config.meta) + + # Final report + print(f"\n{'='*80}") + print("BACKFILL COMPLETE") + print(f"{'='*80}") + print(f" Processed: {processed}/{total_assemblies} assemblies") + print(f" Output: {config.meta['file_name']}") + print(f"\n Next step: Run daily incremental pipeline") + print(f"{'='*80}\n") + + +# ============================================================================= +# Main Entry Point +# ============================================================================= + +if __name__ == '__main__': + from flows.lib.shared_args import parse_args as _parse_args, required, default + from flows.lib.shared_args import INPUT_PATH, YAML_PATH + + # Define checkpoint argument + CHECKPOINT = { + "flags": ["--checkpoint"], + "keys": {"help": "Checkpoint file for resuming", "type": str} + } + + args = _parse_args( + [ + required(INPUT_PATH), + required(YAML_PATH), + default(CHECKPOINT, 'tmp/backfill_checkpoint.json') + ], + description='One-time historical backfill for assembly versions' + ) + + backfill_historical_versions( + input_jsonl=args.input_path, + config_yaml=args.yaml_path, + checkpoint_file=args.checkpoint + ) diff --git a/flows/parsers/parse_ncbi_assemblies.py b/flows/parsers/parse_ncbi_assemblies.py index c7ab5e1..3109f86 100644 --- a/flows/parsers/parse_ncbi_assemblies.py +++ b/flows/parsers/parse_ncbi_assemblies.py @@ -93,7 +93,11 @@ def is_atypical_assembly(report: dict, parsed: dict) -> bool: def process_assembly_report( - report: dict, previous_report: Optional[dict], config: Config, parsed: dict + report: dict, + previous_report: Optional[dict], + config: Config, + parsed: dict, + version_status: str = "current" ) -> dict: """Process assembly level information. @@ -110,6 +114,9 @@ def process_assembly_report( previous one. config (Config): A Config object containing the configuration data. parsed (dict): A dictionary containing parsed data. + version_status (str): Version status - "current" (default) or "superseded" + for historical versions. Defaults to "current" to maintain backward + compatibility with existing code. Returns: dict: The updated report dictionary. @@ -117,7 +124,10 @@ def process_assembly_report( # Uncomment to filter atypical assemblies # if is_atypical_assembly(report, parsed): # return None - processed_report = {**report, "processedAssemblyInfo": {"organelle": "nucleus"}} + processed_report = {**report, "processedAssemblyInfo": { + "organelle": "nucleus", + "versionStatus": version_status + }} if "pairedAccession" in report: if processed_report["pairedAccession"].startswith("GCF_"): processed_report["processedAssemblyInfo"]["refseqAccession"] = report[ diff --git a/tests/README_test_backfill.md b/tests/README_test_backfill.md new file mode 100644 index 0000000..471decf --- /dev/null +++ b/tests/README_test_backfill.md @@ -0,0 +1,57 @@ +# Testing Guide for Historical Assembly Backfill + +This guide explains how to test the backfill implementation before creating a PR. + +## Prerequisites + +### 1. 
Install Dependencies
+
+```bash
+# Install genomehubs and other dependencies
+pip install -r requirements.txt
+
+# Or if using conda:
+conda install -c conda-forge "genomehubs>=2.10.14"
+pip install prefect requests
+```
+
+### 2. Install the NCBI datasets CLI
+
+The backfill script requires the NCBI `datasets` command-line tool.
+
+```bash
+# Download and install from:
+# https://www.ncbi.nlm.nih.gov/datasets/docs/v2/download-and-install/
+
+# Verify installation:
+datasets --version
+```
+
+## Test Suite
+
+### Quick Unit Tests
+
+Run the automated test suite from the repository root:
+
+```bash
+cd /path/to/genomehubs-data
+python tests/test_backfill.py
+```
+
+### Full Backfill Test (Small Dataset)
+
+Test the complete backfill process on 3 assemblies:
+
+```bash
+export SKIP_PREFECT=true
+python -m flows.parsers.backfill_historical_versions \
+    --input_path tests/test_data/assembly_test_sample.jsonl \
+    --yaml_path configs/assembly_historical.yaml \
+    --checkpoint tests/test_data/test_checkpoint.json
+```
+
+Or on Windows (PowerShell):
+
+```powershell
+$env:SKIP_PREFECT="true"
+python -m flows.parsers.backfill_historical_versions --input_path tests/test_data/assembly_test_sample.jsonl --yaml_path configs/assembly_historical.yaml --checkpoint tests/test_data/test_checkpoint.json
+```
+
diff --git a/tests/test_backfill.py b/tests/test_backfill.py
new file mode 100644
index 0000000..d9d6c6c
--- /dev/null
+++ b/tests/test_backfill.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python3
+"""
+Test script for backfill_historical_versions.py
+
+This script tests the historical version backfill process on a small sample dataset.
+
+Usage:
+    python tests/test_backfill.py
+"""
+
+import os
+import sys
+import json
+from pathlib import Path
+
+# Add parent directory to path
+sys.path.insert(0, str(Path(__file__).parent.parent))
+
+from flows.parsers.backfill_historical_versions import (
+    identify_assemblies_needing_backfill,
+    parse_accession,
+    find_all_assembly_versions
+)
+
+def test_parse_accession():
+    """Test accession parsing."""
+    print("\n" + "="*80)
+    print("TEST 1: Accession Parsing")
+    print("="*80)
+
+    test_cases = [
+        ("GCA_000222935.2", ("GCA_000222935", 2)),
+        ("GCA_003706615.3", ("GCA_003706615", 3)),
+        ("GCF_000001405.39", ("GCF_000001405", 39)),
+    ]
+
+    for accession, expected in test_cases:
+        result = parse_accession(accession)
+        status = "PASS" if result == expected else "FAIL"
+        print(f" {status} {accession} -> {result}")
+        assert result == expected, f"Expected {expected}, got {result}"
+
+    print(" All accession parsing tests passed!")
+
+def test_identify_assemblies():
+    """Test identification of assemblies needing backfill."""
+    print("\n" + "="*80)
+    print("TEST 2: Identify Assemblies Needing Backfill")
+    print("="*80)
+
+    test_file = "tests/test_data/assembly_test_sample.jsonl"
+
+    if not os.path.exists(test_file):
+        print(f" FAIL Test file not found: {test_file}")
+        return False
+
+    assemblies = identify_assemblies_needing_backfill(test_file)
+
+    print(f" Found {len(assemblies)} assemblies needing backfill:")
+    for asm in assemblies:
+        print(f" - {asm['current_accession']}: v{asm['current_version']} "
+              f"(needs v{asm['historical_versions_needed']})")
+
+    # Verify we found the expected assemblies
+    expected_count = 3  # GCA_000222935.2, GCA_000412225.2, GCA_003706615.3
+    assert len(assemblies) == expected_count, \
+        f"Expected {expected_count} assemblies, found {len(assemblies)}"
+
+    print(f" PASS Correctly identified {len(assemblies)} assemblies")
+    return True
+
+def 
test_version_discovery(): + """Test FTP-based version discovery (using cache if available).""" + print("\n" + "="*80) + print("TEST 3: Version Discovery via FTP") + print("="*80) + print(" Note: This test queries NCBI FTP - may take a minute...") + + # Test with a known multi-version assembly + test_accession = "GCA_000222935.2" # Aciculosporium take - has version 1 and 2 + + print(f" Testing: {test_accession}") + versions = find_all_assembly_versions(test_accession) + + if not versions: + print(f" FAIL No versions found (FTP query may have failed)") + print(f" This is not critical - may be network issue") + return False + + print(f" PASS Found {len(versions)} version(s):") + for v in versions: + acc = v.get('accession', 'unknown') + print(f" - {acc}") + + # Verify we found at least version 1 and 2 + accessions = [v.get('accession', '') for v in versions] + base = test_accession.split('.')[0] + + # Should find both v1 and v2 + expected_versions = [f"{base}.1", f"{base}.2"] + found_expected = [v for v in expected_versions if v in accessions] + + print(f" PASS Found {len(found_expected)}/{len(expected_versions)} expected versions") + return True + +def test_cache_functionality(): + """Test that caching works correctly.""" + print("\n" + "="*80) + print("TEST 4: Cache Functionality") + print("="*80) + + # Clean cache first (Windows-safe deletion) + import shutil + import time as time_module + cache_dir = "tmp/backfill_cache" + if os.path.exists(cache_dir): + try: + # Give time for file handles to close + time_module.sleep(0.5) + shutil.rmtree(cache_dir) + print(f" Cleared cache directory: {cache_dir}") + except PermissionError: + print(f" Note: Cache directory in use, will test with existing cache") + + test_accession = "GCA_000222935.2" + + # First call - should fetch from FTP + print(f" First call (should fetch from FTP)...") + import time + start = time.time() + versions1 = find_all_assembly_versions(test_accession) + time1 = time.time() - start + + if not versions1: + print(f" FAIL FTP fetch failed - skipping cache test") + return False + + print(f" Took {time1:.2f}s, found {len(versions1)} versions") + + # Second call - should use cache + print(f" Second call (should use cache)...") + start = time.time() + versions2 = find_all_assembly_versions(test_accession) + time2 = time.time() - start + + print(f" Took {time2:.2f}s, found {len(versions2)} versions") + + # Cache should be much faster + if time2 < time1 * 0.5: # At least 50% faster + print(f" PASS Cache is working (2nd call {time2/time1*100:.1f}% of 1st call time)") + else: + print(f" WARN Cache may not be working (times similar)") + + # Verify cache files exist + if os.path.exists(cache_dir): + cache_files = list(Path(cache_dir).rglob("*.json")) + print(f" PASS Cache directory created with {len(cache_files)} files") + else: + print(f" FAIL Cache directory not created") + + return True + +def main(): + """Run all tests.""" + print("\n" + "="*80) + print("BACKFILL SCRIPT TEST SUITE") + print("="*80) + print("Testing: flows/parsers/backfill_historical_versions.py") + print("="*80) + + results = { + "Accession Parsing": False, + "Identify Assemblies": False, + "Version Discovery": False, + "Cache Functionality": False, + } + + try: + # Run tests + test_parse_accession() + results["Accession Parsing"] = True + + results["Identify Assemblies"] = test_identify_assemblies() + results["Version Discovery"] = test_version_discovery() + results["Cache Functionality"] = test_cache_functionality() + + except Exception as e: + print(f"\nFAIL Test 
failed with error: {e}") + import traceback + traceback.print_exc() + + # Summary + print("\n" + "="*80) + print("TEST SUMMARY") + print("="*80) + + for test_name, passed in results.items(): + status = "PASS PASS" if passed else "FAIL FAIL" + print(f" {status}: {test_name}") + + passed_count = sum(results.values()) + total_count = len(results) + + print(f"\n Total: {passed_count}/{total_count} tests passed") + + if passed_count == total_count: + print("\n 🎉 All tests passed!") + return 0 + else: + print("\n WARN Some tests failed - review output above") + return 1 + +if __name__ == '__main__': + sys.exit(main()) diff --git a/tests/test_data/assembly_test_sample.jsonl b/tests/test_data/assembly_test_sample.jsonl new file mode 100644 index 0000000..fc540d2 --- /dev/null +++ b/tests/test_data/assembly_test_sample.jsonl @@ -0,0 +1,3 @@ +{"assemblyInfo":{"assemblyLevel":"Contig","assemblyName":"AciTa_1.0","assemblyType":"haploid","submitter":"University of Kentucky, Dept of Plant Pathology","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA67241","title":"Aciculosporium take MAFF-241224 genome sequencing project"}]}],"sequencingTech":"454 GS FLX Titanium","blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_000222935.2","biosample":{"accession":"SAMN02981340","lastUpdated":"2014-08-11T11:24:02.670","publicationDate":"2014-08-11T11:24:02.670","submissionDate":"2014-08-11T11:24:02.670","sampleIds":[{"db":"GenBank","value":"gb|AFQZ00000000.1"}],"description":{"title":"Sample from Aciculosporium take MAFF-241224","organism":{"taxId":1036760,"organismName":"Aciculosporium take MAFF-241224"}},"owner":{"name":"University of Kentucky, Advanced Genetic Technologies Center"},"models":["Generic"],"bioprojects":[{"accession":"PRJNA67241"}],"package":"Generic.1.0","attributes":[{"name":"strain","value":"MAFF-241224"}],"status":{"status":"live","when":"2014-08-11T11:24:02.670"},"strain":"MAFF-241224"},"assemblyStatus":"current","bioprojectAccession":"PRJNA67241","assemblyMethod":"Newbler v. 2.5.3","releaseDate":"2011-08-03"},"assemblyStats":{"totalSequenceLength":"58836405","totalUngappedLength":"58836405","numberOfContigs":3298,"contigN50":40517,"contigL50":411,"numberOfScaffolds":3298,"scaffoldN50":40517,"scaffoldL50":411,"numberOfComponentSequences":3298,"gcCount":"23621104","gcPercent":40,"genomeCoverage":"18.5","atgcCount":"58836294"},"wgsInfo":{"wgsProjectAccession":"AFQZ01","masterWgsUrl":"https://www.ncbi.nlm.nih.gov/nuccore/AFQZ00000000.1","wgsContigsUrl":"https://www.ncbi.nlm.nih.gov/Traces/wgs/AFQZ01"},"currentAccession":"GCA_000222935.2","accession":"GCA_000222935.2","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":1036760,"organismName":"Aciculosporium take MAFF-241224","infraspecificNames":{"strain":"MAFF-241224"}}} +{"assemblyInfo":{"assemblyLevel":"Complete Genome","assemblyName":"ASM41222v2","assemblyType":"haploid","submitter":"Duke University, Molecular Genetics and Microbiology","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA39551","title":"[Ashbya] aceris (nom. inval.) 
Genome sequencing"}]}],"blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_000412225.2","biosample":{"accession":"SAMN03081470","lastUpdated":"2021-07-14T21:04:31.245","publicationDate":"2014-09-26T00:00:00.000","submissionDate":"2014-09-26T10:52:27.116","sampleIds":[{"db":"GenBank","value":"gb|CP006020.1"}],"description":{"title":"Sample from [Ashbya] aceris (nom. inval.)","organism":{"taxId":566037,"organismName":"[Ashbya] aceris (nom. inval.)"}},"owner":{"name":"Duke University, Molecular Genetics and Microbiology and Institute for Genome Sciences & Policy"},"models":["Generic"],"bioprojects":[{"accession":"PRJNA39551"}],"package":"Generic.1.0","attributes":[{"name":"host","value":"Boisea trivittata"}],"status":{"status":"live","when":"2014-10-01T12:30:14"},"host":"Boisea trivittata"},"assemblyStatus":"current","bioprojectAccession":"PRJNA39551","releaseDate":"2014-06-05"},"assemblyStats":{"totalNumberOfChromosomes":7,"totalSequenceLength":"8867527","totalUngappedLength":"8867527","numberOfContigs":7,"contigN50":1493473,"contigL50":3,"numberOfScaffolds":7,"scaffoldN50":1493473,"scaffoldL50":3,"numberOfComponentSequences":7,"gcCount":"4548443","gcPercent":51.5,"numberOfOrganelles":1,"atgcCount":"8867527"},"organelleInfo":[{"description":"Mitochondrion","totalSeqLength":"26996","submitter":"Duke University, Molecular Genetics and Microbiology"}],"annotationInfo":{"name":"Annotation submitted by Duke University, Molecular Genetics and Microbiology","provider":"Duke University, Molecular Genetics and Microbiology","releaseDate":"2014-10-01","stats":{"geneCounts":{"total":4690,"proteinCoding":4487,"nonCoding":203}}},"currentAccession":"GCA_000412225.2","accession":"GCA_000412225.2","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":566037,"organismName":"[Ashbya] aceris (nom. inval.)"}} +{"assemblyInfo":{"assemblyLevel":"Scaffold","assemblyName":"ASM370661v3","assemblyType":"haploid","submitter":"UW-Madison","refseqCategory":"reference genome","bioprojectLineage":[{"bioprojects":[{"accession":"PRJNA429441","title":"Sequencing of 196 yeast species from the subphylum Saccharomycotina Genome sequencing and assembly"}]}],"sequencingTech":"Illumina HiSeq","blastUrl":"https://blast.ncbi.nlm.nih.gov/Blast.cgi?PAGE_TYPE=BlastSearch&PROG_DEF=blastn&BLAST_SPEC=GDH_GCA_003706615.3","biosample":{"accession":"SAMN08343339","lastUpdated":"2019-06-20T18:14:41.872","publicationDate":"2018-11-02T00:00:00.000","submissionDate":"2018-01-10T12:39:16.190","sampleIds":[{"label":"Sample name","value":"Candida_montana"},{"db":"SRA","value":"SRS2837972"}],"description":{"title":"Microbe sample from [Candida] montana","organism":{"taxId":49329,"organismName":"[Candida] montana"}},"owner":{"name":"UW-Madison","contacts":[{}]},"models":["Microbe, viral or environmental"],"package":"Microbe.1.0","attributes":[{"name":"strain","value":"NRRL Y-17326"},{"name":"isolation_source","value":"missing"},{"name":"collection_date","value":"2016"},{"name":"geo_loc_name","value":"USA: Madison, Wisconsin"},{"name":"sample_type","value":"cell culture"},{"name":"type-material","value":"type material of Candida montana"},{"name":"culture_collection","value":"NRRL:Y-17326"}],"status":{"status":"live","when":"2018-11-02T12:37:45.337"},"collectionDate":"2016","geoLocName":"USA: Madison, Wisconsin","isolationSource":"missing","strain":"NRRL Y-17326"},"assemblyStatus":"current","bioprojectAccession":"PRJNA429441","assemblyMethod":"SPADES v. 
3.7","releaseDate":"2023-07-14"},"assemblyStats":{"totalSequenceLength":"12493498","totalUngappedLength":"12493434","numberOfContigs":61,"contigN50":704842,"contigL50":7,"numberOfScaffolds":60,"scaffoldN50":704842,"scaffoldL50":7,"numberOfComponentSequences":60,"gcCount":"4518305","gcPercent":36,"genomeCoverage":"63.8148","atgcCount":"12493434"},"wgsInfo":{"wgsProjectAccession":"PPLU03","masterWgsUrl":"https://www.ncbi.nlm.nih.gov/nuccore/PPLU00000000.3","wgsContigsUrl":"https://www.ncbi.nlm.nih.gov/Traces/wgs/PPLU03"},"currentAccession":"GCA_003706615.3","typeMaterial":{"typeLabel":"TYPE_MATERIAL","typeDisplayText":"assembly from type material"},"accession":"GCA_003706615.3","sourceDatabase":"SOURCE_DATABASE_GENBANK","organism":{"taxId":49329,"organismName":"[Candida] montana","infraspecificNames":{"strain":"NRRL Y-17326"}}}