From f2e800ddceb16c519d87c0d82b09ee98b2daeb5e Mon Sep 17 00:00:00 2001 From: benedictpaten Date: Sun, 8 Feb 2026 21:32:33 -0800 Subject: [PATCH 1/5] parallelize paffy chain by splitting PAF by query contig Use paffy split_file to split PAF files by query contig before chaining, enabling parallel execution of paffy chain on independent chunks. The chainSplitQueryLength config parameter (default 10MB) controls the minimum query contig length for splitting; contigs shorter than this are batched together. Set to 0 to disable splitting and preserve original behavior. Co-Authored-By: Claude Opus 4.6 --- src/cactus/cactus_progressive_config.xml | 1 + src/cactus/paf/local_alignment.py | 90 +++++++++++++++++++++--- submodules/paffy | 2 +- 3 files changed, 82 insertions(+), 11 deletions(-) diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 4b821a953..fbfbf714f 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -110,6 +110,7 @@ outputSecondaryAlignments="0" dechunkBatchSize="1000" pickIngroupPrimaryAlignmentsSeparatelyToOutgroups="1" + chainSplitQueryLength="10000000" slurmChunkScale="3" bigGenomeChunkScale="2" > diff --git a/src/cactus/paf/local_alignment.py b/src/cactus/paf/local_alignment.py index d49890225..0474927b9 100755 --- a/src/cactus/paf/local_alignment.py +++ b/src/cactus/paf/local_alignment.py @@ -609,27 +609,83 @@ def chain_alignments(job, alignment_files, alignment_names, reference_event_name def chain_one_alignment(job, alignment_file, alignment_name, params, include_inverted_alignments): - """ run paffy chain on one PAF. include_inverted_alignemnts is a flag to control if we additionally include - the inverted paf records for chaining. + """ run paffy chain on one PAF. include_inverted_alignments is a flag to control if we additionally include + the inverted paf records for chaining. 
If chainSplitQueryLength is set in the config, splits the PAF by query + contig and chains each chunk in parallel before concatenating the results. """ work_dir = job.fileStore.getLocalTempDir() alignment_path = os.path.join(work_dir, alignment_name + '.paf') - alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf') output_path = os.path.join(work_dir, alignment_name + '.chained.paf') # Copy the alignments from the job store job.fileStore.readGlobalFile(alignment_file, alignment_path) - # Get the forward and reverse versions of each alignment for symmetry with chaining if include_inverted_alignments - # is set + # Append inverted alignments BEFORE splitting (inversion swaps query/target) if include_inverted_alignments: + alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf') shutil.copyfile(alignment_path, alignment_inv_path) cactus_call(parameters=['paffy', 'invert', "-i", alignment_inv_path], outfile=alignment_path, outappend=True, job_memory=job.memory) - # Now chain the alignments - cactus_call(parameters=['paffy', 'chain', "-i", alignment_path, + chain_params = ['paffy', 'chain', + "--maxGapLength", params.find("blast").attrib["chainMaxGapLength"], + "--chainGapOpen", params.find("blast").attrib["chainGapOpen"], + "--chainGapExtend", params.find("blast").attrib["chainGapExtend"], + "--trimFraction", params.find("blast").attrib["chainTrimFraction"], + "--logLevel", getLogLevelString()] + + split_query_length = getOptionalAttrib(params.find("blast"), "chainSplitQueryLength", + typeFn=int, default=0) + + if split_query_length > 0: + # Split the PAF by query contig + split_dir = os.path.join(work_dir, 'split') + os.makedirs(split_dir) + split_prefix = os.path.join(split_dir, 'split_') + cactus_call(parameters=['paffy', 'split_file', '-i', alignment_path, '-q', + '-m', str(split_query_length), '-p', split_prefix], + job_memory=job.memory) + split_files = [os.path.join(split_dir, f) for f in sorted(os.listdir(split_dir)) + if 
f.endswith('.paf')] + else: + split_files = [] + + if len(split_files) <= 1: + # No splitting or only one chunk — chain directly (no fan-out overhead) + input_path = split_files[0] if split_files else alignment_path + cactus_call(parameters=chain_params + ["-i", input_path], + outfile=output_path, job_memory=job.memory) + job.fileStore.deleteGlobalFile(alignment_file) + return job.fileStore.writeGlobalFile(output_path) + + # Multiple chunks — fan out parallel chain jobs + root_job = Job() + job.addChild(root_job) + + chained_chunk_ids = [] + for split_file in split_files: + chunk_size = os.path.getsize(split_file) + chunk_file_id = job.fileStore.writeGlobalFile(split_file) + chained_chunk_ids.append( + root_job.addChildJobFn(chain_one_split_chunk, chunk_file_id, params, + disk=4 * chunk_size, + memory=cactus_clamp_memory(2 * chunk_size)).rv()) + + job.fileStore.deleteGlobalFile(alignment_file) + return root_job.addFollowOnJobFn(concatenate_chained_chunks, chained_chunk_ids, + disk=2 * alignment_file.size).rv() + + +def chain_one_split_chunk(job, chunk_file_id, params): + """Run paffy chain on a single split chunk of a PAF file.""" + work_dir = job.fileStore.getLocalTempDir() + chunk_path = os.path.join(work_dir, 'chunk.paf') + output_path = os.path.join(work_dir, 'chained_chunk.paf') + + job.fileStore.readGlobalFile(chunk_file_id, chunk_path) + + cactus_call(parameters=['paffy', 'chain', "-i", chunk_path, "--maxGapLength", params.find("blast").attrib["chainMaxGapLength"], "--chainGapOpen", params.find("blast").attrib["chainGapOpen"], "--chainGapExtend", params.find("blast").attrib["chainGapExtend"], @@ -637,11 +693,25 @@ def chain_one_alignment(job, alignment_file, alignment_name, params, include_inv "--logLevel", getLogLevelString()], outfile=output_path, job_memory=job.memory) - job.fileStore.deleteGlobalFile(alignment_file) + job.fileStore.deleteGlobalFile(chunk_file_id) + return job.fileStore.writeGlobalFile(output_path) + + +def concatenate_chained_chunks(job, 
chained_chunk_ids): + """Concatenate chained PAF chunks back into a single file.""" + work_dir = job.fileStore.getLocalTempDir() + output_path = os.path.join(work_dir, 'chained_combined.paf') + + with open(output_path, 'w') as out_file: + for chunk_id in chained_chunk_ids: + chunk_path = job.fileStore.readGlobalFile(chunk_id) + with open(chunk_path, 'r') as chunk_file: + shutil.copyfileobj(chunk_file, out_file) + job.fileStore.deleteGlobalFile(chunk_id) return job.fileStore.writeGlobalFile(output_path) - - + + def tile_alignments(job, alignment_files, reference_event_name, params, has_resources=False, total_sequence_size=0): # do everything post-chaining # Memory: paffy tile loads all PAFs + creates SequenceCountArray (2 bytes per base of query sequence) diff --git a/submodules/paffy b/submodules/paffy index 8b3c73246..f60e1b8fc 160000 --- a/submodules/paffy +++ b/submodules/paffy @@ -1 +1 @@ -Subproject commit 8b3c732460ad8c307161ec558ba72736b4fd421a +Subproject commit f60e1b8fc79ad7b68b9fc7c29cbbd41ff475d496 From df5526b77de0e8143c1ca50e1ccdbe32d3269764 Mon Sep 17 00:00:00 2001 From: Glenn Hickey Date: Mon, 9 Feb 2026 08:44:41 -0500 Subject: [PATCH 2/5] fix mac build --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 2b8d1f5dc..6d2086fc0 100644 --- a/Makefile +++ b/Makefile @@ -295,7 +295,7 @@ suball.abPOA: rm -fr ${INCLDIR}/simde && cp -r submodules/abPOA/include/simde ${INCLDIR} suball.lastz: - cd submodules/lastz/src && sed -i Makefile -e 's/-lm -o/-lm $${LIBS} -o/g' + cd submodules/lastz/src && sed -i -e 's/-lm -o/-lm $${LIBS} -o/g' Makefile cd submodules/lastz && LIBS="${jemallocLib}" ${MAKE} ln -f submodules/lastz/src/lastz bin @@ -321,7 +321,7 @@ suball.FASTGA: ln -f submodules/FASTGA/GIXmake ${BINDIR} ln -f submodules/FASTGA/GIXrm ${BINDIR} suball.FASTAN: - cd submodules/FASTAN && sed -i Makefile -e 's/-lm -lz/-lm -lpthread -lz/g' && ${MAKE} || true + cd submodules/FASTAN && sed -i -e 's/-lm 
-lz/-lm -lpthread -lz/g' Makefile && ${MAKE} || true ln -f submodules/FASTAN/FasTAN ${BINDIR} suball.alntools: cd submodules/alntools && ${MAKE} From 26dacbe3586580cc276431490fba34f0df1a5398 Mon Sep 17 00:00:00 2001 From: benedictpaten Date: Wed, 11 Feb 2026 09:09:12 -0800 Subject: [PATCH 3/5] increase chainSplitQueryLength default to 100M and add comment Co-Authored-By: Claude Opus 4.6 --- src/cactus/cactus_progressive_config.xml | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index fbfbf714f..79d1c27dc 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -79,6 +79,9 @@ + From c44982dfbe3c7fbadac959a86398c311dcfa3b22 Mon Sep 17 00:00:00 2001 From: benedictpaten Date: Wed, 11 Feb 2026 10:02:24 -0800 Subject: [PATCH 4/5] skip PAF query-contig splitting for small files Add chainSplitMinFileSize config option (default 100MB) to skip the paffy split_file step when the PAF is below a size threshold, avoiding overhead for small alignments. Also increase chainSplitQueryLength to 1B. 
Co-Authored-By: Claude Opus 4.6 --- src/cactus/cactus_progressive_config.xml | 5 ++++- src/cactus/paf/local_alignment.py | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 79d1c27dc..6b630bff1 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -82,6 +82,8 @@ + diff --git a/src/cactus/paf/local_alignment.py b/src/cactus/paf/local_alignment.py index 0474927b9..84eb726c0 100755 --- a/src/cactus/paf/local_alignment.py +++ b/src/cactus/paf/local_alignment.py @@ -637,8 +637,11 @@ def chain_one_alignment(job, alignment_file, alignment_name, params, include_inv split_query_length = getOptionalAttrib(params.find("blast"), "chainSplitQueryLength", typeFn=int, default=0) + chain_split_min_file_size = getOptionalAttrib(params.find("blast"), "chainSplitMinFileSize", + typeFn=int, default=0) + paf_file_size = os.path.getsize(alignment_path) - if split_query_length > 0: + if split_query_length > 0 and paf_file_size >= chain_split_min_file_size: # Split the PAF by query contig split_dir = os.path.join(work_dir, 'split') os.makedirs(split_dir) From 62258b433ae6d3c613e070890584128087f08006 Mon Sep 17 00:00:00 2001 From: benedictpaten Date: Wed, 11 Feb 2026 10:05:35 -0800 Subject: [PATCH 5/5] swap chainSplitQueryLength and chainSplitMinFileSize values Co-Authored-By: Claude Opus 4.6 --- src/cactus/cactus_progressive_config.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 6b630bff1..999813d5b 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -115,8 +115,8 @@ outputSecondaryAlignments="0" dechunkBatchSize="1000" pickIngroupPrimaryAlignmentsSeparatelyToOutgroups="1" - chainSplitQueryLength="1000000000" - chainSplitMinFileSize="100000000" + chainSplitQueryLength="100000000" + 
chainSplitMinFileSize="1000000000" slurmChunkScale="3" bigGenomeChunkScale="2" >